Database Sharding
Data sharding - sharding strategies, routing, and cross-shard queries
Sharding is a technique for horizontally partitioning data across multiple database instances to handle data volumes and workloads that a single machine cannot support.
Why Sharding?
Single database bottlenecks:
┌─────────────────────────────────────────────────────────┐
│ Data volume: Tables with hundreds of millions of rows │
│ Write capacity: Single node write throughput limited │
│ Read capacity: Vertical scaling expensive, has limits │
│ Storage: Single machine disk space limited │
└─────────────────────────────────────────────────────────┘
After sharding:
        ┌───────────┐
        │  1B rows  │
        │Single node│
        └─────┬─────┘
              │ Shard
     ┌────────┼────────┐
     ▼        ▼        ▼
 ┌──────┐  ┌──────┐  ┌──────┐
 │ 333M │  │ 333M │  │ 333M │
 │Shard1│  │Shard2│  │Shard3│
 └──────┘  └──────┘  └──────┘
Sharding Strategies
Range Sharding
Partition data by contiguous ranges:
Shard Key: user_id
┌─────────────────────────────────────────────────────────┐
│ Shard 1: user_id 1 - 1,000,000 │
│ Shard 2: user_id 1,000,001 - 2,000,000 │
│ Shard 3: user_id 2,000,001 - 3,000,000 │
└─────────────────────────────────────────────────────────┘
Pros:
✓ Efficient range queries
✓ Contiguous data, good for batch operations
Cons:
✗ Hotspot issues: new users concentrated in last shard
✗ Requires periodic rebalancing
-- Range sharding example
-- Routing logic
SELECT * FROM users WHERE user_id = 1500000;
-- Routes to Shard 2 (1,000,001 - 2,000,000)
-- Range query
SELECT * FROM users WHERE user_id BETWEEN 999900 AND 1000100;
-- May span Shard 1 and Shard 2
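To make the routing step concrete, here is a minimal sketch of range-based routing in Python, assuming the boundary values live in application config; the names (`RANGE_UPPER_BOUNDS`, `route_by_range`) are illustrative, not from any particular router:

```python
import bisect

# Upper bound (inclusive) of each shard's user_id range, in ascending order.
# Values are illustrative and would normally come from shared routing config.
RANGE_UPPER_BOUNDS = [1_000_000, 2_000_000, 3_000_000]


def route_by_range(user_id: int) -> int:
    """Return the 1-based shard number that owns this user_id."""
    idx = bisect.bisect_left(RANGE_UPPER_BOUNDS, user_id)
    if idx >= len(RANGE_UPPER_BOUNDS):
        raise ValueError(f"user_id {user_id} is outside all configured ranges")
    return idx + 1


print(route_by_range(1_500_000))  # 2 -> Shard 2, matching the routing comment above
```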
Hash Sharding
Hash the shard key and take modulo:
hash(user_id) % num_shards = shard_id
┌─────────────────────────────────────────────────────────┐
│ user_id = 12345 │
│ hash(12345) = 7829384 │
│ 7829384 % 3 = 2 → Shard 2 │
└─────────────────────────────────────────────────────────┘
Pros:
✓ Even data distribution
✓ Avoids hotspots
Cons:
✗ Range queries must hit all shards
✗ Scaling requires data redistribution
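A minimal sketch of hash-mod routing, assuming CRC32 as the stable hash (Python's built-in `hash()` is randomized per process for strings, so it is a poor choice here); the function name is illustrative:

```python
import zlib

NUM_SHARDS = 3


def route_by_hash(user_id: int) -> int:
    """Return the 0-based shard number using hash-mod routing."""
    # CRC32 is stable across processes, unlike Python's built-in hash()
    # for strings, which is randomized per interpreter run.
    return zlib.crc32(str(user_id).encode("utf-8")) % NUM_SHARDS


print(route_by_hash(12345))  # same key -> same shard, every time
```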
Consistent Hashing
Reduces how much data must move when shards are added or removed:
Hash Ring (positions wrap around from 0 to 2^32):
               0 / 2^32
            ┌─────●─────┐
            │           │
        N3 ●│           │● N1
            │           │
            └─────●─────┘
                  N2
Data ownership: first node found clockwise
Adding new node N4:
- Only affects data between N3 and N4
- Other data needs no migration
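A minimal consistent-hash ring sketch, assuming MD5 as the ring hash and no virtual nodes (real implementations add virtual nodes per physical node to smooth the distribution); all names are illustrative:

```python
import bisect
import hashlib


def ring_hash(key: str) -> int:
    """Map any string onto the 0 .. 2^32 - 1 ring."""
    return int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16) % (2**32)


class ConsistentHashRing:
    def __init__(self, nodes):
        # Sorted ring positions plus a position -> node lookup.
        self._positions = sorted(ring_hash(n) for n in nodes)
        self._owner = {ring_hash(n): n for n in nodes}

    def lookup(self, key: str) -> str:
        """Return the first node found clockwise from the key's position."""
        pos = ring_hash(key)
        idx = bisect.bisect_right(self._positions, pos)
        if idx == len(self._positions):  # wrapped past 2^32, back to the start
            idx = 0
        return self._owner[self._positions[idx]]


ring = ConsistentHashRing(["N1", "N2", "N3"])
print(ring.lookup("user:12345"))
# Adding "N4" only remaps keys that fall between N3 and N4 on the ring;
# every other key keeps its current owner, so little data has to move.
```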
Directory-Based Sharding
Use a lookup table to determine data location:
┌──────────────────────┐
│     Lookup Table     │
├──────────────┬───────┤
│ Key Range    │ Shard │
├──────────────┼───────┤
│ A-F          │   1   │
│ G-L          │   2   │
│ M-R          │   3   │
│ S-Z          │   4   │
└──────────────┴───────┘
Pros:
✓ Flexible, customizable rules
✓ Easy to adjust data distribution
Cons:
✗ Lookup table becomes single point of failure
✗ Additional query overhead
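A minimal sketch of directory-based routing, assuming the lookup table is cached in application memory; in practice it lives in a replicated metadata store precisely because of the single-point-of-failure concern above:

```python
# Illustrative in-memory copy of the lookup table; a production directory
# lives in a replicated, cached metadata store.
DIRECTORY = {
    ("A", "F"): 1,
    ("G", "L"): 2,
    ("M", "R"): 3,
    ("S", "Z"): 4,
}


def route_by_directory(key: str) -> int:
    """Return the shard for a key by scanning the directory's letter ranges."""
    first = key[0].upper()
    for (low, high), shard in DIRECTORY.items():
        if low <= first <= high:
            return shard
    raise KeyError(f"no shard configured for key {key!r}")


print(route_by_directory("grace"))  # 'G' falls in G-L -> shard 2
```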
Shard Key Selection
Selection Criteria
| Criteria | Description |
|---|---|
| High cardinality | More key values, more even distribution |
| Write distribution | Avoid hotspot shards |
| Query friendly | Support common query patterns |
| Business related | Related data in same shard |
Good Shard Key Examples
-- User data: shard by user_id
-- Benefit: All user-related data in same shard, avoids cross-shard JOINs
CREATE TABLE user_orders (
    id BIGINT,
    user_id BIGINT,  -- shard key
    total DECIMAL,
    PRIMARY KEY (user_id, id)
);
-- Multi-tenant: shard by tenant_id
-- Benefit: Tenant data isolation, single-tenant queries don't cross shards
CREATE TABLE documents (
    id BIGINT,
    tenant_id INT,  -- shard key
    content TEXT,
    PRIMARY KEY (tenant_id, id)
);
-- Time series: composite shard by time + business ID
-- Benefit: Hot data concentrated, cold data archived
CREATE TABLE events (
    id BIGINT,
    timestamp TIMESTAMP,
    source_id INT,
    data JSONB
);
-- Composite key: hash(source_id) + range(timestamp)
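A sketch of how such a composite key could be routed: the hash component picks the shard, the time component picks a range partition within it; the shard count and partition naming are illustrative assumptions:

```python
import zlib
from datetime import datetime

NUM_SHARDS = 3


def route_event(source_id: int, ts: datetime):
    """Hash component picks the shard; time component picks the partition."""
    shard = zlib.crc32(str(source_id).encode("utf-8")) % NUM_SHARDS
    partition = f"events_{ts:%Y_%m}"  # monthly range partition inside the shard
    return shard, partition


print(route_event(42, datetime(2024, 3, 15)))  # (<shard 0-2>, 'events_2024_03')
```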
Poor Shard Key Examples
-- ✗ Auto-increment ID: new writes concentrate on the last shard
-- ✗ Low cardinality columns: e.g., status with only a few values
-- ✗ Frequently updated columns: changing the key value forces rows to move between shards
-- ✗ Columns that don't match query patterns: most queries end up fanning out to every shard
Cross-Shard Operations
Cross-Shard Queries
-- Query needs to access multiple shards
SELECT COUNT(*) FROM orders WHERE created_at > '2024-01-01';
Execution flow:
┌─────────────────────────────────────────────────────────┐
│ Query Router │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Shard 1 │ │ Shard 2 │ │ Shard 3 │ │
│ │COUNT=100│ │COUNT=200│ │COUNT=150│ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │ │ │ │
│ └───────────────┼───────────────┘ │
│ ▼ │
│ Aggregate: 450 │
└─────────────────────────────────────────────────────────┘
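A sketch of the scatter-gather step a query router performs for the COUNT above, assuming one DB-API (psycopg2-style) connection per shard; `shard_connections` and `count_orders_since` are placeholder names:

```python
from concurrent.futures import ThreadPoolExecutor


def count_orders_since(shard_connections, cutoff):
    """Fan the COUNT out to every shard, then sum the partial results."""
    sql = "SELECT COUNT(*) FROM orders WHERE created_at > %s"

    def count_on(conn):
        with conn.cursor() as cur:
            cur.execute(sql, (cutoff,))
            return cur.fetchone()[0]

    # Scatter: run the same query on each shard in parallel.
    with ThreadPoolExecutor(max_workers=len(shard_connections)) as pool:
        partial_counts = list(pool.map(count_on, shard_connections))

    # Gather: COUNT re-aggregates by summing; AVG or percentiles need more care.
    return sum(partial_counts)
```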
Cross-Shard JOINs
-- Avoid if possible! If necessary:
-- Option 1: Application-level JOIN
users = query_shard("SELECT * FROM users WHERE region = 'US'")
orders = query_all_shards("SELECT * FROM orders WHERE user_id IN (?)", user_ids)
result = application_join(users, orders)
-- Option 2: Data denormalization
-- Replicate JOIN data to each shard
CREATE TABLE user_info (
    user_id BIGINT,
    user_name VARCHAR(100)  -- redundant copy on each shard, synced periodically
);
-- Option 3: Broadcast tables
-- Replicate small tables to all shards
CREATE TABLE countries (
    id INT PRIMARY KEY,
    name VARCHAR(100)
);
-- Auto-sync to all shards
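Option 1 above, made slightly more concrete as a sketch: fetch the users, fan the order lookup out to every shard, and join in application memory; the connection objects and the PostgreSQL/psycopg2-style placeholders are assumptions:

```python
from collections import defaultdict


def orders_for_us_users(user_shard, order_shards):
    """Application-level JOIN: users from one shard, orders gathered from all."""
    with user_shard.cursor() as cur:
        cur.execute("SELECT user_id, name FROM users WHERE region = %s", ("US",))
        users = cur.fetchall()
    user_ids = [uid for uid, _ in users]

    # Orders for these users may live on any shard, so ask every shard.
    orders_by_user = defaultdict(list)
    for conn in order_shards:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT user_id, id, total FROM orders WHERE user_id = ANY(%s)",
                (user_ids,),
            )
            for uid, order_id, total in cur.fetchall():
                orders_by_user[uid].append((order_id, total))

    # The JOIN itself happens here, in application memory.
    return {name: orders_by_user[uid] for uid, name in users}
```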
Distributed Transactions
-- Two-Phase Commit (2PC)
-- Coordinator asks all shards if they can commit
Phase 1 (Prepare):
┌─────────────────────────────────────────────────────────┐
│ Coordinator → Shard1: "Ready to commit?" │
│ Coordinator → Shard2: "Ready to commit?" │
│ Shard1 → Coordinator: "Ready" │
│ Shard2 → Coordinator: "Ready" │
└─────────────────────────────────────────────────────────┘
Phase 2 (Commit):
┌─────────────────────────────────────────────────────────┐
│ Coordinator → Shard1: "Commit" │
│ Coordinator → Shard2: "Commit" │
│ Shard1 → Coordinator: "Committed" │
│ Shard2 → Coordinator: "Committed" │
└─────────────────────────────────────────────────────────┘
Problems: participants block (holding locks) if the coordinator fails; the extra round trips hurt performance
Alternatives: Saga pattern, TCC (Try-Confirm-Cancel)
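A toy sketch of the 2PC message flow shown above, with in-memory objects standing in for shards; a real coordinator also needs durable logging and timeout/recovery handling, which this omits:

```python
def two_phase_commit(participants):
    """Commit on every shard only if every shard votes 'ready' in phase 1."""
    # Phase 1 (Prepare): ask each participant whether it can commit.
    prepared = []
    for shard in participants:
        if shard.prepare():          # "Ready to commit?"
            prepared.append(shard)
        else:
            # Any "no" vote aborts the transaction on everyone prepared so far.
            for p in prepared:
                p.rollback()
            return False

    # Phase 2 (Commit): all votes were "ready", so tell every shard to commit.
    for shard in participants:
        shard.commit()
    return True
```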
Sharding Implementations
Vitess (MySQL)
# VSchema configuration
{
  "sharded": true,
  "vindexes": {
    "hash": {
      "type": "hash"
    }
  },
  "tables": {
    "users": {
      "column_vindexes": [
        {
          "column": "user_id",
          "name": "hash"
        }
      ]
    }
  }
}
Citus (PostgreSQL)
-- Create distributed table
SELECT create_distributed_table('orders', 'user_id');
-- Queries auto-route
SELECT * FROM orders WHERE user_id = 123;
-- Only queries shard containing that user_id
-- Aggregate queries
SELECT user_id, COUNT(*) FROM orders GROUP BY user_id;
-- Execute in parallel on shards, merge results
MongoDB Sharding
// Enable sharding
sh.enableSharding("mydb")
// Create shard key index
db.orders.createIndex({ user_id: "hashed" })
// Shard the collection
sh.shardCollection("mydb.orders", { user_id: "hashed" })
// View shard distribution
db.orders.getShardDistribution()
Resharding
When data grows or distribution becomes uneven, resharding is needed.
Strategy
Online resharding steps:
┌─────────────────────────────────────────────────────────┐
│ 1. Add new shards │
│ 2. Start data migration (background) │
│ 3. Dual-write: write to both old and new shards │
│ 4. Migration complete, switch routing │
│ 5. Clean up old shard data │
└─────────────────────────────────────────────────────────┘
   Phase 1         Phase 2         Phase 3
┌───────────┐   ┌───────────┐   ┌───────────┐
│  S1 | S2  │ → │ S1|S2|S3  │ → │ S1|S2|S3  │
│  50%|50%  │   │33%|33%|34%│   │33%|33%|34%│
│  Normal   │   │Dual-write │   │ Complete  │
└───────────┘   └───────────┘   └───────────┘
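A sketch of the dual-write step (step 3 above), assuming `old_router` and `new_router` map a key to connection-like objects with a `write` method; both are placeholders for the real routing layer:

```python
def write_during_resharding(key, row, old_router, new_router):
    """Dual-write (step 3): keep the old and new shard layouts in sync."""
    old_shard = old_router(key)   # layout that still serves reads
    new_shard = new_router(key)   # layout being backfilled in the background

    # The old layout stays authoritative until cutover, so write it first.
    old_shard.write(key, row)

    # Mirror the write into the new layout so backfilled data never goes stale.
    if new_shard is not old_shard:
        new_shard.write(key, row)
```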
Best Practices
- Delay sharding: Introduce sharding as late as possible, try other optimizations first
- Choose the right shard key: Most important decision, hard to change later
- Design for sharding: Data models that avoid cross-shard operations
- Monitor shard balance: Regularly check data distribution
- Keep global tables: Small reference tables can be replicated to all shards
- Handle cross-shard queries: Expect slower performance, optimize or avoid
- Test resharding: Ensure you can safely add/remove shards
-- Shard monitoring query example (Citus)
SELECT shardid, nodeport, logicalrelid, shardminvalue, shardmaxvalue
FROM pg_dist_shard_placement
JOIN pg_dist_shard USING (shardid)
ORDER BY logicalrelid, shardminvalue;