genai-util: refactor streams to a generic ABC wrapper. #4500
base: main
The first hunk adds a line to the root AGENTS.md:

```
@@ -1 +1,2 @@
@AGENTS.md
```

Review comment (Member): this belongs in ./instrumentation-genai/AGENTS.md, not in the root.
New file: `chat_buffers.py` (+52 lines):

```python
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class ToolCallBuffer:
    """Accumulates the streamed fragments of a single tool call."""

    def __init__(self, index, tool_call_id, function_name):
        self.index = index
        self.function_name = function_name
        self.tool_call_id = tool_call_id
        self.arguments = []

    def append_arguments(self, arguments):
        if arguments is not None:
            self.arguments.append(arguments)


class ChoiceBuffer:
    """Accumulates the streamed deltas of a single completion choice."""

    def __init__(self, index):
        self.index = index
        self.finish_reason = None
        self.text_content = []
        self.tool_calls_buffers = []

    def append_text_content(self, content):
        self.text_content.append(content)

    def append_tool_call(self, tool_call):
        idx = tool_call.index
        # Pad the buffer list with None so that index idx exists.
        for _ in range(len(self.tool_calls_buffers), idx + 1):
            self.tool_calls_buffers.append(None)

        function = tool_call.function
        if not self.tool_calls_buffers[idx]:
            self.tool_calls_buffers[idx] = ToolCallBuffer(
                idx,
                tool_call.id,
                function.name if function else None,
            )

        if function:
            self.tool_calls_buffers[idx].append_arguments(function.arguments)
```

Review comments:
- Member (on `ToolCallBuffer.__init__`): If this is new code, can we add types?
- Member (on the `arguments is not None` check): It doesn't look like `arguments` is ever None.
- Member (on the file): If this is new code, please add type annotations. It looks like some of it is copied/refactored, which I'm OK to ignore to not bloat the scope of this PR.
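For context, here is a minimal sketch (not part of the diff) of how `ChoiceBuffer` accumulates a tool call that arrives split across stream chunks. The `SimpleNamespace` objects stand in for the OpenAI SDK's tool-call delta objects, and the field values are invented for illustration:

```python
# Illustrative only: SimpleNamespace objects mimic the shape of OpenAI's
# streamed tool-call deltas; the real wrappers feed actual SDK objects.
from types import SimpleNamespace

buffer = ChoiceBuffer(index=0)

# A tool call typically arrives in pieces: the first delta carries the
# call id and function name, later deltas carry argument fragments.
deltas = [
    SimpleNamespace(
        index=0,
        id="call_abc123",
        function=SimpleNamespace(name="get_weather", arguments='{"city": '),
    ),
    SimpleNamespace(
        index=0,
        id=None,
        function=SimpleNamespace(name=None, arguments='"Paris"}'),
    ),
]
for delta in deltas:
    buffer.append_tool_call(delta)

tool_call = buffer.tool_calls_buffers[0]
print(tool_call.function_name)       # get_weather
print("".join(tool_call.arguments))  # {"city": "Paris"}
```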
New file: the chat stream wrapper module (+219 lines), a sibling of `chat_buffers.py` in the OpenAI instrumentation (the exact filename is not shown in this extract):

```python
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import json
from typing import Any, Optional

from openai import AsyncStream, Stream

from opentelemetry.semconv._incubating.attributes import (
    openai_attributes as OpenAIAttributes,
)
from opentelemetry.util.genai.invocation import InferenceInvocation
from opentelemetry.util.genai.stream import (
    AsyncStreamWrapper,
    SyncStreamWrapper,
)
from opentelemetry.util.genai.types import (
    OutputMessage,
    Text,
    ToolCallRequest,
)

from .chat_buffers import ChoiceBuffer


class _ChatStreamMixin:
    """Chat-specific hooks shared by sync and async stream wrappers."""

    invocation: InferenceInvocation
    capture_content: bool
    choice_buffers: list
    response_id: Optional[str] = None
    response_model: Optional[str] = None
    service_tier: Optional[str] = None
    prompt_tokens: Optional[int] = None
    completion_tokens: Optional[int] = None

    def _set_response_model(self, chunk):
        if self.response_model:
            return

        if getattr(chunk, "model", None):
            self.response_model = chunk.model

    def _set_response_id(self, chunk):
        if self.response_id:
            return

        if getattr(chunk, "id", None):
            self.response_id = chunk.id

    def _set_response_service_tier(self, chunk):
        if self.service_tier:
            return

        if getattr(chunk, "service_tier", None):
            self.service_tier = chunk.service_tier

    def _build_streaming_response(self, chunk):
        if getattr(chunk, "choices", None) is None:
            return

        choices = chunk.choices
        for choice in choices:
            if not choice.delta:
                continue

            # Pad the buffer list so that choice.index exists.
            for idx in range(len(self.choice_buffers), choice.index + 1):
                self.choice_buffers.append(ChoiceBuffer(idx))

            if choice.finish_reason:
                self.choice_buffers[
                    choice.index
                ].finish_reason = choice.finish_reason

            if choice.delta.content is not None:
                self.choice_buffers[choice.index].append_text_content(
                    choice.delta.content
                )

            if choice.delta.tool_calls is not None:
                for tool_call in choice.delta.tool_calls:
                    self.choice_buffers[choice.index].append_tool_call(
                        tool_call
                    )

    def _set_usage(self, chunk):
        if getattr(chunk, "usage", None):
            self.completion_tokens = chunk.usage.completion_tokens
            self.prompt_tokens = chunk.usage.prompt_tokens

    def _process_chunk(self, chunk):
        self._set_response_id(chunk)
        self._set_response_model(chunk)
        self._set_response_service_tier(chunk)
        self._build_streaming_response(chunk)
        self._set_usage(chunk)

    def _set_output_messages(self):
        if not self.capture_content:  # optimization
            return
        output_messages = []
        for choice in self.choice_buffers:
            message = OutputMessage(
                role="assistant",
                finish_reason=choice.finish_reason or "error",
                parts=[],
            )
            if choice.text_content:
                message.parts.append(
                    Text(content="".join(choice.text_content))
                )
            if choice.tool_calls_buffers:
                tool_calls = []
                for tool_call in choice.tool_calls_buffers:
                    arguments = None
                    arguments_str = "".join(tool_call.arguments)
                    if arguments_str:
                        try:
                            # Fall back to the raw string if the accumulated
                            # arguments are not valid JSON.
                            arguments = json.loads(arguments_str)
                        except json.JSONDecodeError:
                            arguments = arguments_str

                    tool_call_part = ToolCallRequest(
                        name=tool_call.function_name,
                        id=tool_call.tool_call_id,
                        arguments=arguments,
                    )
                    tool_calls.append(tool_call_part)
                message.parts.extend(tool_calls)
            output_messages.append(message)

        self.invocation.output_messages = output_messages

    def _stop_stream(self) -> None:
        self._cleanup()

    def _fail_stream(self, error: BaseException) -> None:
        self._cleanup(error)

    def parse(self):
        """Called when using with_raw_response with stream=True."""
        return self

    def _cleanup(self, error: Optional[BaseException] = None) -> None:
        self.invocation.response_model_name = self.response_model
        self.invocation.response_id = self.response_id
        self.invocation.input_tokens = self.prompt_tokens
        self.invocation.output_tokens = self.completion_tokens
        finish_reasons = [
            choice.finish_reason
            for choice in self.choice_buffers
            if choice.finish_reason
        ]
        if finish_reasons:
            self.invocation.finish_reasons = finish_reasons
        if self.service_tier:
            self.invocation.attributes.update(
                {
                    OpenAIAttributes.OPENAI_RESPONSE_SERVICE_TIER: self.service_tier
                },
            )

        self._set_output_messages()

        if error:
            self.invocation.fail(error)
        else:
            self.invocation.stop()


class ChatStreamWrapper(
    _ChatStreamMixin,
    SyncStreamWrapper[Any],
):
    def __init__(
        self,
        stream: Stream,
        invocation: InferenceInvocation,
        capture_content: bool,
    ):
        super().__init__(stream)
        self.invocation = invocation
        self.choice_buffers = []
        self.capture_content = capture_content


class AsyncChatStreamWrapper(
    _ChatStreamMixin,
    AsyncStreamWrapper[Any],
):
    def __init__(
        self,
        stream: AsyncStream,
        invocation: InferenceInvocation,
        capture_content: bool,
    ):
        super().__init__(stream)
        self.invocation = invocation
        self.choice_buffers = []
        self.capture_content = capture_content


__all__ = [
    "AsyncChatStreamWrapper",
    "ChatStreamWrapper",
]
```

Review comments:
- Member (on lines +55 to +56, the `getattr(chunk, "model", None)` check): Why do we need getattr()?
- Member (on the file): If this is new code, please add type annotations. It looks like some of it is copied/refactored which I'm OK to ignore to not bloat the scope of this PR.
- Member (on lines +130 to +135, the argument-joining and JSON-decoding block): I know this code already existed, but shouldn't this join by …
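The generic base classes imported above (`SyncStreamWrapper`, `AsyncStreamWrapper` from `opentelemetry.util.genai.stream`) are the ABC wrappers this PR refactors toward, but their implementation is not part of this excerpt. As a rough illustration of the contract the mixin relies on, here is a minimal sketch of what the sync base might look like; the hook names (`_process_chunk`, `_stop_stream`, `_fail_stream`) come from the code above, while the iteration and error-handling details are assumptions:

```python
# Hypothetical sketch only; the real opentelemetry.util.genai.stream
# module is not shown in this diff and may differ.
from abc import ABC, abstractmethod
from typing import Generic, Iterator, TypeVar

ChunkT = TypeVar("ChunkT")


class SyncStreamWrapper(ABC, Generic[ChunkT]):
    """Wraps a sync stream, firing telemetry hooks per chunk and on close."""

    def __init__(self, stream):
        self._stream = stream

    def __iter__(self) -> Iterator[ChunkT]:
        try:
            for chunk in self._stream:
                self._process_chunk(chunk)
                yield chunk
        except BaseException as error:
            # Assumption: any failure mid-stream fails the invocation.
            self._fail_stream(error)
            raise
        else:
            self._stop_stream()

    @abstractmethod
    def _process_chunk(self, chunk: ChunkT) -> None: ...

    @abstractmethod
    def _stop_stream(self) -> None: ...

    @abstractmethod
    def _fail_stream(self, error: BaseException) -> None: ...
```

Under this reading, `ChatStreamWrapper` and `AsyncChatStreamWrapper` contribute only the chat-specific hooks, so the same chunk-accumulation and span-closing logic serves both sync and async OpenAI streams.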