BullMQ in Production — Priority Queues, Rate Limiting, and Dead Letter Handling
By Sanjeev Sharma (@webcoderspeed1)
Introduction
Background job queues are the backbone of resilient Node.js applications. BullMQ, built on Redis, provides sophisticated job management: priority ordering, automatic retries, rate limiting, and dead letter queues. Understanding these patterns prevents cascading failures and ensures your background work executes reliably even under extreme load.
- Queue Architecture and Worker Basics
- Priority Queues
- Built-in Rate Limiting
- Job Deduplication
- Delayed Jobs for Scheduled Work
- Dead Letter Queues and Error Handling
- Flow Producer for Job Dependencies
- Graceful Shutdown
- Checklist
- Conclusion
Queue Architecture and Worker Basics
BullMQ separates concerns: producers add jobs to a queue, workers consume and process them, with Redis as the backing store.
import { Queue, Worker, QueueEvents } from 'bullmq';
import Redis from 'ioredis';
// Shared Redis connection
const redis = new Redis({
  host: 'localhost',
  port: 6379,
  maxRetriesPerRequest: null, // Required by BullMQ for its blocking connections
  retryStrategy: times => Math.min(times * 50, 2000),
});
interface JobData {
userId: string;
email: string;
templateId: string;
}
interface JobResult {
messageId: string;
sentAt: Date;
}
// Producer: Create queue and add jobs
const emailQueue = new Queue<JobData, JobResult>('email', { connection: redis });
async function sendWelcomeEmail(userId: string, email: string) {
await emailQueue.add(
'send-welcome',
{ userId, email, templateId: 'welcome-v1' },
{
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: true,
}
);
}
// Consumer: Create worker to process jobs
const emailWorker = new Worker<JobData, JobResult>(
'email',
async job => {
console.log(`Processing job ${job.id}: ${job.data.email}`);
const result = await sendEmail({
to: job.data.email,
templateId: job.data.templateId,
variables: { userId: job.data.userId },
});
return { messageId: result.id, sentAt: new Date() };
},
{ connection: redis, concurrency: 10 }
);
emailWorker.on('completed', job => {
console.log(`Job ${job.id} completed:`, job.returnvalue);
});
emailWorker.on('failed', (job, err) => {
console.error(`Job ${job.id} failed:`, err.message);
});
// Monitor job events
const queueEvents = new QueueEvents('email', { connection: redis });
queueEvents.on('completed', ({ jobId }) => {
console.log(`Job ${jobId} completed`);
});
queueEvents.on('failed', ({ jobId, failedReason }) => {
console.error(`Job ${jobId} failed: ${failedReason}`);
});
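Beyond per-job events, you'll want aggregate visibility in production. A minimal sketch using the queue's built-in counters; reportQueueDepth is a helper name invented here:

```typescript
// Hypothetical helper: poll aggregate counters on the email queue
async function reportQueueDepth() {
  const counts = await emailQueue.getJobCounts(
    'waiting',
    'active',
    'delayed',
    'failed'
  );
  console.log('email queue depth:', counts);
  return counts;
}
```

Wire this into a periodic task or a health endpoint, and alert when the waiting count grows faster than workers drain it.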
Priority Queues
Higher-priority jobs execute before lower-priority jobs. Note that in BullMQ a lower priority number means higher priority: 1 is the highest, and jobs added without any priority are processed before all prioritized jobs:
interface NotificationJob {
userId: string;
message: string;
type: 'notification' | 'broadcast';
}
const notificationQueue = new Queue<NotificationJob>('notifications', { connection: redis });
async function sendNotification(
userId: string,
message: string,
isPriority: boolean = false
) {
await notificationQueue.add(
'send',
{ userId, message, type: 'notification' },
{
priority: isPriority ? 1 : 10, // Lower number = higher priority (1 is highest)
attempts: 5,
backoff: { type: 'exponential', delay: 1000 },
}
);
}
async function broadcastNotification(message: string) {
const users = await db.user.findMany({ select: { id: true } });
// Add one job per user with the same priority
const jobs = users.map(user => ({
  name: 'send',
  data: { userId: user.id, message, type: 'broadcast' as const },
  opts: {
    priority: 2, // High priority (low number runs sooner)
    attempts: 3,
  },
}));
await notificationQueue.addBulk(jobs);
}
// Worker processes higher-priority jobs first
const notificationWorker = new Worker<NotificationJob>(
'notifications',
async job => {
await sendPushNotification(job.data.userId, job.data.message);
return { sent: true, timestamp: Date.now() };
},
{ connection: redis, concurrency: 20 }
);
Built-in Rate Limiting
Rate limiting in BullMQ is configured on the worker through the limiter option, capping how many jobs the queue's workers process per time window:
interface PaymentJob {
userId: string;
amount: number;
idempotencyKey: string;
}
const paymentQueue = new Queue<PaymentJob>('payments', { connection: redis });
async function queuePayment(userId: string, amount: number) {
  const idempotencyKey = crypto.randomUUID();
  await paymentQueue.add(
    'process',
    { userId, amount, idempotencyKey },
    {
      attempts: 5,
      backoff: { type: 'exponential', delay: 5000 },
    }
  );
}
const paymentWorker = new Worker<PaymentJob>(
  'payments',
  async job => {
    // Idempotent processing ensures retried jobs don't double-charge
    return processPayment(
      job.data.userId,
      job.data.amount,
      job.data.idempotencyKey
    );
  },
  {
    connection: redis,
    concurrency: 5,
    // Rate limit: max 100 jobs per 60 seconds across this queue's
    // workers, preventing overwhelming the payment processor
    limiter: { max: 100, duration: 60000 },
  }
);
// Per-group (e.g. per-user) rate limiting is a BullMQ Pro feature.
// In open-source BullMQ you can rate limit manually from the processor:
const userLimitedWorker = new Worker<PaymentJob>(
  'payments',
  async job => {
    // isUserOverLimit: your own counter, e.g. a Redis INCR with a TTL
    if (await isUserOverLimit(job.data.userId)) {
      // Pause fetching for a minute, then re-queue this job without
      // counting the pause as a failed attempt
      await userLimitedWorker.rateLimit(60000);
      throw Worker.RateLimitError();
    }
    return processPayment(job.data.userId, job.data.amount, job.data.idempotencyKey);
  },
  { connection: redis, concurrency: 5 }
);
Job Deduplication
Prevent duplicate processing with deterministic job IDs:
interface DataSyncJob {
tenantId: string;
sourceId: string;
}
const syncQueue = new Queue<DataSyncJob>('data-sync', { connection: redis });
async function syncData(tenantId: string, sourceId: string) {
const jobId = `sync-${tenantId}-${sourceId}`;
// Passing a custom jobId deduplicates: while a job with this ID
// exists in the queue (waiting, delayed, or active), adding it
// again is a no-op
await syncQueue.add(
  'sync',
  { tenantId, sourceId },
  {
    jobId,
    // Remove on completion so the next sync request can queue again
    removeOnComplete: true,
  }
);
}
const syncWorker = new Worker<DataSyncJob>(
'data-sync',
async job => {
const startTime = Date.now();
const result = await syncDataSource(
job.data.tenantId,
job.data.sourceId
);
return {
status: 'success',
recordsProcessed: result.count,
durationMs: Date.now() - startTime,
};
},
{ connection: redis, concurrency: 10 }
);
Delayed Jobs for Scheduled Work
Schedule jobs to run at a specific time:
interface ReminderJob {
userId: string;
message: string;
}
const reminderQueue = new Queue<ReminderJob>('reminders', { connection: redis });
async function scheduleReminder(userId: string, message: string, delayMs: number) {
await reminderQueue.add(
'send-reminder',
{ userId, message },
{
delay: delayMs, // Milliseconds from now
attempts: 3,
backoff: { type: 'fixed', delay: 5000 },
}
);
}
// Schedule for specific timestamp
async function scheduleReminderAt(
userId: string,
message: string,
scheduledTime: Date
) {
const delayMs = scheduledTime.getTime() - Date.now();
if (delayMs < 0) {
throw new Error('Cannot schedule in the past');
}
await reminderQueue.add(
'send-reminder',
{ userId, message },
{
delay: delayMs,
removeOnComplete: true,
attempts: 5,
backoff: { type: 'exponential', delay: 2000 },
}
);
}
const reminderWorker = new Worker<ReminderJob>(
'reminders',
async job => {
await sendReminder(job.data.userId, job.data.message);
return { delivered: true };
},
{ connection: redis, concurrency: 50 }
);
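Delayed jobs fire once. For recurring work, BullMQ also supports repeatable jobs driven by a cron pattern; a sketch reusing reminderQueue (the schedule and message are illustrative):

```typescript
// Queue a reminder every day at 09:00 UTC; BullMQ re-adds the job
// on each tick according to the cron pattern
async function scheduleDailyDigest(userId: string) {
  await reminderQueue.add(
    'send-reminder',
    { userId, message: 'Your daily digest is ready' },
    {
      repeat: { pattern: '0 9 * * *', tz: 'UTC' },
    }
  );
}
```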
Dead Letter Queues and Error Handling
Jobs that fail repeatedly move to a dead letter queue:
interface ExportJob {
reportId: string;
format: 'csv' | 'pdf';
userId: string;
}
const exportQueue = new Queue<ExportJob>('exports', { connection: redis });
const dlq = new Queue('exports-dlq', { connection: redis });
async function queueExport(reportId: string, format: 'csv' | 'pdf', userId: string) {
await exportQueue.add(
'generate',
{ reportId, format, userId },
{
  attempts: 5,
  backoff: { type: 'exponential', delay: 10000 },
  // Keep failed jobs in Redis so the 'failed' handler below can
  // copy them into the DLQ; BullMQ has no automatic DLQ move
  removeOnFail: false,
}
);
}
const exportWorker = new Worker<ExportJob>(
'exports',
async job => {
try {
const report = await generateReport(
job.data.reportId,
job.data.format
);
// Notify user
await notifyUser(job.data.userId, {
message: `Your ${job.data.format.toUpperCase()} report is ready`,
downloadUrl: report.url,
});
return { reportId: job.data.reportId, url: report.url };
} catch (error) {
// Log for monitoring
console.error(`Export job ${job.id} failed:`, error);
throw error; // Trigger retry logic
}
},
{
  connection: redis,
  concurrency: 5,
  // A job that stalls (its worker died mid-processing) is retried;
  // after stalling twice it is marked failed
  maxStalledCount: 2,
}
);
exportWorker.on('failed', async (job, err) => {
if (job && job.attemptsMade >= job.opts.attempts!) {
// Move to DLQ for manual review
const dlqJob = await dlq.add(
'review',
{
...job.data,
failureReason: err.message,
failureCount: job.attemptsMade,
originalJobId: job.id,
},
{ removeOnComplete: true }
);
console.error(`Job ${job.id} moved to DLQ as ${dlqJob.id}`);
// Alert operations team
await alertOpsTeam({
level: 'critical',
message: `Export job ${job.id} failed permanently`,
details: { reportId: job.data.reportId, error: err.message },
});
}
});
// Inspect DLQ items for manual review. Don't attach a worker that
// auto-completes them, or items would be marked done (and removed)
// without anyone looking at them
async function listDlqItems() {
  const jobs = await dlq.getJobs(['waiting', 'delayed']);
  for (const job of jobs) {
    console.log(`DLQ item ${job.id} awaiting review:`, job.data);
  }
  return jobs;
}
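Once the underlying issue is fixed, DLQ items can be replayed by re-adding the original payload to the export queue. A sketch, assuming the DLQ payload shape created in the failed handler above:

```typescript
// Hypothetical replay helper: requeue a reviewed DLQ item
async function replayDlqItem(dlqJobId: string) {
  const job = await dlq.getJob(dlqJobId);
  if (!job) throw new Error(`DLQ job ${dlqJobId} not found`);
  // Strip the failure metadata added by the 'failed' handler
  const { failureReason, failureCount, originalJobId, ...originalData } = job.data;
  // Fresh job, fresh attempts counter
  await exportQueue.add('generate', originalData, { attempts: 5 });
  await job.remove(); // Drop from the DLQ once requeued
}
```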
Flow Producer for Job Dependencies
Chain dependent jobs with a flow. All children of a node must complete before the node itself runs, and every node must name the queue it belongs to:
import { FlowProducer } from 'bullmq';
interface DataProcessingFlow {
datasetId: string;
userId: string;
}
const flow = new FlowProducer({ connection: redis });
async function processDataset(datasetId: string, userId: string) {
  // Children complete before their parent runs, so nesting expresses
  // the pipeline validate -> transform -> aggregate -> generate-report
  await flow.add({
    name: 'generate-report',
    queueName: 'generate-report',
    data: { datasetId, userId, format: 'pdf' },
    children: [
      {
        name: 'aggregate',
        queueName: 'aggregate',
        data: { datasetId },
        children: [
          {
            name: 'transform',
            queueName: 'transform',
            data: { datasetId },
            children: [
              { name: 'validate', queueName: 'validate', data: { datasetId } },
            ],
          },
        ],
      },
    ],
  });
}
// Each stage defined in its own worker
const validateWorker = new Worker('validate', async job => {
const { datasetId } = job.data;
const rows = await countRows(datasetId);
return { rowCount: rows, validAt: new Date() };
}, { connection: redis });
const transformWorker = new Worker('transform', async job => {
const { datasetId } = job.data;
await transformDataset(datasetId);
return { transformedAt: new Date() };
}, { connection: redis });
const aggregateWorker = new Worker('aggregate', async job => {
const { datasetId } = job.data;
await aggregateDataset(datasetId);
return { aggregatedAt: new Date() };
}, { connection: redis });
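A parent job in a flow can read its children's return values once they complete. A sketch of a worker consuming them on the generate-report queue; buildPdfReport is a hypothetical helper:

```typescript
const reportWorker = new Worker('generate-report', async job => {
  // Map of child job keys to the values their processors returned
  const childResults = await job.getChildrenValues();
  // buildPdfReport is assumed to exist elsewhere in the codebase
  const report = await buildPdfReport(job.data.datasetId, childResults);
  return { url: report.url };
}, { connection: redis });
```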
Graceful Shutdown
Properly shut down workers without losing jobs:
class QueueManager {
private workers: Worker[] = [];
private queues: Queue[] = [];
registerWorker(worker: Worker) {
this.workers.push(worker);
}
registerQueue(queue: Queue) {
this.queues.push(queue);
}
  async gracefulShutdown(timeoutMs: number = 30000) {
    console.log('Starting graceful shutdown...');
    // worker.close() stops fetching new jobs and waits for in-flight
    // jobs to finish before resolving
    let timer: NodeJS.Timeout | undefined;
    const drained = Promise.all(this.workers.map(worker => worker.close()));
    const timedOut = new Promise<'timeout'>(resolve => {
      timer = setTimeout(() => resolve('timeout'), timeoutMs);
    });
    const result = await Promise.race([drained, timedOut]);
    clearTimeout(timer);
    if (result === 'timeout') {
      console.warn('Shutdown timeout reached, force-closing workers');
      // close(true) abandons in-flight jobs; they are recovered as
      // stalled jobs when workers restart
      await Promise.all(this.workers.map(worker => worker.close(true)));
    }
    // Close queue connections
    await Promise.all(this.queues.map(queue => queue.close()));
    console.log('Graceful shutdown complete');
  }
}
// Usage
const manager = new QueueManager();
manager.registerWorker(emailWorker);
manager.registerWorker(paymentWorker);
manager.registerQueue(emailQueue);
manager.registerQueue(paymentQueue);
process.on('SIGTERM', async () => {
await manager.gracefulShutdown(30000);
process.exit(0);
});
Checklist
- Use Redis persistence (RDB or AOF) to prevent job loss
- Configure exponential backoff with jitter for retries
- Set appropriate concurrency limits per worker
- Implement idempotent job processors (safe to retry)
- Use rate limiting to protect downstream services
- Move failed jobs to DLQ after max attempts
- Implement graceful shutdown (stop accepting, drain in-flight)
- Monitor queue depth and worker latency
- Use job priorities for time-sensitive operations
- Test job processing with chaos (simulate failures)
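On the backoff-with-jitter item: BullMQ accepts a custom backoff strategy on the worker, with jobs opting in via backoff: { type: 'custom' } (recent versions also expose a built-in jitter field on the standard backoff options). A minimal full-jitter sketch; the base and cap values are arbitrary:

```typescript
// Full jitter: exponential growth capped at maxDelayMs, then a random
// delay in [0, cap) so many failing jobs don't retry in lockstep
function jitteredBackoff(
  attemptsMade: number,
  baseDelayMs = 1000,
  maxDelayMs = 60000
): number {
  const cap = Math.min(baseDelayMs * 2 ** (attemptsMade - 1), maxDelayMs);
  return Math.floor(Math.random() * cap);
}

// Wiring sketch (worker-side; jobs must use backoff: { type: 'custom' }):
// new Worker('email', processor, {
//   connection: redis,
//   settings: { backoffStrategy: attemptsMade => jitteredBackoff(attemptsMade) },
// });
```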
Conclusion
BullMQ transforms background job processing from a fragile afterthought into a production-grade system. Priority queues ensure urgent work executes first, rate limiting protects downstream services, and dead letter queues surface issues that need human intervention. Combined with proper error handling and graceful shutdown, you achieve the reliability expected of modern production systems.