Skip to content

feat(container-runtime): bunch reSubmit dispatch across same-DDS runs#27429

Draft
anthony-murphy wants to merge 2 commits into
microsoft:mainfrom
anthony-murphy:batch-op-dispatch
Draft

feat(container-runtime): bunch reSubmit dispatch across same-DDS runs#27429
anthony-murphy wants to merge 2 commits into
microsoft:mainfrom
anthony-murphy:batch-op-dispatch

Conversation

@anthony-murphy
Copy link
Copy Markdown
Contributor

Description

Extend the bunched dispatch pattern from inbound processMessages to the outbound reSubmit path. On reconnect or staging-exit, a batch of N contiguous ops targeting the same DDS now makes one trip through ContainerRuntime → ChannelCollection → DataStoreContext → DataStoreRuntime → ChannelContext → DDS instead of N. This mirrors the existing inbound bunching (ChannelCollection.processChannelMessages) and gives DDSes that can resubmit a contiguous run efficiently (e.g. SharedTree, MergeTree in follow-ups) the same opportunity on the outbound side.

New @legacy @beta surface, additive and backwards compatible:

  • IRuntimeResubmitMessage, IRuntimeResubmitMessageCollection (runtime-definitions/src/protocol.ts) — the bunched envelope, with a shared squash flag.
  • Optional IFluidDataStoreChannel.reSubmitMessages(type, collection) (runtime-definitions) — bunched alongside the existing reSubmit.
  • Optional IDeltaHandler.reSubmitMessages(collection) (datastore-definitions) — bunched alongside the existing reSubmit.

ContainerRuntime.reSubmitBatch now collects runs of FluidDataStoreOp entries and dispatches each run to a new ChannelCollection.reSubmitContainerMessages, which sub-bunches by (address, ddsType) exactly like processChannelMessages. Non-FluidDataStoreOp types (Attach, Alias, GC, IdAllocation, BlobAttach, Rejoin, DocumentSchemaChange) continue to use the existing single-op reSubmit path. ChannelDeltaConnection.reSubmitMessages fans out stashed-op metadata pairs across bunch entries before dispatching, mirroring getContentsWithStashedOpHandling for processMessages.

DDSes that don't implement reSubmitMessages automatically fall back to per-message reSubmit. SharedObject provides a default that loops over existing reSubmitCore / reSubmitSquashedCore, so all current DDSes keep working unchanged. Individual DDSes (SharedTree, MergeTree) can override in follow-up PRs to take advantage of seeing the full run together.

Out of scope per scope decision: bunching for rollback (constrained by reverse-order checkpoint API) and applyStashedOp (async per-message return tangles with PendingStateManager realization loop), and DDS-side overrides of reSubmitMessages for SharedTree/MergeTree.

Reviewer Guidance

The review process is outlined on this wiki page.

  • The bunching loop in ChannelCollection.reSubmitContainerMessages and the (address, ddsType) key intentionally mirrors processChannelMessages — worth confirming the symmetry is what we want.
  • ChannelDeltaConnection.reSubmitMessages flattens stashed-op metadata across entries before dispatching to the handler. The behavior should match processMessages + getContentsWithStashedOpHandling.
  • New tests in containerRuntime.spec.ts (Staging Mode describe) exercise contiguous-same-DDS bunching, address-change splits, and squash propagation. Full container-runtime suite (968 tests) passes locally.
  • Draft pending deep-review sign-off — no DDS currently overrides reSubmitMessages, so observable behavior is unchanged but the plumbing exercises the new path through SharedObject's default loop.

Extend the bunched dispatch pattern from inbound processMessages to the
outbound reSubmit path. ContainerRuntime.reSubmitBatch now collects runs
of FluidDataStoreOp entries and dispatches them via a new
ChannelCollection.reSubmitContainerMessages, which bunches by
(address, ddsType) exactly like processChannelMessages.

Adds @legacy @beta IRuntimeResubmitMessageCollection and optional
reSubmitMessages methods on IFluidDataStoreChannel and IDeltaHandler.
ChannelDeltaConnection fans out stashed-op metadata across bunch entries
before dispatch. SharedObject provides a default reSubmitMessages that
loops over existing reSubmitCore / reSubmitSquashedCore, so DDSes work
unchanged; SharedTree / MergeTree can override in follow-up PRs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 27, 2026

Hi! Thank you for opening this PR. Want me to review it?

Based on the diff (741 lines, 25 files), I've queued these reviewers:

  • Correctness — logic errors, race conditions, lifecycle issues
  • Security — vulnerabilities, secret exposure, injection
  • API Compatibility — breaking changes, release tags, type design
  • Performance — algorithmic regressions, memory leaks
  • Testing — coverage gaps, hollow tests

