Skip to content

Commit 919e5ee

Browse files
authored
State::IdleInTransaction isn't updated until the end of a request (#617)
* Draft fix for idle_in_transaction state issues * clean up ruby helper, slim transaction time in transaction state test * Try reusing update_stats instead of just manually slamming the stats
1 parent bd7f3e2 commit 919e5ee

4 files changed

Lines changed: 141 additions & 0 deletions

File tree

integration/ruby/rspec_helper.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@ def ensure_done
3131
expect(pool['cl_waiting']).to eq('0')
3232
expect(pool['out_of_sync']).to eq('0')
3333
end
34+
current_client_id = conn.backend_pid
3435
clients = conn.exec 'SHOW CLIENTS'
3536
clients.each do |client|
37+
next if client['id'].to_i == current_client_id
3638
expect(client['state']).to eq('idle')
3739
end
3840
servers = conn.exec 'SHOW SERVERS'

integration/rust/tests/integration/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@ pub mod syntax_error;
1919
pub mod timestamp_sorting;
2020
pub mod tls_enforced;
2121
pub mod tls_reload;
22+
pub mod transaction_state;
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
use rust::setup::admin_sqlx;
2+
use serial_test::serial;
3+
use sqlx::{Executor, Pool, Postgres, Row};
4+
use tokio::time::{Duration, Instant, sleep};
5+
6+
const APP_NAME: &str = "test_transaction_state_flow";
7+
8+
#[tokio::test]
9+
#[serial]
10+
async fn test_transaction_state_transitions() {
11+
let admin = admin_sqlx().await;
12+
assert!(fetch_client_state(&admin, APP_NAME).await.is_none());
13+
14+
let (client, connection) = tokio_postgres::connect(
15+
"host=127.0.0.1 user=pgdog dbname=pgdog password=pgdog port=6432",
16+
tokio_postgres::NoTls,
17+
)
18+
.await
19+
.unwrap();
20+
21+
let connection_handle = tokio::spawn(async move {
22+
if let Err(e) = connection.await {
23+
panic!("connection error: {}", e);
24+
}
25+
});
26+
27+
client
28+
.batch_execute(&format!("SET application_name TO '{}';", APP_NAME))
29+
.await
30+
.unwrap();
31+
client
32+
.batch_execute("SET statement_timeout TO '10s';")
33+
.await
34+
.unwrap();
35+
client.batch_execute("SELECT 1;").await.unwrap();
36+
37+
wait_for_client_state(&admin, APP_NAME, "idle").await;
38+
39+
client.batch_execute("BEGIN;").await.unwrap();
40+
wait_for_client_state(&admin, APP_NAME, "idle in transaction").await;
41+
42+
{
43+
let query = client.simple_query("SELECT pg_sleep(0.25);");
44+
tokio::pin!(query);
45+
46+
let deadline = Instant::now() + Duration::from_secs(5);
47+
let mut saw_active = false;
48+
loop {
49+
tokio::select! {
50+
result = &mut query => {
51+
result.unwrap();
52+
break;
53+
}
54+
_ = sleep(Duration::from_millis(10)) => {
55+
if let Some(state) = fetch_client_state(&admin, APP_NAME).await {
56+
if state == "active" {
57+
saw_active = true;
58+
}
59+
}
60+
61+
if Instant::now() >= deadline {
62+
panic!("timed out waiting for client to become active");
63+
}
64+
}
65+
}
66+
}
67+
68+
assert!(
69+
saw_active,
70+
"client never reported active state during query"
71+
);
72+
}
73+
74+
wait_for_client_state(&admin, APP_NAME, "idle in transaction").await;
75+
76+
client.batch_execute("COMMIT;").await.unwrap();
77+
wait_for_client_state(&admin, APP_NAME, "idle").await;
78+
79+
drop(client);
80+
81+
wait_for_no_client(&admin, APP_NAME).await;
82+
83+
admin.close().await;
84+
connection_handle.await.unwrap();
85+
}
86+
87+
async fn fetch_client_state(admin: &Pool<Postgres>, application_name: &str) -> Option<String> {
88+
let rows = admin.fetch_all("SHOW CLIENTS").await.unwrap();
89+
for row in rows {
90+
let db: String = row.get::<String, _>("database");
91+
let app: String = row.get::<String, _>("application_name");
92+
if db == "pgdog" && app == application_name {
93+
return Some(row.get::<String, _>("state"));
94+
}
95+
}
96+
None
97+
}
98+
99+
async fn wait_for_client_state(admin: &Pool<Postgres>, application_name: &str, expected: &str) {
100+
let deadline = Instant::now() + Duration::from_secs(5);
101+
loop {
102+
if let Some(state) = fetch_client_state(admin, application_name).await {
103+
if state == expected {
104+
return;
105+
}
106+
}
107+
108+
if Instant::now() >= deadline {
109+
panic!(
110+
"timed out waiting for client state '{}' (expected '{}')",
111+
fetch_client_state(admin, application_name)
112+
.await
113+
.unwrap_or_else(|| "<none>".to_string()),
114+
expected
115+
);
116+
}
117+
118+
sleep(Duration::from_millis(25)).await;
119+
}
120+
}
121+
122+
async fn wait_for_no_client(admin: &Pool<Postgres>, application_name: &str) {
123+
let deadline = Instant::now() + Duration::from_secs(5);
124+
loop {
125+
if fetch_client_state(admin, application_name).await.is_none() {
126+
return;
127+
}
128+
129+
if Instant::now() >= deadline {
130+
panic!("client '{}' still present", application_name);
131+
}
132+
133+
sleep(Duration::from_millis(25)).await;
134+
}
135+
}

pgdog/src/frontend/client/query_engine/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@ impl QueryEngine {
109109

110110
/// Handle client request.
111111
pub async fn handle(&mut self, context: &mut QueryEngineContext<'_>) -> Result<(), Error> {
112+
// ensure that when we are handling a client request, it shows as active
113+
self.update_stats(context);
114+
112115
self.stats
113116
.received(context.client_request.total_message_len());
114117

0 commit comments

Comments
 (0)