Once both input relations are sorted on the join attribute, the merge phase begins. This phase is where the power of sort-merge join truly manifests—two potentially massive datasets are combined with elegant simplicity, using techniques borrowed from the classic merge operation of merge sort.
The merge phase operates with a remarkably clean invariant: if both inputs are sorted on the join key, we can find all matching pairs with a single linear scan through each relation. No random access. No hash table lookups. Just two coordinated pointers advancing through sorted data.
This simplicity belies the sophistication required to handle real-world complications: duplicate join keys, null values, outer joins, and memory constraints. This page explores the merge algorithm in complete detail, building from the basic case to production-ready implementations.
By the end of this page, you will understand the complete mechanics of the merge phase: the basic two-pointer algorithm, handling of duplicate keys (the partition challenge), memory management for large duplicate groups, null handling semantics, outer join variants, and the subtle invariants that guarantee correctness.
The merge phase assumes both input relations R and S are sorted on the join attribute(s). The algorithm uses two cursors—one for each relation—that advance in coordination based on key comparisons.
Core Algorithm (Unique Keys):
When join keys are unique in both relations, the algorithm is beautifully simple:
```python
def merge_join_unique_keys(R: SortedRelation, S: SortedRelation,
                           join_key: str) -> Iterator[Tuple]:
    """
    Sort-merge join for unique join keys.

    Preconditions:
    - R is sorted on join_key
    - S is sorted on join_key
    - join_key values are unique in both relations

    Time Complexity: O(|R| + |S|)
    Space Complexity: O(1) beyond input/output buffers
    """
    r = R.get_first()  # Current tuple from R
    s = S.get_first()  # Current tuple from S

    while r is not None and s is not None:
        r_key = r[join_key]
        s_key = s[join_key]

        if r_key < s_key:
            # R's tuple has no match in S - advance R
            r = R.get_next()
        elif r_key > s_key:
            # S's tuple has no match in R - advance S
            s = S.get_next()
        else:  # r_key == s_key
            # Match found! Produce joined tuple
            yield concatenate(r, s)
            # Both advance (unique keys = at most one match each)
            r = R.get_next()
            s = S.get_next()
```

Algorithm Trace:
Let's trace through a concrete example:
R (sorted on id): {(1,'A'), (3,'B'), (5,'C'), (7,'D'), (9,'E')}
S (sorted on id): {(2,'X'), (3,'Y'), (5,'Z'), (8,'W')}
| Step | r (R cursor) | s (S cursor) | Comparison | Action | Output |
|---|---|---|---|---|---|
| 1 | (1,'A') | (2,'X') | 1 < 2 | Advance R | |
| 2 | (3,'B') | (2,'X') | 3 > 2 | Advance S | |
| 3 | (3,'B') | (3,'Y') | 3 = 3 | Match! Advance both | (3,'B','Y') |
| 4 | (5,'C') | (5,'Z') | 5 = 5 | Match! Advance both | (5,'C','Z') |
| 5 | (7,'D') | (8,'W') | 7 < 8 | Advance R | |
| 6 | (9,'E') | (8,'W') | 9 > 8 | Advance S | |
| 7 | (9,'E') | EOF | | Terminate | |
Each tuple from R is examined exactly once. Each tuple from S is examined exactly once. No matter how large the relations, the merge phase makes exactly |R| + |S| tuple accesses. This linear time complexity is what makes sort-merge join competitive with hash join for large datasets.
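The bound is easy to verify with a small self-contained sketch. List-backed cursors stand in for the SortedRelation API here; each loop iteration advances at least one index, so the number of iterations never exceeds |R| + |S|:

```python
def count_merge_iterations(r_keys, s_keys):
    """Run a unique-key merge on sorted key lists; return (matches, iterations).

    Every iteration advances at least one cursor, so iterations are
    bounded by len(r_keys) + len(s_keys).
    """
    i = j = iterations = matches = 0
    while i < len(r_keys) and j < len(s_keys):
        if r_keys[i] < s_keys[j]:
            i += 1
        elif r_keys[i] > s_keys[j]:
            j += 1
        else:
            matches += 1
            i += 1
            j += 1
        iterations += 1  # one comparison step per loop pass
    return matches, iterations

# The trace example from above: keys {1,3,5,7,9} vs {2,3,5,8}
print(count_merge_iterations([1, 3, 5, 7, 9], [2, 3, 5, 8]))  # (2, 6) <= 9
```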
The simple algorithm above breaks when join keys are not unique. Consider:
R: {(1,'A'), (1,'B'), (1,'C')}
S: {(1,'X'), (1,'Y')}
With duplicate key value 1, we need to produce all combinations: (A,X), (A,Y), (B,X), (B,Y), (C,X), (C,Y) — that's 3 × 2 = 6 output tuples.
The basic algorithm's "advance both" step would only produce one match. We need to partition the data into groups of matching keys and perform a cross-product within each partition.
```python
def merge_join_with_duplicates(R: SortedRelation, S: SortedRelation,
                               join_key: str) -> Iterator[Tuple]:
    """
    Sort-merge join handling duplicate join keys.

    For each matching key value, produces the cross-product of all
    R tuples with that key and all S tuples with that key.

    Time: O(|R| + |S| + |output|)
    Space: O(partition_size) - may need to buffer one partition
    """
    r = R.get_first()
    s = S.get_first()

    while r is not None and s is not None:
        r_key = r[join_key]
        s_key = s[join_key]

        if r_key < s_key:
            r = R.get_next()
        elif r_key > s_key:
            s = S.get_next()
        else:
            # Keys match - handle partition
            # Collect all R tuples with this key
            partition_key = r_key
            r_partition = []
            while r is not None and r[join_key] == partition_key:
                r_partition.append(r)
                r = R.get_next()

            # Mark start of S partition for potential backtracking
            s_partition_start = S.get_current_position()

            # For each R tuple in partition, scan all matching S tuples
            for r_tuple in r_partition:
                S.seek_to_position(s_partition_start)
                s = S.get_current()
                while s is not None and s[join_key] == partition_key:
                    yield concatenate(r_tuple, s)
                    s = S.get_next()

            # S cursor now points past partition - continue merge
            s = S.get_current()
```

The Partition Problem:
Notice that we collect R's partition into memory and re-scan S's partition multiple times. This creates two challenges:

- Memory: R's partition must fit in available memory, but nothing bounds how many tuples can share a single key value.
- Rescanning: S's cursor must support repositioning (seek), which rules out purely streaming inputs and can cost repeated I/O when the S partition spans many pages.

Different implementations handle these challenges differently; the version below chooses a strategy based on partition sizes:
```python
def merge_join_optimized(R: SortedRelation, S: SortedRelation,
                         join_key: str, memory_budget: int) -> Iterator[Tuple]:
    """
    Production-grade merge join with partition handling.

    Buffers the smaller partition when possible, spills to disk
    if a partition exceeds the memory budget.
    """
    r = R.get_first()
    s = S.get_first()

    while r is not None and s is not None:
        r_key = r[join_key]
        s_key = s[join_key]

        if r_key < s_key:
            r = R.get_next()
        elif r_key > s_key:
            s = S.get_next()
        else:
            partition_key = r_key

            # Collect partitions, track sizes
            r_partition = collect_partition(R, join_key, partition_key)
            s_partition = collect_partition(S, join_key, partition_key)
            r_size = sum(tuple_size(t) for t in r_partition)
            s_size = sum(tuple_size(t) for t in s_partition)

            # Choose strategy based on partition sizes
            if r_size <= memory_budget and s_size <= memory_budget:
                # Both fit - simple nested loop in memory
                yield from memory_cross_product(r_partition, s_partition)
            elif min(r_size, s_size) <= memory_budget:
                # Buffer smaller, stream larger
                if r_size <= s_size:
                    yield from stream_join(r_partition, s_partition,
                                           buffer='left')
                else:
                    yield from stream_join(r_partition, s_partition,
                                           buffer='right')
            else:
                # Neither fits - spill and use block nested loop
                r_file = spill_to_disk(r_partition)
                s_file = spill_to_disk(s_partition)
                yield from block_nested_loop_join(r_file, s_file,
                                                  memory_budget)
                cleanup([r_file, s_file])

            r = R.get_current()
            s = S.get_current()


def collect_partition(relation: SortedRelation, key_attr: str,
                      key_value: Any) -> List[Tuple]:
    """Collect all tuples with the given key value."""
    partition = []
    current = relation.get_current()
    while current is not None and current[key_attr] == key_value:
        partition.append(current)
        current = relation.get_next()
    return partition
```

The worst case for sort-merge join is highly skewed data where a single key value appears millions of times in both relations. The cross-product of these partitions can be enormous. Query optimizers estimate partition sizes using histograms and may choose hash join instead for skewed joins.
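To make the optimizer's reasoning concrete, here is a small self-contained sketch of equi-join cardinality estimation from per-key counts. Real optimizers work from histograms and distinct-value statistics rather than exact counts; Counter is used here purely to keep the arithmetic visible:

```python
from collections import Counter

def estimate_join_output(r_keys, s_keys):
    """Estimate equi-join output size: sum over keys of count_R(k) * count_S(k).

    A stand-in for histogram-based estimation - exact per-key counts
    replace the bucketed statistics a real optimizer would consult.
    """
    r_counts = Counter(r_keys)
    s_counts = Counter(s_keys)
    return sum(n * s_counts.get(k, 0) for k, n in r_counts.items())

# One key shared 10,000 times on each side dominates everything:
# 10,000 * 10,000 = 100,000,000 output tuples for that key alone.
print(estimate_join_output([1] * 10_000, [1] * 10_000))  # 100000000
```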
The merge algorithm's correctness depends on maintaining several invariants throughout execution. Understanding these invariants helps in debugging, optimization, and implementing variants:
Invariant 1: Sorted Input Guarantee
Both input relations must be sorted on the join attribute(s) in the same order (both ascending or both descending). If this invariant is violated, the algorithm produces incorrect results silently—it won't crash, just miss matches.
```python
def verify_sorted_input(relation: Relation, key_attr: str,
                        ascending: bool = True) -> bool:
    """
    Debug assertion: verify relation is properly sorted.

    In production, this check is performed once during development
    and trusted thereafter for performance.
    """
    prev_key = None
    for row in relation:
        current_key = row[key_attr]
        if prev_key is not None:
            if ascending and current_key < prev_key:
                raise SortViolationError(
                    f"Expected ascending order: {prev_key} -> {current_key}"
                )
            if not ascending and current_key > prev_key:
                raise SortViolationError(
                    f"Expected descending order: {prev_key} -> {current_key}"
                )
        prev_key = current_key
    return True
```

Invariant 2: Progress Guarantee
In every iteration, at least one cursor must advance. This ensures the algorithm terminates and processes O(|R| + |S|) tuples. The only exception is when outputting cross-product within a partition, where we temporarily "pause" one cursor while iterating the partition.
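As a debugging aid, the progress guarantee can be asserted directly in the loop. This is a minimal sketch for the unique-key case, assuming the cursors expose the get_current_position() method used elsewhere on this page:

```python
def merge_with_progress_check(R, S, join_key):
    """Unique-key merge with a fail-fast check of the progress invariant."""
    r, s = R.get_first(), S.get_first()
    while r is not None and s is not None:
        before = (R.get_current_position(), S.get_current_position())
        if r[join_key] < s[join_key]:
            r = R.get_next()
        elif r[join_key] > s[join_key]:
            s = S.get_next()
        else:
            yield concatenate(r, s)
            r, s = R.get_next(), S.get_next()
        # If neither cursor moved, the loop would never terminate:
        # better to fail immediately than to spin forever.
        assert (R.get_current_position(),
                S.get_current_position()) != before, "no cursor advanced"
```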
Invariant 3: No Match Missed
If a tuple from R matches tuple(s) from S, they will be adjacent when the cursors are aligned on the same key value. The sorted order guarantees that all matches are "discoverable" without backtracking (except within partitions).
Invariant 4: Partition Completeness
When processing a partition (duplicate keys), we must process ALL matching combinations before advancing past the partition. An early exit produces incorrect (incomplete) results.
Unlike hash join (which must consume the build relation entirely before producing output), merge join can start producing output immediately after reading the first matching pair. This 'streaming' property is valuable for queries with LIMIT clauses or when results are pipelined to subsequent operators.
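This is easy to see with the generator-based sketches above: a consumer can stop pulling results early, and neither input is scanned to completion. A usage sketch, reusing merge_join_unique_keys from earlier (R, S, and the key name are placeholders):

```python
from itertools import islice

# For a plan fragment like "... LIMIT 10", the consumer pulls only ten
# joined tuples; the generator suspends after each yield, so the merge
# never advances past the tenth match.
first_ten = list(islice(merge_join_unique_keys(R, S, "id"), 10))
```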
Invariant 5: Memory Bound Compliance
The merge phase should use bounded memory regardless of input size. For unique keys, this is trivially satisfied (O(1) working memory). For duplicates, careful partition handling ensures we don't exceed the allocated buffer.
Invariant 6: Deterministic Output Order
The output of merge join is sorted by the join attribute(s). This property can be exploited by downstream operators—sorting for GROUP BY or ORDER BY may be avoided if they share the same key.
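For instance, a GROUP BY on the join key can aggregate in one streaming pass over the join output, with no extra sort. A hedged sketch, assuming the joined tuples remain indexable by the join key as elsewhere on this page:

```python
from itertools import groupby

# itertools.groupby requires sorted input, which is exactly what merge
# join guarantees, so each key appears as one contiguous run.
join_output = merge_join_with_duplicates(R, S, "id")
for key, group in groupby(join_output, key=lambda t: t["id"]):
    print(key, sum(1 for _ in group))  # COUNT(*) per join key, no re-sort
```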
Null values in join attributes require special handling. SQL follows three-valued logic where comparisons involving NULL yield UNKNOWN rather than TRUE or FALSE. For joins, this has specific implications:
Standard Inner Join Semantics:
In a standard inner join, NULL never matches anything—not even another NULL:
- NULL = NULL → UNKNOWN (treated as FALSE for join purposes)
- NULL = 5 → UNKNOWN (treated as FALSE)

Tuples with NULL join keys never appear in inner join output.
```python
def merge_join_with_nulls(R: SortedRelation, S: SortedRelation,
                          join_key: str) -> Iterator[Tuple]:
    """
    Sort-merge join with proper NULL handling.

    NULL values are sorted to one end (typically first or last,
    depending on the database). We skip over NULLs in both relations.
    """
    r = R.get_first()
    s = S.get_first()

    # Skip NULLs in R (assuming NULLs sort first)
    while r is not None and r[join_key] is None:
        r = R.get_next()

    # Skip NULLs in S
    while s is not None and s[join_key] is None:
        s = S.get_next()

    # Standard merge on non-NULL values
    while r is not None and s is not None:
        r_key = r[join_key]
        s_key = s[join_key]

        # At this point, neither key should be NULL
        assert r_key is not None and s_key is not None

        if r_key < s_key:
            r = R.get_next()
        elif r_key > s_key:
            s = S.get_next()
        else:
            yield from process_matching_partition(R, S, join_key, r_key)
            r = R.get_current()
            s = S.get_current()
```

Sort Order of NULLs:
Different database systems sort NULLs differently:
- Some systems treat NULLs as smaller than every non-NULL value (NULLs sort first); others treat them as larger (NULLs sort last).
- Several systems let a query override the default with an explicit NULLS FIRST/LAST clause.

The merge algorithm must be aware of the NULL sort position to skip them efficiently.
| Database | ASC Default | DESC Default | Configurable |
|---|---|---|---|
| PostgreSQL | NULLS LAST | NULLS FIRST | Yes (NULLS FIRST/LAST) |
| Oracle | NULLS LAST | NULLS FIRST | Yes (NULLS FIRST/LAST) |
| MySQL | NULLS FIRST | NULLS LAST | No (workarounds exist) |
| SQL Server | NULLS FIRST | NULLS LAST | No |
| SQLite | NULLS FIRST | NULLS LAST | Yes (NULLS FIRST) |
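When NULLs sort last rather than first, the skip logic inverts: the first NULL key signals that the remainder of that input is all NULLs, so the merge can simply terminate. A minimal sketch of that variant, reusing the cursor API and the process_matching_partition helper assumed above:

```python
def merge_join_nulls_last(R, S, join_key):
    """Inner merge join when NULLs sort last (e.g., PostgreSQL ASC default).

    No upfront skip is needed: once a NULL key appears, the rest of that
    input contains only NULLs, which can never match, so we stop.
    """
    r = R.get_first()
    s = S.get_first()
    while r is not None and s is not None:
        r_key, s_key = r[join_key], s[join_key]
        if r_key is None or s_key is None:
            break  # all remaining keys on that side are NULL: no more matches
        if r_key < s_key:
            r = R.get_next()
        elif r_key > s_key:
            s = S.get_next()
        else:
            yield from process_matching_partition(R, S, join_key, r_key)
            r = R.get_current()
            s = S.get_current()
```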
Some databases support IS DISTINCT FROM which treats NULLs as equal (NULL IS NOT DISTINCT FROM NULL = TRUE). For merge join with this non-standard equality, NULLs form their own partition and match each other.
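A sketch of how that non-standard equality could enter the merge's three-way comparison; this illustrates the semantics rather than any specific engine's implementation, and assumes NULLs sort first so NULL keys form the leading partition on both sides:

```python
def compare_is_not_distinct(r_key, s_key) -> int:
    """Three-way comparison for IS NOT DISTINCT FROM merge joins.

    NULL behaves like an ordinary value that sorts before all non-NULLs
    and equals itself, so NULL keys form a regular partition whose
    members match each other like any duplicate group.
    """
    if r_key is None and s_key is None:
        return 0                              # NULL matches NULL
    if r_key is None:
        return -1                             # NULLs sort first: advance R
    if s_key is None:
        return 1                              # advance S
    return (r_key > s_key) - (r_key < s_key)  # normal key comparison
```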
Sort-merge join naturally extends to outer join variants. The key insight is tracking which tuples have been matched during the merge phase:
Left Outer Join:
Every tuple from R appears in output. Unmatched R tuples are padded with NULLs for S's attributes.
```python
def merge_left_outer_join(R: SortedRelation, S: SortedRelation,
                          join_key: str) -> Iterator[Tuple]:
    """
    Left outer merge join: every R tuple appears in output.
    Unmatched R tuples are paired with NULL values for S columns.
    """
    r = R.get_first()
    s = S.get_first()
    null_s = create_null_tuple(S.schema)

    while r is not None:
        if s is None:
            # S exhausted - output remaining R with NULLs
            yield concatenate(r, null_s)
            r = R.get_next()
            continue

        r_key = r[join_key]
        s_key = s[join_key]

        if r_key < s_key:
            # R tuple has no match - output with NULLs
            yield concatenate(r, null_s)
            r = R.get_next()
        elif r_key > s_key:
            # S tuple not needed yet - advance S
            s = S.get_next()
        else:
            # Match found - process partition (every R tuple here
            # has at least one S match, so no NULL padding is needed)
            partition_key = r_key
            r_partition = collect_partition(R, join_key, partition_key)
            s_partition_start = S.get_position()

            for r_tuple in r_partition:
                S.seek(s_partition_start)
                s = S.get_current()
                while s is not None and s[join_key] == partition_key:
                    yield concatenate(r_tuple, s)
                    s = S.get_next()

            r = R.get_current()
            s = S.get_current()


def merge_full_outer_join(R: SortedRelation, S: SortedRelation,
                          join_key: str) -> Iterator[Tuple]:
    """
    Full outer merge join: every tuple from both relations appears.
    Unmatched tuples from either side are padded with NULLs.
    """
    r = R.get_first()
    s = S.get_first()
    null_r = create_null_tuple(R.schema)
    null_s = create_null_tuple(S.schema)

    while r is not None or s is not None:
        if r is None:
            # R exhausted - output remaining S with NULLs for R
            yield concatenate(null_r, s)
            s = S.get_next()
            continue
        if s is None:
            # S exhausted - output remaining R with NULLs for S
            yield concatenate(r, null_s)
            r = R.get_next()
            continue

        r_key = r[join_key]
        s_key = s[join_key]

        if r_key < s_key:
            # R tuple unmatched
            yield concatenate(r, null_s)
            r = R.get_next()
        elif r_key > s_key:
            # S tuple unmatched
            yield concatenate(null_r, s)
            s = S.get_next()
        else:
            # Match - process partition
            yield from process_matching_partition(R, S, join_key, r_key)
            r = R.get_current()
            s = S.get_current()
```

Anti-Join and Semi-Join:
The merge framework also supports anti-join (tuples that have NO match) and semi-join (tuples that have AT LEAST ONE match, without duplicates); both fit the same two-pointer loop, as the sketch below shows.
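A hedged sketch of both variants under the same cursor API; it follows EXISTS/NOT EXISTS semantics and sets NULL-key subtleties aside:

```python
def merge_left_semi_join(R, S, join_key):
    """Left semi-join: emit each R tuple at most once if any S tuple matches."""
    r, s = R.get_first(), S.get_first()
    while r is not None and s is not None:
        if r[join_key] < s[join_key]:
            r = R.get_next()        # no match possible for this R tuple
        elif r[join_key] > s[join_key]:
            s = S.get_next()
        else:
            yield r                 # one match suffices: no cross-product
            r = R.get_next()        # S cursor stays put, in case the next
                                    # R tuple shares the same key


def merge_left_anti_join(R, S, join_key):
    """Left anti-join: emit R tuples that have NO match in S."""
    r, s = R.get_first(), S.get_first()
    while r is not None:
        if s is None or r[join_key] < s[join_key]:
            yield r                 # proven unmatched: S has moved past it
            r = R.get_next()
        elif r[join_key] > s[join_key]:
            s = S.get_next()
        else:
            r = R.get_next()        # matched: suppress this R tuple
```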
All outer join variants maintain the sorted output property. The output is sorted by join key, with unmatched tuples in their natural sorted position. This is valuable when subsequent operations benefit from sorted input.
While the merge phase is conceptually simple, production implementations must carefully manage memory, especially for large partitions. Here's how database systems handle this:
Buffer Allocation:
The merge phase typically needs:

- at least one input buffer page for R
- at least one input buffer page for S
- at least one output buffer page
Minimal configuration uses 3 pages total (one for each), but more pages improve I/O efficiency through prefetching and batching.
```python
class BufferedMergeJoin:
    """
    Production merge join with explicit buffer management.
    """

    def __init__(self, R_file: str, S_file: str, join_key: str,
                 buffer_pages: int):
        self.join_key = join_key

        # Allocate buffer pages strategically:
        # reserve 2 for output double-buffering,
        # split the remainder between R, S, and the partition buffer
        available = buffer_pages - 2
        if available < 3:
            raise InsufficientMemoryError(
                f"Need at least 5 pages, have {buffer_pages}"
            )

        # Typical allocation: 40% each for R/S input, 20% for partition
        r_pages = int(available * 0.4)
        s_pages = int(available * 0.4)
        partition_pages = available - r_pages - s_pages

        self.r_buffer = BufferedReader(R_file, r_pages * PAGE_SIZE)
        self.s_buffer = BufferedReader(S_file, s_pages * PAGE_SIZE)
        self.output_buffer = DoubleBuffer(2 * PAGE_SIZE)
        self.partition_limit = partition_pages * PAGE_SIZE

    def execute(self) -> Iterator[Tuple]:
        """Execute merge join with buffer management."""
        r = self.r_buffer.read_next()
        s = self.s_buffer.read_next()

        while r is not None and s is not None:
            r_key = r[self.join_key]
            s_key = s[self.join_key]

            if r_key < s_key:
                r = self.r_buffer.read_next()
            elif r_key > s_key:
                s = self.s_buffer.read_next()
            else:
                yield from self._process_partition(r_key)
                r = self.r_buffer.current
                s = self.s_buffer.current

    def _process_partition(self, key_value) -> Iterator[Tuple]:
        """
        Process partition with memory-conscious strategy.
        """
        # Collect R partition (up to memory limit)
        r_partition = []
        r_partition_size = 0
        r = self.r_buffer.current

        while r is not None and r[self.join_key] == key_value:
            if r_partition_size + tuple_size(r) > self.partition_limit:
                # Partition exceeds memory - switch strategy
                yield from self._large_partition_fallback(
                    key_value, r_partition
                )
                return
            r_partition.append(r)
            r_partition_size += tuple_size(r)
            r = self.r_buffer.read_next()

        # R partition fits in memory - scan S partition
        s_partition_start = self.s_buffer.get_position()
        for r_tuple in r_partition:
            self.s_buffer.seek(s_partition_start)
            s = self.s_buffer.current
            while s is not None and s[self.join_key] == key_value:
                result = concatenate(r_tuple, s)
                yield self.output_buffer.write(result)
                s = self.s_buffer.read_next()

    def _large_partition_fallback(self, key_value,
                                  partial_r: List) -> Iterator[Tuple]:
        """
        Handle partitions too large for memory.

        Strategy: use block nested loop within the partition,
        or spill to temporary files.
        """
        # Log warning - this may indicate a data skew issue
        log.warning(f"Large partition for key={key_value}, using fallback")

        # Spill R partition to temp file
        r_temp = TempFile()
        for r_tuple in partial_r:
            r_temp.write(r_tuple)

        # Continue reading R partition
        r = self.r_buffer.current
        while r is not None and r[self.join_key] == key_value:
            r_temp.write(r)
            r = self.r_buffer.read_next()

        # Similarly collect S partition
        s_temp = TempFile()
        s = self.s_buffer.current
        while s is not None and s[self.join_key] == key_value:
            s_temp.write(s)
            s = self.s_buffer.read_next()

        # Use block nested loop on temp files
        yield from block_nested_loop_join(
            r_temp.path, s_temp.path, self.partition_limit
        )

        # Cleanup
        r_temp.delete()
        s_temp.delete()
```

A partition with 10,000 matching R tuples and 10,000 matching S tuples produces 100 million output tuples. This is inherent to the join semantics, not a flaw in the algorithm. Query optimizers estimate output cardinality to avoid launching queries with unexpectedly massive results.
The merge phase is where sort-merge join's elegance manifests. Two coordinated cursors traverse sorted relations, producing matches with minimal overhead. Let's consolidate the key concepts:

- Two-pointer merge: with unique keys, one linear scan of each sorted input finds every match in O(|R| + |S|) time and O(1) working memory.
- Duplicate keys form partitions: all combinations within a matching key group must be produced, which requires buffering one partition or re-scanning the other.
- NULL join keys never match under standard SQL equality; the merge skips them (or stops at them, depending on NULL sort position).
- Outer, semi-, and anti-joins are small variants of the same loop that handle unmatched tuples explicitly.
- Memory stays bounded: oversized partitions spill to disk and fall back to block nested loop join.
- Output is sorted on the join key, a property downstream operators can exploit.
Looking Ahead:
With both the external sorting and merge phase understood, we're ready to analyze the complete cost model for sort-merge join. The next page develops precise I/O and CPU cost formulas, essential for query optimizers to choose the best join algorithm for each query.
You now understand the merge phase of sort-merge join in complete detail—from the elegant basic algorithm to production-grade implementations handling duplicates, nulls, outer joins, and memory constraints. This algorithmic foundation prepares you for cost analysis and optimization.