-
Notifications
You must be signed in to change notification settings - Fork 296
feat(datasets): support multimodal dataset items #1710
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
ed19fe4
cd791b6
a9a4a96
2c6718a
592f9de
3832a4e
88905ce
d7eb3ed
7db0fe7
a2cba12
0e2ea09
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,7 @@ | |
|
|
||
| import backoff | ||
| import httpx | ||
| from jsonpath_ng.ext import parse as parse_jsonpath # type: ignore[import-untyped] | ||
| from opentelemetry import context as otel_context_api | ||
| from opentelemetry import trace as otel_trace_api | ||
| from opentelemetry.sdk.trace import ReadableSpan, TracerProvider | ||
|
|
@@ -94,6 +95,7 @@ | |
| CreateTextPromptRequest, | ||
| Dataset, | ||
| DatasetItem, | ||
| DatasetItemMediaReferenceField, | ||
| DatasetRunWithItems, | ||
| DatasetStatus, | ||
| DeleteDatasetRunResponse, | ||
|
|
@@ -126,7 +128,7 @@ | |
| _run_task, | ||
| ) | ||
| from langfuse.logger import langfuse_logger | ||
| from langfuse.media import LangfuseMedia | ||
| from langfuse.media import LangfuseMedia, LangfuseMediaReference | ||
| from langfuse.model import ( | ||
| ChatMessageDict, | ||
| ChatMessageWithPlaceholdersDict, | ||
|
|
@@ -2322,15 +2324,17 @@ def get_dataset( | |
| *, | ||
| fetch_items_page_size: Optional[int] = 50, | ||
| version: Optional[datetime] = None, | ||
| resolve_media_references: bool = False, | ||
| ) -> "DatasetClient": | ||
| """Fetch a dataset by its name. | ||
|
|
||
| Args: | ||
| name (str): The name of the dataset to fetch. | ||
| fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50. | ||
| version (Optional[datetime]): Retrieve dataset items as they existed at this specific point in time (UTC). | ||
| name: The name of the dataset to fetch. | ||
| fetch_items_page_size: All items of the dataset will be fetched in chunks of this size. Defaults to 50. | ||
| version: Retrieve dataset items as they existed at this specific point in time (UTC). | ||
| If provided, returns the state of items at the specified UTC timestamp. | ||
| If not provided, returns the latest version. Must be a timezone-aware datetime object in UTC. | ||
| resolve_media_references: If true, resolve media reference strings in dataset items to LangfuseMediaReference objects. | ||
|
|
||
| Returns: | ||
| DatasetClient: The dataset with the given name. | ||
|
|
@@ -2339,7 +2343,7 @@ def get_dataset( | |
| langfuse_logger.debug(f"Getting datasets {name}") | ||
| dataset = self.api.datasets.get(dataset_name=self._url_encode(name)) | ||
|
|
||
| dataset_items = [] | ||
| dataset_items: List[DatasetItem] = [] | ||
| page = 1 | ||
|
|
||
| while True: | ||
|
|
@@ -2348,8 +2352,16 @@ def get_dataset( | |
| page=page, | ||
| limit=fetch_items_page_size, | ||
| version=version, | ||
| include_media_references=resolve_media_references or None, | ||
| ) | ||
| dataset_items.extend( | ||
| [ | ||
| self._hydrate_dataset_item_media_references(item) | ||
| for item in new_items.data | ||
| ] | ||
| if resolve_media_references | ||
| else new_items.data | ||
| ) | ||
| dataset_items.extend(new_items.data) | ||
|
|
||
| if new_items.meta.total_pages <= page: | ||
| break | ||
|
|
@@ -3355,6 +3367,17 @@ def create_dataset_item( | |
| try: | ||
| langfuse_logger.debug(f"Creating dataset item for dataset {dataset_name}") | ||
|
|
||
| uploaded_media_ids: set[str] = set() | ||
| input = self._process_dataset_item_media( | ||
| data=input, uploaded_media_ids=uploaded_media_ids | ||
| ) | ||
| expected_output = self._process_dataset_item_media( | ||
| data=expected_output, uploaded_media_ids=uploaded_media_ids | ||
| ) | ||
| metadata = self._process_dataset_item_media( | ||
| data=metadata, uploaded_media_ids=uploaded_media_ids | ||
| ) | ||
|
|
||
| result = self.api.dataset_items.create( | ||
| dataset_name=dataset_name, | ||
| input=input, | ||
|
|
@@ -3371,6 +3394,136 @@ def create_dataset_item( | |
| handle_fern_exception(e) | ||
| raise e | ||
|
|
||
| def _process_dataset_item_media( | ||
| self, *, data: Any, uploaded_media_ids: set[str] | ||
| ) -> Any: | ||
| if self._resources is None: | ||
| return data | ||
|
|
||
| max_levels = 10 | ||
|
|
||
| def _process_data_recursively( | ||
| data: Any, level: int, ancestor_container_ids: set[int] | ||
| ) -> Any: | ||
| # Avoid jsonpath-ng here: dataset writes should keep working | ||
| # under python -OO where parser docstrings may be stripped. | ||
|
Comment on lines
+3408
to
+3409
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in my opinion an okay compromise as the write path doesn't become a hard blocker for users and easy to maintain Avoid hand rolling jsonpath parsing for the read path -> users can also opt out here so in my opinion fine |
||
| if isinstance(data, LangfuseMedia): | ||
| return self._upload_dataset_item_media( | ||
| media=data, uploaded_media_ids=uploaded_media_ids | ||
| ) | ||
|
|
||
| if isinstance(data, LangfuseMediaReference): | ||
| return data.reference_string if data.reference_string else data | ||
|
|
||
| # Tuples are intentionally excluded: namedtuple subclasses can't be | ||
| # rebuilt from an iterable, so media inside them is left untouched. | ||
| if not isinstance(data, (list, set, frozenset, dict)): | ||
| return data | ||
|
|
||
| # Container ids only protect against recursive cycles; media upload | ||
| # dedupe is handled by uploaded_media_ids. | ||
| data_id = id(data) | ||
| if data_id in ancestor_container_ids or level > max_levels: | ||
| return data | ||
|
|
||
| next_ancestor_container_ids = ancestor_container_ids | {data_id} | ||
|
|
||
| if isinstance(data, (list, set, frozenset)): | ||
| processed = ( | ||
| _process_data_recursively( | ||
| item, level + 1, next_ancestor_container_ids | ||
| ) | ||
| for item in data | ||
| ) | ||
| return type(data)(processed) | ||
|
|
||
| return { | ||
| key: _process_data_recursively( | ||
| value, level + 1, next_ancestor_container_ids | ||
|
wochinge marked this conversation as resolved.
|
||
| ) | ||
| for key, value in data.items() | ||
| } | ||
|
|
||
| return _process_data_recursively(data, 1, set()) | ||
|
|
||
| def _upload_dataset_item_media( | ||
| self, *, media: LangfuseMedia, uploaded_media_ids: set[str] | ||
| ) -> str: | ||
| reference_string = media._reference_string | ||
| media_id = media._media_id | ||
|
|
||
| if reference_string is None or media_id is None: | ||
| raise ValueError("Cannot create dataset item with invalid LangfuseMedia.") | ||
|
|
||
| if media_id not in uploaded_media_ids: | ||
| assert self._resources is not None | ||
| self._resources._media_manager._upload_media_sync(media=media) | ||
| uploaded_media_ids.add(media_id) | ||
|
|
||
| return reference_string | ||
|
|
||
| def _hydrate_dataset_item_media_references(self, item: DatasetItem) -> DatasetItem: | ||
| media_references = item.media_references or [] | ||
| if not media_references: | ||
| return item | ||
|
|
||
| # Map the API enum member to the snake_case model attribute so this keeps | ||
| # working regardless of the enum's wire value (e.g. "expectedOutput"). | ||
| attr_by_field = { | ||
| DatasetItemMediaReferenceField.INPUT: "input", | ||
| DatasetItemMediaReferenceField.EXPECTED_OUTPUT: "expected_output", | ||
| DatasetItemMediaReferenceField.METADATA: "metadata", | ||
| } | ||
| hydrated_fields = { | ||
| "input": item.input, | ||
| "expected_output": item.expected_output, | ||
| "metadata": item.metadata, | ||
| } | ||
|
|
||
| for media_reference in media_references: | ||
| media = media_reference.media | ||
| if media is None: | ||
| continue | ||
|
|
||
| field = attr_by_field.get(media_reference.field) | ||
| if field is None: | ||
| continue | ||
|
|
||
| replacement = LangfuseMediaReference( | ||
| media_id=media.media_id, | ||
| content_type=media.content_type, | ||
| url=media.url, | ||
| url_expiry=media.url_expiry, | ||
| content_length=media.content_length, | ||
| reference_string=media_reference.reference_string, | ||
| ) | ||
|
claude[bot] marked this conversation as resolved.
|
||
| hydrated_fields[field] = self._replace_json_path_value( | ||
| value=hydrated_fields[field], | ||
| json_path=media_reference.json_path, | ||
| replacement=replacement, | ||
| ) | ||
|
|
||
| return item.model_copy( | ||
| update={ | ||
| "input": hydrated_fields["input"], | ||
| "expected_output": hydrated_fields["expected_output"], | ||
| "metadata": hydrated_fields["metadata"], | ||
| } | ||
| ) | ||
|
|
||
| def _replace_json_path_value( | ||
| self, *, value: Any, json_path: str, replacement: LangfuseMediaReference | ||
| ) -> Any: | ||
| try: | ||
| value = parse_jsonpath(json_path).update(value, replacement) | ||
| except Exception as e: | ||
| langfuse_logger.warning( | ||
| f"Failed to hydrate dataset media reference at JSONPath {json_path}", | ||
| exc_info=e, | ||
| ) | ||
|
|
||
| return value | ||
|
wochinge marked this conversation as resolved.
|
||
|
|
||
| def resolve_media_references( | ||
| self, | ||
| *, | ||
|
claude[bot] marked this conversation as resolved.
|
||
|
|
||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. actual (non generated changes) |
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. actual (non generated changes) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,7 +15,7 @@ | |
|
|
||
| from pydantic import BaseModel | ||
|
|
||
| from langfuse.media import LangfuseMedia | ||
| from langfuse.media import LangfuseMedia, LangfuseMediaReference | ||
|
|
||
| # Attempt to import Serializable | ||
| try: | ||
|
|
@@ -59,12 +59,18 @@ | |
| if isinstance(obj, LangfuseMedia): | ||
| return ( | ||
| obj._reference_string | ||
| or f"<Upload handling failed for LangfuseMedia of type {obj._content_type}>" | ||
| ) | ||
|
|
||
| if ( | ||
| isinstance(obj, LangfuseMediaReference) | ||
| and obj.reference_string is not None | ||
| ): | ||
| return obj.reference_string | ||
|
|
||
| # Check if numpy is available and if the object is a numpy scalar | ||
| # If so, convert it to a Python scalar using the item() method | ||
| if np is not None and isinstance(obj, np.generic): | ||
|
Check failure on line 73 in langfuse/_utils/serializer.py
|
||
|
Comment on lines
62
to
73
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔴 The new Extended reasoning...What the bug isThis PR adds a new opacity branch in That branch only fires when the LMR is dispatched directly to Why existing safeguards don't catch it
Step-by-step proofVerified empirically against current HEAD: @dataclass
class Container:
image_ref: LangfuseMediaReference
note: str = 'hello'
lmr = LangfuseMediaReference(
media_id='abc', content_type='image/png',
url='https://signed.example.com/SECRET_TOKEN',
url_expiry='2026-06-15T12:00:00Z', content_length=1234,
reference_string='@@@langfuseMedia:type=image/png|id=abc|source=bytes@@@',
)
s = EventSerializer()
print(s.encode(lmr))
# "@@@langfuseMedia:type=image/png|id=abc|source=bytes@@@" (opacity preserved)
print(s.encode({'image_ref': lmr}))
# {"image_ref": "@@@langfuseMedia:..."} (opacity preserved)
print(s.encode(Container(image_ref=lmr)))
# {"image_ref": {"media_id": "abc", "content_type": "image/png",
# "url": "https://signed.example.com/SECRET_TOKEN",
# "url_expiry": "...", "content_length": 1234,
# "reference_string": "@@@..."}, "note": "hello"} (URL LEAKED)Same shape for ImpactThe signed The round-trip case from bug #5's fix breaks specifically when an LMR is wrapped in a user dataclass/Pydantic model: the server's media-reference scanner still finds the The trigger is plausible: users with How to fixIn the dataclass branch (and analogously for the Pydantic branch), iterate fields manually and route each value back through if is_dataclass(obj):
from dataclasses import fields
return {f.name: self.default(getattr(obj, f.name)) for f in fields(obj)}
if isinstance(obj, BaseModel):
obj.model_rebuild()
if isinstance(raw := getattr(obj, "raw", None), BaseModel):
raw.model_rebuild()
return {k: self.default(v) for k, v in obj.model_dump().items()}The Pydantic case needs a small pre-pass or field iteration since (The write path — fern's |
||
| return obj.item() | ||
|
|
||
| # Check if numpy is available and if the object is a numpy array | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actual (non generated changes)