Hybrid Hash Join represents the culmination of hash join algorithm evolution—a sophisticated approach that combines the in-memory efficiency of simple hash join with the scalability of Grace hash join. By retaining one partition in memory during the partitioning phase, hybrid hash join produces output for that partition immediately, without ever writing it to disk or reading it back.
This optimization is not merely incremental. For joins where data is "somewhat larger" than memory—a common real-world scenario—hybrid hash join can reduce I/O by 30-50% compared to pure Grace. It's the algorithm of choice in virtually all modern commercial and open-source database systems, from PostgreSQL to Oracle to SQL Server.
By the end of this page, you will understand the hybrid hash join algorithm in depth: how it keeps partition 0 in memory, when this optimization pays off, memory allocation strategies, cost analysis across different scenarios, and implementation considerations. This knowledge represents the state of the art in hash join algorithms.
Grace hash join's weakness is that it always partitions—even when a significant portion of the data could be processed in memory. Consider a scenario: 100 MB of memory is available, but the build relation is 150 MB.
But notice: 100 MB of the build relation could fit in memory. Why not process that portion immediately, writing only the remaining 50 MB to disk?
The hybrid insight:
During the partitioning phase, allocate memory as follows: a hash table for partition 0, plus one output buffer for each of the remaining disk partitions.
As build tuples arrive: tuples that hash to partition 0 go directly into the in-memory hash table; all others are written to their disk partitions.
During probe phase: probe tuples that hash to partition 0 are matched against the in-memory hash table immediately and results are emitted; all others are written to their disk partitions for later processing.
The result: partition 0 is never written to or read from disk—saving 2× I/O cost for that portion of the data.
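A quick arithmetic check of that saving, using the scenario above (100 MB of the build side kept in memory, 50 MB spilled). This is an illustrative sketch; `grace_io` and `hybrid_io` are hypothetical helpers counting megabytes transferred for one relation:

```cpp
#include <cassert>

// I/O (MB transferred) for one relation under Grace vs. hybrid.
// Grace: initial read + partition write + re-read = 3x the relation size.
// Hybrid: the in-memory fraction is read exactly once; only the spilled
// remainder pays the full 3x.
double grace_io(double relation_mb) {
    return 3.0 * relation_mb;
}

double hybrid_io(double relation_mb, double in_memory_mb) {
    double spilled_mb = relation_mb - in_memory_mb;
    return in_memory_mb + 3.0 * spilled_mb;
}
```

For a 150 MB build side with 100 MB resident, Grace transfers 450 MB while hybrid transfers 250 MB: the 200 MB difference is exactly 2× the in-memory portion, matching the "2× I/O saved for that portion" claim.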
```cpp
// Conceptual Hybrid Hash Join
class HybridHashJoin {
    HashTable* partition0_ht;                // In-memory hash table for partition 0
    vector<DiskPartition> build_partitions;  // Disk partitions 1..n-1
    vector<DiskPartition> probe_partitions;
    size_t memory_for_ht;       // Memory allocated to partition 0
    size_t memory_for_buffers;  // Memory for disk partition buffers

    void execute(Relation* R, Relation* S, ResultSet* output) {
        // Phase 1: Build - partition R, keeping partition 0 in memory
        partition0_ht = new HashTable(estimated_p0_size);

        for (const Tuple& tuple : *R) {
            int p = partition_hash(tuple.join_key) % num_partitions;
            if (p == 0) {
                // Keep in memory
                partition0_ht->insert(join_hash(tuple.join_key), tuple);
            } else {
                // Write to disk partition
                write_to_partition(build_partitions[p], tuple);
            }
        }

        // Phase 2: Probe - partition S, joining partition 0 immediately
        for (const Tuple& tuple : *S) {
            int p = partition_hash(tuple.join_key) % num_partitions;
            if (p == 0) {
                // Probe immediately against in-memory hash table
                for (Tuple* match : partition0_ht->find_all(
                        join_hash(tuple.join_key), tuple.join_key)) {
                    output->emit(tuple, *match);
                }
            } else {
                // Write to disk for later processing
                write_to_partition(probe_partitions[p], tuple);
            }
        }

        // Phase 3: Process remaining disk-based partition pairs (like Grace)
        for (int i = 1; i < num_partitions; i++) {
            if (!build_partitions[i].is_empty()) {
                join_partition_pair(build_partitions[i],
                                    probe_partitions[i], output);
            }
        }

        delete partition0_ht;
    }
};
```

The choice of partition 0 is arbitrary—any partition could be kept in memory. However, keeping the first partition is conventional and simplifies implementation. Some systems adaptively choose which partition to keep based on actual observed sizes during partitioning.
The key challenge in hybrid hash join is dividing memory between the in-memory partition and disk partition buffers. Allocating too much to partition 0 leaves insufficient buffer space; allocating too little wastes the opportunity to process data in-memory.
Memory division decision:
Given M pages of memory, n partitions, and expected build relation size |R|:
Optimal allocation:
Let f = fraction of memory for partition 0's hash table
To maximize benefit: give partition 0 every page not needed elsewhere. Each of the n-1 disk partitions requires at least one output buffer page, so f × M + (n-1) ≤ M.
Solving: f ≤ 1 - (n-1)/M
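A concrete instance of this constraint, with hypothetical numbers (M and n are not fixed by the text):

```cpp
#include <cassert>
#include <cmath>

// Maximum fraction of memory usable for partition 0's hash table:
// f*M pages for the hash table plus one output-buffer page for each of
// the n-1 disk partitions must fit within the M available pages.
double max_p0_fraction(int M, int n) {
    return 1.0 - static_cast<double>(n - 1) / M;
}
```

With M = 100 pages and n = 11 partitions, the 10 disk partitions consume 10 buffer pages, leaving up to 90% of memory (f ≤ 0.9) for partition 0's hash table.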
```cpp
// Memory allocation for Hybrid Hash Join
class HybridMemoryAllocator {
    size_t total_memory;
    size_t page_size;
    double hash_table_overhead = 1.5;  // Hash table uses 1.5x raw data

    struct AllocationPlan {
        size_t ht_memory;        // For partition 0 hash table
        size_t buffer_memory;    // For disk partition buffers
        int num_partitions;
        size_t per_buffer_size;  // Size of each disk buffer
    };

    AllocationPlan compute_allocation(size_t estimated_build_size) {
        AllocationPlan plan;

        // First, determine number of partitions needed.
        // Each partition (except 0) must fit in remaining memory later.
        // Available for later = M - any lingering overhead
        size_t join_phase_memory = total_memory * 0.9;  // 90% usable

        // Expected size per partition after partitioning
        plan.num_partitions = 1;  // Start with minimum
        while (true) {
            size_t partition_size = estimated_build_size / plan.num_partitions;
            size_t ht_size = partition_size * hash_table_overhead;
            if (ht_size <= join_phase_memory) {
                break;  // Partitions will fit
            }
            plan.num_partitions *= 2;  // Double partitions
        }

        // Now allocate memory between partition 0 and buffers.

        // Partition 0 expected size
        size_t expected_p0_size = estimated_build_size / plan.num_partitions;
        size_t p0_ht_size = expected_p0_size * hash_table_overhead;

        // Memory needed for disk buffers (1 page minimum per partition)
        size_t min_buffer_memory = (plan.num_partitions - 1) * page_size;

        // Can we fit partition 0 and minimum buffers?
        if (p0_ht_size + min_buffer_memory > total_memory) {
            // Reduce partition 0 allocation
            plan.ht_memory = total_memory - min_buffer_memory;
        } else {
            // Plenty of room: use expected size, give rest to buffers
            plan.ht_memory = p0_ht_size;
        }

        plan.buffer_memory = total_memory - plan.ht_memory;
        plan.per_buffer_size = plan.buffer_memory / (plan.num_partitions - 1);
        return plan;
    }

    // Dynamic adjustment during execution
    void handle_p0_overflow(AllocationPlan& plan, HashTable* ht) {
        // If partition 0 exceeds allocation, need to spill. Options:
        // 1. Convert to full Grace (spill p0 to disk)
        // 2. Steal memory from disk buffers (reduce their size)
        // 3. Spill oldest entries from p0 (complex LRU-style)
        // Option 1 is simplest and most common.
        if (ht->memory_used() > plan.ht_memory * 1.2) {
            // Spill partition 0 to disk, convert to pure Grace
            DiskPartition p0_disk = spill_to_disk(ht);
            // Continue as Grace from here...
        }
    }
};
```

If partition 0 grows larger than expected (due to estimation errors or skew), hybrid hash join can run out of memory. Production systems monitor partition 0's growth and either spill it to disk (converting to Grace) or employ partial spilling strategies. The fallback to Grace ensures correct execution even when estimates are wrong.
Hybrid hash join's I/O cost falls between simple hash join (when everything fits) and Grace hash join (when nothing fits). The savings depend on how much data stays in memory.
Cost formula:
Let p₀ = fraction of data in partition 0 (approximately 1/n for n partitions)
For partition 0 (in-memory): cost = p₀ × (|R| + |S|), since that fraction of each relation is read once and never written or re-read.
For disk partitions (like Grace): cost = 3 × (1-p₀) × (|R| + |S|), covering the initial read, the partition write, and the re-read during the join phase.
Total hybrid cost:
Cost_hybrid = p₀ × (|R| + |S|) + 3 × (1-p₀) × (|R| + |S|)
= (3 - 2p₀) × (|R| + |S|)
Comparison:
| p₀ (in-memory fraction) | Hybrid cost | % of Grace cost 3(|R|+|S|) | I/O savings |
|---|---|---|---|
| 0.0 (pure Grace) | 3.0 × (|R|+|S|) | Same | 0% |
| 0.1 (10% in memory) | 2.8 × (|R|+|S|) | 93% | 7% |
| 0.25 | 2.5 × (|R|+|S|) | 83% | 17% |
| 0.5 | 2.0 × (|R|+|S|) | 67% | 33% |
| 0.75 | 1.5 × (|R|+|S|) | 50% | 50% |
| 1.0 (pure in-memory) | 1.0 × (|R|+|S|) | 33% | 67% |
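The table rows follow directly from the cost formula; a quick check (a sketch, with helper names of my choosing):

```cpp
#include <cassert>
#include <cmath>

// Hybrid cost multiplier per (|R|+|S|) pages: (3 - 2*p0).
double hybrid_multiplier(double p0) {
    return 3.0 - 2.0 * p0;
}

// I/O savings relative to pure Grace: 1 - (3 - 2*p0)/3 = 2*p0/3.
double savings_vs_grace(double p0) {
    return 2.0 * p0 / 3.0;
}
```

At p₀ = 0.5 the multiplier is 2.0 and the savings are 1/3, matching the table's 33% row; at p₀ = 0 (pure Grace) the multiplier is 3.0 and savings are zero.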
When hybrid shines:
Hybrid provides the greatest benefit when a significant fraction of data fits in memory. This is common in real workloads, where the build side is often only somewhat larger than available memory—exactly the regime where the 30-50% I/O reduction cited earlier applies.
When hybrid doesn't help: when data vastly exceeds memory, p₀ approaches zero, the hybrid cost converges to Grace's 3 × (|R|+|S|), and only hybrid's bookkeeping overhead remains.
Some systems start with the assumption that data might fit in memory (simple hash join attempt). Only when memory pressure is detected do they switch to hybrid or Grace. This adaptive approach avoids partitioning overhead for workloads that happen to fit, while gracefully handling overflow.
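The escalation path described above can be sketched as a tiny state machine (all names hypothetical; real systems fold this into their executor logic):

```cpp
#include <cassert>

// Adaptive join-mode escalation: start optimistic, escalate only when
// the build side overflows the memory budget.
enum class JoinMode { SIMPLE_IN_MEMORY, HYBRID, GRACE };

JoinMode escalate(JoinMode current) {
    switch (current) {
        case JoinMode::SIMPLE_IN_MEMORY:
            return JoinMode::HYBRID;  // first overflow: spill some partitions
        case JoinMode::HYBRID:
            return JoinMode::GRACE;   // partition 0 also overflowed
        default:
            return JoinMode::GRACE;   // Grace is the floor; always correct
    }
}
```

The key property is that each escalation preserves work already done: partitions written so far remain valid under the more conservative mode.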
Let's trace through hybrid hash join in complete detail, understanding each decision point and data flow:
```cpp
// Detailed Hybrid Hash Join Implementation
class DetailedHybridHashJoin {
    // Configuration
    int num_partitions;
    size_t total_memory;
    size_t p0_memory_limit;

    // State
    HashTable* p0_hash_table;
    vector<PartitionBuffer> build_buffers;  // For partitions 1..n-1
    vector<PartitionBuffer> probe_buffers;
    vector<DiskPartition> build_partitions;
    vector<DiskPartition> probe_partitions;
    bool p0_spilled = false;  // Track if partition 0 has been spilled

public:
    void execute(Relation* R, Relation* S, ResultSet* output) {
        // ========================================
        // INITIALIZATION
        // ========================================
        initialize_partitions();
        p0_hash_table = new HashTable();

        // ========================================
        // PHASE 1: BUILD - Partition R, keep p0 in memory
        // ========================================
        for (const Tuple& tuple : *R) {
            int p = partition_hash(tuple.join_key) % num_partitions;

            if (p == 0 && !p0_spilled) {
                // Try to insert into in-memory hash table
                if (!try_insert_p0(tuple)) {
                    // Memory limit reached - spill partition 0
                    spill_partition_0();
                    // Now write to disk partition instead
                    write_to_disk_partition(0, tuple);
                } else {
                    p0_hash_table->insert(join_hash(tuple.join_key), tuple);
                }
            } else {
                // Write to disk partition
                write_to_disk_partition(p, tuple);
            }
        }

        // Flush all disk partition buffers
        flush_all_build_buffers();

        // ========================================
        // PHASE 2: PROBE - Partition S, probe p0 immediately
        // ========================================
        for (const Tuple& tuple : *S) {
            int p = partition_hash(tuple.join_key) % num_partitions;

            if (p == 0 && !p0_spilled) {
                // Probe against in-memory hash table immediately
                auto matches = p0_hash_table->find_all(
                    join_hash(tuple.join_key), tuple.join_key);
                for (Tuple* match : matches) {
                    output->emit(tuple, *match);
                }
                // Note: tuple NOT written to disk
            } else {
                // Write to disk partition for later processing
                write_to_probe_partition(p, tuple);
            }
        }

        // Flush all probe partition buffers
        flush_all_probe_buffers();

        // ========================================
        // PHASE 3: JOIN remaining partitions (Grace-style)
        // ========================================

        // Free partition 0's hash table to make room for others
        delete p0_hash_table;
        p0_hash_table = nullptr;

        // Start from partition 0 if it was spilled, else from 1
        int start_partition = p0_spilled ? 0 : 1;

        for (int i = start_partition; i < num_partitions; i++) {
            if (build_partitions[i].is_empty()) {
                continue;  // Skip empty partitions
            }
            join_disk_partition_pair(i, output);
        }
    }

private:
    bool try_insert_p0(const Tuple& tuple) {
        size_t tuple_overhead = sizeof(HashEntry) + tuple.size();
        return (p0_hash_table->memory_used() + tuple_overhead)
               <= p0_memory_limit;
    }

    void spill_partition_0() {
        log_warning("Partition 0 exceeded memory limit, spilling to disk");
        // Write all entries from hash table to disk partition
        for (HashEntry& entry : *p0_hash_table) {
            write_to_disk_partition(0, entry.tuple);
        }
        // Clear hash table but keep structure for potential reuse
        p0_hash_table->clear();
        p0_spilled = true;
    }

    void join_disk_partition_pair(int partition_idx, ResultSet* output) {
        if (build_partitions[partition_idx].size() > (total_memory * 0.8)) {
            // Won't fit - need recursive partitioning
            recursive_partition_and_join(partition_idx, output);
            return;
        }

        // Load build partition into memory
        HashTable* ht = new HashTable();
        for (const Page& page : build_partitions[partition_idx].pages()) {
            for (const Tuple& tuple : page) {
                ht->insert(join_hash(tuple.join_key), tuple);
            }
        }

        // Probe with corresponding probe partition
        for (const Page& page : probe_partitions[partition_idx].pages()) {
            for (const Tuple& tuple : page) {
                auto matches = ht->find_all(
                    join_hash(tuple.join_key), tuple.join_key);
                for (Tuple* match : matches) {
                    output->emit(tuple, *match);
                }
            }
        }

        delete ht;
    }
};
```

Hybrid must complete partitioning of R before starting S because partition 0's hash table serves as both the build destination (phase 1) and the probe target (phase 2).
If we interleaved tuples from both relations, we couldn't distinguish partially-built hash table entries from complete ones.
The basic hybrid algorithm keeps only partition 0 in memory. But if memory permits, why not keep multiple partitions in memory? This extension, sometimes called "multi-partition hybrid" or "extended hybrid hash join," maximizes the in-memory fraction.
The idea: partition exactly as in hybrid, but keep partitions 0 through k-1 in memory, where k is the largest count whose hash tables fit in the available memory. Probe tuples for any in-memory partition are joined immediately; only partitions k through n-1 are spilled to disk.
```cpp
// Multi-partition Hybrid Hash Join
class MultiPartitionHybrid {
    int num_partitions;
    int num_in_memory;  // Number of partitions kept in memory
    vector<HashTable*> in_memory_tables;     // Partitions 0..k-1
    vector<DiskPartition> disk_build_parts;  // Partitions k..n-1
    vector<DiskPartition> disk_probe_parts;

    void initialize(size_t build_size_estimate, size_t memory) {
        // Determine total partitions needed
        num_partitions = compute_partition_count(build_size_estimate, memory);

        // How many can we fit in memory?
        // Each partition is approximately build_size / num_partitions
        size_t per_partition = build_size_estimate / num_partitions;
        size_t per_partition_ht = per_partition * 1.5;  // 1.5x overhead

        // Leave some memory for disk buffers
        size_t memory_for_hts = memory * 0.8;
        num_in_memory = min(
            (int)(memory_for_hts / per_partition_ht),
            num_partitions
        );

        // Allocate hash tables for in-memory partitions
        in_memory_tables.resize(num_in_memory);
        for (int i = 0; i < num_in_memory; i++) {
            in_memory_tables[i] = new HashTable();
        }

        // Allocate disk partitions for the rest
        disk_build_parts.resize(num_partitions - num_in_memory);
        disk_probe_parts.resize(num_partitions - num_in_memory);
    }

    void process_build_tuple(const Tuple& tuple) {
        int p = partition_hash(tuple.join_key) % num_partitions;
        if (p < num_in_memory) {
            // In-memory partition
            in_memory_tables[p]->insert(join_hash(tuple.join_key), tuple);
        } else {
            // Disk partition
            disk_build_parts[p - num_in_memory].append(tuple);
        }
    }

    void process_probe_tuple(const Tuple& tuple, ResultSet* output) {
        int p = partition_hash(tuple.join_key) % num_partitions;
        if (p < num_in_memory) {
            // Probe against in-memory hash table immediately
            auto matches = in_memory_tables[p]->find_all(
                join_hash(tuple.join_key), tuple.join_key);
            for (Tuple* match : matches) {
                output->emit(tuple, *match);
            }
        } else {
            // Write to disk for later processing
            disk_probe_parts[p - num_in_memory].append(tuple);
        }
    }

    void finish(ResultSet* output) {
        // Free in-memory hash tables
        for (auto* ht : in_memory_tables) {
            delete ht;
        }
        in_memory_tables.clear();

        // Process disk partitions (Grace-style)
        for (int i = 0; i < disk_build_parts.size(); i++) {
            if (!disk_build_parts[i].is_empty()) {
                join_partition_pair(disk_build_parts[i],
                                    disk_probe_parts[i], output);
            }
        }
    }
};
```

With multiple in-memory hash tables, memory becomes fragmented. If partition 3's hash table needs to grow, it may be blocked by partition 2's adjacent memory. Some implementations use a single contiguous hash table with partition tags, avoiding fragmentation at the cost of slightly more complex lookup logic.
A critical challenge in hybrid hash join is handling cases where partition 0 (or multiple in-memory partitions) grows beyond the allocated memory. This can happen due to errors in the optimizer's size estimates or skew in the join-key distribution.
Systems employ several strategies to handle this gracefully:
```cpp
// Adaptive overflow handling in Hybrid Hash Join
class AdaptiveHybridHashJoin {
    enum class OverflowStrategy {
        FULL_SPILL,      // Convert to Grace
        PARTIAL_SPILL,   // Spill portion of p0
        STEAL_BUFFERS,   // Reduce disk buffers
        ADAPTIVE_SELECT  // Switch which partition is in-memory
    };

    OverflowStrategy strategy = OverflowStrategy::ADAPTIVE_SELECT;
    vector<size_t> partition_sizes;  // Track sizes during build
    int in_memory_partition = 0;     // Which partition is in memory

    void handle_memory_overflow() {
        switch (strategy) {
            case OverflowStrategy::FULL_SPILL:
                // Dump partition 0 to disk, continue as Grace
                spill_in_memory_partition();
                break;

            case OverflowStrategy::PARTIAL_SPILL:
                // Spill half of entries, keep half
                partial_spill_oldest_entries(0.5);
                break;

            case OverflowStrategy::STEAL_BUFFERS:
                // Reduce disk buffer sizes
                if (can_reduce_buffers()) {
                    reallocate_buffer_memory();
                } else {
                    spill_in_memory_partition();
                }
                break;

            case OverflowStrategy::ADAPTIVE_SELECT:
                // Find smallest disk partition, swap with in-memory
                adaptive_partition_swap();
                break;
        }
    }

    void adaptive_partition_swap() {
        // During build, track sizes of disk partitions
        size_t in_memory_size = p0_hash_table->memory_used();

        // Find smallest disk partition
        int smallest_disk = -1;
        size_t smallest_size = SIZE_MAX;
        for (int i = 1; i < num_partitions; i++) {
            if (partition_sizes[i] < smallest_size) {
                smallest_size = partition_sizes[i];
                smallest_disk = i;
            }
        }

        // If a disk partition is smaller than current in-memory, swap
        if (smallest_size < in_memory_size * 0.5) {
            // Spill current in-memory partition
            spill_partition(in_memory_partition);
            // Load smallest disk partition into memory
            // (will be done at end of build phase)
            in_memory_partition = smallest_disk;
            log_info("Swapped in-memory partition {} -> {}", 0, smallest_disk);
        } else {
            // No beneficial swap possible, use partial spill
            partial_spill_oldest_entries(0.5);
        }
    }

    void partial_spill_oldest_entries(double fraction) {
        // Use LRU or FIFO to select entries to spill
        size_t to_spill = p0_hash_table->num_entries() * fraction;
        auto entries_to_spill = p0_hash_table->get_oldest_entries(to_spill);

        for (auto& entry : entries_to_spill) {
            // Write to disk partition 0
            disk_partition_0.append(entry.tuple);
            p0_hash_table->remove(entry);
        }

        // Mark that partition 0 is split between memory and disk
        p0_is_split = true;
    }
};
```

When partition 0 is partially spilled, the probe phase becomes more complex. Each probe tuple must check both the in-memory hash table AND the spilled portion on disk. This typically requires reading the spilled portion back, building a second hash table, and probing both. Some systems mark spilled tuples with a flag and re-probe them in a second pass.
While hybrid hash join is generally preferred, there are scenarios where pure Grace may be more appropriate:
Break-even analysis:
Hybrid's overhead includes: the memory-management machinery for the in-memory partition, continuous overflow monitoring during the build, and the cost of spilling partition 0 mid-build when estimates turn out to be wrong.
Hybrid's benefit is I/O reduction: 2p₀ × (|R| + |S|) fewer I/Os.
When p₀ is very small (e.g., < 5%), the I/O savings are minimal but overhead remains. In these cases, pure Grace's simplicity may be preferable.
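This break-even reasoning can be expressed as a simple predicate (a sketch; `overhead_fraction` is a hypothetical tuning knob expressing hybrid's extra bookkeeping cost as a fraction of (|R|+|S|) I/O):

```cpp
#include <cassert>

// Prefer hybrid only if its I/O saving, 2*p0 per unit of (|R|+|S|),
// exceeds its fixed overhead expressed in the same units.
bool prefer_hybrid(double p0, double overhead_fraction) {
    return 2.0 * p0 > overhead_fraction;
}
```

With an assumed overhead of 0.1, a join keeping only 2% of data in memory (p₀ = 0.02, saving 0.04) would favor pure Grace, while one keeping 20% in memory (saving 0.4) clearly favors hybrid.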
Modern consensus:
Most production systems default to hybrid because: it strictly dominates Grace on I/O whenever any fraction of the build side fits in memory, and it degrades gracefully to Grace when nothing does.
Grace is sometimes used explicitly when operators know the join will heavily spill, avoiding hybrid's memory management overhead.
PostgreSQL implements hybrid hash join with adaptive batch (partition) increase. It starts with a target number of batches based on work_mem and estimated build size. If partition 0 exceeds memory during build, it increases the batch count and re-partitions in-memory data. This adaptive approach handles estimation errors gracefully.
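The batch-doubling idea can be sketched as follows. This is an illustration of the technique, not PostgreSQL's actual code: with hash-modulo batch assignment, doubling the batch count means every tuple currently in batch 0 either stays in batch 0 or moves to the batch numbered `old_nbatch` (decided by the next bit of its hash), so only the in-memory data needs re-examining.

```cpp
#include <cassert>

// Batch a tuple lands in after the batch count is doubled.
int new_batch_after_doubling(unsigned hash, int old_nbatch) {
    return hash % (2 * old_nbatch);
}
```

For example, with 4 batches a tuple in batch 0 (hash % 4 == 0) moves to batch 0 or batch 4 under 8 batches; tuples already spilled to other batches keep the same low-order bits and need no immediate movement.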
Implementing hybrid hash join in a production database system requires attention to numerous practical details:
```cpp
// Production-ready Hybrid Hash Join skeleton
class ProductionHybridHashJoin {
    // Resource management
    MemoryContext* memory_context;
    TempFileManager* temp_files;
    ExecutionStats* stats;

    // Cancellation support
    atomic<bool>* cancel_flag;

    // Progress tracking
    atomic<size_t> tuples_processed;
    atomic<int> partitions_completed;

public:
    void execute_with_monitoring() {
        try {
            // Phase 1: Build with progress tracking
            for (const Tuple& t : *build_relation) {
                if (cancel_flag->load()) {
                    throw QueryCancelledException();
                }
                process_build_tuple(t);
                tuples_processed++;

                // Periodic memory check
                if (tuples_processed % 10000 == 0) {
                    check_memory_status();
                }
            }

            // Record build phase stats
            stats->record("build_tuples", tuples_processed.load());
            stats->record("partition_0_size", p0_hash_table->size());
            stats->record("disk_partition_sizes", get_disk_sizes());

            // Phase 2: Probe
            tuples_processed = 0;
            for (const Tuple& t : *probe_relation) {
                if (cancel_flag->load()) {
                    throw QueryCancelledException();
                }
                process_probe_tuple(t, output);
                tuples_processed++;
            }

            // Phase 3: Disk partitions
            for (int i = 1; i < num_partitions; i++) {
                if (cancel_flag->load()) {
                    throw QueryCancelledException();
                }
                join_partition(i, output);
                partitions_completed++;

                // Immediate cleanup of processed partition files
                temp_files->delete_partition(i);
            }

            // Final stats
            stats->record("output_tuples", output->count());
            stats->record("disk_ios", temp_files->total_ios());

        } catch (...) {
            // Cleanup on any error
            cleanup_resources();
            throw;
        }
    }

private:
    void cleanup_resources() {
        // Free memory
        if (p0_hash_table) {
            memory_context->free(p0_hash_table);
        }
        // Delete all temp files
        temp_files->cleanup_all();
        // Report partial stats for debugging
        stats->record("cleanup_reason", "error_or_cancel");
        stats->record("tuples_at_cleanup", tuples_processed.load());
    }

    void check_memory_status() {
        size_t used = memory_context->bytes_used();
        size_t limit = memory_context->limit();
        if (used > limit * 0.95) {
            // Approaching limit - take action
            handle_memory_pressure();
        }
    }
};
```

Production hybrid implementations require extensive testing of edge cases: empty relations, single-tuple relations, all NULL keys, maximum skew (all tuples same key), memory exactly at limit, disk full conditions, concurrent queries competing for memory. Each edge case has historically caused production bugs in major database systems.
Hybrid hash join represents the practical optimum for hash join algorithms—combining in-memory efficiency with disk-based scalability. Let's consolidate the essential concepts: keep partition 0 (or several partitions) in memory during partitioning, producing output immediately with no disk I/O for that fraction; total cost is (3 - 2p₀) × (|R| + |S|), interpolating between pure in-memory (1×) and pure Grace (3×); overflow of the in-memory partition is handled by spilling—full spill to Grace, partial spill, buffer stealing, or adaptive partition swapping; and the fallback to Grace guarantees correct execution regardless of estimation errors.
Module Complete: Hash Join Mastery
You have now completed the comprehensive study of hash join algorithms. From the fundamental build and probe phases, through partition handling for large data, to the elegant Grace algorithm and its hybrid refinement—you understand the complete evolution of hash join technology.
This knowledge enables you to: recognize which hash join variant a system is using and why, estimate join I/O costs from memory and relation sizes, and diagnose spilling behavior when tuning memory settings such as work_mem.
Hash join is the workhorse of relational databases. Whether you're optimizing a slow query, designing a data warehouse, or building a new database engine, these concepts are foundational.
Congratulations! You have mastered hash join algorithms: build phase construction, probe phase lookups, partition handling for oversized data, Grace hash join for optimal disk-based processing, and Hybrid hash join for the best of both worlds. This knowledge represents the gold standard of join algorithm understanding.