Sequence labeling is the canonical application of Conditional Random Fields. The task is deceptively simple: given a sequence of inputs (words, characters, time steps), assign a label to each element. Yet this simple formulation underlies some of the most important NLP applications: named entity recognition, part-of-speech tagging, chunking, and slot filling.
This page brings together everything we've learned about CRFs to show how they're applied to solve these real-world problems. We'll cover labeling schemes, evaluation metrics, implementation details, and best practices.
By the end of this page, you will understand: (1) The IOB/BIO and related labeling schemes for entity annotation, (2) NER and POS tagging as canonical CRF applications, (3) Evaluation metrics: precision, recall, F1, and entity-level vs. token-level, (4) Complete implementation workflow from data to trained model, and (5) Error analysis and debugging strategies.
When entities span multiple tokens, we need a way to encode boundaries. Several schemes exist, each with trade-offs.
BIO (Beginning-Inside-Outside) Scheme:
The most common scheme. Each token is labeled:
- B-X: the token begins an entity of type X
- I-X: the token is inside (a continuation of) an entity of type X
- O: the token is outside any entity
Example:
| Token | Label | Meaning |
|---|---|---|
| Barack | B-PER | Start of person name |
| Obama | I-PER | Continuation of person name |
| was | O | Not an entity |
| born | O | Not an entity |
| in | O | Not an entity |
| Honolulu | B-LOC | Start of location |
| , | O | Not an entity |
| Hawaii | B-LOC | Start of (different) location |
Notice: Two consecutive B-tags indicate two separate entities. This handles adjacent entities of the same type.
BIOES (Begin-Inside-Outside-End-Singleton) Scheme:
A more expressive scheme that explicitly marks entity boundaries:
- B-X: begins a multi-token entity of type X
- I-X: inside a multi-token entity (neither first nor last token)
- O: outside any entity
- E-X: ends a multi-token entity of type X
- S-X: a single-token (singleton) entity of type X
Same example in BIOES:
| Token | Label | Meaning |
|---|---|---|
| Barack | B-PER | Start of multi-token person |
| Obama | E-PER | End of multi-token person |
| was | O | Not an entity |
| born | O | Not an entity |
| in | O | Not an entity |
| Honolulu | S-LOC | Single-token location |
| , | O | Not an entity |
| Hawaii | S-LOC | Single-token location |
BIOES vs. BIO:
BIOES gives the model explicit boundary information: the tagger must commit to where each entity ends, which often yields a small gain in entity-level F1. The cost is a larger label set (four tags per entity type plus O, instead of two plus O), meaning sparser statistics per label and slower inference. BIO remains the safer default when training data is limited. A minimal converter between the two schemes follows.
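To make the mapping concrete, here is a minimal sketch of a BIO-to-BIOES converter. The helper name `bio_to_bioes` is ours, not a library API, and it assumes well-formed BIO input:

```python
from typing import List

def bio_to_bioes(labels: List[str]) -> List[str]:
    """Convert a well-formed BIO label sequence to BIOES."""
    bioes = []
    for i, label in enumerate(labels):
        if label == 'O':
            bioes.append('O')
            continue
        # Does the same entity continue at the next position?
        nxt = labels[i + 1] if i + 1 < len(labels) else 'O'
        continues = nxt == 'I-' + label[2:]
        if label.startswith('B-'):
            bioes.append(label if continues else 'S-' + label[2:])
        else:  # 'I-'
            bioes.append(label if continues else 'E-' + label[2:])
    return bioes

print(bio_to_bioes(["B-PER", "I-PER", "O", "B-LOC", "O", "B-LOC"]))
# ['B-PER', 'E-PER', 'O', 'S-LOC', 'O', 'S-LOC']
```

The full utilities below extract entity spans from both schemes and repair invalid BIO sequences.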
```python
from typing import List
from dataclasses import dataclass


@dataclass
class Entity:
    """Represents an extracted entity."""
    entity_type: str
    start: int  # Token index
    end: int    # Token index (exclusive)
    text: str


def bio_to_entities(tokens: List[str], labels: List[str]) -> List[Entity]:
    """
    Convert BIO labels to entity spans.

    Args:
        tokens: List of tokens
        labels: List of BIO labels

    Returns:
        List of Entity objects
    """
    entities = []
    current_entity = None
    current_start = None

    for i, (token, label) in enumerate(zip(tokens, labels)):
        if label.startswith('B-'):
            # Close previous entity if exists
            if current_entity:
                entities.append(Entity(
                    entity_type=current_entity,
                    start=current_start,
                    end=i,
                    text=' '.join(tokens[current_start:i])
                ))
            # Start new entity
            current_entity = label[2:]  # Remove 'B-' prefix
            current_start = i
        elif label.startswith('I-'):
            # Continue current entity (if matching type)
            entity_type = label[2:]
            if current_entity != entity_type:
                # Type mismatch - close previous and start new
                if current_entity:
                    entities.append(Entity(
                        entity_type=current_entity,
                        start=current_start,
                        end=i,
                        text=' '.join(tokens[current_start:i])
                    ))
                current_entity = entity_type
                current_start = i
        else:  # 'O' label
            # Close current entity if exists
            if current_entity:
                entities.append(Entity(
                    entity_type=current_entity,
                    start=current_start,
                    end=i,
                    text=' '.join(tokens[current_start:i])
                ))
            current_entity = None
            current_start = None

    # Don't forget the last entity
    if current_entity:
        entities.append(Entity(
            entity_type=current_entity,
            start=current_start,
            end=len(tokens),
            text=' '.join(tokens[current_start:])
        ))

    return entities


def bioes_to_entities(tokens: List[str], labels: List[str]) -> List[Entity]:
    """
    Convert BIOES labels to entity spans.
    """
    entities = []
    current_entity = None
    current_start = None

    for i, (token, label) in enumerate(zip(tokens, labels)):
        if label.startswith('S-'):
            # Singleton entity
            entity_type = label[2:]
            entities.append(Entity(
                entity_type=entity_type,
                start=i,
                end=i + 1,
                text=token
            ))
            current_entity = None
        elif label.startswith('B-'):
            # Start of multi-token entity
            current_entity = label[2:]
            current_start = i
        elif label.startswith('E-'):
            # End of multi-token entity
            if current_entity:
                entities.append(Entity(
                    entity_type=current_entity,
                    start=current_start,
                    end=i + 1,
                    text=' '.join(tokens[current_start:i + 1])
                ))
            current_entity = None
        elif label.startswith('I-'):
            # Inside - just continue
            pass
        else:  # 'O'
            current_entity = None

    return entities


def validate_bio_sequence(labels: List[str]) -> List[str]:
    """
    Validate and fix invalid BIO sequences.

    Invalid: I-X without preceding B-X or I-X
    Fix: Convert orphan I-X to B-X
    """
    fixed = []
    prev_entity = None

    for label in labels:
        if label.startswith('I-'):
            entity_type = label[2:]
            if prev_entity != entity_type:
                # Invalid - convert to B-
                fixed.append(f'B-{entity_type}')
            else:
                fixed.append(label)
            prev_entity = entity_type
        elif label.startswith('B-'):
            prev_entity = label[2:]
            fixed.append(label)
        else:
            prev_entity = None
            fixed.append(label)

    return fixed


# Example
tokens = ["Barack", "Obama", "was", "born", "in", "Honolulu", ",", "Hawaii"]
bio_labels = ["B-PER", "I-PER", "O", "O", "O", "B-LOC", "O", "B-LOC"]
bioes_labels = ["B-PER", "E-PER", "O", "O", "O", "S-LOC", "O", "S-LOC"]

print("BIO Example:")
for tok, label in zip(tokens, bio_labels):
    print(f"  {tok:12} {label}")

print("\nExtracted entities from BIO:")
for entity in bio_to_entities(tokens, bio_labels):
    print(f"  {entity.entity_type}: '{entity.text}' [{entity.start}:{entity.end}]")

print("\nBIOES Example:")
for tok, label in zip(tokens, bioes_labels):
    print(f"  {tok:12} {label}")

print("\nExtracted entities from BIOES:")
for entity in bioes_to_entities(tokens, bioes_labels):
    print(f"  {entity.entity_type}: '{entity.text}' [{entity.start}:{entity.end}]")
```

CRFs naturally learn that certain transitions are invalid (e.g., I-PER cannot follow B-LOC). You can also enforce these constraints explicitly by setting transition scores to -∞ for invalid pairs. This guarantees valid output sequences even early in training.
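That constraint trick can be sketched directly. Below, a boolean rule marks which (previous, current) label pairs are legal under BIO, and illegal transitions get a score of -∞. The label list, random scores, and the `allowed_transition` helper are illustrative stand-ins, not part of any particular CRF library:

```python
import numpy as np

labels = ['B-PER', 'I-PER', 'B-LOC', 'I-LOC', 'O']

def allowed_transition(prev: str, curr: str) -> bool:
    """I-X may only follow B-X or I-X of the same type; all else is legal."""
    if curr.startswith('I-'):
        return prev in ('B-' + curr[2:], 'I-' + curr[2:])
    return True

L = len(labels)
transition_scores = np.random.randn(L, L)  # stand-in for learned scores

# Overwrite illegal transitions with -inf so Viterbi can never choose them
for i, prev in enumerate(labels):
    for j, curr in enumerate(labels):
        if not allowed_transition(prev, curr):
            transition_scores[i, j] = -np.inf

print(int((transition_scores == -np.inf).sum()), "transitions forbidden")  # 6
```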
Named Entity Recognition is the task of identifying and classifying named entities in text. It's one of the most important information extraction tasks, serving as a foundation for question answering, knowledge graph construction, and document understanding.
Common Entity Types:
| Type | Abbreviation | Examples |
|---|---|---|
| Person | PER | Barack Obama, Marie Curie |
| Organization | ORG | Google, United Nations |
| Location | LOC | Paris, Mount Everest |
| Geo-Political Entity | GPE | France, New York City |
| Date | DATE | January 15, 2024 |
| Time | TIME | 3:30 PM, morning |
| Money | MONEY | $50, €100 million |
| Percentage | PERCENT | 50%, three percent |
Standard Datasets:
- CoNLL-2003: the classic benchmark; English and German newswire annotated with PER, ORG, LOC, and MISC entities
- OntoNotes 5.0: larger and broader, with 18 entity types across news, conversation, and web text
- WNUT: noisy user-generated text, focused on rare and emerging entities
NER Challenges:
1. Ambiguity: the same string can name different entity types. "Washington" may be a person, a location, or an organization depending on context.
2. Nested Entities: entities can contain other entities, as in "Bank of America", an organization containing a location. A linear-chain model assigns one label per token, so only one level of nesting can be captured.
3. Rare and Emerging Entities: new names (startups, products, people) appear constantly and are absent from training data, forcing the model to rely on contextual and orthographic cues.
4. Domain Shift: a model trained on newswire degrades sharply on tweets, biomedical abstracts, or legal text, where capitalization, vocabulary, and entity types differ.
Why CRFs Excel at NER:
- Transition modeling captures hard label constraints (I-PER can only follow B-PER or I-PER) and soft tendencies between adjacent labels.
- Arbitrary overlapping features: capitalization, word shape, gazetteers, and context words can all be used without independence assumptions.
- Global normalization avoids the label bias problem, and Viterbi decoding finds the exactly optimal label sequence.
The feature extractor below implements a standard NER feature set:
```python
from typing import List, Dict, Set
import re


class NERFeatureExtractor:
    """
    Feature extractor specialized for Named Entity Recognition.

    Implements standard NER features plus domain-specific patterns.
    """

    def __init__(
        self,
        person_gazetteer: Set[str] = None,
        org_gazetteer: Set[str] = None,
        loc_gazetteer: Set[str] = None,
    ):
        self.person_gaz = person_gazetteer or set()
        self.org_gaz = org_gazetteer or set()
        self.loc_gaz = loc_gazetteer or set()

    def extract_features(
        self,
        tokens: List[str],
        position: int
    ) -> Dict[str, float]:
        """
        Extract features for NER at given position.

        Returns dictionary of feature_name -> value.
        """
        features = {}
        n = len(tokens)
        word = tokens[position]

        # ============== Word Identity ==============
        features['word.lower()'] = word.lower()

        # Previous/next words
        if position > 0:
            features['word[-1].lower()'] = tokens[position-1].lower()
        else:
            features['BOS'] = True

        if position < n - 1:
            features['word[+1].lower()'] = tokens[position+1].lower()
        else:
            features['EOS'] = True

        # ============== Orthographic ==============
        features['is_capitalized'] = word[0].isupper() if word else False
        features['is_all_caps'] = word.isupper()
        features['is_all_lower'] = word.islower()
        features['is_title'] = word.istitle()

        # Mixed case patterns
        features['has_mixed_case'] = (
            any(c.isupper() for c in word) and
            any(c.islower() for c in word) and
            not word.istitle()
        )

        # ============== Shape ==============
        features['word_shape'] = self._get_word_shape(word)
        features['short_word_shape'] = self._get_short_shape(word)

        # ============== Affixes ==============
        for length in [2, 3, 4]:
            if len(word) >= length:
                features[f'prefix_{length}'] = word[:length].lower()
                features[f'suffix_{length}'] = word[-length:].lower()

        # ============== Character Patterns ==============
        features['has_digit'] = any(c.isdigit() for c in word)
        features['all_digits'] = word.isdigit()
        features['has_hyphen'] = '-' in word
        features['has_period'] = '.' in word
        features['has_apostrophe'] = "'" in word

        # Number patterns
        features['is_year'] = bool(re.match(r'^(19|20)\d{2}$', word))
        features['is_money'] = bool(re.match(r'^\$[\d,]+(\.\d{2})?$', word))
        features['is_percent'] = bool(re.match(r'^\d+(\.\d+)?%$', word))

        # ============== Gazetteer Lookups ==============
        word_lower = word.lower()
        features['in_person_gaz'] = word_lower in self.person_gaz
        features['in_org_gaz'] = word_lower in self.org_gaz
        features['in_loc_gaz'] = word_lower in self.loc_gaz

        # ============== Context Features ==============
        # Position in sentence
        features['is_first_word'] = (position == 0)
        features['is_last_word'] = (position == n - 1)

        # Surrounding capitalization
        if position > 0:
            features['prev_is_cap'] = tokens[position-1][0].isupper() if tokens[position-1] else False
        if position < n - 1:
            features['next_is_cap'] = tokens[position+1][0].isupper() if tokens[position+1] else False

        # ============== Trigger Words ==============
        # Words that often precede entities
        if position > 0:
            prev = tokens[position-1].lower()
            features['prev_is_title'] = prev in {'mr', 'mrs', 'ms', 'dr', 'prof', 'president', 'ceo'}
            features['prev_is_prep'] = prev in {'in', 'at', 'from', 'to', 'of'}

        return features

    def _get_word_shape(self, word: str) -> str:
        """Convert word to shape pattern."""
        shape = []
        for c in word:
            if c.isupper():
                shape.append('X')
            elif c.islower():
                shape.append('x')
            elif c.isdigit():
                shape.append('d')
            else:
                shape.append(c)
        return ''.join(shape)

    def _get_short_shape(self, word: str) -> str:
        """Collapsed shape pattern."""
        shape = self._get_word_shape(word)
        result = [shape[0]] if shape else []
        for c in shape[1:]:
            if c != result[-1]:
                result.append(c)
        return ''.join(result)


# Example
tokens = ["President", "Barack", "Obama", "visited", "the", "United", "Nations", "in", "New", "York"]

extractor = NERFeatureExtractor(
    person_gazetteer={'obama', 'biden', 'trump'},
    org_gazetteer={'united nations', 'google', 'apple'},
    loc_gazetteer={'new york', 'paris', 'london'}
)

print("NER Features for 'Barack' (position 1):")
features = extractor.extract_features(tokens, 1)
for name, value in sorted(features.items())[:15]:
    print(f"  {name}: {value}")
```

Part-of-Speech (POS) tagging assigns grammatical categories to each word in a sentence. While seemingly a solved problem (97%+ accuracy on standard benchmarks), it remains important as a preprocessing step for parsing, NER, and other NLP tasks.
Penn Treebank Tag Set (45 tags):
| Tag | Description | Example |
|---|---|---|
| NN | Noun, singular | dog, city |
| NNS | Noun, plural | dogs, cities |
| NNP | Proper noun, singular | John, Paris |
| NNPS | Proper noun, plural | Americans |
| VB | Verb, base form | run, eat |
| VBD | Verb, past tense | ran, ate |
| VBG | Verb, gerund | running, eating |
| VBN | Verb, past participle | run, eaten |
| VBP | Verb, non-3rd person | run, eat |
| VBZ | Verb, 3rd person singular | runs, eats |
| JJ | Adjective | big, red |
| JJR | Adjective, comparative | bigger, redder |
| JJS | Adjective, superlative | biggest, reddest |
| RB | Adverb | quickly, very |
| DT | Determiner | the, a, some |
| IN | Preposition | in, on, with |
| ... | ... | ... |
Universal Dependencies Tag Set (17 tags):
A simpler, cross-linguistic tag set:
| Tag | Description |
|---|---|
| NOUN | Noun |
| VERB | Verb |
| ADJ | Adjective |
| ADV | Adverb |
| PRON | Pronoun |
| DET | Determiner |
| ADP | Adposition (preposition/postposition) |
| NUM | Numeral |
| CONJ | Conjunction |
| PART | Particle |
| PUNCT | Punctuation |
| ... | ... |
POS Tagging Challenges:
- Ambiguity: many words take multiple tags ("book" can be a noun or a verb; "that" can be a determiner, pronoun, or complementizer), so context must disambiguate.
- Unknown words: new and rare words require morphological cues (suffixes, capitalization, digits) rather than lexical lookup.
- The remaining errors are hard: the last few percent involve genuinely difficult cases such as noun-adjective ambiguity and unusual constructions.
Why CRFs Work Well for POS:
- Strong local tag dependencies (a determiner is usually followed by a noun or adjective) are exactly what transition features capture.
- Suffix and prefix features generalize to unknown words.
- Global inference lets the label at each position be informed by the whole sentence, not just a left-to-right pass.
The extractor below implements these ideas:
```python
from typing import List, Dict


class POSFeatureExtractor:
    """
    Feature extractor for Part-of-Speech tagging.
    """

    def __init__(self):
        self.common_suffixes = {
            'verb': ['ing', 'ed', 'en', 'es', 's'],
            'noun': ['tion', 'ness', 'ment', 'ity', 'er', 'or'],
            'adj': ['al', 'ful', 'less', 'ous', 'ive', 'able', 'ible'],
            'adv': ['ly']
        }

    def extract_features(
        self,
        tokens: List[str],
        position: int
    ) -> Dict[str, float]:
        """
        Extract POS-specific features.
        """
        features = {}
        n = len(tokens)
        word = tokens[position]
        word_lower = word.lower()

        # ============== Word Features ==============
        features['word'] = word_lower

        # Window words
        for offset in [-2, -1, 1, 2]:
            idx = position + offset
            if 0 <= idx < n:
                features[f'word[{offset:+d}]'] = tokens[idx].lower()

        # ============== Morphological ==============
        # Suffixes (crucial for POS)
        for length in [1, 2, 3, 4]:
            if len(word) >= length:
                suffix = word_lower[-length:]
                features[f'suffix{length}'] = suffix

                # Check against known POS suffixes
                for pos, suffixes in self.common_suffixes.items():
                    if suffix in suffixes:
                        features[f'has_{pos}_suffix'] = True

        # Prefixes
        for length in [2, 3]:
            if len(word) >= length:
                features[f'prefix{length}'] = word_lower[:length]

        # ============== Orthographic ==============
        features['is_cap'] = word[0].isupper() if word else False
        features['all_caps'] = word.isupper() and len(word) > 1
        features['has_digit'] = any(c.isdigit() for c in word)
        features['has_hyphen'] = '-' in word
        features['all_digits'] = word.isdigit()

        # ============== Word Patterns ==============
        features['is_punct'] = word in '.,;:!?()[]{}"\'/-'
        features['is_quote'] = word in '"\'``\'\''

        # Length features
        features['word_length'] = len(word)
        features['is_short'] = len(word) <= 2

        # ============== Context Patterns ==============
        # Bigram features
        if position > 0:
            features['bigram[-1,0]'] = f"{tokens[position-1].lower()}_{word_lower}"
        if position < n - 1:
            features['bigram[0,+1]'] = f"{word_lower}_{tokens[position+1].lower()}"

        # ============== Position ==============
        features['is_first'] = (position == 0)
        features['is_last'] = (position == n - 1)

        return features


# Example
sentence = "The quick brown fox jumps over the lazy dog".split()

extractor = POSFeatureExtractor()
print("POS Features for 'jumps' (position 4):")
features = extractor.extract_features(sentence, 4)
for name, value in sorted(features.items())[:12]:
    print(f"  {name}: {value}")
```

State-of-the-art POS taggers achieve ~97.5% accuracy on Wall Street Journal text. This seems high, but consider: with 97.5% per-token accuracy, a 20-word sentence has only (0.975)^20 ≈ 60% chance of being completely correct. Errors compound for downstream tasks.
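To verify the compounding arithmetic, here is a quick check. It assumes, as the estimate above does, that per-token errors are independent (a simplification; real errors are correlated):

```python
# Whole-sentence accuracy implied by per-token accuracy,
# assuming independent errors at each position
per_token = 0.975
for n in (10, 20, 40):
    print(f"{n:2d}-word sentence: {per_token ** n:.1%} fully correct")
# 10-word sentence: 77.6% fully correct
# 20-word sentence: 60.3% fully correct
# 40-word sentence: 36.4% fully correct
```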
Evaluating sequence labeling models requires care—raw accuracy can be misleading. Different tasks call for different metrics.
Token-Level Accuracy:
$$\text{Accuracy} = \frac{\text{Correctly labeled tokens}}{\text{Total tokens}}$$
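As a minimal sketch (the helper name `token_accuracy` is ours), token-level accuracy is just the fraction of aligned positions where the labels agree:

```python
from typing import List

def token_accuracy(gold_seqs: List[List[str]], pred_seqs: List[List[str]]) -> float:
    """Fraction of aligned token positions where gold and predicted labels agree."""
    pairs = [(g, p)
             for gs, ps in zip(gold_seqs, pred_seqs)
             for g, p in zip(gs, ps)]
    return sum(g == p for g, p in pairs) / len(pairs)

print(token_accuracy([["B-PER", "O", "O"]], [["B-PER", "B-LOC", "O"]]))  # 0.666...
```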
Entity-Level Evaluation (for NER):
An entity is correct only if BOTH:
1. The boundaries match exactly (same start and end tokens), and
2. The entity type matches.
$$\text{Precision} = \frac{\text{Correctly predicted entities}}{\text{Total predicted entities}}$$
$$\text{Recall} = \frac{\text{Correctly predicted entities}}{\text{Total gold entities}}$$
$$F_1 = \frac{2 \cdot \text{Precision} \cdot \text{Recall}}{\text{Precision} + \text{Recall}}$$
```python
from typing import List, Dict
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Entity:
    entity_type: str
    start: int
    end: int
    text: str = ""

    def __hash__(self):
        return hash((self.entity_type, self.start, self.end))

    def __eq__(self, other):
        return (self.entity_type == other.entity_type and
                self.start == other.start and
                self.end == other.end)


def compute_ner_metrics(
    gold_entities: List[Entity],
    pred_entities: List[Entity]
) -> Dict[str, float]:
    """
    Compute NER evaluation metrics.

    Uses exact match: both boundaries and type must match.
    """
    gold_set = set(gold_entities)
    pred_set = set(pred_entities)

    true_positives = gold_set & pred_set
    false_positives = pred_set - gold_set
    false_negatives = gold_set - pred_set

    tp = len(true_positives)
    fp = len(false_positives)
    fn = len(false_negatives)

    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0

    return {
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'true_positives': tp,
        'false_positives': fp,
        'false_negatives': fn
    }


def compute_metrics_by_type(
    gold_entities: List[Entity],
    pred_entities: List[Entity]
) -> Dict[str, Dict[str, float]]:
    """
    Compute metrics separately for each entity type.
    """
    # Group by type
    gold_by_type = defaultdict(list)
    pred_by_type = defaultdict(list)

    for entity in gold_entities:
        gold_by_type[entity.entity_type].append(entity)
    for entity in pred_entities:
        pred_by_type[entity.entity_type].append(entity)

    all_types = set(gold_by_type.keys()) | set(pred_by_type.keys())

    results = {}
    for entity_type in all_types:
        results[entity_type] = compute_ner_metrics(
            gold_by_type[entity_type],
            pred_by_type[entity_type]
        )

    return results


def compute_seqeval_metrics(
    gold_labels: List[List[str]],
    pred_labels: List[List[str]]
) -> Dict[str, float]:
    """
    Compute metrics using seqeval-style evaluation.

    This is the standard evaluation used in research papers.
    """
    all_gold_entities = []
    all_pred_entities = []

    for sent_idx, (gold, pred) in enumerate(zip(gold_labels, pred_labels)):
        # Extract entities from BIO labels
        gold_ents = extract_entities_with_offset(gold, sent_idx * 1000)
        pred_ents = extract_entities_with_offset(pred, sent_idx * 1000)
        all_gold_entities.extend(gold_ents)
        all_pred_entities.extend(pred_ents)

    return compute_ner_metrics(all_gold_entities, all_pred_entities)


def extract_entities_with_offset(labels: List[str], offset: int) -> List[Entity]:
    """Extract entities with position offset for unique identification."""
    entities = []
    current_type = None
    current_start = None

    for i, label in enumerate(labels):
        if label.startswith('B-'):
            if current_type:
                entities.append(Entity(current_type, current_start + offset, i + offset))
            current_type = label[2:]
            current_start = i
        elif label.startswith('I-'):
            if current_type is None or label[2:] != current_type:
                if current_type:
                    entities.append(Entity(current_type, current_start + offset, i + offset))
                current_type = label[2:]
                current_start = i
        else:
            if current_type:
                entities.append(Entity(current_type, current_start + offset, i + offset))
                current_type = None

    if current_type:
        entities.append(Entity(current_type, current_start + offset, len(labels) + offset))

    return entities


# Example evaluation
gold = [Entity('PER', 0, 2), Entity('LOC', 5, 6), Entity('ORG', 8, 10)]
pred = [Entity('PER', 0, 2), Entity('LOC', 5, 7), Entity('ORG', 8, 10)]  # LOC boundary wrong

print("NER Evaluation Example")
print("=" * 50)
print(f"Gold entities: {[(e.entity_type, e.start, e.end) for e in gold]}")
print(f"Pred entities: {[(e.entity_type, e.start, e.end) for e in pred]}")

metrics = compute_ner_metrics(gold, pred)
print(f"\nMetrics:")
print(f"  Precision: {metrics['precision']:.3f}")
print(f"  Recall: {metrics['recall']:.3f}")
print(f"  F1: {metrics['f1']:.3f}")
print(f"  TP: {metrics['true_positives']}, FP: {metrics['false_positives']}, FN: {metrics['false_negatives']}")
```

| Task | Primary Metric | Why |
|---|---|---|
| NER | Entity-level F1 | Boundaries matter; token accuracy misleading |
| POS Tagging | Token accuracy | Each token independent; no entity spans |
| Chunking | F1 (chunk-level) | Like NER with syntactic chunks |
| Slot Filling | Slot F1 | Entity-level for extracted slots |
Strict evaluation (requiring exact match) can undervalue partially correct predictions. 'Barack Obama' predicted as just 'Obama' (missing 'Barack') counts as both a false positive and a false negative. Some evaluations use relaxed matching (partial overlap), but strict matching is the standard for research comparison.
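For contrast, here is a minimal sketch of relaxed matching. The one-token-overlap rule used here is one common choice, not the definition from any particular toolkit, and the `Span` type is our own:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Span:
    entity_type: str
    start: int
    end: int  # exclusive

def relaxed_match(gold: Span, pred: Span) -> bool:
    """Same type and at least one overlapping token."""
    return (gold.entity_type == pred.entity_type and
            max(gold.start, pred.start) < min(gold.end, pred.end))

gold = Span('PER', 0, 2)  # "Barack Obama"
pred = Span('PER', 1, 2)  # "Obama" only
print(relaxed_match(gold, pred))  # True under relaxed matching; strict counts FP + FN
```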
Let's put everything together into a complete sequence labeling pipeline using a CRF.
"""Complete CRF Sequence Labeling Pipeline This module implements a full NER system using CRFs:1. Data loading and preprocessing2. Feature extraction3. Model training with regularization4. Evaluation on test set5. Prediction on new text""" import numpy as npfrom typing import List, Tuple, Dictfrom collections import defaultdictimport pickle class CRFSequenceLabeler: """ Complete CRF-based sequence labeler. """ def __init__( self, feature_templates: List[str] = None, regularization: float = 1.0 ): self.feature_templates = feature_templates or [ 'word', 'word_lower', 'suffix2', 'suffix3', 'prefix2', 'prefix3', 'is_capitalized', 'word_shape' ] self.regularization = regularization self.feature_to_id = {} self.label_to_id = {} self.id_to_label = {} self.weights = None def fit( self, X: List[List[str]], # List of token sequences y: List[List[str]], # List of label sequences epochs: int = 10, learning_rate: float = 0.1, verbose: bool = True ): """ Train the CRF model. Args: X: List of token sequences y: List of label sequences epochs: Number of training epochs learning_rate: SGD learning rate verbose: Print training progress """ # Step 1: Build vocabularies self._build_vocabularies(X, y) if verbose: print(f"Vocabulary: {len(self.feature_to_id)} features, {len(self.label_to_id)} labels") # Step 2: Initialize weights self.weights = np.zeros(len(self.feature_to_id)) # Step 3: Training loop (SGD) for epoch in range(epochs): total_loss = 0.0 # Shuffle data indices = np.random.permutation(len(X)) for idx in indices: tokens, labels = X[idx], y[idx] label_ids = [self.label_to_id[l] for l in labels] # Compute gradient and loss gradient, loss = self._compute_gradient(tokens, label_ids) # Add regularization gradient gradient -= self.regularization * self.weights reg_loss = 0.5 * self.regularization * np.sum(self.weights ** 2) # SGD update self.weights += learning_rate * gradient total_loss += loss + reg_loss if verbose: avg_loss = total_loss / len(X) print(f"Epoch {epoch+1}/{epochs}: loss = {avg_loss:.4f}") def predict(self, X: List[List[str]]) -> List[List[str]]: """ Predict labels for sequences. 
Args: X: List of token sequences Returns: List of predicted label sequences """ predictions = [] for tokens in X: label_ids = self._viterbi(tokens) labels = [self.id_to_label[lid] for lid in label_ids] predictions.append(labels) return predictions def _build_vocabularies(self, X: List[List[str]], y: List[List[str]]): """Build feature and label vocabularies from training data.""" # Collect labels labels = set() for label_seq in y: labels.update(label_seq) self.label_to_id = {l: i for i, l in enumerate(sorted(labels))} self.id_to_label = {i: l for l, i in self.label_to_id.items()} # Collect features (feature_name, label combinations) feature_set = set() L = len(self.label_to_id) for tokens in X: for i in range(len(tokens)): feat_dict = self._extract_features(tokens, i) for feat_name in feat_dict: for label_id in range(L): feature_set.add((feat_name, label_id)) # Add transition features for l1 in range(L + 1): # +1 for START for l2 in range(L): feature_set.add((f'trans_{l1}_{l2}', -1)) # -1 indicates transition self.feature_to_id = {f: i for i, f in enumerate(sorted(feature_set))} def _extract_features(self, tokens: List[str], position: int) -> Dict[str, float]: """Extract features at a position.""" features = {} word = tokens[position] n = len(tokens) features['word'] = word features['word_lower'] = word.lower() features['is_cap'] = word[0].isupper() if word else False # Suffixes and prefixes for length in [2, 3]: if len(word) >= length: features[f'suffix{length}'] = word[-length:].lower() features[f'prefix{length}'] = word[:length].lower() # Context words if position > 0: features['prev_word'] = tokens[position-1].lower() else: features['BOS'] = True if position < n - 1: features['next_word'] = tokens[position+1].lower() else: features['EOS'] = True return features def _compute_gradient( self, tokens: List[str], label_ids: List[int] ) -> Tuple[np.ndarray, float]: """Compute gradient for a single sequence.""" n = len(tokens) L = len(self.label_to_id) # Run forward-backward log_alpha, log_Z = self._forward(tokens) log_beta = self._backward(tokens) gradient = np.zeros(len(self.feature_to_id)) # Observed features (correct labels) for i in range(n): y = label_ids[i] y_prev = label_ids[i-1] if i > 0 else L # L = START # Emission features feat_dict = self._extract_features(tokens, i) for feat_name, value in feat_dict.items(): key = (feat_name, y) if key in self.feature_to_id: gradient[self.feature_to_id[key]] += 1.0 # Transition features trans_key = (f'trans_{y_prev}_{y}', -1) if trans_key in self.feature_to_id: gradient[self.feature_to_id[trans_key]] += 1.0 # Expected features (under model distribution) for i in range(n): feat_dict = self._extract_features(tokens, i) for y in range(L): # Marginal P(y_i = y | x) log_marginal = log_alpha[i, y] + log_beta[i, y] - log_Z marginal = np.exp(log_marginal) for feat_name in feat_dict: key = (feat_name, y) if key in self.feature_to_id: gradient[self.feature_to_id[key]] -= marginal # Transition expected features if i == 0: for y in range(L): trans_key = (f'trans_{L}_{y}', -1) # START -> y if trans_key in self.feature_to_id: log_marg = log_alpha[0, y] + log_beta[0, y] - log_Z gradient[self.feature_to_id[trans_key]] -= np.exp(log_marg) else: for y_prev in range(L): for y in range(L): trans_key = (f'trans_{y_prev}_{y}', -1) if trans_key in self.feature_to_id: # Pairwise marginal trans_score = self._get_transition_score(y_prev, y) emit_score = self._get_emission_score(tokens, i, y) log_marg = (log_alpha[i-1, y_prev] + trans_score + emit_score + log_beta[i, y] - 
log_Z) gradient[self.feature_to_id[trans_key]] -= np.exp(log_marg) # Compute loss = -log P(y|x) score = sum( self._get_emission_score(tokens, i, label_ids[i]) + self._get_transition_score(label_ids[i-1] if i > 0 else L, label_ids[i]) for i in range(n) ) loss = log_Z - score return gradient, loss def _forward(self, tokens: List[str]) -> Tuple[np.ndarray, float]: """Forward algorithm.""" n = len(tokens) L = len(self.label_to_id) log_alpha = np.full((n, L), -np.inf) # Initialize for y in range(L): trans = self._get_transition_score(L, y) # START -> y emit = self._get_emission_score(tokens, 0, y) log_alpha[0, y] = trans + emit # Recursion for i in range(1, n): for y in range(L): emit = self._get_emission_score(tokens, i, y) scores = [ log_alpha[i-1, y_prev] + self._get_transition_score(y_prev, y) + emit for y_prev in range(L) ] log_alpha[i, y] = self._logsumexp(np.array(scores)) log_Z = self._logsumexp(log_alpha[n-1]) return log_alpha, log_Z def _backward(self, tokens: List[str]) -> np.ndarray: """Backward algorithm.""" n = len(tokens) L = len(self.label_to_id) log_beta = np.full((n, L), -np.inf) log_beta[n-1] = 0.0 for i in range(n-2, -1, -1): for y in range(L): scores = [ self._get_transition_score(y, y_next) + self._get_emission_score(tokens, i+1, y_next) + log_beta[i+1, y_next] for y_next in range(L) ] log_beta[i, y] = self._logsumexp(np.array(scores)) return log_beta def _viterbi(self, tokens: List[str]) -> List[int]: """Viterbi decoding.""" n = len(tokens) L = len(self.label_to_id) delta = np.full((n, L), -np.inf) psi = np.zeros((n, L), dtype=int) # Initialize for y in range(L): delta[0, y] = (self._get_transition_score(L, y) + self._get_emission_score(tokens, 0, y)) # Recursion for i in range(1, n): for y in range(L): emit = self._get_emission_score(tokens, i, y) scores = [ delta[i-1, y_prev] + self._get_transition_score(y_prev, y) + emit for y_prev in range(L) ] psi[i, y] = np.argmax(scores) delta[i, y] = scores[psi[i, y]] # Backtrack path = [np.argmax(delta[n-1])] for i in range(n-1, 0, -1): path.append(psi[i, path[-1]]) return list(reversed(path)) def _get_emission_score(self, tokens: List[str], pos: int, label: int) -> float: """Get emission score for position and label.""" feat_dict = self._extract_features(tokens, pos) score = 0.0 for feat_name in feat_dict: key = (feat_name, label) if key in self.feature_to_id: score += self.weights[self.feature_to_id[key]] return score def _get_transition_score(self, prev_label: int, label: int) -> float: """Get transition score.""" key = (f'trans_{prev_label}_{label}', -1) if key in self.feature_to_id: return self.weights[self.feature_to_id[key]] return 0.0 @staticmethod def _logsumexp(arr: np.ndarray) -> float: max_val = np.max(arr) if max_val == -np.inf: return -np.inf return max_val + np.log(np.sum(np.exp(arr - max_val))) # Example usageif __name__ == "__main__": # Sample training data X_train = [ ["John", "works", "at", "Google", "."], ["Mary", "lives", "in", "Paris", "."], ] y_train = [ ["B-PER", "O", "O", "B-ORG", "O"], ["B-PER", "O", "O", "B-LOC", "O"], ] # Train model model = CRFSequenceLabeler(regularization=0.1) model.fit(X_train, y_train, epochs=50, learning_rate=0.1, verbose=True) # Predict X_test = [["Tom", "joined", "Microsoft", "."]] predictions = model.predict(X_test) print("\nPredictions:") for tokens, labels in zip(X_test, predictions): for tok, lab in zip(tokens, labels): print(f" {tok}: {lab}")Understanding why a model fails is as important as building it. Systematic error analysis reveals opportunities for improvement.
Error Analysis Process:
1. Run the trained model on a held-out development set.
2. Collect every disagreement between gold and predicted annotations.
3. Categorize each error: boundary error (right type, wrong span), type error (overlapping span, wrong type), missed entity (false negative), or spurious entity (false positive).
4. Look for patterns within each category (for example, boundary errors that all involve trailing punctuation), then adjust features or data and re-evaluate.
A sketch of the categorization in step 3 follows.
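The five-way taxonomy and the `categorize_errors` helper below are our own illustration, not a standard API:

```python
from typing import Dict, List, Tuple

Span = Tuple[str, int, int]  # (entity_type, start, end) with end exclusive

def overlaps(a: Span, b: Span) -> bool:
    return max(a[1], b[1]) < min(a[2], b[2])

def categorize_errors(gold: List[Span], pred: List[Span]) -> Dict[str, int]:
    counts = {'correct': 0, 'boundary': 0, 'type': 0, 'spurious': 0, 'missed': 0}
    for p in pred:
        if p in gold:
            counts['correct'] += 1
        elif any(overlaps(p, g) and g[0] == p[0] for g in gold):
            counts['boundary'] += 1  # right type, wrong span
        elif any(overlaps(p, g) for g in gold):
            counts['type'] += 1      # overlapping span, wrong type
        else:
            counts['spurious'] += 1  # predicted entity with no gold counterpart
    counts['missed'] = sum(1 for g in gold if not any(overlaps(g, p) for p in pred))
    return counts

gold = [('PER', 0, 2), ('LOC', 5, 6), ('ORG', 8, 10)]
pred = [('PER', 0, 2), ('LOC', 5, 7), ('PER', 8, 10), ('ORG', 12, 13)]
print(categorize_errors(gold, pred))
# {'correct': 1, 'boundary': 1, 'type': 1, 'spurious': 1, 'missed': 0}
```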
Debugging Checklist:
| Issue | Possible Cause | Solution |
|---|---|---|
| Very low F1 | Feature bugs, data format issues | Check feature extraction on examples |
| High precision, low recall | Model too conservative | Reduce regularization, add recall-oriented features |
| Low precision, high recall | Model too aggressive | Increase regularization |
| Poor on rare types | Insufficient training examples | Augment data, use transfer learning |
| Good dev, poor test | Domain shift | Analyze test domain differences |
| Training doesn't converge | Learning rate issues | Adjust learning rate, check gradients |
Build a confusion matrix for entity types. If PER is often confused with ORG, examine the features—are company names and person names sharing too many features? Add discriminative features like gazetteers or context patterns.
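A minimal sketch of such a confusion count over aligned token sequences (the helper name is ours; the same idea extends to entity types by counting gold/predicted type pairs):

```python
from collections import Counter
from typing import List

def label_confusions(gold_seqs: List[List[str]],
                     pred_seqs: List[List[str]]) -> Counter:
    """Count (gold_label, predicted_label) pairs at positions where they differ."""
    pairs: Counter = Counter()
    for gs, ps in zip(gold_seqs, pred_seqs):
        for g, p in zip(gs, ps):
            if g != p:
                pairs[(g, p)] += 1
    return pairs

gold = [["B-PER", "I-PER", "O", "B-ORG"]]
pred = [["B-ORG", "I-ORG", "O", "B-ORG"]]
for (g, p), count in label_confusions(gold, pred).most_common():
    print(f"gold {g} -> predicted {p}: {count}")
```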
We have covered the complete application of CRFs to sequence labeling tasks. Here are the key concepts:
- BIO and BIOES labeling schemes turn span annotation into per-token labels, and invalid sequences can be prevented with transition constraints.
- NER and POS tagging are the canonical CRF applications, each with its own feature repertoire: gazetteers and word shapes for NER, affixes and context windows for POS.
- Evaluation must match the task: entity-level F1 for NER, token accuracy for POS.
- A complete pipeline spans vocabulary building, feature extraction, forward-backward training, and Viterbi decoding.
- Systematic error analysis (error categorization, confusion matrices) drives iterative improvement.
Module Complete:
You now have complete knowledge of Conditional Random Fields—from mathematical formulation through linear-chain structure, feature engineering, training algorithms, to practical sequence labeling applications. CRFs represent a powerful fusion of probabilistic modeling with structured prediction, and their principles underlie many modern sequence labeling systems.
Where CRFs Fit Today:
While neural approaches (BiLSTM-CRF, Transformer-CRF) have largely replaced traditional feature engineering, the CRF layer itself remains valuable. State-of-the-art named entity recognition and other sequence labeling systems continue to use CRF decoders on top of neural encoders, demonstrating the enduring importance of structured output modeling.
Congratulations! You have mastered Conditional Random Fields. You understand the mathematical formulation, linear-chain structure, feature engineering, training algorithms, and sequence labeling applications. This knowledge forms the foundation for advanced structured prediction and modern neural sequence labeling systems.