Published on

Reliable Background Jobs — Handling Timeouts, Poison Pills, and Job Duplication

Authors

Introduction

Background jobs are where reliability expectations meet reality. A job that sometimes crashes, duplicates work, or hangs forever becomes a production incident. Proper timeout enforcement, poison pill detection, exactly-once semantics, and monitoring transform background work from a liability into a foundation. Most job systems fail silently until data corruption occurs.

Job Timeout Enforcement

Ensure every job terminates within a bounded time, whether it completes, hangs, or crashes:

import { Queue, Worker } from 'bullmq';
import { AbortSignal } from 'node-abort-controller';

// Payload for a report-generation job on the 'reports' queue.
interface JobData {
  userId: string;
  reportId: string;
}

// Timeout enforcement with AbortSignal
// Worker for the 'reports' queue that gives every job a wall-clock budget:
// a timer aborts an AbortController when the budget elapses, and the
// processing code checks the signal between expensive steps so a hung or
// oversized job terminates instead of running forever.
class TimeoutJobWorker {
  private worker: Worker;

  constructor() {
    this.worker = new Worker<JobData>(
      'reports',
      async (job) => {
        // Create abort signal for job timeout
        // NOTE(review): `timeout` is not a documented BullMQ job option —
        // confirm job.opts.timeout is actually populated, otherwise every
        // job silently falls back to the 30s default.
        const timeoutMs = job.opts.timeout || 30000; // 30 second default
        const controller = new AbortController();
        const timeoutHandle = setTimeout(() => controller.abort(), timeoutMs);

        try {
          const result = await this.processReport(job.data, controller.signal);
          return result;
        } finally {
          // Always cancel the timer so it cannot fire after completion.
          clearTimeout(timeoutHandle);
        }
      },
      { connection: redis }
    );
  }

  // Generates the report content and marks the report row completed.
  // Throws if the abort signal fires before or after generation; the DB
  // update only happens when the job ran to completion within its budget.
  private async processReport(data: JobData, signal: AbortSignal): Promise<any> {
    // Check signal before expensive operations
    if (signal.aborted) throw new Error('Job aborted');

    // NOTE(review): `report` is loaded but never used below — verify it is
    // needed (e.g. for validation) or drop the query.
    const report = await db.report.findUnique({ where: { id: data.reportId } });

    // Long operation with cancellation support
    const results = await this.generateReport(data.reportId, signal);

    if (signal.aborted) throw new Error('Job aborted');

    await db.report.update({
      where: { id: data.reportId },
      data: { results, completedAt: new Date(), status: 'completed' },
    });

    return { reportId: data.reportId, success: true };
  }

  // Pages through up to 100k rows in fixed-size batches, re-checking the
  // abort signal before each batch so cancellation takes effect quickly.
  private async generateReport(reportId: string, signal: AbortSignal): Promise<any[]> {
    const batchSize = 1000;
    const results: any[] = [];

    // Process in batches with cancellation checks
    for (let offset = 0; offset < 100000; offset += batchSize) {
      if (signal.aborted) throw new Error('Job aborted during processing');

      const batch = await db.data.findMany({
        skip: offset,
        take: batchSize,
      });

      results.push(...batch);

      // Yield to event loop
      await new Promise(resolve => setImmediate(resolve));
    }

    return results;
  }
}

// Timeout configuration per job
async function queueReportGeneration(userId: string, reportId: string, timeoutMs: number = 60000) {
  await emailQueue.add(
    'generate',
    { userId, reportId },
    {
      timeout: timeoutMs, // BullMQ timeout (fails after this)
    }
  );
}

Exponential Backoff with Jitter

Retry failed jobs without overwhelming the system:

import { Queue } from 'bullmq';

// Retry backoff policy for failed jobs.
interface BackoffConfig {
  type: 'exponential' | 'fixed';
  delay: number;       // base delay in ms (used as-is for 'fixed', attempt 1 for 'exponential')
  maxDelay?: number;   // optional cap on the exponential delay, in ms
  multiplier?: number; // growth factor per attempt (defaults to 2)
}

