Skip to content
This repository was archived by the owner on Mar 6, 2026. It is now read-only.

Commit 101adfd

Browse files
committed
Reuse connections
Tap-postgres opense a new connection every time it needs to cast a value. This is highly inefficient and can be fixed relatively easy. To fix the issue we do two things: 1. We created a Singleton Postgres connection wrapper. This wrapper actually holds up to two connections, since we need two different connection factories. The `connect` method returns the conneciton we need based on the arguments provided. 2. Remove `when` statements when asking for a connection. When statements are great everytime we need to ensure a resource is properly closed after it's being used, but in our specific case, we don't want to close connections after each query.
1 parent d41e40e commit 101adfd

6 files changed

Lines changed: 453 additions & 389 deletions

File tree

tap_postgres/__init__.py

Lines changed: 55 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ def schema_for_column(c):
232232
#this seems to identify all arrays:
233233
#select typname from pg_attribute as pga join pg_type as pgt on pgt.oid = pga.atttypid where typlen = -1 and typelem != 0 and pga.attndims > 0;
234234
def produce_table_info(conn):
235-
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor, name='stitch_cursor') as cur:
235+
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
236236
cur.itersize = post_db.cursor_iter_size
237237
table_info = {}
238238
# SELECT CASE WHEN $2.typtype = 'd' THEN $2.typbasetype ELSE $1.atttypid END
@@ -407,30 +407,30 @@ def dump_catalog(all_streams):
407407
def do_discovery(conn_config):
408408
all_streams = []
409409

410-
with post_db.open_connection(conn_config) as conn:
411-
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor, name='stitch_cursor') as cur:
412-
cur.itersize = post_db.cursor_iter_size
413-
sql = """SELECT datname
414-
FROM pg_database
415-
WHERE datistemplate = false
416-
AND datname != 'rdsadmin'"""
410+
conn = post_db.open_connection(conn_config)
411+
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
412+
cur.itersize = post_db.cursor_iter_size
413+
sql = """SELECT datname
414+
FROM pg_database
415+
WHERE datistemplate = false
416+
AND datname != 'rdsadmin'"""
417417

418-
if conn_config.get('filter_dbs'):
419-
sql = post_db.filter_dbs_sql_clause(sql, conn_config['filter_dbs'])
418+
if conn_config.get('filter_dbs'):
419+
sql = post_db.filter_dbs_sql_clause(sql, conn_config['filter_dbs'])
420420

421-
LOGGER.info("Running DB discovery: %s with itersize %s", sql, cur.itersize)
422-
cur.execute(sql)
423-
found_dbs = (row[0] for row in cur.fetchall())
421+
LOGGER.info("Running DB discovery: %s with itersize %s", sql, cur.itersize)
422+
cur.execute(sql)
423+
found_dbs = (row[0] for row in cur.fetchall())
424424

425425
filter_dbs = filter(lambda dbname: attempt_connection_to_db(conn_config, dbname), found_dbs)
426426

427427
for db_row in filter_dbs:
428428
dbname = db_row
429429
LOGGER.info("Discovering db %s", dbname)
430430
conn_config['dbname'] = dbname
431-
with post_db.open_connection(conn_config) as conn:
432-
db_streams = discover_db(conn)
433-
all_streams = all_streams + db_streams
431+
conn = post_db.open_connection(conn_config)
432+
db_streams = discover_db(conn)
433+
all_streams = all_streams + db_streams
434434

435435

436436
if len(all_streams) == 0:
@@ -587,49 +587,49 @@ def sync_logical_streams(conn_config, logical_streams, state, end_lsn):
587587

588588

