Skip to content

Releases: boringnode/queue

Job Deduplication and Worker Heartbeats

28 Jun 18:52

Choose a tag to compare

What's Changed

New Features

  • Added job deduplication through the new .dedup() dispatcher method.

    • .dedup({ id }) skips duplicate jobs while an existing job with the same key is still present.
    • .dedup({ id, ttl }) skips duplicates within a time window.
    • .dedup({ id, ttl, extend: true }) refreshes the deduplication window when a duplicate is dispatched.
    • .dedup({ id, ttl, replace: true }) replaces the payload of an existing pending or delayed job.
    • .dedup({ id, ttl, extend: true, replace: true }) enables debounce-style dispatching.
    • Dispatch results now expose a deduped outcome: added, skipped, replaced, or extended.
  • Added worker heartbeats for long-running in-flight jobs.

    • Workers periodically renew the acquired timestamp of active jobs they own.
    • Healthy long-running jobs are no longer re-delivered just because their handler runs longer than stalledThreshold.
    • stalledThreshold now behaves as a crash-detection window again, instead of an implicit maximum job runtime.

Bug Fixes

  • Fixed Redis job payload preservation for empty arrays.

    • Redis lifecycle operations now keep stored job JSON opaque to Lua, preventing payload values like [] from being serialized back as {}.
    • This applies across acquisition, inspection, retry, stalled recovery, and dedup replacement flows.
  • Fixed worker heartbeat ownership checks.

    • A worker can only renew leases for jobs it still owns.
    • Late heartbeats from a slow or stale worker cannot extend jobs that were already recovered and re-acquired by another worker.

Internal Improvements

  • Moved Redis Lua scripts out of the Redis adapter into dedicated script modules.

Upgrade Notes

  • Knex users with an existing queue_jobs table must add the new deduplication columns before using .dedup():

    import { QueueSchemaService } from '@boringnode/queue'
    
    const schema = new QueueSchemaService(knex)
    await schema.addDedupColumns()

    If you use a custom jobs table name, pass it explicitly:

    await schema.addDedupColumns('my_queue_jobs')

    New tables created with createJobsTable() already include these columns.

  • Deduplication is supported by the Redis and Knex adapters. The Sync adapter still runs every dispatch inline and does not apply deduplication.

  • Deduplication only applies to single-job dispatch. Batch dispatch and scheduled jobs do not support deduplication.

  • The user-provided deduplication ID must be 400 characters or fewer, and the final <jobName>::<id> key must be 510 characters or fewer.

  • Custom adapter implementations must implement the new renewJobs(queue, jobIds) adapter method.

  • Existing Redis jobs remain compatible after upgrading. Jobs that do not yet have the new Redis metadata are read from their existing stored JSON.

  • Redis jobs whose payloads were already corrupted before upgrading cannot be automatically repaired, because an already-stored {} cannot be distinguished from an original [].

