Skip to content

Commit 2e02051

Browse files
authored
fix: fix replication state sync (#833)
#817
1 parent 37cc2ab commit 2e02051

17 files changed

Lines changed: 891 additions & 68 deletions

File tree

.claude/skills/bash/SKILL.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
---
2+
name: bash
3+
description: How to use the shell successfully
4+
allowed-tools: Bash
5+
---
6+
7+
Execute shell commands inside their own bash shell, like so:
8+
9+
```bash
10+
bash -c 'command'
11+
```
12+
13+
This makes sure the shell is polluted by any outside state.

.claude/skills/rust/SKILL.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,17 @@ for i in 0..1000 {
290290
}
291291
```
292292

293+
### Import instead of using absolute paths
294+
295+
```rust
296+
/// BAD
297+
let v = tokio::net::TcpStream::connect("localhost:8080");
298+
299+
/// GOOD
300+
use tokio::net::TcpStream;
301+
let v = TcpStream::connect("localhost:8080");
302+
```
303+
293304
### Profile Before Optimizing
294305

295306
```bash

integration/copy_data/dev.sh

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
#!/bin/bash
22
set -e
3-
trap 'kill 0' SIGINT SIGTERM
43
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
54
DEFAULT_BIN="${SCRIPT_DIR}/../../target/debug/pgdog"
65
PGDOG_BIN=${PGDOG_BIN:-$DEFAULT_BIN}
@@ -11,11 +10,57 @@ export PGHOST=127.0.0.1
1110
export PGPORT=5432
1211
export PGPASSWORD=pgdog
1312

13+
cleanup() {
14+
if [ -n "${REPL_PID}" ]; then
15+
kill ${REPL_PID} 2>/dev/null || true
16+
wait ${REPL_PID} 2>/dev/null || true
17+
fi
18+
}
19+
trap cleanup EXIT
20+
1421
pushd ${SCRIPT_DIR}
1522

1623
psql -f init.sql
1724

1825
${PGDOG_BIN} schema-sync --from-database source --to-database destination --publication pgdog
1926
${PGDOG_BIN} data-sync --sync-only --from-database source --to-database destination --publication pgdog --replication-slot copy_data
2027
${PGDOG_BIN} schema-sync --from-database source --to-database destination --publication pgdog --cutover
28+
29+
# Start replication in the background.
30+
${PGDOG_BIN} data-sync --replicate-only --from-database source --to-database destination --publication pgdog &
31+
REPL_PID=$!
32+
33+
# Give replication a moment to connect.
34+
sleep 2
35+
36+
# Check that the replication process is still alive.
37+
if ! kill -0 ${REPL_PID} 2>/dev/null; then
38+
echo "ERROR: replication process exited early"
39+
wait ${REPL_PID}
40+
exit $?
41+
fi
42+
43+
# Run pgbench against the source database — writes land on the source and
44+
# get replicated to the destination shards via logical replication.
45+
pgbench -h 127.0.0.1 -p 5432 -U pgdog pgdog \
46+
-t 1000 -c 3 --protocol extended \
47+
-f "${SCRIPT_DIR}/pgbench.sql" -P 1
48+
49+
# Let replication catch up.
50+
sleep 3
51+
52+
# Stop replication and capture its exit code.
53+
kill ${REPL_PID} 2>/dev/null || true
54+
set +e
55+
wait ${REPL_PID}
56+
REPL_EXIT=$?
57+
set -e
58+
REPL_PID=""
59+
60+
# 0, 130 (SIGINT), 143 (SIGTERM) are all normal shutdown codes.
61+
if [ ${REPL_EXIT} -ne 0 ] && [ ${REPL_EXIT} -ne 130 ] && [ ${REPL_EXIT} -ne 143 ]; then
62+
echo "ERROR: replication process exited with code ${REPL_EXIT}"
63+
exit ${REPL_EXIT}
64+
fi
65+
2166
popd

integration/copy_data/pgbench.sql

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
\set tenant_id random(1, 20)
2+
\set user_id (1021 * random(10000, 100000))
3+
\set order_id random(100001, 999999999)
4+
\set log_id random(100001, 999999999)
5+
\set order_amount random(1, 50000) / 100.0
6+
7+
-- Upsert a user for this tenant.
8+
INSERT INTO copy_data.users (id, tenant_id, email, settings)
9+
VALUES (:user_id, :tenant_id, 'bench_' || :user_id || '@example.com', '{"theme":"dark"}')
10+
ON CONFLICT (id, tenant_id) DO UPDATE SET settings = EXCLUDED.settings;
11+
12+
-- Read the user back.
13+
SELECT id, tenant_id, email, created_at FROM copy_data.users
14+
WHERE id = :user_id AND tenant_id = :tenant_id;
15+
16+
-- Insert an order with an explicit large id.
17+
INSERT INTO copy_data.orders (id, user_id, tenant_id, amount)
18+
VALUES (:order_id, :user_id, :tenant_id, :order_amount)
19+
ON CONFLICT (id) DO NOTHING;
20+
21+
-- Read recent orders for this tenant.
22+
SELECT id, user_id, amount, created_at FROM copy_data.orders
23+
WHERE tenant_id = :tenant_id ORDER BY created_at DESC LIMIT 5;
24+
25+
-- Update the user settings.
26+
UPDATE copy_data.users SET settings = jsonb_set(settings, '{last_bench}', to_jsonb(now()::text))
27+
WHERE id = :user_id AND tenant_id = :tenant_id;
28+
29+
-- Log an action with an explicit large id.
30+
INSERT INTO copy_data.log_actions (id, tenant_id, action)
31+
VALUES (:log_id, :tenant_id, 'bench')
32+
ON CONFLICT (id) DO NOTHING;
33+
34+
-- Clean up everything we created.
35+
DELETE FROM copy_data.orders WHERE id = :order_id;
36+
37+
DELETE FROM copy_data.log_actions WHERE id = :log_id;
38+
39+
DELETE FROM copy_data.users
40+
WHERE id = :user_id AND tenant_id = :tenant_id AND email = 'bench_' || :user_id || '@example.com';

pgdog/src/backend/pool/address.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,12 @@ impl Address {
6666
},
6767
password: if server_auth.rds_iam() {
6868
String::new()
69+
} else if let Some(password) = database.password.clone() {
70+
password
71+
} else if let Some(password) = user.server_password.clone() {
72+
password
6973
} else {
70-
if let Some(password) = database.password.clone() {
71-
password
72-
} else if let Some(password) = user.server_password.clone() {
73-
password
74-
} else {
75-
user.password().to_string()
76-
}
74+
user.password().to_string()
7775
},
7876
server_auth,
7977
server_iam_region: user.server_iam_region.clone(),

pgdog/src/backend/replication/logical/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ pub enum Error {
3333
#[error("out of sync during relation prepare, got {0}")]
3434
RelationOutOfSync(char),
3535

36+
#[error("out of sync during row write, got {0}")]
37+
SendOutOfSync(char),
38+
3639
#[error("missing data")]
3740
MissingData,
3841

pgdog/src/backend/replication/logical/status.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,12 @@ impl TableCopy {
6666
state.bytes_per_sec = state.bytes / elapsed as usize;
6767
}
6868

69-
data_sync_progress(&self, &state);
69+
data_sync_progress(self, &state);
7070
}
7171
}
7272

