Skip to content

Commit b560115

Browse files
authored
Add cancellation safety to message reading (#591)
* Add cancellation safety to message reading * Metrics * warning
1 parent 4d8a965 commit b560115

10 files changed

Lines changed: 280 additions & 25 deletions

File tree

pgdog/src/admin/show_stats.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ impl Command for ShowStats {
4343
Field::numeric(&format!("{}_bind_count", prefix)),
4444
Field::numeric(&format!("{}_close_count", prefix)),
4545
Field::numeric(&format!("{}_errors", prefix)),
46+
Field::numeric(&format!("{}_cleaned", prefix)),
47+
Field::numeric(&format!("{}_rollbacks", prefix)),
4648
]
4749
})
4850
.collect::<Vec<Field>>(),
@@ -84,7 +86,9 @@ impl Command for ShowStats {
8486
.add(stat.parse_count)
8587
.add(stat.bind_count)
8688
.add(stat.close)
87-
.add(stat.errors);
89+
.add(stat.errors)
90+
.add(stat.cleaned)
91+
.add(stat.rollbacks);
8892
}
8993

9094
messages.push(dr.message()?);

pgdog/src/backend/pool/connection/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ impl Connection {
236236
Ok(notification.ok_or(Error::ProtocolOutOfSync)?.message()?)
237237
}
238238

239-
// BUG: This is not cancellation-safe.
239+
// This is cancel-safe.
240240
message = self.binding.read() => {
241241
message
242242
}

pgdog/src/backend/pool/stats.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ pub struct Counts {
2525
pub healthchecks: usize,
2626
pub close: usize,
2727
pub errors: usize,
28+
pub cleaned: usize,
29+
pub prepared_sync: usize,
2830
}
2931

3032
impl Sub for Counts {
@@ -49,6 +51,8 @@ impl Sub for Counts {
4951
healthchecks: self.healthchecks.saturating_add(rhs.healthchecks),
5052
close: self.close.saturating_add(rhs.close),
5153
errors: self.errors.saturating_add(rhs.errors),
54+
cleaned: self.cleaned.saturating_add(rhs.cleaned),
55+
prepared_sync: self.prepared_sync.saturating_add(self.prepared_sync),
5256
}
5357
}
5458
}
@@ -73,6 +77,8 @@ impl Div<usize> for Counts {
7377
healthchecks: self.healthchecks.saturating_div(rhs),
7478
close: self.close.saturating_div(rhs),
7579
errors: self.errors.saturating_div(rhs),
80+
cleaned: self.cleaned.saturating_div(rhs),
81+
prepared_sync: self.prepared_sync.saturating_div(rhs),
7682
}
7783
}
7884
}
@@ -97,6 +103,8 @@ impl Add<BackendCounts> for Counts {
97103
healthchecks: self.healthchecks + rhs.healthchecks,
98104
close: self.close + rhs.close,
99105
errors: self.errors + rhs.errors,
106+
cleaned: self.cleaned + rhs.cleaned,
107+
prepared_sync: self.prepared_sync + rhs.prepared_sync,
100108
}
101109
}
102110
}
@@ -134,6 +142,8 @@ impl Add for Counts {
134142
healthchecks: self.healthchecks.saturating_add(rhs.healthchecks),
135143
close: self.close.saturating_add(rhs.close),
136144
errors: self.errors.saturating_add(rhs.errors),
145+
cleaned: self.cleaned.saturating_add(rhs.cleaned),
146+
prepared_sync: self.prepared_sync.saturating_add(rhs.prepared_sync),
137147
}
138148
}
139149
}

