Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .oxfmtrc.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
"printWidth": 110,
"useTabs": true,
"tabWidth": 4,
"ignorePatterns": [".context", "deploy"],
"ignorePatterns": [".context", "deploy", "lib/effect-cf"],
}
2 changes: 1 addition & 1 deletion .oxlintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
}
]
},
"ignorePatterns": [".context"]
"ignorePatterns": [".context", "lib/effect-cf"]
}
2 changes: 1 addition & 1 deletion apps/alerting/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"@maple/api": "workspace:*",
"@maple/db": "workspace:*",
"@maple/domain": "workspace:*",
"@maple/effect-cloudflare": "workspace:*",
"@maple/effect-cf": "workspace:*",
"effect": "catalog:effect"
},
"devDependencies": {
Expand Down
42 changes: 42 additions & 0 deletions apps/alerting/src/lib/runtime.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import type { Effect } from "effect"
import { ManagedRuntime } from "effect"
import type { Layer } from "effect"

/**
* Minimal shape of CF `ExecutionContext.waitUntil`.
*/
export interface ExecutionContextLike {
waitUntil(promise: Promise<unknown>): void
}

/**
* Yield one macrotask so Effect's scheduler can drain tasks queued via
* `scheduleTask(fn, 0)`. `HttpMiddleware.tracer` ends the root Server span this
* way; `scheduleTask(fn, 0)` dispatches via `setImmediate` → `setTimeout(fn, 0)`
* on Workers (a macrotask). Disposing the per-invocation runtime the moment the
* program promise resolves would race that scheduled `span.end` and leave spans
* parentless. Awaiting one `setTimeout(0)` drains the dispatcher first.
*/
const drainScheduler = () => new Promise<void>((resolve) => setTimeout(resolve, 0))

/**
* Run a single Effect program to completion under a fresh per-invocation
* runtime. Intended for CF Worker `scheduled` handlers. Disposes the runtime
* after the program settles (draining the scheduler first) and registers the
* whole thing with `ctx.waitUntil`. Rethrows so the CF runtime reports failure.
*/
export const runScheduledEffect = <A, E, R>(
layer: Layer.Layer<R, unknown, never>,
program: Effect.Effect<A, E, R>,
ctx: ExecutionContextLike,
): Promise<A> => {
const runtime = ManagedRuntime.make(layer)
const done = runtime.runPromise(program).finally(async () => {
await drainScheduler()
await runtime.dispose().catch((err) => {
console.error("[alerting] scheduled runtime dispose failed:", err)
})
})
ctx.waitUntil(done.catch(() => undefined))
return done
}
19 changes: 10 additions & 9 deletions apps/alerting/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,26 @@ import {
QueryEngineService,
ServiceMapRollupService,
WarehouseQueryService,
WorkerEnvironmentLive,
} from "@maple/api/alerting"
import * as MapleCloudflareSDK from "@maple-dev/effect-sdk/cloudflare"
import {
runScheduledEffect,
WorkerConfigProviderLayer,
WorkerEnvironment,
} from "@maple/effect-cloudflare"
import { WorkerConfig } from "@maple/effect-cf"
import { Cause, Effect, Layer } from "effect"
import { runScheduledEffect } from "./lib/runtime"

// Module-scope construction; `flush(env)` resolves env on first call. The
// in-isolate buffers coalesce concurrent scheduled ticks into one POST per
// signal.
const telemetry = MapleCloudflareSDK.make({ serviceName: "alerting" })

const buildLayer = (_env: Record<string, unknown>) => {
const ConfigLive = WorkerConfigProviderLayer
const buildLayer = () => {
// `WorkerConfig.providerLayer` reads the env-backed Effect ConfigProvider via
// `WorkerEnvironment`; `DatabaseD1Live` reads the `MAPLE_DB` binding the same
// way. Both get `WorkerEnvironment` from `WorkerEnvironmentLive`.
const ConfigLive = WorkerConfig.providerLayer.pipe(Layer.provide(WorkerEnvironmentLive))
const EnvLive = Env.layer.pipe(Layer.provide(ConfigLive))

const DatabaseLive = DatabaseD1Live.pipe(Layer.provide(WorkerEnvironment.layer))
const DatabaseLive = DatabaseD1Live.pipe(Layer.provide(WorkerEnvironmentLive))

const BaseLive = Layer.mergeAll(EnvLive, DatabaseLive)

Expand Down Expand Up @@ -231,7 +232,7 @@ export default {
? onboardingTick
: Effect.all([alertTick, errorTick], { concurrency: 2, discard: true })
try {
await runScheduledEffect(buildLayer(env), program, ctx)
await runScheduledEffect(buildLayer(), program, ctx)
} finally {
ctx.waitUntil(telemetry.flush(env))
}
Expand Down
2 changes: 1 addition & 1 deletion apps/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"@maple-dev/effect-sdk": "workspace:*",
"@maple/db": "workspace:*",
"@maple/domain": "workspace:*",
"@maple/effect-cloudflare": "workspace:*",
"@maple/effect-cf": "workspace:*",
"@maple/email": "workspace:*",
"@maple/infra": "workspace:*",
"@maple/query-engine": "workspace:*",
Expand Down
2 changes: 1 addition & 1 deletion apps/api/src/alerting.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ export { OrgClickHouseSettingsService } from "./services/OrgClickHouseSettingsSe
export { QueryEngineService } from "./services/QueryEngineService"
export { ServiceMapRollupService } from "./services/ServiceMapRollupService"
export { WarehouseQueryService } from "./lib/WarehouseQueryService"
export { WorkerEnvironment } from "./lib/WorkerEnvironment"
export { WorkerEnvironment, WorkerEnvironmentLive } from "./lib/WorkerEnvironment"
26 changes: 13 additions & 13 deletions apps/api/src/lib/DatabaseD1Live.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
import { createMapleD1Client, type CloudflareD1Database } from "@maple/db/client"
import { createMapleD1Client } from "@maple/db/client"
import { migrateAlertQuerySignalTypes, reshapeDashboardWidgets } from "@maple/db/migrate"
import { D1Database as D1DatabaseToken } from "@maple/effect-cloudflare/d1-connection"
import { Effect, Layer } from "effect"
import { Database, type DatabaseClient, type DatabaseShape, toDatabaseError } from "./DatabaseLive"

const MAPLE_DB = D1DatabaseToken("MAPLE_DB")
import { MapleDb } from "./MapleD1"

const makeD1Database = Effect.gen(function* () {
const conn = yield* D1DatabaseToken.bind(MAPLE_DB)
const binding = yield* conn.raw
if (!binding) {
return yield* Effect.die(new Error("Missing worker D1 binding: MAPLE_DB"))
}
// `MapleDb` resolves the validated raw `MAPLE_DB` D1 binding. A missing or
// malformed binding fails `MapleDb.layer` with `BindingNotFound/Validation`,
// converted to a defect via `Layer.orDie` below — preserving the original
// fail-fast (surfaced as a boot error in `wrangler tail`).
const binding = yield* MapleDb

const client = createMapleD1Client(
binding as unknown as CloudflareD1Database,
) as unknown as DatabaseClient
const client = createMapleD1Client(binding) as unknown as DatabaseClient

// The D1 worker never calls runMigrations; the data migration is guarded by
// the _maple_data_migrations table, so every later boot is a single SELECT.
Expand Down Expand Up @@ -45,4 +41,8 @@ const makeD1Database = Effect.gen(function* () {
} satisfies DatabaseShape)
})

export const DatabaseD1Live = Layer.effect(Database, makeD1Database)
// Self-provides the D1 binding layer (orDie'd) so the only remaining
// requirement is `WorkerEnvironment`, which `Worker.make` supplies from `env`.
export const DatabaseD1Live = Layer.effect(Database, makeD1Database).pipe(
Layer.provide(Layer.orDie(MapleDb.layer)),
)
27 changes: 27 additions & 0 deletions apps/api/src/lib/MapleD1.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import type { CloudflareD1Database } from "@maple/db/client"
import { Binding } from "@maple/effect-cf"

// Structural guard for a Cloudflare D1 binding. effect-cf's `D1.Service` wraps
// `@effect/sql-d1` (for `sqlLayer()`); maple feeds the raw binding to drizzle
// instead, so we use the lower-level `Binding.Service` directly and keep
// `@effect/sql-d1` out of the worker bundle.
const isD1Database = (value: unknown): value is CloudflareD1Database => {
if (typeof value !== "object" || value === null) return false
const resource = value as Record<string, unknown>
return (
typeof resource.prepare === "function" &&
typeof resource.batch === "function" &&
typeof resource.exec === "function"
)
}

/**
* Validated `MAPLE_DB` D1 binding. `yield* MapleDb` resolves the raw
* `CloudflareD1Database`; provide via `MapleDb.layer` (requires
* `WorkerEnvironment`, supplied by `Worker.make`).
*/
export class MapleDb extends Binding.Service<MapleDb>()(
"@maple/api/MapleDb",
"MAPLE_DB",
isD1Database,
) {}
24 changes: 19 additions & 5 deletions apps/api/src/lib/WorkerEnvironment.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
// Thin re-export of the runtime-shared `WorkerEnvironment` from
// `@maple/effect-cloudflare`. The shared service uses the same tag
// (`"Cloudflare.Workers.WorkerEnvironment"`) so provision is compatible with
// any prior in-tree usage.
export { WorkerEnvironment } from "@maple/effect-cloudflare/worker-environment"
import { WorkerEnvironment } from "@maple/effect-cf"
import { Effect, Layer } from "effect"

// effect-cf's `WorkerEnvironment` is a bare Context tag. We provide it from the
// `cloudflare:workers` runtime `env` via a dynamic import + fallback, so
// non-worker contexts (tsc, vitest) don't choke on the bare specifier. Mirrors
// the old `@maple/effect-cloudflare` `WorkerEnvironment.layer`.
const workerEnv = Effect.promise(() =>
import("cloudflare:workers")
.then((m) => m.env as unknown as Record<string, unknown>)
.catch(() => ({}) as Record<string, unknown>),
)

export { WorkerEnvironment }

export const WorkerEnvironmentLive: Layer.Layer<WorkerEnvironment> = Layer.effect(
WorkerEnvironment,
workerEnv as Effect.Effect<never>,
)
Loading
Loading