feat(server): Serializer bucket states (#7027)
🤖 Augment PR Summary

Summary: This PR adds bucket-level dependency tracking to represent in-flight async serialization work during snapshot/replication.

Technical Notes: Bucket staleness is now detected via the snapshot version, with touched stale buckets flushing tiered delayed entries and asserting that no dependencies remain.
```cpp
@@ -108,21 +133,7 @@ class SerializerBase : public DelayedEntryHandler {

 private:
  friend class SerializerBaseTest;
```
src/server/serializer_base.h:135: Since the prior SerializerBaseTest was removed in this PR, there doesn’t appear to be replacement unit coverage for the new BucketDependencies accounting (especially failure paths like tiered read errors). That increases the risk of regressions where buckets remain permanently “busy”.
Severity: medium
```cpp
PrimeValue pv{*value};
SerializeFetchedEntry(*entry, pv);

deps_.Decrement(it->first);
```
DelayedEntryHandler::ProcessDelayedEntries: if !value.has_value() we return without removing the delayed entry or calling deps_.Decrement(...), which can leave the bucket permanently “busy” (and repeatedly re-hit the error on subsequent passes). This becomes observable now that EnqueueOffloaded() increments dependencies for each offloaded key.
Severity: high
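The invariant behind this finding can be illustrated with a small stand-alone sketch. `DependencyCounter` and `ProcessOne` below are hypothetical stand-ins, not the PR's actual `BucketDependencies` or `serialize_entry`; the point is that every exit path, including the failed-read path, must both release the bucket dependency and remove the delayed entry exactly once:

```cpp
#include <cassert>
#include <map>
#include <optional>

// Hypothetical stand-in for the role of the PR's BucketDependencies:
// per-bucket counts of outstanding async work.
struct DependencyCounter {
  std::map<int, int> counts;

  void Increment(int bucket) { counts[bucket]++; }

  void Decrement(int bucket) {
    auto it = counts.find(bucket);
    assert(it != counts.end() && it->second > 0);  // never over-release
    if (--it->second == 0)
      counts.erase(it);
  }

  bool Busy(int bucket) const { return counts.count(bucket) > 0; }
};

// Every exit path releases the dependency and drops the entry exactly
// once, including the failure path; otherwise the bucket stays "busy".
void ProcessOne(std::multimap<int, std::optional<int>>& delayed,
                std::multimap<int, std::optional<int>>::iterator it,
                DependencyCounter& deps) {
  int bucket = it->first;
  std::optional<int> value = it->second;  // stands in for entry->value.Get()
  if (!value.has_value()) {
    deps.Decrement(bucket);  // release even on failure ...
    delayed.erase(it);       // ... and remove the entry, then report the error
    return;
  }
  // ... serialize *value here ...
  deps.Decrement(bucket);
  delayed.erase(it);
}
```

If the failure branch returned without the `Decrement`/`erase` pair, the bucket would remain busy forever and the same broken entry would be re-hit on every pass, which is exactly the leak the review comment describes.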
```diff
-    : db_slice_(slice), base_cntx_(cntx) {
-  DCHECK(db_slice_);
+    : DelayedEntryHandler(static_cast<BucketDependencies&>(*this)),
+      db_slice_(slice),
```
```diff
 // State tracking is purely observational in early PRs: it drives DCHECKs and
 // stats but does not alter the serialization control flow.
-class SerializerBase : public DelayedEntryHandler {
+class SerializerBase : public BucketDependencies, public DelayedEntryHandler {
```
The comment block above SerializerBase still describes a per-bucket state-machine stored in a map, but this PR removed bucket_states_ in favor of BucketDependencies, so the documentation (and BucketPhase description) looks out of sync with the implementation.
Severity: low
```diff
@@ -1,63 +0,0 @@
-// Copyright 2026, DragonflyDB authors. All rights reserved.
```
This PR removes the only unit tests covering SerializerBase’s bucket bookkeeping; with the new BucketDependencies semantics, it may be worth adding a small focused test to guard against regressions (e.g., increment/decrement and error paths).
Severity: low
…tableFieldJson test (dragonflydb#7094) Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Pull request overview
Adds bucket-level dependency tracking to SerializerBase to coordinate async serialization work (tiered reads / big value streaming) and avoid racing bucket updates during snapshotting/streaming. Resolves the need for a per-bucket “completion” signal described in #6830.
Changes:
- Introduces `BucketDependencies` (per-bucket latch/counter) and wires it into `SerializerBase` and delayed tiered-entry handling.
- Refactors `ProcessDelayedEntries` to decrement bucket dependencies when tiered reads complete.
- Adjusts a search test ordering and removes `serializer_base_test` from the CMake test list.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/server/serializer_base.h | Adds `BucketDependencies` and integrates it with serializer/delayed-entry infrastructure. |
| src/server/serializer_base.cc | Implements dependency tracking and refactors delayed-entry processing to signal dependency completion. |
| src/server/search/search_family_test.cc | Reorders setup calls in a JSON search test. |
| src/server/CMakeLists.txt | Stops building/running `serializer_base_test`. |
Comments suppressed due to low confidence (1)
src/server/serializer_base.cc:82
- On tiered read failure (!value.has_value()), the code decrements the bucket dependency but returns without removing the entry from delayed_entries_. This can cause repeated retries and eventually a CHECK failure/double-Decrement when the same entry is processed again; remove the failed entry (and ensure its dependency is released exactly once).
```cpp
if (!value.has_value()) {
  deps_.Decrement(it->first);
  cntx->ReportError(make_error_code(std::errc::io_error),
                    absl::StrCat("Failed to read tiered key: ", entry->key.ToString()));
  return;
}
```
```cpp
auto serialize_entry = [&](decltype(delayed_entries_)::iterator it) {
  auto& entry = it->second;
  auto value = entry->value.Get();

  if (!value.has_value()) {
    deps_.Decrement(it->first);
    cntx->ReportError(make_error_code(std::errc::io_error),
                      absl::StrCat("Failed to read tiered key: ", entry->key.ToString()));
    return;
  }

  PrimeValue pv{*value};
  SerializeFetchedEntry(*entry, pv);

  deps_.Decrement(it->first);
  delayed_entries_.erase(it++);
};
```
ProcessDelayedEntries holds a multimap iterator/reference across entry->value.Get() and SerializeFetchedEntry(), both of which can suspend; if another fiber also processes delayed_entries_ concurrently, the same element can be erased/processed twice, leading to use-after-free/UB. Consider extracting/moving the node (or unique_ptr) out of delayed_entries_ before any potentially-blocking calls, then erase/commit after serialization finishes to guarantee single processing per entry.
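One way to realize the "extract before any blocking call" suggestion, sketched here with a plain `std::multimap` of strings and C++17 node handles (`ProcessFirstSafely` is a hypothetical helper, not code from the PR):

```cpp
#include <map>
#include <string>

// Sketch: unlink the element from the container *before* any call that may
// suspend the fiber, so a concurrent pass over the container can never see
// (and double-process) the same element.
std::string ProcessFirstSafely(std::multimap<int, std::string>& delayed) {
  auto it = delayed.begin();
  if (it == delayed.end())
    return {};
  // extract() (C++17) removes the node from the multimap but keeps it
  // alive; other fibers iterating the container can no longer reach it.
  auto node = delayed.extract(it);
  // ... potentially-blocking work (value.Get(), SerializeFetchedEntry) ...
  return node.mapped();  // the node is destroyed here, after serialization
}
```

Because `extract` unlinks the node without destroying it, the payload stays valid across the blocking calls while the container itself holds no reference to it, which removes the use-after-free window the comment points out.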
```cmake
helio_cxx_test(cluster/cluster_family_test dfly_test_lib LABELS DFLY)
helio_cxx_test(acl/acl_family_test dfly_test_lib LABELS DFLY)
helio_cxx_test(engine_shard_set_test dfly_test_lib LABELS DFLY)
```
serializer_base_test.cc still exists but references removed SerializerBase APIs (BucketPhase/MarkBucketSerializing/FinishBucketIteration); removing the test target here hides a now-broken test instead of fixing/updating it. Either update the test to cover the new BucketDependencies behavior or delete the obsolete test file to keep the test suite consistent.
```diff
-    // Check if this bucket should be serialized
-    if (!ShouldProcessBucket(it)) {
+    // Check if this bucket is stale
+    if (it.is_done() || it.GetVersion() >= snapshot_version_) {
       stats_.buckets_skipped++;
```
```diff
 helio_cxx_test(cluster/cluster_family_test dfly_test_lib LABELS DFLY)
 helio_cxx_test(acl/acl_family_test dfly_test_lib LABELS DFLY)
 helio_cxx_test(engine_shard_set_test dfly_test_lib LABELS DFLY)
-helio_cxx_test(serializer_base_test dfly_test_lib LABELS DFLY)
```
because the test uses a bucket state machine that I completely replaced
I can add some other tests
ah, my bad, did not delete the file
is it synced with latest?

Yes
Signed-off-by: Vladislav <vlad@dragonflydb.io>
```cpp
// Flush all entries of bucket
if (flush_bucket) {
  auto range = delayed_entries_.equal_range(flush_bucket);
  for (auto it = range.first; it != range.second;) {
```
nit: why not increment in the "for" loop, like a true professional?
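For illustration, the loop can keep the advance in the for-statement by using the iterator that `erase()` returns; `FlushBucket` below is a stand-alone sketch with hypothetical `int` payloads, not the PR's code:

```cpp
#include <map>

// Advance via the iterator returned by erase(), so the increment lives in
// the for-statement. range.second stays valid because only elements with
// the flushed key (all strictly before range.second) are erased.
void FlushBucket(std::multimap<int, int>& delayed, int bucket) {
  auto range = delayed.equal_range(bucket);
  for (auto it = range.first; it != range.second; it = delayed.erase(it)) {
    // ... serialize it->second before the entry is erased ...
  }
}
```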
Add bucket dependency counter to track "async" serialization operations (like big value streaming or tiered loads). Resolves #6830
Why did I choose just a latch over a `struct BucketState { vector<DelayedEntry> }`?

Because multiple fibers can serialize delayed entries, we need a mechanism to avoid serializing the same value concurrently (i.e. pop before wait, as before). That means we need a signalling mechanism to wait for the last popped item to finish serializing (since the vector can be empty), which is either a bool or a counter. We then also need a wakeup primitive to unblock waiters, so a latch or condvar suffices.

What is more, we need some kind of latch or condvar for big value streaming anyway. We can unite all of that into a single notion of "dependency" and use one latch.
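The counter-plus-wakeup idea described above can be sketched minimally as follows. `BucketLatch` is a hypothetical name, and `std::mutex`/`std::condition_variable` stand in for Dragonfly's fiber synchronization primitives; this illustrates the design choice, not the PR's implementation:

```cpp
#include <condition_variable>
#include <mutex>

// One counter of outstanding async operations (big value streaming,
// tiered loads) plus one wakeup primitive: the unified "dependency".
class BucketLatch {
 public:
  // Register n pending dependencies (e.g. offloaded/tiered entries).
  void Add(int n = 1) {
    std::lock_guard<std::mutex> lk(mu_);
    count_ += n;
  }

  // Signal that one dependency finished; wake waiters when none remain.
  void Done() {
    std::lock_guard<std::mutex> lk(mu_);
    if (--count_ == 0)
      cv_.notify_all();
  }

  // Block until all registered dependencies have completed.
  void Wait() {
    std::unique_lock<std::mutex> lk(mu_);
    cv_.wait(lk, [this] { return count_ == 0; });
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  int count_ = 0;
};
```

A per-bucket map of such latches gives waiters a single "completion" signal without tracking individual entries, which is the trade-off the description argues for over a `BucketState` holding a vector.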