fix(observe): preserve asyncio.timeout awareness in async generator wrapper #1690
Open
goingforstudying-ctrl wants to merge 2 commits into
6d0989d to
9d12bbc
…rapper

The _ContextPreservedAsyncGeneratorWrapper was creating a fresh asyncio.Task for every __anext__ call (via asyncio.create_task( context=...)). This changed the task identity between generator iterations, which silently broke asyncio.timeout / asyncio.timeout_at because those context managers bind to asyncio.current_task() at enter time and the bound task is already done by the next iteration.

Replace the asyncio.create_task wrapping with a token-based approach that sets preserved contextvars directly on the current task. Context vars are task-local so they survive generator suspension points without changing the task identity. The same pattern is applied to aclose() for consistency.

Closes langfuse/langfuse#13349
…r trade-off

- Assert span.ended == 1 after TimeoutError in test_async_generator_wrapper_respects_asyncio_timeout
- Expand docstring on _ContextPreservedAsyncGeneratorWrapper to note that context-var mutations across yield points are discarded between iterations
0d4e480 to
c486887
Ran into an issue where `asyncio.timeout` becomes a no-op inside an async generator decorated with `@observe`. Turns out the `_ContextPreservedAsyncGeneratorWrapper` wraps every `__anext__` call in a fresh `asyncio.Task` to preserve contextvars, but that changes the task identity between iterations. `asyncio.timeout` binds to `asyncio.current_task()` at enter time, so it ends up tracking a task that's already completed by the time the deadline fires.

The fix replaces `asyncio.create_task(coro, context=self.context)` with a token-based approach: set the preserved contextvars directly on the current task before calling `__anext__`, then reset them after the call returns. Context vars are task-local, so they survive generator suspension points without changing the task identity. Same treatment for `aclose()`.

Added two tests:

- `asyncio.timeout` works normally when the generator finishes before the deadline.
- `asyncio.timeout` raises `TimeoutError` when the generator runs past the deadline (without the fix it would hang forever).

Let me know if there's a different approach you'd prefer. I considered a few alternatives (persistent task with a queue, re-entering the timeout each iteration), but the token approach seemed cleanest since it doesn't introduce any new task machinery.
Closes #13349
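For reviewers who want to poke at the idea in isolation, here is a minimal sketch of the token-based approach described above. `TokenContextWrapper`, `request_id`, and `numbers` are hypothetical names made up for this illustration; this is not the code in the diff, only the same pattern reduced to the standard library.

```python
import asyncio
import contextvars

# Hypothetical context variable standing in for the tracing state.
request_id = contextvars.ContextVar("request_id", default="unset")


class TokenContextWrapper:
    """Illustrative stand-in for the wrapper, not the Langfuse class."""

    def __init__(self, generator, context):
        self._generator = generator
        self._context = context  # a contextvars.Context snapshot

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Apply the snapshot inside the current task instead of spawning a new one.
        tokens = [(var, var.set(value)) for var, value in self._context.items()]
        try:
            return await self._generator.__anext__()
        finally:
            # Restore whatever the caller had before this step.
            for var, token in reversed(tokens):
                var.reset(token)


async def numbers(seen_tasks, seen_values):
    for i in range(3):
        await asyncio.sleep(0)  # a real suspension point
        seen_tasks.append(asyncio.current_task())
        seen_values.append(request_id.get())
        yield i


async def main():
    request_id.set("req-1")
    snapshot = contextvars.copy_context()  # captures request_id == "req-1"
    request_id.set("caller-value")

    seen_tasks, seen_values = [], []
    wrapper = TokenContextWrapper(numbers(seen_tasks, seen_values), snapshot)
    items = [i async for i in wrapper]

    caller = asyncio.current_task()
    same_task = all(task is caller for task in seen_tasks)
    return items, same_task, seen_values, request_id.get()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The generator body runs in the caller's task on every step, which is the property `asyncio.timeout` depends on, while still seeing the snapshot value rather than the caller's current one.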
Greptile Summary
This PR fixes `asyncio.timeout` becoming a no-op inside async generators wrapped by `_ContextPreservedAsyncGeneratorWrapper`. The previous implementation created a fresh `asyncio.Task` per `__anext__` call (to run the generator in a preserved context), which broke `asyncio.timeout` because the context manager binds to `asyncio.current_task()` at entry and the freshly created task is already gone by the time the deadline fires.

- `__anext__` and `aclose` now apply the preserved context vars directly to the current task using `var.set(val)` / `var.reset(token)` token pairs, and then restore the originals in a `finally` block. No new task is created, so task identity is preserved and `asyncio.timeout` works correctly.
- Context-var mutations made inside the generator are discarded between iterations (`self.context` is re-applied fresh each call and then reset). This is benign for Langfuse's tracing use case but differs from the old semantics.

Confidence Score: 4/5
The change is safe to merge. The token-reset approach is correct for Python 3.10+ and fixes a real observable hang. The only gap is that the timeout test omits a span-finalization assertion, leaving that path unverified.
The implementation correctly resolves the task-identity problem and handles all exception paths (StopAsyncIteration, CancelledError, general exceptions) with proper token cleanup. One test is missing a span.ended assertion after timeout, and the changed context-var mutation semantics across iterations could surprise future callers — neither rises to a blocking issue, but both are worth addressing before merge.
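The changed mutation semantics can be seen in a small standalone sketch. The helper `apply_snapshot_and_step`, the `counter` variable, and `bumping_generator` are hypothetical names for illustration, assuming the set/reset-per-step pattern described in this summary rather than the exact code under review.

```python
import asyncio
import contextvars

# Hypothetical variable that the generator mutates on every step.
counter = contextvars.ContextVar("counter", default=0)


async def apply_snapshot_and_step(generator, snapshot):
    """Set the snapshot values, advance the generator once, then reset."""
    tokens = [(var, var.set(value)) for var, value in snapshot.items()]
    try:
        return await generator.__anext__()
    finally:
        for var, token in reversed(tokens):
            var.reset(token)


async def bumping_generator(observed):
    for _ in range(3):
        observed.append(counter.get())   # value visible at the start of the step
        counter.set(counter.get() + 1)   # mutation made inside the generator
        yield


async def main():
    counter.set(10)
    snapshot = contextvars.copy_context()  # captures counter == 10

    observed = []
    generator = bumping_generator(observed)
    for _ in range(3):
        await apply_snapshot_and_step(generator, snapshot)
    await generator.aclose()

    return observed, counter.get()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Each step starts again from the snapshot value, so the generator never observes its own earlier increment, and the caller's value is untouched afterwards. Code that relied on mutations accumulating across `yield` points would behave differently under this pattern.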
tests/unit/test_observe.py — the second new test should assert span.ended to confirm finalization on timeout.
Sequence Diagram
sequenceDiagram
    participant Caller as Caller task
    participant Wrapper as _ContextPreservedAsyncGeneratorWrapper
    participant Gen as Async Generator
    participant EL as Event Loop
    Note over Caller: asyncio.timeout(T) binds to Caller task
    Caller->>Wrapper: async for item in wrapper (→ __anext__)
    Wrapper->>Wrapper: var.set(val) for each var in self.context snapshot
    Wrapper->>Gen: await generator.__anext__()
    Gen-->>EL: suspend at await asyncio.sleep(...)
    EL-->>Gen: resume after sleep
    Gen-->>Wrapper: yield item
    Wrapper->>Wrapper: var.reset(token) [restore previous values]
    Wrapper-->>Caller: return item
    Note over Caller,Wrapper: On timeout, CancelledError raised into Caller task
    Caller->>Wrapper: __anext__() [CancelledError injected]
    Wrapper->>Wrapper: var.set(val) for each var in self.context snapshot
    Wrapper->>Gen: await generator.__anext__() → CancelledError raised
    Wrapper->>Wrapper: finally: var.reset(token)
    Wrapper->>Wrapper: _finalize_with_error(CancelledError)
    Wrapper-->>Caller: re-raise CancelledError → asyncio.timeout converts to TimeoutError
Reviews (1): Last reviewed commit: "fix(observe): preserve asyncio.timeout a..."