Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions cognite/extractorutils/unstable/core/_log_upload_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,15 +250,18 @@ def fetch_logs_action(ctx: ActionContext) -> None:
integration_external_id = ctx._extractor.connection_config.integration.external_id
cdf_client = ctx._extractor.cognite_client

upload_results: list[_FileUploadResult] = [
_upload_candidate(
candidate,
integration_external_id,
cdf_client,
snapshot_size if candidate.is_current else None,
total_candidates = len(candidates)
upload_results: list[_FileUploadResult] = []
for i, candidate in enumerate(candidates, 1):
upload_results.append(
_upload_candidate(
candidate,
integration_external_id,
cdf_client,
snapshot_size if candidate.is_current else None,
)
)
for candidate in candidates
]
ctx.report_progress(f"Uploading: {i}/{total_candidates} files complete")

counts = Counter(r.status for r in upload_results)

Expand Down
11 changes: 11 additions & 0 deletions cognite/extractorutils/unstable/core/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from collections.abc import Callable
from typing import TYPE_CHECKING, Generic

from cognite.extractorutils.unstable.core._dto import ActionStatus, ActionUpdate
from cognite.extractorutils.unstable.configuration.models import ConfigType
from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel
from cognite.extractorutils.unstable.core.logger import CogniteLogger
Expand Down Expand Up @@ -68,6 +69,16 @@ def set_result(self, message: str, *, metadata: dict[str, str] | None = None) ->
self._result_message = message
self._result_metadata = metadata

def report_progress(self, message: str) -> None:
"""Queue an intermediate progress update while the action is still running."""
self._extractor._checkin_worker.queue_action_update(
ActionUpdate(
external_id=self.external_id,
status=ActionStatus.running,
result_message=message,
)
)


class ActionError(Exception):
"""Deliberate action failure with structured metadata for Odin result reporting."""
Expand Down
49 changes: 49 additions & 0 deletions tests/test_unstable/test_log_upload_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,3 +455,52 @@ def test_fetch_logs_action_upload_failure_still_succeeds(tmp_path: Path) -> None
assert succeeded.result_metadata is not None
assert succeeded.result_metadata["failed_files"] == "1"
assert succeeded.result_metadata["uploaded_files"] == "0"


def test_report_progress_queues_running_action_update() -> None:
extractor = _make_extractor()
ctx = ActionContext(
action=CustomAction(name="test-action", target=lambda ctx: None),
extractor=extractor,
external_id="act-progress",
)
ctx.report_progress("Uploading: 1/3 files complete")

updates = _queued_updates(extractor)
assert len(updates) == 1
assert updates[0].status == ActionStatus.running
assert updates[0].result_message == "Uploading: 1/3 files complete"
assert updates[0].external_id == "act-progress"


def test_upload_action_reports_intermediate_progress_for_multi_file_range(tmp_path: Path) -> None:
log_path = tmp_path / "extractor.log"
(tmp_path / "extractor.log.2026-06-10").write_bytes(b"day 1 data")
(tmp_path / "extractor.log.2026-06-11").write_bytes(b"day 2 data")
extractor = _make_extractor(log_path=log_path)
updates = _dispatch(extractor, {"start_date": "2026-06-10", "end_date": "2026-06-11"})
progress_updates = [u for u in updates if u.status == ActionStatus.running and u.result_message is not None]
assert len(progress_updates) == 2
messages = {u.result_message for u in progress_updates}
assert "Uploading: 1/2 files complete" in messages
assert "Uploading: 2/2 files complete" in messages


def test_upload_action_reports_progress_for_single_file(tmp_path: Path) -> None:
log_path = tmp_path / "extractor.log"
(tmp_path / "extractor.log.2026-06-10").write_bytes(b"day 1 data")
extractor = _make_extractor(log_path=log_path)
updates = _dispatch(extractor, {"start_date": "2026-06-10", "end_date": "2026-06-10"})
progress_updates = [u for u in updates if u.status == ActionStatus.running and u.result_message is not None]
assert len(progress_updates) == 1
assert progress_updates[0].result_message == "Uploading: 1/1 files complete"


def test_upload_action_no_progress_updates_when_all_files_missing(tmp_path: Path) -> None:
# No files on disk → no candidates → no futures → no progress updates.
log_path = tmp_path / "extractor.log"
extractor = _make_extractor(log_path=log_path)
updates = _dispatch(extractor, {"start_date": "2026-06-10", "end_date": "2026-06-10"})
progress_updates = [u for u in updates if u.status == ActionStatus.running and u.result_message is not None]
assert progress_updates == []
assert any(u.status == ActionStatus.succeeded for u in updates)