/**
 * Compute the retry delay (ms) for a given attempt.
 *
 * Exponential: base * multiplier^(attempt-1), optionally capped at maxDelay.
 * Fixed: always the base delay.
 * In both cases a random jitter of up to 10% of the (capped) delay is added,
 * so a burst of simultaneous failures does not retry in lockstep and cause
 * a thundering-herd spike.
 */
function calculateBackoffDelay(
  attemptNumber: number,
  config: BackoffConfig
): number {
  let delay = config.delay;

  if (config.type === 'exponential') {
    delay = config.delay * (config.multiplier || 2) ** (attemptNumber - 1);
    if (config.maxDelay) {
      delay = Math.min(delay, config.maxDelay);
    }
  }

  // Jitter is applied after the cap, so the returned value may exceed maxDelay
  // by up to 10% — deliberate, to de-cluster retries sitting at the cap.
  return delay + delay * 0.1 * Math.random();
}

// Example backoff schedule
const config: BackoffConfig = {
  type: 'exponential',
  delay: 1000,      // Start at 1 second
  maxDelay: 60000,  // Cap at 1 minute
  multiplier: 2,
};

console.log('Backoff schedule:');
for (let attempt = 1; attempt <= 10; attempt++) {
  const delay = calculateBackoffDelay(attempt, config);
  console.log(`Attempt ${attempt}: ${(delay / 1000).toFixed(2)}s`);
}
// Example output — jitter adds a random 0–10% on top of each base delay,
// so exact values vary between runs:
// Attempt 1: ~1.00–1.10s
// Attempt 2: ~2.00–2.20s
// Attempt 3: ~4.00–4.40s
// Attempt 4: ~8.00–8.80s
// Attempt 5: ~16.00–17.60s
// Attempt 6: ~32.00–35.20s
// Attempt 7+: ~60.00–66.00s (base capped at maxDelay; jitter added after the cap)

const paymentQueue = new Queue('payments', { connection: redis });

/**
 * Enqueue a payment job with a resilient retry policy: up to 10 attempts,
 * exponential backoff starting at 2s with factor 2. Completed jobs are
 * pruned; failed jobs are retained for inspection and replay.
 */
async function queuePayment(userId: string, amount: number) {
  await paymentQueue.add(
    'process',
    { userId, amount },
    {
      attempts: 10,
      backoff: { type: 'exponential', delay: 2000, multiplier: 2 },
      removeOnComplete: true,
      removeOnFail: false, // Keep failed jobs for inspection
    }
  );
}

Poison Pill Detection

Identify and quarantine jobs that fail consistently:

// Tuning knobs for poison-pill detection.
interface PoisonPillConfig {
  maxAttempts: number;        // minimum failures in the window before the rate is checked
  timeWindow: number;         // milliseconds
  detectionThreshold: number; // % of jobs failing in time window
}

class PoisonPillDetector {
  private failureHistory = new Map<string, number[]>(); // job type -> timestamps of failures
  private config: PoisonPillConfig = {
    maxAttempts: 5,
    timeWindow: 60000, // 1 minute
    detectionThreshold: 80, // 80% failure rate
  };

  recordFailure(jobType: string, timestamp: number = Date.now()): void {
    if (!this.failureHistory.has(jobType)) {
      this.failureHistory.set(jobType, []);
    }

    const history = this.failureHistory.get(jobType)!;
    history.push(timestamp);

    // Clean up old entries
    const cutoff = timestamp - this.config.timeWindow;
    const recentFailures = history.filter(t => t > cutoff);
    this.failureHistory.set(jobType, recentFailures);

    // Check for poison pill pattern
    if (recentFailures.length >= this.config.maxAttempts) {
      const failureRate = (recentFailures.length / this.config.maxAttempts) * 100;

      if (failureRate >= this.config.detectionThreshold) {
        this.emitPoison Pill Alert(jobType, failureRate);
      }
    }
  }

  private emitPoisonPillAlert(jobType: string, failureRate: number): void {
    console.error(`Poison pill detected for job type "${jobType}": ${failureRate.toFixed(2)}% failure rate`);

    // Actions:
    // 1. Pause the queue
    // 2. Alert operations team
    // 3. Move remaining jobs to DLQ
    // 4. Investigate root cause
  }
}

