From 57db55ff2e6290e00f660d4010644fd4ce873ea7 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 1 Jul 2026 16:20:01 +0100 Subject: [PATCH 1/7] feat(run-ops): ClickHouse multi-source replication fan-in + admin ops Co-Authored-By: Claude Opus 4.8 (1M context) --- .../admin.api.v1.runs-replication.backfill.ts | 6 +- .../admin.api.v1.runs-replication.status.ts | 60 ++ .../services/runsReplicationGlobal.server.ts | 12 + .../runsReplicationInstance.server.ts | 174 +++++- .../services/runsReplicationService.server.ts | 396 +++++++++---- .../app/v3/services/adminWorker.server.ts | 6 +- .../test/runsReplicationInstance.test.ts | 420 ++++++++++++++ .../test/runsReplicationService.part8.test.ts | 531 ++++++++++++++++++ .../test/runsReplicationService.part9.test.ts | 263 +++++++++ 9 files changed, 1732 insertions(+), 136 deletions(-) create mode 100644 apps/webapp/app/routes/admin.api.v1.runs-replication.status.ts create mode 100644 apps/webapp/test/runsReplicationInstance.test.ts create mode 100644 apps/webapp/test/runsReplicationService.part8.test.ts create mode 100644 apps/webapp/test/runsReplicationService.part9.test.ts diff --git a/apps/webapp/app/routes/admin.api.v1.runs-replication.backfill.ts b/apps/webapp/app/routes/admin.api.v1.runs-replication.backfill.ts index af041353ada..002da73c625 100644 --- a/apps/webapp/app/routes/admin.api.v1.runs-replication.backfill.ts +++ b/apps/webapp/app/routes/admin.api.v1.runs-replication.backfill.ts @@ -5,6 +5,7 @@ import { prisma } from "~/db.server"; import { runStore } from "~/v3/runStore.server"; import { logger } from "~/services/logger.server"; import { requireAdminApiRequest } from "~/services/personalAccessToken.server"; +import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server"; import { runsReplicationInstance } from "~/services/runsReplicationInstance.server"; import { FINAL_RUN_STATUSES } from "~/v3/taskStatus"; @@ -40,11 +41,12 @@ export async function action({ request }: 
ActionFunctionArgs) { runs.push(...batchRuns); } - if (!runsReplicationInstance) { + const service = getRunsReplicationGlobal() ?? runsReplicationInstance; + if (!service) { throw new Error("Runs replication instance not found"); } - await runsReplicationInstance.backfill( + await service.backfill( runs.map((run) => ({ ...run, masterQueue: run.workerQueue, diff --git a/apps/webapp/app/routes/admin.api.v1.runs-replication.status.ts b/apps/webapp/app/routes/admin.api.v1.runs-replication.status.ts new file mode 100644 index 00000000000..6d570418cc7 --- /dev/null +++ b/apps/webapp/app/routes/admin.api.v1.runs-replication.status.ts @@ -0,0 +1,60 @@ +import { type LoaderFunctionArgs, json } from "@remix-run/server-runtime"; +import Redis from "ioredis"; +import { env } from "~/env.server"; +import { requireAdminApiRequest } from "~/services/personalAccessToken.server"; +import { getRunsReplicationConfiguredSources } from "~/services/runsReplicationGlobal.server"; + +/** + * Probes per-source replication leadership via the redlock leader-lock key, which + * is DOUBLE-PREFIXED with `logical-replication-client:` — once from the connection's + * keyPrefix and once from redlock's resource string. So we prefix this connection + * with `runs-replication:logical-replication-client:` and EXISTS on the resource + * `logical-replication-client:runs-replication:`, resolving to: + * runs-replication:logical-replication-client:logical-replication-client:runs-replication: + */ +async function probeLeadership(sourceIds: string[]): Promise> { + const leaders = new Map(); + + const redis = new Redis({ + keyPrefix: "runs-replication:logical-replication-client:", + port: env.RUN_REPLICATION_REDIS_PORT ?? undefined, + host: env.RUN_REPLICATION_REDIS_HOST ?? undefined, + username: env.RUN_REPLICATION_REDIS_USERNAME ?? undefined, + password: env.RUN_REPLICATION_REDIS_PASSWORD ?? undefined, + enableAutoPipelining: true, + ...(env.RUN_REPLICATION_REDIS_TLS_DISABLED === "true" ? 
{} : { tls: {} }), + }); + + try { + for (const id of sourceIds) { + const exists = await redis.exists(`logical-replication-client:runs-replication:${id}`); + leaders.set(id, exists === 1); + } + } finally { + await redis.quit(); + } + + return leaders; +} + +export async function loader({ request }: LoaderFunctionArgs) { + await requireAdminApiRequest(request); + + const sources = getRunsReplicationConfiguredSources(); + + if (!sources || sources.length === 0) { + return json({ enabled: false, sources: [] }); + } + + const leaders = await probeLeadership(sources.map((s) => s.id)); + + return json({ + enabled: env.RUN_REPLICATION_ENABLED === "1" && sources.length > 0, + sources: sources.map((s) => ({ + id: s.id, + slotName: s.slotName, + originGeneration: s.originGeneration, + leader: leaders.get(s.id) ?? false, + })), + }); +} diff --git a/apps/webapp/app/services/runsReplicationGlobal.server.ts b/apps/webapp/app/services/runsReplicationGlobal.server.ts index fef22f02261..48e783ef56a 100644 --- a/apps/webapp/app/services/runsReplicationGlobal.server.ts +++ b/apps/webapp/app/services/runsReplicationGlobal.server.ts @@ -2,10 +2,14 @@ import type { RunsReplicationService } from "./runsReplicationService.server"; const GLOBAL_RUNS_REPLICATION_KEY = Symbol.for("dev.trigger.ts.runs-replication"); const GLOBAL_TCP_MONITOR_KEY = Symbol.for("dev.trigger.ts.tcp-monitor"); +const GLOBAL_RUNS_REPLICATION_SOURCES_KEY = Symbol.for("dev.trigger.ts.runs-replication-sources"); + +export type ConfiguredSource = { id: string; slotName: string; originGeneration: number }; type RunsReplicationGlobal = { [GLOBAL_RUNS_REPLICATION_KEY]?: RunsReplicationService; [GLOBAL_TCP_MONITOR_KEY]?: NodeJS.Timeout; + [GLOBAL_RUNS_REPLICATION_SOURCES_KEY]?: ConfiguredSource[]; }; const _globalThis = typeof globalThis === "object" ? 
globalThis : global; @@ -23,6 +27,14 @@ export function unregisterRunsReplicationGlobal() { delete _global[GLOBAL_RUNS_REPLICATION_KEY]; } +export function getRunsReplicationConfiguredSources(): ConfiguredSource[] | undefined { + return _global[GLOBAL_RUNS_REPLICATION_SOURCES_KEY]; +} + +export function setRunsReplicationConfiguredSources(sources: ConfiguredSource[]) { + _global[GLOBAL_RUNS_REPLICATION_SOURCES_KEY] = sources; +} + export function getTcpMonitorGlobal(): NodeJS.Timeout | undefined { return _global[GLOBAL_TCP_MONITOR_KEY]; } diff --git a/apps/webapp/app/services/runsReplicationInstance.server.ts b/apps/webapp/app/services/runsReplicationInstance.server.ts index d5071e2d2b8..fef74ef4e13 100644 --- a/apps/webapp/app/services/runsReplicationInstance.server.ts +++ b/apps/webapp/app/services/runsReplicationInstance.server.ts @@ -2,8 +2,16 @@ import invariant from "tiny-invariant"; import { env } from "~/env.server"; import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server"; import { singleton } from "~/utils/singleton"; +import { isSplitEnabled } from "~/v3/runOpsMigration/splitMode.server"; import { meter, provider } from "~/v3/tracer.server"; -import { RunsReplicationService } from "./runsReplicationService.server"; +import { + setRunsReplicationConfiguredSources, + setRunsReplicationGlobal, +} from "./runsReplicationGlobal.server"; +import { + RunsReplicationService, + type RunsReplicationSource, +} from "./runsReplicationService.server"; import { signalsEmitter } from "./signals.server"; export const runsReplicationInstance = singleton( @@ -11,6 +19,73 @@ export const runsReplicationInstance = singleton( initializeRunsReplicationInstance ); +export function buildReplicationSources(args: { + splitEnabled: boolean; + legacyUrl: string; + newUrl?: string; + /** `false` forces the new source off under split; undefined follows split. 
*/ + newSourceOverride?: boolean; + legacySlotName: string; + legacyPublicationName: string; + legacyOriginGeneration: number; + newSlotName: string; + newPublicationName: string; + newOriginGeneration: number; +}): RunsReplicationSource[] { + const legacy: RunsReplicationSource = { + id: "legacy", + pgConnectionUrl: args.legacyUrl, + slotName: args.legacySlotName, + publicationName: args.legacyPublicationName, + originGeneration: args.legacyOriginGeneration, + }; + + const newSourceOn = args.splitEnabled && !!args.newUrl && args.newSourceOverride !== false; + + if (!newSourceOn || !args.newUrl) { + return [legacy]; + } + + const next: RunsReplicationSource = { + id: "new", + pgConnectionUrl: args.newUrl, + slotName: args.newSlotName, + publicationName: args.newPublicationName, + originGeneration: args.newOriginGeneration, + }; + + return [legacy, next]; +} + +/** + * The residency-split gate and the `#new`->ClickHouse replication gate are + * independent env vars. If split is on (ksuid runs are minted on the new DB) but the + * constructed sources[] has no `"new"` source, every ksuid run is silently missing from + * ClickHouse — under-counting all CH-fronted usage/cost/metrics aggregates with no + * Postgres fallback. Couple the gates at boot: this misconfiguration must fail loudly + * rather than ship a fleet-wide under-count. + */ +export class SplitReplicationMisconfiguredError extends Error { + constructor() { + super( + "RUN_OPS_SPLIT_ENABLED is on but the runs-replication sources[] has no \"new\" source: " + + "ksuid runs on the new DB would not replicate to ClickHouse, under-counting every " + + "ClickHouse-fronted aggregate. Enable the new replication source " + + "(RUN_REPLICATION_NEW_ENABLED / RUN_OPS_DATABASE_URL) or turn the split off." 
+ ); + this.name = "SplitReplicationMisconfiguredError"; + } +} + +export function assertReplicationCoversSplit(args: { + splitEnabled: boolean; + sources: RunsReplicationSource[]; +}): void { + if (args.splitEnabled && !args.sources.some((s) => s.id === "new")) { + throw new SplitReplicationMisconfiguredError(); + } +} + function initializeRunsReplicationInstance() { const { DATABASE_URL } = process.env; invariant(typeof DATABASE_URL === "string", "DATABASE_URL env var not set"); @@ -22,12 +97,11 @@ function initializeRunsReplicationInstance() { console.log("🗃️ Runs replication service enabled"); - const service = new RunsReplicationService({ + // Shared options for both the legacy-only and the multi-source constructions. + // Excludes per-source identity (pgConnectionUrl/slotName/publicationName/sources). + const baseReplicationOptions = { clickhouseFactory, - pgConnectionUrl: DATABASE_URL, serviceName: "runs-replication", - slotName: env.RUN_REPLICATION_SLOT_NAME, - publicationName: env.RUN_REPLICATION_PUBLICATION_NAME, redisOptions: { keyPrefix: "runs-replication:", port: env.RUN_REPLICATION_REDIS_PORT ?? undefined, @@ -55,24 +129,94 @@ function initializeRunsReplicationInstance() { insertStrategy: env.RUN_REPLICATION_INSERT_STRATEGY, disablePayloadInsert: env.RUN_REPLICATION_DISABLE_PAYLOAD_INSERT === "1", disableErrorFingerprinting: env.RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING === "1", + }; + + // Construct the SINGLE legacy source synchronously (the split gate has not resolved + // yet at module-init time, and singleton(...) memoizes this synchronous return value). + let service = new RunsReplicationService({ + ...baseReplicationOptions, + pgConnectionUrl: DATABASE_URL, + slotName: env.RUN_REPLICATION_SLOT_NAME, + publicationName: env.RUN_REPLICATION_PUBLICATION_NAME, }); + // Register the live handle so the status route + lifecycle routes can find it. 
+ setRunsReplicationGlobal(service); + setRunsReplicationConfiguredSources([ + { + id: "legacy", + slotName: env.RUN_REPLICATION_SLOT_NAME, + originGeneration: env.RUN_REPLICATION_LEGACY_ORIGIN_GENERATION, + }, + ]); + if (env.RUN_REPLICATION_ENABLED === "1") { - clickhouseFactory - .isReady() - .then(() => service.start()) - .then(() => { - console.log("🗃️ Runs replication service started"); + // Shape B (construct-after-gate): resolve the async split gate ONCE at boot, and + // when both sources are enabled rebuild `service` with sources[] before starting. + // The legacy-only instance above is never started in the dual path (no slot/lock + // taken). runsReplicationService.server.ts is untouched. The create route also calls + // setRunsReplicationGlobal — last-writer-wins is the existing contract. + isSplitEnabled() + .then((splitEnabled) => { + const sources = buildReplicationSources({ + splitEnabled, + legacyUrl: DATABASE_URL, + newUrl: env.RUN_OPS_DATABASE_URL ?? env.TASK_RUN_DATABASE_URL, + newSourceOverride: + env.RUN_REPLICATION_NEW_ENABLED === "disabled" ? false : undefined, + legacySlotName: env.RUN_REPLICATION_SLOT_NAME, + legacyPublicationName: env.RUN_REPLICATION_PUBLICATION_NAME, + legacyOriginGeneration: env.RUN_REPLICATION_LEGACY_ORIGIN_GENERATION, + newSlotName: env.RUN_REPLICATION_NEW_SLOT_NAME, + newPublicationName: env.RUN_REPLICATION_NEW_PUBLICATION_NAME, + newOriginGeneration: env.RUN_REPLICATION_NEW_ORIGIN_GENERATION, + }); + + // Refuse to start replication if split is on but `#new` is not a source. + assertReplicationCoversSplit({ splitEnabled, sources }); + + if (sources.length > 1) { + // The scalar pgConnectionUrl/slotName/publicationName remain required on the + // options type, but are ignored when sources[] is non-empty — the + // service normalizes off sources. Pass the legacy scalars to satisfy the type. 
+ service = new RunsReplicationService({ + ...baseReplicationOptions, + pgConnectionUrl: DATABASE_URL, + slotName: env.RUN_REPLICATION_SLOT_NAME, + publicationName: env.RUN_REPLICATION_PUBLICATION_NAME, + sources, + }); + setRunsReplicationGlobal(service); + setRunsReplicationConfiguredSources( + sources.map((s) => ({ + id: s.id, + slotName: s.slotName, + originGeneration: s.originGeneration, + })) + ); + } + + return clickhouseFactory.isReady().then(() => service.start()); }) + .then(() => console.log("🗃️ Runs replication service started")) .catch((error) => { - console.error("🗃️ Runs replication service failed to start", { - error, - }); + if (error instanceof SplitReplicationMisconfiguredError) { + // A silent ClickHouse under-count is worse than a crash — make it fatal. + console.error("🚨 FATAL: run-ops split / ClickHouse replication misconfiguration", { + error, + }); + process.exit(1); + } + console.error("🗃️ Runs replication service failed to start", { error }); }); - signalsEmitter.on("SIGTERM", service.shutdown.bind(service)); - signalsEmitter.on("SIGINT", service.shutdown.bind(service)); + // Closures over the `let` so SIGTERM/SIGINT hit whichever instance is live (NOT a + // stale .bind() to the discarded legacy-only instance). + signalsEmitter.on("SIGTERM", () => service.shutdown()); + signalsEmitter.on("SIGINT", () => service.shutdown()); } + // Returns the legacy-only instance synchronously (singleton memoizes this). Lifecycle + // routes read getRunsReplicationGlobal() first, so they get the live multi-source one. 
return service; } diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index f76a3c7b83d..0448fa78c90 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -3,6 +3,7 @@ import { type ClickHouse, type PayloadInsertArray, type TaskRunInsertArray, + composeTaskRunVersion, getPayloadField, getTaskRunField, } from "@internal/clickhouse"; @@ -60,12 +61,35 @@ interface Transaction { replicationLagMs: number; } +export type RunsReplicationSource = { + /** + * Stable per-source id. MUST be unique across sources. It is the key off + * which every per-source identity is derived: the LogicalReplicationClient + * `name` (and therefore the redlock leader-lock resource key), metrics tags, + * logs. e.g. "legacy" | "new". + */ + id: string; + pgConnectionUrl: string; + slotName: string; + publicationName: string; + /** 0 = legacy/control-plane DB, 1 = dedicated run-ops DB. Packed into _version via composeTaskRunVersion. */ + originGeneration: number; +}; + export type RunsReplicationServiceOptions = { clickhouseFactory: ClickhouseFactory; pgConnectionUrl: string; serviceName: string; slotName: string; publicationName: string; + /** + * Optional list of replication sources. When provided (and non-empty), the + * service fans in from each named source into the single shared flush + * scheduler. When omitted, the scalar `pgConnectionUrl`/`slotName`/ + * `publicationName` are used as a single implicit `"default"` source, + * preserving the legacy single-source behavior exactly. 
+ */ + sources?: RunsReplicationSource[]; redisOptions: RedisOptions; maxFlushConcurrency?: number; flushIntervalMs?: number; @@ -92,6 +116,24 @@ export type RunsReplicationServiceOptions = { type PostgresTaskRun = TaskRun & { masterQueue: string }; +type CurrentTransaction = + | (Omit, "commitEndLsn" | "replicationLagMs"> & { + commitEndLsn?: string | null; + replicationLagMs?: number; + }) + | null; + +type SourceRuntime = { + source: RunsReplicationSource; + client: LogicalReplicationClient; + latestCommitEndLsn: string | null; + lastAcknowledgedLsn: string | null; + lastAcknowledgedAt: number | null; + acknowledgeInterval: NodeJS.Timeout | null; + currentTransaction: CurrentTransaction; + currentParseDurationMs: number | null; +}; + type TaskRunInsert = { _version: bigint; run: PostgresTaskRun; @@ -107,26 +149,22 @@ export type RunsReplicationServiceEvents = { export class RunsReplicationService { private _isSubscribed = false; - private _currentTransaction: - | (Omit, "commitEndLsn" | "replicationLagMs"> & { - commitEndLsn?: string | null; - replicationLagMs?: number; - }) - | null = null; - - private _replicationClient: LogicalReplicationClient; + + /** + * Per-source runtime state. Each source has its own replication client, leader + * lock, slot, and in-flight transaction state. All fan in to the single shared + * _concurrentFlushScheduler. Transaction/LSN state MUST be per-source because + * logical-replication transactions interleave per stream. 
+ */ + private _sources: Map; + private _concurrentFlushScheduler: ConcurrentFlushScheduler; private logger: Logger; private _isShuttingDown = false; private _isShutDownComplete = false; private _tracer: Tracer; private _meter: Meter; - private _currentParseDurationMs: number | null = null; - private _lastAcknowledgedAt: number | null = null; private _acknowledgeTimeoutMs: number; - private _latestCommitEndLsn: string | null = null; - private _lastAcknowledgedLsn: string | null = null; - private _acknowledgeInterval: NodeJS.Timeout | null = null; // Retry configuration private _insertMaxRetries: number; private _insertBaseDelayMs: number; @@ -219,25 +257,60 @@ export class RunsReplicationService { this._disablePayloadInsert = options.disablePayloadInsert ?? false; this._disableErrorFingerprinting = options.disableErrorFingerprinting ?? false; - this._replicationClient = new LogicalReplicationClient({ - pgConfig: { - connectionString: options.pgConnectionUrl, - }, - name: options.serviceName, - slotName: options.slotName, - publicationName: options.publicationName, - table: "TaskRun", - redisOptions: options.redisOptions, - autoAcknowledge: false, - publicationActions: ["insert", "update", "delete"], - logger: options.logger ?? new Logger("LogicalReplicationClient", options.logLevel ?? "info"), - leaderLockTimeoutMs: options.leaderLockTimeoutMs ?? 30_000, - leaderLockExtendIntervalMs: options.leaderLockExtendIntervalMs ?? 10_000, - ackIntervalSeconds: options.ackIntervalSeconds ?? 10, - leaderLockAcquireAdditionalTimeMs: options.leaderLockAcquireAdditionalTimeMs ?? 10_000, - leaderLockRetryIntervalMs: options.leaderLockRetryIntervalMs ?? 500, - tracer: options.tracer, - }); + const sources: RunsReplicationSource[] = + options.sources && options.sources.length > 0 + ? 
options.sources + : [ + { + id: "default", + pgConnectionUrl: options.pgConnectionUrl, + slotName: options.slotName, + publicationName: options.publicationName, + originGeneration: 0, + }, + ]; + + RunsReplicationService.#validateSources(sources); + + this._sources = new Map(); + + for (const source of sources) { + const client = new LogicalReplicationClient({ + pgConfig: { + connectionString: source.pgConnectionUrl, + }, + name: `${options.serviceName}:${source.id}`, + slotName: source.slotName, + publicationName: source.publicationName, + table: "TaskRun", + redisOptions: options.redisOptions, + autoAcknowledge: false, + publicationActions: ["insert", "update", "delete"], + logger: + options.logger ?? new Logger("LogicalReplicationClient", options.logLevel ?? "info"), + leaderLockTimeoutMs: options.leaderLockTimeoutMs ?? 30_000, + leaderLockExtendIntervalMs: options.leaderLockExtendIntervalMs ?? 10_000, + ackIntervalSeconds: options.ackIntervalSeconds ?? 10, + leaderLockAcquireAdditionalTimeMs: options.leaderLockAcquireAdditionalTimeMs ?? 10_000, + leaderLockRetryIntervalMs: options.leaderLockRetryIntervalMs ?? 500, + tracer: options.tracer, + }); + + const runtime: SourceRuntime = { + source, + client, + latestCommitEndLsn: null, + lastAcknowledgedLsn: null, + lastAcknowledgedAt: null, + acknowledgeInterval: null, + currentTransaction: null, + currentParseDurationMs: null, + }; + + this.#wireClientEvents(runtime); + + this._sources.set(source.id, runtime); + } this._concurrentFlushScheduler = new ConcurrentFlushScheduler({ batchSize: options.flushBatchSize ?? 50, @@ -260,42 +333,80 @@ export class RunsReplicationService { tracer: options.tracer, }); - this._replicationClient.events.on("data", async ({ lsn, log, parseDuration }) => { - this.#handleData(lsn, log, parseDuration); + // Initialize retry configuration + this._insertMaxRetries = options.insertMaxRetries ?? 3; + this._insertBaseDelayMs = options.insertBaseDelayMs ?? 
100; + this._insertMaxDelayMs = options.insertMaxDelayMs ?? 2000; + } + + static #validateSources(sources: RunsReplicationSource[]) { + const ids = new Set(); + const slotNames = new Set(); + const originGenerations = new Set(); + + for (const source of sources) { + // Distinct id: a duplicate id derives a duplicate client name -> duplicate + // redlock leader-lock key -> only one source ever streams. + if (ids.has(source.id)) { + throw new Error( + `RunsReplicationService: duplicate source id "${source.id}" — source ids must be unique` + ); + } + ids.add(source.id); + + // Distinct slotName: two consumers on one WAL stream is a data race. + if (slotNames.has(source.slotName)) { + throw new Error( + `RunsReplicationService: duplicate slotName "${source.slotName}" — slot names must be unique across sources` + ); + } + slotNames.add(source.slotName); + + // Distinct originGeneration: a shared generation defeats the dedup tiebreak. + if (originGenerations.has(source.originGeneration)) { + throw new Error( + `RunsReplicationService: duplicate originGeneration "${source.originGeneration}" — originGeneration must be unique across sources` + ); + } + originGenerations.add(source.originGeneration); + } + } + + #wireClientEvents(runtime: SourceRuntime) { + const { client, source } = runtime; + + client.events.on("data", async ({ lsn, log, parseDuration }) => { + this.#handleData(runtime, lsn, log, parseDuration); }); - this._replicationClient.events.on("heartbeat", async ({ lsn, shouldRespond }) => { + client.events.on("heartbeat", async ({ lsn, shouldRespond }) => { if (this._isShuttingDown) return; if (this._isShutDownComplete) return; if (shouldRespond) { - this._lastAcknowledgedLsn = lsn; - await this._replicationClient.acknowledge(lsn); + runtime.lastAcknowledgedLsn = lsn; + await client.acknowledge(lsn); } }); - this._replicationClient.events.on("error", (error) => { + client.events.on("error", (error) => { this.logger.error("Replication client error", { + sourceId: 
source.id, error, }); }); - this._replicationClient.events.on("start", () => { - this.logger.info("Replication client started"); + client.events.on("start", () => { + this.logger.info("Replication client started", { sourceId: source.id }); }); - this._replicationClient.events.on("acknowledge", ({ lsn }) => { - this.logger.debug("Acknowledged", { lsn }); + client.events.on("acknowledge", ({ lsn }) => { + this.logger.debug("Acknowledged", { sourceId: source.id, lsn }); }); - this._replicationClient.events.on("leaderElection", (isLeader) => { - this.logger.info("Leader election", { isLeader }); + client.events.on("leaderElection", (isLeader) => { + this.logger.info("Leader election", { sourceId: source.id, isLeader }); }); - - // Initialize retry configuration - this._insertMaxRetries = options.insertMaxRetries ?? 3; - this._insertBaseDelayMs = options.insertBaseDelayMs ?? 100; - this._insertMaxDelayMs = options.insertMaxDelayMs ?? 2000; } /** Exposed for tests and metrics — total batches lost to unrecoverable parse errors. 
*/ @@ -310,9 +421,13 @@ export class RunsReplicationService { this.logger.info("Initiating shutdown of runs replication service"); - if (!this._currentTransaction) { + const hasCurrentTransaction = Array.from(this._sources.values()).some( + (runtime) => runtime.currentTransaction !== null + ); + + if (!hasCurrentTransaction) { this.logger.info("No transaction to commit, shutting down immediately"); - await this._replicationClient.stop(); + await Promise.all(Array.from(this._sources.values()).map((runtime) => runtime.client.stop())); this._isShutDownComplete = true; return; } @@ -321,43 +436,70 @@ export class RunsReplicationService { } async start() { - this.logger.info("Starting replication client", { - lastLsn: this._latestCommitEndLsn, - }); + for (const runtime of this._sources.values()) { + this.logger.info("Starting replication client", { + sourceId: runtime.source.id, + lastLsn: runtime.latestCommitEndLsn, + }); - await this._replicationClient.subscribe(this._latestCommitEndLsn ?? undefined); + await runtime.client.subscribe(runtime.latestCommitEndLsn ?? 
undefined); + + runtime.acknowledgeInterval = setInterval( + () => this.#acknowledgeLatestTransaction(runtime), + 1000 + ); + } - this._acknowledgeInterval = setInterval(this.#acknowledgeLatestTransaction.bind(this), 1000); this._concurrentFlushScheduler.start(); } async stop() { - this.logger.info("Stopping replication client"); + for (const runtime of this._sources.values()) { + this.logger.info("Stopping replication client", { sourceId: runtime.source.id }); - await this._replicationClient.stop(); + await runtime.client.stop(); - if (this._acknowledgeInterval) { - clearInterval(this._acknowledgeInterval); + if (runtime.acknowledgeInterval) { + clearInterval(runtime.acknowledgeInterval); + } } } async teardown() { - this.logger.info("Teardown replication client"); + for (const runtime of this._sources.values()) { + this.logger.info("Teardown replication client", { sourceId: runtime.source.id }); - await this._replicationClient.teardown(); + await runtime.client.teardown(); - if (this._acknowledgeInterval) { - clearInterval(this._acknowledgeInterval); + if (runtime.acknowledgeInterval) { + clearInterval(runtime.acknowledgeInterval); + } } } - async backfill(runs: PostgresTaskRun[]) { - // divide into batches of 50 to get data from Postgres + async backfill(runs: PostgresTaskRun[], sourceId?: string) { const flushId = nanoid(); // Use current timestamp as LSN (high enough to be above existing data) const now = Date.now(); const syntheticLsn = `${now.toString(16).padStart(8, "0").toUpperCase()}/00000000`; - const baseVersion = lsnToUInt64(syntheticLsn); + + // Backfill and live replication of the SAME source share an origin generation + // and rely on raw-LSN ordering within that generation. Default to the single + // source self-host uses (gen 0 => passthrough). + const runtime = sourceId ? this._sources.get(sourceId) : this._sources.values().next().value; + + if (!runtime) { + throw new Error( + sourceId + ? 
`RunsReplicationService.backfill: no source found with id "${sourceId}"` + : "RunsReplicationService.backfill: no sources configured" + ); + } + + const baseVersion = composeTaskRunVersion({ + originGeneration: runtime.source.originGeneration, + lsnVersion: lsnToUInt64(syntheticLsn), + }); await this.#flushBatch( flushId, @@ -369,8 +511,14 @@ export class RunsReplicationService { ); } - #handleData(lsn: string, message: PgoutputMessage, parseDuration: bigint) { + #handleData( + runtime: SourceRuntime, + lsn: string, + message: PgoutputMessage, + parseDuration: bigint + ) { this.logger.debug("Handling data", { + sourceId: runtime.source.id, lsn, tag: message.tag, parseDuration, @@ -384,28 +532,28 @@ export class RunsReplicationService { return; } - this._currentTransaction = { + runtime.currentTransaction = { beginStartTimestamp: Date.now(), commitLsn: message.commitLsn, xid: message.xid, events: [], }; - this._currentParseDurationMs = Number(parseDuration) / 1_000_000; + runtime.currentParseDurationMs = Number(parseDuration) / 1_000_000; break; } case "insert": { - if (!this._currentTransaction) { + if (!runtime.currentTransaction) { return; } - if (this._currentParseDurationMs) { - this._currentParseDurationMs = - this._currentParseDurationMs + Number(parseDuration) / 1_000_000; + if (runtime.currentParseDurationMs) { + runtime.currentParseDurationMs = + runtime.currentParseDurationMs + Number(parseDuration) / 1_000_000; } - this._currentTransaction.events.push({ + runtime.currentTransaction.events.push({ tag: message.tag, data: message.new as TaskRun, raw: message, @@ -413,16 +561,16 @@ export class RunsReplicationService { break; } case "update": { - if (!this._currentTransaction) { + if (!runtime.currentTransaction) { return; } - if (this._currentParseDurationMs) { - this._currentParseDurationMs = - this._currentParseDurationMs + Number(parseDuration) / 1_000_000; + if (runtime.currentParseDurationMs) { + runtime.currentParseDurationMs = + 
runtime.currentParseDurationMs + Number(parseDuration) / 1_000_000; } - this._currentTransaction.events.push({ + runtime.currentTransaction.events.push({ tag: message.tag, data: message.new as TaskRun, raw: message, @@ -430,16 +578,16 @@ export class RunsReplicationService { break; } case "delete": { - if (!this._currentTransaction) { + if (!runtime.currentTransaction) { return; } - if (this._currentParseDurationMs) { - this._currentParseDurationMs = - this._currentParseDurationMs + Number(parseDuration) / 1_000_000; + if (runtime.currentParseDurationMs) { + runtime.currentParseDurationMs = + runtime.currentParseDurationMs + Number(parseDuration) / 1_000_000; } - this._currentTransaction.events.push({ + runtime.currentTransaction.events.push({ tag: message.tag, data: message.old as TaskRun, raw: message, @@ -448,26 +596,26 @@ export class RunsReplicationService { break; } case "commit": { - if (!this._currentTransaction) { + if (!runtime.currentTransaction) { return; } - if (this._currentParseDurationMs) { - this._currentParseDurationMs = - this._currentParseDurationMs + Number(parseDuration) / 1_000_000; + if (runtime.currentParseDurationMs) { + runtime.currentParseDurationMs = + runtime.currentParseDurationMs + Number(parseDuration) / 1_000_000; } const replicationLagMs = Date.now() - Number(message.commitTime / 1000n); - this._currentTransaction.commitEndLsn = message.commitEndLsn; - this._currentTransaction.replicationLagMs = replicationLagMs; - const transaction = this._currentTransaction as Transaction; - this._currentTransaction = null; + runtime.currentTransaction.commitEndLsn = message.commitEndLsn; + runtime.currentTransaction.replicationLagMs = replicationLagMs; + const transaction = runtime.currentTransaction as Transaction; + runtime.currentTransaction = null; if (transaction.commitEndLsn) { - this._latestCommitEndLsn = transaction.commitEndLsn; + runtime.latestCommitEndLsn = transaction.commitEndLsn; } - this.#handleTransaction(transaction); + 
this.#handleTransaction(runtime, transaction); break; } default: { @@ -478,11 +626,14 @@ export class RunsReplicationService { } } - #handleTransaction(transaction: Transaction) { + #handleTransaction(runtime: SourceRuntime, transaction: Transaction) { if (this._isShutDownComplete) return; if (this._isShuttingDown) { - this._replicationClient.stop().finally(() => { + // A global shutdown stops every source's client; mark complete once all + // have stopped. For a single source this is identical to the prior + // "stop the one client, then mark complete" behavior. + Promise.all(Array.from(this._sources.values()).map((r) => r.client.stop())).finally(() => { this._isShutDownComplete = true; }); } @@ -494,6 +645,7 @@ export class RunsReplicationService { if (!transaction.commitEndLsn) { this.logger.error("Transaction has no commit end lsn", { + sourceId: runtime.source.id, transaction, }); @@ -502,8 +654,13 @@ export class RunsReplicationService { const lsnToUInt64Start = process.hrtime.bigint(); - // If there are events, we need to handle them - const _version = lsnToUInt64(transaction.commitEndLsn); + // Compose the source's origin generation above the LSN so a higher-generation + // source wins the ClickHouse dedup tiebreak regardless of raw LSN. Gen 0 (the + // single-source default) is a passthrough. 
+ const _version = composeTaskRunVersion({ + originGeneration: runtime.source.originGeneration, + lsnVersion: lsnToUInt64(transaction.commitEndLsn), + }); const lsnToUInt64DurationMs = Number(process.hrtime.bigint() - lsnToUInt64Start) / 1_000_000; @@ -516,7 +673,10 @@ export class RunsReplicationService { ); // Record metrics - this._replicationLagHistogram.record(transaction.replicationLagMs); + this._replicationLagHistogram.record(transaction.replicationLagMs, { + source: runtime.source.id, + generation: runtime.source.originGeneration, + }); // Count events by type for (const event of transaction.events) { @@ -524,55 +684,58 @@ export class RunsReplicationService { } this.logger.debug("handle_transaction", { + sourceId: runtime.source.id, transaction: { xid: transaction.xid, commitLsn: transaction.commitLsn, commitEndLsn: transaction.commitEndLsn, events: transaction.events.length, - parseDurationMs: this._currentParseDurationMs, + parseDurationMs: runtime.currentParseDurationMs, lsnToUInt64DurationMs, version: _version.toString(), }, }); } - async #acknowledgeLatestTransaction() { - if (!this._latestCommitEndLsn) { + async #acknowledgeLatestTransaction(runtime: SourceRuntime) { + if (!runtime.latestCommitEndLsn) { return; } - if (this._lastAcknowledgedLsn === this._latestCommitEndLsn) { + if (runtime.lastAcknowledgedLsn === runtime.latestCommitEndLsn) { return; } const now = Date.now(); - if (this._lastAcknowledgedAt) { - const timeSinceLastAcknowledged = now - this._lastAcknowledgedAt; + if (runtime.lastAcknowledgedAt) { + const timeSinceLastAcknowledged = now - runtime.lastAcknowledgedAt; // If we've already acknowledged within the last second, don't acknowledge again if (timeSinceLastAcknowledged < this._acknowledgeTimeoutMs) { return; } } - this._lastAcknowledgedAt = now; - this._lastAcknowledgedLsn = this._latestCommitEndLsn; + runtime.lastAcknowledgedAt = now; + runtime.lastAcknowledgedLsn = runtime.latestCommitEndLsn; 
this.logger.debug("acknowledge_latest_transaction", { - commitEndLsn: this._latestCommitEndLsn, - lastAcknowledgedAt: this._lastAcknowledgedAt, + sourceId: runtime.source.id, + commitEndLsn: runtime.latestCommitEndLsn, + lastAcknowledgedAt: runtime.lastAcknowledgedAt, }); - const [ackError] = await tryCatch( - this._replicationClient.acknowledge(this._latestCommitEndLsn) - ); + const [ackError] = await tryCatch(runtime.client.acknowledge(runtime.latestCommitEndLsn)); if (ackError) { - this.logger.error("Error acknowledging transaction", { ackError }); + this.logger.error("Error acknowledging transaction", { + sourceId: runtime.source.id, + ackError, + }); } - if (this._isShutDownComplete && this._acknowledgeInterval) { - clearInterval(this._acknowledgeInterval); + if (this._isShutDownComplete && runtime.acknowledgeInterval) { + clearInterval(runtime.acknowledgeInterval); } } @@ -755,7 +918,6 @@ export class RunsReplicationService { }); } - // New method to handle inserts with retry logic for connection errors async #insertWithRetry( insertFn: (attempt: number) => Promise, operationName: string, diff --git a/apps/webapp/app/v3/services/adminWorker.server.ts b/apps/webapp/app/v3/services/adminWorker.server.ts index 6b416b45b7a..562e4f451dc 100644 --- a/apps/webapp/app/v3/services/adminWorker.server.ts +++ b/apps/webapp/app/v3/services/adminWorker.server.ts @@ -3,6 +3,7 @@ import { Worker as RedisWorker } from "@trigger.dev/redis-worker"; import { z } from "zod"; import { env } from "~/env.server"; import { logger } from "~/services/logger.server"; +import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server"; import { runsReplicationInstance } from "~/services/runsReplicationInstance.server"; // Reference-hold the sessions-replication singleton so module evaluation runs // its initializer (creates the ClickHouse client, subscribes to the logical @@ -62,14 +63,15 @@ function initializeWorker() { logger: new Logger("AdminWorker", 
env.ADMIN_WORKER_LOG_LEVEL), jobs: { "admin.backfillRunsToReplication": async ({ payload, id }) => { - if (!runsReplicationInstance) { + const replicationService = getRunsReplicationGlobal() ?? runsReplicationInstance; + if (!replicationService) { logger.error("Runs replication instance not found"); return; } const service = new RunsBackfillerService({ prisma: $replica, - runsReplicationInstance: runsReplicationInstance, + runsReplicationInstance: replicationService, tracer: tracer, }); diff --git a/apps/webapp/test/runsReplicationInstance.test.ts b/apps/webapp/test/runsReplicationInstance.test.ts new file mode 100644 index 00000000000..a7108d2bc75 --- /dev/null +++ b/apps/webapp/test/runsReplicationInstance.test.ts @@ -0,0 +1,420 @@ +import { ClickHouse } from "@internal/clickhouse"; +import { createPostgresContainer, replicationContainerTest } from "@internal/testcontainers"; +import { PrismaClient } from "@trigger.dev/database"; +import Redis from "ioredis"; +import { setTimeout } from "node:timers/promises"; +import { z } from "zod"; +import { + assertReplicationCoversSplit, + buildReplicationSources, + SplitReplicationMisconfiguredError, +} from "~/services/runsReplicationInstance.server"; +import { RunsReplicationService } from "~/services/runsReplicationService.server"; +import { createInMemoryTracing } from "./utils/tracing"; +import { TestReplicationClickhouseFactory } from "./utils/testReplicationClickhouseFactory"; + +vi.setConfig({ testTimeout: 90_000 }); + +describe("buildReplicationSources (pure)", () => { + const baseArgs = { + legacyUrl: "postgres://legacy", + legacySlotName: "task_runs_to_clickhouse_v1", + legacyPublicationName: "task_runs_to_clickhouse_v1_publication", + legacyOriginGeneration: 0, + newSlotName: "task_runs_to_clickhouse_v2", + newPublicationName: "task_runs_to_clickhouse_v2_publication", + newOriginGeneration: 1, + }; + + it("returns [legacy] when split is disabled", () => { + const sources = buildReplicationSources({ + 
...baseArgs, + splitEnabled: false, + newUrl: "postgres://new", + newSourceOverride: true, + }); + + expect(sources).toHaveLength(1); + expect(sources[0]).toEqual({ + id: "legacy", + pgConnectionUrl: "postgres://legacy", + slotName: "task_runs_to_clickhouse_v1", + publicationName: "task_runs_to_clickhouse_v1_publication", + originGeneration: 0, + }); + }); + + it("returns [legacy] when split is enabled but no new URL is set", () => { + const sources = buildReplicationSources({ + ...baseArgs, + splitEnabled: true, + newUrl: undefined, + }); + + expect(sources).toHaveLength(1); + expect(sources[0].id).toBe("legacy"); + }); + + it("returns [legacy] when split + new URL but the new source is explicitly disabled (escape hatch)", () => { + const sources = buildReplicationSources({ + ...baseArgs, + splitEnabled: true, + newUrl: "postgres://new", + newSourceOverride: false, + }); + + expect(sources).toHaveLength(1); + expect(sources[0].id).toBe("legacy"); + }); + + it("returns [legacy(gen0), new(gen1)] with distinct slot/publication/generation when all gates pass", () => { + const sources = buildReplicationSources({ + ...baseArgs, + splitEnabled: true, + newUrl: "postgres://new", + }); + + expect(sources).toHaveLength(2); + expect(sources[0]).toEqual({ + id: "legacy", + pgConnectionUrl: "postgres://legacy", + slotName: "task_runs_to_clickhouse_v1", + publicationName: "task_runs_to_clickhouse_v1_publication", + originGeneration: 0, + }); + expect(sources[1]).toEqual({ + id: "new", + pgConnectionUrl: "postgres://new", + slotName: "task_runs_to_clickhouse_v2", + publicationName: "task_runs_to_clickhouse_v2_publication", + originGeneration: 1, + }); + + // Distinctness invariants the service validates. 
+ expect(sources[0].slotName).not.toBe(sources[1].slotName); + expect(sources[0].publicationName).not.toBe(sources[1].publicationName); + expect(sources[0].originGeneration).not.toBe(sources[1].originGeneration); + }); + + it("returns [legacy, new] when split is enabled + new URL is set WITHOUT any RUN_REPLICATION_NEW_ENABLED override", () => { + const sources = buildReplicationSources({ + ...baseArgs, + splitEnabled: true, + newUrl: "postgres://new", + }); + + expect(sources).toHaveLength(2); + expect(sources[0].id).toBe("legacy"); + expect(sources[1]).toEqual({ + id: "new", + pgConnectionUrl: "postgres://new", + slotName: "task_runs_to_clickhouse_v2", + publicationName: "task_runs_to_clickhouse_v2_publication", + originGeneration: 1, + }); + }); + + it("treats newSourceOverride:false as an explicit escape hatch (force the new source off even under split)", () => { + const sources = buildReplicationSources({ + ...baseArgs, + splitEnabled: true, + newUrl: "postgres://new", + newSourceOverride: false, + }); + + expect(sources).toHaveLength(1); + expect(sources[0].id).toBe("legacy"); + }); + + it("RUN_OPS_DATABASE_URL takes precedence: new source pgConnectionUrl === RUN_OPS_DATABASE_URL when both are supplied", () => { + const runOpsUrl = "postgres://run-ops-dedicated"; + const taskRunUrl = "postgres://task-run-legacy-alias"; + + const sources = buildReplicationSources({ + ...baseArgs, + splitEnabled: true, + // Simulates env.RUN_OPS_DATABASE_URL ?? env.TASK_RUN_DATABASE_URL with RUN_OPS set + newUrl: runOpsUrl ?? taskRunUrl, + }); + + expect(sources).toHaveLength(2); + expect(sources[1]!.id).toBe("new"); + expect(sources[1]!.pgConnectionUrl).toBe(runOpsUrl); + }); + + it("falls back to TASK_RUN_DATABASE_URL when RUN_OPS_DATABASE_URL is absent", () => { + const taskRunUrl = "postgres://task-run-legacy-alias"; + + const sources = buildReplicationSources({ + ...baseArgs, + splitEnabled: true, + // Simulates env.RUN_OPS_DATABASE_URL ?? 
env.TASK_RUN_DATABASE_URL with RUN_OPS unset + newUrl: undefined ?? taskRunUrl, + }); + + expect(sources).toHaveLength(2); + expect(sources[1]!.pgConnectionUrl).toBe(taskRunUrl); + }); +}); + +describe("assertReplicationCoversSplit (boot gate-coupling)", () => { + const baseArgs = { + legacyUrl: "postgres://legacy", + legacySlotName: "task_runs_to_clickhouse_v1", + legacyPublicationName: "task_runs_to_clickhouse_v1_publication", + legacyOriginGeneration: 0, + newSlotName: "task_runs_to_clickhouse_v2", + newPublicationName: "task_runs_to_clickhouse_v2_publication", + newOriginGeneration: 1, + }; + + it("throws when split is on but sources[] has no \"new\" source (the silent under-count)", () => { + // Split on, but the new replication source is forced off — ksuid runs would not + // reach ClickHouse. This is the exact misconfiguration the boot gate must refuse to boot with. + const sources = buildReplicationSources({ + ...baseArgs, + splitEnabled: true, + newUrl: "postgres://new", + newSourceOverride: false, + }); + expect(sources.some((s) => s.id === "new")).toBe(false); + + expect(() => assertReplicationCoversSplit({ splitEnabled: true, sources })).toThrow( + SplitReplicationMisconfiguredError + ); + }); + + it("throws when split is on but split has a new URL missing entirely", () => { + const sources = buildReplicationSources({ + ...baseArgs, + splitEnabled: true, + newUrl: undefined, + }); + + expect(() => assertReplicationCoversSplit({ splitEnabled: true, sources })).toThrow( + SplitReplicationMisconfiguredError + ); + }); + + it("does NOT throw when split is on and the new source is present", () => { + const sources = buildReplicationSources({ + ...baseArgs, + splitEnabled: true, + newUrl: "postgres://new", + }); + expect(sources.some((s) => s.id === "new")).toBe(true); + + expect(() => assertReplicationCoversSplit({ splitEnabled: true, sources })).not.toThrow(); + }); + + it("does NOT throw when split is off (legacy-only is the correct config)", () => { + 
const sources = buildReplicationSources({ + ...baseArgs, + splitEnabled: false, + newUrl: "postgres://new", + }); + + expect(() => assertReplicationCoversSplit({ splitEnabled: false, sources })).not.toThrow(); + }); +}); + +describe("RunsReplication new-source backfill origin generation (integration)", () => { + replicationContainerTest( + "backfill via the new source tags the ClickHouse row with the new origin generation (gen=1), not gen=0", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma, network }) => { + const legacyUrl = postgresContainer.getConnectionUri(); + + const { url: newUrl, container: pg17 } = await createPostgresContainer(network, { + imageTag: "docker.io/postgres:17", + }); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication-backfill-gen", + logLevel: "warn", + }); + + const NEW_ORIGIN_GENERATION = 1; + + const sources = buildReplicationSources({ + splitEnabled: true, + legacyUrl, + newUrl, + newSourceOverride: true, + legacySlotName: "tr_bf_legacy", + legacyPublicationName: "tr_bf_legacy_pub", + legacyOriginGeneration: 0, + newSlotName: "tr_bf_new", + newPublicationName: "tr_bf_new_pub", + newOriginGeneration: NEW_ORIGIN_GENERATION, + }); + + const service = new RunsReplicationService({ + clickhouseFactory: new TestReplicationClickhouseFactory(clickhouse), + serviceName: "runs-replication-backfill-gen", + pgConnectionUrl: legacyUrl, + slotName: "tr_bf_legacy", + publicationName: "tr_bf_legacy_pub", + redisOptions: { ...redisOptions, keyPrefix: "runs-replication-backfill-gen:" }, + sources, + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 10, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + logLevel: "warn", + }); + + // Create org/project/env/run on the legacy DB (the FK schema lives there). + // This simulates a pre-existing run that was migrated to the dedicated DB. 
+ const organization = await prisma.organization.create({ data: { title: "bf-gen", slug: "bf-gen" } }); + const project = await prisma.project.create({ + data: { name: "bf-gen", slug: "bf-gen", organizationId: organization.id, externalRef: "bf-gen" }, + }); + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: "bf-gen", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "bf-gen", + pkApiKey: "bf-gen", + shortcode: "bf-gen", + }, + }); + + const run = await prisma.taskRun.create({ + data: { + friendlyId: `run_newdb_${Date.now()}`, + taskIdentifier: "new-db-task", + payload: JSON.stringify({ source: "dedicated-db" }), + traceId: "bf-gen-trace", + spanId: "bf-gen-span", + queue: "bf-gen", + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: project.id, + organizationId: organization.id, + environmentType: "DEVELOPMENT", + engine: "V2", + status: "COMPLETED_SUCCESSFULLY", + }, + }); + + try { + // Backfill the run via the "new" source — must encode gen=1 in _version. + await service.backfill( + [{ ...run, masterQueue: run.workerQueue ?? "main" }], + "new" + ); + + await setTimeout(500); + + const queryRuns = clickhouse.reader.query({ + name: "runs-replication-backfill-gen", + query: "SELECT run_id, _version FROM trigger_dev.task_runs_v2 WHERE run_id = {run_id:String}", + schema: z.object({ run_id: z.string(), _version: z.number() }), + params: z.object({ run_id: z.string() }), + }); + + const [queryError, result] = await queryRuns({ run_id: run.id }); + + expect(queryError).toBeNull(); + expect(result).toHaveLength(1); + + // Decode origin generation from _version: top 8 bits = gen (>> 56). 
+ const versionBigInt = BigInt(result![0]!._version); + const originGen = Number(versionBigInt >> 56n); + expect(originGen).toBe(NEW_ORIGIN_GENERATION); + } finally { + await pg17.stop({ timeout: 0 }); + } + } + ); +}); + +describe("RunsReplication multi-source wiring (integration)", () => { + replicationContainerTest( + "both sources acquire their leader locks (two-leaders proof against the double-prefixed redlock key)", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma, network }) => { + const legacyUrl = postgresContainer.getConnectionUri(); + + const { url: newUrl, container: pg17 } = await createPostgresContainer(network, { + imageTag: "docker.io/postgres:17", + }); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication", + logLevel: "warn", + }); + + const { tracer } = createInMemoryTracing(); + + const sources = buildReplicationSources({ + splitEnabled: true, + legacyUrl, + newUrl, + newSourceOverride: true, + legacySlotName: "tr_legacy_wiring", + legacyPublicationName: "tr_legacy_wiring_pub", + legacyOriginGeneration: 0, + newSlotName: "tr_new_wiring", + newPublicationName: "tr_new_wiring_pub", + newOriginGeneration: 1, + }); + + let service: RunsReplicationService | undefined; + let probe: Redis | undefined; + + try { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + const newPrismaForAlter = new PrismaClient({ datasources: { db: { url: newUrl } } }); + try { + await newPrismaForAlter.$executeRawUnsafe( + `ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;` + ); + } finally { + await newPrismaForAlter.$disconnect(); + } + + service = new RunsReplicationService({ + clickhouseFactory: new TestReplicationClickhouseFactory(clickhouse), + serviceName: "runs-replication", + pgConnectionUrl: legacyUrl, + slotName: "tr_legacy_wiring", + publicationName: "tr_legacy_wiring_pub", + redisOptions: { ...redisOptions, keyPrefix: "runs-replication:" 
}, + sources, + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + tracer, + logLevel: "warn", + }); + + await service.start(); + + await setTimeout(3000); + + probe = new Redis(redisOptions); + + const legacyKey = + "runs-replication:logical-replication-client:logical-replication-client:runs-replication:legacy"; + const newKey = + "runs-replication:logical-replication-client:logical-replication-client:runs-replication:new"; + + expect(await probe.exists(legacyKey)).toBe(1); + expect(await probe.exists(newKey)).toBe(1); + } finally { + await service?.stop(); + await probe?.quit(); + await pg17.stop({ timeout: 0 }); + } + } + ); +}); diff --git a/apps/webapp/test/runsReplicationService.part8.test.ts b/apps/webapp/test/runsReplicationService.part8.test.ts new file mode 100644 index 00000000000..e0861e741d3 --- /dev/null +++ b/apps/webapp/test/runsReplicationService.part8.test.ts @@ -0,0 +1,531 @@ +import { ClickHouse } from "@internal/clickhouse"; +import { createPostgresContainer, replicationContainerTest } from "@internal/testcontainers"; +import { PrismaClient } from "@trigger.dev/database"; +import { setTimeout } from "node:timers/promises"; +import { z } from "zod"; +import { TaskRunStatus } from "~/database-types"; +import { RunsReplicationService } from "~/services/runsReplicationService.server"; +import { createInMemoryTracing } from "./utils/tracing"; +import { TestReplicationClickhouseFactory } from "./utils/testReplicationClickhouseFactory"; + +vi.setConfig({ testTimeout: 60_000 }); + +describe("RunsReplicationService (part 8/8) - dual-source dedup", () => { + replicationContainerTest( + "collapses the same run from two slots into one ClickHouse row, gen-1 wins across the PG14<->PG17 boundary", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma, network }) => { + // LEGACY / gen-0 source = the fixture's PG14 container. 
+ const legacyUrl = postgresContainer.getConnectionUri(); + + // NEW / gen-1 source = a dedicated PG17 container on the SAME network. + // createPostgresContainer applies wal_level=logical and pushes the TaskRun schema. + const { url: newUrl, container: pg17 } = await createPostgresContainer(network, { + imageTag: "docker.io/postgres:17", + }); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication", + logLevel: "warn", + }); + + const { tracer } = createInMemoryTracing(); + + let runsReplicationService: RunsReplicationService | undefined; + + try { + // REPLICA IDENTITY FULL on BOTH source DBs before start(). + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + const newPrismaForAlter = new PrismaClient({ + datasources: { db: { url: newUrl } }, + }); + try { + await newPrismaForAlter.$executeRawUnsafe( + `ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;` + ); + } finally { + await newPrismaForAlter.$disconnect(); + } + + runsReplicationService = new RunsReplicationService({ + clickhouseFactory: new TestReplicationClickhouseFactory(clickhouse), + serviceName: "runs-replication", + redisOptions, + sources: [ + { + id: "legacy", + pgConnectionUrl: legacyUrl, + slotName: "tr_legacy_v1", + publicationName: "tr_legacy_v1_pub", + originGeneration: 0, + }, + { + id: "new", + pgConnectionUrl: newUrl, + slotName: "tr_new_v1", + publicationName: "tr_new_v1_pub", + originGeneration: 1, + }, + ], + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + tracer, + logLevel: "warn", + }); + + await runsReplicationService.start(); + + // The ClickHouse ReplacingMergeTree dedup key is the full ORDER BY tuple + // (organization_id, project_id, environment_id, created_at, run_id) - NOT run_id alone. + // So the two source rows must share ALL of those columns to collapse into one. 
We give + // both DBs identical org/project/env/run ids and an identical createdAt. + const suffix = `${Date.now()}`; + const sharedOrgId = `org_dual_${suffix}`; + const sharedProjectId = `proj_dual_${suffix}`; + const sharedEnvId = `env_dual_${suffix}`; + const sharedRunId = `run_dual_${suffix}`; + const sharedCreatedAt = new Date(); + + const newPrisma = new PrismaClient({ datasources: { db: { url: newUrl } } }); + + const seedFkRows = async ( + client: PrismaClient, + tag: string, + status: TaskRunStatus, + friendlyId: string + ) => { + await client.organization.create({ + data: { id: sharedOrgId, title: `org-${tag}`, slug: `org-${tag}` }, + }); + await client.project.create({ + data: { + id: sharedProjectId, + name: `proj-${tag}`, + slug: `proj-${tag}`, + organizationId: sharedOrgId, + externalRef: `proj-${tag}`, + }, + }); + await client.runtimeEnvironment.create({ + data: { + id: sharedEnvId, + slug: `env-${tag}`, + type: "DEVELOPMENT", + projectId: sharedProjectId, + organizationId: sharedOrgId, + apiKey: `apikey-${tag}`, + pkApiKey: `pkapikey-${tag}`, + shortcode: `shortcode-${tag}`, + }, + }); + await client.taskRun.create({ + data: { + id: sharedRunId, + friendlyId, + taskIdentifier: "my-task", + payload: JSON.stringify({ foo: "bar" }), + traceId: `trace-${tag}`, + spanId: `span-${tag}`, + queue: "test", + status, + createdAt: sharedCreatedAt, + runtimeEnvironmentId: sharedEnvId, + projectId: sharedProjectId, + organizationId: sharedOrgId, + environmentType: "DEVELOPMENT", + engine: "V2", + }, + }); + }; + + // The two source servers have INDEPENDENT WAL counters, so commit order alone does not + // control raw LSN magnitude. To make legacy's raw LSN deterministically larger we burn WAL + // on the legacy server with pg_switch_wal() (each call bumps the WAL segment = the high 32 + // bits of the LSN) before inserting the legacy run. 
With raw LSN as _version the larger-LSN + // STALE legacy PENDING snapshot would win the dedup - which is the RED this guards against. + try { + await seedFkRows(newPrisma, "new", TaskRunStatus.COMPLETED_SUCCESSFULLY, "run_dual_new"); + + // Settle so the new DB's WAL entry is produced (and committed) before the legacy one. + await setTimeout(500); + + for (let i = 0; i < 16; i++) { + await prisma.$executeRawUnsafe(`SELECT pg_switch_wal();`); + } + + await seedFkRows(prisma, "legacy", TaskRunStatus.PENDING, "run_dual_legacy"); + + // Wait for BOTH streams to flush into ClickHouse. + await setTimeout(3000); + } finally { + await newPrisma.$disconnect(); + } + + const queryRuns = clickhouse.reader.query({ + name: "dual-source", + query: + "SELECT run_id, status, count() OVER () AS total FROM trigger_dev.task_runs_v2 FINAL", + schema: z.object({ + run_id: z.string(), + status: z.string(), + total: z.number().int(), + }), + }); + + const [queryError, result] = await queryRuns({}); + + expect(queryError).toBeNull(); + expect(result).toHaveLength(1); + expect(result?.[0]).toEqual( + expect.objectContaining({ + run_id: sharedRunId, + status: "COMPLETED_SUCCESSFULLY", + }) + ); + } finally { + await runsReplicationService?.stop(); + await pg17.stop({ timeout: 0 }); + } + } + ); + + // Case A - reverse-order independence (Step 4.1). + // Same run in both sources, but we flush the NEW (gen-1) snapshot FIRST, then the LEGACY + // (gen-0) snapshot. The gen-1 winner must survive regardless of arrival order - the collapse + // is FINAL-time and ordered by _version (composed origin generation), not by arrival time. 
+ replicationContainerTest( + "gen-1 winner survives when the new snapshot flushes before the legacy snapshot (reverse-order independence)", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma, network }) => { + const legacyUrl = postgresContainer.getConnectionUri(); + + const { url: newUrl, container: pg17 } = await createPostgresContainer(network, { + imageTag: "docker.io/postgres:17", + }); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication", + logLevel: "warn", + }); + + const { tracer } = createInMemoryTracing(); + + let runsReplicationService: RunsReplicationService | undefined; + + try { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + const newPrismaForAlter = new PrismaClient({ + datasources: { db: { url: newUrl } }, + }); + try { + await newPrismaForAlter.$executeRawUnsafe( + `ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;` + ); + } finally { + await newPrismaForAlter.$disconnect(); + } + + runsReplicationService = new RunsReplicationService({ + clickhouseFactory: new TestReplicationClickhouseFactory(clickhouse), + serviceName: "runs-replication", + redisOptions, + sources: [ + { + id: "legacy", + pgConnectionUrl: legacyUrl, + slotName: "tr_legacy_a", + publicationName: "tr_legacy_a_pub", + originGeneration: 0, + }, + { + id: "new", + pgConnectionUrl: newUrl, + slotName: "tr_new_a", + publicationName: "tr_new_a_pub", + originGeneration: 1, + }, + ], + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + tracer, + logLevel: "warn", + }); + + await runsReplicationService.start(); + + const suffix = `a_${Date.now()}`; + const sharedOrgId = `org_dual_${suffix}`; + const sharedProjectId = `proj_dual_${suffix}`; + const sharedEnvId = `env_dual_${suffix}`; + const sharedRunId = `run_dual_${suffix}`; + const sharedCreatedAt = 
new Date(); + + const newPrisma = new PrismaClient({ datasources: { db: { url: newUrl } } }); + + const seedFkRows = async ( + client: PrismaClient, + tag: string, + status: TaskRunStatus, + friendlyId: string + ) => { + await client.organization.create({ + data: { id: sharedOrgId, title: `org-${tag}`, slug: `org-${tag}` }, + }); + await client.project.create({ + data: { + id: sharedProjectId, + name: `proj-${tag}`, + slug: `proj-${tag}`, + organizationId: sharedOrgId, + externalRef: `proj-${tag}`, + }, + }); + await client.runtimeEnvironment.create({ + data: { + id: sharedEnvId, + slug: `env-${tag}`, + type: "DEVELOPMENT", + projectId: sharedProjectId, + organizationId: sharedOrgId, + apiKey: `apikey-${tag}`, + pkApiKey: `pkapikey-${tag}`, + shortcode: `shortcode-${tag}`, + }, + }); + await client.taskRun.create({ + data: { + id: sharedRunId, + friendlyId, + taskIdentifier: "my-task", + payload: JSON.stringify({ foo: "bar" }), + traceId: `trace-${tag}`, + spanId: `span-${tag}`, + queue: "test", + status, + createdAt: sharedCreatedAt, + runtimeEnvironmentId: sharedEnvId, + projectId: sharedProjectId, + organizationId: sharedOrgId, + environmentType: "DEVELOPMENT", + engine: "V2", + }, + }); + }; + + try { + // Flush the NEW (gen-1, COMPLETED) snapshot FIRST and let it land in ClickHouse. + await seedFkRows(newPrisma, "new", TaskRunStatus.COMPLETED_SUCCESSFULLY, "run_dual_new"); + await setTimeout(2000); + + // THEN flush the LEGACY (gen-0, PENDING) snapshot. 
+ await seedFkRows(prisma, "legacy", TaskRunStatus.PENDING, "run_dual_legacy"); + await setTimeout(3000); + } finally { + await newPrisma.$disconnect(); + } + + const queryRuns = clickhouse.reader.query({ + name: "dual-source-reverse", + query: + "SELECT run_id, status, count() OVER () AS total FROM trigger_dev.task_runs_v2 FINAL", + schema: z.object({ + run_id: z.string(), + status: z.string(), + total: z.number().int(), + }), + }); + + const [queryError, result] = await queryRuns({}); + + expect(queryError).toBeNull(); + expect(result).toHaveLength(1); + expect(result?.[0]).toEqual( + expect.objectContaining({ + run_id: sharedRunId, + status: "COMPLETED_SUCCESSFULLY", + }) + ); + } finally { + await runsReplicationService?.stop(); + await pg17.stop({ timeout: 0 }); + } + } + ); + + // Case B - per-source independence / no cross-contamination (Step 4.2). + // Two DIFFERENT runs: run X lives ONLY in the legacy/gen-0 DB, run Y lives ONLY in the + // new/gen-1 DB. BOTH must appear in ClickHouse exactly once with their own status. This proves + // (a) BOTH sources became leader and streamed (a single-leader regression would drop one run), + // and (b) the two streams don't corrupt each other's per-source transaction/LSN state. 
+ replicationContainerTest( + "streams two distinct runs from two sources independently without cross-contamination", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma, network }) => { + const legacyUrl = postgresContainer.getConnectionUri(); + + const { url: newUrl, container: pg17 } = await createPostgresContainer(network, { + imageTag: "docker.io/postgres:17", + }); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication", + logLevel: "warn", + }); + + const { tracer } = createInMemoryTracing(); + + let runsReplicationService: RunsReplicationService | undefined; + + try { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + const newPrismaForAlter = new PrismaClient({ + datasources: { db: { url: newUrl } }, + }); + try { + await newPrismaForAlter.$executeRawUnsafe( + `ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;` + ); + } finally { + await newPrismaForAlter.$disconnect(); + } + + runsReplicationService = new RunsReplicationService({ + clickhouseFactory: new TestReplicationClickhouseFactory(clickhouse), + serviceName: "runs-replication", + redisOptions, + sources: [ + { + id: "legacy", + pgConnectionUrl: legacyUrl, + slotName: "tr_legacy_b", + publicationName: "tr_legacy_b_pub", + originGeneration: 0, + }, + { + id: "new", + pgConnectionUrl: newUrl, + slotName: "tr_new_b", + publicationName: "tr_new_b_pub", + originGeneration: 1, + }, + ], + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + tracer, + logLevel: "warn", + }); + + await runsReplicationService.start(); + + // Run X (legacy-only) and run Y (new-only) get DISTINCT ids. Each lives in its own DB + // with its own org/project/env, so there is nothing to collapse - both must survive. 
+ const suffix = `b_${Date.now()}`; + const legacyRunId = `run_legacy_only_${suffix}`; + const newRunId = `run_new_only_${suffix}`; + + const newPrisma = new PrismaClient({ datasources: { db: { url: newUrl } } }); + + const seedRun = async ( + client: PrismaClient, + tag: string, + runId: string, + status: TaskRunStatus + ) => { + const orgId = `org_${tag}_${suffix}`; + const projectId = `proj_${tag}_${suffix}`; + const envId = `env_${tag}_${suffix}`; + + await client.organization.create({ + data: { id: orgId, title: `org-${tag}`, slug: `org-${tag}-${suffix}` }, + }); + await client.project.create({ + data: { + id: projectId, + name: `proj-${tag}`, + slug: `proj-${tag}-${suffix}`, + organizationId: orgId, + externalRef: `proj-${tag}-${suffix}`, + }, + }); + await client.runtimeEnvironment.create({ + data: { + id: envId, + slug: `env-${tag}`, + type: "DEVELOPMENT", + projectId: projectId, + organizationId: orgId, + apiKey: `apikey-${tag}-${suffix}`, + pkApiKey: `pkapikey-${tag}-${suffix}`, + shortcode: `shortcode-${tag}-${suffix}`, + }, + }); + await client.taskRun.create({ + data: { + id: runId, + friendlyId: `friendly-${runId}`, + taskIdentifier: "my-task", + payload: JSON.stringify({ foo: "bar" }), + traceId: `trace-${tag}-${suffix}`, + spanId: `span-${tag}-${suffix}`, + queue: "test", + status, + createdAt: new Date(), + runtimeEnvironmentId: envId, + projectId: projectId, + organizationId: orgId, + environmentType: "DEVELOPMENT", + engine: "V2", + }, + }); + }; + + try { + // Seed run X ONLY in legacy and run Y ONLY in new. + await seedRun(prisma, "legacy", legacyRunId, TaskRunStatus.PENDING); + await seedRun(newPrisma, "new", newRunId, TaskRunStatus.COMPLETED_SUCCESSFULLY); + + // Wait for BOTH streams to flush into ClickHouse. 
+ await setTimeout(3000); + } finally { + await newPrisma.$disconnect(); + } + + const queryRuns = clickhouse.reader.query({ + name: "dual-source-independent", + query: "SELECT run_id, status FROM trigger_dev.task_runs_v2 FINAL ORDER BY run_id", + schema: z.object({ + run_id: z.string(), + status: z.string(), + }), + }); + + const [queryError, result] = await queryRuns({}); + + expect(queryError).toBeNull(); + expect(result).toHaveLength(2); + + const byRunId = new Map(result?.map((row) => [row.run_id, row.status])); + expect(byRunId.get(legacyRunId)).toBe("PENDING"); + expect(byRunId.get(newRunId)).toBe("COMPLETED_SUCCESSFULLY"); + } finally { + await runsReplicationService?.stop(); + await pg17.stop({ timeout: 0 }); + } + } + ); +}); diff --git a/apps/webapp/test/runsReplicationService.part9.test.ts b/apps/webapp/test/runsReplicationService.part9.test.ts new file mode 100644 index 00000000000..495b602acd4 --- /dev/null +++ b/apps/webapp/test/runsReplicationService.part9.test.ts @@ -0,0 +1,263 @@ +import { ClickHouse } from "@internal/clickhouse"; +import { replicationContainerTest } from "@internal/testcontainers"; +import { PrismaClient } from "@trigger.dev/database"; +import { setTimeout } from "node:timers/promises"; +import { RunsReplicationService } from "~/services/runsReplicationService.server"; +import { createInMemoryMetrics } from "./utils/tracing"; +import { TestReplicationClickhouseFactory } from "./utils/testReplicationClickhouseFactory"; + +vi.setConfig({ testTimeout: 90_000 }); + +// Copied from runsReplicationService.part4.test.ts (the only replication part-test that +// injects a meter). These read metric data points out of the in-memory reader. 
+function makeMetricReaders( + metrics: Awaited<ReturnType<ReturnType<typeof createInMemoryMetrics>["getMetrics"]>> +) { + function getMetricData(name: string) { + for (const resourceMetrics of metrics) { + for (const scopeMetrics of resourceMetrics.scopeMetrics) { + for (const metric of scopeMetrics.metrics) { + if (metric.descriptor.name === name) { + return metric; + } + } + } + } + return null; + } + + function histogramHasData(metric: any): boolean { + if (!metric?.dataPoints || metric.dataPoints.length === 0) return false; + return metric.dataPoints.some((dp: any) => { + return ( + (typeof dp.count === "number" && dp.count > 0) || + (typeof dp.value?.count === "number" && dp.value.count > 0) || + (Array.isArray(dp.buckets?.counts) && dp.buckets.counts.some((c: number) => c > 0)) || + (typeof dp.sum === "number" && dp.sum > 0) || + typeof dp.min === "number" || + typeof dp.max === "number" + ); + }); + } + + function getCounterAttributeValues(metric: any, attributeName: string): unknown[] { + if (!metric?.dataPoints) return []; + return metric.dataPoints + .filter((dp: any) => dp.attributes?.[attributeName] !== undefined) + .map((dp: any) => dp.attributes[attributeName]); + } + + return { getMetricData, histogramHasData, getCounterAttributeValues }; +} + +// Poll the in-memory reader until the lag histogram has data (replication is async, and +// container/CPU contention makes a fixed sleep flaky). Returns the latest collected metrics.
+async function waitForLagHistogram( + metricsHelper: ReturnType<typeof createInMemoryMetrics>, + timeoutMs = 20_000 +) { + const deadline = Date.now() + timeoutMs; + let metrics = await metricsHelper.getMetrics(); + while (Date.now() < deadline) { + const { getMetricData, histogramHasData } = makeMetricReaders(metrics); + if (histogramHasData(getMetricData("runs_replication.replication_lag_ms"))) { + return metrics; + } + await setTimeout(250); + metrics = await metricsHelper.getMetrics(); + } + return metrics; +} + +async function seedRun(client: PrismaClient, tag: string) { + const suffix = `${Date.now()}_${Math.floor(Math.random() * 1_000_000)}`; + const org = await client.organization.create({ + data: { title: `org-${tag}-${suffix}`, slug: `org-${tag}-${suffix}` }, + }); + const project = await client.project.create({ + data: { + name: `proj-${tag}-${suffix}`, + slug: `proj-${tag}-${suffix}`, + organizationId: org.id, + externalRef: `proj-${tag}-${suffix}`, + }, + }); + const env = await client.runtimeEnvironment.create({ + data: { + slug: `env-${tag}-${suffix}`, + type: "DEVELOPMENT", + projectId: project.id, + organizationId: org.id, + apiKey: `apikey-${tag}-${suffix}`, + pkApiKey: `pkapikey-${tag}-${suffix}`, + shortcode: `shortcode-${tag}-${suffix}`, + }, + }); + await client.taskRun.create({ + data: { + friendlyId: `run_${tag}_${suffix}`, + taskIdentifier: `my-task-${tag}`, + payload: JSON.stringify({ foo: "bar" }), + payloadType: "application/json", + traceId: `trace-${tag}-${suffix}`, + spanId: `span-${tag}-${suffix}`, + queue: `test-${tag}`, + runtimeEnvironmentId: env.id, + projectId: project.id, + organizationId: org.id, + environmentType: "DEVELOPMENT", + engine: "V2", + status: "PENDING", + }, + }); +} + +describe("RunsReplicationService (part 9/9) - per-source replication-lag attribute", () => { + // Two named sources fanning into one flush scheduler (the production dual-fan-in shape).
+ // Both point at the warm fixture Postgres via independent slots/publications, so the test + // proves the per-source `.record(lag, { source, generation })` attribute deterministically + // for two distinct producer identities. The cross-version (PG14<->PG17) replication boundary + // itself is covered by part8's dual-source dedup test; here we assert the lag *attribution*, + // which is identical regardless of the producer's Postgres version. + replicationContainerTest( + "tags the replication-lag histogram with each source id for a dual-source service", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + const pgUrl = postgresContainer.getConnectionUri(); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication-lag-per-source", + logLevel: "warn", + }); + + const metricsHelper = createInMemoryMetrics(); + let runsReplicationService: RunsReplicationService | undefined; + + try { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + runsReplicationService = new RunsReplicationService({ + clickhouseFactory: new TestReplicationClickhouseFactory(clickhouse), + serviceName: "runs-replication-lag-per-source", + redisOptions, + sources: [ + { + id: "legacy", + pgConnectionUrl: pgUrl, + slotName: "tr_lag_legacy_v1", + publicationName: "tr_lag_legacy_v1_pub", + originGeneration: 0, + }, + { + id: "new", + pgConnectionUrl: pgUrl, + slotName: "tr_lag_new_v1", + publicationName: "tr_lag_new_v1_pub", + originGeneration: 1, + }, + ], + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + meter: metricsHelper.meter, + logLevel: "warn", + }); + + await runsReplicationService.start(); + + // Each insert is decoded by BOTH slots (both subscribe to the same table), so a single + // seed produces a lag point tagged "legacy" and one tagged "new". 
Poll until both land. + const deadline = Date.now() + 40_000; + let sources: unknown[] = []; + let metrics = await metricsHelper.getMetrics(); + while (Date.now() < deadline) { + const { getMetricData, getCounterAttributeValues } = makeMetricReaders(metrics); + sources = getCounterAttributeValues( + getMetricData("runs_replication.replication_lag_ms"), + "source" + ); + if (sources.includes("legacy") && sources.includes("new")) break; + await seedRun(prisma, "lag"); + await setTimeout(500); + metrics = await metricsHelper.getMetrics(); + } + + const { getMetricData, histogramHasData } = makeMetricReaders(metrics); + const replicationLag = getMetricData("runs_replication.replication_lag_ms"); + expect(replicationLag).not.toBeNull(); + expect(histogramHasData(replicationLag)).toBe(true); + + // Each source's id appears as a label value on at least one lag data point. + expect(sources).toContain("legacy"); + expect(sources).toContain("new"); + } finally { + await runsReplicationService?.stop(); + await metricsHelper.shutdown(); + } + } + ); + + // Step 1.4: single-source passthrough. When a single source is used, the lag + // histogram records exactly one `source` label value (the source's id). 
+ replicationContainerTest( + "records a single source label in single-source mode", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication-lag-single-source", + logLevel: "warn", + }); + + const metricsHelper = createInMemoryMetrics(); + + const runsReplicationService = new RunsReplicationService({ + clickhouseFactory: new TestReplicationClickhouseFactory(clickhouse), + serviceName: "runs-replication-lag-single-source", + redisOptions, + sources: [ + { + id: "default", + pgConnectionUrl: postgresContainer.getConnectionUri(), + slotName: "tr_lag_single_v1", + publicationName: "tr_lag_single_v1_pub", + originGeneration: 0, + }, + ], + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + meter: metricsHelper.meter, + logLevel: "warn", + }); + + try { + await runsReplicationService.start(); + + await seedRun(prisma, "single"); + + const metrics = await waitForLagHistogram(metricsHelper); + const { getMetricData, histogramHasData, getCounterAttributeValues } = + makeMetricReaders(metrics); + + const replicationLag = getMetricData("runs_replication.replication_lag_ms"); + expect(replicationLag).not.toBeNull(); + expect(histogramHasData(replicationLag)).toBe(true); + + const sources = getCounterAttributeValues(replicationLag, "source"); + const uniqueSources = [...new Set(sources)]; + expect(uniqueSources).toEqual(["default"]); + } finally { + await runsReplicationService.stop(); + await metricsHelper.shutdown(); + } + } + ); +}); From 8844bd6f2af08dee0913183c9f74b4eb6a793e51 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Thu, 2 Jul 2026 14:13:16 +0100 Subject: [PATCH 2/7] chore(run-ops split): strip plan-enumeration labels from 
replication comments/test names Remove the internal plan-enumeration labels from runs-replication comments and test names, keeping the behavioral descriptions intact. Comment/label hygiene only; no product logic or test behavior changed. Co-Authored-By: Claude Opus 4.8 (1M context) --- apps/webapp/app/services/runsReplicationInstance.server.ts | 2 +- apps/webapp/test/runsReplicationService.part8.test.ts | 4 ++-- apps/webapp/test/runsReplicationService.part9.test.ts | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/webapp/app/services/runsReplicationInstance.server.ts b/apps/webapp/app/services/runsReplicationInstance.server.ts index fef74ef4e13..82cf379cb55 100644 --- a/apps/webapp/app/services/runsReplicationInstance.server.ts +++ b/apps/webapp/app/services/runsReplicationInstance.server.ts @@ -151,7 +151,7 @@ function initializeRunsReplicationInstance() { ]); if (env.RUN_REPLICATION_ENABLED === "1") { - // Shape B (construct-after-gate): resolve the async split gate ONCE at boot, and + // Construct-after-gate: resolve the async split gate ONCE at boot, and // when both sources are enabled rebuild `service` with sources[] before starting. // The legacy-only instance above is never started in the dual path (no slot/lock // taken). runsReplicationService.server.ts is untouched. The create route also calls diff --git a/apps/webapp/test/runsReplicationService.part8.test.ts b/apps/webapp/test/runsReplicationService.part8.test.ts index e0861e741d3..480a14e38d4 100644 --- a/apps/webapp/test/runsReplicationService.part8.test.ts +++ b/apps/webapp/test/runsReplicationService.part8.test.ts @@ -194,7 +194,7 @@ describe("RunsReplicationService (part 8/8) - dual-source dedup", () => { } ); - // Case A - reverse-order independence (Step 4.1). + // Case A - reverse-order independence. // Same run in both sources, but we flush the NEW (gen-1) snapshot FIRST, then the LEGACY // (gen-0) snapshot. 
The gen-1 winner must survive regardless of arrival order - the collapse // is FINAL-time and ordered by _version (composed origin generation), not by arrival time. @@ -362,7 +362,7 @@ describe("RunsReplicationService (part 8/8) - dual-source dedup", () => { } ); - // Case B - per-source independence / no cross-contamination (Step 4.2). + // Case B - per-source independence / no cross-contamination. // Two DIFFERENT runs: run X lives ONLY in the legacy/gen-0 DB, run Y lives ONLY in the // new/gen-1 DB. BOTH must appear in ClickHouse exactly once with their own status. This proves // (a) BOTH sources became leader and streamed (a single-leader regression would drop one run), diff --git a/apps/webapp/test/runsReplicationService.part9.test.ts b/apps/webapp/test/runsReplicationService.part9.test.ts index 495b602acd4..7371ab88540 100644 --- a/apps/webapp/test/runsReplicationService.part9.test.ts +++ b/apps/webapp/test/runsReplicationService.part9.test.ts @@ -200,7 +200,7 @@ describe("RunsReplicationService (part 9/9) - per-source replication-lag attribu } ); - // Step 1.4: single-source passthrough. When a single source is used, the lag + // Single-source passthrough. When a single source is used, the lag // histogram records exactly one `source` label value (the source's id). 
replicationContainerTest( "records a single source label in single-source mode", From e543aedcbef210e5a42da6ff7a1658735ca27640 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Thu, 2 Jul 2026 15:32:16 +0100 Subject: [PATCH 3/7] style(run-ops): apply oxfmt Co-Authored-By: Claude Opus 4.8 (1M context) --- .../runsReplicationInstance.server.ts | 5 ++--- .../test/runsReplicationInstance.test.ts | 21 ++++++++++++------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/apps/webapp/app/services/runsReplicationInstance.server.ts b/apps/webapp/app/services/runsReplicationInstance.server.ts index 82cf379cb55..1953b2957f8 100644 --- a/apps/webapp/app/services/runsReplicationInstance.server.ts +++ b/apps/webapp/app/services/runsReplicationInstance.server.ts @@ -68,7 +68,7 @@ export function buildReplicationSources(args: { export class SplitReplicationMisconfiguredError extends Error { constructor() { super( - "RUN_OPS_SPLIT_ENABLED is on but the runs-replication sources[] has no \"new\" source: " + + 'RUN_OPS_SPLIT_ENABLED is on but the runs-replication sources[] has no "new" source: ' + "ksuid runs on the new DB would not replicate to ClickHouse, under-counting every " + "ClickHouse-fronted aggregate. Enable the new replication source " + "(RUN_REPLICATION_NEW_ENABLED / RUN_OPS_DATABASE_URL) or turn the split off." @@ -162,8 +162,7 @@ function initializeRunsReplicationInstance() { splitEnabled, legacyUrl: DATABASE_URL, newUrl: env.RUN_OPS_DATABASE_URL ?? env.TASK_RUN_DATABASE_URL, - newSourceOverride: - env.RUN_REPLICATION_NEW_ENABLED === "disabled" ? false : undefined, + newSourceOverride: env.RUN_REPLICATION_NEW_ENABLED === "disabled" ? 
false : undefined, legacySlotName: env.RUN_REPLICATION_SLOT_NAME, legacyPublicationName: env.RUN_REPLICATION_PUBLICATION_NAME, legacyOriginGeneration: env.RUN_REPLICATION_LEGACY_ORIGIN_GENERATION, diff --git a/apps/webapp/test/runsReplicationInstance.test.ts b/apps/webapp/test/runsReplicationInstance.test.ts index a7108d2bc75..34a6f379934 100644 --- a/apps/webapp/test/runsReplicationInstance.test.ts +++ b/apps/webapp/test/runsReplicationInstance.test.ts @@ -168,7 +168,7 @@ describe("assertReplicationCoversSplit (boot gate-coupling)", () => { newOriginGeneration: 1, }; - it("throws when split is on but sources[] has no \"new\" source (the silent under-count)", () => { + it('throws when split is on but sources[] has no "new" source (the silent under-count)', () => { // Split on, but the new replication source is forced off — ksuid runs would not // reach ClickHouse. This is the exact misconfiguration the boot gate must refuse to boot with. const sources = buildReplicationSources({ @@ -268,9 +268,16 @@ describe("RunsReplication new-source backfill origin generation (integration)", // Create org/project/env/run on the legacy DB (the FK schema lives there). // This simulates a pre-existing run that was migrated to the dedicated DB. - const organization = await prisma.organization.create({ data: { title: "bf-gen", slug: "bf-gen" } }); + const organization = await prisma.organization.create({ + data: { title: "bf-gen", slug: "bf-gen" }, + }); const project = await prisma.project.create({ - data: { name: "bf-gen", slug: "bf-gen", organizationId: organization.id, externalRef: "bf-gen" }, + data: { + name: "bf-gen", + slug: "bf-gen", + organizationId: organization.id, + externalRef: "bf-gen", + }, }); const runtimeEnvironment = await prisma.runtimeEnvironment.create({ data: { @@ -303,16 +310,14 @@ describe("RunsReplication new-source backfill origin generation (integration)", try { // Backfill the run via the "new" source — must encode gen=1 in _version. 
- await service.backfill( - [{ ...run, masterQueue: run.workerQueue ?? "main" }], - "new" - ); + await service.backfill([{ ...run, masterQueue: run.workerQueue ?? "main" }], "new"); await setTimeout(500); const queryRuns = clickhouse.reader.query({ name: "runs-replication-backfill-gen", - query: "SELECT run_id, _version FROM trigger_dev.task_runs_v2 WHERE run_id = {run_id:String}", + query: + "SELECT run_id, _version FROM trigger_dev.task_runs_v2 WHERE run_id = {run_id:String}", schema: z.object({ run_id: z.string(), _version: z.number() }), params: z.object({ run_id: z.string() }), }); From ab014a7b2d45e777c9c98485694c65f54bf60302 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Thu, 2 Jul 2026 20:10:03 +0100 Subject: [PATCH 4/7] fix(run-ops): align single-source leader-lock id, guard shutdown stop, fix test status type - Register the implicit single source with id "legacy" so its leader-lock key matches the id the admin status route probes; otherwise leadership always reads false in the non-split config. - Guard the shutdown-path client.stop() fan-out against re-firing per incoming transaction and add a catch so rejections don't surface as unhandled. - Use the TaskRunStatus type alias (not the const value) for status annotations in the dual-source dedup tests. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .../services/runsReplicationInstance.server.ts | 11 +++++++++++ .../services/runsReplicationService.server.ts | 18 +++++++++++++----- .../test/runsReplicationService.part8.test.ts | 8 ++++---- 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/apps/webapp/app/services/runsReplicationInstance.server.ts b/apps/webapp/app/services/runsReplicationInstance.server.ts index 1953b2957f8..ccae8e4f26c 100644 --- a/apps/webapp/app/services/runsReplicationInstance.server.ts +++ b/apps/webapp/app/services/runsReplicationInstance.server.ts @@ -138,6 +138,17 @@ function initializeRunsReplicationInstance() { pgConnectionUrl: DATABASE_URL, slotName: env.RUN_REPLICATION_SLOT_NAME, publicationName: env.RUN_REPLICATION_PUBLICATION_NAME, + // Explicit legacy source so the leader-lock key matches the id the status + // route probes from the registry below. + sources: [ + { + id: "legacy", + pgConnectionUrl: DATABASE_URL, + slotName: env.RUN_REPLICATION_SLOT_NAME, + publicationName: env.RUN_REPLICATION_PUBLICATION_NAME, + originGeneration: env.RUN_REPLICATION_LEGACY_ORIGIN_GENERATION, + }, + ], }); // Register the live handle so the status route + lifecycle routes can find it. diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 0448fa78c90..e37ff257671 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -162,6 +162,7 @@ export class RunsReplicationService { private logger: Logger; private _isShuttingDown = false; private _isShutDownComplete = false; + private _shutdownStopInFlight = false; private _tracer: Tracer; private _meter: Meter; private _acknowledgeTimeoutMs: number; @@ -631,11 +632,18 @@ export class RunsReplicationService { if (this._isShuttingDown) { // A global shutdown stops every source's client; mark complete once all - // have stopped. 
For a single source this is identical to the prior - // "stop the one client, then mark complete" behavior. - Promise.all(Array.from(this._sources.values()).map((r) => r.client.stop())).finally(() => { - this._isShutDownComplete = true; - }); + // have stopped. Guard against re-firing per incoming transaction, and + // swallow client.stop() rejections so they don't surface as unhandled. + if (!this._shutdownStopInFlight) { + this._shutdownStopInFlight = true; + Promise.all(Array.from(this._sources.values()).map((r) => r.client.stop())) + .catch((error) => { + this.logger.error("Error stopping replication clients during shutdown", { error }); + }) + .finally(() => { + this._isShutDownComplete = true; + }); + } } // If there are no events, do nothing diff --git a/apps/webapp/test/runsReplicationService.part8.test.ts b/apps/webapp/test/runsReplicationService.part8.test.ts index 480a14e38d4..0b4afab2d3d 100644 --- a/apps/webapp/test/runsReplicationService.part8.test.ts +++ b/apps/webapp/test/runsReplicationService.part8.test.ts @@ -1,6 +1,6 @@ import { ClickHouse } from "@internal/clickhouse"; import { createPostgresContainer, replicationContainerTest } from "@internal/testcontainers"; -import { PrismaClient } from "@trigger.dev/database"; +import { PrismaClient, type TaskRunStatus as TaskRunStatusType } from "@trigger.dev/database"; import { setTimeout } from "node:timers/promises"; import { z } from "zod"; import { TaskRunStatus } from "~/database-types"; @@ -96,7 +96,7 @@ describe("RunsReplicationService (part 8/8) - dual-source dedup", () => { const seedFkRows = async ( client: PrismaClient, tag: string, - status: TaskRunStatus, + status: TaskRunStatusType, friendlyId: string ) => { await client.organization.create({ @@ -275,7 +275,7 @@ describe("RunsReplicationService (part 8/8) - dual-source dedup", () => { const seedFkRows = async ( client: PrismaClient, tag: string, - status: TaskRunStatus, + status: TaskRunStatusType, friendlyId: string ) => { await 
client.organization.create({ @@ -444,7 +444,7 @@ describe("RunsReplicationService (part 8/8) - dual-source dedup", () => { client: PrismaClient, tag: string, runId: string, - status: TaskRunStatus + status: TaskRunStatusType ) => { const orgId = `org_${tag}_${suffix}`; const projectId = `proj_${tag}_${suffix}`; From 6fcf188232e1f0e0f300a8d6a38ce0b9f02531a1 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Fri, 3 Jul 2026 08:50:05 +0100 Subject: [PATCH 5/7] chore(run-ops split): add server-changes entry for ClickHouse replication fan-in Co-Authored-By: Claude Opus 4.8 (1M context) --- .server-changes/run-ops-replication-fan-in.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .server-changes/run-ops-replication-fan-in.md diff --git a/.server-changes/run-ops-replication-fan-in.md b/.server-changes/run-ops-replication-fan-in.md new file mode 100644 index 00000000000..7d6734dd4f9 --- /dev/null +++ b/.server-changes/run-ops-replication-fan-in.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Replicate task runs into ClickHouse from multiple source databases so the run-ops DB split can fan both databases into analytics, with an admin status endpoint reporting per-source replication leadership. 
From 766138c382b7ddda245ed5853d24b43bf7c5e1bb Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Fri, 3 Jul 2026 11:25:06 +0100 Subject: [PATCH 6/7] chore(run-ops): fix lint/format for main lint rules Co-Authored-By: Claude Opus 4.8 (1M context) --- apps/webapp/test/runsReplicationService.part9.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/webapp/test/runsReplicationService.part9.test.ts b/apps/webapp/test/runsReplicationService.part9.test.ts index 7371ab88540..b7882d94b3e 100644 --- a/apps/webapp/test/runsReplicationService.part9.test.ts +++ b/apps/webapp/test/runsReplicationService.part9.test.ts @@ -1,6 +1,6 @@ import { ClickHouse } from "@internal/clickhouse"; import { replicationContainerTest } from "@internal/testcontainers"; -import { PrismaClient } from "@trigger.dev/database"; +import type { PrismaClient } from "@trigger.dev/database"; import { setTimeout } from "node:timers/promises"; import { RunsReplicationService } from "~/services/runsReplicationService.server"; import { createInMemoryMetrics } from "./utils/tracing"; From 29bd826caf41fb85882ecb924672c30e4c92b02d Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Fri, 3 Jul 2026 19:06:37 +0100 Subject: [PATCH 7/7] fix(run-ops replication): shut down bootstrap replication instance before replacing it The legacy-only instance constructed at boot opens a replication client (Redis + Redlock) eagerly; when the split gate resolves to a multi-source service it was replaced without cleanup, leaking one Redis connection per split-enabled boot. shutdown() the bootstrap instance first (idempotent, safe on the never-started instance). 
Co-Authored-By: Claude Opus 4.8 (1M context) --- apps/webapp/app/services/runsReplicationInstance.server.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/apps/webapp/app/services/runsReplicationInstance.server.ts b/apps/webapp/app/services/runsReplicationInstance.server.ts index ccae8e4f26c..bfbf38478a2 100644 --- a/apps/webapp/app/services/runsReplicationInstance.server.ts +++ b/apps/webapp/app/services/runsReplicationInstance.server.ts @@ -168,7 +168,7 @@ function initializeRunsReplicationInstance() { // taken). runsReplicationService.server.ts is untouched. The create route also calls // setRunsReplicationGlobal — last-writer-wins is the existing contract. isSplitEnabled() - .then((splitEnabled) => { + .then(async (splitEnabled) => { const sources = buildReplicationSources({ splitEnabled, legacyUrl: DATABASE_URL, @@ -186,6 +186,9 @@ function initializeRunsReplicationInstance() { assertReplicationCoversSplit({ splitEnabled, sources }); if (sources.length > 1) { + // Release the bootstrap instance's eager replication client (Redis + Redlock) + // before replacing it, or it leaks for the process lifetime. shutdown() is idempotent. + await service.shutdown(); // The scalar pgConnectionUrl/slotName/publicationName remain required on the // options type, but are ignored when sources[] is non-empty — the // service normalizes off sources. Pass the legacy scalars to satisfy the type.