Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
cb0156f
wip: move the streams to a generic ABC wrapper.
eternalcuriouslearner Apr 29, 2026
11258d9
polish: add changelogs.
eternalcuriouslearner Apr 29, 2026
6f4e510
wip: precommit.
eternalcuriouslearner Apr 29, 2026
787ecbb
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
eternalcuriouslearner Apr 29, 2026
8a273c5
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
eternalcuriouslearner Apr 30, 2026
07b4a66
wip: fix the merge issues.
eternalcuriouslearner Apr 30, 2026
c496ecf
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
lzchen Apr 30, 2026
250a7bf
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
eternalcuriouslearner Apr 30, 2026
04e067c
polish: pr feedback.
eternalcuriouslearner May 1, 2026
2157d28
polish: fixing the precommit.
eternalcuriouslearner May 1, 2026
fc3bd4d
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
eternalcuriouslearner May 1, 2026
8a8b9c7
wip: renamed to stream.py and changed finish_reasons population style.
eternalcuriouslearner May 2, 2026
ae2663b
wip: implementing copilot suggestions.
eternalcuriouslearner May 2, 2026
a246ee1
wip: fixing the tox.
eternalcuriouslearner May 2, 2026
26743fb
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
eternalcuriouslearner May 2, 2026
5689654
polish: copilot feedback.
eternalcuriouslearner May 3, 2026
c43b094
polish: copilot suggestions.
eternalcuriouslearner May 3, 2026
c06a7e7
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
eternalcuriouslearner May 5, 2026
57e2957
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
eternalcuriouslearner May 5, 2026
1cf0924
Update CLAUDE.md
eternalcuriouslearner May 6, 2026
46540b3
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
lmolkova May 6, 2026
3b31fc1
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
lzchen May 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ Apply to packages under `instrumentation/` and `instrumentation-genai/`.
- When catching exceptions from the underlying library to record telemetry, always re-raise the
original exception unmodified.
- Do not raise new exceptions in instrumentation/telemetry code.
- For GenAI streaming wrappers, prefer the shared `SyncStreamWrapper` and `AsyncStreamWrapper`
Copy link
Copy Markdown
Member

@lmolkova lmolkova May 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this belongs in ./instrumentation-genai/AGENTS.md, not in the root

helpers from `opentelemetry.util.genai.stream` instead of reimplementing iteration,
close/context-manager, and finalization behavior in provider packages.
- Put provider-specific chunk parsing and telemetry finalization in private hook methods or a
narrow mixin. Do not make async stream wrappers inherit from sync stream wrappers.

### Semantic conventions

