Skip to content
This repository was archived by the owner on Mar 6, 2026. It is now read-only.

Commit a06a5c1

Browse files
pslavovivanovyordan
authored andcommitted
Dont update lsn on terminating with no changes
1 parent d316b91 commit a06a5c1

1 file changed

Lines changed: 12 additions & 6 deletions

File tree

tap_postgres/sync_strategies/logical_replication.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -473,13 +473,18 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
473473

474474
rows_saved = 0
475475
idle_count = 0
476+
terminated_with_no_changes = False
476477
while True:
477478
if exists('/tmp/terminating_pod'):
478479
LOGGER.info("SIGTERM received from file '/tmp/terminating_pod'. Exiting")
480+
if not last_lsn_processed:
481+
terminated_with_no_changes = True
479482
break
480483

481484
if SIGTERM_RECEIVED:
482485
LOGGER.info("SIGTERM received from parent process. Exiting")
486+
if not last_lsn_processed:
487+
terminated_with_no_changes = True
483488
break
484489

485490
poll_duration = (datetime.datetime.now() - begin_ts).total_seconds()
@@ -488,7 +493,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
488493
if not last_lsn_processed:
489494
state["lsn_to_flush"] = end_lsn
490495
break
491-
496+
492497
script_run_time = (datetime.datetime.now() - START_TIME).total_seconds()
493498
if script_run_time > max_run_time:
494499
LOGGER.info("breaking after %s seconds of script running (more then the maximum %s seconds)!", script_run_time, max_run_time)
@@ -520,18 +525,19 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
520525
idle_count = 0
521526
tmp_poll_duration = (datetime.datetime.now() - begin_ts).total_seconds()
522527
LOGGER.info(
523-
"No data for ~10 seconds (%s seconds from start). sending feedback to server with NO flush_lsn. just a keep-alive",
528+
"No data for ~10 seconds (%s seconds from start). sending feedback to server with NO flush_lsn. just a keep-alive",
524529
tmp_poll_duration
525530
)
526531
cur.send_feedback()
527532
else:
528533
time.sleep(keep_alive_time)
529534

530-
bookmark_lsn = last_lsn_processed if last_lsn_processed else end_lsn
531535
LOGGER.info("Finished processing messages - counter: %s", str(COUNTER))
532-
for s in logical_streams:
533-
LOGGER.info("updating bookmark for stream %s to last_lsn_processed %s", s['tap_stream_id'], bookmark_lsn)
534-
state = singer.write_bookmark(state, s['tap_stream_id'], 'lsn', bookmark_lsn)
536+
if not terminated_with_no_changes:
537+
bookmark_lsn = last_lsn_processed if last_lsn_processed else end_lsn
538+
for s in logical_streams:
539+
LOGGER.info("updating bookmark for stream %s to last_lsn_processed %s", s['tap_stream_id'], bookmark_lsn)
540+
state = singer.write_bookmark(state, s['tap_stream_id'], 'lsn', bookmark_lsn)
535541

536542
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
537543
return state

0 commit comments

Comments
 (0)