perf(task-manager): use DoublyLinkedList + side-map for taskQueues and latestPendingOps#27416
perf(task-manager): use DoublyLinkedList + side-map for taskQueues and latestPendingOps#27416anthony-murphy wants to merge 4 commits into
Conversation
… and latestPendingOps
|
Hi! Thank you for opening this PR. Want me to review it? Based on the diff (700 lines, 3 files), I've queued these reviewers:
How this works
|
| * {@link ListNode} that holds that clientId in the corresponding queue. Enables O(1) lookup and removal of a | ||
| * client from a queue without scanning the list. Must be kept symmetric with `taskQueues` at every mutation. | ||
| */ | ||
| private readonly taskQueueNodes = new Map<string, Map<string, ListNode<string>>>(); |
There was a problem hiding this comment.
Deep Review: The new taskQueueNodes: Map<string, Map<string, ListNode<string>>> must stay byte-for-byte symmetric with taskQueues across seven mutation sites: addClientToQueue, removeClientFromQueue, the "complete" op handler, the isDetached() shortcut in subscribeToTask, replacePlaceholderInAllQueues, scrubClientsNotInQuorum, and loadCore. The invariant is enforced only by two runtime asserts at the iteration sites (around lines 856-858 and 883-885) and by a one-line comment on the field declaration saying "Must be kept symmetric with taskQueues at every mutation." Every future mutation site is a silent-corruption hazard — the PR's own risk section names this "the main long-term hazard … there is no encapsulating type that enforces this — every future mutation site must update both."
Two independent design proposals converged on the same mitigation, and it lines up with ChumpChief's stated preference (#25607, 2025-10-09: "I've started preferring merging these internal emitters to a single internalEvents member") for consolidating parallel state holders.
Suggested shape. Add a small private helper inside the package — e.g. IndexedClientQueue wrapping DoublyLinkedList<string> + Map<string, ListNode<string>> — exposing has / push / delete / first / last / length / replace / [Symbol.iterator]. Replace the two parallel maps on TaskManagerClass with a single Map<string, IndexedClientQueue>. Do the analogous wrap for latestPendingOps keyed by messageId — that composes cleanly with closing the O(p) reSubmitCore lookup. The two side-map asserts retire; the invariant becomes unrepresentable. No public API change; behavior preserved.
There was a problem hiding this comment.
Deep Review: Resolved on sha 8cc348d. The new IndexedList<K, V> (taskManager.ts:278-330) is exactly the wrapper shape this thread proposed — single private encapsulation of DoublyLinkedList + side-index Map, with has / push / insertAfter / deleteByKey / removeNode / first / last / length / getNode / [Symbol.iterator] / map. taskQueues is now Map<string, IndexedList<string, string>> and latestPendingOps is Map<string, IndexedList<number, IPendingOp>>; the parallel taskQueueNodes field and its two iteration-site asserts are gone. The list/index-sync invariant is structural rather than convention-driven. Safe to mark resolved.
Deep ReviewReviewed commit Readiness: 8/10 — ALMOST READY The structural refactor is sound and the previously-flagged two-map invariant is now encapsulated in Path to Ready
Context for Reviewers
For human reviewer
Suggested description## Description
Replaces array-backed pending-state in `TaskManager` with a private
`IndexedList<K, V>` wrapper (`taskManager.ts:73-90`) that pairs a
`DoublyLinkedList<V>` with a key→node `Map` and exposes `has` / `push` /
`insertAfter` / `deleteByKey` / `removeNode` / `first` / `last` /
`length` / `getNode` / `[Symbol.iterator]` / `map`:
- `taskQueues: Map<string, string[]>` → `Map<string, IndexedList<string, string>>`.
- `latestPendingOps: Map<string, IPendingOp[]>` → `Map<string, IndexedList<number, IPendingOp>>` keyed by `messageId`.
Every `indexOf` + `splice` pair in `taskManager.ts` becomes O(1):
- `removeClientFromQueue`: `clientQueue.deleteByKey(clientId)`.
- `replacePlaceholderInAllQueues`: `insertAfter` + `deleteByKey` on the
placeholder node (preserves queue position; node data is readonly so
in-place rekey isn't available).
- `reSubmitCore`: `pendingOps.getNode(localOpMetadata)` then
`removeNode(node)`, with the type assert preserved under tag `0xc43`.
Read sites preserve semantics: `clientQueue[0]` → `clientQueue.first?.data`;
`clientQueue.includes(clientId)` → `clientQueue.has(clientId)`; trailing
reads → `list.last?.data`. `summarizeCore` keeps the snapshot wire format
unchanged via `[...queue.map((node) => node.data)]`; `loadCore` rebuilds
an `IndexedList` per task and still calls `scrubClientsNotInQuorum()`
(load-bearing per #13169).
## Why
`TaskManager` ran multiple `indexOf(clientId)` + `splice` operations per
queue mutation — quadratic in queue depth. The `IndexedList` abstraction
eliminates those scans without changing observable behavior or wire
format, and collapses what would otherwise be two parallel maps into a
single encapsulated invariant (no separate `taskQueueNodes` field on
`TaskManagerClass`).
## Tests
- New `Queue ordering regressions` describe block in
`test/taskManager.spec.ts` (~5 tests) covering scrub, placeholder swap,
`reSubmitCore` sibling-order preservation, rollback, and summary
round-trip.
- New private-state helpers `readTaskQueue` / `getInternalTaskQueues` /
`getTaskQueueAsArray` / `getTaskQueueHead` migrated to the new
`IndexedList` shape.
- `assertEqualTaskManagers` in `test/fuzzUtils.ts` iterates the new
`IndexedList` via `Symbol.iterator` and compares `node.data`.
## Notes
Pure structural prep — no `reSubmitSquashed`, no squash logic, no
api-report changes. The disconnect/scrub paths
(`removeClientFromAllQueues`, `scrubClientsNotInQuorum`) still iterate
every task queue; a reverse `clientId → taskIds` index is a deliberate
follow-up (see issue #NNN).
Verified locally: `pnpm tsc`, `pnpm build:esnext`, `pnpm eslint` (src +
tests), and `pnpm test` (incl. fuzz with `assertEqualTaskManagers`
hook) all clean.Review history (3 prior reviews)
|
| * mutations to the list and the index cannot drift out of sync. The two structures were previously | ||
| * stored independently on {@link TaskManagerClass} and had to be kept byte-symmetric at every | ||
| * mutation site — an invariant only enforced by runtime asserts. By bundling them behind this | ||
| * private wrapper the invariant becomes unrepresentable. |
There was a problem hiding this comment.
Deep Review: The JSDoc here states the list and its index "were previously stored independently" and that queue-ordering invariants were previously guaranteed by side-by-side taskQueues + taskQueueIndex structures. That history doesn't exist. #13014 (scottn12, 2022-11-29) shows taskQueues as Map<string, string[]> with no separate taskQueueIndex field, and the PR description itself describes the side-lookup as new in this change. A grep of the package finds taskQueueIndex only in these new comments — never as a real implementation field, past or present.
Rewrite to describe the actual before/after: the old representation used a single Map<string, string[]> where order and membership lived in one string[]; the new IndexedList keeps a DoublyLinkedList and its key→node Map synchronized inside one class so the dual-structure invariant is structurally unrepresentable. The parallel taskQueues + taskQueueNodes shape described in the PR body was a midpoint design and never landed.
| describe("Queue ordering regressions", () => { | ||
| // These tests exercise invariants around queue ordering that were previously | ||
| // guaranteed by the side-by-side `taskQueues` + `taskQueueIndex` structures and | ||
| // are now guaranteed by the `IndexedList` wrapper. |
There was a problem hiding this comment.
Deep Review: Same "previously stored independently" framing as the JSDoc on IndexedList itself — the implied history (parallel taskQueues + taskQueueIndex fields) was never on main. Align this comment with the actual before-state (a single Map<string, string[]>) and the new invariant (list + key→node Map co-located inside IndexedList).
Description
Replaces array-backed pending-state in
TaskManager:taskQueues: Map<string, string[]>→Map<string, DoublyLinkedList<string>>, with a side-lookuptaskQueueNodes: Map<string, Map<string, ListNode<string>>>for O(1) clientId → node access.latestPendingOps: Map<string, IPendingOp[]>→Map<string, DoublyLinkedList<IPendingOp>>.Every
indexOf+splicepair intaskManager.tsbecomes O(1):removeClientFromQueue: lookup node via side map,list.remove(node).replacePlaceholderInAllQueues:insertAfter+removeon the placeholder node (preserves queue position; data is readonly so we can't reassign in place).reSubmitCore:pendingOps.find(...)thenlist.remove(node).Read sites preserve semantics:
clientQueue[0]→clientQueue.first?.data;clientQueue.includes(clientId)→clientNodes.has(clientId); trailing reads →list.last?.data.summarizeCorekeeps the snapshot wire format unchanged.Why
TaskManagerruns multipleindexOf(clientId)+spliceoperations per queue mutation today — quadratic in queue depth. The data-structure swap eliminates those scans without changing observable behavior or wire format. The side-lookuptaskQueueNodesmap is the only new state; it is populated/cleared in lockstep withtaskQueuesat every mutation site (insert, remove, complete, scrub, queue-empty cleanup).Notes
Pure structural prep — no
reSubmitSquashed, no squash logic, no tests touched, no api-report changes.Verified locally:
pnpm tsc,pnpm build:esnext, andpnpm eslint src/taskManager.tsall clean.