Published on

The Transactional Outbox Pattern — Guaranteed Message Delivery Without 2PC

Authors

Introduction

The dual-write problem: you write to your database and publish to Kafka. The database succeeds, Kafka fails, and you lose the event. Or the reverse: Kafka succeeds, the database crashes before it finishes, and other services see an event for a transaction that never happened. The transactional outbox pattern solves this: write to a single database atomically, then reliably publish from there. No two-phase commit needed.

The Dual-Write Problem and Outbox Solution

// BAD: Dual write - not atomic. Two independent systems are written without a
// shared transaction, so there are two distinct failure windows:
async createOrder(req: CreateOrderRequest): Promise<void> {
  // If one fails, they're inconsistent
  const order = await this.db.query(
    `INSERT INTO orders (customer_id, total, status) VALUES ($1, $2, 'pending') RETURNING id`,
    [req.customerId, req.totalAmount]
  );

  // Kafka might fail; order exists but event wasn't published.
  // (The reverse ordering is just as broken: publish first, then a DB crash
  // leaves an event for an order that never existed.)
  await this.kafka.publish('orders.created', { orderId: order.rows[0].id });
}

// GOOD: Write to outbox + database atomically. Both INSERTs are inside one
// database transaction, so either both commit or neither does — the event row
// can never exist without the order, nor the order without the event row.
//
// NOTE(review): 'BEGIN'/'COMMIT' via query() only form a transaction if
// this.db pins a single connection (e.g. a pg Client or a checked-out
// PoolClient). If this.db is a pg Pool, each query() may run on a different
// connection and the transaction is silently ineffective — confirm.
async createOrder(req: CreateOrderRequest): Promise<void> {
  await this.db.query('BEGIN');

  try {
    const order = await this.db.query(
      `INSERT INTO orders (customer_id, total, status) VALUES ($1, $2, 'pending') RETURNING id`,
      [req.customerId, req.totalAmount]
    );

    const orderId = order.rows[0].id;

    // Write event to outbox in same transaction; the relay (poller or CDC)
    // will publish it to Kafka after commit.
    await this.db.query(
      `INSERT INTO outbox (aggregate_id, aggregate_type, event_type, payload, created_at)
       VALUES ($1, $2, $3, $4, NOW())`,
      [
        orderId,
        'Order',
        'OrderCreated',
        JSON.stringify({
          orderId,
          customerId: req.customerId,
          totalAmount: req.totalAmount,
        }),
      ]
    );

    await this.db.query('COMMIT');
  } catch (error) {
    // Roll back both writes so no half-state is visible, then surface the error.
    await this.db.query('ROLLBACK');
    throw error;
  }
}

Outbox table schema:

/**
 * Creates the outbox table and its publish-status index (idempotent).
 *
 * The index on (published, created_at) serves the poller's
 * `WHERE published = FALSE ORDER BY created_at` scan.
 *
 * Fix: the original embedded `INDEX idx_outbox_published (...)` inside
 * CREATE TABLE, which is MySQL syntax — PostgreSQL (clearly the target here:
 * $n placeholders, BIGSERIAL, JSONB) rejects it. Indexes must be created
 * with a separate CREATE INDEX statement.
 */
async initializeOutbox(): Promise<void> {
  await this.db.query(`
    CREATE TABLE IF NOT EXISTS outbox (
      id BIGSERIAL PRIMARY KEY,
      aggregate_id VARCHAR(255) NOT NULL,
      aggregate_type VARCHAR(50) NOT NULL,
      event_type VARCHAR(100) NOT NULL,
      payload JSONB NOT NULL,
      created_at TIMESTAMP DEFAULT NOW(),
      published_at TIMESTAMP,
      published BOOLEAN DEFAULT FALSE
    );
  `);

  await this.db.query(`
    CREATE INDEX IF NOT EXISTS idx_outbox_published
      ON outbox (published, created_at);
  `);
}

Polling Publisher

A separate process polls the outbox table and publishes unpublished events.

