Skip to content

feat(service): add node-level OCI layer deduplication and pull concurrency control#39

Open
rishi-jat wants to merge 4 commits intomodelpack:mainfrom
rishi-jat:feat/layer-dedup
Open

feat(service): add node-level OCI layer deduplication and pull concurrency control#39
rishi-jat wants to merge 4 commits intomodelpack:mainfrom
rishi-jat:feat/layer-dedup

Conversation

@rishi-jat
Copy link
Copy Markdown

Fixes #24
Fixes #35

Summary

Implement node-level OCI layer deduplication and concurrency control for concurrent model pulls.

This replaces the previous volume-scoped inflight behavior with layer-aware orchestration using:

  • digest-level singleflight
  • node-level weighted semaphore flow control
  • persistent layer metadata rebuild support
  • hardlink-based layer reuse across volumes

Concurrent pods requesting identical model content on the same node now perform exactly one network fetch per layer digest while safely reusing cached content locally.


Changes

LayerCache

Added a node-scoped LayerCache with:

  • digest → file path tracking
  • concurrent-safe lookup/register/remove operations
  • startup rebuild from persisted layer_digests.json
  • stale cache eviction during volume cleanup

Layer-aware pull orchestration

Implemented manual layer orchestration in the pull path:

  • manifest layer inspection
  • cache lookup before fetch
  • digest-level singleflight coordination
  • semaphore-based node-wide concurrency limiting
  • hardlink reuse for waiting callers
  • fallback handling for hardlink failures (EXDEV)

Progress and observability accounting

Added:

go Hook.LayerCached(...)

Cached and waiting callers now:

  • correctly advance CSI progress state
  • avoid duplicate pulled layer logs
  • avoid duplicate metrics.NodePullOpObserve increments

Only the goroutine executing the real network fetch emits pull telemetry.

Worker integration

  • wire LayerCache into worker/service lifecycle
  • rebuild cache during service startup
  • evict layer entries during model deletion

Validation

Verified with:

bash go test ./... go test -race ./pkg/service/... golangci-lint run

Additional coverage includes:

  • concurrent layer dedup correctness
  • semaphore release during cancellation
  • cache rebuild persistence
  • cache eviction lifecycle
  • race detection under concurrent pull workloads

rishi-jat added 4 commits May 10, 2026 05:11
Introduces a thread-safe LayerCache that provides:
- Singleflight concurrency deduplication per layer digest.
- Node-level weighted semaphore for fetch flow control.
- Restart-safe rebuild mechanisms via layer_digests.json.

Signed-off-by: Rishi Jat <rishijat098@gmail.com>
Adds the LayerCached method to status.Hook. This allows wait-paths and
cache hits to increment their individual pod UI progress counters correctly
to 100% without emitting duplicated Prometheus node metrics or generating
superfluous network completion logs.

Signed-off-by: Rishi Jat <rishijat098@gmail.com>
Replaces high-level b.Fetch() wrappers with a robust native pipeline:
- Natively parses and fetches descriptors using oras.
- Hardlinks cached layers efficiently.
- Wraps remote.Fetch within singleflight and node-level semaphores.
- Gracefully handles TOCTOU link failures, cross-device EXDEV bounds,
  and network flakes via exponential backoff retries.
- Proved 100% data race free via new httptest simulation testbed.

Signed-off-by: Rishi Jat <rishijat098@gmail.com>
Initializes the global LayerCache at the service layer and threads it
down to the Worker, passing it into the NewLayerAwarePuller factory.
Registers lifecycle eviction hooks when volumes are deleted.

Signed-off-by: Rishi Jat <rishijat098@gmail.com>
Copilot AI review requested due to automatic review settings May 9, 2026 23:55
@rishi-jat
Copy link
Copy Markdown
Author

/cc @imeoer
/cc @xujihui1985
/cc @aftersnow
/cc @chlins

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a layer-level deduplication mechanism for model pulls through a new LayerCache and a layer-aware puller. The implementation uses hardlinks to reuse existing layers on disk, singleflight to deduplicate concurrent network requests for the same digest, and a node-level semaphore for flow control. Feedback identifies several critical improvements: ensuring semaphore acquisition respects context cancellation to prevent resource leaks, refining path prefix matching to avoid incorrect cache evictions, supporting digest-based image references, and correcting the logic for removing stale cache entries during hardlink failures.

Comment thread pkg/service/puller.go
if h.sem != nil {
// Use a background context so we don't fail the pull if the parent
// context is cancelled while waiting — the pull itself handles cancellation.
_ = h.sem.Acquire(context.Background(), 1)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Acquiring the semaphore with context.Background() ignores the pull request's cancellation. If many pulls are cancelled while waiting for the semaphore, these goroutines will remain blocked until they acquire a permit, potentially leading to resource exhaustion. It is recommended to pass the pull context into the layerTrackingHook and use it here.

for d, paths := range lc.layers {
filtered := paths[:0]
for _, p := range paths {
if !strings.HasPrefix(p, prefix) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The use of strings.HasPrefix for path matching can lead to incorrect results if one volume name is a prefix of another (e.g., /models/vol1 and /models/vol11). It is safer to ensure the prefix ends with a path separator or use a more robust path comparison logic.

Suggested change
if !strings.HasPrefix(p, prefix) {
if !strings.HasPrefix(p, filepath.Clean(prefix)+string(os.PathSeparator)) && p != prefix {

Comment thread pkg/service/puller.go
return errors.Wrap(err, "create remote client")
}

_, manifestReader, err := client.Manifests().FetchReference(ctx, ref.Tag())
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The ref.Tag() call might return an empty string if the model reference is digest-based (e.g., example.com/model@sha256:...). In such cases, FetchReference will likely fail. It is safer to handle both tags and digests.

Suggested change
_, manifestReader, err := client.Manifests().FetchReference(ctx, ref.Tag())
target := ref.Tag()
if target == "" {
target = ref.Digest().String()
}
_, manifestReader, err := client.Manifests().FetchReference(ctx, target)

Comment thread pkg/service/puller.go
if err := os.Link(sourcePath, destPath); err != nil {
// Hardlink failed (EXDEV or source was deleted). Remove stale entry and retry fetch.
logger.WithContext(ctx).WithError(err).Warnf("layer-aware pull: hardlink after singleflight failed for %s, retrying", desc.Digest)
p.layerCache.RemoveByPrefix(sourcePath)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using RemoveByPrefix with a full file path (sourcePath) is conceptually incorrect and potentially dangerous due to the prefix matching logic. It should use Remove(sourcePath) instead. Additionally, if the error was EXDEV (cross-device link), the source file in the cache is still valid and should not be removed; removal should only happen if the source is actually missing (e.g., ENOENT).

Suggested change
p.layerCache.RemoveByPrefix(sourcePath)
if os.IsNotExist(err) {
p.layerCache.Remove(sourcePath)
}

@rishi-jat rishi-jat changed the title Feat/layer dedup feat(service): add node-level OCI layer deduplication and pull concurrency control May 9, 2026
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Deduplicate concurrent pulls for identical model references across pods with node-level singleflight possible race condition

2 participants