Expanding window (also called anchored walk-forward or cumulative window) cross-validation represents the "more data is better" philosophy for time series validation. Starting from a fixed anchor point, the training window grows with each successive fold, always incorporating all available historical data up to the validation period.
This approach maximizes the training set size for later folds, producing models that benefit from the full historical record. When the underlying data-generating process is relatively stable, expanding window leverages the statistical power of larger samples to produce more precise, lower-variance predictions.
This page provides an in-depth treatment of expanding window cross-validation—its mechanics, optimal configuration, how to handle the growing training set's computational implications, and strategies for interpreting performance across folds with vastly different training sizes.
By the end of this page, you will master expanding window mechanics, understand how to handle growing training costs, learn to weight fold scores appropriately, and know when expanding window produces biased vs. reliable estimates.
The Expanding Window Algorithm:
The expanding window approach anchors the training set's start at the beginning of the time series while the end grows forward with each fold:
Mathematical Properties:
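The key quantities follow directly from the splitting rule used throughout this page: with n samples, an initial training size m, a test size t, and a gap g, fold i trains on the first m + (i-1)*t observations, and the number of folds is k = floor((n - m - g) / t). The helper below is an illustrative sketch of that arithmetic (it is not part of the reference implementation further down).

```python
# Fold arithmetic for an expanding window: with n samples, initial training
# size m, test size t, and gap g, fold i trains on [0, m + (i-1)*t) and the
# number of folds is k = floor((n - m - g) / t).
def expanding_fold_sizes(n, m, t, g=0):
    k = (n - m - g) // t                  # number of folds
    return [m + i * t for i in range(k)]  # training size per fold


sizes = expanding_fold_sizes(n=100, m=20, t=15)
print(sizes)  # → [20, 35, 50, 65, 80]
```

These are the same training sizes the generator below produces for n_samples=100, min_train_size=20, test_size=15.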
```python
import numpy as np
from typing import Generator, Tuple, List
from dataclasses import dataclass


@dataclass
class ExpandingWindowConfig:
    """Configuration for expanding window CV."""
    min_train_size: int
    test_size: int
    max_train_size: int = None  # Optional cap on training size
    gap: int = 0

    def get_fold_info(self, n_samples: int) -> List[dict]:
        """Calculate all fold configurations."""
        folds = []
        train_end = self.min_train_size
        while train_end + self.gap + self.test_size <= n_samples:
            # Apply max training size cap if specified
            train_start = 0
            if self.max_train_size and train_end > self.max_train_size:
                train_start = train_end - self.max_train_size
            test_start = train_end + self.gap
            test_end = test_start + self.test_size
            folds.append({
                'train_start': train_start,
                'train_end': train_end,
                'train_size': train_end - train_start,
                'test_start': test_start,
                'test_end': test_end
            })
            train_end += self.test_size
        return folds


def expanding_window_splits(
    n_samples: int,
    min_train_size: int,
    test_size: int,
    gap: int = 0,
    max_train_size: int = None
) -> Generator[Tuple[np.ndarray, np.ndarray], None, None]:
    """
    Generate expanding window cross-validation splits.

    Parameters
    ----------
    n_samples : int
        Total number of observations
    min_train_size : int
        Size of training set for first fold
    test_size : int
        Size of each validation set
    gap : int
        Embargo period between train and test
    max_train_size : int, optional
        Cap training size (creates a hybrid sliding/expanding approach)

    Yields
    ------
    train_indices, test_indices : tuple of np.ndarray
    """
    indices = np.arange(n_samples)
    train_end = min_train_size

    while train_end + gap + test_size <= n_samples:
        # Standard expanding: start from 0
        train_start = 0
        # Optional: cap training size (hybrid approach)
        if max_train_size and (train_end - train_start) > max_train_size:
            train_start = train_end - max_train_size

        test_start = train_end + gap
        test_end = test_start + test_size

        yield (
            indices[train_start:train_end],
            indices[test_start:test_end]
        )

        train_end += test_size


def visualize_expanding_window(n_samples: int = 100):
    """Visualize expanding window progression."""
    min_train = 20
    test_size = 15

    print("EXPANDING WINDOW VISUALIZATION")
    print("=" * 60)
    print("Training set [###] grows each fold while test [***] slides")
    print("=" * 60)

    for fold, (train, test) in enumerate(
        expanding_window_splits(n_samples, min_train, test_size), 1
    ):
        ratio = len(train) / min_train
        bar = ['.'] * 50
        scale = n_samples / 50
        for i in train:
            bar[int(i / scale)] = '#'
        for i in test:
            bar[int(i / scale)] = '*'
        print(f"Fold {fold}: {''.join(bar)}")
        print(f"  Train size: {len(train)} ({ratio:.1f}x initial)")
        print()


visualize_expanding_window()

# Output:
# EXPANDING WINDOW VISUALIZATION
# ============================================================
# Fold 1: ##########*******...................................
#   Train size: 20 (1.0x initial)
#
# Fold 2: #################*******............................
#   Train size: 35 (1.8x initial)
#
# Fold 3: ########################*******.....................
#   Train size: 50 (2.5x initial)
# ...
```

The expanding window's key characteristic—training set growth—creates both opportunities and challenges that practitioners must understand and manage.
Statistical Implications:
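A quick simulation makes the statistical side concrete: under a stable (stationary) data-generating process, estimates computed from larger training windows fluctuate less. For the simplest possible estimator, a window mean, the standard deviation shrinks roughly as 1/sqrt(n). The snippet below is an illustrative sketch of that effect, not tied to any particular model.

```python
import numpy as np

# Simulated stationary series: same distribution at every time step
rng = np.random.default_rng(0)
series = rng.normal(loc=0.5, scale=1.0, size=100_000)

for n in (10, 100, 1000):
    # Mean estimated over non-overlapping windows of size n
    window_means = series.reshape(-1, n).mean(axis=1)
    # std across windows shrinks roughly as 1/sqrt(n)
    print(n, round(float(window_means.std()), 4))
```

This is the mechanism behind the "more data is better" argument: when the process is stable, later (larger) folds train models whose parameter estimates are systematically less noisy.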
Computational Implications:
| Algorithm | Training Complexity | Impact of Expanding Window |
|---|---|---|
| Linear Regression (OLS) | O(np² + p³) | Moderate growth with n |
| Random Forest | O(n log n × trees × features) | Significant growth |
| Gradient Boosting | O(n × trees × features) | Significant growth |
| Neural Networks | O(n × epochs × parameters) | Can be substantial |
| k-Nearest Neighbors | O(1) train, O(n) predict | Prediction cost grows |
With 10 folds where training size doubles from first to last fold, the final fold can take 2-4x longer than the first (depending on algorithm). Budget total CV time accordingly—it's not simply n_folds × time_per_fold.
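The 2-4x figure can be sanity-checked with back-of-envelope arithmetic: when the final training set is twice the size of the first, per-fold cost grows by the complexity ratio from the table above. `slowdown` below is a hypothetical helper for this check, not part of the estimator that follows.

```python
import math


def slowdown(n_first: int, n_last: int, complexity: str) -> float:
    """Relative per-fold training cost of the last fold vs the first."""
    if complexity == "linear":
        return n_last / n_first
    if complexity == "nlogn":
        return (n_last * math.log(n_last)) / (n_first * math.log(n_first))
    if complexity == "quadratic":
        return (n_last / n_first) ** 2
    raise ValueError(f"unknown complexity: {complexity}")


print(round(slowdown(1000, 2000, "linear"), 2))     # → 2.0
print(round(slowdown(1000, 2000, "nlogn"), 2))      # → 2.2
print(round(slowdown(1000, 2000, "quadratic"), 2))  # → 4.0
```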
```python
import numpy as np


def estimate_expanding_window_runtime(
    n_samples: int,
    min_train_size: int,
    test_size: int,
    time_per_sample: float,  # Training time per sample (empirically measured)
    algorithm_complexity: str = "linear"  # "linear", "nlogn", "quadratic"
) -> dict:
    """
    Estimate total runtime for expanding window CV.

    Parameters
    ----------
    n_samples : int
        Total samples in dataset
    min_train_size : int
        Initial training size
    test_size : int
        Size of each test fold
    time_per_sample : float
        Empirically measured training time per sample (seconds),
        measured at the reference size BASE_N
    algorithm_complexity : str
        How training time scales with n

    Returns
    -------
    dict with total time estimate and per-fold breakdown
    """
    BASE_N = 1000  # Reference size at which time_per_sample was measured

    folds = []
    train_end = min_train_size
    while train_end + test_size <= n_samples:
        folds.append(train_end)
        train_end += test_size

    def complexity_factor(n: int) -> float:
        """Total-work multiplier relative to a BASE_N-sample fit."""
        if algorithm_complexity == "nlogn":
            return (n * np.log(n)) / (BASE_N * np.log(BASE_N))
        elif algorithm_complexity == "quadratic":
            return (n / BASE_N) ** 2
        else:  # "linear" and fallback
            return n / BASE_N

    # Estimate time for each fold: baseline cost at BASE_N samples,
    # scaled by how total work grows with the fold's training size
    fold_times = [
        time_per_sample * BASE_N * complexity_factor(train_size)
        for train_size in folds
    ]

    total_time = sum(fold_times)

    # Compare to what uniform cost (every fold as cheap as the first) would be
    uniform_time = len(folds) * fold_times[0]
    overhead_ratio = total_time / uniform_time

    return {
        'n_folds': len(folds),
        'training_sizes': folds,
        'fold_times': fold_times,
        'total_time_seconds': total_time,
        'first_fold_time': fold_times[0],
        'last_fold_time': fold_times[-1],
        'overhead_ratio': overhead_ratio,
        'slowdown_factor': fold_times[-1] / fold_times[0]
    }


# Example estimation
result = estimate_expanding_window_runtime(
    n_samples=10000,
    min_train_size=1000,
    test_size=500,
    time_per_sample=0.001,  # 1 ms per sample at the reference size
    algorithm_complexity="nlogn"  # e.g., Random Forest
)

print("Expanding Window Runtime Estimate")
print("=" * 50)
print(f"Number of folds: {result['n_folds']}")
print(f"First fold time: {result['first_fold_time']:.2f}s")
print(f"Last fold time: {result['last_fold_time']:.2f}s")
print(f"Total time: {result['total_time_seconds']:.2f}s")
print(f"Slowdown (last/first): {result['slowdown_factor']:.2f}x")
print(f"Overhead vs uniform: {result['overhead_ratio']:.2f}x")
```

In standard k-fold CV, all folds have identically sized training sets, making fold scores approximately exchangeable (each equally reliable). In expanding window CV, fold scores are not exchangeable—they come from models with vastly different training sizes and validate on different time periods.
Why This Matters:
Solutions:
```python
import numpy as np
from typing import List
from scipy import stats


def aggregate_expanding_window_scores(
    fold_scores: List[float],
    train_sizes: List[int]
) -> dict:
    """
    Aggregate fold scores accounting for non-exchangeability.

    Computes several aggregation methods side by side:

    - "uniform": standard averaging (baseline)
    - "linear": weight proportional to training size
    - "inverse_variance": weight by 1/variance estimate
    - "final_only": only use the last k folds
    - "extrapolated": learning-curve extrapolation

    Parameters
    ----------
    fold_scores : List[float]
        Validation score for each fold
    train_sizes : List[int]
        Training set size for each fold

    Returns
    -------
    dict with aggregated statistics per method
    """
    scores = np.array(fold_scores)
    sizes = np.array(train_sizes)
    n_folds = len(scores)

    results = {}

    # Uniform (standard) averaging
    results['uniform'] = {
        'mean': np.mean(scores),
        'std': np.std(scores),
        'se': np.std(scores) / np.sqrt(n_folds)
    }

    # Linear weighting by training size
    w_linear = sizes / sizes.sum()
    results['linear'] = {
        'mean': np.sum(w_linear * scores),
        'std': np.sqrt(np.sum(w_linear * (scores - np.sum(w_linear * scores)) ** 2)),
    }

    # Inverse variance weighting (variance ~ 1/n approximation)
    # Larger training sets should have lower variance; under this
    # approximation the weights coincide with linear size weighting
    estimated_variance = 1.0 / sizes
    w_iv = (1 / estimated_variance) / np.sum(1 / estimated_variance)
    results['inverse_variance'] = {
        'mean': np.sum(w_iv * scores),
        'std': np.sqrt(np.sum(w_iv * (scores - np.sum(w_iv * scores)) ** 2)),
    }

    # Final folds only (e.g., last 50%)
    cutoff = n_folds // 2
    results['final_only'] = {
        'mean': np.mean(scores[cutoff:]),
        'std': np.std(scores[cutoff:]),
        'n_folds_used': n_folds - cutoff
    }

    # Learning curve extrapolation
    # Fit: score = a + b / sqrt(n)
    # This assumes performance improves with more data
    sqrt_inv = 1 / np.sqrt(sizes)
    slope, intercept, r_value, p_value, std_err = stats.linregress(sqrt_inv, scores)

    # Extrapolate to "infinite" data (intercept) or to the next fold's size
    max_train_size = sizes[-1] + (sizes[-1] - sizes[-2])  # Approximate next step
    extrapolated = intercept + slope / np.sqrt(max_train_size)
    results['extrapolated'] = {
        'asymptotic_performance': intercept,
        'extrapolated_to_next_fold': extrapolated,
        'learning_curve_r2': r_value ** 2,
        'improvement_slope': -slope  # Positive when scores improve with more data
    }

    return results


def diagnose_expanding_window_cv(
    fold_scores: List[float],
    train_sizes: List[int]
) -> dict:
    """
    Diagnose potential issues with expanding window CV results.
    """
    scores = np.array(fold_scores)
    sizes = np.array(train_sizes)

    # Check for trend in scores
    time_corr, time_p = stats.pearsonr(np.arange(len(scores)), scores)
    size_corr, size_p = stats.pearsonr(sizes, scores)

    # Check for variance reduction with training size
    # Split into early and late folds
    mid = len(scores) // 2
    early_var = np.var(scores[:mid])
    late_var = np.var(scores[mid:])

    issues = []

    # High correlation with training size suggests learning curve effects
    if abs(size_corr) > 0.5 and size_p < 0.1:
        direction = "improving" if size_corr > 0 else "degrading"
        issues.append(
            f"Strong correlation with training size ({size_corr:.2f}): "
            f"performance {direction} as training grows"
        )

    # Variance difference suggests non-exchangeability
    if early_var > 0 and late_var / early_var < 0.5:
        issues.append(
            "Late folds have much lower variance than early folds - "
            "standard error estimates may be inflated"
        )

    # Check for potential concept drift (degrading despite more data)
    if time_corr < -0.3 and time_p < 0.1:
        issues.append(
            "Performance degrading over time - possible concept drift. "
            "Consider sliding window instead of expanding."
        )

    return {
        'train_size_correlation': size_corr,
        'time_correlation': time_corr,
        'early_fold_variance': early_var,
        'late_fold_variance': late_var,
        'variance_ratio': late_var / early_var if early_var > 0 else np.nan,
        'issues': issues,
        'recommendation': (
            "Use weighted averaging" if issues
            else "Standard averaging is acceptable"
        )
    }


# Example usage
fold_scores = [0.65, 0.72, 0.74, 0.76, 0.78, 0.79, 0.80, 0.81]
train_sizes = [100, 150, 200, 250, 300, 350, 400, 450]

aggregation = aggregate_expanding_window_scores(fold_scores, train_sizes)
diagnosis = diagnose_expanding_window_cv(fold_scores, train_sizes)

print("Aggregation Methods Comparison")
print("=" * 50)
for method, result in aggregation.items():  # not named 'stats': avoid shadowing scipy.stats
    if 'mean' in result:
        print(f"{method}: {result['mean']:.4f}")

print("Diagnosis:")
print(f"Train size correlation: {diagnosis['train_size_correlation']:.2f}")
for issue in diagnosis['issues']:
    print(f"  ⚠ {issue}")
```

Capped expanding window limits the maximum training size while maintaining the "start from beginning" philosophy of expanding window. This hybrid approach captures the benefits of both methods:
Configuration: Set max_train_size to cap the training window. Once the cap is reached, the training window slides forward (becoming equivalent to a sliding window).
When to Use Capped Expanding:
```python
import numpy as np
from typing import Generator, Tuple


def capped_expanding_splits(
    n_samples: int,
    min_train_size: int,
    max_train_size: int,
    test_size: int,
    gap: int = 0
) -> Generator[Tuple[np.ndarray, np.ndarray], None, None]:
    """
    Generate capped expanding window splits.

    Grows training set up to max_train_size, then slides.

    Parameters
    ----------
    n_samples : int
        Total observations
    min_train_size : int
        Initial training size
    max_train_size : int
        Maximum training size (cap)
    test_size : int
        Validation set size
    gap : int
        Embargo period

    Yields
    ------
    train_indices, test_indices
    """
    indices = np.arange(n_samples)
    train_end = min_train_size
    train_start = 0

    while train_end + gap + test_size <= n_samples:
        # Check if we've hit the cap
        current_size = train_end - train_start
        if current_size >= max_train_size:
            # Slide forward to maintain max size
            train_start = train_end - max_train_size

        test_start = train_end + gap
        test_end = test_start + test_size

        yield (
            indices[train_start:train_end],
            indices[test_start:test_end]
        )

        train_end += test_size


def visualize_capped_expanding(n_samples: int = 120):
    """Visualize the transition from expanding to sliding."""
    min_train = 20
    max_train = 50
    test_size = 10

    print("CAPPED EXPANDING WINDOW")
    print("=" * 60)
    print(f"Min train: {min_train}, Max train: {max_train}, Test: {test_size}")
    print("Grows until cap, then slides")
    print("=" * 60)

    for fold, (train, test) in enumerate(
        capped_expanding_splits(
            n_samples, min_train, max_train, test_size
        ), 1
    ):
        # The window is "sliding" once its start has moved past the anchor
        phase = "SLIDING" if train[0] > 0 else "EXPANDING"
        bar = ['.'] * 60
        scale = n_samples / 60
        for i in train:
            bar[min(int(i / scale), 59)] = '#'
        for i in test:
            bar[min(int(i / scale), 59)] = '*'
        print(f"Fold {fold:2d} [{phase:9s}]: {''.join(bar)}")
        print(f"          Train[{train[0]:3d}:{train[-1]+1:3d}] = {len(train)} samples")
        if fold > 8:
            print("...")
            break


visualize_capped_expanding()

# Output:
# CAPPED EXPANDING WINDOW
# ============================================================
# Min train: 20, Max train: 50, Test: 10
# Grows until cap, then slides
# ============================================================
# Fold  1 [EXPANDING]: ##########*****.............................
#           Train[  0: 20] = 20 samples
# Fold  2 [EXPANDING]: ###############*****........................
#           Train[  0: 30] = 30 samples
# Fold  3 [EXPANDING]: ####################*****...................
#           Train[  0: 40] = 40 samples
# Fold  4 [EXPANDING]: #########################*****..............
#           Train[  0: 50] = 50 samples
# Fold  5 [SLIDING  ]: .....#########################*****.........
#           Train[ 10: 60] = 50 samples  (hit the cap - now sliding)
# Fold  6 [SLIDING  ]: ..........#########################*****....
#           Train[ 20: 70] = 50 samples
```

Expanding window naturally produces a learning curve—a plot of model performance against training set size. This curve is invaluable for understanding whether more data would help and for extrapolating expected performance at deployment.
Key Learning Curve Patterns:
1. Monotonically Improving (Healthy): Performance improves steadily with more data, suggesting:
2. Plateau (Saturated): Performance levels off despite more data, suggesting:
3. Initial Improvement Then Degradation (Concept Drift): Performance peaks at some training size then degrades, suggesting:
4. High Variance (Unstable): Large fluctuations across folds, suggesting:
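As a rough way to automate this reading, the sketch below labels a fold-score sequence with one of the four patterns above using simple heuristics. The thresholds are illustrative assumptions, not established defaults, and `classify_learning_curve` is a hypothetical helper rather than part of the fitting code that follows.

```python
import numpy as np


def classify_learning_curve(scores, tol=0.005):
    """Heuristically label a sequence of expanding-window fold scores."""
    s = np.asarray(scores, dtype=float)
    step = np.diff(s)
    # Pattern 4: fold-to-fold steps keep flipping sign by a meaningful amount
    flips = int(np.sum(np.diff(np.sign(step)) != 0))
    if flips >= len(step) - 1 and np.std(step) > tol:
        return "high_variance"
    # Pattern 3: peak well before the final fold, then sustained loss
    peak = int(np.argmax(s))
    if peak <= len(s) - 3 and s[-1] < s[peak] - tol:
        return "concept_drift"
    # Pattern 2: negligible gain over the back half despite more data
    if s[-1] - s[len(s) // 2] < tol:
        return "plateau"
    # Pattern 1: otherwise, still improving with more data
    return "improving"


print(classify_learning_curve([0.70, 0.74, 0.77, 0.80, 0.83, 0.86]))  # → improving
print(classify_learning_curve([0.70, 0.78, 0.82, 0.80, 0.77, 0.75]))  # → concept_drift
```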
```python
import numpy as np
from typing import Dict, List
from scipy.optimize import curve_fit
import matplotlib.pyplot as plt


def fit_learning_curve(
    train_sizes: List[int],
    scores: List[float]
) -> Dict:
    """
    Fit parametric learning curve models to expanding window results.

    Common models:
    - Power law:    score = a - b * n^(-c)
    - Inverse sqrt: score = a - b / sqrt(n)
    - Logarithmic:  score = a + b * log(n)
    """
    sizes = np.array(train_sizes)
    scores = np.array(scores)

    results = {}

    # Model 1: Inverse square root (common for many ML algorithms)
    # score = a - b / sqrt(n)
    try:
        def inv_sqrt(n, a, b):
            return a - b / np.sqrt(n)

        popt, pcov = curve_fit(
            inv_sqrt, sizes, scores,
            p0=[np.max(scores), 1.0],
            bounds=([0, 0], [2, 100])
        )
        predictions = inv_sqrt(sizes, *popt)
        residuals = scores - predictions
        r_squared = 1 - np.sum(residuals**2) / np.sum((scores - np.mean(scores))**2)

        results['inverse_sqrt'] = {
            'asymptotic_score': popt[0],
            'learning_rate': popt[1],
            'r_squared': r_squared,
            # Bind popt as a default argument: a bare closure would see the
            # variable after it is rebound by the power-law fit below
            'predict': lambda n, p=tuple(popt): inv_sqrt(n, *p)
        }
    except RuntimeError:
        results['inverse_sqrt'] = {'error': 'Failed to fit'}

    # Model 2: Power law
    # score = a - b * n^(-c)
    try:
        def power_law(n, a, b, c):
            return a - b * np.power(n, -c)

        popt, pcov = curve_fit(
            power_law, sizes, scores,
            p0=[np.max(scores), 1.0, 0.5],
            bounds=([0, 0, 0.01], [2, 100, 2.0]),
            maxfev=5000
        )
        predictions = power_law(sizes, *popt)
        residuals = scores - predictions
        r_squared = 1 - np.sum(residuals**2) / np.sum((scores - np.mean(scores))**2)

        results['power_law'] = {
            'asymptotic_score': popt[0],
            'coefficient': popt[1],
            'exponent': popt[2],
            'r_squared': r_squared,
            'predict': lambda n, p=tuple(popt): power_law(n, *p)
        }
    except RuntimeError:
        results['power_law'] = {'error': 'Failed to fit'}

    # Estimate samples needed for target performance
    def samples_for_performance(target: float, model_name: str = 'inverse_sqrt') -> int:
        """Estimate samples needed to achieve target score."""
        if model_name not in results or 'predict' not in results[model_name]:
            return np.nan

        model = results[model_name]
        asymptote = model['asymptotic_score']

        if target > asymptote:
            return np.inf  # Target exceeds theoretical maximum

        # Binary search for sample size
        lo, hi = sizes[0], sizes[-1] * 100
        while hi - lo > 1:
            mid = (lo + hi) // 2
            if model['predict'](mid) >= target:
                hi = mid
            else:
                lo = mid
        return int(hi)

    results['samples_for_performance'] = samples_for_performance
    return results


def plot_learning_curve_with_extrapolation(
    train_sizes: List[int],
    scores: List[float],
    extrapolate_to: int = None
):
    """Visualize learning curve with fitted model and extrapolation."""
    analysis = fit_learning_curve(train_sizes, scores)

    plt.figure(figsize=(12, 5))

    # Plot 1: Raw learning curve
    plt.subplot(1, 2, 1)
    plt.scatter(train_sizes, scores, s=100, color='blue', label='CV Fold Scores')
    plt.xlabel('Training Set Size')
    plt.ylabel('Score')
    plt.title('Expanding Window Learning Curve')
    plt.grid(True, alpha=0.3)

    # Add fitted curve
    if 'inverse_sqrt' in analysis and 'predict' in analysis['inverse_sqrt']:
        model = analysis['inverse_sqrt']
        x_smooth = np.linspace(train_sizes[0], train_sizes[-1], 100)
        y_smooth = [model['predict'](x) for x in x_smooth]
        plt.plot(x_smooth, y_smooth, 'r--',
                 label=f"Fit (R²={model['r_squared']:.3f})")
        # Asymptote
        plt.axhline(model['asymptotic_score'], color='green', linestyle=':',
                    label=f"Asymptote: {model['asymptotic_score']:.3f}")
    plt.legend()

    # Plot 2: Extrapolation
    plt.subplot(1, 2, 2)
    extrap_to = extrapolate_to or train_sizes[-1] * 2

    if 'inverse_sqrt' in analysis and 'predict' in analysis['inverse_sqrt']:
        model = analysis['inverse_sqrt']
        x_extrap = np.linspace(train_sizes[0], extrap_to, 100)
        y_extrap = [model['predict'](x) for x in x_extrap]
        plt.plot(x_extrap, y_extrap, 'b-', label='Extrapolated')
        plt.scatter(train_sizes, scores, s=100, color='blue', zorder=5)
        plt.axvline(train_sizes[-1], color='red', linestyle='--',
                    label='Current data limit')
        plt.axhline(model['asymptotic_score'], color='green', linestyle=':')
        # Mark extrapolated prediction
        extrap_score = model['predict'](extrap_to)
        plt.scatter([extrap_to], [extrap_score], s=150, color='red',
                    marker='*', zorder=5, label=f'Predicted: {extrap_score:.3f}')

    plt.xlabel('Training Set Size')
    plt.ylabel('Score')
    plt.title(f'Extrapolation to n={extrap_to}')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()


# Example
train_sizes = [100, 200, 300, 400, 500, 600, 700, 800]
scores = [0.72, 0.78, 0.81, 0.83, 0.845, 0.855, 0.86, 0.865]

analysis = fit_learning_curve(train_sizes, scores)
print(f"Asymptotic performance: {analysis['inverse_sqrt']['asymptotic_score']:.3f}")
print(f"R² of fit: {analysis['inverse_sqrt']['r_squared']:.3f}")

# How much data for a 0.875 score?
samples_needed = analysis['samples_for_performance'](0.875, 'inverse_sqrt')
print(f"Estimated samples for 0.875 score: {samples_needed}")
```

Based on extensive practical experience, here are the essential best practices for expanding window cross-validation:
Expanding window cross-validation maximizes training data usage, producing powerful models when data is stationary and more information consistently helps.
The next page introduces embargo periods—the gap between training and test sets that prevents subtle forms of temporal leakage. This is particularly critical for financial time series and high-frequency data where even small amounts of look-ahead can dramatically inflate performance estimates.