Skip to content

Commit ed3fd40

Browse files
lutterclaude
andcommitted
store: Replace std::sync::RwLock with parking_lot::RwLock in writable
Replace std::sync::RwLock with parking_lot::RwLock in the background writer's Request::Write batch handling. This reduces lock contention as parking_lot's RwLock is faster for short-held locks due to efficient spinning before parking. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 4bd926c commit ed3fd40

1 file changed

Lines changed: 10 additions & 11 deletions

File tree

store/postgres/src/writable.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use std::collections::BTreeSet;
22
use std::ops::{Deref, Range};
33
use std::sync::atomic::{AtomicBool, Ordering};
4-
use std::sync::{Mutex, RwLock, TryLockError as RwLockError};
4+
use std::sync::Mutex;
5+
6+
use graph::parking_lot::RwLock;
57
use std::time::Instant;
68
use std::{collections::BTreeMap, sync::Arc};
79

@@ -574,7 +576,7 @@ impl BlockTracker {
574576
// processed.
575577
let res = queue.find_map(|req| match req.as_ref() {
576578
Request::Write { batch, .. } => {
577-
let batch = batch.read().unwrap();
579+
let batch = batch.read();
578580
tracker.write(&batch.block_ptr);
579581
if batch.first_block <= tracker.revert {
580582
let res = f(batch.deref(), tracker.revert);
@@ -613,7 +615,7 @@ impl BlockTracker {
613615
let accum = queue.fold(init, |accum, req| {
614616
match req.as_ref() {
615617
Request::Write { batch, .. } => {
616-
let batch = batch.read().unwrap();
618+
let batch = batch.read();
617619
let mut accum = accum;
618620
tracker.write(&batch.block_ptr);
619621
if batch.first_block <= tracker.revert {
@@ -740,7 +742,7 @@ impl std::fmt::Debug for Request {
740742
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
741743
match self {
742744
Self::Write { batch, store, .. } => {
743-
let batch = batch.read().unwrap();
745+
let batch = batch.read();
744746
write!(
745747
f,
746748
"write[{}, {:p}, {} entities]",
@@ -811,7 +813,7 @@ impl Request {
811813
} => {
812814
let start = Instant::now();
813815

814-
let batch = batch.write().unwrap().close();
816+
let batch = batch.write().close();
815817

816818
if let Some(err) = &batch.error {
817819
// This can happen when appending to the batch failed
@@ -850,7 +852,7 @@ impl Request {
850852
fn should_process(&self) -> bool {
851853
match self {
852854
Request::Write { queued, batch, .. } => {
853-
batch.read().unwrap().weight() >= ENV_VARS.store.write_batch_size
855+
batch.read().weight() >= ENV_VARS.store.write_batch_size
854856
|| queued.elapsed() >= ENV_VARS.store.write_batch_duration
855857
}
856858
Request::RevertTo { .. } | Request::Stop => true,
@@ -1169,7 +1171,7 @@ impl Queue {
11691171
// duration of the write, and we do not want to
11701172
// slow down queueing requests unnecessarily
11711173
match existing.try_write() {
1172-
Ok(mut existing) => {
1174+
Some(mut existing) => {
11731175
if existing.weight() < ENV_VARS.store.write_batch_size {
11741176
let res = existing.append(batch).map(|()| None);
11751177
if existing.weight() >= ENV_VARS.store.write_batch_size {
@@ -1180,16 +1182,13 @@ impl Queue {
11801182
Ok(Some(batch))
11811183
}
11821184
}
1183-
Err(RwLockError::WouldBlock) => {
1185+
None => {
11841186
// This branch can cause batches that
11851187
// are not 'full' at the head of the
11861188
// queue, something that start_writer
11871189
// has to take into account
11881190
Ok(Some(batch))
11891191
}
1190-
Err(RwLockError::Poisoned(e)) => {
1191-
panic!("rwlock on batch was poisoned {:?}", e);
1192-
}
11931192
}
11941193
} else {
11951194
Ok(Some(batch))

0 commit comments

Comments
 (0)