Skip to content

Commit a41e41d

Browse files
authored
chore: reproduce idle-in-transaction extended protocol bug (#851)
- chore: upgrade bindgen in `pg_query` to work with latest clang - chore: add test case that demonstrates partial extended exchange preventing `idle_in_transaction_session_timeout` from firing
1 parent 5ad09ed commit a41e41d

6 files changed

Lines changed: 104 additions & 25 deletions

File tree

Cargo.lock

Lines changed: 12 additions & 21 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pgdog-plugin/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ crate-type = ["rlib", "cdylib"]
1515

1616
[dependencies]
1717
libloading = "0.8"
18-
pg_query = { git = "https://github.com/pgdogdev/pg_query.rs.git", rev = "f8c216209f90525f065b47ffde9eb5da803d2dc6" }
18+
pg_query = { git = "https://github.com/pgdogdev/pg_query.rs.git", rev = "d30dcb47fdf3fa77d102b813a34392146642903a" }
1919
pgdog-macros = { path = "../pgdog-macros", version = "0.1.1" }
2020

2121
tracing = "0.1"

pgdog/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ base64 = "0.22"
4343
md5 = "0.7"
4444
futures = "0.3"
4545
csv-core = "0.1"
46-
pg_query = { git = "https://github.com/pgdogdev/pg_query.rs.git", rev = "f8c216209f90525f065b47ffde9eb5da803d2dc6" }
46+
pg_query = { git = "https://github.com/pgdogdev/pg_query.rs.git", rev = "d30dcb47fdf3fa77d102b813a34392146642903a" }
4747
regex = "1"
4848
semver = "1"
4949
uuid = { version = "1", features = ["v4", "serde"] }
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
use std::time::Duration;
2+
3+
use pgdog_postgres_types::Format;
4+
use tokio::time::sleep;
5+
6+
use crate::{
7+
backend::{server::test::test_server, Server},
8+
expect_message,
9+
net::{DataRow, RowDescription},
10+
};
11+
12+
use super::prelude::*;
13+
14+
#[tokio::test]
15+
async fn test_idle_in_transaction_partial_recovery() {
16+
crate::logger();
17+
18+
// Direct connection for testing state.
19+
let mut test_server = test_server().await;
20+
21+
let mut client = TestClient::new_replicas(Parameters::default())
22+
.await
23+
.leak_pool();
24+
25+
client.send_simple(Query::new("BEGIN")).await;
26+
client.read_until('Z').await.unwrap();
27+
28+
client
29+
.send_simple(Query::new("SELECT pg_backend_pid()::text"))
30+
.await;
31+
expect_message!(client.read().await, RowDescription);
32+
let rd = expect_message!(client.read().await, DataRow);
33+
let pid: String = rd.get(0, Format::Text).unwrap();
34+
35+
client.read_until('Z').await.unwrap();
36+
37+
// This won't fire because we'll be stuck inside the extended exchange.
38+
client
39+
.send_simple(Query::new("SET idle_in_transaction_session_timeout TO 100"))
40+
.await;
41+
client.read_until('Z').await.unwrap();
42+
43+
client.send(Parse::named("__test_1", "SELECT $1")).await;
44+
client.send(Flush).await;
45+
client.try_process().await.unwrap();
46+
client.read_until('1').await.unwrap();
47+
48+
// Stuck inside extended exchange, idle in transaction timeout will not fire.
49+
sleep(Duration::from_millis(100)).await;
50+
51+
client.send(Parse::named("__test_2", "SELECT $1")).await;
52+
client.send(Flush).await;
53+
54+
client.try_process().await.unwrap();
55+
client.read_until('1').await.unwrap();
56+
57+
// Server in active state.
58+
assert_server_state(&mut test_server, &pid, "active").await;
59+
60+
client.send(Terminate).await;
61+
drop(client);
62+
63+
sleep(Duration::from_millis(50)).await;
64+
65+
// Cleanup works.
66+
assert_server_state(&mut test_server, &pid, "idle").await;
67+
}
68+
69+
async fn assert_server_state(conn: &mut Server, pid: &str, expected: &str) {
70+
let response: Vec<String> = conn
71+
.fetch_all(format!(
72+
"SELECT state::text FROM pg_stat_activity WHERE pid = {}",
73+
pid
74+
))
75+
.await
76+
.unwrap();
77+
assert_eq!(response[0], expected);
78+
}

pgdog/src/frontend/client/query_engine/test/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ mod extended;
1313
mod fatal_error;
1414
mod graceful_disconnect;
1515
mod graceful_shutdown;
16+
mod idle_in_transaction_recovery;
1617
mod lock_session;
1718
mod omni;
1819
pub mod prelude;

pgdog/src/frontend/client/test/test_client.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ pub struct TestClient {
111111
pub(crate) client: Client,
112112
pub(crate) engine: QueryEngine,
113113
pub(crate) conn: TcpStream,
114+
pub(crate) leak_pool: bool,
114115
}
115116

116117
impl TestClient {
@@ -119,13 +120,14 @@ impl TestClient {
119120
///
120121
/// Config needs to be loaded.
121122
///
122-
async fn new(params: Parameters) -> Self {
123+
pub(crate) async fn new(params: Parameters) -> Self {
123124
let (conn, client) = new_client_pair(params).await;
124125

125126
Self {
126127
conn,
127128
engine: QueryEngine::from_client(&client).expect("create query engine from client"),
128129
client,
130+
leak_pool: false,
129131
}
130132
}
131133

@@ -135,6 +137,11 @@ impl TestClient {
135137
Self::new(params).await
136138
}
137139

140+
pub(crate) fn leak_pool(mut self) -> Self {
141+
self.leak_pool = true;
142+
self
143+
}
144+
138145
/// New client with replicas but not sharded.
139146
pub(crate) async fn new_replicas(params: Parameters) -> Self {
140147
load_test_replicas();
@@ -243,7 +250,9 @@ impl TestClient {
243250

244251
impl Drop for TestClient {
245252
fn drop(&mut self) {
246-
shutdown();
253+
if !self.leak_pool {
254+
shutdown();
255+
}
247256
}
248257
}
249258

0 commit comments

Comments
 (0)