Skip to content
This repository was archived by the owner on Mar 6, 2026. It is now read-only.

Commit ef5aa53

Browse files
pslavovivanovyordan
authored andcommitted
Fix long transaction data loss
do max script run time to be configurable more offten poll keep alive time
1 parent ab02ba1 commit ef5aa53

2 files changed

Lines changed: 15 additions & 5 deletions

File tree

tap_postgres/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -684,7 +684,8 @@ def main_impl():
684684
'debug_lsn' : args.config.get('debug_lsn') == 'true',
685685
'logical_poll_total_seconds': float(args.config.get('logical_poll_total_seconds', 0)),
686686
'wal2json_message_format': args.config.get('wal2json_message_format'),
687-
'wal2json_slot_name': args.config.get('wal2json_slot_name')}
687+
'wal2json_slot_name': args.config.get('wal2json_slot_name'),
688+
'max_script_run_time': args.config.get('max_script_run_time')}
688689

689690
if args.config.get('ssl') == 'true':
690691
conn_config['sslmode'] = 'require'

tap_postgres/sync_strategies/logical_replication.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -429,10 +429,10 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
429429
slot = locate_replication_slot(conn_info)
430430
last_lsn_processed = None
431431
poll_total_seconds = conn_info['logical_poll_total_seconds'] or 60 * 5 #we are willing to poll for a total of 3 minutes without finding a record
432-
keep_alive_time = 0.1
432+
keep_alive_time = 0.001
433433
begin_ts = datetime.datetime.now()
434434
add_tables = []
435-
max_run_time = 600 #max script run time for the extractor script
435+
max_run_time = conn_info['max_script_run_time'] or 1800 #max script run time for the extractor script
436436

437437
for s in logical_streams:
438438
sync_common.send_schema_message(s, ['lsn'])
@@ -446,7 +446,14 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
446446

447447
conn = post_db.open_connection(conn_info, True)
448448
with conn.cursor() as cur:
449-
LOGGER.info("Starting Logical Replication from slot %s: %s -> %s. poll_total_seconds: %s", slot, start_lsn, end_lsn, poll_total_seconds)
449+
LOGGER.info(
450+
"Starting Logical Replication from slot %s: %s -> %s. poll_total_seconds: %s. max_run_time: %s",
451+
slot,
452+
start_lsn,
453+
end_lsn,
454+
poll_total_seconds,
455+
max_run_time
456+
)
450457
LOGGER.info("Starting at bookmark: %s", list(map(lambda s: s['tap_stream_id'], logical_streams)))
451458

452459
replication_params = {"slot_name": slot,
@@ -496,6 +503,8 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
496503

497504
script_run_time = (datetime.datetime.now() - START_TIME).total_seconds()
498505
if script_run_time > max_run_time:
506+
if not last_lsn_processed:
507+
terminated_with_no_changes = True
499508
LOGGER.info("breaking after %s seconds of script running (more then the maximum %s seconds)!", script_run_time, max_run_time)
500509
break
501510

@@ -521,7 +530,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
521530
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
522531
else:
523532
idle_count += 1
524-
if idle_count > 100:
533+
if idle_count > 10000:
525534
idle_count = 0
526535
tmp_poll_duration = (datetime.datetime.now() - begin_ts).total_seconds()
527536
LOGGER.info(

0 commit comments

Comments
 (0)