7373
pub(crate) fn error(&self, error: &LogicalError) {
74-
data_sync_error(&self, error);
74+
data_sync_error(self, error);
7575
}
7676

7777
pub(crate) fn update_sql(&self, sql: &str) {
@@ -83,7 +83,7 @@ impl TableCopy {
8383

8484
impl Drop for TableCopy {
8585
fn drop(&mut self) {
86-
data_sync_done(&self);
86+
data_sync_done(self);
8787
COPIES.copies.remove(self);
8888
}
8989
}
@@ -300,16 +300,15 @@ impl SchemaStatement {
300300
}
301301

302302
pub(crate) fn running(&mut self) {
303-
if let Some(entry) =
304-
SchemaStatements::get()
305-
.stmts
306-
.remove(&self.task)
307-
.and_then(|mut entry| {
308-
entry.running = true;
309-
entry.statement.started_at = Some(SystemTime::now());
310-
311-
Some(entry)
312-
})
303+
if let Some(entry) = SchemaStatements::get()
304+
.stmts
305+
.remove(&self.task)
306+
.map(|mut entry| {
307+
entry.running = true;
308+
entry.statement.started_at = Some(SystemTime::now());
309+
310+
entry
311+
})
313312
{
314313
self.task = entry.clone();
315314
schema_sync_task(&self.task);

pgdog/src/backend/replication/logical/subscriber/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ pub mod copy;
33
pub mod parallel_connection;
44
pub mod stream;
55

6+
#[cfg(test)]
7+
mod tests;
8+
69
pub use context::StreamContext;
710
pub use copy::CopySubscriber;
811
pub use parallel_connection::ParallelConnection;

0 commit comments

Comments
 (0)