/**
 * Relays unpublished outbox rows to Kafka on a fixed interval.
 *
 * Fixes over the original:
 *  - setInterval(() => this.poll()) could start a new poll while the previous
 *    one was still awaiting Kafka, publishing the same batch concurrently
 *    (duplicates within one process). A re-entrance guard makes overlapping
 *    ticks no-ops.
 *  - A rejection from the SELECT itself previously escaped as an unhandled
 *    promise rejection; it is now caught and logged at the tick boundary.
 *  - poll() is now public (backward-compatible widening) so tests and manual
 *    drains can trigger a single pass, as the test suite in this file does.
 *
 * NOTE(review): with multiple publisher processes this still double-publishes;
 * that is acceptable under at-least-once semantics, but `SELECT ... FOR UPDATE
 * SKIP LOCKED` inside a transaction would reduce it — confirm deployment shape.
 */
class PollingPublisher {
  private batchSize = 100;
  private pollInterval = 1000; // 1 second
  private polling = false; // true while a poll pass is in flight

  /** Starts the background loop. Each tick skips itself if a pass is still running. */
  async start(): Promise<void> {
    setInterval(() => void this.tick(), this.pollInterval);
  }

  /** One guarded tick: runs poll() unless the previous pass hasn't finished. */
  private async tick(): Promise<void> {
    if (this.polling) return;
    this.polling = true;
    try {
      await this.poll();
    } catch (error) {
      // Batch-level failure (e.g. the SELECT itself); next tick retries.
      console.error('Outbox poll failed:', error);
    } finally {
      this.polling = false;
    }
  }

  /** Publishes one batch of unpublished events, oldest first. */
  async poll(): Promise<void> {
    const unpublished = await this.db.query(
      `SELECT id, aggregate_id, aggregate_type, event_type, payload
       FROM outbox
       WHERE published = FALSE
       ORDER BY created_at ASC
       LIMIT $1`,
      [this.batchSize]
    );

    for (const event of unpublished.rows) {
      try {
        // Publish to Kafka; the aggregate id as key preserves per-aggregate ordering.
        await this.kafka.send({
          topic: this.getTopicName(event.aggregate_type),
          messages: [
            {
              key: event.aggregate_id,
              value: JSON.stringify({
                aggregateId: event.aggregate_id,
                eventType: event.event_type,
                payload: event.payload,
                timestamp: Date.now(),
              }),
            },
          ],
        });

        // Mark as published only after the send succeeds. A crash between
        // send and this UPDATE causes a re-send — hence at-least-once.
        await this.db.query(
          `UPDATE outbox SET published = TRUE, published_at = NOW() WHERE id = $1`,
          [event.id]
        );
      } catch (error) {
        console.error(`Failed to publish event ${event.id}:`, error);
        // Don't mark as published; will retry next poll
      }
    }
  }

  /** Maps an aggregate type to its Kafka topic, e.g. 'Order' -> 'order.events'. */
  private getTopicName(aggregateType: string): string {
    return `${aggregateType.toLowerCase()}.events`;
  }

  /**
   * Deletes published events older than 30 days (optional, prevents
   * unbounded table growth). Run from a scheduler, not the poll loop.
   */
  async cleanup(): Promise<void> {
    const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
    const result = await this.db.query(
      `DELETE FROM outbox WHERE published = TRUE AND published_at < $1`,
      [thirtyDaysAgo]
    );
    console.log(`Cleaned up ${result.rowCount} old outbox events`);
  }
}

Change Data Capture with Debezium

Debezium watches the database write-ahead log and publishes changes automatically, which scales better and delivers with lower latency than polling.

// Debezium setup (via Docker/Kubernetes manifests typically)
// Shape of a Debezium connector registration payload
// (POSTed to the Kafka Connect REST API; usually managed via Docker/Kubernetes manifests).
interface DebeziumConnectorConfig {
  name: string; // unique connector name within the Connect cluster
  config: {
    'connector.class': string;       // fully-qualified connector implementation class
    'database.hostname': string;
    'database.port': number;
    'database.user': string;
    'database.password': string;
    'database.dbname': string;
    'database.server.name': string;  // logical name; becomes the topic prefix
    'table.include.list': string;    // comma-separated schema.table list to capture
    'plugin.name': string;           // Postgres logical decoding plugin (e.g. pgoutput, wal2json)
    'publication.name': string;      // Postgres publication used for logical replication
    'slot.name': string;             // replication slot; must be unique per connector
  };
}

