Skip to content

Commit 898b62d

Browse files
lutter and claude committed
graph, store: Add lock-free AtomicMovingStats for pool wait tracking
Replace RwLock<MovingStats> with a lock-free AtomicMovingStats that uses an atomic ring buffer with packed bins. Each bin packs epoch (32 bits), count (32 bits), and duration_nanos (64 bits) into a single AtomicU128 for lock-free CAS updates. This eliminates lock contention when many threads write concurrently (every semaphore wait, connection checkout, query execution) while reducing memory usage by 2x (4.8KB vs 9.6KB per stats instance). Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 4d84021 commit 898b62d

8 files changed

Lines changed: 280 additions & 23 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

graph/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ futures03 = { version = "0.3.31", package = "futures", features = ["compat"] }
7979
wasmparser = "0.118.1"
8080
thiserror = { workspace = true }
8181
parking_lot = "0.12.5"
82+
portable-atomic = { version = "1.11", features = ["fallback"] }
8283
itertools = "0.14.0"
8384
defer = "0.2"
8485

graph/src/components/store/mod.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ use std::sync::atomic::{AtomicUsize, Ordering};
2828
use std::sync::Arc;
2929
use std::time::Duration;
3030

31-
use crate::parking_lot::RwLock;
32-
3331
use async_trait::async_trait;
3432

3533
use crate::blockchain::{Block, BlockHash, BlockPtr};
@@ -44,7 +42,7 @@ use crate::env::ENV_VARS;
4442
use crate::internal_error;
4543
use crate::prelude::{s, Attribute, DeploymentHash, ValueType};
4644
use crate::schema::{ast as sast, EntityKey, EntityType, InputSchema};
47-
use crate::util::stats::MovingStats;
45+
use crate::util::stats::AtomicMovingStats;
4846

4947
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
5048
pub struct EntityFilterDerivative(bool);
@@ -744,8 +742,8 @@ impl Display for DeploymentLocator {
744742
}
745743

746744
// The type that the connection pool uses to track wait times for
747-
// connection checkouts
748-
pub type PoolWaitStats = Arc<RwLock<MovingStats>>;
745+
// connection checkouts. Uses lock-free atomic operations internally.
746+
pub type PoolWaitStats = Arc<AtomicMovingStats>;
749747

750748
/// Determines which columns should be selected in a table.
751749
#[derive(Clone, Debug, PartialEq, Eq, Hash)]

graph/src/data/graphql/load_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,7 @@ impl LoadManager {
459459
}
460460