How this works

  • Adjust the reviewer set by ticking/unticking boxes above. Reviewer toggles alone don't trigger anything.

  • Tick Start review below to dispatch the review fleet.

  • After review finishes, tick Start review again to request another run — it auto-resets after each dispatch.

  • This comment updates as new commits land; your reviewer selections are preserved.

  • Start review

Three call sites (processChannelMessages, reSubmitContainerMessages on
ChannelCollection, and FluidDataStoreRuntime.reSubmitMessages) had
near-identical "run-length encode contiguous same-key items, flush each
run to a sink" loops. Extract the bunching skeleton into a single
@internal helper in runtime-utils that takes keyOf / valueOf / onBunch
and an optional keysEqual predicate (defaulting to Object.is).

Per-message side effects (delete checks, GC bookkeeping,
detectOutboundReferences) stay at the call site and run before items
enter the helper, preserving the existing semantics. Unit tests cover
empty input, singletons, contiguous runs, alternating keys, structured
keys, and the default Object.is behavior.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@github-actions
Copy link
Copy Markdown
Contributor

🔗 No broken links found! ✅

Your attention to detail is admirable.

linkcheck output


> fluid-framework-docs-site@0.0.0 ci:check-links /home/runner/work/FluidFramework/FluidFramework/docs
> start-server-and-test "npm run serve -- --no-open" 3000 check-links

1: starting server using command "npm run serve -- --no-open"
and when url "[ 'http://127.0.0.1:3000' ]" is responding with HTTP status code 200
running tests using command "npm run check-links"


> fluid-framework-docs-site@0.0.0 serve
> docusaurus serve --no-open

[SUCCESS] Serving "build" directory at: http://localhost:3000/

> fluid-framework-docs-site@0.0.0 check-links
> linkcheck http://localhost:3000 --skip-file skipped-urls.txt

Crawling...

Stats:
  288859 links
    1925 destination URLs
    2175 URLs ignored
       0 warnings
       0 errors


localOpMetadata: unknown;
}[] = [];

const flushBunch = (): void => {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we use the bunching helper here

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deep Review: Good question — forEachContiguousBunch would parallel the channelCollection.ts:847-873 usage and complete the inbound/outbound symmetry the rest of the design rests on. Recommend landing the conversion here rather than as a follow-up, since reSubmitBatch's ad-hoc accumulation is the one place in the outbound path that doesn't yet route through the shared helper.

@anthony-murphy
Copy link
Copy Markdown
Contributor Author

Deep Review

Reviewed commit fa31a15 on 2026-05-27.

Readiness: 7/10 — ALMOST READY

No correctness or security blockers. The outbound (address, ddsType) sub-bunching mirrors the inbound processChannelMessages pattern cleanly and the optional-method fallbacks keep existing DDSes working unchanged. Six polish items remain inline — mostly doc/naming on the new @sealed @legacy @beta shapes (hard to fix after release), one test-coverage gap on the new sub-bunching layer, and one missing protected SharedObject seam that SharedTree/MergeTree will hit at merge time.

Path to Ready

  • Resolve inline threads
  • Answer your own pending question on containerRuntime.ts:5044 (forEachContiguousBunch in reSubmitBatch) — if yes, land it here so the inbound/outbound symmetry is complete in one PR
  • Loop in vladsud on the outbound naming decision before merge (owns the inbound IRuntimeMessageCollection naming convention this PR diverges from)

Context for Reviewers

For human reviewer
  • vladsud sign-off on bunched naming — relitigated the inbound naming convention on Op bunching 1: Bunch contiguous ops for data store in a batch - Runtime part #22839; the outbound naming decision (mirror messagesContent/IRuntimeMessagesContent vs. deliberate semantic divergence) is their call.
  • agarwal-navin sign-off on the bunching contract symmetry — owns the inbound bunching contract this PR mirrors; worth confirming the (address, ddsType) sub-bunching key and the stashed-op fan-out semantics are intentional.
  • abe27342 on the stashed-op expansion seam — raised the original fan-out asymmetry concern on Align apply stashed ops with re-submit #19518; well-placed to review the new bunched interface contract.
  • SharedTree / MergeTree owners — named as future consumers in the PR description; their input on whether the protected SharedObject.reSubmitMessagesCore seam lands here or in follow-ups is the design decision the pipeline cannot make.
  • Runtime-perf assessment — whether contiguous-bunch dispatch on the outbound path wins back the O(N·M) cost the motivation cites requires real DDS overrides and benchmarks, neither of which exist in this PR.

* All messages in a resubmit collection share the same target — that is, they are
* for the same DDS — and share a single `squash` setting. This mirrors the inbound
* "bunch" shape of {@link IRuntimeMessageCollection} for the outbound resubmit path,
* allowing DDSes to handle a contiguous run of resubmits in one call.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deep Review: IRuntimeResubmitMessageCollection remarks overstate the same-DDS guarantee at this layer.

The remarks here (and the matching IFluidDataStoreChannel.reSubmitMessages doc at dataStoreContext.ts:484-497) say all messages "are for the same DDS" / "Ask the DDS". But grouping happens at two distinct layers:

  • channelCollection.ts:847-873 groups by { address: e.envelope.address, type: e.envelope.contents.type } (i.e. (address, ddsType)) before calling context.reSubmitMessages(key.type, { squash, messages }).
  • dataStoreRuntime.ts:1418-1454 then re-splits ChannelOp bunches by channel address before delegating to channelContext.reSubmitMessages.

So at the IFluidDataStoreChannel boundary (where this doc lives) the collection is grouped by (address, ddsType) within one data store — the per-DDS run is established downstream by FluidDataStoreRuntime before reaching IDeltaHandler.reSubmitMessages.

Recommend rewriting both doc sites to scope the guarantee to the correct layer. Prevents misuse by third-party IFluidDataStoreChannel implementers that read the contract literally.

* The messages to resubmit, in original submission order.
*/
readonly messages: readonly IRuntimeResubmitMessage[];
}
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deep Review: Bunched-shape naming diverges from the inbound IRuntimeMessageCollection precedent.