// Example PostgreSQL CDC setup with wal2json
// Example PostgreSQL CDC setup with wal2json.
// NOTE(review): wal2json is a legacy decoding plugin; recent Debezium versions
// default to 'pgoutput' (built into Postgres 10+) — confirm against the
// Debezium version in use. Likewise 'database.server.name' was replaced by
// 'topic.prefix' in Debezium 2.x.
// NOTE(review): credentials are hard-coded for the example only — use Connect
// secrets/externalized config in any real deployment.
const pgConfig: DebeziumConnectorConfig = {
  name: 'postgres-outbox-connector',
  config: {
    'connector.class': 'io.debezium.connector.postgresql.PostgresConnector',
    'database.hostname': 'postgres',
    'database.port': 5432,
    'database.user': 'postgres',
    'database.password': 'password',
    'database.dbname': 'myapp',
    'database.server.name': 'myapp',
    'table.include.list': 'public.outbox', // only the outbox table is captured
    'plugin.name': 'wal2json',
    'publication.name': 'dbz_publication',
    'slot.name': 'dbz_slot',
  },
};

// Consumer receives CDC events and publishes to application topic
// Consumes Debezium change events for the outbox table and republishes the
// contained business event to the application topic.
class CDCEventRouter {
  async handleDebeziumEvent(cdcEvent: DebeziumRecord): Promise<void> {
    // Deletes and truncates carry no routable row image — ignore them.
    const isRowImage = cdcEvent.op === 'i' || cdcEvent.op === 'u';
    if (!isRowImage) {
      return;
    }

    const row = cdcEvent.after;

    // Skip rows already routed. The published=TRUE UPDATE we issue below is
    // itself captured by Debezium and arrives here as an 'u' event; this
    // check is what stops that from looping.
    if (row.published) {
      return;
    }

    const envelope = JSON.stringify({
      aggregateId: row.aggregate_id,
      eventType: row.event_type,
      payload: row.payload,
      timestamp: row.created_at.getTime(),
      sourceId: cdcEvent.source.lsn, // Log position for deduplication
    });

    const topic = `${row.aggregate_type.toLowerCase()}.events`;
    await this.kafka.send({
      topic,
      messages: [{ key: row.aggregate_id, value: envelope }],
    });

    // Mark published in database (Debezium will capture this too)
    await this.db.query(
      `UPDATE outbox SET published = TRUE WHERE id = $1`,
      [row.id]
    );
  }
}

// Minimal view of a Debezium change record as consumed by CDCEventRouter.
interface DebeziumRecord {
  op: 'i' | 'u' | 'd' | 't'; // insert, update, delete, truncate
  before: any; // row image before the change (null for inserts)
  after: any;  // row image after the change (null for deletes)
  source: {
    lsn: number;  // Postgres WAL log sequence number of the change
    txId: number; // originating transaction id
  };
}

At-Least-Once Delivery and Idempotent Consumers

The outbox pattern provides at-least-once delivery: events are published at least once, possibly multiple times. Consumers must be idempotent.

/**
 * Idempotent consumer for OrderCreated events.
 *
 * Fixes over the original:
 *  - The SELECT-then-process check raced: two concurrent deliveries of the
 *    same event could both see "not processed" and both insert a
 *    notification. The idempotency gate is now an atomic
 *    `INSERT ... ON CONFLICT DO NOTHING RETURNING` on processed_events
 *    (matching the InventoryEventConsumer style below).
 *  - Processing and recording-as-processed now happen in one transaction, so
 *    a crash between them can no longer cause a duplicate notification on
 *    redelivery.
 *
 * NOTE(review): as with createOrder above, BEGIN/COMMIT via query() assumes
 * this.db pins a single connection — confirm it is not a pg Pool.
 */
class OrderEventConsumer {
  async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
    const eventId = `order-created:${event.aggregateId}`;

