Skip to content

Commit 9a3599c

Browse files
committed
feat: US-026 - Fix streaming stdin delivery from PTY to sandbox process
1 parent 4ccab1b commit 9a3599c

8 files changed

Lines changed: 364 additions & 25 deletions

File tree

native/v8-runtime/src/stream.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,22 @@
33
/// Dispatch a stream event into V8 by calling the registered callback function.
44
///
55
/// Stream events are sent by the host when async operations (child processes,
6-
/// HTTP servers) produce data. The event_type determines which V8 dispatch
7-
/// function is called:
8-
/// - "child_stdout", "child_stderr", "child_exit" → _childProcessDispatch
9-
/// - "http_request" → _httpServerDispatch
6+
/// HTTP servers, stdin) produce data. The event_type determines which V8
7+
/// dispatch function is called:
8+
/// - "child_stdout", "child_stderr", "child_exit", "childProcess" → _childProcessDispatch
9+
/// - "http_request", "httpServerRequest", "httpServerUpgrade", etc. → _httpServerDispatch
10+
/// - "stdin" → _stdinDispatch
11+
/// - "netSocket" → _netSocketDispatch
1012
pub fn dispatch_stream_event(scope: &mut v8::HandleScope, event_type: &str, payload: &[u8]) {
1113
// Look up the dispatch function on the global object
1214
let context = scope.get_current_context();
1315
let global = context.global(scope);
1416

1517
let dispatch_name = match event_type {
16-
"child_stdout" | "child_stderr" | "child_exit" => "_childProcessDispatch",
17-
"http_request" => "_httpServerDispatch",
18+
"child_stdout" | "child_stderr" | "child_exit" | "childProcess" => "_childProcessDispatch",
19+
"http_request" | "httpServerRequest" | "httpServerUpgrade" | "upgradeSocketData" | "upgradeSocketEnd" => "_httpServerDispatch",
20+
"stdin" => "_stdinDispatch",
21+
"netSocket" => "_netSocketDispatch",
1822
_ => return, // Unknown event type — ignore
1923
};
2024

packages/core/src/kernel/kernel.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,11 +316,28 @@ class KernelImpl implements Kernel {
316316
}
317317
})();
318318

