|
29 | 29 | ReRunRequest, |
30 | 30 | ScheduleQueriesSearchRequest, |
31 | 31 | Workflow, |
32 | | - WorkflowMetadata, |
33 | 32 | WorkflowResponse, |
34 | 33 | WorkflowRunResponse, |
35 | 34 | WorkflowSchedule, |
@@ -417,37 +416,39 @@ def monitor( |
417 | 416 | Monitor the status of the workflow's run. |
418 | 417 |
|
419 | 418 | :param workflow_response: The workflow_response returned from running the workflow |
420 | | - :param workflow_name: name of the workflow to be monitored |
421 | 419 | :param logger: the logger to log status information |
422 | | - (logging.INFO for summary info. logging:DEBUG for detail info) |
| 420 | + (logging.INFO for summary info. logging.DEBUG for detail info) |
| 421 | + :param workflow_name: name of the workflow to be monitored |
423 | 422 | :returns: the status at completion or None if the workflow wasn't run |
424 | 423 | :raises ValidationError: If the provided `workflow_response`, `logger` is invalid |
425 | 424 | :raises AtlanError: on any API communication issue |
426 | 425 | """ |
427 | | - if workflow_name and not workflow_response: |
428 | | - workflow_response = WorkflowResponse( |
429 | | - metadata=WorkflowMetadata(name=workflow_name) |
430 | | - ) |
| 426 | + name = workflow_name or ( |
| 427 | + workflow_response.metadata.name |
| 428 | + if workflow_response and workflow_response.metadata |
| 429 | + else None |
| 430 | + ) |
431 | 431 |
|
432 | | - if workflow_response.metadata and workflow_response.metadata.name: # type: ignore |
433 | | - name = workflow_response.metadata.name # type: ignore |
434 | | - status: Optional[AtlanWorkflowPhase] = None |
435 | | - while status not in { |
436 | | - AtlanWorkflowPhase.SUCCESS, |
437 | | - AtlanWorkflowPhase.ERROR, |
438 | | - AtlanWorkflowPhase.FAILED, |
439 | | - }: |
440 | | - sleep(MONITOR_SLEEP_SECONDS) |
441 | | - if run_details := self._get_run_details(name): |
442 | | - status = run_details.status |
| 432 | + if not name: |
| 433 | + if logger: |
| 434 | + logger.info("Skipping workflow monitoring — nothing to monitor.") |
| 435 | + return None |
| 436 | + |
| 437 | + status: Optional[AtlanWorkflowPhase] = None |
| 438 | + while status not in { |
| 439 | + AtlanWorkflowPhase.SUCCESS, |
| 440 | + AtlanWorkflowPhase.ERROR, |
| 441 | + AtlanWorkflowPhase.FAILED, |
| 442 | + }: |
| 443 | + sleep(MONITOR_SLEEP_SECONDS) |
| 444 | + if run_details := self._get_run_details(name): |
| 445 | + status = run_details.status |
443 | 446 | if logger: |
444 | 447 | logger.debug("Workflow status: %s", status) |
445 | | - if logger: |
446 | | - logger.info("Workflow completion status: %s", status) |
447 | | - return status |
| 448 | + |
448 | 449 | if logger: |
449 | | - logger.info("Skipping workflow monitoring — nothing to monitor.") |
450 | | - return None |
| 450 | + logger.info("Workflow completion status: %s", status) |
| 451 | + return status |
451 | 452 |
|
452 | 453 | def _get_run_details(self, name: str) -> Optional[WorkflowSearchResult]: |
453 | 454 | return self._find_latest_run(workflow_name=name) |
|
0 commit comments