From 4d8ef5b548c34ef2f56b2a36c6522ffc8a7be227 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 23 Jun 2026 19:37:54 +0000 Subject: [PATCH] Add `assembly control`: voice-driven macOS computer-use agent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A hands-free, voice-in/voice-out terminal agent that turns spoken instructions into real macOS UI actions — the "voice control plane" a browser/web service can't be, because it drives the actual desktop. Architecture (a `control/` feature slice with every external leg behind an injected seam, so the loop is hermetically testable with no mic, network, subprocess, or macOS): - actions/tools: the action vocabulary + its OpenAI function-calling schema. - engine: the pure observe/act loop (transcript -> LLM tool calls -> execute). - bridge: adapts the LLM Gateway into the engine's Responder seam. - listen: mic Streaming STT -> finalized utterances. - helper: spawns/talks JSON to a bundled Swift helper (CGEvent + the Accessibility API + NSWorkspace) — the "hands". - macos_ui_control.swift: the native helper (Codable JSON-lines protocol). `--dry-run` refuses every UI-mutating action (observe-only). macOS-only; fails fast elsewhere. Registered additively via SPEC; full gate green (100% patch coverage, mutation, types, lint, architecture contracts). 
Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01PiUeSiTo5aV99PPfEQkuNc --- .importlinter | 1 + aai_cli/commands/control/__init__.py | 93 +++++ aai_cli/commands/control/_exec.py | 95 +++++ aai_cli/control/__init__.py | 20 + aai_cli/control/actions.py | 72 ++++ aai_cli/control/bridge.py | 87 +++++ aai_cli/control/engine.py | 191 +++++++++ aai_cli/control/helper.py | 191 +++++++++ aai_cli/control/listen.py | 94 +++++ aai_cli/control/macos_ui_control.swift | 355 +++++++++++++++++ aai_cli/control/prompt.py | 27 ++ aai_cli/control/render.py | 72 ++++ aai_cli/control/tools.py | 74 ++++ pyproject.toml | 2 + .../test_snapshots_help_root.ambr | 2 + .../test_snapshots_help_run.ambr | 52 +++ tests/_control_helpers.py | 135 +++++++ tests/test_control.py | 338 ++++++++++++++++ tests/test_control_exec.py | 362 ++++++++++++++++++ tests/test_smoke.py | 1 + 20 files changed, 2264 insertions(+) create mode 100644 aai_cli/commands/control/__init__.py create mode 100644 aai_cli/commands/control/_exec.py create mode 100644 aai_cli/control/__init__.py create mode 100644 aai_cli/control/actions.py create mode 100644 aai_cli/control/bridge.py create mode 100644 aai_cli/control/engine.py create mode 100644 aai_cli/control/helper.py create mode 100644 aai_cli/control/listen.py create mode 100644 aai_cli/control/macos_ui_control.swift create mode 100644 aai_cli/control/prompt.py create mode 100644 aai_cli/control/render.py create mode 100644 aai_cli/control/tools.py create mode 100644 tests/_control_helpers.py create mode 100644 tests/test_control.py create mode 100644 tests/test_control_exec.py diff --git a/.importlinter b/.importlinter index 6153a684..e023e78f 100644 --- a/.importlinter +++ b/.importlinter @@ -35,6 +35,7 @@ source_modules = aai_cli.agent_cascade aai_cli.auth aai_cli.code_gen + aai_cli.control aai_cli.init aai_cli.onboard aai_cli.streaming diff --git a/aai_cli/commands/control/__init__.py b/aai_cli/commands/control/__init__.py new file mode 
100644 index 00000000..5dbe733e --- /dev/null +++ b/aai_cli/commands/control/__init__.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +import typer + +from aai_cli import command_registry, help_panels, options +from aai_cli.app.context import run_with_options +from aai_cli.commands.control import _exec as control_exec +from aai_cli.core import llm +from aai_cli.ui.help_text import examples_epilog + +app = typer.Typer() + +SPEC = command_registry.CommandModuleSpec( + panel=help_panels.TRANSCRIPTION, + order=47, # pragma: no mutate -- sparse rank; a +-1 shift is order-equivalent + commands=("control",), +) + + +@app.command( + rich_help_panel=help_panels.TRANSCRIPTION, + epilog=examples_epilog( + [ + ("Control your Mac hands-free by voice", "assembly control"), + ("Preview actions without touching the UI", "assembly control --dry-run"), + ("Use a more capable model for the agent", "assembly control --model claude-opus-4-7"), + ("Emit the loop as newline-delimited JSON", "assembly control --json"), + ] + ), +) +def control( + ctx: typer.Context, + device: int | None = typer.Option( + None, + "--device", + help="Microphone device index", + rich_help_panel=help_panels.OPT_CAPTURE, + ), + sample_rate: int | None = typer.Option( + None, + "--sample-rate", + help="Microphone capture rate in Hz (default: device native)", + min=1, + rich_help_panel=help_panels.OPT_CAPTURE, + ), + model: str = typer.Option( + llm.DEFAULT_MODEL, + "--model", + help="LLM Gateway model that decides the actions", + rich_help_panel=help_panels.OPT_LLM, + autocompletion=llm.complete_model, + ), + max_tokens: int = typer.Option( + llm.DEFAULT_MAX_TOKENS, + "--max-tokens", + help="Max tokens per agent step", + min=1, + rich_help_panel=help_panels.OPT_LLM, + ), + max_steps: int = typer.Option( + 10, + "--max-steps", + help="Max action steps the agent may take per spoken instruction", + min=1, + rich_help_panel=help_panels.OPT_LLM, + ), + dry_run: bool = typer.Option( + False, + "--dry-run", 
+ help="Plan and observe only: refuse every UI-changing action", + ), + json_out: bool = options.json_option("Emit newline-delimited JSON events"), +) -> None: + """Drive your Mac hands-free: speak an instruction, an agent acts on the UI + + Each spoken instruction is transcribed with Streaming STT and handed to an + LLM agent that decides which UI actions to take — typing, key chords, + clicking accessibility elements, launching apps — and performs them through a + bundled native macOS helper, then speaks back a short confirmation. + + macOS only: the helper needs Apple's Swift compiler and the Accessibility + + Microphone permissions granted to your terminal. Use --dry-run to watch the + agent plan without it touching anything. + """ + opts = control_exec.ControlOptions( + device=device, + sample_rate=sample_rate, + model=model, + max_tokens=max_tokens, + max_steps=max_steps, + dry_run=dry_run, + ) + run_with_options(ctx, control_exec.run_control, opts, json=json_out) diff --git a/aai_cli/commands/control/_exec.py b/aai_cli/commands/control/_exec.py new file mode 100644 index 00000000..49b8e68e --- /dev/null +++ b/aai_cli/commands/control/_exec.py @@ -0,0 +1,95 @@ +"""Run logic for `assembly control`: a gh-style options/run split. + +The command module parses argv into a :class:`ControlOptions` and hands it to +:func:`run_control`. The three external legs — mic Streaming STT, the LLM +Gateway, and the native UI helper — are bundled in :class:`ControlDeps` with +real-implementation defaults, so a test drives the whole session by passing +fakes to :func:`_run_control` with no microphone, network, subprocess, or macOS. 
+""" + +from __future__ import annotations + +from collections.abc import Callable, Iterable +from dataclasses import dataclass + +from aai_cli.app.context import AppState +from aai_cli.control import bridge, engine, prompt +from aai_cli.control import listen as listen_mod +from aai_cli.control.helper import UiHelper +from aai_cli.control.render import ControlRenderer +from aai_cli.core import signals + + +@dataclass(frozen=True) +class ControlOptions: + """Every `assembly control` flag as plain data.""" + + device: int | None + sample_rate: int | None + model: str + max_tokens: int + max_steps: int + dry_run: bool + + +def _default_transcripts(api_key: str, opts: ControlOptions) -> Iterable[str]: + """Real mic→utterance leg.""" + return listen_mod.listen(api_key, device=opts.device, sample_rate=opts.sample_rate) + + +def _default_responder(api_key: str, opts: ControlOptions) -> engine.Responder: + """Real LLM-Gateway leg.""" + return bridge.build_responder(api_key, model=opts.model, max_tokens=opts.max_tokens) + + +def _default_helper() -> UiHelper: + """Real native-helper leg (compiles the Swift helper now; spawns it on first action).""" + return UiHelper() + + +@dataclass(frozen=True) +class ControlDeps: + """The three external legs, injectable so the session is exercised with fakes.""" + + transcripts: Callable[[str, ControlOptions], Iterable[str]] = _default_transcripts + responder: Callable[[str, ControlOptions], engine.Responder] = _default_responder + helper: Callable[[], UiHelper] = _default_helper + + +_DEFAULT_DEPS = ControlDeps() + + +def _run_control( + opts: ControlOptions, + state: AppState, + *, + json_mode: bool, + deps: ControlDeps, +) -> None: + """Drive one hands-free control session with the given dependencies.""" + # Build the native helper first: on a non-macOS host this fails fast with the + # "macOS only" message, before the user is ever asked to authenticate.
Once it + # exists, everything else runs under try/finally so the child is always closed. + hands = deps.helper() + try: + api_key = state.resolve_api_key() + respond = deps.responder(api_key, opts) + transcripts = deps.transcripts(api_key, opts) + renderer = ControlRenderer(json_mode=json_mode) + with signals.terminate_as_interrupt(): + engine.run_session( + transcripts, + system=prompt.system_prompt(), + respond=respond, + execute=hands.execute, + renderer=renderer, + max_steps=opts.max_steps, + allow_mutate=not opts.dry_run, + ) + finally: + hands.close() + + +def run_control(opts: ControlOptions, state: AppState, /, *, json_mode: bool) -> None: + """Execute one `assembly control` invocation from already-parsed flags.""" + _run_control(opts, state, json_mode=json_mode, deps=_DEFAULT_DEPS) diff --git a/aai_cli/control/__init__.py b/aai_cli/control/__init__.py new file mode 100644 index 00000000..2d843021 --- /dev/null +++ b/aai_cli/control/__init__.py @@ -0,0 +1,20 @@ +"""Voice-controlled computer use: `assembly control`. + +A local agent loop that turns spoken instructions into real macOS UI actions — +the "voice-in, hands-on-the-machine" tool that a browser/web service can't be, +because it drives the actual desktop (keystrokes, clicks, app focus) through a +native Swift helper. + +The slice is split so every external leg is an injectable seam and the loop +itself is pure: + +- `actions` — the action vocabulary the helper understands (pure data). +- `tools` — those actions as OpenAI function-calling tool definitions. +- `prompt` — the system prompt that briefs the model on the loop. +- `engine` — the observe/act loop over a transcript stream (no I/O of its own). +- `bridge` — adapts the LLM Gateway into the engine's `Responder` seam. +- `helper` — spawns and talks JSON to the native `macos_ui_control.swift` helper. +- `listen` — adapts mic Streaming STT into a stream of finalized utterances. 
+""" + +from __future__ import annotations diff --git a/aai_cli/control/actions.py b/aai_cli/control/actions.py new file mode 100644 index 00000000..74cb1237 --- /dev/null +++ b/aai_cli/control/actions.py @@ -0,0 +1,72 @@ +"""The action protocol: the vocabulary the LLM "brain" uses to drive the macOS +"hands" helper. + +An :class:`Action` is one tool call the model emitted — a name plus JSON +arguments. :func:`validate` checks the name is known and the required arguments +are present, turning a raw model tool call into a request the Swift helper +understands. Everything here is pure data, so the engine is exercised without a +model, a microphone, or macOS. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +# Action name -> the argument names it requires. The Swift helper understands +# exactly these actions; a tool call for any other name is rejected back to the +# model and never executed (see :func:`validate`). +ACTION_SPECS: dict[str, tuple[str, ...]] = { + "type_text": ("text",), + "key_combo": ("keys",), + "click": (), + "launch_app": ("name",), + "focus_app": ("name",), + "get_ui_tree": (), + "screenshot": (), +} + +# Actions that only read the screen and never change UI state. `--dry-run` +# executes these for real (so the model can still "see") but refuses every +# other, UI-mutating action. +OBSERVE_ACTIONS = frozenset({"get_ui_tree", "screenshot"}) + + +class InvalidAction(Exception): + """A model tool call that names an unknown action or omits a required argument. + + Surfaced back to the model as a failed tool result rather than crashing the + session — the model can correct itself on the next step. 
+ """ + + +@dataclass(frozen=True) +class Action: + """One validated UI action: a known name plus its JSON arguments.""" + + name: str + arguments: dict[str, object] + + def is_observe(self) -> bool: + """True for read-only actions (screen observation), which `--dry-run` allows.""" + return self.name in OBSERVE_ACTIONS + + def request(self) -> dict[str, object]: + """JSON request for the Swift helper; an "action" argument never overrides the name.""" + return {**self.arguments, "action": self.name} + + +def validate(name: str, arguments: dict[str, object]) -> Action: + """Turn a model's tool call into an :class:`Action`, or raise :class:`InvalidAction`. + + Rejects an unknown action name and any call missing a required argument, so the + helper is only ever handed a request it can execute. + """ + required = ACTION_SPECS.get(name) + if required is None: + raise InvalidAction(f"Unknown action {name!r}.") + missing = [arg for arg in required if arg not in arguments] + if missing: + raise InvalidAction( + f"Action {name!r} is missing required argument(s): {', '.join(missing)}." + ) + return Action(name=name, arguments=arguments) diff --git a/aai_cli/control/bridge.py b/aai_cli/control/bridge.py new file mode 100644 index 00000000..421fb096 --- /dev/null +++ b/aai_cli/control/bridge.py @@ -0,0 +1,87 @@ +"""Adapt the LLM Gateway into the engine's :data:`~aai_cli.control.engine.Responder`. + +The gateway is OpenAI-compatible, so one chat-completions call with the control +``tools`` is a single model turn. This converts the SDK response into the +engine's plain :class:`~aai_cli.control.engine.Reply` — parsing each tool call's +JSON arguments — so the loop never touches the OpenAI types. The underlying +:func:`aai_cli.core.llm.complete` is injected so the adapter is unit-tested +against a fake completer with no network.
+""" + +from __future__ import annotations + +import json +from collections.abc import Callable +from typing import TYPE_CHECKING + +from aai_cli.control import engine, tools +from aai_cli.control.engine import Reply, ToolCall +from aai_cli.core import jsonshape, llm + +if TYPE_CHECKING: + from openai.types.chat import ChatCompletion + + from aai_cli.control.engine import Message + +# The completer seam: same shape as ``llm.complete``'s keyword call below. +type Completer = Callable[..., ChatCompletion] + + +def _parse_arguments(raw: str | None) -> dict[str, object]: + """Parse a tool call's JSON ``arguments`` string into a dict. + + A model occasionally emits empty or malformed arguments; treat those as no + arguments so validation (not a JSON crash) reports the real problem. + """ + if not raw: + return {} + try: + parsed = json.loads(raw) + except json.JSONDecodeError: + return {} + return jsonshape.as_mapping(parsed) or {} + + +def _reply_of(response: ChatCompletion) -> Reply: + """Convert a chat-completions response into the engine's :class:`Reply`.""" + message = response.choices[0].message + calls: list[ToolCall] = [] + for call in message.tool_calls or []: + # The SDK union also allows a custom (non-function) tool call; we only ask + # the model for function tools, so narrow to those on the type discriminant. + if call.type != "function": + continue + calls.append( + ToolCall( + id=call.id, + name=call.function.name, + arguments=_parse_arguments(call.function.arguments), + ) + ) + return Reply(content=message.content or "", tool_calls=tuple(calls)) + + +def build_responder( + api_key: str, + *, + model: str, + max_tokens: int, + complete: Completer = llm.complete, +) -> engine.Responder: + """A :data:`Responder` that runs one gateway turn with the control tools. + + The tools and ``tool_choice`` ride in ``extra`` (merged into the request + body), since the gateway accepts the OpenAI tool-calling fields. 
+ """ + + def respond(messages: list[Message]) -> Reply: + response = complete( + api_key, + model=model, + messages=messages, + max_tokens=max_tokens, + extra={"tools": tools.tool_definitions(), "tool_choice": "auto"}, + ) + return _reply_of(response) + + return respond diff --git a/aai_cli/control/engine.py b/aai_cli/control/engine.py new file mode 100644 index 00000000..4c8aa660 --- /dev/null +++ b/aai_cli/control/engine.py @@ -0,0 +1,191 @@ +"""The observe/act loop — the pure heart of `assembly control`. + +Given a stream of spoken instructions and three injected seams — a +:data:`Responder` (one LLM turn), an :data:`Executor` (run one action on the +host), and a :class:`Renderer` (surface progress) — the engine runs the +computer-use loop and owns no I/O of its own. That keeps it exercisable with +fakes: no model, microphone, subprocess, or macOS required. + +Per spoken utterance it appends a user message, then loops: ask the model, +execute any tool calls it returns (feeding each result back as a tool message), +and stop when the model replies with no further tool call (its spoken answer) or +the per-turn step budget is exhausted. +""" + +from __future__ import annotations + +import json +from collections.abc import Callable, Iterable +from dataclasses import dataclass +from typing import TYPE_CHECKING, Protocol + +from aai_cli.control import actions +from aai_cli.control.actions import Action, InvalidAction + +if TYPE_CHECKING: + from openai.types.chat import ChatCompletionMessageParam + +# A chat message in OpenAI shape (role/content plus tool fields). The gateway is +# OpenAI-compatible, so messages are built as the SDK's param type via dict +# literals — that keeps `bridge.complete` type-clean with no cast. The type only +# matters to the checker (lazy alias + TYPE_CHECKING import), so there is no +# runtime dependency on the OpenAI SDK here. 
+type Message = ChatCompletionMessageParam + + +@dataclass(frozen=True) +class ToolCall: + """One tool call the model emitted: its id, the action name, and parsed arguments.""" + + id: str + name: str + arguments: dict[str, object] + + +@dataclass(frozen=True) +class Reply: + """A single model turn: spoken content plus any tool calls to run first.""" + + content: str + tool_calls: tuple[ToolCall, ...] + + +# One LLM turn: given the running message list, return the model's reply. +type Responder = Callable[[list[Message]], Reply] +# Execute one action on the host and return the helper's JSON result. +type Executor = Callable[[Action], dict[str, object]] + + +class Renderer(Protocol): + """How the engine surfaces progress (printing, a TUI, JSON events…).""" + + def on_user(self, text: str) -> None: + """A finalized spoken instruction was heard.""" + + def on_action(self, action: Action) -> None: + """An action is about to run on the host.""" + + def on_result(self, action: Action, result: dict[str, object]) -> None: + """An action finished, with the helper's result.""" + + def on_refused(self, action: Action, reason: str) -> None: + """A UI-mutating action was refused (e.g. `--dry-run`).""" + + def on_invalid(self, reason: str) -> None: + """The model called an unknown/under-specified tool.""" + + def on_reply(self, text: str) -> None: + """The model's spoken reply that ends a turn.""" + + +# Shown (as the turn's spoken reply) when a turn hits its step budget without +# the model settling on an answer — so a runaway loop ends with feedback. +STEP_LIMIT_REPLY = "I took several steps without finishing; let me know how to continue." 
+ + +def _assistant_message(reply: Reply) -> Message: + """The assistant message to append for ``reply`` (OpenAI tool-call shape).""" + if reply.tool_calls: + return { + "role": "assistant", + "content": reply.content or None, + "tool_calls": [ + { + "id": call.id, + "type": "function", + "function": {"name": call.name, "arguments": json.dumps(call.arguments)}, + } + for call in reply.tool_calls + ], + } + return {"role": "assistant", "content": reply.content or None} + + +def _tool_message(call_id: str, result: dict[str, object]) -> Message: + """The tool-result message to append for a finished (or rejected) tool call.""" + return {"role": "tool", "tool_call_id": call_id, "content": json.dumps(result)} + + +def _dispatch( + call: ToolCall, + *, + execute: Executor, + renderer: Renderer, + allow_mutate: bool, +) -> dict[str, object]: + """Validate, gate, and (if allowed) run one tool call; return the JSON result. + + A bad call or a `--dry-run`-refused mutating action returns an ``ok: False`` + result instead of executing — the model sees the failure and can adapt. + """ + try: + action = actions.validate(call.name, call.arguments) + except InvalidAction as exc: + renderer.on_invalid(str(exc)) + return {"ok": False, "error": str(exc)} + if not allow_mutate and not action.is_observe(): + reason = "dry-run is on: refused to perform a UI-changing action" + renderer.on_refused(action, reason) + return {"ok": False, "error": reason} + renderer.on_action(action) + result = execute(action) + renderer.on_result(action, result) + return result + + +def run_turn( + user_text: str, + history: list[Message], + *, + respond: Responder, + execute: Executor, + renderer: Renderer, + max_steps: int, + allow_mutate: bool, +) -> list[Message]: + """Drive one spoken instruction to completion; return the extended history. + + Loops model→tools→model up to ``max_steps`` times, ending when the model + replies with no tool calls (its spoken answer) or the budget is hit. 
+ """ + renderer.on_user(user_text) + messages: list[Message] = [*history, {"role": "user", "content": user_text}] + for _ in range(max_steps): + reply = respond(messages) + messages.append(_assistant_message(reply)) + if not reply.tool_calls: + renderer.on_reply(reply.content) + return messages + for call in reply.tool_calls: + result = _dispatch(call, execute=execute, renderer=renderer, allow_mutate=allow_mutate) + messages.append(_tool_message(call.id, result)) + renderer.on_reply(STEP_LIMIT_REPLY) + return messages + + +def run_session( + transcripts: Iterable[str], + *, + system: str, + respond: Responder, + execute: Executor, + renderer: Renderer, + max_steps: int, + allow_mutate: bool, +) -> None: + """Run the control loop over a stream of spoken instructions until it ends. + + History (including the system prompt) carries across turns, so a follow-up + like "click it" resolves against what was just observed. + """ + history: list[Message] = [{"role": "system", "content": system}] + for user_text in transcripts: + history = run_turn( + user_text, + history, + respond=respond, + execute=execute, + renderer=renderer, + max_steps=max_steps, + allow_mutate=allow_mutate, + ) diff --git a/aai_cli/control/helper.py b/aai_cli/control/helper.py new file mode 100644 index 00000000..1895a410 --- /dev/null +++ b/aai_cli/control/helper.py @@ -0,0 +1,191 @@ +"""Talk to the native macOS UI-control helper over a JSON-lines pipe. + +The "hands" of the agent are a tiny bundled Swift program +(``macos_ui_control.swift``) that owns the native APIs Python can't reach — +``CGEvent`` for synthetic keystrokes/clicks, the Accessibility API for reading +the focused app's element tree, ``NSWorkspace`` for launching/activating apps. +:class:`UiHelper` compiles it once, runs it as a long-lived child, and exchanges +one JSON request/response line per :class:`~aai_cli.control.actions.Action` — the +same stdout-pipe pattern as the streaming system-audio helper. 
+ +The process factory is injected, so the request/response logic is unit-tested +with in-memory pipes and never spawns anything. +""" + +from __future__ import annotations + +import contextlib +import hashlib +import json +import shutil +import subprocess +import sys +from collections.abc import Callable, Sequence +from importlib import resources +from pathlib import Path +from typing import IO, Protocol + +from platformdirs import user_cache_path + +from aai_cli.control.actions import Action +from aai_cli.core import jsonshape +from aai_cli.core.errors import APIError, CLIError + +_HELPER_RESOURCE = "macos_ui_control.swift" +_CACHE_DIR = "macos-ui-control" +_HELPER_PREFIX = "aai-macos-ui-control" +# Frameworks the helper links: synthetic input + window list (CoreGraphics), +# app launch/activation (AppKit), the Accessibility element tree (ApplicationServices). +_FRAMEWORKS = ("AppKit", "CoreGraphics", "ApplicationServices") + + +class _HelperProcess(Protocol): + @property + def stdin(self) -> IO[str] | None: + """The helper's JSON request pipe.""" + + @property + def stdout(self) -> IO[str] | None: + """The helper's JSON response pipe.""" + + def poll(self) -> int | None: + """Non-blocking exit-code check.""" + + def terminate(self) -> None: + """Ask the helper to exit.""" + + def wait(self, timeout: float | None = None) -> int | None: + """Block until the helper exits.""" + + +def _unsupported_platform() -> CLIError: + return CLIError( + "Voice computer-control is only available on macOS.", + error_type="control_unavailable", + exit_code=2, + ) + + +def _missing_swiftc() -> CLIError: + return CLIError( + "Voice computer-control needs Apple's Swift compiler.", + error_type="control_unavailable", + exit_code=2, + suggestion="Install Xcode Command Line Tools: xcode-select --install", + ) + + +def _is_macos() -> bool: + return sys.platform == "darwin" + + +def _resource_bytes() -> bytes: + return 
resources.files("aai_cli.control").joinpath(_HELPER_RESOURCE).read_bytes() + + +def build_helper() -> Path: + """Compile the bundled UI-control helper once and return its executable path.""" + if not _is_macos(): + raise _unsupported_platform() + swiftc = shutil.which("swiftc") + if swiftc is None: + raise _missing_swiftc() + + source = _resource_bytes() + digest = hashlib.sha256(source).hexdigest()[:16] + cache_dir = user_cache_path("aai-cli") / _CACHE_DIR + helper = cache_dir / f"{_HELPER_PREFIX}-{digest}" + if helper.exists(): + return helper + + cache_dir.mkdir(parents=True, exist_ok=True) + source_path = cache_dir / f"{_HELPER_PREFIX}-{digest}.swift" + source_path.write_bytes(source) + tmp_helper = helper.with_suffix(".tmp") + frameworks = [arg for framework in _FRAMEWORKS for arg in ("-framework", framework)] + result = subprocess.run( + [swiftc, "-parse-as-library", str(source_path), "-O", *frameworks, "-o", str(tmp_helper)], + capture_output=True, + text=True, + check=False, + ) + if result.returncode != 0: + detail = (result.stderr or result.stdout).strip() + raise CLIError( + "Could not build the macOS UI-control helper.", + error_type="control_unavailable", + exit_code=2, + suggestion=detail or "Install Xcode Command Line Tools: xcode-select --install", + ) + tmp_helper.replace(helper) + return helper + + +def _open_process(command: Sequence[str]) -> _HelperProcess: + return subprocess.Popen( + command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + text=True, + bufsize=1, + ) + + +class UiHelper: + """A long-lived UI-control helper process, addressed one action at a time.""" + + def __init__( + self, + *, + helper: Path | None = None, + popen: Callable[[Sequence[str]], _HelperProcess] = _open_process, + ) -> None: + self._helper = helper or build_helper() + self._popen = popen + self._proc: _HelperProcess | None = None + + def _streams(self) -> tuple[IO[str], IO[str]]: + """Spawn the helper on first use and return its 
(stdin, stdout) pipes.""" + if self._proc is None: + self._proc = self._popen([str(self._helper)]) + stdin, stdout = self._proc.stdin, self._proc.stdout + if stdin is None or stdout is None: + raise APIError("The UI-control helper did not expose its IO streams.") + return stdin, stdout + + def execute(self, action: Action) -> dict[str, object]: + """Send one action and return the helper's JSON result. + + Matches the engine's ``Executor`` seam: a closed pipe or a non-JSON line + becomes an :class:`APIError` so the session fails cleanly rather than + hanging or dumping a traceback. + """ + stdin, stdout = self._streams() + try: + stdin.write(json.dumps(action.request()) + "\n") + stdin.flush() + except OSError as exc: + raise APIError(f"The UI-control helper stopped accepting input: {exc}") from exc + line = stdout.readline() + if not line: + raise APIError("The UI-control helper closed without responding.") + try: + parsed = json.loads(line) + except json.JSONDecodeError as exc: + raise APIError("The UI-control helper returned a non-JSON line.") from exc + mapping = jsonshape.as_mapping(parsed) + if mapping is None: + return {"ok": False, "error": "helper returned a non-object result"} + return mapping + + def close(self) -> None: + """Terminate the helper if it is running.""" + if self._proc is None: + return + if self._proc.poll() is None: + self._proc.terminate() + with contextlib.suppress(Exception): + # The 2s grace before giving up is not observable from a test. + self._proc.wait(timeout=2.0) # pragma: no mutate + self._proc = None diff --git a/aai_cli/control/listen.py b/aai_cli/control/listen.py new file mode 100644 index 00000000..e803ac29 --- /dev/null +++ b/aai_cli/control/listen.py @@ -0,0 +1,94 @@ +"""Turn the microphone into a stream of finalized spoken instructions. + +The engine consumes an ``Iterable[str]`` of utterances; this adapts mic +Streaming STT into exactly that. 
The blocking stream runs on a worker thread and +pushes each finalized turn onto a queue that the generator drains, so a turn the +agent is still acting on doesn't drop the next thing you say. The stream call and +the microphone are injected, so the queue/threading logic is unit-tested with a +fake stream that just invokes the turn callback — no real audio, socket, or +``websockets`` thread. +""" + +from __future__ import annotations + +import queue +import threading +from collections.abc import Callable, Iterable, Iterator + +from assemblyai.streaming.v3 import StreamingParameters + +from aai_cli.core import client +from aai_cli.core.errors import CLIError +from aai_cli.core.microphone import MicrophoneSource +from aai_cli.streaming.sources import TARGET_RATE + +# Run one streaming session: same shape as ``client.stream_audio``'s keyword call. +type StreamRunner = Callable[..., None] +# Build the mic byte source for a device/rate. +type MicFactory = Callable[..., Iterable[bytes]] + + +def _finalized_text(event: object) -> str | None: + """The spoken text of a finalized, non-empty turn, or None for a partial/empty one.""" + if not getattr(event, "end_of_turn", False): + return None + text = getattr(event, "transcript", "") or "" + return text or None + + +def _build_mic(device: int | None, sample_rate: int | None, mic_factory: MicFactory) -> object: + """Construct the mic source (kept tiny so the default factory stays substitutable).""" + return mic_factory(device=device, capture_rate=sample_rate) + + +def listen( + api_key: str, + *, + device: int | None = None, + sample_rate: int | None = None, + stream: StreamRunner = client.stream_audio, + mic_factory: MicFactory = MicrophoneSource, +) -> Iterator[str]: + """Yield finalized spoken utterances from the mic until the stream ends. + + Surfaces a streaming error (raised on the worker) to the caller after the + queue drains, so a connection failure isn't silently swallowed. 
+ """ + utterances: queue.Queue[str | None] = queue.Queue() + failure: list[CLIError] = [] + + def on_turn(event: object) -> None: + text = _finalized_text(event) + if text is not None: + utterances.put(text) + + def worker() -> None: + try: + mic = _build_mic(device, sample_rate, mic_factory) + rate = getattr(mic, "sample_rate", None) + sample = rate if isinstance(rate, int) else (sample_rate or TARGET_RATE) + params = StreamingParameters(sample_rate=sample, format_turns=True) + stream(api_key, mic, params=params, on_turn=on_turn) + except CLIError as exc: + # The streaming legs raise CLIError/APIError; capture it and re-raise on + # the main thread so a connection failure isn't lost on the worker. + failure.append(exc) + finally: + utterances.put(None) + + # daemon=True is an interpreter-exit safety net (a wedged mic worker can't block + # shutdown); not observable from a test, which always drains to the sentinel. + thread = threading.Thread( + target=worker, + name="aai-control-listen", + daemon=True, # pragma: no mutate + ) + thread.start() + while True: + item = utterances.get() + if item is None: + break + yield item + thread.join() + if failure: + raise failure[0] diff --git a/aai_cli/control/macos_ui_control.swift b/aai_cli/control/macos_ui_control.swift new file mode 100644 index 00000000..3e0225e0 --- /dev/null +++ b/aai_cli/control/macos_ui_control.swift @@ -0,0 +1,355 @@ +import AppKit +import ApplicationServices +import CoreGraphics +import Foundation + +// A tiny JSON-lines UI-control helper: read one request object per stdin line, +// perform the action with native macOS APIs (CGEvent for synthetic input, the +// Accessibility API for the element tree, NSWorkspace for app launch/focus), and +// write one JSON result line per request. Python (aai_cli/control/helper.py) owns +// the lifecycle and speaks this protocol; see that module for the request shape. 
// One request line: the action name plus every argument any action may carry
// (all optional; each handler reads the ones it needs). Decoding ignores extra
// keys, so the protocol can grow additively.
struct Request: Decodable {
    // Action name that `handle` switches on (e.g. "type_text", "click").
    let action: String
    // Literal text for "type_text".
    let text: String?
    // Modifier/key names for "key_combo".
    let keys: [String]?
    // Application name for "launch_app" and "focus_app".
    let name: String?
    // Element id (as handed out by get_ui_tree) for "click".
    let element: String?
    // Raw screen coordinates for "click" when no element id is given; `click`
    // only uses them when both are present.
    let x: Int?
    let y: Int?
}
// Types `text` into whatever currently has keyboard focus by posting synthetic
// key-down/key-up pairs whose payload is the text itself (the virtual key code
// is only a placeholder).
//
// Returns a success response once every chunk has been posted, or a failure
// response if a keyboard event could not be created. A failure part-way through
// leaves the already-posted chunks typed.
func typeText(_ text: String) -> Response {
    let units = Array(text.utf16)
    // Nothing to type: posting an event with no Unicode payload would at best be
    // a spurious press of the placeholder virtual key 0 (the "a" key).
    if units.isEmpty {
        return succeeded()
    }
    let source = CGEventSource(stateID: .combinedSessionState)
    // NOTE(review): a single keyboard event is widely reported to carry only about
    // 20 UTF-16 units, with longer payloads truncated; confirm against the
    // CGEvent documentation. Sending bounded chunks is safe either way.
    let maxUnitsPerEvent = 20
    var start = 0
    while start < units.count {
        var end = min(start + maxUnitsPerEvent, units.count)
        // Never cut between the two halves of a surrogate pair, which would post
        // two unpaired (invalid) code units in separate events.
        if end < units.count && UTF16.isLeadSurrogate(units[end - 1]) {
            end -= 1
        }
        guard
            let down = CGEvent(keyboardEventSource: source, virtualKey: 0, keyDown: true),
            let up = CGEvent(keyboardEventSource: source, virtualKey: 0, keyDown: false)
        else {
            return failure("could not create keyboard event")
        }
        let chunk = Array(units[start..<end])
        chunk.withUnsafeBufferPointer { buffer in
            if let base = buffer.baseAddress {
                down.keyboardSetUnicodeString(stringLength: buffer.count, unicodeString: base)
                up.keyboardSetUnicodeString(stringLength: buffer.count, unicodeString: base)
            }
        }
        down.post(tap: .cghidEventTap)
        up.post(tap: .cghidEventTap)
        start = end
    }
    return succeeded()
}
// Reads an element's on-screen rectangle from its position and size
// accessibility attributes. Returns nil when either attribute is missing, is
// not an AXValue, or cannot be unpacked into a point / size.
func elementFrame(_ element: AXUIElement) -> CGRect? {
    guard let rawOrigin = copyAttribute(element, kAXPositionAttribute as String) else {
        return nil
    }
    guard let rawExtent = copyAttribute(element, kAXSizeAttribute as String) else {
        return nil
    }
    // Both attributes must really be AXValue boxes before the forced casts below.
    let axValueType = AXValueGetTypeID()
    if CFGetTypeID(rawOrigin) != axValueType || CFGetTypeID(rawExtent) != axValueType {
        return nil
    }
    var origin = CGPoint.zero
    guard AXValueGetValue(rawOrigin as! AXValue, .cgPoint, &origin) else {
        return nil
    }
    var extent = CGSize.zero
    guard AXValueGetValue(rawExtent as! AXValue, .cgSize, &extent) else {
        return nil
    }
    return CGRect(origin: origin, size: extent)
}
// Clicks either a registered accessibility element (by id) or a raw screen
// point. For an element, the accessibility press action is tried first and a
// synthetic mouse click at the element's center is the fallback.
func click(_ request: Request) -> Response {
    guard let identifier = request.element else {
        // No element id: both coordinates are required.
        guard let x = request.x, let y = request.y else {
            return failure("click needs an element id or x/y coordinates")
        }
        return clickAt(x: CGFloat(x), y: CGFloat(y))
    }
    guard let target = elementRegistry[identifier] else {
        return failure("unknown element id \(identifier); call get_ui_tree first")
    }
    let pressStatus = AXUIElementPerformAction(target, kAXPressAction as CFString)
    if pressStatus == .success {
        return succeeded()
    }
    // The press action failed; fall back to clicking the middle of its frame.
    if let frame = elementFrame(target) {
        return clickAt(x: frame.midX, y: frame.midY)
    }
    return failure("could not locate element \(identifier)")
}
// Dispatches one decoded request to the function that implements its action.
// An action whose required argument is absent, or an action name that is not
// recognized, yields a failure response instead of running anything.
func handle(_ request: Request) -> Response {
    switch request.action {
    case "type_text":
        return request.text.map(typeText) ?? failure("type_text needs 'text'")
    case "key_combo":
        return request.keys.map(keyCombo) ?? failure("key_combo needs 'keys'")
    case "click":
        return click(request)
    case "launch_app":
        return request.name.map(launchApp) ?? failure("launch_app needs 'name'")
    case "focus_app":
        return request.name.map(focusApp) ?? failure("focus_app needs 'name'")
    case "get_ui_tree":
        return buildTree()
    case "screenshot":
        return screenshot()
    default:
        return failure("unknown action: \(request.action)")
    }
}
def system_prompt() -> str:
    """Return the control agent's system prompt.

    The text is the module-level ``_SYSTEM`` constant, returned unchanged.
    """
    return _SYSTEM
class ControlRenderer:
    """Render engine events for one ``assembly control`` session.

    With ``json_mode`` on, every event becomes one NDJSON record tagged with a
    ``type`` field. Otherwise progress lines go to the error console and only
    the agent's reply goes to the main console.
    """

    def __init__(self, *, json_mode: bool) -> None:
        self._json = json_mode

    def _event(self, event_type: str, **fields: object) -> None:
        record = {"type": event_type, **fields}
        output.emit_ndjson(record)

    def on_user(self, text: str) -> None:
        """Report a finalized spoken instruction."""
        if not self._json:
            output.error_console.print(output.muted(f"you: {text}"))
            return
        self._event("user", text=text)

    def on_action(self, action: Action) -> None:
        """Report an action that is about to run on the host."""
        if not self._json:
            output.error_console.print(output.muted(f"→ {_describe(action)}"))
            return
        self._event("action", action=action.name, arguments=action.arguments)

    def on_result(self, action: Action, result: dict[str, object]) -> None:
        """Report a finished action; human mode only prints explicit failures."""
        if self._json:
            self._event("result", action=action.name, result=result)
            return
        if result.get("ok") is not False:
            return
        output.error_console.print(output.warn(f" {result.get('error', 'failed')}"))

    def on_refused(self, action: Action, reason: str) -> None:
        """Report a UI-mutating action that was refused (e.g. ``--dry-run``)."""
        if not self._json:
            output.error_console.print(output.warn(f"refused {action.name}: {reason}"))
            return
        self._event("refused", action=action.name, reason=reason)

    def on_invalid(self, reason: str) -> None:
        """Report an unknown or under-specified tool call from the model."""
        if not self._json:
            output.error_console.print(output.warn(reason))
            return
        self._event("invalid", reason=reason)

    def on_reply(self, text: str) -> None:
        """Report the model's spoken reply that ends a turn."""
        if not self._json:
            output.console.print(text)
            return
        self._event("reply", text=text)
