Your model shows 99% accuracy in cross-validation. Excited, you deploy it. In production, it barely beats random guessing. What happened?
Data leakage—information from the test set contaminating the training process—is the most insidious bug in machine learning. Unlike a crash or error message, leakage silently inflates your metrics, giving you false confidence until deployment exposes the truth.
The damage isn't limited to wasted deployment effort: leakage can invalidate months of experimental conclusions, drive business decisions with numbers that were never real, and erode trust in every metric your team reports.
This page provides a comprehensive treatment of data leakage: what it is, how to detect it, and—most importantly—how to build pipelines that are immune to it by construction.
Leakage is not a rare edge case. In a 2019 survey of Kaggle competitions, over 40% of top solutions were later found to contain some form of leakage. In enterprise ML, the rate is likely higher due to less scrutiny. Assume leakage is present until you've proven otherwise.
Data leakage manifests in several distinct forms. Understanding this taxonomy helps you identify and prevent each type.
Type 1: Target Leakage
The training features contain information that is only available after the target is known—information that wouldn't be available at prediction time.
Examples:
- A `late_payment_count` feature when predicting loan default: late payments are recorded only after the default behavior has begun.
- A `treatment_prescribed` flag when predicting a diagnosis: the treatment is chosen after the diagnosis is made.
Type 2: Train-Test Contamination
Test set information influences the training process, either directly (samples leak) or indirectly (statistics leak).
Examples:
- Scaling or normalizing with statistics computed on the full dataset before splitting.
- Selecting features using all labels, then cross-validating on the same data.
- Duplicate records landing in both the training and test sets.
Type 3: Temporal Leakage
In time-series or temporal data, using future information to predict the past.
Examples:
- Randomly shuffling time-series data before splitting, so training folds contain observations from after the test period.
- Computing rolling averages or lag features over the full series, so they incorporate future values.
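The fix for temporal leakage is a split that respects time. A minimal sketch using scikit-learn's `TimeSeriesSplit` (the tiny `X` array here is a stand-in for real ordered data):

```python
import numpy as np
from sklearn.model_selection import TimeSeriesSplit

# 10 observations in temporal order
X = np.arange(10).reshape(-1, 1)

tscv = TimeSeriesSplit(n_splits=3)
for train_idx, test_idx in tscv.split(X):
    # Every training index precedes every test index,
    # so no future information leaks into training.
    assert train_idx.max() < test_idx.min()
    print("train:", train_idx, "test:", test_idx)
```

Each training window ends strictly before its test window begins, so lag and rolling features computed per fold can never see the future.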
Type 4: Group Leakage
Samples from the same group appear in both training and test sets, allowing the model to memorize group-specific patterns.
Examples:
- Multiple scans from the same patient split across train and test: the model recognizes the patient, not the pathology.
- Transactions from the same customer appearing on both sides of the split.
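The standard remedy is a group-aware splitter. A minimal sketch with scikit-learn's `GroupKFold` (the patient-style data here is invented for illustration):

```python
import numpy as np
from sklearn.model_selection import GroupKFold

# Hypothetical data: 8 samples from 4 patients, two samples each
X = np.arange(16).reshape(8, 2)
y = np.array([0, 0, 1, 1, 0, 1, 0, 1])
groups = np.array([1, 1, 2, 2, 3, 3, 4, 4])

gkf = GroupKFold(n_splits=4)
for train_idx, test_idx in gkf.split(X, y, groups):
    # No group ever appears on both sides of a split
    assert not set(groups[train_idx]) & set(groups[test_idx])
    print("test groups:", sorted(set(groups[test_idx])))
```

Because every sample from a given patient lands entirely in train or entirely in test, the model cannot score well by memorizing patient-specific patterns.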
| Type | Root Cause | Detection | Prevention |
|---|---|---|---|
| Target leakage | Features derived from target | Feature correlation analysis | Domain knowledge review |
| Train-test contamination | Pre-split preprocessing | Compare train/test distributions | Pipeline discipline |
| Temporal leakage | Shuffled time data | Check temporal ordering | Time-based splits |
| Group leakage | Split ignores groups | Verify group separation | GroupKFold, LOGO |
Target leakage is usually the most severe (perfect predictions possible), followed by group leakage (familiar patterns in test), then train-test contamination (subtle performance inflation). Temporal leakage severity depends on how much future information is used.
Train-test contamination is the most common form of leakage in cross-validation pipelines. It occurs when preprocessing, feature engineering, or model selection uses information from the test set.
The Core Principle
Every statistic used in transformation must be computed only on training data for that fold. The test set must be treated as if it doesn't exist during training.
Common Contamination Patterns
Pattern 1: Pre-Split Scaling
```python
# WRONG: Leakage! Mean and std are computed over the full dataset
X_scaled = (X - X.mean()) / X.std()
for train, test in cv.split(X):
    model.fit(X_scaled[train], y[train])
    model.score(X_scaled[test], y[test])

# RIGHT: No leakage (statistics computed on the training fold only)
for train, test in cv.split(X):
    mean, std = X[train].mean(axis=0), X[train].std(axis=0)
    X_train_scaled = (X[train] - mean) / std
    X_test_scaled = (X[test] - mean) / std
    model.fit(X_train_scaled, y[train])
    model.score(X_test_scaled, y[test])
```
Pattern 2: Pre-Split Feature Selection
```python
# WRONG: Leakage! Feature selection sees the full dataset, test folds included
selected_features = SelectKBest(k=10).fit(X, y).get_support()
X_selected = X[:, selected_features]
cross_val_score(model, X_selected, y, cv=5)

# RIGHT: No leakage (selection refit on each fold's training data)
pipeline = Pipeline([
    ('select', SelectKBest(k=10)),
    ('model', model)
])
cross_val_score(pipeline, X, y, cv=5)
```
Pattern 3: Pre-Split Target Encoding
```python
# WRONG: Leakage! Category means are computed from all targets, test included
for cat_col in categorical_columns:
    means = df.groupby(cat_col)[target].mean()
    df[f'{cat_col}_encoded'] = df[cat_col].map(means)
cv_score = cross_val_score(model, df[features], df[target])

# RIGHT: No leakage (target encoding within each fold)
# See implementation below
```
```python
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.model_selection import StratifiedKFold
from typing import Dict


class LeakFreeTargetEncoder(BaseEstimator, TransformerMixin):
    """
    Target encoder that prevents leakage by computing statistics
    only on training data.
    """

    def __init__(self, columns: list = None, smoothing: float = 1.0,
                 min_samples: int = 5):
        self.columns = columns
        self.smoothing = smoothing
        self.min_samples = min_samples
        self.encoding_maps_: Dict[str, Dict] = {}
        self.global_mean_: float = 0.0

    def fit(self, X: pd.DataFrame, y: np.ndarray):
        """Learn encoding from training data only."""
        df = X.copy() if isinstance(X, pd.DataFrame) else pd.DataFrame(X)
        target = pd.Series(np.asarray(y), index=df.index)
        self.global_mean_ = target.mean()

        # Determine columns to encode
        if self.columns is None:
            self.columns = df.select_dtypes(
                include=['object', 'category']).columns.tolist()

        for col in self.columns:
            # Group statistics: sum and count of the target per category
            agg = target.groupby(df[col]).agg(['sum', 'count'])

            # Smoothed mean (Bayesian shrinkage):
            # encoded = (sum + smoothing * global_mean) / (count + smoothing)
            smoothed = ((agg['sum'] + self.smoothing * self.global_mean_)
                        / (agg['count'] + self.smoothing))

            # Apply minimum-samples threshold
            smoothed[agg['count'] < self.min_samples] = self.global_mean_
            self.encoding_maps_[col] = smoothed.to_dict()

        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Apply encoding learned from training data."""
        df = X.copy() if isinstance(X, pd.DataFrame) else pd.DataFrame(X)
        for col in self.columns:
            encoding_map = self.encoding_maps_.get(col, {})
            df[col] = df[col].map(encoding_map).fillna(self.global_mean_)
        return df


class LeakFreeStandardScaler(BaseEstimator, TransformerMixin):
    """Standard scaler that explicitly prevents leakage."""

    def __init__(self, with_mean: bool = True, with_std: bool = True):
        self.with_mean = with_mean
        self.with_std = with_std
        self.mean_: np.ndarray = None
        self.std_: np.ndarray = None

    def fit(self, X: np.ndarray, y=None):
        X_arr = np.asarray(X, dtype=float)
        self.mean_ = (X_arr.mean(axis=0) if self.with_mean
                      else np.zeros(X_arr.shape[1]))
        self.std_ = (X_arr.std(axis=0) if self.with_std
                     else np.ones(X_arr.shape[1]))
        self.std_[self.std_ == 0] = 1  # Avoid division by zero
        return self

    def transform(self, X: np.ndarray) -> np.ndarray:
        X_arr = np.asarray(X, dtype=float)
        return (X_arr - self.mean_) / self.std_


def demonstrate_leakage_impact():
    """Demonstrate how leakage inflates performance metrics."""
    np.random.seed(42)

    # Create a dataset with a categorical feature
    n = 1000
    df = pd.DataFrame({
        'cat_feature': np.random.choice(['A', 'B', 'C', 'D', 'E'], n),
        'num_feature': np.random.randn(n),
    })
    # Target is actually random (no predictable relationship)
    df['target'] = np.random.randint(0, 2, n)

    print("=" * 60)
    print("DEMONSTRATION: Leakage Impact on Metrics")
    print("=" * 60)
    print("True signal: NONE (random labels)")

    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import cross_val_score

    # LEAKING approach: pre-compute target encoding on the full data
    df_leaky = df.copy()
    target_means = df_leaky.groupby('cat_feature')['target'].mean()
    df_leaky['cat_encoded'] = df_leaky['cat_feature'].map(target_means)

    X_leaky = df_leaky[['cat_encoded', 'num_feature']].values
    y = df['target'].values

    scores_leaky = cross_val_score(
        LogisticRegression(), X_leaky, y,
        cv=StratifiedKFold(5, shuffle=True, random_state=42)
    )
    print(f"With leakage:    {scores_leaky.mean():.3f} ± {scores_leaky.std():.3f}")

    # Proper approach: encode within each CV fold
    X_proper = df[['cat_feature', 'num_feature']].copy()
    scores_proper = []
    cv = StratifiedKFold(5, shuffle=True, random_state=42)

    for train_idx, test_idx in cv.split(X_proper, y):
        X_train = X_proper.iloc[train_idx].copy()
        X_test = X_proper.iloc[test_idx].copy()
        y_train, y_test = y[train_idx], y[test_idx]

        # Encode using ONLY training data
        train_means = (X_train.assign(target=y_train)
                       .groupby('cat_feature')['target'].mean())
        global_mean = y_train.mean()
        X_train['cat_encoded'] = X_train['cat_feature'].map(train_means).fillna(global_mean)
        X_test['cat_encoded'] = X_test['cat_feature'].map(train_means).fillna(global_mean)

        model = LogisticRegression()
        model.fit(X_train[['cat_encoded', 'num_feature']], y_train)
        scores_proper.append(model.score(X_test[['cat_encoded', 'num_feature']], y_test))

    print(f"Without leakage: {np.mean(scores_proper):.3f} ± {np.std(scores_proper):.3f}")
    print("Expected (random): ~0.500")
    print(f"Leakage inflation: {scores_leaky.mean() - 0.5:.1%}")


if __name__ == "__main__":
    demonstrate_leakage_impact()
```

Target encoding is particularly dangerous because it directly uses the target variable. Even a small leak (e.g., a global mean computed on the full data) can create spurious signal. Always use out-of-fold encoding or properly nested pipelines.
The most reliable way to prevent leakage is to build pipelines that are correct by construction. This means structuring your code so that leakage is architecturally impossible.
The Sklearn Pipeline Pattern
Scikit-learn's Pipeline automatically handles train/test separation:
```python
pipeline = Pipeline([
    ('imputer', SimpleImputer()),
    ('scaler', StandardScaler()),
    ('selector', SelectKBest(k=10)),
    ('model', RandomForestClassifier())
])

# cross_val_score handles everything correctly
scores = cross_val_score(pipeline, X, y, cv=5)
```
Each transformer's fit() is called only on training data; transform() is then applied to both train and test. This is the gold standard for leakage prevention.
When Pipelines Aren't Enough
Standard pipelines handle transformers, but some operations require custom handling: target encoding, oversampling (e.g., SMOTE), and temporal feature engineering all interact with the target or with sample ordering in ways a plain transformer can't express. For these, use custom transformers or explicit fold-by-fold processing.
```python
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import SelectFromModel
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_validate, StratifiedKFold
import numpy as np
import pandas as pd


def create_leakage_proof_pipeline(
    numeric_features: list,
    categorical_features: list,
    model=None
) -> Pipeline:
    """
    Create a production-ready, leakage-proof ML pipeline.
    All preprocessing is encapsulated for proper train/test separation.
    """
    # Numeric preprocessing
    numeric_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    # Categorical preprocessing (one-hot)
    categorical_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ])

    # Combine transformers
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features),
        ],
        remainder='drop'
    )

    if model is None:
        model = RandomForestClassifier(n_estimators=100, random_state=42)

    # Full pipeline: preprocessing -> feature selection -> model
    pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('feature_selection', SelectFromModel(
            RandomForestClassifier(n_estimators=50, random_state=42),
            threshold='median'
        )),
        ('classifier', model)
    ])
    return pipeline


def evaluate_with_diagnostics(pipeline, X, y, cv=None, groups=None) -> dict:
    """Evaluate pipeline with comprehensive leakage diagnostics."""
    if cv is None:
        cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

    scoring = {'accuracy': 'accuracy', 'f1': 'f1', 'roc_auc': 'roc_auc'}

    results = cross_validate(
        pipeline, X, y, cv=cv, groups=groups, scoring=scoring,
        return_train_score=True, return_estimator=True, n_jobs=-1
    )

    # Leakage diagnostic: check the train/test gap per metric
    train_test_gaps = {}
    for metric in scoring:
        gap = results[f'train_{metric}'].mean() - results[f'test_{metric}'].mean()
        train_test_gaps[metric] = gap

    # Leakage warning thresholds
    warnings = []
    for metric, gap in train_test_gaps.items():
        if gap > 0.15:
            warnings.append(f"Large train-test gap for {metric}: {gap:.2%}")
        if results[f'test_{metric}'].mean() > 0.95 and gap < 0.02:
            warnings.append(
                f"Suspiciously high {metric} with small gap - check for leakage!")

    return {
        'cv_results': results,
        'train_test_gaps': train_test_gaps,
        'warnings': warnings,
        'metrics': {
            metric: {
                'train_mean': results[f'train_{metric}'].mean(),
                'test_mean': results[f'test_{metric}'].mean(),
                'test_std': results[f'test_{metric}'].std()
            }
            for metric in scoring
        }
    }


class LeakageAuditor:
    """Audit a pipeline for common leakage patterns."""

    def __init__(self, pipeline: Pipeline):
        self.pipeline = pipeline
        self.findings = []

    def audit(self, X, y) -> list:
        """Run all leakage audits."""
        self.findings = []
        self._audit_pipeline_structure()
        self._audit_data_types(X)
        self._audit_target_correlation(X, y)
        return self.findings

    def _audit_pipeline_structure(self):
        """Check pipeline structure for common issues."""
        steps = dict(self.pipeline.steps)
        # Check whether preprocessing lives inside the pipeline
        if 'preprocessor' not in steps and 'scaler' not in steps:
            self.findings.append({
                'severity': 'WARNING',
                'issue': 'No preprocessing in pipeline',
                'recommendation': 'Ensure all preprocessing is inside the pipeline'
            })

    def _audit_data_types(self, X):
        """Check for column names that might indicate leakage."""
        if isinstance(X, pd.DataFrame):
            for col in X.columns:
                if 'id' in col.lower() or 'index' in col.lower():
                    self.findings.append({
                        'severity': 'WARNING',
                        'issue': f'Potential identifier column: {col}',
                        'recommendation': 'Verify this is not a data leak'
                    })
                if 'date' in col.lower() or 'time' in col.lower():
                    self.findings.append({
                        'severity': 'INFO',
                        'issue': f'Temporal column detected: {col}',
                        'recommendation': 'Ensure temporal ordering is handled correctly'
                    })

    def _audit_target_correlation(self, X, y):
        """Check for suspiciously high target correlations."""
        if isinstance(X, pd.DataFrame):
            X_numeric = X.select_dtypes(include=[np.number])
            for col in X_numeric.columns:
                corr = np.corrcoef(X_numeric[col].fillna(0), y)[0, 1]
                if abs(corr) > 0.8:
                    self.findings.append({
                        'severity': 'CRITICAL',
                        'issue': f'Very high correlation with target: {col} (r={corr:.3f})',
                        'recommendation': 'Investigate for potential target leakage'
                    })

    def print_report(self):
        """Print audit findings."""
        print("=" * 60)
        print("LEAKAGE AUDIT REPORT")
        print("=" * 60)
        if not self.findings:
            print("✓ No leakage issues detected")
            return
        icons = {'CRITICAL': '🚨', 'WARNING': '⚠️', 'INFO': 'ℹ️'}
        for finding in self.findings:
            print(f"{icons.get(finding['severity'], '•')} [{finding['severity']}]")
            print(f"  Issue: {finding['issue']}")
            print(f"  Recommendation: {finding['recommendation']}")


# Example usage
if __name__ == "__main__":
    from sklearn.datasets import make_classification

    # Create demo dataset
    X, y = make_classification(
        n_samples=1000, n_features=20, n_informative=10,
        n_redundant=5, random_state=42
    )
    df = pd.DataFrame(
        X,
        columns=[f'num_{i}' for i in range(15)] + [f'cat_{i}' for i in range(5)]
    )
    # Discretize the last five columns into categorical features
    for i in range(5):
        df[f'cat_{i}'] = pd.cut(df[f'cat_{i}'], bins=5,
                                labels=['A', 'B', 'C', 'D', 'E'])

    numeric_features = [f'num_{i}' for i in range(15)]
    categorical_features = [f'cat_{i}' for i in range(5)]

    # Create and evaluate the pipeline
    pipeline = create_leakage_proof_pipeline(
        numeric_features=numeric_features,
        categorical_features=categorical_features
    )
    results = evaluate_with_diagnostics(pipeline, df, y)

    print("Evaluation Results:")
    for metric, values in results['metrics'].items():
        print(f"  {metric}: {values['test_mean']:.3f} ± {values['test_std']:.3f} "
              f"(gap: {values['train_mean'] - values['test_mean']:.3f})")

    if results['warnings']:
        print("Warnings:")
        for warning in results['warnings']:
            print(f"  ⚠️ {warning}")

    # Audit the pipeline
    print()
    auditor = LeakageAuditor(pipeline)
    auditor.audit(df, y)
    auditor.print_report()
```

When leakage is subtle, you need systematic detection strategies beyond code review.
Strategy 1: Performance Sanity Checks
Leakage often produces suspiciously good results: near-perfect accuracy on a problem known to be hard, test scores far above published benchmarks, or implausibly high scores with almost no train-test gap.
Strategy 2: Baseline Comparison
Compare your model against baselines that shouldn't work: a majority-class predictor, a stratified random guesser, or your model trained on shuffled labels.
If your complex model is only marginally better than random on a "hard" problem, you might have leakage compensating for a bad model.
Strategy 3: Feature Importance Analysis
Examine which features drive predictions: a single feature dominating the importance scores, or high-importance features with names like `result` or `outcome`, are classic signs of target leakage.
Strategy 4: Holdout Degradation Test
The definitive test: hold out data that was truly never seen during development (never used for cross-validation, feature engineering, or hyperparameter tuning) and evaluate the final model on it exactly once. If CV performance >> holdout performance, leakage is likely.
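As a compact sketch of this test (the synthetic dataset and the informal gap threshold mentioned in the comment are illustrative assumptions):

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score, train_test_split

X, y = make_classification(n_samples=1000, n_features=20, random_state=0)

# Carve off the holdout BEFORE any development work
X_dev, X_hold, y_dev, y_hold = train_test_split(
    X, y, test_size=0.2, random_state=0, stratify=y)

model = RandomForestClassifier(random_state=0)
cv_score = cross_val_score(model, X_dev, y_dev, cv=5).mean()

# Evaluate on the untouched holdout exactly once
holdout_score = model.fit(X_dev, y_dev).score(X_hold, y_hold)

gap = cv_score - holdout_score
print(f"CV: {cv_score:.3f}  holdout: {holdout_score:.3f}  gap: {gap:.3f}")
# A large positive gap (e.g., > 0.10) suggests leakage somewhere in development.
```

The key discipline is that the holdout split happens before any modeling decision, and the holdout is scored only once; repeated peeking turns it into just another validation set.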
```python
import numpy as np
import pandas as pd
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.dummy import DummyClassifier
from typing import Dict, Any


class LeakageDetector:
    """Systematic leakage detection for ML pipelines."""

    def __init__(self, random_state: int = 42):
        self.random_state = random_state
        self.detection_results: Dict[str, Any] = {}

    def run_full_detection(self, pipeline, X, y,
                           holdout_fraction: float = 0.2) -> Dict[str, Any]:
        """Run the comprehensive leakage detection suite."""
        # Split off a true holdout FIRST
        X_dev, X_holdout, y_dev, y_holdout = train_test_split(
            X, y, test_size=holdout_fraction,
            random_state=self.random_state, stratify=y
        )

        results = {}
        # Test 1: CV performance
        results['cv_performance'] = self._test_cv_performance(pipeline, X_dev, y_dev)
        # Test 2: Holdout performance
        results['holdout_performance'] = self._test_holdout(
            pipeline, X_dev, y_dev, X_holdout, y_holdout)
        # Test 3: Baseline comparison
        results['baseline_comparison'] = self._test_baselines(X_dev, y_dev)
        # Test 4: Feature analysis
        results['feature_analysis'] = self._analyze_features(X, y)

        # Test 5: CV-holdout gap
        gap = (results['cv_performance']['mean']
               - results['holdout_performance']['score'])
        results['cv_holdout_gap'] = {
            'gap': gap,
            'suspicious': gap > 0.10,
            'severity': 'HIGH' if gap > 0.15 else 'MEDIUM' if gap > 0.10 else 'LOW'
        }

        self.detection_results = results
        return results

    def _test_cv_performance(self, pipeline, X, y) -> dict:
        """Test cross-validation performance."""
        scores = cross_val_score(pipeline, X, y, cv=5, scoring='accuracy')
        return {'mean': scores.mean(), 'std': scores.std(),
                'scores': scores.tolist()}

    def _test_holdout(self, pipeline, X_train, y_train,
                      X_holdout, y_holdout) -> dict:
        """Test on the true holdout set."""
        pipeline.fit(X_train, y_train)
        return {'score': pipeline.score(X_holdout, y_holdout)}

    def _test_baselines(self, X, y) -> dict:
        """Compare against dummy baselines."""
        baselines = {
            'majority': DummyClassifier(strategy='most_frequent'),
            'stratified': DummyClassifier(strategy='stratified'),
            'prior': DummyClassifier(strategy='prior')
        }
        return {name: cross_val_score(clf, X, y, cv=5, scoring='accuracy').mean()
                for name, clf in baselines.items()}

    def _analyze_features(self, X, y) -> dict:
        """Analyze features for leakage indicators."""
        suspicious_features = []
        if isinstance(X, pd.DataFrame):
            # Check correlation with target
            numeric_cols = X.select_dtypes(include=[np.number]).columns
            for col in numeric_cols:
                valid_mask = ~X[col].isna()
                if valid_mask.sum() > 10:
                    corr = np.corrcoef(X[col][valid_mask], y[valid_mask])[0, 1]
                    if abs(corr) > 0.7:
                        suspicious_features.append({
                            'feature': col, 'correlation': corr,
                            'reason': 'Very high target correlation'})
            # Check column names
            for col in X.columns:
                name_lower = col.lower()
                if any(s in name_lower for s in
                       ['target', 'label', 'outcome', 'result', 'future']):
                    suspicious_features.append({
                        'feature': col, 'correlation': None,
                        'reason': 'Suspicious column name'})
        return {'suspicious_features': suspicious_features,
                'n_suspicious': len(suspicious_features)}

    def print_report(self):
        """Print detection report."""
        if not self.detection_results:
            print("No detection results. Run run_full_detection() first.")
            return
        r = self.detection_results
        print("=" * 70)
        print("LEAKAGE DETECTION REPORT")
        print("=" * 70)

        print("1. CROSS-VALIDATION PERFORMANCE")
        print(f"   Accuracy: {r['cv_performance']['mean']:.3f} "
              f"± {r['cv_performance']['std']:.3f}")

        print("2. HOLDOUT PERFORMANCE")
        print(f"   Accuracy: {r['holdout_performance']['score']:.3f}")

        print("3. CV-HOLDOUT GAP")
        gap_info = r['cv_holdout_gap']
        print(f"   Gap: {gap_info['gap']:.3f}")
        print(f"   Severity: {gap_info['severity']}")
        if gap_info['suspicious']:
            print("   ⚠️ SUSPICIOUS: Large gap may indicate leakage")

        print("4. BASELINE COMPARISON")
        for name, score in r['baseline_comparison'].items():
            print(f"   {name}: {score:.3f}")
        model_vs_majority = (r['cv_performance']['mean']
                             - r['baseline_comparison']['majority'])
        if model_vs_majority < 0.05:
            print("   ⚠️ WARNING: Model barely beats majority baseline")

        print(f"5. SUSPICIOUS FEATURES: {r['feature_analysis']['n_suspicious']}")
        for feat in r['feature_analysis']['suspicious_features']:
            corr_str = (f"(r={feat['correlation']:.3f})"
                        if feat['correlation'] is not None else "")
            print(f"   - {feat['feature']} {corr_str}: {feat['reason']}")

        # Overall verdict
        print("\n" + "=" * 70)
        issues = []
        if gap_info['suspicious']:
            issues.append("Large CV-holdout gap")
        if r['feature_analysis']['n_suspicious'] > 0:
            issues.append("Suspicious features")
        if model_vs_majority < 0.05:
            issues.append("Model similar to baseline")
        if issues:
            print("⚠️ LEAKAGE WARNING: " + ", ".join(issues))
        else:
            print("✓ No obvious leakage detected")
        print("=" * 70)


# Demonstration
if __name__ == "__main__":
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import StandardScaler
    from sklearn.datasets import make_classification

    # Create a dataset with intentional leakage
    X, y = make_classification(n_samples=1000, n_features=10, random_state=42)
    df = pd.DataFrame(X, columns=[f'feat_{i}' for i in range(10)])
    # Leaky feature: the target plus a little noise
    df['leaky_feature'] = y + np.random.normal(0, 0.1, len(y))

    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('model', RandomForestClassifier(n_estimators=50, random_state=42))
    ])

    detector = LeakageDetector()
    detector.run_full_detection(pipeline, df, y)
    detector.print_report()
```

Let's examine specific scenarios where leakage commonly occurs and how to fix them.
| Scenario | The Leak | Correct Approach |
|---|---|---|
| Feature scaling | Scale on full data before CV | Include scaler in pipeline |
| Missing value imputation | Compute median on full data | Imputer in pipeline |
| Feature selection | SelectKBest before CV | Feature selection in pipeline |
| PCA/dimensionality reduction | Fit PCA on full data | PCA in pipeline |
| Target encoding | Encode using full targets | Out-of-fold or in-pipeline encoding |
| SMOTE/oversampling | Oversample before CV | Oversample only training data per fold |
| Outlier removal | Remove based on full data | Detect outliers on training only |
| Temporal features | Compute lags on shuffled data | Maintain temporal order, use time-based CV |
Scenario Deep Dive: SMOTE and Oversampling
A particularly common mistake with imbalanced data:
```python
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline as ImbPipeline
from sklearn.model_selection import cross_val_score, StratifiedKFold
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
import numpy as np

# Create an imbalanced dataset
X, y = make_classification(
    n_samples=1000, n_features=20, n_classes=2,
    weights=[0.9, 0.1], random_state=42
)
print("Class distribution:", np.bincount(y))

# WRONG: oversample BEFORE cross-validation
smote = SMOTE(random_state=42)
X_resampled, y_resampled = smote.fit_resample(X, y)

scores_leaky = cross_val_score(
    RandomForestClassifier(random_state=42),
    X_resampled, y_resampled,
    cv=StratifiedKFold(5, shuffle=True, random_state=42),
    scoring='f1'
)
print(f"With leakage (pre-SMOTE): F1 = {scores_leaky.mean():.3f} ± {scores_leaky.std():.3f}")

# RIGHT: oversample INSIDE cross-validation using imbalanced-learn's Pipeline
# Note: must use imblearn's Pipeline, not sklearn's
pipeline_proper = ImbPipeline([
    ('smote', SMOTE(random_state=42)),
    ('classifier', RandomForestClassifier(random_state=42))
])

scores_proper = cross_val_score(
    pipeline_proper, X, y,
    cv=StratifiedKFold(5, shuffle=True, random_state=42),
    scoring='f1'
)
print(f"Without leakage (in-pipeline SMOTE): F1 = {scores_proper.mean():.3f} ± {scores_proper.std():.3f}")

# The leaky version inflates F1 because synthetic minority-class samples
# (created using information from the entire dataset) appear in test sets.
```

When you SMOTE before CV, synthetic samples generated from test-set minority points appear in training. These are nearly identical to test points, causing massive overfitting. Always use imbalanced-learn's Pipeline to ensure SMOTE runs only on training data.
Before deploying any model to production, run through a final checklist: all preprocessing lives inside the pipeline; splits respect groups and temporal order; no feature could have been derived from the target; and holdout performance is consistent with cross-validation.
Automating Leakage Checks
Incorporate leakage checks into your CI/CD pipeline:
"""CI/CD leakage check script.Run this in your pipeline before model deployment.""" import sysimport jsonfrom pathlib import Path def run_leakage_ci_checks( pipeline, X_dev, y_dev, X_holdout, y_holdout, groups=None, max_cv_holdout_gap: float = 0.10, min_baseline_improvement: float = 0.05) -> dict: """ Run automated leakage checks for CI/CD. Returns dict with 'passed' boolean and 'details'. Exits with code 1 if checks fail (for CI integration). """ from sklearn.model_selection import cross_val_score from sklearn.dummy import DummyClassifier import numpy as np results = { 'passed': True, 'checks': [], 'errors': [] } # Check 1: CV Performance cv_scores = cross_val_score(pipeline, X_dev, y_dev, cv=5) cv_mean = cv_scores.mean() results['checks'].append({ 'name': 'CV Performance', 'value': cv_mean, 'status': 'OK' }) # Check 2: Holdout Performance pipeline.fit(X_dev, y_dev) holdout_score = pipeline.score(X_holdout, y_holdout) results['checks'].append({ 'name': 'Holdout Performance', 'value': holdout_score, 'status': 'OK' }) # Check 3: CV-Holdout Gap gap = cv_mean - holdout_score gap_ok = gap <= max_cv_holdout_gap results['checks'].append({ 'name': 'CV-Holdout Gap', 'value': gap, 'threshold': max_cv_holdout_gap, 'status': 'OK' if gap_ok else 'FAIL' }) if not gap_ok: results['passed'] = False results['errors'].append(f"CV-Holdout gap ({gap:.3f}) exceeds threshold ({max_cv_holdout_gap})") # Check 4: Baseline Comparison baseline_score = DummyClassifier(strategy='most_frequent').fit(X_dev, y_dev).score(X_holdout, y_holdout) improvement = holdout_score - baseline_score improvement_ok = improvement >= min_baseline_improvement results['checks'].append({ 'name': 'Baseline Improvement', 'value': improvement, 'threshold': min_baseline_improvement, 'status': 'OK' if improvement_ok else 'FAIL' }) if not improvement_ok: results['passed'] = False results['errors'].append(f"Improvement over baseline ({improvement:.3f}) below threshold ({min_baseline_improvement})") # Check 5: Group 
Integrity (if groups provided) if groups is not None: from sklearn.model_selection import GroupKFold gkf = GroupKFold(n_splits=5) group_integrity = True for train_idx, test_idx in gkf.split(X_dev, y_dev, groups[:len(y_dev)]): train_groups = set(groups[train_idx]) test_groups = set(groups[test_idx]) if train_groups & test_groups: group_integrity = False break results['checks'].append({ 'name': 'Group Integrity', 'value': group_integrity, 'status': 'OK' if group_integrity else 'FAIL' }) if not group_integrity: results['passed'] = False results['errors'].append("Group leakage detected!") return results def main(): """Main CI entry point.""" # In real CI, load model and data from artifacts # This is a demonstration from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler from sklearn.ensemble import RandomForestClassifier from sklearn.datasets import make_classification from sklearn.model_selection import train_test_split import numpy as np # Demo data X, y = make_classification(n_samples=1000, n_features=20, random_state=42) X_dev, X_holdout, y_dev, y_holdout = train_test_split( X, y, test_size=0.2, random_state=42 ) pipeline = Pipeline([ ('scaler', StandardScaler()), ('model', RandomForestClassifier(n_estimators=50, random_state=42)) ]) results = run_leakage_ci_checks( pipeline, X_dev, y_dev, X_holdout, y_holdout ) # Output results print(json.dumps(results, indent=2)) # Exit with appropriate code for CI sys.exit(0 if results['passed'] else 1) if __name__ == "__main__": main()We've comprehensively covered data leakage—the silent destroyer of ML projects. Here are the essential takeaways:
You've now mastered stratified and group cross-validation, from basic stratification through grouped stratification to comprehensive leakage prevention. These techniques form the foundation of trustworthy model evaluation—ensuring your metrics reflect true generalization performance, not optimistic artifacts of data contamination.