Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ jobs:
- name: 📥 Download deps
run: pnpm install --frozen-lockfile --filter trigger.dev...

# Prisma clients generate concurrently and share one engine binary in the
# pnpm store; the generate script retries the transient Windows EPERM race.
- name: 📀 Generate Prisma Client
run: pnpm run generate

Expand Down
8 changes: 4 additions & 4 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1727,10 +1727,10 @@ const EnvironmentSchema = z
RUN_REPLICATION_LEGACY_ORIGIN_GENERATION: z.coerce.number().int().default(0),
RUN_REPLICATION_NEW_ORIGIN_GENERATION: z.coerce.number().int().default(1),

// Run-ops KSUID mint cutover — per-env, canary-first, OFF by default.
// Even when on, an env mints KSUID only if its per-org runOpsMintKsuid flag is
// "ksuid" AND isSplitEnabled() is true. Cache mirrors REALTIME_BACKEND_FLAG_CACHE_*.
RUN_OPS_MINT_KSUID_ENABLED: BoolEnv.default(false),
// Run-ops id mint cutover — per-env, canary-first, OFF by default.
// Even when on, an env mints run-ops ids only if its per-org runOpsMintKind flag is
// "runOpsId" AND isSplitEnabled() is true. Cache mirrors REALTIME_BACKEND_FLAG_CACHE_*.
RUN_OPS_MINT_ENABLED: BoolEnv.default(false),
Comment thread
d-cs marked this conversation as resolved.
RUN_OPS_MINT_FLAG_CACHE_TTL_MS: z.coerce.number().int().default(30_000),
RUN_OPS_MINT_FLAG_CACHE_MAX_ENTRIES: z.coerce.number().int().default(10_000),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ export class ApiBatchResultsPresenter extends BasePresenter {
}

