Skip to content

[DCP Ingestion] Update Ingestion History to have unique records per run and update the record by status / stage#606

Open
gmechali wants to merge 2 commits into
datacommonsorg:masterfrom
gmechali:ingestionHistory
Open

[DCP Ingestion] Update Ingestion History to have unique records per run and update the record by status / stage#606
gmechali wants to merge 2 commits into
datacommonsorg:masterfrom
gmechali:ingestionHistory

Conversation

@gmechali

@gmechali gmechali commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

No description provided.

gmechali added 2 commits July 2, 2026 00:19
…t. Requires a different schema with the comlpletion timestamp, stage and status
@codacy-production

Copy link
Copy Markdown

Not up to standards ⛔

🔴 Issues 1 minor

Alerts:
⚠ 1 issue (≤ 0 issues of at least minor severity)

Results:
1 new issue

Category Results
CodeStyle 1 minor

View in Codacy

🟢 Metrics 21 complexity

Metric Results
Complexity 21

View in Codacy

NEW Get contextual insights on your PRs based on Codacy's metrics, along with PR and Jira context, without leaving GitHub. Enable AI reviewer
TIP This summary will be updated as you push new changes.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the ingestion history tracking by updating the Spanner schema for IngestionHistory to use WorkflowExecutionID as the primary key, adding status and stage fields, and introducing a new /imports/ingestion-history endpoint. Feedback focuses on preventing accidental data loss by only updating the Stage column when it is provided, correctly using the RETRY state instead of FAILURE for the ingestion workflow, avoiding writing dummy zero metrics on failure, and correcting optional type hints.

Comment on lines +307 to +308
columns = ["WorkflowExecutionID", "Status", "Stage", "Timestamp"]
values = [workflow_id, status, stage, spanner.COMMIT_TIMESTAMP]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

If stage is None (e.g., in a subsequent update call that only updates the overall status), unconditionally including Stage in the columns list will overwrite the existing Stage value in the database with NULL. To prevent accidental data loss, only include Stage in the columns and values lists if it is not None.

Suggested change
columns = ["WorkflowExecutionID", "Status", "Stage", "Timestamp"]
values = [workflow_id, status, stage, spanner.COMMIT_TIMESTAMP]
columns = ["WorkflowExecutionID", "Status", "Timestamp"]
values = [workflow_id, status, spanner.COMMIT_TIMESTAMP]
if stage:
columns.append("Stage")
values.append(stage)

Comment on lines +310 to +312
if status in ('SUCCESS', 'FAILURE'):
columns.append("IngestionFailure")
values.append(status == 'FAILURE')

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In the ingestion workflow, only check for the 'RETRY' state to identify failed imports. Do not include the 'FAILURE' state, as 'FAILURE' is reserved for batch jobs which do not enter the ingestion workflow. If status is 'RETRY', we should record it as an ingestion failure.

Suggested change
if status in ('SUCCESS', 'FAILURE'):
columns.append("IngestionFailure")
values.append(status == 'FAILURE')
if status in ('SUCCESS', 'RETRY'):
columns.append("IngestionFailure")
values.append(status == 'RETRY')
References
  1. In the ingestion workflow, only check for the 'RETRY' state to identify failed imports. Do not include the 'FAILURE' state, as 'FAILURE' is reserved for batch jobs which do not enter the ingestion workflow.

ingested_imports = _extract_import_names(req.importList)

metrics = None
if req.status in (IngestionState.SUCCESS, IngestionState.FAILURE) and req.stage == IngestionStage.DATAFLOW and req.jobId:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In the ingestion workflow, only check for the 'RETRY' state to identify failed imports. Do not include the 'FAILURE' state, as 'FAILURE' is reserved for batch jobs which do not enter the ingestion workflow. Therefore, we should check for IngestionState.SUCCESS and IngestionState.RETRY instead of IngestionState.FAILURE.

Suggested change
if req.status in (IngestionState.SUCCESS, IngestionState.FAILURE) and req.stage == IngestionStage.DATAFLOW and req.jobId:
if req.status in (IngestionState.SUCCESS, IngestionState.RETRY) and req.stage == IngestionStage.DATAFLOW and req.jobId:
References
  1. In the ingestion workflow, only check for the 'RETRY' state to identify failed imports. Do not include the 'FAILURE' state, as 'FAILURE' is reserved for batch jobs which do not enter the ingestion workflow.

Comment on lines +285 to +291
def update_ingestion_history(self,
workflow_id: str,
status: str,
stage: str,
job_id: str = None,
ingested_imports: list = None,
metrics: dict = None):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The type hint for stage is str, but it can be passed as None from the router. Updating the type hints to use str | None = None (and similarly for other optional parameters) makes the signature more accurate and prevents type checker warnings.

Suggested change
def update_ingestion_history(self,
workflow_id: str,
status: str,
stage: str,
job_id: str = None,
ingested_imports: list = None,
metrics: dict = None):
def update_ingestion_history(self,
workflow_id: str,
status: str,
stage: str | None = None,
job_id: str | None = None,
ingested_imports: list | None = None,
metrics: dict | None = None):

Comment on lines +126 to +131
metrics = {
'execution_time': 0,
'node_count': 0,
'edge_count': 0,
'obs_count': 0
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If fetching metrics fails, setting metrics to a dictionary of all 0s writes dummy 0 values to the database, which corrupts the data quality of the history (making it look like the job processed 0 nodes/edges/observations). It is much better to set metrics = None so that the database columns are left as NULL (or unmodified).

            metrics = None

WorkflowExecutionID STRING(1024) NOT NULL,
Timestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ),
IngestionFailure Bool,
Status STRING(1024),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add constraints to Status and Stage with the valid string values? Something like:

CONSTRAINT ValidStatus CHECK(Status IN ('PENDING', 'RUNNING', 'SUCCESS', 'FAILED', 'RETRY'))

(see: https://docs.cloud.google.com/spanner/docs/check-constraint/how-to)

columns = ["WorkflowExecutionID", "Status", "Stage", "Timestamp"]
values = [workflow_id, status, stage, spanner.COMMIT_TIMESTAMP]

if status in ('SUCCESS', 'FAILURE'):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment explaining the rationale here?

PENDING = "PENDING"
RUNNING = "RUNNING"
FAILURE = "FAILURE"
RETRY = "RETRY"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is RETRY used?

IngestionFailure Bool NOT NULL,
WorkflowExecutionID STRING(1024) NOT NULL,
Timestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ),
IngestionFailure Bool,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove NOT NULL?

CompletionTimestamp TIMESTAMP NOT NULL OPTIONS ( allow_commit_timestamp = TRUE ),
IngestionFailure Bool NOT NULL,
WorkflowExecutionID STRING(1024) NOT NULL,
Timestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we track both creation & completion timestamps?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants