From e3daa8b263625e89dd5121301c1443484ecf4402 Mon Sep 17 00:00:00 2001 From: adawang Date: Thu, 11 Jun 2026 14:55:50 +0800 Subject: [PATCH] fix: #351 make serde errors informative Pandas serde now raises a clear ValueError explaining the required pandas_kwargs 'path' entry instead of an opaque TypeError/KeyError; State.serialize/deserialize wrap failures with the offending field name; unknown deserializer keys now list registered keys and hint at the missing registration import. Signed-off-by: adawang --- burr/core/serde.py | 7 +++++- burr/core/state.py | 32 +++++++++++++++---------- burr/integrations/serde/pandas.py | 22 +++++++++++++---- tests/core/test_serde.py | 10 ++++++++ tests/integrations/serde/test_pandas.py | 27 +++++++++++++++++++++ 5 files changed, 81 insertions(+), 17 deletions(-) diff --git a/burr/core/serde.py b/burr/core/serde.py index d7f9824e4..effdd892b 100644 --- a/burr/core/serde.py +++ b/burr/core/serde.py @@ -61,7 +61,12 @@ def call(self, key, *args, **kwargs): if key in self.func_map: return self.func_map[key](*args, **kwargs) else: - raise ValueError(f"No function registered for key: {key}") + raise ValueError( + f"No deserializer registered for key: '{key}'. " + f"Registered keys: {sorted(self.func_map)}. " + f"Make sure the module registering the deserializer for '{key}' has been " + f"imported (e.g. `import burr.integrations.serde.pandas` for 'pandas.DataFrame')." + ) deserializer = StringDispatch() diff --git a/burr/core/state.py b/burr/core/state.py index ad0d96a54..f86c1c786 100644 --- a/burr/core/state.py +++ b/burr/core/state.py @@ -328,15 +328,20 @@ def serialize(self, **kwargs) -> dict: def _serialize(k, v, **extrakwargs) -> Union[dict, str]: """chooses the correct serde function for the given key and calls it""" - if k in FIELD_SERIALIZATION: - result = FIELD_SERIALIZATION[k][0](v, **extrakwargs) - if not isinstance(result, dict): - raise ValueError( - f"Field serde for {k} must return a dict," - f" but {FIELD_SERIALIZATION[k][0].__name__} returned {type(result)} ({str(result)[0:10]})." - ) - return result - return serde.serialize(v, **extrakwargs) + try: + if k in FIELD_SERIALIZATION: + result = FIELD_SERIALIZATION[k][0](v, **extrakwargs) + if not isinstance(result, dict): + raise ValueError( + f"Field serde for {k} must return a dict," + f" but {FIELD_SERIALIZATION[k][0].__name__} returned {type(result)} ({str(result)[0:10]})." + ) + return result + return serde.serialize(v, **extrakwargs) + except Exception as e: + raise ValueError( + f"Failed to serialize state field '{k}' (value of type {type(v).__name__}): {e}" + ) from e return {k: _serialize(k, v, **kwargs) for k, v in _dict.items()} @@ -346,9 +351,12 @@ def deserialize(cls, json_dict: dict, **kwargs) -> "State[StateType]": def _deserialize(k, v: Union[str, dict], **extrakwargs) -> Callable: """chooses the correct serde function for the given key and calls it""" - if k in FIELD_SERIALIZATION: - return FIELD_SERIALIZATION[k][1](v, **extrakwargs) - return serde.deserialize(v, **extrakwargs) + try: + if k in FIELD_SERIALIZATION: + return FIELD_SERIALIZATION[k][1](v, **extrakwargs) + return serde.deserialize(v, **extrakwargs) + except Exception as e: + raise ValueError(f"Failed to deserialize state field '{k}': {e}") from e return State({k: _deserialize(k, v, **kwargs) for k, v in json_dict.items()}) diff --git a/burr/integrations/serde/pandas.py b/burr/integrations/serde/pandas.py index 62f502598..22ee32b01 100644 --- a/burr/integrations/serde/pandas.py +++ b/burr/integrations/serde/pandas.py @@ -18,6 +18,7 @@ # try to import to serialize Pandas Objects import hashlib import os +from typing import Optional import pandas as pd @@ -25,7 +26,9 @@ @serde.serialize.register(pd.DataFrame) -def serialize_pandas_df(value: pd.DataFrame, pandas_kwargs: dict, **kwargs) -> dict: +def serialize_pandas_df( + value: pd.DataFrame, pandas_kwargs: Optional[dict] = None, **kwargs +) -> dict: """Custom serde for pandas dataframes. Saves the dataframe to a parquet file and returns the path to the file. @@ -37,6 +40,15 @@ def serialize_pandas_df(value: pd.DataFrame, pandas_kwargs: dict, **kwargs) -> d :param kwargs: :return: """ + if not isinstance(pandas_kwargs, dict) or "path" not in pandas_kwargs: + raise ValueError( + "Serializing a pandas DataFrame requires a `path` entry in `pandas_kwargs` -- " + "this is the base path where the dataframe is saved as a parquet file. " + "Pass it through whatever triggers serialization, e.g. " + '`LocalTrackingClient(..., serde_kwargs={"pandas_kwargs": {"path": "/some/dir"}})` ' + 'or `state.serialize(pandas_kwargs={"path": "/some/dir"})`. ' + f"Got pandas_kwargs={pandas_kwargs!r}." + ) hash_object = hashlib.sha256() hash_value = str(value.columns) + str(value.shape) + str(value.dtypes) hash_object.update(hash_value.encode()) @@ -53,15 +65,17 @@ def serialize_pandas_df(value: pd.DataFrame, pandas_kwargs: dict, **kwargs) -> d @serde.deserializer.register("pandas.DataFrame") -def deserialize_pandas_df(value: dict, pandas_kwargs: dict, **kwargs) -> pd.DataFrame: +def deserialize_pandas_df( + value: dict, pandas_kwargs: Optional[dict] = None, **kwargs +) -> pd.DataFrame: """Custom deserializer for pandas dataframes. :param value: the dictionary to pull the path from to load the parquet file. - :param pandas_kwargs: other args to pass to the pandas read_parquet function. + :param pandas_kwargs: other args to pass to the pandas read_parquet function. Optional. :param kwargs: :return: pandas dataframe """ - kwargs = pandas_kwargs.copy() + kwargs = pandas_kwargs.copy() if pandas_kwargs is not None else {} if "path" in kwargs: # remove this to not clash; we already have the full path. kwargs.pop("path") diff --git a/tests/core/test_serde.py b/tests/core/test_serde.py index 7e3de390c..302a1cdce 100644 --- a/tests/core/test_serde.py +++ b/tests/core/test_serde.py @@ -64,3 +64,13 @@ def test_string_dispatch_with_key(): dispatch = StringDispatch() dispatch.register("test_key")(lambda x: x) assert dispatch.call("test_key", "test_value") == "test_value" + + +def test_string_dispatch_no_key_informative_message(): + dispatch = StringDispatch() + dispatch.register("known_key")(lambda value: value) + with pytest.raises(ValueError) as exc_info: + dispatch.call("nonexistent_key") + assert "nonexistent_key" in str(exc_info.value) + assert "known_key" in str(exc_info.value) + assert "imported" in str(exc_info.value) diff --git a/tests/integrations/serde/test_pandas.py b/tests/integrations/serde/test_pandas.py index 90879052d..90ce9950b 100644 --- a/tests/integrations/serde/test_pandas.py +++ b/tests/integrations/serde/test_pandas.py @@ -18,6 +18,7 @@ import os import pandas as pd +import pytest from burr.core import serde, state @@ -41,3 +42,29 @@ def test_serde_of_pandas_dataframe(tmp_path): ng = state.State.deserialize(serialized, pandas_kwargs={"path": tmp_path}) assert isinstance(ng["df"], pd.DataFrame) pd.testing.assert_frame_equal(ng["df"], df) + + +def test_serialize_pandas_df_without_pandas_kwargs_raises_informative_error(): + df = pd.DataFrame({"a": [1, 2, 3]}) + og = state.State({"df": df}) + with pytest.raises(ValueError) as exc_info: + og.serialize() + assert "Failed to serialize state field 'df'" in str(exc_info.value) + assert "pandas_kwargs" in str(exc_info.value) + assert "path" in str(exc_info.value) + + +def test_serialize_pandas_df_without_path_raises_informative_error(): + df = pd.DataFrame({"a": [1, 2, 3]}) + og = state.State({"df": df}) + with pytest.raises(ValueError) as exc_info: + og.serialize(pandas_kwargs={"compression": "snappy"}) + assert "path" in str(exc_info.value) + + +def test_deserialize_pandas_df_without_pandas_kwargs(tmp_path): + df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}) + og = state.State({"df": df}) + serialized = og.serialize(pandas_kwargs={"path": tmp_path}) + ng = state.State.deserialize(serialized) + pd.testing.assert_frame_equal(ng["df"], df)