Every computing system faces a fundamental challenge: more work exists than can be done simultaneously. Operating systems must decide which process gets CPU time next. Job schedulers must sequence thousands of batch processes. Network routers must choose which packet to forward. Game engines must prioritize which physics calculations to perform in the available frame time.
The common thread: tasks have priorities, and resources are limited.
Naive approaches—first-in-first-out queues or random selection—fail to capture urgency. A critical system alert shouldn't wait behind routine log processing. A time-sensitive user request shouldn't queue behind background maintenance.
Priority queues, implemented through heaps, provide the foundation for intelligent task scheduling. They enable O(log n) insertion and extraction while always providing access to the highest-priority task. This page explores how this fundamental capability translates into real scheduling systems.
By the end of this page, you will understand how priority queues solve scheduling problems, explore classic algorithms like task scheduling with cooldowns, master patterns for deadline-based scheduling, and see how these concepts apply to operating systems, databases, and distributed systems.
At its core, a scheduler answers a simple question: which task should execute next? Priority queues provide an elegant answer: always run the highest-priority ready task.
The Scheduling Model:
| Component | Data Structure | Operation | Complexity |
|---|---|---|---|
| Task storage | Max-heap (by priority) | — | — |
| Add new task | Heap insert | Insert with priority | O(log n) |
| Get next task | Heap peek | View max priority | O(1) |
| Execute task | Heap extract | Remove and return max | O(log n) |
| Update priority | Decrease/Increase key | Modify existing task | O(log n)* |
Note: Standard binary heaps don't efficiently support key update. For schedulers requiring priority updates, consider indexed heaps, Fibonacci heaps, or tracking indices with a hash map.
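As a concrete illustration of the hash-map workaround, here is a minimal sketch of "lazy" priority updates (the task names are illustrative): instead of modifying a heap entry in place, push a fresh entry and skip stale ones on extraction.

```python
import heapq

heap = []    # (priority, task_name)
latest = {}  # task_name -> current priority (the authoritative version)

def set_priority(name, priority):
    """Insert or 'update' a task: push a fresh entry; old ones go stale."""
    latest[name] = priority
    heapq.heappush(heap, (priority, name))

def pop_min():
    """Pop the lowest-priority task, discarding stale entries."""
    while heap:
        priority, name = heapq.heappop(heap)
        if latest.get(name) == priority:  # skip entries superseded by an update
            del latest[name]
            return name
    return None

set_priority("compact-db", 5)
set_priority("send-alert", 3)
set_priority("compact-db", 1)  # "update": the old (5, 'compact-db') is now stale
print(pop_min())  # compact-db (priority 1)
print(pop_min())  # send-alert
print(pop_min())  # None
```

Lazy deletion trades memory (stale entries linger) for simplicity; the indexed-heap approach shown later in this page avoids that at the cost of more bookkeeping.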
The Scheduler Loop:
```
while tasks exist:
    if current_task is complete:
        mark_complete(current_task)
    if ready_queue is not empty:
        current_task = ready_queue.extract_max()
        execute(current_task)
    else:
        wait_for_arrival()
```
This simple loop, powered by a heap-based priority queue, forms the foundation of countless scheduling systems.
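The loop can be sketched concretely with Python's `heapq` (a min-heap, so priorities are negated to get max-heap behavior; the task names and priorities here are illustrative):

```python
import heapq

ready_queue = []  # entries: (-priority, task_name)

def add_task(priority, name):
    # Negate priority so the largest priority surfaces first in a min-heap
    heapq.heappush(ready_queue, (-priority, name))

add_task(2, "routine log processing")
add_task(9, "critical system alert")
add_task(5, "user request")

execution_order = []
while ready_queue:
    _, task = heapq.heappop(ready_queue)  # always the highest-priority ready task
    execution_order.append(task)

print(execution_order)
# ['critical system alert', 'user request', 'routine log processing']
```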
A key challenge in priority scheduling is priority inversion—when a high-priority task waits for a low-priority task holding a resource. Solutions include priority inheritance (temporarily boosting the low-priority task) and priority ceiling protocols. Heaps enable these schemes efficiently.
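A toy sketch of priority inheritance may help fix the idea (the `Task` and `Lock` classes here are illustrative, not a real OS API): when a high-priority task blocks on a lock held by a low-priority task, the holder temporarily inherits the waiter's priority so it cannot be preempted by medium-priority work.

```python
class Task:
    def __init__(self, name, priority):
        self.name = name
        self.base_priority = priority
        self.effective_priority = priority

class Lock:
    def __init__(self):
        self.holder = None

    def acquire(self, task):
        if self.holder is None:
            self.holder = task
            return True
        # Waiter blocks: boost the holder to the waiter's priority
        # (priority inheritance, avoiding unbounded priority inversion).
        if task.effective_priority > self.holder.effective_priority:
            self.holder.effective_priority = task.effective_priority
        return False

    def release(self, task):
        task.effective_priority = task.base_priority  # drop the inherited boost
        self.holder = None

low = Task("logger", priority=1)
high = Task("alert-handler", priority=10)

lock = Lock()
lock.acquire(low)               # low-priority task holds the lock
lock.acquire(high)              # high-priority task blocks on it...
print(low.effective_priority)   # 10: inherited from the waiter
lock.release(low)
print(low.effective_priority)   # 1: back to base priority
```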
One of the most instructive scheduling problems involves executing tasks with a cooldown constraint: the same task type cannot run twice within n time units.
Problem Statement (LeetCode 621):
Given an array of tasks (characters) and a cooldown period n, find the minimum time to complete all tasks. Tasks of the same type must be at least n units apart.
Example:
Tasks: [A, A, A, B, B, B]
Cooldown: n = 2
One optimal schedule: A → B → idle → A → B → idle → A → B
Time: 8 units
Key Insight: Greedy with Max-Heap
The optimal strategy is greedy: always execute the task with the most remaining instances (among those not on cooldown). This minimizes idle time by ensuring the most constrained task progresses as fast as possible.
Why Greedy Works:
If we delay the most frequent task in favor of less frequent ones, we're more likely to create situations where the most frequent task is the only one left but still on cooldown—forcing idle time. By prioritizing high-frequency tasks, we interleave them optimally.
```python
import heapq
from collections import Counter, deque
from typing import List


def task_scheduler(tasks: List[str], n: int) -> int:
    """
    LeetCode 621: Task Scheduler

    Find minimum intervals needed to execute all tasks with cooldown.

    Strategy:
    1. Count frequency of each task type
    2. Use max-heap to always pick the task with most remaining instances
    3. Track cooling tasks in a queue with their available time
    4. Advance time, moving tasks from cooldown queue back to heap

    Time: O(total_time * log(26)) = O(total_time) since max 26 task types
    Space: O(26) = O(1) for task counts
    """
    if n == 0:
        return len(tasks)  # No cooldown, just execute all

    # Count frequencies
    freq = Counter(tasks)

    # Max-heap of frequencies (negated for Python)
    max_heap = [-count for count in freq.values()]
    heapq.heapify(max_heap)

    # Queue of (available_time, remaining_count) for cooling tasks
    cooldown_queue = deque()
    time = 0

    while max_heap or cooldown_queue:
        time += 1

        # Move tasks from cooldown to heap if their cooldown is over
        while cooldown_queue and cooldown_queue[0][0] <= time:
            available_time, remaining = cooldown_queue.popleft()
            heapq.heappush(max_heap, -remaining)

        if max_heap:
            # Execute highest-frequency task
            count = -heapq.heappop(max_heap)
            count -= 1
            if count > 0:
                # Task has more instances; add to cooldown queue
                cooldown_queue.append((time + n + 1, count))
        # else: idle (no task available)

    return time


def task_scheduler_math(tasks: List[str], n: int) -> int:
    """
    Mathematical solution (O(n) time, O(1) space).

    Key insight: The minimum time is determined by the most frequent task.
    If max_freq is the highest frequency:
    - We need (max_freq - 1) gaps, each of size (n + 1)
    - Plus the final row of most frequent tasks

    Formula: (max_freq - 1) * (n + 1) + num_max_freq_tasks

    But this can be less than len(tasks) if there are many different
    tasks (no idle needed). So take max(formula, len(tasks)).
    """
    freq = Counter(tasks)
    max_freq = max(freq.values())

    # Count tasks with maximum frequency
    num_max_freq = sum(1 for f in freq.values() if f == max_freq)

    # Calculate minimum based on most frequent tasks
    min_time = (max_freq - 1) * (n + 1) + num_max_freq

    # At minimum, we need len(tasks) intervals
    return max(min_time, len(tasks))


# Examples
tasks = ['A', 'A', 'A', 'B', 'B', 'B']
n = 2
print(f"Heap solution: {task_scheduler(tasks, n)}")       # 8
print(f"Math solution: {task_scheduler_math(tasks, n)}")  # 8

tasks2 = ['A', 'A', 'A', 'B', 'B', 'B', 'C', 'C', 'C', 'D', 'D', 'E']
n2 = 2
print(f"Many tasks: {task_scheduler(tasks2, n2)}")  # 12 (no idle needed)
```

The heap simulation explicitly models the scheduling process—useful when you need the actual schedule. The mathematical formula computes only the total time—simpler and O(n). Both are worth knowing: the simulation generalizes to harder variants, while the formula is efficient for this specific problem.
A harder scheduling problem involves tasks with specific arrival times and processing requirements, simulating a real CPU scheduler.
Problem Statement (LeetCode 1834):
Given tasks with [enqueue_time, processing_time], process them on a single CPU. At any time, pick the available task with shortest processing time (break ties by earliest index). Return the order of execution.
Key Insight:
This requires coordinating two data structures: a list of tasks sorted by enqueue time (so we know what arrives next) and a min-heap of available tasks keyed by processing time (so we can always pick the shortest job).
```python
import heapq
from typing import List


def getOrder(tasks: List[List[int]]) -> List[int]:
    """
    LeetCode 1834: Single-Threaded CPU

    Process tasks by shortest-job-first among available tasks.

    Strategy:
    1. Sort tasks by enqueue time (keep original indices)
    2. Simulate time: add newly available tasks to min-heap
    3. Process shortest job from heap
    4. If heap empty, jump to next task's arrival

    Time: O(n log n) for sorting and heap operations
    Space: O(n) for heap
    """
    # Attach indices and sort by enqueue time
    indexed_tasks = [(enqueue, process, idx)
                     for idx, (enqueue, process) in enumerate(tasks)]
    indexed_tasks.sort()  # Sort by enqueue time (first element)

    result = []
    min_heap = []  # (processing_time, index)
    current_time = 0
    task_idx = 0
    n = len(tasks)

    while task_idx < n or min_heap:
        # If heap is empty, jump to next task's arrival
        if not min_heap and indexed_tasks[task_idx][0] > current_time:
            current_time = indexed_tasks[task_idx][0]

        # Add all tasks that have arrived by current_time
        while task_idx < n and indexed_tasks[task_idx][0] <= current_time:
            enqueue, process, idx = indexed_tasks[task_idx]
            heapq.heappush(min_heap, (process, idx))
            task_idx += 1

        # Process shortest job
        process_time, task_original_idx = heapq.heappop(min_heap)
        result.append(task_original_idx)
        current_time += process_time

    return result


# Example
tasks = [[1, 2], [2, 4], [3, 2], [4, 1]]
print(getOrder(tasks))  # [0, 2, 3, 1]

# Timeline:
# t=0: CPU idle
# t=1: Task 0 arrives (process=2). Start task 0. Current: [0 running]
# t=2: Task 1 arrives (process=4). Task 0 still running.
# t=3: Task 2 arrives (process=2). Task 0 completes. Available: [1, 2]
#      Pick task 2 (shorter process time). Start task 2.
# t=4: Task 3 arrives (process=1). Task 2 still running.
# t=5: Task 2 completes. Available: [1, 3]. Pick task 3 (shortest).
# t=6: Task 3 completes. Available: [1]. Start task 1.
# t=10: Task 1 completes. Done.
# Order: [0, 2, 3, 1]
```

This problem models Shortest Job First scheduling, which minimizes average waiting time.
In real OS schedulers, SJF is approximated since exact job duration is unknown—often using exponential averaging of past burst times.
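That exponential averaging is a one-line recurrence, tau_next = alpha * t + (1 - alpha) * tau, where t is the burst just observed and tau the previous estimate. A small sketch (the alpha value and burst times are illustrative):

```python
def predict_next_burst(prev_estimate, actual_burst, alpha=0.5):
    """Exponential average: weight the latest burst by alpha,
    history by (1 - alpha). Higher alpha reacts faster."""
    return alpha * actual_burst + (1 - alpha) * prev_estimate

estimate = 10.0  # initial guess for the next CPU burst (ms)
for burst in [6.0, 4.0, 6.0, 4.0]:
    estimate = predict_next_burst(estimate, burst)
print(estimate)  # 5.0
```

The scheduler would then use `estimate` as the key in its shortest-job-first min-heap instead of the (unknowable) true duration.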
Many real-world scheduling problems involve deadlines: tasks must complete by a certain time or incur a penalty (or become worthless). Priority queues enable optimal solutions for these problems.
Problem: Maximize Profit with Deadlines
Given n tasks, each with a deadline and a profit, where each task takes 1 unit of time, maximize total profit. Each task can only be scheduled in a slot no later than its deadline.
Example:
Tasks: [(deadline=1, profit=40), (deadline=1, profit=15),
(deadline=2, profit=60), (deadline=2, profit=20)]
Optimal: Task 0 (d=1, p=40) in slot 1, Task 2 (d=2, p=60) in slot 2 → profit = 100
```python
import heapq
from typing import List, Tuple


def max_profit_with_deadlines(tasks: List[Tuple[int, int]]) -> Tuple[int, List[int]]:
    """
    Job Sequencing Problem

    Given tasks with (deadline, profit), maximize profit.
    Each task takes 1 unit time. Slot i can only be used
    for tasks with deadline >= i.

    Greedy approach:
    1. Sort tasks by profit (descending)
    2. For each task, find the latest available slot <= deadline
    3. If slot found, schedule it; otherwise skip

    A Union-Find structure makes slot finding O(α(n)) per query;
    here we use a simple array for clarity.

    Time: O(n²) simple, O(n log n) with Union-Find
    Space: O(max_deadline)
    """
    if not tasks:
        return 0, []

    # Sort by profit descending
    indexed_tasks = [(profit, deadline, i)
                     for i, (deadline, profit) in enumerate(tasks)]
    indexed_tasks.sort(reverse=True)

    max_deadline = max(d for d, p in tasks)
    slots = [None] * (max_deadline + 1)  # slots[1] to slots[max_deadline]

    total_profit = 0
    scheduled = []

    for profit, deadline, task_id in indexed_tasks:
        # Find latest available slot <= deadline
        for slot in range(deadline, 0, -1):
            if slots[slot] is None:
                slots[slot] = task_id
                total_profit += profit
                scheduled.append(task_id)
                break

    return total_profit, scheduled


def max_profit_heap_approach(tasks: List[Tuple[int, int]]) -> int:
    """
    Alternative: Process deadlines in order using a min-heap.

    Strategy:
    1. Sort tasks by deadline
    2. Use min-heap to track selected tasks (by profit)
    3. For each task, add to heap
    4. If heap size exceeds current deadline, remove minimum

    This ensures we keep only the most profitable tasks that fit.

    Time: O(n log n)
    Space: O(n)
    """
    # Sort by deadline
    sorted_tasks = sorted(tasks)  # (deadline, profit)

    min_heap = []  # Selected tasks by profit (min-heap to evict lowest)

    for deadline, profit in sorted_tasks:
        heapq.heappush(min_heap, profit)
        # If we've selected more tasks than slots allow, drop lowest profit
        if len(min_heap) > deadline:
            heapq.heappop(min_heap)

    return sum(min_heap)


# Example
tasks = [(1, 40), (1, 15), (2, 60), (2, 20)]  # (deadline, profit)
profit, scheduled = max_profit_with_deadlines(tasks)
print(f"Greedy: profit={profit}, tasks={scheduled}")

profit2 = max_profit_heap_approach(tasks)
print(f"Heap approach: profit={profit2}")

# More complex example
tasks2 = [(4, 20), (1, 10), (1, 40), (1, 30), (2, 50)]
print(f"Complex example: {max_profit_heap_approach(tasks2)}")  # 110 (40+50+20)
```

The heap approach is beautiful: by processing deadlines in order and maintaining a min-heap, we naturally keep the k most profitable tasks where k = current deadline. The heap's minimum is always ready to be evicted if a better task arrives.
A classic scheduling problem asks: given a list of meeting intervals, what's the minimum number of conference rooms needed to accommodate all meetings?
Problem Statement:
Given intervals [[start, end], ...], find the minimum number of rooms such that no two overlapping meetings share a room.
Heap Insight:
As we process meetings by start time, we need to know if any previous meeting has ended. A min-heap of end times efficiently tracks the earliest-ending ongoing meeting.
```python
import heapq
from typing import List


def min_meeting_rooms(intervals: List[List[int]]) -> int:
    """
    LeetCode 253: Meeting Rooms II

    Find minimum conference rooms needed for all meetings.

    Algorithm:
    1. Sort meetings by start time
    2. Use min-heap to track end times of ongoing meetings
    3. For each meeting:
       - If heap's minimum end time <= current start, reuse that room
       - Otherwise, need a new room
    4. Heap size at any point = rooms in use

    Time: O(n log n)
    Space: O(n)
    """
    if not intervals:
        return 0

    # Sort by start time
    intervals.sort(key=lambda x: x[0])

    # Min-heap of end times (rooms in use)
    end_times = []

    for start, end in intervals:
        # If earliest ending meeting has ended, reuse its room
        if end_times and end_times[0] <= start:
            heapq.heappop(end_times)

        # Allocate room for current meeting
        heapq.heappush(end_times, end)

    # Heap size = rooms needed
    return len(end_times)


def min_meeting_rooms_line_sweep(intervals: List[List[int]]) -> int:
    """
    Alternative: Line sweep / chronological ordering.

    Create events for start (+1 room) and end (-1 room).
    Sort all events and track maximum concurrency.

    Time: O(n log n)
    Space: O(n)
    """
    events = []
    for start, end in intervals:
        events.append((start, 1))   # +1 at start
        events.append((end, -1))    # -1 at end

    # Sort: by time, then by type (-1 before +1 at same time)
    events.sort()

    rooms = 0
    max_rooms = 0
    for time, delta in events:
        rooms += delta
        max_rooms = max(max_rooms, rooms)

    return max_rooms


# Example
meetings = [[0, 30], [5, 10], [15, 20]]
print(f"Min rooms: {min_meeting_rooms(meetings)}")  # 2

# Timeline:
# 0-30: Meeting 0
# 5-10: Meeting 1 (overlaps with 0, needs 2nd room)
# 15-20: Meeting 2 (overlaps with 0; 1 ended, reuse room 2)
# Maximum concurrent: 2

meetings2 = [[7, 10], [2, 4]]
print(f"Non-overlapping: {min_meeting_rooms(meetings2)}")  # 1

meetings3 = [[1, 5], [2, 6], [3, 7], [4, 8]]
print(f"All overlap: {min_meeting_rooms(meetings3)}")  # 4
```

An advanced scheduling problem involves resource-constrained selection: choosing projects when starting capital determines what's available.
Problem Statement (LeetCode 502):
Given projects with required capital and profit values, and starting capital w, complete at most k projects to maximize final capital. Completing a project adds its profit to your capital.
Example:
k=2, w=0
Projects: [(capital=0, profit=1), (capital=1, profit=2), (capital=1, profit=3)]
- Start: w=0. Can only do project 0 (capital=0). Do it. w=0+1=1
- Now: w=1. Can do projects 1 or 2. Pick project 2 (profit=3). w=1+3=4
- Result: w=4
```python
import heapq
from typing import List


def find_maximized_capital(k: int, w: int, profits: List[int],
                           capital: List[int]) -> int:
    """
    LeetCode 502: IPO

    Maximize capital after completing at most k projects.

    Strategy (Two Heaps):
    1. Min-heap of projects by required capital (not yet affordable)
    2. Max-heap of affordable projects by profit

    For each of k rounds:
    - Move newly affordable projects from min-heap to max-heap
    - Pick most profitable affordable project
    - Add profit to capital

    Time: O(n log n) for heap operations
    Space: O(n)
    """
    n = len(profits)

    # Projects sorted by capital requirement: (capital, profit)
    projects = sorted(zip(capital, profits))

    max_heap = []    # Affordable projects (by profit, negated)
    project_idx = 0  # Next project to consider

    for _ in range(k):
        # Move all affordable projects to max-heap
        while project_idx < n and projects[project_idx][0] <= w:
            cap, profit = projects[project_idx]
            heapq.heappush(max_heap, -profit)
            project_idx += 1

        # Pick most profitable affordable project
        if not max_heap:
            break  # No affordable projects

        w += -heapq.heappop(max_heap)

    return w


# Example
k, w = 2, 0
profits = [1, 2, 3]
capital = [0, 1, 1]
print(f"Max capital: {find_maximized_capital(k, w, profits, capital)}")  # 4

# Detailed trace:
# Initial: w=0
# Projects by capital: [(0, 1), (1, 2), (1, 3)]
#
# Round 1:
#   Move affordable (capital <= 0): project (0, 1)
#   Max-heap: [-1] (profit 1)
#   Pick best: profit=1, w=0+1=1
#
# Round 2:
#   Move affordable (capital <= 1): projects (1, 2), (1, 3)
#   Max-heap: [-2, -3] (profits 2, 3)
#   Pick best: profit=3, w=1+3=4
#
# Done: w=4
```

This problem uses a two-heap pattern similar to median maintenance, but for a different purpose: one heap stages candidates (sorted by eligibility), another holds active choices (sorted by preference). This pattern appears in many resource-constrained optimization problems.
The heap-based scheduling patterns we've studied form the foundation of many production systems.
Production schedulers often combine priority with fairness, aging (preventing starvation), and resource awareness. A task's effective priority may change over time. These refinements build on the heap foundation but add complexity for production requirements.
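Aging can be sketched as an effective priority that grows with waiting time (the boost rate here is illustrative; since effective priorities drift as time advances, a real scheduler would periodically recompute or rebuild the heap):

```python
import heapq

AGING_RATE = 0.1  # priority boost per unit of waiting time (illustrative)

def effective_priority(base_priority, enqueue_time, now):
    """Base priority plus a boost proportional to time spent waiting,
    so low-priority work cannot starve forever."""
    return base_priority + AGING_RATE * (now - enqueue_time)

# (base_priority, enqueue_time, name)
tasks = [(1, 0, "old background job"), (5, 90, "recent normal job")]

now = 100
heap = [(-effective_priority(p, t, now), name) for p, t, name in tasks]
heapq.heapify(heap)

# The old job wins: ~11 effective priority vs ~6 for the recent job
print(heapq.heappop(heap)[1])  # old background job
```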
A specific but extremely common scheduling application is timer management: tracking when events should fire.
The Timer Problem: schedule many callbacks to fire at arbitrary future times, always know how long until the next one fires, and support cancellation.
This is exactly a priority queue keyed by scheduled time!
```python
import heapq
import time
import threading
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass(order=True)
class TimerEvent:
    """A scheduled event."""
    scheduled_time: float
    callback: Callable = field(compare=False)
    args: tuple = field(default=(), compare=False)
    cancelled: bool = field(default=False, compare=False)


class TimerQueue:
    """
    Efficient timer management using a min-heap.

    Used by: Event loops, game engines, network timeouts,
    scheduled tasks, animation systems.

    Operations:
    - schedule(): O(log n)
    - time_until_next(): O(1)
    - fire_ready(): O(log n) amortized per event
    - cancel(): O(1) (lazy)
    """

    def __init__(self):
        self._heap: list[TimerEvent] = []
        self._lock = threading.Lock()

    def schedule(self, delay: float, callback: Callable, *args) -> TimerEvent:
        """Schedule callback after delay seconds."""
        event = TimerEvent(
            scheduled_time=time.time() + delay,
            callback=callback,
            args=args
        )
        with self._lock:
            heapq.heappush(self._heap, event)
        return event

    def schedule_at(self, timestamp: float, callback: Callable, *args) -> TimerEvent:
        """Schedule callback at specific timestamp."""
        event = TimerEvent(
            scheduled_time=timestamp,
            callback=callback,
            args=args
        )
        with self._lock:
            heapq.heappush(self._heap, event)
        return event

    def cancel(self, event: TimerEvent) -> None:
        """Cancel a scheduled event (lazy deletion)."""
        event.cancelled = True

    def time_until_next(self) -> Optional[float]:
        """Return seconds until next event, or None if empty."""
        self._cleanup()
        with self._lock:
            if not self._heap:
                return None
            return max(0, self._heap[0].scheduled_time - time.time())

    def fire_ready(self) -> int:
        """Fire all events whose time has come. Return count fired."""
        now = time.time()
        fired = 0
        while True:
            with self._lock:
                if not self._heap or self._heap[0].scheduled_time > now:
                    break
                event = heapq.heappop(self._heap)
            if not event.cancelled:
                event.callback(*event.args)
                fired += 1
        return fired

    def _cleanup(self) -> None:
        """Remove cancelled events from heap top."""
        with self._lock:
            while self._heap and self._heap[0].cancelled:
                heapq.heappop(self._heap)


# Example: Simulated event loop
timer = TimerQueue()

def log_message(msg):
    print(f"[{time.time():.2f}] {msg}")

# Schedule events
timer.schedule(0.1, log_message, "First (100ms)")
event2 = timer.schedule(0.2, log_message, "Second (200ms)")
timer.schedule(0.3, log_message, "Third (300ms)")

# Cancel one
timer.cancel(event2)

# Simple event loop
print(f"Starting at {time.time():.2f}")
while timer.time_until_next() is not None:
    wait_time = timer.time_until_next()
    if wait_time > 0:
        time.sleep(min(wait_time, 0.05))  # Poll interval
    timer.fire_ready()
print("All events processed")
```

For very high-scale timer systems (millions of timers), hierarchical timing wheels offer O(1) insertion by trading precision. The Linux kernel, Netty, and Kafka use timing wheels. The heap approach is simpler and sufficient for moderate scale.
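To make the contrast concrete, here is a minimal single-level timing wheel sketch (not the hierarchical variant those systems use; the slot count and tick granularity are illustrative): insertion is O(1) because each timer is simply dropped into the bucket for its expiry tick.

```python
class TimingWheel:
    """One-level timing wheel: a ring of buckets, one per tick."""

    def __init__(self, num_slots):
        self.num_slots = num_slots
        self.slots = [[] for _ in range(num_slots)]
        self.current_tick = 0

    def schedule(self, delay_ticks, callback):
        # Only delays shorter than one full rotation fit in a single level;
        # longer delays are what hierarchical wheels (or a heap overflow
        # list) handle.
        assert 0 < delay_ticks < self.num_slots
        slot = (self.current_tick + delay_ticks) % self.num_slots
        self.slots[slot].append(callback)  # O(1) insertion

    def tick(self):
        """Advance one tick and fire everything in that bucket."""
        self.current_tick = (self.current_tick + 1) % self.num_slots
        due, self.slots[self.current_tick] = self.slots[self.current_tick], []
        for callback in due:
            callback()


wheel = TimingWheel(num_slots=8)
fired = []
wheel.schedule(2, lambda: fired.append("two ticks"))
wheel.schedule(5, lambda: fired.append("five ticks"))
for _ in range(5):
    wheel.tick()
print(fired)  # ['two ticks', 'five ticks']
```

The trade-off is visible in the code: timers fire with tick granularity rather than exact timestamps, which is the precision the wheel trades for O(1) insertion.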
When building scheduling systems with heaps, certain patterns and practices lead to more robust implementations.
```python
from typing import TypeVar, Generic, Dict, List, Optional

T = TypeVar('T')


class IndexedPriorityQueue(Generic[T]):
    """
    Priority queue with O(log n) priority updates.

    Maintains a hash map from items to their heap indices,
    enabling efficient priority modifications.
    """

    def __init__(self, key=lambda x: x):
        self._heap: List[T] = []
        self._indices: Dict[T, int] = {}  # item -> heap index
        self._key = key  # Priority extraction function

    def __len__(self) -> int:
        return len(self._heap)

    def __bool__(self) -> bool:
        return bool(self._heap)

    def __contains__(self, item: T) -> bool:
        return item in self._indices

    def push(self, item: T) -> None:
        """Add item to queue. O(log n)."""
        if item in self._indices:
            raise ValueError("Item already in queue")
        idx = len(self._heap)
        self._heap.append(item)
        self._indices[item] = idx
        self._bubble_up(idx)

    def pop(self) -> T:
        """Remove and return highest priority item. O(log n)."""
        if not self._heap:
            raise IndexError("Queue is empty")
        result = self._heap[0]
        self._remove_at(0)
        return result

    def peek(self) -> Optional[T]:
        """Return highest priority item without removing. O(1)."""
        return self._heap[0] if self._heap else None

    def update_priority(self, item: T) -> None:
        """
        Re-heapify after item's priority changed. O(log n).

        Call this after modifying the item's priority externally.
        """
        if item not in self._indices:
            raise KeyError("Item not in queue")
        idx = self._indices[item]
        self._bubble_up(idx)
        self._bubble_down(idx)

    def remove(self, item: T) -> None:
        """Remove specific item. O(log n)."""
        if item not in self._indices:
            raise KeyError("Item not in queue")
        idx = self._indices[item]
        self._remove_at(idx)

    def _remove_at(self, idx: int) -> None:
        """Remove item at specific index."""
        item = self._heap[idx]
        del self._indices[item]

        if idx == len(self._heap) - 1:
            self._heap.pop()
        else:
            last = self._heap.pop()
            self._heap[idx] = last
            self._indices[last] = idx
            self._bubble_up(idx)
            self._bubble_down(idx)

    def _bubble_up(self, idx: int) -> None:
        item = self._heap[idx]
        while idx > 0:
            parent = (idx - 1) // 2
            if self._key(self._heap[parent]) <= self._key(item):
                break
            # Shift the parent down and keep sifting the item up
            self._heap[idx] = self._heap[parent]
            self._indices[self._heap[idx]] = idx
            idx = parent
        self._heap[idx] = item
        self._indices[item] = idx

    def _bubble_down(self, idx: int) -> None:
        item = self._heap[idx]
        n = len(self._heap)
        while True:
            left = 2 * idx + 1
            right = 2 * idx + 2
            # Compare children against the sinking item itself
            smallest = idx
            smallest_key = self._key(item)
            if left < n and self._key(self._heap[left]) < smallest_key:
                smallest = left
                smallest_key = self._key(self._heap[left])
            if right < n and self._key(self._heap[right]) < smallest_key:
                smallest = right
            if smallest == idx:
                break
            # Shift the smaller child up and keep sifting the item down
            self._heap[idx] = self._heap[smallest]
            self._indices[self._heap[idx]] = idx
            idx = smallest
        self._heap[idx] = item
        self._indices[item] = idx


# Example usage
from dataclasses import dataclass


@dataclass
class Task:
    name: str
    priority: int

    def __hash__(self):
        return hash(self.name)

    def __eq__(self, other):
        return isinstance(other, Task) and self.name == other.name


pq = IndexedPriorityQueue(key=lambda t: t.priority)

t1 = Task("Task A", 3)
t2 = Task("Task B", 1)
t3 = Task("Task C", 2)

pq.push(t1)
pq.push(t2)
pq.push(t3)

print(f"Top priority: {pq.peek()}")  # Task B (priority 1)

# Update priority
t1.priority = 0
pq.update_priority(t1)
print(f"After update: {pq.peek()}")  # Task A (priority 0)
```

We've explored how heaps enable efficient scheduling across diverse problem domains. Let's consolidate the key insights:

- A max-heap of remaining counts plus a cooldown queue greedily schedules tasks under cooldown constraints (LeetCode 621).
- A min-heap of processing times implements shortest-job-first among arrived tasks (LeetCode 1834).
- Processing deadlines in order while evicting the lowest-profit selection from a min-heap maximizes profit under deadlines.
- A min-heap of end times tracks concurrent meetings, yielding the minimum number of rooms (LeetCode 253).
- Two heaps (one staging candidates by eligibility, one selecting by preference) solve resource-constrained selection (LeetCode 502).
- A min-heap keyed by scheduled time is a timer queue; an index map adds O(log n) priority updates.
What's Next:
We've explored four major heap patterns: k-th element, k-way merge, median maintenance, and scheduling. In the final page of this module, we'll examine event-driven simulation—where heaps enable modeling of complex systems by efficiently managing a timeline of future events.
You now understand how heaps power scheduling systems—from the abstract priority queue concept through concrete problems like task cooldowns and meeting rooms, to real-world implementations in operating systems and databases. This knowledge is directly applicable to building production systems.