Skip to content

Commit a092d1d

Browse files
authored
Replica lag monitor (#613)
1 parent c46f1dd commit a092d1d

25 files changed

Lines changed: 555 additions & 349 deletions

File tree

integration/load_balancer/pgdog.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pooler_mode = "transaction"
1616
load_balancing_strategy = "round_robin"
1717
auth_type = "trust"
1818
read_write_split = "exclude_primary"
19+
lsn_check_delay = 5_000
1920

2021
[rewrite]
2122
enabled = false

pgdog-config/src/database.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,15 @@ pub enum Role {
128128
#[default]
129129
Primary,
130130
Replica,
131+
Auto,
131132
}
132133

133134
impl std::fmt::Display for Role {
134135
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135136
match self {
136137
Self::Primary => write!(f, "primary"),
137138
Self::Replica => write!(f, "replica"),
139+
Self::Auto => write!(f, "auto"),
138140
}
139141
}
140142
}
@@ -146,6 +148,7 @@ impl FromStr for Role {
146148
match s.to_lowercase().as_str() {
147149
"primary" => Ok(Self::Primary),
148150
"replica" => Ok(Self::Replica),
151+
"auto" => Ok(Self::Auto),
149152
_ => Err(format!("Invalid role: {}", s)),
150153
}
151154
}

pgdog-config/src/general.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,15 @@ pub struct General {
172172
/// Connection cleanup algorithm.
173173
#[serde(default = "General::connection_recovery")]
174174
pub connection_recovery: ConnectionRecovery,
175+
/// LSN check interval.
176+
#[serde(default = "General::lsn_check_interval")]
177+
pub lsn_check_interval: u64,
178+
/// LSN check timeout.
179+
#[serde(default = "General::lsn_check_timeout")]
180+
pub lsn_check_timeout: u64,
181+
/// LSN check delay.
182+
#[serde(default = "General::lsn_check_delay")]
183+
pub lsn_check_delay: u64,
175184
}
176185

177186
impl Default for General {
@@ -233,6 +242,9 @@ impl Default for General {
233242
server_lifetime: Self::server_lifetime(),
234243
stats_period: Self::stats_period(),
235244
connection_recovery: Self::connection_recovery(),
245+
lsn_check_interval: Self::lsn_check_interval(),
246+
lsn_check_timeout: Self::lsn_check_timeout(),
247+
lsn_check_delay: Self::lsn_check_delay(),
236248
}
237249
}
238250
}
@@ -398,6 +410,18 @@ impl General {
398410
Self::env_enum_or_default("PGDOG_POOLER_MODE")
399411
}
400412

413+
fn lsn_check_timeout() -> u64 {
414+
Self::env_or_default("PGDOG_LSN_CHECK_TIMEOUT", 5_000)
415+
}
416+
417+
fn lsn_check_interval() -> u64 {
418+
Self::env_or_default("PGDOG_LSN_CHECK_INTERVAL", 5_000)
419+
}
420+
421+
fn lsn_check_delay() -> u64 {
422+
Self::env_or_default("PGDOG_LSN_CHECK_DELAY", Duration::MAX.as_millis() as u64)
423+
}
424+
401425
fn read_write_strategy() -> ReadWriteStrategy {
402426
Self::env_enum_or_default("PGDOG_READ_WRITE_STRATEGY")
403427
}

pgdog/src/admin/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ pub mod show_peers;
2929
pub mod show_pools;
3030
pub mod show_prepared_statements;
3131
pub mod show_query_cache;
32+
pub mod show_replication;
3233
pub mod show_server_memory;
3334
pub mod show_servers;
3435
pub mod show_stats;

pgdog/src/admin/parser.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ use super::{
77
show_client_memory::ShowClientMemory, show_clients::ShowClients, show_config::ShowConfig,
88
show_instance_id::ShowInstanceId, show_lists::ShowLists, show_mirrors::ShowMirrors,
99
show_peers::ShowPeers, show_pools::ShowPools, show_prepared_statements::ShowPreparedStatements,
10-
show_query_cache::ShowQueryCache, show_server_memory::ShowServerMemory,
11-
show_servers::ShowServers, show_stats::ShowStats, show_transactions::ShowTransactions,
12-
show_version::ShowVersion, shutdown::Shutdown, Command, Error,
10+
show_query_cache::ShowQueryCache, show_replication::ShowReplication,
11+
show_server_memory::ShowServerMemory, show_servers::ShowServers, show_stats::ShowStats,
12+
show_transactions::ShowTransactions, show_version::ShowVersion, shutdown::Shutdown, Command,
13+
Error,
1314
};
1415

