Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions cognite/extractorutils/unstable/core/_log_upload_action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Built-in ``fetch_logs`` action: streams rotated log files to CDF Files."""

import logging
from datetime import date

from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError

_logger = logging.getLogger(__name__)

MAX_DATE_RANGE_DAYS = 7
"""Maximum number of calendar days a single ``fetch_logs`` invocation may cover."""

_FETCH_LOGS_DESCRIPTION = (
f"Upload rotated log files to CDF Files for a given date range. At most {MAX_DATE_RANGE_DAYS} days per invocation."
)


def _parse_date(raw: str, field: str) -> date:
try:
return date.fromisoformat(raw)
except (ValueError, TypeError):
raise ActionError(
f"Invalid {field} '{raw}': expected ISO 8601 date (YYYY-MM-DD)",
error_type="invalid_parameter",
) from None
Comment thread
vikramlc-cognite marked this conversation as resolved.


def fetch_logs_action(ctx: ActionContext) -> None:
"""Validate parameters and upload rotated log files for the requested date range to CDF Files."""
params = ctx.call_metadata or {}

start_date_raw = params.get("start_date")
end_date_raw = params.get("end_date")

if start_date_raw is None:
raise ActionError("Missing required parameter: start_date", error_type="missing_parameter")
if end_date_raw is None:
raise ActionError("Missing required parameter: end_date", error_type="missing_parameter")

start_date = _parse_date(start_date_raw, "start_date")
end_date = _parse_date(end_date_raw, "end_date")

if end_date < start_date:
raise ActionError(
f"end_date ({end_date}) must be on or after start_date ({start_date})",
error_type="invalid_date_range",
)

num_days = (end_date - start_date).days + 1
if num_days > MAX_DATE_RANGE_DAYS:
raise ActionError(
f"Date range of {num_days} days exceeds the maximum of {MAX_DATE_RANGE_DAYS} days; "
"use multiple invocations for longer spans",
error_type="invalid_date_range",
)
Comment thread
vikramlc-cognite marked this conversation as resolved.

_logger.info("fetch_logs: uploading logs for %s to %s (%d day(s))", start_date, end_date, num_days)
19 changes: 18 additions & 1 deletion cognite/extractorutils/unstable/core/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
if TYPE_CHECKING:
from cognite.extractorutils.unstable.core.base import Extractor

__all__ = ["ActionContext", "ActionTarget", "CustomAction"]
__all__ = ["ActionContext", "ActionError", "ActionTarget", "CustomAction"]


class ActionContext(CogniteLogger):
Expand Down Expand Up @@ -56,6 +56,23 @@ def _new_error(
)


class ActionError(Exception):
"""Deliberate action failure with structured metadata for Odin result reporting."""

def __init__(self, message: str, *, error_type: str, details: str | None = None) -> None:
super().__init__(message)
self.error_type = error_type
self.details = details

@property
def result_metadata(self) -> dict[str, str]:
"""Structured metadata dict for the action update."""
meta: dict[str, str] = {"error_type": self.error_type}
if self.details is not None:
meta["error_detail"] = self.details
return meta


ActionTarget = Callable[["ActionContext"], None]


Expand Down
23 changes: 22 additions & 1 deletion cognite/extractorutils/unstable/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ def my_task_function(self, task_context: TaskContext) -> None:
from cognite.extractorutils.unstable.core._dto import (
Task as DtoTask,
)
from cognite.extractorutils.unstable.core._log_upload_action import _FETCH_LOGS_DESCRIPTION, fetch_logs_action
from cognite.extractorutils.unstable.core._messaging import RuntimeMessage
from cognite.extractorutils.unstable.core.actions import ActionContext, CustomAction
from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError, CustomAction
from cognite.extractorutils.unstable.core.checkin_worker import CheckinWorker
from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel
from cognite.extractorutils.unstable.core.logger import CogniteLogger, RobustFileHandler
Expand Down Expand Up @@ -205,6 +206,7 @@ def __init__(self, config: FullConfig[ConfigType], checkin_worker: CheckinWorker
)

self.__init_tasks__()
self._register_builtin_actions()
self.__init_actions__()

def _setup_cancellation_watcher(self, cancel_event: MpEvent) -> None:
Expand Down Expand Up @@ -346,6 +348,16 @@ def __init_tasks__(self) -> None:
"""
pass

def _register_builtin_actions(self) -> None:
"""Register framework-level actions available on every extractor."""
self.add_action(
CustomAction(
name="fetch_logs",
target=fetch_logs_action,
description=_FETCH_LOGS_DESCRIPTION,
)
)

def __init_actions__(self) -> None:
Comment on lines +351 to 361

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit conflicting, its a built-in action being regarded as a custom action, which is a bit weird

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can also have CogniteActions?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fetch_logs uses the identical dispatch mechanism as user-registered actions (callable + ActionContext). The only difference is registration time (automatic in init vs explicit in init_actions). Introducing a parallel type (CogniteActions) hierarchy for the same runtime behaviour adds complexity without changing anything observable.

