feat: renew in-flight job timestamps via a worker heartbeat#17
Conversation
Long-running jobs could be executed twice from a single enqueue. When a handler runs longer than `stalledThreshold`, the stalled-recovery path re-delivers it to a free slot because nothing ever refreshes `acquiredAt` between claim and completion — so the threshold acts as a hard cap on job runtime rather than a crash-detection window. Add a heartbeat that periodically renews the acquired timestamp of jobs currently in the pool: - `renewJobs(queue, jobIds)` on the Adapter contract, implemented for the Redis (Lua `HSET` over the active hash), Knex (UPDATE ... WHERE status = 'active'), Fake and Sync adapters. Only entries still active are renewed, so a job that was already recovered or finalized is never resurrected by a late heartbeat. - A dedicated worker `setInterval` (~`stalledThreshold / 2`) that renews the in-flight job ids. It must be a separate timer: at full concurrency the process loop blocks on `waitForNextCompletion()` with no idle tick, so the loop is not cycling exactly when long jobs are in flight. - The heartbeat is cleared in `stop()` (after draining, so jobs that are still finishing keep being renewed until they complete) as well as in the process() generator's `finally`, guaranteeing deterministic cleanup whether the worker was driven via start() or processCycle(). `#startHeartbeat` is idempotent so the timer can never leak if the loop is re-entered. "Stalled" now means the worker actually died again, so `stalledThreshold` can stay small without re-delivering healthy long-running jobs. Tests cover renewJobs across all adapters (renew keeps an active job from recovery, never resurrects an already-recovered job, is queue-scoped) and two worker-level tests: a long-running job at full capacity is renewed by the heartbeat and executes exactly once, and the heartbeat stops firing once the worker is stopped.
|
Hi! Thanks for the PR. The approach looks good to me as a first solution. One caveat is that CPU-bound jobs can still cause issues, since they may block the event loop and prevent the heartbeat from running. That is probably a separate problem, though. There is one subtle issue I would like to see addressed before merging. A worker can currently renew a job it no longer owns. Could you please pass the |
renewJobs previously checked only that a job was still active (HEXISTS), so a slow-but-alive worker whose job had been recovered and re-acquired by another worker would keep renewing the new owner's lease — preventing recovery from re-delivering it if that owner later died. Enforce ownership using the worker id the adapter already holds from setWorkerId (as pop does), without changing the renewJobs signature: - Redis: RENEW_JOBS_SCRIPT skips entries whose workerId doesn't match. - Knex: renew UPDATE gains a WHERE worker_id clause. - Fake/memory adapters: record the worker id on pop and filter on it. Add a cross-worker driver test asserting a worker cannot renew a lease owned by another worker, while the legitimate owner still can.
|
Thanks for the review!! great catch on the ownership issue. I've fixed it so each adapter only renews leases it still owns. Rather than add a
Added a cross-worker test (runs against Redis and Postgres/SQLite): worker A acquires a job, worker B's On the CPU-bound point — fully agree that it's separate. The heartbeat is a timer on the same event loop as the handlers, so a synchronous CPU-bound job blocks it from firing and there's no way around that within a single thread (BullMQ's lock renewal has the same limitation). The real fix is sandboxed/worker-thread processors so the renewal loop stays responsive while the handler crunches. One related thing I noticed while in here: the same ownership check is missing on |
|
Thanks, this looks good to me! For |
Closes #16.
Companion PR for the long-running-job double-execution issue. As described there, when a handler runs longer than
stalledThreshold, nothing ever refreshesacquiredAtbetween claim and completion, so the stalled-recovery path re-delivers the job to a free slot and it runs a second time, concurrently.stalledThresholdeffectively becomes a hard cap on how long a job may run rather than a crash-detection window.What this does
Adds a heartbeat that periodically renews the acquired timestamp of the jobs a worker is actively processing.
renewJobs(queue, jobIds)on theAdaptercontract, implemented for every backend:RENEW_JOBS_SCRIPTLua thatHSETsacquiredAton the active hash, preservingworkerId.UPDATE ... WHERE status = 'active' AND id IN (...).setInterval(~stalledThreshold / 2) that renews the in-flight job ids. It has to be a separate timer rather than piggybacking on the process loop: at full concurrency the loop blocks onwaitForNextCompletion()with no idle tick, so the loop is not cycling exactly when long jobs are in flight.stop()(after draining, so jobs still finishing keep being renewed until they complete) as well as in theprocess()generator'sfinally, giving deterministic cleanup whether the worker is driven viastart()orprocessCycle().#startHeartbeatis idempotent so the timer can't leak if the loop is re-entered.Net effect: "stalled" once again means the worker actually died, so
stalledThresholdcan stay small without re-delivering healthy long-running jobs.Tests
renewJobsbehavior across all adapters via the shared driver suite: renewal keeps an active job from being recovered, never resurrects an already-recovered job, is queue-scoped, and is a no-op for an empty id list.All 647 tests pass locally across Memory, Redis, Knex (SQLite) and Knex (PostgreSQL);
tsc, oxlint and oxfmt are clean.