Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## [4.34.6]

- Add `keepAlive()` method to `StreamingTranscriber` — sends a `KeepAlive` message to reset the server's inactivity timer when `inactivityTimeout` is configured

## [4.33.0]

- Add streaming parameters to match the Python SDK:
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "assemblyai",
"version": "4.34.6",
"version": "4.35.0",
"description": "The AssemblyAI JavaScript SDK provides an easy-to-use interface for interacting with the AssemblyAI API, which supports async and real-time transcription, as well as the latest LeMUR models.",
"engines": {
"node": ">=18"
Expand Down
178 changes: 167 additions & 11 deletions src/services/streaming/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ import {
SpeakerRevisionEvent,
StreamingUpdateConfiguration,
StreamingForceEndpoint,
StreamingKeepAlive,
WarningEvent,
} from "../..";
import type { VadDetector, VadFrame } from "../../types/streaming/dual-channel";
import { EnergyVad } from "./energy-vad";
import { attributeTurn, rollUpTurnChannel, VadTimeline } from "./label-mapper";
import { StreamingError, StreamingErrorMessages } from "../../utils/errors";
import {
StreamingError,
StreamingErrorMessages,
StreamingErrorType,
} from "../../utils/errors";
import { StreamingErrorTypeCodes } from "../../utils/errors/streaming";

/**
Expand Down Expand Up @@ -58,6 +63,30 @@ function toInt16View(audio: AudioData): Int16Array {
const defaultStreamingUrl = "wss://streaming.assemblyai.com/v3/ws";
const terminateSessionMessage = `{"type":"Terminate"}`;

/** Per-attempt handshake timeout in milliseconds, used when `connectTimeout` is not set. */
const DEFAULT_CONNECT_TIMEOUT_MS = 1000;
/** Number of extra attempts after the first one, used when `maxConnectionRetries` is not set. */
const DEFAULT_MAX_CONNECTION_RETRIES = 2;
/** Pause in milliseconds between attempts, used when `connectionRetryDelay` is not set. */
const DEFAULT_CONNECTION_RETRY_DELAY_MS = 500;

/**
 * Close/error codes that signal a permanent client-side problem (auth,
 * billing, malformed config). A retry would hit the same failure, so the
 * connection is never retried on these.
 *
 * Consulted by `isRetryableCloseCode`, which is applied both to the code of a
 * socket close and to the `error_code` of a server error frame received
 * before `Begin`.
 */
const NON_RETRYABLE_CLOSE_CODES = new Set<number>([
StreamingErrorType.BadSampleRate,
StreamingErrorType.AuthFailed,
StreamingErrorType.InsufficientFunds,
StreamingErrorType.FreeTierUser,
StreamingErrorType.BadSchema,
]);

/**
 * Error from a single connection attempt, tagged for retry handling.
 *
 * `code` holds the close code or server `error_code` when one is known.
 * `retryable` is what `connect()` inspects to decide whether another attempt
 * may be made.
 */
type ConnectionAttemptError = Error & { code?: number; retryable: boolean };

/**
 * Decide whether a connection attempt that ended with `code` may be retried.
 *
 * Code 1000 and every code listed in `NON_RETRYABLE_CLOSE_CODES` are treated
 * as final; any other code is considered transient.
 */
function isRetryableCloseCode(code: number): boolean {
  if (code === 1000) {
    return false;
  }
  const isPermanentFailure = NON_RETRYABLE_CLOSE_CODES.has(code);
  return !isPermanentFailure;
}

/**
* Per-send chunk cap in milliseconds for the dual-channel mixer. The streaming
* server rejects audio messages longer than 1000 ms (`Input Duration Error`).
Expand Down Expand Up @@ -432,13 +461,90 @@ export class StreamingTranscriber {
this.listeners[event] = listener;
}

connect() {
return new Promise<BeginEvent>((resolve) => {
if (this.socket) {
throw new Error("Already connected");
/**
 * Open the streaming session.
 *
 * Resolves with the server's `Begin` event once the handshake completes. A
 * single attempt is bounded by `connectTimeout` (default 1000ms); transient
 * failures (timeout, network drop, unexpected close) are retried up to
 * `maxConnectionRetries` times (default 2), waiting `connectionRetryDelay`
 * (default 500ms) between attempts. Permanent failures (auth, insufficient
 * funds, malformed config) are not retried.
 *
 * `maxConnectionRetries` is normalised to a non-negative whole number, so a
 * negative, fractional or `NaN` value can neither skip the first attempt nor
 * trigger a pointless extra delay after the last one.
 *
 * Unlike previously, a failed connection now rejects this promise rather
 * than only invoking the `error` listener — necessary for the caller (and
 * the retry loop) to observe the failure.
 *
 * @returns The `Begin` event of the established session.
 * @throws Error `"Already connected"` when a socket is already present.
 * @throws The error of the failing attempt when it is not marked retryable
 * or when the retry budget is exhausted.
 */
async connect(): Promise<BeginEvent> {
  if (this.socket) {
    throw new Error("Already connected");
  }

  const requestedRetries =
    this.params.maxConnectionRetries ?? DEFAULT_MAX_CONNECTION_RETRIES;
  // `Math.floor(NaN) || 0` is 0 and `Math.max` clamps negatives, so the loop
  // below always performs at least the initial attempt.
  const maxRetries = Math.max(0, Math.floor(requestedRetries) || 0);
  const retryDelay =
    this.params.connectionRetryDelay ?? DEFAULT_CONNECTION_RETRY_DELAY_MS;

  let lastError: unknown;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await this.connectOnce();
    } catch (err) {
      lastError = err;
      // Only errors explicitly tagged `retryable: true` by the attempt are
      // retried; anything else (including untagged errors) is surfaced as-is.
      const retryable =
        (err as Partial<ConnectionAttemptError> | null | undefined)
          ?.retryable === true;
      if (!retryable || attempt === maxRetries) {
        throw err;
      }
      const message = err instanceof Error ? err.message : String(err);
      console.warn(
        `Streaming connect attempt ${attempt + 1}/${maxRetries + 1} failed (${message}); retrying`,
      );
      if (retryDelay > 0) {
        await new Promise((resolve) => setTimeout(resolve, retryDelay));
      }
    }
  }
  // The loop above always returns or throws; this only satisfies the type
  // checker that a value is produced on every path.
  throw lastError ?? new Error("Failed to connect to streaming server");
}

