Database Sharding

Data sharding - sharding strategies, routing, and cross-shard queries

Sharding is a technique for horizontally partitioning data across multiple database instances to handle data volumes and workloads that a single machine cannot support.

Why Sharding?

Single database bottlenecks:
┌─────────────────────────────────────────────────────────┐
│ Data volume: Tables with hundreds of millions of rows   │
│ Write capacity: Single node write throughput limited    │
│ Read capacity: Vertical scaling expensive, has limits   │
│ Storage: Single machine disk space limited              │
└─────────────────────────────────────────────────────────┘

After sharding:
     ┌─────────────┐
     │   1B rows   │
     │ Single node │
     └──────┬──────┘
            │ Shard
    ┌───────┼───────┐
    ▼       ▼       ▼
┌──────┐┌──────┐┌──────┐
│ 333M ││ 333M ││ 333M │
│Shard1││Shard2││Shard3│
└──────┘└──────┘└──────┘

Sharding Strategies

Range Sharding

Partition data by contiguous ranges:

Shard Key: user_id
┌─────────────────────────────────────────────────────────┐
│ Shard 1: user_id 1 - 1,000,000                         │
│ Shard 2: user_id 1,000,001 - 2,000,000                 │
│ Shard 3: user_id 2,000,001 - 3,000,000                 │
└─────────────────────────────────────────────────────────┘

Pros:
✓ Efficient range queries
✓ Contiguous data, good for batch operations

Cons:
✗ Hotspot issues: new users concentrated in last shard
✗ Requires periodic rebalancing

-- Range sharding example
-- Routing logic
SELECT * FROM users WHERE user_id = 1500000;
-- Routes to Shard 2 (1,000,001 - 2,000,000)

-- Range query
SELECT * FROM users WHERE user_id BETWEEN 999900 AND 1000100;
-- May span Shard 1 and Shard 2
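
The routing rule above can be sketched in a few lines of Python. This is only an illustration under the ranges shown in the diagram; the names `UPPER_BOUNDS`, `SHARDS`, `shard_for`, and `shards_for_range` are hypothetical and not part of any particular sharding product.

```python
from bisect import bisect_left

# Inclusive upper bound of each shard's user_id range, in ascending order.
# These mirror the three ranges in the diagram; shard names are illustrative.
UPPER_BOUNDS = [1_000_000, 2_000_000, 3_000_000]
SHARDS = ["shard_1", "shard_2", "shard_3"]


def shard_for(user_id: int) -> str:
    """Return the shard whose range contains user_id."""
    if user_id < 1:
        raise ValueError(f"user_id {user_id} is below the first range")
    idx = bisect_left(UPPER_BOUNDS, user_id)
    if idx == len(SHARDS):
        raise ValueError(f"user_id {user_id} is beyond the last range")
    return SHARDS[idx]


def shards_for_range(low: int, high: int) -> list[str]:
    """Return every shard a BETWEEN low AND high query must visit."""
    first = SHARDS.index(shard_for(low))
    last = SHARDS.index(shard_for(high))
    return SHARDS[first:last + 1]


if __name__ == "__main__":
    print(shard_for(1_500_000))
    print(shards_for_range(999_900, 1_000_100))
```

Because the ranges are contiguous and sorted, a binary search is enough to find the owning shard, and a range query only touches the shards between the two endpoints.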

Hash Sharding

Hash the shard key and take modulo:

hash(user_id) % num_shards = shard_id

┌─────────────────────────────────────────────────────────┐
│ user_id = 12345                                        │
│ hash(12345) = 7829384                                  │
│ 7829384 % 3 = 2 → Shard 2                              │
└─────────────────────────────────────────────────────────┘

Pros:
✓ Even data distribution
✓ Avoids hotspots

Cons:
✗ Range queries must hit all shards
✗ Changing num_shards remaps most keys, so scaling requires large-scale data redistribution
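
To make the redistribution cost concrete, here is a minimal sketch of modulo routing that measures how many keys change shard when the shard count grows from 3 to 4. It assumes an MD5-based hash (`stable_hash`, a hypothetical helper) because Python's built-in `hash()` is not stable across processes for strings; a real system may use a different hash function.

```python
import hashlib


def stable_hash(key: int) -> int:
    """Hash a key to a 64-bit integer that is identical across processes."""
    digest = hashlib.md5(str(key).encode()).digest()
    return int.from_bytes(digest[:8], "big")


def shard_for(key: int, num_shards: int) -> int:
    """Zero-based shard id: hash(key) % num_shards."""
    return stable_hash(key) % num_shards


def fraction_moved(keys, old_shards: int, new_shards: int) -> float:
    """Share of keys whose shard id changes when the shard count changes."""
    keys = list(keys)
    moved = sum(
        1 for k in keys if shard_for(k, old_shards) != shard_for(k, new_shards)
    )
    return moved / len(keys)


if __name__ == "__main__":
    share = fraction_moved(range(1, 100_001), 3, 4)
    print(f"keys that move when going from 3 to 4 shards: {share:.1%}")
```

With a uniform hash, a key keeps its shard only when `h % 3 == h % 4`, which holds for roughly a quarter of all hash values, so about three quarters of the data has to move.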

Consistent Hashing

Avoids large-scale data migration when nodes are added or removed:

Hash Ring:
               0

        ┌──────┼──────┐
     N3 ●      │      ● N1
        │      │      │
        │      │      │
────────┼──────┼──────┼────────
        │      │      │
        │      │      │
        ●      │      │
       N2      │      │

       2^31 (ring wraps at 2^32)

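Data ownership: each key belongs to the first node found clockwise from the key's hash position

Adding new node N4 (placed clockwise after N3):
- Only keys that hash between N3 and N4 move, from N4's clockwise successor to N4
- All other data stays where it is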
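
The ring lookup can be sketched as a sorted list of node positions searched with `bisect`. This is a simplified illustration, not a production implementation: `HashRing` and `ring_position` are hypothetical names, MD5 is an assumed hash, and each node gets a single position on the ring.

```python
import bisect
import hashlib


def ring_position(value: str) -> int:
    """Map a string to a position on a ring of size 2^32."""
    digest = hashlib.md5(value.encode()).digest()
    return int.from_bytes(digest[:4], "big")


class HashRing:
    def __init__(self, nodes=()):
        self._positions = []  # sorted node positions on the ring
        self._owners = {}     # position -> node name
        for node in nodes:
            self.add_node(node)

    def add_node(self, node: str) -> None:
        pos = ring_position(node)
        if pos in self._owners:
            raise ValueError(f"position collision for node {node!r}")
        bisect.insort(self._positions, pos)
        self._owners[pos] = node

    def node_for(self, key: str) -> str:
        """First node at or after the key's position, wrapping around to 0."""
        if not self._positions:
            raise LookupError("ring has no nodes")
        idx = bisect.bisect_left(self._positions, ring_position(key))
        return self._owners[self._positions[idx % len(self._positions)]]


if __name__ == "__main__":
    ring = HashRing(["N1", "N2", "N3"])
    keys = [f"user:{i}" for i in range(10_000)]
    before = {k: ring.node_for(k) for k in keys}
    ring.add_node("N4")
    moved = [k for k in keys if ring.node_for(k) != before[k]]
    print(f"{len(moved)} of {len(keys)} keys moved, all to N4")
```

Every key that changes owner moves to the new node, and all of them come from a single existing node. Real deployments usually place many virtual nodes per physical node on the ring so that load is spread more evenly.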

Directory-Based Sharding

Use a lookup table to determine data location:

┌──────────────────┐
│   Lookup Table   │
├──────────────────┤
│ Key Range│ Shard │
├──────────────────┤
│ A-F      │   1   │
│ G-L      │   2   │
│ M-R      │   3   │
│ S-Z      │   4   │
└──────────────────┘

Pros:
✓ Flexible, customizable rules
✓ Easy to adjust data distribution

Cons:
✗ Lookup table becomes single point of failure
✗ Additional query overhead
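
A directory can be as simple as a list of key ranges that is consulted on every request. The sketch below keeps the table in memory for illustration; `ShardDirectory` and its methods are hypothetical names, and a real deployment would store the table in a replicated service to avoid the single point of failure.

```python
class ShardDirectory:
    """In-memory lookup table mapping a key's first letter to a shard id."""

    def __init__(self, ranges):
        # ranges: iterable of (first_letter, last_letter, shard_id), inclusive.
        self._ranges = list(ranges)

    def shard_for(self, key: str) -> int:
        """Look up the shard for a key by its first letter."""
        if not key:
            raise ValueError("key must not be empty")
        letter = key[0].upper()
        for first, last, shard_id in self._ranges:
            if first <= letter <= last:
                return shard_id
        raise KeyError(f"no directory entry covers {key!r}")

    def reassign(self, first: str, last: str, shard_id: int) -> None:
        """Point an existing range at another shard (data is moved separately)."""
        for i, (f, l, _) in enumerate(self._ranges):
            if (f, l) == (first, last):
                self._ranges[i] = (f, l, shard_id)
                return
        raise KeyError(f"no directory entry for range {first}-{last}")


if __name__ == "__main__":
    directory = ShardDirectory(
        [("A", "F", 1), ("G", "L", 2), ("M", "R", 3), ("S", "Z", 4)]
    )
    print(directory.shard_for("garcia"))
    directory.reassign("G", "L", 5)
    print(directory.shard_for("garcia"))
```

Changing where a range lives is a one-row update in the table, which is the flexibility this strategy buys at the price of an extra lookup.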

Shard Key Selection

Selection Criteria

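┌────────────────────┬────────────────────────────────────────────────────────┐
│ Criteria           │ Description                                            │
├────────────────────┼────────────────────────────────────────────────────────┤
│ High cardinality   │ More distinct key values give a more even distribution │
│ Write distribution │ Writes spread across shards, avoiding hotspot shards   │
│ Query friendly     │ Supports common query patterns                         │
│ Business related   │ Related data lives in the same shard                   │
└────────────────────┴────────────────────────────────────────────────────────┘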

Good Shard Key Examples

-- User data: shard by user_id
-- Benefit: All user-related data in same shard, avoids cross-shard JOINs
CREATE TABLE user_orders (
    id BIGINT,
    user_id BIGINT,  -- shard key
    total DECIMAL(12, 2),  -- explicit scale; a bare DECIMAL drops fractional digits in MySQL
    PRIMARY KEY (user_id, id)
);

-- Multi-tenant: shard by tenant_id
-- Benefit: Tenant data isolation, single-tenant queries don't cross shards
CREATE TABLE documents (
    id BIGINT,
    tenant_id INT,  -- shard key
    content TEXT,
    PRIMARY KEY (tenant_id, id)
);

-- Time series: composite shard by time + business ID
-- Benefit: Hot data concentrated, cold data archived
CREATE TABLE events (
    id BIGINT,
    timestamp TIMESTAMP,
    source_id INT,
    data JSONB
);
-- Composite key: hash(source_id) + range(timestamp)
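
One possible reading of the composite key `hash(source_id) + range(timestamp)` is sketched below: the hash picks the shard, and the timestamp picks a time-ranged partition inside that shard. The shard count, the monthly granularity, and the `events_YYYY_MM` naming are assumptions made for this example, not something the schema above prescribes.

```python
import hashlib
from datetime import datetime

NUM_SHARDS = 4  # assumed shard count for the example


def event_location(source_id: int, ts: datetime) -> tuple[int, str]:
    """Return (shard id, partition name) for one event row."""
    # hash(source_id): spreads sources evenly across shards.
    digest = hashlib.md5(str(source_id).encode()).digest()
    shard_id = int.from_bytes(digest[:8], "big") % NUM_SHARDS
    # range(timestamp): one partition per calendar month inside the shard.
    partition = f"events_{ts:%Y_%m}"
    return shard_id, partition


if __name__ == "__main__":
    print(event_location(42, datetime(2024, 1, 15)))
    print(event_location(42, datetime(2024, 3, 2)))
```

All events from one source stay on one shard, while old monthly partitions can be archived or dropped without touching recent data.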

Poor Shard Key Examples

-- ✗ Auto-increment ID: new data concentrated in last shard
-- ✗ Low cardinality columns: e.g., status with only a few values
-- ✗ Frequently updated columns: changing the key value can move the row to another shard
-- ✗ Columns not matching query patterns
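
The effect of low cardinality is easy to see by counting rows per shard for two candidate keys. This is a small sketch with made-up data; `shard_of` and `rows_per_shard` are hypothetical helpers, and hash-modulo routing with MD5 is assumed.

```python
import hashlib
from collections import Counter


def shard_of(value, num_shards: int) -> int:
    """Hash-modulo routing for any value that has a string form."""
    digest = hashlib.md5(str(value).encode()).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


def rows_per_shard(key_values, num_shards: int) -> list[int]:
    """Count how many rows land on each shard for the given key values."""
    counts = Counter(shard_of(v, num_shards) for v in key_values)
    return [counts.get(shard, 0) for shard in range(num_shards)]


if __name__ == "__main__":
    statuses = ["pending", "paid", "shipped"] * 1000  # 3 distinct values
    user_ids = range(3000)                            # 3000 distinct values
    print("status :", rows_per_shard(statuses, 8))
    print("user_id:", rows_per_shard(user_ids, 8))
```

With only three distinct `status` values, at most three of the eight shards can ever receive rows, no matter how good the hash function is.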

Cross-Shard Operations

Cross-Shard Queries

-- Query needs to access multiple shards
SELECT COUNT(*) FROM orders WHERE created_at > '2024-01-01';

Execution flow:
┌─────────────────────────────────────────────────────────┐
│                    Query Router                         │
│                         │                               │
│         ┌───────────────┼───────────────┐               │
│         ▼               ▼               ▼               │
│    ┌─────────┐     ┌─────────┐     ┌─────────┐          │
│    │ Shard 1 │     │ Shard 2 │     │ Shard 3 │          │
│    │COUNT=100│     │COUNT=200│     │COUNT=150│          │
│    └─────────┘     └─────────┘     └─────────┘          │
│         │               │               │               │
│         └───────────────┼───────────────┘               │
│                         ▼                               │
│                   Aggregate: 450                        │
└─────────────────────────────────────────────────────────┘
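
The scatter-gather flow in the diagram can be reproduced with three in-memory SQLite databases standing in for shards. This is a toy setup for illustration; `make_shard` and `count_orders_since` are hypothetical helpers, and a real router would query the shards in parallel rather than in a loop.

```python
import sqlite3


def make_shard(created_dates):
    """Create an in-memory SQLite database standing in for one shard."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, created_at TEXT)")
    conn.executemany(
        "INSERT INTO orders (created_at) VALUES (?)",
        [(d,) for d in created_dates],
    )
    return conn


def count_orders_since(shards, since: str) -> int:
    """Scatter the COUNT to every shard, then gather by summing the partials."""
    partials = []
    for conn in shards:
        (partial,) = conn.execute(
            "SELECT COUNT(*) FROM orders WHERE created_at > ?", (since,)
        ).fetchone()
        partials.append(partial)
    return sum(partials)


if __name__ == "__main__":
    shards = [
        make_shard(["2024-02-01"] * 100 + ["2023-12-31"] * 5),
        make_shard(["2024-03-01"] * 200),
        make_shard(["2024-01-02"] * 150),
    ]
    print(count_orders_since(shards, "2024-01-01"))
```

COUNT and SUM merge by simple addition. An aggregate such as AVG cannot be merged by averaging the partial averages; each shard has to return its sum and count so the router can divide once at the end.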

Cross-Shard JOINs

-- Avoid if possible! If necessary:

-- Option 1: Application-level JOIN (pseudocode)
users = query_shard("SELECT * FROM users WHERE region = 'US'")
user_ids = [user.user_id for user in users]
orders = query_all_shards("SELECT * FROM orders WHERE user_id IN (?)", user_ids)
result = application_join(users, orders)

-- Option 2: Data denormalization
-- Replicate JOIN data to each shard
CREATE TABLE user_info (
    user_id BIGINT PRIMARY KEY,
    user_name VARCHAR(100)  -- redundant copy, synced periodically from the source table
);

-- Option 3: Broadcast tables
-- Replicate small tables to all shards
CREATE TABLE countries (
    id INT PRIMARY KEY,
    name VARCHAR(100)
);
-- Auto-sync to all shards
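
For Option 1, the join step itself can be a plain in-memory hash join. The sketch below shows one way `application_join` might look, assuming both result sets arrive as lists of dictionaries with a `user_id` field; the row shapes are invented for the example.

```python
def application_join(users, orders):
    """Inner join of users and orders on user_id, done in application memory."""
    # Build side: index the (usually smaller) users result by the join key.
    users_by_id = {user["user_id"]: user for user in users}
    # Probe side: keep only orders whose user is present, merging the columns.
    return [
        {**users_by_id[order["user_id"]], **order}
        for order in orders
        if order["user_id"] in users_by_id
    ]


if __name__ == "__main__":
    users = [
        {"user_id": 1, "user_name": "ana"},
        {"user_id": 2, "user_name": "bo"},
    ]
    orders = [
        {"id": 10, "user_id": 1, "total": 5},
        {"id": 11, "user_id": 3, "total": 7},
        {"id": 12, "user_id": 1, "total": 9},
    ]
    for row in application_join(users, orders):
        print(row)
```

Both result sets must fit in application memory, which is one reason cross-shard JOINs are best kept to small, filtered inputs.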

Distributed Transactions

-- Two-Phase Commit (2PC)
-- Coordinator asks all shards if they can commit

Phase 1 (Prepare):
┌─────────────────────────────────────────────────────────┐
│ Coordinator → Shard1: "Ready to commit?"                │
│ Coordinator → Shard2: "Ready to commit?"                │
│ Shard1 → Coordinator: "Ready"                           │
│ Shard2 → Coordinator: "Ready"                           │
└─────────────────────────────────────────────────────────┘

Phase 2 (Commit):
┌─────────────────────────────────────────────────────────┐
│ Coordinator → Shard1: "Commit"                          │
│ Coordinator → Shard2: "Commit"                          │
│ Shard1 → Coordinator: "Committed"                       │
│ Shard2 → Coordinator: "Committed"                       │
└─────────────────────────────────────────────────────────┘

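If any shard cannot prepare, the coordinator tells every shard to roll back instead.

Problems: Blocking (shards hold locks until the coordinator decides, and stall if it fails after prepare); the extra round trips add latency
Alternatives: Saga pattern, TCC (Try-Confirm-Cancel)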
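
The message flow above can be simulated in memory to show both the commit path and the rollback path. This is a teaching sketch only: `Participant` and `two_phase_commit` are hypothetical names, and it leaves out the timeouts, durable logging, and crash recovery that a real coordinator needs.

```python
class Participant:
    """One shard taking part in a distributed transaction."""

    def __init__(self, name: str, can_commit: bool = True):
        self.name = name
        self.can_commit = can_commit
        self.state = "idle"

    def prepare(self) -> bool:
        """Phase 1: vote on whether this shard is able to commit."""
        self.state = "prepared" if self.can_commit else "aborted"
        return self.can_commit

    def commit(self) -> None:
        self.state = "committed"

    def rollback(self) -> None:
        self.state = "aborted"


def two_phase_commit(participants) -> bool:
    """Return True if every participant committed, False if all rolled back."""
    # Phase 1 (Prepare): collect a vote from every participant.
    votes = [p.prepare() for p in participants]
    # Phase 2: commit only on a unanimous yes, otherwise roll everyone back.
    if all(votes):
        for p in participants:
            p.commit()
        return True
    for p in participants:
        p.rollback()
    return False


if __name__ == "__main__":
    print(two_phase_commit([Participant("shard1"), Participant("shard2")]))
    print(two_phase_commit([Participant("shard1"), Participant("shard2", False)]))
```

The blocking problem shows up between the two phases: a participant in the `prepared` state cannot decide on its own and has to wait for the coordinator.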

Sharding Implementations

Vitess (MySQL)

# VSchema configuration
{
  "sharded": true,
  "vindexes": {
    "hash": {
      "type": "hash"
    }
  },
  "tables": {
    "users": {
      "column_vindexes": [
        {
          "column": "user_id",
          "name": "hash"
        }
      ]
    }
  }
}

Citus (PostgreSQL)

-- Create distributed table
SELECT create_distributed_table('orders', 'user_id');

-- Queries auto-route
SELECT * FROM orders WHERE user_id = 123;
-- Only queries shard containing that user_id

-- Aggregate queries
SELECT user_id, COUNT(*) FROM orders GROUP BY user_id;
-- Execute in parallel on shards, merge results

MongoDB Sharding

// Enable sharding
sh.enableSharding("mydb")

// Create shard key index
db.orders.createIndex({ user_id: "hashed" })

// Shard the collection
sh.shardCollection("mydb.orders", { user_id: "hashed" })

// View shard distribution
db.orders.getShardDistribution()

Resharding

When data grows or distribution becomes uneven, resharding is needed.

Strategy

Online resharding steps:
┌─────────────────────────────────────────────────────────┐
│ 1. Add new shards                                       │
│ 2. Start data migration (background)                    │
│ 3. Dual-write while migrating: old and new shards       │
│ 4. Migration complete, switch routing                   │
│ 5. Clean up old shard data                              │
└─────────────────────────────────────────────────────────┘

      Phase 1              Phase 2              Phase 3
  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
  │   S1 | S2    │  →  │ S1 | S2 | S3 │  →  │ S1 | S2 | S3 │
  │  50% | 50%   │     │ 33%|33%|34%  │     │ 33%|33%|34%  │
  │    Normal    │     │  Dual-write  │     │   Complete   │
  └──────────────┘     └──────────────┘     └──────────────┘
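
The phases above can be sketched as a small router that changes its read and write behaviour per phase. Everything here is hypothetical and simplified: plain dictionaries stand in for shards, `old_route` and `new_route` are caller-supplied functions, and there is no handling of failures between the two writes.

```python
class ReshardingRouter:
    """Routes reads and writes through the phases of an online reshard."""

    def __init__(self, old_route, new_route, stores):
        self.old_route = old_route  # key -> shard name under the old layout
        self.new_route = new_route  # key -> shard name under the new layout
        self.stores = stores        # shard name -> dict standing in for a shard
        self.phase = "normal"       # normal -> dual_write -> complete

    def write(self, key, value):
        if self.phase in ("normal", "dual_write"):
            self.stores[self.old_route(key)][key] = value
        if self.phase in ("dual_write", "complete"):
            self.stores[self.new_route(key)][key] = value

    def read(self, key):
        route = self.new_route if self.phase == "complete" else self.old_route
        return self.stores[route(key)].get(key)

    def backfill(self):
        """Copy rows written before dual-write began into their new shards."""
        for name in list(self.stores):
            for key, value in list(self.stores[name].items()):
                # setdefault keeps any newer value already dual-written.
                self.stores[self.new_route(key)].setdefault(key, value)

    def cleanup(self):
        """Remove rows from shards that no longer own them."""
        for name, store in self.stores.items():
            for key in [k for k in store if self.new_route(k) != name]:
                del store[key]


if __name__ == "__main__":
    stores = {"S1": {}, "S2": {}, "S3": {}}
    router = ReshardingRouter(
        lambda k: f"S{k % 2 + 1}", lambda k: f"S{k % 3 + 1}", stores
    )
    for k in range(10):
        router.write(k, f"v{k}")
    router.phase = "dual_write"
    router.write(0, "new0")
    router.backfill()
    router.phase = "complete"
    router.cleanup()
    print({name: sorted(store) for name, store in stores.items()})
```

Reads stay on the old layout until the switch, so a partially filled new shard is never visible to clients.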

Best Practices

  1. Delay sharding: Introduce sharding as late as possible, try other optimizations first

  2. Choose the right shard key: Most important decision, hard to change later

  3. Design for sharding: Data models that avoid cross-shard operations

  4. Monitor shard balance: Regularly check data distribution

  5. Keep global tables: Small reference tables can be replicated to all shards

  6. Handle cross-shard queries: Expect slower performance, optimize or avoid

  7. Test resharding: Ensure you can safely add/remove shards

-- Shard monitoring query example (Citus)
SELECT shardid, nodeport, logicalrelid, shardminvalue, shardmaxvalue
FROM pg_dist_shard_placement
JOIN pg_dist_shard USING (shardid)
ORDER BY logicalrelid, shardminvalue;