    await this.db.query('BEGIN');
    try {
      // Atomically claim the event; the row only comes back if WE inserted it.
      const claimed = await this.db.query(
        `INSERT INTO processed_events (event_id, processed_at)
         VALUES ($1, NOW())
         ON CONFLICT (event_id) DO NOTHING
         RETURNING event_id`,
        [eventId]
      );

      if (claimed.rows.length === 0) {
        // Already processed (or being processed) by a previous delivery.
        await this.db.query('ROLLBACK');
        return;
      }

      // Process the event — commits atomically with the claim above.
      await this.db.query(
        `INSERT INTO order_notifications (order_id, type, sent_at)
         VALUES ($1, 'created', NOW())`,
        [event.aggregateId]
      );

      await this.db.query('COMMIT');
    } catch (error) {
      await this.db.query('ROLLBACK');
      console.error(`Failed to process event ${eventId}:`, error);
      // Rethrow so the broker redelivers; the claim was rolled back too.
      throw error;
    }
  }
}

/**
 * Idempotent inventory reservation via "natural idempotency": the business
 * table itself (keyed by order_id) is the dedupe record, so no separate
 * processed_events table is needed.
 *
 * Fix: the original computed an `eventId` local that was never used — removed.
 */
class InventoryEventConsumer {
  async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
    // Upsert: only reserve if not already reserved. RETURNING yields a row
    // only when this call performed the insert.
    const result = await this.db.query(
      `INSERT INTO reserved_inventory (order_id, items, reserved_at)
       VALUES ($1, $2, NOW())
       ON CONFLICT (order_id) DO NOTHING
       RETURNING id`,
      [event.aggregateId, JSON.stringify(event.payload.items)]
    );

    if (result.rows.length === 0) {
      return; // Already reserved by a previous message
    }

    // Process reservation...
  }
}

Inbox Pattern for Deduplication

Paired with the outbox pattern, an inbox in the consumer prevents processing duplicates.

/**
 * Inbox-pattern consumer: records every incoming event in an inbox table and
 * processes it in the same transaction, so duplicates are detected and a
 * crash mid-processing leaves the event unprocessed (it will be redelivered).
 *
 * Fixes over the original:
 *  - The early return on "already fully processed" exited while the
 *    transaction opened by BEGIN was still in progress; the connection was
 *    then released back to the pool mid-transaction. The transaction is now
 *    closed (ROLLBACK — nothing was written) before returning.
 *  - `processed.rows[0]` is guarded with `?.`: under concurrent delivery the
 *    conflicting insert may not yet be visible to this transaction's SELECT.
 */
class InboxPattern {
  async processIncomingEvent(event: any): Promise<void> {
    // Start transaction on a single pinned connection from the pool.
    const conn = await this.pool.connect();
    await conn.query('BEGIN');

    try {
      // Write to inbox first; DO NOTHING makes redelivery detectable.
      const result = await conn.query(
        `INSERT INTO inbox (source_id, event_id, event_type, payload, received_at)
         VALUES ($1, $2, $3, $4, NOW())
         ON CONFLICT (source_id, event_id) DO NOTHING
         RETURNING id`,
        [event.source, event.eventId, event.type, JSON.stringify(event)]
      );

      if (result.rows.length === 0) {
        // Already in inbox; was this processed?
        const processed = await conn.query(
          `SELECT processed_at FROM inbox
           WHERE source_id = $1 AND event_id = $2`,
          [event.source, event.eventId]
        );

        if (processed.rows[0]?.processed_at) {
          // Already fully processed. Close the open transaction before
          // leaving — we wrote nothing, so ROLLBACK is the honest close.
          await conn.query('ROLLBACK');
          return;
        }
        // Else: in inbox but not processed yet (earlier attempt crashed);
        // continue below and finish the work.
      }

      // Process the event (this must be idempotent)
      await this.handleEvent(conn, event);

      // Mark as processed — commits atomically with the processing above.
      await conn.query(
        `UPDATE inbox SET processed_at = NOW()
         WHERE source_id = $1 AND event_id = $2`,
        [event.source, event.eventId]
      );

      await conn.query('COMMIT');
    } catch (error) {
      await conn.query('ROLLBACK');
      throw error;
    } finally {
      conn.release();
    }
  }