"""
This method should be overridden by subclasses to register custom actions.
Expand Down Expand Up @@ -667,6 +679,15 @@ def _handle_custom_action(self, action: Action) -> None:
self._checkin_worker.queue_action_update(
ActionUpdate(external_id=action.external_id, status=ActionStatus.succeeded)
)
except ActionError as e:
self._checkin_worker.queue_action_update(
ActionUpdate(
external_id=action.external_id,
status=ActionStatus.failed,
result_message=str(e),
result_metadata=e.result_metadata,
)
)
except Exception as e:
self._checkin_worker.queue_action_update(
ActionUpdate(
Expand Down
16 changes: 15 additions & 1 deletion tests/test_unstable/test_action_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pytest

from cognite.extractorutils.unstable.core._dto import Action, ActionStatus, ActionUpdate
from cognite.extractorutils.unstable.core.actions import ActionContext, CustomAction
from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError, CustomAction
from cognite.extractorutils.unstable.core.base import FullConfig
from cognite.extractorutils.unstable.core.tasks import ScheduledTask, TaskContext

Expand Down Expand Up @@ -170,6 +170,20 @@ def target(ctx: ActionContext) -> None:
assert expected_message in (updates[-1].result_message or "")


def test_action_error_sets_result_metadata_and_keeps_failed_status() -> None:
def target(ctx: ActionContext) -> None:
raise ActionError("bad input", error_type="invalid_parameter")

extractor = _make_extractor()
extractor.add_action(CustomAction(name="strict", target=target))
extractor._dispatch_single_action(_make_action("act-err", "strict"))

updates = _queued_updates(extractor)
failed = next(u for u in updates if u.status == ActionStatus.failed)
assert failed.result_metadata == {"error_type": "invalid_parameter"}
assert failed.result_message == "bad input"


def test_custom_action_receives_call_metadata_in_context() -> None:
received_metadata: list[dict | None] = []

Expand Down
36 changes: 22 additions & 14 deletions tests/test_unstable/test_action_registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,25 @@ def _startup_request(extractor: Extractor) -> StartupRequest:
),
],
)
def test_available_actions_none_without_scheduled_tasks(extra_tasks: list) -> None:
def test_available_actions_without_scheduled_tasks_only_has_builtins(extra_tasks: list) -> None:
extractor = _make_extractor()
for task in extra_tasks:
extractor.add_task(task)
assert _startup_request(extractor).available_actions is None
req = _startup_request(extractor)
assert req.available_actions is not None
# Only built-in actions present — no start/stop actions from scheduled tasks
task_action_names = {a.name for a in req.available_actions if a.type != ActionType.custom}
assert task_action_names == set()


def test_two_scheduled_tasks_produce_four_available_actions() -> None:
def test_two_scheduled_tasks_produce_four_task_start_stop_actions() -> None:
extractor = _make_extractor()
extractor.add_task(ScheduledTask.from_interval(interval="1h", name="alpha", target=lambda _: None))
extractor.add_task(ScheduledTask.from_interval(interval="2h", name="beta", target=lambda _: None))
req = _startup_request(extractor)
assert req.available_actions is not None
assert len(req.available_actions) == 4
assert {a.name for a in req.available_actions} == {"Start alpha", "Stop alpha", "Start beta", "Stop beta"}
task_action_names = {a.name for a in req.available_actions if a.type != ActionType.custom}
assert task_action_names == {"Start alpha", "Stop alpha", "Start beta", "Stop beta"}


@pytest.mark.parametrize(
Expand All @@ -79,19 +83,23 @@ def test_scheduled_and_custom_actions_combined_ordering() -> None:
extractor.add_action(CustomAction(name="flush", target=lambda _: None))
req = _startup_request(extractor)
assert req.available_actions is not None
assert len(req.available_actions) == 3
names = [a.name for a in req.available_actions]
assert names == ["Start sync", "Stop sync", "flush"]
# Ordering: scheduled-task start/stop actions, then _custom_actions in registration order
# (_custom_actions = built-ins registered first, then user-registered actions)
assert names.index("Start sync") < names.index("flush")
assert names.index("Stop sync") < names.index("flush")
assert names.index("fetch_logs") < names.index("flush")


def test_custom_action_appears_with_correct_type_and_description() -> None:
extractor = _make_extractor()
extractor.add_action(CustomAction(name="flush cache", target=lambda _: None, description="Clears state"))
actions = _startup_request(extractor).available_actions
assert actions is not None and len(actions) == 1
assert actions[0].name == "flush cache"
assert actions[0].type == ActionType.custom
assert actions[0].description == "Clears state"
assert actions is not None
by_name = {a.name: a for a in actions}
assert "flush cache" in by_name
assert by_name["flush cache"].type == ActionType.custom
assert by_name["flush cache"].description == "Clears state"


def test_init_actions_hook_called_after_init_tasks_and_can_register_actions() -> None:
Expand All @@ -107,15 +115,15 @@ def __init_actions__(self) -> None:

extractor = _make_extractor(_Ext)
assert call_order == ["tasks", "actions"]
assert len(extractor._custom_actions) == 1
assert extractor._custom_actions[0].name == "ping"
assert any(a.name == "ping" for a in extractor._custom_actions)


def test_multiple_add_action_calls_accumulate_in_registration_order() -> None:
extractor = _make_extractor()
for name in ("a1", "a2", "a3"):
extractor.add_action(CustomAction(name=name, target=lambda _: None))
assert [a.name for a in extractor._custom_actions] == ["a1", "a2", "a3"]
names = [a.name for a in extractor._custom_actions]
assert names.index("a1") < names.index("a2") < names.index("a3")


def test_add_action_raises_on_duplicate_name() -> None:
Expand Down
Loading
Loading