We've explored the concepts, the testing procedures, and the trade-offs. Now it's time to bring everything together into a complete, executable algorithm for dependency-preserving decomposition. This algorithm represents the culmination of normalization theory—a systematic procedure that transforms any relation into a set of well-structured, dependency-preserving, lossless-join relations in Third Normal Form.
This isn't just an academic exercise. The algorithm presented here is the foundation of database design tools, schema analyzers, and normalization utilities used in industry. Understanding it deeply enables you to either implement such tools or critically evaluate their output.
This page presents the complete dependency-preserving 3NF decomposition algorithm with full pseudocode, step-by-step implementation guidance, complexity analysis, comprehensive worked examples, and production considerations. You'll leave with a complete, implementable understanding.
The Dependency-Preserving 3NF Decomposition Algorithm (often called the 3NF Synthesis Algorithm or Bernstein's Synthesis) consists of four main phases:

1. Compute the canonical cover Fc of the FD set F
2. Generate one relation per FD in Fc
3. Ensure the lossless-join property by adding a candidate-key relation if necessary
4. Remove redundant relations subsumed by others
Each phase builds on the previous, and together they guarantee a decomposition that is:

- in Third Normal Form (3NF)
- dependency-preserving (every FD in F⁺ can be enforced without joins)
- lossless-join (the natural join of the parts reconstructs exactly the original relation)
The canonical cover (also called minimal cover) is a simplified, equivalent set of functional dependencies. It removes redundancy from F without losing any information—critical for generating a minimal set of relations.
Definition:
A canonical cover Fc of F satisfies:

- Equivalence: Fc⁺ = F⁺ (no information is lost)
- No extraneous attributes: no attribute can be removed from the left or right side of any FD without changing Fc⁺
- Unique left-hand sides: FDs with the same LHS are combined into one

For example, F = {A → B, AB → C} has canonical cover Fc = {A → BC}: B is extraneous in AB → C (because A → B already holds), and the two FDs on A are then combined.
```python
from typing import Set, List, FrozenSet
from dataclasses import dataclass


@dataclass(frozen=True)
class FD:
    """Immutable functional dependency representation."""
    lhs: FrozenSet[str]  # Left-hand side (determinant)
    rhs: FrozenSet[str]  # Right-hand side (dependent)

    def __str__(self):
        lhs_str = ','.join(sorted(self.lhs))
        rhs_str = ','.join(sorted(self.rhs))
        return f"{lhs_str} → {rhs_str}"


def compute_canonical_cover(fds: List[FD]) -> List[FD]:
    """
    Compute the canonical cover of a set of functional dependencies.

    Algorithm:
    1. Decompose RHS to single attributes
    2. Remove extraneous LHS attributes
    3. Remove redundant FDs (extraneous RHS attributes)
    4. Recombine FDs with same LHS

    Time Complexity: O(n³ × m) where n = |attributes|, m = |FDs|
    """
    # Step 1: Decompose multi-attribute RHS into single-attribute FDs
    # A → BC becomes A → B and A → C
    working_fds = []
    for fd in fds:
        for attr in fd.rhs:
            working_fds.append(FD(fd.lhs, frozenset([attr])))

    # Step 2: Remove extraneous LHS attributes
    # For each FD X → A, an attribute in X is extraneous if
    # (X - {attr}) → A already follows from the current FD set
    changed = True
    while changed:
        changed = False
        new_fds = []
        for fd in working_fds:
            if len(fd.lhs) <= 1:
                new_fds.append(fd)
                continue
            # Try removing each LHS attribute
            reduced_fd = fd
            for attr in fd.lhs:
                reduced_lhs = fd.lhs - {attr}  # Never empty since |LHS| > 1
                if is_fd_derivable(reduced_lhs, fd.rhs, working_fds):
                    # Attribute is extraneous
                    reduced_fd = FD(reduced_lhs, fd.rhs)
                    changed = True
                    break
            new_fds.append(reduced_fd)
        working_fds = new_fds

    # Step 3: Remove redundant FDs
    # An FD is redundant if it can be derived from the remaining FDs
    final_fds = []
    for fd in working_fds:
        other_fds = [f for f in working_fds if f != fd]
        if not is_fd_derivable(fd.lhs, fd.rhs, other_fds):
            final_fds.append(fd)

    # Step 4: Recombine FDs with same LHS
    # A → B and A → C become A → BC
    lhs_to_rhs = {}
    for fd in final_fds:
        if fd.lhs not in lhs_to_rhs:
            lhs_to_rhs[fd.lhs] = set()
        lhs_to_rhs[fd.lhs].update(fd.rhs)

    canonical = [
        FD(lhs, frozenset(rhs_set))
        for lhs, rhs_set in lhs_to_rhs.items()
    ]
    return canonical


def is_fd_derivable(lhs: FrozenSet[str], rhs: FrozenSet[str], fds: List[FD]) -> bool:
    """Test if lhs → rhs is derivable from fds using attribute closure."""
    closure = compute_closure(lhs, fds)
    return rhs.issubset(closure)


def compute_closure(attrs: FrozenSet[str], fds: List[FD]) -> Set[str]:
    """Compute attribute closure of attrs under fds."""
    closure = set(attrs)
    changed = True
    while changed:
        changed = False
        for fd in fds:
            if fd.lhs.issubset(closure) and not fd.rhs.issubset(closure):
                closure.update(fd.rhs)
                changed = True
    return closure
```

Without computing the canonical cover, you might generate redundant relations (one for each redundant FD) or relations with unnecessary attributes (from extraneous LHS attributes). The canonical cover ensures a minimal, clean decomposition.
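As a quick sanity check, here is a small usage sketch on an input of our own choosing: the explicitly redundant FD A → C should be eliminated in Step 3 because it follows transitively from the other two.

```python
# Sketch usage of compute_canonical_cover on a made-up FD set
fds = [
    FD(frozenset({'A'}), frozenset({'B', 'C'})),  # A → BC
    FD(frozenset({'B'}), frozenset({'C'})),       # B → C
    FD(frozenset({'A'}), frozenset({'C'})),       # A → C (redundant)
]
for fd in compute_canonical_cover(fds):
    print(fd)
# Expected output:
#   A → B
#   B → C
# A → C is dropped in Step 3: it is derivable via A → B and B → C.
```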
Once we have the canonical cover Fc, generating relations is straightforward: create one relation for each FD in Fc, containing exactly the attributes from that FD.
The Rule:
For each FD (X → Y) ∈ Fc, create relation Ri = X ∪ Y
The key of Ri is X (the determinant).
```python
# Continues the Phase 1 listing (reuses FD and the imports above)

@dataclass
class RelationSchema:
    """Schema for a decomposed relation."""
    name: str
    attributes: FrozenSet[str]
    primary_key: FrozenSet[str]
    functional_dependencies: List[FD]

    def __str__(self):
        attrs = ', '.join(sorted(self.attributes))
        key = ', '.join(sorted(self.primary_key))
        return f"{self.name}({attrs}) [Key: {key}]"


def generate_relations_from_canonical(canonical_cover: List[FD]) -> List[RelationSchema]:
    """
    Phase 2: Generate one relation per FD in the canonical cover.

    For each FD X → Y in Fc:
    - Create relation with attributes X ∪ Y
    - Primary key is X
    - The FD is preserved in this relation

    Time Complexity: O(|Fc|)
    """
    relations = []
    for i, fd in enumerate(canonical_cover):
        # Relation attributes = LHS ∪ RHS
        attrs = fd.lhs.union(fd.rhs)
        # Primary key = LHS of the FD
        key = fd.lhs
        # This relation preserves this FD
        preserved_fds = [fd]

        relation = RelationSchema(
            name=f"R{i+1}",
            attributes=attrs,
            primary_key=key,
            functional_dependencies=preserved_fds
        )
        relations.append(relation)

    return relations


# Example
def example_phase_2():
    """Demonstrate Phase 2 with a simple example."""
    # Canonical cover: {A → BC, C → D}
    canonical = [
        FD(frozenset(['A']), frozenset(['B', 'C'])),
        FD(frozenset(['C']), frozenset(['D']))
    ]

    relations = generate_relations_from_canonical(canonical)
    for rel in relations:
        print(rel)
    # Output:
    # R1(A, B, C) [Key: A]
    # R2(C, D) [Key: C]
```

Why This Preserves Dependencies:
Every FD in Fc is fully contained in exactly one relation—the relation created from that FD. Since Fc⁺ = F⁺, preserving Fc means preserving F.
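This property can also be verified programmatically. Once you are working with a canonical cover, a simple containment check suffices (for an arbitrary F, the general test requires closures restricted to each relation). A minimal sketch, reusing the types defined above:

```python
def preserves_cover(relations: List[RelationSchema], cover: List[FD]) -> bool:
    """Sufficient test for dependency preservation: every FD in the
    canonical cover must fit entirely inside some relation's schema."""
    for fd in cover:
        fd_attrs = fd.lhs | fd.rhs
        if not any(fd_attrs <= rel.attributes for rel in relations):
            return False
    return True
```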
Why the Result is 3NF:
Each generated relation has the form R(X ∪ Y) where X → Y is in Fc, so X is a key of that relation. Because Fc contains no extraneous attributes and no redundant FDs, any nontrivial FD that holds in the relation either has a superkey as its determinant or has only prime attributes on its right-hand side, which is exactly the 3NF condition.
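The 3NF condition itself can be checked per relation. The helper below is an illustrative sketch of our own; it assumes you already have the relation's FDs projected onto its attributes and its candidate keys (both of which the synthesis algorithm provides for free):

```python
def is_in_3nf(rel_attrs: FrozenSet[str], rel_fds: List[FD],
              candidate_keys: List[FrozenSet[str]]) -> bool:
    """X → Y violates 3NF unless X is a superkey of the relation or
    every attribute of Y - X is prime (belongs to some candidate key)."""
    prime = set().union(*candidate_keys) if candidate_keys else set()
    for fd in rel_fds:
        if fd.rhs <= fd.lhs:  # Trivial FD, never a violation
            continue
        is_superkey = compute_closure(fd.lhs, rel_fds) >= rel_attrs
        if not is_superkey and not (fd.rhs - fd.lhs).issubset(prime):
            return False
    return True
```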
The relations generated in Phase 2 preserve all dependencies, but they might not guarantee lossless join. To ensure lossless join, at least one relation must contain a candidate key of the original relation R.
The Check:
If no relation from Phase 2 contains a candidate key of R, add a new relation Rk containing exactly the attributes of one candidate key.
```python
from itertools import combinations


def find_all_candidate_keys(all_attrs: FrozenSet[str], fds: List[FD]) -> List[FrozenSet[str]]:
    """
    Find all candidate keys of a relation.

    A candidate key is a minimal set of attributes whose closure
    equals all attributes.

    Algorithm:
    1. Attributes that never appear on a RHS must be in every key
    2. Attributes that appear only on RHSs are never in any key
    3. If the LHS-only attributes already form a superkey, done
    4. Otherwise, incrementally add attributes from the remaining pool
       and keep the minimal combinations
    """
    # Partition attributes
    lhs_only = set()    # Must be in every key
    rhs_only = set()    # Never in any candidate key
    both_sides = set()

    all_lhs = set()
    all_rhs = set()
    for fd in fds:
        all_lhs.update(fd.lhs)
        all_rhs.update(fd.rhs)

    for attr in all_attrs:
        if attr in all_lhs and attr not in all_rhs:
            lhs_only.add(attr)
        elif attr in all_rhs and attr not in all_lhs:
            rhs_only.add(attr)
        else:
            both_sides.add(attr)

    # Start with LHS-only attributes (guaranteed in every key)
    base = frozenset(lhs_only)

    # If base is already a superkey, it is the unique candidate key:
    # every key must contain base, so no other minimal key can exist
    if compute_closure(base, fds) >= all_attrs:
        return [base]

    # Add attributes from both_sides until we get superkeys;
    # is_minimal_key filters out non-minimal combinations
    candidate_keys = []
    for size in range(1, len(both_sides) + 1):
        for combo in combinations(both_sides, size):
            potential_key = base.union(combo)
            if compute_closure(potential_key, fds) >= all_attrs:
                if is_minimal_key(potential_key, fds, all_attrs):
                    candidate_keys.append(potential_key)

    return candidate_keys if candidate_keys else [frozenset(all_attrs)]  # Worst case


def is_minimal_key(key: FrozenSet[str], fds: List[FD], all_attrs: FrozenSet[str]) -> bool:
    """Check if key is minimal (no proper subset is a superkey)."""
    for attr in key:
        smaller = key - {attr}
        if smaller and compute_closure(smaller, fds) >= all_attrs:
            return False
    return True


def ensure_lossless_join(
        relations: List[RelationSchema],
        original_attrs: FrozenSet[str],
        original_fds: List[FD]) -> List[RelationSchema]:
    """
    Phase 3: Add a key relation if needed for lossless join.

    If no existing relation contains a candidate key, add one.
    """
    candidate_keys = find_all_candidate_keys(original_attrs, original_fds)

    # Check if any relation contains a candidate key
    for rel in relations:
        for key in candidate_keys:
            if key.issubset(rel.attributes):
                # Already have a relation with a key - lossless guaranteed
                return relations

    # No relation contains a key - add one
    key_to_add = candidate_keys[0]  # Pick any candidate key
    key_relation = RelationSchema(
        name="R_key",
        attributes=key_to_add,
        primary_key=key_to_add,
        functional_dependencies=[]  # No FDs to preserve (just for lossless)
    )
    return relations + [key_relation]
```

When a relation contains a candidate key K, it can serve as the 'anchor' for joins. Since K → (all attributes), the chase algorithm will eventually produce a complete row matching the original data. Without any key-containing relation, tuples can be combined incorrectly during joins.
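The chase argument above can be turned into an executable lossless-join test. The following is a compact sketch of our own of the standard tableau chase, reusing FD and compute-closure-style iteration; 'a' marks the distinguished symbol:

```python
def is_lossless_join(schemas: List[FrozenSet[str]],
                     all_attrs: FrozenSet[str],
                     fds: List[FD]) -> bool:
    """Tableau chase: one row per decomposed relation; cell (i, A) starts
    as 'a' if A belongs to schema i, otherwise as the row symbol 'b{i}'.
    FDs equate symbols until fixpoint; the decomposition is lossless
    iff some row becomes all-distinguished."""
    attrs = sorted(all_attrs)
    table = [{A: ('a' if A in schema else f'b{i}') for A in attrs}
             for i, schema in enumerate(schemas)]

    changed = True
    while changed:
        changed = False
        for fd in fds:
            for r1 in table:
                for r2 in table:
                    if r1 is r2 or any(r1[A] != r2[A] for A in fd.lhs):
                        continue
                    for B in fd.rhs:
                        if r1[B] != r2[B]:
                            # Equate the two symbols column-wide,
                            # preferring 'a' ('a' sorts before 'b...')
                            keep, drop = sorted((r1[B], r2[B]))
                            for row in table:
                                if row[B] == drop:
                                    row[B] = keep
                            changed = True

    return any(all(row[A] == 'a' for A in attrs) for row in table)


# Example: R(A, B, C) with A → B, decomposed into {A, B} and {A, C}
fds = [FD(frozenset({'A'}), frozenset({'B'}))]
print(is_lossless_join([frozenset({'A', 'B'}), frozenset({'A', 'C'})],
                       frozenset({'A', 'B', 'C'}), fds))  # True
```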
The final phase removes relations that are subsumed by others. If one relation's attributes are a subset of another's, the smaller one is redundant—its data can be obtained by projecting from the larger relation.
The Rule:
Remove Ri if there exists Rj such that Ri.attributes ⊂ Rj.attributes (proper subset).
```python
def remove_redundant_relations(relations: List[RelationSchema]) -> List[RelationSchema]:
    """
    Phase 4: Remove relations whose attributes are a subset of another.

    If Ri ⊂ Rj, remove Ri (its data is available via projection from Rj).

    Time Complexity: O(n² × k) where n = |relations|, k = max |attributes|
    """
    non_redundant = []
    for i, rel_i in enumerate(relations):
        is_redundant = False
        for j, rel_j in enumerate(relations):
            if i != j and rel_i.attributes < rel_j.attributes:  # Proper subset
                is_redundant = True
                break
        if not is_redundant:
            non_redundant.append(rel_i)
    return non_redundant


def remove_redundant_optimized(relations: List[RelationSchema]) -> List[RelationSchema]:
    """
    Optimized version using sorting by size.

    Smaller relations can only be subsumed by larger ones.
    """
    # Sort by number of attributes (ascending)
    sorted_rels = sorted(relations, key=lambda r: len(r.attributes))

    result = []
    for i, rel in enumerate(sorted_rels):
        # Only need to check against relations at least as large
        is_subsumed = False
        for larger in sorted_rels[i+1:]:
            if rel.attributes < larger.attributes:
                is_subsumed = True
                break
        if not is_subsumed:
            result.append(rel)
    return result
```

When Redundancy Occurs:
Redundancy typically happens when the attribute sets of two FDs in the canonical cover nest inside one another. For example, Fc = {AB → C, C → B} generates R1(A, B, C) and R2(B, C), and R2 is a proper subset of R1 (this exact case is demonstrated in the sketch after the safety note below).
Safety Note:
Removing a redundant relation doesn't affect dependency preservation: if Ri ⊂ Rj, any FD preserved in Ri is also preserved in Rj (since Rj has all of Ri's attributes plus more).
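A quick sketch of the nested-FD case mentioned above, using the Phase 4 function on a hypothetical cover of our own construction:

```python
# Fc = {AB → C, C → B} yields R1(A, B, C) with key AB and R2(B, C) with key C
r1 = RelationSchema("R1", frozenset({'A', 'B', 'C'}), frozenset({'A', 'B'}),
                    [FD(frozenset({'A', 'B'}), frozenset({'C'}))])
r2 = RelationSchema("R2", frozenset({'B', 'C'}), frozenset({'C'}),
                    [FD(frozenset({'C'}), frozenset({'B'}))])

kept = remove_redundant_relations([r1, r2])
print([r.name for r in kept])  # ['R1']: R2 is subsumed; C → B still holds inside R1
```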
Here's the complete algorithm, combining all four phases into a single, ready-to-run implementation:
"""Complete Dependency-Preserving 3NF Decomposition Algorithm Guarantees:- Result is in Third Normal Form (3NF)- All functional dependencies are preserved- Decomposition is lossless-join Time Complexity: O(n³ × m²) where n = |attributes|, m = |FDs|Space Complexity: O(n × m)""" from typing import List, FrozenSet, Set, Tuplefrom dataclasses import dataclass @dataclass(frozen=True)class FD: lhs: FrozenSet[str] rhs: FrozenSet[str] def __str__(self): return f"{','.join(sorted(self.lhs))} → {','.join(sorted(self.rhs))}" @dataclassclass RelationSchema: name: str attributes: FrozenSet[str] primary_key: FrozenSet[str] preserved_fds: List[FD] def __str__(self): attrs = ', '.join(sorted(self.attributes)) key = ', '.join(sorted(self.primary_key)) return f"{self.name}({attrs}) [Key: {key}]" class ThreeNFDecomposer: """ Main class for 3NF decomposition with dependency preservation. """ def __init__(self, attributes: Set[str], fds: List[Tuple[Set[str], Set[str]]]): """ Initialize with relation attributes and functional dependencies. Parameters: - attributes: Set of attribute names - fds: List of (lhs, rhs) tuples representing FDs """ self.original_attrs = frozenset(attributes) self.original_fds = [ FD(frozenset(lhs), frozenset(rhs)) for lhs, rhs in fds ] def decompose(self) -> List[RelationSchema]: """ Execute the complete 3NF decomposition algorithm. Returns list of RelationSchema objects representing the decomposition. """ print("=" * 60) print("3NF DECOMPOSITION ALGORITHM") print("=" * 60) # Phase 1: Compute canonical cover print("\n[Phase 1] Computing canonical cover...") canonical = self._compute_canonical_cover() print(f" Canonical cover: {[str(fd) for fd in canonical]}") # Phase 2: Generate relations print("\n[Phase 2] Generating relations...") relations = self._generate_relations(canonical) for rel in relations: print(f" {rel}") # Phase 3: Ensure lossless join print("\n[Phase 3] Checking lossless join property...") relations = self._ensure_lossless(relations) # Phase 4: Remove redundant relations print("\n[Phase 4] Removing redundant relations...") relations = self._remove_redundant(relations) print("\n" + "=" * 60) print("FINAL DECOMPOSITION") print("=" * 60) for rel in relations: print(f" {rel}") return relations def _compute_canonical_cover(self) -> List[FD]: """Phase 1: Compute canonical cover of FDs.""" # Step 1: Decompose RHS to singletons working = [] for fd in self.original_fds: for attr in fd.rhs: working.append(FD(fd.lhs, frozenset([attr]))) # Step 2: Remove extraneous LHS attributes working = self._remove_extraneous_lhs(working) # Step 3: Remove redundant FDs working = self._remove_redundant_fds(working) # Step 4: Recombine by LHS lhs_to_rhs = {} for fd in working: if fd.lhs not in lhs_to_rhs: lhs_to_rhs[fd.lhs] = set() lhs_to_rhs[fd.lhs].update(fd.rhs) return [FD(lhs, frozenset(rhs)) for lhs, rhs in lhs_to_rhs.items()] def _remove_extraneous_lhs(self, fds: List[FD]) -> List[FD]: """Remove extraneous attributes from LHS of each FD.""" result = list(fds) changed = True while changed: changed = False new_result = [] for fd in result: reduced = fd if len(fd.lhs) > 1: for attr in fd.lhs: test_lhs = fd.lhs - {attr} if test_lhs: # Check if reduced FD still works other_fds = [f for f in result if f != fd] other_fds.append(FD(test_lhs, fd.rhs)) closure = self._compute_closure(test_lhs, other_fds) if fd.rhs.issubset(closure): reduced = FD(test_lhs, fd.rhs) changed = True break new_result.append(reduced) result = new_result return result def _remove_redundant_fds(self, fds: List[FD]) -> 
List[FD]: """Remove FDs that are derivable from others.""" result = [] for fd in fds: others = [f for f in fds if f != fd] closure = self._compute_closure(fd.lhs, others) if not fd.rhs.issubset(closure): result.append(fd) return result def _compute_closure(self, attrs: FrozenSet[str], fds: List[FD]) -> Set[str]: """Compute attribute closure.""" closure = set(attrs) changed = True while changed: changed = False for fd in fds: if fd.lhs.issubset(closure) and not fd.rhs.issubset(closure): closure.update(fd.rhs) changed = True return closure def _generate_relations(self, canonical: List[FD]) -> List[RelationSchema]: """Phase 2: Create one relation per FD.""" relations = [] for i, fd in enumerate(canonical): rel = RelationSchema( name=f"R{i+1}", attributes=fd.lhs.union(fd.rhs), primary_key=fd.lhs, preserved_fds=[fd] ) relations.append(rel) return relations def _ensure_lossless(self, relations: List[RelationSchema]) -> List[RelationSchema]: """Phase 3: Add key relation if needed.""" candidate_keys = self._find_candidate_keys() # Check if any relation contains a key for rel in relations: for key in candidate_keys: if key.issubset(rel.attributes): print(f" ✓ Lossless: {rel.name} contains key {set(key)}") return relations # Add key relation key = candidate_keys[0] key_rel = RelationSchema( name="R_key", attributes=key, primary_key=key, preserved_fds=[] ) print(f" + Adding key relation: {key_rel}") return relations + [key_rel] def _find_candidate_keys(self) -> List[FrozenSet[str]]: """Find candidate keys of original relation.""" # Find attributes that must be in every key all_lhs = set() all_rhs = set() for fd in self.original_fds: all_lhs.update(fd.lhs) all_rhs.update(fd.rhs) must_be_in_key = frozenset(a for a in self.original_attrs if a not in all_rhs) # Try to close this set closure = self._compute_closure(must_be_in_key, self.original_fds) if closure >= self.original_attrs: return [must_be_in_key] # Need to add more attributes from itertools import combinations remaining = self.original_attrs - must_be_in_key - frozenset(a for a in self.original_attrs if a in all_rhs and a not in all_lhs) for size in range(len(remaining) + 1): for combo in combinations(remaining, size): potential = must_be_in_key.union(combo) if self._compute_closure(potential, self.original_fds) >= self.original_attrs: return [potential] return [self.original_attrs] def _remove_redundant(self, relations: List[RelationSchema]) -> List[RelationSchema]: """Phase 4: Remove subsumed relations.""" result = [] for rel in relations: subsumed = False for other in relations: if rel != other and rel.attributes < other.attributes: print(f" - Removing {rel.name} (subsumed by {other.name})") subsumed = True break if not subsumed: result.append(rel) if len(result) == len(relations): print(" No redundant relations found") return result # =====================================================# USAGE EXAMPLE# ===================================================== if __name__ == "__main__": # Example: Employee database attributes = {'EmpID', 'Name', 'DeptID', 'DeptName', 'ManagerID'} fds = [ ({'EmpID'}, {'Name', 'DeptID'}), ({'DeptID'}, {'DeptName', 'ManagerID'}), ] decomposer = ThreeNFDecomposer(attributes, fds) result = decomposer.decompose()Let's trace through the complete algorithm with a realistic example.
Input:
Relation: CourseEnrollment(StudentID, StudentName, CourseID, CourseName, InstructorID, InstructorName, DeptID, DeptName)
Functional Dependencies:

- F1: StudentID → StudentName
- F2: CourseID → CourseName, InstructorID, DeptID
- F3: InstructorID → InstructorName, DeptID
- F4: DeptID → DeptName

Phase 1: Canonical Cover Computation
| Step | Action | Result |
|---|---|---|
| 1a | Decompose F2: CourseID → CourseName, InstructorID, DeptID | CourseID → CourseName; CourseID → InstructorID; CourseID → DeptID |
| 1b | Decompose F3: InstructorID → InstructorName, DeptID | InstructorID → InstructorName; InstructorID → DeptID |
| 1c | Check extraneous LHS | All LHS are single attributes - none extraneous |
| 1d | Check redundant FDs | CourseID → DeptID is derivable (CourseID → InstructorID → DeptID) - REMOVE |
| 1e | Recombine by LHS | See canonical cover below |
Canonical Cover Fc:

- StudentID → StudentName
- CourseID → CourseName, InstructorID
- InstructorID → InstructorName, DeptID
- DeptID → DeptName

Phase 2: Generate Relations
| FD | Generated Relation | Key |
|---|---|---|
| StudentID → StudentName | R1(StudentID, StudentName) | StudentID |
| CourseID → CourseName, InstructorID | R2(CourseID, CourseName, InstructorID) | CourseID |
| InstructorID → InstructorName, DeptID | R3(InstructorID, InstructorName, DeptID) | InstructorID |
| DeptID → DeptName | R4(DeptID, DeptName) | DeptID |
Phase 3: Lossless Join Check
Candidate key of original: {StudentID, CourseID}
Does any Ri contain {StudentID, CourseID}? No: each of R1 through R4 contains at most one of the two attributes.
Action: Add R5(StudentID, CourseID) as key relation.
Phase 4: Redundancy Check
No redundant relations.
Final Decomposition:

R1(StudentID, StudentName), R2(CourseID, CourseName, InstructorID), R3(InstructorID, InstructorName, DeptID), R4(DeptID, DeptName), R5(StudentID, CourseID)
This decomposition is in 3NF, preserves all FDs, and is lossless-join. The enrollment relationship is captured in R5, linking students to courses.
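You can reproduce this trace with the ThreeNFDecomposer class from the complete implementation above; the schemas match, though the generated key relation is named R_key rather than R5 and relation numbering may vary:

```python
attributes = {'StudentID', 'StudentName', 'CourseID', 'CourseName',
              'InstructorID', 'InstructorName', 'DeptID', 'DeptName'}
fds = [
    ({'StudentID'}, {'StudentName'}),                          # F1
    ({'CourseID'}, {'CourseName', 'InstructorID', 'DeptID'}),  # F2
    ({'InstructorID'}, {'InstructorName', 'DeptID'}),          # F3
    ({'DeptID'}, {'DeptName'}),                                # F4
]

decomposer = ThreeNFDecomposer(attributes, fds)
result = decomposer.decompose()
# Final decomposition:
#   R1(StudentID, StudentName)               [Key: StudentID]
#   R2(CourseID, CourseName, InstructorID)   [Key: CourseID]
#   R3(DeptID, InstructorID, InstructorName) [Key: InstructorID]
#   R4(DeptID, DeptName)                     [Key: DeptID]
#   R_key(CourseID, StudentID)               [Key: CourseID, StudentID]
```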
We've presented the complete dependency-preserving 3NF decomposition algorithm, from theory to implementation. Let's consolidate the essential knowledge:

- Phase 1 computes a canonical cover, guaranteeing a minimal, non-redundant FD set.
- Phase 2 creates one relation per FD in the cover, which preserves every dependency and yields 3NF relations.
- Phase 3 adds a candidate-key relation when needed, securing the lossless-join property.
- Phase 4 removes subsumed relations without sacrificing any of these guarantees.
- The overall procedure runs in polynomial time (O(n³ × m²) in our implementation), so it scales to realistic schemas.
Congratulations! You've mastered dependency-preserving decomposition—from understanding why preservation matters, to testing algorithms, to navigating trade-offs, to implementing the complete 3NF synthesis algorithm. This knowledge forms the foundation for designing robust, efficient database schemas that maintain data integrity at scale.
What's Next:
With dependency-preserving decomposition mastered, you're ready to explore Third Normal Form (3NF) in depth—understanding its definition, identifying violations, and applying the synthesis algorithm we've developed here. The next module will build directly on this foundation.