Inbound shape (protocol.ts:129-142): IRuntimeMessageCollection { envelope; messagesContent: readonly IRuntimeMessagesContent[]; local; }.

Outbound shape here: IRuntimeResubmitMessageCollection { squash; messages: readonly IRuntimeResubmitMessage[]; } — items field is messages (not messagesContent), per-item type is IRuntimeResubmitMessage (not …MessagesContent). The doc comment claims this "mirrors the inbound IRuntimeMessageCollection" but the field/type names don't.

This matters because these are @sealed @legacy @beta types and renaming after consumers adopt is a breaking change. vladsud relitigated the inbound naming convention on #22839 and produced the current convention — worth their sign-off here before merge.

Two paths:

  • Align outbound naming with the inbound precedent (messagesContent / IRuntimeResubmitMessagesContent), or
  • Keep the divergence (defensible: outbound entries genuinely lack clientSequenceNumber/envelope/local and carry localOpMetadata + collection-level squash) and update the "mirrors IRuntimeMessageCollection" doc to call out the intentional semantic distinction.

});

it("discardChanges drops staged ops", () => {
const channelCollectionStub = stubChannelCollection(containerRuntime);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deep Review: No direct test for ChannelCollection.reSubmitContainerMessages (address, type) sub-bunching.

The new test "commitChanges splits bunches when the data store address changes" asserts only that channelCollection.reSubmitContainerMessages is called once carrying all five entries in submission order (["ds1","ds1","ds2","ds1","ds1"]) — the stub short-circuits back to containerRuntime.submitMessage, so the (address, type) sub-bunching at channelCollection.ts:847-873 is not actually exercised in the integration path.

It's covered indirectly via forEachContiguousBunch unit tests (bunching.spec.ts:33-67, 90-109), but nothing exercises the ChannelCollection-layer split, and nothing exercises the case where two same-address entries differ on the inner FluidDataStoreMessage.type (uncovered at any layer).

Recommend a direct unit test against ChannelCollection.reSubmitContainerMessages (or via a richer stub) asserting:

  • (a) contiguous same-(address, type) runs collapse into one context.reSubmitMessages call,
  • (b) address transitions split,
  • (c) same-address entries with different inner FluidDataStoreMessage.type also split.

The (address, type) tuple choice is this layer's central new behavior; the type-change case especially deserves a guard.

const expandedCollection: IRuntimeResubmitMessageCollection = {
squash: collection.squash,
messages: flattened,
};
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deep Review: Stashed-op expansion makes the bunch size invariant implicit.

This loop runs processWithStashedOpMetadataHandling over every input entry and pushes into flattened, so a collection.messages.length === N call from above can fan out to expandedCollection.messages.length !== N before reaching handler.reSubmitMessages. The inline comment captures the symmetry intent ("mirroring how processMessages expands messagesContent"), but the IDeltaHandler.reSubmitMessages interface contract over in datastore-definitions/src/channel.ts is silent on whether observed bunch.length corresponds 1:1 to the entries the caller passed in.

abe27342 raised this same fan-out asymmetry on #19518 (channelDeltaConnection.ts line 111) for the inbound path; the bunched outbound interface should document it explicitly so SharedTree/MergeTree don't write overrides that assume 1:1.

Recommend adding a doc comment on IDeltaHandler.reSubmitMessages (and the bunched IFluidDataStoreChannel.reSubmitMessages) clarifying that stashed-op expansion may make the observed bunch length differ from the caller's input length, and that bunch.length is post-expansion, not 1:1 with caller pending entries. Three-line fix that closes the contract footgun before consumers start implementing the override.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant