Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions apps/api/alchemy.run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,10 @@ export const createMapleApi = async ({ stage, domains }: CreateMapleApiOptions)
url: true,
adopt: true,
routes: domains.api ? [{ pattern: `${domains.api}/*`, adopt: true }] : undefined,
// Periodic VCS sync backstop (every 12h) — enqueues a refresh per installation; see worker.ts `scheduled`.
crons: ["0 */12 * * *"],
// Cron schedules — dispatched by `event.cron` in worker.ts `scheduled`:
// - "0 *\/12 * * *": VCS sync backstop (enqueues a refresh per installation).
// - "0 6 * * *": daily billing-suspension reconcile (overdue ≥3d + never-paid → stop ingest).
crons: ["0 */12 * * *", "0 6 * * *"],
eventSources: [
{
queue: vcsSyncQueue,
Expand Down Expand Up @@ -195,6 +197,7 @@ export const createMapleApi = async ({ stage, domains }: CreateMapleApiOptions)
...optionalPlain("CLERK_PUBLISHABLE_KEY"),
...optionalSecret("CLERK_JWT_KEY"),
...optionalSecret("AUTUMN_SECRET_KEY"),
...optionalSecret("AUTUMN_WEBHOOK_SECRET"),
...optionalSecret("SD_INTERNAL_TOKEN"),
...optionalSecret("INTERNAL_SERVICE_TOKEN"),
...optionalPlain("HAZEL_API_BASE_URL"),
Expand Down
4 changes: 4 additions & 0 deletions apps/api/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { HttpMiddleware, HttpRouter, HttpServerResponse } from "effect/unstable/
import { HttpApiBuilder, HttpApiScalar } from "effect/unstable/httpapi"
import { McpLive } from "./mcp/app"
import { HttpBillingLive, HttpBillingPublicLive } from "./routes/billing.http"
import { BillingWebhookRouter } from "./routes/billing-webhook.http"
import { HttpAiTriageLive } from "./routes/ai-triage.http"
import { HttpAlertsLive } from "./routes/alerts.http"
import { HttpAnomaliesLive } from "./routes/anomalies.http"
Expand Down Expand Up @@ -36,6 +37,7 @@ import { HttpWarehouseLive } from "./routes/warehouse.http"
import { AiTriageService } from "./services/AiTriageService"
import { AlertRuntime, AlertsService } from "./services/AlertsService"
import { AnomalyDetectionService } from "./services/AnomalyDetectionService"
import { BillingSuspensionService } from "./services/BillingSuspensionService"
import { BucketCacheService, EdgeCacheService } from "@maple/query-engine/caching"
import { CacheBackendLive } from "./lib/CacheBackendLive"
import { ErrorsService } from "./services/ErrorsService"
Expand Down Expand Up @@ -100,6 +102,7 @@ const CoreServicesLive = Layer.mergeAll(
OrgIngestKeysService.layer,
OrgClickHouseSettingsService.layer,
OrganizationService.layer,
BillingSuspensionService.layer,
// Shared with ScrapeTargetsService via layer memoization so the proxy and
// the internal target list resolve sub-targets from one discovery cache.
PlanetScaleDiscoveryService.layer,
Expand Down Expand Up @@ -237,6 +240,7 @@ export const AllRoutes = Layer.mergeAll(
PrometheusScrapeProxyRouter,
ScraperInternalRouter,
VcsWebhookRouter,
BillingWebhookRouter,
McpLive,
HealthRouter,
McpGetFallback,
Expand Down
62 changes: 62 additions & 0 deletions apps/api/src/billing-suspension-runtime.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import * as MapleCloudflareSDK from "@maple-dev/effect-sdk/cloudflare"
import { WorkerConfigProviderLayer, WorkerEnvironment } from "@maple/effect-cloudflare"
import { Cause, Effect, Layer } from "effect"
import { DatabasePgLive } from "./lib/DatabasePgLive"
import { Env } from "./lib/Env"
import { BillingSuspensionService } from "./services/BillingSuspensionService"

// ---------------------------------------------------------------------------
// Per-invocation runtime for the daily billing-suspension reconcile cron.
// Mirrors `vcs-sync-runtime.ts`: its own light layer graph (NOT the fetch
// path's MainLive) so the cron invocation stays within the startup CPU budget.
// ---------------------------------------------------------------------------

const telemetry = MapleCloudflareSDK.make({
serviceName: "maple-api",
serviceNamespace: "backend",
repositoryUrl: "https://github.com/Makisuo/maple",
})

export const buildBillingSuspensionLayer = (_env: Record<string, unknown>) => {
const ConfigLive = WorkerConfigProviderLayer
const EnvLive = Env.layer.pipe(Layer.provide(ConfigLive))
const DatabaseLive = DatabasePgLive.pipe(Layer.provide(WorkerEnvironment.layer))
const Base = Layer.mergeAll(EnvLive, DatabaseLive, WorkerEnvironment.layer)

const ServiceLive = BillingSuspensionService.layer.pipe(Layer.provide(Base))

return ServiceLive.pipe(Layer.provideMerge(telemetry.layer), Layer.provideMerge(ConfigLive))
}

export const flushBillingTelemetry = (env: Record<string, unknown>) => telemetry.flush(env)

// The cron program: scan the overdue set, promote/clear per the policy.
export const runBillingSuspensionReconcile = Effect.gen(function* () {
const service = yield* BillingSuspensionService
const result = yield* service.runReconcile()
yield* Effect.annotateCurrentSpan({
"billing.reconcile.scanned": result.scanned,
"billing.reconcile.suspended": result.suspended,
"billing.reconcile.cleared": result.cleared,
"billing.reconcile.outcome": "completed",
})
yield* Effect.logInfo("[billing] suspension reconcile tick complete").pipe(
Effect.annotateLogs({
scanned: result.scanned,
suspended: result.suspended,
cleared: result.cleared,
}),
)
}).pipe(
// tapCause lets the cause propagate so `withSpan` marks the tick as Error.
Effect.tapCause((cause) =>
Effect.annotateCurrentSpan({ "billing.reconcile.outcome": "failed" }).pipe(
Effect.flatMap(() =>
Effect.logError("[billing] suspension reconcile tick failed").pipe(
Effect.annotateLogs({ error: Cause.pretty(cause) }),
),
),
),
),
Effect.withSpan("BillingSuspensionReconcile.tick"),
)
63 changes: 63 additions & 0 deletions apps/api/src/lib/AutumnClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { autumnHandler, type CustomerData } from "autumn-js/backend"
import { Effect, Schema } from "effect"
import { BillingUpstreamError } from "@maple/domain/http"

// Shared, dependency-free primitives for speaking the internal `autumn-js/backend`
// contract. Extracted from billing.http.ts so non-HTTP callers (the billing
// reconcile cron, the Autumn webhook receiver) reuse the exact same call path
// instead of re-implementing it. The HTTP billing group still owns the per-org
// edge cache (`readCustomerCached`) — that stays in billing.http.ts.

export type AutumnResult = Awaited<ReturnType<typeof autumnHandler>>

// `autumnHandler` matches its route by `method` + `path`, always POST against
// `${DEFAULT_PATH_PREFIX}/${route}` (= /api/autumn/<route>) regardless of which
// Maple endpoint fronts it, so every call here speaks that internal contract.
export const AUTUMN_PATH_PREFIX = "/api/autumn"

export const makeCallAutumn =
(secretKey: string | undefined) =>
(
route: string,
body: unknown,
customerId: string | undefined,
customerData?: CustomerData,
): Effect.Effect<AutumnResult, BillingUpstreamError> =>
secretKey === undefined
? Effect.fail(new BillingUpstreamError({ message: "Billing is not configured" }))
: Effect.tryPromise({
try: () =>
autumnHandler({
request: { url: `${AUTUMN_PATH_PREFIX}/${route}`, method: "POST", body },
customerId,
customerData,
clientOptions: { secretKey },
}),
catch: (error) =>
new BillingUpstreamError({
message: error instanceof Error ? error.message : String(error),
}),
})

// Surface a readable message for a non-2xx Autumn response (it carries a
// `{ message }` / `{ error }` body) so the client error isn't an opaque 502.
const upstreamMessage = (result: AutumnResult): string => {
const body = result.response as { message?: unknown; error?: unknown } | null
const message = body?.message ?? body?.error
return typeof message === "string" ? message : `Billing request failed (${result.statusCode})`
}

export const ensureOk = (result: AutumnResult): Effect.Effect<unknown, BillingUpstreamError> =>
result.statusCode >= 200 && result.statusCode < 300
? Effect.succeed(result.response)
: Effect.fail(new BillingUpstreamError({ message: upstreamMessage(result) }))

export const decodeUpstream = <S extends Schema.Top>(
schema: S,
value: unknown,
): Effect.Effect<S["Type"], BillingUpstreamError, S["DecodingServices"]> =>
Schema.decodeUnknownEffect(schema)(value).pipe(
Effect.mapError(
(error) => new BillingUpstreamError({ message: `Unexpected billing response: ${error}` }),
),
)
2 changes: 2 additions & 0 deletions apps/api/src/lib/Env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export interface EnvShape {
readonly CLERK_JWT_KEY: Option.Option<Redacted.Redacted<string>>
readonly MAPLE_ORG_ID_OVERRIDE: Option.Option<string>
readonly AUTUMN_SECRET_KEY: Option.Option<Redacted.Redacted<string>>
readonly AUTUMN_WEBHOOK_SECRET: Option.Option<Redacted.Redacted<string>>
readonly SD_INTERNAL_TOKEN: Option.Option<Redacted.Redacted<string>>
readonly INTERNAL_SERVICE_TOKEN: Option.Option<Redacted.Redacted<string>>
readonly EMAIL_FROM: string
Expand Down Expand Up @@ -84,6 +85,7 @@ const envConfig = Config.all({
CLERK_JWT_KEY: optionalRedacted("CLERK_JWT_KEY"),
MAPLE_ORG_ID_OVERRIDE: optionalString("MAPLE_ORG_ID_OVERRIDE"),
AUTUMN_SECRET_KEY: optionalRedacted("AUTUMN_SECRET_KEY"),
AUTUMN_WEBHOOK_SECRET: optionalRedacted("AUTUMN_WEBHOOK_SECRET"),
SD_INTERNAL_TOKEN: optionalRedacted("SD_INTERNAL_TOKEN"),
INTERNAL_SERVICE_TOKEN: optionalRedacted("INTERNAL_SERVICE_TOKEN"),
EMAIL_FROM: stringWithDefault("EMAIL_FROM", "Maple <notifications@noreply.maple.dev>"),
Expand Down
Loading
Loading