diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e21ef3..c679e77 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## [4.34.6] + +- Add `keepAlive()` method to `StreamingTranscriber` — sends a `KeepAlive` message to reset the server's inactivity timer when `inactivityTimeout` is configured + ## [4.33.0] - Add streaming parameters to match the Python SDK: diff --git a/package.json b/package.json index a706d87..1965353 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "assemblyai", - "version": "4.34.6", + "version": "4.35.0", "description": "The AssemblyAI JavaScript SDK provides an easy-to-use interface for interacting with the AssemblyAI API, which supports async and real-time transcription, as well as the latest LeMUR models.", "engines": { "node": ">=18" diff --git a/src/services/streaming/service.ts b/src/services/streaming/service.ts index 1bb5537..2fa74c3 100644 --- a/src/services/streaming/service.ts +++ b/src/services/streaming/service.ts @@ -18,12 +18,17 @@ import { SpeakerRevisionEvent, StreamingUpdateConfiguration, StreamingForceEndpoint, + StreamingKeepAlive, WarningEvent, } from "../.."; import type { VadDetector, VadFrame } from "../../types/streaming/dual-channel"; import { EnergyVad } from "./energy-vad"; import { attributeTurn, rollUpTurnChannel, VadTimeline } from "./label-mapper"; -import { StreamingError, StreamingErrorMessages } from "../../utils/errors"; +import { + StreamingError, + StreamingErrorMessages, + StreamingErrorType, +} from "../../utils/errors"; import { StreamingErrorTypeCodes } from "../../utils/errors/streaming"; /** @@ -58,6 +63,30 @@ function toInt16View(audio: AudioData): Int16Array { const defaultStreamingUrl = "wss://streaming.assemblyai.com/v3/ws"; const terminateSessionMessage = `{"type":"Terminate"}`; +const DEFAULT_CONNECT_TIMEOUT_MS = 1000; +const DEFAULT_MAX_CONNECTION_RETRIES = 2; +const DEFAULT_CONNECTION_RETRY_DELAY_MS = 500; + +/** + * Close/error codes that signal a permanent client-side problem (auth, + * billing, malformed config). A retry would hit the same failure, so the + * connection is never retried on these. + */ +const NON_RETRYABLE_CLOSE_CODES = new Set([ + StreamingErrorType.BadSampleRate, + StreamingErrorType.AuthFailed, + StreamingErrorType.InsufficientFunds, + StreamingErrorType.FreeTierUser, + StreamingErrorType.BadSchema, +]); + +/** Error from a single connection attempt, tagged for retry handling. */ +type ConnectionAttemptError = Error & { code?: number; retryable: boolean }; + +function isRetryableCloseCode(code: number): boolean { + return code !== 1000 && !NON_RETRYABLE_CLOSE_CODES.has(code); +} + /** * Per-send chunk cap in milliseconds for the dual-channel mixer. The streaming * server rejects audio messages longer than 1000 ms (`Input Duration Error`). @@ -432,13 +461,90 @@ export class StreamingTranscriber { this.listeners[event] = listener; } - connect() { - return new Promise((resolve) => { - if (this.socket) { - throw new Error("Already connected"); + /** + * Open the streaming session. + * + * Resolves with the server's `Begin` event once the handshake completes. A + * single attempt is bounded by `connectTimeout` (default 1000ms); transient + * failures (timeout, network drop, unexpected close) are retried up to + * `maxConnectionRetries` times (default 2), waiting `connectionRetryDelay` + * (default 500ms) between attempts. Permanent failures (auth, insufficient + * funds, malformed config) are not retried. + * + * Unlike previously, a failed connection now rejects this promise rather + * than only invoking the `error` listener — necessary for the caller (and + * the retry loop) to observe the failure. + */ + async connect(): Promise { + if (this.socket) { + throw new Error("Already connected"); + } + + const maxRetries = + this.params.maxConnectionRetries ?? DEFAULT_MAX_CONNECTION_RETRIES; + const retryDelay = + this.params.connectionRetryDelay ?? DEFAULT_CONNECTION_RETRY_DELAY_MS; + + let lastError: Error | undefined; + for (let attempt = 0; attempt <= maxRetries; attempt++) { + try { + return await this.connectOnce(); + } catch (err) { + lastError = err as Error; + const retryable = (err as ConnectionAttemptError).retryable === true; + if (!retryable || attempt === maxRetries) { + throw err; + } + console.warn( + `Streaming connect attempt ${attempt + 1}/${maxRetries + 1} failed (${(err as Error).message}); retrying`, + ); + if (retryDelay > 0) { + await new Promise((resolve) => setTimeout(resolve, retryDelay)); + } } + } + // The loop above always returns or throws; this only satisfies the type + // checker that a value is produced on every path. + throw lastError ?? new Error("Failed to connect to streaming server"); + } + private connectOnce(): Promise { + return new Promise((resolve, reject) => { const url = this.connectionUrl(); + const timeoutMs = + this.params.connectTimeout ?? DEFAULT_CONNECT_TIMEOUT_MS; + + // `settled` flips once this attempt has resolved (`Begin`) or rejected + // (timeout / pre-`Begin` close / error). Before it flips the socket + // handlers drive this promise; after it flips they revert to normal + // runtime dispatch (close / error / message listeners). + let settled = false; + let timer: ReturnType | undefined; + + const failAttempt = (error: ConnectionAttemptError) => { + if (settled) return; + settled = true; + if (timer) clearTimeout(timer); + this.discardPendingSocket(); + reject(error); + }; + + const succeed = (begin: BeginEvent) => { + if (settled) return; + settled = true; + if (timer) clearTimeout(timer); + resolve(begin); + }; + + if (timeoutMs > 0) { + timer = setTimeout(() => { + const err = new StreamingError( + `Streaming connection timed out after ${timeoutMs}ms`, + ) as ConnectionAttemptError; + err.retryable = true; + failAttempt(err); + }, timeoutMs); + } if (this.token) { this.socket = polyfillWebSocketFactory(url.toString()); @@ -465,6 +571,17 @@ Learn more at https://github.com/AssemblyAI/assemblyai-node-sdk/blob/main/docs/c reason = StreamingErrorMessages[code as StreamingErrorTypeCodes]; } } + // A close before `Begin` is a failed connection attempt — reject so + // connect() can retry (or surface a permanent failure). + if (!settled) { + const err = new StreamingError( + reason || `Streaming connection closed (code=${code})`, + ) as ConnectionAttemptError; + err.code = code; + err.retryable = isRetryableCloseCode(code); + failAttempt(err); + return; + } // Stop the flush timer when the socket is gone (server-initiated close, // network drop, etc.) — otherwise subsequent ticks call send() on a // closed socket and spam the error listener. @@ -476,18 +593,34 @@ Learn more at https://github.com/AssemblyAI/assemblyai-node-sdk/blob/main/docs/c }; this.socket.onerror = (event: ErrorEvent) => { - if (event.error) this.listeners.error?.(event.error as Error); - else this.listeners.error?.(new Error(event.message)); + const error = (event.error as Error) ?? new Error(event.message); + // A socket error before `Begin` is a failed attempt → reject/retry. + if (!settled) { + (error as ConnectionAttemptError).retryable = true; + failAttempt(error as ConnectionAttemptError); + return; + } + this.listeners.error?.(error); }; this.socket.onmessage = ({ data }: MessageEvent) => { const message = JSON.parse(data.toString()) as StreamingEventMessage; if ("error" in message) { - const err = new StreamingError(message.error); + const err = new StreamingError(message.error) as StreamingError & { + code?: number; + }; if ("error_code" in message) { - (err as StreamingError & { code?: number }).code = - message.error_code; + err.code = message.error_code; + } + // A server error frame before `Begin` fails the attempt; the code + // decides whether a retry is worthwhile. + if (!settled) { + const attemptErr = err as ConnectionAttemptError; + attemptErr.retryable = + err.code === undefined ? true : isRetryableCloseCode(err.code); + failAttempt(attemptErr); + return; } this.listeners.error?.(err); return; @@ -495,7 +628,7 @@ Learn more at https://github.com/AssemblyAI/assemblyai-node-sdk/blob/main/docs/c switch (message.type) { case "Begin": { - resolve(message); + succeed(message); this.listeners.open?.(message); break; } @@ -548,6 +681,18 @@ Learn more at https://github.com/AssemblyAI/assemblyai-node-sdk/blob/main/docs/c }); } + /** Tear down a half-open socket from a failed connection attempt. */ + private discardPendingSocket(): void { + if (!this.socket) return; + try { + if (this.socket.removeAllListeners) this.socket.removeAllListeners(); + this.socket.close(); + } catch { + // Best-effort cleanup; a half-open socket may throw on close. + } + this.socket = undefined; + } + /** * Returns a WritableStream that pumps PCM chunks into `sendAudio`. Single-channel * only — in dual-channel mode use `sendAudio(pcm, { channel })` directly, since @@ -829,6 +974,17 @@ Learn more at https://github.com/AssemblyAI/assemblyai-node-sdk/blob/main/docs/c this.send(JSON.stringify(message)); } + /** + * Reset the server's inactivity timer. Only needed when the session was + * created with `inactivityTimeout` and no audio is being sent. + */ + keepAlive() { + const message: StreamingKeepAlive = { + type: "KeepAlive", + }; + this.send(JSON.stringify(message)); + } + private send(data: BufferLike) { if (!this.socket || this.socket.readyState !== this.socket.OPEN) { throw new Error("Socket is not open for communication"); diff --git a/src/types/streaming/index.ts b/src/types/streaming/index.ts index fc2a5aa..5354e7b 100644 --- a/src/types/streaming/index.ts +++ b/src/types/streaming/index.ts @@ -76,6 +76,22 @@ export type StreamingTranscriberParams = { websocketBaseUrl?: string; apiKey?: string; token?: string; + /** + * Milliseconds to wait for the streaming handshake (socket open + server + * `Begin`) before treating the attempt as failed. Defaults to 1000. + */ + connectTimeout?: number; + /** + * Number of additional connection attempts after the first one fails on a + * transient error (timeout, network drop, unexpected close). 0 disables + * retries. Permanent failures (auth, insufficient funds, malformed config) + * are never retried. Defaults to 2. + */ + maxConnectionRetries?: number; + /** + * Milliseconds to wait between connection attempts. Defaults to 500. + */ + connectionRetryDelay?: number; sampleRate: number; encoding?: AudioEncoding; endOfTurnConfidenceThreshold?: number; @@ -349,6 +365,10 @@ export type StreamingForceEndpoint = { type: "ForceEndpoint"; }; +export type StreamingKeepAlive = { + type: "KeepAlive"; +}; + export type ErrorEvent = { type: "Error"; error_code?: number; @@ -405,4 +425,5 @@ export type StreamingEventMessage = export type StreamingOperationMessage = | StreamingUpdateConfiguration | StreamingForceEndpoint + | StreamingKeepAlive | StreamingTerminateSession; diff --git a/tests/unit/streaming.test.ts b/tests/unit/streaming.test.ts index 1ad0bf4..18cff66 100644 --- a/tests/unit/streaming.test.ts +++ b/tests/unit/streaming.test.ts @@ -59,6 +59,23 @@ describe("streaming", () => { WS.clean(); } + // Leave the shared `rt`/`server` in a connected state so the trailing + // afterEach `cleanup()` (which expects a live session) succeeds after a + // test that deliberately drove `rt` into a failed/closed connection. + async function reestablish() { + WS.clean(); + server = new WS(websocketBaseUrl); + rt = new StreamingTranscriber({ + websocketBaseUrl, + apiKey: "123", + sampleRate: 16_000, + speechModel: "universal-streaming-english", + }); + onOpen = jest.fn(); + rt.on("open", onOpen); + await connect(rt, server); + } + it("noop", async () => {}); it("should include speaker_labels in connection URL", async () => { @@ -214,6 +231,15 @@ describe("streaming", () => { ); }); + it("should send KeepAlive message on keepAlive()", async () => { + rt.keepAlive(); + await expect(server).toReceiveMessage( + JSON.stringify({ + type: "KeepAlive", + }), + ); + }); + it("should include agent_context in connection URL", async () => { await cleanup(); WS.clean(); @@ -486,4 +512,108 @@ describe("streaming", () => { expect(event.revisions[1].speaker_label).toBe("A"); expect(event.revisions[1].words).toEqual([]); }); + + it("rejects when the handshake times out (retries disabled)", async () => { + await cleanup(); + WS.clean(); + server = new WS(websocketBaseUrl); + + // Connect but never send `Begin`; the attempt should time out and reject. + const failing = new StreamingTranscriber({ + websocketBaseUrl, + token: "123", + sampleRate: 16_000, + speechModel: "universal-streaming-english", + connectTimeout: 30, + maxConnectionRetries: 0, + }); + await expect(failing.connect()).rejects.toThrow(/timed out/i); + + await reestablish(); + }); + + it("does not retry a permanent auth failure", async () => { + await cleanup(); + WS.clean(); + server = new WS(websocketBaseUrl); + + const failing = new StreamingTranscriber({ + websocketBaseUrl, + token: "123", + sampleRate: 16_000, + speechModel: "universal-streaming-english", + connectTimeout: 1000, + maxConnectionRetries: 5, + connectionRetryDelay: 0, + }); + const connectPromise = failing.connect(); + await server.connected; + // Server rejects the handshake with an auth close before `Begin`. + server.close({ + code: 4001, + reason: null as unknown as string, + wasClean: false, + }); + + // Rejects with the auth error (code 4001) rather than retrying until a + // timeout — proving the permanent-failure short-circuit. + await expect(connectPromise).rejects.toMatchObject({ code: 4001 }); + + await reestablish(); + }); + + it("retries a transient handshake failure and then succeeds", async () => { + await cleanup(); + WS.clean(); + server = new WS(websocketBaseUrl); + + // Drive the underlying mock-socket server per-connection: close the first + // connection with a transient code (no `Begin`), then accept the retry by + // sending `Begin`. Deterministic — no reliance on retry/connection timing. + let connectionCount = 0; + const mockServer = ( + server as unknown as { + server: { + on: ( + event: "connection", + cb: (socket: { + close: (options: { + code: number; + reason: string; + wasClean: boolean; + }) => void; + send: (data: string) => void; + }) => void, + ) => void; + }; + } + ).server; + mockServer.on("connection", (socket) => { + connectionCount++; + if (connectionCount === 1) { + socket.close({ code: 1011, reason: "transient", wasClean: false }); + } else { + socket.send(JSON.stringify(sessionBeginsMessage)); + } + }); + + rt = new StreamingTranscriber({ + websocketBaseUrl, + token: "123", + sampleRate: 16_000, + speechModel: "universal-streaming-english", + connectTimeout: 1000, + maxConnectionRetries: 2, + connectionRetryDelay: 0, + }); + onOpen = jest.fn(); + rt.on("open", onOpen); + + const begin = await rt.connect(); + + expect(begin.type).toBe("Begin"); + expect(connectionCount).toBeGreaterThanOrEqual(2); + expect(onOpen).toHaveBeenCalled(); + // Leaves rt/server connected for the shared afterEach cleanup. + }); });