  // Process event with provided connection (same transaction), so the
  // business write and the inbox bookkeeping commit or roll back together.
  private async handleEvent(conn: any, event: any): Promise<void> {
    if (event.type === 'OrderCreated') {
      await conn.query(
        `INSERT INTO customer_orders (customer_id, order_id)
         VALUES ($1, $2)
         ON CONFLICT DO NOTHING`,
        [event.customerId, event.orderId]
      );
    }
  }
}

Testing Outbox with Testcontainers and Kafka

describe('OutboxPattern', () => {
  let kafkaContainer: StartedTestContainer;
  let postgresContainer: StartedTestContainer;
  let publisher: PollingPublisher;

  beforeAll(async () => {
    kafkaContainer = await new GenericContainer('confluentinc/cp-kafka:7.5.0')
      .withEnvironment({
        KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181',
        KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092',
      })
      .withExposedPorts(9092)
      .start();

    postgresContainer = await new PostgreSqlContainer()
      .withDatabase('testdb')
      .withUsername('test')
      .withPassword('test')
      .start();

    const pool = new Pool({
      host: postgresContainer.getHost(),
      port: postgresContainer.getPort(),
      database: 'testdb',
      user: 'test',
      password: 'test',
    });

    publisher = new PollingPublisher(pool, kafkaContainer.getHost(), kafkaContainer.getMappedPort(9092));
  });

  it('should publish events from outbox to Kafka', async () => {
    const consumer = new KafkaConsumer(kafkaContainer.getHost(), kafkaContainer.getMappedPort(9092));
    await consumer.subscribe('order.events');

    // Insert event to outbox
    await pool.query(
      `INSERT INTO outbox (aggregate_id, aggregate_type, event_type, payload, published)
       VALUES ($1, $2, $3, $4, FALSE)`,
      ['order-123', 'Order', 'OrderCreated', JSON.stringify({ amount: 100 })]
    );

    // Poll should publish it
    await publisher.poll();

    // Verify message on Kafka
    const messages = await consumer.readMessages({ maxAttempts: 10 });
    expect(messages).toHaveLength(1);
    expect(messages[0].value).toContain('OrderCreated');
  });

  it('should retry publishing on failure', async () => {
    const failingPublisher = new FailingPollingPublisher();
    failingPublisher.failNextAttempts(2);

    await pool.query(
      `INSERT INTO outbox (aggregate_id, aggregate_type, event_type, payload, published)
       VALUES ($1, $2, $3, $4, FALSE)`,
      ['order-456', 'Order', 'OrderCreated', JSON.stringify({ amount: 200 })]
    );

    // First two polls fail; third succeeds
    await failingPublisher.poll(); // Fails, event stays unpublished
    await failingPublisher.poll(); // Fails, event stays unpublished
    await failingPublisher.poll(); // Succeeds

    const unpublished = await pool.query(`SELECT id FROM outbox WHERE published = FALSE`);
    expect(unpublished.rows).toHaveLength(0);
  });

  afterAll(async () => {
    await kafkaContainer.stop();
    await postgresContainer.stop();
  });
});

Checklist

  • Design outbox table with publish status and timestamp indices
  • Write to outbox atomically with your business transaction
  • Implement polling publisher or set up Debezium CDC
  • Make all consumers idempotent (check for event ID before processing)
  • Use inbox pattern in critical consumers to prevent duplicate processing
  • Test failure scenarios: Kafka down, polling stuck, CDC lag
  • Monitor outbox table size and publish lag
  • Implement cleanup policy for old published events
  • Use correlation/trace IDs to follow events through system

Conclusion

The transactional outbox pattern eliminates the dual-write problem by leveraging the atomicity of a single database transaction. Pair it with idempotent consumers and an inbox pattern for end-to-end reliability. Polling is simple but adds latency; CDC is faster but adds operational complexity. Start with polling, and migrate to CDC if needed.