Kafka Consumer Patterns — At-Least-Once, Exactly-Once, and Everything in Between

By Sanjeev Sharma (@webcoderspeed1)
Introduction
Kafka is the backbone of event-driven systems, but consuming from Kafka reliably is non-trivial. Offsets determine where you resume after failure. Consumer groups coordinate work across multiple instances. Rebalancing pauses processing. You must choose between at-least-once (simple, requires idempotent consumers) and exactly-once (complex, lower throughput). We'll navigate these tradeoffs and build resilient consumers.
- Consumer Groups and Partition Assignment
- Manual Offset Commit for At-Least-Once
- Exactly-Once with Transactional Consumers
- Dead Letter Queue for Poison Messages
- Consumer Group Pause/Resume
- Consumer Lag Monitoring
- Batch Processing vs One-at-a-Time
- Testing Kafka Consumers
- Checklist
- Conclusion
Consumer Groups and Partition Assignment
Kafka partitions ensure ordering within a partition, and consumer groups ensure each partition is consumed by exactly one consumer.
```ts
import { Kafka, logLevel } from 'kafkajs';

class ConsumerGroupExample {
  private kafka = new Kafka({
    clientId: 'order-processor',
    brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
    logLevel: logLevel.ERROR,
    retry: {
      initialRetryTime: 100,
      retries: 8,
      maxRetryTime: 30000,
      randomizationFactor: 0.2,
      multiplier: 2,
    },
  });

  private consumer = this.kafka.consumer({
    groupId: 'order-processor-group',
    sessionTimeout: 30000,
    heartbeatInterval: 3000,
    rebalanceTimeout: 60000,
  });

  async startConsuming(): Promise<void> {
    await this.consumer.connect();

    // Subscribe to topic; Kafka assigns partitions automatically
    await this.consumer.subscribe({
      topic: 'orders',
      fromBeginning: false,
    });

    // Listen for rebalancing via KafkaJS instrumentation events
    this.consumer.on(this.consumer.events.REBALANCING, () => {
      console.log('Rebalancing started...');
      // Pause processing if needed
    });
    this.consumer.on(this.consumer.events.GROUP_JOIN, event => {
      console.log('Rebalancing finished');
      console.log('Assigned partitions:', event.payload.memberAssignment);
      // Resume processing
    });

    // Run message loop
    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        console.log(`Processing message on partition ${partition}`);
        await this.processMessage(message);
      },
    });
  }

  private async processMessage(message: any): Promise<void> {
    // Implementation
  }
}
```
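To build intuition for how a group splits partitions, here is a small, self-contained model of range-style assignment. This is an illustration of the idea, not KafkaJS's actual assigner; the function name and consumer IDs are made up for the example.

```ts
// Illustrative range-style assignment: sort consumers, then hand out
// contiguous chunks of partitions, giving earlier consumers one extra
// partition when the split is uneven.
function rangeAssign(partitions: number, consumers: string[]): Map<string, number[]> {
  const sorted = [...consumers].sort();
  const perConsumer = Math.floor(partitions / sorted.length);
  const extra = partitions % sorted.length;
  const assignment = new Map<string, number[]>();
  let next = 0;
  sorted.forEach((consumer, i) => {
    const count = perConsumer + (i < extra ? 1 : 0);
    assignment.set(consumer, Array.from({ length: count }, (_, j) => next + j));
    next += count;
  });
  return assignment;
}

// 6 partitions over 4 consumers: c1→[0,1], c2→[2,3], c3→[4], c4→[5]
const example = rangeAssign(6, ['c1', 'c2', 'c3', 'c4']);
console.log(example);
```

The key property to notice: every partition lands on exactly one consumer, so per-partition ordering survives scaling out, but adding more consumers than partitions leaves some idle.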
Manual Offset Commit for At-Least-Once
At-least-once: commit the offset AFTER processing succeeds. If processing fails before the commit, the message is redelivered after a restart or rebalance, which is why handlers must be idempotent.
```ts
class AtLeastOnceConsumer {
  private kafka = new Kafka({
    clientId: 'payment-processor',
    brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  });

  private consumer = this.kafka.consumer({
    groupId: 'payment-processor-group',
    allowAutoTopicCreation: false,
  });

  private db: any; // e.g. a pg Pool

  async start(): Promise<void> {
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'payments' });

    await this.consumer.run({
      // Disable auto-commit; we'll commit manually
      autoCommit: false,
      eachMessage: async ({ topic, partition, message }) => {
        try {
          // Process message
          const payment = JSON.parse(message.value!.toString());
          await this.processPayment(payment);

          // Only commit after successful processing.
          // Committed offsets point at the NEXT message to read, hence +1.
          await this.consumer.commitOffsets([
            {
              topic,
              partition,
              offset: (Number(message.offset) + 1).toString(),
            },
          ]);
          console.log(`Processed and committed offset: ${message.offset}`);
        } catch (error) {
          console.error(`Failed to process payment:`, error);
          // Don't commit; the message will be redelivered
          throw error;
        }
      },
    });
  }

  private async processPayment(payment: any): Promise<void> {
    // Must be idempotent: safe to call multiple times for the same payment
    const existing = await this.db.query(
      `SELECT id FROM payments WHERE idempotency_key = $1`,
      [payment.idempotencyKey]
    );
    if (existing.rows.length > 0) {
      return; // Already processed
    }
    await this.chargeCard(payment.customerId, payment.amount);
    await this.db.query(
      `INSERT INTO payments (idempotency_key, customer_id, amount, status)
       VALUES ($1, $2, $3, 'completed')`,
      [payment.idempotencyKey, payment.customerId, payment.amount]
    );
  }

  private async chargeCard(customerId: string, amount: number): Promise<void> {
    // Call payment provider
  }
}
```
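A subtle detail worth isolating: the committed offset is the position to resume FROM, i.e. the offset of the next message to read, which is why the handler commits `message.offset + 1`. A minimal sketch of that convention (the helper name is ours; `BigInt` avoids precision loss, since Kafka offsets are 64-bit):

```ts
// Kafka's committed offset names the NEXT message to consume.
// Committing the processed offset itself would re-deliver that message.
function nextOffsetToCommit(processedOffset: string): string {
  return (BigInt(processedOffset) + 1n).toString();
}

// After processing offset "41", commit "42"; a restarted consumer resumes at 42.
console.log(nextOffsetToCommit('41'));
```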
Exactly-Once with Transactional Consumers
Exactly-once semantics (EOS): pair an idempotent, transactional producer with consumers reading at READ_COMMITTED isolation (the KafkaJS default, `readUncommitted: false`). Complex and lower-throughput, but it rules out duplicates and losses within Kafka.
```ts
class ExactlyOnceConsumer {
  private kafka = new Kafka({
    clientId: 'ledger-processor',
    brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  });

  // readUncommitted: false (the default) means READ_COMMITTED isolation:
  // we only see messages from committed producer transactions
  private consumer = this.kafka.consumer({
    groupId: 'ledger-processor-group',
    readUncommitted: false,
  });

  private db: any; // e.g. a pg Pool

  async start(): Promise<void> {
    await this.consumer.connect();
    await this.consumer.subscribe({
      topic: 'transactions',
      fromBeginning: false,
    });

    await this.consumer.run({
      // Don't auto-resolve; a thrown error leaves the batch unresolved for retry
      eachBatchAutoResolve: false,
      eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
        const messages = batch.messages;
        const parsed = messages.map(msg => JSON.parse(msg.value!.toString()));
        try {
          // Process all messages in the batch
          for (const transaction of parsed) {
            await this.postTransaction(transaction);
          }
          // Mark the whole batch as processed
          resolveOffset(messages[messages.length - 1].offset);
          // Send heartbeat to prevent session timeout during long processing
          await heartbeat();
        } catch (error) {
          console.error('Failed to process batch:', error);
          // Unresolved offsets mean the batch will be retried
          throw error;
        }
      },
    });
  }

  private async postTransaction(transaction: any): Promise<void> {
    // Idempotent operation: use the transaction ID as a dedupe key
    const existing = await this.db.query(
      `SELECT 1 FROM ledger WHERE tx_id = $1`,
      [transaction.txId]
    );
    if (existing.rows.length > 0) {
      return; // Already posted
    }

    // Use a DB transaction for atomicity
    const client = await this.db.connect();
    try {
      await client.query('BEGIN ISOLATION LEVEL SERIALIZABLE');
      // Post to ledger
      await client.query(
        `INSERT INTO ledger (tx_id, account_id, amount, posted_at)
         VALUES ($1, $2, $3, NOW())`,
        [transaction.txId, transaction.accountId, transaction.amount]
      );
      // Update balance
      await client.query(
        `UPDATE accounts SET balance = balance + $1 WHERE id = $2`,
        [transaction.amount, transaction.accountId]
      );
      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }
}
```
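The consumer above gets effectively-once delivery from an idempotent database sink. Kafka's native EOS half lives on the producer: in a consume-transform-produce pipeline, a transactional producer can publish results and commit the consumer group's offsets inside one Kafka transaction. A sketch of that flow, with a structural type mirroring KafkaJS's `producer.transaction()` API so the example stands alone (topic and group names are illustrative):

```ts
// Assumes a KafkaJS producer created with:
//   kafka.producer({ transactionalId: 'ledger-tx', idempotent: true, maxInFlightRequests: 1 })
type TxProducer = {
  transaction(): Promise<{
    send(args: { topic: string; messages: { key: string; value: string }[] }): Promise<unknown>;
    sendOffsets(args: {
      consumerGroupId: string;
      topics: { topic: string; partitions: { partition: number; offset: string }[] }[];
    }): Promise<unknown>;
    commit(): Promise<unknown>;
    abort(): Promise<unknown>;
  }>;
};

async function transformExactlyOnce(
  producer: TxProducer,
  input: { topic: string; partition: number; offset: string },
  outputs: { key: string; value: string }[]
): Promise<void> {
  const tx = await producer.transaction();
  try {
    // Produce downstream results and commit the input offset in ONE Kafka transaction
    await tx.send({ topic: 'ledger-entries', messages: outputs });
    await tx.sendOffsets({
      consumerGroupId: 'ledger-processor-group',
      topics: [
        {
          topic: input.topic,
          partitions: [{ partition: input.partition, offset: (Number(input.offset) + 1).toString() }],
        },
      ],
    });
    await tx.commit();
  } catch (error) {
    // READ_COMMITTED consumers never observe the aborted sends
    await tx.abort();
    throw error;
  }
}
```

If the process crashes anywhere before `commit()`, the whole transaction aborts: the outputs vanish for READ_COMMITTED readers and the input offset stays uncommitted, so the batch is reprocessed.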
Dead Letter Queue for Poison Messages
When a message consistently fails to process, move it to a DLQ instead of blocking the consumer.
```ts
class ConsumerWithDLQ {
  private kafka = new Kafka({
    clientId: 'order-processor',
    brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  });
  private maxRetries = 3;

  async startConsuming(): Promise<void> {
    const consumer = this.kafka.consumer({ groupId: 'order-processor-group' });
    const producer = this.kafka.producer();
    await consumer.connect();
    await producer.connect();
    await consumer.subscribe({ topic: 'orders' });

    const commitNext = (topic: string, partition: number, offset: string) =>
      consumer.commitOffsets([
        { topic, partition, offset: (Number(offset) + 1).toString() },
      ]);

    await consumer.run({
      autoCommit: false,
      eachMessage: async ({ topic, partition, message }) => {
        const retryCount = this.getRetryCount(message);
        try {
          await this.processOrder(message);
          await commitNext(topic, partition, message.offset);
        } catch (error) {
          if (retryCount >= this.maxRetries) {
            // Move to DLQ
            console.error(
              `Message failed after ${this.maxRetries} retries, sending to DLQ:`,
              error
            );
            await producer.send({
              topic: 'orders-dlq',
              messages: [
                {
                  key: message.key,
                  value: message.value,
                  headers: {
                    'x-retry-count': retryCount.toString(),
                    'x-original-topic': topic,
                    'x-original-partition': partition.toString(),
                    'x-error-message': (error as Error).message,
                  },
                },
              ],
            });
          } else {
            // Re-publish to the same topic with an incremented retry count.
            // Note: re-publishing breaks per-key ordering for this message.
            await producer.send({
              topic,
              messages: [
                {
                  key: message.key,
                  value: message.value,
                  headers: {
                    ...message.headers,
                    'x-retry-count': (retryCount + 1).toString(),
                  },
                },
              ],
            });
          }
          // Commit in both cases so the consumer moves forward
          await commitNext(topic, partition, message.offset);
        }
      },
    });
  }

  private getRetryCount(message: any): number {
    const header = message.headers?.['x-retry-count'];
    return header ? parseInt(header.toString(), 10) : 0;
  }

  private async processOrder(message: any): Promise<void> {
    const order = JSON.parse(message.value!.toString());
    // Implementation; should throw on transient failures
  }
}
```
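The example above re-publishes failed messages immediately. Production setups usually add exponential backoff, often via per-delay retry topics, so a flapping dependency isn't hammered. A minimal delay calculator (the helper and the topic-naming comment are our illustration, not part of the code above):

```ts
// Exponential backoff with a cap: the delay doubles per attempt.
// Commonly paired with per-delay retry topics (orders-retry-1s, orders-retry-1m, ...)
// consumed by a delayed re-publisher.
function retryDelayMs(attempt: number, baseMs = 1000, maxMs = 60000): number {
  return Math.min(baseMs * 2 ** attempt, maxMs);
}

// attempts 0..3 => 1000, 2000, 4000, 8000 ms, capped at 60000
console.log([0, 1, 2, 3].map(a => retryDelayMs(a)));
```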
Consumer Group Pause/Resume
Pause consumption temporarily to handle backpressure or allow graceful shutdown.
```ts
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

class PausableConsumer {
  private kafka = new Kafka({
    clientId: 'event-processor',
    brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  });
  private running = true;

  async start(): Promise<void> {
    const consumer = this.kafka.consumer({ groupId: 'processor-group' });
    await consumer.connect();
    await consumer.subscribe({ topic: 'events' });

    const monitor = new BackpressureMonitor();

    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        // Check if we're overloaded
        if (monitor.getQueueDepth() > 10000) {
          console.log('Pausing consumer due to backpressure...');
          consumer.pause([{ topic }]);
          // Wait for the queue to drain
          while (monitor.getQueueDepth() > 1000) {
            await sleep(1000);
          }
          console.log('Resuming consumer...');
          consumer.resume([{ topic }]);
        }
        try {
          monitor.increment();
          await this.processEvent(message);
        } finally {
          monitor.decrement();
        }
      },
    });

    // Handle graceful shutdown
    process.on('SIGTERM', async () => {
      console.log('Shutting down gracefully...');
      this.running = false;
      consumer.pause([{ topic: 'events' }]);
      await monitor.waitForDrain(5000);
      await consumer.disconnect();
    });
  }

  private async processEvent(message: any): Promise<void> {
    // Implementation
  }
}

class BackpressureMonitor {
  private queue = 0;

  increment(): void {
    this.queue++;
  }

  decrement(): void {
    this.queue--;
  }

  getQueueDepth(): number {
    return this.queue;
  }

  async waitForDrain(timeoutMs: number): Promise<void> {
    const startTime = Date.now();
    while (this.queue > 0 && Date.now() - startTime < timeoutMs) {
      await sleep(100);
    }
  }
}
```
Consumer Lag Monitoring
Track how far behind consumers are to detect issues early.
```ts
class LagMonitor {
  private kafka = new Kafka({
    clientId: 'lag-monitor',
    brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  });
  private prometheus: any; // e.g. a prom-client wrapper

  async monitorLag(): Promise<void> {
    const admin = this.kafka.admin();
    await admin.connect();

    setInterval(async () => {
      // Latest offsets (high watermarks) per partition
      const topicOffsets = await admin.fetchTopicOffsets('orders');
      // Committed offsets for the consumer group (kafkajs v2 API)
      const [groupOffsets] = await admin.fetchOffsets({
        groupId: 'order-processor-group',
        topics: ['orders'],
      });

      for (const partition of topicOffsets) {
        const highWaterMark = parseInt(partition.high, 10);
        const committed = groupOffsets.partitions.find(
          p => p.partition === partition.partition
        );
        const consumerOffset = parseInt(committed?.offset ?? '0', 10);
        const lag = highWaterMark - consumerOffset;
        console.log(`Partition ${partition.partition} lag: ${lag} messages`);

        // Alert if lag is high
        if (lag > 10000) {
          await this.alertSlackChannel(
            `High consumer lag detected: ${lag} messages on partition ${partition.partition}`
          );
        }

        // Store in monitoring system (lag is a point-in-time value, so a gauge)
        this.prometheus.gauge('kafka_consumer_lag', lag, {
          group: 'order-processor-group',
          partition: partition.partition.toString(),
        });
      }
    }, 30000); // Check every 30 seconds
  }

  private async alertSlackChannel(message: string): Promise<void> {
    // Implementation
  }
}
```
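Per-partition lag is noisy; a single total-lag number per group is often the better alert signal. A pure helper for that aggregation (the type and function are ours; the `'-1'` convention assumes the offset shape returned when a group has never committed on a partition):

```ts
interface PartitionLag {
  partition: number;
  high: string; // high watermark, from fetchTopicOffsets
  committed: string; // group's committed offset; '-1' if never committed
}

// Sum lag across partitions. A '-1' committed offset means "never committed",
// which we count as the full partition backlog.
function totalLag(partitions: PartitionLag[]): number {
  return partitions.reduce((sum, p) => {
    const committed = p.committed === '-1' ? 0 : Number(p.committed);
    return sum + Math.max(0, Number(p.high) - committed);
  }, 0);
}
```

Alerting on `totalLag` smooths over single hot partitions, though a stuck partition with zero progress still deserves its own per-partition alert.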
Batch Processing vs One-at-a-Time
Batch processing amortizes per-message overhead (DB round-trips, offset commits) for higher throughput; one-at-a-time is simpler and fails at the granularity of a single message.
```ts
// One-at-a-time (simple, lower throughput)
await consumer.run({
  eachMessage: async ({ message }) => {
    await processMessage(message);
  },
});

// Batch processing (efficient, more complex)
await consumer.run({
  eachBatchAutoResolve: false,
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    const messages = batch.messages;
    const parsed = messages.map(msg => JSON.parse(msg.value!.toString()));
    // Process in bulk
    await bulkInsert(parsed);
    // Resolve all at once
    resolveOffset(messages[messages.length - 1].offset);
    // Send heartbeat for long-running batches
    await heartbeat();
  },
});

async function bulkInsert(batch: any[]): Promise<void> {
  // One parameterized multi-row insert instead of N round-trips
  const placeholders = batch.map((_, i) => `($${i * 2 + 1}, $${i * 2 + 2})`).join(',');
  const params = batch.flatMap(item => [item.id, JSON.stringify(item)]);
  await db.query(
    `INSERT INTO events (id, data) VALUES ${placeholders}
     ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data`,
    params
  );
}
```
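One caveat with multi-row inserts: Postgres caps bind parameters at 65535 per statement, so very large batches must be split. A minimal chunking helper (the function name is ours):

```ts
// Split a batch so each INSERT stays under Postgres's 65535-bind-parameter cap.
// paramsPerRow is how many placeholders each row consumes (2 in the example above).
function chunkForInsert<T>(batch: T[], paramsPerRow: number, maxParams = 65535): T[][] {
  const rowsPerChunk = Math.floor(maxParams / paramsPerRow);
  const chunks: T[][] = [];
  for (let i = 0; i < batch.length; i += rowsPerChunk) {
    chunks.push(batch.slice(i, i + rowsPerChunk));
  }
  return chunks;
}

// With 2 params per row, up to 32767 rows fit in one statement
console.log(chunkForInsert(Array.from({ length: 10 }, (_, i) => i), 3, 9));
```

Each chunk then goes through `bulkInsert`-style logic in its own statement (or its own transaction, depending on your atomicity needs).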
Testing Kafka Consumers
```ts
describe('KafkaConsumer', () => {
  it('should process messages in order', async () => {
    // testKafka points at a local/test broker (e.g. via testcontainers)
    const consumer = new AtLeastOnceConsumer(testKafka);
    const producer = testKafka.producer();
    await producer.connect();
    await producer.send({
      topic: 'payments',
      messages: [
        // Same key => same partition => ordering is guaranteed
        { key: 'order-1', value: JSON.stringify({ amount: 100 }) },
        { key: 'order-1', value: JSON.stringify({ amount: 200 }) },
        { key: 'order-1', value: JSON.stringify({ amount: 300 }) },
      ],
    });

    const processed: any[] = [];
    jest.spyOn(consumer as any, 'processPayment').mockImplementation(async payment => {
      processed.push(payment);
    });

    await consumer.start();
    await sleep(2000); // Wait for processing

    expect(processed).toEqual([{ amount: 100 }, { amount: 200 }, { amount: 300 }]);
  });

  it('should retry on transient failure', async () => {
    const consumer = new ConsumerWithDLQ();
    let attempts = 0;
    jest.spyOn(consumer as any, 'processOrder').mockImplementation(async () => {
      attempts++;
      if (attempts < 3) {
        throw new Error('Transient failure');
      }
    });
    // Start the consumer, publish one message, then assert:
    // - attempts === 3 (succeeded after retries)
    // - the DLQ topic remains empty
  });
});
```
Checklist
- Use consumer groups for distributed consumption
- Choose at-least-once (simple) or exactly-once (complex) semantics
- Commit offsets AFTER successful processing
- Implement dead-letter queue for poison messages
- Monitor consumer lag continuously
- Use batch processing for throughput, one-at-a-time for simplicity
- Test rebalancing and consumer failure scenarios
- Implement heartbeats for long-running operations
- Document retry policies and idempotency requirements
- Alert on high consumer lag
Conclusion
Kafka consumers need careful design. Choose between at-least-once (requires idempotency) and exactly-once (adds complexity). Always commit offsets after processing succeeds. Use dead-letter queues for unprocessable messages. Monitor lag vigilantly; it's your early warning system. Test failure scenarios: consumer crashes, rebalancing, broker failures.