-
Notifications
You must be signed in to change notification settings - Fork 6
feat(odin): Add support for streaming log files to CDF Files #547
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
9d34837
a480f65
8670c5e
471e5e1
82d8b6d
a20c8c9
acbecd5
90a2cbd
03be2d1
eca3f93
97a064e
1019b85
021210e
d99a0cb
6598517
556aee1
ea58e9b
c028fac
0ab05e9
3d85fbf
c308409
f903939
7164668
bd85924
5c73846
6366264
dd1554d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,19 +1,27 @@ | ||||||
| """Built-in ``fetch_logs`` action: streams rotated log files to CDF Files.""" | ||||||
|
|
||||||
| import json | ||||||
| import logging | ||||||
| from collections import Counter | ||||||
| from dataclasses import dataclass | ||||||
| from datetime import date, timedelta, timezone | ||||||
| from datetime import datetime as dt | ||||||
| from pathlib import Path | ||||||
| from typing import BinaryIO, Literal | ||||||
|
|
||||||
| from cognite.client import CogniteClient | ||||||
|
|
||||||
| from cognite.extractorutils.unstable.configuration.models import ExtractorConfig, LogFileHandlerConfig | ||||||
| from cognite.extractorutils.unstable.core._bounded_reader import BoundedReader | ||||||
| from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError | ||||||
|
|
||||||
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)


MAX_DATE_RANGE_DAYS = 7
"""Maximum number of calendar days a single ``fetch_logs`` invocation may cover."""


# Upload ceiling per file: candidates larger than this are not uploaded and are
# reported with status "skipped_too_large".
MAX_FILE_SIZE_BYTES = 4 * 1024 * 1024 * 1024  # 4 GiB — CDF single-request upload limit


