38 commits
de069f7
feat(aggregation): implement stateless stage-based orchestration
SandeepTuniki Jun 24, 2026
d2e3128
fix(aggregation): handle PENDING state in polling and short-circuit e…
SandeepTuniki Jun 24, 2026
43f3c56
feat(aggregation): calculate max_stage dynamically from configuration
SandeepTuniki Jun 24, 2026
ce20588
style(logging): follow logging best practices and remove module-level…
SandeepTuniki Jun 24, 2026
4ab6468
fix(aggregation): fail-fast on unsupported step types and fix validat…
SandeepTuniki Jun 24, 2026
1c01196
perf(aggregation): optimize stage execution and resolve sparse stage …
SandeepTuniki Jun 24, 2026
92ae624
refactor(aggregation): rename StateObject to AggregationWorkflowState…
SandeepTuniki Jun 24, 2026
0aa172a
refactor(aggregation): revert legacy prefixes to original names with …
SandeepTuniki Jun 24, 2026
8562747
style(aggregation): remove redundant decorative comment headers
SandeepTuniki Jun 24, 2026
f02540b
refactor(aggregation): localize TODO comments for temporary models an…
SandeepTuniki Jun 24, 2026
41494ef
refactor(aggregation): fix PEP 8 formatting and clean up unused imports
SandeepTuniki Jun 24, 2026
a753fd7
refactor(aggregation): improve orchestrator design, logging, and layout
SandeepTuniki Jun 24, 2026
dec9024
docs(aggregation): add module user guide README
SandeepTuniki Jun 24, 2026
fb4be20
test(aggregation): refactor validator and orchestrator test suites
SandeepTuniki Jun 24, 2026
a26d9d4
refactor(aggregation): polish orchestrator logging and flatten unit t…
SandeepTuniki Jun 24, 2026
cfecadc
style(aggregation): clean up and simplify aggregation.yaml comments
SandeepTuniki Jun 24, 2026
ccfef82
Merge remote-tracking branch 'origin/master' into config-options-for-…
SandeepTuniki Jun 24, 2026
05ea5d9
feat(aggregation): add YAML aggregation configs
SandeepTuniki Jul 2, 2026
481a469
style(aggregation): remove internal links and bug references from YAM…
SandeepTuniki Jul 2, 2026
832726c
feat(aggregation): update schema.json and validator.py for YAML calcu…
SandeepTuniki Jul 2, 2026
d6012a7
feat(aggregation): expand schema.json with type-specific calculation …
SandeepTuniki Jul 2, 2026
d4f6690
test(aggregation): update validator_test.py for calculations schema
SandeepTuniki Jul 2, 2026
c1cafee
feat(aggregation): update orchestrator to process per-import isolated…
SandeepTuniki Jul 2, 2026
969f195
refactor(aggregation): rename round to stage across configs, schema, …
SandeepTuniki Jul 2, 2026
cb0e2da
feat(aggregation): add common.yaml config, remove legacy aggregation.…
SandeepTuniki Jul 2, 2026
78d4418
revert(workflow): restore spanner-ingestion-workflow.yaml to origin/m…
SandeepTuniki Jul 2, 2026
dc008b6
revert(workflow): restore spanner-ingestion-workflow.yaml and routes/…
SandeepTuniki Jul 2, 2026
086db67
feat(aggregation): add informative name property to all calculation s…
SandeepTuniki Jul 2, 2026
fc4f2c4
style(aggregation): reorder keys in YAML configs for consistent field…
SandeepTuniki Jul 2, 2026
c8267c7
style(aggregation): clean up comment indentation in statvar and statv…
SandeepTuniki Jul 2, 2026
777102d
Merge remote-tracking branch 'origin/master' into config-options-for-…
SandeepTuniki Jul 2, 2026
645966d
fix(aggregation): resolve review comments for config directory scanni…
SandeepTuniki Jul 2, 2026
507ade4
fix(aggregation): connect StatVarCalculationGenerator and clean up le…
SandeepTuniki Jul 3, 2026
3b1baf7
refactor(aggregation): make get_active_stages_for_import private and …
SandeepTuniki Jul 3, 2026
724f831
Merge remote-tracking branch 'origin/master' into config-options-for-…
SandeepTuniki Jul 3, 2026
de358cf
feat(aggregation): integrate AggregationOrchestrator into Cloud Run J…
SandeepTuniki Jul 3, 2026
30e51cd
Merge remote-tracking branch 'origin/master' into config-options-for-…
SandeepTuniki Jul 3, 2026
d06e25e
Merge branch 'master' into config-options-for-aggregations-v2
SandeepTuniki Jul 4, 2026
22 changes: 19 additions & 3 deletions pipeline/workflow/aggregation-helper/Dockerfile
@@ -14,13 +14,29 @@

 FROM python:3.12-slim

+# Copy uv binary
+COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
+
 # Allow statements and log messages to immediately appear in the logs
 ENV PYTHONUNBUFFERED True

 WORKDIR /app

-# Copy local code to the container image
-COPY main.py .
+# Copy dependency definition files to leverage Docker layer caching
+COPY ingestion-helper/pyproject.toml ingestion-helper/uv.lock ingestion-helper/__init__.py ./

Review comment from the PR author:

@vish-cs I had to change this Dockerfile (and a few related files) because the build now needs to copy from two folders, ingestion-helper and aggregation-helper.

I tested the changes locally and confirmed that the container builds and runs.


+# Install production dependencies (without the project itself)
+RUN uv sync --no-dev --no-install-project
+
+# Copy ingestion-helper code (includes aggregation package) and main.py
+COPY ingestion-helper/ .
+COPY aggregation-helper/main.py .
+
+# Install the project itself
+RUN uv sync --no-dev
+
+# Place the virtual environment's bin directory on the PATH
+ENV PATH="/app/.venv/bin:$PATH"

-# Run the script
+# Run the Cloud Run Job script
 ENTRYPOINT ["python", "main.py"]
1 change: 1 addition & 0 deletions pipeline/workflow/aggregation-helper/cloudbuild.yaml
@@ -17,6 +17,7 @@ steps:
 - name: 'gcr.io/cloud-builders/docker'
   args: [
     'build',
+    '-f', 'aggregation-helper/Dockerfile',
     '-t', '${_AR_REPO_URL}/${_IMAGE_NAME}:${_TAG}',
     '-t', '${_AR_REPO_URL}/${_IMAGE_NAME}:${_VERSION}',
     '.'
59 changes: 48 additions & 11 deletions pipeline/workflow/aggregation-helper/main.py
@@ -12,20 +12,39 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-"""Aggregation Helper Cloud Run Job skeleton."""
+"""Aggregation Helper Cloud Run Job execution entry point."""

 import argparse
 import json
 import logging
+import os
 import sys

+from aggregation import AggregationOrchestrator
+
+
 def main():
     logging.basicConfig(level=logging.INFO)
-    logging.info("Starting Aggregation Helper Job")
+    logging.info("Starting Aggregation Helper Cloud Run Job...")

     parser = argparse.ArgumentParser(description="Run aggregation helper job.")
-    parser.add_argument("--import_list", help="JSON string representing the list of imports to process.")
+
+    parser.add_argument(
+        "--import_list",
+        help="JSON string representing the list of imports to process."
+    )
+    parser.add_argument(
+        "--dry_run",
+        action="store_true",
+        default=True,
+        help="Run in dry-run mode without executing jobs (default: True)."
+    )
+    parser.add_argument(
+        "--execute",
+        action="store_false",
+        dest="dry_run",
+        help="Disable dry-run mode and execute BigQuery aggregation jobs."
+    )

     args = parser.parse_args()

     if not args.import_list:
@@ -34,7 +53,7 @@ def main():

     try:
         import_list = json.loads(args.import_list)
-        logging.info(f"Received import list: {import_list}")
+        logging.info(f"Received active imports to process: {import_list}")
     except json.JSONDecodeError as e:
         logging.error(f"Failed to parse import_list JSON: {e}")
         sys.exit(1)
@@ -43,12 +62,30 @@ def main():
         logging.error("Parsed import_list is not a list")
         sys.exit(1)

-    # Dummy logic
-    logging.info("Processing aggregation (dummy)...")
-    for imp in import_list:
-        logging.info(f"Processing import: {imp}")
-
-    logging.info("Aggregation Helper Job completed successfully.")
+    connection_id = os.environ.get("BQ_SPANNER_CONN_ID")
+    project_id = os.environ.get("PROJECT_ID")
+    instance_id = os.environ.get("SPANNER_INSTANCE_ID")
+    database_id = os.environ.get("SPANNER_DATABASE_ID") or os.environ.get("SPANNER_GRAPH_DATABASE_ID")
+    location = os.environ.get("LOCATION")
+
+    if not all([connection_id, project_id, instance_id, database_id]):
+        logging.error(
+            f"Missing required environment variables. connection_id={connection_id}, "
+            f"project_id={project_id}, instance_id={instance_id}, database_id={database_id}"
+        )
+        sys.exit(1)
+
+    orchestrator = AggregationOrchestrator(
+        connection_id=connection_id,
+        project_id=project_id,
+        instance_id=instance_id,
+        database_id=database_id,
+        location=location,
+    )
+
+    logging.info(f"Executing AggregationOrchestrator pipeline (dry_run={args.dry_run}) for imports: {import_list}")
+    orchestrator.run(active_imports=import_list, dry_run=args.dry_run)
+    logging.info("Aggregation Helper Cloud Run Job completed successfully.")

 if __name__ == "__main__":
     main()
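
Note on the `--dry_run` / `--execute` pair in main.py: both flags write to the same `dry_run` destination, and because `--dry_run` is a `store_true` flag whose default is already `True`, passing it changes nothing; only `--execute` flips the value. A minimal standalone sketch of that behaviour follows. The `build_parser` helper is hypothetical and exists only for this illustration; it is not part of the PR.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical helper that mirrors only the two dry-run flags from main.py."""
    parser = argparse.ArgumentParser(description="Run aggregation helper job.")
    parser.add_argument("--dry_run", action="store_true", default=True)
    parser.add_argument("--execute", action="store_false", dest="dry_run")
    return parser


parser = build_parser()
default_args = parser.parse_args([])              # no flags: dry run
explicit_dry = parser.parse_args(["--dry_run"])   # same result as no flags
execute_args = parser.parse_args(["--execute"])   # the only way to get False

print(default_args.dry_run, explicit_dry.dry_run, execute_args.dry_run)
# prints: True True False
```

Defaulting to a dry run means a job started without flags never submits BigQuery work, which looks like the intent here.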
2 changes: 1 addition & 1 deletion pipeline/workflow/build-services.yaml
@@ -36,7 +36,7 @@ steps:

 - id: 'build-aggregation-helper'
   name: 'gcr.io/cloud-builders/gcloud'
-  args: ['builds', 'submit', 'aggregation-helper', '--config', 'aggregation-helper/cloudbuild.yaml', '--substitutions', '_AR_REPO_URL=${_AR_REPO_URL},_VERSION=${_VERSION}']
+  args: ['builds', 'submit', '.', '--config', 'aggregation-helper/cloudbuild.yaml', '--substitutions', '_AR_REPO_URL=${_AR_REPO_URL},_VERSION=${_VERSION}']
   dir: 'pipeline/workflow'
   waitFor: ['-']
101 changes: 101 additions & 0 deletions pipeline/workflow/ingestion-helper/aggregation/README.md
@@ -0,0 +1,101 @@
# Aggregations

This module orchestrates the execution of Data Commons aggregations through BigQuery Federation. The aggregations include place rollups, statistical variable aggregations, entity aggregations, linked edges, and metadata summaries.

## Core Concepts

* **Sequential Stages**: Aggregations are executed sequentially by their `stage` number (e.g., Stage 1 steps are guaranteed to complete before Stage 2 steps begin). This is useful when later steps depend on the output of earlier ones.
* **Parallel Execution**: All aggregation steps configured in the same stage are executed in parallel to maximize performance.
* **Per-Import Isolation**: Aggregations run independently for each active import dataset.
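
The first two concepts can be illustrated with a short Python sketch. This is not the orchestrator's code (which submits BigQuery jobs); `run_in_stages` and the `run_step` callable are hypothetical names used only to show the ordering guarantee: stages run in ascending order, and steps within a stage run concurrently.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def run_in_stages(steps, run_step):
    """Runs steps stage by stage; steps that share a stage run concurrently.

    `steps` is a list of dicts with an optional integer `stage` (default 1).
    `run_step` is a hypothetical callable that executes one step.
    """
    by_stage = defaultdict(list)
    for step in steps:
        by_stage[step.get("stage", 1)].append(step)

    results = []
    for stage in sorted(by_stage):  # gaps in stage numbers are fine
        with ThreadPoolExecutor() as pool:
            # list() drains the iterator, so every step in this stage has
            # finished before the loop moves on to the next stage.
            outputs = list(pool.map(run_step, by_stage[stage]))
        results.append((stage, outputs))
    return results


demo_steps = [
    {"name": "stat_var_agg", "stage": 2},
    {"name": "place_agg"},                 # no stage given, so stage 1
    {"name": "linked_edges", "stage": 1},
]
demo_results = run_in_stages(demo_steps, lambda step: step["name"])
print(demo_results)
# prints: [(1, ['place_agg', 'linked_edges']), (2, ['stat_var_agg'])]
```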

---

## Configuration Guide (`configs/*.yaml`)

The aggregation pipeline is configured via YAML files in the `configs/` directory (`place.yaml`, `statvar.yaml`, `common.yaml`, etc.). Each file defines a top-level `calculations:` list.

### Common Configuration Fields
Every calculation step supports these common fields:
* `type` (string, Required): The type of calculation step (e.g. `PLACE_AGGREGATION`, `STAT_VAR_AGGREGATION`, `LINKED_EDGES`).
* `stage` (integer, Optional, default: 1): The sequential stage number. Steps in lower stages are guaranteed to finish before higher stages start.
* `input_imports` (list of strings, Required): The list of import names this step applies to. Use `["*"]` (wildcard) to apply the step to **all** active imports.
* `output_import` (string, Optional): The output import dataset name to write aggregated observations under.
* `disabled` (boolean, Optional, default: false): Set to `true` to temporarily disable a step without deleting it.
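
As an illustration of how these fields combine, the sketch below selects the steps that apply to one import. It is a reading of the field descriptions above, not the orchestrator's implementation: `steps_for_import` is a hypothetical name, `OtherImport` is a made-up import, and treating any list that contains `"*"` as a wildcard is an assumption.

```python
def steps_for_import(calculations, import_name):
    """Returns the enabled steps that apply to `import_name`, ordered by stage."""
    selected = []
    for step in calculations:
        if step.get("disabled", False):
            continue  # disabled steps are skipped, not deleted
        inputs = step["input_imports"]
        # Assumption: a "*" entry anywhere in the list matches every import.
        if "*" in inputs or import_name in inputs:
            selected.append(step)
    # `stage` is optional and defaults to 1; sorted() keeps config order within a stage.
    return sorted(selected, key=lambda step: step.get("stage", 1))


demo_calculations = [
    {"type": "STAT_VAR_AGGREGATION", "stage": 2, "input_imports": ["CensusACS5YearSurvey"]},
    {"type": "LINKED_EDGES", "input_imports": ["*"]},
    {"type": "PLACE_AGGREGATION", "input_imports": ["CensusACS5YearSurvey"], "disabled": True},
    {"type": "PROVENANCE_SUMMARY", "input_imports": ["OtherImport"]},
]
selected = steps_for_import(demo_calculations, "CensusACS5YearSurvey")
print([step["type"] for step in selected])
# prints: ['LINKED_EDGES', 'STAT_VAR_AGGREGATION']
```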

---

### Supported Calculation Types

#### 1. Place Aggregation (`PLACE_AGGREGATION`)
Aggregates and rolls up statistical data from a smaller place type (source) to a larger place type (destination).
* **Fields** (nested under `place_aggregation`):
    * `from_place_types` (string, Required): The source place type (e.g., `State`).
    * `to_place_types` (string, Required): The destination place type (e.g., `Country`).
    * `allow_multiple_to_places` (boolean, Optional, default: false): Allows mapping to multiple parent places if true.
* **Example**:
```yaml
- type: PLACE_AGGREGATION
  stage: 1
  input_imports:
    - CensusACS5YearSurvey
  output_import: CensusACS5YearSurvey_AggCountry
  place_aggregation:
    from_place_types: State
    to_place_types: Country
```

#### 2. Statistical Variable Aggregation (`STAT_VAR_AGGREGATION`)
Aggregates raw statistical variables into a summarized ancestor variable.
* **Fields** (nested under `stat_var_aggregation`):
    * `ancestor_sv_id` (string, Required): The ID of the parent/summary statistical variable (e.g., `Count_Person`).
    * `source_sv_ids` (list of strings, Required): The list of individual statistical variables to sum up.
    * `skip_all_sources_present_check` (boolean, Optional, default: false): If true, aggregates even if some source variables are missing.
* **Example**:
```yaml
- type: STAT_VAR_AGGREGATION
  stage: 2
  input_imports:
    - CensusACS5YearSurvey
  output_import: CensusACS5YearSurvey_StatVarAgg
  stat_var_aggregation:
    aggregations:
      - ancestor_sv_id: Count_Person
        source_sv_ids:
          - Count_Person_Male
          - Count_Person_Female
```
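
To make the meaning of `source_sv_ids` and `skip_all_sources_present_check` concrete, here is a small in-memory sketch with toy numbers. It is not how the pipeline computes the result (that happens in BigQuery). The `aggregate_stat_vars` function and the `(place, date)` keyed input are assumptions made for illustration, as is the choice to emit nothing when none of the sources is present.

```python
def aggregate_stat_vars(observations, ancestor_sv_id, source_sv_ids,
                        skip_all_sources_present_check=False):
    """Sums source variables into the ancestor variable, per (place, date).

    `observations` maps (place, date) -> {sv_id: value}.
    """
    out = {}
    for key, values in observations.items():
        present = [sv for sv in source_sv_ids if sv in values]
        if not present:
            continue  # assumption: nothing to sum, so no output row
        incomplete = len(present) < len(source_sv_ids)
        if incomplete and not skip_all_sources_present_check:
            continue  # default: require every source variable
        out[key] = {ancestor_sv_id: sum(values[sv] for sv in present)}
    return out


toy_observations = {
    ("geoId/06", "2020"): {"Count_Person_Male": 10, "Count_Person_Female": 12},
    ("geoId/48", "2020"): {"Count_Person_Male": 7},  # Female is missing
}
sources = ["Count_Person_Male", "Count_Person_Female"]

strict = aggregate_stat_vars(toy_observations, "Count_Person", sources)
lenient = aggregate_stat_vars(toy_observations, "Count_Person", sources,
                              skip_all_sources_present_check=True)
print(strict)
print(lenient)
```

With the default check, only the first place gets a `Count_Person` value (22); with the check skipped, the second place also gets one (7) from the single source that is present.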

#### 3. Common Aggregations (`LINKED_EDGES`, `PROVENANCE_SUMMARY`, `STAT_VAR_GROUPS`)
Common graph structure, lineage, and UI group hierarchy rollups defined in `common.yaml`.
* **Example**:
```yaml
- type: LINKED_EDGES
  stage: 1
  input_imports:
    - "*"
```

---

## Local Configuration Validation

The orchestrator strictly validates configuration files against `schema.json`. If there is any syntax error, type mismatch, or missing required field, validation will fail.
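
For a feel of what such failures look like, the sketch below checks the common fields of a single step using only the standard library. It is a stand-in for illustration, not the module's validator: the authoritative rules live in `schema.json`, `check_step` is a hypothetical name, and the "non-empty list" and "`stage` of at least 1" rules are assumptions.

```python
def check_step(step):
    """Returns a list of human-readable problems for one calculation step."""
    problems = []
    if not isinstance(step.get("type"), str):
        problems.append("'type' is required and must be a string")

    inputs = step.get("input_imports")
    if not (isinstance(inputs, list) and inputs
            and all(isinstance(item, str) for item in inputs)):
        problems.append("'input_imports' is required and must be a non-empty list of strings")

    stage = step.get("stage", 1)
    # bool is a subclass of int in Python, so rule it out explicitly.
    if isinstance(stage, bool) or not isinstance(stage, int) or stage < 1:
        problems.append("'stage' must be an integer of at least 1")

    if not isinstance(step.get("disabled", False), bool):
        problems.append("'disabled' must be a boolean")
    return problems


good_problems = check_step({"type": "LINKED_EDGES", "input_imports": ["*"]})
bad_problems = check_step({"stage": "2"})  # missing type and imports, stage is a string
print(good_problems)
print(len(bad_problems))
```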

### Running the Validator Locally
You can validate all configuration files locally using the built-in CLI tool:

1. **Navigate to the ingestion-helper root**:
```bash
cd pipeline/workflow/ingestion-helper
```
2. **Run the validator**:
```bash
python3 -m aggregation.validator

# Sample output:
# Validating 7 configuration file(s) in 'aggregation/configs' against 'schema.json'...
# ✓ common.yaml (3 calculation steps)
# ✓ place.yaml (16 calculation steps)
# ✓ statvar.yaml (21 calculation steps)
# [SUCCESS] All 7 configuration file(s) passed validation! (64 calculation steps total)
```
6 changes: 5 additions & 1 deletion pipeline/workflow/ingestion-helper/aggregation/__init__.py
@@ -24,6 +24,8 @@
 from .place_aggregation_generator import PlaceAggregationGenerator
 from .stat_var_group_generator import StatVarGroupGenerator
 from .stat_var_calculation_generator import StatVarCalculationGenerator
+from .orchestrator import AggregationOrchestrator
+from .validator import validate_config

 __all__ = [
     'BigQueryExecutor',
@@ -32,5 +34,7 @@
     'StatVarAggregator',
     'PlaceAggregationGenerator',
     'StatVarGroupGenerator',
-    'StatVarCalculationGenerator'
+    'StatVarCalculationGenerator',
+    'AggregationOrchestrator',
+    'validate_config',
 ]
10 changes: 8 additions & 2 deletions pipeline/workflow/ingestion-helper/aggregation/bq_executor.py
@@ -37,8 +37,14 @@ def __init__(self,
         self.location = location
         # TODO: Remove run_sequential logic once DCP migrates to async execution.
         self.run_sequential = run_sequential
-        self.client = bigquery.Client(project=self.project_id,
-                                      location=self.location)
+        self._client: Optional[bigquery.Client] = None
+
+    @property
+    def client(self) -> bigquery.Client:
+        """Lazily initializes and returns the BigQuery client."""
+        if self._client is None:
+            self._client = bigquery.Client(project=self.project_id, location=self.location)
+        return self._client

     def get_spanner_destination_uri(self) -> str:
         """Returns the Spanner destination URI for EXPORT DATA."""
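
Note on the lazy `client` property in bq_executor.py: the client is now built on first access rather than in `__init__`, so constructing an executor no longer creates a BigQuery client. One likely benefit is that dry runs and unit tests can instantiate the executor without credentials. The hunk also assumes `Optional` is imported from `typing`, which is not visible here. A minimal sketch of the pattern, using a stand-in `FakeClient` and a trimmed-down `LazyExecutor` (both hypothetical, so the example runs without the BigQuery library):

```python
from typing import Optional


class FakeClient:
    """Stand-in for bigquery.Client; counts how many times it is constructed."""

    instances = 0

    def __init__(self, project: str, location: str):
        FakeClient.instances += 1
        self.project = project
        self.location = location


class LazyExecutor:
    """Hypothetical executor that shows only the lazy-client pattern."""

    def __init__(self, project_id: str, location: str):
        self.project_id = project_id
        self.location = location
        self._client: Optional[FakeClient] = None  # nothing is created yet

    @property
    def client(self) -> FakeClient:
        """Creates the client on first access and reuses it afterwards."""
        if self._client is None:
            self._client = FakeClient(project=self.project_id, location=self.location)
        return self._client


executor = LazyExecutor("my-project", "US")
created_before_access = FakeClient.instances
first = executor.client
second = executor.client
print(created_before_access, FakeClient.instances, first is second)
# prints: 0 1 True
```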
21 changes: 21 additions & 0 deletions pipeline/workflow/ingestion-helper/aggregation/configs/common.yaml
@@ -0,0 +1,21 @@
calculations:
  # Generates linkedContainedInPlace, linkedMemberOf, etc.
  - name: "Global: ContainedInPlace & MemberOf Graph Linked Edges"
    type: LINKED_EDGES
    stage: 1
    input_imports:
      - "*"

  # Generates summary statistics in the Cache table
  - name: "Global: Cache Provenance & Lineage Summary"
    type: PROVENANCE_SUMMARY
    stage: 1
    input_imports:
      - "*"

  # Generates the Statistical Variable hierarchy/verticals
  - name: "Global: StatVar Group Hierarchy & Verticals"
    type: STAT_VAR_GROUPS
    stage: 1
    input_imports:
      - "*"