Embargo periods prevent feature leakage by creating a gap between training and test sets. However, embargo alone cannot address label leakage—the situation where training observation labels contain information about test period outcomes.
Purging is the complementary technique that removes training observations whose labels temporally overlap with the test period. Together, embargo and purging form a comprehensive defense against temporal leakage, essential for obtaining reliable performance estimates in financial time series, event prediction, and any application where labels look forward in time.
This page provides a rigorous treatment of purging: when it's needed, how to implement it correctly, how it interacts with embargo, and the data efficiency costs you should expect.
By the end of this page, you will understand the fundamental difference between embargo and purging, master the purging algorithm, implement combined embargo-purging CV, and know how to balance leakage prevention against data efficiency.
Label leakage occurs when the target variable (label) for a training observation contains information that overlaps with the test period. This is distinct from feature leakage—it's the label, not the features, that creates the contamination.
Concrete Example: Multi-Period Returns
Consider predicting 30-day stock returns:
With the test period beginning at day 200, the label for day 185 spans days 186-215 and therefore includes days 200-215—inside the test period. If this training observation is included, the model trains on information from the test period.
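The overlap is easy to check numerically. A minimal sketch of the example above (the `test_start` of day 200 is assumed for illustration):

```python
# Forward-looking label with a 30-day horizon: the observation at day t
# is labeled with returns over days [t+1, t+30].
obs_day = 185
horizon = 30
test_start = 200  # assumed first day of the test period

label_start = obs_day + 1      # day 186
label_end = obs_day + horizon  # day 215

# The label leaks whenever it ends on or after the first test day.
leaks = label_end >= test_start
print(label_start, label_end, leaks)  # 186 215 True
```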
When Label Leakage Occurs: whenever the label aggregates information over a forward-looking window—multi-period returns, time-to-event outcomes, or any target computed from future observations.

Embargo vs. Purging:
| Aspect | Embargo | Purging |
|---|---|---|
| Target | Feature leakage | Label leakage |
| Mechanism | Exclude fixed gap of observations | Remove specific training obs based on label overlap |
| What's removed | All observations in gap period | Training obs whose labels touch test period |
| Affected data | Gap observations (neither train nor test) | Training observations only |
| When needed | Rolling features, autocorrelation | Multi-period targets, forward-looking labels |
Embargo without purging fails whenever embargo < label_horizon. If your labels look 30 days ahead but the embargo is only 5 days, the last 25 training observations—those whose 30-day labels cross into the test period—still leak test information through their labels. Purging removes exactly these observations, dynamically.
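As a quick sanity check of this condition (a sketch; `embargo_is_sufficient` is an illustrative helper, not part of the implementations later on this page):

```python
def embargo_is_sufficient(embargo: int, label_horizon: int) -> bool:
    """Embargo alone blocks label leakage only if the gap spans the full label horizon."""
    return embargo >= label_horizon


print(embargo_is_sufficient(5, 30))   # False -> purging (or a 30-day embargo) is required
print(embargo_is_sufficient(30, 30))  # True  -> embargo alone suffices, at a heavy data cost
```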
Purging dynamically identifies and removes training observations whose labels temporally overlap with the test period. The algorithm requires knowing each observation's label span—the time range that its target variable encompasses.
Formal Definition:
For a training observation at time t with label spanning [t_start, t_end], purge it if the span overlaps the test period [T_start, T_end]—that is, if t_end >= T_start and t_start <= T_end.
In practice, for forward-looking labels (most common), t_start = t+1 and t_end = t + label_horizon.
```python
import numpy as np
from typing import Tuple
from dataclasses import dataclass


@dataclass
class LabelSpec:
    """Specification of how labels are constructed."""
    horizon: int                # How far ahead the label looks
    aggregation: str = "point"  # "point", "span", or "cumulative"

    def get_label_end(self, obs_time: int) -> int:
        """Get the last time point included in this observation's label."""
        # All current aggregation modes end at the same point.
        return obs_time + self.horizon


def compute_purge_mask(
    train_indices: np.ndarray,
    test_indices: np.ndarray,
    label_horizon: int
) -> Tuple[np.ndarray, dict]:
    """
    Compute which training observations should be purged.

    Parameters
    ----------
    train_indices : np.ndarray
        Array of training set time indices
    test_indices : np.ndarray
        Array of test set time indices
    label_horizon : int
        How many periods ahead the label/target looks

    Returns
    -------
    should_keep : np.ndarray (bool)
        True for observations to KEEP, False for observations to PURGE
    purge_info : dict
        Statistics about what was purged
    """
    test_start = test_indices.min()

    # For each training observation, compute when its label ends
    label_ends = train_indices + label_horizon

    # Purge if the label end reaches the test period (label_end >= test_start)
    should_keep = label_ends < test_start

    n_purged = int(np.sum(~should_keep))
    purge_info = {
        'n_original': len(train_indices),
        'n_purged': n_purged,
        'n_remaining': int(np.sum(should_keep)),
        'purge_fraction': n_purged / len(train_indices),
        'purge_start': train_indices[~should_keep].min() if n_purged > 0 else None,
        'purge_end': train_indices[~should_keep].max() if n_purged > 0 else None,
    }

    return should_keep, purge_info


def purging_cv_splits(
    n_samples: int,
    n_splits: int,
    min_train_size: int,
    test_size: int,
    label_horizon: int,
    embargo: int = 0
):
    """
    Generate CV splits with both embargo AND purging.

    Parameters
    ----------
    n_samples : int
        Total observations
    n_splits : int
        Number of folds
    min_train_size : int
        Initial training size (before purging)
    test_size : int
        Test set size
    label_horizon : int
        Forward look of labels (for purging)
    embargo : int
        Additional gap between train and test (for feature leakage)

    Yields
    ------
    train_indices, test_indices, purge_info
    """
    indices = np.arange(n_samples)

    for split in range(n_splits):
        # Define test set
        test_start = min_train_size + split * test_size + embargo
        test_end = test_start + test_size
        if test_end > n_samples:
            break

        # Initial training set (before purging)
        train_end = min_train_size + split * test_size
        initial_train = indices[:train_end]
        test_indices = indices[test_start:test_end]

        # Apply purging
        keep_mask, purge_info = compute_purge_mask(
            initial_train, test_indices, label_horizon
        )
        purged_train = initial_train[keep_mask]

        # Update purge info with fold details
        purge_info['fold'] = split + 1
        purge_info['embargo'] = embargo
        purge_info['effective_gap'] = (
            embargo + (train_end - purged_train.max() - 1)
            if len(purged_train) > 0 else embargo
        )

        yield purged_train, test_indices, purge_info


def visualize_purging(n_samples: int = 120, label_horizon: int = 15):
    """Visualize the effect of purging on training sets."""
    print(f"PURGING VISUALIZATION (label horizon = {label_horizon})")
    print("=" * 70)
    print("Legend: [###] Kept training | [xxx] Purged | [   ] Embargo | [***] Test")
    print("=" * 70)

    min_train = 30
    test_size = 20
    embargo = 5

    for train, test, info in purging_cv_splits(
        n_samples, n_splits=4, min_train_size=min_train,
        test_size=test_size, label_horizon=label_horizon, embargo=embargo
    ):
        bar = ['.'] * 60
        scale = n_samples / 60

        # Mark kept training
        for i in train:
            bar[int(i / scale)] = '#'

        # Mark purged (from the last kept index up to the original train end)
        train_end_original = min_train + (info['fold'] - 1) * test_size
        for i in range(train.max() + 1 if len(train) > 0 else 0, train_end_original):
            idx = int(i / scale)
            if idx < 60:
                bar[idx] = 'x'

        # Mark embargo
        for i in range(train_end_original, test[0]):
            idx = int(i / scale)
            if idx < 60:
                bar[idx] = ' '

        # Mark test
        for i in test:
            bar[int(i / scale)] = '*'

        print(f"Fold {info['fold']}: |{''.join(bar)}|")
        print(f"  Kept: {info['n_remaining']}, Purged: {info['n_purged']} "
              f"({info['purge_fraction']:.1%})")
        print(f"  Effective gap: {info['effective_gap']} observations")
        print()


visualize_purging()

# Output example:
# PURGING VISUALIZATION (label horizon = 15)
# ======================================================================
# Legend: [###] Kept training | [xxx] Purged | [   ] Embargo | [***] Test
# ======================================================================
# Fold 1: |##########xxxxx  ***********................................|
#   Kept: 20, Purged: 10 (33.3%)
#   Effective gap: 15 observations
```

In production time series CV, embargo and purging work together to create a comprehensive leakage defense. Understanding their interaction is crucial for proper implementation.
The Combined Defense:
Embargo creates a gap between training end and test start, preventing feature leakage from rolling windows and autocorrelated features.
Purging removes training observations whose labels overlap the test period, preventing label leakage from forward-looking targets.
Implementation Order: Apply embargo first (defines test boundaries), then purge (removes training obs based on those boundaries).
```python
import numpy as np
from typing import Generator, Tuple, Dict
from dataclasses import dataclass


@dataclass
class CombinedCVConfig:
    """Complete configuration for embargo + purging CV."""
    n_splits: int
    min_train_size: int
    test_size: int

    # Embargo settings
    feature_embargo: int  # For rolling feature leakage

    # Purging settings
    label_horizon: int  # How far ahead labels look

    # Optional advanced settings
    max_train_size: int = None  # Cap for hybrid sliding/expanding

    @property
    def total_effective_gap(self) -> int:
        """Maximum gap due to embargo + purging combined."""
        return self.feature_embargo + self.label_horizon


class CombinedTimeSeriesCV:
    """
    Production-ready time series CV with embargo AND purging.

    This class implements the combined defense against both
    feature leakage (via embargo) and label leakage (via purging).
    """

    def __init__(self, config: CombinedCVConfig):
        self.config = config

    def split(self, n_samples: int) -> Generator[Tuple[np.ndarray, np.ndarray, Dict], None, None]:
        """
        Generate train/test splits with embargo and purging applied.

        Yields
        ------
        train_indices : np.ndarray
            Training set indices (after purging)
        test_indices : np.ndarray
            Test set indices
        metadata : dict
            Information about what was purged and the effective gap
        """
        cfg = self.config
        indices = np.arange(n_samples)

        for split in range(cfg.n_splits):
            # Step 1: Determine test set boundaries (accounting for embargo)
            nominal_train_end = cfg.min_train_size + split * cfg.test_size
            test_start = nominal_train_end + cfg.feature_embargo
            test_end = test_start + cfg.test_size
            if test_end > n_samples:
                break

            # Step 2: Get initial training set
            train_start = 0
            if cfg.max_train_size and nominal_train_end > cfg.max_train_size:
                train_start = nominal_train_end - cfg.max_train_size

            initial_train = indices[train_start:nominal_train_end]
            test_indices = indices[test_start:test_end]

            # Step 3: Apply purging based on label horizon
            # Label for observation t covers [t+1, t+label_horizon];
            # purge if t + label_horizon >= test_start,
            # i.e. keep if t < test_start - label_horizon
            purge_cutoff = test_start - cfg.label_horizon
            keep_mask = initial_train < purge_cutoff
            purged_train = initial_train[keep_mask]

            # Step 4: Compile metadata
            n_purged = len(initial_train) - len(purged_train)
            metadata = {
                'fold': split + 1,
                'initial_train_size': len(initial_train),
                'final_train_size': len(purged_train),
                'test_size': len(test_indices),
                'n_purged': n_purged,
                'purge_fraction': n_purged / len(initial_train) if len(initial_train) > 0 else 0,
                'feature_embargo': cfg.feature_embargo,
                'label_horizon': cfg.label_horizon,
                'effective_gap': (test_start - (purged_train.max() + 1))
                                 if len(purged_train) > 0 else cfg.feature_embargo,
                'train_range': (purged_train.min(), purged_train.max())
                               if len(purged_train) > 0 else (None, None),
                'test_range': (test_indices.min(), test_indices.max()),
            }

            yield purged_train, test_indices, metadata

    def describe(self, n_samples: int) -> str:
        """Generate a human-readable description of the CV setup."""
        folds = list(self.split(n_samples))

        lines = [
            "Combined Embargo + Purging Cross-Validation",
            "=" * 50,
            f"Total samples: {n_samples}",
            f"Number of folds: {len(folds)}",
            f"Feature embargo: {self.config.feature_embargo}",
            f"Label horizon (purge): {self.config.label_horizon}",
            "",
            "Fold Details:",
        ]

        for train, test, meta in folds:
            lines.append(
                f"  Fold {meta['fold']}: Train={meta['final_train_size']} "
                f"(purged {meta['n_purged']}), "
                f"Test={meta['test_size']}, "
                f"Gap={meta['effective_gap']}"
            )

        # Summary statistics
        avg_purge = np.mean([m['purge_fraction'] for _, _, m in folds])
        total_purged = sum(m['n_purged'] for _, _, m in folds)
        lines.extend([
            "",
            f"Average purge rate: {avg_purge:.1%}",
            f"Total observations purged: {total_purged}",
        ])

        return "\n".join(lines)


# Example usage
config = CombinedCVConfig(
    n_splits=5,
    min_train_size=200,
    test_size=50,
    feature_embargo=10,  # 10-period rolling features
    label_horizon=20,    # Predicting 20 periods ahead
)

cv = CombinedTimeSeriesCV(config)
print(cv.describe(n_samples=500))

# Sample output:
# Combined Embargo + Purging Cross-Validation
# ==================================================
# Total samples: 500
# Number of folds: 5
# Feature embargo: 10
# Label horizon (purge): 20
#
# Fold Details:
#   Fold 1: Train=190 (purged 10), Test=50, Gap=20
#   Fold 2: Train=240 (purged 10), Test=50, Gap=20
#   ...
#
# Average purge rate: 3.5%
# Total observations purged: 50
```

Embargo and purging both reduce the amount of data available for training and validation. Understanding this tradeoff is essential for making informed decisions about your CV strategy.
Sources of Data Loss: embargo gaps (observations used in neither train nor test for a given fold), purged training observations, and the unused tail of the series.
Quantifying the Cost:
```python
import numpy as np
from typing import Dict


def analyze_cv_data_efficiency(
    n_samples: int,
    n_splits: int,
    min_train_size: int,
    test_size: int,
    feature_embargo: int,
    label_horizon: int,
    expanding: bool = True
) -> Dict:
    """
    Comprehensive analysis of data efficiency with embargo + purging.

    Returns metrics on data utilization and waste.
    """
    total_observations = n_samples

    # Simulate the CV
    fold_stats = []
    observations_in_train = set()
    observations_in_test = set()
    observations_purged = set()
    observations_embargoed = set()

    for split in range(n_splits):
        # Boundaries
        nominal_train_end = min_train_size + split * test_size
        test_start = nominal_train_end + feature_embargo
        test_end = test_start + test_size
        if test_end > n_samples:
            break

        # Training observations (before purging)
        train_start = 0 if expanding else max(0, nominal_train_end - min_train_size)
        initial_train = set(range(train_start, nominal_train_end))

        # Test observations
        test_obs = set(range(test_start, test_end))
        observations_in_test.update(test_obs)

        # Embargo observations
        embargo_obs = set(range(nominal_train_end, test_start))
        observations_embargoed.update(embargo_obs)

        # Purged observations
        purge_cutoff = test_start - label_horizon
        kept_train = {t for t in initial_train if t < purge_cutoff}
        purged_this_fold = initial_train - kept_train
        observations_purged.update(purged_this_fold)
        observations_in_train.update(kept_train)

        fold_stats.append({
            'fold': split + 1,
            'train_size_initial': len(initial_train),
            'train_size_final': len(kept_train),
            'purged': len(purged_this_fold),
            'test_size': len(test_obs),
            'embargo_size': len(embargo_obs),
        })

    # Observations never used (tail of data)
    all_used = observations_in_train | observations_in_test
    never_used = set(range(n_samples)) - all_used

    # Efficiency metrics
    return {
        'fold_stats': fold_stats,
        'total_folds': len(fold_stats),
        'observations': {
            'total': total_observations,
            'ever_in_train': len(observations_in_train),
            'ever_in_test': len(observations_in_test),
            'in_embargo': len(observations_embargoed),
            'purged_at_least_once': len(observations_purged),
            'never_used': len(never_used),
        },
        'efficiency': {
            'train_utilization': len(observations_in_train) / total_observations,
            'test_utilization': len(observations_in_test) / total_observations,
            'total_utilization': len(all_used) / total_observations,
            'waste_fraction': len(never_used) / total_observations,
        },
        'avg_purge_rate': np.mean([
            f['purged'] / f['train_size_initial']
            for f in fold_stats if f['train_size_initial'] > 0
        ]),
    }


def compare_efficiency_scenarios(n_samples: int = 1000) -> None:
    """Compare efficiency across different embargo/purging configurations."""
    scenarios = [
        {'name': 'No protection', 'embargo': 0, 'label_horizon': 0},
        {'name': 'Light (5/10)', 'embargo': 5, 'label_horizon': 10},
        {'name': 'Medium (10/20)', 'embargo': 10, 'label_horizon': 20},
        {'name': 'Heavy (20/40)', 'embargo': 20, 'label_horizon': 40},
    ]

    print("Data Efficiency Comparison")
    print("=" * 70)
    print(f"{'Scenario':<20} {'Folds':<8} {'Train Util':<12} {'Avg Purge':<12} {'Wasted':<10}")
    print("-" * 70)

    for scenario in scenarios:
        result = analyze_cv_data_efficiency(
            n_samples=n_samples,
            n_splits=10,
            min_train_size=200,
            test_size=50,
            feature_embargo=scenario['embargo'],
            label_horizon=scenario['label_horizon']
        )
        print(f"{scenario['name']:<20} "
              f"{result['total_folds']:<8} "
              f"{result['efficiency']['train_utilization']:<12.1%} "
              f"{result['avg_purge_rate']:<12.1%} "
              f"{result['efficiency']['waste_fraction']:<10.1%}")


compare_efficiency_scenarios()

# Sample output:
# Data Efficiency Comparison
# ======================================================================
# Scenario             Folds    Train Util   Avg Purge    Wasted
# ----------------------------------------------------------------------
# No protection        10       65.0%        0.0%         30.0%
# Light (5/10)         10       64.5%        1.3%         29.5%
# Medium (10/20)       10       64.0%        2.7%         29.0%
# Heavy (20/40)        10       63.0%        5.4%         28.0%
```

More aggressive embargo and purging reduces data efficiency but increases validity. There is no 'free' protection—you're trading training data for reliability. In domains where leakage has severe consequences (quantitative finance, medical prediction), accept the efficiency cost. In domains with little temporal dependence, lighter protection may suffice.
Beyond basic purging, sophisticated applications require advanced strategies that handle complex label structures, multiple test periods, and weighted purging.
Strategy 1: Weighted Purging
Instead of a binary purge (fully removing the observation), apply a decreasing weight based on overlap severity: an observation whose label span [t+1, t+h] only partially enters the test period keeps weight (T_start − (t+1)) / h—the clean fraction of its label.
Strategy 2: Multi-Horizon Purging
When predicting at multiple horizons simultaneously (1-day, 7-day, 30-day returns), use the maximum horizon for purging to ensure all labels are clean.
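As a quick numeric illustration (the horizons and `test_start` are assumed values; the same rule appears as `multi_horizon_purging` in the code below):

```python
label_horizons = [1, 7, 30]  # joint 1-, 7-, and 30-day targets
test_start = 500             # assumed first test index

# The 30-day label of observation t reaches t + 30, so purging must use
# the maximum horizon: keep only t < test_start - max(horizons).
purge_cutoff = test_start - max(label_horizons)
print(purge_cutoff)  # 470
```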
Strategy 3: Event-Based Purging
For event prediction, purge based on event windows rather than fixed horizons:
```python
import numpy as np
from typing import List, Tuple


def weighted_purging(
    train_indices: np.ndarray,
    test_start: int,
    label_horizon: int
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Apply weighted purging based on overlap severity.

    Instead of binary removal, assigns weights based on how much of
    each training observation's label overlaps with the test period.

    Parameters
    ----------
    train_indices : np.ndarray
        Training observation indices
    test_start : int
        First test period index
    label_horizon : int
        Number of periods in each label

    Returns
    -------
    indices : np.ndarray
        Training indices (unchanged)
    weights : np.ndarray
        Weight for each observation (0 to 1)
    """
    weights = np.ones(len(train_indices))

    for i, t in enumerate(train_indices):
        label_start = t + 1
        label_end = t + label_horizon

        if label_end < test_start:
            # No overlap - full weight
            weights[i] = 1.0
        elif label_start >= test_start:
            # Complete overlap - zero weight
            weights[i] = 0.0
        else:
            # Partial overlap - proportional weight
            clean_portion = (test_start - label_start) / label_horizon
            weights[i] = clean_portion

    return train_indices, weights


def multi_horizon_purging(
    train_indices: np.ndarray,
    test_start: int,
    label_horizons: List[int]
) -> np.ndarray:
    """
    Purge for multi-horizon prediction using the maximum horizon.

    When predicting at multiple horizons, purging must be based on the
    longest horizon to ensure all labels are clean.
    """
    max_horizon = max(label_horizons)
    purge_cutoff = test_start - max_horizon
    return train_indices[train_indices < purge_cutoff]


def event_based_purging(
    train_indices: np.ndarray,
    test_events: List[int],
    event_window: int,
    label_horizon: int
) -> np.ndarray:
    """
    Purge training observations based on event windows.

    For event prediction (e.g., earnings announcements, failures),
    purge training observations whose labels contain any test-period event.

    Parameters
    ----------
    train_indices : np.ndarray
        Training observation indices
    test_events : List[int]
        Time indices of events in the test period
    event_window : int
        Window around each event (± this many periods)
    label_horizon : int
        How far ahead labels look
    """
    if not test_events:
        return train_indices

    # For each training observation, check if its label window
    # contains any test event (including the event window)
    keep_mask = np.ones(len(train_indices), dtype=bool)

    for i, t in enumerate(train_indices):
        label_start = t + 1
        label_end = t + label_horizon

        # Check each test event
        for event_time in test_events:
            # Event window spans [event_time - window, event_time + window]
            event_start = event_time - event_window
            event_end = event_time + event_window

            # Does the label overlap with the event window?
            if label_start <= event_end and label_end >= event_start:
                keep_mask[i] = False
                break

    return train_indices[keep_mask]


class PurgeAwareCV:
    """
    Cross-validation with comprehensive purging support.

    Supports:
    - Standard fixed-horizon purging
    - Weighted purging for sample_weight-compatible models
    - Multi-horizon purging
    - Event-based purging
    """

    def __init__(
        self,
        n_splits: int,
        min_train_size: int,
        test_size: int,
        embargo: int,
        label_horizon: int = None,
        label_horizons: List[int] = None,
        purge_method: str = "binary",  # "binary" or "weighted"
        event_window: int = None
    ):
        self.n_splits = n_splits
        self.min_train_size = min_train_size
        self.test_size = test_size
        self.embargo = embargo
        self.label_horizon = label_horizon or (max(label_horizons) if label_horizons else 1)
        self.label_horizons = label_horizons
        self.purge_method = purge_method
        self.event_window = event_window

    def split(self, n_samples: int, events: List[int] = None):
        """Generate splits with appropriate purging."""
        indices = np.arange(n_samples)

        for split in range(self.n_splits):
            # Test boundaries
            train_end = self.min_train_size + split * self.test_size
            test_start = train_end + self.embargo
            test_end = test_start + self.test_size
            if test_end > n_samples:
                break

            train_indices = indices[:train_end]
            test_indices = indices[test_start:test_end]

            # Apply purging based on method
            if events and self.event_window:
                test_events = [e for e in events if test_start <= e < test_end]
                purged_train = event_based_purging(
                    train_indices, test_events,
                    self.event_window, self.label_horizon
                )
                weights = None
            elif self.purge_method == "weighted":
                purged_train, weights = weighted_purging(
                    train_indices, test_start, self.label_horizon
                )
            else:
                # Standard binary purging
                cutoff = test_start - self.label_horizon
                purged_train = train_indices[train_indices < cutoff]
                weights = None

            yield purged_train, test_indices, weights


# Example: weighted purging
train = np.arange(100)
test_start = 100

indices, weights = weighted_purging(train, test_start, label_horizon=20)

# The last 20 training observations have reduced weights
print("Weighted Purging Example:")
print(f"Last 25 observation weights: {weights[-25:]}")
# Output: [1. 1. 1. 1. 1. 0.95 0.9 0.85 0.8 0.75 0.7 0.65 0.6 0.55 0.5 ...]
```

Here's a complete, production-grade implementation combining embargo and purging with proper validation and diagnostics.
```python
import numpy as np
from sklearn.base import BaseEstimator, clone
from sklearn.metrics import mean_squared_error, r2_score
from typing import Dict, List, Callable
from dataclasses import dataclass
import warnings


@dataclass
class TimeSeriesCVResult:
    """Results from time series cross-validation."""
    scores: Dict[str, List[float]]
    fold_metadata: List[dict]
    summary: Dict[str, float]

    def __repr__(self):
        lines = ["Time Series CV Results", "=" * 40]
        for metric, values in self.scores.items():
            mean_val = np.mean(values)
            std_val = np.std(values)
            lines.append(f"{metric}: {mean_val:.4f} ± {std_val:.4f}")
        return "\n".join(lines)


class ProductionTimeSeriesCV:
    """
    Production-grade time series cross-validation with embargo and purging.

    Features:
    - Configurable embargo (feature protection)
    - Configurable purging (label protection)
    - Optional sample weighting for partial overlap
    - Comprehensive diagnostics and validation
    - Support for sklearn estimators
    """

    def __init__(
        self,
        n_splits: int = 5,
        min_train_size: int = None,
        test_size: int = None,
        feature_embargo: int = 0,
        label_horizon: int = 0,
        use_weighted_purging: bool = False,
        expanding: bool = True,
        max_train_size: int = None,
        metrics: Dict[str, Callable] = None,
        verbose: bool = True
    ):
        self.n_splits = n_splits
        self.min_train_size = min_train_size
        self.test_size = test_size
        self.feature_embargo = feature_embargo
        self.label_horizon = label_horizon
        self.use_weighted_purging = use_weighted_purging
        self.expanding = expanding
        self.max_train_size = max_train_size
        self.verbose = verbose
        self.metrics = metrics or {
            'rmse': lambda y, p: np.sqrt(mean_squared_error(y, p)),
            'r2': r2_score,
        }

    def validate_config(self, n_samples: int) -> None:
        """Validate the CV configuration against the data size."""
        min_train = self.min_train_size or n_samples // (self.n_splits + 2)
        test_sz = self.test_size or n_samples // (self.n_splits + 2)

        required = min_train + test_sz + self.feature_embargo + self.label_horizon
        if required > n_samples:
            raise ValueError(
                f"Configuration requires at least {required} samples, "
                f"but only {n_samples} available."
            )

        if self.label_horizon > 0 and self.feature_embargo == 0:
            warnings.warn(
                "Using label_horizon without feature_embargo. "
                "Consider adding feature_embargo for complete protection."
            )

    def cross_validate(
        self,
        model: BaseEstimator,
        X: np.ndarray,
        y: np.ndarray,
        fit_params: dict = None
    ) -> TimeSeriesCVResult:
        """
        Perform cross-validation with embargo and purging.

        Parameters
        ----------
        model : BaseEstimator
            Sklearn-compatible model
        X : np.ndarray
            Features (must be time-ordered)
        y : np.ndarray
            Target (must be time-ordered)
        fit_params : dict, optional
            Additional parameters for model.fit()

        Returns
        -------
        TimeSeriesCVResult with scores and metadata
        """
        n_samples = len(X)
        self.validate_config(n_samples)

        min_train = self.min_train_size or n_samples // (self.n_splits + 2)
        test_sz = self.test_size or n_samples // (self.n_splits + 2)
        fit_params = dict(fit_params or {})

        all_scores = {name: [] for name in self.metrics.keys()}
        fold_metadata = []

        for split in range(self.n_splits):
            # Test boundaries (with embargo)
            nominal_train_end = min_train + split * test_sz
            test_start = nominal_train_end + self.feature_embargo
            test_end = test_start + test_sz
            if test_end > n_samples:
                break

            # Training boundaries
            train_start = 0 if self.expanding else max(0, nominal_train_end - min_train)
            if self.max_train_size and (nominal_train_end - train_start) > self.max_train_size:
                train_start = nominal_train_end - self.max_train_size

            train_indices = np.arange(train_start, nominal_train_end)
            test_indices = np.arange(test_start, test_end)

            # Apply purging
            sample_weights = None
            if self.label_horizon > 0:
                if self.use_weighted_purging:
                    weights = np.ones(len(train_indices))
                    for i, t in enumerate(train_indices):
                        label_end = t + self.label_horizon
                        if label_end >= test_start:
                            overlap = (label_end - test_start + 1) / self.label_horizon
                            weights[i] = max(0, 1 - overlap)
                    # Keep samples with weight > 0
                    keep_mask = weights > 0
                    train_indices = train_indices[keep_mask]
                    sample_weights = weights[keep_mask]
                else:
                    # Binary purging
                    purge_cutoff = test_start - self.label_horizon
                    train_indices = train_indices[train_indices < purge_cutoff]

            # Prepare data
            X_train = X[train_indices]
            y_train = y[train_indices]
            X_test = X[test_indices]
            y_test = y[test_indices]

            # Train model
            fold_model = clone(model)
            if sample_weights is not None and 'sample_weight' in fit_params:
                warnings.warn("Overwriting sample_weight with purging weights")
                fit_params.pop('sample_weight')
            if sample_weights is not None:
                try:
                    fold_model.fit(X_train, y_train, sample_weight=sample_weights, **fit_params)
                except TypeError:
                    # Model doesn't support sample_weight
                    fold_model.fit(X_train, y_train, **fit_params)
            else:
                fold_model.fit(X_train, y_train, **fit_params)

            # Predict and score
            predictions = fold_model.predict(X_test)
            fold_scores = {}
            for name, metric_fn in self.metrics.items():
                score = metric_fn(y_test, predictions)
                all_scores[name].append(score)
                fold_scores[name] = score

            # Metadata
            fold_metadata.append({
                'fold': split + 1,
                'train_size': len(train_indices),
                'test_size': len(test_indices),
                'purged': nominal_train_end - train_start - len(train_indices),
                'effective_gap': test_start - (train_indices.max()
                                               if len(train_indices) > 0 else nominal_train_end),
                **fold_scores,
            })

            if self.verbose:
                scores_str = ", ".join(f"{k}={v:.4f}" for k, v in fold_scores.items())
                print(f"Fold {split + 1}: train={len(train_indices)}, "
                      f"test={len(test_indices)}, {scores_str}")

        # Summary
        summary = {}
        for name, values in all_scores.items():
            summary[f'{name}_mean'] = np.mean(values)
            summary[f'{name}_std'] = np.std(values)
        summary['n_folds'] = len(fold_metadata)
        summary['avg_train_size'] = np.mean([m['train_size'] for m in fold_metadata])
        summary['total_purged'] = sum(m['purged'] for m in fold_metadata)

        return TimeSeriesCVResult(
            scores=all_scores,
            fold_metadata=fold_metadata,
            summary=summary,
        )


# Example usage
if __name__ == "__main__":
    from sklearn.linear_model import Ridge

    # Generate a time series with autocorrelation
    np.random.seed(42)
    n = 500
    X = np.random.randn(n, 5)
    y = np.zeros(n)
    for t in range(1, n):
        y[t] = 0.7 * y[t-1] + X[t] @ [0.5, 0.3, 0.2, 0.1, 0.05] + np.random.randn() * 0.3

    # Cross-validate with proper protection
    cv = ProductionTimeSeriesCV(
        n_splits=5,
        min_train_size=100,
        test_size=50,
        feature_embargo=10,
        label_horizon=20,
        verbose=True
    )
    results = cv.cross_validate(Ridge(alpha=1.0), X, y)
    print(results)
```

Purging and embargo together form a comprehensive defense against temporal leakage, enabling reliable performance estimation for sophisticated time series applications.
You've now mastered time series cross-validation: forward chaining, sliding and expanding windows, embargo periods, and purging. These techniques form the foundation for reliable performance estimation on temporal data. Apply them rigorously to avoid the common trap of inflated backtest performance that fails in production.