[DCP Ingestion] Update Ingestion History to have unique records per run and update the record by status / stage#606
Conversation
…t. Requires a different schema with the comlpletion timestamp, stage and status
Not up to standards ⛔🔴 Issues
|
| Category | Results |
|---|---|
| CodeStyle | 1 minor |
🟢 Metrics 21 complexity
Metric Results Complexity 21
NEW Get contextual insights on your PRs based on Codacy's metrics, along with PR and Jira context, without leaving GitHub. Enable AI reviewer
TIP This summary will be updated as you push new changes.
There was a problem hiding this comment.
Code Review
This pull request refactors the ingestion history tracking by updating the Spanner schema for IngestionHistory to use WorkflowExecutionID as the primary key, adding status and stage fields, and introducing a new /imports/ingestion-history endpoint. Feedback focuses on preventing accidental data loss by only updating the Stage column when it is provided, correctly using the RETRY state instead of FAILURE for the ingestion workflow, avoiding writing dummy zero metrics on failure, and correcting optional type hints.
| columns = ["WorkflowExecutionID", "Status", "Stage", "Timestamp"] | ||
| values = [workflow_id, status, stage, spanner.COMMIT_TIMESTAMP] |
There was a problem hiding this comment.
If stage is None (e.g., in a subsequent update call that only updates the overall status), unconditionally including Stage in the columns list will overwrite the existing Stage value in the database with NULL. To prevent accidental data loss, only include Stage in the columns and values lists if it is not None.
| columns = ["WorkflowExecutionID", "Status", "Stage", "Timestamp"] | |
| values = [workflow_id, status, stage, spanner.COMMIT_TIMESTAMP] | |
| columns = ["WorkflowExecutionID", "Status", "Timestamp"] | |
| values = [workflow_id, status, spanner.COMMIT_TIMESTAMP] | |
| if stage: | |
| columns.append("Stage") | |
| values.append(stage) |
| if status in ('SUCCESS', 'FAILURE'): | ||
| columns.append("IngestionFailure") | ||
| values.append(status == 'FAILURE') |
There was a problem hiding this comment.
In the ingestion workflow, only check for the 'RETRY' state to identify failed imports. Do not include the 'FAILURE' state, as 'FAILURE' is reserved for batch jobs which do not enter the ingestion workflow. If status is 'RETRY', we should record it as an ingestion failure.
| if status in ('SUCCESS', 'FAILURE'): | |
| columns.append("IngestionFailure") | |
| values.append(status == 'FAILURE') | |
| if status in ('SUCCESS', 'RETRY'): | |
| columns.append("IngestionFailure") | |
| values.append(status == 'RETRY') |
References
- In the ingestion workflow, only check for the 'RETRY' state to identify failed imports. Do not include the 'FAILURE' state, as 'FAILURE' is reserved for batch jobs which do not enter the ingestion workflow.
| ingested_imports = _extract_import_names(req.importList) | ||
|
|
||
| metrics = None | ||
| if req.status in (IngestionState.SUCCESS, IngestionState.FAILURE) and req.stage == IngestionStage.DATAFLOW and req.jobId: |
There was a problem hiding this comment.
In the ingestion workflow, only check for the 'RETRY' state to identify failed imports. Do not include the 'FAILURE' state, as 'FAILURE' is reserved for batch jobs which do not enter the ingestion workflow. Therefore, we should check for IngestionState.SUCCESS and IngestionState.RETRY instead of IngestionState.FAILURE.
| if req.status in (IngestionState.SUCCESS, IngestionState.FAILURE) and req.stage == IngestionStage.DATAFLOW and req.jobId: | |
| if req.status in (IngestionState.SUCCESS, IngestionState.RETRY) and req.stage == IngestionStage.DATAFLOW and req.jobId: |
References
- In the ingestion workflow, only check for the 'RETRY' state to identify failed imports. Do not include the 'FAILURE' state, as 'FAILURE' is reserved for batch jobs which do not enter the ingestion workflow.
| def update_ingestion_history(self, | ||
| workflow_id: str, | ||
| status: str, | ||
| stage: str, | ||
| job_id: str = None, | ||
| ingested_imports: list = None, | ||
| metrics: dict = None): |
There was a problem hiding this comment.
The type hint for stage is str, but it can be passed as None from the router. Updating the type hints to use str | None = None (and similarly for other optional parameters) makes the signature more accurate and prevents type checker warnings.
| def update_ingestion_history(self, | |
| workflow_id: str, | |
| status: str, | |
| stage: str, | |
| job_id: str = None, | |
| ingested_imports: list = None, | |
| metrics: dict = None): | |
| def update_ingestion_history(self, | |
| workflow_id: str, | |
| status: str, | |
| stage: str | None = None, | |
| job_id: str | None = None, | |
| ingested_imports: list | None = None, | |
| metrics: dict | None = None): |
| metrics = { | ||
| 'execution_time': 0, | ||
| 'node_count': 0, | ||
| 'edge_count': 0, | ||
| 'obs_count': 0 | ||
| } |
There was a problem hiding this comment.
If fetching metrics fails, setting metrics to a dictionary of all 0s writes dummy 0 values to the database, which corrupts the data quality of the history (making it look like the job processed 0 nodes/edges/observations). It is much better to set metrics = None so that the database columns are left as NULL (or unmodified).
metrics = None| WorkflowExecutionID STRING(1024) NOT NULL, | ||
| Timestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ), | ||
| IngestionFailure Bool, | ||
| Status STRING(1024), |
There was a problem hiding this comment.
Can you add constraints to Status and Stage with the valid string values? Something like:
CONSTRAINT ValidStatus CHECK(Status IN ('PENDING', 'RUNNING', 'SUCCESS', 'FAILED', 'RETRY'))
(see: https://docs.cloud.google.com/spanner/docs/check-constraint/how-to)
| columns = ["WorkflowExecutionID", "Status", "Stage", "Timestamp"] | ||
| values = [workflow_id, status, stage, spanner.COMMIT_TIMESTAMP] | ||
|
|
||
| if status in ('SUCCESS', 'FAILURE'): |
There was a problem hiding this comment.
Can you add a comment explaining the rationale here?
| PENDING = "PENDING" | ||
| RUNNING = "RUNNING" | ||
| FAILURE = "FAILURE" | ||
| RETRY = "RETRY" |
| IngestionFailure Bool NOT NULL, | ||
| WorkflowExecutionID STRING(1024) NOT NULL, | ||
| Timestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ), | ||
| IngestionFailure Bool, |
| CompletionTimestamp TIMESTAMP NOT NULL OPTIONS ( allow_commit_timestamp = TRUE ), | ||
| IngestionFailure Bool NOT NULL, | ||
| WorkflowExecutionID STRING(1024) NOT NULL, | ||
| Timestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ), |
There was a problem hiding this comment.
Can we track both creation & completion timestamps?
No description provided.