Skip to content
This repository was archived by the owner on Mar 6, 2026. It is now read-only.

Commit 8e385c3

Browse files
pslavovivanovyordan
authored andcommitted
Fix problem receiving None messages
Problem occures when you have a big transaction from a table that is not added for decoding.
1 parent 256be35 commit 8e385c3

1 file changed

Lines changed: 15 additions & 12 deletions

File tree

tap_postgres/sync_strategies/logical_replication.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@
1212
import psycopg2
1313
from psycopg2 import sql
1414
import copy
15-
from select import select
1615
from functools import reduce
1716
import json
1817
import re
18+
import time
1919

2020
LOGGER = singer.get_logger()
2121

@@ -420,7 +420,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
420420
slot = locate_replication_slot(conn_info)
421421
last_lsn_processed = None
422422
poll_total_seconds = conn_info['logical_poll_total_seconds'] or 60 * 5 #we are willing to poll for a total of 3 minutes without finding a record
423-
keep_alive_time = 10.0
423+
keep_alive_time = 0.1
424424
begin_ts = datetime.datetime.now()
425425
add_tables = []
426426

@@ -461,6 +461,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
461461
cur.send_feedback(flush_lsn=state["lsn_to_flush"])
462462

463463
rows_saved = 0
464+
idle_count = 0
464465
while True:
465466
poll_duration = (datetime.datetime.now() - begin_ts).total_seconds()
466467
if poll_duration > poll_total_seconds:
@@ -473,6 +474,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
473474
msg = cur.read_message()
474475
COUNTER['read_message'] += (datetime.datetime.now() - read_message_start).total_seconds()
475476
if msg:
477+
idle_count = 0
476478
begin_ts = datetime.datetime.now()
477479
if msg.data_start > end_lsn:
478480
LOGGER.info("gone past end_lsn %s for run. breaking", end_lsn)
@@ -489,16 +491,17 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
489491
LOGGER.debug("Sending state to loader: %s", str(state))
490492
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
491493
else:
492-
now = datetime.datetime.now()
493-
timeout = keep_alive_time - (now - cur.io_timestamp).total_seconds()
494-
try:
495-
sel = select([cur], [], [], max(0, timeout))
496-
if not any(sel):
497-
LOGGER.info("no data for %s seconds. sending feedback to server with NO flush_lsn. just a keep-alive", timeout)
498-
cur.send_feedback()
499-
500-
except InterruptedError:
501-
pass # recalculate timeout and continue
494+
idle_count += 1
495+
if idle_count > 100:
496+
idle_count = 0
497+
tmp_poll_duration = (datetime.datetime.now() - begin_ts).total_seconds()
498+
LOGGER.info(
499+
"No data for ~10 seconds (%s seconds from start). sending feedback to server with NO flush_lsn. just a keep-alive",
500+
tmp_poll_duration
501+
)
502+
cur.send_feedback()
503+
else:
504+
time.sleep(keep_alive_time)
502505

503506
bookmark_lsn = last_lsn_processed if last_lsn_processed else end_lsn
504507
LOGGER.info("Finished processing messages - counter: %s", str(COUNTER))

0 commit comments

Comments
 (0)