The basic hash join algorithm assumes a simple premise: the build relation fits in available memory. This assumption enables single-pass processing and O(1) lookups—the foundation of hash join efficiency. But real-world data regularly violates this assumption.
A 100 GB table joined against a 10 GB table cannot build a 10+ GB hash table when only 2 GB of memory is available. Without careful handling, the system would either crash, thrash to disk, or produce incorrect results. Partition handling solves this problem through divide-and-conquer: split both relations into smaller partitions that individually fit in memory, then process each partition pair independently.
By the end of this page, you will understand why partitioning is necessary, how partition-based hash joins work, the guarantees that make partitioning correct, memory allocation strategies, handling of partition overflow, and the I/O cost implications. This knowledge is essential for understanding how databases handle joins of arbitrary scale.
When the build relation is larger than available memory, three problematic scenarios can occur:
Scenario 1: Hash table allocation fails. The system attempts to allocate memory for the hash table and receives an out-of-memory error. The query fails immediately.
Scenario 2: Uncontrolled spilling to disk. The operating system's virtual memory swaps hash table pages to disk. Random access patterns cause severe thrashing—potentially 1000× slower than in-memory execution.
Scenario 3: Partial hash table. Only part of the build relation fits. Probes against missing entries produce incorrect results (missing matches).
The fundamental constraint:
Let M = available memory in pages, B = block size.
Let |R| = size of build relation in pages, |S| = size of probe relation in pages.

For single-pass hash join:

f × |R| ≤ M

where f is a fudge factor (typically 1.1 to 1.3) accounting for hash table overhead.

When this inequality doesn't hold, we need a different strategy.
| Build Size vs Memory | Strategy | Passes Required | Complexity |
|---|---|---|---|
| < M | Simple hash join | 1 pass each relation | O(|R| + |S|) |
| M to 2M | Partition (small) | 2 passes each | O(2 × (|R| + |S|)) |
| 2M to M² | Partition (standard) | 2-3 passes each | O(2-3 × (|R| + |S|)) |
| > M² | Recursive partition | Multiple passes | O(k × (|R| + |S|)), k ≥ 3 |
With M pages of memory, we can create at most M-1 output partitions (one page per partition buffer plus one page for input). Each partition can grow up to M pages in the subsequent phase. This means single-level partitioning handles build relations up to M × (M-1) ≈ M² pages. Beyond this, recursive partitioning is necessary—a key threshold in hash join theory.
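A small helper makes the threshold concrete (the function name is ours; it assumes a fan-out of M − 1 partitions per pass and a base case of a build side that fits in M pages):

```cpp
#include <cstdint>

// Number of partitioning passes needed for a build relation of
// `build_pages` pages given `M` pages of memory. Each pass multiplies
// the manageable size by the fan-out (M - 1); zero passes means the
// build side fits in memory directly.
int partitioning_levels(uint64_t build_pages, uint64_t M) {
    int levels = 0;
    uint64_t capacity = M;       // 0 passes: build fits in memory
    while (build_pages > capacity) {
        levels++;
        capacity *= (M - 1);     // each pass adds a factor of M - 1
    }
    return levels;
}
```

With M = 1000, zero passes handle up to 1,000 pages, one partitioning pass handles up to 1000 × 999 = 999,000 pages (≈ M²), and only beyond that is recursive partitioning required.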
Partition-based hash join splits the problem into manageable pieces. The core insight: if we partition both relations using the same hash function on the join key, matching tuples are guaranteed to be in corresponding partitions.
This guarantee is crucial. A tuple from R with key value k will hash to partition i. Any matching tuple from S with the same key k will also hash to partition i. Therefore, we only need to compare tuples within matching partition pairs—partition i of R against partition i of S.
The two-phase algorithm:

Phase 1: Partition. Scan R, hash each tuple's join key with h₁, and append the tuple to one of n partition files on disk. Repeat for S using the same hash function.

Phase 2: Join. For each partition i, load partition i of R, build an in-memory hash table over it (using a second hash function h₂), then probe that table with every tuple from partition i of S.
```cpp
// Partitioned hash join algorithm
class PartitionedHashJoin {
    int num_partitions;
    vector<PartitionFile> build_partitions;
    vector<PartitionFile> probe_partitions;
    HashFunction partition_hash;  // hash₁
    HashFunction join_hash;       // hash₂

    void execute(Relation* R, Relation* S, ResultSet* output) {
        // Phase 1: Partition both relations
        partition_relation(R, build_partitions);
        partition_relation(S, probe_partitions);

        // Phase 2: Join corresponding partitions
        for (int i = 0; i < num_partitions; i++) {
            if (build_partitions[i].empty()) {
                // No build tuples in this partition = no output
                continue;
            }

            // Build hash table from partition i of R
            HashTable* ht = build_hash_table(build_partitions[i]);

            // Probe with partition i of S
            probe_and_output(probe_partitions[i], ht, output);

            delete ht;
        }
    }

private:
    void partition_relation(Relation* rel, vector<PartitionFile>& partitions) {
        // Allocate one buffer page per partition
        vector<Page*> buffers(num_partitions);
        for (int i = 0; i < num_partitions; i++) {
            buffers[i] = allocate_page();
        }

        // Scan relation and partition
        for (Tuple& tuple : *rel) {
            int p = partition_hash(tuple.join_key) % num_partitions;
            if (buffers[p]->is_full()) {
                // Flush buffer to disk
                partitions[p].write(buffers[p]);
                buffers[p]->clear();
            }
            buffers[p]->add(tuple);
        }

        // Flush remaining tuples
        for (int i = 0; i < num_partitions; i++) {
            if (!buffers[i]->empty()) {
                partitions[i].write(buffers[i]);
            }
            free_page(buffers[i]);
        }
    }

    HashTable* build_hash_table(PartitionFile& partition) {
        HashTable* ht = new HashTable();
        for (Page* page : partition.pages()) {
            for (Tuple& tuple : *page) {
                // Use second hash function for hash table
                uint64_t h = join_hash(tuple.join_key);
                ht->insert(h, tuple);
            }
        }
        return ht;
    }
};
```

Partitioned hash join's correctness rests on a fundamental property of hash functions: determinism. The same key always produces the same hash value, and thus maps to the same partition.
Formal guarantee:
Let R and S be relations with join attribute A. Let h be a hash function and p(k) = h(k) mod n be the partition function.
For any two tuples r ∈ R and s ∈ S: If r.A = s.A, then p(r.A) = p(s.A)
Proof: Since r.A = s.A, and h is a function (deterministic), h(r.A) = h(s.A). Therefore h(r.A) mod n = h(s.A) mod n, which means p(r.A) = p(s.A). ∎
This means matching tuples always land in the same partition. No matches are missed; no cross-partition comparisons are needed.
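The guarantee can be exercised directly in code. `toy_hash` below is an illustrative 64-bit mixer (a Murmur-style finalizer), not a production hash; any deterministic function gives the same property:

```cpp
#include <cstdint>

// Illustrative deterministic 64-bit mixer (Murmur-style finalizer).
uint64_t toy_hash(uint64_t key) {
    key ^= key >> 33;
    key *= 0xff51afd7ed558ccdULL;
    key ^= key >> 33;
    return key;
}

// p(k) = h(k) mod n, exactly the partition function from the proof.
int partition_of(uint64_t key, int n) {
    return (int)(toy_hash(key) % (uint64_t)n);
}
```

Because `toy_hash` is a pure function, two tuples with equal join keys always map to the same partition, regardless of which relation they come from.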
We use one hash function (h₁) for partitioning and a different one (h₂) for the in-partition hash table. Why? If we used the same function, all tuples in partition i would have the same h mod n value, causing poor distribution in the hash table. Using a different function ensures good distribution within each partition's hash table.
```cpp
// Using two different hash functions
class HashFunctions {
    // Hash function 1: for partitioning
    // Uses one set of mixing constants
    static uint64_t partition_hash(const JoinKey& key) {
        uint64_t h = key.as_uint64();
        // MurmurHash-style mixing
        h ^= h >> 33;
        h *= 0xff51afd7ed558ccdULL;
        h ^= h >> 33;
        h *= 0xc4ceb9fe1a85ec53ULL;
        h ^= h >> 33;
        return h;
    }

    // Hash function 2: for in-partition hash table
    // Uses different constants or algorithm
    static uint64_t table_hash(const JoinKey& key) {
        uint64_t h = key.as_uint64();
        // Different mixing pattern
        h *= 0x9e3779b97f4a7c15ULL;  // Based on golden ratio
        h ^= h >> 30;
        h *= 0xbf58476d1ce4e5b9ULL;
        h ^= h >> 27;
        h *= 0x94d049bb133111ebULL;
        h ^= h >> 31;
        return h;
    }
};

// Example showing why different functions matter
void demonstrate_hash_difference() {
    // All keys that map to partition 5 (out of 16) using partition_hash
    // have partition_hash(key) mod 16 == 5
    // If we used partition_hash for the hash table with 32 buckets:
    // All these keys would map to buckets 5 or 21 only!
    // (because hash mod 16 == 5 implies hash mod 32 is 5 or 21)
    // Using table_hash distributes these keys across all 32 buckets
}
```

NULL handling in partitioning:
NULL join keys present a special case. Since NULL ≠ NULL in SQL, tuples with NULL keys never match. These tuples can be dropped immediately during partitioning for inner joins, or routed to a dedicated NULL partition for outer joins, where they must still appear in the output with NULL padding.
The key insight is that NULL-key tuples need consistent handling across build and probe partitioning—if they're excluded, they must be excluded from both sides.
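A minimal routing sketch (the names `route_tuple` and `NullPolicy` are ours; it assumes n-way partitioning with an extra NULL bucket at index n):

```cpp
#include <cstdint>
#include <optional>

enum class NullPolicy { DropForInnerJoin, DedicatedPartition };

// Returns the target partition index, or -1 when the tuple is dropped.
// The same policy must be applied to both build and probe sides.
int route_tuple(std::optional<uint64_t> join_key, int n, NullPolicy policy) {
    if (!join_key.has_value()) {
        // NULL never equals NULL in SQL: inner joins may drop the tuple,
        // outer joins keep it in a dedicated bucket (index n here).
        return policy == NullPolicy::DropForInnerJoin ? -1 : n;
    }
    // Normal key: illustrative modulo partitioning.
    return (int)(*join_key % (uint64_t)n);
}
```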
Choosing the right number of partitions is a critical decision that balances multiple factors:
Too few partitions: individual partitions may exceed available memory, forcing recursive partitioning or overflow handling in the join phase.

Too many partitions: each partition needs its own output buffer during the partitioning phase, so buffers shrink, writes become smaller and less sequential, and more partition files must be managed.
The goal: Choose n such that each partition of the build relation fits in memory, with some safety margin.
Partition count formula:
Given:
- |R| = build relation size in pages
- M = available memory in pages
- f = fudge factor for hash table overhead (typically 1.1 to 1.3)
Minimum partitions needed:
n ≥ f × |R| / M
Maximum practical partitions (given output buffers during partitioning):
n ≤ M - 1 (one page per partition buffer, leaving one for input)
Example calculation: suppose |R| = 10,000 pages, M = 1000 pages, and f = 1.2.

Minimum: n ≥ 1.2 × 10,000 / 1000 = 12 partitions Maximum: n ≤ 999 (plenty of room)
We might choose n = 16 (power of 2 for efficient modulo), giving expected partition size of 625 pages, well within the 1000-page memory limit.
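The "efficient modulo" point is visible directly in code: for a power-of-two n, `hash % n` reduces to a bit mask, removing an integer division from the innermost partitioning loop. (The helper name is ours.)

```cpp
#include <cstdint>

// Partition selection for a power-of-two partition count:
// hash % n == hash & (n - 1) when n is a power of two.
int partition_pow2(uint64_t hash, int n) {
    return (int)(hash & (uint64_t)(n - 1));
}
```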
Partition count decisions are made before reading data, based on estimated relation sizes. When estimates are wrong (often 10× or more for complex queries), partitions may be too large (requiring recursive partitioning) or too small (wasting resources). Adaptive approaches that adjust during execution are increasingly common in modern systems.
```cpp
// Partition count determination
class PartitionPlanner {
    size_t available_memory;  // In bytes
    size_t page_size;
    double fudge_factor;

    int compute_partition_count(
        size_t estimated_build_size,   // bytes
        size_t estimated_tuple_width
    ) {
        size_t memory_pages = available_memory / page_size;
        size_t build_pages = (estimated_build_size + page_size - 1) / page_size;

        // Account for hash table overhead
        // Hash table typically needs 1.5-2x the raw data size
        size_t ht_pages = build_pages * 2;

        // Minimum partitions to ensure each fits in memory
        int min_partitions = (int)ceil(fudge_factor * ht_pages / memory_pages);

        // Maximum partitions limited by buffer pool allocation
        // Need one buffer page per partition during partitioning phase
        int max_partitions = memory_pages - 2;  // Reserve pages for I/O

        // Choose power of 2 for efficient modulo
        int n = 1;
        while (n < min_partitions) n *= 2;

        // Clamp to valid range
        return min(n, max_partitions);
    }

    // Validate that partitions will fit
    bool validate_partition_plan(int n, size_t build_size) {
        size_t expected_partition_size = build_size / n;
        size_t ht_size = expected_partition_size * 2;  // 2x for hash table

        // Allow for skew: largest partition might be 2-3× average
        size_t worst_case = ht_size * 3;

        return worst_case < available_memory * 0.9;  // 90% threshold
    }
};
```

Partitioning adds I/O cost compared to simple hash join. Understanding this cost is essential for optimizer decisions about join strategies.
Cost breakdown for two-pass partitioned hash join:
Phase 1 (Partitioning):
- Read R and write its partitions: |R| + |R| page I/Os
- Read S and write its partitions: |S| + |S| page I/Os

Total Phase 1: 2|R| + 2|S| page I/Os

Phase 2 (Join):
- Read every partition of R once to build hash tables: |R| page I/Os
- Read every partition of S once to probe: |S| page I/Os

Total Phase 2: |R| + |S| page I/Os
Grand total: 3|R| + 3|S| page I/Os
Compare to simple hash join (when build fits in memory): |R| + |S| page I/Os, one sequential read of each relation.
Partitioning costs 3× the I/O of simple hash join.
| Join Type | I/O Cost | When Appropriate |
|---|---|---|
| Simple hash join | |R| + |S| | Build relation fits in memory |
| 2-pass partitioned | 3 × (|R| + |S|) | Build > memory, but partition fits |
| 3-pass partitioned | 5 × (|R| + |S|) | Need recursive partitioning |
| Sort-merge join | 3 × (|R| + |S|) | When data is nearly sorted |
| Block nested loop | |S| × ⌈|R|/M⌉ + |R| | When very limited memory |
While partitioned hash join uses 3× the page I/Os of simple hash join, most of these are sequential. Partition files are written and read sequentially. On HDDs, sequential I/O can be 100× faster than random I/O. On SSDs, the gap is smaller (3-10×) but still significant. This makes partitioned hash join practical even at 3× the page count.
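A back-of-envelope model makes the point concrete. The device numbers used below (0.1 ms per sequential page, 10 ms per random page, roughly HDD-class) are illustrative assumptions, not measurements:

```cpp
#include <cstdint>

// Total I/O time for a given number of page accesses at a fixed
// per-page cost in milliseconds. Sequential vs random access is
// modeled purely through the per-page cost.
double total_seconds(uint64_t pages, double ms_per_page) {
    return (double)pages * ms_per_page / 1000.0;
}
```

For |R| + |S| = 1,100,000 pages: partitioned execution touches 3 × 1.1M pages sequentially, about 330 seconds at 0.1 ms/page, while uncontrolled swapping over the same 1.1M pages at 10 ms per random page would take about 11,000 seconds.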
```cpp
// I/O cost estimation for partitioned hash join
struct IOCostEstimate {
    size_t build_pages;
    size_t probe_pages;
    size_t memory_pages;
    double io_cost_per_page;   // milliseconds
    double random_io_penalty;  // multiplier for random vs sequential

    // Simple hash join (if build fits)
    double simple_hash_join_cost() {
        if (build_pages * 2 > memory_pages) {
            return INFINITY;  // Won't fit
        }
        // Sequential read of both relations
        return (build_pages + probe_pages) * io_cost_per_page;
    }

    // Two-pass partitioned hash join
    double partitioned_hash_join_cost() {
        // Phase 1: read + write both relations
        double phase1 = 2 * (build_pages + probe_pages) * io_cost_per_page;
        // Phase 2: read both relations again
        double phase2 = (build_pages + probe_pages) * io_cost_per_page;
        return phase1 + phase2;
    }

    // Block nested loop for comparison
    double block_nested_loop_cost() {
        size_t blocks_of_R = (build_pages + memory_pages - 1) / (memory_pages - 1);
        // Read R once, read S once per block of R
        double cost = build_pages * io_cost_per_page;
        cost += probe_pages * blocks_of_R * io_cost_per_page;
        // Add penalty for random access pattern
        cost *= random_io_penalty * 0.3;  // Partial random factor
        return cost;
    }
};

// Example: join 100,000-page R with 1,000,000-page S, 10,000-page memory
// Simple hash join: not possible (R doesn't fit)
// Partitioned: 3 × (100K + 1M) × 1ms = 3,300 seconds
// BNL: 100K × 1ms + 1M × 10 × 1ms = 10,100 seconds
// Partitioned hash join wins by 3×
```

Even with careful partition count selection, individual partitions may overflow memory. This happens due to:
- Data skew: many tuples share the same join key and land in one partition
- Inaccurate cardinality estimates: the build relation is larger than planned for
- Unlucky hash distribution: some partitions simply receive more keys than average
Systems must detect and handle overflow gracefully.
```cpp
// Recursive partitioning for overflow
void process_partition_recursive(
    PartitionFile& build_partition,
    PartitionFile& probe_partition,
    int depth,
    ResultSet* output)
{
    // Read build partition and check size
    size_t partition_size = build_partition.size_bytes();

    if (partition_size <= available_memory * 0.8) {
        // Base case: partition fits - do simple hash join
        HashTable* ht = build_hash_table(build_partition);
        probe_and_output(probe_partition, ht, output);
        delete ht;
        return;
    }

    // Overflow case: need to re-partition
    if (depth > MAX_RECURSION_DEPTH) {
        // Extreme skew detected - fall back to nested loop
        log_warning("Partition overflow at max depth, using nested loop");
        nested_loop_join(build_partition, probe_partition, output);
        return;
    }

    // Recursive case: re-partition this partition into sub-partitions
    int sub_partitions = compute_sub_partition_count(partition_size);
    vector<PartitionFile> build_subparts(sub_partitions);
    vector<PartitionFile> probe_subparts(sub_partitions);

    // Use different hash function for sub-partitioning
    HashFunction sub_hash = get_hash_function(depth + 1);

    // Re-partition build side
    for (Tuple& t : build_partition) {
        int sp = sub_hash(t.join_key) % sub_partitions;
        build_subparts[sp].append(t);
    }

    // Re-partition probe side
    for (Tuple& t : probe_partition) {
        int sp = sub_hash(t.join_key) % sub_partitions;
        probe_subparts[sp].append(t);
    }

    // Recursively process sub-partitions
    for (int i = 0; i < sub_partitions; i++) {
        if (!build_subparts[i].empty()) {
            process_partition_recursive(
                build_subparts[i],
                probe_subparts[i],
                depth + 1,
                output
            );
        }
    }
}
```

If many tuples share the exact same join key value, no amount of re-partitioning helps—they'll always end up together. For example, if 1 million orders reference customer_id = 12345, partitioning always puts those 1 million tuples together. The fallback to nested loop (or other strategies like building a partial hash table) handles this edge case.
Effective memory allocation during partitioning significantly impacts performance. The challenge: balance between I/O efficiency (larger buffers) and partition count (more partitions require more buffers).
Buffer allocation during partitioning:

With M pages of memory and n partitions:
- 1 page is reserved for scanning the input relation
- the remaining M − 1 pages are divided among the n partition output buffers, about ⌊(M − 1)/n⌋ pages each

Larger output buffers produce fewer, longer sequential writes; smaller buffers permit more partitions but force more frequent, smaller flushes to disk.
Trade-off analysis:

With 1000 pages of memory and 100 partitions, each partition buffer holds ~10 pages before flushing. With a build relation of 10,000 pages, each partition receives ~100 pages, so each buffer flushes about 10 times, and every flush is a 10-page sequential write rather than a single-page write.
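The flush arithmetic above can be checked with a tiny helper (the function name is ours; it assumes uniform key distribution):

```cpp
#include <cstdint>

// Expected number of buffer flushes per partition during the
// partitioning phase, assuming tuples spread evenly across partitions.
uint64_t flushes_per_partition(uint64_t build_pages, uint64_t n,
                               uint64_t buffer_pages) {
    uint64_t partition_pages = (build_pages + n - 1) / n;  // pages per partition
    return (partition_pages + buffer_pages - 1) / buffer_pages;
}
```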
| Strategy | I/O Pattern | Pros | Cons |
|---|---|---|---|
| Single page per partition | Many small writes | Simple, supports many partitions | Poor I/O efficiency |
| Multi-page buffers | Fewer, larger writes | Good sequential I/O | Limits partition count |
| Dynamic allocation | Varies | Adapts to actual usage | Complex management |
| Double buffering | Overlapped I/O | Maximum throughput | 2× memory overhead |
```cpp
// Buffer allocation strategies for partitioning
class PartitionBufferManager {
    size_t total_memory;
    int num_partitions;

    // Strategy 1: Fixed equal allocation
    vector<Buffer> fixed_equal_allocation() {
        size_t input_reserved = 4 * PAGE_SIZE;
        size_t available = total_memory - input_reserved;
        size_t per_partition = available / num_partitions;

        vector<Buffer> buffers;
        for (int i = 0; i < num_partitions; i++) {
            buffers.push_back(Buffer(per_partition));
        }
        return buffers;
    }

    // Strategy 2: Dynamic with stealing
    class DynamicBufferPool {
        size_t total;
        size_t allocated;
        vector<Buffer*> partition_buffers;

        void write_and_potentially_expand(int partition, const Tuple& t) {
            Buffer* buf = partition_buffers[partition];
            if (buf->is_full()) {
                // Flush to disk
                flush_buffer(partition);
                // Try to expand if under-average allocation
                size_t avg = total / num_partitions;
                if (buf->size < avg && allocated < total) {
                    expand_buffer(buf, min(avg, total - allocated));
                }
            }
            buf->add(t);
        }
    };

    // Strategy 3: Double buffering for async I/O
    class DoubleBufferPartition {
        Buffer* active;    // Currently filling
        Buffer* flushing;  // Being written to disk
        AsyncWriter* writer;

        void add(const Tuple& t) {
            if (active->is_full()) {
                // Wait for previous flush to complete
                writer->wait();
                // Swap buffers
                swap(active, flushing);
                // Start async write of full buffer
                writer->write_async(flushing);
            }
            active->add(t);
        }
    };
};
```

Modern systems overlap computation and I/O using asynchronous writes. While one buffer is being written to disk, another receives new tuples. This hides I/O latency behind CPU work, achieving near-maximum disk throughput. Linux's io_uring, Windows IOCP, and similar mechanisms enable this with minimal overhead.
Data skew is the nemesis of partitioned hash join. When join key values have highly non-uniform distribution, some partitions grow much larger than others, negating the benefits of partitioning.
Common skew scenarios:
- Heavy hitters: a few key values (a flagship product, a system account) carry a large share of all tuples
- Power-law distributions: joins on user_id or similar keys in social and web data follow Zipf-like distributions
- Default and sentinel values: placeholder keys such as 0 or -1 concentrate millions of tuples on a single value
```cpp
// Heavy hitter detection and special handling
class SkewAwarePartitioner {
    unordered_map<JoinKey, size_t> key_counts;
    size_t skew_threshold;
    set<JoinKey> heavy_hitters;

    // Phase 0: Sample to identify heavy hitters
    void detect_heavy_hitters(Relation* R, double sample_rate) {
        size_t sample_count = 0;
        for (Tuple& t : *R) {
            if (random() < sample_rate) {
                key_counts[t.join_key]++;
                sample_count++;
            }
        }

        // Keys appearing > 1% of samples are heavy hitters
        size_t threshold = sample_count / 100;
        for (auto& [key, count] : key_counts) {
            if (count > threshold) {
                heavy_hitters.insert(key);
            }
        }

        log_info("Detected {} heavy hitters", heavy_hitters.size());
    }

    // Modified partitioning with heavy hitter handling
    void partition_with_skew_handling(Relation* R,
                                      vector<PartitionFile>& normal_partitions,
                                      vector<PartitionFile>& heavy_partitions) {
        for (Tuple& t : *R) {
            if (heavy_hitters.count(t.join_key)) {
                // Heavy hitter: use separate partition strategy
                // Option 1: Dedicated partition per heavy key
                // Option 2: Round-robin across multiple partitions
                int hh_partition = hash_heavy_hitter(t.join_key);
                heavy_partitions[hh_partition].append(t);
            } else {
                // Normal key: standard hash partitioning
                int partition = partition_hash(t.join_key) % num_partitions;
                normal_partitions[partition].append(t);
            }
        }
    }

    // Join heavy hitter partitions with broadcast
    void join_heavy_hitters(vector<PartitionFile>& build_heavy,
                            vector<PartitionFile>& probe_heavy,
                            ResultSet* output) {
        // For each heavy hitter, all matching probe tuples are in one partition
        // But we might replicate build tuples for parallel processing
        for (const JoinKey& hh_key : heavy_hitters) {
            // Simple approach: build hash table from heavy build tuples
            // and probe all matching probe tuples
            join_single_heavy_hitter(hh_key, build_heavy, probe_heavy, output);
        }
    }
};
```

Skew mitigation strategies are often workload-specific. A star schema with dimension tables rarely shows skew (dimension keys are uniformly distributed). But OLTP workloads joining on user_id or account_id frequently exhibit power-law distributions. Production systems often allow hints or automatic mode selection based on observed statistics.
Partition handling transforms hash join from a memory-bound algorithm into one that scales to arbitrary data sizes. The cost is additional I/O, but the ability to handle joins of any scale makes this trade-off essential.
What's Next:
We've established the fundamentals of partition handling. The next page explores Grace Hash Join—a specific algorithm that systematically applies partitioning with optimal I/O behavior, and has influenced virtually all modern hash join implementations.
You now understand partition handling in hash joins: why partitioning is necessary, how it preserves correctness, partition count determination, I/O cost analysis, overflow handling, memory allocation strategies, and skew mitigation. This knowledge is essential for understanding how databases scale join operations to any data size.