Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions backend/internal/cli/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ type daemonStatus struct {
}

type probeResult struct {
Status string `json:"status"`
Service string `json:"service"`
PID int `json:"pid"`
Status string `json:"status"`
Service string `json:"service"`
PID int `json:"pid"`
ExecutablePath string `json:"executablePath,omitempty"`
WorkingDirectory string `json:"workingDirectory,omitempty"`
}

func newStatusCommand(ctx *commandContext) *cobra.Command {
Expand Down
23 changes: 15 additions & 8 deletions backend/internal/httpd/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,19 +222,26 @@ func localControlRequest(r *http.Request) bool {
// handleHealthz is the liveness probe: it answers 200 as long as the process is
// up and serving. It does no dependency checks by design.
func handleHealthz(w http.ResponseWriter, _ *http.Request) {
envelope.WriteJSON(w, http.StatusOK, map[string]any{
"status": "ok",
"service": daemonmeta.ServiceName,
"pid": os.Getpid(),
})
envelope.WriteJSON(w, http.StatusOK, daemonProbePayload("ok"))
}

// handleReadyz is the readiness probe. Dependency initialization happens before
// the server is constructed, so a listening daemon is ready to answer requests.
func handleReadyz(w http.ResponseWriter, _ *http.Request) {
envelope.WriteJSON(w, http.StatusOK, map[string]any{
"status": "ready",
envelope.WriteJSON(w, http.StatusOK, daemonProbePayload("ready"))
}

func daemonProbePayload(status string) map[string]any {
payload := map[string]any{
"status": status,
"service": daemonmeta.ServiceName,
"pid": os.Getpid(),
})
}
if exe, err := os.Executable(); err == nil && exe != "" {
payload["executablePath"] = exe
}
if cwd, err := os.Getwd(); err == nil && cwd != "" {
payload["workingDirectory"] = cwd
}
return payload
}
39 changes: 39 additions & 0 deletions backend/internal/httpd/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package httpd

import (
"context"
"encoding/json"
"io"
"log/slog"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
"time"
Expand Down Expand Up @@ -39,6 +41,43 @@ func TestHealthProbes(t *testing.T) {
}
}

func TestHealthProbesIncludeDaemonIdentity(t *testing.T) {
router := newTestRouter(config.Config{}, discardLogger(), nil)
srv := httptest.NewServer(router)
defer srv.Close()

wantExe, err := os.Executable()
if err != nil {
t.Fatal(err)
}
wantCWD, err := os.Getwd()
if err != nil {
t.Fatal(err)
}

client := &http.Client{Timeout: 2 * time.Second}
for _, path := range []string{"/healthz", "/readyz"} {
resp, err := client.Get(srv.URL + path)
if err != nil {
t.Fatalf("GET %s: %v", path, err)
}
defer resp.Body.Close()
var body struct {
ExecutablePath string `json:"executablePath"`
WorkingDirectory string `json:"workingDirectory"`
}
if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
t.Fatalf("decode %s: %v", path, err)
}
if body.ExecutablePath != wantExe {
t.Errorf("GET %s executablePath = %q, want %q", path, body.ExecutablePath, wantExe)
}
if body.WorkingDirectory != wantCWD {
t.Errorf("GET %s workingDirectory = %q, want %q", path, body.WorkingDirectory, wantCWD)
}
}
}

// TestServerLifecycle exercises the full Run loop: bind an ephemeral port,
// publish running.json, serve a request, then cancel the context and confirm a
// clean shutdown that removes the handshake file.
Expand Down
202 changes: 195 additions & 7 deletions frontend/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { readFile } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { pathToFileURL } from "node:url";
import { resolveDaemonLaunch } from "./shared/daemon-launch";
import { type DaemonLaunchSpec, resolveDaemonLaunch } from "./shared/daemon-launch";
import { createListenPortScanner, defaultRunFilePath, parseRunFile } from "./shared/daemon-discovery";
import type { DaemonStatus } from "./shared/daemon-status";
import { DEFAULT_POSTHOG_HOST, DEFAULT_POSTHOG_PROJECT_KEY } from "./shared/posthog-config";
Expand All @@ -21,6 +21,9 @@ app.setName("Agent Orchestrator");

let mainWindow: BrowserWindow | null = null;
let daemonProcess: ChildProcessWithoutNullStreams | null = null;
let daemonStoppingProcess: ChildProcessWithoutNullStreams | null = null;
let daemonStartPromise: Promise<DaemonStatus> | null = null;
let daemonStartEpoch = 0;
let daemonStatus: DaemonStatus = { state: "stopped" };

const isDev = !app.isPackaged;
Expand Down Expand Up @@ -153,6 +156,16 @@ const RUN_FILE_POLL_MS = 300;
// Accept run-files stamped slightly before our spawn timestamp: the daemon's
// clock reading and ours race within normal scheduling jitter.
const RUN_FILE_FRESHNESS_SKEW_MS = 2_000;
const DAEMON_PROBE_TIMEOUT_MS = 2_000;
const DAEMON_SERVICE_NAME = "agent-orchestrator-daemon";

type DaemonProbe = {
status: string;
service: string;
pid: number;
executablePath?: string;
workingDirectory?: string;
};

function runFilePath(): string | null {
if (process.env.AO_RUN_FILE) return process.env.AO_RUN_FILE;
Expand All @@ -169,7 +182,169 @@ function daemonEnv(): NodeJS.ProcessEnv {
};
}

function startDaemon(): DaemonStatus {
function pathKey(value: string): string {
const resolved = path.resolve(value);
return process.platform === "win32" ? resolved.toLowerCase() : resolved;
}

function samePath(a: string, b: string): boolean {
return pathKey(a) === pathKey(b);
}

function pathInside(child: string, parent: string): boolean {
const childKey = pathKey(child);
const parentKey = pathKey(parent);
return childKey === parentKey || childKey.startsWith(parentKey + path.sep);
}

function processAlive(pid: number): boolean {
if (!pid) return false;
try {
process.kill(pid, 0);
return true;
} catch {
return false;
}
}

async function readDaemonProbe(port: number, endpoint: "healthz" | "readyz"): Promise<DaemonProbe | null> {
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), DAEMON_PROBE_TIMEOUT_MS);
try {
const response = await net.fetch(`http://127.0.0.1:${port}/${endpoint}`, { signal: controller.signal });
if (!response.ok) return null;
const body = (await response.json()) as Partial<DaemonProbe>;
if (body.status !== (endpoint === "healthz" ? "ok" : "ready")) return null;
if (body.service !== DAEMON_SERVICE_NAME) return null;
if (typeof body.pid !== "number" || !Number.isInteger(body.pid)) return null;
return {
status: body.status,
service: body.service,
pid: body.pid,
executablePath: typeof body.executablePath === "string" ? body.executablePath : undefined,
workingDirectory: typeof body.workingDirectory === "string" ? body.workingDirectory : undefined,
};
} catch {
return null;
} finally {
clearTimeout(timer);
}
}

