Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
7114a01
fix: update pre-commit CI to use python 3.12 (#676)
ad-claw000 May 18, 2026
7965957
test: add initial suite of tests for Images.py (#674)
ad-claw000 May 19, 2026
8601cf4
feat(parallel): allow dynamic batch sizing by byte limits
ad-claw000 May 4, 2026
8f117da
fix(syntax): resolve unterminated string literal in ParallelQuery.py
ad-claw000 May 4, 2026
9c8d41d
fix: pre-commit formatting
ad-claw000 May 19, 2026
719829a
fix(parallel): fix unterminated string literal in ParallelQuery.py
ad-claw000 May 19, 2026
fc062dd
style: fix autopep8 formatting for dynamic batchsize log messages
ad-claw000 May 19, 2026
43a3810
Fix PR review comments
ad-claw000 May 20, 2026
efe4f95
chore: apply pre-commit formatting
ad-claw000 May 20, 2026
ed40546
fix(parallel): fix f-string syntax errors
ad-claw000 May 20, 2026
aaffdb4
test(parallel): add dynamic batching tests
ad-claw000 May 20, 2026
dd094cf
test(parallel): make mocked dynamic tests mock clone as well
ad-claw000 May 20, 2026
147727f
feat(parallel): pass max_bytes_per_batch to dask
ad-claw000 May 20, 2026
2c53767
test: fix shell variable expansion in test runner
ad-claw000 May 20, 2026
8c0ca4f
fix(parallel): fix types and mock dependencies
ad-claw000 May 20, 2026
cfc4903
fix(parallel): correctly pass kwargs down to parallel query in dask
ad-claw000 May 20, 2026
8da09cc
fix(parallel): address PR review comments for dynamic batching
ad-claw000 May 20, 2026
2614f0c
docs(parallel): update docstring for max_bytes_per_batch behavior
ad-claw000 May 20, 2026
e95fbeb
Merge origin/develop into feat/328-dynamic-batchsize
ad-claw000 May 20, 2026
70697c8
fix(parallel): fix remaining f-string syntax errors
ad-claw000 May 21, 2026
c1cfec6
fix(parallel): fix f-string syntax errors from autopep8 formatting
ad-claw000 May 21, 2026
f40493a
Merge branch 'develop' into feat/328-dynamic-batchsize
luisremis May 21, 2026
41ae866
fix: address remaining review comments for dynamic batching and conne…
ad-claw000 May 21, 2026
69220c4
merge: resolve conflicts with develop
ad-claw000 May 23, 2026
b7d3939
fix: address remaining dynamic batching review comments
ad-claw000 May 23, 2026
3db60e3
docs: clarify max_bytes_per_batch docstring
ad-claw000 May 23, 2026
74b777e
style: fix autopep8 formatting in test_Parallel.py
ad-claw000 May 24, 2026
e60e837
fix(ci): restore and fix nginx health check to prevent premature test…
ad-claw000 May 24, 2026
5fea602
merge: resolve conflicts with develop
ad-claw000 May 24, 2026
6f6ceec
fix(ci): fix lenz readiness check entrypoint and distinct stack ident…
ad-claw000 May 24, 2026
3171446
Merge remote-tracking branch 'origin/develop' into feat/328-dynamic-b…
ad-claw000 May 24, 2026
9bdf19d
test: add test for dynamic batching with small images
ad-claw000 May 25, 2026
d880797
Merge remote-tracking branch 'origin/develop' into feat/328-dynamic-b…
ad-claw000 May 25, 2026
c9fcd3e
test: add test for dynamic batching with variable image sizes
ad-claw000 May 25, 2026
8efe264
style: apply autopep8 fixes to test_Parallel.py
ad-claw000 May 25, 2026
55dc4ec
fix: safeguard item retrieval in dynamic batching
ad-claw000 May 25, 2026
36ab63a
ci: fix permission denied errors during checkout
ad-claw000 May 25, 2026
a6ba587
fix(ParallelQuery): flush and reset pending dynamic batch on item fai…
ad-claw000 May 25, 2026
db2fc27
merge: resolve conflicts with develop
ad-claw000 May 26, 2026
102d48e
style: fix autopep8 line wrapping issue with f-string
ad-claw000 May 26, 2026
32da8bc
ci: fix gcp auth in gpu test and update actions
Jun 6, 2026
27000d4
merge: resolve conflicts with develop
Jun 15, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ jobs:
steps:

- name: Cleanup previous run
run: docker run --rm -v ${{ github.workspace }}:/workspace alpine sh -c "rm -rf /workspace/test/aperturedb"
run: |
docker run --rm -v ${{ github.workspace }}:/workspace alpine sh -c "rm -rf /workspace/test/aperturedb /workspace/test/seattle-* /workspace/test/output /workspace/docker/tests" || true
sudo rm -rf test/aperturedb test/seattle-* test/output docker/tests || true
continue-on-error: true

- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Login to DockerHub
uses: docker/login-action@v2
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKER_USER }}
password: ${{ secrets.DOCKER_PASS }}
Expand Down Expand Up @@ -80,13 +82,15 @@ jobs:
steps:

