Skip to content

Commit 1f3e14a

Browse files
authored
fix: put Sync into its own ClientRequest when spliced, feat: add client_connection_recovery setting (#764)
- fix: put `Sync` into its own `ClientRequest`. This affects sharded spliced requests only (multiple `Execute`/`Bind` in one request). Previously, Sync was only attached to the last request only. Now, it's sent individually to all shards, making sure all shards execute requests. - feat: add `client_connection_recovery` setting. Allows to disconnect clients on hard to recover connection errors, like pool timeouts. ```toml [general] client_connection_recovery = "drop" ```
1 parent 1854970 commit 1f3e14a

13 files changed

Lines changed: 761 additions & 17 deletions

File tree

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
use rust::setup::{admin_sqlx, admin_tokio};
2+
use serial_test::serial;
3+
use sqlx::{Executor, Row};
4+
use std::time::Duration;
5+
use tokio::time::sleep;
6+
use tokio_postgres::NoTls;
7+
8+
async fn connection_tokio(db: &str) -> tokio_postgres::Client {
9+
let (client, connection) = tokio_postgres::connect(
10+
&format!(
11+
"host=127.0.0.1 user=pgdog dbname={} password=pgdog port=6432",
12+
db
13+
),
14+
NoTls,
15+
)
16+
.await
17+
.unwrap();
18+
19+
tokio::spawn(async move {
20+
if let Err(e) = connection.await {
21+
eprintln!("connection error: {}", e);
22+
}
23+
});
24+
25+
client
26+
}
27+
28+
async fn get_pool_ids(database: &str) -> (i64, i64) {
29+
let admin = admin_sqlx().await;
30+
let pools = admin.fetch_all("SHOW POOLS").await.unwrap();
31+
32+
let primary_id = pools
33+
.iter()
34+
.find(|p| {
35+
p.get::<String, &str>("database") == database
36+
&& p.get::<String, &str>("user") == "pgdog"
37+
&& p.get::<String, &str>("role") == "primary"
38+
})
39+
.map(|p| p.get::<i64, &str>("id"))
40+
.unwrap();
41+
42+
let replica_id = pools
43+
.iter()
44+
.find(|p| {
45+
p.get::<String, &str>("database") == database
46+
&& p.get::<String, &str>("user") == "pgdog"
47+
&& p.get::<String, &str>("role") == "replica"
48+
})
49+
.map(|p| p.get::<i64, &str>("id"))
50+
.unwrap();
51+
52+
(primary_id, replica_id)
53+
}
54+
55+
async fn ban_pools(database: &str) {
56+
let admin = admin_sqlx().await;
57+
let (primary_id, replica_id) = get_pool_ids(database).await;
58+
59+
admin
60+
.execute(format!("BAN {}", primary_id).as_str())
61+
.await
62+
.unwrap();
63+
admin
64+
.execute(format!("BAN {}", replica_id).as_str())
65+
.await
66+
.unwrap();
67+
}
68+
69+
async fn unban_pools(database: &str) {
70+
let admin = admin_sqlx().await;
71+
let (primary_id, replica_id) = get_pool_ids(database).await;
72+
73+
admin
74+
.execute(format!("UNBAN {}", primary_id).as_str())
75+
.await
76+
.unwrap();
77+
admin
78+
.execute(format!("UNBAN {}", replica_id).as_str())
79+
.await
80+
.unwrap();
81+
}
82+
83+
#[tokio::test]
84+
#[serial]
85+
async fn test_client_connection_recovery_default() {
86+
let admin = admin_tokio().await;
87+
88+
// Ensure recover mode (default)
89+
admin
90+
.simple_query("SET client_connection_recovery TO 'recover'")
91+
.await
92+
.unwrap();
93+
94+
// Give pools time to reinitialize
95+
sleep(Duration::from_millis(200)).await;
96+
97+
// Connection that we'll test recovery on
98+
let conn = connection_tokio("pgdog").await;
99+
100+
// Ban both primary and replica pools to force Banned error
101+
ban_pools("pgdog").await;
102+
103+
// Give ban time to take effect
104+
sleep(Duration::from_millis(50)).await;
105+
106+
// This query should fail with Banned error because both pools are banned
107+
conn.simple_query("BEGIN").await.unwrap();
108+
let result = conn.simple_query("SELECT 1").await;
109+
assert!(result.is_err(), "Expected error when pools are banned");
110+
let err = result.unwrap_err();
111+
let err_str = err.to_string().to_lowercase();
112+
assert!(
113+
err_str.contains("all replicas down"),
114+
"Expected 'all replicas down' error, got: {}",
115+
err
116+
);
117+
118+
// Unban pools
119+
unban_pools("pgdog").await;
120+
121+
// Give unban time to take effect
122+
sleep(Duration::from_millis(100)).await;
123+
124+
// In recovery mode, the connection should still be usable
125+
let result = conn.simple_query("BEGIN").await;
126+
assert!(
127+
result.is_ok(),
128+
"Expected query to succeed after recovery, got: {:?}",
129+
result.err()
130+
);
131+
let result = conn.simple_query("SELECT 1").await;
132+
assert!(
133+
result.is_ok(),
134+
"Expected SELECT to succeed after recovery, got: {:?}",
135+
result.err()
136+
);
137+
conn.simple_query("COMMIT").await.unwrap();
138+
139+
// Reset settings
140+
admin.simple_query("RELOAD").await.unwrap();
141+
}
142+
143+
#[tokio::test]
144+
#[serial]
145+
async fn test_client_connection_recovery_drop() {
146+
let admin = admin_tokio().await;
147+
148+
// Set drop mode - client should be disconnected on recoverable errors
149+
admin
150+
.simple_query("SET client_connection_recovery TO 'drop'")
151+
.await
152+
.unwrap();
153+
154+
// Give pools time to reinitialize
155+
sleep(Duration::from_millis(200)).await;
156+
157+
// Connection that we'll test drop behavior on
158+
let conn = connection_tokio("pgdog").await;
159+
160+
// Ban both pools to force Banned error
161+
ban_pools("pgdog").await;
162+
163+
// Give ban time to take effect
164+
sleep(Duration::from_millis(50)).await;
165+
166+
// This query should fail because pools are banned
167+
conn.simple_query("BEGIN").await.unwrap();
168+
let result = conn.simple_query("SELECT 1").await;
169+
assert!(result.is_err(), "Expected error when pools are banned");
170+
171+
// Unban pools
172+
unban_pools("pgdog").await;
173+
174+
// Give unban time to take effect
175+
sleep(Duration::from_millis(100)).await;
176+
177+
// In drop mode, the connection should be disconnected
178+
// Subsequent queries should fail because the connection is closed
179+
let result = conn.simple_query("BEGIN").await;
180+
assert!(
181+
result.is_err(),
182+
"Expected query to fail because connection was dropped"
183+
);
184+
185+
// Verify that the error indicates connection was closed
186+
let err = result.unwrap_err();
187+
let err_str = err.to_string().to_lowercase();
188+
assert!(
189+
err_str.contains("connection closed"),
190+
"Expected 'connection closed' error, got: {}",
191+
err
192+
);
193+
194+
// Reset settings
195+
admin.simple_query("RELOAD").await.unwrap();
196+
}

integration/rust/tests/integration/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pub mod auto_id;
44
pub mod avg;
55
pub mod ban;
66
pub mod client_ids;
7+
pub mod connection_recovery;
78
pub mod cross_shard_disabled;
89
pub mod distinct;
910
pub mod explain;

pgdog-config/src/general.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,9 @@ pub struct General {
182182
/// Connection cleanup algorithm.
183183
#[serde(default = "General::connection_recovery")]
184184
pub connection_recovery: ConnectionRecovery,
185+
/// Client connection recovery
186+
#[serde(default = "General::client_connection_recovery")]
187+
pub client_connection_recovery: ConnectionRecovery,
185188
/// LSN check interval.
186189
#[serde(default = "General::lsn_check_interval")]
187190
pub lsn_check_interval: u64,
@@ -270,6 +273,7 @@ impl Default for General {
270273
server_lifetime: Self::server_lifetime(),
271274
stats_period: Self::stats_period(),
272275
connection_recovery: Self::connection_recovery(),
276+
client_connection_recovery: Self::client_connection_recovery(),
273277
lsn_check_interval: Self::lsn_check_interval(),
274278
lsn_check_timeout: Self::lsn_check_timeout(),
275279
lsn_check_delay: Self::lsn_check_delay(),
@@ -601,6 +605,10 @@ impl General {
601605
Self::env_enum_or_default("PGDOG_CONNECTION_RECOVERY")
602606
}
603607

608+
pub fn client_connection_recovery() -> ConnectionRecovery {
609+
Self::env_enum_or_default("PGDOG_CLIENT_CONNECTION_RECOVERY")
610+
}
611+
604612
fn stats_period() -> u64 {
605613
Self::env_or_default("PGDOG_STATS_PERIOD", 15_000)
606614
}

pgdog/src/admin/set.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,22 @@ impl Command for Set {
164164
config.config.general.reload_schema_on_ddl = Self::from_json(&self.value)?;
165165
}
166166

167+
"connection_recovery" => {
168+
config.config.general.connection_recovery = Self::from_json(&self.value)?;
169+
}
170+
171+
"client_connection_recovery" => {
172+
config.config.general.client_connection_recovery = Self::from_json(&self.value)?;
173+
}
174+
175+
"default_pool_size" => {
176+
config.config.general.default_pool_size = self.value.parse()?;
177+
}
178+
179+
"connect_timeout" => {
180+
config.config.general.connect_timeout = self.value.parse()?;
181+
}
182+
167183
_ => return Err(Error::Syntax),
168184
}
169185

pgdog/src/backend/pool/cluster.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ pub struct Cluster {
7171
pub_sub_channel_size: usize,
7272
query_parser: QueryParserLevel,
7373
connection_recovery: ConnectionRecovery,
74+
client_connection_recovery: ConnectionRecovery,
7475
query_parser_engine: QueryParserEngine,
7576
reload_schema_on_ddl: bool,
7677
}
@@ -144,6 +145,7 @@ pub struct ClusterConfig<'a> {
144145
pub query_parser: QueryParserLevel,
145146
pub query_parser_engine: QueryParserEngine,
146147
pub connection_recovery: ConnectionRecovery,
148+
pub client_connection_recovery: ConnectionRecovery,
147149
pub lsn_check_interval: Duration,
148150
pub reload_schema_on_ddl: bool,
149151
}
@@ -192,6 +194,7 @@ impl<'a> ClusterConfig<'a> {
192194
query_parser: general.query_parser,
193195
query_parser_engine: general.query_parser_engine,
194196
connection_recovery: general.connection_recovery,
197+
client_connection_recovery: general.client_connection_recovery,
195198
lsn_check_interval: Duration::from_millis(general.lsn_check_interval),
196199
reload_schema_on_ddl: general.reload_schema_on_ddl,
197200
}
@@ -225,6 +228,7 @@ impl Cluster {
225228
pub_sub_channel_size,
226229
query_parser,
227230
connection_recovery,
231+
client_connection_recovery,
228232
lsn_check_interval,
229233
query_parser_engine,
230234
reload_schema_on_ddl,
@@ -272,6 +276,7 @@ impl Cluster {
272276
pub_sub_channel_size,
273277
query_parser,
274278
connection_recovery,
279+
client_connection_recovery,
275280
query_parser_engine,
276281
reload_schema_on_ddl,
277282
}
@@ -379,6 +384,10 @@ impl Cluster {
379384
&self.connection_recovery
380385
}
381386

387+
pub fn client_connection_recovery(&self) -> &ConnectionRecovery {
388+
&self.client_connection_recovery
389+
}
390+
382391
pub fn dry_run(&self) -> bool {
383392
self.dry_run
384393
}

pgdog/src/backend/pool/connection/binding.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,14 @@ impl Binding {
160160
result?;
161161
}
162162

163-
state.update(shards_sent, client_request.route());
163+
// For Sync-only requests, update shards count but don't reset counters.
164+
// Sync needs correct shards for ReadyForQuery counting, but we must
165+
// preserve buffered CommandComplete from previous queries.
166+
if client_request.is_sync_only() {
167+
state.update_shards(shards_sent);
168+
} else {
169+
state.update(shards_sent, client_request.route());
170+
}
164171

165172
Ok(())
166173
}

pgdog/src/backend/pool/connection/multi_shard/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,13 @@ impl MultiShard {
8181
self.route = route.clone();
8282
}
8383

84+
/// Update only the shards count without resetting counters.
85+
/// Used for Sync-only requests where we need correct ReadyForQuery counting
86+
/// but must preserve buffered CommandComplete from previous queries.
87+
pub(super) fn update_shards(&mut self, shards: usize) {
88+
self.shards = shards;
89+
}
90+
8491
pub(super) fn reset(&mut self) {
8592
self.counters = Counters::default();
8693
self.buffer.reset();

0 commit comments

Comments
 (0)