feat(odin): Add support for streaming log files to CDF Files #547
vikramlc-cognite wants to merge 16 commits
…logs-to-cdf-files
Code Review
This pull request introduces a BoundedReader to cap log file uploads to a point-in-time snapshot size, preventing Content-Length mismatches on active logs. It also updates the log upload action to support concurrent uploads and report structured results. The review feedback suggests implementing a seek method in BoundedReader to support HTTP client retries, capping the snapshot size at file-open time to avoid truncation race conditions, and adding corresponding unit tests for the new seek behavior.
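The point about capping the snapshot size at file-open time can be sketched as follows. This is a minimal illustration, not the PR's code: `snapshot_then_read` and the file name `app.log` are hypothetical, and the append is done inline only to simulate a logger writing during an upload.

```python
import os
import tempfile


def snapshot_then_read(path: str, appended: bytes) -> tuple[int, bytes]:
    """Hypothetical helper: cap reads at the size seen when the file was opened."""
    with open(path, "rb") as f:
        # Take the size from the open descriptor so the cap and the reads
        # refer to the same file, even if the path is rotated afterwards.
        snapshot_size = os.fstat(f.fileno()).st_size
        # Simulate the logger appending while the upload is in progress.
        with open(path, "ab") as writer:
            writer.write(appended)
        # Read no more than the snapshot, ignoring the newly appended bytes.
        data = f.read(snapshot_size)
    return snapshot_size, data


with tempfile.TemporaryDirectory() as tmp:
    log_path = os.path.join(tmp, "app.log")
    with open(log_path, "wb") as fh:
        fh.write(b"line 1\n")
    size, body = snapshot_then_read(log_path, b"line 2\n")
    final_size = os.path.getsize(log_path)
```

Here the declared length (`size`) and the bytes actually sent (`body`) stay consistent even though the file on disk has grown in the meantime.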
…logs-to-cdf-files
/gemini review
Code Review
This pull request introduces concurrent log file uploads to CDF Files in the fetch_logs action, including size snapshotting for the current-day log file to prevent content-length mismatches. It also adds support for reporting action execution results and metadata back to the dispatcher. The review feedback highlights a potential resource leak in _upload_candidate where a file is opened outside of a with statement, and suggests refactoring it to use a standard with open(...) as f: block.
/gemini review
Code Review
This pull request enhances the log upload action by introducing structured upload results, file size snapshotting for active log files using a BoundedReader, and the ability to propagate successful action results and metadata to CDF. The review feedback correctly identifies a critical issue where BoundedReader lacks a seek method, which can break HTTP retries during file uploads and lead to failures or corrupted uploads.
    try:
        with open(candidate.path, "rb") as f:
            reader: BinaryIO = BoundedReader(f, snapshot_size) if snapshot_size is not None else f  # type: ignore[assignment]
Missing seek method on BoundedReader breaks HTTP retries
The BoundedReader class is used to wrap the file stream for the active log file. However, BoundedReader does not implement the seek method.
When the Cognite Python SDK uploads the file using upload_bytes, it uses requests (or urllib3) under the hood. If a transient network error occurs, the HTTP client's retry mechanism will attempt to rewind the request body by calling seek(0).
Because BoundedReader lacks a seek method:
- requests/urllib3 will raise an UnrewindableBodyError during redirects or retries, causing the upload to fail completely.
- If it doesn't raise an error, the retry attempt will read from an already-exhausted stream, resulting in a corrupted or 0-byte file being uploaded to CDF (overwriting the previous attempt since overwrite=True is set).
Suggested Fix
Please add a seek method to BoundedReader in cognite/extractorutils/unstable/core/_bounded_reader.py:
    def seek(self, offset: int, whence: int = 0) -> int:
        pos = self._stream.seek(offset, whence)
        self._remaining = self._size - pos
        return pos
Also, implement seekable to return True:
    def seekable(self) -> bool:
        return self._stream.seekable()
@vikramlc-cognite along with that, note that if snapshot_size is None, the raw f is used.
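To make the retry concern concrete, here is a minimal sketch of a bounded reader that can be rewound. It is not the BoundedReader from this PR: the class name `BoundedReaderSketch` is hypothetical, the attribute names `_stream`, `_size`, and `_remaining` are borrowed from the suggestion above, and `_start` and `tell` are additions made on the assumption that some HTTP clients record the body position before sending. Unlike the suggested snippet, positions are clamped to the bounded window so `_remaining` cannot go negative.

```python
import io
from typing import BinaryIO


class BoundedReaderSketch:
    """Hypothetical stand-in for BoundedReader; not the extractor-utils class."""

    def __init__(self, stream: BinaryIO, size: int) -> None:
        self._stream = stream
        self._size = size
        self._start = stream.tell()  # the cap is counted from this offset
        self._remaining = size

    def read(self, n: int = -1) -> bytes:
        if self._remaining <= 0:
            return b""
        if n is None or n < 0 or n > self._remaining:
            n = self._remaining
        chunk = self._stream.read(n)
        self._remaining -= len(chunk)
        return chunk

    def seekable(self) -> bool:
        return self._stream.seekable()

    def tell(self) -> int:
        # Position inside the bounded window, not inside the whole file.
        return self._size - self._remaining

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        if whence == io.SEEK_SET:
            target = offset
        elif whence == io.SEEK_CUR:
            target = self.tell() + offset
        elif whence == io.SEEK_END:
            target = self._size + offset
        else:
            raise ValueError(f"unsupported whence: {whence}")
        # Clamp to the window so the remaining byte count never goes negative.
        target = max(0, min(target, self._size))
        self._stream.seek(self._start + target)
        self._remaining = self._size - target
        return target


# The underlying stream holds more than the 10-byte snapshot.
reader = BoundedReaderSketch(io.BytesIO(b"0123456789-appended-later"), 10)
first_attempt = reader.read()
exhausted = reader.read()      # nothing left without a rewind
reader.seek(0)                 # rewind, as a retrying client would
second_attempt = reader.read()
end_position = reader.seek(0, io.SEEK_END)
```

After the rewind, the second attempt yields the same 10 bytes as the first, which is the behaviour a retry needs.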
Yaseen-A-Khan left a comment
I have a few nits and comments; please review them.
    def test_upload_candidate_calls_cdf_upload_for_rotated_file(tmp_path: Path) -> None:
        from unittest.mock import MagicMock
MagicMock duplicate import
    def test_upload_candidate_current_day_uses_bounded_reader(tmp_path: Path) -> None:
        from unittest.mock import MagicMock
MagicMock duplicate import
    def test_upload_candidate_exceeds_max_size_returns_skipped_too_large(tmp_path: Path) -> None:
        from unittest.mock import MagicMock, patch
MagicMock duplicate import
    def test_upload_candidate_cdf_error_returns_failed(tmp_path: Path) -> None:
        from unittest.mock import MagicMock
MagicMock duplicate import
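One way to address the duplicate-import nits is a single module-level import shared by all tests. The sketch below is illustrative only: `upload` is a hypothetical helper standing in for the code under test, and the test names are invented rather than taken from the PR.

```python
from unittest.mock import MagicMock  # imported once, at module level


def upload(client, name: str, payload: bytes) -> str:
    # Hypothetical helper standing in for the code under test.
    try:
        client.files.upload_bytes(payload, name=name, overwrite=True)
    except Exception:
        return "failed"
    return "uploaded"


def test_upload_calls_cdf() -> None:
    client = MagicMock()
    assert upload(client, "app.log", b"data") == "uploaded"
    client.files.upload_bytes.assert_called_once_with(b"data", name="app.log", overwrite=True)


def test_upload_error_returns_failed() -> None:
    client = MagicMock()
    # Make the mocked upload raise to exercise the failure path.
    client.files.upload_bytes.side_effect = RuntimeError("boom")
    assert upload(client, "app.log", b"data") == "failed"
```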
        len(skipped_dates),
    )

    # Snapshot the current-day file size BEFORE spawning upload threads.
Suggested change:
    - # Snapshot the current-day file size BEFORE spawning upload threads.
    + # Snapshot the current-day file size BEFORE starting uploads.
[nit] Can you verify that these are actually upload threads? According to line 253, it looks like a list comprehension.
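For context on this nit, a list comprehension and threaded uploads are not mutually exclusive. The sketch below assumes, without having seen line 253, that the comprehension wraps `executor.submit(...)`; `fake_upload` and the candidate names are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def fake_upload(name: str) -> tuple[str, str]:
    # Hypothetical stand-in for an upload; records which thread ran it.
    return name, threading.current_thread().name


candidates = ["app.log.2024-01-01", "app.log.2024-01-02", "app.log"]

with ThreadPoolExecutor(max_workers=2, thread_name_prefix="upload") as pool:
    # A list comprehension, yet each submit() schedules work on a pool thread.
    futures = [pool.submit(fake_upload, name) for name in candidates]
    # Collect results in submission order.
    results = [future.result() for future in futures]

main_thread = threading.current_thread().name
```

If the comprehension instead calls the upload function directly, the uploads run sequentially on the calling thread and the reworded comment would be the more accurate one.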
Summary
Completes the fetch_logs action by wiring the upload loop: candidate log files are streamed to CDF Files via upload_bytes, with per-file result tracking and structured success metadata reported back to Odin.
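The result reporting described in the summary might look roughly like the sketch below. Only `set_result(message, *, metadata)`, `_result_message`, and `_result_metadata` come from this PR description; the class names, the dataclass fields, the status strings, and the metadata keys are hypothetical.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class FileUploadResultSketch:
    # Hypothetical fields; the PR's _FileUploadResult may differ.
    file_name: str
    status: str  # e.g. "uploaded" or "failed"
    error: Optional[str] = None


class ActionContextSketch:
    """Hypothetical stand-in exposing only set_result(message, *, metadata)."""

    def __init__(self) -> None:
        self._result_message: Optional[str] = None
        self._result_metadata: Optional[dict] = None

    def set_result(self, message: str, *, metadata: Optional[dict] = None) -> None:
        self._result_message = message
        self._result_metadata = metadata


def report(ctx: ActionContextSketch, results: list) -> None:
    # Aggregate counts plus a per-file JSON array, as the summary describes.
    uploaded = sum(1 for r in results if r.status == "uploaded")
    failed = sum(1 for r in results if r.status == "failed")
    ctx.set_result(
        f"{uploaded} uploaded, {failed} failed",
        metadata={
            "uploaded": str(uploaded),
            "failed": str(failed),
            "files": json.dumps([asdict(r) for r in results]),
        },
    )


ctx = ActionContextSketch()
report(
    ctx,
    [
        FileUploadResultSketch("app.log.2024-01-01", "uploaded"),
        FileUploadResultSketch("app.log", "failed", "timeout"),
    ],
)
```

Keeping the per-file details as a JSON string inside the metadata lets the action report partial failures while still being marked as succeeded.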
Type of change
What changed
- _log_upload_action.py: Added _FileUploadResult dataclass, _file_external_id(), and _upload_candidate(). The fetch_logs_action now snapshots the current-day file size before spawning uploads, submits candidates to a ThreadPoolExecutor, and calls ctx.set_result() with aggregated counts and a per-file JSON array in result_metadata.
- actions.py: Added ActionContext.set_result(message, *, metadata) so actions can declare structured success data without changing the ActionTarget signature.
- base.py: _handle_custom_action success path now forwards ctx._result_message and ctx._result_metadata into the ActionUpdate(succeeded) call.
- test_log_upload_action.py: 9 new tests covering _file_external_id naming, _upload_candidate for rotated files, BoundedReader usage for current-day files, oversized file skipping, CDF upload errors, and end-to-end fetch_logs_action result metadata (partial failures, all-missing, upload failures still report succeeded).
- test_action_dispatch.py: 1 new test verifying set_result values propagate into the succeeded ActionUpdate.
Why it changed
What to focus on during review
_log_upload_action.py is _-prefixed and co-located in the same package. Exposing these on the public ActionContext API would unnecessarily widen the surface for user-authored actions.
Test evidence
- tests/test_unstable/test_log_upload_action.py: 40 passed
- tests/test_unstable/test_action_dispatch.py: passed
- ruff: clean, mypy: clean
Risks and unknowns
Rollout and rollback
N/A - triggered only on explicit Odin dispatch. No config changes or migrations.
Checklist