The Active Object pattern appears in diverse domains wherever there's a need to decouple request submission from execution while maintaining a clean object-oriented interface. Understanding these real-world applications helps you recognize opportunities to apply the pattern in your own systems and adapt it to specific requirements.
In this page, we'll explore comprehensive examples from game development, financial trading, UI frameworks, IoT systems, and distributed architectures. Each example demonstrates how the core Active Object structure adapts to domain-specific constraints while preserving its fundamental benefits.
By the end of this page, you will understand how Active Object applies to:
• Game engine AI systems with frame-rate constraints
• Trading systems with strict latency requirements
• UI frameworks managing thread affinity
• IoT gateways processing sensor streams
• Complete production-ready implementation patterns
The Challenge:
Game engines must render frames at 60 FPS (or higher), giving rendering code approximately 16ms per frame. AI decision-making for NPCs (Non-Player Characters) can be computationally expensive—pathfinding, behavior trees, utility calculations, and world-state analysis may take 50-200ms per NPC.
Running AI on the render thread would cause visible stuttering. Running it on a background thread without synchronization causes race conditions when AI decisions modify game state. Active Object provides the perfect solution.
Design Objectives:
```typescript
/**
 * Active Object for Game AI Processing
 * Decouples AI computation from the render loop
 */

// ============================================
// DOMAIN TYPES
// ============================================

interface GameState {
  readonly playerPosition: Vector3;
  readonly npcStates: ReadonlyMap<string, NPCState>;
  readonly worldObjects: ReadonlyArray<WorldObject>;
  readonly timestamp: number;
}

interface NPCState {
  readonly id: string;
  readonly position: Vector3;
  readonly health: number;
  readonly alertLevel: AlertLevel;
  readonly currentBehavior: BehaviorType;
}

interface AIDecision {
  readonly npcId: string;
  readonly action: AIAction;
  readonly targetPosition?: Vector3;
  readonly priority: number;
  readonly validUntil: number; // Don't execute stale decisions
}

// ============================================
// AI ACTIVE OBJECT
// ============================================

interface AIProcessor {
  computeDecision(npcId: string, worldSnapshot: GameState): Promise<AIDecision>;
  computeBatchDecisions(npcIds: string[], worldSnapshot: GameState): Promise<AIDecision[]>;
  setDifficultyLevel(level: DifficultyLevel): Promise<void>;
}

/**
 * AI Processor Proxy - called from main game thread
 */
class AIProcessorProxy implements AIProcessor {
  private readonly queue: PriorityActivationQueue;

  constructor(queue: PriorityActivationQueue) {
    this.queue = queue;
  }

  computeDecision(npcId: string, worldSnapshot: GameState): Promise<AIDecision> {
    const future = new Future<AIDecision>();
    // High priority for individual AI queries (usually player-triggered)
    const request = new ComputeDecisionRequest(npcId, worldSnapshot, future);
    this.queue.enqueue(request, Priority.HIGH);
    return future.toPromise();
  }

  computeBatchDecisions(npcIds: string[], worldSnapshot: GameState): Promise<AIDecision[]> {
    const future = new Future<AIDecision[]>();
    // Normal priority for batch processing
    const request = new ComputeBatchDecisionsRequest(npcIds, worldSnapshot, future);
    this.queue.enqueue(request, Priority.NORMAL);
    return future.toPromise();
  }

  setDifficultyLevel(level: DifficultyLevel): Promise<void> {
    const future = new Future<void>();
    // Background priority for configuration changes
    const request = new SetDifficultyRequest(level, future);
    this.queue.enqueue(request, Priority.BACKGROUND);
    return future.toPromise();
  }
}

/**
 * AI Processor Servant - the actual AI implementation
 * Only accessed from the AI scheduler thread
 */
class AIProcessorServant {
  private difficultyLevel: DifficultyLevel = DifficultyLevel.NORMAL;
  private readonly pathfinder: Pathfinder;
  private readonly behaviorTree: BehaviorTreeEvaluator;
  private readonly cache: AIDecisionCache;

  constructor() {
    this.pathfinder = new Pathfinder();
    this.behaviorTree = new BehaviorTreeEvaluator();
    this.cache = new AIDecisionCache();
  }

  computeDecision(npcId: string, worldSnapshot: GameState): AIDecision {
    const npc = worldSnapshot.npcStates.get(npcId);
    if (!npc) {
      throw new Error(`NPC ${npcId} not found in snapshot`);
    }

    // Check cache for recent decisions
    const cached = this.cache.get(npcId, worldSnapshot.timestamp);
    if (cached) {
      return cached;
    }

    // Evaluate behavior tree to determine action type
    const behaviorContext = this.createBehaviorContext(npc, worldSnapshot);
    const action = this.behaviorTree.evaluate(npc.currentBehavior, behaviorContext);

    // If action requires movement, compute path
    let targetPosition: Vector3 | undefined;
    if (action.requiresMovement) {
      const path = this.pathfinder.findPath(
        npc.position,
        action.targetLocation,
        worldSnapshot.worldObjects
      );
      targetPosition = path.nextWaypoint;
    }

    const decision: AIDecision = {
      npcId,
      action: action.type,
      targetPosition,
      priority: this.calculatePriority(npc, action),
      validUntil: worldSnapshot.timestamp + this.getDecisionValidityMs()
    };

    // Cache for subsequent frames
    this.cache.set(npcId, decision);
    return decision;
  }

  computeBatchDecisions(npcIds: string[], worldSnapshot: GameState): AIDecision[] {
    // Sort NPCs by distance to player for priority processing
    const sortedNpcs = this.sortByPlayerProximity(npcIds, worldSnapshot);
    return sortedNpcs.map(npcId => this.computeDecision(npcId, worldSnapshot));
  }

  setDifficultyLevel(level: DifficultyLevel): void {
    this.difficultyLevel = level;
    this.behaviorTree.setAggressiveness(this.getDifficultyAggressiveness(level));
    this.cache.clear(); // Invalidate cached decisions
  }

  private getDecisionValidityMs(): number {
    // Decisions valid for longer on easier difficulties
    return this.difficultyLevel === DifficultyLevel.EASY ? 500 : 200;
  }

  // ... additional helper methods
}

// ============================================
// GAME LOOP INTEGRATION
// ============================================

class GameEngine {
  private readonly aiProcessor: AIProcessor;
  private readonly pendingAIDecisions: Map<string, Promise<AIDecision>> = new Map();
  private readonly appliedDecisions: Map<string, AIDecision> = new Map();

  constructor(aiProcessor: AIProcessor) {
    this.aiProcessor = aiProcessor;
  }

  /**
   * Called every frame - must complete in ~16ms
   */
  update(deltaTime: number): void {
    // 1. Capture current game state as immutable snapshot
    const worldSnapshot = this.captureWorldSnapshot();

    // 2. Check for completed AI decisions
    this.processCompletedAIDecisions();

    // 3. Apply valid decisions to NPCs
    this.applyAIDecisions(deltaTime);

    // 4. Request AI updates for NPCs that need new decisions
    this.requestAIUpdates(worldSnapshot);

    // 5. Continue with other game logic, physics, etc.
    this.updateGameLogic(deltaTime);
  }

  private processCompletedAIDecisions(): void {
    for (const [npcId, promise] of this.pendingAIDecisions) {
      // Check if promise is resolved (non-blocking)
      const result = this.tryGetResult(promise);
      if (result.isComplete) {
        if (result.value) {
          this.appliedDecisions.set(npcId, result.value);
        }
        this.pendingAIDecisions.delete(npcId);
      }
    }
  }

  private requestAIUpdates(worldSnapshot: GameState): void {
    const npcsNeedingUpdate = this.getNpcsNeedingAIUpdate();

    for (const npcId of npcsNeedingUpdate) {
      // Don't request if there's already a pending decision
      if (this.pendingAIDecisions.has(npcId)) continue;

      // Request new AI decision (returns immediately)
      const decisionPromise = this.aiProcessor.computeDecision(npcId, worldSnapshot);
      this.pendingAIDecisions.set(npcId, decisionPromise);
    }
  }
}
```

Notice how the game engine passes an immutable snapshot of the world state to the AI Processor. This eliminates race conditions—the AI works on a frozen view of the world, while the main thread continues updating the live state. The AI's decisions are valid for the state they were computed against, not necessarily the current state.
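The game loop above polls pending decisions with a `tryGetResult` helper. JavaScript promises cannot be inspected synchronously, so such a helper typically wraps each promise and records its settlement as it happens. A minimal sketch (the `PollablePromise` name is illustrative, not part of the pattern):

```typescript
// Sketch only: promises are not synchronously inspectable, so we wrap
// each one and record its settled state via a then-callback.
interface PollableResult<T> {
  readonly isComplete: boolean;
  readonly value?: T;
}

class PollablePromise<T> {
  private state: PollableResult<T> = { isComplete: false };

  constructor(promise: Promise<T>) {
    promise.then(
      value => { this.state = { isComplete: true, value }; },
      // On rejection, mark complete with no value; the caller drops the entry
      () => { this.state = { isComplete: true }; }
    );
  }

  /** Non-blocking poll, safe to call every frame. */
  tryGetResult(): PollableResult<T> {
    return this.state;
  }
}
```

A game engine following this sketch would store `PollablePromise<AIDecision>` instances in `pendingAIDecisions` and poll them once per frame.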
The Challenge:
Trading systems must balance competing requirements: orders must be processed quickly (latency-sensitive), but also validated thoroughly (compliance, risk limits, market rules). Some validation checks are fast (in-memory), while others are slow (external service calls, database queries).
Active Object allows the fast path to proceed quickly while slower validation happens asynchronously, with futures used to signal when all checks complete.
Design Objectives:
```typescript
/**
 * Active Object for Order Processing in Trading System
 */

// ============================================
// DOMAIN TYPES
// ============================================

interface Order {
  readonly orderId: string;
  readonly symbol: string;
  readonly side: 'BUY' | 'SELL';
  readonly quantity: number;
  readonly price: number;
  readonly orderType: 'LIMIT' | 'MARKET';
  readonly traderId: string;
  readonly timestamp: number;
}

interface OrderResult {
  readonly orderId: string;
  readonly status: OrderStatus;
  readonly executionPrice?: number;
  readonly filledQuantity?: number;
  readonly rejectionReason?: string;
  readonly validationDetails?: ValidationResult[];
}

type OrderStatus = 'PENDING' | 'VALIDATED' | 'EXECUTING' | 'FILLED' | 'REJECTED' | 'CANCELLED';

interface ValidationResult {
  readonly validator: string;
  readonly passed: boolean;
  readonly message?: string;
  readonly latencyMs: number;
}

// ============================================
// ORDER PROCESSOR ACTIVE OBJECT
// ============================================

interface OrderProcessor {
  submitOrder(order: Order): CancellableOperation<OrderResult>;
  getOrderStatus(orderId: string): Promise<OrderStatus>;
  cancelOrder(orderId: string): Promise<boolean>;
}

/**
 * Order Processor Proxy - called from trading API handlers
 */
class OrderProcessorProxy implements OrderProcessor {
  private readonly queue: PriorityActivationQueue;
  private readonly pendingOrders: Map<string, CancellationToken> = new Map();

  submitOrder(order: Order): CancellableOperation<OrderResult> {
    const future = new Future<OrderResult>();
    const cancellationToken = new CancellationToken();

    // Store for potential cancellation
    this.pendingOrders.set(order.orderId, cancellationToken);

    // Determine priority based on order type
    const priority = order.orderType === 'MARKET'
      ? Priority.CRITICAL // Market orders need immediate processing
      : Priority.HIGH;

    const request = new ProcessOrderRequest(order, future, cancellationToken);
    this.queue.enqueue(request, priority);

    // Auto-cleanup when complete
    future.toPromise().finally(() => {
      this.pendingOrders.delete(order.orderId);
    });

    return {
      result: future.toPromise(),
      cancel: () => {
        cancellationToken.cancel();
      }
    };
  }

  async cancelOrder(orderId: string): Promise<boolean> {
    const token = this.pendingOrders.get(orderId);
    if (token) {
      token.cancel();
      return true;
    }

    // Order might already be processing - send cancellation request
    const future = new Future<boolean>();
    const request = new CancelOrderRequest(orderId, future);
    this.queue.enqueue(request, Priority.CRITICAL);
    return future.toPromise();
  }

  getOrderStatus(orderId: string): Promise<OrderStatus> {
    const future = new Future<OrderStatus>();
    const request = new GetOrderStatusRequest(orderId, future);
    this.queue.enqueue(request, Priority.NORMAL);
    return future.toPromise();
  }
}

/**
 * Order Processor Servant - actual order processing logic
 */
class OrderProcessorServant {
  private readonly riskEngine: RiskEngine;
  private readonly complianceService: ComplianceService;
  private readonly marketDataService: MarketDataService;
  private readonly executionEngine: ExecutionEngine;
  private readonly orderStore: OrderStore;
  private readonly auditLog: AuditLog;

  async processOrder(order: Order, cancellationToken: CancellationToken): Promise<OrderResult> {
    const validationResults: ValidationResult[] = [];

    try {
      // 1. Fast inline validations (synchronous, in-memory)
      this.performFastValidations(order, validationResults);
      cancellationToken.throwIfCancelled();

      // 2. Parallel async validations
      const [riskResult, complianceResult, priceResult] = await Promise.all([
        this.validateRisk(order),
        this.validateCompliance(order),
        this.validatePrice(order)
      ]);
      validationResults.push(riskResult, complianceResult, priceResult);
      cancellationToken.throwIfCancelled();

      // 3. Check all validations passed
      const allPassed = validationResults.every(v => v.passed);
      if (!allPassed) {
        const failedValidation = validationResults.find(v => !v.passed);
        return this.createRejection(order, failedValidation!, validationResults);
      }

      // 4. Execute order
      cancellationToken.throwIfCancelled();
      const executionResult = await this.executionEngine.execute(order);

      // 5. Audit and return
      await this.auditLog.recordExecution(order, executionResult);

      return {
        orderId: order.orderId,
        status: 'FILLED',
        executionPrice: executionResult.price,
        filledQuantity: executionResult.quantity,
        validationDetails: validationResults
      };
    } catch (error) {
      if (error instanceof CancellationError) {
        await this.auditLog.recordCancellation(order);
        return {
          orderId: order.orderId,
          status: 'CANCELLED',
          validationDetails: validationResults
        };
      }

      await this.auditLog.recordError(order, error as Error);
      return {
        orderId: order.orderId,
        status: 'REJECTED',
        rejectionReason: (error as Error).message,
        validationDetails: validationResults
      };
    }
  }

  private performFastValidations(order: Order, results: ValidationResult[]): void {
    const startTime = Date.now();

    // Symbol exists
    if (!this.marketDataService.isValidSymbol(order.symbol)) {
      results.push({
        validator: 'SymbolValidator',
        passed: false,
        message: `Unknown symbol: ${order.symbol}`,
        latencyMs: Date.now() - startTime
      });
      return;
    }

    // Quantity within limits
    const limits = this.orderStore.getOrderLimits(order.traderId);
    if (order.quantity > limits.maxQuantity) {
      results.push({
        validator: 'QuantityValidator',
        passed: false,
        message: `Quantity ${order.quantity} exceeds limit ${limits.maxQuantity}`,
        latencyMs: Date.now() - startTime
      });
      return;
    }

    results.push({
      validator: 'FastValidation',
      passed: true,
      latencyMs: Date.now() - startTime
    });
  }

  private async validateRisk(order: Order): Promise<ValidationResult> {
    const startTime = Date.now();
    try {
      const passed = await this.riskEngine.checkPositionLimits(order);
      return {
        validator: 'RiskEngine',
        passed,
        message: passed ? undefined : 'Position limit exceeded',
        latencyMs: Date.now() - startTime
      };
    } catch (error) {
      return {
        validator: 'RiskEngine',
        passed: false,
        message: `Risk check failed: ${(error as Error).message}`,
        latencyMs: Date.now() - startTime
      };
    }
  }

  private async validateCompliance(order: Order): Promise<ValidationResult> {
    const startTime = Date.now();
    try {
      // External compliance service call (may be slow)
      const result = await this.complianceService.checkOrder(order);
      return {
        validator: 'ComplianceService',
        passed: result.approved,
        message: result.reason,
        latencyMs: Date.now() - startTime
      };
    } catch (error) {
      return {
        validator: 'ComplianceService',
        passed: false,
        message: `Compliance check failed: ${(error as Error).message}`,
        latencyMs: Date.now() - startTime
      };
    }
  }
}
```

Trading systems must handle partial failures carefully. If validation passes but execution fails, the system must NOT leave an order in an inconsistent state. Notice how the Servant tracks validation results and records comprehensive audit logs at each stage—this enables recovery and investigation if anything goes wrong.
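The proxy and servant above lean on a `CancellationToken` with a `throwIfCancelled` checkpoint between processing stages. A minimal sketch of such a token (an assumption for illustration, not a standard library class):

```typescript
// Sketch only: a cooperative cancellation token. Work is not interrupted
// preemptively; the servant checks the token at safe points.
class CancellationError extends Error {
  constructor() {
    super('Operation cancelled');
    this.name = 'CancellationError';
  }
}

class CancellationToken {
  private cancelled = false;

  cancel(): void {
    this.cancelled = true;
  }

  get isCancelled(): boolean {
    return this.cancelled;
  }

  /** Called by the servant between stages; aborts cleanly if cancelled. */
  throwIfCancelled(): void {
    if (this.cancelled) throw new CancellationError();
  }
}
```

Because cancellation is cooperative, an order that has already reached the execution stage cannot be un-executed; the token only prevents later stages from starting, which is why the servant still audits every outcome.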
The Challenge:
Most UI frameworks require that UI updates occur only on a specific thread—the "UI thread" or "main thread." Background operations (network requests, file I/O, heavy computations) must run on worker threads, but their results must be delivered back to the UI thread for display.
Active Object provides an elegant solution: the Scheduler runs on the UI thread, processing Method Requests that represent UI updates. Background work submits results through the queue, automatically marshaling them to the correct thread.
Design Objectives:
```typescript
/**
 * Active Object for UI Thread Marshaling
 * Ensures all UI updates occur on the main thread
 */

// ============================================
// UI ACTIVE OBJECT
// ============================================

interface UIService {
  updateProgress(taskId: string, percent: number): Promise<void>;
  displayResult(taskId: string, result: unknown): Promise<void>;
  showError(taskId: string, error: Error): Promise<void>;
  showNotification(message: string, type: NotificationType): Promise<void>;
}

/**
 * UI Service Proxy - can be called from any thread
 * Marshals all calls to the UI thread
 */
class UIServiceProxy implements UIService {
  private readonly uiQueue: ActivationQueue;

  constructor(uiQueue: ActivationQueue) {
    this.uiQueue = uiQueue;
  }

  updateProgress(taskId: string, percent: number): Promise<void> {
    const future = new Future<void>();
    const request = new UpdateProgressRequest(taskId, percent, future);
    this.uiQueue.enqueue(request);
    return future.toPromise();
  }

  displayResult(taskId: string, result: unknown): Promise<void> {
    const future = new Future<void>();
    const request = new DisplayResultRequest(taskId, result, future);
    this.uiQueue.enqueue(request);
    return future.toPromise();
  }

  showError(taskId: string, error: Error): Promise<void> {
    const future = new Future<void>();
    const request = new ShowErrorRequest(taskId, error, future);
    this.uiQueue.enqueue(request);
    return future.toPromise();
  }

  showNotification(message: string, type: NotificationType): Promise<void> {
    const future = new Future<void>();
    const request = new ShowNotificationRequest(message, type, future);
    this.uiQueue.enqueue(request);
    return future.toPromise();
  }
}

/**
 * UI Service Servant - directly manipulates UI components
 * Only runs on UI thread
 */
class UIServiceServant {
  private readonly progressBars: Map<string, ProgressBar>;
  private readonly resultPanels: Map<string, ResultPanel>;
  private readonly notificationManager: NotificationManager;

  updateProgress(taskId: string, percent: number): void {
    const progressBar = this.progressBars.get(taskId);
    if (progressBar) {
      progressBar.setValue(percent);
      progressBar.setText(`${percent}%`);
    }
  }

  displayResult(taskId: string, result: unknown): void {
    const panel = this.resultPanels.get(taskId);
    if (panel) {
      panel.showResult(result);
      panel.setStatus('complete');
    }

    // Hide progress bar
    const progressBar = this.progressBars.get(taskId);
    if (progressBar) {
      progressBar.hide();
    }
  }

  showError(taskId: string, error: Error): void {
    const panel = this.resultPanels.get(taskId);
    if (panel) {
      panel.showError(error);
      panel.setStatus('error');
    }

    this.notificationManager.show({
      message: `Task ${taskId} failed: ${error.message}`,
      type: 'error',
      duration: 5000
    });
  }

  showNotification(message: string, type: NotificationType): void {
    this.notificationManager.show({
      message,
      type,
      duration: type === 'error' ? 8000 : 3000
    });
  }
}

/**
 * UI Scheduler - runs on UI thread, integrates with event loop
 */
class UIScheduler {
  private readonly queue: ActivationQueue;
  private readonly servant: UIServiceServant;
  private readonly batchSize: number = 10; // Process up to N updates per frame

  constructor(queue: ActivationQueue, servant: UIServiceServant) {
    this.queue = queue;
    this.servant = servant;
  }

  /**
   * Start processing - called once on UI thread initialization
   */
  start(): void {
    this.scheduleNextBatch();
  }

  private scheduleNextBatch(): void {
    // Use requestAnimationFrame or equivalent to stay on UI thread
    requestAnimationFrame(() => {
      this.processNextBatch();
      this.scheduleNextBatch();
    });
  }

  private async processNextBatch(): Promise<void> {
    let processed = 0;
    while (processed < this.batchSize) {
      const request = await this.queue.tryDequeue(); // Non-blocking
      if (!request) break;

      try {
        request.call(this.servant);
      } catch (error) {
        console.error('UI update error:', error);
      }
      processed++;
    }
  }
}

// ============================================
// BACKGROUND TASK INTEGRATION
// ============================================

/**
 * Background Task that uses UI Service for updates
 */
class FileUploadTask {
  private readonly file: File;
  private readonly uiService: UIService; // Receives proxy, transparently marshals
  private readonly taskId: string;

  constructor(file: File, uiService: UIService) {
    this.file = file;
    this.uiService = uiService;
    this.taskId = `upload-${Date.now()}`;
  }

  async execute(): Promise<void> {
    try {
      const totalChunks = Math.ceil(this.file.size / CHUNK_SIZE);

      for (let i = 0; i < totalChunks; i++) {
        const chunk = this.file.slice(i * CHUNK_SIZE, (i + 1) * CHUNK_SIZE);
        await this.uploadChunk(chunk, i);

        // Update progress - automatically marshaled to UI thread
        const percent = Math.round(((i + 1) / totalChunks) * 100);
        await this.uiService.updateProgress(this.taskId, percent);
      }

      // Display completion - automatically marshaled to UI thread
      await this.uiService.displayResult(this.taskId, {
        fileName: this.file.name,
        size: this.file.size
      });
    } catch (error) {
      // Show error - automatically marshaled to UI thread
      await this.uiService.showError(this.taskId, error as Error);
    }
  }
}

// ============================================
// USAGE
// ============================================

// Initialization (on UI thread)
const uiQueue = new ActivationQueue();
const uiServant = new UIServiceServant();
const uiScheduler = new UIScheduler(uiQueue, uiServant);
uiScheduler.start();

// Create proxy that can be passed to any thread
const uiService: UIService = new UIServiceProxy(uiQueue);

// Background work (on worker thread)
async function handleFileUpload(file: File) {
  const task = new FileUploadTask(file, uiService);
  // Run on worker thread - UI updates happen safely on UI thread
  await runOnWorker(() => task.execute());
}
```

The UI Scheduler integrates with the platform's event loop (requestAnimationFrame in browsers, the message pump in desktop frameworks). By processing updates in batches synchronized with the render cycle, the UI remains smooth even under heavy update load. Frameworks such as React, Vue, and WPF/WinUI apply similar batched, thread-affine update scheduling internally.
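Every proxy in these examples constructs a `Future` and lets the scheduler side complete it later. A minimal sketch of that deferred wrapper over a standard `Promise` (the method names follow the examples; the implementation itself is an assumption):

```typescript
// Sketch only: a "deferred" that exposes resolve/reject to the producer
// (the scheduler/servant) and a plain Promise to the consumer (the caller).
class Future<T> {
  private readonly promise: Promise<T>;
  private resolveFn!: (value: T) => void;
  private rejectFn!: (error: Error) => void;
  private done = false;

  constructor() {
    this.promise = new Promise<T>((resolve, reject) => {
      this.resolveFn = resolve;
      this.rejectFn = reject;
    });
  }

  resolve(value: T): void {
    if (!this.done) { this.done = true; this.resolveFn(value); }
  }

  reject(error: Error): void {
    if (!this.done) { this.done = true; this.rejectFn(error); }
  }

  isDone(): boolean {
    return this.done;
  }

  toPromise(): Promise<T> {
    return this.promise;
  }
}
```

The one-shot guard (`done`) matters here: a request may race a timeout or cancellation, and only the first completion should win.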
The Challenge:
IoT gateways receive continuous streams of sensor data from many devices. Data must be validated, transformed, aggregated, and forwarded to cloud services. Each sensor may have different data rates and processing requirements. The gateway must handle bursts without losing data.
Active Object provides the architecture: each sensor type gets its own Active Object with appropriate queue sizing and processing strategies. The queue absorbs bursts; the scheduler processes at a sustainable rate; backpressure signals when the system is overwhelmed.
Design Objectives:
```typescript
/**
 * Active Object for IoT Sensor Data Processing
 */

// ============================================
// DOMAIN TYPES
// ============================================

interface SensorReading {
  readonly sensorId: string;
  readonly sensorType: SensorType;
  readonly timestamp: number;
  readonly value: number | object;
  readonly quality: DataQuality;
}

interface ProcessedReading {
  readonly originalReading: SensorReading;
  readonly processedValue: number;
  readonly aggregations?: Aggregations;
  readonly alerts?: Alert[];
}

interface CloudPublishResult {
  readonly success: boolean;
  readonly messageId?: string;
  readonly error?: Error;
}

// ============================================
// SENSOR PROCESSOR ACTIVE OBJECT
// ============================================

interface SensorProcessor {
  processReading(reading: SensorReading): Promise<ProcessedReading>;
  processBatch(readings: SensorReading[]): Promise<ProcessedReading[]>;
  publishToCloud(processed: ProcessedReading[]): Promise<CloudPublishResult>;
}

/**
 * Sensor Processor with separate queues per priority
 */
class SensorProcessorProxy implements SensorProcessor {
  private readonly processingQueue: ActivationQueue;
  private readonly publishQueue: ActivationQueue;

  constructor(
    processingQueue: ActivationQueue,
    publishQueue: ActivationQueue
  ) {
    this.processingQueue = processingQueue;
    this.publishQueue = publishQueue;
  }

  processReading(reading: SensorReading): Promise<ProcessedReading> {
    const future = new Future<ProcessedReading>();
    // Determine priority based on sensor characteristics
    const priority = this.determinePriority(reading);
    const request = new ProcessReadingRequest(reading, future);
    this.processingQueue.enqueue(request, priority);
    return future.toPromise();
  }

  processBatch(readings: SensorReading[]): Promise<ProcessedReading[]> {
    const future = new Future<ProcessedReading[]>();
    const request = new ProcessBatchRequest(readings, future);
    this.processingQueue.enqueue(request, Priority.NORMAL);
    return future.toPromise();
  }

  publishToCloud(processed: ProcessedReading[]): Promise<CloudPublishResult> {
    const future = new Future<CloudPublishResult>();
    // Publishing is separate queue - isolates cloud latency from processing
    const request = new PublishToCloudRequest(processed, future);
    this.publishQueue.enqueue(request);
    return future.toPromise();
  }

  private determinePriority(reading: SensorReading): Priority {
    // Alert conditions get high priority
    if (reading.sensorType === SensorType.ALERT_TRIGGER) {
      return Priority.CRITICAL;
    }
    // Environmental sensors (temperature, humidity) - normal
    if ([SensorType.TEMPERATURE, SensorType.HUMIDITY].includes(reading.sensorType)) {
      return Priority.NORMAL;
    }
    // Status updates - background
    return Priority.LOW;
  }
}

/**
 * Sensor Processor Servant
 */
class SensorProcessorServant {
  private readonly transformers: Map<SensorType, DataTransformer>;
  private readonly aggregator: DataAggregator;
  private readonly alertEngine: AlertEngine;
  private readonly cloudClient: CloudClient;

  processReading(reading: SensorReading): ProcessedReading {
    // 1. Transform raw value based on sensor type
    const transformer = this.transformers.get(reading.sensorType);
    const transformedValue = transformer
      ? transformer.transform(reading.value)
      : reading.value as number;

    // 2. Update running aggregations
    const aggregations = this.aggregator.update(reading.sensorId, transformedValue);

    // 3. Check alert conditions
    const alerts = this.alertEngine.evaluate(reading, transformedValue, aggregations);

    return {
      originalReading: reading,
      processedValue: transformedValue,
      aggregations,
      alerts: alerts.length > 0 ? alerts : undefined
    };
  }

  processBatch(readings: SensorReading[]): ProcessedReading[] {
    return readings.map(r => this.processReading(r));
  }

  async publishToCloud(processed: ProcessedReading[]): Promise<CloudPublishResult> {
    const maxRetries = 3;
    let lastError: Error | undefined;

    for (let attempt = 0; attempt < maxRetries; attempt++) {
      try {
        const messageId = await this.cloudClient.publish({
          readings: processed,
          timestamp: Date.now(),
          gatewayId: this.getGatewayId()
        });
        return { success: true, messageId };
      } catch (error) {
        lastError = error as Error;
        // Exponential backoff
        await this.delay(Math.pow(2, attempt) * 1000);
      }
    }

    return { success: false, error: lastError };
  }
}

// ============================================
// GATEWAY INTEGRATION
// ============================================

/**
 * IoT Gateway orchestrating multiple Active Objects
 */
class IoTGateway {
  private readonly sensorProcessors: Map<SensorType, SensorProcessorProxy> = new Map();
  private readonly cloudPublisher: CloudPublisherActiveObject;
  private readonly batchBuffer: BatchBuffer;

  constructor() {
    // Create specialized processors for different sensor types
    this.sensorProcessors.set(
      SensorType.TEMPERATURE,
      this.createProcessor('temperature', { queueSize: 10000, workers: 2 })
    );
    this.sensorProcessors.set(
      SensorType.VIBRATION,
      this.createProcessor('vibration', { queueSize: 50000, workers: 4 }) // High-frequency
    );
    this.sensorProcessors.set(
      SensorType.ALERT_TRIGGER,
      this.createProcessor('alert', { queueSize: 1000, workers: 1 }) // Low-latency
    );

    // Batch buffer for efficient cloud publishing
    this.batchBuffer = new BatchBuffer({
      maxSize: 100,
      maxAgeMs: 5000,
      onFlush: (batch) => this.publishBatch(batch)
    });
  }

  /**
   * Called by sensor drivers when new readings arrive
   */
  async handleSensorReading(reading: SensorReading): Promise<void> {
    // Get appropriate processor for sensor type
    const processor = this.sensorProcessors.get(reading.sensorType);
    if (!processor) {
      console.warn(`No processor for sensor type: ${reading.sensorType}`);
      return;
    }

    try {
      // Process the reading (queued, async)
      const processed = await processor.processReading(reading);

      // Add to batch for cloud publishing
      this.batchBuffer.add(processed);

      // If there are alerts, publish immediately
      if (processed.alerts && processed.alerts.length > 0) {
        await this.publishAlerts(processed.alerts);
      }
    } catch (error) {
      // Queue was full or processing failed
      this.metrics.recordDropped(reading.sensorType);
      console.error(`Failed to process reading from ${reading.sensorId}:`, error);
    }
  }

  /**
   * Handle backpressure when queue is full
   */
  private onQueueFull(sensorType: SensorType): void {
    // Signal back to sensor drivers to slow down
    this.flowController.applyBackpressure(sensorType);
    // Record metrics
    this.metrics.recordBackpressure(sensorType);
  }
}
```

Different sensor types have different queue requirements:
• High-frequency sensors (vibration, accelerometer): Large queues to absorb bursts, multiple workers for throughput
• Alert sensors: Small queues but high priority; you want these processed immediately, not buffered
• Periodic sensors (temperature): Moderate queues; data is less time-sensitive
The Active Object pattern makes these trade-offs explicit and configurable per sensor type.
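As a hypothetical sketch, those per-sensor trade-offs can be captured as explicit configuration data (the `QueueProfile` shape and the numbers below are illustrative, mirroring the gateway constructor above rather than any real library):

```typescript
// Illustrative per-sensor queue configuration. Each profile trades
// burst absorption (queueSize) against latency (priority) and
// throughput (workers).
interface QueueProfile {
  queueSize: number;                        // burst absorption capacity
  workers: number;                          // parallel servant threads
  priority: 'critical' | 'normal' | 'low';  // scheduling class
}

const SENSOR_PROFILES: Record<string, QueueProfile> = {
  vibration:    { queueSize: 50_000, workers: 4, priority: 'normal' },   // high-frequency bursts
  alertTrigger: { queueSize: 1_000,  workers: 1, priority: 'critical' }, // low latency, no buffering
  temperature:  { queueSize: 10_000, workers: 2, priority: 'low' }       // periodic, time-tolerant
};
```

Keeping these numbers in one table makes the tuning decisions reviewable instead of scattering them through construction code.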
Now let's bring everything together with a complete, production-ready Active Object implementation template. This can serve as a starting point for your own implementations.
```typescript
/**
 * Production-Ready Active Object Template
 * Includes all advanced features: priority, cancellation, timeout, monitoring
 */

// ============================================
// CONFIGURATION
// ============================================

interface ActiveObjectConfig {
  name: string;
  queueCapacity: number;
  workerCount: number;
  defaultTimeoutMs: number;
  enableMetrics: boolean;
}

const DEFAULT_CONFIG: ActiveObjectConfig = {
  name: 'ActiveObject',
  queueCapacity: 10000,
  workerCount: 1,
  defaultTimeoutMs: 30000,
  enableMetrics: true
};

// ============================================
// COMPLETE ACTIVE OBJECT IMPLEMENTATION
// ============================================

/**
 * Generic Active Object factory
 * Creates an Active Object from any service interface
 */
class ActiveObjectFactory {
  static create<TService>(
    servant: TService,
    config: Partial<ActiveObjectConfig> = {}
  ): ActiveObject<TService> {
    const fullConfig = { ...DEFAULT_CONFIG, ...config };
    return new ActiveObjectImpl(servant, fullConfig);
  }
}

class ActiveObjectImpl<TService> implements ActiveObject<TService> {
  private readonly config: ActiveObjectConfig;
  private readonly activationQueue: PriorityActivationQueue;
  private readonly scheduler: Scheduler;
  private readonly servant: TService;
  private readonly metrics: ActiveObjectMetrics;
  private isRunning = false;

  constructor(servant: TService, config: ActiveObjectConfig) {
    this.config = config;
    this.servant = servant;
    this.activationQueue = new PriorityActivationQueue(config.queueCapacity);
    this.metrics = new ActiveObjectMetrics(config.name, config.enableMetrics);

    // Create scheduler(s)
    this.scheduler = config.workerCount > 1
      ? new ThreadPoolScheduler(this.activationQueue, servant, config.workerCount, this.metrics)
      : new SingleThreadScheduler(this.activationQueue, servant, this.metrics);
  }

  /**
   * Create a proxy that converts method calls to async requests
   */
  createProxy(): AsyncService<TService> {
    return new Proxy({} as AsyncService<TService>, {
      get: (target, methodName: string) => {
        return (...args: unknown[]) => {
          return this.invoke(methodName, args);
        };
      }
    });
  }

  /**
   * Invoke a method asynchronously
   */
  invoke<TResult>(
    methodName: string,
    args: unknown[],
    options: InvokeOptions = {}
  ): CancellablePromise<TResult> {
    const {
      priority = Priority.NORMAL,
      timeoutMs = this.config.defaultTimeoutMs,
      cancellationToken = new CancellationToken()
    } = options;

    const future = new Future<TResult>();
    const request = new GenericMethodRequest<TResult>(
      this.servant,
      methodName,
      args,
      future,
      cancellationToken
    );

    // Record enqueue time for metrics
    request.enqueueTime = Date.now();

    // Enqueue with priority
    this.activationQueue.enqueue(request, priority);

    // Set up timeout
    const deadline = Date.now() + timeoutMs;
    const timeoutHandle = setTimeout(() => {
      if (!future.isDone()) {
        cancellationToken.cancel();
        future.reject(new TimeoutError(`${methodName} timed out after ${timeoutMs}ms`));
      }
    }, timeoutMs);

    // Clear timeout on completion
    future.toPromise().finally(() => clearTimeout(timeoutHandle));

    return {
      promise: future.toPromise(),
      cancel: () => cancellationToken.cancel(),
      deadline
    };
  }

  /**
   * Start the Active Object
   */
  start(): void {
    if (this.isRunning) return;
    this.isRunning = true;
    this.scheduler.start();
    console.log(`[${this.config.name}] Started`);
  }

  /**
   * Stop the Active Object
   */
  async stop(): Promise<void> {
    if (!this.isRunning) return;
    this.isRunning = false;
    await this.scheduler.stop();
    console.log(`[${this.config.name}] Stopped`);
  }

  /**
   * Get health status
   */
  async getHealth(): Promise<HealthStatus> {
    const queueDepth = await this.activationQueue.size();
    const utilization = queueDepth / this.config.queueCapacity;

    if (utilization > 0.9) {
      return { status: 'UNHEALTHY', reason: 'Queue nearly full' };
    }
    if (utilization > 0.7) {
      return { status: 'DEGRADED', reason: 'Queue filling up' };
    }
    return { status: 'HEALTHY' };
  }

  /**
   * Get metrics snapshot
   */
  getMetrics(): MetricsSnapshot {
    return this.metrics.snapshot();
  }
}

// ============================================
// USAGE EXAMPLE
// ============================================

// Define your service interface
interface ImageService {
  processImage(imageId: string, data: Buffer): ProcessedImage;
  getImage(imageId: string): Image;
  deleteImage(imageId: string): void;
}

// Create a servant (plain implementation)
class ImageServiceServant implements ImageService {
  processImage(imageId: string, data: Buffer): ProcessedImage {
    // Actual implementation
    return { id: imageId, processed: true };
  }

  getImage(imageId: string): Image {
    return { id: imageId, data: null };
  }

  deleteImage(imageId: string): void {
    // Delete logic
  }
}

// Create Active Object
const servant = new ImageServiceServant();
const activeObject = ActiveObjectFactory.create(servant, {
  name: 'ImageProcessor',
  queueCapacity: 5000,
  workerCount: 4,
  defaultTimeoutMs: 60000
});

// Start it
activeObject.start();

// Get the async proxy
const imageService: AsyncService<ImageService> = activeObject.createProxy();

// Use it like a regular async service
async function handleRequest() {
  try {
    // All methods are now async!
    const result = await imageService.processImage('img-123', buffer);
    console.log('Processed:', result);

    // With options
    const { promise, cancel } = activeObject.invoke<ProcessedImage>(
      'processImage',
      ['img-456', buffer],
      { priority: Priority.HIGH, timeoutMs: 10000 }
    );

    // Can cancel if needed
    // cancel();

    const result2 = await promise;
    console.log('Processed with priority:', result2);
  } catch (error) {
    if (error instanceof TimeoutError) {
      console.error('Request timed out');
    } else if (error instanceof CancellationError) {
      console.error('Request was cancelled');
    } else {
      throw error;
    }
  }
}

// Graceful shutdown
process.on('SIGTERM', async () => {
  await activeObject.stop();
  process.exit(0);
});
```

Active Object is one of several concurrency patterns. Understanding when to use it versus alternatives helps you make appropriate architectural choices.
| Pattern | Best For | Trade-offs |
|---|---|---|
| Active Object | Objects with asynchronous method calls; maintaining object interface; serialized access to state | More complex than simple async; creates proxy/queue overhead |
| Future/Promise | Individual async operations; no need for object abstraction | No built-in queuing or serialization; each call independent |
| Actor Model | Message-driven systems; distributed computing; many independent entities | Message-based API (not method call); heavier infrastructure |
| Thread Pool | Independent tasks; CPU-bound parallelism | No object abstraction; tasks share nothing |
| Reactor/Proactor | I/O-heavy workloads; event-driven systems | Requires event loop; callback complexity |
| CSP (Channels) | Pipeline processing; explicit synchronization points | Channel-based communication; not object-oriented |
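The first three rows of the table can be made concrete with a toy contrast of calling styles. This is a sketch with illustrative names (`addAsync`, `AdderActiveObject`, `AdderActor`), not production code: the same addition exposed as a bare promise, behind a serializing Active Object-style method interface, and as an actor-style message.

```typescript
// 1. Future/Promise: each call is independent; no queuing or serialization.
async function addAsync(a: number, b: number): Promise<number> {
  return a + b;
}

// 2. Active Object: the method-call interface is preserved, but every call is
//    queued and executed one at a time, serialized behind prior calls.
class AdderActiveObject {
  private tail: Promise<unknown> = Promise.resolve();

  add(a: number, b: number): Promise<number> {
    const result = this.tail.then(() => a + b); // runs only after earlier calls
    this.tail = result.catch(() => undefined);  // keep the chain alive on errors
    return result;
  }
}

// 3. Actor model: no method calls at all — the caller sends a message and
//    supplies a reply channel.
type AddMessage = { type: 'add'; a: number; b: number; reply: (sum: number) => void };

class AdderActor {
  send(msg: AddMessage): void {
    if (msg.type === 'add') msg.reply(msg.a + msg.b);
  }
}
```

The practical difference shows up at the call site: the promise and Active Object versions both read as `await adder.add(2, 3)`, while the actor version forces the caller to construct and send a message, which is why actors suit message-driven and distributed designs better than drop-in replacements for an existing object interface.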
Module Complete:
You've now mastered the Active Object pattern.
Active Object is a powerful pattern for building concurrent systems that maintain clean, object-oriented interfaces. By externalizing all concurrency concerns into the pattern's infrastructure, business logic remains simple and synchronous—the Active Object handles the complexity of asynchronous execution.
Congratulations! You've completed the Active Object Pattern module. You now have the knowledge to implement this pattern in production systems, adapt it to your specific requirements, and recognize when it's the right tool for the job. The Active Object pattern is one of the most versatile concurrency patterns—use it wisely!