Skip to content

Commit bad1471

Browse files
authored
feat: DDL/DML-only mirroring (#855)
Support mirroring DML or DDL _only_. DDL-only mirroring is helpful when maintaining long-running logical replicas. DML-only mirroring is helpful to simulate logical replication, without the replication aspect. ```toml [[mirroring]] source = "prod" destination = "mirror" level = "ddl" ``` This is helpful when resharding, for example, to make sure logical replication doesn't break. #807
1 parent ad39495 commit bad1471

12 files changed

Lines changed: 411 additions & 30 deletions

File tree

.schema/pgdog.schema.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1173,6 +1173,11 @@
11731173
],
11741174
"format": "float"
11751175
},
1176+
"level": {
1177+
"description": "What kind of statements to replicate.",
1178+
"$ref": "#/$defs/MirroringLevel",
1179+
"default": "all"
1180+
},
11761181
"queue_length": {
11771182
"description": "The length of the queue to provision for mirrored transactions. See [mirroring](https://docs.pgdog.dev/features/mirroring/) for more details. This overrides the [`mirror_queue`](https://docs.pgdog.dev/configuration/pgdog.toml/general/#mirror_queue) setting.\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/mirroring/#queue_depth",
11781183
"type": [
@@ -1193,6 +1198,25 @@
11931198
"destination_db"
11941199
]
11951200
},
1201+
"MirroringLevel": {
1202+
"oneOf": [
1203+
{
1204+
"description": "Replicate all statements.",
1205+
"type": "string",
1206+
"const": "all"
1207+
},
1208+
{
1209+
"description": "Only DML (e.g., insert, update, delete, etc),",
1210+
"type": "string",
1211+
"const": "dml"
1212+
},
1213+
{
1214+
"description": "Only DDL (CREATE, DROP, etc.)",
1215+
"type": "string",
1216+
"const": "ddl"
1217+
}
1218+
]
1219+
},
11961220
"MultiTenant": {
11971221
"description": "multi-tenant routing configuration, mapping queries to shards via a tenant identifier column.",
11981222
"type": "object",

integration/mirror/dev.sh

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,3 @@ mkdir -p ${GEM_HOME}
99
bundle install
1010
bundle exec rspec *_spec.rb
1111
popd
12-
13-
pushd ${SCRIPT_DIR}/php
14-
bash run.sh
15-
popd

integration/mirror/pgdog.toml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[general]
22
mirror_exposure = 1.0
33
openmetrics_port = 9090
4+
query_parser = "on"
45

56
[rewrite]
67
enabled = false
@@ -16,9 +17,20 @@ name = "pgdog_mirror"
1617
host = "127.0.0.1"
1718
database_name = "pgdog1"
1819

20+
[[databases]]
21+
name = "pgdog_mirror2"
22+
host = "127.0.0.1"
23+
database_name = "pgdog2"
24+
1925
[[mirroring]]
2026
source_db = "pgdog"
2127
destination_db = "pgdog_mirror"
2228

29+
30+
[[mirroring]]
31+
source_db = "pgdog"
32+
destination_db = "pgdog_mirror2"
33+
level = "ddl"
34+
2335
[admin]
2436
password = "pgdog"

integration/mirror/ruby/copy_spec.rb

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,80 @@
8080
conn.exec 'DROP TABLE IF EXISTS mirror_copy_test'
8181
end
8282
end
83+
84+
describe 'ddl-only mirror' do
85+
conn = PG.connect('postgres://pgdog:pgdog@127.0.0.1:6432/pgdog')
86+
ddl_mirror = PG.connect('postgres://pgdog:pgdog@127.0.0.1:6432/pgdog_mirror2')
87+
88+
before do
89+
conn.exec 'DROP TABLE IF EXISTS ddl_mirror_test'
90+
ddl_mirror.exec 'DROP TABLE IF EXISTS ddl_mirror_test'
91+
end
92+
93+
it 'replicates DDL to ddl-only mirror' do
94+
conn.exec 'CREATE TABLE ddl_mirror_test (id BIGINT PRIMARY KEY, value VARCHAR)'
95+
sleep(0.5)
96+
97+
# DDL should be mirrored
98+
result = ddl_mirror.exec "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'ddl_mirror_test')"
99+
expect(result[0]['exists']).to eq('t')
100+
end
101+
102+
it 'does not replicate DML to ddl-only mirror' do
103+
conn.exec 'CREATE TABLE ddl_mirror_test (id BIGINT PRIMARY KEY, value VARCHAR)'
104+
sleep(0.5)
105+
106+
conn.exec "INSERT INTO ddl_mirror_test VALUES (1, 'should not mirror')"
107+
sleep(0.5)
108+
109+
# Table should exist on ddl mirror (DDL was mirrored)
110+
result = ddl_mirror.exec 'SELECT count(*) FROM ddl_mirror_test'
111+
# But no rows (DML was not mirrored)
112+
expect(result[0]['count'].to_i).to eq(0)
113+
end
114+
115+
it 'does not replicate UPDATE to ddl-only mirror' do
116+
conn.exec 'CREATE TABLE ddl_mirror_test (id BIGINT PRIMARY KEY, value VARCHAR)'
117+
sleep(0.5)
118+
119+
# Insert directly into ddl mirror so we can check UPDATE doesn't propagate
120+
ddl_mirror.exec "INSERT INTO ddl_mirror_test VALUES (1, 'original')"
121+
122+
conn.exec "INSERT INTO ddl_mirror_test VALUES (1, 'original')"
123+
conn.exec "UPDATE ddl_mirror_test SET value = 'updated' WHERE id = 1"
124+
sleep(0.5)
125+
126+
result = ddl_mirror.exec 'SELECT value FROM ddl_mirror_test WHERE id = 1'
127+
expect(result[0]['value']).to eq('original')
128+
end
129+
130+
it 'replicates ALTER TABLE to ddl-only mirror' do
131+
conn.exec 'CREATE TABLE ddl_mirror_test (id BIGINT PRIMARY KEY, value VARCHAR)'
132+
sleep(0.5)
133+
134+
conn.exec 'ALTER TABLE ddl_mirror_test ADD COLUMN extra TEXT'
135+
sleep(0.5)
136+
137+
result = ddl_mirror.exec "SELECT column_name FROM information_schema.columns WHERE table_name = 'ddl_mirror_test' AND column_name = 'extra'"
138+
expect(result.ntuples).to eq(1)
139+
end
140+
141+
it 'replicates DROP TABLE to ddl-only mirror' do
142+
conn.exec 'CREATE TABLE ddl_mirror_test (id BIGINT PRIMARY KEY, value VARCHAR)'
143+
sleep(0.5)
144+
145+
result = ddl_mirror.exec "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'ddl_mirror_test')"
146+
expect(result[0]['exists']).to eq('t')
147+
148+
conn.exec 'DROP TABLE ddl_mirror_test'
149+
sleep(0.5)
150+
151+
result = ddl_mirror.exec "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'ddl_mirror_test')"
152+
expect(result[0]['exists']).to eq('f')
153+
end
154+
155+
after do
156+
conn.exec 'DROP TABLE IF EXISTS ddl_mirror_test'
157+
ddl_mirror.exec 'DROP TABLE IF EXISTS ddl_mirror_test'
158+
end
159+
end

integration/mirror/run.sh

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,10 @@ wait_for_pgdog
88

99
bash ${SCRIPT_DIR}/dev.sh
1010

11+
12+
pushd ${SCRIPT_DIR}/php
13+
bash run.sh
14+
popd
15+
16+
1117
stop_pgdog

integration/mirror/users.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,8 @@ database = "pgdog"
77
name = "pgdog"
88
password = "pgdog"
99
database = "pgdog_mirror"
10+
11+
[[users]]
12+
name = "pgdog"
13+
password = "pgdog"
14+
database = "pgdog_mirror2"

pgdog-config/src/core.rs

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use super::error::Error;
1818
use super::general::General;
1919
use super::networking::{MultiTenant, Tcp, TlsVerifyMode};
2020
use super::pooling::PoolerMode;
21-
use super::replication::{MirrorConfig, Mirroring, ReplicaLag, Replication};
21+
use super::replication::{MirrorConfig, Mirroring, MirroringLevel, ReplicaLag, Replication};
2222
use super::rewrite::Rewrite;
2323
use super::sharding::{ManualQuery, OmnishardedTables, ShardedMapping, ShardedTable};
2424
use super::users::{Admin, Plugin, ServerAuth, Users};
@@ -94,15 +94,13 @@ impl ConfigAndUsers {
9494
warn!("admin password has been randomly generated");
9595
}
9696

97-
let mut config_and_users = ConfigAndUsers {
97+
let config_and_users = ConfigAndUsers {
9898
config,
9999
users,
100100
config_path: config_path.to_owned(),
101101
users_path: users_path.to_owned(),
102102
};
103103

104-
config_and_users.check()?;
105-
106104
Ok(config_and_users)
107105
}
108106

@@ -424,6 +422,7 @@ impl Config {
424422
role: Role,
425423
role_warned: bool,
426424
parser_warned: bool,
425+
mirror_parser_warned: bool,
427426
have_replicas: bool,
428427
sharded: bool,
429428
}
@@ -471,6 +470,7 @@ impl Config {
471470
role: database.role,
472471
role_warned: false,
473472
parser_warned: false,
473+
mirror_parser_warned: false,
474474
have_replicas: database.role == Role::Replica,
475475
sharded: database.shard > 0,
476476
},
@@ -517,7 +517,30 @@ impl Config {
517517
}
518518
}
519519

