Skip to content
This repository was archived by the owner on Mar 6, 2026. It is now read-only.

Commit c321a62

Browse files
pslavovivanovyordan
authored and committed
Send end_lsn as feedback if no changes
Also add a counter and improve logging when messages are consumed. The end LSN is also used as the new bookmark if no changes are detected.
1 parent 4df9f62 commit c321a62

1 file changed

Lines changed: 13 additions & 5 deletions

File tree

tap_postgres/sync_strategies/logical_replication.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
UPDATE_BOOKMARK_PERIOD = 1000
2323

24+
COUNTER={'U': 0, 'D': 0, 'I': 0}
25+
2426
def get_pg_version(cur):
2527
cur.execute("SELECT version()")
2628
res = cur.fetchone()[0]
@@ -235,6 +237,8 @@ def consume_message_format_2(payload, conn_info, streams_lookup, state, time_ext
235237
if streams_lookup.get(tap_stream_id) is None:
236238
yield None
237239
else:
240+
COUNTER[action] += 1
241+
LOGGER.debug(" -- Tap Stream ID = %s, action - %s (counter - %s)", tap_stream_id, payload['action'], str(COUNTER))
238242
target_stream = streams_lookup[tap_stream_id]
239243
stream_version = get_stream_version(target_stream['tap_stream_id'], state)
240244
stream_md_map = metadata.to_map(target_stream['metadata'])
@@ -405,7 +409,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
405409
time_extracted = utils.now()
406410
slot = locate_replication_slot(conn_info)
407411
last_lsn_processed = None
408-
poll_total_seconds = conn_info['logical_poll_total_seconds'] or 60 * 30 #we are willing to poll for a total of 30 minutes without finding a record
412+
poll_total_seconds = conn_info['logical_poll_total_seconds'] or 60 * 5 #we are willing to poll for a total of 5 minutes without finding a record
409413
keep_alive_time = 10.0
410414
begin_ts = datetime.datetime.now()
411415
add_tables = []
@@ -443,6 +447,8 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
443447
poll_duration = (datetime.datetime.now() - begin_ts).total_seconds()
444448
if poll_duration > poll_total_seconds:
445449
LOGGER.info("breaking after %s seconds of polling with no data", poll_duration)
450+
if not last_lsn_processed:
451+
cur.send_feedback(flush_lsn=end_lsn)
446452
break
447453

448454
msg = cur.read_message()
@@ -458,6 +464,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
458464
last_lsn_processed = msg.data_start
459465
rows_saved = rows_saved + 1
460466
if rows_saved % UPDATE_BOOKMARK_PERIOD == 0:
467+
LOGGER.info("Rows saved = %s, Processed messages counter: %s", str(rows_saved), str(COUNTER))
461468
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
462469
else:
463470
now = datetime.datetime.now()
@@ -471,10 +478,11 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
471478
except InterruptedError:
472479
pass # recalculate timeout and continue
473480

474-
if last_lsn_processed:
475-
for s in logical_streams:
476-
LOGGER.info("updating bookmark for stream %s to last_lsn_processed %s", s['tap_stream_id'], last_lsn_processed)
477-
state = singer.write_bookmark(state, s['tap_stream_id'], 'lsn', last_lsn_processed)
481+
bookmark_lsn = last_lsn_processed if last_lsn_processed else end_lsn
482+
LOGGER.info("Finished processing messages - counter: %s", str(COUNTER))
483+
for s in logical_streams:
484+
LOGGER.info("updating bookmark for stream %s to last_lsn_processed %s", s['tap_stream_id'], bookmark_lsn)
485+
state = singer.write_bookmark(state, s['tap_stream_id'], 'lsn', bookmark_lsn)
478486

479487
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
480488
return state

0 commit comments

Comments
 (0)