Skip to content

fix: unawaited asyncio.Task cancellations #78

Merged
quettabit merged 3 commits into
mainfrom
qb/66
Jun 13, 2026
Merged

fix: unawaited asyncio.Task cancellations #78
quettabit merged 3 commits into
mainfrom
qb/66

Conversation

@quettabit

Copy link
Copy Markdown
Member

closes #66

@quettabit

Copy link
Copy Markdown
Member Author

@greptileai review pls

@greptile-apps

greptile-apps Bot commented Jun 13, 2026

Copy link
Copy Markdown

Greptile Summary

This PR fixes a class of bugs where asyncio.Task objects were cancelled via .cancel() but never awaited, leaving them as dangling tasks that could emit "Task was destroyed but it is pending" warnings and interfere with graceful shutdown. The fix is applied consistently across three production modules and the test suite.

  • _batching.py: next_record_task (formerly pending_next) is now properly cancelled and awaited in the finally block.
  • _producer.py: The linger task cancellation is extracted into _cancel_linger_task() which awaits the task; a new _flush_lock serialises concurrent calls to _submit_accumulated_records(); and a _final_flush_done flag prevents _drain_acks from exiting before close() has finished flushing.
  • _s2s/_append_session.py: Both next_msg_task and deadline_armed_waiter_task are awaited after cancellation, both inline in the loop body and in the finally block.

Confidence Score: 5/5

Safe to merge. All three cancellation sites are handled correctly, and the new flush lock and final-flush flag in the producer preserve the existing ordering guarantees.

The changes are surgical and well-reasoned. In _producer.py, a key concern was whether the linger task could be cancelled while it already held the flush lock mid-session.submit(), but this cannot occur: _flush_after_linger clears self._linger_task = None synchronously (no yield point) before calling _submit_accumulated_records(), so _cancel_linger_task() always sees None by the time the linger task has advanced past sleeping. The flush lock handles the remaining concurrency between a full-accumulator submit() and close() correctly. The _final_flush_done guard prevents _drain_acks from exiting before the flush completes. No record-loss or pending-future-leak paths were introduced.

No files require special attention. The most complex change is in _producer.py (flush lock + final-flush flag), which was analysed in depth.

Important Files Changed

Filename Overview
src/s2_sdk/_producer.py Adds _flush_lock to serialise concurrent flush calls, introduces _final_flush_done to gate _drain_acks exit, and properly awaits the cancelled linger task via _cancel_linger_task.
src/s2_sdk/_s2s/_append_session.py Converts Future variables to Task types and properly awaits both next_msg_task and deadline_armed_waiter_task after cancellation in the finally block and inline.
src/s2_sdk/_batching.py Renames pending_nextnext_record_task for clarity and properly awaits the cancelled task in the finally block, fixing the unawaited-cancel warning.
tests/test_stream_ops.py Properly awaits the cancelled append_later_task in the test finally block, mirroring the same fix applied in production code.

Sequence Diagram

sequenceDiagram
    participant Caller
    participant Producer
    participant LingerTask as _flush_after_linger (Task)
    participant FlushLock as _flush_lock
    participant Session as AppendSession
    participant DrainTask as _drain_acks (Task)

    note over LingerTask: linger timer fires
    LingerTask->>LingerTask: "self._linger_task = None"
    LingerTask->>FlushLock: async with _flush_lock (acquire)
    LingerTask->>Session: await session.submit(batch)

    Caller->>Producer: close()
    Producer->>Producer: "_closed = True"
    Producer->>Producer: _cancel_linger_task()
    note over Producer: self._linger_task is already None
    Producer->>FlushLock: _submit_accumulated_records() waits for lock

    Session-->>LingerTask: ticket returned
    LingerTask->>DrainTask: _batch_ready.set()
    LingerTask->>FlushLock: release lock

    Producer->>FlushLock: acquire lock
    note over Producer: accumulator is empty → return
    Producer->>FlushLock: release lock

    Producer->>Session: session.close()
    Producer->>Producer: "_final_flush_done = True"
    Producer->>DrainTask: _batch_ready.set()
    Producer->>DrainTask: await _drain_task
    DrainTask-->>Producer: done
    Producer-->>Caller: close() returns
Loading

Reviews (6): Last reviewed commit: "fix submit race" | Re-trigger Greptile

@quettabit

Copy link
Copy Markdown
Member Author

@greptileai pls review again

@quettabit quettabit force-pushed the qb/66 branch 2 times, most recently from 2e59053 to 38a165c Compare June 13, 2026 02:43
@quettabit quettabit marked this pull request as ready for review June 13, 2026 02:46
@quettabit quettabit requested a review from a team as a code owner June 13, 2026 02:46
@quettabit quettabit marked this pull request as draft June 13, 2026 03:22
@quettabit

Copy link
Copy Markdown
Member Author

@greptileai pls review again

@quettabit

Copy link
Copy Markdown
Member Author

@greptileai addressed your concerns. pls review again

@quettabit quettabit marked this pull request as ready for review June 13, 2026 03:55
@quettabit quettabit merged commit 4d4ac80 into main Jun 13, 2026
10 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Detail Bug] Batching: append_record_batches cancels a pending task without awaiting it

1 participant