Understanding topological sorting algorithms is valuable, but the true power of this knowledge emerges when applied to real-world problems. Topological sorting isn't an academic curiosity—it's the engine behind systems you use every day, from compiling code to scheduling automated workflows.
In this final page of the module, we'll explore two canonical applications in depth: task scheduling and build systems. These domains perfectly illustrate how the abstract concept of 'ordering vertices respecting edge directions' translates into critically important software infrastructure.
By the end of this page, you will understand how topological sorting powers task schedulers and build systems, how to model these problems as DAGs, how to handle practical concerns like parallel execution and incremental rebuilds, and how to implement working solutions for scheduling and compilation ordering.
The Task Scheduling Problem:
Given a set of tasks with dependency constraints ('Task A must complete before Task B can start'), find a valid execution order that respects all constraints.
Modeling as a DAG:
Each task becomes a vertex, and each constraint 'Task A must complete before Task B' becomes a directed edge A → B. If the resulting graph contains a cycle, no valid schedule exists.
Why Topological Sorting is Natural Here:
Task scheduling is literally the motivating example for topological sorting. The edge semantics ('must precede') map perfectly to graph edges, and any valid topological order is a valid execution sequence.
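To make the modeling step concrete, here is a minimal sketch (the task names and constraints are invented for illustration) using Python's standard graphlib module, which implements exactly this kind of dependency ordering:

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical constraints: each pair (before, after) means "before must finish first".
constraints = [("design", "implement"), ("implement", "test"), ("design", "write_docs")]

# TopologicalSorter expects a mapping of node -> set of its predecessors (dependencies).
graph: dict[str, set[str]] = {}
for before, after in constraints:
    graph.setdefault(after, set()).add(before)
    graph.setdefault(before, set())

print(tuple(TopologicalSorter(graph).static_order()))
# e.g. ('design', 'implement', 'write_docs', 'test'); one of several valid schedules
```

Any permutation that keeps each task after its predecessors is acceptable; the schedulers below build on this same idea with execution, error handling, and parallelism.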
Common Scheduling Scenarios:
| Context | Tasks | Dependencies | Example |
|---|---|---|---|
| Project Management | Work items/stories | Prerequisite relationships | Design → Implement → Test |
| CI/CD Pipelines | Pipeline stages | Data/artifact flow | Build → Test → Deploy |
| Workflow Automation | Automated steps | Execution order | Extract → Transform → Load |
| Course Planning | Academic courses | Prerequisites | Calculus I → Calculus II |
| Factory Processing | Manufacturing steps | Assembly order | Cut → Weld → Paint |
Whenever you have 'things to do' with 'this must happen before that' constraints, you have a topological sorting problem. Recognizing this pattern is the first step to applying the powerful algorithms you've learned.
Let's build a complete, production-quality task scheduler that goes beyond simple topological sorting to include validation, execution, and error handling:
```python
from collections import deque
from dataclasses import dataclass, field
from typing import Callable, Any, Optional
from enum import Enum
import time


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class Task:
    """Represents a schedulable task with dependencies."""
    name: str
    action: Callable[[], Any]  # The actual work to perform
    dependencies: list[str] = field(default_factory=list)
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None
    error: Optional[Exception] = None
    duration: float = 0.0


class TaskScheduler:
    """
    A dependency-aware task scheduler using topological sorting.

    Features:
    - Cycle detection with detailed error messages
    - Task execution with timing and error handling
    - Support for skipping tasks dependent on failures
    """

    def __init__(self):
        self.tasks: dict[str, Task] = {}

    def add_task(self, name: str, action: Callable, dependencies: list[str] = None):
        """Add a task to the scheduler."""
        deps = dependencies or []
        self.tasks[name] = Task(name=name, action=action, dependencies=deps)

    def _compute_in_degrees(self) -> dict[str, int]:
        """Compute in-degree for each task (its dependency list is exactly its incoming edges)."""
        for task in self.tasks.values():
            for dep in task.dependencies:
                if dep not in self.tasks:
                    raise ValueError(f"Task '{task.name}' depends on unknown task '{dep}'")
        return {name: len(task.dependencies) for name, task in self.tasks.items()}

    def _build_adjacency(self) -> dict[str, list[str]]:
        """Build adjacency list (dependency → dependent)."""
        adj = {name: [] for name in self.tasks}
        for task in self.tasks.values():
            for dep in task.dependencies:
                adj[dep].append(task.name)
        return adj

    def get_execution_order(self) -> list[str]:
        """
        Compute a valid execution order using Kahn's algorithm.
        Raises ValueError if a cycle is detected.
        """
        in_degree = self._compute_in_degrees()
        adj = self._build_adjacency()

        # Start with all tasks that have no dependencies
        queue = deque()
        for name, degree in in_degree.items():
            if degree == 0:
                queue.append(name)

        order = []
        while queue:
            current = queue.popleft()
            order.append(current)
            for dependent in adj[current]:
                in_degree[dependent] -= 1
                if in_degree[dependent] == 0:
                    queue.append(dependent)

        if len(order) != len(self.tasks):
            # Cycle detected - report the tasks involved
            cycle_tasks = [name for name, deg in in_degree.items() if deg > 0]
            raise ValueError(
                f"Circular dependency detected among tasks: {cycle_tasks}"
            )

        return order

    def run(self, stop_on_failure: bool = True) -> dict[str, TaskStatus]:
        """
        Execute all tasks in dependency order.

        Args:
            stop_on_failure: If True, skip tasks that depend on failed tasks

        Returns:
            Dictionary mapping task names to their final status
        """
        order = self.get_execution_order()
        failed_tasks = set()

        for task_name in order:
            task = self.tasks[task_name]

            # Check if any dependency failed
            failed_deps = [d for d in task.dependencies if d in failed_tasks]
            if failed_deps and stop_on_failure:
                task.status = TaskStatus.SKIPPED
                failed_tasks.add(task_name)  # Propagate failure downstream
                print(f"⏭️ Skipping '{task_name}' (depends on failed: {failed_deps})")
                continue

            # Execute the task
            print(f"▶️ Running '{task_name}'...")
            task.status = TaskStatus.RUNNING
            start_time = time.time()

            try:
                task.result = task.action()
                task.status = TaskStatus.COMPLETED
                task.duration = time.time() - start_time
                print(f"✅ Completed '{task_name}' in {task.duration:.2f}s")
            except Exception as e:
                task.status = TaskStatus.FAILED
                task.error = e
                task.duration = time.time() - start_time
                failed_tasks.add(task_name)
                print(f"❌ Failed '{task_name}': {e}")

        return {name: task.status for name, task in self.tasks.items()}


# Example usage
def main():
    scheduler = TaskScheduler()

    # Define tasks with dependencies
    scheduler.add_task("setup_db", lambda: print("   Creating database..."))
    scheduler.add_task("run_migrations", lambda: print("   Running migrations..."),
                       dependencies=["setup_db"])
    scheduler.add_task("seed_data", lambda: print("   Seeding test data..."),
                       dependencies=["run_migrations"])
    scheduler.add_task("start_server", lambda: print("   Starting server..."),
                       dependencies=["run_migrations"])
    scheduler.add_task("run_tests", lambda: print("   Running test suite..."),
                       dependencies=["seed_data", "start_server"])
    scheduler.add_task("generate_report", lambda: print("   Generating report..."),
                       dependencies=["run_tests"])

    print("📋 Execution Plan:", scheduler.get_execution_order())
    print("\n🚀 Starting execution:\n")
    results = scheduler.run()
    print("\n📊 Final Status:", results)


if __name__ == "__main__":
    main()
```

A linear topological order is sufficient for sequential execution, but we can do better. Tasks with no dependencies between them can execute in parallel, dramatically reducing total execution time.
The Key Insight:
In Kahn's algorithm, all tasks in the source queue at any moment are independent—none depends on the others. They can all run simultaneously!
Level-Based Parallelization:
Group tasks by their 'level'—the longest path from any source. All tasks at the same level can run in parallel.
```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Callable
import time


@dataclass
class ParallelTask:
    name: str
    action: Callable
    dependencies: list[str]


class ParallelScheduler:
    """
    Execute tasks in parallel where dependencies allow.
    Uses a thread pool for concurrent execution.
    """

    def __init__(self, max_workers: int = 4):
        self.tasks: dict[str, ParallelTask] = {}
        self.max_workers = max_workers

    def add_task(self, name: str, action: Callable, dependencies: list[str] = None):
        self.tasks[name] = ParallelTask(name, action, dependencies or [])

    def compute_levels(self) -> dict[int, list[str]]:
        """
        Group tasks into parallel execution levels.

        Level 0: tasks with no dependencies
        Level n: tasks whose dependencies are all at level < n
        """
        in_degree = {name: len(task.dependencies) for name, task in self.tasks.items()}
        adj = {name: [] for name in self.tasks}
        for task in self.tasks.values():
            for dep in task.dependencies:
                adj[dep].append(task.name)

        levels = {}
        current_level = 0
        queue = deque([name for name, deg in in_degree.items() if deg == 0])

        while queue:
            # All tasks in the current queue can run in parallel
            level_tasks = list(queue)
            levels[current_level] = level_tasks

            # Find the next level's tasks
            next_queue = deque()
            for task_name in queue:
                for dependent in adj[task_name]:
                    in_degree[dependent] -= 1
                    if in_degree[dependent] == 0:
                        next_queue.append(dependent)

            queue = next_queue
            current_level += 1

        return levels

    def run_parallel(self) -> dict[str, float]:
        """
        Execute tasks with maximum parallelization.
        Returns dict mapping task name to execution time.
        """
        levels = self.compute_levels()
        timings = {}

        print(f"📊 Execution plan: {len(levels)} parallel levels")
        for level, tasks in levels.items():
            print(f"   Level {level}: {tasks}")
        print()

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            for level, task_names in sorted(levels.items()):
                print(f"🔄 Starting level {level}: {task_names}")
                start = time.time()

                # Submit all tasks at this level
                futures = {
                    executor.submit(self.tasks[name].action): name
                    for name in task_names
                }

                # Wait for all to complete
                for future in as_completed(futures):
                    name = futures[future]
                    try:
                        future.result()
                        print(f"   ✅ {name} completed")
                    except Exception as e:
                        print(f"   ❌ {name} failed: {e}")

                level_time = time.time() - start
                for name in task_names:
                    timings[name] = level_time
                print(f"   Level {level} completed in {level_time:.2f}s\n")

        return timings


# Example: Parallel build with simulated work
def main():
    def make_work(name, duration):
        def work():
            time.sleep(duration)
            return f"{name} done"
        return work

    scheduler = ParallelScheduler(max_workers=4)

    # Multiple independent source tasks
    scheduler.add_task("parse_config", make_work("parse_config", 0.5))
    scheduler.add_task("load_cache", make_work("load_cache", 0.3))
    scheduler.add_task("check_deps", make_work("check_deps", 0.4))

    # Tasks depending on sources
    scheduler.add_task("compile_a", make_work("compile_a", 1.0), ["parse_config"])
    scheduler.add_task("compile_b", make_work("compile_b", 0.8), ["parse_config"])
    scheduler.add_task("compile_c", make_work("compile_c", 1.2), ["load_cache"])

    # Final linking depends on all compilations
    scheduler.add_task("link", make_work("link", 0.5),
                       ["compile_a", "compile_b", "compile_c"])

    start = time.time()
    scheduler.run_parallel()
    total = time.time() - start

    print(f"⏱️ Total parallel time: {total:.2f}s")
    print(f"   Sequential would be: ~4.7s")
    print(f"   Speedup: {4.7/total:.1f}x")


if __name__ == "__main__":
    main()
```

The number of levels in the DAG determines the minimum possible execution time with unlimited parallelism. This 'critical path length' is a hard lower bound. Real scheduling also considers resource constraints (limited CPUs, memory, I/O bandwidth).
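To make the critical-path claim concrete, here is a small sketch (the task durations are invented for illustration) that computes the critical path length by relaxing earliest-finish times in topological order, using the standard graphlib module:

```python
from graphlib import TopologicalSorter

# Hypothetical tasks: name -> (duration in seconds, list of dependencies)
tasks = {
    "parse_config": (0.5, []),
    "compile_a": (1.0, ["parse_config"]),
    "compile_b": (0.8, ["parse_config"]),
    "link": (0.5, ["compile_a", "compile_b"]),
}

graph = {name: set(deps) for name, (_, deps) in tasks.items()}
earliest_finish: dict[str, float] = {}

# Relax earliest-finish times in topological order: a task can start only after
# its slowest dependency has finished.
for name in TopologicalSorter(graph).static_order():
    duration, deps = tasks[name]
    start = max((earliest_finish[d] for d in deps), default=0.0)
    earliest_finish[name] = start + duration

print(f"Critical path length: {max(earliest_finish.values()):.1f}s")
# 2.0s (parse_config -> compile_a -> link); no schedule can finish sooner, even with unlimited workers
```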
Build systems are perhaps the most well-known application of topological sorting. Every major programming ecosystem has build tools based on these principles—Make, Maven, Gradle, Bazel, Webpack, and countless others.
The Build Problem:
Given source files with include/import dependencies, compile them in an order where each file is compiled only after all its dependencies are compiled.
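The build-system code later on this page takes each file's dependency list as given; a real tool first has to extract those edges from the sources. A rough sketch of that extraction step, assuming simple C-style #include "..." directives and in-memory file contents (the file names and regex are illustrative, not how any particular tool does it):

```python
import re

# Hypothetical in-memory sources; a real build tool would read files from disk.
sources = {
    "types.h": "typedef int MyInt;",
    "utils.h": '#include "types.h"\nMyInt twice(MyInt x);',
    "main.c": '#include "utils.h"\nint main(void) { return 0; }',
}

INCLUDE_RE = re.compile(r'#include\s+"([^"]+)"')

def extract_dependencies(sources: dict[str, str]) -> dict[str, list[str]]:
    """Map each file to the local headers it includes (its dependency edges)."""
    return {path: INCLUDE_RE.findall(text) for path, text in sources.items()}

print(extract_dependencies(sources))
# {'types.h': [], 'utils.h': ['types.h'], 'main.c': ['utils.h']}
```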
Why Build Systems Love Topological Sort:
Source dependencies form a DAG (circular includes or imports are build errors), any topological order is a valid compile order, and the same dependency graph drives parallel and incremental builds. The table below shows how major build tools expose this.
| Tool | Language/Platform | Dependency Source | Parallelization |
|---|---|---|---|
| Make | C/C++ | Makefile rules | make -j N |
| Maven | Java | pom.xml declarations | mvn -T N |
| Gradle | JVM languages | build.gradle | parallel = true |
| Bazel | Multi-language | BUILD files | Built-in |
| Webpack | JavaScript | ES modules/require | thread-loader |
| Cargo | Rust | Cargo.toml | Built-in |
| go build | Go | import statements | Built-in |
```python
from dataclasses import dataclass
from collections import deque
import hashlib
import time


@dataclass
class SourceFile:
    path: str
    dependencies: list[str]
    content_hash: str = ""
    last_compiled_hash: str = ""


class BuildSystem:
    """
    A simple build system demonstrating topological sort + change detection.
    """

    def __init__(self):
        self.files: dict[str, SourceFile] = {}
        self.compiled_order: list[str] = []

    def add_file(self, path: str, dependencies: list[str], content: str):
        """Register a source file with its dependencies."""
        content_hash = hashlib.md5(content.encode()).hexdigest()[:8]
        self.files[path] = SourceFile(
            path=path,
            dependencies=dependencies,
            content_hash=content_hash
        )

    def _needs_recompile(self, path: str) -> bool:
        """Check if a file needs recompilation."""
        f = self.files[path]

        # File changed?
        if f.content_hash != f.last_compiled_hash:
            return True

        # Any dependency needs recompile?
        for dep in f.dependencies:
            if self._needs_recompile(dep):
                return True

        return False

    def get_build_order(self) -> list[str]:
        """Compute build order via topological sort."""
        in_degree = {path: len(f.dependencies) for path, f in self.files.items()}
        adj = {path: [] for path in self.files}
        for path, f in self.files.items():
            for dep in f.dependencies:
                adj[dep].append(path)

        queue = deque([p for p, deg in in_degree.items() if deg == 0])
        order = []

        while queue:
            current = queue.popleft()
            order.append(current)
            for dependent in adj[current]:
                in_degree[dependent] -= 1
                if in_degree[dependent] == 0:
                    queue.append(dependent)

        if len(order) != len(self.files):
            raise ValueError("Circular dependency in build graph!")

        return order

    def build(self, incremental: bool = True) -> list[str]:
        """
        Build all files in dependency order.
        Returns list of files that were compiled.
        """
        order = self.get_build_order()
        compiled = []

        for path in order:
            f = self.files[path]

            if incremental:
                # Recompile only if the file changed or a dependency was just recompiled
                deps_compiled = any(d in compiled for d in f.dependencies)
                file_changed = f.content_hash != f.last_compiled_hash

                if not file_changed and not deps_compiled:
                    print(f"   ⏭️ {path} (unchanged)")
                    continue

            # Simulate compilation
            print(f"   🔨 Compiling {path}...")
            time.sleep(0.1)  # Simulated work
            f.last_compiled_hash = f.content_hash
            compiled.append(path)

        return compiled

    def update_file(self, path: str, new_content: str):
        """Update a file's content (simulates editing)."""
        if path in self.files:
            self.files[path].content_hash = hashlib.md5(new_content.encode()).hexdigest()[:8]


# Example build
def main():
    build = BuildSystem()

    # types.h - no dependencies
    build.add_file("types.h", [], "typedef int MyInt;")
    # utils.h depends on types.h
    build.add_file("utils.h", ["types.h"], "#include <types.h>")
    # logger.h - independent
    build.add_file("logger.h", [], "void log(char* msg);")
    # main.c depends on utils.h and logger.h
    build.add_file("main.c", ["utils.h", "logger.h"], "main() {}")
    # app.c depends on main.c output
    build.add_file("app.c", ["main.c"], "app entry")

    print("🏗️ Build Order:", build.get_build_order())

    print("\n📦 Initial Build:")
    built = build.build(incremental=False)
    print(f"   Built {len(built)} files\n")

    print("📦 Rebuild (no changes):")
    built = build.build(incremental=True)
    print(f"   Built {len(built)} files\n")

    print("✏️ Modifying utils.h...")
    build.update_file("utils.h", "new utils content")

    print("📦 Incremental Rebuild:")
    built = build.build(incremental=True)
    print(f"   Built {len(built)} files (utils.h, main.c, app.c)")


if __name__ == "__main__":
    main()
```

Course scheduling is a frequently used interview problem and a perfect illustration of topological sorting with constraints.
The Problem (LeetCode 207 & 210):
You have n courses labeled 0 to n-1. Some courses have prerequisites: to take course i, you must first take course j. Given a list of [course, prerequisite] pairs, determine if it's possible to finish all courses. If yes, return a valid order.
Mapping to a DAG:
Each course is a vertex, and each pair [course, prerequisite] adds a directed edge prerequisite → course. All courses can be finished exactly when this graph has no cycle, and any topological order is a valid study plan.
```python
from collections import deque


def can_finish(num_courses: int, prerequisites: list[list[int]]) -> bool:
    """
    LeetCode 207: Course Schedule
    Determine if it's possible to finish all courses.
    """
    # Build adjacency list and in-degree count
    adj = [[] for _ in range(num_courses)]
    in_degree = [0] * num_courses

    for course, prereq in prerequisites:
        adj[prereq].append(course)  # prereq → course
        in_degree[course] += 1

    # Kahn's algorithm
    queue = deque([i for i in range(num_courses) if in_degree[i] == 0])
    completed = 0

    while queue:
        current = queue.popleft()
        completed += 1
        for next_course in adj[current]:
            in_degree[next_course] -= 1
            if in_degree[next_course] == 0:
                queue.append(next_course)

    return completed == num_courses


def find_order(num_courses: int, prerequisites: list[list[int]]) -> list[int]:
    """
    LeetCode 210: Course Schedule II
    Return a valid order to take all courses, or an empty list if impossible.
    """
    adj = [[] for _ in range(num_courses)]
    in_degree = [0] * num_courses

    for course, prereq in prerequisites:
        adj[prereq].append(course)
        in_degree[course] += 1

    queue = deque([i for i in range(num_courses) if in_degree[i] == 0])
    order = []

    while queue:
        current = queue.popleft()
        order.append(current)
        for next_course in adj[current]:
            in_degree[next_course] -= 1
            if in_degree[next_course] == 0:
                queue.append(next_course)

    return order if len(order) == num_courses else []


# Examples
print("Example 1: Can finish 2 courses with prereqs [[1,0]]?")
print(f"  Result: {can_finish(2, [[1,0]])}")  # True: take 0, then 1

print("\nExample 2: Can finish with circular deps [[1,0],[0,1]]?")
print(f"  Result: {can_finish(2, [[1,0], [0,1]])}")  # False: cycle

print("\nExample 3: Order for 4 courses [[1,0],[2,0],[3,1],[3,2]]?")
print(f"  Result: {find_order(4, [[1,0],[2,0],[3,1],[3,2]])}")  # [0,1,2,3] or [0,2,1,3]
```

The Alien Dictionary problem is a clever application that requires deriving the DAG from the input before applying topological sort.
The Problem (LeetCode 269):
There's a new alien language using the English alphabet. The order of letters is unknown. Given a sorted dictionary of words in this language, derive the order of letters.
The Key Insight:
In a sorted list of words, we can infer letter ordering by comparing adjacent words. The first difference between two adjacent words tells us the relative order of those differing letters.
Example:
Given words: ["wrt", "wrf", "er", "ett", "rftt"]
Build the DAG: w→e, e→r, r→t, t→f.
Topological sort: w, e, r, t, f.
```python
from collections import deque


def alien_order(words: list[str]) -> str:
    """
    Derive the character ordering from a sorted alien dictionary.
    Returns the order string, or an empty string if the input is invalid.
    """
    # Step 1: Build the graph
    #   Vertices: all unique characters
    #   Edges: derived from adjacent word comparisons

    # Initialize adjacency list for all characters
    adj = {c: set() for word in words for c in word}
    in_degree = {c: 0 for c in adj}

    # Compare adjacent words to derive ordering
    for i in range(len(words) - 1):
        word1, word2 = words[i], words[i + 1]

        # Edge case: a prefix must come before the longer word
        if len(word1) > len(word2) and word1.startswith(word2):
            return ""  # Invalid input

        # Find the first differing character
        for c1, c2 in zip(word1, word2):
            if c1 != c2:
                # c1 comes before c2 in the alien alphabet
                if c2 not in adj[c1]:
                    adj[c1].add(c2)
                    in_degree[c2] += 1
                break  # Only the first difference matters

    # Step 2: Topological sort via Kahn's algorithm
    queue = deque([c for c in in_degree if in_degree[c] == 0])
    result = []

    while queue:
        current = queue.popleft()
        result.append(current)
        for neighbor in adj[current]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    # Check if all characters were processed
    if len(result) != len(adj):
        return ""  # Cycle detected - invalid input

    return ''.join(result)


# Examples
print("Example 1: ['wrt','wrf','er','ett','rftt']")
print(f"  Order: {alien_order(['wrt','wrf','er','ett','rftt'])}")  # "wertf"

print("\nExample 2: ['z','x']")
print(f"  Order: {alien_order(['z','x'])}")  # "zx"

print("\nExample 3: ['z','x','z'] (invalid - cycle)")
print(f"  Order: {alien_order(['z','x','z'])}")  # "" (z<x and x<z is impossible)
```

The Alien Dictionary problem teaches an important lesson: sometimes the graph isn't given directly, and you need to construct it from the problem constraints. Once you recognize that adjacent word comparisons define edges, the rest follows from standard topological sorting.
While task scheduling and build systems are canonical applications, topological sorting appears in many other contexts: spreadsheet formula recalculation, package managers resolving install order, database migration tools ordering schema changes, and dataflow/ETL engines that must evaluate each node only after its inputs.
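As one quick illustration, a spreadsheet engine must recalculate a formula cell only after the cells it references are up to date. A minimal sketch (the cells and reference sets are invented for illustration), again using the standard graphlib module:

```python
from graphlib import TopologicalSorter

# Hypothetical spreadsheet: each formula cell maps to the cells it reads from.
references = {
    "A1": set(),         # constant
    "A2": set(),         # constant
    "B1": {"A1", "A2"},  # =A1+A2
    "C1": {"B1"},        # =B1*2
}

# Recalculate so that every cell is evaluated after the cells it references.
print(list(TopologicalSorter(references).static_order()))
# e.g. ['A1', 'A2', 'B1', 'C1']
```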
Module Complete:
You've now mastered topological sorting end-to-end: from the conceptual foundations (what it is, why DAGs), through both classical algorithms (Kahn's BFS and DFS post-order), to real-world applications (scheduling, build systems, and beyond). This is a complete toolkit for one of computer science's most practically important algorithms.
Congratulations! You have completed the Topological Sorting module. You can now: define and recognize topological sorting problems, verify if a graph is a DAG, implement both Kahn's and DFS-based algorithms, detect cycles, parallelize independent tasks, and apply these techniques to build systems, schedulers, and interview problems. This knowledge is foundational for advanced graph algorithms and practical software engineering.