From 2fffedbf11fc2eecef8fe5902754356ab175af14 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 23 Jun 2026 16:34:40 +0000 Subject: [PATCH] live: surface the agent's write_todos plan in the TUI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `assembly live` answers each spoken turn with a deepagents graph, which auto-installs langchain's TodoListMiddleware (the `write_todos` tool) — but the plan it writes was invisible. Surface it so a multi-step spoken request ("book a flight, then check the weather there") shows a visible checklist alongside the spoken "first I'll…, then I'll…", a recognizable Gemini-Live-style affordance. - plan.py: the write_todos plan subsystem — TodoItem/TodoUpdate, the JSON-arg parsing, and TodoCollector, which reassembles a write_todos call from streamed chunk args (streaming) or a non-streaming model's complete .tool_calls dict. - streamer detects the write_todos tool result and emits a TodoUpdate (never a generic "Using write_todos" affordance); engine forwards it to renderer.todos_updated without gating the spoken reply. - Voice TUI renders an in-place TodoList checklist (✓ done, ▸ in progress, ○ pending), one panel per turn, revised in place. AgentRenderer emits a `plan` NDJSON event / a stderr checklist on the non-TUI paths. Also splits brain.py along its build-vs-drive seam: graph assembly stays in brain.py, the per-turn streaming leg (build_streamer + helpers) moves to the new streamer.py (one-directional: streamer imports brain). This keeps both files under the 500-line gate. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_0153dUtXjPNhfMvFDyjiLgdh --- REFERENCE.md | 2 +- aai_cli/AGENTS.md | 2 +- aai_cli/agent/events.py | 25 +- aai_cli/agent/render.py | 31 +- aai_cli/agent_cascade/_io.py | 18 +- aai_cli/agent_cascade/_runtime.py | 8 +- aai_cli/agent_cascade/brain.py | 252 +-------------- aai_cli/agent_cascade/engine.py | 41 ++- aai_cli/agent_cascade/messages.py | 51 +++ aai_cli/agent_cascade/plan.py | 123 +++++++ aai_cli/agent_cascade/streamer.py | 300 ++++++++++++++++++ aai_cli/agent_cascade/tui.py | 24 ++ pyproject.toml | 18 +- .../test_tui_snapshots/test_live_plan.raw | 182 +++++++++++ tests/_cascade_fakes.py | 3 + tests/test_agent_cascade_brain.py | 89 +++++- tests/test_agent_cascade_command.py | 8 +- tests/test_agent_cascade_engine.py | 17 + tests/test_agent_cascade_files.py | 14 +- tests/test_agent_cascade_streamer.py | 105 +++++- tests/test_agent_cascade_subagents.py | 10 +- tests/test_agent_events.py | 6 + tests/test_agent_render.py | 55 ++++ tests/test_live_tui_plan.py | 135 ++++++++ tests/test_tui_snapshots.py | 21 ++ 25 files changed, 1219 insertions(+), 321 deletions(-) create mode 100644 aai_cli/agent_cascade/plan.py create mode 100644 aai_cli/agent_cascade/streamer.py create mode 100644 tests/__snapshots__/test_tui_snapshots/test_live_plan.raw create mode 100644 tests/test_live_tui_plan.py diff --git a/REFERENCE.md b/REFERENCE.md index 75f8164c..ef359544 100644 --- a/REFERENCE.md +++ b/REFERENCE.md @@ -94,7 +94,7 @@ each carrying a `"type"` field to dispatch on: | ------- | ----------- | | `assembly stream --json` | `begin`, `turn`, `termination` (with `--from-stdin`, a `source` event precedes each file's events) | | `assembly agent --json` | `session.ready`, `transcript.user.delta`, `transcript.user`, `reply.started`, `transcript.agent`, `reply.done` | -| `assembly live --json` | `session.ready`, `transcript.user.delta`, `transcript.user`, `tool.use`, `reply.started`, `transcript.agent`, `reply.done` | +| `assembly live --json` | `session.ready`, `transcript.user.delta`, `transcript.user`, `tool.use`, `plan`, `reply.started`, `transcript.agent`, `reply.done` | | `assembly dictate --json` | `utterance` | | `assembly llm --follow --json` | `answer` | | `assembly transcribe --json` | `result` (one per source), then `reduce` if `--llm-reduce` is set | diff --git a/aai_cli/AGENTS.md b/aai_cli/AGENTS.md index 34e5bd6d..57b72eda 100644 --- a/aai_cli/AGENTS.md +++ b/aai_cli/AGENTS.md @@ -153,7 +153,7 @@ heavily-reworked commands with long bodies; small commands keep the inline - **`streaming/`** + `client.stream_audio` — v3 realtime API. Event callbacks run on the SDK reader thread and guard against `BrokenPipeError` (`stdio.silence_stdout()`) so a closed pipe never dumps a thread traceback. - **`core/sync_stt.py`** + **`core/signals.py`** + `commands/dictate/` — `assembly dictate`: headless dictation over the **Sync STT API** (`Environment.sync_base`, one POST `/transcribe` per utterance with the required `X-AAI-Model: u3-sync-pro` header; 80 ms–120 s of PCM/WAV). It needs no terminal: recording starts immediately and `dictate_exec._record` polls `signals.stop_on_terminate` between ~100 ms mic chunks for a SIGTERM, which finishes the utterance (clean exit 0) — so a hotkey tool like Hammerspoon can launch it as a background task and `kill -TERM`/`task:terminate()` to transcribe. SIGINT (Ctrl-C) still cancels (exit 130). Both boundaries (the stop latch, mic, HTTP) are injectable, so the suite never needs a real signal or microphone (`tests/test_dictate_exec.py` scripts the SIGTERM latch). Contrast `signals.terminate_as_interrupt` (used by `stream`/`agent`/`speak`), which routes SIGTERM into the *cancel* path instead. - **`agent/`** — full-duplex voice agent (mic in, TTS out via `voices.py`). -- **`agent_cascade/`** + `commands/agent_cascade/` — `assembly agent-cascade`: the same live terminal conversation as `assembly agent`, but **client-orchestrated** — `engine.run_cascade` wires Streaming STT → the LLM Gateway → streaming TTS itself instead of talking to the Voice Agent endpoint, mirroring what the `agent-cascade` `assembly init` template does server-side. **Sandbox-only** (streaming TTS has no prod host; guarded via `tts.session.require_available`). Reuses the agent slice's `DuplexAudio`/`AgentRenderer` and `core.client.stream_audio`/`core.llm.complete`/`tts.session.synthesize`; the three network legs are injected through `engine.CascadeDeps` (the `tts/session.py` seam) so the cascade — greeting, clause-level streaming TTS, barge-in, history window — is unit-tested against fakes with no sockets/mic/speaker. The LLM leg is a deepagents graph (`brain.py`) streamed token-by-token via `brain.build_streamer` (`graph.stream(stream_mode="messages")`): the engine buffers `SpeechDelta`s, flushes complete clauses with `text.pop_clauses` (soft-separator clauses gated by `engine._MIN_CLAUSE_CHARS`), and synthesizes each clause with **streaming TTS** (`tts.session.synthesize(on_audio=…)`) so audio starts on the first frame instead of after the whole reply. The reply runs on a throwaway producer thread feeding a `queue.Queue` the worker drains under a monotonic deadline (the wall-clock backstop that replaced `_complete_within`), and an abandoned-on-timeout graph leg's langchain `ThreadPoolExecutor` worker is detached (`_detach_executor_threads_since`) so it can't wedge interpreter exit. A `ToolNotice` surfaces the "Searching the web…" affordance and drops any unspoken preamble. Under `-v` (`debuglog.active()`) `brain._stream_graph` logs each accumulated assistant line, tool call, and tool result as it streams. **Front-end:** an interactive mic session in human mode runs a **voice-only Textual TUI** (`agent_cascade/tui.py`, `LiveAgentApp`) by default — there's no text input (you can't type to it), just a transcript + an animated voice bar tracking listening/thinking/speaking. It uses its own `banner` wordmark, `messages` widgets, and `tui_status.voicebar_markup`/`VOICE_FRAMES` — all modules that now live in `agent_cascade/`; the blocking `run_cascade` runs on a worker thread and reaches the UI through a `_TuiRenderer` (the `engine.Renderer` protocol) that hops each call onto the UI thread, and a quit calls `DuplexAudio.close` to end the mic iterator and unblock that worker. `_exec._should_use_tui` gates it: file/sample input, `--json`/`-o text`, and a non-TTY all fall back to the plain `AgentRenderer` line output. **`--files`** (off by default) swaps the brain's in-memory backend for a real-cwd, sandbox-capable `SandboxedShellBackend` (`aai_cli/agent_cascade/sandbox.py`): file ops behave as before (traversal-blocked `virtual_mode`), and because it implements `SandboxBackendProtocol` deepagents binds a *functional* `execute` that runs commands OS-sandboxed in the real cwd — `sandbox-exec` (SBPL) on macOS, `bwrap` on Linux, refused (never an unconfined fallback) on any other platform or with the sandbox binary missing; the OS sandbox blocks the network, confines writes to cwd (+ the temp dir), and read-denies credential stores (`~/.ssh`/`~/.aws`/…, `.env*`, `.claude/`). The policy renderers are pure and the subprocess/capability boundaries injected, so the suite asserts *what we'd run* with no real sandbox. `write_file`/`edit_file`/`execute` are gated via `interrupt_on` + an `InMemorySaver`; `brain._stream_gated` detects the post-stream interrupt (`graph.get_state(config).interrupts`), asks an injected `Approver`, and resumes with `Command(resume=…)`, bracketing the human wait in `ApprovalPause` events so `engine._consume` suspends its reply deadline (`risk.py` surfaces a shell-risk warning on the prompt). The voice TUI supplies the approver via `agent_cascade.modals.ApprovalScreen` (`y`/`a`/`n`), which can *also* be resolved hands-free by voice: while a write awaits approval, `_consume` arms `_awaiting_approval` and `engine.on_turn` routes the next final transcript to `app.submit_voice_approval` → `ApprovalScreen.try_voice`, which applies `spoken_approval.spoken_decision` (an unambiguous affirmative approves, anything else rejects — fail-safe; destructive `risk.py`-flagged commands ignore the spoken answer and require a keypress). Headless runs auto-deny (`_exec._deny_writes`). `--files` also turns on durable per-project memory via deepagents' `MemoryMiddleware` (`memory=["./.deepagents/AGENTS.md"]`), distinct from the in-session `InMemorySaver`, and binds one gateway-bound, sandbox-backed general-purpose subagent (deepagents' `task` tool; spec in `agent_cascade/subagents.py`, omitting `model`/`tools` so it inherits both) for delegating a focused subtask. The subagent's own `interrupt_on` mirrors `_WRITE_TOOLS`, and a delegated `write_file`/`edit_file`/`execute` surfaces at the *parent* `get_state().interrupts` (so `_pending_writes` gates it too — verified by a HITL spike, locked in `tests/test_agent_cascade_subagents.py`). Reads (incl. `grep`) stay ungated. +- **`agent_cascade/`** + `commands/agent_cascade/` — `assembly agent-cascade`: the same live terminal conversation as `assembly agent`, but **client-orchestrated** — `engine.run_cascade` wires Streaming STT → the LLM Gateway → streaming TTS itself instead of talking to the Voice Agent endpoint, mirroring what the `agent-cascade` `assembly init` template does server-side. **Sandbox-only** (streaming TTS has no prod host; guarded via `tts.session.require_available`). Reuses the agent slice's `DuplexAudio`/`AgentRenderer` and `core.client.stream_audio`/`core.llm.complete`/`tts.session.synthesize`; the three network legs are injected through `engine.CascadeDeps` (the `tts/session.py` seam) so the cascade — greeting, clause-level streaming TTS, barge-in, history window — is unit-tested against fakes with no sockets/mic/speaker. The LLM leg is a deepagents graph **assembled** by `brain.py` (tools, backend, middleware, `build_graph`, plus the shared `SpeechDelta`/`ToolNotice`/`ApprovalPause` event types and tool-affordance vocabulary) and **driven** turn-by-turn by `streamer.py` (`build_streamer`, `graph.stream(stream_mode="messages")`) — the natural build-vs-drive seam, split so each file stays under the 500-line gate (the dependency is one-directional: `streamer` imports `brain`, never the reverse). The engine buffers `SpeechDelta`s, flushes complete clauses with `text.pop_clauses` (soft-separator clauses gated by `engine._MIN_CLAUSE_CHARS`), and synthesizes each clause with **streaming TTS** (`tts.session.synthesize(on_audio=…)`) so audio starts on the first frame instead of after the whole reply. The reply runs on a throwaway producer thread feeding a `queue.Queue` the worker drains under a monotonic deadline (the wall-clock backstop that replaced `_complete_within`), and an abandoned-on-timeout graph leg's langchain `ThreadPoolExecutor` worker is detached (`_detach_executor_threads_since`) so it can't wedge interpreter exit. A `ToolNotice` surfaces the "Searching the web…" affordance and drops any unspoken preamble. **The `write_todos` plan** (deepagents auto-installs langchain's `TodoListMiddleware`) is reassembled from the streamed call args by `plan.py`'s `TodoCollector` and surfaced as a `plan.TodoUpdate` (not a `ToolNotice`); `engine._consume` forwards it to `renderer.todos_updated` without gating speech, so the voice TUI shows an in-place `messages.TodoList` checklist (and `AgentRenderer` emits a `plan` NDJSON event / a stderr checklist) — a visible plan to go with the spoken "first I'll…, then I'll…". Under `-v` (`debuglog.active()`) `streamer._stream_graph` logs each accumulated assistant line, tool call, and tool result as it streams. **Front-end:** an interactive mic session in human mode runs a **voice-only Textual TUI** (`agent_cascade/tui.py`, `LiveAgentApp`) by default — there's no text input (you can't type to it), just a transcript + an animated voice bar tracking listening/thinking/speaking. It uses its own `banner` wordmark, `messages` widgets, and `tui_status.voicebar_markup`/`VOICE_FRAMES` — all modules that now live in `agent_cascade/`; the blocking `run_cascade` runs on a worker thread and reaches the UI through a `_TuiRenderer` (the `engine.Renderer` protocol) that hops each call onto the UI thread, and a quit calls `DuplexAudio.close` to end the mic iterator and unblock that worker. `_exec._should_use_tui` gates it: file/sample input, `--json`/`-o text`, and a non-TTY all fall back to the plain `AgentRenderer` line output. **`--files`** (off by default) swaps the brain's in-memory backend for a real-cwd, sandbox-capable `SandboxedShellBackend` (`aai_cli/agent_cascade/sandbox.py`): file ops behave as before (traversal-blocked `virtual_mode`), and because it implements `SandboxBackendProtocol` deepagents binds a *functional* `execute` that runs commands OS-sandboxed in the real cwd — `sandbox-exec` (SBPL) on macOS, `bwrap` on Linux, refused (never an unconfined fallback) on any other platform or with the sandbox binary missing; the OS sandbox blocks the network, confines writes to cwd (+ the temp dir), and read-denies credential stores (`~/.ssh`/`~/.aws`/…, `.env*`, `.claude/`). The policy renderers are pure and the subprocess/capability boundaries injected, so the suite asserts *what we'd run* with no real sandbox. `write_file`/`edit_file`/`execute` are gated via `interrupt_on` + an `InMemorySaver`; `brain._stream_gated` detects the post-stream interrupt (`graph.get_state(config).interrupts`), asks an injected `Approver`, and resumes with `Command(resume=…)`, bracketing the human wait in `ApprovalPause` events so `engine._consume` suspends its reply deadline (`risk.py` surfaces a shell-risk warning on the prompt). The voice TUI supplies the approver via `agent_cascade.modals.ApprovalScreen` (`y`/`a`/`n`), which can *also* be resolved hands-free by voice: while a write awaits approval, `_consume` arms `_awaiting_approval` and `engine.on_turn` routes the next final transcript to `app.submit_voice_approval` → `ApprovalScreen.try_voice`, which applies `spoken_approval.spoken_decision` (an unambiguous affirmative approves, anything else rejects — fail-safe; destructive `risk.py`-flagged commands ignore the spoken answer and require a keypress). Headless runs auto-deny (`_exec._deny_writes`). `--files` also turns on durable per-project memory via deepagents' `MemoryMiddleware` (`memory=["./.deepagents/AGENTS.md"]`), distinct from the in-session `InMemorySaver`, and binds one gateway-bound, sandbox-backed general-purpose subagent (deepagents' `task` tool; spec in `agent_cascade/subagents.py`, omitting `model`/`tools` so it inherits both) for delegating a focused subtask. The subagent's own `interrupt_on` mirrors `_WRITE_TOOLS`, and a delegated `write_file`/`edit_file`/`execute` surfaces at the *parent* `get_state().interrupts` (so `_pending_writes` gates it too — verified by a HITL spike, locked in `tests/test_agent_cascade_subagents.py`). Reads (incl. `grep`) stay ungated. - **`tts/`** + `commands/speak.py` — `assembly speak` synthesizes text to speech over the sandbox streaming-TTS WebSocket (`streaming-tts.sandbox000.…`). **Sandbox-only:** `session.is_available()` is false in production (empty `Environment.streaming_tts_host`), so the command exits 2 with a `--sandbox` hint. `session.synthesize` drives a Begin→Generate→Flush→Audio→Terminate protocol with an injectable `connect` for hermetic tests (mirrors `agent/session.py`); `audio.py` plays the PCM (default) or writes a WAV (`--out`). The single-voice default-playback path **streams**: `synthesize`'s `on_audio(chunk, sample_rate)` callback is wired to `audio.PcmPlayer.feed`, so speech starts on the first Audio frame (it opens the device lazily, since the rate is only known at Begin) instead of after the whole text — the win for a long `--url` page. `--out` (needs the full buffer) and the multi-voice dialogue path (`synthesize_dialogue` → `_output_audio` → buffered `play_pcm`) stay buffered; `synthesize` still returns the complete PCM for the summary regardless. - **`code_gen/`** — backs `--show-code` on `transcribe`/`stream`/`agent`: builds a ready-to-run Python SDK script from exactly the flags passed (no API key needed; generated code reads `ASSEMBLYAI_API_KEY`). - **`auth/`** — browser-assisted `assembly login` via AMS + **Stytch B2B OAuth discovery** (`discovery.py`, `flow.py`, `loopback.py`, `ams.py`). Not Stytch Connected Apps. diff --git a/aai_cli/agent/events.py b/aai_cli/agent/events.py index 3247bfca..be7bfcdd 100644 --- a/aai_cli/agent/events.py +++ b/aai_cli/agent/events.py @@ -59,6 +59,20 @@ class ToolUse(_Event): label: str +class TodoItem(_Event): + """One task in the agent's plan: its text and lifecycle ``status``.""" + + content: str + status: str + + +class PlanUpdate(_Event): + """The agent's task list (its ``write_todos`` plan); ``todos`` replaces any prior plan.""" + + type: Literal["plan"] = "plan" + todos: tuple[TodoItem, ...] + + class AgentTranscript(_Event): """The agent's reply transcript (``interrupted`` when the user barged in).""" @@ -74,4 +88,13 @@ class ReplyDone(_Event): interrupted: bool -Event = SessionReady | UserDelta | UserFinal | ToolUse | ReplyStarted | AgentTranscript | ReplyDone +Event = ( + SessionReady + | UserDelta + | UserFinal + | ToolUse + | PlanUpdate + | ReplyStarted + | AgentTranscript + | ReplyDone +) diff --git a/aai_cli/agent/render.py b/aai_cli/agent/render.py index 7288fc86..5e6b65da 100644 --- a/aai_cli/agent/render.py +++ b/aai_cli/agent/render.py @@ -1,12 +1,23 @@ from __future__ import annotations -from typing import Any +from typing import TYPE_CHECKING, Any from rich.text import Text from aai_cli.agent import events from aai_cli.ui.render import BaseRenderer +if TYPE_CHECKING: + from aai_cli.agent_cascade.plan import TodoItem + +# Single-cell status marks for the plan, pipe-safe and aligned (the TUI styles its own). +_TODO_MARKS = {"completed": "[x]", "in_progress": "[~]", "pending": "[ ]"} + + +def _mark(status: str) -> str: + """The checklist marker for a todo status, falling back to the pending box for an unknown one.""" + return _TODO_MARKS.get(status, "[ ]") + def _labeled(label: str, body: str, *, style: str = "aai.label") -> Text: """A transcript line tinted entirely in `style` — both the `label` prefix and the body. @@ -88,6 +99,24 @@ def tool_call(self, label: str) -> None: else: self._line(_labeled("", f"{label}…", style="aai.muted")) + def todos_updated(self, todos: tuple[TodoItem, ...]) -> None: + """Surface the agent's plan (its ``write_todos`` list), replacing any prior plan. + + JSON emits a ``plan`` event; piped text routes a compact one-line summary to stderr + (transcript-only stdout); human mode shows a muted multi-line checklist. + """ + if self.json_mode: + items = tuple(events.TodoItem(content=t.content, status=t.status) for t in todos) + self._emit_event(events.PlanUpdate(todos=items)) + elif self.text_mode: + self._status("Plan: " + "; ".join(f"{_mark(t.status)} {t.content}" for t in todos)) + else: + self._line(_labeled("", "Plan:", style="aai.muted")) + for todo in todos: + self._line( + _labeled(" ", f"{_mark(todo.status)} {todo.content}", style="aai.muted") + ) + # --- agent ------------------------------------------------------------- def reply_started(self) -> None: if self.json_mode: diff --git a/aai_cli/agent_cascade/_io.py b/aai_cli/agent_cascade/_io.py index 89a2a2e6..d14c634d 100644 --- a/aai_cli/agent_cascade/_io.py +++ b/aai_cli/agent_cascade/_io.py @@ -13,7 +13,7 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, Protocol -from aai_cli.agent_cascade import brain +from aai_cli.agent_cascade import brain, plan, streamer from aai_cli.agent_cascade._runtime import Worker as _Worker from aai_cli.agent_cascade._runtime import spawn_thread as _spawn_thread from aai_cli.agent_cascade.config import CascadeConfig @@ -43,6 +43,9 @@ def user_final(self, text: str) -> None: def tool_call(self, label: str) -> None: """Show that the agent is using a tool (e.g. "Searching the web") while it thinks.""" + def todos_updated(self, todos: tuple[plan.TodoItem, ...]) -> None: + """Show the agent's current task list (its ``write_todos`` plan), replacing any prior.""" + def reply_started(self) -> None: """Mark the start of an agent reply.""" @@ -82,12 +85,13 @@ class CascadeDeps: """ run_stt: Callable[[Callable[[object], None]], None] - # stream_reply(messages) -> iterable of SpeechDelta/ToolNotice events (plus ApprovalPause - # markers under --files write gating). The reply is streamed token-by-token so the engine - # can speak each clause as it lands; a ToolNotice surfaces the "Searching the web…" - # affordance (brain.build_streamer). + # stream_reply(messages) -> iterable of SpeechDelta/ToolNotice/TodoUpdate events (plus + # ApprovalPause markers under --files write gating). The reply is streamed token-by-token so + # the engine can speak each clause as it lands; a ToolNotice surfaces the "Searching the web…" + # affordance and a TodoUpdate the agent's plan (streamer.build_streamer). stream_reply: Callable[ - ..., Iterable[brain.SpeechDelta | brain.ToolNotice | brain.ApprovalPause] + ..., + Iterable[brain.SpeechDelta | brain.ToolNotice | plan.TodoUpdate | brain.ApprovalPause], ] # synthesize(text, sink): streaming TTS — sink is called with each PCM frame as it # arrives so playback starts on the first frame instead of after the whole clause. @@ -110,7 +114,7 @@ def run_stt(on_turn: Callable[[object], None]) -> None: # The LLM leg is a deepagents graph (web search / MCP tools), streamed token-by-token # so a spoken turn can transparently use tools and start speaking sooner. ``approver`` # gates --files writes (None on the non-files path, where the graph never pauses). - stream_reply = brain.build_streamer(api_key, config, approver=approver) + stream_reply = streamer.build_streamer(api_key, config, approver=approver) def synthesize(text: str, sink: Callable[[bytes], None]) -> None: spec = SpeakConfig( diff --git a/aai_cli/agent_cascade/_runtime.py b/aai_cli/agent_cascade/_runtime.py index 5c23ac0f..a3f1a07a 100644 --- a/aai_cli/agent_cascade/_runtime.py +++ b/aai_cli/agent_cascade/_runtime.py @@ -23,7 +23,7 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, Protocol -from aai_cli.agent_cascade import brain +from aai_cli.agent_cascade import brain, plan from aai_cli.core.errors import CLIError if TYPE_CHECKING: @@ -53,9 +53,11 @@ class Timeout: """Consumer sentinel: the wall-clock deadline elapsed before the next event arrived.""" -# What the producer thread puts on the consumer's queue: a speech/tool event from the +# What the producer thread puts on the consumer's queue: a speech/tool/plan event from the # streaming leg, an approval-pause marker (--files write gating), or a terminal sentinel. -type ReplyEvent = brain.SpeechDelta | brain.ToolNotice | brain.ApprovalPause | Done | Failure +type ReplyEvent = ( + brain.SpeechDelta | brain.ToolNotice | plan.TodoUpdate | brain.ApprovalPause | Done | Failure +) def timeout_error() -> CLIError: diff --git a/aai_cli/agent_cascade/brain.py b/aai_cli/agent_cascade/brain.py index ead1d7fe..309f6418 100644 --- a/aai_cli/agent_cascade/brain.py +++ b/aai_cli/agent_cascade/brain.py @@ -4,21 +4,21 @@ LLM completion, so the agent can transparently reach for a tool — web search — mid-conversation, mimicking a live multimodal assistant (the "talk to Gemini Live" experience). The toolset is deliberately minimal: a low-latency spoken turn does best -with one obvious tool rather than a menu it has to choose among. The graph is built once per session -(:func:`build_graph`) and driven turn-by-turn with the running history the -cascade already keeps (:func:`build_streamer`); tools are read-only and auto-approved, -because a spoken turn can't pause for a keyboard confirmation, and the system prompt -keeps every reply short and speakable. - -The graph is the only network seam: :func:`build_streamer` accepts an injected graph, -so the per-turn streaming reply leg is unit-tested against a fake with no sockets — the -same seam the rest of the cascade uses for its STT/LLM/TTS legs. +with one obvious tool rather than a menu it has to choose among. The graph is built once +per session (:func:`build_graph`); tools are read-only and auto-approved, because a spoken +turn can't pause for a keyboard confirmation, and the system prompt keeps every reply short +and speakable. + +This module owns graph *assembly* (tools, backend, middleware, the compiled graph) plus the +shared stream-event types (:class:`SpeechDelta`/:class:`ToolNotice`/:class:`ApprovalPause`) +and tool affordance vocabulary. Driving the graph turn-by-turn lives beside it in +:mod:`aai_cli.agent_cascade.streamer` (``build_streamer``) — the natural build-vs-drive seam, +split out to keep each file within the length gate; that streaming leg is what the cascade's +STT/LLM/TTS injection seam exercises against a fake graph with no sockets. """ from __future__ import annotations -import itertools -import logging from collections.abc import Callable, Iterator, Mapping, Sequence from dataclasses import dataclass from pathlib import Path @@ -28,13 +28,10 @@ from aai_cli.agent_cascade.config import CascadeConfig from aai_cli.agent_cascade.firecrawl_search import WEB_SEARCH_TOOL_NAME from aai_cli.agent_cascade.prompt import build_system_prompt -from aai_cli.core import debuglog -from aai_cli.core.errors import CLIError if TYPE_CHECKING: from langchain.agents.middleware import AgentMiddleware from langchain_core.tools import BaseTool - from openai.types.chat import ChatCompletionMessageParam class CompiledAgent(Protocol): @@ -50,17 +47,6 @@ def invoke( """Run one step of the graph, returning the updated state (incl. messages).""" -# Verbose (`-v`) flow logging for the agent's tool loop. `invoke` runs the whole loop -# internally, so without this `-v` only shows the httpx request lines and never which -# tools the agent reached for or what they returned — exactly what you need to see when -# a spoken turn stalls mid-tool. Logged at INFO so plain `-v` surfaces it. -_FLOW_LOG = logging.getLogger("aai_cli.agent_cascade.brain") - -# Tool outputs (a fetched page, a search payload) can be huge; cap what we log per result -# so a single tool call doesn't bury the rest of the flow in stderr. The exact cap is an -# arbitrary tuning knob — a +-1 shift is behaviorally equivalent, so no test can kill it. -_RESULT_LOG_CAP = 500 # pragma: no mutate - # Human, speakable labels for the tool affordance the live UI shows while a tool runs (so a # spoken turn that pauses to use a tool says *why* it's working, not just spin silently). _TOOL_LABELS = { @@ -164,11 +150,9 @@ def get_state(self, config: Mapping[str, object] | None) -> object: # Decide whether a gated write may run (front-end supplied). Mirrors the code agent's Approver. +# The streaming leg that consults it lives in :mod:`aai_cli.agent_cascade.streamer`. Approver = Callable[[str, dict[str, object]], bool] -# Message handed back to the model when the user declines a write (matches the code agent's copy). -_DECLINED = "User declined to run this tool." - def build_live_tools() -> list[BaseTool]: """The live agent's built-in tools: the keyless weather, read-a-URL, and date/time @@ -285,215 +269,3 @@ def build_graph( middleware=_build_middleware(config), **_graph_kwargs(config), ) - - -def build_streamer( - api_key: str, - config: CascadeConfig, - *, - graph: CompiledAgent | None = None, - approver: Approver | None = None, -) -> Callable[..., Iterator[SpeechDelta | ToolNotice | ApprovalPause]]: - """A streaming reply leg for the cascade engine, backed by the deepagents graph. - - The cascade prepends its own ``system`` message each turn; the graph owns the system - prompt, so it is dropped before streaming. The graph is driven with - ``stream_mode="messages"`` and each top-level assistant token delta is yielded as a - :class:`SpeechDelta`, each started tool call as a :class:`ToolNotice` (the live UI's - affordance). Under ``-v`` the flow is logged. ``graph`` is injected in tests so the - per-turn wiring runs against a fake with no network. - - With ``--files`` on (``config.files``) the graph gates ``write_file``/``edit_file``: a - pending write pauses the stream, ``approver`` decides, and the turn resumes (see - :func:`_stream_gated`). Each turn uses a fresh ``thread_id`` so the checkpointer never - accumulates the cascade's full-history-per-turn input across turns. - """ - resolved = build_graph(api_key, config) if graph is None else graph - turn_ids = itertools.count() - - def stream_reply( - messages: list[ChatCompletionMessageParam], - ) -> Iterator[SpeechDelta | ToolNotice | ApprovalPause]: - conversation = [message for message in messages if message.get("role") != "system"] - run_config = ( - {"configurable": {"thread_id": f"live-{next(turn_ids)}"}} if config.files else None - ) - return _stream_graph( - resolved, conversation, approver=approver, config=run_config, gated=config.files - ) - - return stream_reply - - -def _stream_graph( - graph: CompiledAgent, - conversation: list[ChatCompletionMessageParam], - *, - approver: Approver | None = None, - config: dict[str, object] | None = None, - gated: bool = False, -) -> Iterator[SpeechDelta | ToolNotice | ApprovalPause]: - """Stream one turn through the graph token-by-token, yielding speech/tool events. - - Wraps any graph failure as a CLIError (a clean ``CLIError`` passes through) so the - cascade surfaces it instead of the reply worker dying silently. Under ``-v`` the - accumulated assistant text, each tool call, and each tool result are logged to - ``_FLOW_LOG``. When ``gated`` (``--files``), writes pause for ``approver`` (see - :func:`_stream_gated`); otherwise it is a single uninterrupted stream pass. - """ - verbose = debuglog.active() - pending: list[str] = [] # assistant deltas accumulated for one verbose "llm:" line - - def flush_log() -> None: - if verbose and pending: - _FLOW_LOG.info("llm: %s", "".join(pending)) - pending.clear() - - if not hasattr(graph, "stream"): - raise CLIError( - "the agent couldn't complete the turn: the agent graph cannot stream", - error_type="agent_brain_error", - ) - try: - # The gated path needs stream + get_state (the graph is built with a checkpointer, so it - # always satisfies _GatedGraph); the isinstance both narrows for mypy and falls back to a - # plain stream for the impossible non-gated-graph case. - if gated and isinstance(graph, _GatedGraph): - yield from _stream_gated( - graph, - conversation, - approver, - config, - verbose=verbose, - pending=pending, - flush_log=flush_log, - ) - else: - for chunk, _m in graph.stream( - {"messages": conversation}, config, stream_mode="messages" - ): - yield from _events_from_chunk( - chunk, verbose=verbose, pending=pending, flush_log=flush_log - ) - flush_log() - except CLIError: - raise - except Exception as exc: - raise CLIError( - f"the agent couldn't complete the turn: {exc}", error_type="agent_brain_error" - ) from exc - - -def _stream_gated( - graph: _GatedGraph, - conversation: list[ChatCompletionMessageParam], - approver: Approver | None, - config: dict[str, object] | None, - *, - verbose: bool, - pending: list[str], - flush_log: Callable[[], None], -) -> Iterator[SpeechDelta | ToolNotice | ApprovalPause]: - """Stream a write-gated turn: each pause on a write asks ``approver`` and resumes. - - The graph pauses (before executing a gated write) by ending the ``messages`` stream with - a pending interrupt on the checkpointed state. We surface its action requests, bracket the - human decision with :class:`ApprovalPause` events, and resume with the approve/reject - ``Command`` — looping until the turn finishes without pausing. - """ - from langgraph.types import Command - - graph_input: object = {"messages": conversation} - while True: - for chunk, _m in graph.stream(graph_input, config, stream_mode="messages"): - yield from _events_from_chunk( - chunk, verbose=verbose, pending=pending, flush_log=flush_log - ) - flush_log() - requests = _pending_writes(graph, config) - if not requests: - return - decisions: list[dict[str, object]] = [] - for request in requests: - yield ApprovalPause(active=True) - decisions.append(_decide(request, approver)) - yield ApprovalPause(active=False) - graph_input = Command(resume={"decisions": decisions}) - - -def _pending_writes( - graph: _GatedGraph, config: dict[str, object] | None -) -> list[dict[str, object]]: - """The action requests of a paused gated write (empty when the turn isn't paused). - - deepagents surfaces an approval pause as ``interrupts`` on the checkpointed state, each - interrupt's ``.value`` carrying the ``action_requests`` (the gated tool calls). - """ - state = graph.get_state(config) - requests: list[dict[str, object]] = [] - for interrupt in getattr(state, "interrupts", ()) or (): - value = getattr(interrupt, "value", None) - actions = value.get("action_requests") if isinstance(value, dict) else None - if isinstance(actions, list): - requests.extend(action for action in actions if isinstance(action, dict)) - return requests - - -def _decide(action: dict[str, object], approver: Approver | None) -> dict[str, object]: - """Ask the approver about one pending write and shape the resume decision (reject if none).""" - name = str(action.get("name", "")) - args = action.get("args") or {} - if not isinstance(args, dict): - args = {} - if approver is not None and approver(name, args): - return {"type": "approve"} - return {"type": "reject", "message": _DECLINED} - - -def _events_from_chunk( - chunk: object, *, verbose: bool, pending: list[str], flush_log: Callable[[], None] -) -> Iterator[SpeechDelta | ToolNotice]: - """Translate one streamed message chunk into speech/tool events (and verbose logs).""" - if type(chunk).__name__ == "ToolMessage": - flush_log() - if verbose: - content = _content_text(getattr(chunk, "content", "")) - _FLOW_LOG.info("tool result %s -> %s", getattr(chunk, "name", ""), _clip(content)) - return - for call in getattr(chunk, "tool_call_chunks", None) or []: - name = call.get("name") - if name: - flush_log() - if verbose: - _FLOW_LOG.info("tool call %s", name) - yield ToolNotice(_tool_label(name), _tool_fillers(name)) - text = _content_text(getattr(chunk, "content", "")) - if text: - pending.append(text) - yield SpeechDelta(text) - - -def _clip(text: str) -> str: - """Flatten a tool result onto one line and truncate it for the flow log. - - Tool output is untrusted external content (a fetched page, a search payload), so its - whitespace — newlines especially — is collapsed before logging: a result can't then - forge extra ``[aai_cli.…]`` log lines, and each result stays on one readable line. The - length is capped so a multi-KB payload can't bury the rest of the flow. (Secrets are - separately masked by the debuglog formatter across every record.) - """ - flattened = " ".join(text.split()) - if len(flattened) <= _RESULT_LOG_CAP: - return flattened - return f"{flattened[:_RESULT_LOG_CAP]}… ({len(flattened)} chars)" - - -def _content_text(content: object) -> str: - """Coerce a message's content (a string, or a list of content blocks) to plain text.""" - if isinstance(content, str): - return content - if isinstance(content, list): - return "".join( - block.get("text", "") if isinstance(block, dict) else str(block) for block in content - ) - return str(content) diff --git a/aai_cli/agent_cascade/engine.py b/aai_cli/agent_cascade/engine.py index a048d062..9d3c25d0 100644 --- a/aai_cli/agent_cascade/engine.py +++ b/aai_cli/agent_cascade/engine.py @@ -19,7 +19,7 @@ from dataclasses import dataclass, field from typing import TYPE_CHECKING -from aai_cli.agent_cascade import brain +from aai_cli.agent_cascade import brain, plan from aai_cli.agent_cascade._io import CascadeDeps, Player, Renderer from aai_cli.agent_cascade._runtime import ( REPLY_TIMEOUT_SECONDS as _REPLY_TIMEOUT_SECONDS, @@ -270,18 +270,16 @@ def _consume( used_tool = False # once a tool ran, hold text unspoken so only the final answer is read while True: item = self._next_event(events, deadline, before) - if isinstance(item, _Timeout): - self._surface_error(_timeout_error(), started=self._speaking.is_set()) - return None - if isinstance(item, _Failure): - self._surface_error(item.error, started=self._speaking.is_set()) - return None - if isinstance(item, _Done): - return _final_tail(buffer, held, used_tool=used_tool) + terminal, result = self._terminal_result(item, buffer, held, used_tool=used_tool) + if terminal: + return result if isinstance(item, brain.ApprovalPause): deadline = _approval_deadline(item) self._set_awaiting_approval(active=item.active) continue + if isinstance(item, plan.TodoUpdate): + self.renderer.todos_updated(item.todos) # visual only; never gates the reply + continue if isinstance(item, brain.ToolNotice): if not self._handle_tool_notice(item, spoke_filler=spoke_filler): return None @@ -292,11 +290,26 @@ def _consume( continue if self._stop.is_set(): return None - # item is a streamed SpeechDelta (every other case returned/continued above). - tail = self._speak_delta(item, buffer, held, spoken, used_tool=used_tool) - if tail is None: - return None - buffer = tail + # Only a SpeechDelta reaches here; the isinstance re-narrows it for the type checker. + if isinstance(item, brain.SpeechDelta): # pragma: no mutate — narrowing-only guard + tail = self._speak_delta(item, buffer, held, spoken, used_tool=used_tool) + if tail is None: + return None + buffer = tail + + def _terminal_result( + self, item: _ReplyEvent | _Timeout, buffer: str, held: list[str], *, used_tool: bool + ) -> tuple[bool, str | None]: + """Classify a turn-ending queue item: a timeout/leg :class:`_Failure` surfaces the error + and ends with no tail, a :class:`_Done` carries the tail to flush, else ``(False, None)``. + """ + if isinstance(item, _Timeout | _Failure): + error = _timeout_error() if isinstance(item, _Timeout) else item.error + self._surface_error(error, started=self._speaking.is_set()) + return True, None + if isinstance(item, _Done): + return True, _final_tail(buffer, held, used_tool=used_tool) + return False, None def _speak_delta( self, diff --git a/aai_cli/agent_cascade/messages.py b/aai_cli/agent_cascade/messages.py index 66bb8e02..3d5e4c9d 100644 --- a/aai_cli/agent_cascade/messages.py +++ b/aai_cli/agent_cascade/messages.py @@ -12,15 +12,23 @@ from __future__ import annotations from collections.abc import Mapping +from typing import TYPE_CHECKING from rich.markdown import Markdown from rich.text import Text from textual.widgets import Static +from aai_cli.agent_cascade import banner from aai_cli.agent_cascade.summarize import summarize_call, summarize_result +if TYPE_CHECKING: + from collections.abc import Sequence + + from aai_cli.agent_cascade.plan import TodoItem + _DIM = "#8a8f98" # muted gray for tool lines / notes _ERROR = "#f04438" +_DONE = "#22c55e" # green for a completed task (the voice bar's "speaking" hue) class Note(Static): @@ -91,6 +99,49 @@ def __init__(self, name: str, args: Mapping[str, object]) -> None: super().__init__(Text(f"→ {summarize_call(name, args)}", style=_DIM)) +# Per-status glyph + style for a plan line: a green ✓ when done, a brand-accented ▸ for the +# task in progress, a dim ○ for one still pending (an unknown status falls back to pending). +_TODO_STYLES: dict[str, tuple[str, str]] = { + "completed": ("✓", _DONE), + "in_progress": ("▸", banner.BRAND_HEX), + "pending": ("○", _DIM), +} + + +def _todos_markup(todos: Sequence[TodoItem]) -> Text: + """Render the plan as a ``Plan`` heading above one styled line per task. + + Completed tasks are struck through and dimmed; the in-progress task is brand-accented; the + rest are dim — so the panel reads as a live checklist of the spoken plan at a glance. + """ + text = Text("Plan", style=f"bold {_DIM}") + for todo in todos: + glyph, color = _TODO_STYLES.get(todo.status, _TODO_STYLES["pending"]) + style = f"{color} strike" if todo.status == "completed" else color + text.append(f"\n{glyph} ", style=color) + text.append(todo.content, style=style) + return text + + +class TodoList(Static): + """The agent's task plan (its ``write_todos`` list), updated in place as the plan evolves. + + ``write_todos`` replaces the whole list on each call, so one widget shows the current plan + (revised in place) rather than a fresh copy per revision — the visible counterpart to a + hands-free "first I'll…, then I'll…". Spaced off the turn above by a top margin. + """ + + DEFAULT_CSS = "TodoList { margin-top: 1; }" + + def __init__(self, todos: Sequence[TodoItem]) -> None: + super().__init__() + self.set_todos(todos) + + def set_todos(self, todos: Sequence[TodoItem]) -> None: + """Repaint the panel with the current plan (replacing whatever it showed before).""" + self.update(_todos_markup(todos)) + + class ErrorMessage(Static): """A failed turn, shown instead of crashing the UI.""" diff --git a/aai_cli/agent_cascade/plan.py b/aai_cli/agent_cascade/plan.py new file mode 100644 index 00000000..382aafa1 --- /dev/null +++ b/aai_cli/agent_cascade/plan.py @@ -0,0 +1,123 @@ +"""The `assembly live` plan subsystem: the agent's ``write_todos`` task list. + +deepagents auto-installs langchain's ``TodoListMiddleware``, so the live agent can lay out a +multi-step plan with ``write_todos`` (and revise it as it works). This module owns the plan's +data shape (:class:`TodoItem` / :class:`TodoUpdate`) and the :class:`TodoCollector` that +reassembles a ``write_todos`` call's args out of the streamed message chunks — kept apart from +``brain`` (which builds and drives the graph) so neither module crowds the 500-line gate. The +collector is fed each chunk by ``streamer._events_from_chunk`` and surfaces a :class:`TodoUpdate` +when the call's tool result lands; ``engine``/the renderers then show the plan as a checklist. +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass + +# The planning/task-list tool deepagents auto-installs (langchain's TodoListMiddleware). Its +# call is surfaced as a TodoUpdate (the visible plan), not a generic tool affordance — so the +# name is recognized rather than carried in brain's _TOOL_LABELS. +WRITE_TODOS_TOOL_NAME = "write_todos" + + +@dataclass(frozen=True) +class TodoItem: + """One task in the agent's plan: its text and lifecycle status. + + ``status`` is one of ``pending`` / ``in_progress`` / ``completed`` (langchain's todo + schema); it is kept as a plain string so an unknown future status renders rather than raises. + """ + + content: str + status: str + + +@dataclass(frozen=True) +class TodoUpdate: + """The agent's current task list, emitted when it (re)writes its plan via ``write_todos``. + + Carries the *whole* list each time (``write_todos`` replaces the list outright), so the live + UI renders the plan in place rather than appending. Purely visual — never spoken — it's the + on-screen counterpart to a hands-free "first I'll…, then I'll…". + """ + + todos: tuple[TodoItem, ...] + + +def _parse_todos(args_json: str) -> TodoUpdate | None: + """Parse a ``write_todos`` args blob (``{"todos": [{content, status}, …]}``) into a TodoUpdate. + + Returns ``None`` for anything that doesn't parse to a non-empty todo list, so a malformed or + still-partial fragment is dropped rather than surfaced as an empty plan. + """ + try: + payload = json.loads(args_json) + except (ValueError, TypeError): + return None + return _todos_from_list(payload.get("todos") if isinstance(payload, dict) else None) + + +def _todos_from_list(raw: object) -> TodoUpdate | None: + """Shape a raw ``todos`` list (of ``{content, status}`` dicts) into a TodoUpdate, or ``None``. + + Shared by the streamed-JSON path (:func:`_parse_todos`) and the non-streaming model path + (a chunk's already-parsed ``.tool_calls`` args), so both yield the same event. + """ + if not isinstance(raw, list): + return None + items = tuple( + TodoItem(content=str(item.get("content", "")), status=str(item.get("status", ""))) + for item in raw + if isinstance(item, dict) + ) + return TodoUpdate(items) if items else None + + +class TodoCollector: + """Accumulates a ``write_todos`` call's streamed args, parsing them into a :class:`TodoUpdate`. + + A streaming model emits the plan as JSON arg fragments across many AIMessage chunks; a + non-streaming model (the test fakes) emits one chunk with already-parsed ``.tool_calls`` + args. Either way the call completes when its ``write_todos`` ToolMessage lands, at which point + :meth:`take` shapes whatever was gathered into the event. At most one ``write_todos`` call is + open at a time (langchain's planning middleware forbids parallel calls), so a single buffer + suffices; a fresh call resets it (``write_todos`` replaces the whole list). + """ + + def __init__(self) -> None: + self._index: int | None = None + self._buffer = "" # accumulated JSON arg fragments (streaming path) + self._args: dict[str, object] | None = None # complete args dict (non-streaming path) + + def note_chunk(self, chunk: object) -> None: + """Record a chunk's contribution to the in-flight ``write_todos`` call (if any).""" + self._note_complete_args(chunk) + self._note_arg_fragments(chunk) + + def _note_complete_args(self, chunk: object) -> None: + """Capture a non-streaming model's complete ``write_todos`` args dict (``.tool_calls``).""" + for call in getattr(chunk, "tool_calls", None) or []: + if call.get("name") == WRITE_TODOS_TOOL_NAME and isinstance(call.get("args"), dict): + self._args = call["args"] + + def _note_arg_fragments(self, chunk: object) -> None: + """Accumulate a streaming model's incremental ``write_todos`` JSON arg fragments.""" + for call in getattr(chunk, "tool_call_chunks", None) or []: + index = call.get("index") + if call.get("name") == WRITE_TODOS_TOOL_NAME: + self._index, self._buffer = index, call.get("args") or "" + elif self._index is not None and index == self._index: + self._buffer += call.get("args") or "" + + def take(self) -> TodoUpdate | None: + """Shape the gathered plan into a :class:`TodoUpdate` (``None`` if empty) and reset. + + The streamed JSON buffer is preferred — it's the complete, raw args at ToolMessage time — + and the chunk's ``.tool_calls`` dict is only the fallback for a non-streaming model, since + a streaming chunk's auto-derived ``.tool_calls`` can hold a *partial* parse of the args. + """ + args, buffer = self._args, self._buffer + self._args, self._index, self._buffer = None, None, "" + return _parse_todos(buffer) or ( + _todos_from_list(args.get("todos")) if args is not None else None + ) diff --git a/aai_cli/agent_cascade/streamer.py b/aai_cli/agent_cascade/streamer.py new file mode 100644 index 00000000..450b2269 --- /dev/null +++ b/aai_cli/agent_cascade/streamer.py @@ -0,0 +1,300 @@ +"""The per-turn streaming reply leg for the live voice cascade. + +:func:`build_streamer` drives the deepagents graph (built by :mod:`brain`) one turn at a time +with ``stream_mode="messages"``, translating each streamed chunk into the speech/tool/plan events +the cascade engine consumes — :class:`~aai_cli.agent_cascade.brain.SpeechDelta` tokens to speak, +:class:`~aai_cli.agent_cascade.brain.ToolNotice` affordances, :class:`~aai_cli.agent_cascade.plan.TodoUpdate` +plans, and (under ``--files``) :class:`~aai_cli.agent_cascade.brain.ApprovalPause` brackets around +a human write approval. Split from :mod:`brain` (which assembles the graph) along the module's +natural build-vs-drive seam to keep each file within the length gate; the dependency is +one-directional (``streamer`` imports ``brain``, never the reverse). + +The graph is the only network seam: ``build_streamer`` accepts an injected graph, so this leg is +unit-tested against a fake with no sockets (see ``tests/test_agent_cascade_streamer.py``). +""" + +from __future__ import annotations + +import itertools +import logging +from collections.abc import Callable, Iterator +from typing import TYPE_CHECKING + +from aai_cli.agent_cascade import plan +from aai_cli.agent_cascade.brain import ( + ApprovalPause, + Approver, + CompiledAgent, + SpeechDelta, + ToolNotice, + _GatedGraph, + _tool_fillers, + _tool_label, + build_graph, +) +from aai_cli.agent_cascade.config import CascadeConfig +from aai_cli.core import debuglog +from aai_cli.core.errors import CLIError + +if TYPE_CHECKING: + from openai.types.chat import ChatCompletionMessageParam + +# Verbose (`-v`) flow logging for the agent's tool loop. `invoke` runs the whole loop internally, +# so without this `-v` only shows the httpx request lines and never which tools the agent reached +# for or what they returned — exactly what you need when a spoken turn stalls mid-tool. Logged at +# INFO so plain `-v` surfaces it. +_FLOW_LOG = logging.getLogger("aai_cli.agent_cascade.streamer") + +# Tool outputs (a fetched page, a search payload) can be huge; cap what we log per result so a +# single tool call doesn't bury the rest of the flow in stderr. The exact cap is an arbitrary +# tuning knob — a +-1 shift is behaviorally equivalent, so no test can kill it. +_RESULT_LOG_CAP = 500 # pragma: no mutate + +# Message handed back to the model when the user declines a write (matches the code agent's copy). +_DECLINED = "User declined to run this tool." + + +def build_streamer( + api_key: str, + config: CascadeConfig, + *, + graph: CompiledAgent | None = None, + approver: Approver | None = None, +) -> Callable[..., Iterator[SpeechDelta | ToolNotice | plan.TodoUpdate | ApprovalPause]]: + """A streaming reply leg for the cascade engine, backed by the deepagents graph. + + The cascade prepends its own ``system`` message each turn; the graph owns the system + prompt, so it is dropped before streaming. The graph is driven with + ``stream_mode="messages"`` and each top-level assistant token delta is yielded as a + :class:`SpeechDelta`, each started tool call as a :class:`ToolNotice` (the live UI's + affordance), and each ``write_todos`` plan as a :class:`plan.TodoUpdate`. Under ``-v`` the + flow is logged. ``graph`` is injected in tests so the per-turn wiring runs against a fake with + no network. + + With ``--files`` on (``config.files``) the graph gates ``write_file``/``edit_file``: a + pending write pauses the stream, ``approver`` decides, and the turn resumes (see + :func:`_stream_gated`). Each turn uses a fresh ``thread_id`` so the checkpointer never + accumulates the cascade's full-history-per-turn input across turns. + """ + resolved = build_graph(api_key, config) if graph is None else graph + turn_ids = itertools.count() + + def stream_reply( + messages: list[ChatCompletionMessageParam], + ) -> Iterator[SpeechDelta | ToolNotice | plan.TodoUpdate | ApprovalPause]: + conversation = [message for message in messages if message.get("role") != "system"] + run_config: dict[str, object] | None = ( + {"configurable": {"thread_id": f"live-{next(turn_ids)}"}} if config.files else None + ) + return _stream_graph( + resolved, conversation, approver=approver, config=run_config, gated=config.files + ) + + return stream_reply + + +def _stream_graph( + graph: CompiledAgent, + conversation: list[ChatCompletionMessageParam], + *, + approver: Approver | None = None, + config: dict[str, object] | None = None, + gated: bool = False, +) -> Iterator[SpeechDelta | ToolNotice | plan.TodoUpdate | ApprovalPause]: + """Stream one turn through the graph token-by-token, yielding speech/tool/plan events. + + Wraps any graph failure as a CLIError (a clean ``CLIError`` passes through) so the + cascade surfaces it instead of the reply worker dying silently. Under ``-v`` the + accumulated assistant text, each tool call, and each tool result are logged to + ``_FLOW_LOG``. When ``gated`` (``--files``), writes pause for ``approver`` (see + :func:`_stream_gated`); otherwise it is a single uninterrupted stream pass. + """ + verbose = debuglog.active() + pending: list[str] = [] # assistant deltas accumulated for one verbose "llm:" line + todos = plan.TodoCollector() # accumulates a write_todos call's args across the turn's chunks + + def flush_log() -> None: + if verbose and pending: + _FLOW_LOG.info("llm: %s", "".join(pending)) + pending.clear() + + if not hasattr(graph, "stream"): + raise CLIError( + "the agent couldn't complete the turn: the agent graph cannot stream", + error_type="agent_brain_error", + ) + try: + # The gated path needs stream + get_state (the graph is built with a checkpointer, so it + # always satisfies _GatedGraph); the isinstance both narrows for mypy and falls back to a + # plain stream for the impossible non-gated-graph case. + if gated and isinstance(graph, _GatedGraph): + yield from _stream_gated( + graph, + conversation, + approver, + config, + verbose=verbose, + pending=pending, + flush_log=flush_log, + todos=todos, + ) + else: + for chunk, _m in graph.stream( + {"messages": conversation}, config, stream_mode="messages" + ): + yield from _events_from_chunk( + chunk, verbose=verbose, pending=pending, flush_log=flush_log, todos=todos + ) + flush_log() + except CLIError: + raise + except Exception as exc: + raise CLIError( + f"the agent couldn't complete the turn: {exc}", error_type="agent_brain_error" + ) from exc + + +def _stream_gated( + graph: _GatedGraph, + conversation: list[ChatCompletionMessageParam], + approver: Approver | None, + config: dict[str, object] | None, + *, + verbose: bool, + pending: list[str], + flush_log: Callable[[], None], + todos: plan.TodoCollector, +) -> Iterator[SpeechDelta | ToolNotice | plan.TodoUpdate | ApprovalPause]: + """Stream a write-gated turn: each pause on a write asks ``approver`` and resumes. + + The graph pauses (before executing a gated write) by ending the ``messages`` stream with + a pending interrupt on the checkpointed state. We surface its action requests, bracket the + human decision with :class:`ApprovalPause` events, and resume with the approve/reject + ``Command`` — looping until the turn finishes without pausing. + """ + from langgraph.types import Command + + graph_input: object = {"messages": conversation} + while True: + for chunk, _m in graph.stream(graph_input, config, stream_mode="messages"): + yield from _events_from_chunk( + chunk, verbose=verbose, pending=pending, flush_log=flush_log, todos=todos + ) + flush_log() + requests = _pending_writes(graph, config) + if not requests: + return + decisions: list[dict[str, object]] = [] + for request in requests: + yield ApprovalPause(active=True) + decisions.append(_decide(request, approver)) + yield ApprovalPause(active=False) + graph_input = Command(resume={"decisions": decisions}) + + +def _pending_writes( + graph: _GatedGraph, config: dict[str, object] | None +) -> list[dict[str, object]]: + """The action requests of a paused gated write (empty when the turn isn't paused). + + deepagents surfaces an approval pause as ``interrupts`` on the checkpointed state, each + interrupt's ``.value`` carrying the ``action_requests`` (the gated tool calls). + """ + state = graph.get_state(config) + requests: list[dict[str, object]] = [] + for interrupt in getattr(state, "interrupts", ()) or (): + value = getattr(interrupt, "value", None) + actions = value.get("action_requests") if isinstance(value, dict) else None + if isinstance(actions, list): + requests.extend(action for action in actions if isinstance(action, dict)) + return requests + + +def _decide(action: dict[str, object], approver: Approver | None) -> dict[str, object]: + """Ask the approver about one pending write and shape the resume decision (reject if none).""" + name = str(action.get("name", "")) + args = action.get("args") or {} + if not isinstance(args, dict): + args = {} + if approver is not None and approver(name, args): + return {"type": "approve"} + return {"type": "reject", "message": _DECLINED} + + +def _events_from_chunk( + chunk: object, + *, + verbose: bool, + pending: list[str], + flush_log: Callable[[], None], + todos: plan.TodoCollector, +) -> Iterator[SpeechDelta | ToolNotice | plan.TodoUpdate]: + """Translate one streamed message chunk into speech/tool/plan events (and verbose logs). + + ``write_todos`` is special-cased: ``todos`` accumulates its args and surfaces them as a + :class:`plan.TodoUpdate` when the tool result lands — never a generic :class:`ToolNotice`. + """ + if type(chunk).__name__ == "ToolMessage": + yield from _tool_result_events(chunk, verbose=verbose, flush_log=flush_log, todos=todos) + return + todos.note_chunk(chunk) + yield from _tool_call_notices(chunk, verbose=verbose, flush_log=flush_log) + text = _content_text(_content(chunk)) + if text: + pending.append(text) + yield SpeechDelta(text) + + +def _tool_result_events( + chunk: object, *, verbose: bool, flush_log: Callable[[], None], todos: plan.TodoCollector +) -> Iterator[plan.TodoUpdate]: + """Handle a ToolMessage: log its result and, for ``write_todos``, surface the reassembled plan.""" + flush_log() + name = getattr(chunk, "name", "") + if verbose: + _FLOW_LOG.info("tool result %s -> %s", name, _clip(_content_text(_content(chunk)))) + if name == plan.WRITE_TODOS_TOOL_NAME and (update := todos.take()) is not None: + yield update # the plan, reassembled from the call's args + + +def _tool_call_notices( + chunk: object, *, verbose: bool, flush_log: Callable[[], None] +) -> Iterator[ToolNotice]: + """Emit a :class:`ToolNotice` for each started tool call (``write_todos`` surfaces as a plan).""" + for call in getattr(chunk, "tool_call_chunks", None) or []: + name = call.get("name") + # The plan tool surfaces as a TodoUpdate (in _tool_result_events), not an affordance. + if name and name != plan.WRITE_TODOS_TOOL_NAME: + flush_log() + if verbose: + _FLOW_LOG.info("tool call %s", name) + yield ToolNotice(_tool_label(name), _tool_fillers(name)) + + +def _content(chunk: object) -> object: + """A chunk's raw ``content`` attribute (empty string when absent).""" + return getattr(chunk, "content", "") + + +def _clip(text: str) -> str: + """Flatten a tool result onto one line and truncate it for the flow log. + + Tool output is untrusted, so whitespace (newlines especially) is collapsed first — a result + can't then forge extra ``[aai_cli.…]`` log lines — and the length is capped so a multi-KB + payload can't bury the flow. (Secrets are masked separately by the debuglog formatter.) + """ + flattened = " ".join(text.split()) + if len(flattened) <= _RESULT_LOG_CAP: + return flattened + return f"{flattened[:_RESULT_LOG_CAP]}… ({len(flattened)} chars)" + + +def _content_text(content: object) -> str: + """Coerce a message's content (a string, or a list of content blocks) to plain text.""" + if isinstance(content, str): + return content + if isinstance(content, list): + return "".join( + block.get("text", "") if isinstance(block, dict) else str(block) for block in content + ) + return str(content) diff --git a/aai_cli/agent_cascade/tui.py b/aai_cli/agent_cascade/tui.py index d807b468..c320004f 100644 --- a/aai_cli/agent_cascade/tui.py +++ b/aai_cli/agent_cascade/tui.py @@ -29,6 +29,7 @@ AssistantMessage, ErrorMessage, Note, + TodoList, ToolAffordance, UserMessage, ) @@ -41,6 +42,7 @@ from textual.timer import Timer from aai_cli.agent_cascade.engine import Renderer + from aai_cli.agent_cascade.plan import TodoItem # Splash intro copy (the code agent's banner copy is code-specific, so `live` carries its own). _READY_LINE = "Listening… start talking when you're ready." @@ -85,6 +87,9 @@ def user_final(self, text: str) -> None: def tool_call(self, label: str) -> None: self._dispatch(self._app.show_tool_call, label) + def todos_updated(self, todos: tuple[TodoItem, ...]) -> None: + self._dispatch(self._app.show_todos, todos) + def reply_started(self) -> None: self._dispatch(self._app.begin_reply) @@ -164,6 +169,10 @@ def __init__( self._voice_timer: Timer | None = None self._user_partial: UserMessage | None = None # the in-place "you: …" widget for a turn self._reply_msg: AssistantMessage | None = None # the reply widget sentences stream into + # The current turn's plan panel, updated in place as write_todos revises it; reset at each + # new turn (show_user_final) so a fresh turn's plan mounts inline rather than editing a + # widget scrolled far above. + self._todo_widget: TodoList | None = None self._stopped = False # guards on_stop against a double teardown (quit + unmount) # A fatal cascade error caught on the worker thread, re-raised on the main thread (after # app.run returns) so the command exits with the error's code instead of a silent 0 — @@ -234,6 +243,7 @@ def show_user_final(self, text: str) -> None: else: self._user_partial.set_text(text) self._user_partial = None # finalized; the next partial starts a fresh line + self._todo_widget = None # a new turn's plan mounts fresh, not into the prior turn's panel self._set_phase("thinking") self._scroll_end() @@ -249,6 +259,20 @@ def show_tool_call(self, label: str) -> None: self._mount(ToolAffordance(f"{label}…", tight=tight)) self._scroll_end() + def show_todos(self, todos: tuple[TodoItem, ...]) -> None: + """Surface the agent's plan (its ``write_todos`` list) as an inline checklist. + + The first plan of a turn mounts a fresh :class:`TodoList`; a later revision within the + same turn repaints it in place, so a multi-step spoken request shows one evolving plan + rather than a stack of copies. + """ + if self._todo_widget is None: + self._todo_widget = TodoList(todos) + self._mount(self._todo_widget) + else: + self._todo_widget.set_todos(todos) + self._scroll_end() + def begin_reply(self) -> None: """Start a fresh reply: drop the previous reply widget and switch to the speaking phase. The new widget is *not* mounted here — it is created lazily on the first streamed sentence diff --git a/pyproject.toml b/pyproject.toml index 99d29287..65523507 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -280,12 +280,18 @@ include = ["aai_cli"] # (aai_cli.init.templates..api.*), the same bar as the rest of the package; only # generated/hidden dirs are skipped. exclude = ["**/node_modules", "**/__pycache__", "**/.*"] -# agent_cascade/brain.py and model.py wire the deeply-generic, only-partially-typed -# deepagents/langchain/ChatOpenAI boundary, where pyright-strict floods on -# Unknown*/invariance diagnostics our precise signatures can't satisfy. mypy still -# type-checks these modules (with the targeted overrides above) as the safety net, so -# we suppress pyright diagnostics here rather than littering per-line `# pyright: ignore`. -ignore = ["aai_cli/agent_cascade/brain.py", "aai_cli/agent_cascade/model.py"] +# agent_cascade/brain.py, streamer.py, plan.py, and model.py wire the deeply-generic, +# only-partially-typed deepagents/langchain/ChatOpenAI boundary (streamed chunks, the untyped +# write_todos args), where pyright-strict floods on Unknown*/invariance diagnostics our precise +# signatures can't satisfy. mypy still type-checks these modules (with the targeted overrides +# above) as the safety net, so we suppress pyright diagnostics here rather than littering +# per-line `# pyright: ignore`. +ignore = [ + "aai_cli/agent_cascade/brain.py", + "aai_cli/agent_cascade/streamer.py", + "aai_cli/agent_cascade/plan.py", + "aai_cli/agent_cascade/model.py", +] pythonVersion = "3.12" typeCheckingMode = "strict" # Third-party deps (assemblyai, sounddevice) ship no type stubs. diff --git a/tests/__snapshots__/test_tui_snapshots/test_live_plan.raw b/tests/__snapshots__/test_tui_snapshots/test_live_plan.raw new file mode 100644 index 00000000..e9f037e7 --- /dev/null +++ b/tests/__snapshots__/test_tui_snapshots/test_live_plan.raw @@ -0,0 +1,182 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + AssemblyAI Live + + + + + + + + + + + █████╗  ███████╗ ███████╗ ███████╗ ███╗   ███╗ ██████╗  ██╗      ██╗   ██╗ +██╔══██╗ ██╔════╝ ██╔════╝ ██╔════╝ ████╗ ████║ ██╔══██╗ ██║      ╚██╗ ██╔╝ +███████║ ███████╗ ███████╗ █████╗   ██╔████╔██║ ██████╔╝ ██║       ╚████╔╝  +██╔══██║ ╚════██║ ╚════██║ ██╔══╝   ██║╚██╔╝██║ ██╔══██╗ ██║        ╚██╔╝   +██║  ██║ ███████║ ███████║ ███████╗ ██║ ╚═╝ ██║ ██████╔╝ ███████╗    ██║    +╚═╝  ╚═╝ ╚══════╝ ╚══════╝ ╚══════╝ ╚═╝     ╚═╝ ╚═════╝  ╚══════╝    ╚═╝    +v9.9.9 + +Listening… start talking when you're ready. +Use headphones — the mic stays open while the agent speaks. + +» book a flight to Seattle, then check the weather there + +Plan +✓ Book a flight to Seattle +▸ Check the Seattle weather +○ Suggest what to pack + + + + + + + + +╭────────────────────────────────────────────────────────────────────────────────────────────────╮ +▃▅▇ Thinking… +╰────────────────────────────────────────────────────────────────────────────────────────────────╯ +Space to start/stop listening · Esc/Ctrl-C to interrupt · Ctrl-Q to quit + + + diff --git a/tests/_cascade_fakes.py b/tests/_cascade_fakes.py index 38147abc..525be3a0 100644 --- a/tests/_cascade_fakes.py +++ b/tests/_cascade_fakes.py @@ -55,6 +55,9 @@ def user_final(self, text): def tool_call(self, label): self.calls.append(("tool_call", label)) + def todos_updated(self, todos): + self.calls.append(("todos_updated", todos)) + def reply_started(self): self.calls.append(("reply_started",)) diff --git a/tests/test_agent_cascade_brain.py b/tests/test_agent_cascade_brain.py index 7a12f9e5..8a759d88 100644 --- a/tests/test_agent_cascade_brain.py +++ b/tests/test_agent_cascade_brain.py @@ -12,7 +12,7 @@ from langchain_core.messages import AIMessage -from aai_cli.agent_cascade import brain, datetime_tool, weather_tool, webpage_tool +from aai_cli.agent_cascade import brain, datetime_tool, plan, streamer, weather_tool, webpage_tool from aai_cli.agent_cascade import model as model_mod from aai_cli.agent_cascade.config import CascadeConfig from tests._cascade_fakes import FakeChatModel @@ -96,15 +96,15 @@ def test_tool_label_execute_is_running_code(): def test_clip_passes_short_text_and_truncates_long_text(): - assert brain._clip("short") == "short" + assert streamer._clip("short") == "short" # A result exactly at the cap is left whole (the boundary is inclusive). - at_cap = "y" * brain._RESULT_LOG_CAP - assert brain._clip(at_cap) == at_cap - long = "x" * (brain._RESULT_LOG_CAP + 5000) - clipped = brain._clip(long) + at_cap = "y" * streamer._RESULT_LOG_CAP + assert streamer._clip(at_cap) == at_cap + long = "x" * (streamer._RESULT_LOG_CAP + 5000) + clipped = streamer._clip(long) # Only the first _RESULT_LOG_CAP chars survive, with a marker noting the full length — # so a multi-KB tool payload can't bury the rest of the flow in stderr. - assert clipped == "x" * brain._RESULT_LOG_CAP + f"… ({len(long)} chars)" + assert clipped == "x" * streamer._RESULT_LOG_CAP + f"… ({len(long)} chars)" assert len(clipped) < len(long) @@ -113,9 +113,9 @@ def test_clip_flattens_whitespace_so_tool_output_cant_forge_log_lines(): # "[aai_cli.…]" log lines. _clip collapses all whitespace runs to single spaces, so the # result stays on one line. forged = "ok\n[aai_cli.agent_cascade.brain] tool call rm_rf args={}\r\nmore" - assert brain._clip(forged) == "ok [aai_cli.agent_cascade.brain] tool call rm_rf args={} more" - assert "\n" not in brain._clip(forged) - assert "\r" not in brain._clip(forged) + assert streamer._clip(forged) == "ok [aai_cli.agent_cascade.brain] tool call rm_rf args={} more" + assert "\n" not in streamer._clip(forged) + assert "\r" not in streamer._clip(forged) # --- _content_text ----------------------------------------------------------- @@ -123,11 +123,11 @@ def test_clip_flattens_whitespace_so_tool_output_cant_forge_log_lines(): def test_content_text_coerces_unexpected_content(): # A content that is neither a string nor a list of blocks (defensive fallback). - assert brain._content_text(123) == "123" + assert streamer._content_text(123) == "123" def test_content_text_joins_list_content_blocks(): - assert brain._content_text([{"type": "text", "text": "Hello "}, "world"]) == "Hello world" + assert streamer._content_text([{"type": "text", "text": "Hello "}, "world"]) == "Hello world" # --- build_live_tools -------------------------------------------------------- @@ -186,8 +186,8 @@ def fake_build_model(api_key, *, model, max_tokens, extra): # The cascade's model + knobs are threaded into the gateway model build. assert captured == {"model": "claude-x", "max_tokens": 128, "extra": {"temperature": 0.2}} # The compiled graph is a real deepagents graph that answers offline via the fake model. - streamer = brain.build_streamer("k", cfg, graph=graph) - spoken = "".join(e.text for e in streamer([{"role": "user", "content": "hi"}])) + stream_reply = streamer.build_streamer("k", cfg, graph=graph) + spoken = "".join(e.text for e in stream_reply([{"role": "user", "content": "hi"}])) assert spoken == "hi from the agent" @@ -263,3 +263,64 @@ def test_tool_label_maps_weather(): def test_tool_label_maps_datetime(): assert brain._tool_label(datetime_tool.DATETIME_TOOL_NAME) == "Checking the time" + + +# --- write_todos plan parsing ------------------------------------------------ + + +def test_parse_todos_shapes_a_valid_blob_and_rejects_junk(): + update = plan._parse_todos('{"todos":[{"content":"A","status":"pending"}]}') + assert update == plan.TodoUpdate((plan.TodoItem(content="A", status="pending"),)) + # Not-JSON, valid-JSON-without-todos, and an empty list all yield no plan (None). + assert plan._parse_todos("{not json") is None + assert plan._parse_todos('{"other":1}') is None + assert plan._parse_todos('{"todos":[]}') is None + + +def test_todos_from_list_drops_non_dict_items_and_defaults_missing_fields(): + update = plan._todos_from_list([{"content": "A"}, "junk", {"status": "completed"}]) + # The string item is dropped; missing content/status default to empty strings. + assert update == plan.TodoUpdate( + ( + plan.TodoItem(content="A", status=""), + plan.TodoItem(content="", status="completed"), + ) + ) + assert plan._todos_from_list("not a list") is None + + +def test_todo_collector_resets_after_take(): + collector = plan.TodoCollector() + chunk = AIMessage( + content="", + tool_calls=[ + {"name": "write_todos", "args": {"todos": [{"content": "A", "status": "x"}]}, "id": "w"} + ], + ) + collector.note_chunk(chunk) + assert collector.take() == plan.TodoUpdate((plan.TodoItem(content="A", status="x"),)) + # take() consumed the buffer: a second take with no further chunks yields nothing. + assert collector.take() is None + + +def test_todo_collector_prefers_complete_buffer_over_partial_tool_calls(): + # A streaming chunk carries both the full JSON fragment AND langchain's partial-parse + # .tool_calls; the complete buffer must win so the plan isn't truncated to the partial parse. + from langchain_core.messages import AIMessageChunk + + collector = plan.TodoCollector() + collector.note_chunk( + AIMessageChunk( + content="", + tool_call_chunks=[ + {"name": "write_todos", "args": '{"todos":[{"content":"A",', "id": "w", "index": 0} + ], + ) + ) + collector.note_chunk( + AIMessageChunk( + content="", + tool_call_chunks=[{"name": None, "args": '"status":"done"}]}', "id": "w", "index": 0}], + ) + ) + assert collector.take() == plan.TodoUpdate((plan.TodoItem(content="A", status="done"),)) diff --git a/tests/test_agent_cascade_command.py b/tests/test_agent_cascade_command.py index 84c6eaee..45b865ae 100644 --- a/tests/test_agent_cascade_command.py +++ b/tests/test_agent_cascade_command.py @@ -15,7 +15,7 @@ from typer.testing import CliRunner from aai_cli.agent.render import AgentRenderer -from aai_cli.agent_cascade import _io, engine +from aai_cli.agent_cascade import _io from aai_cli.agent_cascade.config import CascadeConfig from aai_cli.agent_cascade.engine import CascadeDeps from aai_cli.app.context import AppState @@ -247,7 +247,7 @@ def test_run_wires_deps_and_invokes_cascade(monkeypatch): # CascadeDeps.real builds the brain graph (which would launch the default MCP servers); # stub the streamer so deps still wire up without spawning any npx/uvx subprocess. monkeypatch.setattr( - _exec.engine.brain, "build_streamer", lambda api_key, config, *, approver=None: lambda m: [] + _io.streamer, "build_streamer", lambda api_key, config, *, approver=None: lambda m: [] ) captured = {} @@ -289,7 +289,7 @@ def _wire_run(monkeypatch, run_cascade): monkeypatch.setattr(_exec.client, "resolve_audio_source", lambda source, sample: "clip.wav") # Stub the brain streamer so CascadeDeps.real never launches the default MCP servers. monkeypatch.setattr( - _exec.engine.brain, "build_streamer", lambda api_key, config, *, approver=None: lambda m: [] + _io.streamer, "build_streamer", lambda api_key, config, *, approver=None: lambda m: [] ) monkeypatch.setattr(_exec.engine, "run_cascade", run_cascade) rendered = {} @@ -473,7 +473,7 @@ def fake_build_streamer(api_key, config, *, approver=None): del api_key, config return lambda messages: [SpeechDelta("reply to " + messages[-1]["content"])] - monkeypatch.setattr(engine.brain, "build_streamer", fake_build_streamer) + monkeypatch.setattr(_io.streamer, "build_streamer", fake_build_streamer) cfg = CascadeConfig() deps = CascadeDeps.real("k", cfg, audio=[], stt_params=_stt_params()) events = list(deps.stream_reply([{"role": "user", "content": "hi"}])) diff --git a/tests/test_agent_cascade_engine.py b/tests/test_agent_cascade_engine.py index 82c8807c..cd1b6079 100644 --- a/tests/test_agent_cascade_engine.py +++ b/tests/test_agent_cascade_engine.py @@ -15,6 +15,7 @@ from aai_cli.agent_cascade.brain import SpeechDelta, ToolNotice from aai_cli.agent_cascade.config import CascadeConfig from aai_cli.agent_cascade.engine import CascadeDeps, CascadeSession, run_cascade +from aai_cli.agent_cascade.plan import TodoItem, TodoUpdate from aai_cli.core.errors import APIError from tests._cascade_fakes import FakePlayer, FakeRenderer, FakeWorker, make_session from tests._cascade_fakes import deltas as _deltas @@ -101,6 +102,22 @@ def stream(messages): assert ("tool_call", "Searching the web") in renderer.calls +def test_reply_forwards_a_plan_and_still_speaks_the_reply(): + # A TodoUpdate is a purely visual affordance: it reaches the renderer as todos_updated and, + # unlike a ToolNotice, does NOT suppress the spoken reply that follows it. + todos = (TodoItem(content="Book a flight", status="in_progress"),) + + def stream(messages): + yield TodoUpdate(todos) + yield SpeechDelta("First I'll book the flight.") + + session, renderer, _player = make_session(stream_reply=stream) + session.on_turn(_turn("plan my trip")) + assert ("todos_updated", todos) in renderer.calls + # The reply was still spoken (a plain ToolNotice would have held it back). + assert ("agent_transcript", "First I'll book the flight.", False) in renderer.calls + + def test_on_turn_interim_shows_partial_and_does_not_reply(): streamed = [] session, renderer, _player = make_session( diff --git a/tests/test_agent_cascade_files.py b/tests/test_agent_cascade_files.py index e80b2a8d..cb6bd924 100644 --- a/tests/test_agent_cascade_files.py +++ b/tests/test_agent_cascade_files.py @@ -12,7 +12,7 @@ import pytest -from aai_cli.agent_cascade import brain, engine +from aai_cli.agent_cascade import brain, engine, streamer from aai_cli.agent_cascade.brain import ApprovalPause, SpeechDelta from aai_cli.agent_cascade.config import CascadeConfig from aai_cli.app.context import AppState @@ -59,7 +59,7 @@ def fake_build_streamer(api_key, config, *, approver=None): captured["approver"] = approver return lambda messages: [] - monkeypatch.setattr(engine.brain, "build_streamer", fake_build_streamer) + monkeypatch.setattr(streamer, "build_streamer", fake_build_streamer) def approve(name, args): return True @@ -144,9 +144,9 @@ def test_approval_deadline_suspends_then_restores_into_the_future(): def test_declined_execute_yields_declined_message(): action = {"name": "execute", "args": {"command": "rm -rf build"}} - assert brain._decide(action, lambda name, args: False) == { + assert streamer._decide(action, lambda name, args: False) == { "type": "reject", - "message": brain._DECLINED, + "message": streamer._DECLINED, } @@ -160,7 +160,7 @@ def approver(name: str, args: dict[str, object]) -> bool: seen["args"] = args return True - decision = brain._decide({"name": "execute", "args": [1, 2]}, approver) + decision = streamer._decide({"name": "execute", "args": [1, 2]}, approver) assert decision == {"type": "approve"} assert seen["name"] == "execute" @@ -176,7 +176,7 @@ def approver(name: str, args: dict[str, object]) -> bool: seen["args"] = args return True - brain._decide({"name": "write_file", "args": {"file_path": "n.txt"}}, approver) + streamer._decide({"name": "write_file", "args": {"file_path": "n.txt"}}, approver) assert seen["args"] == {"file_path": "n.txt"} @@ -220,6 +220,6 @@ def test_stream_graph_defaults_to_ungated(): # asserting get_state is never called kills it. graph = _SpyGatedGraph() - list(brain._stream_graph(graph, [])) + list(streamer._stream_graph(graph, [])) assert graph.get_state_calls == 0 diff --git a/tests/test_agent_cascade_streamer.py b/tests/test_agent_cascade_streamer.py index 78121711..c2bf3bfb 100644 --- a/tests/test_agent_cascade_streamer.py +++ b/tests/test_agent_cascade_streamer.py @@ -14,7 +14,7 @@ from langchain_core.language_models.chat_models import BaseChatModel from langchain_core.messages import AIMessage, AIMessageChunk, ToolMessage -from aai_cli.agent_cascade import brain +from aai_cli.agent_cascade import brain, plan, streamer from aai_cli.agent_cascade.config import CascadeConfig from aai_cli.core.errors import CLIError from tests._cascade_fakes import FakeChatModel @@ -37,8 +37,8 @@ def stream(self, graph_input, config, *, stream_mode): def _collect(graph, messages): - streamer = brain.build_streamer("k", CascadeConfig(), graph=graph) - return list(streamer(messages)) + stream_reply = streamer.build_streamer("k", CascadeConfig(), graph=graph) + return list(stream_reply(messages)) def test_streamer_yields_speech_deltas_for_assistant_tokens(): @@ -109,15 +109,84 @@ def test_streamer_emits_one_notice_per_call_ignoring_arg_only_chunks(): assert [e.label for e in events if isinstance(e, brain.ToolNotice)] == ["Using get_time"] +# --- build_streamer (write_todos -> TodoUpdate plan) ------------------------- + + +def test_streamer_emits_a_todo_update_from_streamed_write_todos_args(): + # write_todos args stream as JSON fragments across chunks; the streamer accumulates them and + # surfaces the full plan as a TodoUpdate when the tool result lands — NOT a ToolNotice. + frag1 = AIMessageChunk( + content="", + tool_call_chunks=[ + { + "name": plan.WRITE_TODOS_TOOL_NAME, + "args": '{"todos":[{"content":"Book", ', + "id": "w", + "index": 0, + } + ], + ) + frag2 = AIMessageChunk( + content="", + tool_call_chunks=[ + {"name": None, "args": '"status":"in_progress"}]}', "id": "w", "index": 0} + ], + ) + result = ToolMessage(content="Updated todo list to [...]", name="write_todos", tool_call_id="w") + graph = _MessageStreamGraph( + [(frag1, {}), (frag2, {}), (result, {}), (AIMessageChunk(content="On it."), {})] + ) + events = _collect(graph, [{"role": "user", "content": "book a flight"}]) + updates = [e for e in events if isinstance(e, plan.TodoUpdate)] + assert updates == [plan.TodoUpdate((plan.TodoItem(content="Book", status="in_progress"),))] + # The plan tool never doubles as a spoken affordance. + assert [e for e in events if isinstance(e, brain.ToolNotice)] == [] + assert [e.text for e in events if isinstance(e, brain.SpeechDelta)] == ["On it."] + + +def test_streamer_emits_a_todo_update_from_a_non_streaming_model(): + # A non-streaming model (the test fakes) surfaces a complete write_todos call as a single + # AIMessage with a parsed .tool_calls dict; the streamer still produces the same TodoUpdate. + todo_call = AIMessage( + content="", + tool_calls=[ + { + "name": "write_todos", + "args": { + "todos": [ + {"content": "Find a flight", "status": "in_progress"}, + {"content": "Check Seattle weather", "status": "pending"}, + ] + }, + "id": "w1", + } + ], + ) + graph = _real_graph(FakeChatModel(responses=[todo_call, AIMessage(content="Working on it.")])) + events = _collect(graph, [{"role": "user", "content": "plan my trip"}]) + (update,) = [e for e in events if isinstance(e, plan.TodoUpdate)] + assert update.todos == ( + plan.TodoItem(content="Find a flight", status="in_progress"), + plan.TodoItem(content="Check Seattle weather", status="pending"), + ) + + +def _real_graph(model: BaseChatModel): + """A real deepagents graph (with the auto-installed write_todos tool) over a fake model.""" + from deepagents import create_deep_agent + + return create_deep_agent(model=model, tools=[], system_prompt="be a friendly live agent") + + def test_streamer_wraps_graph_errors_in_cli_error(): class _Boom: def stream(self, graph_input, config, *, stream_mode): del graph_input, config, stream_mode raise ValueError("gateway said no") - streamer = brain.build_streamer("k", CascadeConfig(), graph=_Boom()) + stream_reply = streamer.build_streamer("k", CascadeConfig(), graph=_Boom()) with pytest.raises(CLIError) as excinfo: - list(streamer([{"role": "user", "content": "hi"}])) + list(stream_reply([{"role": "user", "content": "hi"}])) assert "couldn't complete the turn" in excinfo.value.message assert "gateway said no" in excinfo.value.message @@ -128,9 +197,9 @@ def stream(self, graph_input, config, *, stream_mode): del graph_input, config, stream_mode raise CLIError("already clean", error_type="x") - streamer = brain.build_streamer("k", CascadeConfig(), graph=_CliBoom()) + stream_reply = streamer.build_streamer("k", CascadeConfig(), graph=_CliBoom()) with pytest.raises(CLIError, match="already clean"): - list(streamer([{"role": "user", "content": "hi"}])) + list(stream_reply([{"role": "user", "content": "hi"}])) def test_streamer_errors_when_graph_cannot_stream(): @@ -141,14 +210,14 @@ def invoke(self, graph_input): del graph_input return {"messages": []} - streamer = brain.build_streamer("k", CascadeConfig(), graph=_InvokeOnly()) + stream_reply = streamer.build_streamer("k", CascadeConfig(), graph=_InvokeOnly()) with pytest.raises(CLIError) as excinfo: - list(streamer([{"role": "user", "content": "hi"}])) + list(stream_reply([{"role": "user", "content": "hi"}])) assert "cannot stream" in excinfo.value.message def test_streamer_logs_flow_when_verbose(monkeypatch, caplog, preserve_logging_state): - monkeypatch.setattr(brain.debuglog, "active", lambda: True) + monkeypatch.setattr(streamer.debuglog, "active", lambda: True) call_chunk = AIMessageChunk( content="", tool_call_chunks=[{"name": "tavily_search", "args": "", "id": "c1", "index": 0}] ) @@ -160,7 +229,7 @@ def test_streamer_logs_flow_when_verbose(monkeypatch, caplog, preserve_logging_s (AIMessageChunk(content="It's rainy."), {}), ] graph = _MessageStreamGraph(items) - with caplog.at_level(logging.INFO, logger="aai_cli.agent_cascade.brain"): + with caplog.at_level(logging.INFO, logger="aai_cli.agent_cascade.streamer"): _collect(graph, [{"role": "user", "content": "weather?"}]) messages = [r.getMessage() for r in caplog.records] # Accumulated assistant text is logged as one line per assistant turn, around the @@ -210,8 +279,10 @@ def approve(name, args): return True graph = _gated_graph(_write_then("Saved your note."), str(tmp_path)) - streamer = brain.build_streamer("k", CascadeConfig(files=True), graph=graph, approver=approve) - events = list(streamer([{"role": "user", "content": "save a note"}])) + stream_reply = streamer.build_streamer( + "k", CascadeConfig(files=True), graph=graph, approver=approve + ) + events = list(stream_reply([{"role": "user", "content": "save a note"}])) spoken = "".join(e.text for e in events if isinstance(e, brain.SpeechDelta)) assert spoken == "Saved your note." # The approver was consulted for the write, and the approved write hit the rooted dir. @@ -221,10 +292,10 @@ def approve(name, args): def test_streamer_rejects_write_without_approval(tmp_path): graph = _gated_graph(_write_then("Okay, I won't save it."), str(tmp_path)) - streamer = brain.build_streamer( + stream_reply = streamer.build_streamer( "k", CascadeConfig(files=True), graph=graph, approver=lambda name, args: False ) - events = list(streamer([{"role": "user", "content": "save a note"}])) + events = list(stream_reply([{"role": "user", "content": "save a note"}])) spoken = "".join(e.text for e in events if isinstance(e, brain.SpeechDelta)) assert spoken == "Okay, I won't save it." # Declined: nothing was written to the rooted directory. @@ -237,12 +308,12 @@ def test_streamer_brackets_write_approval_with_pause_events(tmp_path): # the two markers by construction (the streamer yields True, asks, then yields False). asked: list[str] = [] graph = _gated_graph(_write_then("Done."), str(tmp_path)) - streamer = brain.build_streamer( + stream_reply = streamer.build_streamer( "k", CascadeConfig(files=True), graph=graph, approver=lambda name, args: asked.append(name) or True, ) - events = list(streamer([{"role": "user", "content": "save"}])) + events = list(stream_reply([{"role": "user", "content": "save"}])) pauses = [event.active for event in events if isinstance(event, brain.ApprovalPause)] assert pauses == [True, False] # the write was bracketed: pause on, then resume diff --git a/tests/test_agent_cascade_subagents.py b/tests/test_agent_cascade_subagents.py index 19baf998..5365b4ce 100644 --- a/tests/test_agent_cascade_subagents.py +++ b/tests/test_agent_cascade_subagents.py @@ -5,7 +5,7 @@ from langchain_core.language_models.chat_models import BaseChatModel from langchain_core.messages import AIMessage -from aai_cli.agent_cascade import brain +from aai_cli.agent_cascade import brain, streamer from aai_cli.agent_cascade.config import CascadeConfig from aai_cli.agent_cascade.subagents import general_purpose_subagent from tests._cascade_fakes import FakeChatModel @@ -107,14 +107,14 @@ def test_subagent_write_surfaces_through_the_parent_gate_and_is_approved(tmp_pat # approval loop (build_streamer -> _stream_gated -> _pending_writes -> approver). Approved, it lands. asked: list[tuple[str, dict]] = [] graph = _delegating_graph(_delegate_then_write("Saved it via the helper."), str(tmp_path)) - streamer = brain.build_streamer( + stream_reply = streamer.build_streamer( "k", CascadeConfig(files=True), graph=graph, approver=lambda name, args: asked.append((name, args)) or True, ) - list(streamer([{"role": "user", "content": "have the helper save a note"}])) + list(stream_reply([{"role": "user", "content": "have the helper save a note"}])) assert any(name == "write_file" for name, _ in asked) # the SUBAGENT's write was gated by us assert (tmp_path / "n.txt").read_text() == "hi" # approved -> actually written @@ -122,10 +122,10 @@ def test_subagent_write_surfaces_through_the_parent_gate_and_is_approved(tmp_pat def test_subagent_write_is_declined_when_the_approver_rejects(tmp_path): graph = _delegating_graph(_delegate_then_write("Okay, left it alone."), str(tmp_path)) - streamer = brain.build_streamer( + stream_reply = streamer.build_streamer( "k", CascadeConfig(files=True), graph=graph, approver=lambda name, args: False ) - list(streamer([{"role": "user", "content": "have the helper save a note"}])) + list(stream_reply([{"role": "user", "content": "have the helper save a note"}])) assert not (tmp_path / "n.txt").exists() # declined -> nothing written by the subagent diff --git a/tests/test_agent_events.py b/tests/test_agent_events.py index 879d1a06..64f3224c 100644 --- a/tests/test_agent_events.py +++ b/tests/test_agent_events.py @@ -22,6 +22,12 @@ events.ToolUse(label="Searching the web"), {"type": "tool.use", "label": "Searching the web"}, ), + ( + events.PlanUpdate(todos=(events.TodoItem(content="Book a flight", status="pending"),)), + # model_dump keeps todos a tuple of nested dicts; it serializes to a JSON array on the + # wire (asserted end-to-end in test_agent_render.test_json_todos_emits_plan_event). + {"type": "plan", "todos": ({"content": "Book a flight", "status": "pending"},)}, + ), ( events.AgentTranscript(text="hi back", interrupted=False), {"type": "transcript.agent", "text": "hi back", "interrupted": False}, diff --git a/tests/test_agent_render.py b/tests/test_agent_render.py index 5cfc9fa3..c3a629f5 100644 --- a/tests/test_agent_render.py +++ b/tests/test_agent_render.py @@ -144,6 +144,61 @@ def test_human_tool_call_shows_inline_line(): assert "Searching the web" in buf.getvalue() +# --- todos / plan ---------------------------------------------------------- + + +def _plan(): + from aai_cli.agent_cascade.plan import TodoItem + + return ( + TodoItem(content="Book a flight", status="in_progress"), + TodoItem(content="Check the weather", status="pending"), + ) + + +def test_json_todos_emits_plan_event(): + buf = io.StringIO() + AgentRenderer(json_mode=True, out=buf).todos_updated(_plan()) + assert { + "type": "plan", + "todos": [ + {"content": "Book a flight", "status": "in_progress"}, + {"content": "Check the weather", "status": "pending"}, + ], + } in _json_lines(buf) + + +def test_text_todos_summary_goes_to_stderr_not_stdout(): + # The plan is status, so in piped text mode it stays off stdout (transcript-only). Each + # status maps to its own mark, and an unknown status falls back to the pending box. + from aai_cli.agent_cascade.plan import TodoItem + + todos = ( + TodoItem(content="Booked", status="completed"), + TodoItem(content="Booking", status="in_progress"), + TodoItem(content="Queued", status="pending"), + TodoItem(content="Mystery", status="weird"), + ) + out, err = io.StringIO(), io.StringIO() + AgentRenderer(json_mode=False, text_mode=True, out=out, err=err).todos_updated(todos) + summary = err.getvalue() + assert "Plan:" in summary + assert "[x] Booked" in summary # completed + assert "[~] Booking" in summary # in_progress + assert "[ ] Queued" in summary # pending + assert "[ ] Mystery" in summary # unknown status -> pending fallback + assert out.getvalue() == "" + + +def test_human_todos_show_an_inline_checklist(): + r, buf = _human() + r.todos_updated(_plan()) + rendered = buf.getvalue() + assert "Plan:" in rendered + assert "Book a flight" in rendered + assert "Check the weather" in rendered + + def test_human_close_commits_open_partial(): r, buf = _human() r.user_partial("half a sentence") diff --git a/tests/test_live_tui_plan.py b/tests/test_live_tui_plan.py new file mode 100644 index 00000000..81d4d7ca --- /dev/null +++ b/tests/test_live_tui_plan.py @@ -0,0 +1,135 @@ +"""Tests for the `assembly live` write_todos plan panel (the voice TUI's checklist). + +Drives the real Textual app headless: most tests call ``show_todos`` directly (it runs on the +UI thread), and one drives the worker leg through the real ``_TuiRenderer`` to cover the +off-thread ``todos_updated`` hop. Split from ``test_live_tui.py`` to keep that file under the +500-line gate. The plan-markup helper (``messages._todos_markup``) is unit-tested as a pure +function. +""" + +from __future__ import annotations + +import asyncio +import threading + +from aai_cli.agent_cascade import banner, messages +from aai_cli.agent_cascade.messages import TodoList +from aai_cli.agent_cascade.plan import TodoItem +from aai_cli.agent_cascade.tui import LiveAgentApp + + +def _run(coro) -> None: + asyncio.run(coro) + + +def _app(run_conversation=None, on_stop=None): + """A LiveAgentApp whose worker blocks until teardown (mirrors test_live_tui._app).""" + release = threading.Event() + + def stop() -> None: + release.set() + if on_stop is not None: + on_stop() + + def block(renderer) -> None: + release.wait(30) # block like a live mic; teardown releases it well before this + + return LiveAgentApp( + run_conversation=run_conversation or block, + on_stop=stop, + on_toggle_listen=lambda: True, + ) + + +def test_show_todos_mounts_a_plan_and_revises_it_in_place() -> None: + # The first plan of a turn mounts one TodoList; a later revision within the same turn repaints + # that same widget rather than stacking a second panel. + async def go() -> None: + app = _app() + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + app.show_user_final("book a flight then check the weather") + app.show_todos( + ( + TodoItem(content="Book a flight", status="in_progress"), + TodoItem(content="Check the weather", status="pending"), + ) + ) + panels = list(app.query(TodoList)) + assert len(panels) == 1 + assert "Book a flight" in str(panels[0].render()) + # A revision marking the first task done updates the same panel in place. + app.show_todos( + ( + TodoItem(content="Book a flight", status="completed"), + TodoItem(content="Check the weather", status="in_progress"), + ) + ) + panels = list(app.query(TodoList)) + assert len(panels) == 1 # still one panel, revised in place + assert panels[0] is app._todo_widget + + _run(go()) + + +def test_show_todos_starts_a_fresh_panel_each_turn() -> None: + # A new turn (show_user_final) resets the plan reference, so the next turn's plan mounts its + # own panel instead of editing the prior turn's (which has scrolled up). + async def go() -> None: + app = _app() + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + app.show_user_final("first task") + app.show_todos((TodoItem(content="One", status="in_progress"),)) + app.show_user_final("second, unrelated task") + app.show_todos((TodoItem(content="Two", status="in_progress"),)) + assert len(list(app.query(TodoList))) == 2 + + _run(go()) + + +def test_todos_updated_hops_to_the_ui_thread_from_the_worker() -> None: + # The cascade runs on a worker thread; todos_updated must hop to the UI thread (via the real + # _TuiRenderer) and mount the plan panel — the off-thread leg the direct calls above skip. + done = threading.Event() + + def run_conversation(renderer) -> None: + renderer.connected() + renderer.user_final("plan it") + renderer.todos_updated((TodoItem(content="Find it", status="in_progress"),)) + done.wait(30) + + async def go() -> None: + app = _app(run_conversation=run_conversation, on_stop=done.set) + async with app.run_test(size=(100, 30)) as pilot: + for _ in range(200): + await pilot.pause(0.01) + if any("Find it" in str(p.render()) for p in app.query(TodoList)): + break + assert any("Find it" in str(p.render()) for p in app.query(TodoList)) + assert done.is_set() + + _run(go()) + + +def test_todos_markup_glyphs_and_completed_strikethrough() -> None: + # The plan heading sits above one glyph-prefixed line per task; completed tasks are struck + # through and the in-progress task is brand-accented, so the panel reads as a live checklist. + text = messages._todos_markup( + ( + TodoItem(content="Booked", status="completed"), + TodoItem(content="Booking", status="in_progress"), + TodoItem(content="Queued", status="pending"), + TodoItem(content="Mystery", status="weird"), + ) + ) + plain = text.plain + assert plain.startswith("Plan") + assert "✓ Booked" in plain # completed glyph + assert "▸ Booking" in plain # in_progress glyph + assert "○ Queued" in plain # pending glyph + assert "○ Mystery" in plain # unknown status falls back to the pending glyph + # The completed task's content carries a strikethrough; the in-progress one is brand-accented. + styles = {span.style for span in text.spans} + assert any("strike" in str(s) for s in styles) + assert banner.BRAND_HEX in styles diff --git a/tests/test_tui_snapshots.py b/tests/test_tui_snapshots.py index 53e39a10..a378e582 100644 --- a/tests/test_tui_snapshots.py +++ b/tests/test_tui_snapshots.py @@ -139,6 +139,27 @@ async def run_before(pilot: Pilot[None]) -> None: assert snap_compare(h.build_live_app(), terminal_size=h.TERMINAL_SIZE, run_before=run_before) +def test_live_plan(snap_compare) -> None: + """A multi-step turn surfaces the agent's plan: a `Plan` panel of status-marked tasks.""" + + async def run_before(pilot: Pilot[None]) -> None: + from aai_cli.agent_cascade.plan import TodoItem + + app = pilot.app + assert isinstance(app, LiveAgentApp) + h.freeze_animation(app) + app.show_user_final("book a flight to Seattle, then check the weather there") + app.show_todos( + ( + TodoItem(content="Book a flight to Seattle", status="completed"), + TodoItem(content="Check the Seattle weather", status="in_progress"), + TodoItem(content="Suggest what to pack", status="pending"), + ) + ) + + assert snap_compare(h.build_live_app(), terminal_size=h.TERMINAL_SIZE, run_before=run_before) + + def test_live_interrupted(snap_compare) -> None: """An interrupted reply is finalized and tagged `(interrupted)`, then returns to listening."""