Skip to content

feat(server): Serializer bucket states#7027

Merged
dranikpg merged 7 commits intodragonflydb:mainfrom
dranikpg:serializer-bucket-states
Apr 16, 2026
Merged

feat(server): Serializer bucket states#7027
dranikpg merged 7 commits intodragonflydb:mainfrom
dranikpg:serializer-bucket-states

Conversation

@dranikpg
Copy link
Copy Markdown
Contributor

@dranikpg dranikpg commented Mar 30, 2026

Add bucket dependency counter to track "async" serialization operations (like big value streaming or tiered loads). Resolves #6830

Why did I choose just a latch over a struct BucketState { vector<DelayedEntry> }?

Because multiple fibers can serialize delayed entries, we need a mechanism to not serialize the same value concurrently (i.e. pop before wait as before). This means that we need a signalling mechanism to wait for the last popped item to finish serializing (as the vector can be empty), which is either a bool or a counter. And then we also need a wakeup primitive to unblock waiters -> we need just a latch or condvar.

What is more, we need some kind of latch or condvar for big value streaming. We can unite all that into a unified notion of "dependency" and use a single latch.

@dranikpg dranikpg force-pushed the serializer-bucket-states branch 5 times, most recently from 71c9ad5 to 383ac33 Compare April 2, 2026 07:54
@dranikpg dranikpg marked this pull request as ready for review April 2, 2026 11:07
@dranikpg dranikpg requested review from kostasrim and romange April 2, 2026 11:08
@dranikpg
Copy link
Copy Markdown
Contributor Author

dranikpg commented Apr 2, 2026

augment review

@augmentcode
Copy link
Copy Markdown

augmentcode Bot commented Apr 2, 2026

🤖 Augment PR Summary

Summary: This PR adds bucket-level dependency tracking to represent in-flight async serialization work during snapshot/replication.

Changes:

  • Introduced BucketDependencies (per-bucket LocalLatch counters) with Increment/Decrement and a Wait() helper.
  • Integrated dependency tracking with tiered offload serialization by incrementing on enqueue and decrementing after delayed entry serialization.
  • Refactored SerializerBase to inherit BucketDependencies and mark buckets busy for the duration of ProcessBucket().
  • Removed the previous bucket state-machine tracking code and the corresponding unit test target.

Technical Notes: Bucket staleness is now detected via the snapshot version, with touched stale buckets flushing tiered delayed entries and asserting no remaining dependencies under big_value_mu_.

🤖 Was this summary useful? React with 👍 or 👎

Copy link
Copy Markdown

@augmentcode augmentcode Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review completed. 5 suggestions posted.

Fix All in Augment

Comment augment review to trigger a new review at any time.