def _function_schema(name: str) -> dict[str, object]:
    """Build the ``function`` tool entry for the action called ``name``.

    The argument shapes come from ``_PROPERTIES``, the description from
    ``_DESCRIPTIONS``, and the required-argument list from
    ``actions.ACTION_SPECS``; extra arguments are disallowed by the schema.
    """
    argument_shapes = _PROPERTIES[name]
    summary = _DESCRIPTIONS[name]
    parameters: dict[str, object] = {
        "type": "object",
        "properties": argument_shapes,
        "required": list(actions.ACTION_SPECS[name]),
        "additionalProperties": False,
    }
    function: dict[str, object] = {
        "name": name,
        "description": summary,
        "parameters": parameters,
    }
    return {"type": "function", "function": function}
[_function_schema(name) for name in tool_names()] diff --git a/pyproject.toml b/pyproject.toml index 99d29287..3cce4f0b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -163,6 +163,7 @@ artifacts = [ "aai_cli/init/templates/**", "aai_cli/skills/**", "aai_cli/streaming/macos_system_audio.swift", + "aai_cli/control/macos_ui_control.swift", ] exclude = ["**/__pycache__", "**/*.pyc", "**/AGENTS.md", "**/CLAUDE.md"] @@ -483,6 +484,7 @@ max-statements = 40 "aai_cli/commands/webhooks/_listen.py" = ["TID251"] "aai_cli/init/runner.py" = ["TID251"] "aai_cli/init/tunnel.py" = ["TID251"] +"aai_cli/control/helper.py" = ["TID251"] "aai_cli/streaming/macos.py" = ["TID251"] "aai_cli/streaming/sources.py" = ["TID251"] # Sandbox shell-out: launches the OS sandbox binary (sandbox-exec / bwrap) with controlled diff --git a/tests/__snapshots__/test_snapshots_help_root.ambr b/tests/__snapshots__/test_snapshots_help_root.ambr index 527511f5..aeec15c8 100644 --- a/tests/__snapshots__/test_snapshots_help_root.ambr +++ b/tests/__snapshots__/test_snapshots_help_root.ambr @@ -49,6 +49,8 @@ │ back │ │ agent Hold a live two-way voice conversation with the Voice Agent API │ │ live [sandbox] Talk live to a tool-using voice agent │ + │ control Drive your Mac hands-free: speak an instruction, an agent acts │ + │ on the UI │ │ speak [sandbox] Synthesize speech from text with AssemblyAI streaming │ │ TTS │ │ llm Send a prompt to AssemblyAI's LLM Gateway and print the reply │ diff --git a/tests/__snapshots__/test_snapshots_help_run.ambr b/tests/__snapshots__/test_snapshots_help_run.ambr index be0f68e3..82aa4d1a 100644 --- a/tests/__snapshots__/test_snapshots_help_run.ambr +++ b/tests/__snapshots__/test_snapshots_help_run.ambr @@ -262,6 +262,58 @@ + ''' +# --- +# name: test_command_help_matches_snapshot[control] + ''' + + Usage: assembly control [OPTIONS] + + Drive your Mac hands-free: speak an instruction, an agent acts on the UI + + Each spoken instruction is transcribed with Streaming STT and 
handed to an + LLM agent that decides which UI actions to take — typing, key chords, + clicking accessibility elements, launching apps — and performs them through a + bundled native macOS helper, then speaks back a short confirmation. + + macOS only: the helper needs Apple's Swift compiler and the Accessibility + + Microphone permissions granted to your terminal. Use --dry-run to watch the + agent plan without it touching anything. + + ╭─ Options ────────────────────────────────────────────────────────────────────╮ + │ --dry-run Plan and observe only: refuse every UI-changing action │ + │ --json -j Emit newline-delimited JSON events │ + │ --help Show this message and exit. │ + ╰──────────────────────────────────────────────────────────────────────────────╯ + ╭─ Audio Capture ──────────────────────────────────────────────────────────────╮ + │ --device INTEGER Microphone device index │ + │ --sample-rate INTEGER RANGE [x>=1] Microphone capture rate in Hz │ + │ (default: device native) │ + ╰──────────────────────────────────────────────────────────────────────────────╯ + ╭─ LLM Transform ──────────────────────────────────────────────────────────────╮ + │ --model TEXT LLM Gateway model that decides the │ + │ actions │ + │ [default: │ + │ claude-haiku-4-5-20251001] │ + │ --max-tokens INTEGER RANGE [x>=1] Max tokens per agent step │ + │ [default: 8192] │ + │ --max-steps INTEGER RANGE [x>=1] Max action steps the agent may │ + │ take per spoken instruction │ + │ [default: 10] │ + ╰──────────────────────────────────────────────────────────────────────────────╯ + + Examples + Control your Mac hands-free by voice + $ assembly control + Preview actions without touching the UI + $ assembly control --dry-run + Use a more capable model for the agent + $ assembly control --model claude-opus-4-7 + Emit the loop as newline-delimited JSON + $ assembly control --json + + + ''' # --- # name: test_command_help_matches_snapshot[dictate] diff --git a/tests/_control_helpers.py 
class RecordingRenderer:
    """A renderer stand-in that stores every event it receives, in order."""

    def __init__(self) -> None:
        # One list per event kind; two-argument events are stored as tuples.
        self.users: list[str] = []
        self.actions: list[Action] = []
        self.results: list[tuple[Action, dict[str, object]]] = []
        self.refused: list[tuple[Action, str]] = []
        self.invalid: list[str] = []
        self.replies: list[str] = []

    def on_user(self, text: str) -> None:
        """Remember a heard instruction."""
        self.users.append(text)

    def on_action(self, action: Action) -> None:
        """Remember an action about to run."""
        self.actions.append(action)

    def on_result(self, action: Action, result: dict[str, object]) -> None:
        """Remember an action together with its result."""
        outcome = (action, result)
        self.results.append(outcome)

    def on_refused(self, action: Action, reason: str) -> None:
        """Remember a refused action together with the reason."""
        refusal = (action, reason)
        self.refused.append(refusal)

    def on_invalid(self, reason: str) -> None:
        """Remember the reason an invalid tool call was rejected."""
        self.invalid.append(reason)

    def on_reply(self, text: str) -> None:
        """Remember a spoken reply."""
        self.replies.append(text)
