Skip to content

Commit 054d1af

Browse files
committed
fix(webapp): address review on per-org basin migration
- Use `org.id` (cuid, fixed-length, unique-by-construction) as the basin-name suffix instead of a truncated `org.slug`. The slug approach could silently collide two orgs whose slugs share a prefix past the truncation point, since the create call treats S2's 409 as success — a real cross-tenant isolation risk. - `resolveRetentionForOrg` now distinguishes "billing not configured" from "billing call failed". OSS / self-hosted installs (no billing client) get `defaultRetention()` and the worker job converges; cloud installs that experience a transient billing failure throw and get retried by redis-worker. Previously every install without billing hit a permafail loop. - `reconfigureBasinForOrg` throws when no S2 access token is configured instead of silently returning, so a misconfigured cloud install surfaces as a worker failure rather than stale retention. - Duration env vars (`*_RETENTION*`, `*_DELETE_ON_EMPTY_MIN_AGE`) validated at boot via a `durationString()` Zod schema, so a misconfigured value fails fast at startup instead of at first basin operation. - Admin reconfigure route's `retention` body field validated against the same duration shape — bad input is now a clean 400 rather than a 500 from `parseDuration`. - Extract duration parsing into a shared `duration.server.ts` so the env validator and the provisioner share one source of truth. Verified end-to-end with chat.agent locally — fresh chat lands in the per-org basin, no leakage to the global fallback.
1 parent d6d3586 commit 054d1af

8 files changed

Lines changed: 131 additions & 79 deletions

apps/webapp/app/env.server.ts

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,19 @@ import { MachinePresetName } from "@trigger.dev/core/v3";
33
import { BoolEnv } from "./utils/boolEnv";
44
import { isValidDatabaseUrl } from "./utils/db";
55
import { isValidRegex } from "./utils/regex";
6+
import { isValidDuration } from "./services/realtime/duration.server";
7+
8+
/**
9+
* `z.string()` constrained to a duration string parseable by
10+
* `parseDuration` (e.g. `7d`, `30d`, `365d`, `1h`). Validated at boot
11+
* so a typo'd retention env var fails fast at startup rather than
12+
* lurking until the first basin operation.
13+
*/
14+
function durationString() {
15+
return z
16+
.string()
17+
.refine(isValidDuration, "must be a duration like 7d, 30d, 365d, 1h, 1y");
18+
}
619

720
// Parses a CSV of machine preset names (e.g. "small-1x,small-2x") into a
821
// non-empty array of MachinePresetName. Used by COMPUTE_TEMPLATE_MACHINE_PRESETS
@@ -1521,19 +1534,19 @@ const EnvironmentSchema = z
15211534
/// Used at org-create and as the fallback when no plan-specific
15221535
/// retention is resolved. Operators that don't run a billing API
15231536
/// only need this one.
1524-
REALTIME_STREAMS_BASIN_DEFAULT_RETENTION: z.string().default("30d"),
1537+
REALTIME_STREAMS_BASIN_DEFAULT_RETENTION: durationString().default("30d"),
15251538
/// Plan-specific retention overrides — only consulted by the
15261539
/// optional `streamBasinRetentionByPlan` shim. Operators that
15271540
/// don't map plans to retention (OSS, self-hosted) can ignore
15281541
/// these and rely on the default above.
1529-
REALTIME_STREAMS_BASIN_RETENTION_FREE: z.string().default("7d"),
1530-
REALTIME_STREAMS_BASIN_RETENTION_HOBBY: z.string().default("30d"),
1531-
REALTIME_STREAMS_BASIN_RETENTION_PRO: z.string().default("365d"),
1542+
REALTIME_STREAMS_BASIN_RETENTION_FREE: durationString().default("7d"),
1543+
REALTIME_STREAMS_BASIN_RETENTION_HOBBY: durationString().default("30d"),
1544+
REALTIME_STREAMS_BASIN_RETENTION_PRO: durationString().default("365d"),
15321545
/// Storage class applied to per-org basins at create time.
15331546
REALTIME_STREAMS_BASIN_STORAGE_CLASS: z.enum(["express", "standard"]).default("express"),
15341547
/// `delete_on_empty_min_age` applied to per-org basins. Streams
15351548
/// that go empty for this long are reaped automatically.
1536-
REALTIME_STREAMS_BASIN_DELETE_ON_EMPTY_MIN_AGE: z.string().default("1h"),
1549+
REALTIME_STREAMS_BASIN_DELETE_ON_EMPTY_MIN_AGE: durationString().default("1h"),
15371550
REALTIME_STREAMS_DEFAULT_VERSION: z.enum(["v1", "v2"]).default("v1"),
15381551
WAIT_UNTIL_TIMEOUT_MS: z.coerce.number().int().default(600_000),
15391552