# Description text for the ``fetch_logs`` action (its usage site is not shown here).
# The day limit is interpolated so the text stays in sync with MAX_DATE_RANGE_DAYS.
_FETCH_LOGS_DESCRIPTION = (
    f"Upload rotated log files to CDF Files for a given date range. At most {MAX_DATE_RANGE_DAYS} days per invocation."
)
|
|
@@ -28,6 +36,15 @@ class LogFileCandidate: | |||||
| is_current: bool # True when path is the live (unrotated) file.log | ||||||
|
|
||||||
|
|
||||||
| @dataclass | ||||||
| class _FileUploadResult: | ||||||
| log_date: date | ||||||
| file_external_id: str | ||||||
| status: Literal["uploaded", "skipped_too_large", "failed"] | ||||||
| size_bytes: int = 0 | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Changing
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. size_bytes = 0 is unambiguous here because _build_candidate_files already filters out empty files before they become candidates, so a _FileUploadResult with size_bytes=0 always means "size not recorded", never "file is actually empty". The if r.size_bytes: guard works correctly on that basis. Since _FileUploadResult is a private dataclass used only within this file, switching to None would just add is not None checks throughout for no real gain.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you verify this? In a TOCTOU race the recorded size could be zero because snapshot_size=0, and then the `if r.size_bytes:` guard might not work as expected. |
||||||
| error: str | None = None | ||||||
|
|
||||||
|
|
||||||
| def _today_utc() -> date: | ||||||
| return dt.now(tz=timezone.utc).date() | ||||||
|
|
||||||
|
|
@@ -100,6 +117,67 @@ def _build_candidate_files( | |||||
| return candidates, skipped | ||||||
|
|
||||||
|
|
||||||
| def _file_external_id(integration_external_id: str, log_date: date) -> str: | ||||||
| return f"extractor-logs-{integration_external_id}-{log_date.isoformat()}" | ||||||
|
|
||||||
|
|
||||||
def _upload_candidate(
    candidate: LogFileCandidate,
    integration_external_id: str,
    cdf_client: CogniteClient,
    snapshot_size: int | None,
) -> _FileUploadResult:
    """Upload one candidate log file to CDF Files. Returns a result regardless of success or failure.

    Args:
        candidate: Log file to upload; ``candidate.path`` is read and
            ``candidate.log_date`` identifies the result.
        integration_external_id: Used to derive the CDF file external ID.
        cdf_client: Client whose ``files.upload_bytes`` performs the upload.
        snapshot_size: Fixed byte ceiling for the read. When given, it is taken
            as the file size and the file is read through ``BoundedReader``;
            when ``None`` the size comes from ``stat()`` and the open file
            object is passed to the upload as-is.

    Returns:
        A ``_FileUploadResult`` whose status is ``"uploaded"``,
        ``"skipped_too_large"`` or ``"failed"``. ``size_bytes`` is set for the
        first two; ``error`` is set for ``"failed"``.
    """
    external_id = _file_external_id(integration_external_id, candidate.log_date)

    try:
        actual_size = snapshot_size if snapshot_size is not None else candidate.path.stat().st_size
    except OSError as e:
        # Log here too, so a stat failure is as visible as an upload failure
        # instead of only surfacing in the returned result.
        _logger.error("fetch_logs: failed to stat %s — %s", candidate.path.name, e)
        return _FileUploadResult(
            log_date=candidate.log_date, file_external_id=external_id, status="failed", error=str(e)
        )

    if actual_size > MAX_FILE_SIZE_BYTES:
        _logger.warning(
            "fetch_logs: skipping %s (%d bytes) — exceeds MAX_FILE_SIZE_BYTES (%d bytes)",
            candidate.path.name,
            actual_size,
            MAX_FILE_SIZE_BYTES,
        )
        return _FileUploadResult(
            log_date=candidate.log_date,
            file_external_id=external_id,
            status="skipped_too_large",
            size_bytes=actual_size,
        )

    try:
        with open(candidate.path, "rb") as f:
            # With a snapshot, cap the read at that size so bytes appended after
            # the snapshot was taken are not part of the upload.
            reader: BinaryIO = BoundedReader(f, snapshot_size) if snapshot_size is not None else f  # type: ignore[assignment]
            cdf_client.files.upload_bytes(
                content=reader,
                name=f"{external_id}.log",
                external_id=external_id,
                mime_type="text/plain",
                overwrite=True,
            )
    except Exception as e:
        # Deliberately broad: one failing file must not abort the remaining uploads.
        # ``exception`` keeps the traceback so the cause can be diagnosed from the logs.
        _logger.exception("fetch_logs: failed to upload %s — %s", external_id, e)
        return _FileUploadResult(
            log_date=candidate.log_date,
            file_external_id=external_id,
            status="failed",
            error=str(e),
        )
    else:
        _logger.info("fetch_logs: uploaded %s (%d bytes)", external_id, actual_size)
        return _FileUploadResult(
            log_date=candidate.log_date,
            file_external_id=external_id,
            status="uploaded",
            size_bytes=actual_size,
        )
|
|
||||||
|
|
||||||
| def fetch_logs_action(ctx: ActionContext) -> None: | ||||||
| """Validate parameters and upload rotated log files for the requested date range to CDF Files.""" | ||||||
| params = ctx.call_metadata or {} | ||||||
|
|
@@ -144,11 +222,77 @@ def fetch_logs_action(ctx: ActionContext) -> None: | |||||
| error_type="no_file_handler_configured", | ||||||
| ) | ||||||
|
|
||||||
| candidates, skipped = _build_candidate_files(log_file_path, start_date, end_date, today) | ||||||
| candidates, skipped_dates = _build_candidate_files(log_file_path, start_date, end_date, today) | ||||||
| _logger.info( | ||||||
| "fetch_logs: %d candidate file(s) for %s to %s; %d date(s) skipped", | ||||||
| len(candidates), | ||||||
| start_date, | ||||||
| end_date, | ||||||
| len(skipped), | ||||||
| len(skipped_dates), | ||||||
| ) | ||||||
|
|
||||||
| # Snapshot the current-day file size BEFORE starting uploads. | ||||||
| # This gives a fixed read ceiling for BoundedReader — bytes written after this | ||||||
| # point are excluded from the upload, preventing Content-Length mismatches. | ||||||
| snapshot_size: int | None = None | ||||||
| current_candidate = next((c for c in candidates if c.is_current), None) | ||||||
| if current_candidate is not None: | ||||||
| try: | ||||||
| snapshot_size = current_candidate.path.stat().st_size | ||||||
| _logger.info( | ||||||
| "fetch_logs: current-day snapshot %d bytes (%s)", | ||||||
| snapshot_size, | ||||||
| current_candidate.path.name, | ||||||
| ) | ||||||
| except OSError as e: | ||||||
| _logger.warning("fetch_logs: could not snapshot current-day file size — %s", e) | ||||||
|
|
||||||
| integration_external_id = ctx.integration_external_id | ||||||
| cdf_client = ctx.cdf_client | ||||||
|
|
||||||
| upload_results: list[_FileUploadResult] = [ | ||||||
| _upload_candidate( | ||||||
| candidate, | ||||||
| integration_external_id, | ||||||
| cdf_client, | ||||||
| snapshot_size if candidate.is_current else None, | ||||||
| ) | ||||||
| for candidate in candidates | ||||||
| ] | ||||||
|
|
||||||
| counts = Counter(r.status for r in upload_results) | ||||||
|
|
||||||
| # Per-file entries: upload results (sorted by date) + missing dates (skipped by candidate builder) | ||||||
| files_list: list[dict[str, str]] = [] | ||||||
| for r in upload_results: | ||||||
| entry: dict[str, str] = { | ||||||
| "date": str(r.log_date), | ||||||
| "file_external_id": r.file_external_id, | ||||||
| "status": r.status, | ||||||
| } | ||||||
| if r.size_bytes: | ||||||
| entry["size_bytes"] = str(r.size_bytes) | ||||||
| if r.error: | ||||||
| entry["error"] = r.error | ||||||
| files_list.append(entry) | ||||||
| files_list.extend( | ||||||
| { | ||||||
| "date": str(skipped_date), | ||||||
| "file_external_id": _file_external_id(integration_external_id, skipped_date), | ||||||
| "status": "skipped", | ||||||
| } | ||||||
| for skipped_date in skipped_dates | ||||||
| ) | ||||||
| files_list.sort(key=lambda e: e["date"]) | ||||||
|
|
||||||
| ctx.set_result( | ||||||
| f"{counts['uploaded']} of {num_days} log files uploaded to CDF Files", | ||||||
| metadata={ | ||||||
| "total_files": str(num_days), | ||||||
| "uploaded_files": str(counts["uploaded"]), | ||||||
| "skipped_missing_files": str(len(skipped_dates)), | ||||||
| "skipped_too_large_files": str(counts["skipped_too_large"]), | ||||||
| "failed_files": str(counts["failed"]), | ||||||
| "files": json.dumps(files_list), | ||||||
| }, | ||||||
| ) | ||||||
Uh oh!
There was an error while loading. Please reload this page.