589589
def register_type_adapters(conn_config):
590-
with post_db.open_connection(conn_config) as conn:
591-
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
592-
#citext[]
593-
cur.execute("SELECT typarray FROM pg_type where typname = 'citext'")
594-
citext_array_oid = cur.fetchone()
595-
if citext_array_oid:
596-
psycopg2.extensions.register_type(
597-
psycopg2.extensions.new_array_type(
598-
(citext_array_oid[0],), 'CITEXT[]', psycopg2.STRING))
599-
600-
#bit[]
601-
cur.execute("SELECT typarray FROM pg_type where typname = 'bit'")
602-
bit_array_oid = cur.fetchone()[0]
603-
psycopg2.extensions.register_type(
604-
psycopg2.extensions.new_array_type(
605-
(bit_array_oid,), 'BIT[]', psycopg2.STRING))
606-
607-
608-
#UUID[]
609-
cur.execute("SELECT typarray FROM pg_type where typname = 'uuid'")
610-
uuid_array_oid = cur.fetchone()[0]
590+
conn = post_db.open_connection(conn_config)
591+
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
592+
#citext[]
593+
cur.execute("SELECT typarray FROM pg_type where typname = 'citext'")
594+
citext_array_oid = cur.fetchone()
595+
if citext_array_oid:
611596
psycopg2.extensions.register_type(
612597
psycopg2.extensions.new_array_type(
613-
(uuid_array_oid,), 'UUID[]', psycopg2.STRING))
614-
615-
#money[]
616-
cur.execute("SELECT typarray FROM pg_type where typname = 'money'")
617-
money_array_oid = cur.fetchone()[0]
598+
(citext_array_oid[0],), 'CITEXT[]', psycopg2.STRING))
599+
600+
#bit[]
601+
cur.execute("SELECT typarray FROM pg_type where typname = 'bit'")
602+
bit_array_oid = cur.fetchone()[0]
603+
psycopg2.extensions.register_type(
604+
psycopg2.extensions.new_array_type(
605+
(bit_array_oid,), 'BIT[]', psycopg2.STRING))
606+
607+
608+
#UUID[]
609+
cur.execute("SELECT typarray FROM pg_type where typname = 'uuid'")
610+
uuid_array_oid = cur.fetchone()[0]
611+
psycopg2.extensions.register_type(
612+
psycopg2.extensions.new_array_type(
613+
(uuid_array_oid,), 'UUID[]', psycopg2.STRING))
614+
615+
#money[]
616+
cur.execute("SELECT typarray FROM pg_type where typname = 'money'")
617+
money_array_oid = cur.fetchone()[0]
618+
psycopg2.extensions.register_type(
619+
psycopg2.extensions.new_array_type(
620+
(money_array_oid,), 'MONEY[]', psycopg2.STRING))
621+
622+
#json and jsbon
623+
psycopg2.extras.register_default_json(loads=lambda x: str(x))
624+
psycopg2.extras.register_default_jsonb(loads=lambda x: str(x))
625+
626+
#enum[]'s
627+
cur.execute("SELECT distinct(t.typarray) FROM pg_type t JOIN pg_enum e ON t.oid = e.enumtypid")
628+
for oid in cur.fetchall():
629+
enum_oid = oid[0]
618630
psycopg2.extensions.register_type(
619631
psycopg2.extensions.new_array_type(
620-
(money_array_oid,), 'MONEY[]', psycopg2.STRING))
621-
622-
#json and jsbon
623-
psycopg2.extras.register_default_json(loads=lambda x: str(x))
624-
psycopg2.extras.register_default_jsonb(loads=lambda x: str(x))
625-
626-
#enum[]'s
627-
cur.execute("SELECT distinct(t.typarray) FROM pg_type t JOIN pg_enum e ON t.oid = e.enumtypid")
628-
for oid in cur.fetchall():
629-
enum_oid = oid[0]
630-
psycopg2.extensions.register_type(
631-
psycopg2.extensions.new_array_type(
632-
(enum_oid,), 'ENUM_{}[]'.format(enum_oid), psycopg2.STRING))
632+
(enum_oid,), 'ENUM_{}[]'.format(enum_oid), psycopg2.STRING))
633633

634634

635635
def any_logical_streams(streams, default_replication_method):

0 commit comments

Comments
 (0)