Database Sharding & Scaling
Scaling Strategies Overview
Vertical Scaling (Scale Up)      Horizontal Scaling (Scale Out)
─────────────────────────────    ──────────────────────────────
More RAM/CPU/Disk                More servers
Single node                      Multiple nodes
Simpler                          Complex coordination
Limited ceiling                  Nearly unlimited
No distribution bugs             Distributed systems problems

For databases, horizontal scaling usually means:
- Read replicas — scale reads
- Sharding — scale writes & storage
Read Replicas
Architecture
              ┌──────────────┐
              │   Primary    │
              │ (read/write) │
              └──────┬───────┘
                     │ replication stream
      ┌──────────────┼──────────────┐
      ↓              ↓              ↓
┌───────────┐  ┌───────────┐  ┌───────────┐
│ Replica 1 │  │ Replica 2 │  │ Replica 3 │
│(read-only)│  │(read-only)│  │(read-only)│
└───────────┘  └───────────┘  └───────────┘

Replication Lag
Replicas are eventually consistent. Writes to the primary appear on replicas after the replication lag (typically 1-100ms, but it can stretch to seconds when the primary or a replica is under load).
-- Check replication lag in PostgreSQL (run on the primary)
SELECT
client_addr,
pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn) AS send_lag,
pg_wal_lsn_diff(sent_lsn, replay_lsn) AS replay_lag
FROM pg_stat_replication;

Read-Your-Writes Consistency
After a user writes, they expect to immediately read their own write. A lagging replica may not have it yet, so that read can't blindly go to a replica.
Solutions:
// Option 1: Route user's own reads to primary (sticky reads)
async function getUser(userId, request) {
  // If the user just performed a write, read from the primary
  const justWrote = request.session.lastWriteTime
    && (Date.now() - request.session.lastWriteTime < 5000);
  const db = justWrote ? primaryDb : replicaDb;
  return db.findUser(userId);
}
// Option 2: Wait for replication (PostgreSQL)
// After write: capture the primary's WAL position (LSN)
const { rows } = await primary.query('SELECT pg_current_wal_lsn() AS lsn');
const lsn = rows[0].lsn;

// On replica read: poll until the replica has replayed past that LSN
let caughtUp = false;
while (!caughtUp) {
  const res = await replica.query(
    'SELECT pg_wal_lsn_diff($1, pg_last_wal_replay_lsn()) <= 0 AS caught_up',
    [lsn]
  );
  caughtUp = res.rows[0].caught_up;
}
// Option 3: Use a version/timestamp token (DynamoDB-style)
// The write returns a version; subsequent reads include it and the
// DB waits until the replica is at least at that version

Connection Pooling with Replicas (PgBouncer / Prisma)
// Prisma read replicas via the @prisma/extension-read-replicas package
// import { readReplicas } from '@prisma/extension-read-replicas';
const prisma = new PrismaClient({
  datasources: {
    db: { url: process.env.PRIMARY_URL },
  },
});

// Use $extends for read replica routing
const prismaExtended = prisma.$extends(
  readReplicas({
    url: [process.env.REPLICA1_URL, process.env.REPLICA2_URL],
  })
);

// Reads go to a replica automatically
const users = await prismaExtended.user.findMany(); // → replica

// Writes go to the primary
await prismaExtended.user.create({ data: { name: 'Alice' } }); // → primary

// Force a primary read (e.g. right after a write) with $primary()
const user = await prismaExtended.$primary().user.findUnique({
  where: { id },
});

Database Sharding
Split data across multiple database instances (shards). Each shard holds a subset of the data.
Without sharding:        With sharding:

┌────────────────┐   ┌────────────┐ ┌────────────┐ ┌────────────┐
│   All users    │   │ Users 0-33%│ │Users 33-66%│ │Users 66-99%│
│  users table   │   │  Shard 0   │ │  Shard 1   │ │  Shard 2   │
│   100M rows    │   │ ~33M rows  │ │ ~33M rows  │ │ ~33M rows  │
└────────────────┘   └────────────┘ └────────────┘ └────────────┘

Sharding Strategies
1. Range-Based Sharding
Shard 0: user_id 1–1,000,000
Shard 1: user_id 1,000,001–2,000,000
Shard 2: user_id 2,000,001–3,000,000
Shard key: user_id

Pros: Range queries efficient (scan one shard), easy to add new shards at the end. Cons: Hotspot problem — new users always land on the last shard ("write hotspot").
function getShard(userId, totalShards = 3) {
  const rangeSize = 1_000_000;
  // ids 1-1M → shard 0, 1M+1-2M → shard 1, …
  // Clamp overflow onto the last shard (this is the write hotspot)
  return Math.min(Math.floor((userId - 1) / rangeSize), totalShards - 1);
}

2. Hash-Based Sharding
shard = hash(user_id) % N
user_id=42 → hash → shard 1
user_id=43 → hash → shard 0
user_id=44 → hash → shard 2

Pros: Even distribution, no hotspots. Cons: Range queries require all shards, resharding is expensive.
function getShard(userId, totalShards) {
  // murmurhash from a hashing library; consistent hashing is
  // preferred over simple modulo (see below)
  return murmurhash(userId.toString()) % totalShards;
}

3. Consistent Hashing
Solves the resharding problem with hash-based sharding.
Ring of hash values (0 to 2^32):

          0
        /   \
   Node B   Node A
     |         |
   Node C   Node D
        \   /
        2^32

Data point → find nearest node clockwise

When adding Node E:
- Only data between E's predecessor and E migrates to E
- Other nodes unaffected (unlike modulo where all data moves)
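The claim that modulo placement moves nearly all data can be checked with a quick count. A toy sketch, independent of any library:

```javascript
// Fraction of keys that change shard when the shard count goes from
// oldShards to newShards under simple modulo placement.
function movedFraction(totalKeys, oldShards, newShards) {
  let moved = 0;
  for (let key = 0; key < totalKeys; key++) {
    if (key % oldShards !== key % newShards) moved++;
  }
  return moved / totalKeys;
}

// Going from 3 to 4 shards relocates about 75% of all keys
console.log(movedFraction(100_000, 3, 4)); // ≈ 0.75
```

Consistent hashing, by contrast, moves only roughly 1/(N+1) of the keys when the (N+1)-th node joins.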
class ConsistentHashRing {
  constructor(nodes = [], replicas = 150) {
    this.replicas = replicas; // virtual nodes per physical node
    this.ring = new Map();
    this.sortedKeys = [];
    nodes.forEach(n => this.addNode(n));
  }

  hash(key) {
    // Toy string hash — use murmurhash or FNV in production
    let h = 0;
    for (const c of key) h = (Math.imul(31, h) + c.charCodeAt(0)) | 0;
    return Math.abs(h);
  }

  addNode(node) {
    for (let i = 0; i < this.replicas; i++) {
      const key = this.hash(`${node}:${i}`);
      this.ring.set(key, node);
      this.sortedKeys.push(key);
    }
    this.sortedKeys.sort((a, b) => a - b);
  }

  getNode(key) {
    const hash = this.hash(key);
    // Find the first ring position clockwise of the key's hash
    // (linear scan here; use binary search in production)
    const idx = this.sortedKeys.findIndex(k => k >= hash);
    const ringKey = idx === -1
      ? this.sortedKeys[0] // wrap around
      : this.sortedKeys[idx];
    return this.ring.get(ringKey);
  }
}

const ring = new ConsistentHashRing(['shard0', 'shard1', 'shard2']);
ring.getNode('user:42');  // → one of the shards; stable for this key
ring.getNode('user:100'); // → same key always maps to the same shard

4. Directory-Based Sharding
A lookup table maps shard keys to shard locations.
lookup_table:
  user_id 1-1000    → shard0
  user_id 1001-2000 → shard1
  org_id  'acme'    → shard3  (tenant-based)
Pros: Maximum flexibility, easy to rebalance
Cons: Lookup table is a bottleneck, must be highly available

Choosing a Shard Key
The shard key is the most important sharding decision. A bad shard key causes:
- Hotspots — one shard gets all traffic
- Uneven data — one shard fills up
- Cross-shard queries — slow, requires scatter-gather
Good shard key properties
- High cardinality (many distinct values)
- Even distribution
- Co-locates frequently queried data
- Avoids cross-shard transactions
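For the multi-tenant case, co-location falls out of hashing only the tenant part of a composite (org_id, user_id) key. A minimal sketch with a hypothetical helper and a toy hash (use murmurhash or similar in production):

```javascript
// Shard by org_id only: all rows for a tenant land on one shard,
// so per-tenant queries never need scatter-gather.
function shardForTenant(orgId, totalShards) {
  let h = 0; // toy string hash, swap for murmurhash in production
  for (const c of String(orgId)) h = (Math.imul(31, h) + c.charCodeAt(0)) | 0;
  return Math.abs(h) % totalShards;
}

// Every (org_id, user_id) pair for 'acme' resolves to the same shard
const shard = shardForTenant('acme', 3);
```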
Examples
BAD: shard by country → most users in US → shard_US is a hotspot
BAD: shard by created_at → new data piles on the latest shard
BAD: shard by user_type → 90% are 'free' users → one shard overwhelmed
GOOD: shard by user_id hash → even spread, user data co-located
GOOD: shard by (org_id, user_id) → multi-tenant app, org data co-located
GOOD: shard by UUID → inherently random, even distribution

Cross-Shard Operations
The pain of sharding:
Scatter-Gather Queries
// Find all users with age > 30 — must query all shards
async function findUsersOverAge(age) {
  const shards = [shard0, shard1, shard2];
  const results = await Promise.all(
    shards.map(shard =>
      shard.query('SELECT * FROM users WHERE age > $1', [age])
    )
  );
  // Merge and sort results in the application
  return results.flat().sort((a, b) => a.id - b.id);
}

Cross-Shard Transactions
Avoid if possible. If needed:
- Use Saga pattern (eventual consistency)
- Use 2PC with a transaction coordinator (high latency)
- Redesign to co-locate related data
// Transfer between users on different shards — Saga approach
async function transfer(fromUserId, toUserId, amount) {
  const fromShard = ring.getNode(fromUserId);
  const toShard = ring.getNode(toUserId);
  // Step 1: Debit — the WHERE clause guards against overdraft
  const debit = await fromShard.query(
    'UPDATE users SET balance = balance - $1 WHERE id = $2 AND balance >= $1',
    [amount, fromUserId]
  );
  if (debit.rowCount === 0) throw new Error('insufficient balance');
  // Step 2: Credit (may fail — needs compensation)
  try {
    await toShard.query(
      'UPDATE users SET balance = balance + $1 WHERE id = $2',
      [amount, toUserId]
    );
  } catch (err) {
    // Compensate: refund the debit
    await fromShard.query(
      'UPDATE users SET balance = balance + $1 WHERE id = $2',
      [amount, fromUserId]
    );
    throw err;
  }
}

CQRS — Command Query Responsibility Segregation
Separate the read model and write model.
        ┌───────────────────────────────┐
        │          Application          │
        └────────┬─────────────┬────────┘
                 │             │
             Commands       Queries
             (writes)       (reads)
                 │             │
        ┌────────▼──┐   ┌──────▼────────┐
        │  Command  │   │ Query Handler │
        │  Handler  │   │               │
        └────────┬──┘   └──────▲────────┘
                 │             │
        ┌────────▼──┐   ┌──────┴────────┐
        │Write Model│   │  Read Model   │
        │(normalized│   │(denormalized: │
        │ Postgres) │   │ Elasticsearch,│
        └────────┬──┘   │ Redis, etc.)  │
                 │      └───────────────┘
                 │ Event / Projection
                 └──────────────────────►

Example: Order System
// Write model — normalized, consistent
// orders table: id, user_id, status, created_at
// order_items:  order_id, product_id, quantity, price

// Command
async function placeOrder(command) {
  const { userId, items } = command;
  await db.transaction(async (trx) => {
    const order = await trx('orders').insert({
      user_id: userId,
      status: 'pending',
      created_at: new Date(),
    }).returning('*');
    await trx('order_items').insert(
      items.map(item => ({ order_id: order[0].id, ...item }))
    );
    // Emit event for read model projection
    await eventBus.publish('OrderPlaced', {
      orderId: order[0].id,
      userId,
      items,
    });
  });
}

// Read model — denormalized for fast queries
// Read store (Redis/Elasticsearch/denormalized PG table):
// {
//   orderId, userId, userName, userEmail,
//   items: [{ productName, quantity, price }],
//   total, status, createdAt
// }

// Projection — subscribes to events, updates the read model
eventBus.subscribe('OrderPlaced', async (event) => {
  const user = await db('users').where({ id: event.userId }).first();
  const products = await db('products').whereIn('id',
    event.items.map(i => i.productId)
  );
  const readModel = {
    orderId: event.orderId,
    userId: event.userId,
    userName: user.name,
    userEmail: user.email,
    items: event.items.map(item => ({
      productName: products.find(p => p.id === item.productId).name,
      quantity: item.quantity,
      price: item.price,
    })),
    total: event.items.reduce((sum, i) => sum + i.price * i.quantity, 0),
    status: 'pending',
  };
  await redis.set(`order:${event.orderId}`, JSON.stringify(readModel));
  await elasticsearch.index({ index: 'orders', body: readModel });
});

// Query — ultra-fast, no joins
async function getOrder(orderId) {
  return JSON.parse(await redis.get(`order:${orderId}`));
}

When to Use CQRS
- Read/write workloads are very different (e.g., 100:1 read to write ratio)
- Read model needs different structure than write model (search, reports)
- Need to scale reads independently
- Event sourcing (natural fit)
When NOT to Use CQRS
- Simple CRUD with no complex business logic
- Small teams / simple domains (added complexity not worth it)
- Eventual consistency is unacceptable for reads
Connection Pooling Deep Dive
Why Connection Pooling
Each PostgreSQL connection costs roughly 5-10MB of RAM plus a backend process (PostgreSQL forks a process per connection). With 1000 concurrent requests, 1000 connections means 5-10GB just for connections. PgBouncer pools connections, limiting actual DB connections.
App servers (1000 concurrent)    PgBouncer          PostgreSQL
─────────────────────────────    ─────────          ──────────
[req1]    ──────────────────►    pool of 100  ──►   max 100 conns
[req2]    ──────────────────►    connections,
[req3]    ──────────────────►    shared
...
[req1000]

PgBouncer Modes
| Mode | Description | Use Case |
|---|---|---|
| Session | Connection held for the session lifetime (the default) | Legacy apps, session state |
| Transaction | Connection held per transaction | Most web apps |
| Statement | Connection released after each statement | Rarely used |
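As a sketch, a minimal pgbouncer.ini for transaction pooling might look like this (illustrative values, tune to your workload):

```ini
[databases]
mydb = host=127.0.0.1 port=5432 dbname=mydb

[pgbouncer]
listen_addr = 0.0.0.0
listen_port = 6432
pool_mode = transaction
; app-side connections accepted
max_client_conn = 1000
; actual PostgreSQL connections per database/user pair
default_pool_size = 100
```

pool_mode = transaction is what lets many client connections share few server connections, at the cost of session state (prepared statements, SET, advisory locks) not surviving across transactions.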
Pool Sizing Formula
Pool size = Tn × (Cm - 1) + 1
Tn = number of threads (or concurrent workers) in the application
Cm = maximum number of simultaneous connections a single thread holds
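Worked through with hypothetical numbers, the formula gives:

```javascript
// Pool size = Tn × (Cm − 1) + 1, per the formula above
function poolSize(threads, maxConnsPerThread) {
  return threads * (maxConnsPerThread - 1) + 1;
}

// 8 worker threads, each holding at most 2 connections at once:
console.log(poolSize(8, 2)); // 9
```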
Simpler guideline (PostgreSQL wiki):
connections = ((core_count * 2) + effective_spindle_count)

// pg-pool configuration
const pool = new Pool({
  host: 'localhost',
  database: 'mydb',
  max: 20,                        // max connections in pool
  min: 5,                         // min connections to keep warm
  idleTimeoutMillis: 30000,       // close idle after 30s
  connectionTimeoutMillis: 2000,  // fail if no connection in 2s
  maxUses: 7500,                  // recycle connection after N uses
});

// Monitoring pool health
pool.on('error', (err, client) => {
  console.error('Unexpected error on idle client', err);
});
setInterval(() => {
  console.log({
    total: pool.totalCount,
    idle: pool.idleCount,
    waiting: pool.waitingCount,
  });
}, 5000);

Interview Questions
Q: What is sharding and when would you use it?
A: Sharding horizontally partitions data across multiple DB instances. Use when: a single DB can't handle the write throughput, the dataset is too large for one node, or you need geographic distribution. Trade-offs: operational complexity, cross-shard queries are expensive, resharding is hard.

Q: What's the difference between range and hash sharding?
A: Range: shard by value range (e.g., user_id 1-1M → shard0). Efficient for range queries but prone to hotspots. Hash: shard = hash(key) % N. Even distribution but range queries hit all shards. Use consistent hashing to avoid bulk data movement when adding shards.

Q: How do you handle replication lag in a read replica setup?
A: Options: (1) read-your-writes: route the user's own reads to the primary for a short window after a write, (2) version tokens: include the write's LSN in the response and have the replica wait until it catches up, (3) sticky sessions: always route a user to the same replica. In practice, a combination: primary for writes + recent reads, replicas for everything else.

Q: What's the benefit of CQRS?
A: It separates the write model (normalized, consistent, ACID) from the read model (denormalized, optimized for queries). This allows independent scaling, a technology choice per side (e.g., Postgres for writes, Elasticsearch for full-text search reads), and keeps complex domain logic in the command/write side without polluting the query side.

Q: Why is connection pooling critical for Node.js apps?
A: Node.js may have hundreds of concurrent async operations. Without pooling, each would need a DB connection, exhausting DB connection limits (PostgreSQL defaults to 100). Pooling reuses connections across requests — with transaction-mode PgBouncer, a high-traffic app can multiplex thousands of concurrent requests onto ~20-100 actual connections.