1516
use tracing::debug;
@@ -35,6 +36,7 @@ pub enum ParseResult {
3536
Shutdown(Shutdown),
3637
ShowLists(ShowLists),
3738
ShowPrepared(ShowPreparedStatements),
39+
ShowReplication(ShowReplication),
3840
ShowServerMemory(ShowServerMemory),
3941
ShowClientMemory(ShowClientMemory),
4042
Set(Set),
@@ -69,6 +71,7 @@ impl ParseResult {
6971
Shutdown(shutdown) => shutdown.execute().await,
7072
ShowLists(show_lists) => show_lists.execute().await,
7173
ShowPrepared(cmd) => cmd.execute().await,
74+
ShowReplication(show_replication) => show_replication.execute().await,
7275
ShowServerMemory(show_server_memory) => show_server_memory.execute().await,
7376
ShowClientMemory(show_client_memory) => show_client_memory.execute().await,
7477
Set(set) => set.execute().await,
@@ -103,6 +106,7 @@ impl ParseResult {
103106
Shutdown(shutdown) => shutdown.name(),
104107
ShowLists(show_lists) => show_lists.name(),
105108
ShowPrepared(show) => show.name(),
109+
ShowReplication(show_replication) => show_replication.name(),
106110
ShowServerMemory(show_server_memory) => show_server_memory.name(),
107111
ShowClientMemory(show_client_memory) => show_client_memory.name(),
108112
Set(set) => set.name(),
@@ -158,6 +162,7 @@ impl Parser {
158162
"instance_id" => ParseResult::ShowInstanceId(ShowInstanceId::parse(&sql)?),
159163
"lists" => ParseResult::ShowLists(ShowLists::parse(&sql)?),
160164
"prepared" => ParseResult::ShowPrepared(ShowPreparedStatements::parse(&sql)?),
165+
"replication" => ParseResult::ShowReplication(ShowReplication::parse(&sql)?),
161166
command => {
162167
debug!("unknown admin show command: '{}'", command);
163168
return Err(Error::Syntax);

pgdog/src/admin/show_pools.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ impl Command for ShowPools {
4242
Field::numeric("out_of_sync"),
4343
Field::numeric("force_closed"),
4444
Field::bool("online"),
45-
Field::text("replica_lag"),
4645
Field::bool("schema_admin"),
4746
]);
4847
let mut messages = vec![rd.message()?];
@@ -76,7 +75,6 @@ impl Command for ShowPools {
7675
.add(state.out_of_sync)
7776
.add(state.force_close)
7877
.add(state.online)
79-
.add(state.replica_lag.simple_display())
8078
.add(cluster.schema_admin());
8179

8280
messages.push(row.message()?);
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
use tokio::time::Instant;
2+
3+
use crate::{
4+
backend::databases::databases,
5+
net::{
6+
data_row::Data,
7+
messages::{DataRow, Field, Protocol, RowDescription},
8+
ToDataRowColumn,
9+
},
10+
};
11+
12+
use super::prelude::*;
13+
14+
pub struct ShowReplication;
15+
16+
#[async_trait]
17+
impl Command for ShowReplication {
18+
fn name(&self) -> String {
19+
"SHOW REPLICATION".into()
20+
}
21+
22+
fn parse(_sql: &str) -> Result<Self, Error> {
23+
Ok(ShowReplication {})
24+
}
25+
26+
async fn execute(&self) -> Result<Vec<Message>, Error> {
27+
let rd = RowDescription::new(&[
28+
Field::bigint("id"),
29+
Field::text("database"),
30+
Field::text("user"),
31+
Field::text("addr"),
32+
Field::numeric("port"),
33+
Field::numeric("shard"),
34+
Field::text("role"),
35+
Field::text("pg_replica_lag"),
36+
Field::text("pg_lsn"),
37+
Field::text("lsn_age"),
38+
Field::text("pg_is_in_recovery"),
39+
]);
40+
let mut messages = vec![rd.message()?];
41+
let now = Instant::now();
42+
for (user, cluster) in databases().all() {
43+
for (shard_num, shard) in cluster.shards().iter().enumerate() {
44+
for (role, _ban, pool) in shard.pools_with_roles_and_bans() {
45+
let mut row = DataRow::new();
46+
let state = pool.state();
47+
48+
let lsn_read = state.lsn_stats.lsn.lsn > 0;
49+
let lsn_age = now.duration_since(state.lsn_stats.fetched);
50+
51+
row.add(pool.id() as i64)
52+
.add(user.database.as_str())
53+
.add(user.user.as_str())
54+
.add(pool.addr().host.as_str())
55+
.add(pool.addr().port as i64)
56+
.add(shard_num as i64)
57+
.add(role.to_string())
58+
.add(if lsn_read {
59+
state
60+
.replica_lag
61+
.as_millis()
62+
.to_string()
63+
.to_data_row_column()
64+
} else {
65+
Data::null()
66+
})
67+
.add(if lsn_read {
68+
state.lsn_stats.lsn.to_string().to_data_row_column()
69+
} else {
70+
Data::null()
71+
})
72+
.add(if lsn_read {
73+
lsn_age.as_millis().to_string().to_data_row_column()
74+
} else {
75+
Data::null()
76+
})
77+
.add(if lsn_read {
78+
state.lsn_stats.replica.to_data_row_column()
79+
} else {
80+
Data::null()
81+
});
82+
83+
messages.push(row.message()?);
84+
}
85+
}
86+
}
87+
Ok(messages)
88+
}
89+
}

pgdog/src/admin/tests/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ async fn show_pools_reports_schema_admin_flag() {
118118
"out_of_sync",
119119
"force_closed",
120120
"online",
121-
"replica_lag",
122121
"schema_admin",
123122
];
124123
assert_eq!(actual_names, expected_names);

0 commit comments

Comments
 (0)