From dda1d2069a9be792e2c5b0de86bd2d0ebbd16b03 Mon Sep 17 00:00:00 2001 From: bcode Date: Sat, 16 May 2026 21:44:02 -0700 Subject: [PATCH 1/2] fix(plugin): synchronous shutdown hook for OTel span drain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Revert PR #74's bus-await mechanism (v0.1.7) and replace with a synchronous shutdown hook invoked from src/index.ts's top-level finally before forceFlush. PR #74 was a no-op in headless 'bcode run' mode because the plugin's bus subscriber fiber gets interrupted by Effect scope teardown before it can process session.idle / server.instance.disposed events — confirmed by A/B trace shape comparison between v0.1.6 and v0.1.7 (identical, turn parent span missing in both). The new path is a direct function call from inside the running finally, so the event loop is alive, no scope race, no bus dependency. --- packages/bcode-laminar/src/plugin.ts | 12 ++++++++ packages/opencode/src/index.ts | 22 ++++++++++---- packages/opencode/src/plugin/index.ts | 42 ++++++++++++--------------- packages/plugin/src/index.ts | 9 ++++++ 4 files changed, 56 insertions(+), 29 deletions(-) diff --git a/packages/bcode-laminar/src/plugin.ts b/packages/bcode-laminar/src/plugin.ts index fa502401b..20a750145 100644 --- a/packages/bcode-laminar/src/plugin.ts +++ b/packages/bcode-laminar/src/plugin.ts @@ -75,6 +75,18 @@ export const LaminarPlugin: Plugin = ({ client }) => { config.experimental = { ...(config.experimental ?? {}), openTelemetry: true } } }, + // Synchronous end-of-turn drain. The bus-based session.idle / + // server.instance.disposed events race with Effect scope teardown in + // headless `bcode run` mode and don't reliably deliver, so the turn span + // was historically being left un-ended and never exported. The host calls + // this hook from its top-level finally before forceFlush, so span.end() + // here gets its export drained by the host's existing forceFlush race. + shutdown: () => { + for (const [sessionId, span] of Object.entries(sessionCurrentTurnSpan)) { + span.end() + delete sessionCurrentTurnSpan[sessionId] + } + }, event: async ({ event }) => { switch (event.type) { case "session.idle": { diff --git a/packages/opencode/src/index.ts b/packages/opencode/src/index.ts index 6261c5a14..12c29fc93 100644 --- a/packages/opencode/src/index.ts +++ b/packages/opencode/src/index.ts @@ -250,13 +250,23 @@ try { } process.exitCode = 1 } finally { + // Give plugins a synchronous chance to end any open OTel spans before the + // exporter drain below. The bus-based session.idle / server.instance.disposed + // events race with Effect scope teardown and don't reliably reach plugin + // subscribers in headless `bcode run` mode, so we expose a direct sync hook + // (see packages/opencode/src/plugin/index.ts pluginShutdownHooks). + const { pluginShutdownHooks } = await import("./plugin") + for (const hook of pluginShutdownHooks) { + try { + hook() + } catch (err) { + Log.Default.error("plugin shutdown hook failed", { error: err }) + } + } // Drain any registered OTel span processors (e.g. bcode-laminar) before - // exiting. The plugin's `session.idle` event handler is invoked - // fire-and-forget (`packages/opencode/src/plugin/index.ts:249`), so its - // `processor.forceFlush()` Promise was never awaited — without this drain, - // `process.exit()` kills any in-flight gRPC export and the final agent - // span is lost. Bounded with a 3 s race so a wedged exporter cannot hang - // bcode on exit. Generic to any OTel-based plugin, not laminar-specific. + // exiting so the just-ended turn spans actually hit the wire. Bounded with + // a 3 s race so a wedged exporter cannot hang bcode on exit. Generic to any + // OTel-based plugin, not laminar-specific. const provider = trace.getTracerProvider() as { forceFlush?: () => Promise } if (provider.forceFlush) { await Promise.race([ diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index d15b9433c..ab548948a 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -32,6 +32,12 @@ import { RuntimeFlags } from "@/effect/runtime-flags" const log = Log.create({ service: "plugin" }) +// Synchronous shutdown hooks invoked from src/index.ts's top-level finally +// before forceFlush. Plugins register here when loaded; runs once per +// process before process.exit(). Module-level intentionally — needs to be +// reachable outside the Effect runtime. +export const pluginShutdownHooks = new Set<() => void>() + type State = { hooks: Hooks[] } @@ -244,38 +250,28 @@ export const layer = Layer.effect( }).pipe(Effect.ignore) } - // Subscribe to bus events, fiber interrupted when scope closes. - // session.idle and server.instance.disposed are plugins' only chance to - // drain async work (e.g. OTel span exporters) before src/index.ts's - // top-level finally runs forceFlush and calls process.exit() — await - // those handlers; keep the rest fire-and-forget for throughput. + // Subscribe to bus events, fiber interrupted when scope closes yield* bus.subscribeAll().pipe( Stream.runForEach((input) => - Effect.promise(async () => { - const awaitHook = input.type === "server.instance.disposed" || input.type === "session.idle" + Effect.sync(() => { for (const hook of hooks) { - try { - const ret = hook["event"]?.({ event: input as any }) - if (awaitHook && ret) { - await ret - } else if (ret) { - // Fire-and-forget path: surface async failures to logs instead of letting them - // become unhandledRejections that hide which plugin/event broke. - void Promise.resolve(ret).catch((err) => - log.error("plugin event hook failed", { error: err }), - ) - } - } catch (err) { - // Catches sync throws + awaited async rejections so one bad plugin can't kill - // the subscription fiber and silently disable every other plugin. - log.error("plugin event hook failed", { error: err }) - } + void hook["event"]?.({ event: input as any }) } }), ), Effect.forkScoped, ) + // Register synchronous shutdown hooks for the top-level finally in + // src/index.ts. Runs before forceFlush so plugins can end any open + // OTel spans (e.g. bcode-laminar's turn span) — the bus-based + // session.idle / server.instance.disposed paths race with scope + // teardown and don't reliably deliver, so plugins need a direct sync + // entry point. + for (const hook of hooks) { + if (hook.shutdown) pluginShutdownHooks.add(hook.shutdown) + } + return { hooks } }), ) diff --git a/packages/plugin/src/index.ts b/packages/plugin/src/index.ts index 6156477be..576427ac9 100644 --- a/packages/plugin/src/index.ts +++ b/packages/plugin/src/index.ts @@ -222,6 +222,15 @@ export type AuthOuathResult = AuthOAuthResult export interface Hooks { event?: (input: { event: Event }) => Promise config?: (input: Config) => Promise + /** + * Synchronous shutdown hook invoked once per process before + * `process.exit()`, after the event loop has finished its last task and + * before the host's OTel span exporter drain. Use this to end any + * still-open OTel spans your plugin created — async work is not honored + * here, but ending a span (`span.end()`) is synchronous and the host's + * `forceFlush` runs right after this hook. + */ + shutdown?: () => void tool?: { [key: string]: ToolDefinition } From 495114440be3b14b71b1752fa7741c423d2045a9 Mon Sep 17 00:00:00 2001 From: bcode Date: Sat, 16 May 2026 21:52:11 -0700 Subject: [PATCH 2/2] fix(plugin): address cubic review on PR #75 Three fixes: 1. (src/index.ts) Wrap the dynamic 'await import(./plugin)' in the top-level finally with try/catch so a module-load failure cannot strand the process before forceFlush + process.exit(). 2. (plugin/index.ts) Re-add per-hook error isolation on the bus event dispatch loop. Reverting PR #74's await also accidentally removed this. Catches sync throws and observes async rejections via .catch(log.error) so one bad plugin can't terminate the subscription fiber for the rest of the process. 3. (plugin/index.ts) Deregister this layer's shutdown hooks from the module-level Set via Effect.addFinalizer so multi-instance TUI mode doesn't accumulate stale closures across project reopens. --- packages/opencode/src/index.ts | 18 +++++++++++------ packages/opencode/src/plugin/index.ts | 29 +++++++++++++++++++++++---- 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/packages/opencode/src/index.ts b/packages/opencode/src/index.ts index 12c29fc93..c487778ef 100644 --- a/packages/opencode/src/index.ts +++ b/packages/opencode/src/index.ts @@ -255,13 +255,19 @@ try { // events race with Effect scope teardown and don't reliably reach plugin // subscribers in headless `bcode run` mode, so we expose a direct sync hook // (see packages/opencode/src/plugin/index.ts pluginShutdownHooks). - const { pluginShutdownHooks } = await import("./plugin") - for (const hook of pluginShutdownHooks) { - try { - hook() - } catch (err) { - Log.Default.error("plugin shutdown hook failed", { error: err }) + // The import is wrapped so a module-load failure can't strand the process + // before forceFlush + process.exit() below. + try { + const { pluginShutdownHooks } = await import("./plugin") + for (const hook of pluginShutdownHooks) { + try { + hook() + } catch (err) { + Log.Default.error("plugin shutdown hook failed", { error: err }) + } } + } catch (err) { + Log.Default.error("plugin shutdown import failed", { error: err }) } // Drain any registered OTel span processors (e.g. bcode-laminar) before // exiting so the just-ended turn spans actually hit the wire. Bounded with diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index ab548948a..004627806 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -250,12 +250,24 @@ export const layer = Layer.effect( }).pipe(Effect.ignore) } - // Subscribe to bus events, fiber interrupted when scope closes + // Subscribe to bus events, fiber interrupted when scope closes. + // Isolate per-hook failures (sync throw or async rejection) so one bad + // plugin can't kill the subscription fiber and silently disable every + // other plugin's event handler for the rest of the process. yield* bus.subscribeAll().pipe( Stream.runForEach((input) => Effect.sync(() => { for (const hook of hooks) { - void hook["event"]?.({ event: input as any }) + try { + const ret = hook["event"]?.({ event: input as any }) + if (ret) { + void Promise.resolve(ret).catch((err) => + log.error("plugin event hook failed", { error: err }), + ) + } + } catch (err) { + log.error("plugin event hook failed", { error: err }) + } } }), ), @@ -267,10 +279,19 @@ export const layer = Layer.effect( // OTel spans (e.g. bcode-laminar's turn span) — the bus-based // session.idle / server.instance.disposed paths race with scope // teardown and don't reliably deliver, so plugins need a direct sync - // entry point. + // entry point. Deregister on instance disposal so multi-instance TUI + // mode doesn't accumulate stale closures across reopens. + const registered: Array<() => void> = [] for (const hook of hooks) { - if (hook.shutdown) pluginShutdownHooks.add(hook.shutdown) + if (!hook.shutdown) continue + pluginShutdownHooks.add(hook.shutdown) + registered.push(hook.shutdown) } + yield* Effect.addFinalizer(() => + Effect.sync(() => { + for (const fn of registered) pluginShutdownHooks.delete(fn) + }), + ) return { hooks } }),