Skip to content

Commit 386b4f6

Browse files
authored
feat(webapp): per-org S2 basin migration (#3516)
## Summary Move from a single shared S2 basin to **per-org basins** with retention tied to the org's billing plan. Stops S2 from deleting streams out from under live chat sessions when basin retention fires before the chat ends, and unlocks per-org cost attribution. OSS / s2-lite installs are unaffected: provisioning is gated by `REALTIME_STREAMS_PER_ORG_BASINS_ENABLED` (default `false`), and the read precedence falls back to the global basin env var when an entity has no stamped basin. ``` basin = run.streamBasinName ?? session.streamBasinName ?? env.REALTIME_STREAMS_S2_BASIN ``` ## Design Three nullable `streamBasinName` columns (`Organization`, `TaskRun`, `Session`) plus a provisioner that idempotently creates the basin and reconfigures retention on plan changes. The trigger and session-create paths stamp the org's basin onto new rows; the realtime read path picks the basin from the entity context. Admin routes back-fill existing orgs and force-reconfigure a single org. ## Test plan - [x] `pnpm run typecheck --filter webapp --filter @internal/run-engine` - [x] Backfill admin route end-to-end (provision + DB stamp + S2 basin config). - [x] Reconfigure on plan change (all retention tiers). - [x] chat.agent multi-turn drives streams into the per-org basin. - [x] Legacy fallback when entity has no stamped basin. - [x] Provisioner is a no-op when the flag is off.
1 parent 3d418a9 commit 386b4f6

23 files changed

Lines changed: 547 additions & 58 deletions
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Per-org S2 stream basins with retention tied to the org's billing plan, gated by `REALTIME_STREAMS_PER_ORG_BASINS_ENABLED`. Stops basin retention from deleting streams out from under live chat sessions and unlocks per-org cost attribution.

apps/webapp/app/env.server.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,15 @@ import { MachinePresetName } from "@trigger.dev/core/v3";
33
import { BoolEnv } from "./utils/boolEnv";
44
import { isValidDatabaseUrl } from "./utils/db";
55
import { isValidRegex } from "./utils/regex";
6+
import { isValidDuration } from "./services/realtime/duration.server";
7+
8+
// `z.string()` constrained to a `parseDuration`-parseable string (e.g.
9+
// `7d`, `1h`). Validated at boot so a typo'd duration fails fast.
10+
function durationString() {
11+
return z
12+
.string()
13+
.refine(isValidDuration, "must be a duration like 7d, 30d, 365d, 1h, 1y");
14+
}
615

716
// Parses a CSV of machine preset names (e.g. "small-1x,small-2x") into a
817
// non-empty array of MachinePresetName. Used by COMPUTE_TEMPLATE_MACHINE_PRESETS
@@ -1506,6 +1515,16 @@ const EnvironmentSchema = z
15061515
REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS: z.coerce.number().int().default(100),
15071516
REALTIME_STREAMS_S2_MAX_RETRIES: z.coerce.number().int().default(10),
15081517
REALTIME_STREAMS_S2_WAIT_SECONDS: z.coerce.number().int().default(60),
1518+
// When "true", provision a dedicated S2 basin per org and stamp
1519+
// `streamBasinName` on new rows. Off keeps everything on the single
1520+
// basin defined by `REALTIME_STREAMS_S2_BASIN`.
1521+
REALTIME_STREAMS_PER_ORG_BASINS_ENABLED: z.enum(["true", "false"]).default("false"),
1522+
// Per-org basin name = `{prefix}-{env}-org-{orgId}`.
1523+
REALTIME_STREAMS_BASIN_NAME_PREFIX: z.string().default("triggerdotdev"),
1524+
REALTIME_STREAMS_BASIN_NAME_ENV: z.string().default("dev"),
1525+
REALTIME_STREAMS_BASIN_DEFAULT_RETENTION: durationString().default("30d"),
1526+
REALTIME_STREAMS_BASIN_STORAGE_CLASS: z.enum(["express", "standard"]).default("express"),
1527+
REALTIME_STREAMS_BASIN_DELETE_ON_EMPTY_MIN_AGE: durationString().default("1h"),
15091528
REALTIME_STREAMS_DEFAULT_VERSION: z.enum(["v1", "v2"]).default("v1"),
15101529
WAIT_UNTIL_TIMEOUT_MS: z.coerce.number().int().default(600_000),
15111530

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { z } from "zod";
3+
import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
4+
import { isValidDuration } from "~/services/realtime/duration.server";
5+
import {
6+
deprovisionBasinForOrg,
7+
ensureBasinForOrg,
8+
} from "~/services/realtime/streamBasinProvisioner.server";
9+
10+
const ParamsSchema = z.object({ organizationId: z.string() });
11+
12+
const BodySchema = z.discriminatedUnion("action", [
13+
z.object({
14+
action: z.literal("ensure"),
15+
retention: z
16+
.string()
17+
.refine(isValidDuration, "retention must be a duration like 7d, 30d, 365d, 1h, 1y"),
18+
}),
19+
z.object({ action: z.literal("deprovision") }),
20+
]);
21+
22+
export async function action({ request, params }: ActionFunctionArgs) {
23+
await requireAdminApiRequest(request);
24+
25+
const { organizationId } = ParamsSchema.parse(params);
26+
27+
let parsed: z.infer<typeof BodySchema>;
28+
try {
29+
const text = await request.text();
30+
const raw = text.length > 0 ? JSON.parse(text) : {};
31+
const result = BodySchema.safeParse(raw);
32+
if (!result.success) {
33+
return json({ ok: false, error: result.error.flatten() }, { status: 400 });
34+
}
35+
parsed = result.data;
36+
} catch {
37+
return json({ ok: false, error: "Invalid JSON body" }, { status: 400 });
38+
}
39+
40+
if (parsed.action === "ensure") {
41+
const result = await ensureBasinForOrg(organizationId, parsed.retention);
42+
return json({ ok: true, ...result });
43+
}
44+
45+
const result = await deprovisionBasinForOrg(organizationId);
46+
return json({ ok: true, ...result });
47+
}

apps/webapp/app/routes/api.v1.runs.$runFriendlyId.input-streams.wait.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ const { action, loader } = createActionApiRoute(
4040
id: true,
4141
friendlyId: true,
4242
realtimeStreamsVersion: true,
43+
streamBasinName: true,
4344
},
4445
});
4546