- name: Cleanup previous run
run: docker run --rm -v ${{ github.workspace }}:/workspace alpine sh -c "rm -rf /workspace/test/aperturedb"
run: |
docker run --rm -v ${{ github.workspace }}:/workspace alpine sh -c "rm -rf /workspace/test/aperturedb /workspace/test/seattle-* /workspace/test/output /workspace/docker/tests" || true
sudo rm -rf test/aperturedb test/seattle-* test/output docker/tests || true
continue-on-error: true

- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Login to DockerHub
uses: docker/login-action@v2
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKER_USER }}
password: ${{ secrets.DOCKER_PASS }}
Expand All @@ -96,7 +100,6 @@ jobs:
with:
credentials_json: ${{ secrets.GCP_SERVICE_ACCOUNT_KEY }}
project_id: ${{ secrets.GCP_SERVICE_ACCOUNT_PROJECT_ID }}

- name: Set up Cloud SDK
uses: google-github-actions/setup-gcloud@v2
- name: Build tests on pytorch GPU image
Expand Down
9 changes: 5 additions & 4 deletions aperturedb/DaskManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ def __del__(self):
self._client.close()
self._cluster.close()

def run(self, QueryClass: type[ParallelQuery], client: Connector, generator, batchsize, stats, dry_run=False):
def process(df, host, port, use_ssl, ca_cert, verify_hostname, session, connector_type, dry_run):
def run(self, QueryClass: type['ParallelQuery'], client: Connector, generator, batchsize, stats, dry_run=False, **kwargs):
def process(df, host, port, use_ssl, ca_cert, verify_hostname, session, connector_type, dry_run, kwargs_dict):
metrics = Stats()
# Dask reads data in partitions, and the first partition is of 2 rows, with all
# values as 'foo'. This is for sampling the column names and types. Should not process
Expand Down Expand Up @@ -88,7 +88,7 @@ def process(df, host, port, use_ssl, ca_cert, verify_hostname, session, connecto
blobs_relative_to_csv=generator.blobs_relative_to_csv)

