Grace Hash Join is one of the most elegant algorithms in database systems—a clean, recursive approach that guarantees hash join completion for data of any size, with provably optimal I/O behavior. Developed in the 1980s at the University of Tokyo as part of the Grace database machine project, this algorithm fundamentally shaped how modern databases handle large-scale joins.
The genius of Grace hash join lies in its simplicity: partition both relations completely to disk, then process partition pairs one by one. By fully materializing partitions before joining, the algorithm achieves predictable memory usage and optimal disk I/O patterns, making it the gold standard for out-of-core hash join operations.
By the end of this page, you will understand the Grace hash join algorithm in depth: its phases, recursive partitioning mechanism, I/O optimality properties, memory requirements, and comparison with alternatives. This knowledge provides the theoretical foundation for understanding modern hash join implementations.
Grace hash join operates in two distinct phases with a clear separation: complete partitioning followed by partition-pair processing. Unlike hybrid approaches that try to keep some data in memory during partitioning, Grace fully commits to disk-based processing.
Phase 1: Complete Partitioning — scan each relation once, hash every tuple's join key, and append the tuple to one of n partition files on disk. Because both relations use the same partitioning hash, matching tuples always land in files with the same partition index.
Phase 2: Partition-Pair Joining — for each index i, load the build partition Rᵢ into an in-memory hash table, then stream the probe partition Sᵢ past it, emitting matches.
```cpp
// Grace Hash Join implementation
class GraceHashJoin {
    size_t available_memory;
    int num_partitions;
    HashFunction partition_hash;
    HashFunction join_hash;

public:
    void execute(Relation* R, Relation* S, ResultSet* output) {
        // Determine number of partitions based on build relation size estimate
        num_partitions = compute_partition_count(R->estimated_size());

        // Phase 1: Partition both relations to disk
        vector<DiskPartition> R_partitions = partition_to_disk(R);
        vector<DiskPartition> S_partitions = partition_to_disk(S);

        // Phase 2: Join corresponding partition pairs
        for (int i = 0; i < num_partitions; i++) {
            if (R_partitions[i].is_empty()) {
                // No build tuples in this partition - no output possible
                continue;
            }
            join_partition_pair(R_partitions[i], S_partitions[i], output);
        }

        // Cleanup: delete temporary partition files
        for (int i = 0; i < num_partitions; i++) {
            R_partitions[i].delete_file();
            S_partitions[i].delete_file();
        }
    }

private:
    vector<DiskPartition> partition_to_disk(Relation* rel) {
        // Allocate output buffers - one page per partition
        vector<Page*> buffers(num_partitions);
        vector<DiskPartition> partitions(num_partitions);
        for (int i = 0; i < num_partitions; i++) {
            buffers[i] = allocate_page();
            partitions[i] = DiskPartition::create_temp();
        }

        // Scan relation, distribute tuples to partitions
        for (const Tuple& tuple : *rel) {
            int p = partition_hash(tuple.join_key) % num_partitions;
            if (!buffers[p]->can_fit(tuple)) {
                // Buffer full: flush to disk
                partitions[p].write(buffers[p]);
                buffers[p]->clear();
            }
            buffers[p]->add(tuple);
        }

        // Final flush of remaining tuples
        for (int i = 0; i < num_partitions; i++) {
            if (!buffers[i]->is_empty()) {
                partitions[i].write(buffers[i]);
            }
            free_page(buffers[i]);
        }
        return partitions;
    }

    void join_partition_pair(DiskPartition& R_part, DiskPartition& S_part,
                             ResultSet* output) {
        // Check if build partition fits in memory
        if (R_part.size() > available_memory * 0.8) {
            // Need recursive partitioning
            recursive_join(R_part, S_part, output, 1);
            return;
        }

        // Build hash table from R partition
        HashTable* ht = new HashTable();
        for (const Page& page : R_part.pages()) {
            for (const Tuple& tuple : page) {
                uint64_t h = join_hash(tuple.join_key);
                ht->insert(h, tuple);
            }
        }

        // Probe with S partition
        for (const Page& page : S_part.pages()) {
            for (const Tuple& tuple : page) {
                uint64_t h = join_hash(tuple.join_key);
                for (Tuple* match : ht->find_all(h, tuple.join_key)) {
                    output->emit(tuple, *match);
                }
            }
        }
        delete ht;
    }
};
```

Grace hash join's defining characteristic is complete materialization to disk before joining. This might seem wasteful—why write to disk when some data might fit in memory? The answer lies in predictability and worst-case performance. By always partitioning, Grace provides consistent behavior regardless of estimation accuracy, and recursion depth grows only logarithmically with data size.
When a partition exceeds available memory, Grace hash join recursively partitions it using a different hash function. This recursive process continues until each sub-partition fits in memory. The key insight: each recursion level uses a different hash function, ensuring that tuples clumped together at one level get distributed at the next.
Recursion mechanism:
Level 0: Use hash function h₀ to create n₀ partitions
Level 1: For overflowing partitions, use h₁ to split into n₁ sub-partitions
Level 2: For still-overflowing sub-partitions, use h₂ to split into n₂ sub-sub-partitions
...
This continues until all partitions fit in memory or we detect a degenerate case (single key value causing overflow).
```cpp
// Recursive partitioning in Grace Hash Join
class GraceHashJoinRecursive {
    static const int MAX_RECURSION_DEPTH = 10;
    vector<HashFunction> hash_functions;  // Different function per level

    void recursive_join(DiskPartition& R_part, DiskPartition& S_part,
                        ResultSet* output, int depth) {
        // Safety check: prevent infinite recursion
        if (depth > MAX_RECURSION_DEPTH) {
            handle_degenerate_case(R_part, S_part, output);
            return;
        }

        // Determine sub-partition count for this level
        size_t partition_size = R_part.size();
        int sub_partitions = compute_sub_partition_count(partition_size, depth);

        // Get hash function for this recursion level
        HashFunction h = hash_functions[depth];

        // Re-partition both relations
        vector<DiskPartition> R_subparts = repartition(R_part, h, sub_partitions);
        vector<DiskPartition> S_subparts = repartition(S_part, h, sub_partitions);

        // Process each sub-partition pair
        for (int i = 0; i < sub_partitions; i++) {
            if (R_subparts[i].is_empty()) continue;

            if (R_subparts[i].size() <= available_memory * 0.8) {
                // Sub-partition fits: do simple hash join
                simple_hash_join(R_subparts[i], S_subparts[i], output);
            } else {
                // Still doesn't fit: recurse deeper
                recursive_join(R_subparts[i], S_subparts[i], output, depth + 1);
            }

            // Clean up temporary files as we go
            R_subparts[i].delete_file();
            S_subparts[i].delete_file();
        }
    }

    void handle_degenerate_case(DiskPartition& R_part, DiskPartition& S_part,
                                ResultSet* output) {
        // If we hit max recursion, likely many tuples share the same key.
        // Options:
        //   1. Fall back to nested loop join
        //   2. Use block nested loop with available memory
        //   3. Sort-merge join the partition pair
        log_warning("Grace: degenerate case at partition level, "
                    "falling back to nested loop");
        nested_loop_join_partitions(R_part, S_part, output);
    }

    // Generate different hash functions per level
    static HashFunction get_level_hash(int level) {
        // Different seeds produce different hash distributions
        // Common approach: use level number as seed component
        return [level](const JoinKey& key) {
            uint64_t h = key.as_uint64();
            h ^= (level * 0x9e3779b97f4a7c15ULL);  // Level-specific mixing
            h *= 0xbf58476d1ce4e5b9ULL;
            h ^= h >> 27;
            h *= 0x94d049bb133111ebULL;
            h ^= h >> 31;
            return h;
        };
    }
};
```

Why different hash functions at each level?
If we reused the same hash function h, every tuple in partition j already satisfies h(key) mod n = j. Splitting that partition with h again—using the same or a related modulus—sends all of its tuples to one sub-partition, making no progress.
Using different hash functions ensures that tuples grouped together at level k get distributed across partitions at level k+1, progressively reducing partition sizes until they fit in memory.
If millions of tuples share the same join key value, no amount of rehashing splits them—hash(k) always equals hash(k). This degenerate case requires fallback to a non-hash-based algorithm. The nested loop fallback is guaranteed to work but may be slow for very large same-key groups.
Grace hash join achieves optimal I/O complexity for disk-based joins under the comparison model. Understanding this optimality helps appreciate why Grace-style approaches are universally adopted.
I/O complexity analysis:
For relations R and S with sizes |R| and |S| pages, memory M pages:
Case 1: |R| ≤ M (build fits in memory) — no partitioning pass is needed; I/O = |R| + |S|
Case 2: M < |R| ≤ M² (single-level partitioning suffices) — each relation is read, written to partitions, and read back; I/O = 3(|R| + |S|)
Case 3: M² < |R| ≤ M³ (two-level partitioning) — a second partitioning pass adds another write and read of both relations; I/O = 5(|R| + |S|)
General formula:
I/O = (2k + 1) × (|R| + |S|)
where k = ⌈log_M(|R|/M)⌉ = number of partitioning passes
Assuming M = 1,000 pages of memory:

| Build Size (pages) | Partitioning Levels | I/O Formula | I/O Ratio to Simple |
|---|---|---|---|
| ≤ 1,000 | 0 (fits in memory) | |R| + |S| | 1× (optimal) |
| 1,000 - 1,000,000 | 1 | 3(|R| + |S|) | 3× |
| 1M - 1B | 2 | 5(|R| + |S|) | 5× |
| 1B - 1T | 3 | 7(|R| + |S|) | 7× |
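The general formula can be evaluated directly. This sketch (with illustrative helper names) reproduces the table rows above:

```cpp
#include <cmath>

// k = ceil(log_M(|R| / M)) partitioning passes, per the formula above;
// 0 when the build side already fits in memory. The 1e-9 guard absorbs
// floating-point noise at exact power-of-M boundaries.
int partition_passes(double R_pages, double M_pages) {
    if (R_pages <= M_pages) return 0;
    return (int)std::ceil(std::log(R_pages / M_pages) / std::log(M_pages) - 1e-9);
}

// Total pages transferred: (2k + 1) * (|R| + |S|).
double grace_io_pages(double R_pages, double S_pages, double M_pages) {
    return (2.0 * partition_passes(R_pages, M_pages) + 1.0) * (R_pages + S_pages);
}
```

With M = 1,000: a 500-page build needs 0 passes, a 1,000,000-page build needs 1 pass (3× I/O), and a 1,000,000,000-page build needs 2 passes (5× I/O), matching the table.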
Optimality justification:
Grace hash join achieves the lower bound for comparison-based join algorithms when data must be read from disk. The key insight: each tuple must be read at least once for joining, and if memory is insufficient, tuples must be written to disk (partitioned) and read back. The 3× cost for single-level partitioning is optimal for this scenario—no algorithm can do better in the general case.
While Grace uses 3× the page I/Os of in-memory hash join, these I/Os are predominantly sequential. Partitions are written sequentially during creation and read sequentially during joining. On HDDs, sequential I/O can be 100× faster than random access. On SSDs, the ratio is 3-10×. This makes Grace's 3× page count translate to much less than 3× wall-clock time.
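A back-of-envelope model makes the claim concrete. The device rates below are assumptions chosen for illustration, not measurements:

```cpp
// Illustrative HDD model: ~150 random page reads/sec vs ~100 MB/s
// sequential throughput, with 8 KB pages.
double random_io_secs(double pages, double iops = 150.0) {
    return pages / iops;
}

double sequential_io_secs(double pages, double page_kb = 8.0,
                          double mb_per_sec = 100.0) {
    return pages * page_kb / 1024.0 / mb_per_sec;
}
```

Under these assumed rates, touching 3,000,000 pages sequentially (Grace's 3× cost on a 1M-page join) takes about 234 seconds, while reading just 1,000,000 pages with random access would take over 6,000 seconds—the sequential pattern more than pays for the extra passes.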
Grace hash join has minimal memory requirements during partitioning but requires sufficient memory to hold one partition during the join phase.
Partitioning phase memory:
During partitioning, we need:

- 1 input buffer page for scanning the current relation
- n output buffer pages, one per partition
- a total of n + 1 pages minimum; any memory beyond that can enlarge the output buffers
With larger buffers (multiple pages per partition), we reduce flush frequency and improve I/O efficiency.
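Under a simple model (an assumption for illustration, not from the text), the buffer-size trade-off can be quantified:

```cpp
// M pages of memory give each of the n partitions b = (M - 1) / n buffer
// pages (1 page reserved for input); a scan of R pages then issues about
// R / b flushes, each a sequential write of b pages.
struct BufferPlan {
    long pages_per_buffer;
    long total_flushes;
};

BufferPlan plan_buffers(long M_pages, long n_partitions, long R_pages) {
    long b = (M_pages - 1) / n_partitions;
    return { b, (R_pages + b - 1) / b };
}
```

For example, 1,001 pages of memory over 100 partitions yields 10-page buffers and 10,000 flushes for a 100,000-page relation; halving memory to 501 pages shrinks buffers to 5 pages and doubles the flush count.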
Join phase memory:
During joining, we need:

- a hash table holding one complete build partition (roughly 1.5× the raw partition size, accounting for hash table overhead)
- a few pages of input buffer for streaming the probe partition
- an output buffer for join results
This dictates the minimum partition count: n ≥ |R| / (M × efficiency_factor)
```cpp
// Memory requirement calculations for Grace Hash Join
class GraceMemoryPlanner {
    size_t available_memory;
    size_t page_size;
    double hash_table_overhead = 1.5;  // HT needs 1.5× raw tuple data

    // Minimum partitions needed for guaranteed single-level processing
    int min_partitions_needed(size_t build_size_bytes) {
        // Each partition must fit in available memory with HT overhead
        size_t max_partition_size = available_memory / hash_table_overhead;
        return (int)ceil((double)build_size_bytes / max_partition_size);
    }

    // Maximum partitions possible (limited by buffer memory)
    int max_partitions_possible() {
        // Need 1 input buffer + n output buffers = n+1 pages total
        // Reserve some pages for OS and miscellaneous
        size_t buffer_memory = available_memory / 2;  // Half for buffers
        return buffer_memory / page_size - 1;
    }

    // Optimal partition count balancing I/O and memory
    int optimal_partition_count(size_t build_size_bytes) {
        int min_n = min_partitions_needed(build_size_bytes);
        int max_n = max_partitions_possible();

        if (min_n > max_n) {
            // Cannot create enough partitions with current memory
            // Will need recursive partitioning
            return max_n;  // Create maximum partitions, recurse as needed
        }

        // Choose power of 2 >= min_n for efficient modulo
        int n = 1;
        while (n < min_n) n *= 2;

        // But don't exceed max
        return min(n, max_n);
    }

    // Memory layout during join phase
    struct JoinPhaseMemory {
        size_t hash_table;     // For build partition + overhead
        size_t probe_buffer;   // For reading probe partition
        size_t output_buffer;  // For join output
        size_t working;        // Miscellaneous
    };

    JoinPhaseMemory allocate_join_memory(size_t partition_size) {
        JoinPhaseMemory layout;
        layout.hash_table = partition_size * hash_table_overhead;
        layout.probe_buffer = page_size * 4;  // 4 pages for probe reading
        layout.output_buffer = page_size * 4;
        layout.working = available_memory - layout.hash_table
                       - layout.probe_buffer - layout.output_buffer;
        return layout;
    }
};
```

Memory vs. partition count trade-off:
- More memory → fewer partitions needed → less I/O, faster
- Less memory → more partitions needed → more partition files, more seeks
The square-root rule of thumb:

A good heuristic is to choose n ≈ √|R| partitions. When |R| ≈ M², this gives n ≈ M, and each partition holds about |R|/n ≈ √|R| pages. This balances the two phases: the n output buffers fit in memory while partitioning, and a single partition (plus hash-table overhead) fits in memory while joining.
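A quick check of the square-root sizing, taking n ≈ √|R| partitions (hash-table overhead ignored for simplicity; the helper name is illustrative):

```cpp
#include <cmath>

// With n = ceil(sqrt(|R|)) partitions, each partition is about sqrt(|R|)
// pages, so the n+1 partitioning buffers and a single partition's data
// both fit once M is on the order of sqrt(|R|) pages.
bool sqrt_rule_fits(long R_pages, long M_pages) {
    long n = (long)std::ceil(std::sqrt((double)R_pages));
    long partition_pages = (R_pages + n - 1) / n;
    bool buffers_fit    = (n + 1 <= M_pages);           // partitioning phase
    bool partition_fits = (partition_pages <= M_pages); // join phase
    return buffers_fit && partition_fits;
}
```

For a 1,000,000-page build relation, 1,001 pages of memory satisfy both phases (1,000 partitions of 1,000 pages each), while 500 pages do not.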
Production systems monitor memory pressure during Grace execution. If the operating system starts swapping or memory allocation fails, the system can: (1) flush more aggressively during partitioning, (2) process smaller batches during joining, or (3) abort and retry with more partitions. Graceful degradation is essential for production reliability.
Grace hash join competes with several alternatives for large-scale joins. Each has strengths depending on data characteristics and available resources.
| Algorithm | I/O Cost | Memory Required | Best For |
|---|---|---|---|
| Simple Hash Join | |R| + |S| | |R| fits in memory | Small build relations |
| Grace Hash Join | 3(|R| + |S|) | O(√|R|) | Large unsorted data |
| Sort-Merge Join | 3(|R| + |S|) | O(√(|R|+|S|)) | Pre-sorted or sorted output needed |
| Block Nested Loop | |R| + |S|×⌈|R|/M⌉ | M pages | Very small memory, or as fallback |
| Index Nested Loop | |S| + |S|×log|R| | Index fits in memory | Build has a useful index |
When to choose Grace over Sort-Merge:

- Inputs are unsorted and no sorted output is needed downstream
- Memory requirements depend only on the build side (O(√|R|)) rather than on both inputs
- The join is a pure equi-join, which hashing handles naturally

When Sort-Merge wins:

- One or both inputs are already sorted on the join key, or a downstream operator needs sorted output
- The join predicate involves inequalities or ranges, which hash partitioning cannot handle
- Severe key skew would force Grace into its degenerate-case fallback
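The formulas in the comparison table can be dropped into a tiny cost sketch (page counts only; Grace and sort-merge assume a single partitioning/merge pass, and index lookup cost is ignored):

```cpp
#include <cmath>

// I/O formulas from the comparison table, in pages.
double simple_hash_io(double R, double S)  { return R + S; }        // R fits in M
double grace_hash_io(double R, double S)   { return 3.0 * (R + S); }
double sort_merge_io(double R, double S)   { return 3.0 * (R + S); }
double block_nl_io(double R, double S, double M) {
    return R + S * std::ceil(R / M);
}
```

For |R| = 1,000 pages, |S| = 10,000 pages, and M = 100 pages, block nested loop costs 1,000 + 10,000 × 10 = 101,000 page I/Os while Grace costs 33,000—illustrating why partitioning wins once the build side exceeds memory.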
Production Grace hash join implementations include numerous optimizations beyond the basic algorithm:
```cpp
// Optimized Grace Hash Join with double buffering and Bloom filters
class OptimizedGraceHashJoin {
    // Bloom filter per partition for fast emptiness checks
    vector<BloomFilter*> partition_blooms;

    // Async I/O handles for double buffering
    AsyncIOManager io_manager;

    void partition_with_optimizations(Relation* rel,
                                      vector<DiskPartition>& partitions,
                                      bool is_build) {
        // Double buffers for async writes
        vector<Page*> active_bufs(num_partitions);
        vector<Page*> flush_bufs(num_partitions);
        vector<AsyncHandle> pending_writes(num_partitions);

        for (int i = 0; i < num_partitions; i++) {
            active_bufs[i] = allocate_page();
            flush_bufs[i] = allocate_page();
            if (is_build) {
                partition_blooms[i] = new BloomFilter(estimated_per_partition);
            }
        }

        // Compress before writing if enabled
        Compressor* compressor = create_lz4_compressor();

        for (const Tuple& tuple : *rel) {
            int p = partition_hash(tuple.join_key) % num_partitions;

            if (is_build) {
                // Add to Bloom filter during build
                partition_blooms[p]->insert(join_hash(tuple.join_key));
            }

            if (!active_bufs[p]->can_fit(tuple)) {
                // Wait for any pending write to this partition
                if (pending_writes[p].valid()) {
                    io_manager.wait(pending_writes[p]);
                }
                // Swap buffers
                swap(active_bufs[p], flush_bufs[p]);
                // Compress and start async write
                auto compressed = compressor->compress(flush_bufs[p]);
                pending_writes[p] = io_manager.write_async(
                    partitions[p].file(), compressed);
                active_bufs[p]->clear();
            }
            active_bufs[p]->add(tuple);
        }

        // Final flush and cleanup
        for (int i = 0; i < num_partitions; i++) {
            if (pending_writes[i].valid()) {
                io_manager.wait(pending_writes[i]);
            }
            // Flush remaining
            if (!active_bufs[i]->is_empty()) {
                auto compressed = compressor->compress(active_bufs[i]);
                partitions[i].write(compressed);
            }
            free_page(active_bufs[i]);
            free_page(flush_bufs[i]);
        }
    }

    void join_with_bloom_filtering(vector<DiskPartition>& R_parts,
                                   vector<DiskPartition>& S_parts,
                                   ResultSet* output) {
        // Sort partitions by size for better memory management
        vector<int> order(num_partitions);
        iota(order.begin(), order.end(), 0);
        sort(order.begin(), order.end(), [&](int a, int b) {
            return R_parts[a].size() < R_parts[b].size();
        });

        for (int i : order) {
            if (R_parts[i].is_empty()) continue;

            // Quick check: any probe tuples in this partition?
            // Use build Bloom filter against sampled probe keys
            if (!probe_partition_might_have_matches(S_parts[i],
                                                    partition_blooms[i])) {
                // Skip this partition pair entirely
                continue;
            }
            join_single_partition(R_parts[i], S_parts[i], output);
        }
    }
};
```

Partition file compression trades CPU cycles for reduced I/O. LZ4 and Snappy achieve 2-4× compression at GB/s speeds, making them nearly 'free' on I/O-bound systems. For SSD-based systems where I/O is faster, compression overhead may outweigh the benefits. Adaptive systems measure and adjust compression strategy based on observed I/O rates.
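The compression trade-off reduces to a simple break-even test. The rates in the example below are illustrative assumptions, not measurements:

```cpp
// Compression wins when compressing plus writing the smaller output is
// faster than writing raw bytes directly.
bool compression_wins(double mb, double disk_mb_per_s,
                      double compress_mb_per_s, double ratio) {
    double raw_secs  = mb / disk_mb_per_s;
    double comp_secs = mb / compress_mb_per_s + (mb / ratio) / disk_mb_per_s;
    return comp_secs < raw_secs;
}
```

At HDD-like 100 MB/s with an LZ4-like 2 GB/s compressor and 3× ratio, compression wins easily (≈3.8 s vs 10 s per GB); on a 5 GB/s NVMe device with a 500 MB/s compressor it loses, matching the SSD caveat above.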
Grace hash join is naturally parallelizable—partitions are independent, enabling straightforward parallel execution. This property made it the foundation for parallel join implementations in MPP (Massively Parallel Processing) and distributed databases.
Parallelization strategies:
Strategy 1: Partition-level parallelism — partition pairs are independent after phase 1, so each pair can be assigned to a worker thread and joined concurrently.
Strategy 2: Data-parallel partitioning — the input scan itself is split across workers; each worker partitions its chunk into worker-local files, which are merged per partition index before the join phase.
```cpp
// Parallel Grace Hash Join
class ParallelGraceHashJoin {
    ThreadPool& workers;
    int num_workers;

    void execute_parallel(Relation* R, Relation* S, ResultSet* output) {
        // Phase 1: Parallel partitioning
        // Each worker has local partition buffers, merge at end
        vector<future<vector<DiskPartition>>> partition_futures;
        size_t r_chunk_size = R->size() / num_workers;

        for (int w = 0; w < num_workers; w++) {
            auto start = R->begin() + w * r_chunk_size;
            auto end = (w == num_workers - 1) ? R->end() : start + r_chunk_size;
            partition_futures.push_back(
                workers.submit([=]() { return partition_chunk(start, end); }));
        }

        // Merge partition files from all workers
        vector<vector<DiskPartition>> worker_partitions;
        for (auto& f : partition_futures) {
            worker_partitions.push_back(f.get());
        }
        vector<DiskPartition> R_partitions = merge_partitions(worker_partitions);

        // Repeat for S
        vector<DiskPartition> S_partitions = partition_parallel(S);

        // Phase 2: Parallel joining
        // Assign partition pairs to workers
        atomic<int> next_partition(0);
        vector<future<void>> join_futures;

        for (int w = 0; w < num_workers; w++) {
            join_futures.push_back(workers.submit([&, w]() {
                while (true) {
                    int p = next_partition.fetch_add(1);
                    if (p >= num_partitions) break;
                    if (!R_partitions[p].is_empty()) {
                        join_partition_pair(R_partitions[p], S_partitions[p],
                                            output->worker_output(w));
                    }
                }
            }));
        }

        // Wait for all joins to complete
        for (auto& f : join_futures) {
            f.wait();
        }
    }

    // Work stealing for load balancing
    void join_with_work_stealing() {
        // Instead of static assignment, use a work-stealing queue
        WorkStealingQueue<int> partition_queue;
        for (int i = 0; i < num_partitions; i++) {
            partition_queue.push(i);
        }
        // Workers steal work when idle
        // Helps balance when partition sizes vary
    }
};
```

Data skew causes some partitions to be much larger than others. With static partition assignment, workers handling large partitions become bottlenecks while others sit idle. Work-stealing and dynamic assignment mitigate this: workers that finish early claim the next partition from the shared queue. Scheduling the largest partitions first also helps—a big partition started late would otherwise leave one worker running alone at the end.
Grace hash join emerged from the GRACE Database Machine project at the University of Tokyo in the early 1980s. The project aimed to build specialized hardware for relational database processing, and the Grace algorithm was designed to exploit the machine's disk parallelism.
Key historical milestones:
Grace's enduring influence:
Virtually every modern database system's hash join implementation traces lineage to Grace. The core insights—deterministic partitioning, recursive handling of overflow, I/O-optimal behavior—remain foundational. Modern refinements add:

- Bloom filters to skip partition pairs that cannot produce matches
- double buffering and asynchronous I/O to overlap computation with disk writes
- lightweight compression (LZ4, Snappy) of partition files
- parallel partitioning and joining, with work stealing to absorb skew
- hybrid variants that keep some partitions in memory during partitioning
But the algorithmic core—partition, build hash table per partition, probe—remains essentially unchanged from 1984.
Grace was developed for specialized database hardware—a common research direction in the 1980s. While database machines didn't succeed commercially (general-purpose hardware proved more cost-effective), the algorithms developed for them became foundational in software-based database systems. Grace hash join outlived its eponymous machine by decades.
Grace hash join represents a fundamental achievement in database algorithm design—an elegant, provably optimal approach to joining data that exceeds memory capacity. Let's consolidate the key concepts:

- Two strictly separated phases: fully partition both relations to disk, then join corresponding partition pairs one at a time
- Recursive partitioning with a fresh hash function per level shrinks oversized partitions until they fit in memory
- Total I/O is (2k + 1)(|R| + |S|) for k partitioning passes—3(|R| + |S|) in the common single-pass case
- Memory on the order of √|R| pages suffices for single-pass partitioning
- Extreme key skew defeats rehashing and requires a non-hash fallback such as nested loops
What's Next:
Grace hash join always pays the full partitioning cost, even when data could fit in memory. The next page explores Hybrid Hash Join—an optimization that keeps some data in memory during partitioning, combining the best of simple hash join and Grace for optimal performance across the entire spectrum of data sizes.
You now understand Grace Hash Join: its two-phase algorithm, recursive partitioning mechanism, I/O optimality properties, memory requirements, comparison with alternatives, implementation optimizations, and parallel execution strategies. This knowledge provides the theoretical foundation for understanding all modern hash join implementations.