-
Notifications
You must be signed in to change notification settings - Fork 37
feat: Support configurable aggregations setup #607
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
SandeepTuniki
wants to merge
38
commits into
master
Choose a base branch
from
config-options-for-aggregations-v2
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
38 commits
Select commit
Hold shift + click to select a range
de069f7
feat(aggregation): implement stateless stage-based orchestration
SandeepTuniki d2e3128
fix(aggregation): handle PENDING state in polling and short-circuit e…
SandeepTuniki 43f3c56
feat(aggregation): calculate max_stage dynamically from configuration
SandeepTuniki ce20588
style(logging): follow logging best practices and remove module-level…
SandeepTuniki 4ab6468
fix(aggregation): fail-fast on unsupported step types and fix validat…
SandeepTuniki 1c01196
perf(aggregation): optimize stage execution and resolve sparse stage …
SandeepTuniki 92ae624
refactor(aggregation): rename StateObject to AggregationWorkflowState…
SandeepTuniki 0aa172a
refactor(aggregation): revert legacy prefixes to original names with …
SandeepTuniki 8562747
style(aggregation): remove redundant decorative comment headers
SandeepTuniki f02540b
refactor(aggregation): localize TODO comments for temporary models an…
SandeepTuniki 41494ef
refactor(aggregation): fix PEP 8 formatting and clean up unused imports
SandeepTuniki a753fd7
refactor(aggregation): improve orchestrator design, logging, and layout
SandeepTuniki dec9024
docs(aggregation): add module user guide README
SandeepTuniki fb4be20
test(aggregation): refactor validator and orchestrator test suites
SandeepTuniki a26d9d4
refactor(aggregation): polish orchestrator logging and flatten unit t…
SandeepTuniki cfecadc
style(aggregation): clean up and simplify aggregation.yaml comments
SandeepTuniki ccfef82
Merge remote-tracking branch 'origin/master' into config-options-for-…
SandeepTuniki 05ea5d9
feat(aggregation): add YAML aggregation configs
SandeepTuniki 481a469
style(aggregation): remove internal links and bug references from YAM…
SandeepTuniki 832726c
feat(aggregation): update schema.json and validator.py for YAML calcu…
SandeepTuniki d6012a7
feat(aggregation): expand schema.json with type-specific calculation …
SandeepTuniki d4f6690
test(aggregation): update validator_test.py for calculations schema
SandeepTuniki c1cafee
feat(aggregation): update orchestrator to process per-import isolated…
SandeepTuniki 969f195
refactor(aggregation): rename round to stage across configs, schema, …
SandeepTuniki cb0e2da
feat(aggregation): add common.yaml config, remove legacy aggregation.…
SandeepTuniki 78d4418
revert(workflow): restore spanner-ingestion-workflow.yaml to origin/m…
SandeepTuniki dc008b6
revert(workflow): restore spanner-ingestion-workflow.yaml and routes/…
SandeepTuniki 086db67
feat(aggregation): add informative name property to all calculation s…
SandeepTuniki fc4f2c4
style(aggregation): reorder keys in YAML configs for consistent field…
SandeepTuniki c8267c7
style(aggregation): clean up comment indentation in statvar and statv…
SandeepTuniki 777102d
Merge remote-tracking branch 'origin/master' into config-options-for-…
SandeepTuniki 645966d
fix(aggregation): resolve review comments for config directory scanni…
SandeepTuniki 507ade4
fix(aggregation): connect StatVarCalculationGenerator and clean up le…
SandeepTuniki 3b1baf7
refactor(aggregation): make get_active_stages_for_import private and …
SandeepTuniki 724f831
Merge remote-tracking branch 'origin/master' into config-options-for-…
SandeepTuniki de358cf
feat(aggregation): integrate AggregationOrchestrator into Cloud Run J…
SandeepTuniki 30e51cd
Merge remote-tracking branch 'origin/master' into config-options-for-…
SandeepTuniki d06e25e
Merge branch 'master' into config-options-for-aggregations-v2
SandeepTuniki File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
101 changes: 101 additions & 0 deletions
101
pipeline/workflow/ingestion-helper/aggregation/README.md
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,101 @@ | ||
| # Aggregations | ||
|
|
||
| This module orchestrates the execution of Data Commons aggregations through BigQuery Federation. The aggregations include place rollups, statistical variable aggregations, entity aggregations, linked edges, and metadata summaries. | ||
|
|
||
| ## Core Concepts | ||
|
|
||
| * **Sequential Stages**: Aggregations are executed sequentially by their `stage` number (e.g., Stage 1 steps are guaranteed to complete before Stage 2 steps begin). This is useful when later steps depend on the output of earlier ones. | ||
| * **Parallel Execution**: All aggregation steps configured in the same stage are executed in parallel to maximize performance. | ||
| * **Per-Import Isolation**: Aggregations run independently for each active import dataset. | ||
|
|
||
| --- | ||
|
|
||
| ## Configuration Guide (`configs/*.yaml`) | ||
|
|
||
| The aggregation pipeline is configured via YAML files in the `configs/` directory (`place.yaml`, `statvar.yaml`, `common.yaml`, etc.). Each file defines a top-level `calculations:` list. | ||
|
|
||
| ### Common Configuration Fields | ||
| Every calculation step supports these common fields: | ||
| * `type` (string, Required): The type of calculation step (e.g. `PLACE_AGGREGATION`, `STAT_VAR_AGGREGATION`, `LINKED_EDGES`). | ||
| * `stage` (integer, Optional, default: 1): The sequential stage number. Steps in lower stages are guaranteed to finish before higher stages start. | ||
| * `input_imports` (list of strings, Required): The list of import names this step applies to. Use `["*"]` (wildcard) to apply the step to **all** active imports. | ||
| * `output_import` (string, Optional): The output import dataset name to write aggregated observations under. | ||
| * `disabled` (boolean, Optional, default: false): Set to `true` to temporarily disable a step without deleting it. | ||
|
|
||
| --- | ||
|
|
||
| ### Supported Calculation Types | ||
|
|
||
| #### 1. Place Aggregation (`PLACE_AGGREGATION`) | ||
| Aggregates and rolls up statistical data from a smaller place type (source) to a larger place type (destination). | ||
| * **Fields**: | ||
| * `from_place_types` (string, Required): The source place type (e.g., `State`). | ||
| * `to_place_types` (string, Required): The destination place type (e.g., `Country`). | ||
| * `allow_multiple_to_places` (boolean, Optional, default: false): Allows mapping to multiple parent places if true. | ||
| * **Example**: | ||
| ```yaml | ||
| - type: PLACE_AGGREGATION | ||
| stage: 1 | ||
| input_imports: | ||
| - CensusACS5YearSurvey | ||
| output_import: CensusACS5YearSurvey_AggCountry | ||
| place_aggregation: | ||
| from_place_types: State | ||
| to_place_types: Country | ||
| ``` | ||
|
|
||
| #### 2. Statistical Variable Aggregation (`STAT_VAR_AGGREGATION`) | ||
| Aggregates raw statistical variables into a summarized ancestor variable. | ||
| * **Fields**: | ||
| * `ancestor_sv_id` (string, Required): The ID of the parent/summary statistical variable (e.g., `Count_Person`). | ||
| * `source_sv_ids` (list of strings, Required): The list of individual statistical variables to sum up. | ||
| * `skip_all_sources_present_check` (boolean, Optional, default: false): If true, aggregates even if some source variables are missing. | ||
| * **Example**: | ||
| ```yaml | ||
| - type: STAT_VAR_AGGREGATION | ||
| stage: 2 | ||
| input_imports: | ||
| - CensusACS5YearSurvey | ||
| output_import: CensusACS5YearSurvey_StatVarAgg | ||
| stat_var_aggregation: | ||
| aggregations: | ||
| - ancestor_sv_id: Count_Person | ||
| source_sv_ids: | ||
| - Count_Person_Male | ||
| - Count_Person_Female | ||
| ``` | ||
|
|
||
| #### 3. Common Aggregations (`LINKED_EDGES`, `PROVENANCE_SUMMARY`, `STAT_VAR_GROUPS`) | ||
| Common graph structure, lineage, and UI group hierarchy rollups defined in `common.yaml`. | ||
| * **Example**: | ||
| ```yaml | ||
| - type: LINKED_EDGES | ||
| stage: 1 | ||
| input_imports: | ||
| - "*" | ||
| ``` | ||
|
|
||
| --- | ||
|
|
||
| ## Local Configuration Validation | ||
|
|
||
| The orchestrator strictly validates configuration files against `schema.json`. If there is any syntax error, type mismatch, or missing required field, validation will fail. | ||
|
|
||
| ### Running the Validator Locally | ||
| You can validate all configuration files locally using the built-in CLI tool: | ||
|
|
||
| 1. **Navigate to the ingestion-helper root**: | ||
| ```bash | ||
| cd pipeline/workflow/ingestion-helper | ||
| ``` | ||
| 2. **Run the validator**: | ||
| ```bash | ||
| python3 -m aggregation.validator | ||
|
|
||
| # Sample output: | ||
| # Validating 7 configuration file(s) in 'aggregation/configs' against 'schema.json'... | ||
| # ✓ common.yaml (3 calculation steps) | ||
| # ✓ place.yaml (16 calculation steps) | ||
| # ✓ statvar.yaml (21 calculation steps) | ||
| # [SUCCESS] All 7 configuration file(s) passed validation! (64 calculation steps total) | ||
| ``` |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
21 changes: 21 additions & 0 deletions
21
pipeline/workflow/ingestion-helper/aggregation/configs/common.yaml
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| calculations: | ||
| # Generates linkedContainedInPlace, linkedMemberOf, etc. | ||
| - name: "Global: ContainedInPlace & MemberOf Graph Linked Edges" | ||
| type: LINKED_EDGES | ||
| stage: 1 | ||
| input_imports: | ||
| - "*" | ||
|
|
||
| # Generates summary statistics in the Cache table | ||
| - name: "Global: Cache Provenance & Lineage Summary" | ||
| type: PROVENANCE_SUMMARY | ||
| stage: 1 | ||
| input_imports: | ||
| - "*" | ||
|
|
||
| # Generates the Statistical Variable hierarchy/verticals | ||
| - name: "Global: StatVar Group Hierarchy & Verticals" | ||
| type: STAT_VAR_GROUPS | ||
| stage: 1 | ||
| input_imports: | ||
| - "*" |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vish-cs I had to make a few changes to this Dockerfile (as well as other related files) as it required copying from multiple folders (
ingestion-helperandaggregation-helper).I tested them locally and ensured I could build the container and run it.