Skip to content

Commit 1d26172

Browse files
authored
fix: client disconnect inside transaction caused deadlock (#848)
If a client sent `Terminate` while inside a transaction (i.e. closing connection), we would deadlock, leaving the client task stuck, blocking the server connection inside the transaction as well. This is a regression introduced in #820, in an attempt to fix session mode clients not disconnecting properly. Contributing factors include a special "test mode" for the query engine / client query handling code that required code that only ran in tests but not in production, and vice versa. This was removed, and the client query handling loop is now fully tested the way it works in prod.
1 parent 3f6cd27 commit 1d26172

20 files changed

Lines changed: 561 additions & 564 deletions

.config/nextest.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,6 @@
22
fail-fast = true
33
test-threads = 1
44
default-filter = "not package(rust)"
5+
6+
[profile.default]
7+
slow-timeout = "15s"

integration/python/test_idle_in_transaction.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import asyncpg
23
import pytest
34
from sqlalchemy import text
45
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
@@ -101,3 +102,46 @@ async def test_idle_in_transaction_timeout():
101102
under concurrent load."""
102103
tasks = [_run_idle_in_transaction() for _ in range(WORKERS)]
103104
await asyncio.gather(*tasks)
105+
106+
107+
async def _disconnect_mid_transaction():
108+
"""Single worker: begin a transaction, disconnect, verify rollback."""
109+
conn = await asyncpg.connect(
110+
user="pgdog",
111+
password="pgdog",
112+
database="pgdog",
113+
host="127.0.0.1",
114+
port=6432,
115+
)
116+
117+
# Start a transaction and grab the backend pid
118+
await conn.execute("BEGIN")
119+
pid = await conn.fetchval("SELECT pg_backend_pid()")
120+
await conn.execute("SELECT 1")
121+
122+
# Disconnect abruptly (without COMMIT/ROLLBACK)
123+
await conn.close()
124+
125+
# Give pgdog a moment to clean up
126+
await asyncio.sleep(0.5)
127+
128+
# Check via direct connection that the backend is idle (transaction was rolled back)
129+
direct = direct_sync()
130+
direct.autocommit = True
131+
cur = direct.cursor()
132+
cur.execute(
133+
"SELECT state FROM pg_stat_activity WHERE pid = %s",
134+
(pid,),
135+
)
136+
row = cur.fetchone()
137+
direct.close()
138+
139+
assert row is not None, f"expected backend {pid} to still exist in the pool"
140+
assert row[0] == "idle", f"expected backend {pid} to be idle (rolled back), got: {row[0]}"
141+
142+
143+
@pytest.mark.asyncio
144+
async def test_disconnect_mid_transaction_rolls_back():
145+
"""Disconnect in the middle of a transaction and verify pgdog rolls it back."""
146+
tasks = [_disconnect_mid_transaction() for _ in range(WORKERS)]
147+
await asyncio.gather(*tasks)

pgdog/src/backend/pool/connection/mirror/mod.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,6 @@ impl Mirror {
9696
let mut query_engine =
9797
QueryEngine::new(&params, &ClientComms::new(&BackendKeyData::new()), false)?;
9898

99-
// Mirror must read server responses to keep the connection synchronized,
100-
// so disable test_mode which skips reading responses.
101-
#[cfg(test)]
102-
query_engine.set_test_mode(false);
103-
10499
// Mirror traffic handler.
105100
let mut mirror = Self::new(&params, &config);
106101

pgdog/src/frontend/client/mod.rs

Lines changed: 15 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ pub struct Client {
4747
comms: ClientComms,
4848
admin: bool,
4949
streaming: bool,
50-
shutdown: bool,
5150
prepared_statements: PreparedStatements,
5251
transaction: Option<TransactionType>,
5352
timeouts: Timeouts,
@@ -306,7 +305,6 @@ impl Client {
306305
timeouts: Timeouts::from_config(&config.config.general),
307306
client_request: ClientRequest::new(),
308307
stream_buffer: MessageBuffer::new(config.config.memory.message_buffer),
309-
shutdown: false,
310308
sticky: Sticky::from_params(&params),
311309
connect_params: params,
312310
}))
@@ -338,7 +336,6 @@ impl Client {
338336
timeouts: Timeouts::from_config(&config().config.general),
339337
client_request: ClientRequest::new(),
340338
stream_buffer: MessageBuffer::new(4096),
341-
shutdown: false,
342339
sticky: Sticky::from_params(&connect_params),
343340
params: connect_params,
344341
}
@@ -380,28 +377,28 @@ impl Client {
380377
/// Run the client.
381378
async fn run(&mut self) -> Result<(), Error> {
382379
let shutdown = self.comms.shutting_down();
383-
let mut offline;
384380
let mut query_engine = QueryEngine::from_client(self)?;
385-
let mut terminating = false;
386381

387382
loop {
388-
offline = (self.comms.offline() && !self.admin || self.shutdown)
389-
&& query_engine.can_disconnect();
390-
if offline {
391-
break;
392-
}
393-
394-
if terminating && query_engine.can_disconnect() {
383+
// Check if we should be shutting down.
384+
let offline = self.comms.offline();
385+
// Check that there are no active transactions.
386+
let query_engine_done = query_engine.can_disconnect();
387+
388+
// If query engine is idle and we requested shutdown, we're done.
389+
if query_engine_done && offline {
390+
// Send shutdown notification to client.
391+
self.stream
392+
.send_flush(&ErrorResponse::shutting_down())
393+
.await?;
395394
break;
396395
}
397396

398397
let client_state = query_engine.client_state();
399398

400399
select! {
401400
_ = shutdown.notified() => {
402-
if query_engine.can_disconnect() {
403-
continue; // Wake up task.
404-
}
401+
continue; // Wake up task.
405402
}
406403

407404
// Async messages.
@@ -410,7 +407,7 @@ impl Client {
410407
self.server_message(&mut query_engine, message).await?;
411408
}
412409

413-
buffer = self.buffer(client_state), if !terminating => {
410+
buffer = self.buffer(client_state) => {
414411
let event = buffer?;
415412

416413
// Only send requests to the backend if they are complete.
@@ -420,25 +417,14 @@ impl Client {
420417
}
421418

422419
match event {
423-
BufferEvent::DisconnectAbrupt => break,
424-
BufferEvent::DisconnectGraceful => {
425-
if query_engine.can_disconnect() {
426-
break;
427-
}
428-
terminating = true;
429-
}
420+
// Client disconnected, we're done.
421+
BufferEvent::DisconnectAbrupt | BufferEvent::DisconnectGraceful => break,
430422
BufferEvent::HaveRequest => (),
431423
}
432424
}
433425
}
434426
}
435427

436-
if offline && !self.shutdown {
437-
self.stream
438-
.send_flush(&ErrorResponse::shutting_down())
439-
.await?;
440-
}
441-
442428
Ok(())
443429
}
444430

@@ -547,7 +533,6 @@ impl Client {
547533

548534
// Terminate (B & F).
549535
if message.code() == 'X' {
550-
self.shutdown = true;
551536
return Ok(BufferEvent::DisconnectGraceful);
552537
} else {
553538
let message = ProtocolMessage::from_bytes(message.to_bytes()?)?;

pgdog/src/frontend/client/query_engine/mod.rs

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -43,28 +43,6 @@ use notify_buffer::NotifyBuffer;
4343
pub use two_pc::phase::TwoPcPhase;
4444
use two_pc::TwoPc;
4545

46-
#[derive(Debug)]
47-
pub struct TestMode {
48-
pub enabled: bool,
49-
}
50-
51-
impl Default for TestMode {
52-
fn default() -> Self {
53-
Self::new()
54-
}
55-
}
56-
57-
impl TestMode {
58-
pub fn new() -> Self {
59-
Self {
60-
#[cfg(test)]
61-
enabled: true,
62-
#[cfg(not(test))]
63-
enabled: false,
64-
}
65-
}
66-
}
67-
6846
#[derive(Debug)]
6947
pub struct QueryEngine {
7048
begin_stmt: Option<BufferedQuery>,
@@ -73,7 +51,6 @@ pub struct QueryEngine {
7351
stats: Stats,
7452
backend: Connection,
7553
streaming: bool,
76-
test_mode: TestMode,
7754
two_pc: TwoPc,
7855
notify_buffer: NotifyBuffer,
7956
pending_explain: Option<ExplainResponseState>,
@@ -92,7 +69,6 @@ impl QueryEngine {
9269
backend,
9370
comms: comms.clone(),
9471
hooks: QueryEngineHooks::new(),
95-
test_mode: TestMode::new(),
9672
stats: Stats::default(),
9773
streaming: bool::default(),
9874
two_pc: TwoPc::default(),

pgdog/src/frontend/client/query_engine/query.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ impl QueryEngine {
9595
while self.backend.has_more_messages()
9696
&& !self.backend.in_copy_mode()
9797
&& !self.streaming
98-
&& !self.test_mode.enabled
9998
{
10099
let message = self.read_server_message().await?;
101100
self.process_server_message(context, message).await?;
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
use crate::{
2+
expect_message,
3+
net::{
4+
CloseComplete, CommandComplete, DataRow, Parameters, ParseComplete, ReadyForQuery,
5+
RowDescription,
6+
},
7+
};
8+
9+
use super::prelude::*;
10+
11+
/// Test closing a non-existent prepared statement, then a query,
12+
/// then close+parse+flush for the same name.
13+
#[tokio::test]
14+
async fn test_close_parse() {
15+
let mut client = TestClient::new_replicas(Parameters::default()).await;
16+
17+
// Close non-existent statement + Sync
18+
client.send(Close::named("test")).await;
19+
client.send(Sync).await;
20+
client.try_process().await.unwrap();
21+
22+
expect_message!(client.read().await, CloseComplete); // '3'
23+
expect_message!(client.read().await, ReadyForQuery); // 'Z'
24+
25+
// Simple SELECT query
26+
client.send_simple(Query::new("SELECT 1")).await;
27+
28+
expect_message!(client.read().await, RowDescription); // 'T'
29+
expect_message!(client.read().await, DataRow); // 'D'
30+
expect_message!(client.read().await, CommandComplete); // 'C'
31+
expect_message!(client.read().await, ReadyForQuery); // 'Z'
32+
33+
// Close + Parse same name + Flush
34+
client.send(Close::named("test1")).await;
35+
client.send(Parse::named("test1", "SELECT $1")).await;
36+
client.send(Flush).await;
37+
client.try_process().await.unwrap();
38+
39+
expect_message!(client.read().await, CloseComplete); // '3'
40+
expect_message!(client.read().await, ParseComplete); // '1'
41+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use crate::{
2+
expect_message,
3+
net::{CloseComplete, Parameters, ParseComplete},
4+
};
5+
6+
use super::prelude::*;
7+
8+
/// Test that Close + Parse for the same name correctly updates
9+
/// the global prepared statement cache.
10+
#[tokio::test]
11+
async fn test_close_parse_same_name_global_cache() {
12+
let mut client = TestClient::new_replicas(Parameters::default()).await;
13+
14+
// Close + Parse same name + Flush
15+
client.send(Close::named("test_stmt")).await;
16+
client.send(Parse::named("test_stmt", "SELECT $1")).await;
17+
client.send(Flush).await;
18+
client.try_process().await.unwrap();
19+
20+
expect_message!(client.read().await, CloseComplete); // '3'
21+
expect_message!(client.read().await, ParseComplete); // '1'
22+
23+
// Verify the statement is registered correctly in the global cache
24+
let global_cache = client.client().prepared_statements.global.clone();
25+
assert_eq!(global_cache.read().len(), 1);
26+
let binding = global_cache.write();
27+
let (_, cached_stmt) = binding.statements().iter().next().unwrap();
28+
assert_eq!(cached_stmt.used, 1);
29+
30+
// Verify the SQL content in the global cache
31+
let global_stmt_name = cached_stmt.name();
32+
let cached_query = binding.query(&global_stmt_name).unwrap();
33+
assert_eq!(cached_query, "SELECT $1");
34+
35+
// Verify the client's local cache
36+
assert_eq!(client.client().prepared_statements.len_local(), 1);
37+
assert!(client
38+
.client()
39+
.prepared_statements
40+
.name("test_stmt")
41+
.is_some());
42+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
use crate::{
2+
expect_message,
3+
net::{
4+
BindComplete, CommandComplete, DataRow, ParameterDescription, Parameters, ParseComplete,
5+
ReadyForQuery, RowDescription,
6+
},
7+
};
8+
9+
use super::prelude::*;
10+
11+
/// Test extended protocol with named prepared statement, parameter binding, and Describe.
12+
#[tokio::test]
13+
async fn test_extended_with_params_and_describe() {
14+
let mut client = TestClient::new_replicas(Parameters::default()).await;
15+
16+
client.send(Parse::named("test", "SELECT $1")).await;
17+
client
18+
.send(Bind::new_params(
19+
"test",
20+
&[Parameter {
21+
len: 3,
22+
data: "123".into(),
23+
}],
24+
))
25+
.await;
26+
client.send(Describe::new_statement("test")).await;
27+
client.send(Execute::new()).await;
28+
client.send(Sync).await;
29+
client.try_process().await.unwrap();
30+
31+
expect_message!(client.read().await, ParseComplete); // '1'
32+
expect_message!(client.read().await, BindComplete); // '2'
33+
expect_message!(client.read().await, ParameterDescription); // 't'
34+
expect_message!(client.read().await, RowDescription); // 'T'
35+
let row = expect_message!(client.read().await, DataRow); // 'D'
36+
assert_eq!(
37+
row.get::<String>(0, crate::net::Format::Text),
38+
Some("123".into())
39+
);
40+
expect_message!(client.read().await, CommandComplete); // 'C'
41+
let rfq = expect_message!(client.read().await, ReadyForQuery); // 'Z'
42+
assert_eq!(rfq.status, 'I');
43+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use std::time::Duration;
2+
3+
use tokio::{io::AsyncReadExt, time::timeout};
4+
5+
use super::prelude::*;
6+
7+
/// Regression test: client sends BEGIN, gets response, then sends Terminate.
8+
/// The backend is idle-in-transaction, so it won't send any messages.
9+
/// The client event loop must exit promptly instead of deadlocking.
10+
#[tokio::test]
11+
async fn test_graceful_disconnect_idle_in_transaction() {
12+
let mut client = SpawnedClient::new_default(Parameters::default()).await;
13+
14+
// Start a transaction.
15+
client.send(Query::new("BEGIN")).await;
16+
client.read_until('Z').await;
17+
18+
// Send Terminate while idle-in-transaction.
19+
client.send(Terminate).await;
20+
21+
// The client task should exit and close the connection.
22+
// With the deadlock bug, this times out because the event loop hangs.
23+
let mut buf = [0u8; 1];
24+
let result = timeout(Duration::from_secs(2), client.conn.read(&mut buf)).await;
25+
assert!(
26+
result.is_ok(),
27+
"client.run() deadlocked after Terminate during idle-in-transaction"
28+
);
29+
30+
// EOF means the connection was closed cleanly.
31+
assert_eq!(result.unwrap().unwrap(), 0);
32+
}

0 commit comments

Comments
 (0)