Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
fd90c47
feat: add client side transformers to parallel loader
ad-claw000 May 20, 2026
163c99b
fix: address PR review comments for client-side transformers
ad-claw000 May 20, 2026
9a2fa0b
style: fix pre-commit formatting
ad-claw000 May 20, 2026
5e39e02
fix: implement attribute delegation in Transformer
ad-claw000 May 20, 2026
fcb6f82
chore: fix pre-commit issues
ad-claw000 May 20, 2026
56c23e9
fix: correctly forward AttributeError in Transformer.__getattr__
ad-claw000 May 21, 2026
5c5da7b
fix: address remaining PR review comments
luisremis May 23, 2026
17a022f
test: add test for transformers manual wrapping equivalence
luisremis May 23, 2026
c0cd2b6
merge: resolve conflicts with develop
ad-claw000 May 23, 2026
6980319
fix: address review comments
ad-claw000 May 23, 2026
ec12689
docs: update stats parameter docstring in ParallelQuery
ad-claw000 May 23, 2026
0edb3c3
style: fix autopep8 formatting in test_Parallel.py
ad-claw000 May 24, 2026
702f7fb
Merge branch 'develop' into fix/issue-326-client-side-transformers
luisremis May 24, 2026
a36d159
Merge remote-tracking branch 'origin/develop' into fix/issue-326-clie…
ad-claw000 May 24, 2026
9886390
fix: resolve empty dataset transformers handling and pytest assertions
ad-claw000 May 24, 2026
4ab1392
Merge remote-tracking branch 'origin/develop' into fix/issue-326-clie…
ad-claw000 May 24, 2026
0b585a2
fix: reject manual transformer wrapping when dask mode is enabled
ad-claw000 May 25, 2026
71c1191
style: fix trailing whitespace in test_Parallel.py
ad-claw000 May 25, 2026
0dcc0f5
fix(ParallelQuery): normalize transformers parameter to list if not i…
ad-claw000 May 25, 2026
00a8c52
Merge remote-tracking branch 'origin/develop' into fix/issue-326-clie…
ad-claw000 May 25, 2026
f832605
test: address review comments on ParallelQuery and ParallelLoader tests
ad-claw000 May 25, 2026
67a820a
merge: resolve conflicts with develop
ad-claw000 May 26, 2026
bfb6ff9
fix(transformers): address review comments
ad-claw000 May 26, 2026
5e09a94
Merge branch 'develop' into fix/issue-326-client-side-transformers
luisremis Jun 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions aperturedb/ParallelLoader.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def query_setup(self, generator: Subscriptable) -> None:
logger.warning(
f"Failed to create index for {connection_class}.{property_name}")

def ingest(self, generator: Subscriptable, batchsize: int = 1, numthreads: int = 4, stats: bool = False) -> None:
def ingest(self, generator: Subscriptable, batchsize: int = 1, numthreads: int = 4, stats: bool = False, transformers: list = None) -> None:
"""
**Method to ingest data into the database**

Expand All @@ -189,10 +189,13 @@ def ingest(self, generator: Subscriptable, batchsize: int = 1, numthreads: int =
batchsize (int, optional): The size of batch to be used. Defaults to 1.
numthreads (int, optional): Number of workers to create. Defaults to 4.
stats (bool, optional): If stats need to be presented, realtime. Defaults to False.
transformers (list, optional): A list of Transformer classes to apply to the data. Defaults to None.
"""
logger.info(
f"Starting ingestion with batchsize={batchsize}, numthreads={numthreads}")
self.query(generator, batchsize, numthreads, stats)

self.query(generator, batchsize, numthreads,
stats, transformers=transformers)

def print_stats(self) -> None:

Expand Down
21 changes: 19 additions & 2 deletions aperturedb/ParallelQuery.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,25 +287,42 @@ def get_succeeded_commands(self) -> int:
return sum(stat["succeeded_commands"]
for stat in self.actual_stats)

def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool = False) -> None:
def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool = False, transformers: list = None) -> None:
Comment thread
ad-claw000 marked this conversation as resolved.
"""
This function takes as input the data to be executed in specified number of threads.
The generator yields a tuple : (array of commands, array of blobs)
Args:
generator (_type_): The class that generates the queries to be executed.
batchsize (int, optional): Number of queries per transaction. Defaults to 1.
numthreads (int, optional): Number of parallel workers. Defaults to 4.
stats (bool, optional): Show statistics at end of ingestion. Defaults to False.
stats (bool, optional): Show statistics at end of query execution. Defaults to False.
transformers (list, optional): A list of Transformer classes to apply to the data. Defaults to None.
"""

from aperturedb.transformers.transformer import Transformer

