For decades, the database industry accepted an implicit truth: SQL and horizontal scalability were fundamentally incompatible. The rich semantics of SQL—joins, transactions, foreign keys, complex queries—seemed to require centralized coordination that contradicted the distributed nature of scale-out architectures.
This assumption was wrong.
NewSQL databases prove that SQL's expressiveness and ACID's correctness guarantees can coexist with elastic, horizontal scalability. This page examines how—exploring the architectural patterns, distributed algorithms, and engineering innovations that make SQL at scale a reality.
By the end of this page, you will understand the specific techniques NewSQL systems use to distribute SQL workloads across multiple nodes, maintain consistency during distributed operations, and provide linear scalability without sacrificing query capabilities. You'll see how query planning, transaction coordination, and data distribution work together to deliver SQL at any scale.
Before examining solutions, let's understand precisely why SQL databases traditionally struggled with horizontal scaling. This isn't a limitation of SQL the language—it's a challenge of implementing SQL's semantics across distributed nodes.
Challenge 1: Distributed Transactions
Consider a simple bank transfer in SQL:
BEGIN TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 'alice';
UPDATE accounts SET balance = balance + 100 WHERE id = 'bob';
COMMIT;
In a single-node database, this is straightforward. But what if Alice's account data lives on Node A and Bob's account lives on Node B? The system must:

- Ensure both updates commit atomically, or neither does, even if a node or the network fails mid-transaction
- Hold locks on both nodes for the duration of the coordination
- Prevent other transactions from observing the intermediate state where the money has left Alice's account but not yet arrived in Bob's
Challenge 2: Distributed Joins
SQL's power comes largely from joins—combining data from multiple tables based on relationships:
SELECT orders.id, customers.name, products.title
FROM orders
JOIN customers ON orders.customer_id = customers.id
JOIN products ON orders.product_id = products.id
WHERE orders.date > '2024-01-01';
If these three tables are partitioned across different nodes, the database must:

- Locate the matching rows of each table, which may live on different nodes
- Move intermediate results across the network to wherever each join is performed
- Combine the results while keeping network transfer, and therefore latency, to a minimum
Challenge 3: Global Secondary Indexes
Secondary indexes (indexes on non-primary-key columns) enable efficient queries like:
SELECT * FROM users WHERE email = 'alice@example.com';
In a distributed environment, the user table is partitioned by primary key (user_id). How does the email index work when users are spread across all nodes? Options include:

- A global index, itself partitioned by email, mapping each email to its user_id (fast lookups, but every indexed write becomes a cross-node transaction)
- Local indexes on each node, which keep writes local but force lookups to fan out to every node
NewSQL systems must balance these trade-offs for every index.
Manual sharding of traditional databases addresses some scaling issues but creates significant limitations. Cross-shard queries become application responsibilities, transactions across shards require custom coordination code, and schema changes become operational nightmares. NewSQL solves these problems at the database level.
NewSQL databases transform SQL queries into distributed execution plans that parallelize work across nodes while minimizing data movement. This process involves several sophisticated components.
Query Planning and Optimization
The query optimizer in a NewSQL database must consider factors that single-node optimizers never face:
-- Example Query
SELECT c.name, SUM(o.amount) as total
FROM customers c
JOIN orders o ON c.id = o.customer_id
WHERE c.region = 'EMEA'
GROUP BY c.name
HAVING SUM(o.amount) > 10000;

-- Distributed Execution Plan (simplified)

STEP 1: PARALLEL SCAN (on all nodes holding 'customers' ranges)
  - Filter: region = 'EMEA'
  - Each node scans local customer data
  - Output: matching (customer_id, name) tuples

STEP 2: PARALLEL LOOKUP (on order data nodes)
  - For each customer_id from Step 1
  - Scan orders where orders.customer_id matches
  - Executed in parallel across all order nodes

STEP 3: SHUFFLE (redistribute by customer name)
  - Hash-partition intermediate results
  - Send tuples to aggregation nodes based on hash(name)

STEP 4: LOCAL AGGREGATE (on each aggregation node)
  - GROUP BY name
  - SUM(amount)
  - Filter: total > 10000

STEP 5: GATHER (at coordinator)
  - Collect final results from all aggregation nodes
  - Return to client

Push-Down Optimization
A critical optimization in NewSQL is 'pushing down' operations as close to the data as possible:

- Filter push-down: apply WHERE region = 'EMEA' at each storage node before returning any data to the coordinator.
- Projection push-down: return only the id, name columns from customers, not all columns.
- Aggregation push-down: compute partial SUM(amount) on each node before shuffling; final aggregation merges the partial sums.

These optimizations can reduce network traffic by 100x compared to naive approaches.

The heart of NewSQL's ACID guarantees is the distributed transaction protocol. NewSQL systems typically use an enhanced form of Two-Phase Commit (2PC) combined with consensus-based replication.
Two-Phase Commit Overview
The classic 2PC protocol coordinates transactions across multiple participants:
Phase 1 - Prepare:
1. Coordinator sends PREPARE to all participants (nodes holding affected data)
2. Each participant durably records its decision and replies VOTE-COMMIT or VOTE-ABORT

Phase 2 - Commit/Abort:
3. If all votes are COMMIT, coordinator sends COMMIT to all participants
4. If any vote is ABORT, coordinator sends ABORT to all participants
5. Participants apply or rollback changes, release locks
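The steps above can be sketched in a few lines of Python. This is an illustrative toy, with participants as in-process objects rather than networked nodes, but the vote-then-decide structure is the same:

```python
# Toy two-phase commit: the coordinator collects votes, then broadcasts the outcome.

class Participant:
    def __init__(self, name, will_commit=True):
        self.name = name
        self.will_commit = will_commit
        self.state = "IDLE"

    def prepare(self):               # Phase 1: durably stage changes, then vote
        self.state = "PREPARED"
        return "VOTE-COMMIT" if self.will_commit else "VOTE-ABORT"

    def finish(self, decision):      # Phase 2: apply or roll back, release locks
        self.state = decision

def two_phase_commit(participants):
    votes = [p.prepare() for p in participants]              # Phase 1
    decision = "COMMITTED" if all(v == "VOTE-COMMIT" for v in votes) else "ABORTED"
    for p in participants:                                   # Phase 2
        p.finish(decision)
    return decision

print(two_phase_commit([Participant("node-a"), Participant("node-b")]))
# COMMITTED
print(two_phase_commit([Participant("node-a"), Participant("node-b", will_commit=False)]))
# ABORTED
```

Note what this toy deliberately omits: if the coordinator crashed between the two phases, prepared participants would be stuck holding locks, which is exactly the failure mode the enhancements below address.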
NewSQL Enhancements to 2PC
Classic 2PC has known limitations—particularly, coordinator failure after sending PREPARE can leave participants in an uncertain state. NewSQL databases address these with several enhancements:
1. Consensus-Based Coordinator
The coordinator isn't a single node but a Raft group. If the coordinator leader fails, another replica takes over with full knowledge of the transaction state.
2. Parallel Commits
Modern NewSQL systems (like CockroachDB's parallel commits) optimize the commit path:
// Traditional 2PC: 2 consensus rounds
Round 1: Write intents to all participants
Round 2: Write transaction record as COMMITTED

// Parallel Commits: 1 consensus round
Round 1 (parallel):
  - Write intents to all participants
  - Write transaction record as STAGING

// Transaction is committed when:
// - Transaction record shows STAGING (or later COMMITTED)
// - AND all intents are successfully written

// Background: Eventually flip STAGING -> COMMITTED
// This is async and doesn't block the client

NewSQL systems typically default to serializable isolation—the strongest level. This prevents anomalies like write skew that weaker isolation allows. While serializable traditionally implied significant overhead, NewSQL implementations like serializable snapshot isolation (SSI) make it practical for most workloads.
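Returning to parallel commits: the implicit-commit rule can be captured as a tiny predicate. This is a sketch of the recovery-time status check with hypothetical structure names, not CockroachDB's actual code:

```python
# Sketch: under parallel commits, a transaction is implicitly committed once
# its record is STAGING and every intent write has succeeded.

def txn_status(record_state, intents):
    """record_state: 'STAGING' | 'COMMITTED' | 'ABORTED'
    intents: dict mapping key -> whether its intent write succeeded."""
    if record_state == "COMMITTED":
        return "COMMITTED"
    if record_state == "STAGING" and all(intents.values()):
        return "COMMITTED"       # implicitly committed; flip the record asynchronously
    if record_state == "STAGING":
        return "IN-PROGRESS"     # an intent is missing: recovery must decide
    return "ABORTED"

print(txn_status("STAGING", {"alice": True, "bob": True}))   # COMMITTED
print(txn_status("STAGING", {"alice": True, "bob": False}))  # IN-PROGRESS
```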
How data is distributed across nodes fundamentally affects performance, scalability, and operational characteristics. NewSQL systems employ sophisticated distribution strategies.
Range-Based Partitioning
Most NewSQL databases partition data into ranges (contiguous key spans):
-- Example: Users table partitioned by user_id

-- Range 1: user_id 0 - 999,999
--   - Stored on Nodes {A, B, C} (3 replicas)
--   - Leader: Node A

-- Range 2: user_id 1,000,000 - 1,999,999
--   - Stored on Nodes {B, C, D} (3 replicas)
--   - Leader: Node C

-- Range 3: user_id 2,000,000 - 2,999,999
--   - Stored on Nodes {A, D, E} (3 replicas)
--   - Leader: Node D

-- Query execution:
SELECT * FROM users WHERE user_id = 1500000;
-- Route directly to Range 2 leader (Node C)

SELECT * FROM users WHERE user_id BETWEEN 900000 AND 1100000;
-- Parallel scan: Range 1 AND Range 2

Automatic Range Splitting
As data grows, ranges are automatically split:

- When a range exceeds a size threshold (commonly on the order of hundreds of megabytes), it is split into two ranges at a midpoint key
- Hot ranges can also be split based on load, not just size
- Newly created ranges can then be moved to less-loaded nodes independently
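The splitting mechanic can be illustrated with a minimal sketch; the threshold and the (start, end, row_count) representation are simplifications for the example:

```python
# Hypothetical sketch of automatic range splitting: when a range's size
# exceeds a threshold, split it at its midpoint key into two ranges.

SPLIT_THRESHOLD = 4  # rows per range here; real systems use hundreds of MB

def maybe_split(ranges):
    """ranges: sorted list of (start_key, end_key, row_count) tuples."""
    out = []
    for start, end, count in ranges:
        if count > SPLIT_THRESHOLD and end - start > 1:
            mid = (start + end) // 2            # split at the midpoint key
            out.append((start, mid, count // 2))
            out.append((mid, end, count - count // 2))
        else:
            out.append((start, end, count))
    return out

ranges = [(0, 1000, 3), (1000, 2000, 6)]        # second range is oversized
print(maybe_split(ranges))
# [(0, 1000, 3), (1000, 1500, 3), (1500, 2000, 3)]
```

After a split, each half is an independent unit of replication and rebalancing, which is what lets the system move just the hot half elsewhere.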
Automatic Rebalancing
The system continuously monitors range distribution:

- Ranges (and range leaderships) are moved from overloaded nodes to underutilized ones
- When nodes are added, existing ranges migrate to them automatically
- When a node fails, its ranges are re-replicated from the surviving replicas
| Characteristic | Range-Based (NewSQL) | Hash-Based (Traditional Sharding) |
|---|---|---|
| Key ordering | Preserved (efficient range scans) | Destroyed (random distribution) |
| Hotspot handling | Automatic splitting | Manual resharding |
| Adding nodes | Automatic rebalancing | Full data redistribution |
| Range queries | Efficient (scan contiguous ranges) | Scatter-gather (all shards) |
| Point queries | O(log N) range lookup | O(1) hash lookup |
| Complexity | More sophisticated metadata | Simpler, predictable |
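The two routing schemes in the table can be sketched side by side, assuming hypothetical node names: range routing does an ordered O(log N) lookup over split points, so key order is preserved and a range scan touches only adjacent ranges, while hash routing scatters keys uniformly:

```python
import bisect

# Range-based routing: ordered split points, binary search per key
split_points = [1_000_000, 2_000_000]           # range boundaries
range_leaders = ["node-a", "node-c", "node-d"]  # leader per range

def route_range(key):
    return range_leaders[bisect.bisect_right(split_points, key)]

# Hash-based routing: uniform distribution, but key order is destroyed
def route_hash(key, n_shards=3):
    return f"shard-{hash(key) % n_shards}"

print(route_range(1_500_000))   # node-c  (single range leader)

# A range scan touches only the contiguous ranges it overlaps:
keys = range(900_000, 1_100_001, 50_000)
print(sorted({route_range(k) for k in keys}))   # ['node-a', 'node-c']

# The same scan under hashing can hit every shard (scatter-gather):
print(sorted({route_hash(k) for k in keys}))
```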
Interleaved Tables (Colocation)
For related tables accessed together, NewSQL databases offer interleaving (or colocation):
-- Parent table
CREATE TABLE customers (
id INT PRIMARY KEY,
name STRING
);
-- Child table interleaved with parent
CREATE TABLE orders (
id INT,
customer_id INT,
amount DECIMAL,
PRIMARY KEY (customer_id, id),
INTERLEAVE IN PARENT customers (customer_id)
);
With interleaving:

- Each customer's orders are stored physically adjacent to the parent customer row, in the same range
- A join of customers to their orders becomes a local, sequential read with no network hops
- Trade-off: very large child row sets can make ranges harder to split evenly
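Colocation falls out of key encoding: because orders are keyed by (customer_id, id), sorting the keyspace physically clusters each customer's orders right after the parent row. A minimal sketch with hypothetical tuple-encoded keys:

```python
# Sketch: interleaving via composite keys. Sorting the keyspace clusters
# each customer's orders directly after the parent customer row.

rows = [
    (("customers", 1), "Alice"),
    (("customers", 2), "Bob"),
    (("orders", 2, 10), 25.00),   # Bob's order
    (("orders", 1, 11), 99.50),   # Alice's order
    (("orders", 1, 12), 12.00),   # Alice's order
]

def interleaved_key(key):
    """Encode: parent key prefix first, then a parent/child marker."""
    if key[0] == "customers":
        return (key[1], 0)              # (customer_id, parent sorts first)
    return (key[1], 1, key[2])          # (customer_id, child marker, order_id)

for key, value in sorted(rows, key=lambda r: interleaved_key(r[0])):
    print(key, value)
# ('customers', 1) Alice       <- parent row
# ('orders', 1, 11) 99.5       <- her orders, physically adjacent
# ('orders', 1, 12) 12.0
# ('customers', 2) Bob
# ('orders', 2, 10) 25.0
```

Because the parent and its children occupy a contiguous key span, they end up in the same range, which is what makes the customer-orders join a local read.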
Maintaining consistency across distributed nodes requires sophisticated mechanisms for ordering operations and resolving conflicts.
Transaction Ordering with Timestamps
Every transaction receives a timestamp that determines its position in the global order. The challenge is generating globally consistent timestamps across multiple nodes without a central authority.
Approach 1: Hybrid Logical Clocks (HLC)
Used by CockroachDB and other systems without specialized hardware:
// Hybrid Logical Clock Structure
HLC = {
  physical: int64,  // Wall clock time in nanoseconds
  logical: int64    // Logical counter
}

// On local event (e.g., new transaction)
function localEvent(hlc):
  now = getCurrentWallTime()
  if now > hlc.physical:
    hlc.physical = now
    hlc.logical = 0
  else:
    hlc.logical += 1
  return hlc

// On receiving message with sender's HLC
function receiveEvent(hlc, senderHLC):
  now = getCurrentWallTime()
  if now > max(hlc.physical, senderHLC.physical):
    hlc.physical = now
    hlc.logical = 0
  else if hlc.physical == senderHLC.physical:
    hlc.logical = max(hlc.logical, senderHLC.logical) + 1
  else if hlc.physical > senderHLC.physical:
    hlc.logical += 1
  else:  // senderHLC.physical is max
    hlc.physical = senderHLC.physical
    hlc.logical = senderHLC.logical + 1
  return hlc

Approach 2: TrueTime (Google Spanner)
Google Spanner uses TrueTime—a globally synchronized time system based on GPS receivers and atomic clocks in each data center. TrueTime provides:

- TT.now(), which returns an interval [earliest, latest] guaranteed to contain the true absolute time
- A tightly bounded clock uncertainty, typically a few milliseconds
We'll explore TrueTime in detail in the Google Spanner page.
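Stepping back to Hybrid Logical Clocks: the update rules shown earlier translate directly to runnable Python. This is an illustrative sketch (wall time is injected so the run is deterministic), not CockroachDB's implementation; the property to observe is that timestamps never move backwards, even when a message arrives from a node whose clock runs ahead:

```python
# Runnable HLC sketch mirroring the pseudocode rules above.

class HLC:
    def __init__(self, now_fn):
        self.now = now_fn           # injected wall clock, for determinism
        self.physical = 0
        self.logical = 0

    def local_event(self):
        now = self.now()
        if now > self.physical:
            self.physical, self.logical = now, 0
        else:
            self.logical += 1       # wall clock didn't advance: bump counter
        return (self.physical, self.logical)

    def receive_event(self, sender):
        now = self.now()
        if now > max(self.physical, sender[0]):
            self.physical, self.logical = now, 0
        elif self.physical == sender[0]:
            self.logical = max(self.logical, sender[1]) + 1
        elif self.physical > sender[0]:
            self.logical += 1
        else:                        # sender's physical clock is ahead: adopt it
            self.physical, self.logical = sender[0], sender[1] + 1
        return (self.physical, self.logical)

clock = HLC(now_fn=lambda: 100)       # our wall clock is stuck at 100
print(clock.local_event())            # (100, 0)
print(clock.local_event())            # (100, 1)
print(clock.receive_event((250, 3)))  # (250, 4)  sender was ahead: adopt and bump
print(clock.local_event())            # (250, 5)  still monotonic
```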
Read Consistency Levels
NewSQL systems offer different read consistency options:
| Level | Guarantee | Performance | Use Case |
|---|---|---|---|
| Serializable | Reads reflect all prior committed transactions | May wait for uncertain transactions | Financial transactions, inventory |
| Read Committed | Reads see only committed data | Faster, no blocking | Most OLTP workloads |
| Stale Reads | Reads from snapshot N seconds ago | Fastest, any replica | Analytics, dashboards |
| Follower Reads | Reads from nearest replica (possibly stale) | Lowest latency, geo-distributed | User-facing reads where freshness isn't critical |
Applications can choose consistency levels per-query. A banking application might use serializable for balance checks but stale reads for transaction history display. This fine-grained control optimizes performance without sacrificing correctness where it matters.
Joins are where distributed SQL becomes complex. NewSQL systems employ multiple join strategies depending on data location and query characteristics.
Join Strategy Selection
The query optimizer evaluates:

- The estimated size of each input after filters are applied
- Where each table's data physically lives
- Which indexes are available on the join columns
-- Example: Join between orders and products

-- STRATEGY 1: Lookup Join (when orders is filtered)
SELECT o.id, p.name
FROM orders o
JOIN products p ON o.product_id = p.id
WHERE o.date = '2024-01-15';
-- Few orders match date filter
-- For each order, lookup product by id using index
-- Minimal data movement

-- STRATEGY 2: Broadcast Join (when products is small)
SELECT o.id, p.name
FROM orders o
JOIN products p ON o.product_id = p.id;
-- Products table is small (10,000 rows)
-- Broadcast entire products table to all order nodes
-- Each node joins locally

-- STRATEGY 3: Hash Join (when both tables are large)
SELECT o.id, u.name, SUM(o.amount)
FROM orders o
JOIN users u ON o.user_id = u.id
GROUP BY o.id, u.name;
-- Both tables are large, no useful indexes
-- Shuffle both tables by user_id
-- Perform hash join after co-location

Join Ordering Optimization
For multi-table joins, order matters enormously. A query joining 5 tables has 5! = 120 possible join orderings. The optimizer must:

- Estimate the cardinality of each intermediate result
- Prefer orderings that shrink intermediate results early
- Account for where each intermediate result must be shipped over the network
NewSQL optimizers use dynamic programming (similar to traditional optimizers) but with additional cost models for network transfer and distributed coordination.
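The shuffle hash join (Strategy 3 above) can be sketched as follows: both inputs are partitioned by the join key so matching rows land on the same worker, then each worker runs an ordinary hash join locally. Worker count and row layout here are illustrative:

```python
# Sketch of a distributed hash join: shuffle both inputs by join key,
# then hash-join locally on each worker.

from collections import defaultdict

def shuffle(rows, key_fn, n_workers):
    parts = defaultdict(list)
    for row in rows:
        parts[key_fn(row) % n_workers].append(row)  # co-locate by join key
    return parts

def local_hash_join(orders, users):
    by_user = {u["id"]: u["name"] for u in users}   # build side
    return [(o["id"], by_user[o["user_id"]], o["amount"])
            for o in orders if o["user_id"] in by_user]  # probe side

orders = [{"id": 1, "user_id": 7, "amount": 10},
          {"id": 2, "user_id": 8, "amount": 20},
          {"id": 3, "user_id": 7, "amount": 30}]
users = [{"id": 7, "name": "alice"}, {"id": 8, "name": "bob"}]

N = 2
o_parts = shuffle(orders, lambda o: o["user_id"], N)
u_parts = shuffle(users, lambda u: u["id"], N)

result = sorted(row for w in range(N)
                for row in local_hash_join(o_parts[w], u_parts[w]))
print(result)  # [(1, 'alice', 10), (2, 'bob', 20), (3, 'alice', 30)]
```

The shuffle step is the expensive part: both tables cross the network once, which is why the optimizer only picks this strategy when neither a lookup join nor a broadcast join is cheaper.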
The best join is one you never perform across nodes. Designing schemas with colocation in mind—interleaving child tables, choosing partition keys that match common join keys—can eliminate distributed join overhead for the most critical queries.
Secondary indexes enable efficient queries on non-primary-key columns. In distributed databases, implementing them correctly is non-trivial.
The Challenge
Consider a Users table partitioned by user_id (primary key):
CREATE TABLE users (
user_id INT PRIMARY KEY,
email STRING UNIQUE,
region STRING,
created_at TIMESTAMP
);
A query by email (WHERE email = 'alice@example.com') can't use the partition structure—the email could belong to any user_id, on any node.
Solution 1: Global Indexes
Create a separate, independently partitioned index:
// Global Index Structure for 'email'

// Index is partitioned by email (not user_id)
// Range A: emails 'a...' to 'm...' -> Nodes {1, 2, 3}
// Range B: emails 'n...' to 'z...' -> Nodes {4, 5, 6}

// Index entry format:
// { email: 'alice@example.com', user_id: 12345 }

// Query: WHERE email = 'alice@example.com'
// 1. Hash/range lookup in global email index
// 2. Find user_id = 12345
// 3. Fetch user row from users table using user_id

// Trade-offs:
// + Fast reads (single index lookup)
// - Writes must update both table AND index (cross-node transaction)
// - Index updates increase transaction latency

Solution 2: Local Indexes
Each node indexes only its own data:
Node 1 (users 0-999):
Local email index: { 'alice@example.com' -> 50 }
Node 2 (users 1000-1999):
Local email index: { 'bob@example.com' -> 1500 }

With local indexes, writes stay on a single node, but a query by email must fan out to every node (scatter-gather), since any node could hold the match.
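The read-path difference between the two solutions is fan-out. A hypothetical sketch: with a global index, one lookup answers an email query; with local indexes, every node must be consulted in the worst case:

```python
# Sketch: users partitioned by user_id across nodes; compare lookup fan-out
# for a global email index vs per-node local indexes.

nodes = {
    "node-1": {50: "alice@example.com"},      # user_id -> email
    "node-2": {1500: "bob@example.com"},
    "node-3": {2500: "carol@example.com"},
}

# Global index: one separately partitioned map from email -> user_id
global_index = {email: uid for data in nodes.values() for uid, email in data.items()}

def lookup_global(email):
    return global_index.get(email), 1          # (result, nodes contacted)

def lookup_local(email):
    contacted = 0
    for data in nodes.values():                # scatter to every node
        contacted += 1
        for uid, e in data.items():
            if e == email:
                return uid, contacted          # may stop early; worst case = all
    return None, contacted

print(lookup_global("bob@example.com"))        # (1500, 1)
print(lookup_local("carol@example.com"))       # (2500, 3)
```

The write-path trade-off is the mirror image: the global index makes every indexed write a cross-node transaction, while local index updates stay on the node that owns the row.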
Solution 3: Covering Indexes
Include additional columns in the index to avoid table lookup:
-- Covering index includes non-key columns
CREATE INDEX idx_users_email_covering
ON users (email)
STORING (name, region);

-- Query that can be answered from index alone:
SELECT name, region
FROM users
WHERE email = 'alice@example.com';

-- Execution:
-- 1. Lookup in global email index
-- 2. Index entry contains name and region
-- 3. No secondary fetch from users table needed

-- Trade-off: Larger index, but faster reads

| Index Type | Read Performance | Write Performance | Storage Cost |
|---|---|---|---|
| Global Index | O(1) lookup | Cross-node transaction | Separate replication |
| Local Index | Scatter to all nodes | Local update only | Stored with table data |
| Covering Index | No table lookup | Larger updates | Higher storage for extra columns |
| Partial Index | O(1) for filtered rows | Only indexed rows updated | Smaller index size |
NewSQL's ultimate promise is linear scalability—doubling nodes should approximately double throughput. Let's examine how this works and its practical limits.
Scaling Dimensions
NewSQL databases scale along multiple dimensions:

- Read throughput: add replicas and serve reads from followers
- Write throughput: add nodes; each new node takes leadership of some ranges
- Storage capacity: ranges rebalance onto new nodes automatically
- Geography: place replicas near users for lower latency
Factors That Limit Linear Scaling
Real-world workloads may not scale perfectly linearly due to:

1. Hotspots: skewed access concentrates load on a single range leader, which added nodes cannot absorb
2. Cross-Shard Transactions: each multi-node transaction pays coordination overhead that grows with the number of participants
3. Global Indexes: every indexed write becomes a cross-node transaction between the table and the index
4. Large Scans: full-table operations consume resources on every node, competing with point queries
| Workload Pattern | Scaling Behavior | Optimization Approach |
|---|---|---|
| Point reads by primary key | Near-perfect linear | Increase replica count |
| Point writes to distributed keys | Near-linear | Proper partition key selection |
| Range scans with partition filter | Linear within partition | Ensure filter uses partition key |
| Full table scans | Parallel but fixed overhead | Add covering indexes, materialized views |
| Multi-shard transactions | Sub-linear (coordination overhead) | Schema colocation, batch operations |
| Hotspot writes (single key) | No scaling (single leader) | Redesign schema, use sequences |
Production NewSQL deployments regularly demonstrate near-linear scaling. CockroachDB benchmarks show 1 million+ writes/second across 256 nodes. Google Spanner handles Google's advertising infrastructure—billions of transactions daily. The technology delivers on its promise for well-designed workloads.
We've explored the deep technical mechanisms that enable NewSQL databases to provide SQL semantics at distributed scale. Let's consolidate the key insights:

- Distributed query planning pushes filters, projections, and partial aggregations down to storage nodes to minimize data movement
- Transactions combine Two-Phase Commit with consensus-replicated coordinators, and optimizations like parallel commits cut commit latency
- Range-based partitioning with automatic splitting and rebalancing keeps data evenly distributed as clusters grow
- Hybrid Logical Clocks or TrueTime provide globally consistent transaction ordering without a central authority
- Join strategies (lookup, broadcast, hash) and index designs (global, local, covering) trade off read cost, write cost, and data movement
What's Next
With the architectural foundations understood, we'll now examine specific NewSQL implementations in detail.
You now understand how NewSQL databases achieve the combination of SQL semantics and horizontal scalability. The distributed query processing, transaction protocols, and data distribution strategies work together to deliver what was once considered impossible. Next, we'll see these concepts in action with Google Spanner.