// BullMQ integration
// Every failure is recorded with the detector before rethrowing, so BullMQ
// still applies its own retry/backoff handling on top.
// NOTE(review): this file later declares another top-level `const worker`
// (observability section) — in a single module these would collide.
const poisonPillDetector = new PoisonPillDetector();

const worker = new Worker(
  'exports',
  async job => {
    try {
      return await processExport(job.data);
    } catch (error) {
      // Count the failure, then rethrow so BullMQ marks the attempt failed.
      poisonPillDetector.recordFailure('exports');
      throw error;
    }
  },
  { connection: redis }
);

// Fires on every failed attempt; once attemptsMade reaches the configured
// attempts the job will not be retried again.
worker.on('failed', (job, err) => {
  if (job && job.attemptsMade >= job.opts.attempts!) {
    console.log(`Job ${job.id} permanently failed, moving to DLQ`);
    // Move to DLQ handled by BullMQ
  }
});

Job Deduplication with Deterministic IDs

Prevent duplicate processing of the same work:

import crypto from 'crypto';
import { Queue } from 'bullmq';

// Payload for an email job; all three fields feed the deterministic job ID.
interface EmailJob {
  userId: string;
  email: string;
  templateId: string;
}

/**
 * Deterministic job ID for an email job: identical payloads always map to
 * the same ID, which is what enables queue-level deduplication.
 *
 * Fields are serialized in a fixed order. The original hashed
 * JSON.stringify(data) directly, but stringify preserves object insertion
 * order — the same data spelled with a different property order at another
 * call site would hash to a different ID and silently defeat dedup.
 */
function generateDeterministicJobId(data: EmailJob): string {
  // Canonical field order, independent of the caller's literal layout.
  const content = JSON.stringify([data.userId, data.email, data.templateId]);
  const hash = crypto.createHash('sha256').update(content).digest('hex').slice(0, 16);

  // No timestamp in the ID: retries of the same payload intentionally
  // collide with the original job (the previous comment claimed otherwise).
  return `email-${hash}`;
}

// Email queue; deduplication comes from the deterministic job ID.
const emailQueue = new Queue<EmailJob>('emails', { connection: redis });

/**
 * Enqueue an email send, deduplicated by content: the job ID is derived
 * from (userId, email, templateId), so re-sending the same payload while
 * the original job still exists is a no-op.
 */
async function sendEmail(userId: string, email: string, templateId: string) {
  const jobId = generateDeterministicJobId({ userId, email, templateId });

  // Add job with deduplication
  // NOTE(review): `skipDuplicate` is not a documented BullMQ option —
  // BullMQ already ignores an add whose jobId exists in the queue; confirm
  // against the BullMQ version in use and drop if redundant.
  const job = await emailQueue.add(
    'send',
    { userId, email, templateId },
    {
      jobId: jobId,
      skipDuplicate: true, // Don't add if ID exists
      removeOnComplete: true,
      attempts: 3,
    }
  );

  return job;
}

// Scenario: Webhook retry sends same data twice
// (top-level await — this snippet assumes an ES module context)
await sendEmail('user-123', 'john@example.com', 'welcome');
await sendEmail('user-123', 'john@example.com', 'welcome'); // Same job ID, skipped

// Scenario: daily digest job
// JobId: "digest-2026-03-15" (based on date)
// Run at 9am: generates job
// Run at 9am (retry): same job ID, skipped
// NOTE(review): a date-based ID only changes once per day — an *hourly*
// digest would need the hour in the ID; "next day, new job ID" is what
// actually holds here.

const digestQueue = new Queue('digest', { connection: redis });

/**
 * Schedule today's digest exactly once. The job ID embeds the calendar
 * date, so re-running this within the same day is a no-op; only a new
 * date produces a new job.
 */
async function scheduleDigest() {
  const [today] = new Date().toISOString().split('T'); // e.g. "2026-03-15"

  await digestQueue.add(
    'send',
    { date: today },
    {
      jobId: `digest-${today}`,
      skipDuplicate: true,
      removeOnComplete: true,
    }
  );
}

Exactly-Once Execution with Database Locks

Guarantee a job's side effects are applied exactly once, even when the job itself is retried:

import { Queue, Worker } from 'bullmq';

