Reliable Background Jobs — Handling Timeouts, Poison Pills, and Job Duplication
- Authors
- Sanjeev Sharma (@webcoderspeed1)
Introduction
Background jobs are where reliability expectations meet reality. A job that sometimes crashes, duplicates work, or hangs forever becomes a production incident. Proper timeout enforcement, poison pill detection, exactly-once semantics, and monitoring transform background work from a liability into a foundation. Many job systems fail silently until the damage surfaces as corrupted data.
- Job Timeout Enforcement
- Exponential Backoff with Jitter
- Poison Pill Detection
- Job Deduplication with Deterministic IDs
- Exactly-Once Execution with Database Locks
- Job Heartbeat for Long-Running Tasks
- Fan-Out Pattern (One Job Spawns Many)
- Job Observability (Metrics)
- Checklist
- Conclusion
Job Timeout Enforcement
Ensure every job terminates, even when the work it wraps hangs:
import { Queue, Worker } from 'bullmq';
// AbortController and AbortSignal are globals in Node.js 15+; no import needed
interface JobData {
userId: string;
reportId: string;
}
// Timeout enforcement with AbortSignal
class TimeoutJobWorker {
private worker: Worker;
constructor() {
this.worker = new Worker<JobData>(
'reports',
async (job) => {
// Create an abort signal for the job timeout.
// BullMQ (unlike Bull v3) has no built-in `timeout` job option,
// so we carry the timeout in the job data and enforce it here.
const timeoutMs = (job.data as { timeoutMs?: number }).timeoutMs ?? 30000; // 30 second default
const controller = new AbortController();
const timeoutHandle = setTimeout(() => controller.abort(), timeoutMs);
try {
const result = await this.processReport(job.data, controller.signal);
return result;
} finally {
clearTimeout(timeoutHandle);
}
},
{ connection: redis }
);
}
private async processReport(data: JobData, signal: AbortSignal): Promise<any> {
// Check signal before expensive operations
if (signal.aborted) throw new Error('Job aborted');
const report = await db.report.findUnique({ where: { id: data.reportId } });
// Long operation with cancellation support
const results = await this.generateReport(data.reportId, signal);
if (signal.aborted) throw new Error('Job aborted');
await db.report.update({
where: { id: data.reportId },
data: { results, completedAt: new Date(), status: 'completed' },
});
return { reportId: data.reportId, success: true };
}
private async generateReport(reportId: string, signal: AbortSignal): Promise<any[]> {
const batchSize = 1000;
const results: any[] = [];
// Process in batches with cancellation checks
for (let offset = 0; offset < 100000; offset += batchSize) {
if (signal.aborted) throw new Error('Job aborted during processing');
const batch = await db.data.findMany({
skip: offset,
take: batchSize,
});
results.push(...batch);
// Yield to event loop
await new Promise(resolve => setImmediate(resolve));
}
return results;
}
}
// Timeout configuration per job.
// Note: BullMQ removed Bull v3's `timeout` job option, so the timeout
// travels in the job data and the worker enforces it via AbortSignal.
const reportQueue = new Queue('reports', { connection: redis });
async function queueReportGeneration(userId: string, reportId: string, timeoutMs: number = 60000) {
await reportQueue.add('generate', { userId, reportId, timeoutMs });
}
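When the work being wrapped cannot check an AbortSignal (a third-party SDK call, say), racing it against a timer at least bounds how long the worker waits. A minimal sketch (`withTimeout` is a hypothetical helper, not a BullMQ API); note the underlying promise keeps running after the race is lost, so this limits waiting, not work:

```typescript
// Resolve with the task's value, or reject once ms elapses, whichever
// comes first. The timer is always cleared so it can't keep the process alive.
function withTimeout<T>(task: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms}ms`)), ms);
  });
  return Promise.race([task, timeout]).finally(() => {
    if (timer) clearTimeout(timer);
  });
}
```

Where possible, pair this with an AbortSignal so the losing task can actually stop instead of burning resources in the background.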
Exponential Backoff with Jitter
Retry failed jobs without overwhelming the system:
import { Queue } from 'bullmq';
interface BackoffConfig {
type: 'exponential' | 'fixed';
delay: number;
maxDelay?: number;
multiplier?: number;
}
function calculateBackoffDelay(
attemptNumber: number,
config: BackoffConfig
): number {
let delay = config.delay;
if (config.type === 'exponential') {
delay = config.delay * Math.pow(config.multiplier || 2, attemptNumber - 1);
// Cap maximum delay
if (config.maxDelay) {
delay = Math.min(delay, config.maxDelay);
}
}
// Add jitter to prevent thundering herd
// If all failed jobs retry at same time, causes spike
const jitter = delay * 0.1 * Math.random();
return delay + jitter;
}
// Example backoff schedule
const config: BackoffConfig = {
type: 'exponential',
delay: 1000, // Start at 1 second
maxDelay: 60000, // Cap at 1 minute
multiplier: 2,
};
console.log('Backoff schedule:');
for (let attempt = 1; attempt <= 10; attempt++) {
const delay = calculateBackoffDelay(attempt, config);
console.log(`Attempt ${attempt}: ${(delay / 1000).toFixed(2)}s`);
}
// Example output (jitter is random, so exact values vary; shown with ~5% jitter):
// Attempt 1: ~1.05s
// Attempt 2: ~2.10s
// Attempt 3: ~4.20s
// Attempt 4: ~8.40s
// Attempt 5: ~16.80s
// Attempt 6: ~33.60s
// Attempt 7: ~63s (capped at 60s, plus jitter)
// Attempt 8: ~63s (capped at 60s, plus jitter)
const paymentQueue = new Queue('payments', { connection: redis });
async function queuePayment(userId: string, amount: number) {
await paymentQueue.add(
'process',
{ userId, amount },
{
attempts: 10,
backoff: {
type: 'exponential', // BullMQ's built-in exponential: delay * 2^(attemptsMade)
delay: 2000,
// Note: BullMQ's backoff options are type and delay only; for a custom
// curve (cap, jitter, multiplier), register a backoffStrategy in the
// Worker's settings instead
},
removeOnComplete: true,
removeOnFail: false, // Keep failed jobs for inspection
}
);
}
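The 10% additive jitter above keeps retries from landing on the same tick; an alternative worth knowing is "full jitter" (popularized by the AWS Architecture Blog), which draws the whole delay uniformly from zero up to the exponential ceiling. A sketch as a pure function (`rand` is injectable only so the math is testable):

```typescript
// Full jitter: delay ~ Uniform(0, min(cap, base * 2^(attempt-1))).
// Spreads retries across the whole window instead of clustering near the ceiling.
function fullJitterDelay(
  attempt: number,
  baseMs: number,
  capMs: number,
  rand: () => number = Math.random
): number {
  const ceiling = Math.min(capMs, baseMs * Math.pow(2, attempt - 1));
  return rand() * ceiling;
}
```

The trade-off: individual retries can come back very quickly (delay near zero), but across many clients the load is spread far more evenly than with a fixed-fraction jitter.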
Poison Pill Detection
Identify and quarantine jobs that fail consistently:
interface PoisonPillConfig {
maxAttempts: number;
timeWindow: number; // milliseconds
detectionThreshold: number; // % of attempts failing in the time window
}
class PoisonPillDetector {
private failureHistory = new Map<string, number[]>(); // job type -> failure timestamps
private attemptHistory = new Map<string, number[]>(); // job type -> all attempt timestamps
private config: PoisonPillConfig = {
maxAttempts: 5,
timeWindow: 60000, // 1 minute
detectionThreshold: 80, // 80% failure rate
};
recordSuccess(jobType: string, timestamp: number = Date.now()): void {
this.prune(this.attemptHistory, jobType, timestamp).push(timestamp);
this.prune(this.failureHistory, jobType, timestamp);
}
recordFailure(jobType: string, timestamp: number = Date.now()): void {
this.prune(this.attemptHistory, jobType, timestamp).push(timestamp);
const failures = this.prune(this.failureHistory, jobType, timestamp);
failures.push(timestamp);
// Check for the poison pill pattern: enough recent failures AND a
// failure rate above the threshold. A rate needs successes in the
// denominator too, which is why recordSuccess exists.
const attempts = this.attemptHistory.get(jobType)!;
if (failures.length >= this.config.maxAttempts) {
const failureRate = (failures.length / attempts.length) * 100;
if (failureRate >= this.config.detectionThreshold) {
this.emitPoisonPillAlert(jobType, failureRate);
}
}
}
// Drop entries older than the time window and return the recent ones
private prune(map: Map<string, number[]>, jobType: string, now: number): number[] {
const cutoff = now - this.config.timeWindow;
const recent = (map.get(jobType) || []).filter(t => t > cutoff);
map.set(jobType, recent);
return recent;
}
private emitPoisonPillAlert(jobType: string, failureRate: number): void {
console.error(`Poison pill detected for job type "${jobType}": ${failureRate.toFixed(2)}% failure rate`);
// Actions:
// 1. Pause the queue
// 2. Alert operations team
// 3. Move remaining jobs to DLQ
// 4. Investigate root cause
}
}
// BullMQ integration
const poisonPillDetector = new PoisonPillDetector();
const worker = new Worker(
'exports',
async job => {
try {
const result = await processExport(job.data);
poisonPillDetector.recordSuccess('exports');
return result;
} catch (error) {
poisonPillDetector.recordFailure('exports');
throw error;
}
},
{ connection: redis }
);
worker.on('failed', (job, err) => {
if (job && job.attemptsMade >= job.opts.attempts!) {
console.log(`Job ${job.id} permanently failed, moving to DLQ`);
// Move to DLQ handled by BullMQ
}
});
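A practical refinement: poison pills usually announce themselves as the same error repeating. Fingerprinting errors after stripping volatile details lets a detector count failures per root cause rather than per job type. A hypothetical helper (the normalization rules here are illustrative, not exhaustive):

```typescript
import crypto from 'crypto';

// Group failures by root cause: normalize away volatile details
// (hex addresses first, then decimal numbers) and hash what remains.
function errorFingerprint(err: Error): string {
  const normalized = `${err.name}:${err.message}`
    .replace(/0x[0-9a-f]+/gi, 'ADDR')
    .replace(/\d+/g, 'N');
  return crypto.createHash('sha1').update(normalized).digest('hex').slice(0, 12);
}
```

Keying the failure history by fingerprint instead of job type means one bad payload can't mask, or be masked by, unrelated failures in the same queue.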
Job Deduplication with Deterministic IDs
Prevent duplicate processing of the same work:
import crypto from 'crypto';
import { Queue } from 'bullmq';
interface EmailJob {
userId: string;
email: string;
templateId: string;
}
function generateDeterministicJobId(data: EmailJob): string {
// Create a content hash so identical payloads get the same ID.
// Note: JSON.stringify is key-order sensitive, so build the payload
// object consistently (or sort its keys) before hashing.
const content = JSON.stringify(data);
const hash = crypto.createHash('sha256').update(content).digest('hex').slice(0, 16);
// No timestamp in the ID: the same payload always maps to the same job
return `email-${hash}`;
}
const emailQueue = new Queue<EmailJob>('emails', { connection: redis });
async function sendEmail(userId: string, email: string, templateId: string) {
const jobId = generateDeterministicJobId({ userId, email, templateId });
// Add job with deduplication: BullMQ ignores an add() whose jobId
// already exists in the queue, so the deterministic jobId alone dedupes.
// Note: once the job completes and is removed, the ID becomes reusable.
const job = await emailQueue.add(
'send',
{ userId, email, templateId },
{
jobId: jobId,
removeOnComplete: true,
attempts: 3,
}
);
return job;
}
// Scenario: Webhook retry sends same data twice
await sendEmail('user-123', 'john@example.com', 'welcome');
await sendEmail('user-123', 'john@example.com', 'welcome'); // Same job ID, skipped
// Scenario: Hourly digest job
// JobId: "digest-2026-03-15" (based on date)
// Run at 9am: generates job
// Run at 9am (retry): same job ID, skipped
// Run at 10am: new day, new job ID, processes
const digestQueue = new Queue('digest', { connection: redis });
async function scheduleDigest() {
const today = new Date().toISOString().split('T')[0]; // "2026-03-15"
const jobId = `digest-${today}`;
await digestQueue.add(
'send',
{ date: today },
{
jobId,
removeOnComplete: true,
}
);
}
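The digest example keys deduplication to a calendar day; the same idea generalizes to arbitrary windows by bucketing the timestamp. A sketch (`bucketedJobId` is a hypothetical helper; the same JSON.stringify key-order caveat applies):

```typescript
import crypto from 'crypto';

// Deterministic ID per (payload, time bucket): the same payload dedupes
// within one bucket and becomes eligible again when the bucket rolls over.
function bucketedJobId(
  prefix: string,
  payload: object,
  bucketMs: number,
  now: number = Date.now()
): string {
  const bucket = Math.floor(now / bucketMs);
  const hash = crypto
    .createHash('sha256')
    .update(JSON.stringify(payload))
    .digest('hex')
    .slice(0, 16);
  return `${prefix}-${bucket}-${hash}`;
}
```

With a one-hour bucket, a webhook that retries for fifty minutes still collapses to a single job, while a genuine re-send the next hour goes through.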
Exactly-Once Execution with Database Locks
Make retries idempotent so a job's effects are applied exactly once, using a lock plus a persisted result:
import { Queue, Worker } from 'bullmq';
// Use database lock for exactly-once semantics
class ExactlyOnceProcessor {
async processWithLock<T>(jobId: string, processor: () => Promise<T>): Promise<T> {
const lockKey = `job-lock:${jobId}`;
const lockValue = `${Date.now()}-${Math.random()}`;
// Try to acquire lock
const lockAcquired = await redis.set(
lockKey,
lockValue,
'PX', 3600000, // 1 hour timeout
'NX' // Only set if doesn't exist
);
if (!lockAcquired) {
// Another process is handling this job
throw new Error(`Job ${jobId} already being processed`);
}
try {
// Check if job already completed
const result = await db.jobResult.findUnique({
where: { jobId },
});
if (result) {
return result.output as T;
}
// Process job
const output = await processor();
// Persist the result as an idempotency marker: a retry that
// re-acquires the lock finds it and returns the stored output
await db.jobResult.upsert({
where: { jobId },
update: { completedAt: new Date(), output },
create: { jobId, output, completedAt: new Date() },
});
return output;
} finally {
// Release the lock only if we still hold it. Note: this GET + DEL
// pair is not atomic; in production run the compare-and-delete as a
// single Lua script so another worker's lock is never deleted
const currentLock = await redis.get(lockKey);
if (currentLock === lockValue) {
await redis.del(lockKey);
}
}
}
}
const processor = new ExactlyOnceProcessor();
const transferQueue = new Queue('transfers', { connection: redis });
const transferWorker = new Worker(
'transfers',
async job => {
return processor.processWithLock(job.id!, async () => {
// This code runs exactly once despite retries
await db.account.update({
where: { id: job.data.fromAccount },
data: { balance: { decrement: job.data.amount } },
});
await db.account.update({
where: { id: job.data.toAccount },
data: { balance: { increment: job.data.amount } },
});
return { fromAccount: job.data.fromAccount, toAccount: job.data.toAccount };
});
},
{ connection: redis }
);
// Retry safety: Job processes exactly once even if retried
await transferQueue.add('transfer', {
fromAccount: 'acc-1',
toAccount: 'acc-2',
amount: 1000,
});
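One caveat worth making explicit: the GET-then-DEL release above can race (our GET sees our value, the lock expires, another worker acquires it, our DEL removes theirs). The standard fix is to run the compare-and-delete on the Redis server as one Lua script. A sketch, with the script's decision modeled as a testable pure function (`shouldRelease` is illustrative; the `eval` call assumes an ioredis client):

```typescript
// Compare-and-delete executed atomically on the Redis server:
// delete the lock only if it still holds our value.
const RELEASE_LOCK_LUA = `
if redis.call("get", KEYS[1]) == ARGV[1] then
  return redis.call("del", KEYS[1])
else
  return 0
end
`;

// The decision the script makes, as a pure function
function shouldRelease(currentValue: string | null, ourValue: string): boolean {
  return currentValue !== null && currentValue === ourValue;
}

// Usage with ioredis (assumed client):
// await redis.eval(RELEASE_LOCK_LUA, 1, lockKey, lockValue);
```

Because the script runs as a single Redis command, no other client can acquire the lock between the comparison and the delete.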
Job Heartbeat for Long-Running Tasks
Emit liveness signals during processing so a monitor can tell a slow job from a stalled one:
import { Job, Queue, Worker } from 'bullmq';
const csvExportQueue = new Queue('csv-export', { connection: redis });
async function exportLargeDataset(job: Job, datasetId: string) {
// BullMQ workers renew the job's lock automatically, so the queue already
// knows the worker process is alive. This application-level heartbeat
// publishes progress so an external monitor can distinguish a slow job
// from one that has silently stalled.
const heartbeatIntervalMs = 5000; // Send a heartbeat every 5 seconds
let processed = 0;
const heartbeatInterval = setInterval(() => {
job.updateProgress(processed).catch(() => {}); // Best-effort liveness signal
console.log(`[${job.id}] Heartbeat: ${processed} rows processed`);
}, heartbeatIntervalMs);
try {
const rows = await db.data.count({ where: { datasetId } });
const batchSize = 10000;
const csvLines: string[] = [];
for (let offset = 0; offset < rows; offset += batchSize) {
const batch = await db.data.findMany({
skip: offset,
take: batchSize,
where: { datasetId },
});
// Process batch
csvLines.push(...batch.map(row => formatCSV(row)));
processed = Math.min(offset + batchSize, rows);
// Yield to event loop
await new Promise(resolve => setImmediate(resolve));
}
return { exported: rows };
} finally {
clearInterval(heartbeatInterval);
}
}
const exportWorker = new Worker(
'csv-export',
async job => {
return exportLargeDataset(job, job.data.datasetId);
},
{ connection: redis, concurrency: 5 }
);
exportWorker.on('progress', (job, progress) => {
console.log(`Job ${job.id} progress: ${progress}`);
});
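On the consuming side, a monitor compares each job's last heartbeat against a staleness cutoff and alerts on, or requeues, anything that has gone quiet. A minimal sketch (the record shape and threshold are assumptions for illustration):

```typescript
interface HeartbeatRecord {
  jobId: string;
  lastBeatMs: number; // epoch millis of the most recent heartbeat
}

// A job whose last heartbeat is older than timeoutMs is presumed stalled
function findStalledJobs(
  records: HeartbeatRecord[],
  timeoutMs: number,
  now: number = Date.now()
): string[] {
  return records.filter(r => now - r.lastBeatMs > timeoutMs).map(r => r.jobId);
}
```

Set the cutoff to several heartbeat intervals, not one, so a single delayed tick under load doesn't trigger a false requeue.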
Fan-Out Pattern (One Job Spawns Many)
Decompose large jobs into parallel subtasks:
import { FlowProducer, Queue, Worker } from 'bullmq';
const flow = new FlowProducer({ connection: redis });
interface DataProcessingFlow {
datasetId: string;
regions: string[];
}
async function processDatasetParallel(datasetId: string, regions: string[]) {
// Parent job coordinates; each node in a flow must name its target queue
const children = regions.map(region => ({
name: 'process-region',
queueName: 'process-region',
data: { datasetId, region },
}));
await flow.add({
name: 'process-dataset',
queueName: 'process-dataset',
data: { datasetId, regions },
children, // Child jobs process in parallel
});
}
const datasetQueue = new Queue('process-dataset', { connection: redis });
const regionQueue = new Queue('process-region', { connection: redis });
// Parent job: processed only after all children complete
const parentWorker = new Worker(
'process-dataset',
async job => {
// BullMQ's flow mechanism holds the parent until every child finishes;
// child results are available via job.getChildrenValues()
return { datasetId: job.data.datasetId, status: 'completed' };
},
{ connection: redis }
);
// Child jobs: process in parallel
const regionWorker = new Worker(
'process-region',
async job => {
const { datasetId, region } = job.data;
// Each region processed independently
const results = await processRegionData(datasetId, region);
return { region, recordsProcessed: results.length };
},
{ connection: redis, concurrency: 10 }
);
// Benefits: 4 regions can process up to 4x faster (parallelization),
// one region's failure doesn't block the others, and retries apply
// per subtask instead of restarting the whole dataset
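Fan-out by region works when the data has a natural partition key; when it doesn't, splitting by size keeps child jobs uniform. A sketch of the chunking step (`chunkPayloads` is a hypothetical helper):

```typescript
// Split work items into child-job payloads of at most chunkSize each,
// preserving order; the last chunk may be smaller.
function chunkPayloads<T>(items: T[], chunkSize: number): T[][] {
  if (chunkSize <= 0) throw new Error('chunkSize must be positive');
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += chunkSize) {
    chunks.push(items.slice(i, i + chunkSize));
  }
  return chunks;
}
```

Each chunk then becomes one child in the flow, so retry, timeout, and progress all apply per chunk.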
Job Observability (Metrics)
Track job health at scale:
interface JobMetrics {
jobType: string;
totalProcessed: number;
totalFailed: number;
averageDuration: number;
p99Duration: number;
errorRate: number;
}
class JobObservability {
private metrics = new Map<string, JobMetrics>();
private durations = new Map<string, number[]>();
recordJobCompletion(jobType: string, durationMs: number, success: boolean): void {
if (!this.metrics.has(jobType)) {
this.metrics.set(jobType, {
jobType,
totalProcessed: 0,
totalFailed: 0,
averageDuration: 0,
p99Duration: 0,
errorRate: 0,
});
this.durations.set(jobType, []);
}
const metrics = this.metrics.get(jobType)!;
const durations = this.durations.get(jobType)!;
metrics.totalProcessed++;
if (!success) {
metrics.totalFailed++;
}
durations.push(durationMs);
// Keep last 1000 measurements
if (durations.length > 1000) {
durations.shift();
}
// Update metrics
metrics.averageDuration = durations.reduce((a, b) => a + b, 0) / durations.length;
metrics.p99Duration = this.percentile(durations, 0.99);
metrics.errorRate = metrics.totalFailed / metrics.totalProcessed;
}
private percentile(arr: number[], p: number): number {
const sorted = [...arr].sort((a, b) => a - b);
const index = Math.ceil(sorted.length * p) - 1;
return sorted[Math.max(0, index)];
}
getMetrics(jobType?: string): JobMetrics[] {
if (jobType) {
return this.metrics.get(jobType) ? [this.metrics.get(jobType)!] : [];
}
return Array.from(this.metrics.values());
}
exportPrometheus(): string {
const lines: string[] = [];
for (const metrics of this.metrics.values()) {
lines.push(`job_total{type="${metrics.jobType}"} ${metrics.totalProcessed}`);
lines.push(`job_failed{type="${metrics.jobType}"} ${metrics.totalFailed}`);
lines.push(`job_duration_ms{type="${metrics.jobType}",quantile="avg"} ${metrics.averageDuration}`);
lines.push(`job_duration_ms{type="${metrics.jobType}",quantile="p99"} ${metrics.p99Duration}`);
lines.push(`job_error_rate{type="${metrics.jobType}"} ${metrics.errorRate.toFixed(4)}`); // ratio 0-1, per Prometheus convention
}
return lines.join('\n');
}
}
const observability = new JobObservability();
// Integration with worker
const exportsWorker = new Worker(
'exports',
async job => {
const start = performance.now();
try {
const result = await processJob(job);
const duration = performance.now() - start;
observability.recordJobCompletion('exports', duration, true);
return result;
} catch (error) {
const duration = performance.now() - start;
observability.recordJobCompletion('exports', duration, false);
throw error;
}
},
{ connection: redis }
);
// Export metrics to Prometheus
app.get('/metrics', (req, res) => {
res.setHeader('Content-Type', 'text/plain');
res.send(observability.exportPrometheus());
});
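Metrics earn their keep when wired to alerts. A sketch of a threshold check over the collected metrics (the limits and the JobHealth shape are illustrative; the fields mirror JobMetrics above):

```typescript
interface JobHealth {
  jobType: string;
  errorRate: number; // ratio, 0-1
  p99Duration: number; // milliseconds
}

// Return the job types breaching either the error-rate or latency budget
function unhealthyJobs(
  metrics: JobHealth[],
  maxErrorRate: number,
  maxP99Ms: number
): string[] {
  return metrics
    .filter(m => m.errorRate > maxErrorRate || m.p99Duration > maxP99Ms)
    .map(m => m.jobType);
}
```

Running a check like this on a schedule (or as a Prometheus alerting rule over the exported series) turns the dashboard numbers into pages before users notice.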
Checklist
- Enforce job timeouts with AbortSignal
- Implement exponential backoff with jitter
- Detect poison pills (repeated failures)
- Use deterministic job IDs for deduplication
- Implement exactly-once semantics with database locks
- Add heartbeat signals for long-running jobs
- Use fan-out for parallel subtasks
- Monitor job metrics (duration, error rate, p99)
- Store failed jobs for inspection/replay
- Test failure scenarios (timeout, crash, poison pill)
Conclusion
Reliable background jobs require multiple layers of protection. Timeout enforcement prevents hangs, deduplication prevents double-processing, heartbeats signal progress, and poison pill detection surfaces repeated failures. Combined with solid observability and idempotent, lock-guarded execution, background jobs become a trustworthy foundation rather than a silent source of data corruption.