private connectOnce(): Promise<BeginEvent> {
return new Promise<BeginEvent>((resolve, reject) => {
const url = this.connectionUrl();
const timeoutMs =
this.params.connectTimeout ?? DEFAULT_CONNECT_TIMEOUT_MS;

// `settled` flips once this attempt has resolved (`Begin`) or rejected
// (timeout / pre-`Begin` close / error). Before it flips the socket
// handlers drive this promise; after it flips they revert to normal
// runtime dispatch (close / error / message listeners).
let settled = false;
let timer: ReturnType<typeof setTimeout> | undefined;

const failAttempt = (error: ConnectionAttemptError) => {
if (settled) return;
settled = true;
if (timer) clearTimeout(timer);
this.discardPendingSocket();
reject(error);
};

const succeed = (begin: BeginEvent) => {
if (settled) return;
settled = true;
if (timer) clearTimeout(timer);
resolve(begin);
};

if (timeoutMs > 0) {
timer = setTimeout(() => {
const err = new StreamingError(
`Streaming connection timed out after ${timeoutMs}ms`,
) as ConnectionAttemptError;
err.retryable = true;
failAttempt(err);
}, timeoutMs);
}

if (this.token) {
this.socket = polyfillWebSocketFactory(url.toString());
Expand All @@ -465,6 +571,17 @@ Learn more at https://github.com/AssemblyAI/assemblyai-node-sdk/blob/main/docs/c
reason = StreamingErrorMessages[code as StreamingErrorTypeCodes];
}
}
// A close before `Begin` is a failed connection attempt — reject so
// connect() can retry (or surface a permanent failure).
if (!settled) {
const err = new StreamingError(
reason || `Streaming connection closed (code=${code})`,
) as ConnectionAttemptError;
err.code = code;
err.retryable = isRetryableCloseCode(code);
failAttempt(err);
return;
}
// Stop the flush timer when the socket is gone (server-initiated close,
// network drop, etc.) — otherwise subsequent ticks call send() on a
// closed socket and spam the error listener.
Expand All @@ -476,26 +593,42 @@ Learn more at https://github.com/AssemblyAI/assemblyai-node-sdk/blob/main/docs/c
};

this.socket.onerror = (event: ErrorEvent) => {
if (event.error) this.listeners.error?.(event.error as Error);
else this.listeners.error?.(new Error(event.message));
const error = (event.error as Error) ?? new Error(event.message);
// A socket error before `Begin` is a failed attempt → reject/retry.
if (!settled) {
(error as ConnectionAttemptError).retryable = true;
failAttempt(error as ConnectionAttemptError);
return;
}
this.listeners.error?.(error);
};

this.socket.onmessage = ({ data }: MessageEvent) => {
const message = JSON.parse(data.toString()) as StreamingEventMessage;

if ("error" in message) {
const err = new StreamingError(message.error);
const err = new StreamingError(message.error) as StreamingError & {
code?: number;
};
if ("error_code" in message) {
(err as StreamingError & { code?: number }).code =
message.error_code;
err.code = message.error_code;
}
// A server error frame before `Begin` fails the attempt; the code
// decides whether a retry is worthwhile.
if (!settled) {
const attemptErr = err as ConnectionAttemptError;
attemptErr.retryable =
err.code === undefined ? true : isRetryableCloseCode(err.code);
failAttempt(attemptErr);
return;
}
this.listeners.error?.(err);
return;
}

switch (message.type) {
case "Begin": {
resolve(message);
succeed(message);
this.listeners.open?.(message);
break;
}
Expand Down Expand Up @@ -548,6 +681,18 @@ Learn more at https://github.com/AssemblyAI/assemblyai-node-sdk/blob/main/docs/c
});
}

