Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions scripts/run_anvil.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import collections
import importlib.util
import os
import subprocess
import sys

# Load the pure core (layer discovery + name-based merge) by path, the same way
Expand Down Expand Up @@ -248,6 +249,55 @@ def gate_workspace_tongs(merged, workspace, approvals_path, prompt=True, out=Non
tongs.save_approvals(approvals_path, approvals)


# --- Secret resolution --------------------------------------------------------


class SecretResolutionError(Exception):
"""A secret reference could not be resolved; the launch must not proceed."""


def make_secret_resolver(providers):
"""Build the impure resolver closure over a configured provider table.

Returns `resolve(provider, ref) -> str`, the side-effectful counterpart to
the pure `tongs.substitute_secrets`/`tongs.plan_tong_secrets`: it shells out
to the provider CLI built by `tongs.secret_provider_command` and returns the
secret printed on stdout. Interactive unlocks (`op signin`, biometrics) work
because the launcher runs in the user's terminal before the anvil starts. A
single trailing newline -- which provider CLIs conventionally append -- is
stripped; any other whitespace is preserved verbatim.

Raises `SecretResolutionError` (naming the provider and reference, never the
secret) for an unknown provider, a CLI that cannot be run, or a non-zero
exit, so a misconfigured secret stops the launch rather than handing the tong
an empty or partial value.
"""

def resolve(provider, ref):
try:
command = tongs.secret_provider_command(providers, provider, ref)
except KeyError:
raise SecretResolutionError(
"no secret provider %r is configured; declare it in "
"secret-providers.yaml" % provider
)
try:
completed = subprocess.run(command, stdout=subprocess.PIPE, check=False)
except OSError as exc:
raise SecretResolutionError(
"secret provider %r could not run: %s" % (provider, exc)
)
if completed.returncode != 0:
raise SecretResolutionError(
"secret provider %r failed for %r (exit %d)"
% (provider, ref, completed.returncode)
)
value = completed.stdout.decode("utf-8")
return value[:-1] if value.endswith("\n") else value

return resolve


def exec_anvil(anvil_cmd):
"""Exec the anvil argv, replacing this process.

Expand Down
67 changes: 67 additions & 0 deletions scripts/test_run_anvil.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import sys
import tempfile
import unittest
from unittest import mock

HERE = os.path.dirname(os.path.abspath(__file__))
MODULE_PATH = os.path.join(HERE, "run_anvil.py")
Expand Down Expand Up @@ -329,6 +330,72 @@ def test_missing_workspace_path_fails_closed(self):
self._gate(merged, answer="y\n", workspace="")


class SecretResolverTests(unittest.TestCase):
"""make_secret_resolver shells out to the provider CLI and reports failures."""

# Portable provider commands built on the test interpreter so the suite does
# not depend on op/pass/echo being installed. "{ref}" is substituted by
# tongs.secret_provider_command before exec.
def _writes(self, expr):
return [sys.executable, "-c", "import sys; sys.stdout.write(%s)" % expr, "{ref}"]

def test_resolves_ref_via_provider_cli(self):
resolve = run_anvil.make_secret_resolver({"echo": self._writes("sys.argv[1]")})
self.assertEqual(resolve("echo", "op://Work/secret"), "op://Work/secret")

def test_provider_stderr_inherits_terminal(self):
with mock.patch.object(run_anvil.subprocess, "run") as run:
run.return_value = subprocess.CompletedProcess(["provider"], 0, stdout=b"secret\n")
resolve = run_anvil.make_secret_resolver({"p": ["provider", "{ref}"]})
self.assertEqual(resolve("p", "ref"), "secret")
self.assertIsNone(run.call_args.kwargs.get("stderr"))

def test_strips_single_trailing_newline(self):
resolve = run_anvil.make_secret_resolver({"echo": self._writes("sys.argv[1] + '\\n'")})
self.assertEqual(resolve("echo", "token"), "token")

def test_preserves_inner_and_other_whitespace(self):
# Only one trailing newline is stripped; interior/extra newlines survive.
resolve = run_anvil.make_secret_resolver({"echo": self._writes("sys.argv[1] + '\\n\\n'")})
self.assertEqual(resolve("echo", "a\nb"), "a\nb\n")

def test_unknown_provider_raises(self):
resolve = run_anvil.make_secret_resolver({"op": ["op", "read", "{ref}"]})
with self.assertRaises(run_anvil.SecretResolutionError):
resolve("vault", "x")

def test_nonzero_exit_raises(self):
resolve = run_anvil.make_secret_resolver(
{"boom": [sys.executable, "-c", "import sys; sys.exit(3)"]}
)
with self.assertRaises(run_anvil.SecretResolutionError):
resolve("boom", "x")

def test_unrunnable_provider_raises(self):
resolve = run_anvil.make_secret_resolver({"missing": ["/no/such/binary-xyz", "{ref}"]})
with self.assertRaises(run_anvil.SecretResolutionError):
resolve("missing", "x")

def test_error_message_never_contains_the_secret(self):
# A failing CLI must not surface the resolved value; here it prints the
# ref to stderr and fails, and the error names provider/ref (which are
# not secret) -- the resolver never reaches a secret value on failure.
resolve = run_anvil.make_secret_resolver(
{"boom": [sys.executable, "-c", "import sys; sys.exit(1)"]}
)
with self.assertRaises(run_anvil.SecretResolutionError) as ctx:
resolve("boom", "ref-token")
self.assertIn("boom", str(ctx.exception))

def test_drives_plan_tong_secrets_end_to_end(self):
# The resolver is the impure half of tongs.plan_tong_secrets: a secret
# env var ends up as a tmpfs file, never as a -e value.
resolve = run_anvil.make_secret_resolver({"echo": self._writes("sys.argv[1]")})
plan = tongs.plan_tong_secrets({"TOKEN": "${secret:echo:s3cr3t}"}, resolve)
self.assertEqual(plan["files"], {"/run/swarmforge/secrets/TOKEN": "s3cr3t"})
self.assertEqual(plan["env"], {"TOKEN_FILE": "/run/swarmforge/secrets/TOKEN"})


class MainGateTests(unittest.TestCase):
"""main() stops before exec when a workspace tong is unapproved."""

Expand Down
167 changes: 167 additions & 0 deletions scripts/test_tongs.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,26 @@ def test_bad_interface_kind(self):
errors = tongs.validate_tong("t", {"lifecycle": "session", "image": "x", "interface": {"kind": "socket"}})
self.assertTrue(any("interface.kind" in e for e in errors))

def test_rejects_secret_file_pointer_collision(self):
errors = tongs.validate_tong("t", {
"lifecycle": "session",
"image": "x",
"interface": {"kind": "none"},
"readiness": {"mode": "none"},
"env": {"TOKEN": "${secret:op:t}", "TOKEN_FILE": "/declared/path"},
})
self.assertTrue(any("TOKEN_FILE" in e for e in errors))

def test_rejects_invalid_secret_env_name(self):
errors = tongs.validate_tong("t", {
"lifecycle": "session",
"image": "x",
"interface": {"kind": "none"},
"readiness": {"mode": "none"},
"env": {"a/b": "${secret:op:t}"},
})
self.assertTrue(any("a/b" in e for e in errors))


class SecretRefTests(unittest.TestCase):
def test_parse_single_ref_with_inner_colons(self):
Expand Down Expand Up @@ -239,6 +259,153 @@ def test_substitute_uses_injected_resolver(self):
self.assertIn("${secret", defn["env"]["A"]) # original not mutated


PROVIDERS_YAML = """\
providers:
op: ["op", "read", "{ref}"]
pass: ["pass", "show", "{ref}"]
"""


class SecretProviderTests(unittest.TestCase):
def test_loads_provider_table(self):
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "secret-providers.yaml")
with open(path, "w") as f:
f.write(PROVIDERS_YAML)
providers = tongs.load_secret_providers(path)
self.assertEqual(
providers,
{"op": ["op", "read", "{ref}"], "pass": ["pass", "show", "{ref}"]},
)

def test_missing_file_yields_empty(self):
self.assertEqual(tongs.load_secret_providers("/no/such/file.yaml"), {})
self.assertEqual(tongs.load_secret_providers(""), {})

def test_file_without_providers_block_yields_empty(self):
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "p.yaml")
with open(path, "w") as f:
f.write("unrelated: true\n")
self.assertEqual(tongs.load_secret_providers(path), {})

def test_non_mapping_providers_raises(self):
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "p.yaml")
with open(path, "w") as f:
f.write("providers: nope\n")
with self.assertRaises(ValueError):
tongs.load_secret_providers(path)

def test_non_list_command_raises(self):
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "p.yaml")
with open(path, "w") as f:
f.write('providers:\n op: "op read {ref}"\n')
with self.assertRaises(ValueError):
tongs.load_secret_providers(path)

def test_command_substitutes_ref_in_every_element(self):
providers = {"op": ["op", "read", "{ref}", "--prefix={ref}"]}
self.assertEqual(
tongs.secret_provider_command(providers, "op", "op://Work/x"),
["op", "read", "op://Work/x", "--prefix=op://Work/x"],
)

def test_command_unknown_provider_raises_keyerror(self):
with self.assertRaises(KeyError):
tongs.secret_provider_command({"op": ["op"]}, "vault", "x")


class SecretDeliveryTests(unittest.TestCase):
def test_partition_splits_plain_from_secret_bearing_env(self):
env = {
"PLAIN": "value",
"TOKEN": "${secret:op:op://Work/github/token}",
"MIXED": "Bearer ${secret:pass:db/pw}",
}
plain, secret = tongs.partition_secret_env(env)
self.assertEqual(plain, {"PLAIN": "value"})
self.assertEqual(
secret,
{"TOKEN": "${secret:op:op://Work/github/token}", "MIXED": "Bearer ${secret:pass:db/pw}"},
)

def test_partition_empty_env(self):
self.assertEqual(tongs.partition_secret_env(None), ({}, {}))
self.assertEqual(tongs.partition_secret_env({}), ({}, {}))

def test_delivery_plan_routes_secrets_to_tmpfs_files(self):
plan = tongs.secret_delivery_plan({"TOKEN": "s3cr3t", "API_KEY": "k3y"})
self.assertEqual(plan["tmpfs"], "/run/swarmforge/secrets")
self.assertEqual(
plan["files"],
{"/run/swarmforge/secrets/API_KEY": "k3y", "/run/swarmforge/secrets/TOKEN": "s3cr3t"},
)
self.assertEqual(
plan["env"],
{
"API_KEY_FILE": "/run/swarmforge/secrets/API_KEY",
"TOKEN_FILE": "/run/swarmforge/secrets/TOKEN",
},
)

def test_delivery_plan_empty_has_no_tmpfs(self):
plan = tongs.secret_delivery_plan({})
self.assertEqual(plan, {"tmpfs": None, "files": {}, "env": {}})

def test_plan_tong_secrets_keeps_secret_values_out_of_env(self):
env = {"REGION": "us", "TOKEN": "${secret:op:op://Work/github/token}"}
plan = tongs.plan_tong_secrets(env, lambda p, r: "RESOLVED-%s" % r)
# Plain env passes through; the secret becomes a _FILE pointer, never a value.
self.assertEqual(
plan["env"],
{"REGION": "us", "TOKEN_FILE": "/run/swarmforge/secrets/TOKEN"},
)
self.assertEqual(plan["tmpfs"], "/run/swarmforge/secrets")
self.assertEqual(
plan["files"],
{"/run/swarmforge/secrets/TOKEN": "RESOLVED-op://Work/github/token"},
)
# The resolved secret value reaches files only -- never the -e env plan.
self.assertNotIn("RESOLVED-op://Work/github/token", json.dumps(plan["env"]))

def test_delivery_plan_rejects_traversal_env_name(self):
# An env name that would escape the tmpfs dir as a path component is
# refused rather than baked into a file path.
with self.assertRaises(ValueError):
tongs.secret_delivery_plan({"../../etc/cron.d/x": "v"})
with self.assertRaises(ValueError):
tongs.secret_delivery_plan({"a/b": "v"})

def test_plan_tong_secrets_rejects_pointer_collision(self):
# A plain TOKEN_FILE that disagrees with the synthesized pointer would
# make the tong unable to find TOKEN, so the plan fails closed.
env = {"TOKEN_FILE": "/declared/path", "TOKEN": "${secret:op:t}"}
with self.assertRaises(ValueError):
tongs.plan_tong_secrets(env, lambda p, r: "SECRET")

def test_plan_tong_secrets_allows_matching_declared_pointer(self):
env = {
"TOKEN_FILE": "/run/swarmforge/secrets/TOKEN",
"TOKEN": "${secret:op:t}",
}
plan = tongs.plan_tong_secrets(env, lambda p, r: "SECRET")
self.assertEqual(plan["env"]["TOKEN_FILE"], "/run/swarmforge/secrets/TOKEN")
self.assertEqual(plan["files"], {"/run/swarmforge/secrets/TOKEN": "SECRET"})
self.assertNotIn("SECRET", json.dumps(plan["env"]))

def test_plan_tong_secrets_inert_without_secrets(self):
plan = tongs.plan_tong_secrets({"REGION": "us"}, lambda p, r: "x")
self.assertEqual(plan, {"env": {"REGION": "us"}, "tmpfs": None, "files": {}})

def test_plan_tong_secrets_resolves_each_provider_with_its_ref(self):
env = {"A": "${secret:op:a}", "B": "${secret:pass:b}"}
seen = []
tongs.plan_tong_secrets(env, lambda p, r: seen.append((p, r)) or "v")
self.assertEqual(sorted(seen), [("op", "a"), ("pass", "b")])


class EnvNamingTests(unittest.TestCase):
def test_prefix_sanitizes_name(self):
self.assertEqual(tongs.tong_env_prefix("github-creds"), "SWARMFORGE_TONG_GITHUB_CREDS")
Expand Down
Loading
Loading