Published on

AI Background Processing — Async LLM Jobs, Queues, and Webhook Callbacks

Authors

Introduction

LLM requests take 2-30 seconds. Synchronous API calls kill user experience. This guide covers production patterns for async LLM processing with job queues, idempotency, and status tracking.

Why LLM Calls Must Be Async

LLM latency makes synchronous requests impractical for user-facing features.

import { createHash, randomUUID } from 'node:crypto';

import Bull from 'bull';
import Redis from 'ioredis';

// Lifecycle record for a single LLM request as it moves through the queue.
interface LLMJob {
  id: string; // unique job identifier returned to the caller
  input: string; // prompt text sent as the user message
  model: string; // model name, e.g. 'gpt-4'
  userId: string; // owner of the request
  createdAt: Date; // when the job was submitted
  startedAt?: Date; // set when a worker picks the job up
  completedAt?: Date; // set on successful completion
  status: 'pending' | 'processing' | 'completed' | 'failed';
  result?: string; // LLM response text, present when status === 'completed'
  error?: string; // failure message, present when status === 'failed'
}

class AsyncLLMProcessor {
  private queue: Bull.Queue<LLMJob>;
  private redis: Redis.Redis;
  // In-process view of job state; terminal states are also persisted to
  // Redis so lookups survive a process restart.
  private jobs = new Map<string, LLMJob>();

  constructor(redisUrl: string) {
    this.redis = new Redis(redisUrl);
    this.queue = new Bull('llm-processing', redisUrl);

    // Process up to 5 queued jobs concurrently.
    this.queue.process(5, async (job) => {
      const data = job.data as LLMJob;
      return this.processLLMJob(data);
    });
  }

  /**
   * Enqueue an LLM request and return a job ID immediately.
   * Callers poll getJobStatus() or use waitForCompletion() for the result.
   */
  async submitJob(input: string, model: string, userId: string): Promise<string> {
    // randomUUID() avoids the collision risk of Math.random()-based IDs.
    const jobId = `job_${Date.now()}_${randomUUID()}`;
    const job: LLMJob = {
      id: jobId,
      input,
      model,
      userId,
      createdAt: new Date(),
      status: 'pending',
    };

    this.jobs.set(jobId, job);

    // Add to queue without waiting for result; Bull retries the job up to
    // 3 times with exponential backoff.
    await this.queue.add(job, { jobId, attempts: 3, backoff: { type: 'exponential', delay: 2000 } });

    // Return immediately
    return jobId;
  }

  /** Worker callback: call the LLM API and record the outcome. */
  private async processLLMJob(job: LLMJob): Promise<string> {
    // Fix: this was `const stored` but is reassigned below, which does not
    // compile. It must be `let`.
    let stored = this.jobs.get(job.id);
    if (!stored) {
      // Job came off the queue but isn't in memory (e.g. enqueued by
      // another process) — adopt the queued copy.
      stored = job;
      this.jobs.set(job.id, stored);
    }

    stored.status = 'processing';
    stored.startedAt = new Date();

    try {
      const response = await fetch('https://api.openai.com/v1/chat/completions', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
        },
        body: JSON.stringify({
          model: job.model,
          messages: [{ role: 'user', content: job.input }],
          temperature: 0.7,
        }),
      });

      // Fix: surface HTTP errors (429, 5xx) so Bull's retry/backoff kicks
      // in; the original parsed error bodies as if they were successes.
      if (!response.ok) {
        throw new Error(`LLM API returned ${response.status}`);
      }

      const data = (await response.json()) as { choices: Array<{ message: { content: string } }> };
      const result = data.choices[0].message.content;

      stored.result = result;
      stored.status = 'completed';
      stored.completedAt = new Date();

      await this.persist(stored);

