diff --git a/langfuse/openai.py b/langfuse/openai.py index 96fd55ce0..0b092535a 100644 --- a/langfuse/openai.py +++ b/langfuse/openai.py @@ -185,6 +185,93 @@ class OpenAiDefinition: ] +_RESPONSES_PROMPT_FIELDS = ("tools", "tool_choice", "parallel_tool_calls") +_STRUCTURED_OUTPUT_METADATA_FIELDS = ("response_format", "text_format") + + +def _is_not_given(value: Any) -> bool: + return isinstance(value, NotGiven) + + +def _get_attr_or_item(value: Any, key: str, default: Any = None) -> Any: + if isinstance(value, dict): + return value.get(key, default) + + return getattr(value, key, default) + + +def _serialize_openai_value(value: Any) -> Any: + """Convert OpenAI SDK request/response wrapper values into plain data.""" + + if _is_not_given(value): + return None + + if isclass(value) and issubclass(value, BaseModel): + return value.model_json_schema() + + if isinstance(value, BaseModel): + value.model_rebuild() + + try: + return _serialize_openai_value( + value.model_dump(mode="json", warnings=False) + ) + except Exception: + return _serialize_openai_value(value.model_dump(warnings=False)) + + if isinstance(value, dict): + return { + key: _serialize_openai_value(val) + for key, val in value.items() + if not _is_not_given(val) + } + + if isinstance(value, (list, tuple)): + return [_serialize_openai_value(item) for item in value] + + return value + + +def _get_structured_output_metadata(metadata: Optional[Any], kwargs: Any) -> Any: + structured_output_metadata = {} + + for key in _STRUCTURED_OUTPUT_METADATA_FIELDS: + value = kwargs.get(key, None) + + if value is not None and not _is_not_given(value): + structured_output_metadata[key] = _serialize_openai_value(value) + + if not structured_output_metadata: + return metadata + + metadata_dict = ( + metadata.model_dump() if isinstance(metadata, BaseModel) else metadata + ) + + if metadata_dict is None: + metadata_dict = {} + + if not isinstance(metadata_dict, dict): + metadata_dict = {} + + return {**metadata_dict, **structured_output_metadata} + + +def _extract_response_api_completion(output: Any) -> Any: + output = _serialize_openai_value(output) + + if not isinstance(output, list): + return output + + if len(output) > 1: + return output + + if len(output) == 1: + return output[0] + + return None + + class OpenAiArgsExtractor: def __init__( self, @@ -199,17 +286,7 @@ def __init__( **kwargs: Any, ) -> None: self.args = {} - self.args["metadata"] = ( - metadata - if "response_format" not in kwargs - else { - **(metadata or {}), - "response_format": kwargs["response_format"].model_json_schema() - if isclass(kwargs["response_format"]) - and issubclass(kwargs["response_format"], BaseModel) - else kwargs["response_format"], - } - ) + self.args["metadata"] = _get_structured_output_metadata(metadata, kwargs) self.args["name"] = name self.args["langfuse_public_key"] = langfuse_public_key self.args["langfuse_prompt"] = langfuse_prompt @@ -231,7 +308,8 @@ def get_openai_args(self) -> Any: # OpenAI does not support non-string type values in metadata when using # model distillation feature - self.kwargs["metadata"].pop("response_format", None) + for key in _STRUCTURED_OUTPUT_METADATA_FIELDS: + self.kwargs["metadata"].pop(key, None) return self.kwargs @@ -249,6 +327,13 @@ def wrapper(wrapped: Any, instance: Any, args: Any, kwargs: Any) -> Any: def _extract_responses_prompt(kwargs: Any) -> Any: input_value = kwargs.get("input", None) instructions = kwargs.get("instructions", None) + prompt_fields = {} + + for key in _RESPONSES_PROMPT_FIELDS: + value = kwargs.get(key, None) + + if value is not None and not isinstance(value, NotGiven): + prompt_fields[key] = _serialize_openai_value(value) if isinstance(input_value, NotGiven): input_value = None @@ -257,21 +342,29 @@ def _extract_responses_prompt(kwargs: Any) -> Any: instructions = None if instructions is None: - return input_value - - if input_value is None: - return {"instructions": instructions} - - if isinstance(input_value, str): - return [ + prompt = input_value + elif input_value is None: + prompt = {"instructions": instructions} + elif isinstance(input_value, str): + prompt = [ {"role": "system", "content": instructions}, {"role": "user", "content": input_value}, ] + elif isinstance(input_value, list): + prompt = [{"role": "system", "content": instructions}, *input_value] + else: + prompt = {"instructions": instructions, "input": input_value} + + if not prompt_fields: + return prompt - if isinstance(input_value, list): - return [{"role": "system", "content": instructions}, *input_value] + if isinstance(prompt, dict) and set(prompt.keys()) <= {"instructions", "input"}: + return {**prompt, **prompt_fields} - return {"instructions": instructions, "input": input_value} + if prompt is not None: + return {"input": prompt, **prompt_fields} + + return prompt_fields def _extract_chat_prompt(kwargs: Any) -> Any: @@ -620,13 +713,7 @@ def _extract_streamed_response_api_response(chunks: Any) -> Any: metadata[key] = val if key == "output": - output = val - if not isinstance(output, list): - completion = output - elif len(output) > 1: - completion = output - elif len(output) == 1: - completion = output[0] + completion = _extract_response_api_completion(val) return (model, completion, usage, metadata) @@ -670,7 +757,8 @@ def _extract_streamed_openai_response(resource: Any, chunks: Any) -> Any: if completion["content"] is None else completion["content"] + delta.get("content", None) ) - elif delta.get("function_call", None) is not None: + + if delta.get("function_call", None) is not None: curr = completion["function_call"] tool_call_chunk = delta.get("function_call", None) @@ -686,70 +774,86 @@ def _extract_streamed_openai_response(resource: Any, chunks: Any) -> Any: ) curr["arguments"] += getattr(tool_call_chunk, "arguments", "") - elif ( + if ( delta.get("tool_calls", None) is not None and len(delta.get("tool_calls")) > 0 ): curr = completion["tool_calls"] - tool_call_chunk = getattr( - delta.get("tool_calls", None)[0], "function", None - ) if not curr: - completion["tool_calls"] = [ - { - "name": getattr(tool_call_chunk, "name", ""), - "arguments": getattr(tool_call_chunk, "arguments", ""), - } - ] - - elif getattr(tool_call_chunk, "name", None) is not None: - curr.append( - { - "name": getattr(tool_call_chunk, "name", None), - "arguments": getattr( - tool_call_chunk, "arguments", None - ), - } - ) + completion["tool_calls"] = [] + curr = completion["tool_calls"] - else: - curr[-1]["name"] = curr[-1]["name"] or getattr( - tool_call_chunk, "name", None + for raw_tool_call in delta.get("tool_calls", []): + index = _get_attr_or_item(raw_tool_call, "index", None) + + if not isinstance(index, int): + index = len(curr) - 1 if curr else 0 + + while len(curr) <= index: + curr.append({"function": {"name": "", "arguments": ""}}) + + current_tool_call = curr[index] + tool_call_id = _get_attr_or_item(raw_tool_call, "id", None) + tool_call_type = _get_attr_or_item(raw_tool_call, "type", None) + tool_call_chunk = _get_attr_or_item( + raw_tool_call, "function", None ) - if curr[-1]["arguments"] is None: - curr[-1]["arguments"] = "" + if tool_call_id is not None: + current_tool_call["id"] = tool_call_id + + if tool_call_type is not None: + current_tool_call["type"] = tool_call_type + + if tool_call_chunk is None: + continue - curr[-1]["arguments"] += getattr( - tool_call_chunk, "arguments", "" + function_call = current_tool_call.setdefault("function", {}) + tool_name = _get_attr_or_item(tool_call_chunk, "name", None) + tool_arguments = _get_attr_or_item( + tool_call_chunk, "arguments", None ) + if tool_name is not None: + function_call["name"] = ( + function_call.get("name") or tool_name + ) + + if tool_arguments is not None: + function_call["arguments"] = ( + function_call.get("arguments") or "" + ) + tool_arguments + if resource.type == "completion": completion += choice.get("text", "") def get_response_for_chat() -> Any: - return ( - completion["content"] - or ( - completion["function_call"] - and { - "role": "assistant", - "function_call": completion["function_call"], - } - ) - or ( - completion["tool_calls"] - and { - "role": "assistant", - # "tool_calls": [{"function": completion["tool_calls"]}], - "tool_calls": [ - {"function": data} for data in completion["tool_calls"] - ], - } - ) - or None - ) + content = completion["content"] + + if completion["tool_calls"]: + response = { + "role": "assistant", + "tool_calls": completion["tool_calls"], + } + + if content is not None: + response["content"] = content + + return response + + if completion["function_call"]: + response = { + "role": "assistant", + "function_call": completion["function_call"], + } + + if content is not None: + response["content"] = content + + return response + + return content or None return ( model, @@ -777,14 +881,7 @@ def _get_langfuse_data_from_default_response( completion = choice.text if _is_openai_v1() else choice.get("text", None) elif resource.object == "Responses" or resource.object == "AsyncResponses": - output = response.get("output", {}) - - if not isinstance(output, list): - completion = output - elif len(output) > 1: - completion = output - elif len(output) == 1: - completion = output[0] + completion = _extract_response_api_completion(response.get("output", {})) elif resource.type == "chat": choices = response.get("choices", []) diff --git a/tests/unit/test_openai.py b/tests/unit/test_openai.py index 72923f425..937c96bf0 100644 --- a/tests/unit/test_openai.py +++ b/tests/unit/test_openai.py @@ -3,6 +3,8 @@ from unittest.mock import patch import pytest +from openai.types.responses import ParsedResponseOutputMessage, ParsedResponseOutputText +from pydantic import BaseModel import langfuse.openai as lf_openai_module from langfuse._client.attributes import LangfuseOtelSpanAttributes @@ -159,6 +161,78 @@ def _make_chat_stream_chunks_with_trailing_content_filter_chunk(): ] +def _make_chat_stream_chunks_with_content_before_tool_call(): + usage = SimpleNamespace(prompt_tokens=10, completion_tokens=5, total_tokens=15) + + return [ + SimpleNamespace( + model="gpt-4o-mini", + choices=[ + SimpleNamespace( + delta=SimpleNamespace( + role="assistant", + content="\n\n", + function_call=None, + tool_calls=None, + ), + finish_reason=None, + ) + ], + usage=None, + ), + SimpleNamespace( + model="gpt-4o-mini", + choices=[ + SimpleNamespace( + delta=SimpleNamespace( + role=None, + content=None, + function_call=None, + tool_calls=[ + SimpleNamespace( + index=0, + id="call_weather", + type="function", + function=SimpleNamespace( + name="get_weather", + arguments='{"city"', + ), + ) + ], + ), + finish_reason=None, + ) + ], + usage=None, + ), + SimpleNamespace( + model="gpt-4o-mini", + choices=[ + SimpleNamespace( + delta=SimpleNamespace( + role=None, + content=None, + function_call=None, + tool_calls=[ + SimpleNamespace( + index=0, + id=None, + type=None, + function=SimpleNamespace( + name=None, + arguments=': "Berlin"}', + ), + ) + ], + ), + finish_reason="tool_calls", + ) + ], + usage=usage, + ), + ] + + def _make_single_chunk_stream(): return SimpleNamespace( model="gpt-4o-mini", @@ -237,6 +311,79 @@ def test_chat_completion_exports_generation_span( } +def test_streaming_chat_completion_preserves_tool_calls_after_content(): + model, completion, usage, metadata = ( + lf_openai_module._extract_streamed_openai_response( + SimpleNamespace(type="chat"), + _make_chat_stream_chunks_with_content_before_tool_call(), + ) + ) + + assert model == "gpt-4o-mini" + assert completion == { + "role": "assistant", + "content": "\n\n", + "tool_calls": [ + { + "id": "call_weather", + "type": "function", + "function": { + "name": "get_weather", + "arguments": '{"city": "Berlin"}', + }, + } + ], + } + assert usage.prompt_tokens == 10 + assert metadata == {"finish_reason": "tool_calls"} + + +def test_response_api_output_serializes_openai_parsed_response_objects(): + class ParsedOutput(BaseModel): + name: str + + _, completion, _ = lf_openai_module._get_langfuse_data_from_default_response( + SimpleNamespace(type="chat", object="Responses"), + { + "model": "gpt-4.1-mini", + "output": [ + ParsedResponseOutputMessage( + id="msg_1", + type="message", + role="assistant", + status="completed", + content=[ + ParsedResponseOutputText( + annotations=[], + text='{"name":"dave"}', + type="output_text", + parsed=ParsedOutput(name="dave"), + ) + ], + ) + ], + "usage": None, + }, + ) + + assert completion == { + "id": "msg_1", + "type": "message", + "role": "assistant", + "status": "completed", + "content": [ + { + "annotations": [], + "text": '{"name":"dave"}', + "type": "output_text", + "logprobs": None, + "parsed": {"name": "dave"}, + } + ], + "phase": None, + } + + def test_streaming_chat_completion_exports_ttft( langfuse_memory_client, get_span, json_attr ): diff --git a/tests/unit/test_openai_prompt_extraction.py b/tests/unit/test_openai_prompt_extraction.py index 9d1ab02cc..fd102f982 100644 --- a/tests/unit/test_openai_prompt_extraction.py +++ b/tests/unit/test_openai_prompt_extraction.py @@ -1,4 +1,5 @@ import pytest +from pydantic import BaseModel try: # Compatibility across OpenAI SDK versions where NOT_GIVEN export moved. @@ -6,7 +7,12 @@ except ImportError: from openai._types import NOT_GIVEN -from langfuse.openai import _extract_responses_prompt +from langfuse.openai import ( + OpenAiArgsExtractor, + OpenAiDefinition, + _extract_responses_prompt, + _get_langfuse_data_from_kwargs, +) @pytest.mark.parametrize( @@ -40,7 +46,72 @@ ), ({"instructions": NOT_GIVEN, "input": "Hello!"}, "Hello!"), ({"instructions": NOT_GIVEN, "input": NOT_GIVEN}, None), + ( + { + "input": "Search for the weather in Berlin.", + "tools": [ + { + "type": "function", + "name": "get_weather", + "parameters": {"type": "object"}, + } + ], + "tool_choice": "auto", + "parallel_tool_calls": False, + }, + { + "input": "Search for the weather in Berlin.", + "tools": [ + { + "type": "function", + "name": "get_weather", + "parameters": {"type": "object"}, + } + ], + "tool_choice": "auto", + "parallel_tool_calls": False, + }, + ), + ( + { + "instructions": "You are helpful.", + "input": "Hello!", + "tools": [{"type": "web_search_preview"}], + }, + { + "input": [ + {"role": "system", "content": "You are helpful."}, + {"role": "user", "content": "Hello!"}, + ], + "tools": [{"type": "web_search_preview"}], + }, + ), ], ) def test_extract_responses_prompt(kwargs, expected): assert _extract_responses_prompt(kwargs) == expected + + +def test_responses_parse_text_format_is_captured_as_metadata(): + class ResponseFormat(BaseModel): + name: str + + resource = OpenAiDefinition( + module="", + object="Responses", + method="parse", + type="chat", + sync=True, + ) + args = OpenAiArgsExtractor( + model="gpt-4.1", + input="What is your name?", + text_format=ResponseFormat, + ).get_langfuse_args() + + langfuse_data = _get_langfuse_data_from_kwargs(resource, args) + + assert ( + langfuse_data["metadata"]["text_format"]["properties"]["name"]["type"] + == "string" + )