Consider this scenario: Your e-commerce platform is running a flash sale, and you need to notify 10 million users that the sale has started. You have 5 minutes to reach everyone before the highest-demand items sell out. If you send notifications one at a time at 100 per second, it would take 27 hours—far too late. But if you send all 10 million simultaneously, you'll overwhelm your infrastructure, hit platform rate limits, and potentially crash your own systems under the load of users flooding your app.
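The arithmetic behind those numbers is worth making explicit, since it motivates every technique in this section (the figures match the scenario above):

```python
# Back-of-the-envelope math for the flash-sale scenario.

USERS = 10_000_000
SERIAL_RATE = 100          # notifications/second, sent one at a time
DEADLINE_SECONDS = 5 * 60  # the 5-minute sale window

# Sending serially at 100/s blows the deadline by orders of magnitude:
serial_hours = USERS / SERIAL_RATE / 3600
print(f"Serial send time: {serial_hours:.1f} hours")  # ~27.8 hours

# The sustained rate actually needed to reach everyone in 5 minutes:
required_rate = USERS / DEADLINE_SECONDS
print(f"Required rate: {required_rate:,.0f} notifications/second")
```

Roughly 33,000 notifications per second, sustained for five minutes, is the real target — hence batching, parallelism, and throttling.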
This is the batching and throttling challenge: how do you send massive volumes of notifications quickly while respecting system constraints?
The answer involves sophisticated orchestration—batching notifications for efficiency, throttling to respect rate limits, prioritizing for fairness, and distributing load across infrastructure. These techniques separate amateur push notification systems from production-grade platforms handling billions of messages.
Large platforms send staggering volumes: Twitter/X delivers billions of notifications daily. Gaming platforms send millions during live events. E-commerce sites send hundreds of millions during Black Friday. All of this requires careful batching and throttling strategies.
Before designing batching strategies, you must understand the constraints imposed by push platforms. Both explicit rate limits and implicit throttling mechanisms affect your maximum throughput.
APNs Rate Limits:
Apple doesn't publish explicit rate limits; throughput guidance comes from empirical observation and from monitoring for 429 responses.
FCM Rate Limits:
Google publishes clearer limits, including a per-device message cap.
Web Push Rate Limits:
Each browser push service enforces its own limits.
The table below summarizes the practical picture:
| Platform | Documented Limit | Practical Guidance |
|---|---|---|
| APNs | Not published | Monitor for 429 errors; reduce rate if occurring |
| FCM | ~240/min per device | Use batch API; monitor quota in console |
| Web Push (Chrome) | Same as FCM | Respect Retry-After headers on 429 |
| Web Push (Firefox) | Not published | Conservative rate limiting recommended |
Exceeding rate limits doesn't just cause immediate request failures. Platforms may impose temporary 'soft bans' that throttle your throughput for minutes or hours. Chronic violation can lead to more severe consequences. Always implement rate limiting client-side rather than relying on platform rejection.
Batching groups multiple notifications together for more efficient processing. Different batching strategies apply at different layers of your system:
API-Level Batching:
Some push platforms support sending to multiple recipients in a single API call:
FCM Batch API:
APNs HTTP/2 Multiplexing:
```python
# FCM batch sending implementation

import asyncio
import time
from dataclasses import dataclass, field
from typing import Dict, List

from firebase_admin import messaging


# Result containers (defined here so the sketch is self-contained)
@dataclass
class SingleBatchResult:
    total: int
    success_count: int
    failure_count: int
    invalid_tokens: List[str] = field(default_factory=list)


@dataclass
class BatchSendResult:
    total_sent: int
    success_count: int
    failure_count: int
    invalid_token_count: int


@dataclass
class WorkerResult:
    total: int
    success: int


@dataclass
class CampaignResult:
    total: int
    success: int


class FCMBatchSender:
    """
    Efficient batch sender for FCM notifications.

    Key optimizations:
    1. Group tokens into batches of 500 (FCM limit)
    2. Send batches in parallel (respecting connection limits)
    3. Process results and handle errors per-token
    """

    BATCH_SIZE = 500             # FCM maximum
    MAX_CONCURRENT_BATCHES = 10  # Limit parallel requests

    def __init__(self, token_service, metrics):
        self.tokens = token_service
        self.metrics = metrics
        self.semaphore = asyncio.Semaphore(self.MAX_CONCURRENT_BATCHES)

    async def send_to_segment(
        self,
        segment_query: Dict,
        notification: messaging.Notification,
        data: Dict[str, str],
        android_config: messaging.AndroidConfig = None
    ) -> BatchSendResult:
        """
        Send notification to all tokens matching segment query.

        Handles:
        - Streaming tokens from database (memory efficient)
        - Batching into groups of 500
        - Parallel batch sending with concurrency control
        - Per-token result processing
        """
        total_sent = 0
        total_success = 0
        total_failed = 0
        invalid_tokens = []

        # Stream tokens in batches from database
        async for token_batch in self.tokens.stream_by_segment(
            segment_query, batch_size=self.BATCH_SIZE
        ):
            # Send batch with concurrency control
            async with self.semaphore:
                result = await self._send_batch(
                    tokens=token_batch,
                    notification=notification,
                    data=data,
                    android_config=android_config
                )

            total_sent += result.total
            total_success += result.success_count
            total_failed += result.failure_count
            invalid_tokens.extend(result.invalid_tokens)

        # Clean up invalid tokens
        if invalid_tokens:
            await self.tokens.invalidate_batch(invalid_tokens)

        return BatchSendResult(
            total_sent=total_sent,
            success_count=total_success,
            failure_count=total_failed,
            invalid_token_count=len(invalid_tokens)
        )

    async def _send_batch(
        self,
        tokens: List[str],
        notification: messaging.Notification,
        data: Dict[str, str],
        android_config: messaging.AndroidConfig
    ) -> SingleBatchResult:
        """Send a single batch of up to 500 tokens."""
        # Construct multicast message
        message = messaging.MulticastMessage(
            tokens=tokens,
            notification=notification,
            data=data,
            android=android_config
        )

        start_time = time.time()

        try:
            # Send batch
            response = messaging.send_multicast(message)
            duration_ms = (time.time() - start_time) * 1000

            self.metrics.record_batch_send(
                batch_size=len(tokens),
                success_count=response.success_count,
                failure_count=response.failure_count,
                duration_ms=duration_ms
            )

            # Process individual results
            invalid_tokens = []
            for idx, send_response in enumerate(response.responses):
                if not send_response.success:
                    error = send_response.exception
                    if self._is_token_invalid_error(error):
                        invalid_tokens.append(tokens[idx])

            return SingleBatchResult(
                total=len(tokens),
                success_count=response.success_count,
                failure_count=response.failure_count,
                invalid_tokens=invalid_tokens
            )

        except Exception as e:
            self.metrics.record_batch_error(str(e))
            raise

    def _is_token_invalid_error(self, error) -> bool:
        """Check if error indicates invalid token."""
        if error is None:
            return False
        error_code = getattr(error, 'code', '')
        return error_code in ('UNREGISTERED', 'INVALID_ARGUMENT')


# Parallel batch orchestration for massive sends

class MassNotificationOrchestrator:
    """
    Orchestrates sending millions of notifications efficiently.

    Architecture:
    1. Producer: Fetches target tokens and enqueues batches
    2. Workers: Multiple workers consume and send batches
    3. Results: Aggregate results for reporting
    """

    def __init__(self, queue, sender, token_service, num_workers=20):
        self.queue = queue
        self.sender = sender
        self.tokens = token_service  # used by the producer below
        self.num_workers = num_workers

    async def send_campaign(
        self,
        campaign_id: str,
        segment_query: Dict,
        notification: Dict
    ) -> CampaignResult:
        """Execute a notification campaign to a large segment."""
        # Create workers
        workers = [
            asyncio.create_task(self._worker(campaign_id))
            for _ in range(self.num_workers)
        ]

        # Producer: enqueue batches
        batch_count = 0
        async for token_batch in self.tokens.stream_by_segment(segment_query):
            await self.queue.enqueue({
                'campaign_id': campaign_id,
                'tokens': token_batch,
                'notification': notification
            })
            batch_count += 1

        # Signal workers to stop (poison pills)
        for _ in range(self.num_workers):
            await self.queue.enqueue(None)

        # Wait for workers to complete
        results = await asyncio.gather(*workers)
        return self._aggregate_results(results)

    async def _worker(self, campaign_id: str) -> WorkerResult:
        """Worker that consumes and sends batches."""
        total = 0
        success = 0

        while True:
            batch = await self.queue.dequeue()
            if batch is None:  # Poison pill
                break

            result = await self.sender.send_batch(
                batch['tokens'],
                batch['notification']
            )
            total += result.total
            success += result.success_count

        return WorkerResult(total=total, success=success)

    def _aggregate_results(self, results: List[WorkerResult]) -> CampaignResult:
        """Sum per-worker results into a campaign-level summary."""
        return CampaignResult(
            total=sum(r.total for r in results),
            success=sum(r.success for r in results),
        )
```

Application-Level Batching:
Beyond API batching, application-level batching aggregates notification triggers before processing:
This reduces database queries (one lookup for 100 users vs. 100 separate queries) and enables deduplication (don't send 5 similar notifications in 5 seconds).
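A minimal sketch of this pattern (the class name and thresholds are illustrative): triggers accumulate per event type and are released as one batch once the buffer fills or ages out.

```python
import time
from collections import defaultdict


class TriggerAggregator:
    """Buffers notification triggers so downstream processing sees
    one batch per event type instead of one call per user."""

    def __init__(self, max_batch=100, max_age_seconds=5.0):
        self.max_batch = max_batch
        self.max_age = max_age_seconds
        self.buffers = defaultdict(list)  # event_type -> [user_id, ...]
        self.first_seen = {}              # event_type -> first buffer time

    def add(self, event_type, user_id):
        """Buffer a trigger; return a full batch if one is ready, else None."""
        buf = self.buffers[event_type]
        if not buf:
            self.first_seen[event_type] = time.monotonic()
        buf.append(user_id)
        if len(buf) >= self.max_batch:
            return self._flush(event_type)
        return None

    def flush_expired(self):
        """Flush buffers older than max_age (call periodically from a timer)."""
        now = time.monotonic()
        ready = [et for et, t in self.first_seen.items()
                 if now - t >= self.max_age]
        return {et: self._flush(et) for et in ready}

    def _flush(self, event_type):
        batch = self.buffers.pop(event_type)
        self.first_seen.pop(event_type, None)
        return batch
```

One database lookup then serves the whole returned batch, and near-duplicate triggers within the window collapse naturally.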
Throttling limits the rate at which notifications are sent. This protects both your infrastructure and the receiving platforms. Several throttling strategies are commonly used:
Rate Limiting Algorithms:
```python
# Rate limiter implementations for push notifications

import asyncio
import time
from abc import ABC, abstractmethod


class RateLimiter(ABC):
    """Abstract base for rate limiters."""

    @abstractmethod
    async def acquire(self, count: int = 1) -> float:
        """
        Acquire permission to send 'count' notifications.
        Returns seconds waited (0 if no wait needed).
        """


class TokenBucketLimiter(RateLimiter):
    """
    Token bucket rate limiter.

    Pros: Allows controlled bursts, smooth limiting
    Cons: More complex than fixed window

    Good for: push notification sending where you want to allow
    short bursts but maintain an average rate.
    """

    def __init__(
        self,
        rate: float,       # Tokens per second
        bucket_size: int,  # Maximum burst capacity
    ):
        self.rate = rate
        self.bucket_size = bucket_size
        self.tokens = float(bucket_size)  # Start full
        self.last_refill = time.monotonic()
        self.lock = asyncio.Lock()

    async def acquire(self, count: int = 1) -> float:
        async with self.lock:
            total_wait = 0.0
            while True:
                # Refill tokens based on elapsed time
                now = time.monotonic()
                elapsed = now - self.last_refill
                self.tokens = min(
                    self.bucket_size,
                    self.tokens + elapsed * self.rate
                )
                self.last_refill = now

                if self.tokens >= count:
                    self.tokens -= count
                    return total_wait

                # Need to wait for tokens to refill
                tokens_needed = count - self.tokens
                wait_time = tokens_needed / self.rate
                total_wait += wait_time
                await asyncio.sleep(wait_time)


class SlidingWindowLimiter(RateLimiter):
    """
    Sliding window rate limiter.

    Tracks request counts in sub-windows for smoother limiting
    than fixed windows.
    """

    def __init__(
        self,
        rate: int,            # Requests per window
        window_seconds: int,  # Window size
        precision: int = 10   # Sub-windows for smoothing
    ):
        self.rate = rate
        self.window_seconds = window_seconds
        self.precision = precision
        self.granularity = window_seconds / precision
        self.buckets = [0] * precision
        self.last_bucket_time = time.monotonic()
        self.current_bucket = 0
        self.lock = asyncio.Lock()

    async def acquire(self, count: int = 1) -> float:
        total_wait = 0.0
        while True:
            async with self.lock:
                self._advance_buckets()
                current_count = sum(self.buckets)
                if current_count + count <= self.rate:
                    self.buckets[self.current_bucket] += count
                    return total_wait
            # Release the (non-reentrant) asyncio.Lock before sleeping so
            # other senders can make progress, then retry
            await asyncio.sleep(self.granularity)
            total_wait += self.granularity

    def _advance_buckets(self):
        """Advance bucket position based on elapsed time."""
        now = time.monotonic()
        elapsed = now - self.last_bucket_time
        buckets_to_advance = int(elapsed / self.granularity)
        if buckets_to_advance > 0:
            # Clear old buckets
            for _ in range(min(buckets_to_advance, self.precision)):
                self.current_bucket = (self.current_bucket + 1) % self.precision
                self.buckets[self.current_bucket] = 0
            # Advance in whole-granularity steps to avoid drift
            self.last_bucket_time += buckets_to_advance * self.granularity


class AdaptiveRateLimiter(RateLimiter):
    """
    Adaptive rate limiter that adjusts based on platform feedback.

    Reduces rate when seeing 429 errors, increases when successful.
    This is particularly useful for platforms without published limits.
    """

    def __init__(
        self,
        initial_rate: float,
        min_rate: float,
        max_rate: float
    ):
        self.current_rate = initial_rate
        self.min_rate = min_rate
        self.max_rate = max_rate
        self.bucket = TokenBucketLimiter(
            rate=initial_rate,
            bucket_size=int(initial_rate)  # 1 second of burst
        )
        self.success_streak = 0
        self.lock = asyncio.Lock()

    async def acquire(self, count: int = 1) -> float:
        return await self.bucket.acquire(count)

    async def record_success(self):
        """Call after successful send."""
        async with self.lock:
            self.success_streak += 1
            # Gradually increase rate after sustained success
            if self.success_streak >= 100:
                new_rate = min(
                    self.max_rate,
                    self.current_rate * 1.1  # 10% increase
                )
                await self._update_rate(new_rate)
                self.success_streak = 0

    async def record_rate_limit_error(self):
        """Call when receiving 429 Too Many Requests."""
        async with self.lock:
            self.success_streak = 0
            # Immediately reduce rate
            new_rate = max(
                self.min_rate,
                self.current_rate * 0.5  # 50% reduction
            )
            await self._update_rate(new_rate)

    async def _update_rate(self, new_rate: float):
        self.current_rate = new_rate
        self.bucket = TokenBucketLimiter(
            rate=new_rate,
            bucket_size=int(new_rate)
        )
```

In multi-instance deployments, rate limits must be coordinated across instances. Use Redis with atomic operations (INCR, EXPIRE) or dedicated rate limiting services. Each instance sharing a Redis-backed token bucket ensures global rate compliance.
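The Redis coordination pattern can be sketched with just INCR and EXPIRE, which are atomic on the server, so any number of sender instances share one global budget. The `FakeRedis` stub and the injectable clock below are test conveniences, not part of the pattern; in production you would pass a real `redis.Redis` client.

```python
import time


class FixedWindowRedisLimiter:
    """Fixed-window limiter coordinated across instances via atomic INCR.

    `client` is anything with Redis-style incr/expire (e.g. redis.Redis).
    """

    def __init__(self, client, key_prefix: str, rate_per_second: int,
                 clock=time.time):
        self.client = client
        self.prefix = key_prefix
        self.rate = rate_per_second
        self.clock = clock  # injectable for deterministic testing

    def try_acquire(self, count: int = 1) -> bool:
        """Return True if `count` sends fit in the current 1-second window."""
        window = int(self.clock())
        key = f"{self.prefix}:{window}"
        new_total = self.client.incr(key, count)  # atomic across instances
        if new_total == count:
            # First writer sets a TTL so stale windows expire
            self.client.expire(key, 2)
        return new_total <= self.rate


class FakeRedis:
    """Minimal in-memory stand-in implementing the two commands used."""

    def __init__(self):
        self.data = {}

    def incr(self, key, amount=1):
        self.data[key] = self.data.get(key, 0) + amount
        return self.data[key]

    def expire(self, key, seconds):
        pass  # TTL behavior omitted in this stub


# Example: a shared budget of 5 sends/second, frozen at one instant
limiter = FixedWindowRedisLimiter(FakeRedis(), "fcm", 5, clock=lambda: 1000.0)
grants = [limiter.try_acquire() for _ in range(7)]
print(grants)  # first 5 allowed, the rest rejected until the next window
```

Fixed windows allow a brief burst at window boundaries; the sliding-window limiter above smooths that out at the cost of more bookkeeping.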
Not all notifications are equally important. When you have limited throughput capacity, you must decide which notifications to send first. Priority queuing ensures critical messages aren't blocked by lower-priority bulk sends.
Priority Levels:
Define clear priority tiers for your notification types:
| Priority | Examples | Characteristics |
|---|---|---|
| Critical (P0) | Security alerts, 2FA codes, order confirmations | Skip queue, send immediately, high platform priority |
| High (P1) | New messages, real-time updates, driver assignments | Priority queue, short TTL, sent before P2/P3 |
| Normal (P2) | Activity updates, reminders, recommendations | Standard queue, moderate TTL, delayed during high load |
| Low (P3) | Marketing campaigns, digests, non-urgent updates | Best-effort, aggressive batching, rate limited separately |
```python
# Priority-based notification queue system

import asyncio
import logging
import random
import time
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any

logger = logging.getLogger(__name__)


class Priority(IntEnum):
    CRITICAL = 0  # Highest priority (lowest number)
    HIGH = 1
    NORMAL = 2
    LOW = 3


@dataclass(order=True)
class PriorityNotification:
    """
    Notification wrapper for priority queue ordering.

    Priority ordering:
    1. Priority level (P0 before P1)
    2. Timestamp (earlier before later within same priority)
    """
    priority: Priority
    timestamp: float
    notification: Any = field(compare=False)


class PriorityNotificationQueue:
    """
    Multi-priority notification queue with separate rate limits
    per priority.

    Architecture:
    - Separate queues per priority level
    - Workers preferentially drain higher-priority queues
    - Starvation prevention for lower priorities
    """

    def __init__(self):
        self.queues = {
            Priority.CRITICAL: asyncio.PriorityQueue(),
            Priority.HIGH: asyncio.PriorityQueue(),
            Priority.NORMAL: asyncio.PriorityQueue(),
            Priority.LOW: asyncio.PriorityQueue(),
        }
        # Rate limits per priority (notifications per second)
        self.rate_limits = {
            Priority.CRITICAL: None,  # Unlimited
            Priority.HIGH: 1000,
            Priority.NORMAL: 500,
            Priority.LOW: 100,
        }
        # Starvation prevention: minimum service rate for each priority
        self.min_service_rate = {
            Priority.CRITICAL: 1.0,  # Always process if available
            Priority.HIGH: 0.8,      # 80% of capacity
            Priority.NORMAL: 0.5,    # 50% of capacity
            Priority.LOW: 0.1,       # 10% of capacity minimum
        }

    async def enqueue(
        self,
        notification: dict,
        priority: Priority = Priority.NORMAL
    ):
        """Add notification to appropriate priority queue."""
        item = PriorityNotification(
            priority=priority,
            timestamp=time.time(),
            notification=notification
        )
        await self.queues[priority].put(item)

    async def dequeue(self) -> PriorityNotification | None:
        """
        Get next notification respecting priority order.

        Implements weighted fair queuing to prevent starvation:
        - Higher priorities served more frequently
        - Lower priorities guaranteed minimum service
        """
        # Check queues in priority order
        for priority in Priority:
            queue = self.queues[priority]
            if not queue.empty():
                # For critical, always dequeue immediately
                if priority == Priority.CRITICAL:
                    return await queue.get()
                # For others, apply weighted selection
                if self._should_service(priority):
                    return await queue.get()

        # All queues empty, or weighted selection chose to wait:
        # fall back to any available work
        for priority in Priority:
            if not self.queues[priority].empty():
                return await self.queues[priority].get()

        return None

    def _should_service(self, priority: Priority) -> bool:
        """
        Determine if this priority should be serviced based on
        weighted fair queuing to prevent starvation.
        """
        # Simple randomized weighting; a token bucket per priority
        # would give tighter guarantees
        return random.random() < (1.0 / (priority + 1))

    def get_queue_depths(self) -> dict:
        """Get current queue depths for monitoring."""
        return {
            priority.name: self.queues[priority].qsize()
            for priority in Priority
        }


class PriorityAwareWorkerPool:
    """Worker pool that processes notifications respecting priorities."""

    def __init__(
        self,
        queue: PriorityNotificationQueue,
        sender,
        num_workers: int = 10
    ):
        self.queue = queue
        self.sender = sender
        self.num_workers = num_workers
        self.workers = []
        self.running = False

    async def start(self):
        """Start all workers."""
        self.running = True
        self.workers = [
            asyncio.create_task(self._worker(i))
            for i in range(self.num_workers)
        ]

    async def stop(self):
        """Gracefully stop all workers."""
        self.running = False
        await asyncio.gather(*self.workers, return_exceptions=True)

    async def _worker(self, worker_id: int):
        """Individual worker processing loop."""
        while self.running:
            item = await self.queue.dequeue()
            if item is None:
                await asyncio.sleep(0.1)  # Brief pause when empty
                continue
            try:
                await self.sender.send(
                    item.notification,
                    platform_priority='high' if item.priority <= Priority.HIGH else 'normal'
                )
            except Exception as e:
                # Log and continue - don't let one failure stop the worker
                logger.error(f"Worker {worker_id} send error: {e}")
```

Beware of priority inflation: teams marking everything as 'critical.' Establish clear criteria for each priority level and enforce them. If everything is critical, nothing is. Regular audits of notification priorities help maintain system effectiveness.
Beyond system-level throttling, you should implement user-level throttling to prevent notification fatigue. Bombarding users with too many notifications leads to permission revocation and app uninstalls.
User Experience Throttling:
Per-User Rate Limits
Time-Based Throttling
Content Deduplication
```python
# User-level notification throttling

from dataclasses import dataclass
from datetime import datetime, timedelta

import pytz


@dataclass
class ThrottleDecision:
    """Outcome of a throttle check (defined here for completeness)."""
    allowed: bool
    reason: str | None = None
    details: str | None = None
    defer_until: datetime | None = None


class UserNotificationThrottler:
    """
    Throttles notifications per user to prevent fatigue.

    Rules:
    1. Maximum notifications per hour/day
    2. Quiet hours based on user timezone
    3. Deduplication of similar notifications
    4. Category-specific limits
    """

    DEFAULT_LIMITS = {
        'hourly_max': 10,
        'daily_max': 50,
        'quiet_hours_start': 22,  # 10 PM
        'quiet_hours_end': 8,     # 8 AM
    }

    CATEGORY_LIMITS = {
        'transactional': {'hourly_max': 20, 'daily_max': 100},  # Higher for transactions
        'messaging': {'hourly_max': 50, 'daily_max': 200},      # Chat apps need more
        'marketing': {'hourly_max': 2, 'daily_max': 5},         # Very restrictive
        'social': {'hourly_max': 10, 'daily_max': 30},
    }

    def __init__(self, cache, user_preferences):
        self.cache = cache  # Redis or similar
        self.preferences = user_preferences

    async def should_send(
        self,
        user_id: str,
        notification_type: str,
        category: str = 'default',
        content_hash: str = None
    ) -> ThrottleDecision:
        """
        Determine if notification should be sent to user.

        Returns ThrottleDecision with:
        - allowed: bool
        - reason: str if not allowed
        - defer_until: datetime if the send should be deferred
        """
        # Get user preferences
        user_prefs = await self.preferences.get(user_id)

        # Check 1: Deduplication
        if content_hash:
            if await self._is_duplicate(user_id, content_hash):
                return ThrottleDecision(
                    allowed=False,
                    reason="duplicate_notification",
                    details="Similar notification sent recently"
                )

        # Check 2: Quiet hours
        quiet_check = await self._check_quiet_hours(user_id, user_prefs)
        if not quiet_check.allowed:
            return quiet_check

        # Check 3: Rate limits
        limits = self._get_limits(category, user_prefs)

        # Check hourly limit
        hourly_key = f"notif_count:{user_id}:hourly:{self._hour_bucket()}"
        hourly_count = int(await self.cache.get(hourly_key) or 0)
        if hourly_count >= limits['hourly_max']:
            return ThrottleDecision(
                allowed=False,
                reason="hourly_limit_exceeded",
                details=f"Sent {hourly_count}/{limits['hourly_max']} this hour"
            )

        # Check daily limit
        daily_key = f"notif_count:{user_id}:daily:{self._day_bucket()}"
        daily_count = int(await self.cache.get(daily_key) or 0)
        if daily_count >= limits['daily_max']:
            return ThrottleDecision(
                allowed=False,
                reason="daily_limit_exceeded",
                details=f"Sent {daily_count}/{limits['daily_max']} today"
            )

        # All checks passed - record this notification
        await self._record_notification(user_id, content_hash)
        return ThrottleDecision(allowed=True)

    async def _check_quiet_hours(
        self,
        user_id: str,
        user_prefs: dict
    ) -> ThrottleDecision:
        """Check if current time is within user's quiet hours."""
        # Get user timezone
        tz_name = user_prefs.get('timezone', 'UTC')
        try:
            tz = pytz.timezone(tz_name)
        except pytz.UnknownTimeZoneError:
            tz = pytz.UTC

        user_time = datetime.now(tz)
        hour = user_time.hour
        quiet_start = user_prefs.get('quiet_hours_start', 22)
        quiet_end = user_prefs.get('quiet_hours_end', 8)

        # Handle overnight quiet hours (e.g., 22:00 - 08:00)
        if quiet_start > quiet_end:
            is_quiet = hour >= quiet_start or hour < quiet_end
        else:
            is_quiet = quiet_start <= hour < quiet_end

        if is_quiet:
            # Calculate when quiet hours end
            quiet_end_time = user_time.replace(
                hour=quiet_end, minute=0, second=0, microsecond=0
            )
            if quiet_start > quiet_end and hour >= quiet_start:
                # Overnight window and it's still evening: quiet ends tomorrow
                quiet_end_time += timedelta(days=1)

            return ThrottleDecision(
                allowed=False,
                reason="quiet_hours",
                defer_until=quiet_end_time,
                details=f"User quiet hours: {quiet_start}:00 - {quiet_end}:00"
            )

        return ThrottleDecision(allowed=True)

    async def _is_duplicate(
        self,
        user_id: str,
        content_hash: str,
        window_minutes: int = 60
    ) -> bool:
        """Check if similar notification was sent recently."""
        key = f"notif_content:{user_id}:{content_hash}"
        exists = await self.cache.get(key)
        return exists is not None

    async def _record_notification(
        self,
        user_id: str,
        content_hash: str | None
    ):
        """Record notification for rate limiting and deduplication."""
        # Increment counters
        hourly_key = f"notif_count:{user_id}:hourly:{self._hour_bucket()}"
        daily_key = f"notif_count:{user_id}:daily:{self._day_bucket()}"
        await self.cache.incr(hourly_key)
        await self.cache.expire(hourly_key, 3600)   # 1 hour TTL
        await self.cache.incr(daily_key)
        await self.cache.expire(daily_key, 86400)   # 24 hour TTL

        # Record content hash for deduplication
        if content_hash:
            content_key = f"notif_content:{user_id}:{content_hash}"
            await self.cache.set(content_key, 1, ex=3600)  # 1 hour dedup window

    def _get_limits(self, category: str, user_prefs: dict) -> dict:
        """Merge default, category, and (optionally) per-user limits."""
        limits = dict(self.DEFAULT_LIMITS)
        limits.update(self.CATEGORY_LIMITS.get(category, {}))
        limits.update(user_prefs.get('limits', {}))  # per-user overrides, if any
        return limits

    @staticmethod
    def _hour_bucket() -> str:
        """UTC hour bucket for counter keys, e.g. '2024051613'."""
        return datetime.utcnow().strftime('%Y%m%d%H')

    @staticmethod
    def _day_bucket() -> str:
        """UTC day bucket for counter keys, e.g. '20240516'."""
        return datetime.utcnow().strftime('%Y%m%d')
```

Instead of sending 10 'new follower' notifications, send one '10 new followers' notification. This dramatically improves user experience while maintaining engagement. Build aggregation logic for common notification patterns.
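A minimal sketch of that aggregation step (the dict shape and the `new_follower` type are illustrative assumptions):

```python
def summarize_pending(pending):
    """Collapse pending notifications of one type into a single summary.

    `pending` is a list of dicts with 'type' and 'actor' keys
    (an assumed shape, for illustration).
    """
    if not pending:
        return None
    if len(pending) == 1:
        return pending[0]  # Nothing to aggregate

    kind = pending[0]['type']
    actors = [p['actor'] for p in pending]

    if kind == 'new_follower':
        if len(actors) == 2:
            body = f"{actors[0]} and {actors[1]} followed you"
        else:
            body = f"{actors[0]} and {len(actors) - 1} others followed you"
        return {'type': kind,
                'title': f"{len(actors)} new followers",
                'body': body}

    # Generic fallback for types without a custom template
    return {'type': kind,
            'title': f"{len(actors)} new notifications",
            'body': ''}


batch = [{'type': 'new_follower', 'actor': a}
         for a in ['alice', 'bob', 'carol']]
print(summarize_pending(batch))
```

Run this after the dedup window closes, so the user sees one summary instead of a burst of near-identical alerts.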
Backpressure occurs when the rate of incoming notification requests exceeds your system's processing capacity. Without proper handling, queues grow unbounded, memory exhausts, and systems crash. Effective backpressure management is essential for system stability.
Backpressure Signals:
Backpressure Strategies:
```python
# Backpressure handling for notification systems

import logging
from enum import IntEnum

# `Priority` is the IntEnum from the priority-queue example above

logger = logging.getLogger(__name__)


class BackpressureState(IntEnum):
    """System load states, ordered from healthy to overloaded."""
    NORMAL = 0
    ELEVATED = 1
    CRITICAL = 2
    SHEDDING = 3


class BackpressureController:
    """Monitors system health and applies backpressure when needed."""

    # Thresholds for backpressure actions
    QUEUE_WARNING_THRESHOLD = 10000
    QUEUE_CRITICAL_THRESHOLD = 50000
    QUEUE_SHED_THRESHOLD = 100000
    LATENCY_WARNING_MS = 1000
    LATENCY_CRITICAL_MS = 5000

    def __init__(self, queue, metrics, scaler):
        self.queue = queue
        self.metrics = metrics
        self.scaler = scaler
        self.current_state = BackpressureState.NORMAL

    async def check_and_apply(self) -> BackpressureState:
        """
        Evaluate system state and apply appropriate backpressure.
        Called periodically by the health check loop.
        """
        queue_depth = self.queue.get_total_depth()
        avg_latency = await self.metrics.get_avg_send_latency()
        error_rate = await self.metrics.get_recent_error_rate()

        # Determine new state
        new_state = self._evaluate_state(queue_depth, avg_latency, error_rate)

        if new_state != self.current_state:
            await self._transition(self.current_state, new_state)
            self.current_state = new_state

        return new_state

    def _evaluate_state(
        self,
        queue_depth: int,
        latency_ms: float,
        error_rate: float
    ) -> BackpressureState:
        """Determine appropriate state based on metrics."""
        if queue_depth > self.QUEUE_SHED_THRESHOLD:
            return BackpressureState.SHEDDING

        if (queue_depth > self.QUEUE_CRITICAL_THRESHOLD
                or latency_ms > self.LATENCY_CRITICAL_MS
                or error_rate > 0.1):
            return BackpressureState.CRITICAL

        if (queue_depth > self.QUEUE_WARNING_THRESHOLD
                or latency_ms > self.LATENCY_WARNING_MS
                or error_rate > 0.05):
            return BackpressureState.ELEVATED

        return BackpressureState.NORMAL

    async def _transition(
        self,
        from_state: BackpressureState,
        to_state: BackpressureState
    ):
        """Handle state transition with appropriate actions."""
        logger.warning(f"Backpressure state: {from_state} -> {to_state}")
        self.metrics.record_state_transition(from_state, to_state)

        if to_state == BackpressureState.ELEVATED:
            # Start scaling up workers
            await self.scaler.scale_up(factor=1.5)

        elif to_state == BackpressureState.CRITICAL:
            # Aggressive scaling, start dropping low-priority
            await self.scaler.scale_up(factor=2.0)
            await self._enable_priority_filtering(min_priority=Priority.NORMAL)

        elif to_state == BackpressureState.SHEDDING:
            # Maximum scaling, only critical notifications
            await self.scaler.scale_to_max()
            await self._enable_priority_filtering(min_priority=Priority.HIGH)
            await self._alert_operations("Load shedding activated")

        elif (to_state == BackpressureState.NORMAL
                and from_state != BackpressureState.NORMAL):
            # Recovery - gradually restore
            await self._disable_priority_filtering()
            await self.scaler.scale_to_baseline()

    async def _enable_priority_filtering(self, min_priority):
        """Enable dropping of notifications below minimum priority."""
        self.queue.set_min_priority(min_priority)
        logger.warning(f"Priority filtering enabled: min={min_priority}")

    async def _disable_priority_filtering(self):
        """Resume accepting all priority levels."""
        self.queue.set_min_priority(Priority.LOW)
        logger.info("Priority filtering disabled")

    async def should_accept(self, priority) -> bool:
        """
        Called by the API layer to determine if a new notification
        should be accepted. Returns False if it should be rejected
        due to backpressure.
        """
        if self.current_state == BackpressureState.NORMAL:
            return True
        if self.current_state == BackpressureState.ELEVATED:
            return True  # Accept all, but there may be delays
        if self.current_state == BackpressureState.CRITICAL:
            return priority <= Priority.NORMAL
        if self.current_state == BackpressureState.SHEDDING:
            return priority <= Priority.HIGH
        return True
```

Always return meaningful errors when shedding load. HTTP 429 Too Many Requests with a Retry-After header tells clients to back off. Never silently drop critical notifications: if you must reject them, make it explicit so the calling system can handle the failure appropriately.
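Concretely, the rejection path can hand the web layer everything it needs for an explicit 429. This is a framework-agnostic sketch; the tuple shape and field names are illustrative:

```python
def build_shed_response(state: str, retry_after_seconds: int = 30):
    """Build an explicit HTTP 429 for a notification rejected by backpressure.

    Returns (status_code, headers, body) for whatever web framework
    is in use to serialize.
    """
    return (
        429,
        {
            'Retry-After': str(retry_after_seconds),  # standard back-off hint
            'Content-Type': 'application/json',
        },
        {
            'error': 'rate_limited',
            'reason': f'load shedding active (state={state})',
            'retry_after_seconds': retry_after_seconds,
        },
    )


status, headers, body = build_shed_response('SHEDDING')
print(status, headers['Retry-After'])
```

Well-behaved clients honor Retry-After and re-enqueue the notification rather than retrying in a tight loop.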
Batching and throttling parameters require ongoing tuning based on real-world behavior. Effective monitoring is essential for identifying optimization opportunities and detecting problems.
Key Metrics to Monitor:
| Metric | Healthy Range | Action if Abnormal |
|---|---|---|
| Queue Depth | < 10,000 | Scale workers, investigate bottleneck |
| Queue Wait Time | < 30 seconds | Increase throughput, check rate limits |
| Batch Fill Rate | > 80% of batches full | Adjust batch timeout if too many partial batches |
| Rate Limit Hits (429s) | < 1% | Reduce rate, check platform quotas |
| Worker Utilization | 60-80% | Scale up if > 80%, down if < 40% |
| Shed Notifications | 0 ideally | Investigate if shedding regularly |
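As an example of computing one of these metrics, batch fill rate falls out directly from recent batch sizes (the 500 cap matches the FCM batch limit; the helper name is illustrative):

```python
def batch_fill_rate(recent_batch_sizes, max_batch: int = 500) -> float:
    """Fraction of batch capacity actually used across recent sends.

    A value well below 0.8 suggests the batch timeout is flushing
    too many partial batches.
    """
    if not recent_batch_sizes:
        return 0.0
    return sum(recent_batch_sizes) / (len(recent_batch_sizes) * max_batch)


# Two full batches and one half-empty one:
rate = batch_fill_rate([500, 500, 250])
print(f"fill rate: {rate:.2%}")  # 83.33%
```

Export a rolling version of this as a gauge so the "adjust batch timeout" action in the table has a number to trigger on.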
Tuning Guidelines:
Batch Size Tuning: Start at the platform maximum (500 for FCM) and shrink only if large payloads or timeouts become a problem; smaller batches reduce per-request latency but waste throughput.
Rate Limit Tuning: Start conservatively below any documented limit and let an adaptive limiter discover sustainable throughput from 429 feedback.
Worker Count Tuning: Size the pool from target throughput divided by per-worker throughput, then adjust to keep utilization in the 60-80% band.
Regularly load test your notification system in a staging environment. Simulate campaign sends, viral content notifications, and traffic spikes. Identify breaking points before they occur in production. Document the maximum sustainable throughput for capacity planning.
Batching and throttling transform push notification systems from fragile single-message senders into robust high-throughput platforms. The essential takeaways: batch for efficiency, throttle to respect both platform and user limits, prioritize so critical messages are never blocked by bulk sends, and shed load explicitly when demand exceeds capacity.
Module Complete:
You've now completed the Push Notifications module. You understand mobile push architecture, the specifics of APNs and FCM, web push protocols, delivery mechanics, and the operational techniques for sending at scale. Congratulations: you're now equipped to design, implement, and operate push notification infrastructure serving millions of users reliably and efficiently.