loader.query(generator=data, batchsize=len(
slice), numthreads=1, stats=False)
slice), numthreads=1, stats=False, **kwargs_dict)
count += 1
metrics.times_arr.extend(loader.times_arr)
metrics.error_counter += loader.error_counter
Expand All @@ -109,7 +109,8 @@ def process(df, host, port, use_ssl, ca_cert, verify_hostname, session, connecto
client.config.verify_hostname,
client.shared_data.session,
type(client),
dry_run)
dry_run,
kwargs)
computation = computation.persist()
if stats:
progress(computation)
Expand Down
173 changes: 137 additions & 36 deletions aperturedb/ParallelQuery.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from __future__ import annotations
from typing import List, Tuple
from typing import List, Tuple, Optional
from aperturedb import Parallelizer
import numpy as np
import logging
Expand Down Expand Up @@ -174,7 +174,7 @@ def process_responses(requests, input_blobs, responses, output_blobs):
parameter_count = len(inspect.signature(
response_handler).parameters)
if parameter_count < 4 or parameter_count > 5:
raise Exception("Bad Signature for response_handler :"
raise Exception("Bad Signature for response_handler : "
f"expected 6 > args > 3, got {parameter_count}")
if parameter_count == 4:
indexless_handler = response_handler
Expand Down Expand Up @@ -241,35 +241,136 @@ def worker(self, thid: int, generator, start: int, end: int, run_event) -> None:
# A new connection will be created for each thread
client = self.client.clone()

try:
total_batches = (end - start) // self.batchsize

if (end - start) % self.batchsize > 0:
total_batches += 1

logger.info(
f"Worker {thid} executing {total_batches} batches, {self.stats=}")
executed_batches = 0
for i in range(total_batches):
if not run_event.is_set():
break
batch_start = start + i * self.batchsize
batch_end = min(batch_start + self.batchsize, end)

try:
self.do_batch(client, batch_start,
generator[batch_start:batch_end])
except Exception as e:
logger.exception(e)
logger.warning(
f"Worker {thid} failed to execute batch {i}: [{batch_start},{batch_end}]")
with self.error_counter_lock:
self.error_counter += 1
max_bytes = getattr(self, "max_bytes_per_batch", None)

executed_batches += 1
if self.stats:
self.pb.update(batch_end - batch_start)
logger.info(f"Worker {thid} executed {executed_batches} batches")
try:
if max_bytes is not None and max_bytes > 0:
logger.info(
f"Worker {thid} executing dynamically sized batches (max {max_bytes} bytes), {self.stats=}")
current_batch = []
current_bytes = 0
batch_start = start
executed_batches = 0

for i in range(start, end):
if not run_event.is_set():
break

try:
item = generator[i]

# Estimate item size (mostly blobs + some json overhead)
item_bytes = len(str(item[0]))
for blob in item[1]:
if isinstance(blob, (bytes, bytearray, memoryview)):
item_bytes += len(blob)
else:
item_bytes += 100
except Exception as e:
logger.exception(e)
logger.warning(
f"Worker {thid} failed to retrieve/estimate item {i}")
with self.error_counter_lock:
self.error_counter += 1
if self.stats:
self.pb.update(1)

if len(current_batch) > 0:
try:
self.do_batch(
client, batch_start, current_batch)
executed_batches += 1
except Exception as e2:
logger.exception(e2)
logger.warning(
f"Worker {thid} failed to execute dynamic batch starting at {batch_start}")
with self.error_counter_lock:
self.error_counter += 1

if self.stats:
self.pb.update(len(current_batch))

current_batch = []
current_bytes = 0

continue

if len(current_batch) == 0:
batch_start = i

if len(current_batch) > 0 and (current_bytes + item_bytes > max_bytes or len(current_batch) >= self.batchsize):
try:
self.do_batch(client, batch_start, current_batch)
executed_batches += 1
except Exception as e:
logger.exception(e)
logger.warning(
f"Worker {thid} failed to execute dynamic batch starting at {batch_start}")
with self.error_counter_lock:
self.error_counter += 1

if self.stats:
self.pb.update(len(current_batch))

current_batch = []
current_bytes = 0
batch_start = i

if len(current_batch) == 0 and item_bytes > max_bytes:
logger.warning(
f"Worker {thid} executing batch starting at {batch_start} that exceeds max_bytes_per_batch: {item_bytes} > {max_bytes}")

current_batch.append(item)
current_bytes += item_bytes

# Remainder
if len(current_batch) > 0 and run_event.is_set():
try:
self.do_batch(client, batch_start, current_batch)
executed_batches += 1
except Exception as e:
logger.exception(e)
logger.warning(
f"Worker {thid} failed to execute dynamic batch remainder starting at {batch_start}")
with self.error_counter_lock:
self.error_counter += 1

if self.stats:
self.pb.update(len(current_batch))

logger.info(
f"Worker {thid} finished executing {executed_batches} dynamically sized batches")

else:
total_batches = (end - start) // self.batchsize

if (end - start) % self.batchsize > 0:
total_batches += 1

logger.info(
f"Worker {thid} executing {total_batches} batches, {self.stats=}")
executed_batches = 0
for i in range(total_batches):
if not run_event.is_set():
break
batch_start = start + i * self.batchsize
batch_end = min(batch_start + self.batchsize, end)

try:
self.do_batch(client, batch_start,
generator[batch_start:batch_end])
except Exception as e:
logger.exception(e)
logger.warning(
f"Worker {thid} failed to execute batch {i}: [{batch_start},{batch_end}]")
with self.error_counter_lock:
self.error_counter += 1

executed_batches += 1
if self.stats:
self.pb.update(batch_end - batch_start)
msg = f"Worker {thid} executed {executed_batches} batches"
logger.info(msg)
finally:
# Explicitly close the connection to avoid exhausting server connection limits
if client is not self.client and hasattr(client, 'close') and callable(client.close):
Expand All @@ -287,7 +388,7 @@ def get_succeeded_commands(self) -> int:
return sum(stat["succeeded_commands"]
for stat in self.actual_stats)

def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool = False) -> None:
def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool = False, max_bytes_per_batch: Optional[int] = None) -> None:
"""
This function takes as input the data to be executed in specified number of threads.
The generator yields a tuple : (array of commands, array of blobs)
Expand All @@ -296,7 +397,9 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool
batchsize (int, optional): Number of queries per transaction. Defaults to 1.
numthreads (int, optional): Number of parallel workers. Defaults to 4.
stats (bool, optional): Show statistics at end of ingestion. Defaults to False.
max_bytes_per_batch (int, optional): The maximum number of bytes allowed per batch. Acts as an additional cap on batch size; batches will be split if they reach either this byte limit or the `batchsize` item limit. If None or <= 0, dynamic batching is disabled and only `batchsize` is used. Default is None.
"""
self.max_bytes_per_batch = max_bytes_per_batch