319+
// Start stdin pump: master write → PTY input buffer → slave read → writeStdin
320+
// Bridges shell.write() data to the runtime driver's streaming stdin path
321+
const stdinPumpPromise = (async () => {
322+
try {
323+
while (!pump.exited) {
324+
const data = await this.ptyManager.read(slaveDescId, 4096);
325+
if (!data || data.length === 0) break;
326+
proc.writeStdin(data);
327+
}
328+
} catch {
329+
// PTY closed — expected when shell exits
330+
}
331+
// Signal stdin EOF to the runtime driver
332+
try { proc.closeStdin(); } catch { /* already closed */ }
333+
})();
334+
319335
// wait() resolves after both shell exit AND pump drain
320336
const waitPromise = proc.wait().then(async (exitCode) => {
321337
pump.exited = true;
322-
// Wait for pump to finish delivering remaining data
338+
// Wait for pumps to finish delivering remaining data
323339
await pumpPromise;
340+
await stdinPumpPromise;
324341
// Clean up controller PID's FD table (incl. PTY master)
325342
this.cleanupProcessFDs(controllerPid);
326343
return exitCode;

packages/nodejs/src/bridge-contract.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ export const HOST_BRIDGE_GLOBAL_KEYS = {
7878
resolveModuleSync: "_resolveModuleSync",
7979
loadFileSync: "_loadFileSync",
8080
ptySetRawMode: "_ptySetRawMode",
81+
stdinRead: "_stdinRead",
8182
processConfig: "_processConfig",
8283
osConfig: "_osConfig",
8384
log: "_log",

packages/nodejs/src/bridge-handlers.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1802,6 +1802,10 @@ export function resolveHttpServerResponse(serverId: number, responseJson: string
18021802
export interface PtyBridgeDeps {
18031803
onPtySetRawMode?: (mode: boolean) => void;
18041804
stdinIsTTY?: boolean;
1805+
/** Set by _stdinRead handler — call to deliver data to the pending read */
1806+
onStdinData?: (data: string) => void;
1807+
/** Set by _stdinRead handler — call to signal stdin EOF */
1808+
onStdinEnd?: () => void;
18051809
}
18061810

18071811
/** Build PTY bridge handlers. */
@@ -1815,6 +1819,41 @@ export function buildPtyBridgeHandlers(deps: PtyBridgeDeps): BridgeHandlers {
18151819
};
18161820
}
18171821

1822+
// Async bridge handler for streaming stdin reads
1823+
if (deps.stdinIsTTY) {
1824+
const stdinQueue: (string | null)[] = [];
1825+
let stdinReadResolve: ((data: string | null) => void) | null = null;
1826+
1827+
handlers[K.stdinRead] = (): Promise<string | null> => {
1828+
if (stdinQueue.length > 0) {
1829+
return Promise.resolve(stdinQueue.shift()!);
1830+
}
1831+
return new Promise<string | null>((resolve) => {
1832+
stdinReadResolve = resolve;
1833+
});
1834+
};
1835+
1836+
deps.onStdinData = (data: string) => {
1837+
if (stdinReadResolve) {
1838+
const resolve = stdinReadResolve;
1839+
stdinReadResolve = null;
1840+
resolve(data);
1841+
} else {
1842+
stdinQueue.push(data);
1843+
}
1844+
};
1845+
1846+
deps.onStdinEnd = () => {
1847+
if (stdinReadResolve) {
1848+
const resolve = stdinReadResolve;
1849+
stdinReadResolve = null;
1850+
resolve(null);
1851+
} else {
1852+
stdinQueue.push(null);
1853+
}
1854+
};
1855+
}
1856+
18181857
return handlers;
18191858
}
18201859

packages/nodejs/src/bridge/process.ts

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { URL as WhatwgURL, URLSearchParams as WhatwgURLSearchParams } from "what
1212
// Use buffer package for spec-compliant Buffer implementation
1313
import { Buffer as BufferPolyfill } from "buffer";
1414
import type {
15+
BridgeApplyRef,
1516
CryptoRandomFillBridgeRef,
1617
CryptoRandomUuidBridgeRef,
1718
FsFacadeBridge,
@@ -56,6 +57,8 @@ declare const _log: ProcessLogBridgeRef;
5657
declare const _error: ProcessErrorBridgeRef;
5758
// Timer reference for actual delays using host's event loop
5859
declare const _scheduleTimer: ScheduleTimerBridgeRef | undefined;
60+
// Stdin streaming read — async bridge handler returning next chunk (null = EOF)
61+
declare const _stdinRead: BridgeApplyRef<[], string | null> | undefined;
5962
declare const _cryptoRandomFill: CryptoRandomFillBridgeRef | undefined;
6063
declare const _cryptoRandomUUID: CryptoRandomUuidBridgeRef | undefined;
6164
// Filesystem bridge for chdir validation
@@ -361,6 +364,9 @@ const _stderr: StdioWriteStream = {
361364
rows: 24,
362365
};
363366

367+
// Flag to prevent duplicate stdin read loops
368+
let _stdinKeepaliveActive = false;
369+
364370
// Stdin stream with data support
365371
// These are exposed as globals so they can be set after bridge initialization
366372
type StdinListener = (data?: unknown) => void;
@@ -418,6 +424,47 @@ function _emitStdinData(): void {
418424
}
419425
}
420426

427+
/**
428+
* Global dispatch handler for streaming stdin events from the host.
429+
* Called by the V8 sidecar when it receives a "stdin" stream event.
430+
* Pushes data into the stdin stream in real-time for PTY-backed processes.
431+
*/
432+
const stdinDispatch = (
433+
_eventType: string,
434+
payload: string | null,
435+
): void => {
436+
if (payload === null || payload === undefined) {
437+
// stdin end signal
438+
if (!getStdinEnded()) {
439+
setStdinEnded(true);
440+
const endListeners = [...(_stdinListeners["end"] || []), ...(_stdinOnceListeners["end"] || [])];
441+
_stdinOnceListeners["end"] = [];
442+
for (const listener of endListeners) {
443+
listener();
444+
}
445+
const closeListeners = [...(_stdinListeners["close"] || []), ...(_stdinOnceListeners["close"] || [])];
446+
_stdinOnceListeners["close"] = [];
447+
for (const listener of closeListeners) {
448+
listener();
449+
}
450+
}
451+
return;
452+
}
453+
454+
// Streaming data chunk — emit 'data' event if listeners are registered
455+
const dataListeners = [...(_stdinListeners["data"] || []), ...(_stdinOnceListeners["data"] || [])];
456+
_stdinOnceListeners["data"] = [];
457+
if (dataListeners.length > 0) {
458+
for (const listener of dataListeners) {
459+
listener(payload);
460+
}
461+
} else {
462+
// Buffer if no listeners yet — append to _stdinData for later read()
463+
setStdinDataValue(getStdinData() + payload);
464+
}
465+
};
466+
exposeCustomGlobal("_stdinDispatch", stdinDispatch);
467+
421468
// Stdin stream shape
422469
interface StdinStream {
423470
readable: boolean;
@@ -500,6 +547,26 @@ const _stdin: StdinStream = {
500547
this.paused = false;
501548
setStdinFlowMode(true);
502549
_emitStdinData();
550+
// Start streaming stdin read loop via _stdinRead bridge handler
551+
if (_getStdinIsTTY() && !_stdinKeepaliveActive && typeof _stdinRead !== "undefined") {
552+
_stdinKeepaliveActive = true;
553+
(async function readLoop() {
554+
try {
555+
while (true) {
556+
const chunk = await _stdinRead!.apply(undefined, [], { result: { promise: true } });
557+
if (chunk === null || chunk === undefined) {
558+
// EOF — dispatch end signal
559+
stdinDispatch("stdin", null);
560+
break;
561+
}
562+
stdinDispatch("stdin", chunk);
563+
}
564+
} catch {
565+
// Bridge error — session closing
566+
}
567+
_stdinKeepaliveActive = false;
568+
})();
569+
}
503570
return this;
504571
},
505572

packages/nodejs/src/execution-driver.ts

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ import {
5656
buildUpgradeSocketBridgeHandlers,
5757
buildModuleResolutionBridgeHandlers,
5858
buildPtyBridgeHandlers,
59+
type PtyBridgeDeps,
5960
createProcessConfigForExecution,
6061
resolveHttpServerResponse,
6162
} from "./bridge-handlers.js";
@@ -297,6 +298,10 @@ export class NodeExecutionDriver implements RuntimeDriver {
297298
private flattenedBindings: FlattenedBinding[] | null = null;
298299
// Unwrapped filesystem for path translation (toHostPath/toSandboxPath)
299300
private rawFilesystem: VirtualFileSystem | undefined;
301+
/** Callback invoked when V8 session is ready — used for streaming stdin */
302+
onStreamReady?: (sendStreamEvent: (eventType: string, payload: Uint8Array) => void) => void;
303+
/** Callback invoked when PTY stdin bridge handler is ready — delivers data to pending _stdinRead */
304+
onStdinReady?: (deliver: (data: string) => void, end: () => void) => void;
300305

301306
constructor(options: NodeExecutionDriverOptions) {
302307
this.memoryLimit = options.memoryLimit ?? 128;
@@ -446,6 +451,14 @@ export class NodeExecutionDriver implements RuntimeDriver {
446451
}
447452
};
448453

454+
// Notify kernel-runtime that streaming is ready (for PTY stdin)
455+
this.onStreamReady?.(sendStreamEvent);
456+
457+
// Build PTY bridge handlers ONCE — shared between dispatch and main handlers
458+
const ptyDeps: PtyBridgeDeps = { onPtySetRawMode: s.onPtySetRawMode, stdinIsTTY: s.processConfig.stdinIsTTY };
459+
const ptyHandlers = buildPtyBridgeHandlers(ptyDeps);
460+
if (ptyDeps.onStdinData) this.onStdinReady?.(ptyDeps.onStdinData, ptyDeps.onStdinEnd!);
461+
449462
const netSocketResult = buildNetworkSocketBridgeHandlers({
450463
dispatch: (socketId, event, data) => {
451464
const payload = JSON.stringify({ socketId, event, data });
@@ -488,10 +501,7 @@ export class NodeExecutionDriver implements RuntimeDriver {
488501
return typeof fs.toSandboxPath === "function" ? fs.toSandboxPath(p) : p;
489502
},
490503
}),
491-
...buildPtyBridgeHandlers({
492-
onPtySetRawMode: s.onPtySetRawMode,
493-
stdinIsTTY: s.processConfig.stdinIsTTY,
494-
}),
504+
...ptyHandlers,
495505
// Custom bindings dispatched through _loadPolyfill
496506
...(this.flattenedBindings ? Object.fromEntries(
497507
this.flattenedBindings.map(b => [b.key, b.handler])
@@ -543,10 +553,7 @@ export class NodeExecutionDriver implements RuntimeDriver {
543553
return typeof rfs?.toSandboxPath === "function" ? rfs.toSandboxPath(p) : p;
544554
},
545555
}),
546-
...buildPtyBridgeHandlers({
547-
onPtySetRawMode: s.onPtySetRawMode,
548-
stdinIsTTY: s.processConfig.stdinIsTTY,
549-
}),
556+
...ptyHandlers,
550557
};
551558

552559
// Merge custom bindings into bridge handlers

packages/nodejs/src/kernel-runtime.ts

Lines changed: 55 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -353,28 +353,54 @@ class NodeRuntimeDriver implements RuntimeDriver {
353353
};
354354
});
355355

356-
// Stdin buffering — writeStdin collects data, closeStdin resolves the promise
356+
// Stdin plumbing — two modes:
357+
// 1. Batched (non-PTY): collect chunks, closeStdin concatenates and resolves promise
358+
// 2. Streaming (PTY): deliver each writeStdin chunk via _stdinRead bridge handler
359+
const isPty = ctx.stdinIsTTY ?? false;
357360
const stdinChunks: Uint8Array[] = [];
358361
let stdinResolve: ((data: string | undefined) => void) | null = null;
362+
// Callbacks set by _stdinRead bridge handler via onStdinReady
363+
let stdinDeliverFn: ((data: string) => void) | null = null;
364+
let stdinEndFn: (() => void) | null = null;
359365
const stdinPromise = new Promise<string | undefined>((resolve) => {
360366
stdinResolve = resolve;
361-
// Auto-resolve on next microtask if nobody calls writeStdin
362-
queueMicrotask(() => {
363-
if (stdinChunks.length === 0 && stdinResolve) {
364-
stdinResolve = null;
365-
resolve(undefined);
366-
}
367-
});
367+
if (isPty) {
368+
// PTY mode: resolve immediately with no initial stdin data
369+
stdinResolve = null;
370+
resolve(undefined);
371+
} else {
372+
// Non-PTY: auto-resolve on next microtask if nobody calls writeStdin
373+
queueMicrotask(() => {
374+
if (stdinChunks.length === 0 && stdinResolve) {
375+
stdinResolve = null;
376+
resolve(undefined);
377+
}
378+
});
379+
}
368380
});
369381

370382
const proc: DriverProcess = {
371383
onStdout: null,
372384
onStderr: null,
373385
onExit: null,
374386
writeStdin: (data: Uint8Array) => {
375-
stdinChunks.push(data);
387+
if (isPty && stdinDeliverFn) {
388+
// Streaming mode: deliver data to sandbox via _stdinRead bridge handler
389+
const text = new TextDecoder().decode(data);
390+
stdinDeliverFn(text);
391+
} else if (isPty) {
392+
// Bridge handler not ready yet — buffer for flush when handler connects
393+
stdinChunks.push(data);
394+
} else {
395+
// Non-PTY batched mode
396+
stdinChunks.push(data);
397+
}
376398
},
377399
closeStdin: () => {
400+
if (isPty && stdinEndFn) {
401+
stdinEndFn();
402+
return;
403+
}
378404
if (stdinResolve) {
379405
if (stdinChunks.length === 0) {
380406
// No data written — pass undefined (no stdin), not empty string
@@ -400,8 +426,20 @@ class NodeRuntimeDriver implements RuntimeDriver {
400426
wait: () => exitPromise,
401427
};
402428

429+
// Callback to wire up streaming stdin once _stdinRead bridge handler is ready
430+
const setStdinBridge = (deliver: (data: string) => void, end: () => void) => {
431+
stdinDeliverFn = deliver;
432+
stdinEndFn = end;
433+
// Flush any data that arrived before the bridge handler was ready
434+
for (const chunk of stdinChunks) {
435+
const text = new TextDecoder().decode(chunk);
436+
deliver(text);
437+
}
438+
stdinChunks.length = 0;
439+
};
440+
403441
// Launch async — spawn() returns synchronously per RuntimeDriver contract
404-
this._executeAsync(command, args, ctx, proc, resolveExit, stdinPromise);
442+
this._executeAsync(command, args, ctx, proc, resolveExit, stdinPromise, isPty ? setStdinBridge : undefined);
405443

406444
return proc;
407445
}
@@ -425,6 +463,7 @@ class NodeRuntimeDriver implements RuntimeDriver {
425463
proc: DriverProcess,
426464
resolveExit: (code: number) => void,
427465
stdinPromise: Promise<string | undefined>,
466+
setStdinBridge?: (deliver: (data: string) => void, end: () => void) => void,
428467
): Promise<void> {
429468
const kernel = this._kernel!;
430469

@@ -483,6 +522,12 @@ class NodeRuntimeDriver implements RuntimeDriver {
483522
bindings: this._bindings,
484523
onPtySetRawMode,
485524
});
525+
526+
// Wire streaming stdin for PTY processes via _stdinRead bridge handler
527+
if (setStdinBridge) {
528+
executionDriver.onStdinReady = setStdinBridge;
529+
}
530+
486531
this._activeDrivers.set(ctx.pid, executionDriver);
487532

488533
// Detect ESM files and use V8 native module system

0 commit comments

Comments
 (0)