// Split: resolve the batch row new-first then off the legacy READ REPLICA only (a batch id may
// be cuid or ksuid, and a cuid-shaped id can still have been backfilled onto NEW, so id-shape
// be cuid or run-ops id, and a cuid-shaped id can still have been backfilled onto NEW, so id-shape
// residency is not authoritative for the row — the new-first-then-legacy probe is), then
// hydrate every member run independently via the per-run read-through primitive.
async #callSplit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class ApiWaitpointPresenter extends BasePresenter {
// Public waitpoint retrieve. Split on: new run-ops client first, then the LEGACY
// RUN-OPS READ REPLICA ONLY on a new-probe miss — never the legacy primary.
// Split off (single-DB / self-host): one plain waitpoint.findFirst against the replica
// (passthrough). The waitpointId is the residency-classifiable KSUID id (the route
// (passthrough). The waitpointId is the residency-classifiable run-ops id (the route
// pre-decodes the friendlyId via WaitpointId.toId).
const hydrate = (client: PrismaReplicaClient) =>
client.waitpoint.findFirst({
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/presenters/v3/BatchListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ export class BatchListPresenter extends BasePresenter {
}
}

// codepoint comparator (NEVER localeCompare): BatchTaskRun.id is ASCII (cuid or ksuid).
// codepoint comparator (NEVER localeCompare): BatchTaskRun.id is ASCII (cuid or run-ops id).
const sign = direction === "forward" ? 1 : -1; // forward => DESC; backward => ASC
return Array.from(byId.values())
.sort((a, b) => (a.id < b.id ? sign : a.id > b.id ? -sign : 0))
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/presenters/v3/BatchPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export class BatchPresenter extends BasePresenter {
const batchResult = await readThrough<BatchRow>({
// The read-through key; here it is the batch friendlyId. A cuid-shaped batch friendlyId
// classifies as LEGACY and the read-through probes both stores (new first, then legacy
// replica); a ksuid-shaped one (cut-over orgs) classifies as NEW and reads only the new
// replica); a run-ops-shaped one (cut-over orgs) classifies as NEW and reads only the new
// store — either way the row is found on the DB that owns it.
runId: batchId,
environmentId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ const { action, loader } = createActionApiRoute(
}
}

// Create the waitpoint. Co-locate it with the owning run (run-ops split) so a ksuid
// Create the waitpoint. Co-locate it with the owning run (run-ops split) so a run-ops
// run's input-stream waitpoint lands on the run's DB and its block edge resolves.
const result = await engine.createManualWaitpoint({
runId: run.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ const { action, loader } = createActionApiRoute(
}
}

// Create the waitpoint. Co-locate it with the owning run (run-ops split) so a ksuid
// Create the waitpoint. Co-locate it with the owning run (run-ops split) so a run-ops
// run's session-stream waitpoint lands on the run's DB and its block edge resolves.
const result = await engine.createManualWaitpoint({
runId: run.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const { action } = createActionApiRoute(
: undefined;

const { waitpoint } = await engine.createDateTimeWaitpoint({
// Co-locate the waitpoint with the run that blocks on it (run-ops split): a ksuid run lives
// Co-locate the waitpoint with the run that blocks on it (run-ops split): a run-ops run lives
// on the dedicated DB, but the minted waitpoint id is always a cuid, so without the run id
// the waitpoint would route to the control-plane DB and the block edge would never resolve.
runId: run.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ function makeDeps(over: Partial<ResolveIdempotencyClientDeps>): ResolveIdempoten
fallbackClient: FALLBACK,
newClient: NEW_CLIENT,
legacyClient: LEGACY_CLIENT,
resolveMintKind: async () => "ksuid",
resolveMintKind: async () => "runOpsId",
classify: (id) => {
if (id.length === 27) return "NEW";
if (id.length === 26 && id[25] === "1") return "NEW";
if (id.length === 25) return "LEGACY";
throw new Error(`unclassifiable: ${id.length}`);
},
Expand All @@ -38,10 +38,10 @@ describe("resolveIdempotencyDedupClient", () => {
expect(client).toBe(FALLBACK);
});

it("routes a root run to the NEW client when the env mints ksuid", async () => {
it("routes a root run to the NEW client when the env mints run-ops ids", async () => {
const client = await resolveIdempotencyDedupClient(
{ environmentForMint: env, parentRunFriendlyId: undefined },
makeDeps({ resolveMintKind: async () => "ksuid" })
makeDeps({ resolveMintKind: async () => "runOpsId" })
);
expect(client).toBe(NEW_CLIENT);
});
Expand All @@ -54,10 +54,10 @@ describe("resolveIdempotencyDedupClient", () => {
expect(client).toBe(LEGACY_CLIENT);
});

it("routes a child to the NEW client when the ksuid parent is NEW-resident", async () => {
const ksuidParent = RunId.toFriendlyId("a".repeat(27));
it("routes a child to the NEW client when the run-ops parent is NEW-resident", async () => {
const runOpsParent = RunId.toFriendlyId("a".repeat(24) + "01");
const client = await resolveIdempotencyDedupClient(
{ environmentForMint: env, parentRunFriendlyId: ksuidParent },
{ environmentForMint: env, parentRunFriendlyId: runOpsParent },
makeDeps({ resolveMintKind: async () => "cuid" }) // mint flag must NOT win for a child
);
expect(client).toBe(NEW_CLIENT);
Expand All @@ -67,7 +67,7 @@ describe("resolveIdempotencyDedupClient", () => {
const cuidParent = RunId.toFriendlyId("b".repeat(25));
const client = await resolveIdempotencyDedupClient(
{ environmentForMint: env, parentRunFriendlyId: cuidParent },
makeDeps({ resolveMintKind: async () => "ksuid" }) // mint flag must NOT win for a child
makeDeps({ resolveMintKind: async () => "runOpsId" }) // mint flag must NOT win for a child
);
expect(client).toBe(LEGACY_CLIENT);
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ownerEngine, RunId, type Residency } from "@trigger.dev/core/v3/isomorphic";
import type { PrismaClientOrTransaction } from "@trigger.dev/database";

type MintKind = "cuid" | "ksuid";
type MintKind = "cuid" | "runOpsId";

export type ResolveIdempotencyClientDeps = {
isSplitEnabled: () => Promise<boolean>;
Expand Down Expand Up @@ -52,5 +52,5 @@ export async function resolveIdempotencyDedupClient(
}

const kind = await deps.resolveMintKind(args.environmentForMint);
return clientFor(kind === "ksuid" ? "NEW" : "LEGACY");
return clientFor(kind === "runOpsId" ? "NEW" : "LEGACY");
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ export class StreamBatchItemsService extends WithRunEngine {
const batchId = this.parseBatchFriendlyId(batchFriendlyId);

// Validate batch exists and belongs to this environment. Routed by batch id so a
// ksuid (NEW-resident) batch is found on the owning DB; the env-ownership check that
// run-ops (NEW-resident) batch is found on the owning DB; the env-ownership check that
// was in the where clause is enforced app-side below.
const batch = await this._engine.runStore.findBatchTaskRunById(batchId);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { RunEngine } from "@internal/run-engine";
import { TaskRunErrorCodes, type TaskRunError } from "@trigger.dev/core/v3";
import { RunId, generateKsuidId } from "@trigger.dev/core/v3/isomorphic";
import { RunId, generateRunOpsId } from "@trigger.dev/core/v3/isomorphic";
import type {
PrismaClientOrTransaction,
RuntimeEnvironmentType,
Expand Down Expand Up @@ -85,7 +85,7 @@ export class TriggerFailedTaskService {
}

// Mint a failed run's friendlyId. The id-kind decides which store the run is
// born in (cuid → legacy store, ksuid → new store); the whole subgraph of a
// born in (cuid → legacy store, run-ops id → new store); the whole subgraph of a
// run must agree. Root failed runs mint by the environment's setting; child
// failed runs inherit the parent's current store so they never split.
private async mintFailedRunFriendlyId(args: {
Expand All @@ -102,8 +102,8 @@ export class TriggerFailedTaskService {
orgFeatureFlags: args.orgFeatureFlags,
});

return mintKind === "ksuid"
? RunId.toFriendlyId(generateKsuidId())
return mintKind === "runOpsId"
? RunId.toFriendlyId(generateRunOpsId())
: RunId.generate().friendlyId;
}

Expand Down
23 changes: 16 additions & 7 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
TriggerTraceContext,
} from "@trigger.dev/core/v3";
import {
generateKsuidId,
generateRunOpsId,
parseTraceparent,
RunId,
serializeTraceparent,
Expand Down Expand Up @@ -129,16 +129,21 @@ export class RunEngineTriggerTaskService {
}

// Mint a new run's friendlyId. The id-kind decides which store the run is born
// in (cuid → legacy store, ksuid → new store), so the whole subgraph of a run
// in (cuid → legacy store, run-ops id → new store), so the whole subgraph of a run
// must agree. Two cases:
//
// - ROOT run (no parent): mint by the environment's cutover setting.
// - CHILD run (has a parent): inherit the parent's residency by id-shape, so a
// parent and child never split across stores (ksuid parent → ksuid child,
// parent and child never split across stores (run-ops parent → run-ops child,
// cuid parent → cuid child).
// `region` is the caller-requested region (body.options.region). The id is
// minted before the worker queue is resolved (the idempotency concern needs
// the friendlyId first), so the stamped region char reflects the requested
// region — or the default char when the run targets the default region.
private async mintRunFriendlyId(
environment: AuthenticatedEnvironment,
parentRunFriendlyId?: string
parentRunFriendlyId?: string,
region?: string
): Promise<string> {
const mintKind = parentRunFriendlyId
? resolveInheritedMintKind(parentRunFriendlyId)
Expand All @@ -148,8 +153,8 @@ export class RunEngineTriggerTaskService {
orgFeatureFlags: environment.organization.featureFlags,
});

return mintKind === "ksuid"
? RunId.toFriendlyId(generateKsuidId())
return mintKind === "runOpsId"
? RunId.toFriendlyId(generateRunOpsId(region))
: RunId.generate().friendlyId;
}

Expand Down Expand Up @@ -183,7 +188,11 @@ export class RunEngineTriggerTaskService {
// parent is present, else the environment's setting.
const runFriendlyId =
options?.runFriendlyId ??
(await this.mintRunFriendlyId(environment, body.options?.parentRunId));
(await this.mintRunFriendlyId(
environment,
body.options?.parentRunId,
body.options?.region
));
const triggerRequest = {
taskId,
friendlyId: runFriendlyId,
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/services/realtime/sessions.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ export function serializeSession(session: Session): SessionItem {
* Skips the lookup when `currentRunId` is null.
*
* Resolves `currentRunId` -> `friendlyId` through `runStore.findRun` so a
* ksuid (NEW-DB) session run resolves from its owning store rather than the
* run-ops (NEW-DB) session run resolves from its owning store rather than the
* control-plane replica. Mirrors `sessionRunManager.server.ts`.
* Tenant-scoped because `Session.currentRunId` is a no-FK pointer.
*/
Expand Down
6 changes: 3 additions & 3 deletions apps/webapp/app/services/runsReplicationInstance.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ export function buildReplicationSources(args: {

/**
* The residency-split gate and the `#new`->ClickHouse replication gate are
* independent env vars. If split is on (ksuid runs are minted on the new DB) but the
* constructed sources[] has no `"new"` source, every ksuid run is silently missing from
* independent env vars. If split is on (run-ops runs are minted on the new DB) but the
* constructed sources[] has no `"new"` source, every run-ops run is silently missing from
* ClickHouse — under-counting all CH-fronted usage/cost/metrics aggregates with no
* Postgres fallback. Couple the gates at boot: this misconfiguration must fail loudly
* rather than ship a fleet-wide under-count.
Expand All @@ -69,7 +69,7 @@ export class SplitReplicationMisconfiguredError extends Error {
constructor() {
super(
'RUN_OPS_SPLIT_ENABLED is on but the runs-replication sources[] has no "new" source: ' +
"ksuid runs on the new DB would not replicate to ClickHouse, under-counting every " +
"run-ops runs on the new DB would not replicate to ClickHouse, under-counting every " +
"ClickHouse-fronted aggregate. Enable the new replication source " +
"(RUN_REPLICATION_NEW_ENABLED / RUN_OPS_DATABASE_URL) or turn the split off."
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ export class ClickHouseRunsRepository implements IRunsRepository {

// Preserve the ClickHouse keyset order (created_at desc, run_id desc) by re-ordering the
// hydrated rows to match the input `runIds`. Sorting by raw `id` was only ~chronological
// when every id was a time-prefixed cuid; a mixed cuid/ksuid page sorts the two id-spaces
// when every id was a time-prefixed cuid; a mixed cuid/run-ops id page sorts the two id-spaces
// into separate blocks, burying recent runs. Rows whose PG row is gone (e.g. past
// retention) drop out, exactly as before.
const byId = new Map(rows.map((r) => [r.id, r] as const));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ export async function convertRunListInputOptionsToFilterRunsOptions(
});
convertedOptions.period = time.period ? (parseDuration(time.period) ?? undefined) : undefined;

// Cross-DB resolution: BatchTaskRun is a RUN-OPS table. A ksuid batch resident on the
// Cross-DB resolution: BatchTaskRun is a RUN-OPS table. A run-ops batch resident on the
// dedicated run-ops DB must resolve via the store's NEW->LEGACY probe — a single control-plane
// client would miss it and leave the friendlyId in the ClickHouse `batch_id` filter, matching
// nothing. Split off / self-host: the store is a passthrough over the one client.
Expand Down
19 changes: 10 additions & 9 deletions apps/webapp/app/utils/friendlyId.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,31 @@ import { describe, expect, it } from "vitest";
import {
BatchId,
generateFriendlyId,
generateKsuidId,
generateRunOpsId,
RunId,
} from "@trigger.dev/core/v3/isomorphic";
import { isValidFriendlyId, makeFriendlyIdValidator } from "./friendlyId";

describe("isValidFriendlyId", () => {
it("accepts every id generation the real generators produce", () => {
// nanoid (legacy V1), cuid (run-engine), ksuid (run-ops split)
// nanoid (legacy V1), cuid (run-engine), run-ops v1 (run-ops split)
expect(isValidFriendlyId(generateFriendlyId("run"), "run")).toBe(true);
expect(isValidFriendlyId(RunId.generate().friendlyId, "run")).toBe(true);
expect(isValidFriendlyId(RunId.toFriendlyId(generateKsuidId()), "run")).toBe(true);
expect(isValidFriendlyId(RunId.toFriendlyId(generateRunOpsId()), "run")).toBe(true);

expect(isValidFriendlyId(generateFriendlyId("batch"), "batch")).toBe(true);
expect(isValidFriendlyId(BatchId.generate().friendlyId, "batch")).toBe(true);
expect(isValidFriendlyId(BatchId.toFriendlyId(generateKsuidId()), "batch")).toBe(true);
expect(isValidFriendlyId(BatchId.toFriendlyId(generateRunOpsId()), "batch")).toBe(true);
});

it("accepts each valid body length (21 nanoid, 25 cuid, 27 ksuid)", () => {
it("accepts each valid body length (21 nanoid, 25 cuid, 26 run-ops v1, 27 legacy base62)", () => {
expect(isValidFriendlyId("run_" + "a".repeat(21), "run")).toBe(true);
expect(isValidFriendlyId("run_" + "a".repeat(25), "run")).toBe(true);
expect(isValidFriendlyId("run_" + "a".repeat(26), "run")).toBe(true);
expect(isValidFriendlyId("run_" + "a".repeat(27), "run")).toBe(true);
});

it("accepts mixed-case (uppercase) ksuid bodies", () => {
it("accepts mixed-case (uppercase) legacy base62 bodies", () => {
expect(isValidFriendlyId("run_2ABCdefGHI0123456789jklMN", "run")).toBe(true);
});

Expand All @@ -39,7 +40,7 @@ describe("isValidFriendlyId", () => {
});

it("rejects body lengths that match no generator", () => {
for (const len of [0, 20, 22, 24, 26, 28]) {
for (const len of [0, 20, 22, 24, 28]) {
expect(isValidFriendlyId("run_" + "a".repeat(len), "run")).toBe(false);
}
});
Expand All @@ -64,8 +65,8 @@ describe("makeFriendlyIdValidator", () => {
it("returns undefined for a valid id of any generation", () => {
expect(validateRunId(generateFriendlyId("run"))).toBeUndefined();
expect(validateRunId(RunId.generate().friendlyId)).toBeUndefined();
expect(validateRunId(RunId.toFriendlyId(generateKsuidId()))).toBeUndefined();
expect(validateBatchId(BatchId.toFriendlyId(generateKsuidId()))).toBeUndefined();
expect(validateRunId(RunId.toFriendlyId(generateRunOpsId()))).toBeUndefined();
expect(validateBatchId(BatchId.toFriendlyId(generateRunOpsId()))).toBeUndefined();
});

it("reports a wrong prefix distinctly from a wrong shape", () => {
Expand Down
18 changes: 11 additions & 7 deletions apps/webapp/app/utils/friendlyId.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
import { CUID_LENGTH, KSUID_LENGTH } from "@trigger.dev/core/v3/isomorphic";
import { CUID_LENGTH, RUN_OPS_ID_LENGTH } from "@trigger.dev/core/v3/isomorphic";

// The body after `<prefix>_` is a base62 id; three generator lengths remain
// valid in existing data and must all be accepted: 21 (nanoid), 25 (cuid),
// 27 (ksuid). cuid/ksuid come from core so this tracks any future change.
// The body after `<prefix>_` is an alphanumeric id; four generator lengths
// remain valid in existing data and must all be accepted: 21 (nanoid),
// 25 (cuid), 26 (run-ops v1 base32hex), 27 (pre-cutover base62, kept so old
// ids still pass filter validation). cuid/run-ops come from core so this
// tracks any future change.
const NANOID_BODY_LENGTH = 21;
const LEGACY_BASE62_BODY_LENGTH = 27;
const VALID_BODY_LENGTHS: ReadonlySet<number> = new Set([
NANOID_BODY_LENGTH,
CUID_LENGTH,
KSUID_LENGTH,
RUN_OPS_ID_LENGTH,
LEGACY_BASE62_BODY_LENGTH,
]);

const BASE62 = /^[0-9A-Za-z]+$/;
const ALPHANUMERIC = /^[0-9A-Za-z]+$/;

export function isValidFriendlyId(value: string, prefix: string): boolean {
const marker = `${prefix}_`;
if (!value.startsWith(marker)) return false;
const body = value.slice(marker.length);
return VALID_BODY_LENGTHS.has(body.length) && BASE62.test(body);
return VALID_BODY_LENGTHS.has(body.length) && ALPHANUMERIC.test(body);
}

export function makeFriendlyIdValidator(prefix: string, label: string) {
Expand Down
Loading
Loading