520-
for (database, check) in checks {
520+
for mirror in &self.mirroring {
521+
if mirror.level == MirroringLevel::All {
522+
continue;
523+
}
524+
if let Some(check) = checks.get_mut(&mirror.source_db) {
525+
if check.mirror_parser_warned {
526+
continue;
527+
}
528+
let parser_enabled = match self.general.query_parser {
529+
QueryParserLevel::On => true,
530+
QueryParserLevel::Off => false,
531+
QueryParserLevel::Auto => check.have_replicas || check.sharded,
532+
};
533+
if !parser_enabled {
534+
check.mirror_parser_warned = true;
535+
warn!(
536+
r#"mirroring from "{}" with level "{}" requires the query parser to classify statements, but it won't be enabled, set query_parser = "on""#,
537+
mirror.source_db, mirror.level
538+
);
539+
}
540+
}
541+
}
542+
543+
for (database, check) in &checks {
521544
if !check.have_replicas
522545
&& self.general.read_write_split == ReadWriteSplit::ExcludePrimary
523546
{
@@ -560,6 +583,7 @@ impl Config {
560583
.map(|m| MirrorConfig {
561584
queue_length: m.queue_length.unwrap_or(self.general.mirror_queue),
562585
exposure: m.exposure.unwrap_or(self.general.mirror_exposure),
586+
level: m.level,
563587
})
564588
}
565589

@@ -571,6 +595,7 @@ impl Config {
571595
let config = MirrorConfig {
572596
queue_length: mirror.queue_length.unwrap_or(self.general.mirror_queue),
573597
exposure: mirror.exposure.unwrap_or(self.general.mirror_exposure),
598+
level: mirror.level,
574599
};
575600

576601
result

pgdog-config/src/replication.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,45 @@ pub struct Mirroring {
156156
///
157157
/// https://docs.pgdog.dev/configuration/pgdog.toml/mirroring/#exposure
158158
pub exposure: Option<f32>,
159+
160+
/// What kind of statements to replicate.
161+
#[serde(default)]
162+
pub level: MirroringLevel,
163+
}
164+
165+
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, JsonSchema, Copy)]
166+
#[serde(deny_unknown_fields, rename_all = "lowercase")]
167+
pub enum MirroringLevel {
168+
/// Replicate all statements.
169+
#[default]
170+
All,
171+
/// Only DML (e.g., insert, update, delete, etc),
172+
Dml,
173+
/// Only DDL (CREATE, DROP, etc.)
174+
Ddl,
175+
}
176+
177+
impl std::fmt::Display for MirroringLevel {
178+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179+
match self {
180+
Self::All => write!(f, "all"),
181+
Self::Dml => write!(f, "dml"),
182+
Self::Ddl => write!(f, "ddl"),
183+
}
184+
}
185+
}
186+
187+
impl FromStr for MirroringLevel {
188+
type Err = ();
189+
190+
fn from_str(value: &str) -> Result<Self, Self::Err> {
191+
match value {
192+
"all" => Ok(Self::All),
193+
"dml" => Ok(Self::Dml),
194+
"ddl" => Ok(Self::Ddl),
195+
_ => Err(()),
196+
}
197+
}
159198
}
160199

161200
impl FromStr for Mirroring {
@@ -166,6 +205,7 @@ impl FromStr for Mirroring {
166205
let mut destination_db = None;
167206
let mut queue_length = None;
168207
let mut exposure = None;
208+
let mut level = MirroringLevel::default();
169209

170210
for pair in s.split('&') {
171211
let parts: Vec<&str> = pair.split('=').collect();
@@ -190,6 +230,7 @@ impl FromStr for Mirroring {
190230
.map_err(|_| format!("Invalid exposure: {}", parts[1]))?,
191231
);
192232
}
233+
"level" => level = MirroringLevel::from_str(parts[1]).unwrap_or_default(),
193234
_ => return Err(format!("Unknown parameter: {}", parts[0])),
194235
}
195236
}
@@ -202,15 +243,18 @@ impl FromStr for Mirroring {
202243
destination_db,
203244
queue_length,
204245
exposure,
246+
level,
205247
})
206248
}
207249
}
208250

209251
/// Runtime mirror configuration with defaults resolved from global settings.
210-
#[derive(Debug, Clone)]
252+
#[derive(Debug, Clone, Default)]
211253
pub struct MirrorConfig {
212254
/// Effective queue length for this mirror.
213255
pub queue_length: usize,
214256
/// Effective exposure fraction for this mirror.
215257
pub exposure: f32,
258+
/// What kind of statements to mirror.
259+
pub level: MirroringLevel,
216260
}

0 commit comments

Comments
 (0)