Expand Down
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
@AGENTS.md

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Refactor chat completion stream wrappers to use shared GenAI stream lifecycle helpers.
  ([#4500](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4500))

## Version 2.4b0 (2026-05-01)


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class ToolCallBuffer:
    """Accumulates the streamed fragments of a single tool call.

    OpenAI streams tool-call arguments as partial string fragments across
    chunks; this buffer collects them so they can be joined into the full
    arguments payload once the stream completes.
    """

    def __init__(
        self,
        index: int,
        tool_call_id: "str | None",
        function_name: "str | None",
    ) -> None:
        # Position of this tool call within the choice's tool_calls list.
        self.index = index
        # May be None when the first chunk carries no function payload.
        self.function_name = function_name
        self.tool_call_id = tool_call_id
        # Argument fragments in arrival order; joined by the consumer.
        self.arguments: "list[str]" = []

    def append_arguments(self, arguments: "str | None") -> None:
        # Chunks may omit the argument delta entirely; ignore those.
        if arguments is not None:
            self.arguments.append(arguments)


class ChoiceBuffer:
    """Accumulates streamed deltas for a single response choice.

    Collects text fragments, the finish reason, and per-index tool-call
    buffers as chunks arrive for the choice at ``index``.
    """

    def __init__(self, index: int) -> None:
        self.index = index
        self.finish_reason: "str | None" = None
        # Text fragments in arrival order; joined by the consumer.
        self.text_content: "list[str]" = []
        # Indexed by tool_call.index; gaps are padded with None until
        # (if ever) a chunk for that index arrives.
        self.tool_calls_buffers: "list[ToolCallBuffer | None]" = []

    def append_text_content(self, content: str) -> None:
        self.text_content.append(content)

    def append_tool_call(self, tool_call) -> None:
        # tool_call is an OpenAI streamed tool-call delta with
        # .index, .id and .function attributes.
        idx = tool_call.index
        # Grow the list so tool_call.index is a valid position, padding
        # any skipped indices with None.
        for _ in range(len(self.tool_calls_buffers), idx + 1):
            self.tool_calls_buffers.append(None)

        function = tool_call.function
        if not self.tool_calls_buffers[idx]:
            self.tool_calls_buffers[idx] = ToolCallBuffer(
                idx,
                tool_call.id,
                function.name if function else None,
            )

        if function:
            self.tool_calls_buffers[idx].append_arguments(function.arguments)
Comment thread
eternalcuriouslearner marked this conversation as resolved.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is new code, please add type annotations. It looks like some of it is copied/refactored which I'm OK to ignore to not bloat the scope of this PR.

Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import json
from typing import Any, Optional

from openai import AsyncStream, Stream

from opentelemetry.semconv._incubating.attributes import (
openai_attributes as OpenAIAttributes,
)
from opentelemetry.util.genai.invocation import InferenceInvocation
from opentelemetry.util.genai.stream import (
AsyncStreamWrapper,
SyncStreamWrapper,
)
from opentelemetry.util.genai.types import (
OutputMessage,
Text,
ToolCallRequest,
)

from .chat_buffers import ChoiceBuffer


class _ChatStreamMixin:
    """Chat-specific hooks shared by sync and async stream wrappers.

    Concrete wrappers combine this mixin with ``SyncStreamWrapper`` or
    ``AsyncStreamWrapper``; the wrapper base drives iteration and invokes
    ``_process_chunk`` per chunk, then ``_stop_stream`` / ``_fail_stream``
    when the stream finishes or errors.
    """

    # Set by the concrete wrapper's __init__.
    invocation: InferenceInvocation
    capture_content: bool
    choice_buffers: list
    # Populated lazily from the first chunk that carries each value.
    response_id: Optional[str] = None
    response_model: Optional[str] = None
    service_tier: Optional[str] = None
    prompt_tokens: Optional[int] = None
    completion_tokens: Optional[int] = None

    def _set_response_model(self, chunk: Any) -> None:
        # First chunk carrying a model name wins; later chunks are ignored.
        if self.response_model:
            return

        # getattr guards against raw/azure chunk shapes missing the field.
        if getattr(chunk, "model", None):
            self.response_model = chunk.model

    def _set_response_id(self, chunk: Any) -> None:
        if self.response_id:
            return

        if getattr(chunk, "id", None):
            self.response_id = chunk.id

    def _set_response_service_tier(self, chunk: Any) -> None:
        if self.service_tier:
            return

        if getattr(chunk, "service_tier", None):
            self.service_tier = chunk.service_tier

    def _build_streaming_response(self, chunk: Any) -> None:
        """Fold a chunk's choice deltas into the per-choice buffers."""
        if getattr(chunk, "choices", None) is None:
            return

        choices = chunk.choices
        for choice in choices:
            if not choice.delta:
                continue

            # Grow choice_buffers so choice.index is a valid position.
            for idx in range(len(self.choice_buffers), choice.index + 1):
                self.choice_buffers.append(ChoiceBuffer(idx))

            if choice.finish_reason:
                self.choice_buffers[
                    choice.index
                ].finish_reason = choice.finish_reason

            if choice.delta.content is not None:
                self.choice_buffers[choice.index].append_text_content(
                    choice.delta.content
                )

            if choice.delta.tool_calls is not None:
                for tool_call in choice.delta.tool_calls:
                    self.choice_buffers[choice.index].append_tool_call(
                        tool_call
                    )

    def _set_usage(self, chunk: Any) -> None:
        # Usage arrives at most once, on the final chunk when the caller
        # requested include_usage.
        if getattr(chunk, "usage", None):
            self.completion_tokens = chunk.usage.completion_tokens
            self.prompt_tokens = chunk.usage.prompt_tokens

    def _process_chunk(self, chunk: Any) -> None:
        """Hook called by the stream wrapper base for every chunk."""
        self._set_response_id(chunk)
        self._set_response_model(chunk)
        self._set_response_service_tier(chunk)
        self._build_streaming_response(chunk)
        self._set_usage(chunk)

    def _set_output_messages(self) -> None:
        """Materialize buffered choices as invocation output messages."""
        if not self.capture_content:  # optimization
            return
        output_messages = []
        for choice in self.choice_buffers:
            message = OutputMessage(
                role="assistant",
                finish_reason=choice.finish_reason or "error",
                parts=[],
            )
            if choice.text_content:
                message.parts.append(
                    Text(content="".join(choice.text_content))
                )
            if choice.tool_calls_buffers:
                tool_calls = []
                for tool_call in choice.tool_calls_buffers:
                    if tool_call is None:
                        # Gap left by out-of-order tool-call indices
                        # (ChoiceBuffer pads with None); nothing to emit.
                        continue
                    arguments = None
                    # Fragments are substrings of one JSON document, so a
                    # plain concatenation (no separator) is correct.
                    arguments_str = "".join(tool_call.arguments)
                    if arguments_str:
                        try:
                            arguments = json.loads(arguments_str)
                        except json.JSONDecodeError:
                            # Fall back to the raw string if the provider
                            # sent malformed/truncated JSON.
                            arguments = arguments_str
                    tool_call_part = ToolCallRequest(
                        name=tool_call.function_name,
                        id=tool_call.tool_call_id,
                        arguments=arguments,
                    )
                    tool_calls.append(tool_call_part)
                message.parts.extend(tool_calls)
            output_messages.append(message)

        self.invocation.output_messages = output_messages

    def _stop_stream(self) -> None:
        """Hook called by the stream wrapper base on normal completion."""
        self._cleanup()

    def _fail_stream(self, error: BaseException) -> None:
        """Hook called by the stream wrapper base when iteration raises."""
        self._cleanup(error)

    def parse(self):
        """Called when using with_raw_response with stream=True."""
        return self

    def _cleanup(self, error: Optional[BaseException] = None) -> None:
        """Flush buffered state into the invocation and finalize it."""
        self.invocation.response_model_name = self.response_model
        self.invocation.response_id = self.response_id
        self.invocation.input_tokens = self.prompt_tokens
        self.invocation.output_tokens = self.completion_tokens
        finish_reasons = [
            choice.finish_reason
            for choice in self.choice_buffers
            if choice.finish_reason
        ]
        if finish_reasons:
            self.invocation.finish_reasons = finish_reasons
        if self.service_tier:
            self.invocation.attributes.update(
                {
                    OpenAIAttributes.OPENAI_RESPONSE_SERVICE_TIER: self.service_tier
                },
            )

        self._set_output_messages()

        if error:
            self.invocation.fail(error)
        else:
            self.invocation.stop()


class ChatStreamWrapper(
    _ChatStreamMixin,
    SyncStreamWrapper[Any],
):
    """Synchronous chat-completions stream wrapper.

    Iteration and close/context-manager behavior come from
    ``SyncStreamWrapper``; chunk parsing and telemetry finalization come
    from ``_ChatStreamMixin``.
    """

    def __init__(
        self,
        stream: Stream,
        invocation: InferenceInvocation,
        capture_content: bool,
    ):
        super().__init__(stream)
        self.capture_content = capture_content
        self.invocation = invocation
        self.choice_buffers = []


class AsyncChatStreamWrapper(
    _ChatStreamMixin,
    AsyncStreamWrapper[Any],
):
    """Asynchronous chat-completions stream wrapper.

    Iteration and aclose/async-context-manager behavior come from
    ``AsyncStreamWrapper``; chunk parsing and telemetry finalization come
    from ``_ChatStreamMixin``.
    """

    def __init__(
        self,
        stream: AsyncStream,
        invocation: InferenceInvocation,
        capture_content: bool,
    ):
        super().__init__(stream)
        self.capture_content = capture_content
        self.invocation = invocation
        self.choice_buffers = []


# Public surface of this module: the sync and async chat stream wrappers.
__all__ = [
    "AsyncChatStreamWrapper",
    "ChatStreamWrapper",
]
Loading
Loading