feat(odin): Add support for streaming log files to CDF Files#547

Open
vikramlc-cognite wants to merge 16 commits into EDGE-607-implement-bounded-reader from EDG-371-upload-logs-to-cdf-files

Conversation

@vikramlc-cognite

Contributor

Summary

Completes the fetch_logs action by wiring the upload loop: candidate log files are streamed to CDF Files via upload_bytes, with per-file result tracking and structured success metadata reported back to Odin.

Type of change

  • New feature (non-breaking change that adds functionality)

What changed

  • _log_upload_action.py: Added _FileUploadResult dataclass, _file_external_id(), and _upload_candidate(). The fetch_logs_action now snapshots the current-day file size before spawning uploads, submits candidates to a ThreadPoolExecutor, and calls ctx.set_result() with aggregated counts and a per-file JSON array in result_metadata.
  • actions.py: Added ActionContext.set_result(message, *, metadata) so actions can declare structured success data without changing the ActionTarget signature.
  • base.py: _handle_custom_action success path now forwards ctx._result_message and ctx._result_metadata into the ActionUpdate(succeeded) call.
  • test_log_upload_action.py: 9 new tests covering _file_external_id naming, _upload_candidate for rotated files, BoundedReader usage for current-day files, oversized file skipping, CDF upload errors, and end-to-end fetch_logs_action result metadata (partial failures, all-missing, upload failures still report succeeded).
  • test_action_dispatch.py: 1 new test verifying set_result values propagate into the succeeded ActionUpdate.
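
The result-reporting path described above could look roughly like the sketch below. It is illustrative only: `FileUploadResult`, `SketchContext`, `report`, and the field and metadata key names other than `files` and `failed_files` are assumptions for this example, not the code in this PR.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class FileUploadResult:
    # Hypothetical stand-in for _FileUploadResult; field names are assumed.
    file_name: str
    status: str  # e.g. "uploaded" or "failed"
    error: Optional[str] = None


class SketchContext:
    """Hypothetical stand-in for ActionContext, reduced to set_result()."""

    def __init__(self) -> None:
        self._result_message: Optional[str] = None
        self._result_metadata: Optional[dict] = None

    def set_result(self, message: str, *, metadata: Optional[dict] = None) -> None:
        # Store the values so the dispatcher can forward them on success.
        self._result_message = message
        self._result_metadata = metadata


def report(ctx: SketchContext, results: list) -> None:
    # Aggregate counts and serialise the per-file breakdown as a JSON array.
    uploaded = sum(r.status == "uploaded" for r in results)
    failed = sum(r.status == "failed" for r in results)
    ctx.set_result(
        f"{uploaded} uploaded, {failed} failed",
        metadata={
            "uploaded_files": str(uploaded),
            "failed_files": str(failed),
            "files": json.dumps([asdict(r) for r in results]),
        },
    )
```

Keeping the per-file array as a JSON string is one way to fit structured data into a flat string-to-string metadata map; whether the real metadata type requires that is not confirmed here.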

Why it changed

  • Implements upload execution and result reporting for the fetch_logs action (EDG-371). Prior commits established parameter validation, candidate file enumeration, and BoundedReader; this commit closes the loop end-to-end.

What to focus on during review

  • Snapshot-before-threads ordering: snapshot_size is captured via stat() before the ThreadPoolExecutor is created. This is load-bearing - if the snapshot happened inside the thread, it would race with the logger appending to the file, making BoundedReader's ceiling unreliable.
  • Partial failure semantics: A file that fails to upload results in status="failed" in result_metadata["files"], but the action still reports ActionStatus.succeeded with failed_files > 0. The caller (Odin operator) sees the breakdown and decides whether to re-trigger.
  • ctx._extractor access: fetch_logs_action reads ctx._extractor.cognite_client and ctx._extractor.connection_config.integration.external_id. This is internal framework access - _log_upload_action.py is _-prefixed and co-located in the same package. Exposing these on the public ActionContext API would unnecessarily widen the surface for user-authored actions.
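
A minimal sketch of the first two points, under stated assumptions: `upload_one`, `fetch_logs`, and the `send` callback are hypothetical names, and the sketch reads bytes into memory where the PR streams through BoundedReader. It only illustrates the ordering (stat before the executor exists) and the per-file status handling.

```python
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Callable, Optional


def upload_one(path: Path, snapshot_size: Optional[int], send: Callable[[str, bytes], None]) -> str:
    """Upload one file and return a status string instead of raising."""
    try:
        with open(path, "rb") as f:
            # For the active file, read at most snapshot_size bytes; -1 reads to EOF.
            data = f.read(snapshot_size if snapshot_size is not None else -1)
        send(path.name, data)
        return "uploaded"
    except Exception:
        # A failed file is recorded, not propagated, so other files still upload.
        return "failed"


def fetch_logs(paths: list, current: Path, send: Callable[[str, bytes], None]) -> dict:
    # Snapshot BEFORE any worker thread exists, so the size cannot race with
    # the logger appending to the current-day file.
    snapshot = current.stat().st_size
    with ThreadPoolExecutor(max_workers=1) as pool:
        futures = {
            p: pool.submit(upload_one, p, snapshot if p == current else None, send)
            for p in paths
        }
    # The action as a whole still completes; callers inspect per-file statuses.
    return {p.name: f.result() for p, f in futures.items()}
```

With `max_workers=1` the uploads run one at a time, which mirrors the sequential default described under "Risks and unknowns".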

Test evidence

tests/test_unstable/test_log_upload_action.py 40 passed
tests/test_unstable/test_action_dispatch.py passed
ruff: clean, mypy: clean

Risks and unknowns

  • DEFAULT_CONCURRENT_UPLOADS = 1 keeps uploads sequential. Safe on constrained networks; parallelism is a one-line change when needed.
  • MAX_FILE_SIZE_BYTES = 4 GiB - should be validated against the SDK's actual per-request upload limit.
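
For reference, the size cap can be thought of as a simple gate applied before a file is opened. The sketch below is an assumption-laden illustration: `size_gate` is a hypothetical helper, the `skipped_too_large` status is inferred from the test name in this PR, and whether the real comparison is strict is not confirmed.

```python
from typing import Optional

MAX_FILE_SIZE_BYTES = 4 * 1024**3  # 4 GiB, the value quoted above


def size_gate(size_bytes: int, limit: int = MAX_FILE_SIZE_BYTES) -> Optional[str]:
    """Return a skip status when the file exceeds the limit, otherwise None."""
    # Assumption: a file exactly at the limit is still uploaded.
    return "skipped_too_large" if size_bytes > limit else None
```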

Rollout and rollback

N/A - triggered only on explicit Odin dispatch. No config changes or migrations.

Checklist

  • Self-reviewed the diff
  • Tests added or updated
  • Docs updated (N/A - internal framework)
  • No secrets, credentials, or PII committed
  • Breaking changes called out above and communicated to affected teams

@vikramlc-cognite vikramlc-cognite self-assigned this Jun 25, 2026

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces a BoundedReader to cap log file uploads to a point-in-time snapshot size, preventing Content-Length mismatches on active logs. It also updates the log upload action to support concurrent uploads and report structured results. The review feedback suggests implementing a seek method in BoundedReader to support HTTP client retries, capping the snapshot size at file-open time to avoid truncation race conditions, and adding corresponding unit tests for the new seek behavior.

Comment thread cognite/extractorutils/unstable/core/_bounded_reader.py
Comment thread cognite/extractorutils/unstable/core/_log_upload_action.py Outdated
Comment thread tests/test_unstable/test_bounded_reader.py
Base automatically changed from EDGE-606-build-candidate-log-file-list to master June 29, 2026 08:18
@vikramlc-cognite vikramlc-cognite changed the base branch from master to EDGE-607-implement-bounded-reader June 29, 2026 09:08
@vikramlc-cognite

Contributor Author

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces concurrent log file uploads to CDF Files in the fetch_logs action, including size snapshotting for the current-day log file to prevent content-length mismatches. It also adds support for reporting action execution results and metadata back to the dispatcher. The review feedback highlights a potential resource leak in _upload_candidate where a file is opened outside of a with statement, and suggests refactoring it to use a standard with open(...) as f: block.

Comment thread cognite/extractorutils/unstable/core/_log_upload_action.py Outdated
@vikramlc-cognite vikramlc-cognite marked this pull request as ready for review June 30, 2026 08:37
@vikramlc-cognite vikramlc-cognite requested a review from a team as a code owner June 30, 2026 08:37
@vikramlc-cognite

Contributor Author

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request enhances the log upload action by introducing structured upload results, file size snapshotting for active log files using a BoundedReader, and the ability to propagate successful action results and metadata to CDF. The review feedback correctly identifies a critical issue where BoundedReader lacks a seek method, which can break HTTP retries during file uploads and lead to failures or corrupted uploads.


try:
    with open(candidate.path, "rb") as f:
        reader: BinaryIO = BoundedReader(f, snapshot_size) if snapshot_size is not None else f  # type: ignore[assignment]

high

Missing seek method on BoundedReader breaks HTTP retries

The BoundedReader class is used to wrap the file stream for the active log file. However, BoundedReader does not implement the seek method.

When the Cognite Python SDK uploads the file using upload_bytes, it uses requests (or urllib3) under the hood. If a transient network error occurs, the HTTP client's retry mechanism will attempt to rewind the request body by calling seek(0).

Because BoundedReader lacks a seek method:

  1. requests / urllib3 will raise an UnrewindableBodyError during redirects or retries, causing the upload to fail completely.
  2. If it doesn't raise an error, the retry attempt will read from an already-exhausted stream, resulting in a corrupted or 0-byte file being uploaded to CDF (overwriting the previous attempt since overwrite=True is set).

Suggested Fix

Please add a seek method to BoundedReader in cognite/extractorutils/unstable/core/_bounded_reader.py:

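def seek(self, offset: int, whence: int = 0) -> int:
    # Reposition the underlying stream, then recompute the remaining byte budget.
    pos = self._stream.seek(offset, whence)
    # Clamp at zero so seeking past the snapshot size cannot yield a negative budget.
    self._remaining = max(0, self._size - pos)
    return pos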

Also, implement seekable so that it reports whether the underlying stream is seekable:

def seekable(self) -> bool:
    return self._stream.seekable()
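
To show how the two methods fit with read(), here is a self-contained sketch of a bounded reader. It is not the BoundedReader from this PR: the class name `BoundedReaderSketch` and the read() logic are assumptions, and it assumes the wrapped stream starts at offset 0 and that callers only rewind with an absolute seek.

```python
import io
from typing import BinaryIO


class BoundedReaderSketch:
    """Hypothetical reader exposing only the first `size` bytes of a stream."""

    def __init__(self, stream: BinaryIO, size: int) -> None:
        self._stream = stream
        self._size = size
        self._remaining = size

    def read(self, n: int = -1) -> bytes:
        # Never hand out more than the remaining budget, even if the file grew.
        if self._remaining <= 0:
            return b""
        if n < 0 or n > self._remaining:
            n = self._remaining
        data = self._stream.read(n)
        self._remaining -= len(data)
        return data

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        # Rewind the underlying stream and reset the budget relative to `size`.
        pos = self._stream.seek(offset, whence)
        self._remaining = max(0, self._size - pos)
        return pos

    def seekable(self) -> bool:
        return self._stream.seekable()
```

After `seek(0)` the reader yields the same bounded bytes again, which is the property a retrying HTTP client relies on when it rewinds a request body.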

@vikramlc Seems legitimate

Contributor

@vikramlc-cognite Along with that, note that if snapshot_size is None, the raw f is used.

@Yaseen-A-Khan Yaseen-A-Khan left a comment

Contributor

I have a few nits and comments; please review them.



def test_upload_candidate_calls_cdf_upload_for_rotated_file(tmp_path: Path) -> None:
    from unittest.mock import MagicMock

Contributor

MagicMock duplicate import



def test_upload_candidate_current_day_uses_bounded_reader(tmp_path: Path) -> None:
    from unittest.mock import MagicMock

Contributor

MagicMock duplicate import



def test_upload_candidate_exceeds_max_size_returns_skipped_too_large(tmp_path: Path) -> None:
    from unittest.mock import MagicMock, patch

Contributor

MagicMock duplicate import



def test_upload_candidate_cdf_error_returns_failed(tmp_path: Path) -> None:
    from unittest.mock import MagicMock

Contributor

MagicMock duplicate import


len(skipped_dates),
)

# Snapshot the current-day file size BEFORE spawning upload threads.

Contributor

Suggested change
- # Snapshot the current-day file size BEFORE spawning upload threads.
+ # Snapshot the current-day file size BEFORE starting uploads.

[nit] Can you verify that these are actually upload threads? According to line 253, it looks like a list comprehension.

4 participants