perf(metrics): batch tokenization with defer-to-flush drain#350
Conversation
|
MLCommons CLA bot All contributors have signed the MLCommons CLA ✍️ ✅ |
There was a problem hiding this comment.
Code Review
This pull request replaces the thread-based TokenizePool with a process-sharded BatchTokenizer and a TokenBatchQueue to buffer and batch tokenization work (ISL/OSL/TPOT) during metrics aggregation, preventing the system from falling behind on high-throughput runs. The review feedback highlights critical reliability improvements in token_metrics.py. Specifically, it is recommended to wrap the queue's flush logic in a try...finally block to prevent self._inflight from leaking on exceptions or cancellations. Additionally, count_texts and count_texts_async should explicitly check if the tokenizer is closed, and close() should wait for process pools to shut down to avoid resource leaks.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
39b4a9b to
c1d2cb7
Compare
b1395ab to
1e502c5
Compare
nvzhihanj
left a comment
There was a problem hiding this comment.
Review Council — re-audit after rebase (HEAD 4633699d)
Reviewed by: Claude + Codex (low reasoning, correctness pass) · Depth: thorough. Focus (as requested): is the metrics-tokenization change modular / clean / non-intrusive (existing benchmark behavior preserved), and are redundant/meaningless tests added.
Verdict: the rebase reduced the intrusiveness but did not resolve it. Replacing the pre_publish hook (full tokenizer pool fired from the publish tick) with a bounded single-shard live lane is a real improvement. But mid-run tokenization is still on by default (--live-tokenizers defaults to 1 → 0.25s live flush), the live shard runs on the highest core block which overlaps the HTTP workers (compute_affinity_plan Phase-3 spillover), and the PR removed the only opt-out (metrics_tokenizer_workers) without replacement — so the observability component can perturb the SUT during measurement and the operator can't turn it off via config. Headline recommendation: default --live-tokenizers 0 for measurement-grade runs (defer all tokenization to the post-run drain), or confine the live shard to cores disjoint from worker_cpu_sets; restore a benchmark-reachable knob. (A1, A2, A3.)
Otherwise clean / non-intrusive. The change stays in the aggregator subprocess (only cross-module touch: importing endpoint_client.cpu_affinity). The consumer contract is verified intact — SessionState, the MetricsSnapshot schema, publisher cadence, and the state==COMPLETE && n_pending_tasks>0 incomplete-drain signal are unchanged; flush_remaining is bounded by the drain budget and never raises; the live-loop's failure cannot skip publish_final. The "shard or exit cleanly" fallback and the unpinned-without-affinity (macOS) path are correct and tested.
Tests: no redundant or meaningless tests. The new branches are mostly well covered with behavior-grounded assertions (the _setup_shards decision matrix, no-fast-backend-exit, unpinned-without-affinity, warmup-failure-exit, flush_remaining timeout/failure, live-loop start/stop/survives-failure, expand_to_all_online_cpus). Removing the old metrics_tokenizer_workers tests was correct (dead). The problems are coverage gaps, not redundancy: the aggregator-side start_live wiring is untested (A5) and TestAggregatorArgs no longer pins the forwarded-args contract (A6). Two _FakeProc-injection tests are borderline-coupled to internals but still verify fan-out/reassembly; TestEvenChunks is trivial-but-cheap. No mock-only or duplicate tests found.
Codex findings — not posted: (1) a multi-turn-ISL precompute regression at execute.py:351 — that's PR #349's change, out of scope here; (2) a shutdown(wait=False) worker-terminate race — _terminate_procs already defensively handles _processes is None and CPython doesn't synchronously null it, so the specific mechanism couldn't be verified → dropped. Existing gemini/github-code-quality token_metrics.py comments (flush-exception inflight; closed-tokenizer guards; close() shutdown leak; Protocol ...→pass) are unaddressed but deduped here, not re-posted.
8f547af to
f1ac948
Compare
nv-alicheng
left a comment
There was a problem hiding this comment.
Review Council — Multi-AI Code Review
Reviewed by: Codex + Claude | Depth: thorough
Posted 9 inline findings. Well-engineered PR — no critical/high defects; the drain/flush hot path is largely correct. Findings are condition-gated edge cases + polish. See summary comment for the tiered breakdown + one untested-path note that couldn't be posted inline (handler code unchanged in the diff).
Review Council — Multi-AI Code ReviewReviewed by: Codex + Claude | Depth: thorough 9 inline findings across 4 files. No critical/high defects — the defer-to-flush hot path is largely correct: the batch is detached before tokenization, drain failures are terminal-and-pending-only (no double-count, no silent loss on the normal paths), 🟡 Should Fix (medium)
🔵 Consider (low)
Not posted inline (handler code unchanged in the diff, so no valid inline anchor):
|
Design note — scope over architectureIndependent of the line-level findings already posted, a framing for the rework as a whole. The architecture here is forced by the constraints and the PR gets it right: tokenize is heavy CPU work that must not block the loop, must not contend with the loadgen mid-run, must keep up at 50k+ QPS, and must yield exact final numbers. GIL ⇒ real parallelism needs processes; a single BPE rayon pool saturates ~8 cores; the work is only needed for metrics ⇒ it can defer past the run. Those facts make defer-to-flush batching + process-sharding pinned to disjoint core blocks essentially the only answer, and I'd converge on the same — along with no-silent-fallback and the exact-or-flagged Several subtle calls are better than a first pass would make: the sharding-by-core-block thesis is measured (16k vs 1.5k texts/s), the warmup is a bounded startup error that races the launch budget, shards stay idle until The one theme worth weighing is scope, not structure: the live in-process tokenization lane roughly doubles the edge-case surface and accounts for most of the review findings (live-cancel, re-queue-on-failure, the live thread-pool blocking teardown, the bounded per-flush cap). The authoritative metrics are computed at the drain regardless; the live lane exists only to keep token metrics current in the TUI mid-run, and So if live token metrics are not a hard requirement, a KISS v1 would be drain-only, adding the live lane later if operators ask for live OSL. That removes the
None of this is a rework — the bones are right and more thoroughly justified than most perf PRs. It's about trimming optional scope out of v1 and making the two trickiest spots (affinity, the dual-phase flush) simpler. (Independent design review by Claude at a maintainer's request — not a re-run of the automated council above.) |
…e hardening Addresses open review threads on #350: - drain-timeout default 300s -> 0 (unlimited) per maintainer review; an incomplete drain is already flagged via n_pending_tasks, never silent. Single-sourced in schema; --drain-timeout / --tokenizer-workers gain service-side defaults (0 / 2) so the service is hand-launchable, with the benchmark still forwarding schema values as the source of truth. - cpu_affinity.expand_to_all_online_cpus: when sysfs `online` is unreadable/filtered (some containers), widen against all logical CPUs instead of silently leaking the narrow inherited mask (which starved the shard pool on large runs). Kernel clamps to the cgroup cpuset. - drain text phase: isolate a wrong-length tokenizer result as a phase failure so the message phase still runs (was zip(strict=True) raising out of flush()). - BatchTokenizer.close(): cancel_futures on the live thread pool so only an in-flight encode (bounded) is waited on at teardown. - _terminate_procs: log loud if CPython's private _processes attr is missing rather than silent no-op. - aggregator drain log: success line is now an else-branch of the incomplete-drain warning; group drain_timeout_s with defaulted kwargs. - docs/comments: TPOT excludes TTFT; empty-output is not an anomaly; trust_remote_code rationale; flush_remaining CancelledError contract; DESIGN.md INTERRUPTED reachable from any state + worker SIGINT/interrupt path; align TokenCounter Protocol positional-only markers. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
ISL/OSL/TPOT need a tokenizer pass per completion. The old path dispatched one asyncio task per event into a 2-thread pool; at high completion rates the backlog grew unbounded and the end-of-run drain took ~an hour per million samples. This batches: triggers enqueue O(1); a small in-process live lane keeps live metrics current; the end-of-run drain tokenizes everything left through a process-sharded pool (one worker pinned per 8-core block). - BatchTokenizer: sharded drain via encode_batch_fast (Rust/rayon), auto-sized to the allowed CPU universe; the aggregator's inherited mask is restored. No silent fallbacks — a tokenizer without a fast backend, or a failed/over-budget warmup, is a clean startup error. macOS shards unpinned. - TokenBatchQueue: buffers (text, on_count) per event; live failures requeue (no sample loss); drain failures stay counted in n_pending_tasks (state == complete && n_pending_tasks > 0 = incomplete drain). - Drain budget --drain-timeout (schema default 0 = unlimited; never exits while samples are still pending). --tokenizer-workers (default 2; 0 = defer all). Review hardening (PR #350): cpu_affinity widens against all logical CPUs when sysfs `online` is unreadable; drain isolates wrong-length tokenizer results so the message phase still runs; close() drops queued live encodes; _terminate_procs snapshots worker handles before shutdown() nulls _processes; TPOT docstring notes TTFT exclusion; Protocol positional-only markers aligned. Rebased onto main: reconciled with #372 (use_legacy_loadgen_qps_metrics) — the ENDED drain finalize and the SIGTERM handler both refresh the legacy_loadgen_window_duration_ns counter alongside the token-queue drain. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
bd20315 to
89a065a
Compare
ISL/OSL/TPOT need a tokenizer pass per completion. The old path dispatched one asyncio task per event into a 2-thread pool; at high completion rates the backlog grew unbounded and the end-of-run drain took ~an hour per million samples. This batches: triggers enqueue O(1); a small in-process live lane keeps live metrics current; the end-of-run drain tokenizes everything left through a process-sharded pool (one worker pinned per 8-core block). - BatchTokenizer: sharded drain via encode_batch_fast (Rust/rayon), auto-sized to the allowed CPU universe; the aggregator's inherited mask is restored. No silent fallbacks — a tokenizer without a fast backend, or a failed/over-budget warmup, is a clean startup error. macOS shards unpinned. - TokenBatchQueue: buffers (text, on_count) per event; live failures requeue (no sample loss); drain failures stay counted in n_pending_tasks (state == complete && n_pending_tasks > 0 = incomplete drain). - Drain budget --drain-timeout (schema default 0 = unlimited; never exits while samples are still pending). --tokenizer-workers (default 2; 0 = defer all). Review hardening (PR #350): cpu_affinity widens against all logical CPUs when sysfs `online` is unreadable; drain isolates wrong-length tokenizer results so the message phase still runs; close() drops queued live encodes; TPOT docstring notes TTFT exclusion; Protocol positional-only markers aligned. Simplification pass: drain flatten uses itertools.chain.from_iterable (1.44x); worker termination uses public multiprocessing.active_children() instead of ProcessPoolExecutor._processes (also catches init-hung workers); the shard-core probe-and-restore moved to cpu_affinity.cgroup_clamped_cpus() so building a tokenizer no longer mutates the aggregator's own mask; flush(live=bool) split into intent-named flush_live_once()/drain_all() over a private _flush; dead _live_workers field removed. Rebased onto main: reconciled with #372 (use_legacy_loadgen_qps_metrics) — the ENDED drain finalize and the SIGTERM handler both refresh the legacy_loadgen_window_duration_ns counter alongside the token-queue drain. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
89a065a to
f35ee08
Compare
ISL/OSL/TPOT need a tokenizer pass per completion. The old path dispatched one asyncio task per event into a 2-thread pool; at high completion rates the backlog grew unbounded and the end-of-run drain took ~an hour per million samples. This batches: triggers enqueue O(1); a small in-process live lane keeps live metrics current; the end-of-run drain tokenizes everything left through a process-sharded pool (one worker pinned per 8-core block). - BatchTokenizer: sharded drain via encode_batch_fast (Rust/rayon), auto-sized to the allowed CPU universe; the aggregator's inherited mask is restored. No silent fallbacks — a tokenizer without a fast backend, or a failed/over-budget warmup, is a clean startup error. macOS shards unpinned. - TokenBatchQueue: buffers (text, on_count) per event; live failures requeue (no sample loss); drain failures stay counted in n_pending_tasks (state == complete && n_pending_tasks > 0 = incomplete drain). - Drain budget --drain-timeout (schema default 0 = unlimited; never exits while samples are still pending). --tokenizer-workers (default 2; 0 = defer all). Review hardening (PR #350): cpu_affinity widens against all logical CPUs when sysfs `online` is unreadable; drain isolates wrong-length tokenizer results so the message phase still runs; close() drops queued live encodes; TPOT docstring notes TTFT exclusion; Protocol positional-only markers aligned. Simplification pass: drain flatten uses itertools.chain.from_iterable (1.44x); worker termination uses public multiprocessing.active_children() instead of ProcessPoolExecutor._processes (also catches init-hung workers); the shard-core probe-and-restore moved to cpu_affinity.cgroup_clamped_cpus() so building a tokenizer no longer mutates the aggregator's own mask; flush(live=bool) split into intent-named flush_live_once()/drain_all() over a private _flush; dead _live_workers field removed. Rebased onto main: reconciled with #372 (use_legacy_loadgen_qps_metrics) — the ENDED drain finalize and the SIGTERM handler both refresh the legacy_loadgen_window_duration_ns counter alongside the token-queue drain. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
f35ee08 to
d70afe3
Compare
What
ISL/OSL/TPOT need a tokenizer pass per completion.
maindispatches oneasyncio task per event into a 2-thread pool — at high completion rates the
backlog grows unboundedly and the end-of-run drain takes ~an hour per million
samples. This PR batches: triggers enqueue O(1); a small live lane keeps live
metrics current; the end-of-run drain tokenizes everything left through a
process-sharded pool that uses the whole machine.
How
BatchTokenizer— the drain runsencode_batch_fast(Rust, rayon)across auto-sized worker processes, one pinned per 8-core block of the
allowed CPU universe (probed via
expand_to_all_online_cpus(), then theaggregator's inherited mask is restored — the service stays wherever
the parent placed it). No silent fallbacks: a tokenizer without a fast
backend, or a failed/over-budget warmup, is a clean startup error. macOS
shards unpinned (rayon capped per worker) at full speed.
--metrics-tokenizer-workers, schemadefault 2, the pre-existing knob and footprint;
0= defer everything tothe drain), rayon-capped, slice-capped per flush. Owned by the queue
(
start_live); the publisher knows nothing about tokenization.TokenBatchQueue— buffers(text, on_count)per event; livefailures/cancellations re-queue items (no sample loss), drain failures are
terminal and stay counted in
n_pending_tasks(incomplete-drain contract:state == complete && n_pending_tasks > 0). Drain budget--drain-timeout(default 60 s,
0= unlimited); finalize always runs.MetricsTableis fully synchronous;CORES_PER_WORKERis a moduleconstant. Defaults are single-sourced in
config/schema.py(
metrics_drain_timeout_s60 s,metrics_tokenizer_workers2); theservice args are required and always forwarded by the benchmark.
Validation
drain timeout/failure, live requeue, RAYON caps, wiring seams);
pre-commitclean. Offline-burst e2e:state=complete, all seriespopulated, drain to
n_pending_tasks=0.48-core x86 host and a 144-core GB200): the drain shards span the machine
while the aggregator keeps its inherited mask.
Tokenizer micro-benchmark (GB200, real DeepSeek-R1 tokenizer)
144-core Grace, corpus = MLPerf DS-R1 prompts tiled to the dataset-mean OSL
of 3877 tokens; identical token counts both sides.
main1M-sample end-to-end A/B vs
mainOffline 1M samples, streaming, DS-R1 tokenizer, server-paced at 8k QPS with
~1k-token outputs. Both sides: 1,000,000/1,000,000,
state=complete,n_pending_tasks=0, identical token series.ENDEDmainmain🤖 Generated with Claude Code