def last_json(out: str) -> dict[str, object]:
    """Decode the final line of ``out`` as a JSON object and return it.

    Surrounding whitespace is stripped first, so a trailing newline does not
    count as a line.
    """
    lines = out.strip().splitlines()
    record = json.loads(lines[-1])
    assert isinstance(record, dict)
    return record
def test_is_observe_only_true_for_read_only_actions():
    """``get_ui_tree`` counts as observe-only; ``type_text`` does not."""
    read_only = actions.validate("get_ui_tree", {})
    mutating = actions.validate("type_text", {"text": "x"})
    assert read_only.is_observe() is True
    assert mutating.is_observe() is False
def test_tool_names_match_executable_actions():
    """The advertised tool names are exactly the action specs, in sorted order."""
    advertised = tools.tool_names()
    executable = actions.ACTION_SPECS
    assert set(advertised) == set(executable)
    assert advertised == tuple(sorted(executable))
def test_run_turn_executes_tool_call_then_speaks_reply():
    """One tool call is executed and rendered, then the final reply is rendered."""
    recorder = RecordingRenderer()
    ran: list[Action] = []
    typed = Action("type_text", {"text": "hi"})

    def execute(action: Action) -> dict[str, object]:
        ran.append(action)
        return {"ok": True}

    # First the model asks for a tool, then it settles with a plain reply.
    tool_step = engine.Reply(
        content="",
        tool_calls=(engine.ToolCall("c1", "type_text", {"text": "hi"}),),
    )
    final_step = engine.Reply(content="typed it", tool_calls=())
    engine.run_turn(
        "type hi",
        [{"role": "system", "content": "s"}],
        respond=scripted([tool_step, final_step]),
        execute=execute,
        renderer=recorder,
        max_steps=5,
        allow_mutate=True,
    )
    assert recorder.users == ["type hi"]
    assert ran == [typed]
    assert recorder.results == [(typed, {"ok": True})]
    assert recorder.replies == ["typed it"]
