feat(service): add node-level OCI layer deduplication and pull concurrency control #39

rishi-jat wants to merge 4 commits into modelpack:main
Conversation
Introduces a thread-safe LayerCache that provides:
- Singleflight deduplication of concurrent fetches per layer digest.
- A node-level weighted semaphore for fetch flow control.
- Restart-safe cache rebuilds via layer_digests.json.

Signed-off-by: Rishi Jat <rishijat098@gmail.com>
Adds the LayerCached method to status.Hook. This allows wait paths and cache hits to advance their per-pod UI progress counters to 100% without emitting duplicate node-level Prometheus metrics or spurious network-completion logs.

Signed-off-by: Rishi Jat <rishijat098@gmail.com>
Replaces the high-level b.Fetch() wrappers with a native pipeline:
- Parses and fetches descriptors natively using oras.
- Hardlinks cached layers instead of re-downloading them.
- Wraps remote.Fetch within the singleflight group and node-level semaphore.
- Gracefully handles TOCTOU link failures, cross-device (EXDEV) link errors, and network flakes via exponential backoff retries.
- Verified data-race-free with a new httptest-based simulation testbed.

Signed-off-by: Rishi Jat <rishijat098@gmail.com>
Initializes the global LayerCache at the service layer and threads it down to the Worker, passing it into the NewLayerAwarePuller factory. Registers lifecycle eviction hooks when volumes are deleted. Signed-off-by: Rishi Jat <rishijat098@gmail.com>
/cc @imeoer
Code Review
This pull request introduces a layer-level deduplication mechanism for model pulls through a new LayerCache and a layer-aware puller. The implementation uses hardlinks to reuse existing layers on disk, singleflight to deduplicate concurrent network requests for the same digest, and a node-level semaphore for flow control. Feedback identifies several critical improvements: ensuring semaphore acquisition respects context cancellation to prevent resource leaks, refining path prefix matching to avoid incorrect cache evictions, supporting digest-based image references, and correcting the logic for removing stale cache entries during hardlink failures.
```go
if h.sem != nil {
	// Use a background context so we don't fail the pull if the parent
	// context is cancelled while waiting — the pull itself handles cancellation.
	_ = h.sem.Acquire(context.Background(), 1)
```
Acquiring the semaphore with context.Background() ignores the pull request's cancellation. If many pulls are cancelled while waiting for the semaphore, these goroutines will remain blocked until they acquire a permit, potentially leading to resource exhaustion. It is recommended to pass the pull context into the layerTrackingHook and use it here.
```go
for d, paths := range lc.layers {
	filtered := paths[:0]
	for _, p := range paths {
		if !strings.HasPrefix(p, prefix) {
```
The use of strings.HasPrefix for path matching can lead to incorrect results if one volume name is a prefix of another (e.g., /models/vol1 and /models/vol11). It is safer to ensure the prefix ends with a path separator or use a more robust path comparison logic.
```diff
- if !strings.HasPrefix(p, prefix) {
+ if !strings.HasPrefix(p, filepath.Clean(prefix)+string(os.PathSeparator)) && p != prefix {
```
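The failure mode the reviewer describes can be demonstrated directly. A small runnable sketch (`underVolume` is an illustrative helper, not a name from the PR), assuming POSIX path separators:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// underVolume reports whether path p lives under the volume rooted at prefix,
// matching whole path components rather than raw bytes.
func underVolume(p, prefix string) bool {
	clean := filepath.Clean(prefix)
	return p == clean || strings.HasPrefix(p, clean+string(os.PathSeparator))
}

func main() {
	// Naive byte-prefix matching wrongly treats /models/vol11 as part of /models/vol1.
	fmt.Println(strings.HasPrefix("/models/vol11/layer", "/models/vol1")) // true (wrong)
	fmt.Println(underVolume("/models/vol11/layer", "/models/vol1"))       // false (correct)
	fmt.Println(underVolume("/models/vol1/layer", "/models/vol1"))        // true
}
```

Appending the separator before matching is what prevents `/models/vol1` from capturing `/models/vol11`'s entries during eviction.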
```go
	return errors.Wrap(err, "create remote client")
}

_, manifestReader, err := client.Manifests().FetchReference(ctx, ref.Tag())
```
The ref.Tag() call might return an empty string if the model reference is digest-based (e.g., example.com/model@sha256:...). In such cases, FetchReference will likely fail. It is safer to handle both tags and digests.
```diff
- _, manifestReader, err := client.Manifests().FetchReference(ctx, ref.Tag())
+ target := ref.Tag()
+ if target == "" {
+ 	target = ref.Digest().String()
+ }
+ _, manifestReader, err := client.Manifests().FetchReference(ctx, target)
```
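The tag-or-digest fallback can be illustrated without the PR's reference type. A simplified, string-based sketch (`fetchTarget` and the `latest` default are assumptions for illustration, not the PR's actual parsing):

```go
package main

import (
	"fmt"
	"strings"
)

// fetchTarget picks the reference to resolve: the tag when present,
// otherwise the digest of a digest-pinned reference. This is a simplified
// stand-in for the PR's ref type, which wraps an OCI reference.
func fetchTarget(ref string) string {
	if i := strings.LastIndex(ref, "@"); i >= 0 {
		return ref[i+1:] // digest-based: example.com/model@sha256:...
	}
	if i := strings.LastIndex(ref, ":"); i >= 0 && !strings.Contains(ref[i+1:], "/") {
		return ref[i+1:] // tag-based: example.com/model:v1
	}
	return "latest" // conventional default when neither is present (assumption)
}

func main() {
	fmt.Println(fetchTarget("example.com/model:v1"))             // v1
	fmt.Println(fetchTarget("example.com/model@sha256:deadbeef")) // sha256:deadbeef
}
```

Passing the empty string returned by `ref.Tag()` straight into `FetchReference` is what the suggested diff above this comment avoids.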
```go
if err := os.Link(sourcePath, destPath); err != nil {
	// Hardlink failed (EXDEV or source was deleted). Remove stale entry and retry fetch.
	logger.WithContext(ctx).WithError(err).Warnf("layer-aware pull: hardlink after singleflight failed for %s, retrying", desc.Digest)
	p.layerCache.RemoveByPrefix(sourcePath)
```
Using RemoveByPrefix with a full file path (sourcePath) is conceptually incorrect and potentially dangerous due to the prefix matching logic. It should use Remove(sourcePath) instead. Additionally, if the error was EXDEV (cross-device link), the source file in the cache is still valid and should not be removed; removal should only happen if the source is actually missing (e.g., ENOENT).
```diff
- p.layerCache.RemoveByPrefix(sourcePath)
+ if os.IsNotExist(err) {
+ 	p.layerCache.Remove(sourcePath)
+ }
```
Fixes #24
Fixes #35
Summary
Implement node-level OCI layer deduplication and concurrency control for concurrent model pulls.
This replaces the previous volume-scoped inflight behavior with layer-aware orchestration.
Concurrent pods requesting identical model content on the same node now perform exactly one network fetch per layer digest while safely reusing cached content locally.
Changes
LayerCache

Added a node-scoped LayerCache with:
- singleflight deduplication of concurrent fetches per layer digest,
- a node-level weighted semaphore for flow control,
- restart-safe rebuilds from layer_digests.json.
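The dedup core of such a cache can be sketched with the standard library alone (the PR presumably builds on a singleflight group; this hand-rolled equivalent omits the semaphore and persistence, and all names here are illustrative):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// call tracks one in-flight fetch so concurrent callers can wait on it.
type call struct {
	done chan struct{}
	path string
	err  error
}

// LayerCache deduplicates fetches per layer digest, in the spirit of the
// PR's node-level cache (simplified sketch, not the actual implementation).
type LayerCache struct {
	mu       sync.Mutex
	inflight map[string]*call
	layers   map[string]string // digest -> on-disk path
}

func NewLayerCache() *LayerCache {
	return &LayerCache{inflight: map[string]*call{}, layers: map[string]string{}}
}

// Fetch returns the cached path for digest, running fetch at most once
// per digest across concurrent callers.
func (lc *LayerCache) Fetch(digest string, fetch func() (string, error)) (string, error) {
	lc.mu.Lock()
	if p, ok := lc.layers[digest]; ok { // cache hit: no network at all
		lc.mu.Unlock()
		return p, nil
	}
	if c, ok := lc.inflight[digest]; ok { // someone else is fetching: wait
		lc.mu.Unlock()
		<-c.done
		return c.path, c.err
	}
	c := &call{done: make(chan struct{})}
	lc.inflight[digest] = c
	lc.mu.Unlock()

	c.path, c.err = fetch() // the single real network fetch

	lc.mu.Lock()
	if c.err == nil {
		lc.layers[digest] = c.path
	}
	delete(lc.inflight, digest)
	lc.mu.Unlock()
	close(c.done)
	return c.path, c.err
}

func main() {
	lc := NewLayerCache()
	var fetches int64
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			lc.Fetch("sha256:abc", func() (string, error) {
				atomic.AddInt64(&fetches, 1)
				return "/cache/sha256:abc", nil
			})
		}()
	}
	wg.Wait()
	fmt.Println(fetches) // exactly one real fetch across 8 concurrent callers
}
```

This is the property the summary claims: identical concurrent requests collapse to one network fetch per digest, with waiters reusing the result.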
Layer-aware pull orchestration

Implemented manual layer orchestration in the pull path:
- parses and fetches descriptors natively via oras,
- hardlinks cached layers instead of re-downloading them,
- wraps remote.Fetch in the singleflight group and node-level semaphore,
- handles TOCTOU link failures, cross-device (EXDEV) links, and network flakes with exponential backoff retries.
Progress and observability accounting

Added:

```go
Hook.LayerCached(...)
```
Cached and waiting callers now:
- advance their per-pod UI progress counters to 100%,
- without emitting duplicate node-level Prometheus metrics or spurious network-completion logs.
Only the goroutine executing the real network fetch emits pull telemetry.
Worker integration

The global LayerCache is initialized at the service layer and threaded down to the Worker via the NewLayerAwarePuller factory; eviction hooks are registered when volumes are deleted.
Validation

Verified with:

```bash
go test ./...
go test -race ./pkg/service/...
golangci-lint run
```
Additional coverage includes an httptest-based simulation testbed exercising concurrent pulls under the race detector.