feat: Support configurable aggregations setup#607
Conversation
- Reorganized aggregation logic into a cohesive 'aggregation' package. - Implemented AggregationOrchestrator supporting stage-based parallel execution and wildcards. - Implemented validator utility (CLI & programmatic) validating against schema.json. - Added new stateless /aggregation/initiate and /poll FastAPI endpoints. - Retained legacy /run and /status wrappers for backward compatibility. - Updated spanner-ingestion-workflow.yaml to use the state-passing loop. - Added comprehensive unit and integration test suites (25 tests total, 100% passing).
…mpty job list - Fixed a bug where a PENDING BigQuery job could cause premature transition to the next stage. - Changed the transition check to strictly require a DONE status. - Added short-circuiting for empty active job lists. - Added a new integration test 'test_aggregation_poll_still_running' to verify PENDING handling.
- Replaced all hardcoded upper limits of stage '10' in API routes and legacy wrappers. - Dynamically calculate the maximum stage from the loaded aggregations config. - Updated integration tests in app_test.py to mock the aggregations list for correct stage limit evaluation.
… side effects - Removed module-level logging.basicConfig call from validator.py. - Added logging.basicConfig inside validator's CLI main() function to ensure logging is configured only during standalone script execution. - Removed global logging.getLogger().setLevel(logging.INFO) call from orchestrator.py to prevent altering root logger levels on module import.
…or schema bypass - Added an else block in orchestrator's execute_stage() that raises ValueError for unsupported/unimplemented step types (like 'entity'), preventing silent failures. - Removed the early-return validation bypass in validator.py, ensuring that jsonschema strictly validates missing 'aggregations' keys. - Added unit test 'test_execute_stage_unsupported_type' in orchestrator_test.py to verify the ValueError fail-fast routing. - Added unit tests 'test_validate_config_missing_aggregations_key' and 'test_validate_config_empty_file' in validator_test.py to cover the validation fixes.
…loop performance issue - Added a new get_active_stages() helper in orchestrator.py that returns a sorted list of unique active and enabled stage numbers. - Replaced all sequential while and range loops in routes/aggregation.py with direct list-comprehension jumps using get_active_stages(). - This completely resolves the performance spike and timeout risk when a very large stage number (e.g. 100 million) is configured. - Added unit test 'test_get_active_stages' in orchestrator_test.py. - Updated and significantly simplified integration test mocks in app_test.py to mock get_active_stages() instead of aggregations/has_stage.
… and add docstring - Renamed the generic StateObject class to AggregationWorkflowState to better reflect its purpose in representing the state of the multi-stage aggregation workflow. - Added a comprehensive, professional docstring to AggregationWorkflowState explaining its role in the stateless polling loop coordinated by Google Cloud Workflows. - Updated all type annotations, route definitions, and return statements inside routes/aggregation.py. - Verified that no other files in the workspace referenced the old class name directly, and that all 30 tests continue to pass 100%.
…temporary TODOs - Reverted all 'Legacy' prefix names on compatibility Pydantic models (e.g. AggregationRequest, AggregationResponse) back to their original names, ensuring perfect backward compatibility for client-side code generators. - Reverted compatibility route method names back to their original names (run_aggregation and get_aggregation_status). - Added deprecated=True to the FastAPI route decorators for /run and /status to natively flag them in the OpenAPI/Swagger documentation UI. - Added clear TODO comments and docstrings advising that these are temporary compatibility components to be removed once all consumers migrate. - Renamed the test case in app_test.py to test_aggregation_run to match the method name.
- Deleted redundant ASCII box section dividers and decorative headers (e.g. Pydantic Models for the New Stateless API, Router Definition) from routes/aggregation.py. - Simplified backward compatibility section markers to clean, single-line TODO comments. - This removes visual noise and aligns the file with clean pythonic commenting best practices.
…d endpoints - Replaced the global section-level TODO comments in routes/aggregation.py with localized, specific TODO comments. - Placed an explicit, actionable TODO comment directly above each of the four compatibility models (AggregationRequest, AggregationStatusRequest, etc.). - Placed an explicit TODO comment directly above the /run and /status route decorators. - This ensures technical debt is highly visible, actionable, and tied directly to the specific components slated for removal after consumer migration.
- Fixed a PEP 8 E701 violation in orchestrator.py by splitting a single-line 'if' statement into a standard multi-line block. - Resolved a Ruff F401 unused import warning in app_test.py by removing 'import os'. - Verified that all 30 tests continue to pass 100% after the style cleanup.
… calculations with synchronized round execution
…orchestrator, and tests
…yaml, and update README
…aggregation.py to origin/master
…teps across YAML configs
…ar_calculation configs
Not up to standards ⛔🔴 Issues
|
| Category | Results |
|---|---|
| Documentation | 29 minor |
| CodeStyle | 32 minor |
| Complexity | 3 medium |
🟢 Metrics 106 complexity
Metric Results Complexity 106
NEW Get contextual insights on your PRs based on Codacy's metrics, along with PR and Jira context, without leaving GitHub. Enable AI reviewer
TIP This summary will be updated as you push new changes.
There was a problem hiding this comment.
Code Review
This pull request introduces a multi-stage aggregation orchestrator and validator for Data Commons, along with YAML configuration files, a JSON schema, and corresponding unit tests. The review feedback highlights critical issues in the orchestrator, including a potential IsADirectoryError when validating directories, a missing glob import, and a missing check_jobs_status method that would cause an AttributeError during polling. Additionally, the feedback suggests tightening the JSON schema validation by disabling additional properties and explicitly defining the disabled field.
…aggregations-v2 # Conflicts: # pipeline/workflow/ingestion-helper/aggregation/__init__.py # pipeline/workflow/ingestion-helper/pyproject.toml # pipeline/workflow/ingestion-helper/uv.lock
…ng and schema validation
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a multi-stage aggregation orchestrator and a configuration validator for Data Commons aggregations, complete with a JSON schema, various YAML configuration files, and comprehensive unit tests. The review feedback highlights several critical and high-severity issues: the AggregationOrchestrator is missing the check_jobs_status method required by the polling endpoint, which will cause runtime errors; it lacks dispatch logic for several calculation types (such as STAT_VAR_CALCULATION and ENTITY_AGGREGATION); and it contains legacy fallback keys that violate the strict validation constraints of schema.json and should be cleaned up.
…gacy fallback getters
…test via public API
…aggregations-v2 # Please enter a commit message to explain why this merge is necessary, # especially if it merges an updated upstream into a topic branch. # # Lines starting with '#' will be ignored, and an empty message aborts # the commit.
…ob with dry_run support
| # Copy local code to the container image | ||
| COPY main.py . | ||
| # Copy dependency definition files to leverage Docker layer caching | ||
| COPY ingestion-helper/pyproject.toml ingestion-helper/uv.lock ingestion-helper/__init__.py ./ |
There was a problem hiding this comment.
@vish-cs I had to make a few changes to this Dockerfile (as well as other related files) as it required copying from multiple folders (ingestion-helper and aggregation-helper).
I tested them locally and ensured I could build the container and run it.
…aggregations-v2 # Conflicts: # pipeline/workflow/aggregation-helper/main.py
This PR adds support to specify config files that will be used to run the aggregations.
This PR supersedes #582 as per the offline discussion & feedback I received on the previous PR's design approach.