      return result;
    } catch (error) {
      stored.error = error instanceof Error ? error.message : String(error);
      stored.status = 'failed';
      // Fix: persist failures too (the original only persisted successes),
      // so status checks don't report failed jobs as stuck 'processing'.
      // Best-effort: a Redis error here must not mask the real failure.
      await this.persist(stored).catch(() => undefined);
      throw error; // rethrow so Bull marks the attempt failed and retries
    }
  }

  /** Write a job record to Redis with a 7-day TTL. */
  private async persist(job: LLMJob): Promise<void> {
    await this.redis.setex(
      `job:${job.id}`,
      86400 * 7, // 7 days retention
      JSON.stringify(job)
    );
  }

  /**
   * Look up a job: first the in-memory map, then Redis (covers process
   * restarts and jobs finished by other workers).
   */
  async getJobStatus(jobId: string): Promise<LLMJob | null> {
    const cached = this.jobs.get(jobId);
    if (cached) return cached;

    const raw = await this.redis.get(`job:${jobId}`);
    return raw ? (JSON.parse(raw) as LLMJob) : null;
  }

  /** Poll every 500ms until the job completes, fails, or the timeout expires. */
  async waitForCompletion(jobId: string, timeoutMs: number = 60000): Promise<string> {
    const startTime = Date.now();

    while (Date.now() - startTime < timeoutMs) {
      const job = this.jobs.get(jobId);
      if (job?.status === 'completed') {
        return job.result || '';
      }

      if (job?.status === 'failed') {
        throw new Error(job.error || 'Job failed');
      }

      await new Promise((resolve) => setTimeout(resolve, 500));
    }

    throw new Error(`Job ${jobId} timed out`);
  }
}

// Example: submit a job and immediately check on it.
const processor = new AsyncLLMProcessor(process.env.REDIS_URL!);
const jobId = await processor.submitJob('Summarize this text...', 'gpt-4', 'user123');
console.log(`Job submitted: ${jobId}`);

// Likely still 'pending' this soon after submission.
const status = await processor.getJobStatus(jobId);
console.log(`Current status: ${status?.status}`);

BullMQ Job Queue Setup

Use BullMQ for reliable job processing with retries and scheduling.

import { Queue, Worker, QueueScheduler } from 'bullmq';
import IORedis from 'ioredis';

// Payload for a queued AI task; `type` selects the handler in the worker.
interface AIProcessingJob {
  type: 'summarize' | 'classify' | 'extract';
  content: string; // raw text the task operates on
  options?: Record<string, unknown>; // handler-specific settings (not read by the handlers shown here)
  userId: string; // requesting user, for attribution
}

class BullMQProcessor {
  private queue: Queue<AIProcessingJob>;
  private worker: Worker<AIProcessingJob>;
  // NOTE(review): QueueScheduler was only needed on early BullMQ releases;
  // v2+ removed it (delayed/stalled handling moved into the Worker).
  // Confirm against the installed bullmq version.
  private scheduler: QueueScheduler;

  constructor(redisUrl: string) {
    const connection = new IORedis(redisUrl);

    this.queue = new Queue<AIProcessingJob>('ai-processing', { connection });
    this.worker = new Worker<AIProcessingJob>(
      'ai-processing',
      async (job) => {
        return this.handleJob(job);
      },
      { connection, concurrency: 5 }
    );

    this.scheduler = new QueueScheduler('ai-processing', { connection });

    this.worker.on('completed', (job) => {
      console.log(`Job ${job.id} completed`);
    });

    this.worker.on('failed', (job, err) => {
      console.log(`Job ${job?.id} failed: ${err.message}`);
    });
  }

  /**
   * Enqueue a job. Defaults: 3 attempts with exponential backoff, completed
   * jobs pruned after one hour.
   */
  async addJob(
    job: AIProcessingJob,
    options?: { delay?: number; priority?: number; attempts?: number }
  ): Promise<string> {
    const bullJob = await this.queue.add(job.type, job, {
      attempts: options?.attempts || 3,
      backoff: { type: 'exponential', delay: 2000 },
      priority: options?.priority || 0,
      delay: options?.delay,
      removeOnComplete: { age: 3600 }, // Remove after 1 hour
    });

    return bullJob.id!;
  }

