Skip to content

Commit 004f03b

Browse files
authored
fix: table LSN not preserved during copy -> replication handover (#874)
- fix: regression introduced in #869 erasing table LSNs during the copy-to-replication handover, causing rows written between slot creation and copy start to be replayed on the destination - fix: off-by-one in the replication-to-copy LSN check - fix: advancing the LSN prior to transaction commit, potentially skipping rows in the same transaction
1 parent 70d4839 commit 004f03b

5 files changed

Lines changed: 311 additions & 97 deletions

File tree

integration/copy_data/dev.sh

Lines changed: 56 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,36 @@ export PGPORT=5432
1111
export PGPASSWORD=pgdog
1212

1313
cleanup() {
14+
if [ -n "${BENCH_PID}" ]; then
15+
kill ${BENCH_PID} 2>/dev/null || true
16+
wait ${BENCH_PID} 2>/dev/null || true
17+
fi
1418
if [ -n "${REPL_PID}" ]; then
1519
kill ${REPL_PID} 2>/dev/null || true
1620
wait ${REPL_PID} 2>/dev/null || true
1721
fi
1822
}
1923
trap cleanup EXIT
2024

25+
start_pgbench() {
26+
(
27+
28+
pgbench -h 127.0.0.1 -p 5432 -U pgdog pgdog \
29+
-t 100000000 -c 3 --protocol extended \
30+
-f "${SCRIPT_DIR}/pgbench.sql" -P 1
31+
32+
) &
33+
BENCH_PID=$!
34+
}
35+
36+
stop_pgbench() {
37+
if [ -n "${BENCH_PID}" ]; then
38+
kill ${BENCH_PID} 2>/dev/null || true
39+
wait ${BENCH_PID} 2>/dev/null || true
40+
BENCH_PID=""
41+
fi
42+
}
43+
2144
pushd ${SCRIPT_DIR}
2245

2346
psql -f init.sql
@@ -26,42 +49,46 @@ psql -f init.sql
2649
# 0 -> 2
2750
#
2851
${PGDOG_BIN} schema-sync --from-database source --to-database destination --publication pgdog
29-
${PGDOG_BIN} data-sync --sync-only --from-database source --to-database destination --publication pgdog --replication-slot copy_data
30-
${PGDOG_BIN} schema-sync --from-database source --to-database destination --publication pgdog --cutover
52+
start_pgbench
53+
${PGDOG_BIN} data-sync --from-database source --to-database destination --publication pgdog &
54+
REPL_PID=$!
3155

32-
# Check row counts: source (pgdog) vs destination (pgdog1 + pgdog2)
33-
echo "Checking row counts: source -> destination..."
34-
SHARDED_TABLES="copy_data.users copy_data.orders copy_data.order_items copy_data.log_actions copy_data.with_identity"
35-
OMNI_TABLES="copy_data.countries copy_data.currencies copy_data.categories"
56+
# Give replication a moment to connect.
57+
sleep 2
3658

37-
for TABLE in ${SHARDED_TABLES}; do
38-
SRC=$(psql -d pgdog -tAc "SELECT COUNT(*) FROM ${TABLE}")
39-
DST1=$(psql -d pgdog1 -tAc "SELECT COUNT(*) FROM ${TABLE}")
40-
DST2=$(psql -d pgdog2 -tAc "SELECT COUNT(*) FROM ${TABLE}")
41-
DST=$((DST1 + DST2))
42-
if [ "${SRC}" -ne "${DST}" ]; then
43-
echo "MISMATCH ${TABLE}: source=${SRC} destination=${DST} (shard0=${DST1} shard1=${DST2})"
44-
exit 1
45-
fi
46-
echo "OK ${TABLE}: ${SRC} rows"
47-
done
59+
# Check that the replication process is still alive.
60+
if ! kill -0 ${REPL_PID} 2>/dev/null; then
61+
echo "ERROR: replication process exited early"
62+
wait ${REPL_PID}
63+
exit $?
64+
fi
4865

49-
for TABLE in ${OMNI_TABLES}; do
50-
SRC=$(psql -d pgdog -tAc "SELECT COUNT(*) FROM ${TABLE}")
51-
DST1=$(psql -d pgdog1 -tAc "SELECT COUNT(*) FROM ${TABLE}")
52-
DST2=$(psql -d pgdog2 -tAc "SELECT COUNT(*) FROM ${TABLE}")
53-
if [ "${SRC}" -ne "${DST1}" ] || [ "${SRC}" -ne "${DST2}" ]; then
54-
echo "MISMATCH ${TABLE}: source=${SRC} shard0=${DST1} shard1=${DST2} (expected ${SRC} on each shard)"
55-
exit 1
56-
fi
57-
echo "OK ${TABLE}: ${SRC} rows on each shard"
58-
done
66+
# Let data sync and replication catch up
67+
echo "Letting replication run for 20 seconds..."
68+
sleep 20
69+
70+
# Stop replication and capture its exit code.
71+
kill ${REPL_PID} 2>/dev/null || true
72+
set +e
73+
wait ${REPL_PID}
74+
REPL_EXIT=$?
75+
set -e
76+
REPL_PID=""
77+
78+
# 0, 130 (SIGINT), 143 (SIGTERM) are all normal shutdown codes.
79+
if [ ${REPL_EXIT} -ne 0 ] && [ ${REPL_EXIT} -ne 130 ] && [ ${REPL_EXIT} -ne 143 ]; then
80+
echo "ERROR: replication process exited with code ${REPL_EXIT}"
81+
exit ${REPL_EXIT}
82+
fi
83+
84+
stop_pgbench
85+
${PGDOG_BIN} schema-sync --from-database source --to-database destination --publication pgdog --cutover
5986

6087
#
61-
# 2 -> 2
88+
# 2 --> 2
6289
#
6390
${PGDOG_BIN} schema-sync --from-database destination --to-database destination2 --publication pgdog
64-
${PGDOG_BIN} data-sync --sync-only --from-database destination --to-database destination2 --publication pgdog --replication-slot copy_data
91+
${PGDOG_BIN} data-sync --sync-only --from-database destination --to-database destination2 --publication pgdog --replication-slot copy_data_2
6592
${PGDOG_BIN} schema-sync --from-database destination --to-database destination2 --publication pgdog --cutover
6693

6794
# Check row counts: destination (pgdog1 + pgdog2) vs destination2 (shard_0 + shard_1)
@@ -91,43 +118,6 @@ for TABLE in ${OMNI_TABLES}; do
91118
echo "OK ${TABLE}: ${SRC} rows on each shard"
92119
done
93120

94-
# Start replication in the background.
95-
${PGDOG_BIN} data-sync --replicate-only --from-database source --to-database destination --publication pgdog &
96-
REPL_PID=$!
97-
98-
# Give replication a moment to connect.
99-
sleep 2
100-
101-
# Check that the replication process is still alive.
102-
if ! kill -0 ${REPL_PID} 2>/dev/null; then
103-
echo "ERROR: replication process exited early"
104-
wait ${REPL_PID}
105-
exit $?
106-
fi
107-
108-
# Run pgbench against the source database — writes land on the source and
109-
# get replicated to the destination shards via logical replication.
110-
pgbench -h 127.0.0.1 -p 5432 -U pgdog pgdog \
111-
-t 1000 -c 3 --protocol extended \
112-
-f "${SCRIPT_DIR}/pgbench.sql" -P 1
113-
114-
# Let replication catch up.
115-
sleep 3
116-
117-
# Stop replication and capture its exit code.
118-
kill ${REPL_PID} 2>/dev/null || true
119-
set +e
120-
wait ${REPL_PID}
121-
REPL_EXIT=$?
122-
set -e
123-
REPL_PID=""
124-
125-
# 0, 130 (SIGINT), 143 (SIGTERM) are all normal shutdown codes.
126-
if [ ${REPL_EXIT} -ne 0 ] && [ ${REPL_EXIT} -ne 130 ] && [ ${REPL_EXIT} -ne 143 ]; then
127-
echo "ERROR: replication process exited with code ${REPL_EXIT}"
128-
exit ${REPL_EXIT}
129-
fi
130-
131121
psql -f init.sql
132122

133123
popd

integration/copy_data/pgbench.sql

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,19 @@
11
\set tenant_id random(1, 20)
2-
\set user_id (1021 * random(10000, 100000))
3-
\set order_id random(100001, 999999999)
4-
\set log_id random(100001, 999999999)
52
\set order_amount random(1, 50000) / 100.0
63

7-
-- Upsert a user for this tenant.
4+
-- Upsert a persistent user for this transaction. txid_current() is stable
5+
-- within the transaction and unique enough across the benchmark run.
86
INSERT INTO copy_data.users (id, tenant_id, email, settings)
9-
VALUES (:user_id, :tenant_id, 'bench_' || :user_id || '@example.com', '{"theme":"dark"}')
7+
VALUES (txid_current()::bigint, :tenant_id, 'bench_' || txid_current() || '@example.com', '{"theme":"dark"}')
108
ON CONFLICT (id, tenant_id) DO UPDATE SET settings = EXCLUDED.settings;
119

1210
-- Read the user back.
1311
SELECT id, tenant_id, email, created_at FROM copy_data.users
14-
WHERE id = :user_id AND tenant_id = :tenant_id;
12+
WHERE id = txid_current()::bigint AND tenant_id = :tenant_id;
1513

16-
-- Insert an order with an explicit large id.
14+
-- Insert a persistent order with a transaction-unique id.
1715
INSERT INTO copy_data.orders (id, user_id, tenant_id, amount)
18-
VALUES (:order_id, :user_id, :tenant_id, :order_amount)
16+
VALUES (txid_current()::bigint, txid_current()::bigint, :tenant_id, :order_amount)
1917
ON CONFLICT (id) DO NOTHING;
2018

2119
-- Read recent orders for this tenant.
@@ -24,11 +22,11 @@ WHERE tenant_id = :tenant_id ORDER BY created_at DESC LIMIT 5;
2422

2523
-- Update the user settings.
2624
UPDATE copy_data.users SET settings = jsonb_set(settings, '{last_bench}', to_jsonb(now()::text))
27-
WHERE id = :user_id AND tenant_id = :tenant_id;
25+
WHERE id = txid_current()::bigint AND tenant_id = :tenant_id;
2826

29-
-- Log an action with an explicit large id.
27+
-- Insert a persistent log row with a transaction-unique id.
3028
INSERT INTO copy_data.log_actions (id, tenant_id, action)
31-
VALUES (:log_id, :tenant_id, 'bench')
29+
VALUES (txid_current()::bigint, :tenant_id, 'bench')
3230
ON CONFLICT (id) DO NOTHING;
3331

3432
\set country_id random(1, 10)
@@ -56,15 +54,3 @@ ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name;
5654

5755
-- Read a category.
5856
SELECT id, name, parent_id FROM copy_data.categories WHERE id = :category_id;
59-
60-
-- Clean up everything we created.
61-
DELETE FROM copy_data.orders WHERE id = :order_id;
62-
63-
DELETE FROM copy_data.log_actions WHERE id = :log_id;
64-
65-
DELETE FROM copy_data.users
66-
WHERE id = :user_id AND tenant_id = :tenant_id AND email = 'bench_' || :user_id || '@example.com';
67-
68-
DELETE FROM copy_data.countries WHERE id = :country_id AND code = 'X' || :country_id;
69-
70-
DELETE FROM copy_data.categories WHERE id = :category_id AND name = 'Bench Category ' || :category_id;

pgdog/src/backend/replication/logical/publisher/publisher_impl.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,21 @@ use crate::backend::{pool::Request, Cluster};
2323
use crate::config::Role;
2424
use crate::net::replication::ReplicationMeta;
2525

26+
fn merge_table_lsns(
27+
tables: Vec<Table>,
28+
existing_lsns: Option<&HashMap<(String, String), Lsn>>,
29+
) -> Vec<Table> {
30+
tables
31+
.into_iter()
32+
.map(|mut table| {
33+
if let Some(lsn) = existing_lsns.and_then(|tables| tables.get(&table.key())) {
34+
table.lsn = *lsn;
35+
}
36+
table
37+
})
38+
.collect()
39+
}
40+
2641
#[derive(Debug, Default)]
2742
pub struct Publisher {
2843
/// Destination cluster.
@@ -72,6 +87,19 @@ impl Publisher {
7287
/// Synchronize tables for all shards.
7388
pub async fn sync_tables(&mut self, data_sync: bool, dest: &Cluster) -> Result<(), Error> {
7489
let sharding_tables = dest.sharding_schema().tables;
90+
let existing_lsns: HashMap<usize, HashMap<(String, String), Lsn>> = self
91+
.tables
92+
.iter()
93+
.map(|(shard, tables)| {
94+
(
95+
*shard,
96+
tables
97+
.iter()
98+
.map(|table| (table.key(), table.lsn))
99+
.collect::<HashMap<_, _>>(),
100+
)
101+
})
102+
.collect();
75103

76104
// Omnisharded tables are split evenly between shards
77105
// during copy to avoid duplicate key errors.
@@ -96,6 +124,7 @@ impl Publisher {
96124
}
97125
} else {
98126
// For replication, process changes from all shards.
127+
let tables = merge_table_lsns(tables, existing_lsns.get(&number));
99128
self.tables.insert(number, tables);
100129
}
101130
}
@@ -389,3 +418,61 @@ impl Waiter {
389418
}
390419
}
391420
}
421+
422+
#[cfg(test)]
423+
mod test {
424+
use super::*;
425+
use crate::backend::replication::logical::publisher::{
426+
PublicationTable, PublicationTableColumn, ReplicaIdentity,
427+
};
428+
429+
fn make_table(schema: &str, name: &str, lsn: i64) -> Table {
430+
Table {
431+
publication: "test".to_string(),
432+
table: PublicationTable {
433+
schema: schema.to_string(),
434+
name: name.to_string(),
435+
attributes: String::new(),
436+
parent_schema: String::new(),
437+
parent_name: String::new(),
438+
},
439+
identity: ReplicaIdentity {
440+
oid: pgdog_postgres_types::Oid(1),
441+
identity: String::new(),
442+
kind: String::new(),
443+
},
444+
columns: vec![PublicationTableColumn {
445+
oid: 1,
446+
name: "tenant_id".to_string(),
447+
type_oid: pgdog_postgres_types::Oid(20),
448+
identity: true,
449+
}],
450+
lsn: Lsn::from_i64(lsn),
451+
query_parser_engine: QueryParserEngine::default(),
452+
}
453+
}
454+
455+
#[test]
456+
fn merge_table_lsns_preserves_existing_offsets() {
457+
let existing = HashMap::from([(
458+
("copy_data".to_string(), "users".to_string()),
459+
Lsn::from_i64(123),
460+
)]);
461+
462+
let merged = merge_table_lsns(vec![make_table("copy_data", "users", 0)], Some(&existing));
463+
464+
assert_eq!(merged[0].lsn, Lsn::from_i64(123));
465+
}
466+
467+
#[test]
468+
fn merge_table_lsns_leaves_unknown_tables_unset() {
469+
let existing = HashMap::from([(
470+
("copy_data".to_string(), "users".to_string()),
471+
Lsn::from_i64(123),
472+
)]);
473+
474+
let merged = merge_table_lsns(vec![make_table("copy_data", "orders", 0)], Some(&existing));
475+
476+
assert_eq!(merged[0].lsn, Lsn::default());
477+
}
478+
}

0 commit comments

Comments
 (0)