/**
 * Tear down a half-open socket from a failed connection attempt.
 *
 * The socket is detached from the instance first, then silenced, then
 * closed. Silencing matters because the attempt is already settled when this
 * runs: any close/error/message event the discarded socket still emits would
 * otherwise be routed to the runtime listeners, possibly while a retry has
 * already installed a new socket.
 */
private discardPendingSocket(): void {
  const pending = this.socket;
  if (!pending) return;
  // Detach up front so the instance never keeps a reference to a dead socket,
  // even if the cleanup below throws.
  this.socket = undefined;
  try {
    if (pending.removeAllListeners) pending.removeAllListeners();
    // Install inert handlers instead of leaving the socket handler-less.
    // NOTE(review): Node's `ws` reports closing a still-connecting socket via
    // an asynchronous `error` event, which is fatal when nothing listens for
    // it — confirm the polyfill wraps `ws`. Sockets without
    // `removeAllListeners` would otherwise keep the handlers of the failed
    // attempt.
    const ignore = (): void => {};
    pending.onmessage = ignore;
    pending.onclose = ignore;
    pending.onerror = ignore;
    pending.close();
  } catch {
    // Best-effort cleanup; a half-open socket may throw on close.
  }
}

/**
* Returns a WritableStream that pumps PCM chunks into `sendAudio`. Single-channel
* only — in dual-channel mode use `sendAudio(pcm, { channel })` directly, since
Expand Down Expand Up @@ -829,6 +974,17 @@ Learn more at https://github.com/AssemblyAI/assemblyai-node-sdk/blob/main/docs/c
this.send(JSON.stringify(message));
}

/**
 * Send a `KeepAlive` frame so the server restarts its inactivity timer.
 * Useful only for sessions created with `inactivityTimeout` during periods
 * in which no audio is being sent.
 */
keepAlive() {
  const keepAliveFrame: StreamingKeepAlive = { type: "KeepAlive" };
  const payload = JSON.stringify(keepAliveFrame);
  this.send(payload);
}

private send(data: BufferLike) {
if (!this.socket || this.socket.readyState !== this.socket.OPEN) {
throw new Error("Socket is not open for communication");
Expand Down
21 changes: 21 additions & 0 deletions src/types/streaming/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,22 @@ export type StreamingTranscriberParams = {
websocketBaseUrl?: string;
apiKey?: string;
token?: string;
/**
* Milliseconds to wait for the streaming handshake (socket open + server
* `Begin`) before treating the attempt as failed. Defaults to 1000.
*/
connectTimeout?: number;
/**
* Number of additional connection attempts after the first one fails on a
* transient error (timeout, network drop, unexpected close). 0 disables
* retries. Permanent failures (auth, insufficient funds, malformed config)
* are never retried. Defaults to 2.
*/
maxConnectionRetries?: number;
/**
* Milliseconds to wait between connection attempts. Defaults to 500.
*/
connectionRetryDelay?: number;
sampleRate: number;
encoding?: AudioEncoding;
endOfTurnConfidenceThreshold?: number;
Expand Down Expand Up @@ -349,6 +365,10 @@ export type StreamingForceEndpoint = {
type: "ForceEndpoint";
};

/**
 * Message sent by `StreamingTranscriber.keepAlive()` to reset the server's
 * inactivity timer when the session was created with `inactivityTimeout`.
 */
export type StreamingKeepAlive = {
type: "KeepAlive";
};

export type ErrorEvent = {
type: "Error";
error_code?: number;
Expand Down Expand Up @@ -405,4 +425,5 @@ export type StreamingEventMessage =
/**
 * Union of the operation message shapes: configuration update, forced
 * endpoint, keep-alive and session termination.
 * NOTE(review): presumably these are the control frames a client sends to the
 * streaming server — confirm against the service implementation.
 */
export type StreamingOperationMessage =
| StreamingUpdateConfiguration
| StreamingForceEndpoint
| StreamingKeepAlive
| StreamingTerminateSession;
Loading
Loading