  /** Current BullMQ state for the job, or 'not_found' if the ID is unknown. */
  async getJobStatus(jobId: string): Promise<string> {
    const job = await this.queue.getJob(jobId);
    if (!job) return 'not_found';

    return job.getState();
  }

  /** Return value of a finished job; throws if the job ID is unknown. */
  async getJobResult(jobId: string): Promise<unknown> {
    const job = await this.queue.getJob(jobId);
    if (!job) throw new Error(`Job ${jobId} not found`);

    return job.returnvalue;
  }

  /**
   * Dispatch a queued job to its handler by type.
   * Fix: the parameter was typed `any`, which disabled checking on
   * `job.data`; a structural type restores it without importing Job.
   */
  private async handleJob(job: { data: AIProcessingJob }): Promise<unknown> {
    const data = job.data;

    switch (data.type) {
      case 'summarize':
        return this.summarize(data.content);
      case 'classify':
        return this.classify(data.content);
      case 'extract':
        return this.extract(data.content);
      default:
        throw new Error(`Unknown job type: ${data.type}`);
    }
  }

  /** Summarize via the OpenAI chat completions API. */
  private async summarize(content: string): Promise<string> {
    const response = await fetch('https://api.openai.com/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
      },
      body: JSON.stringify({
        model: 'gpt-3.5-turbo',
        messages: [{ role: 'user', content: `Summarize: ${content}` }],
      }),
    });

    // Fix: guard against HTTP errors before reading `choices`.
    if (!response.ok) {
      throw new Error(`LLM API returned ${response.status}`);
    }

    const result = (await response.json()) as { choices: Array<{ message: { content: string } }> };
    return result.choices[0].message.content;
  }

  /** Placeholder classifier — echoes a prefix of the input. */
  private async classify(content: string): Promise<string> {
    return `Classification of: ${content.slice(0, 50)}`;
  }

  /** Placeholder extractor — echoes a prefix of the input. */
  private async extract(content: string): Promise<unknown> {
    return { extracted: content.slice(0, 100) };
  }

  /** Shut down worker, scheduler, and queue connections. */
  async close(): Promise<void> {
    await this.worker.close();
    await this.scheduler.close();
    await this.queue.close();
  }
}

// Example: enqueue a summarize task and read back its queue state.
const processor = new BullMQProcessor(process.env.REDIS_URL!);
const jobId = await processor.addJob({ type: 'summarize', content: 'Long text...', userId: 'user1' });

const status = await processor.getJobStatus(jobId);
console.log(`Job status: ${status}`);

Job Status Polling vs Webhooks

Implement both polling and webhook delivery for job completion.

/**
 * Tracks job completion and delivers results two ways: long-poll promises
 * and webhook POSTs with retry.
 */
class JobStatusTracker {
  private static readonly MAX_WEBHOOK_RETRIES = 3;
  private static readonly WEBHOOK_RETRY_BASE_MS = 5000;

  private completionCallbacks = new Map<string, { url: string; retries: number }>();
  // Typed explicitly — the original used the banned bare `Function` type.
  private pollingClients = new Map<
    string,
    {
      resolve: (value: unknown) => void;
      reject: (reason?: unknown) => void;
      timeoutId: NodeJS.Timeout;
    }
  >();

  /** Request a POST to `webhookUrl` when `jobId` completes. */
  registerWebhook(jobId: string, webhookUrl: string): void {
    this.completionCallbacks.set(jobId, {
      url: webhookUrl,
      retries: JobStatusTracker.MAX_WEBHOOK_RETRIES,
    });
  }

  /**
   * Long-poll helper: resolves with the job result when notifyCompletion()
   * fires, or rejects after 5 minutes.
   */
  setupPolling(jobId: string): Promise<unknown> {
    return new Promise((resolve, reject) => {
      const timeoutId = setTimeout(() => {
        this.pollingClients.delete(jobId);
        reject(new Error(`Polling timeout for ${jobId}`));
      }, 300000); // 5 minute timeout

      this.pollingClients.set(jobId, { resolve, reject, timeoutId });
    });
  }