// Use database lock for exactly-once semantics
// A Redis lock prevents concurrent execution of the same job, and a
// persisted jobResult row makes re-execution after a retry return the
// cached output instead of re-running the side effects.
class ExactlyOnceProcessor {
  async processWithLock<T>(jobId: string, processor: () => Promise<T>): Promise<T> {
    const lockKey = `job-lock:${jobId}`;
    // Unique token so we only ever release a lock we still own.
    const lockValue = `${Date.now()}-${Math.random()}`;

    // Try to acquire lock
    // (ioredis-style SET with PX/NX — resolves null when the key exists)
    const lockAcquired = await redis.set(
      lockKey,
      lockValue,
      'PX', 3600000, // 1 hour timeout
      'NX' // Only set if doesn't exist
    );

    if (!lockAcquired) {
      // Another process is handling this job
      throw new Error(`Job ${jobId} already being processed`);
    }

    try {
      // Check if job already completed
      const result = await db.jobResult.findUnique({
        where: { jobId },
      });

      if (result) {
        // A previous attempt finished — return its stored output (idempotent replay).
        return result.output as T;
      }

      // Process job
      const output = await processor();

      // Store result with atomic transaction
      await db.jobResult.upsert({
        where: { jobId },
        update: { completedAt: new Date(), output },
        create: { jobId, output, completedAt: new Date() },
      });

      return output;
    } finally {
      // Release lock
      // NOTE(review): GET-then-DEL is not atomic — the lock could expire and
      // be re-acquired by another worker between the two calls, deleting that
      // worker's lock. A Lua compare-and-delete script is the safe release.
      const currentLock = await redis.get(lockKey);
      if (currentLock === lockValue) {
        await redis.del(lockKey);
      }
    }
  }
}

const processor = new ExactlyOnceProcessor();

const transferQueue = new Queue('transfers', { connection: redis });

// Money-transfer worker wrapped in the exactly-once processor so BullMQ
// retries cannot double-apply the balance changes.
const transferWorker = new Worker(
  'transfers',
  async job => {
    return processor.processWithLock(job.id!, async () => {
      // This code runs exactly once despite retries
      // NOTE(review): the two updates are separate statements, not a DB
      // transaction — a crash between them debits without crediting. Wrap
      // both in a transaction (e.g. Prisma's db.$transaction) before shipping.
      await db.account.update({
        where: { id: job.data.fromAccount },
        data: { balance: { decrement: job.data.amount } },
      });

      await db.account.update({
        where: { id: job.data.toAccount },
        data: { balance: { increment: job.data.amount } },
      });

      return { fromAccount: job.data.fromAccount, toAccount: job.data.toAccount };
    });
  },
  { connection: redis }
);

// Retry safety: Job processes exactly once even if retried
await transferQueue.add('transfer', {
  fromAccount: 'acc-1',
  toAccount: 'acc-2',
  amount: 1000,
});

Job Heartbeat for Long-Running Tasks

Emit periodic liveness signals during processing so a healthy long-running job is not mistaken for a stalled one and killed:

import { Queue, Worker } from 'bullmq';

const csvExportQueue = new Queue('csv-export', { connection: redis });

// Heartbeat cadence and stall cutoff for long-running export jobs.
interface HeartbeatConfig {
  interval: number; // milliseconds between liveness signals
  timeout: number; // ms without a heartbeat before the job is considered dead
}

