- Published on
PostgreSQL Sharding — When to Shard, How to Shard, and What It Costs You
- Authors

- Name
- Sanjeev Sharma
- @webcoderspeed1
Introduction
Sharding is a double-edged sword: it solves single-node scaling limits but introduces cross-shard consistency, query complexity, and operational overhead. Many teams shard too early or choose the wrong shard key, creating years of technical debt. This post covers when you actually need sharding and how to do it right.
- Signs You Need Sharding
- Hash vs Range vs List Partitioning
- Application-Level Sharding with Consistent Hashing
- Citus for Horizontal Scaling
- Shard Key Selection Criteria
- The Cross-Shard Query Problem
- Alternatives to Sharding
- PostgreSQL Sharding Checklist
- Conclusion
Signs You Need Sharding
Single PostgreSQL nodes have hard limits. Recognize the warning signs before you exceed them:
-- Monitor single-node capacity
SELECT
pg_size_pretty(pg_database_size('mydb')) AS db_size,
(SELECT count(*) FROM pg_stat_user_tables) AS table_count;
-- Check table size for largest tables
SELECT
schemaname, tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size,
n_live_tup AS row_count
FROM pg_stat_user_tables
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC
LIMIT 10;
-- Estimate storage growth
-- Single-node realistic limits:
-- - ~4TB disk capacity
-- - ~1M write transactions/sec (with good hardware)
-- - ~100K concurrent connections
-- - 10-20TB total working set with caching
-- Your situation calls for sharding if:
-- 1. Single table >100GB and hot (write-heavy)
-- 2. Write throughput consistently > 100K tps
-- 3. Database doubling in size every 3-6 months
-- 4. Read replicas insufficient because bottleneck is write throughput
Before sharding, exhaust these alternatives: better indexing, read replicas, write de-duplication, and caching layers.
Hash vs Range vs List Partitioning
PostgreSQL supports three partitioning strategies. Choose based on query patterns:
-- HASH PARTITIONING: Distribute rows evenly by hash
-- Best for: user_id, account_id (random access patterns)
CREATE TABLE orders (
id BIGSERIAL,
user_id BIGINT NOT NULL,
amount DECIMAL,
created_at TIMESTAMP
) PARTITION BY HASH (user_id);
CREATE TABLE orders_0 PARTITION OF orders FOR VALUES WITH (MODULUS 4, REMAINDER 0);
CREATE TABLE orders_1 PARTITION OF orders FOR VALUES WITH (MODULUS 4, REMAINDER 1);
CREATE TABLE orders_2 PARTITION OF orders FOR VALUES WITH (MODULUS 4, REMAINDER 2);
CREATE TABLE orders_3 PARTITION OF orders FOR VALUES WITH (MODULUS 4, REMAINDER 3);
-- RANGE PARTITIONING: Distribute rows by ordered key
-- Best for: created_at, date, version (time-series, archived data)
CREATE TABLE events (
id BIGSERIAL,
user_id BIGINT,
event_type TEXT,
created_at TIMESTAMP NOT NULL
) PARTITION BY RANGE (created_at);
CREATE TABLE events_2026_01 PARTITION OF events
FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');
CREATE TABLE events_2026_02 PARTITION OF events
FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');
-- LIST PARTITIONING: Distribute by discrete values
-- Best for: country, status, region (categorical data)
CREATE TABLE users (
id BIGSERIAL,
email TEXT,
region TEXT NOT NULL
) PARTITION BY LIST (region);
CREATE TABLE users_us PARTITION OF users FOR VALUES IN ('US');
CREATE TABLE users_eu PARTITION OF users FOR VALUES IN ('EU', 'UK');
CREATE TABLE users_apac PARTITION OF users FOR VALUES IN ('APAC', 'AU');
Hash partitioning enables even load distribution. Range partitioning optimizes time-based queries and retention policies. List partitioning suits categorical data.
Application-Level Sharding with Consistent Hashing
When PostgreSQL native partitioning is insufficient, implement application-level sharding for cross-database distribution:
// Consistent hashing library (install: npm install ketama)
import Ketama from 'ketama';
const shards = [
'postgresql://shard-0.db.internal:5432/app',
'postgresql://shard-1.db.internal:5432/app',
'postgresql://shard-2.db.internal:5432/app',
'postgresql://shard-3.db.internal:5432/app',
];
const hasher = new Ketama(shards);
// Determine shard for user
function getUserShard(userId: string): string {
return hasher.hash(userId); // Returns shard connection string
}
// Query pattern: single-shard (fast)
async function getUserOrders(userId: string) {
const shardUrl = getUserShard(userId);
const client = new pg.Client(shardUrl);
await client.connect();
try {
const result = await client.query(
'SELECT * FROM orders WHERE user_id = $1',
[userId]
);
return result.rows;
} finally {
await client.end();
}
}
// Cross-shard query pattern (slow, use rarely)
async function getAllOrders() {
const results = await Promise.all(
shards.map(shardUrl => {
const client = new pg.Client(shardUrl);
return client.connect()
.then(() => client.query('SELECT * FROM orders'))
.then(r => r.rows)
.finally(() => client.end());
})
);
return results.flat();
}
// Shard rebalancing: move user to new shard
async function rebalanceUser(userId: string, newShardUrl: string) {
const oldShardUrl = getUserShard(userId);
// 1. Read from old shard
const oldClient = new pg.Client(oldShardUrl);
await oldClient.connect();
const data = await oldClient.query(
'SELECT * FROM orders WHERE user_id = $1',
[userId]
);
// 2. Write to new shard
const newClient = new pg.Client(newShardUrl);
await newClient.connect();
for (const row of data.rows) {
await newClient.query(
'INSERT INTO orders VALUES ($1, $2, $3, $4)',
[row.id, row.user_id, row.amount, row.created_at]
);
}
// 3. Delete from old shard
await oldClient.query('DELETE FROM orders WHERE user_id = $1', [userId]);
// 4. Update shard lookup (in your routing service)
// Update consistent hash ring or lookup table
await oldClient.end();
await newClient.end();
}
Consistent hashing minimizes reshuffling when nodes change. Single-shard queries are fast; cross-shard queries are slow.
Citus for Horizontal Scaling
Citus is a PostgreSQL extension enabling distributed queries transparently:
-- Install Citus extension (PostgreSQL 12+ required)
CREATE EXTENSION citus;
-- Create distributed table (coordinator node)
SELECT create_distributed_table('orders', 'user_id');
SELECT create_distributed_table('users', 'id', colocate_with => 'orders');
-- Data automatically sharded across worker nodes
-- Coordinator routes queries transparently
-- Co-located join (efficient: executes on same shard)
EXPLAIN
SELECT o.id, u.email
FROM orders o
JOIN users u ON o.user_id = u.id
WHERE o.user_id = 42;
-- Cross-shard aggregation (less efficient)
EXPLAIN
SELECT COUNT(*), SUM(amount)
FROM orders
GROUP BY user_id;
-- Add/remove worker nodes
SELECT * from citus_add_node('worker-4.internal', 5432);
SELECT * from citus_remove_node('worker-1.internal', 5432);
-- Monitor shard distribution
SELECT
nodename, count(*) as shard_count
FROM citus_shards
GROUP BY nodename;
Citus handles rebalancing and shard routing automatically. Trade-off: limited SQL support (some window functions, CTEs) and operational complexity.
Shard Key Selection Criteria
Your shard key determines query efficiency. Choose poorly and you'll regret it:
// GOOD shard key: uniform distribution, enables single-shard queries
// Example: user_id (most queries filter by user)
// - Even distribution: 1M users per shard
// - Query benefit: "SELECT * FROM orders WHERE user_id = X" → single shard
// BAD shard key: skewed distribution or rarely filters
// Example: status (only 5 values: pending, processing, completed, failed, archived)
// - Skewed: most rows in 'pending' partition
// - Query penalty: "SELECT COUNT(*) FROM orders" → all shards
// BAD shard key: limits growth
// Example: country (only ~200 countries)
// - Limited shards: can't shard across >200 nodes
// - Solution: shard by country, then by user_id within each country (hierarchical)
// GOOD hierarchical shard key
function getShardForUserOrder(userId: string, countryId: string): string {
const countryHash = hashFunction(countryId) % countryCount;
const userHash = hashFunction(userId) % usersPerCountry;
return `shard_${countryHash}_${userHash}`;
}
Ideal shard key: frequently used in queries, evenly distributed, high cardinality (millions of unique values).
The Cross-Shard Query Problem
Queries touching multiple shards are your bottleneck:
-- FAST: Single-shard query (10ms)
SELECT * FROM orders WHERE user_id = 42;
-- SLOW: Cross-shard query (5 shards, 500ms total)
SELECT COUNT(*), SUM(amount) FROM orders;
-- SLOW: Cross-shard join
SELECT u.email, COUNT(o.id)
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
GROUP BY u.email;
-- WORKAROUND: Pre-compute aggregates
-- Write to analytics table via async job
CREATE TABLE order_stats (
time_bucket TIMESTAMP,
total_orders BIGINT,
total_amount DECIMAL
);
-- Materialized view on one shard
CREATE MATERIALIZED VIEW order_summary AS
SELECT
DATE_TRUNC('hour', created_at) as time_bucket,
COUNT(*) as total_orders,
SUM(amount) as total_amount
FROM orders
GROUP BY DATE_TRUNC('hour', created_at);
Design queries to be single-shard. Use pre-computed analytics for cross-shard metrics.
Alternatives to Sharding
Exhaust these approaches first:
#!/bin/bash
# 1. Read Replicas: 100K qps writes → 1 primary + 5 replicas
# Each replica handles 20K read qps
# Total throughput: 100K writes + 100K reads per second
# 2. Write De-duplication: Reduce writes 10x
# Example: batch 100 events into 1 write per second
# Instead of: INSERT INTO events VALUES (...) [100x/sec]
# Do: INSERT INTO events_batch VALUES (ARRAY[...]) [1x/sec]
# 3. Caching: Cache hot 20% of data
# Redis in front of PostgreSQL
# - 90% of requests served from cache
# - Write-through on database change
# 4. Table Partitioning: Separate old vs new data
# Archive old records to separate table
# SELECT * FROM orders WHERE created_at > NOW() - INTERVAL '1 year';
# 5. Columnar Storage: TimescaleDB, Citus Hyperscale
# Compress time-series data 100x
# Example: 1TB → 10GB with compression
# Sharding is the LAST resort, not the first solution
PostgreSQL Sharding Checklist
- Single-node limits measured: database size, write throughput, read replicas effectiveness
- Read replicas exhausted (cannot scale writes further)
- Shard key selected (high cardinality, frequently filtered, even distribution)
- Hash vs range vs list partitioning decision made
- Cross-shard query patterns documented (accept slowness)
- Pre-computed aggregates strategy for cross-shard metrics
- Shard rebalancing plan (how to add/remove shards)
- Citus evaluation completed (if available for your PostgreSQL version)
- Application middleware updated for shard routing
- Monitoring for shard skew (uneven data distribution)
Conclusion
Sharding is a massive operational undertaking. Before splitting data across multiple databases, verify you've truly exhausted single-node scaling: better indexes, read replicas, aggressive caching, and query optimization. Only when writes exceed a single node's capacity should you consider sharding. Choose your shard key carefully—it affects your architecture for years. If possible, use Citus or Vitess to automate shard routing. Accept that cross-shard queries will be slow and design your schema accordingly. The best approach: stay on a single PostgreSQL node for as long as possible.