461461
fn overloaded(&self, wait_stats: &PoolWaitStats) -> (bool, Duration) {
462-
let store_avg = wait_stats.read().average();
462+
let store_avg = wait_stats.average();
463463
let overloaded = store_avg
464464
.map(|average| average > ENV_VARS.load_threshold)
465465
.unwrap_or(false);

graph/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ pub mod prelude {
170170
pub use crate::log::split::split_logger;
171171
pub use crate::util::cache_weight::CacheWeight;
172172
pub use crate::util::futures::{retry, TimeoutError};
173-
pub use crate::util::stats::MovingStats;
173+
pub use crate::util::stats::{AtomicMovingStats, MovingStats};
174174

175175
macro_rules! static_graphql {
176176
($m:ident, $m2:ident, {$($n:ident,)*}) => {

graph/src/util/stats.rs

Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use std::collections::VecDeque;
2+
use std::sync::atomic::Ordering;
23
use std::time::{Duration, Instant};
34

5+
use portable_atomic::AtomicU128;
46
use prometheus::Gauge;
57

68
use crate::prelude::ENV_VARS;
@@ -166,6 +168,196 @@ impl MovingStats {
166168
}
167169
}
168170

171+
/// Packed bin for atomic operations: epoch (32 bits) | count (32 bits) | duration_nanos (64 bits)
172+
/// Fits in a single AtomicU128 for lock-free CAS updates.
173+
#[repr(transparent)]
174+
struct PackedBin(AtomicU128);
175+
176+
impl PackedBin {
177+
fn new() -> Self {
178+
Self(AtomicU128::new(0))
179+
}
180+
181+
/// Pack epoch, count, and duration into a single u128 value.
182+
fn pack(epoch: u32, count: u32, duration_nanos: u64) -> u128 {
183+
((epoch as u128) << 96) | ((count as u128) << 64) | (duration_nanos as u128)
184+
}
185+
186+
/// Unpack a u128 value into (epoch, count, duration_nanos).
187+
fn unpack(packed: u128) -> (u32, u32, u64) {
188+
let epoch = (packed >> 96) as u32;
189+
let count = (packed >> 64) as u32;
190+
let duration_nanos = packed as u64;
191+
(epoch, count, duration_nanos)
192+
}
193+
}
194+
195+
/// Lock-free moving statistics using an epoch-based ring buffer.
196+
///
197+
/// This is a thread-safe, lock-free alternative to `MovingStats` that uses
198+
/// atomic operations instead of locks. It tracks durations over a sliding
199+
/// time window, storing values in fixed-size bins.
200+
///
201+
/// Writers use CAS loops to atomically update bins, while readers can
202+
/// scan all bins without blocking writers.
203+
pub struct AtomicMovingStats {
204+
start_time: Instant,
205+
bin_size: Duration,
206+
bins: Box<[PackedBin]>,
207+
}
208+
209+
impl Default for AtomicMovingStats {
210+
fn default() -> Self {
211+
Self::new(ENV_VARS.load_window_size, ENV_VARS.load_bin_size)
212+
}
213+
}
214+
215+
impl AtomicMovingStats {
216+
/// Create a new AtomicMovingStats with the given window and bin sizes.
217+
///
218+
/// # Panics
219+
///
220+
/// Panics if `window_size` or `bin_size` is `0`, or if `bin_size` >= `window_size`
221+
pub fn new(window_size: Duration, bin_size: Duration) -> Self {
222+
assert!(window_size.as_millis() > 0);
223+
assert!(bin_size.as_millis() > 0);
224+
assert!(window_size > bin_size);
225+
226+
let num_bins = window_size.as_millis() as usize / bin_size.as_millis() as usize;
227+
let bins: Vec<PackedBin> = (0..num_bins).map(|_| PackedBin::new()).collect();
228+
229+
Self {
230+
start_time: Instant::now(),
231+
bin_size,
232+
bins: bins.into_boxed_slice(),
233+
}
234+
}
235+
236+
/// Calculate the epoch number for a given instant.
237+
fn epoch_at(&self, now: Instant) -> u32 {
238+
let elapsed = now.saturating_duration_since(self.start_time);
239+
(elapsed.as_millis() / self.bin_size.as_millis()) as u32
240+
}
241+
242+
/// Add a duration measurement at the current time.
243+
pub fn add(&self, duration: Duration) {
244+
self.add_at(Instant::now(), duration);
245+
}
246+
247+
/// Add a duration measurement at a specific time.
248+
///
249+
/// Note: It is expected that subsequent calls to `add_at` happen with
250+
/// monotonically increasing `now` values for optimal accuracy.
251+
pub fn add_at(&self, now: Instant, duration: Duration) {
252+
let current_epoch = self.epoch_at(now);
253+
let bin_idx = current_epoch as usize % self.bins.len();
254+
let bin = &self.bins[bin_idx];
255+
let duration_nanos = duration.as_nanos() as u64;
256+
257+
loop {
258+
let current = bin.0.load(Ordering::Acquire);
259+
let (bin_epoch, count, total_nanos) = PackedBin::unpack(current);
260+
261+
let new_packed = if bin_epoch == current_epoch {
262+
// Same epoch: increment existing values
263+
PackedBin::pack(
264+
current_epoch,
265+
count.saturating_add(1),
266+
total_nanos.saturating_add(duration_nanos),
267+
)
268+
} else {
269+
// Stale epoch: reset bin with new measurement
270+
PackedBin::pack(current_epoch, 1, duration_nanos)
271+
};
272+
273+
match bin.0.compare_exchange_weak(
274+
current,
275+
new_packed,
276+
Ordering::AcqRel,
277+
Ordering::Acquire,
278+
) {
279+
Ok(_) => break,
280+
Err(_) => continue, // CAS failed, retry
281+
}
282+
}
283+
}
284+
285+
/// Calculate the average duration over the current window.
286+
///
287+
/// Returns `None` if no measurements have been recorded in the window.
288+
pub fn average(&self) -> Option<Duration> {
289+
self.average_at(Instant::now())
290+
}
291+
292+
/// Calculate the average duration at a specific time.
293+
fn average_at(&self, now: Instant) -> Option<Duration> {
294+
let current_epoch = self.epoch_at(now);
295+
let num_bins = self.bins.len() as u32;
296+
let mut total_count: u64 = 0;
297+
let mut total_nanos: u128 = 0;
298+
299+
for bin in self.bins.iter() {
300+
let (bin_epoch, count, duration_nanos) =
301+
PackedBin::unpack(bin.0.load(Ordering::Acquire));
302+
// Valid if within window (handles epoch wraparound)
303+
if current_epoch.wrapping_sub(bin_epoch) < num_bins {
304+
total_count += count as u64;
305+
total_nanos += duration_nanos as u128;
306+
}
307+
}
308+
309+
if total_count > 0 {
310+
Some(Duration::from_nanos(
311+
(total_nanos / total_count as u128) as u64,
312+
))
313+
} else {
314+
None
315+
}
316+
}
317+
318+
/// Return `true` if the average of measurements within the window
319+
/// is above `duration`.
320+
pub fn average_gt(&self, duration: Duration) -> bool {
321+
self.average().map(|avg| avg > duration).unwrap_or(false)
322+
}
323+
324+
/// Return `true` if the average at a specific time is above `duration`.
325+
#[cfg(test)]
326+
fn average_gt_at(&self, now: Instant, duration: Duration) -> bool {
327+
self.average_at(now)
328+
.map(|avg| avg > duration)
329+
.unwrap_or(false)
330+
}
331+
332+
/// Return the total duration recorded in the current window.
333+
pub fn duration(&self) -> Duration {
334+
self.duration_at(Instant::now())
335+
}
336+
337+
/// Return the total duration at a specific time.
338+
fn duration_at(&self, now: Instant) -> Duration {
339+
let current_epoch = self.epoch_at(now);
340+
let num_bins = self.bins.len() as u32;
341+
let mut total_nanos: u128 = 0;
342+
343+
for bin in self.bins.iter() {
344+
let (bin_epoch, _, duration_nanos) = PackedBin::unpack(bin.0.load(Ordering::Acquire));
345+
if current_epoch.wrapping_sub(bin_epoch) < num_bins {
346+
total_nanos += duration_nanos as u128;
347+
}
348+
}
349+
350+
Duration::from_nanos(total_nanos as u64)
351+
}
352+
353+
/// Adds `duration` to the stats, and register the average ms to `avg_gauge`.
354+
pub fn add_and_register(&self, duration: Duration, avg_gauge: &Gauge) {
355+
self.add(duration);
356+
let wait_avg = self.average().map(|avg| avg.as_millis()).unwrap_or(0);
357+
avg_gauge.set(wait_avg as f64);
358+
}
359+
}
360+
169361
#[cfg(test)]
170362
mod tests {
171363
use super::*;
@@ -219,4 +411,77 @@ mod tests {
219411
assert_eq!(20, stats.total.count);
220412
assert_eq!(Duration::from_secs(5 * 86 + 16 * 10), stats.total.duration);
221413
}
414+
/// A constant one-second measurement per second must average to exactly
/// one second.
#[test]
fn atomic_add_one_const() {
    let stats = AtomicMovingStats::new(Duration::from_secs(5), Duration::from_secs(1));
    let start = stats.start_time;
    for i in 0..10 {
        stats.add_at(start + Duration::from_secs(i), Duration::from_secs(1));
    }
    // A 5-second window with 1-second bins yields 5 bins
    assert_eq!(5, stats.bins.len());
    // Query at time 10 seconds (epoch 10): bins with epochs 6..=10 are in
    // the window, and epochs 6..=9 hold one measurement each
    let query_time = start + Duration::from_secs(10);
    // Average should be 1 second
    let avg = stats.average_at(query_time).unwrap();
    assert_eq!(Duration::from_secs(1), avg);
    assert!(stats.average_gt_at(query_time, Duration::from_millis(900)));
    assert!(!stats.average_gt_at(query_time, Duration::from_secs(1)));
}

/// Four linearly growing measurements per second: only the last five
/// seconds' worth may contribute to the total duration.
#[test]
fn atomic_add_four_linear() {
    let stats = AtomicMovingStats::new(Duration::from_secs(5), Duration::from_secs(1));
    let start = stats.start_time;
    for i in 0..40u64 {
        stats.add_at(
            start + Duration::from_millis(250 * i),
            Duration::from_secs(i),
        );
    }
    assert_eq!(5, stats.bins.len());
    // Query at time 9.999 seconds (just before epoch 10 to include epoch 5)
    // At epoch 9, valid bins are epochs 5-9 (9 - bin_epoch < 5)
    let query_time = start + Duration::from_millis(9999);
    // Total duration in window: 4 entries per bin, bins 5-9 contain entries 20-39
    // Bin 5: entries 20,21,22,23 -> sum = 86
    // Bin 6: entries 24,25,26,27 -> sum = 102
    // ...
    // Total count = 20, total duration = 5*86 + 16*10 = 590
    assert_eq!(
        Duration::from_secs(5 * 86 + 16 * 10),
        stats.duration_at(query_time)
    );
}

/// With no measurements recorded there is no average, and the
/// "greater than" check reports `false`.
#[test]
fn atomic_empty_average() {
    let stats = AtomicMovingStats::new(Duration::from_secs(5), Duration::from_secs(1));
    // Query at start_time (no data yet)
    assert!(stats.average_at(stats.start_time).is_none());
    assert!(!stats.average_gt_at(stats.start_time, Duration::from_secs(1)));
}

/// `PackedBin::pack` followed by `PackedBin::unpack` must return the
/// original fields, including at the extremes of each field's range.
#[test]
fn atomic_pack_unpack() {
    // (epoch, count, duration_nanos): all bits set, all bits clear, and a
    // mixed value
    let cases: [(u32, u32, u64); 3] = [
        (u32::MAX, u32::MAX, u64::MAX),
        (0, 0, 0),
        (12345, 67890, 123456789012345),
    ];

    for &(epoch, count, nanos) in &cases {
        let packed = PackedBin::pack(epoch, count, nanos);
        assert_eq!((epoch, count, nanos), PackedBin::unpack(packed));
    }
}
222487
}

store/postgres/src/pool/manager.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ use diesel_async::pooled_connection::{PoolError as DieselPoolError, PoolableConn
1010
use diesel_async::{AsyncConnection, RunQueryDsl};
1111
use graph::env::ENV_VARS;
1212
use graph::prelude::error;
13+
use graph::prelude::AtomicMovingStats;
1314
use graph::prelude::Counter;
1415
use graph::prelude::Gauge;
1516
use graph::prelude::MetricsRegistry;
16-
use graph::prelude::MovingStats;
1717
use graph::prelude::PoolWaitStats;
1818
use graph::slog::info;
1919
use graph::slog::Logger;
@@ -24,8 +24,6 @@ use std::sync::atomic::Ordering;
2424
use std::sync::Arc;
2525
use std::time::{Duration, Instant};
2626

27-
use graph::parking_lot::RwLock;
28-
2927
use crate::pool::AsyncPool;
3028

3129
/// Our own connection manager. It is pretty much the same as
@@ -298,7 +296,7 @@ impl WaitMeter {
298296
const_labels,
299297
)
300298
.expect("failed to create `store_connection_wait_time_ms` counter");
301-
let wait_stats = Arc::new(RwLock::new(MovingStats::default()));
299+
let wait_stats = Arc::new(AtomicMovingStats::default());
302300

303301
Self {
304302
wait_gauge,
@@ -307,8 +305,6 @@ impl WaitMeter {
307305
}
308306

309307
pub(crate) fn add_conn_wait_time(&self, duration: Duration) {
310-
self.wait_stats
311-
.write()
312-
.add_and_register(duration, &self.wait_gauge);
308+
self.wait_stats.add_and_register(duration, &self.wait_gauge);
313309
}
314310
}

0 commit comments

Comments
 (0)