Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 183 additions & 86 deletions langfuse/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,93 @@ class OpenAiDefinition:
]


_RESPONSES_PROMPT_FIELDS = ("tools", "tool_choice", "parallel_tool_calls")
_STRUCTURED_OUTPUT_METADATA_FIELDS = ("response_format", "text_format")


def _is_not_given(value: Any) -> bool:
return isinstance(value, NotGiven)


def _get_attr_or_item(value: Any, key: str, default: Any = None) -> Any:
if isinstance(value, dict):
return value.get(key, default)

return getattr(value, key, default)


def _serialize_openai_value(value: Any) -> Any:
"""Convert OpenAI SDK request/response wrapper values into plain data."""

if _is_not_given(value):
return None

if isclass(value) and issubclass(value, BaseModel):
return value.model_json_schema()

if isinstance(value, BaseModel):
value.model_rebuild()

try:
return _serialize_openai_value(
value.model_dump(mode="json", warnings=False)
)
except Exception:
return _serialize_openai_value(value.model_dump(warnings=False))
Comment on lines +219 to +220

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Fallback model_dump may return non-serialisable values

When model_dump(mode="json", warnings=False) raises, the bare except Exception silently retries with model_dump(warnings=False) (no mode="json"). That second call can return raw Python objects — datetime, UUID, Decimal, etc. — which then pass through _serialize_openai_value unchanged (they are not NotGiven, BaseModel, dict, or list) and land in the trace payload. Any downstream JSON serialiser will fail on those values, and the silent catch makes the failure hard to diagnose.

Prompt To Fix With AI
This is a comment left during a code review.
Path: langfuse/openai.py
Line: 219-220

Comment:
**Fallback `model_dump` may return non-serialisable values**

When `model_dump(mode="json", warnings=False)` raises, the bare `except Exception` silently retries with `model_dump(warnings=False)` (no `mode="json"`). That second call can return raw Python objects — `datetime`, `UUID`, `Decimal`, etc. — which then pass through `_serialize_openai_value` unchanged (they are not `NotGiven`, `BaseModel`, `dict`, or `list`) and land in the trace payload. Any downstream JSON serialiser will fail on those values, and the silent catch makes the failure hard to diagnose.

How can I resolve this? If you propose a fix, please make it concise.


if isinstance(value, dict):
return {
key: _serialize_openai_value(val)
for key, val in value.items()
if not _is_not_given(val)
}

if isinstance(value, (list, tuple)):
return [_serialize_openai_value(item) for item in value]

return value


def _get_structured_output_metadata(metadata: Optional[Any], kwargs: Any) -> Any:
structured_output_metadata = {}

for key in _STRUCTURED_OUTPUT_METADATA_FIELDS:
value = kwargs.get(key, None)

if value is not None and not _is_not_given(value):
structured_output_metadata[key] = _serialize_openai_value(value)

if not structured_output_metadata:
return metadata

metadata_dict = (
metadata.model_dump() if isinstance(metadata, BaseModel) else metadata
Comment on lines +247 to +248

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 model_dump() without mode="json" may embed non-serialisable objects in metadata

When the caller passes a Pydantic BaseModel as metadata, the conversion metadata.model_dump() is done without mode="json". This differs from _serialize_openai_value's own Pydantic handling (which uses mode="json" as the primary path), so datetime, Enum, and similar field types could appear as raw Python objects in the merged metadata dict that Langfuse eventually serialises.

Prompt To Fix With AI
This is a comment left during a code review.
Path: langfuse/openai.py
Line: 247-248

Comment:
**`model_dump()` without `mode="json"` may embed non-serialisable objects in metadata**

When the caller passes a Pydantic `BaseModel` as `metadata`, the conversion `metadata.model_dump()` is done without `mode="json"`. This differs from `_serialize_openai_value`'s own Pydantic handling (which uses `mode="json"` as the primary path), so `datetime`, `Enum`, and similar field types could appear as raw Python objects in the merged metadata dict that Langfuse eventually serialises.

How can I resolve this? If you propose a fix, please make it concise.

)

if metadata_dict is None:
metadata_dict = {}

