Skip to content

feat: Support configurable aggregations setup#607

Open
SandeepTuniki wants to merge 38 commits into
masterfrom
config-options-for-aggregations-v2
Open

feat: Support configurable aggregations setup#607
SandeepTuniki wants to merge 38 commits into
masterfrom
config-options-for-aggregations-v2

Conversation

@SandeepTuniki

Copy link
Copy Markdown
Contributor

This PR adds support to specify config files that will be used to run the aggregations.

This PR supersedes #582 as per the offline discussion & feedback I received on the previous PR's design approach.

- Reorganized aggregation logic into a cohesive 'aggregation' package.
- Implemented AggregationOrchestrator supporting stage-based parallel execution and wildcards.
- Implemented validator utility (CLI & programmatic) validating against schema.json.
- Added new stateless /aggregation/initiate and /poll FastAPI endpoints.
- Retained legacy /run and /status wrappers for backward compatibility.
- Updated spanner-ingestion-workflow.yaml to use the state-passing loop.
- Added comprehensive unit and integration test suites (25 tests total, 100% passing).
…mpty job list

- Fixed a bug where a PENDING BigQuery job could cause premature transition to the next stage.
- Changed the transition check to strictly require a DONE status.
- Added short-circuiting for empty active job lists.
- Added a new integration test 'test_aggregation_poll_still_running' to verify PENDING handling.
- Replaced all hardcoded upper limits of stage '10' in API routes and legacy wrappers.
- Dynamically calculate the maximum stage from the loaded aggregations config.
- Updated integration tests in app_test.py to mock the aggregations list for correct stage limit evaluation.
… side effects

- Removed module-level logging.basicConfig call from validator.py.
- Added logging.basicConfig inside validator's CLI main() function to ensure logging is configured only during standalone script execution.
- Removed global logging.getLogger().setLevel(logging.INFO) call from orchestrator.py to prevent altering root logger levels on module import.
…or schema bypass

- Added an else block in orchestrator's execute_stage() that raises ValueError for unsupported/unimplemented step types (like 'entity'), preventing silent failures.
- Removed the early-return validation bypass in validator.py, ensuring that jsonschema strictly validates missing 'aggregations' keys.
- Added unit test 'test_execute_stage_unsupported_type' in orchestrator_test.py to verify the ValueError fail-fast routing.
- Added unit tests 'test_validate_config_missing_aggregations_key' and 'test_validate_config_empty_file' in validator_test.py to cover the validation fixes.
…loop performance issue

- Added a new get_active_stages() helper in orchestrator.py that returns a sorted list of unique active and enabled stage numbers.
- Replaced all sequential while and range loops in routes/aggregation.py with direct list-comprehension jumps using get_active_stages().
- This completely resolves the performance spike and timeout risk when a very large stage number (e.g. 100 million) is configured.
- Added unit test 'test_get_active_stages' in orchestrator_test.py.
- Updated and significantly simplified integration test mocks in app_test.py to mock get_active_stages() instead of aggregations/has_stage.
… and add docstring

- Renamed the generic StateObject class to AggregationWorkflowState to better reflect its purpose in representing the state of the multi-stage aggregation workflow.
- Added a comprehensive, professional docstring to AggregationWorkflowState explaining its role in the stateless polling loop coordinated by Google Cloud Workflows.
- Updated all type annotations, route definitions, and return statements inside routes/aggregation.py.
- Verified that no other files in the workspace referenced the old class name directly, and that all 30 tests continue to pass 100%.
…temporary TODOs

- Reverted all 'Legacy' prefix names on compatibility Pydantic models (e.g. AggregationRequest, AggregationResponse) back to their original names, ensuring perfect backward compatibility for client-side code generators.
- Reverted compatibility route method names back to their original names (run_aggregation and get_aggregation_status).
- Added deprecated=True to the FastAPI route decorators for /run and /status to natively flag them in the OpenAPI/Swagger documentation UI.
- Added clear TODO comments and docstrings advising that these are temporary compatibility components to be removed once all consumers migrate.
- Renamed the test case in app_test.py to test_aggregation_run to match the method name.
- Deleted redundant ASCII box section dividers and decorative headers (e.g. Pydantic Models for the New Stateless API, Router Definition) from routes/aggregation.py.
- Simplified backward compatibility section markers to clean, single-line TODO comments.
- This removes visual noise and aligns the file with clean pythonic commenting best practices.
…d endpoints

