The Transactional Outbox Pattern — Guaranteed Message Delivery Without 2PC
By Sanjeev Sharma (@webcoderspeed1)

Introduction
The dual-write problem: you write to your database and publish to Kafka. The database succeeds, Kafka fails, and you lose the event. Or the reverse: Kafka succeeds, the database crashes before it finishes, and other services see an event for a transaction that never happened. The transactional outbox pattern solves this: write to a single database atomically, then reliably publish from there. No two-phase commit needed.
- The Dual-Write Problem and Outbox Solution
- Polling Publisher
- Change Data Capture with Debezium
- At-Least-Once Delivery and Idempotent Consumers
- Inbox Pattern for Deduplication
- Testing Outbox with Testcontainers and Kafka
- Checklist
- Conclusion
The Dual-Write Problem and Outbox Solution
```typescript
// BAD: Dual write - not atomic
async createOrder(req: CreateOrderRequest): Promise<void> {
  // If one fails, they're inconsistent
  const order = await this.db.query(
    `INSERT INTO orders (customer_id, total, status) VALUES ($1, $2, 'pending') RETURNING id`,
    [req.customerId, req.totalAmount]
  );
  // Kafka might fail; order exists but event wasn't published
  await this.kafka.publish('orders.created', { orderId: order.rows[0].id });
}
```
```typescript
// GOOD: Write to outbox + database atomically.
// Note: BEGIN/COMMIT must run on a single connection. With node-postgres,
// check out one client for the transaction rather than querying the pool.
async createOrder(req: CreateOrderRequest): Promise<void> {
  await this.db.query('BEGIN');
  try {
    const order = await this.db.query(
      `INSERT INTO orders (customer_id, total, status) VALUES ($1, $2, 'pending') RETURNING id`,
      [req.customerId, req.totalAmount]
    );
    const orderId = order.rows[0].id;
    // Write event to outbox in same transaction
    await this.db.query(
      `INSERT INTO outbox (aggregate_id, aggregate_type, event_type, payload, created_at)
       VALUES ($1, $2, $3, $4, NOW())`,
      [
        orderId,
        'Order',
        'OrderCreated',
        JSON.stringify({
          orderId,
          customerId: req.customerId,
          totalAmount: req.totalAmount,
        }),
      ]
    );
    await this.db.query('COMMIT');
  } catch (error) {
    await this.db.query('ROLLBACK');
    throw error;
  }
}
```
Outbox table schema:
```typescript
async initializeOutbox(): Promise<void> {
  await this.db.query(`
    CREATE TABLE IF NOT EXISTS outbox (
      id BIGSERIAL PRIMARY KEY,
      aggregate_id VARCHAR(255) NOT NULL,
      aggregate_type VARCHAR(50) NOT NULL,
      event_type VARCHAR(100) NOT NULL,
      payload JSONB NOT NULL,
      created_at TIMESTAMP DEFAULT NOW(),
      published_at TIMESTAMP,
      published BOOLEAN DEFAULT FALSE
    );
    -- Inline INDEX is MySQL syntax; PostgreSQL needs a separate statement
    CREATE INDEX IF NOT EXISTS idx_outbox_published ON outbox (published, created_at);
  `);
}
```
Polling Publisher
A separate process polls the outbox table and publishes unpublished events.
```typescript
class PollingPublisher {
  private batchSize = 100;
  private pollInterval = 1000; // 1 second
  private polling = false;

  async start(): Promise<void> {
    setInterval(() => this.poll(), this.pollInterval);
  }

  async poll(): Promise<void> {
    // Guard against overlapping polls when a batch takes longer than the interval
    if (this.polling) return;
    this.polling = true;
    try {
      const unpublished = await this.db.query(
        `SELECT id, aggregate_id, aggregate_type, event_type, payload
         FROM outbox
         WHERE published = FALSE
         ORDER BY created_at ASC
         LIMIT $1`,
        [this.batchSize]
      );
      for (const event of unpublished.rows) {
        try {
          // Publish to Kafka
          await this.kafka.send({
            topic: this.getTopicName(event.aggregate_type),
            messages: [
              {
                key: event.aggregate_id,
                value: JSON.stringify({
                  aggregateId: event.aggregate_id,
                  eventType: event.event_type,
                  payload: event.payload,
                  timestamp: Date.now(),
                }),
              },
            ],
          });
          // Mark as published
          await this.db.query(
            `UPDATE outbox SET published = TRUE, published_at = NOW() WHERE id = $1`,
            [event.id]
          );
        } catch (error) {
          console.error(`Failed to publish event ${event.id}:`, error);
          // Don't mark as published; will retry next poll
        }
      }
    } finally {
      this.polling = false;
    }
  }

  private getTopicName(aggregateType: string): string {
    return `${aggregateType.toLowerCase()}.events`;
  }

  // Cleanup old published events (optional, prevents unbounded growth)
  async cleanup(): Promise<void> {
    const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
    const result = await this.db.query(
      `DELETE FROM outbox WHERE published = TRUE AND published_at < $1`,
      [thirtyDaysAgo]
    );
    console.log(`Cleaned up ${result.rowCount} old outbox events`);
  }
}
```
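One caveat the publisher above glosses over: if you run more than one publisher instance against the same outbox table, the plain SELECT lets two pollers pick up and publish the same rows. A common fix on PostgreSQL is to claim batches with FOR UPDATE SKIP LOCKED inside a transaction. A minimal sketch; the `Queryable` and `OutboxRow` types are illustrative stand-ins, not from the code above:

```typescript
// Illustrative row and connection shapes (assumptions, not the article's types)
interface OutboxRow {
  id: number;
  aggregate_id: string;
  aggregate_type: string;
  event_type: string;
  payload: unknown;
}

interface Queryable {
  query(sql: string, params?: unknown[]): Promise<{ rows: OutboxRow[] }>;
}

// FOR UPDATE SKIP LOCKED makes concurrent pollers claim disjoint rows:
// rows already locked by another transaction are skipped, not waited on.
const CLAIM_BATCH_SQL = `
  SELECT id, aggregate_id, aggregate_type, event_type, payload
  FROM outbox
  WHERE published = FALSE
  ORDER BY created_at ASC
  LIMIT $1
  FOR UPDATE SKIP LOCKED`;

// Must run inside a transaction: the row locks are held until the batch is
// published and marked, and COMMIT releases them for the next poller.
async function claimBatch(tx: Queryable, batchSize: number): Promise<OutboxRow[]> {
  const result = await tx.query(CLAIM_BATCH_SQL, [batchSize]);
  return result.rows;
}
```

The trade-off is that crashed publishers hold their locks only until the transaction aborts, so no row is ever stuck; ordering across instances is no longer strictly FIFO, which is usually acceptable since per-aggregate ordering comes from the Kafka message key.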
Change Data Capture with Debezium
Debezium tails the database write-ahead log and publishes changes automatically, with lower latency and less database load than polling.
```typescript
// Debezium setup (via Docker/Kubernetes manifests typically)
interface DebeziumConnectorConfig {
  name: string;
  config: {
    'connector.class': string;
    'database.hostname': string;
    'database.port': number;
    'database.user': string;
    'database.password': string;
    'database.dbname': string;
    'database.server.name': string;
    'table.include.list': string;
    'plugin.name': string;
    'publication.name': string;
    'slot.name': string;
  };
}

// Example PostgreSQL CDC setup with pgoutput (built into PostgreSQL 10+;
// wal2json is deprecated and removed in recent Debezium releases)
const pgConfig: DebeziumConnectorConfig = {
  name: 'postgres-outbox-connector',
  config: {
    'connector.class': 'io.debezium.connector.postgresql.PostgresConnector',
    'database.hostname': 'postgres',
    'database.port': 5432,
    'database.user': 'postgres',
    'database.password': 'password',
    'database.dbname': 'myapp',
    'database.server.name': 'myapp',
    'table.include.list': 'public.outbox',
    'plugin.name': 'pgoutput',
    'publication.name': 'dbz_publication',
    'slot.name': 'dbz_slot',
  },
};
```
```typescript
// Consumer receives CDC events and publishes to application topic
class CDCEventRouter {
  async handleDebeziumEvent(cdcEvent: DebeziumRecord): Promise<void> {
    if (cdcEvent.op === 'i' || cdcEvent.op === 'u') {
      // Insert or update
      const outboxRecord = cdcEvent.after;
      if (!outboxRecord.published) {
        // Publish the business event
        await this.kafka.send({
          topic: `${outboxRecord.aggregate_type.toLowerCase()}.events`,
          messages: [
            {
              key: outboxRecord.aggregate_id,
              value: JSON.stringify({
                aggregateId: outboxRecord.aggregate_id,
                eventType: outboxRecord.event_type,
                payload: outboxRecord.payload,
                // Debezium serializes timestamps as epoch values or strings,
                // not Date objects, so normalize before use
                timestamp: new Date(outboxRecord.created_at).getTime(),
                sourceId: cdcEvent.source.lsn, // Log position for deduplication
              }),
            },
          ],
        });
        // Mark published in database (Debezium captures this update too,
        // but the `published` check above filters it out)
        await this.db.query(
          `UPDATE outbox SET published = TRUE WHERE id = $1`,
          [outboxRecord.id]
        );
      }
    }
  }
}

interface DebeziumRecord {
  op: 'i' | 'u' | 'd' | 't'; // insert, update, delete, truncate
  before: any;
  after: any;
  source: {
    lsn: number;
    txId: number;
  };
}
```
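Instead of a hand-rolled router, Debezium also ships a built-in outbox EventRouter single message transform (SMT) that reads outbox rows and routes them to per-aggregate topics inside Kafka Connect. A hedged sketch of the transform properties, mapped to the snake_case column names in this article's schema; the property names follow Debezium's documented defaults, but verify them against your Debezium version:

```typescript
// Sketch: Debezium outbox EventRouter SMT properties (assumption: property
// names match current Debezium docs; column names match our outbox schema).
// These keys are merged into the connector `config` block shown above.
const outboxRouterConfig: Record<string, string> = {
  'transforms': 'outbox',
  'transforms.outbox.type': 'io.debezium.transforms.outbox.EventRouter',
  // Which outbox columns carry the event id, Kafka key, and payload
  'transforms.outbox.table.field.event.id': 'id',
  'transforms.outbox.table.field.event.key': 'aggregate_id',
  'transforms.outbox.table.field.event.payload': 'payload',
  // Route to a topic per aggregate type (e.g. rows with aggregate_type
  // 'Order' land on outbox.event.Order by default)
  'transforms.outbox.route.by.field': 'aggregate_type',
};
```

With the SMT in place there is no `published` column to maintain at all: many teams simply DELETE the outbox row in the same transaction that inserts it, since Debezium has already captured the insert from the WAL.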
At-Least-Once Delivery and Idempotent Consumers
The outbox pattern provides at-least-once delivery: events are published at least once, possibly multiple times. Consumers must be idempotent.
```typescript
class OrderEventConsumer {
  async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
    const eventId = `order-created:${event.aggregateId}`;
    // Check if we've processed this event before.
    // Note: check-then-insert is racy across concurrent consumers; a unique
    // constraint on processed_events.event_id provides the real guarantee.
    const processed = await this.db.query(
      `SELECT id FROM processed_events WHERE event_id = $1`,
      [eventId]
    );
    if (processed.rows.length > 0) {
      return; // Already processed
    }
    try {
      // Process the event
      await this.db.query(
        `INSERT INTO order_notifications (order_id, type, sent_at)
         VALUES ($1, 'created', NOW())`,
        [event.aggregateId]
      );
      // Record that we've processed this event
      await this.db.query(
        `INSERT INTO processed_events (event_id, processed_at)
         VALUES ($1, NOW())`,
        [eventId]
      );
    } catch (error) {
      console.error(`Failed to process event ${eventId}:`, error);
      // Don't mark as processed; will retry
      throw error;
    }
  }
}
```
```typescript
class InventoryEventConsumer {
  async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
    // Upsert: only reserve if not already reserved
    const result = await this.db.query(
      `INSERT INTO reserved_inventory (order_id, items, reserved_at)
       VALUES ($1, $2, NOW())
       ON CONFLICT (order_id) DO NOTHING
       RETURNING id`,
      [event.aggregateId, JSON.stringify(event.payload.items)]
    );
    if (result.rows.length === 0) {
      return; // Already reserved by a previous delivery of this event
    }
    // Process reservation...
  }
}
```
Inbox Pattern for Deduplication
Paired with the outbox pattern, an inbox in the consumer prevents processing duplicates.
```typescript
class InboxPattern {
  async processIncomingEvent(event: any): Promise<void> {
    // Requires a unique constraint on inbox (source_id, event_id)
    // for the ON CONFLICT clause below to work
    const conn = await this.pool.connect();
    await conn.query('BEGIN');
    try {
      // Write to inbox first
      const result = await conn.query(
        `INSERT INTO inbox (source_id, event_id, event_type, payload, received_at)
         VALUES ($1, $2, $3, $4, NOW())
         ON CONFLICT (source_id, event_id) DO NOTHING
         RETURNING id`,
        [event.source, event.eventId, event.type, JSON.stringify(event)]
      );
      if (result.rows.length === 0) {
        // Already in inbox; was it processed?
        const processed = await conn.query(
          `SELECT processed_at FROM inbox
           WHERE source_id = $1 AND event_id = $2`,
          [event.source, event.eventId]
        );
        if (processed.rows[0].processed_at) {
          // Close the transaction before returning so the connection
          // goes back to the pool in a clean state
          await conn.query('COMMIT');
          return; // Already fully processed
        }
        // Else: in inbox but not processed yet; continue below
      }
      // Process the event (this must be idempotent)
      await this.handleEvent(conn, event);
      // Mark as processed
      await conn.query(
        `UPDATE inbox SET processed_at = NOW()
         WHERE source_id = $1 AND event_id = $2`,
        [event.source, event.eventId]
      );
      await conn.query('COMMIT');
    } catch (error) {
      await conn.query('ROLLBACK');
      throw error;
    } finally {
      conn.release();
    }
  }

  private async handleEvent(conn: any, event: any): Promise<void> {
    // Process event with provided connection (same transaction)
    if (event.type === 'OrderCreated') {
      await conn.query(
        `INSERT INTO customer_orders (customer_id, order_id)
         VALUES ($1, $2)
         ON CONFLICT DO NOTHING`,
        [event.customerId, event.orderId]
      );
    }
  }
}
```
Testing Outbox with Testcontainers and Kafka
```typescript
describe('OutboxPattern', () => {
  let kafkaContainer: StartedTestContainer;
  let postgresContainer: StartedPostgreSqlContainer;
  let pool: Pool; // shared with the tests below, so declared at suite scope
  let publisher: PollingPublisher;

  beforeAll(async () => {
    // Simplified single-container Kafka setup; a real suite needs either a
    // linked ZooKeeper container or a KRaft-mode image
    kafkaContainer = await new GenericContainer('confluentinc/cp-kafka:7.5.0')
      .withEnvironment({
        KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181',
        KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092',
      })
      .withExposedPorts(9092)
      .start();
    postgresContainer = await new PostgreSqlContainer()
      .withDatabase('testdb')
      .withUsername('test')
      .withPassword('test')
      .start();
    pool = new Pool({
      host: postgresContainer.getHost(),
      port: postgresContainer.getPort(),
      database: 'testdb',
      user: 'test',
      password: 'test',
    });
    publisher = new PollingPublisher(pool, kafkaContainer.getHost(), kafkaContainer.getMappedPort(9092));
  });

  it('should publish events from outbox to Kafka', async () => {
    const consumer = new KafkaConsumer(kafkaContainer.getHost(), kafkaContainer.getMappedPort(9092));
    await consumer.subscribe('order.events');
    // Insert event to outbox
    await pool.query(
      `INSERT INTO outbox (aggregate_id, aggregate_type, event_type, payload, published)
       VALUES ($1, $2, $3, $4, FALSE)`,
      ['order-123', 'Order', 'OrderCreated', JSON.stringify({ amount: 100 })]
    );
    // Poll should publish it
    await publisher.poll();
    // Verify message on Kafka
    const messages = await consumer.readMessages({ maxAttempts: 10 });
    expect(messages).toHaveLength(1);
    expect(messages[0].value).toContain('OrderCreated');
  });

  it('should retry publishing on failure', async () => {
    // FailingPollingPublisher is a test double that fails its first N attempts
    const failingPublisher = new FailingPollingPublisher();
    failingPublisher.failNextAttempts(2);
    await pool.query(
      `INSERT INTO outbox (aggregate_id, aggregate_type, event_type, payload, published)
       VALUES ($1, $2, $3, $4, FALSE)`,
      ['order-456', 'Order', 'OrderCreated', JSON.stringify({ amount: 200 })]
    );
    // First two polls fail; third succeeds
    await failingPublisher.poll(); // Fails, event stays unpublished
    await failingPublisher.poll(); // Fails, event stays unpublished
    await failingPublisher.poll(); // Succeeds
    const unpublished = await pool.query(`SELECT id FROM outbox WHERE published = FALSE`);
    expect(unpublished.rows).toHaveLength(0);
  });

  afterAll(async () => {
    await kafkaContainer.stop();
    await postgresContainer.stop();
  });
});
```
Checklist
- Design outbox table with publish status and timestamp indices
- Write to outbox atomically with your business transaction
- Implement polling publisher or set up Debezium CDC
- Make all consumers idempotent (check for event ID before processing)
- Use inbox pattern in critical consumers to prevent duplicate processing
- Test failure scenarios: Kafka down, polling stuck, CDC lag
- Monitor outbox table size and publish lag
- Implement cleanup policy for old published events
- Use correlation/trace IDs to follow events through system
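The "monitor outbox table size and publish lag" item can be backed by a single query against the schema above. A sketch; the thresholds and the `LagRow` shape are illustrative assumptions, not recommendations:

```typescript
// Backlog + publish-lag check for the outbox schema above.
// One row: how many events are unpublished, and how old the oldest one is.
const OUTBOX_LAG_SQL = `
  SELECT COUNT(*) AS backlog,
         EXTRACT(EPOCH FROM (NOW() - MIN(created_at))) AS oldest_unpublished_seconds
  FROM outbox
  WHERE published = FALSE`;

// node-postgres returns COUNT/EXTRACT as strings; MIN is NULL on an empty set
interface LagRow {
  backlog: string;
  oldest_unpublished_seconds: string | null;
}

// Alert when the backlog or the age of the oldest unpublished event exceeds
// a threshold (illustrative defaults; tune to your publish rate and SLO)
function isLagging(row: LagRow, maxBacklog = 1000, maxLagSeconds = 60): boolean {
  const backlog = Number(row.backlog);
  const lag = row.oldest_unpublished_seconds === null ? 0 : Number(row.oldest_unpublished_seconds);
  return backlog > maxBacklog || lag > maxLagSeconds;
}
```

Run it on a timer alongside the polling publisher and export the two numbers to whatever metrics system you already use; a growing `oldest_unpublished_seconds` with a flat backlog usually means one stuck event rather than general overload.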
Conclusion
The transactional outbox pattern eliminates the dual-write problem by leveraging the atomicity of a single database transaction. Pair it with idempotent consumers and the inbox pattern for end-to-end reliability. Polling is simple but adds latency; CDC is faster but adds operational complexity. Start with polling and migrate to CDC when throughput or latency demands it.