  /** Resolve any waiting poller, then dispatch the registered webhook. */
  async notifyCompletion(jobId: string, result: unknown): Promise<void> {
    // Notify polling clients
    const polling = this.pollingClients.get(jobId);
    if (polling) {
      clearTimeout(polling.timeoutId);
      polling.resolve(result);
      this.pollingClients.delete(jobId);
    }

    // Send webhook
    const webhook = this.completionCallbacks.get(jobId);
    if (webhook) {
      await this.sendWebhook(webhook.url, result, jobId, webhook.retries);
    }
  }

  /**
   * POST the result to the webhook URL, retrying on failure with
   * exponential backoff (5s, 10s, 20s).
   */
  private async sendWebhook(
    url: string,
    data: unknown,
    jobId: string,
    retriesLeft: number
  ): Promise<void> {
    try {
      const response = await fetch(url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ jobId, data, timestamp: new Date().toISOString() }),
      });

      if (!response.ok) {
        throw new Error(`Webhook returned ${response.status}`);
      }

      this.completionCallbacks.delete(jobId);
    } catch (error) {
      if (retriesLeft > 0) {
        console.log(`Webhook failed, retrying... (${retriesLeft} left)`);
        // Fix: the old formula 5000 * (3 - retriesLeft) waited 0ms before
        // the first retry. Back off exponentially instead.
        const attempt = JobStatusTracker.MAX_WEBHOOK_RETRIES - retriesLeft; // 0-based
        const delay = JobStatusTracker.WEBHOOK_RETRY_BASE_MS * 2 ** attempt;
        await new Promise((resolve) => setTimeout(resolve, delay));
        await this.sendWebhook(url, data, jobId, retriesLeft - 1);
      } else {
        console.error(`Webhook failed permanently for ${jobId}`);
      }
    }
  }
}

// Example: register a webhook, then signal completion (triggers delivery).
const tracker = new JobStatusTracker();
tracker.registerWebhook('job_123', 'https://example.com/webhook');
await tracker.notifyCompletion('job_123', { result: 'Success' });

Idempotency for AI Jobs

Ensure same input produces same job ID to prevent duplicate processing.

/**
 * Deduplicates job submissions: identical (input, userId, model) triples
 * map to the same job ID, so the underlying work runs at most once.
 */
class IdempotentJobManager {
  private jobIdCache = new Map<string, string>(); // input hash -> job ID
  private processedJobs = new Map<string, unknown>(); // job ID -> result

  /**
   * Stable fingerprint of the request. Fields are JSON-encoded as an array
   * so boundaries are unambiguous — the original concatenated the strings
   * directly, letting e.g. ('ab','c',m) and ('a','bc',m) collide.
   * Uses the top-level `createHash` import instead of an inline require().
   */
  private hashInput(input: string, userId: string, model: string): string {
    return createHash('sha256')
      .update(JSON.stringify([input, userId, model]))
      .digest('hex');
  }

  /**
   * Submit a job unless an identical one was already submitted; returns the
   * (new or existing) job ID. `submitter` is only invoked for new jobs.
   */
  async submitIdempotent(
    input: string,
    userId: string,
    model: string,
    submitter: (jobId: string) => Promise<void>
  ): Promise<string> {
    const hash = this.hashInput(input, userId, model);

    // Duplicate submission: hand back the job ID issued the first time.
    // (The original had a second, unreachable scan of jobIdCache.entries()
    // that duplicated this check; it has been removed.)
    const existing = this.jobIdCache.get(hash);
    if (existing !== undefined) {
      console.log('Duplicate request detected, returning existing job ID');
      return existing;
    }

    // New job — randomUUID() avoids Math.random() collision risk.
    const jobId = `job_${Date.now()}_${randomUUID()}`;
    this.jobIdCache.set(hash, jobId);

    await submitter(jobId);
    return jobId;
  }

