Skip to content

Commit 46e7142

Browse files
authored
Fix: cancel backend sessions when shutdown timeout expires (#524)
* Fix: cancel backend sessions when shutdown timeout expires * fix: wait on passthrough auth pgdog shutdown * feature: add shutdown termination timeout * test: add cancel-on-shutdown integration * fix: only cancel connections when shutdown termination timeout is set
1 parent 1f608d6 commit 46e7142

9 files changed

Lines changed: 210 additions & 0 deletions

File tree

example.pgdog.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,12 @@ tls_server_ca_certificate = "relative/or/absolute/path/to/certificate.pem"
127127
# Default: 60 seconds
128128
shutdown_timeout = 60_000
129129

130+
# How long to wait, in milliseconds, for active connections to be forcibly terminated
131+
# after shutdown_timeout expires.
132+
#
133+
# Default: disabled (unset). The value below is only an example.
134+
shutdown_termination_timeout = 60_000
135+
130136
# OpenMetrics server port.
131137
#
132138
# If set, enables Prometheus-style metrics exporter.
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[general]
2+
query_timeout = 60000
3+
shutdown_timeout = 0
4+
shutdown_termination_timeout = 1000
5+
6+
[[databases]]
7+
name = "pgdog"
8+
host = "127.0.0.1"
9+
10+
[admin]
11+
password = "pgdog"
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import asyncio
2+
import sys
3+
4+
import asyncpg
5+
import psycopg
6+
7+
8+
SHUTDOWN_TIMEOUT = 1
9+
SLEEP_SECONDS = 10000
10+
APPLICATION_NAME = "pgdog_cancel_query"
11+
12+
13+
async def trigger_shutdown() -> None:
    """Check that PgDog interrupts an in-flight query when it is shut down.

    Starts a long ``pg_sleep`` through PgDog, issues ``SHUTDOWN`` on the admin
    database, then waits up to ``SHUTDOWN_TIMEOUT`` seconds for the query to be
    interrupted.

    Raises:
        SystemExit: with status 1 if the query is still running after the
            timeout, or if it completes without being interrupted.
    """
    conn = await asyncpg.connect(
        host="127.0.0.1",
        port=6432,
        database="pgdog",
        user="pgdog",
        password="pgdog",
    )

    try:
        # Tag the session; the script's constants name it via APPLICATION_NAME.
        await conn.execute(f"SET application_name = '{APPLICATION_NAME}'")
        sleep_task = asyncio.create_task(conn.execute(f"SELECT pg_sleep({SLEEP_SECONDS})"))

        # Give the backend time to register the long running query.
        await asyncio.sleep(1)

        # Use psycopg's async API: a blocking connect/execute here would stall
        # the event loop, and with it the pending asyncpg query task.
        admin = await psycopg.AsyncConnection.connect(
            "dbname=admin user=admin host=127.0.0.1 port=6432 password=pgdog",
            autocommit=True,
        )
        try:
            await admin.execute("SHUTDOWN")
        finally:
            await admin.close()

        try:
            await asyncio.wait_for(sleep_task, timeout=SHUTDOWN_TIMEOUT)
        except asyncio.TimeoutError:
            print("pg_sleep query did not terminate after PgDog shutdown", file=sys.stderr)
            raise SystemExit(1)
        except (asyncpg.exceptions.PostgresError, asyncpg.exceptions.InterfaceError):
            # Expected: the query is cancelled or the connection terminates as
            # PgDog shuts down.
            return
        except (ConnectionError, psycopg.Error):
            # Also expected: the socket may be dropped abruptly, which surfaces
            # as a plain ConnectionError. The awaited task only uses asyncpg,
            # so psycopg.Error is kept purely as a defensive catch.
            return
        else:
            print("pg_sleep query completed without interruption", file=sys.stderr)
            raise SystemExit(1)
    finally:
        # Best-effort cleanup: the connection is usually already gone by now.
        try:
            await conn.close()
        except Exception:
            pass
58+
59+
if __name__ == "__main__":
60+
asyncio.run(trigger_shutdown())
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#!/bin/bash
# Integration test: after PgDog is shut down while a query is in flight,
# no session tagged with APPLICATION_NAME may remain in pg_stat_activity.
set -e
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
# Quoted so a checkout path containing whitespace does not split into words.
source "${SCRIPT_DIR}/../../common.sh"

APPLICATION_NAME="pgdog_cancel_query"
QUERY="SELECT COUNT(*) FROM pg_stat_activity WHERE application_name = '${APPLICATION_NAME}'"

export PGPASSWORD=pgdog

active_venv

run_pgdog "${SCRIPT_DIR}"
wait_for_pgdog

pushd "${SCRIPT_DIR}"
python run.py
popd

# Poll until the tagged session disappears; give up after 5 retries, 5s apart.
# NOTE(review): psql is given no port, so this presumably queries PostgreSQL
# directly rather than going through PgDog — confirm.
attempts=0
until [[ "$(psql -h localhost -U pgdog -tAq -c "${QUERY}")" == "0" ]]; do
    if [[ ${attempts} -ge 5 ]]; then
        echo "Found lingering sessions with application_name='${APPLICATION_NAME}'" >&2
        exit 1
    fi
    attempts=$((attempts + 1))
    sleep 5
done

stop_pgdog
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
[[users]]
2+
name = "pgdog"
3+
database = "pgdog"
4+
password = "pgdog"
5+
6+
[[users]]
7+
name = "pgdog"
8+
database = "pgdog_sharded"
9+
password = "pgdog"
10+
11+
[[users]]
12+
name = "pgdog_session"
13+
database = "pgdog"
14+
password = "pgdog"
15+
server_user = "pgdog"
16+
pooler_mode = "session"
17+
18+
[[users]]
19+
name = "pgdog_2pc"
20+
database = "pgdog"
21+
password = "pgdog"
22+
server_user = "pgdog"
23+
two_phase_commit = true
24+
min_pool_size = 0
25+
26+
[[users]]
27+
name = "pgdog_2pc"
28+
database = "pgdog_sharded"
29+
password = "pgdog"
30+
server_user = "pgdog"
31+
two_phase_commit = true
32+
min_pool_size = 0
33+
34+
[[users]]
35+
name = "pgdog_migrator"
36+
database = "pgdog_sharded"
37+
password = "pgdog"
38+
server_user = "pgdog"
39+
schema_admin = true
40+
41+
[[users]]
42+
name = "pgdog"
43+
database = "failover"
44+
password = "pgdog"
45+
46+
[[users]]
47+
name = "pgdog"
48+
database = "single_sharded_list"
49+
password = "pgdog"
50+
51+
[[users]]
52+
name = "pgdog_no_cross_shard"
53+
database = "single_sharded_list"
54+
password = "pgdog"
55+
server_user = "pgdog"
56+
cross_shard_disabled = true
57+
min_pool_size = 0

integration/complex/passthrough_auth/run.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ PGDOG_BIN_PATH="${PGDOG_BIN:-${SCRIPT_DIR}/../../../target/release/pgdog}"
1212
"${PGDOG_BIN_PATH}" \
1313
--config ${SCRIPT_DIR}/pgdog-enabled.toml \
1414
--users ${SCRIPT_DIR}/users.toml &
15+
PGDOG_PID=$!
1516

1617
until pg_isready -h 127.0.0.1 -p 6432 -U pgdog -d pgdog; do
1718
sleep 1
@@ -32,10 +33,12 @@ if [[ "$statement_timeout" != *"100ms"* ]]; then
3233
fi
3334

3435
killall -TERM pgdog
36+
wait "${PGDOG_PID}" 2> /dev/null || true
3537

3638
"${PGDOG_BIN_PATH}" \
3739
--config ${SCRIPT_DIR}/pgdog-disabled.toml \
3840
--users ${SCRIPT_DIR}/users.toml &
41+
PGDOG_PID=$!
3942

4043
until pg_isready -h 127.0.0.1 -p 6432 -U pgdog -d pgdog; do
4144
sleep 1
@@ -49,3 +52,4 @@ fi
4952
psql -U pgdog pgdog -c 'SELECT 1' > /dev/null
5053

5154
killall -TERM pgdog
55+
wait "${PGDOG_PID}" 2> /dev/null || true

integration/complex/run.sh

100644100755
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
55
pushd ${SCRIPT_DIR}
66
bash shutdown.sh
77
bash passthrough_auth/run.sh
8+
bash cancel_query/run.sh
89
popd

pgdog/src/config/general.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ pub struct General {
7171
/// Shutdown timeout.
7272
#[serde(default = "General::default_shutdown_timeout")]
7373
pub shutdown_timeout: u64,
74+
/// Shutdown termination timeout (after shutdown_timeout expires, forcibly terminate).
75+
#[serde(default = "General::default_shutdown_termination_timeout")]
76+
pub shutdown_termination_timeout: Option<u64>,
7477
/// Broadcast IP.
7578
pub broadcast_address: Option<Ipv4Addr>,
7679
/// Broadcast port.
@@ -179,6 +182,7 @@ impl Default for General {
179182
tls_verify: Self::default_tls_verify(),
180183
tls_server_ca_certificate: Self::tls_server_ca_certificate(),
181184
shutdown_timeout: Self::default_shutdown_timeout(),
185+
shutdown_termination_timeout: Self::default_shutdown_termination_timeout(),
182186
broadcast_address: Self::broadcast_address(),
183187
broadcast_port: Self::broadcast_port(),
184188
query_log: Self::query_log(),
@@ -353,6 +357,10 @@ impl General {
353357
Self::env_or_default("PGDOG_SHUTDOWN_TIMEOUT", 60_000)
354358
}
355359

360+
/// Default for `shutdown_termination_timeout`, looked up through the
/// `PGDOG_SHUTDOWN_TERMINATION_TIMEOUT` environment variable.
///
/// The env tests in this module expect `Some(15_000)` when the variable is
/// set to `"15000"` and `None` when it is unset.
fn default_shutdown_termination_timeout() -> Option<u64> {
361+
Self::env_option("PGDOG_SHUTDOWN_TERMINATION_TIMEOUT")
362+
}
363+
356364
fn default_connect_timeout() -> u64 {
357365
Self::env_or_default("PGDOG_CONNECT_TIMEOUT", 5_000)
358366
}
@@ -497,6 +505,10 @@ impl General {
497505
Duration::from_millis(self.shutdown_timeout)
498506
}
499507

508+
/// Shutdown termination timeout as a [`Duration`].
///
/// The configured value is interpreted as milliseconds. Returns `None` when
/// `shutdown_termination_timeout` is not set.
pub fn shutdown_termination_timeout(&self) -> Option<Duration> {
509+
self.shutdown_termination_timeout.map(Duration::from_millis)
510+
}
511+
500512
/// Get TLS config, if any.
501513
pub fn tls(&self) -> Option<(&PathBuf, &PathBuf)> {
502514
if let Some(cert) = &self.tls_certificate {
@@ -665,6 +677,7 @@ mod tests {
665677
env::set_var("PGDOG_BAN_TIMEOUT", "600000");
666678
env::set_var("PGDOG_ROLLBACK_TIMEOUT", "10000");
667679
env::set_var("PGDOG_SHUTDOWN_TIMEOUT", "120000");
680+
env::set_var("PGDOG_SHUTDOWN_TERMINATION_TIMEOUT", "15000");
668681
env::set_var("PGDOG_CONNECT_ATTEMPT_DELAY", "1000");
669682
env::set_var("PGDOG_QUERY_TIMEOUT", "30000");
670683
env::set_var("PGDOG_CLIENT_IDLE_TIMEOUT", "3600000");
@@ -674,6 +687,10 @@ mod tests {
674687
assert_eq!(General::ban_timeout(), 600000);
675688
assert_eq!(General::rollback_timeout(), 10000);
676689
assert_eq!(General::default_shutdown_timeout(), 120000);
690+
assert_eq!(
691+
General::default_shutdown_termination_timeout(),
692+
Some(15_000)
693+
);
677694
assert_eq!(General::default_connect_attempt_delay(), 1000);
678695
assert_eq!(General::default_query_timeout(), 30000);
679696
assert_eq!(General::default_client_idle_timeout(), 3600000);
@@ -683,6 +700,7 @@ mod tests {
683700
env::remove_var("PGDOG_BAN_TIMEOUT");
684701
env::remove_var("PGDOG_ROLLBACK_TIMEOUT");
685702
env::remove_var("PGDOG_SHUTDOWN_TIMEOUT");
703+
env::remove_var("PGDOG_SHUTDOWN_TERMINATION_TIMEOUT");
686704
env::remove_var("PGDOG_CONNECT_ATTEMPT_DELAY");
687705
env::remove_var("PGDOG_QUERY_TIMEOUT");
688706
env::remove_var("PGDOG_CLIENT_IDLE_TIMEOUT");
@@ -692,6 +710,7 @@ mod tests {
692710
assert_eq!(General::ban_timeout(), 300000);
693711
assert_eq!(General::rollback_timeout(), 5000);
694712
assert_eq!(General::default_shutdown_timeout(), 60000);
713+
assert_eq!(General::default_shutdown_termination_timeout(), None);
695714
assert_eq!(General::default_connect_attempt_delay(), 0);
696715
}
697716

pgdog/src/frontend/listener.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,28 @@ impl Listener {
132132
"terminating {} client connections due to shutdown timeout",
133133
comms.tracker().len()
134134
);
135+
136+
// If a shutdown termination timeout is configured, enforce it here.
137+
// This will ensure that we don't wait indefinitely for databases to respond.
138+
if let Some(termination_timeout) =
139+
config().config.general.shutdown_termination_timeout()
140+
{
141+
// Shutdown timeout elapsed; cancel any still-running queries before tearing pools down.
142+
let cancel_futures = comms.clients().into_keys().map(|id| async move {
143+
if let Err(err) = databases().cancel(&id).await {
144+
error!(?id, "cancel request failed during shutdown: {err}");
145+
}
146+
});
147+
let cancel_all = futures::future::join_all(cancel_futures);
148+
149+
if timeout(termination_timeout, cancel_all).await.is_err() {
150+
error!(
151+
"forced shutdown: abandoning {} outstanding cancel requests after waiting {:.3}s" ,
152+
comms.clients().len(),
153+
termination_timeout.as_secs_f64()
154+
);
155+
}
156+
}
135157
}
136158

137159
self.shutdown.notify_waiters();

0 commit comments

Comments
 (0)