@@ -98,7 +99,8 @@ const { action, loader } = createActionApiRoute(
9899
try {
99100
const realtimeStream = getRealtimeStreamInstance(
100101
authentication.environment,
101-
run.realtimeStreamsVersion
102+
run.realtimeStreamsVersion,
103+
{ run }
102104
);
103105

104106
const records = await realtimeStream.readRecords(

apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,14 @@ const { action, loader } = createActionApiRoute(
123123
// and remove the pending registration.
124124
if (!result.isCached) {
125125
try {
126-
// Session streams are always v2 (S2) — the writer in
127-
// `appendPartToSessionStream` and the SSE subscribe both
128-
// hardcode "v2", so the race-check reader has to match.
129-
// Don't fall through to the run's own `realtimeStreamsVersion`,
130-
// which only describes the run's run-scoped streams.
131-
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2");
126+
// Match the writer's basin resolution exactly: session if the
127+
// row exists, otherwise the org so we look at the same basin a
128+
// fresh row would be stamped with. Mirrors the PUT/GET sister
129+
// routes in `realtime.v1.sessions.$session.$io.ts`.
130+
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
131+
session: maybeSession,
132+
organization: maybeSession ? null : authentication.environment.organization,
133+
});
132134

133135
if (realtimeStream instanceof S2RealtimeStreams) {
134136
const records = await realtimeStream.readSessionStreamRecords(

apps/webapp/app/routes/api.v1.sessions.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ const { action } = createActionApiRoute(
167167
runtimeEnvironmentId: authentication.environment.id,
168168
environmentType: authentication.environment.type,
169169
organizationId: authentication.environment.organizationId,
170+
streamBasinName: authentication.environment.organization.streamBasinName,
170171
},
171172
update: { triggerConfig: triggerConfigJson },
172173
});
@@ -186,6 +187,7 @@ const { action } = createActionApiRoute(
186187
runtimeEnvironmentId: authentication.environment.id,
187188
environmentType: authentication.environment.type,
188189
organizationId: authentication.environment.organizationId,
190+
streamBasinName: authentication.environment.organization.streamBasinName,
189191
},
190192
});
191193
}

apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@ const { action, loader } = createActionApiRoute(
8181
);
8282
}
8383

