Autoencoders offer a fundamentally different approach to anomaly detection compared to the geometric methods we've studied. Instead of explicitly defining a boundary around normal data, autoencoders learn to reconstruct normal data through a compressed representation. The key insight: anomalies are hard to reconstruct.
When trained on normal data, an autoencoder learns the essential features and patterns of that data. When presented with an anomaly—something structurally different—the autoencoder fails to reconstruct it well, producing high reconstruction error. This error becomes our anomaly score.
This approach has compelling advantages: it requires no explicit distance metric or boundary geometry, it scales to high-dimensional inputs such as images and sequences, and the network architecture can be tailored to the structure of the data.
By the end of this page, you will understand: (1) The reconstruction error principle for anomaly detection, (2) Autoencoder architecture design for different data types, (3) Training strategies and regularization for robust anomaly detection, (4) Variational Autoencoders (VAE) for probabilistic anomaly scoring, (5) Threshold selection methods for converting scores to decisions, and (6) Practical considerations for deployment and monitoring.
An autoencoder is a neural network that learns to copy its input to its output through a bottleneck—a hidden layer with fewer dimensions than the input. This forces the network to learn a compressed representation.
Architecture Overview:
$$x \xrightarrow{\text{Encoder } f} z \xrightarrow{\text{Decoder } g} \hat{x}$$
Why Anomalies Have High Reconstruction Error:
When trained on normal data only, the network spends its limited capacity on the features that recur across normal samples, so reconstructions of held-out normal data remain accurate.
For anomalies, the input contains patterns the encoder never learned to represent; the reconstruction degrades and the error rises.
The choice of reconstruction loss determines how that error is measured:
| Loss Function | Formula | Best For | Properties |
|---|---|---|---|
| MSE | Σ(xᵢ - x̂ᵢ)² | Continuous data, images | Sensitive to outlier pixels; smooth gradients |
| MAE | Σ\|xᵢ - x̂ᵢ\| | Data with sparse anomalies | Robust to single large errors; less sensitive |
| Binary Cross-Entropy | -Σ[xᵢ log x̂ᵢ + (1-xᵢ) log(1-x̂ᵢ)] | Binary/normalized images | Assumes sigmoid output; interprets as probability |
| Cosine Distance | 1 - (x·x̂)/(‖x‖ ‖x̂‖) | Direction-sensitive data | Ignores magnitude; focuses on pattern similarity |
| SSIM-based | 1 - SSIM(x, x̂) | Image data | Captures structural similarity; perceptually meaningful |
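To make the table concrete, the sketch below computes per-sample scores for several of these losses on a single batch. It is a minimal illustration: the helper name `per_sample_scores` is ours rather than a library function, and inputs are assumed to be scaled to [0, 1] where the cross-entropy score is used.

```python
import torch
import torch.nn.functional as F

def per_sample_scores(x, x_hat, eps=1e-7):
    """Per-sample anomaly scores under several reconstruction losses.

    x, x_hat: tensors of shape (batch, features); both assumed in [0, 1]
    for the cross-entropy score.
    """
    mse = torch.mean((x - x_hat) ** 2, dim=1)                  # sensitive to large errors
    mae = torch.mean(torch.abs(x - x_hat), dim=1)              # robust to isolated spikes
    bce = F.binary_cross_entropy(                              # probabilistic view of [0, 1] data
        x_hat.clamp(eps, 1 - eps), x, reduction='none').mean(dim=1)
    cos = 1 - F.cosine_similarity(x, x_hat, dim=1)             # magnitude-invariant
    return {'mse': mse, 'mae': mae, 'bce': bce, 'cosine': cos}

# Toy batch: reconstructions are the inputs plus a little noise
x = torch.rand(4, 10)
x_hat = (x + 0.1 * torch.randn_like(x)).clamp(0, 1)
for name, score in per_sample_scores(x, x_hat).items():
    print(name, score)
```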
Without a bottleneck, an autoencoder could simply learn the identity function—copying input to output perfectly, even for anomalies. The compression forces the network to prioritize: it learns to reconstruct what it sees most often (normal data) at the expense of rare patterns (anomalies).
Bottleneck size is a key hyperparameter:
• Too small → poor reconstruction of even normal data
• Too large → can memorize anomalies too, losing detection power
• Just right → captures the normal data manifold, fails gracefully on anomalies
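The following sketch illustrates how one might probe this trade-off empirically. The synthetic data (normal points confined to a 5-dimensional subspace, outliers filling the ambient space), the tiny architecture, and the candidate widths are all assumptions chosen for illustration.

```python
import torch
import torch.nn as nn

def tiny_ae(d_in, d_latent):
    # Symmetric encoder/decoder around a d_latent bottleneck
    return nn.Sequential(
        nn.Linear(d_in, 32), nn.ReLU(), nn.Linear(32, d_latent),
        nn.ReLU(), nn.Linear(d_latent, 32), nn.ReLU(), nn.Linear(32, d_in),
    )

torch.manual_seed(0)
d_in = 20
X_normal = torch.randn(2000, 5) @ torch.randn(5, d_in)   # normal data lies on a 5-dim subspace
X_outlier = torch.randn(200, d_in) * 3.0                  # outliers fill the ambient space

for d_latent in [2, 5, 16]:
    model = tiny_ae(d_in, d_latent)
    opt = torch.optim.Adam(model.parameters(), lr=1e-3)
    for _ in range(300):                                   # short full-batch training
        loss = ((model(X_normal) - X_normal) ** 2).mean()
        opt.zero_grad()
        loss.backward()
        opt.step()
    with torch.no_grad():
        err_norm = ((model(X_normal) - X_normal) ** 2).mean(dim=1).mean()
        err_out = ((model(X_outlier) - X_outlier) ** 2).mean(dim=1).mean()
    print(f"latent={d_latent:2d}  normal err={err_norm:.3f}  outlier err={err_out:.3f}")
```

The quantity to watch is the gap between the two mean errors; it tends to shrink once the bottleneck is wide enough to reconstruct arbitrary inputs.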
Mathematical Intuition:
Consider the autoencoder as learning a low-dimensional manifold M that approximates the distribution of normal data. The reconstruction function can be viewed as:
$$\hat{x} = \text{proj}_M(x) + \epsilon$$
where proj_M is the projection onto the learned manifold.
For normal data: x is already near M, so ||x - x̂|| is small. For anomalies: x is far from M, so ||x - x̂|| is large.
The reconstruction error is essentially measuring distance to the normal data manifold—the same intuition underlying One-Class SVM and SVDD, but learned rather than geometrically defined.
The autoencoder architecture should be tailored to the data type and the nature of expected anomalies. We explore common architectures and their design considerations.
1. Fully Connected (Dense) Autoencoders:
The simplest architecture, suitable for tabular data and low-dimensional inputs.
Input (d) → Dense(128) → Dense(64) → Dense(latent_dim) → Dense(64) → Dense(128) → Output (d)
Design considerations: keep the encoder and decoder symmetric, interleave batch normalization with the linear layers for stable training, and set the latent dimension well below the input dimension so the network cannot learn an identity map.
2. Convolutional Autoencoders (for images):
Leverage spatial structure in images for efficient representation.
Conv(32) → Conv(64) → Conv(128) → Flatten → Latent → Reshape → Deconv(128) → Deconv(64) → Deconv(32) → Output
Design considerations: downsample with strided convolutions and upsample with transposed convolutions, track the spatial dimensions so encoder and decoder shapes match, and end with a sigmoid output when pixel values are normalized to [0, 1].
```python
import torch
import torch.nn as nn
import torch.nn.functional as F


class DenseAutoencoder(nn.Module):
    """
    Fully connected autoencoder for tabular data.

    Architecture: input → encoder → latent → decoder → output
    Uses ReLU activation and symmetric structure.
    """

    def __init__(self, input_dim, latent_dim=32, hidden_dims=[128, 64]):
        super().__init__()

        # Encoder
        encoder_layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_dims:
            encoder_layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.ReLU(),
                nn.BatchNorm1d(hidden_dim)
            ])
            prev_dim = hidden_dim
        encoder_layers.append(nn.Linear(prev_dim, latent_dim))
        self.encoder = nn.Sequential(*encoder_layers)

        # Decoder (reverse of encoder)
        decoder_layers = []
        prev_dim = latent_dim
        for hidden_dim in reversed(hidden_dims):
            decoder_layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.ReLU(),
                nn.BatchNorm1d(hidden_dim)
            ])
            prev_dim = hidden_dim
        decoder_layers.append(nn.Linear(prev_dim, input_dim))
        self.decoder = nn.Sequential(*decoder_layers)

    def encode(self, x):
        return self.encoder(x)

    def decode(self, z):
        return self.decoder(z)

    def forward(self, x):
        z = self.encode(x)
        x_recon = self.decode(z)
        return x_recon, z

    def reconstruction_error(self, x):
        """Compute reconstruction error for anomaly scoring."""
        x_recon, _ = self.forward(x)
        # MSE per sample
        error = torch.mean((x - x_recon) ** 2, dim=1)
        return error


class ConvAutoencoder(nn.Module):
    """
    Convolutional autoencoder for image data.

    Uses strided convolutions for downsampling and
    transposed convolutions for upsampling.
    """

    def __init__(self, in_channels=1, latent_dim=128, base_channels=32):
        super().__init__()

        # Encoder: progressively downsample
        self.encoder = nn.Sequential(
            # 28x28 → 14x14
            nn.Conv2d(in_channels, base_channels, 3, stride=2, padding=1),
            nn.BatchNorm2d(base_channels),
            nn.LeakyReLU(0.2),
            # 14x14 → 7x7
            nn.Conv2d(base_channels, base_channels * 2, 3, stride=2, padding=1),
            nn.BatchNorm2d(base_channels * 2),
            nn.LeakyReLU(0.2),
            # 7x7 → 4x4 (approximately)
            nn.Conv2d(base_channels * 2, base_channels * 4, 3, stride=2, padding=1),
            nn.BatchNorm2d(base_channels * 4),
            nn.LeakyReLU(0.2),
            nn.Flatten(),
        )

        # Calculate flattened size (for 28x28 input)
        self.flatten_size = base_channels * 4 * 4 * 4  # 128 * 16 = 2048

        # Latent projection
        self.fc_encode = nn.Linear(self.flatten_size, latent_dim)
        self.fc_decode = nn.Linear(latent_dim, self.flatten_size)

        # Decoder: progressively upsample
        self.decoder = nn.Sequential(
            # 4x4 → 7x7
            nn.ConvTranspose2d(base_channels * 4, base_channels * 2, 3,
                               stride=2, padding=1),
            nn.BatchNorm2d(base_channels * 2),
            nn.LeakyReLU(0.2),
            # 7x7 → 14x14
            nn.ConvTranspose2d(base_channels * 2, base_channels, 3,
                               stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(base_channels),
            nn.LeakyReLU(0.2),
            # 14x14 → 28x28
            nn.ConvTranspose2d(base_channels, in_channels, 3,
                               stride=2, padding=1, output_padding=1),
            nn.Sigmoid()  # Output in [0, 1]
        )

        self.base_channels = base_channels

    def encode(self, x):
        h = self.encoder(x)
        z = self.fc_encode(h)
        return z

    def decode(self, z):
        h = self.fc_decode(z)
        h = h.view(-1, self.base_channels * 4, 4, 4)
        x_recon = self.decoder(h)
        return x_recon

    def forward(self, x):
        z = self.encode(x)
        x_recon = self.decode(z)
        return x_recon, z

    def reconstruction_error(self, x):
        """Per-sample reconstruction error (MSE across all pixels)."""
        x_recon, _ = self.forward(x)
        error = torch.mean((x - x_recon) ** 2, dim=[1, 2, 3])
        return error


class LSTMAutoencoder(nn.Module):
    """
    LSTM-based autoencoder for sequence data.

    Encoder: LSTM that produces a fixed-size encoding from variable-length input
    Decoder: LSTM that reconstructs the sequence from the encoding
    """

    def __init__(self, input_dim, hidden_dim=64, latent_dim=32, num_layers=2):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.latent_dim = latent_dim
        self.num_layers = num_layers

        # Encoder LSTM
        self.encoder_lstm = nn.LSTM(
            input_dim, hidden_dim, num_layers,
            batch_first=True, bidirectional=True
        )

        # Latent projection (from bidirectional: hidden_dim * 2)
        self.fc_encode = nn.Linear(hidden_dim * 2, latent_dim)
        self.fc_decode = nn.Linear(latent_dim, hidden_dim)

        # Decoder LSTM
        self.decoder_lstm = nn.LSTM(
            hidden_dim, hidden_dim, num_layers, batch_first=True
        )

        # Output projection
        self.output_layer = nn.Linear(hidden_dim, input_dim)

    def encode(self, x):
        # x: (batch, seq_len, input_dim)
        _, (h_n, _) = self.encoder_lstm(x)
        # h_n: (num_layers * 2, batch, hidden_dim) for bidirectional
        # Take last layer, both directions
        h_forward = h_n[-2]
        h_backward = h_n[-1]
        h = torch.cat([h_forward, h_backward], dim=1)
        z = self.fc_encode(h)
        return z

    def decode(self, z, seq_len):
        h = self.fc_decode(z)
        # Repeat the latent vector for each time step
        decoder_input = h.unsqueeze(1).repeat(1, seq_len, 1)
        # Decode
        output, _ = self.decoder_lstm(decoder_input)
        x_recon = self.output_layer(output)
        return x_recon

    def forward(self, x):
        z = self.encode(x)
        x_recon = self.decode(z, x.size(1))
        return x_recon, z

    def reconstruction_error(self, x):
        """Per-sequence reconstruction error."""
        x_recon, _ = self.forward(x)
        error = torch.mean((x - x_recon) ** 2, dim=[1, 2])
        return error


# Example usage
if __name__ == "__main__":
    # Dense autoencoder for tabular data
    print("Dense Autoencoder:")
    model = DenseAutoencoder(input_dim=100, latent_dim=16)
    x = torch.randn(32, 100)
    x_recon, z = model(x)
    errors = model.reconstruction_error(x)
    print(f"  Input: {x.shape}, Latent: {z.shape}, Output: {x_recon.shape}")
    print(f"  Reconstruction errors: {errors.shape}, mean: {errors.mean():.4f}")

    # Conv autoencoder for images
    print("\nConv Autoencoder:")
    model = ConvAutoencoder(in_channels=1, latent_dim=64)
    x = torch.randn(16, 1, 28, 28)
    x_recon, z = model(x)
    errors = model.reconstruction_error(x)
    print(f"  Input: {x.shape}, Latent: {z.shape}, Output: {x_recon.shape}")
    print(f"  Reconstruction errors: {errors.shape}, mean: {errors.mean():.4f}")
```

Key principles for autoencoder architecture in anomaly detection: match the encoder to the data type (dense layers for tabular data, convolutions for images, recurrent layers for sequences); keep the encoder and decoder roughly symmetric; constrain the bottleneck so the network cannot learn the identity map; choose an output activation that matches the data range (e.g., sigmoid for images scaled to [0, 1]); and expose a per-sample reconstruction_error method so downstream scoring is uniform across architectures.
Training autoencoders for anomaly detection requires careful attention to prevent the model from learning to reconstruct anomalies. The goal is a model that generalizes well to unseen normal data while still producing noticeably higher reconstruction error on anomalies.
Training Data Requirements: the training set should consist of normal samples only (or be only lightly contaminated), with a held-out split of normal data reserved for validation and threshold calibration.
Key Training Considerations: use early stopping on validation reconstruction loss, apply weight decay and gradient clipping for stability, reduce the learning rate when the validation loss plateaus, and provide enough capacity to fit normal data without memorizing rare patterns.
Denoising Autoencoders (DAE):
A particularly effective variant for anomaly detection. The training objective changes:
$$\min_\theta \; \mathbb{E}_{x \sim p_{\text{data}}} \left[ \| x - g_\theta(f_\theta(\tilde{x})) \|^2 \right]$$
where x̃ is a corrupted version of x (e.g., with added Gaussian noise or masked features).
Why DAE works for anomaly detection: the model must map corrupted inputs back onto the normal data manifold rather than simply copy them, which strengthens the learned representation of normal structure and rules out a trivial identity mapping; anomalous inputs, which never lay on that manifold, are "denoised" toward normal patterns and therefore reconstruct poorly.
Corruption strategies: additive Gaussian noise (used in the trainer below), random feature masking (zeroing a subset of inputs), and, for images, random patch occlusion; a sketch of the first two follows.
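The following sketch (illustrative, not taken from a library) implements the two simplest corruption functions as standalone helpers; the noise level and masking probability are assumptions to be tuned per dataset.

```python
import torch

def gaussian_corrupt(x, noise_factor=0.2):
    """Additive Gaussian noise: x̃ = x + noise_factor · ε, with ε ~ N(0, I)."""
    return x + noise_factor * torch.randn_like(x)

def mask_corrupt(x, mask_prob=0.2):
    """Randomly zero out a fraction of features, independently per sample."""
    keep = (torch.rand_like(x) > mask_prob).float()
    return x * keep

# During training, the loss still compares the reconstruction to the
# *clean* input: loss = MSE(decoder(encoder(corrupt(x))), x)
x = torch.randn(8, 20)
print(gaussian_corrupt(x).shape, mask_corrupt(x).shape)
```

The full training pipeline below wires Gaussian corruption into an early-stopping trainer with threshold calibration.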
```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np


class AnomalyDetectorTrainer:
    """
    Trainer for autoencoder-based anomaly detection.

    Includes:
    - Optional denoising training
    - Early stopping on validation loss
    - Threshold calibration
    """

    def __init__(
        self,
        model,
        learning_rate=1e-3,
        weight_decay=1e-5,
        denoising=True,
        noise_factor=0.2,
        early_stopping_patience=10,
        device='cuda' if torch.cuda.is_available() else 'cpu'
    ):
        self.model = model.to(device)
        self.device = device
        self.denoising = denoising
        self.noise_factor = noise_factor
        self.patience = early_stopping_patience

        self.optimizer = optim.Adam(
            model.parameters(), lr=learning_rate, weight_decay=weight_decay
        )
        self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            self.optimizer, mode='min', factor=0.5, patience=5
        )

        self.train_losses = []
        self.val_losses = []
        self.threshold = None

    def add_noise(self, x):
        """Add Gaussian noise for denoising autoencoder training."""
        noise = torch.randn_like(x) * self.noise_factor
        return x + noise

    def train_epoch(self, train_loader):
        """Train for one epoch."""
        self.model.train()
        total_loss = 0
        n_batches = 0

        for batch in train_loader:
            x = batch[0].to(self.device)

            # Optional denoising
            if self.denoising:
                x_input = self.add_noise(x)
            else:
                x_input = x

            # Forward pass
            x_recon, _ = self.model(x_input)

            # Reconstruction loss (MSE)
            loss = torch.mean((x - x_recon) ** 2)

            # Backward pass
            self.optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()

            total_loss += loss.item()
            n_batches += 1

        return total_loss / n_batches

    def validate(self, val_loader):
        """Compute validation loss."""
        self.model.eval()
        total_loss = 0
        n_batches = 0

        with torch.no_grad():
            for batch in val_loader:
                x = batch[0].to(self.device)
                x_recon, _ = self.model(x)
                loss = torch.mean((x - x_recon) ** 2)
                total_loss += loss.item()
                n_batches += 1

        return total_loss / n_batches

    def fit(self, X_train, X_val=None, epochs=100, batch_size=64):
        """
        Train the autoencoder on normal data.

        Parameters:
        -----------
        X_train : array-like
            Training data (should be mostly/all normal)
        X_val : array-like, optional
            Validation data (normal) for early stopping
        epochs : int
            Maximum number of epochs
        batch_size : int
            Batch size for training
        """
        # Prepare data loaders
        train_dataset = TensorDataset(torch.FloatTensor(X_train))
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        if X_val is not None:
            val_dataset = TensorDataset(torch.FloatTensor(X_val))
            val_loader = DataLoader(val_dataset, batch_size=batch_size)
        else:
            val_loader = None

        best_val_loss = float('inf')
        patience_counter = 0
        best_state = None

        for epoch in range(epochs):
            train_loss = self.train_epoch(train_loader)
            self.train_losses.append(train_loss)

            if val_loader:
                val_loss = self.validate(val_loader)
                self.val_losses.append(val_loss)
                self.scheduler.step(val_loss)

                # Early stopping
                if val_loss < best_val_loss:
                    best_val_loss = val_loss
                    best_state = {k: v.cpu().clone()
                                  for k, v in self.model.state_dict().items()}
                    patience_counter = 0
                else:
                    patience_counter += 1

                if epoch % 10 == 0:
                    print(f"Epoch {epoch}: train_loss={train_loss:.6f}, "
                          f"val_loss={val_loss:.6f}")

                if patience_counter >= self.patience:
                    print(f"Early stopping at epoch {epoch}")
                    break
            else:
                if epoch % 10 == 0:
                    print(f"Epoch {epoch}: train_loss={train_loss:.6f}")

        # Restore best model
        if best_state is not None:
            self.model.load_state_dict(best_state)

        # Calibrate threshold on training data
        self.calibrate_threshold(X_train)

        return self

    def calibrate_threshold(self, X_normal, percentile=95):
        """
        Set anomaly threshold based on reconstruction errors of normal data.
        Points with error above this threshold are classified as anomalies.
        """
        errors = self.compute_anomaly_scores(X_normal)
        self.threshold = np.percentile(errors, percentile)
        print(f"Threshold set at {percentile}th percentile: {self.threshold:.6f}")
        return self.threshold

    def compute_anomaly_scores(self, X):
        """Compute reconstruction error for each sample."""
        self.model.eval()
        X_tensor = torch.FloatTensor(X).to(self.device)
        with torch.no_grad():
            errors = self.model.reconstruction_error(X_tensor)
        return errors.cpu().numpy()

    def predict(self, X):
        """Predict if samples are normal (1) or anomalies (-1)."""
        if self.threshold is None:
            raise ValueError("Threshold not set. Call calibrate_threshold first.")
        scores = self.compute_anomaly_scores(X)
        return np.where(scores <= self.threshold, 1, -1)

    def decision_function(self, X):
        """Return anomaly scores (higher = more anomalous)."""
        return self.compute_anomaly_scores(X)


# Example: Training pipeline
if __name__ == "__main__":
    from sklearn.datasets import make_blobs
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import classification_report, roc_auc_score

    # Generate normal data
    X_normal, _ = make_blobs(n_samples=1000, centers=[[0, 0]],
                             cluster_std=1.0, random_state=42)
    X_normal = X_normal.astype(np.float32)

    # Split into train/val
    X_train, X_val = train_test_split(X_normal, test_size=0.2, random_state=42)

    # Generate test data with anomalies
    X_test_normal, _ = make_blobs(n_samples=100, centers=[[0, 0]],
                                  cluster_std=1.0, random_state=43)
    X_anomalies = np.random.uniform(-5, 5, size=(50, 2)).astype(np.float32)
    X_test = np.vstack([X_test_normal, X_anomalies])
    y_test = np.array([1] * 100 + [-1] * 50)

    # Create and train model (DenseAutoencoder from the architecture code above)
    from autoencoder_architectures import DenseAutoencoder

    model = DenseAutoencoder(input_dim=2, latent_dim=1, hidden_dims=[16, 8])
    trainer = AnomalyDetectorTrainer(model, denoising=True, noise_factor=0.1)
    trainer.fit(X_train, X_val, epochs=100, batch_size=32)

    # Evaluate
    predictions = trainer.predict(X_test)
    scores = trainer.decision_function(X_test)

    print("\nClassification Report:")
    print(classification_report(y_test, predictions,
                                target_names=['Anomaly', 'Normal']))
    print(f"AUROC: {roc_auc_score(y_test == -1, scores):.4f}")
```

Variational Autoencoders (VAEs) offer a probabilistic perspective on autoencoder-based anomaly detection. Instead of simply compressing and reconstructing, VAEs learn a latent probability distribution and provide principled uncertainty estimates.
VAE Architecture:
Encoder qφ(z|x): maps input x to a distribution over latent variables, typically a diagonal Gaussian N(μφ(x), σφ²(x)) parameterized by the encoder network.
Decoder pθ(x|z): maps a latent sample z to a distribution over outputs; in the implementation below this is a Gaussian whose mean is the reconstruction.
Training Objective (ELBO):
$$\mathcal{L}_{\text{ELBO}} = \mathbb{E}_{q_\phi(z|x)}[\log p_\theta(x|z)] - D_{\mathrm{KL}}\big(q_\phi(z|x) \,\|\, p(z)\big)$$
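For the standard choice of a diagonal Gaussian posterior qφ(z|x) = N(μ, diag(σ²)) and a standard normal prior p(z) = N(0, I), the KL term has a closed form, which is exactly the expression used in the loss_function of the implementation below:

$$D_{\mathrm{KL}}\big(\mathcal{N}(\mu, \operatorname{diag}(\sigma^2)) \,\|\, \mathcal{N}(0, I)\big) = -\frac{1}{2} \sum_{j=1}^{d} \left( 1 + \log \sigma_j^2 - \mu_j^2 - \sigma_j^2 \right)$$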
VAEs provide multiple anomaly signals, detailed in the scoring methods below: the reconstruction probability, the ELBO itself, and a combined reconstruction-plus-KL score.
This principled probabilistic framework often outperforms deterministic autoencoders, especially when anomalies are subtle or when uncertainty quantification is important.
Anomaly Scoring with VAE:
Several anomaly scores can be derived from a trained VAE:
1. Reconstruction Probability: $$\log p_\theta(x|z) \text{ (averaged over samples } z \sim q_\phi(z|x))$$
Lower probability = more anomalous.
2. ELBO Score: $$\mathcal{L}_{\text{ELBO}}(x) = \mathbb{E}_{q_\phi(z|x)}[\log p_\theta(x|z)] - D_{\mathrm{KL}}\big(q_\phi(z|x) \,\|\, p(z)\big)$$
Lower ELBO = input is poorly explained by the model = anomaly.
3. Reconstruction Error + KL:
A simple combination that works well in practice: $$\text{score}(x) = ||x - \hat{x}||^2 + \beta \cdot D_{KL}(q(z|x) || p(z))$$
where β weights the importance of latent deviation.
```python
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np


class VAE(nn.Module):
    """
    Variational Autoencoder for anomaly detection.

    Provides probabilistic anomaly scores based on reconstruction
    probability and KL divergence from the prior.
    """

    def __init__(self, input_dim, latent_dim=32, hidden_dims=[128, 64]):
        super().__init__()
        self.latent_dim = latent_dim

        # Encoder
        encoder_layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_dims:
            encoder_layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.ReLU(),
                nn.BatchNorm1d(hidden_dim)
            ])
            prev_dim = hidden_dim
        self.encoder = nn.Sequential(*encoder_layers)

        # Latent mean and variance
        self.fc_mu = nn.Linear(prev_dim, latent_dim)
        self.fc_logvar = nn.Linear(prev_dim, latent_dim)

        # Decoder
        decoder_layers = []
        prev_dim = latent_dim
        for hidden_dim in reversed(hidden_dims):
            decoder_layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.ReLU(),
                nn.BatchNorm1d(hidden_dim)
            ])
            prev_dim = hidden_dim
        decoder_layers.append(nn.Linear(prev_dim, input_dim))
        self.decoder = nn.Sequential(*decoder_layers)

    def encode(self, x):
        """Encode input to latent distribution parameters."""
        h = self.encoder(x)
        mu = self.fc_mu(h)
        logvar = self.fc_logvar(h)
        return mu, logvar

    def reparameterize(self, mu, logvar):
        """Reparameterization trick: z = mu + sigma * epsilon."""
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        """Decode latent representation to reconstruction."""
        return self.decoder(z)

    def forward(self, x):
        """Forward pass returning reconstruction and latent distribution params."""
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        x_recon = self.decode(z)
        return x_recon, mu, logvar, z

    def loss_function(self, x, x_recon, mu, logvar, beta=1.0):
        """
        VAE loss = Reconstruction loss + beta * KL divergence
        beta controls the weight of KL divergence (beta-VAE formulation)
        """
        # Reconstruction loss (MSE)
        recon_loss = F.mse_loss(x_recon, x, reduction='none').sum(dim=1)

        # KL divergence: D_KL(q(z|x) || p(z)) where p(z) = N(0, I)
        # = -0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)
        kl_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp(), dim=1)

        # Total loss per sample
        total_loss = recon_loss + beta * kl_loss
        return total_loss.mean(), recon_loss.mean(), kl_loss.mean()

    def anomaly_score(self, x, n_samples=10, beta=1.0):
        """
        Compute anomaly score for each input sample.
        Uses multiple latent samples for more stable estimation.
        Higher score = more anomalous.

        Parameters:
        -----------
        x : Tensor
            Input samples
        n_samples : int
            Number of latent samples for Monte Carlo estimation
        beta : float
            Weight of KL divergence in the score
        """
        self.eval()
        with torch.no_grad():
            mu, logvar = self.encode(x)

            # Monte Carlo sampling
            recon_errors = []
            for _ in range(n_samples):
                z = self.reparameterize(mu, logvar)
                x_recon = self.decode(z)
                recon_error = torch.sum((x - x_recon) ** 2, dim=1)
                recon_errors.append(recon_error)

            # Average reconstruction error
            avg_recon_error = torch.stack(recon_errors).mean(dim=0)

            # KL divergence
            kl_div = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp(), dim=1)

            # Combined score
            score = avg_recon_error + beta * kl_div

        return score

    def reconstruction_probability(self, x, n_samples=100):
        """
        Estimate the reconstruction probability by Monte Carlo sampling
        from the encoder: log of the average of p(x|z) over z ~ q(z|x).
        Lower log probability = more anomalous.
        """
        self.eval()
        with torch.no_grad():
            mu, logvar = self.encode(x)

            log_probs = []
            for _ in range(n_samples):
                z = self.reparameterize(mu, logvar)
                x_recon = self.decode(z)
                # Log probability under Gaussian decoder
                # Assuming unit variance: log p(x|z) ∝ -0.5 * ||x - x_recon||^2
                log_prob = -0.5 * torch.sum((x - x_recon) ** 2, dim=1)
                log_probs.append(log_prob)

            # Log-sum-exp for stable averaging
            log_probs = torch.stack(log_probs)
            avg_log_prob = torch.logsumexp(log_probs, dim=0) - np.log(n_samples)

        return avg_log_prob


class VAEAnomalyDetector:
    """
    Complete VAE-based anomaly detection pipeline.
    """

    def __init__(self, input_dim, latent_dim=32, hidden_dims=[128, 64],
                 beta=1.0, device='cuda' if torch.cuda.is_available() else 'cpu'):
        self.vae = VAE(input_dim, latent_dim, hidden_dims).to(device)
        self.device = device
        self.beta = beta
        self.threshold = None

    def fit(self, X_train, epochs=100, batch_size=64, learning_rate=1e-3):
        """Train the VAE on normal data."""
        from torch.utils.data import DataLoader, TensorDataset

        dataset = TensorDataset(torch.FloatTensor(X_train))
        loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

        optimizer = torch.optim.Adam(self.vae.parameters(), lr=learning_rate)

        self.vae.train()
        for epoch in range(epochs):
            total_loss = 0
            for batch in loader:
                x = batch[0].to(self.device)
                x_recon, mu, logvar, _ = self.vae(x)
                loss, recon, kl = self.vae.loss_function(x, x_recon, mu, logvar, self.beta)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                total_loss += loss.item()

            if epoch % 20 == 0:
                print(f"Epoch {epoch}: loss={total_loss / len(loader):.4f}")

        # Calibrate threshold
        self.calibrate_threshold(X_train)
        return self

    def calibrate_threshold(self, X_normal, percentile=95):
        """Set threshold based on normal data scores."""
        scores = self.decision_function(X_normal)
        self.threshold = np.percentile(scores, percentile)
        print(f"Threshold: {self.threshold:.4f}")

    def decision_function(self, X):
        """Compute anomaly scores (higher = more anomalous)."""
        X_tensor = torch.FloatTensor(X).to(self.device)
        scores = self.vae.anomaly_score(X_tensor, beta=self.beta)
        return scores.cpu().numpy()

    def predict(self, X):
        """Predict normal (1) or anomaly (-1)."""
        scores = self.decision_function(X)
        return np.where(scores <= self.threshold, 1, -1)


# Example usage
if __name__ == "__main__":
    from sklearn.datasets import make_moons
    from sklearn.metrics import roc_auc_score, classification_report

    # Generate crescent-shaped normal data
    X_normal, _ = make_moons(n_samples=500, noise=0.05, random_state=42)
    X_normal = X_normal.astype(np.float32)

    # Test data
    X_test_normal, _ = make_moons(n_samples=100, noise=0.05, random_state=43)
    X_anomalies = np.random.uniform(-2, 3, size=(50, 2)).astype(np.float32)
    X_test = np.vstack([X_test_normal, X_anomalies])
    y_test = np.array([1] * 100 + [-1] * 50)

    # Train VAE
    detector = VAEAnomalyDetector(input_dim=2, latent_dim=2,
                                  hidden_dims=[32, 16], beta=0.5)
    detector.fit(X_normal, epochs=100)

    # Evaluate
    predictions = detector.predict(X_test)
    scores = detector.decision_function(X_test)

    print("\nClassification Report:")
    print(classification_report(y_test, predictions,
                                target_names=['Anomaly', 'Normal']))
    print(f"AUROC: {roc_auc_score(y_test == -1, scores):.4f}")
```

Autoencoders produce continuous anomaly scores (reconstruction errors). Converting these to binary decisions requires setting a threshold. This is one of the most critical, and often underestimated, steps in deploying anomaly detection.
The Challenge:
Unlike supervised classification, we typically lack labeled anomalies to optimize the threshold. We must instead rely on the distribution of reconstruction errors on held-out normal data, statistical assumptions about that distribution, domain knowledge about acceptable alert volume, and the relative cost of false positives versus false negatives.
Common Threshold Selection Methods:
| Method | Approach | Pros | Cons |
|---|---|---|---|
| Percentile-based | Set threshold at p-th percentile of training errors | Simple; controls training false positive rate | Assumes training data is clean; sensitive to p choice |
| Statistical | μ + k·σ for Gaussian-assumed errors | Principled for Gaussian data | Reconstruction errors often non-Gaussian |
| Extreme Value Theory | Fit GPD to tail of error distribution | Handles heavy-tailed distributions | Requires more data; complex |
| Contamination-based | Assume ε fraction of training is anomalous | Handles contaminated training sets | Requires knowing contamination rate |
| Elbow method | Find knee in sorted error curve | Adaptive to data | Subjective; may not have clear elbow |
| Domain-driven | Set based on cost of FP vs FN | Business-aligned | Needs labeled data or expert input |
Dynamic Thresholding:
In production, static thresholds can become stale as data distributions shift. Dynamic approaches include rolling-percentile thresholds computed over a sliding window of recent scores, exponentially weighted moving statistics of the score distribution, and periodic recalibration against a fresh sample of verified-normal data; a rolling-percentile sketch follows.
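A minimal sketch of the rolling-percentile idea (the window size, percentile, and warm-up rule are illustrative assumptions, not a prescribed implementation):

```python
from collections import deque
import numpy as np

class RollingThreshold:
    """Keep the threshold at a fixed percentile of the most recent scores."""

    def __init__(self, window=5000, percentile=99.0, initial_threshold=np.inf):
        self.scores = deque(maxlen=window)
        self.percentile = percentile
        self.threshold = initial_threshold

    def update(self, score):
        """Record a new score, refresh the threshold, return True if anomalous."""
        is_anomaly = score > self.threshold
        # Only feed back scores treated as normal, so flagged anomalies
        # do not inflate the threshold over time.
        if not is_anomaly:
            self.scores.append(score)
        if len(self.scores) >= 100:  # wait for a minimal sample before adapting
            self.threshold = np.percentile(self.scores, self.percentile)
        return is_anomaly

# Usage: feed reconstruction errors as they arrive
rt = RollingThreshold(window=1000, percentile=99.0, initial_threshold=1.0)
for s in np.random.exponential(scale=0.1, size=2000):
    rt.update(s)
print(f"current threshold: {rt.threshold:.4f}")
```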
Multi-Threshold Strategies:
Instead of a single threshold, use multiple levels: a lower warning threshold whose crossings are logged and aggregated for review, and a higher critical threshold that triggers an immediate alert or automated response.
This graduated approach reduces alert fatigue while maintaining sensitivity.
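A minimal sketch of such a graduated policy (the level names and threshold values are illustrative):

```python
import numpy as np

def triage(scores, warn_threshold, critical_threshold):
    """Map anomaly scores to severity levels: 'ok', 'warn', or 'critical'."""
    scores = np.asarray(scores)
    levels = np.full(scores.shape, 'ok', dtype=object)
    levels[scores > warn_threshold] = 'warn'
    levels[scores > critical_threshold] = 'critical'
    return levels

print(triage([0.02, 0.08, 0.31], warn_threshold=0.05, critical_threshold=0.2))
# ['ok' 'warn' 'critical']
```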
A poorly chosen threshold can make an excellent model useless:
• Too low: excessive false positives → alert fatigue → ignored alerts
• Too high: missed anomalies → a false sense of security
Best practice: Start conservative (lower threshold, more FPs), then adjust based on operational feedback. It's easier to reduce alert volume than to recover from a missed critical anomaly.
We've comprehensively explored autoencoder-based anomaly detection, from fundamental principles through advanced variational methods to practical deployment considerations.
What's Next:
In the following page, we explore broader Neural Network Approaches for anomaly detection, including generative adversarial networks (GANs), deep one-class methods, and hybrid architectures that combine neural feature extraction with classical anomaly detection algorithms.
You now have a thorough understanding of autoencoder-based anomaly detection. You can design appropriate architectures, implement training pipelines with proper regularization, use VAEs for probabilistic scoring, and select thresholds for operational deployment. These skills form the foundation for modern deep learning approaches to anomaly detection.