Conversation
|
@greptileai review pls |
Greptile SummaryThis PR fixes a class of bugs where
Confidence Score: 5/5Safe to merge. All three cancellation sites are handled correctly, and the new flush lock and final-flush flag in the producer preserve the existing ordering guarantees. The changes are surgical and well-reasoned. In No files require special attention. The most complex change is in Important Files Changed
Sequence DiagramsequenceDiagram
participant Caller
participant Producer
participant LingerTask as _flush_after_linger (Task)
participant FlushLock as _flush_lock
participant Session as AppendSession
participant DrainTask as _drain_acks (Task)
note over LingerTask: linger timer fires
LingerTask->>LingerTask: "self._linger_task = None"
LingerTask->>FlushLock: async with _flush_lock (acquire)
LingerTask->>Session: await session.submit(batch)
Caller->>Producer: close()
Producer->>Producer: "_closed = True"
Producer->>Producer: _cancel_linger_task()
note over Producer: self._linger_task is already None
Producer->>FlushLock: _submit_accumulated_records() waits for lock
Session-->>LingerTask: ticket returned
LingerTask->>DrainTask: _batch_ready.set()
LingerTask->>FlushLock: release lock
Producer->>FlushLock: acquire lock
note over Producer: accumulator is empty → return
Producer->>FlushLock: release lock
Producer->>Session: session.close()
Producer->>Producer: "_final_flush_done = True"
Producer->>DrainTask: _batch_ready.set()
Producer->>DrainTask: await _drain_task
DrainTask-->>Producer: done
Producer-->>Caller: close() returns
Reviews (6): Last reviewed commit: "fix submit race" | Re-trigger Greptile |
|
@greptileai pls review again |
2e59053 to
38a165c
Compare
|
@greptileai pls review again |
|
@greptileai addressed your concerns. pls review again |
closes #66