function daemonIdentityError(launch: DaemonLaunchSpec, probe: DaemonProbe): string | null {
if (launch.source === "dev") {
const cwdMatches = probe.workingDirectory ? samePath(probe.workingDirectory, launch.cwd) : false;
const executableMatches = probe.executablePath ? pathInside(probe.executablePath, launch.cwd) : false;
if (!probe.workingDirectory && !probe.executablePath) {
return "An older AO daemon is already running, but it does not report its checkout identity. Stop it and restart this app.";
}
if (!cwdMatches && !executableMatches) {
const actual = probe.workingDirectory ?? probe.executablePath ?? "an unknown location";
return `Another AO daemon is already running from ${actual}; expected this checkout at ${launch.cwd}. Stop the other daemon before using this checkout.`;
}
return null;
}

if (launch.source === "bundled") {
if (!probe.executablePath) {
return "An older AO daemon is already running, but it does not report its binary path. Stop it and restart this app.";
}
if (!samePath(probe.executablePath, launch.command)) {
return `Another AO daemon is already running from ${probe.executablePath}; expected ${launch.command}. Stop the other daemon before using this app.`;
}
}
return null;
}

async function inspectExistingDaemon(launch: DaemonLaunchSpec): Promise<DaemonStatus | null> {
const handshakePath = runFilePath();
if (!handshakePath) return null;
let contents: string;
try {
contents = await readFile(handshakePath, "utf8");
} catch {
return null;
}
const info = parseRunFile(contents);
if (!info || !processAlive(info.pid)) return null;

const health = await readDaemonProbe(info.port, "healthz");
if (!health || health.pid !== info.pid) return null;
const ready = await readDaemonProbe(info.port, "readyz");
if (!ready || ready.pid !== info.pid) {
return {
state: "error",
port: info.port,
pid: info.pid,
executablePath: health.executablePath,
workingDirectory: health.workingDirectory,
message: "An AO daemon is already running, but it is not ready yet.",
};
}

const identityError = daemonIdentityError(launch, ready);
if (identityError) {
return {
state: "error",
port: info.port,
pid: info.pid,
executablePath: ready.executablePath,
workingDirectory: ready.workingDirectory,
message: identityError,
};
}

return {
state: "ready",
port: info.port,
pid: info.pid,
executablePath: ready.executablePath,
workingDirectory: ready.workingDirectory,
};
}

