You've computed similarities between users or items. Now what?
The raw similarity matrix is just a tool—the real challenge is using it wisely to make predictions. Which neighbors should influence a prediction? How should their opinions be combined? How do we balance the wisdom of many weak neighbors against the insight of a few strong ones?
These questions define neighborhood methods—the algorithmic layer that transforms similarity computations into actionable recommendations. The choices here directly impact prediction accuracy, recommendation diversity, cold-start behavior, and computational efficiency.
This page provides systematic coverage of neighborhood selection strategies, aggregation schemes, and the engineering considerations that make neighborhood-based CF practical at scale.
By the end of this page, you will understand top-k vs threshold-based neighborhood selection, master various aggregation and weighting schemes, appreciate the bias-variance tradeoffs in neighborhood sizing, and know how to handle edge cases like cold start and sparse coverage.
The first decision: how do we form the neighborhood? This determines which users/items contribute to each prediction.
Strategy 1: Top-k Neighbors
Select exactly k most similar users/items:
N_k(u, i) = top_k({v : v ∈ U, v ≠ u, R[v][i] exists}, by sim(u,v))
Advantages: predictable neighborhood size and latency, and only a single parameter to tune.
Disadvantages: neighbor quality varies; when few truly similar users exist, the k-th neighbor may be only weakly related and mostly adds noise.
Strategy 2: Threshold-based Neighbors
Include all users/items with similarity above threshold τ:
N_τ(u, i) = {v : v ∈ U, v ≠ u, R[v][i] exists, sim(u,v) ≥ τ}
Advantages: every included neighbor meets a minimum quality bar, so predictions are built only from meaningful similarities.
Disadvantages: neighborhood size is unpredictable; users with mainstream tastes get large neighborhoods while unusual users may get none, which hurts coverage and makes latency inconsistent.
Strategy 3: Hybrid (Top-k above Threshold)
First filter by threshold, then take top-k:
N_hybrid(u, i) = top_k(N_τ(u, i), by sim(u,v))
Combines quality filtering with computational bounds. Generally the best approach for production systems.
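As a concrete illustration of the three strategies, here is a minimal selection sketch. The function name and the input mapping of candidate IDs to similarities are assumptions for this example, not part of any specific library.

```python
from typing import Dict, List, Tuple

def select_neighbors(
    similarities: Dict[int, float],   # candidate_id -> sim(u, candidate)
    k: int = 50,
    tau: float = 0.1,
    strategy: str = "hybrid",
) -> List[Tuple[int, float]]:
    """Illustrative neighbor selection: top-k, threshold, or hybrid."""
    ranked = sorted(similarities.items(), key=lambda x: x[1], reverse=True)

    if strategy == "top_k":
        return ranked[:k]                                  # exactly the k best
    if strategy == "threshold":
        return [(v, s) for v, s in ranked if s >= tau]     # everyone above tau
    # hybrid: filter by threshold first, then cap at k
    return [(v, s) for v, s in ranked if s >= tau][:k]

# Example: candidates with mixed similarity quality
sims = {101: 0.85, 102: 0.42, 103: 0.08, 104: 0.31, 105: -0.2}
print(select_neighbors(sims, k=3, tau=0.1, strategy="top_k"))
print(select_neighbors(sims, k=3, tau=0.1, strategy="threshold"))
print(select_neighbors(sims, k=3, tau=0.1, strategy="hybrid"))
```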
| Strategy | Parameter(s) | Coverage | Quality | Best For |
|---|---|---|---|---|
| Top-k | k (typically 20-100) | Consistent | Variable | Systems needing predictable latency |
| Threshold | τ (typically 0.1-0.5) | Variable | Guaranteed | Quality-focused applications |
| Hybrid | k and τ | Bounded | Guaranteed minimum | Production systems |
| Adaptive k | k varies per user/item | Varies | Optimized | Research/offline systems |
Start with top-k (k=50) with a minimum similarity threshold of 0.1. This provides consistent performance while filtering out noise. Tune k via cross-validation—optimal values typically range from 20-200 depending on data density.
Neighborhood size (k) is perhaps the most critical hyperparameter in neighborhood-based CF. Its effect follows the classic bias-variance tradeoff:
Small k (e.g., 5-20): low bias, since only the closest neighbors contribute, but high variance: each prediction rests on a handful of ratings and swings with noise or a single unusual neighbor.
Large k (e.g., 200+): low variance, since many ratings are averaged, but high bias: weakly similar neighbors dilute the signal and pull predictions toward the item's overall average.
The Sweet Spot:
Empirical studies consistently find optimal k in the 20-100 range for most datasets. The exact optimum depends on rating density, the number of users and items, how noisy the ratings are, and the similarity measure used.
Visualizing the Tradeoff:
```
Prediction Error (Total = Bias² + Variance + Noise)
  ↑
  | *                                     *
  |  *                                  *
  |   *                              *     variance dominates on the left,
  |     *                         *        bias dominates on the right
  |       *                    *
  |          *              *
  |             *  *  *  *
  +------+-----+-----+-----+-----+----→ k (Neighborhood Size)
         5    20    50   100   200
                   ↑
               Optimal k
```
The total error (MSE = Bias² + Variance + Noise) is minimized at moderate k values where neither component dominates.
The optimal k varies across users and items. Power users with many ratings can benefit from larger neighborhoods. Niche items need smaller k (few relevant neighbors exist). Adaptive methods that set k per prediction outperform global k, though they add complexity.
```python
import numpy as np
from typing import List, Tuple, Callable


def tune_k(
    validation_set: List[Tuple[int, int, float]],
    predict_fn: Callable[[int, int, int], float],
    k_values: List[int] = [5, 10, 20, 50, 100, 200],
) -> Tuple[int, dict]:
    """
    Find optimal k using validation set.

    Args:
        validation_set: List of (user_id, item_id, true_rating)
        predict_fn: Function(user, item, k) -> predicted_rating
        k_values: Candidate k values to evaluate

    Returns:
        (best_k, metrics_dict)
    """
    results = {}
    for k in k_values:
        errors = []
        for user_id, item_id, true_rating in validation_set:
            pred = predict_fn(user_id, item_id, k)
            errors.append((pred - true_rating) ** 2)
        rmse = np.sqrt(np.mean(errors))
        results[k] = {
            'rmse': rmse,
            'predictions': len(errors)
        }
        print(f"  k={k:3d}: RMSE = {rmse:.4f}")

    best_k = min(results.keys(), key=lambda k: results[k]['rmse'])
    return best_k, results


def adaptive_k(
    user_density: float,
    item_popularity: float,
    base_k: int = 50,
    min_k: int = 10,
    max_k: int = 200
) -> int:
    """
    Compute adaptive k based on data characteristics.

    Args:
        user_density: Fraction of items the user has rated
        item_popularity: Fraction of users who rated this item
        base_k: Starting k value

    Returns:
        Adjusted k value
    """
    # More active users → can use larger k
    user_factor = 1 + np.log1p(user_density * 100)
    # More popular items → can use larger k
    item_factor = 1 + np.log1p(item_popularity * 100)

    # Combine factors
    k = int(base_k * np.sqrt(user_factor * item_factor))
    return max(min_k, min(max_k, k))


# Example: adaptive k values
print("Adaptive k examples:")
print(f"  New user, obscure item:    k = {adaptive_k(0.001, 0.001)}")
print(f"  Active user, popular item: k = {adaptive_k(0.1, 0.3)}")
print(f"  Active user, obscure item: k = {adaptive_k(0.1, 0.001)}")
print(f"  New user, popular item:    k = {adaptive_k(0.001, 0.3)}")
```

Once neighbors are selected, how do we combine their ratings into a prediction? Several aggregation schemes exist, each with different properties.
1. Simple Average:
r̂_ui = (1 / |N(u,i)|) · Σ_{v ∈ N(u,i)} R_vi
2. Weighted Average:
r̂_ui = [Σ_{v ∈ N(u,i)} sim(u,v) · R_vi] / [Σ_{v ∈ N(u,i)} |sim(u,v)|]
3. Mean-Centered Weighted Average:
r̂_ui = r̄_u + [Σ_{v ∈ N(u,i)} sim(u,v) · (R_vi − r̄_v)] / [Σ_{v ∈ N(u,i)} |sim(u,v)|]
4. Z-score Normalized:
r̂_ui = r̄_u + σ_u · [Σ_{v ∈ N(u,i)} sim(u,v) · (R_vi − r̄_v) / σ_v] / [Σ_{v ∈ N(u,i)} |sim(u,v)|]
| Scheme | Handles Rating Bias? | Handles Scale Differences? | Complexity | Typical Use |
|---|---|---|---|---|
| Simple Average | No | No | O(k) | Binary feedback only |
| Weighted Average | No | No | O(k) | When ratings are standardized |
| Mean-Centered | Yes | Partially | O(k) | Explicit ratings (most common) |
| Z-score Normalized | Yes | Yes | O(k) | When scale variance is high |
For explicit ratings (1-5 stars), use mean-centered weighted average. It handles the most common bias (generous vs harsh raters) while being simple to implement. Switch to z-score normalization if you observe users with vastly different rating variances affecting predictions.
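To see why mean-centering matters, here is a small hand-worked comparison with made-up numbers: both neighbors give the item 4 stars, but one is a harsh rater and the other a generous one.

```python
# Two neighbors of user u for item i (illustrative numbers):
#   neighbor A: sim=0.8, rating=4.0, personal mean=2.5 (harsh rater)
#   neighbor B: sim=0.6, rating=4.0, personal mean=4.5 (generous rater)
sims    = [0.8, 0.6]
ratings = [4.0, 4.0]
means   = [2.5, 4.5]
target_mean = 3.5  # the target user's own average

# Weighted average: ignores rater bias, lands at 4.0
weighted = sum(s * r for s, r in zip(sims, ratings)) / sum(abs(s) for s in sims)

# Mean-centered: A's 4.0 is +1.5 above their norm (enthusiastic),
# B's 4.0 is -0.5 below their norm (lukewarm)
dev = sum(s * (r - m) for s, r, m in zip(sims, ratings, means)) / sum(abs(s) for s in sims)
mean_centered = target_mean + dev

print(f"Weighted average:      {weighted:.2f}")       # 4.00
print(f"Mean-centered average: {mean_centered:.2f}")  # ~4.14
```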
Beyond raw similarity weights, several refinements can improve prediction quality.
1. Case Amplification:
Amplify differences between high and low similarities:
weight(u,v) = sim(u,v)^α (α > 1)
With α=2.5, a neighbor with sim=0.8 gets weight 0.8^2.5 ≈ 0.57, while sim=0.4 gets 0.4^2.5 ≈ 0.10. This emphasizes the most similar neighbors.
Typical α values: 2.0 to 3.0
2. Significance Weighting:
Reduce weight for unreliable similarity estimates:
weight(u,v) = sim(u,v) · min(n_common, β) / β
Where n_common is co-rated items and β is a threshold (typically 20-50).
With 5 co-rated items and β=50, the similarity is reduced to 5/50 = 10% of its raw value.
3. Inverse Item Frequency (IIF):
Weight agreement on rare items more heavily:
iif(i) = log(|U| / |U_i|)
sim_iif(u,v) = the same correlation as before, computed with each co-rated item's contribution weighted by iif(i)
If everyone rates the most popular movie similarly, that agreement is less informative than agreement on a niche film.
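A minimal sketch of one way to fold IIF into a Pearson-style similarity, weighting each co-rated item's term by iif(i). The function and its inputs are illustrative assumptions; implementations vary in exactly where the weight is applied.

```python
import numpy as np
from typing import Dict

def iif_weighted_similarity(
    ratings_u: Dict[int, float],   # item_id -> rating by user u
    ratings_v: Dict[int, float],   # item_id -> rating by user v
    item_raters: Dict[int, int],   # item_id -> number of users who rated it
    n_users: int,
) -> float:
    """Pearson-style similarity with each co-rated item's term weighted by iif(i)."""
    common = set(ratings_u) & set(ratings_v)
    if len(common) < 2:
        return 0.0

    mean_u = np.mean(list(ratings_u.values()))
    mean_v = np.mean(list(ratings_v.values()))

    num = sum_u = sum_v = 0.0
    for i in common:
        iif = np.log(n_users / item_raters[i])   # rare items get larger weight
        du, dv = ratings_u[i] - mean_u, ratings_v[i] - mean_v
        num   += iif * du * dv
        sum_u += iif * du * du
        sum_v += iif * dv * dv

    denom = np.sqrt(sum_u) * np.sqrt(sum_v)
    return float(num / denom) if denom > 0 else 0.0

# Example: agreement on a niche item (rated by 5 of 10,000 users) counts more
u = {1: 5.0, 2: 3.0, 3: 4.0}
v = {1: 4.0, 2: 3.0, 3: 5.0}
raters = {1: 9000, 2: 5, 3: 4000}
print(f"IIF-weighted similarity: {iif_weighted_similarity(u, v, raters, 10_000):.3f}")
```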
4. Time Decay:
Weight recent neighbors more heavily:
weight(v, t) = sim(u,v) · exp(-λ · (t_now - t_v))
Where t_v is when neighbor v rated the item. Useful for domains where preferences evolve (e.g., fashion, music).
5. Trust/Reputation Weighting:
Weight neighbors by their prediction accuracy on past items:
trust(v) = 1 / (1 + avg_prediction_error(v))
weight(u,v) = sim(u,v) · trust(v)
Neighbors who historically made accurate predictions get more influence.
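A hedged sketch of the bookkeeping this implies, assuming we record each neighbor's past absolute errors and turn the average into a trust multiplier. Class and method names are illustrative, not from any library.

```python
from collections import defaultdict
from typing import Dict, List

class TrustTracker:
    """Track each neighbor's historical prediction error to derive a trust score."""

    def __init__(self):
        self.errors: Dict[int, List[float]] = defaultdict(list)

    def record(self, neighbor_id: int, predicted: float, actual: float) -> None:
        # Store the absolute error of this neighbor's contribution
        self.errors[neighbor_id].append(abs(predicted - actual))

    def trust(self, neighbor_id: int) -> float:
        # trust(v) = 1 / (1 + avg_prediction_error(v)); unseen neighbors get neutral trust
        errs = self.errors.get(neighbor_id)
        if not errs:
            return 1.0
        return 1.0 / (1.0 + sum(errs) / len(errs))

    def combined_weight(self, neighbor_id: int, similarity: float) -> float:
        # weight(u,v) = sim(u,v) · trust(v)
        return similarity * self.trust(neighbor_id)

# Example: neighbor 7 has been accurate, neighbor 9 has not
tracker = TrustTracker()
tracker.record(7, predicted=4.1, actual=4.0)
tracker.record(9, predicted=2.0, actual=4.5)
print(f"weight for neighbor 7: {tracker.combined_weight(7, 0.6):.3f}")  # close to 0.6
print(f"weight for neighbor 9: {tracker.combined_weight(9, 0.6):.3f}")  # heavily discounted
```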
```python
import numpy as np
from typing import List
from dataclasses import dataclass


@dataclass
class Neighbor:
    """Neighbor with associated metadata."""
    user_id: int
    similarity: float
    rating: float
    user_mean: float
    user_std: float = 1.0
    n_common: int = 0
    rating_time: float = 0.0


class WeightedPredictor:
    """
    Flexible prediction with multiple weighting schemes.
    """

    def __init__(
        self,
        case_amplification: float = 1.0,
        significance_threshold: int = 50,
        use_iif: bool = False,
        time_decay: float = 0.0,
        use_z_score: bool = False,
    ):
        self.case_amp = case_amplification
        self.sig_threshold = significance_threshold
        self.use_iif = use_iif  # placeholder flag; IIF weighting is applied at similarity time
        self.time_decay = time_decay
        self.use_z_score = use_z_score

    def compute_weight(
        self,
        neighbor: Neighbor,
        current_time: float = 0.0
    ) -> float:
        """Compute combined weight for a neighbor."""
        weight = neighbor.similarity

        # Case amplification
        if self.case_amp != 1.0:
            weight = np.sign(weight) * (abs(weight) ** self.case_amp)

        # Significance weighting
        if neighbor.n_common < self.sig_threshold:
            weight *= neighbor.n_common / self.sig_threshold

        # Time decay
        if self.time_decay > 0 and current_time > 0:
            age = current_time - neighbor.rating_time
            weight *= np.exp(-self.time_decay * age)

        return weight

    def predict(
        self,
        target_mean: float,
        target_std: float,
        neighbors: List[Neighbor],
        current_time: float = 0.0
    ) -> float:
        """
        Generate prediction using weighted aggregation.

        Returns predicted rating.
        """
        if not neighbors:
            return target_mean

        numerator = 0.0
        denominator = 0.0

        for n in neighbors:
            weight = self.compute_weight(n, current_time)

            if self.use_z_score:
                # Z-score normalized deviation
                if n.user_std > 0:
                    deviation = (n.rating - n.user_mean) / n.user_std
                else:
                    deviation = 0.0
            else:
                # Mean-centered deviation
                deviation = n.rating - n.user_mean

            numerator += weight * deviation
            denominator += abs(weight)

        if denominator == 0:
            return target_mean

        # Aggregate deviation
        agg_deviation = numerator / denominator

        if self.use_z_score:
            # Scale back by target user's std
            prediction = target_mean + target_std * agg_deviation
        else:
            prediction = target_mean + agg_deviation

        return float(np.clip(prediction, 1.0, 5.0))


# Example usage
if __name__ == "__main__":
    # Create neighbors with different characteristics
    neighbors = [
        Neighbor(1, similarity=0.9, rating=5.0, user_mean=3.8, n_common=30),
        Neighbor(2, similarity=0.7, rating=4.0, user_mean=2.5, n_common=50),
        Neighbor(3, similarity=0.5, rating=5.0, user_mean=4.0, n_common=10),
        Neighbor(4, similarity=0.3, rating=4.0, user_mean=3.0, n_common=5),
    ]

    target_mean = 3.5
    target_std = 1.2

    # Compare different configurations
    configs = [
        ("Basic mean-centered", {}),
        ("With case amplification (α=2.5)", {"case_amplification": 2.5}),
        ("With significance weighting", {"significance_threshold": 30}),
        ("Z-score normalized", {"use_z_score": True}),
        ("Full (all features)", {
            "case_amplification": 2.0,
            "significance_threshold": 30,
            "use_z_score": True
        }),
    ]

    print("Prediction Comparison:")
    print("=" * 50)
    for name, kwargs in configs:
        predictor = WeightedPredictor(**kwargs)
        pred = predictor.predict(target_mean, target_std, neighbors)
        print(f"  {name:35s}: {pred:.2f}")
```

Production systems must handle scenarios where standard neighborhood methods fail.
1. No Neighbors Found:
When no similar users/items who rated the target item can be found, the standard formula has nothing to aggregate.
Fallback strategies (in order of preference):
```python
if len(neighbors) == 0:
    if user_mean is not None:
        return user_mean
    elif item_mean is not None:
        return item_mean
    else:
        return global_mean
```
2. Cold Start: New Users
Users with few or no ratings can't find meaningful neighbors.
Strategies:
- Recommend popular or trending items until enough ratings accumulate.
- Elicit a few ratings during onboarding (ask the user to rate well-known items).
- Use whatever side information exists (demographics, stated preferences) to seed an initial profile.
- Fall back to item or global means for predictions until the profile matures.
3. Cold Start: New Items
New items have no ratings, so they appear in no neighborhoods.
Strategies:
- Bootstrap similarities from item metadata or content features until ratings arrive.
- Show the item to a small exploratory slice of users to collect initial ratings quickly.
- Fall back to category or global averages when predicting for the new item.
4. Gray Sheep Users:
Some users have unusual tastes that don't match any community. Their neighborhoods are weak.
Detection: Low average similarity to assigned neighbors
Strategies:
- Fall back to popularity-based or content-based recommendations for these users.
- Lean more on the user's own rating history (e.g., their mean) than on a weak neighborhood.
- Consider routing such users to model-based methods, which often handle idiosyncratic tastes better.
A production recommender should never fail to return recommendations. Even a bad recommendation is better than an error page. Implement a fallback chain: personalized → popular → random. Users encountering errors churn; users seeing unpersonalized content at least stay engaged.
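A minimal sketch of that fallback chain, assuming a personalized recommender that may return nothing; all names and signatures here are illustrative.

```python
import random
from typing import Callable, List, Optional

def recommend_with_fallback(
    user_id: int,
    n: int,
    personalized: Callable[[int, int], Optional[List[int]]],
    popular_items: List[int],
    all_items: List[int],
) -> List[int]:
    """Never return an empty result: personalized -> popular -> random."""
    recs = personalized(user_id, n)
    if recs:                       # personalized CF succeeded
        return recs[:n]
    if popular_items:              # fall back to globally popular items
        return popular_items[:n]
    return random.sample(all_items, min(n, len(all_items)))  # last resort

# Example with a personalized recommender that fails for this user
fails = lambda user, n: None
print(recommend_with_fallback(42, 3, fails, popular_items=[7, 3, 9, 1], all_items=list(range(20))))
```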
```python
from typing import List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum


class FallbackReason(Enum):
    NONE = "no_fallback"
    NO_NEIGHBORS = "no_neighbors"
    COLD_USER = "cold_user"
    COLD_ITEM = "cold_item"
    LOW_CONFIDENCE = "low_confidence"


@dataclass
class PredictionResult:
    rating: float
    confidence: float
    n_neighbors: int
    fallback: FallbackReason


class RobustPredictor:
    """
    Neighborhood predictor with comprehensive fallback handling.
    """

    def __init__(
        self,
        min_neighbors: int = 3,
        min_avg_similarity: float = 0.1,
        cold_user_threshold: int = 5,
    ):
        self.min_neighbors = min_neighbors
        self.min_avg_similarity = min_avg_similarity
        self.cold_user_threshold = cold_user_threshold

    def predict_with_fallback(
        self,
        user_id: int,
        item_id: int,
        neighbors: List[Tuple[int, float, float]],  # (id, sim, rating)
        user_mean: Optional[float],
        item_mean: Optional[float],
        global_mean: float,
        user_rating_count: int,
        item_rating_count: int,
    ) -> PredictionResult:
        """
        Make prediction with automatic fallback handling.
        """
        # Check for cold start
        if user_rating_count < self.cold_user_threshold:
            # Cold user: use item/global mean
            if item_mean is not None:
                return PredictionResult(
                    rating=item_mean, confidence=0.1, n_neighbors=0,
                    fallback=FallbackReason.COLD_USER
                )
            return PredictionResult(
                rating=global_mean, confidence=0.05, n_neighbors=0,
                fallback=FallbackReason.COLD_USER
            )

        if item_rating_count == 0:
            # Cold item: use user/global mean
            if user_mean is not None:
                return PredictionResult(
                    rating=user_mean, confidence=0.1, n_neighbors=0,
                    fallback=FallbackReason.COLD_ITEM
                )
            return PredictionResult(
                rating=global_mean, confidence=0.05, n_neighbors=0,
                fallback=FallbackReason.COLD_ITEM
            )

        # Filter to positive similarity
        valid_neighbors = [(i, s, r) for i, s, r in neighbors if s > 0]

        if len(valid_neighbors) < self.min_neighbors:
            # Insufficient neighbors
            if user_mean is not None:
                return PredictionResult(
                    rating=user_mean, confidence=0.2,
                    n_neighbors=len(valid_neighbors),
                    fallback=FallbackReason.NO_NEIGHBORS
                )
            return PredictionResult(
                rating=global_mean, confidence=0.1, n_neighbors=0,
                fallback=FallbackReason.NO_NEIGHBORS
            )

        # Compute prediction
        avg_sim = sum(s for _, s, _ in valid_neighbors) / len(valid_neighbors)

        if avg_sim < self.min_avg_similarity:
            # Low confidence prediction: still compute but flag it
            fallback = FallbackReason.LOW_CONFIDENCE
        else:
            fallback = FallbackReason.NONE

        # Mean-centered weighted average
        numerator = sum(s * (r - user_mean) for _, s, r in valid_neighbors)
        denominator = sum(abs(s) for _, s, _ in valid_neighbors)

        if denominator == 0:
            prediction = user_mean
        else:
            prediction = user_mean + numerator / denominator

        # Clamp to valid range
        prediction = max(1.0, min(5.0, prediction))

        # Confidence based on neighbors and similarity
        confidence = min(1.0, (len(valid_neighbors) / 50) * avg_sim)

        return PredictionResult(
            rating=prediction,
            confidence=confidence,
            n_neighbors=len(valid_neighbors),
            fallback=fallback
        )


# Usage example
predictor = RobustPredictor()

# Scenario 1: Normal prediction
result = predictor.predict_with_fallback(
    user_id=1, item_id=10,
    neighbors=[(2, 0.8, 4.0), (3, 0.6, 5.0), (4, 0.5, 4.0)],
    user_mean=3.5, item_mean=4.2, global_mean=3.7,
    user_rating_count=50, item_rating_count=100
)
print(f"Normal: rating={result.rating:.2f}, conf={result.confidence:.2f}, fallback={result.fallback.value}")

# Scenario 2: Cold user
result = predictor.predict_with_fallback(
    user_id=1, item_id=10, neighbors=[],
    user_mean=None, item_mean=4.2, global_mean=3.7,
    user_rating_count=2, item_rating_count=100
)
print(f"Cold user: rating={result.rating:.2f}, conf={result.confidence:.2f}, fallback={result.fallback.value}")
```

For production systems serving millions of users, neighborhood computation must be highly optimized.
Online vs Offline Components:
The key insight is to separate what must be computed online from what can be precomputed offline:
| Component | Online/Offline | Frequency |
|---|---|---|
| User-user similarities | Offline (user-based) | Daily/Weekly |
| Item-item similarities | Offline (item-based) | Daily/Weekly |
| User mean ratings | Offline | Hourly/Daily |
| Top-k neighbors per user | Offline | Daily |
| Prediction aggregation | Online | Per request |
Memory Footprint:
With 1M users and k=50 neighbors: storing (neighbor ID, similarity) pairs at roughly 8 bytes each takes about 1M × 50 × 8 B ≈ 400 MB, which fits comfortably in memory on a single server.
With 10M items and k=100: about 10M × 100 × 8 B ≈ 8 GB, still feasible on one machine or easily sharded across a few.
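A quick back-of-the-envelope check of these figures, assuming roughly 8 bytes per stored (neighbor ID, similarity) entry; actual storage formats vary.

```python
def neighbor_list_bytes(n_entities: int, k: int, bytes_per_entry: int = 8) -> float:
    """Memory for precomputed top-k neighbor lists, in gigabytes."""
    return n_entities * k * bytes_per_entry / 1e9

print(f"1M users,  k=50:  {neighbor_list_bytes(1_000_000, 50):.2f} GB")    # ~0.40 GB
print(f"10M items, k=100: {neighbor_list_bytes(10_000_000, 100):.2f} GB")  # ~8.00 GB
```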
Incremental Updates:
Rather than recomputing all similarities daily:
- Track which users and items received new ratings since the last batch.
- Recompute similarities and neighbor lists only for those affected rows (and their existing neighbors).
- Run a full recomputation less frequently (e.g., weekly) to correct accumulated drift.
A simplified sketch of this bookkeeping follows below.
This reduces daily computation by 10-100x in practice.
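A simplified sketch of the dirty-set bookkeeping this implies; the per-item recompute hook is an assumed callback, not the API of any specific library.

```python
from collections import defaultdict
from typing import Callable, Dict, Set

class IncrementalUpdater:
    """Track items that received new ratings; refresh only their neighbor lists."""

    def __init__(self, recompute_item: Callable[[int], None]):
        # recompute_item: hook that rebuilds one item's neighbor list (assumed to exist)
        self.recompute_item = recompute_item
        self.dirty_items: Set[int] = set()
        self.new_ratings: Dict[int, Dict[int, float]] = defaultdict(dict)

    def add_rating(self, user_id: int, item_id: int, rating: float) -> None:
        # Buffer the rating and mark the item as needing a similarity refresh
        self.new_ratings[item_id][user_id] = rating
        self.dirty_items.add(item_id)

    def refresh(self) -> int:
        """Rebuild neighbor lists only for touched items, not the whole catalog."""
        touched = len(self.dirty_items)
        for item_id in self.dirty_items:
            self.recompute_item(item_id)
        self.dirty_items.clear()
        return touched

# Example: only 2 of potentially millions of item neighbor lists are rebuilt
updater = IncrementalUpdater(recompute_item=lambda item_id: print(f"recomputing item {item_id}"))
updater.add_rating(1, 42, 5.0)
updater.add_rating(2, 42, 3.0)
updater.add_rating(1, 99, 4.0)
print(f"refreshed {updater.refresh()} item neighbor lists")
```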
With proper precomputation, online prediction should take <10ms for a single user-item rating and <100ms to generate top-100 recommendations for a user. Item-based CF with precomputed neighbors typically achieves <1ms per prediction.
Let's synthesize everything into a complete workflow for production neighborhood-based CF.
"""Complete Neighborhood-based Collaborative Filtering System This module implements a production-ready CF system with:- Offline: Similarity computation and neighbor precomputation- Online: Fast prediction and recommendation generation""" import numpy as npfrom typing import Dict, List, Tuple, Set, Optionalfrom collections import defaultdictfrom dataclasses import dataclassimport pickle @dataclassclass NeighborData: """Precomputed neighbor information.""" neighbors: List[Tuple[int, float]] # [(id, similarity), ...] mean: float std: float rating_count: int class NeighborhoodCF: """ Complete neighborhood-based CF with offline/online separation. """ def __init__( self, method: str = "item", # "item" or "user" k_neighbors: int = 50, min_overlap: int = 3, min_similarity: float = 0.1, case_amplification: float = 1.0, significance_weighting: bool = True, significance_threshold: int = 50, ): self.method = method self.k = k_neighbors self.min_overlap = min_overlap self.min_similarity = min_similarity self.case_amp = case_amplification self.sig_weight = significance_weighting self.sig_threshold = significance_threshold # Data structures self.user_ratings: Dict[int, Dict[int, float]] = defaultdict(dict) self.item_ratings: Dict[int, Dict[int, float]] = defaultdict(dict) self.user_data: Dict[int, NeighborData] = {} self.item_data: Dict[int, NeighborData] = {} self.global_mean: float = 3.0 # ================== # OFFLINE PHASE # ================== def fit(self, ratings: List[Tuple[int, int, float]]) -> 'NeighborhoodCF': """ Fit model: compute all similarities and neighbors offline. """ print("=== OFFLINE PHASE ===") # Build indices print("Building indices...") for user_id, item_id, rating in ratings: self.user_ratings[user_id][item_id] = rating self.item_ratings[item_id][user_id] = rating # Compute global mean self.global_mean = np.mean([r for u, i, r in ratings]) # Compute user statistics print("Computing user statistics...") for user_id, ratings_dict in self.user_ratings.items(): values = list(ratings_dict.values()) self.user_data[user_id] = NeighborData( neighbors=[], mean=np.mean(values), std=np.std(values) if len(values) > 1 else 1.0, rating_count=len(values) ) # Compute item statistics print("Computing item statistics...") for item_id, ratings_dict in self.item_ratings.items(): values = list(ratings_dict.values()) self.item_data[item_id] = NeighborData( neighbors=[], mean=np.mean(values), std=np.std(values) if len(values) > 1 else 1.0, rating_count=len(values) ) # Compute neighbors based on method if self.method == "item": print("Computing item-item similarities...") self._compute_item_neighbors() else: print("Computing user-user similarities...") self._compute_user_neighbors() print("=== OFFLINE COMPLETE ===\n") return self def _adjusted_cosine_items(self, item_i: int, item_j: int) -> Optional[float]: """Adjusted cosine similarity for items.""" users_i = set(self.item_ratings[item_i].keys()) users_j = set(self.item_ratings[item_j].keys()) common = users_i & users_j if len(common) < self.min_overlap: return None numerator = 0.0 sum_sq_i = 0.0 sum_sq_j = 0.0 for user in common: mean_u = self.user_data[user].mean adj_i = self.item_ratings[item_i][user] - mean_u adj_j = self.item_ratings[item_j][user] - mean_u numerator += adj_i * adj_j sum_sq_i += adj_i ** 2 sum_sq_j += adj_j ** 2 denom = np.sqrt(sum_sq_i) * np.sqrt(sum_sq_j) if denom == 0: return 0.0 sim = numerator / denom # Significance weighting if self.sig_weight: sim *= min(len(common), self.sig_threshold) / self.sig_threshold return sim def 
_compute_item_neighbors(self): """Compute and store top-k neighbors for each item.""" items = list(self.item_ratings.keys()) for idx, item_i in enumerate(items): if (idx + 1) % 500 == 0: print(f" Processed {idx + 1}/{len(items)} items") similarities = [] for item_j in items: if item_i != item_j: sim = self._adjusted_cosine_items(item_i, item_j) if sim is not None and sim >= self.min_similarity: similarities.append((item_j, sim)) # Sort and take top-k similarities.sort(key=lambda x: x[1], reverse=True) self.item_data[item_i].neighbors = similarities[:self.k] def _compute_user_neighbors(self): """Compute and store top-k neighbors for each user.""" users = list(self.user_ratings.keys()) for idx, user_u in enumerate(users): if (idx + 1) % 500 == 0: print(f" Processed {idx + 1}/{len(users)} users") similarities = [] mean_u = self.user_data[user_u].mean for user_v in users: if user_u != user_v: # Pearson correlation common = set(self.user_ratings[user_u].keys()) & set(self.user_ratings[user_v].keys()) if len(common) >= self.min_overlap: mean_v = self.user_data[user_v].mean num = sum( (self.user_ratings[user_u][i] - mean_u) * (self.user_ratings[user_v][i] - mean_v) for i in common ) denom = ( np.sqrt(sum((self.user_ratings[user_u][i] - mean_u)**2 for i in common)) * np.sqrt(sum((self.user_ratings[user_v][i] - mean_v)**2 for i in common)) ) if denom > 0: sim = num / denom if self.sig_weight: sim *= min(len(common), self.sig_threshold) / self.sig_threshold if sim >= self.min_similarity: similarities.append((user_v, sim)) similarities.sort(key=lambda x: x[1], reverse=True) self.user_data[user_u].neighbors = similarities[:self.k] # ================== # ONLINE PHASE # ================== def predict(self, user_id: int, item_id: int) -> float: """ Predict rating (online - must be fast). 
""" if self.method == "item": return self._predict_item_based(user_id, item_id) else: return self._predict_user_based(user_id, item_id) def _predict_item_based(self, user_id: int, item_id: int) -> float: """Item-based prediction.""" # Fallbacks if user_id not in self.user_data: return self.global_mean if item_id not in self.item_data: return self.user_data[user_id].mean user_mean = self.user_data[user_id].mean user_items = self.user_ratings[user_id] item_neighbors = self.item_data[item_id].neighbors numerator = 0.0 denominator = 0.0 for neighbor_id, sim in item_neighbors: if neighbor_id in user_items: weight = sim ** self.case_amp if self.case_amp != 1.0 else sim deviation = user_items[neighbor_id] - user_mean numerator += weight * deviation denominator += abs(weight) if denominator == 0: return user_mean prediction = user_mean + numerator / denominator return float(np.clip(prediction, 1.0, 5.0)) def _predict_user_based(self, user_id: int, item_id: int) -> float: """User-based prediction.""" if user_id not in self.user_data: return self.global_mean user_mean = self.user_data[user_id].mean user_neighbors = self.user_data[user_id].neighbors numerator = 0.0 denominator = 0.0 for neighbor_id, sim in user_neighbors: if item_id in self.user_ratings[neighbor_id]: neighbor_rating = self.user_ratings[neighbor_id][item_id] neighbor_mean = self.user_data[neighbor_id].mean weight = sim ** self.case_amp if self.case_amp != 1.0 else sim deviation = neighbor_rating - neighbor_mean numerator += weight * deviation denominator += abs(weight) if denominator == 0: return user_mean prediction = user_mean + numerator / denominator return float(np.clip(prediction, 1.0, 5.0)) def recommend( self, user_id: int, n: int = 10, exclude_rated: bool = True ) -> List[Tuple[int, float]]: """ Generate top-N recommendations for a user. """ if user_id not in self.user_data: # Cold user: return popular items popular = sorted( self.item_data.items(), key=lambda x: x[1].rating_count, reverse=True )[:n] return [(item_id, self.global_mean) for item_id, _ in popular] rated_items = set(self.user_ratings[user_id].keys()) if exclude_rated else set() candidates = set(self.item_data.keys()) - rated_items predictions = [ (item_id, self.predict(user_id, item_id)) for item_id in candidates ] predictions.sort(key=lambda x: x[1], reverse=True) return predictions[:n] def save(self, filepath: str): """Save model to disk.""" with open(filepath, 'wb') as f: pickle.dump(self, f) @classmethod def load(cls, filepath: str) -> 'NeighborhoodCF': """Load model from disk.""" with open(filepath, 'rb') as f: return pickle.load(f) # Demoif __name__ == "__main__": # Generate sample data np.random.seed(42) ratings = [] for user in range(100): for item in range(50): if np.random.random() < 0.2: # 20% density rating = np.random.randint(1, 6) ratings.append((user, item, float(rating))) # Train model cf = NeighborhoodCF(method="item", k_neighbors=20) cf.fit(ratings) # Make predictions print("Sample predictions:") for user, item in [(0, 10), (5, 20), (10, 30)]: pred = cf.predict(user, item) print(f" User {user}, Item {item}: {pred:.2f}") # Get recommendations print("\nTop 5 recommendations for User 0:") for item_id, score in cf.recommend(0, n=5): print(f" Item {item_id}: {score:.2f}")What's Next:
Neighborhood methods provide excellent recommendations but face scalability challenges. The next page covers scalability in collaborative filtering—distributed computation, approximate methods, and the architectural patterns that enable CF at the scale of Netflix and Amazon.
You now understand neighborhood methods comprehensively—from selection strategies and aggregation schemes to edge case handling and production optimization. This knowledge enables building robust, efficient CF systems that serve real-world recommendation needs.