diff --git a/Makefile b/Makefile index d3331b4..c87e75c 100644 --- a/Makefile +++ b/Makefile @@ -193,6 +193,7 @@ define run_agent_container --workspace-tongs "$$workspace_dir/.swarmforge/tongs" \ --workspace "$$workspace_dir" \ --approvals "$(SWARMFORGE_USER_ASSETS_DIR)/approvals.json" \ + --anvil-image "$(4)" \ -- \ docker run -it --rm --name "$(1)" \ --network "$(NETWORK)" \ diff --git a/scripts/run_anvil.py b/scripts/run_anvil.py index 67baa53..a684679 100644 --- a/scripts/run_anvil.py +++ b/scripts/run_anvil.py @@ -7,13 +7,29 @@ run_anvil.py [--user-tongs DIR] [--org-tongs DIR] [--repo-tongs DIR] [--workspace-tongs DIR] [--workspace PATH] [--approvals PATH] - [--no-prompt] -- docker run -it --rm ... ... + [--anvil-image IMAGE] [--no-prompt] -- docker run -it --rm ... ... Tongs are sibling containers that must be orchestrated from the host (they are started alongside the anvil, not from inside it), which is why this wrapper sits between Make and `docker run`. It discovers tong definitions across the four layers using the pure core in `tongs.py`, then runs the anvil. +Shared tongs +------------ +When a tong is discovered, the launcher starts it before the anvil, waits for it +to report ready, makes it reachable from the anvil, runs the anvil in the +foreground, and leaves the tong running afterwards. A `shared` tong is one +long-lived container keyed by a stable name: a running one whose config-hash +label still matches is reused untouched, and a missing/stopped/stale one is +(re)started. A `port` tong's reachability is injected into the anvil as +environment; a `none` tong is started but has no anvil-facing surface. + +The launcher starts only `shared` tongs reached over the network (`port`) or with +no anvil-facing surface (`none`), carrying no secret references. A `session` +lifecycle, a secret reference, an `mcp` or `volume` interface, or a `shared` tong +that mounts the workspace is refused with a clear message rather than started +half-wired. + First-run approval ------------------ The user, org, and Swarmforge-repo layers are installed deliberately and are @@ -43,6 +59,7 @@ import os import subprocess import sys +import time # Load the pure core (layer discovery + name-based merge) by path, the same way # tongs.py loads translate_agents.py, so the launcher needs no package install @@ -56,7 +73,7 @@ USAGE = ( "usage: run_anvil.py [--user-tongs DIR] [--org-tongs DIR] " "[--repo-tongs DIR] [--workspace-tongs DIR] [--workspace PATH] " - "[--approvals PATH] [--no-prompt] -- " + "[--approvals PATH] [--anvil-image IMAGE] [--no-prompt] -- " ) # Each flag names the host directory for one definition layer. The merge always @@ -70,10 +87,13 @@ } # Parsed launcher options. `workspace` is the workspace root used to key approval -# of workspace-sourced tongs; `approvals` is the store path (default resolved in -# main); `no_prompt` makes the approval gate fail closed for scripted runs. +# of workspace-sourced tongs and to resolve the `workspace` mount word; `approvals` +# is the store path (default resolved in main); `anvil_image` is the image the +# readiness prober runs to dial a tong's network-internal port; `no_prompt` makes +# the approval gate fail closed for scripted runs. LauncherOptions = collections.namedtuple( - "LauncherOptions", ["layer_dirs", "workspace", "approvals", "no_prompt"] + "LauncherOptions", + ["layer_dirs", "workspace", "approvals", "anvil_image", "no_prompt"], ) @@ -92,6 +112,7 @@ def parse_args(argv): paths = {} workspace = None approvals = None + anvil_image = None no_prompt = False index = 0 while index < len(argv): @@ -101,7 +122,10 @@ def parse_args(argv): if not anvil_cmd: raise UsageError("missing anvil command after '--'") layer_dirs = [(layer, paths[layer]) for layer in tongs.LAYERS if layer in paths] - return LauncherOptions(layer_dirs, workspace, approvals, no_prompt), anvil_cmd + return ( + LauncherOptions(layer_dirs, workspace, approvals, anvil_image, no_prompt), + anvil_cmd, + ) if token in LAYER_FLAGS: if index + 1 >= len(argv): raise UsageError("%s requires a directory argument" % token) @@ -120,6 +144,12 @@ def parse_args(argv): approvals = argv[index + 1] index += 2 continue + if token == "--anvil-image": + if index + 1 >= len(argv): + raise UsageError("--anvil-image requires an image argument") + anvil_image = argv[index + 1] + index += 2 + continue if token == "--no-prompt": no_prompt = True index += 1 @@ -298,6 +328,343 @@ def resolve(provider, ref): return resolve +# --- Docker seam -------------------------------------------------------------- +# Every docker invocation goes through DockerCLI so the orchestration logic can +# be unit-tested against a fake. The methods are thin wrappers; the launch +# sequencing and policy live in `run_with_tongs`. `_run` defaults to +# subprocess.run and is the single injection point for tests. + + +class DockerError(Exception): + """A docker command the launch depends on failed; the launch must stop.""" + + +# Labels read back to decide whether a running `shared` container is stale. +_INSPECT_STATE_FORMAT = ( + '{{.State.Running}}|{{index .Config.Labels "%s"}}' % tongs.LABEL_CONFIG_HASH +) +_INSPECT_HEALTH_FORMAT = "{{if .State.Health}}{{.State.Health.Status}}{{end}}" + + +class DockerCLI: + def __init__(self, run=None): + self._run = run or subprocess.run + + def _quiet(self, argv): + """Run a command whose output we don't need; return its exit code.""" + return self._run( + argv, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL + ).returncode + + def _checked(self, argv): + """Run a command the launch depends on; raise DockerError on failure.""" + try: + completed = self._run(argv, stdout=subprocess.DEVNULL) + except OSError as exc: + raise DockerError("could not run %r: %s" % (argv[:3], exc)) + if completed.returncode != 0: + raise DockerError( + "docker command failed (exit %d): %s" + % (completed.returncode, " ".join(argv[:4])) + ) + + def rm_force(self, container): + self._quiet(["docker", "rm", "-f", container]) + + def run_detached(self, argv): + self._checked(argv) + + def inspect_state(self, container): + """`{"running": bool, "label": str|None}` for a container, or None if absent.""" + completed = self._run( + ["docker", "inspect", "--format", _INSPECT_STATE_FORMAT, container], + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + ) + if completed.returncode != 0: + return None + running, _, label = _decode(completed.stdout).strip().partition("|") + return {"running": running == "true", "label": label or None} + + def health_status(self, container): + completed = self._run( + ["docker", "inspect", "--format", _INSPECT_HEALTH_FORMAT, container], + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + ) + if completed.returncode != 0: + return None + return _decode(completed.stdout).strip() or None + + def exec_ok(self, container, command): + return self._quiet(["docker", "exec", container] + list(command)) == 0 + + def tcp_probe(self, network, host, port, image): + """True if `host:port` accepts a TCP connection from within `network`. + + Runs a throwaway container on the network -- the anvil image, which has + python3 -- since a tong's own port is only reachable over the docker + network, not from the host. + """ + script = ( + "import socket,sys\n" + "s=socket.socket()\n" + "s.settimeout(2)\n" + "try:\n" + " s.connect((sys.argv[1], int(sys.argv[2])))\n" + "except OSError:\n" + " sys.exit(1)\n" + ) + argv = ["docker", "run", "--rm", "--network", network, + "--entrypoint", "python3", image, "-c", script, host, str(port)] + return self._quiet(argv) == 0 + + def run_foreground(self, argv): + """Run the anvil in the foreground and return its exit code. + + Popen + wait (rather than exec) so the launcher regains control after the + anvil exits. On Ctrl-C the SIGINT reaches both this process and the anvil + through the controlling terminal's process group; the anvil handles it and + exits, we reap it, and the KeyboardInterrupt propagates to the caller. + """ + try: + proc = subprocess.Popen(argv) + except OSError as exc: + raise DockerError("cannot run anvil %r: %s" % (argv[:2], exc)) + try: + return proc.wait() + except KeyboardInterrupt: + proc.wait() + raise + + +def _decode(output): + if isinstance(output, bytes): + return output.decode("utf-8", "replace") + return output or "" + + +# --- Readiness ---------------------------------------------------------------- + + +def wait_ready(docker, container, defn, alias, network, *, anvil_image, + sleep=time.sleep, monotonic=time.monotonic, interval=0.5): + """Block until a tong reports ready, returning True/False on timeout. + + Dispatches on the tong's resolved readiness mode (see + `tongs.readiness_settings`): `tcp` dials the canonical alias on the network; + `healthcheck` runs the declared exec command or polls the image HEALTHCHECK; + `none` is treated as ready immediately. A `tcp` probe dials the tong's + network-internal port from a throwaway container, which needs both a network + to dial on and the anvil image to run from; without either it degrades to "is + the container running" -- decided and warned once, not on every poll. + """ + mode, command, timeout_s = tongs.readiness_settings(defn) + if mode == "none": + return True + + interface = defn.get("interface") or {} + port = interface.get("port") + + tcp_degraded = mode == "tcp" and (not anvil_image or not network) + if tcp_degraded: + tongs.warn( + "cannot run a TCP readiness probe of '%s' (no anvil image or " + "network); falling back to a container-running check" % container + ) + + def probe(): + if mode == "tcp": + if tcp_degraded: + state = docker.inspect_state(container) + return bool(state and state["running"]) + return docker.tcp_probe(network, alias, port, anvil_image) + # healthcheck + if command: + return docker.exec_ok(container, command) + return docker.health_status(container) == "healthy" + + start = monotonic() + while True: + if probe(): + return True + if monotonic() - start >= timeout_s: + return False + sleep(interval) + + +# --- Orchestration ------------------------------------------------------------ + + +class OrchestrationError(Exception): + """A tong could not be started/made ready; the launch stops.""" + + +def _mounts_workspace(defn): + """True if a tong's `mounts:` request the session workspace. + + The magic word may carry a `:mode` suffix (e.g. `workspace:ro`), so compare + only the word before the colon. + """ + for mount in defn.get("mounts") or []: + if isinstance(mount, str) and mount.split(":", 1)[0] == tongs.WORKSPACE_MOUNT: + return True + return False + + +def unsupported_tong_reasons(merged): + """Reasons each discovered tong is outside what the launcher can start. + + The launcher starts only `shared`, network-or-nothing tongs that hold no + secret. Refused here: + + * a `session` lifecycle -- it needs a per-session network; + * a secret reference -- it needs tmpfs delivery; + * an `mcp` interface -- it needs generated MCP config; + * a `volume` interface -- a shared named volume has no consumer yet, so it + is not wired into either container; + * a `shared` tong that mounts the `workspace` -- a `shared` tong is one + long-lived container reused across sessions, so binding one session's + workspace into it would expose that workspace to every later session that + reuses the container (a `session` tong is the right home for a + per-workspace mount). + + A refused tong is reported rather than started half-wired. Returns a list of + human-readable reason strings (empty == every discovered tong is startable). + """ + reasons = [] + for name in sorted(merged): + defn = merged[name]["definition"] + kind = (defn.get("interface") or {}).get("kind") + if defn.get("lifecycle") == "session": + reasons.append( + "tong '%s' is a 'session' tong, which this launcher does not " + "start (only 'shared' tongs are supported)" % name + ) + if tongs.find_secret_refs(defn): + reasons.append( + "tong '%s' references a secret, which this launcher does not " + "deliver" % name + ) + if kind == "mcp": + reasons.append( + "tong '%s' has an 'mcp' interface, which this launcher does not " + "wire up" % name + ) + if kind == "volume": + reasons.append( + "tong '%s' has a 'volume' interface, which this launcher does not " + "wire up" % name + ) + if defn.get("lifecycle") == "shared" and _mounts_workspace(defn): + reasons.append( + "tong '%s' is a 'shared' tong that mounts the workspace; a shared " + "container is reused across sessions, so it would leak one " + "session's workspace into the next" % name + ) + return reasons + + +def _start_shared_tong(docker, name, defn, *, container, network, alias, + workspace, label_hash): + """Start one `shared` tong container detached, replacing any old one. + + The launcher only reaches here for a secret-less tong, so the definition's + `env` is passed straight through as `-e`; any existing container of the same + name is removed first so a stale or stopped one is replaced cleanly. + """ + argv = tongs.tong_run_argv( + name, defn, + container_name=container, network=network, alias=alias, + env=defn.get("env") or {}, label_hash=label_hash, workspace=workspace, + ) + docker.rm_force(container) + docker.run_detached(argv) + + +def _ensure_shared_tong(docker, name, defn, *, container, network, alias, + workspace, label_hash): + """Start a `shared` tong, or reuse the running one, recreating it if stale. + + A `shared` tong is one long-lived container keyed by `shared_container_name`. + Its config-hash label answers "did the definition change since it started?": + a missing container, a stopped one, or a hash mismatch triggers a fresh start + (removing any old container first); a running container with a matching hash + is reused untouched. The hash is over the merged definition, so the same + long-lived container is reused across sessions while the definition is stable. + """ + state = docker.inspect_state(container) + if state and state["running"] and state["label"] == label_hash: + return + _start_shared_tong( + docker, name, defn, + container=container, network=network, alias=alias, + workspace=workspace, label_hash=label_hash, + ) + + +def _injection_pre_image_args(injection): + """`-e`/`-v` options the discovered tongs add to the anvil before the image. + + A `port` tong contributes the env vars the anvil reads to reach it. The + named-volume mount path is a faithful consumer of `plan_injection`'s shape but + stays empty here, since `volume` tongs are refused before this runs. + """ + args = [] + for key in sorted(injection["env"]): + args += ["-e", "%s=%s" % (key, injection["env"][key])] + for mount in injection["mounts"]: + args += ["-v", "%s:%s" % (mount["volume"], mount["mountpoint"])] + return args + + +def run_with_tongs(merged, anvil_cmd, opts, *, docker, + sleep=time.sleep, monotonic=time.monotonic): + """Start the discovered `shared` tongs, run the anvil, and leave them running. + + Only reached when at least one tong was discovered and every tong is startable + (the empty case stays a direct exec; unsupported tongs are refused earlier). + Sequence: ensure each `shared` tong is up on the anvil's base network + (reusing a running one whose config hash still matches), probe each tong's + readiness, inject `port` reachability into the anvil argv, then run the anvil + in the foreground. `shared` tongs are long-lived, so nothing is torn down when + the anvil exits. + + Returns the anvil's exit code. Raises `OrchestrationError` if a tong never + becomes ready -- the anvil does not run against a half-up environment. + """ + base_network = tongs.anvil_option_value(anvil_cmd, "--network") + # Only `port`/`none` tongs reach this path (`mcp`/`volume` are refused + # upstream), so the MCP emitter is unused and the injection is `port` env only. + injection = tongs.plan_injection(merged, None) + + ready_checks = [] + for name in sorted(merged): + defn = merged[name]["definition"] + alias = tongs.canonical_alias(name, defn) + label_hash = tongs.config_hash(defn) + container = tongs.shared_container_name(name) + _ensure_shared_tong( + docker, name, defn, + container=container, network=base_network, alias=alias, + workspace=opts.workspace, label_hash=label_hash, + ) + ready_checks.append((name, defn, alias, base_network, container)) + + for name, defn, alias, probe_net, container in ready_checks: + if not wait_ready( + docker, container, defn, alias, probe_net, + anvil_image=opts.anvil_image, sleep=sleep, monotonic=monotonic, + ): + raise OrchestrationError("tong '%s' did not become ready in time" % name) + + injected = tongs.inject_anvil_argv( + anvil_cmd, network=base_network, + pre_image_args=_injection_pre_image_args(injection), + ) + return docker.run_foreground(injected) + + def exec_anvil(anvil_cmd): """Exec the anvil argv, replacing this process. @@ -336,19 +703,47 @@ def main(argv): tongs.warn(str(exc)) return 1 - if merged: - # The launcher discovers tongs but does not start them; the anvil runs - # without them. The passthrough invariant only governs the empty case, - # so surface the discovered tongs rather than ignoring them silently. - tongs.warn( - "%d tong definition(s) discovered (%s); this launcher does not " - "start tongs, so the anvil runs without them" - % (len(merged), ", ".join(sorted(merged))) - ) + # Passthrough invariant: with no tong definitions discovered, exec the anvil + # argv verbatim -- byte-identical to the direct docker run, and the process + # is replaced so the controlling tty, signals, and --rm cleanup are untouched. + if not merged: + return exec_anvil(anvil_cmd) + + # From here a tong actually starts, so validate before touching docker: an + # invalid definition should stop the launch with a clear message, not fail + # mid-orchestration with a docker error. + errors = [] + for name in sorted(merged): + errors.extend(tongs.validate_tong(name, merged[name]["definition"])) + if errors: + for error in errors: + tongs.warn(error) + return 1 - # On success exec_anvil replaces this process; it only returns a status if - # the anvil command could not be execed. - return exec_anvil(anvil_cmd) + # Refuse anything this launcher cannot start (see unsupported_tong_reasons: + # a session lifecycle, a secret reference, an MCP or volume interface, or a + # shared tong mounting the workspace) rather than starting it half-wired. + # Every remaining tong is a `port`/`none` tong, whose canonical alias is its + # unique filename, so no two can claim the same network alias. + unsupported = unsupported_tong_reasons(merged) + if unsupported: + for reason in unsupported: + tongs.warn(reason) + return 1 + + # run_with_tongs runs the anvil in the foreground and returns its exit code, + # leaving the (long-lived) shared tongs running. A tong that never becomes + # ready stops the launch rather than running the anvil against a half-up + # environment. + try: + return run_with_tongs(merged, anvil_cmd, opts, docker=DockerCLI()) + except (OrchestrationError, DockerError) as exc: + tongs.warn(str(exc)) + return 1 + except KeyboardInterrupt: + # The anvil was interrupted (Ctrl-C); the shared tongs stay running by + # design. Report the conventional 128+SIGINT status. + return 130 if __name__ == "__main__": diff --git a/scripts/test_run_anvil.py b/scripts/test_run_anvil.py index deddba7..138cf0a 100644 --- a/scripts/test_run_anvil.py +++ b/scripts/test_run_anvil.py @@ -78,17 +78,24 @@ def test_approval_options_default_to_inert(self): opts, _ = run_anvil.parse_args(["--", "x"]) self.assertIsNone(opts.workspace) self.assertIsNone(opts.approvals) + self.assertIsNone(opts.anvil_image) self.assertFalse(opts.no_prompt) def test_parses_workspace_approvals_and_no_prompt(self): opts, cmd = run_anvil.parse_args( - ["--workspace", "/ws", "--approvals", "/a.json", "--no-prompt", "--", "x"] + ["--workspace", "/ws", "--approvals", "/a.json", + "--anvil-image", "anvil:img", "--no-prompt", "--", "x"] ) self.assertEqual(opts.workspace, "/ws") self.assertEqual(opts.approvals, "/a.json") + self.assertEqual(opts.anvil_image, "anvil:img") self.assertTrue(opts.no_prompt) self.assertEqual(cmd, ["x"]) + def test_anvil_image_without_value_raises(self): + with self.assertRaises(run_anvil.UsageError): + run_anvil.parse_args(["--anvil-image"]) + def test_workspace_without_value_raises(self): with self.assertRaises(run_anvil.UsageError): run_anvil.parse_args(["--workspace"]) @@ -185,16 +192,15 @@ def test_missing_workspace_tongs_dir_forwards_verbatim(self): self.assertEqual(forwarded, ANVIL_ARGV) self.assertNotIn("tong", stderr) - def test_present_trusted_tong_still_forwards_anvil_argv_unchanged(self): - # A trusted-layer (repo) tong is discovered but not gated, and the - # launcher does not rewrite the anvil command; it warns and runs the - # anvil as given. (A workspace tong would gate -- see GateTests.) - with tempfile.TemporaryDirectory() as tmp: - with open(os.path.join(tmp, "gh.yaml"), "w") as handle: - handle.write("lifecycle: session\nimage: x\ninterface:\n kind: none\n") - forwarded, stderr = _run_launcher(["--repo-tongs", tmp]) - self.assertEqual(forwarded, ANVIL_ARGV) - self.assertIn("gh", stderr) + def test_launcher_flags_do_not_leak_into_anvil_argv(self): + # The Makefile always passes --anvil-image; with no tongs it is consumed + # by the launcher and the anvil argv is forwarded unchanged. + forwarded, stderr = _run_launcher([ + "--anvil-image", "opencode:local", + "--repo-tongs", "/nonexistent/tongs", + ]) + self.assertEqual(forwarded, ANVIL_ARGV) + self.assertNotIn("tong", stderr) def _run_launcher_raw(extra_args, stdin_text=None): @@ -438,23 +444,413 @@ def test_no_prompt_unapproved_does_not_forward_anvil(self): self.assertEqual(completed.stdout, "") self.assertIn("fails closed", completed.stderr) - def test_approved_workspace_tong_forwards_verbatim(self): + def test_approved_workspace_tong_passes_gate_then_refused_as_unsupported(self): + # Approval is no longer the only gate: an approved (and otherwise valid) + # workspace tong clears the approval prompt but, being a `session` tong, is + # then refused as unsupported -- proving the gate passed without the anvil + # ever running. with tempfile.TemporaryDirectory() as tmp: - tongs_dir = self._workspace_tongs_dir(tmp) + tongs_dir = os.path.join(tmp, "tongs") + os.makedirs(tongs_dir) + with open(os.path.join(tongs_dir, "gh.yaml"), "w") as handle: + handle.write( + "lifecycle: session\nimage: x\ninterface:\n kind: none\n" + "readiness:\n mode: none\n" + ) defn = tongs.load_tong_file(os.path.join(tongs_dir, "gh.yaml")) approvals_path = os.path.join(tmp, "approvals.json") tongs.save_approvals( approvals_path, tongs.record_approval({}, tmp, "gh", defn) ) - forwarded, stderr = _run_launcher( + completed = _run_launcher_raw( [ "--workspace-tongs", tongs_dir, "--workspace", tmp, "--approvals", approvals_path, ] ) - self.assertEqual(forwarded, ANVIL_ARGV) - self.assertIn("gh", stderr) + self.assertEqual(completed.returncode, 1) + self.assertEqual(completed.stdout, "") # anvil never ran + self.assertIn("session", completed.stderr) + self.assertNotIn("fails closed", completed.stderr) + + def test_invalid_tong_returns_one_without_exec(self): + # A discovered but invalid definition stops the launch before docker. + with tempfile.TemporaryDirectory() as tmp: + tongs_dir = os.path.join(tmp, "tongs") + os.makedirs(tongs_dir) + with open(os.path.join(tongs_dir, "bad.yaml"), "w") as handle: + handle.write("image: x\n") # missing lifecycle + interface + completed = _run_launcher_raw(["--repo-tongs", tongs_dir]) + self.assertEqual(completed.returncode, 1) + self.assertEqual(completed.stdout, "") # anvil never ran + + def test_session_tong_refused_without_exec(self): + # A `session` tong is beyond the shared-only launch path, so it is refused + # before any docker call -- the anvil never runs. + with tempfile.TemporaryDirectory() as tmp: + tongs_dir = os.path.join(tmp, "tongs") + os.makedirs(tongs_dir) + with open(os.path.join(tongs_dir, "ship.yaml"), "w") as handle: + handle.write( + "lifecycle: session\nimage: x\ninterface:\n kind: none\n" + "readiness:\n mode: none\n" + ) + completed = _run_launcher_raw(["--repo-tongs", tongs_dir]) + self.assertEqual(completed.returncode, 1) + self.assertEqual(completed.stdout, "") + self.assertIn("session", completed.stderr) + + def test_secret_tong_refused_without_exec(self): + # A shared tong that references a secret cannot be delivered here, so it + # is refused before docker. + with tempfile.TemporaryDirectory() as tmp: + tongs_dir = os.path.join(tmp, "tongs") + os.makedirs(tongs_dir) + with open(os.path.join(tongs_dir, "creds.yaml"), "w") as handle: + handle.write( + "lifecycle: shared\nimage: x\n" + "env:\n TOKEN: ${secret:op:op://Work/t}\n" + "interface:\n kind: none\nreadiness:\n mode: none\n" + ) + completed = _run_launcher_raw(["--repo-tongs", tongs_dir]) + self.assertEqual(completed.returncode, 1) + self.assertEqual(completed.stdout, "") + self.assertIn("secret", completed.stderr) + + def test_keyboard_interrupt_during_run_returns_130(self): + # Ctrl-C while the anvil runs leaves the (long-lived) shared tongs up and + # reports the conventional 128+SIGINT status rather than a traceback. + with tempfile.TemporaryDirectory() as tmp: + tongs_dir = os.path.join(tmp, "tongs") + os.makedirs(tongs_dir) + with open(os.path.join(tongs_dir, "shipper.yaml"), "w") as handle: + handle.write( + "lifecycle: shared\nimage: x\ninterface:\n kind: none\n" + "readiness:\n mode: none\n" + ) + with mock.patch.object(run_anvil, "run_with_tongs", side_effect=KeyboardInterrupt): + rc = run_anvil.main(["--repo-tongs", tongs_dir, "--", "/no/such/binary-xyz"]) + self.assertEqual(rc, 130) + + def test_mcp_tong_refused_without_exec(self): + # An `mcp`-interface tong needs generated MCP config the launcher does not + # emit here, so it is refused before docker. + with tempfile.TemporaryDirectory() as tmp: + tongs_dir = os.path.join(tmp, "tongs") + os.makedirs(tongs_dir) + with open(os.path.join(tongs_dir, "gh.yaml"), "w") as handle: + handle.write( + "lifecycle: shared\nimage: x\ninterface:\n" + " kind: mcp\n name: github\n port: 8080\n" + "readiness:\n mode: none\n" + ) + completed = _run_launcher_raw(["--repo-tongs", tongs_dir]) + self.assertEqual(completed.returncode, 1) + self.assertEqual(completed.stdout, "") + self.assertIn("mcp", completed.stderr) + + def test_volume_tong_refused_without_exec(self): + # A `volume` interface (a shared named volume) has no consumer yet, so it + # is refused before docker. + with tempfile.TemporaryDirectory() as tmp: + tongs_dir = os.path.join(tmp, "tongs") + os.makedirs(tongs_dir) + with open(os.path.join(tongs_dir, "cache.yaml"), "w") as handle: + handle.write( + "lifecycle: shared\nimage: x\ninterface:\n" + " kind: volume\n volume: build-cache\n mountpoint: /cache\n" + "readiness:\n mode: none\n" + ) + completed = _run_launcher_raw(["--repo-tongs", tongs_dir]) + self.assertEqual(completed.returncode, 1) + self.assertEqual(completed.stdout, "") + self.assertIn("volume", completed.stderr) + + def test_shared_workspace_mount_refused_without_exec(self): + # A `shared` tong is reused across sessions, so mounting the workspace + # into it would leak one session's workspace into the next -- refused. + with tempfile.TemporaryDirectory() as tmp: + tongs_dir = os.path.join(tmp, "tongs") + os.makedirs(tongs_dir) + with open(os.path.join(tongs_dir, "watch.yaml"), "w") as handle: + handle.write( + "lifecycle: shared\nimage: x\nmounts:\n - workspace:ro\n" + "interface:\n kind: none\nreadiness:\n mode: none\n" + ) + completed = _run_launcher_raw(["--repo-tongs", tongs_dir]) + self.assertEqual(completed.returncode, 1) + self.assertEqual(completed.stdout, "") + self.assertIn("workspace", completed.stderr) + + +class UnsupportedTongReasonsTests(unittest.TestCase): + """The single chokepoint that refuses tongs the launcher cannot start yet.""" + + def _reasons(self, defn): + return run_anvil.unsupported_tong_reasons(_merged("t", defn, source=tongs.REPO)) + + def test_startable_port_tong_has_no_reasons(self): + self.assertEqual( + self._reasons({ + "lifecycle": "shared", "image": "x", + "interface": {"kind": "port", "port": 5432}, "readiness": {"mode": "none"}, + }), + [], + ) + + def test_startable_none_tong_has_no_reasons(self): + self.assertEqual(self._reasons(SHARED_NONE), []) + + def test_session_secret_mcp_volume_each_refused(self): + self.assertTrue(self._reasons( + {"lifecycle": "session", "image": "x", "interface": {"kind": "none"}, + "readiness": {"mode": "none"}})) + self.assertTrue(self._reasons( + {"lifecycle": "shared", "image": "x", "env": {"T": "${secret:op:r}"}, + "interface": {"kind": "none"}, "readiness": {"mode": "none"}})) + self.assertTrue(self._reasons( + {"lifecycle": "shared", "image": "x", + "interface": {"kind": "mcp", "name": "g", "port": 8080}, + "readiness": {"mode": "none"}})) + self.assertTrue(self._reasons( + {"lifecycle": "shared", "image": "x", + "interface": {"kind": "volume", "volume": "v", "mountpoint": "/m"}, + "readiness": {"mode": "none"}})) + + def test_shared_workspace_mount_refused_but_docker_socket_allowed(self): + # A shared tong that mounts the workspace leaks it across sessions, so it + # is refused; the docker-socket mount (the broker pattern) is not. + self.assertTrue(any( + "workspace" in r for r in self._reasons({ + "lifecycle": "shared", "image": "x", "mounts": ["workspace:ro"], + "interface": {"kind": "none"}, "readiness": {"mode": "none"}, + }) + )) + self.assertEqual( + self._reasons({ + "lifecycle": "shared", "image": "x", "mounts": ["docker-socket"], + "interface": {"kind": "none"}, "readiness": {"mode": "none"}, + }), + [], + ) + + def test_workspace_refusal_is_shared_scoped(self): + # The workspace-mount leak is a `shared`-reuse hazard, so a non-shared tong + # that mounts the workspace must NOT be refused for the workspace -- only a + # `shared` one. (A session+workspace watcher is legitimate; it lands when + # session tongs turn on.) A session tong here is refused for being session, + # not for the workspace mount. + session_reasons = self._reasons({ + "lifecycle": "session", "image": "x", "mounts": ["workspace:ro"], + "interface": {"kind": "none"}, "readiness": {"mode": "none"}, + }) + self.assertTrue(any("session" in r for r in session_reasons)) + self.assertFalse(any("workspace" in r for r in session_reasons)) + + +class FakeDocker: + """In-process stand-in for DockerCLI that records calls and returns canned + results, so orchestration is tested without a docker daemon.""" + + def __init__(self, states=None, ready=True, anvil_rc=0): + self.calls = [] + self._states = states or {} # container -> inspect_state dict + self._ready = ready + self._anvil_rc = anvil_rc + self.run_argvs = [] # detached `docker run` argvs + self.anvil_argv = None # set when the anvil runs via run_foreground + + def rm_force(self, container): + self.calls.append(("rm_force", container)) + + def run_detached(self, argv): + self.run_argvs.append(argv) + + def inspect_state(self, container): + return self._states.get(container) + + def health_status(self, container): + return "healthy" if self._ready else "starting" + + def exec_ok(self, container, command): + return self._ready + + def tcp_probe(self, network, host, port, image): + self.calls.append(("tcp_probe", network, host, port, image)) + return self._ready + + def run_foreground(self, argv): + self.anvil_argv = argv + self.calls.append(("run_foreground", argv)) + return self._anvil_rc + + +# Tiny launcher options for driving run_with_tongs directly. +def _opts(workspace=None, anvil_image="anvil:img"): + return run_anvil.LauncherOptions( + layer_dirs=[], workspace=workspace, approvals=None, + anvil_image=anvil_image, no_prompt=False, + ) + + +# A counter clock so readiness loops never sleep on the wall clock in tests. +class _Clock: + def __init__(self, step=1.0): + self.t = 0.0 + self.step = step + + def __call__(self): + self.t += self.step + return self.t + + +SHARED_OLLAMA = { + "lifecycle": "shared", + "image": "ollama/ollama", + "interface": {"kind": "port", "port": 11434}, + "readiness": {"mode": "tcp"}, +} + +# A background side-effect tong with no anvil-facing surface and no probe. +SHARED_NONE = { + "lifecycle": "shared", + "image": "log-shipper", + "interface": {"kind": "none"}, + "readiness": {"mode": "none"}, +} + + +class RunWithTongsTests(unittest.TestCase): + def _run(self, docker, merged, anvil=None, workspace=None): + return run_anvil.run_with_tongs( + merged, anvil or ANVIL_ARGV, _opts(workspace=workspace), + docker=docker, sleep=lambda _s: None, monotonic=_Clock(), + ) + + def test_shared_tong_starts_when_absent_and_runs_anvil(self): + # ollama-shape shared tong on the anvil's base network: it is started + # there under its canonical alias, then the anvil runs on that network. + docker = FakeDocker() + rc = self._run(docker, _merged("ollama", SHARED_OLLAMA, source=tongs.REPO)) + self.assertEqual(rc, 0) + self.assertEqual(len(docker.run_argvs), 1) + started = docker.run_argvs[0] + self.assertIn("swarmforge-shared-ollama", started) + self.assertEqual(started[started.index("--network") + 1], "opencode-net") + self.assertIn("ollama", started) # network-alias + # The anvil ran on the unchanged base network. + self.assertEqual( + docker.anvil_argv[docker.anvil_argv.index("--network") + 1], "opencode-net" + ) + + def test_shared_tong_reused_when_running_and_hash_matches(self): + defn = SHARED_OLLAMA + states = {"swarmforge-shared-ollama": {"running": True, "label": tongs.config_hash(defn)}} + docker = FakeDocker(states=states) + self._run(docker, _merged("ollama", defn, source=tongs.REPO)) + self.assertEqual(docker.run_argvs, []) # reused, not restarted + + def test_shared_tong_recreated_when_hash_differs(self): + states = {"swarmforge-shared-ollama": {"running": True, "label": "stale"}} + docker = FakeDocker(states=states) + self._run(docker, _merged("ollama", SHARED_OLLAMA, source=tongs.REPO)) + self.assertIn(("rm_force", "swarmforge-shared-ollama"), docker.calls) + self.assertEqual(len(docker.run_argvs), 1) + + def test_shared_tong_recreated_when_absent(self): + # No running container of that name => start fresh (rm_force clears any + # stopped leftover first). + docker = FakeDocker() + self._run(docker, _merged("ollama", SHARED_OLLAMA, source=tongs.REPO)) + self.assertIn(("rm_force", "swarmforge-shared-ollama"), docker.calls) + self.assertEqual(len(docker.run_argvs), 1) + + def test_stopped_shared_tong_is_recreated(self): + # A container exists by name but is not running (a stale leftover) => + # recreate even though its label happens to match. + states = {"swarmforge-shared-ollama": + {"running": False, "label": tongs.config_hash(SHARED_OLLAMA)}} + docker = FakeDocker(states=states) + self._run(docker, _merged("ollama", SHARED_OLLAMA, source=tongs.REPO)) + self.assertIn(("rm_force", "swarmforge-shared-ollama"), docker.calls) + self.assertEqual(len(docker.run_argvs), 1) + + def test_multiple_shared_tongs_started_and_injected(self): + # Two shared tongs in one launch: both are started and both contribute + # their reachability to the anvil. + pg = { + "lifecycle": "shared", "image": "pg", + "interface": {"kind": "port", "port": 5432}, "readiness": {"mode": "none"}, + } + redis = { + "lifecycle": "shared", "image": "redis", + "interface": {"kind": "port", "port": 6379}, "readiness": {"mode": "none"}, + } + docker = FakeDocker() + merged = { + "pg": {"source": tongs.REPO, "definition": pg}, + "redis": {"source": tongs.REPO, "definition": redis}, + } + self._run(docker, merged) + self.assertEqual(len(docker.run_argvs), 2) + argv = docker.anvil_argv + self.assertIn("SWARMFORGE_TONG_PG_HOST=pg", argv) + self.assertIn("SWARMFORGE_TONG_PG_PORT=5432", argv) + self.assertIn("SWARMFORGE_TONG_REDIS_HOST=redis", argv) + self.assertIn("SWARMFORGE_TONG_REDIS_PORT=6379", argv) + + def test_tcp_readiness_probes_alias_with_anvil_image(self): + docker = FakeDocker() + self._run(docker, _merged("ollama", SHARED_OLLAMA, source=tongs.REPO)) + self.assertIn(("tcp_probe", "opencode-net", "ollama", 11434, "anvil:img"), docker.calls) + + def test_port_tong_injects_host_and_port_env_into_anvil(self): + defn = { + "lifecycle": "shared", "image": "pg", + "interface": {"kind": "port", "port": 5432}, "readiness": {"mode": "none"}, + } + docker = FakeDocker() + self._run(docker, _merged("pg", defn, source=tongs.REPO)) + argv = docker.anvil_argv + self.assertIn("SWARMFORGE_TONG_PG_HOST=pg", argv) + self.assertIn("SWARMFORGE_TONG_PG_PORT=5432", argv) + + def test_none_tong_leaves_anvil_argv_unchanged(self): + # A `none` shared tong has no anvil-facing surface, so nothing is injected + # and the anvil command is exactly what the macro built. + docker = FakeDocker() + self._run(docker, _merged("shipper", SHARED_NONE, source=tongs.REPO)) + self.assertEqual(docker.anvil_argv, ANVIL_ARGV) + + def test_unready_tong_raises_and_anvil_never_runs(self): + docker = FakeDocker(ready=False) + defn = { + "lifecycle": "shared", "image": "pg", + "interface": {"kind": "port", "port": 5432}, + "readiness": {"mode": "tcp", "timeout": "1s"}, + } + with self.assertRaises(run_anvil.OrchestrationError): + self._run(docker, _merged("pg", defn, source=tongs.REPO)) + self.assertIsNone(docker.anvil_argv) # anvil never ran + + def test_anvil_exit_code_is_returned(self): + docker = FakeDocker(anvil_rc=42) + rc = self._run(docker, _merged("ollama", SHARED_OLLAMA, source=tongs.REPO)) + self.assertEqual(rc, 42) + + def test_no_anvil_image_degrades_tcp_to_running_check(self): + # Without an anvil image a TCP probe cannot dial the tong's port, so it + # falls back to "is the container running" using inspect_state. + states = {"swarmforge-shared-ollama": {"running": True, "label": tongs.config_hash(SHARED_OLLAMA)}} + docker = FakeDocker(states=states) + rc = run_anvil.run_with_tongs( + _merged("ollama", SHARED_OLLAMA, source=tongs.REPO), ANVIL_ARGV, + _opts(anvil_image=None), docker=docker, + sleep=lambda _s: None, monotonic=_Clock(), + ) + self.assertEqual(rc, 0) + self.assertNotIn("tcp_probe", [c[0] for c in docker.calls]) if __name__ == "__main__":