Comment thread src/server/serializer_base.cc
Comment thread src/server/serializer_base.cc Outdated
Comment thread src/server/serializer_base.cc
Comment thread src/server/serializer_base.h
Comment thread src/server/serializer_base.h Outdated
@@ -108,21 +133,7 @@ class SerializerBase : public DelayedEntryHandler {

private:
friend class SerializerBaseTest;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

src/server/serializer_base.h:135: Since the prior SerializerBaseTest was removed in this PR, there doesn’t appear to be replacement unit coverage for the new BucketDependencies accounting (especially failure paths like tiered read errors). That increases the risk of regressions where buckets remain permanently “busy”.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Copy link
Copy Markdown

@augmentcode augmentcode Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review completed. 4 suggestions posted.

Fix All in Augment

Comment augment review to trigger a new review at any time.

PrimeValue pv{*value};
SerializeFetchedEntry(*entry, pv);

deps_.Decrement(it->first);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DelayedEntryHandler::ProcessDelayedEntries: if !value.has_value() we return without removing the delayed entry or calling deps_.Decrement(...), which can leave the bucket permanently “busy” (and repeatedly re-hit the error on subsequent passes). This becomes observable now that EnqueueOffloaded() increments dependencies for each offloaded key.

Severity: high

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

: db_slice_(slice), base_cntx_(cntx) {
DCHECK(db_slice_);
: DelayedEntryHandler(static_cast<BucketDependencies&>(*this)),
db_slice_(slice),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SerializerBase::SerializerBase: the previous DCHECK(db_slice_) was removed, so a null slice would now crash later (e.g. in RegisterChangeListener()/ProcessBucket()) without a clear precondition failure.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

// State tracking is purely observational in early PRs: it drives DCHECKs and
// stats but does not alter the serialization control flow.
class SerializerBase : public DelayedEntryHandler {
class SerializerBase : public BucketDependencies, public DelayedEntryHandler {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment block above SerializerBase still describes a per-bucket state-machine stored in a map, but this PR removed bucket_states_ in favor of BucketDependencies, so the documentation (and BucketPhase description) looks out of sync with the implementation.

Severity: low

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

@@ -1,63 +0,0 @@
// Copyright 2026, DragonflyDB authors. All rights reserved.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR removes the only unit tests covering SerializerBase’s bucket bookkeeping; with the new BucketDependencies semantics, it may be worth adding a small focused test to guard against regressions (e.g., increment/decrement and error paths).

Severity: low

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

vyavdoshenko and others added 3 commits April 8, 2026 18:43
…tableFieldJson test (dragonflydb#7094)

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
@dranikpg dranikpg force-pushed the serializer-bucket-states branch from 1b1f0b0 to 97d8133 Compare April 8, 2026 16:43
Copilot AI review requested due to automatic review settings April 8, 2026 16:43
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds bucket-level dependency tracking to SerializerBase to coordinate async serialization work (tiered reads / big value streaming) and avoid racing bucket updates during snapshotting/streaming. Resolves the need for a per-bucket “completion” signal described in #6830.

Changes:

  • Introduces BucketDependencies (per-bucket latch/counter) and wires it into SerializerBase and delayed tiered-entry handling.
  • Refactors ProcessDelayedEntries to decrement bucket dependencies when tiered reads complete.
  • Adjusts a search test ordering and removes serializer_base_test from the CMake test list.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

File Description
src/server/serializer_base.h Adds BucketDependencies and integrates it with serializer/delayed-entry infrastructure.
src/server/serializer_base.cc Implements dependency tracking and refactors delayed-entry processing to signal dependency completion.
src/server/search/search_family_test.cc Reorders setup calls in a JSON search test.
src/server/CMakeLists.txt Stops building/running serializer_base_test.
Comments suppressed due to low confidence (1)

src/server/serializer_base.cc:82

  • On tiered read failure (!value.has_value()), the code decrements the bucket dependency but returns without removing the entry from delayed_entries_. This can cause repeated retries and eventually a CHECK failure/double-Decrement when the same entry is processed again; remove the failed entry (and ensure its dependency is released exactly once).
    if (!value.has_value()) {
      deps_.Decrement(it->first);
      cntx->ReportError(make_error_code(std::errc::io_error),
                        absl::StrCat("Failed to read tiered key: ", entry->key.ToString()));
      return;
    }

Comment on lines +73 to +89
auto serialize_entry = [&](decltype(delayed_entries_)::iterator it) {
auto& entry = it->second;
auto value = entry->value.Get();

if (!value.has_value()) {
deps_.Decrement(it->first);
cntx->ReportError(make_error_code(std::errc::io_error),
absl::StrCat("Failed to read tiered key: ", entry->key.ToString()));
return;
}

PrimeValue pv{*value};
SerializeFetchedEntry(*entry, pv);

deps_.Decrement(it->first);
delayed_entries_.erase(it++);
};
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProcessDelayedEntries holds a multimap iterator/reference across entry->value.Get() and SerializeFetchedEntry(), both of which can suspend; if another fiber also processes delayed_entries_ concurrently, the same element can be erased/processed twice, leading to use-after-free/UB. Consider extracting/moving the node (or unique_ptr) out of delayed_entries_ before any potentially-blocking calls, then erase/commit after serialization finishes to guarantee single processing per entry.

Copilot uses AI. Check for mistakes.
Comment thread src/server/CMakeLists.txt
Comment on lines 165 to 167
helio_cxx_test(cluster/cluster_family_test dfly_test_lib LABELS DFLY)
helio_cxx_test(acl/acl_family_test dfly_test_lib LABELS DFLY)
helio_cxx_test(engine_shard_set_test dfly_test_lib LABELS DFLY)
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

serializer_base_test.cc still exists but references removed SerializerBase APIs (BucketPhase/MarkBucketSerializing/FinishBucketIteration); removing the test target here hides a now-broken test instead of fixing/updating it. Either update the test to cover the new BucketDependencies behavior or delete the obsolete test file to keep the test suite consistent.

Copilot generated this review using guidance from repository custom instructions.
Comment on lines -143 to +146
// Check if this bucket should be serialized
if (!ShouldProcessBucket(it)) {
// Check if this bucket is stale
if (it.is_done() || it.GetVersion() >= snapshot_version_) {
stats_.buckets_skipped++;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here

Comment thread src/server/CMakeLists.txt
helio_cxx_test(cluster/cluster_family_test dfly_test_lib LABELS DFLY)
helio_cxx_test(acl/acl_family_test dfly_test_lib LABELS DFLY)
helio_cxx_test(engine_shard_set_test dfly_test_lib LABELS DFLY)
helio_cxx_test(serializer_base_test dfly_test_lib LABELS DFLY)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this deleted?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because the test uses a bucket state machine that I completely replaced

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can add some other tests

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, my bad, did not delete the file

@romange
Copy link
Copy Markdown
Collaborator

romange commented Apr 14, 2026

is it synced with latest?

@dranikpg
Copy link
Copy Markdown
Contributor Author

is it synced with latest?

Yes

@dranikpg dranikpg requested a review from romange April 15, 2026 12:35
// Flush all entries of bucket
if (flush_bucket) {
auto range = delayed_entries_.equal_range(flush_bucket);
for (auto it = range.first; it != range.second;) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why not increment in the "for" loop, like a true professional?

@dranikpg dranikpg merged commit 96ad549 into dragonflydb:main Apr 16, 2026
13 checks passed
@dranikpg dranikpg deleted the serializer-bucket-states branch April 16, 2026 07:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

shard-serialization P1.3: introduce bucket-level completion state machine

5 participants