The previous page introduced three feed generation approaches. Now we dive deep into the mechanics of fanout—the process of distributing content from one source to many destinations. Understanding fanout at a granular level is essential for building reliable, scalable social media systems.
Why This Matters:
Fanout is where abstract design decisions become concrete engineering challenges. A seemingly simple "push tweet to followers" operation involves asynchronous queueing, batch partitioning, idempotent retries, edge case handling, and careful performance tuning.
This page equips you with the implementation depth expected of a senior engineer designing production systems.
By the end of this page, you'll understand message queue architectures for fanout, idempotency and failure recovery patterns, edge case handling (deletes, blocks, protected accounts), and performance optimization techniques used in production Twitter-scale systems.
Fanout on write (push-based) requires a robust asynchronous processing pipeline. The tweet API returns quickly to the user while background workers handle the actual fanout. Let's build this system component by component.
The fanout pipeline uses a message queue to decouple tweet creation from distribution:
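To make the decoupling concrete, here's a minimal sketch of the tweet-creation path, assuming injected `tweet_store` and `fanout_queue` dependencies; their method names are illustrative, not a specific library:

```python
"""Tweet creation endpoint (illustrative sketch).

Assumes injected `tweet_store` and `fanout_queue` dependencies with the
async methods shown; real interfaces depend on your storage and broker
choices (e.g., Kafka, SQS, or an internal queue service).
"""

import time
import uuid


class TweetService:
    def __init__(self, tweet_store, fanout_queue):
        self.tweet_store = tweet_store
        self.fanout_queue = fanout_queue

    async def create_tweet(self, author_id: str, content: str) -> dict:
        tweet_id = str(uuid.uuid4())
        created_at_ms = int(time.time() * 1000)

        # Synchronous part: durably persist the tweet itself
        await self.tweet_store.save(
            tweet_id=tweet_id,
            author_id=author_id,
            content=content,
            created_at_ms=created_at_ms,
        )

        # Asynchronous part: hand distribution to the fanout pipeline.
        # A downstream partitioner expands this request into per-batch
        # FanoutMessages (see the schema below).
        await self.fanout_queue.enqueue({
            "type": "FANOUT_TYPE_NEW_TWEET",
            "tweet_id": tweet_id,
            "author_id": author_id,
            "created_at_ms": created_at_ms,
        })

        # Return immediately; followers' timelines converge in the background
        return {"tweet_id": tweet_id, "status": "accepted"}
```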
Each message in the fanout queue contains everything needed to distribute a tweet:
```protobuf
// Protocol Buffer definition for fanout messages
syntax = "proto3";

message FanoutMessage {
  // Unique identifier for idempotency
  string idempotency_key = 1;

  // Tweet being distributed
  string tweet_id = 2;
  string author_id = 3;
  int64 created_at_ms = 4;

  // Fanout metadata
  FanoutType type = 5;
  int32 priority = 6;  // 1=high (regular), 2=low (celebrity partials)

  // Follower batch (for partitioned fanout)
  repeated string follower_ids = 7;
  int32 batch_number = 8;
  int32 total_batches = 9;

  // Retry metadata
  int32 attempt_count = 10;
  int64 first_attempt_ms = 11;
}

enum FanoutType {
  FANOUT_TYPE_UNKNOWN = 0;
  FANOUT_TYPE_NEW_TWEET = 1;
  FANOUT_TYPE_RETWEET = 2;
  FANOUT_TYPE_DELETE = 3;          // Remove from timelines
  FANOUT_TYPE_ACCOUNT_UPDATE = 4;  // Block, mute, etc.
}
```

When a user with 100K followers posts, we don't want one worker to handle all 100K operations. Instead, we partition the fanout into batches:
"""Fanout Partitioner Splits large fanout operations into manageable batches.Each batch becomes a separate message in the queue, enablingparallel processing across multiple workers.""" from dataclasses import dataclassfrom typing import List, Iteratorimport hashlib @dataclassclass FanoutBatch: tweet_id: str author_id: str batch_number: int total_batches: int follower_ids: List[str] idempotency_key: str class FanoutPartitioner: def __init__( self, batch_size: int = 5000, max_workers: int = 100 ): self.batch_size = batch_size self.max_workers = max_workers def partition( self, tweet_id: str, author_id: str, follower_ids: List[str] ) -> Iterator[FanoutBatch]: """ Split follower list into batches for parallel processing. Strategy: - Batch size of 5000 balances overhead vs parallelism - Each batch gets unique idempotency key for safe retries - Batches can be processed by any available worker """ total_followers = len(follower_ids) total_batches = (total_followers + self.batch_size - 1) // self.batch_size for batch_num in range(total_batches): start = batch_num * self.batch_size end = min(start + self.batch_size, total_followers) batch_followers = follower_ids[start:end] # Create deterministic idempotency key idempotency_key = self._generate_idempotency_key( tweet_id, batch_num ) yield FanoutBatch( tweet_id=tweet_id, author_id=author_id, batch_number=batch_num, total_batches=total_batches, follower_ids=batch_followers, idempotency_key=idempotency_key ) def _generate_idempotency_key( self, tweet_id: str, batch_num: int ) -> str: """ Deterministic key enables safe retries. Same tweet + batch always produces same key. """ data = f"{tweet_id}:{batch_num}" return hashlib.sha256(data.encode()).hexdigest()[:32]Workers must be idempotent—processing the same message twice should not corrupt timelines:
"""Idempotent Fanout Worker Processes fanout batches with exactly-once semantics (at-least-oncedelivery + idempotent processing = effective exactly-once). Key properties:1. Idempotent: Re-processing same batch is safe2. Resumable: Partial failures can be retried3. Observable: All operations are logged and metered""" import asynciofrom typing import Optionalimport logging logger = logging.getLogger(__name__) class FanoutWorker: def __init__( self, redis_cluster, idempotency_cache, metrics_client ): self.redis = redis_cluster self.idem_cache = idempotency_cache self.metrics = metrics_client async def process_batch(self, batch: FanoutBatch) -> bool: """ Process a fanout batch idempotently. Returns True if work was done, False if already processed. """ # Step 1: Check idempotency if await self._already_processed(batch.idempotency_key): logger.info( f"Skipping duplicate batch: {batch.idempotency_key}" ) self.metrics.increment("fanout.duplicate_skipped") return False # Step 2: Perform fanout try: await self._execute_fanout(batch) # Step 3: Mark as processed (after success) await self._mark_processed(batch.idempotency_key) self.metrics.increment( "fanout.batch_processed", tags={"batch_size": len(batch.follower_ids)} ) return True except Exception as e: logger.error( f"Fanout failed for batch {batch.idempotency_key}: {e}" ) self.metrics.increment("fanout.batch_failed") raise # Let queue handle retry async def _execute_fanout(self, batch: FanoutBatch): """ Push tweet to all followers' timelines in this batch. Uses Redis pipeline for efficiency. """ # Redis pipeline for batch efficiency pipe = self.redis.pipeline(transaction=False) for follower_id in batch.follower_ids: timeline_key = f"timeline:{follower_id}" # ZADD with tweet_id as member, timestamp as score pipe.zadd( timeline_key, {batch.tweet_id: batch.created_at_ms} ) # Trim to prevent unbounded growth pipe.zremrangebyrank(timeline_key, 0, -801) # Execute all operations atomically await pipe.execute() async def _already_processed(self, key: str) -> bool: """Check if this batch was already processed.""" return await self.idem_cache.exists(key) async def _mark_processed(self, key: str): """Mark batch as processed with TTL for cleanup.""" # Keep for 24 hours (enough to handle delayed retries) await self.idem_cache.set(key, "1", ex=86400)In distributed systems, messages can be delivered multiple times due to retries, network issues, or worker failures. Without idempotency, a tweet could appear twice in a user's timeline, or worse, each retry could add another copy. Always design for at-least-once delivery with idempotent handling.
Fanout on read (pull-based) shifts complexity to the read path. Instead of background workers, the timeline API itself must efficiently aggregate content from multiple sources. Let's build an optimized implementation.
The key to acceptable read latency is aggressive parallelization:
"""Parallel Timeline Builder Fetches tweets from all followed accounts concurrently,then merges them into a single timeline. Optimized forlatency through parallelism, caching, and early termination.""" import asynciofrom typing import List, Dict, Optionalfrom dataclasses import dataclassimport heapq @dataclassclass TweetWithSource: tweet_id: str author_id: str created_at: float content: str metrics: Dict class ParallelTimelineBuilder: def __init__( self, tweet_cache, # L1: In-memory user tweet cache tweet_store, # L2: Persistent tweet storage social_graph, concurrency_limit: int = 50, timeout_ms: int = 200 ): self.tweet_cache = tweet_cache self.tweet_store = tweet_store self.social_graph = social_graph self.semaphore = asyncio.Semaphore(concurrency_limit) self.timeout = timeout_ms / 1000 async def build_timeline( self, user_id: str, limit: int = 50 ) -> List[TweetWithSource]: """ Build timeline by fetching from all followed accounts in parallel. Optimization strategies: 1. Parallel fetching with bounded concurrency 2. Per-user tweet caching 3. Timeout-based early termination 4. K-way merge with heap for efficiency """ # Get following list following_ids = await self.social_graph.get_following(user_id) if not following_ids: return [] # Fetch tweets from all sources in parallel tasks = [ self._fetch_user_tweets_with_timeout(uid, limit=20) for uid in following_ids ] # Wait for all with timeout try: results = await asyncio.wait_for( asyncio.gather(*tasks, return_exceptions=True), timeout=self.timeout ) except asyncio.TimeoutError: # Return partial results from completed tasks results = [ t.result() if t.done() and not t.exception() else [] for t in tasks ] # Filter out failed fetches all_tweets = [] for result in results: if isinstance(result, list): all_tweets.extend(result) # Merge and sort sorted_tweets = sorted( all_tweets, key=lambda t: t.created_at, reverse=True ) return sorted_tweets[:limit] async def _fetch_user_tweets_with_timeout( self, user_id: str, limit: int ) -> List[TweetWithSource]: """ Fetch a user's recent tweets with concurrency control. """ async with self.semaphore: # Try cache first cached = await self.tweet_cache.get_user_tweets(user_id, limit) if cached: return cached # Fall back to persistent store tweets = await self.tweet_store.get_user_tweets(user_id, limit) # Populate cache for next time await self.tweet_cache.set_user_tweets(user_id, tweets) return tweetsWithout caching, pull-based would be prohibitively expensive. We use a multi-tier caching strategy:
| Tier | Storage | TTL | Hit Rate Target | Purpose |
|---|---|---|---|---|
| L1 - Request | In-memory hash | Request duration | N/A | Dedupe within single timeline build |
| L2 - User Tweets | Local Redis | 5 minutes | 90% | Recent tweets per author |
| L3 - Hot Tweets | Regional Redis | 1 hour | 99% | Viral/trending content |
| L4 - Tweet Store | Distributed DB | Permanent | 100% | Source of truth |
"""Tiered Tweet Cache Multi-level caching for optimal read performance in pull-based systems.Each tier has different capacity, latency, and TTL characteristics.""" class TieredTweetCache: def __init__( self, local_cache, # L2: Local Redis regional_cache, # L3: Regional Redis cluster tweet_store # L4: Persistent store ): self.local = local_cache self.regional = regional_cache self.store = tweet_store async def get_user_tweets( self, user_id: str, limit: int = 20 ) -> List[Dict]: """ Fetch user's tweets with tiered cache lookup. """ cache_key = f"user_tweets:{user_id}" # L2: Try local cache first (lowest latency) tweets = await self.local.get(cache_key) if tweets: return tweets[:limit] # L3: Try regional cache tweets = await self.regional.get(cache_key) if tweets: # Populate L2 for subsequent requests await self.local.set(cache_key, tweets, ex=300) return tweets[:limit] # L4: Fetch from persistent store tweets = await self.store.get_user_tweets(user_id, limit=50) # Populate caches await self.regional.set(cache_key, tweets, ex=3600) await self.local.set(cache_key, tweets, ex=300) return tweets[:limit] async def get_tweet_by_id(self, tweet_id: str) -> Optional[Dict]: """ Fetch single tweet with hot content optimization. Viral tweets get cached at higher tiers. """ # Check hot tweet cache first (L3) hot_key = f"tweet:hot:{tweet_id}" tweet = await self.regional.get(hot_key) if tweet: return tweet # Standard lookup cache_key = f"tweet:{tweet_id}" tweet = await self.local.get(cache_key) if tweet: return tweet # Fetch from store tweet = await self.store.get_tweet(tweet_id) if tweet: await self.local.set(cache_key, tweet, ex=3600) # If tweet is viral, promote to hot cache if self._is_viral(tweet): await self.regional.set(hot_key, tweet, ex=3600) return tweet def _is_viral(self, tweet: Dict) -> bool: """Detect viral content for special caching.""" metrics = tweet.get('metrics', {}) return ( metrics.get('views', 0) > 100000 or metrics.get('retweets', 0) > 1000 or metrics.get('likes', 0) > 10000 )For pull-based systems, short TTLs (5 minutes) on user tweet caches provide a good balance: fresh enough for most users, cached enough to avoid database overload. Viral content caches can have longer TTLs since the tweets don't change—only their metrics do.
Production systems must handle numerous edge cases that can corrupt data or degrade user experience. Let's examine the most critical ones.
When a user deletes a tweet, it must be removed from all timelines:
"""Tweet Deletion Handler Deletion is more expensive than creation because:1. Tweet must be removed from author's store2. Tweet must be removed from ALL followers' cached timelines3. Caches across multiple tiers must be invalidated4. This happens synchronously or near-synchronously for user experience""" class TweetDeletionHandler: def __init__( self, tweet_store, timeline_cache, social_graph, cache_tiers, deletion_queue ): self.tweet_store = tweet_store self.timeline_cache = timeline_cache self.social_graph = social_graph self.cache = cache_tiers self.deletion_queue = deletion_queue async def delete_tweet( self, tweet_id: str, author_id: str, hard_delete: bool = False ): """ Delete a tweet and remove from all timelines. Strategy: 1. Mark tweet as deleted (soft delete for audit) 2. Remove from author's tweet list 3. Invalidate caches 4. Queue timeline removal for followers (async) """ # Step 1: Soft delete in primary store await self.tweet_store.mark_deleted(tweet_id) # Step 2: Remove from author's tweet list await self.tweet_store.remove_from_user_timeline( author_id, tweet_id ) # Step 3: Invalidate all cache tiers await self._invalidate_caches(tweet_id, author_id) # Step 4: Queue follower timeline cleanups followers = await self.social_graph.get_followers(author_id) await self._queue_timeline_removals(tweet_id, followers) return {"deleted": True, "cleanup_queued": len(followers)} async def _invalidate_caches(self, tweet_id: str, author_id: str): """Invalidate tweet across all cache tiers.""" await asyncio.gather( self.cache.local.delete(f"tweet:{tweet_id}"), self.cache.regional.delete(f"tweet:hot:{tweet_id}"), self.cache.local.delete(f"user_tweets:{author_id}"), self.cache.regional.delete(f"user_tweets:{author_id}"), ) async def _queue_timeline_removals( self, tweet_id: str, follower_ids: List[str] ): """ Queue timeline cleanup jobs for background processing. Similar to fanout, but removes instead of adds. """ for batch_start in range(0, len(follower_ids), 5000): batch = follower_ids[batch_start:batch_start + 5000] await self.deletion_queue.enqueue({ "type": "TIMELINE_REMOVAL", "tweet_id": tweet_id, "follower_ids": batch })When user A blocks user B, several things must happen:
"""Block Handler Implements the complex cascade of operations when a user blocks another.Must be atomic where possible, eventually consistent where necessary.""" class BlockHandler: async def block_user(self, blocker_id: str, blocked_id: str): """ Execute block operation with all required cascading effects. """ # Record the block relationship (synchronous, must succeed) await self.relationship_store.create_block(blocker_id, blocked_id) # Remove any existing follow relationships (synchronous) await asyncio.gather( self.social_graph.unfollow(blocker_id, blocked_id), self.social_graph.unfollow(blocked_id, blocker_id) ) # Queue async cleanups await self._queue_timeline_cleanup(blocker_id, blocked_id) await self._queue_timeline_cleanup(blocked_id, blocker_id) # Invalidate caches await self._invalidate_relationship_caches(blocker_id, blocked_id) return {"blocked": True} async def _queue_timeline_cleanup( self, viewer_id: str, author_id: str ): """ Remove all of author's tweets from viewer's timeline. """ # Get author's recent tweet IDs tweets = await self.tweet_store.get_user_tweet_ids( author_id, limit=1000 ) # Queue removal from viewer's timeline await self.cleanup_queue.enqueue({ "type": "BLOCK_CLEANUP", "viewer_id": viewer_id, "tweet_ids": tweets }) def should_filter_tweet( self, viewer_id: str, author_id: str ) -> bool: """ Runtime filter for timeline generation. Called during both push and pull paths. """ # Check block relationship cache return self.relationship_cache.is_blocked(viewer_id, author_id)Protected accounts add approval requirements to the follow flow:
"""Protected Account Handler Protected accounts require follow approval before their tweetsare visible. This affects both the follow flow and fanout logic.""" class ProtectedAccountHandler: async def request_follow( self, requester_id: str, protected_user_id: str ): """ Request to follow a protected account. Creates pending request, not immediate follow. """ # Verify account is protected user = await self.user_service.get_user(protected_user_id) if not user.is_protected: # Not protected, do immediate follow return await self.social_graph.follow( requester_id, protected_user_id ) # Create pending follow request await self.follow_requests.create( requester_id=requester_id, target_id=protected_user_id, status="PENDING" ) # Notify protected user await self.notifications.send( protected_user_id, type="FOLLOW_REQUEST", from_user=requester_id ) return {"status": "pending", "requires_approval": True} async def approve_follow_request( self, protected_user_id: str, requester_id: str ): """ Approve a pending follow request. Now creates the actual follow relationship. """ # Verify pending request exists request = await self.follow_requests.get( requester_id, protected_user_id ) if not request or request.status != "PENDING": raise ValueError("No pending request found") # Create the follow relationship await self.social_graph.follow(requester_id, protected_user_id) # Mark request as approved await self.follow_requests.update_status( requester_id, protected_user_id, "APPROVED" ) # Backfill requester's timeline with protected user's tweets await self._backfill_timeline(requester_id, protected_user_id) return {"approved": True} async def _backfill_timeline( self, follower_id: str, protected_user_id: str ): """ After approval, add protected user's recent tweets to timeline. """ tweets = await self.tweet_store.get_user_tweets( protected_user_id, limit=100 ) for tweet in tweets: await self.timeline_cache.push_tweet( follower_id, tweet['id'], tweet['created_at'] )Protected accounts still use the same fanout mechanism as public accounts—the difference is only in who can become a follower. Once approved, followers receive tweets through the normal fanout path. The protection layer is at the follow relationship level, not the fanout level.
Production systems employ numerous optimizations to meet latency SLOs. Here are techniques used by Twitter and similar platforms:
When multiple operations target the same timeline, coalesce them:
"""Update Coalescing When processing high-volume updates, batch operations to the sametimeline together. This reduces Redis round-trips significantly.""" class UpdateCoalescer: def __init__(self, flush_interval_ms: int = 50, max_batch_size: int = 100): self.buffer: Dict[str, List[Tuple]] = {} # timeline_id -> [(tweet_id, ts)] self.flush_interval = flush_interval_ms / 1000 self.max_batch = max_batch_size self._flush_task = None async def add_update( self, timeline_id: str, tweet_id: str, timestamp: float ): """ Buffer an update for potential coalescing. """ if timeline_id not in self.buffer: self.buffer[timeline_id] = [] self.buffer[timeline_id].append((tweet_id, timestamp)) # Flush if batch is full if len(self.buffer[timeline_id]) >= self.max_batch: await self._flush_timeline(timeline_id) # Ensure flush task is scheduled if self._flush_task is None: self._flush_task = asyncio.create_task(self._periodic_flush()) async def _flush_timeline(self, timeline_id: str): """Flush all buffered updates for a timeline.""" updates = self.buffer.pop(timeline_id, []) if not updates: return key = f"timeline:{timeline_id}" # Single ZADD with multiple members members = {tweet_id: ts for tweet_id, ts in updates} pipe = self.redis.pipeline() pipe.zadd(key, members) pipe.zremrangebyrank(key, 0, -801) await pipe.execute() async def _periodic_flush(self): """Flush all buffers periodically.""" while self.buffer: await asyncio.sleep(self.flush_interval) # Snapshot and clear buffer to_flush = dict(self.buffer) self.buffer.clear() # Flush all timelines for timeline_id in to_flush: await self._flush_timeline(timeline_id) self._flush_task = NoneWhen checking if a tweet already exists in a timeline, Bloom filters provide fast negative lookups:
"""Timeline Bloom Filter Bloom filters for fast "definitely not in timeline" checks.Reduces unnecessary writes during fanout and dedupes reads. False positive rate of 1% is acceptable—we just do an extra check.False negatives are impossible—Bloom filter guarantee.""" from pybloom_live import BloomFilterfrom typing import Optional class TimelineBloomFilter: def __init__( self, expected_items: int = 1000, false_positive_rate: float = 0.01 ): self.filters: Dict[str, BloomFilter] = {} self.expected = expected_items self.fp_rate = false_positive_rate def get_or_create_filter(self, timeline_id: str) -> BloomFilter: if timeline_id not in self.filters: self.filters[timeline_id] = BloomFilter( capacity=self.expected, error_rate=self.fp_rate ) return self.filters[timeline_id] def might_contain(self, timeline_id: str, tweet_id: str) -> bool: """ Returns True if tweet MIGHT be in timeline. Returns False if tweet is DEFINITELY NOT in timeline. """ if timeline_id not in self.filters: return False # No filter = no tweets return tweet_id in self.filters[timeline_id] def add_tweet(self, timeline_id: str, tweet_id: str): """Record that tweet is in timeline.""" filter = self.get_or_create_filter(timeline_id) filter.add(tweet_id) def should_skip_write( self, timeline_id: str, tweet_id: str ) -> bool: """ Optimization: skip write if tweet definitely already present. Returns False (don't skip) if: - Bloom filter says probably present (we should verify) - No filter exists This catches the common case of duplicate fanout messages. """ if self.might_contain(timeline_id, tweet_id): # Possibly exists - conservative: don't skip return False # Definitely doesn't exist - safe to write return TrueProtect the system from cascading failures with adaptive timeouts:
"""Circuit Breaker for Fanout When downstream systems (Redis, databases) are struggling, the circuitbreaker prevents overwhelming them further—enabling graceful degradation.""" from enum import Enumimport time class CircuitState(Enum): CLOSED = "closed" # Normal operation OPEN = "open" # Failing, reject requests HALF_OPEN = "half_open" # Testing if recovered class CircuitBreaker: def __init__( self, failure_threshold: int = 5, recovery_timeout: float = 30.0, success_threshold: int = 3 ): self.failure_threshold = failure_threshold self.recovery_timeout = recovery_timeout self.success_threshold = success_threshold self.state = CircuitState.CLOSED self.failure_count = 0 self.success_count = 0 self.last_failure_time = 0 async def execute(self, operation, fallback=None): """ Execute operation with circuit breaker protection. """ if self.state == CircuitState.OPEN: # Check if recovery timeout has passed if time.time() - self.last_failure_time > self.recovery_timeout: self.state = CircuitState.HALF_OPEN self.success_count = 0 else: # Still open - use fallback or fail fast if fallback: return await fallback() raise CircuitOpenError("Circuit breaker is open") try: result = await operation() if self.state == CircuitState.HALF_OPEN: self.success_count += 1 if self.success_count >= self.success_threshold: # Recovered - close circuit self.state = CircuitState.CLOSED self.failure_count = 0 return result except Exception as e: self._record_failure() raise def _record_failure(self): self.failure_count += 1 self.last_failure_time = time.time() if self.failure_count >= self.failure_threshold: self.state = CircuitState.OPEN # Usage in fanout serviceclass ResilientFanoutService: def __init__(self): self.circuit = CircuitBreaker( failure_threshold=10, recovery_timeout=60 ) async def fanout_with_protection(self, batch): async def do_fanout(): return await self._execute_fanout(batch) async def fallback(): # Queue for retry instead of failing await self.retry_queue.enqueue(batch) return {"queued_for_retry": True} return await self.circuit.execute(do_fanout, fallback)Production systems layer multiple protection mechanisms: circuit breakers prevent cascade failures, rate limiters control throughput, bulkheads isolate components, and timeouts prevent resource exhaustion. Each layer handles a different failure mode.
You cannot optimize what you cannot measure. Fanout systems require comprehensive observability:
| Metric | Type | What It Tells You | Alert Threshold (Example) |
|---|---|---|---|
| fanout_queue_depth | Gauge | Backlog of pending fanouts | > 100K messages |
| fanout_latency_p99 | Histogram | Time from tweet to all timelines | > 5 seconds |
| fanout_batch_size | Histogram | Followers per fanout operation | Unusual spikes |
| timeline_cache_hit_rate | Counter | Efficiency of cached timelines | < 95% |
| timeline_build_latency_p50 | Histogram | Read path performance | > 100ms |
| celebrity_tweet_rate | Counter | High-fanout tweet frequency | > 10/minute |
| circuit_breaker_state | Gauge | System health status | OPEN state |
"""Fanout Metrics Collection Comprehensive instrumentation for fanout operations.Uses dimensional metrics for slicing and dicing.""" from prometheus_client import Counter, Histogram, Gaugeimport functools # Define metricsFANOUT_MESSAGES = Counter( 'fanout_messages_total', 'Total fanout messages processed', ['status', 'author_type'] # success/failure, regular/celebrity) FANOUT_LATENCY = Histogram( 'fanout_latency_seconds', 'Time to complete fanout batch', ['author_type'], buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]) FANOUT_BATCH_SIZE = Histogram( 'fanout_batch_size', 'Number of followers per fanout batch', buckets=[10, 50, 100, 500, 1000, 5000, 10000]) QUEUE_DEPTH = Gauge( 'fanout_queue_depth', 'Current fanout queue depth') TIMELINE_LATENCY = Histogram( 'timeline_build_latency_seconds', 'Time to build user timeline', ['method'], # push/pull/hybrid buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]) def instrument_fanout(func): """Decorator to instrument fanout operations.""" @functools.wraps(func) async def wrapper(self, batch, *args, **kwargs): author_type = 'celebrity' if batch.is_celebrity else 'regular' FANOUT_BATCH_SIZE.observe(len(batch.follower_ids)) with FANOUT_LATENCY.labels(author_type=author_type).time(): try: result = await func(self, batch, *args, **kwargs) FANOUT_MESSAGES.labels( status='success', author_type=author_type ).inc() return result except Exception as e: FANOUT_MESSAGES.labels( status='failure', author_type=author_type ).inc() raise return wrapperMetrics reveal optimization opportunities. If 'celebrity_tweet_rate' correlates with 'fanout_queue_depth' spikes, you know to prioritize hybrid fanout improvements. If 'timeline_cache_hit_rate' drops during certain hours, investigate cache sizing or TTL tuning.
You've now mastered the deep mechanics of fanout strategies.
What's Next:
With fanout mechanics understood, the next page explores Tweet Storage and Retrieval—how to efficiently store hundreds of billions of tweets and retrieve them with sub-millisecond latency. We'll cover data modeling, sharding strategies, indexing approaches, and the role of different storage technologies in Twitter's architecture.
You now understand fanout at production depth—message queue architectures, idempotency patterns, edge case handling, and performance optimizations. This knowledge is essential for designing any large-scale content distribution system, from social media feeds to notification systems to activity streams.