|
24 | 24 | ) |
25 | 25 | from pyatlan.errors import ErrorCode |
26 | 26 | from pyatlan.model.enums import AtlanWorkflowPhase, WorkflowPackage |
27 | | -from pyatlan.model.search import Bool, NestedQuery, Prefix, Query, Regexp, Term |
| 27 | +from pyatlan.model.search import ( |
| 28 | + Bool, |
| 29 | + Exists, |
| 30 | + NestedQuery, |
| 31 | + Prefix, |
| 32 | + Query, |
| 33 | + Range, |
| 34 | + Regexp, |
| 35 | + Term, |
| 36 | + Terms, |
| 37 | +) |
28 | 38 | from pyatlan.model.workflow import ( |
29 | 39 | ReRunRequest, |
30 | 40 | ScheduleQueriesSearchRequest, |
@@ -149,6 +159,51 @@ def find_run_by_id(self, id: str) -> Optional[WorkflowSearchResult]: |
149 | 159 | response = self._find_runs(query, size=1) |
150 | 160 | return results[0] if (results := response.hits and response.hits.hits) else None |
151 | 161 |
|
| 162 | + @validate_arguments |
| 163 | + def find_runs_by_status_and_time_range( |
| 164 | + self, |
| 165 | + status: List[AtlanWorkflowPhase], |
| 166 | + started_at: Optional[str] = None, |
| 167 | + finished_at: Optional[str] = None, |
| 168 | + ) -> List[WorkflowSearchResult]: |
| 169 | + """ |
| 170 | + Find workflows by status and optional time filters on startedAt and/or finishedAt. |
| 171 | +
|
| 172 | + :param status: list of the workflow statuses to filter |
| 173 | + :param started_at: (optional) lower bound on 'status.startedAt' (e.g 'now-2h') |
| 174 | + :param finished_at: (optional) lower bound on 'status.finishedAt' (e.g 'now-1h') |
| 175 | + :returns: list of workflows matching the filters |
| 176 | + :raises ValidationError: if inputs are invalid |
| 177 | + :raises AtlanError: on any API communication issue |
| 178 | + """ |
| 179 | + time_filters = [] |
| 180 | + |
| 181 | + if started_at: |
| 182 | + time_filters.append(Range(field="status.startedAt", gte=started_at)) |
| 183 | + if finished_at: |
| 184 | + time_filters.append(Range(field="status.finishedAt", gte=finished_at)) |
| 185 | + |
| 186 | + run_lookup_query = Bool( |
| 187 | + must=[ |
| 188 | + NestedQuery( |
| 189 | + query=Terms( |
| 190 | + field="metadata.labels.workflows.argoproj.io/phase.keyword", |
| 191 | + values=[s.value for s in status], |
| 192 | + ), |
| 193 | + path="metadata", |
| 194 | + ), |
| 195 | + *time_filters, |
| 196 | + NestedQuery( |
| 197 | + query=Exists(field="metadata.labels.workflows.argoproj.io/creator"), |
| 198 | + path="metadata", |
| 199 | + ), |
| 200 | + ], |
| 201 | + ) |
| 202 | + |
| 203 | + run_lookup_results = self._find_runs(run_lookup_query) |
| 204 | + |
| 205 | + return run_lookup_results.hits and run_lookup_results.hits.hits or [] |
| 206 | + |
152 | 207 | @validate_arguments |
153 | 208 | def _find_latest_run(self, workflow_name: str) -> Optional[WorkflowSearchResult]: |
154 | 209 | """ |
|
0 commit comments