if transformers is not None:
if not isinstance(transformers, (list, tuple)):
transformers = [transformers]
for t in transformers:
if not (isinstance(t, type) and issubclass(t, Transformer)) and not callable(t):
raise TypeError(
"Each transformer must be a subclass of Transformer or a callable.")

Comment thread
ad-claw000 marked this conversation as resolved.
use_dask = hasattr(generator, "use_dask") and generator.use_dask
if use_dask:
if transformers or isinstance(generator, Transformer):
raise ValueError("Transformers cannot be used with Dask mode.")
self._reset(batchsize=batchsize, numthreads=numthreads)
self.daskManager = DaskManager(num_workers=numthreads)
Comment thread
ad-claw000 marked this conversation as resolved.

if hasattr(self, "query_setup"):
self.query_setup(generator)

if transformers and len(generator) > 0:
for transformer in transformers:
generator = transformer(generator, client=self.client)
Comment thread
ad-claw000 marked this conversation as resolved.

Comment thread
ad-claw000 marked this conversation as resolved.
Comment on lines +322 to +325
if use_dask:
results, self.total_actions_time = self.daskManager.run(
self.__class__, self.client, generator, batchsize, stats=stats, dry_run=self.dry_run)
Expand Down
34 changes: 31 additions & 3 deletions aperturedb/transformers/transformer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from aperturedb.Subscriptable import Subscriptable
from aperturedb.CommonLibrary import create_connector
from aperturedb.Utils import Utils
import threading
import logging

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -59,6 +60,7 @@ def __init__(self, data: Subscriptable, client=None, **kwargs) -> None:
self._blob_index = []
self._add_image_index = []
self._client = client
self._thread_local = threading.local()

bc = 0
for i, c in enumerate(x[0]):
Expand All @@ -82,9 +84,35 @@ def __len__(self):
return len(self.data)

def get_client(self):
    """
    Return the client to use from the calling thread.

    The client is cached in ``self._thread_local``, so each thread
    resolves its own client on its first call and gets the same object
    back on later calls. Resolution order on a thread's first call:

    - ``self._client`` is set and has a ``clone`` attribute: the result
      of ``self._client.clone()`` is used.
    - ``self._client`` is set without ``clone``: it is used as is.
    - ``self._client`` is ``None``: ``create_connector()`` is called.

    Returns:
        The client cached for the current thread.
    """
    # Create the per-thread store on demand for instances that do not
    # have it yet.
    if not hasattr(self, "_thread_local"):
        self._thread_local = threading.local()

    local = self._thread_local
    if not hasattr(local, "client"):
        if self._client is None:
            local.client = create_connector()
        elif hasattr(self._client, "clone"):
            local.client = self._client.clone()
        else:
            local.client = self._client
    return local.client

def get_utils(self):
    """Return a ``Utils`` object built on the client from ``get_client()``."""
    client = self.get_client()
    return Utils(client)

def __getattr__(self, name):
    """
    Forward a fixed set of attribute names to the wrapped ``self.data``.

    Args:
        name (str): The attribute being looked up.

    Returns:
        ``getattr(self.data, name)`` when ``name`` is one of the allowed
        attributes and ``data`` is already stored on this instance.

    Raises:
        AttributeError: If ``name`` is not an allowed attribute, or
            ``data`` has not been stored on this instance yet. An
            AttributeError raised by ``self.data`` itself is not caught.
    """
    # Delegate specific attribute access to the underlying data (generator)
    # to preserve behaviors from original generators (like CSVParser).
    allowed_attributes = {
        "use_dask",
        "strict_response_validation",
        "response_handler",
        "error_handler",
        "blobs_relative_to_csv",
        "commands_per_query",
        "blobs_per_query"
    }
    # Only delegate once ``data`` is in the instance dict; otherwise fall
    # through so the error names the attribute that was requested.
    if name in allowed_attributes and "data" in self.__dict__:
        return getattr(self.data, name)
    raise AttributeError(
        f"'{type(self).__name__}' object has no attribute '{name}'")
Comment thread
ad-claw000 marked this conversation as resolved.
96 changes: 95 additions & 1 deletion test/test_Parallel.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,26 @@
import logging
import random
import pytest

from aperturedb.Connector import Connector
from aperturedb.ParallelQuery import ParallelQuery
from aperturedb.ParallelLoader import ParallelLoader
from aperturedb.Subscriptable import Subscriptable
from aperturedb.transformers.transformer import Transformer

logger = logging.getLogger(__name__)