// Exports a dataset in batches while a timer emits periodic heartbeats so
// an external watchdog can tell the job is alive.
// NOTE(review): several pieces here are vestigial — heartbeatConfig.timeout
// and lastHeartbeat are written but never read, isCancelled is only set in
// the `finally` block (so the in-loop break can never trigger), and the
// csvLines built per batch are never written anywhere. The return value
// reports the total row count regardless.
async function exportLargeDataset(jobId: string, datasetId: string) {
  const heartbeatConfig: HeartbeatConfig = {
    interval: 5000, // Send heartbeat every 5 seconds
    timeout: 120000, // Kill if no heartbeat for 2 minutes
  };

  let lastHeartbeat = Date.now();
  let isCancelled = false;

  // Liveness signal, independent of batch progress.
  const heartbeatInterval = setInterval(() => {
    lastHeartbeat = Date.now();
    // Signal to scheduler that job is alive
    console.log(`[${jobId}] Heartbeat: processing`);
  }, heartbeatConfig.interval);

  try {
    const rows = await db.data.count({ where: { datasetId } });
    const batchSize = 10000;

    for (let offset = 0; offset < rows; offset += batchSize) {
      if (isCancelled) break;

      const batch = await db.data.findMany({
        skip: offset,
        take: batchSize,
        where: { datasetId },
      });

      // Process batch
      const csvLines = batch.map(row => formatCSV(row));

      // Yield to event loop
      await new Promise(resolve => setImmediate(resolve));

      console.log(`[${jobId}] Progress: ${offset + batchSize} of ${rows}`);
    }

    return { exported: rows };
  } finally {
    // Stop any in-flight iteration and the heartbeat timer.
    isCancelled = true;
    clearInterval(heartbeatInterval);
  }
}

// Up to 5 exports run concurrently on this worker.
const exportWorker = new Worker(
  'csv-export',
  async job => {
    return exportLargeDataset(job.id!, job.data.datasetId);
  },
  { connection: redis, concurrency: 5 }
);

// NOTE(review): 'progress' only fires when the processor calls
// job.updateProgress(...) — exportLargeDataset never does (it only logs),
// so this listener is currently dead code.
exportWorker.on('progress', (job, progress) => {
  console.log(`Job ${job.id} progress: ${progress}`);
});

Fan-Out Pattern (One Job Spawns Many)

Decompose large jobs into parallel subtasks:

import { FlowProducer } from 'bullmq';

const flow = new FlowProducer({ connection: redis });

// Shape of a fan-out request: one dataset split across regions.
// NOTE(review): this interface is declared but never referenced below.
interface DataProcessingFlow {
  datasetId: string;
  regions: string[];
}

/**
 * Fan a dataset out into one child job per region under a single parent
 * flow job. BullMQ completes the parent only after every child finishes.
 */
async function processDatasetParallel(datasetId: string, regions: string[]) {
  await flow.add({
    name: 'process-dataset',
    data: { datasetId, regions },
    // One child per region; children run in parallel.
    children: regions.map(region => ({
      name: 'process-region',
      data: { datasetId, region },
    })),
  });
}

// NOTE(review): these Queue instances are declared but never used below —
// FlowProducer.add creates the jobs. They also rely on `Queue` being in
// scope from an earlier snippet's import.
const datasetQueue = new Queue('process-dataset', { connection: redis });
const regionQueue = new Queue('process-region', { connection: redis });

// Parent job: coordinate parallel work
const parentWorker = new Worker(
  'process-dataset',
  async job => {
    // Parent waits for all children
    // Handled by BullMQ flow mechanism
    return { datasetId: job.data.datasetId, status: 'completed' };
  },
  { connection: redis }
);

// Child jobs: process in parallel (up to 10 at once on this worker)
const regionWorker = new Worker(
  'process-region',
  async job => {
    const { datasetId, region } = job.data;

    // Each region processed independently
    const results = await processRegionData(datasetId, region);

    return { region, recordsProcessed: results.length };
  },
  { connection: redis, concurrency: 10 }
);

// Benefits: 4 regions process roughly 4x faster (parallelization)
// If one region fails, others continue
// Failure handling per subtask

Job Observability (Metrics)

Track job health at scale:

// Aggregated health stats for one job type.
interface JobMetrics {
  jobType: string;
  totalProcessed: number; // cumulative completions (success + failure)
  totalFailed: number; // cumulative failures
  averageDuration: number; // ms, over the rolling duration window
  p99Duration: number; // ms, over the rolling duration window
  errorRate: number; // totalFailed / totalProcessed, in [0, 1]
}

// In-memory per-job-type metrics: counts, rolling duration stats, error rate.
class JobObservability {
  private metrics = new Map<string, JobMetrics>();
  // Rolling window of the last 1000 durations per job type.
  private durations = new Map<string, number[]>();

