A user opens your e-commerce app. In the last 30 seconds, they've searched for "running shoes," clicked on a Nike product, and added it to their cart. Your recommendation system, retrained nightly on batch data, continues showing them winter boots based on last week's browsing history.
This is the freshness problem—the gap between when user behavior occurs and when it influences recommendations. In a world where user intent can shift in seconds, batch-trained models operating on stale data create fundamentally suboptimal experiences.
Real-time recommendations address this by incorporating fresh signals—recent clicks, searches, views, purchases—into recommendations within milliseconds. The difference is transformative: Netflix reports that incorporating real-time viewing context increased engagement by double digits. TikTok's success is built on recommendations that update with every scroll.
By the end of this page, you will understand the spectrum of real-time requirements from near-real-time to truly real-time, master streaming architectures for feature freshness, learn online learning techniques for model adaptation, and design systems that balance freshness with stability.
"Real-time" means different things in different contexts. Understanding the spectrum of freshness requirements helps you choose appropriate architectures.
Batch (Hours to Days)
Models are retrained periodically on accumulated data, so new behavior influences recommendations only after the next training run.
Near-Real-Time (Minutes)
Micro-batches of recent events are processed every few minutes, refreshing aggregate features without retraining.
Real-Time (Seconds)
Individual events are processed as they arrive through a streaming pipeline.
Instant (Milliseconds)
Features are computed at request time from the current request's context.
| Signal Type | Update Frequency | Technical Approach | Example |
|---|---|---|---|
| Long-term preferences | Daily | Batch ETL | Movie genre preferences |
| Recent purchases | Minutes | Near-real-time stream | Don't recommend already-bought |
| Current session clicks | Seconds | Real-time stream | Similar to just-viewed items |
| Current page context | Milliseconds | Request-time | Recommendations on product page |
| Time of day | Instant | Request-time | Morning vs evening content |
| Geographic location | Instant | Request-time | Local recommendations |
The Value of Freshness:
Fresher signals are generally more predictive of immediate user intent:
| Signal Age | Predictive Power | Why |
|---|---|---|
| Last 30 seconds | Highest | Captures current intent |
| Last session | High | Same browsing context |
| Last 24 hours | Medium | Recent but may have shifted |
| Last 30 days | Lower | Preferences may have changed |
| Historical | Baseline | Stable but not urgent |
The key insight: recent signals have high predictive value but decay rapidly. A click from 30 seconds ago is highly predictive; a click from 30 days ago is much less so.
$$\text{Signal Value}(t) \approx V_0 \cdot e^{-\lambda t}$$
Where $t$ is time since event and $\lambda$ is the decay rate (varies by signal type).
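The decay curve is easy to evaluate directly. A minimal sketch, expressing $\lambda$ through a per-signal half-life (the half-life values below are hypothetical; in practice $\lambda$ is fit per signal type from log data):

```python
import math

def signal_value(v0: float, age_seconds: float, half_life_seconds: float) -> float:
    """V(t) = V0 * exp(-lambda * t), with lambda = ln(2) / half-life."""
    lam = math.log(2) / half_life_seconds
    return v0 * math.exp(-lam * age_seconds)

# Hypothetical half-lives: a click loses half its value in ~10 minutes.
click_weight = signal_value(1.0, age_seconds=30, half_life_seconds=600)
old_click_weight = signal_value(1.0, age_seconds=30 * 86400, half_life_seconds=600)
# click_weight is ~0.97; old_click_weight is effectively 0
```

This matches the table above: a 30-second-old click retains almost all of its value, while a 30-day-old click contributes essentially nothing to the immediate-intent signal.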
The best systems combine signals at multiple timescales. Long-term preferences provide stable baseline understanding, while real-time signals capture current intent. Neither alone is sufficient—you need both the 'who they are' (historical) and 'what they want now' (real-time).
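One minimal way to realize this combination is a convex blend of a stable long-term profile vector with the fresh session embedding. A sketch, assuming both live in the same embedding space; the blend weight is an illustrative default, not a prescribed value:

```python
from typing import Optional
import numpy as np

def blended_user_vector(
    long_term: np.ndarray,
    session: Optional[np.ndarray],
    session_weight: float = 0.6,  # illustrative; tune per surface
) -> np.ndarray:
    """Convex blend of 'who they are' (long-term) and 'what they want now' (session)."""
    if session is None:
        return long_term  # cold session: fall back to history alone
    return (1.0 - session_weight) * long_term + session_weight * session
```

The fallback branch matters: early in a session there is no real-time signal yet, and the historical profile alone carries the recommendation.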
Real-time recommendations require streaming infrastructure to process events as they occur and make updated features available for serving.
Core Components:
1. Event Ingestion (Kafka, Kinesis, Pub/Sub)
2. Stream Processing (Flink, Spark Streaming, Kafka Streams)
3. Feature Store (Redis, DynamoDB, Feature Store)
4. Model Serving
Streaming Feature Patterns:
1. Session Features
Aggregate user behavior within the current session:
2. Sliding Window Features
Compute statistics over rolling time windows:
3. Real-time Embeddings
Update user embeddings based on recent activity:
```python
import json
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from collections import defaultdict

import numpy as np


@dataclass
class UserEvent:
    """Represents a user interaction event."""
    user_id: str
    event_type: str  # 'view', 'click', 'purchase', 'search'
    item_id: Optional[str]
    timestamp: datetime
    properties: Dict = field(default_factory=dict)


class StreamingFeatureProcessor:
    """
    Processes event streams to compute real-time features.

    Maintains per-user state and computes features on the fly.
    """

    def __init__(
        self,
        redis_client,
        item_embeddings: Dict[str, np.ndarray],
        session_timeout_minutes: int = 30,
        feature_ttl_seconds: int = 3600,
    ):
        self.redis = redis_client
        self.item_embeddings = item_embeddings
        self.session_timeout = timedelta(minutes=session_timeout_minutes)
        self.feature_ttl = feature_ttl_seconds

        # In-memory session state (in production, use Flink state)
        self.session_state: Dict[str, Dict] = defaultdict(dict)

    async def process_event(self, event: UserEvent):
        """
        Process a single event and update features.

        Called for each event from the stream.
        """
        user_id = event.user_id

        # Get or create session state
        session = self._get_or_create_session(user_id, event.timestamp)

        # Update session based on event type
        if event.event_type == 'view':
            self._handle_view(session, event)
        elif event.event_type == 'click':
            self._handle_click(session, event)
        elif event.event_type == 'purchase':
            self._handle_purchase(session, event)
        elif event.event_type == 'search':
            self._handle_search(session, event)

        # Compute and store updated features
        features = self._compute_features(session)
        await self._store_features(user_id, features)

    def _get_or_create_session(
        self, user_id: str, timestamp: datetime
    ) -> Dict:
        """Get existing session or create new one."""
        session = self.session_state.get(user_id)

        if session is None or self._is_session_expired(session, timestamp):
            # Start new session
            session = {
                'start_time': timestamp,
                'last_event_time': timestamp,
                'viewed_items': [],
                'clicked_items': [],
                'purchased_items': [],
                'search_queries': [],
                'categories_viewed': defaultdict(int),
                'price_sum': 0.0,
                'price_count': 0,
            }
            self.session_state[user_id] = session
        else:
            session['last_event_time'] = timestamp

        return session

    def _is_session_expired(self, session: Dict, current_time: datetime) -> bool:
        """Check if session has timed out."""
        return current_time - session['last_event_time'] > self.session_timeout

    def _handle_view(self, session: Dict, event: UserEvent):
        """Process view event."""
        if event.item_id:
            session['viewed_items'].append({
                'item_id': event.item_id,
                'timestamp': event.timestamp,
            })

            # Track category
            category = event.properties.get('category')
            if category:
                session['categories_viewed'][category] += 1

            # Track price
            price = event.properties.get('price')
            if price:
                session['price_sum'] += price
                session['price_count'] += 1

    def _handle_click(self, session: Dict, event: UserEvent):
        """Process click event."""
        if event.item_id:
            session['clicked_items'].append({
                'item_id': event.item_id,
                'timestamp': event.timestamp,
            })

    def _handle_purchase(self, session: Dict, event: UserEvent):
        """Process purchase event."""
        if event.item_id:
            session['purchased_items'].append({
                'item_id': event.item_id,
                'timestamp': event.timestamp,
            })

    def _handle_search(self, session: Dict, event: UserEvent):
        """Process search event."""
        query = event.properties.get('query')
        if query:
            session['search_queries'].append({
                'query': query,
                'timestamp': event.timestamp,
            })

    def _compute_features(self, session: Dict) -> Dict:
        """Compute feature vector from session state."""
        features = {}

        # Session statistics
        features['session_view_count'] = len(session['viewed_items'])
        features['session_click_count'] = len(session['clicked_items'])
        features['session_purchase_count'] = len(session['purchased_items'])
        features['session_search_count'] = len(session['search_queries'])

        # Click-through rate in session
        if session['viewed_items']:
            features['session_ctr'] = (
                len(session['clicked_items']) / len(session['viewed_items'])
            )
        else:
            features['session_ctr'] = 0.0

        # Average price viewed
        if session['price_count'] > 0:
            features['avg_price_viewed'] = (
                session['price_sum'] / session['price_count']
            )
        else:
            features['avg_price_viewed'] = 0.0

        # Top categories
        if session['categories_viewed']:
            sorted_cats = sorted(
                session['categories_viewed'].items(),
                key=lambda x: x[1],
                reverse=True
            )
            features['top_category'] = sorted_cats[0][0]
            features['category_count'] = len(session['categories_viewed'])
        else:
            features['top_category'] = None
            features['category_count'] = 0

        # Recent item IDs (for retrieval)
        recent_clicked = [
            item['item_id'] for item in session['clicked_items'][-10:]
        ]
        features['recent_clicked_items'] = recent_clicked

        # Compute session embedding (average of recent items)
        session_embedding = self._compute_session_embedding(session)
        features['session_embedding'] = session_embedding.tolist()

        return features

    def _compute_session_embedding(
        self,
        session: Dict,
        decay_factor: float = 0.9,
    ) -> np.ndarray:
        """
        Compute weighted average embedding of session items.

        More recent items have higher weight (exponential decay).
        """
        clicked_items = session['clicked_items']

        if not clicked_items:
            # Return zero embedding if no clicks
            return np.zeros(128)  # Assuming 128-dim embeddings

        embeddings = []
        weights = []

        for i, item in enumerate(reversed(clicked_items[-20:])):
            item_id = item['item_id']
            if item_id in self.item_embeddings:
                embeddings.append(self.item_embeddings[item_id])
                # Exponential decay by position
                weights.append(decay_factor ** i)

        if not embeddings:
            return np.zeros(128)

        # Weighted average
        embeddings = np.array(embeddings)
        weights = np.array(weights) / sum(weights)
        return np.average(embeddings, axis=0, weights=weights)

    async def _store_features(self, user_id: str, features: Dict):
        """Store computed features in Redis."""
        key = f"rt_features:{user_id}"
        await self.redis.setex(
            key,
            self.feature_ttl,
            json.dumps(features)
        )


class RealTimeFeatureService:
    """
    Service for retrieving real-time features at request time.
    """

    def __init__(self, redis_client, default_features: Dict):
        self.redis = redis_client
        self.default_features = default_features

    async def get_features(self, user_id: str) -> Dict:
        """Get real-time features for a user."""
        key = f"rt_features:{user_id}"
        cached = await self.redis.get(key)

        if cached:
            return json.loads(cached)
        return self.default_features.copy()

    async def get_session_embedding(
        self, user_id: str
    ) -> Optional[np.ndarray]:
        """Get session embedding for retrieval."""
        features = await self.get_features(user_id)
        embedding = features.get('session_embedding')

        if embedding:
            return np.array(embedding)
        return None
```

Beyond fresh features, some systems require fresh models—models that adapt to changing user behavior and item popularity in real-time. This is the domain of online learning.
Why Online Learning?
Fresh features alone are not enough when the model's own parameters go stale: item popularity shifts, new items arrive with no training history, and user tastes drift. Online learning updates the model itself as interactions stream in, instead of waiting for the next batch retrain.
Online Update Strategies:
1. Full Model Retraining
2. Incremental Updates
3. Online Gradient Updates
4. Bandit-Style Updates
| Approach | Freshness | Stability | Compute Cost | Complexity |
|---|---|---|---|---|
| Daily Batch | Low | High | Medium | Low |
| Hourly Incremental | Medium | Medium | Medium | Medium |
| Mini-batch Online | High | Medium-Low | High | High |
| Per-Event Bandits | Very High | Variable | Low | Medium |
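The per-event bandit row deserves a brief sketch, since it achieves very high freshness at low compute cost: each impression updates only a pair of counters per item, with no gradient step at all. A minimal Thompson-sampling sketch over Beta-Bernoulli arms (the item IDs and uniform priors are illustrative):

```python
import random

class BetaBandit:
    """Per-item Beta-Bernoulli bandit: one success/failure counter pair per item."""

    def __init__(self, item_ids):
        # Beta(1, 1) prior = uniform belief over each item's click-through rate
        self.alpha = {i: 1.0 for i in item_ids}
        self.beta = {i: 1.0 for i in item_ids}

    def choose(self) -> str:
        """Sample a plausible CTR for each item; recommend the argmax."""
        samples = {i: random.betavariate(self.alpha[i], self.beta[i])
                   for i in self.alpha}
        return max(samples, key=samples.get)

    def update(self, item_id: str, clicked: bool):
        """Per-event update: O(1), applied the instant feedback arrives."""
        if clicked:
            self.alpha[item_id] += 1
        else:
            self.beta[item_id] += 1
```

The "Variable" stability in the table shows up here too: with few observations the posterior is wide and recommendations fluctuate, then stabilize as evidence accumulates.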
The Stability-Freshness Trade-off:
Online learning introduces a fundamental tension: aggressive updates let the model track shifting behavior quickly but can destabilize predictions that currently work well, while conservative updates keep the model stable but let it drift out of date.
The optimal balance depends on your domain: fast-moving surfaces like news or short-video feeds reward freshness, while slower-moving catalogs can afford more conservative update schedules.
Techniques for Stable Online Learning:
```python
import time
from typing import Dict, List, Optional
from dataclasses import dataclass
from collections import deque

import numpy as np


@dataclass
class Interaction:
    """User-item interaction for online learning."""
    user_id: str
    item_id: str
    label: float  # 1.0 for positive, 0.0 for negative
    features: np.ndarray
    timestamp: float


class OnlineMatrixFactorization:
    """
    Online matrix factorization with incremental SGD updates.

    Updates user and item embeddings on each interaction.
    """

    def __init__(
        self,
        n_factors: int = 64,
        learning_rate: float = 0.01,
        regularization: float = 0.01,
        lr_decay: float = 0.999,
        min_lr: float = 0.0001,
    ):
        self.n_factors = n_factors
        self.base_lr = learning_rate
        self.regularization = regularization
        self.lr_decay = lr_decay
        self.min_lr = min_lr

        # Embeddings (lazy initialization)
        self.user_factors: Dict[str, np.ndarray] = {}
        self.item_factors: Dict[str, np.ndarray] = {}

        # Biases
        self.user_biases: Dict[str, float] = {}
        self.item_biases: Dict[str, float] = {}
        self.global_bias: float = 0.0

        # Statistics for adaptive learning
        self.update_count = 0

    def _get_or_init_user(self, user_id: str) -> np.ndarray:
        """Get user factors, initializing if new."""
        if user_id not in self.user_factors:
            # Initialize with small random values
            self.user_factors[user_id] = np.random.randn(self.n_factors) * 0.01
            self.user_biases[user_id] = 0.0
        return self.user_factors[user_id]

    def _get_or_init_item(self, item_id: str) -> np.ndarray:
        """Get item factors, initializing if new."""
        if item_id not in self.item_factors:
            self.item_factors[item_id] = np.random.randn(self.n_factors) * 0.01
            self.item_biases[item_id] = 0.0
        return self.item_factors[item_id]

    def predict(self, user_id: str, item_id: str) -> float:
        """Predict rating/score for user-item pair."""
        user_vec = self._get_or_init_user(user_id)
        item_vec = self._get_or_init_item(item_id)

        score = (
            self.global_bias
            + self.user_biases.get(user_id, 0.0)
            + self.item_biases.get(item_id, 0.0)
            + np.dot(user_vec, item_vec)
        )
        return score

    def update(self, interaction: Interaction):
        """
        Update model on single interaction using SGD.

        This is the core online learning step.
        """
        user_id = interaction.user_id
        item_id = interaction.item_id
        label = interaction.label

        # Get current embeddings
        user_vec = self._get_or_init_user(user_id)
        item_vec = self._get_or_init_item(item_id)

        # Compute prediction and error
        prediction = self.predict(user_id, item_id)
        error = label - prediction

        # Get current learning rate with decay
        current_lr = max(
            self.base_lr * (self.lr_decay ** self.update_count),
            self.min_lr
        )

        # SGD updates
        # User factors
        user_update = current_lr * (
            error * item_vec - self.regularization * user_vec
        )
        self.user_factors[user_id] = user_vec + user_update

        # Item factors
        item_update = current_lr * (
            error * user_vec - self.regularization * item_vec
        )
        self.item_factors[item_id] = item_vec + item_update

        # Biases
        self.user_biases[user_id] += current_lr * (
            error - self.regularization * self.user_biases[user_id]
        )
        self.item_biases[item_id] += current_lr * (
            error - self.regularization * self.item_biases[item_id]
        )

        self.update_count += 1

    def batch_update(self, interactions: List[Interaction]):
        """Update on a batch of interactions."""
        for interaction in interactions:
            self.update(interaction)


class ReplayBuffer:
    """
    Replay buffer for mixing historical and new data.

    Prevents catastrophic forgetting by replaying old interactions.
    """

    def __init__(
        self,
        max_size: int = 100000,
        replay_ratio: float = 0.3,  # Fraction of batch from replay
    ):
        self.buffer = deque(maxlen=max_size)
        self.replay_ratio = replay_ratio

    def add(self, interaction: Interaction):
        """Add interaction to buffer."""
        self.buffer.append(interaction)

    def sample_mixed_batch(
        self,
        new_interactions: List[Interaction],
        batch_size: int,
    ) -> List[Interaction]:
        """Create batch mixing new and replayed interactions."""
        n_replay = int(batch_size * self.replay_ratio)
        n_new = batch_size - n_replay

        # Sample from new interactions
        if len(new_interactions) >= n_new:
            indices = np.random.choice(len(new_interactions), n_new, replace=False)
            new_batch = [new_interactions[i] for i in indices]
        else:
            new_batch = new_interactions

        # Sample from replay buffer
        if len(self.buffer) >= n_replay:
            indices = np.random.choice(len(self.buffer), n_replay, replace=False)
            replay_batch = [self.buffer[i] for i in indices]
        else:
            replay_batch = list(self.buffer)

        # Add new interactions to buffer
        for interaction in new_interactions:
            self.add(interaction)

        return new_batch + replay_batch


class OnlineLearningService:
    """
    Production service for online model updates.

    Manages update scheduling, validation, and deployment.
    """

    def __init__(
        self,
        model: OnlineMatrixFactorization,
        replay_buffer: ReplayBuffer,
        update_interval_seconds: int = 300,  # 5 minutes
        validation_sample_size: int = 1000,
        min_improvement: float = 0.001,
    ):
        self.model = model
        self.replay_buffer = replay_buffer
        self.update_interval = update_interval_seconds
        self.validation_sample_size = validation_sample_size
        self.min_improvement = min_improvement

        # Pending interactions since last update
        self.pending_interactions: List[Interaction] = []
        self.last_update_time = time.time()

        # Validation holdout
        self.validation_set: List[Interaction] = []
        self.current_validation_loss = float('inf')

    def ingest_interaction(self, interaction: Interaction):
        """
        Ingest a new interaction.

        Triggers update if interval exceeded.
        """
        # Reserve some for validation
        if len(self.validation_set) < self.validation_sample_size:
            if np.random.random() < 0.1:
                self.validation_set.append(interaction)
                return

        self.pending_interactions.append(interaction)

        # Check if update needed
        if time.time() - self.last_update_time > self.update_interval:
            self._trigger_update()

    def _trigger_update(self):
        """Perform model update cycle."""
        if not self.pending_interactions:
            return

        # Create mixed batch
        batch = self.replay_buffer.sample_mixed_batch(
            self.pending_interactions,
            batch_size=min(len(self.pending_interactions), 10000)
        )

        # Store current model state for rollback (factors AND biases)
        old_user_factors = {k: v.copy() for k, v in self.model.user_factors.items()}
        old_item_factors = {k: v.copy() for k, v in self.model.item_factors.items()}
        old_user_biases = dict(self.model.user_biases)
        old_item_biases = dict(self.model.item_biases)

        # Perform updates
        self.model.batch_update(batch)

        # Validate
        new_loss = self._compute_validation_loss()

        if new_loss < self.current_validation_loss - self.min_improvement:
            # Accept update
            self.current_validation_loss = new_loss
            print(f"Model updated. New validation loss: {new_loss:.4f}")
        else:
            # Rollback
            self.model.user_factors = old_user_factors
            self.model.item_factors = old_item_factors
            self.model.user_biases = old_user_biases
            self.model.item_biases = old_item_biases
            print(
                f"Update rejected. Loss {new_loss:.4f} not better than "
                f"{self.current_validation_loss:.4f}"
            )

        # Clear pending
        self.pending_interactions = []
        self.last_update_time = time.time()

    def _compute_validation_loss(self) -> float:
        """Compute squared-error loss on validation set."""
        if not self.validation_set:
            return float('inf')

        total_error = 0.0
        for interaction in self.validation_set:
            pred = self.model.predict(interaction.user_id, interaction.item_id)
            total_error += (pred - interaction.label) ** 2

        return total_error / len(self.validation_set)
```

Some features can only be computed at request time because they depend on the current context. These request-time features provide the freshest possible signals but must be computed within tight latency budgets.
Categories of Request-Time Features:
1. Temporal Context
2. Device & Location Context
3. Page Context
4. User State
```python
import asyncio
import time
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum


class TimeOfDay(Enum):
    MORNING = "morning"      # 6am - 12pm
    AFTERNOON = "afternoon"  # 12pm - 6pm
    EVENING = "evening"      # 6pm - 10pm
    NIGHT = "night"          # 10pm - 6am


@dataclass
class RequestContext:
    """Context available at request time."""
    user_id: Optional[str]
    device_type: str  # 'mobile', 'tablet', 'desktop'
    country_code: str
    page_type: str  # 'home', 'product', 'search', 'cart'
    current_item_id: Optional[str]  # If on product page
    search_query: Optional[str]  # If on search page
    cart_items: List[str]
    session_items: List[str]  # Recently viewed in session
    timestamp: datetime


class RequestTimeFeatureExtractor:
    """
    Extract features that can only be computed at request time.

    All features here must be fast (< 5ms total).
    """

    def __init__(self, item_features_cache: Dict):
        self.item_cache = item_features_cache

    def extract(self, context: RequestContext) -> Dict:
        """Extract all request-time features."""
        features = {}

        # Temporal features
        features.update(self._temporal_features(context.timestamp))

        # Device features
        features.update(self._device_features(context))

        # Page context features
        features.update(self._page_features(context))

        # Session features
        features.update(self._session_features(context))

        return features

    def _temporal_features(self, timestamp: datetime) -> Dict:
        """Extract time-based features."""
        hour = timestamp.hour

        # Time of day
        if 6 <= hour < 12:
            tod = TimeOfDay.MORNING
        elif 12 <= hour < 18:
            tod = TimeOfDay.AFTERNOON
        elif 18 <= hour < 22:
            tod = TimeOfDay.EVENING
        else:
            tod = TimeOfDay.NIGHT

        # Day of week
        dow = timestamp.weekday()
        is_weekend = dow >= 5

        return {
            'hour_of_day': hour,
            'time_of_day': tod.value,
            'day_of_week': dow,
            'is_weekend': is_weekend,
            'is_holiday_season': self._is_holiday_season(timestamp),
        }

    def _is_holiday_season(self, timestamp: datetime) -> bool:
        """Check if in holiday season (simplified)."""
        return timestamp.month in [11, 12]  # Nov-Dec

    def _device_features(self, context: RequestContext) -> Dict:
        """Extract device-based features."""
        return {
            'device_type': context.device_type,
            'is_mobile': context.device_type == 'mobile',
            'country': context.country_code,
            # Could add more: OS, browser, screen size, etc.
        }

    def _page_features(self, context: RequestContext) -> Dict:
        """Extract page context features."""
        features = {
            'page_type': context.page_type,
            'has_search_query': context.search_query is not None,
            'has_cart_items': len(context.cart_items) > 0,
            'cart_size': len(context.cart_items),
        }

        # Current item features (if on product page)
        if context.current_item_id and context.current_item_id in self.item_cache:
            item = self.item_cache[context.current_item_id]
            features['current_item_category'] = item.get('category')
            features['current_item_price'] = item.get('price', 0)
            features['current_item_brand'] = item.get('brand')

        return features

    def _session_features(self, context: RequestContext) -> Dict:
        """Extract session-based features."""
        return {
            'session_depth': len(context.session_items),
            'has_session_history': len(context.session_items) > 0,
            'is_logged_in': context.user_id is not None,
        }


class ParallelFeatureFetcher:
    """
    Fetch features from multiple sources in parallel.

    Ensures we don't block on slow sources.
    """

    def __init__(
        self,
        feature_service,  # Redis or feature store client
        timeout_ms: float = 20,
    ):
        self.feature_service = feature_service
        self.timeout = timeout_ms / 1000

    async def fetch_all(
        self,
        user_id: str,
        candidate_items: List[str],
    ) -> Dict:
        """
        Fetch all features in parallel.

        Returns combined feature dict.
        """
        # Schedule as tasks so the fetches run concurrently;
        # awaiting bare coroutines one by one would serialize them.
        tasks = {
            'user_features': asyncio.create_task(
                self._fetch_user_features(user_id)),
            'item_features': asyncio.create_task(
                self._fetch_item_features_batch(candidate_items)),
            'real_time_features': asyncio.create_task(
                self._fetch_real_time_features(user_id)),
        }

        # Collect results, enforcing the timeout per source
        start = time.time()
        results = {}

        for name, task in tasks.items():
            try:
                results[name] = await asyncio.wait_for(task, timeout=self.timeout)
            except asyncio.TimeoutError:
                print(f"Feature fetch timeout: {name}")
                results[name] = {}  # Use empty dict as fallback

        elapsed_ms = (time.time() - start) * 1000
        print(f"Feature fetching took {elapsed_ms:.1f}ms")

        return results

    async def _fetch_user_features(self, user_id: str) -> Dict:
        """Fetch pre-computed user features."""
        return await self.feature_service.get_user_features(user_id)

    async def _fetch_item_features_batch(
        self, item_ids: List[str]
    ) -> Dict[str, Dict]:
        """Fetch item features in batch."""
        return await self.feature_service.get_item_features_batch(item_ids)

    async def _fetch_real_time_features(self, user_id: str) -> Dict:
        """Fetch streaming-computed real-time features."""
        return await self.feature_service.get_real_time_features(user_id)
```

We've explored the critical dimension of freshness in recommendation systems. Let's consolidate the key learnings.
You now understand how to incorporate fresh signals into recommendations. Next, we'll explore diversity and fairness—ensuring recommendations are not just accurate but also varied and equitable.