Skip to content

Commit 09af4e4

Browse files
committed
Merge branch 'main' of https://github.com/junwha/DockerDiff
2 parents e18dfff + 07be7cb commit 09af4e4

3 files changed

Lines changed: 358 additions & 5 deletions

File tree

README.md

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,13 @@ echo "export PATH=$(pwd)/DockerDiff:\$PATH" >> ~/.bashrc && source ~/.bashrc
7373
# Docker base images
7474
For easier sharing of base images, DockerDiff provides several pre-configured base images:
7575

76-
- [ddiff-base](https://hub.docker.com/r/junwha/ddiff-base): A CUDA base image with essential tools (e.g., Git, Vim, OpenSSH).
77-
- [ddiff-base-py](https://hub.docker.com/r/junwha/ddiff-base-py): A Conda-based image with a specific Python version, built on `ddiff-base`.
78-
- [ddiff-base-torch](https://hub.docker.com/r/junwha/ddiff-base-torch): A PyTorch image with a specific Python version, built on `ddiff-base-py`.
76+
```
77+
docker pull junwha/ddiff-base:py3.10-torch2.4.1
78+
```
79+
80+
~~- [dslice-base](https://hub.docker.com/r/junwha/dslice-base): A CUDA base image with essential tools (e.g., Git, Vim, OpenSSH).~~
81+
~~- [dslice-base-py](https://hub.docker.com/r/junwha/dslice-base-py): A Conda-based image with a specific Python version, built on `dslice-base`.~~
82+
~~- [dslice-base-torch](https://hub.docker.com/r/junwha/dslice-base-torch): A PyTorch image with a specific Python version, built on `dslice-base-py`.~~
7983

8084
# Requirements
81-
- Python 3.X
85+
- Python 3.X

ddiff.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ def diff_image(base_tag, target_tag):
213213
def load_image(image_tarball):
214214
input_dir = ".ddiff-image"
215215
shutil.rmtree(input_dir, ignore_errors=True)
216-
with tarfile.open(image_tarball, "r:gz") as tar:
216+
with tarfile.open(image_tarball) as tar:
217217
tar.extractall()
218218

219219
with open(os.path.join(input_dir, "BASE")) as f:

hub2registry.py

Lines changed: 349 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,349 @@
1+
#!/usr/bin/env python3
2+
# registry_seed_parallel.py
3+
# Build a registry:2.8.3-compatible on-disk store directly from a list of Docker Hub images.
4+
# Parallel, image-by-image processing with best-effort deduplication.
5+
#
6+
# Input: a text file with one "repo[:tag]" per line (e.g., swebench/foo:latest)
7+
# Output: directory you can mount as /var/lib/registry for registry:2.8.3
8+
#
9+
# What it does (per image, in parallel):
10+
# 1) Resolve manifest (if index, pick linux/amd64)
11+
# 2) Compute manifest blob digest = sha256(manifest_bytes)
12+
# 3) For each required blob (manifest-as-blob, config, layers):
13+
# - If target path already exists, skip
14+
# - Otherwise download (streamed, sha256-verified)
15+
# 4) Populate registry filesystem links for the repo:tag
16+
#
17+
# Notes:
18+
# - Optional Docker Hub token is used to mitigate rate limits (public pulls).
19+
# - Parallelism is per image, not per blob (simpler dedup; avoids global gather).
20+
# - Uses best-effort in-memory dedup across threads; file existence checks remain the source of truth.
21+
# - Only linux/amd64 is selected from manifest lists.
22+
# - This script does NOT run the registry; it only prepares the data dir.
23+
24+
import os
25+
import json
26+
import argparse
27+
import hashlib
28+
import time
29+
import threading
30+
from typing import Tuple, Dict, List, Optional
31+
import requests
32+
from concurrent.futures import ThreadPoolExecutor, as_completed
33+
34+
# Upstream registry endpoint that backs Docker Hub pulls.
SRC_REG = "https://registry-1.docker.io"
# Accept header advertising every manifest/index media type we can handle
# (Docker manifest list, OCI index, Docker v2 manifest, OCI manifest).
ACCEPT = ",".join([
    "application/vnd.docker.distribution.manifest.list.v2+json",
    "application/vnd.oci.image.index.v1+json",
    "application/vnd.docker.distribution.manifest.v2+json",
    "application/vnd.oci.image.manifest.v1+json",
])
41+
42+
# --------- globals for best-effort cross-thread dedup ---------
# Digests (e.g., "sha256:...") known or claimed to be downloaded this run.
# Best-effort only: file existence on disk remains the source of truth.
DOWNLOADED = set()  # set of digests (e.g., "sha256:...") known to be downloaded this run
# Guards all reads/writes of DOWNLOADED across worker threads.
DOWNLOADED_LOCK = threading.Lock()

# Thread-local storage for per-thread HTTP session state.
TLS = threading.local()
48+
49+
# ---------- FS helpers ----------
50+
def ensure(path: str):
    """Create directory *path* (with any missing parents) if absent."""
    os.makedirs(path, exist_ok=True)
52+
53+
def blob_file_path(root: str, digest: str) -> str:
    """Map a content digest to its registry-v2 blob data file under *root*."""
    algo, _, hexdigest = digest.partition(":")
    return os.path.join(
        root, "docker", "registry", "v2", "blobs",
        algo, hexdigest[:2], hexdigest, "data",
    )
56+
57+
def write_link(path: str, digest: str):
    """Record *digest* as the content of the registry link file at *path*.

    Missing parent directories are created on demand.
    """
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w", encoding="utf-8") as fh:
        fh.write(digest)
61+
62+
def sha256_hex_of_file(fp) -> str:
    """Return the hex SHA-256 of the already-open binary file object *fp*.

    Reads in 1 MiB chunks so arbitrarily large files stay memory-bounded.
    """
    digest = hashlib.sha256()
    chunk = fp.read(1024 * 1024)
    while chunk:
        digest.update(chunk)
        chunk = fp.read(1024 * 1024)
    return digest.hexdigest()
67+
68+
def sha256_hex_of_bytes(b: bytes) -> str:
    """Return the hex SHA-256 digest of the byte string *b*."""
    return hashlib.sha256(b).hexdigest()
72+
73+
# ---------- Docker Hub session/auth ----------
74+
def dockerhub_token(repository: str, scope: str = "pull") -> str:
    """Obtain an anonymous Docker Hub bearer token for *repository*.

    Raises requests.HTTPError if the auth endpoint rejects the request.
    """
    params = {
        "service": "registry.docker.io",
        "scope": f"repository:{repository}:{scope}",
    }
    resp = requests.get("https://auth.docker.io/token", params=params, timeout=30)
    resp.raise_for_status()
    return resp.json()["token"]
82+
83+
def get_session(repository: str) -> requests.Session:
    """Return a thread-local requests.Session authorized for *repository*.

    Docker Hub bearer tokens are scoped to a single repository, so a session
    carrying an Authorization header must not be reused across repositories.
    The previous implementation cached exactly one session per thread and
    reused it for every repository the thread processed, which sent a token
    scoped to the first repository when pulling any later one. Sessions are
    now cached per (thread, repository) pair; connection reuse is preserved
    within each pair.
    """
    cache = getattr(TLS, "sessions", None)
    if cache is None:
        cache = {}
        TLS.sessions = cache
    s = cache.get(repository)
    if s is None:
        s = requests.Session()
        # Best-effort token; if auth fails, continue without Authorization
        # (public pulls may still succeed, just rate-limited harder).
        try:
            tok = dockerhub_token(repository, "pull")
            s.headers["Authorization"] = f"Bearer {tok}"
        except Exception:
            pass
        cache[repository] = s
    return s
95+
96+
# ---------- Image ref parsing ----------
97+
def parse_ref(line: str) -> Optional[Tuple[str, str]]:
    """Parse one "repo[:tag]" input line into (repository, tag).

    The tag defaults to "latest"; bare single-segment names (e.g. "ubuntu")
    are mapped to the "library/" namespace. Blank lines yield None.
    """
    ref = line.strip()
    if not ref:
        return None
    # A colon only counts as a tag separator in the last path segment,
    # so registry hosts with ports (host:5000/repo) are not split.
    if ":" in ref.split("/")[-1]:
        repo, _, tag = ref.rpartition(":")
    else:
        repo, tag = ref, "latest"
    if "/" not in repo:
        repo = f"library/{repo}"
    return repo, tag
110+
111+
# ---------- HTTP helpers with simple retry ----------
112+
def _with_retry(fn, *, retries=5, base_delay=1.0, max_delay=8.0):
113+
last = None
114+
for i in range(retries):
115+
try:
116+
return fn()
117+
except requests.HTTPError as e:
118+
status = e.response.status_code if e.response is not None else None
119+
# Retry on 429 and 5xx
120+
if status in (429, 500, 502, 503, 504):
121+
delay = min(max_delay, base_delay * (2 ** i))
122+
time.sleep(delay)
123+
last = e
124+
continue
125+
raise
126+
except (requests.ConnectionError, requests.Timeout) as e:
127+
delay = min(max_delay, base_delay * (2 ** i))
128+
time.sleep(delay)
129+
last = e
130+
continue
131+
if last:
132+
raise last
133+
134+
# ---------- Manifest & blob fetch ----------
135+
def fetch_manifest(repo: str, ref: str) -> Tuple[bytes, dict, str]:
    """Fetch the image manifest for repo:ref from Docker Hub.

    Returns (manifest_bytes, manifest_json, content_type). When *ref*
    resolves to a manifest list / OCI index, the linux/amd64 entry is
    followed and its manifest returned instead.

    Raises RuntimeError if an index contains no linux/amd64 entry.
    """
    session = get_session(repo)

    def _get(url):
        resp = session.get(url, headers={"Accept": ACCEPT}, timeout=60)
        resp.raise_for_status()
        return resp

    resp = _with_retry(lambda: _get(f"{SRC_REG}/v2/{repo}/manifests/{ref}"))
    content_type = resp.headers.get("Content-Type", "")
    body = resp.content

    if "manifest.list.v2+json" in content_type or "image.index.v1+json" in content_type:
        # Multi-platform index: locate the linux/amd64 child manifest.
        chosen = None
        for entry in resp.json().get("manifests", []):
            platform = entry.get("platform", {})
            if platform.get("os") == "linux" and platform.get("architecture") == "amd64":
                chosen = entry["digest"]
                break
        if not chosen:
            raise RuntimeError(f"linux/amd64 not found in index for {repo}:{ref}")

        resp = _with_retry(lambda: _get(f"{SRC_REG}/v2/{repo}/manifests/{chosen}"))
        content_type = resp.headers.get("Content-Type", "")
        body = resp.content

    return body, json.loads(body.decode("utf-8")), content_type
168+
169+
def stream_download_blob(repo: str, digest: str, dest_path: str):
    """Stream one blob from Docker Hub into *dest_path*, verifying sha256.

    Skips work if the blob already exists on disk or another thread of this
    run has claimed the digest. Writes to a ".part" temp file and renames it
    into place only after the hash matches.

    Fix over the original: on ANY failure (network error after retries,
    write error, hash mismatch) the in-memory DOWNLOADED claim is released
    and the partial file removed. Previously only the hash-mismatch path
    cleaned up, so a failed download left the digest marked as done and a
    later image sharing the blob silently skipped it — leaving the blob
    missing from the store.

    Raises RuntimeError on digest mismatch; network errors propagate after
    the retry budget is exhausted.
    """
    # Fast path: blob already present on disk.
    if os.path.exists(dest_path):
        with DOWNLOADED_LOCK:
            DOWNLOADED.add(digest)
        return

    # Best-effort in-memory dedup: claim the digest before downloading so
    # concurrent workers do not fetch the same blob twice.
    with DOWNLOADED_LOCK:
        if digest in DOWNLOADED:
            return
        DOWNLOADED.add(digest)

    s = get_session(repo)
    url = f"{SRC_REG}/v2/{repo}/blobs/{digest}"

    def _get_stream():
        r = s.get(url, stream=True, timeout=600)
        r.raise_for_status()
        return r

    tmp = dest_path + ".part"
    try:
        with _with_retry(_get_stream) as r:
            hexd = digest.split(":", 1)[1]
            h = hashlib.sha256()
            ensure(os.path.dirname(dest_path))
            with open(tmp, "wb") as f:
                for chunk in r.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        h.update(chunk)
                        f.write(chunk)
        got = h.hexdigest()
        if got != hexd:
            raise RuntimeError(f"SHA256 mismatch for {digest}: got {got}")
        os.replace(tmp, dest_path)
    except BaseException:
        # Release the claim and remove the partial file so another image
        # that needs this blob can retry the download.
        with DOWNLOADED_LOCK:
            DOWNLOADED.discard(digest)
        if os.path.exists(tmp):
            os.remove(tmp)
        raise
214+
215+
# ---------- Registry mapping (repositories/* links) ----------
216+
def map_repository(root: str, repo: str, tag: str, manifest_digest: str, config: str, layers: List[str]):
    """Write the repositories/* link files so registry:2 can serve repo:tag."""
    regv2 = os.path.join(root, "docker", "registry", "v2")
    repo_root = os.path.join(regv2, "repositories", *repo.split("/"))
    manifests_root = os.path.join(repo_root, "_manifests")

    # _layers/<algo>/<hex>/link for the config blob and every layer blob.
    for digest in [config, *layers]:
        hexd = digest.split(":", 1)[1]
        write_link(os.path.join(repo_root, "_layers", "sha256", hexd, "link"), digest)

    mdhex = manifest_digest.split(":", 1)[1]

    # Manifest revision link.
    write_link(os.path.join(manifests_root, "revisions", "sha256", mdhex, "link"), manifest_digest)

    # Tag index entry plus the tag's "current" pointer.
    write_link(os.path.join(manifests_root, "tags", tag, "index", "sha256", mdhex, "link"), manifest_digest)
    write_link(os.path.join(manifests_root, "tags", tag, "current", "link"), manifest_digest)
238+
239+
# ---------- Per-image worker ----------
240+
def process_image(out: str, repo: str, tag: str) -> Dict[str, str]:
    """Seed the on-disk store with one image (repo:tag).

    Fetches the (platform-resolved) manifest, stores it as a blob, downloads
    the config and layer blobs that are not already present, then writes the
    repository link files so the registry can discover the tag. Returns a
    metadata dict used only for the debug index.
    """
    # Resolve the manifest (an index is narrowed to linux/amd64 upstream).
    manifest_bytes, manifest, media_type = fetch_manifest(repo, tag)
    manifest_hex = sha256_hex_of_bytes(manifest_bytes)
    manifest_digest = f"sha256:{manifest_hex}"

    # Blobs this image requires besides the manifest itself.
    config_digest = manifest["config"]["digest"]
    layer_digests = [layer["digest"] for layer in manifest.get("layers", [])]

    # The manifest must also exist as a blob; write its bytes if missing.
    manifest_dest = blob_file_path(out, manifest_digest)
    if not os.path.exists(manifest_dest):
        ensure(os.path.dirname(manifest_dest))
        tmp = manifest_dest + ".part"
        with open(tmp, "wb") as fh:
            fh.write(manifest_bytes)
        # Belt-and-braces: re-hash what actually landed on disk.
        with open(tmp, "rb") as fh:
            written_hex = sha256_hex_of_file(fh)
        if written_hex != manifest_hex:
            os.remove(tmp)
            raise RuntimeError(f"Manifest blob sha mismatch for {manifest_digest}")
        os.replace(tmp, manifest_dest)
    with DOWNLOADED_LOCK:
        DOWNLOADED.add(manifest_digest)

    # Fetch config + layers, skipping blobs already on disk.
    for digest in [config_digest, *layer_digests]:
        dest = blob_file_path(out, digest)
        if os.path.exists(dest):
            with DOWNLOADED_LOCK:
                DOWNLOADED.add(digest)
            continue
        # Any repo holding this blob would serve; use the current one for simplicity.
        stream_download_blob(repo, digest, dest)

    # Expose the tag through the repositories/* link tree.
    map_repository(out, repo, tag, manifest_digest, config_digest, layer_digests)

    # Metadata for the run-level debug index (not read by the registry).
    return {
        "repo": repo,
        "tag": tag,
        "manifest_digest": manifest_digest,
        "config": config_digest,
        "layers": layer_digests,
        "manifest_media_type": media_type,
    }
297+
298+
# ---------- Main ----------
299+
def main():
    """CLI entry point: parse the image list and seed the output dir in parallel."""
    parser = argparse.ArgumentParser(
        description="Seed a registry:2.8.3 data dir from Docker Hub images (parallel, image-by-image)."
    )
    parser.add_argument("--images", required=True, help="Text file: one <repo[:tag]> per line")
    parser.add_argument("--out", required=True, help="Output dir (mount this as /var/lib/registry)")
    parser.add_argument(
        "--workers",
        type=int,
        default=min(8, (os.cpu_count() or 4) * 2),
        help="Number of parallel image workers (default: 2x CPUs, capped at 8)",
    )
    args = parser.parse_args()

    out = os.path.abspath(args.out)
    regv2 = os.path.join(out, "docker", "registry", "v2")
    for sub in ("blobs", "repositories"):
        ensure(os.path.join(regv2, sub))

    # One (repo, tag) pair per non-empty input line.
    with open(args.images, "r", encoding="utf-8") as fh:
        lines = [ln.strip() for ln in fh if ln.strip()]
    refs = [ref for ref in map(parse_ref, lines) if ref is not None]

    results = []
    errors = []

    # Fan out image-by-image; blob-level dedup happens inside the workers.
    with ThreadPoolExecutor(max_workers=args.workers) as pool:
        futures = {pool.submit(process_image, out, repo, tag): (repo, tag) for (repo, tag) in refs}
        for fut in as_completed(futures):
            repo, tag = futures[fut]
            try:
                results.append(fut.result())
                print(f"[ok] {repo}:{tag}")
            except Exception as exc:
                errors.append((repo, tag, str(exc)))
                print(f"[err] {repo}:{tag} -> {exc}")

    # Combined index for reference/debugging only; the registry never reads it.
    meta_dir = os.path.join(out, "meta")
    ensure(meta_dir)
    with open(os.path.join(meta_dir, "index.json"), "w", encoding="utf-8") as fh:
        json.dump(results, fh, indent=2)

    print(f"[done] images={len(refs)}, ok={len(results)}, err={len(errors)}")
    if errors:
        print("[errors]")
        for repo, tag, msg in errors:
            print(f" - {repo}:{tag} -> {msg}")
    print("[next] start registry with:")
    print(f"docker run -d --name registry -p 5000:5000 -v {out}:/var/lib/registry registry:2.8.3")
347+
348+
# Run the seeder only when executed as a script, not on import.
if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)