Commits Since v0.5.2

  • chore: update dependencies (a400cf5)
  • chore: update yarn (48787f7)
  • feat: renew in-flight job timestamps via a worker heartbeat (#17) (177ebd5)
  • refactor(redis): move lua scripts out of adapter (4a576a3)
  • docs: add changelog for job deduplication (500935a)
  • fix(redis): preserve opaque job payloads (b14e488)
  • chore: remove old changelog (cb40c90)
  • feat: add .dedup() method for job deduplication (#12) (52c83be)

Explicit job loading and better observability

11 May 19:53

Choose a tag to compare

What's Changed

Added

  • Added messaging.job.queue_time_ms to OpenTelemetry consumer spans, making it easier to measure how long a job waited in the queue before execution.
  • Added support for using fake = QueueManager.fake() so tests can automatically restore real adapters when the fake goes out of scope.
  • Added QueueManager.loadJobs() and autoLoadJobs to support framework integrations that need to defer job discovery until a command lifecycle is ready.

Fixed

  • Improved Redis schedule claiming by moving due-schedule iteration into a bounded server-side Lua script, reducing round-trips when many schedules exist.
  • Fixed the release configuration by removing the unsupported GitHub web option.

Changed

  • OpenTelemetry queue time now uses job.createdAt instead of injecting internal timestamp keys into trace context.
  • Tests now run through Yarn in CI.

Documentation

  • Documented public queue configuration types so users get better guidance directly from TypeScript autocomplete.

Commits

  • 83db97d feat(otel): add queue_time_ms span attribute
  • 5d69498 feat: using test helper (#11)
  • 69172be fix(redis): bound schedule claim round-trips
  • ba545d8 feat: expose explicit job loading
  • 12f8566 docs: document public queue config types

Full Changelog: v0.5.1...v0.5.2

Experimental OpenTelemetry Instrumentation

23 Mar 13:59

Choose a tag to compare

Features

OpenTelemetry Instrumentation (experimental)

@boringnode/queue now ships with built-in OpenTelemetry instrumentation that creates PRODUCER spans for job dispatch and CONSUMER spans for job execution, following OTel messaging semantic conventions.

import { QueueInstrumentation } from '@boringnode/queue/otel'
import * as boringqueue from '@boringnode/queue'

const instrumentation = new QueueInstrumentation({
  messagingSystem: 'boringqueue',
  executionSpanLinkMode: 'link', // or 'parent'
})

instrumentation.enable()
instrumentation.manuallyRegister(boringqueue)

Warning

This is experimental — the API may change in future releases.

Scheduler: custom payload with manual trigger

Schedule.trigger() now accepts an optional payload, allowing you to pass custom data when manually triggering a scheduled job.

What's Changed

  • feat(scheduler): allow custom payload with manual trigger by @kerwanp in #7
  • feat: add experimental otel instrumentation by @Julien-R44 in #8

New Contributors

Full Changelog: v0.5.0...v0.5.1

Queue Runtime and Lifecycle Fixes

08 Mar 14:10
v0.5.0
bc8dc23

Choose a tag to compare

Breaking Change

QueueManager.getMergedRetryConfig(), QueueManager.getMergedJobOptions(), and QueueManager.getWorkerTimeout() have been removed from the public API.

Effective runtime configuration is now resolved through QueueManager.getConfigResolver(). The merge precedence is unchanged.

Worker.stop() also no longer destroys the underlying adapter instance.

Stopping a worker now only:

  • stops pulling new jobs
  • waits for in-flight jobs to finish
  • removes shutdown handlers

Adapter cleanup is now owned by QueueManager.destroy().

Bug Fixes

The sync adapter now uses the same retry and failure decision logic as the worker-backed runtime.

This fixes two issues:

  • jobs executed through dispatch().run() with the sync adapter now honor retries and call failed() after the final failure
  • delayed sync jobs no longer leak execution errors as unhandled promise rejections

If a delayed sync job fails outside of a caller-controlled promise chain, the error is now logged through the configured queue logger instead of surfacing as an unhandled rejection.

Job.dispatch(), Job.dispatchMany(), and Job.schedule() now work with job subclasses that use typed constructor dependency injection.**

This fixes the TypeScript incompatibility that occurred with patterns such as AdonisJS @inject(), where the job constructor exposes concrete dependency types instead of unknown[].

QueueManager.init() now cleans up the current runtime state before applying a new configuration.

This fixes two reinitialization issues:

  • cached adapter instances are destroyed before the manager swaps to the new config
  • fake state is reset during reinitialization, so fake() and restore() cannot bring back an older configuration snapshot

These reinitialization fixes mainly affect long-lived processes and test environments that call QueueManager.init() multiple times in the same process.

Migration Guide

Before:

// Resolving Configuration
const retry = QueueManager.getMergedRetryConfig('emails', { maxRetries: 1 })
const options = QueueManager.getMergedJobOptions('emails', {
  removeOnFail: false,
})

// Stopping the worker
await worker.stop()

After:

// Resolving Configuration
const resolver = QueueManager.getConfigResolver()

const retry = resolver.resolveRetryConfig('emails', { maxRetries: 1 })
const options = resolver.resolveJobOptions('emails', {
  removeOnFail: false,
})
const workerTimeout = resolver.getWorkerTimeout()


// Stopping the worker
await worker.stop()
await QueueManager.destroy()

QueueManager.init() must still be called before accessing the resolver.

Hardening: schedule consistency, Redis atomicity/perf, and timeout correctness

03 Mar 19:30
v0.4.1
1169dd2

Choose a tag to compare

Highlights

  • Improved schedule upsert consistency across adapters.
  • Fixed worker timeout edge cases and timeout listener cleanup.
  • Optimized Redis schedule listing and made schedule deletion atomic.
  • Expanded integration test coverage for Redis and Knex behavior.
  • Reduced internal any usage and tightened TypeScript types.

What’s Changed

  • Fixed Redis schedule upsert to clear stale scheduling fields.
  • Renamed schedule creation flow to upsertSchedule, kept createSchedule as deprecated wrapper.
  • Fixed schedule manual trigger queue routing and queue manager re-init behavior via follow-up fixes.
  • Fixed worker timeout listener cleanup to prevent retained abort listeners.
  • Fixed timeout: 0 handling so it behaves as an immediate timeout.
  • Preserved runCount on schedule upsert for adapter parity.
  • Moved queue-isolation assertions from fake-adapter-only tests to shared adapter contract tests.
  • Optimized Redis listSchedules with pipelining (removes N+1 request pattern).
  • Made Redis deleteSchedule atomic using MULTI/EXEC.
  • Added Redis integration test helper for write-stream spying.
  • Added Knex query-spy helper and query-count tests for listSchedules and deleteSchedule (SQLite + PostgreSQL).
  • Refactored internal typing to reduce any usage and improve dispatcher/worker type safety.
  • Added repository GitHub link in package.json.
  • Dependency update and general maintenance changes.

QueueSchemaService for Knex Adapter

27 Jan 07:15
v0.4.0
8110dc2

Choose a tag to compare

Breaking Change

The Knex adapter no longer automatically creates database tables on first use. You must now create the tables explicitly using the new QueueSchemaService or your own migration system.

New Feature: QueueSchemaService

A new QueueSchemaService class is now exported from the main package, providing methods to create and drop the queue tables in a controlled manner.

Methods

  • createJobsTable(tableName?, extend?) - Creates the jobs table with the default schema
  • createSchedulesTable(tableName?, extend?) - Creates the schedules table with the default schema
  • dropJobsTable(tableName?) - Drops the jobs table if it exists
  • dropSchedulesTable(tableName?) - Drops the schedules table if it exists

Usage

import { QueueSchemaService } from '@boringnode/queue'
import Knex from 'knex'

const connection = Knex({ client: 'pg', connection: '...' })
const schemaService = new QueueSchemaService(connection)

// Create tables with default names
await schemaService.createJobsTable()
await schemaService.createSchedulesTable()

// Or with custom table names
await schemaService.createJobsTable('my_jobs')
await schemaService.createSchedulesTable('my_schedules')

// Extend with custom columns
await schemaService.createJobsTable('queue_jobs', (table) => {
  table.string('tenant_id', 255).nullable()
})

AdonisJS Migration Example

import { BaseSchema } from '@adonisjs/lucid/schema'
import { QueueSchemaService } from '@boringnode/queue'

export default class extends BaseSchema {
  async up() {
    const schemaService = new QueueSchemaService(this.db.connection().getWriteClient())

    await schemaService.createJobsTable()
    await schemaService.createSchedulesTable()
  }

  async down() {
    const schemaService = new QueueSchemaService(this.db.connection().getWriteClient())

    await schemaService.dropSchedulesTable()
    await schemaService.dropJobsTable()
  }
}

Migration Guide

If you were relying on automatic table creation, you need to:

  1. Create a migration that uses QueueSchemaService to create the tables
  2. Run the migration before starting your application

This change gives you full control over when and how the tables are created, and allows you to extend the schema with custom columns.

Fixed concurrent job processing

23 Jan 07:21
v0.3.4
9646272

Choose a tag to compare

Bug Fixes

Previously, when jobs were dispatched with delays between them while the worker was already processing, newly enqueued jobs would not be picked up until a currently running job completed, even when concurrency slots were available.

For example, with concurrency: 5, if Job 0 was dispatched and started processing (taking 10 seconds), Jobs 1, 2, and 3 dispatched shortly after would wait for Job 0 to finish before being picked up, effectively running sequentially instead of concurrently.

The worker now periodically checks for new jobs when it has available capacity, ensuring that newly enqueued jobs are picked up promptly and run concurrently as expected.

Related #4

Error cause propagation

20 Jan 20:14
v0.3.3
04dbac8

Choose a tag to compare

New Feature

When rethrowing an error, the original error is now forwarded using the standard cause option.
This ensures the full error chain is preserved, making debugging and logging significantly easier.

export default class SendEmailJob extends Job<SendEmailPayload> {
  static options: JobOptions = {
    queue: 'email',
  }

  async execute(): Promise<void> {
    console.log(`Sending email to: ${this.payload.to}`)
  }
  
  failed(e) {
    console.log(e.cause)
  }
}

Fake adapter support and bug fixes

20 Jan 18:27
v0.3.2
6402da2

Choose a tag to compare

New Features

QueueManager.fake / QueueManager.restore

You can replace all adapters with the fake adapter for test assertions, then restore the original configuration.

import { QueueManager } from '@boringnode/queue'
import { redis } from '@boringnode/queue/drivers/redis_adapter'

await QueueManager.init({
  default: 'redis',
  adapters: {
    redis: redis({ host: 'localhost' }),
  },
})

const fake = QueueManager.fake()

await SendEmailJob.dispatch({ to: 'user@example.com' })

fake.assertPushed(SendEmailJob, {
  queue: 'default',
  payload: { to: 'user@example.com' },
})

QueueManager.restore()

Bug Fixes

Respect maxRetries: 0 in job retry configuration

Fixed a bug where maxRetries: 0 configuration was not properly respected, allowing jobs to retry when they should not. (Closes #3)

All types are imported from /types subpath

15 Jan 17:39
v0.3.1
59556a0

Choose a tag to compare

Breaking Change (patch due to 0.3.0 being released < 10 minutes ago)

Types are no longer exported from the main entry point. Import them from the /types subpath instead:

- import type { AdapterFactory } from '@boringnode/queue'
+ import type { AdapterFactory } from '@boringnode/queue/types'

Also added missing type exports: AdapterFactory, DispatchManyResult, JobRecord, JobRetention, JobStatus.