Releases: boringnode/queue
Job Deduplication and Worker Heartbeats
What's Changed
New Features
- Added job deduplication through the new .dedup() dispatcher method.
  - .dedup({ id }) skips duplicate jobs while an existing job with the same key is still present.
  - .dedup({ id, ttl }) skips duplicates within a time window.
  - .dedup({ id, ttl, extend: true }) refreshes the deduplication window when a duplicate is dispatched.
  - .dedup({ id, ttl, replace: true }) replaces the payload of an existing pending or delayed job.
  - .dedup({ id, ttl, extend: true, replace: true }) enables debounce-style dispatching.
  - Dispatch results now expose a deduped outcome: added, skipped, replaced, or extended.
- Added worker heartbeats for long-running in-flight jobs.
  - Workers periodically renew the acquired timestamp of active jobs they own.
  - Healthy long-running jobs are no longer re-delivered just because their handler runs longer than stalledThreshold.
  - stalledThreshold now behaves as a crash-detection window again, instead of an implicit maximum job runtime.
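The deduplication outcomes listed above can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the library's implementation: DedupModel, DedupOptions, and the explicit now argument are hypothetical names, ttl is assumed to be in milliseconds, and the outcome reported when extend and replace are combined is a guess. The real adapters also check job state (replacement only applies to pending or delayed jobs), which this model leaves out.

```typescript
type DedupOutcome = 'added' | 'skipped' | 'replaced' | 'extended'

interface DedupOptions {
  id: string
  ttl?: number // assumption: milliseconds
  extend?: boolean
  replace?: boolean
}

interface DedupEntry<T> {
  payload: T
  expiresAt: number | null // null: key is held until the job is removed
}

// Hypothetical model of the dedup rules; not the library's API.
class DedupModel<T> {
  private entries = new Map<string, DedupEntry<T>>()

  dispatch(jobName: string, payload: T, opts: DedupOptions, now: number): DedupOutcome {
    const key = `${jobName}::${opts.id}`
    const expiresAt = opts.ttl === undefined ? null : now + opts.ttl
    const existing = this.entries.get(key)

    // No live entry for this key: the job is enqueued normally.
    if (!existing || (existing.expiresAt !== null && existing.expiresAt <= now)) {
      this.entries.set(key, { payload, expiresAt })
      return 'added'
    }

    // A duplicate: optionally refresh the window and/or swap the payload.
    if (opts.extend) existing.expiresAt = expiresAt
    if (opts.replace) {
      existing.payload = payload
      return 'replaced' // assumption: reported even when extend is also set
    }
    return opts.extend ? 'extended' : 'skipped'
  }

  // Models the job completing or being removed, which frees the key.
  remove(jobName: string, id: string): void {
    this.entries.delete(`${jobName}::${id}`)
  }

  payloadOf(jobName: string, id: string): T | undefined {
    return this.entries.get(`${jobName}::${id}`)?.payload
  }
}
```

With extend and replace both set, every duplicate pushes the window forward and keeps only the latest payload, which is what makes the combination behave like a debounce.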
Bug Fixes
- Fixed Redis job payload preservation for empty arrays.
  - Redis lifecycle operations now keep stored job JSON opaque to Lua, preventing payload values like [] from being serialized back as {}.
  - This applies across acquisition, inspection, retry, stalled recovery, and dedup replacement flows.
- Fixed worker heartbeat ownership checks.
  - A worker can only renew leases for jobs it still owns.
  - Late heartbeats from a slow or stale worker cannot extend jobs that were already recovered and re-acquired by another worker.
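The ownership rule for heartbeats can be sketched with a small lease table. This is an illustrative model only: LeaseTable, its method signatures, and the explicit now and stalledThreshold arguments are hypothetical and do not mirror the adapter's actual renewJobs(queue, jobIds) method or its storage layout.

```typescript
interface Lease {
  workerId: string
  acquiredAt: number
}

// Hypothetical in-memory stand-in for an adapter's active-job bookkeeping.
class LeaseTable {
  private active = new Map<string, Lease>()

  acquire(jobId: string, workerId: string, now: number): void {
    this.active.set(jobId, { workerId, acquiredAt: now })
  }

  // Heartbeat: refresh acquiredAt only for jobs this worker still owns.
  // Returns the ids that were actually renewed.
  renew(workerId: string, jobIds: string[], now: number): string[] {
    const renewed: string[] = []
    for (const jobId of jobIds) {
      const lease = this.active.get(jobId)
      if (lease && lease.workerId === workerId) {
        lease.acquiredAt = now
        renewed.push(jobId)
      }
    }
    return renewed
  }

  // Crash detection: release jobs whose lease was not renewed within the threshold.
  recoverStalled(now: number, stalledThreshold: number): string[] {
    const recovered: string[] = []
    this.active.forEach((lease, jobId) => {
      if (now - lease.acquiredAt > stalledThreshold) {
        this.active.delete(jobId)
        recovered.push(jobId)
      }
    })
    return recovered
  }
}
```

In this model a healthy worker that keeps renewing is never recovered, however long its handler runs, while a stale worker's late heartbeat is ignored once another worker holds the lease.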
Internal Improvements
- Moved Redis Lua scripts out of the Redis adapter into dedicated script modules.
Upgrade Notes
- Knex users with an existing queue_jobs table must add the new deduplication columns before using .dedup():

  import { QueueSchemaService } from '@boringnode/queue'

  const schema = new QueueSchemaService(knex)
  await schema.addDedupColumns()

  If you use a custom jobs table name, pass it explicitly:

  await schema.addDedupColumns('my_queue_jobs')

  New tables created with createJobsTable() already include these columns.
- Deduplication is supported by the Redis and Knex adapters. The Sync adapter still runs every dispatch inline and does not apply deduplication.
- Deduplication only applies to single-job dispatch. Batch dispatch and scheduled jobs do not support deduplication.
- The user-provided deduplication ID must be 400 characters or fewer, and the final <jobName>::<id> key must be 510 characters or fewer.
- Custom adapter implementations must implement the new renewJobs(queue, jobIds) adapter method.
- Existing Redis jobs remain compatible after upgrading. Jobs that do not yet have the new Redis metadata are read from their existing stored JSON.
- Redis jobs whose payloads were already corrupted before upgrading cannot be automatically repaired, because an already-stored {} cannot be distinguished from an original [].
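The two length limits on deduplication keys can be checked before dispatching. The helper below is a hypothetical sketch, not part of the library: buildDedupKey and the constant names are made up for illustration, and it assumes "characters" means JavaScript string length (UTF-16 code units), which the notes above do not specify.

```typescript
const MAX_DEDUP_ID_LENGTH = 400
const MAX_DEDUP_KEY_LENGTH = 510

// Hypothetical helper: builds the <jobName>::<id> key and enforces both limits.
function buildDedupKey(jobName: string, id: string): string {
  if (id.length > MAX_DEDUP_ID_LENGTH) {
    throw new RangeError(
      `dedup id is ${id.length} characters; the limit is ${MAX_DEDUP_ID_LENGTH}`
    )
  }

  const key = `${jobName}::${id}`
  if (key.length > MAX_DEDUP_KEY_LENGTH) {
    throw new RangeError(
      `dedup key is ${key.length} characters; the limit is ${MAX_DEDUP_KEY_LENGTH}`
    )
  }

  return key
}

// Usage: a short id under a typical job name passes both checks.
const exampleKey = buildDedupKey('SendEmailJob', 'user-42')
```

Because the separator adds two characters, a 400-character id leaves at most 108 characters for the job name.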
Commits Since v0.5.2
- chore: update dependencies (a400cf5)
- chore: update yarn (48787f7)
- feat: renew in-flight job timestamps via a worker heartbeat (#17) (177ebd5)
- refactor(redis): move lua scripts out of adapter (4a576a3)
- docs: add changelog for job deduplication (500935a)
- fix(redis): preserve opaque job payloads (b14e488)
- chore: remove old changelog (cb40c90)
- feat: add .dedup() method for job deduplication (#12) (52c83be)
Explicit job loading and better observability
What's Changed
Added
- Added messaging.job.queue_time_ms to OpenTelemetry consumer spans, making it easier to measure how long a job waited in the queue before execution.
- Added support for using fake = QueueManager.fake() so tests can automatically restore real adapters when the fake goes out of scope.
- Added QueueManager.loadJobs() and autoLoadJobs to support framework integrations that need to defer job discovery until a command lifecycle is ready.
Fixed
- Improved Redis schedule claiming by moving due-schedule iteration into a bounded server-side Lua script, reducing round-trips when many schedules exist.
- Fixed the release configuration by removing the unsupported GitHub web option.
Changed
- OpenTelemetry queue time now uses job.createdAt instead of injecting internal timestamp keys into trace context.
- Tests now run through Yarn in CI.
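As a rough illustration of the queue-time measurement, the attribute value can be derived from the job's creation time and the moment execution starts. This is a sketch under assumptions: queueTimeAttributes is a hypothetical helper, the type of job.createdAt (Date or epoch milliseconds) is not stated in these notes, and clamping negative values to zero to absorb clock skew is a design choice of this example rather than documented behavior.

```typescript
// Hypothetical helper: computes the messaging.job.queue_time_ms span attribute.
function queueTimeAttributes(
  createdAt: Date | number,
  startedAt: Date | number = Date.now()
): Record<string, number> {
  const created = typeof createdAt === 'number' ? createdAt : createdAt.getTime()
  const started = typeof startedAt === 'number' ? startedAt : startedAt.getTime()

  // Clamp at zero so clock skew between producer and consumer never yields a negative wait.
  return { 'messaging.job.queue_time_ms': Math.max(0, started - created) }
}
```

Deriving the value from data already stored on the job means nothing extra has to travel in the trace context.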
Documentation
- Documented public queue configuration types so users get better guidance directly from TypeScript autocomplete.
Commits
- 83db97d feat(otel): add queue_time_ms span attribute
- 5d69498 feat: using test helper (#11)
- 69172be fix(redis): bound schedule claim round-trips
- ba545d8 feat: expose explicit job loading
- 12f8566 docs: document public queue config types
Full Changelog: v0.5.1...v0.5.2
Experimental OpenTelemetry Instrumentation
Features
OpenTelemetry Instrumentation (experimental)
@boringnode/queue now ships with built-in OpenTelemetry instrumentation that creates PRODUCER spans for job dispatch and CONSUMER spans for job execution, following OTel messaging semantic conventions.
import { QueueInstrumentation } from '@boringnode/queue/otel'
import * as boringqueue from '@boringnode/queue'

const instrumentation = new QueueInstrumentation({
  messagingSystem: 'boringqueue',
  executionSpanLinkMode: 'link', // or 'parent'
})

instrumentation.enable()
instrumentation.manuallyRegister(boringqueue)

Warning: This is experimental, and the API may change in future releases.
Scheduler: custom payload with manual trigger
Schedule.trigger() now accepts an optional payload, allowing you to pass custom data when manually triggering a scheduled job.
What's Changed
- feat(scheduler): allow custom payload with manual trigger by @kerwanp in #7
- feat: add experimental otel instrumentation by @Julien-R44 in #8
New Contributors
- @kerwanp made their first contribution in #7
- @Julien-R44 made their first contribution in #8
Full Changelog: v0.5.0...v0.5.1
Queue Runtime and Lifecycle Fixes
Breaking Change
QueueManager.getMergedRetryConfig(), QueueManager.getMergedJobOptions(), and QueueManager.getWorkerTimeout() have been removed from the public API.
Effective runtime configuration is now resolved through QueueManager.getConfigResolver(). The merge precedence is unchanged.
Worker.stop() also no longer destroys the underlying adapter instance.
Stopping a worker now only:
- stops pulling new jobs
- waits for in-flight jobs to finish
- removes shutdown handlers
Adapter cleanup is now owned by QueueManager.destroy().
Bug Fixes
The sync adapter now uses the same retry and failure decision logic as the worker-backed runtime.
This fixes two issues:
- jobs executed through dispatch().run() with the sync adapter now honor retries and call failed() after the final failure
- delayed sync jobs no longer leak execution errors as unhandled promise rejections
If a delayed sync job fails outside of a caller-controlled promise chain, the error is now logged through the configured queue logger instead of surfacing as an unhandled rejection.
Job.dispatch(), Job.dispatchMany(), and Job.schedule() now work with job subclasses that use typed constructor dependency injection.
This fixes the TypeScript incompatibility that occurred with patterns such as AdonisJS @inject(), where the job constructor exposes concrete dependency types instead of unknown[].
QueueManager.init() now cleans up the current runtime state before applying a new configuration.
This fixes two reinitialization issues:
- cached adapter instances are destroyed before the manager swaps to the new config
- fake state is reset during reinitialization, so fake() and restore() cannot bring back an older configuration snapshot
These reinitialization fixes mainly affect long-lived processes and test environments that call QueueManager.init() multiple times in the same process.
Migration Guide
Before:
// Resolving Configuration
const retry = QueueManager.getMergedRetryConfig('emails', { maxRetries: 1 })
const options = QueueManager.getMergedJobOptions('emails', {
  removeOnFail: false,
})

// Stopping the worker
await worker.stop()

After:

// Resolving Configuration
const resolver = QueueManager.getConfigResolver()
const retry = resolver.resolveRetryConfig('emails', { maxRetries: 1 })
const options = resolver.resolveJobOptions('emails', {
  removeOnFail: false,
})
const workerTimeout = resolver.getWorkerTimeout()

// Stopping the worker
await worker.stop()
await QueueManager.destroy()

QueueManager.init() must still be called before accessing the resolver.
Hardening: schedule consistency, Redis atomicity/perf, and timeout correctness
Highlights
- Improved schedule upsert consistency across adapters.
- Fixed worker timeout edge cases and timeout listener cleanup.
- Optimized Redis schedule listing and made schedule deletion atomic.
- Expanded integration test coverage for Redis and Knex behavior.
- Reduced internal any usage and tightened TypeScript types.
What’s Changed
- Fixed Redis schedule upsert to clear stale scheduling fields.
- Renamed schedule creation flow to upsertSchedule, kept createSchedule as a deprecated wrapper.
- Fixed schedule manual trigger queue routing and queue manager re-init behavior via follow-up fixes.
- Fixed worker timeout listener cleanup to prevent retained abort listeners.
- Fixed timeout: 0 handling so it behaves as an immediate timeout.
- Preserved runCount on schedule upsert for adapter parity.
- Moved queue-isolation assertions from fake-adapter-only tests to shared adapter contract tests.
- Optimized Redis listSchedules with pipelining (removes N+1 request pattern).
- Made Redis deleteSchedule atomic using MULTI/EXEC.
- Added Redis integration test helper for write-stream spying.
- Added Knex query-spy helper and query-count tests for listSchedules and deleteSchedule (SQLite + PostgreSQL).
- Refactored internal typing to reduce any usage and improve dispatcher/worker type safety.
- Added repository GitHub link in package.json.
- Dependency update and general maintenance changes.
QueueSchemaService for Knex Adapter
Breaking Change
The Knex adapter no longer automatically creates database tables on first use. You must now create the tables explicitly using the new QueueSchemaService or your own migration system.
New Feature: QueueSchemaService
A new QueueSchemaService class is now exported from the main package, providing methods to create and drop the queue tables in a controlled manner.
Methods
- createJobsTable(tableName?, extend?) - Creates the jobs table with the default schema
- createSchedulesTable(tableName?, extend?) - Creates the schedules table with the default schema
- dropJobsTable(tableName?) - Drops the jobs table if it exists
- dropSchedulesTable(tableName?) - Drops the schedules table if it exists
Usage
import { QueueSchemaService } from '@boringnode/queue'
import Knex from 'knex'

const connection = Knex({ client: 'pg', connection: '...' })
const schemaService = new QueueSchemaService(connection)

// Create tables with default names
await schemaService.createJobsTable()
await schemaService.createSchedulesTable()

// Or with custom table names
await schemaService.createJobsTable('my_jobs')
await schemaService.createSchedulesTable('my_schedules')

// Extend with custom columns
await schemaService.createJobsTable('queue_jobs', (table) => {
  table.string('tenant_id', 255).nullable()
})

AdonisJS Migration Example
import { BaseSchema } from '@adonisjs/lucid/schema'
import { QueueSchemaService } from '@boringnode/queue'

export default class extends BaseSchema {
  async up() {
    const schemaService = new QueueSchemaService(this.db.connection().getWriteClient())
    await schemaService.createJobsTable()
    await schemaService.createSchedulesTable()
  }

  async down() {
    const schemaService = new QueueSchemaService(this.db.connection().getWriteClient())
    await schemaService.dropSchedulesTable()
    await schemaService.dropJobsTable()
  }
}

Migration Guide
If you were relying on automatic table creation, you need to:
- Create a migration that uses QueueSchemaService to create the tables
- Run the migration before starting your application
This change gives you full control over when and how the tables are created, and allows you to extend the schema with custom columns.
Fixed concurrent job processing
Bug Fixes
Previously, when jobs were dispatched with delays between them while the worker was already processing, newly enqueued jobs would not be picked up until a currently running job completed, even when concurrency slots were available.
For example, with concurrency: 5, if Job 0 was dispatched and started processing (taking 10 seconds), Jobs 1, 2, and 3 dispatched shortly after would wait for Job 0 to finish before being picked up, effectively running sequentially instead of concurrently.
The worker now periodically checks for new jobs when it has available capacity, ensuring that newly enqueued jobs are picked up promptly and run concurrently as expected.
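The capacity-based polling described above can be modelled in a few lines. This is a simplified sketch, not the worker's actual code: SlotTracker and pollOnce are hypothetical names, the queue is a plain array, and the real polling interval is not stated in these notes.

```typescript
// Hypothetical tracker for how many concurrency slots are in use.
class SlotTracker {
  private running = 0
  private readonly concurrency: number

  constructor(concurrency: number) {
    this.concurrency = concurrency
  }

  get free(): number {
    return this.concurrency - this.running
  }

  start(count: number): void {
    this.running += count
  }

  finish(): void {
    if (this.running > 0) this.running -= 1
  }
}

// One poll tick: claim as many queued jobs as there are free slots.
// Calling this on a timer, and not only when a job completes, is what lets
// late arrivals start while earlier jobs are still running.
function pollOnce(queue: string[], slots: SlotTracker): string[] {
  const claimed = queue.splice(0, slots.free)
  slots.start(claimed.length)
  return claimed
}
```

With concurrency 5 and one long job running, a tick that finds three newly enqueued jobs claims all three immediately instead of waiting for the first job to finish.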
Related #4
Error cause propagation
New Feature
When rethrowing an error, the original error is now forwarded using the standard cause option.
This ensures the full error chain is preserved, making debugging and logging significantly easier.
export default class SendEmailJob extends Job<SendEmailPayload> {
  static options: JobOptions = {
    queue: 'email',
  }

  async execute(): Promise<void> {
    console.log(`Sending email to: ${this.payload.to}`)
  }

  failed(e: Error) {
    // The original error is available through the standard cause property
    console.log(e.cause)
  }
}

Fake adapter support and bug fixes
New Features
QueueManager.fake / QueueManager.restore
You can replace all adapters with the fake adapter for test assertions, then restore the original configuration.
import { QueueManager } from '@boringnode/queue'
import { redis } from '@boringnode/queue/drivers/redis_adapter'

await QueueManager.init({
  default: 'redis',
  adapters: {
    redis: redis({ host: 'localhost' }),
  },
})

const fake = QueueManager.fake()

await SendEmailJob.dispatch({ to: 'user@example.com' })

fake.assertPushed(SendEmailJob, {
  queue: 'default',
  payload: { to: 'user@example.com' },
})

QueueManager.restore()

Bug Fixes
Respect maxRetries: 0 in job retry configuration
Fixed a bug where maxRetries: 0 configuration was not properly respected, allowing jobs to retry when they should not. (Closes #3)
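The notes do not say what caused the bug, but a zero value is easy to lose when options are merged with a falsy check. The sketch below shows one way to keep an explicit 0; every name in it (RetryConfig, resolveMaxRetries, runWithRetries, the default of 3, and the job-over-queue precedence) is an assumption for illustration rather than the library's code.

```typescript
interface RetryConfig {
  maxRetries?: number
}

const DEFAULT_MAX_RETRIES = 3 // hypothetical default

// `??` only falls through on null or undefined, so an explicit 0 is preserved.
// Using `||` here would silently turn 0 into the next fallback value.
function resolveMaxRetries(job?: RetryConfig, queue?: RetryConfig): number {
  return job?.maxRetries ?? queue?.maxRetries ?? DEFAULT_MAX_RETRIES
}

// Synchronous retry loop: one initial attempt plus up to maxRetries retries.
// onFailed runs once, after the final failure.
function runWithRetries<T>(
  task: (attempt: number) => T,
  maxRetries: number,
  onFailed: (error: unknown) => void
): T | undefined {
  for (let attempt = 0; ; attempt++) {
    try {
      return task(attempt)
    } catch (error) {
      if (attempt >= maxRetries) {
        onFailed(error)
        return undefined
      }
    }
  }
}
```

With maxRetries resolved to 0, the loop makes exactly one attempt and reports the failure without retrying.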
All types are imported from /types subpath
Breaking Change (patch due to 0.3.0 being released < 10 minutes ago)
Types are no longer exported from the main entry point. Import them from the /types subpath instead:
- import type { AdapterFactory } from '@boringnode/queue'
+ import type { AdapterFactory } from '@boringnode/queue/types'

Also added missing type exports: AdapterFactory, DispatchManyResult, JobRecord, JobRetention, JobStatus.