Skip to content

Commit 1229e92

Browse files
PostgresNode::table_checksum is corrected (#317)
The previous implementation had problems. For example, it did not release a connection with an active transaction for an empty table, which made it impossible to execute a "TRUNCATE TABLE" statement on that table. The new implementation is simple: it is linear, fault-tolerant, single-threaded code. Also, pgbench_table_checksums now uses one transaction for all tables.
1 parent 26941c7 commit 1229e92

2 files changed

Lines changed: 397 additions & 59 deletions

File tree

src/node.py

Lines changed: 112 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,8 @@
33

44
import logging
55
import os
6-
import random
76
import signal
87
import subprocess
9-
import threading
10-
from queue import Queue
118

129
import time
1310
import typing
@@ -1953,69 +1950,47 @@ def connect(self,
19531950
password=password,
19541951
autocommit=autocommit) # yapf: disable
19551952

1956-
def table_checksum(self, table, dbname="postgres"):
1957-
con = self.connect(dbname=dbname)
1958-
1959-
curname = "cur_" + str(random.randint(0, 2 ** 48))
1960-
1961-
con.execute("""
1962-
DECLARE %s NO SCROLL CURSOR FOR
1963-
SELECT t::text FROM %s as t
1964-
""" % (curname, table))
1965-
1966-
que = Queue(maxsize=50)
1967-
sum = 0
1968-
1969-
rows = con.execute("FETCH FORWARD 2000 FROM %s" % curname)
1970-
if not rows:
1971-
return 0
1972-
que.put(rows)
1953+
def table_checksum(
1954+
self,
1955+
table: str,
1956+
dbname: str = "postgres"
1957+
) -> int:
1958+
assert type(table) == str # noqa: E721
1959+
assert type(dbname) == str # noqa: E721
19731960

1974-
th = None
1975-
if len(rows) == 2000:
1976-
def querier():
1977-
try:
1978-
while True:
1979-
rows = con.execute("FETCH FORWARD 2000 FROM %s" % curname)
1980-
if not rows:
1981-
break
1982-
que.put(rows)
1983-
except Exception as e:
1984-
que.put(e)
1985-
else:
1986-
que.put(None)
1961+
cn = self.connect(dbname=dbname)
1962+
assert type(cn) == NodeConnection # noqa: E721
19871963

1988-
th = threading.Thread(target=querier)
1989-
th.start()
1990-
else:
1991-
que.put(None)
1964+
try:
1965+
sum = __class__._table_checksum__use_cn(cn, table)
1966+
assert type(sum) == int # noqa: E721
1967+
finally:
1968+
assert type(cn) == NodeConnection # noqa: E721
1969+
cn.close()
19921970

1993-
while True:
1994-
rows = que.get()
1995-
if rows is None:
1996-
break
1997-
if isinstance(rows, Exception):
1998-
raise rows
1999-
# hash uses SipHash since Python3.4, therefore it is good enough
2000-
for row in rows:
2001-
sum += hash(row[0])
1971+
assert type(sum) == int # noqa: E721
1972+
return sum
20021973

2003-
if th is not None:
2004-
th.join()
1974+
sm_pgbench_tables = [
1975+
'pgbench_branches',
1976+
'pgbench_tellers',
1977+
'pgbench_accounts',
1978+
'pgbench_history'
1979+
]
20051980

2006-
con.execute("CLOSE %s; ROLLBACK;" % curname)
1981+
def pgbench_table_checksums(
1982+
self,
1983+
dbname: str = "postgres",
1984+
pgbench_tables: typing.Iterable[str] = sm_pgbench_tables
1985+
) -> typing.Set[typing.Tuple[str, int]]:
1986+
assert type(dbname) == str # noqa: E721
20071987

2008-
con.close()
2009-
return sum
1988+
r1 = self._tables_checksum(dbname, pgbench_tables)
1989+
assert type(r1) == list # noqa: E721
20101990

2011-
def pgbench_table_checksums(self, dbname="postgres",
2012-
pgbench_tables=('pgbench_branches',
2013-
'pgbench_tellers',
2014-
'pgbench_accounts',
2015-
'pgbench_history')
2016-
):
2017-
return {(table, self.table_checksum(table, dbname))
2018-
for table in pgbench_tables}
1991+
r2 = set(r1)
1992+
assert type(r2) == set # noqa: E721
1993+
return r2
20191994

20201995
def set_auto_conf(self, options, config='postgresql.auto.conf', rm_options={}):
20211996
"""
@@ -2175,6 +2150,84 @@ def _escape_config_value(value):
21752150
result += "'"
21762151
return result
21772152

2153+
def _tables_checksum(
    self,
    dbname: str,
    tables: typing.Iterable[str],
) -> typing.List[typing.Tuple[str, int]]:
    """
    Compute checksums for several tables over a single connection.

    All tables are read inside one explicitly started transaction that is
    committed after the last table has been processed. Once the connection
    has been opened, it is closed on every exit path.

    Args:
        dbname: name of the database to connect to.
        tables: names of the tables to checksum.

    Returns:
        A list of (table name, checksum) pairs, in the iteration order
        of tables.
    """
    assert isinstance(tables, typing.Iterable)
    assert type(dbname) == str  # noqa: E721

    checksums = []

    cn = self.connect(dbname=dbname)
    assert type(cn) == NodeConnection  # noqa: E721

    try:
        # One transaction covers every table in the batch.
        cn.begin()

        for table_name in tables:
            assert type(table_name) == str  # noqa: E721
            checksum = __class__._table_checksum__use_cn(cn, table_name)
            assert type(checksum) == int  # noqa: E721
            checksums.append((table_name, checksum))

        cn.commit()
    finally:
        assert type(cn) == NodeConnection  # noqa: E721
        cn.close()

    assert type(checksums) == list  # noqa: E721
    return checksums
2182+
2183+
@staticmethod
def _table_checksum__use_cn(
    cn: NodeConnection,
    table: str,
) -> int:
    """
    Compute a checksum of one table using an already opened connection.

    Every row is selected as text and the Python hash() values of those
    texts are added up. The cursor is closed on every exit path; the
    connection itself is left open for the caller.

    Args:
        cn: connection whose underlying cursor runs the query.
        table: table name; the whole string is quoted as one delimited
            SQL identifier via _delim_sql_ident.

    Returns:
        The sum of hash() over the single column of each fetched row
        (0 when no rows are returned).
    """
    assert type(cn) == NodeConnection  # noqa: E721
    assert type(table) == str  # noqa: E721

    total = 0

    cursor = cn.connection.cursor()
    assert cursor is not None

    try:
        query = "SELECT t::text FROM {} as t".format(
            __class__._delim_sql_ident(table)
        )
        cursor.execute(query)

        # NOTE(review): if the driver returns str values, hash() is salted
        # per interpreter process unless PYTHONHASHSEED is fixed, so the
        # checksums are presumably only comparable within one run - confirm.
        row = cursor.fetchone()
        while row is not None:
            assert type(row) in [list, tuple]  # noqa: E721
            assert len(row) == 1
            total += hash(row[0])
            row = cursor.fetchone()
    finally:
        cursor.close()

    assert type(total) == int  # noqa: E721
    return total
2214+
2215+
@staticmethod
2216+
def _delim_sql_ident(name: str) -> str:
2217+
assert isinstance(name, str)
2218+
2219+
result = '"'
2220+
2221+
for ch in name:
2222+
if ch == '"':
2223+
result = result + '""'
2224+
else:
2225+
result = result + ch
2226+
2227+
result = result + '"'
2228+
2229+
return result
2230+
21782231

21792232
class PostgresNodeLogReader:
21802233
class LogInfo:

0 commit comments

Comments
 (0)