-
Notifications
You must be signed in to change notification settings - Fork 6
feat(odin): Register fetch logs action and validate input dates #541
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9d34837
a480f65
acbecd5
cb33f35
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| """Built-in ``fetch_logs`` action: streams rotated log files to CDF Files.""" | ||
|
|
||
| import logging | ||
| from datetime import date | ||
|
|
||
| from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError | ||
|
|
||
| _logger = logging.getLogger(__name__) | ||
|
|
||
| MAX_DATE_RANGE_DAYS = 7 | ||
| """Maximum number of calendar days a single ``fetch_logs`` invocation may cover.""" | ||
|
|
||
| _FETCH_LOGS_DESCRIPTION = ( | ||
| f"Upload rotated log files to CDF Files for a given date range. At most {MAX_DATE_RANGE_DAYS} days per invocation." | ||
| ) | ||
|
|
||
|
|
||
| def _parse_date(raw: str, field: str) -> date: | ||
| try: | ||
| return date.fromisoformat(raw) | ||
| except (ValueError, TypeError): | ||
| raise ActionError( | ||
| f"Invalid {field} '{raw}': expected ISO 8601 date (YYYY-MM-DD)", | ||
| error_type="invalid_parameter", | ||
| ) from None | ||
|
|
||
|
|
||
| def fetch_logs_action(ctx: ActionContext) -> None: | ||
| """Validate parameters and upload rotated log files for the requested date range to CDF Files.""" | ||
| params = ctx.call_metadata or {} | ||
|
|
||
| start_date_raw = params.get("start_date") | ||
| end_date_raw = params.get("end_date") | ||
|
|
||
| if start_date_raw is None: | ||
| raise ActionError("Missing required parameter: start_date", error_type="missing_parameter") | ||
| if end_date_raw is None: | ||
| raise ActionError("Missing required parameter: end_date", error_type="missing_parameter") | ||
|
|
||
| start_date = _parse_date(start_date_raw, "start_date") | ||
| end_date = _parse_date(end_date_raw, "end_date") | ||
|
|
||
| if end_date < start_date: | ||
| raise ActionError( | ||
| f"end_date ({end_date}) must be on or after start_date ({start_date})", | ||
| error_type="invalid_date_range", | ||
| ) | ||
|
|
||
| num_days = (end_date - start_date).days + 1 | ||
| if num_days > MAX_DATE_RANGE_DAYS: | ||
| raise ActionError( | ||
| f"Date range of {num_days} days exceeds the maximum of {MAX_DATE_RANGE_DAYS} days; " | ||
| "use multiple invocations for longer spans", | ||
| error_type="invalid_date_range", | ||
| ) | ||
|
vikramlc-cognite marked this conversation as resolved.
|
||
|
|
||
| _logger.info("fetch_logs: uploading logs for %s to %s (%d day(s))", start_date, end_date, num_days) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -98,8 +98,9 @@ def my_task_function(self, task_context: TaskContext) -> None: | |
| from cognite.extractorutils.unstable.core._dto import ( | ||
| Task as DtoTask, | ||
| ) | ||
| from cognite.extractorutils.unstable.core._log_upload_action import _FETCH_LOGS_DESCRIPTION, fetch_logs_action | ||
| from cognite.extractorutils.unstable.core._messaging import RuntimeMessage | ||
| from cognite.extractorutils.unstable.core.actions import ActionContext, CustomAction | ||
| from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError, CustomAction | ||
| from cognite.extractorutils.unstable.core.checkin_worker import CheckinWorker | ||
| from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel | ||
| from cognite.extractorutils.unstable.core.logger import CogniteLogger, RobustFileHandler | ||
|
|
@@ -205,6 +206,7 @@ def __init__(self, config: FullConfig[ConfigType], checkin_worker: CheckinWorker | |
| ) | ||
|
|
||
| self.__init_tasks__() | ||
| self._register_builtin_actions() | ||
| self.__init_actions__() | ||
|
|
||
| def _setup_cancellation_watcher(self, cancel_event: MpEvent) -> None: | ||
|
|
@@ -346,6 +348,16 @@ def __init_tasks__(self) -> None: | |
| """ | ||
| pass | ||
|
|
||
| def _register_builtin_actions(self) -> None: | ||
| """Register framework-level actions available on every extractor.""" | ||
| self.add_action( | ||
| CustomAction( | ||
| name="fetch_logs", | ||
| target=fetch_logs_action, | ||
| description=_FETCH_LOGS_DESCRIPTION, | ||
| ) | ||
| ) | ||
|
|
||
| def __init_actions__(self) -> None: | ||
|
Comment on lines
+351
to
361
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a bit conflicting, its a built-in action being regarded as a custom action, which is a bit weird
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we can also have CogniteActions?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fetch_logs uses the identical dispatch mechanism as user-registered actions (callable + ActionContext). The only difference is registration time (automatic in init vs explicit in init_actions). Introducing a parallel type (CogniteActions) hierarchy for the same runtime behaviour adds complexity without changing anything observable. |
||
| """ | ||
| This method should be overridden by subclasses to register custom actions. | ||
|
|
@@ -667,6 +679,15 @@ def _handle_custom_action(self, action: Action) -> None: | |
| self._checkin_worker.queue_action_update( | ||
| ActionUpdate(external_id=action.external_id, status=ActionStatus.succeeded) | ||
| ) | ||
| except ActionError as e: | ||
| self._checkin_worker.queue_action_update( | ||
| ActionUpdate( | ||
| external_id=action.external_id, | ||
| status=ActionStatus.failed, | ||
| result_message=str(e), | ||
| result_metadata=e.result_metadata, | ||
| ) | ||
| ) | ||
| except Exception as e: | ||
| self._checkin_worker.queue_action_update( | ||
| ActionUpdate( | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.