diff --git a/backend/internal/adapters/runtime/zellij/commands.go b/backend/internal/adapters/runtime/zellij/commands.go index 3e834212..e64d1921 100644 --- a/backend/internal/adapters/runtime/zellij/commands.go +++ b/backend/internal/adapters/runtime/zellij/commands.go @@ -23,15 +23,20 @@ func versionArgs() []string { } func createSessionArgs(id, layoutPath string) []string { - return []string{ + clientOptions := embeddedClientOptions() + args := make([]string, 0, 6+len(clientOptions)+6) + args = append(args, "attach", "--create-background", id, "options", "--default-layout", layoutPath, - "--pane-frames", "false", + ) + args = append(args, clientOptions...) + args = append(args, "--session-serialization", "false", "--show-startup-tips", "false", "--show-release-notes", "false", - } + ) + return args } func listPanesArgs(id string) []string { @@ -68,10 +73,25 @@ func deleteSessionArgs(id string) []string { } func attachArgs(id string) []string { - return []string{ + clientOptions := embeddedClientOptions() + args := make([]string, 0, 3+len(clientOptions)) + args = append(args, "attach", id, "options", + ) + args = append(args, clientOptions...) + return args +} + +func embeddedClientOptions() []string { + return []string{ "--pane-frames", "false", + "--mouse-mode", "false", + "--advanced-mouse-actions", "false", + "--mouse-hover-effects", "false", + "--focus-follows-mouse", "false", + "--mouse-click-through", "false", + "--support-kitty-keyboard-protocol", "false", } } @@ -88,7 +108,7 @@ func buildLayout(cfg ports.RuntimeConfig, shellPath string) string { return directLayoutString(cfg.WorkspacePath, cfg.Argv) } spec := shellLaunchSpecFor(shellPath) - shellCommand := shellLaunchCommand(cfg, shellPath, spec) + shellCommand := shellLaunchCommand(cfg, spec) return layoutString(cfg.WorkspacePath, shellPath, spec.args, shellCommand) } @@ -176,17 +196,17 @@ func layoutString(workspacePath, shellPath string, shellArgs []string, shellComm "}\n" } -func shellLaunchCommand(cfg ports.RuntimeConfig, shellPath string, spec shellLaunchSpec) string { +func shellLaunchCommand(cfg ports.RuntimeConfig, spec shellLaunchSpec) string { if len(spec.args) > 0 && spec.args[0] == "-NoLogo" { return wrapLaunchCommandPowerShell(cfg) } if len(spec.args) > 0 && spec.args[0] == "/D" { return wrapLaunchCommandCmd(cfg) } - return wrapLaunchCommandUnix(cfg, shellPath) + return wrapLaunchCommandUnix(cfg) } -func wrapLaunchCommandUnix(cfg ports.RuntimeConfig, shellPath string) string { +func wrapLaunchCommandUnix(cfg ports.RuntimeConfig) string { path := cfg.Env["PATH"] if path == "" { path = getenv("PATH") @@ -209,9 +229,6 @@ func wrapLaunchCommandUnix(cfg ports.RuntimeConfig, shellPath string) string { b.WriteString("; ") } b.WriteString(quoteArgvUnix(cfg.Argv)) - b.WriteString("; exec ") - b.WriteString(shellQuote(shellPath)) - b.WriteString(" -i") return b.String() } diff --git a/backend/internal/adapters/runtime/zellij/zellij.go b/backend/internal/adapters/runtime/zellij/zellij.go index f71f5ed2..f7326138 100644 --- a/backend/internal/adapters/runtime/zellij/zellij.go +++ b/backend/internal/adapters/runtime/zellij/zellij.go @@ -283,9 +283,10 @@ func (r *Runtime) Destroy(ctx context.Context, handle ports.RuntimeHandle) error if err != nil { return err } - if _, err := r.run(ctx, deleteSessionArgs(id)...); err != nil { + out, err := r.run(ctx, deleteSessionArgs(id)...) + if err != nil { var exitErr *exec.ExitError - if errors.As(err, &exitErr) { + if errors.As(err, &exitErr) && deleteSessionMissingOutput(string(out)) { return nil } return fmt.Errorf("zellij runtime: destroy session %s: %w", id, err) @@ -357,6 +358,18 @@ func noActiveSessionsOutput(out string) bool { return strings.Contains(s, "no active") && strings.Contains(s, "session") } +func deleteSessionMissingOutput(out string) bool { + s := strings.ToLower(out) + if noActiveSessionsOutput(s) { + return true + } + return strings.Contains(s, "session") && + (strings.Contains(s, "not found") || + strings.Contains(s, "does not exist") || + strings.Contains(s, "not exist") || + strings.Contains(s, "not a session")) +} + // AttachCommand returns the argv a human runs to attach their terminal to the // session, plus an optional env block that the spawn should apply (used on // Windows where wrapping the attach in an `env` shim is unsafe under ConPTY). diff --git a/backend/internal/adapters/runtime/zellij/zellij_integration_test.go b/backend/internal/adapters/runtime/zellij/zellij_integration_test.go index c0d7d390..ac6a33e4 100644 --- a/backend/internal/adapters/runtime/zellij/zellij_integration_test.go +++ b/backend/internal/adapters/runtime/zellij/zellij_integration_test.go @@ -30,7 +30,7 @@ func TestRuntimeIntegration(t *testing.T) { } r := New(opts) _ = r.Destroy(ctx, ports.RuntimeHandle{ID: id}) - argv := []string{"sh", "-c", "echo ready-$AO_SESSION_ID"} + argv := []string{"sh", "-lc", "printf ready-$AO_SESSION_ID\\n; exec sh -i"} sendCommand := "echo hello-from-zellij" if runtime.GOOS == "windows" { argv = []string{"cmd.exe", "/D", "/Q", "/K", "echo ready-%AO_SESSION_ID%"} @@ -117,7 +117,11 @@ func buildAOForIntegration(t *testing.T) string { func tempSocketDir(t *testing.T, pattern string) string { t.Helper() - socketDir, err := os.MkdirTemp(os.TempDir(), pattern) + parent := os.TempDir() + if runtime.GOOS != "windows" { + parent = "/tmp" + } + socketDir, err := os.MkdirTemp(parent, pattern) if err != nil { t.Fatalf("mkdir socket dir: %v", err) } @@ -144,7 +148,7 @@ func TestRuntimeIntegrationUsesExactSessionParsing(t *testing.T) { h, err := r.Create(ctx, ports.RuntimeConfig{ SessionID: "ao_zj_exact_long", WorkspacePath: t.TempDir(), - Argv: []string{"printf", "ready\n"}, + Argv: []string{"sh", "-lc", "printf ready\\n; exec sh -i"}, }) if err != nil { t.Fatalf("Create: %v", err) diff --git a/backend/internal/adapters/runtime/zellij/zellij_test.go b/backend/internal/adapters/runtime/zellij/zellij_test.go index 38420ae6..610a62ec 100644 --- a/backend/internal/adapters/runtime/zellij/zellij_test.go +++ b/backend/internal/adapters/runtime/zellij/zellij_test.go @@ -87,10 +87,21 @@ func containsKey(values []string, key string) bool { } func TestCommandBuilders(t *testing.T) { + embeddedOptions := []string{ + "--pane-frames", "false", + "--mouse-mode", "false", + "--advanced-mouse-actions", "false", + "--mouse-hover-effects", "false", + "--focus-follows-mouse", "false", + "--mouse-click-through", "false", + "--support-kitty-keyboard-protocol", "false", + } if got, want := versionArgs(), []string{"--version"}; !reflect.DeepEqual(got, want) { t.Fatalf("versionArgs = %#v, want %#v", got, want) } - if got, want := createSessionArgs("sess-1", "/tmp/layout.kdl"), []string{"attach", "--create-background", "sess-1", "options", "--default-layout", "/tmp/layout.kdl", "--pane-frames", "false", "--session-serialization", "false", "--show-startup-tips", "false", "--show-release-notes", "false"}; !reflect.DeepEqual(got, want) { + wantCreate := append([]string{"attach", "--create-background", "sess-1", "options", "--default-layout", "/tmp/layout.kdl"}, embeddedOptions...) + wantCreate = append(wantCreate, "--session-serialization", "false", "--show-startup-tips", "false", "--show-release-notes", "false") + if got, want := createSessionArgs("sess-1", "/tmp/layout.kdl"), wantCreate; !reflect.DeepEqual(got, want) { t.Fatalf("createSessionArgs = %#v, want %#v", got, want) } if got, want := listPanesArgs("sess-1"), []string{"--session", "sess-1", "action", "list-panes", "--all", "--json"}; !reflect.DeepEqual(got, want) { @@ -112,7 +123,8 @@ func TestCommandBuilders(t *testing.T) { if got, want := deleteSessionArgs("sess-1"), []string{"delete-session", "--force", "sess-1"}; !reflect.DeepEqual(got, want) { t.Fatalf("deleteSessionArgs = %#v, want %#v", got, want) } - if got, want := attachArgs("sess-1"), []string{"attach", "sess-1", "options", "--pane-frames", "false"}; !reflect.DeepEqual(got, want) { + wantAttach := append([]string{"attach", "sess-1", "options"}, embeddedOptions...) + if got, want := attachArgs("sess-1"), wantAttach; !reflect.DeepEqual(got, want) { t.Fatalf("attachArgs = %#v, want %#v", got, want) } } @@ -190,7 +202,7 @@ func TestHandleID(t *testing.T) { } } -func TestBuildLayoutExportsEnvAndKeepsPaneAlive(t *testing.T) { +func TestBuildLayoutExportsEnvAndRunsAgentCommand(t *testing.T) { oldGetenv := getenv getenv = func(key string) string { if key == "PATH" { @@ -224,12 +236,15 @@ func TestBuildLayoutExportsEnvAndKeepsPaneAlive(t *testing.T) { "export AO_SESSION_ID='sess-1';", "export ODD='can'\\\\''t';", "export PATH='/custom/bin:/usr/bin';", - "'ao' 'run'; exec '/bin/zsh' -i", + "'ao' 'run'", } { if !strings.Contains(got, want) { t.Fatalf("layout missing %q in %q", want, got) } } + if strings.Contains(got, "exec '/bin/zsh' -i") { + t.Fatalf("layout kept pane alive after agent exit: %q", got) + } } func TestBuildLayoutUsesPowerShellLaunchOnWindowsShells(t *testing.T) { @@ -418,20 +433,32 @@ func TestCreateClearsStaleSessionBeforeCreating(t *testing.T) { } } -func TestAttachCommandDisablesPaneFrames(t *testing.T) { +func TestAttachCommandUsesEmbeddedClientOptions(t *testing.T) { r := New(Options{}) args, _, err := r.AttachCommand(ports.RuntimeHandle{ID: "sess-1/terminal_0"}) if err != nil { t.Fatalf("AttachCommand: %v", err) } + embeddedOptions := []string{ + "--pane-frames", "false", + "--mouse-mode", "false", + "--advanced-mouse-actions", "false", + "--mouse-hover-effects", "false", + "--focus-follows-mouse", "false", + "--mouse-click-through", "false", + "--support-kitty-keyboard-protocol", "false", + } if runtime.GOOS == "windows" { - want := []string{r.binary, "attach", "sess-1", "options", "--pane-frames", "false"} - if !reflect.DeepEqual(args, want) { - t.Fatalf("AttachCommand = %#v, want %#v", args, want) + joined := strings.Join(args, " ") + for _, want := range embeddedOptions { + if !strings.Contains(joined, want) { + t.Fatalf("windows attach command missing %q: %#v", want, args) + } } return } - want := append(expectedAttachEnvPrefix(), "zellij", "attach", "sess-1", "options", "--pane-frames", "false") + want := append(expectedAttachEnvPrefix(), r.binary, "attach", "sess-1", "options") + want = append(want, embeddedOptions...) if !reflect.DeepEqual(args, want) { t.Fatalf("AttachCommand = %#v, want %#v", args, want) } @@ -613,7 +640,7 @@ func TestIsAliveReportsOtherExitFailuresAsProbeErrors(t *testing.T) { } func TestDestroyIsIdempotentWhenSessionMissing(t *testing.T) { - fr := &fakeRunner{err: &exec.ExitError{}} + fr := &fakeRunner{outputs: [][]byte{[]byte("No active zellij sessions found.")}, err: &exec.ExitError{}} r := New(Options{Timeout: time.Second}) r.runner = fr @@ -625,6 +652,16 @@ func TestDestroyIsIdempotentWhenSessionMissing(t *testing.T) { } } +func TestDestroyReportsUnexpectedExitFailures(t *testing.T) { + fr := &fakeRunner{outputs: [][]byte{[]byte("permission denied")}, err: &exec.ExitError{}} + r := New(Options{Timeout: time.Second}) + r.runner = fr + + if err := r.Destroy(context.Background(), ports.RuntimeHandle{ID: "sess-1/terminal_0"}); err == nil { + t.Fatal("Destroy: got nil, want unexpected delete-session failure") + } +} + // Destroy must delete the session's serialized state, not merely kill it: a // killed-but-cached session is resurrected (agent re-run included) by any later // `zellij attach`, bringing a terminated session's runtime back to life. diff --git a/backend/internal/terminal/attachment.go b/backend/internal/terminal/attachment.go index ab03762e..e45bca6c 100644 --- a/backend/internal/terminal/attachment.go +++ b/backend/internal/terminal/attachment.go @@ -54,38 +54,43 @@ const ( // attachment is ONE client's hold on a pane: a private `zellij attach` PTY // spawned per mux open, streaming to a single sink. Zellij is the multiplexer — // it owns the session's screen state and scrollback, and answers every fresh -// attach with its full init handshake (alt screen, SGR mouse tracking, -// bracketed paste) followed by a faithful repaint. That handshake is why the -// PTY is per-client and there is no server-side replay buffer: a byte ring -// can replay recent output, but the one-time mode negotiation at the head of -// the stream scrolls out of any bounded buffer, leaving late subscribers -// without mouse reporting (wheel scroll dead). A fresh attach per client makes -// Zellij re-send it, every time, by construction. +// attach with its init handshake (alt screen, bracketed paste, and other terminal +// modes enabled by the embedded client options) followed by a faithful repaint. +// That handshake is why the PTY is per-client and there is no server-side replay +// buffer: a byte ring can replay recent output, but the one-time mode negotiation +// at the head of the stream scrolls out of any bounded buffer. A fresh attach per +// client makes Zellij re-send it, every time, by construction. // -// onData must not block: the WS layer funnels frames onto its own buffered -// writer. onExit fires at most once, when the attach loop gives up (runtime -// dead, attach failure cap) — never on close(). +// onOpen fires once the attach PTY is actually ready to accept input. onData +// must not block: the WS layer funnels frames onto its own buffered writer. +// onExit fires at most once, when the attach loop gives up (runtime dead, +// attach failure cap) — never on close(). type attachment struct { id string handle ports.RuntimeHandle src PTYSource spawn spawnFunc log *slog.Logger + onOpen func() onData func(data []byte) onExit func() maxReattach int resetGrace time.Duration - mu sync.Mutex - pty ptyProcess - rows uint16 // last size the client asked for; re-applied on every attach - cols uint16 - closed bool - exited bool + mu sync.Mutex + pty ptyProcess + cancel context.CancelFunc + rows uint16 // last size the client asked for; re-applied on every attach + cols uint16 + closed bool + exited bool + opened bool + inputReady bool + pendingInput [][]byte } -func newAttachment(id string, handle ports.RuntimeHandle, src PTYSource, spawn spawnFunc, onData func([]byte), onExit func(), log *slog.Logger) *attachment { +func newAttachment(id string, handle ports.RuntimeHandle, src PTYSource, spawn spawnFunc, onOpen func(), onData func([]byte), onExit func(), log *slog.Logger) *attachment { if log == nil { log = slog.Default() } @@ -98,6 +103,7 @@ func newAttachment(id string, handle ports.RuntimeHandle, src PTYSource, spawn s src: src, spawn: spawn, log: log, + onOpen: onOpen, onData: onData, onExit: onExit, maxReattach: defaultMaxReattach, @@ -108,9 +114,16 @@ func newAttachment(id string, handle ports.RuntimeHandle, src PTYSource, spawn s // run drives attach → read-loop → re-attach until the pane exits cleanly, the // attachment is closed, or ctx is cancelled. It is started once per attachment. func (a *attachment) run(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + if !a.setRunCancel(cancel) { + cancel() + return + } + defer a.clearRunCancel(cancel) + failures := 0 for { - if a.isClosed() || ctx.Err() != nil { + if a.shouldStop(ctx) { return } @@ -122,6 +135,9 @@ func (a *attachment) run(ctx context.Context) { // not proof of death: it retries with backoff up to the same // consecutive-failure cap as attach failures. alive, err := a.src.IsAlive(ctx, a.handle) + if a.shouldStop(ctx) { + return + } if err != nil { failures++ if failures > a.maxReattach { @@ -139,12 +155,24 @@ func (a *attachment) run(ctx context.Context) { } argv, env, err := a.src.AttachCommand(a.handle) + if a.shouldStop(ctx) { + return + } if err != nil { a.fail("attach command: " + err.Error()) return } rows, cols := a.size() + if a.shouldStop(ctx) { + return + } p, err := a.spawn(ctx, argv, env, rows, cols) + if a.shouldStop(ctx) { + if p != nil { + _ = p.Close() + } + return + } if err != nil { failures++ if failures > a.maxReattach { @@ -157,10 +185,17 @@ func (a *attachment) run(ctx context.Context) { continue } - a.setPTY(p) + if !a.setPTY(p) { + _ = p.Close() + return + } start := time.Now() a.copyOut(p) + a.clearPTY(p) _ = p.Close() + if a.shouldStop(ctx) { + return + } if time.Since(start) >= a.resetGrace { failures = 0 @@ -215,15 +250,28 @@ func reattachBackoff(failures int) time.Duration { return d } -// write sends client keystrokes to the PTY. It is a no-op if no PTY is attached. +// write sends client keystrokes to the PTY. Input that arrives after open but +// before the attach PTY is published is buffered and flushed as soon as setPTY +// runs, so a fast user cannot type into the attach race and lose bytes. func (a *attachment) write(p []byte) error { + if len(p) == 0 { + return nil + } + chunk := append([]byte(nil), p...) + a.mu.Lock() + if a.closed || a.exited { + a.mu.Unlock() + return errors.New("terminal: attachment closed") + } pty := a.pty - a.mu.Unlock() - if pty == nil { - return errors.New("terminal: no active pty") + if pty == nil || !a.inputReady { + a.pendingInput = append(a.pendingInput, chunk) + a.mu.Unlock() + return nil } - _, err := pty.Write(p) + a.mu.Unlock() + _, err := pty.Write(chunk) return err } @@ -255,14 +303,55 @@ func (a *attachment) size() (rows, cols uint16) { // requested size onto it (see resize) — the spawn already started at the size // read in run, but a resize frame can land between that read and registration // here; the replay (Setsize + explicit WINCH) converges the late case. -func (a *attachment) setPTY(p ptyProcess) { +func (a *attachment) setPTY(p ptyProcess) bool { a.mu.Lock() + if a.closed || a.exited { + a.mu.Unlock() + return false + } a.pty = p + a.inputReady = false rows, cols := a.rows, a.cols + shouldOpen := !a.opened + if shouldOpen { + a.opened = true + } + onOpen := a.onOpen a.mu.Unlock() if rows > 0 && cols > 0 { _ = p.Resize(rows, cols) } + if shouldOpen && onOpen != nil { + onOpen() + } + + for { + a.mu.Lock() + pending := append([][]byte(nil), a.pendingInput...) + a.pendingInput = nil + if len(pending) == 0 { + a.inputReady = true + a.mu.Unlock() + return true + } + a.mu.Unlock() + + for _, chunk := range pending { + if _, err := p.Write(chunk); err != nil { + a.fail("flush pending input: " + err.Error()) + return false + } + } + } +} + +func (a *attachment) clearPTY(p ptyProcess) { + a.mu.Lock() + if a.pty == p { + a.pty = nil + a.inputReady = false + } + a.mu.Unlock() } // close detaches this client: stop re-attaching and kill the attach PTY. It @@ -277,10 +366,33 @@ func (a *attachment) close() { a.closed = true pty := a.pty a.pty = nil + a.inputReady = false + a.pendingInput = nil + cancel := a.cancel a.mu.Unlock() if pty != nil { _ = pty.Close() } + if cancel != nil { + cancel() + } +} + +func (a *attachment) setRunCancel(cancel context.CancelFunc) bool { + a.mu.Lock() + defer a.mu.Unlock() + if a.closed { + return false + } + a.cancel = cancel + return true +} + +func (a *attachment) clearRunCancel(cancel context.CancelFunc) { + a.mu.Lock() + a.cancel = nil + a.mu.Unlock() + cancel() } func (a *attachment) isClosed() bool { @@ -289,6 +401,10 @@ func (a *attachment) isClosed() bool { return a.closed } +func (a *attachment) shouldStop(ctx context.Context) bool { + return ctx.Err() != nil || a.isClosed() +} + func (a *attachment) isExited() bool { a.mu.Lock() defer a.mu.Unlock() diff --git a/backend/internal/terminal/attachment_integration_test.go b/backend/internal/terminal/attachment_integration_test.go index 10a82c0b..c2eb899d 100644 --- a/backend/internal/terminal/attachment_integration_test.go +++ b/backend/internal/terminal/attachment_integration_test.go @@ -35,7 +35,7 @@ func TestAttachmentStreamsRealZellijPane(t *testing.T) { handle, err := rt.Create(context.Background(), ports.RuntimeConfig{ SessionID: domain.SessionID(name), WorkspacePath: t.TempDir(), - Argv: []string{"printf", "AO_READY\\n"}, + Argv: []string{"sh", "-lc", "printf AO_READY\\n; exec sh -i"}, }) if err != nil { t.Fatalf("Create: %v", err) @@ -43,7 +43,7 @@ func TestAttachmentStreamsRealZellijPane(t *testing.T) { t.Cleanup(func() { _ = rt.Destroy(context.Background(), handle) }) var got safeBytes - a := newAttachment(name, handle, rt, defaultSpawn, got.add, nil, testLogger()) + a := newAttachment(name, handle, rt, defaultSpawn, nil, got.add, nil, testLogger()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go a.run(ctx) @@ -52,13 +52,12 @@ func TestAttachmentStreamsRealZellijPane(t *testing.T) { eventually(t, 3*time.Second, func() bool { return a.write([]byte("echo AO_MARKER_42\n")) == nil }) eventually(t, 5*time.Second, func() bool { return strings.Contains(got.string(), "AO_MARKER_42") }) - // A fresh attach must carry zellij's init handshake: alt screen + SGR mouse - // tracking. This is the whole point of per-client attach — late clients see - // the mode negotiation, so wheel events are forwarded as mouse reports - // instead of going dead. + // A fresh attach must carry zellij's alt-screen init handshake. Mouse + // reporting is deliberately disabled for AO's embedded client, so this test + // should not require SGR mouse mode. eventually(t, 5*time.Second, func() bool { out := got.string() - return strings.Contains(out, "\x1b[?1049h") && strings.Contains(out, "\x1b[?1006h") + return strings.Contains(out, "\x1b[?1049h") }) // Kill the session: the attachment must observe it as gone and not re-attach. @@ -89,28 +88,32 @@ func TestAttachmentReattachAdoptsNewSize(t *testing.T) { handle, err := rt.Create(context.Background(), ports.RuntimeConfig{ SessionID: domain.SessionID(name), WorkspacePath: t.TempDir(), - Argv: []string{"printf", "AO_READY\\n"}, + Argv: []string{"sh", "-lc", "printf AO_READY\\n; exec sh -i"}, }) if err != nil { t.Fatalf("Create: %v", err) } t.Cleanup(func() { _ = rt.Destroy(context.Background(), handle) }) - attachAt := func(rows, cols uint16) (*attachment, *safeBytes, context.CancelFunc) { + attachAt := func(rows, cols uint16) (*attachment, *safeBytes, <-chan struct{}, context.CancelFunc) { var got safeBytes - a := newAttachment(name, handle, rt, defaultSpawn, got.add, nil, testLogger()) + opened := make(chan struct{}) + a := newAttachment(name, handle, rt, defaultSpawn, func() { close(opened) }, got.add, nil, testLogger()) if err := a.resize(rows, cols); err != nil { t.Fatalf("record size: %v", err) } ctx, cancel := context.WithCancel(context.Background()) go a.run(ctx) - return a, &got, cancel + return a, &got, opened, cancel } // Client A at 115x37: wait for the pane shell, then detach. - a, gotA, cancelA := attachAt(37, 115) - eventually(t, 5*time.Second, func() bool { return a.write([]byte("\n")) == nil }) - eventually(t, 5*time.Second, func() bool { return strings.Contains(gotA.string(), "AO_READY") }) + a, _, openedA, cancelA := attachAt(37, 115) + select { + case <-openedA: + case <-time.After(5 * time.Second): + t.Fatal("client A did not attach") + } a.close() cancelA() @@ -118,9 +121,14 @@ func TestAttachmentReattachAdoptsNewSize(t *testing.T) { // frontend reconnecting. The inner pane must see B's grid (zellij chrome // shaves a couple rows/cols, so assert the reported cols land near 148 and // far from 115). - b, gotB, cancelB := attachAt(40, 148) + b, gotB, openedB, cancelB := attachAt(40, 148) defer cancelB() defer b.close() + select { + case <-openedB: + case <-time.After(5 * time.Second): + t.Fatal("client B did not attach") + } eventually(t, 5*time.Second, func() bool { return b.write([]byte("echo SIZE:$(stty size)\n")) == nil }) eventually(t, 10*time.Second, func() bool { diff --git a/backend/internal/terminal/attachment_test.go b/backend/internal/terminal/attachment_test.go index 476d03ac..a0851948 100644 --- a/backend/internal/terminal/attachment_test.go +++ b/backend/internal/terminal/attachment_test.go @@ -4,6 +4,7 @@ import ( "context" "io" "log/slog" + "sync" "testing" "time" @@ -13,7 +14,17 @@ import ( func testLogger() *slog.Logger { return slog.New(slog.NewTextHandler(io.Discard, nil)) } func newTestAttachment(src PTYSource, spawn spawnFunc, onData func([]byte), onExit func()) *attachment { - return newAttachment("t1", ports.RuntimeHandle{ID: "t1"}, src, spawn, onData, onExit, testLogger()) + return newTestAttachmentWithOpen(src, spawn, nil, onData, onExit) +} + +func newTestAttachmentWithOpen(src PTYSource, spawn spawnFunc, onOpen func(), onData func([]byte), onExit func()) *attachment { + return newAttachment("t1", ports.RuntimeHandle{ID: "t1"}, src, spawn, onOpen, onData, onExit, testLogger()) +} + +func currentPTY(a *attachment) ptyProcess { + a.mu.Lock() + defer a.mu.Unlock() + return a.pty } func TestAttachmentStreamsOutputToSink(t *testing.T) { @@ -54,6 +65,60 @@ func TestAttachmentWriteAndResizeReachPTY(t *testing.T) { }) } +func TestAttachmentSignalsOpenOnlyAfterPTYIsPublished(t *testing.T) { + src := &fakeSource{alive: true} + pty := newFakePTY() + sp := &fakeSpawner{ptys: []*fakePTY{pty}} + + opened := make(chan bool, 1) + var a *attachment + a = newTestAttachmentWithOpen(src, sp.spawn, func() { + opened <- currentPTY(a) == pty + }, nil, nil) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go a.run(ctx) + + select { + case sawPTY := <-opened: + if !sawPTY { + t.Fatal("open callback fired before the PTY was published") + } + case <-time.After(time.Second): + t.Fatal("open callback did not fire") + } +} + +func TestAttachmentBuffersInputUntilPTYReady(t *testing.T) { + src := &fakeSource{alive: true} + pty := newFakePTY() + spawnStarted := make(chan struct{}) + releaseSpawn := make(chan struct{}) + spawn := func(context.Context, []string, []string, uint16, uint16) (ptyProcess, error) { + close(spawnStarted) + <-releaseSpawn + return pty, nil + } + a := newTestAttachment(src, spawn, nil, nil) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go a.run(ctx) + + select { + case <-spawnStarted: + case <-time.After(time.Second): + t.Fatal("spawn was not reached") + } + if err := a.write([]byte("hello\n")); err != nil { + t.Fatalf("write before PTY ready: %v", err) + } + close(releaseSpawn) + + eventually(t, time.Second, func() bool { return string(pty.writtenBytes()) == "hello\n" }) +} + // A size requested before the PTY exists (the open frame's cols/rows, or a // resize racing the attach) must not be lost: the attach applies it the moment // the PTY is up, instead of leaving the pane at the kernel default grid. @@ -284,3 +349,66 @@ func TestAttachmentCloseDoesNotFireExit(t *testing.T) { t.Fatalf("close must stop re-attaching, got %d spawns", got) } } + +type closeOrderPTY struct { + *fakePTY + ctx context.Context + before chan struct{} + after chan struct{} + once sync.Once +} + +func (p *closeOrderPTY) Close() error { + p.once.Do(func() { + select { + case <-p.ctx.Done(): + close(p.after) + default: + close(p.before) + } + }) + return p.fakePTY.Close() +} + +func TestAttachmentCloseClosesPTYBeforeCancel(t *testing.T) { + src := &fakeSource{alive: true} + beforeCancel := make(chan struct{}) + afterCancel := make(chan struct{}) + var spawnCtx context.Context + spawn := func(ctx context.Context, _ []string, _ []string, _, _ uint16) (ptyProcess, error) { + spawnCtx = ctx + return &closeOrderPTY{ + fakePTY: newFakePTY(), + ctx: ctx, + before: beforeCancel, + after: afterCancel, + }, nil + } + a := newTestAttachment(src, spawn, nil, nil) + + done := make(chan struct{}) + go func() { + a.run(context.Background()) + close(done) + }() + eventually(t, time.Second, func() bool { return currentPTY(a) != nil }) + + a.close() + select { + case <-beforeCancel: + case <-afterCancel: + t.Fatal("attachment cancelled the run context before closing the PTY") + case <-time.After(time.Second): + t.Fatal("PTY was not closed") + } + select { + case <-spawnCtx.Done(): + case <-time.After(time.Second): + t.Fatal("close must still cancel the attach context") + } + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("run must return after close") + } +} diff --git a/backend/internal/terminal/logger_test.go b/backend/internal/terminal/logger_test.go index 0384a65a..ba08a2db 100644 --- a/backend/internal/terminal/logger_test.go +++ b/backend/internal/terminal/logger_test.go @@ -12,7 +12,7 @@ func TestNilLoggerFallsBackToDefault(t *testing.T) { if mgr.log == nil { t.Fatal("manager logger is nil") } - a := newAttachment("t1", ports.RuntimeHandle{ID: "t1"}, &fakeSource{}, (&fakeSpawner{}).spawn, nil, nil, nil) + a := newAttachment("t1", ports.RuntimeHandle{ID: "t1"}, &fakeSource{}, (&fakeSpawner{}).spawn, nil, nil, nil, nil) if a.log == nil { t.Fatal("attachment logger is nil") } diff --git a/backend/internal/terminal/manager.go b/backend/internal/terminal/manager.go index 7b22608b..88ff36ed 100644 --- a/backend/internal/terminal/manager.go +++ b/backend/internal/terminal/manager.go @@ -224,6 +224,9 @@ func (c *connState) openTerminal(id string, rows, cols uint16) { // the only thing that fires onExit — starts after the registration below. var a *attachment a = newAttachment(id, ports.RuntimeHandle{ID: id}, c.mgr.src, c.mgr.spawn, + func() { + c.enqueue(serverMsg{Ch: chTerminal, ID: id, Type: msgOpened}) + }, func(data []byte) { c.enqueue(serverMsg{ Ch: chTerminal, @@ -257,11 +260,6 @@ func (c *connState) openTerminal(id string, rows, cols uint16) { c.terms[id] = a c.mu.Unlock() - // Ack before starting the attach loop so opened always precedes any - // data/exited frames (the single out channel preserves this order). A - // dead pane is reported as opened followed by exited. - c.enqueue(serverMsg{Ch: chTerminal, ID: id, Type: msgOpened}) - go func() { a.run(c.mgr.ctx) c.mgr.forget(a) diff --git a/backend/internal/terminal/manager_test.go b/backend/internal/terminal/manager_test.go index dc0730b8..8c3e3ee1 100644 --- a/backend/internal/terminal/manager_test.go +++ b/backend/internal/terminal/manager_test.go @@ -99,6 +99,37 @@ func TestServeOpenStreamsAndWritesTerminal(t *testing.T) { }) } +func TestServeBuffersInputUntilAttachReady(t *testing.T) { + src := &fakeSource{alive: true} + pty := newFakePTY() + spawnStarted := make(chan struct{}) + releaseSpawn := make(chan struct{}) + spawn := func(context.Context, []string, []string, uint16, uint16) (ptyProcess, error) { + close(spawnStarted) + <-releaseSpawn + return pty, nil + } + mgr := NewManager(src, nil, testLogger(), WithSpawn(spawn), WithHeartbeat(0)) + defer mgr.Close() + + conn := newFakeConn() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go mgr.Serve(ctx, conn) + + conn.in <- clientMsg{Ch: chTerminal, ID: "t1", Type: msgOpen} + select { + case <-spawnStarted: + case <-time.After(time.Second): + t.Fatal("spawn was not reached") + } + conn.in <- clientMsg{Ch: chTerminal, ID: "t1", Type: msgData, Data: base64.StdEncoding.EncodeToString([]byte("status\n"))} + close(releaseSpawn) + + recv(t, conn, chTerminal, msgOpened, time.Second) + eventually(t, time.Second, func() bool { return string(pty.writtenBytes()) == "status\n" }) +} + // nextTerminal returns the next frame on conn.out (no skipping), so callers can // assert frame ordering rather than just presence. func nextTerminal(t *testing.T, c *fakeConn) serverMsg { @@ -112,13 +143,13 @@ func nextTerminal(t *testing.T, c *fakeConn) serverMsg { } } -// Opening a pane whose runtime is already dead must (1) send opened before -// exited (the dead pane is reported, not errored) and (2) clear the conn's -// entry, so a later open for the same id on this connection is still served -// instead of being silently dropped by the already-open guard. +// Opening a pane whose runtime is already dead must report exited without +// spawning an attach. The conn entry still has to clear so a later open for the +// same id on this connection is served instead of being silently dropped by the +// already-open guard. func TestServeOpenDeadRuntimeReportsExitedAndAllowsReopen(t *testing.T) { src := &fakeSource{alive: false} - sp := &fakeSpawner{} + sp := &fakeSpawner{ptys: []*fakePTY{newFakePTY()}} mgr := NewManager(src, nil, testLogger(), WithSpawn(sp.spawn), WithHeartbeat(0)) defer mgr.Close() @@ -128,16 +159,14 @@ func TestServeOpenDeadRuntimeReportsExitedAndAllowsReopen(t *testing.T) { go mgr.Serve(ctx, conn) conn.in <- clientMsg{Ch: chTerminal, ID: "t1", Type: msgOpen} - if m := nextTerminal(t, conn); m.Type != msgOpened { - t.Fatalf("first frame = %q, want opened", m.Type) - } if m := nextTerminal(t, conn); m.Type != msgExited { - t.Fatalf("second frame = %q, want exited", m.Type) + t.Fatalf("first frame = %q, want exited", m.Type) } if got := sp.calls(); got != 0 { t.Fatalf("attach must never run against a dead runtime, got %d spawns", got) } + src.setAlive(true) conn.in <- clientMsg{Ch: chTerminal, ID: "t1", Type: msgOpen} if m := nextTerminal(t, conn); m.Type != msgOpened { t.Fatalf("re-open frame = %q, want opened (open was dropped, entry stuck)", m.Type) @@ -166,6 +195,7 @@ func TestServeExitAfterOpenClearsEntryAllowingReopen(t *testing.T) { p.Close() // drop the pty; IsAlive false => session exits, no re-attach recv(t, conn, chTerminal, msgExited, time.Second) + src.setAlive(true) conn.in <- clientMsg{Ch: chTerminal, ID: "t1", Type: msgOpen} recv(t, conn, chTerminal, msgOpened, 2*time.Second) } @@ -173,8 +203,9 @@ func TestServeExitAfterOpenClearsEntryAllowingReopen(t *testing.T) { // An attachment that exits the moment it is opened (dead runtime) fires onExit // from its run goroutine, racing the reopen that follows the exited frame. The // identity-guarded delete in onExit must never evict a successor attachment -// registered under the same id: every reopen must be served (opened), never -// silently dropped by the already-open guard. Stressed across many iterations +// registered under the same id: every reopen must be served (exited again for a +// still-dead runtime), never silently dropped by the already-open guard. +// Stressed across many iterations // to shake the exit/reopen interleavings out. func TestServeReopenAfterImmediateExitNeverStuck(t *testing.T) { for i := 0; i < 400; i++ { @@ -189,13 +220,12 @@ func TestServeReopenAfterImmediateExitNeverStuck(t *testing.T) { conn.in <- clientMsg{Ch: chTerminal, ID: "t1", Type: msgOpen} - recv(t, conn, chTerminal, msgOpened, time.Second) recv(t, conn, chTerminal, msgExited, time.Second) // The reopen must be served even while the first open's exit teardown is // still in flight. conn.in <- clientMsg{Ch: chTerminal, ID: "t1", Type: msgOpen} - recv(t, conn, chTerminal, msgOpened, time.Second) + recv(t, conn, chTerminal, msgExited, time.Second) cancel() mgr.Close() diff --git a/backend/internal/terminal/pty_unix.go b/backend/internal/terminal/pty_unix.go index 390a79b9..c613bbd8 100644 --- a/backend/internal/terminal/pty_unix.go +++ b/backend/internal/terminal/pty_unix.go @@ -19,14 +19,17 @@ import ( // startup, and a post-spawn TIOCSWINSZ depends on SIGWINCH delivery that can // race the client installing its handler — StartWithSize makes the first read // correct by construction. env, when non-nil, replaces the inherited -// environment (mirrors exec.Cmd.Env semantics). ctx cancellation kills the -// process. Windows uses a stub (see pty_windows.go) until a ConPTY path is -// added. +// environment (mirrors exec.Cmd.Env semantics). ctx cancellation closes the PTY +// through the same graceful detach path as an explicit client close. Windows uses +// a stub (see pty_windows.go) until a ConPTY path is added. func defaultSpawn(ctx context.Context, argv, env []string, rows, cols uint16) (ptyProcess, error) { if len(argv) == 0 { return nil, errors.New("terminal: empty attach command") } - cmd := exec.CommandContext(ctx, argv[0], argv[1:]...) + if err := ctx.Err(); err != nil { + return nil, err + } + cmd := exec.Command(argv[0], argv[1:]...) if env != nil { cmd.Env = env } @@ -40,7 +43,12 @@ func defaultSpawn(ctx context.Context, argv, env []string, rows, cols uint16) (p if err != nil { return nil, err } - return &creackPTY{f: f, cmd: cmd}, nil + proc := &creackPTY{f: f, cmd: cmd} + go func() { + <-ctx.Done() + _ = proc.Close() + }() + return proc, nil } type creackPTY struct { diff --git a/frontend/src/renderer/components/SessionInspector.tsx b/frontend/src/renderer/components/SessionInspector.tsx index 87edce1d..edd99b9e 100644 --- a/frontend/src/renderer/components/SessionInspector.tsx +++ b/frontend/src/renderer/components/SessionInspector.tsx @@ -247,6 +247,8 @@ function activityDetail(status: SessionStatus): string | null { return "Session idle"; case "needs_input": return "Waiting for your input"; + case "no_signal": + return "No recent agent signal"; case "working": return null; default: @@ -261,6 +263,7 @@ const STATUS_PILL: Record< working: { label: "Working", tone: "var(--orange)", breathe: true }, needs_you: { label: "Input needed", tone: "var(--amber)", breathe: false }, ci_failed: { label: "CI failed", tone: "var(--red)", breathe: false }, + no_signal: { label: "No signal", tone: "var(--fg-muted)", breathe: false }, mergeable: { label: "Ready", tone: "var(--green)", breathe: false }, done: { label: "Done", tone: "var(--fg-muted)", breathe: false }, idle: { label: "Idle", tone: "var(--fg-muted)", breathe: false }, diff --git a/frontend/src/renderer/components/SessionView.tsx b/frontend/src/renderer/components/SessionView.tsx index 9acf8b0d..86c81634 100644 --- a/frontend/src/renderer/components/SessionView.tsx +++ b/frontend/src/renderer/components/SessionView.tsx @@ -24,13 +24,10 @@ type SessionViewProps = { sessionId: string; }; -// The session detail screen: the persistent terminal + git rail, under the -// shell-owned ShellTopbar. Rendered by both the project-scoped and -// cross-project session routes. The terminal lives here (not in the shell) — -// switching sessions only changes route params, so TanStack Router keeps this -// component mounted and the terminal re-points its mux without remounting -// (useTerminalSession). Leaving for the board unmounts it; a fresh server-side -// zellij attach repaints the pane on return. +// The session detail screen: terminal + git rail, under the shell-owned +// ShellTopbar. Rendered by both the project-scoped and cross-project session +// routes. TerminalPane owns the terminal lifetime and remounts by terminal +// handle so each session gets a clean xterm/mux binding. // // The split is shadcn's resizable (react-resizable-panels v4) with a fully // collapsible inspector: the panel is `collapsible` and driven to 0% via the diff --git a/frontend/src/renderer/components/SessionsBoard.tsx b/frontend/src/renderer/components/SessionsBoard.tsx index 673380d4..6014b02b 100644 --- a/frontend/src/renderer/components/SessionsBoard.tsx +++ b/frontend/src/renderer/components/SessionsBoard.tsx @@ -328,6 +328,8 @@ function sessionBadge(session: WorkspaceSession): { label: string; className: st switch (session.status) { case "needs_input": return { label: "Input needed", className: "text-warning" }; + case "no_signal": + return { label: "No signal", className: "text-passive" }; case "ci_failed": return { label: "CI failed", className: "text-error" }; case "changes_requested": diff --git a/frontend/src/renderer/components/ShellTopbar.tsx b/frontend/src/renderer/components/ShellTopbar.tsx index ff8285de..8ab3eec9 100644 --- a/frontend/src/renderer/components/ShellTopbar.tsx +++ b/frontend/src/renderer/components/ShellTopbar.tsx @@ -30,6 +30,7 @@ const STATUS_PILL: Record; + return ( + + ); } function bannerText(state: TerminalSessionState, error?: string): string | undefined { @@ -46,10 +57,9 @@ function AttachedTerminal({ session, theme, daemonReady, terminalTarget }: Termi session && terminalTarget?.kind === "reviewer" ? { ...session, terminalHandleId: terminalTarget.handleId } : session; - // One terminal instance per pane lifetime (yyork's core rule): switching - // sessions never remounts XtermTerminal — the attachment effect re-points - // the mux and clears the screen instead. A keyed remount would tear down the - // renderer mid-switch and lose the warm GPU surface. + // One terminal instance per handle-scoped pane lifetime. TerminalPane keys this + // component by terminal handle, so session switches get a fresh xterm + mux + // hook state instead of reusing a potentially stale screen/input binding. const [terminal, setTerminal] = useState(null); const [initFailed, setInitFailed] = useState(false); const [isRestoring, setIsRestoring] = useState(false); @@ -60,7 +70,9 @@ function AttachedTerminal({ session, theme, daemonReady, terminalTarget }: Termi const hadAttachmentRef = useRef(false); const canRestoreSession = terminalTarget?.kind !== "reviewer" && session?.status === "terminated"; - const handleReady = useCallback((handle: AttachableTerminal) => setTerminal(handle), []); + const handleReady = useCallback((handle: AttachableTerminal) => { + setTerminal(handle); + }, []); const handleInitError = useCallback((err: unknown) => { console.error("xterm failed to initialize", err); setInitFailed(true); @@ -97,7 +109,7 @@ function AttachedTerminal({ session, theme, daemonReady, terminalTarget }: Termi } hadAttachmentRef.current = true; return attach(terminal); - }, [terminal, handleId, attach]); + }, [terminal, handleId, attach, attachSession?.id]); if (initFailed) { return ( diff --git a/frontend/src/renderer/components/XtermTerminal.tsx b/frontend/src/renderer/components/XtermTerminal.tsx index 9b177b4c..3851bc5f 100644 --- a/frontend/src/renderer/components/XtermTerminal.tsx +++ b/frontend/src/renderer/components/XtermTerminal.tsx @@ -2,9 +2,10 @@ // // Design rules (the reason this component exists): // - The mount effect is dependency-free: the terminal instance is created once -// per mount and NEVER torn down because a callback identity or session -// changed. Session switching is the owner's job (re-point the mux, clear the -// screen) — see TerminalPane. +// per mount and NEVER torn down because a callback identity changed. +// TerminalPane chooses the mount lifetime; it keys mounts by terminal handle +// so session switches get a clean surface, while same-handle reconnects reuse +// the mounted renderer. // - Nothing writes into the buffer at mount. Status/empty-state belongs to DOM // chrome around the terminal, not inside it. Writing before layout settles // is what crashed xterm's Viewport (`dimensions` of a zero-sized renderer). @@ -26,7 +27,7 @@ import { SearchAddon } from "@xterm/addon-search"; import { Unicode11Addon } from "@xterm/addon-unicode11"; import { WebLinksAddon } from "@xterm/addon-web-links"; import { WebglAddon } from "@xterm/addon-webgl"; -import type { AttachableTerminal } from "../hooks/useTerminalSession"; +import type { AttachableTerminal, TerminalUserInputSource } from "../hooks/useTerminalSession"; import { buildTerminalThemes } from "../lib/terminal-themes"; import type { Theme } from "../stores/ui-store"; @@ -68,12 +69,19 @@ const terminalThemes = buildTerminalThemes(); // Erase scrollback (3J) + display (2J) and home the cursor — yyork's // terminalResetSequence. Deliberately NOT term.reset(): every pane PTY is a -// fresh per-client `zellij attach` whose handshake re-asserts the DEC private -// modes (SGR mouse tracking, alt screen) anyway, but a full RIS would drop -// them for the window until that handshake arrives — a flash where wheel -// events stop reaching zellij. The clear only wipes pixels; modes stay up. +// fresh per-client `zellij attach` whose handshake re-asserts terminal modes +// anyway, but a full RIS would drop them for the window until that handshake +// arrives. The clear only wipes pixels; modes stay up. const CLEAR_SEQUENCE = "\x1b[3J\x1b[2J\x1b[H"; +function preparePastedText(text: string): string { + return text.replace(/\r?\n/g, "\r"); +} + +function bracketPastedText(text: string, bracketedPasteMode: boolean): string { + return bracketedPasteMode ? `\x1b[200~${text}\x1b[201~` : text; +} + export function XtermTerminal(props: XtermTerminalProps) { const hostRef = useRef(null); const termRef = useRef(null); @@ -227,6 +235,26 @@ export function XtermTerminal(props: XtermTerminalProps) { // misses them. Listen on window directly as a session-long recovery path. window.addEventListener("resize", fitTerminal); + const userInputListeners = new Set<(data: string, source: TerminalUserInputSource) => void>(); + const emitUserInput = (data: string, source: TerminalUserInputSource) => { + if (data.length === 0) return; + userInputListeners.forEach((listener) => listener(data, source)); + }; + const keyInput = term.onKey(({ key }) => emitUserInput(key, "keyboard")); + const pasteInput = (event: ClipboardEvent) => { + event.preventDefault(); + event.stopPropagation(); + const text = event.clipboardData?.getData("text/plain") ?? ""; + const prepared = preparePastedText(text); + const bracketed = term.modes.bracketedPasteMode && term.options.ignoreBracketedPasteMode !== true; + emitUserInput(bracketPastedText(prepared, bracketed), "paste"); + }; + const compositionInput = (event: CompositionEvent) => { + emitUserInput(event.data, "composition"); + }; + host.addEventListener("paste", pasteInput, true); + host.addEventListener("compositionend", compositionInput, true); + // Live cols/rows getters: the owner reads the current grid at attach time, // not a snapshot taken at ready time (the first fit may not have run yet). const handle: AttachableTerminal = { @@ -239,7 +267,10 @@ export function XtermTerminal(props: XtermTerminalProps) { write: (data) => term.write(data), writeln: (line) => term.writeln(line), clear: () => term.write(CLEAR_SEQUENCE), - onData: (listener) => term.onData(listener), + onUserInput: (listener) => { + userInputListeners.add(listener); + return { dispose: () => userInputListeners.delete(listener) }; + }, onResize: (listener) => term.onResize(listener), }; callbacksRef.current.onReady?.(handle); @@ -251,6 +282,10 @@ export function XtermTerminal(props: XtermTerminalProps) { observer.disconnect(); stabilizer.dispose(); window.removeEventListener("resize", fitTerminal); + host.removeEventListener("paste", pasteInput, true); + host.removeEventListener("compositionend", compositionInput, true); + keyInput.dispose(); + userInputListeners.clear(); try { term.dispose(); } catch { diff --git a/frontend/src/renderer/hooks/useTerminalSession.test.tsx b/frontend/src/renderer/hooks/useTerminalSession.test.tsx index d8efca52..5b5034b6 100644 --- a/frontend/src/renderer/hooks/useTerminalSession.test.tsx +++ b/frontend/src/renderer/hooks/useTerminalSession.test.tsx @@ -25,6 +25,8 @@ type FakeMux = { opens: Array<[string, number, number]>; resizes: Array<[string, number, number]>; inputs: Array<[string, string]>; + closes: string[]; + events: string[]; disposed: boolean; emitData(id: string, text: string): void; emitOpened(id: string): void; @@ -51,12 +53,17 @@ function createFakeMux(): FakeMux { opens: [], resizes: [], inputs: [], + closes: [], + events: [], disposed: false, mux: { open: (id, cols, rows) => fake.opens.push([id, cols, rows]), sendInput: (id, input) => fake.inputs.push([id, input]), resize: (id, cols, rows) => fake.resizes.push([id, cols, rows]), - close: () => undefined, + close: (id) => { + fake.closes.push(id); + fake.events.push(`close:${id}`); + }, onData: (id, listener) => subscribe(data, id, listener), onExit: (id, listener) => subscribe(exit, id, listener), onOpened: (id, listener) => subscribe(opened, id, listener), @@ -67,6 +74,7 @@ function createFakeMux(): FakeMux { }, dispose: () => { fake.disposed = true; + fake.events.push("dispose"); }, }, emitData: (id, text) => data.get(id)?.forEach((listener) => listener(new TextEncoder().encode(text))), @@ -82,11 +90,12 @@ type FakeTerminal = AttachableTerminal & { lines: string[]; clears: number; typeKeys(data: string): void; + paste(data: string): void; emitResize(cols: number, rows: number): void; }; function createFakeTerminal(): FakeTerminal { - const dataListeners = new Set<(data: string) => void>(); + const inputListeners = new Set[0]>(); const resizeListeners = new Set<(size: { cols: number; rows: number }) => void>(); const terminal: FakeTerminal = { cols: 80, @@ -98,15 +107,16 @@ function createFakeTerminal(): FakeTerminal { clear: () => { terminal.clears += 1; }, - onData: (listener) => { - dataListeners.add(listener); - return { dispose: () => dataListeners.delete(listener) }; + onUserInput: (listener) => { + inputListeners.add(listener); + return { dispose: () => inputListeners.delete(listener) }; }, onResize: (listener) => { resizeListeners.add(listener); return { dispose: () => resizeListeners.delete(listener) }; }, - typeKeys: (data) => dataListeners.forEach((listener) => listener(data)), + typeKeys: (data) => inputListeners.forEach((listener) => listener(data, "keyboard")), + paste: (data) => inputListeners.forEach((listener) => listener(data, "paste")), emitResize: (cols, rows) => resizeListeners.forEach((listener) => listener({ cols, rows })), }; return terminal; @@ -163,10 +173,16 @@ describe("useTerminalSession", () => { it("forwards PTY output, keystrokes, and resizes across the attachment", () => { const { terminal, muxes } = setup(); + act(() => muxes[0].emitOpened("handle-1")); act(() => muxes[0].emitData("handle-1", "hello")); expect(terminal.lines).toContain("hello"); terminal.typeKeys("ls\r"); expect(muxes[0].inputs).toEqual([["handle-1", "ls\r"]]); + terminal.paste("echo pasted\r"); + expect(muxes[0].inputs).toEqual([ + ["handle-1", "ls\r"], + ["handle-1", "echo pasted\r"], + ]); terminal.emitResize(120, 40); act(() => void vi.advanceTimersByTime(100)); expect(muxes[0].resizes).toContainEqual(["handle-1", 120, 40]); @@ -191,6 +207,15 @@ describe("useTerminalSession", () => { ]); }); + it("does not forward input until the server opens the current attachment", () => { + const { terminal, muxes } = setup(); + terminal.typeKeys("too early"); + expect(muxes[0].inputs).toEqual([]); + act(() => muxes[0].emitOpened("handle-1")); + terminal.typeKeys("ready\r"); + expect(muxes[0].inputs).toEqual([["handle-1", "ready\r"]]); + }); + it("a new resize burst supersedes a pending re-assert", () => { const { terminal, muxes } = setup(); const initialResizes = muxes[0].resizes.length; @@ -238,6 +263,33 @@ describe("useTerminalSession", () => { expect(view.result.current.state).toBe("attached"); }); + it("ignores stale frames after a reconnect starts", () => { + const { view, terminal, muxes } = setup(); + act(() => muxes[0].emitConnection("closed")); + act(() => void vi.advanceTimersByTime(500)); + expect(muxes).toHaveLength(2); + act(() => muxes[0].emitOpened("handle-1")); + act(() => muxes[0].emitData("handle-1", "stale")); + expect(view.result.current.state).toBe("reattaching"); + expect(terminal.lines).not.toContain("stale"); + act(() => muxes[1].emitOpened("handle-1")); + expect(view.result.current.state).toBe("attached"); + }); + + it("hard-reconnects when the server never acknowledges open", () => { + const { view, muxes } = setup(); + act(() => void vi.advanceTimersByTime(2_999)); + expect(muxes).toHaveLength(1); + act(() => void vi.advanceTimersByTime(1)); + expect(view.result.current.state).toBe("reattaching"); + expect(muxes[0].disposed).toBe(true); + act(() => void vi.advanceTimersByTime(500)); + expect(muxes).toHaveLength(2); + expect(muxes[1].opens).toEqual([["handle-1", 80, 24]]); + act(() => muxes[1].emitOpened("handle-1")); + expect(view.result.current.state).toBe("attached"); + }); + it("backs off between failed reconnect attempts", () => { const { muxes } = setup(); act(() => muxes[0].emitConnection("closed")); @@ -267,6 +319,8 @@ describe("useTerminalSession", () => { act(() => detach()); expect(view.result.current.state).toBe("idle"); expect(muxes[0].disposed).toBe(true); + expect(muxes[0].closes).toEqual(["handle-1"]); + expect(muxes[0].events).toEqual(["close:handle-1", "dispose"]); act(() => muxes[0].emitConnection("closed")); act(() => void vi.advanceTimersByTime(60_000)); expect(muxes).toHaveLength(1); diff --git a/frontend/src/renderer/hooks/useTerminalSession.ts b/frontend/src/renderer/hooks/useTerminalSession.ts index 8829c70b..a574e855 100644 --- a/frontend/src/renderer/hooks/useTerminalSession.ts +++ b/frontend/src/renderer/hooks/useTerminalSession.ts @@ -18,6 +18,8 @@ import { workspaceQueryKey } from "./useWorkspaceQuery"; * The slice of xterm's Terminal the attachment needs. Structural, so tests can * drive the hook with a tiny fake instead of a real xterm + DOM. */ +export type TerminalUserInputSource = "keyboard" | "paste" | "composition"; + export type AttachableTerminal = { cols: number; rows: number; @@ -30,7 +32,7 @@ export type AttachableTerminal = { * with wheel scroll dead (see XtermTerminal's CLEAR_SEQUENCE). */ clear: () => void; - onData: (listener: (data: string) => void) => { dispose: () => void }; + onUserInput: (listener: (data: string, source: TerminalUserInputSource) => void) => { dispose: () => void }; onResize: (listener: (size: { cols: number; rows: number }) => void) => { dispose: () => void }; }; @@ -51,6 +53,7 @@ export type UseTerminalSessionOptions = { const RETRY_BASE_MS = 500; const RETRY_MAX_MS = 8_000; +const OPEN_TIMEOUT_MS = 3_000; // Trailing debounce on grid changes: a pane drag emits a burst of intermediate // sizes; the attached program should get one SIGWINCH when the drag settles, // not dozens (yyork's terminal-panel does the same at its socket layer). @@ -88,9 +91,12 @@ export function useTerminalSession(session: WorkspaceSession | undefined, option handle: null as string | null, disposers: [] as Array<() => void>, retryTimer: null as ReturnType | null, + openTimer: null as ReturnType | null, resizeTimer: null as ReturnType | null, attempts: 0, firstAttach: true, + generation: 0, + inputReady: false, detached: true, }); @@ -109,25 +115,53 @@ export function useTerminalSession(session: WorkspaceSession | undefined, option clearTimeout(r.retryTimer); r.retryTimer = null; } + if (r.openTimer) { + clearTimeout(r.openTimer); + r.openTimer = null; + } if (r.resizeTimer) { clearTimeout(r.resizeTimer); r.resizeTimer = null; } + r.inputReady = false; + if (r.mux && r.handle) { + r.mux.close(r.handle); + } r.disposers.forEach((dispose) => dispose()); r.disposers = []; r.mux?.dispose(); r.mux = null; }, []); + const isCurrentAttachment = useCallback((generation: number, handle: string, mux: TerminalMux) => { + const r = runtime.current; + return !r.detached && r.generation === generation && r.handle === handle && r.mux === mux; + }, []); + + const clearOpenTimer = useCallback((generation: number) => { + const r = runtime.current; + if (r.generation !== generation || !r.openTimer) return; + clearTimeout(r.openTimer); + r.openTimer = null; + }, []); + const scheduleReattach = useCallback(() => { const r = runtime.current; - if (r.detached || !r.terminal || !r.handle) return; + if (r.detached || !r.terminal || !r.handle) { + return; + } // A socket dropping after the PTY ended (or errored) changes nothing. - if (stateRef.current === "exited" || stateRef.current === "error") return; + if (stateRef.current === "exited" || stateRef.current === "error") { + return; + } transition("reattaching"); // Not ready → no timer; the daemonReady effect reconnects when it flips. - if (!optionsRef.current.daemonReady) return; - if (r.retryTimer) return; + if (!optionsRef.current.daemonReady) { + return; + } + if (r.retryTimer) { + return; + } const delay = Math.min(RETRY_BASE_MS * 2 ** r.attempts, RETRY_MAX_MS); r.attempts += 1; r.retryTimer = setTimeout(() => { @@ -139,45 +173,75 @@ export function useTerminalSession(session: WorkspaceSession | undefined, option const connect = useCallback(() => { const r = runtime.current; const { terminal, handle } = r; - if (!terminal || !handle || r.detached) return; + if (!terminal || !handle || r.detached) { + return; + } + const generation = r.generation + 1; + r.generation = generation; + r.inputReady = false; teardownMux(); const mux = (optionsRef.current.createMux ?? defaultCreateMux)(); r.mux = mux; r.disposers.push( - mux.onData(handle, (bytes) => terminal.write(bytes)), + mux.onData(handle, (bytes) => { + if (!isCurrentAttachment(generation, handle, mux)) return; + terminal.write(bytes); + }), mux.onOpened(handle, () => { + if (!isCurrentAttachment(generation, handle, mux)) return; + clearOpenTimer(generation); + r.inputReady = true; r.attempts = 0; setError(undefined); transition("attached"); }), mux.onExit(handle, () => { + if (!isCurrentAttachment(generation, handle, mux)) return; + clearOpenTimer(generation); + r.inputReady = false; terminal.writeln("\r\n\x1b[2m[process exited]\x1b[0m"); transition("exited"); invalidateWorkspaces(); }), mux.onError(handle, (message) => { + if (!isCurrentAttachment(generation, handle, mux)) return; + clearOpenTimer(generation); + r.inputReady = false; terminal.writeln(`\r\n\x1b[2m[terminal error] ${message}\x1b[0m`); setError(message); transition("error"); invalidateWorkspaces(); }), mux.onConnectionChange((connectionState) => { - if (connectionState === "closed") scheduleReattach(); + if (!isCurrentAttachment(generation, handle, mux)) return; + if (connectionState === "closed") { + clearOpenTimer(generation); + r.inputReady = false; + scheduleReattach(); + } }), ); - const input = terminal.onData((data) => mux.sendInput(handle, data)); + const input = terminal.onUserInput((data) => { + if (!isCurrentAttachment(generation, handle, mux) || !r.inputReady) { + return; + } + mux.sendInput(handle, data); + }); // xterm only fires onResize when the grid actually changed; the debounce // additionally collapses a drag's burst of changes into one PTY resize. // Each settled resize is re-asserted once (see RESIZE_REASSERT_MS); both // stages share resizeTimer so a new burst or teardown cancels either. const resize = terminal.onResize(({ cols, rows }) => { + if (!isCurrentAttachment(generation, handle, mux)) return; if (r.resizeTimer) clearTimeout(r.resizeTimer); r.resizeTimer = setTimeout(() => { + if (!isCurrentAttachment(generation, handle, mux)) return; mux.resize(handle, cols, rows); r.resizeTimer = setTimeout(() => { r.resizeTimer = null; + if (!isCurrentAttachment(generation, handle, mux)) return; mux.resize(handle, cols, rows); }, RESIZE_REASSERT_MS); }, RESIZE_DEBOUNCE_MS); @@ -200,7 +264,14 @@ export function useTerminalSession(session: WorkspaceSession | undefined, option mux.open(handle, terminal.cols, terminal.rows); mux.resize(handle, terminal.cols, terminal.rows); - }, [invalidateWorkspaces, scheduleReattach, teardownMux, transition]); + r.openTimer = setTimeout(() => { + if (!isCurrentAttachment(generation, handle, mux)) return; + r.openTimer = null; + transition("reattaching"); + teardownMux(); + scheduleReattach(); + }, OPEN_TIMEOUT_MS); + }, [clearOpenTimer, invalidateWorkspaces, isCurrentAttachment, scheduleReattach, teardownMux, transition]); connectRef.current = connect; /** @@ -224,10 +295,12 @@ export function useTerminalSession(session: WorkspaceSession | undefined, option transition("idle"); } return () => { + r.generation += 1; r.detached = true; teardownMux(); r.terminal = null; r.handle = null; + r.inputReady = false; setError(undefined); transition("idle"); }; @@ -250,7 +323,10 @@ export function useTerminalSession(session: WorkspaceSession | undefined, option // forgot to call detach. useEffect( () => () => { - runtime.current.detached = true; + const r = runtime.current; + r.generation += 1; + r.detached = true; + r.inputReady = false; teardownMux(); }, [teardownMux], diff --git a/frontend/src/renderer/lib/terminal-mux.ts b/frontend/src/renderer/lib/terminal-mux.ts index 76cd8c20..8e0264ac 100644 --- a/frontend/src/renderer/lib/terminal-mux.ts +++ b/frontend/src/renderer/lib/terminal-mux.ts @@ -87,7 +87,7 @@ type ConnectionListener = (state: MuxConnectionState) => void; export type TerminalMux = { /** Open a PTY pane for the given runtime/session id at an initial size. */ open: (id: string, cols: number, rows: number) => void; - /** Forward a keystroke string (xterm `onData`) to the pane. */ + /** Forward user-originated keyboard/paste data to the pane. */ sendInput: (id: string, input: string) => void; resize: (id: string, cols: number, rows: number) => void; close: (id: string) => void; @@ -166,8 +166,12 @@ export function createTerminalMux(url: string, WebSocketImpl: typeof WebSocket = setConnectionState("open"); }); - socket.addEventListener("close", () => setConnectionState("closed")); - socket.addEventListener("error", () => setConnectionState("closed")); + socket.addEventListener("close", () => { + setConnectionState("closed"); + }); + socket.addEventListener("error", () => { + setConnectionState("closed"); + }); socket.addEventListener("message", (event: MessageEvent) => { if (typeof event.data !== "string") return; @@ -214,10 +218,19 @@ export function createTerminalMux(url: string, WebSocketImpl: typeof WebSocket = }; return { - open: (id, cols, rows) => send(openFrame(id, cols, rows)), - sendInput: (id, input) => send(dataFrame(id, encoder.encode(input))), - resize: (id, cols, rows) => send(resizeFrame(id, cols, rows)), - close: (id) => send(closeFrame(id)), + open: (id, cols, rows) => { + send(openFrame(id, cols, rows)); + }, + sendInput: (id, input) => { + const bytes = encoder.encode(input); + send(dataFrame(id, bytes)); + }, + resize: (id, cols, rows) => { + send(resizeFrame(id, cols, rows)); + }, + close: (id) => { + send(closeFrame(id)); + }, onData: (id, listener) => subscribeById(dataListeners, id, listener), onExit: (id, listener) => subscribeById(exitListeners, id, listener), onOpened: (id, listener) => subscribeById(openedListeners, id, listener), diff --git a/frontend/src/renderer/types/workspace.test.ts b/frontend/src/renderer/types/workspace.test.ts index 729c88c9..cc1e54e5 100644 --- a/frontend/src/renderer/types/workspace.test.ts +++ b/frontend/src/renderer/types/workspace.test.ts @@ -48,6 +48,7 @@ const pr = (overrides: Partial & { number: number; state: PRSt describe("toSessionStatus", () => { it("passes through a known status", () => { expect(toSessionStatus("mergeable")).toBe("mergeable"); + expect(toSessionStatus("no_signal")).toBe("no_signal"); }); it("overrides to terminated when the session is terminated", () => { @@ -73,6 +74,7 @@ describe("workerDisplayStatus", () => { ["changes_requested", "needs_you"], ["review_pending", "needs_you"], ["ci_failed", "ci_failed"], + ["no_signal", "no_signal"], ["approved", "mergeable"], ["mergeable", "mergeable"], ["merged", "done"], @@ -132,6 +134,10 @@ describe("sessionNeedsAttention", () => { expect(sessionNeedsAttention(sessionWith({ status }))).toBe(true); }); + it("treats no_signal as needing attention", () => { + expect(sessionNeedsAttention(sessionWith({ status: "no_signal" }))).toBe(true); + }); + it("is false for statuses that don't need the user", () => { expect(sessionNeedsAttention(sessionWith({ status: "working" }))).toBe(false); expect(sessionNeedsAttention(sessionWith({ status: "mergeable" }))).toBe(false); @@ -143,6 +149,7 @@ describe("workerStatusPulses", () => { expect(workerStatusPulses("working")).toBe(true); expect(workerStatusPulses("needs_you")).toBe(true); expect(workerStatusPulses("mergeable")).toBe(false); + expect(workerStatusPulses("no_signal")).toBe(false); expect(workerStatusPulses("done")).toBe(false); }); }); @@ -199,6 +206,7 @@ describe("attentionZone", () => { ["approved", "merge"], ["needs_input", "action"], ["ci_failed", "action"], + ["no_signal", "action"], ["changes_requested", "action"], ["review_pending", "pending"], ["pr_open", "pending"], diff --git a/frontend/src/renderer/types/workspace.ts b/frontend/src/renderer/types/workspace.ts index a5f1b466..47918246 100644 --- a/frontend/src/renderer/types/workspace.ts +++ b/frontend/src/renderer/types/workspace.ts @@ -9,6 +9,7 @@ export type SessionStatus = | "mergeable" | "merged" | "needs_input" + | "no_signal" | "idle" | "terminated"; @@ -23,6 +24,7 @@ const sessionStatuses = new Set([ "mergeable", "merged", "needs_input", + "no_signal", "idle", "terminated", ]); @@ -119,7 +121,7 @@ export type WorkspaceSession = { }; /** Glanceable worker status. Maps 1:1 to the accent colors in DESIGN.md. */ -export type WorkerDisplayStatus = "working" | "needs_you" | "mergeable" | "ci_failed" | "done"; +export type WorkerDisplayStatus = "working" | "needs_you" | "mergeable" | "ci_failed" | "no_signal" | "done"; export function workerDisplayStatus(session: WorkspaceSession): WorkerDisplayStatus { if (session.displayStatus) return session.displayStatus; @@ -130,6 +132,8 @@ export function workerDisplayStatus(session: WorkspaceSession): WorkerDisplaySta return "needs_you"; case "ci_failed": return "ci_failed"; + case "no_signal": + return "no_signal"; case "approved": case "mergeable": return "mergeable"; @@ -194,6 +198,7 @@ export function sessionIsActive(session: WorkspaceSession): boolean { export function sessionNeedsAttention(session: WorkspaceSession): boolean { return ( session.status === "needs_input" || + session.status === "no_signal" || session.status === "changes_requested" || session.status === "review_pending" || session.status === "ci_failed" @@ -205,6 +210,7 @@ export const workerStatusLabel: Record = { needs_you: "needs you", mergeable: "mergeable", ci_failed: "ci failed", + no_signal: "no signal", done: "done", }; @@ -246,6 +252,7 @@ export function attentionZone(session: WorkspaceSession): AttentionZone { // Agent waiting on a human (respond) or a problem to investigate (review); // agent-orchestrator collapses these into one "action" zone by default. case "needs_input": + case "no_signal": case "ci_failed": case "changes_requested": return "action";