Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion burr/core/serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ def call(self, key, *args, **kwargs):
if key in self.func_map:
return self.func_map[key](*args, **kwargs)
else:
raise ValueError(f"No function registered for key: {key}")
raise ValueError(
f"No deserializer registered for key: '{key}'. "
f"Registered keys: {sorted(self.func_map)}. "
f"Make sure the module registering the deserializer for '{key}' has been "
f"imported (e.g. `import burr.integrations.serde.pandas` for 'pandas.DataFrame')."
)


deserializer = StringDispatch()
Expand Down
32 changes: 20 additions & 12 deletions burr/core/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,15 +328,20 @@ def serialize(self, **kwargs) -> dict:

def _serialize(k, v, **extrakwargs) -> Union[dict, str]:
"""chooses the correct serde function for the given key and calls it"""
if k in FIELD_SERIALIZATION:
result = FIELD_SERIALIZATION[k][0](v, **extrakwargs)
if not isinstance(result, dict):
raise ValueError(
f"Field serde for {k} must return a dict,"
f" but {FIELD_SERIALIZATION[k][0].__name__} returned {type(result)} ({str(result)[0:10]})."
)
return result
return serde.serialize(v, **extrakwargs)
try:
if k in FIELD_SERIALIZATION:
result = FIELD_SERIALIZATION[k][0](v, **extrakwargs)
if not isinstance(result, dict):
raise ValueError(
f"Field serde for {k} must return a dict,"
f" but {FIELD_SERIALIZATION[k][0].__name__} returned {type(result)} ({str(result)[0:10]})."
)
return result
return serde.serialize(v, **extrakwargs)
except Exception as e:
raise ValueError(
f"Failed to serialize state field '{k}' (value of type {type(v).__name__}): {e}"
) from e

return {k: _serialize(k, v, **kwargs) for k, v in _dict.items()}

Expand All @@ -346,9 +351,12 @@ def deserialize(cls, json_dict: dict, **kwargs) -> "State[StateType]":

def _deserialize(k, v: Union[str, dict], **extrakwargs) -> Callable:
"""chooses the correct serde function for the given key and calls it"""
if k in FIELD_SERIALIZATION:
return FIELD_SERIALIZATION[k][1](v, **extrakwargs)
return serde.deserialize(v, **extrakwargs)
try:
if k in FIELD_SERIALIZATION:
return FIELD_SERIALIZATION[k][1](v, **extrakwargs)
return serde.deserialize(v, **extrakwargs)
except Exception as e:
raise ValueError(f"Failed to deserialize state field '{k}': {e}") from e

return State({k: _deserialize(k, v, **kwargs) for k, v in json_dict.items()})

Expand Down
22 changes: 18 additions & 4 deletions burr/integrations/serde/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
# try to import to serialize Pandas Objects
import hashlib
import os
from typing import Optional

import pandas as pd

from burr.core import serde


@serde.serialize.register(pd.DataFrame)
def serialize_pandas_df(value: pd.DataFrame, pandas_kwargs: dict, **kwargs) -> dict:
def serialize_pandas_df(
value: pd.DataFrame, pandas_kwargs: Optional[dict] = None, **kwargs
) -> dict:
"""Custom serde for pandas dataframes.

Saves the dataframe to a parquet file and returns the path to the file.
Expand All @@ -37,6 +40,15 @@ def serialize_pandas_df(value: pd.DataFrame, pandas_kwargs: dict, **kwargs) -> d
:param kwargs:
:return:
"""
if not isinstance(pandas_kwargs, dict) or "path" not in pandas_kwargs:
raise ValueError(
"Serializing a pandas DataFrame requires a `path` entry in `pandas_kwargs` -- "
"this is the base path where the dataframe is saved as a parquet file. "
"Pass it through whatever triggers serialization, e.g. "
'`LocalTrackingClient(..., serde_kwargs={"pandas_kwargs": {"path": "/some/dir"}})` '
'or `state.serialize(pandas_kwargs={"path": "/some/dir"})`. '
f"Got pandas_kwargs={pandas_kwargs!r}."
)
hash_object = hashlib.sha256()
hash_value = str(value.columns) + str(value.shape) + str(value.dtypes)
hash_object.update(hash_value.encode())
Expand All @@ -53,15 +65,17 @@ def serialize_pandas_df(value: pd.DataFrame, pandas_kwargs: dict, **kwargs) -> d


@serde.deserializer.register("pandas.DataFrame")
def deserialize_pandas_df(value: dict, pandas_kwargs: dict, **kwargs) -> pd.DataFrame:
def deserialize_pandas_df(
value: dict, pandas_kwargs: Optional[dict] = None, **kwargs
) -> pd.DataFrame:
"""Custom deserializer for pandas dataframes.

:param value: the dictionary to pull the path from to load the parquet file.
:param pandas_kwargs: other args to pass to the pandas read_parquet function.
:param pandas_kwargs: other args to pass to the pandas read_parquet function. Optional.
:param kwargs:
:return: pandas dataframe
"""
kwargs = pandas_kwargs.copy()
kwargs = pandas_kwargs.copy() if pandas_kwargs is not None else {}
if "path" in kwargs:
# remove this to not clash; we already have the full path.
kwargs.pop("path")
Expand Down
10 changes: 10 additions & 0 deletions tests/core/test_serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,13 @@ def test_string_dispatch_with_key():
dispatch = StringDispatch()
dispatch.register("test_key")(lambda x: x)
assert dispatch.call("test_key", "test_value") == "test_value"


def test_string_dispatch_no_key_informative_message():
dispatch = StringDispatch()
dispatch.register("known_key")(lambda value: value)
with pytest.raises(ValueError) as exc_info:
dispatch.call("nonexistent_key")
assert "nonexistent_key" in str(exc_info.value)
assert "known_key" in str(exc_info.value)
assert "imported" in str(exc_info.value)
27 changes: 27 additions & 0 deletions tests/integrations/serde/test_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import os

import pandas as pd
import pytest

from burr.core import serde, state

Expand All @@ -41,3 +42,29 @@ def test_serde_of_pandas_dataframe(tmp_path):
ng = state.State.deserialize(serialized, pandas_kwargs={"path": tmp_path})
assert isinstance(ng["df"], pd.DataFrame)
pd.testing.assert_frame_equal(ng["df"], df)


def test_serialize_pandas_df_without_pandas_kwargs_raises_informative_error():
df = pd.DataFrame({"a": [1, 2, 3]})
og = state.State({"df": df})
with pytest.raises(ValueError) as exc_info:
og.serialize()
assert "Failed to serialize state field 'df'" in str(exc_info.value)
assert "pandas_kwargs" in str(exc_info.value)
assert "path" in str(exc_info.value)


def test_serialize_pandas_df_without_path_raises_informative_error():
df = pd.DataFrame({"a": [1, 2, 3]})
og = state.State({"df": df})
with pytest.raises(ValueError) as exc_info:
og.serialize(pandas_kwargs={"compression": "snappy"})
assert "path" in str(exc_info.value)


def test_deserialize_pandas_df_without_pandas_kwargs(tmp_path):
df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
og = state.State({"df": df})
serialized = og.serialize(pandas_kwargs={"path": tmp_path})
ng = state.State.deserialize(serialized)
pd.testing.assert_frame_equal(ng["df"], df)
Loading