Skip to content
This repository was archived by the owner on Mar 6, 2026. It is now read-only.

Commit d41e40e

Browse files
committed
Filter logical replication tables
wal2json has an option where we can set which tables should be synced. We have to list tables we want to sync either way so there's no reason not to use that list.
1 parent e28bcad commit d41e40e

1 file changed

Lines changed: 11 additions & 2 deletions

File tree

tap_postgres/sync_strategies/logical_replication.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -409,21 +409,30 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
409409
poll_total_seconds = conn_info['logical_poll_total_seconds'] or 60 * 30 #we are willing to poll for a total of 30 minutes without finding a record
410410
keep_alive_time = 10.0
411411
begin_ts = datetime.datetime.now()
412+
add_tables = []
412413

413414
for s in logical_streams:
414415
sync_common.send_schema_message(s, ['lsn'])
415416

417+
418+
for s in logical_streams:
419+
add_tables.append("{}.{}".format(s["metadata"][0]["metadata"]["schema-name"], s["table_name"]))
420+
416421
with post_db.open_connection(conn_info, True) as conn:
417422
with conn.cursor() as cur:
418423
LOGGER.info("Starting Logical Replication for %s(%s): %s -> %s. poll_total_seconds: %s", list(map(lambda s: s['tap_stream_id'], logical_streams)), slot, start_lsn, end_lsn, poll_total_seconds)
419424

420425
replication_params = {"slot_name": slot,
421426
"decode": True,
422-
"start_lsn": start_lsn}
427+
"start_lsn": start_lsn,
428+
"options": {
429+
"add-tables": ", ".join(add_tables)
430+
}}
423431
message_format = conn_info.get("wal2json_message_format") or "1"
424432
if message_format == "2":
425433
LOGGER.info("Using wal2json format-version 2")
426-
replication_params["options"] = {"format-version": 2, "include-timestamp": True}
434+
replication_params["options"]["format-version"] = 2
435+
replication_params["options"]["include-timestamp"] = True
427436

428437
try:
429438
cur.start_replication(**replication_params)

0 commit comments

Comments
 (0)