From 7114a015c56918c0ed3ae66fb3b0629a45a8614c Mon Sep 17 00:00:00 2001 From: ad-claw000 Date: Mon, 18 May 2026 16:57:05 -0700 Subject: [PATCH 01/34] fix: update pre-commit CI to use python 3.12 (#676) --- .github/workflows/checks.yml | 2 +- .github/workflows/pr.yaml | 2 +- aperturedb/CSVWriter.py | 24 +++++++++---- aperturedb/CommonLibrary.py | 16 ++++++--- aperturedb/ParallelQuery.py | 3 +- aperturedb/ParallelQuerySet.py | 6 ++-- aperturedb/Query.py | 6 ++-- aperturedb/Utils.py | 34 ++++++++++++++++--- aperturedb/cli/configure.py | 3 +- examples/CelebADataKaggle.py | 8 ++++- .../loading_with_models/get_tl_embeddings.py | 2 +- test/conftest.py | 5 ++- test/docker-compose.yml | 6 ++-- test/run_test_container.sh | 7 +++- test/test_Datawizard.py | 2 +- test/test_Server.py | 8 +++-- test/test_Stats.py | 6 ++-- 17 files changed, 106 insertions(+), 34 deletions(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index fcaf731b..70fe31c1 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -16,7 +16,7 @@ jobs: - uses: actions/setup-python@v3 with: - python-version: '3.10' + python-version: '3.12' - uses: pre-commit/action@v3.0.1 diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 284be9dc..6a0cc1e5 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -18,7 +18,7 @@ jobs: steps: - name: Cleanup previous run - run: docker run --rm -v ${{ github.workspace }}:/workspace alpine sh -c "rm -rf /workspace/test/aperturedb/db*" + run: docker run --rm -v ${{ github.workspace }}:/workspace alpine sh -c "rm -rf /workspace/test/aperturedb/db* /workspace/test/aperturedb/logs*" continue-on-error: true - uses: actions/checkout@v3 diff --git a/aperturedb/CSVWriter.py b/aperturedb/CSVWriter.py index dd5721d4..38b6ed48 100644 --- a/aperturedb/CSVWriter.py +++ b/aperturedb/CSVWriter.py @@ -30,7 +30,9 @@ def convert_entity_data(input, entity_class: str, unique_key: Optional[str] = No df = pd.DataFrame(input) df.insert(0, 'EntityClass', entity_class) if unique_key: - assert unique_key in df.columns, f"unique_key {unique_key} not found in the input data" + assert unique_key in df.columns, ( + f"unique_key {unique_key} not found in the input data" + ) df[f"constraint_{unique_key}"] = df[unique_key] return df @@ -66,7 +68,9 @@ def convert_image_data(input, source_column: str, source_type: Optional[str] = N """ df = pd.DataFrame(input) - assert source_column in df.columns, f"source_column {source_column} not found in the input data" + assert source_column in df.columns, ( + f"source_column {source_column} not found in the input data" + ) if source_type is None: source_type = source_column @@ -82,7 +86,9 @@ def convert_image_data(input, source_column: str, source_type: Optional[str] = N df.insert(0, source_type, df[source_column]) if unique_key is not None: - assert unique_key in df.columns, f"unique_key {unique_key} not found in the input data" + assert unique_key in df.columns, ( + f"unique_key {unique_key} not found in the input data" + ) df[f"constraint_{unique_key}"] = df[unique_key] if format is not None: @@ -140,11 +146,15 @@ def convert_connection_data(input, if source_column is None: source_column = source_property - assert source_column in df.columns, f"source_column {source_column} not found in the input data" + assert source_column in df.columns, ( + f"source_column {source_column} not found in the input data" + ) if destination_column is None: destination_column = destination_property - assert destination_column in df.columns, f"destination_column {destination_column} not found in the input data" + assert destination_column in df.columns, ( + f"destination_column {destination_column} not found in the input data" + ) df.insert(0, 'ConnectionClass', connection_class) df.insert(1, f"{source_class}@{source_property}", df[source_column]) @@ -152,7 +162,9 @@ def convert_connection_data(input, df[destination_column]) if unique_key: - assert unique_key in df.columns, f"unique_key {unique_key} not found in the input data" + assert unique_key in df.columns, ( + f"unique_key {unique_key} not found in the input data" + ) df[f"constraint_{unique_key}"] = df[unique_key] return df diff --git a/aperturedb/CommonLibrary.py b/aperturedb/CommonLibrary.py index 2ffd665a..49b562a0 100644 --- a/aperturedb/CommonLibrary.py +++ b/aperturedb/CommonLibrary.py @@ -83,9 +83,15 @@ def _create_configuration_from_json(config: Union[Dict, str], clean_config = {k: v for k, v in config.items() if k != "password"} # These fields are required. - assert "host" in config, f"host is required in the configuration: {clean_config}" - assert "username" in config, f"username is required in the configuration: {clean_config}" - assert "password" in config, f"password is required in the configuration: {clean_config}" + assert "host" in config, ( + f"host is required in the configuration: {clean_config}" + ) + assert "username" in config, ( + f"username is required in the configuration: {clean_config}" + ) + assert "password" in config, ( + f"password is required in the configuration: {clean_config}" + ) # These fields have no default in the Configuration class. if 'port' not in config: @@ -95,7 +101,9 @@ def _create_configuration_from_json(config: Union[Dict, str], config["name"] = name # will overwrite the name in the config if name_required: - assert "name" in config, f"name is required in the configuration: {clean_config}" + assert "name" in config, ( + f"name is required in the configuration: {clean_config}" + ) elif 'name' not in config: config["name"] = "from_json" diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index 82cdfe68..31fb840e 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -308,7 +308,8 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool f"Could not determine query structure from:\n{generator[0]}") logger.error(type(generator[0])) logger.info( - f"Commands per query = {self.commands_per_query}, Blobs per query = {self.blobs_per_query}" + f"Commands per query = {self.commands_per_query}, " + f"Blobs per query = {self.blobs_per_query}" ) self.batched_run(generator, batchsize, numthreads, stats) diff --git a/aperturedb/ParallelQuerySet.py b/aperturedb/ParallelQuerySet.py index 6f12e2c8..de1fb81b 100644 --- a/aperturedb/ParallelQuerySet.py +++ b/aperturedb/ParallelQuerySet.py @@ -152,11 +152,13 @@ def first_only_blobs(all_blobs, strike_list, set_nm): blobs_this_set = len(blob_filter(blob_set, [], i)) expected_blobs = blobs_per_query[i] * batch_size logger.info( - f"Set {i}: Commands per query = {commands_per_query[i]}, Blobs per query = {blobs_per_query[i]}" + f"Set {i}: Commands per query = {commands_per_query[i]}, " + f"Blobs per query = {blobs_per_query[i]}" ) if blobs_this_set != expected_blobs: logger.error( - f"Set {i}: Expected {expected_blobs} blobs, but filter is returning {blobs_this_set}" + f"Set {i}: Expected {expected_blobs} blobs, " + f"but filter is returning {blobs_this_set}" ) # now we determine if the executing set has a constraint diff --git a/aperturedb/Query.py b/aperturedb/Query.py index b067760e..60fe02de 100644 --- a/aperturedb/Query.py +++ b/aperturedb/Query.py @@ -87,8 +87,10 @@ def get_specific(obj: BaseModel) -> dict: start, stop = obj.start, obj.stop if obj.range_type == RangeType.TIME: start, stop = int(start), int(stop) - start = f"{start//3600:0>2}:{start//60:0>2}:{start%60:0>2}" - stop = f"{stop//3600:0>2}:{stop//60:0>2}:{stop%60:0>2}" + start = "{:0>2}:{:0>2}:{:0>2}".format( + start // 3600, (start // 60) % 60, start % 60) + stop = "{:0>2}:{:0>2}:{:0>2}".format( + stop // 3600, (stop // 60) % 60, stop % 60) elif obj.range_type == RangeType.FRAME: start = int(obj.start) stop = int(obj.stop) diff --git a/aperturedb/Utils.py b/aperturedb/Utils.py index 8be817d0..c062e991 100644 --- a/aperturedb/Utils.py +++ b/aperturedb/Utils.py @@ -182,7 +182,17 @@ def visualize_schema(self, filename: str = None, format: str = "png") -> Source: {entity} ({matched:,}) ''' for prop, (matched, indexed, typ) in properties.items(): - table += f'{prop.strip()} {matched:,} {"Indexed" if indexed else "Unindexed"}, {typ}' + bg = colors["property_background"] + fg = colors["property_foreground"] + idx_str = "Indexed" if indexed else "Unindexed" + table += ( + f'' + f'{prop.strip()} ' + f'' + f'{matched:,} ' + f'' + f'{idx_str}, {typ}' + ) for connection, data in connections.items(): data_list = [data] if isinstance(data, dict) else data for data in data_list: @@ -190,10 +200,26 @@ def visualize_schema(self, filename: str = None, format: str = "png") -> Source: matched = data["matched"] # dictionary from name to (matched, indexed, type) properties = data["properties"] - table += f'{connection} ({matched:,})' + c_bg = colors["connection_background"] + c_fg = colors["connection_foreground"] + table += ( + '' + '{} ({:,})' + ).format(c_bg, connection, c_fg, connection, matched) if properties: for prop, (matched, indexed, typ) in properties.items(): - table += f'{prop.strip()} {matched:,} {"Indexed" if indexed else "Unindexed"}, {typ}' + cp_bg = colors["connection_property_background"] + cp_fg = colors["connection_property_foreground"] + idx_str = "Indexed" if indexed else "Unindexed" + table += ( + '' + '{} ' + '' + '{} ' + '' + '{}, {}' + ).format(cp_bg, cp_fg, prop.strip(), cp_bg, cp_fg, f"{matched:,}", cp_bg, cp_fg, idx_str, typ) table += '>' dot.node(entity, label=table) @@ -243,7 +269,7 @@ def _object_summary(self, name, object): w = "!" if "id" in k and not p[k][1] else w print(f"{i} {w} {p[k][2].ljust(8)} |" f" {k.ljust(max)} | {str(p[k][0]).rjust(9)} " - f"({int(p[k][0]/total_elements*100.0)}%)") + f"({int(p[k][0] / total_elements * 100.0)}%)") return total_elements diff --git a/aperturedb/cli/configure.py b/aperturedb/cli/configure.py index c0ae58e4..5afd2ba6 100644 --- a/aperturedb/cli/configure.py +++ b/aperturedb/cli/configure.py @@ -193,7 +193,8 @@ def create( def check_for_overwrite(name): if name in configs and not overwrite: console.log( - f"Configuration named '{name}' already exists. Use --overwrite to overwrite.", + "Configuration named '{}' already exists. Use --overwrite to overwrite.".format( + name), style="bold yellow") raise typer.Exit(code=2) diff --git a/examples/CelebADataKaggle.py b/examples/CelebADataKaggle.py index a995e669..73195dd9 100644 --- a/examples/CelebADataKaggle.py +++ b/examples/CelebADataKaggle.py @@ -62,7 +62,13 @@ def generate_query(self, idx: int) -> Tuple[List[dict], List[bytes]]: } } ] - q[0]["AddImage"]["properties"]["keypoints"] = f"10 {p['lefteye_x']} {p['lefteye_y']} {p['righteye_x']} {p['righteye_y']} {p['nose_x']} {p['nose_y']} {p['leftmouth_x']} {p['leftmouth_y']} {p['rightmouth_x']} {p['rightmouth_y']}" + q[0]["AddImage"]["properties"]["keypoints"] = ( + f"10 {p['lefteye_x']} {p['lefteye_y']} " + f"{p['righteye_x']} {p['righteye_y']} " + f"{p['nose_x']} {p['nose_y']} " + f"{p['leftmouth_x']} {p['leftmouth_y']} " + f"{p['rightmouth_x']} {p['rightmouth_y']}" + ) image_file_name = os.path.join( self.workdir, diff --git a/examples/loading_with_models/get_tl_embeddings.py b/examples/loading_with_models/get_tl_embeddings.py index 5ae5bd02..5b76149c 100644 --- a/examples/loading_with_models/get_tl_embeddings.py +++ b/examples/loading_with_models/get_tl_embeddings.py @@ -59,7 +59,7 @@ def generate_text_embeddings(text: str): print(f"Generated {len(embeddings)} embeddings for the video") for i, emb in enumerate(embeddings): - print(f"Embedding {i+1}:") + print(f"Embedding {i + 1}:") print(f" Scope: {emb['embedding_scope']}") print( f" Time range: {emb['start_offset_sec']} - {emb['end_offset_sec']} seconds") diff --git a/test/conftest.py b/test/conftest.py index f5144220..1a602d1a 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -145,7 +145,10 @@ def insert_data_from_csv(in_csv_file, rec_count=-1, expected_error_count=0, load for tp, classes in expected_indices.items(): for cls, props in classes.items(): for prop in props: - err_msg = f"Index {prop} not found for {cls}, {expected_indices=}, {observed_indices=}" + err_msg = ( + f"Index {prop} not found for {cls}, " + f"{expected_indices=}, {observed_indices=}" + ) assert prop in observed_indices[tp][cls], err_msg assert loader.error_counter == 0 assert len(data) - \ diff --git a/test/docker-compose.yml b/test/docker-compose.yml index a1395ad1..2ae566a3 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -25,7 +25,7 @@ services: condition: service_started image: $LENZ_REPO:$LENZ_TAG ports: - - $GATEWAY:55556:55551 + - "$GATEWAY:0:55551" restart: always environment: LNZ_HEALTH_PORT: 58085 @@ -65,8 +65,8 @@ services: image: nginx restart: always ports: - - $GATEWAY:8087:80 - - $GATEWAY:8443:443 + - "$GATEWAY:0:80" + - "$GATEWAY:0:443" configs: - source: nginx.conf target: /etc/nginx/conf.d/default.conf diff --git a/test/run_test_container.sh b/test/run_test_container.sh index 99e23725..eb2536c4 100755 --- a/test/run_test_container.sh +++ b/test/run_test_container.sh @@ -25,7 +25,12 @@ function run_aperturedb_instance(){ docker network create ${TAG}_host_default GATEWAY=$(docker network inspect ${TAG}_host_default | jq -r .[0].IPAM.Config[0].Gateway) GATEWAY=$GATEWAY RUNNER_NAME=$TAG docker compose -f docker-compose.yml up -d - echo "$GATEWAY" + if [ "$TAG" == "${RUNNER_NAME}_http" ]; then + PORT=$(RUNNER_NAME=$TAG docker compose -f docker-compose.yml port nginx 80 | cut -d: -f2) + else + PORT=$(RUNNER_NAME=$TAG docker compose -f docker-compose.yml port lenz 55551 | cut -d: -f2) + fi + echo "$GATEWAY:$PORT" } IP_REGEX='[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}' diff --git a/test/test_Datawizard.py b/test/test_Datawizard.py index 932b60b9..f956e90c 100644 --- a/test/test_Datawizard.py +++ b/test/test_Datawizard.py @@ -123,7 +123,7 @@ def make_hand(side: Side) -> Hand: people = [] for i in range(10): - person = Person(name=f"adam{i+1}") + person = Person(name=f"adam{i + 1}") left_hand = make_hand(Side.LEFT) right_hand = make_hand(Side.RIGHT) person.hands.extend([left_hand, right_hand]) diff --git a/test/test_Server.py b/test/test_Server.py index 1ca18cda..38bcf3c7 100644 --- a/test/test_Server.py +++ b/test/test_Server.py @@ -62,8 +62,12 @@ def test_response_half_non_unique(a: Connector, query, blobs): "entity": {"_Image": {"id"}}}) input_data = pd.read_csv("./input/images.adb.csv") data, loader = insert_data_from_csv( - in_csv_file = "./input/images.adb.csv", expected_error_count = len(input_data)) - assert loader.error_counter == 0, f"Error counter: {loader.error_counter=}" + in_csv_file="./input/images.adb.csv", + expected_error_count=len(input_data) + ) + assert loader.error_counter == 0, ( + f"Error counter: {loader.error_counter=}" + ) assert loader.get_succeeded_queries( ) == 0, f"Queries: {loader.get_succeeded_queries()=}" assert loader.get_succeeded_commands( diff --git a/test/test_Stats.py b/test/test_Stats.py index 483c758a..e9785db8 100644 --- a/test/test_Stats.py +++ b/test/test_Stats.py @@ -31,8 +31,10 @@ def validate_stats(self, out, assertions): first, second = line.split(":") print(first, second) if first in assertions: - assert assertions[first.strip()](second.strip()) == True, \ - f"Assertion failed for '{first}' with value {second}" + assert assertions[first.strip()](second.strip()) is True, ( + f"Assertion failed for '{first}' " + f"with value {second}" + ) def test_stats_all_errors_non_equal_last_batch(self, db, utils): utils.remove_all_objects() From 79659573da683c14c496ea1c79022f4264adbb2e Mon Sep 17 00:00:00 2001 From: ad-claw000 Date: Mon, 18 May 2026 19:03:29 -0700 Subject: [PATCH 02/34] test: add initial suite of tests for Images.py (#674) --- test/test_Images.py | 116 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 test/test_Images.py diff --git a/test/test_Images.py b/test/test_Images.py new file mode 100644 index 00000000..43e54dac --- /dev/null +++ b/test/test_Images.py @@ -0,0 +1,116 @@ +import numpy as np +from aperturedb.Images import Images, resolve, rotate +from unittest.mock import patch + + +def test_rotate(): + points = np.array([(10, 10), (20, 20)]) + rotated = rotate(points, 90, c_x=10, c_y=10) + assert len(rotated) == 2 + assert rotated[0][0] == 10 and rotated[0][1] == 10 + assert rotated[1][0] == 0 and rotated[1][1] == 20 + + +def test_resolve_resize(): + points = np.array([[10, 10], [20, 20]], dtype=float) + meta = {"adb_image_width": 100, "adb_image_height": 100} + operations = [{"type": "resize", "width": 50, "height": 50}] + resolved = resolve(points, meta, operations) + assert resolved[0][0] == 5 + assert resolved[0][1] == 5 + assert resolved[1][0] == 10 + assert resolved[1][1] == 10 + + +def test_resolve_rotate(): + points = np.array([[10, 10]], dtype=float) + meta = {"adb_image_width": 100, "adb_image_height": 100} + operations = [{"type": "rotate", "angle": 90}] + resolved = resolve(points, meta, operations) + assert len(resolved) == 1 + # Note: 9 instead of 10 due to float truncation in .astype(int) + assert resolved[0][0] == 90 and resolved[0][1] == 9 + + +class MockClient: + def __init__(self): + self.responses = [] + self.queries = [] + + def query(self, q, blobs=None): + if blobs is None: + blobs = [] + self.queries.append(q) + return self.responses.pop(0) if self.responses else ([{}], []) + + def last_query_ok(self): + return True + + +def test_Images_init(): + client = MockClient() + img = Images(client) + assert img.client == client + assert img.db_object.value == "_Image" + + +def test_Images_search(): + client = MockClient() + with patch('aperturedb.Images.execute_query') as mock_execute: + mock_execute.return_value = ( + 0, [{"FindImage": {"entities": [{"_uniqueid": "123"}, {"_uniqueid": "456"}]}}], []) + img = Images(client) + img.search(limit=2) + assert "123" in img.images_ids + assert "456" in img.images_ids + mock_execute.assert_called_once() + query_passed = mock_execute.call_args[1][ + "query"] if "query" in mock_execute.call_args[1] else mock_execute.call_args[0][1] + assert "FindImage" in query_passed[0] + assert query_passed[0]["FindImage"]["results"]["limit"] == 2 + + +def test_Images_search_by_property(): + client = MockClient() + with patch('aperturedb.Images.execute_query') as mock_execute: + mock_execute.return_value = ( + 0, [{"FindImage": {"entities": [{"_uniqueid": "789"}]}}], []) + img = Images(client) + img.search_by_property("label", ["test_label"]) + assert "789" in img.images_ids + query_passed = mock_execute.call_args[1][ + "query"] if "query" in mock_execute.call_args[1] else mock_execute.call_args[0][1] + assert "constraints" in query_passed[0]["FindImage"] + + +def test_Images_get_image_by_index(): + client = MockClient() + img = Images(client) + img.images_ids = ["111"] + + with patch('aperturedb.Images.execute_query') as mock_execute: + mock_execute.return_value = (0, [], [b'fakeimageblob']) + # Override last_query_ok since MockClient does that + client.last_query_ok = lambda: True + + res = img.get_image_by_index(0) + assert res == b'fakeimageblob' + assert "111" in img.images + + +def test_Images_get_np_image_by_index(): + client = MockClient() + img = Images(client) + img.images_ids = ["111"] + + with patch('aperturedb.Images.execute_query') as mock_execute: + # Create a small valid jpeg or png mock blob + import cv2 + fake_np = np.zeros((10, 10, 3), dtype=np.uint8) + _, fake_blob = cv2.imencode('.jpg', fake_np) + + mock_execute.return_value = (0, [], [fake_blob.tobytes()]) + client.last_query_ok = lambda: True + + res = img.get_np_image_by_index(0) + assert res.shape == (10, 10, 3) From 8601cf46f3373f1af86d825c48d23e3f2456773c Mon Sep 17 00:00:00 2001 From: ad-claw000 Date: Mon, 4 May 2026 09:48:02 +0000 Subject: [PATCH 03/34] feat(parallel): allow dynamic batch sizing by byte limits --- aperturedb/ParallelQuery.py | 110 ++++++++++++++++++++++++++++-------- 1 file changed, 85 insertions(+), 25 deletions(-) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index 31fb840e..6d7a0d2c 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -218,31 +218,90 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: # A new connection will be created for each thread client = self.client.clone() - total_batches = (end - start) // self.batchsize - - if (end - start) % self.batchsize > 0: - total_batches += 1 - - logger.info( - f"Worker {thid} executing {total_batches} batches, {self.stats=}") - for i in range(total_batches): - if not run_event.is_set(): - break - batch_start = start + i * self.batchsize - batch_end = min(batch_start + self.batchsize, end) - - try: - self.do_batch(client, batch_start, - generator[batch_start:batch_end]) - except Exception as e: - logger.exception(e) - logger.warning( - f"Worker {thid} failed to execute batch {i}: [{batch_start},{batch_end}]") - self.error_counter += 1 + max_bytes = getattr(self, "max_bytes_per_batch", None) + + if max_bytes is not None and max_bytes > 0: + logger.info(f"Worker {thid} executing dynamically sized batches (max { + max_bytes} bytes), {self.stats=}") + current_batch = [] + current_bytes = 0 + batch_start = start + + for i in range(start, end): + if not run_event.is_set(): + break + + item = generator[i] + + # Estimate item size (mostly blobs + some json overhead) + item_bytes = 0 + for blob in item[1]: + if isinstance(blob, bytes): + item_bytes += len(blob) + else: + item_bytes += len(str(blob)) + + if len(current_batch) > 0 and (current_bytes + item_bytes > max_bytes): + try: + self.do_batch(client, batch_start, current_batch) + except Exception as e: + logger.exception(e) + logger.warning( + f"Worker {thid} failed to execute dynamic batch starting at {batch_start}") + self.error_counter += 1 + + if self.stats: + self.pb.update(len(current_batch)) + + current_batch = [] + current_bytes = 0 + batch_start = i + + current_batch.append(item) + current_bytes += item_bytes + + # Remainder + if len(current_batch) > 0 and run_event.is_set(): + try: + self.do_batch(client, batch_start, current_batch) + except Exception as e: + logger.exception(e) + logger.warning(f"Worker { + thid} failed to execute dynamic batch remainder starting at {batch_start}") + self.error_counter += 1 + + if self.stats: + self.pb.update(len(current_batch)) + + logger.info( + f"Worker {thid} finished executing dynamically sized batches") + + else: + total_batches = (end - start) // self.batchsize - if self.stats: - self.pb.update(batch_end - batch_start) - logger.info(f"Worker {thid} executed {total_batches} batches") + if (end - start) % self.batchsize > 0: + total_batches += 1 + + logger.info( + f"Worker {thid} executing {total_batches} batches, {self.stats=}") + for i in range(total_batches): + if not run_event.is_set(): + break + batch_start = start + i * self.batchsize + batch_end = min(batch_start + self.batchsize, end) + + try: + self.do_batch(client, batch_start, + generator[batch_start:batch_end]) + except Exception as e: + logger.exception(e) + logger.warning( + f"Worker {thid} failed to execute batch {i}: [{batch_start},{batch_end}]") + self.error_counter += 1 + + if self.stats: + self.pb.update(batch_end - batch_start) + logger.info(f"Worker {thid} executed {total_batches} batches") def get_objects_existed(self) -> int: return sum([stat["objects_existed"] @@ -256,7 +315,8 @@ def get_succeeded_commands(self) -> int: return sum([stat["succeeded_commands"] for stat in self.actual_stats]) - def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool = False) -> None: + def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool = False, max_bytes_per_batch: int = None) -> None: + self.max_bytes_per_batch = max_bytes_per_batch """ This function takes as input the data to be executed in specified number of threads. The generator yields a tuple : (array of commands, array of blobs) From 8f117da5ebbd31f2f9c1f744512591ba1cf4ef29 Mon Sep 17 00:00:00 2001 From: ad-claw000 Date: Mon, 4 May 2026 12:45:53 +0000 Subject: [PATCH 04/34] fix(syntax): resolve unterminated string literal in ParallelQuery.py --- aperturedb/ParallelQuery.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index 6d7a0d2c..83b4cac4 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -266,8 +266,7 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: self.do_batch(client, batch_start, current_batch) except Exception as e: logger.exception(e) - logger.warning(f"Worker { - thid} failed to execute dynamic batch remainder starting at {batch_start}") + logger.warning(f"Worker {thid} failed to execute dynamic batch remainder starting at {batch_start}") self.error_counter += 1 if self.stats: From 9c8d41d3f3932deca727d0e3a5704f97560439fb Mon Sep 17 00:00:00 2001 From: ad-claw000 Date: Tue, 19 May 2026 03:03:00 +0000 Subject: [PATCH 05/34] fix: pre-commit formatting --- aperturedb/ParallelQuery.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index 83b4cac4..6d7a0d2c 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -266,7 +266,8 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: self.do_batch(client, batch_start, current_batch) except Exception as e: logger.exception(e) - logger.warning(f"Worker {thid} failed to execute dynamic batch remainder starting at {batch_start}") + logger.warning(f"Worker { + thid} failed to execute dynamic batch remainder starting at {batch_start}") self.error_counter += 1 if self.stats: From 719829aaf7b90e0e778945c0ddb2ee00d348f8a9 Mon Sep 17 00:00:00 2001 From: ad-claw000 Date: Tue, 19 May 2026 16:46:03 +0000 Subject: [PATCH 06/34] fix(parallel): fix unterminated string literal in ParallelQuery.py --- aperturedb/ParallelQuery.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index 6d7a0d2c..33aa8cf1 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -221,8 +221,7 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: max_bytes = getattr(self, "max_bytes_per_batch", None) if max_bytes is not None and max_bytes > 0: - logger.info(f"Worker {thid} executing dynamically sized batches (max { - max_bytes} bytes), {self.stats=}") + logger.info(f"Worker {thid} executing dynamically sized batches (max {max_bytes} bytes), {self.stats=}") current_batch = [] current_bytes = 0 batch_start = start From fc062dd8b08c63b616d7717e6665c41674b2f69a Mon Sep 17 00:00:00 2001 From: claw Date: Tue, 19 May 2026 20:13:04 +0000 Subject: [PATCH 07/34] style: fix autopep8 formatting for dynamic batchsize log messages --- aperturedb/ParallelQuery.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index 33aa8cf1..6d7a0d2c 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -221,7 +221,8 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: max_bytes = getattr(self, "max_bytes_per_batch", None) if max_bytes is not None and max_bytes > 0: - logger.info(f"Worker {thid} executing dynamically sized batches (max {max_bytes} bytes), {self.stats=}") + logger.info(f"Worker {thid} executing dynamically sized batches (max { + max_bytes} bytes), {self.stats=}") current_batch = [] current_bytes = 0 batch_start = start From 43a38106a9bf5ac502358ce8589721003c761ddc Mon Sep 17 00:00:00 2001 From: ad-claw000 Date: Wed, 20 May 2026 00:53:57 +0000 Subject: [PATCH 08/34] Fix PR review comments --- aperturedb/ParallelQuery.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index 6d7a0d2c..eefae88b 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -1,5 +1,5 @@ from __future__ import annotations -from typing import List, Tuple +from typing import List, Tuple, Optional from aperturedb import Parallelizer import numpy as np import logging @@ -221,8 +221,7 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: max_bytes = getattr(self, "max_bytes_per_batch", None) if max_bytes is not None and max_bytes > 0: - logger.info(f"Worker {thid} executing dynamically sized batches (max { - max_bytes} bytes), {self.stats=}") + logger.info(f"Worker {thid} executing dynamically sized batches (max {max_bytes} bytes), {self.stats=}") current_batch = [] current_bytes = 0 batch_start = start @@ -236,10 +235,10 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: # Estimate item size (mostly blobs + some json overhead) item_bytes = 0 for blob in item[1]: - if isinstance(blob, bytes): + if isinstance(blob, (bytes, bytearray, memoryview)): item_bytes += len(blob) else: - item_bytes += len(str(blob)) + item_bytes += 100 if len(current_batch) > 0 and (current_bytes + item_bytes > max_bytes): try: @@ -266,8 +265,7 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: self.do_batch(client, batch_start, current_batch) except Exception as e: logger.exception(e) - logger.warning(f"Worker { - thid} failed to execute dynamic batch remainder starting at {batch_start}") + logger.warning(f"Worker {thid} failed to execute dynamic batch remainder starting at {batch_start}") self.error_counter += 1 if self.stats: @@ -315,8 +313,7 @@ def get_succeeded_commands(self) -> int: return sum([stat["succeeded_commands"] for stat in self.actual_stats]) - def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool = False, max_bytes_per_batch: int = None) -> None: - self.max_bytes_per_batch = max_bytes_per_batch + def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool = False, max_bytes_per_batch: Optional[int] = None) -> None: """ This function takes as input the data to be executed in specified number of threads. The generator yields a tuple : (array of commands, array of blobs) @@ -326,6 +323,7 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool numthreads (int, optional): Number of parallel workers. Defaults to 4. stats (bool, optional): Show statistics at end of ingestion. Defaults to False. """ + self.max_bytes_per_batch = max_bytes_per_batch use_dask = hasattr(generator, "use_dask") and generator.use_dask if use_dask: From efe4f9564b5c4f8c1a315c680d943e9117ae7c3e Mon Sep 17 00:00:00 2001 From: claw Date: Wed, 20 May 2026 01:13:08 +0000 Subject: [PATCH 09/34] chore: apply pre-commit formatting --- aperturedb/ParallelQuery.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index eefae88b..0b5e2b48 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -221,7 +221,8 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: max_bytes = getattr(self, "max_bytes_per_batch", None) if max_bytes is not None and max_bytes > 0: - logger.info(f"Worker {thid} executing dynamically sized batches (max {max_bytes} bytes), {self.stats=}") + logger.info(f"Worker {thid} executing dynamically sized batches (max { + max_bytes} bytes), {self.stats=}") current_batch = [] current_bytes = 0 batch_start = start @@ -265,7 +266,8 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: self.do_batch(client, batch_start, current_batch) except Exception as e: logger.exception(e) - logger.warning(f"Worker {thid} failed to execute dynamic batch remainder starting at {batch_start}") + logger.warning(f"Worker { + thid} failed to execute dynamic batch remainder starting at {batch_start}") self.error_counter += 1 if self.stats: From ed40546967d1619879f257bb1497b663998b3e88 Mon Sep 17 00:00:00 2001 From: claw Date: Wed, 20 May 2026 01:48:31 +0000 Subject: [PATCH 10/34] fix(parallel): fix f-string syntax errors --- aperturedb/ParallelQuery.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index 0b5e2b48..0bc1d9ea 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -221,8 +221,8 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: max_bytes = getattr(self, "max_bytes_per_batch", None) if max_bytes is not None and max_bytes > 0: - logger.info(f"Worker {thid} executing dynamically sized batches (max { - max_bytes} bytes), {self.stats=}") + logger.info( + f"Worker {thid} executing dynamically sized batches (max {max_bytes} bytes), {self.stats=}") current_batch = [] current_bytes = 0 batch_start = start @@ -266,8 +266,8 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: self.do_batch(client, batch_start, current_batch) except Exception as e: logger.exception(e) - logger.warning(f"Worker { - thid} failed to execute dynamic batch remainder starting at {batch_start}") + logger.warning( + f"Worker {thid} failed to execute dynamic batch remainder starting at {batch_start}") self.error_counter += 1 if self.stats: From aaffdb4f618e1297ef95b899914313aa45941ac8 Mon Sep 17 00:00:00 2001 From: claw Date: Wed, 20 May 2026 01:55:17 +0000 Subject: [PATCH 11/34] test(parallel): add dynamic batching tests --- test/test_Parallel.py | 50 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/test/test_Parallel.py b/test/test_Parallel.py index 0982e97e..fc4ec6e4 100644 --- a/test/test_Parallel.py +++ b/test/test_Parallel.py @@ -81,3 +81,53 @@ def test_allBadQueries(self, db: Connector): print(e) print("Failed to renew Session") assert False + + +class GeneratorWithLargeBlobs(Subscriptable): + def __init__(self, elements=10, blob_size=100) -> None: + super().__init__() + self.elements = elements + self.blob_size = blob_size + + def __len__(self): + return self.elements + + def getitem(self, subscript): + query = [{"FindBlob": {}}] + blobs = [b"0" * self.blob_size] + return query, blobs + + +def test_dynamic_batching(): + db = Connector() + db.query = lambda q, b: ([{"FindBlob": {"status": 0}} + for _ in range(len(q))], []) + db.last_query_ok = lambda: True + + # 10 elements, 100 bytes each + generator = GeneratorWithLargeBlobs(10, 100) + querier = ParallelQuery(db) + + # limit to 150 bytes -> should process 1 element per batch despite batchsize=5 + querier.query(generator, batchsize=5, numthreads=1, + max_bytes_per_batch=150) + + # It should have succeeded in processing all queries + assert querier.get_succeeded_queries() == 10 + + +def test_dynamic_batching_oversized_item(): + db = Connector() + db.query = lambda q, b: ([{"FindBlob": {"status": 0}} + for _ in range(len(q))], []) + db.last_query_ok = lambda: True + + # 10 elements, 100 bytes each + generator = GeneratorWithLargeBlobs(10, 100) + querier = ParallelQuery(db) + + # limit to 50 bytes -> item size (100) > max_bytes (50). Should log warning and process 1 per batch. + querier.query(generator, batchsize=5, numthreads=1, max_bytes_per_batch=50) + + # It should have succeeded in processing all queries + assert querier.get_succeeded_queries() == 10 From dd094cfdb460844c235d0cc9dbb6819d2958db05 Mon Sep 17 00:00:00 2001 From: claw Date: Wed, 20 May 2026 02:11:12 +0000 Subject: [PATCH 12/34] test(parallel): make mocked dynamic tests mock clone as well --- test/test_Parallel.py | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/test/test_Parallel.py b/test/test_Parallel.py index fc4ec6e4..c06f8091 100644 --- a/test/test_Parallel.py +++ b/test/test_Parallel.py @@ -98,11 +98,21 @@ def getitem(self, subscript): return query, blobs +class MockClient: + def clone(self): + return self + + def query(self, q, b): + self.queries.append(q) + return ([{"FindBlob": {"status": 0}} for _ in range(len(q))], []) + + def last_query_ok(self): + return True + + def test_dynamic_batching(): - db = Connector() - db.query = lambda q, b: ([{"FindBlob": {"status": 0}} - for _ in range(len(q))], []) - db.last_query_ok = lambda: True + db = MockClient() + db.queries = [] # 10 elements, 100 bytes each generator = GeneratorWithLargeBlobs(10, 100) @@ -114,13 +124,14 @@ def test_dynamic_batching(): # It should have succeeded in processing all queries assert querier.get_succeeded_queries() == 10 + assert len(db.queries) == 10 + for q in db.queries: + assert len(q) == 1 def test_dynamic_batching_oversized_item(): - db = Connector() - db.query = lambda q, b: ([{"FindBlob": {"status": 0}} - for _ in range(len(q))], []) - db.last_query_ok = lambda: True + db = MockClient() + db.queries = [] # 10 elements, 100 bytes each generator = GeneratorWithLargeBlobs(10, 100) @@ -131,3 +142,6 @@ def test_dynamic_batching_oversized_item(): # It should have succeeded in processing all queries assert querier.get_succeeded_queries() == 10 + assert len(db.queries) == 10 + for q in db.queries: + assert len(q) == 1 From 147727ff550ba7fb372c179717886d470646a2f9 Mon Sep 17 00:00:00 2001 From: claw Date: Wed, 20 May 2026 02:32:27 +0000 Subject: [PATCH 13/34] feat(parallel): pass max_bytes_per_batch to dask --- aperturedb/DaskManager.py | 6 +++--- aperturedb/ParallelQuery.py | 8 ++++++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/aperturedb/DaskManager.py b/aperturedb/DaskManager.py index 7e2e4b3f..e57c7254 100644 --- a/aperturedb/DaskManager.py +++ b/aperturedb/DaskManager.py @@ -41,8 +41,8 @@ def __del__(self): self._client.close() self._cluster.close() - def run(self, QueryClass: type[ParallelQuery], client: Connector, generator, batchsize, stats): - def process(df, host, port, use_ssl, ca_cert, verify_hostname, session, connnector_type): + def run(self, QueryClass: type[ParallelQuery], client: Connector, generator, batchsize, stats, **kwargs): + def process(df, host, port, use_ssl, ca_cert, verify_hostname, session, connector_type, kwargs_dict): metrics = Stats() # Dask reads data in partitions, and the first partition is of 2 rows, with all # values as 'foo'. This is for sampling the column names and types. Should not process @@ -55,7 +55,7 @@ def process(df, host, port, use_ssl, ca_cert, verify_hostname, session, connnect shared_data = SimpleNamespace() shared_data.session = session shared_data.lock = Lock() - client = connnector_type( + client = connector_type( host=host, port=port, use_ssl=use_ssl, ca_cert=ca_cert, diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index 0bc1d9ea..35a21579 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -241,7 +241,10 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: else: item_bytes += 100 - if len(current_batch) > 0 and (current_bytes + item_bytes > max_bytes): + if len(current_batch) == 0 and item_bytes > max_bytes: + logger.warning( + f"Worker {thid} executing batch starting at {batch_start} that exceeds max_bytes_per_batch: {item_bytes} > {max_bytes}") + elif len(current_batch) > 0 and (current_bytes + item_bytes > max_bytes): try: self.do_batch(client, batch_start, current_batch) except Exception as e: @@ -324,6 +327,7 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool batchsize (int, optional): Number of queries per transaction. Defaults to 1. numthreads (int, optional): Number of parallel workers. Defaults to 4. stats (bool, optional): Show statistics at end of ingestion. Defaults to False. + max_bytes_per_batch (int, optional): The maximum number of bytes allowed per batch. Overrides batchsize if the blob sizes exceed this limit. Default is None. """ self.max_bytes_per_batch = max_bytes_per_batch @@ -337,7 +341,7 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool if use_dask: results, self.total_actions_time = self.daskmanager.run( - self.__class__, self.client, generator, batchsize, stats=stats) + self.__class__, self.client, generator, batchsize, stats=stats, max_bytes_per_batch=self.max_bytes_per_batch) self.actual_stats = [] for result in results: if result is not None: From 2c53767b1f9d83fa3c308735d27b26f4f0afe7c4 Mon Sep 17 00:00:00 2001 From: claw Date: Wed, 20 May 2026 02:47:06 +0000 Subject: [PATCH 14/34] test: fix shell variable expansion in test runner --- test/run_test_container.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/run_test_container.sh b/test/run_test_container.sh index eb2536c4..52c9519c 100755 --- a/test/run_test_container.sh +++ b/test/run_test_container.sh @@ -25,7 +25,7 @@ function run_aperturedb_instance(){ docker network create ${TAG}_host_default GATEWAY=$(docker network inspect ${TAG}_host_default | jq -r .[0].IPAM.Config[0].Gateway) GATEWAY=$GATEWAY RUNNER_NAME=$TAG docker compose -f docker-compose.yml up -d - if [ "$TAG" == "${RUNNER_NAME}_http" ]; then + if [[ "$TAG" == *_http ]]; then PORT=$(RUNNER_NAME=$TAG docker compose -f docker-compose.yml port nginx 80 | cut -d: -f2) else PORT=$(RUNNER_NAME=$TAG docker compose -f docker-compose.yml port lenz 55551 | cut -d: -f2) From 8c0ca4fefad40b8d7d88b1ed505b365be16c83b5 Mon Sep 17 00:00:00 2001 From: claw Date: Wed, 20 May 2026 02:51:44 +0000 Subject: [PATCH 15/34] fix(parallel): fix types and mock dependencies --- aperturedb/DaskManager.py | 2 +- test/test_Parallel.py | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/aperturedb/DaskManager.py b/aperturedb/DaskManager.py index e57c7254..b51de234 100644 --- a/aperturedb/DaskManager.py +++ b/aperturedb/DaskManager.py @@ -41,7 +41,7 @@ def __del__(self): self._client.close() self._cluster.close() - def run(self, QueryClass: type[ParallelQuery], client: Connector, generator, batchsize, stats, **kwargs): + def run(self, QueryClass: type['ParallelQuery'], client: Connector, generator, batchsize, stats, **kwargs): def process(df, host, port, use_ssl, ca_cert, verify_hostname, session, connector_type, kwargs_dict): metrics = Stats() # Dask reads data in partitions, and the first partition is of 2 rows, with all diff --git a/test/test_Parallel.py b/test/test_Parallel.py index c06f8091..78fba653 100644 --- a/test/test_Parallel.py +++ b/test/test_Parallel.py @@ -99,6 +99,11 @@ def getitem(self, subscript): class MockClient: + def __init__(self): + from types import SimpleNamespace + self.config = SimpleNamespace(host="localhost", port=55555, use_ssl=False, + verify_hostname=False, username="admin", password="password") + def clone(self): return self @@ -109,6 +114,9 @@ def query(self, q, b): def last_query_ok(self): return True + def get_last_query_time(self): + return 0 + def test_dynamic_batching(): db = MockClient() From cfc490398cf362a9fd73a04871e591a63b444a61 Mon Sep 17 00:00:00 2001 From: claw Date: Wed, 20 May 2026 02:53:36 +0000 Subject: [PATCH 16/34] fix(parallel): correctly pass kwargs down to parallel query in dask --- aperturedb/DaskManager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aperturedb/DaskManager.py b/aperturedb/DaskManager.py index b51de234..91b1fc67 100644 --- a/aperturedb/DaskManager.py +++ b/aperturedb/DaskManager.py @@ -74,7 +74,7 @@ def process(df, host, port, use_ssl, ca_cert, verify_hostname, session, connecto blobs_relative_to_csv=generator.blobs_relative_to_csv) loader.query(generator=data, batchsize=len( - slice), numthreads=1, stats=False) + slice), numthreads=1, stats=False, **kwargs_dict) count += 1 metrics.times_arr.extend(loader.times_arr) metrics.error_counter += loader.error_counter From 8da09ccc5889e4b86d937c4bff15dba463d063d0 Mon Sep 17 00:00:00 2001 From: claw Date: Wed, 20 May 2026 08:04:20 +0000 Subject: [PATCH 17/34] fix(parallel): address PR review comments for dynamic batching - Pass kwargs dictionary to Dask map_partitions - Reset MockClient queries list after ParallelQuery GetSchema init --- aperturedb/DaskManager.py | 3 ++- test/test_Parallel.py | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/aperturedb/DaskManager.py b/aperturedb/DaskManager.py index 91b1fc67..b2dd66e2 100644 --- a/aperturedb/DaskManager.py +++ b/aperturedb/DaskManager.py @@ -94,7 +94,8 @@ def process(df, host, port, use_ssl, ca_cert, verify_hostname, session, connecto client.config.ca_cert, client.config.verify_hostname, client.shared_data.session, - type(client)) + type(client), + kwargs) computation = computation.persist() if stats: progress(computation) diff --git a/test/test_Parallel.py b/test/test_Parallel.py index 78fba653..54177fcd 100644 --- a/test/test_Parallel.py +++ b/test/test_Parallel.py @@ -125,6 +125,7 @@ def test_dynamic_batching(): # 10 elements, 100 bytes each generator = GeneratorWithLargeBlobs(10, 100) querier = ParallelQuery(db) + db.queries = [] # limit to 150 bytes -> should process 1 element per batch despite batchsize=5 querier.query(generator, batchsize=5, numthreads=1, @@ -144,6 +145,7 @@ def test_dynamic_batching_oversized_item(): # 10 elements, 100 bytes each generator = GeneratorWithLargeBlobs(10, 100) querier = ParallelQuery(db) + db.queries = [] # limit to 50 bytes -> item size (100) > max_bytes (50). Should log warning and process 1 per batch. querier.query(generator, batchsize=5, numthreads=1, max_bytes_per_batch=50) From 2614f0c63f1a2e884336c6a94b40f7b07ccce105 Mon Sep 17 00:00:00 2001 From: claw Date: Wed, 20 May 2026 11:21:05 +0000 Subject: [PATCH 18/34] docs(parallel): update docstring for max_bytes_per_batch behavior --- aperturedb/ParallelQuery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index 35a21579..858ebf74 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -327,7 +327,7 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool batchsize (int, optional): Number of queries per transaction. Defaults to 1. numthreads (int, optional): Number of parallel workers. Defaults to 4. stats (bool, optional): Show statistics at end of ingestion. Defaults to False. - max_bytes_per_batch (int, optional): The maximum number of bytes allowed per batch. Overrides batchsize if the blob sizes exceed this limit. Default is None. + max_bytes_per_batch (int, optional): The maximum number of bytes allowed per batch. Overrides batchsize if the blob sizes exceed this limit. If None or <= 0, dynamic batching is disabled and batchsize is strictly used. Default is None. """ self.max_bytes_per_batch = max_bytes_per_batch From 70697c8209124c2a6ec25b42c7db5c4af10679fc Mon Sep 17 00:00:00 2001 From: ad-claw000 Date: Thu, 21 May 2026 06:33:18 +0000 Subject: [PATCH 19/34] fix(parallel): fix remaining f-string syntax errors --- aperturedb/ParallelQuery.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index f54b9a0a..7adb612d 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -163,8 +163,7 @@ def process_responses(requests, input_blobs, responses, output_blobs): parameter_count = len(inspect.signature( response_handler).parameters) if parameter_count < 4 or parameter_count > 5: - raise Exception("Bad Signature for response_handler :" - f"expected 6 > args > 3, got {parameter_count}") + raise Exception(f"Bad Signature for response_handler : expected 6 > args > 3, got {parameter_count}") if parameter_count == 4: indexless_handler = response_handler def response_handler(query, qblobs, resp, rblobs, qindex): return indexless_handler( @@ -374,10 +373,7 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool logger.error( f"Could not determine query structure from:\n{generator[0]}") logger.error(type(generator[0])) - logger.info( - f"Commands per query = {self.commands_per_query}, " - f"Blobs per query = {self.blobs_per_query}" - ) + logger.info(f"Commands per query = {self.commands_per_query}, Blobs per query = {self.blobs_per_query}") self.batched_run(generator, batchsize, numthreads, stats) def print_stats(self) -> None: From c1cfec69787ba0cbf1fd9cff3dbf10bc9a33b9fc Mon Sep 17 00:00:00 2001 From: ad-claw000 Date: Thu, 21 May 2026 07:01:37 +0000 Subject: [PATCH 20/34] fix(parallel): fix f-string syntax errors from autopep8 formatting --- aperturedb/ParallelQuery.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index 7adb612d..ca1589d8 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -163,7 +163,8 @@ def process_responses(requests, input_blobs, responses, output_blobs): parameter_count = len(inspect.signature( response_handler).parameters) if parameter_count < 4 or parameter_count > 5: - raise Exception(f"Bad Signature for response_handler : expected 6 > args > 3, got {parameter_count}") + raise Exception("Bad Signature for response_handler : " + f"expected 6 > args > 3, got {parameter_count}") if parameter_count == 4: indexless_handler = response_handler def response_handler(query, qblobs, resp, rblobs, qindex): return indexless_handler( @@ -373,7 +374,8 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool logger.error( f"Could not determine query structure from:\n{generator[0]}") logger.error(type(generator[0])) - logger.info(f"Commands per query = {self.commands_per_query}, Blobs per query = {self.blobs_per_query}") + logger.info(f"Commands per query = {self.commands_per_query}, " + f"Blobs per query = {self.blobs_per_query}") self.batched_run(generator, batchsize, numthreads, stats) def print_stats(self) -> None: From 41ae866deb8e0ec07349deca1fcec072942f80d5 Mon Sep 17 00:00:00 2001 From: ad-claw000 Date: Thu, 21 May 2026 15:02:47 +0000 Subject: [PATCH 21/34] fix: address remaining review comments for dynamic batching and connector initialization --- aperturedb/DaskManager.py | 1 + aperturedb/ParallelQuery.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/aperturedb/DaskManager.py b/aperturedb/DaskManager.py index 3754696d..d18c3705 100644 --- a/aperturedb/DaskManager.py +++ b/aperturedb/DaskManager.py @@ -70,6 +70,7 @@ def process(df, host, port, use_ssl, ca_cert, verify_hostname, session, connecto shared_data=shared_data) except Exception as e: logger.exception(e) + return metrics #from aperturedb.ParallelLoader import ParallelLoader loader = QueryClass(client) for i in range(0, len(df), batchsize): diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index 029917f8..a3171c99 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -243,7 +243,7 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: item = generator[i] # Estimate item size (mostly blobs + some json overhead) - item_bytes = 0 + item_bytes = len(str(item[0])) for blob in item[1]: if isinstance(blob, (bytes, bytearray, memoryview)): item_bytes += len(blob) @@ -253,7 +253,7 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: if len(current_batch) == 0 and item_bytes > max_bytes: logger.warning( f"Worker {thid} executing batch starting at {batch_start} that exceeds max_bytes_per_batch: {item_bytes} > {max_bytes}") - elif len(current_batch) > 0 and (current_bytes + item_bytes > max_bytes): + elif len(current_batch) > 0 and (current_bytes + item_bytes > max_bytes or len(current_batch) >= batchsize): try: self.do_batch(client, batch_start, current_batch) except Exception as e: From b7d3939e8f1653b5e89b8da03a6fb74f20f07cb2 Mon Sep 17 00:00:00 2001 From: claw Date: Sat, 23 May 2026 08:16:51 +0000 Subject: [PATCH 22/34] fix: address remaining dynamic batching review comments --- aperturedb/ParallelQuery.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index db30e6e7..bd12952a 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -250,10 +250,7 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: else: item_bytes += 100 - if len(current_batch) == 0 and item_bytes > max_bytes: - logger.warning( - f"Worker {thid} executing batch starting at {batch_start} that exceeds max_bytes_per_batch: {item_bytes} > {max_bytes}") - elif len(current_batch) > 0 and (current_bytes + item_bytes > max_bytes or len(current_batch) >= batchsize): + if len(current_batch) > 0 and (current_bytes + item_bytes > max_bytes or len(current_batch) >= self.batchsize): try: self.do_batch(client, batch_start, current_batch) except Exception as e: @@ -269,6 +266,10 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: current_bytes = 0 batch_start = i + if len(current_batch) == 0 and item_bytes > max_bytes: + logger.warning( + f"Worker {thid} executing batch starting at {batch_start} that exceeds max_bytes_per_batch: {item_bytes} > {max_bytes}") + current_batch.append(item) current_bytes += item_bytes From 3db60e35ec5e37c752309872941454d0a1a6a92a Mon Sep 17 00:00:00 2001 From: claw Date: Sat, 23 May 2026 17:12:34 +0000 Subject: [PATCH 23/34] docs: clarify max_bytes_per_batch docstring --- aperturedb/ParallelQuery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index bd12952a..1d433d62 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -337,7 +337,7 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool batchsize (int, optional): Number of queries per transaction. Defaults to 1. numthreads (int, optional): Number of parallel workers. Defaults to 4. stats (bool, optional): Show statistics at end of ingestion. Defaults to False. - max_bytes_per_batch (int, optional): The maximum number of bytes allowed per batch. Overrides batchsize if the blob sizes exceed this limit. If None or <= 0, dynamic batching is disabled and batchsize is strictly used. Default is None. + max_bytes_per_batch (int, optional): The maximum number of bytes allowed per batch. Acts as an additional cap on batch size; batches will be split if they reach either this byte limit or the `batchsize` item limit. If None or <= 0, dynamic batching is disabled and only `batchsize` is used. Default is None. """ self.max_bytes_per_batch = max_bytes_per_batch From 74b777e6ee6e7660998f19c2f6388354e292ff7f Mon Sep 17 00:00:00 2001 From: claw Date: Sun, 24 May 2026 01:02:20 +0000 Subject: [PATCH 24/34] style: fix autopep8 formatting in test_Parallel.py --- test/test_Parallel.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/test_Parallel.py b/test/test_Parallel.py index 43816b9c..aeb84f53 100644 --- a/test/test_Parallel.py +++ b/test/test_Parallel.py @@ -155,6 +155,8 @@ def test_dynamic_batching_oversized_item(): assert len(db.queries) == 10 for q in db.queries: assert len(q) == 1 + + def test_dask_dry_run(db: Connector): from aperturedb.ParallelLoader import ParallelLoader from aperturedb.EntityDataCSV import EntityDataCSV From e60e837d966a5027aa6dc566f3ece77d46685cbd Mon Sep 17 00:00:00 2001 From: claw Date: Sun, 24 May 2026 02:52:36 +0000 Subject: [PATCH 25/34] fix(ci): restore and fix nginx health check to prevent premature test execution --- test/run_test_container.sh | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/test/run_test_container.sh b/test/run_test_container.sh index aaf24175..a0302455 100755 --- a/test/run_test_container.sh +++ b/test/run_test_container.sh @@ -72,10 +72,28 @@ wait_for_stack() { local elapsed=0 echo "Waiting for stack ${tag} to become ready (timeout ${timeout}s)..." while [ $elapsed -lt $timeout ]; do + local lenz_ready=0 + local nginx_ready=0 + if docker run --rm --network=${network} curlimages/curl:latest \ - -sS -o /dev/null -m 2 http://lenz:58085/ >/dev/null 2>&1; then - echo "Stack ${tag} is ready after ${elapsed}s" - return 0 + nc -z -w 2 lenz 58085 >/dev/null 2>&1; then + lenz_ready=1 + fi + + if [[ "$tag" == *"_http" ]]; then + if docker run --rm --network=${network} curlimages/curl:latest \ + -fsS -o /dev/null -m 2 http://nginx:80/ >/dev/null 2>&1; then + nginx_ready=1 + fi + if [ "$lenz_ready" -eq 1 ] && [ "$nginx_ready" -eq 1 ]; then + echo "Stack ${tag} is ready after ${elapsed}s" + return 0 + fi + else + if [ "$lenz_ready" -eq 1 ]; then + echo "Stack ${tag} is ready after ${elapsed}s" + return 0 + fi fi sleep 2 elapsed=$((elapsed + 2)) From 6f6ceecf18c18adb71c001256c0c0c67cc35f0b5 Mon Sep 17 00:00:00 2001 From: claw Date: Sun, 24 May 2026 06:14:32 +0000 Subject: [PATCH 26/34] fix(ci): fix lenz readiness check entrypoint and distinct stack identification --- test/run_test_container.sh | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/test/run_test_container.sh b/test/run_test_container.sh index 85d0c989..460b4bc5 100755 --- a/test/run_test_container.sh +++ b/test/run_test_container.sh @@ -102,12 +102,17 @@ wait_for_stack() { local lenz_ready=0 local nginx_ready=0 - if docker run --rm --network=${network} curlimages/curl:latest \ - nc -z -w 2 lenz 58085 >/dev/null 2>&1; then + if docker run --rm --network=${network} --entrypoint nc curlimages/curl:latest \ + -z -w 2 lenz 58085 >/dev/null 2>&1; then lenz_ready=1 fi - if [[ "$tag" == *"_http" ]]; then + if [[ "$tag" == *"_non_http" ]]; then + if [ "$lenz_ready" -eq 1 ]; then + echo "Stack ${tag} is ready after ${elapsed}s" + return 0 + fi + elif [[ "$tag" == *"_http" ]]; then if docker run --rm --network=${network} curlimages/curl:latest \ -fsS -o /dev/null -m 2 http://nginx:80/ >/dev/null 2>&1; then nginx_ready=1 From 9bdf19df1a66df256df4be607ee46cb5ef8a4b69 Mon Sep 17 00:00:00 2001 From: claw Date: Mon, 25 May 2026 01:33:50 +0000 Subject: [PATCH 27/34] test: add test for dynamic batching with small images Address review comment: verify that AddImage items of smaller size produce larger batches. --- test/test_Parallel.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/test/test_Parallel.py b/test/test_Parallel.py index 4aeae8e7..d3a76258 100644 --- a/test/test_Parallel.py +++ b/test/test_Parallel.py @@ -130,6 +130,21 @@ def getitem(self, subscript): return query, blobs +class GeneratorWithSmallImages(Subscriptable): + def __init__(self, elements=10, blob_size=10) -> None: + super().__init__() + self.elements = elements + self.blob_size = blob_size + + def __len__(self): + return self.elements + + def getitem(self, subscript): + query = [{"AddImage": {}}] + blobs = [b"0" * self.blob_size] + return query, blobs + + class MockClient: def __init__(self): from types import SimpleNamespace @@ -189,6 +204,30 @@ def test_dynamic_batching_oversized_item(): assert len(q) == 1 +def test_dynamic_batching_add_image(): + db = MockClient() + db.queries = [] + + # 10 elements, 10 bytes each + generator = GeneratorWithSmallImages(10, 10) + querier = ParallelQuery(db) + db.queries = [] + + # Expected item size: len(str([{"AddImage": {}}])) -> 18 + 10 bytes blob = 28 bytes. + # With a limit of 100 bytes, we should be able to fit 3 items per batch (3 * 28 = 84 bytes). + # Since batchsize=5 is larger than 3, the max_bytes_per_batch limit will be the bottleneck, + # producing 3, 3, 3, 1 item batches. + querier.query(generator, batchsize=5, numthreads=1, + max_bytes_per_batch=100) + + # It should have succeeded in processing all queries + assert querier.get_succeeded_queries() == 10 + assert len(db.queries) == 4 + for q in db.queries[:-1]: + assert len(q) == 3 + assert len(db.queries[-1]) == 1 + + def test_dask_dry_run(db: Connector): from aperturedb.ParallelLoader import ParallelLoader from aperturedb.EntityDataCSV import EntityDataCSV From c9fcd3e63b8290f565c153c586840b20a30cd78f Mon Sep 17 00:00:00 2001 From: claw Date: Mon, 25 May 2026 11:08:02 +0000 Subject: [PATCH 28/34] test: add test for dynamic batching with variable image sizes --- test/test_Parallel.py | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/test/test_Parallel.py b/test/test_Parallel.py index d3a76258..cbfee6ff 100644 --- a/test/test_Parallel.py +++ b/test/test_Parallel.py @@ -228,6 +228,41 @@ def test_dynamic_batching_add_image(): assert len(db.queries[-1]) == 1 + +def test_dynamic_batching_add_image_variable_sizes(): + db = MockClient() + db.queries = [] + + # 1. Smaller images (2 bytes each) -> larger batches + generator_small = GeneratorWithSmallImages(10, 2) + querier_small = ParallelQuery(db) + db.queries = [] + + # Expected item size: 18 + 2 = 20 bytes. + # Max bytes = 100 -> 100 // 20 = 5 items per batch. + querier_small.query(generator_small, batchsize=10, numthreads=1, max_bytes_per_batch=100) + + assert querier_small.get_succeeded_queries() == 10 + assert len(db.queries) == 2 + for q in db.queries: + assert len(q) == 5 + + # 2. Larger images (32 bytes each) -> smaller batches + generator_large = GeneratorWithSmallImages(10, 32) + db.queries = [] + querier_large = ParallelQuery(db) + db.queries = [] + + # Expected item size: 18 + 32 = 50 bytes. + # Max bytes = 100 -> 100 // 50 = 2 items per batch. + querier_large.query(generator_large, batchsize=10, numthreads=1, max_bytes_per_batch=100) + + assert querier_large.get_succeeded_queries() == 10 + assert len(db.queries) == 5 + for q in db.queries: + assert len(q) == 2 + + def test_dask_dry_run(db: Connector): from aperturedb.ParallelLoader import ParallelLoader from aperturedb.EntityDataCSV import EntityDataCSV From 8efe264ac634431f3f01e7f7073c475a966ba38e Mon Sep 17 00:00:00 2001 From: claw Date: Mon, 25 May 2026 11:08:31 +0000 Subject: [PATCH 29/34] style: apply autopep8 fixes to test_Parallel.py --- test/test_Parallel.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/test/test_Parallel.py b/test/test_Parallel.py index cbfee6ff..ec6ce9e1 100644 --- a/test/test_Parallel.py +++ b/test/test_Parallel.py @@ -228,20 +228,20 @@ def test_dynamic_batching_add_image(): assert len(db.queries[-1]) == 1 - def test_dynamic_batching_add_image_variable_sizes(): db = MockClient() db.queries = [] - + # 1. Smaller images (2 bytes each) -> larger batches generator_small = GeneratorWithSmallImages(10, 2) querier_small = ParallelQuery(db) db.queries = [] - + # Expected item size: 18 + 2 = 20 bytes. # Max bytes = 100 -> 100 // 20 = 5 items per batch. - querier_small.query(generator_small, batchsize=10, numthreads=1, max_bytes_per_batch=100) - + querier_small.query(generator_small, batchsize=10, + numthreads=1, max_bytes_per_batch=100) + assert querier_small.get_succeeded_queries() == 10 assert len(db.queries) == 2 for q in db.queries: @@ -252,11 +252,12 @@ def test_dynamic_batching_add_image_variable_sizes(): db.queries = [] querier_large = ParallelQuery(db) db.queries = [] - + # Expected item size: 18 + 32 = 50 bytes. # Max bytes = 100 -> 100 // 50 = 2 items per batch. - querier_large.query(generator_large, batchsize=10, numthreads=1, max_bytes_per_batch=100) - + querier_large.query(generator_large, batchsize=10, + numthreads=1, max_bytes_per_batch=100) + assert querier_large.get_succeeded_queries() == 10 assert len(db.queries) == 5 for q in db.queries: From 55dc4ec78ef8fae84d167d385bbff4fdf46cc837 Mon Sep 17 00:00:00 2001 From: claw Date: Mon, 25 May 2026 11:31:42 +0000 Subject: [PATCH 30/34] fix: safeguard item retrieval in dynamic batching --- aperturedb/ParallelQuery.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index 3e2f1762..8dc1ff52 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -251,15 +251,24 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: if not run_event.is_set(): break - item = generator[i] - - # Estimate item size (mostly blobs + some json overhead) - item_bytes = len(str(item[0])) - for blob in item[1]: - if isinstance(blob, (bytes, bytearray, memoryview)): - item_bytes += len(blob) - else: - item_bytes += 100 + try: + item = generator[i] + + # Estimate item size (mostly blobs + some json overhead) + item_bytes = len(str(item[0])) + for blob in item[1]: + if isinstance(blob, (bytes, bytearray, memoryview)): + item_bytes += len(blob) + else: + item_bytes += 100 + except Exception as e: + logger.exception(e) + logger.warning( + f"Worker {thid} failed to retrieve/estimate item {i}") + self.error_counter += 1 + if self.stats: + self.pb.update(1) + continue if len(current_batch) > 0 and (current_bytes + item_bytes > max_bytes or len(current_batch) >= self.batchsize): try: From 36ab63af698eb30c00377f38b8f89616cca28ad1 Mon Sep 17 00:00:00 2001 From: claw Date: Mon, 25 May 2026 16:24:29 +0000 Subject: [PATCH 31/34] ci: fix permission denied errors during checkout --- .github/workflows/pr.yaml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 2e429055..246e72e2 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -17,7 +17,9 @@ jobs: steps: - name: Cleanup previous run - run: docker run --rm -v ${{ github.workspace }}:/workspace alpine sh -c "rm -rf /workspace/test/aperturedb" + run: | + docker run --rm -v ${{ github.workspace }}:/workspace alpine sh -c "rm -rf /workspace/test/aperturedb /workspace/test/seattle-* /workspace/test/output /workspace/docker/tests" || true + sudo rm -rf test/aperturedb test/seattle-* test/output docker/tests || true continue-on-error: true - uses: actions/checkout@v3 @@ -80,7 +82,9 @@ jobs: steps: - name: Cleanup previous run - run: docker run --rm -v ${{ github.workspace }}:/workspace alpine sh -c "rm -rf /workspace/test/aperturedb" + run: | + docker run --rm -v ${{ github.workspace }}:/workspace alpine sh -c "rm -rf /workspace/test/aperturedb /workspace/test/seattle-* /workspace/test/output /workspace/docker/tests" || true + sudo rm -rf test/aperturedb test/seattle-* test/output docker/tests || true continue-on-error: true - uses: actions/checkout@v3 From a6ba5873f1b07264ae7a5438e7bc346adb0c0271 Mon Sep 17 00:00:00 2001 From: claw Date: Mon, 25 May 2026 16:47:41 +0000 Subject: [PATCH 32/34] fix(ParallelQuery): flush and reset pending dynamic batch on item failure --- aperturedb/ParallelQuery.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index 8dc1ff52..b71f98f5 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -268,8 +268,27 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: self.error_counter += 1 if self.stats: self.pb.update(1) + + if len(current_batch) > 0: + try: + self.do_batch(client, batch_start, current_batch) + except Exception as e2: + logger.exception(e2) + logger.warning( + f"Worker {thid} failed to execute dynamic batch starting at {batch_start}") + self.error_counter += 1 + + if self.stats: + self.pb.update(len(current_batch)) + + current_batch = [] + current_bytes = 0 + continue + if len(current_batch) == 0: + batch_start = i + if len(current_batch) > 0 and (current_bytes + item_bytes > max_bytes or len(current_batch) >= self.batchsize): try: self.do_batch(client, batch_start, current_batch) From 102d48e02a4141057ac50ec21f2563ff5aee7ab0 Mon Sep 17 00:00:00 2001 From: claw Date: Tue, 26 May 2026 20:01:39 +0000 Subject: [PATCH 33/34] style: fix autopep8 line wrapping issue with f-string --- aperturedb/ParallelQuery.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index d5d5f1b2..211d5ced 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -277,7 +277,8 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: if len(current_batch) > 0: try: - self.do_batch(client, batch_start, current_batch) + self.do_batch( + client, batch_start, current_batch) executed_batches += 1 except Exception as e2: logger.exception(e2) @@ -368,7 +369,8 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None: executed_batches += 1 if self.stats: self.pb.update(batch_end - batch_start) - logger.info(f"Worker {thid} executed {executed_batches} batches") + msg = f"Worker {thid} executed {executed_batches} batches" + logger.info(msg) finally: # Explicitly close the connection to avoid exhausting server connection limits if client is not self.client and hasattr(client, 'close') and callable(client.close): From 32da8bccc2182a6cbe95c9bd38125d0e7bb0a4a7 Mon Sep 17 00:00:00 2001 From: OpenClaw Bot Date: Sat, 6 Jun 2026 08:12:44 +0000 Subject: [PATCH 34/34] ci: fix gcp auth in gpu test and update actions --- .github/workflows/pr.yaml | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 246e72e2..87ad7727 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -22,10 +22,10 @@ jobs: sudo rm -rf test/aperturedb test/seattle-* test/output docker/tests || true continue-on-error: true - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Login to DockerHub - uses: docker/login-action@v2 + uses: docker/login-action@v3 with: username: ${{ secrets.DOCKER_USER }} password: ${{ secrets.DOCKER_PASS }} @@ -87,20 +87,22 @@ jobs: sudo rm -rf test/aperturedb test/seattle-* test/output docker/tests || true continue-on-error: true - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Login to DockerHub - uses: docker/login-action@v2 + uses: docker/login-action@v3 with: username: ${{ secrets.DOCKER_USER }} password: ${{ secrets.DOCKER_PASS }} - - name: Login to Google Cloud - uses: google-github-actions/setup-gcloud@v0 + - name: Authenticate to Google Cloud + uses: google-github-actions/auth@v2 with: - service_account_key: ${{ secrets.GCP_SERVICE_ACCOUNT_KEY }} - project_id: ${{ secrets.GCP_SERVICE_ACCOUNT_PROJECT_ID }} - export_default_credentials: true + credentials_json: ${{ secrets.GCP_SERVICE_ACCOUNT_KEY }} + project_id: ${{ secrets.GCP_SERVICE_ACCOUNT_PROJECT_ID }} + + - name: Set up Cloud SDK + uses: google-github-actions/setup-gcloud@v2 - name: Build tests on pytorch GPU image run: |