From cb0156f16146c40297bb613af3e8d0b52a3d391b Mon Sep 17 00:00:00 2001 From: eternalcuriouslearner Date: Tue, 28 Apr 2026 21:42:33 -0400 Subject: [PATCH 01/12] wip: move the streams to a generic ABC wrapper. --- .../instrumentation/openai_v2/chat_buffers.py | 52 ++++ .../openai_v2/chat_wrappers.py | 221 ++++++++++++++ .../instrumentation/openai_v2/patch.py | 170 ++--------- .../instrumentation/openai_v2/utils.py | 45 +-- .../tests/test_async_chat_completions.py | 55 +++- .../tests/test_chat_completions.py | 44 +++ .../tests/test_choice_buffer.py | 2 +- .../src/opentelemetry/util/genai/_stream.py | 193 ++++++++++++ .../tests/test_stream.py | 286 ++++++++++++++++++ 9 files changed, 891 insertions(+), 177 deletions(-) create mode 100644 instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_buffers.py create mode 100644 instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py create mode 100644 util/opentelemetry-util-genai/src/opentelemetry/util/genai/_stream.py create mode 100644 util/opentelemetry-util-genai/tests/test_stream.py diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_buffers.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_buffers.py new file mode 100644 index 0000000000..bfa5d21a57 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_buffers.py @@ -0,0 +1,52 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
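+
+# Accumulators for streamed chat completion deltas. Each ChoiceBuffer
+# collects text fragments, the finish reason, and tool call deltas for one
+# choice index; fragments are joined only when the stream finalizes. For
+# example, deltas "Hel" and "lo" arriving on choice 0 are buffered here and
+# later emitted as a single "Hello" output message part.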
+ + +class ToolCallBuffer: + def __init__(self, index, tool_call_id, function_name): + self.index = index + self.function_name = function_name + self.tool_call_id = tool_call_id + self.arguments = [] + + def append_arguments(self, arguments): + if arguments is not None: + self.arguments.append(arguments) + + +class ChoiceBuffer: + def __init__(self, index): + self.index = index + self.finish_reason = None + self.text_content = [] + self.tool_calls_buffers = [] + + def append_text_content(self, content): + self.text_content.append(content) + + def append_tool_call(self, tool_call): + idx = tool_call.index + for _ in range(len(self.tool_calls_buffers), idx + 1): + self.tool_calls_buffers.append(None) + + function = tool_call.function + if not self.tool_calls_buffers[idx]: + self.tool_calls_buffers[idx] = ToolCallBuffer( + idx, + tool_call.id, + function.name if function else None, + ) + + if function: + self.tool_calls_buffers[idx].append_arguments(function.arguments) diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py new file mode 100644 index 0000000000..50764b02b7 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py @@ -0,0 +1,221 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
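+
+# OpenAI chat-completion stream wrappers. These adapt the SDK stream objects
+# to the GenAI invocation lifecycle: chunks are parsed into ChoiceBuffer
+# state as the caller iterates, and the InferenceInvocation is finalized
+# exactly once when the stream completes, is closed, or fails.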
+ +from __future__ import annotations + +import json +from typing import Any, Optional + +from openai import Stream + +from opentelemetry.semconv._incubating.attributes import ( + openai_attributes as OpenAIAttributes, +) +from opentelemetry.util.genai.invocation import InferenceInvocation +from opentelemetry.util.genai._stream import ( + AsyncStreamWrapper, + SyncStreamWrapper, +) +from opentelemetry.util.genai.types import ( + OutputMessage, + Text, + ToolCallRequest, +) + +from .chat_buffers import ChoiceBuffer + + +class ChatStreamWrapper(SyncStreamWrapper[Any]): + invocation: InferenceInvocation + response_id: Optional[str] = None + response_model: Optional[str] = None + service_tier: Optional[str] = None + finish_reasons: list = [] + prompt_tokens: Optional[int] = None + completion_tokens: Optional[int] = None + + def __init__( + self, + stream: Stream, + invocation: InferenceInvocation, + capture_content: bool, + ): + SyncStreamWrapper.__init__(self, stream) + self.stream = stream + self.invocation = invocation + self.choice_buffers = [] + self._started = True + self.capture_content = capture_content + + def _set_response_model(self, chunk): + if self.response_model: + return + + if getattr(chunk, "model", None): + self.response_model = chunk.model + + def _set_response_id(self, chunk): + if self.response_id: + return + + if getattr(chunk, "id", None): + self.response_id = chunk.id + + def _set_response_service_tier(self, chunk): + if self.service_tier: + return + + if getattr(chunk, "service_tier", None): + self.service_tier = chunk.service_tier + + def _build_streaming_response(self, chunk): + if getattr(chunk, "choices", None) is None: + return + + choices = chunk.choices + for choice in choices: + if not choice.delta: + continue + + for idx in range(len(self.choice_buffers), choice.index + 1): + self.choice_buffers.append(ChoiceBuffer(idx)) + + if choice.finish_reason: + self.choice_buffers[ + choice.index + ].finish_reason = choice.finish_reason + + if choice.delta.content is not None: + self.choice_buffers[choice.index].append_text_content( + choice.delta.content + ) + + if choice.delta.tool_calls is not None: + for tool_call in choice.delta.tool_calls: + self.choice_buffers[choice.index].append_tool_call( + tool_call + ) + + def _set_usage(self, chunk): + if getattr(chunk, "usage", None): + self.completion_tokens = chunk.usage.completion_tokens + self.prompt_tokens = chunk.usage.prompt_tokens + + def _process_chunk(self, chunk): + self._set_response_id(chunk) + self._set_response_model(chunk) + self._set_response_service_tier(chunk) + self._build_streaming_response(chunk) + self._set_usage(chunk) + + def _set_output_messages(self): + if not self.capture_content: # optimization + return + output_messages = [] + for choice in self.choice_buffers: + message = OutputMessage( + role="assistant", + finish_reason=choice.finish_reason or "error", + parts=[], + ) + if choice.text_content: + message.parts.append( + Text(content="".join(choice.text_content)) + ) + if choice.tool_calls_buffers: + tool_calls = [] + for tool_call in choice.tool_calls_buffers: + arguments = None + arguments_str = "".join(tool_call.arguments) + if arguments_str: + try: + arguments = json.loads(arguments_str) + except json.JSONDecodeError: + arguments = arguments_str + tool_call_part = ToolCallRequest( + name=tool_call.function_name, + id=tool_call.tool_call_id, + arguments=arguments, + ) + tool_calls.append(tool_call_part) + message.parts.extend(tool_calls) + output_messages.append(message) + + 
self.invocation.output_messages = output_messages + + def _stop_stream(self) -> None: + self.cleanup() + + def _fail_stream(self, error: BaseException) -> None: + self.cleanup(error) + + def parse(self): + """Called when using with_raw_response with stream=True.""" + return self + + def cleanup(self, error: Optional[BaseException] = None): + if not self._started: + return + + self.invocation.response_model_name = self.response_model + self.invocation.response_id = self.response_id + self.invocation.input_tokens = self.prompt_tokens + self.invocation.output_tokens = self.completion_tokens + # TODO: Derive finish_reasons from choice_buffers so streaming + # invocations match non-streaming response finalization. + self.invocation.finish_reasons = self.finish_reasons + if self.service_tier: + self.invocation.attributes.update( + { + OpenAIAttributes.OPENAI_RESPONSE_SERVICE_TIER: self.service_tier + }, + ) + + self._set_output_messages() + + if error: + self.invocation.fail(error) + else: + self.invocation.stop() + self._started = False + + +class AsyncChatStreamWrapper( + AsyncStreamWrapper[Any], + ChatStreamWrapper, +): + invocation: InferenceInvocation + + def __init__( + self, + stream: Stream, + invocation: InferenceInvocation, + capture_content: bool, + ): + ChatStreamWrapper.__init__(self, stream, invocation, capture_content) + + def _process_chunk(self, chunk): + ChatStreamWrapper._process_chunk(self, chunk) + + def _stop_stream(self) -> None: + ChatStreamWrapper._stop_stream(self) + + def _fail_stream(self, error: BaseException) -> None: + ChatStreamWrapper._fail_stream(self, error) + + +__all__ = [ + "AsyncChatStreamWrapper", + "ChatStreamWrapper", +] diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py index 30f74cfab1..a5b0b40afb 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py @@ -13,7 +13,6 @@ # limitations under the License. 
-import json from timeit import default_timer from typing import Any, Optional @@ -33,25 +32,23 @@ from opentelemetry.trace import Span, SpanKind, Tracer from opentelemetry.trace.propagation import set_span_in_context from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.invocation import InferenceInvocation from opentelemetry.util.genai.types import ( ContentCapturingMode, - Error, - LLMInvocation, # pylint: disable=no-name-in-module # TODO: migrate to InferenceInvocation - OutputMessage, - Text, - ToolCallRequest, ) +from .chat_buffers import ChoiceBuffer +from .chat_wrappers import AsyncChatStreamWrapper, ChatStreamWrapper from .instruments import Instruments from .utils import ( _prepare_output_messages, choice_to_event, - create_chat_invocation, get_llm_request_attributes, handle_span_exception, is_streaming, message_to_event, set_span_attribute, + start_chat_invocation, ) @@ -128,10 +125,8 @@ def chat_completions_create_v_new( capture_content = content_capturing_mode != ContentCapturingMode.NO_CONTENT def traced_method(wrapped, instance, args, kwargs): - chat_invocation = handler.start_llm( - create_chat_invocation( - kwargs, instance, capture_content=capture_content - ) + chat_invocation = start_chat_invocation( + handler, kwargs, instance, capture_content=capture_content ) try: @@ -143,18 +138,16 @@ def traced_method(wrapped, instance, args, kwargs): parsed_result = result if is_streaming(kwargs): return ChatStreamWrapper( - parsed_result, handler, chat_invocation, capture_content + parsed_result, chat_invocation, capture_content ) _set_response_properties( chat_invocation, parsed_result, capture_content ) - handler.stop_llm(chat_invocation) + chat_invocation.stop() return result except Exception as error: - handler.fail_llm( - chat_invocation, Error(type=type(error), message=str(error)) - ) + chat_invocation.fail(error) raise return traced_method @@ -232,10 +225,8 @@ def async_chat_completions_create_v_new( capture_content = content_capturing_mode != ContentCapturingMode.NO_CONTENT async def traced_method(wrapped, instance, args, kwargs): - chat_invocation = handler.start_llm( - create_chat_invocation( - kwargs, instance, capture_content=capture_content - ) + chat_invocation = start_chat_invocation( + handler, kwargs, instance, capture_content=capture_content ) try: @@ -246,20 +237,18 @@ async def traced_method(wrapped, instance, args, kwargs): else: parsed_result = result if is_streaming(kwargs): - return ChatStreamWrapper( - parsed_result, handler, chat_invocation, capture_content + return AsyncChatStreamWrapper( + parsed_result, chat_invocation, capture_content ) _set_response_properties( chat_invocation, parsed_result, capture_content ) - handler.stop_llm(chat_invocation) + chat_invocation.stop() return result except Exception as error: - handler.fail_llm( - chat_invocation, Error(type=type(error), message=str(error)) - ) + chat_invocation.fail(error) raise return traced_method @@ -495,8 +484,8 @@ def _set_response_attributes(span, result): def _set_response_properties( - chat_invocation: LLMInvocation, result, capture_content: bool -) -> LLMInvocation: + chat_invocation: InferenceInvocation, result, capture_content: bool +) -> InferenceInvocation: if getattr(result, "model", None): chat_invocation.response_model_name = result.model @@ -574,46 +563,6 @@ def _set_embeddings_response_attributes( # Don't set output tokens for embeddings as all tokens are input tokens -class ToolCallBuffer: - def __init__(self, index, tool_call_id, 
function_name): - self.index = index - self.function_name = function_name - self.tool_call_id = tool_call_id - self.arguments = [] - - def append_arguments(self, arguments): - if arguments is not None: - self.arguments.append(arguments) - - -class ChoiceBuffer: - def __init__(self, index): - self.index = index - self.finish_reason = None - self.text_content = [] - self.tool_calls_buffers = [] - - def append_text_content(self, content): - self.text_content.append(content) - - def append_tool_call(self, tool_call): - idx = tool_call.index - # make sure we have enough tool call buffers - for _ in range(len(self.tool_calls_buffers), idx + 1): - self.tool_calls_buffers.append(None) - - function = tool_call.function - if not self.tool_calls_buffers[idx]: - self.tool_calls_buffers[idx] = ToolCallBuffer( - idx, - tool_call.id, - function.name if function else None, - ) - - if function: - self.tool_calls_buffers[idx].append_arguments(function.arguments) - - class BaseStreamWrapper: response_id: Optional[str] = None response_model: Optional[str] = None @@ -865,88 +814,3 @@ def cleanup(self, error: Optional[BaseException] = None): else: self.span.end() self._started = False - - -class ChatStreamWrapper(BaseStreamWrapper): - handler: TelemetryHandler - invocation: LLMInvocation - response_id: Optional[str] = None - response_model: Optional[str] = None - service_tier: Optional[str] = None - finish_reasons: list = [] - prompt_tokens: Optional[int] = None - completion_tokens: Optional[int] = None - - def __init__( - self, - stream: Stream, - handler: TelemetryHandler, - invocation: LLMInvocation, - capture_content: bool, - ): - super().__init__(stream, capture_content=capture_content) - self.stream = stream - self.handler = handler - self.invocation = invocation - self.choice_buffers = [] - - def _set_output_messages(self): - if not self.capture_content: # optimization - return - output_messages = [] - for choice in self.choice_buffers: - message = OutputMessage( - role="assistant", - finish_reason=choice.finish_reason or "error", - parts=[], - ) - if choice.text_content: - message.parts.append( - Text(content="".join(choice.text_content)) - ) - if choice.tool_calls_buffers: - tool_calls = [] - for tool_call in choice.tool_calls_buffers: - arguments = None - arguments_str = "".join(tool_call.arguments) - if arguments_str: - try: - arguments = json.loads(arguments_str) - except json.JSONDecodeError: - arguments = arguments_str - tool_call_part = ToolCallRequest( - name=tool_call.function_name, - id=tool_call.tool_call_id, - arguments=arguments, - ) - tool_calls.append(tool_call_part) - message.parts.extend(tool_calls) - output_messages.append(message) - - self.invocation.output_messages = output_messages - - def cleanup(self, error: Optional[BaseException] = None): - if not self._started: - return - - self.invocation.response_model_name = self.response_model - self.invocation.response_id = self.response_id - self.invocation.input_tokens = self.prompt_tokens - self.invocation.output_tokens = self.completion_tokens - self.invocation.finish_reasons = self.finish_reasons - if self.service_tier: - self.invocation.attributes.update( - { - OpenAIAttributes.OPENAI_RESPONSE_SERVICE_TIER: self.service_tier - }, - ) - - self._set_output_messages() - - if error: - self.handler.fail_llm( - self.invocation, Error(type=type(error), message=str(error)) - ) - else: - self.handler.stop_llm(self.invocation) - self._started = False diff --git 
a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py index 128697fe12..e3192b7070 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py @@ -39,9 +39,10 @@ from opentelemetry.util.genai.environment_variables import ( OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, ) +from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.invocation import InferenceInvocation from opentelemetry.util.genai.types import ( InputMessage, - LLMInvocation, # pylint: disable=no-name-in-module # TODO: migrate to InferenceInvocation OutputMessage, Text, ToolCallRequest, @@ -327,35 +328,35 @@ def get_llm_request_attributes( return {k: v for k, v in attributes.items() if value_is_set(v)} -def create_chat_invocation( +def start_chat_invocation( + handler: TelemetryHandler, kwargs, client_instance, capture_content: bool, -) -> LLMInvocation: +) -> InferenceInvocation: # pylint: disable=too-many-branches - llm_invocation = LLMInvocation(request_model=kwargs.get("model", "")) - llm_invocation.provider = ( - GenAIAttributes.GenAiProviderNameValues.OPENAI.value + address, port = get_server_address_and_port(client_instance) + chat_invocation = handler.start_inference( + GenAIAttributes.GenAiProviderNameValues.OPENAI.value, + request_model=get_value(kwargs.get("model")), + server_address=address, + server_port=port, + ) + chat_invocation.temperature = get_value(kwargs.get("temperature")) + chat_invocation.top_p = get_value(kwargs.get("p") or kwargs.get("top_p")) + chat_invocation.max_tokens = get_value(kwargs.get("max_tokens")) + chat_invocation.presence_penalty = get_value( + kwargs.get("presence_penalty") ) - llm_invocation.temperature = get_value(kwargs.get("temperature")) - llm_invocation.top_p = get_value(kwargs.get("p") or kwargs.get("top_p")) - llm_invocation.max_tokens = get_value(kwargs.get("max_tokens")) - llm_invocation.presence_penalty = get_value(kwargs.get("presence_penalty")) - llm_invocation.frequency_penalty = get_value( + chat_invocation.frequency_penalty = get_value( kwargs.get("frequency_penalty") ) - llm_invocation.seed = get_value(kwargs.get("seed")) + chat_invocation.seed = get_value(kwargs.get("seed")) if (stop_sequences := get_value(kwargs.get("stop"))) is not None: if isinstance(stop_sequences, str): stop_sequences = [stop_sequences] - llm_invocation.stop_sequences = stop_sequences - - address, port = get_server_address_and_port(client_instance) - if address: - llm_invocation.server_address = address - if port: - llm_invocation.server_port = port + chat_invocation.stop_sequences = stop_sequences attributes = {} if (choice_count := get_value(kwargs.get("n"))) is not None: @@ -391,13 +392,13 @@ def create_chat_invocation( attributes[OpenAIAttributes.OPENAI_REQUEST_SERVICE_TIER] = service_tier if len(attributes) > 0: - llm_invocation.attributes = attributes + chat_invocation.attributes = attributes if capture_content: # optimization - llm_invocation.input_messages = _prepare_input_messages( + chat_invocation.input_messages = _prepare_input_messages( kwargs.get("messages", []) ) - return llm_invocation + return chat_invocation def get_value(v: Any): diff --git 
a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_async_chat_completions.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_async_chat_completions.py index c19a8b3d5b..2f4c916938 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_async_chat_completions.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_async_chat_completions.py @@ -197,6 +197,18 @@ async def test_async_chat_completion_404( assert "NotFoundError" == spans[0].attributes[ErrorAttributes.ERROR_TYPE] +@pytest.mark.asyncio() +async def test_async_chat_completion_api_exception_propagates( + async_openai_client, instrument_no_content, vcr +): + with vcr.use_cassette("test_async_chat_completion_404.yaml"): + with pytest.raises(NotFoundError): + await async_openai_client.chat.completions.create( + messages=USER_ONLY_PROMPT, + model="this-model-does-not-exist", + ) + + @pytest.mark.asyncio() async def test_async_chat_completion_extra_params( span_exporter, async_openai_client, instrument_no_content, vcr @@ -883,6 +895,44 @@ async def test_async_chat_completion_streaming( ) +@pytest.mark.asyncio() +async def test_async_chat_completion_streaming_user_exception_propagates( + span_exporter, + async_openai_client, + instrument_with_content, + vcr, +): + latest_experimental_enabled = is_experimental_mode() + llm_model_value = "gpt-4" + kwargs = { + "model": llm_model_value, + "messages": USER_ONLY_PROMPT, + "stream": True, + "stream_options": {"include_usage": True}, + } + response_stream_model = None + response_stream_id = None + + with vcr.use_cassette("test_async_chat_completion_streaming.yaml"): + response = await async_openai_client.chat.completions.create(**kwargs) + with pytest.raises(RuntimeError, match="user failure"): + async with response: + async for chunk in response: + response_stream_model = chunk.model + response_stream_id = chunk.id + raise RuntimeError("user failure") + + spans = span_exporter.get_finished_spans() + assert_all_attributes( + spans[0], + llm_model_value, + latest_experimental_enabled, + response_stream_id, + response_stream_model, + ) + assert "RuntimeError" == spans[0].attributes[ErrorAttributes.ERROR_TYPE] + + @pytest.mark.asyncio() async def test_async_chat_completion_streaming_not_complete( span_exporter, @@ -921,7 +971,10 @@ async def test_async_chat_completion_streaming_not_complete( response_stream_id = chunk.id idx += 1 - response.close() + if latest_experimental_enabled: + await response.close() + else: + response.close() spans = span_exporter.get_finished_spans() assert_all_attributes( spans[0], diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_chat_completions.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_chat_completions.py index 3e4df914dc..adb857f2f2 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_chat_completions.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_chat_completions.py @@ -287,6 +287,17 @@ def test_chat_completion_404( ) +def test_chat_completion_api_exception_propagates( + openai_client, instrument_no_content, vcr +): + with vcr.use_cassette("test_chat_completion_404.yaml"): + with pytest.raises(NotFoundError): + openai_client.chat.completions.create( + messages=USER_ONLY_PROMPT, + model="this-model-does-not-exist", + ) + + def test_chat_completion_extra_params( span_exporter, openai_client, instrument_no_content, vcr ): @@ 
-997,6 +1008,39 @@ def test_chat_completion_streaming( ) +def test_chat_completion_streaming_user_exception_propagates( + span_exporter, openai_client, instrument_with_content, vcr +): + latest_experimental_enabled = is_experimental_mode() + kwargs = { + "model": DEFAULT_MODEL, + "messages": USER_ONLY_PROMPT, + "stream": True, + "stream_options": {"include_usage": True}, + } + response_stream_model = None + response_stream_id = None + + with vcr.use_cassette("test_chat_completion_streaming.yaml"): + response = openai_client.chat.completions.create(**kwargs) + with pytest.raises(RuntimeError, match="user failure"): + with response: + for chunk in response: + response_stream_model = chunk.model + response_stream_id = chunk.id + raise RuntimeError("user failure") + + spans = span_exporter.get_finished_spans() + assert_all_attributes( + spans[0], + DEFAULT_MODEL, + latest_experimental_enabled, + response_stream_id, + response_stream_model, + ) + assert "RuntimeError" == spans[0].attributes[ErrorAttributes.ERROR_TYPE] + + def test_chat_completion_streaming_not_complete( span_exporter, log_exporter, openai_client, instrument_with_content, vcr ): diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_choice_buffer.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_choice_buffer.py index 7717ff73b2..17aa0ea289 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_choice_buffer.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_choice_buffer.py @@ -19,7 +19,7 @@ ChoiceDeltaToolCallFunction, ) -from opentelemetry.instrumentation.openai_v2.patch import ( +from opentelemetry.instrumentation.openai_v2.chat_buffers import ( ChoiceBuffer, ToolCallBuffer, ) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_stream.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_stream.py new file mode 100644 index 0000000000..4d4200546c --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_stream.py @@ -0,0 +1,193 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
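+
+# Shared lifecycle plumbing for instrumented provider streams. Subclasses
+# implement three hooks: _process_chunk (per-chunk telemetry state),
+# _stop_stream (successful finalization), and _fail_stream (failure
+# finalization). Iteration, context-manager handling, close(), and
+# exactly-once finalization are owned by these base classes.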
+ +from __future__ import annotations + +import logging +from abc import ABC, abstractmethod +from types import TracebackType +from typing import Any, Generic, Literal, TypeVar + +ChunkT = TypeVar("ChunkT") +_logger = logging.getLogger(__name__) + + +class SyncStreamWrapper(ABC, Generic[ChunkT]): + """Base class for synchronous instrumented stream wrappers.""" + + def __init__(self, stream: Any): + self.stream = stream + self._finalized = False + + def __enter__(self): + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> Literal[False]: + try: + if exc_type is not None: + self._finalize_failure(exc_val or Exception()) + finally: + self.close() + return False + + def close(self) -> None: + try: + self.stream.close() + finally: + self._finalize_success() + + def __iter__(self): + return self + + def __next__(self) -> ChunkT: + try: + chunk = next(self.stream) + except StopIteration: + self._finalize_success() + raise + except Exception as error: + self._finalize_failure(error) + raise + try: + self._process_chunk(chunk) + except Exception as error: # pylint: disable=broad-exception-caught + self._handle_process_chunk_error(error) + return chunk + + def __getattr__(self, name: str) -> Any: + return getattr(self.stream, name) + + def _finalize_success(self) -> None: + if self._finalized: + return + self._stop_stream() + self._finalized = True + + def _finalize_failure(self, error: BaseException) -> None: + if self._finalized: + return + self._fail_stream(error) + self._finalized = True + + @abstractmethod + def _process_chunk(self, chunk: ChunkT) -> None: + """Process one stream chunk for telemetry.""" + + @abstractmethod + def _stop_stream(self) -> None: + """Finalize the stream successfully.""" + + @abstractmethod + def _fail_stream(self, error: BaseException) -> None: + """Finalize the stream with failure.""" + + @staticmethod + def _handle_process_chunk_error(_error: Exception) -> None: + _logger.debug( + "GenAI stream instrumentation error during chunk processing", + exc_info=True, + ) + + +class AsyncStreamWrapper(ABC, Generic[ChunkT]): + """Base class for asynchronous instrumented stream wrappers.""" + + def __init__(self, stream: Any): + self.stream = stream + self._finalized = False + + async def __aenter__(self): + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> Literal[False]: + try: + if exc_type is not None: + self._finalize_failure(exc_val or Exception()) + finally: + await self.close() + return False + + async def close(self) -> None: + try: + await self.stream.close() + finally: + self._finalize_success() + + def __aiter__(self): + return self + + async def __anext__(self) -> ChunkT: + try: + chunk = await self.stream.__anext__() + except StopAsyncIteration: + self._finalize_success() + raise + except Exception as error: + self._finalize_failure(error) + raise + try: + self._process_chunk(chunk) + except Exception as error: # pylint: disable=broad-exception-caught + self._handle_process_chunk_error(error) + return chunk + + def __getattr__(self, name: str) -> Any: + return getattr(self.stream, name) + + def _finalize_success(self) -> None: + if self._finalized: + return + self._stop_stream() + self._finalized = True + + def _finalize_failure(self, error: BaseException) -> None: + if self._finalized: + return + self._fail_stream(error) + self._finalized = True + + 
@abstractmethod + def _process_chunk(self, chunk: ChunkT) -> None: + """Process one stream chunk for telemetry.""" + + @abstractmethod + def _stop_stream(self) -> None: + """Finalize the stream successfully.""" + + @abstractmethod + def _fail_stream(self, error: BaseException) -> None: + """Finalize the stream with failure.""" + + @staticmethod + def _handle_process_chunk_error(_error: Exception) -> None: + _logger.debug( + "GenAI stream instrumentation error during chunk processing", + exc_info=True, + ) + + +__all__ = [ + "AsyncStreamWrapper", + "SyncStreamWrapper", +] diff --git a/util/opentelemetry-util-genai/tests/test_stream.py b/util/opentelemetry-util-genai/tests/test_stream.py new file mode 100644 index 0000000000..f7ecb6706d --- /dev/null +++ b/util/opentelemetry-util-genai/tests/test_stream.py @@ -0,0 +1,286 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import inspect + +import pytest + +from opentelemetry.util.genai._stream import ( + AsyncStreamWrapper, + SyncStreamWrapper, +) + + +def test_stream_wrapper_abstract_method_signatures_match(): + method_names = ( + "_process_chunk", + "_stop_stream", + "_fail_stream", + "_handle_process_chunk_error", + ) + + for method_name in method_names: + assert inspect.signature( + getattr(SyncStreamWrapper, method_name) + ) == inspect.signature(getattr(AsyncStreamWrapper, method_name)) + + +class _FakeSyncStream: + def __init__(self, chunks=None, error=None): + self._chunks = list(chunks or []) + self._error = error + self.close_count = 0 + self.extra_attribute = "passthrough" + + def __next__(self): + if self._chunks: + return self._chunks.pop(0) + if self._error: + raise self._error + raise StopIteration + + def close(self): + self.close_count += 1 + + +class _TestSyncStreamWrapper(SyncStreamWrapper): + def __init__(self, stream): + super().__init__(stream) + self.processed = [] + self.stop_count = 0 + self.failures = [] + + def _process_chunk(self, chunk): + self.processed.append(chunk) + + def _stop_stream(self): + self.stop_count += 1 + + def _fail_stream(self, error): + self.failures.append(error) + + +class _FailingSyncProcessStreamWrapper(_TestSyncStreamWrapper): + def _process_chunk(self, chunk): + raise ValueError("instrumentation failed") + + +def test_sync_stream_wrapper_processes_chunks_and_stops(): + stream = _FakeSyncStream(chunks=["chunk"]) + wrapper = _TestSyncStreamWrapper(stream) + + assert next(wrapper) == "chunk" + assert wrapper.processed == ["chunk"] + + try: + next(wrapper) + except StopIteration: + pass + + assert wrapper.stop_count == 1 + + +def test_sync_stream_wrapper_fails_stream_errors(): + error = ValueError("boom") + wrapper = _TestSyncStreamWrapper(_FakeSyncStream(error=error)) + + try: + next(wrapper) + except ValueError: + pass + + assert wrapper.failures == [error] + + +def test_sync_stream_wrapper_close_stops_once(): + stream = _FakeSyncStream(chunks=["chunk"]) + wrapper = _TestSyncStreamWrapper(stream) + + wrapper.close() + wrapper.close() + + 
assert stream.close_count == 2 + assert wrapper.stop_count == 1 + assert not wrapper.failures + + +def test_sync_stream_wrapper_exit_closes_and_propagates_user_errors(): + stream = _FakeSyncStream(chunks=["chunk"]) + wrapper = _TestSyncStreamWrapper(stream) + error = RuntimeError("user failure") + + assert wrapper.__exit__(RuntimeError, error, None) is False + + assert stream.close_count == 1 + assert wrapper.stop_count == 0 + assert wrapper.failures == [error] + + +def test_sync_stream_wrapper_getattr_passthrough(): + wrapper = _TestSyncStreamWrapper(_FakeSyncStream()) + + assert wrapper.extra_attribute == "passthrough" + + +def test_sync_stream_wrapper_stop_iteration_does_not_double_finalize(): + wrapper = _TestSyncStreamWrapper(_FakeSyncStream()) + + with pytest.raises(StopIteration): + next(wrapper) + wrapper.close() + + assert wrapper.stop_count == 1 + assert not wrapper.failures + + +def test_sync_stream_wrapper_swallows_process_chunk_errors(): + wrapper = _FailingSyncProcessStreamWrapper( + _FakeSyncStream(chunks=["chunk"]) + ) + + assert next(wrapper) == "chunk" + assert not wrapper.failures + + +class _FakeAsyncStream: + def __init__(self, chunks=None, error=None): + self._chunks = list(chunks or []) + self._error = error + self.close_count = 0 + self.extra_attribute = "passthrough" + + async def __anext__(self): + if self._chunks: + return self._chunks.pop(0) + if self._error: + raise self._error + raise StopAsyncIteration + + async def close(self): + self.close_count += 1 + + +class _TestAsyncStreamWrapper(AsyncStreamWrapper): + def __init__(self, stream): + super().__init__(stream) + self.processed = [] + self.stop_count = 0 + self.failures = [] + + def _process_chunk(self, chunk): + self.processed.append(chunk) + + def _stop_stream(self): + self.stop_count += 1 + + def _fail_stream(self, error): + self.failures.append(error) + + +class _FailingAsyncProcessStreamWrapper(_TestAsyncStreamWrapper): + def _process_chunk(self, chunk): + raise ValueError("instrumentation failed") + + +def test_async_stream_wrapper_processes_chunks_and_stops(): + async def exercise(): + wrapper = _TestAsyncStreamWrapper(_FakeAsyncStream(chunks=["chunk"])) + + assert await anext(wrapper) == "chunk" + assert wrapper.processed == ["chunk"] + + try: + await anext(wrapper) + except StopAsyncIteration: + pass + + assert wrapper.stop_count == 1 + + asyncio.run(exercise()) + + +def test_async_stream_wrapper_fails_stream_errors(): + async def exercise(): + error = ValueError("boom") + wrapper = _TestAsyncStreamWrapper(_FakeAsyncStream(error=error)) + + with pytest.raises(ValueError): + await anext(wrapper) + + assert wrapper.failures == [error] + + asyncio.run(exercise()) + + +def test_async_stream_wrapper_close_stops_once(): + async def exercise(): + stream = _FakeAsyncStream(chunks=["chunk"]) + wrapper = _TestAsyncStreamWrapper(stream) + + await wrapper.close() + await wrapper.close() + + assert stream.close_count == 2 + assert wrapper.stop_count == 1 + assert not wrapper.failures + + asyncio.run(exercise()) + + +def test_async_stream_wrapper_exit_closes_and_propagates_user_errors(): + async def exercise(): + stream = _FakeAsyncStream(chunks=["chunk"]) + wrapper = _TestAsyncStreamWrapper(stream) + error = RuntimeError("user failure") + + assert await wrapper.__aexit__(RuntimeError, error, None) is False + + assert stream.close_count == 1 + assert wrapper.stop_count == 0 + assert wrapper.failures == [error] + + asyncio.run(exercise()) + + +def test_async_stream_wrapper_getattr_passthrough(): + wrapper = 
_TestAsyncStreamWrapper(_FakeAsyncStream()) + + assert wrapper.extra_attribute == "passthrough" + + +def test_async_stream_wrapper_stop_iteration_does_not_double_finalize(): + async def exercise(): + wrapper = _TestAsyncStreamWrapper(_FakeAsyncStream()) + + with pytest.raises(StopAsyncIteration): + await anext(wrapper) + await wrapper.close() + + assert wrapper.stop_count == 1 + assert not wrapper.failures + + asyncio.run(exercise()) + + +def test_async_stream_wrapper_swallows_process_chunk_errors(): + async def exercise(): + wrapper = _FailingAsyncProcessStreamWrapper( + _FakeAsyncStream(chunks=["chunk"]) + ) + + assert await anext(wrapper) == "chunk" + assert not wrapper.failures + + asyncio.run(exercise()) From 11258d9afd94c41798088b7277b51003daa545ed Mon Sep 17 00:00:00 2001 From: eternalcuriouslearner Date: Tue, 28 Apr 2026 21:49:55 -0400 Subject: [PATCH 02/12] polish: add changelogs. --- .../opentelemetry-instrumentation-openai-v2/CHANGELOG.md | 2 ++ util/opentelemetry-util-genai/CHANGELOG.md | 2 ++ 2 files changed, 4 insertions(+) diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/CHANGELOG.md index 87dc9461e7..23b3c3eb4f 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/CHANGELOG.md +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Refactor chat completion stream wrappers to use shared GenAI stream lifecycle helpers. + ([#4500](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4500)) - Import `OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT` from `opentelemetry.util.genai.environment_variables` instead of re-defining it locally, making `opentelemetry-util-genai` the single source of truth for this constant. diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md index a6c1536c0d..815af94717 100644 --- a/util/opentelemetry-util-genai/CHANGELOG.md +++ b/util/opentelemetry-util-genai/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Add shared sync and async stream wrapper base classes for GenAI instrumentations. + ([#4500](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4500)) - Add `AgentInvocation` type with `invoke_agent` span lifecycle ([#4274](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4274)) - Add metrics support for EmbeddingInvocation From 6f4e510ead7443e80eb74a9c1f53d293a65caf06 Mon Sep 17 00:00:00 2001 From: eternalcuriouslearner Date: Tue, 28 Apr 2026 21:54:32 -0400 Subject: [PATCH 03/12] wip: precommit. 
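
Re-order the opentelemetry.util.genai imports in chat_wrappers.py so the
import-sorting pre-commit hook passes.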
--- .../opentelemetry/instrumentation/openai_v2/chat_wrappers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py index 50764b02b7..89ac1b300b 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py @@ -22,11 +22,11 @@ from opentelemetry.semconv._incubating.attributes import ( openai_attributes as OpenAIAttributes, ) -from opentelemetry.util.genai.invocation import InferenceInvocation from opentelemetry.util.genai._stream import ( AsyncStreamWrapper, SyncStreamWrapper, ) +from opentelemetry.util.genai.invocation import InferenceInvocation from opentelemetry.util.genai.types import ( OutputMessage, Text, From 07b4a669ae6c26207c363a017509e3e288c15d0a Mon Sep 17 00:00:00 2001 From: eternalcuriouslearner Date: Wed, 29 Apr 2026 22:35:17 -0400 Subject: [PATCH 04/12] wip: fix the merge issues. --- .../instrumentation/openai_v2/patch.py | 87 +------------------ 1 file changed, 2 insertions(+), 85 deletions(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py index 372c9d93dd..1b1402c396 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py @@ -36,9 +36,6 @@ from opentelemetry.util.genai.types import ( ContentCapturingMode, Error, - OutputMessage, - Text, - ToolCallRequest, ) from .chat_buffers import ChoiceBuffer @@ -47,12 +44,12 @@ from .utils import ( _prepare_output_messages, choice_to_event, + create_chat_invocation, get_llm_request_attributes, handle_span_exception, is_streaming, message_to_event, set_span_attribute, - start_chat_invocation, ) @@ -245,7 +242,7 @@ async def traced_method(wrapped, instance, args, kwargs): else: parsed_result = result if is_streaming(kwargs): - return ChatStreamWrapper( + return AsyncChatStreamWrapper( parsed_result, chat_invocation, capture_content ) @@ -824,83 +821,3 @@ def cleanup(self, error: Optional[BaseException] = None): else: self.span.end() self._started = False - - -class ChatStreamWrapper(BaseStreamWrapper): - invocation: InferenceInvocation - response_id: Optional[str] = None - response_model: Optional[str] = None - service_tier: Optional[str] = None - finish_reasons: list = [] - prompt_tokens: Optional[int] = None - completion_tokens: Optional[int] = None - - def __init__( - self, - stream: Stream, - invocation: InferenceInvocation, - capture_content: bool, - ): - super().__init__(stream, capture_content=capture_content) - self.stream = stream - self.invocation = invocation - self.choice_buffers = [] - - def _set_output_messages(self): - if not self.capture_content: # optimization - return - output_messages = [] - for choice in self.choice_buffers: - message = OutputMessage( - role="assistant", - finish_reason=choice.finish_reason or "error", - parts=[], - ) - if choice.text_content: - 
message.parts.append( - Text(content="".join(choice.text_content)) - ) - if choice.tool_calls_buffers: - tool_calls = [] - for tool_call in choice.tool_calls_buffers: - arguments = None - arguments_str = "".join(tool_call.arguments) - if arguments_str: - try: - arguments = json.loads(arguments_str) - except json.JSONDecodeError: - arguments = arguments_str - tool_call_part = ToolCallRequest( - name=tool_call.function_name, - id=tool_call.tool_call_id, - arguments=arguments, - ) - tool_calls.append(tool_call_part) - message.parts.extend(tool_calls) - output_messages.append(message) - - self.invocation.output_messages = output_messages - - def cleanup(self, error: Optional[BaseException] = None): - if not self._started: - return - - self.invocation.response_model_name = self.response_model - self.invocation.response_id = self.response_id - self.invocation.input_tokens = self.prompt_tokens - self.invocation.output_tokens = self.completion_tokens - self.invocation.finish_reasons = self.finish_reasons - if self.service_tier: - self.invocation.attributes.update( - { - OpenAIAttributes.OPENAI_RESPONSE_SERVICE_TIER: self.service_tier - }, - ) - - self._set_output_messages() - - if error: - self.invocation.fail(Error(type=type(error), message=str(error))) - else: - self.invocation.stop() - self._started = False From 04e067ccacd40ccd09fecb412f222ed3d4798efb Mon Sep 17 00:00:00 2001 From: eternalcuriouslearner Date: Thu, 30 Apr 2026 20:53:45 -0400 Subject: [PATCH 05/12] polish: pr feedback. --- AGENTS.md | 5 ++ CLAUDE.md | 7 ++ .../openai_v2/chat_wrappers.py | 64 +++++++++---------- .../src/opentelemetry/util/genai/_stream.py | 27 +++++++- 4 files changed, 67 insertions(+), 36 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 09ab2575fb..a5ff9b853e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -76,6 +76,11 @@ Apply to packages under `instrumentation/` and `instrumentation-genai/`. - When catching exceptions from the underlying library to record telemetry, always re-raise the original exception unmodified. - Do not raise new exceptions in instrumentation/telemetry code. +- For GenAI streaming wrappers, prefer the shared `SyncStreamWrapper` and `AsyncStreamWrapper` + helpers from `opentelemetry.util.genai._stream` instead of reimplementing iteration, + close/context-manager, and finalization behavior in provider packages. +- Put provider-specific chunk parsing and telemetry finalization in private hook methods or a + narrow mixin. Do not make async stream wrappers inherit from sync stream wrappers. ### Semantic conventions diff --git a/CLAUDE.md b/CLAUDE.md index 43c994c2d3..b15db69322 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1 +1,8 @@ @AGENTS.md + +For GenAI streaming wrappers, use `SyncStreamWrapper` and +`AsyncStreamWrapper` from `opentelemetry.util.genai._stream` instead of +reimplementing iteration, close/context-manager, and finalization behavior in +provider packages. Keep provider-specific chunk parsing and telemetry +finalization in private hooks or a narrow mixin, and do not make async stream +wrappers inherit from sync stream wrappers. 
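+
+A minimal sketch of the intended shape (the wrapper name and hook bodies
+here are hypothetical; real implementations parse provider-specific chunk
+objects):
+
+```python
+from opentelemetry.util.genai._stream import SyncStreamWrapper
+
+
+class _ExampleChatStreamWrapper(SyncStreamWrapper):
+    def _process_chunk(self, chunk):
+        ...  # accumulate per-chunk telemetry state
+
+    def _stop_stream(self):
+        ...  # finalize the invocation successfully
+
+    def _fail_stream(self, error):
+        ...  # finalize the invocation with the error
+```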
diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py index 89ac1b300b..9cd1ab8e92 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py @@ -17,7 +17,7 @@ import json from typing import Any, Optional -from openai import Stream +from openai import AsyncStream, Stream from opentelemetry.semconv._incubating.attributes import ( openai_attributes as OpenAIAttributes, @@ -36,8 +36,12 @@ from .chat_buffers import ChoiceBuffer -class ChatStreamWrapper(SyncStreamWrapper[Any]): +class _ChatStreamMixin: + """Chat-specific hooks shared by sync and async stream wrappers.""" + invocation: InferenceInvocation + capture_content: bool + choice_buffers: list response_id: Optional[str] = None response_model: Optional[str] = None service_tier: Optional[str] = None @@ -45,19 +49,6 @@ class ChatStreamWrapper(SyncStreamWrapper[Any]): prompt_tokens: Optional[int] = None completion_tokens: Optional[int] = None - def __init__( - self, - stream: Stream, - invocation: InferenceInvocation, - capture_content: bool, - ): - SyncStreamWrapper.__init__(self, stream) - self.stream = stream - self.invocation = invocation - self.choice_buffers = [] - self._started = True - self.capture_content = capture_content - def _set_response_model(self, chunk): if self.response_model: return @@ -155,19 +146,16 @@ def _set_output_messages(self): self.invocation.output_messages = output_messages def _stop_stream(self) -> None: - self.cleanup() + self._cleanup() def _fail_stream(self, error: BaseException) -> None: - self.cleanup(error) + self._cleanup(error) def parse(self): """Called when using with_raw_response with stream=True.""" return self - def cleanup(self, error: Optional[BaseException] = None): - if not self._started: - return - + def _cleanup(self, error: Optional[BaseException] = None) -> None: self.invocation.response_model_name = self.response_model self.invocation.response_id = self.response_id self.invocation.input_tokens = self.prompt_tokens @@ -188,31 +176,39 @@ def cleanup(self, error: Optional[BaseException] = None): self.invocation.fail(error) else: self.invocation.stop() - self._started = False -class AsyncChatStreamWrapper( - AsyncStreamWrapper[Any], - ChatStreamWrapper, +class ChatStreamWrapper( + _ChatStreamMixin, + SyncStreamWrapper[Any], ): - invocation: InferenceInvocation - def __init__( self, stream: Stream, invocation: InferenceInvocation, capture_content: bool, ): - ChatStreamWrapper.__init__(self, stream, invocation, capture_content) + super().__init__(stream) + self.invocation = invocation + self.choice_buffers = [] + self.capture_content = capture_content - def _process_chunk(self, chunk): - ChatStreamWrapper._process_chunk(self, chunk) - def _stop_stream(self) -> None: - ChatStreamWrapper._stop_stream(self) +class AsyncChatStreamWrapper( + _ChatStreamMixin, + AsyncStreamWrapper[Any], +): - def _fail_stream(self, error: BaseException) -> None: - ChatStreamWrapper._fail_stream(self, error) + def __init__( + self, + stream: AsyncStream, + invocation: InferenceInvocation, + capture_content: bool, + ): + super().__init__(stream) + self.invocation = invocation + self.choice_buffers = [] + 
self.capture_content = capture_content __all__ = [ diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_stream.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_stream.py index 4d4200546c..9726258d15 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_stream.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_stream.py @@ -24,7 +24,18 @@ class SyncStreamWrapper(ABC, Generic[ChunkT]): - """Base class for synchronous instrumented stream wrappers.""" + """Base class for synchronous instrumented stream wrappers. + + Subclass this when wrapping a provider SDK stream that is consumed with + normal iteration. The subclass should pass the SDK stream to + ``super().__init__(stream)`` and implement the three telemetry hooks: + ``_process_chunk`` for per-chunk state, ``_stop_stream`` for successful + finalization, and ``_fail_stream`` for failure finalization. + + Users should consume subclasses as normal streams, for example with + ``for chunk in wrapper`` or ``with wrapper``. The hook methods are called + internally by the wrapper lifecycle and are not part of the public API. + """ def __init__(self, stream: Any): self.stream = stream @@ -106,7 +117,19 @@ def _handle_process_chunk_error(_error: Exception) -> None: class AsyncStreamWrapper(ABC, Generic[ChunkT]): - """Base class for asynchronous instrumented stream wrappers.""" + """Base class for asynchronous instrumented stream wrappers. + + Subclass this when wrapping a provider SDK stream that is consumed with + async iteration. The subclass should pass the SDK stream to + ``super().__init__(stream)`` and implement the three telemetry hooks: + ``_process_chunk`` for per-chunk state, ``_stop_stream`` for successful + finalization, and ``_fail_stream`` for failure finalization. + + Users should consume subclasses as normal async streams, for example with + ``async for chunk in wrapper`` or ``async with wrapper``. The hook methods + remain synchronous telemetry hooks; async stream reads and close handling + are owned by this base class. + """ def __init__(self, stream: Any): self.stream = stream From 2157d285fb590a3e2c607fe06fc6e217798ab4ed Mon Sep 17 00:00:00 2001 From: eternalcuriouslearner Date: Fri, 1 May 2026 09:05:43 -0400 Subject: [PATCH 06/12] polish: fixing the precommit. --- .../src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py | 1 - 1 file changed, 1 deletion(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py index 9cd1ab8e92..0ec7e96b4d 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py @@ -198,7 +198,6 @@ class AsyncChatStreamWrapper( _ChatStreamMixin, AsyncStreamWrapper[Any], ): - def __init__( self, stream: AsyncStream, From 8a8b9c75a263265d906876e533c6f845fb9615f1 Mon Sep 17 00:00:00 2001 From: eternalcuriouslearner Date: Sat, 2 May 2026 13:28:48 -0400 Subject: [PATCH 07/12] wip: renamed to stream.py and changed finish_reasons population style. 
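
Expose the stream helpers as a public opentelemetry.util.genai.stream
module and derive finish_reasons from the accumulated choice buffers at
cleanup time, replacing the class-level list that was never populated.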
--- AGENTS.md | 2 +- CLAUDE.md | 2 +- .../instrumentation/openai_v2/chat_wrappers.py | 13 ++++++++----- .../util/genai/{_stream.py => stream.py} | 0 util/opentelemetry-util-genai/tests/test_stream.py | 2 +- 5 files changed, 11 insertions(+), 8 deletions(-) rename util/opentelemetry-util-genai/src/opentelemetry/util/genai/{_stream.py => stream.py} (100%) diff --git a/AGENTS.md b/AGENTS.md index a5ff9b853e..d601680f39 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -77,7 +77,7 @@ Apply to packages under `instrumentation/` and `instrumentation-genai/`. original exception unmodified. - Do not raise new exceptions in instrumentation/telemetry code. - For GenAI streaming wrappers, prefer the shared `SyncStreamWrapper` and `AsyncStreamWrapper` - helpers from `opentelemetry.util.genai._stream` instead of reimplementing iteration, + helpers from `opentelemetry.util.genai.stream` instead of reimplementing iteration, close/context-manager, and finalization behavior in provider packages. - Put provider-specific chunk parsing and telemetry finalization in private hook methods or a narrow mixin. Do not make async stream wrappers inherit from sync stream wrappers. diff --git a/CLAUDE.md b/CLAUDE.md index b15db69322..20900eab97 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,7 +1,7 @@ @AGENTS.md For GenAI streaming wrappers, use `SyncStreamWrapper` and -`AsyncStreamWrapper` from `opentelemetry.util.genai._stream` instead of +`AsyncStreamWrapper` from `opentelemetry.util.genai.stream` instead of reimplementing iteration, close/context-manager, and finalization behavior in provider packages. Keep provider-specific chunk parsing and telemetry finalization in private hooks or a narrow mixin, and do not make async stream diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py index 0ec7e96b4d..4f4db53484 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py @@ -22,7 +22,7 @@ from opentelemetry.semconv._incubating.attributes import ( openai_attributes as OpenAIAttributes, ) -from opentelemetry.util.genai._stream import ( +from opentelemetry.util.genai.stream import ( AsyncStreamWrapper, SyncStreamWrapper, ) @@ -45,7 +45,6 @@ class _ChatStreamMixin: response_id: Optional[str] = None response_model: Optional[str] = None service_tier: Optional[str] = None - finish_reasons: list = [] prompt_tokens: Optional[int] = None completion_tokens: Optional[int] = None @@ -160,9 +159,13 @@ def _cleanup(self, error: Optional[BaseException] = None) -> None: self.invocation.response_id = self.response_id self.invocation.input_tokens = self.prompt_tokens self.invocation.output_tokens = self.completion_tokens - # TODO: Derive finish_reasons from choice_buffers so streaming - # invocations match non-streaming response finalization. 
-        self.invocation.finish_reasons = self.finish_reasons
+        finish_reasons = [
+            choice.finish_reason
+            for choice in self.choice_buffers
+            if choice.finish_reason
+        ]
+        if finish_reasons:
+            self.invocation.finish_reasons = finish_reasons
         if self.service_tier:
             self.invocation.attributes.update(
                 {

diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_stream.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py
similarity index 100%
rename from util/opentelemetry-util-genai/src/opentelemetry/util/genai/_stream.py
rename to util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py

diff --git a/util/opentelemetry-util-genai/tests/test_stream.py b/util/opentelemetry-util-genai/tests/test_stream.py
index f7ecb6706d..6b8e0061fa 100644
--- a/util/opentelemetry-util-genai/tests/test_stream.py
+++ b/util/opentelemetry-util-genai/tests/test_stream.py
@@ -17,7 +17,7 @@

 import pytest

-from opentelemetry.util.genai._stream import (
+from opentelemetry.util.genai.stream import (
     AsyncStreamWrapper,
     SyncStreamWrapper,
 )

From ae2663bb784f87f44b7631b3aa7f1c407631631d Mon Sep 17 00:00:00 2001
From: eternalcuriouslearner
Date: Sat, 2 May 2026 16:27:00 -0400
Subject: [PATCH 08/12] wip: implementing copilot suggestions.

---
 .../openai_v2/chat_wrappers.py             |   2 +-
 .../instrumentation/openai_v2/utils.py     |   2 +-
 .../tests/test_chat_completions.py         |  84 +++++++++
 .../src/opentelemetry/util/genai/stream.py |  92 ++++++++--
 .../tests/test_stream.py                   | 160 +++++++++++++++++-
 5 files changed, 318 insertions(+), 22 deletions(-)

diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py
index 4f4db53484..dcce2efbac 100644
--- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py
@@ -22,11 +22,11 @@
 from opentelemetry.semconv._incubating.attributes import (
     openai_attributes as OpenAIAttributes,
 )
+from opentelemetry.util.genai.invocation import InferenceInvocation
 from opentelemetry.util.genai.stream import (
     AsyncStreamWrapper,
     SyncStreamWrapper,
 )
-from opentelemetry.util.genai.invocation import InferenceInvocation
 from opentelemetry.util.genai.types import (
     OutputMessage,
     Text,

diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py
index 4dab04d977..17794d6d6f 100644
--- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py
@@ -399,7 +399,7 @@ def create_chat_invocation(
     extra_body = get_value(kwargs.get("extra_body"))
     if isinstance(extra_body, Mapping):
         service_tier = get_value(extra_body.get("service_tier"))
-    if service_tier is not None:
+    if service_tier is not None and service_tier != "auto":
         invocation.attributes[OpenAIAttributes.OPENAI_REQUEST_SERVICE_TIER] = (
             service_tier
         )

diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_chat_completions.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_chat_completions.py
index adb857f2f2..7d25e400f1 100644
--- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_chat_completions.py
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_chat_completions.py
@@ -1041,6 +1041,90 @@ def test_chat_completion_streaming_user_exception_propagates(
     assert "RuntimeError" == spans[0].attributes[ErrorAttributes.ERROR_TYPE]


+def test_chat_completion_streaming_user_exception_wins_over_close_exception(
+    span_exporter, openai_client, instrument_with_content, vcr, monkeypatch
+):
+    if not is_experimental_mode():
+        pytest.skip("new stream wrapper only")
+
+    kwargs = {
+        "model": DEFAULT_MODEL,
+        "messages": USER_ONLY_PROMPT,
+        "stream": True,
+        "stream_options": {"include_usage": True},
+    }
+
+    with vcr.use_cassette("test_chat_completion_streaming.yaml"):
+        response = openai_client.chat.completions.create(**kwargs)
+        original_close = response.stream.close
+
+        def close_raises():
+            original_close()
+            raise RuntimeError("close failure")
+
+        monkeypatch.setattr(response.stream, "close", close_raises)
+        with pytest.raises(RuntimeError, match="user failure"):
+            with response:
+                raise RuntimeError("user failure")
+
+    spans = span_exporter.get_finished_spans()
+    assert "RuntimeError" == spans[0].attributes[ErrorAttributes.ERROR_TYPE]
+
+
+def test_chat_completion_streaming_close_exception_propagates_when_first(
+    span_exporter, openai_client, instrument_with_content, vcr, monkeypatch
+):
+    if not is_experimental_mode():
+        pytest.skip("new stream wrapper only")
+
+    kwargs = {
+        "model": DEFAULT_MODEL,
+        "messages": USER_ONLY_PROMPT,
+        "stream": True,
+        "stream_options": {"include_usage": True},
+    }
+
+    with vcr.use_cassette("test_chat_completion_streaming.yaml"):
+        response = openai_client.chat.completions.create(**kwargs)
+        original_close = response.stream.close
+
+        def close_raises():
+            original_close()
+            raise RuntimeError("close failure")
+
+        monkeypatch.setattr(response.stream, "close", close_raises)
+        with pytest.raises(RuntimeError, match="close failure"):
+            response.close()
+
+    spans = span_exporter.get_finished_spans()
+    assert "RuntimeError" == spans[0].attributes[ErrorAttributes.ERROR_TYPE]
+
+
+def test_chat_completion_streaming_instrumentation_finalize_errors_swallowed(
+    span_exporter, openai_client, instrument_with_content, vcr, monkeypatch
+):
+    if not is_experimental_mode():
+        pytest.skip("new stream wrapper only")
+
+    kwargs = {
+        "model": DEFAULT_MODEL,
+        "messages": USER_ONLY_PROMPT,
+        "stream": True,
+        "stream_options": {"include_usage": True},
+    }
+
+    with vcr.use_cassette("test_chat_completion_streaming.yaml"):
+        response = openai_client.chat.completions.create(**kwargs)
+
+        def stop_raises():
+            raise RuntimeError("instrumentation failure")
+
+        monkeypatch.setattr(response, "_stop_stream", stop_raises)
+        response.close()
+
+    assert span_exporter.get_finished_spans() == ()
+
+
 def test_chat_completion_streaming_not_complete(
     span_exporter, log_exporter, openai_client, instrument_with_content, vcr
 ):

diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py
index 9726258d15..360605f48f 100644
--- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py
@@ -50,18 +50,28 @@ def __exit__(
         self,
         exc_val: BaseException | None,
         exc_tb: TracebackType | None,
     ) -> Literal[False]:
-        try:
-            if exc_type is not None:
-                self._finalize_failure(exc_val or Exception())
-        finally:
-            self.close()
+        if exc_type is not None:
+            self._safe_finalize_failure(exc_val or Exception())
+            try:
+                self.stream.close()
+            except Exception:  # pylint: disable=broad-exception-caught
+                _logger.debug(
+                    "GenAI stream close error after user exception",
+                    exc_info=True,
+                )
+            return False
+
+        self.close()
         return False

     def close(self) -> None:
         try:
             self.stream.close()
-        finally:
-            self._finalize_success()
+        except Exception as error:
+            self._safe_finalize_failure(error)
+            raise
+        else:
+            self._safe_finalize_success()

     def __iter__(self):
         return self
@@ -70,10 +80,10 @@ def __next__(self) -> ChunkT:
         try:
             chunk = next(self.stream)
         except StopIteration:
-            self._finalize_success()
+            self._safe_finalize_success()
             raise
         except Exception as error:
-            self._finalize_failure(error)
+            self._safe_finalize_failure(error)
             raise
         try:
             self._process_chunk(chunk)
@@ -96,6 +106,24 @@ def __getattr__(self, name: str) -> Any:
         self._fail_stream(error)
         self._finalized = True

+    def _safe_finalize_success(self) -> None:
+        try:
+            self._finalize_success()
+        except Exception:  # pylint: disable=broad-exception-caught
+            _logger.debug(
+                "GenAI stream instrumentation error during finalization",
+                exc_info=True,
+            )
+
+    def _safe_finalize_failure(self, error: BaseException) -> None:
+        try:
+            self._finalize_failure(error)
+        except Exception:  # pylint: disable=broad-exception-caught
+            _logger.debug(
+                "GenAI stream instrumentation error during failure finalization",
+                exc_info=True,
+            )
+
     @abstractmethod
     def _process_chunk(self, chunk: ChunkT) -> None:
         """Process one stream chunk for telemetry."""
@@ -144,18 +172,28 @@ async def __aexit__(
         exc_val: BaseException | None,
         exc_tb: TracebackType | None,
     ) -> Literal[False]:
-        try:
-            if exc_type is not None:
-                self._finalize_failure(exc_val or Exception())
-        finally:
-            await self.close()
+        if exc_type is not None:
+            self._safe_finalize_failure(exc_val or Exception())
+            try:
+                await self.stream.close()
+            except Exception:  # pylint: disable=broad-exception-caught
+                _logger.debug(
+                    "GenAI stream close error after user exception",
+                    exc_info=True,
+                )
+            return False
+
+        await self.close()
         return False

     async def close(self) -> None:
         try:
             await self.stream.close()
-        finally:
-            self._finalize_success()
+        except Exception as error:
+            self._safe_finalize_failure(error)
+            raise
+        else:
+            self._safe_finalize_success()

     def __aiter__(self):
         return self
@@ -164,10 +202,10 @@ async def __anext__(self) -> ChunkT:
         try:
             chunk = await self.stream.__anext__()
         except StopAsyncIteration:
-            self._finalize_success()
+            self._safe_finalize_success()
             raise
         except Exception as error:
-            self._finalize_failure(error)
+            self._safe_finalize_failure(error)
             raise
         try:
             self._process_chunk(chunk)
@@ -190,6 +228,24 @@ def _finalize_failure(self, error: BaseException) -> None:
         self._fail_stream(error)
         self._finalized = True

+    def _safe_finalize_success(self) -> None:
+        try:
+            self._finalize_success()
+        except Exception:  # pylint: disable=broad-exception-caught
+            _logger.debug(
+                "GenAI stream instrumentation error during finalization",
+                exc_info=True,
+            )
+
+    def _safe_finalize_failure(self, error: BaseException) -> None:
+        try:
+            self._finalize_failure(error)
+        except Exception:  # pylint: disable=broad-exception-caught
+            _logger.debug(
+                "GenAI stream instrumentation error during failure finalization",
+                exc_info=True,
+            )
+
     @abstractmethod
     def _process_chunk(self, chunk: ChunkT) -> None:
         """Process one stream chunk for telemetry."""

diff --git a/util/opentelemetry-util-genai/tests/test_stream.py b/util/opentelemetry-util-genai/tests/test_stream.py
index 6b8e0061fa..5fadf59e45 100644
--- a/util/opentelemetry-util-genai/tests/test_stream.py
+++ b/util/opentelemetry-util-genai/tests/test_stream.py
@@ -38,9 +38,10 @@ def test_stream_wrapper_abstract_method_signatures_match():


 class _FakeSyncStream:
-    def __init__(self, chunks=None, error=None):
+    def __init__(self, chunks=None, error=None, close_error=None):
         self._chunks = list(chunks or [])
         self._error = error
+        self._close_error = close_error
         self.close_count = 0
         self.extra_attribute = "passthrough"

@@ -53,6 +54,8 @@ def __next__(self):

     def close(self):
         self.close_count += 1
+        if self._close_error:
+            raise self._close_error


 class _TestSyncStreamWrapper(SyncStreamWrapper):
@@ -77,6 +80,16 @@ def _process_chunk(self, chunk):
         raise ValueError("instrumentation failed")


+class _FailingSyncStopStreamWrapper(_TestSyncStreamWrapper):
+    def _stop_stream(self):
+        raise ValueError("instrumentation failed")
+
+
+class _FailingSyncFailStreamWrapper(_TestSyncStreamWrapper):
+    def _fail_stream(self, error):
+        raise ValueError("instrumentation failed")
+
+
 def test_sync_stream_wrapper_processes_chunks_and_stops():
     stream = _FakeSyncStream(chunks=["chunk"])
     wrapper = _TestSyncStreamWrapper(stream)
@@ -116,6 +129,19 @@ def test_sync_stream_wrapper_close_stops_once():
     assert not wrapper.failures


+def test_sync_stream_wrapper_close_fails_with_close_error():
+    error = RuntimeError("close failure")
+    wrapper = _TestSyncStreamWrapper(
+        _FakeSyncStream(chunks=["chunk"], close_error=error)
+    )
+
+    with pytest.raises(RuntimeError, match="close failure"):
+        wrapper.close()
+
+    assert wrapper.failures == [error]
+    assert wrapper.stop_count == 0
+
+
 def test_sync_stream_wrapper_exit_closes_and_propagates_user_errors():
     stream = _FakeSyncStream(chunks=["chunk"])
     wrapper = _TestSyncStreamWrapper(stream)
@@ -128,6 +154,48 @@ def test_sync_stream_wrapper_exit_closes_and_propagates_user_errors():
     assert wrapper.failures == [error]


+def test_sync_stream_wrapper_exit_keeps_user_error_when_close_fails():
+    close_error = RuntimeError("close failure")
+    stream = _FakeSyncStream(chunks=["chunk"], close_error=close_error)
+    wrapper = _TestSyncStreamWrapper(stream)
+    error = RuntimeError("user failure")
+
+    assert wrapper.__exit__(RuntimeError, error, None) is False
+
+    assert stream.close_count == 1
+    assert wrapper.failures == [error]
+    assert wrapper.stop_count == 0
+
+
+def test_sync_stream_wrapper_swallows_finalize_errors():
+    wrapper = _FailingSyncStopStreamWrapper(_FakeSyncStream())
+
+    wrapper.close()
+
+
+def test_sync_stream_wrapper_swallows_failure_finalize_errors():
+    error = RuntimeError("close failure")
+    wrapper = _FailingSyncFailStreamWrapper(_FakeSyncStream(close_error=error))
+
+    with pytest.raises(RuntimeError, match="close failure"):
+        wrapper.close()
+
+
+def test_sync_stream_wrapper_swallows_stop_iteration_finalize_errors():
+    wrapper = _FailingSyncStopStreamWrapper(_FakeSyncStream())
+
+    with pytest.raises(StopIteration):
+        next(wrapper)
+
+
+def test_sync_stream_wrapper_preserves_stream_error_when_finalize_fails():
+    error = RuntimeError("stream failure")
+    wrapper = _FailingSyncFailStreamWrapper(_FakeSyncStream(error=error))
+
+    with pytest.raises(RuntimeError, match="stream failure"):
+        next(wrapper)
+
+
 def test_sync_stream_wrapper_getattr_passthrough():
     wrapper = _TestSyncStreamWrapper(_FakeSyncStream())

@@ -155,9 +223,10 @@ test_sync_stream_wrapper_swallows_process_chunk_errors():


 class _FakeAsyncStream:
-    def __init__(self, chunks=None, error=None):
+    def __init__(self, chunks=None, error=None, close_error=None):
         self._chunks = list(chunks or [])
         self._error = error
+        self._close_error = close_error
         self.close_count = 0
         self.extra_attribute = "passthrough"

@@ -170,6 +239,8 @@ async def __anext__(self):

     async def close(self):
         self.close_count += 1
+        if self._close_error:
+            raise self._close_error


 class _TestAsyncStreamWrapper(AsyncStreamWrapper):
@@ -194,6 +265,16 @@ def _process_chunk(self, chunk):
         raise ValueError("instrumentation failed")


+class _FailingAsyncStopStreamWrapper(_TestAsyncStreamWrapper):
+    def _stop_stream(self):
+        raise ValueError("instrumentation failed")
+
+
+class _FailingAsyncFailStreamWrapper(_TestAsyncStreamWrapper):
+    def _fail_stream(self, error):
+        raise ValueError("instrumentation failed")
+
+
 def test_async_stream_wrapper_processes_chunks_and_stops():
     async def exercise():
         wrapper = _TestAsyncStreamWrapper(_FakeAsyncStream(chunks=["chunk"]))
@@ -239,6 +320,22 @@ async def exercise():
     asyncio.run(exercise())


+def test_async_stream_wrapper_close_fails_with_close_error():
+    async def exercise():
+        error = RuntimeError("close failure")
+        wrapper = _TestAsyncStreamWrapper(
+            _FakeAsyncStream(chunks=["chunk"], close_error=error)
+        )
+
+        with pytest.raises(RuntimeError, match="close failure"):
+            await wrapper.close()
+
+        assert wrapper.failures == [error]
+        assert wrapper.stop_count == 0
+
+    asyncio.run(exercise())
+
+
 def test_async_stream_wrapper_exit_closes_and_propagates_user_errors():
     async def exercise():
         stream = _FakeAsyncStream(chunks=["chunk"])
@@ -254,6 +351,65 @@ async def exercise():
     asyncio.run(exercise())


+def test_async_stream_wrapper_exit_keeps_user_error_when_close_fails():
+    async def exercise():
+        close_error = RuntimeError("close failure")
+        stream = _FakeAsyncStream(chunks=["chunk"], close_error=close_error)
+        wrapper = _TestAsyncStreamWrapper(stream)
+        error = RuntimeError("user failure")
+
+        assert await wrapper.__aexit__(RuntimeError, error, None) is False
+
+        assert stream.close_count == 1
+        assert wrapper.failures == [error]
+        assert wrapper.stop_count == 0
+
+    asyncio.run(exercise())
+
+
+def test_async_stream_wrapper_swallows_finalize_errors():
+    async def exercise():
+        wrapper = _FailingAsyncStopStreamWrapper(_FakeAsyncStream())
+
+        await wrapper.close()
+
+    asyncio.run(exercise())
+
+
+def test_async_stream_wrapper_swallows_failure_finalize_errors():
+    async def exercise():
+        error = RuntimeError("close failure")
+        wrapper = _FailingAsyncFailStreamWrapper(
+            _FakeAsyncStream(close_error=error)
+        )
+
+        with pytest.raises(RuntimeError, match="close failure"):
+            await wrapper.close()
+
+    asyncio.run(exercise())
+
+
+def test_async_stream_wrapper_swallows_stop_iteration_finalize_errors():
+    async def exercise():
+        wrapper = _FailingAsyncStopStreamWrapper(_FakeAsyncStream())
+
+        with pytest.raises(StopAsyncIteration):
+            await anext(wrapper)
+
+    asyncio.run(exercise())
+
+
+def test_async_stream_wrapper_preserves_stream_error_when_finalize_fails():
+    async def exercise():
+        error = RuntimeError("stream failure")
+        wrapper = _FailingAsyncFailStreamWrapper(_FakeAsyncStream(error=error))
+
+        with pytest.raises(RuntimeError, match="stream failure"):
+            await anext(wrapper)
+
+    asyncio.run(exercise())
+
+
 def test_async_stream_wrapper_getattr_passthrough():
     wrapper = _TestAsyncStreamWrapper(_FakeAsyncStream())
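The invariant patch 08 locks in — the user's or the SDK's exception always beats an instrumentation exception — reduces to a log-and-swallow guard around each telemetry hook. A self-contained sketch of that guard (the names here are illustrative, not the module's API):

    import logging

    _logger = logging.getLogger(__name__)


    def call_hook_safely(hook, *args):
        # The caller's control flow must never be disturbed by telemetry:
        # hook failures are demoted to debug logs and otherwise ignored.
        try:
            hook(*args)
        except Exception:  # deliberately broad, mirroring the wrapper
            _logger.debug("GenAI stream instrumentation error", exc_info=True)


    def failing_hook():
        raise ValueError("instrumentation failed")


    call_hook_safely(failing_hook)  # swallowed; only a debug log is emitted
    print("caller unaffected")

The three new cassette tests above pin down exactly this split: stream and user errors propagate and are recorded on the span, while a failing `_stop_stream` produces no span at all rather than an exception.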
From a246ee1528fa02f54fd4ae0996a7982b9f36d799 Mon Sep 17 00:00:00 2001
From: eternalcuriouslearner
Date: Sat, 2 May 2026 18:03:23 -0400
Subject: [PATCH 09/12] wip: fixing the tox failures.

---
 .../src/opentelemetry/util/genai/stream.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py
index 360605f48f..fbcb7ffa84 100644
--- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py
@@ -70,8 +70,7 @@ def close(self) -> None:
         except Exception as error:
             self._safe_finalize_failure(error)
             raise
-        else:
-            self._safe_finalize_success()
+        self._safe_finalize_success()

     def __iter__(self):
         return self
@@ -192,8 +191,7 @@ async def close(self) -> None:
         except Exception as error:
             self._safe_finalize_failure(error)
             raise
-        else:
-            self._safe_finalize_success()
+        self._safe_finalize_success()

     def __aiter__(self):
         return self

From 568965413454f79cd929493bcfbf2309685a5b28 Mon Sep 17 00:00:00 2001
From: eternalcuriouslearner
Date: Sat, 2 May 2026 22:25:46 -0400
Subject: [PATCH 10/12] polish: copilot feedback.

---
 .../src/opentelemetry/util/genai/stream.py |  8 ++---
 .../tests/test_stream.py                   | 30 +++++++++++++++----
 2 files changed, 28 insertions(+), 10 deletions(-)

diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py
index fbcb7ffa84..e428bd6262 100644
--- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py
@@ -96,14 +96,14 @@ def __getattr__(self, name: str) -> Any:
     def _finalize_success(self) -> None:
         if self._finalized:
             return
-        self._stop_stream()
         self._finalized = True
+        self._stop_stream()

     def _finalize_failure(self, error: BaseException) -> None:
         if self._finalized:
             return
-        self._fail_stream(error)
         self._finalized = True
+        self._fail_stream(error)

     def _safe_finalize_success(self) -> None:
         try:
@@ -217,14 +217,14 @@ def __getattr__(self, name: str) -> Any:
     def _finalize_success(self) -> None:
         if self._finalized:
             return
-        self._stop_stream()
         self._finalized = True
+        self._stop_stream()

     def _finalize_failure(self, error: BaseException) -> None:
         if self._finalized:
             return
-        self._fail_stream(error)
         self._finalized = True
+        self._fail_stream(error)

     def _safe_finalize_success(self) -> None:
         try:

diff --git a/util/opentelemetry-util-genai/tests/test_stream.py b/util/opentelemetry-util-genai/tests/test_stream.py
index 5fadf59e45..7a3e038274 100644
--- a/util/opentelemetry-util-genai/tests/test_stream.py
+++ b/util/opentelemetry-util-genai/tests/test_stream.py
@@ -82,11 +82,13 @@ def _process_chunk(self, chunk):

 class _FailingSyncStopStreamWrapper(_TestSyncStreamWrapper):
     def _stop_stream(self):
+        self.stop_count += 1
         raise ValueError("instrumentation failed")


 class _FailingSyncFailStreamWrapper(_TestSyncStreamWrapper):
     def _fail_stream(self, error):
+        self.failures.append(error)
         raise ValueError("instrumentation failed")

@@ -173,14 +183,22 @@ def test_sync_stream_wrapper_swallows_finalize_errors():
     wrapper = _FailingSyncStopStreamWrapper(_FakeSyncStream())

     wrapper.close()
+    wrapper.close()
+
+    assert wrapper.stop_count == 1


 def test_sync_stream_wrapper_swallows_failure_finalize_errors():
-    error = RuntimeError("close failure")
-    wrapper = _FailingSyncFailStreamWrapper(_FakeSyncStream(close_error=error))
+    close_error = RuntimeError("close failure")
+    stream = _FakeSyncStream(close_error=close_error)
+    wrapper = _FailingSyncFailStreamWrapper(stream)

     with pytest.raises(RuntimeError, match="close failure"):
         wrapper.close()
+
+    stream._close_error = None
+    wrapper.close()
+
+    assert wrapper.failures == [close_error]


 def test_sync_stream_wrapper_swallows_stop_iteration_finalize_errors():
@@ -267,11 +277,13 @@ def _process_chunk(self, chunk):

 class _FailingAsyncStopStreamWrapper(_TestAsyncStreamWrapper):
     def _stop_stream(self):
+        self.stop_count += 1
         raise ValueError("instrumentation failed")


 class _FailingAsyncFailStreamWrapper(_TestAsyncStreamWrapper):
     def _fail_stream(self, error):
+        self.failures.append(error)
         raise ValueError("instrumentation failed")

@@ -371,20 +383,26 @@ def test_async_stream_wrapper_swallows_finalize_errors():
     async def exercise():
         wrapper = _FailingAsyncStopStreamWrapper(_FakeAsyncStream())

+        await wrapper.close()
         await wrapper.close()
+
+        assert wrapper.stop_count == 1

     asyncio.run(exercise())


 def test_async_stream_wrapper_swallows_failure_finalize_errors():
     async def exercise():
-        error = RuntimeError("close failure")
-        wrapper = _FailingAsyncFailStreamWrapper(
-            _FakeAsyncStream(close_error=error)
-        )
+        close_error = RuntimeError("close failure")
+        stream = _FakeAsyncStream(close_error=close_error)
+        wrapper = _FailingAsyncFailStreamWrapper(stream)

         with pytest.raises(RuntimeError, match="close failure"):
             await wrapper.close()
+
+        stream._close_error = None
+        await wrapper.close()
+
+        assert wrapper.failures == [close_error]

     asyncio.run(exercise())
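The reordering in patch 10 above is subtle but load-bearing: flipping `_finalized` before invoking the hook is what keeps a raising hook from ever firing twice, which is exactly what the strengthened double-`close()` tests assert. A reduced, standalone model of the guarantee (an illustrative class, not the module's code):

    class FinalizeOnce:
        """Reduced model of the wrapper's finalize-once guard."""

        def __init__(self, hook):
            self._hook = hook
            self._finalized = False

        def finalize(self):
            if self._finalized:
                return
            # Mark first, call second: if the hook raises, a later retry of
            # finalize() is a no-op instead of a second hook invocation.
            self._finalized = True
            self._hook()


    calls = []


    def flaky_hook():
        calls.append("hook")
        raise ValueError("instrumentation failed")


    finalizer = FinalizeOnce(flaky_hook)
    for _ in range(2):
        try:
            finalizer.finalize()
        except ValueError:
            pass
    assert calls == ["hook"]  # the hook ran exactly once

With the pre-patch ordering (hook first, mark second), the raising hook would leave `_finalized` False and run again on the second call.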
From c43b094460812ede0daeb533361a42b64ea36a9d Mon Sep 17 00:00:00 2001
From: eternalcuriouslearner
Date: Sat, 2 May 2026 22:51:37 -0400
Subject: [PATCH 11/12] polish: copilot suggestions.

---
 .../src/opentelemetry/util/genai/stream.py |  6 +-
 .../tests/test_stream.py                   | 59 +++++++++++++++++++
 2 files changed, 63 insertions(+), 2 deletions(-)

diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py
index e428bd6262..4a754a97e6 100644
--- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py
@@ -39,6 +39,7 @@ class SyncStreamWrapper(ABC, Generic[ChunkT]):

     def __init__(self, stream: Any):
         self.stream = stream
+        self._iterator = iter(stream)
         self._finalized = False

     def __enter__(self):
@@ -77,7 +78,7 @@ def __iter__(self):

     def __next__(self) -> ChunkT:
         try:
-            chunk = next(self.stream)
+            chunk = next(self._iterator)
         except StopIteration:
             self._safe_finalize_success()
             raise
@@ -160,6 +161,7 @@ class AsyncStreamWrapper(ABC, Generic[ChunkT]):

     def __init__(self, stream: Any):
         self.stream = stream
+        self._aiter = aiter(stream)
         self._finalized = False

     async def __aenter__(self):
@@ -198,7 +200,7 @@ def __aiter__(self):

     async def __anext__(self) -> ChunkT:
         try:
-            chunk = await self.stream.__anext__()
+            chunk = await anext(self._aiter)
         except StopAsyncIteration:
             self._safe_finalize_success()
             raise

diff --git a/util/opentelemetry-util-genai/tests/test_stream.py b/util/opentelemetry-util-genai/tests/test_stream.py
index 7a3e038274..b34012cff7 100644
--- a/util/opentelemetry-util-genai/tests/test_stream.py
+++ b/util/opentelemetry-util-genai/tests/test_stream.py
@@ -45,6 +45,9 @@ def __init__(self, chunks=None, error=None, close_error=None):
         self.close_count = 0
         self.extra_attribute = "passthrough"

+    def __iter__(self):
+        return self
+
     def __next__(self):
         if self._chunks:
             return self._chunks.pop(0)
@@ -58,6 +61,18 @@ def close(self):
         raise self._close_error


+class _FakeSyncIterable:
+    def __init__(self, chunks=None):
+        self.iterator = iter(chunks or [])
+        self.close_count = 0
+
+    def __iter__(self):
+        return self.iterator
+
+    def close(self):
+        self.close_count += 1
+
+
 class _TestSyncStreamWrapper(SyncStreamWrapper):
     def __init__(self, stream):
         super().__init__(stream)
@@ -107,6 +122,19 @@ def test_sync_stream_wrapper_processes_chunks_and_stops():
     assert wrapper.stop_count == 1


+def test_sync_stream_wrapper_processes_iterables():
+    stream = _FakeSyncIterable(chunks=["chunk"])
+    wrapper = _TestSyncStreamWrapper(stream)
+
+    assert next(wrapper) == "chunk"
+    assert wrapper.processed == ["chunk"]
+
+    with pytest.raises(StopIteration):
+        next(wrapper)
+
+    assert wrapper.stop_count == 1
+
+
 def test_sync_stream_wrapper_fails_stream_errors():
     error = ValueError("boom")
     wrapper = _TestSyncStreamWrapper(_FakeSyncStream(error=error))
@@ -240,6 +268,9 @@ def __init__(self, chunks=None, error=None, close_error=None):
         self.close_count = 0
         self.extra_attribute = "passthrough"

+    def __aiter__(self):
+        return self
+
     async def __anext__(self):
         if self._chunks:
             return self._chunks.pop(0)
@@ -253,6 +284,18 @@ async def close(self):
         raise self._close_error


+class _FakeAsyncIterable:
+    def __init__(self, chunks=None):
+        self.iterator = _FakeAsyncStream(chunks=chunks)
+        self.close_count = 0
+
+    def __aiter__(self):
+        return self.iterator
+
+    async def close(self):
+        self.close_count += 1
+
+
 class _TestAsyncStreamWrapper(AsyncStreamWrapper):
     def __init__(self, stream):
         super().__init__(stream)
@@ -304,6 +347,22 @@ async def exercise():
     asyncio.run(exercise())


+def test_async_stream_wrapper_processes_async_iterables():
+    async def exercise():
+        stream = _FakeAsyncIterable(chunks=["chunk"])
+        wrapper = _TestAsyncStreamWrapper(stream)
+
+        assert await anext(wrapper) == "chunk"
+        assert wrapper.processed == ["chunk"]
+
+        with pytest.raises(StopAsyncIteration):
+            await anext(wrapper)
+
+        assert wrapper.stop_count == 1
+
+    asyncio.run(exercise())
+
+
 def test_async_stream_wrapper_fails_stream_errors():
     async def exercise():
         error = ValueError("boom")

From 1cf0924cfa23b0ee8158d43b81c314f196db4eeb Mon Sep 17 00:00:00 2001
From: Surya
Date: Tue, 5 May 2026 20:48:39 -0400
Subject: [PATCH 12/12] Update CLAUDE.md

Co-authored-by: Liudmila Molkova
---
 CLAUDE.md | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/CLAUDE.md b/CLAUDE.md
index 20900eab97..ce60f10b9b 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -1,8 +1,2 @@
 @AGENTS.md

-For GenAI streaming wrappers, use `SyncStreamWrapper` and
-`AsyncStreamWrapper` from `opentelemetry.util.genai.stream` instead of
-reimplementing iteration, close/context-manager, and finalization behavior in
-provider packages. Keep provider-specific chunk parsing and telemetry
-finalization in private hooks or a narrow mixin, and do not make async stream
-wrappers inherit from sync stream wrappers.
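As a closing illustration of what patch 11's `iter(stream)` / `aiter(stream)` capture buys, and what the two new `_Fake*Iterable` tests pin down: the wrapper can now sit in front of any iterable or async iterable, not only objects that are themselves iterators. A standalone sketch with a hypothetical chunk source (note that the `aiter`/`anext` builtins used on the async side require Python 3.10+):

    class ChunkSource:
        """Hypothetical provider stream: iterable, but not an iterator."""

        def __init__(self, chunks):
            self._chunks = chunks

        def __iter__(self):
            return iter(self._chunks)

        def close(self):
            pass


    source = ChunkSource(["a", "b"])

    # Before patch 11 the wrapper called next(self.stream) directly, which
    # raises TypeError here because ChunkSource defines no __next__:
    try:
        next(source)
    except TypeError:
        print("not an iterator")

    # Capturing the iterator once in __init__, as the wrapper now does,
    # makes consumption work for iterators and plain iterables alike:
    iterator = iter(source)
    assert next(iterator) == "a"
    assert next(iterator) == "b"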