diff --git a/langfuse/__init__.py b/langfuse/__init__.py index ec441e745..d4d24cd04 100644 --- a/langfuse/__init__.py +++ b/langfuse/__init__.py @@ -29,6 +29,7 @@ LangfuseTool, ) from ._version import __version__ +from .media import LangfuseMedia, LangfuseMediaReference from .span_filter import ( KNOWN_LLM_INSTRUMENTATION_SCOPE_PREFIXES, is_default_export_span, @@ -49,6 +50,8 @@ __all__ = [ "Langfuse", + "LangfuseMedia", + "LangfuseMediaReference", "get_client", "observe", "propagate_attributes", diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 8e70e03b1..72dc5f0e6 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -27,6 +27,7 @@ import backoff import httpx +from jsonpath_ng.ext import parse as parse_jsonpath # type: ignore[import-untyped] from opentelemetry import context as otel_context_api from opentelemetry import trace as otel_trace_api from opentelemetry.sdk.trace import ReadableSpan, TracerProvider @@ -94,6 +95,7 @@ CreateTextPromptRequest, Dataset, DatasetItem, + DatasetItemMediaReferenceField, DatasetRunWithItems, DatasetStatus, DeleteDatasetRunResponse, @@ -126,7 +128,7 @@ _run_task, ) from langfuse.logger import langfuse_logger -from langfuse.media import LangfuseMedia +from langfuse.media import LangfuseMedia, LangfuseMediaReference from langfuse.model import ( ChatMessageDict, ChatMessageWithPlaceholdersDict, @@ -2322,15 +2324,17 @@ def get_dataset( *, fetch_items_page_size: Optional[int] = 50, version: Optional[datetime] = None, + resolve_media_references: bool = False, ) -> "DatasetClient": """Fetch a dataset by its name. Args: - name (str): The name of the dataset to fetch. - fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50. - version (Optional[datetime]): Retrieve dataset items as they existed at this specific point in time (UTC). + name: The name of the dataset to fetch. + fetch_items_page_size: All items of the dataset will be fetched in chunks of this size. Defaults to 50. + version: Retrieve dataset items as they existed at this specific point in time (UTC). If provided, returns the state of items at the specified UTC timestamp. If not provided, returns the latest version. Must be a timezone-aware datetime object in UTC. + resolve_media_references: If true, resolve media reference strings in dataset items to LangfuseMediaReference objects. Returns: DatasetClient: The dataset with the given name. @@ -2339,7 +2343,7 @@ def get_dataset( langfuse_logger.debug(f"Getting datasets {name}") dataset = self.api.datasets.get(dataset_name=self._url_encode(name)) - dataset_items = [] + dataset_items: List[DatasetItem] = [] page = 1 while True: @@ -2348,8 +2352,16 @@ def get_dataset( page=page, limit=fetch_items_page_size, version=version, + include_media_references=resolve_media_references or None, + ) + dataset_items.extend( + [ + self._hydrate_dataset_item_media_references(item) + for item in new_items.data + ] + if resolve_media_references + else new_items.data ) - dataset_items.extend(new_items.data) if new_items.meta.total_pages <= page: break @@ -3355,6 +3367,17 @@ def create_dataset_item( try: langfuse_logger.debug(f"Creating dataset item for dataset {dataset_name}") + uploaded_media_ids: set[str] = set() + input = self._process_dataset_item_media( + data=input, uploaded_media_ids=uploaded_media_ids + ) + expected_output = self._process_dataset_item_media( + data=expected_output, uploaded_media_ids=uploaded_media_ids + ) + metadata = self._process_dataset_item_media( + data=metadata, uploaded_media_ids=uploaded_media_ids + ) + result = self.api.dataset_items.create( dataset_name=dataset_name, input=input, @@ -3371,6 +3394,136 @@ def create_dataset_item( handle_fern_exception(e) raise e + def _process_dataset_item_media( + self, *, data: Any, uploaded_media_ids: set[str] + ) -> Any: + if self._resources is None: + return data + + max_levels = 10 + + def _process_data_recursively( + data: Any, level: int, ancestor_container_ids: set[int] + ) -> Any: + # Avoid jsonpath-ng here: dataset writes should keep working + # under python -OO where parser docstrings may be stripped. + if isinstance(data, LangfuseMedia): + return self._upload_dataset_item_media( + media=data, uploaded_media_ids=uploaded_media_ids + ) + + if isinstance(data, LangfuseMediaReference): + return data.reference_string if data.reference_string else data + + # Tuples are intentionally excluded: namedtuple subclasses can't be + # rebuilt from an iterable, so media inside them is left untouched. + if not isinstance(data, (list, set, frozenset, dict)): + return data + + # Container ids only protect against recursive cycles; media upload + # dedupe is handled by uploaded_media_ids. + data_id = id(data) + if data_id in ancestor_container_ids or level > max_levels: + return data + + next_ancestor_container_ids = ancestor_container_ids | {data_id} + + if isinstance(data, (list, set, frozenset)): + processed = ( + _process_data_recursively( + item, level + 1, next_ancestor_container_ids + ) + for item in data + ) + return type(data)(processed) + + return { + key: _process_data_recursively( + value, level + 1, next_ancestor_container_ids + ) + for key, value in data.items() + } + + return _process_data_recursively(data, 1, set()) + + def _upload_dataset_item_media( + self, *, media: LangfuseMedia, uploaded_media_ids: set[str] + ) -> str: + reference_string = media._reference_string + media_id = media._media_id + + if reference_string is None or media_id is None: + raise ValueError("Cannot create dataset item with invalid LangfuseMedia.") + + if media_id not in uploaded_media_ids: + assert self._resources is not None + self._resources._media_manager._upload_media_sync(media=media) + uploaded_media_ids.add(media_id) + + return reference_string + + def _hydrate_dataset_item_media_references(self, item: DatasetItem) -> DatasetItem: + media_references = item.media_references or [] + if not media_references: + return item + + # Map the API enum member to the snake_case model attribute so this keeps + # working regardless of the enum's wire value (e.g. "expectedOutput"). + attr_by_field = { + DatasetItemMediaReferenceField.INPUT: "input", + DatasetItemMediaReferenceField.EXPECTED_OUTPUT: "expected_output", + DatasetItemMediaReferenceField.METADATA: "metadata", + } + hydrated_fields = { + "input": item.input, + "expected_output": item.expected_output, + "metadata": item.metadata, + } + + for media_reference in media_references: + media = media_reference.media + if media is None: + continue + + field = attr_by_field.get(media_reference.field) + if field is None: + continue + + replacement = LangfuseMediaReference( + media_id=media.media_id, + content_type=media.content_type, + url=media.url, + url_expiry=media.url_expiry, + content_length=media.content_length, + reference_string=media_reference.reference_string, + ) + hydrated_fields[field] = self._replace_json_path_value( + value=hydrated_fields[field], + json_path=media_reference.json_path, + replacement=replacement, + ) + + return item.model_copy( + update={ + "input": hydrated_fields["input"], + "expected_output": hydrated_fields["expected_output"], + "metadata": hydrated_fields["metadata"], + } + ) + + def _replace_json_path_value( + self, *, value: Any, json_path: str, replacement: LangfuseMediaReference + ) -> Any: + try: + value = parse_jsonpath(json_path).update(value, replacement) + except Exception as e: + langfuse_logger.warning( + f"Failed to hydrate dataset media reference at JSONPath {json_path}", + exc_info=e, + ) + + return value + def resolve_media_references( self, *, diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 004566c8f..ab8416dcb 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -79,6 +79,29 @@ class LangfuseResourceManager: _instances: Dict[str, "LangfuseResourceManager"] = {} _lock = threading.RLock() + @classmethod + def get_singleton_httpx_client(cls) -> Optional[httpx.Client]: + with cls._lock: + instances = list(cls._instances.values()) + + if not instances: + return None + + if len(instances) > 1: + # Mirror get_client's safety stance: with multiple clients we + # cannot tell which one produced a given reference, so fall back + # to a default httpx client rather than silently using an + # arbitrary instance's transport config (proxy / CA / mTLS). + langfuse_logger.warning( + "Multiple Langfuse clients are instantiated; falling back to a " + "default httpx client for LangfuseMediaReference fetches. Pass an " + "explicit `client` to fetch_bytes/fetch_base64/fetch_data_uri to " + "honor per-client transport settings." + ) + return None + + return instances[0].httpx_client + def __new__( cls, *, diff --git a/langfuse/_task_manager/media_manager.py b/langfuse/_task_manager/media_manager.py index f3bd7b9cb..18aaa6951 100644 --- a/langfuse/_task_manager/media_manager.py +++ b/langfuse/_task_manager/media_manager.py @@ -284,6 +284,34 @@ def _process_media( f"Media processing error: Failed to process media_id={media._media_id} for trace_id={trace_id}. Error: {str(e)}" ) + def _upload_media_sync(self, *, media: LangfuseMedia) -> None: + if not self._enabled: + raise ValueError("Cannot upload LangfuseMedia while media upload is disabled.") + + if ( + media._content_length is None + or media._content_type is None + or media._content_sha256_hash is None + or media._content_bytes is None + ): + raise ValueError("Cannot upload invalid LangfuseMedia.") + + if media._media_id is None: + raise ValueError("Cannot upload LangfuseMedia without media ID.") + + upload_media_job = UploadMediaJob( + media_id=media._media_id, + content_bytes=media._content_bytes, + content_type=media._content_type, + content_length=media._content_length, + content_sha256_hash=media._content_sha256_hash, + trace_id=None, + observation_id=None, + field=None, + ) + + self._process_upload_media_job(data=upload_media_job) + def _process_upload_media_job( self, *, diff --git a/langfuse/_task_manager/media_upload_queue.py b/langfuse/_task_manager/media_upload_queue.py index e4cd8ebee..aac852105 100644 --- a/langfuse/_task_manager/media_upload_queue.py +++ b/langfuse/_task_manager/media_upload_queue.py @@ -7,6 +7,6 @@ class UploadMediaJob(TypedDict): content_length: int content_bytes: bytes content_sha256_hash: str - trace_id: str + trace_id: Optional[str] observation_id: Optional[str] - field: str + field: Optional[str] diff --git a/langfuse/_utils/serializer.py b/langfuse/_utils/serializer.py index 27294bf80..135d1f625 100644 --- a/langfuse/_utils/serializer.py +++ b/langfuse/_utils/serializer.py @@ -15,7 +15,7 @@ from pydantic import BaseModel -from langfuse.media import LangfuseMedia +from langfuse.media import LangfuseMedia, LangfuseMediaReference # Attempt to import Serializable try: @@ -62,6 +62,12 @@ def _default_inner(self, obj: Any) -> Any: or f"" ) + if ( + isinstance(obj, LangfuseMediaReference) + and obj.reference_string is not None + ): + return obj.reference_string + # Check if numpy is available and if the object is a numpy scalar # If so, convert it to a Python scalar using the item() method if np is not None and isinstance(obj, np.generic): diff --git a/langfuse/api/__init__.py b/langfuse/api/__init__.py index 46985c0b9..256f1230b 100644 --- a/langfuse/api/__init__.py +++ b/langfuse/api/__init__.py @@ -86,6 +86,9 @@ CreateScoreValue, Dataset, DatasetItem, + DatasetItemMediaReference, + DatasetItemMediaReferenceField, + DatasetItemMediaReferenceMedia, DatasetRun, DatasetRunItem, DatasetRunWithItems, @@ -396,6 +399,9 @@ "CreateTextPromptType": ".prompts", "Dataset": ".commons", "DatasetItem": ".commons", + "DatasetItemMediaReference": ".commons", + "DatasetItemMediaReferenceField": ".commons", + "DatasetItemMediaReferenceMedia": ".commons", "DatasetRun": ".commons", "DatasetRunItem": ".commons", "DatasetRunWithItems": ".commons", @@ -717,6 +723,9 @@ def __dir__(): "CreateTextPromptType", "Dataset", "DatasetItem", + "DatasetItemMediaReference", + "DatasetItemMediaReferenceField", + "DatasetItemMediaReferenceMedia", "DatasetRun", "DatasetRunItem", "DatasetRunWithItems", diff --git a/langfuse/api/commons/__init__.py b/langfuse/api/commons/__init__.py index 81cb57f96..a35064081 100644 --- a/langfuse/api/commons/__init__.py +++ b/langfuse/api/commons/__init__.py @@ -20,6 +20,9 @@ CreateScoreValue, Dataset, DatasetItem, + DatasetItemMediaReference, + DatasetItemMediaReferenceField, + DatasetItemMediaReferenceMedia, DatasetRun, DatasetRunItem, DatasetRunWithItems, @@ -84,6 +87,9 @@ "CreateScoreValue": ".types", "Dataset": ".types", "DatasetItem": ".types", + "DatasetItemMediaReference": ".types", + "DatasetItemMediaReferenceField": ".types", + "DatasetItemMediaReferenceMedia": ".types", "DatasetRun": ".types", "DatasetRunItem": ".types", "DatasetRunWithItems": ".types", @@ -174,6 +180,9 @@ def __dir__(): "CreateScoreValue", "Dataset", "DatasetItem", + "DatasetItemMediaReference", + "DatasetItemMediaReferenceField", + "DatasetItemMediaReferenceMedia", "DatasetRun", "DatasetRunItem", "DatasetRunWithItems", diff --git a/langfuse/api/commons/types/__init__.py b/langfuse/api/commons/types/__init__.py index 5ce0a58cd..12faf307f 100644 --- a/langfuse/api/commons/types/__init__.py +++ b/langfuse/api/commons/types/__init__.py @@ -19,6 +19,9 @@ from .create_score_value import CreateScoreValue from .dataset import Dataset from .dataset_item import DatasetItem + from .dataset_item_media_reference import DatasetItemMediaReference + from .dataset_item_media_reference_field import DatasetItemMediaReferenceField + from .dataset_item_media_reference_media import DatasetItemMediaReferenceMedia from .dataset_run import DatasetRun from .dataset_run_item import DatasetRunItem from .dataset_run_with_items import DatasetRunWithItems @@ -78,6 +81,9 @@ "CreateScoreValue": ".create_score_value", "Dataset": ".dataset", "DatasetItem": ".dataset_item", + "DatasetItemMediaReference": ".dataset_item_media_reference", + "DatasetItemMediaReferenceField": ".dataset_item_media_reference_field", + "DatasetItemMediaReferenceMedia": ".dataset_item_media_reference_media", "DatasetRun": ".dataset_run", "DatasetRunItem": ".dataset_run_item", "DatasetRunWithItems": ".dataset_run_with_items", @@ -163,6 +169,9 @@ def __dir__(): "CreateScoreValue", "Dataset", "DatasetItem", + "DatasetItemMediaReference", + "DatasetItemMediaReferenceField", + "DatasetItemMediaReferenceMedia", "DatasetRun", "DatasetRunItem", "DatasetRunWithItems", diff --git a/langfuse/api/commons/types/dataset_item.py b/langfuse/api/commons/types/dataset_item.py index 54a13d81a..8d49f6439 100644 --- a/langfuse/api/commons/types/dataset_item.py +++ b/langfuse/api/commons/types/dataset_item.py @@ -7,6 +7,7 @@ import typing_extensions from ...core.pydantic_utilities import UniversalBaseModel from ...core.serialization import FieldMetadata +from .dataset_item_media_reference import DatasetItemMediaReference from .dataset_status import DatasetStatus @@ -52,6 +53,13 @@ class DatasetItem(UniversalBaseModel): updated_at: typing_extensions.Annotated[ dt.datetime, FieldMetadata(alias="updatedAt") ] + media_references: typing_extensions.Annotated[ + typing.Optional[typing.List[DatasetItemMediaReference]], + FieldMetadata(alias="mediaReferences"), + ] = pydantic.Field(default=None) + """ + Resolved Langfuse media references found in input, expectedOutput, and metadata. Only present when requested via includeMediaReferences. + """ model_config: typing.ClassVar[pydantic.ConfigDict] = pydantic.ConfigDict( extra="allow", frozen=True diff --git a/langfuse/api/commons/types/dataset_item_media_reference.py b/langfuse/api/commons/types/dataset_item_media_reference.py new file mode 100644 index 000000000..95121fa41 --- /dev/null +++ b/langfuse/api/commons/types/dataset_item_media_reference.py @@ -0,0 +1,42 @@ +# This file was auto-generated by Fern from our API Definition. + +import typing + +import pydantic +import typing_extensions +from ...core.pydantic_utilities import UniversalBaseModel +from ...core.serialization import FieldMetadata +from .dataset_item_media_reference_field import DatasetItemMediaReferenceField +from .dataset_item_media_reference_media import DatasetItemMediaReferenceMedia + + +class DatasetItemMediaReference(UniversalBaseModel): + field: DatasetItemMediaReferenceField = pydantic.Field() + """ + The dataset item field containing the reference + """ + + reference_string: typing_extensions.Annotated[ + str, FieldMetadata(alias="referenceString") + ] = pydantic.Field() + """ + The Langfuse media reference string, e.g. `@@@langfuseMedia:type=image/png|id=...|source=bytes@@@` + """ + + json_path: typing_extensions.Annotated[str, FieldMetadata(alias="jsonPath")] = ( + pydantic.Field() + ) + """ + JSONPath of the string holding the reference within the field, e.g. `$['image']` + """ + + media: typing.Optional[DatasetItemMediaReferenceMedia] = pydantic.Field( + default=None + ) + """ + The resolved media record. Null if the referenced media does not exist or has not been uploaded successfully. + """ + + model_config: typing.ClassVar[pydantic.ConfigDict] = pydantic.ConfigDict( + extra="allow", frozen=True + ) diff --git a/langfuse/api/commons/types/dataset_item_media_reference_field.py b/langfuse/api/commons/types/dataset_item_media_reference_field.py new file mode 100644 index 000000000..9dc9df5cd --- /dev/null +++ b/langfuse/api/commons/types/dataset_item_media_reference_field.py @@ -0,0 +1,26 @@ +# This file was auto-generated by Fern from our API Definition. + +import typing + +from ...core import enum + +T_Result = typing.TypeVar("T_Result") + + +class DatasetItemMediaReferenceField(enum.StrEnum): + INPUT = "input" + EXPECTED_OUTPUT = "expectedOutput" + METADATA = "metadata" + + def visit( + self, + input: typing.Callable[[], T_Result], + expected_output: typing.Callable[[], T_Result], + metadata: typing.Callable[[], T_Result], + ) -> T_Result: + if self is DatasetItemMediaReferenceField.INPUT: + return input() + if self is DatasetItemMediaReferenceField.EXPECTED_OUTPUT: + return expected_output() + if self is DatasetItemMediaReferenceField.METADATA: + return metadata() diff --git a/langfuse/api/commons/types/dataset_item_media_reference_media.py b/langfuse/api/commons/types/dataset_item_media_reference_media.py new file mode 100644 index 000000000..94ca39f38 --- /dev/null +++ b/langfuse/api/commons/types/dataset_item_media_reference_media.py @@ -0,0 +1,47 @@ +# This file was auto-generated by Fern from our API Definition. + +import typing + +import pydantic +import typing_extensions +from ...core.pydantic_utilities import UniversalBaseModel +from ...core.serialization import FieldMetadata + + +class DatasetItemMediaReferenceMedia(UniversalBaseModel): + media_id: typing_extensions.Annotated[str, FieldMetadata(alias="mediaId")] = ( + pydantic.Field() + ) + """ + The unique langfuse identifier of the media record + """ + + content_type: typing_extensions.Annotated[ + str, FieldMetadata(alias="contentType") + ] = pydantic.Field() + """ + The MIME type of the media record + """ + + content_length: typing_extensions.Annotated[ + int, FieldMetadata(alias="contentLength") + ] = pydantic.Field() + """ + The size of the media record in bytes + """ + + url: str = pydantic.Field() + """ + The signed download URL of the media record + """ + + url_expiry: typing_extensions.Annotated[str, FieldMetadata(alias="urlExpiry")] = ( + pydantic.Field() + ) + """ + The expiry date and time of the download URL + """ + + model_config: typing.ClassVar[pydantic.ConfigDict] = pydantic.ConfigDict( + extra="allow", frozen=True + ) diff --git a/langfuse/api/dataset_items/client.py b/langfuse/api/dataset_items/client.py index 1f28c96fd..27b1377ba 100644 --- a/langfuse/api/dataset_items/client.py +++ b/langfuse/api/dataset_items/client.py @@ -103,7 +103,11 @@ def create( return _response.data def get( - self, id: str, *, request_options: typing.Optional[RequestOptions] = None + self, + id: str, + *, + include_media_references: typing.Optional[bool] = None, + request_options: typing.Optional[RequestOptions] = None, ) -> DatasetItem: """ Get a dataset item @@ -112,6 +116,9 @@ def get( ---------- id : str + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -135,7 +142,11 @@ def get( id="id", ) """ - _response = self._raw_client.get(id, request_options=request_options) + _response = self._raw_client.get( + id, + include_media_references=include_media_references, + request_options=request_options, + ) return _response.data def list( @@ -145,6 +156,7 @@ def list( source_trace_id: typing.Optional[str] = None, source_observation_id: typing.Optional[str] = None, version: typing.Optional[dt.datetime] = None, + include_media_references: typing.Optional[bool] = None, page: typing.Optional[int] = None, limit: typing.Optional[int] = None, request_options: typing.Optional[RequestOptions] = None, @@ -166,6 +178,9 @@ def list( If provided, returns state of dataset at this timestamp. If not provided, returns the latest version. Requires datasetName to be specified. + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + page : typing.Optional[int] page number, starts at 1 @@ -198,6 +213,7 @@ def list( source_trace_id=source_trace_id, source_observation_id=source_observation_id, version=version, + include_media_references=include_media_references, page=page, limit=limit, request_options=request_options, @@ -337,7 +353,11 @@ async def main() -> None: return _response.data async def get( - self, id: str, *, request_options: typing.Optional[RequestOptions] = None + self, + id: str, + *, + include_media_references: typing.Optional[bool] = None, + request_options: typing.Optional[RequestOptions] = None, ) -> DatasetItem: """ Get a dataset item @@ -346,6 +366,9 @@ async def get( ---------- id : str + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -377,7 +400,11 @@ async def main() -> None: asyncio.run(main()) """ - _response = await self._raw_client.get(id, request_options=request_options) + _response = await self._raw_client.get( + id, + include_media_references=include_media_references, + request_options=request_options, + ) return _response.data async def list( @@ -387,6 +414,7 @@ async def list( source_trace_id: typing.Optional[str] = None, source_observation_id: typing.Optional[str] = None, version: typing.Optional[dt.datetime] = None, + include_media_references: typing.Optional[bool] = None, page: typing.Optional[int] = None, limit: typing.Optional[int] = None, request_options: typing.Optional[RequestOptions] = None, @@ -408,6 +436,9 @@ async def list( If provided, returns state of dataset at this timestamp. If not provided, returns the latest version. Requires datasetName to be specified. + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + page : typing.Optional[int] page number, starts at 1 @@ -448,6 +479,7 @@ async def main() -> None: source_trace_id=source_trace_id, source_observation_id=source_observation_id, version=version, + include_media_references=include_media_references, page=page, limit=limit, request_options=request_options, diff --git a/langfuse/api/dataset_items/raw_client.py b/langfuse/api/dataset_items/raw_client.py index 970bc25d0..dd94f9991 100644 --- a/langfuse/api/dataset_items/raw_client.py +++ b/langfuse/api/dataset_items/raw_client.py @@ -167,7 +167,11 @@ def create( ) def get( - self, id: str, *, request_options: typing.Optional[RequestOptions] = None + self, + id: str, + *, + include_media_references: typing.Optional[bool] = None, + request_options: typing.Optional[RequestOptions] = None, ) -> HttpResponse[DatasetItem]: """ Get a dataset item @@ -176,6 +180,9 @@ def get( ---------- id : str + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -186,6 +193,9 @@ def get( _response = self._client_wrapper.httpx_client.request( f"api/public/dataset-items/{jsonable_encoder(id)}", method="GET", + params={ + "includeMediaReferences": include_media_references, + }, request_options=request_options, ) try: @@ -273,6 +283,7 @@ def list( source_trace_id: typing.Optional[str] = None, source_observation_id: typing.Optional[str] = None, version: typing.Optional[dt.datetime] = None, + include_media_references: typing.Optional[bool] = None, page: typing.Optional[int] = None, limit: typing.Optional[int] = None, request_options: typing.Optional[RequestOptions] = None, @@ -294,6 +305,9 @@ def list( If provided, returns state of dataset at this timestamp. If not provided, returns the latest version. Requires datasetName to be specified. + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + page : typing.Optional[int] page number, starts at 1 @@ -315,6 +329,7 @@ def list( "sourceTraceId": source_trace_id, "sourceObservationId": source_observation_id, "version": serialize_datetime(version) if version is not None else None, + "includeMediaReferences": include_media_references, "page": page, "limit": limit, }, @@ -641,7 +656,11 @@ async def create( ) async def get( - self, id: str, *, request_options: typing.Optional[RequestOptions] = None + self, + id: str, + *, + include_media_references: typing.Optional[bool] = None, + request_options: typing.Optional[RequestOptions] = None, ) -> AsyncHttpResponse[DatasetItem]: """ Get a dataset item @@ -650,6 +669,9 @@ async def get( ---------- id : str + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -660,6 +682,9 @@ async def get( _response = await self._client_wrapper.httpx_client.request( f"api/public/dataset-items/{jsonable_encoder(id)}", method="GET", + params={ + "includeMediaReferences": include_media_references, + }, request_options=request_options, ) try: @@ -747,6 +772,7 @@ async def list( source_trace_id: typing.Optional[str] = None, source_observation_id: typing.Optional[str] = None, version: typing.Optional[dt.datetime] = None, + include_media_references: typing.Optional[bool] = None, page: typing.Optional[int] = None, limit: typing.Optional[int] = None, request_options: typing.Optional[RequestOptions] = None, @@ -768,6 +794,9 @@ async def list( If provided, returns state of dataset at this timestamp. If not provided, returns the latest version. Requires datasetName to be specified. + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + page : typing.Optional[int] page number, starts at 1 @@ -789,6 +818,7 @@ async def list( "sourceTraceId": source_trace_id, "sourceObservationId": source_observation_id, "version": serialize_datetime(version) if version is not None else None, + "includeMediaReferences": include_media_references, "page": page, "limit": limit, }, diff --git a/langfuse/api/media/client.py b/langfuse/api/media/client.py index b22272b92..648cc72f5 100644 --- a/langfuse/api/media/client.py +++ b/langfuse/api/media/client.py @@ -138,12 +138,12 @@ def patch( def get_upload_url( self, *, - trace_id: str, content_type: MediaContentType, content_length: int, sha256hash: str, - field: str, + trace_id: typing.Optional[str] = OMIT, observation_id: typing.Optional[str] = OMIT, + field: typing.Optional[str] = OMIT, request_options: typing.Optional[RequestOptions] = None, ) -> GetMediaUploadUrlResponse: """ @@ -151,9 +151,6 @@ def get_upload_url( Parameters ---------- - trace_id : str - The trace ID associated with the media record - content_type : MediaContentType content_length : int @@ -162,11 +159,14 @@ def get_upload_url( sha256hash : str The SHA-256 hash of the media record - field : str - The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata` + trace_id : typing.Optional[str] + The trace ID associated with the media record. If null, the media record is not associated with a trace, e.g. when uploading media for dataset items. observation_id : typing.Optional[str] - The observation ID associated with the media record. If the media record is associated directly with a trace, this will be null. + The observation ID associated with the media record. If provided, traceId must be provided as well. If the media record is associated directly with a trace, this will be null. + + field : typing.Optional[str] + The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata`. Required if traceId is provided, ignored otherwise. request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -189,20 +189,18 @@ def get_upload_url( base_url="https://yourhost.com/path/to/api", ) client.media.get_upload_url( - trace_id="traceId", content_type=MediaContentType.IMAGE_PNG, content_length=1, sha256hash="sha256Hash", - field="field", ) """ _response = self._raw_client.get_upload_url( - trace_id=trace_id, content_type=content_type, content_length=content_length, sha256hash=sha256hash, - field=field, + trace_id=trace_id, observation_id=observation_id, + field=field, request_options=request_options, ) return _response.data @@ -349,12 +347,12 @@ async def main() -> None: async def get_upload_url( self, *, - trace_id: str, content_type: MediaContentType, content_length: int, sha256hash: str, - field: str, + trace_id: typing.Optional[str] = OMIT, observation_id: typing.Optional[str] = OMIT, + field: typing.Optional[str] = OMIT, request_options: typing.Optional[RequestOptions] = None, ) -> GetMediaUploadUrlResponse: """ @@ -362,9 +360,6 @@ async def get_upload_url( Parameters ---------- - trace_id : str - The trace ID associated with the media record - content_type : MediaContentType content_length : int @@ -373,11 +368,14 @@ async def get_upload_url( sha256hash : str The SHA-256 hash of the media record - field : str - The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata` + trace_id : typing.Optional[str] + The trace ID associated with the media record. If null, the media record is not associated with a trace, e.g. when uploading media for dataset items. observation_id : typing.Optional[str] - The observation ID associated with the media record. If the media record is associated directly with a trace, this will be null. + The observation ID associated with the media record. If provided, traceId must be provided as well. If the media record is associated directly with a trace, this will be null. + + field : typing.Optional[str] + The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata`. Required if traceId is provided, ignored otherwise. request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -405,23 +403,21 @@ async def get_upload_url( async def main() -> None: await client.media.get_upload_url( - trace_id="traceId", content_type=MediaContentType.IMAGE_PNG, content_length=1, sha256hash="sha256Hash", - field="field", ) asyncio.run(main()) """ _response = await self._raw_client.get_upload_url( - trace_id=trace_id, content_type=content_type, content_length=content_length, sha256hash=sha256hash, - field=field, + trace_id=trace_id, observation_id=observation_id, + field=field, request_options=request_options, ) return _response.data diff --git a/langfuse/api/media/raw_client.py b/langfuse/api/media/raw_client.py index 4cc619770..9f4fa3d81 100644 --- a/langfuse/api/media/raw_client.py +++ b/langfuse/api/media/raw_client.py @@ -251,12 +251,12 @@ def patch( def get_upload_url( self, *, - trace_id: str, content_type: MediaContentType, content_length: int, sha256hash: str, - field: str, + trace_id: typing.Optional[str] = OMIT, observation_id: typing.Optional[str] = OMIT, + field: typing.Optional[str] = OMIT, request_options: typing.Optional[RequestOptions] = None, ) -> HttpResponse[GetMediaUploadUrlResponse]: """ @@ -264,9 +264,6 @@ def get_upload_url( Parameters ---------- - trace_id : str - The trace ID associated with the media record - content_type : MediaContentType content_length : int @@ -275,11 +272,14 @@ def get_upload_url( sha256hash : str The SHA-256 hash of the media record - field : str - The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata` + trace_id : typing.Optional[str] + The trace ID associated with the media record. If null, the media record is not associated with a trace, e.g. when uploading media for dataset items. observation_id : typing.Optional[str] - The observation ID associated with the media record. If the media record is associated directly with a trace, this will be null. + The observation ID associated with the media record. If provided, traceId must be provided as well. If the media record is associated directly with a trace, this will be null. + + field : typing.Optional[str] + The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata`. Required if traceId is provided, ignored otherwise. request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -609,12 +609,12 @@ async def patch( async def get_upload_url( self, *, - trace_id: str, content_type: MediaContentType, content_length: int, sha256hash: str, - field: str, + trace_id: typing.Optional[str] = OMIT, observation_id: typing.Optional[str] = OMIT, + field: typing.Optional[str] = OMIT, request_options: typing.Optional[RequestOptions] = None, ) -> AsyncHttpResponse[GetMediaUploadUrlResponse]: """ @@ -622,9 +622,6 @@ async def get_upload_url( Parameters ---------- - trace_id : str - The trace ID associated with the media record - content_type : MediaContentType content_length : int @@ -633,11 +630,14 @@ async def get_upload_url( sha256hash : str The SHA-256 hash of the media record - field : str - The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata` + trace_id : typing.Optional[str] + The trace ID associated with the media record. If null, the media record is not associated with a trace, e.g. when uploading media for dataset items. observation_id : typing.Optional[str] - The observation ID associated with the media record. If the media record is associated directly with a trace, this will be null. + The observation ID associated with the media record. If provided, traceId must be provided as well. If the media record is associated directly with a trace, this will be null. + + field : typing.Optional[str] + The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata`. Required if traceId is provided, ignored otherwise. request_options : typing.Optional[RequestOptions] Request-specific configuration. diff --git a/langfuse/api/media/types/get_media_upload_url_request.py b/langfuse/api/media/types/get_media_upload_url_request.py index 99f055847..7222fbdba 100644 --- a/langfuse/api/media/types/get_media_upload_url_request.py +++ b/langfuse/api/media/types/get_media_upload_url_request.py @@ -10,18 +10,18 @@ class GetMediaUploadUrlRequest(UniversalBaseModel): - trace_id: typing_extensions.Annotated[str, FieldMetadata(alias="traceId")] = ( - pydantic.Field() - ) + trace_id: typing_extensions.Annotated[ + typing.Optional[str], FieldMetadata(alias="traceId") + ] = pydantic.Field(default=None) """ - The trace ID associated with the media record + The trace ID associated with the media record. If null, the media record is not associated with a trace, e.g. when uploading media for dataset items. """ observation_id: typing_extensions.Annotated[ typing.Optional[str], FieldMetadata(alias="observationId") ] = pydantic.Field(default=None) """ - The observation ID associated with the media record. If the media record is associated directly with a trace, this will be null. + The observation ID associated with the media record. If provided, traceId must be provided as well. If the media record is associated directly with a trace, this will be null. """ content_type: typing_extensions.Annotated[ @@ -41,9 +41,9 @@ class GetMediaUploadUrlRequest(UniversalBaseModel): The SHA-256 hash of the media record """ - field: str = pydantic.Field() + field: typing.Optional[str] = pydantic.Field(default=None) """ - The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata` + The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata`. Required if traceId is provided, ignored otherwise. """ model_config: typing.ClassVar[pydantic.ConfigDict] = pydantic.ConfigDict( diff --git a/langfuse/media.py b/langfuse/media.py index 53940382c..5cc7d15ea 100644 --- a/langfuse/media.py +++ b/langfuse/media.py @@ -4,6 +4,8 @@ import hashlib import os import re +from dataclasses import dataclass +from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, TypeVar, cast import httpx @@ -18,6 +20,81 @@ T = TypeVar("T") +@dataclass(frozen=True) +class LangfuseMediaReference: + """Resolved reference to media stored in Langfuse.""" + + media_id: str + content_type: str + url: str + url_expiry: Optional[str] = None + content_length: Optional[int] = None + reference_string: Optional[str] = None + + def url_is_expired(self) -> bool: + """Return whether the signed URL is already expired.""" + if self.url_expiry is None: + return False + + expiry = self.url_expiry.replace("Z", "+00:00") + + try: + expiry_datetime = datetime.fromisoformat(expiry) + except ValueError: + return False + + if expiry_datetime.tzinfo is None: + expiry_datetime = expiry_datetime.replace(tzinfo=timezone.utc) + + return expiry_datetime <= datetime.now(timezone.utc) + + def fetch_bytes( + self, *, timeout: float = 30.0, client: Optional[httpx.Client] = None + ) -> bytes: + """Fetch the media content from the signed URL. + + Args: + timeout: Request timeout in seconds. + client: Optional httpx client to use for the request. Pass this to + honor custom transport settings (proxy, CA bundle, mTLS) — in + particular when multiple Langfuse clients are configured, since + the SDK cannot otherwise tell which client produced this + reference. When omitted, the single configured client is used, + falling back to a default httpx client. + """ + from langfuse._client.resource_manager import LangfuseResourceManager + + httpx_client = client or LangfuseResourceManager.get_singleton_httpx_client() + response = ( + httpx_client.get(self.url, timeout=timeout) + if httpx_client is not None + else httpx.get(self.url, timeout=timeout) + ) + response.raise_for_status() + + return response.content + + def fetch_base64( + self, *, timeout: float = 30.0, client: Optional[httpx.Client] = None + ) -> str: + """Fetch media and return raw base64 without a data URI prefix. + + See :meth:`fetch_bytes` for the ``client`` argument. + """ + return base64.b64encode( + self.fetch_bytes(timeout=timeout, client=client) + ).decode() + + def fetch_data_uri( + self, *, timeout: float = 30.0, client: Optional[httpx.Client] = None + ) -> str: + """Fetch media and return it as a data URI. + + See :meth:`fetch_bytes` for the ``client`` argument. + """ + return f"data:{self.content_type};base64,{self.fetch_base64(timeout=timeout, client=client)}" + + class LangfuseMedia: """A class for wrapping media objects for upload to Langfuse. diff --git a/pyproject.toml b/pyproject.toml index c5b20160d..aadacdd62 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "opentelemetry-api>=1.33.1,<2", "opentelemetry-sdk>=1.33.1,<2", "opentelemetry-exporter-otlp-proto-http>=1.33.1,<2", + "jsonpath-ng>=1.8.0,<2", ] [dependency-groups] diff --git a/tests/e2e/test_datasets.py b/tests/e2e/test_datasets.py index 8d575180a..3ba58e03d 100644 --- a/tests/e2e/test_datasets.py +++ b/tests/e2e/test_datasets.py @@ -1,10 +1,15 @@ import time from datetime import timedelta +from pathlib import Path from langfuse import Langfuse from langfuse.api import DatasetStatus +from langfuse.media import LangfuseMedia, LangfuseMediaReference from tests.support.utils import create_uuid, wait_for_result +SAMPLE_IMAGE_PATH = Path("static/puton.jpg") +SAMPLE_IMAGE_CONTENT_TYPE = "image/jpeg" + def test_create_and_get_dataset(): langfuse = Langfuse(debug=False) @@ -69,6 +74,57 @@ def test_create_dataset_item(): assert dataset.items[0].dataset_name == name +def test_create_and_get_dataset_item_with_media(): + langfuse = Langfuse(debug=False) + name = create_uuid() + langfuse.create_dataset(name=name) + sample_image_bytes = SAMPLE_IMAGE_PATH.read_bytes() + + created_item = langfuse.create_dataset_item( + dataset_name=name, + input={ + "question": "What is in this image?", + "image": LangfuseMedia( + file_path=str(SAMPLE_IMAGE_PATH), + content_type=SAMPLE_IMAGE_CONTENT_TYPE, + ), + }, + expected_output={ + "reference": LangfuseMedia( + file_path=str(SAMPLE_IMAGE_PATH), + content_type=SAMPLE_IMAGE_CONTENT_TYPE, + ), + }, + metadata={ + "thumbnail": LangfuseMedia( + file_path=str(SAMPLE_IMAGE_PATH), + content_type=SAMPLE_IMAGE_CONTENT_TYPE, + ) + }, + ) + + assert created_item.input["image"].startswith("@@@langfuseMedia:") + + raw_dataset = langfuse.get_dataset(name) + assert isinstance(raw_dataset.items[0].input["image"], str) + + resolved_dataset = wait_for_result( + lambda: langfuse.get_dataset(name, resolve_media_references=True), + is_result_ready=lambda dataset: isinstance( + dataset.items[0].input["image"], LangfuseMediaReference + ), + ) + + resolved_item = resolved_dataset.items[0] + assert isinstance(resolved_item.input["image"], LangfuseMediaReference) + assert isinstance( + resolved_item.expected_output["reference"], LangfuseMediaReference + ) + assert isinstance(resolved_item.metadata["thumbnail"], LangfuseMediaReference) + assert resolved_item.input["image"].content_type == SAMPLE_IMAGE_CONTENT_TYPE + assert resolved_item.input["image"].fetch_bytes() == sample_image_bytes + + def test_get_all_items(): langfuse = Langfuse(debug=False) name = create_uuid() diff --git a/tests/unit/test_datasets.py b/tests/unit/test_datasets.py new file mode 100644 index 000000000..78fb87094 --- /dev/null +++ b/tests/unit/test_datasets.py @@ -0,0 +1,227 @@ +from datetime import datetime, timezone +from types import SimpleNamespace +from unittest.mock import Mock + +import pytest + +from langfuse._client.client import Langfuse +from langfuse.api import ( + DatasetItem, + DatasetItemMediaReference, + DatasetItemMediaReferenceField, + DatasetItemMediaReferenceMedia, + DatasetStatus, +) +from langfuse.media import LangfuseMedia, LangfuseMediaReference + + +@pytest.mark.parametrize( + ("field", "field_value", "json_path", "assert_resolved"), + [ + ( + DatasetItemMediaReferenceField.INPUT, + "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@", + "$", + lambda item: isinstance(item.input, LangfuseMediaReference), + ), + ( + DatasetItemMediaReferenceField.INPUT, + { + "image": "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + }, + "$['image']", + lambda item: isinstance(item.input["image"], LangfuseMediaReference), + ), + ( + DatasetItemMediaReferenceField.EXPECTED_OUTPUT, + ["@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@"], + "$[0]", + lambda item: isinstance( + item.expected_output[0], LangfuseMediaReference + ), + ), + ( + DatasetItemMediaReferenceField.METADATA, + { + "image'key": "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + }, + r"$['image\'key']", + lambda item: isinstance( + item.metadata["image'key"], LangfuseMediaReference + ), + ), + ], +) +def test_hydrate_dataset_item_media_references_supports_json_path_cases( + field, + field_value, + json_path, + assert_resolved, +): + reference_string = "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + item = DatasetItem( + id="item-id", + status=DatasetStatus.ACTIVE, + input=field_value if field == DatasetItemMediaReferenceField.INPUT else None, + expected_output=field_value + if field == DatasetItemMediaReferenceField.EXPECTED_OUTPUT + else None, + metadata=field_value if field == DatasetItemMediaReferenceField.METADATA else None, + dataset_id="dataset-id", + dataset_name="dataset-name", + created_at=datetime.now(timezone.utc), + updated_at=datetime.now(timezone.utc), + media_references=[ + DatasetItemMediaReference( + field=field, + reference_string=reference_string, + json_path=json_path, + media=DatasetItemMediaReferenceMedia( + media_id="media-id", + content_type="image/png", + content_length=7, + url="https://example.com/image.png", + url_expiry="2026-06-15T12:00:00.000Z", + ), + ) + ], + ) + + client = object.__new__(Langfuse) + + hydrated = client._hydrate_dataset_item_media_references(item) + + assert assert_resolved(hydrated) + + +def test_create_dataset_item_processes_media_before_api_call(): + media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") + root_media = LangfuseMedia(content_bytes=b"root", content_type="image/png") + + media_manager = Mock() + dataset_items_api = Mock() + dataset_items_api.create.return_value = "created-item" + + client = object.__new__(Langfuse) + client._resources = SimpleNamespace(_media_manager=media_manager) + client.api = SimpleNamespace(dataset_items=dataset_items_api) + input_data = {"image": media} + metadata = {"items": [media], "keep": "value"} + + result = client.create_dataset_item( + dataset_name="dataset", + input=input_data, + expected_output=root_media, + metadata=metadata, + ) + + assert result == "created-item" + assert input_data == {"image": media} + assert metadata == {"items": [media], "keep": "value"} + media_manager._upload_media_sync.assert_any_call(media=media) + media_manager._upload_media_sync.assert_any_call(media=root_media) + assert media_manager._upload_media_sync.call_count == 2 + dataset_items_api.create.assert_called_once_with( + dataset_name="dataset", + input={"image": media._reference_string}, + expected_output=root_media._reference_string, + metadata={"items": [media._reference_string], "keep": "value"}, + source_trace_id=None, + source_observation_id=None, + status=None, + id=None, + ) + + +def test_create_dataset_item_roundtrips_resolved_media_reference(): + # get_dataset(resolve_media_references=True) hydrates strings into + # LangfuseMediaReference instances. Feeding such an item back into + # create_dataset_item must re-emit the original reference string, otherwise + # the dataclass is serialized as an opaque dict and the media is orphaned. + reference_string = "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + item = DatasetItem( + id="item-id", + status=DatasetStatus.ACTIVE, + input={"image": reference_string}, + expected_output=None, + metadata=None, + dataset_id="dataset-id", + dataset_name="dataset-name", + created_at=datetime.now(timezone.utc), + updated_at=datetime.now(timezone.utc), + media_references=[ + DatasetItemMediaReference( + field=DatasetItemMediaReferenceField.INPUT, + reference_string=reference_string, + json_path="$['image']", + media=DatasetItemMediaReferenceMedia( + media_id="media-id", + content_type="image/png", + content_length=7, + url="https://example.com/image.png", + url_expiry="2026-06-15T12:00:00.000Z", + ), + ) + ], + ) + + client = object.__new__(Langfuse) + hydrated = client._hydrate_dataset_item_media_references(item) + assert isinstance(hydrated.input["image"], LangfuseMediaReference) + + media_manager = Mock() + dataset_items_api = Mock() + dataset_items_api.create.return_value = "created-item" + client._resources = SimpleNamespace(_media_manager=media_manager) + client.api = SimpleNamespace(dataset_items=dataset_items_api) + + client.create_dataset_item(dataset_name="dataset", input=hydrated.input) + + assert dataset_items_api.create.call_args.kwargs["input"] == { + "image": reference_string + } + media_manager._upload_media_sync.assert_not_called() + + +def test_create_dataset_item_processes_shared_media_subtrees(): + media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") + shared = {"image": media} + + media_manager = Mock() + dataset_items_api = Mock() + dataset_items_api.create.return_value = "created-item" + + client = object.__new__(Langfuse) + client._resources = SimpleNamespace(_media_manager=media_manager) + client.api = SimpleNamespace(dataset_items=dataset_items_api) + + client.create_dataset_item( + dataset_name="dataset", + input={"a": shared, "b": shared}, + ) + + assert shared == {"image": media} + media_manager._upload_media_sync.assert_called_once_with(media=media) + assert dataset_items_api.create.call_args.kwargs["input"] == { + "a": {"image": media._reference_string}, + "b": {"image": media._reference_string}, + } + + +def test_create_dataset_item_processes_media_in_sets(): + media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") + + media_manager = Mock() + dataset_items_api = Mock() + dataset_items_api.create.return_value = "created-item" + + client = object.__new__(Langfuse) + client._resources = SimpleNamespace(_media_manager=media_manager) + client.api = SimpleNamespace(dataset_items=dataset_items_api) + + client.create_dataset_item(dataset_name="dataset", input={"images": {media}}) + + media_manager._upload_media_sync.assert_called_once_with(media=media) + assert dataset_items_api.create.call_args.kwargs["input"] == { + "images": {media._reference_string} + } diff --git a/tests/unit/test_media.py b/tests/unit/test_media.py index 63df03920..d4b3c86cc 100644 --- a/tests/unit/test_media.py +++ b/tests/unit/test_media.py @@ -4,7 +4,8 @@ import pytest -from langfuse.media import LangfuseMedia +from langfuse._client.resource_manager import LangfuseResourceManager +from langfuse.media import LangfuseMedia, LangfuseMediaReference # Test data SAMPLE_JPEG_BYTES = b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01\x00H\x00H\x00\x00" @@ -107,6 +108,105 @@ def test_nonexistent_file(): assert media._content_type is None +def test_media_reference_fetch_uses_configured_httpx_client(monkeypatch): + response = Mock() + response.content = b"test-bytes" + response.raise_for_status.return_value = None + configured_httpx_client = Mock() + configured_httpx_client.get.return_value = response + httpx_get = Mock() + monkeypatch.setattr("langfuse.media.httpx.get", httpx_get) + monkeypatch.setattr( + LangfuseResourceManager, + "_instances", + {"pk-test": SimpleNamespace(httpx_client=configured_httpx_client)}, + ) + + reference = LangfuseMediaReference( + media_id="media-id", + content_type="image/jpeg", + url="https://example.com/test.jpg", + ) + + assert reference.fetch_bytes(timeout=12.5) == b"test-bytes" + configured_httpx_client.get.assert_called_once_with( + "https://example.com/test.jpg", timeout=12.5 + ) + httpx_get.assert_not_called() + + +def test_media_reference_fetch_uses_explicit_client(monkeypatch): + response = Mock() + response.content = b"explicit-bytes" + response.raise_for_status.return_value = None + explicit_client = Mock() + explicit_client.get.return_value = response + + singleton_client = Mock() + httpx_get = Mock() + monkeypatch.setattr("langfuse.media.httpx.get", httpx_get) + monkeypatch.setattr( + LangfuseResourceManager, + "_instances", + {"pk-test": SimpleNamespace(httpx_client=singleton_client)}, + ) + + reference = LangfuseMediaReference( + media_id="media-id", + content_type="image/jpeg", + url="https://example.com/test.jpg", + ) + + assert ( + reference.fetch_bytes(timeout=5.0, client=explicit_client) == b"explicit-bytes" + ) + explicit_client.get.assert_called_once_with( + "https://example.com/test.jpg", timeout=5.0 + ) + # Explicit client wins over the configured singleton and the default httpx. + singleton_client.get.assert_not_called() + httpx_get.assert_not_called() + + +def test_media_reference_fetch_falls_back_to_default_with_multiple_clients( + monkeypatch, caplog +): + import logging + + response = Mock() + response.content = b"default-bytes" + response.raise_for_status.return_value = None + httpx_get = Mock(return_value=response) + monkeypatch.setattr("langfuse.media.httpx.get", httpx_get) + + client_a = Mock() + client_b = Mock() + monkeypatch.setattr( + LangfuseResourceManager, + "_instances", + { + "pk-a": SimpleNamespace(httpx_client=client_a), + "pk-b": SimpleNamespace(httpx_client=client_b), + }, + ) + + reference = LangfuseMediaReference( + media_id="media-id", + content_type="image/jpeg", + url="https://example.com/test.jpg", + ) + + with caplog.at_level(logging.WARNING, logger="langfuse"): + assert reference.fetch_bytes(timeout=8.0) == b"default-bytes" + + # Ambiguous multi-client setup: warn and fall back to the default httpx + # instead of silently using an arbitrary instance's transport config. + assert "Multiple Langfuse clients" in caplog.text + httpx_get.assert_called_once_with("https://example.com/test.jpg", timeout=8.0) + client_a.get.assert_not_called() + client_b.get.assert_not_called() + + def test_resolve_media_references_uses_configured_httpx_client(): reference_string = "@@@langfuseMedia:type=image/jpeg|id=test-id|source=bytes@@@" fetch_timeout_seconds = 7 diff --git a/tests/unit/test_media_manager.py b/tests/unit/test_media_manager.py index 5e6a8c00a..3cf152106 100644 --- a/tests/unit/test_media_manager.py +++ b/tests/unit/test_media_manager.py @@ -142,6 +142,18 @@ def test_find_and_process_media_valid_base64_uri_is_processed(): assert not queue.empty() +def test_upload_media_sync_rejects_invalid_media(): + manager = MediaManager( + api_client=SimpleNamespace(media=Mock()), + httpx_client=Mock(), + media_upload_queue=Queue(), + ) + media = LangfuseMedia() + + with pytest.raises(ValueError, match="Cannot upload invalid LangfuseMedia"): + manager._upload_media_sync(media=media) + + def test_find_and_process_media_data_uri_without_comma_passes_through(): queue = Queue() manager = MediaManager( diff --git a/tests/unit/test_serializer.py b/tests/unit/test_serializer.py index f4c8dde86..6605485de 100644 --- a/tests/unit/test_serializer.py +++ b/tests/unit/test_serializer.py @@ -12,6 +12,7 @@ from langfuse._utils.serializer import ( EventSerializer, ) +from langfuse.media import LangfuseMediaReference class TestEnum(Enum): @@ -70,6 +71,30 @@ def test_pydantic_model(): assert json.loads(serializer.encode(model)) == {"field": "test"} +def test_langfuse_media_reference_serializes_to_reference_string(): + # Resolved references must round-trip back to their original reference string + # rather than falling through to asdict() and emitting an opaque dict. + reference_string = "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + ref = LangfuseMediaReference( + media_id="media-id", + content_type="image/png", + url="https://example.com/image.png", + reference_string=reference_string, + ) + serializer = EventSerializer() + assert serializer.encode(ref) == f'"{reference_string}"' + + +def test_langfuse_media_reference_without_reference_string_falls_back_to_dict(): + ref = LangfuseMediaReference( + media_id="media-id", + content_type="image/png", + url="https://example.com/image.png", + ) + serializer = EventSerializer() + assert json.loads(serializer.encode(ref))["media_id"] == "media-id" + + def test_path(): path = Path("/tmp/test.txt") serializer = EventSerializer() diff --git a/uv.lock b/uv.lock index 1dab4f017..9a219dbfe 100644 --- a/uv.lock +++ b/uv.lock @@ -3,7 +3,7 @@ revision = 3 requires-python = ">=3.10, <4.0" [options] -exclude-newer = "2026-06-09T08:44:29.596356662Z" +exclude-newer = "0001-01-01T00:00:00Z" # This has no effect and is included for backwards compatibility when using relative exclude-newer values. exclude-newer-span = "P7D" [[package]] @@ -469,6 +469,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/73/07/02e16ed01e04a374e644b575638ec7987ae846d25ad97bcc9945a3ee4b0e/jsonpatch-1.33-py2.py3-none-any.whl", hash = "sha256:0ae28c0cd062bbd8b8ecc26d7d164fbbea9652a1a3693f3b956c1eae5145dade", size = 12898, upload-time = "2023-06-16T21:01:28.466Z" }, ] +[[package]] +name = "jsonpath-ng" +version = "1.8.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/32/58/250751940d75c8019659e15482d548a4aa3b6ce122c515102a4bfdac50e3/jsonpath_ng-1.8.0.tar.gz", hash = "sha256:54252968134b5e549ea5b872f1df1168bd7defe1a52fed5a358c194e1943ddc3", size = 74513, upload-time = "2026-02-24T14:42:06.182Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/03/99/33c7d78a3fb70d545fd5411ac67a651c81602cc09c9cf0df383733f068c5/jsonpath_ng-1.8.0-py3-none-any.whl", hash = "sha256:b8dde192f8af58d646fc031fac9c99fe4d00326afc4148f1f043c601a8cfe138", size = 67844, upload-time = "2026-02-28T00:53:19.637Z" }, +] + [[package]] name = "jsonpointer" version = "3.1.0" @@ -559,6 +568,7 @@ source = { editable = "." } dependencies = [ { name = "backoff" }, { name = "httpx" }, + { name = "jsonpath-ng" }, { name = "opentelemetry-api" }, { name = "opentelemetry-exporter-otlp-proto-http" }, { name = "opentelemetry-sdk" }, @@ -593,6 +603,7 @@ docs = [ requires-dist = [ { name = "backoff", specifier = ">=1.10.0" }, { name = "httpx", specifier = ">=0.15.4,<1.0" }, + { name = "jsonpath-ng", specifier = ">=1.8.0,<2" }, { name = "opentelemetry-api", specifier = ">=1.33.1,<2" }, { name = "opentelemetry-exporter-otlp-proto-http", specifier = ">=1.33.1,<2" }, { name = "opentelemetry-sdk", specifier = ">=1.33.1,<2" },