You've learned the theory of KNN-based detection, LOF, LOCI, and the curse of dimensionality. But when it comes time to deploy, a critical question remains: what values should you use for k, thresholds, and other parameters?
This isn't merely an implementation detail—parameter choices can make the difference between a detector that catches 90% of anomalies with 1% false positives and one that either misses most anomalies or drowns you in false alarms.
Unfortunately, there's no universal answer. The optimal k for a fraud detection system processing millions of transactions differs from the optimal k for a sensor anomaly detector monitoring 100 devices. The right threshold depends on your tolerance for false positives versus false negatives.
This page synthesizes the lessons from the entire module into a comprehensive parameter selection methodology. You'll learn principled approaches for choosing k, setting thresholds, validating choices, and adapting parameters as data evolves.
By the end of this page, you will: (1) Master techniques for selecting the k parameter across different detectors and data characteristics, (2) Understand principled threshold selection, including statistical and business-driven approaches, (3) Learn validation strategies for unsupervised anomaly detection, (4) Develop robust parameter selection pipelines for production systems, and (5) Know how to monitor and adapt parameters over time.
The parameter k (number of neighbors) is shared by KNN-based scoring, LOF, and conceptually by LOCI (through its radius settings). Its selection profoundly impacts detection behavior.
The Fundamental Tradeoff:
| Small k | Large k |
|---|---|
| Sensitive to local variations | Smooths over local structure |
| Captures micro-anomalies | Detects macro-anomalies |
| Vulnerable to noise | Robust to noise |
| May miss clustered anomalies | May miss isolated anomalies |
| Lower computational cost | Higher computational cost, more stable estimates |
Theoretical Guidelines:
Rule 1: k should exceed the expected anomaly cluster size. If anomalies might appear in groups of 5, use k ≥ 10 so that normal points still dominate each anomaly's neighborhood.
Rule 2: k should be smaller than the smallest normal cluster. If your smallest cluster has 50 points, k < 50 ensures a normal point's neighbors come from its own cluster.
Rule 3: k = √n is a starting point, not a recommendation. The often-cited k = √n rule provides a scale-appropriate default but rarely an optimal result.
Rule 4: Higher contamination requires larger k. With 10% anomalies, k = 10 already puts one anomaly in the average normal point's neighborhood, so treat k ≥ 1/contamination as a floor rather than a target.
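These rules can be folded into a simple starting-point heuristic. The sketch below is ours (the function `recommend_initial_k` is not from any library) and assumes you can supply rough estimates of contamination and cluster sizes; the result is only a starting k, to be refined with the data-driven methods that follow.

```python
import math

def recommend_initial_k(n_samples: int,
                        contamination: float = None,
                        max_anomaly_cluster: int = None,
                        min_normal_cluster: int = None) -> int:
    """Combine the rules of thumb above into a starting k (to be refined later)."""
    candidates = [int(math.sqrt(n_samples))]                 # Rule 3: scale-appropriate default
    if contamination:
        candidates.append(math.ceil(1 / contamination))      # Rule 4: k >= 1/contamination
    if max_anomaly_cluster:
        candidates.append(2 * max_anomaly_cluster)           # Rule 1: exceed anomaly cluster size
    k = max(candidates)
    if min_normal_cluster:
        k = min(k, min_normal_cluster - 1)                   # Rule 2: stay below smallest normal cluster
    return max(k, 5)                                         # avoid degenerately small neighborhoods

# Example: 10,000 points, ~2% contamination, anomaly bursts of up to 5 points
print(recommend_initial_k(10_000, contamination=0.02, max_anomaly_cluster=5))  # -> 100
```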
Data-Driven k Selection:
Method 1: Cross-Validation (when labels exist)
The gold standard when you have labeled anomalies (even partial labels):
For k in candidate_values:
Compute anomaly scores
Calculate AUC-ROC or F1 against labels
Select k maximizing performance metric
Note: Use held-out data for validation, not training data.
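A minimal sketch of this loop, assuming a labeled hold-out set `X_val`, `y_val` (names ours) and using scikit-learn's LocalOutlierFactor in novelty mode so that data not seen during fitting can be scored:

```python
import numpy as np
from sklearn.neighbors import LocalOutlierFactor
from sklearn.metrics import roc_auc_score

def select_k_with_labels(X_train, X_val, y_val, k_candidates=(10, 20, 30, 40, 50)):
    """Pick the k whose LOF scores best rank the labeled validation anomalies (AUC-ROC)."""
    best_k, best_auc = None, -np.inf
    for k in k_candidates:
        lof = LocalOutlierFactor(n_neighbors=k, novelty=True)  # novelty=True allows scoring new data
        lof.fit(X_train)                                       # fit on (mostly normal) training data
        val_scores = -lof.score_samples(X_val)                 # higher = more anomalous
        auc = roc_auc_score(y_val, val_scores)                 # y_val: 1 = anomaly
        if auc > best_auc:
            best_k, best_auc = k, auc
    return best_k, best_auc
```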
Method 2: Score Stability Analysis (unsupervised)
Examine how the anomaly ranking changes as k is varied: if the ranking stays essentially the same when k is perturbed slightly, the choice of k is trustworthy (see select_k_by_stability in the code below).
Method 3: Contamination-Based Selection
If expected contamination rate c is known:
$$k_{min} = \lceil 1/c \rceil$$
Example: 5% contamination → k ≥ 20
Method 4: Silhouette-Inspired Approach
For each candidate k, score every point and measure how strongly the highest-scoring group separates from the rest, relative to the overall score spread; choose the k with the largest separation (implemented as select_k_by_separation in the code below).
```python
import numpy as np
from sklearn.neighbors import LocalOutlierFactor
from scipy.stats import spearmanr
from typing import List, Tuple, Optional


def select_k_by_stability(X: np.ndarray,
                          k_candidates: List[int] = None,
                          stability_threshold: float = 0.9) -> int:
    """
    Select k based on ranking stability across k values.

    A good k produces stable anomaly rankings when k is slightly perturbed.

    Parameters:
    -----------
    X : np.ndarray
        Input data
    k_candidates : list
        k values to evaluate
    stability_threshold : float
        Minimum rank correlation for stability

    Returns:
    --------
    best_k : int
        Selected k value
    """
    n_samples = X.shape[0]

    if k_candidates is None:
        max_k = min(50, n_samples // 5)
        k_candidates = list(range(5, max_k + 1, 5))

    # Compute scores for all k values
    all_scores = {}
    for k in k_candidates:
        lof = LocalOutlierFactor(n_neighbors=k, contamination='auto')
        lof.fit(X)
        all_scores[k] = -lof.negative_outlier_factor_

    # Compute rank correlations between adjacent k values
    stability_scores = {}
    for i, k in enumerate(k_candidates[:-1]):
        next_k = k_candidates[i + 1]
        corr, _ = spearmanr(all_scores[k], all_scores[next_k])

        # Also check correlation with k+5 if available
        if i + 2 < len(k_candidates):
            corr2, _ = spearmanr(all_scores[k], all_scores[k_candidates[i + 2]])
            stability_scores[k] = (corr + corr2) / 2
        else:
            stability_scores[k] = corr

    # Find k with highest stability that exceeds threshold
    stable_ks = [k for k, s in stability_scores.items() if s >= stability_threshold]

    if stable_ks:
        # Among stable k values, prefer middle of range
        best_k = sorted(stable_ks)[len(stable_ks) // 2]
    else:
        # Fall back to most stable k
        best_k = max(stability_scores, key=stability_scores.get)

    print(f"Stability scores: {stability_scores}")
    print(f"Selected k = {best_k}")

    return best_k


def select_k_by_separation(X: np.ndarray,
                           contamination: float = 0.05,
                           k_candidates: List[int] = None) -> int:
    """
    Select k that maximizes separation between anomaly and normal scores.

    Parameters:
    -----------
    X : np.ndarray
        Input data
    contamination : float
        Expected proportion of anomalies
    k_candidates : list
        k values to evaluate

    Returns:
    --------
    best_k : int
        Selected k value
    """
    n_samples = X.shape[0]
    n_anomalies = int(contamination * n_samples)

    if k_candidates is None:
        max_k = min(50, n_samples // 5)
        k_candidates = list(range(5, max_k + 1, 5))

    best_k = k_candidates[0]
    best_separation = 0

    for k in k_candidates:
        lof = LocalOutlierFactor(n_neighbors=k, contamination='auto')
        lof.fit(X)
        scores = -lof.negative_outlier_factor_

        # Split into "normal" and "anomaly" by score threshold
        threshold = np.percentile(scores, 100 * (1 - contamination))
        normal_scores = scores[scores <= threshold]
        anomaly_scores = scores[scores > threshold]

        if len(anomaly_scores) == 0:
            continue

        # Separation: difference in means relative to combined std
        separation = (np.mean(anomaly_scores) - np.mean(normal_scores)) / np.std(scores)

        if separation > best_separation:
            best_separation = separation
            best_k = k

    print(f"Best k = {best_k} with separation = {best_separation:.3f}")
    return best_k


def ensemble_k_selection(X: np.ndarray,
                         k_range: Tuple[int, int] = (5, 50),
                         n_k: int = 10) -> np.ndarray:
    """
    Instead of selecting single k, compute ensemble scores across k values.

    This is often more robust than any single k selection method.

    Returns:
    --------
    ensemble_scores : np.ndarray
        Averaged LOF scores across k values
    """
    k_values = np.linspace(k_range[0], k_range[1], n_k, dtype=int)
    k_values = np.unique(k_values)  # Remove duplicates

    all_scores = np.zeros((len(X), len(k_values)))

    for i, k in enumerate(k_values):
        lof = LocalOutlierFactor(n_neighbors=k, contamination='auto')
        lof.fit(X)
        scores = -lof.negative_outlier_factor_

        # Normalize scores to [0, 1] for fair averaging
        scores_norm = (scores - scores.min()) / (scores.max() - scores.min() + 1e-10)
        all_scores[:, i] = scores_norm

    # Ensemble: average normalized scores
    ensemble_scores = np.mean(all_scores, axis=1)
    return ensemble_scores
```

Once you have anomaly scores, you need a threshold to convert scores to binary decisions. This is often the most operationally important parameter.
Categories of Threshold Selection:
1. Contamination-Based (Percentile). Assume a known anomaly rate c and take the corresponding percentile of the scores: $$\tau = Q_{1-c}(\text{scores})$$
Pros: simple; produces the expected number of detections. Cons: assumes the contamination rate is known; ignores the shape of the score distribution.
2. Statistical (Standard Deviations). Assume the scores follow a known distribution and use a statistical cutoff: $$\tau = \mu + z \cdot \sigma$$
For z = 3, roughly 0.1% of normal points exceed the threshold under normality; for z = 2, roughly 2.3%.
Pros: principled statistical interpretation. Cons: assumes a specific distribution (often violated in practice).
3. Robust Statistical. Use the median and MAD instead of the mean and standard deviation: $$\tau = \text{median} + z \cdot (1.4826 \times \text{MAD})$$
Pros: robust to contamination in the training data. Cons: more conservative (higher threshold).
4. Cost-Based Threshold
When the costs of false positives (FP) and false negatives (FN) are known: $$\tau^* = \arg\min_\tau \left[ C_{FP} \cdot FP(\tau) + C_{FN} \cdot FN(\tau) \right]$$
Example: In fraud detection, missing a fraud (FN) costs $10,000 but investigating a false alarm (FP) costs $50. The optimal threshold heavily favors catching frauds.
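To see why, compare the expected cost of flagging a case with fraud probability p, which is $(1-p)\,C_{FP}$, against the expected cost of not flagging it, $p\,C_{FN}$. Under the assumed costs above, flagging wins whenever

$$p > \frac{C_{FP}}{C_{FP} + C_{FN}} = \frac{50}{50 + 10{,}000} \approx 0.5\%$$

so the cost-optimal threshold sits low enough to flag anything with even a half-percent chance of being fraud, accepting many false alarms in exchange for very few misses.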
5. Precision-at-K
In operational settings, you may only be able to investigate the top-K predictions; set the threshold at the K-th highest score so exactly K cases are flagged, and judge quality by the precision within that top K.
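As a short sketch (helper names ours), the top-K threshold and the corresponding precision-at-K metric take only a few lines once labels exist for at least the investigated cases:

```python
import numpy as np

def top_k_threshold(scores: np.ndarray, k: int) -> float:
    """Threshold equal to the K-th highest score, so exactly K points are flagged."""
    return np.sort(scores)[::-1][min(k, len(scores)) - 1]

def precision_at_k(scores: np.ndarray, labels: np.ndarray, k: int) -> float:
    """Fraction of true anomalies among the K highest-scoring points (labels: 1 = anomaly)."""
    top_idx = np.argsort(scores)[::-1][:k]
    return float(np.mean(labels[top_idx]))
```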
6. Business Constraint-Based
Common constraints include a fixed daily investigation capacity and a maximum tolerable false-alarm rate; the threshold is then set to whatever value satisfies the constraint (see capacity_threshold in the code below).
| Method | When to Use | Key Assumption | Formula |
|---|---|---|---|
| Contamination | Known anomaly rate | Contamination estimate is accurate | (1−c) quantile of scores (99th percentile for 1%) |
| Mean + 3σ | Score normality expected | Normal distribution | μ + 3σ |
| Median + 3 MAD | Contaminated or skewed data | Approximate normality | med + 4.45×MAD |
| Cost-based | Known cost structure | Costs are accurate | Min(C_FP × FP + C_FN × FN) |
| Top-K | Fixed investigation capacity | K is operationally determined | Score(K) |
| F1 optimal | Labeled validation data | Labels are accurate | Max F1 |
Start with robust statistical threshold (median + 3×MAD). This works reasonably well without labeled data, is robust to contamination, and has clear statistical interpretation. Refine based on false positive feedback once in production.
```python
import numpy as np
from scipy import stats
from typing import Tuple, Optional


def percentile_threshold(scores: np.ndarray, contamination: float = 0.05) -> float:
    """Contamination-based threshold."""
    return np.percentile(scores, 100 * (1 - contamination))


def statistical_threshold(scores: np.ndarray,
                          n_sigma: float = 3.0,
                          robust: bool = True) -> float:
    """
    Statistical threshold based on standard deviations.

    Parameters:
    -----------
    scores : np.ndarray
        Anomaly scores
    n_sigma : float
        Number of standard deviations
    robust : bool
        If True, use median/MAD instead of mean/std

    Returns:
    --------
    threshold : float
    """
    if robust:
        center = np.median(scores)
        mad = np.median(np.abs(scores - center))
        scale = 1.4826 * mad  # Scale factor for normal distribution
    else:
        center = np.mean(scores)
        scale = np.std(scores)

    return center + n_sigma * scale


def cost_optimal_threshold(scores: np.ndarray,
                           labels: np.ndarray,
                           cost_fp: float = 1.0,
                           cost_fn: float = 10.0,
                           n_thresholds: int = 100) -> Tuple[float, float]:
    """
    Find threshold minimizing expected cost.

    Parameters:
    -----------
    scores : np.ndarray
        Anomaly scores
    labels : np.ndarray
        True labels (1 = anomaly)
    cost_fp : float
        Cost of false positive
    cost_fn : float
        Cost of false negative
    n_thresholds : int
        Number of thresholds to evaluate

    Returns:
    --------
    best_threshold : float
    min_cost : float
    """
    thresholds = np.linspace(scores.min(), scores.max(), n_thresholds)
    best_threshold = thresholds[0]
    min_cost = float('inf')

    for thresh in thresholds:
        predictions = (scores > thresh).astype(int)
        fp = np.sum((predictions == 1) & (labels == 0))
        fn = np.sum((predictions == 0) & (labels == 1))
        cost = cost_fp * fp + cost_fn * fn

        if cost < min_cost:
            min_cost = cost
            best_threshold = thresh

    return best_threshold, min_cost


def f1_optimal_threshold(scores: np.ndarray,
                         labels: np.ndarray,
                         n_thresholds: int = 100) -> Tuple[float, float]:
    """
    Find threshold maximizing F1 score.

    Parameters:
    -----------
    scores : np.ndarray
        Anomaly scores
    labels : np.ndarray
        True labels (1 = anomaly)

    Returns:
    --------
    best_threshold : float
    max_f1 : float
    """
    from sklearn.metrics import f1_score

    thresholds = np.linspace(scores.min(), scores.max(), n_thresholds)
    best_threshold = thresholds[0]
    max_f1 = 0

    for thresh in thresholds:
        predictions = (scores > thresh).astype(int)
        f1 = f1_score(labels, predictions, zero_division=0)

        if f1 > max_f1:
            max_f1 = f1
            best_threshold = thresh

    return best_threshold, max_f1


def capacity_threshold(scores: np.ndarray, daily_capacity: int = 100) -> float:
    """
    Threshold based on investigation capacity.

    Sets threshold to catch exactly 'daily_capacity' anomalies.
    """
    sorted_scores = np.sort(scores)[::-1]  # Descending
    if daily_capacity >= len(scores):
        return sorted_scores[-1]
    return sorted_scores[daily_capacity - 1]
```

Anomaly detection is typically unsupervised—we don't have labeled anomalies for training. This makes validation challenging but not impossible.
Strategy 1: Synthetic Anomaly Injection
Create artificial anomalies, run the detector, and measure how well it recovers them (see inject_synthetic_anomalies in the code below).
Pros: works without real labels. Cons: synthetic anomalies may not match real anomaly patterns.
Strategy 2: Historical Feedback Loop
Use operational feedback to validate: analysts mark each investigated alert as a true or false positive, and the running precision of confirmed detections becomes the validation signal.
Pros: real-world validation that improves over time. Cons: requires operational integration; feedback arrives slowly.
Strategy 3: Internal Validation Metrics
Evaluate detection quality using intrinsic properties:
Contrast Ratio: $$CR = \frac{\text{mean}(\text{top-k% scores})}{\text{mean}(\text{bottom-k% scores})}$$ Higher is better—anomalies should score much higher than normals.
Score Separation: $$S = \frac{E[\text{anomaly scores}] - E[\text{normal scores}]}{\sigma(\text{all scores})}$$ Measures how many standard deviations separate anomaly from normal.
Stability Index: Measure ranking consistency across perturbations (k values, bootstrap samples).
Strategy 4: Expert Labeling of Samples
When full labeling is impossible, have an expert label only a small, stratified sample: the top-scoring points (to estimate precision) plus a random selection of the rest (to gauge how many anomalies are being missed).
This gives a practical estimate of detector quality with minimal labeling effort.
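One way to set this up, as a rough sketch (the helper name is ours): send the analyst only the top-scoring points plus a small random background sample. The labeled top group estimates precision, and the random group gives a crude read on the base rate of anomalies the detector may be missing.

```python
import numpy as np

def sample_for_expert_labeling(scores: np.ndarray, n_top: int = 50, n_random: int = 50,
                               seed: int = 0) -> np.ndarray:
    """Indices to send for expert labeling: top-scored points plus a random background sample."""
    rng = np.random.default_rng(seed)
    top_idx = np.argsort(scores)[::-1][:n_top]
    rest = np.setdiff1d(np.arange(len(scores)), top_idx)
    random_idx = rng.choice(rest, size=min(n_random, len(rest)), replace=False)
    return np.concatenate([top_idx, random_idx])

# After labeling: precision ~ mean(labels of top_idx); background anomaly rate ~ mean(labels of random_idx)
```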
```python
import numpy as np
from sklearn.neighbors import LocalOutlierFactor
from typing import Tuple, Dict


def inject_synthetic_anomalies(X: np.ndarray,
                               n_anomalies: int = 50,
                               method: str = 'uniform') -> Tuple[np.ndarray, np.ndarray]:
    """
    Inject synthetic anomalies for validation.

    Parameters:
    -----------
    X : np.ndarray
        Normal data
    n_anomalies : int
        Number of anomalies to inject
    method : str
        'uniform', 'local', or 'cluster'

    Returns:
    --------
    X_augmented : np.ndarray
        Data with injected anomalies
    labels : np.ndarray
        0 for normal, 1 for synthetic anomaly
    """
    n_samples, n_features = X.shape

    if method == 'uniform':
        # Random points in expanded bounding box
        X_min, X_max = X.min(axis=0), X.max(axis=0)
        X_range = X_max - X_min
        # Expand range by 50% in each direction
        anomalies = np.random.uniform(
            X_min - 0.5 * X_range,
            X_max + 0.5 * X_range,
            size=(n_anomalies, n_features)
        )
    elif method == 'local':
        # Shift random normal points outward
        indices = np.random.choice(n_samples, n_anomalies, replace=False)
        anomalies = X[indices].copy()

        # Compute centroid and shift away
        centroid = X.mean(axis=0)
        directions = anomalies - centroid
        directions = directions / (np.linalg.norm(directions, axis=1, keepdims=True) + 1e-10)

        # Shift by 3 standard deviations
        shift = 3 * X.std(axis=0).mean()
        anomalies = anomalies + shift * directions
    elif method == 'cluster':
        # Create a small cluster in empty region
        X_min, X_max = X.min(axis=0), X.max(axis=0)

        # Find corner with least density
        corners = np.array([
            [X_min[i] if (c >> i) & 1 else X_max[i] for i in range(n_features)]
            for c in range(2 ** min(n_features, 8))
        ])
        # Pick random corner
        center = corners[np.random.randint(len(corners))]

        # Generate cluster around this corner
        anomalies = center + np.random.randn(n_anomalies, n_features) * 0.5
    else:
        raise ValueError(f"Unknown method: {method}")

    X_augmented = np.vstack([X, anomalies])
    labels = np.array([0] * n_samples + [1] * n_anomalies)

    return X_augmented, labels


def evaluate_on_synthetic(X: np.ndarray,
                          k: int = 20,
                          contamination: float = 0.05) -> Dict[str, float]:
    """
    Evaluate detector using synthetic anomaly injection.
    """
    from sklearn.metrics import precision_score, recall_score, f1_score

    results = {}

    for method in ['uniform', 'local', 'cluster']:
        n_anomalies = int(contamination * len(X) / (1 - contamination))
        X_aug, y_true = inject_synthetic_anomalies(X, n_anomalies, method)

        # Run detector
        lof = LocalOutlierFactor(n_neighbors=k, contamination=contamination)
        y_pred = lof.fit_predict(X_aug)
        y_pred = (y_pred == -1).astype(int)  # Convert to 0/1

        results[method] = {
            'precision': precision_score(y_true, y_pred, zero_division=0),
            'recall': recall_score(y_true, y_pred, zero_division=0),
            'f1': f1_score(y_true, y_pred, zero_division=0)
        }

    return results


def compute_internal_validation(scores: np.ndarray,
                                contamination: float = 0.05) -> Dict[str, float]:
    """
    Compute internal validation metrics (no labels required).
    """
    n_samples = len(scores)
    n_top = int(contamination * n_samples)
    n_bottom = int((1 - contamination) * n_samples * 0.5)

    sorted_scores = np.sort(scores)
    top_scores = sorted_scores[-n_top:]
    bottom_scores = sorted_scores[:n_bottom]

    metrics = {
        'contrast_ratio': np.mean(top_scores) / (np.mean(bottom_scores) + 1e-10),
        'score_separation': (np.mean(top_scores) - np.mean(bottom_scores)) / (np.std(scores) + 1e-10),
        'coefficient_of_variation': np.std(scores) / (np.mean(scores) + 1e-10),
        'skewness': float(((scores - scores.mean()) ** 3).mean() / (scores.std() ** 3 + 1e-10))
    }

    return metrics
```

In order of preference: (1) Real labeled validation data, (2) Expert labeling of samples, (3) Operational feedback loop, (4) Synthetic anomaly injection, (5) Internal validation metrics. Use the highest-quality validation available, but always use something—never deploy blind.
Putting it all together, here's a comprehensive pipeline for parameter selection in production anomaly detection systems.
Phase 1: Initial Setup
1. Data Assessment:
- Compute dimensionality diagnostics
- Apply dimensionality reduction if needed (d > 30)
- Verify distance contrast ratio > 2
2. Initial k Selection:
- If contamination known: k >= 1/contamination
- If cluster sizes known: k < smallest cluster
- Default: k = 20 for typical datasets
- Run stability analysis to refine
3. Initial Threshold:
- Compute robust statistical threshold (median + 3×MAD)
- This is a conservative starting point
Phase 2: Validation
4. If labels available:
- Use cross-validation to optimize k and threshold for F1
- Compute AUC-ROC as quality measure
- Target: AUC > 0.85
5. If no labels:
- Inject synthetic anomalies
- Compute internal validation metrics
- Target: contrast ratio > 3, separation > 2σ
6. Stress Testing:
- Verify stability across k variations
- Check ranking consistency (>70% overlap in top-10)
- Test on bootstrap samples
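The ranking-consistency check can be sketched as follows (the helper is illustrative, not a library function): compute the top-N detections at the chosen k and at nearby k values, and require the overlap to stay above the 70% target from the checklist.

```python
import numpy as np
from sklearn.neighbors import LocalOutlierFactor

def top_n_overlap_across_k(X: np.ndarray, k: int, n_top: int = 10, deltas=(-5, 5)) -> float:
    """Average overlap between the top-N detections at k and at nearby k values."""
    def top_n(kk):
        lof = LocalOutlierFactor(n_neighbors=kk)
        lof.fit(X)
        scores = -lof.negative_outlier_factor_
        return set(np.argsort(scores)[::-1][:n_top])

    base = top_n(k)
    overlaps = []
    for d in deltas:
        kk = max(2, k + d)
        overlaps.append(len(base & top_n(kk)) / n_top)
    return float(np.mean(overlaps))

# Flag the configuration for review if the overlap falls below 0.7 (the >70% target above)
```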
Phase 3: Deployment
7. Deploy with Monitoring:
- Log all scores, not just detections
- Track score distribution over time
- Alert on distribution shifts
8. Feedback Integration:
- Collect analyst feedback on detected anomalies
- Compute operational precision weekly
- Adjust threshold to maintain target precision
9. Periodic Retraining:
- Re-fit detector monthly (or on significant drift)
- Re-evaluate k and threshold
- Document parameter changes
Phase 4: Continuous Improvement
10. A/B Testing:
- When considering parameter changes, run parallel detectors
- Compare precision/recall on shared investigation budget
- Adopt better configuration
11. Ensemble Evolution:
- Maintain an ensemble of k values if a single stable k proves elusive
- Add/remove k values based on contribution to detection quality
12. Documentation:
- Record all parameter choices and rationale
- Track performance metrics over time
- Build institutional knowledge
```python
import numpy as np
from sklearn.neighbors import LocalOutlierFactor
from sklearn.decomposition import PCA
from dataclasses import dataclass
from typing import Optional, Dict
import json
from datetime import datetime


@dataclass
class DetectorConfig:
    """Configuration for production anomaly detector."""
    k: int = 20
    threshold_method: str = 'robust_statistical'
    threshold_value: Optional[float] = None
    n_sigma: float = 3.0
    contamination: float = 0.05
    reduce_dim: Optional[int] = None
    ensemble_k_values: Optional[list] = None

    def to_dict(self) -> dict:
        return self.__dict__.copy()


class ProductionAnomalyDetector:
    """
    Production-ready anomaly detector with complete parameter management.
    """

    def __init__(self, config: DetectorConfig = None):
        self.config = config or DetectorConfig()
        self.pca_: Optional[PCA] = None
        self.lof_: Optional[LocalOutlierFactor] = None
        self.threshold_: Optional[float] = None
        self.score_stats_: Dict[str, float] = {}
        self._fitted = False
        self._fit_timestamp = None

    def fit(self, X: np.ndarray, y: Optional[np.ndarray] = None):
        """
        Fit detector with automatic parameter selection.

        Parameters:
        -----------
        X : np.ndarray
            Training data (assumed mostly normal)
        y : np.ndarray, optional
            Labels for validation (1 = anomaly)
        """
        n_samples, n_features = X.shape

        # Step 1: Dimensionality reduction if needed
        X_processed = self._preprocess(X, fit=True)

        # Step 2: Determine k if using ensemble
        if self.config.ensemble_k_values:
            k_values = self.config.ensemble_k_values
            self.lof_ = [
                LocalOutlierFactor(n_neighbors=k, contamination='auto', novelty=True)
                for k in k_values
            ]
            for lof in self.lof_:
                lof.fit(X_processed)
        else:
            # Step 2b: Validate k
            k = min(self.config.k, n_samples - 1)
            if k < 5:
                raise ValueError(f"k={k} too small for n_samples={n_samples}")
            self.lof_ = LocalOutlierFactor(
                n_neighbors=k, contamination='auto', novelty=True
            )
            self.lof_.fit(X_processed)

        # Step 3: Compute training scores for threshold
        training_scores = self._compute_scores_internal(X_processed)

        # Step 4: Compute and store score statistics
        self.score_stats_ = {
            'mean': float(np.mean(training_scores)),
            'std': float(np.std(training_scores)),
            'median': float(np.median(training_scores)),
            'mad': float(np.median(np.abs(training_scores - np.median(training_scores)))),
            'min': float(np.min(training_scores)),
            'max': float(np.max(training_scores)),
            'p95': float(np.percentile(training_scores, 95)),
            'p99': float(np.percentile(training_scores, 99))
        }

        # Step 5: Set threshold
        if self.config.threshold_value is not None:
            self.threshold_ = self.config.threshold_value
        elif self.config.threshold_method == 'robust_statistical':
            robust_std = 1.4826 * self.score_stats_['mad']
            self.threshold_ = self.score_stats_['median'] + self.config.n_sigma * robust_std
        elif self.config.threshold_method == 'contamination':
            self.threshold_ = np.percentile(training_scores, 100 * (1 - self.config.contamination))
        else:
            raise ValueError(f"Unknown threshold method: {self.config.threshold_method}")

        # Step 6: Validate if labels provided
        if y is not None:
            self._validate(training_scores, y)

        self._fitted = True
        self._fit_timestamp = datetime.now().isoformat()
        return self

    def _preprocess(self, X: np.ndarray, fit: bool = True) -> np.ndarray:
        """Apply dimensionality reduction if configured."""
        if self.config.reduce_dim is None:
            return X
        if fit:
            self.pca_ = PCA(n_components=self.config.reduce_dim)
            return self.pca_.fit_transform(X)
        else:
            return self.pca_.transform(X)

    def _compute_scores_internal(self, X: np.ndarray) -> np.ndarray:
        """Compute scores on preprocessed data."""
        if isinstance(self.lof_, list):
            # Ensemble scoring
            all_scores = np.array([
                -lof.score_samples(X) for lof in self.lof_
            ])
            # Average normalized scores
            all_scores_norm = (all_scores - all_scores.min(axis=1, keepdims=True)) / \
                (all_scores.max(axis=1, keepdims=True) - all_scores.min(axis=1, keepdims=True) + 1e-10)
            return np.mean(all_scores_norm, axis=0)
        else:
            return -self.lof_.score_samples(X)

    def _validate(self, scores: np.ndarray, y: np.ndarray):
        """Validate detection quality."""
        from sklearn.metrics import roc_auc_score, precision_score, recall_score

        auc = roc_auc_score(y, scores)
        predictions = (scores > self.threshold_).astype(int)
        precision = precision_score(y, predictions, zero_division=0)
        recall = recall_score(y, predictions, zero_division=0)

        print(f"Validation: AUC={auc:.3f}, Precision={precision:.3f}, Recall={recall:.3f}")
        if auc < 0.7:
            print("WARNING: AUC < 0.7 - detection quality is poor")

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict anomaly labels (1 = anomaly)."""
        scores = self.score_samples(X)
        return (scores > self.threshold_).astype(int)

    def score_samples(self, X: np.ndarray) -> np.ndarray:
        """Compute anomaly scores (higher = more anomalous)."""
        if not self._fitted:
            raise RuntimeError("Detector not fitted")
        X_processed = self._preprocess(X, fit=False)
        return self._compute_scores_internal(X_processed)

    def export_config(self, filepath: str):
        """Export configuration for reproducibility."""
        config = {
            'detector_config': self.config.to_dict(),
            'score_stats': self.score_stats_,
            'threshold': self.threshold_,
            'fit_timestamp': self._fit_timestamp
        }
        with open(filepath, 'w') as f:
            json.dump(config, f, indent=2)
        print(f"Configuration exported to {filepath}")
```

Production anomaly detection requires ongoing monitoring and parameter adaptation. Data drifts, anomaly patterns evolve, and what worked last month may fail today.
Key Metrics to Monitor:
1. Score Distribution Metrics
Alert if: Significant shift from baseline (e.g., >20% change in median)
2. Detection Rate Metrics
Alert if: Detection rate changes unexpectedly (sudden spike or drop)
3. Validation Metrics
Alert if: Precision drops below target (e.g., <50%)
4. Data Drift Metrics
Alert if: Contrast ratio drops or data characteristics shift
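A minimal sketch of the daily checks, assuming the baseline statistics were stored at fit time (the helper name and exact tolerances are illustrative; the tolerances mirror the alert conditions above and the table below):

```python
import numpy as np

def check_score_drift(current_scores: np.ndarray,
                      baseline_median: float,
                      baseline_detection_rate: float,
                      threshold: float,
                      median_tol: float = 0.20,
                      rate_tol: float = 0.50) -> dict:
    """Compare today's scores against baseline statistics and return any triggered alerts."""
    alerts = {}

    # Score distribution check: e.g. >20% relative change in median
    median_shift = abs(np.median(current_scores) - baseline_median) / (abs(baseline_median) + 1e-10)
    if median_shift > median_tol:
        alerts['score_median_shift'] = median_shift

    # Detection rate check: sudden spike or drop relative to baseline
    detection_rate = float(np.mean(current_scores > threshold))
    rel_change = abs(detection_rate - baseline_detection_rate) / (baseline_detection_rate + 1e-10)
    if rel_change > rate_tol:
        alerts['detection_rate_change'] = rel_change

    return alerts
```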
Adaptation Strategies:
Reactive Adaptation: Adjust parameters when monitoring alerts trigger.
Example workflow: a monitoring alert fires, an analyst reviews the recent detections and score distribution, the threshold (or k) is adjusted to restore the target behavior, and the change is documented.
Proactive Adaptation: Periodically re-optimize parameters on fresh data.
Example workflow (monthly): re-run k and threshold selection on the most recent data window, compare the resulting configuration against the current one on your validation metrics, and adopt the new parameters only if they measurably improve quality.
Drift-Triggered Adaptation: Automatically re-train when data drift detected.
Example workflow: a drift test on the score distribution (or on the input data itself) triggers an automatic re-fit on recent data, the refreshed detector is validated before deployment, and the event is logged.
Frequent parameter changes create operational instability (analysts can't learn what to expect). Infrequent changes miss evolving patterns. Balance: monthly re-optimization with 2-week minimum between changes. Document all changes thoroughly.
| Metric | Frequency | Alert Threshold | Response |
|---|---|---|---|
| Score median | Daily | ±20% from baseline | Investigate data drift |
| Detection count | Daily | ±50% from 7-day average | Check for incidents or drift |
| Operational precision | Weekly | <target (e.g., 50%) | Raise threshold |
| Contrast ratio | Weekly | <2 | Reduce dimensions or change method |
| k stability | Monthly | High variance in rankings | Move to ensemble or re-optimize k |
| Threshold crossing rate | Weekly | >10% of points | Threshold too low or contamination higher than assumed |
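As one illustrative response to the "operational precision" row, a small, documented threshold nudge can be automated; the step size and helper name below are assumptions, not prescriptions, and the multiplicative step assumes positive scores (as with LOF).

```python
def adjust_threshold(current_threshold: float,
                     weekly_precision: float,
                     target_precision: float = 0.5,
                     step_fraction: float = 0.05) -> float:
    """Raise the threshold slightly when weekly operational precision falls below target."""
    if weekly_precision < target_precision:
        return current_threshold * (1 + step_fraction)  # small, documented nudge upward
    return current_threshold
```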
We've synthesized the module's lessons into a comprehensive approach to parameter selection for distance-based anomaly detection.
Module Complete:
You've now completed the Distance-Based Methods module of Anomaly & Outlier Detection. From the foundational KNN-based detection through LOF, LOCI, the curse of dimensionality, and parameter selection, you possess comprehensive knowledge of this essential class of anomaly detection algorithms.
These methods form the backbone of many production anomaly detection systems. While newer methods like Isolation Forest and deep learning approaches exist, distance-based methods remain valuable for their interpretability, theoretical foundations, and effectiveness on appropriately-sized, moderate-dimensional data.
Congratulations! You've mastered distance-based anomaly detection—KNN methods, LOF, LOCI, the curse of dimensionality, and production parameter selection. This knowledge equips you to build, deploy, and maintain effective anomaly detection systems across a wide range of applications.