genai-util: refactor streams to a generic ABC wrapper. #4500
eternalcuriouslearner wants to merge 22 commits into open-telemetry:main from
Conversation
Pull request overview
This PR introduces shared synchronous/asynchronous stream wrapper base classes in opentelemetry-util-genai and refactors the OpenAI v2 chat streaming implementation to use the shared lifecycle shape (including moving chat choice buffering into a dedicated module and switching chat spans to the InferenceInvocation lifecycle).
Changes:
- Added `SyncStreamWrapper`/`AsyncStreamWrapper` ABCs in `opentelemetry-util-genai`, plus unit tests for stream lifecycle behavior.
- Refactored OpenAI v2 chat streaming to use the new `ChatStreamWrapper`/`AsyncChatStreamWrapper` and extracted buffering into `chat_buffers.py`.
- Added/updated OpenAI v2 tests for API exception and user exception propagation; updated changelogs.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| util/opentelemetry-util-genai/src/opentelemetry/util/genai/_stream.py | Adds new generic sync/async stream wrapper base classes and lifecycle finalization logic. |
| util/opentelemetry-util-genai/tests/test_stream.py | Adds unit coverage for wrapper iteration, close/context-manager behavior, and error handling. |
| util/opentelemetry-util-genai/CHANGELOG.md | Documents the new shared stream wrapper helpers. |
| instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py | Switches chat invocation creation to TelemetryHandler.start_inference() / InferenceInvocation. |
| instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py | Updates chat create wrappers to use the new invocation lifecycle and new chat stream wrappers. |
| instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py | New chat streaming wrappers built on the shared stream wrapper base classes. |
| instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_buffers.py | Extracts choice/tool-call buffering helpers to avoid import cycles. |
| instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_choice_buffer.py | Updates tests to import buffers from the new module. |
| instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_chat_completions.py | Adds tests for API exception propagation and streaming user-exception propagation. |
| instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_async_chat_completions.py | Adds async variants of exception propagation tests; adjusts stream close behavior by mode. |
| instrumentation-genai/opentelemetry-instrumentation-openai-v2/CHANGELOG.md | Documents the chat stream wrapper refactor. |
> - When catching exceptions from the underlying library to record telemetry, always re-raise the
>   original exception unmodified.
> - Do not raise new exceptions in instrumentation/telemetry code.
> - For GenAI streaming wrappers, prefer the shared `SyncStreamWrapper` and `AsyncStreamWrapper`
this belongs in ./instrumentation-genai/AGENTS.md, not in the root
Co-authored-by: Liudmila Molkova <neskazu@gmail.com>
    class ToolCallBuffer:
        def __init__(self, index, tool_call_id, function_name):
If this is new code, can we add types?
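For illustration, a typed version of this buffer could look roughly like the following; the field types are guesses inferred from the quoted hunk, not the PR's actual code:

```python
from __future__ import annotations

from typing import Optional


class ToolCallBuffer:
    """Hypothetical typed sketch of the buffer the comment refers to."""

    def __init__(self, index: int, tool_call_id: str, function_name: str) -> None:
        self.index = index
        self.tool_call_id = tool_call_id
        self.function_name = function_name
        # Streamed argument fragments, joined when the stream completes.
        self.arguments: list[str] = []

    def append_arguments(self, arguments: Optional[str]) -> None:
        if arguments is not None:
            self.arguments.append(arguments)
```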
            self.arguments = []

        def append_arguments(self, arguments):
            if arguments is not None:
it doesn't look like arguments is ever None
        if getattr(chunk, "model", None):
            self.response_model = chunk.model

        arguments_str = "".join(tool_call.arguments)
        if arguments_str:
            try:
                arguments = json.loads(arguments_str)
            except json.JSONDecodeError:
                arguments = arguments_str
I know this code already existed, but shouldn't this join by ","?
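For context: OpenAI streams tool-call arguments as fragments of a single JSON string, so the fragments are meant to be concatenated with an empty separator; a comma join would produce invalid JSON. A small illustration with made-up fragment values:

```python
import json

# Hypothetical argument fragments as they might arrive across stream chunks.
deltas = ['{"locat', 'ion": "Pa', 'ris", "unit": "celsius"}']

# Concatenating with "" reassembles the original JSON document.
assert json.loads("".join(deltas)) == {"location": "Paris", "unit": "celsius"}
# ",".join(deltas) would yield '{"locat,ion": "Pa,ris", ...', which is not valid JSON.
```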
        internally by the wrapper lifecycle and are not part of the public API.
        """

        def __init__(self, stream: Any):
Can you type stream? It looks like Iterable[ChunkT] might work (or some other built-in protocol). If not, can you define a new Protocol that describes all the operations it must support?
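As a sketch of what such a structural type could look like (the name CloseableStream and the listed operations are assumptions, not part of the PR):

```python
from typing import Iterator, Protocol, TypeVar

ChunkT = TypeVar("ChunkT")


class CloseableStream(Protocol[ChunkT]):
    """Hypothetical structural type for the wrapped stream object."""

    def __iter__(self) -> Iterator[ChunkT]: ...

    def close(self) -> None: ...


# The wrapper's __init__ could then accept `stream: CloseableStream[ChunkT]`
# instead of `Any`, if these are in fact the only operations it relies on.
```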
        @abstractmethod
        def _stop_stream(self) -> None:
            """Finalize the stream successfully."""
Is the subclass supposed to do the finalization of the wrapped stream? If these are hooks I'd recommend naming them _on_<event> instead.
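A sketch of the hook-style naming being suggested; the exact hook set here is an assumption for illustration, not the PR's actual API:

```python
from abc import ABC, abstractmethod
from typing import Any


class SyncStreamWrapper(ABC):
    """Sketch only: illustrates _on_<event> naming, not the real base class."""

    @abstractmethod
    def _on_chunk(self, chunk: Any) -> None:
        """Called for each chunk yielded by the wrapped stream."""

    @abstractmethod
    def _on_stop(self) -> None:
        """Called once after the wrapped stream finishes successfully."""

    @abstractmethod
    def _on_error(self, exc: BaseException) -> None:
        """Called once if iteration raises; the original exception is re-raised."""
```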
| """ | ||
|
|
||
| def __init__(self, stream: Any): | ||
| self.stream = stream |
Can this be prefixed with underscore? It's possible this can conflict with the public API of the wrapped object
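The concern is that __getattr__ only runs for attributes not found on the wrapper itself, so a public self.stream shadows any stream attribute on the wrapped object. A self-contained illustration with a made-up SDK object:

```python
class FakeSDKStream:
    stream = "inner-sdk-stream"  # public attribute on the wrapped object


class Wrapper:
    def __init__(self, stream):
        self.stream = stream  # shadows the wrapped object's own `stream`

    def __getattr__(self, name):
        return getattr(self.stream, name)


w = Wrapper(FakeSDKStream())
print(w.stream)  # the FakeSDKStream instance, not "inner-sdk-stream"
# Renaming the attribute to `self._stream` (and delegating through it) would let
# `w.stream` fall through __getattr__ to the wrapped object's own attribute.
```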
        are owned by this base class.
        """

        def __init__(self, stream: Any):
If this is new code, please add type annotations. It looks like some of it is copied/refactored, which I'm OK to leave as-is to avoid bloating the scope of this PR.
        def __getattr__(self, name: str) -> Any:
            return getattr(self.stream, name)
Also, FYI, this doesn't work for magic method calls, so e.g. len() or str() will break. That's why we usually use wrapt, but I'm OK with this approach for now.
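For background: Python looks up dunder methods on the type, not the instance, so instance-level __getattr__ delegation is bypassed for them. A quick self-contained illustration:

```python
class Inner:
    def __len__(self):
        return 3


class Wrapper:
    def __init__(self, inner):
        self._inner = inner

    def __getattr__(self, name):
        return getattr(self._inner, name)


w = Wrapper(Inner())
assert w.__len__() == 3  # explicit attribute access goes through __getattr__
try:
    len(w)  # implicit lookup happens on type(w) and bypasses __getattr__
except TypeError as exc:
    print(exc)  # object of type 'Wrapper' has no len()
```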
Description
This change introduces shared stream wrapper base classes in util-genai and updates the OpenAI v2 chat streaming wrappers to use that common lifecycle shape.
The new SyncStreamWrapper and AsyncStreamWrapper centralize stream iteration, close/context-manager handling, passthrough behavior, double-finalization protection, and failure handling. OpenAI chat streaming now uses dedicated ChatStreamWrapper and AsyncChatStreamWrapper classes that mirror the Messages and Responses wrapper structure while continuing to propagate SDK/API exceptions and user exceptions unchanged.
This also moves chat choice buffering into a separate chat_buffers.py module to avoid import cycles, switches the new chat path to LLMInvocation/InferenceInvocation lifecycle methods, and adds tests for stream wrapper behavior and sync/async chat exception propagation.
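As a rough, self-contained illustration of the lifecycle described above (iteration, context-manager close, double-finalization protection, and error propagation); this is a generic toy, not the actual util-genai base class or the OpenAI wrappers:

```python
class ToyStreamWrapper:
    """Toy wrapper sketching the lifecycle; not the util-genai implementation."""

    def __init__(self, stream):
        self._stream = stream
        self._finalized = False

    def __iter__(self):
        try:
            for chunk in self._stream:
                yield chunk
        except Exception as exc:
            self._finalize(error=exc)
            raise  # re-raise the original exception unchanged
        else:
            self._finalize()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # never swallow the caller's exception

    def close(self):
        self._finalize()
        close = getattr(self._stream, "close", None)
        if close is not None:
            close()

    def _finalize(self, error=None):
        if self._finalized:  # double-finalization protection
            return
        self._finalized = True
        print("finalize telemetry:", "error" if error else "ok")


with ToyStreamWrapper(iter([1, 2, 3])) as wrapped:
    for chunk in wrapped:
        print(chunk)  # prints 1, 2, 3; telemetry is finalized exactly once afterwards
```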
Fixes # (issue)
Type of change
Please delete options that are not relevant.
How Has This Been Tested?
Does This PR Require a Core Repo Change?
Checklist:
See contributing.md for styleguide, changelog guidelines, and more.