  /** Record the final result for a job ID. */
  recordCompletion(jobId: string, result: unknown): void {
    this.processedJobs.set(jobId, result);
  }

  /** Result for a completed job, or undefined if not (yet) recorded. */
  getResult(jobId: string): unknown | undefined {
    return this.processedJobs.get(jobId);
  }

  /** True once recordCompletion() has been called for this job ID. */
  isAlreadyProcessed(jobId: string): boolean {
    return this.processedJobs.has(jobId);
  }
}

// Example: two identical submissions — the second is deduplicated and
// returns the first job's ID without invoking the submitter again.
const idempotenceManager = new IdempotentJobManager();

const jobId1 = await idempotenceManager.submitIdempotent(
  'Summarize this document',
  'user1',
  'gpt-4',
  async (id) => console.log(`Processing job ${id}`)
);

const jobId2 = await idempotenceManager.submitIdempotent(
  'Summarize this document',
  'user1',
  'gpt-4',
  async (id) => console.log(`Processing job ${id}`)
);

console.log(`Same job: ${jobId1 === jobId2}`); // true

Retry Strategy for Rate Limits

Handle 429 responses with exponential backoff and jitter.

/**
 * Retries rate-limited (HTTP 429) operations with exponential backoff plus
 * jitter, and paces batched work with inter-batch delays.
 */