def test_run_turn_stops_at_step_limit_with_a_spoken_note():
    """A model that keeps calling tools is cut off with ``STEP_LIMIT_REPLY``."""
    recorder = RecordingRenderer()
    # Every scripted reply is another tool call, so the turn never settles on its
    # own and has to run into the step budget.
    endless = engine.Reply(
        content="",
        tool_calls=(engine.ToolCall("c", "get_ui_tree", {}),),
    )
    engine.run_turn(
        "loop",
        [],
        respond=scripted([endless] * 2),
        execute=lambda action: {"ok": True},
        renderer=recorder,
        max_steps=2,
        allow_mutate=True,
    )
    assert recorder.replies == [engine.STEP_LIMIT_REPLY]
Second call also starts with the system prompt + # and carries the first turn forward (history threading). + assert seen[0][0] == {"role": "system", "content": "SYS"} + assert seen[0][-1] == {"role": "user", "content": "first"} + assert seen[1][0] == {"role": "system", "content": "SYS"} + assert any(m.get("content") == "first" for m in seen[1]) + assert seen[1][-1] == {"role": "user", "content": "second"} + + +def test_system_prompt_is_nonempty_spoken_brief(): + assert "tools" in prompt.system_prompt() + + +# --- bridge (LLM Gateway adapter) --------------------------------------------- + + +def test_parse_arguments_handles_valid_empty_and_malformed(): + assert bridge._parse_arguments(json.dumps({"a": 1})) == {"a": 1} + assert bridge._parse_arguments("") == {} + assert bridge._parse_arguments("not json") == {} + assert bridge._parse_arguments("[1, 2]") == {} + + +def test_reply_of_converts_message_and_tool_calls(): + call = SimpleNamespace( + id="t1", + type="function", + function=SimpleNamespace(name="focus_app", arguments=json.dumps({"name": "Safari"})), + ) + reply = bridge._reply_of(fake_completion("hello", [call])) + assert reply.content == "hello" + assert reply.tool_calls == (engine.ToolCall("t1", "focus_app", {"name": "Safari"}),) + + +def test_reply_of_skips_non_function_tool_calls(): + custom = SimpleNamespace(id="t2", type="custom", function=None) + reply = bridge._reply_of(fake_completion("", [custom])) + assert reply.tool_calls == () + + +def test_reply_of_defaults_missing_content_and_tool_calls(): + reply = bridge._reply_of(fake_completion(None, None)) + assert reply.content == "" + assert reply.tool_calls == () + + +def test_build_responder_passes_tools_in_extra_and_returns_reply(): + seen = {} + + def fake_complete(api_key, *, model, messages, max_tokens, extra): + seen.update(api_key=api_key, model=model, max_tokens=max_tokens, extra=extra) + return fake_completion("ok", None) + + respond = bridge.build_responder("k", model="m", max_tokens=7, 
complete=fake_complete) + reply = respond([{"role": "user", "content": "hi"}]) + assert reply == engine.Reply(content="ok", tool_calls=()) + assert seen["api_key"] == "k" + assert seen["model"] == "m" + assert seen["max_tokens"] == 7 + assert seen["extra"]["tool_choice"] == "auto" + assert {t["function"]["name"] for t in seen["extra"]["tools"]} == set(actions.ACTION_SPECS) + + +# --- render ------------------------------------------------------------------- + + +def test_describe_includes_arguments_only_when_present(): + assert render._describe(Action("get_ui_tree", {})) == "get_ui_tree" + assert "Safari" in render._describe(Action("focus_app", {"name": "Safari"})) + + +def test_renderer_json_mode_emits_typed_events(capsys): + r = render.ControlRenderer(json_mode=True) + r.on_user("hello") + assert last_json(capsys.readouterr().out) == {"type": "user", "text": "hello"} + + r.on_action(Action("focus_app", {"name": "Safari"})) + event = last_json(capsys.readouterr().out) + assert event == {"type": "action", "action": "focus_app", "arguments": {"name": "Safari"}} + + r.on_result(Action("focus_app", {"name": "Safari"}), {"ok": True}) + assert last_json(capsys.readouterr().out)["type"] == "result" + + r.on_refused(Action("type_text", {"text": "x"}), "nope") + assert last_json(capsys.readouterr().out) == { + "type": "refused", + "action": "type_text", + "reason": "nope", + } + + r.on_invalid("bad call") + assert last_json(capsys.readouterr().out) == {"type": "invalid", "reason": "bad call"} + + r.on_reply("all set") + assert last_json(capsys.readouterr().out) == {"type": "reply", "text": "all set"} + + +def test_renderer_human_mode_splits_progress_and_reply(capsys): + r = render.ControlRenderer(json_mode=False) + r.on_user("hello") + r.on_action(Action("focus_app", {"name": "Safari"})) + r.on_refused(Action("type_text", {"text": "x"}), "nope") + r.on_invalid("bad call") + r.on_reply("all set") + captured = capsys.readouterr() + err = normalize(captured.err) + out = 
normalize(captured.out) + # Progress narration is on stderr; the spoken reply is the only thing on stdout. + assert "hello" in err + assert "focus_app" in err + assert "nope" in err + assert "bad call" in err + assert out.strip() == "all set" + # The reply line is bare text, not a JSON event (kills the json_mode mutant). + with pytest.raises(json.JSONDecodeError): + json.loads(out.strip()) + + +def test_renderer_human_result_is_quiet_on_success_loud_on_failure(capsys): + r = render.ControlRenderer(json_mode=False) + r.on_result(Action("type_text", {"text": "x"}), {"ok": True}) + assert capsys.readouterr().err.strip() == "" + r.on_result(Action("type_text", {"text": "x"}), {"ok": False, "error": "denied"}) + assert "denied" in normalize(capsys.readouterr().err) diff --git a/tests/test_control_exec.py b/tests/test_control_exec.py new file mode 100644 index 00000000..7a507d1f --- /dev/null +++ b/tests/test_control_exec.py @@ -0,0 +1,362 @@ +"""Tests for `assembly control` wiring: the native-helper transport and build, +the mic listener, and the command/`_run_control` seam. + +All external legs are faked (see tests/_control_helpers.py); the pure loop, +actions, bridge, and rendering are covered by test_control.py. 
+""" + +from __future__ import annotations + +import dataclasses +import io +import json +import sys +from collections.abc import Iterator +from pathlib import Path +from types import SimpleNamespace + +import pytest +from assemblyai.streaming.v3 import StreamingParameters +from typer.testing import CliRunner + +from aai_cli.app.context import AppState +from aai_cli.commands.control import _exec as control_exec +from aai_cli.control import engine, helper, listen +from aai_cli.control.actions import Action +from aai_cli.core import config +from aai_cli.core.errors import APIError, CLIError +from aai_cli.main import app +from tests._control_helpers import ( + OPTS, + BrokenStdin, + FakeMic, + FakeProc, + RecordingHelper, + deps_for, + last_json, + scripted, +) + +# --- helper (native UI process transport) ------------------------------------- + + +def test_build_helper_refuses_non_macos(monkeypatch): + monkeypatch.setattr(helper, "_is_macos", lambda: False) + with pytest.raises(CLIError, match="only available on macOS") as exc: + helper.build_helper() + assert exc.value.exit_code == 2 + + +def test_build_helper_needs_swiftc(monkeypatch): + monkeypatch.setattr(helper, "_is_macos", lambda: True) + monkeypatch.setattr(helper.shutil, "which", lambda _name: None) + with pytest.raises(CLIError, match="Swift compiler") as exc: + helper.build_helper() + assert exc.value.exit_code == 2 + + +def test_execute_round_trips_one_action(): + proc = FakeProc(json.dumps({"ok": True, "elements": []}) + "\n") + hands = helper.UiHelper(helper=Path("/fake/bin"), popen=lambda command: proc) + result = hands.execute(Action("get_ui_tree", {})) + assert result == {"ok": True, "elements": []} + assert isinstance(proc.stdin, io.StringIO) + assert json.loads(proc.stdin.getvalue()) == {"action": "get_ui_tree"} + + +def test_execute_raises_when_helper_closes_silently(): + hands = helper.UiHelper(helper=Path("/fake/bin"), popen=lambda command: FakeProc("")) + with pytest.raises(APIError, 
match="closed without responding"): + hands.execute(Action("get_ui_tree", {})) + + +def test_execute_raises_on_non_json_line(): + hands = helper.UiHelper(helper=Path("/fake/bin"), popen=lambda command: FakeProc("not-json\n")) + with pytest.raises(APIError, match="non-JSON"): + hands.execute(Action("get_ui_tree", {})) + + +def test_execute_treats_non_object_result_as_failure(): + hands = helper.UiHelper(helper=Path("/fake/bin"), popen=lambda command: FakeProc("[1, 2]\n")) + assert hands.execute(Action("get_ui_tree", {})) == { + "ok": False, + "error": "helper returned a non-object result", + } + + +def test_execute_raises_when_write_fails(): + proc = FakeProc("", stdin=BrokenStdin()) + hands = helper.UiHelper(helper=Path("/fake/bin"), popen=lambda command: proc) + with pytest.raises(APIError, match="stopped accepting input"): + hands.execute(Action("get_ui_tree", {})) + + +def test_execute_raises_when_streams_missing(): + proc = FakeProc("", stdin=None) + proc.stdin = None + hands = helper.UiHelper(helper=Path("/fake/bin"), popen=lambda command: proc) + with pytest.raises(APIError, match="did not expose"): + hands.execute(Action("get_ui_tree", {})) + + +def test_close_terminates_a_running_helper(): + proc = FakeProc(json.dumps({"ok": True}) + "\n") + hands = helper.UiHelper(helper=Path("/fake/bin"), popen=lambda command: proc) + hands.execute(Action("screenshot", {})) + hands.close() + assert proc.terminated is True + hands.close() # idempotent: nothing to do the second time + + +# --- helper build + spawn (macOS-only paths, mocked) -------------------------- + + +def test_platform_and_resource_probes(): + # Compares against the live platform so the == (not !=) is pinned on any OS. 
+ assert helper._is_macos() == (sys.platform == "darwin") + assert helper._resource_bytes().startswith(b"import AppKit") + + +def test_build_helper_compiles_and_caches(monkeypatch, tmp_path): + monkeypatch.setattr(helper, "_is_macos", lambda: True) + monkeypatch.setattr(helper.shutil, "which", lambda _tool: "/usr/bin/swiftc") + monkeypatch.setattr(helper, "_resource_bytes", lambda: b"swift source") + monkeypatch.setattr(helper, "user_cache_path", lambda _app: tmp_path) + captured_cmd: list[str] = [] + seen_kwargs: dict[str, object] = {} + + def fake_run(cmd, *, capture_output, text, check): + captured_cmd[:] = cmd + seen_kwargs.update(capture_output=capture_output, text=text, check=check) + Path(cmd[-1]).write_bytes(b"binary") + return SimpleNamespace(returncode=0, stderr="", stdout="") + + monkeypatch.setattr(helper.subprocess, "run", fake_run) + built = helper.build_helper() + assert built.read_bytes() == b"binary" + assert "-parse-as-library" in captured_cmd + assert "AppKit" in captured_cmd + # stderr/stdout captured as text; a non-zero compile is inspected, not raised. + assert seen_kwargs["capture_output"] is True + assert seen_kwargs["text"] is True + assert seen_kwargs["check"] is False + + +def _compile_ok(cmd, **_kwargs): + Path(cmd[-1]).write_bytes(b"bin") + return SimpleNamespace(returncode=0, stderr="", stdout="") + + +def test_build_helper_creates_missing_cache_parents(monkeypatch, tmp_path): + # The cache dir's parents may not exist; build_helper must create the whole chain. 
+ nested = tmp_path / "missing1" / "missing2" + monkeypatch.setattr(helper, "_is_macos", lambda: True) + monkeypatch.setattr(helper.shutil, "which", lambda _tool: "/usr/bin/swiftc") + monkeypatch.setattr(helper, "_resource_bytes", lambda: b"swift source") + monkeypatch.setattr(helper, "user_cache_path", lambda _app: nested) + monkeypatch.setattr(helper.subprocess, "run", _compile_ok) + assert helper.build_helper().read_bytes() == b"bin" + + +def test_build_helper_tolerates_existing_cache_dir(monkeypatch, tmp_path): + # A rebuild runs with the cache dir already present, so its mkdir must tolerate it. + monkeypatch.setattr(helper, "_is_macos", lambda: True) + monkeypatch.setattr(helper.shutil, "which", lambda _tool: "/usr/bin/swiftc") + monkeypatch.setattr(helper, "_resource_bytes", lambda: b"swift source") + monkeypatch.setattr(helper, "user_cache_path", lambda _app: tmp_path) + (tmp_path / "macos-ui-control").mkdir(parents=True) # pre-exists + monkeypatch.setattr(helper.subprocess, "run", _compile_ok) + assert helper.build_helper().read_bytes() == b"bin" # must not raise FileExistsError + + +def test_build_helper_reuses_cached_binary(monkeypatch, tmp_path): + source = b"swift source" + digest = helper.hashlib.sha256(source).hexdigest()[:16] + cached = tmp_path / "macos-ui-control" / f"aai-macos-ui-control-{digest}" + cached.parent.mkdir(parents=True) + cached.write_bytes(b"cached") + monkeypatch.setattr(helper, "_is_macos", lambda: True) + monkeypatch.setattr(helper.shutil, "which", lambda _tool: "/usr/bin/swiftc") + monkeypatch.setattr(helper, "_resource_bytes", lambda: source) + monkeypatch.setattr(helper, "user_cache_path", lambda _app: tmp_path) + + def must_not_compile(*_a, **_k): + raise AssertionError("a cached binary must not be recompiled") + + monkeypatch.setattr(helper.subprocess, "run", must_not_compile) + assert helper.build_helper() == cached + + +def test_build_helper_compile_failure_surfaces_detail(monkeypatch, tmp_path): + 
monkeypatch.setattr(helper, "_is_macos", lambda: True) + monkeypatch.setattr(helper.shutil, "which", lambda _tool: "/usr/bin/swiftc") + monkeypatch.setattr(helper, "_resource_bytes", lambda: b"swift source") + monkeypatch.setattr(helper, "user_cache_path", lambda _app: tmp_path) + monkeypatch.setattr( + helper.subprocess, + "run", + lambda *a, **k: SimpleNamespace(returncode=1, stderr="compile broke", stdout=""), + ) + with pytest.raises(CLIError) as exc: + helper.build_helper() + assert exc.value.exit_code == 2 + assert exc.value.suggestion == "compile broke" + + +def test_open_process_wires_json_line_pipes(monkeypatch): + captured_command: list[str] = [] + captured_kwargs: dict[str, object] = {} + + def fake_popen(command, **kwargs): + captured_command[:] = command + captured_kwargs.update(kwargs) + return SimpleNamespace() + + monkeypatch.setattr(helper.subprocess, "Popen", fake_popen) + helper._open_process(["/bin/helper"]) + assert captured_command == ["/bin/helper"] + assert captured_kwargs["text"] is True + assert captured_kwargs["bufsize"] == 1 + assert captured_kwargs["stdin"] == helper.subprocess.PIPE + assert captured_kwargs["stdout"] == helper.subprocess.PIPE + + +# --- listen (mic -> finalized utterances) ------------------------------------- + + +def test_finalized_text_only_returns_finished_nonempty_turns(): + assert listen._finalized_text(SimpleNamespace(end_of_turn=True, transcript="hi")) == "hi" + assert listen._finalized_text(SimpleNamespace(end_of_turn=False, transcript="partial")) is None + assert listen._finalized_text(SimpleNamespace(end_of_turn=True, transcript="")) is None + # No end_of_turn attribute defaults to "not finalized" -> None (not treated as done). 
+ assert listen._finalized_text(SimpleNamespace(transcript="hi")) is None + + +class _BareMic: + """A mic with no sample_rate attribute, to exercise the rate fallback.""" + + def __iter__(self) -> Iterator[bytes]: + return iter(()) + + +def test_listen_yields_finalized_utterances_with_mic_rate(): + seen_params: list[StreamingParameters] = [] + + def fake_stream(api_key, source, *, params, on_turn): + seen_params.append(params) + on_turn(SimpleNamespace(end_of_turn=True, transcript="open safari")) + on_turn(SimpleNamespace(end_of_turn=False, transcript="ignored partial")) + on_turn(SimpleNamespace(end_of_turn=True, transcript="click go")) + + heard = list(listen.listen("k", stream=fake_stream, mic_factory=lambda **_k: FakeMic())) + assert heard == ["open safari", "click go"] + # Turn formatting is requested, and the mic's own rate is declared to the API. + assert seen_params[0].format_turns is True + assert seen_params[0].sample_rate == 16000 + + +def test_listen_falls_back_to_explicit_rate_when_mic_lacks_one(): + seen_params: list[StreamingParameters] = [] + + def fake_stream(api_key, source, *, params, on_turn): + seen_params.append(params) + + list( + listen.listen( + "k", sample_rate=24000, stream=fake_stream, mic_factory=lambda **_k: _BareMic() + ) + ) + assert seen_params[0].sample_rate == 24000 + + +def test_listen_reraises_a_streaming_failure(): + def boom(api_key, source, *, params, on_turn): + raise APIError("stream failed") + + with pytest.raises(APIError, match="stream failed"): + list(listen.listen("k", stream=boom, mic_factory=lambda **_k: FakeMic())) + + +# --- _exec wiring ------------------------------------------------------------- + + +def test_run_control_drives_a_session_and_closes_the_helper(capsys): + config.set_api_key("default", "sk_live") + hands = RecordingHelper() + deps = deps_for(hands, transcripts=["say hi"], respond=scripted([engine.Reply("done", ())])) + control_exec._run_control(OPTS, AppState(), json_mode=True, deps=deps) + assert 
hands.closed is True + assert last_json(capsys.readouterr().out) == {"type": "reply", "text": "done"} + + +def test_run_control_dry_run_refuses_mutation(capsys): + config.set_api_key("default", "sk_live") + hands = RecordingHelper() + # The model keeps trying to type; --dry-run must refuse it every step. + forever = engine.Reply("", (engine.ToolCall("c", "type_text", {"text": "x"}),)) + deps = deps_for(hands, transcripts=["type x"], respond=scripted([forever, forever])) + opts = dataclasses.replace(OPTS, dry_run=True, max_steps=2) + control_exec._run_control(opts, AppState(), json_mode=True, deps=deps) + events = [json.loads(line) for line in capsys.readouterr().out.strip().splitlines()] + assert any(e["type"] == "refused" for e in events) + + +def test_run_control_closes_helper_even_if_a_leg_raises(): + config.set_api_key("default", "sk_live") + hands = RecordingHelper() + + def explode(api_key: str, opts: control_exec.ControlOptions) -> list[str]: + raise APIError("listen failed") + + deps = control_exec.ControlDeps( + transcripts=explode, + responder=lambda api_key, opts: scripted([engine.Reply("x", ())]), + helper=lambda: hands, + ) + with pytest.raises(APIError, match="listen failed"): + control_exec._run_control(OPTS, AppState(), json_mode=False, deps=deps) + assert hands.closed is True + + +def test_run_control_delegates_to_run_with_default_deps(monkeypatch): + config.set_api_key("default", "sk_live") + hands = RecordingHelper() + deps = deps_for(hands, transcripts=[], respond=scripted([])) + monkeypatch.setattr(control_exec, "_DEFAULT_DEPS", deps) + control_exec.run_control(OPTS, AppState(), json_mode=True) + assert hands.closed is True + + +def test_default_builders_construct_the_real_legs(monkeypatch): + sentinel = helper.UiHelper.__new__(helper.UiHelper) + monkeypatch.setattr(control_exec, "UiHelper", lambda: sentinel) + assert control_exec._default_helper() is sentinel + + captured: dict[str, object] = {} + + def fake_build_responder(api_key: str, *, 
model: str, max_tokens: int) -> engine.Responder: + captured["model"] = model + captured["max_tokens"] = max_tokens + return lambda messages: engine.Reply("", ()) + + monkeypatch.setattr(control_exec.bridge, "build_responder", fake_build_responder) + control_exec._default_responder("k", OPTS) + assert captured == {"model": "m", "max_tokens": 8} + + monkeypatch.setattr( + control_exec.listen_mod, "listen", lambda api_key, *, device, sample_rate: iter(["hi"]) + ) + assert list(control_exec._default_transcripts("k", OPTS)) == ["hi"] + + +# --- command body (Typer seam) ------------------------------------------------ + + +def test_control_command_builds_options_and_runs(monkeypatch): + config.set_api_key("default", "sk_live") + hands = RecordingHelper() + deps = deps_for(hands, transcripts=[], respond=scripted([])) + monkeypatch.setattr(control_exec, "_DEFAULT_DEPS", deps) + result = CliRunner().invoke(app, ["control", "--dry-run", "--max-steps", "3"]) + assert result.exit_code == 0 + assert hands.closed is True diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 334a3c55..e21accf4 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -161,6 +161,7 @@ def test_help_lists_commands_in_workflow_order(): "dictate", "agent", "live", + "control", "speak", "llm", "clip",