Skip to content

Commit c5188ce

Browse files
authored
fix: deadlock in multi-shard SELECT queries (#864)
`SELECT` queries that read data from fewer than the total number of shards (a multi-shard connection binding) would deadlock: the send path compared actual shard numbers against positional indices in the pre-filtered server list, so some bound servers never received the query and the client waited forever for their responses. This didn't affect cross-shard (all-shard) queries or direct-to-shard queries.
1 parent 9935a95 commit c5188ce

11 files changed

Lines changed: 193 additions & 58 deletions

File tree

integration/pgdog.toml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,19 @@ database_name = "shard_1"
7878
shard = 1
7979
role = "replica"
8080

81+
# [[databases]]
82+
# name = "pgdog_sharded"
83+
# host = "localhost"
84+
# database_name = "shard_2"
85+
# shard = 2
86+
87+
# [[databases]]
88+
# name = "pgdog_sharded"
89+
# host = "localhost"
90+
# database_name = "shard_2"
91+
# shard = 2
92+
# role = "replica"
93+
8194
# ------------------------------------------------------------------------------
8295
# ----- Database :: failover ---------------------------------------------------
8396

integration/setup.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export PGHOST=127.0.0.1
2626
export PGPORT=5432
2727
export PGUSER='pgdog'
2828

29-
for db in pgdog shard_0 shard_1; do
29+
for db in pgdog shard_0 shard_1 shard_2 shard_3; do
3030
psql -c "DROP DATABASE $db" || true
3131
psql -c "CREATE DATABASE $db" || true
3232
for user in pgdog pgdog1 pgdog2 pgdog3; do
@@ -35,7 +35,7 @@ for db in pgdog shard_0 shard_1; do
3535
done
3636
done
3737

38-
for db in pgdog shard_0 shard_1; do
38+
for db in pgdog shard_0 shard_1 shard_2 shard_3; do
3939
for table in sharded sharded_omni; do
4040
psql -c "DROP TABLE IF EXISTS ${table}" ${db} -U pgdog
4141
psql -c "CREATE TABLE IF NOT EXISTS ${table} (

pgdog/src/backend/pool/connection/binding.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,11 @@ impl Binding {
136136
let mut shards_sent = servers.len();
137137
let mut futures = Vec::new();
138138

139-
for (shard, server) in servers.iter_mut().enumerate() {
139+
for (position, server) in servers.iter_mut().enumerate() {
140+
// Map positional index to actual shard number.
141+
// When only a subset of shards is connected (Shard::Multi binding),
142+
// positional indices don't match actual shard numbers.
143+
let shard = state.shard_index(position);
140144
let send = match client_request.route().shard() {
141145
Shard::Direct(s) => {
142146
shards_sent = 1;
@@ -177,9 +181,10 @@ impl Binding {
177181
/// Send copy messages to shards they are destined to go.
178182
pub async fn send_copy(&mut self, rows: Vec<CopyRow>) -> Result<(), Error> {
179183
match self {
180-
Binding::MultiShard(servers, _state) => {
184+
Binding::MultiShard(servers, state) => {
181185
for row in rows {
182-
for (shard, server) in servers.iter_mut().enumerate() {
186+
for (position, server) in servers.iter_mut().enumerate() {
187+
let shard = state.shard_index(position);
183188
match row.shard() {
184189
Shard::Direct(row_shard) => {
185190
if shard == *row_shard {

pgdog/src/backend/pool/connection/binding_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ mod tests {
4949
];
5050

5151
let route = Route::write(ShardWithPriority::new_default_unset(Shard::All));
52-
let multishard = MultiShard::new(3, &route);
52+
let multishard = MultiShard::new(vec![0, 1, 2], &route);
5353

5454
let mut binding = Binding::MultiShard(guards, Box::new(multishard));
5555

pgdog/src/backend/pool/connection/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ impl Connection {
158158
};
159159
} else {
160160
let mut shards = vec![];
161+
let mut shard_indices = vec![];
161162
for (i, shard) in self.cluster()?.shards().iter().enumerate() {
162163
if let Shard::Multi(numbers) = route.shard() {
163164
if !numbers.contains(&i) {
@@ -175,11 +176,11 @@ impl Connection {
175176
}
176177

177178
shards.push(server);
179+
shard_indices.push(i);
178180
}
179-
let num_shards = shards.len();
180181

181182
self.binding =
182-
Binding::MultiShard(shards, Box::new(MultiShard::new(num_shards, route)));
183+
Binding::MultiShard(shards, Box::new(MultiShard::new(shard_indices, route)));
183184
}
184185

185186
Ok(())

pgdog/src/backend/pool/connection/multi_shard/mod.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ pub struct MultiShard {
5252
shards: usize,
5353
/// Route the query is taking.
5454
route: Route,
55+
/// Maps positional index in the servers vec to actual shard number.
56+
/// When all shards are connected, this is `[0, 1, 2, ...]`.
57+
/// When only a subset is connected (e.g. shards 0 and 2), this is `[0, 2]`.
58+
shard_indices: Vec<usize>,
5559

5660
/// Counters
5761
counters: Counters,
@@ -64,16 +68,26 @@ pub struct MultiShard {
6468
}
6569

6670
impl MultiShard {
67-
/// New multi-shard state given the number of shards in the cluster.
68-
pub(super) fn new(shards: usize, route: &Route) -> Self {
71+
/// New multi-shard state given the actual shard indices connected.
72+
pub(super) fn new(shard_indices: Vec<usize>, route: &Route) -> Self {
73+
let shards = shard_indices.len();
6974
Self {
7075
shards,
76+
shard_indices,
7177
route: route.clone(),
7278
counters: Counters::default(),
7379
..Default::default()
7480
}
7581
}
7682

83+
/// Map a positional index to the actual shard number.
84+
pub(super) fn shard_index(&self, position: usize) -> usize {
85+
self.shard_indices
86+
.get(position)
87+
.copied()
88+
.unwrap_or(position)
89+
}
90+
7791
/// Update multi-shard state.
7892
pub(super) fn update(&mut self, shards: usize, route: &Route) {
7993
self.reset();

pgdog/src/backend/pool/connection/multi_shard/test.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use super::*;
88
#[test]
99
fn test_inconsistent_row_descriptions() {
1010
let route = Route::default();
11-
let mut multi_shard = MultiShard::new(2, &route);
11+
let mut multi_shard = MultiShard::new(vec![0, 1], &route);
1212

1313
// Create two different row descriptions
1414
let rd1 = RowDescription::new(&[Field::text("name"), Field::bigint("id")]);
@@ -32,7 +32,7 @@ fn test_inconsistent_row_descriptions() {
3232
#[test]
3333
fn test_inconsistent_data_rows() {
3434
let route = Route::default();
35-
let mut multi_shard = MultiShard::new(2, &route);
35+
let mut multi_shard = MultiShard::new(vec![0, 1], &route);
3636

3737
// Set up row description first
3838
let rd = RowDescription::new(&[Field::text("name"), Field::bigint("id")]);
@@ -63,7 +63,7 @@ fn test_inconsistent_data_rows() {
6363
#[test]
6464
fn test_rd_before_dr() {
6565
let mut multi_shard = MultiShard::new(
66-
3,
66+
vec![0, 1, 2],
6767
&Route::read(ShardWithPriority::new_default_unset(Shard::All)),
6868
);
6969
let rd = RowDescription::new(&[Field::bigint("id")]);
@@ -127,7 +127,7 @@ fn test_rd_before_dr() {
127127
#[test]
128128
fn test_ready_for_query_error_preservation() {
129129
let route = Route::default();
130-
let mut multi_shard = MultiShard::new(2, &route);
130+
let mut multi_shard = MultiShard::new(vec![0, 1], &route);
131131

132132
// Create ReadyForQuery messages - one with transaction error, one normal
133133
let rfq_error = ReadyForQuery::error();
@@ -151,7 +151,7 @@ fn test_ready_for_query_error_preservation() {
151151
fn test_omni_command_complete_not_summed() {
152152
// For omni-sharded tables, we should NOT sum row counts across shards.
153153
let route = Route::write(ShardWithPriority::new_table_omni(Shard::All));
154-
let mut multi_shard = MultiShard::new(3, &route);
154+
let mut multi_shard = MultiShard::new(vec![0, 1, 2], &route);
155155

156156
let backend1 = BackendKeyData { pid: 1, secret: 1 };
157157
let backend2 = BackendKeyData { pid: 2, secret: 2 };
@@ -193,7 +193,7 @@ fn test_omni_command_complete_not_summed() {
193193
fn test_omni_command_complete_uses_first_shard_row_count() {
194194
// For omni, we use the first shard's row count for consistency with DataRow behavior.
195195
let route = Route::write(ShardWithPriority::new_table_omni(Shard::All));
196-
let mut multi_shard = MultiShard::new(2, &route);
196+
let mut multi_shard = MultiShard::new(vec![0, 1], &route);
197197

198198
let backend1 = BackendKeyData { pid: 1, secret: 1 };
199199
let backend2 = BackendKeyData { pid: 2, secret: 2 };
@@ -228,7 +228,7 @@ fn test_omni_command_complete_uses_first_shard_row_count() {
228228
fn test_omni_data_rows_only_from_first_server() {
229229
// For omni-sharded tables with RETURNING, only forward DataRows from the first server.
230230
let route = Route::write(ShardWithPriority::new_table_omni(Shard::All));
231-
let mut multi_shard = MultiShard::new(2, &route);
231+
let mut multi_shard = MultiShard::new(vec![0, 1], &route);
232232

233233
let backend1 = BackendKeyData { pid: 1, secret: 1 };
234234
let backend2 = BackendKeyData { pid: 2, secret: 2 };

pgdog/src/config/mod.rs

Lines changed: 34 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -207,52 +207,46 @@ pub fn load_test_replicas() {
207207

208208
#[cfg(test)]
209209
pub fn load_test_sharded() {
210+
load_test_sharded_n(2);
211+
}
212+
213+
/// Load 3-shard test configuration.
214+
pub fn load_test_sharded_3() {
215+
load_test_sharded_n(3);
216+
}
217+
218+
fn load_test_sharded_n(num_shards: usize) {
210219
use pgdog_config::{OmnishardedTables, ShardedSchema};
211220

212221
use crate::backend::databases::init;
213222

214223
let mut config = ConfigAndUsers::default();
215224
config.config.general.min_pool_size = 0;
216-
config.config.databases = vec![
217-
Database {
218-
name: "pgdog".into(),
219-
host: "127.0.0.1".into(),
220-
port: 5432,
221-
role: Role::Primary,
222-
database_name: Some("shard_0".into()),
223-
shard: 0,
224-
..Default::default()
225-
},
226-
Database {
227-
name: "pgdog".into(),
228-
host: "127.0.0.1".into(),
229-
port: 5432,
230-
role: Role::Replica,
231-
read_only: Some(true),
232-
database_name: Some("shard_0".into()),
233-
shard: 0,
234-
..Default::default()
235-
},
236-
Database {
237-
name: "pgdog".into(),
238-
host: "127.0.0.1".into(),
239-
port: 5432,
240-
role: Role::Primary,
241-
database_name: Some("shard_1".into()),
242-
shard: 1,
243-
..Default::default()
244-
},
245-
Database {
246-
name: "pgdog".into(),
247-
host: "127.0.0.1".into(),
248-
port: 5432,
249-
role: Role::Replica,
250-
read_only: Some(true),
251-
database_name: Some("shard_1".into()),
252-
shard: 1,
253-
..Default::default()
254-
},
255-
];
225+
config.config.databases = (0..num_shards)
226+
.flat_map(|shard| {
227+
vec![
228+
Database {
229+
name: "pgdog".into(),
230+
host: "127.0.0.1".into(),
231+
port: 5432,
232+
role: Role::Primary,
233+
database_name: Some(format!("shard_{}", shard)),
234+
shard,
235+
..Default::default()
236+
},
237+
Database {
238+
name: "pgdog".into(),
239+
host: "127.0.0.1".into(),
240+
port: 5432,
241+
role: Role::Replica,
242+
read_only: Some(true),
243+
database_name: Some(format!("shard_{}", shard)),
244+
shard,
245+
..Default::default()
246+
},
247+
]
248+
})
249+
.collect();
256250
config.config.sharded_tables = vec![
257251
ShardedTable {
258252
database: "pgdog".into(),

pgdog/src/frontend/client/query_engine/test/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ mod graceful_disconnect;
1515
mod graceful_shutdown;
1616
mod idle_in_transaction_recovery;
1717
mod lock_session;
18+
mod multi_binding;
1819
mod omni;
1920
pub mod prelude;
2021
mod prepared_syntax_error;
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use crate::{
2+
expect_message,
3+
net::{CommandComplete, Parameters, Query, ReadyForQuery},
4+
};
5+
6+
use super::prelude::*;
7+
8+
/// Regression test: Shard::Multi with fewer shards than total (e.g. 2 of 3)
9+
/// used to get stuck because send() compared actual shard numbers against
10+
/// positional indices in the pre-filtered servers vec.
11+
///
12+
/// With 3 shards, an IN clause targeting shards 0 and 2 creates a MultiShard
13+
/// binding with only 2 servers. Before the fix, send() used positional indices
14+
/// (0, 1) to match against actual shard numbers ([0, 2]), so the server at
15+
/// index 1 (shard 2) never received the query, leaving the client hanging
/// while it waited for that shard's response.
16+
#[tokio::test]
17+
async fn test_multi_binding_select_subset_of_shards() {
18+
let mut client = TestClient::new_sharded_3(Parameters::default()).await;
19+
20+
let id_shard0 = client.random_id_for_shard(0);
21+
let id_shard2 = client.random_id_for_shard(2);
22+
23+
// Cleanup
24+
client
25+
.send_simple(Query::new(format!(
26+
"DELETE FROM sharded WHERE id IN ({}, {})",
27+
id_shard0, id_shard2
28+
)))
29+
.await;
30+
client.read_until('Z').await.unwrap();
31+
32+
// Insert rows into shards 0 and 2 (skipping shard 1).
33+
client
34+
.send_simple(Query::new(format!(
35+
"INSERT INTO sharded (id, value) VALUES ({}, 'multi0'), ({}, 'multi2') ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value",
36+
id_shard0, id_shard2
37+
)))
38+
.await;
39+
client.read_until('Z').await.unwrap();
40+
41+
// SELECT with IN clause targeting shards 0 and 2 only.
42+
// This creates a Shard::Multi([0, 2]) binding with 2 of 3 servers.
43+
client
44+
.send_simple(Query::new(format!(
45+
"SELECT * FROM sharded WHERE id IN ({}, {})",
46+
id_shard0, id_shard2
47+
)))
48+
.await;
49+
50+
let messages = client.read_until('Z').await.unwrap();
51+
52+
let data_rows: Vec<_> = messages.iter().filter(|m| m.code() == 'D').collect();
53+
assert_eq!(
54+
data_rows.len(),
55+
2,
56+
"should return rows from both targeted shards"
57+
);
58+
59+
let cc_msg = messages.iter().find(|m| m.code() == 'C').unwrap();
60+
let cc = CommandComplete::try_from(cc_msg.clone()).unwrap();
61+
assert_eq!(cc.command(), "SELECT 2");
62+
63+
// Cleanup
64+
client
65+
.send_simple(Query::new(format!(
66+
"DELETE FROM sharded WHERE id IN ({}, {})",
67+
id_shard0, id_shard2
68+
)))
69+
.await;
70+
client.read_until('Z').await.unwrap();
71+
}
72+
73+
/// Test multi-binding DELETE targeting a subset of shards (2 of 3).
74+
#[tokio::test]
75+
async fn test_multi_binding_delete_subset_of_shards() {
76+
let mut client = TestClient::new_sharded_3(Parameters::default()).await;
77+
78+
let id_shard0 = client.random_id_for_shard(0);
79+
let id_shard2 = client.random_id_for_shard(2);
80+
81+
// Setup
82+
client
83+
.send_simple(Query::new(format!(
84+
"INSERT INTO sharded (id, value) VALUES ({}, 'del0'), ({}, 'del2') ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value",
85+
id_shard0, id_shard2
86+
)))
87+
.await;
88+
client.read_until('Z').await.unwrap();
89+
90+
// DELETE targeting shards 0 and 2 via IN clause (2 of 3 shards).
91+
client
92+
.send_simple(Query::new(format!(
93+
"DELETE FROM sharded WHERE id IN ({}, {})",
94+
id_shard0, id_shard2
95+
)))
96+
.await;
97+
98+
let cc = expect_message!(client.read().await, CommandComplete);
99+
assert_eq!(cc.command(), "DELETE 2");
100+
expect_message!(client.read().await, ReadyForQuery);
101+
}

0 commit comments

Comments
 (0)