Imagine you're building a real-time analytics dashboard. Stock prices stream in continuously. User response times arrive with every request. Sensor readings flow from thousands of IoT devices. At any moment, stakeholders want to know: what's the median value?
Computing the median of static data is straightforward—sort and pick the middle. But what happens when data arrives continuously, potentially millions of values per second, and you need the median now, not after processing the entire stream?
The naive approach—maintaining a sorted list and looking up the middle—requires O(n) insertion time, making it impractical for high-throughput streams. This is where the dual-heap technique becomes indispensable: two heaps working in harmony to maintain the median in O(log n) per insertion with O(1) query time.
By the end of this page, you will understand why the median is more challenging than min/max, master the elegant dual-heap solution that partitions data around the median, analyze the invariants that make the technique work, and recognize variations including sliding window median and weighted median.
Before diving into the solution, let's understand why median maintenance is fundamentally more challenging than other running statistics.
| Statistic | Update Time | Query Time | Technique |
|---|---|---|---|
| Minimum | O(1)* | O(1) | Single value tracking (with caveats for deletions) |
| Maximum | O(1)* | O(1) | Single value tracking (with caveats for deletions) |
| Mean | O(1) | O(1) | Maintain sum and count: mean = sum/count |
| Variance | O(1) | O(1) | Welford's online algorithm |
| Median | O(log n) | O(1) | Dual-heap technique (this page) |
| Mode | O(1) | O(n) or O(1) | Frequency map + max tracking |
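To make the table's contrast concrete, here is a minimal sketch of the O(1)-per-update statistics it lists: a running mean and a running variance maintained with Welford's online algorithm. The `RunningStats` class name is just for illustration.

```python
class RunningStats:
    """O(1)-per-update mean and variance (Welford's online algorithm)."""

    def __init__(self):
        self.count = 0
        self.mean = 0.0
        self.m2 = 0.0  # running sum of squared deviations from the mean

    def add(self, x: float) -> None:
        self.count += 1
        delta = x - self.mean
        self.mean += delta / self.count
        self.m2 += delta * (x - self.mean)  # uses the updated mean

    def variance(self) -> float:
        """Population variance of everything seen so far."""
        return self.m2 / self.count if self.count > 0 else 0.0


stats = RunningStats()
for x in [2, 4, 4, 4, 5, 5, 7, 9]:
    stats.add(x)
print(stats.mean, stats.variance())  # 5.0 4.0
```

Notice that only three numbers are stored, no matter how many values arrive; the median offers no such shortcut.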
Why Is the Median Special?
The mean is a decomposable statistic—you can compute the mean of combined datasets from the means of individual datasets. The median is not decomposable. Knowing the median of two halves tells you almost nothing about the median of the combined dataset.
Fundamentally, finding the median requires knowing the rank of elements, and an element's rank depends on every value seen so far.

This is why median maintenance requires storing information about all elements, not just aggregate quantities. The dual-heap technique is clever because it stores all elements while still enabling O(log n) updates.
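A quick numeric illustration of the difference, using Python's statistics module (the values are arbitrary):

```python
import statistics

left, right = [1, 2, 3], [10, 20]

# Mean is decomposable: the combined mean follows from partial sums and counts.
combined_mean = (sum(left) + sum(right)) / (len(left) + len(right))
print(combined_mean, statistics.mean(left + right))      # 7.2 7.2

# Median is not: the medians of the halves (2 and 15) say nothing useful
# about the median of the whole (3); you need the actual elements.
print(statistics.median(left), statistics.median(right))  # 2 15
print(statistics.median(left + right))                    # 3
```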
The median is robust to outliers—a few extreme values don't skew it like they skew the mean. This makes median crucial for: latency monitoring (99th percentile response times), financial analysis (median income vs mean income), sensor data (robust to faulty readings), and any scenario where outliers shouldn't dominate.
The key insight is to partition the data around the median into two halves: a max-heap holding the smaller half of the values, and a min-heap holding the larger half.

Why these specific heap types? Because we need quick access to the largest element of the lower half (the max-heap's root) and the smallest element of the upper half (the min-heap's root).
These two elements are either the median (if odd count) or the two elements whose average is the median (if even count).
Visual Representation:
```text
      Lower Half               Upper Half
      (max-heap)               (min-heap)
  Elements ≤ median        Elements ≥ median

        [5]                      [7]
       /   \                    /   \
     [4]   [3]                [8]   [10]
    /   \
  [1]   [2]

  max-heap root = 5        min-heap root = 7

  If sizes equal:  median = (5 + 7) / 2 = 6
  If lower larger: median = 5
```
The heaps maintain a beautiful invariant: all elements in the max-heap are ≤ all elements in the min-heap. The heap roots are the 'boundary' elements closest to the median.
Think of the two heaps as a partition of the data. The max-heap holds the smaller half, the min-heap holds the larger half. The median is always at the boundary—accessible in O(1) from one or both heap roots.
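One practical wrinkle for the Python code that follows: heapq only provides a min-heap, so the max-heap for the lower half is simulated by negating values on the way in and out. A minimal sketch of that trick:

```python
import heapq

lower = []                    # max-heap simulated with negated values
for x in [5, 1, 4, 2, 3]:
    heapq.heappush(lower, -x)

print(-lower[0])              # peek the maximum: negate the min-heap root -> 5
print(-heapq.heappop(lower))  # popping also yields the maximum -> 5
```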
The algorithm maintains two key invariants:
- Invariant 1 (Order): every element in the max-heap ≤ every element in the min-heap.
- Invariant 2 (Balance): heap sizes differ by at most 1, i.e. |size(max_heap) - size(min_heap)| ≤ 1.
Insert Operation:

1. Route the new element: if the max-heap is empty or the element is ≤ the max-heap root, push it onto the max-heap; otherwise push it onto the min-heap.
2. Rebalance: if the heap sizes now differ by more than allowed, move the root of the oversized heap to the other heap.

Query Operation:

1. If the max-heap is larger, the median is its root.
2. If the two heaps have equal size, the median is the average of the two roots.
```python
import heapq


class MedianFinder:
    """
    LeetCode 295: Find Median from Data Stream

    Maintains running median using two heaps:
    - max_heap: stores smaller half (negated for Python's min-heap)
    - min_heap: stores larger half

    Time: O(log n) per insert, O(1) per query
    Space: O(n) to store all elements
    """

    def __init__(self):
        # max_heap stores negated values (Python only has min-heap)
        self.max_heap = []  # Lower half: elements <= median
        self.min_heap = []  # Upper half: elements >= median

    def addNum(self, num: int) -> None:
        """
        Add a number to the data structure.

        Strategy:
        1. Add to appropriate heap based on comparison with current median
        2. Rebalance if sizes differ by more than 1
        """
        # Step 1: Route to appropriate heap
        if not self.max_heap or num <= -self.max_heap[0]:
            # num belongs in lower half
            heapq.heappush(self.max_heap, -num)
        else:
            # num belongs in upper half
            heapq.heappush(self.min_heap, num)

        # Step 2: Rebalance if needed
        # We maintain: len(max_heap) == len(min_heap) or len(max_heap) == len(min_heap) + 1
        if len(self.max_heap) > len(self.min_heap) + 1:
            # Move max_heap root to min_heap
            val = -heapq.heappop(self.max_heap)
            heapq.heappush(self.min_heap, val)
        elif len(self.min_heap) > len(self.max_heap):
            # Move min_heap root to max_heap
            val = heapq.heappop(self.min_heap)
            heapq.heappush(self.max_heap, -val)

    def findMedian(self) -> float:
        """
        Return the median of all elements added so far.
        O(1) time - just look at heap roots.
        """
        if len(self.max_heap) > len(self.min_heap):
            # Odd count: max_heap has one more element
            return float(-self.max_heap[0])
        else:
            # Even count: average of two roots
            return (-self.max_heap[0] + self.min_heap[0]) / 2.0


# Example usage
mf = MedianFinder()
stream = [2, 3, 4, 1, 5, 6]
for num in stream:
    mf.addNum(num)
    print(f"After adding {num}: median = {mf.findMedian()}")

# Output:
# After adding 2: median = 2.0
# After adding 3: median = 2.5
# After adding 4: median = 3.0
# After adding 1: median = 2.5
# After adding 5: median = 3.0
# After adding 6: median = 3.5
```

Let's trace through a complete example to see how the invariants are maintained.
| Step | Add | max_heap before rebalance | min_heap before rebalance | Rebalance? | After rebalance (max_heap / min_heap) | Median |
|---|---|---|---|---|---|---|
| 1 | 5 | [5] | [] | No | [5] / [] | 5 |
| 2 | 2 | [5, 2] | [] | Yes, move 5 | [2] / [5] | 3.5 |
| 3 | 8 | [2] | [5, 8] | Yes, move 5 | [5, 2] / [8] | 5 |
| 4 | 1 | [5, 2, 1] | [8] | Yes, move 5 | [2, 1] / [5, 8] | 3.5 |
| 5 | 9 | [2, 1] | [5, 8, 9] | Yes, move 5 | [5, 2, 1] / [8, 9] | 5 |
| 6 | 4 | [5, 4, 2, 1] | [8, 9] | Yes, move 5 | [4, 2, 1] / [5, 8, 9] | 4.5 |
Let's trace step 3 in detail:
Initial state after step 2: max_heap holds [2], min_heap holds [5]. The halves are balanced, so the median is (2 + 5) / 2 = 3.5.

Add 8: 8 > 2 (the max_heap root), so 8 is pushed onto the min_heap, giving max_heap = [2], min_heap = [5, 8]. The min_heap is now larger than the max_heap, violating the balance invariant, so its root 5 is moved over: max_heap = [5, 2], min_heap = [8].

Query median: the max_heap has one more element than the min_heap (odd count), so the median is the max_heap root, 5.
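If you want to reproduce this trace yourself, a small driver (assuming the MedianFinder class from the code above) can print the two halves after every insertion. Note that the printout sorts each half for readability; heapq only keeps them in heap order.

```python
mf = MedianFinder()
for num in [5, 2, 8, 1, 9, 4]:
    mf.addNum(num)
    lower = sorted((-x for x in mf.max_heap), reverse=True)  # un-negate lower half
    upper = sorted(mf.min_heap)
    print(f"add {num}: lower={lower} upper={upper} median={mf.findMedian()}")

# Output mirrors the table above:
# add 5: lower=[5] upper=[] median=5.0
# add 2: lower=[2] upper=[5] median=3.5
# add 8: lower=[5, 2] upper=[8] median=5.0
# ...
```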
The rebalancing step ensures that after insertion, max_heap size is either equal to or one greater than min_heap size. We consistently favor the max_heap for the extra element. This convention choice simplifies the findMedian logic.
Time Complexity:
addNum (Insert): O(log n), since each call performs one heap push plus at most one pop/push pair for rebalancing.
findMedian (Query): O(1), since only the heap roots are inspected.
Space Complexity: O(n)
| Approach | Insert Time | Query Time | Space |
|---|---|---|---|
| Sorted Array | O(n) | O(1) | O(n) |
| Unsorted Array | O(1) | O(n) | O(n) |
| Balanced BST | O(log n) | O(log n)* | O(n) |
| Dual Heaps | O(log n) | O(1) | O(n) |
Note: A balanced BST with order statistics can achieve O(log n) query, but dual heaps achieve O(1) query with simpler implementation.
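For comparison, the sorted-array row of the table corresponds to something like the following sketch built on bisect.insort: the median lookup is O(1), but every insertion shifts elements, which is exactly the O(n) cost the dual-heap approach avoids. The class name is illustrative.

```python
import bisect


class SortedListMedian:
    """Naive baseline: keep a sorted list; O(n) insert, O(1) median."""

    def __init__(self):
        self.data = []

    def addNum(self, num: int) -> None:
        bisect.insort(self.data, num)  # O(log n) search + O(n) element shifting

    def findMedian(self) -> float:
        n = len(self.data)
        mid = n // 2
        if n % 2:
            return float(self.data[mid])
        return (self.data[mid - 1] + self.data[mid]) / 2.0
```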
Why Dual Heaps Are Optimal:
For the running median problem with only insertions (no deletions), dual heaps are theoretically near-optimal: in the comparison model, any structure that can report the median after every insertion can be used to recover a fully sorted order (by padding the stream with sentinel values that shift the median one rank at a time), so Ω(log n) amortized time per insertion is unavoidable. The O(n) space is also necessary, because any stored element may still become the median later.
The dual-heap technique is one of the cleanest examples of using the right data structure to achieve optimal complexity.
Let's rigorously prove that the dual-heap approach maintains the correct median.
Invariants (maintained after every insertion):

- Order: every element in the max_heap ≤ every element in the min_heap.
- Balance: size(max_heap) = size(min_heap) or size(max_heap) = size(min_heap) + 1.

Claim: Given these invariants, the median is correctly computed as the max_heap root when the heaps have unequal sizes, and as the average of the two roots when they are equal.
Proof:
Let n = total elements = size(max_heap) + size(min_heap).
Case 1: n is odd, so n = 2k + 1 for some k ≥ 0
By balance invariant: size(max_heap) = k + 1, size(min_heap) = k
The max_heap contains the (k+1) smallest elements (by partition invariant). The median is the (k+1)-th element in sorted order. The max_heap root is the largest among the (k+1) smallest = the (k+1)-th = the median. ✓
Case 2: n is even, so n = 2k for some k > 0
By balance invariant: size(max_heap) = size(min_heap) = k
The max_heap root is the k-th element (largest of smaller half). The min_heap root is the (k+1)-th element (smallest of larger half). Median = average of k-th and (k+1)-th elements = (max_root + min_root) / 2. ✓
This proof illustrates why invariant-based reasoning is so powerful. By proving that two simple invariants are maintained, we automatically guarantee correctness for all operations. This is the essence of DSA design: establish invariants, prove they're maintained, derive correctness.
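The invariants are also easy to check empirically. Here is a small sketch that feeds random values into the MedianFinder defined above, asserts both invariants after every insertion, and cross-checks the result against a brute-force median (the seed and value range are arbitrary):

```python
import random
import statistics

random.seed(0)
mf = MedianFinder()
seen = []
for _ in range(1000):
    x = random.randint(-100, 100)
    mf.addNum(x)
    seen.append(x)

    # Invariant 1 (order): comparing roots suffices, since the max_heap root
    # is the largest of the lower half and the min_heap root the smallest of the upper half.
    if mf.max_heap and mf.min_heap:
        assert -mf.max_heap[0] <= mf.min_heap[0]
    # Invariant 2 (balance): sizes differ by at most one, max_heap never trails.
    assert 0 <= len(mf.max_heap) - len(mf.min_heap) <= 1
    # Correctness: matches the brute-force median.
    assert mf.findMedian() == statistics.median(seen)

print("all invariants and medians verified")
```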
The basic algorithm handles most cases, but production code must consider edge cases.
```python
import heapq
from typing import Optional


class RobustMedianFinder:
    """
    Production-ready median finder with edge case handling.
    """

    def __init__(self):
        self.max_heap = []  # Lower half (negated)
        self.min_heap = []  # Upper half
        self.count = 0

    def addNum(self, num: int) -> None:
        self.count += 1

        # First element always goes to max_heap
        if not self.max_heap:
            heapq.heappush(self.max_heap, -num)
            return

        # Route based on comparison with partition boundary
        if num <= -self.max_heap[0]:
            heapq.heappush(self.max_heap, -num)
        else:
            heapq.heappush(self.min_heap, num)

        # Rebalance
        self._rebalance()

    def _rebalance(self) -> None:
        """Ensure max_heap has same size or one more than min_heap."""
        while len(self.max_heap) > len(self.min_heap) + 1:
            heapq.heappush(self.min_heap, -heapq.heappop(self.max_heap))
        while len(self.min_heap) > len(self.max_heap):
            heapq.heappush(self.max_heap, -heapq.heappop(self.min_heap))

    def findMedian(self) -> Optional[float]:
        """Return median, or None if empty."""
        if self.count == 0:
            return None

        if len(self.max_heap) > len(self.min_heap):
            return float(-self.max_heap[0])

        # Overflow-safe average calculation
        a = -self.max_heap[0]
        b = self.min_heap[0]
        return a + (b - a) / 2.0

    def size(self) -> int:
        """Return number of elements."""
        return self.count

    def isEmpty(self) -> bool:
        """Check if empty."""
        return self.count == 0


# Test edge cases
mf = RobustMedianFinder()
print(f"Empty median: {mf.findMedian()}")  # None

mf.addNum(42)
print(f"Single element: {mf.findMedian()}")  # 42.0

for _ in range(5):
    mf.addNum(7)
print(f"All same (7s): {mf.findMedian()}")  # 7.0

# Test large numbers (near overflow)
import sys

mf2 = RobustMedianFinder()
mf2.addNum(sys.maxsize - 1)
mf2.addNum(sys.maxsize - 2)
print(f"Large nums: {mf2.findMedian()}")  # Works without overflow
```

A harder variation asks for the median of a sliding window—the most recent k elements, not all elements ever seen. This is crucial for time-series analysis where old data becomes irrelevant.
The Challenge:
Our basic dual-heap only supports insertion. Sliding window requires deletion of the element that falls out of the window. Heaps don't support efficient arbitrary deletion!
Solution: Lazy Deletion with Hash Maps
Instead of physically removing elements, we:

- record outgoing elements in a hash map of pending deletions,
- postpone the actual removal until a marked element reaches a heap root (where popping is cheap), and
- track the "effective" size of each heap (actual size minus pending deletions) to drive rebalancing.
```python
import heapq
from collections import defaultdict
from typing import List


def medianSlidingWindow(nums: List[int], k: int) -> List[float]:
    """
    LeetCode 480: Sliding Window Median
    Find median of each sliding window of size k.

    Uses dual heaps with lazy deletion:
    - Track deleted elements in a hash map
    - Clean up deleted elements when they reach heap root
    - Maintain balance based on "effective" (non-deleted) counts

    Time: O(n log k) amortized
    Space: O(k) for heaps + O(k) for deletion map
    """
    result = []

    # max_heap (lower half, negated) and min_heap (upper half)
    max_heap = []  # negated values
    min_heap = []

    # Count of deleted elements (lazy deletion)
    deleted = defaultdict(int)

    # Effective sizes (actual elements minus deleted)
    max_size = 0
    min_size = 0

    def add(num: int) -> None:
        nonlocal max_size, min_size
        if not max_heap or num <= -max_heap[0]:
            heapq.heappush(max_heap, -num)
            max_size += 1
        else:
            heapq.heappush(min_heap, num)
            min_size += 1

    def remove(num: int) -> None:
        nonlocal max_size, min_size
        # Lazy delete: just mark for later removal
        deleted[num] += 1
        # Update effective size based on which heap it's in
        if max_heap and num <= -max_heap[0]:
            max_size -= 1
        else:
            min_size -= 1

    def balance() -> None:
        nonlocal max_size, min_size
        # Rebalance based on effective sizes.
        # Prune stale (lazily deleted) roots before moving, so that
        # only live elements ever cross between heaps.
        while max_size > min_size + 1:
            cleanup_top(max_heap, True)
            val = -heapq.heappop(max_heap)
            max_size -= 1
            heapq.heappush(min_heap, val)
            min_size += 1
        while min_size > max_size:
            cleanup_top(min_heap, False)
            val = heapq.heappop(min_heap)
            min_size -= 1
            heapq.heappush(max_heap, -val)
            max_size += 1

    def cleanup_top(heap: list, is_max: bool) -> None:
        """Remove deleted elements from top of heap."""
        while heap:
            top = -heap[0] if is_max else heap[0]
            if deleted[top] > 0:
                deleted[top] -= 1
                heapq.heappop(heap)
            else:
                break

    def get_median() -> float:
        cleanup_top(max_heap, True)
        cleanup_top(min_heap, False)
        if max_size > min_size:
            return float(-max_heap[0])
        return (-max_heap[0] + min_heap[0]) / 2.0

    # Build initial window
    for i in range(k):
        add(nums[i])
        balance()
    result.append(get_median())

    # Slide the window
    for i in range(k, len(nums)):
        # Add new element, remove old element
        add(nums[i])
        remove(nums[i - k])
        balance()
        result.append(get_median())

    return result


# Example
nums = [1, 3, -1, -3, 5, 3, 6, 7]
k = 3
print(medianSlidingWindow(nums, k))
# Output: [1.0, -1.0, -1.0, 3.0, 5.0, 6.0]

# Window [1, 3, -1]:  sorted = [-1, 1, 3],  median = 1
# Window [3, -1, -3]: sorted = [-3, -1, 3], median = -1
# etc.
```

Lazy deletion is amortized O(log k) per operation. Each element is pushed once and popped once (when the window slides past it), giving total O(n log k). However, the constants are higher than the basic algorithm's, and the implementation is significantly more complex.
The running median technique has numerous practical applications in production systems.
The dual-heap technique generalizes to any percentile! For the p-th percentile, maintain heaps at sizes proportional to p and (100-p). For example, for 90th percentile: keep 90% of elements in max_heap, 10% in min_heap. The max_heap root is the 90th percentile.
```python
import heapq


class PercentileTracker:
    """
    Track any percentile in a data stream using dual heaps.

    For p-th percentile:
    - max_heap contains floor(n * p / 100) elements
    - min_heap contains the rest
    - max_heap root is the p-th percentile
    """

    def __init__(self, percentile: float = 50.0):
        """Initialize with target percentile (0-100)."""
        if not 0 < percentile < 100:
            raise ValueError("Percentile must be between 0 and 100")
        self.p = percentile / 100.0  # Internal ratio
        self.max_heap = []  # Lower p%
        self.min_heap = []  # Upper (100-p)%
        self.count = 0

    def add(self, num: float) -> None:
        self.count += 1

        # Add to appropriate heap
        if not self.max_heap or num <= -self.max_heap[0]:
            heapq.heappush(self.max_heap, -num)
        else:
            heapq.heappush(self.min_heap, num)

        # Rebalance based on target percentile
        self._rebalance()

    def _rebalance(self) -> None:
        """Maintain sizes: max_heap ≈ p * n, min_heap ≈ (1-p) * n."""
        target_max_size = int(self.count * self.p)

        # At minimum, max_heap should have 1 element if count > 0
        if target_max_size < 1 and self.count > 0:
            target_max_size = 1

        while len(self.max_heap) > target_max_size + 1:
            heapq.heappush(self.min_heap, -heapq.heappop(self.max_heap))
        while len(self.max_heap) < target_max_size and self.min_heap:
            heapq.heappush(self.max_heap, -heapq.heappop(self.min_heap))

    def get_percentile(self) -> float:
        """Return the current p-th percentile value."""
        if not self.max_heap:
            raise ValueError("No data")
        return -self.max_heap[0]


# Example: Track 90th percentile latency
p90_tracker = PercentileTracker(90)

latencies = [10, 25, 50, 75, 100, 150, 200, 500, 1000]
for latency in latencies:
    p90_tracker.add(latency)
    print(f"After {latency}ms: P90 = {p90_tracker.get_percentile()}ms")

# Output shows P90 tracking as data arrives
```

We've thoroughly explored the median maintenance pattern—one of the most elegant dual-heap applications. Let's consolidate the key insights:

- Partition the stream into two halves: a max-heap for the lower half and a min-heap for the upper half, so the median always sits at the boundary.
- Two invariants, order (every max-heap element ≤ every min-heap element) and balance (sizes differ by at most one), are enough to guarantee correctness.
- The result is O(log n) per insertion and O(1) per median query, using O(n) space.
- Lazy deletion extends the technique to sliding-window medians, and deliberately unbalanced heap sizes extend it to arbitrary percentiles.
What's Next:
We've now covered three major heap patterns: k-th element, k-way merge, and median maintenance. In the next page, we'll explore task scheduling with priorities—where heaps enable efficient management of work queues with varying urgency levels, a pattern fundamental to operating systems, job schedulers, and workflow engines.
You now understand the dual-heap pattern deeply—from the insight behind partitioning data around the median, through the invariants that guarantee correctness, to advanced variations like sliding window and arbitrary percentiles. This technique appears in countless real-time analytics systems.