- Replaced the global section-level TODO comments in routes/aggregation.py with localized, specific TODO comments.
- Placed an explicit, actionable TODO comment directly above each of the four compatibility models (AggregationRequest, AggregationStatusRequest, etc.).
- Placed an explicit TODO comment directly above the /run and /status route decorators.
- This ensures technical debt is highly visible, actionable, and tied directly to the specific components slated for removal after consumer migration.
- Fixed a PEP 8 E701 violation in orchestrator.py by splitting a single-line 'if' statement into a standard multi-line block.
- Resolved a Ruff F401 unused import warning in app_test.py by removing 'import os'.
- Verified that all 30 tests continue to pass 100% after the style cleanup.
… calculations with synchronized round execution
@codacy-production

codacy-production Bot commented Jul 2, 2026

Copy link
Copy Markdown

Not up to standards ⛔

🔴 Issues 3 medium · 61 minor

Alerts:
⚠ 64 issues (≤ 0 issues of at least minor severity)

Results:
64 new issues

Category Results
Documentation 29 minor
CodeStyle 32 minor
Complexity 3 medium

View in Codacy

🟢 Metrics 106 complexity

Metric Results
Complexity 106

View in Codacy

NEW Get contextual insights on your PRs based on Codacy's metrics, along with PR and Jira context, without leaving GitHub. Enable AI reviewer
TIP This summary will be updated as you push new changes.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a multi-stage aggregation orchestrator and validator for Data Commons, along with YAML configuration files, a JSON schema, and corresponding unit tests. The review feedback highlights critical issues in the orchestrator, including a potential IsADirectoryError when validating directories, a missing glob import, and a missing check_jobs_status method that would cause an AttributeError during polling. Additionally, the feedback suggests tightening the JSON schema validation by disabling additional properties and explicitly defining the disabled field.

Comment thread pipeline/workflow/ingestion-helper/aggregation/orchestrator.py
Comment thread pipeline/workflow/ingestion-helper/aggregation/orchestrator.py Outdated
Comment thread pipeline/workflow/ingestion-helper/aggregation/orchestrator.py
Comment thread pipeline/workflow/ingestion-helper/aggregation/schema.json
Comment thread pipeline/workflow/ingestion-helper/aggregation/schema.json Outdated
…aggregations-v2

# Conflicts:
#	pipeline/workflow/ingestion-helper/aggregation/__init__.py
#	pipeline/workflow/ingestion-helper/pyproject.toml
#	pipeline/workflow/ingestion-helper/uv.lock
@SandeepTuniki

Copy link
Copy Markdown
Contributor Author

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a multi-stage aggregation orchestrator and a configuration validator for Data Commons aggregations, complete with a JSON schema, various YAML configuration files, and comprehensive unit tests. The review feedback highlights several critical and high-severity issues: the AggregationOrchestrator is missing the check_jobs_status method required by the polling endpoint, which will cause runtime errors; it lacks dispatch logic for several calculation types (such as STAT_VAR_CALCULATION and ENTITY_AGGREGATION); and it contains legacy fallback keys that violate the strict validation constraints of schema.json and should be cleaned up.

Comment thread pipeline/workflow/ingestion-helper/aggregation/orchestrator.py
Comment thread pipeline/workflow/ingestion-helper/aggregation/orchestrator.py
Comment thread pipeline/workflow/ingestion-helper/aggregation/orchestrator.py Outdated
@SandeepTuniki SandeepTuniki marked this pull request as ready for review July 3, 2026 05:30
@SandeepTuniki SandeepTuniki requested a review from vish-cs July 3, 2026 05:35
…aggregations-v2

# Please enter a commit message to explain why this merge is necessary,
# especially if it merges an updated upstream into a topic branch.
#
# Lines starting with '#' will be ignored, and an empty message aborts
# the commit.
# Copy local code to the container image
COPY main.py .
# Copy dependency definition files to leverage Docker layer caching
COPY ingestion-helper/pyproject.toml ingestion-helper/uv.lock ingestion-helper/__init__.py ./

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vish-cs I had to make a few changes to this Dockerfile (as well as other related files) as it required copying from multiple folders (ingestion-helper and aggregation-helper).

I tested them locally and ensured I could build the container and run it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant