Skip to content
This repository was archived by the owner on Mar 6, 2026. It is now read-only.

Commit 388c633

Browse files
pslavovivanovyordan
authored andcommitted
Print state at each 1000 rows
1 parent 34f57a9 commit 388c633

1 file changed

Lines changed: 11 additions & 7 deletions

File tree

tap_postgres/sync_strategies/logical_replication.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -383,9 +383,6 @@ def consume_message(streams, state, msg, time_extracted, conn_info, end_lsn, mes
383383
if msg.data_start > end_lsn:
384384
raise Exception("incorrectly attempting to flush an lsn({}) > end_lsn({})".format(msg.data_start, end_lsn))
385385

386-
msg.cursor.send_feedback(flush_lsn=msg.data_start)
387-
388-
389386
return state
390387

391388
def locate_replication_slot(conn_info):
@@ -411,7 +408,7 @@ def locate_replication_slot(conn_info):
411408

412409

413410
def sync_tables(conn_info, logical_streams, state, end_lsn):
414-
start_lsn = min([get_bookmark(state, s['tap_stream_id'], 'lsn') for s in logical_streams])
411+
start_lsn = 0
415412
time_extracted = utils.now()
416413
slot = locate_replication_slot(conn_info)
417414
last_lsn_processed = None
@@ -427,6 +424,9 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
427424
for s in logical_streams:
428425
add_tables.append("{}.{}".format(s["metadata"][0]["metadata"]["schema-name"], s["table_name"]))
429426

427+
if "lsn_to_flush" in state:
428+
start_lsn = state["lsn_to_flush"]
429+
430430
conn = post_db.open_connection(conn_info, True)
431431
with conn.cursor() as cur:
432432
LOGGER.info("Starting Logical Replication for %s(%s): %s -> %s. poll_total_seconds: %s", list(map(lambda s: s['tap_stream_id'], logical_streams)), slot, start_lsn, end_lsn, poll_total_seconds)
@@ -448,13 +448,18 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
448448
except psycopg2.ProgrammingError:
449449
raise Exception("unable to start replication with logical replication slot {}".format(slot))
450450

451+
# initial flush lsn from the previous run
452+
if "lsn_to_flush" in state:
453+
LOGGER.info("Flushing lsn %s from from previous job run!", str(state["lsn_to_flush"]))
454+
cur.send_feedback(flush_lsn=state["lsn_to_flush"])
455+
451456
rows_saved = 0
452457
while True:
453458
poll_duration = (datetime.datetime.now() - begin_ts).total_seconds()
454459
if poll_duration > poll_total_seconds:
455460
LOGGER.info("breaking after %s seconds of polling with no data", poll_duration)
456461
if not last_lsn_processed:
457-
cur.send_feedback(flush_lsn=end_lsn)
462+
state["lsn_to_flush"] = end_lsn
458463
break
459464

460465
read_message_start = datetime.datetime.now()
@@ -464,8 +469,6 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
464469
begin_ts = datetime.datetime.now()
465470
if msg.data_start > end_lsn:
466471
LOGGER.info("gone past end_lsn %s for run. breaking", end_lsn)
467-
if not last_lsn_processed:
468-
cur.send_feedback(flush_lsn=end_lsn)
469472
break
470473

471474
state = consume_message(logical_streams, state, msg, time_extracted,
@@ -476,6 +479,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
476479
if rows_saved % COUNTER_PRINT_PERIOD == 0:
477480
LOGGER.info("Rows saved = %s, Processed messages counter: %s", str(rows_saved), str(COUNTER))
478481
if rows_saved % UPDATE_BOOKMARK_PERIOD == 0:
482+
LOGGER.debug("Sending state to loader: %s", str(state))
479483
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
480484
else:
481485
now = datetime.datetime.now()

0 commit comments

Comments
 (0)