class RateLimitHandler {
  /**
   * Run `fn`, retrying on 429 responses up to `maxRetries` times.
   * Any other error is rethrown immediately.
   */
  async executeWithRateLimit<T>(
    fn: () => Promise<T>,
    maxRetries: number = 5
  ): Promise<T> {
    for (let attempt = 0; ; attempt++) {
      try {
        return await fn();
      } catch (error) {
        const statusCode = (error as any).statusCode || (error as any).status;

        // Anything other than a rate-limit error propagates untouched.
        if (statusCode !== 429) {
          throw error;
        }

        if (attempt >= maxRetries) {
          throw new Error(`Rate limited after ${maxRetries} retries`);
        }

        const delayMs = this.calculateBackoff(attempt);
        console.log(`Rate limited (429), retrying in ${delayMs}ms...`);
        await new Promise((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }

  /** 2^attempt seconds plus up to 1s of random jitter, capped at 60s. */
  private calculateBackoff(attempt: number): number {
    const exponential = 1000 * 2 ** attempt;
    const jitter = 1000 * Math.random();
    return Math.min(exponential + jitter, 60000);
  }

  /**
   * Process `items` in chunks of `batchSize`, each item wrapped in
   * executeWithRateLimit, pausing between consecutive chunks.
   */
  async batchWithRateLimit<T>(
    items: T[],
    processor: (item: T) => Promise<void>,
    batchSize: number = 5,
    delayBetweenBatches: number = 5000
  ): Promise<void> {
    for (let start = 0; start < items.length; start += batchSize) {
      const chunk = items.slice(start, start + batchSize);

      await Promise.all(
        chunk.map((entry) => this.executeWithRateLimit(() => processor(entry)))
      );

      const hasMore = start + batchSize < items.length;
      if (hasMore) {
        console.log(`Batch processed, waiting ${delayBetweenBatches}ms before next batch...`);
        await new Promise((resolve) => setTimeout(resolve, delayBetweenBatches));
      }
    }
  }
}

const rateLimiter = new RateLimitHandler();

// Example: wrap a raw fetch call. Tagging the thrown error with
// status = 429 is what signals executeWithRateLimit to back off and retry.
await rateLimiter.executeWithRateLimit(async () => {
  const response = await fetch('https://api.openai.com/v1/chat/completions', {
    method: 'POST',
    body: JSON.stringify({}),
  });

  if (response.status === 429) {
    const error = new Error('Rate limited');
    (error as any).status = 429;
    throw error;
  }

  return response.json();
});

Streaming Progress Updates via SSE

Send real-time progress updates to clients using Server-Sent Events.

import { EventEmitter } from 'events';

/**
 * Typed convenience wrapper around EventEmitter for job lifecycle events:
 * 'progress' {jobId, message, progress, timestamp},
 * 'completed' {jobId, result, timestamp},
 * 'error' {jobId, error, timestamp}.
 *
 * Fix: the original overrode emit() only to delegate to super.emit with the
 * same arguments — a no-op override, now removed.
 */
class ProgressEmitter extends EventEmitter {
  /** Report intermediate progress (0-100) for a job. */
  recordProgress(jobId: string, message: string, progress: number): void {
    this.emit('progress', { jobId, message, progress, timestamp: new Date() });
  }

  /** Report successful completion with the final result. */
  recordCompletion(jobId: string, result: unknown): void {
    this.emit('completed', { jobId, result, timestamp: new Date() });
  }

  /**
   * Report a failure. NOTE: Node's EventEmitter treats 'error' specially —
   * emitting it with no registered listener throws; attach a listener
   * (as SSEServer's constructor does) before calling this.
   */
  recordError(jobId: string, error: string): void {
    this.emit('error', { jobId, error, timestamp: new Date() });
  }
}

/**
 * Fan-out server for Server-Sent Events: each client subscribes to one job
 * ID and receives only that job's progress/completed/error events.
 */
class SSEServer {
  private emitter: ProgressEmitter;
  // clientId -> { res: HTTP response stream, jobId: subscription filter }
  private clients = new Map<string, { res: any; jobId: string }>();

  constructor() {
    this.emitter = new ProgressEmitter();

    this.emitter.on('progress', (data) => {
      this.broadcast('progress', data);
    });

    this.emitter.on('completed', (data) => {
      this.broadcast('completed', data);
    });

    this.emitter.on('error', (data) => {
      this.broadcast('error', data);
    });
  }

  /** Attach an HTTP response as an SSE stream subscribed to one job. */
  createSSEEndpoint(jobId: string, res: any): void {
    const clientId = `client_${Date.now()}_${Math.random()}`;

    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');

    this.clients.set(clientId, { res, jobId });

    // SSE comment line acknowledging the subscription.
    res.write(`: Connected to job ${jobId}\n\n`);

    res.on('close', () => {
      this.clients.delete(clientId);
    });
  }

  /**
   * Deliver an event to the subscribed clients.
   * Fix: the original wrote every event to EVERY connected client,
   * ignoring the jobId each client subscribed to — leaking one user's job
   * updates to all others. Events that carry a jobId now go only to that
   * job's subscribers; events without one still go to everyone.
   */
  private broadcast(eventType: string, data: unknown): void {
    const jobId = (data as { jobId?: string } | null)?.jobId;

    for (const [clientId, client] of this.clients) {
      if (jobId !== undefined && client.jobId !== jobId) continue;
      try {
        client.res.write(`event: ${eventType}\n`);
        client.res.write(`data: ${JSON.stringify(data)}\n\n`);
      } catch (error) {
        // Client disconnected — drop it so we stop writing to a dead stream.
        this.clients.delete(clientId);
      }
    }
  }

  /** Expose the emitter so job workers can publish lifecycle events. */
  getEmitter(): ProgressEmitter {
    return this.emitter;
  }
}

// Example: drive the SSE pipeline with a simulated job lifecycle.
const sseServer = new SSEServer();
const emitter = sseServer.getEmitter();

// Simulate job progress
emitter.recordProgress('job_123', 'Starting processing...', 0);
await new Promise((resolve) => setTimeout(resolve, 1000));
emitter.recordProgress('job_123', 'Sending to LLM...', 30);
await new Promise((resolve) => setTimeout(resolve, 2000));
emitter.recordProgress('job_123', 'Processing response...', 80);
await new Promise((resolve) => setTimeout(resolve, 500));
emitter.recordCompletion('job_123', { result: 'Done' });

Storing and Versioning AI Outputs

Persist job outputs with versioning and audit trails.

// One immutable, versioned record of a job's input/output pair.
interface StoredOutput {
  jobId: string;
  version: number; // 1-based; incremented per store() call for the same jobId
  input: string;
  output: string;
  model: string;
  tokens: { input: number; output: number }; // estimated at ~4 chars/token
  userId: string;
  createdAt: Date;
  expiresAt: Date; // createdAt + retention window; pruneExpired() removes past this
  metadata?: Record<string, unknown>;
}

/**
 * In-memory versioned store of job outputs with a fixed retention window.
 */
class OutputStorage {
  private outputs = new Map<string, StoredOutput[]>();
  private readonly defaultRetentionDays = 90;

  /** Append a new version of the output for `jobId` (versions are 1-based). */
  store(jobId: string, input: string, output: string, model: string, userId: string): void {
    const existing = this.outputs.get(jobId) || [];
    const stored: StoredOutput = {
      jobId,
      // Set the real version once, instead of the original's placeholder
      // `version: 1` that was immediately overwritten.
      version: existing.length + 1,
      input,
      output,
      model,
      // Rough token estimate: ~4 characters per token.
      tokens: { input: Math.ceil(input.length / 4), output: Math.ceil(output.length / 4) },
      userId,
      createdAt: new Date(),
      expiresAt: new Date(Date.now() + this.defaultRetentionDays * 86400000),
    };

    existing.push(stored);
    this.outputs.set(jobId, existing);
  }

  /** Most recent version for a job, or undefined if none stored. */
  getLatest(jobId: string): StoredOutput | undefined {
    const all = this.outputs.get(jobId);
    return all?.[all.length - 1];
  }

  /** Specific version for a job, or undefined if it doesn't exist. */
  getVersion(jobId: string, version: number): StoredOutput | undefined {
    const all = this.outputs.get(jobId);
    return all?.find((o) => o.version === version);
  }

  /** All stored versions for a job, oldest first (empty if none). */
  getHistory(jobId: string): StoredOutput[] {
    return this.outputs.get(jobId) || [];
  }

  /**
   * Drop outputs past their expiresAt; returns how many outputs were
   * removed. Fix: the original counted only job IDs whose entire history
   * expired, silently undercounting partial prunes.
   */
  pruneExpired(): number {
    let removed = 0;
    const now = new Date(); // single cutoff for the whole sweep

    for (const [jobId, outputs] of this.outputs.entries()) {
      const kept = outputs.filter((o) => o.expiresAt > now);
      removed += outputs.length - kept.length;

      if (kept.length === 0) {
        this.outputs.delete(jobId);
      } else if (kept.length !== outputs.length) {
        this.outputs.set(jobId, kept);
      }
    }

    return removed;
  }
}

// Example: store one output and read it back by latest version / history.
const storage = new OutputStorage();
storage.store('job_123', 'Summarize this...', 'Summary result', 'gpt-4', 'user1');

const latest = storage.getLatest('job_123');
console.log('Latest output:', latest?.output);

const history = storage.getHistory('job_123');
console.log('Output history:', history.length, 'versions');

Checklist

  • Submit LLM requests to background queues immediately
  • Use BullMQ or similar for reliable job processing
  • Implement idempotency using input hashing
  • Support both webhook callbacks and polling
  • Handle 429 rate limits with exponential backoff
  • Stream progress updates via Server-Sent Events
  • Persist all outputs with retention policies
  • Version outputs for audit trails
  • Retry failed jobs up to 3 times
  • Monitor queue depth and job latency
  • Prune expired outputs regularly
  • Log all job state transitions for debugging

Conclusion

Async job processing is essential for LLM workloads. Use BullMQ with exponential backoff for reliable processing, idempotency hashing to prevent duplicates, webhooks or SSE for status updates, and persistent storage with versioning. This layered approach provides reliability, observability, and scalability for background AI tasks at any scale.