From 8dee1e766a73b88e67b4a786c8c7ec57a30026c9 Mon Sep 17 00:00:00 2001 From: vikramlc Date: Mon, 29 Jun 2026 14:06:40 +0530 Subject: [PATCH 1/2] feat(odin): Update result message and progress --- .../unstable/core/_log_upload_action.py | 7 ++- .../extractorutils/unstable/core/actions.py | 11 +++++ tests/test_unstable/test_log_upload_action.py | 49 +++++++++++++++++++ 3 files changed, 66 insertions(+), 1 deletion(-) diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py index b69fa901..4a87e33c 100644 --- a/cognite/extractorutils/unstable/core/_log_upload_action.py +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -252,6 +252,8 @@ def fetch_logs_action(ctx: ActionContext) -> None: integration_external_id = ctx._extractor.connection_config.integration.external_id cdf_client = ctx._extractor.cognite_client + total_candidates = len(candidates) + completed_count = 0 upload_results: list[_FileUploadResult] = [] with ThreadPoolExecutor(max_workers=DEFAULT_CONCURRENT_UPLOADS) as pool: futures: dict[Future[_FileUploadResult], LogFileCandidate] = { @@ -264,7 +266,10 @@ def fetch_logs_action(ctx: ActionContext) -> None: ): candidate for candidate in candidates } - upload_results.extend(future.result() for future in as_completed(futures)) + for future in as_completed(futures): + upload_results.append(future.result()) + completed_count += 1 + ctx.report_progress(f"Uploading: {completed_count}/{total_candidates} files complete") upload_results.sort(key=lambda r: r.log_date) diff --git a/cognite/extractorutils/unstable/core/actions.py b/cognite/extractorutils/unstable/core/actions.py index c69cc8a8..faafa559 100644 --- a/cognite/extractorutils/unstable/core/actions.py +++ b/cognite/extractorutils/unstable/core/actions.py @@ -6,6 +6,7 @@ from collections.abc import Callable from typing import TYPE_CHECKING +from cognite.extractorutils.unstable.core._dto import ActionStatus, ActionUpdate from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel from cognite.extractorutils.unstable.core.logger import CogniteLogger @@ -62,6 +63,16 @@ def set_result(self, message: str, *, metadata: dict[str, str] | None = None) -> self._result_message = message self._result_metadata = metadata + def report_progress(self, message: str) -> None: + """Queue an intermediate progress update while the action is still running.""" + self._extractor._checkin_worker.queue_action_update( + ActionUpdate( + external_id=self.external_id, + status=ActionStatus.running, + result_message=message, + ) + ) + class ActionError(Exception): """Deliberate action failure with structured metadata for Odin result reporting.""" diff --git a/tests/test_unstable/test_log_upload_action.py b/tests/test_unstable/test_log_upload_action.py index 1a3ae867..2341824d 100644 --- a/tests/test_unstable/test_log_upload_action.py +++ b/tests/test_unstable/test_log_upload_action.py @@ -455,3 +455,52 @@ def test_fetch_logs_action_upload_failure_still_succeeds(tmp_path: Path) -> None assert succeeded.result_metadata is not None assert succeeded.result_metadata["failed_files"] == "1" assert succeeded.result_metadata["uploaded_files"] == "0" + + +def test_report_progress_queues_running_action_update() -> None: + extractor = _make_extractor() + ctx = ActionContext( + action=CustomAction(name="test-action", target=lambda ctx: None), + extractor=extractor, + external_id="act-progress", + ) + ctx.report_progress("Uploading: 1/3 files complete") + + updates = _queued_updates(extractor) + assert len(updates) == 1 + assert updates[0].status == ActionStatus.running + assert updates[0].result_message == "Uploading: 1/3 files complete" + assert updates[0].external_id == "act-progress" + + +def test_upload_action_reports_intermediate_progress_for_multi_file_range(tmp_path: Path) -> None: + log_path = tmp_path / "extractor.log" + (tmp_path / "extractor.log.2026-06-10").write_bytes(b"day 1 data") + (tmp_path / "extractor.log.2026-06-11").write_bytes(b"day 2 data") + extractor = _make_extractor(log_path=log_path) + updates = _dispatch(extractor, {"start_date": "2026-06-10", "end_date": "2026-06-11"}) + progress_updates = [u for u in updates if u.status == ActionStatus.running and u.result_message is not None] + assert len(progress_updates) == 2 + messages = {u.result_message for u in progress_updates} + assert "Uploading: 1/2 files complete" in messages + assert "Uploading: 2/2 files complete" in messages + + +def test_upload_action_reports_progress_for_single_file(tmp_path: Path) -> None: + log_path = tmp_path / "extractor.log" + (tmp_path / "extractor.log.2026-06-10").write_bytes(b"day 1 data") + extractor = _make_extractor(log_path=log_path) + updates = _dispatch(extractor, {"start_date": "2026-06-10", "end_date": "2026-06-10"}) + progress_updates = [u for u in updates if u.status == ActionStatus.running and u.result_message is not None] + assert len(progress_updates) == 1 + assert progress_updates[0].result_message == "Uploading: 1/1 files complete" + + +def test_upload_action_no_progress_updates_when_all_files_missing(tmp_path: Path) -> None: + # No files on disk → no candidates → no futures → no progress updates. + log_path = tmp_path / "extractor.log" + extractor = _make_extractor(log_path=log_path) + updates = _dispatch(extractor, {"start_date": "2026-06-10", "end_date": "2026-06-10"}) + progress_updates = [u for u in updates if u.status == ActionStatus.running and u.result_message is not None] + assert progress_updates == [] + assert any(u.status == ActionStatus.succeeded for u in updates) From 9b0f1e8cdf8a64b2a1f000e966ebcd77db18bd2a Mon Sep 17 00:00:00 2001 From: vikramlc Date: Tue, 30 Jun 2026 14:55:24 +0530 Subject: [PATCH 2/2] refactor(odin): Added report_progress changes in refactored code --- .../unstable/core/_log_upload_action.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py index 9be8a7c6..f62191d5 100644 --- a/cognite/extractorutils/unstable/core/_log_upload_action.py +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -250,15 +250,18 @@ def fetch_logs_action(ctx: ActionContext) -> None: integration_external_id = ctx._extractor.connection_config.integration.external_id cdf_client = ctx._extractor.cognite_client - upload_results: list[_FileUploadResult] = [ - _upload_candidate( - candidate, - integration_external_id, - cdf_client, - snapshot_size if candidate.is_current else None, + total_candidates = len(candidates) + upload_results: list[_FileUploadResult] = [] + for i, candidate in enumerate(candidates, 1): + upload_results.append( + _upload_candidate( + candidate, + integration_external_id, + cdf_client, + snapshot_size if candidate.is_current else None, + ) ) - for candidate in candidates - ] + ctx.report_progress(f"Uploading: {i}/{total_candidates} files complete") counts = Counter(r.status for r in upload_results)