From 74d76ce41ad097425c6943bf8e11bd21774be7e7 Mon Sep 17 00:00:00 2001 From: Tom Riglar Date: Mon, 22 Jun 2026 13:43:35 +0100 Subject: [PATCH] feat: realtime test status with polling backstop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Logged-in (bearer) users now get near-instant run status via a Supabase Realtime subscription on the `results` table, layered over the existing poll loop as a latency optimisation rather than a rewrite: a realtime change "pokes" the loop to fetch immediately, reusing all existing render/terminal/artifact logic. - AuthContext exposes accessToken + env for bearer sessions so realtime can authenticate the socket under RLS (no DB migration needed — we only read the `new` row). - RealtimeResultsGateway: subscribes to postgres_changes on `results` filtered by org_id; any failure degrades silently to pure polling. - Backstop cadence: 60s for bearer (realtime carries the fast path), 20s for api-key users (no realtime). Inter-poll wait is interruptible and latches pokes that land mid-fetch so events are never dropped. - api-key users get a dim `dcd login` tip, suppressed in CI / json / quiet. - New isCI() helper + unit tests. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/commands/cloud.ts | 11 ++ src/gateways/realtime-gateway.ts | 130 ++++++++++++++++ src/services/results-polling.service.ts | 192 +++++++++++++++++------- src/types/domain/auth.types.ts | 9 ++ src/utils/auth.ts | 2 + src/utils/ci.ts | 42 ++++++ test/unit/ci.test.ts | 72 +++++++++ 7 files changed, 400 insertions(+), 58 deletions(-) create mode 100644 src/gateways/realtime-gateway.ts create mode 100644 src/utils/ci.ts create mode 100644 test/unit/ci.test.ts diff --git a/src/commands/cloud.ts b/src/commands/cloud.ts index 262b0bc..c3f452c 100644 --- a/src/commands/cloud.ts +++ b/src/commands/cloud.ts @@ -27,6 +27,7 @@ import { EiOSVersions, } from '../types/domain/device.types'; import { resolveAuth } from '../utils/auth'; +import { isCI } from '../utils/ci'; import { CliError, coerceArray, @@ -324,6 +325,16 @@ export const cloudCommand = defineCommand({ const auth = await resolveAuth({ apiKeyFlag }); + // Nudge interactive api-key users toward `dcd login`, which unlocks live + // (realtime) status updates. Suppressed in CI and non-interactive output. + if (auth.mode === 'apiKey' && !json && !quiet && !isCI()) { + out( + colors.dim( + 'Tip: run `dcd login` for live test updates and a smoother experience than passing --api-key.', + ), + ); + } + let compatibilityData: CompatibilityData; try { compatibilityData = await fetchCompatibilityData(apiUrl, auth); diff --git a/src/gateways/realtime-gateway.ts b/src/gateways/realtime-gateway.ts new file mode 100644 index 0000000..ddc82df --- /dev/null +++ b/src/gateways/realtime-gateway.ts @@ -0,0 +1,130 @@ +/** + * Supabase Realtime subscription to test-result status changes. + * + * This is a *latency optimisation* layered on top of HTTP polling, not a + * replacement for it. A logged-in (bearer) user's JWT authorises a + * `postgres_changes` subscription on the `results` table (RLS scopes rows to + * the user's org via `app_metadata.org_ids`). Each relevant change fires + * `onChange`, which the polling loop uses to fetch immediately instead of + * waiting out the full backstop interval. + * + * We deliberately only read the `new` row's `test_upload_id` — Postgres always + * ships the full new tuple on INSERT/UPDATE regardless of REPLICA IDENTITY, so + * no DB change is required. Mirrors the frontend pattern in + * dcd/frontend/app/stores/Results.store.ts. + */ +import { + createClient, + REALTIME_SUBSCRIBE_STATES, + type SupabaseClient, +} from '@supabase/supabase-js'; + +import { ENVIRONMENTS, type DcdEnvName } from '../config/environments'; + +export interface RealtimeResultsSubscription { + /** Tear down the channel and close the socket. Best-effort, never throws. */ + unsubscribe(): Promise; +} + +export interface RealtimeSubscribeOptions { + /** Supabase JWT (AuthContext.accessToken) — authorises the socket for RLS. */ + accessToken: string; + debug?: boolean; + env: DcdEnvName; + /** Stderr-safe logger; stdout is reserved by some callers (MCP). */ + log?: (message: string) => void; + /** Fired when a result row for this upload changes. */ + onChange: () => void; + orgId: string; + uploadId: string; +} + +/** + * The `new` record we care about from a `results` row change. Loosely typed — + * we only ever read `test_upload_id`. + */ +interface ResultChangePayload { + new?: { test_upload_id?: string } | null; +} + +export class RealtimeResultsGateway { + /** + * Open a realtime subscription to `results` changes for the given upload. + * Construction never throws — on any failure the caller simply keeps polling. + */ + static subscribe( + options: RealtimeSubscribeOptions, + ): RealtimeResultsSubscription { + const { accessToken, debug, env, log, onChange, orgId, uploadId } = options; + const dbg = (message: string) => { + if (debug && log) log(`[DEBUG] [realtime] ${message}`); + }; + + let client: SupabaseClient | undefined; + try { + const { url, anonKey } = ENVIRONMENTS[env].supabase; + client = createClient(url, anonKey, { + // Match SupabaseClientGateway / the frontend; no session persistence — + // we set the token explicitly below. + auth: { autoRefreshToken: false, persistSession: false }, + realtime: { params: { eventsPerSecond: 10 } }, + }); + + // Attach the user's JWT to the socket so RLS is enforced on the channel. + client.realtime.setAuth(accessToken); + + const channel = client + .channel(`results-cli-${uploadId}`) + .on( + // The typings don't cover the postgres_changes overload cleanly. + 'postgres_changes' as never, + { + event: '*', + schema: 'public', + table: 'results', + filter: `org_id=eq.${orgId}`, + } as never, + (payload: ResultChangePayload) => { + if (payload.new?.test_upload_id === uploadId) { + dbg(`change for upload ${uploadId}`); + onChange(); + } + }, + ) + .subscribe((status) => { + if (status === REALTIME_SUBSCRIBE_STATES.SUBSCRIBED) { + dbg('subscribed'); + } else if ( + status === REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR || + status === REALTIME_SUBSCRIBE_STATES.TIMED_OUT || + status === REALTIME_SUBSCRIBE_STATES.CLOSED + ) { + // Don't try to recover — the backstop poll covers us. Surface in + // debug so a network-blocked websocket is diagnosable. + dbg(`channel ${status}; relying on backstop poll`); + } + }); + + const activeClient = client; + return { + async unsubscribe() { + try { + await activeClient.removeChannel(channel); + await activeClient.realtime.disconnect(); + } catch { + /* best effort — process is exiting anyway */ + } + }, + }; + } catch (error) { + dbg(`failed to subscribe: ${error instanceof Error ? error.message : String(error)}`); + // Best-effort cleanup of a half-built client, then degrade to polling. + try { + client?.realtime.disconnect(); + } catch { + /* ignore */ + } + return { async unsubscribe() {} }; + } + } +} diff --git a/src/services/results-polling.service.ts b/src/services/results-polling.service.ts index d1fe2af..e54d3d1 100644 --- a/src/services/results-polling.service.ts +++ b/src/services/results-polling.service.ts @@ -1,6 +1,10 @@ import * as path from 'node:path'; import { ApiGateway } from '../gateways/api-gateway'; +import { + RealtimeResultsGateway, + type RealtimeResultsSubscription, +} from '../gateways/realtime-gateway'; import { formatDurationSeconds } from '../methods'; import type { AuthContext } from '../types/domain/auth.types'; import { paths } from '../types/generated/schema.types'; @@ -65,12 +69,17 @@ export interface PollingResult { */ export class ResultsPollingService { // The run keeps executing in the cloud regardless of whether the CLI can - // poll, so tolerate a long stretch of transient API/network blips (~5 min at - // the 10s base interval) before giving up. Losing a run to a brief hiccup is - // far more costly than waiting a bit longer. + // poll, so tolerate a long stretch of transient API/network blips before + // giving up. Losing a run to a brief hiccup is far more costly than waiting a + // bit longer. private readonly MAX_SEQUENTIAL_FAILURES = 30; - private readonly POLL_INTERVAL_MS = 10_000; - // Cap for the backoff applied between failed polls. + // Backstop poll cadence. Logged-in (bearer) users also get realtime pushes + // (see RealtimeResultsGateway), so they only need an occasional reconciling + // poll; api-key users have no realtime and rely on the faster interval. + private readonly BEARER_POLL_INTERVAL_MS = 60_000; + private readonly APIKEY_POLL_INTERVAL_MS = 20_000; + // Base unit for the backoff applied between *failed* polls, and its cap. + private readonly ERROR_BACKOFF_BASE_MS = 10_000; private readonly MAX_ERROR_BACKOFF_MS = 30_000; /** @@ -107,70 +116,137 @@ export class ResultsPollingService { let sequentialPollFailures = 0; let previousSummary = ''; + const pollIntervalMs = + auth.mode === 'bearer' + ? this.BEARER_POLL_INTERVAL_MS + : this.APIKEY_POLL_INTERVAL_MS; + + // "Poke" mechanism: a realtime change resolves the current inter-poll wait + // early. If a poke lands while we're mid-fetch (not waiting) it's latched + // and consumed by the next wait, so events are never silently dropped. + let resolveWake: (() => void) | null = null; + let pendingPoke = false; + const poke = () => { + if (resolveWake) { + const r = resolveWake; + resolveWake = null; + r(); + } else { + pendingPoke = true; + } + }; + const waitForNextPoll = (ms: number): Promise => { + if (pendingPoke) { + pendingPoke = false; + return Promise.resolve(); + } + return new Promise((resolve) => { + const timer = setTimeout(() => { + resolveWake = null; + resolve(); + }, ms); + resolveWake = () => { + clearTimeout(timer); + resolve(); + }; + }); + }; + + // Realtime is a latency optimisation over the backstop poll; only logged-in + // (bearer) users can authenticate the socket under RLS. Any failure inside + // the gateway degrades silently to pure polling. + let subscription: RealtimeResultsSubscription | undefined; + if (auth.mode === 'bearer' && auth.accessToken && auth.orgId && auth.env) { + subscription = RealtimeResultsGateway.subscribe({ + accessToken: auth.accessToken, + debug, + env: auth.env, + log: logger, + onChange: poke, + orgId: auth.orgId, + uploadId, + }); + if (debug && logger) { + logger( + `[DEBUG] Realtime enabled; backstop poll every ${pollIntervalMs / 1000}s`, + ); + } + } + if (debug && logger) { logger(`[DEBUG] Starting polling loop for results`); } - // Poll in a loop until all tests complete - // eslint-disable-next-line no-constant-condition - while (true) { - try { - const updatedResults = await this.fetchAndLogResults(apiUrl, auth, uploadId, debug, logger); - - const { summary } = this.calculateStatusSummary(updatedResults); - previousSummary = this.updateDisplayStatus( - updatedResults, - quiet, - json, - summary, - previousSummary, - ); + try { + // Poll in a loop until all tests complete + // eslint-disable-next-line no-constant-condition + while (true) { + try { + const updatedResults = await this.fetchAndLogResults(apiUrl, auth, uploadId, debug, logger); + + const { summary } = this.calculateStatusSummary(updatedResults); + previousSummary = this.updateDisplayStatus( + updatedResults, + quiet, + json, + summary, + previousSummary, + ); + + const allComplete = updatedResults.every( + (result) => !['PENDING', 'QUEUED', 'RUNNING'].includes(result.status), + ); + + if (allComplete) { + return await this.handleCompletedTests(updatedResults, { + consoleUrl, + debug, + json, + logger, + testMetadata, + uploadId, + }); + } - const allComplete = updatedResults.every( - (result) => !['PENDING', 'QUEUED', 'RUNNING'].includes(result.status), - ); + // Reset failure counter on successful poll + sequentialPollFailures = 0; + + // Wait for the next backstop poll, or a realtime poke, whichever comes + // first. + await waitForNextPoll(pollIntervalMs); + } catch (error) { + // Re-throw RunFailedError immediately (test failures, not polling errors) + if (error instanceof RunFailedError) { + throw error; + } + + sequentialPollFailures++; - if (allComplete) { - return await this.handleCompletedTests(updatedResults, { - consoleUrl, + // Handle polling errors (network issues, etc.) + await this.handlePollingError( + error, + sequentialPollFailures, debug, - json, logger, - testMetadata, uploadId, - }); + ); + + // Back off (capped) before retrying so a flaky API gets some breathing + // room instead of being hammered on every failure. + await this.sleep( + Math.min( + this.ERROR_BACKOFF_BASE_MS * sequentialPollFailures, + this.MAX_ERROR_BACKOFF_MS, + ), + ); } - - // Reset failure counter on successful poll - sequentialPollFailures = 0; - - // Wait before next poll - await this.sleep(this.POLL_INTERVAL_MS); - } catch (error) { - // Re-throw RunFailedError immediately (test failures, not polling errors) - if (error instanceof RunFailedError) { - throw error; + } + } finally { + if (subscription) { + if (debug && logger) { + logger('[DEBUG] Closing realtime subscription'); } - - sequentialPollFailures++; - - // Handle polling errors (network issues, etc.) - await this.handlePollingError( - error, - sequentialPollFailures, - debug, - logger, - uploadId, - ); - - // Back off (capped) before retrying so a flaky API gets some breathing - // room instead of being hammered every 10s. - await this.sleep( - Math.min( - this.POLL_INTERVAL_MS * sequentialPollFailures, - this.MAX_ERROR_BACKOFF_MS, - ), - ); + await subscription.unsubscribe(); } } } diff --git a/src/types/domain/auth.types.ts b/src/types/domain/auth.types.ts index 1630fb0..8a5e6ea 100644 --- a/src/types/domain/auth.types.ts +++ b/src/types/domain/auth.types.ts @@ -1,3 +1,5 @@ +import type { DcdEnvName } from '../../config/environments'; + /** * Auth context threaded through gateways and services. Callers build this once * (via resolveAuth) and the gateway spreads .headers into every fetch. @@ -5,6 +7,13 @@ export interface AuthContext { headers: Record; mode: 'apiKey' | 'bearer'; + /** + * Supabase JWT — present when mode === 'bearer'. Lets realtime subscriptions + * authenticate the socket (RLS) without re-reading the session from disk. + */ + accessToken?: string; + /** Environment the session belongs to — present when mode === 'bearer'. */ + env?: DcdEnvName; /** Present when mode === 'bearer'. */ orgId?: string; /** Present when mode === 'bearer'. */ diff --git a/src/utils/auth.ts b/src/utils/auth.ts index addbd67..1e2184d 100644 --- a/src/utils/auth.ts +++ b/src/utils/auth.ts @@ -91,6 +91,8 @@ export async function resolveAuth( const auth: AuthContext = { mode: 'bearer', + accessToken: session.access_token, + env: config.env, orgId: config.current_org_id, userEmail: session.user_email, headers: { diff --git a/src/utils/ci.ts b/src/utils/ci.ts new file mode 100644 index 0000000..196b2dc --- /dev/null +++ b/src/utils/ci.ts @@ -0,0 +1,42 @@ +/** + * Best-effort detection of non-interactive / CI environments. Used to suppress + * interactive niceties (e.g. the `dcd login` nudge) that only make sense for a + * human at a terminal. Dependency-free on purpose — these are the env vars the + * major providers set, plus a TTY check for piped/redirected output. + */ + +// Generic + per-provider markers. `CI` is set by virtually every provider; +// the rest cover platforms that historically didn't set it. +const CI_ENV_VARS = [ + 'CI', + 'CONTINUOUS_INTEGRATION', + 'BUILD_NUMBER', + 'GITHUB_ACTIONS', + 'GITLAB_CI', + 'CIRCLECI', + 'TRAVIS', + 'BUILDKITE', + 'DRONE', + 'TEAMCITY_VERSION', + 'TF_BUILD', // Azure Pipelines + 'JENKINS_URL', + 'BITBUCKET_BUILD_NUMBER', + 'APPVEYOR', + 'CODEBUILD_BUILD_ID', +] as const; + +/** + * Returns true when running under CI or otherwise non-interactively (no TTY on + * stdout, e.g. output piped to a file). A truthy value for any known CI env var + * counts — providers set `CI=true`, but a bare presence check is the safe net. + */ +export function isCI(): boolean { + for (const name of CI_ENV_VARS) { + const value = process.env[name]; + if (value !== undefined && value !== '' && value !== 'false' && value !== '0') { + return true; + } + } + + return !process.stdout.isTTY; +} diff --git a/test/unit/ci.test.ts b/test/unit/ci.test.ts new file mode 100644 index 0000000..049a76c --- /dev/null +++ b/test/unit/ci.test.ts @@ -0,0 +1,72 @@ +import { expect } from 'chai'; + +import { isCI } from '../../src/utils/ci'; + +// Every env var isCI() inspects — cleared before each case so the environment +// the suite happens to run in (often a real CI) can't leak into assertions. +const CI_ENV_VARS = [ + 'CI', + 'CONTINUOUS_INTEGRATION', + 'BUILD_NUMBER', + 'GITHUB_ACTIONS', + 'GITLAB_CI', + 'CIRCLECI', + 'TRAVIS', + 'BUILDKITE', + 'DRONE', + 'TEAMCITY_VERSION', + 'TF_BUILD', + 'JENKINS_URL', + 'BITBUCKET_BUILD_NUMBER', + 'APPVEYOR', + 'CODEBUILD_BUILD_ID', +]; + +describe('isCI', () => { + const ORIGINAL_ENV = { ...process.env }; + const originalIsTTY = process.stdout.isTTY; + + const setTTY = (value: boolean | undefined) => { + // isTTY is a plain (writable) property on the stream in Node. + (process.stdout as { isTTY?: boolean }).isTTY = value; + }; + + beforeEach(() => { + for (const name of CI_ENV_VARS) delete process.env[name]; + }); + + afterEach(() => { + process.env = { ...ORIGINAL_ENV }; + setTTY(originalIsTTY); + }); + + it('returns true when CI is set', () => { + setTTY(true); + process.env.CI = 'true'; + expect(isCI()).to.equal(true); + }); + + it('returns true for a provider var even when CI is unset', () => { + setTTY(true); + process.env.GITHUB_ACTIONS = 'true'; + expect(isCI()).to.equal(true); + }); + + it('treats CI=false / CI=0 as not CI (falls through to the TTY check)', () => { + setTTY(true); + process.env.CI = 'false'; + expect(isCI()).to.equal(false); + process.env.CI = '0'; + expect(isCI()).to.equal(false); + }); + + it('returns false in an interactive (TTY), non-CI environment', () => { + setTTY(true); + expect(isCI()).to.equal(false); + }); + + it('returns true when stdout is not a TTY (piped/redirected)', () => { + setTTY(false); + expect(isCI()).to.equal(true); + }); +});