Twitter processes over 500 million tweets per day. Netflix streams billions of watch events. Financial systems handle millions of transactions per second. All of these systems share a common challenge: they must answer questions about "Top" or "Most" without storing every event—because there's simply too much data, and it never stops arriving.
This is the domain of stream processing—analyzing data as it flows, in real-time, with bounded memory. The heap data structure is uniquely suited for this challenge because it can maintain Top-K elements using only O(K) space, regardless of how many elements have passed through.
But stream processing introduces new complexities beyond static Top-K: the input never ends, relevance is often time-bounded, older events may need to count for less, throughput can reach millions of events per second, and the stream may be partitioned across many machines.
In this page, we'll master these techniques, learning to build systems that can answer "What are the top results right now?" for truly infinite data streams.
By the end of this page, you will understand: (1) how heaps enable true streaming Top-K with O(K) memory, (2) sliding window patterns for time-bounded analytics, (3) time-decay algorithms for recency-weighted rankings, (4) high-throughput optimization techniques, and (5) real-time aggregation patterns for distributed streams.
Before diving into heap-based streaming, let's clearly distinguish streaming from batch processing and understand why this distinction matters.
Batch Processing: the entire dataset is available before computation starts. You can make multiple passes, sort everything, and let memory scale with input size.
Stream Processing: data arrives continuously and is potentially unbounded. Each element is seen once, results must be available on demand, and memory must stay bounded no matter how much data flows past.
Why This Matters for Top-K:
In batch processing, you can always sort the entire dataset:
# Batch: O(n log n) time, O(n) space
def batch_top_k(data, k):
    return sorted(data, reverse=True)[:k]
In streaming, you can't store all elements:
# Streaming: O(n log K) time, O(K) space
import heapq

def streaming_top_k(stream, k):
    heap = []
    for element in stream:  # Possibly infinite
        # Process one element at a time;
        # only K elements in memory at any point
        if len(heap) < k:
            heapq.heappush(heap, element)
        elif element > heap[0]:
            heapq.heapreplace(heap, element)
    return heap  # Reached only if the stream ends
Some problems fundamentally cannot be solved exactly with bounded memory in streaming. For example, finding the exact median requires storing all elements (or using approximation). Top-K is special because we only need to retain K elements—everything outside the Top-K can be safely discarded, since the admission threshold only rises and a discarded element can never re-enter.
Let's implement a robust streaming Top-K processor that handles the core use case: maintaining Top-K across an unbounded stream of elements.
The Streaming Interface:
StreamingTopK:
    constructor(k: int)
    process(element: T) -> void        # Handle one incoming element
    get_top_k() -> List[T]             # Get current Top-K (any time)
    get_threshold() -> T               # Get admission threshold
    count() -> int                     # Elements processed so far
    is_in_top_k(element: T) -> bool    # Would this element be in Top-K?
Key Properties:
- O(K) space, regardless of stream length
- O(log K) processing per element, O(1) threshold checks
- Top-K is queryable at any time, without stopping the stream
import heapq
import random
from typing import TypeVar, Generic, List, Optional, Callable, Iterator
from time import time

T = TypeVar('T')

class StreamingTopK(Generic[T]):
    """
    Streaming Top-K processor using a bounded min-heap.

    Maintains the K largest elements seen so far over an unbounded stream.
    Results available at any time.

    Time Complexity:
    - process(): O(log K) per element
    - get_top_k(): O(K) to copy, O(K log K) if sorted
    - get_threshold(): O(1)

    Space Complexity: O(K)

    Example:
        >>> topk = StreamingTopK(k=3, key=lambda x: x)
        >>> for value in [5, 2, 9, 1, 7, 6, 8]:
        ...     topk.process(value)
        >>> topk.get_top_k()
        [9, 8, 7]
    """

    def __init__(
        self,
        k: int,
        key: Callable[[T], float] = lambda x: float(x)
    ):
        """
        Initialize streaming Top-K processor.

        Args:
            k: Number of top elements to maintain
            key: Function to extract numeric key for comparison
        """
        if k <= 0:
            raise ValueError("k must be positive")
        self._k = k
        self._key = key
        # Heap entries are (key_value, seq, element). The monotonically
        # increasing seq breaks ties so elements themselves are never
        # compared (keeps dicts and other non-orderable types safe).
        self._heap: List[tuple] = []
        self._seq = 0
        self._count = 0
        self._sum_processed = 0.0  # For statistics

    @property
    def k(self) -> int:
        return self._k

    @property
    def elements_processed(self) -> int:
        return self._count

    @property
    def current_size(self) -> int:
        return len(self._heap)

    def process(self, element: T) -> bool:
        """
        Process one element from the stream.

        Returns:
            True if element was admitted to Top-K, False otherwise
        """
        self._count += 1
        key_value = self._key(element)
        self._sum_processed += key_value

        if len(self._heap) < self._k:
            # Still filling initial K slots
            heapq.heappush(self._heap, (key_value, self._seq, element))
            self._seq += 1
            return True
        elif key_value > self._heap[0][0]:
            # New element beats the weakest in Top-K
            heapq.heapreplace(self._heap, (key_value, self._seq, element))
            self._seq += 1
            return True
        return False

    def process_batch(self, elements: Iterator[T]) -> int:
        """
        Process multiple elements efficiently.
        Returns count of elements admitted to Top-K.
        """
        admitted = 0
        for element in elements:
            if self.process(element):
                admitted += 1
        return admitted

    def get_top_k(self, sorted_desc: bool = True) -> List[T]:
        """
        Get current Top-K elements.

        Args:
            sorted_desc: If True, return in descending order by key

        Returns:
            List of Top-K elements (may have fewer if < K seen)
        """
        if sorted_desc:
            sorted_heap = sorted(
                self._heap, key=lambda entry: entry[0], reverse=True
            )
            return [element for _, _, element in sorted_heap]
        return [element for _, _, element in self._heap]

    def get_threshold(self) -> Optional[float]:
        """
        Get current admission threshold.
        Elements with key > threshold will be admitted to Top-K.
        Returns None if Top-K not yet full.
        """
        if len(self._heap) < self._k:
            return None  # Any element can enter
        return self._heap[0][0]

    def would_be_admitted(self, element: T) -> bool:
        """Check if element would be admitted without processing it."""
        if len(self._heap) < self._k:
            return True
        return self._key(element) > self._heap[0][0]

    def get_statistics(self) -> dict:
        """Get processing statistics."""
        threshold = self.get_threshold()
        return {
            "elements_processed": self._count,
            "current_top_k_size": len(self._heap),
            "admission_threshold": threshold,
            "admission_rate": len(self._heap) / max(self._count, 1),
            "average_processed": self._sum_processed / max(self._count, 1),
        }

# Demo: Processing a simulated event stream

def demo_event_stream():
    """Simulate processing a server metrics stream."""
    # Track top 5 CPU usage readings
    top_cpu = StreamingTopK[dict](
        k=5,
        key=lambda event: event["cpu_percent"]
    )

    # Simulate metric events
    servers = [f"server-{i}" for i in range(100)]

    print("Processing 10,000 metric events...")
    for i in range(10000):
        event = {
            "server": random.choice(servers),
            "cpu_percent": random.gauss(50, 20),  # Normal distribution
            "timestamp": time(),
            "event_id": i,
        }
        top_cpu.process(event)

        # Show progress every 2000 events
        if (i + 1) % 2000 == 0:
            threshold = top_cpu.get_threshold()
            print(f"  After {i+1} events: threshold = {threshold:.1f}%")

    print("\nFinal statistics:")
    stats = top_cpu.get_statistics()
    for key, value in stats.items():
        if isinstance(value, float):
            print(f"  {key}: {value:.2f}")
        else:
            print(f"  {key}: {value}")

    print("\nTop 5 CPU spikes:")
    for event in top_cpu.get_top_k():
        print(f"  {event['server']}: {event['cpu_percent']:.1f}% "
              f"(event #{event['event_id']})")

demo_event_stream()

In many real-world scenarios, we don't want Top-K over all time—we want Top-K over a recent window. Examples:
- The top CPU spikes in the last hour, not since the server booted
- Trending hashtags over the last few minutes
- The highest-value transactions so far today
This requires a sliding window that "forgets" old data as new data arrives.
Window Types:
- Time-based: keep elements that arrived within the last T seconds
- Count-based: keep only the last N elements, regardless of age
The challenge: efficiently evicting elements that fall outside the window.
Standard heaps don't support efficient arbitrary deletion. If an element in the middle of the heap "expires," we can't remove it in O(log n). This is why sliding window Top-K requires more sophisticated approaches.
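One standard workaround is lazy deletion: instead of removing an expired element immediately, record a tombstone for it and discard it only when it surfaces at the heap top. Here is a minimal sketch of that pattern (the class name LazyDeletionHeap and its interface are illustrative, not from a library); the windowed implementation below uses the closely related lazy-expiration idea, filtering expired entries out during periodic cleanups and queries.

import heapq
from collections import Counter

class LazyDeletionHeap:
    """Min-heap with O(1) logical deletion; stale entries are skipped on pop."""
    def __init__(self):
        self._heap = []
        self._deleted = Counter()

    def push(self, item):
        heapq.heappush(self._heap, item)

    def remove(self, item):
        self._deleted[item] += 1   # O(1): just record a tombstone

    def pop_min(self):
        while self._heap:
            item = heapq.heappop(self._heap)
            if self._deleted[item]:
                self._deleted[item] -= 1   # Stale entry: skip it
            else:
                return item
        raise IndexError("pop from empty LazyDeletionHeap")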
import heapq
from collections import deque
from dataclasses import dataclass
from time import time
from typing import TypeVar, Generic, List, Optional, Callable, Tuple

T = TypeVar('T')

@dataclass
class TimestampedElement(Generic[T]):
    """Element with timestamp for windowing."""
    timestamp: float
    value: T
    key: float

class TimeWindowTopK(Generic[T]):
    """
    Top-K over a time-based sliding window.

    Maintains Top-K elements that arrived within the last window_seconds.
    Older elements are automatically expired.

    Implementation Strategy:
    - Combine min-heap (for Top-K) with lazy expiration
    - Periodic cleanup to bound memory

    Time Complexity:
    - process(): O(log K) amortized
    - get_top_k(): O(K log K) with cleanup

    Space Complexity: O(W) where W = elements in current window
    (could be larger than K during high-traffic periods)
    """

    def __init__(
        self,
        k: int,
        window_seconds: float,
        key: Callable[[T], float] = lambda x: float(x),
        cleanup_interval: int = 100
    ):
        """
        Initialize time-windowed Top-K tracker.

        Args:
            k: Number of top elements to track
            window_seconds: Time window duration in seconds
            key: Function to extract comparison key
            cleanup_interval: Run cleanup every N insertions
        """
        self._k = k
        self._window = window_seconds
        self._key = key
        self._cleanup_interval = cleanup_interval

        # Main storage: (key, timestamp, seq, element).
        # Key first for heap ordering; the monotonically increasing seq
        # breaks ties so elements themselves are never compared.
        self._heap: List[Tuple[float, float, int, T]] = []
        self._seq = 0

        # Track insertion count for periodic cleanup
        self._insert_count = 0

        # Statistics
        self._total_processed = 0
        self._total_expired = 0

    def _current_time(self) -> float:
        """Get current timestamp (can be overridden for testing)."""
        return time()

    def _is_expired(self, timestamp: float, now: float) -> bool:
        """Check if element has expired."""
        return now - timestamp > self._window

    def process(self, element: T, timestamp: float = None) -> bool:
        """
        Process one element.

        Args:
            element: The element to process
            timestamp: Element timestamp (defaults to current time)

        Returns:
            True if element was admitted to Top-K window
        """
        if timestamp is None:
            timestamp = self._current_time()

        self._total_processed += 1
        self._insert_count += 1
        key_value = self._key(element)

        # Insert into heap (we'll handle expiration lazily)
        heapq.heappush(self._heap, (key_value, timestamp, self._seq, element))
        self._seq += 1

        # Periodic cleanup
        if self._insert_count >= self._cleanup_interval:
            self._cleanup()
            self._insert_count = 0

        return True  # Always admitted to window

    def _cleanup(self) -> int:
        """
        Remove expired elements from heap.
        Returns number of elements removed.
        """
        now = self._current_time()

        # Filter out expired elements
        active = [
            entry for entry in self._heap
            if not self._is_expired(entry[1], now)
        ]

        expired_count = len(self._heap) - len(active)
        self._total_expired += expired_count

        # Rebuild heap
        self._heap = active
        heapq.heapify(self._heap)

        return expired_count

    def get_top_k(self, as_of: float = None) -> List[T]:
        """
        Get Top-K elements within the current window.

        Args:
            as_of: Timestamp to use (defaults to current time)

        Returns:
            Top-K elements in descending order by key
        """
        if as_of is None:
            as_of = self._current_time()

        # Get all non-expired elements
        active = [
            entry for entry in self._heap
            if not self._is_expired(entry[1], as_of)
        ]

        # Sort by key descending and take top K
        active.sort(key=lambda entry: entry[0], reverse=True)
        top_k = active[:self._k]

        return [entry[3] for entry in top_k]

    def get_threshold(self, as_of: float = None) -> Optional[float]:
        """
        Get current admission threshold for Top-K.
        Returns None if fewer than K elements in window.
        """
        if as_of is None:
            as_of = self._current_time()

        active = [
            entry for entry in self._heap
            if not self._is_expired(entry[1], as_of)
        ]

        if len(active) < self._k:
            return None

        # Threshold is the K-th largest key
        active.sort(key=lambda entry: entry[0], reverse=True)
        return active[min(self._k - 1, len(active) - 1)][0]

    def get_statistics(self) -> dict:
        """Get window statistics."""
        now = self._current_time()
        active_count = sum(
            1 for entry in self._heap
            if not self._is_expired(entry[1], now)
        )
        return {
            "total_processed": self._total_processed,
            "total_expired": self._total_expired,
            "heap_size": len(self._heap),
            "active_in_window": active_count,
            "window_seconds": self._window,
        }

class CountWindowTopK(Generic[T]):
    """
    Top-K over a count-based sliding window (last N elements).

    More memory-efficient than time-based when window is small.
    Uses a circular buffer for O(1) eviction.
    """

    def __init__(
        self,
        k: int,
        window_size: int,
        key: Callable[[T], float] = lambda x: float(x)
    ):
        """
        Initialize count-windowed Top-K tracker.

        Args:
            k: Number of top elements to track
            window_size: Number of recent elements to consider
            key: Function to extract comparison key
        """
        self._k = k
        self._window_size = window_size
        self._key = key

        # Circular buffer with key values
        self._buffer: deque = deque(maxlen=window_size)
        self._count = 0

    def process(self, element: T) -> Optional[T]:
        """
        Process one element.
        Returns evicted element if buffer was full, else None.
        """
        self._count += 1
        key_value = self._key(element)

        evicted = None
        if len(self._buffer) == self._window_size:
            # Buffer full, oldest will be evicted
            evicted = self._buffer[0][1]

        self._buffer.append((key_value, element))
        return evicted

    def get_top_k(self) -> List[T]:
        """Get Top-K from current window."""
        # Sort buffer by key descending
        sorted_buffer = sorted(
            self._buffer,
            key=lambda x: x[0],
            reverse=True
        )
        return [element for _, element in sorted_buffer[:self._k]]

    def get_window_contents(self) -> List[T]:
        """Get all elements in current window."""
        return [element for _, element in self._buffer]

# Demo: Trending topics over a time window

def demo_trending_hashtags():
    """Simulate tracking trending hashtags over a 2-second window."""
    from collections import Counter
    import random

    # Track top 5 hashtag events in the last 2 seconds
    trending = TimeWindowTopK[dict](
        k=5,
        window_seconds=2.0,
        key=lambda x: x["count"]
    )

    hashtags = ["#python", "#javascript", "#ai", "#coding",
                "#tech", "#news", "#sports", "#music"]

    print("Simulating hashtag stream (3 seconds of data)...")
    start = time()

    # Simulate varying hashtag frequencies
    hashtag_counts = Counter()
    while time() - start < 3:
        # Random hashtag with varying popularity
        tag = random.choices(
            hashtags,
            weights=[20, 15, 25, 10, 8, 5, 12, 5]
        )[0]
        hashtag_counts[tag] += 1

        # Create event with rolling count. Note: the same tag can appear
        # more than once in the Top-K; a production version would dedupe
        # by tag, keeping only each tag's latest count.
        event = {"tag": tag, "count": hashtag_counts[tag]}
        trending.process(event)

    print(f"\nStats: {trending.get_statistics()}")
    print("\nTop 5 hashtag events in last 2 seconds:")
    for event in trending.get_top_k():
        print(f"  {event['tag']}: {event['count']} occurrences")

demo_trending_hashtags()

Sliding windows have a binary view of relevance—elements are either in the window or not. But many applications need gradual decay, where older elements matter less but don't suddenly disappear.
Use Cases for Time-Decay:
- "Hot" content rankings (Reddit- or Hacker News-style front pages)
- Trending-topic scores that should fade smoothly rather than expire abruptly
- Recency-weighted anomaly or fraud scores
Common Decay Functions:
Exponential Decay: score × e^(-λ × age)
Linear Decay: score × max(0, 1 - age/window)
Newton Cooling: score × (1 + age)^(-gravity)
import math
from dataclasses import dataclass
from time import time
from typing import TypeVar, Generic, List, Callable, Tuple, Optional

T = TypeVar('T')

@dataclass
class DecayedElement(Generic[T]):
    """Element with decay support."""
    element: T
    base_score: float
    timestamp: float

    def decayed_score(
        self,
        now: float,
        decay_fn: Callable[[float, float], float]
    ) -> float:
        """Calculate current score with decay applied."""
        age = now - self.timestamp
        return decay_fn(self.base_score, age)

class DecayFunctions:
    """Common decay functions for time-weighted scoring."""

    @staticmethod
    def exponential(half_life: float) -> Callable[[float, float], float]:
        """
        Exponential decay with configurable half-life.
        Score halves every half_life seconds.
        """
        lambda_param = math.log(2) / half_life

        def decay(base_score: float, age: float) -> float:
            return base_score * math.exp(-lambda_param * age)
        return decay

    @staticmethod
    def linear(window: float) -> Callable[[float, float], float]:
        """Linear decay to zero at window boundary."""
        def decay(base_score: float, age: float) -> float:
            factor = max(0.0, 1.0 - age / window)
            return base_score * factor
        return decay

    @staticmethod
    def newton_cooling(gravity: float = 1.5) -> Callable[[float, float], float]:
        """
        Newton cooling (Hacker News style).
        Higher gravity = faster decay. Note: expects age in hours.
        """
        def decay(base_score: float, age_hours: float) -> float:
            return base_score / math.pow(1 + age_hours, gravity)
        return decay

    @staticmethod
    def step(threshold: float) -> Callable[[float, float], float]:
        """Step function: full score before threshold, zero after."""
        def decay(base_score: float, age: float) -> float:
            return base_score if age <= threshold else 0.0
        return decay

class TimeDecayTopK(Generic[T]):
    """
    Top-K with time-decay scoring.

    Elements have a base score that decays over time according to
    a configurable decay function. Rankings reflect current (decayed)
    scores, not original scores.

    Implementation:
    - Store elements with their base scores and timestamps
    - Recompute decayed scores on query
    - Periodically prune elements with negligible scores
    """

    def __init__(
        self,
        k: int,
        decay_fn: Callable[[float, float], float],
        score_fn: Callable[[T], float],
        min_score_threshold: float = 0.01,
        prune_interval: int = 1000
    ):
        """
        Initialize time-decay Top-K tracker.

        Args:
            k: Number of top elements to maintain
            decay_fn: Function(base_score, age) -> decayed_score
            score_fn: Function to extract base score from element
            min_score_threshold: Prune elements below this decayed score
            prune_interval: Prune every N insertions
        """
        self._k = k
        self._decay_fn = decay_fn
        self._score_fn = score_fn
        self._min_threshold = min_score_threshold
        self._prune_interval = prune_interval

        # Store all elements with their timestamps
        self._elements: List[DecayedElement[T]] = []
        self._insert_count = 0

    def _current_time(self) -> float:
        return time()

    def process(
        self,
        element: T,
        timestamp: float = None
    ) -> None:
        """Process one element."""
        if timestamp is None:
            timestamp = self._current_time()

        base_score = self._score_fn(element)
        entry = DecayedElement(element, base_score, timestamp)
        self._elements.append(entry)

        self._insert_count += 1
        if self._insert_count >= self._prune_interval:
            self._prune()
            self._insert_count = 0

    def _prune(self) -> int:
        """Remove elements with negligible decayed scores."""
        now = self._current_time()
        before = len(self._elements)

        self._elements = [
            e for e in self._elements
            if e.decayed_score(now, self._decay_fn) >= self._min_threshold
        ]

        return before - len(self._elements)

    def get_top_k(self, as_of: float = None) -> List[Tuple[T, float]]:
        """
        Get Top-K elements by current decayed score.

        Returns:
            List of (element, current_score) tuples, descending
        """
        if as_of is None:
            as_of = self._current_time()

        # Compute current scores
        scored = [
            (e.element, e.decayed_score(as_of, self._decay_fn))
            for e in self._elements
        ]

        # Filter negligible scores and sort
        scored = [
            (elem, score) for elem, score in scored
            if score >= self._min_threshold
        ]
        scored.sort(key=lambda x: x[1], reverse=True)

        return scored[:self._k]

    def get_score(self, element: T, as_of: float = None) -> Optional[float]:
        """Get current decayed score for an element."""
        if as_of is None:
            as_of = self._current_time()

        for e in self._elements:
            if e.element == element:
                return e.decayed_score(as_of, self._decay_fn)
        return None

# Demo: Reddit-style hot posts ranking

def demo_hot_posts():
    """Simulate a hot posts ranking with time decay."""
    # Use exponential decay with 1-hour half-life
    decay_fn = DecayFunctions.exponential(half_life=3600)

    hot_posts = TimeDecayTopK[dict](
        k=5,
        decay_fn=decay_fn,
        score_fn=lambda post: post["upvotes"],
        min_score_threshold=1.0
    )

    # Simulate posts from different times
    current = time()
    posts = [
        # Recent posts with fewer upvotes
        {"id": 1, "title": "Breaking news!", "upvotes": 100, "age_hours": 0.1},
        {"id": 2, "title": "New discovery", "upvotes": 80, "age_hours": 0.5},
        # Older posts with more upvotes
        {"id": 3, "title": "Yesterday's hit", "upvotes": 500, "age_hours": 5},
        {"id": 4, "title": "Last week viral", "upvotes": 10000, "age_hours": 24},
        {"id": 5, "title": "Classic post", "upvotes": 50000, "age_hours": 168},  # 1 week old
        # Mid-age, mid-score
        {"id": 6, "title": "Gaining traction", "upvotes": 200, "age_hours": 2},
    ]

    print("Adding posts with varying ages and upvotes...")
    for post in posts:
        # Simulate post timestamp
        timestamp = current - (post["age_hours"] * 3600)
        hot_posts.process(post, timestamp)
        print(f"  [{post['id']}] {post['title']}: "
              f"{post['upvotes']} upvotes, {post['age_hours']}h old")

    print("\nTop 5 'Hot' posts (with time decay):")
    for post, score in hot_posts.get_top_k():
        original = post["upvotes"]
        decay_pct = (score / original) * 100
        print(f"  [{post['id']}] {post['title']}")
        print(f"    Original: {original}, Decayed: {score:.1f} "
              f"({decay_pct:.1f}% retained)")

    print("\nNotice: Recent posts with fewer upvotes rank higher")
    print("than older posts with many more upvotes!")

demo_hot_posts()

The right decay parameters depend on your domain: a breaking-news ticker might use a half-life of minutes, a "hot posts" feed a few hours (the demo above uses one hour), and an evergreen recommendation ranking days or weeks.
Start with an intuitive half-life and tune based on user engagement metrics.
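To build intuition before tuning, a quick back-of-the-envelope table helps. This throwaway snippet (the ages and half-lives are arbitrary example values) shows how much of a score survives exponential decay:

import math

def retained(age_hours: float, half_life_hours: float) -> float:
    """Fraction of the base score surviving exponential decay."""
    return math.exp(-math.log(2) / half_life_hours * age_hours)

for half_life in (1, 6, 24):
    print(f"half-life {half_life:>2}h:",
          ", ".join(f"{age}h → {retained(age, half_life):.0%}"
                    for age in (1, 6, 24)))
# half-life  1h: 1h → 50%, 6h → 2%, 24h → 0%
# half-life  6h: 1h → 89%, 6h → 50%, 24h → 6%
# half-life 24h: 1h → 97%, 6h → 84%, 24h → 50%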
When processing millions of events per second, even O(log K) operations add up. Here are optimization techniques for high-throughput streaming:
Technique 1: Threshold-Based Early Rejection
Before any heap operation, check if the element could possibly qualify:
def process_optimized(self, element):
    key = self.key_fn(element)
    # O(1) early rejection
    if self.is_full() and key <= self.threshold:
        return False  # Skip heap operation entirely
    # Only qualified elements touch the heap
    return self.process_internal(element)
In many distributions (e.g., normal), 90%+ of elements are rejected without heap operations.
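You can sanity-check that claim in a few lines. This throwaway experiment (not part of the processor itself) streams normally distributed values through the standard push/replace loop and counts how often the O(1) comparison short-circuits the heap entirely:

import heapq
import random

def rejection_rate(n=100_000, k=100):
    """Fraction of gaussian-distributed elements rejected by the O(1) check."""
    heap, rejected = [], 0
    for _ in range(n):
        x = random.gauss(50, 20)
        if len(heap) < k:
            heapq.heappush(heap, x)
        elif x > heap[0]:
            heapq.heapreplace(heap, x)
        else:
            rejected += 1   # The O(1) early-rejection path
    return rejected / n

print(f"Rejected without any heap update: {rejection_rate():.1%}")  # typically ~99%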
Technique 2: Batch Processing
Process elements in batches, heap-sorting only the batch winners:
def process_batch(self, elements, batch_size=1000):
    for i in range(0, len(elements), batch_size):
        batch = elements[i:i+batch_size]
        # Pre-filter: keep only elements above threshold
        qualified = [e for e in batch if self.would_qualify(e)]
        # Bulk insert qualified elements
        for e in qualified:
            self.process(e)
Technique 3: Approximate Top-K
When exact Top-K isn't required, probabilistic data structures can trade a small, bounded error for order-of-magnitude speedups (a minimal sketch follows the comparison table below):
| Technique | Speedup | Trade-off | Best For |
|---|---|---|---|
| Threshold rejection | 10-100x | None (exact) | Skewed distributions |
| Batch processing | 2-5x | Latency (batch delay) | Bulk ingestion |
| Lock-free heap | 2-4x | Complexity | Multi-threaded |
| Approximate structures | 10-1000x | Accuracy (~1% error) | Frequency counting |
| Sampling | 100x+ | Probabilistic bounds | Very large K |
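As one concrete example of the approximate row above, here is a minimal sketch of frequency-based Top-K using a Count-Min Sketch plus a small candidate set (the names CountMinSketch and approx_top_k, and the width/depth defaults, are illustrative choices rather than a specific library's API). Estimates never undercount, only overcount, so the error is biased in a predictable direction:

import hashlib

class CountMinSketch:
    """Minimal Count-Min Sketch: estimates never undercount, may overcount."""
    def __init__(self, width: int = 2048, depth: int = 4):
        self.width, self.depth = width, depth
        self.table = [[0] * width for _ in range(depth)]

    def _indexes(self, item: str):
        for seed in range(self.depth):
            digest = hashlib.blake2b(f"{seed}:{item}".encode(), digest_size=8)
            yield seed, int.from_bytes(digest.digest(), "big") % self.width

    def add(self, item: str) -> None:
        for row, idx in self._indexes(item):
            self.table[row][idx] += 1

    def estimate(self, item: str) -> int:
        # The minimum across rows is the least-overcounted estimate
        return min(self.table[row][idx] for row, idx in self._indexes(item))

def approx_top_k(stream, k: int):
    """Approximate Top-K most frequent items in O(width * depth + k) memory."""
    cms = CountMinSketch()
    candidates: dict = {}          # item -> latest estimate, at most k entries
    for item in stream:
        cms.add(item)
        est = cms.estimate(item)
        if item in candidates or len(candidates) < k:
            candidates[item] = est
        elif est > min(candidates.values()):
            # Evict the weakest candidate to admit the newcomer
            del candidates[min(candidates, key=candidates.get)]
            candidates[item] = est
    return sorted(candidates.items(), key=lambda kv: kv[1], reverse=True)

# e.g. approx_top_k(["a", "b", "a", "c", "a", "b"], k=2) -> [("a", 3), ("b", 2)]

The benchmark below returns to the exact techniques, measuring threshold rejection plus a hand-tuned heap against the plain heapq loop: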
import heapq
import random
from time import perf_counter
from typing import Any, List, Tuple

class HighThroughputTopK:
    """
    Optimized Top-K for high-throughput streaming.

    Optimizations:
    1. Threshold-based early rejection (O(1) for non-qualifiers)
    2. Pre-allocated heap array (no dynamic allocation)
    3. Batched statistics updates
    4. Minimal object creation
    """

    __slots__ = ['_k', '_heap', '_size', '_threshold', '_count', '_rejected']

    def __init__(self, k: int):
        self._k = k
        self._heap = [(float('-inf'), None)] * k  # Pre-allocate
        self._size = 0
        self._threshold = float('-inf')
        self._count = 0
        self._rejected = 0

    def process(self, key: float, value=None) -> bool:
        """
        Process single element with early rejection.
        Returns True if element was admitted.
        """
        self._count += 1

        # O(1) early rejection - most common path
        if key <= self._threshold:
            self._rejected += 1
            return False

        if self._size < self._k:
            # Still filling - use heap push
            self._heap[self._size] = (key, value)
            self._size += 1
            self._bubble_up(self._size - 1)
            # Update threshold when full
            if self._size == self._k:
                self._threshold = self._heap[0][0]
        else:
            # Replace minimum
            self._heap[0] = (key, value)
            self._bubble_down(0)
            self._threshold = self._heap[0][0]

        return True

    def _bubble_up(self, idx: int) -> None:
        heap = self._heap
        while idx > 0:
            parent = (idx - 1) >> 1
            if heap[parent][0] <= heap[idx][0]:
                break
            heap[parent], heap[idx] = heap[idx], heap[parent]
            idx = parent

    def _bubble_down(self, idx: int) -> None:
        heap = self._heap
        size = self._size
        while True:
            smallest = idx
            left = (idx << 1) + 1
            right = left + 1
            if left < size and heap[left][0] < heap[smallest][0]:
                smallest = left
            if right < size and heap[right][0] < heap[smallest][0]:
                smallest = right
            if smallest == idx:
                break
            heap[idx], heap[smallest] = heap[smallest], heap[idx]
            idx = smallest

    def get_top_k(self) -> List[Tuple[float, Any]]:
        """Return Top-K sorted descending by key."""
        return sorted(
            self._heap[:self._size],
            key=lambda entry: entry[0],
            reverse=True
        )

    def get_stats(self) -> dict:
        return {
            "processed": self._count,
            "rejected": self._rejected,
            "rejection_rate": self._rejected / max(self._count, 1),
            "threshold": self._threshold,
        }

def benchmark_comparison():
    """Compare standard vs optimized implementation."""
    # Generate test data - exponential distribution:
    # most values are small, few are large
    n = 1_000_000
    k = 100
    data = [random.expovariate(1.0) for _ in range(n)]

    # Standard implementation
    start = perf_counter()
    standard_heap = []
    for value in data:
        if len(standard_heap) < k:
            heapq.heappush(standard_heap, value)
        elif value > standard_heap[0]:
            heapq.heapreplace(standard_heap, value)
    standard_time = perf_counter() - start

    # Optimized implementation
    start = perf_counter()
    optimized = HighThroughputTopK(k)
    for value in data:
        optimized.process(value)
    optimized_time = perf_counter() - start

    print(f"Processing {n:,} elements, K={k}")
    print(f"\nStandard heap: {standard_time:.3f}s "
          f"({n/standard_time:,.0f} elem/s)")
    print(f"Optimized:     {optimized_time:.3f}s "
          f"({n/optimized_time:,.0f} elem/s)")
    print(f"Speedup:       {standard_time/optimized_time:.1f}x")

    stats = optimized.get_stats()
    print(f"\nRejection rate: {stats['rejection_rate']*100:.1f}%")
    print(f"Final threshold: {stats['threshold']:.3f}")

    # Verify correctness
    standard_result = sorted(standard_heap, reverse=True)
    optimized_result = [key for key, _ in optimized.get_top_k()]
    assert standard_result == optimized_result, "Results don't match!"
    print("\n✓ Results verified identical")

benchmark_comparison()

Real-world streaming systems are distributed across multiple nodes. Finding global Top-K from distributed local Top-K sets requires careful coordination.
The Challenge:
Node 1: Local Top-5 = [95, 90, 85, 80, 75]
Node 2: Local Top-5 = [92, 88, 84, 82, 78]
Node 3: Local Top-5 = [97, 91, 86, 81, 76]
Global Top-5 = [97, 95, 92, 91, 90] ← Merge correctly
Key Insight: To find global Top-K, each node must keep at least K elements locally. The global Top-K is guaranteed to be in the union of local Top-K sets.
Merge Algorithm:
import heapq
from typing import List, TypeVar

T = TypeVar('T')

def merge_distributed_top_k(local_results: List[List[T]], k: int) -> List[T]:
    # Use a single heap to merge all local results
    heap = []
    for local in local_results:
        for element in local:
            if len(heap) < k:
                heapq.heappush(heap, element)
            elif element > heap[0]:
                heapq.heapreplace(heap, element)
    return sorted(heap, reverse=True)
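As a quick check, merging the three node results from the diagram above reproduces the expected global Top-5:

node_results = [
    [95, 90, 85, 80, 75],   # Node 1
    [92, 88, 84, 82, 78],   # Node 2
    [97, 91, 86, 81, 76],   # Node 3
]
print(merge_distributed_top_k(node_results, k=5))  # [97, 95, 92, 91, 90]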
If a node keeps fewer than K elements locally, we can miss global Top-K elements: if the five largest values in the system all land on one node but each node keeps only its top two, three of the global Top-5 are lost. To guarantee correctness, local K must be at least global K. When per-node results are approximate or arrive at different times, a safety margin such as local K = 2× global K improves accuracy.
Hierarchical Aggregation:
For very large distributed systems, use a tree structure:
Level 0: 1000 worker nodes (local Top-K)
↓ (merge groups of 10)
Level 1: 100 aggregators (merged Top-K)
↓ (merge groups of 10)
Level 2: 10 super-aggregators
↓
Level 3: 1 final result
Each level reduces the number of Top-K sets by the fan-in factor (10 here), so the depth of the tree, and thus end-to-end merge latency, grows only logarithmically with the number of workers.
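A minimal sketch of that tree as code, reusing merge_distributed_top_k from above (the function name merge_hierarchical and the fan_in parameter are illustrative): each pass merges groups of fan_in Top-K lists into one, repeating until a single list remains.

def merge_hierarchical(local_results, k, fan_in=10):
    """Merge many local Top-K lists level by level, fan_in groups at a time."""
    level = local_results
    while len(level) > 1:
        level = [
            merge_distributed_top_k(level[i:i + fan_in], k)
            for i in range(0, len(level), fan_in)
        ]
    return level[0] if level else []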
Incremental Updates:
For continuous streaming with periodic snapshots:
import heapq
from typing import List

class DistributedTopKCoordinator:
    def __init__(self, k: int, num_nodes: int):
        self.k = k
        self.local_results = [[] for _ in range(num_nodes)]
        self.global_top_k = []

    def update_from_node(self, node_id: int, local_top_k: List):
        """Receive updated Top-K from a single node."""
        self.local_results[node_id] = local_top_k
        self._recompute_global()

    def _recompute_global(self):
        """Merge all local results into global Top-K."""
        all_elements = []
        for local in self.local_results:
            all_elements.extend(local)
        # heapq.nlargest is optimized for this
        self.global_top_k = heapq.nlargest(self.k, all_elements)
We've explored how heaps enable real-time Top-K tracking over unbounded data streams. Let's consolidate:
- A bounded min-heap maintains Top-K in O(K) space and O(log K) per element, for streams of any length
- Sliding windows (time- or count-based) bound relevance, using lazy expiration to work around the heap's lack of arbitrary deletion
- Time-decay scoring replaces the window's hard cutoff with gradual, recency-weighted rankings
- Threshold-based early rejection, batching, and approximate structures push throughput toward millions of events per second
- Distributed Top-K merges local Top-K sets, hierarchically for large clusters
What's Next:
The final page of this module explores real-world applications of heap-based Top-K—from task scheduling to median maintenance, showing how these patterns power production systems at scale.
You now understand how to apply heaps to infinite data streams with bounded memory. These techniques are used in every large-scale analytics system—from social media trending to real-time fraud detection.