|
21 | 21 |
|
22 | 22 | UPDATE_BOOKMARK_PERIOD = 1000 |
23 | 23 |
|
24 | | -COUNTER={'U': 0, 'D': 0, 'I': 0} |
| 24 | +COUNTER={'U': 0, 'D': 0, 'I': 0, 'json_load': 0, 'read_message': 0, 'send_message': 0} |
25 | 25 |
|
26 | 26 | def get_pg_version(cur): |
27 | 27 | cur.execute("SELECT version()") |
@@ -274,7 +274,9 @@ def consume_message_format_2(payload, conn_info, streams_lookup, state, time_ext |
274 | 274 | col_names = col_names + ['_sdc_lsn'] |
275 | 275 |
|
276 | 276 | # Yield 1 record to match the API of V1 |
| 277 | + send_message_start = datetime.datetime.now() |
277 | 278 | yield row_to_singer_message(target_stream, col_vals, stream_version, col_names, time_extracted, stream_md_map, conn_info) |
| 279 | + COUNTER['send_message'] += (datetime.datetime.now() - send_message_start).total_seconds() |
278 | 280 |
|
279 | 281 | state = singer.write_bookmark(state, |
280 | 282 | target_stream['tap_stream_id'], |
@@ -354,7 +356,9 @@ def consume_message_format_1(payload, conn_info, streams_lookup, state, time_ext |
354 | 356 |
|
355 | 357 |
|
356 | 358 | def consume_message(streams, state, msg, time_extracted, conn_info, end_lsn, message_format="1"): |
| 359 | + load_json_start = datetime.datetime.now() |
357 | 360 | payload = json.loads(msg.payload) |
| 361 | + COUNTER['json_load'] += (datetime.datetime.now() - load_json_start).total_seconds() |
358 | 362 | lsn = msg.data_start |
359 | 363 |
|
360 | 364 | streams_lookup = {s['tap_stream_id']: s for s in streams} |
@@ -451,11 +455,15 @@ def sync_tables(conn_info, logical_streams, state, end_lsn): |
451 | 455 | cur.send_feedback(flush_lsn=end_lsn) |
452 | 456 | break |
453 | 457 |
|
| 458 | + read_message_start = datetime.datetime.now() |
454 | 459 | msg = cur.read_message() |
| 460 | + COUNTER['read_message'] += (datetime.datetime.now() - read_message_start).total_seconds() |
455 | 461 | if msg: |
456 | 462 | begin_ts = datetime.datetime.now() |
457 | 463 | if msg.data_start > end_lsn: |
458 | 464 | LOGGER.info("gone past end_lsn %s for run. breaking", end_lsn) |
| 465 | + if not last_lsn_processed: |
| 466 | + cur.send_feedback(flush_lsn=end_lsn) |
459 | 467 | break |
460 | 468 |
|
461 | 469 | state = consume_message(logical_streams, state, msg, time_extracted, |
|
0 commit comments