diff --git a/.github/workflows/pullfrog.yml b/.github/workflows/pullfrog.yml index ae594ba9..9ec7ffc5 100644 --- a/.github/workflows/pullfrog.yml +++ b/.github/workflows/pullfrog.yml @@ -33,7 +33,7 @@ jobs: with: fetch-depth: 1 - name: Run agent - uses: pullfrog/pullfrog@27557aef1ccff9b9ea531324824daa1f86c066c4 # v0.1.29 + uses: pullfrog/pullfrog@3beaa0fb16f67a689055eac8df88e71024e423b2 # v0.1.31 with: prompt: ${{ inputs.prompt }} env: diff --git a/apps/alerting/src/worker.ts b/apps/alerting/src/worker.ts index db6e1f96..4049d439 100644 --- a/apps/alerting/src/worker.ts +++ b/apps/alerting/src/worker.ts @@ -1,4 +1,5 @@ import { + ANTICIPATED_ERROR_TAGS, AlertsService, AnomalyDetectionService, BucketCacheService, @@ -30,6 +31,7 @@ const telemetry = MapleCloudflareSDK.make({ serviceName: "alerting", serviceNamespace: "backend", repositoryUrl: "https://github.com/Makisuo/maple", + anticipatedErrorTags: [...ANTICIPATED_ERROR_TAGS], }) const buildLayer = (_env: Record) => { diff --git a/apps/api/src/alerting.ts b/apps/api/src/alerting.ts index c6f30020..4bcca666 100644 --- a/apps/api/src/alerting.ts +++ b/apps/api/src/alerting.ts @@ -1,3 +1,4 @@ +export { ANTICIPATED_ERROR_TAGS } from "@maple/domain/anticipated-errors" export { AlertRuntime, AlertsService } from "./services/AlertsService" export { AnomalyDetectionService } from "./services/AnomalyDetectionService" export { BucketCacheService } from "@maple/query-engine/caching" diff --git a/apps/api/src/vcs-sync-runtime.ts b/apps/api/src/vcs-sync-runtime.ts index e45631f6..a2108fc8 100644 --- a/apps/api/src/vcs-sync-runtime.ts +++ b/apps/api/src/vcs-sync-runtime.ts @@ -1,5 +1,6 @@ import type { MessageBatch } from "@cloudflare/workers-types" import * as MapleCloudflareSDK from "@maple-dev/effect-sdk/cloudflare" +import { ANTICIPATED_ERROR_TAGS } from "@maple/domain/anticipated-errors" import { WorkerConfigProviderLayer, WorkerEnvironment } from "@maple/effect-cloudflare" import { Cause, Effect, Layer, Option } from "effect" import { DatabasePgLive } from "./lib/DatabasePgLive" @@ -23,6 +24,7 @@ const telemetry = MapleCloudflareSDK.make({ serviceName: "maple-api", serviceNamespace: "backend", repositoryUrl: "https://github.com/Makisuo/maple", + anticipatedErrorTags: [...ANTICIPATED_ERROR_TAGS], }) export const buildVcsSyncLayer = (_env: Record) => { diff --git a/apps/api/src/worker.ts b/apps/api/src/worker.ts index deb6ae10..46035dde 100644 --- a/apps/api/src/worker.ts +++ b/apps/api/src/worker.ts @@ -1,5 +1,6 @@ import type { MessageBatch, ScheduledController } from "@cloudflare/workers-types" import * as MapleCloudflareSDK from "@maple-dev/effect-sdk/cloudflare" +import { ANTICIPATED_ERROR_TAGS } from "@maple/domain/anticipated-errors" import { runScheduledEffect, WorkerConfigProviderLayer, WorkerEnvironment } from "@maple/effect-cloudflare" import { Context, FileSystem, Layer, Path } from "effect" import { HttpMiddleware, HttpRouter } from "effect/unstable/http" @@ -44,6 +45,9 @@ const telemetry = MapleCloudflareSDK.make({ serviceNamespace: "backend", repositoryUrl: "https://github.com/Makisuo/maple", dropSpanNames: ["McpServer/Notifications."], + // Expected 4xx outcomes (validation, not-found, unauthorized, …) record as + // Ok spans instead of errors — see @maple/domain/anticipated-errors. + anticipatedErrorTags: [...ANTICIPATED_ERROR_TAGS], }) // `HttpMiddleware.tracer` ends the root server span on a deferred macrotask diff --git a/apps/api/src/workflows/AiTriageWorkflow.run.ts b/apps/api/src/workflows/AiTriageWorkflow.run.ts index 564e04cd..754d1992 100644 --- a/apps/api/src/workflows/AiTriageWorkflow.run.ts +++ b/apps/api/src/workflows/AiTriageWorkflow.run.ts @@ -22,6 +22,7 @@ import { createFlueClient } from "@flue/sdk" import * as MapleCloudflareSDK from "@maple-dev/effect-sdk/cloudflare" import { aiTriageRuns, anomalyIncidents, errorIssueEvents } from "@maple/db" import { createMaplePgClient, type MaplePgClient } from "@maple/db/client" +import { ANTICIPATED_ERROR_TAGS } from "@maple/domain/anticipated-errors" import { AiTriageResult } from "@maple/domain/http" import { AiTriageRunId, @@ -176,6 +177,7 @@ const triageTelemetry = MapleCloudflareSDK.make({ serviceName: "maple-api", serviceNamespace: "backend", repositoryUrl: "https://github.com/Makisuo/maple", + anticipatedErrorTags: [...ANTICIPATED_ERROR_TAGS], }) const GATE_STEP = { retries: { limit: 3, delay: "2 seconds", backoff: "exponential" } } diff --git a/apps/scraper/src/ApiClient.ts b/apps/scraper/src/ApiClient.ts index 87806300..466635f7 100644 --- a/apps/scraper/src/ApiClient.ts +++ b/apps/scraper/src/ApiClient.ts @@ -1,4 +1,4 @@ -import { Context, Effect, Layer, Redacted, Schema } from "effect" +import { Context, Duration, Effect, Layer, Redacted, Schema } from "effect" import { HttpClient, HttpClientRequest } from "effect/unstable/http" import { InternalScrapeTargetList, @@ -51,6 +51,11 @@ export class ApiClient extends Context.Service()("@ma authorization: `Bearer ${Redacted.value(env.SD_INTERNAL_TOKEN)}`, } + // Bound every API round-trip. `FetchHttpClient` sets no timeout, so a + // stalled Worker (e.g. an oversized scrape-results POST) would otherwise + // hang for minutes; cap it so a flush fails fast and re-buffers instead. + const REQUEST_TIMEOUT = Duration.seconds(30) + const transportError = (error: { readonly message: string }) => new ApiRequestError({ message: `Maple API unreachable: ${error.message}`, status: null }) @@ -58,7 +63,9 @@ export class ApiClient extends Context.Service()("@ma const request = HttpClientRequest.get(`${env.MAPLE_API_URL}/api/internal/scrape-targets`, { headers: authHeaders, }) - const response = yield* client.execute(request).pipe(Effect.mapError(transportError)) + const response = yield* client + .execute(request) + .pipe(Effect.timeout(REQUEST_TIMEOUT), Effect.mapError(transportError)) const text = yield* response.text.pipe(Effect.mapError(transportError)) if (response.status < 200 || response.status >= 300) { return yield* Effect.fail( @@ -96,7 +103,9 @@ export class ApiClient extends Context.Service()("@ma `${env.MAPLE_API_URL}/api/internal/prometheus-scrape?targetId=${encodeURIComponent(targetId)}${sub}`, { headers: authHeaders }, ) - const response = yield* client.execute(request).pipe(Effect.mapError(transportError)) + const response = yield* client + .execute(request) + .pipe(Effect.timeout(REQUEST_TIMEOUT), Effect.mapError(transportError)) const body = yield* response.text.pipe(Effect.mapError(transportError)) const retryAfterRaw = response.headers["retry-after"] const retryAfterSeconds = @@ -113,7 +122,9 @@ export class ApiClient extends Context.Service()("@ma const request = HttpClientRequest.post(`${env.MAPLE_API_URL}/api/internal/scrape-results`, { headers: authHeaders, }).pipe(HttpClientRequest.bodyText(JSON.stringify(results), "application/json")) - const response = yield* client.execute(request).pipe(Effect.mapError(transportError)) + const response = yield* client + .execute(request) + .pipe(Effect.timeout(REQUEST_TIMEOUT), Effect.mapError(transportError)) if (response.status < 200 || response.status >= 300) { return yield* Effect.fail( new ApiRequestError({ diff --git a/apps/scraper/src/ScrapeScheduler.test.ts b/apps/scraper/src/ScrapeScheduler.test.ts index f3afb0a0..632f9770 100644 --- a/apps/scraper/src/ScrapeScheduler.test.ts +++ b/apps/scraper/src/ScrapeScheduler.test.ts @@ -1,13 +1,14 @@ import { assert, describe, it } from "@effect/vitest" import { Duration, Effect, Layer, Redacted, Schema } from "effect" import { TestClock } from "effect/testing" -import { InternalScrapeTarget, type ScrapeResultReport } from "@maple/domain/http" +import { InternalScrapeTarget, ScrapeResultReport, ScrapeTargetId } from "@maple/domain/http" import { ApiClient, ApiRequestError, type ApiClientShape, type ScrapeProxyResponse } from "./ApiClient" import { OtlpIngest, OtlpIngestError, type OtlpIngestShape } from "./OtlpIngest" import { initialJitterMs, nextScrapeDelayMs, ScrapeScheduler, + sendResultsInChunks, type ScrapeOutcome, } from "./ScrapeScheduler" import { ScraperEnv, type ScraperEnvShape } from "./Env" @@ -531,6 +532,61 @@ describe("ScrapeScheduler", () => { ) }) +describe("sendResultsInChunks", () => { + const decodeId = Schema.decodeUnknownSync(ScrapeTargetId) + const mkReports = (count: number): ReadonlyArray => + Array.from( + { length: count }, + (_, i) => new ScrapeResultReport({ targetId: decodeId(TARGET_A), scrapedAt: i, error: null }), + ) + + it.effect("splits a batch into chunks no larger than chunkSize", () => + Effect.gen(function* () { + const batches: Array = [] + const result = yield* sendResultsInChunks(mkReports(2500), 1000, (chunk) => + Effect.sync(() => { + batches.push(chunk.length) + }), + ) + assert.deepStrictEqual(batches, [1000, 1000, 500]) + assert.lengthOf(result.unsent, 0) + assert.isNull(result.error) + }), + ) + + it.effect("stops at the first failed chunk and returns the unsent remainder", () => + Effect.gen(function* () { + const batches: Array = [] + const result = yield* sendResultsInChunks(mkReports(2500), 1000, (chunk) => + Effect.suspend(() => { + // First chunk delivers; the second fails → 1500 left unsent. + if (batches.length >= 1) { + return Effect.fail(new ApiRequestError({ message: "HTTP 503", status: 503 })) + } + batches.push(chunk.length) + return Effect.void + }), + ) + assert.deepStrictEqual(batches, [1000]) + assert.lengthOf(result.unsent, 1500) + assert.strictEqual(result.error?.message, "HTTP 503") + }), + ) + + it.effect("no-ops on an empty batch", () => + Effect.gen(function* () { + let calls = 0 + const result = yield* sendResultsInChunks([], 1000, () => + Effect.sync(() => { + calls++ + }), + ) + assert.strictEqual(calls, 0) + assert.lengthOf(result.unsent, 0) + }), + ) +}) + describe("nextScrapeDelayMs", () => { const ok: ScrapeOutcome = { error: null, rateLimited: false, retryAfterMs: null } const limited = (retryAfterMs: number | null = null): ScrapeOutcome => ({ diff --git a/apps/scraper/src/ScrapeScheduler.ts b/apps/scraper/src/ScrapeScheduler.ts index 2a12a800..38897eab 100644 --- a/apps/scraper/src/ScrapeScheduler.ts +++ b/apps/scraper/src/ScrapeScheduler.ts @@ -1,4 +1,4 @@ -import { Cause, Clock, Context, Duration, Effect, Fiber, Layer, Ref, Schedule, Semaphore } from "effect" +import { Cause, Clock, Context, Duration, Effect, Fiber, Layer, Ref, Result, Schedule, Semaphore } from "effect" import { ScrapeResultReport, type InternalScrapeTarget } from "@maple/domain/http" import { ApiClient, ApiRequestError } from "./ApiClient" import { convertFamiliesToOtlp } from "./prometheus/otlp" @@ -25,6 +25,13 @@ export interface ScrapeSchedulerShape { const RESULTS_FLUSH_INTERVAL = Duration.seconds(10) /** Cap the result buffer so an unreachable API cannot grow memory unboundedly. */ const MAX_BUFFERED_RESULTS = 10_000 +/** + * Max results per `scrape-results` POST. The buffer can hold up to + * `MAX_BUFFERED_RESULTS`; sending that as one body overwhelmed the API Worker + * (CPU/time → edge 503), so a flush sends in chunks and re-buffers the unsent + * remainder on the first failure. + */ +const RESULTS_FLUSH_CHUNK_SIZE = 1_000 /** Upper bound on rate-limit backoff so a target keeps probing for recovery. */ const MAX_BACKOFF_MS = Duration.toMillis(Duration.minutes(5)) @@ -112,6 +119,27 @@ interface TargetEntry { readonly fiber: Fiber.Fiber } +/** + * Send `results` to `send` in chunks of `chunkSize`, stopping at the first + * failed chunk. Returns the results that were NOT delivered (the failed chunk + * plus everything after it) so the caller can re-buffer just those; `unsent` is + * empty when the whole batch went through. Chunking keeps any single POST small + * enough that the API Worker doesn't choke on it. + */ +export const sendResultsInChunks = ( + results: ReadonlyArray, + chunkSize: number, + send: (chunk: ReadonlyArray) => Effect.Effect, +): Effect.Effect<{ readonly unsent: ReadonlyArray; readonly error: E | null }> => + Effect.gen(function* () { + for (let index = 0; index < results.length; index += chunkSize) { + const chunk = results.slice(index, index + chunkSize) + const outcome = yield* Effect.result(send(chunk)) + if (Result.isFailure(outcome)) return { unsent: results.slice(index), error: outcome.failure } + } + return { unsent: [], error: null } + }) + export class ScrapeScheduler extends Context.Service()( "@maple/scraper/ScrapeScheduler", { @@ -357,22 +385,24 @@ export class ScrapeScheduler extends Context.Service - Effect.gen(function* () { - // Put the batch back (in front) and retry on the next flush. - yield* Ref.update(resultsRef, (buffered) => - [...results, ...buffered].slice(-MAX_BUFFERED_RESULTS), - ) - yield* Effect.logWarning("Failed to report scrape results").pipe( - Effect.annotateLogs({ - error: error.message, - bufferedResults: results.length, - }), - ) - }), - ), + // Send in chunks so one POST never overwhelms the API Worker; re-buffer + // only what didn't make it (in front) and retry on the next flush. + const { unsent, error } = yield* sendResultsInChunks( + results, + RESULTS_FLUSH_CHUNK_SIZE, + api.reportResults, ) + if (unsent.length > 0) { + yield* Ref.update(resultsRef, (buffered) => + [...unsent, ...buffered].slice(-MAX_BUFFERED_RESULTS), + ) + yield* Effect.logWarning("Failed to report scrape results").pipe( + Effect.annotateLogs({ + error: error?.message ?? "unknown", + bufferedResults: unsent.length, + }), + ) + } }).pipe(Effect.withSpan("scraper.flush_results")) const run = Effect.gen(function* () { diff --git a/apps/web/src/lib/services/common/otel-layer.ts b/apps/web/src/lib/services/common/otel-layer.ts index 06934d2b..4605498b 100644 --- a/apps/web/src/lib/services/common/otel-layer.ts +++ b/apps/web/src/lib/services/common/otel-layer.ts @@ -1,4 +1,5 @@ import { MapleFlush } from "@maple-dev/effect-sdk/client" +import { ANTICIPATED_ERROR_TAGS } from "@maple/domain/anticipated-errors" import { ingestUrl } from "./ingest-url" // Buffer-backed client telemetry with flush-on-unload. `Maple.layer` @@ -24,6 +25,9 @@ const telemetry = MapleFlush.make({ ? { "vcs.ref.head.revision": import.meta.env.VITE_COMMIT_SHA } : {}), }, + // Expected 4xx API responses (the maple-web → maple-api edge surfaces these + // as client-span failures) record as Ok instead of errors. + anticipatedErrorTags: [...ANTICIPATED_ERROR_TAGS], }) export const mapleOtelLayer = telemetry.layer diff --git a/lib/effect-sdk/src/client/flushable.ts b/lib/effect-sdk/src/client/flushable.ts index 2422ef67..843d0f50 100644 --- a/lib/effect-sdk/src/client/flushable.ts +++ b/lib/effect-sdk/src/client/flushable.ts @@ -59,6 +59,12 @@ export interface MapleClientFlushableConfig { readonly excludeLogSpans?: boolean | undefined /** Span name prefixes to drop before OTLP export. */ readonly dropSpanNames?: ReadonlyArray | undefined + /** + * `_tag`s of *anticipated* failures (expected 4xx business errors). Spans + * failing entirely with these export as status `Ok` (no `exception` event), + * so they stay visible but never count as errors. + */ + readonly anticipatedErrorTags?: ReadonlyArray | undefined /** OTLP traces path appended to `endpoint`. Default `/v1/traces`. */ readonly tracesPath?: string | undefined /** OTLP logs path appended to `endpoint`. Default `/v1/logs`. */ @@ -134,7 +140,11 @@ export const make = (config: MapleClientFlushableConfig): FlushableTelemetry => dropPrefixes !== undefined && dropPrefixes.length > 0 ? (name: string) => dropPrefixes.some((prefix) => name.startsWith(prefix)) : undefined - const spans: SpanBuffer = makeSpanBuffer({ dropSpan }) + const anticipatedErrorTags = + config.anticipatedErrorTags !== undefined && config.anticipatedErrorTags.length > 0 + ? new Set(config.anticipatedErrorTags) + : undefined + const spans: SpanBuffer = makeSpanBuffer({ dropSpan, anticipatedErrorTags }) const logs: LogBuffer = makeLogBuffer({ excludeLogSpans: config.excludeLogSpans }) // `withSessionLink` overrides only the Tracer reference, keeping the logger. const layer = withSessionLink(Layer.mergeAll(spans.tracerLayer, logs.loggerLayer)) diff --git a/lib/effect-sdk/src/cloudflare/index.ts b/lib/effect-sdk/src/cloudflare/index.ts index 210b9b36..090806cc 100644 --- a/lib/effect-sdk/src/cloudflare/index.ts +++ b/lib/effect-sdk/src/cloudflare/index.ts @@ -83,6 +83,13 @@ export interface Config { * (e.g. `"McpServer/Notifications."` for MCP notification spam). */ readonly dropSpanNames?: ReadonlyArray | undefined + /** + * `_tag`s of *anticipated* failures (expected 4xx business errors). A span + * whose failure is caused entirely by these is exported with status `Ok` and + * no `exception` event, so it stays visible as a trace but never counts as an + * error. See `SpanBufferOptions.anticipatedErrorTags`. + */ + readonly anticipatedErrorTags?: ReadonlyArray | undefined /** OTLP traces path appended to `endpoint`. Default `/v1/traces`. */ readonly tracesPath?: string | undefined /** OTLP logs path appended to `endpoint`. Default `/v1/logs`. */ @@ -126,7 +133,11 @@ export const make = (config: Config = {}): Telemetry => { dropPrefixes !== undefined && dropPrefixes.length > 0 ? (name: string) => dropPrefixes.some((prefix) => name.startsWith(prefix)) : undefined - const spans: SpanBuffer = makeSpanBuffer({ dropSpan }) + const anticipatedErrorTags = + config.anticipatedErrorTags !== undefined && config.anticipatedErrorTags.length > 0 + ? new Set(config.anticipatedErrorTags) + : undefined + const spans: SpanBuffer = makeSpanBuffer({ dropSpan, anticipatedErrorTags }) const logs: LogBuffer = makeLogBuffer({ excludeLogSpans: config.excludeLogSpans }) let resolved: Resolved | undefined = undefined diff --git a/lib/effect-sdk/src/server/flushable.ts b/lib/effect-sdk/src/server/flushable.ts index cc95407c..8d6933f0 100644 --- a/lib/effect-sdk/src/server/flushable.ts +++ b/lib/effect-sdk/src/server/flushable.ts @@ -60,6 +60,12 @@ export interface MapleFlushableConfig { readonly excludeLogSpans?: boolean | undefined /** Span name prefixes to drop before OTLP export. */ readonly dropSpanNames?: ReadonlyArray | undefined + /** + * `_tag`s of *anticipated* failures (expected 4xx business errors). Spans + * failing entirely with these export as status `Ok` (no `exception` event), + * so they stay visible but never count as errors. + */ + readonly anticipatedErrorTags?: ReadonlyArray | undefined /** OTLP traces path appended to `endpoint`. Default `/v1/traces`. */ readonly tracesPath?: string | undefined /** OTLP logs path appended to `endpoint`. Default `/v1/logs`. */ @@ -92,7 +98,11 @@ export const make = (config: MapleFlushableConfig = {}): FlushableTelemetry => { dropPrefixes !== undefined && dropPrefixes.length > 0 ? (name: string) => dropPrefixes.some((prefix) => name.startsWith(prefix)) : undefined - const spans: SpanBuffer = makeSpanBuffer({ dropSpan }) + const anticipatedErrorTags = + config.anticipatedErrorTags !== undefined && config.anticipatedErrorTags.length > 0 + ? new Set(config.anticipatedErrorTags) + : undefined + const spans: SpanBuffer = makeSpanBuffer({ dropSpan, anticipatedErrorTags }) const logs: LogBuffer = makeLogBuffer({ excludeLogSpans: config.excludeLogSpans }) const layer = Layer.mergeAll(spans.tracerLayer, logs.loggerLayer) diff --git a/lib/effect-sdk/src/shared/flushable-tracer.test.ts b/lib/effect-sdk/src/shared/flushable-tracer.test.ts index 1ac71acc..be2e4aa4 100644 --- a/lib/effect-sdk/src/shared/flushable-tracer.test.ts +++ b/lib/effect-sdk/src/shared/flushable-tracer.test.ts @@ -12,6 +12,9 @@ class BenignError extends Data.TaggedError("BenignError")<{}> { // A real reportable failure (no ignore flag) — e.g. a 400/500. class ReportableError extends Data.TaggedError("ReportableError")<{}> {} +// An anticipated 4xx business error (e.g. unauthorized / not-found). +class UnauthorizedError extends Data.TaggedError("UnauthorizedError")<{}> {} + const runSpan = (buffer: ReturnType, effect: Effect.Effect) => effect.pipe(Effect.withSpan("http.server GET"), Effect.provide(buffer.tracerLayer), Effect.exit) @@ -54,3 +57,78 @@ describe("makeSpanBuffer ignored-failure drop", () => { }), ) }) + +describe("makeSpanBuffer anticipated-error classification", () => { + const tags = new Set(["UnauthorizedError"]) + + it.effect("keeps an anticipated failure as an Ok span with no exception event", () => + Effect.gen(function* () { + const buffer = makeSpanBuffer({ anticipatedErrorTags: tags }) + yield* runSpan(buffer, Effect.fail(new UnauthorizedError())) + const [span] = buffer.drain() + assert.isDefined(span) + assert.strictEqual(span!.status.code, 1 /* Ok */) + assert.strictEqual( + span!.events.some((event) => event.name === "exception"), + false, + ) + }), + ) + + it.effect("still marks an unclassified failure as an Error span with an exception event", () => + Effect.gen(function* () { + const buffer = makeSpanBuffer({ anticipatedErrorTags: tags }) + yield* runSpan(buffer, Effect.fail(new ReportableError())) + const [span] = buffer.drain() + assert.isDefined(span) + assert.strictEqual(span!.status.code, 2 /* Error */) + assert.strictEqual( + span!.events.some((event) => event.name === "exception"), + true, + ) + }), + ) + + it.effect("marks Error when an anticipated error is mixed with a defect", () => + Effect.gen(function* () { + const buffer = makeSpanBuffer({ anticipatedErrorTags: tags }) + yield* runSpan( + buffer, + Effect.fail(new UnauthorizedError()).pipe(Effect.ensuring(Effect.die("boom"))), + ) + const [span] = buffer.drain() + assert.isDefined(span) + assert.strictEqual(span!.status.code, 2 /* Error */) + }), + ) + + it.effect("marks Error for an anticipated tag when no tags are configured", () => + Effect.gen(function* () { + const buffer = makeSpanBuffer() + yield* runSpan(buffer, Effect.fail(new UnauthorizedError())) + const [span] = buffer.drain() + assert.isDefined(span) + assert.strictEqual(span!.status.code, 2 /* Error */) + }), + ) + + // An interrupt co-occurring with an anticipated failure is NOT an error: + // interrupts are normal fiber control flow, so the span stays Ok (unlike a + // defect, which forces Error). Pins the Die-vs-Interrupt asymmetry. + it.effect("keeps Ok when an anticipated error is mixed with an interrupt", () => + Effect.gen(function* () { + const buffer = makeSpanBuffer({ anticipatedErrorTags: tags }) + yield* runSpan( + buffer, + Effect.fail(new UnauthorizedError()).pipe(Effect.ensuring(Effect.interrupt)), + ) + const [span] = buffer.drain() + assert.isDefined(span) + assert.strictEqual(span!.status.code, 1 /* Ok */) + assert.strictEqual( + span!.events.some((event) => event.name === "exception"), + false, + ) + }), + ) +}) diff --git a/lib/effect-sdk/src/shared/flushable-tracer.ts b/lib/effect-sdk/src/shared/flushable-tracer.ts index 3c632907..21ebb751 100644 --- a/lib/effect-sdk/src/shared/flushable-tracer.ts +++ b/lib/effect-sdk/src/shared/flushable-tracer.ts @@ -27,6 +27,19 @@ export interface SpanBufferOptions { * Use to suppress known-noisy span names (e.g. MCP protocol notifications). */ readonly dropSpan?: ((name: string) => boolean) | undefined + /** + * `_tag`s of failures that represent *anticipated* outcomes (expected 4xx + * business errors: validation, not-found, unauthorized, …). When a span's + * failure is caused *entirely* by errors with one of these tags, the span is + * still exported (so latency / `http.response.status_code` stay visible) but + * with OTLP status `Ok` and **no** `exception` event — so it never lands in + * error tracking (`error_events_mv` keys off `StatusCode='Error'`). Mirrors + * the ingest gateway's `otel_status_for_rejection` (4xx → Ok) rule. + * + * Distinct from Effect's `ErrorReporter.ignore` flag, which *drops* the span + * entirely (used for benign routing 404s). + */ + readonly anticipatedErrorTags?: ReadonlySet | undefined } // Errors carrying Effect's `[ErrorReporter.ignore]` flag are benign by design — @@ -51,6 +64,7 @@ export const makeSpanBuffer = (options: SpanBufferOptions = {}): SpanBuffer => { let buffer: Array = [] let disabled = false const dropSpan = options.dropSpan + const anticipatedErrorTags = options.anticipatedErrorTags const exportFn = (span: SpanImpl) => { if (disabled) return @@ -58,7 +72,7 @@ export const makeSpanBuffer = (options: SpanBufferOptions = {}): SpanBuffer => { if (dropSpan !== undefined && dropSpan(span.name)) return if (isIgnoredSpan(span)) return if (buffer.length >= MAX_BUFFER) return - buffer.push(makeOtlpSpan(span)) + buffer.push(makeOtlpSpan(span, anticipatedErrorTags)) } const tracer = Tracer.make({ @@ -146,7 +160,23 @@ const generateId = (len: number): string => { return result } -const makeOtlpSpan = (self: SpanImpl): OtlpSpan => { +// A failure is "anticipated" when its `_tag` is in the configured set. A span +// whose failure is caused *entirely* by anticipated errors (no defects/Die) +// records OTLP status `Ok` and emits no `exception` event. +const isAnticipatedFailure = (error: unknown, tags: ReadonlySet): boolean => + Predicate.hasProperty(error, "_tag") && typeof error._tag === "string" && tags.has(error._tag) + +const isFullyAnticipated = ( + cause: Cause.Cause, + tags: ReadonlySet | undefined, +): boolean => { + if (tags === undefined || tags.size === 0) return false + if (cause.reasons.some(Cause.isDieReason)) return false + const failErrors = cause.reasons.filter(Cause.isFailReason).map((reason) => reason.error) + return failErrors.length > 0 && failErrors.every((error) => isAnticipatedFailure(error, tags)) +} + +const makeOtlpSpan = (self: SpanImpl, anticipatedErrorTags?: ReadonlySet): OtlpSpan => { const status = self.status as ExtractTag const attributes = OtlpResource.entriesToAttributes(self.attributes.entries()) const events = self.events.map(([name, startTime, attrs]) => ({ @@ -165,6 +195,10 @@ const makeOtlpSpan = (self: SpanImpl): OtlpSpan => { { key: "span.label", value: { stringValue: "⚠︎ Interrupted" } }, { key: "status.interrupted", value: { boolValue: true } }, ) + } else if (isFullyAnticipated(status.exit.cause, anticipatedErrorTags)) { + // Expected business outcome (4xx). Keep the span (latency / status code + // stay visible) but don't flag it as an error or fingerprint it. + otelStatus = constOtelStatusSuccess } else { const errors = Cause.prettyErrors(status.exit.cause) otelStatus = { code: StatusCode.Error } diff --git a/packages/domain/package.json b/packages/domain/package.json index eca43688..a08c8318 100644 --- a/packages/domain/package.json +++ b/packages/domain/package.json @@ -4,6 +4,7 @@ "type": "module", "exports": { ".": "./src/index.ts", + "./anticipated-errors": "./src/anticipated-errors.ts", "./billing": "./src/billing.ts", "./http": "./src/http/index.ts", "./primitives": "./src/primitives.ts", diff --git a/packages/domain/src/anticipated-errors.test.ts b/packages/domain/src/anticipated-errors.test.ts new file mode 100644 index 00000000..7b10c7cb --- /dev/null +++ b/packages/domain/src/anticipated-errors.test.ts @@ -0,0 +1,28 @@ +import { describe, expect, it } from "vitest" +import { ANTICIPATED_ERROR_TAGS, isAnticipatedErrorTag } from "./anticipated-errors" + +describe("ANTICIPATED_ERROR_TAGS", () => { + it("includes the observed 4xx business-error tags", () => { + for (const tag of [ + "@maple/http/errors/UnauthorizedError", + "@maple/http/errors/RawSqlValidationError", + "@maple/http/ai-triage/AiTriageNotFoundError", + "@maple/http/errors/IntegrationsNotConnectedError", + ]) { + expect(isAnticipatedErrorTag(tag), tag).toBe(true) + } + }) + + it("excludes 5xx persistence / upstream failures", () => { + for (const tag of [ + "@maple/http/errors/WarehouseQueryError", + "@maple/http/errors/QueryEngineTimeoutError", + ]) { + expect(isAnticipatedErrorTag(tag), tag).toBe(false) + } + }) + + it("derives a non-trivial set (reflection still works)", () => { + expect(ANTICIPATED_ERROR_TAGS.size).toBeGreaterThan(20) + }) +}) diff --git a/packages/domain/src/anticipated-errors.ts b/packages/domain/src/anticipated-errors.ts new file mode 100644 index 00000000..5995533c --- /dev/null +++ b/packages/domain/src/anticipated-errors.ts @@ -0,0 +1,60 @@ +// --------------------------------------------------------------------------- +// Anticipated error tags +// +// The set of domain HTTP error `_tag`s that represent *expected* client-facing +// outcomes (4xx): validation, not-found, unauthorized, forbidden, conflict, … +// +// These are not bugs — they're normal business results. The telemetry SDK uses +// this set to record spans that fail *entirely* with one of these errors as +// OTLP status `Ok` (no `exception` event), so they stay visible as traces but +// never count toward error tracking (`error_events_mv` keys off +// `StatusCode='Error'`). Mirrors the ingest gateway's `otel_status_for_rejection` +// rule (4xx → Ok, 5xx → Error). +// +// Derived (not hand-maintained) from the error classes themselves: every +// `Schema.TaggedErrorClass` carries its `_tag` literal and an `httpApiStatus` +// annotation on its AST, so a new 4xx error is picked up automatically. A 5xx +// error (persistence/upstream failures) is intentionally excluded and keeps +// tracing. +// --------------------------------------------------------------------------- +import * as Http from "./http/index" + +/** Read `obj[key]` when `obj` is an object/function that has it; `undefined` otherwise. */ +const prop = (obj: unknown, key: string): unknown => + (typeof obj === "object" || typeof obj === "function") && obj !== null && key in obj + ? (obj as Record)[key] + : undefined + +/** The `_tag` literal of a `Schema.TaggedErrorClass` (`ast.fields._tag.schema.literal`). */ +const readTag = (value: unknown): string | undefined => { + const literal = prop(prop(prop(prop(value, "fields"), "_tag"), "schema"), "literal") + return typeof literal === "string" ? literal : undefined +} + +/** The `httpApiStatus` annotation on a schema's AST, when present. */ +const readHttpStatus = (value: unknown): number | undefined => { + const status = prop(prop(prop(value, "ast"), "annotations"), "httpApiStatus") + return typeof status === "number" ? status : undefined +} + +const deriveAnticipatedTags = (): ReadonlySet => { + const tags = new Set() + for (const value of Object.values(Http)) { + if (typeof value !== "function") continue + const tag = readTag(value) + if (tag === undefined) continue + const status = readHttpStatus(value) + if (status === undefined) continue + if (status >= 400 && status < 500) tags.add(tag) + } + return tags +} + +/** + * `_tag`s of all domain HTTP errors annotated with a 4xx `httpApiStatus`. + * Pass `[...ANTICIPATED_ERROR_TAGS]` to the telemetry SDK's `anticipatedErrorTags`. + */ +export const ANTICIPATED_ERROR_TAGS: ReadonlySet = deriveAnticipatedTags() + +/** True when `tag` is a known anticipated (4xx) domain error tag. */ +export const isAnticipatedErrorTag = (tag: string): boolean => ANTICIPATED_ERROR_TAGS.has(tag)