For decades, scaling SQL databases meant one thing: buying bigger hardware. When that ran out, you faced an impossible choice—abandon SQL for NoSQL systems that scaled but lost relational semantics, or implement complex application-level sharding that fragmented your data model.
CockroachDB offers a third path: Distributed SQL. Your application issues standard SQL queries—SELECT, INSERT, UPDATE, DELETE, complex JOINs, subqueries, window functions—and CockroachDB executes them transparently across a cluster of nodes. Add more nodes to scale. Remove nodes to save costs. The SQL interface remains unchanged.
This isn't simple query routing to shards. CockroachDB implements a sophisticated distributed query execution engine (DistSQL) that parallelizes work across nodes, pushes computation to where data resides, and optimizes network transfer. A query that would take seconds on a single node can complete in milliseconds when executed across dozens of nodes in parallel.
Understanding how Distributed SQL works is essential for:
- Designing schemas that scale horizontally
- Writing queries that exploit cluster parallelism
- Diagnosing performance problems in distributed plans
- Planning capacity as data and traffic grow
By the end of this page, you will understand how CockroachDB processes SQL queries across a distributed cluster. You'll learn about the SQL layer architecture, the DistSQL execution engine, query routing, distributed join strategies, and how to write queries that leverage CockroachDB's parallelism effectively.
CockroachDB's SQL layer transforms human-readable SQL into operations on the underlying distributed key-value store. This transformation happens through a series of well-defined stages.
Stage 1: Parsing
The parser converts SQL text into an Abstract Syntax Tree (AST). CockroachDB supports the PostgreSQL SQL dialect, so queries written for PostgreSQL typically parse without modification:
SELECT u.name, COUNT(o.id) AS order_count
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE u.region = 'us-east'
GROUP BY u.name
HAVING COUNT(o.id) > 5
ORDER BY order_count DESC
LIMIT 10;
The parser validates syntax, identifies tables and columns, and produces a tree representation of the query.
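To see the syntax-validation step in isolation, feed the parser something invalid. A minimal illustration (error text abridged; exact wording varies by version):

SELECT name FORM users;   -- misspelled FROM
-- ERROR: at or near "users": syntax error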
Stage 2: Semantic Analysis
The analyzer resolves names—mapping table names to actual tables, column names to schema definitions, and verifying that the query is semantically valid:
- Does the users table exist?
- Does users have a region column?
- Is u.id = o.user_id comparing compatible types?

Stage 3: Logical Planning
The planner creates a logical execution plan—a tree of operations that, if executed, would produce the correct result:
Limit(10)
└── Sort(order_count DESC)
└── Filter(count > 5)
└── GroupBy(u.name)
└── HashJoin(u.id = o.user_id)
├── Filter(u.region = 'us-east')
│ └── Scan(users)
└── Scan(orders)
Stage 4: Optimization
The cost-based optimizer evaluates multiple possible execution plans and selects the one with lowest estimated cost. It considers:
- Available indexes and whether they cover the query
- Table statistics (row counts, distinct values) for cardinality estimates
- Alternative join orderings and join algorithms
- The physical location of data in the cluster
SQL QUERY PROCESSING PIPELINE

INPUT QUERY:
  SELECT u.name, COUNT(o.id) AS order_count
  FROM users u
  JOIN orders o ON u.id = o.user_id
  WHERE u.region = 'us-east'
  GROUP BY u.name
  HAVING COUNT(o.id) > 5
  ORDER BY order_count DESC
  LIMIT 10;

STAGE 1: PARSING → Abstract Syntax Tree
  SELECT
  ├── Columns: [u.name, COUNT(o.id)]
  ├── FROM: [users AS u, orders AS o]
  ├── JOIN: ON u.id = o.user_id
  ├── WHERE: u.region = 'us-east'
  ├── GROUP BY: [u.name]
  ├── HAVING: COUNT(o.id) > 5
  ├── ORDER BY: [order_count DESC]
  └── LIMIT: 10

STAGE 2: SEMANTIC ANALYSIS → Resolved Names
  users     → database.public.users  (id: 52, columns: [id, name, ...])
  orders    → database.public.orders (id: 57, columns: [id, user_id, ...])
  u.region  → users.region   (column index: 3, type: STRING)
  u.id      → users.id       (column index: 0, type: INT, PRIMARY KEY)
  o.user_id → orders.user_id (column index: 1, type: INT, FK)

STAGE 3: LOGICAL PLAN → Operation Tree
  Limit(10)
  └── Sort(order_count DESC)
      └── Filter(count > 5)          ← HAVING clause
          └── GroupBy(u.name)
              └── HashJoin(u.id = o.user_id)
                  ├── Filter(u.region = 'us-east')
                  │   └── Scan(users)
                  └── Scan(orders)

STAGE 4: OPTIMIZATION → Cost-Based Selection
  Optimizer evaluates alternatives:
    Option A: Hash Join                   Cost: 5,234
    Option B: Merge Join (requires sort)  Cost: 12,891
    Option C: Lookup Join                 Cost: 3,122  ← SELECTED

  Predicate pushdown applied:
    - Filter(region='us-east') pushed to users scan
    - Uses index: users@users_region_idx

  Final optimized plan:
    Limit(10)
    └── Sort(order_count DESC)
        └── Filter(count > 5)
            └── GroupBy(u.name)
                └── LookupJoin(orders ON user_id)
                    └── IndexScan(users@users_region_idx, region='us-east')

STAGE 5: PHYSICAL PLANNING → DistSQL Distribution
  (Covered in the next section.)

Stage 5: Physical Planning
After optimization produces a logical plan, the physical planner decides where each operation executes. For a single-node query, everything runs locally. For distributed queries, the planner creates a DistSQL plan that:
- Assigns processors to the nodes that hold the relevant ranges
- Connects those processors with streams that move rows between nodes
- Pushes filters and partial aggregations down to where the data resides
This is where CockroachDB's distributed nature becomes visible.
Use EXPLAIN ANALYZE to see how CockroachDB executes your query: EXPLAIN ANALYZE SELECT .... This shows the operations performed, execution times, and data flow between nodes. For distributed plans, use EXPLAIN (DISTSQL) SELECT ... to see a graphical representation of the DistSQL plan.
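Applied to the running example (the same query works with all three variants):

-- Plan only, no execution:
EXPLAIN SELECT u.name, COUNT(o.id) FROM users u
JOIN orders o ON u.id = o.user_id
WHERE u.region = 'us-east' GROUP BY u.name;

-- Executes the query and reports actual row counts, timings, and data flow:
EXPLAIN ANALYZE SELECT u.name, COUNT(o.id) FROM users u
JOIN orders o ON u.id = o.user_id
WHERE u.region = 'us-east' GROUP BY u.name;

-- Emits a URL that renders a diagram of the DistSQL plan:
EXPLAIN (DISTSQL) SELECT u.name, COUNT(o.id) FROM users u
JOIN orders o ON u.id = o.user_id
WHERE u.region = 'us-east' GROUP BY u.name;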
DistSQL is CockroachDB's mechanism for executing SQL queries across multiple nodes in parallel. It transforms the single-node execution plan into a distributed plan that leverages the entire cluster.
The Core Principle: Move Computation to Data
In distributed databases, the primary performance bottleneck is network transfer. Moving large amounts of data between nodes is slow and expensive. DistSQL minimizes this by:
- Pushing filters, projections, and partial aggregations to the nodes that hold the data
- Sending only the much smaller intermediate results across the network
- Executing independent work on all nodes in parallel
DistSQL Plan Components:
A DistSQL plan consists of processors connected by streams: processors (TableReaders, Joiners, Aggregators, Sorters) do the work, while streams carry rows between them, either in memory on a single node or over the network between nodes:
DISTSQL EXECUTION FLOW

Query:
  SELECT region, COUNT(*) FROM orders WHERE status = 'shipped' GROUP BY region;

Assumption: orders table split across 4 nodes
  (Node 1 = US-East, Node 2 = US-West, Node 3 = EU-West, Node 4 = Asia)

NON-DISTRIBUTED (Single Node) Execution:
  Gateway Node:
    1. Fetch ALL rows from ALL nodes (network transfer)
    2. Filter status = 'shipped' locally
    3. GROUP BY region locally
    4. COUNT(*) locally
    5. Return result
  Problem: transfers 100% of the data across the network before filtering.

DISTSQL (Distributed) Execution:

Phase 1: Local processing (parallel on all nodes)
  Each node runs TableReader → Filter(shipped) → Aggregator(partial):
    Node 1 (US-East): us-east: 500, us-west: 50
    Node 2 (US-West): us-west: 300, us-east: 25
    Node 3 (EU-West): eu-west: 400, asia: 10
    Node 4 (Asia):    asia: 250,    eu-west: 15
  Only the partial aggregates cross the network.

Phase 2: Final aggregation (gateway node)
  Receive partial aggregates from all nodes:
    us-east: 500 + 25 = 525
    us-west: 300 + 50 = 350
    eu-west: 400 + 15 = 415
    asia:    250 + 10 = 260
  Return final result to client.

COMPARISON:
                        Non-Distributed      DistSQL
  Data transferred      10 million rows      8 partial aggregates
  Processing time       Sequential           Parallel (4x faster)
  Network bottleneck    Yes                  No
  Scalability           Limited              Linear with nodes

When DistSQL Activates:
Not all queries use DistSQL. CockroachDB's optimizer decides based on:
- Estimated row counts: point lookups (e.g., SELECT * FROM table WHERE pk = 123) don't benefit from parallelism
- How many ranges, and therefore how many nodes, the query touches
- Whether the operators involved (filters, aggregations, joins) can be distributed

DistSQL Flow Types:
DistSQL plans use different flow patterns depending on the query:
- Gather: each node processes its local data and streams results to the gateway (scans, filters, partial aggregation)
- Shuffle: rows are hash-partitioned on a key and redistributed between nodes (distributed joins, final aggregation)
- Broadcast: one small input is mirrored to every participating node
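A quick way to check whether a query was distributed at all is the distribution field in plain EXPLAIN output. A sketch (exact layout varies by version):

EXPLAIN SELECT region, COUNT(*) FROM orders
WHERE status = 'shipped' GROUP BY region;
--   distribution: full      ← executed across multiple nodes ("local" = gateway only)
--   vectorized: true
--   ... (plan tree follows)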
CockroachDB's web console (usually at port 8080) includes a DistSQL visualization. Run EXPLAIN (DISTSQL) SELECT ... and click the generated link to see a graphical representation of processors, data flow, and which nodes participate in query execution.
When your application connects to CockroachDB, it connects to a specific node—the gateway node for that connection. Understanding how queries are routed from gateway nodes to data is essential for optimizing performance.
Every Node is a Gateway
Unlike databases with dedicated query coordinators, every CockroachDB node can serve as a gateway. When you issue a query:
1. The gateway node parses, plans, and optimizes the query
2. It looks up which nodes hold the relevant ranges (via cached range descriptors)
3. It coordinates execution, local or distributed, and streams results back to your client
Implications for Load Balancing:
Since any node can be a gateway, you typically place a load balancer in front of your cluster:
┌─────────────────────────────────────┐
│ Load Balancer │
│ (HAProxy, NGINX, Cloud LB, etc.) │
└─────────────────────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Node 1 │ │ Node 2 │ │ Node 3 │
│(Gateway)│ │(Gateway)│ │(Gateway)│
└─────────┘ └─────────┘ └─────────┘
Load balancing distributes the gateway work across nodes, preventing any single node from becoming a bottleneck.
Gateway Overhead:
The gateway node does coordination work even for queries that don't touch its local data:
- Parsing, planning, and optimizing the query
- Setting up and coordinating the DistSQL flow
- Buffering and merging result streams before returning them to the client
For latency-sensitive applications, connecting to the node that owns the data minimizes hops. CockroachDB supports topology-aware routing (connecting to the nearest replica) and follower reads (reading from any replica for slightly stale data).
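A follower read is opt-in per query. Note that the AS OF SYSTEM TIME clause goes immediately after the FROM clause, not at the end of the statement:

SELECT id, status, total
FROM orders AS OF SYSTEM TIME follower_read_timestamp()
WHERE user_id = 123;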
| Scenario | Gateway Location | Data Location | Network Round Trips | Optimization |
|---|---|---|---|---|
| Local query | Node A | Node A (leaseholder) | 0 | Best case—no network |
| Remote single range | Node A | Node B (leaseholder) | 1 RT to B | Consider connecting to B directly |
| Multi-range query | Node A | Nodes B, C, D | 1 RT to each in parallel | DistSQL parallelism helps |
| Global aggregation | Node A | All nodes | 1 RT to each in parallel | Ensure gateway isn't bottleneck |
Leaseholder Routing:
For reads and writes, the leaseholder is the authoritative replica for a range. The gateway must communicate with the leaseholder to ensure consistency:
- Reads are served by the leaseholder, which is guaranteed to see all committed writes
- Writes are proposed through the leaseholder and replicated to followers via Raft
The gateway uses range descriptors (cached locally, refreshed via gossip) to determine which node is the leaseholder for each range needed by the query.
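You can inspect range boundaries and placement directly from SQL. A sketch, assuming the users table from earlier (output columns vary by version; newer versions require WITH DETAILS for leaseholder information):

SHOW RANGES FROM TABLE users;
-- Shows each range's start/end key, range ID, and replica placement.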
Minimizing Gateway Overhead:
Strategies for reducing gateway-related latency:
- Place a load balancer in front of the cluster and distribute connections evenly
- Connect applications to nodes in their own region, using topology-aware connection pools (as sketched below)
- Use follower reads for staleness-tolerant queries so reads stay region-local
package main

import (
	"database/sql"
	"fmt"

	_ "github.com/lib/pq"
)

// RegionalPool demonstrates connecting to region-local nodes for lower latency.
type RegionalPool struct {
	primary   *sql.DB // primary region for writes
	localRead *sql.DB // local region for reads
}

func NewRegionalPool(userRegion string) (*RegionalPool, error) {
	// CockroachDB cluster endpoints by region
	endpoints := map[string]string{
		"us-east": "cockroach-us-east.example.com:26257",
		"us-west": "cockroach-us-west.example.com:26257",
		"eu-west": "cockroach-eu-west.example.com:26257",
	}

	// Primary region for writes (configured as leaseholder preference)
	primary, err := sql.Open("postgres",
		fmt.Sprintf("postgresql://root@%s/mydb?sslmode=require", endpoints["us-east"]))
	if err != nil {
		return nil, err
	}

	// Local region for reads (using follower reads)
	localEndpoint := endpoints[userRegion]
	if localEndpoint == "" {
		localEndpoint = endpoints["us-east"] // fallback
	}
	localRead, err := sql.Open("postgres",
		fmt.Sprintf("postgresql://root@%s/mydb?sslmode=require", localEndpoint))
	if err != nil {
		return nil, err
	}

	return &RegionalPool{primary: primary, localRead: localRead}, nil
}

func (p *RegionalPool) Write(query string, args ...interface{}) (sql.Result, error) {
	// Writes always go to the primary for strong consistency.
	return p.primary.Exec(query, args...)
}

func (p *RegionalPool) Read(query string, args ...interface{}) (*sql.Rows, error) {
	// Reads can go to a local replica with follower reads (bounded staleness).
	// NOTE: AS OF SYSTEM TIME belongs immediately after the FROM clause, so this
	// naive concatenation only works for queries that end with their FROM clause.
	// For general queries, run SET TRANSACTION AS OF SYSTEM TIME
	// follower_read_timestamp() inside an explicit read-only transaction instead.
	followerQuery := query + " AS OF SYSTEM TIME follower_read_timestamp()"
	return p.localRead.Query(followerQuery, args...)
}

func (p *RegionalPool) StrongRead(query string, args ...interface{}) (*sql.Rows, error) {
	// Strong reads go to the leaseholder via the primary connection.
	return p.primary.Query(query, args...)
}

If all connections route to a single node, that node becomes a bottleneck—even though data is distributed. Monitor connection distribution in the CockroachDB console and ensure load balancers are distributing evenly. For very high-throughput applications, provision enough gateway nodes for the connection count.
Joins in a distributed database are inherently challenging—the data from different tables may reside on different nodes. CockroachDB implements several join strategies, each optimized for different scenarios.
Join Strategy 1: Lookup Join
The lookup join (also called index nested loop join) is used when one side of the join is small and the other has an index on the join key: for each row from the small side, CockroachDB performs an index lookup into the large side.
When it's chosen:
- One input is estimated to be small (often after a selective filter)
- The other input has an index on the join column(s)
- The cost of per-row index lookups beats scanning the large table
Example:
-- users table: 100 filtered rows
-- orders table: 10 million rows with index on user_id
SELECT u.name, o.total
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE u.country = 'Iceland'; -- Small country = few users
The optimizer will likely choose lookup join: scan the ~100 Icelandic users, then do 100 index lookups into orders.
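You can confirm which strategy the optimizer picked with EXPLAIN. A sketch, assuming the schema above (the plan text varies by version):

EXPLAIN SELECT u.name, o.total
FROM users u JOIN orders o ON u.id = o.user_id
WHERE u.country = 'Iceland';
-- Expect a "lookup join" node over the orders index on user_id.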
Join Strategy 2: Hash Join
The hash join builds a hash table from one side of the join (the smaller, "build" side) and probes it with the other side: each probe row is matched against the hash table in constant expected time. It's chosen when neither side is small enough, or suitably indexed, for a lookup join.
Distributed Hash Join:
When both tables are large and spread across nodes:
DISTRIBUTED HASH JOIN

Query:
  SELECT u.name, o.total FROM users u JOIN orders o ON u.id = o.user_id;

Assumption: both tables spread across 3 nodes, no co-location on user_id.

Phase 1: Hash partitioning
  Each node reads its local data and computes hash(join_key):
    Node 1 users: A(id=1)→0, B(id=2)→1, C(id=3)→0
    Node 1 orders: X(user=2)→1, Y(user=1)→0, Z(user=3)→0
    Node 2 users: D(id=4)→1, E(id=5)→1
    Node 2 orders: P(user=5)→1, Q(user=4)→1
    Node 3 users: F(id=6)→0, G(id=7)→1
    Node 3 orders: R(user=6)→0, S(user=7)→1

Phase 2: Shuffle by hash partition
  Rows with hash=0 → Node 1; rows with hash=1 → Node 2.
  After shuffle:
    Node 1 (hash=0 partition):
      Users:  A(1), C(3), F(6)
      Orders: Y(user=1), Z(user=3), R(user=6)
    Node 2 (hash=1 partition):
      Users:  B(2), D(4), E(5), G(7)
      Orders: X(user=2), P(user=5), Q(user=4), S(user=7)

Phase 3: Local hash join (parallel)
  Node 1: A↔Y, C↔Z, F↔R
  Node 2: B↔X, D↔Q, E↔P, G↔S

Phase 4: Combine results
  Gateway collects results from Node 1 and Node 2 and returns them to the client.

COST ANALYSIS:
  Network: full shuffle of both tables (expensive)
  CPU:     parallel hash join (efficient)
  Memory:  hash table for the smaller side on each node

  Best for: large-large table joins with no co-location
  Avoid:    when tables could be co-located by schema design

Join Strategy 3: Merge Join
The merge join requires both sides to be sorted on the join key: the two sorted streams are scanned in lockstep, matching rows as they advance.
When it's chosen:
- Both inputs are already sorted on the join key (for example, both have indexes on it)
- The inputs are large, so avoiding an in-memory hash table saves memory
Join Strategy 4: Cross Join
When no join predicate exists (or only non-equality predicates), CockroachDB may use a cross join: every row from one side is paired with every row from the other, and any remaining predicate is applied afterward.
Cross joins are expensive (O(n×m)) and should be avoided for large tables.
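When the optimizer's estimates are off, CockroachDB's join hints let you force a specific algorithm. A sketch using the same tables:

-- Force a hash join:
SELECT u.name, o.total
FROM users u INNER HASH JOIN orders o ON u.id = o.user_id;

-- Force a lookup join (requires an index on the right side's join key):
SELECT u.name, o.total
FROM users u INNER LOOKUP JOIN orders o ON u.id = o.user_id;

-- Force a merge join:
SELECT u.name, o.total
FROM users u INNER MERGE JOIN orders o ON u.id = o.user_id;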
Co-located Joins: The Best Case
The most efficient distributed join is one that doesn't need network shuffling—a co-located join. If related tables are partitioned on the same key:
-- Users partitioned by region
-- Orders partitioned by user's region (derived from user_id)
-- Join happens locally on each node—no shuffle needed
SELECT u.name, o.total
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE u.region = 'us-east';
Schema design that enables co-located joins is one of the most impactful optimizations for distributed databases.
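In a multi-region cluster, one way to get this co-location is row-level locality. A sketch, assuming the database already has regions configured with ALTER DATABASE ... ADD REGION:

ALTER TABLE users  SET LOCALITY REGIONAL BY ROW;
ALTER TABLE orders SET LOCALITY REGIONAL BY ROW;
-- Rows in both tables are now partitioned by their hidden crdb_region column,
-- so a user's row and that user's orders can live in the same region and the
-- region-filtered join above stays local.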
| Strategy | Best For | Data Movement | Memory Usage | Time Complexity |
|---|---|---|---|---|
| Lookup Join | Small-large table, indexed | Minimal (point lookups) | Low | O(n) probes |
| Hash Join | Medium tables, no index | Shuffle if distributed | Build-side in memory | O(n + m) |
| Merge Join | Both sides sorted/indexed | Shuffle if distributed | Low | O(n + m) |
| Co-located Join | Same partition key | None (local) | Per-partition | O(n + m) local |
| Cross Join | Small tables only | Full combination | Low | O(n × m), avoid |
When designing schemas for CockroachDB, consider which tables are frequently joined and partition them on the same key. For example, partition both users and orders by user_id so joins are always local. This is the distributed database equivalent of Spanner's interleaved tables.
SQL queries in CockroachDB execute within transactions—even single statements are implicitly wrapped in a transaction. Understanding how transactions work in a distributed context is essential for both correctness and performance.
Every Query is a Transaction
When you execute:
SELECT * FROM users WHERE region = 'us-east';
CockroachDB implicitly wraps this in a transaction:
BEGIN;
SELECT * FROM users WHERE region = 'us-east';
COMMIT;
This ensures the query sees a consistent snapshot of the database, even if other transactions are modifying data concurrently.
Read-Only vs. Read-Write Transactions
CockroachDB distinguishes between:
- Read-only transactions: no write intents and no two-phase commit; they read a consistent MVCC snapshot and can be served by follower reads
- Read-write transactions: lay down write intents and commit via the distributed protocol described below
Distributed Transaction Flow (Read-Write):
DISTRIBUTED TRANSACTION LIFECYCLE

Example transaction:
  BEGIN;
  UPDATE accounts SET balance = balance - 100 WHERE id = 'alice';
  UPDATE accounts SET balance = balance + 100 WHERE id = 'bob';
  COMMIT;

Assumption: Alice's account on Node A, Bob's account on Node B.

STEP 1: BEGIN (gateway node)
  Gateway (whichever node the client connected to):
  - Generates transaction ID: txn-abc123
  - Assigns provisional commit timestamp: ts=1000
  - Tracks transaction state: PENDING

STEP 2: First UPDATE (Alice's account)
  Gateway locates the leaseholder for 'alice' (Node A) and sends the write.
  Node A (leaseholder for alice):
  - Locks Alice's row (write intent)
  - Writes provisional value alice.balance = (old - 100) @ ts=1000,
    marked with intent txn-abc123 (not yet committed)
  - Stores the write in the Raft log
  - Responds to the gateway: write intent placed

STEP 3: Second UPDATE (Bob's account)
  The same flow runs against Node B, the leaseholder for 'bob':
  a write intent for bob.balance = (old + 100) @ ts=1000 is placed.

STEP 4: COMMIT (two-phase commit)
  Phase 1: PREPARE (parallel to all participants)
    Gateway → Node A, Node B: "Prepare to commit txn-abc123"
    Each node checks its write intent is still valid → PREPARED
  Phase 2: COMMIT (after all prepared)
    Gateway marks the transaction record COMMITTED @ ts=1000
    (the record itself is replicated via Raft)
    Gateway → Node A, Node B: "Resolve intent txn-abc123 as committed"
    Each node converts its intent to a permanent value and releases the lock

STEP 5: Return to client
  Gateway → client: transaction committed successfully.

CONFLICT HANDLING:
If another transaction (txn-xyz) touches Alice's row while txn-abc123 is uncommitted:
  Case 1: txn-xyz reads → waits for txn-abc123 to commit or abort,
          then reads the resolved value
  Case 2: txn-xyz writes → write-write conflict detected;
          one transaction must abort and retry (SSI)
  Case 3: txn-xyz reads at an older timestamp → reads the pre-intent
          value (MVCC), no conflict

Parallel Commits Optimization:
The 2PC flow described above requires two round trips: PREPARE then COMMIT. CockroachDB's parallel commits optimization eliminates this for common cases:
- The transaction record is written in a STAGING state in parallel with the final write intents
- The transaction is logically committed as soon as all intents are durably replicated
- Intents are resolved and the record marked COMMITTED asynchronously, off the critical path
This reduces commit latency from 2 round trips to 1 for transactions touching multiple ranges.
Automatic Retries:
CockroachDB automatically retries transactions that encounter conflicts, within limits:
- Server-side retries happen transparently, but only while no results have been streamed to the client (the first result batch is buffered for exactly this reason)
- Once results have been sent, the client receives a retryable error (SQLSTATE 40001) and must retry the transaction itself
Applications using CockroachDB's Go, Java, or Python drivers benefit from automatic retry logic. For raw SQL connections, applications should implement retry loops for 40001 (serialization failure) errors.
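A raw-SQL client can implement that retry loop with CockroachDB's documented savepoint protocol. A sketch of one attempt:

BEGIN;
SAVEPOINT cockroach_restart;

UPDATE accounts SET balance = balance - 100 WHERE id = 'alice';
UPDATE accounts SET balance = balance + 100 WHERE id = 'bob';

RELEASE SAVEPOINT cockroach_restart;  -- may fail with SQLSTATE 40001
COMMIT;

-- On 40001: ROLLBACK TO SAVEPOINT cockroach_restart, re-issue the
-- statements, and attempt RELEASE again.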
High contention on hot keys—many transactions trying to update the same row—causes performance degradation. CockroachDB provides contention metrics in the admin UI. If you see high contention, consider: (1) redesigning to avoid hot keys, (2) using SELECT FOR UPDATE to acquire locks early, or (3) breaking large transactions into smaller ones.
Writing efficient queries for CockroachDB requires understanding both SQL optimization principles and distributed-specific considerations.
Key Optimization Strategies:
1. Use Indexes Effectively
Indexes in CockroachDB work similarly to traditional databases but have distributed implications:
-- Covering index for common query pattern
CREATE INDEX orders_user_status_idx ON orders(user_id, status)
STORING (total, created_at);
-- Query satisfied entirely by index
SELECT user_id, status, total, created_at
FROM orders
WHERE user_id = 123 AND status = 'shipped';
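EXPLAIN confirms the index-only plan: if the index covers the query, the plan shows a single scan of the secondary index with no join back to the primary index. A sketch (plan text varies by version):

EXPLAIN SELECT user_id, status, total, created_at
FROM orders
WHERE user_id = 123 AND status = 'shipped';
-- Expect: scan of orders@orders_user_status_idx, and no "index join" node.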
2. Minimize Network Round Trips
Each range access is potentially a network round trip. Minimize by:
- Batching point lookups (WHERE id IN (1,2,3) instead of multiple queries)
- Using covering indexes so a single index scan answers the query
- Fetching only the columns you need

3. Leverage Locality
For multi-region deployments, query performance depends on data locality:
- Use REGIONAL BY ROW tables to pin data to regions
- Configure lease_preferences for leaseholder placement
- Use follower reads for staleness-tolerant, region-local reads
-- ═══════════════════════════════════════════════════════════════════
-- QUERY OPTIMIZATION PATTERNS FOR COCKROACHDB
-- ═══════════════════════════════════════════════════════════════════

-- PATTERN 1: Covering Indexes
-- ─────────────────────────────────────────────────────────────────────
-- BAD: requires a secondary lookup to fetch 'name'
SELECT id, name FROM users WHERE email = 'alice@example.com';
-- Index on email doesn't include 'name', so:
--   1. Index scan to find the row
--   2. Primary key lookup to get 'name'

-- GOOD: covering index includes all needed columns
CREATE INDEX users_email_covering ON users(email) STORING (name);
-- Now the query is satisfied entirely by an index scan

-- PATTERN 2: Batch Point Lookups
-- ─────────────────────────────────────────────────────────────────────
-- BAD: multiple round trips
SELECT * FROM products WHERE id = 1;
SELECT * FROM products WHERE id = 2;
SELECT * FROM products WHERE id = 3;
-- Each query = a potential network round trip

-- GOOD: single batched query
SELECT * FROM products WHERE id IN (1, 2, 3);
-- Single query; CockroachDB parallelizes the lookups

-- PATTERN 3: Avoid SELECT *
-- ─────────────────────────────────────────────────────────────────────
-- BAD: fetches all columns, some may require additional lookups
SELECT * FROM orders WHERE user_id = 123;

-- GOOD: fetch only needed columns (matches a covering index)
SELECT id, status, total FROM orders WHERE user_id = 123;

-- PATTERN 4: Predicate Pushdown for Joins
-- ─────────────────────────────────────────────────────────────────────
-- SUBOPTIMAL: filter applied after the join
SELECT u.name, o.total
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE u.region = 'us-east';

-- BETTER: same query, but ensure an index supports the filter
CREATE INDEX users_region_idx ON users(region);
-- The optimizer will push the filter down and use the index

-- PATTERN 5: Follower Reads for Staleness-Tolerant Queries
-- ─────────────────────────────────────────────────────────────────────
-- STANDARD: strong consistency, must read from the leaseholder
SELECT * FROM product_catalog WHERE category = 'electronics';
-- May require a cross-region round trip if the leaseholder is remote

-- WITH FOLLOWER READS: read from the nearest replica (bounded staleness)
SELECT * FROM product_catalog
AS OF SYSTEM TIME follower_read_timestamp()
WHERE category = 'electronics';
-- Reads from a local replica, ~4.8 seconds behind (configurable)

-- PATTERN 6: Pagination with Key-Based Cursors
-- ─────────────────────────────────────────────────────────────────────
-- BAD: OFFSET-based pagination (scans and discards rows)
SELECT * FROM events ORDER BY created_at DESC LIMIT 20 OFFSET 1000;
-- Must scan 1020 rows to return 20

-- GOOD: key-based cursor (starts the scan at the cursor position)
SELECT * FROM events
WHERE created_at < '2024-01-15T10:30:00Z'  -- cursor from previous page
ORDER BY created_at DESC LIMIT 20;
-- Starts the scan at the cursor, returns exactly 20

-- PATTERN 7: Lock Hints for High-Contention Scenarios
-- ─────────────────────────────────────────────────────────────────────
-- STANDARD: optimistic locking, may conflict at commit
BEGIN;
SELECT balance FROM accounts WHERE id = 'hot_account';
-- computation...
UPDATE accounts SET balance = new_balance WHERE id = 'hot_account';
COMMIT;
-- May fail if another transaction committed first

-- WITH FOR UPDATE: pessimistic lock acquired early
BEGIN;
SELECT balance FROM accounts WHERE id = 'hot_account' FOR UPDATE;
-- Lock acquired immediately; other transactions wait
UPDATE accounts SET balance = new_balance WHERE id = 'hot_account';
COMMIT;
-- No conflict at commit time

CockroachDB's EXPLAIN ANALYZE shows actual execution statistics—not just the plan but real row counts, timing, and network transfers. For distributed queries, it shows which nodes participated and how much data moved. This is your primary tool for optimization.
We've explored how CockroachDB processes SQL queries across a distributed cluster. Let's consolidate the key concepts:
- The SQL layer turns queries into key-value operations through parsing, semantic analysis, logical planning, optimization, and physical planning
- DistSQL pushes computation to the data and parallelizes execution across nodes
- Every node can act as a gateway; load balancing and locality-aware connections keep gateways from becoming bottlenecks
- The join strategy (lookup, hash, merge, co-located) determines how much data crosses the network
- Every statement runs in a transaction; distributed commits use write intents, two-phase commit, and the parallel commits optimization
What's Next:
Distributed SQL provides the query semantics, but consistency guarantees require something more—serializable isolation. In the next page, we'll dive deep into CockroachDB's transaction model, exploring how Multi-Version Concurrency Control (MVCC), Serializable Snapshot Isolation (SSI), and write intents work together to provide the strongest isolation level without sacrificing performance.
You now understand how CockroachDB processes SQL queries in a distributed environment—from parsing through optimization to DistSQL execution. You can reason about query performance, identify optimization opportunities, and understand the trade-offs in distributed query processing. Next, we'll explore the serializable isolation that makes CockroachDB's transactions reliable across any scale.