Every time you use your phone to navigate, your operating system schedules the next process, or your streaming service recommends content—heaps are silently at work. These unassuming data structures power some of the most critical infrastructure in modern software.
In this final page of the module, we'll explore real-world heap applications that go beyond Top-K: priority-based task scheduling, the two-heap running median, merging K sorted streams, discrete event simulation, Dijkstra's shortest paths, and production patterns.
Each application demonstrates how the O(log n) priority queue operations unlock solutions that would otherwise be impractical.
By the end of this page, you will understand how heaps power real-world systems: task scheduling algorithms, the elegant two-heap median trick, efficient sorted stream merging, event-driven architectures, and production patterns used by leading technology companies.
Every operating system, job scheduler, and task queue relies on priority-based scheduling. The heap is the workhorse that makes this efficient.
The Scheduling Problem:
Given a stream of tasks with different priorities and arrival times, decide which task to execute next. Goals: insert new tasks quickly, always run the highest-priority task next, break ties fairly (FIFO within a priority level), and support cancellation.
Why Heaps?
| Operation | Array (sorted) | Linked List | Heap |
|---|---|---|---|
| Insert new task | O(n) | O(n) | O(log n) |
| Get highest priority | O(1) | O(1) | O(1) |
| Remove highest priority | O(1) | O(1) | O(log n) |
| Change priority | O(n) | O(n) | O(log n)* |
*With indexed heap
For a scheduler handling millions of tasks, O(log n) vs O(n) is the difference between microseconds and seconds.
```python
import heapq
from dataclasses import dataclass, field
from typing import List, Optional, Callable, Any
from time import time
from enum import IntEnum


class Priority(IntEnum):
    """Task priority levels (lower number = higher priority)."""
    CRITICAL = 0
    HIGH = 1
    NORMAL = 2
    LOW = 3
    BACKGROUND = 4


@dataclass(order=True)
class Task:
    """
    Scheduled task with priority ordering.

    Ordering: (priority, arrival_time, task_id)
    - Lower priority number = runs first
    - Earlier arrival = runs first (tie-breaker)
    - Task ID ensures stable ordering
    """
    priority: Priority
    arrival_time: float = field(compare=True)
    task_id: int = field(compare=True)
    # Non-comparison fields
    name: str = field(compare=False)
    callback: Callable[[], Any] = field(compare=False, repr=False)
    metadata: dict = field(default_factory=dict, compare=False)


class PriorityTaskScheduler:
    """
    Production-style task scheduler using a priority queue.

    Features:
    - Multiple priority levels
    - FIFO ordering within priority
    - Task cancellation support
    - Statistics tracking

    This pattern is used in:
    - Operating system process schedulers
    - Job queue systems (Celery, Sidekiq)
    - Game engine update loops
    - Network packet schedulers
    """

    def __init__(self):
        self._queue: List[Task] = []
        self._task_counter = 0
        self._cancelled: set = set()
        # Statistics
        self._tasks_submitted = 0
        self._tasks_executed = 0
        self._tasks_cancelled = 0

    def submit(
        self,
        callback: Callable[[], Any],
        priority: Priority = Priority.NORMAL,
        name: str = None,
        **metadata
    ) -> int:
        """
        Submit a task for execution.

        Returns:
            Task ID for cancellation/tracking
        """
        self._task_counter += 1
        task_id = self._task_counter
        task = Task(
            priority=priority,
            arrival_time=time(),
            task_id=task_id,
            name=name or f"task-{task_id}",
            callback=callback,
            metadata=metadata
        )
        heapq.heappush(self._queue, task)
        self._tasks_submitted += 1
        return task_id

    def cancel(self, task_id: int) -> bool:
        """
        Cancel a pending task.

        Uses lazy deletion - task remains in queue but won't execute.
        """
        if task_id in self._cancelled:
            return False
        self._cancelled.add(task_id)
        self._tasks_cancelled += 1
        return True

    def get_next(self) -> Optional[Task]:
        """
        Get the next task to execute (without running it).

        Skips cancelled tasks.
        """
        while self._queue:
            task = self._queue[0]
            if task.task_id in self._cancelled:
                heapq.heappop(self._queue)
                self._cancelled.discard(task.task_id)
                continue
            return task
        return None

    def execute_next(self) -> Optional[Any]:
        """
        Execute the highest-priority task.

        Returns the task's return value, or None if queue empty.
        """
        while self._queue:
            task = heapq.heappop(self._queue)
            # Skip cancelled tasks
            if task.task_id in self._cancelled:
                self._cancelled.discard(task.task_id)
                continue
            # Execute task
            self._tasks_executed += 1
            try:
                return task.callback()
            except Exception as e:
                print(f"Task {task.name} failed: {e}")
                return None
        return None

    def execute_all(self) -> int:
        """Execute all pending tasks. Returns count executed."""
        count = 0
        while self.get_next():
            self.execute_next()
            count += 1
        return count

    def pending_count(self) -> int:
        """Number of pending (non-cancelled) tasks."""
        return len(self._queue) - len(self._cancelled)

    def get_stats(self) -> dict:
        return {
            "submitted": self._tasks_submitted,
            "executed": self._tasks_executed,
            "cancelled": self._tasks_cancelled,
            "pending": self.pending_count(),
        }


# Demo: Simulating a job scheduler
def demo_job_scheduler():
    scheduler = PriorityTaskScheduler()

    # Submit various tasks
    print("Submitting tasks...")
    scheduler.submit(
        lambda: print(" [CRITICAL] Database backup completed"),
        Priority.CRITICAL,
        name="db-backup"
    )
    scheduler.submit(
        lambda: print(" [NORMAL] Sending welcome email"),
        Priority.NORMAL,
        name="welcome-email"
    )
    scheduler.submit(
        lambda: print(" [HIGH] Processing payment"),
        Priority.HIGH,
        name="payment"
    )
    scheduler.submit(
        lambda: print(" [BACKGROUND] Generating analytics"),
        Priority.BACKGROUND,
        name="analytics"
    )
    cancel_id = scheduler.submit(
        lambda: print(" [LOW] This should not run!"),
        Priority.LOW,
        name="cancelled-task"
    )
    scheduler.cancel(cancel_id)
    scheduler.submit(
        lambda: print(" [NORMAL] Updating cache"),
        Priority.NORMAL,
        name="cache-update"
    )

    print(f"\nPending tasks: {scheduler.pending_count()}")
    print("\nExecuting in priority order:")
    scheduler.execute_all()
    print(f"\nStats: {scheduler.get_stats()}")


demo_job_scheduler()
```

Linux's Completely Fair Scheduler (CFS) uses a red-black tree (a balanced BST) for O(log n) scheduling. Job queues like Celery and Sidekiq use Redis-backed priority queues. Kubernetes pod scheduling uses multi-level queues with heap-based priority within each level.
Finding the median of a static array is straightforward: sort and take the middle element (O(n log n)). But what if elements arrive one at a time, and you need the current median after each arrival?
The Challenge:
Sorting after each insertion would be O(n² log n) for n elements. The two-heap trick solves this elegantly in O(n log n) total time.
The Insight:
The median divides a sorted array in half. Maintain two heaps: a max-heap holding the smaller half of the elements (its root is the largest of the small values) and a min-heap holding the larger half (its root is the smallest of the large values).
The heaps stay balanced (differ by at most 1 element), and the median is always at one of the heap roots!
Elements: [1, 5, 2, 8, 3, 9, 4]
After all insertions:
Max-heap (left): [4, 3, 2, 1] ← smaller half
Min-heap (right): [5, 8, 9] ← larger half
Median = max(left) = 4 ✓ (middle of sorted: 1,2,3,4,5,8,9)
```python
import heapq
from typing import List, Optional


class MedianFinder:
    """
    Running median finder using two heaps.

    Maintains median of all seen elements, with O(log n)
    insertion and O(1) median query.

    This pattern is used in:
    - Real-time statistics dashboards
    - Percentile monitoring (latency P50, P99)
    - Stream summarization
    - Anomaly detection baselines

    Time Complexity:
    - add_num(): O(log n)
    - find_median(): O(1)

    Space Complexity: O(n) - must store all elements
    """

    def __init__(self):
        # Max-heap for left half (smaller elements)
        # Python only has min-heap, so we negate values
        self._left_max_heap: List[int] = []
        # Min-heap for right half (larger elements)
        self._right_min_heap: List[int] = []

    def add_num(self, num: int) -> None:
        """
        Add a number to the data structure.

        Algorithm:
        1. Add to appropriate heap based on current median
        2. Rebalance if heaps differ by more than 1

        Invariants maintained:
        - len(left) == len(right) OR len(left) == len(right) + 1
        - max(left) <= min(right)
        """
        # Step 1: Add to left (max) heap first
        # If empty or num <= current left max
        if not self._left_max_heap or num <= -self._left_max_heap[0]:
            heapq.heappush(self._left_max_heap, -num)  # Negate for max-heap
        else:
            heapq.heappush(self._right_min_heap, num)

        # Step 2: Rebalance to maintain invariant
        self._rebalance()

    def _rebalance(self) -> None:
        """
        Ensure heaps are balanced.

        Left can have at most 1 more element than right.
        """
        left_size = len(self._left_max_heap)
        right_size = len(self._right_min_heap)

        if left_size > right_size + 1:
            # Left too big - move max to right
            val = -heapq.heappop(self._left_max_heap)
            heapq.heappush(self._right_min_heap, val)
        elif right_size > left_size:
            # Right too big - move min to left
            val = heapq.heappop(self._right_min_heap)
            heapq.heappush(self._left_max_heap, -val)

    def find_median(self) -> Optional[float]:
        """
        Return the current median.

        - If odd count: median is the left heap's max
        - If even count: median is average of two middle elements
        """
        left_size = len(self._left_max_heap)
        right_size = len(self._right_min_heap)

        if left_size == 0 and right_size == 0:
            return None

        if left_size > right_size:
            # Odd count - median is left's max
            return float(-self._left_max_heap[0])
        else:
            # Even count - average of two middles
            left_max = -self._left_max_heap[0]
            right_min = self._right_min_heap[0]
            return (left_max + right_min) / 2.0

    def get_count(self) -> int:
        """Total elements seen."""
        return len(self._left_max_heap) + len(self._right_min_heap)


class PercentileFinder:
    """
    Generalized percentile finder using two heaps.

    Maintains any percentile (not just median/50th).
    Target percentile determines the size ratio of heaps.
    """

    def __init__(self, percentile: float = 50.0):
        """
        Initialize percentile finder.

        Args:
            percentile: Target percentile (0-100). Default 50 = median.
        """
        if not 0 < percentile < 100:
            raise ValueError("Percentile must be between 0 and 100")
        self._percentile = percentile
        self._left_max_heap: List[int] = []  # Negated for max-heap
        self._right_min_heap: List[int] = []

    def add(self, num: float) -> None:
        """Add a number and maintain percentile structure."""
        # Add to left first
        if not self._left_max_heap or num <= -self._left_max_heap[0]:
            heapq.heappush(self._left_max_heap, -num)
        else:
            heapq.heappush(self._right_min_heap, num)
        self._rebalance()

    def _rebalance(self) -> None:
        """Rebalance to maintain target percentile."""
        total = len(self._left_max_heap) + len(self._right_min_heap)
        target_left = int(total * self._percentile / 100)
        target_left = max(1, target_left)  # At least 1 on left

        while len(self._left_max_heap) > target_left:
            val = -heapq.heappop(self._left_max_heap)
            heapq.heappush(self._right_min_heap, val)
        while len(self._left_max_heap) < target_left and self._right_min_heap:
            val = heapq.heappop(self._right_min_heap)
            heapq.heappush(self._left_max_heap, -val)

    def get_percentile(self) -> Optional[float]:
        """Get the current percentile value."""
        if not self._left_max_heap:
            return None
        return float(-self._left_max_heap[0])


# Demo: Real-time latency monitoring
def demo_latency_monitoring():
    """Simulate monitoring API latencies with P50, P95, P99."""
    import random

    p50 = MedianFinder()  # Median
    p95 = PercentileFinder(95.0)
    p99 = PercentileFinder(99.0)

    print("Simulating 1000 API requests with varying latencies...")
    latencies = []
    for _ in range(1000):
        # Simulate: mostly fast, some slow, few very slow
        if random.random() < 0.90:
            latency = random.gauss(50, 15)      # Normal: ~50ms
        elif random.random() < 0.95:
            latency = random.gauss(200, 50)     # Slow: ~200ms
        else:
            latency = random.gauss(1000, 200)   # Timeout: ~1000ms
        latency = max(1, latency)  # No negative latencies
        latencies.append(latency)
        p50.add_num(int(latency))
        p95.add(latency)
        p99.add(latency)

    print(f"\nLatency Statistics (from heap-based percentiles):")
    print(f"  P50 (Median): {p50.find_median():.1f}ms")
    print(f"  P95: {p95.get_percentile():.1f}ms")
    print(f"  P99: {p99.get_percentile():.1f}ms")

    # Verify against sorted array
    sorted_latencies = sorted(latencies)
    print(f"\nVerification (from sorted array):")
    print(f"  P50: {sorted_latencies[499]:.1f}ms")
    print(f"  P95: {sorted_latencies[949]:.1f}ms")
    print(f"  P99: {sorted_latencies[989]:.1f}ms")


demo_latency_monitoring()
```

"Find Median from Data Stream" (LeetCode #295) is a hard problem and a common interview question at top companies. The two-heap solution is the expected approach. Understanding it deeply demonstrates mastery of heap applications.
A fundamental problem in data processing: given K sorted lists (or streams), merge them into one sorted sequence.
Real-World Instances: merging time-ordered log files from multiple servers, combining sorted results from database shards, and the merge phase of external sorting.
The Heap Solution:
Maintain a min-heap of size K, containing the current smallest element from each list. The heap always gives us the globally smallest element.
Lists:
[1, 4, 7] ← current: 1
[2, 5, 8] ← current: 2
[3, 6, 9] ← current: 3
Heap: [1, 2, 3] (with pointers to source lists)
Extract 1 → Output: [1]
Insert next from list 1 (4) → Heap: [2, 3, 4]
Extract 2 → Output: [1, 2]
Insert next from list 2 (5) → Heap: [3, 4, 5]
... continue until all lists exhausted
Time Complexity: O(n log K), where n = total elements across all lists.
Space Complexity: O(K) for the heap.
```python
import heapq
from typing import List, Iterator, TypeVar, Optional

T = TypeVar('T')


def merge_k_sorted_lists(lists: List[List[int]]) -> List[int]:
    """
    Merge K sorted lists into one sorted list.

    Classic problem: LeetCode #23 "Merge K Sorted Lists"

    Time Complexity: O(n log K)
    - n = total elements
    - Each element is pushed and popped from heap once
    - Each heap operation is O(log K)

    Space Complexity: O(K) for heap + O(n) for output
    """
    if not lists:
        return []

    # Heap entries: (value, list_index, element_index)
    heap = []

    # Initialize heap with first element from each non-empty list
    for i, lst in enumerate(lists):
        if lst:
            heapq.heappush(heap, (lst[0], i, 0))

    result = []
    while heap:
        val, list_idx, elem_idx = heapq.heappop(heap)
        result.append(val)

        # If this list has more elements, push the next one
        next_idx = elem_idx + 1
        if next_idx < len(lists[list_idx]):
            next_val = lists[list_idx][next_idx]
            heapq.heappush(heap, (next_val, list_idx, next_idx))

    return result


class ListNode:
    """Linked list node for LeetCode-style problem."""
    def __init__(self, val=0, next=None):
        self.val = val
        self.next = next

    def __lt__(self, other):
        return self.val < other.val


def merge_k_sorted_linked_lists(
    lists: List[Optional[ListNode]]
) -> Optional[ListNode]:
    """
    Merge K sorted linked lists.

    Same algorithm, adapted for linked lists.
    """
    # Filter out empty lists and initialize heap
    heap = [head for head in lists if head]
    heapq.heapify(heap)

    # Dummy head for result
    dummy = ListNode(0)
    current = dummy

    while heap:
        # Pop smallest node
        smallest = heapq.heappop(heap)
        current.next = smallest
        current = current.next

        # Push next node from same list
        if smallest.next:
            heapq.heappush(heap, smallest.next)

    return dummy.next


def merge_k_sorted_iterators(
    iterators: List[Iterator[T]]
) -> Iterator[T]:
    """
    Generator-based merge for lazy evaluation.

    Perfect for streaming data - never materializes full result.
    Each next() call is O(log K).
    """
    # Initialize heap with first element from each iterator
    heap = []
    for i, it in enumerate(iterators):
        try:
            val = next(it)
            heapq.heappush(heap, (val, i, it))
        except StopIteration:
            pass  # Empty iterator

    while heap:
        val, idx, it = heapq.heappop(heap)
        yield val
        try:
            next_val = next(it)
            heapq.heappush(heap, (next_val, idx, it))
        except StopIteration:
            pass  # Iterator exhausted


# Demo: Merging sorted log files
def demo_log_merge():
    """Simulate merging sorted logs from multiple servers."""
    # Simulated logs (pre-sorted by timestamp)
    server_logs = [
        [
            (1000, "server1", "Request received"),
            (1050, "server1", "Processing started"),
            (1200, "server1", "Response sent"),
        ],
        [
            (1010, "server2", "Request received"),
            (1100, "server2", "Cache hit"),
            (1120, "server2", "Response sent"),
        ],
        [
            (1005, "server3", "Request received"),
            (1080, "server3", "Database query"),
            (1150, "server3", "Response sent"),
        ],
    ]

    print("Individual server logs:")
    for i, logs in enumerate(server_logs):
        print(f"  Server {i+1}: {[log[0] for log in logs]}")

    # Merge using a heap of (timestamp, server_index, log_index),
    # so we can recover which log entry each timestamp came from
    heap = []
    for i, logs in enumerate(server_logs):
        if logs:
            heapq.heappush(heap, (logs[0][0], i, 0))

    print("\nMerged log timeline:")
    while heap:
        ts, server_idx, log_idx = heapq.heappop(heap)
        log_entry = server_logs[server_idx][log_idx]
        print(f"  [{log_entry[0]}] {log_entry[1]}: {log_entry[2]}")

        next_idx = log_idx + 1
        if next_idx < len(server_logs[server_idx]):
            next_entry = server_logs[server_idx][next_idx]
            heapq.heappush(heap, (next_entry[0], server_idx, next_idx))


demo_log_merge()
```

Many systems operate on events scheduled for future times: game engines, network simulators, financial trading systems. The heap-based event queue is the core data structure powering these simulations.
The Event Queue Pattern:
```
EventQueue:
    schedule(event, time)  # Add event for future execution
    get_next()             # Get the soonest event
    advance()              # Execute next event, update clock
```
Use Cases: network protocol simulation, queuing-theory models, game engine loops, and financial trading simulation.
Why Heaps?
Events don't arrive in the order they'll execute. A packet scheduled for t=100ms might be added after one scheduled for t=50ms. The heap maintains chronological ordering efficiently.
```python
import heapq
from dataclasses import dataclass, field
from typing import Callable, Optional, List
from enum import Enum


class EventType(Enum):
    PACKET_ARRIVE = "packet_arrive"
    PACKET_SEND = "packet_send"
    TIMEOUT = "timeout"
    ACK_RECEIVE = "ack_receive"


@dataclass(order=True)
class Event:
    """Simulation event with scheduled time."""
    time: float
    sequence: int = field(compare=True)  # Tie-breaker
    event_type: EventType = field(compare=False)
    handler: Callable[['Simulator', 'Event'], None] = field(
        compare=False, repr=False
    )
    data: dict = field(default_factory=dict, compare=False)


class Simulator:
    """
    Discrete Event Simulator using a priority queue.

    Classic pattern for:
    - Network protocol simulation
    - Queuing theory models
    - Game engine loops
    - Financial trading simulation

    The event queue (heap) ensures events execute in
    chronological order, regardless of insertion order.
    """

    def __init__(self):
        self._event_queue: List[Event] = []
        self._current_time = 0.0
        self._sequence = 0
        # Statistics
        self._events_processed = 0
        self._events_scheduled = 0

    @property
    def current_time(self) -> float:
        return self._current_time

    def schedule(
        self,
        delay: float,
        event_type: EventType,
        handler: Callable[['Simulator', Event], None],
        **data
    ) -> Event:
        """
        Schedule an event to occur after 'delay' time units.

        Returns the scheduled event (for cancellation if needed).
        """
        self._sequence += 1
        event = Event(
            time=self._current_time + delay,
            sequence=self._sequence,
            event_type=event_type,
            handler=handler,
            data=data
        )
        heapq.heappush(self._event_queue, event)
        self._events_scheduled += 1
        return event

    def schedule_absolute(
        self,
        time: float,
        event_type: EventType,
        handler: Callable[['Simulator', Event], None],
        **data
    ) -> Event:
        """Schedule an event at an absolute time."""
        self._sequence += 1
        event = Event(
            time=time,
            sequence=self._sequence,
            event_type=event_type,
            handler=handler,
            data=data
        )
        heapq.heappush(self._event_queue, event)
        self._events_scheduled += 1
        return event

    def peek_next(self) -> Optional[Event]:
        """View the next event without executing it."""
        return self._event_queue[0] if self._event_queue else None

    def step(self) -> bool:
        """
        Execute the next event.

        Returns True if an event was executed, False if queue empty.
        """
        if not self._event_queue:
            return False

        event = heapq.heappop(self._event_queue)
        self._current_time = event.time
        self._events_processed += 1

        # Execute the event handler
        event.handler(self, event)
        return True

    def run_until(self, end_time: float) -> int:
        """Run simulation until reaching end_time. Returns events processed."""
        count = 0
        while self._event_queue and self._event_queue[0].time <= end_time:
            self.step()
            count += 1
        self._current_time = end_time
        return count

    def run_until_empty(self) -> int:
        """Run until no more events. Returns events processed."""
        count = 0
        while self.step():
            count += 1
        return count

    def get_stats(self) -> dict:
        return {
            "current_time": self._current_time,
            "events_scheduled": self._events_scheduled,
            "events_processed": self._events_processed,
            "events_pending": len(self._event_queue),
        }


# Demo: Simple network simulation
def demo_network_simulation():
    """Simulate TCP-like packet transmission with retries."""
    import random

    sim = Simulator()

    # Track state
    packets_sent = []
    packets_received = []
    retransmissions = 0

    def on_packet_send(sim: Simulator, event: Event):
        packet_id = event.data['packet_id']
        packets_sent.append((sim.current_time, packet_id))
        print(f" [{sim.current_time:.1f}ms] SEND packet {packet_id}")

        # Simulate network delay (50-100ms) with 20% packet loss
        if random.random() > 0.2:  # 80% success
            delay = random.uniform(50, 100)
            sim.schedule(
                delay,
                EventType.PACKET_ARRIVE,
                on_packet_arrive,
                packet_id=packet_id
            )
        else:
            print(f" [{sim.current_time:.1f}ms] LOST packet {packet_id}")

        # Schedule timeout for retransmission
        sim.schedule(
            150,  # Timeout after 150ms
            EventType.TIMEOUT,
            on_timeout,
            packet_id=packet_id
        )

    def on_packet_arrive(sim: Simulator, event: Event):
        packet_id = event.data['packet_id']
        if packet_id not in [p[1] for p in packets_received]:
            packets_received.append((sim.current_time, packet_id))
            print(f" [{sim.current_time:.1f}ms] RECV packet {packet_id}")
            # Send ACK (simulated by just recording receipt)
            sim.schedule(
                20,  # ACK delay
                EventType.ACK_RECEIVE,
                on_ack,
                packet_id=packet_id
            )

    def on_ack(sim: Simulator, event: Event):
        packet_id = event.data['packet_id']
        print(f" [{sim.current_time:.1f}ms] ACK packet {packet_id}")

    def on_timeout(sim: Simulator, event: Event):
        nonlocal retransmissions
        packet_id = event.data['packet_id']
        # Check if packet was already received
        if packet_id not in [p[1] for p in packets_received]:
            retransmissions += 1
            print(f" [{sim.current_time:.1f}ms] TIMEOUT, retransmit packet {packet_id}")
            # Retransmit
            sim.schedule(
                0,
                EventType.PACKET_SEND,
                on_packet_send,
                packet_id=packet_id
            )

    # Schedule initial packet transmissions
    print("Simulating packet transmission with 20% loss rate...\n")
    for i in range(5):
        sim.schedule(
            i * 30,  # Stagger sends by 30ms
            EventType.PACKET_SEND,
            on_packet_send,
            packet_id=i
        )

    # Run simulation for 500ms
    sim.run_until(500)

    print(f"\nSimulation complete at t={sim.current_time:.1f}ms")
    print(f"Stats: {sim.get_stats()}")
    print(f"Packets sent: {len(packets_sent)}")
    print(f"Packets received: {len(packets_received)}")
    print(f"Retransmissions: {retransmissions}")


demo_network_simulation()
```

Every GPS navigation, network routing protocol, and game pathfinding system uses some variant of Dijkstra's algorithm—and at its core is a priority queue.
The Problem: Given a weighted graph with non-negative edge weights, find the shortest path from a source vertex to all other vertices.
The Algorithm: start with the source at distance 0. Repeatedly extract the unvisited vertex with the smallest tentative distance, then relax each of its edges, updating a neighbor's distance whenever the path through the current vertex is shorter and pushing the improved distance onto the priority queue.
Why Heaps Make It Efficient:
The critical operation is "extract minimum distance vertex." Without a heap, this is O(V) per extraction, giving O(V²) total. With a heap, it's O(log V) per extraction, giving O((V + E) log V) total.
For sparse graphs (E ≈ V), this is the difference between O(V²) and O(V log V)—crucial for large road networks with millions of intersections.
```python
import heapq
from typing import List, Dict, Tuple, Optional
from collections import defaultdict


def dijkstra(
    graph: Dict[int, List[Tuple[int, int]]],
    source: int
) -> Tuple[Dict[int, int], Dict[int, Optional[int]]]:
    """
    Dijkstra's shortest path algorithm using a min-heap.

    Args:
        graph: Adjacency list. graph[u] = [(v, weight), ...]
        source: Starting vertex

    Returns:
        (distances, predecessors)
        - distances[v] = shortest distance from source to v
        - predecessors[v] = previous vertex on shortest path to v

    Time Complexity: O((V + E) log V)
    - Each vertex extracted once: V × O(log V)
    - Each edge relaxed once: E × O(log V)

    Space Complexity: O(V) for distances + O(V) for heap

    This is the algorithm behind:
    - Google Maps / GPS navigation
    - Network routing (OSPF protocol)
    - Game AI pathfinding
    """
    # Initialize distances to infinity
    distances = defaultdict(lambda: float('inf'))
    distances[source] = 0

    # Track predecessors for path reconstruction
    predecessors = {source: None}

    # Priority queue: (distance, vertex)
    heap = [(0, source)]

    # Track visited vertices
    visited = set()

    while heap:
        current_dist, u = heapq.heappop(heap)

        # Skip if already processed (we may have duplicate entries)
        if u in visited:
            continue
        visited.add(u)

        # Relax all edges from u
        for v, weight in graph.get(u, []):
            if v in visited:
                continue
            new_dist = current_dist + weight
            if new_dist < distances[v]:
                distances[v] = new_dist
                predecessors[v] = u
                heapq.heappush(heap, (new_dist, v))

    return dict(distances), predecessors


def reconstruct_path(
    predecessors: Dict[int, Optional[int]],
    source: int,
    target: int
) -> Optional[List[int]]:
    """Reconstruct the shortest path from source to target."""
    if target not in predecessors:
        return None  # No path exists

    path = []
    current = target
    while current is not None:
        path.append(current)
        current = predecessors[current]
    path.reverse()
    return path


# Demo: Finding shortest paths in a road network
def demo_road_network():
    """Simulate GPS navigation on a simple road network."""
    # City graph: city -> [(neighbor, distance_km), ...]
    roads = {
        'A': [('B', 10), ('C', 15)],
        'B': [('A', 10), ('D', 12), ('F', 15)],
        'C': [('A', 15), ('E', 10)],
        'D': [('B', 12), ('E', 2), ('F', 1)],
        'E': [('C', 10), ('D', 2), ('F', 5)],
        'F': [('B', 15), ('D', 1), ('E', 5)],
    }

    # Convert to integer keys for dijkstra
    city_to_id = {city: i for i, city in enumerate(roads.keys())}
    id_to_city = {i: city for city, i in city_to_id.items()}

    graph = {}
    for city, neighbors in roads.items():
        u = city_to_id[city]
        graph[u] = [(city_to_id[n], w) for n, w in neighbors]

    # Find shortest paths from city A
    source = city_to_id['A']
    distances, predecessors = dijkstra(graph, source)

    print("Road network shortest paths from city A:\n")
    print("City | Distance | Path")
    print("-" * 40)
    for city in roads.keys():
        target = city_to_id[city]
        dist = distances.get(target, float('inf'))
        path_ids = reconstruct_path(predecessors, source, target)
        if path_ids:
            path = " → ".join(id_to_city[i] for i in path_ids)
            print(f" {city}  | {dist:2.0f} km  | {path}")
        else:
            print(f" {city}  |  ∞    | No path")


demo_road_network()
```

The A* algorithm, used extensively in game AI, is Dijkstra's algorithm with an additional heuristic function guiding the search toward the goal. The priority queue remains central—it just uses f(n) = g(n) + h(n) instead of just distance g(n).
Let's look at how heaps power real production systems at major technology companies.
Pattern 1: Rate Limiting with Token Bucket
A token bucket refills at a fixed rate; when a limiter manages many buckets, a heap ordered by next-refill time tells it which bucket becomes available soonest. A single bucket itself is simple:
```python
from time import time

class RateLimiter:
    def __init__(self, rate: float, burst: int):
        self.rate = rate      # tokens per second
        self.burst = burst    # maximum bucket size
        self.tokens = burst
        self.last_update = time()

    def _refill(self):
        # Add tokens accrued since last update, capped at burst
        now = time()
        self.tokens = min(self.burst,
                          self.tokens + (now - self.last_update) * self.rate)
        self.last_update = now

    def allow_request(self) -> bool:
        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```
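Where the heap enters the picture is tracking many buckets at once: a min-heap keyed by next-refill time identifies which bucket becomes usable soonest, so the limiter can sleep until then instead of polling every bucket. A minimal sketch under that assumption (`RefillTracker` and its methods are illustrative names, not from any particular library):

```python
import heapq
import time

class RefillTracker:
    """Min-heap of (next_refill_time, key) across many token buckets.

    Lets a rate limiter wake up for the soonest-eligible bucket
    instead of scanning all of them.
    """
    def __init__(self):
        self._heap = []

    def schedule(self, key: str, tokens_needed: float, rate: float) -> None:
        # Time until this bucket accrues enough tokens at `rate` tokens/sec
        ready_at = time.monotonic() + tokens_needed / rate
        heapq.heappush(self._heap, (ready_at, key))

    def next_ready(self):
        """Return (ready_at, key) for the soonest bucket, or None."""
        return self._heap[0] if self._heap else None

    def pop_ready(self, now: float):
        """Pop all keys whose refill time has passed, soonest first."""
        ready = []
        while self._heap and self._heap[0][0] <= now:
            ready.append(heapq.heappop(self._heap)[1])
        return ready
```

Each scheduling and wake-up operation is O(log n) in the number of waiting buckets, matching the scheduler pattern from earlier in this page.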
Pattern 2: Connection Pool Management
Heaps order connections by last-use time for LRU eviction or by health score for load balancing.
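As a sketch of the first variant, here is a hypothetical idle-connection pool that evicts the connection unused the longest; the class and method names are illustrative, not from a real driver:

```python
import heapq

class IdleConnectionPool:
    """Tracks idle connections in a min-heap keyed by last-use time,
    so the stalest connection is found and evicted in O(log n)."""
    def __init__(self, max_idle: int):
        self.max_idle = max_idle
        self._idle = []     # (last_used, counter, connection)
        self._counter = 0   # tie-breaker: connections are never compared

    def release(self, conn, now: float):
        """Return a connection to the idle pool.

        If the pool exceeds max_idle, evict the least-recently-used
        connections and return them so the caller can close them.
        """
        self._counter += 1
        heapq.heappush(self._idle, (now, self._counter, conn))
        evicted = []
        while len(self._idle) > self.max_idle:
            _, _, stale = heapq.heappop(self._idle)  # oldest last_used
            evicted.append(stale)
        return evicted
```

A production pool would also validate connections on checkout; the tie-breaking counter is the same trick the scheduler code above uses to keep heap tuples comparable.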
Pattern 3: Memory-Bounded Caches
LRU caches evict the least-recently-used entry. A doubly linked list gives O(1) eviction; a heap keyed by access time gives O(log n) with simpler bookkeeping, using lazy deletion for stale entries:
```python
import heapq

class HeapLRUCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self.heap = []         # (access_time, key) - may hold stale entries
        self.cache = {}        # key -> value
        self.last_access = {}  # key -> latest access time
        self.time = 0

    def get(self, key):
        if key in self.cache:
            self.time += 1
            # Lazy update: push a fresh entry; the old one stays in the heap
            heapq.heappush(self.heap, (self.time, key))
            self.last_access[key] = self.time
            return self.cache[key]
        return None

    def put(self, key, value):
        self.time += 1
        self.cache[key] = value
        heapq.heappush(self.heap, (self.time, key))
        self.last_access[key] = self.time
        while len(self.cache) > self.capacity:
            t, k = heapq.heappop(self.heap)
            # Skip stale entries (key was accessed again later)
            if self.last_access.get(k) == t:
                del self.cache[k]
                del self.last_access[k]
```
| System | Company/Product | Heap Usage | Scale |
|---|---|---|---|
| Search Ranking | Google | Top-K results per query | 8.5B queries/day |
| Feed Ranking | Facebook/Meta | Top posts for news feed | 2B users, personalized |
| Ride Matching | Uber/Lyft | Nearest K drivers to rider | Millions of matches/day |
| Video Recommendations | Netflix/YouTube | Top-K personalized content | Billions of views/day |
| Order Book | Stock Exchanges | Best bid/ask prices | Millions of orders/second |
| Process Scheduler | Linux Kernel | Next process to run | Running on billions of devices |
Pattern 4: Huffman Coding (Data Compression)
The Huffman algorithm builds optimal prefix codes using a heap to always combine the two smallest-frequency nodes:
```python
import heapq
from typing import Dict

class Node:
    def __init__(self, char=None, left=None, right=None):
        self.char, self.left, self.right = char, left, right

def build_huffman_tree(frequencies: Dict[str, int]) -> Node:
    # A counter breaks frequency ties so Node objects (which aren't
    # comparable) are never compared by the heap
    heap = [(freq, i, Node(char))
            for i, (char, freq) in enumerate(frequencies.items())]
    heapq.heapify(heap)
    counter = len(heap)
    while len(heap) > 1:
        freq1, _, node1 = heapq.heappop(heap)
        freq2, _, node2 = heapq.heappop(heap)
        merged = Node(left=node1, right=node2)
        heapq.heappush(heap, (freq1 + freq2, counter, merged))
        counter += 1
    return heap[0][2]
```
This powers GZIP, PNG, JPEG, and countless other compression formats.
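Reading codes off the finished tree is a single traversal: append "0" going left and "1" going right. A self-contained sketch of the full pipeline, again with the tie-breaking counter Python's heap tuples need when frequencies collide:

```python
import heapq
from typing import Dict

class Node:
    def __init__(self, char=None, left=None, right=None):
        self.char, self.left, self.right = char, left, right

def huffman_codes(frequencies: Dict[str, int]) -> Dict[str, str]:
    """Build the Huffman tree, then read off each symbol's prefix code."""
    heap = [(f, i, Node(c))
            for i, (c, f) in enumerate(sorted(frequencies.items()))]
    heapq.heapify(heap)
    count = len(heap)
    while len(heap) > 1:
        f1, _, n1 = heapq.heappop(heap)
        f2, _, n2 = heapq.heappop(heap)
        heapq.heappush(heap, (f1 + f2, count, Node(left=n1, right=n2)))
        count += 1

    codes = {}
    def walk(node, prefix):
        if node.char is not None:             # leaf: record its code
            codes[node.char] = prefix or "0"  # single-symbol edge case
            return
        walk(node.left, prefix + "0")
        walk(node.right, prefix + "1")
    walk(heap[0][2], "")
    return codes
```

Rarer symbols end up deeper in the tree and so receive longer codes, and no code is a prefix of another, which is what makes decoding unambiguous.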
Consider using a heap when you need:
• Repeated "find minimum/maximum" operations
• Dynamic insertions where order matters
• Bounded memory with priority-based eviction
• Efficient merge of sorted streams
• Any form of scheduling or prioritization
If you're doing insertions AND repeated min/max queries, a heap is almost certainly the right choice.
We've explored the breadth of heap applications in production systems, from schedulers and running medians to K-way merges, event simulation, and shortest paths.
Module Complete!
You've now mastered the Top-K pattern and heap applications—from the basic algorithm through memory optimization, stream processing, and real-world production patterns. These skills will serve you in coding interviews, system design discussions, and building efficient software.
Key Skills Acquired:
• Implementing priority schedulers with lazy cancellation
• Maintaining a running median (and arbitrary percentiles) with two heaps
• Merging K sorted lists, linked lists, and streams in O(n log K)
• Building discrete event simulations on a heap-backed event queue
• Applying Dijkstra's algorithm with a priority queue
Congratulations! You've completed the final module of Chapter 14: Heaps & Priority Queues. You now have a comprehensive understanding of one of the most versatile data structures in computer science. From basic operations through advanced streaming patterns to production system design, heaps are now a powerful tool in your engineering arsenal.