diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py new file mode 100644 index 00000000..9d6b20e1 --- /dev/null +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -0,0 +1,57 @@ +"""Built-in ``fetch_logs`` action: streams rotated log files to CDF Files.""" + +import logging +from datetime import date + +from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError + +_logger = logging.getLogger(__name__) + +MAX_DATE_RANGE_DAYS = 7 +"""Maximum number of calendar days a single ``fetch_logs`` invocation may cover.""" + +_FETCH_LOGS_DESCRIPTION = ( + f"Upload rotated log files to CDF Files for a given date range. At most {MAX_DATE_RANGE_DAYS} days per invocation." +) + + +def _parse_date(raw: str, field: str) -> date: + try: + return date.fromisoformat(raw) + except (ValueError, TypeError): + raise ActionError( + f"Invalid {field} '{raw}': expected ISO 8601 date (YYYY-MM-DD)", + error_type="invalid_parameter", + ) from None + + +def fetch_logs_action(ctx: ActionContext) -> None: + """Validate parameters and upload rotated log files for the requested date range to CDF Files.""" + params = ctx.call_metadata or {} + + start_date_raw = params.get("start_date") + end_date_raw = params.get("end_date") + + if start_date_raw is None: + raise ActionError("Missing required parameter: start_date", error_type="missing_parameter") + if end_date_raw is None: + raise ActionError("Missing required parameter: end_date", error_type="missing_parameter") + + start_date = _parse_date(start_date_raw, "start_date") + end_date = _parse_date(end_date_raw, "end_date") + + if end_date < start_date: + raise ActionError( + f"end_date ({end_date}) must be on or after start_date ({start_date})", + error_type="invalid_date_range", + ) + + num_days = (end_date - start_date).days + 1 + if num_days > MAX_DATE_RANGE_DAYS: + raise ActionError( + f"Date range of {num_days} days exceeds the maximum of {MAX_DATE_RANGE_DAYS} days; " + "use multiple invocations for longer spans", + error_type="invalid_date_range", + ) + + _logger.info("fetch_logs: uploading logs for %s to %s (%d day(s))", start_date, end_date, num_days) diff --git a/cognite/extractorutils/unstable/core/actions.py b/cognite/extractorutils/unstable/core/actions.py index 480f7c11..0193dc67 100644 --- a/cognite/extractorutils/unstable/core/actions.py +++ b/cognite/extractorutils/unstable/core/actions.py @@ -12,7 +12,7 @@ if TYPE_CHECKING: from cognite.extractorutils.unstable.core.base import Extractor -__all__ = ["ActionContext", "ActionTarget", "CustomAction"] +__all__ = ["ActionContext", "ActionError", "ActionTarget", "CustomAction"] class ActionContext(CogniteLogger): @@ -56,6 +56,23 @@ def _new_error( ) +class ActionError(Exception): + """Deliberate action failure with structured metadata for Odin result reporting.""" + + def __init__(self, message: str, *, error_type: str, details: str | None = None) -> None: + super().__init__(message) + self.error_type = error_type + self.details = details + + @property + def result_metadata(self) -> dict[str, str]: + """Structured metadata dict for the action update.""" + meta: dict[str, str] = {"error_type": self.error_type} + if self.details is not None: + meta["error_detail"] = self.details + return meta + + ActionTarget = Callable[["ActionContext"], None] diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index a8549b09..0fff2d52 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -98,8 +98,9 @@ def my_task_function(self, task_context: TaskContext) -> None: from cognite.extractorutils.unstable.core._dto import ( Task as DtoTask, ) +from cognite.extractorutils.unstable.core._log_upload_action import _FETCH_LOGS_DESCRIPTION, fetch_logs_action from cognite.extractorutils.unstable.core._messaging import RuntimeMessage -from cognite.extractorutils.unstable.core.actions import ActionContext, CustomAction +from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError, CustomAction from cognite.extractorutils.unstable.core.checkin_worker import CheckinWorker from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel from cognite.extractorutils.unstable.core.logger import CogniteLogger, RobustFileHandler @@ -205,6 +206,7 @@ def __init__(self, config: FullConfig[ConfigType], checkin_worker: CheckinWorker ) self.__init_tasks__() + self._register_builtin_actions() self.__init_actions__() def _setup_cancellation_watcher(self, cancel_event: MpEvent) -> None: @@ -346,6 +348,16 @@ def __init_tasks__(self) -> None: """ pass + def _register_builtin_actions(self) -> None: + """Register framework-level actions available on every extractor.""" + self.add_action( + CustomAction( + name="fetch_logs", + target=fetch_logs_action, + description=_FETCH_LOGS_DESCRIPTION, + ) + ) + def __init_actions__(self) -> None: """ This method should be overridden by subclasses to register custom actions. @@ -667,6 +679,15 @@ def _handle_custom_action(self, action: Action) -> None: self._checkin_worker.queue_action_update( ActionUpdate(external_id=action.external_id, status=ActionStatus.succeeded) ) + except ActionError as e: + self._checkin_worker.queue_action_update( + ActionUpdate( + external_id=action.external_id, + status=ActionStatus.failed, + result_message=str(e), + result_metadata=e.result_metadata, + ) + ) except Exception as e: self._checkin_worker.queue_action_update( ActionUpdate( diff --git a/tests/test_unstable/test_action_dispatch.py b/tests/test_unstable/test_action_dispatch.py index e885d617..7b61a1ed 100644 --- a/tests/test_unstable/test_action_dispatch.py +++ b/tests/test_unstable/test_action_dispatch.py @@ -6,7 +6,7 @@ import pytest from cognite.extractorutils.unstable.core._dto import Action, ActionStatus, ActionUpdate -from cognite.extractorutils.unstable.core.actions import ActionContext, CustomAction +from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError, CustomAction from cognite.extractorutils.unstable.core.base import FullConfig from cognite.extractorutils.unstable.core.tasks import ScheduledTask, TaskContext @@ -170,6 +170,20 @@ def target(ctx: ActionContext) -> None: assert expected_message in (updates[-1].result_message or "") +def test_action_error_sets_result_metadata_and_keeps_failed_status() -> None: + def target(ctx: ActionContext) -> None: + raise ActionError("bad input", error_type="invalid_parameter") + + extractor = _make_extractor() + extractor.add_action(CustomAction(name="strict", target=target)) + extractor._dispatch_single_action(_make_action("act-err", "strict")) + + updates = _queued_updates(extractor) + failed = next(u for u in updates if u.status == ActionStatus.failed) + assert failed.result_metadata == {"error_type": "invalid_parameter"} + assert failed.result_message == "bad input" + + def test_custom_action_receives_call_metadata_in_context() -> None: received_metadata: list[dict | None] = [] diff --git a/tests/test_unstable/test_action_registration.py b/tests/test_unstable/test_action_registration.py index 6d3142d3..e8223f09 100644 --- a/tests/test_unstable/test_action_registration.py +++ b/tests/test_unstable/test_action_registration.py @@ -39,21 +39,25 @@ def _startup_request(extractor: Extractor) -> StartupRequest: ), ], ) -def test_available_actions_none_without_scheduled_tasks(extra_tasks: list) -> None: +def test_available_actions_without_scheduled_tasks_only_has_builtins(extra_tasks: list) -> None: extractor = _make_extractor() for task in extra_tasks: extractor.add_task(task) - assert _startup_request(extractor).available_actions is None + req = _startup_request(extractor) + assert req.available_actions is not None + # Only built-in actions present — no start/stop actions from scheduled tasks + task_action_names = {a.name for a in req.available_actions if a.type != ActionType.custom} + assert task_action_names == set() -def test_two_scheduled_tasks_produce_four_available_actions() -> None: +def test_two_scheduled_tasks_produce_four_task_start_stop_actions() -> None: extractor = _make_extractor() extractor.add_task(ScheduledTask.from_interval(interval="1h", name="alpha", target=lambda _: None)) extractor.add_task(ScheduledTask.from_interval(interval="2h", name="beta", target=lambda _: None)) req = _startup_request(extractor) assert req.available_actions is not None - assert len(req.available_actions) == 4 - assert {a.name for a in req.available_actions} == {"Start alpha", "Stop alpha", "Start beta", "Stop beta"} + task_action_names = {a.name for a in req.available_actions if a.type != ActionType.custom} + assert task_action_names == {"Start alpha", "Stop alpha", "Start beta", "Stop beta"} @pytest.mark.parametrize( @@ -79,19 +83,23 @@ def test_scheduled_and_custom_actions_combined_ordering() -> None: extractor.add_action(CustomAction(name="flush", target=lambda _: None)) req = _startup_request(extractor) assert req.available_actions is not None - assert len(req.available_actions) == 3 names = [a.name for a in req.available_actions] - assert names == ["Start sync", "Stop sync", "flush"] + # Ordering: scheduled-task start/stop actions, then _custom_actions in registration order + # (_custom_actions = built-ins registered first, then user-registered actions) + assert names.index("Start sync") < names.index("flush") + assert names.index("Stop sync") < names.index("flush") + assert names.index("fetch_logs") < names.index("flush") def test_custom_action_appears_with_correct_type_and_description() -> None: extractor = _make_extractor() extractor.add_action(CustomAction(name="flush cache", target=lambda _: None, description="Clears state")) actions = _startup_request(extractor).available_actions - assert actions is not None and len(actions) == 1 - assert actions[0].name == "flush cache" - assert actions[0].type == ActionType.custom - assert actions[0].description == "Clears state" + assert actions is not None + by_name = {a.name: a for a in actions} + assert "flush cache" in by_name + assert by_name["flush cache"].type == ActionType.custom + assert by_name["flush cache"].description == "Clears state" def test_init_actions_hook_called_after_init_tasks_and_can_register_actions() -> None: @@ -107,15 +115,15 @@ def __init_actions__(self) -> None: extractor = _make_extractor(_Ext) assert call_order == ["tasks", "actions"] - assert len(extractor._custom_actions) == 1 - assert extractor._custom_actions[0].name == "ping" + assert any(a.name == "ping" for a in extractor._custom_actions) def test_multiple_add_action_calls_accumulate_in_registration_order() -> None: extractor = _make_extractor() for name in ("a1", "a2", "a3"): extractor.add_action(CustomAction(name=name, target=lambda _: None)) - assert [a.name for a in extractor._custom_actions] == ["a1", "a2", "a3"] + names = [a.name for a in extractor._custom_actions] + assert names.index("a1") < names.index("a2") < names.index("a3") def test_add_action_raises_on_duplicate_name() -> None: diff --git a/tests/test_unstable/test_log_upload_action.py b/tests/test_unstable/test_log_upload_action.py new file mode 100644 index 00000000..1fc00915 --- /dev/null +++ b/tests/test_unstable/test_log_upload_action.py @@ -0,0 +1,155 @@ +from datetime import date, timedelta +from unittest.mock import MagicMock + +import pytest + +from cognite.extractorutils.unstable.core._dto import Action, ActionStatus, ActionUpdate +from cognite.extractorutils.unstable.core._log_upload_action import MAX_DATE_RANGE_DAYS +from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError, CustomAction +from cognite.extractorutils.unstable.core.base import FullConfig + +from .conftest import TestConfig, TestExtractor + + +def _make_extractor() -> TestExtractor: + conn = MagicMock() + conn.integration.external_id = "test-integration" + full_config = FullConfig( + connection_config=conn, + application_config=TestConfig(parameter_one=1, parameter_two="a"), + current_config_revision=1, + ) + return TestExtractor(full_config, MagicMock()) + + +def _queued_updates(extractor: TestExtractor) -> list[ActionUpdate]: + return [c[0][0] for c in extractor._checkin_worker.queue_action_update.call_args_list] + + +def _dispatch(extractor: TestExtractor, call_metadata: dict[str, str] | None) -> list[ActionUpdate]: + action = Action( + external_id="act-1", + action_name="fetch_logs", + status=ActionStatus.pending, + call_metadata=call_metadata, + ) + extractor._dispatch_single_action(action) + return _queued_updates(extractor) + + +def _failed_update(updates: list[ActionUpdate]) -> ActionUpdate: + return next(u for u in updates if u.status == ActionStatus.failed) + + +def test_fetch_logs_registered_as_builtin_with_description() -> None: + extractor = _make_extractor() + action = next((a for a in extractor._custom_actions if a.name == "fetch_logs"), None) + assert action is not None + assert action.description + + +def test_registering_fetch_logs_as_user_action_raises() -> None: + extractor = _make_extractor() + with pytest.raises(ValueError, match="fetch_logs"): + extractor.add_action(CustomAction(name="fetch_logs", target=lambda ctx: None)) + + +@pytest.mark.parametrize( + "call_metadata,missing_field", + [ + (None, "start_date"), + ({"end_date": "2026-06-10"}, "start_date"), + ({"start_date": "2026-06-10"}, "end_date"), + ], + ids=["no_metadata", "missing_start_date", "missing_end_date"], +) +def test_missing_required_date_reports_missing_parameter( + call_metadata: dict[str, str] | None, missing_field: str +) -> None: + extractor = _make_extractor() + updates = _dispatch(extractor, call_metadata) + failed = _failed_update(updates) + assert failed.result_metadata == {"error_type": "missing_parameter"} + assert missing_field in (failed.result_message or "") + + +@pytest.mark.parametrize( + "call_metadata,bad_field", + [ + ({"start_date": "not-a-date", "end_date": "2026-06-10"}, "start_date"), + ({"start_date": "2026-06-10", "end_date": "2026/06/11"}, "end_date"), + ({"start_date": "2026-13-01", "end_date": "2026-06-11"}, "start_date"), + ], + ids=["invalid_start", "slash_end", "out_of_range_month"], +) +def test_non_iso_date_reports_invalid_parameter(call_metadata: dict[str, str], bad_field: str) -> None: + extractor = _make_extractor() + updates = _dispatch(extractor, call_metadata) + failed = _failed_update(updates) + assert failed.result_metadata == {"error_type": "invalid_parameter"} + assert bad_field in (failed.result_message or "") + + +def test_parse_date_non_string_type_raises_action_error() -> None: + # Pydantic guards dict[str, str] at the DTO boundary, but _parse_date may be called + # directly, so TypeError from date.fromisoformat must also surface as ActionError. + from cognite.extractorutils.unstable.core._log_upload_action import _parse_date + + with pytest.raises(ActionError) as exc_info: + _parse_date(20260610, "start_date") # type: ignore[arg-type] + assert exc_info.value.error_type == "invalid_parameter" + + +@pytest.mark.parametrize( + "call_metadata,message_contains", + [ + ({"start_date": "2026-06-10", "end_date": "2026-06-09"}, None), + ( + { + "start_date": str(date(2026, 6, 1)), + "end_date": str(date(2026, 6, 1) + timedelta(days=MAX_DATE_RANGE_DAYS)), + }, + str(MAX_DATE_RANGE_DAYS), + ), + ], + ids=["end_before_start", "exceeds_max_days"], +) +def test_invalid_date_range_reports_invalid_date_range( + call_metadata: dict[str, str], message_contains: str | None +) -> None: + extractor = _make_extractor() + updates = _dispatch(extractor, call_metadata) + failed = _failed_update(updates) + assert failed.result_metadata == {"error_type": "invalid_date_range"} + if message_contains: + assert message_contains in (failed.result_message or "") + + +@pytest.mark.parametrize( + "start_str,end_str", + [ + ("2026-06-10", "2026-06-10"), + ("2026-06-01", str(date(2026, 6, 1) + timedelta(days=MAX_DATE_RANGE_DAYS - 1))), + ], + ids=["single_day", "exact_max_days"], +) +def test_valid_date_range_succeeds(start_str: str, end_str: str) -> None: + extractor = _make_extractor() + updates = _dispatch(extractor, {"start_date": start_str, "end_date": end_str}) + statuses = [u.status for u in updates] + assert ActionStatus.succeeded in statuses + assert ActionStatus.failed not in statuses + + +def test_action_error_details_included_in_result_metadata() -> None: + extractor = _make_extractor() + + def raise_with_details(ctx: ActionContext) -> None: + raise ActionError("boom", error_type="unexpected_error", details="inner detail") + + extractor.add_action(CustomAction(name="boom-action", target=raise_with_details)) + action = Action(external_id="act-detail", action_name="boom-action", status=ActionStatus.pending) + extractor._dispatch_single_action(action) + failed = _failed_update(_queued_updates(extractor)) + assert failed.result_metadata == {"error_type": "unexpected_error", "error_detail": "inner detail"} + assert failed.result_message == "boom"