Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pullfrog.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
with:
fetch-depth: 1
- name: Run agent
uses: pullfrog/pullfrog@27557aef1ccff9b9ea531324824daa1f86c066c4 # v0.1.29
uses: pullfrog/pullfrog@3beaa0fb16f67a689055eac8df88e71024e423b2 # v0.1.31
with:
prompt: ${{ inputs.prompt }}
env:
Expand Down
2 changes: 2 additions & 0 deletions apps/alerting/src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
ANTICIPATED_ERROR_TAGS,
AlertsService,
AnomalyDetectionService,
BucketCacheService,
Expand Down Expand Up @@ -30,6 +31,7 @@ const telemetry = MapleCloudflareSDK.make({
serviceName: "alerting",
serviceNamespace: "backend",
repositoryUrl: "https://github.com/Makisuo/maple",
anticipatedErrorTags: [...ANTICIPATED_ERROR_TAGS],
})

const buildLayer = (_env: Record<string, unknown>) => {
Expand Down
1 change: 1 addition & 0 deletions apps/api/src/alerting.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export { ANTICIPATED_ERROR_TAGS } from "@maple/domain/anticipated-errors"
export { AlertRuntime, AlertsService } from "./services/AlertsService"
export { AnomalyDetectionService } from "./services/AnomalyDetectionService"
export { BucketCacheService } from "@maple/query-engine/caching"
Expand Down
2 changes: 2 additions & 0 deletions apps/api/src/vcs-sync-runtime.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { MessageBatch } from "@cloudflare/workers-types"
import * as MapleCloudflareSDK from "@maple-dev/effect-sdk/cloudflare"
import { ANTICIPATED_ERROR_TAGS } from "@maple/domain/anticipated-errors"
import { WorkerConfigProviderLayer, WorkerEnvironment } from "@maple/effect-cloudflare"
import { Cause, Effect, Layer, Option } from "effect"
import { DatabasePgLive } from "./lib/DatabasePgLive"
Expand All @@ -23,6 +24,7 @@ const telemetry = MapleCloudflareSDK.make({
serviceName: "maple-api",
serviceNamespace: "backend",
repositoryUrl: "https://github.com/Makisuo/maple",
anticipatedErrorTags: [...ANTICIPATED_ERROR_TAGS],
})

export const buildVcsSyncLayer = (_env: Record<string, unknown>) => {
Expand Down
4 changes: 4 additions & 0 deletions apps/api/src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { MessageBatch, ScheduledController } from "@cloudflare/workers-types"
import * as MapleCloudflareSDK from "@maple-dev/effect-sdk/cloudflare"
import { ANTICIPATED_ERROR_TAGS } from "@maple/domain/anticipated-errors"
import { runScheduledEffect, WorkerConfigProviderLayer, WorkerEnvironment } from "@maple/effect-cloudflare"
import { Context, FileSystem, Layer, Path } from "effect"
import { HttpMiddleware, HttpRouter } from "effect/unstable/http"
Expand Down Expand Up @@ -44,6 +45,9 @@ const telemetry = MapleCloudflareSDK.make({
serviceNamespace: "backend",
repositoryUrl: "https://github.com/Makisuo/maple",
dropSpanNames: ["McpServer/Notifications."],
// Expected 4xx outcomes (validation, not-found, unauthorized, …) record as
// Ok spans instead of errors — see @maple/domain/anticipated-errors.
anticipatedErrorTags: [...ANTICIPATED_ERROR_TAGS],
})

// `HttpMiddleware.tracer` ends the root server span on a deferred macrotask
Expand Down
2 changes: 2 additions & 0 deletions apps/api/src/workflows/AiTriageWorkflow.run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { createFlueClient } from "@flue/sdk"
import * as MapleCloudflareSDK from "@maple-dev/effect-sdk/cloudflare"
import { aiTriageRuns, anomalyIncidents, errorIssueEvents } from "@maple/db"
import { createMaplePgClient, type MaplePgClient } from "@maple/db/client"
import { ANTICIPATED_ERROR_TAGS } from "@maple/domain/anticipated-errors"
import { AiTriageResult } from "@maple/domain/http"
import {
AiTriageRunId,
Expand Down Expand Up @@ -176,6 +177,7 @@ const triageTelemetry = MapleCloudflareSDK.make({
serviceName: "maple-api",
serviceNamespace: "backend",
repositoryUrl: "https://github.com/Makisuo/maple",
anticipatedErrorTags: [...ANTICIPATED_ERROR_TAGS],
})

const GATE_STEP = { retries: { limit: 3, delay: "2 seconds", backoff: "exponential" } }
Expand Down
19 changes: 15 additions & 4 deletions apps/scraper/src/ApiClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Context, Effect, Layer, Redacted, Schema } from "effect"
import { Context, Duration, Effect, Layer, Redacted, Schema } from "effect"
import { HttpClient, HttpClientRequest } from "effect/unstable/http"
import {
InternalScrapeTargetList,
Expand Down Expand Up @@ -51,14 +51,21 @@ export class ApiClient extends Context.Service<ApiClient, ApiClientShape>()("@ma
authorization: `Bearer ${Redacted.value(env.SD_INTERNAL_TOKEN)}`,
}

// Bound every API round-trip. `FetchHttpClient` sets no timeout, so a
// stalled Worker (e.g. an oversized scrape-results POST) would otherwise
// hang for minutes; cap it so a flush fails fast and re-buffers instead.
const REQUEST_TIMEOUT = Duration.seconds(30)

// Wrap a transport-level failure (anything exposing a `message`) as an
// `ApiRequestError`; `status` is null because no HTTP status is available here.
const transportError = (error: { readonly message: string }) => {
	const message = `Maple API unreachable: ${error.message}`
	return new ApiRequestError({ message, status: null })
}

const listTargets = Effect.fn("ApiClient.listTargets")(function* () {
const request = HttpClientRequest.get(`${env.MAPLE_API_URL}/api/internal/scrape-targets`, {
headers: authHeaders,
})
const response = yield* client.execute(request).pipe(Effect.mapError(transportError))
const response = yield* client
.execute(request)
.pipe(Effect.timeout(REQUEST_TIMEOUT), Effect.mapError(transportError))
const text = yield* response.text.pipe(Effect.mapError(transportError))
if (response.status < 200 || response.status >= 300) {
return yield* Effect.fail(
Expand Down Expand Up @@ -96,7 +103,9 @@ export class ApiClient extends Context.Service<ApiClient, ApiClientShape>()("@ma
`${env.MAPLE_API_URL}/api/internal/prometheus-scrape?targetId=${encodeURIComponent(targetId)}${sub}`,
{ headers: authHeaders },
)
const response = yield* client.execute(request).pipe(Effect.mapError(transportError))
const response = yield* client
.execute(request)
.pipe(Effect.timeout(REQUEST_TIMEOUT), Effect.mapError(transportError))
const body = yield* response.text.pipe(Effect.mapError(transportError))
const retryAfterRaw = response.headers["retry-after"]
const retryAfterSeconds =
Expand All @@ -113,7 +122,9 @@ export class ApiClient extends Context.Service<ApiClient, ApiClientShape>()("@ma
const request = HttpClientRequest.post(`${env.MAPLE_API_URL}/api/internal/scrape-results`, {
headers: authHeaders,
}).pipe(HttpClientRequest.bodyText(JSON.stringify(results), "application/json"))
const response = yield* client.execute(request).pipe(Effect.mapError(transportError))
const response = yield* client
.execute(request)
.pipe(Effect.timeout(REQUEST_TIMEOUT), Effect.mapError(transportError))
if (response.status < 200 || response.status >= 300) {
return yield* Effect.fail(
new ApiRequestError({
Expand Down
58 changes: 57 additions & 1 deletion apps/scraper/src/ScrapeScheduler.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { assert, describe, it } from "@effect/vitest"
import { Duration, Effect, Layer, Redacted, Schema } from "effect"
import { TestClock } from "effect/testing"
import { InternalScrapeTarget, type ScrapeResultReport } from "@maple/domain/http"
import { InternalScrapeTarget, ScrapeResultReport, ScrapeTargetId } from "@maple/domain/http"
import { ApiClient, ApiRequestError, type ApiClientShape, type ScrapeProxyResponse } from "./ApiClient"
import { OtlpIngest, OtlpIngestError, type OtlpIngestShape } from "./OtlpIngest"
import {
initialJitterMs,
nextScrapeDelayMs,
ScrapeScheduler,
sendResultsInChunks,
type ScrapeOutcome,
} from "./ScrapeScheduler"
import { ScraperEnv, type ScraperEnvShape } from "./Env"
Expand Down Expand Up @@ -531,6 +532,61 @@ describe("ScrapeScheduler", () => {
)
})

describe("sendResultsInChunks", () => {
	const toTargetId = Schema.decodeUnknownSync(ScrapeTargetId)

	// Build `count` error-free reports for TARGET_A; `scrapedAt` is the report's index.
	const mkReports = (count: number): ReadonlyArray<ScrapeResultReport> => {
		const reports: Array<ScrapeResultReport> = []
		for (let i = 0; i < count; i++) {
			reports.push(new ScrapeResultReport({ targetId: toTargetId(TARGET_A), scrapedAt: i, error: null }))
		}
		return reports
	}

	it.effect("splits a batch into chunks no larger than chunkSize", () =>
		Effect.gen(function* () {
			// Length of every chunk handed to the sender, in call order.
			const chunkLengths: Array<number> = []
			const recordChunk = (chunk: ReadonlyArray<ScrapeResultReport>) =>
				Effect.sync(() => {
					chunkLengths.push(chunk.length)
				})

			const { unsent, error } = yield* sendResultsInChunks(mkReports(2500), 1000, recordChunk)

			assert.deepStrictEqual(chunkLengths, [1000, 1000, 500])
			assert.lengthOf(unsent, 0)
			assert.isNull(error)
		}),
	)

	it.effect("stops at the first failed chunk and returns the unsent remainder", () =>
		Effect.gen(function* () {
			const delivered: Array<number> = []
			// Accepts only the very first chunk; every later call fails with a 503,
			// which leaves 1500 of the 2500 reports unsent.
			const deliverOnlyFirst = (chunk: ReadonlyArray<ScrapeResultReport>) =>
				Effect.suspend(() => {
					if (delivered.length === 0) {
						delivered.push(chunk.length)
						return Effect.void
					}
					return Effect.fail(new ApiRequestError({ message: "HTTP 503", status: 503 }))
				})

			const { unsent, error } = yield* sendResultsInChunks(mkReports(2500), 1000, deliverOnlyFirst)

			assert.deepStrictEqual(delivered, [1000])
			assert.lengthOf(unsent, 1500)
			assert.strictEqual(error?.message, "HTTP 503")
		}),
	)

	it.effect("no-ops on an empty batch", () =>
		Effect.gen(function* () {
			let sendCalls = 0
			const countCall = () =>
				Effect.sync(() => {
					sendCalls += 1
				})

			const { unsent } = yield* sendResultsInChunks([], 1000, countCall)

			assert.strictEqual(sendCalls, 0)
			assert.lengthOf(unsent, 0)
		}),
	)
})

describe("nextScrapeDelayMs", () => {
const ok: ScrapeOutcome = { error: null, rateLimited: false, retryAfterMs: null }
const limited = (retryAfterMs: number | null = null): ScrapeOutcome => ({
Expand Down
62 changes: 46 additions & 16 deletions apps/scraper/src/ScrapeScheduler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Cause, Clock, Context, Duration, Effect, Fiber, Layer, Ref, Schedule, Semaphore } from "effect"
import { Cause, Clock, Context, Duration, Effect, Fiber, Layer, Ref, Result, Schedule, Semaphore } from "effect"
import { ScrapeResultReport, type InternalScrapeTarget } from "@maple/domain/http"
import { ApiClient, ApiRequestError } from "./ApiClient"
import { convertFamiliesToOtlp } from "./prometheus/otlp"
Expand All @@ -25,6 +25,13 @@ export interface ScrapeSchedulerShape {
const RESULTS_FLUSH_INTERVAL = Duration.seconds(10)
/** Cap the result buffer so an unreachable API cannot grow memory unboundedly. */
const MAX_BUFFERED_RESULTS = 10_000
/**
* Max results per `scrape-results` POST. The buffer can hold up to
* `MAX_BUFFERED_RESULTS`; sending that as one body overwhelmed the API Worker
* (CPU/time → edge 503), so a flush sends in chunks and re-buffers the unsent
* remainder on the first failure.
*/
const RESULTS_FLUSH_CHUNK_SIZE = 1_000
/** Upper bound on rate-limit backoff so a target keeps probing for recovery. */
const MAX_BACKOFF_MS = Duration.toMillis(Duration.minutes(5))

Expand Down Expand Up @@ -112,6 +119,27 @@ interface TargetEntry {
readonly fiber: Fiber.Fiber<unknown, unknown>
}

/**
 * Send `results` to `send` in chunks of at most `chunkSize`, stopping at the
 * first failed chunk. Chunking keeps any single POST small enough that the API
 * Worker doesn't choke on it.
 *
 * @param results - Reports to deliver, in order.
 * @param chunkSize - Maximum number of reports per `send` call. Fractional
 *   values are floored; a value that is not at least 1 (zero, negative, `NaN`)
 *   falls back to 1 so the loop always makes progress.
 * @param send - Delivers one chunk. Its first failure stops the remaining
 *   chunks from being attempted.
 * @returns `unsent`: the results that were NOT delivered (the failed chunk plus
 *   everything after it), empty when the whole batch went through, so the
 *   caller can re-buffer just those. `error`: the failure of the chunk that
 *   stopped the run, or `null` when every chunk succeeded.
 */
export const sendResultsInChunks = <E>(
	results: ReadonlyArray<ScrapeResultReport>,
	chunkSize: number,
	send: (chunk: ReadonlyArray<ScrapeResultReport>) => Effect.Effect<void, E>,
): Effect.Effect<{ readonly unsent: ReadonlyArray<ScrapeResultReport>; readonly error: E | null }> =>
	Effect.gen(function* () {
		// Guard the loop step: 0 or a negative size would never advance `index`
		// (infinite loop), and `NaN` would send one empty chunk, end the loop and
		// report the whole batch as delivered. `Infinity` still means "one chunk".
		const step = chunkSize >= 1 ? Math.floor(chunkSize) : 1
		for (let index = 0; index < results.length; index += step) {
			const chunk = results.slice(index, index + step)
			// Capture the failure as a value so the remainder can be handed back
			// instead of failing the whole effect.
			const outcome = yield* Effect.result(send(chunk))
			if (Result.isFailure(outcome)) return { unsent: results.slice(index), error: outcome.failure }
		}
		return { unsent: [], error: null }
	})

export class ScrapeScheduler extends Context.Service<ScrapeScheduler, ScrapeSchedulerShape>()(
"@maple/scraper/ScrapeScheduler",
{
Expand Down Expand Up @@ -357,22 +385,24 @@ export class ScrapeScheduler extends Context.Service<ScrapeScheduler, ScrapeSche
const flushResults = Effect.gen(function* () {
const results = yield* Ref.getAndSet(resultsRef, [])
if (results.length === 0) return
yield* api.reportResults(results).pipe(
Effect.catch((error) =>
Effect.gen(function* () {
// Put the batch back (in front) and retry on the next flush.
yield* Ref.update(resultsRef, (buffered) =>
[...results, ...buffered].slice(-MAX_BUFFERED_RESULTS),
)
yield* Effect.logWarning("Failed to report scrape results").pipe(
Effect.annotateLogs({
error: error.message,
bufferedResults: results.length,
}),
)
}),
),
// Send in chunks so one POST never overwhelms the API Worker; re-buffer
// only what didn't make it (in front) and retry on the next flush.
const { unsent, error } = yield* sendResultsInChunks(
results,
RESULTS_FLUSH_CHUNK_SIZE,
api.reportResults,
)
if (unsent.length > 0) {
yield* Ref.update(resultsRef, (buffered) =>
[...unsent, ...buffered].slice(-MAX_BUFFERED_RESULTS),
)
yield* Effect.logWarning("Failed to report scrape results").pipe(
Effect.annotateLogs({
error: error?.message ?? "unknown",
bufferedResults: unsent.length,
}),
)
}
}).pipe(Effect.withSpan("scraper.flush_results"))

const run = Effect.gen(function* () {
Expand Down
4 changes: 4 additions & 0 deletions apps/web/src/lib/services/common/otel-layer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { MapleFlush } from "@maple-dev/effect-sdk/client"
import { ANTICIPATED_ERROR_TAGS } from "@maple/domain/anticipated-errors"
import { ingestUrl } from "./ingest-url"

// Buffer-backed client telemetry with flush-on-unload. `Maple.layer`
Expand All @@ -24,6 +25,9 @@ const telemetry = MapleFlush.make({
? { "vcs.ref.head.revision": import.meta.env.VITE_COMMIT_SHA }
: {}),
},
// Expected 4xx API responses (the maple-web → maple-api edge surfaces these
// as client-span failures) record as Ok instead of errors.
anticipatedErrorTags: [...ANTICIPATED_ERROR_TAGS],
})

export const mapleOtelLayer = telemetry.layer
12 changes: 11 additions & 1 deletion lib/effect-sdk/src/client/flushable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ export interface MapleClientFlushableConfig {
readonly excludeLogSpans?: boolean | undefined
/** Span name prefixes to drop before OTLP export. */
readonly dropSpanNames?: ReadonlyArray<string> | undefined
/**
* `_tag`s of *anticipated* failures (expected 4xx business errors). Spans
* failing entirely with these export as status `Ok` (no `exception` event),
* so they stay visible but never count as errors.
*/
readonly anticipatedErrorTags?: ReadonlyArray<string> | undefined
/** OTLP traces path appended to `endpoint`. Default `/v1/traces`. */
readonly tracesPath?: string | undefined
/** OTLP logs path appended to `endpoint`. Default `/v1/logs`. */
Expand Down Expand Up @@ -134,7 +140,11 @@ export const make = (config: MapleClientFlushableConfig): FlushableTelemetry =>
dropPrefixes !== undefined && dropPrefixes.length > 0
? (name: string) => dropPrefixes.some((prefix) => name.startsWith(prefix))
: undefined
const spans: SpanBuffer = makeSpanBuffer({ dropSpan })
const anticipatedErrorTags =
config.anticipatedErrorTags !== undefined && config.anticipatedErrorTags.length > 0
? new Set(config.anticipatedErrorTags)
: undefined
const spans: SpanBuffer = makeSpanBuffer({ dropSpan, anticipatedErrorTags })
const logs: LogBuffer = makeLogBuffer({ excludeLogSpans: config.excludeLogSpans })
// `withSessionLink` overrides only the Tracer reference, keeping the logger.
const layer = withSessionLink(Layer.mergeAll(spans.tracerLayer, logs.loggerLayer))
Expand Down
13 changes: 12 additions & 1 deletion lib/effect-sdk/src/cloudflare/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ export interface Config {
* (e.g. `"McpServer/Notifications."` for MCP notification spam).
*/
readonly dropSpanNames?: ReadonlyArray<string> | undefined
/**
* `_tag`s of *anticipated* failures (expected 4xx business errors). A span
* whose failure is caused entirely by these is exported with status `Ok` and
* no `exception` event, so it stays visible as a trace but never counts as an
* error. See `SpanBufferOptions.anticipatedErrorTags`.
*/
readonly anticipatedErrorTags?: ReadonlyArray<string> | undefined
/** OTLP traces path appended to `endpoint`. Default `/v1/traces`. */
readonly tracesPath?: string | undefined
/** OTLP logs path appended to `endpoint`. Default `/v1/logs`. */
Expand Down Expand Up @@ -126,7 +133,11 @@ export const make = (config: Config = {}): Telemetry => {
dropPrefixes !== undefined && dropPrefixes.length > 0
? (name: string) => dropPrefixes.some((prefix) => name.startsWith(prefix))
: undefined
const spans: SpanBuffer = makeSpanBuffer({ dropSpan })
const anticipatedErrorTags =
config.anticipatedErrorTags !== undefined && config.anticipatedErrorTags.length > 0
? new Set(config.anticipatedErrorTags)
: undefined
const spans: SpanBuffer = makeSpanBuffer({ dropSpan, anticipatedErrorTags })
const logs: LogBuffer = makeLogBuffer({ excludeLogSpans: config.excludeLogSpans })

let resolved: Resolved | undefined = undefined
Expand Down
12 changes: 11 additions & 1 deletion lib/effect-sdk/src/server/flushable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ export interface MapleFlushableConfig {
readonly excludeLogSpans?: boolean | undefined
/** Span name prefixes to drop before OTLP export. */
readonly dropSpanNames?: ReadonlyArray<string> | undefined
/**
* `_tag`s of *anticipated* failures (expected 4xx business errors). Spans
* failing entirely with these export as status `Ok` (no `exception` event),
* so they stay visible but never count as errors.
*/
readonly anticipatedErrorTags?: ReadonlyArray<string> | undefined
/** OTLP traces path appended to `endpoint`. Default `/v1/traces`. */
readonly tracesPath?: string | undefined
/** OTLP logs path appended to `endpoint`. Default `/v1/logs`. */
Expand Down Expand Up @@ -92,7 +98,11 @@ export const make = (config: MapleFlushableConfig = {}): FlushableTelemetry => {
dropPrefixes !== undefined && dropPrefixes.length > 0
? (name: string) => dropPrefixes.some((prefix) => name.startsWith(prefix))
: undefined
const spans: SpanBuffer = makeSpanBuffer({ dropSpan })
const anticipatedErrorTags =
config.anticipatedErrorTags !== undefined && config.anticipatedErrorTags.length > 0
? new Set(config.anticipatedErrorTags)
: undefined
const spans: SpanBuffer = makeSpanBuffer({ dropSpan, anticipatedErrorTags })
const logs: LogBuffer = makeLogBuffer({ excludeLogSpans: config.excludeLogSpans })
const layer = Layer.mergeAll(spans.tracerLayer, logs.loggerLayer)

Expand Down
Loading
Loading