Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions langfuse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
LangfuseTool,
)
from ._version import __version__
from .media import LangfuseMedia, LangfuseMediaReference
from .span_filter import (
KNOWN_LLM_INSTRUMENTATION_SCOPE_PREFIXES,
is_default_export_span,
Expand All @@ -49,6 +50,8 @@

__all__ = [
"Langfuse",
"LangfuseMedia",
"LangfuseMediaReference",
"get_client",
"observe",
"propagate_attributes",
Expand Down
165 changes: 159 additions & 6 deletions langfuse/_client/client.py

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actual (non generated changes)

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import backoff
import httpx
from jsonpath_ng.ext import parse as parse_jsonpath # type: ignore[import-untyped]
from opentelemetry import context as otel_context_api
from opentelemetry import trace as otel_trace_api
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
Expand Down Expand Up @@ -94,6 +95,7 @@
CreateTextPromptRequest,
Dataset,
DatasetItem,
DatasetItemMediaReferenceField,
DatasetRunWithItems,
DatasetStatus,
DeleteDatasetRunResponse,
Expand Down Expand Up @@ -126,7 +128,7 @@
_run_task,
)
from langfuse.logger import langfuse_logger
from langfuse.media import LangfuseMedia
from langfuse.media import LangfuseMedia, LangfuseMediaReference
from langfuse.model import (
ChatMessageDict,
ChatMessageWithPlaceholdersDict,
Expand Down Expand Up @@ -2322,15 +2324,17 @@ def get_dataset(
*,
fetch_items_page_size: Optional[int] = 50,
version: Optional[datetime] = None,
resolve_media_references: bool = False,
) -> "DatasetClient":
"""Fetch a dataset by its name.

Args:
name (str): The name of the dataset to fetch.
fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50.
version (Optional[datetime]): Retrieve dataset items as they existed at this specific point in time (UTC).
name: The name of the dataset to fetch.
fetch_items_page_size: All items of the dataset will be fetched in chunks of this size. Defaults to 50.
version: Retrieve dataset items as they existed at this specific point in time (UTC).
If provided, returns the state of items at the specified UTC timestamp.
If not provided, returns the latest version. Must be a timezone-aware datetime object in UTC.
resolve_media_references: If true, resolve media reference strings in dataset items to LangfuseMediaReference objects.

Returns:
DatasetClient: The dataset with the given name.
Expand All @@ -2339,7 +2343,7 @@ def get_dataset(
langfuse_logger.debug(f"Getting datasets {name}")
dataset = self.api.datasets.get(dataset_name=self._url_encode(name))

dataset_items = []
dataset_items: List[DatasetItem] = []
page = 1

while True:
Expand All @@ -2348,8 +2352,16 @@ def get_dataset(
page=page,
limit=fetch_items_page_size,
version=version,
include_media_references=resolve_media_references or None,
)
dataset_items.extend(
[
self._hydrate_dataset_item_media_references(item)
for item in new_items.data
]
if resolve_media_references
else new_items.data
)
dataset_items.extend(new_items.data)

if new_items.meta.total_pages <= page:
break
Expand Down Expand Up @@ -3355,6 +3367,17 @@ def create_dataset_item(
try:
langfuse_logger.debug(f"Creating dataset item for dataset {dataset_name}")

uploaded_media_ids: set[str] = set()
input = self._process_dataset_item_media(
data=input, uploaded_media_ids=uploaded_media_ids
)
expected_output = self._process_dataset_item_media(
data=expected_output, uploaded_media_ids=uploaded_media_ids
)
metadata = self._process_dataset_item_media(
data=metadata, uploaded_media_ids=uploaded_media_ids
)

result = self.api.dataset_items.create(
dataset_name=dataset_name,
input=input,
Expand All @@ -3371,6 +3394,136 @@ def create_dataset_item(
handle_fern_exception(e)
raise e

def _process_dataset_item_media(
self, *, data: Any, uploaded_media_ids: set[str]
) -> Any:
if self._resources is None:
return data

max_levels = 10

def _process_data_recursively(
data: Any, level: int, ancestor_container_ids: set[int]
) -> Any:
# Avoid jsonpath-ng here: dataset writes should keep working
# under python -OO where parser docstrings may be stripped.
Comment on lines +3408 to +3409

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in my opinion an okay compromise as the write path doesn't become a hard blocker for users and easy to maintain

Avoid hand rolling jsonpath parsing for the read path -> users can also opt out here so in my opinion fine

if isinstance(data, LangfuseMedia):
return self._upload_dataset_item_media(
media=data, uploaded_media_ids=uploaded_media_ids
)

if isinstance(data, LangfuseMediaReference):
return data.reference_string if data.reference_string else data

# Tuples are intentionally excluded: namedtuple subclasses can't be
# rebuilt from an iterable, so media inside them is left untouched.
if not isinstance(data, (list, set, frozenset, dict)):
return data

# Container ids only protect against recursive cycles; media upload
# dedupe is handled by uploaded_media_ids.
data_id = id(data)
if data_id in ancestor_container_ids or level > max_levels:
return data

next_ancestor_container_ids = ancestor_container_ids | {data_id}

if isinstance(data, (list, set, frozenset)):
processed = (
_process_data_recursively(
item, level + 1, next_ancestor_container_ids
)
for item in data
)
return type(data)(processed)

return {
key: _process_data_recursively(
value, level + 1, next_ancestor_container_ids
Comment thread
wochinge marked this conversation as resolved.
)
for key, value in data.items()
}

return _process_data_recursively(data, 1, set())

def _upload_dataset_item_media(
self, *, media: LangfuseMedia, uploaded_media_ids: set[str]
) -> str:
reference_string = media._reference_string
media_id = media._media_id

if reference_string is None or media_id is None:
raise ValueError("Cannot create dataset item with invalid LangfuseMedia.")

if media_id not in uploaded_media_ids:
assert self._resources is not None
self._resources._media_manager._upload_media_sync(media=media)
uploaded_media_ids.add(media_id)

return reference_string

def _hydrate_dataset_item_media_references(self, item: DatasetItem) -> DatasetItem:
media_references = item.media_references or []
if not media_references:
return item

# Map the API enum member to the snake_case model attribute so this keeps
# working regardless of the enum's wire value (e.g. "expectedOutput").
attr_by_field = {
DatasetItemMediaReferenceField.INPUT: "input",
DatasetItemMediaReferenceField.EXPECTED_OUTPUT: "expected_output",
DatasetItemMediaReferenceField.METADATA: "metadata",
}
hydrated_fields = {
"input": item.input,
"expected_output": item.expected_output,
"metadata": item.metadata,
}

for media_reference in media_references:
media = media_reference.media
if media is None:
continue

field = attr_by_field.get(media_reference.field)
if field is None:
continue

replacement = LangfuseMediaReference(
media_id=media.media_id,
content_type=media.content_type,
url=media.url,
url_expiry=media.url_expiry,
content_length=media.content_length,
reference_string=media_reference.reference_string,
)
Comment thread
claude[bot] marked this conversation as resolved.
hydrated_fields[field] = self._replace_json_path_value(
value=hydrated_fields[field],
json_path=media_reference.json_path,
replacement=replacement,
)

return item.model_copy(
update={
"input": hydrated_fields["input"],
"expected_output": hydrated_fields["expected_output"],
"metadata": hydrated_fields["metadata"],
}
)

def _replace_json_path_value(
self, *, value: Any, json_path: str, replacement: LangfuseMediaReference
) -> Any:
try:
value = parse_jsonpath(json_path).update(value, replacement)
except Exception as e:
langfuse_logger.warning(
f"Failed to hydrate dataset media reference at JSONPath {json_path}",
exc_info=e,
)

return value
Comment thread
wochinge marked this conversation as resolved.

def resolve_media_references(
self,
*,
Comment thread
claude[bot] marked this conversation as resolved.
Expand Down
23 changes: 23 additions & 0 deletions langfuse/_client/resource_manager.py

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actual (non generated changes)

Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,29 @@ class LangfuseResourceManager:
_instances: Dict[str, "LangfuseResourceManager"] = {}
_lock = threading.RLock()

@classmethod
def get_singleton_httpx_client(cls) -> Optional[httpx.Client]:
with cls._lock:
instances = list(cls._instances.values())

if not instances:
return None

if len(instances) > 1:
# Mirror get_client's safety stance: with multiple clients we
# cannot tell which one produced a given reference, so fall back
# to a default httpx client rather than silently using an
# arbitrary instance's transport config (proxy / CA / mTLS).
langfuse_logger.warning(
"Multiple Langfuse clients are instantiated; falling back to a "
"default httpx client for LangfuseMediaReference fetches. Pass an "
"explicit `client` to fetch_bytes/fetch_base64/fetch_data_uri to "
"honor per-client transport settings."
)
return None

return instances[0].httpx_client
Comment thread
claude[bot] marked this conversation as resolved.

def __new__(
cls,
*,
Expand Down
28 changes: 28 additions & 0 deletions langfuse/_task_manager/media_manager.py

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actual (non generated changes)

Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,34 @@ def _process_media(
f"Media processing error: Failed to process media_id={media._media_id} for trace_id={trace_id}. Error: {str(e)}"
)

def _upload_media_sync(self, *, media: LangfuseMedia) -> None:
if not self._enabled:
raise ValueError("Cannot upload LangfuseMedia while media upload is disabled.")

if (
media._content_length is None
or media._content_type is None
or media._content_sha256_hash is None
or media._content_bytes is None
):
raise ValueError("Cannot upload invalid LangfuseMedia.")

if media._media_id is None:
raise ValueError("Cannot upload LangfuseMedia without media ID.")

upload_media_job = UploadMediaJob(
media_id=media._media_id,
content_bytes=media._content_bytes,
content_type=media._content_type,
content_length=media._content_length,
content_sha256_hash=media._content_sha256_hash,
trace_id=None,
observation_id=None,
field=None,
)

self._process_upload_media_job(data=upload_media_job)
Comment thread
wochinge marked this conversation as resolved.

def _process_upload_media_job(
self,
*,
Expand Down
4 changes: 2 additions & 2 deletions langfuse/_task_manager/media_upload_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ class UploadMediaJob(TypedDict):
content_length: int
content_bytes: bytes
content_sha256_hash: str
trace_id: str
trace_id: Optional[str]
observation_id: Optional[str]
field: str
field: Optional[str]
8 changes: 7 additions & 1 deletion langfuse/_utils/serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from pydantic import BaseModel

from langfuse.media import LangfuseMedia
from langfuse.media import LangfuseMedia, LangfuseMediaReference

# Attempt to import Serializable
try:
Expand Down Expand Up @@ -59,12 +59,18 @@
if isinstance(obj, LangfuseMedia):
return (
obj._reference_string
or f"<Upload handling failed for LangfuseMedia of type {obj._content_type}>"
)

if (
isinstance(obj, LangfuseMediaReference)
and obj.reference_string is not None
):
return obj.reference_string

# Check if numpy is available and if the object is a numpy scalar
# If so, convert it to a Python scalar using the item() method
if np is not None and isinstance(obj, np.generic):

Check failure on line 73 in langfuse/_utils/serializer.py

View check run for this annotation

Claude / Claude Code Review

EventSerializer LMR opacity bypassed by parent dataclass/BaseModel via asdict/model_dump

The new `LangfuseMediaReference` opacity branch at serializer.py:65-69 only fires when an LMR is dispatched directly to `default()` (top-level or as a dict/list element). When an LMR is nested inside a user `@dataclass` or `pydantic.BaseModel`, the existing `is_dataclass: return asdict(obj)` branch at line 130 (and the `model_dump()` branch at 132-139) does CPython's recursive conversion that never re-enters `default()`, so the nested LMR (itself a frozen dataclass) is emitted as a flat dict exp
Comment on lines 62 to 73

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 The new LangfuseMediaReference opacity branch at serializer.py:65-69 only fires when an LMR is dispatched directly to default() (top-level or as a dict/list element). When an LMR is nested inside a user @dataclass or pydantic.BaseModel, the existing is_dataclass: return asdict(obj) branch at line 130 (and the model_dump() branch at 132-139) does CPython's recursive conversion that never re-enters default(), so the nested LMR (itself a frozen dataclass) is emitted as a flat dict exposing url, url_expiry, content_length, and reference_string as raw JSON fields — defeating the opacity this PR was explicitly added to provide. Fix: in the dataclass and BaseModel branches, iterate fields manually and route each value through self.default() so nested LMRs (and LangfuseMedia) hit the opacity branches.

Extended reasoning...

What the bug is

This PR adds a new opacity branch in EventSerializer._default_inner for LangfuseMediaReference (serializer.py:65-69), mirroring the existing branch for LangfuseMedia at 59-63. Both are meant to collapse the rich media object back to its @@@langfuseMedia:...@@@ reference string instead of letting CPython's default object serialization leak the signed URL and other transport-only fields.

That branch only fires when the LMR is dispatched directly to default() — i.e., top-level, or as a dict/list element (since the dict/list branches at lines 148-152 explicitly call self.default(v) on each value). But the dataclass branch (return asdict(obj), line 130) and the Pydantic BaseModel branch (return obj.model_dump(), line 139) do CPython-native recursive conversion that never calls back into default(). LangfuseMediaReference is itself a @dataclass(frozen=True), so when wrapped in a parent dataclass asdict walks into it and emits its fields as a flat dict.

Why existing safeguards don't catch it

  • The new unit test test_langfuse_media_reference_serializes_to_reference_string only verifies the top-level case.
  • The existing LangfuseMedia branch has the exact same blind spot but is partially masked by the trace-side flow: MediaManager._find_and_process_media traverses dicts/lists and replaces nested LangfuseMedia instances with reference-string equivalents before EventSerializer ever sees them. There's no equivalent traversal for LangfuseMediaReference.
  • Same shape exists on the write path via fern's jsonable_encoder (langfuse/api/core/jsonable_encoder.py), which also uses asdict / model_dump — so the same nested-LMR-in-user-dataclass leak reaches create_dataset_item API bodies and persists in stored dataset items, breaking the read-side round-trip guarantee that bug Bump langchain from 0.0.237 to 0.0.262 #5's reference_string field added for direct-LMR-in-dict.

Step-by-step proof

Verified empirically against current HEAD:

@dataclass
class Container:
    image_ref: LangfuseMediaReference
    note: str = 'hello'

lmr = LangfuseMediaReference(
    media_id='abc', content_type='image/png',
    url='https://signed.example.com/SECRET_TOKEN',
    url_expiry='2026-06-15T12:00:00Z', content_length=1234,
    reference_string='@@@langfuseMedia:type=image/png|id=abc|source=bytes@@@',
)
s = EventSerializer()

print(s.encode(lmr))
# "@@@langfuseMedia:type=image/png|id=abc|source=bytes@@@"        (opacity preserved)

print(s.encode({'image_ref': lmr}))
# {"image_ref": "@@@langfuseMedia:..."}                            (opacity preserved)

print(s.encode(Container(image_ref=lmr)))
# {"image_ref": {"media_id": "abc", "content_type": "image/png",
#                "url": "https://signed.example.com/SECRET_TOKEN",
#                "url_expiry": "...", "content_length": 1234,
#                "reference_string": "@@@..."}, "note": "hello"}   (URL LEAKED)

Same shape for pydantic.BaseModel: s.encode(PydanticContainer(image_ref=lmr)) emits the identical flat-dict structure with the signed URL inlined.

Impact

The signed url and url_expiry end up serialized into trace data and (via fern's encoder on the write path) into create_dataset_item API bodies / stored dataset items, even though this PR explicitly added the opacity branch to prevent that.

The round-trip case from bug #5's fix breaks specifically when an LMR is wrapped in a user dataclass/Pydantic model: the server's media-reference scanner still finds the @@@langfuseMedia:...@@@ pattern inside the persisted reference_string field on hydration, so get_dataset(resolve_media_references=True) emits an LMR at $['wrap']['image_ref']['reference_string'] but leaves the sibling url/url_expiry/content_length strings stale — the user observes a mixed/confusing shape where item.input['wrap']['image_ref'] is a dict (not an LMR) and ['url'] is a soon-to-expire string from the original write.

The trigger is plausible: users with @observe-decorated functions returning typed results (dataclasses or Pydantic models) wrapping LMRs obtained from get_dataset(resolve_media_references=True) — a natural pattern in ML/LLM application code that builds domain types around SDK outputs.

How to fix

In the dataclass branch (and analogously for the Pydantic branch), iterate fields manually and route each value back through self.default() so nested LMRs / LangfuseMedia hit the opacity branches:

if is_dataclass(obj):
    from dataclasses import fields
    return {f.name: self.default(getattr(obj, f.name)) for f in fields(obj)}

if isinstance(obj, BaseModel):
    obj.model_rebuild()
    if isinstance(raw := getattr(obj, "raw", None), BaseModel):
        raw.model_rebuild()
    return {k: self.default(v) for k, v in obj.model_dump().items()}

The Pydantic case needs a small pre-pass or field iteration since model_dump() itself recursively converts; alternatively the LMR substitution can be done before model_dump() is called. Either approach preserves opacity for nested LMRs without regressing the LMR-in-dict round-trip the PR already restored.

(The write path — fern's jsonable_encoder — has the same structural issue and is worth tracking separately; the serializer fix alone closes the trace-side leak.)

return obj.item()

# Check if numpy is available and if the object is a numpy array
Expand Down
9 changes: 9 additions & 0 deletions langfuse/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@
CreateScoreValue,
Dataset,
DatasetItem,
DatasetItemMediaReference,
DatasetItemMediaReferenceField,
DatasetItemMediaReferenceMedia,
DatasetRun,
DatasetRunItem,
DatasetRunWithItems,
Expand Down Expand Up @@ -396,6 +399,9 @@
"CreateTextPromptType": ".prompts",
"Dataset": ".commons",
"DatasetItem": ".commons",
"DatasetItemMediaReference": ".commons",
"DatasetItemMediaReferenceField": ".commons",
"DatasetItemMediaReferenceMedia": ".commons",
"DatasetRun": ".commons",
"DatasetRunItem": ".commons",
"DatasetRunWithItems": ".commons",
Expand Down Expand Up @@ -717,6 +723,9 @@ def __dir__():
"CreateTextPromptType",
"Dataset",
"DatasetItem",
"DatasetItemMediaReference",
"DatasetItemMediaReferenceField",
"DatasetItemMediaReferenceMedia",
"DatasetRun",
"DatasetRunItem",
"DatasetRunWithItems",
Expand Down
Loading
Loading