-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat(run-ops): read presenters — de-join control-plane relations + read-through hydration #4122
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
c532b8b
feat(run-ops): read presenters — de-join control-plane relations + re…
d-cs f2d8d02
fix(run-ops split): green the run-ops read presenter tests
d-cs 7181b2f
refactor(run-ops): drop known-migrated from read presenters; id-shape…
d-cs ff73dac
chore(run-ops split): strip review-scaffolding comments/labels from p…
d-cs 2b41afb
style(run-ops): apply oxfmt
d-cs 26e428e
fix(run-ops split): drop dead connectedRuns relation from ApiWaitpoin…
d-cs 77a3d69
test(run-ops): reconcile ApiBatchResultsPresenter split test with rea…
d-cs b9815cc
fix(run-ops split): align WaitpointPresenter single-DB fallback and r…
d-cs cad09df
test(run-ops): stop RunPresenter read-seam test freezing on the first…
d-cs 3a0918d
test(run-ops): stub org-data-stores registry singleton + deterministi…
d-cs e2a35ff
chore: add server-changes for pr08
d-cs 1ae68a0
chore(run-ops): fix lint/format for main lint rules
d-cs 77236dd
test(run-ops split): route-level regression for batch-results 404 on …
d-cs 1ef21cf
fix(run-ops presenters): batch-list empty-state probe mirrors the sca…
d-cs File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
6 changes: 6 additions & 0 deletions
6
.server-changes/route-run-presenter-reads-through-run-store.md
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| --- | ||
| area: webapp | ||
| type: improvement | ||
| --- | ||
|
|
||
| Route dashboard and API run/batch/waitpoint presenter reads through the run store so they can be served from a separate backing store without changing call sites. |
242 changes: 190 additions & 52 deletions
242
apps/webapp/app/presenters/v3/ApiBatchResultsPresenter.server.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,81 +1,219 @@ | ||
| import type { BatchTaskRunExecutionResult } from "@trigger.dev/core/v3"; | ||
| import { | ||
| $replica, | ||
| type PrismaClientOrTransaction, | ||
| type PrismaReplicaClient, | ||
| prisma, | ||
| } from "~/db.server"; | ||
| import type { TaskRunWithAttempts } from "~/models/taskRun.server"; | ||
| import { executionResultForTaskRun } from "~/models/taskRun.server"; | ||
| import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; | ||
| import { runStore } from "~/v3/runStore.server"; | ||
| import { readThroughRun } from "~/v3/runOpsMigration/readThrough.server"; | ||
| import { runStore as defaultRunStore } from "~/v3/runStore.server"; | ||
| import { BasePresenter } from "./basePresenter.server"; | ||
|
|
||
| /** | ||
| * Run-ops read-through wiring. All optional; absent (or `splitEnabled` falsy) collapses `call` to | ||
| * passthrough. `legacyReplica` is a READ REPLICA handle only — there is NO legacy-primary field. | ||
| */ | ||
| type ApiBatchResultsReadThroughDeps = { | ||
| splitEnabled?: boolean; | ||
| newClient?: PrismaReplicaClient; | ||
| legacyReplica?: PrismaReplicaClient; | ||
| isPastRetention?: (runId: string) => boolean; | ||
| }; | ||
|
|
||
| // The TaskRun shape `executionResultForTaskRun` consumes. Shared by both read sites. | ||
| const memberRunSelect = { | ||
| id: true, | ||
| friendlyId: true, | ||
| status: true, | ||
| taskIdentifier: true, | ||
| attempts: { | ||
| select: { | ||
| status: true, | ||
| output: true, | ||
| outputType: true, | ||
| error: true, | ||
| }, | ||
| orderBy: { | ||
| createdAt: "desc", | ||
| }, | ||
| }, | ||
| } as const; | ||
|
|
||
| /** | ||
| * Split on: the batch row + its item rows resolve new-run-ops first, then the LEGACY RUN-OPS | ||
| * READ REPLICA ONLY (never the legacy primary — there is no such handle); each member run is | ||
| * hydrated independently via readThroughRun keyed on the member runId, so a batch whose members | ||
| * span migrated + abandoned runs returns the complete reachable set (the batch-spanning-the-line | ||
| * read; the dangling-reference termination gate is a separate, adjacent unit). | ||
| * | ||
| * Split off (single-DB / self-host): one passthrough read for the batch row + a single store | ||
| * id-set hydrate for the members — no legacy read, no known-migrated probe, no second connection. | ||
| */ | ||
| export class ApiBatchResultsPresenter extends BasePresenter { | ||
| constructor( | ||
| prismaClient: PrismaClientOrTransaction = prisma, | ||
| replicaClient: PrismaClientOrTransaction = $replica, | ||
| private readonly readThrough?: ApiBatchResultsReadThroughDeps, | ||
| private readonly runStore = defaultRunStore | ||
| ) { | ||
| super(prismaClient, replicaClient); | ||
| } | ||
|
|
||
| public async call( | ||
| friendlyId: string, | ||
| env: AuthenticatedEnvironment | ||
| ): Promise<BatchTaskRunExecutionResult | undefined> { | ||
| return this.traceWithEnv("call", env, async (span) => { | ||
| // Route through the store so a NEW-resident batch resolves under the run-ops split (the | ||
| // router probes NEW→LEGACY and drops this client hint) instead of 404ing on a control-plane read. | ||
| const batchRun = await runStore.findBatchTaskRunByFriendlyId( | ||
| const splitEnabled = this.readThrough?.splitEnabled ?? false; | ||
|
|
||
| if (!splitEnabled) { | ||
| return this.#callPassthrough(friendlyId, env); | ||
| } | ||
|
|
||
| return this.#callSplit(friendlyId, env); | ||
| }); | ||
| } | ||
|
|
||
| // Passthrough: batch row off the replica, members via the single run store. No legacy read. | ||
| async #callPassthrough( | ||
| friendlyId: string, | ||
| env: AuthenticatedEnvironment | ||
| ): Promise<BatchTaskRunExecutionResult | undefined> { | ||
| const batchRun = await this._replica.batchTaskRun.findFirst({ | ||
| where: { | ||
| friendlyId, | ||
| env.id, | ||
| { | ||
| include: { | ||
| items: { | ||
| select: { | ||
| taskRunId: true, | ||
| }, | ||
| }, | ||
| runtimeEnvironmentId: env.id, | ||
| }, | ||
| include: { | ||
| items: { | ||
| select: { | ||
| taskRunId: true, | ||
| }, | ||
| }, | ||
| this._prisma | ||
| ); | ||
| }, | ||
| }); | ||
|
|
||
| if (!batchRun) { | ||
| return undefined; | ||
| } | ||
| if (!batchRun) { | ||
| return undefined; | ||
| } | ||
|
|
||
| const taskRunIds = batchRun.items.map((item) => item.taskRunId); | ||
| const taskRunIds = batchRun.items.map((item) => item.taskRunId); | ||
|
|
||
| if (taskRunIds.length === 0) { | ||
| return { | ||
| id: batchRun.friendlyId, | ||
| items: [], | ||
| }; | ||
| } | ||
| if (taskRunIds.length === 0) { | ||
| return { | ||
| id: batchRun.friendlyId, | ||
| items: [], | ||
| }; | ||
| } | ||
|
|
||
| const taskRuns = await runStore.findRuns( | ||
| { | ||
| where: { id: { in: taskRunIds } }, | ||
| select: { | ||
| id: true, | ||
| friendlyId: true, | ||
| status: true, | ||
| taskIdentifier: true, | ||
| attempts: { | ||
| select: { | ||
| status: true, | ||
| output: true, | ||
| outputType: true, | ||
| error: true, | ||
| }, | ||
| orderBy: { | ||
| createdAt: "desc", | ||
| }, | ||
| const taskRuns = await this.runStore.findRuns( | ||
| { | ||
| where: { id: { in: taskRunIds } }, | ||
| select: memberRunSelect, | ||
| }, | ||
| this._prisma | ||
| ); | ||
|
|
||
| const runMap = new Map(taskRuns.map((run) => [run.id, run])); | ||
|
|
||
| return { | ||
| id: batchRun.friendlyId, | ||
| items: batchRun.items | ||
| .map((item) => { | ||
| const run = runMap.get(item.taskRunId); | ||
| return run ? executionResultForTaskRun(run as TaskRunWithAttempts) : undefined; | ||
| }) | ||
| .filter(Boolean), | ||
| }; | ||
| } | ||
|
|
||
| // Split: resolve the batch row new-first then off the legacy READ REPLICA only (a batch id may | ||
| // be cuid or ksuid, and a cuid-shaped id can still have been backfilled onto NEW, so id-shape | ||
| // residency is not authoritative for the row — the new-first-then-legacy probe is), then | ||
| // hydrate every member run independently via the per-run read-through primitive. | ||
| async #callSplit( | ||
| friendlyId: string, | ||
| env: AuthenticatedEnvironment | ||
| ): Promise<BatchTaskRunExecutionResult | undefined> { | ||
| // Resolve both handles ONCE so the batch row and its members never read from different DBs. | ||
| const newClient = (this.readThrough?.newClient ?? this._replica) as PrismaReplicaClient; | ||
| const legacyReplica = (this.readThrough?.legacyReplica ?? this._replica) as PrismaReplicaClient; | ||
|
|
||
| const readBatch = (client: PrismaClientOrTransaction) => | ||
| client.batchTaskRun.findFirst({ | ||
| where: { | ||
| friendlyId, | ||
| runtimeEnvironmentId: env.id, | ||
| }, | ||
| include: { | ||
| items: { | ||
| select: { | ||
| taskRunId: true, | ||
| }, | ||
| }, | ||
| }, | ||
| this._prisma | ||
| ); | ||
| }); | ||
|
|
||
| let batchRun = await readBatch(newClient); | ||
|
|
||
| // Legacy READ REPLICA probe, only on a new-probe miss; skipped when past retention. | ||
| if (!batchRun && !this.readThrough?.isPastRetention?.(friendlyId)) { | ||
| batchRun = await readBatch(legacyReplica); | ||
| } | ||
|
|
||
| const runMap = new Map(taskRuns.map((run) => [run.id, run])); | ||
| if (!batchRun) { | ||
| return undefined; | ||
| } | ||
|
|
||
| if (batchRun.items.length === 0) { | ||
| return { | ||
| id: batchRun.friendlyId, | ||
| items: batchRun.items | ||
| .map((item) => { | ||
| const run = runMap.get(item.taskRunId); | ||
| return run ? executionResultForTaskRun(run as TaskRunWithAttempts) : undefined; | ||
| }) | ||
| .filter(Boolean), | ||
| items: [], | ||
| }; | ||
| }); | ||
| } | ||
|
|
||
| const readMemberRun = (client: PrismaClientOrTransaction, taskRunId: string) => | ||
| client.taskRun.findFirst({ | ||
| where: { id: taskRunId }, | ||
| select: memberRunSelect, | ||
| }) as Promise<TaskRunWithAttempts | null>; | ||
|
|
||
| // Per-member fan-out: each member may live on a different DB, so a single nested include cannot | ||
| // cross the seam. Promise.all preserves batchRun.items order, unchanged from today. | ||
| const memberResults = await Promise.all( | ||
| batchRun.items.map(async (item) => { | ||
| const result = await readThroughRun<TaskRunWithAttempts>({ | ||
| runId: item.taskRunId, | ||
| environmentId: env.id, | ||
| readNew: (client) => readMemberRun(client, item.taskRunId), | ||
| readLegacy: (replica) => readMemberRun(replica, item.taskRunId), | ||
| deps: { | ||
| splitEnabled: true, | ||
| // Pass the SAME resolved handles the batch row used, so the batch row and its members | ||
| // never resolve against different DBs. (Letting these fall through to readThroughRun's | ||
| // own module-level defaults would diverge from the batch read's `?? this._replica`.) | ||
| newClient, | ||
| legacyReplica, | ||
| isPastRetention: this.readThrough?.isPastRetention, | ||
| }, | ||
| }); | ||
|
|
||
| // not-found / past-retention members are omitted (matches today's drop-undefined behavior); | ||
| // the dangling-reference termination gate (separate unit) governs whether that's permitted. | ||
| if (result.source === "not-found" || result.source === "past-retention") { | ||
| return undefined; | ||
| } | ||
|
|
||
| return executionResultForTaskRun(result.value); | ||
| }) | ||
| ); | ||
|
|
||
| return { | ||
| id: batchRun.friendlyId, | ||
| items: memberResults.filter(Boolean), | ||
| }; | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.