  /**
   * Record one finished job (success or failure) and refresh the derived
   * stats (average, p99, error rate) for its type.
   */
  recordJobCompletion(jobType: string, durationMs: number, success: boolean): void {
    if (!this.metrics.has(jobType)) {
      this.metrics.set(jobType, {
        jobType,
        totalProcessed: 0,
        totalFailed: 0,
        averageDuration: 0,
        p99Duration: 0,
        errorRate: 0,
      });
      this.durations.set(jobType, []);
    }

    const metrics = this.metrics.get(jobType)!;
    const durations = this.durations.get(jobType)!;

    metrics.totalProcessed++;

    if (!success) {
      metrics.totalFailed++;
    }

    durations.push(durationMs);

    // Keep last 1000 measurements so duration stats track recent behavior.
    if (durations.length > 1000) {
      durations.shift();
    }

    // Derived stats. Note: durations are windowed, but the error rate is
    // cumulative over the process lifetime.
    metrics.averageDuration = durations.reduce((a, b) => a + b, 0) / durations.length;
    metrics.p99Duration = this.percentile(durations, 0.99);
    metrics.errorRate = metrics.totalFailed / metrics.totalProcessed;
  }

  // Nearest-rank percentile over a copy of the samples.
  // Returns 0 for an empty sample — the original indexed into an empty
  // array and returned undefined despite the declared number return type.
  private percentile(arr: number[], p: number): number {
    if (arr.length === 0) return 0;
    const sorted = [...arr].sort((a, b) => a - b);
    const index = Math.ceil(sorted.length * p) - 1;
    return sorted[Math.max(0, index)];
  }

  /** Metrics for one job type (empty array if unknown), or all types. */
  getMetrics(jobType?: string): JobMetrics[] {
    if (jobType) {
      const entry = this.metrics.get(jobType); // single lookup instead of two
      return entry ? [entry] : [];
    }

    return Array.from(this.metrics.values());
  }

  /** Render all metrics in Prometheus text exposition format. */
  exportPrometheus(): string {
    const lines: string[] = [];

    for (const metrics of this.metrics.values()) {
      lines.push(`job_total{type="${metrics.jobType}"} ${metrics.totalProcessed}`);
      lines.push(`job_failed{type="${metrics.jobType}"} ${metrics.totalFailed}`);
      lines.push(`job_duration_ms{type="${metrics.jobType}",quantile="avg"} ${metrics.averageDuration}`);
      lines.push(`job_duration_ms{type="${metrics.jobType}",quantile="p99"} ${metrics.p99Duration}`);
      lines.push(`job_error_rate{type="${metrics.jobType}"} ${(metrics.errorRate * 100).toFixed(2)}`);
    }

    return lines.join('\n');
  }
}

const observability = new JobObservability();

// Integration with worker
// NOTE(review): `const worker` is also declared in the poison-pill section
// above — in a single module these two top-level declarations collide.
const worker = new Worker(
  'exports',
  async job => {
    const start = performance.now();

    try {
      const result = await processJob(job);
      const duration = performance.now() - start;
      observability.recordJobCompletion('exports', duration, true);
      return result;
    } catch (error) {
      // Record the failure with its duration, then rethrow for BullMQ retries.
      const duration = performance.now() - start;
      observability.recordJobCompletion('exports', duration, false);
      throw error;
    }
  },
  { connection: redis }
);

// Export metrics to Prometheus
app.get('/metrics', (req, res) => {
  res.setHeader('Content-Type', 'text/plain');
  res.send(observability.exportPrometheus());
});

Checklist

  • Enforce job timeouts with AbortSignal
  • Implement exponential backoff with jitter
  • Detect poison pills (repeated failures)
  • Use deterministic job IDs for deduplication
  • Implement exactly-once semantics with database locks
  • Add heartbeat signals for long-running jobs
  • Use fan-out for parallel subtasks
  • Monitor job metrics (duration, error rate, p99)
  • Store failed jobs for inspection/replay
  • Test failure scenarios (timeout, crash, poison pill)

Conclusion

Reliable background jobs require multiple layers of protection. Timeout enforcement prevents hangs, deduplication prevents double-processing, heartbeats signal progress, and poison pill detection surfaces failures. Combined with proper observability and exactly-once semantics via database locks, background jobs become a trustworthy foundation rather than a silent source of data corruption.