use_dask = hasattr(generator, "use_dask") and generator.use_dask
if use_dask:
Expand All @@ -308,7 +411,7 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool

if use_dask:
results, self.total_actions_time = self.daskManager.run(
self.__class__, self.client, generator, batchsize, stats=stats, dry_run=self.dry_run)
self.__class__, self.client, generator, batchsize, stats=stats, dry_run=self.dry_run, max_bytes_per_batch=self.max_bytes_per_batch)
self.actual_stats = []
for result in results:
if result is not None:
Expand Down Expand Up @@ -339,10 +442,8 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool
logger.error(
f"Could not determine query structure from:\n{generator[0]}")
logger.error(type(generator[0]))
logger.info(
f"Commands per query = {self.commands_per_query}, "
f"Blobs per query = {self.blobs_per_query}"
)
logger.info(f"Commands per query = {self.commands_per_query}, "
f"Blobs per query = {self.blobs_per_query}")
self.batched_run(generator, batchsize, numthreads, stats)

def print_stats(self) -> None:
Expand Down
15 changes: 10 additions & 5 deletions test/run_test_container.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ function run_aperturedb_instance(){
docker network create ${TAG}_host_default
GATEWAY=$(docker network inspect ${TAG}_host_default | jq -r .[0].IPAM.Config[0].Gateway)
GATEWAY=$GATEWAY RUNNER_NAME=$TAG docker compose -f docker-compose.yml up -d

if [[ "$TAG" == *_non_http ]]; then
PORT=$(RUNNER_NAME=$TAG docker compose -f docker-compose.yml port lenz 55551 | awk -F: '{print $NF}')
elif [[ "$TAG" == *_http ]]; then
Expand Down Expand Up @@ -101,17 +102,21 @@ wait_for_stack() {
local lenz_ready=0
local nginx_ready=0

if docker run --rm --network=${network} curlimages/curl:latest \
nc -z -w 2 lenz 58085 >/dev/null 2>&1; then
if docker run --rm --network=${network} --entrypoint nc curlimages/curl:latest \
-z -w 2 lenz 58085 >/dev/null 2>&1; then
lenz_ready=1
fi

if [[ "$tag" == *"_http" ]]; then
if [[ "$tag" == *"_non_http" ]]; then
if [ "$lenz_ready" -eq 1 ]; then
echo "Stack ${tag} is ready after ${elapsed}s"
return 0
fi
elif [[ "$tag" == *"_http" ]]; then
if docker run --rm --network=${network} curlimages/curl:latest \
-sS -o /dev/null -m 2 http://nginx:80/ >/dev/null 2>&1; then
-fsS -o /dev/null -m 2 http://nginx:80/ >/dev/null 2>&1; then
nginx_ready=1
fi

if [ "$lenz_ready" -eq 1 ] && [ "$nginx_ready" -eq 1 ]; then
echo "Stack ${tag} is ready after ${elapsed}s"
return 0
Expand Down
Loading
Loading