84-
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2");
84+
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
85+
session,
86+
});
8587

8688
if (!(realtimeStream instanceof S2RealtimeStreams)) {
8789
return json(

apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,13 @@ const { action } = createActionApiRoute(
5959
});
6060
}
6161

62-
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2");
62+
// No-row form: resolve via the org so the stream initialised here
63+
// matches what later appends/subscribes will land on once the row
64+
// is created.
65+
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
66+
session: maybeSession,
67+
organization: maybeSession ? null : authentication.environment.organization,
68+
});
6369

6470
if (!(realtimeStream instanceof S2RealtimeStreams)) {
6571
return new Response("Session channels require the S2 realtime backend", {
@@ -122,7 +128,11 @@ const loader = createLoaderApiRoute(
122128
},
123129
},
124130
async ({ params, request, authentication, resource }) => {
125-
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2");
131+
// Same no-row fallback as PUT above.
132+
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
133+
session: resource.row,
134+
organization: resource.row ? null : authentication.environment.organization,
135+
});
126136

127137
if (!(realtimeStream instanceof S2RealtimeStreams)) {
128138
return new Response("Session channels require the S2 realtime backend", {

apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
2929
select: {
3030
id: true,
3131
friendlyId: true,
32+
streamBasinName: true,
3233
runtimeEnvironment: {
3334
include: {
3435
project: true,
@@ -64,7 +65,9 @@ export async function action({ request, params }: ActionFunctionArgs) {
6465
}
6566

6667
// The runtimeEnvironment from the run is already in the correct shape for AuthenticatedEnvironment
67-
const realtimeStream = getRealtimeStreamInstance(run.runtimeEnvironment, streamVersion);
68+
const realtimeStream = getRealtimeStreamInstance(run.runtimeEnvironment, streamVersion, {
69+
run,
70+
});
6871

6972
return realtimeStream.ingestData(
7073
request.body,
@@ -127,7 +130,8 @@ export const loader = createLoaderApiRoute(
127130

128131
const realtimeStream = getRealtimeStreamInstance(
129132
authentication.environment,
130-
run.realtimeStreamsVersion
133+
run.realtimeStreamsVersion,
134+
{ run }
131135
);
132136

133137
return realtimeStream.streamResponse(request, run.friendlyId, params.streamId, getRequestAbortSignal(), {

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ const { action } = createActionApiRoute(
7272
realtimeStreamsVersion: true,
7373
completedAt: true,
7474
id: true,
75+
streamBasinName: true,
7576
},
7677
});
7778

@@ -102,7 +103,8 @@ const { action } = createActionApiRoute(
102103

103104
const realtimeStream = getRealtimeStreamInstance(
104105
authentication.environment,
105-
targetRun.realtimeStreamsVersion
106+
targetRun.realtimeStreamsVersion,
107+
{ run: targetRun }
106108
);
107109

108110
const partId = request.headers.get("X-Part-Id") ?? nanoid(7);

0 commit comments

Comments
 (0)