pgdog/src/backend/server.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::{
2525
hello::SslReply, Authentication, BackendKeyData, ErrorResponse, FromBytes, Message,
2626
ParameterStatus, Password, Protocol, Query, ReadyForQuery, Startup, Terminate, ToBytes,
2727
},
28-
Close, Parameter, ProtocolMessage, Sync,
28+
Close, MessageBuffer, Parameter, ProtocolMessage, Sync,
2929
},
3030
stats::memory::MemoryUsage,
3131
};
@@ -59,7 +59,7 @@ pub struct Server {
5959
re_synced: bool,
6060
replication_mode: bool,
6161
pooler_mode: PoolerMode,
62-
stream_buffer: BytesMut,
62+
stream_buffer: MessageBuffer,
6363
}
6464

6565
impl MemoryUsage for Server {
@@ -263,7 +263,7 @@ impl Server {
263263
in_transaction: false,
264264
re_synced: false,
265265
pooler_mode: PoolerMode::Transaction,
266-
stream_buffer: BytesMut::with_capacity(1024),
266+
stream_buffer: MessageBuffer::new(),
267267
};
268268

269269
server.stats.memory_used(server.memory_usage()); // Stream capacity.
@@ -340,18 +340,17 @@ impl Server {
340340
}
341341

342342
/// Read a single message from the server.
343+
///
344+
/// # Cancellation safety
345+
///
346+
/// This method is cancel-safe.
347+
///
343348
pub async fn read(&mut self) -> Result<Message, Error> {
344349
let message = loop {
345350
if let Some(message) = self.prepared_statements.state_mut().get_simulated() {
346351
return Ok(message.backend());
347352
}
348-
match self
349-
.stream
350-
.as_mut()
351-
.unwrap()
352-
.read_buf(&mut self.stream_buffer)
353-
.await
354-
{
353+
match self.stream_buffer.read(self.stream.as_mut().unwrap()).await {
355354
Ok(message) => {
356355
let message = message.stream(self.streaming).backend();
357356
match self.prepared_statements.forward(&message) {
@@ -704,7 +703,7 @@ impl Server {
704703
debug!("prepared statements synchronized [{}]", self.addr());
705704

706705
let count = self.prepared_statements.len();
707-
self.stats_mut().set_prepared_statements(count);
706+
self.stats.set_prepared_statements(count);
708707

709708
Ok(())
710709
}
@@ -838,6 +837,7 @@ impl Server {
838837
#[inline]
839838
pub(super) fn cleaned(&mut self) {
840839
self.dirty = false;
840+
self.stats.cleaned();
841841
}
842842

843843
/// Server is streaming data.
@@ -933,7 +933,7 @@ pub mod test {
933933
re_synced: false,
934934
replication_mode: false,
935935
pooler_mode: PoolerMode::Transaction,
936-
stream_buffer: BytesMut::with_capacity(1024),
936+
stream_buffer: MessageBuffer::new(),
937937
}
938938
}
939939
}

pgdog/src/backend/stats.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ pub struct Counts {
6666
pub healthchecks: usize,
6767
pub close: usize,
6868
pub memory_used: usize,
69+
pub cleaned: usize,
70+
pub prepared_sync: usize,
6971
}
7072

7173
impl Add for Counts {
@@ -80,15 +82,15 @@ impl Add for Counts {
8082
queries: self.queries.saturating_add(rhs.queries),
8183
rollbacks: self.rollbacks.saturating_add(rhs.rollbacks),
8284
errors: self.errors.saturating_add(rhs.errors),
83-
prepared_statements: self
84-
.prepared_statements
85-
.saturating_add(rhs.prepared_statements),
85+
prepared_statements: rhs.prepared_statements, // It's a gauge.
8686
query_time: self.query_time.saturating_add(rhs.query_time),
8787
transaction_time: self.query_time.saturating_add(rhs.transaction_time),
8888
parse: self.parse.saturating_add(rhs.parse),
8989
bind: self.bind.saturating_add(rhs.bind),
9090
healthchecks: self.healthchecks.saturating_add(rhs.healthchecks),
9191
close: self.close.saturating_add(rhs.close),
92+
cleaned: self.cleaned.saturating_add(rhs.cleaned),
93+
prepared_sync: self.prepared_sync.saturating_add(rhs.prepared_sync),
9294
memory_used: self.memory_used, // It's a gauge.
9395
}
9496
}
@@ -183,6 +185,8 @@ impl Stats {
183185
/// for stats.
184186
pub fn set_prepared_statements(&mut self, size: usize) {
185187
self.total.prepared_statements = size;
188+
self.total.prepared_sync += 1;
189+
self.last_checkout.prepared_sync += 1;
186190
self.update();
187191
}
188192

@@ -288,6 +292,12 @@ impl Stats {
288292
self.last_checkout.memory_used = memory;
289293
}
290294

295+
#[inline]
296+
pub fn cleaned(&mut self) {
297+
self.last_checkout.cleaned += 1;
298+
self.total.cleaned += 1;
299+
}
300+
291301
/// Track rollbacks.
292302
pub fn rollback(&mut self) {
293303
self.total.rollbacks += 1;

pgdog/src/frontend/client/mod.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
use std::net::SocketAddr;
44
use std::time::{Duration, Instant};
55

6-
use bytes::BytesMut;
76
use timeouts::Timeouts;
87
use tokio::{select, spawn, time::timeout};
98
use tracing::{debug, enabled, error, info, trace, Level as LogLevel};
@@ -21,8 +20,8 @@ use crate::net::messages::{
2120
Authentication, BackendKeyData, ErrorResponse, FromBytes, Message, Password, Protocol,
2221
ReadyForQuery, ToBytes,
2322
};
24-
use crate::net::ProtocolMessage;
2523
use crate::net::{parameter::Parameters, Stream};
24+
use crate::net::{MessageBuffer, ProtocolMessage};
2625
use crate::state::State;
2726
use crate::stats::memory::MemoryUsage;
2827
use crate::util::user_database_from_params;
@@ -47,7 +46,7 @@ pub struct Client {
4746
transaction: Option<TransactionType>,
4847
timeouts: Timeouts,
4948
client_request: ClientRequest,
50-
stream_buffer: BytesMut,
49+
stream_buffer: MessageBuffer,
5150
passthrough_password: Option<String>,
5251
}
5352

@@ -86,7 +85,7 @@ impl MemoryUsage for Client {
8685
+ std::mem::size_of::<bool>() * 5
8786
+ self.prepared_statements.memory_used()
8887
+ std::mem::size_of::<Timeouts>()
89-
+ self.stream_buffer.memory_usage()
88+
+ self.stream_buffer.capacity()
9089
+ self.client_request.memory_usage()
9190
+ self
9291
.passthrough_password
@@ -301,7 +300,7 @@ impl Client {
301300
transaction: None,
302301
timeouts: Timeouts::from_config(&config.config.general),
303302
client_request: ClientRequest::new(),
304-
stream_buffer: BytesMut::new(),
303+
stream_buffer: MessageBuffer::new(),
305304
shutdown: false,
306305
passthrough_password,
307306
}))
@@ -328,7 +327,7 @@ impl Client {
328327
transaction: None,
329328
timeouts: Timeouts::from_config(&config().config.general),
330329
client_request: ClientRequest::new(),
331-
stream_buffer: BytesMut::new(),
330+
stream_buffer: MessageBuffer::new(),
332331
shutdown: false,
333332
passthrough_password: None,
334333
}
@@ -491,7 +490,7 @@ impl Client {
491490
.client_idle_timeout(&state, &self.client_request);
492491

493492
let message =
494-
match timeout(idle_timeout, self.stream.read_buf(&mut self.stream_buffer)).await {
493+
match timeout(idle_timeout, self.stream_buffer.read(&mut self.stream)).await {
495494
Err(_) => {
496495
self.stream
497496
.fatal(ErrorResponse::client_idle_timeout(idle_timeout))

0 commit comments

Comments
 (0)