if not isinstance(metadata_dict, dict):
metadata_dict = {}

return {**metadata_dict, **structured_output_metadata}


def _extract_response_api_completion(output: Any) -> Any:
output = _serialize_openai_value(output)

if not isinstance(output, list):
return output

if len(output) > 1:
return output

if len(output) == 1:
return output[0]

return None


class OpenAiArgsExtractor:
def __init__(
self,
Expand All @@ -199,17 +286,7 @@ def __init__(
**kwargs: Any,
) -> None:
self.args = {}
self.args["metadata"] = (
metadata
if "response_format" not in kwargs
else {
**(metadata or {}),
"response_format": kwargs["response_format"].model_json_schema()
if isclass(kwargs["response_format"])
and issubclass(kwargs["response_format"], BaseModel)
else kwargs["response_format"],
}
)
self.args["metadata"] = _get_structured_output_metadata(metadata, kwargs)
self.args["name"] = name
self.args["langfuse_public_key"] = langfuse_public_key
self.args["langfuse_prompt"] = langfuse_prompt
Expand All @@ -231,7 +308,8 @@ def get_openai_args(self) -> Any:

# OpenAI does not support non-string type values in metadata when using
# model distillation feature
self.kwargs["metadata"].pop("response_format", None)
for key in _STRUCTURED_OUTPUT_METADATA_FIELDS:
self.kwargs["metadata"].pop(key, None)
Comment on lines +311 to +312

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve user text_format metadata when storing

When store=True is used without a structured text_format kwarg, callers can still pass valid OpenAI metadata such as metadata={"text_format": "plain"}. This loop now blindly removes text_format from the metadata dict before forwarding the request, and when no structured output metadata was injected it also mutates the caller's original dict, so valid distillation metadata is silently dropped; only the SDK-injected structured field should be removed when text_format was actually supplied as an OpenAI argument.

Useful? React with 👍 / 👎.


return self.kwargs

Expand All @@ -249,6 +327,13 @@ def wrapper(wrapped: Any, instance: Any, args: Any, kwargs: Any) -> Any:
def _extract_responses_prompt(kwargs: Any) -> Any:
input_value = kwargs.get("input", None)
instructions = kwargs.get("instructions", None)
prompt_fields = {}

for key in _RESPONSES_PROMPT_FIELDS:
value = kwargs.get(key, None)

if value is not None and not isinstance(value, NotGiven):
prompt_fields[key] = _serialize_openai_value(value)

if isinstance(input_value, NotGiven):
input_value = None
Expand All @@ -257,21 +342,29 @@ def _extract_responses_prompt(kwargs: Any) -> Any:
instructions = None

if instructions is None:
return input_value

if input_value is None:
return {"instructions": instructions}

if isinstance(input_value, str):
return [
prompt = input_value
elif input_value is None:
prompt = {"instructions": instructions}
elif isinstance(input_value, str):
prompt = [
{"role": "system", "content": instructions},
{"role": "user", "content": input_value},
]
elif isinstance(input_value, list):
prompt = [{"role": "system", "content": instructions}, *input_value]
else:
prompt = {"instructions": instructions, "input": input_value}

if not prompt_fields:
return prompt

if isinstance(input_value, list):
return [{"role": "system", "content": instructions}, *input_value]
if isinstance(prompt, dict) and set(prompt.keys()) <= {"instructions", "input"}:
return {**prompt, **prompt_fields}

return {"instructions": instructions, "input": input_value}
if prompt is not None:
return {"input": prompt, **prompt_fields}

return prompt_fields


def _extract_chat_prompt(kwargs: Any) -> Any:
Expand Down Expand Up @@ -620,13 +713,7 @@ def _extract_streamed_response_api_response(chunks: Any) -> Any:
metadata[key] = val

if key == "output":
output = val
if not isinstance(output, list):
completion = output
elif len(output) > 1:
completion = output
elif len(output) == 1:
completion = output[0]
completion = _extract_response_api_completion(val)

return (model, completion, usage, metadata)

Expand Down Expand Up @@ -670,7 +757,8 @@ def _extract_streamed_openai_response(resource: Any, chunks: Any) -> Any:
if completion["content"] is None
else completion["content"] + delta.get("content", None)
)
elif delta.get("function_call", None) is not None:

if delta.get("function_call", None) is not None:
curr = completion["function_call"]
tool_call_chunk = delta.get("function_call", None)

Expand All @@ -686,70 +774,86 @@ def _extract_streamed_openai_response(resource: Any, chunks: Any) -> Any:
)
curr["arguments"] += getattr(tool_call_chunk, "arguments", "")

