Throughout this module, we've repeatedly used a powerful technique: sorting events by time and processing them in order. This is the sweep line (or scan line) paradigm—one of the most elegant and widely applicable algorithmic techniques in computer science.
The sweep line transforms multi-dimensional problems into simpler one-dimensional processing. Imagine a vertical line sweeping across a plane from left to right. As it moves, it encounters "events" (interval starts, ends, points, line segment intersections). At each event, we update some state structure. This event-driven processing often turns O(n²) brute force into O(n log n) elegance.
In this page, we'll formalize the sweep line paradigm, see it as the unifying framework behind interval problems, extend it to more complex scenarios, and explore advanced applications beyond simple counting.
By the end of this page, you will understand the sweep line paradigm formally, recognize when sweep line applies, implement sweep algorithms with various state structures, handle event types beyond simple starts/ends, apply sweep to geometric problems, and connect sweep line to broader algorithm design patterns.
The sweep line paradigm has a consistent structure across applications:
1. Event Generation
Convert input objects into events with positions (typically x-coordinates or times). Each event has:
2. Event Sorting
Sort events by position. Tie-breaking rules handle simultaneous events:
3. State Structure
Maintain a data structure representing "what's currently active" or "what's currently known" at the sweep position. This might be:
4. Event Processing
Sweep through events in sorted order. At each event:
5. Result Extraction
After processing all events (or during), extract the answer from accumulated state or queries.
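The five steps above can be sketched on a small problem: the total length covered by a union of intervals. This is a minimal illustration, not a canonical implementation; the function name `covered_length` and the half-open `[start, end)` convention are assumptions made for the example.

```python
def covered_length(intervals: list[tuple[int, int]]) -> int:
    """Total length covered by the union of half-open intervals [start, end)."""
    # 1. Event generation: +1 where an interval opens, -1 where it closes.
    events = []
    for start, end in intervals:
        events.append((start, +1))
        events.append((end, -1))

    # 2. Event sorting: by position; at equal positions -1 sorts before +1.
    events.sort()

    # 3. State structure: how many intervals are open at the sweep position.
    active = 0
    total = 0
    prev_pos = None

    # 4. Event processing: account for the stretch since the previous event
    #    (only if something was active there), then apply the event.
    for pos, delta in events:
        if active > 0:
            total += pos - prev_pos
        active += delta
        prev_pos = pos

    # 5. Result extraction: the accumulated length is the answer.
    return total


print(covered_length([(1, 5), (2, 6), (8, 10)]))  # 7
```

The state here is a single counter because the question only asks whether anything is active, not which intervals are.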
| Problem | Events | State Structure | Query/Update |
|---|---|---|---|
| Meeting Rooms II | Start (+1), End (-1) | Counter | Track max counter value |
| Interval Stabbing | Interval as [start, end] | Current interval end | Place point when gap detected |
| Range Coverage | Interval bounds | Current coverage reach | Extend reach greedily |
| Rectangle Union Area | Left/right edges | Multiset of active y-ranges | Compute total y-coverage |
| Line Segment Intersection | Segment endpoints | Balanced BST of active segments | Check neighbors for intersection |
The key insight of sweep line is that ordering by one dimension converts a 2D problem into a 1D process. At each position, we only need to consider 'active' elements—those overlapping the current sweep position. This reduction is what makes sweep algorithms efficient.
Event design is crucial for correctness. Different problems require different event types and orderings.
Common Event Types:
Priority Ordering (Tie-Breaking):
When multiple events share the same position, the processing order matters:
Example 1: Counting Overlap at a Point
To correctly count intervals containing a query point p when some intervals [a, b] end at b = p and new intervals [p, c] start at p:
- If intervals are closed, so that [a, p] counts as containing p, process starts before ends.
- If intervals are half-open, so that [a, p) excludes p, process ends before starts.
Example 2: Platform Sharing
If train A departs at 10:00 and train B arrives at 10:00, they can share a platform. Process departures before arrivals at the same time.
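A minimal sketch of the platform rule, assuming times are given as minutes since midnight and every train departs strictly after it arrives. The function name `min_platforms` and the numeric event kinds are illustrative choices, not part of the original text.

```python
def min_platforms(arrivals: list[int], departures: list[int]) -> int:
    """Minimum platforms needed when a departure frees its platform
    for an arrival at the same minute."""
    DEPART, ARRIVE = 0, 1  # DEPART sorts first at equal times
    events = [(t, ARRIVE) for t in arrivals]
    events += [(t, DEPART) for t in departures]
    events.sort()

    in_station = 0
    needed = 0
    for _, kind in events:
        if kind == ARRIVE:
            in_station += 1
            needed = max(needed, in_station)
        else:
            in_station -= 1
    return needed


# Train A: 9:00-10:00, train B: 10:00-11:00. They can share one platform.
print(min_platforms([540, 600], [600, 660]))  # 1
```

Swapping the two kind values (so arrivals sort first) would model the stricter rule where a platform cannot be reused in the same minute, and the example above would then need two platforms.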
```python
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any


class EventType(IntEnum):
    """Event types with natural priority ordering.

    IntEnum (rather than Enum) so that members compare as integers.
    """
    # Lower value = higher priority (processed first at same position)
    END = 0    # Process ends first (interval closes)
    POINT = 1  # Then query points
    START = 2  # Then starts (interval opens)


@dataclass(order=True)
class Event:
    """
    An event for sweep line processing.

    Ordering: by position first, then by type priority.
    Using dataclass(order=True) auto-generates comparison methods.
    """
    position: float
    event_type: EventType
    data: Any = field(default=None, compare=False)  # Excluded from ordering


def create_interval_events(intervals: list[tuple[int, int]]) -> list[Event]:
    """
    Convert intervals to events.

    Each interval [start, end] becomes two events:
    - START at position 'start'
    - END at position 'end'
    """
    events = []
    for i, (start, end) in enumerate(intervals):
        events.append(Event(start, EventType.START, data={'interval_id': i}))
        events.append(Event(end, EventType.END, data={'interval_id': i}))

    return sorted(events, key=lambda e: (e.position, e.event_type.value))


def create_point_query_events(
    intervals: list[tuple[int, int]],
    query_points: list[int]
) -> list[Event]:
    """
    Create events for intervals and query points.

    Allows answering: 'How many intervals contain point p?' for multiple p.
    With the EventType order END < POINT < START, a point that coincides
    with an endpoint is treated as outside that interval.
    """
    events = []

    for i, (start, end) in enumerate(intervals):
        events.append(Event(start, EventType.START, data={'interval_id': i}))
        events.append(Event(end, EventType.END, data={'interval_id': i}))

    for j, point in enumerate(query_points):
        events.append(Event(point, EventType.POINT, data={'query_id': j}))

    # Sort by position, then by type priority
    return sorted(events, key=lambda e: (e.position, e.event_type.value))


# Example: Count intervals containing each query point
def intervals_containing_points(
    intervals: list[tuple[int, int]],
    query_points: list[int]
) -> list[int]:
    """
    For each query point, count how many intervals contain it.

    The right priority order depends on open vs. closed intervals.
    This function assumes half-open intervals [start, end):
    - END before POINT (at the same position, p is NOT inside [a, p))
    - START before POINT (at the same position, p IS inside [p, c))

    For closed intervals [start, end], use START, POINT, END instead.
    """
    # Priorities for the half-open variant
    PRIORITY_END = 0
    PRIORITY_START = 1
    PRIORITY_POINT = 2

    events = []
    for i, (start, end) in enumerate(intervals):
        events.append((start, PRIORITY_START, 'START', i))
        events.append((end, PRIORITY_END, 'END', i))

    for j, point in enumerate(query_points):
        events.append((point, PRIORITY_POINT, 'POINT', j))

    events.sort()  # Sort by (position, priority)

    active_count = 0
    results = [0] * len(query_points)

    for pos, priority, event_type, idx in events:
        if event_type == 'START':
            active_count += 1
        elif event_type == 'END':
            active_count -= 1
        elif event_type == 'POINT':
            results[idx] = active_count

    return results


# Example usage
intervals = [(1, 5), (2, 6), (4, 8)]
points = [0, 3, 5, 7]

result = intervals_containing_points(intervals, points)
print(f"Points: {points}")
print(f"Counts: {result}")  # [0, 2, 2, 1] for [1,5), [2,6), [4,8)
```

There's no universal 'correct' priority order. It depends on: (1) Are intervals open or closed? (2) What exactly are you computing? (3) What's the natural interpretation? Always reason through a small example with ties to verify your ordering.
The power of sweep line comes from maintaining efficient state as we process events. Different problems require different state structures:
1. Counter / Variable
2. Set / Multiset
3. Ordered Set / Balanced BST
4. Segment Tree / BIT
```python
from sortedcontainers import SortedList  # pip install sortedcontainers

# ==================================================
# Example 1: Simple Counter State
# ==================================================

def max_overlap_counter(intervals: list[tuple[int, int]]) -> int:
    """
    Maximum overlap using counter state.

    State: single integer 'count'
    """
    events = []
    for start, end in intervals:
        events.append((start, 1))   # +1 at start
        events.append((end, -1))    # -1 at end

    events.sort(key=lambda x: (x[0], x[1]))  # End before start at ties

    count = 0
    max_count = 0

    for _, delta in events:
        count += delta
        max_count = max(max_count, count)

    return max_count


# ==================================================
# Example 2: Set State - Track Active Interval IDs
# ==================================================

def intervals_at_each_point(
    intervals: list[tuple[int, int]],
    points: list[int]
) -> dict[int, list[int]]:
    """
    For each query point, return LIST of interval indices containing it.

    State: set of active interval IDs
    Intervals are treated as closed: START, then QUERY, then END at ties.
    """
    events = []

    for i, (start, end) in enumerate(intervals):
        events.append((start, 0, 'START', i))  # 0 = process first
        events.append((end, 2, 'END', i))      # 2 = process last

    for j, p in enumerate(points):
        events.append((p, 1, 'QUERY', j))      # 1 = process middle

    events.sort()

    active = set()  # State: set of active interval IDs
    results = {}

    for pos, _, event_type, idx in events:
        if event_type == 'START':
            active.add(idx)
        elif event_type == 'END':
            active.discard(idx)
        elif event_type == 'QUERY':
            results[points[idx]] = list(active)

    return results


# ==================================================
# Example 3: Ordered State - Skyline Problem
# ==================================================

def get_skyline(buildings: list[list[int]]) -> list[list[int]]:
    """
    The Skyline Problem: given buildings as [left, right, height],
    return the skyline silhouette as list of [x, height] key points.

    State: multiset of active building heights (ordered)
    At each x, the skyline height = max of active heights (or 0 if none)

    Uses SortedList for O(log n) insertion/removal of heights.
    """
    # Events: (x, type, height)
    # type: 0 = building starts (entering from left)
    #       1 = building ends (exiting to right)
    events = []
    for left, right, height in buildings:
        events.append((left, 0, height))   # Start: add height
        events.append((right, 1, height))  # End: remove height

    # Sort: by x, then starts before ends, then taller starts first
    # (and shorter ends first)
    events.sort(key=lambda e: (e[0], e[1], -e[2] if e[1] == 0 else e[2]))

    # State: sorted list of active heights
    # We use a sorted list to easily get max
    active_heights = SortedList([0])  # 0 as sentinel for ground level

    result = []
    prev_max_height = 0

    for x, event_type, height in events:
        if event_type == 0:  # Building starts
            active_heights.add(height)
        else:  # Building ends
            active_heights.remove(height)

        # Current max height
        current_max = active_heights[-1]

        # If max height changed, record key point
        if current_max != prev_max_height:
            result.append([x, current_max])
            prev_max_height = current_max

    return result


# ==================================================
# Example 4: Counter per Y-coordinate (for area computation)
# ==================================================

def rectangles_area_union(rectangles: list[list[int]]) -> int:
    """
    Compute total area of union of axis-aligned rectangles.

    Sweep along x-axis, maintaining active y-ranges.
    At each x-event, compute change in covered area.

    Simplified version using coordinate compression.
    """
    # Collect all y-coordinates for compression
    y_coords = set()
    events = []

    for x1, y1, x2, y2 in rectangles:
        y_coords.add(y1)
        y_coords.add(y2)
        events.append((x1, 0, y1, y2))  # LEFT edge (add range)
        events.append((x2, 1, y1, y2))  # RIGHT edge (remove range)

    # Coordinate compression
    y_list = sorted(y_coords)
    y_to_idx = {y: i for i, y in enumerate(y_list)}

    # count[i] = number of active rectangles covering [y_list[i], y_list[i+1])
    count = [0] * len(y_list)

    events.sort()

    total_area = 0
    prev_x = events[0][0] if events else 0

    for x, event_type, y1, y2 in events:
        # Compute covered y-length
        covered_y = 0
        for i in range(len(y_list) - 1):
            if count[i] > 0:
                covered_y += y_list[i + 1] - y_list[i]

        # Add area since last x
        total_area += covered_y * (x - prev_x)
        prev_x = x

        # Update counts
        idx1, idx2 = y_to_idx[y1], y_to_idx[y2]
        delta = 1 if event_type == 0 else -1
        for i in range(idx1, idx2):
            count[i] += delta

    return total_area


# Example usage
print("Max overlap:", max_overlap_counter([(1, 5), (2, 6), (4, 8)]))

intervals = [(0, 10), (3, 7), (5, 15)]
points = [1, 5, 8, 12]
print("Intervals at points:", intervals_at_each_point(intervals, points))

buildings = [[2, 9, 10], [3, 7, 15], [5, 12, 12], [15, 20, 10], [19, 24, 8]]
print("Skyline:", get_skyline(buildings))
```

Choosing a state structure:
- Counter: when you only need an aggregate (count, sum, max).
- Set: when you need to know which intervals are active.
- Ordered structure (SortedList, BST): when you need the max/min of active values, or predecessor/successor queries.
- Segment tree: when you need efficient range queries on the state.
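The examples above do not exercise the fourth kind of state, a segment tree or BIT. As a hedged sketch, the following sweeps points by x and keeps a Fenwick tree (BIT) over compressed y-coordinates to answer offline queries of the form "how many points have px <= x and py <= y?". The class `FenwickTree`, the function `count_dominated`, and the problem itself are illustrative choices, not taken from the original text.

```python
from bisect import bisect_right


class FenwickTree:
    """Binary indexed tree over `size` slots supporting point add and prefix sum."""

    def __init__(self, size: int):
        self.tree = [0] * (size + 1)

    def add(self, index: int, delta: int) -> None:
        """Add delta to the slot at 0-based `index`."""
        i = index + 1
        while i < len(self.tree):
            self.tree[i] += delta
            i += i & -i

    def prefix_sum(self, count: int) -> int:
        """Sum of the first `count` slots."""
        total = 0
        i = count
        while i > 0:
            total += self.tree[i]
            i -= i & -i
        return total


def count_dominated(points, queries):
    """For each query (x, y), count points (px, py) with px <= x and py <= y."""
    ys = sorted({y for _, y in points})          # coordinate compression
    y_index = {y: i for i, y in enumerate(ys)}

    # Event kinds: 0 = insert point, 1 = query. Inserts sort first at equal x,
    # which makes the comparison px <= x inclusive.
    events = [(x, 0, y, -1) for x, y in points]
    events += [(x, 1, y, j) for j, (x, y) in enumerate(queries)]
    events.sort()

    tree = FenwickTree(len(ys))
    results = [0] * len(queries)
    for x, kind, y, j in events:
        if kind == 0:
            tree.add(y_index[y], 1)
        else:
            # bisect_right gives how many compressed y-values are <= y
            results[j] = tree.prefix_sum(bisect_right(ys, y))
    return results


print(count_dominated([(1, 1), (2, 3), (4, 2)], [(2, 3), (4, 1), (5, 5), (0, 0)]))
# [2, 1, 3, 0]
```

Each insert and query costs O(log n), so the whole sweep runs in O((n + q) log n) after sorting.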
The Skyline Problem is a celebrated application of sweep line that appears in coding interviews and demonstrates the paradigm's power.
Problem Statement:
Given a list of buildings where each building is represented as [left, right, height]:
- left = x-coordinate of left edge
- right = x-coordinate of right edge
- height = building height

Return the skyline formed by these buildings as a list of "key points" [x, height] where the height changes.
The Sweep Line Approach:
Events: Each building creates two events:
- (left, START, height): building enters at x = left
- (right, END, height): building exits at x = right

State: A multiset (sorted list) of active building heights
Processing: At each x-position, after updating the state, check if the maximum height changed. If so, record a key point.
The Critical Insight:
The skyline height at any x equals the maximum height among all buildings covering that x. By maintaining active heights in a max-queryable structure, we efficiently track this as the sweep progresses.
Buildings: [[2, 9, 10], [3, 7, 15], [5, 12, 12], [15, 20, 10], [19, 24, 8]]
Skyline: [[2, 10], [3, 15], [7, 12], [12, 0], [15, 10], [20, 8], [24, 0]]

At x=2, we rise to height 10. At x=3, the building of height 15 starts (higher). At x=7, that building ends, dropping to 12. At x=12, the last building of the first group ends, dropping to 0. And so on.
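The critical insight can be checked directly with a brute-force reference that evaluates the maximum covering height at every building edge. This is a sketch for verification only, at O(n²) time; the name `skyline_brute_force` is hypothetical, and buildings are assumed to cover the half-open range [left, right).

```python
def skyline_brute_force(buildings: list[list[int]]) -> list[list[int]]:
    """Reference skyline: at each edge x, take the max height of buildings
    covering x, and record a key point whenever that height changes."""
    xs = sorted({x for left, right, _ in buildings for x in (left, right)})

    result = []
    prev_height = 0
    for x in xs:
        # Height at x is the tallest building with left <= x < right (0 if none).
        height = max((h for l, r, h in buildings if l <= x < r), default=0)
        if height != prev_height:
            result.append([x, height])
            prev_height = height
    return result


buildings = [[2, 9, 10], [3, 7, 15], [5, 12, 12], [15, 20, 10], [19, 24, 8]]
print(skyline_brute_force(buildings))
# [[2, 10], [3, 15], [7, 12], [12, 0], [15, 10], [20, 8], [24, 0]]
```

The skyline can only change at a building edge, which is why checking those x-values is enough. A sweep implementation should produce the same key points on any input, so this makes a convenient test oracle.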
```python
import heapq
from collections import defaultdict


def get_skyline(buildings: list[list[int]]) -> list[list[int]]:
    """
    Compute the skyline from a list of buildings.

    Uses a max-heap to track active building heights.
    Lazy deletion handles building exits.

    Time: O(n log n)
    Space: O(n)
    """
    if not buildings:
        return []

    # Events: (x, type, height)
    # type: 0 = start (entering), 1 = end (leaving)
    events = []
    for left, right, height in buildings:
        events.append((left, 0, height))   # Building starts
        events.append((right, 1, height))  # Building ends

    # Sort: by x, then starts before ends, then taller first.
    # The order within one x does not affect the result here, because
    # all events at the same x are applied before the max is read.
    events.sort(key=lambda e: (e[0], e[1], -e[2]))

    # Max-heap of active heights (use negative for max-heap with heapq)
    # We'll use lazy deletion: track counts of each height
    heap = [0]  # Start with ground level
    height_count = defaultdict(int)  # height -> active count
    height_count[0] = 1  # Ground always active

    result = []
    prev_max = 0

    i = 0
    while i < len(events):
        curr_x = events[i][0]

        # Process all events at the same x
        while i < len(events) and events[i][0] == curr_x:
            x, event_type, height = events[i]

            if event_type == 0:  # Start
                heapq.heappush(heap, -height)
                height_count[height] += 1
            else:  # End
                height_count[height] -= 1

            i += 1

        # Lazy deletion: pop heights that are no longer active
        while heap and height_count[-heap[0]] == 0:
            heapq.heappop(heap)

        # Current max height
        curr_max = -heap[0] if heap else 0

        # If max changed, record key point
        if curr_max != prev_max:
            result.append([curr_x, curr_max])
            prev_max = curr_max

    return result


# Cleaner version using SortedList (from sortedcontainers)
def get_skyline_sortedlist(buildings: list[list[int]]) -> list[list[int]]:
    """
    Skyline using SortedList for O(log n) max queries.
    Cleaner but requires external library.
    """
    from sortedcontainers import SortedList

    events = []
    for left, right, height in buildings:
        events.append((left, -height, 0))  # Start: negative height for sorting
        events.append((right, height, 1))  # End: positive height

    # Sort by x, then by value (starts before ends at same x, taller starts first)
    events.sort()

    active = SortedList([0])  # Active heights
    result = []
    prev_max = 0

    for x, h, event_type in events:
        if event_type == 0:  # Start
            active.add(-h)
        else:  # End
            active.remove(h)

        curr_max = active[-1]  # Max is last element in sorted list

        if curr_max != prev_max:
            result.append([x, curr_max])
            prev_max = curr_max

    return result


# Test
buildings = [[2, 9, 10], [3, 7, 15], [5, 12, 12], [15, 20, 10], [19, 24, 8]]
print("Skyline:", get_skyline(buildings))
```

Sweep line extends far beyond 1D intervals into 2D geometry and beyond.
Line Segment Intersection (Bentley-Ottmann)
Given a set of line segments, find all intersection points.
Naive O(n²): Check every pair.
Sweep O((n + k) log n), where k is the number of intersections.
Closest Pair of Points
Find the two closest points in a 2D plane.
Sweep approach: process points in x order, keeping only the points within the current best distance d of the current x (in a y-ordered structure).
Voronoi Diagrams (Fortune's Algorithm)
Construct Voronoi diagram using a sweep line, maintaining a "beach line" of parabolic arcs.
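Of the three geometric applications above, the closest-pair sweep is short enough to sketch. The version below is an illustration under stated assumptions: it keeps the active strip in a plain Python list ordered by y and maintained with `bisect`, so insertions and deletions cost O(n) in the worst case. A balanced BST or `SortedList` would be needed for the usual O(n log n) bound. The function name `closest_pair_distance` is hypothetical.

```python
import math
from bisect import bisect_left, insort


def closest_pair_distance(points: list[tuple[float, float]]) -> float:
    """Smallest Euclidean distance between two points (inf if fewer than 2)."""
    pts = sorted(points)   # sweep order: by x, then y
    best = math.inf        # best distance found so far (the "d" of the sweep)
    active = []            # (y, x) of points still within `best` of sweep x
    left = 0               # pts[left:i] are exactly the points in `active`

    for i, (x, y) in enumerate(pts):
        # Drop points that are more than `best` behind the sweep line in x.
        while left < i and x - pts[left][0] > best:
            old_x, old_y = pts[left]
            active.pop(bisect_left(active, (old_y, old_x)))
            left += 1

        # Only points with y in [y - best, y + best] can improve the answer.
        k = bisect_left(active, (y - best, -math.inf))
        while k < len(active) and active[k][0] <= y + best:
            ay, ax = active[k]
            best = min(best, math.dist((x, y), (ax, ay)))
            k += 1

        insort(active, (y, x))

    return best


print(closest_pair_distance([(0, 0), (5, 0), (5, 1), (9, 9)]))  # 1.0
```

The state is ordered by y rather than x because the sweep already bounds x; the ordered structure lets each new point inspect only a thin horizontal band of candidates.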
Any problem where ordering by one dimension lets you process the remaining dimensions locally is a candidate for sweep line. The key question: 'Can I process objects incrementally as I move through space/time, updating a manageable state structure?'
Let's codify the patterns that make sweep line implementations robust:
Pattern 1: Event Class with Natural Ordering
```python
from dataclasses import dataclass, field
from typing import Any


@dataclass(order=True)
class Event:
    position: float
    priority: int  # Tie-breaking
    data: Any = field(compare=False)  # Not for ordering
```
Pattern 2: Batch Processing at Same Position
When multiple events share a position, process them all before updating the result:
```python
i = 0
while i < len(events):
    curr_pos = events[i].position
    while i < len(events) and events[i].position == curr_pos:
        process(events[i])
        i += 1
    record_result_at(curr_pos)
```
Pattern 3: Lazy Deletion with Heaps
When using heaps that don't support arbitrary removal:
```python
# is_deleted() stands for whatever bookkeeping marks an entry as stale
while heap and is_deleted(heap[0]):
    heapq.heappop(heap)
current_value = heap[0] if heap else None
```
Pattern 4: Coordinate Compression
When coordinates are sparse in a large range:
```python
coords = sorted(set(all_coordinates))
coord_to_index = {c: i for i, c in enumerate(coords)}
# Now use indices instead of raw coordinates
```
```python
from dataclasses import dataclass, field
from typing import Any, Callable
from enum import IntEnum
import heapq

# =========================================
# Pattern: Generic Sweep Line Framework
# =========================================

class EventPriority(IntEnum):
    """Standard event priorities."""
    END = 0
    QUERY = 1
    START = 2


@dataclass(order=True)
class SweepEvent:
    """Generic sweep event with natural ordering."""
    position: float
    priority: int  # Lower = processed first at same position
    data: Any = field(compare=False)


def sweep_line_generic(
    events: list[SweepEvent],
    on_start: Callable[[Any], None],
    on_end: Callable[[Any], None],
    on_query: Callable[[Any], Any],
) -> list:
    """
    Generic sweep line processor.

    Separates event generation from processing logic.
    """
    events.sort()
    results = []

    for event in events:
        if event.priority == EventPriority.START:
            on_start(event.data)
        elif event.priority == EventPriority.END:
            on_end(event.data)
        elif event.priority == EventPriority.QUERY:
            result = on_query(event.data)
            results.append(result)

    return results


# =========================================
# Example: Using the Framework
# =========================================

def count_intervals_at_points(intervals, query_points):
    """
    Use generic framework to count intervals at query points.
    """
    events = []

    for i, (start, end) in enumerate(intervals):
        events.append(SweepEvent(start, EventPriority.START, {'id': i}))
        events.append(SweepEvent(end, EventPriority.END, {'id': i}))

    for j, p in enumerate(query_points):
        events.append(SweepEvent(p, EventPriority.QUERY, {'point': p, 'idx': j}))

    # State
    count = 0
    results = [0] * len(query_points)

    def on_start(data):
        nonlocal count
        count += 1

    def on_end(data):
        nonlocal count
        count -= 1

    def on_query(data):
        results[data['idx']] = count
        return (data['point'], count)

    sweep_line_generic(events, on_start, on_end, on_query)
    return results


# =========================================
# Pattern: Heap with Lazy Deletion
# =========================================

class LazyMaxHeap:
    """
    Max-heap supporting lazy deletion.

    Items can be 'removed' without actually popping;
    they're skipped when accessed.
    """

    def __init__(self):
        self.heap = []     # Min-heap of negative values
        self.removed = {}  # value -> removal count

    def push(self, value):
        heapq.heappush(self.heap, -value)

    def remove(self, value):
        """Mark value for removal (lazy)."""
        self.removed[value] = self.removed.get(value, 0) + 1

    def _clean_top(self):
        """Remove lazily deleted items from top."""
        while self.heap:
            top = -self.heap[0]
            if self.removed.get(top, 0) > 0:
                heapq.heappop(self.heap)
                self.removed[top] -= 1
            else:
                break

    def max(self):
        """Return current maximum."""
        self._clean_top()
        return -self.heap[0] if self.heap else 0

    def pop(self):
        """Remove and return maximum."""
        self._clean_top()
        return -heapq.heappop(self.heap) if self.heap else 0


# Example: Skyline with lazy heap
def skyline_with_lazy_heap(buildings):
    """Skyline using LazyMaxHeap."""
    events = []
    for left, right, height in buildings:
        events.append((left, 0, height))
        events.append((right, 1, height))

    events.sort(key=lambda e: (e[0], e[1], -e[2] if e[1] == 0 else e[2]))

    heap = LazyMaxHeap()
    heap.push(0)  # Ground level

    result = []
    prev_max = 0

    i = 0
    while i < len(events):
        curr_x = events[i][0]

        while i < len(events) and events[i][0] == curr_x:
            x, event_type, height = events[i]
            if event_type == 0:
                heap.push(height)
            else:
                heap.remove(height)
            i += 1

        curr_max = heap.max()
        if curr_max != prev_max:
            result.append([curr_x, curr_max])
            prev_max = curr_max

    return result


# Test
print(count_intervals_at_points([(1, 5), (2, 6), (4, 8)], [0, 3, 5, 7]))
print(skyline_with_lazy_heap([[2, 9, 10], [3, 7, 15], [5, 12, 12]]))
```

Sweep line bugs are often subtle. Here are the most common issues:
Mistake 1: Wrong Event Priority
Your algorithm gives wrong results when events coincide. Debug by:
Mistake 2: Off-By-One in State Update
Updating state after recording instead of before, or vice versa. The general rule:
Mistake 3: Forgetting Ground Level / Base Case
In skyline, forgetting to initialize with height 0 (ground level). The heap should never be truly empty.
Mistake 4: Floating Point Comparison
When positions are floats, equality checks need tolerance:
if abs(event.position - current_position) < 1e-9:
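One way to apply this tolerance is to batch events whose positions are nearly equal before processing them. The helper below is a minimal sketch, assuming events are `(position, payload)` pairs and that an absolute tolerance of `1e-9` suits the coordinate scale; both the name `group_by_position` and the tolerance value are illustrative.

```python
import math


def group_by_position(events, tol=1e-9):
    """Group (position, payload) events whose positions lie within `tol`
    of the first position in the current group."""
    groups = []  # list of (representative_position, [payloads])
    for pos, payload in sorted(events, key=lambda e: e[0]):
        if groups and math.isclose(pos, groups[-1][0], rel_tol=0.0, abs_tol=tol):
            groups[-1][1].append(payload)
        else:
            groups.append((pos, [payload]))
    return groups


# 0.1 + 0.2 is not exactly 0.3 in binary floating point,
# but both events should be treated as simultaneous.
events = [(0.1 + 0.2, "a"), (0.3, "b"), (1.0, "c")]
print(len(group_by_position(events)))  # 2
```

Comparing against the first position of the group, rather than the previous event, keeps a long chain of tiny gaps from merging positions that are far apart overall.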
Mistake 5: Not Handling Empty State
What's the max height when no buildings are active? The minimum overlap when no intervals? Always define the "nothing active" case.
```python
def debug_sweep(events, description="Sweep"):
    """
    Print events in processing order for debugging.
    """
    print(f"\n=== {description} ===")
    print("Sorted events:")
    for e in sorted(events, key=lambda x: (x[0], x[1])):
        print(f"  Position {e[0]:6.2f}, Priority {e[1]}, Data: {e[2:]}")
    print()


def verify_with_brute_force(intervals, query_points, sweep_result):
    """
    Verify sweep result against naive O(n*m) computation.
    Intervals are treated as half-open: [start, end).
    """
    brute_result = []
    for p in query_points:
        count = sum(1 for s, e in intervals if s <= p < e)
        brute_result.append(count)

    if sweep_result == brute_result:
        print("✓ Sweep result matches brute force")
    else:
        print("✗ Mismatch!")
        print(f"  Sweep: {sweep_result}")
        print(f"  Brute: {brute_result}")
        for i, p in enumerate(query_points):
            if sweep_result[i] != brute_result[i]:
                print(f"  Point {p}: Sweep={sweep_result[i]}, Brute={brute_result[i]}")


# Example usage
intervals = [(1, 5), (2, 6), (4, 8)]
points = [0, 1, 2, 3, 4, 5, 6, 7, 8]

# Create events for debugging.
# Priorities match the half-open convention used by the brute force:
# END (0) before START (1) before QUERY (2) at the same position.
events = []
for i, (s, e) in enumerate(intervals):
    events.append((s, 1, 'START', i))
    events.append((e, 0, 'END', i))
for j, p in enumerate(points):
    events.append((p, 2, 'QUERY', j))

debug_sweep(events, "Interval Count Sweep")

# The actual sweep (simplified)
events_sorted = sorted(events)
active = 0
results = [0] * len(points)

for pos, priority, event_type, idx in events_sorted:
    if event_type == 'START':
        active += 1
    elif event_type == 'END':
        active -= 1
    elif event_type == 'QUERY':
        results[idx] = active

print(f"Sweep results: {results}")
verify_with_brute_force(intervals, points, results)
```

The sweep line paradigm is a cornerstone of algorithmic thinking: a pattern that transforms complex problems into elegant sequential processing.
Module Complete:
You've now mastered the Interval Scheduling Patterns module:
These patterns form a cohesive toolkit for handling any problem involving time ranges, overlapping intervals, or resource allocation. Combined with the greedy choice property, you can now approach interval problems with confidence and precision.
Congratulations! You've completed the Interval Scheduling Patterns module. You understand covering problems, meeting room resource allocation, minimum platforms, and the powerful sweep line paradigm that unifies them. These skills apply across scheduling, geometry, and system design. You're now equipped to recognize and solve interval-based greedy problems with the rigor of a seasoned engineer.