From fd90c47a56a46ada7d6179dc895c6143ea814c09 Mon Sep 17 00:00:00 2001 From: claw Date: Wed, 20 May 2026 08:42:24 +0000 Subject: [PATCH 01/17] feat: add client side transformers to parallel loader This allows applying transformers like ImageProps, Resizer etc. by passing a list of transformer classes via the `transformers` argument to `ParallelLoader.ingest` and `ParallelQuery.query`. Fixes #326 --- aperturedb/ParallelLoader.py | 8 +++++++- aperturedb/ParallelQuery.py | 7 ++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/aperturedb/ParallelLoader.py b/aperturedb/ParallelLoader.py index 94344411..4a102496 100644 --- a/aperturedb/ParallelLoader.py +++ b/aperturedb/ParallelLoader.py @@ -180,7 +180,7 @@ def query_setup(self, generator: Subscriptable) -> None: logger.warning( f"Failed to create index for {connection_class}.{property_name}") - def ingest(self, generator: Subscriptable, batchsize: int = 1, numthreads: int = 4, stats: bool = False) -> None: + def ingest(self, generator: Subscriptable, batchsize: int = 1, numthreads: int = 4, stats: bool = False, transformers: list = None) -> None: """ **Method to ingest data into the database** @@ -189,9 +189,15 @@ def ingest(self, generator: Subscriptable, batchsize: int = 1, numthreads: int = batchsize (int, optional): The size of batch to be used. Defaults to 1. numthreads (int, optional): Number of workers to create. Defaults to 4. stats (bool, optional): If stats need to be presented, realtime. Defaults to False. + transformers (list, optional): A list of Transformer classes to apply to the data. Defaults to None. """ logger.info( f"Starting ingestion with batchsize={batchsize}, numthreads={numthreads}") + + if transformers: + for transformer in transformers: + generator = transformer(generator) + self.query(generator, batchsize, numthreads, stats) def print_stats(self) -> None: diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index b8a15176..124b3a31 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -259,7 +259,7 @@ def get_succeeded_commands(self) -> int: return sum([stat["succeeded_commands"] for stat in self.actual_stats]) - def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool = False) -> None: + def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool = False, transformers: list = None) -> None: """ This function takes as input the data to be executed in specified number of threads. The generator yields a tuple : (array of commands, array of blobs) @@ -268,6 +268,7 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool batchsize (int, optional): Number of queries per transaction. Defaults to 1. numthreads (int, optional): Number of parallel workers. Defaults to 4. stats (bool, optional): Show statistics at end of ingestion. Defaults to False. + transformers (list, optional): A list of Transformer classes to apply to the data. Defaults to None. """ use_dask = hasattr(generator, "use_dask") and generator.use_dask @@ -275,6 +276,10 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool self._reset(batchsize=batchsize, numthreads=numthreads) self.daskmanager = DaskManager(num_workers=numthreads) + if transformers: + for transformer in transformers: + generator = transformer(generator) + if hasattr(self, "query_setup"): self.query_setup(generator) From 163c99b4513d4f7928d8ab8730cc6e0a5bcbc1ce Mon Sep 17 00:00:00 2001 From: claw Date: Wed, 20 May 2026 09:26:43 +0000 Subject: [PATCH 02/17] fix: address PR review comments for client-side transformers --- aperturedb/ParallelLoader.py | 6 +----- aperturedb/ParallelQuery.py | 10 ++++++---- test/test_Parallel.py | 35 +++++++++++++++++++++++++++++++++++ 3 files changed, 42 insertions(+), 9 deletions(-) diff --git a/aperturedb/ParallelLoader.py b/aperturedb/ParallelLoader.py index 4a102496..1e19d8e3 100644 --- a/aperturedb/ParallelLoader.py +++ b/aperturedb/ParallelLoader.py @@ -194,11 +194,7 @@ def ingest(self, generator: Subscriptable, batchsize: int = 1, numthreads: int = logger.info( f"Starting ingestion with batchsize={batchsize}, numthreads={numthreads}") - if transformers: - for transformer in transformers: - generator = transformer(generator) - - self.query(generator, batchsize, numthreads, stats) + self.query(generator, batchsize, numthreads, stats, transformers=transformers) def print_stats(self) -> None: diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index 124b3a31..fda9f2fc 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -273,16 +273,18 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool use_dask = hasattr(generator, "use_dask") and generator.use_dask if use_dask: + if transformers: + raise ValueError("Transformers cannot be used with Dask mode.") self._reset(batchsize=batchsize, numthreads=numthreads) self.daskmanager = DaskManager(num_workers=numthreads) - if transformers: - for transformer in transformers: - generator = transformer(generator) - if hasattr(self, "query_setup"): self.query_setup(generator) + if transformers: + for transformer in transformers: + generator = transformer(generator, client=self.client) + if use_dask: results, self.total_actions_time = self.daskmanager.run( self.__class__, self.client, generator, batchsize, stats=stats) diff --git a/test/test_Parallel.py b/test/test_Parallel.py index 0982e97e..fb4122b9 100644 --- a/test/test_Parallel.py +++ b/test/test_Parallel.py @@ -3,10 +3,21 @@ from aperturedb.Connector import Connector from aperturedb.ParallelQuery import ParallelQuery +from aperturedb.ParallelLoader import ParallelLoader from aperturedb.Subscriptable import Subscriptable +from aperturedb.transformers.transformer import Transformer logger = logging.getLogger(__name__) +class DummyTransformer(Transformer): + def __init__(self, generator, client=None): + super().__init__(generator, client=client) + assert client is not None, "Client was not passed to transformer!" + + def getitem(self, idx): + query, blobs = self.data[idx] + return query, blobs + # Tests for parallel which don't involve data. @@ -81,3 +92,27 @@ def test_allBadQueries(self, db: Connector): print(e) print("Failed to renew Session") assert False + + def test_transformers(self, db: Connector): + """ + Verifies that transformers are correctly applied. + """ + elements = 10 + generator = GeneratorWithErrors(elements=elements, error_pct=0) + + loader = ParallelLoader(db) + loader.ingest(generator, batchsize=2, numthreads=2, stats=False, transformers=[DummyTransformer]) + + assert loader.get_succeeded_queries() > 0 + + def test_transformers_rejects_dask(self, db: Connector): + elements = 10 + generator = GeneratorWithErrors(elements=elements, error_pct=0) + generator.use_dask = True + + loader = ParallelLoader(db) + try: + loader.ingest(generator, batchsize=2, numthreads=2, stats=False, transformers=[DummyTransformer]) + assert False, "Should have raised ValueError" + except ValueError as e: + assert "Transformers cannot be used with Dask" in str(e) From 9a2fa0b4519e3266b202c19b557530327b81adc3 Mon Sep 17 00:00:00 2001 From: claw Date: Wed, 20 May 2026 10:18:53 +0000 Subject: [PATCH 03/17] style: fix pre-commit formatting --- aperturedb/ParallelLoader.py | 3 ++- test/test_Parallel.py | 13 ++++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/aperturedb/ParallelLoader.py b/aperturedb/ParallelLoader.py index 1e19d8e3..67852c6c 100644 --- a/aperturedb/ParallelLoader.py +++ b/aperturedb/ParallelLoader.py @@ -194,7 +194,8 @@ def ingest(self, generator: Subscriptable, batchsize: int = 1, numthreads: int = logger.info( f"Starting ingestion with batchsize={batchsize}, numthreads={numthreads}") - self.query(generator, batchsize, numthreads, stats, transformers=transformers) + self.query(generator, batchsize, numthreads, + stats, transformers=transformers) def print_stats(self) -> None: diff --git a/test/test_Parallel.py b/test/test_Parallel.py index fb4122b9..837d278a 100644 --- a/test/test_Parallel.py +++ b/test/test_Parallel.py @@ -9,6 +9,7 @@ logger = logging.getLogger(__name__) + class DummyTransformer(Transformer): def __init__(self, generator, client=None): super().__init__(generator, client=client) @@ -99,20 +100,22 @@ def test_transformers(self, db: Connector): """ elements = 10 generator = GeneratorWithErrors(elements=elements, error_pct=0) - + loader = ParallelLoader(db) - loader.ingest(generator, batchsize=2, numthreads=2, stats=False, transformers=[DummyTransformer]) - + loader.ingest(generator, batchsize=2, numthreads=2, + stats=False, transformers=[DummyTransformer]) + assert loader.get_succeeded_queries() > 0 def test_transformers_rejects_dask(self, db: Connector): elements = 10 generator = GeneratorWithErrors(elements=elements, error_pct=0) generator.use_dask = True - + loader = ParallelLoader(db) try: - loader.ingest(generator, batchsize=2, numthreads=2, stats=False, transformers=[DummyTransformer]) + loader.ingest(generator, batchsize=2, numthreads=2, + stats=False, transformers=[DummyTransformer]) assert False, "Should have raised ValueError" except ValueError as e: assert "Transformers cannot be used with Dask" in str(e) From 5e39e026853bdd9bdfd4af903cb2541ad139f5a1 Mon Sep 17 00:00:00 2001 From: claw Date: Wed, 20 May 2026 10:56:24 +0000 Subject: [PATCH 04/17] fix: implement attribute delegation in Transformer --- aperturedb/transformers/transformer.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/aperturedb/transformers/transformer.py b/aperturedb/transformers/transformer.py index 5370364f..e20df3f5 100644 --- a/aperturedb/transformers/transformer.py +++ b/aperturedb/transformers/transformer.py @@ -88,3 +88,12 @@ def get_client(self): def get_utils(self): return Utils(self.get_client()) + + def __getattr__(self, name): + # Delegate attribute access to the underlying data (generator) + if "data" in self.__dict__: + try: + return getattr(self.data, name) + except AttributeError: + pass + raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") From fcb6f8220d1d0256b9bd92ac055ceea0ea599564 Mon Sep 17 00:00:00 2001 From: claw Date: Wed, 20 May 2026 12:47:57 +0000 Subject: [PATCH 05/17] chore: fix pre-commit issues --- aperturedb/transformers/transformer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/aperturedb/transformers/transformer.py b/aperturedb/transformers/transformer.py index e20df3f5..a9adeb4d 100644 --- a/aperturedb/transformers/transformer.py +++ b/aperturedb/transformers/transformer.py @@ -96,4 +96,5 @@ def __getattr__(self, name): return getattr(self.data, name) except AttributeError: pass - raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") + raise AttributeError( + f"'{type(self).__name__}' object has no attribute '{name}'") From 56c23e99388e1129da37d953f06218bef86d0d8b Mon Sep 17 00:00:00 2001 From: ad-claw000 Date: Thu, 21 May 2026 07:23:18 +0000 Subject: [PATCH 06/17] fix: correctly forward AttributeError in Transformer.__getattr__ --- aperturedb/transformers/transformer.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/aperturedb/transformers/transformer.py b/aperturedb/transformers/transformer.py index a9adeb4d..434c4e11 100644 --- a/aperturedb/transformers/transformer.py +++ b/aperturedb/transformers/transformer.py @@ -92,9 +92,6 @@ def get_utils(self): def __getattr__(self, name): # Delegate attribute access to the underlying data (generator) if "data" in self.__dict__: - try: - return getattr(self.data, name) - except AttributeError: - pass + return getattr(self.data, name) raise AttributeError( f"'{type(self).__name__}' object has no attribute '{name}'") From 5c5da7b8c05e1015065207600fb4d0f2d0d8b5d0 Mon Sep 17 00:00:00 2001 From: Luis Remis Date: Sat, 23 May 2026 00:53:59 +0000 Subject: [PATCH 07/17] fix: address remaining PR review comments From 17a022fa5ff1c1384c786eea32648c0b386d3ad5 Mon Sep 17 00:00:00 2001 From: Luis Remis Date: Sat, 23 May 2026 00:54:15 +0000 Subject: [PATCH 08/17] test: add test for transformers manual wrapping equivalence --- test/test_Parallel.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/test/test_Parallel.py b/test/test_Parallel.py index 837d278a..c20bf0fd 100644 --- a/test/test_Parallel.py +++ b/test/test_Parallel.py @@ -119,3 +119,23 @@ def test_transformers_rejects_dask(self, db: Connector): assert False, "Should have raised ValueError" except ValueError as e: assert "Transformers cannot be used with Dask" in str(e) + + def test_transformers_equivalence(self, db: Connector): + ''' + Verifies that using transformers parameter is equivalent to manual wrapping. + ''' + elements = 10 + + # Manual wrapping + generator1 = GeneratorWithErrors(elements=elements, error_pct=0) + transformer1 = DummyTransformer(generator1, client=db) + loader1 = ParallelLoader(db) + loader1.ingest(transformer1, batchsize=2, numthreads=2, stats=False) + + # transformers parameter + generator2 = GeneratorWithErrors(elements=elements, error_pct=0) + loader2 = ParallelLoader(db) + loader2.ingest(generator2, batchsize=2, numthreads=2, stats=False, transformers=[DummyTransformer]) + + assert loader1.get_succeeded_queries() == loader2.get_succeeded_queries() + assert loader1.get_succeeded_queries() > 0 From 6980319391f86adbb9eff3aa080415207247d3d6 Mon Sep 17 00:00:00 2001 From: claw Date: Sat, 23 May 2026 05:21:30 +0000 Subject: [PATCH 09/17] fix: address review comments --- aperturedb/ParallelQuery.py | 4 ++-- test/test_Parallel.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index a037e0a1..5ee50794 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -282,7 +282,7 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool if transformers: raise ValueError("Transformers cannot be used with Dask mode.") self._reset(batchsize=batchsize, numthreads=numthreads) - self.daskmanager = DaskManager(num_workers=numthreads) + self.daskManager = DaskManager(num_workers=numthreads) if hasattr(self, "query_setup"): self.query_setup(generator) @@ -292,7 +292,7 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool generator = transformer(generator, client=self.client) if use_dask: - results, self.total_actions_time = self.daskmanager.run( + results, self.total_actions_time = self.daskManager.run( self.__class__, self.client, generator, batchsize, stats=stats, dry_run=self.dry_run) self.actual_stats = [] for result in results: diff --git a/test/test_Parallel.py b/test/test_Parallel.py index 6c3ea8e7..5ad9d232 100644 --- a/test/test_Parallel.py +++ b/test/test_Parallel.py @@ -36,7 +36,7 @@ def getitem(self, subscript): query = [] blobs = [] for i in range(self.commands_per_query): - if random.randint(0, 100) <= (self.error_pct * 100): + if random.random() < self.error_pct: query.append({ "BadCommand": { } From ec1268900a80b97df6f6a69144345ce309a31b54 Mon Sep 17 00:00:00 2001 From: claw Date: Sat, 23 May 2026 09:46:09 +0000 Subject: [PATCH 10/17] docs: update stats parameter docstring in ParallelQuery --- aperturedb/ParallelQuery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index 5ee50794..e3bba03d 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -273,7 +273,7 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool generator (_type_): The class that generates the queries to be executed. batchsize (int, optional): Number of queries per transaction. Defaults to 1. numthreads (int, optional): Number of parallel workers. Defaults to 4. - stats (bool, optional): Show statistics at end of ingestion. Defaults to False. + stats (bool, optional): Show statistics at end of query execution. Defaults to False. transformers (list, optional): A list of Transformer classes to apply to the data. Defaults to None. """ From 0edb3c3e19921e662d842c0fbd6bf5bddb078672 Mon Sep 17 00:00:00 2001 From: claw Date: Sun, 24 May 2026 01:11:08 +0000 Subject: [PATCH 11/17] style: fix autopep8 formatting in test_Parallel.py --- test/test_Parallel.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/test/test_Parallel.py b/test/test_Parallel.py index 5ad9d232..a2b31e1d 100644 --- a/test/test_Parallel.py +++ b/test/test_Parallel.py @@ -125,21 +125,23 @@ def test_transformers_equivalence(self, db: Connector): Verifies that using transformers parameter is equivalent to manual wrapping. ''' elements = 10 - + # Manual wrapping generator1 = GeneratorWithErrors(elements=elements, error_pct=0) transformer1 = DummyTransformer(generator1, client=db) loader1 = ParallelLoader(db) loader1.ingest(transformer1, batchsize=2, numthreads=2, stats=False) - + # transformers parameter generator2 = GeneratorWithErrors(elements=elements, error_pct=0) loader2 = ParallelLoader(db) - loader2.ingest(generator2, batchsize=2, numthreads=2, stats=False, transformers=[DummyTransformer]) - + loader2.ingest(generator2, batchsize=2, numthreads=2, + stats=False, transformers=[DummyTransformer]) + assert loader1.get_succeeded_queries() == loader2.get_succeeded_queries() assert loader1.get_succeeded_queries() > 0 + def test_dask_dry_run(db: Connector): from aperturedb.ParallelLoader import ParallelLoader from aperturedb.EntityDataCSV import EntityDataCSV From 988639009d8072f2c9f60b8e4750f283887890e7 Mon Sep 17 00:00:00 2001 From: claw Date: Sun, 24 May 2026 16:46:42 +0000 Subject: [PATCH 12/17] fix: resolve empty dataset transformers handling and pytest assertions --- aperturedb/ParallelQuery.py | 2 +- test/test_Parallel.py | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index f4916981..64f94fc6 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -294,7 +294,7 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool if hasattr(self, "query_setup"): self.query_setup(generator) - if transformers: + if transformers and len(generator) > 0: for transformer in transformers: generator = transformer(generator, client=self.client) diff --git a/test/test_Parallel.py b/test/test_Parallel.py index 69f13b28..196778dc 100644 --- a/test/test_Parallel.py +++ b/test/test_Parallel.py @@ -1,5 +1,6 @@ import logging import random +import pytest from aperturedb.Connector import Connector from aperturedb.ParallelQuery import ParallelQuery @@ -145,12 +146,9 @@ def test_transformers_rejects_dask(self, db: Connector): generator.use_dask = True loader = ParallelLoader(db) - try: + with pytest.raises(ValueError, match="Transformers cannot be used with Dask"): loader.ingest(generator, batchsize=2, numthreads=2, stats=False, transformers=[DummyTransformer]) - assert False, "Should have raised ValueError" - except ValueError as e: - assert "Transformers cannot be used with Dask" in str(e) def test_transformers_equivalence(self, db: Connector): ''' From 0b585a29d286d1fe2b88039de0d778ab8d24f11b Mon Sep 17 00:00:00 2001 From: claw Date: Mon, 25 May 2026 00:44:10 +0000 Subject: [PATCH 13/17] fix: reject manual transformer wrapping when dask mode is enabled --- aperturedb/ParallelQuery.py | 4 +++- test/test_Parallel.py | 5 +++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index 64f94fc6..6a6428e4 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -284,9 +284,11 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool transformers (list, optional): A list of Transformer classes to apply to the data. Defaults to None. """ + from aperturedb.transformers.transformer import Transformer + use_dask = hasattr(generator, "use_dask") and generator.use_dask if use_dask: - if transformers: + if transformers or isinstance(generator, Transformer): raise ValueError("Transformers cannot be used with Dask mode.") self._reset(batchsize=batchsize, numthreads=numthreads) self.daskManager = DaskManager(num_workers=numthreads) diff --git a/test/test_Parallel.py b/test/test_Parallel.py index 196778dc..394cdeee 100644 --- a/test/test_Parallel.py +++ b/test/test_Parallel.py @@ -149,6 +149,11 @@ def test_transformers_rejects_dask(self, db: Connector): with pytest.raises(ValueError, match="Transformers cannot be used with Dask"): loader.ingest(generator, batchsize=2, numthreads=2, stats=False, transformers=[DummyTransformer]) + + # Test manual wrapping also gets rejected + transformer = DummyTransformer(generator, client=db) + with pytest.raises(ValueError, match="Transformers cannot be used with Dask"): + loader.ingest(transformer, batchsize=2, numthreads=2, stats=False) def test_transformers_equivalence(self, db: Connector): ''' From 71c1191ba0cc6bc11e9fd45a957eade2d8665302 Mon Sep 17 00:00:00 2001 From: claw Date: Mon, 25 May 2026 02:13:32 +0000 Subject: [PATCH 14/17] style: fix trailing whitespace in test_Parallel.py --- test/test_Parallel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_Parallel.py b/test/test_Parallel.py index 394cdeee..7af62aca 100644 --- a/test/test_Parallel.py +++ b/test/test_Parallel.py @@ -149,7 +149,7 @@ def test_transformers_rejects_dask(self, db: Connector): with pytest.raises(ValueError, match="Transformers cannot be used with Dask"): loader.ingest(generator, batchsize=2, numthreads=2, stats=False, transformers=[DummyTransformer]) - + # Test manual wrapping also gets rejected transformer = DummyTransformer(generator, client=db) with pytest.raises(ValueError, match="Transformers cannot be used with Dask"): From 0dcc0f50e39b0909d57e8ca9d132e1b414df3b9e Mon Sep 17 00:00:00 2001 From: claw Date: Mon, 25 May 2026 02:53:44 +0000 Subject: [PATCH 15/17] fix(ParallelQuery): normalize transformers parameter to list if not iterable --- aperturedb/ParallelQuery.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index 6a6428e4..2c4a6e7f 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -286,6 +286,9 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool from aperturedb.transformers.transformer import Transformer + if transformers is not None and not isinstance(transformers, (list, tuple)): + transformers = [transformers] + use_dask = hasattr(generator, "use_dask") and generator.use_dask if use_dask: if transformers or isinstance(generator, Transformer): From f83260542f7c6f13a08909efaeea05c45ccecf52 Mon Sep 17 00:00:00 2001 From: claw Date: Mon, 25 May 2026 20:45:23 +0000 Subject: [PATCH 16/17] test: address review comments on ParallelQuery and ParallelLoader tests --- test/test_Parallel.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/test/test_Parallel.py b/test/test_Parallel.py index 7af62aca..2b9c50bd 100644 --- a/test/test_Parallel.py +++ b/test/test_Parallel.py @@ -14,7 +14,8 @@ class DummyTransformer(Transformer): def __init__(self, generator, client=None): super().__init__(generator, client=client) - assert client is not None, "Client was not passed to transformer!" + if client is None: + pytest.fail("Client was not passed to transformer!") def getitem(self, idx): query, blobs = self.data[idx] @@ -156,9 +157,9 @@ def test_transformers_rejects_dask(self, db: Connector): loader.ingest(transformer, batchsize=2, numthreads=2, stats=False) def test_transformers_equivalence(self, db: Connector): - ''' + """ Verifies that using transformers parameter is equivalent to manual wrapping. - ''' + """ elements = 10 # Manual wrapping @@ -176,6 +177,19 @@ def test_transformers_equivalence(self, db: Connector): assert loader1.get_succeeded_queries() == loader2.get_succeeded_queries() assert loader1.get_succeeded_queries() > 0 + def test_query_transformers(self, db: Connector): + """ + Verifies that transformers are correctly applied when calling ParallelQuery.query(). + """ + elements = 10 + generator = GeneratorWithErrors(elements=elements, error_pct=0) + + querier = ParallelQuery(db) + querier.query(generator, batchsize=2, numthreads=2, + stats=False, transformers=[DummyTransformer]) + + assert querier.get_succeeded_queries() > 0 + def test_dask_dry_run(db: Connector): from aperturedb.ParallelLoader import ParallelLoader From bfb6ff9a85a626ff4fd6ff42d527ad01ee9476cd Mon Sep 17 00:00:00 2001 From: claw Date: Tue, 26 May 2026 20:50:29 +0000 Subject: [PATCH 17/17] fix(transformers): address review comments - Add thread-local Connector clone in Transformer to avoid sharing across threads - Restrict attribute delegation in Transformer to an allowlist - Add validation for transformers parameter in ParallelQuery - Add test coverage for single transformer and invalid transformer inputs --- aperturedb/ParallelQuery.py | 9 ++++++-- aperturedb/transformers/transformer.py | 31 +++++++++++++++++++++----- test/test_Parallel.py | 18 +++++++++++++++ 3 files changed, 51 insertions(+), 7 deletions(-) diff --git a/aperturedb/ParallelQuery.py b/aperturedb/ParallelQuery.py index 7e960158..4022ced2 100644 --- a/aperturedb/ParallelQuery.py +++ b/aperturedb/ParallelQuery.py @@ -301,8 +301,13 @@ def query(self, generator, batchsize: int = 1, numthreads: int = 4, stats: bool from aperturedb.transformers.transformer import Transformer - if transformers is not None and not isinstance(transformers, (list, tuple)): - transformers = [transformers] + if transformers is not None: + if not isinstance(transformers, (list, tuple)): + transformers = [transformers] + for t in transformers: + if not (isinstance(t, type) and issubclass(t, Transformer)) and not callable(t): + raise TypeError( + "Each transformer must be a subclass of Transformer or a callable.") use_dask = hasattr(generator, "use_dask") and generator.use_dask if use_dask: diff --git a/aperturedb/transformers/transformer.py b/aperturedb/transformers/transformer.py index 434c4e11..d0813788 100644 --- a/aperturedb/transformers/transformer.py +++ b/aperturedb/transformers/transformer.py @@ -1,6 +1,7 @@ from aperturedb.Subscriptable import Subscriptable from aperturedb.CommonLibrary import create_connector from aperturedb.Utils import Utils +import threading import logging logger = logging.getLogger(__name__) @@ -59,6 +60,7 @@ def __init__(self, data: Subscriptable, client=None, **kwargs) -> None: self._blob_index = [] self._add_image_index = [] self._client = client + self._thread_local = threading.local() bc = 0 for i, c in enumerate(x[0]): @@ -82,16 +84,35 @@ def __len__(self): return len(self.data) def get_client(self): - if self._client is None: - self._client = create_connector() - return self._client + if not hasattr(self, "_thread_local"): + self._thread_local = threading.local() + + if not hasattr(self._thread_local, "client"): + if self._client is not None: + if hasattr(self._client, "clone"): + self._thread_local.client = self._client.clone() + else: + self._thread_local.client = self._client + else: + self._thread_local.client = create_connector() + return self._thread_local.client def get_utils(self): return Utils(self.get_client()) def __getattr__(self, name): - # Delegate attribute access to the underlying data (generator) - if "data" in self.__dict__: + # Delegate specific attribute access to the underlying data (generator) + # to preserve behaviors from original generators (like CSVParser). + allowed_attributes = { + "use_dask", + "strict_response_validation", + "response_handler", + "error_handler", + "blobs_relative_to_csv", + "commands_per_query", + "blobs_per_query" + } + if name in allowed_attributes and "data" in self.__dict__: return getattr(self.data, name) raise AttributeError( f"'{type(self).__name__}' object has no attribute '{name}'") diff --git a/test/test_Parallel.py b/test/test_Parallel.py index cf006162..c632ab3c 100644 --- a/test/test_Parallel.py +++ b/test/test_Parallel.py @@ -190,6 +190,24 @@ def test_query_transformers(self, db: Connector): assert querier.get_succeeded_queries() > 0 + def test_transformers_single_and_invalid(self, db: Connector): + """ + Verifies that passing a single transformer works and invalid inputs raise TypeError. + """ + elements = 10 + generator = GeneratorWithErrors(elements=elements, error_pct=0) + loader = ParallelLoader(db) + + # Single transformer + loader.ingest(generator, batchsize=2, numthreads=2, + stats=False, transformers=DummyTransformer) + assert loader.get_succeeded_queries() > 0 + + # Invalid input + with pytest.raises(TypeError, match="must be a subclass of Transformer or a callable"): + loader.ingest(generator, batchsize=2, numthreads=2, + stats=False, transformers="invalid_transformer") + def test_parallel_query_worker_closes_connection(self, db, monkeypatch): from aperturedb.QueryGenerator import QueryGenerator import threading