elif (
if (
delta.get("tool_calls", None) is not None
and len(delta.get("tool_calls")) > 0
):
curr = completion["tool_calls"]
tool_call_chunk = getattr(
delta.get("tool_calls", None)[0], "function", None
)

if not curr:
completion["tool_calls"] = [
{
"name": getattr(tool_call_chunk, "name", ""),
"arguments": getattr(tool_call_chunk, "arguments", ""),
}
]

elif getattr(tool_call_chunk, "name", None) is not None:
curr.append(
{
"name": getattr(tool_call_chunk, "name", None),
"arguments": getattr(
tool_call_chunk, "arguments", None
),
}
)
completion["tool_calls"] = []
curr = completion["tool_calls"]

else:
curr[-1]["name"] = curr[-1]["name"] or getattr(
tool_call_chunk, "name", None
for raw_tool_call in delta.get("tool_calls", []):
index = _get_attr_or_item(raw_tool_call, "index", None)

if not isinstance(index, int):
index = len(curr) - 1 if curr else 0

while len(curr) <= index:
curr.append({"function": {"name": "", "arguments": ""}})

current_tool_call = curr[index]
tool_call_id = _get_attr_or_item(raw_tool_call, "id", None)
tool_call_type = _get_attr_or_item(raw_tool_call, "type", None)
tool_call_chunk = _get_attr_or_item(
raw_tool_call, "function", None
)

if curr[-1]["arguments"] is None:
curr[-1]["arguments"] = ""
if tool_call_id is not None:
current_tool_call["id"] = tool_call_id

if tool_call_type is not None:
current_tool_call["type"] = tool_call_type

if tool_call_chunk is None:
continue

curr[-1]["arguments"] += getattr(
tool_call_chunk, "arguments", ""
function_call = current_tool_call.setdefault("function", {})
tool_name = _get_attr_or_item(tool_call_chunk, "name", None)
tool_arguments = _get_attr_or_item(
tool_call_chunk, "arguments", None
)

if tool_name is not None:
function_call["name"] = (
function_call.get("name") or tool_name
)

if tool_arguments is not None:
function_call["arguments"] = (
function_call.get("arguments") or ""
) + tool_arguments

if resource.type == "completion":
completion += choice.get("text", "")

def get_response_for_chat() -> Any:
return (
completion["content"]
or (
completion["function_call"]
and {
"role": "assistant",
"function_call": completion["function_call"],
}
)
or (
completion["tool_calls"]
and {
"role": "assistant",
# "tool_calls": [{"function": completion["tool_calls"]}],
"tool_calls": [
{"function": data} for data in completion["tool_calls"]
],
}
)
or None
)
content = completion["content"]

if completion["tool_calls"]:
response = {
"role": "assistant",
"tool_calls": completion["tool_calls"],
}

if content is not None:
response["content"] = content

return response

if completion["function_call"]:
response = {
"role": "assistant",
"function_call": completion["function_call"],
}

if content is not None:
response["content"] = content

return response

return content or None

return (
model,
Expand Down Expand Up @@ -777,14 +881,7 @@ def _get_langfuse_data_from_default_response(
completion = choice.text if _is_openai_v1() else choice.get("text", None)

elif resource.object == "Responses" or resource.object == "AsyncResponses":
output = response.get("output", {})

if not isinstance(output, list):
completion = output
elif len(output) > 1:
completion = output
elif len(output) == 1:
completion = output[0]
completion = _extract_response_api_completion(response.get("output", {}))

elif resource.type == "chat":
choices = response.get("choices", [])
Expand Down
Loading
Loading