In December 2020, a security analyst at FireEye noticed something unusual in their authentication logs: an employee's account had been used to register a second device for multi-factor authentication, a device the employee did not recognize. This anomaly—one event among billions—was the thread that, when pulled, unraveled the SolarWinds supply-chain compromise and led to the discovery of a nation-state campaign affecting thousands of organizations.
Log analysis is the art and science of extracting security insights from massive volumes of event data. It's the bridge between passive log collection and active threat detection. A terabyte of logs is worthless if you can't search it effectively, correlate events across sources, or distinguish the signals of an attack from the noise of normal operations.
This page covers the complete discipline of security log analysis—from search query optimization to statistical correlation, from command-line forensics to SIEM querying, from real-time streaming analysis to historical investigation.
By the end of this page, you will understand: (1) Log search and query techniques across platforms, (2) Event correlation methods for detecting multi-stage attacks, (3) Statistical analysis for pattern detection, (4) Timeline reconstruction for forensic investigation, (5) Real-time streaming analysis, and (6) Security Information and Event Management (SIEM) best practices.
Effective log analysis starts with effective searching. The ability to quickly find relevant events across billions of records separates useful log infrastructure from expensive storage.
Different platforms use different query languages. Understanding the major options is essential:
| Platform | Language | Paradigm | Learning Curve |
|---|---|---|---|
| Splunk | SPL | Pipes and transformations | Medium |
| Elasticsearch | KQL / Lucene | Full-text search + JSON | Medium |
| Microsoft Sentinel | KQL (Kusto) | Pipes and tabular | Medium-High |
| Grafana Loki | LogQL | Label-based + regex | Low-Medium |
| Google Chronicle | YARA-L / UDM Search | Rule-based detection | Medium |
| CLI Tools | grep, awk, jq | Unix pipes | Varies |
Splunk's Search Processing Language (SPL) is one of the most widely used query languages for security log analysis:
```
| ============================================
| SPLUNK SPL SECURITY ANALYSIS QUERIES
| ============================================

| --- Basic Search ---
| Find all failed authentication events
index=security sourcetype=linux:auth "Failed password"

| --- Field Extraction and Filtering ---
| Failed logins with extracted fields
index=security sourcetype=linux:auth "Failed password"
| rex field=_raw "Failed password for (?<user>\S+) from (?<src_ip>\S+)"
| where isnotnull(user)
| table _time, host, user, src_ip

| --- Aggregation ---
| Count failed logins per source IP
index=security sourcetype=linux:auth "Failed password"
| rex field=_raw "from (?<src_ip>\d+\.\d+\.\d+\.\d+)"
| stats count as attempts by src_ip
| sort - attempts
| head 20

| --- Time-based Analysis ---
| Brute force detection: >50 failures in 5 minutes per IP
index=security sourcetype=linux:auth "Failed password"
| rex field=_raw "from (?<src_ip>\d+\.\d+\.\d+\.\d+)"
| bucket _time span=5m
| stats count as attempts by _time, src_ip
| where attempts > 50

| --- Transaction Analysis ---
| Find login sequences: failure followed by success (compromised?)
index=security sourcetype=linux:auth
| rex field=_raw "(?<action>Failed|Accepted) password for (?<user>\S+) from (?<src_ip>\S+)"
| transaction user src_ip maxspan=10m
| where mvcount(action) > 1 AND mvindex(action, 0)="Failed" AND mvindex(action, -1)="Accepted"
| table _time, user, src_ip, duration, eventcount

| --- Statistical Anomaly ---
| Detect unusual data transfer volumes
index=proxy
| bucket _time span=1h
| stats sum(bytes_out) as total_bytes by _time, user
| eventstats avg(total_bytes) as avg_bytes, stdev(total_bytes) as stdev_bytes by user
| eval zscore = (total_bytes - avg_bytes) / stdev_bytes
| where zscore > 3
| sort - zscore

| --- Subsearch Correlation ---
| Find which IPs that failed login also succeeded elsewhere
index=security sourcetype=linux:auth "Failed password"
| rex field=_raw "from (?<src_ip>\d+\.\d+\.\d+\.\d+)"
| stats count by src_ip
| where count > 10
| fields src_ip
| join type=inner src_ip
    [search index=security sourcetype=linux:auth "Accepted password"
    | rex field=_raw "from (?<src_ip>\d+\.\d+\.\d+\.\d+)"
    | stats count as success_count by src_ip]
| table src_ip, count, success_count

| --- Rare Value Detection ---
| Find processes that only ran once (potential malware)
index=sysmon EventCode=1
| stats count by Image
| where count == 1
| table Image, count
```
Elasticsearch uses a JSON query DSL; the queries below are shown in Kibana Dev Tools console syntax:

```
// Elasticsearch Security Queries

// Query 1: Find failed login attempts
GET security-logs-*/_search
{
  "query": {
    "bool": {
      "must": [
        { "match": { "event.category": "authentication" } },
        { "match": { "event.outcome": "failure" } }
      ],
      "filter": [
        { "range": { "@timestamp": { "gte": "now-24h" } } }
      ]
    }
  },
  "size": 100,
  "sort": [{ "@timestamp": "desc" }]
}

// Query 2: Aggregate failed logins by source IP
GET security-logs-*/_search
{
  "size": 0,
  "query": {
    "bool": {
      "must": [
        { "match": { "event.outcome": "failure" } },
        { "match": { "event.category": "authentication" } }
      ],
      "filter": [
        { "range": { "@timestamp": { "gte": "now-1h" } } }
      ]
    }
  },
  "aggs": {
    "by_source_ip": {
      "terms": {
        "field": "source.ip",
        "size": 20,
        "order": { "_count": "desc" }
      }
    }
  }
}

// Query 3: Time histogram of authentication events
GET security-logs-*/_search
{
  "size": 0,
  "query": { "match": { "event.category": "authentication" } },
  "aggs": {
    "auth_over_time": {
      "date_histogram": {
        "field": "@timestamp",
        "calendar_interval": "hour"
      },
      "aggs": {
        "by_outcome": {
          "terms": { "field": "event.outcome" }
        }
      }
    }
  }
}

// Query 4: Find rare processes (potential malware)
GET sysmon-*/_search
{
  "size": 0,
  "query": { "match": { "event.code": "1" } },
  "aggs": {
    "rare_processes": {
      "rare_terms": {
        "field": "process.executable",
        "max_doc_count": 1
      }
    }
  }
}

// Query 5: Geolocation analysis of authentication
GET security-logs-*/_search
{
  "size": 0,
  "query": {
    "bool": {
      "must": [
        { "match": { "event.category": "authentication" } },
        { "match": { "event.outcome": "success" } }
      ],
      "filter": [
        { "range": { "@timestamp": { "gte": "now-7d" } } }
      ]
    }
  },
  "aggs": {
    "by_user": {
      "terms": { "field": "user.name", "size": 100 },
      "aggs": {
        "countries": {
          "terms": { "field": "source.geo.country_name", "size": 10 }
        }
      }
    }
  }
}
```

For raw log files or quick analysis, Unix command-line tools remain powerful:
```bash
#!/bin/bash
# Command-line log analysis techniques

# ================================================
# BASIC GREP PATTERNS
# ================================================

# Find all failed SSH logins
grep "Failed password" /var/log/auth.log

# Case-insensitive search for error conditions
grep -i "error\|fail\|denied" /var/log/syslog

# Extract just the IP addresses from failed logins
grep "Failed password" /var/log/auth.log | \
    grep -oE '[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}' | \
    sort | uniq -c | sort -rn | head -20

# ================================================
# AWK FOR STRUCTURED ANALYSIS
# ================================================

# Parse Apache access log (combined format)
# Extract IPs with most 404 errors
awk '$9 == 404 {print $1}' /var/log/apache2/access.log | \
    sort | uniq -c | sort -rn | head -20

# Calculate average response time by endpoint
awk '{
    endpoint = $7
    time = $NF
    sum[endpoint] += time
    count[endpoint]++
}
END {
    for (e in sum) {
        printf "%s: %.2f ms avg (%d requests)\n", e, sum[e]/count[e], count[e]
    }
}' /var/log/nginx/access.log

# ================================================
# JQ FOR JSON LOGS
# ================================================

# Parse JSON logs (common in container environments)
cat /var/log/app/app.json | jq -r \
    'select(.level == "error") | [.timestamp, .message] | @tsv'

# Count events by category
cat /var/log/audit/audit.json | jq -r '.event_type' | \
    sort | uniq -c | sort -rn

# Extract specific fields and filter
cat logs.json | jq -r \
    'select(.status_code >= 400 and .status_code < 500) |
     [.timestamp, .client_ip, .endpoint, .status_code] | @csv'

# ================================================
# TIME-WINDOW ANALYSIS
# ================================================

# Count events per minute (for spike detection)
awk '{
    # Extract timestamp (assumes format: Jan 15 14:32:47)
    timestamp = $1 " " $2 " " substr($3, 1, 5)
    count[timestamp]++
}
END {
    for (t in count) {
        printf "%s: %d\n", t, count[t]
    }
}' /var/log/auth.log | sort | tail -60

# ================================================
# CORRELATION ACROSS FILES
# ================================================

# Find IPs that appear in both auth failures and web attacks
comm -12 \
    <(grep "Failed password" /var/log/auth.log | \
      grep -oE '[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+' | sort -u) \
    <(grep -E "sql|union|select|script" /var/log/apache2/access.log | \
      awk '{print $1}' | sort -u)

# ================================================
# REAL-TIME MONITORING
# ================================================

# Follow log with filtering (real-time alert-like behavior)
tail -f /var/log/auth.log | \
    grep --line-buffered "Failed password" | \
    while read line; do
        echo "[ALERT] $line"
        # Could send to webhook, email, etc.
    done
```

An inefficient query against petabytes of logs can take hours and cost significant compute resources. Optimize queries by: (1) Using time filters first (narrowest first), (2) Using indexed fields for filtering before non-indexed, (3) Limiting result sets with head/limit, (4) Pre-aggregating where possible using summary indexes or materialized views.
Event correlation connects related events across time, sources, and entities to reveal patterns that individual events cannot show. A single failed login is noise; 100 failed logins followed by a success is an attack.
| Type | Description | Example |
|---|---|---|
| Temporal | Events within time windows | Failed logins within 5 minutes |
| Sequential | Events in specific order | Reconnaissance → Exploitation → C2 |
| Cross-source | Same entity across log types | Auth log + network log + file access |
| Statistical | Deviation from baselines | 10x normal outbound traffic |
| Graphical | Relationship-based patterns | Process tree, network graph |
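The temporal and sequential patterns above reduce to a small amount of logic. The sketch below, a hypothetical detector not tied to any SIEM, flags a successful login that follows a burst of failures from the same source within a time window:

```python
from collections import defaultdict, deque


def detect_fail_then_success(events, min_failures=5, window_seconds=600):
    """Flag (src_ip, user) pairs with >= min_failures failed logins
    followed by a success within window_seconds.

    events: iterable of (timestamp, src_ip, user, outcome) tuples,
    sorted by timestamp. outcome is "failure" or "success".
    """
    failures = defaultdict(deque)  # (ip, user) -> failure timestamps
    alerts = []
    for ts, ip, user, outcome in events:
        key = (ip, user)
        q = failures[key]
        # Expire failures that fell outside the correlation window
        while q and ts - q[0] > window_seconds:
            q.popleft()
        if outcome == "failure":
            q.append(ts)
        elif outcome == "success":
            if len(q) >= min_failures:
                alerts.append({"src_ip": ip, "user": user,
                               "failures": len(q), "success_at": ts})
            q.clear()  # reset state once the sequence completes
    return alerts
```

Production correlation engines add persistence, out-of-order event handling, and deduplication, but the core sliding-window state machine looks like this.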
Effective correlation rules capture attack patterns while minimizing false positives:
```yaml
# Security event correlation rules
# These can be implemented in SIEM platforms

rules:
  # ========================================
  # BRUTE FORCE DETECTION
  # ========================================
  - name: "Brute Force Authentication Attack"
    description: "Multiple failed logins followed by success from same IP"
    data_sources:
      - authentication_logs
    correlation:
      type: sequence
      window: 10m
      sequence:
        - step: failures
          filter:
            event.category: authentication
            event.outcome: failure
          group_by: [source.ip, user.name]
          threshold: ">= 5"
        - step: success
          filter:
            event.category: authentication
            event.outcome: success
          group_by: [source.ip, user.name]
          after: failures
          within: 2m
    severity: high
    mitre_attack: T1110

  # ========================================
  # LATERAL MOVEMENT DETECTION
  # ========================================
  - name: "Lateral Movement - Multiple Host Access"
    description: "Single user accessing many hosts in short time"
    data_sources:
      - authentication_logs
      - network_logs
    correlation:
      type: aggregation
      window: 15m
      filter:
        event.category: authentication
        event.outcome: success
      group_by: user.name
      conditions:
        - metric: distinct_count(host.name)
          operator: ">="
          value: 5
    severity: high
    mitre_attack: T1021

  # ========================================
  # DATA EXFILTRATION DETECTION
  # ========================================
  - name: "Potential Data Exfiltration"
    description: "Large outbound data transfer to unusual destination"
    data_sources:
      - network_flow_logs
      - proxy_logs
    correlation:
      type: statistical
      window: 1h
      filter:
        network.direction: outbound
      group_by: [source.ip, destination.ip]
      conditions:
        - metric: sum(network.bytes)
          operator: ">"
          value: 100MB
        - filter: "destination.ip NOT IN known_cloud_services"
    severity: critical
    mitre_attack: T1041

  # ========================================
  # ATTACK CHAIN DETECTION
  # ========================================
  - name: "Full Attack Chain - Initial Access to Exfiltration"
    description: "Detect complete attack lifecycle"
    correlation:
      type: sequence
      window: 24h
      sequence:
        - step: initial_access
          filter:
            event.category: authentication
            event.outcome: success
            source.geo.country_name: NOT IN [normal_countries]
        - step: reconnaissance
          filter:
            process.name: IN [whoami, net, ipconfig, systeminfo, tasklist]
          after: initial_access
          within: 1h
        - step: privilege_escalation
          filter:
            OR:
              - event.action: "elevation_success"
              - process.name: IN [mimikatz, rubeus, procdump]
          after: reconnaissance
        - step: lateral_movement
          filter:
            event.category: authentication
            event.outcome: success
            source.ip: IN [internal_ranges]
            user.name: "${initial_access.user.name}"
          after: privilege_escalation
        - step: exfiltration
          filter:
            network.direction: outbound
            network.bytes: ">1GB"
          after: lateral_movement
    severity: critical
    mitre_attack: [T1078, T1087, T1003, T1021, T1041]
```

Real attacks span multiple log sources. Here's how to correlate across them:
```
// Cross-source correlation in KQL (Microsoft Sentinel)
// Scenario: User downloads malware and it executes

// Step 1: Find suspicious downloads from proxy logs
let SuspiciousDownloads = ProxyLogs
    | where TimeGenerated > ago(1h)
    | where HttpMethod == "GET"
    | where UrlExtension in ("exe", "dll", "ps1", "bat", "vbs")
    | where ResponseCode == 200
    | project DownloadTime=TimeGenerated, UserPrincipal, ClientIP, Url,
              FileName=extract(@"[^/]+$", 0, Url);

// Step 2: Find process executions from endpoint logs
let ProcessExecutions = DeviceProcessEvents
    | where TimeGenerated > ago(1h)
    | where ActionType == "ProcessCreated"
    | project ExecTime=TimeGenerated, DeviceName, AccountName,
              ProcessName=FileName, CommandLine, ParentProcessName;

// Step 3: Correlate by username and filename
SuspiciousDownloads
| join kind=inner (
    ProcessExecutions
) on $left.UserPrincipal == $right.AccountName
| where ProcessName contains FileName                            // Downloaded file was executed
| where ExecTime between (DownloadTime .. (DownloadTime + 10m))  // Within 10 mins
| project DownloadTime, ExecTime, User=UserPrincipal, SourceIP=ClientIP,
          DownloadUrl=Url, ExecutedProcess=ProcessName, CommandLine, Device=DeviceName
| extend TimeDelta = ExecTime - DownloadTime
```

Correlation requires knowing that 'jsmith@corp.com' in email logs, 'CORP\jsmith' in Windows logs, and '192.168.1.50' in network logs are all the same person. Entity resolution—mapping different identifiers to common entities—is critical for effective correlation. Invest in identity mapping tables and consistent normalization.
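At its simplest, an identity mapping table is an inverted alias lookup. This sketch uses a hypothetical hard-coded map; a real deployment would populate it from a directory service (AD/LDAP) and DHCP/VPN lease data:

```python
# Hypothetical identity map: canonical entity -> known aliases.
# The names and IPs here are illustrative only.
IDENTITY_MAP = {
    "jsmith": ["jsmith@corp.com", r"CORP\jsmith", "192.168.1.50"],
    "adoe":   ["adoe@corp.com", r"CORP\adoe", "192.168.1.51"],
}


def build_entity_resolver(identity_map):
    """Invert entity -> aliases into a case-insensitive alias lookup."""
    alias_to_entity = {}
    for entity, aliases in identity_map.items():
        for alias in aliases:
            alias_to_entity[alias.lower()] = entity

    def resolve(identifier):
        # Unknown identifiers pass through unchanged so downstream
        # grouping still works, just without cross-source joins.
        return alias_to_entity.get(identifier.lower(), identifier)

    return resolve


resolve = build_entity_resolver(IDENTITY_MAP)
```

Running every log record's subject through a resolver like this before correlation lets a single `group_by` key span email, Windows, and network telemetry.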
Statistical analysis transforms raw event counts into meaningful security insights. It answers questions like: "Is this behavior unusual?" and "How unusual?"
| Technique | Use Case | Alert Condition |
|---|---|---|
| Z-score | Detect outliers in normally distributed data | z > 3 (3 standard deviations) |
| Percentile thresholds | Identify extreme values | Value > 99th percentile |
| Moving average | Detect trend changes | Value > 2x 7-day moving avg |
| Rare event detection | Find values that seldom occur | Seen < 3 times ever |
| Frequency analysis | Detect periodic patterns | Unexpected periodicity (beacon) |
| Entropy analysis | Detect randomness (tunneling) | High entropy in DNS queries |
```python
"""Statistical log analysis techniques for security detection."""

import math
from collections import Counter
from typing import List, Dict, Tuple

import numpy as np


class StatisticalLogAnalyzer:
    """Statistical analysis methods for security log data."""

    def detect_outliers_zscore(self, values: List[float],
                               threshold: float = 3.0) -> List[Tuple[int, float, float]]:
        """Detect outliers using the Z-score method.

        Returns a list of (index, value, zscore) for outliers.
        """
        if len(values) < 10:
            return []

        mean = np.mean(values)
        std = np.std(values)
        if std == 0:
            return []

        outliers = []
        for i, val in enumerate(values):
            zscore = (val - mean) / std
            if abs(zscore) > threshold:
                outliers.append((i, val, zscore))
        return outliers

    def detect_rare_events(self, events: List[str],
                           max_occurrences: int = 2) -> List[Tuple[str, int]]:
        """Find events that occur rarely (potential indicators)."""
        counts = Counter(events)
        return [(event, count) for event, count in counts.items()
                if count <= max_occurrences]

    def detect_beaconing(self, timestamps: List[float],
                         tolerance: float = 0.1) -> Dict:
        """Detect periodic communication (C2 beaconing).

        Returns detected intervals and confidence.
        """
        if len(timestamps) < 10:
            return {"detected": False, "reason": "insufficient data"}

        # Calculate inter-arrival times
        sorted_ts = sorted(timestamps)
        intervals = [sorted_ts[i + 1] - sorted_ts[i]
                     for i in range(len(sorted_ts) - 1)]
        if not intervals:
            return {"detected": False}

        # Find most common interval (with tolerance)
        interval_counts = {}
        for interval in intervals:
            # Round to nearest 10 seconds for grouping
            rounded = round(interval / 10) * 10
            interval_counts[rounded] = interval_counts.get(rounded, 0) + 1

        if not interval_counts:
            return {"detected": False}

        most_common_interval = max(interval_counts, key=interval_counts.get)
        frequency = interval_counts[most_common_interval] / len(intervals)

        # If >60% of intervals are similar, likely beaconing
        is_beacon = frequency > 0.6 and most_common_interval > 0

        return {
            "detected": is_beacon,
            "interval_seconds": most_common_interval,
            "confidence": frequency,
            "total_events": len(timestamps),
        }

    def calculate_entropy(self, data: str) -> float:
        """Calculate Shannon entropy.

        High entropy in DNS subdomains may indicate tunneling.
        """
        if not data:
            return 0.0

        # Count character frequencies
        counts = Counter(data)
        length = len(data)

        # Calculate entropy
        entropy = 0.0
        for count in counts.values():
            probability = count / length
            if probability > 0:
                entropy -= probability * math.log2(probability)
        return entropy

    def detect_dns_tunneling(self, dns_queries: List[str],
                             entropy_threshold: float = 4.0,
                             length_threshold: int = 30) -> List[Dict]:
        """Detect potential DNS tunneling by analyzing query patterns."""
        suspicious = []
        for query in dns_queries:
            # Extract subdomain (everything before the last two domain levels)
            parts = query.split('.')
            if len(parts) < 3:
                continue
            subdomain = '.'.join(parts[:-2])

            # Check length first, then entropy
            if len(subdomain) > length_threshold:
                entropy = self.calculate_entropy(subdomain)
                if entropy > entropy_threshold:
                    suspicious.append({
                        "query": query,
                        "subdomain": subdomain,
                        "length": len(subdomain),
                        "entropy": entropy,
                        "indicators": ["high_entropy", "long_subdomain"],
                    })
        return suspicious


# Example usage
if __name__ == "__main__":
    analyzer = StatisticalLogAnalyzer()

    # Beaconing detection
    # Simulated C2 beacon every 60 seconds with jitter
    beacon_times = [i * 60 + np.random.uniform(-5, 5) for i in range(50)]
    result = analyzer.detect_beaconing(beacon_times)
    print(f"Beacon detection: {result}")

    # DNS tunneling detection
    queries = [
        "www.google.com",                                 # Normal
        "mail.example.com",                               # Normal
        "aGVsbG8gd29ybGQgdGhpcyBpcyBhIHRlc3Q.evil.com",   # Suspicious!
        "c2VjcmV0IGRhdGEgZXhmaWx0cmF0aW9u.malware.net",   # Suspicious!
    ]
    suspicious = analyzer.detect_dns_tunneling(queries)
    print(f"Suspicious DNS: {suspicious}")
```
Many of these statistical checks can also run directly in the SIEM. In Splunk SPL:

```
| --- Calculate baseline and detect anomalies ---
| Compare current hour to same hour historical baseline
index=web_access
| bucket _time span=1h
| stats count as current_count by _time
| eval hour=strftime(_time, "%H")
| eval day_of_week=strftime(_time, "%w")
| join type=outer hour day_of_week [
    | search index=web_access earliest=-30d latest=-1d
    | bucket _time span=1h
    | stats count as hist_count by _time
    | eval hour=strftime(_time, "%H")
    | eval day_of_week=strftime(_time, "%w")
    | stats avg(hist_count) as baseline_avg, stdev(hist_count) as baseline_stdev by hour, day_of_week]
| eval zscore = (current_count - baseline_avg) / baseline_stdev
| where zscore > 3

| --- Detect rare user-agent strings (potential malware) ---
index=web_access
| stats count by user_agent
| sort count
| head 20
| where count < 5

| --- Detect unusual process->network combinations ---
index=sysmon EventCode=3
| stats count by Image, DestinationIp
| eventstats sum(count) as total_for_process by Image
| eval ratio = count / total_for_process
| where ratio < 0.01 AND count < 5
| sort - count

| --- Moving average comparison for data transfer ---
index=proxy
| bucket _time span=1h
| stats sum(bytes_out) as hourly_bytes by _time, user
| streamstats avg(hourly_bytes) as moving_avg window=168 by user
| eval ratio = hourly_bytes / moving_avg
| where ratio > 5
| table _time, user, hourly_bytes, moving_avg, ratio
```

Always calculate baselines relative to time-of-day and day-of-week. Comparing Monday 9 AM traffic to Sunday 3 AM baseline will generate false positives. Build hourly baselines from the same hour of previous weeks for accurate anomaly detection.
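The same-hour/same-weekday baseline idea can be sketched in a few lines of Python. The counts here are hypothetical; the point is keying the baseline on `(hour, weekday)` rather than a single global mean:

```python
import statistics
from collections import defaultdict


def hourly_baselines(history):
    """history: iterable of (hour, weekday, count) from prior weeks.

    Returns {(hour, weekday): (mean, stdev)} for buckets with enough data.
    """
    buckets = defaultdict(list)
    for hour, weekday, count in history:
        buckets[(hour, weekday)].append(count)
    return {key: (statistics.mean(vals), statistics.pstdev(vals))
            for key, vals in buckets.items() if len(vals) >= 2}


def is_anomalous(count, hour, weekday, baselines, z_threshold=3.0):
    """Compare a current count against the same-hour/same-day baseline."""
    stats = baselines.get((hour, weekday))
    if stats is None:
        return False  # no baseline for this slot yet; don't alert blind
    mean, stdev = stats
    if stdev == 0:
        return count != mean
    return (count - mean) / stdev > z_threshold
```

Because each `(hour, weekday)` slot carries its own mean and deviation, Monday 9 AM is only ever compared to previous Monday 9 AMs.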
During incident response, reconstructing the exact sequence of events is critical. Timeline analysis answers: What happened first? What did the attacker do after gaining access? How long were they present?
```python
"""Forensic timeline generation from multiple log sources."""

import re
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional


@dataclass
class TimelineEvent:
    """Normalized timeline event."""
    timestamp: datetime
    source: str              # Which log source
    event_type: str          # Category of event
    action: str              # What happened
    subject: str             # Who/what performed action
    target: Optional[str]    # What was affected
    details: dict            # Additional context
    severity: str = "info"

    def to_dict(self):
        return {
            "timestamp": self.timestamp.isoformat(),
            "source": self.source,
            "event_type": self.event_type,
            "action": self.action,
            "subject": self.subject,
            "target": self.target,
            "details": self.details,
            "severity": self.severity,
        }


class ForensicTimeline:
    """Build forensic timeline from multiple sources."""

    def __init__(self):
        self.events: List[TimelineEvent] = []
        self.parsers = {}

    def add_auth_log_events(self, log_lines: List[str]):
        """Parse Linux auth.log format."""
        patterns = [
            (r"Failed password for (?:invalid user )?(\S+) from (\S+)",
             "authentication", "failed_login"),
            (r"Accepted password for (\S+) from (\S+)",
             "authentication", "successful_login"),
            (r"session opened for user (\S+)", "session", "session_start"),
            (r"session closed for user (\S+)", "session", "session_end"),
        ]

        for line in log_lines:
            # Extract timestamp (assumes syslog format)
            ts_match = re.match(r"(\w+\s+\d+\s+\d+:\d+:\d+)", line)
            if not ts_match:
                continue

            # Parse timestamp (add current year; classic syslog omits it)
            ts_str = ts_match.group(1)
            timestamp = datetime.strptime(
                f"2024 {ts_str}", "%Y %b %d %H:%M:%S"
            ).replace(tzinfo=timezone.utc)

            # Match patterns
            for pattern, event_type, action in patterns:
                match = re.search(pattern, line)
                if match:
                    self.events.append(TimelineEvent(
                        timestamp=timestamp,
                        source="auth.log",
                        event_type=event_type,
                        action=action,
                        subject=match.group(1),
                        target=match.group(2) if len(match.groups()) > 1 else None,
                        details={"raw": line}
                    ))
                    break

    def add_process_events(self, sysmon_events: List[dict]):
        """Parse Sysmon-style process events."""
        for event in sysmon_events:
            self.events.append(TimelineEvent(
                timestamp=datetime.fromisoformat(event["timestamp"]),
                source="sysmon",
                event_type="process",
                action="process_create",
                subject=event.get("User", "unknown"),
                target=event.get("Image", ""),
                details={
                    "command_line": event.get("CommandLine", ""),
                    "parent": event.get("ParentImage", ""),
                    "pid": event.get("ProcessId", "")
                },
                severity="info"
            ))

    def add_network_events(self, flow_records: List[dict]):
        """Parse network flow records."""
        for flow in flow_records:
            self.events.append(TimelineEvent(
                timestamp=datetime.fromisoformat(flow["timestamp"]),
                source="network",
                event_type="network",
                action="connection",
                subject=flow["src_ip"],
                target=f"{flow['dst_ip']}:{flow['dst_port']}",
                details={
                    "bytes": flow.get("bytes", 0),
                    "protocol": flow.get("protocol", "tcp")
                }
            ))

    def build_timeline(self) -> List[TimelineEvent]:
        """Sort all events by timestamp."""
        self.events.sort(key=lambda e: e.timestamp)
        return self.events

    def identify_attack_phases(self) -> dict:
        """Attempt to identify attack phases in timeline."""
        phases = {
            "initial_access": [],
            "execution": [],
            "persistence": [],
            "privilege_escalation": [],
            "discovery": [],
            "lateral_movement": [],
            "collection": [],
            "exfiltration": []
        }

        for event in self.events:
            # Initial access indicators
            if event.action in ["successful_login", "failed_login"]:
                phases["initial_access"].append(event)

            # Execution indicators
            if event.event_type == "process":
                if any(tool in event.target.lower() for tool in
                       ["powershell", "cmd", "wscript", "cscript"]):
                    phases["execution"].append(event)

            # Discovery indicators
            if event.event_type == "process":
                discovery_tools = ["whoami", "hostname", "ipconfig",
                                   "net user", "net group", "systeminfo"]
                if any(tool in event.details.get("command_line", "").lower()
                       for tool in discovery_tools):
                    phases["discovery"].append(event)

            # Add more phase detection logic...

        return phases

    def generate_report(self) -> str:
        """Generate human-readable timeline report."""
        self.build_timeline()

        report = []
        report.append("=" * 60)
        report.append("FORENSIC TIMELINE REPORT")
        report.append("=" * 60)
        report.append(f"Total Events: {len(self.events)}")
        if self.events:
            report.append(
                f"Time Range: {self.events[0].timestamp} to {self.events[-1].timestamp}"
            )

        report.append("")
        report.append("TIMELINE OF EVENTS:")
        report.append("-" * 60)
        for event in self.events:
            report.append(
                f"[{event.timestamp}] [{event.source}] [{event.event_type}] "
                f"{event.action}: {event.subject} -> {event.target}"
            )

        # Add phase analysis
        phases = self.identify_attack_phases()
        report.append("")
        report.append("ATTACK PHASE ANALYSIS:")
        report.append("-" * 60)
        for phase, events in phases.items():
            if events:
                report.append(f"\n{phase.upper()} ({len(events)} events):")
                for event in events[:5]:  # Show first 5
                    report.append(f"  - {event.timestamp}: {event.action}")

        return "\n".join(report)
```

Timeline accuracy depends entirely on synchronized timestamps. If servers have clock drift, events will appear in wrong order. Always verify NTP synchronization before relying on cross-source timelines. Consider using relative ordering (sequence of events on same host) when absolute time is uncertain.
While historical analysis is valuable for investigation, real-time analysis is essential for immediate threat detection. Streaming analysis processes events as they arrive, enabling rapid response.
```java
// Apache Flink streaming security detection
// Real-time brute force detection

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.api.common.functions.AggregateFunction;

public class BruteForceDetection {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Read from Kafka
        DataStream<AuthEvent> authEvents = env
            .addSource(new KafkaSource<>("auth-events"))
            .map(json -> parseAuthEvent(json));

        // Detect brute force: >10 failures in 5 minutes from same IP
        DataStream<SecurityAlert> bruteForceAlerts = authEvents
            // Filter to only failure events
            .filter(event -> event.outcome.equals("failure"))
            // Key by source IP
            .keyBy(event -> event.sourceIp)
            // 5-minute tumbling window
            .timeWindow(Time.minutes(5))
            // Count failures per IP
            .aggregate(new CountAggregator())
            // Filter to threshold
            .filter(count -> count.total > 10)
            // Generate alert
            .map(count -> new SecurityAlert(
                "BRUTE_FORCE",
                "HIGH",
                String.format("%d failed logins from %s in 5 minutes",
                              count.total, count.sourceIp)
            ));

        // Detect impossible travel: same user from different countries
        // within physically impossible time
        DataStream<SecurityAlert> impossibleTravel = authEvents
            .filter(event -> event.outcome.equals("success"))
            .keyBy(event -> event.username)
            // Session window: group events close in time
            .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
            .process(new ImpossibleTravelDetector());

        // Detect successful login after failures (potential compromise)
        DataStream<SecurityAlert> compromiseAlerts = authEvents
            .keyBy(event -> event.sourceIp + ":" + event.username)
            // Pattern matching
            .process(new PatternProcessor())
            .map(match -> new SecurityAlert(
                "POTENTIAL_COMPROMISE",
                "CRITICAL",
                "Successful login after multiple failures"
            ));

        // Send alerts to downstream systems
        bruteForceAlerts.addSink(new AlertSink());
        impossibleTravel.addSink(new AlertSink());
        compromiseAlerts.addSink(new AlertSink());

        env.execute("Security Detection Pipeline");
    }

    // Aggregator to count events
    static class CountAggregator
            implements AggregateFunction<AuthEvent, CountAccumulator, CountResult> {

        @Override
        public CountAccumulator createAccumulator() {
            return new CountAccumulator();
        }

        @Override
        public CountAccumulator add(AuthEvent event, CountAccumulator acc) {
            acc.count++;
            acc.sourceIp = event.sourceIp;
            return acc;
        }

        @Override
        public CountResult getResult(CountAccumulator acc) {
            return new CountResult(acc.sourceIp, acc.count);
        }

        @Override
        public CountAccumulator merge(CountAccumulator a, CountAccumulator b) {
            a.count += b.count;
            return a;
        }
    }
}
```

| Window Type | Description | Security Use Case |
|---|---|---|
| Tumbling | Fixed-size, non-overlapping | Count events per 5-minute bucket |
| Sliding | Fixed-size, overlapping | Detect spikes: count in last 5 min, every 1 min |
| Session | Variable-size, activity-based | Group user session activity |
| Global | Single window, all events | Count unique values over time |
Smaller windows detect faster but may miss patterns spanning window boundaries. Larger windows are more accurate but increase detection latency. Use sliding windows with overlap to balance both—a 5-minute window sliding every 1 minute catches patterns without boundary issues.
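The sliding-count idea generalizes beyond any one streaming framework. A minimal Python sketch (re-evaluating the window on every event rather than on a fixed 1-minute slide, which is a simplifying assumption):

```python
from collections import deque


class SlidingWindowCounter:
    """Per-key event counter over a sliding time window."""

    def __init__(self, window_seconds=300, threshold=10):
        self.window_seconds = window_seconds
        self.threshold = threshold
        self._events = {}  # key -> deque of event timestamps

    def add(self, key, ts):
        """Record an event; return True if this key crosses the threshold."""
        q = self._events.setdefault(key, deque())
        q.append(ts)
        # Expire timestamps that fell out of the window
        while q and ts - q[0] > self.window_seconds:
            q.popleft()
        return len(q) >= self.threshold
```

Evaluating on every arrival means no pattern is ever split across a window boundary, at the cost of more frequent checks; framework-level sliding windows trade this off with the slide interval.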
Security Information and Event Management (SIEM) platforms are the central nervous system of security operations. Effective SIEM deployment requires thoughtful architecture and ongoing tuning.
| Technique | Description | Impact |
|---|---|---|
| Whitelisting | Exclude known-good sources (backup servers, monitoring) | 50-90% reduction in specific alerts |
| Threshold tuning | Adjust thresholds based on baseline data | Reduces noise without losing signal |
| Alert aggregation | Group related alerts into single incident | Reduces analyst workload |
| Risk-based prioritization | Score by asset criticality + threat severity | Focus on highest-impact alerts |
| Contextual suppression | Suppress during known maintenance windows | Eliminates planned false positives |
| Machine learning | Learn analyst feedback to auto-close low-value | Continuous improvement |
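Risk-based prioritization from the table can be as simple as multiplying threat severity by asset criticality. A minimal sketch — the 1-4 tiers and the weighting are illustrative choices, not a standard:

```java
class AlertPriority {
    // Illustrative tiers: severity mapped onto a 1-4 scale.
    static int severityScore(String severity) {
        switch (severity) {
            case "CRITICAL": return 4;
            case "HIGH":     return 3;
            case "MEDIUM":   return 2;
            default:         return 1;  // LOW or unknown
        }
    }

    // Risk = severity x asset criticality (also 1-4), so a MEDIUM alert
    // on a domain controller outranks a HIGH alert on a kiosk.
    static int risk(String severity, int assetCriticality) {
        return severityScore(severity) * assetCriticality;
    }

    public static void main(String[] args) {
        System.out.println(risk("MEDIUM", 4));  // 8: domain controller
        System.out.println(risk("HIGH", 1));    // 3: kiosk workstation
    }
}
```

The point of the multiplication is the inversion it produces: raw severity alone would work the kiosk alert first, while the combined score correctly surfaces the domain controller.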

```yaml
# Detection Rule Template with Lifecycle Metadata

rule:
  # ========================================
  # IDENTIFICATION
  # ========================================
  id: "DET-2024-0042"
  name: "Suspicious PowerShell Download Cradle"
  version: "2.3"
  status: "production"  # draft | testing | production | deprecated

  # ========================================
  # RULE LOGIC
  # ========================================
  data_sources:
    - sysmon_process_creation
    - windows_security

  query: |
    process.name: "powershell.exe"
    AND process.command_line:
      (*DownloadString* OR *WebClient* OR *IWR* OR *Invoke-WebRequest*)

  # ========================================
  # TUNING
  # ========================================
  suppressions:
    - field: process.parent.executable
      values:
        - 'C:\ProgramData\chocolatey\choco.exe'  # Package manager
        - 'C:\Program Files\SCCM\*'              # SCCM deployments
      reason: "Legitimate automated software deployment"
    - field: user.name
      values:
        - "svc_backup"
        - "svc_automation"
      reason: "Known automation service accounts"
      expires: "2024-12-31"  # Review suppressions periodically

  # ========================================
  # RESPONSE
  # ========================================
  severity: medium
  priority: 3  # 1-5, used with asset criticality

  response:
    immediate:
      - "Create incident ticket"
      - "Collect process tree from endpoint"
    investigation:
      - "Verify if download URL is known-bad"
      - "Check if user account is expected to run such commands"
      - "Review historical activity from this host"

  # ========================================
  # CONTEXT
  # ========================================
  description: |
    Detects PowerShell commands commonly used to download and execute
    remote code. This pattern is frequently used by attackers for
    initial access and malware delivery.

  mitre_attack:
    - T1059.001  # PowerShell
    - T1105      # Ingress Tool Transfer

  references:
    - "https://attack.mitre.org/techniques/T1059/001/"
    - "Internal: KB-SEC-0234"

  false_positives:
    - "Legitimate automation scripts"
    - "Software deployment tools (SCCM, Chocolatey)"
    - "Developer workstations (may need different threshold)"

  # ========================================
  # LIFECYCLE TRACKING
  # ========================================
  created: "2024-01-15"
  created_by: "jsmith@security"
  last_modified: "2024-03-22"
  modified_by: "analyst2@security"

  review_frequency: "quarterly"
  last_reviewed: "2024-03-01"
  next_review: "2024-06-01"

  effectiveness:
    true_positives_30d: 12
    false_positives_30d: 8
    precision: 0.60  # Needs tuning if below 0.70

  changelog:
    - date: "2024-03-22"
      author: "analyst2"
      change: "Added SCCM suppression after 15 FPs from deployments"
    - date: "2024-02-15"
      author: "jsmith"
      change: "Expanded to include IWR alias"
```

A detection rule should have at least 70% precision (70% of alerts are true positives) to be valuable. Below this threshold, analysts learn to ignore it. Track precision for every rule and either tune or disable rules that consistently underperform.
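The `effectiveness` block in the template feeds this check directly. A minimal sketch of the computation and the tune-or-disable decision, using the example rule's 30-day numbers:

```java
class RulePrecision {
    // Precision = true positives / all alerts fired.
    static double precision(int truePositives, int falsePositives) {
        int total = truePositives + falsePositives;
        return total == 0 ? 0.0 : (double) truePositives / total;
    }

    public static void main(String[] args) {
        // The example rule's last 30 days: 12 TP, 8 FP.
        double p = precision(12, 8);
        System.out.println(p);  // 0.6
        System.out.println(p >= 0.70 ? "keep" : "tune or disable");
    }
}
```

At 0.60, the example rule falls below the 0.70 bar, which is exactly why its changelog shows a suppression being added: removing a recurring false-positive source (the SCCM deployments) raises precision without touching recall.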
Log analysis transforms raw event data into security insights. Without effective analysis, even perfect log collection is worthless. Let's consolidate the key concepts:
What's Next:
We conclude this module with compliance—the regulatory and legal frameworks that govern audit and logging requirements, and how to build systems that satisfy these mandates.
You now possess practical skills in security log analysis—from query optimization to event correlation, from statistical detection to timeline reconstruction. These skills are the core of security operations work.