async function refreshDaemonStatus(): Promise<DaemonStatus> {
if (daemonProcess) {
return daemonStatus;
}
const launch = resolveDaemonLaunch(
process.env,
app.isPackaged,
process.resourcesPath,
app.getAppPath(),
process.platform,
);
if (!launch) return daemonStatus;
const existing = await inspectExistingDaemon(launch);
if (existing) {
setDaemonStatus(existing);
} else if (
daemonStatus.state === "ready" ||
(daemonStatus.state === "error" && (daemonStatus.pid || daemonStatus.port))
) {
setDaemonStatus({
state: "stopped",
message: "AO daemon is no longer reachable.",
});
}
return daemonStatus;
}

async function startDaemon(): Promise<DaemonStatus> {
if (daemonStartPromise) {
return daemonStartPromise;
}
const startEpoch = daemonStartEpoch;
const promise = startDaemonInner(startEpoch).finally(() => {
if (daemonStartPromise === promise) {
daemonStartPromise = null;
}
});
daemonStartPromise = promise;
return daemonStartPromise;
}

async function startDaemonInner(startEpoch: number): Promise<DaemonStatus> {
if (daemonProcess) {
return daemonStatus;
}
Expand All @@ -189,6 +364,15 @@ function startDaemon(): DaemonStatus {
return daemonStatus;
}

const existing = await inspectExistingDaemon(launch);
if (startEpoch !== daemonStartEpoch) {
return daemonStatus;
}
if (existing) {
setDaemonStatus(existing);
return daemonStatus;
}

if (launch.source === "bundled" && !existsSync(launch.command)) {
setDaemonStatus({
state: "error",
Expand Down Expand Up @@ -232,7 +416,7 @@ function startDaemon(): DaemonStatus {
};

const reportBoundPort = (port: number) => {
if (portConfirmed || daemonProcess !== child) return;
if (portConfirmed || daemonProcess !== child || daemonStoppingProcess === child) return;
portConfirmed = true;
stopDiscovery();
setDaemonStatus({ state: "ready", port });
Expand Down Expand Up @@ -273,7 +457,7 @@ function startDaemon(): DaemonStatus {
// Last resort: neither source confirmed (e.g. an older daemon build). Report
// the configured port so the renderer is not stuck on "starting" forever.
fallbackTimer = setTimeout(() => {
if (portConfirmed || daemonProcess !== child) return;
if (portConfirmed || daemonProcess !== child || daemonStoppingProcess === child) return;
stopDiscovery();
setDaemonStatus({
state: "ready",
Expand All @@ -286,13 +470,15 @@ function startDaemon(): DaemonStatus {
stopDiscovery();
if (daemonProcess !== child) return;
daemonProcess = null;
if (daemonStoppingProcess === child) daemonStoppingProcess = null;
setDaemonStatus({ state: "error", message: error.message });
});

child.once("exit", (code, signal) => {
stopDiscovery();
if (daemonProcess !== child) return;
daemonProcess = null;
if (daemonStoppingProcess === child) daemonStoppingProcess = null;
setDaemonStatus({
state: "stopped",
message: signal ? `Daemon exited with ${signal}` : `Daemon exited with code ${code ?? "unknown"}`,
Expand All @@ -316,18 +502,20 @@ function killDaemon(child: ChildProcessWithoutNullStreams): void {
}

function stopDaemon(): DaemonStatus {
daemonStartEpoch += 1;
daemonStartPromise = null;
if (!daemonProcess) {
setDaemonStatus({ state: "stopped" });
return daemonStatus;
}

daemonStoppingProcess = daemonProcess;
killDaemon(daemonProcess);
daemonProcess = null;
setDaemonStatus({ state: "stopped" });
return daemonStatus;
}

ipcMain.handle("daemon:getStatus", () => daemonStatus);
ipcMain.handle("daemon:getStatus", () => refreshDaemonStatus());
ipcMain.handle("daemon:start", () => startDaemon());
ipcMain.handle("daemon:stop", () => stopDaemon());
ipcMain.handle("app:getVersion", () => app.getVersion());
Expand Down Expand Up @@ -357,7 +545,7 @@ function initAutoUpdates(): void {
app.whenReady().then(() => {
registerRendererProtocol();
createWindow();
startDaemon();
void startDaemon();
initAutoUpdates();

app.on("activate", () => {
Expand Down
Loading
Loading