class DummyTransformer(Transformer):
    """Pass-through transformer that fails the test when built without a client."""

    def __init__(self, generator, client=None):
        super().__init__(generator, client=client)
        # The tests rely on the caller handing a client to every transformer.
        if client is None:
            pytest.fail("Client was not passed to transformer!")

    def getitem(self, idx):
        # Unpack and return the wrapped item unchanged.
        commands, payload = self.data[idx]
        return commands, payload

# Tests for parallel which don't involve data.


Expand All @@ -24,7 +38,7 @@ def getitem(self, subscript):
query = []
blobs = []
for i in range(self.commands_per_query):
if random.randint(0, 100) <= (self.error_pct * 100):
if random.random() < self.error_pct:
query.append({
"BadCommand": {
}
Expand Down Expand Up @@ -114,6 +128,86 @@ def test_dictResponseHandling(self):
print(e)
raise

def test_transformers(self, db: Connector):
    """
    Verifies that transformers are correctly applied.

    Ingests a ``GeneratorWithErrors`` built with ``error_pct=0`` through
    ``ParallelLoader.ingest`` with ``transformers=[DummyTransformer]`` and
    checks that at least one query succeeded.
    """
    elements = 10
    generator = GeneratorWithErrors(elements=elements, error_pct=0)

    loader = ParallelLoader(db)
    loader.ingest(generator, batchsize=2, numthreads=2,
                  stats=False, transformers=[DummyTransformer])

    assert loader.get_succeeded_queries() > 0

def test_transformers_rejects_dask(self, db: Connector):
    """
    Verifies that ingest raises ValueError when the generator has
    ``use_dask`` set and a transformer is involved, whether it is passed
    through ``transformers`` or wrapped around the generator by hand.
    """
    dask_error = "Transformers cannot be used with Dask"

    source = GeneratorWithErrors(elements=10, error_pct=0)
    source.use_dask = True

    loader = ParallelLoader(db)

    # Transformer supplied through the keyword argument.
    with pytest.raises(ValueError, match=dask_error):
        loader.ingest(source, batchsize=2, numthreads=2,
                      stats=False, transformers=[DummyTransformer])

    # Test manual wrapping also gets rejected
    wrapped = DummyTransformer(source, client=db)
    with pytest.raises(ValueError, match=dask_error):
        loader.ingest(wrapped, batchsize=2, numthreads=2, stats=False)

def test_transformers_equivalence(self, db: Connector):
    """
    Verifies that using transformers parameter is equivalent to manual wrapping.

    Both paths ingest a fresh ``GeneratorWithErrors`` with the same
    settings; the succeeded-query counts must match and be non-zero.
    """
    count = 10

    # Path 1: the caller wraps the generator in the transformer itself.
    manual_source = GeneratorWithErrors(elements=count, error_pct=0)
    manual_wrapped = DummyTransformer(manual_source, client=db)
    manual_loader = ParallelLoader(db)
    manual_loader.ingest(manual_wrapped, batchsize=2,
                         numthreads=2, stats=False)

    # Path 2: the loader applies the transformer via the keyword argument.
    param_source = GeneratorWithErrors(elements=count, error_pct=0)
    param_loader = ParallelLoader(db)
    param_loader.ingest(param_source, batchsize=2, numthreads=2,
                        stats=False, transformers=[DummyTransformer])

    assert manual_loader.get_succeeded_queries() == param_loader.get_succeeded_queries()
    assert manual_loader.get_succeeded_queries() > 0

def test_query_transformers(self, db: Connector):
    """
    Verifies that transformers are correctly applied when calling ParallelQuery.query().
    """
    source = GeneratorWithErrors(elements=10, error_pct=0)

    runner = ParallelQuery(db)
    runner.query(source, batchsize=2, numthreads=2,
                 stats=False, transformers=[DummyTransformer])

    succeeded = runner.get_succeeded_queries()
    assert succeeded > 0

def test_transformers_single_and_invalid(self, db: Connector):
    """
    Verifies that passing a single transformer works and invalid inputs raise TypeError.
    """
    source = GeneratorWithErrors(elements=10, error_pct=0)
    loader = ParallelLoader(db)

    # A bare transformer class (not wrapped in a list) is accepted.
    loader.ingest(source, batchsize=2, numthreads=2,
                  stats=False, transformers=DummyTransformer)
    assert loader.get_succeeded_queries() > 0

    # A plain string is neither a Transformer subclass nor a callable.
    expected_message = "must be a subclass of Transformer or a callable"
    with pytest.raises(TypeError, match=expected_message):
        loader.ingest(source, batchsize=2, numthreads=2,
                      stats=False, transformers="invalid_transformer")

def test_parallel_query_worker_closes_connection(self, db, monkeypatch):
from aperturedb.QueryGenerator import QueryGenerator
import threading
Expand Down
Loading