feat(container-runtime): bunch reSubmit dispatch across same-DDS runs#27429
feat(container-runtime): bunch reSubmit dispatch across same-DDS runs#27429anthony-murphy wants to merge 2 commits into
Conversation
Extend the bunched dispatch pattern from inbound processMessages to the outbound reSubmit path. ContainerRuntime.reSubmitBatch now collects runs of FluidDataStoreOp entries and dispatches them via a new ChannelCollection.reSubmitContainerMessages, which bunches by (address, ddsType) exactly like processChannelMessages. Adds @legacy @beta IRuntimeResubmitMessageCollection and optional reSubmitMessages methods on IFluidDataStoreChannel and IDeltaHandler. ChannelDeltaConnection fans out stashed-op metadata across bunch entries before dispatch. SharedObject provides a default reSubmitMessages that loops over existing reSubmitCore / reSubmitSquashedCore, so DDSes work unchanged; SharedTree / MergeTree can override in follow-up PRs. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Hi! Thank you for opening this PR. Want me to review it? Based on the diff (741 lines, 25 files), I've queued these reviewers:
How this works
|
Three call sites (processChannelMessages, reSubmitContainerMessages on ChannelCollection, and FluidDataStoreRuntime.reSubmitMessages) had near-identical "run-length encode contiguous same-key items, flush each run to a sink" loops. Extract the bunching skeleton into a single @internal helper in runtime-utils that takes keyOf / valueOf / onBunch and an optional keysEqual predicate (defaulting to Object.is). Per-message side effects (delete checks, GC bookkeeping, detectOutboundReferences) stay at the call site and run before items enter the helper, preserving the existing semantics. Unit tests cover empty input, singletons, contiguous runs, alternating keys, structured keys, and the default Object.is behavior. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
🔗 No broken links found! ✅ Your attention to detail is admirable. linkcheck output |
| localOpMetadata: unknown; | ||
| }[] = []; | ||
|
|
||
| const flushBunch = (): void => { |
There was a problem hiding this comment.
can't we use the bunching helper here
There was a problem hiding this comment.
Deep Review: Good question — forEachContiguousBunch would parallel the channelCollection.ts:847-873 usage and complete the inbound/outbound symmetry the rest of the design rests on. Recommend landing the conversion here rather than as a follow-up, since reSubmitBatch's ad-hoc accumulation is the one place in the outbound path that doesn't yet route through the shared helper.
Deep ReviewReviewed commit Readiness: 7/10 — ALMOST READY No correctness or security blockers. The outbound Path to Ready
Context for Reviewers
For human reviewer
|
| * All messages in a resubmit collection share the same target — that is, they are | ||
| * for the same DDS — and share a single `squash` setting. This mirrors the inbound | ||
| * "bunch" shape of {@link IRuntimeMessageCollection} for the outbound resubmit path, | ||
| * allowing DDSes to handle a contiguous run of resubmits in one call. |
There was a problem hiding this comment.
Deep Review: IRuntimeResubmitMessageCollection remarks overstate the same-DDS guarantee at this layer.
The remarks here (and the matching IFluidDataStoreChannel.reSubmitMessages doc at dataStoreContext.ts:484-497) say all messages "are for the same DDS" / "Ask the DDS". But grouping happens at two distinct layers:
channelCollection.ts:847-873groups by{ address: e.envelope.address, type: e.envelope.contents.type }(i.e.(address, ddsType)) before callingcontext.reSubmitMessages(key.type, { squash, messages }).dataStoreRuntime.ts:1418-1454then re-splits ChannelOp bunches by channel address before delegating tochannelContext.reSubmitMessages.
So at the IFluidDataStoreChannel boundary (where this doc lives) the collection is grouped by (address, ddsType) within one data store — the per-DDS run is established downstream by FluidDataStoreRuntime before reaching IDeltaHandler.reSubmitMessages.
Recommend rewriting both doc sites to scope the guarantee to the correct layer. Prevents misuse by third-party IFluidDataStoreChannel implementers that read the contract literally.
| * The messages to resubmit, in original submission order. | ||
| */ | ||
| readonly messages: readonly IRuntimeResubmitMessage[]; | ||
| } |
There was a problem hiding this comment.
Deep Review: Bunched-shape naming diverges from the inbound IRuntimeMessageCollection precedent.
Inbound shape (protocol.ts:129-142): IRuntimeMessageCollection { envelope; messagesContent: readonly IRuntimeMessagesContent[]; local; }.
Outbound shape here: IRuntimeResubmitMessageCollection { squash; messages: readonly IRuntimeResubmitMessage[]; } — items field is messages (not messagesContent), per-item type is IRuntimeResubmitMessage (not …MessagesContent). The doc comment claims this "mirrors the inbound IRuntimeMessageCollection" but the field/type names don't.
This matters because these are @sealed @legacy @beta types and renaming after consumers adopt is a breaking change. vladsud relitigated the inbound naming convention on #22839 and produced the current convention — worth their sign-off here before merge.
Two paths:
- Align outbound naming with the inbound precedent (
messagesContent/IRuntimeResubmitMessagesContent), or - Keep the divergence (defensible: outbound entries genuinely lack
clientSequenceNumber/envelope/localand carrylocalOpMetadata+ collection-levelsquash) and update the "mirrorsIRuntimeMessageCollection" doc to call out the intentional semantic distinction.
| }); | ||
|
|
||
| it("discardChanges drops staged ops", () => { | ||
| const channelCollectionStub = stubChannelCollection(containerRuntime); |
There was a problem hiding this comment.
Deep Review: No direct test for ChannelCollection.reSubmitContainerMessages (address, type) sub-bunching.
The new test "commitChanges splits bunches when the data store address changes" asserts only that channelCollection.reSubmitContainerMessages is called once carrying all five entries in submission order (["ds1","ds1","ds2","ds1","ds1"]) — the stub short-circuits back to containerRuntime.submitMessage, so the (address, type) sub-bunching at channelCollection.ts:847-873 is not actually exercised in the integration path.
It's covered indirectly via forEachContiguousBunch unit tests (bunching.spec.ts:33-67, 90-109), but nothing exercises the ChannelCollection-layer split, and nothing exercises the case where two same-address entries differ on the inner FluidDataStoreMessage.type (uncovered at any layer).
Recommend a direct unit test against ChannelCollection.reSubmitContainerMessages (or via a richer stub) asserting:
- (a) contiguous same-
(address, type)runs collapse into onecontext.reSubmitMessagescall, - (b) address transitions split,
- (c) same-address entries with different inner
FluidDataStoreMessage.typealso split.
The (address, type) tuple choice is this layer's central new behavior; the type-change case especially deserves a guard.
| const expandedCollection: IRuntimeResubmitMessageCollection = { | ||
| squash: collection.squash, | ||
| messages: flattened, | ||
| }; |
There was a problem hiding this comment.
Deep Review: Stashed-op expansion makes the bunch size invariant implicit.
This loop runs processWithStashedOpMetadataHandling over every input entry and pushes into flattened, so a collection.messages.length === N call from above can fan out to expandedCollection.messages.length !== N before reaching handler.reSubmitMessages. The inline comment captures the symmetry intent ("mirroring how processMessages expands messagesContent"), but the IDeltaHandler.reSubmitMessages interface contract over in datastore-definitions/src/channel.ts is silent on whether observed bunch.length corresponds 1:1 to the entries the caller passed in.
abe27342 raised this same fan-out asymmetry on #19518 (channelDeltaConnection.ts line 111) for the inbound path; the bunched outbound interface should document it explicitly so SharedTree/MergeTree don't write overrides that assume 1:1.
Recommend adding a doc comment on IDeltaHandler.reSubmitMessages (and the bunched IFluidDataStoreChannel.reSubmitMessages) clarifying that stashed-op expansion may make the observed bunch length differ from the caller's input length, and that bunch.length is post-expansion, not 1:1 with caller pending entries. Three-line fix that closes the contract footgun before consumers start implementing the override.
Description
Extend the bunched dispatch pattern from inbound
processMessagesto the outboundreSubmitpath. On reconnect or staging-exit, a batch of N contiguous ops targeting the same DDS now makes one trip through ContainerRuntime → ChannelCollection → DataStoreContext → DataStoreRuntime → ChannelContext → DDS instead of N. This mirrors the existing inbound bunching (ChannelCollection.processChannelMessages) and gives DDSes that can resubmit a contiguous run efficiently (e.g. SharedTree, MergeTree in follow-ups) the same opportunity on the outbound side.New
@legacy @betasurface, additive and backwards compatible:IRuntimeResubmitMessage,IRuntimeResubmitMessageCollection(runtime-definitions/src/protocol.ts) — the bunched envelope, with a sharedsquashflag.IFluidDataStoreChannel.reSubmitMessages(type, collection)(runtime-definitions) — bunched alongside the existingreSubmit.IDeltaHandler.reSubmitMessages(collection)(datastore-definitions) — bunched alongside the existingreSubmit.ContainerRuntime.reSubmitBatchnow collects runs ofFluidDataStoreOpentries and dispatches each run to a newChannelCollection.reSubmitContainerMessages, which sub-bunches by(address, ddsType)exactly likeprocessChannelMessages. Non-FluidDataStoreOptypes (Attach, Alias, GC, IdAllocation, BlobAttach, Rejoin, DocumentSchemaChange) continue to use the existing single-opreSubmitpath.ChannelDeltaConnection.reSubmitMessagesfans out stashed-op metadata pairs across bunch entries before dispatching, mirroringgetContentsWithStashedOpHandlingforprocessMessages.DDSes that don't implement
reSubmitMessagesautomatically fall back to per-messagereSubmit.SharedObjectprovides a default that loops over existingreSubmitCore/reSubmitSquashedCore, so all current DDSes keep working unchanged. Individual DDSes (SharedTree, MergeTree) can override in follow-up PRs to take advantage of seeing the full run together.Out of scope per scope decision: bunching for
rollback(constrained by reverse-order checkpoint API) andapplyStashedOp(async per-message return tangles with PendingStateManager realization loop), and DDS-side overrides ofreSubmitMessagesfor SharedTree/MergeTree.Reviewer Guidance
The review process is outlined on this wiki page.
ChannelCollection.reSubmitContainerMessagesand the(address, ddsType)key intentionally mirrorsprocessChannelMessages— worth confirming the symmetry is what we want.ChannelDeltaConnection.reSubmitMessagesflattens stashed-op metadata across entries before dispatching to the handler. The behavior should matchprocessMessages+getContentsWithStashedOpHandling.containerRuntime.spec.ts(Staging Mode describe) exercise contiguous-same-DDS bunching, address-change splits, andsquashpropagation. Full container-runtime suite (968 tests) passes locally.reSubmitMessages, so observable behavior is unchanged but the plumbing exercises the new path throughSharedObject's default loop.