In a large-scale e-commerce platform, the orders topic might see 100,000 events per second. The fraud detection service only cares about high-value orders (>$500). The European compliance system only needs orders from EU customers. The mobile app notification service only wants orders with express shipping.
Without filtering, each service receives all 100,000 events/second, discards 85 to 95% of them, and wastes enormous resources on messages it will never use. With filtering, each service receives precisely the subset it needs: 5,000 events/second for fraud detection, 15,000 for EU compliance, and 8,000 for express notifications.
This page explores the mechanics of message filtering: how to design filterable events, configure subscription filters, implement complex routing logic, and balance the trade-offs between filtering granularity and system complexity.
By the end of this page, you will understand attribute-based filtering, content-based routing, filter expression languages, platform-specific filtering capabilities, and best practices for designing filterable event schemas.
Filtering isn't just an optimization; it's a fundamental design concern that affects system architecture, cost, and maintainability. Consider the fraud detection service from the opening example (figures are illustrative):
| Metric | Without Filtering | With Filtering | Improvement |
|---|---|---|---|
| Messages received | 100,000/sec | 5,000/sec | 95% reduction |
| Network bandwidth | 200 MB/sec | 10 MB/sec | 95% reduction |
| Consumer instances | 50 (to handle load) | 3 (actual work) | 94% fewer instances |
| Monthly cloud cost | $15,000 | $750 | $14,250 savings |
| Consumer code complexity | Filter logic in every handler | Clean business logic only | Simpler, fewer bugs |
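The first rows of the table follow from simple rate arithmetic. The sketch below reproduces them for one consumer. `estimateConsumerLoad` is a hypothetical helper, and the 2,000-byte average message size is an assumption inferred from the table (200 MB/sec ÷ 100,000 messages/sec); instance counts and cloud cost depend on pricing and sizing, which it doesn't model.

```typescript
// Hypothetical helper: per-consumer load with and without broker-side filtering.
// Bandwidth uses decimal megabytes (1 MB = 1,000,000 bytes).
interface ConsumerLoad {
  receivedPerSec: number;
  bandwidthMBPerSec: number;
}

function loadFor(messagesPerSec: number, avgMessageBytes: number): ConsumerLoad {
  return {
    receivedPerSec: messagesPerSec,
    bandwidthMBPerSec: (messagesPerSec * avgMessageBytes) / 1_000_000,
  };
}

function estimateConsumerLoad(
  topicRatePerSec: number,    // everything published to the topic
  matchingRatePerSec: number, // the subset this consumer actually needs
  avgMessageBytes: number,    // assumed average message size
) {
  return {
    unfiltered: loadFor(topicRatePerSec, avgMessageBytes),
    filtered: loadFor(matchingRatePerSec, avgMessageBytes),
    // Multiply before dividing so whole-number inputs like these give exact results
    reductionPct: (100 * (topicRatePerSec - matchingRatePerSec)) / topicRatePerSec,
  };
}

// Fraud detection: 100,000 events/sec on the topic, 5,000/sec above $500
const fraud = estimateConsumerLoad(100_000, 5_000, 2_000);
console.log(fraud.unfiltered.bandwidthMBPerSec); // 200
console.log(fraud.filtered.bandwidthMBPerSec);   // 10
console.log(fraud.reductionPct);                 // 95
```

The same call with 15,000 matching events/sec gives an 85% reduction for the EU compliance service.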
Decide on filtering strategy EARLY in system design. Retrofitting filtering onto an existing system is painful—it requires publisher changes, event schema changes, and consumer migrations. Building filterability into events from the start is dramatically easier.
Filtering can occur at different points in the message flow, each with distinct trade-offs:
Publisher-Side Filtering: Publisher sends to different topics based on event attributes.
How it works:
```typescript
if (order.amount > 500) {
  publish('orders.high-value', event);
} else {
  publish('orders.standard', event);
}
```
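Pros: No broker filtering support required; consumers subscribe to exactly the topic they need and never receive events they would discard
Cons: Publishers must know every consumer's filtering needs (tight coupling); each new filter dimension means publisher changes and new topics
When to use: A few stable, coarse-grained dimensions (such as value tier or region) that are known at design time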
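Broker-side filtering moves the decision into the messaging system. Its most common form is attribute-based filtering: messages carry attributes (also called headers or properties) that subscription filters match against. In this model the message payload is not inspected, only the attribute key-value pairs.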
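Why Attributes Instead of Payload?
Attributes are small, flat key-value maps that the broker can evaluate without deserializing (or decrypting) payloads in arbitrary formats such as JSON, Avro, or Protobuf. Filtering on them keeps matching cheap, lets the payload stay opaque to the broker, and makes the filterable fields an explicit part of the event contract.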
```typescript
// Designing filterable events with attributes
interface FilterableMessage<T> {
  // Payload: the actual event data (not used for filtering)
  data: T;

  // Attributes: metadata for filtering (inspectable by the broker)
  attributes: {
    // Event classification
    eventType: string;    // 'ORDER_CREATED', 'ORDER_SHIPPED', etc.
    eventVersion: string; // 'v1.2' - for schema versioning

    // Domain categorization
    domain: string;       // 'orders', 'inventory', 'payments'
    aggregate: string;    // 'order', 'product', 'customer'
    aggregateId: string;  // Specific entity ID (for tracing; too high-cardinality to filter on)

    // Common filter dimensions
    region: string;       // 'us-east', 'eu-west', 'apac'
    environment: string;  // 'production', 'staging', 'development'
    priority: string;     // 'high', 'normal', 'low'

    // Business-specific dimensions
    amountTier?: string;   // 'high-value', 'standard', 'micro'
    customerTier?: string; // 'enterprise', 'premium', 'free'
    channel?: string;      // 'web', 'mobile', 'api'
  };

  // System metadata (not for filtering, but useful)
  messageId: string;
  timestamp: string;
  correlationId?: string;
}

// Hypothetical application types and helpers, assumed to be defined elsewhere
interface Order {
  id: string;
  customerId: string;
  items: unknown[];
  totalAmount: number;
  isExpress: boolean;
  shippingAddress: { region: string };
  customer: { tier: string };
  sourceChannel: string;
  sessionId?: string;
}
interface OrderCreatedPayload {
  orderId: string;
  customerId: string;
  items: unknown[];
  totalAmount: number;
}
declare const broker: { publish(topic: string, message: unknown): Promise<void> };
declare function generateId(): string;

// Example: publishing with filter-friendly attributes
async function publishOrderCreated(order: Order): Promise<void> {
  const message: FilterableMessage<OrderCreatedPayload> = {
    data: {
      orderId: order.id,
      customerId: order.customerId,
      items: order.items,
      totalAmount: order.totalAmount,
      // ... full payload
    },
    attributes: {
      eventType: 'ORDER_CREATED',
      eventVersion: 'v2.1',
      domain: 'orders',
      aggregate: 'order',
      aggregateId: order.id,
      region: order.shippingAddress.region,
      environment: process.env.ENVIRONMENT || 'production',
      priority: order.isExpress ? 'high' : 'normal',
      // Bucket the numeric amount into a categorical, string-valued attribute
      amountTier:
        order.totalAmount > 1000 ? 'high-value'
        : order.totalAmount > 100 ? 'standard'
        : 'micro',
      customerTier: order.customer.tier,
      channel: order.sourceChannel,
    },
    messageId: generateId(),
    timestamp: new Date().toISOString(),
    correlationId: order.sessionId,
  };

  await broker.publish('orders', message);
}
```

Avoid filtering on high-cardinality attributes (e.g., customerId with millions of values). Each such filter matches only a sliver of traffic, so covering many values means many subscriptions (quickly approaching per-topic limits) and more filter evaluations per message. Use categorical dimensions (region: 5 values, tier: 4 values) rather than unique identifiers.
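To make the attribute/payload split concrete, here is a minimal in-memory sketch of how a broker might match subscription filters against attributes alone. `AttributeRouter` and its semantics (AND across keys, OR across the listed values, missing attribute means no match) are assumptions for illustration, loosely modeled on SNS filter policies, not any broker's actual implementation.

```typescript
// Minimal in-memory sketch of broker-side attribute matching (hypothetical).
type Attributes = Record<string, string>;
type AttributeFilter = Record<string, string[]>; // key -> allowed values

interface Envelope<T> {
  data: T;                // opaque to the matcher; never read
  attributes: Attributes; // the only thing filters look at
}

// Every key must match (AND); a key matches if the value is in its list (OR).
function matches(filter: AttributeFilter, attributes: Attributes): boolean {
  return Object.entries(filter).every(([key, allowed]) => {
    const value = attributes[key];
    return value !== undefined && allowed.includes(value); // missing => no match
  });
}

class AttributeRouter<T> {
  private subscriptions = new Map<string, AttributeFilter>();

  subscribe(name: string, filter: AttributeFilter): void {
    this.subscriptions.set(name, filter);
  }

  // Names of the subscriptions that would receive this message
  deliver(message: Envelope<T>): string[] {
    const recipients: string[] = [];
    this.subscriptions.forEach((filter, name) => {
      if (matches(filter, message.attributes)) recipients.push(name);
    });
    return recipients;
  }
}

const router = new AttributeRouter<{ orderId: string }>();
router.subscribe('fraud-detection', { amountTier: ['high-value'] });
router.subscribe('eu-compliance', { region: ['eu-west'] });
router.subscribe('express-notifications', { priority: ['high'], channel: ['mobile'] });

const recipients = router.deliver({
  data: { orderId: 'o-123' },
  attributes: { region: 'eu-west', priority: 'high', amountTier: 'standard', channel: 'web' },
});
console.log(recipients); // [ 'eu-compliance' ]
```

Because the matcher never touches `data`, the payload could be encrypted or in any serialization format without affecting delivery.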
Different messaging platforms support different filter expression syntaxes. Understanding your platform's capabilities is essential for effective filter design.
Google Pub/Sub Filter Syntax
Google Pub/Sub uses a SQL-like expression language:
```
-- Equality
attributes.eventType = "ORDER_CREATED"

-- Inequality (there are no <, >, or numeric comparisons)
attributes.priority != "low"

-- Logical AND
attributes.eventType = "ORDER_CREATED" AND attributes.region = "us-east"

-- Logical OR
attributes.eventType = "ORDER_CREATED" OR attributes.eventType = "ORDER_UPDATED"

-- NOT
NOT attributes.environment = "development"

-- Check attribute existence (the ":" operator)
attributes:customAttribute

-- Prefix matching
hasPrefix(attributes.region, "us-")
```
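Limitations: Pub/Sub filters see only message attributes, never the payload. Only string equality, inequality, existence, and prefix checks are available, so there are no numeric or range comparisons. AND and OR can't be mixed in one expression without parentheses, and a subscription's filter can't be changed after the subscription is created.
Capabilities differ considerably across platforms: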
| Feature | Google Pub/Sub | AWS SNS | RabbitMQ | Azure Event Grid |
|---|---|---|---|---|
| Filter on attributes/headers | ✅ Yes | ✅ Yes | ✅ (routing key or headers exchange) | ✅ Yes |
| Filter on payload/body | ❌ No | ✅ Yes (with scope) | ❌ No | ✅ Yes |
| Numeric comparisons | ❌ No | ✅ Yes | ❌ No | ✅ Yes |
| Wildcard/prefix matching | ✅ hasPrefix() | ✅ prefix | ✅ * and # | ✅ BeginsWith/Contains |
| Logical AND | ✅ Yes | ✅ (implicit) | ✅ (headers exchange, x-match: all) | ✅ (filter array) |
| Logical OR | ✅ Yes | ✅ (value array) | ✅ (multiple bindings or x-match: any) | ✅ (StringIn) |
| Dynamic filter updates | ❌ No (create a new subscription) | ✅ Yes | ✅ (rebind) | ✅ Yes |
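As a rough illustration of how one logical subscription looks on each platform, the sketch below renders a single equality-only filter ("ORDER_CREATED events from eu-west") in each native shape: a Pub/Sub filter string, an SNS filter policy, Event Grid `advancedFilters` entries, and RabbitMQ headers-exchange binding arguments. The helper names are hypothetical, the Event Grid `data.` key prefix assumes the fields live in the event payload, and the code does no escaping, validation, or SDK calls.

```typescript
// Hypothetical helpers: one logical filter ("all of these attributes equal
// these values") rendered in each platform's native shape. No escaping is done.
type Criteria = Record<string, string>;

// Google Pub/Sub: a filter string over message attributes
function toPubSubFilter(criteria: Criteria): string {
  return Object.entries(criteria)
    .map(([key, value]) => `attributes.${key} = "${value}"`)
    .join(' AND ');
}

// AWS SNS: a filter policy; keys are ANDed, values within each array are ORed
function toSnsFilterPolicy(criteria: Criteria): Record<string, string[]> {
  const policy: Record<string, string[]> = {};
  for (const [key, value] of Object.entries(criteria)) policy[key] = [value];
  return policy;
}

// Azure Event Grid: advanced filters (all entries are ANDed).
// Assumes the fields are inside the event's `data` object.
interface EventGridAdvancedFilter {
  operatorType: string;
  key: string;
  values: string[];
}

function toEventGridAdvancedFilters(criteria: Criteria): EventGridAdvancedFilter[] {
  return Object.entries(criteria).map(([key, value]) => ({
    operatorType: 'StringIn',
    key: `data.${key}`,
    values: [value],
  }));
}

// RabbitMQ headers exchange: binding arguments; 'x-match': 'all' means AND
function toRabbitHeadersBinding(criteria: Criteria): Record<string, string> {
  return { 'x-match': 'all', ...criteria };
}

const criteria: Criteria = { eventType: 'ORDER_CREATED', region: 'eu-west' };
console.log(toPubSubFilter(criteria));
// attributes.eventType = "ORDER_CREATED" AND attributes.region = "eu-west"
console.log(JSON.stringify(toSnsFilterPolicy(criteria)));
// {"eventType":["ORDER_CREATED"],"region":["eu-west"]}
```

SNS value arrays and Event Grid's `StringIn` already accept several values, so an OR within one attribute is a one-element change there, whereas the Pub/Sub string needs a parenthesized OR chain.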
When broker-side attribute filtering isn't sufficient, content-based routing inspects message payloads to make routing decisions. This is typically implemented with a dedicated routing service rather than built-in broker features.
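When to Use Content-Based Routing: when routing depends on payload fields the publisher doesn't expose as attributes, on conditions the broker's filter language can't express (such as "any line item is hazardous"), when messages need transformation or enrichment before delivery, or when the broker has no filtering at all (as with Kafka).
Implementation Patterns: a common pattern is a dedicated router service that consumes the source topic, evaluates an ordered list of rules against the deserialized payload, and republishes the message to every matching destination topic: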
```typescript
// Content-Based Router Implementation

// Hypothetical infrastructure interfaces, assumed to be provided elsewhere
interface Message {
  data: string; // JSON-encoded event payload
  attributes: Record<string, string>;
  ack(): Promise<void>;
}

interface Broker {
  publish(topic: string, data: string, attributes: Record<string, string>): Promise<void>;
}

interface RouterMetrics {
  ruleEvaluationError(ruleName: string, error: unknown): void;
}

interface RoutingRule {
  name: string;
  condition: (event: any) => boolean;
  destinations: string[];
  priority: number;   // Lower = higher priority
  exclusive: boolean; // If true, stop after this rule matches
}

class ContentBasedRouter {
  private readonly rules: RoutingRule[];

  constructor(
    rulesConfig: RoutingRule[],
    private readonly broker: Broker,
    private readonly metrics: RouterMetrics,
  ) {
    // Sort a copy by priority so the caller's array isn't mutated
    this.rules = [...rulesConfig].sort((a, b) => a.priority - b.priority);
  }

  async route(message: Message): Promise<void> {
    const event = JSON.parse(message.data);
    const destinations = new Set<string>();

    for (const rule of this.rules) {
      try {
        if (rule.condition(event)) {
          rule.destinations.forEach((d) => destinations.add(d));
          if (rule.exclusive) {
            break; // Stop evaluating lower-priority rules
          }
        }
      } catch (error) {
        // Rule evaluation error: record it and continue with the next rule
        this.metrics.ruleEvaluationError(rule.name, error);
      }
    }

    // Default destination if no rules matched
    if (destinations.size === 0) {
      destinations.add('default-handler');
    }

    // Publish to all matched destinations; ack only after every publish succeeds
    await Promise.all(
      Array.from(destinations).map((dest) =>
        this.broker.publish(dest, message.data, message.attributes),
      ),
    );

    await message.ack();
  }
}

// Example routing rules
const orderRoutingRules: RoutingRule[] = [
  {
    name: 'hazmat-detection',
    condition: (event) =>
      event.items.some((item: any) => item.categories?.includes('hazardous')),
    destinations: ['orders.hazmat'],
    priority: 10,
    exclusive: false, // Hazmat orders may also match other rules
  },
  {
    name: 'vip-customer',
    condition: (event) =>
      event.customer.tier === 'enterprise' || event.customer.lifetimeValue > 100000,
    destinations: ['orders.vip', 'orders.analytics.vip'],
    priority: 20,
    exclusive: false,
  },
  {
    name: 'bulk-order',
    condition: (event) => event.items.length > 10 || event.totalQuantity > 100,
    destinations: ['orders.bulk'],
    priority: 30,
    exclusive: true, // Bulk orders get special handling: skip the remaining lower-priority rules
  },
  {
    name: 'express-shipping',
    condition: (event) =>
      event.shipping.method === 'express' || event.shipping.method === 'same-day',
    destinations: ['orders.priority'],
    priority: 40,
    exclusive: false,
  },
];
```

Content-based routing is a classic Enterprise Integration Pattern (EIP). Tools like Apache Camel, Spring Integration, and MuleSoft provide declarative routing DSLs. For complex routing needs, consider these frameworks rather than building from scratch.
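Separating rule evaluation from publishing makes the priority and `exclusive` semantics easy to unit-test without a broker. The sketch below is a hypothetical pure-function variant of the router's rule loop, with a simplified `OrderSummary` shape assumed for illustration. It shows that an exclusive rule only stops *lower-priority* rules; matches from higher-priority rules are kept.

```typescript
// Hypothetical, broker-free version of the router's rule-evaluation step
interface Rule<E> {
  name: string;
  condition: (event: E) => boolean;
  destinations: string[];
  priority: number;   // lower number = evaluated earlier
  exclusive: boolean; // stop evaluating lower-priority rules after a match
}

function resolveDestinations<E>(rules: Rule<E>[], event: E, fallback = 'default-handler'): string[] {
  const ordered = [...rules].sort((a, b) => a.priority - b.priority);
  const destinations = new Set<string>();
  for (const rule of ordered) {
    let matched = false;
    try {
      matched = rule.condition(event);
    } catch {
      matched = false; // a throwing condition counts as "no match"
    }
    if (!matched) continue;
    rule.destinations.forEach((d) => destinations.add(d));
    if (rule.exclusive) break;
  }
  return destinations.size > 0 ? Array.from(destinations) : [fallback];
}

// Simplified event shape, assumed for this example
interface OrderSummary {
  tier: string;
  itemCount: number;
  shipping: string;
}

const rules: Rule<OrderSummary>[] = [
  { name: 'express', condition: (o) => o.shipping === 'express', destinations: ['orders.priority'], priority: 40, exclusive: false },
  { name: 'vip', condition: (o) => o.tier === 'enterprise', destinations: ['orders.vip'], priority: 20, exclusive: false },
  { name: 'bulk', condition: (o) => o.itemCount > 10, destinations: ['orders.bulk'], priority: 30, exclusive: true },
];

// VIP (20) and bulk (30) match; bulk is exclusive, so express (40) is never evaluated
console.log(resolveDestinations(rules, { tier: 'enterprise', itemCount: 12, shipping: 'express' }));
// [ 'orders.vip', 'orders.bulk' ]
console.log(resolveDestinations(rules, { tier: 'free', itemCount: 1, shipping: 'standard' }));
// [ 'default-handler' ]
```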
Real-world filtering often requires multiple independent dimensions. A consumer might need events where (region = EU) AND (eventType = ORDER_CREATED) AND (priority = high). Designing for multi-dimensional filtering requires careful planning.
Strategy 1: Compound Filters on Single Topic
One broad topic with filters combining multiple dimensions:
```typescript
// All orders on one topic
subscribe('orders', {
  filter: `
    attributes.region = "eu-west" AND
    attributes.eventType = "ORDER_CREATED" AND
    attributes.priority = "high"
  `,
});
```
Pros: Simple topology, flexible filter changes
Cons: All consumers share one topic's throughput limits
Strategy 2: Topic-per-Dimension Hierarchy
Topics segment by primary dimension; filters handle secondary:
```
orders.eu-west   (region as topic)
  → subscribe with filter: eventType = "ORDER_CREATED"

orders.created   (eventType as topic)
  → subscribe with filter: region = "eu-west"
```
Pros: Natural scaling by dimension; clear ownership
Cons: Must choose primary dimension; some redundancy
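Strategy 2 amounts to splitting a consumer's criteria in two: the primary dimension picks the topic, and whatever remains becomes the subscription filter. A hypothetical planner for that split is sketched below; the `baseTopic.value` naming scheme and the Pub/Sub-style filter output are assumptions for illustration.

```typescript
// Hypothetical planner for Strategy 2: the primary dimension selects the topic,
// and the remaining criteria become a Pub/Sub-style attribute filter.
type Criteria = Record<string, string>;

interface SubscriptionPlan {
  topic: string;
  filter: string | null; // null when the topic alone is an exact match
}

function planSubscription(baseTopic: string, primaryDimension: string, criteria: Criteria): SubscriptionPlan {
  const primaryValue = criteria[primaryDimension];
  if (primaryValue === undefined) {
    throw new Error(`criteria must include the primary dimension "${primaryDimension}"`);
  }
  const residual = Object.entries(criteria)
    .filter(([key]) => key !== primaryDimension)
    .map(([key, value]) => `attributes.${key} = "${value}"`);
  return {
    topic: `${baseTopic}.${primaryValue}`,
    filter: residual.length > 0 ? residual.join(' AND ') : null,
  };
}

const plan = planSubscription('orders', 'region', {
  region: 'eu-west',
  eventType: 'ORDER_CREATED',
});
console.log(plan.topic);  // orders.eu-west
console.log(plan.filter); // attributes.eventType = "ORDER_CREATED"
```

The thrown error reflects the main constraint of this strategy: a consumer that doesn't care about the primary dimension must subscribe to every primary topic.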
Strategy 3: Topic Explosion (Fully Segmented)
Combinatorial topics for all dimension combinations:
```
orders.eu-west.created.high
orders.eu-west.created.normal
orders.us-east.shipped.high
...
```
Pros: Zero filter overhead; subscriptions are exact matches
Cons: Exponential topic count; management nightmare
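The growth is easy to quantify: the topic count is the product of each dimension's cardinality. The hypothetical sketch below enumerates fully segmented topic names for three example dimensions; the value lists are assumptions for illustration.

```typescript
// Hypothetical: enumerate fully segmented topic names (Cartesian product of dimensions)
function explodeTopics(base: string, dimensions: string[][]): string[] {
  return dimensions.reduce<string[]>(
    (prefixes, values) => prefixes.flatMap((prefix) => values.map((v) => `${prefix}.${v}`)),
    [base],
  );
}

const dims = [
  ['us-east', 'us-west', 'eu-west', 'eu-central', 'apac'], // region: 5 values
  ['created', 'updated', 'shipped', 'cancelled'],          // event type: 4 values
  ['high', 'normal', 'low'],                                // priority: 3 values
];

const topics = explodeTopics('orders', dims);
console.log(topics.length); // 60
console.log(topics[0]);     // orders.us-east.created.high
```

Adding a fourth dimension such as channel (web, mobile, api) would triple the count to 180 topics, each needing its own retention, permissions, and monitoring.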
| Strategy | Topic Count | Filter Complexity | Best When |
|---|---|---|---|
| Single topic + compound filters | 1 | High (complex expressions) | Low volume; flexible filtering needs |
| Primary dimension topics + filters | ~N (one per primary value) | Medium | Clear primary dimension; moderate volume |
| Full topic explosion | N × M × ... (combinatorial) | None | Very high volume; static dimensions |
| Hierarchical routing | 1 + domain topics | Medium (at routers) | Need transformation; clean domain boundaries |
Start with a single topic and compound filters. Only segment into multiple topics when: (1) throughput requires it, (2) clear domain boundaries exist, or (3) different retention/security policies are needed. Premature topic explosion creates management burden.
Filters aren't free—they consume broker resources and have platform-specific limitations. Understanding these constraints helps you design efficient filtering strategies.
| Platform | Max Subscriptions | Filter Limits | Notable Constraints |
|---|---|---|---|
| Google Pub/Sub | 10,000 per topic | 1,024 bytes | Attribute values must be strings |
| AWS SNS | 12,500,000 per topic | 256 KB policy | 100 attributes per filter policy |
| Azure Event Grid | 5,000 per topic | 25 filters per subscription | 5 advanced filter values per filter |
| RabbitMQ | Unlimited (practical: 1000s) | N/A (routing key) | Routing key max 255 bytes |
| Apache Kafka | N/A | N/A | No broker-side filtering; consumer-side only |
```typescript
// Filter Optimization Strategies

// ❌ BAD: Long compound filter that is hard to read and maintain
const inefficientFilter = `
  (attributes.eventType = "A" OR attributes.eventType = "B" OR
   attributes.eventType = "C" OR attributes.eventType = "D") AND
  (attributes.region = "us-east" OR attributes.region = "us-west" OR
   attributes.region = "eu-west") AND
  hasPrefix(attributes.category, "electronics") AND
  NOT attributes.environment = "development"
`;

// ✅ BETTER: Publisher creates a derived attribute for a common filter pattern
// (Pub/Sub has no IN operator, so alternatives stay in a parenthesized OR)
const efficientFilter = `
  attributes.filterCategory = "electronics-prod-us-eu" AND
  (attributes.eventType = "A" OR attributes.eventType = "B" OR
   attributes.eventType = "C" OR attributes.eventType = "D")
`;

// ✅ AT HIGH VOLUME: Split into a topic hierarchy
// Topic: orders.electronics.prod.{region}
// Subscribe to: orders.electronics.prod.us-east, orders.electronics.prod.us-west, ...

// For Kafka: Pre-filter with Kafka Streams into derived topics
// Input topic: all-orders (high volume)
// Kafka Streams job filters and routes to:
//   - orders.electronics.us
//   - orders.electronics.eu
//   - orders.clothing.us
// Consumers subscribe to specific filtered topics
```

Filters are a common source of bugs: events are silently not delivered because of filter mismatches. Robust testing and debugging practices are essential.
```typescript
// Unit testing filter expressions
// './filter-parser' is a local module; createSubscription, publish, and
// collectMessages below are test helpers assumed to exist in the project.
import { parseGooglePubSubFilter, evaluateFilter } from './filter-parser';

describe('Order filter expressions', () => {
  const filter = parseGooglePubSubFilter(
    'attributes.eventType = "ORDER_CREATED" AND attributes.region = "eu-west"'
  );

  it('matches EU order created events', () => {
    const event = { attributes: { eventType: 'ORDER_CREATED', region: 'eu-west' } };
    expect(evaluateFilter(filter, event)).toBe(true);
  });

  it('rejects non-EU regions', () => {
    const event = { attributes: { eventType: 'ORDER_CREATED', region: 'us-east' } };
    expect(evaluateFilter(filter, event)).toBe(false);
  });

  it('rejects wrong event types', () => {
    const event = { attributes: { eventType: 'ORDER_SHIPPED', region: 'eu-west' } };
    expect(evaluateFilter(filter, event)).toBe(false);
  });

  it('rejects missing attributes', () => {
    const event = { attributes: { eventType: 'ORDER_CREATED' } }; // missing region
    expect(evaluateFilter(filter, event)).toBe(false);
  });
});

// Integration test: verify filter delivery
describe('Filter delivery integration', () => {
  it('delivers only matching events to filtered subscription', async () => {
    // Create subscription with filter
    const sub = await createSubscription('test-orders', {
      filter: 'attributes.priority = "high"',
    });

    // Publish a mix of events
    await publish('test-orders', { data: 'high-1', attributes: { priority: 'high' } });
    await publish('test-orders', { data: 'low-1', attributes: { priority: 'low' } });
    await publish('test-orders', { data: 'high-2', attributes: { priority: 'high' } });

    // Wait and collect
    const received = await collectMessages(sub, { timeout: 5000 });

    // Verify only high-priority events were received
    expect(received).toHaveLength(2);
    expect(received.every((m) => m.attributes.priority === 'high')).toBe(true);
  }, 15_000); // Jest's default 5 s test timeout would expire during the 5 s collection window
});
```

Case sensitivity: 'HIGH' ≠ 'high' in most filter engines.
Missing attributes: Events without the filtered attribute typically don't match (this is not an error).
Whitespace: Some parsers are sensitive to extra spaces.
Quotes: String values usually need quotes; numeric values (where supported) usually don't.
Always test with real events before production.
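The unit tests in this section import `parseGooglePubSubFilter` and `evaluateFilter` from a local `./filter-parser` module that isn't shown. A minimal sketch of such a module follows, assuming it only needs the subset those tests use: equality clauses on `attributes.<key>` joined by `AND`. It is hypothetical and deliberately rejects anything else; real Pub/Sub syntax (`OR`, `NOT`, `!=`, `:`, `hasPrefix`) would need a proper parser.

```typescript
// Hypothetical ./filter-parser sketch. Supported grammar only:
//   attributes.<key> = "<value>" [AND attributes.<key> = "<value>" ...]
// Values may not contain double quotes or the text " AND ".
export interface EqualityClause {
  key: string;
  value: string;
}

export type ParsedFilter = EqualityClause[];

const CLAUSE = /^attributes\.([A-Za-z_][A-Za-z0-9_-]*)\s*=\s*"([^"]*)"$/;

export function parseGooglePubSubFilter(expression: string): ParsedFilter {
  return expression.split(/\s+AND\s+/).map((raw) => {
    const clause = raw.trim();
    const match = CLAUSE.exec(clause);
    if (!match) throw new Error(`Unsupported clause: ${clause}`);
    return { key: match[1], value: match[2] };
  });
}

export function evaluateFilter(
  filter: ParsedFilter,
  event: { attributes: Record<string, string> },
): boolean {
  // Exact, case-sensitive comparison; a missing attribute is simply a non-match
  return filter.every(({ key, value }) => event.attributes[key] === value);
}
```

Keeping the comparison exact and case-sensitive mirrors the pitfalls listed above, so a test with `'HIGH'` against a filter for `"high"` fails here just as it would at the broker.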
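We've covered the landscape of message filtering in pub-sub systems. The key concepts:
Where to filter: publisher-side topic selection, broker-side subscription filters, or consumer-side checks, each trading coupling against wasted delivery.
Attribute-based filtering: promote low-cardinality, categorical fields to string attributes so the broker can match them without reading the payload.
Content-based routing: when decisions depend on the payload, a dedicated router evaluates ordered rules and republishes to destination topics.
Multi-dimensional strategies: start with one topic and compound filters; segment topics only for throughput, domain boundaries, or policy differences.
Platform limits: filter syntax, payload access, numeric support, and quotas differ widely; Kafka has no broker-side filtering at all.
Testing: unit-test filter expressions and verify delivery end to end, since a mismatched filter fails silently.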
What's Next:
Now that we understand how to filter and route messages efficiently, we'll explore real-world pub-sub use cases—examining how these patterns come together in production systems for event-driven architectures, CQRS, real-time analytics, and more.
You now understand the full spectrum of message filtering: from attribute-based broker filtering to content-based routing to multi-dimensional strategies. These techniques ensure consumers receive precisely the events they need, enabling efficient, scalable pub-sub architectures.