CQRS Implementation — Separate Command and Query Models for High-Scale Apps

Introduction

CQRS (Command Query Responsibility Segregation) separates read and write logic so you can optimize each independently. Commands modify state; queries read state. The two sides may use different databases, different schemas, even different technologies. Combined with event sourcing, CQRS gains a complete event history from which read models can be rebuilt. Used alone, it's simpler: write to a normalized database, read from denormalized projections. This post covers both approaches: implementing command and query buses, handling eventual consistency, building projections, and recognizing when CQRS adds complexity without value.

Command Bus Pattern

A command bus routes each command to its handler. A command is a request to change state, and it must succeed or fail atomically.

import { v4 as uuid } from 'uuid';

// Commands are immutable value objects
abstract class Command {
  readonly id: string = uuid();
  readonly timestamp: Date = new Date();
  readonly userId: string;

  constructor(userId: string) {
    this.userId = userId;
  }
}

class CreatePostCommand extends Command {
  constructor(
    userId: string,
    readonly title: string,
    readonly content: string,
    readonly tags: string[]
  ) {
    super(userId);
  }
}

class PublishPostCommand extends Command {
  constructor(
    userId: string,
    readonly postId: string
  ) {
    super(userId);
  }
}

class SuspendUserCommand extends Command {
  constructor(
    userId: string,
    readonly targetUserId: string,
    readonly reason: string
  ) {
    super(userId);
  }
}

// Command handlers implement business logic
interface CommandHandler<T extends Command> {
  handle(command: T): Promise<unknown>;
}

class CreatePostCommandHandler implements CommandHandler<CreatePostCommand> {
  constructor(private db: Database) {}

  async handle(command: CreatePostCommand): Promise<string> {
    // Validate authorization
    const user = await this.db.user.findById(command.userId);
    if (!user || user.status === 'suspended') {
      throw new Error('User not authorized to create posts');
    }

    // Validate input
    if (command.title.length < 5 || command.content.length < 10) {
      throw new Error('Invalid post content');
    }

    // Execute command
    const post = await this.db.transaction(async trx => {
      return await trx.post.create({
        id: uuid(),
        authorId: command.userId,
        title: command.title,
        slug: slugify(command.title),
        content: command.content,
        tags: command.tags,
        createdAt: command.timestamp,
      });
    });

    return post.id;
  }
}

class PublishPostCommandHandler implements CommandHandler<PublishPostCommand> {
  constructor(
    private db: Database,
    private eventBus: EventBus
  ) {}

  async handle(command: PublishPostCommand): Promise<void> {
    const post = await this.db.post.findById(command.postId);
    if (!post) {
      throw new Error('Post not found');
    }

    if (post.authorId !== command.userId && !isAdmin(command.userId)) {
      throw new Error('Not authorized');
    }

    if (post.published) {
      throw new Error('Post already published');
    }

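    const publishedAt = new Date();

    await this.db.transaction(async trx => {
      await trx.post.update(command.postId, {
        published: true,
        publishedAt,
      });
    });

    // Publish only after the transaction commits, so subscribers never see
    // an event for an update that was rolled back
    await this.eventBus.publish({
      type: 'PostPublished',
      aggregateId: command.postId,
      data: {
        postId: command.postId,
        authorId: post.authorId,
        title: post.title,
        tags: post.tags,
        publishedAt,
      },
      timestamp: publishedAt,
    });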
  }
}

// Command bus routes commands to handlers
class CommandBus {
  private handlers = new Map<typeof Command, CommandHandler<any>>();

  register<T extends Command>(
    commandType: new (...args: any[]) => T,
    handler: CommandHandler<T>
  ): void {
    this.handlers.set(commandType, handler);
  }

  async execute<T extends Command>(command: T): Promise<any> {
    const handler = this.handlers.get(command.constructor as typeof Command);
    if (!handler) {
      throw new Error(`No handler for ${command.constructor.name}`);
    }

    try {
      return await handler.handle(command);
    } catch (error) {
      console.error(`Command failed: ${command.constructor.name}`, error);
      throw error;
    }
  }
}

// Setup
const commandBus = new CommandBus();
commandBus.register(CreatePostCommand, new CreatePostCommandHandler(db));
commandBus.register(PublishPostCommand, new PublishPostCommandHandler(db, eventBus));

// Usage
const postId = await commandBus.execute(
  new CreatePostCommand('user-123', 'My Post', 'Content here', ['nodejs', 'cqrs'])
);

await commandBus.execute(new PublishPostCommand('user-123', postId));
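
The handlers above depend on a `Database`, an `EventBus`, and a `slugify` helper that this post does not define. As a rough, self-contained sketch of the same dispatch-by-constructor idea, the version below swaps in an in-memory store so the routing can be run on its own. `InMemoryPosts`, `CreateDraft`, `CreateDraftHandler`, and `MiniCommandBus` are hypothetical names used only for illustration.

```typescript
// Hypothetical in-memory stand-in for the write database.
class InMemoryPosts {
  private nextId = 1;
  readonly rows = new Map<string, { id: string; title: string }>();

  create(title: string): string {
    const id = `post-${this.nextId++}`;
    this.rows.set(id, { id, title });
    return id;
  }
}

// A command is a plain immutable message describing the requested change.
class CreateDraft {
  constructor(readonly userId: string, readonly title: string) {}
}

interface Handler<C, R> {
  handle(command: C): Promise<R>;
}

class CreateDraftHandler implements Handler<CreateDraft, string> {
  constructor(private posts: InMemoryPosts) {}

  async handle(command: CreateDraft): Promise<string> {
    // Validate before touching state, so a rejected command changes nothing.
    if (command.title.trim().length < 5) {
      throw new Error('Invalid post content');
    }
    return this.posts.create(command.title);
  }
}

// Routes each command to the single handler registered for its class.
class MiniCommandBus {
  private handlers = new Map<Function, Handler<any, any>>();

  register<C, R>(type: new (...args: any[]) => C, handler: Handler<C, R>): void {
    if (this.handlers.has(type)) {
      throw new Error(`Handler already registered for ${type.name}`);
    }
    this.handlers.set(type, handler);
  }

  async execute<R>(command: object): Promise<R> {
    const handler = this.handlers.get(command.constructor);
    if (!handler) {
      throw new Error(`No handler for ${command.constructor.name}`);
    }
    return handler.handle(command);
  }
}

const posts = new InMemoryPosts();
const bus = new MiniCommandBus();
bus.register(CreateDraft, new CreateDraftHandler(posts));
```

Because validation runs before the store is touched, a rejected command leaves `posts.rows` unchanged, which is the in-memory analogue of "succeed or fail atomically". Rejecting a second registration for the same command class keeps the one-command-one-handler rule explicit.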

Query Bus Pattern

Queries request information without side effects. Use a separate read model optimized for each query.

// Queries are read-only requests
abstract class Query<T> {
  readonly id: string = uuid();
  readonly timestamp: Date = new Date();
}

class GetPostQuery extends Query<PostDetail> {
  constructor(readonly postId: string) {
    super();
  }
}

class ListPublishedPostsQuery extends Query<PostSummary[]> {
  constructor(
    readonly page: number = 1,
    readonly limit: number = 20,
    readonly tags?: string[]
  ) {
    super();
  }
}

class GetUserStatsQuery extends Query<UserStats> {
  constructor(readonly userId: string) {
    super();
  }
}

// Query handlers fetch from read models
interface QueryHandler<T extends Query<R>, R> {
  handle(query: T): Promise<R>;
}

interface PostDetail {
  id: string;
  title: string;
  content: string;
  authorName: string;
  tags: string[];
  publishedAt: Date;
  viewCount: number;
  commentCount: number;
}

class GetPostQueryHandler implements QueryHandler<GetPostQuery, PostDetail> {
  constructor(private readDb: ReadDatabase) {}

  async handle(query: GetPostQuery): Promise<PostDetail> {
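    // Query optimized read model. Aliases are quoted because PostgreSQL
    // (assumed here, given the $1 placeholders) folds unquoted names to lowercase
    return this.readDb.query(`
      SELECT
        p.id, p.title, p.content,
        u.display_name AS "authorName",
        p.tags,
        p.published_at AS "publishedAt",
        p.view_count AS "viewCount",
        COUNT(DISTINCT c.id) AS "commentCount"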
      FROM post_read_model p
      LEFT JOIN user_read_model u ON p.author_id = u.id
      LEFT JOIN comments c ON p.id = c.post_id
      WHERE p.id = $1
      GROUP BY p.id, u.id
    `, [query.postId]);
  }
}

interface PostSummary {
  id: string;
  title: string;
  excerpt: string;
  authorName: string;
  publishedAt: Date;
}

class ListPublishedPostsQueryHandler
  implements QueryHandler<ListPublishedPostsQuery, PostSummary[]>
{
  constructor(private cache: Cache, private readDb: ReadDatabase) {}

  async handle(query: ListPublishedPostsQuery): Promise<PostSummary[]> {
    // Try cache first
    const cacheKey = `posts:${query.page}:${query.limit}:${(query.tags || []).join(',')}`;
    const cached = await this.cache.get(cacheKey);
    if (cached) return cached;

    let sql = `
      SELECT id, title, excerpt, display_name AS "authorName", published_at AS "publishedAt"
      FROM post_read_model
      WHERE published = true
    `;
    const params: any[] = [];

    if (query.tags?.length) {
      sql += ` AND tags && $1`;
      params.push(query.tags);
    }

    sql += ` ORDER BY published_at DESC LIMIT $${params.length + 1} OFFSET $${params.length + 2}`;
    params.push(query.limit, (query.page - 1) * query.limit);

    const results = await this.readDb.query(sql, params);

    // Cache for 5 minutes
    await this.cache.set(cacheKey, results, 300);
    return results;
  }
}

interface UserStats {
  userId: string;
  username: string;
  postCount: number;
  totalViews: number;
  followerCount: number;
  avgCommentsPerPost: number;
}

class GetUserStatsQueryHandler implements QueryHandler<GetUserStatsQuery, UserStats> {
  constructor(private readDb: ReadDatabase) {}

  async handle(query: GetUserStatsQuery): Promise<UserStats> {
    return this.readDb.query(`
      SELECT
        u.id AS "userId",
        u.username,
        COALESCE(stats.post_count, 0)::int AS "postCount",
        COALESCE(stats.total_views, 0)::int AS "totalViews",
        COALESCE(stats.follower_count, 0)::int AS "followerCount",
        COALESCE(stats.avg_comments_per_post, 0)::decimal AS "avgCommentsPerPost"
      FROM user_read_model u
      LEFT JOIN user_stats_read_model stats ON u.id = stats.user_id
      WHERE u.id = $1
    `, [query.userId]);
  }
}

// Query bus
class QueryBus {
  private handlers = new Map<typeof Query, QueryHandler<any, any>>();

  register<T extends Query<R>, R>(
    queryType: new (...args: any[]) => T,
    handler: QueryHandler<T, R>
  ): void {
    this.handlers.set(queryType, handler);
  }

  async execute<T extends Query<R>, R>(query: T): Promise<R> {
    const handler = this.handlers.get(query.constructor as typeof Query);
    if (!handler) {
      throw new Error(`No handler for ${query.constructor.name}`);
    }

    return handler.handle(query);
  }
}

const queryBus = new QueryBus();
queryBus.register(GetPostQuery, new GetPostQueryHandler(readDb));
queryBus.register(ListPublishedPostsQuery, new ListPublishedPostsQueryHandler(cache, readDb));
queryBus.register(GetUserStatsQuery, new GetUserStatsQueryHandler(readDb));

// Usage
const post = await queryBus.execute(new GetPostQuery('post-123'));
const posts = await queryBus.execute(new ListPublishedPostsQuery(1, 20, ['nodejs']));
const stats = await queryBus.execute(new GetUserStatsQuery('user-123'));
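
One limitation of the `QueryBus` above is that TypeScript has nothing to infer `R` from in `execute`, so results tend to arrive as `unknown` at the call site. A possible refinement, sketched here under the assumption that an optional marker property on the base class is acceptable, lets the compiler recover the result type from the query class itself. `ResultOf`, `TypedQueryBus`, `GetPostTitleQuery`, and `GetPostTitleHandler` are hypothetical names, and the `Map` stands in for a read database.

```typescript
// The optional marker property exists only so the compiler can see R;
// it never holds a value at runtime.
abstract class Query<R> {
  readonly __result?: R;
}

// Extracts the result type a query class was declared with.
type ResultOf<Q> = Q extends Query<infer R> ? R : never;

interface QueryHandler<Q extends Query<unknown>> {
  handle(query: Q): Promise<ResultOf<Q>>;
}

class GetPostTitleQuery extends Query<string> {
  constructor(readonly postId: string) {
    super();
  }
}

class GetPostTitleHandler implements QueryHandler<GetPostTitleQuery> {
  // Hypothetical read model: post id -> title.
  constructor(private titles: Map<string, string>) {}

  async handle(query: GetPostTitleQuery): Promise<string> {
    const title = this.titles.get(query.postId);
    if (title === undefined) {
      throw new Error('Post not found');
    }
    return title;
  }
}

// Handlers are stored loosely typed; register/execute restore the types.
type AnyQueryHandler = { handle(query: any): Promise<any> };

class TypedQueryBus {
  private handlers = new Map<Function, AnyQueryHandler>();

  register<Q extends Query<unknown>>(
    type: new (...args: any[]) => Q,
    handler: QueryHandler<Q>
  ): void {
    this.handlers.set(type, handler);
  }

  async execute<Q extends Query<unknown>>(query: Q): Promise<ResultOf<Q>> {
    const handler = this.handlers.get(query.constructor);
    if (!handler) {
      throw new Error(`No handler for ${query.constructor.name}`);
    }
    return handler.handle(query);
  }
}
```

With this shape, `await bus.execute(new GetPostTitleQuery('post-1'))` should be typed as `string` without an explicit type argument. The cost is a marker property that serves no runtime purpose, so whether the trade is worthwhile depends on how many call sites consume query results.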

Eventual Consistency Between Write and Read Models

Commands write to the write model; domain events update read models asynchronously.

import { EventEmitter } from 'events';

type DomainEvent =
  | {
      type: 'PostCreated';
      data: { postId: string; authorId: string; title: string; tags: string[] };
    }
  | {
      type: 'PostPublished';
      data: { postId: string; publishedAt: Date };
    }
  | {
      type: 'PostViewed';
      data: { postId: string; viewedAt: Date };
    }
  | {
      type: 'CommentAdded';
      data: { postId: string; authorId: string; commentId: string };
    };

class PostCreatedProjection {
  constructor(private readDb: ReadDatabase) {}

  async handle(event: Extract<DomainEvent, { type: 'PostCreated' }>): Promise<void> {
    await this.readDb.execute(
      `INSERT INTO post_read_model (id, author_id, title, tags, published, created_at)
       VALUES ($1, $2, $3, $4, false, NOW())`,
      [event.data.postId, event.data.authorId, event.data.title, event.data.tags]
    );
  }
}

class PostPublishedProjection {
  constructor(private readDb: ReadDatabase, private cache: Cache) {}

  async handle(event: Extract<DomainEvent, { type: 'PostPublished' }>): Promise<void> {
    await this.readDb.execute(
      `UPDATE post_read_model SET published = true, published_at = $1 WHERE id = $2`,
      [event.data.publishedAt, event.data.postId]
    );

    // Invalidate cache
    await this.cache.delete(`post:${event.data.postId}`);
  }
}

class UserStatsProjection {
  constructor(private readDb: ReadDatabase) {}

  async handle(event: DomainEvent): Promise<void> {
    if (event.type === 'PostCreated') {
      await this.readDb.execute(
        `INSERT INTO user_stats_read_model (user_id, post_count) VALUES ($1, 1)
         ON CONFLICT (user_id) DO UPDATE
         SET post_count = user_stats_read_model.post_count + 1`,
        [event.data.authorId]
      );
    }

    if (event.type === 'CommentAdded') {
      await this.readDb.execute(
        `UPDATE user_stats_read_model SET avg_comments_per_post = (
          SELECT SUM(comment_count) / NULLIF(post_count, 0)
          FROM user_posts_view WHERE user_id = $1
        ) WHERE user_id = $1`,
        [event.data.authorId]
      );
    }
  }
}

// Event dispatcher publishes each domain event to every projection registered for its type
interface Projection {
  handle(event: DomainEvent): Promise<void>;
}

class EventDispatcher extends EventEmitter {
  private projections = new Map<DomainEvent['type'], Projection[]>();

  registerProjection(eventType: DomainEvent['type'], projection: Projection): void {
    const registered = this.projections.get(eventType) ?? [];
    registered.push(projection);
    this.projections.set(eventType, registered);
  }

  async dispatch(event: DomainEvent): Promise<void> {
    for (const projection of this.projections.get(event.type) ?? []) {
      try {
        await projection.handle(event);
      } catch (error) {
        console.error(`Projection failed for event ${event.type}:`, error);
        // Store failed event for retry; remaining projections still run
        await this.storeFailedEvent(event, error);
      }
    }
  }

  private async storeFailedEvent(event: DomainEvent, error: unknown): Promise<void> {
    // Implement dead letter queue for replay
  }
}

const dispatcher = new EventDispatcher();
dispatcher.registerProjection('PostCreated', new PostCreatedProjection(readDb));
dispatcher.registerProjection('PostPublished', new PostPublishedProjection(readDb, cache));
// UserStatsProjection reacts to PostCreated and CommentAdded, so register it for both
const userStatsProjection = new UserStatsProjection(readDb);
dispatcher.registerProjection('PostCreated', userStatsProjection);
dispatcher.registerProjection('CommentAdded', userStatsProjection);

// After the command succeeds, dispatch the event so projections can update the read models
class CreatePostCommandHandler implements CommandHandler<CreatePostCommand> {
  constructor(
    private db: Database,
    private dispatcher: EventDispatcher
  ) {}

  async handle(command: CreatePostCommand): Promise<string> {
    const postId = uuid();

    // Write to normalized write model
    const post = await this.db.transaction(async trx => {
      return trx.post.create({
        id: postId,
        authorId: command.userId,
        title: command.title,
        content: command.content,
        tags: command.tags,
      });
    });

    // Dispatch only after the transaction commits, so projections never
    // process a post that was rolled back. Awaited here for simplicity; a
    // queue or background worker would make the update truly asynchronous.
    await this.dispatcher.dispatch({
      type: 'PostCreated',
      data: {
        postId: post.id,
        authorId: post.authorId,
        title: post.title,
        tags: post.tags,
      },
    });

    return postId;
  }
}
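
To make the consistency gap concrete, here is a small in-memory sketch in which the write side only queues events and a separate step applies them to the read model. It illustrates the idea rather than reproducing the dispatcher above: `PostEvent`, `EventQueue`, `drain`, and the `Map`-based read model are assumptions, and a real system would typically use a message broker or background worker in place of the explicit `drain()` call.

```typescript
type PostEvent =
  | { type: 'PostCreated'; data: { postId: string; title: string } }
  | { type: 'PostPublished'; data: { postId: string } };

interface PostRow {
  id: string;
  title: string;
  published: boolean;
}

// Read model: one denormalized row per post, keyed by id.
const readModel = new Map<string, PostRow>();

// Projection: applies a single event to the read model.
function project(event: PostEvent): void {
  switch (event.type) {
    case 'PostCreated':
      readModel.set(event.data.postId, {
        id: event.data.postId,
        title: event.data.title,
        published: false,
      });
      break;
    case 'PostPublished': {
      const row = readModel.get(event.data.postId);
      if (row) row.published = true;
      break;
    }
  }
}

// Hypothetical stand-in for a message broker: events wait here until drained.
class EventQueue {
  private pending: PostEvent[] = [];

  publish(event: PostEvent): void {
    this.pending.push(event);
  }

  // Applies queued events in order and returns how many were processed.
  drain(): number {
    const batch = this.pending;
    this.pending = [];
    batch.forEach(event => project(event));
    return batch.length;
  }
}

const queue = new EventQueue();

// Write side: the command is done once its events are queued.
queue.publish({ type: 'PostCreated', data: { postId: 'post-1', title: 'Hello' } });
queue.publish({ type: 'PostPublished', data: { postId: 'post-1' } });

// Until the queue is drained, a query against the read model misses the post.
const visibleBeforeDrain = readModel.has('post-1');
const processed = queue.drain();
const rowAfterDrain = readModel.get('post-1');
```

Between `publish` and `drain`, a query would report that `post-1` does not exist even though the command succeeded. That window is the staleness users can observe, and it is what the UI and API contracts need to tolerate.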

CQRS Without Event Sourcing

Build CQRS with a standard write database and denormalized read models.

// Write side: normalized tables, standard ACID
interface WriteDatabase {
  post: {
    create(data: any): Promise<any>;
    update(id: string, data: any): Promise<any>;
    delete(id: string): Promise<void>;
  };
  comment: {
    create(data: any): Promise<any>;
  };
}

// Read side: denormalized, optimized for queries
interface ReadDatabase {
  query(sql: string, params?: any[]): Promise<any>;
  execute(sql: string, params?: any[]): Promise<any>;
}

// Synchronization: after write succeeds, update read models
class PostCommandHandler {
  constructor(
    private writeDb: WriteDatabase,
    private readDb: ReadDatabase
  ) {}

  async handle(command: CreatePostCommand): Promise<string> {
    // Execute write (ACID guaranteed)
    const post = await this.writeDb.post.create({
      id: uuid(),
      authorId: command.userId,
      title: command.title,
      content: command.content,
      tags: command.tags,
    });

    // Update read models (best effort, async OK)
    try {
      await this.readDb.execute(
        `INSERT INTO post_read_model (id, author_id, title, tags)
         VALUES ($1, $2, $3, $4)`,
        [post.id, post.authorId, post.title, post.tags]
      );

      // Update user stats
      await this.readDb.execute(
        `UPDATE user_stats_read_model SET post_count = post_count + 1 WHERE user_id = $1`,
        [post.authorId]
      );
    } catch (error) {
      // Log error, trigger rebuild of read models from write side
      console.error('Read model update failed:', error);
      triggerReadModelRebuild();
    }

    return post.id;
  }
}

// Periodic rebuild of read models from write side
async function rebuildReadModels(): Promise<void> {
  const posts = await writeDb.query('SELECT * FROM posts');
  const users = await writeDb.query('SELECT * FROM users');

  // Truncate read models
  await readDb.execute('TRUNCATE post_read_model');
  await readDb.execute('TRUNCATE user_stats_read_model');

  // Rebuild from write side
  for (const post of posts) {
    await readDb.execute(
      `INSERT INTO post_read_model (id, author_id, title, tags, published, published_at)
       VALUES ($1, $2, $3, $4, $5, $6)`,
      [post.id, post.author_id, post.title, post.tags, post.published, post.published_at]
    );
  }

  console.log('Read models rebuilt');
}
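
Because a read model is derived data, the rebuild can also be written as a pure function of the write-side rows, which makes it easy to test and to compare against the live read model before replacing anything. The sketch below assumes heavily simplified row shapes; `WritePost`, `UserStatsRow`, `buildUserStats`, and `findDrift` are hypothetical and omit most columns.

```typescript
interface WritePost {
  id: string;
  authorId: string;
  viewCount: number;
}

interface UserStatsRow {
  userId: string;
  postCount: number;
  totalViews: number;
}

// Folds write-side posts into per-author stats; the same input always
// yields the same output, so the rebuild is repeatable.
function buildUserStats(posts: WritePost[]): Map<string, UserStatsRow> {
  const stats = new Map<string, UserStatsRow>();
  for (const post of posts) {
    const row = stats.get(post.authorId) ?? {
      userId: post.authorId,
      postCount: 0,
      totalViews: 0,
    };
    row.postCount += 1;
    row.totalViews += post.viewCount;
    stats.set(post.authorId, row);
  }
  return stats;
}

// Lists user ids whose live stats are missing, stale, or orphaned
// relative to a fresh rebuild.
function findDrift(
  live: Map<string, UserStatsRow>,
  rebuilt: Map<string, UserStatsRow>
): string[] {
  const userIds = new Set<string>();
  live.forEach((_row, userId) => userIds.add(userId));
  rebuilt.forEach((_row, userId) => userIds.add(userId));

  const drifted: string[] = [];
  userIds.forEach(userId => {
    const a = live.get(userId);
    const b = rebuilt.get(userId);
    if (!a || !b || a.postCount !== b.postCount || a.totalViews !== b.totalViews) {
      drifted.push(userId);
    }
  });
  return drifted.sort();
}
```

Running `findDrift` on a schedule gives a cheap signal that a best-effort read model update was missed, and it narrows a repair to the affected users instead of truncating and rebuilding everything.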

Testing CQRS Systems

Test command and query logic in isolation.

describe('PostCommands', () => {
  let commandBus: CommandBus;
  let mockDb: jest.Mocked<Database>;

  beforeEach(() => {
    mockDb = createMockDatabase();
    commandBus = new CommandBus();
    commandBus.register(
      CreatePostCommand,
      new CreatePostCommandHandler(mockDb)
    );
  });

  it('creates post with valid input', async () => {
    const command = new CreatePostCommand(
      'user-123',
      'Test Post',
      'Test content here',
      ['test']
    );

    const postId = await commandBus.execute(command);

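    // The handler also passes a generated id, slug, and createdAt,
    // so match only the fields the command controls
    expect(mockDb.post.create).toHaveBeenCalledWith(
      expect.objectContaining({
        authorId: 'user-123',
        title: 'Test Post',
        content: 'Test content here',
        tags: ['test'],
      })
    );
    expect(postId).toBeDefined();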
  });

  it('rejects post with title too short', async () => {
    const command = new CreatePostCommand(
      'user-123',
      'Too',
      'Content',
      []
    );

    await expect(commandBus.execute(command)).rejects.toThrow(
      'Invalid post content'
    );
  });

  it('publishes event on success', async () => {
    const mockEventBus = createMockEventBus();
    const handler = new CreatePostCommandHandler(mockDb, mockEventBus);

    await handler.handle(
      new CreatePostCommand('user-123', 'Valid', 'Valid content', [])
    );

    expect(mockEventBus.publish).toHaveBeenCalledWith(
      expect.objectContaining({
        type: 'PostCreated',
      })
    );
  });
});

describe('PostQueries', () => {
  let queryBus: QueryBus;
  let readDb: jest.Mocked<ReadDatabase>;

  beforeEach(() => {
    readDb = createMockReadDatabase();
    queryBus = new QueryBus();
    queryBus.register(GetPostQuery, new GetPostQueryHandler(readDb));
  });

  it('returns post detail', async () => {
    readDb.query.mockResolvedValue({
      id: 'post-1',
      title: 'Test',
      viewCount: 42,
    });

    const result = await queryBus.execute(new GetPostQuery('post-1'));

    expect(result.id).toBe('post-1');
    expect(result.viewCount).toBe(42);
  });

  it('returns empty list for no posts', async () => {
    readDb.query.mockResolvedValue([]);

    const result = await queryBus.execute(
      new ListPublishedPostsQuery(1, 20)
    );

    expect(result).toHaveLength(0);
  });
});
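
Projections can be tested in isolation in the same way. The sketch below avoids a test framework entirely: a hand-written fake records what the projection sends to the read database, and plain checks verify it. `RecordingReadDb` and `testPostPublishedProjection` are hypothetical names, and the projection is a trimmed copy of `PostPublishedProjection` without the cache so that the block stands on its own.

```typescript
interface ReadDatabase {
  execute(sql: string, params?: unknown[]): Promise<void>;
}

// Hypothetical fake: records every statement instead of running it.
class RecordingReadDb implements ReadDatabase {
  readonly calls: { sql: string; params: unknown[] }[] = [];

  async execute(sql: string, params: unknown[] = []): Promise<void> {
    this.calls.push({ sql, params });
  }
}

type PostPublished = {
  type: 'PostPublished';
  data: { postId: string; publishedAt: Date };
};

// Trimmed copy of the projection under test (cache invalidation omitted).
class PostPublishedProjection {
  constructor(private readDb: ReadDatabase) {}

  async handle(event: PostPublished): Promise<void> {
    await this.readDb.execute(
      'UPDATE post_read_model SET published = true, published_at = $1 WHERE id = $2',
      [event.data.publishedAt, event.data.postId]
    );
  }
}

// Feeds one event through the projection and checks the recorded statement.
async function testPostPublishedProjection(): Promise<void> {
  const readDb = new RecordingReadDb();
  const projection = new PostPublishedProjection(readDb);
  const publishedAt = new Date(0);

  await projection.handle({
    type: 'PostPublished',
    data: { postId: 'post-1', publishedAt },
  });

  if (readDb.calls.length !== 1) {
    throw new Error(`expected 1 statement, got ${readDb.calls.length}`);
  }
  const [firstParam, secondParam] = readDb.calls[0].params;
  if (firstParam !== publishedAt || secondParam !== 'post-1') {
    throw new Error('projection passed unexpected parameters');
  }
}
```

A recording fake checks which statement was issued, not that the SQL is valid. An integration test against a real read database would still be needed to catch schema or syntax mistakes.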

When CQRS Adds Complexity Without Value

CQRS is not always beneficial. A simpler design is usually the better choice when:

  • A single normalized database with good indexing is fast enough: most applications don't need separate read models
  • Reads must reflect writes immediately: CQRS with asynchronous projections introduces eventual consistency
  • Datasets are small: denormalization yields no measurable gain
  • Queries are simple: query-specific schemas add nothing
  • The team is unfamiliar with the pattern: the training cost is significant

Use CQRS when you have:

  • Read and write loads that differ widely in volume
  • Complex queries that are only practical against denormalized data
  • Performance requirements that can only be met by scaling reads and writes independently
  • A regulatory need for an audit trail of commands

CQRS Decision Checklist

  • Command side: write to normalized database, validate authorization
  • Query side: read from optimized denormalized read models
  • Projections update read models after commands succeed
  • Cache layer reduces read model queries (Redis, in-memory)
  • Acceptable staleness defined for each read model (seconds, minutes?)
  • Failed projections retried or replayed
  • Read models can be rebuilt from write side
  • Testing separates command and query logic
  • Team trained on eventual consistency implications
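
For the checklist item on retrying failed projections, one possible shape is a small retry helper wrapped around a call such as `projection.handle(event)`. This is a sketch under assumptions: `withRetry` and its parameters are hypothetical, and the exponential backoff schedule is one choice among many. An event that still fails after the last attempt would go to the dead letter store for later replay.

```typescript
// Runs `operation` up to `maxAttempts` times, waiting baseDelayMs, then
// 2x, 4x, ... between attempts. Rethrows the last error if all attempts fail.
async function withRetry<T>(
  operation: () => Promise<T>,
  maxAttempts: number,
  baseDelayMs: number
): Promise<T> {
  if (maxAttempts < 1) {
    throw new RangeError('maxAttempts must be at least 1');
  }

  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        const delayMs = baseDelayMs * Math.pow(2, attempt - 1);
        await new Promise<void>(resolve => setTimeout(resolve, delayMs));
      }
    }
  }
  throw lastError;
}
```

Retries only help if the projection is safe to run more than once for the same event. An upsert tolerates a repeat, while an unconditional `post_count + 1` would double count, so retry and idempotency need to be designed together.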

Conclusion

CQRS lets you scale reads independently of writes, but it adds complexity through eventual consistency and projection management. Start with simple command handlers and direct database queries. Add read models when queries become performance bottlenecks. Consider event sourcing for a richer event history and easier projection rebuilds, but don't feel obligated: CQRS works without it.