Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/commands/cloud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
EiOSVersions,
} from '../types/domain/device.types';
import { resolveAuth } from '../utils/auth';
import { isCI } from '../utils/ci';
import {
CliError,
coerceArray,
Expand Down Expand Up @@ -324,6 +325,16 @@ export const cloudCommand = defineCommand({

const auth = await resolveAuth({ apiKeyFlag });

// Nudge interactive api-key users toward `dcd login`, which unlocks live
// (realtime) status updates. Suppressed in CI and non-interactive output.
if (auth.mode === 'apiKey' && !json && !quiet && !isCI()) {
out(
colors.dim(
'Tip: run `dcd login` for live test updates and a smoother experience than passing --api-key.',
),
);
}

let compatibilityData: CompatibilityData;
try {
compatibilityData = await fetchCompatibilityData(apiUrl, auth);
Expand Down
130 changes: 130 additions & 0 deletions src/gateways/realtime-gateway.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/**
* Supabase Realtime subscription to test-result status changes.
*
* This is a *latency optimisation* layered on top of HTTP polling, not a
* replacement for it. A logged-in (bearer) user's JWT authorises a
* `postgres_changes` subscription on the `results` table (RLS scopes rows to
* the user's org via `app_metadata.org_ids`). Each relevant change fires
* `onChange`, which the polling loop uses to fetch immediately instead of
* waiting out the full backstop interval.
*
* We deliberately only read the `new` row's `test_upload_id` — Postgres always
* ships the full new tuple on INSERT/UPDATE regardless of REPLICA IDENTITY, so
* no DB change is required. Mirrors the frontend pattern in
* dcd/frontend/app/stores/Results.store.ts.
*/
import {
createClient,
REALTIME_SUBSCRIBE_STATES,
type SupabaseClient,
} from '@supabase/supabase-js';

import { ENVIRONMENTS, type DcdEnvName } from '../config/environments';

export interface RealtimeResultsSubscription {
/** Tear down the channel and close the socket. Best-effort, never throws. */
unsubscribe(): Promise<void>;
}

export interface RealtimeSubscribeOptions {
/** Supabase JWT (AuthContext.accessToken) — authorises the socket for RLS. */
accessToken: string;
debug?: boolean;
env: DcdEnvName;
/** Stderr-safe logger; stdout is reserved by some callers (MCP). */
log?: (message: string) => void;
/** Fired when a result row for this upload changes. */
onChange: () => void;
orgId: string;
uploadId: string;
}

/**
* The `new` record we care about from a `results` row change. Loosely typed —
* we only ever read `test_upload_id`.
*/
interface ResultChangePayload {
new?: { test_upload_id?: string } | null;
}

export class RealtimeResultsGateway {
/**
* Open a realtime subscription to `results` changes for the given upload.
* Construction never throws — on any failure the caller simply keeps polling.
*/
static subscribe(
options: RealtimeSubscribeOptions,
): RealtimeResultsSubscription {
const { accessToken, debug, env, log, onChange, orgId, uploadId } = options;
const dbg = (message: string) => {
if (debug && log) log(`[DEBUG] [realtime] ${message}`);
};

let client: SupabaseClient | undefined;
try {
const { url, anonKey } = ENVIRONMENTS[env].supabase;
client = createClient(url, anonKey, {
// Match SupabaseClientGateway / the frontend; no session persistence —
// we set the token explicitly below.
auth: { autoRefreshToken: false, persistSession: false },
realtime: { params: { eventsPerSecond: 10 } },
});

// Attach the user's JWT to the socket so RLS is enforced on the channel.
client.realtime.setAuth(accessToken);

const channel = client
.channel(`results-cli-${uploadId}`)
.on(
// The typings don't cover the postgres_changes overload cleanly.
'postgres_changes' as never,
{
event: '*',
schema: 'public',
table: 'results',
filter: `org_id=eq.${orgId}`,
} as never,
(payload: ResultChangePayload) => {
if (payload.new?.test_upload_id === uploadId) {
dbg(`change for upload ${uploadId}`);
onChange();
}
},
)
.subscribe((status) => {
if (status === REALTIME_SUBSCRIBE_STATES.SUBSCRIBED) {
dbg('subscribed');
} else if (
status === REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR ||
status === REALTIME_SUBSCRIBE_STATES.TIMED_OUT ||
status === REALTIME_SUBSCRIBE_STATES.CLOSED
) {
// Don't try to recover — the backstop poll covers us. Surface in
// debug so a network-blocked websocket is diagnosable.
dbg(`channel ${status}; relying on backstop poll`);
}
});

const activeClient = client;
return {
async unsubscribe() {
try {
await activeClient.removeChannel(channel);
await activeClient.realtime.disconnect();
} catch {
/* best effort — process is exiting anyway */
}
},
};
} catch (error) {
dbg(`failed to subscribe: ${error instanceof Error ? error.message : String(error)}`);
// Best-effort cleanup of a half-built client, then degrade to polling.
try {
client?.realtime.disconnect();
} catch {
/* ignore */
}
return { async unsubscribe() {} };
}
}
}
192 changes: 134 additions & 58 deletions src/services/results-polling.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import * as path from 'node:path';

import { ApiGateway } from '../gateways/api-gateway';
import {
RealtimeResultsGateway,
type RealtimeResultsSubscription,
} from '../gateways/realtime-gateway';
import { formatDurationSeconds } from '../methods';
import type { AuthContext } from '../types/domain/auth.types';
import { paths } from '../types/generated/schema.types';
Expand Down Expand Up @@ -65,12 +69,17 @@ export interface PollingResult {
*/
export class ResultsPollingService {
// The run keeps executing in the cloud regardless of whether the CLI can
// poll, so tolerate a long stretch of transient API/network blips (~5 min at
// the 10s base interval) before giving up. Losing a run to a brief hiccup is
// far more costly than waiting a bit longer.
// poll, so tolerate a long stretch of transient API/network blips before
// giving up. Losing a run to a brief hiccup is far more costly than waiting a
// bit longer.
private readonly MAX_SEQUENTIAL_FAILURES = 30;
private readonly POLL_INTERVAL_MS = 10_000;
// Cap for the backoff applied between failed polls.
// Backstop poll cadence. Logged-in (bearer) users also get realtime pushes
// (see RealtimeResultsGateway), so they only need an occasional reconciling
// poll; api-key users have no realtime and rely on the faster interval.
private readonly BEARER_POLL_INTERVAL_MS = 60_000;
private readonly APIKEY_POLL_INTERVAL_MS = 20_000;
// Base unit for the backoff applied between *failed* polls, and its cap.
private readonly ERROR_BACKOFF_BASE_MS = 10_000;
private readonly MAX_ERROR_BACKOFF_MS = 30_000;

/**
Expand Down Expand Up @@ -107,70 +116,137 @@ export class ResultsPollingService {
let sequentialPollFailures = 0;
let previousSummary = '';

const pollIntervalMs =
auth.mode === 'bearer'
? this.BEARER_POLL_INTERVAL_MS
: this.APIKEY_POLL_INTERVAL_MS;

// "Poke" mechanism: a realtime change resolves the current inter-poll wait
// early. If a poke lands while we're mid-fetch (not waiting) it's latched
// and consumed by the next wait, so events are never silently dropped.
let resolveWake: (() => void) | null = null;
let pendingPoke = false;
const poke = () => {
if (resolveWake) {
const r = resolveWake;
resolveWake = null;
r();
} else {
pendingPoke = true;
}
};
const waitForNextPoll = (ms: number): Promise<void> => {
if (pendingPoke) {
pendingPoke = false;
return Promise.resolve();
}
return new Promise<void>((resolve) => {
const timer = setTimeout(() => {
resolveWake = null;
resolve();
}, ms);
resolveWake = () => {
clearTimeout(timer);
resolve();
};
});
};

// Realtime is a latency optimisation over the backstop poll; only logged-in
// (bearer) users can authenticate the socket under RLS. Any failure inside
// the gateway degrades silently to pure polling.
let subscription: RealtimeResultsSubscription | undefined;
if (auth.mode === 'bearer' && auth.accessToken && auth.orgId && auth.env) {
subscription = RealtimeResultsGateway.subscribe({
accessToken: auth.accessToken,
debug,
env: auth.env,
log: logger,
onChange: poke,
orgId: auth.orgId,
uploadId,
});
if (debug && logger) {
logger(
`[DEBUG] Realtime enabled; backstop poll every ${pollIntervalMs / 1000}s`,
);
}
}

if (debug && logger) {
logger(`[DEBUG] Starting polling loop for results`);
}

// Poll in a loop until all tests complete
// eslint-disable-next-line no-constant-condition
while (true) {
try {
const updatedResults = await this.fetchAndLogResults(apiUrl, auth, uploadId, debug, logger);

const { summary } = this.calculateStatusSummary(updatedResults);
previousSummary = this.updateDisplayStatus(
updatedResults,
quiet,
json,
summary,
previousSummary,
);
try {
// Poll in a loop until all tests complete
// eslint-disable-next-line no-constant-condition
while (true) {
try {
const updatedResults = await this.fetchAndLogResults(apiUrl, auth, uploadId, debug, logger);

const { summary } = this.calculateStatusSummary(updatedResults);
previousSummary = this.updateDisplayStatus(
updatedResults,
quiet,
json,
summary,
previousSummary,
);

const allComplete = updatedResults.every(
(result) => !['PENDING', 'QUEUED', 'RUNNING'].includes(result.status),
);

if (allComplete) {
return await this.handleCompletedTests(updatedResults, {
consoleUrl,
debug,
json,
logger,
testMetadata,
uploadId,
});
}

const allComplete = updatedResults.every(
(result) => !['PENDING', 'QUEUED', 'RUNNING'].includes(result.status),
);
// Reset failure counter on successful poll
sequentialPollFailures = 0;

// Wait for the next backstop poll, or a realtime poke, whichever comes
// first.
await waitForNextPoll(pollIntervalMs);
} catch (error) {
// Re-throw RunFailedError immediately (test failures, not polling errors)
if (error instanceof RunFailedError) {
throw error;
}

sequentialPollFailures++;

if (allComplete) {
return await this.handleCompletedTests(updatedResults, {
consoleUrl,
// Handle polling errors (network issues, etc.)
await this.handlePollingError(
error,
sequentialPollFailures,
debug,
json,
logger,
testMetadata,
uploadId,
});
);

// Back off (capped) before retrying so a flaky API gets some breathing
// room instead of being hammered on every failure.
await this.sleep(
Math.min(
this.ERROR_BACKOFF_BASE_MS * sequentialPollFailures,
this.MAX_ERROR_BACKOFF_MS,
),
);
}

// Reset failure counter on successful poll
sequentialPollFailures = 0;

// Wait before next poll
await this.sleep(this.POLL_INTERVAL_MS);
} catch (error) {
// Re-throw RunFailedError immediately (test failures, not polling errors)
if (error instanceof RunFailedError) {
throw error;
}
} finally {
if (subscription) {
if (debug && logger) {
logger('[DEBUG] Closing realtime subscription');
}

sequentialPollFailures++;

// Handle polling errors (network issues, etc.)
await this.handlePollingError(
error,
sequentialPollFailures,
debug,
logger,
uploadId,
);

// Back off (capped) before retrying so a flaky API gets some breathing
// room instead of being hammered every 10s.
await this.sleep(
Math.min(
this.POLL_INTERVAL_MS * sequentialPollFailures,
this.MAX_ERROR_BACKOFF_MS,
),
);
await subscription.unsubscribe();
}
}
}
Expand Down
Loading
Loading