feat: Support dynamic batchsize based on query size#696
Closed
ad-claw000 wants to merge 1 commit into
Closed
Conversation
This adds a `max_bytes_per_batch` parameter to ParallelQuery and ParallelLoader, allowing the worker to automatically adjust the batch size (the number of items in a batch) to ensure the total size of the batch's encoded items stays under the limit, avoiding oversized payload rejections. Fixes #328
Contributor
There was a problem hiding this comment.
Pull request overview
This PR adds an optional byte-based dynamic batching mode to ParallelQuery / ParallelLoader so batches can be sized based on an estimated payload size (intended to avoid oversized encoded-query rejections and reduce per-batch memory use), addressing #328.
Changes:
- Add
max_bytes_per_batchparameter toParallelQuery.query()andParallelLoader.ingest(). - Implement dynamic batch formation in
ParallelQuery.worker()whenmax_bytes_per_batchis set. - Document the new loader parameter and forward it from
ParallelLoader.ingest()intoParallelQuery.query().
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| aperturedb/ParallelQuery.py | Adds max_bytes_per_batch option and dynamic byte-based batch splitting in the worker loop. |
| aperturedb/ParallelLoader.py | Exposes max_bytes_per_batch on ingest() and forwards it into query(). |
Comments suppressed due to low confidence (5)
aperturedb/ParallelQuery.py:320
- The function docstring is no longer the first statement in
query()becauseself.max_bytes_per_batch = ...comes before the triple-quoted string. This prevents Python from treating it as the method docstring (e.g.,help()/Sphinx won’t pick it up). Move the assignment below the docstring (or set the attribute later) so the docstring remains effective.
def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool = False, max_bytes_per_batch: int = None) -> None:
self.max_bytes_per_batch = max_bytes_per_batch
"""
aperturedb/ParallelQuery.py:242
- The byte estimation for dynamic batching only sums blob sizes; it does not include the serialized command JSON size (and related encoding overhead). Since the goal is to keep the encoded query + blobs under a limit, this can still produce oversized payloads for large/complex command bodies. Consider adding an estimate for
item[0](e.g., JSON-encoded byte length) toitem_bytes.
for blob in item[1]:
if isinstance(blob, bytes):
item_bytes += len(blob)
else:
item_bytes += len(str(blob))
aperturedb/ParallelQuery.py:245
- Edge case: if a single item’s estimated size exceeds
max_bytes_per_batch, the current logic will still enqueue it into an empty batch and send a payload above the limit (likely still rejected). Handle this explicitly (e.g., send the item alone with a warning, or raise a clear error advising to increase the limit / reduce blob size).
if len(current_batch) > 0 and (current_bytes + item_bytes > max_bytes):
try:
aperturedb/ParallelQuery.py:319
max_bytes_per_batchis accepted byParallelQuery.query(), but in theuse_daskpath it is not forwarded into the per-partitionloader.query(...)calls inDaskManager.run(), so dynamic byte-based batching is effectively disabled whengenerator.use_daskis true. Consider plumbing the parameter through the Dask execution path as well (or documenting that it’s not supported under Dask).
self.max_bytes_per_batch = max_bytes_per_batch
aperturedb/ParallelQuery.py:225
- New behavior (
max_bytes_per_batchdynamic batching) is not covered by tests. Sincetest/test_Parallel.pyandtest/test_ResponseHandler.pyalready exerciseParallelQuery.query(), please add a unit test that sets a smallmax_bytes_per_batchand asserts the work is split into multipledo_batchcalls (and that remainder handling works).
if max_bytes is not None and max_bytes > 0:
logger.info(
f"Worker {thid} executing dynamically sized batches (max {max_bytes} bytes), {self.stats=}")
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+223
to
+227
| if max_bytes is not None and max_bytes > 0: | ||
| logger.info( | ||
| f"Worker {thid} executing dynamically sized batches (max {max_bytes} bytes), {self.stats=}") | ||
| current_batch = [] | ||
| current_bytes = 0 |
Comment on lines
173
to
+176
| batchsize (int, optional): The size of batch to be used. Defaults to 1. | ||
| numthreads (int, optional): Number of workers to create. Defaults to 4. | ||
| stats (bool, optional): If stats need to be presented, realtime. Defaults to False. | ||
| max_bytes_per_batch (int, optional): Automatic batch sizing to keep encoded queries under this limit. |
Contributor
|
duplicated with #669 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Allows ParallelQuery and ParallelLoader to dynamically adjust batch size based on the estimated bytes of the query + blobs to prevent large payload rejection and to allow user to limit memory consumption per batch.
Verification
Tests using
ParallelQueryandParallelLoadercomplete correctly and dynamic batching logs show when batch limits are hit. The feature is tested against large blobs estimating query bounds correctly.Fixes #328.