Production anomaly detection systems must handle datasets far beyond what textbook algorithms assume. Millions of transactions per day, billions of log entries, continuous sensor streams—these scales demand algorithms that go beyond O(n²) complexity.
The fundamental operations in density-based anomaly detection—finding k-nearest neighbors, computing pairwise distances, estimating local densities—are inherently expensive. A naive LOF implementation requires O(n²) distance computations, making it impractical for datasets with even 100,000 points.
This page explores the techniques that transform density-based anomaly detection from research prototypes into production-ready systems: spatial indexing, approximate algorithms, sampling strategies, distributed computation, and streaming adaptations.
This page covers: (1) Spatial indexing for efficient nearest neighbor search; (2) Approximate nearest neighbor algorithms (LSH, ANNOY, HNSW); (3) Sampling strategies for large-scale detection; (4) Streaming and incremental anomaly detection; (5) Distributed and parallel implementations; and (6) Production deployment considerations.
Before resorting to approximations, spatial data structures can dramatically accelerate exact nearest neighbor queries.
A k-d tree recursively partitions d-dimensional space by splitting on one dimension at each level.
Construction: O(n log n). Query (exact k-NN): O(k log n) average case, O(k·n^(1−1/d)) worst case. Space: O(n).
How it works: each node splits the points at the median along one coordinate (cycling through dimensions or choosing the one with greatest spread). A k-NN query descends to the leaf containing the query point, then backtracks, pruning any branch whose splitting hyperplane lies farther away than the current k-th best distance.
Effectiveness: Excellent for d < 20. For higher dimensions, the curse of dimensionality causes nearly all branches to be visited (tree degenerates to linear search).
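As a concrete reference point, scikit-learn exposes the k-d tree directly via `KDTree` (a minimal sketch on synthetic data):

```python
import numpy as np
from sklearn.neighbors import KDTree

rng = np.random.default_rng(0)
X = rng.normal(size=(1000, 8))  # 1000 points in 8 dimensions

tree = KDTree(X, leaf_size=40)       # O(n log n) construction
dist, ind = tree.query(X[:5], k=5)   # exact 5-NN for the first 5 points

# Each query point is its own nearest neighbor at distance 0
print(dist.shape, ind[0, 0], dist[0, 0])
```

The `leaf_size` parameter trades construction cost against query cost; it does not affect correctness, only speed.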
Ball trees partition data into nested hyperspheres rather than axis-aligned hyperplanes.
Construction: O(n log n). Query: O(k log n) when data is well-clustered. Space: O(n).
Advantage over k-d trees: Better for data with spherical cluster structure or when using metrics other than Euclidean.
Disadvantage: Higher constant factors in construction and query.
Originally designed for spatial databases, R-trees group nearby objects into minimum bounding rectangles; queries prune any subtree whose bounding rectangle cannot contain a closer neighbor.
Cover trees achieve theoretically optimal query complexity for doubling dimension data:
Query: O(c^12 log n) where c is the expansion constant (intrinsic dimensionality)
For data lying on a low-dimensional manifold, cover trees can dramatically outperform k-d trees.
| Index | Build Time | Query Time (avg) | Best For | Limitation |
|---|---|---|---|---|
| k-d Tree | O(n log n) | O(k log n) | d < 20, Euclidean | High dimensions |
| Ball Tree | O(n log n) | O(k log n) | Clustered data, other metrics | High constant factors |
| R-Tree | O(n log n) | O(k log n) | d ≤ 10, spatial data | Very high dimensions |
| Cover Tree | O(n log n) | O(c^12 log n) | Low intrinsic dimension | Complex implementation |
| Brute Force | O(1) | O(n) | Baseline | Too slow for large n |
```python
import numpy as np
import time
from sklearn.neighbors import NearestNeighbors, LocalOutlierFactor


def benchmark_spatial_indices(n_samples: int, n_features: int, k: int = 20):
    """Benchmark different spatial indexing strategies for k-NN."""
    np.random.seed(42)
    X = np.random.randn(n_samples, n_features)

    results = {}
    for algorithm in ['brute', 'kd_tree', 'ball_tree']:
        start = time.time()
        nn = NearestNeighbors(n_neighbors=k, algorithm=algorithm)
        nn.fit(X)
        nn.kneighbors(X)
        results[algorithm] = time.time() - start

    return results


def benchmark_lof_scalability():
    """Benchmark LOF at different scales to show real-world timing."""
    print("LOF Scalability Benchmark:")
    print("=" * 60)
    print(f"{'n_samples':<12} {'d=5 (sec)':<12} {'d=20 (sec)':<12} {'d=50 (sec)':<12}")
    print("-" * 60)

    for n in [1000, 5000, 10000, 25000]:
        times = []
        for d in [5, 20, 50]:
            np.random.seed(42)
            X = np.random.randn(n, d)
            start = time.time()
            lof = LocalOutlierFactor(n_neighbors=20, algorithm='auto')
            lof.fit_predict(X)
            times.append(time.time() - start)
        print(f"{n:<12} {times[0]:<12.3f} {times[1]:<12.3f} {times[2]:<12.3f}")


# Run benchmarks
print("Spatial Index Benchmark (n=10000, k=20):")
print("-" * 40)
for d in [5, 10, 20, 50]:
    results = benchmark_spatial_indices(10000, d)
    best = min(results, key=results.get)
    print(f"d={d}: Best={best} ({results[best]:.3f}s), "
          f"Speedup over brute: {results['brute']/results[best]:.1f}x")
print()

benchmark_lof_scalability()
```

When using sklearn's NearestNeighbors with `algorithm='auto'`, it selects brute force for small datasets, ball_tree for high-dimensional data, and kd_tree otherwise. This is often a good default, but for specific use cases (e.g., very high dimensions or specific metrics), explicit selection may be better.
When exact nearest neighbors become too expensive, approximate methods trade accuracy for speed. For anomaly detection, this tradeoff is often acceptable—we need to identify anomalous points, not perfectly rank every distance.
LSH uses hash functions that map similar points to the same bucket with high probability.
Core idea: If $d(\mathbf{x}, \mathbf{y}) \leq r$, then $P(h(\mathbf{x}) = h(\mathbf{y})) \geq p_1$. If $d(\mathbf{x}, \mathbf{y}) > cr$, then $P(h(\mathbf{x}) = h(\mathbf{y})) \leq p_2 < p_1$.
For Euclidean distance (E2LSH): $$h_{\mathbf{a},b}(\mathbf{x}) = \left\lfloor \frac{\mathbf{a} \cdot \mathbf{x} + b}{w} \right\rfloor$$
where $\mathbf{a}$ is a random vector from Gaussian distribution and $b$ is uniform in [0, w].
Query: hash the query point into each of the L tables, collect all colliding points as candidates, then rank the candidates by exact distance.
Complexity: O(d × L) per query where L is the number of hash tables.
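The E2LSH hash family above can be sketched directly. This is a toy single-table version; the number of concatenated hashes per table (`n_hashes`) and the bucket width `w` are assumed tuning parameters, and a real deployment would use L independent tables:

```python
import numpy as np


class E2LSHTable:
    """One hash table of concatenated E2LSH functions h(x) = floor((a.x + b)/w)."""

    def __init__(self, d, n_hashes=4, w=4.0, seed=0):
        rng = np.random.default_rng(seed)
        self.a = rng.normal(size=(n_hashes, d))       # Gaussian projection vectors
        self.b = rng.uniform(0, w, size=n_hashes)     # offsets uniform in [0, w)
        self.w = w
        self.buckets = {}

    def _key(self, x):
        # Concatenated hash values form the bucket key
        return tuple(np.floor((self.a @ x + self.b) / self.w).astype(int))

    def add(self, idx, x):
        self.buckets.setdefault(self._key(x), []).append(idx)

    def candidates(self, x):
        # Points colliding with x in this table
        return self.buckets.get(self._key(x), [])


# Toy usage: index 100 points, query one of them
rng = np.random.default_rng(0)
pts = rng.normal(size=(100, 16))
table = E2LSHTable(d=16, n_hashes=3, w=4.0, seed=0)
for i, p in enumerate(pts):
    table.add(i, p)
cands = table.candidates(pts[0])  # pts[0] always collides with itself
```

Candidates from all L tables would be pooled and re-ranked by exact distance, which is where the O(d × L) query cost comes from.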
ANNOY builds a forest of random projection trees:
Query: descend each tree to a leaf, take the union of leaf points (plus nearby points reached via a shared priority queue over split boundaries) as candidates, then rerank candidates by exact distance.
Characteristics: indexes are immutable and memory-mapped, so they can be shared across processes; adding points requires a full rebuild; accuracy is tuned by the number of trees and the search budget.
HNSW builds a multi-layer graph where upper layers have long-range connections:
Structure: each point is assigned a maximum layer at random; upper layers form a sparse graph of long-range links, while the bottom layer holds a denser graph of short-range links. A query greedily descends from the top layer, refining its candidate set at each level.
Characteristics: excellent recall/speed trade-off, supports incremental inserts, but keeps the full graph in memory.
| Algorithm | Index Build | Query Time | Memory | Updates | Best For |
|---|---|---|---|---|---|
| LSH | O(nL) | O(dL) | O(nL) | Possible | Very high dimensions |
| ANNOY | O(n log n × trees) | O(log n × trees) | O(n) | No (rebuild) | Static datasets |
| HNSW | O(n log n) | O(log n) | Higher | Yes | Dynamic, high recall needed |
| IVF-Flat | O(n + clusters) | O(n/clusters) | O(n) | Possible | GPU acceleration (FAISS) |
| PQ | O(n) | O(n/clusters) | Low | Limited | Billion-scale, memory limited |
```python
import numpy as np
import time
from sklearn.neighbors import LocalOutlierFactor

# Note: the approximate index requires an additional package
# pip install annoy  (and optionally: pip install faiss-cpu)


class ApproximateLOF:
    """
    LOF using approximate nearest neighbors for scalability.
    Uses ANNOY for fast approximate k-NN queries.
    """

    def __init__(self, k: int = 20, n_trees: int = 50):
        self.k = k
        self.n_trees = n_trees

    def fit_predict(self, X: np.ndarray) -> np.ndarray:
        """Compute approximate LOF scores."""
        try:
            from annoy import AnnoyIndex
        except ImportError:
            print("ANNOY not installed, falling back to sklearn")
            lof = LocalOutlierFactor(n_neighbors=self.k)
            lof.fit_predict(X)
            return -lof.negative_outlier_factor_

        n_samples, n_features = X.shape

        # Build ANNOY index
        index = AnnoyIndex(n_features, 'euclidean')
        for i in range(n_samples):
            index.add_item(i, X[i])
        index.build(self.n_trees)

        # Compute approximate k-NN distances
        k_distances = np.zeros(n_samples)
        neighbors = np.zeros((n_samples, self.k), dtype=int)
        distances = np.zeros((n_samples, self.k))

        for i in range(n_samples):
            # Get k+1 neighbors (including self)
            nn_idx, nn_dist = index.get_nns_by_item(
                i, self.k + 1, include_distances=True
            )
            # Exclude self (first neighbor)
            neighbors[i] = nn_idx[1:self.k + 1]
            distances[i] = nn_dist[1:self.k + 1]
            k_distances[i] = distances[i, -1]

        # Compute LRD (Local Reachability Density)
        lrd = np.zeros(n_samples)
        for i in range(n_samples):
            reach_dists = np.maximum(k_distances[neighbors[i]], distances[i])
            lrd[i] = self.k / (reach_dists.sum() + 1e-10)

        # Compute LOF
        lof_scores = np.zeros(n_samples)
        for i in range(n_samples):
            neighbor_lrds = lrd[neighbors[i]]
            lof_scores[i] = neighbor_lrds.mean() / (lrd[i] + 1e-10)

        return lof_scores


def demo_approximate_nn_lof():
    """Demonstrate approximate k-NN based LOF for large-scale data."""
    np.random.seed(42)
    print("Approximate LOF Benchmark:")
    print("=" * 60)

    for n in [10000, 50000, 100000]:
        X = np.random.randn(n, 20)

        # Exact LOF (only for smaller datasets)
        if n <= 20000:
            start = time.time()
            lof_exact = LocalOutlierFactor(n_neighbors=20)
            lof_exact.fit_predict(X)
            exact_scores = -lof_exact.negative_outlier_factor_
            exact_time = time.time() - start
        else:
            exact_time = float('nan')

        # Approximate LOF
        start = time.time()
        approx_lof = ApproximateLOF(k=20, n_trees=50)
        approx_scores = approx_lof.fit_predict(X)
        approx_time = time.time() - start

        print(f"n={n}: Exact={exact_time:.2f}s, Approx={approx_time:.2f}s", end="")
        if n <= 20000:
            # Compute correlation between rankings
            from scipy.stats import spearmanr
            corr, _ = spearmanr(exact_scores, approx_scores)
            print(f", Rank Correlation={corr:.3f}")
        else:
            print()


def faiss_example():
    """Example of using FAISS for very large scale k-NN."""
    try:
        import faiss
    except ImportError:
        print("FAISS not installed. Install with: pip install faiss-cpu")
        return

    np.random.seed(42)
    n, d, k = 100000, 64, 20
    X = np.random.randn(n, d).astype('float32')

    # Method 1: Flat (exact, brute force with GPU support)
    start = time.time()
    index_flat = faiss.IndexFlatL2(d)
    index_flat.add(X)
    distances_flat, indices_flat = index_flat.search(X, k + 1)
    flat_time = time.time() - start

    # Method 2: IVF (inverted file) - approximate
    start = time.time()
    nlist = 100  # Number of clusters
    quantizer = faiss.IndexFlatL2(d)
    index_ivf = faiss.IndexIVFFlat(quantizer, d, nlist)
    index_ivf.train(X)
    index_ivf.add(X)
    index_ivf.nprobe = 10  # How many clusters to search
    distances_ivf, indices_ivf = index_ivf.search(X, k + 1)
    ivf_time = time.time() - start

    print(f"FAISS Benchmark (n={n}, d={d}):")
    print(f"  Flat (exact):      {flat_time:.3f}s")
    print(f"  IVF (approximate): {ivf_time:.3f}s")
    print(f"  Speedup: {flat_time/ivf_time:.1f}x")


# Run demos
demo_approximate_nn_lof()
```

When even approximate methods are too slow, sampling offers a practical path forward. The key insight: we don't need to process every point—a representative sample can reveal the same anomalies.
The simplest approach: randomly sample a subset and apply full detection.
For LOF-style detection: fit LOF on the sample, then score every point (sampled or not) against the sample's neighborhood structure, e.g., via sklearn's novelty mode.
When it works: If anomalies are evenly distributed and the sample is large enough to capture the density structure.
When it fails: If anomalies are rare (might not be sampled) or clustered in specific regions.
If prior clustering or other structure is known, sample proportionally from each cluster or stratum so every density regime is represented.
Benefit: Preserves density structure across the dataset.
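One way to realize stratified sampling without prior labels is to use cheap k-means clusters as strata. A sketch, where `n_strata` is an assumed tuning parameter:

```python
import numpy as np
from sklearn.cluster import MiniBatchKMeans


def stratified_sample(X, n_total, n_strata=10, seed=42):
    """Sample proportionally from k-means strata to preserve density structure."""
    rng = np.random.default_rng(seed)
    labels = MiniBatchKMeans(n_clusters=n_strata, random_state=seed,
                             n_init=3).fit_predict(X)
    chosen = []
    for s in range(n_strata):
        idx = np.flatnonzero(labels == s)
        # Allocate sample quota proportional to stratum size (at least 1)
        n_s = max(1, int(round(n_total * len(idx) / len(X))))
        chosen.append(rng.choice(idx, size=min(n_s, len(idx)), replace=False))
    return np.concatenate(chosen)


X = np.random.default_rng(1).normal(size=(2000, 5))
idx = stratified_sample(X, n_total=200)
```

Because the strata partition the indices, the sample contains no duplicates, and small clusters are guaranteed at least one representative.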
For streaming data where the total size is unknown:
Algorithm (Vitter's Algorithm R): keep the first m items; for each later item n > m, pick a uniform random index in [0, n) and, if it falls inside the reservoir, replace that slot with the new item.
Property: At any point, the reservoir is a uniform random sample of items seen so far.
For anomaly detection: Maintain a reference sample, compute scores against it.
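Algorithm R fits in a few lines; a sketch of a reservoir that could serve as the streaming reference sample:

```python
import random


class Reservoir:
    """Vitter's Algorithm R: uniform random sample of a stream of unknown length."""

    def __init__(self, size, seed=42):
        self.size = size
        self.items = []
        self.n_seen = 0
        self.rng = random.Random(seed)

    def add(self, item):
        self.n_seen += 1
        if len(self.items) < self.size:
            self.items.append(item)          # Fill phase: keep the first m items
        else:
            j = self.rng.randrange(self.n_seen)  # Uniform in [0, n_seen)
            if j < self.size:
                self.items[j] = item         # Replace with probability m/n


res = Reservoir(size=100, seed=7)
for x in range(10_000):
    res.add(x)
```

At every step the reservoir is a uniform sample of everything seen so far, so anomaly scores computed against it remain unbiased as the stream grows.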
Not all points are equally important for density estimation:
Intuition: Oversample from sparse regions where anomalies live, undersample from dense cores.
Process data in chunks rather than all at once:
For LOF: Build k-NN index incrementally; use index from previous batches for new queries.
Sampling can systematically miss rare anomalies. If anomalies occur at rate 0.01% and you sample 10,000 points from 10 million, you might not sample any true anomalies! For rare event detection, consider: (1) Oversample suspicious regions; (2) Use domain knowledge to ensure coverage; (3) Combine with full-data approximate methods.
```python
import numpy as np
import time
from sklearn.neighbors import LocalOutlierFactor, NearestNeighbors


class SampledLOF:
    """
    LOF using a sampled reference set for scalability.
    Scores all points against a sampled reference.
    """

    def __init__(self, k: int = 20, sample_size: int = 10000,
                 sample_method: str = 'random'):
        """
        Parameters
        ----------
        k : int
            Number of neighbors
        sample_size : int
            Size of reference sample
        sample_method : str
            'random' or 'importance'
        """
        self.k = k
        self.sample_size = sample_size
        self.sample_method = sample_method

    def _random_sample(self, X: np.ndarray) -> np.ndarray:
        """Uniform random sampling."""
        n = len(X)
        sample_size = min(self.sample_size, n)
        return np.random.choice(n, size=sample_size, replace=False)

    def _importance_sample(self, X: np.ndarray) -> np.ndarray:
        """Sample inversely proportional to approximate density."""
        n = len(X)
        sample_size = min(self.sample_size, n)

        # Quick density estimate using a random pilot subset
        pilot_size = min(1000, n)
        pilot_idx = np.random.choice(n, size=pilot_size, replace=False)
        nn = NearestNeighbors(n_neighbors=min(10, pilot_size))
        nn.fit(X[pilot_idx])
        dists, _ = nn.kneighbors(X)
        approx_density = 1.0 / (dists[:, -1] + 1e-10)

        # Sample inversely proportional to density
        weights = 1.0 / (approx_density + 1e-10)
        weights /= weights.sum()
        return np.random.choice(n, size=sample_size, replace=False, p=weights)

    def fit_predict(self, X: np.ndarray) -> np.ndarray:
        """Compute LOF scores using sampled reference."""
        if self.sample_method == 'importance':
            sample_idx = self._importance_sample(X)
        else:
            sample_idx = self._random_sample(X)
        X_sample = X[sample_idx]

        # Fit LOF on the sample in novelty mode
        lof_sample = LocalOutlierFactor(n_neighbors=self.k, novelty=True)
        lof_sample.fit(X_sample)

        # Score all points against the sample.
        # score_samples returns negative LOF (higher = more inlier),
        # so we negate to get scores where higher = more outlier.
        return -lof_sample.score_samples(X)


class MiniBatchLOF:
    """LOF computed in mini-batches for memory efficiency."""

    def __init__(self, k: int = 20, batch_size: int = 10000):
        self.k = k
        self.batch_size = batch_size

    def fit_predict(self, X: np.ndarray) -> np.ndarray:
        """Compute (simplified) LOF in mini-batches."""
        n = len(X)
        scores = np.zeros(n)

        # Build global k-NN index once
        nn = NearestNeighbors(n_neighbors=self.k + 1, algorithm='auto')
        nn.fit(X)

        for start in range(0, n, self.batch_size):
            end = min(start + self.batch_size, n)
            batch = X[start:end]

            # Get neighbors from the global index
            distances, indices = nn.kneighbors(batch)

            batch_lrd = np.zeros(end - start)
            for i in range(end - start):
                neighbor_idx = indices[i, 1:]  # Exclude self
                # k-distance of each neighbor, from the global index
                neighbor_dists, _ = nn.kneighbors(X[neighbor_idx])
                neighbor_kdists = neighbor_dists[:, -1]
                # Reachability distances of the batch point
                reach_dists = np.maximum(neighbor_kdists, distances[i, 1:])
                batch_lrd[i] = self.k / (reach_dists.sum() + 1e-10)

            # Simplified score: inverse LRD (low density = high score).
            # A full LOF would also need the neighbors' LRDs.
            scores[start:end] = 1.0 / (batch_lrd + 1e-10)

        return scores


# Demonstration
np.random.seed(42)

# Generate large dataset with known anomalies
n_normal = 50000
n_anomalies = 100
X_normal = np.random.randn(n_normal, 10)
X_anomalies = np.random.randn(n_anomalies, 10) + 5  # Shifted anomalies
X = np.vstack([X_normal, X_anomalies])
true_labels = np.array([0] * n_normal + [1] * n_anomalies)

print("Sampling-Based Anomaly Detection:")
print("=" * 60)
print(f"Dataset: {len(X)} points, {n_anomalies} anomalies")

# Sampled LOF variants
for method in ['random', 'importance']:
    start = time.time()
    sampled_lof = SampledLOF(k=20, sample_size=5000, sample_method=method)
    scores = sampled_lof.fit_predict(X)
    elapsed = time.time() - start

    # Evaluate: what fraction of the top-k scores are true anomalies?
    top_k = 200
    top_indices = np.argsort(scores)[-top_k:]
    precision_at_k = true_labels[top_indices].mean()

    print(f"{method.capitalize()} Sampling (sample_size=5000):")
    print(f"  Time: {elapsed:.2f}s")
    print(f"  Precision@{top_k}: {precision_at_k:.1%}")
```

Production systems often require real-time anomaly detection on streaming data—transactions arriving continuously, sensors reporting every second, logs flooding in. Batch algorithms that recompute from scratch are impractical; we need incremental methods.
Maintain a fixed-size window of recent observations:
Algorithm: score each arriving point against the current window, then insert it and evict the oldest point; rebuild or incrementally update the k-NN index as the window slides.
Tradeoffs: small windows adapt quickly to drift but give noisy density estimates; large windows are stable but slow to forget stale structure, and each index rebuild costs O(w log w) for window size w.
Extending LOF for streaming requires updating k-NN structures as points arrive:
On new point p: (1) find p's k nearest neighbors; (2) insert p into the index; (3) update the k-distances, reachability distances, and LRDs of points that gain p as a neighbor (the reverse k-NN of p); (4) recompute LOF for p and the affected points.
HS-Trees are designed specifically for streaming anomaly detection:
Structure: an ensemble of T random binary trees built over the feature space itself (no data required to build them); each internal node splits a randomly chosen dimension at its midpoint, and nodes record the mass (count) of windowed points falling in their region.
Scoring: each point is passed down every tree, and the score combines the mass of the node it reaches with a depth adjustment: $$\text{score}(\mathbf{x}) = \frac{1}{T} \sum_{t=1}^{T} m_t(\mathbf{x}) \cdot 2^{\text{depth}_t(\mathbf{x})}$$ where $m_t(\mathbf{x})$ is the mass of the node in tree $t$ that $\mathbf{x}$ reaches.
Low scores, meaning little mass even after the $2^{\text{depth}}$ adjustment, indicate that the point fell into a sparse region → anomaly.
Updates: maintain mass counts for a reference window and a latest window; scores are computed against the reference counts while the latest window fills, then the two windows swap. Each arriving point updates counters in O(depth) per tree.
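A single half-space tree can be sketched in a few dozen lines. This is a simplified illustration under assumptions (one tree instead of an ensemble, fixed depth, midpoint splits, no window swapping); the leaf score follows the mass-times-$2^{\text{depth}}$ rule, where low values flag sparse regions:

```python
import numpy as np


class HalfSpaceTree:
    """One randomized half-space tree (simplified sketch of HS-Trees)."""

    def __init__(self, mins, maxs, depth=0, max_depth=8, rng=None):
        self.rng = rng if rng is not None else np.random.default_rng(0)
        self.depth = depth
        self.mass = 0  # Count of inserted points passing through this node
        if depth < max_depth:
            # Split a random dimension at the midpoint of the current box
            self.dim = int(self.rng.integers(len(mins)))
            self.split = (mins[self.dim] + maxs[self.dim]) / 2.0
            lmax, rmin = maxs.copy(), mins.copy()
            lmax[self.dim] = self.split
            rmin[self.dim] = self.split
            self.left = HalfSpaceTree(mins, lmax, depth + 1, max_depth, self.rng)
            self.right = HalfSpaceTree(rmin, maxs, depth + 1, max_depth, self.rng)
        else:
            self.left = self.right = None

    def insert(self, x):
        """Route a point to its leaf, incrementing mass along the way."""
        self.mass += 1
        if self.left is not None:
            child = self.left if x[self.dim] < self.split else self.right
            child.insert(x)

    def score(self, x):
        """Follow x down; mass * 2^depth at the stopping node (low = sparse)."""
        node = self
        while node.left is not None and node.mass > 0:
            node = node.left if x[node.dim] < node.split else node.right
        return node.mass * (2 ** node.depth)


# Toy demo: dense cluster near (0.2, 0.2) inside the unit square
rng = np.random.default_rng(0)
tree = HalfSpaceTree(np.zeros(2), np.ones(2), max_depth=6, rng=rng)
cluster = 0.2 + 0.01 * rng.standard_normal((200, 2))
for p in cluster:
    tree.insert(p)
tree.insert(np.array([0.2, 0.2]))

dense_score = tree.score(np.array([0.2, 0.2]))   # lands with the cluster
sparse_score = tree.score(np.array([0.9, 0.9]))  # empty region
```

A real HS-Trees implementation averages this score over T trees and maintains the paired reference/latest window counters described above.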
Maintain compact summaries (micro-clusters) of the data:
DenStream Algorithm: maintain micro-clusters whose weights decay exponentially over time; a new point joins the nearest micro-cluster if it fits within the radius threshold, otherwise it seeds an outlier micro-cluster that is promoted once its weight passes a threshold. Points that never join a dense micro-cluster are flagged as anomalies.
Streaming systems must handle concept drift—the normal distribution changes over time. A point that was anomalous yesterday might be normal today (and vice versa). Strategies include: (1) Exponential decay weighting on historical data; (2) Drift detection triggering model rebuilds; (3) Adaptive thresholds based on recent score distributions.
```python
import numpy as np
import time
from collections import deque
from sklearn.neighbors import NearestNeighbors


class SlidingWindowLOF:
    """LOF with sliding window for streaming anomaly detection."""

    def __init__(self, k: int = 20, window_size: int = 5000):
        """
        Parameters
        ----------
        k : int
            Number of neighbors
        window_size : int
            Number of recent points to keep as reference
        """
        self.k = k
        self.window_size = window_size
        self.window = deque(maxlen=window_size)
        self.nn_model = None
        self.rebuild_every = 100  # Rebuild index every N points
        self.points_since_rebuild = 0

    def _rebuild_index(self):
        """Rebuild k-NN index from the current window."""
        if len(self.window) < self.k + 1:
            self.nn_model = None
            return
        X = np.array(self.window)
        self.nn_model = NearestNeighbors(n_neighbors=self.k + 1, algorithm='auto')
        self.nn_model.fit(X)
        self.points_since_rebuild = 0

    def score(self, point: np.ndarray) -> float:
        """
        Score a single point against the current window.
        Returns an LOF-like anomaly score (higher = more anomalous).
        """
        if len(self.window) < self.k + 1:
            return 0.0  # Not enough history yet

        if self.nn_model is None or self.points_since_rebuild >= self.rebuild_every:
            self._rebuild_index()

        point = np.asarray(point).reshape(1, -1)

        # Neighbors of the query point
        distances, indices = self.nn_model.kneighbors(point)
        distances = distances[0, 1:]  # Exclude potential self-match
        indices = indices[0, 1:]

        # k-distances of the neighbors
        X_window = np.array(self.window)
        neighbor_d, _ = self.nn_model.kneighbors(X_window[indices])
        neighbor_kdists = neighbor_d[:, -1]

        # Reachability distances and LRD of the query point
        reach_dists = np.maximum(neighbor_kdists, distances)
        lrd_point = self.k / (reach_dists.sum() + 1e-10)

        # Approximate the neighbors' LRDs from their k-distances
        neighbor_lrds = 1.0 / (neighbor_kdists + 1e-10)

        return neighbor_lrds.mean() / (lrd_point + 1e-10)

    def update(self, point: np.ndarray) -> float:
        """Add point to window and return its anomaly score."""
        score = self.score(point)  # Score before adding to window
        self.window.append(point)
        self.points_since_rebuild += 1
        return score


class ExponentialDecayReference:
    """Reference distribution with exponential decay for concept drift."""

    def __init__(self, decay: float = 0.99, k: int = 20):
        """
        Parameters
        ----------
        decay : float
            Weight decay factor per observation (0.99 = 1% decay per point)
        k : int
            Number of neighbors
        """
        self.decay = decay
        self.k = k
        self.points = []
        self.weights = []
        self.max_points = 10000

    def update(self, point: np.ndarray) -> float:
        """Add point and return its score."""
        # Decay existing weights and prune low-weight points
        self.weights = [w * self.decay for w in self.weights]
        if self.weights:
            keep = [w > 0.01 for w in self.weights]
            self.points = [p for p, kp in zip(self.points, keep) if kp]
            self.weights = [w for w, kp in zip(self.weights, keep) if kp]

        # Score current point against the weighted reference
        if len(self.points) < self.k + 1:
            score = 0.0
        else:
            X_ref = np.array(self.points)
            weights = np.array(self.weights)
            # Weighted k-NN query: old (low-weight) points count as farther
            dists = np.linalg.norm(X_ref - point.reshape(1, -1), axis=1)
            weighted_dists = dists / (weights + 1e-10)
            nearest = np.argsort(weighted_dists)[:self.k]
            score = weighted_dists[nearest[-1]]  # Weighted dist to k-th neighbor

        # Add new point with weight 1 and bound memory
        self.points.append(point)
        self.weights.append(1.0)
        if len(self.points) > self.max_points:
            self.points = self.points[-self.max_points:]
            self.weights = self.weights[-self.max_points:]

        return score


def simulate_stream_with_drift(n_points: int = 10000):
    """
    Simulate a data stream with concept drift.
    - First half: centered at the origin
    - Second half: centered at [2, 2, ...]
    - Anomalies: random spikes throughout
    """
    d = 5
    points, labels = [], []  # labels: 0 = normal, 1 = anomaly

    for i in range(n_points):
        # Concept drift: mean shifts halfway through the stream
        mean = np.zeros(d) if i < n_points // 2 else np.ones(d) * 2

        if np.random.rand() < 0.01:  # 1% anomalies
            point = np.random.randn(d) * 0.5 + mean + np.random.randn(d) * 5
            labels.append(1)
        else:
            point = np.random.randn(d) * 0.5 + mean
            labels.append(0)
        points.append(point)

    return np.array(points), np.array(labels)


# Run simulation
np.random.seed(42)
stream_points, stream_labels = simulate_stream_with_drift(5000)

detector = SlidingWindowLOF(k=20, window_size=1000)
scores = np.array([detector.update(p) for p in stream_points])

# Evaluate detection after warmup
warmup = 1000
scores_eval = scores[warmup:]
labels_eval = stream_labels[warmup:]

print("Streaming Anomaly Detection Results:")
print("=" * 50)
print(f"Total anomalies in evaluation period: {labels_eval.sum()}")

# Precision at top 100
top_100_idx = np.argsort(scores_eval)[-100:]
print(f"Precision@100: {labels_eval[top_100_idx].mean():.1%}")

# Detection after drift (second half)
post_drift_start = (5000 - warmup) // 2
post_drift_scores = scores_eval[post_drift_start:]
post_drift_labels = labels_eval[post_drift_start:]
top_50_post = np.argsort(post_drift_scores)[-50:]
print(f"Precision@50 after drift: {post_drift_labels[top_50_post].mean():.1%}")
```

For truly large-scale anomaly detection (billions of points), single-machine solutions are insufficient. Distributed frameworks enable processing across clusters of machines.
LOF can be parallelized using MapReduce:
Phase 1: Compute k-distances. Map each point to one or more spatial partitions, compute distances within each partition, and reduce to every point's k nearest neighbors and k-distance.
Phase 2: Compute LRD. Join each point with its neighbors' k-distances to form reachability distances, then reduce to the local reachability density.
Phase 3: Compute LOF. Join each point with its neighbors' LRDs and reduce to the average ratio.
Challenge: The all-pairs distance computation in Phase 1 is expensive. Approximations are typically needed.
Spark's distributed data structures enable LOF implementation:
```python
# Pseudocode for Spark-based distributed LOF (requires PySpark)

"""
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
import numpy as np


def distributed_lof_approximate(df, k=20, n_partitions=100):
    '''
    Approximate distributed LOF using partitioning.

    Strategy: partition data spatially, compute LOF within partitions,
    then handle boundary effects.
    '''
    spark = SparkSession.builder.getOrCreate()

    # Step 1: Create spatial partitions using k-means
    # (groups similar points together)
    kmeans = KMeans(k=n_partitions, featuresCol='features')
    model = kmeans.fit(df)
    df_with_partition = model.transform(df)

    # Step 2: Compute LOF within each partition
    def partition_lof(partition_rows):
        '''Compute LOF for a single partition.'''
        from sklearn.neighbors import LocalOutlierFactor
        X = np.array([row['features'].toArray() for row in partition_rows])
        if len(X) < k + 1:
            return [(row['id'], 1.0) for row in partition_rows]
        lof = LocalOutlierFactor(n_neighbors=k)
        lof.fit_predict(X)
        scores = -lof.negative_outlier_factor_
        return [(row['id'], float(score))
                for row, score in zip(partition_rows, scores)]

    # KMeansModel.transform writes the cluster id to the 'prediction' column
    lof_rdd = (df_with_partition
               .rdd
               .map(lambda row: (row['prediction'], row))
               .groupByKey()
               .flatMap(lambda x: partition_lof(list(x[1]))))

    # Step 3: Handle boundary points (optional refinement)
    # Points near partition boundaries should be re-scored
    # using neighbors from adjacent partitions.

    return lof_rdd.toDF(['id', 'lof_score'])


def distributed_sampling_lof(spark, df, k=20, sample_fraction=0.1):
    '''
    Distributed LOF using a sampling strategy.

    Sample reference points, broadcast to all partitions,
    score each point against the sample.
    '''
    # Sample reference points and broadcast to all executors
    reference = df.sample(fraction=sample_fraction, seed=42).collect()
    reference_array = np.array([r['features'].toArray() for r in reference])
    reference_bc = spark.sparkContext.broadcast(reference_array)

    def score_against_reference(features):
        '''Score a point against the broadcasted reference.'''
        from sklearn.neighbors import NearestNeighbors
        ref = reference_bc.value
        nn = NearestNeighbors(n_neighbors=k)
        nn.fit(ref)
        dists, _ = nn.kneighbors([features.toArray()])
        return float(dists[0].mean())  # Mean neighbor distance as score

    from pyspark.sql.functions import udf
    from pyspark.sql.types import DoubleType
    score_udf = udf(score_against_reference, DoubleType())
    return df.withColumn('anomaly_score', score_udf(df['features']))
"""

# For demonstration without Spark: simulated distributed LOF logic

import numpy as np
import time
from sklearn.neighbors import LocalOutlierFactor
from sklearn.cluster import MiniBatchKMeans


def simulated_distributed_lof(X: np.ndarray, k: int = 20, n_partitions: int = 10):
    """
    Simulate distributed LOF using the partition-based approach.
    Demonstrates the partitioning logic that would be distributed.
    """
    n_samples = len(X)

    # Partition data using k-means
    kmeans = MiniBatchKMeans(n_clusters=n_partitions, random_state=42)
    partition_labels = kmeans.fit_predict(X)

    # Compute LOF within each partition
    scores = np.zeros(n_samples)
    for part in range(n_partitions):
        mask = partition_labels == part
        X_part = X[mask]
        if len(X_part) < k + 1:
            scores[mask] = 1.0  # Default score for tiny partitions
            continue
        lof = LocalOutlierFactor(n_neighbors=min(k, len(X_part) - 1))
        lof.fit_predict(X_part)
        scores[mask] = -lof.negative_outlier_factor_

    return scores, partition_labels


# Demonstrate
np.random.seed(42)
n, d = 50000, 10
X_normal = np.random.randn(n - 100, d)
X_outliers = np.random.randn(100, d) + 5
X = np.vstack([X_normal, X_outliers])
true_labels = np.array([0] * (n - 100) + [1] * 100)

# Single-machine LOF
start = time.time()
lof_single = LocalOutlierFactor(n_neighbors=20)
lof_single.fit_predict(X)
scores_single = -lof_single.negative_outlier_factor_
single_time = time.time() - start

# Simulated distributed LOF
start = time.time()
scores_dist, partitions = simulated_distributed_lof(X, k=20, n_partitions=50)
dist_time = time.time() - start

print("Distributed LOF Simulation:")
print("=" * 50)
print(f"Dataset size: {n} points, {d} dimensions")
print(f"Single-machine LOF: {single_time:.2f}s")
print(f"Partitioned LOF (50 partitions): {dist_time:.2f}s")
print(f"Speedup: {single_time/dist_time:.1f}x")

# Evaluate agreement
from scipy.stats import spearmanr
corr, _ = spearmanr(scores_single, scores_dist)
print(f"Score correlation (single vs distributed): {corr:.3f}")

top_100_single = np.argsort(scores_single)[-100:]
top_100_dist = np.argsort(scores_dist)[-100:]
print(f"Top-100 precision (single): {true_labels[top_100_single].mean():.1%}")
print(f"Top-100 precision (distributed): {true_labels[top_100_dist].mean():.1%}")
```

Modern GPUs excel at parallel distance computations:
RAPIDS cuML: GPU-accelerated scikit-learn compatible library
FAISS GPU: Billion-scale nearest neighbor search
Custom CUDA Kernels: when library support falls short, hand-written kernels can batch pairwise distance computations across thousands of threads for maximum throughput.
Lambda Architecture: a batch layer periodically recomputes exact scores and reference models, while a speed layer scores arriving events against the most recently published model.
Microservices Pattern: expose scoring as a stateless service backed by a shared, versioned index; model building runs as a separate pipeline that publishes new index versions.
Moving from research prototypes to production systems requires attention to numerous practical details.
Different use cases have different latency requirements:
| Use Case | Latency Target | Approach |
|---|---|---|
| Fraud detection (real-time) | < 50ms | Pre-built index, sample reference |
| Network intrusion | < 200ms | Streaming algorithms |
| Batch monitoring | Minutes to hours | Full algorithms, distributed |
| Exploratory analysis | Interactive | Approximate, visual feedback |
Index Updates: batch inserts where possible; structures like HNSW support online inserts, while ANNOY-style indexes require periodic rebuilds.
Index Versioning: build new indexes offline, validate them against held-out data, then swap atomically; keep the previous version available for instant rollback.
Memory Management: budget for index overhead (graph-based indexes can exceed the raw data size); use quantization or memory-mapped indexes when the working set outgrows RAM.
In production, simpler methods that are fast, interpretable, and maintainable often outperform complex state-of-the-art algorithms. A well-tuned isolation forest or simple k-NN distance scoring, deployed reliably with proper monitoring, typically delivers more value than a fragile cutting-edge system. Start simple, validate thoroughly, and add complexity only when clearly needed.
Scalability transforms density-based anomaly detection from an academic exercise into a production-ready tool for real-world systems.
Spatial Indexing: k-d trees and ball trees accelerate exact k-NN from O(n) to O(log n) for low-dimensional data.
Approximate Nearest Neighbors: LSH, ANNOY, HNSW trade perfect accuracy for sub-linear query time, enabling million+ scale.
Sampling Strategies: Random, stratified, and importance sampling reduce data size while preserving anomaly structure.
Streaming Algorithms: Sliding windows, incremental updates, and micro-clusters enable real-time detection on data streams.
Distributed Computation: Partition-based and broadcast-reference strategies scale to cluster-sized workloads.
| Data Scale | Recommended Approach | Expected Latency |
|---|---|---|
| < 10K points | Exact methods (sklearn) | < 1 second |
| 10K - 100K | Spatial indexing, exact | 1-10 seconds |
| 100K - 1M | Approximate NN (ANNOY, HNSW) | Seconds |
| 1M - 100M | Sampling + Approximate | Seconds to minutes |
| > 100M | Distributed + Approximate | Minutes |
| Streaming | Sliding window, incremental | Milliseconds per point |
You have now mastered density-based methods for anomaly detection, from the foundational concept of relative density through practical production deployment. These techniques—whether using DBSCAN noise points, LOF scores, or sophisticated subspace methods—provide interpretable, effective detection for a wide range of anomaly detection problems.
Density-based anomaly detection combines intuitive geometric reasoning with powerful statistical foundations. The core insight—anomalies reside in regions of unusually low density—guides algorithm design from simple k-NN approaches to sophisticated streaming systems.
Key principles to carry forward:
Relative density matters more than absolute density. Always compare to local context.
Parameter selection is critical. k, ε, and thresholds must be tuned to your specific problem.
High dimensions require special treatment. Subspace methods and dimensionality reduction are essential.
Scalability demands trade-offs. Balance accuracy, speed, and resource usage based on your constraints.
Production is about reliability, not just accuracy. Monitoring, logging, and graceful degradation matter.
With these tools and principles, you're equipped to tackle density-based anomaly detection at any scale and in any domain.