
[2.8] Fix StorageException after every job from empty SystemLogStreamer context#4559

Open
nvidianz wants to merge 1 commit into NVIDIA:2.8 from nvidianz:2.8-fix-systemlogstreamer-empty-context

Conversation

@nvidianz
Collaborator

@nvidianz nvidianz commented May 8, 2026

Summary

After PR #4476, every successful federated job emits this error in the server log:

ObjectStreamer - ERROR - exception from stream_done_cb dispatch_stream_done:
StorageException: path /tmp/nvflare/jobs-storage/unknown is not a valid directory.

(plus, in some cases, an earlier no stream processing info registered for log_streaming:live_log line during the run.)

Root cause

SystemLogStreamer._on_job_completed runs on CLIENT_PARENT and uploads the per-job error_log.txt snapshot to the server. It hands the upload thread a fresh context from engine.new_context(). CLIENT_PARENT serves multiple jobs over its lifetime, so its engine ctx_manager does NOT carry per-job RUN_NUM / IDENTITY_NAME as sticky props — and the fresh context arrives at the receiver with an empty job_id.

JobLogReceiver._get_trusted_stream_identity reads job_id via peer_ctx.get_job_id(default="unknown") from the sender-forwarded context. With the empty fresh context, peer_ctx.get_job_id() returns "" and the receiver falls back to the literal "unknown", then asks the job manager to save the log under .../jobs-storage/unknown, which fails because it isn't a real path.
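The fallback chain can be sketched with toy stand-ins (the dict-based props and helper functions below are illustrative simplifications, not NVFlare's real FLContext API):

```python
# Minimal sketch of the failure mode: a fresh CLIENT_PARENT context carries
# no per-job RUN_NUM, so the job id collapses to "" and then to "unknown".
# The "__run_num__" key is a placeholder for the ReservedKey string value.

def get_job_id(props: dict, default: str = "") -> str:
    # A fresh context has no RUN_NUM prop, so this yields the default "".
    return props.get("__run_num__", default)

def trusted_job_id(peer_props: dict) -> str:
    job_id = get_job_id(peer_props)
    # An empty string is falsy, so the receiver falls back to the literal.
    return job_id if job_id else "unknown"

peer_props = {}  # the empty fresh context forwarded by the sender
path = "/tmp/nvflare/jobs-storage/" + trusted_job_id(peer_props)
print(path)  # /tmp/nvflare/jobs-storage/unknown -> StorageException
```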

The bug was dormant before #4476: StaticFileBuilder._modify_system_log_streamer used to strip system_log_streamer from deployed client resources.json by default. #4476 removed that filter and flipped the default to allow_log_streaming=True, so the dormant buggy code now runs after every job.

Fix

In SystemLogStreamer._on_job_completed, explicitly seed RUN_NUM and IDENTITY_NAME on the fresh stream context before handing it to the upload thread. The receiver continues to read job identity from the sender-forwarded peer_ctx as before — preserving the existing peer-context-as-trust-source security model and leaving JobLogReceiver._get_trusted_stream_identity unchanged.

# Create a fresh context for the background upload thread; fall back to the
# current context if no engine is available.
engine = fl_ctx.get_engine()
stream_fl_ctx = engine.new_context() if engine else fl_ctx
if stream_fl_ctx is not fl_ctx:
    # Seed the per-job identity that CLIENT_PARENT's ctx_manager does not
    # carry, so the receiver sees a real job_id instead of "".
    stream_fl_ctx.put(key=ReservedKey.RUN_NUM, value=job_id, private=True, sticky=False)
    stream_fl_ctx.put(key=ReservedKey.IDENTITY_NAME, value=client_name, private=True, sticky=False)

Why sender-side instead of receiver-side

The bug report originally proposed an "Option A" (read job_id from stream_ctx in the receiver), but that approach was rejected after reviewing the receiver's existing security test:

tests/unit_test/app_common/logging/job_log_receiver_test.py::test_job_log_receiver_uses_trusted_peer_identity_for_storage deliberately passes forged stream_ctx values ("../../forged_job", "../../forged_client") and asserts the receiver routes storage paths via peer_ctx (authenticated cellnet identity), not stream_ctx (sender-supplied data). The function name _get_trusted_stream_identity and the test name make the trust boundary explicit.

stream_ctx-based routing would let a malicious client spoof another job's id and write log content into that other job's storage directory. Fixing the bug at its real source — the sender that handed off an empty context — preserves the security model and makes the existing security test continue to pass unchanged. Confirmed by running it as part of the test suite below.
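The path-traversal risk can be illustrated with a toy example (STORAGE_ROOT and storage_path are hypothetical stand-ins for the job manager's path handling, not NVFlare code; posixpath is used so the behavior is platform-independent):

```python
import posixpath

# Why stream_ctx-based routing was rejected: a sender-supplied job id
# containing ".." can escape the jobs-storage root entirely.
STORAGE_ROOT = "/tmp/nvflare/jobs-storage"

def storage_path(job_id: str) -> str:
    # Naive join of a sender-controlled id into the storage root.
    return posixpath.normpath(posixpath.join(STORAGE_ROOT, job_id))

print(storage_path("forged_job"))        # stays under the storage root
print(storage_path("../../forged_job"))  # /tmp/forged_job -- escapes the root
```

This is exactly the forged-id scenario the existing security test exercises, which is why identity must come from the authenticated peer_ctx rather than sender-supplied stream data.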

Tests

  • New: test_completed_upload_seeds_run_num_and_identity_on_fresh_context — regression test that verifies the fresh context handed to the upload thread now carries the real job_id and client_name.
  • Existing security test passes unchanged: test_job_log_receiver_uses_trusted_peer_identity_for_storage — confirms this fix does not relax the receiver's authentication model.
  • Existing SystemLogStreamer tests unchanged.
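The shape of the new regression check can be sketched with a toy context (names and prop keys are illustrative; the real test lives in tests/unit_test/app_common/logging/system_log_streamer_test.py and uses NVFlare's actual FLContext):

```python
# Toy stand-in for FLContext: just a prop dict.
class ToyContext:
    def __init__(self):
        self.props = {}

    def put(self, key, value):
        self.props[key] = value

def seed_stream_context(ctx, job_id, client_name):
    # Mirrors the fix: seed the fresh context before the upload thread runs.
    ctx.put("__run_num__", job_id)
    ctx.put("__identity_name__", client_name)

def test_completed_upload_seeds_run_num_and_identity_on_fresh_context():
    ctx = ToyContext()  # starts empty, like engine.new_context() on CLIENT_PARENT
    seed_stream_context(ctx, "job-123", "site-1")
    assert ctx.props["__run_num__"] == "job-123"
    assert ctx.props["__identity_name__"] == "site-1"

test_completed_upload_seeds_run_num_and_identity_on_fresh_context()
```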

Verified locally

  • pytest tests/unit_test/app_common/logging -v -> 13 passed (12 existing + 1 new) on Python 3.12.10 / pytest 8.3.4.
  • black, isort, flake8 clean on all changed files.

Test plan

  • CI: license + style + full unit test suite green on the 2.8 base.
  • Manual: run the bug's repro steps on 2.8 head; confirm grep "StorageException.*jobs-storage/unknown" .../server/log.txt returns no lines after the fix.

Types of changes

  • Non-breaking change (fix or new feature that would not break existing functionality).
  • Breaking change.
  • New tests added to cover the changes.
  • Quick tests passed locally by running unit tests for app_common/logging/.
  • In-line docstrings updated.
  • Documentation updated.

🤖 Generated with Claude Code

…er context

NVBugs 6151564 / FLARE-2921. After PR NVIDIA#4476 every successful federated
job emits this error in the server log:

  ObjectStreamer - ERROR - exception from stream_done_cb
  dispatch_stream_done: StorageException: path
  /tmp/nvflare/jobs-storage/unknown is not a valid directory.

Root cause:

SystemLogStreamer._on_job_completed runs on CLIENT_PARENT and uploads
the per-job error_log.txt snapshot to the server. It hands the upload
thread a fresh context from `engine.new_context()`. CLIENT_PARENT
serves multiple jobs over its lifetime, so its engine ctx_manager does
NOT carry per-job RUN_NUM / IDENTITY_NAME as sticky props -- and the
fresh context arrives at the receiver with an empty job_id.

JobLogReceiver._get_trusted_stream_identity reads job_id via
`peer_ctx.get_job_id(default="unknown")` from the sender-forwarded
context. With the empty fresh context, peer_ctx.get_job_id() returns
"" and the receiver falls back to the literal "unknown", then asks
the job manager to save the log under `.../jobs-storage/unknown`,
which fails because it isn't a real path.

The bug was dormant before NVIDIA#4476 because StaticFileBuilder.
_modify_system_log_streamer used to strip system_log_streamer from
deployed client resources.json by default. NVIDIA#4476 removed that filter
and changed the default to allow_log_streaming=True, so the dormant
buggy code now runs after every job.

Fix:

In SystemLogStreamer._on_job_completed, explicitly seed RUN_NUM and
IDENTITY_NAME on the fresh stream context before handing it to the
upload thread. The receiver continues to read job identity from the
sender-forwarded peer_ctx as before -- preserving the existing
peer-context-as-trust-source security model and leaving
JobLogReceiver._get_trusted_stream_identity unchanged. The pre-existing
test test_job_log_receiver_uses_trusted_peer_identity_for_storage
(which deliberately passes forged stream_ctx values to assert the
receiver ignores them) continues to pass unchanged, confirming this
fix doesn't open a path-traversal / job-spoofing surface.

Why the sender-side fix instead of receiver-side:

The bug report's "Option A" (read job_id from stream_ctx in the receiver) was
considered but rejected. stream_ctx is sender-supplied data, while
peer_ctx is authenticated cellnet identity. Routing storage paths
based on stream_ctx would let a malicious client spoof another job's
id and write log content into that other job's storage directory.
The existing _get_trusted_stream_identity name and security test make
the trust boundary explicit, so the right place to fix the bug is the
sender that handed off an empty context, not the receiver.

Tests:

- New: test_completed_upload_seeds_run_num_and_identity_on_fresh_context
  verifies the fresh context handed to the upload thread now carries
  the real job_id and client_name (regression for the StorageException).
- Existing: test_system_log_streamer_uploads_completed_error_log_snapshot
  and the JobLogReceiver security test continue to pass unchanged.

Verified locally:
- pytest tests/unit_test/app_common/logging -v
  -> 13 passed (Python 3.12.10, pytest 8.3.4)
- black, isort, flake8 clean on changed files.
@greptile-apps
Contributor

greptile-apps Bot commented May 8, 2026

Greptile Summary

Fixes a StorageException emitted after every successful job by correctly seeding RUN_NUM and IDENTITY_NAME on the fresh FLContext handed to the background upload thread in SystemLogStreamer._on_job_completed.

  • Seeds ReservedKey.RUN_NUM (read by get_job_id()) and ReservedKey.IDENTITY_NAME on the fresh context only when engine.new_context() returns a distinct object, so the receiver's peer_ctx-based identity routing receives a non-empty job id and routes the upload to the correct job directory.
  • Adds a regression test that verifies the fresh context starts empty and is correctly populated after the event handler fires, while leaving the existing peer-context trust-boundary security test unchanged.

Confidence Score: 5/5

Safe to merge — the change is a minimal two-line seed of well-known reserved keys on a fresh context, guarded so it only activates when the engine provides a separate context object, and the security test for receiver-side trust-boundary routing passes unchanged.

The fix is targeted at a single root cause, clearly explained in code comments and the PR description. The new regression test faithfully mirrors the production empty-context precondition, and the existing security test continues to pass unmodified, confirming the trust boundary is preserved.

No files require special attention.

Important Files Changed

Filename Overview
nvflare/app_common/logging/system_log_streamer.py Adds ReservedKey import and seeds RUN_NUM/IDENTITY_NAME on the fresh upload-thread context; fix is minimal, well-commented, and only activates when engine.new_context() produces a new object.
tests/unit_test/app_common/logging/system_log_streamer_test.py Adds regression test verifying the fresh context is seeded before the upload thread runs; correctly mirrors the production empty-context precondition and asserts both RUN_NUM and IDENTITY_NAME are populated.

Sequence Diagram

sequenceDiagram
    participant EP as CLIENT_PARENT Engine
    participant SLS as SystemLogStreamer
    participant UL as Upload Thread
    participant SRV as JobLogReceiver (Server)

    EP->>SLS: JOB_COMPLETED event (fl_ctx)
    SLS->>EP: engine.new_context()
    EP-->>SLS: stream_fl_ctx (empty — no RUN_NUM/IDENTITY_NAME)
    Note over SLS: FIX: seed stream_fl_ctx with RUN_NUM=job_id, IDENTITY_NAME=client_name
    SLS->>UL: "Thread(target=_stream_completed_log, args=(stream_fl_ctx, ...))"
    UL->>SRV: "LogStreamer.stream_log(fl_ctx=stream_fl_ctx, stream_ctx={job_id, client_name})"
    Note over SRV: _get_trusted_stream_identity reads peer_ctx (cellnet-authenticated) → correct job path
    SRV-->>UL: stream_done_cb (success)


engine = fl_ctx.get_engine()
stream_fl_ctx = engine.new_context() if engine else fl_ctx
if stream_fl_ctx is not fl_ctx:
    stream_fl_ctx.put(key=ReservedKey.RUN_NUM, value=job_id, private=True, sticky=False)
Collaborator

AuxRunner forwards peer identity with fl_ctx.get_all_public_props(), and the receiver rebuilds peer_ctx from those public props. These new RUN_NUM and IDENTITY_NAME values are written with private=True, so they remain local to the upload thread context and are not included in the peer props sent to JobLogReceiver. The regression test only checks the local context, so it passes while the actual stream transport still sees missing or empty job identity. These should be public, matching FLContextManager.new_context() for job contexts, and the test should assert stream_fl_ctx.get_all_public_props() or exercise the aux/receiver path.
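The reviewer's point can be illustrated with a toy context (a stand-in for NVFlare's FLContext; the string keys are placeholders for the ReservedKey values): props written with private=True never appear in the public props that get forwarded, so the receiver's peer_ctx would still lack a job id.

```python
# Toy public/private prop store mirroring the forwarding behavior at issue:
# only public props are included in what is sent to the receiver.
class ToyContext:
    def __init__(self):
        self._props = {}  # key -> (value, is_private)

    def put(self, key, value, private):
        self._props[key] = (value, private)

    def get_all_public_props(self):
        # Private props are filtered out, just as in the forwarding step.
        return {k: v for k, (v, p) in self._props.items() if not p}

ctx = ToyContext()
# The PR's current fix writes both keys with private=True:
ctx.put("__run_num__", "job-123", private=True)
ctx.put("__identity_name__", "site-1", private=True)

# The forwarded peer props are still empty, so the receiver would again
# see no job id -- the gap the review comment is flagging.
print(ctx.get_all_public_props())  # {}
```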
