Event-Driven Architecture in Practice — From Direct Calls to Async Events

Author: Sanjeev Sharma (@webcoderspeed1)

Introduction
Synchronous service calls create tight coupling: if one service is slow, everything downstream waits, and adding a new consumer means changing existing code. Event-driven architecture inverts this: services emit events about what happened, and any number of consumers react asynchronously. Companies like Netflix, Uber, and Amazon build heavily on this pattern.
- Events vs Commands vs Queries
- CloudEvents Specification for Interoperability
- Event Schema Registry (Confluent, AWS Glue)
- Domain Events vs Integration Events
- Event Versioning and Backwards Compatibility
- Consumer Group Strategy for Event Fanout
- Event Sourcing vs Event-Driven (Different Concepts)
- Observability for Event Flows
- Checklist
- Conclusion
Events vs Commands vs Queries
These three patterns often get confused. Each solves different problems:
// QUERY: Request data synchronously, expect immediate response
async function getUser(userId: string): Promise<User> {
  const response = await fetch(`/users/${userId}`)
  return response.json()
}

// COMMAND: Request action, expect synchronous result or error
async function createOrder(items: CartItem[]): Promise<OrderCreated> {
  const response = await fetch('/orders', {
    method: 'POST',
    body: JSON.stringify({ items })
  })
  if (!response.ok) throw new Error('Order creation failed')
  return response.json()
}

// EVENT: Notification that something happened, fire-and-forget
async function publishOrderCreated(event: OrderCreatedEvent): Promise<void> {
  await eventBus.publish('order.created', event)
  // We don't wait for consumers
  // We don't know if they succeeded
  // Multiple consumers can react
}
// Mixing patterns creates problems:

// Problem 1: Synchronous call chains
async function checkout(cartId: string) {
  const order = await createOrder(cartId) // Slow: waits for all order logic
  const payment = await processPayment(order.id) // Slow: waits for payment processor
  await sendConfirmationEmail(order.id) // Slow: waits for email service
  return order
}

// Better: Emit event, let consumers handle async work
async function checkout(cartId: string) {
  const order = await createOrder(cartId) // Fast: just creates the order
  await eventBus.publish('order.created', { orderId: order.id })
  // Email, payment confirmation, inventory update all happen async
  return order
}
CloudEvents Specification for Interoperability
CloudEvents is a standard format for events across systems. Whether you use Kafka, RabbitMQ, or AWS EventBridge, CloudEvents keeps you portable:
// CloudEvents standard structure
interface CloudEvent {
  // Required by the spec
  specversion: '1.0'
  type: string // e.g., 'com.example.orders.created'
  source: string // e.g., '/orders-service'
  id: string // Unique event ID
  // Optional but recommended
  time?: string // ISO 8601 timestamp
  datacontenttype?: string // e.g., 'application/json'
  subject?: string // e.g., 'orders/123'
  dataschema?: string // URL to event schema
  data?: Record<string, any>
}
// Implementation
import { v4 as uuidv4 } from 'uuid'

export class CloudEventFactory {
  static create<T = Record<string, any>>(
    type: string,
    source: string,
    data: T,
    subject?: string
  ): CloudEvent {
    return {
      specversion: '1.0',
      type,
      source,
      id: uuidv4(),
      time: new Date().toISOString(),
      datacontenttype: 'application/json',
      subject,
      dataschema: `https://api.example.com/schemas/${type}`,
      data
    }
  }
}
// Usage
const event = CloudEventFactory.create(
  'com.example.orders.created',
  '/orders-service',
  {
    orderId: '123',
    userId: 'user456',
    total: 99.99,
    items: [{ productId: 'p1', qty: 2 }]
  },
  'orders/123'
)

// Now this event can be published to any CloudEvents-compatible broker
// Consumers in any language can understand it
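Transport portability comes from the CloudEvents protocol bindings. As a sketch of the HTTP binding in "binary" content mode, each attribute becomes a `ce-` prefixed header and the event data travels as the request body (the event shape mirrors the interface above; the broker endpoint in the usage comment is a hypothetical placeholder):

```typescript
type BinaryModeEvent = {
  specversion: string
  type: string
  source: string
  id: string
  time?: string
  subject?: string
  dataschema?: string
  datacontenttype?: string
  data?: Record<string, unknown>
}

function toBinaryHttpRequest(event: BinaryModeEvent): {
  headers: Record<string, string>
  body: string
} {
  const headers: Record<string, string> = {
    'ce-specversion': event.specversion,
    'ce-type': event.type,
    'ce-source': event.source,
    'ce-id': event.id,
    // The data's media type goes in Content-Type, not a ce- header
    'Content-Type': event.datacontenttype ?? 'application/json'
  }
  if (event.time) headers['ce-time'] = event.time
  if (event.subject) headers['ce-subject'] = event.subject
  if (event.dataschema) headers['ce-dataschema'] = event.dataschema
  return { headers, body: JSON.stringify(event.data ?? {}) }
}

// Usage (hypothetical endpoint):
// fetch('https://broker.example.com/events', { method: 'POST', ...toBinaryHttpRequest(event) })
```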
Event Schema Registry (Confluent, AWS Glue)
As events evolve, you need schema versioning. A schema registry prevents incompatible events from being published:
// Schema registry workflow:
// 1. Define schema in registry
// 2. Services check schema before publishing
// 3. Consumers know what to expect

// src/schemas/order-created.schema.json
{
  "$id": "https://api.example.com/schemas/orders/created.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "OrderCreated",
  "version": 1,
  "type": "object",
  "required": ["orderId", "userId", "total"],
  "properties": {
    "orderId": {
      "type": "string",
      "format": "uuid"
    },
    "userId": {
      "type": "string",
      "format": "uuid"
    },
    "total": {
      "type": "number",
      "minimum": 0
    },
    "items": {
      "type": "array",
      "items": {
        "type": "object",
        "required": ["productId", "quantity"],
        "properties": {
          "productId": { "type": "string" },
          "quantity": { "type": "integer", "minimum": 1 }
        }
      }
    },
    "createdAt": {
      "type": "string",
      "format": "date-time"
    }
  }
}
// Schema registry client (Confluent-style REST API)
import axios from 'axios'
import Ajv from 'ajv'

export class SchemaRegistry {
  private schemas: Map<string, any> = new Map()
  private ajv = new Ajv()

  async registerSchema(name: string, schema: any): Promise<number> {
    const response = await axios.post(
      `${process.env.SCHEMA_REGISTRY_URL}/subjects/${name}/versions`,
      { schema: JSON.stringify(schema) }
    )
    return response.data.id
  }

  async getSchema(name: string): Promise<any> {
    if (this.schemas.has(name)) {
      return this.schemas.get(name)
    }
    const response = await axios.get(
      `${process.env.SCHEMA_REGISTRY_URL}/subjects/${name}/versions/latest`
    )
    // The registry returns the schema as a JSON string; parse it before caching
    const schema = JSON.parse(response.data.schema)
    this.schemas.set(name, schema)
    return schema
  }

  async validateEvent(schemaName: string, event: any): Promise<boolean> {
    const schema = await this.getSchema(schemaName)
    const validate = this.ajv.compile(schema)
    return validate(event) as boolean
  }
}
// Usage in event publishing
export class OrderService {
  constructor(
    private repository: OrderRepository,
    private eventBus: EventBus,
    private schemaRegistry: SchemaRegistry
  ) {}

  async createOrder(request: CreateOrderRequest): Promise<Order> {
    const order = new Order(request)
    await this.repository.save(order)
    const event = {
      orderId: order.id,
      userId: request.userId,
      total: order.total,
      items: order.items,
      createdAt: new Date().toISOString()
    }
    // Validate against the schema before publishing
    const isValid = await this.schemaRegistry.validateEvent('OrderCreated', event)
    if (!isValid) {
      throw new Error('Event does not match schema')
    }
    await this.eventBus.publish('order.created', event)
    return order
  }
}
Domain Events vs Integration Events
Domain events model business occurrences. Integration events notify other services. They're different tools:
// Domain events: Occur within your aggregate, internal to the service
export class Order {
  readonly id = crypto.randomUUID()
  items: OrderItem[]
  status: string
  paymentId?: string
  private domainEvents: DomainEvent[] = []

  constructor(items: OrderItem[]) {
    this.items = items
    this.status = 'pending'
    // When an order is created, something happened in this domain
    this.addDomainEvent(new OrderCreatedDomainEvent(this.id))
  }

  confirm(paymentId: string): void {
    this.status = 'confirmed'
    this.paymentId = paymentId
    // Business rule: when payment is confirmed, the order is confirmed
    this.addDomainEvent(new OrderConfirmedDomainEvent(this.id, paymentId))
  }

  private addDomainEvent(event: DomainEvent): void {
    this.domainEvents.push(event)
  }

  getDomainEvents(): DomainEvent[] {
    return this.domainEvents
  }

  clearDomainEvents(): void {
    this.domainEvents = []
  }
}
// After the order is persisted, publish integration events
export class OrderRepository {
  constructor(private db: Database, private eventBus: EventBus) {}

  async save(order: Order): Promise<void> {
    // Persist the order
    await this.db.orders.create({
      id: order.id,
      status: order.status
    })
    // Convert domain events to integration events and publish
    for (const domainEvent of order.getDomainEvents()) {
      const integrationEvent = this.toIntegrationEvent(domainEvent)
      await this.eventBus.publish(integrationEvent.type, integrationEvent)
    }
    order.clearDomainEvents()
  }

  private toIntegrationEvent(domainEvent: DomainEvent): IntegrationEvent {
    if (domainEvent instanceof OrderCreatedDomainEvent) {
      return {
        type: 'order.created',
        source: 'orders-service',
        data: {
          orderId: domainEvent.orderId,
          timestamp: new Date().toISOString()
        }
      }
    }
    throw new Error('Unknown domain event')
  }
}
// Other services consume integration events
export class NotificationService {
  constructor(private eventBus: EventBus, private emailSender: EmailSender) {
    this.eventBus.subscribe('order.created', this.onOrderCreated.bind(this))
  }

  private async onOrderCreated(event: IntegrationEvent): Promise<void> {
    await this.emailSender.send({
      to: 'customer@example.com',
      subject: 'Order Created',
      body: `Your order ${event.data.orderId} has been created`
    })
  }
}
Event Versioning and Backwards Compatibility
Events evolve. Handle it gracefully:
// Version 1: Original event
{
  "eventType": "order.created",
  "orderId": "123",
  "total": 99.99
}

// Version 2: Add optional field (backwards compatible)
{
  "eventType": "order.created",
  "version": 2,
  "orderId": "123",
  "total": 99.99,
  "currency": "USD" // New, optional
}

// Version 3: Add required field (NOT backwards compatible, handle carefully)
{
  "eventType": "order.created",
  "version": 3,
  "orderId": "123",
  "total": 99.99,
  "currency": "USD",
  "userId": "user456" // New, required
}
// Handle version differences in the consumer
export class OrderNotificationConsumer {
  async handle(event: any): Promise<void> {
    // Support multiple versions
    const orderId = event.orderId
    const total = event.total
    const currency = event.currency || 'USD' // Default for v1
    let userId = event.userId // May not exist in v1-v2
    if (!userId) {
      // For old events without userId, look it up
      const order = await this.orderRepository.findById(orderId)
      userId = order.userId
    }
    await this.sendNotification(orderId, total, currency, userId)
  }
}
// Better: Use discriminated unions for type safety
type OrderCreatedEventV1 = {
  version: 1
  orderId: string
  total: number
}

type OrderCreatedEventV2 = {
  version: 2
  orderId: string
  total: number
  currency: string
}

type OrderCreatedEventV3 = {
  version: 3
  orderId: string
  total: number
  currency: string
  userId: string
}

type OrderCreatedEvent = OrderCreatedEventV1 | OrderCreatedEventV2 | OrderCreatedEventV3

function handleOrderCreated(event: OrderCreatedEvent): void {
  switch (event.version) {
    case 1:
      // Handle V1: event is narrowed to OrderCreatedEventV1 here
      break
    case 2:
      // Handle V2
      break
    case 3:
      // Handle V3
      break
  }
}
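A common way to tame this version sprawl is to "upcast" every incoming event to the newest shape at the consumer's edge, so business logic only ever handles one version. A sketch, with the versioned types repeated so the snippet stands alone (the `lookupUserId` callback is a hypothetical stand-in for a repository query):

```typescript
type V1 = { version: 1; orderId: string; total: number }
type V2 = { version: 2; orderId: string; total: number; currency: string }
type V3 = { version: 3; orderId: string; total: number; currency: string; userId: string }
type AnyOrderCreated = V1 | V2 | V3

function upcastOrderCreated(
  event: AnyOrderCreated,
  lookupUserId: (orderId: string) => string
): V3 {
  switch (event.version) {
    case 1:
      // V1 had neither currency nor userId: default one, look up the other
      return { version: 3, orderId: event.orderId, total: event.total, currency: 'USD', userId: lookupUserId(event.orderId) }
    case 2:
      // V2 is only missing userId
      return { version: 3, orderId: event.orderId, total: event.total, currency: event.currency, userId: lookupUserId(event.orderId) }
    case 3:
      return event
  }
}
```

The switch stays in one place; every handler downstream receives a `V3` and never branches on version again.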
Consumer Group Strategy for Event Fanout
Kafka consumer groups give you fanout and load balancing at once: every group receives each event, while instances within a group split the partitions so no work is duplicated:

// Kafka setup with consumer groups
import { Kafka } from 'kafkajs'

const kafka = new Kafka({
  clientId: 'notifications-service',
  brokers: ['kafka:9092']
})

// Consumer group 1: Notifications
// If you have 5 notification-service instances, each gets different partitions
const notificationsConsumer = kafka.consumer({ groupId: 'notifications-service' })

// Consumer group 2: Analytics
// Analytics-service instances form a separate group
const analyticsConsumer = kafka.consumer({ groupId: 'analytics-service' })

// Connect before subscribing
await notificationsConsumer.connect()
await analyticsConsumer.connect()

// Same event goes to both groups, but each instance only processes its partitions
await notificationsConsumer.subscribe({ topic: 'orders', fromBeginning: false })
// Analytics reads the topic from the start to catch up on history
await analyticsConsumer.subscribe({ topic: 'orders', fromBeginning: true })

// Notifications group: 5 instances, 5 partitions → each instance gets 1 partition
await notificationsConsumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log(`Partition ${partition}: ${message.value}`)
    // Instance A handles partition 0
    // Instance B handles partition 1
    // ... etc
  }
})

// Analytics group: Can have a different number of instances
await analyticsConsumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    // Processes all historical orders, not just new ones
  }
})

// Key insight: Same topic, different consumer groups = parallel fanout
// Multiple services can independently consume the same events
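Ordering is only guaranteed within a partition, so the usual tactic is to key messages by an entity ID (for example the order ID) so that all events for one entity land on the same partition. kafkajs does key-based partitioning automatically when you set `message.key`; the sketch below just illustrates the idea with a simple rolling hash, not Kafka's actual murmur2 partitioner:

```typescript
// Map a message key to a partition deterministically
function partitionFor(key: string, numPartitions: number): number {
  let hash = 0
  for (const ch of key) {
    // Unsigned 32-bit rolling hash over the key's characters
    hash = (hash * 31 + ch.charCodeAt(0)) >>> 0
  }
  return hash % numPartitions
}

// Same key always maps to the same partition, so all events for
// order-123 are consumed in order by whichever instance owns it
const pA = partitionFor('order-123', 5)
const pB = partitionFor('order-123', 5)
// pA === pB
```

The trade-off: one hot entity cannot be spread across instances, so pick a key with enough cardinality to balance load.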
Event Sourcing vs Event-Driven (Different Concepts)
These are often confused but solve different problems:
// EVENT-DRIVEN: Services emit events about what happened
// Other services listen and react
// Events are the communication mechanism between services
class OrderService {
  async createOrder(request: CreateOrderRequest): Promise<Order> {
    const order = new Order(request)
    await this.repository.save(order)
    // Emit event so other services know about it
    await this.eventBus.publish('order.created', {
      orderId: order.id,
      userId: request.userId,
      total: order.total
    })
    return order
  }
}

// EVENT SOURCING: Store the complete history of changes as events
// Rebuild state by replaying events
// Offers a perfect audit trail, time travel, and event replay
class OrderEventStore {
  async createOrder(request: CreateOrderRequest): Promise<Order> {
    const orderId = crypto.randomUUID()
    // Store event in the event store (immutable log)
    await this.eventStore.append('Order', orderId, {
      type: 'OrderCreated',
      orderId,
      userId: request.userId,
      items: request.items,
      total: request.total,
      timestamp: new Date()
    })
    // Rebuild order state from the event log
    return this.rebuildOrderFromEvents(orderId)
  }

  async confirmOrder(orderId: string, paymentId: string): Promise<Order> {
    // Append confirmation event
    await this.eventStore.append('Order', orderId, {
      type: 'OrderConfirmed',
      orderId,
      paymentId,
      timestamp: new Date()
    })
    // Rebuild order with the new state
    return this.rebuildOrderFromEvents(orderId)
  }

  private async rebuildOrderFromEvents(orderId: string): Promise<Order> {
    const events = await this.eventStore.getEvents('Order', orderId)
    let order: Order | null = null
    for (const event of events) {
      switch (event.type) {
        case 'OrderCreated':
          order = new Order(event.orderId, event.userId, event.items, event.total)
          break
        case 'OrderConfirmed':
          order!.confirm(event.paymentId) // OrderCreated is always the first event
          break
      }
    }
    if (!order) throw new Error(`No events found for order ${orderId}`)
    return order
  }
}

// Use both together:
// - Event Sourcing for the audit trail and state reconstruction
// - Event-Driven for inter-service communication
// Events from the event store can be published to the event bus
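The `eventStore` above is assumed; its implied interface is just an append-only log keyed by aggregate type and ID. A minimal in-memory sketch (illustrative only — real event stores add optimistic concurrency checks and durable storage):

```typescript
// Minimal shape of a stored event: a type tag plus arbitrary payload fields
interface StoredEvent { type: string; [key: string]: unknown }

class InMemoryEventStore {
  private streams = new Map<string, StoredEvent[]>()

  async append(aggregate: string, id: string, event: StoredEvent): Promise<void> {
    const key = `${aggregate}:${id}`
    const stream = this.streams.get(key) ?? []
    stream.push(event) // events are only ever appended, never mutated
    this.streams.set(key, stream)
  }

  async getEvents(aggregate: string, id: string): Promise<StoredEvent[]> {
    // Events come back in append order, which is what replay depends on
    return this.streams.get(`${aggregate}:${id}`) ?? []
  }
}
```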
Observability for Event Flows
Event flows are hard to debug if you can't see them. Implement tracing:
// Distributed tracing for events
import { trace } from '@opentelemetry/api'

const tracer = trace.getTracer('orders-service')

export class OrderService {
  async createOrder(request: CreateOrderRequest): Promise<Order> {
    const span = tracer.startSpan('createOrder')
    try {
      span.addEvent('order_creation_started', {
        userId: request.userId,
        itemCount: request.items.length
      })
      const order = new Order(request)
      await this.repository.save(order)
      // Pass the trace ID along with the event so consumers can link back to it
      const traceId = span.spanContext().traceId
      await this.eventBus.publish('order.created', {
        orderId: order.id,
        userId: request.userId,
        total: order.total,
        traceId // Enables end-to-end tracing
      })
      span.addEvent('order_created', { orderId: order.id })
      return order
    } catch (error) {
      span.recordException(error as Error)
      throw error
    } finally {
      span.end()
    }
  }
}

// Consumer links its span back to the producer's trace
// (simplified: production setups propagate full trace context via message
// headers and use span links rather than a plain attribute)
export class NotificationConsumer {
  async handle(event: OrderCreatedEvent): Promise<void> {
    const span = tracer.startSpan('send_order_notification', {
      attributes: {
        'trace_id': event.traceId
      }
    })
    try {
      await this.emailSender.send({
        to: 'customer@example.com',
        subject: 'Order Confirmed'
      })
      span.addEvent('email_sent')
    } finally {
      span.end()
    }
  }
}

// Dashboards show: Order created → Email sent → Analytics recorded
// All linked by trace ID, even across services
Checklist
- Events follow CloudEvents spec for portability
- Event schema registry enforces compatibility
- Domain events modeled in aggregates
- Domain events converted to integration events before publishing
- Event versioning strategy documented
- Backwards-compatible schema evolution planned
- Multiple consumer groups handle fanout correctly
- Consumers are idempotent (safe to retry)
- Distributed tracing connects event flows
- Dead letter queue configured for failed consumers
- Event retention policy defined
- Consumer lag monitored in production
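Two of the checklist items lend themselves to a small combined sketch: idempotent handling via a processed-ID check, and a dead-letter hook for poison messages. The in-memory `Set` here is a hypothetical stand-in for a durable store, such as a database table with a unique constraint on event ID:

```typescript
class IdempotentConsumer {
  private processed = new Set<string>()

  constructor(
    private handler: (event: { id: string }) => Promise<void>,
    private deadLetter: (event: { id: string }, err: unknown) => Promise<void>
  ) {}

  async handle(event: { id: string }): Promise<void> {
    if (this.processed.has(event.id)) return // duplicate delivery: safe no-op
    try {
      await this.handler(event)
      // Mark as processed only after the handler succeeds
      this.processed.add(event.id)
    } catch (err) {
      // Park the poison message and keep consuming; retries and alerting
      // happen from the dead-letter side
      await this.deadLetter(event, err)
    }
  }
}
```

Because brokers typically guarantee at-least-once delivery, the duplicate check is not optional: every consumer should tolerate seeing the same event twice.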
Conclusion
Event-driven architecture scales horizontally: add new consumers without changing publishers. It handles asynchronous work elegantly and decouples services cleanly. But it requires discipline: events must be well-defined, versioning must be thought through, and observability must be built in from day one. When done right, it's the foundation of systems that scale.