Signature-based intrusion detection catches known attacks—but what about unknown ones? When attackers use novel techniques, zero-day exploits, or legitimate tools for malicious purposes, traditional detection fails. This is where anomaly detection becomes essential.
Anomaly detection operates on a powerful premise: attacks create patterns that differ from normal behavior. Even when an attacker's specific technique is unknown, their activities—scanning networks, moving laterally, exfiltrating data—generate statistical outliers when compared against baselines of normal operation.
Consider an example: an attacker gains access to a developer's credentials and uses them to access the source code repository. There's no malware, no exploit signature, nothing traditionally 'malicious.' But that developer has never accessed the repository at 3 AM from an IP in another country. Anomaly detection can find this activity without any prior knowledge of the attacker's TTPs.
This page explores both the power and the challenges of anomaly detection—how to build systems that find genuine threats while avoiding the trap of false positives that plagues naive implementations.
By the end of this page, you will understand the theoretical foundations of anomaly detection, common statistical and machine learning techniques, how to build behavioral baselines, strategies for reducing false positives, and practical implementation patterns for security analytics. You'll gain the knowledge to design detection systems that find sophisticated attackers who evade signature-based detection.
Anomaly detection is the identification of patterns in data that do not conform to expected behavior. In security contexts, these anomalies may indicate attacks, insider threats, policy violations, or system compromises.
Types of Anomalies
Anomaly detection identifies three fundamental types of anomalies:
Point Anomalies: A single data point is anomalous compared to the rest of the data. Example: A user transferring 50GB of data when their normal daily transfer is 100MB.
Contextual (Conditional) Anomalies: A data point is anomalous in a specific context but would be normal otherwise. Example: An authentication from New York is normal, but not when the user was in London 30 minutes ago.
Collective Anomalies: A collection of related data points is anomalous together, even if individual points aren't. Example: A sequence of commands that individually are benign but together represent reconnaissance activity.
| Anomaly Type | Security Example | Detection Approach |
|---|---|---|
| Point Anomaly | Single massive data transfer | Threshold/statistical outlier detection |
| Contextual Anomaly | Normal access from impossible location | Context-aware models, conditional probability |
| Collective Anomaly | Command sequence forming attack pattern | Sequence analysis, Markov chains, LSTM networks |
The Baseline Challenge
Anomaly detection requires knowing what 'normal' looks like. This is the baseline—a statistical representation of expected behavior against which new observations are compared.
Building accurate baselines is challenging: behavior drifts as roles, projects, and systems change; new users and services arrive with no history; and seasonal or cyclical activity (quarter-end closes, product launches, on-call rotations) looks anomalous against a short training window.
Effective baselines must adapt to gradual, legitimate change without absorbing attacker activity into 'normal,' be specific to individual entities rather than global averages, and be trained on data that is reasonably free of existing compromise.
Naive anomaly detection produces enormous false positive rates. In a system with millions of daily events, even 'rare' events (1 in 10,000) occur hundreds of times daily. Effective anomaly detection requires multiple strategies to distinguish true positives (attacks) from benign anomalies (unusual but legitimate activity).
Statistical methods form the foundation of anomaly detection, using mathematical properties of data distributions to identify outliers.
Z-Score (Standard Score)
The simplest statistical anomaly detector measures how many standard deviations a data point is from the mean:
Z = (X - μ) / σ
Where:
X = observed value
μ = mean of baseline data
σ = standard deviation of baseline data
A Z-score of ±3 indicates a point 3 standard deviations from the mean, which would occur by chance only about 0.3% of the time in a normal distribution. This is commonly used as an anomaly threshold.
Example: Detecting unusual data transfer volumes
User baseline:
Mean daily transfer: 500MB
Standard deviation: 100MB
Observed transfer: 1200MB
Z = (1200 - 500) / 100 = 7
Result: 7 standard deviations from mean → highly anomalous
Interquartile Range (IQR) Method
For non-normal distributions, IQR-based detection is more robust:
Q1 = 25th percentile
Q3 = 75th percentile
IQR = Q3 - Q1
Lower bound = Q1 - 1.5 × IQR
Upper bound = Q3 + 1.5 × IQR
Anomaly: X < Lower bound OR X > Upper bound
IQR is resistant to extreme outliers that can skew mean and standard deviation calculations.
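To see that robustness concretely, here is a minimal sketch (the data values are invented for illustration) showing how a single extreme outlier drags the mean and standard deviation far off while barely moving the IQR bounds:

import numpy as np

baseline = np.array([480, 510, 495, 520, 505, 490, 515])  # normal daily transfers (MB)
polluted = np.append(baseline, 50_000)                     # one extreme outlier

for name, data in [("clean", baseline), ("polluted", polluted)]:
    q1, q3 = np.percentile(data, [25, 75])
    iqr = q3 - q1
    print(f"{name}: mean={data.mean():.0f} std={data.std():.0f} "
          f"IQR bounds=({q1 - 1.5 * iqr:.0f}, {q3 + 1.5 * iqr:.0f})")

The polluted mean jumps from roughly 500 to nearly 6,700, while the IQR bounds shift by only a few megabytes, so the IQR detector keeps flagging the outlier without being skewed by it.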
Exponentially Weighted Moving Average (EWMA)
For time-series data with concept drift, EWMA maintains adaptive baselines:
EWMA(t) = α × X(t) + (1-α) × EWMA(t-1)
Where:
α = smoothing factor (0.1-0.3 typical)
X(t) = current observation
EWMA(t-1) = previous EWMA value
Small α values create smooth baselines resistant to short-term fluctuations. Large α values adapt quickly to changes but are more susceptible to noise.
import numpy as np
from dataclasses import dataclass

@dataclass
class AnomalyResult:
    value: float
    is_anomaly: bool
    score: float
    method: str
    threshold: float
    details: dict

class StatisticalAnomalyDetector:
    """Multi-method statistical anomaly detection."""

    def __init__(self, baseline_data: np.ndarray, sensitivity: float = 3.0):
        """
        Initialize detector with baseline data.

        Args:
            baseline_data: Historical normal values for baseline
            sensitivity: Number of std deviations for anomaly threshold
        """
        self.mean = np.mean(baseline_data)
        self.std = np.std(baseline_data)
        self.sensitivity = sensitivity

        # IQR calculations
        self.q1 = np.percentile(baseline_data, 25)
        self.q3 = np.percentile(baseline_data, 75)
        self.iqr = self.q3 - self.q1

        # EWMA state
        self.ewma = self.mean
        self.ewma_std = self.std
        self.alpha = 0.1

    def detect_zscore(self, value: float) -> AnomalyResult:
        """Detect anomalies using Z-score method."""
        z_score = (value - self.mean) / self.std if self.std > 0 else 0
        is_anomaly = abs(z_score) > self.sensitivity

        return AnomalyResult(
            value=value,
            is_anomaly=is_anomaly,
            score=abs(z_score),
            method="zscore",
            threshold=self.sensitivity,
            details={
                "z_score": z_score,
                "mean": self.mean,
                "std": self.std,
                "deviation": value - self.mean
            }
        )

    def detect_iqr(self, value: float) -> AnomalyResult:
        """Detect anomalies using IQR method."""
        lower_bound = self.q1 - 1.5 * self.iqr
        upper_bound = self.q3 + 1.5 * self.iqr
        is_anomaly = value < lower_bound or value > upper_bound

        # Score based on distance from bounds
        if value < lower_bound:
            score = (lower_bound - value) / self.iqr if self.iqr > 0 else 0
        elif value > upper_bound:
            score = (value - upper_bound) / self.iqr if self.iqr > 0 else 0
        else:
            score = 0

        return AnomalyResult(
            value=value,
            is_anomaly=is_anomaly,
            score=score,
            method="iqr",
            threshold=1.5,  # IQR multiplier
            details={
                "lower_bound": lower_bound,
                "upper_bound": upper_bound,
                "q1": self.q1,
                "q3": self.q3,
                "iqr": self.iqr
            }
        )

    def detect_ewma(self, value: float, update: bool = True) -> AnomalyResult:
        """
        Detect anomalies using EWMA with adaptive baseline.

        Args:
            value: Current observation
            update: Whether to update EWMA baseline (False for known anomalies)
        """
        deviation = abs(value - self.ewma)
        z_score = deviation / self.ewma_std if self.ewma_std > 0 else 0
        is_anomaly = z_score > self.sensitivity

        result = AnomalyResult(
            value=value,
            is_anomaly=is_anomaly,
            score=z_score,
            method="ewma",
            threshold=self.sensitivity,
            details={
                "ewma": self.ewma,
                "ewma_std": self.ewma_std,
                "deviation": deviation
            }
        )

        # Update EWMA only with non-anomalous values
        if update and not is_anomaly:
            self.ewma = self.alpha * value + (1 - self.alpha) * self.ewma
            # Update std estimate (simplified)
            self.ewma_std = self.alpha * abs(value - self.ewma) + (1 - self.alpha) * self.ewma_std

        return result

    def detect_ensemble(self, value: float) -> AnomalyResult:
        """
        Ensemble detection requiring agreement across methods.

        Reduces false positives by requiring multiple methods
        to flag anomaly.
        """
        zscore_result = self.detect_zscore(value)
        iqr_result = self.detect_iqr(value)
        ewma_result = self.detect_ewma(value, update=False)

        votes = sum([
            zscore_result.is_anomaly,
            iqr_result.is_anomaly,
            ewma_result.is_anomaly
        ])

        # Require majority (2/3) for anomaly declaration
        is_anomaly = votes >= 2

        # Combined score
        combined_score = (zscore_result.score + iqr_result.score + ewma_result.score) / 3

        return AnomalyResult(
            value=value,
            is_anomaly=is_anomaly,
            score=combined_score,
            method="ensemble",
            threshold=2,  # Votes required
            details={
                "votes": votes,
                "zscore": zscore_result,
                "iqr": iqr_result,
                "ewma": ewma_result
            }
        )

No single statistical method handles all data patterns well. Ensemble approaches that require multiple detection methods to agree dramatically reduce false positives while maintaining sensitivity to true anomalies. A point flagged by Z-score, IQR, and EWMA simultaneously is much more likely to be a genuine anomaly.
Machine learning extends anomaly detection beyond simple statistical models, enabling detection of complex patterns in high-dimensional data.
Supervised vs. Unsupervised Approaches
Anomaly detection typically uses unsupervised learning because labeled attack data is scarce and expensive to produce, attacks are vanishingly rare relative to normal activity (extreme class imbalance), and novel attacks by definition have no labeled training examples.
Supervised approaches (classification) work when you have labeled examples of attacks, but they can only detect attacks similar to training data.
Common ML Anomaly Detection Algorithms
| Algorithm | How It Works | Best For | Limitations |
|---|---|---|---|
| Isolation Forest | Isolates anomalies by recursive random partitioning; anomalies require fewer partitions | High-dimensional data, fast inference | Struggles with local anomalies in clustered data |
| One-Class SVM | Learns boundary around normal data; points outside boundary are anomalies | Well-separated normal data, smaller datasets | Computationally expensive, sensitive to parameters |
| Autoencoders | Learns compressed representation; anomalies have high reconstruction error | Complex patterns, sequence data | Requires significant training data, complex tuning |
| DBSCAN | Density-based clustering; points not in clusters are anomalies | Spatial patterns, arbitrary cluster shapes | Sensitive to density parameters |
| Local Outlier Factor (LOF) | Compares local density to neighbors; anomalies have lower density | Local anomalies, varying density | Computationally expensive at scale |
Isolation Forest: A Deep Dive
Isolation Forest is particularly effective for security anomaly detection because it handles high-dimensional data well, trains and scores quickly at scale, makes no assumptions about the underlying data distribution, and requires no labeled attack examples.
Intuition: Anomalies are 'few and different'—they can be isolated from the rest of the data with fewer random partitions than normal points.
Building an Isolation Tree:
1. Randomly select a feature
2. Randomly select a split value between min and max
3. Split data, create child nodes
4. Repeat until each point is isolated
Anomaly Score:
Score = 2^(-average_path_length / normalization_factor)
Points with short path lengths (quickly isolated) have scores near 1 (anomalous)
Points with long path lengths (hard to isolate) have scores near 0.5 (normal)
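Here is a small sketch of how that score behaves, using the standard normalization factor c(n) = 2H(n-1) - 2(n-1)/n, where H is the harmonic number; the average path lengths are invented for illustration:

import math

def c(n: int) -> float:
    """Average path length of an unsuccessful BST search; normalizes path lengths."""
    if n <= 1:
        return 0.0
    harmonic = math.log(n - 1) + 0.5772156649  # H(n-1) ≈ ln(n-1) + Euler-Mascheroni constant
    return 2 * harmonic - 2 * (n - 1) / n

def anomaly_score(avg_path_length: float, n_samples: int) -> float:
    return 2 ** (-avg_path_length / c(n_samples))

# Hypothetical average path lengths from an ensemble of isolation trees
print(anomaly_score(avg_path_length=3.2, n_samples=256))   # ≈ 0.81: quickly isolated → anomalous
print(anomaly_score(avg_path_length=12.0, n_samples=256))  # ≈ 0.44: hard to isolate → normal

The fuller implementation below applies scikit-learn's IsolationForest to behavioral features engineered from user activity logs.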
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
from typing import List

class UserBehaviorAnomalyDetector:
    """
    Detects anomalous user behavior using Isolation Forest.
    Features are engineered from user activity logs.
    """

    def __init__(self, contamination: float = 0.01):
        """
        Args:
            contamination: Expected proportion of anomalies (0.01 = 1%)
        """
        self.contamination = contamination
        self.model = IsolationForest(
            n_estimators=200,
            contamination=contamination,
            max_samples='auto',
            random_state=42,
            n_jobs=-1  # Use all cores
        )
        self.scaler = StandardScaler()
        self.feature_names = []

    def extract_features(self, activity_df: pd.DataFrame) -> pd.DataFrame:
        """
        Extract behavioral features from user activity logs.

        Expected columns: user_id, timestamp, event_type,
        source_ip, bytes_transferred, resource
        """
        features = activity_df.groupby('user_id').agg({
            # Temporal features
            'timestamp': [
                lambda x: x.dt.hour.mean(),              # avg_hour
                lambda x: x.dt.hour.std(),               # hour_variance
                lambda x: x.dt.dayofweek.mean(),         # avg_day
                lambda x: (x.dt.dayofweek >= 5).mean(),  # weekend_ratio
            ],
            # Volume features
            'bytes_transferred': ['sum', 'mean', 'max', 'std'],
            # Activity features
            'event_type': [
                'count',    # total_events
                'nunique',  # unique_event_types
            ],
            'source_ip': 'nunique',  # unique_ips
            'resource': 'nunique',   # unique_resources
        })

        features.columns = [
            'avg_hour', 'hour_std', 'avg_day', 'weekend_ratio',
            'bytes_sum', 'bytes_mean', 'bytes_max', 'bytes_std',
            'event_count', 'unique_events',
            'unique_ips', 'unique_resources'
        ]

        # Derived features
        features['bytes_per_event'] = features['bytes_sum'] / features['event_count']
        features['resource_diversity'] = features['unique_resources'] / features['event_count']

        self.feature_names = features.columns.tolist()
        return features.fillna(0)

    def fit(self, baseline_activity: pd.DataFrame):
        """Train detector on baseline (known-good) activity."""
        features = self.extract_features(baseline_activity)

        # Scale features
        X = self.scaler.fit_transform(features)

        # Train Isolation Forest
        self.model.fit(X)
        return self

    def detect(self, activity_df: pd.DataFrame) -> pd.DataFrame:
        """
        Detect anomalous users in activity data.

        Returns DataFrame with user_id, anomaly_score, is_anomaly,
        and top contributing features.
        """
        features = self.extract_features(activity_df)
        X = self.scaler.transform(features)

        # Get anomaly labels (-1 = anomaly, 1 = normal)
        predictions = self.model.predict(X)

        # Decision function: lower values = more anomalous
        scores = self.model.decision_function(X)

        results = pd.DataFrame({
            'user_id': features.index,
            'anomaly_score': -scores,  # Flip so higher = more anomalous
            'is_anomaly': predictions == -1,
        })

        # Add feature values for explainability
        for feat in self.feature_names:
            results[f'feat_{feat}'] = features[feat].values

        # Identify top contributing features for anomalies
        results['top_features'] = results.apply(
            lambda row: self._explain_anomaly(row, features) if row['is_anomaly'] else [],
            axis=1
        )

        return results.sort_values('anomaly_score', ascending=False)

    def _explain_anomaly(self, row, features) -> List[str]:
        """Identify which features most contribute to anomaly status."""
        user_idx = features.index.get_loc(row['user_id'])
        user_features = features.iloc[user_idx]

        # Calculate z-scores for each feature
        z_scores = {}
        for feat in self.feature_names:
            col = features[feat]
            if col.std() > 0:
                z_scores[feat] = abs(user_features[feat] - col.mean()) / col.std()
            else:
                z_scores[feat] = 0

        # Return top 3 features by z-score
        sorted_features = sorted(z_scores.items(), key=lambda x: x[1], reverse=True)
        return [f"{feat}: {z:.1f}σ" for feat, z in sorted_features[:3]]

# Example usage (load_activity_logs is an assumed helper returning an activity DataFrame)
detector = UserBehaviorAnomalyDetector(contamination=0.02)

# Train on 30 days of baseline data
baseline_df = load_activity_logs(days=30)
detector.fit(baseline_df)

# Detect anomalies in today's activity
today_df = load_activity_logs(days=1)
anomalies = detector.detect(today_df)

# Review flagged users
for _, row in anomalies[anomalies['is_anomaly']].iterrows():
    print(f"User {row['user_id']}: score={row['anomaly_score']:.3f}")
    print(f"  Top features: {row['top_features']}")

User and Entity Behavior Analytics (UEBA) applies anomaly detection specifically to user and system behaviors, creating comprehensive profiles and identifying deviations that may indicate compromise or insider threat.
UEBA Core Concepts
UEBA differs from generic anomaly detection by:
Entity-Centric Modeling: Building individual baselines for each user, device, application, and service rather than global models
Multi-Dimensional Analysis: Considering behavior across multiple dimensions simultaneously (access patterns, data volumes, temporal patterns, peer comparisons)
Peer Group Comparison: Comparing user behavior not just to their own baseline but to similar users (same role, department, geography)
Risk Score Aggregation: Combining multiple weak signals into composite risk scores that surface truly suspicious entities
UEBA Behavioral Dimensions
Comprehensive UEBA analyzes behavior across multiple dimensions:
Access Patterns: which systems, applications, and data resources an entity touches, and how frequently
Temporal Patterns: typical working hours, days of week, session durations, and activity cadence
Location Patterns: usual geographies, networks, and source IP ranges
Data Movement: typical transfer volumes, directions (upload vs. download), and destinations
Relationship Patterns: which peers, services, and accounts an entity normally interacts with
Risk Score Calculation
UEBA systems produce risk scores that aggregate anomaly indicators:
User Risk Score = Σ (Anomaly Score × Weight × Severity) / Normalization
Components:
- Access anomaly score × 0.25
- Temporal anomaly score × 0.15
- Location anomaly score × 0.20
- Data movement anomaly × 0.25
- Peer deviation score × 0.15
Adjustment factors:
- Asset sensitivity multiplier (1x-3x for crown jewel access)
- User privilege multiplier (higher risk for admins)
- Historical factor (prior incidents increase score)
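A minimal sketch of this aggregation, using the illustrative weights and multipliers above and assuming each per-dimension anomaly score is already normalized to [0, 1]:

def user_risk_score(scores: dict, asset_multiplier: float = 1.0,
                    privilege_multiplier: float = 1.0,
                    historical_factor: float = 1.0) -> float:
    """Combine per-dimension anomaly scores (each in [0, 1]) into a composite risk score."""
    weights = {
        'access': 0.25,
        'temporal': 0.15,
        'location': 0.20,
        'data_movement': 0.25,
        'peer_deviation': 0.15,
    }
    base = sum(scores.get(dim, 0.0) * w for dim, w in weights.items())
    # Adjustment factors scale risk for sensitive assets, privileged users, prior incidents
    return base * asset_multiplier * privilege_multiplier * historical_factor

score = user_risk_score(
    {'access': 0.9, 'temporal': 0.4, 'location': 0.8,
     'data_movement': 0.95, 'peer_deviation': 0.6},
    asset_multiplier=2.0,      # crown-jewel system involved
    privilege_multiplier=1.5   # admin account
)
print(f"risk score: {score:.2f}")  # base 0.77 scaled to ≈ 2.32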
Risk scores enable prioritization—security teams focus on the highest-risk entities rather than investigating every anomaly.
One of UEBA's most powerful techniques is peer group comparison. A financial analyst accessing a database 100 times/day might be normal for that role but would be anomalous for a marketing coordinator. By comparing users to their peers (same role, department, location), UEBA detects deviations that individual baselines would miss.
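A minimal sketch of that peer-group comparison, scoring a user's metric against the distribution of their role peers (the access counts are illustrative):

import numpy as np

def peer_deviation(user_value: float, peer_values: np.ndarray) -> float:
    """Z-score of a user's metric against their peer group's distribution."""
    std = peer_values.std()
    if std == 0:
        return 0.0
    return (user_value - peer_values.mean()) / std

# Daily database queries: normal for analysts, anomalous for marketing
analyst_peers = np.array([80, 95, 110, 120, 90, 105])
marketing_peers = np.array([2, 0, 5, 1, 3, 4])

print(peer_deviation(100, analyst_peers))    # ≈ 0: unremarkable for an analyst
print(peer_deviation(100, marketing_peers))  # ≈ 57: wildly anomalous for marketing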
False positives are the bane of anomaly detection. A detector that generates too many false alerts becomes ignored, and an ignored detector misses real threats just as surely as having no detector at all. Reducing false positives while maintaining detection of real threats requires multiple strategies.
Understanding Why False Positives Occur
Anomaly detection inherently conflates 'unusual' with 'malicious.' But many unusual events are legitimate: a first business trip to a new country, a role change that grants new system access, a quarter-end batch job that moves far more data than usual, or an on-call engineer logging in at 3 AM to handle an outage.
These events are truly anomalous—they deviate from baselines—but they're not threats. The goal is distinguishing unusual-benign from unusual-malicious.
Precision-Recall Tradeoffs
Anomaly detection involves a fundamental precision-recall tradeoff: tightening thresholds produces fewer false positives (higher precision) but misses more real attacks (lower recall), while loosening thresholds does the reverse.
The optimal balance depends on threat model:
| Scenario | Priority | Approach |
|---|---|---|
| Protecting crown jewels | High recall | Accept more false positives for critical assets |
| General monitoring | Balanced | Optimize for manageable alert volume |
| Automated response | High precision | Only act on very high confidence anomalies |
For human-reviewed alerts, aim for precision around 50-70%—enough true positives that analysts stay engaged, few enough false positives that they can investigate all alerts.
from datetime import datetime, timedelta
from typing import Optional, List, Dict
from enum import Enum

class AlertFeedback(Enum):
    TRUE_POSITIVE = "true_positive"    # Real attack
    FALSE_POSITIVE = "false_positive"  # Not an attack
    NEEDS_CONTEXT = "needs_context"    # Can't determine

class FalsePositiveManager:
    """
    Manages false positive feedback and uses it to improve detection.

    SuppressionRule and the _get_alert_features, _features_similar,
    _create_suppression_rule, _notify_analysts_for_approval,
    _get_alert_count, and _trigger_model_retraining helpers are assumed
    parts of the surrounding alerting system.
    """

    def __init__(self):
        self.suppression_rules: List["SuppressionRule"] = []
        self.feedback_history: List[Dict] = []
        self.model_feedback_buffer: List[Dict] = []

    def add_feedback(self, alert_id: str, feedback: AlertFeedback,
                     analyst: str, reason: Optional[str] = None):
        """Record analyst feedback on an alert."""
        feedback_record = {
            'alert_id': alert_id,
            'feedback': feedback,
            'analyst': analyst,
            'reason': reason,
            'timestamp': datetime.utcnow(),
            'alert_features': self._get_alert_features(alert_id)
        }
        self.feedback_history.append(feedback_record)

        # If consistently FP, suggest suppression rule
        if feedback == AlertFeedback.FALSE_POSITIVE:
            self._analyze_for_suppression(feedback_record)

        # Buffer for model retraining
        self.model_feedback_buffer.append(feedback_record)

    def _analyze_for_suppression(self, feedback: Dict):
        """Analyze if this FP should become a suppression rule."""
        # Find similar historical FPs
        features = feedback['alert_features']
        similar_fps = [
            f for f in self.feedback_history
            if f['feedback'] == AlertFeedback.FALSE_POSITIVE
            and self._features_similar(f['alert_features'], features)
        ]

        # If 3+ similar FPs in 30 days, suggest suppression
        recent_cutoff = datetime.utcnow() - timedelta(days=30)
        recent_similar = [f for f in similar_fps if f['timestamp'] > recent_cutoff]

        if len(recent_similar) >= 3:
            suggested_rule = self._create_suppression_rule(features, recent_similar)
            self._notify_analysts_for_approval(suggested_rule)

    def should_suppress(self, alert: Dict) -> tuple[bool, Optional[str]]:
        """Check if alert should be suppressed by any rule."""
        for rule in self.suppression_rules:
            if rule.matches(alert):
                # Check if rule has expired
                if rule.expires_at and datetime.utcnow() > rule.expires_at:
                    continue
                return True, f"Suppressed by rule: {rule.name}"
        return False, None

    def get_precision_metrics(self, time_window_days: int = 30) -> Dict:
        """Calculate detection precision from feedback."""
        cutoff = datetime.utcnow() - timedelta(days=time_window_days)
        recent_feedback = [
            f for f in self.feedback_history
            if f['timestamp'] > cutoff
        ]

        if not recent_feedback:
            return {'precision': None, 'feedback_count': 0}

        tp = sum(1 for f in recent_feedback
                 if f['feedback'] == AlertFeedback.TRUE_POSITIVE)
        fp = sum(1 for f in recent_feedback
                 if f['feedback'] == AlertFeedback.FALSE_POSITIVE)
        needs_context = sum(1 for f in recent_feedback
                            if f['feedback'] == AlertFeedback.NEEDS_CONTEXT)

        precision = tp / (tp + fp) if (tp + fp) > 0 else None

        return {
            'precision': precision,
            'true_positives': tp,
            'false_positives': fp,
            'needs_context': needs_context,
            'total_feedback': len(recent_feedback),
            'feedback_rate': len(recent_feedback) / self._get_alert_count(cutoff)
        }

    def retrain_model_with_feedback(self):
        """Periodically incorporate feedback into anomaly models."""
        if len(self.model_feedback_buffer) < 100:
            return  # Wait for sufficient feedback

        # Only use feedback with definitive labels, keeping X and y aligned
        labeled = [f for f in self.model_feedback_buffer
                   if f['feedback'] != AlertFeedback.NEEDS_CONTEXT]
        X = [f['alert_features'] for f in labeled]
        y = [1 if f['feedback'] == AlertFeedback.TRUE_POSITIVE else 0
             for f in labeled]

        # This would feed into model retraining pipeline
        self._trigger_model_retraining(X, y)
        self.model_feedback_buffer = []  # Clear buffer

Deploying anomaly detection at scale requires careful architectural decisions about data flow, model training, and operational integration.
Separation of Training and Inference
Anomaly detection systems should separate:
Training Pipeline (batch, periodic): aggregates historical events, engineers features, retrains and validates models against held-out data, and publishes versioned model artifacts
Inference Pipeline (real-time, continuous): loads published models, computes the same features over streaming events, and scores each observation within latency budgets
Feature Engineering Considerations
The quality of anomaly detection depends heavily on feature engineering—transforming raw events into meaningful dimensions for analysis.
Time-Based Aggregations: counts, sums, and rates over sliding windows (events per hour, bytes per day, logins per week)
Categorical Encoding: converting event types, resources, and user agents into numeric form (frequency encoding, or one-hot encoding for low-cardinality fields)
Sequence Features: n-grams of actions, state transitions, and session orderings that capture how events unfold over time
Cross-Entity Features: ratios and comparisons against peer groups or global populations (a user's transfer volume relative to their department median)
Consistency: Ensure feature engineering is identical in training and inference pipelines. Feature skew (different calculations in batch vs. real-time) causes model degradation.
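One common guard against skew is a single shared feature module imported by both pipelines; a minimal sketch (the module and function names are hypothetical):

# features.py: single source of truth for feature computation
import pandas as pd

def window_features(events: pd.DataFrame) -> pd.DataFrame:
    """Compute per-user features; imported by BOTH training and inference code."""
    return events.groupby('user_id').agg(
        event_count=('event_type', 'count'),
        bytes_sum=('bytes_transferred', 'sum'),
        unique_ips=('source_ip', 'nunique'),
    )

# training job:   X_train = window_features(historical_events)
# inference job:  X_live  = window_features(streaming_window)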
Model Serving and Latency
Real-time anomaly detection must meet latency requirements:
| Scale | Target Latency | Architecture |
|---|---|---|
| <1K events/sec | <100ms | Single model server |
| 1K-100K events/sec | <50ms | Load-balanced model servers |
| >100K events/sec | <20ms | Embedded models, edge inference |
Optimization strategies include caching precomputed features, batching events for vectorized scoring, preferring lightweight models (an Isolation Forest over a deep autoencoder) on the hot path, and quantizing or distilling larger models for edge inference.
Anomaly detection models degrade over time as behavior patterns change (concept drift). Monitor model performance continuously: track precision from feedback, monitor score distributions for drift, alert on sudden changes in anomaly rates. Plan for regular model retraining (weekly or monthly) to maintain accuracy.
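A minimal sketch of score-distribution drift monitoring, comparing recent anomaly scores against a reference window with SciPy's two-sample Kolmogorov-Smirnov test (the threshold and sample data are illustrative):

import numpy as np
from scipy.stats import ks_2samp

def check_score_drift(reference_scores: np.ndarray,
                      recent_scores: np.ndarray,
                      alpha: float = 0.01) -> bool:
    """Flag drift when recent anomaly scores no longer match the reference distribution."""
    statistic, p_value = ks_2samp(reference_scores, recent_scores)
    drifted = p_value < alpha
    if drifted:
        print(f"Drift detected: KS={statistic:.3f}, p={p_value:.4f}; consider retraining")
    return drifted

# reference: scores from the week after the last retrain
# recent: scores from the past 24 hours (synthetic data for illustration)
rng = np.random.default_rng(0)
check_score_drift(rng.normal(0.3, 0.1, 5000), rng.normal(0.45, 0.1, 2000))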
Let's examine concrete anomaly detection scenarios that demonstrate real-world implementation:
Scenario 1: Impossible Travel Detection
Detect when a user authenticates from two geographically distant locations in a timeframe that makes physical travel impossible.
def detect_impossible_travel(user_id: str,
current_login: LoginEvent,
login_history: List[LoginEvent]) -> Optional[Alert]:
"""
Flag logins where travel speed would exceed 1000 km/h.
"""
# Get last successful login for this user
prev_login = get_last_login(user_id, login_history)
if not prev_login:
return None
# Calculate distance and time
distance_km = haversine_distance(
prev_login.geo.lat, prev_login.geo.lon,
current_login.geo.lat, current_login.geo.lon
)
time_hours = (current_login.timestamp - prev_login.timestamp).total_seconds() / 3600
if time_hours <= 0:
return None # Same moment, probably VPN
speed_kmh = distance_km / time_hours
# 1000 km/h allows for fast flights, flags impossible scenarios
if speed_kmh > 1000:
return Alert(
type="impossible_travel",
severity="high",
user=user_id,
details={
"previous_location": prev_login.geo.city,
"current_location": current_login.geo.city,
"distance_km": distance_km,
"time_hours": time_hours,
"implied_speed_kmh": speed_kmh
}
)
return None
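The haversine_distance helper above is assumed; a standard implementation of the great-circle distance formula:

import math

def haversine_distance(lat1: float, lon1: float,
                       lat2: float, lon2: float) -> float:
    """Great-circle distance between two lat/lon points, in kilometers."""
    r = 6371.0  # mean Earth radius, km
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * r * math.asin(math.sqrt(a))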
Reducing false positives: whitelist known corporate VPN and proxy egress points (which make users appear to 'jump' between locations), treat logins from previously seen devices as lower severity, and discount events where IP geolocation confidence is low.
Scenario 2: Data Exfiltration Detection
Identify users transferring abnormally large amounts of data, potentially indicating data theft.
def detect_data_exfiltration(user_id: str,
current_window: TimeWindow,
baseline: UserBaseline) -> Optional[Alert]:
"""
Detect abnormal data transfer volumes.
"""
# Calculate transfer in current window
current_bytes = sum(e.bytes for e in current_window.events
if e.type == 'download')
# Compare to baseline
baseline_mean = baseline.daily_download_mean
baseline_std = baseline.daily_download_std
if baseline_std == 0:
z_score = 0 if current_bytes == baseline_mean else float('inf')
else:
z_score = (current_bytes - baseline_mean) / baseline_std
# Also check against peer group
peer_percentile = baseline.get_peer_percentile(current_bytes)
# Alert if >5 standard deviations AND >99th peer percentile
if z_score > 5 and peer_percentile > 99:
# Additional context for investigation
destinations = get_transfer_destinations(current_window.events)
file_types = get_file_types(current_window.events)
return Alert(
type="potential_data_exfiltration",
severity="critical" if contains_sensitive_destinations(destinations) else "high",
user=user_id,
details={
"bytes_transferred": current_bytes,
"baseline_mean": baseline_mean,
"z_score": z_score,
"peer_percentile": peer_percentile,
"destinations": destinations,
"file_types": file_types
}
)
return None
Scenario 3: Privilege Escalation Sequence Detection
Detect sequences of actions that follow known privilege escalation patterns, even when individual actions are legitimate.
from collections import deque
# Define suspicious sequences (simplified)
ESCALATION_PATTERNS = [
['failed_sudo', 'failed_sudo', 'failed_sudo', 'successful_sudo'],
['user_login', 'create_user', 'add_to_admin_group'],
['service_account_access', 'credential_read', 'admin_action'],
]
def detect_escalation_sequence(user_id: str,
event_buffer: deque,
new_event: Event) -> Optional[Alert]:
"""
Pattern-match event sequences for privilege escalation.
"""
event_buffer.append(new_event.type)
# Keep window of last 20 events
if len(event_buffer) > 20:
event_buffer.popleft()
# Check for pattern matches
event_list = list(event_buffer)
for pattern in ESCALATION_PATTERNS:
if is_subsequence(pattern, event_list):
return Alert(
type="potential_privilege_escalation",
severity="critical",
user=user_id,
details={
"matched_pattern": pattern,
"event_sequence": event_list[-len(pattern)*2:],
"time_window": get_time_window(event_buffer)
}
)
return None
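The is_subsequence helper is assumed above; a minimal implementation that checks whether the pattern's events appear in order (not necessarily contiguously) within the buffer:

from typing import List

def is_subsequence(pattern: List[str], events: List[str]) -> bool:
    """True if every pattern element appears in events, in order."""
    it = iter(events)
    # 'step in it' advances the iterator, so matches must occur in sequence
    return all(step in it for step in pattern)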
Anomaly detection extends security capabilities beyond signature-based approaches, enabling detection of novel attacks, insider threats, and sophisticated adversaries who evade traditional controls.
What's Next:
With detection capabilities in place—both signature and anomaly-based—the next step is knowing what to do when threats are found. The following page covers Security Incident Response—the processes and playbooks for investigating alerts, containing threats, eradicating attackers, and recovering from security incidents.
You now understand anomaly detection principles and implementation, from statistical foundations through machine learning approaches and UEBA. This knowledge enables you to design detection systems that find sophisticated attackers who evade signature-based detection while managing the operational challenge of false positives.