apps/webapp/app/models/organization.server.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ export async function createOrganization(
9494
try {
9595
await provisionBasinForOrg({
9696
id: organization.id,
97-
slug: organization.slug,
9897
streamBasinName: organization.streamBasinName,
9998
// No `retention` — provisioner uses `defaultRetention()`.
10099
});

apps/webapp/app/routes/admin.api.v1.stream-basins.reconfigure.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { json, type ActionFunctionArgs } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
4+
import { isValidDuration } from "~/services/realtime/duration.server";
45
import {
56
isPerOrgBasinsEnabled,
67
reconfigureBasinForOrg,
@@ -23,7 +24,10 @@ import { commonWorker } from "~/v3/commonWorker.server";
2324
const BodySchema = z
2425
.object({
2526
orgId: z.string(),
26-
retention: z.string().optional(),
27+
retention: z
28+
.string()
29+
.refine(isValidDuration, "retention must be a duration like 7d, 30d, 365d, 1h, 1y")
30+
.optional(),
2731
})
2832
.strict();
2933

apps/webapp/app/services/platform.v3.server.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,17 @@ function initializeClient() {
4444
}
4545

4646
const client = singleton("billingClient", initializeClient);
47+
48+
/**
49+
* `true` when the billing client was instantiated — i.e. we're running
50+
* in a cloud-style install with `BILLING_API_URL` + `BILLING_API_KEY`
51+
* configured. OSS / self-hosted installs return `false` here, which
52+
* lets callers distinguish "no billing wired up, fall back to
53+
* defaults" from "billing wired up but the call failed, retry."
54+
*/
55+
export function isBillingConfigured(): boolean {
56+
return client !== undefined;
57+
}
4758
// Failures from @trigger.dev/platform billing client calls are tracked via
4859
// this metric (with low-cardinality {function, kind} labels) rather than
4960
// logged. Every task invocation hits these paths, so per-call logs were too
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* Duration string parsing for stream-basin retention / delete-on-empty
3+
* configuration. Used by `streamBasinProvisioner` (to convert to S2's
4+
* integer-seconds wire format) and by `env.server.ts` (to validate
5+
* duration-shaped env vars at boot rather than at first use).
6+
*
7+
* Accepts the short forms (`7d`, `30d`, `365d`, `1h`, `90m`, `45s`,
8+
* `2w`, `1y`) and the human forms (`7days`, `1week`, `1year`).
9+
*/
10+
11+
const PATTERN =
12+
/^(\d+)\s*(s|sec|secs|seconds?|m|min|mins|minutes?|h|hour|hours?|d|day|days?|w|week|weeks?|y|year|years?)$/;
13+
14+
export function isValidDuration(input: string): boolean {
15+
return PATTERN.test(input.trim().toLowerCase());
16+
}
17+
18+
/**
19+
* Parse a duration string into seconds. Throws on garbage so a
20+
* misconfigured env var fails loudly. Use {@link isValidDuration}
21+
* for non-throwing validation (e.g. inside a Zod `.refine()`).
22+
*/
23+
export function parseDuration(input: string): number {
24+
const trimmed = input.trim().toLowerCase();
25+
const match = trimmed.match(PATTERN);
26+
if (!match) {
27+
throw new Error(`Invalid duration string: ${input}`);
28+
}
29+
const value = parseInt(match[1]!, 10);
30+
const unit = match[2]!;
31+
const multiplier =
32+
/^s/.test(unit)
33+
? 1
34+
: /^m(?:in|ins|inute|inutes)?$/.test(unit)
35+
? 60
36+
: /^h/.test(unit)
37+
? 3600
38+
: /^d/.test(unit)
39+
? 86400
40+
: /^w/.test(unit)
41+
? 604800
42+
: /^y/.test(unit)
43+
? 31_536_000
44+
: NaN;
45+
if (!Number.isFinite(multiplier)) {
46+
throw new Error(`Invalid duration unit: ${unit}`);
47+
}
48+
return value * multiplier;
49+
}

apps/webapp/app/services/realtime/streamBasinProvisioner.server.ts

Lines changed: 27 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import type { PrismaClientOrTransaction } from "~/db.server";
3434
import { prisma } from "~/db.server";
3535
import { env } from "~/env.server";
3636
import { logger } from "~/services/logger.server";
37+
import { parseDuration } from "./duration.server";
3738

3839
export function isPerOrgBasinsEnabled(): boolean {
3940
return env.REALTIME_STREAMS_PER_ORG_BASINS_ENABLED === "true";
@@ -48,36 +49,28 @@ export function defaultRetention(): string {
4849
}
4950

5051
/**
51-
* Build the basin name for an org. Format: `{prefix}-{env}-org-{slug}`.
52-
* The org slug is already lowercase-and-hyphenated by
53-
* `createOrganization`, so it satisfies S2 basin-name rules without
54-
* further normalization. We truncate defensively to keep total length
55-
* under 63 chars (a common bucket convention; verify against S2 docs
56-
* before raising).
52+
* Build the basin name for an org. Format: `{prefix}-{env}-org-{id}`.
5753
*
58-
* Throws if `REALTIME_STREAMS_BASIN_NAME_PREFIX` +
59-
* `REALTIME_STREAMS_BASIN_NAME_ENV` are configured so long that no
60-
* room remains for the slug — without this guard, `slice(0, 0)` would
61-
* return an empty string and every org would share the same name,
62-
* silently colliding via S2's 409-on-create.
54+
* We use the org's `id` (cuid, fixed-length, unique-by-construction)
55+
* rather than the slug. Slugs are user-influenced, can change, and —
56+
* critically — could collide across orgs once truncated to fit the
57+
* S2 basin-name length cap. cuid is short (25 chars) and never
58+
* collides, so the basin name is stable and tenant-isolated by
59+
* construction.
60+
*
61+
* Format check: `triggerdotdev-prod-org-{25 chars}` is 47 chars total,
62+
* comfortably under the conventional 63-char cap. If you change the
63+
* prefix / env-name to something extreme, this still fails fast at
64+
* S2's validator.
6365
*/
64-
export function basinNameForOrg(org: { slug: string }): string {
66+
export function basinNameForOrg(org: { id: string }): string {
6567
const prefix = env.REALTIME_STREAMS_BASIN_NAME_PREFIX;
6668
const envName = env.REALTIME_STREAMS_BASIN_NAME_ENV;
67-
const head = `${prefix}-${envName}-org-`;
68-
const budget = 63 - head.length;
69-
if (budget <= 0) {
70-
throw new Error(
71-
`[streamBasinProvisioner] REALTIME_STREAMS_BASIN_NAME_PREFIX + REALTIME_STREAMS_BASIN_NAME_ENV too long: head="${head}" leaves no room for the org slug (budget=${budget}). Shorten the prefix or env-name values.`
72-
);
73-
}
74-
const slug = org.slug.slice(0, budget);
75-
return `${head}${slug}`;
69+
return `${prefix}-${envName}-org-${org.id}`;
7670
}
7771

7872
type ProvisionInput = {
7973
id: string;
80-
slug: string;
8174
/// Duration string passed straight to S2. Defaults to
8275
/// `defaultRetention()` when omitted. Caller decides; the provisioner
8376
/// has no opinion about what retention is appropriate.
@@ -156,7 +149,15 @@ export async function reconfigureBasinForOrg(
156149
if (!isPerOrgBasinsEnabled()) return;
157150

158151
const accessToken = env.REALTIME_STREAMS_S2_ACCESS_TOKEN;
159-
if (!accessToken) return;
152+
if (!accessToken) {
153+
// Per-org basins are enabled but no token is configured — that's a
154+
// misconfiguration, not a no-op condition. Throw so the worker job
155+
// surfaces in the queue's failure log instead of silently leaving
156+
// retention stale on the basin.
157+
throw new Error(
158+
"REALTIME_STREAMS_S2_ACCESS_TOKEN must be set when REALTIME_STREAMS_PER_ORG_BASINS_ENABLED=true"
159+
);
160+
}
160161

161162
const org = await prisma.organization.findFirst({
162163
where: { id: orgId },
@@ -197,8 +198,8 @@ async function s2CreateBasin(name: string, opts: CreateBasinOptions): Promise<vo
197198
create_stream_on_read: true,
198199
default_stream_config: {
199200
storage_class: opts.storageClass,
200-
retention_policy: { age: durationToSeconds(opts.retentionPolicy) },
201-
delete_on_empty: { min_age_secs: durationToSeconds(opts.deleteOnEmptyMinAge) },
201+
retention_policy: { age: parseDuration(opts.retentionPolicy) },
202+
delete_on_empty: { min_age_secs: parseDuration(opts.deleteOnEmptyMinAge) },
202203
},
203204
},
204205
};
@@ -235,7 +236,7 @@ async function s2ReconfigureBasin(name: string, opts: ReconfigureBasinOptions):
235236
const url = `https://aws.s2.dev/v1/basins/${encodeURIComponent(name)}`;
236237
const body = {
237238
default_stream_config: {
238-
retention_policy: { age: durationToSeconds(opts.retentionPolicy) },
239+
retention_policy: { age: parseDuration(opts.retentionPolicy) },
239240
},
240241
};
241242

@@ -258,29 +259,3 @@ async function s2ReconfigureBasin(name: string, opts: ReconfigureBasinOptions):
258259
throw new Error(`S2 reconfigureBasin failed: ${res.status} ${res.statusText} ${text}`);
259260
}
260261

261-
/**
262-
* Parse a short duration string (e.g. `7d`, `30d`, `365d`, `1h`, `90m`,
263-
* `45s`, `2w`) into seconds. Tolerant of `7days` and `1week` forms too.
264-
* Throws on garbage so a misconfigured env var fails loudly at first use.
265-
*/
266-
function durationToSeconds(input: string): number {
267-
const trimmed = input.trim().toLowerCase();
268-
const match = trimmed.match(/^(\d+)\s*(s|sec|secs|seconds?|m|min|mins|minutes?|h|hour|hours?|d|day|days?|w|week|weeks?|y|year|years?)$/);
269-
if (!match) {
270-
throw new Error(`Invalid duration string: ${input}`);
271-
}
272-
const value = parseInt(match[1]!, 10);
273-
const unit = match[2]!;
274-
const multiplier =
275-
/^s/.test(unit) ? 1
276-
: /^m(?:in|ins|inute|inutes)?$/.test(unit) ? 60
277-
: /^h/.test(unit) ? 3600
278-
: /^d/.test(unit) ? 86400
279-
: /^w/.test(unit) ? 604800
280-
: /^y/.test(unit) ? 31_536_000
281-
: NaN;
282-
if (!Number.isFinite(multiplier)) {
283-
throw new Error(`Invalid duration unit: ${unit}`);
284-
}
285-
return value * multiplier;
286-
}

apps/webapp/app/services/realtime/streamBasinRetentionByPlan.server.ts

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,34 +12,36 @@
1212
* path falls back to `defaultRetention()`.
1313
*/
1414
import { env } from "~/env.server";
15-
import { getCurrentPlan } from "~/services/platform.v3.server";
15+
import { getCurrentPlan, isBillingConfigured } from "~/services/platform.v3.server";
1616
import { defaultRetention } from "./streamBasinProvisioner.server";
1717

1818
/**
1919
* Resolve the retention duration for an org based on its current plan.
2020
*
21-
* - Returns the configured retention for the plan when the billing
22-
* API has data.
23-
* - Returns `defaultRetention()` when no billing client is configured
24-
* (OSS / non-cloud installs that flipped per-org basins on without
25-
* wiring billing).
26-
* - **Throws** when billing is configured but the call failed, so
27-
* the redis-worker retry kicks in and we don't silently downgrade
28-
* a paid org's retention.
21+
* - When billing is **not configured** (OSS / self-hosted installs),
22+
* returns `defaultRetention()` — the worker job converges, the
23+
* backfill completes, and operators get a sane default without
24+
* having to wire up a billing API.
25+
* - When billing **is configured** and the call succeeds, maps the
26+
* plan code to a retention duration.
27+
* - When billing **is configured** but the call failed (transient
28+
* outage / 5xx), **throws** so the redis-worker retry kicks in
29+
* and we don't silently downgrade a paid org's retention.
2930
*/
3031
export async function resolveRetentionForOrg(orgId: string): Promise<string> {
31-
const plan = await getCurrentPlan(orgId);
32+
if (!isBillingConfigured()) {
33+
// No billing wired up — operator either runs OSS or hasn't set
34+
// BILLING_API_URL / BILLING_API_KEY. Fall back to the default;
35+
// the org-create path uses the same default, so this is just the
36+
// backfill's catch-up path arriving at the same answer.
37+
return defaultRetention();
38+
}
3239

40+
const plan = await getCurrentPlan(orgId);
3341
if (plan === undefined) {
34-
// We can't tell from `getCurrentPlan` alone whether the billing
35-
// client isn't configured (OSS) or whether the call failed
36-
// (transient cloud outage). Today we conservatively throw so
37-
// cloud installs retry. OSS installs that hit this path either:
38-
// (a) flipped the per-org-basins flag on without wiring billing
39-
// and should configure `BILLING_API_URL` / `BILLING_API_KEY`,
40-
// or
41-
// (b) shouldn't be calling this at all and should pass an
42-
// explicit retention to the provisioner.
42+
// Billing client exists but the call failed. Throw so redis-worker
43+
// retries — silently defaulting to free would clip a paid org's
44+
// retention if a backfill landed during a transient billing outage.
4345
throw new Error(
4446
`[streamBasinRetentionByPlan] billing plan unavailable for org ${orgId}; will retry`
4547
);

apps/webapp/app/v3/commonWorker.server.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,6 @@ function initializeWorker() {
317317
where: { id: payload.orgId },
318318
select: {
319319
id: true,
320-
slug: true,
321320
streamBasinName: true,
322321
},
323322
});

0 commit comments

Comments
 (0)