From ed19fe4984b29c99f074a94127e80ea39c692cf3 Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Mon, 15 Jun 2026 11:40:32 +0200 Subject: [PATCH 01/10] feat(api): update generated dataset media API --- langfuse/api/__init__.py | 9 ++++ langfuse/api/commons/__init__.py | 9 ++++ langfuse/api/commons/types/__init__.py | 9 ++++ langfuse/api/commons/types/dataset_item.py | 8 ++++ .../types/dataset_item_media_reference.py | 42 +++++++++++++++++ .../dataset_item_media_reference_field.py | 26 ++++++++++ .../dataset_item_media_reference_media.py | 47 +++++++++++++++++++ langfuse/api/dataset_items/client.py | 40 ++++++++++++++-- langfuse/api/dataset_items/raw_client.py | 34 +++++++++++++- langfuse/api/media/client.py | 44 ++++++++--------- langfuse/api/media/raw_client.py | 32 ++++++------- .../types/get_media_upload_url_request.py | 14 +++--- 12 files changed, 261 insertions(+), 53 deletions(-) create mode 100644 langfuse/api/commons/types/dataset_item_media_reference.py create mode 100644 langfuse/api/commons/types/dataset_item_media_reference_field.py create mode 100644 langfuse/api/commons/types/dataset_item_media_reference_media.py diff --git a/langfuse/api/__init__.py b/langfuse/api/__init__.py index 46985c0b9..256f1230b 100644 --- a/langfuse/api/__init__.py +++ b/langfuse/api/__init__.py @@ -86,6 +86,9 @@ CreateScoreValue, Dataset, DatasetItem, + DatasetItemMediaReference, + DatasetItemMediaReferenceField, + DatasetItemMediaReferenceMedia, DatasetRun, DatasetRunItem, DatasetRunWithItems, @@ -396,6 +399,9 @@ "CreateTextPromptType": ".prompts", "Dataset": ".commons", "DatasetItem": ".commons", + "DatasetItemMediaReference": ".commons", + "DatasetItemMediaReferenceField": ".commons", + "DatasetItemMediaReferenceMedia": ".commons", "DatasetRun": ".commons", "DatasetRunItem": ".commons", "DatasetRunWithItems": ".commons", @@ -717,6 +723,9 @@ def __dir__(): "CreateTextPromptType", "Dataset", "DatasetItem", + "DatasetItemMediaReference", + "DatasetItemMediaReferenceField", + "DatasetItemMediaReferenceMedia", "DatasetRun", "DatasetRunItem", "DatasetRunWithItems", diff --git a/langfuse/api/commons/__init__.py b/langfuse/api/commons/__init__.py index 81cb57f96..a35064081 100644 --- a/langfuse/api/commons/__init__.py +++ b/langfuse/api/commons/__init__.py @@ -20,6 +20,9 @@ CreateScoreValue, Dataset, DatasetItem, + DatasetItemMediaReference, + DatasetItemMediaReferenceField, + DatasetItemMediaReferenceMedia, DatasetRun, DatasetRunItem, DatasetRunWithItems, @@ -84,6 +87,9 @@ "CreateScoreValue": ".types", "Dataset": ".types", "DatasetItem": ".types", + "DatasetItemMediaReference": ".types", + "DatasetItemMediaReferenceField": ".types", + "DatasetItemMediaReferenceMedia": ".types", "DatasetRun": ".types", "DatasetRunItem": ".types", "DatasetRunWithItems": ".types", @@ -174,6 +180,9 @@ def __dir__(): "CreateScoreValue", "Dataset", "DatasetItem", + "DatasetItemMediaReference", + "DatasetItemMediaReferenceField", + "DatasetItemMediaReferenceMedia", "DatasetRun", "DatasetRunItem", "DatasetRunWithItems", diff --git a/langfuse/api/commons/types/__init__.py b/langfuse/api/commons/types/__init__.py index 5ce0a58cd..12faf307f 100644 --- a/langfuse/api/commons/types/__init__.py +++ b/langfuse/api/commons/types/__init__.py @@ -19,6 +19,9 @@ from .create_score_value import CreateScoreValue from .dataset import Dataset from .dataset_item import DatasetItem + from .dataset_item_media_reference import DatasetItemMediaReference + from .dataset_item_media_reference_field import DatasetItemMediaReferenceField + from .dataset_item_media_reference_media import DatasetItemMediaReferenceMedia from .dataset_run import DatasetRun from .dataset_run_item import DatasetRunItem from .dataset_run_with_items import DatasetRunWithItems @@ -78,6 +81,9 @@ "CreateScoreValue": ".create_score_value", "Dataset": ".dataset", "DatasetItem": ".dataset_item", + "DatasetItemMediaReference": ".dataset_item_media_reference", + "DatasetItemMediaReferenceField": ".dataset_item_media_reference_field", + "DatasetItemMediaReferenceMedia": ".dataset_item_media_reference_media", "DatasetRun": ".dataset_run", "DatasetRunItem": ".dataset_run_item", "DatasetRunWithItems": ".dataset_run_with_items", @@ -163,6 +169,9 @@ def __dir__(): "CreateScoreValue", "Dataset", "DatasetItem", + "DatasetItemMediaReference", + "DatasetItemMediaReferenceField", + "DatasetItemMediaReferenceMedia", "DatasetRun", "DatasetRunItem", "DatasetRunWithItems", diff --git a/langfuse/api/commons/types/dataset_item.py b/langfuse/api/commons/types/dataset_item.py index 54a13d81a..8d49f6439 100644 --- a/langfuse/api/commons/types/dataset_item.py +++ b/langfuse/api/commons/types/dataset_item.py @@ -7,6 +7,7 @@ import typing_extensions from ...core.pydantic_utilities import UniversalBaseModel from ...core.serialization import FieldMetadata +from .dataset_item_media_reference import DatasetItemMediaReference from .dataset_status import DatasetStatus @@ -52,6 +53,13 @@ class DatasetItem(UniversalBaseModel): updated_at: typing_extensions.Annotated[ dt.datetime, FieldMetadata(alias="updatedAt") ] + media_references: typing_extensions.Annotated[ + typing.Optional[typing.List[DatasetItemMediaReference]], + FieldMetadata(alias="mediaReferences"), + ] = pydantic.Field(default=None) + """ + Resolved Langfuse media references found in input, expectedOutput, and metadata. Only present when requested via includeMediaReferences. + """ model_config: typing.ClassVar[pydantic.ConfigDict] = pydantic.ConfigDict( extra="allow", frozen=True diff --git a/langfuse/api/commons/types/dataset_item_media_reference.py b/langfuse/api/commons/types/dataset_item_media_reference.py new file mode 100644 index 000000000..95121fa41 --- /dev/null +++ b/langfuse/api/commons/types/dataset_item_media_reference.py @@ -0,0 +1,42 @@ +# This file was auto-generated by Fern from our API Definition. + +import typing + +import pydantic +import typing_extensions +from ...core.pydantic_utilities import UniversalBaseModel +from ...core.serialization import FieldMetadata +from .dataset_item_media_reference_field import DatasetItemMediaReferenceField +from .dataset_item_media_reference_media import DatasetItemMediaReferenceMedia + + +class DatasetItemMediaReference(UniversalBaseModel): + field: DatasetItemMediaReferenceField = pydantic.Field() + """ + The dataset item field containing the reference + """ + + reference_string: typing_extensions.Annotated[ + str, FieldMetadata(alias="referenceString") + ] = pydantic.Field() + """ + The Langfuse media reference string, e.g. `@@@langfuseMedia:type=image/png|id=...|source=bytes@@@` + """ + + json_path: typing_extensions.Annotated[str, FieldMetadata(alias="jsonPath")] = ( + pydantic.Field() + ) + """ + JSONPath of the string holding the reference within the field, e.g. `$['image']` + """ + + media: typing.Optional[DatasetItemMediaReferenceMedia] = pydantic.Field( + default=None + ) + """ + The resolved media record. Null if the referenced media does not exist or has not been uploaded successfully. + """ + + model_config: typing.ClassVar[pydantic.ConfigDict] = pydantic.ConfigDict( + extra="allow", frozen=True + ) diff --git a/langfuse/api/commons/types/dataset_item_media_reference_field.py b/langfuse/api/commons/types/dataset_item_media_reference_field.py new file mode 100644 index 000000000..6a7c3a23e --- /dev/null +++ b/langfuse/api/commons/types/dataset_item_media_reference_field.py @@ -0,0 +1,26 @@ +# This file was auto-generated by Fern from our API Definition. + +import typing + +from ...core import enum + +T_Result = typing.TypeVar("T_Result") + + +class DatasetItemMediaReferenceField(enum.StrEnum): + INPUT = "input" + EXPECTED_OUTPUT = "expected_output" + METADATA = "metadata" + + def visit( + self, + input: typing.Callable[[], T_Result], + expected_output: typing.Callable[[], T_Result], + metadata: typing.Callable[[], T_Result], + ) -> T_Result: + if self is DatasetItemMediaReferenceField.INPUT: + return input() + if self is DatasetItemMediaReferenceField.EXPECTED_OUTPUT: + return expected_output() + if self is DatasetItemMediaReferenceField.METADATA: + return metadata() diff --git a/langfuse/api/commons/types/dataset_item_media_reference_media.py b/langfuse/api/commons/types/dataset_item_media_reference_media.py new file mode 100644 index 000000000..94ca39f38 --- /dev/null +++ b/langfuse/api/commons/types/dataset_item_media_reference_media.py @@ -0,0 +1,47 @@ +# This file was auto-generated by Fern from our API Definition. + +import typing + +import pydantic +import typing_extensions +from ...core.pydantic_utilities import UniversalBaseModel +from ...core.serialization import FieldMetadata + + +class DatasetItemMediaReferenceMedia(UniversalBaseModel): + media_id: typing_extensions.Annotated[str, FieldMetadata(alias="mediaId")] = ( + pydantic.Field() + ) + """ + The unique langfuse identifier of the media record + """ + + content_type: typing_extensions.Annotated[ + str, FieldMetadata(alias="contentType") + ] = pydantic.Field() + """ + The MIME type of the media record + """ + + content_length: typing_extensions.Annotated[ + int, FieldMetadata(alias="contentLength") + ] = pydantic.Field() + """ + The size of the media record in bytes + """ + + url: str = pydantic.Field() + """ + The signed download URL of the media record + """ + + url_expiry: typing_extensions.Annotated[str, FieldMetadata(alias="urlExpiry")] = ( + pydantic.Field() + ) + """ + The expiry date and time of the download URL + """ + + model_config: typing.ClassVar[pydantic.ConfigDict] = pydantic.ConfigDict( + extra="allow", frozen=True + ) diff --git a/langfuse/api/dataset_items/client.py b/langfuse/api/dataset_items/client.py index c44ca24d8..62416019c 100644 --- a/langfuse/api/dataset_items/client.py +++ b/langfuse/api/dataset_items/client.py @@ -103,7 +103,11 @@ def create( return _response.data def get( - self, id: str, *, request_options: typing.Optional[RequestOptions] = None + self, + id: str, + *, + include_media_references: typing.Optional[bool] = None, + request_options: typing.Optional[RequestOptions] = None, ) -> DatasetItem: """ Get a dataset item @@ -112,6 +116,9 @@ def get( ---------- id : str + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -135,7 +142,11 @@ def get( id="id", ) """ - _response = self._raw_client.get(id, request_options=request_options) + _response = self._raw_client.get( + id, + include_media_references=include_media_references, + request_options=request_options, + ) return _response.data def list( @@ -145,6 +156,7 @@ def list( source_trace_id: typing.Optional[str] = None, source_observation_id: typing.Optional[str] = None, version: typing.Optional[dt.datetime] = None, + include_media_references: typing.Optional[bool] = None, page: typing.Optional[int] = None, limit: typing.Optional[int] = None, request_options: typing.Optional[RequestOptions] = None, @@ -166,6 +178,9 @@ def list( If provided, returns state of dataset at this timestamp. If not provided, returns the latest version. Requires datasetName to be specified. + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + page : typing.Optional[int] page number, starts at 1 @@ -198,6 +213,7 @@ def list( source_trace_id=source_trace_id, source_observation_id=source_observation_id, version=version, + include_media_references=include_media_references, page=page, limit=limit, request_options=request_options, @@ -337,7 +353,11 @@ async def main() -> None: return _response.data async def get( - self, id: str, *, request_options: typing.Optional[RequestOptions] = None + self, + id: str, + *, + include_media_references: typing.Optional[bool] = None, + request_options: typing.Optional[RequestOptions] = None, ) -> DatasetItem: """ Get a dataset item @@ -346,6 +366,9 @@ async def get( ---------- id : str + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -377,7 +400,11 @@ async def main() -> None: asyncio.run(main()) """ - _response = await self._raw_client.get(id, request_options=request_options) + _response = await self._raw_client.get( + id, + include_media_references=include_media_references, + request_options=request_options, + ) return _response.data async def list( @@ -387,6 +414,7 @@ async def list( source_trace_id: typing.Optional[str] = None, source_observation_id: typing.Optional[str] = None, version: typing.Optional[dt.datetime] = None, + include_media_references: typing.Optional[bool] = None, page: typing.Optional[int] = None, limit: typing.Optional[int] = None, request_options: typing.Optional[RequestOptions] = None, @@ -408,6 +436,9 @@ async def list( If provided, returns state of dataset at this timestamp. If not provided, returns the latest version. Requires datasetName to be specified. + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + page : typing.Optional[int] page number, starts at 1 @@ -448,6 +479,7 @@ async def main() -> None: source_trace_id=source_trace_id, source_observation_id=source_observation_id, version=version, + include_media_references=include_media_references, page=page, limit=limit, request_options=request_options, diff --git a/langfuse/api/dataset_items/raw_client.py b/langfuse/api/dataset_items/raw_client.py index 6aeafb54d..9fd58b78a 100644 --- a/langfuse/api/dataset_items/raw_client.py +++ b/langfuse/api/dataset_items/raw_client.py @@ -167,7 +167,11 @@ def create( ) def get( - self, id: str, *, request_options: typing.Optional[RequestOptions] = None + self, + id: str, + *, + include_media_references: typing.Optional[bool] = None, + request_options: typing.Optional[RequestOptions] = None, ) -> HttpResponse[DatasetItem]: """ Get a dataset item @@ -176,6 +180,9 @@ def get( ---------- id : str + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -186,6 +193,9 @@ def get( _response = self._client_wrapper.httpx_client.request( f"api/public/dataset-items/{jsonable_encoder(id)}", method="GET", + params={ + "includeMediaReferences": include_media_references, + }, request_options=request_options, ) try: @@ -273,6 +283,7 @@ def list( source_trace_id: typing.Optional[str] = None, source_observation_id: typing.Optional[str] = None, version: typing.Optional[dt.datetime] = None, + include_media_references: typing.Optional[bool] = None, page: typing.Optional[int] = None, limit: typing.Optional[int] = None, request_options: typing.Optional[RequestOptions] = None, @@ -294,6 +305,9 @@ def list( If provided, returns state of dataset at this timestamp. If not provided, returns the latest version. Requires datasetName to be specified. + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + page : typing.Optional[int] page number, starts at 1 @@ -315,6 +329,7 @@ def list( "sourceTraceId": source_trace_id, "sourceObservationId": source_observation_id, "version": serialize_datetime(version) if version is not None else None, + "includeMediaReferences": include_media_references, "page": page, "limit": limit, }, @@ -641,7 +656,11 @@ async def create( ) async def get( - self, id: str, *, request_options: typing.Optional[RequestOptions] = None + self, + id: str, + *, + include_media_references: typing.Optional[bool] = None, + request_options: typing.Optional[RequestOptions] = None, ) -> AsyncHttpResponse[DatasetItem]: """ Get a dataset item @@ -650,6 +669,9 @@ async def get( ---------- id : str + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -660,6 +682,9 @@ async def get( _response = await self._client_wrapper.httpx_client.request( f"api/public/dataset-items/{jsonable_encoder(id)}", method="GET", + params={ + "includeMediaReferences": include_media_references, + }, request_options=request_options, ) try: @@ -747,6 +772,7 @@ async def list( source_trace_id: typing.Optional[str] = None, source_observation_id: typing.Optional[str] = None, version: typing.Optional[dt.datetime] = None, + include_media_references: typing.Optional[bool] = None, page: typing.Optional[int] = None, limit: typing.Optional[int] = None, request_options: typing.Optional[RequestOptions] = None, @@ -768,6 +794,9 @@ async def list( If provided, returns state of dataset at this timestamp. If not provided, returns the latest version. Requires datasetName to be specified. + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + page : typing.Optional[int] page number, starts at 1 @@ -789,6 +818,7 @@ async def list( "sourceTraceId": source_trace_id, "sourceObservationId": source_observation_id, "version": serialize_datetime(version) if version is not None else None, + "includeMediaReferences": include_media_references, "page": page, "limit": limit, }, diff --git a/langfuse/api/media/client.py b/langfuse/api/media/client.py index b22272b92..648cc72f5 100644 --- a/langfuse/api/media/client.py +++ b/langfuse/api/media/client.py @@ -138,12 +138,12 @@ def patch( def get_upload_url( self, *, - trace_id: str, content_type: MediaContentType, content_length: int, sha256hash: str, - field: str, + trace_id: typing.Optional[str] = OMIT, observation_id: typing.Optional[str] = OMIT, + field: typing.Optional[str] = OMIT, request_options: typing.Optional[RequestOptions] = None, ) -> GetMediaUploadUrlResponse: """ @@ -151,9 +151,6 @@ def get_upload_url( Parameters ---------- - trace_id : str - The trace ID associated with the media record - content_type : MediaContentType content_length : int @@ -162,11 +159,14 @@ def get_upload_url( sha256hash : str The SHA-256 hash of the media record - field : str - The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata` + trace_id : typing.Optional[str] + The trace ID associated with the media record. If null, the media record is not associated with a trace, e.g. when uploading media for dataset items. observation_id : typing.Optional[str] - The observation ID associated with the media record. If the media record is associated directly with a trace, this will be null. + The observation ID associated with the media record. If provided, traceId must be provided as well. If the media record is associated directly with a trace, this will be null. + + field : typing.Optional[str] + The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata`. Required if traceId is provided, ignored otherwise. request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -189,20 +189,18 @@ def get_upload_url( base_url="https://yourhost.com/path/to/api", ) client.media.get_upload_url( - trace_id="traceId", content_type=MediaContentType.IMAGE_PNG, content_length=1, sha256hash="sha256Hash", - field="field", ) """ _response = self._raw_client.get_upload_url( - trace_id=trace_id, content_type=content_type, content_length=content_length, sha256hash=sha256hash, - field=field, + trace_id=trace_id, observation_id=observation_id, + field=field, request_options=request_options, ) return _response.data @@ -349,12 +347,12 @@ async def main() -> None: async def get_upload_url( self, *, - trace_id: str, content_type: MediaContentType, content_length: int, sha256hash: str, - field: str, + trace_id: typing.Optional[str] = OMIT, observation_id: typing.Optional[str] = OMIT, + field: typing.Optional[str] = OMIT, request_options: typing.Optional[RequestOptions] = None, ) -> GetMediaUploadUrlResponse: """ @@ -362,9 +360,6 @@ async def get_upload_url( Parameters ---------- - trace_id : str - The trace ID associated with the media record - content_type : MediaContentType content_length : int @@ -373,11 +368,14 @@ async def get_upload_url( sha256hash : str The SHA-256 hash of the media record - field : str - The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata` + trace_id : typing.Optional[str] + The trace ID associated with the media record. If null, the media record is not associated with a trace, e.g. when uploading media for dataset items. observation_id : typing.Optional[str] - The observation ID associated with the media record. If the media record is associated directly with a trace, this will be null. + The observation ID associated with the media record. If provided, traceId must be provided as well. If the media record is associated directly with a trace, this will be null. + + field : typing.Optional[str] + The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata`. Required if traceId is provided, ignored otherwise. request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -405,23 +403,21 @@ async def get_upload_url( async def main() -> None: await client.media.get_upload_url( - trace_id="traceId", content_type=MediaContentType.IMAGE_PNG, content_length=1, sha256hash="sha256Hash", - field="field", ) asyncio.run(main()) """ _response = await self._raw_client.get_upload_url( - trace_id=trace_id, content_type=content_type, content_length=content_length, sha256hash=sha256hash, - field=field, + trace_id=trace_id, observation_id=observation_id, + field=field, request_options=request_options, ) return _response.data diff --git a/langfuse/api/media/raw_client.py b/langfuse/api/media/raw_client.py index 4cc619770..9f4fa3d81 100644 --- a/langfuse/api/media/raw_client.py +++ b/langfuse/api/media/raw_client.py @@ -251,12 +251,12 @@ def patch( def get_upload_url( self, *, - trace_id: str, content_type: MediaContentType, content_length: int, sha256hash: str, - field: str, + trace_id: typing.Optional[str] = OMIT, observation_id: typing.Optional[str] = OMIT, + field: typing.Optional[str] = OMIT, request_options: typing.Optional[RequestOptions] = None, ) -> HttpResponse[GetMediaUploadUrlResponse]: """ @@ -264,9 +264,6 @@ def get_upload_url( Parameters ---------- - trace_id : str - The trace ID associated with the media record - content_type : MediaContentType content_length : int @@ -275,11 +272,14 @@ def get_upload_url( sha256hash : str The SHA-256 hash of the media record - field : str - The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata` + trace_id : typing.Optional[str] + The trace ID associated with the media record. If null, the media record is not associated with a trace, e.g. when uploading media for dataset items. observation_id : typing.Optional[str] - The observation ID associated with the media record. If the media record is associated directly with a trace, this will be null. + The observation ID associated with the media record. If provided, traceId must be provided as well. If the media record is associated directly with a trace, this will be null. + + field : typing.Optional[str] + The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata`. Required if traceId is provided, ignored otherwise. request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -609,12 +609,12 @@ async def patch( async def get_upload_url( self, *, - trace_id: str, content_type: MediaContentType, content_length: int, sha256hash: str, - field: str, + trace_id: typing.Optional[str] = OMIT, observation_id: typing.Optional[str] = OMIT, + field: typing.Optional[str] = OMIT, request_options: typing.Optional[RequestOptions] = None, ) -> AsyncHttpResponse[GetMediaUploadUrlResponse]: """ @@ -622,9 +622,6 @@ async def get_upload_url( Parameters ---------- - trace_id : str - The trace ID associated with the media record - content_type : MediaContentType content_length : int @@ -633,11 +630,14 @@ async def get_upload_url( sha256hash : str The SHA-256 hash of the media record - field : str - The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata` + trace_id : typing.Optional[str] + The trace ID associated with the media record. If null, the media record is not associated with a trace, e.g. when uploading media for dataset items. observation_id : typing.Optional[str] - The observation ID associated with the media record. If the media record is associated directly with a trace, this will be null. + The observation ID associated with the media record. If provided, traceId must be provided as well. If the media record is associated directly with a trace, this will be null. + + field : typing.Optional[str] + The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata`. Required if traceId is provided, ignored otherwise. request_options : typing.Optional[RequestOptions] Request-specific configuration. diff --git a/langfuse/api/media/types/get_media_upload_url_request.py b/langfuse/api/media/types/get_media_upload_url_request.py index 99f055847..7222fbdba 100644 --- a/langfuse/api/media/types/get_media_upload_url_request.py +++ b/langfuse/api/media/types/get_media_upload_url_request.py @@ -10,18 +10,18 @@ class GetMediaUploadUrlRequest(UniversalBaseModel): - trace_id: typing_extensions.Annotated[str, FieldMetadata(alias="traceId")] = ( - pydantic.Field() - ) + trace_id: typing_extensions.Annotated[ + typing.Optional[str], FieldMetadata(alias="traceId") + ] = pydantic.Field(default=None) """ - The trace ID associated with the media record + The trace ID associated with the media record. If null, the media record is not associated with a trace, e.g. when uploading media for dataset items. """ observation_id: typing_extensions.Annotated[ typing.Optional[str], FieldMetadata(alias="observationId") ] = pydantic.Field(default=None) """ - The observation ID associated with the media record. If the media record is associated directly with a trace, this will be null. + The observation ID associated with the media record. If provided, traceId must be provided as well. If the media record is associated directly with a trace, this will be null. """ content_type: typing_extensions.Annotated[ @@ -41,9 +41,9 @@ class GetMediaUploadUrlRequest(UniversalBaseModel): The SHA-256 hash of the media record """ - field: str = pydantic.Field() + field: typing.Optional[str] = pydantic.Field(default=None) """ - The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata` + The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata`. Required if traceId is provided, ignored otherwise. """ model_config: typing.ClassVar[pydantic.ConfigDict] = pydantic.ConfigDict( From cd791b64eee1677fdddca4070cdd289f4958969a Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Mon, 15 Jun 2026 17:35:27 +0200 Subject: [PATCH 02/10] feat(datasets): support media references Adds dataset item media upload and optional get_dataset media hydration. --- langfuse/__init__.py | 3 + langfuse/_client/client.py | 126 ++++++++++++++++++- langfuse/_task_manager/media_manager.py | 29 +++++ langfuse/_task_manager/media_upload_queue.py | 4 +- langfuse/media.py | 45 +++++++ pyproject.toml | 1 + tests/e2e/test_datasets.py | 56 +++++++++ tests/unit/test_datasets.py | 116 +++++++++++++++++ tests/unit/test_media_manager.py | 30 +++++ uv.lock | 13 +- 10 files changed, 414 insertions(+), 9 deletions(-) create mode 100644 tests/unit/test_datasets.py diff --git a/langfuse/__init__.py b/langfuse/__init__.py index 08d8325cf..c8144a0e4 100644 --- a/langfuse/__init__.py +++ b/langfuse/__init__.py @@ -29,6 +29,7 @@ LangfuseTool, ) from ._version import __version__ +from .media import LangfuseMedia, LangfuseMediaReference from .span_filter import ( KNOWN_LLM_INSTRUMENTATION_SCOPE_PREFIXES, is_default_export_span, @@ -41,6 +42,8 @@ __all__ = [ "Langfuse", + "LangfuseMedia", + "LangfuseMediaReference", "get_client", "observe", "propagate_attributes", diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 2f1c8d783..a989b7ac1 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -27,6 +27,7 @@ import backoff import httpx +from jsonpath_ng.ext import parse as parse_jsonpath # type: ignore[import-untyped] from opentelemetry import context as otel_context_api from opentelemetry import trace as otel_trace_api from opentelemetry.sdk.trace import ReadableSpan, TracerProvider @@ -127,7 +128,7 @@ _run_task, ) from langfuse.logger import langfuse_logger -from langfuse.media import LangfuseMedia +from langfuse.media import LangfuseMedia, LangfuseMediaReference from langfuse.model import ( ChatMessageDict, ChatMessageWithPlaceholdersDict, @@ -2262,15 +2263,17 @@ def get_dataset( *, fetch_items_page_size: Optional[int] = 50, version: Optional[datetime] = None, + resolve_media_references: bool = False, ) -> "DatasetClient": """Fetch a dataset by its name. Args: - name (str): The name of the dataset to fetch. - fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50. - version (Optional[datetime]): Retrieve dataset items as they existed at this specific point in time (UTC). + name: The name of the dataset to fetch. + fetch_items_page_size: All items of the dataset will be fetched in chunks of this size. Defaults to 50. + version: Retrieve dataset items as they existed at this specific point in time (UTC). If provided, returns the state of items at the specified UTC timestamp. If not provided, returns the latest version. Must be a timezone-aware datetime object in UTC. + resolve_media_references: If true, resolve media reference strings in dataset items to LangfuseMediaReference objects. Returns: DatasetClient: The dataset with the given name. @@ -2279,7 +2282,7 @@ def get_dataset( langfuse_logger.debug(f"Getting datasets {name}") dataset = self.api.datasets.get(dataset_name=self._url_encode(name)) - dataset_items = [] + dataset_items: List[DatasetItem] = [] page = 1 while True: @@ -2288,8 +2291,16 @@ def get_dataset( page=page, limit=fetch_items_page_size, version=version, + include_media_references=resolve_media_references or None, + ) + dataset_items.extend( + [ + self._hydrate_dataset_item_media_references(item) + for item in new_items.data + ] + if resolve_media_references + else new_items.data ) - dataset_items.extend(new_items.data) if new_items.meta.total_pages <= page: break @@ -3295,6 +3306,17 @@ def create_dataset_item( try: langfuse_logger.debug(f"Creating dataset item for dataset {dataset_name}") + uploaded_media_ids: set[str] = set() + input = self._process_dataset_item_media( + data=input, uploaded_media_ids=uploaded_media_ids + ) + expected_output = self._process_dataset_item_media( + data=expected_output, uploaded_media_ids=uploaded_media_ids + ) + metadata = self._process_dataset_item_media( + data=metadata, uploaded_media_ids=uploaded_media_ids + ) + result = self.api.dataset_items.create( dataset_name=dataset_name, input=input, @@ -3311,6 +3333,98 @@ def create_dataset_item( handle_fern_exception(e) raise e + def _process_dataset_item_media( + self, *, data: Any, uploaded_media_ids: set[str] + ) -> Any: + if self._resources is None: + return data + + for match in parse_jsonpath("$..`this`").find(data): + if not isinstance(match.value, LangfuseMedia): + continue + + data = match.full_path.update( + data, + self._upload_dataset_item_media( + media=match.value, uploaded_media_ids=uploaded_media_ids + ), + ) + + return data + + def _upload_dataset_item_media( + self, *, media: LangfuseMedia, uploaded_media_ids: set[str] + ) -> str: + reference_string = media._reference_string + media_id = media._media_id + + if reference_string is None or media_id is None: + raise ValueError("Cannot create dataset item with invalid LangfuseMedia.") + + if media_id not in uploaded_media_ids: + assert self._resources is not None + self._resources._media_manager._upload_media_sync(media=media) + uploaded_media_ids.add(media_id) + + return reference_string + + def _hydrate_dataset_item_media_references(self, item: DatasetItem) -> DatasetItem: + media_references = item.media_references or [] + if not media_references: + return item + + hydrated_fields = { + "input": item.input, + "expected_output": item.expected_output, + "metadata": item.metadata, + } + + for media_reference in media_references: + media = media_reference.media + if media is None: + continue + + field = media_reference.field.value + if field not in hydrated_fields: + continue + + replacement = LangfuseMediaReference( + media_id=media.media_id, + content_type=media.content_type, + url=media.url, + url_expiry=media.url_expiry, + content_length=media.content_length, + ) + hydrated_fields[field] = self._replace_json_path_value( + value=hydrated_fields[field], + json_path=media_reference.json_path, + replacement=replacement, + ) + + return item.model_copy( + update={ + "input": hydrated_fields["input"], + "expected_output": hydrated_fields["expected_output"], + "metadata": hydrated_fields["metadata"], + } + ) + + def _replace_json_path_value( + self, *, value: Any, json_path: str, replacement: LangfuseMediaReference + ) -> Any: + if json_path == "$": + return replacement + + try: + parse_jsonpath(json_path).update(value, replacement) + except Exception as e: + langfuse_logger.debug( + f"Failed to hydrate dataset media reference at JSONPath {json_path}", + exc_info=e, + ) + + return value + def resolve_media_references( self, *, diff --git a/langfuse/_task_manager/media_manager.py b/langfuse/_task_manager/media_manager.py index 7a7123798..9f71c1bef 100644 --- a/langfuse/_task_manager/media_manager.py +++ b/langfuse/_task_manager/media_manager.py @@ -230,6 +230,35 @@ def _process_media( f"Media processing error: Failed to process media_id={media._media_id} for trace_id={trace_id}. Error: {str(e)}" ) + def _upload_media_sync(self, *, media: LangfuseMedia) -> None: + if not self._enabled: + raise ValueError("Cannot upload LangfuseMedia while media upload is disabled.") + + if ( + media._content_length is None + or media._content_type is None + or media._content_sha256_hash is None + or media._content_bytes is None + ): + return + + if media._media_id is None: + logger.error("Media ID is None. Skipping upload.") + return + + upload_media_job = UploadMediaJob( + media_id=media._media_id, + content_bytes=media._content_bytes, + content_type=media._content_type, + content_length=media._content_length, + content_sha256_hash=media._content_sha256_hash, + trace_id=None, + observation_id=None, + field=None, + ) + + self._process_upload_media_job(data=upload_media_job) + def _process_upload_media_job( self, *, diff --git a/langfuse/_task_manager/media_upload_queue.py b/langfuse/_task_manager/media_upload_queue.py index e4cd8ebee..aac852105 100644 --- a/langfuse/_task_manager/media_upload_queue.py +++ b/langfuse/_task_manager/media_upload_queue.py @@ -7,6 +7,6 @@ class UploadMediaJob(TypedDict): content_length: int content_bytes: bytes content_sha256_hash: str - trace_id: str + trace_id: Optional[str] observation_id: Optional[str] - field: str + field: Optional[str] diff --git a/langfuse/media.py b/langfuse/media.py index 53940382c..b20c6e52d 100644 --- a/langfuse/media.py +++ b/langfuse/media.py @@ -4,6 +4,8 @@ import hashlib import os import re +from dataclasses import dataclass +from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, TypeVar, cast import httpx @@ -18,6 +20,49 @@ T = TypeVar("T") +@dataclass(frozen=True) +class LangfuseMediaReference: + """Resolved reference to media stored in Langfuse.""" + + media_id: str + content_type: str + url: str + url_expiry: Optional[str] = None + content_length: Optional[int] = None + + def url_is_expired(self) -> bool: + """Return whether the signed URL is already expired.""" + if self.url_expiry is None: + return False + + expiry = self.url_expiry.replace("Z", "+00:00") + + try: + expiry_datetime = datetime.fromisoformat(expiry) + except ValueError: + return False + + if expiry_datetime.tzinfo is None: + expiry_datetime = expiry_datetime.replace(tzinfo=timezone.utc) + + return expiry_datetime <= datetime.now(timezone.utc) + + def fetch_bytes(self) -> bytes: + """Fetch the media content from the signed URL.""" + response = httpx.get(self.url) + response.raise_for_status() + + return response.content + + def fetch_base64(self) -> str: + """Fetch media and return raw base64 without a data URI prefix.""" + return base64.b64encode(self.fetch_bytes()).decode() + + def fetch_data_uri(self) -> str: + """Fetch media and return it as a data URI.""" + return f"data:{self.content_type};base64,{self.fetch_base64()}" + + class LangfuseMedia: """A class for wrapping media objects for upload to Langfuse. diff --git a/pyproject.toml b/pyproject.toml index ceb7ea368..d210149f1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "opentelemetry-api>=1.33.1,<2", "opentelemetry-sdk>=1.33.1,<2", "opentelemetry-exporter-otlp-proto-http>=1.33.1,<2", + "jsonpath-ng>=1.8.0", ] [dependency-groups] diff --git a/tests/e2e/test_datasets.py b/tests/e2e/test_datasets.py index 8d575180a..3ba58e03d 100644 --- a/tests/e2e/test_datasets.py +++ b/tests/e2e/test_datasets.py @@ -1,10 +1,15 @@ import time from datetime import timedelta +from pathlib import Path from langfuse import Langfuse from langfuse.api import DatasetStatus +from langfuse.media import LangfuseMedia, LangfuseMediaReference from tests.support.utils import create_uuid, wait_for_result +SAMPLE_IMAGE_PATH = Path("static/puton.jpg") +SAMPLE_IMAGE_CONTENT_TYPE = "image/jpeg" + def test_create_and_get_dataset(): langfuse = Langfuse(debug=False) @@ -69,6 +74,57 @@ def test_create_dataset_item(): assert dataset.items[0].dataset_name == name +def test_create_and_get_dataset_item_with_media(): + langfuse = Langfuse(debug=False) + name = create_uuid() + langfuse.create_dataset(name=name) + sample_image_bytes = SAMPLE_IMAGE_PATH.read_bytes() + + created_item = langfuse.create_dataset_item( + dataset_name=name, + input={ + "question": "What is in this image?", + "image": LangfuseMedia( + file_path=str(SAMPLE_IMAGE_PATH), + content_type=SAMPLE_IMAGE_CONTENT_TYPE, + ), + }, + expected_output={ + "reference": LangfuseMedia( + file_path=str(SAMPLE_IMAGE_PATH), + content_type=SAMPLE_IMAGE_CONTENT_TYPE, + ), + }, + metadata={ + "thumbnail": LangfuseMedia( + file_path=str(SAMPLE_IMAGE_PATH), + content_type=SAMPLE_IMAGE_CONTENT_TYPE, + ) + }, + ) + + assert created_item.input["image"].startswith("@@@langfuseMedia:") + + raw_dataset = langfuse.get_dataset(name) + assert isinstance(raw_dataset.items[0].input["image"], str) + + resolved_dataset = wait_for_result( + lambda: langfuse.get_dataset(name, resolve_media_references=True), + is_result_ready=lambda dataset: isinstance( + dataset.items[0].input["image"], LangfuseMediaReference + ), + ) + + resolved_item = resolved_dataset.items[0] + assert isinstance(resolved_item.input["image"], LangfuseMediaReference) + assert isinstance( + resolved_item.expected_output["reference"], LangfuseMediaReference + ) + assert isinstance(resolved_item.metadata["thumbnail"], LangfuseMediaReference) + assert resolved_item.input["image"].content_type == SAMPLE_IMAGE_CONTENT_TYPE + assert resolved_item.input["image"].fetch_bytes() == sample_image_bytes + + def test_get_all_items(): langfuse = Langfuse(debug=False) name = create_uuid() diff --git a/tests/unit/test_datasets.py b/tests/unit/test_datasets.py new file mode 100644 index 000000000..1819fb8f4 --- /dev/null +++ b/tests/unit/test_datasets.py @@ -0,0 +1,116 @@ +from datetime import datetime, timezone +from types import SimpleNamespace +from unittest.mock import Mock + +from langfuse._client.client import Langfuse +from langfuse.api import ( + DatasetItem, + DatasetItemMediaReference, + DatasetItemMediaReferenceField, + DatasetItemMediaReferenceMedia, + DatasetStatus, +) +from langfuse.media import LangfuseMedia, LangfuseMediaReference + + +def test_hydrate_dataset_item_media_references_replaces_matching_fields(): + reference_string = "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + item = DatasetItem( + id="item-id", + status=DatasetStatus.ACTIVE, + input={ + "image": reference_string, + "duplicate": reference_string, + "text": "keep", + }, + expected_output=[reference_string], + metadata={"nested": {"image": reference_string}}, + dataset_id="dataset-id", + dataset_name="dataset-name", + created_at=datetime.now(timezone.utc), + updated_at=datetime.now(timezone.utc), + media_references=[ + DatasetItemMediaReference( + field=DatasetItemMediaReferenceField.INPUT, + reference_string=reference_string, + json_path="$['image']", + media=DatasetItemMediaReferenceMedia( + media_id="media-id", + content_type="image/png", + content_length=7, + url="https://example.com/image.png", + url_expiry="2026-06-15T12:00:00.000Z", + ), + ), + DatasetItemMediaReference( + field=DatasetItemMediaReferenceField.EXPECTED_OUTPUT, + reference_string=reference_string, + json_path="$[0]", + media=DatasetItemMediaReferenceMedia( + media_id="media-id", + content_type="image/png", + content_length=7, + url="https://example.com/image.png", + url_expiry="2026-06-15T12:00:00.000Z", + ), + ), + DatasetItemMediaReference( + field=DatasetItemMediaReferenceField.METADATA, + reference_string=reference_string, + json_path="$['nested']['image']", + media=DatasetItemMediaReferenceMedia( + media_id="media-id", + content_type="image/png", + content_length=7, + url="https://example.com/image.png", + url_expiry="2026-06-15T12:00:00.000Z", + ), + ), + ], + ) + + client = object.__new__(Langfuse) + + hydrated = client._hydrate_dataset_item_media_references(item) + + assert hydrated.input["text"] == "keep" + assert isinstance(hydrated.input["image"], LangfuseMediaReference) + assert hydrated.input["duplicate"] == reference_string + assert isinstance(hydrated.expected_output[0], LangfuseMediaReference) + assert isinstance(hydrated.metadata["nested"]["image"], LangfuseMediaReference) + assert hydrated.input["image"].media_id == "media-id" + + +def test_create_dataset_item_processes_media_before_api_call(): + media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") + root_media = LangfuseMedia(content_bytes=b"root", content_type="image/png") + + media_manager = Mock() + dataset_items_api = Mock() + dataset_items_api.create.return_value = "created-item" + + client = object.__new__(Langfuse) + client._resources = SimpleNamespace(_media_manager=media_manager) + client.api = SimpleNamespace(dataset_items=dataset_items_api) + + result = client.create_dataset_item( + dataset_name="dataset", + input={"image": media}, + expected_output=root_media, + metadata={"items": [media], "keep": "value"}, + ) + + assert result == "created-item" + media_manager._upload_media_sync.assert_any_call(media=media) + media_manager._upload_media_sync.assert_any_call(media=root_media) + assert media_manager._upload_media_sync.call_count == 2 + dataset_items_api.create.assert_called_once_with( + dataset_name="dataset", + input={"image": media._reference_string}, + expected_output=root_media._reference_string, + metadata={"items": [media._reference_string], "keep": "value"}, + source_trace_id=None, + source_observation_id=None, + status=None, + id=None, + ) diff --git a/tests/unit/test_media_manager.py b/tests/unit/test_media_manager.py index 68684fac4..43b57aa98 100644 --- a/tests/unit/test_media_manager.py +++ b/tests/unit/test_media_manager.py @@ -142,6 +142,36 @@ def test_find_and_process_media_valid_base64_uri_is_processed(): assert not queue.empty() +def test_upload_media_sync_uploads_without_trace_context(): + media_api = Mock() + media_api.get_upload_url.return_value = SimpleNamespace( + upload_url="https://example.com/upload", + media_id=None, + ) + media_api.patch.return_value = None + + httpx_client = Mock() + httpx_client.put.return_value = _upload_response(200, "ok") + + manager = MediaManager( + api_client=SimpleNamespace(media=media_api), + httpx_client=httpx_client, + media_upload_queue=Queue(), + ) + + media = LangfuseMedia(content_bytes=b"payload", content_type="image/jpeg") + media_api.get_upload_url.return_value.media_id = media._media_id + + manager._upload_media_sync(media=media) + + media_api.get_upload_url.assert_called_once() + assert media_api.get_upload_url.call_args.kwargs["trace_id"] is None + assert media_api.get_upload_url.call_args.kwargs["observation_id"] is None + assert media_api.get_upload_url.call_args.kwargs["field"] is None + httpx_client.put.assert_called_once() + media_api.patch.assert_called_once() + + def test_find_and_process_media_data_uri_without_comma_passes_through(): queue = Queue() manager = MediaManager( diff --git a/uv.lock b/uv.lock index 7c321118f..9356aef55 100644 --- a/uv.lock +++ b/uv.lock @@ -3,7 +3,7 @@ revision = 3 requires-python = ">=3.10, <4.0" [options] -exclude-newer = "2026-06-04T15:30:59.411452714Z" +exclude-newer = "0001-01-01T00:00:00Z" # This has no effect and is included for backwards compatibility when using relative exclude-newer values. exclude-newer-span = "P7D" [[package]] @@ -469,6 +469,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/73/07/02e16ed01e04a374e644b575638ec7987ae846d25ad97bcc9945a3ee4b0e/jsonpatch-1.33-py2.py3-none-any.whl", hash = "sha256:0ae28c0cd062bbd8b8ecc26d7d164fbbea9652a1a3693f3b956c1eae5145dade", size = 12898, upload-time = "2023-06-16T21:01:28.466Z" }, ] +[[package]] +name = "jsonpath-ng" +version = "1.8.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/32/58/250751940d75c8019659e15482d548a4aa3b6ce122c515102a4bfdac50e3/jsonpath_ng-1.8.0.tar.gz", hash = "sha256:54252968134b5e549ea5b872f1df1168bd7defe1a52fed5a358c194e1943ddc3", size = 74513, upload-time = "2026-02-24T14:42:06.182Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/03/99/33c7d78a3fb70d545fd5411ac67a651c81602cc09c9cf0df383733f068c5/jsonpath_ng-1.8.0-py3-none-any.whl", hash = "sha256:b8dde192f8af58d646fc031fac9c99fe4d00326afc4148f1f043c601a8cfe138", size = 67844, upload-time = "2026-02-28T00:53:19.637Z" }, +] + [[package]] name = "jsonpointer" version = "3.1.0" @@ -559,6 +568,7 @@ source = { editable = "." } dependencies = [ { name = "backoff" }, { name = "httpx" }, + { name = "jsonpath-ng" }, { name = "opentelemetry-api" }, { name = "opentelemetry-exporter-otlp-proto-http" }, { name = "opentelemetry-sdk" }, @@ -593,6 +603,7 @@ docs = [ requires-dist = [ { name = "backoff", specifier = ">=1.10.0" }, { name = "httpx", specifier = ">=0.15.4,<1.0" }, + { name = "jsonpath-ng", specifier = ">=1.8.0" }, { name = "opentelemetry-api", specifier = ">=1.33.1,<2" }, { name = "opentelemetry-exporter-otlp-proto-http", specifier = ">=1.33.1,<2" }, { name = "opentelemetry-sdk", specifier = ">=1.33.1,<2" }, From a9a4a965f2fe08b966d13ccf638a8099d0cb0ad0 Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Mon, 15 Jun 2026 18:19:17 +0200 Subject: [PATCH 03/10] fix(datasets): address media review feedback --- langfuse/_client/client.py | 38 ++++++++++++++++++------- langfuse/_task_manager/media_manager.py | 5 ++-- langfuse/media.py | 12 ++++---- tests/unit/test_datasets.py | 8 ++++-- tests/unit/test_media.py | 19 ++++++++++++- tests/unit/test_media_manager.py | 12 ++++++++ 6 files changed, 71 insertions(+), 23 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index a989b7ac1..89a801b2d 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -3339,18 +3339,34 @@ def _process_dataset_item_media( if self._resources is None: return data - for match in parse_jsonpath("$..`this`").find(data): - if not isinstance(match.value, LangfuseMedia): - continue + seen = set() + max_levels = 10 + + def _process_data_recursively(data: Any, level: int) -> Any: + # Avoid jsonpath-ng here: create_dataset_item should not fail under + # python -OO for users who are not resolving media references. + if isinstance(data, LangfuseMedia): + return self._upload_dataset_item_media( + media=data, uploaded_media_ids=uploaded_media_ids + ) - data = match.full_path.update( - data, - self._upload_dataset_item_media( - media=match.value, uploaded_media_ids=uploaded_media_ids - ), - ) + if not isinstance(data, (list, dict)): + return data + + if id(data) in seen or level > max_levels: + return data + + seen.add(id(data)) + + if isinstance(data, list): + return [_process_data_recursively(item, level + 1) for item in data] + + return { + key: _process_data_recursively(value, level + 1) + for key, value in data.items() + } - return data + return _process_data_recursively(data, 1) def _upload_dataset_item_media( self, *, media: LangfuseMedia, uploaded_media_ids: set[str] @@ -3416,7 +3432,7 @@ def _replace_json_path_value( return replacement try: - parse_jsonpath(json_path).update(value, replacement) + value = parse_jsonpath(json_path).update(value, replacement) except Exception as e: langfuse_logger.debug( f"Failed to hydrate dataset media reference at JSONPath {json_path}", diff --git a/langfuse/_task_manager/media_manager.py b/langfuse/_task_manager/media_manager.py index 9f71c1bef..0122176e0 100644 --- a/langfuse/_task_manager/media_manager.py +++ b/langfuse/_task_manager/media_manager.py @@ -240,11 +240,10 @@ def _upload_media_sync(self, *, media: LangfuseMedia) -> None: or media._content_sha256_hash is None or media._content_bytes is None ): - return + raise ValueError("Cannot upload invalid LangfuseMedia.") if media._media_id is None: - logger.error("Media ID is None. Skipping upload.") - return + raise ValueError("Cannot upload LangfuseMedia without media ID.") upload_media_job = UploadMediaJob( media_id=media._media_id, diff --git a/langfuse/media.py b/langfuse/media.py index b20c6e52d..13bf6ee7c 100644 --- a/langfuse/media.py +++ b/langfuse/media.py @@ -47,20 +47,20 @@ def url_is_expired(self) -> bool: return expiry_datetime <= datetime.now(timezone.utc) - def fetch_bytes(self) -> bytes: + def fetch_bytes(self, *, timeout: float = 30.0) -> bytes: """Fetch the media content from the signed URL.""" - response = httpx.get(self.url) + response = httpx.get(self.url, timeout=timeout) response.raise_for_status() return response.content - def fetch_base64(self) -> str: + def fetch_base64(self, *, timeout: float = 30.0) -> str: """Fetch media and return raw base64 without a data URI prefix.""" - return base64.b64encode(self.fetch_bytes()).decode() + return base64.b64encode(self.fetch_bytes(timeout=timeout)).decode() - def fetch_data_uri(self) -> str: + def fetch_data_uri(self, *, timeout: float = 30.0) -> str: """Fetch media and return it as a data URI.""" - return f"data:{self.content_type};base64,{self.fetch_base64()}" + return f"data:{self.content_type};base64,{self.fetch_base64(timeout=timeout)}" class LangfuseMedia: diff --git a/tests/unit/test_datasets.py b/tests/unit/test_datasets.py index 1819fb8f4..8ae14c25d 100644 --- a/tests/unit/test_datasets.py +++ b/tests/unit/test_datasets.py @@ -92,15 +92,19 @@ def test_create_dataset_item_processes_media_before_api_call(): client = object.__new__(Langfuse) client._resources = SimpleNamespace(_media_manager=media_manager) client.api = SimpleNamespace(dataset_items=dataset_items_api) + input_data = {"image": media} + metadata = {"items": [media], "keep": "value"} result = client.create_dataset_item( dataset_name="dataset", - input={"image": media}, + input=input_data, expected_output=root_media, - metadata={"items": [media], "keep": "value"}, + metadata=metadata, ) assert result == "created-item" + assert input_data == {"image": media} + assert metadata == {"items": [media], "keep": "value"} media_manager._upload_media_sync.assert_any_call(media=media) media_manager._upload_media_sync.assert_any_call(media=root_media) assert media_manager._upload_media_sync.call_count == 2 diff --git a/tests/unit/test_media.py b/tests/unit/test_media.py index 63df03920..9239ab52b 100644 --- a/tests/unit/test_media.py +++ b/tests/unit/test_media.py @@ -4,7 +4,7 @@ import pytest -from langfuse.media import LangfuseMedia +from langfuse.media import LangfuseMedia, LangfuseMediaReference # Test data SAMPLE_JPEG_BYTES = b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01\x00H\x00H\x00\x00" @@ -107,6 +107,23 @@ def test_nonexistent_file(): assert media._content_type is None +def test_media_reference_fetch_uses_timeout(monkeypatch): + response = Mock() + response.content = b"test-bytes" + response.raise_for_status.return_value = None + httpx_get = Mock(return_value=response) + monkeypatch.setattr("langfuse.media.httpx.get", httpx_get) + + reference = LangfuseMediaReference( + media_id="media-id", + content_type="image/jpeg", + url="https://example.com/test.jpg", + ) + + assert reference.fetch_bytes(timeout=12.5) == b"test-bytes" + httpx_get.assert_called_once_with("https://example.com/test.jpg", timeout=12.5) + + def test_resolve_media_references_uses_configured_httpx_client(): reference_string = "@@@langfuseMedia:type=image/jpeg|id=test-id|source=bytes@@@" fetch_timeout_seconds = 7 diff --git a/tests/unit/test_media_manager.py b/tests/unit/test_media_manager.py index 43b57aa98..f856f1592 100644 --- a/tests/unit/test_media_manager.py +++ b/tests/unit/test_media_manager.py @@ -172,6 +172,18 @@ def test_upload_media_sync_uploads_without_trace_context(): media_api.patch.assert_called_once() +def test_upload_media_sync_rejects_invalid_media(): + manager = MediaManager( + api_client=SimpleNamespace(media=Mock()), + httpx_client=Mock(), + media_upload_queue=Queue(), + ) + media = LangfuseMedia() + + with pytest.raises(ValueError, match="Cannot upload invalid LangfuseMedia"): + manager._upload_media_sync(media=media) + + def test_find_and_process_media_data_uri_without_comma_passes_through(): queue = Queue() manager = MediaManager( From 2c6718a327881f7b8e39556918f776f7d6663bd7 Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Tue, 16 Jun 2026 11:01:52 +0200 Subject: [PATCH 04/10] fix(datasets): refine media review fixes --- langfuse/_client/client.py | 32 +++++--- langfuse/_client/resource_manager.py | 10 +++ langfuse/media.py | 9 ++- tests/unit/test_datasets.py | 106 +++++++++++++++++++++++++++ tests/unit/test_media.py | 17 ++++- 5 files changed, 158 insertions(+), 16 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 89a801b2d..ef18ae09e 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -3339,12 +3339,13 @@ def _process_dataset_item_media( if self._resources is None: return data - seen = set() max_levels = 10 - def _process_data_recursively(data: Any, level: int) -> Any: - # Avoid jsonpath-ng here: create_dataset_item should not fail under - # python -OO for users who are not resolving media references. + def _process_data_recursively( + data: Any, level: int, ancestor_container_ids: set[int] + ) -> Any: + # Avoid jsonpath-ng here: dataset writes should keep working + # under python -OO where parser docstrings may be stripped. if isinstance(data, LangfuseMedia): return self._upload_dataset_item_media( media=data, uploaded_media_ids=uploaded_media_ids @@ -3353,20 +3354,30 @@ def _process_data_recursively(data: Any, level: int) -> Any: if not isinstance(data, (list, dict)): return data - if id(data) in seen or level > max_levels: + # Container ids only protect against recursive cycles; media upload + # dedupe is handled by uploaded_media_ids. + data_id = id(data) + if data_id in ancestor_container_ids or level > max_levels: return data - seen.add(id(data)) + next_ancestor_container_ids = ancestor_container_ids | {data_id} if isinstance(data, list): - return [_process_data_recursively(item, level + 1) for item in data] + return [ + _process_data_recursively( + item, level + 1, next_ancestor_container_ids + ) + for item in data + ] return { - key: _process_data_recursively(value, level + 1) + key: _process_data_recursively( + value, level + 1, next_ancestor_container_ids + ) for key, value in data.items() } - return _process_data_recursively(data, 1) + return _process_data_recursively(data, 1, set()) def _upload_dataset_item_media( self, *, media: LangfuseMedia, uploaded_media_ids: set[str] @@ -3428,9 +3439,6 @@ def _hydrate_dataset_item_media_references(self, item: DatasetItem) -> DatasetIt def _replace_json_path_value( self, *, value: Any, json_path: str, replacement: LangfuseMediaReference ) -> Any: - if json_path == "$": - return replacement - try: value = parse_jsonpath(json_path).update(value, replacement) except Exception as e: diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 2d42f6ce1..5dfcf29ba 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -79,6 +79,16 @@ class LangfuseResourceManager: _instances: Dict[str, "LangfuseResourceManager"] = {} _lock = threading.RLock() + @classmethod + def get_singleton_httpx_client(cls) -> Optional[httpx.Client]: + with cls._lock: + instances = list(cls._instances.values()) + + if len(instances) != 1: + return None + + return instances[0].httpx_client + def __new__( cls, *, diff --git a/langfuse/media.py b/langfuse/media.py index 13bf6ee7c..8c90612dd 100644 --- a/langfuse/media.py +++ b/langfuse/media.py @@ -49,7 +49,14 @@ def url_is_expired(self) -> bool: def fetch_bytes(self, *, timeout: float = 30.0) -> bytes: """Fetch the media content from the signed URL.""" - response = httpx.get(self.url, timeout=timeout) + from langfuse._client.resource_manager import LangfuseResourceManager + + httpx_client = LangfuseResourceManager.get_singleton_httpx_client() + response = ( + httpx_client.get(self.url, timeout=timeout) + if httpx_client is not None + else httpx.get(self.url, timeout=timeout) + ) response.raise_for_status() return response.content diff --git a/tests/unit/test_datasets.py b/tests/unit/test_datasets.py index 8ae14c25d..fe0bcf75a 100644 --- a/tests/unit/test_datasets.py +++ b/tests/unit/test_datasets.py @@ -2,6 +2,8 @@ from types import SimpleNamespace from unittest.mock import Mock +import pytest + from langfuse._client.client import Langfuse from langfuse.api import ( DatasetItem, @@ -81,6 +83,85 @@ def test_hydrate_dataset_item_media_references_replaces_matching_fields(): assert hydrated.input["image"].media_id == "media-id" +@pytest.mark.parametrize( + ("field", "field_value", "json_path", "assert_resolved"), + [ + ( + DatasetItemMediaReferenceField.INPUT, + "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@", + "$", + lambda item: isinstance(item.input, LangfuseMediaReference), + ), + ( + DatasetItemMediaReferenceField.INPUT, + { + "image": "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + }, + "$['image']", + lambda item: isinstance(item.input["image"], LangfuseMediaReference), + ), + ( + DatasetItemMediaReferenceField.EXPECTED_OUTPUT, + ["@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@"], + "$[0]", + lambda item: isinstance( + item.expected_output[0], LangfuseMediaReference + ), + ), + ( + DatasetItemMediaReferenceField.METADATA, + { + "image'key": "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + }, + r"$['image\'key']", + lambda item: isinstance( + item.metadata["image'key"], LangfuseMediaReference + ), + ), + ], +) +def test_hydrate_dataset_item_media_references_supports_json_path_cases( + field, + field_value, + json_path, + assert_resolved, +): + reference_string = "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + item = DatasetItem( + id="item-id", + status=DatasetStatus.ACTIVE, + input=field_value if field == DatasetItemMediaReferenceField.INPUT else None, + expected_output=field_value + if field == DatasetItemMediaReferenceField.EXPECTED_OUTPUT + else None, + metadata=field_value if field == DatasetItemMediaReferenceField.METADATA else None, + dataset_id="dataset-id", + dataset_name="dataset-name", + created_at=datetime.now(timezone.utc), + updated_at=datetime.now(timezone.utc), + media_references=[ + DatasetItemMediaReference( + field=field, + reference_string=reference_string, + json_path=json_path, + media=DatasetItemMediaReferenceMedia( + media_id="media-id", + content_type="image/png", + content_length=7, + url="https://example.com/image.png", + url_expiry="2026-06-15T12:00:00.000Z", + ), + ) + ], + ) + + client = object.__new__(Langfuse) + + hydrated = client._hydrate_dataset_item_media_references(item) + + assert assert_resolved(hydrated) + + def test_create_dataset_item_processes_media_before_api_call(): media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") root_media = LangfuseMedia(content_bytes=b"root", content_type="image/png") @@ -118,3 +199,28 @@ def test_create_dataset_item_processes_media_before_api_call(): status=None, id=None, ) + + +def test_create_dataset_item_processes_shared_media_subtrees(): + media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") + shared = {"image": media} + + media_manager = Mock() + dataset_items_api = Mock() + dataset_items_api.create.return_value = "created-item" + + client = object.__new__(Langfuse) + client._resources = SimpleNamespace(_media_manager=media_manager) + client.api = SimpleNamespace(dataset_items=dataset_items_api) + + client.create_dataset_item( + dataset_name="dataset", + input={"a": shared, "b": shared}, + ) + + assert shared == {"image": media} + media_manager._upload_media_sync.assert_called_once_with(media=media) + assert dataset_items_api.create.call_args.kwargs["input"] == { + "a": {"image": media._reference_string}, + "b": {"image": media._reference_string}, + } diff --git a/tests/unit/test_media.py b/tests/unit/test_media.py index 9239ab52b..4f42d506b 100644 --- a/tests/unit/test_media.py +++ b/tests/unit/test_media.py @@ -4,6 +4,7 @@ import pytest +from langfuse._client.resource_manager import LangfuseResourceManager from langfuse.media import LangfuseMedia, LangfuseMediaReference # Test data @@ -107,12 +108,19 @@ def test_nonexistent_file(): assert media._content_type is None -def test_media_reference_fetch_uses_timeout(monkeypatch): +def test_media_reference_fetch_uses_configured_httpx_client(monkeypatch): response = Mock() response.content = b"test-bytes" response.raise_for_status.return_value = None - httpx_get = Mock(return_value=response) + configured_httpx_client = Mock() + configured_httpx_client.get.return_value = response + httpx_get = Mock() monkeypatch.setattr("langfuse.media.httpx.get", httpx_get) + monkeypatch.setattr( + LangfuseResourceManager, + "_instances", + {"pk-test": SimpleNamespace(httpx_client=configured_httpx_client)}, + ) reference = LangfuseMediaReference( media_id="media-id", @@ -121,7 +129,10 @@ def test_media_reference_fetch_uses_timeout(monkeypatch): ) assert reference.fetch_bytes(timeout=12.5) == b"test-bytes" - httpx_get.assert_called_once_with("https://example.com/test.jpg", timeout=12.5) + configured_httpx_client.get.assert_called_once_with( + "https://example.com/test.jpg", timeout=12.5 + ) + httpx_get.assert_not_called() def test_resolve_media_references_uses_configured_httpx_client(): From 3832a4e9dc9175e0f7c91a1730b4c6bb279e5bfc Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Tue, 16 Jun 2026 11:25:13 +0200 Subject: [PATCH 05/10] fix(datasets): clean up media review follow-ups --- langfuse/_client/client.py | 2 +- langfuse/_client/resource_manager.py | 2 +- pyproject.toml | 2 +- tests/unit/test_datasets.py | 68 ---------------------------- tests/unit/test_media_manager.py | 30 ------------ uv.lock | 4 +- 6 files changed, 5 insertions(+), 103 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index c2c4c3da4..feef93f86 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -3502,7 +3502,7 @@ def _replace_json_path_value( try: value = parse_jsonpath(json_path).update(value, replacement) except Exception as e: - langfuse_logger.debug( + langfuse_logger.warning( f"Failed to hydrate dataset media reference at JSONPath {json_path}", exc_info=e, ) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 4febe05d9..fffdd7026 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -84,7 +84,7 @@ def get_singleton_httpx_client(cls) -> Optional[httpx.Client]: with cls._lock: instances = list(cls._instances.values()) - if len(instances) != 1: + if not instances: return None return instances[0].httpx_client diff --git a/pyproject.toml b/pyproject.toml index 2a40a432f..aadacdd62 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ dependencies = [ "opentelemetry-api>=1.33.1,<2", "opentelemetry-sdk>=1.33.1,<2", "opentelemetry-exporter-otlp-proto-http>=1.33.1,<2", - "jsonpath-ng>=1.8.0", + "jsonpath-ng>=1.8.0,<2", ] [dependency-groups] diff --git a/tests/unit/test_datasets.py b/tests/unit/test_datasets.py index fe0bcf75a..c8edb72d6 100644 --- a/tests/unit/test_datasets.py +++ b/tests/unit/test_datasets.py @@ -15,74 +15,6 @@ from langfuse.media import LangfuseMedia, LangfuseMediaReference -def test_hydrate_dataset_item_media_references_replaces_matching_fields(): - reference_string = "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" - item = DatasetItem( - id="item-id", - status=DatasetStatus.ACTIVE, - input={ - "image": reference_string, - "duplicate": reference_string, - "text": "keep", - }, - expected_output=[reference_string], - metadata={"nested": {"image": reference_string}}, - dataset_id="dataset-id", - dataset_name="dataset-name", - created_at=datetime.now(timezone.utc), - updated_at=datetime.now(timezone.utc), - media_references=[ - DatasetItemMediaReference( - field=DatasetItemMediaReferenceField.INPUT, - reference_string=reference_string, - json_path="$['image']", - media=DatasetItemMediaReferenceMedia( - media_id="media-id", - content_type="image/png", - content_length=7, - url="https://example.com/image.png", - url_expiry="2026-06-15T12:00:00.000Z", - ), - ), - DatasetItemMediaReference( - field=DatasetItemMediaReferenceField.EXPECTED_OUTPUT, - reference_string=reference_string, - json_path="$[0]", - media=DatasetItemMediaReferenceMedia( - media_id="media-id", - content_type="image/png", - content_length=7, - url="https://example.com/image.png", - url_expiry="2026-06-15T12:00:00.000Z", - ), - ), - DatasetItemMediaReference( - field=DatasetItemMediaReferenceField.METADATA, - reference_string=reference_string, - json_path="$['nested']['image']", - media=DatasetItemMediaReferenceMedia( - media_id="media-id", - content_type="image/png", - content_length=7, - url="https://example.com/image.png", - url_expiry="2026-06-15T12:00:00.000Z", - ), - ), - ], - ) - - client = object.__new__(Langfuse) - - hydrated = client._hydrate_dataset_item_media_references(item) - - assert hydrated.input["text"] == "keep" - assert isinstance(hydrated.input["image"], LangfuseMediaReference) - assert hydrated.input["duplicate"] == reference_string - assert isinstance(hydrated.expected_output[0], LangfuseMediaReference) - assert isinstance(hydrated.metadata["nested"]["image"], LangfuseMediaReference) - assert hydrated.input["image"].media_id == "media-id" - - @pytest.mark.parametrize( ("field", "field_value", "json_path", "assert_resolved"), [ diff --git a/tests/unit/test_media_manager.py b/tests/unit/test_media_manager.py index f06df3a21..3cf152106 100644 --- a/tests/unit/test_media_manager.py +++ b/tests/unit/test_media_manager.py @@ -142,36 +142,6 @@ def test_find_and_process_media_valid_base64_uri_is_processed(): assert not queue.empty() -def test_upload_media_sync_uploads_without_trace_context(): - media_api = Mock() - media_api.get_upload_url.return_value = SimpleNamespace( - upload_url="https://example.com/upload", - media_id=None, - ) - media_api.patch.return_value = None - - httpx_client = Mock() - httpx_client.put.return_value = _upload_response(200, "ok") - - manager = MediaManager( - api_client=SimpleNamespace(media=media_api), - httpx_client=httpx_client, - media_upload_queue=Queue(), - ) - - media = LangfuseMedia(content_bytes=b"payload", content_type="image/jpeg") - media_api.get_upload_url.return_value.media_id = media._media_id - - manager._upload_media_sync(media=media) - - media_api.get_upload_url.assert_called_once() - assert media_api.get_upload_url.call_args.kwargs["trace_id"] is None - assert media_api.get_upload_url.call_args.kwargs["observation_id"] is None - assert media_api.get_upload_url.call_args.kwargs["field"] is None - httpx_client.put.assert_called_once() - media_api.patch.assert_called_once() - - def test_upload_media_sync_rejects_invalid_media(): manager = MediaManager( api_client=SimpleNamespace(media=Mock()), diff --git a/uv.lock b/uv.lock index fa17776db..9a219dbfe 100644 --- a/uv.lock +++ b/uv.lock @@ -3,7 +3,7 @@ revision = 3 requires-python = ">=3.10, <4.0" [options] -exclude-newer = "2026-06-09T08:44:29.596356662Z" +exclude-newer = "0001-01-01T00:00:00Z" # This has no effect and is included for backwards compatibility when using relative exclude-newer values. exclude-newer-span = "P7D" [[package]] @@ -603,7 +603,7 @@ docs = [ requires-dist = [ { name = "backoff", specifier = ">=1.10.0" }, { name = "httpx", specifier = ">=0.15.4,<1.0" }, - { name = "jsonpath-ng", specifier = ">=1.8.0" }, + { name = "jsonpath-ng", specifier = ">=1.8.0,<2" }, { name = "opentelemetry-api", specifier = ">=1.33.1,<2" }, { name = "opentelemetry-exporter-otlp-proto-http", specifier = ">=1.33.1,<2" }, { name = "opentelemetry-sdk", specifier = ">=1.33.1,<2" }, From 88905ce873ca7eb5fae8694929e20b230e86bbcf Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Tue, 16 Jun 2026 15:34:38 +0200 Subject: [PATCH 06/10] fix(datasets): round-trip resolved media references Resolved LangfuseMediaReference items from get_dataset(resolve_media_ references=True) discarded the original @@@langfuseMedia:...@@@ string, so feeding them back into create_dataset_item or run_experiment serialized them as opaque dicts and orphaned the media. Persist the reference string and emit it from both _process_dataset_item_media and EventSerializer. Co-Authored-By: Claude Opus 4.8 (1M context) --- langfuse/_client/client.py | 4 +++ langfuse/_utils/serializer.py | 8 +++++- langfuse/media.py | 1 + tests/unit/test_datasets.py | 50 +++++++++++++++++++++++++++++++++++ tests/unit/test_serializer.py | 25 ++++++++++++++++++ 5 files changed, 87 insertions(+), 1 deletion(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index feef93f86..6a014f0a0 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -3411,6 +3411,9 @@ def _process_data_recursively( media=data, uploaded_media_ids=uploaded_media_ids ) + if isinstance(data, LangfuseMediaReference): + return data.reference_string if data.reference_string else data + if not isinstance(data, (list, dict)): return data @@ -3481,6 +3484,7 @@ def _hydrate_dataset_item_media_references(self, item: DatasetItem) -> DatasetIt url=media.url, url_expiry=media.url_expiry, content_length=media.content_length, + reference_string=media_reference.reference_string, ) hydrated_fields[field] = self._replace_json_path_value( value=hydrated_fields[field], diff --git a/langfuse/_utils/serializer.py b/langfuse/_utils/serializer.py index 27294bf80..135d1f625 100644 --- a/langfuse/_utils/serializer.py +++ b/langfuse/_utils/serializer.py @@ -15,7 +15,7 @@ from pydantic import BaseModel -from langfuse.media import LangfuseMedia +from langfuse.media import LangfuseMedia, LangfuseMediaReference # Attempt to import Serializable try: @@ -62,6 +62,12 @@ def _default_inner(self, obj: Any) -> Any: or f"" ) + if ( + isinstance(obj, LangfuseMediaReference) + and obj.reference_string is not None + ): + return obj.reference_string + # Check if numpy is available and if the object is a numpy scalar # If so, convert it to a Python scalar using the item() method if np is not None and isinstance(obj, np.generic): diff --git a/langfuse/media.py b/langfuse/media.py index 8c90612dd..04ee1182a 100644 --- a/langfuse/media.py +++ b/langfuse/media.py @@ -29,6 +29,7 @@ class LangfuseMediaReference: url: str url_expiry: Optional[str] = None content_length: Optional[int] = None + reference_string: Optional[str] = None def url_is_expired(self) -> bool: """Return whether the signed URL is already expired.""" diff --git a/tests/unit/test_datasets.py b/tests/unit/test_datasets.py index c8edb72d6..169ad7ce6 100644 --- a/tests/unit/test_datasets.py +++ b/tests/unit/test_datasets.py @@ -133,6 +133,56 @@ def test_create_dataset_item_processes_media_before_api_call(): ) +def test_create_dataset_item_roundtrips_resolved_media_reference(): + # get_dataset(resolve_media_references=True) hydrates strings into + # LangfuseMediaReference instances. Feeding such an item back into + # create_dataset_item must re-emit the original reference string, otherwise + # the dataclass is serialized as an opaque dict and the media is orphaned. + reference_string = "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + item = DatasetItem( + id="item-id", + status=DatasetStatus.ACTIVE, + input={"image": reference_string}, + expected_output=None, + metadata=None, + dataset_id="dataset-id", + dataset_name="dataset-name", + created_at=datetime.now(timezone.utc), + updated_at=datetime.now(timezone.utc), + media_references=[ + DatasetItemMediaReference( + field=DatasetItemMediaReferenceField.INPUT, + reference_string=reference_string, + json_path="$['image']", + media=DatasetItemMediaReferenceMedia( + media_id="media-id", + content_type="image/png", + content_length=7, + url="https://example.com/image.png", + url_expiry="2026-06-15T12:00:00.000Z", + ), + ) + ], + ) + + client = object.__new__(Langfuse) + hydrated = client._hydrate_dataset_item_media_references(item) + assert isinstance(hydrated.input["image"], LangfuseMediaReference) + + media_manager = Mock() + dataset_items_api = Mock() + dataset_items_api.create.return_value = "created-item" + client._resources = SimpleNamespace(_media_manager=media_manager) + client.api = SimpleNamespace(dataset_items=dataset_items_api) + + client.create_dataset_item(dataset_name="dataset", input=hydrated.input) + + assert dataset_items_api.create.call_args.kwargs["input"] == { + "image": reference_string + } + media_manager._upload_media_sync.assert_not_called() + + def test_create_dataset_item_processes_shared_media_subtrees(): media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") shared = {"image": media} diff --git a/tests/unit/test_serializer.py b/tests/unit/test_serializer.py index f4c8dde86..6605485de 100644 --- a/tests/unit/test_serializer.py +++ b/tests/unit/test_serializer.py @@ -12,6 +12,7 @@ from langfuse._utils.serializer import ( EventSerializer, ) +from langfuse.media import LangfuseMediaReference class TestEnum(Enum): @@ -70,6 +71,30 @@ def test_pydantic_model(): assert json.loads(serializer.encode(model)) == {"field": "test"} +def test_langfuse_media_reference_serializes_to_reference_string(): + # Resolved references must round-trip back to their original reference string + # rather than falling through to asdict() and emitting an opaque dict. + reference_string = "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + ref = LangfuseMediaReference( + media_id="media-id", + content_type="image/png", + url="https://example.com/image.png", + reference_string=reference_string, + ) + serializer = EventSerializer() + assert serializer.encode(ref) == f'"{reference_string}"' + + +def test_langfuse_media_reference_without_reference_string_falls_back_to_dict(): + ref = LangfuseMediaReference( + media_id="media-id", + content_type="image/png", + url="https://example.com/image.png", + ) + serializer = EventSerializer() + assert json.loads(serializer.encode(ref))["media_id"] == "media-id" + + def test_path(): path = Path("/tmp/test.txt") serializer = EventSerializer() From d7eb3edcce88cc3abbd25fa9d9562ca1cc37fee2 Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Tue, 16 Jun 2026 16:25:23 +0200 Subject: [PATCH 07/10] fix(datasets): process media inside tuples and sets _process_dataset_item_media only recursed into list/dict, so a LangfuseMedia held in a tuple, set, or frozenset slipped through to fern's encoder and got silently base64-inlined instead of uploaded. Widen the walker to those containers and rebuild them in place. Co-Authored-By: Claude Opus 4.8 (1M context) --- langfuse/_client/client.py | 9 +++++---- tests/unit/test_datasets.py | 38 +++++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 6a014f0a0..ac732c3d8 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -3414,7 +3414,7 @@ def _process_data_recursively( if isinstance(data, LangfuseMediaReference): return data.reference_string if data.reference_string else data - if not isinstance(data, (list, dict)): + if not isinstance(data, (list, tuple, set, frozenset, dict)): return data # Container ids only protect against recursive cycles; media upload @@ -3425,13 +3425,14 @@ def _process_data_recursively( next_ancestor_container_ids = ancestor_container_ids | {data_id} - if isinstance(data, list): - return [ + if isinstance(data, (list, tuple, set, frozenset)): + processed = ( _process_data_recursively( item, level + 1, next_ancestor_container_ids ) for item in data - ] + ) + return type(data)(processed) return { key: _process_data_recursively( diff --git a/tests/unit/test_datasets.py b/tests/unit/test_datasets.py index 169ad7ce6..bdc6f394c 100644 --- a/tests/unit/test_datasets.py +++ b/tests/unit/test_datasets.py @@ -206,3 +206,41 @@ def test_create_dataset_item_processes_shared_media_subtrees(): "a": {"image": media._reference_string}, "b": {"image": media._reference_string}, } + + +def test_create_dataset_item_processes_media_in_tuples(): + media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") + + media_manager = Mock() + dataset_items_api = Mock() + dataset_items_api.create.return_value = "created-item" + + client = object.__new__(Langfuse) + client._resources = SimpleNamespace(_media_manager=media_manager) + client.api = SimpleNamespace(dataset_items=dataset_items_api) + + client.create_dataset_item(dataset_name="dataset", input={"images": (media,)}) + + media_manager._upload_media_sync.assert_called_once_with(media=media) + assert dataset_items_api.create.call_args.kwargs["input"] == { + "images": (media._reference_string,) + } + + +def test_create_dataset_item_processes_media_in_sets(): + media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") + + media_manager = Mock() + dataset_items_api = Mock() + dataset_items_api.create.return_value = "created-item" + + client = object.__new__(Langfuse) + client._resources = SimpleNamespace(_media_manager=media_manager) + client.api = SimpleNamespace(dataset_items=dataset_items_api) + + client.create_dataset_item(dataset_name="dataset", input={"images": {media}}) + + media_manager._upload_media_sync.assert_called_once_with(media=media) + assert dataset_items_api.create.call_args.kwargs["input"] == { + "images": {media._reference_string} + } From 7db0fe7df9bda288082ca61eda18ffc6a319c5bc Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Wed, 17 Jun 2026 12:16:18 +0200 Subject: [PATCH 08/10] fix(datasets): align media reference field with expectedOutput API rename The DatasetItemMediaReferenceField enum value changed from expected_output to expectedOutput. Decouple hydration from the wire value by mapping the enum member to the model attribute, so the rename (and any future one) no longer silently skips expected-output media references. Co-Authored-By: Claude Opus 4.8 (1M context) --- langfuse/_client/client.py | 12 ++++++++++-- .../types/dataset_item_media_reference_field.py | 2 +- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index ac732c3d8..915b2c65c 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -95,6 +95,7 @@ CreateTextPromptRequest, Dataset, DatasetItem, + DatasetItemMediaReferenceField, DatasetRunWithItems, DatasetStatus, DeleteDatasetRunResponse, @@ -3464,6 +3465,13 @@ def _hydrate_dataset_item_media_references(self, item: DatasetItem) -> DatasetIt if not media_references: return item + # Map the API enum member to the snake_case model attribute so this keeps + # working regardless of the enum's wire value (e.g. "expectedOutput"). + attr_by_field = { + DatasetItemMediaReferenceField.INPUT: "input", + DatasetItemMediaReferenceField.EXPECTED_OUTPUT: "expected_output", + DatasetItemMediaReferenceField.METADATA: "metadata", + } hydrated_fields = { "input": item.input, "expected_output": item.expected_output, @@ -3475,8 +3483,8 @@ def _hydrate_dataset_item_media_references(self, item: DatasetItem) -> DatasetIt if media is None: continue - field = media_reference.field.value - if field not in hydrated_fields: + field = attr_by_field.get(media_reference.field) + if field is None: continue replacement = LangfuseMediaReference( diff --git a/langfuse/api/commons/types/dataset_item_media_reference_field.py b/langfuse/api/commons/types/dataset_item_media_reference_field.py index 6a7c3a23e..9dc9df5cd 100644 --- a/langfuse/api/commons/types/dataset_item_media_reference_field.py +++ b/langfuse/api/commons/types/dataset_item_media_reference_field.py @@ -9,7 +9,7 @@ class DatasetItemMediaReferenceField(enum.StrEnum): INPUT = "input" - EXPECTED_OUTPUT = "expected_output" + EXPECTED_OUTPUT = "expectedOutput" METADATA = "metadata" def visit( From a2cba12d7dd2836bc0e5dac5d1b555566e8aa692 Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Wed, 17 Jun 2026 13:42:10 +0200 Subject: [PATCH 09/10] fix(datasets): drop tuple media support to avoid namedtuple breakage Rebuilding tuples via type(data)(iterable) breaks namedtuple/NamedTuple subclasses, which take positional field args rather than an iterable. Keep list/set/frozenset handling and leave tuples untouched. Co-Authored-By: Claude Opus 4.8 (1M context) --- langfuse/_client/client.py | 6 ++++-- tests/unit/test_datasets.py | 19 ------------------- 2 files changed, 4 insertions(+), 21 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 915b2c65c..72dc5f0e6 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -3415,7 +3415,9 @@ def _process_data_recursively( if isinstance(data, LangfuseMediaReference): return data.reference_string if data.reference_string else data - if not isinstance(data, (list, tuple, set, frozenset, dict)): + # Tuples are intentionally excluded: namedtuple subclasses can't be + # rebuilt from an iterable, so media inside them is left untouched. + if not isinstance(data, (list, set, frozenset, dict)): return data # Container ids only protect against recursive cycles; media upload @@ -3426,7 +3428,7 @@ def _process_data_recursively( next_ancestor_container_ids = ancestor_container_ids | {data_id} - if isinstance(data, (list, tuple, set, frozenset)): + if isinstance(data, (list, set, frozenset)): processed = ( _process_data_recursively( item, level + 1, next_ancestor_container_ids diff --git a/tests/unit/test_datasets.py b/tests/unit/test_datasets.py index bdc6f394c..78fb87094 100644 --- a/tests/unit/test_datasets.py +++ b/tests/unit/test_datasets.py @@ -208,25 +208,6 @@ def test_create_dataset_item_processes_shared_media_subtrees(): } -def test_create_dataset_item_processes_media_in_tuples(): - media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") - - media_manager = Mock() - dataset_items_api = Mock() - dataset_items_api.create.return_value = "created-item" - - client = object.__new__(Langfuse) - client._resources = SimpleNamespace(_media_manager=media_manager) - client.api = SimpleNamespace(dataset_items=dataset_items_api) - - client.create_dataset_item(dataset_name="dataset", input={"images": (media,)}) - - media_manager._upload_media_sync.assert_called_once_with(media=media) - assert dataset_items_api.create.call_args.kwargs["input"] == { - "images": (media._reference_string,) - } - - def test_create_dataset_item_processes_media_in_sets(): media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") From 0e2ea09e4db72751b3b7924c3c8f680af3214f1e Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Wed, 17 Jun 2026 15:57:31 +0200 Subject: [PATCH 10/10] fix(media): honor per-client httpx config for media reference fetches get_singleton_httpx_client silently returned the first-inserted instance, so a LangfuseMediaReference fetched from one client could go out through another client's transport (proxy/CA/mTLS). Mirror get_client: warn and fall back to default httpx when multiple clients exist, and let fetch_bytes/fetch_base64/fetch_data_uri take an explicit httpx client to deterministically honor per-client settings. Co-Authored-By: Claude Opus 4.8 (1M context) --- langfuse/_client/resource_manager.py | 13 +++++ langfuse/media.py | 42 ++++++++++++---- tests/unit/test_media.py | 72 ++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+), 9 deletions(-) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index fffdd7026..ab8416dcb 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -87,6 +87,19 @@ def get_singleton_httpx_client(cls) -> Optional[httpx.Client]: if not instances: return None + if len(instances) > 1: + # Mirror get_client's safety stance: with multiple clients we + # cannot tell which one produced a given reference, so fall back + # to a default httpx client rather than silently using an + # arbitrary instance's transport config (proxy / CA / mTLS). + langfuse_logger.warning( + "Multiple Langfuse clients are instantiated; falling back to a " + "default httpx client for LangfuseMediaReference fetches. Pass an " + "explicit `client` to fetch_bytes/fetch_base64/fetch_data_uri to " + "honor per-client transport settings." + ) + return None + return instances[0].httpx_client def __new__( diff --git a/langfuse/media.py b/langfuse/media.py index 04ee1182a..5cc7d15ea 100644 --- a/langfuse/media.py +++ b/langfuse/media.py @@ -48,11 +48,23 @@ def url_is_expired(self) -> bool: return expiry_datetime <= datetime.now(timezone.utc) - def fetch_bytes(self, *, timeout: float = 30.0) -> bytes: - """Fetch the media content from the signed URL.""" + def fetch_bytes( + self, *, timeout: float = 30.0, client: Optional[httpx.Client] = None + ) -> bytes: + """Fetch the media content from the signed URL. + + Args: + timeout: Request timeout in seconds. + client: Optional httpx client to use for the request. Pass this to + honor custom transport settings (proxy, CA bundle, mTLS) — in + particular when multiple Langfuse clients are configured, since + the SDK cannot otherwise tell which client produced this + reference. When omitted, the single configured client is used, + falling back to a default httpx client. + """ from langfuse._client.resource_manager import LangfuseResourceManager - httpx_client = LangfuseResourceManager.get_singleton_httpx_client() + httpx_client = client or LangfuseResourceManager.get_singleton_httpx_client() response = ( httpx_client.get(self.url, timeout=timeout) if httpx_client is not None @@ -62,13 +74,25 @@ def fetch_bytes(self, *, timeout: float = 30.0) -> bytes: return response.content - def fetch_base64(self, *, timeout: float = 30.0) -> str: - """Fetch media and return raw base64 without a data URI prefix.""" - return base64.b64encode(self.fetch_bytes(timeout=timeout)).decode() + def fetch_base64( + self, *, timeout: float = 30.0, client: Optional[httpx.Client] = None + ) -> str: + """Fetch media and return raw base64 without a data URI prefix. - def fetch_data_uri(self, *, timeout: float = 30.0) -> str: - """Fetch media and return it as a data URI.""" - return f"data:{self.content_type};base64,{self.fetch_base64(timeout=timeout)}" + See :meth:`fetch_bytes` for the ``client`` argument. + """ + return base64.b64encode( + self.fetch_bytes(timeout=timeout, client=client) + ).decode() + + def fetch_data_uri( + self, *, timeout: float = 30.0, client: Optional[httpx.Client] = None + ) -> str: + """Fetch media and return it as a data URI. + + See :meth:`fetch_bytes` for the ``client`` argument. + """ + return f"data:{self.content_type};base64,{self.fetch_base64(timeout=timeout, client=client)}" class LangfuseMedia: diff --git a/tests/unit/test_media.py b/tests/unit/test_media.py index 4f42d506b..d4b3c86cc 100644 --- a/tests/unit/test_media.py +++ b/tests/unit/test_media.py @@ -135,6 +135,78 @@ def test_media_reference_fetch_uses_configured_httpx_client(monkeypatch): httpx_get.assert_not_called() +def test_media_reference_fetch_uses_explicit_client(monkeypatch): + response = Mock() + response.content = b"explicit-bytes" + response.raise_for_status.return_value = None + explicit_client = Mock() + explicit_client.get.return_value = response + + singleton_client = Mock() + httpx_get = Mock() + monkeypatch.setattr("langfuse.media.httpx.get", httpx_get) + monkeypatch.setattr( + LangfuseResourceManager, + "_instances", + {"pk-test": SimpleNamespace(httpx_client=singleton_client)}, + ) + + reference = LangfuseMediaReference( + media_id="media-id", + content_type="image/jpeg", + url="https://example.com/test.jpg", + ) + + assert ( + reference.fetch_bytes(timeout=5.0, client=explicit_client) == b"explicit-bytes" + ) + explicit_client.get.assert_called_once_with( + "https://example.com/test.jpg", timeout=5.0 + ) + # Explicit client wins over the configured singleton and the default httpx. + singleton_client.get.assert_not_called() + httpx_get.assert_not_called() + + +def test_media_reference_fetch_falls_back_to_default_with_multiple_clients( + monkeypatch, caplog +): + import logging + + response = Mock() + response.content = b"default-bytes" + response.raise_for_status.return_value = None + httpx_get = Mock(return_value=response) + monkeypatch.setattr("langfuse.media.httpx.get", httpx_get) + + client_a = Mock() + client_b = Mock() + monkeypatch.setattr( + LangfuseResourceManager, + "_instances", + { + "pk-a": SimpleNamespace(httpx_client=client_a), + "pk-b": SimpleNamespace(httpx_client=client_b), + }, + ) + + reference = LangfuseMediaReference( + media_id="media-id", + content_type="image/jpeg", + url="https://example.com/test.jpg", + ) + + with caplog.at_level(logging.WARNING, logger="langfuse"): + assert reference.fetch_bytes(timeout=8.0) == b"default-bytes" + + # Ambiguous multi-client setup: warn and fall back to the default httpx + # instead of silently using an arbitrary instance's transport config. + assert "Multiple Langfuse clients" in caplog.text + httpx_get.assert_called_once_with("https://example.com/test.jpg", timeout=8.0) + client_a.get.assert_not_called() + client_b.get.assert_not_called() + + def test_resolve_media_references_uses_configured_httpx_client(): reference_string = "@@@langfuseMedia:type=image/jpeg|id=test-id|source=bytes@@@" fetch_timeout_seconds = 7