The basic Active Object pattern provides a solid foundation, but production systems often require more sophisticated capabilities. How do you handle requests with different priority levels? What happens when a client needs to cancel a pending request? How do you prevent runaway operations from blocking the scheduler forever?
The Scheduler and Proxy are the two components with the richest opportunities for enhancement. The Scheduler determines how and when requests execute, while the Proxy controls what gets queued and how clients interact with the async results.
On this page, we'll explore advanced patterns for both components, transforming the basic Active Object into production-grade concurrency infrastructure.
By the end of this page, you will understand:
• Priority scheduling and multiple queues
• Thread pool integration for parallel execution
• Request cancellation and timeout handling
• Proxy patterns for complex client interactions
• Monitoring and observability integration
The basic Scheduler processes requests in FIFO (First-In, First-Out) order. This is fair but often insufficient for real-world systems where different requests have different urgency levels. Let's explore alternative scheduling strategies:
| Strategy | Use Case | Trade-offs |
|---|---|---|
| FIFO (Queue) | General-purpose; fairness required | Simple, predictable; high-priority requests may wait behind low-priority ones |
| Priority Queue | Differentiated SLAs; urgent requests | High-priority gets fast service; low-priority may starve |
| Multiple Queues | Distinct request categories with different processing needs | Isolation between categories; requires queue selection logic |
| Weighted Fair Queuing | Proportional allocation; multiple tenants | Fair bandwidth sharing; more complex implementation |
| Deadline-Based | Real-time systems; SLA guarantees | Meets deadlines when possible; requires deadline estimation |
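Of these strategies, weighted fair queuing is the only one not illustrated in code later on this page, so here is a minimal deficit-round-robin sketch. The types are assumptions for illustration: per-tenant queues are plain arrays of `MethodRequest<unknown>`, and the weights map is hypothetical configuration.

```typescript
// Minimal deficit-round-robin sketch of the "Weighted Fair Queuing" row.
class WeightedFairSelector {
  private readonly deficits = new Map<string, number>();

  constructor(
    private readonly queues: Map<string, Array<MethodRequest<unknown>>>,
    private readonly weights: Map<string, number> // e.g., tenant-a: 3, tenant-b: 1
  ) {}

  /** One scheduling round: each tenant is served in proportion to its weight. */
  nextBatch(): Array<MethodRequest<unknown>> {
    const batch: Array<MethodRequest<unknown>> = [];
    for (const [tenant, queue] of this.queues) {
      // Each tenant earns `weight` credits per round...
      let deficit = (this.deficits.get(tenant) ?? 0) + (this.weights.get(tenant) ?? 1);
      // ...and spends one credit per dequeued request
      while (deficit >= 1 && queue.length > 0) {
        batch.push(queue.shift()!);
        deficit -= 1;
      }
      // Unused credit carries over only while the tenant stays backlogged
      this.deficits.set(tenant, queue.length > 0 ? deficit : 0);
    }
    return batch;
  }
}
```

With weights of 3 and 1, a backlogged tenant-a receives three requests of service for every one served for tenant-b, and neither tenant is ever starved entirely.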
```typescript
/**
 * Priority-Based Scheduler
 * Processes high-priority requests before low-priority ones
 */
enum Priority {
  CRITICAL = 0, // Lowest value = highest priority
  HIGH = 1,
  NORMAL = 2,
  LOW = 3,
  BACKGROUND = 4
}

interface PrioritizedRequest {
  priority: Priority;
  request: MethodRequest<unknown>;
  enqueueTime: number; // For fairness within a priority level
}

class PriorityActivationQueue {
  private readonly queues: Map<Priority, Array<MethodRequest<unknown>>> = new Map();
  private readonly lock = new Mutex();
  private readonly notEmpty = new ConditionVariable();
  private isShutdown = false;

  constructor() {
    // Initialize a queue for each priority level
    for (const priority of Object.values(Priority)) {
      if (typeof priority === 'number') {
        this.queues.set(priority, []);
      }
    }
  }

  async enqueue(request: MethodRequest<unknown>, priority: Priority = Priority.NORMAL): Promise<void> {
    await this.lock.acquire();
    try {
      if (this.isShutdown) {
        throw new Error("Queue is shutdown");
      }
      this.queues.get(priority)!.push(request);
      this.notEmpty.signal();
    } finally {
      this.lock.release();
    }
  }

  async dequeue(): Promise<MethodRequest<unknown> | null> {
    await this.lock.acquire();
    try {
      while (this.isEmpty() && !this.isShutdown) {
        await this.notEmpty.wait(this.lock);
      }
      if (this.isEmpty()) {
        return null;
      }
      // Serve the highest priority first (lowest numeric value)
      for (const priority of [
        Priority.CRITICAL,
        Priority.HIGH,
        Priority.NORMAL,
        Priority.LOW,
        Priority.BACKGROUND
      ]) {
        const queue = this.queues.get(priority)!;
        if (queue.length > 0) {
          return queue.shift()!;
        }
      }
      return null;
    } finally {
      this.lock.release();
    }
  }

  private isEmpty(): boolean {
    for (const queue of this.queues.values()) {
      if (queue.length > 0) return false;
    }
    return true;
  }
}

/**
 * Priority-Aware Proxy that allows clients to specify priority
 */
class PriorityImageProcessorProxy implements ImageProcessor {
  private readonly queue: PriorityActivationQueue;

  constructor(queue: PriorityActivationQueue) {
    this.queue = queue;
  }

  processImage(imageId: string, data: Buffer): Promise<ProcessedImage> {
    return this.processImageWithPriority(imageId, data, Priority.NORMAL);
  }

  processImageWithPriority(
    imageId: string,
    data: Buffer,
    priority: Priority
  ): Promise<ProcessedImage> {
    const future = new Future<ProcessedImage>();
    const request = new ProcessImageRequest(imageId, data, future);
    void this.queue.enqueue(request, priority); // Fire-and-forget; results flow back through the future
    return future.toPromise();
  }

  // Example: Express lane for thumbnail generation
  processImageThumbnail(imageId: string, data: Buffer): Promise<ProcessedImage> {
    return this.processImageWithPriority(imageId, data, Priority.HIGH);
  }

  // Example: Background batch processing
  processImageBatch(imageId: string, data: Buffer): Promise<ProcessedImage> {
    return this.processImageWithPriority(imageId, data, Priority.BACKGROUND);
  }
}
```

A naive priority queue can starve low-priority requests indefinitely if high-priority requests keep arriving. Mitigation strategies include:
• Aging: Boost the priority of waiting requests over time (see the sketch below)
• Priority bands with fair queuing: Guarantee minimum throughput for each priority
• Separate schedulers: Dedicate capacity to low-priority work
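One way to implement aging is a periodic pass that promotes requests that have waited too long. The sketch below assumes a variant of the queue that stores the `PrioritizedRequest` entries defined above (so `enqueueTime` is available); the threshold is an illustrative value.

```typescript
// Illustrative aging pass over per-level arrays of PrioritizedRequest.
// Run it periodically (e.g., every second) while holding the queue lock.
const AGING_THRESHOLD_MS = 2000;

function promoteAgedRequests(
  queues: Map<Priority, PrioritizedRequest[]>,
  now: number = Date.now()
): void {
  // Walk from least to most urgent; CRITICAL cannot be promoted further
  for (const level of [Priority.BACKGROUND, Priority.LOW, Priority.NORMAL, Priority.HIGH]) {
    const queue = queues.get(level)!;
    const target = queues.get((level - 1) as Priority)!; // One level more urgent
    // Iterate backwards so splicing does not skip entries
    for (let i = queue.length - 1; i >= 0; i--) {
      if (now - queue[i].enqueueTime > AGING_THRESHOLD_MS) {
        const [aged] = queue.splice(i, 1);
        aged.priority = (level - 1) as Priority;
        target.push(aged);
      }
    }
  }
}
```

Because promoted requests land at the tail of the more urgent queue, ordering within that band is only approximate; a production version might merge by `enqueueTime`.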
The basic Active Object uses a single scheduler thread, which serializes all operations. This is simple and eliminates race conditions in the Servant, but it also limits throughput—only one request processes at a time.
For CPU-bound or I/O-intensive workloads, you may want parallel execution. This requires careful design to maintain thread safety while increasing concurrency.
```typescript
/**
 * Thread Pool Scheduler
 * Dispatches requests to a pool of worker threads for parallel execution
 */
class ThreadPoolScheduler {
  private readonly activationQueue: ActivationQueue;
  private readonly servantPool: ServantPool;
  private readonly workerCount: number;
  private readonly workers: Worker[] = [];
  private isRunning = false;

  constructor(
    activationQueue: ActivationQueue,
    servantFactory: () => ImageProcessorServant,
    workerCount: number = 4
  ) {
    this.activationQueue = activationQueue;
    this.workerCount = workerCount;
    // Create a pool of Servants - each worker gets its own
    this.servantPool = new ServantPool(servantFactory, workerCount);
  }

  start(): void {
    if (this.isRunning) return;
    this.isRunning = true;

    // Spawn multiple worker threads, each running the same processing loop
    for (let i = 0; i < this.workerCount; i++) {
      const worker = this.createWorker(i);
      this.workers.push(worker);
    }
    console.log(`[ThreadPoolScheduler] Started with ${this.workerCount} workers`);
  }

  private createWorker(workerId: number): Worker {
    // In reality, this would be a true thread (Web Worker, Worker Thread, etc.)
    // Here we simulate with an async loop
    const workerLoop = async () => {
      // Each worker gets its own Servant instance
      const servant = await this.servantPool.acquire();
      try {
        while (this.isRunning) {
          const request = await this.activationQueue.dequeue();
          if (request === null) {
            break; // Shutdown signal
          }
          // Workers process in parallel - each on its own Servant
          try {
            if (request.guard(servant)) {
              request.call(servant);
            } else {
              // Re-queue if the guard fails
              await this.activationQueue.enqueue(request);
            }
          } catch (error) {
            console.error(`[Worker-${workerId}] Error:`, error);
          }
        }
      } finally {
        await this.servantPool.release(servant);
      }
      console.log(`[Worker-${workerId}] Stopped`);
    };

    // Start the worker loop (deliberately not awaited)
    void workerLoop();
    return { id: workerId } as Worker;
  }

  async stop(): Promise<void> {
    this.isRunning = false;
    await this.activationQueue.shutdown();
    console.log("[ThreadPoolScheduler] Shutdown complete");
  }
}

/**
 * Servant Pool - manages a pool of Servant instances
 * Each worker thread gets its own Servant to avoid shared state
 */
class ServantPool {
  private readonly servants: Array<{ servant: ImageProcessorServant; inUse: boolean }>;
  private readonly lock = new Mutex();
  private readonly available = new ConditionVariable();

  constructor(factory: () => ImageProcessorServant, poolSize: number) {
    this.servants = [];
    for (let i = 0; i < poolSize; i++) {
      this.servants.push({ servant: factory(), inUse: false });
    }
  }

  async acquire(): Promise<ImageProcessorServant> {
    await this.lock.acquire();
    try {
      while (true) {
        const available = this.servants.find(s => !s.inUse);
        if (available) {
          available.inUse = true;
          return available.servant;
        }
        await this.available.wait(this.lock);
      }
    } finally {
      this.lock.release();
    }
  }

  async release(servant: ImageProcessorServant): Promise<void> {
    await this.lock.acquire();
    try {
      const entry = this.servants.find(s => s.servant === servant);
      if (entry) {
        entry.inUse = false;
        this.available.signal();
      }
    } finally {
      this.lock.release();
    }
  }
}
```

In the thread pool approach, we create one Servant per worker thread. This maintains the key benefit of Active Object—the Servant remains single-threaded and needs no synchronization. The parallel execution happens across independent Servant instances, not within a shared one.
If you need shared state across all requests, consider using thread-safe data structures or a separate shared service that the Servants access through proper synchronization.
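As a minimal sketch of that approach (reusing the assumed Mutex primitive from the examples above), Servants would record shared statistics through a narrow, synchronized service rather than mutating shared fields directly:

```typescript
// Hypothetical shared service; all Servants call it, so every mutation
// happens inside its own critical section
class SharedProcessingStats {
  private readonly lock = new Mutex();
  private processedCount = 0;
  private totalBytes = 0;

  async recordProcessed(byteLength: number): Promise<void> {
    await this.lock.acquire();
    try {
      this.processedCount += 1;
      this.totalBytes += byteLength;
    } finally {
      this.lock.release();
    }
  }

  async snapshot(): Promise<{ processedCount: number; totalBytes: number }> {
    await this.lock.acquire();
    try {
      // Return a copy so callers never observe a torn read
      return { processedCount: this.processedCount, totalBytes: this.totalBytes };
    } finally {
      this.lock.release();
    }
  }
}
```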
Real-world systems often need to cancel in-flight operations. A user navigates away from a page. A timeout expires. A higher-priority request arrives. The system needs to gracefully abandon pending or running work.
Cancellation in Active Object requires coordination at multiple levels:
```typescript
/**
 * Cancellation Token - shared between client and request
 */
class CancellationToken {
  private _isCancelled = false;
  private readonly callbacks: Array<() => void> = [];

  get isCancelled(): boolean {
    return this._isCancelled;
  }

  cancel(): void {
    if (this._isCancelled) return;
    this._isCancelled = true;
    // Notify all registered callbacks
    for (const callback of this.callbacks) {
      try {
        callback();
      } catch (e) { /* ignore */ }
    }
    this.callbacks.length = 0;
  }

  onCancel(callback: () => void): void {
    if (this._isCancelled) {
      callback();
    } else {
      this.callbacks.push(callback);
    }
  }

  throwIfCancelled(): void {
    if (this._isCancelled) {
      throw new CancellationError("Operation was cancelled");
    }
  }
}

class CancellationError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'CancellationError';
  }
}

/**
 * Cancellable Method Request
 */
abstract class CancellableMethodRequest<T> extends MethodRequest<T> {
  protected readonly cancellationToken: CancellationToken;

  constructor(future: Future<T>, cancellationToken: CancellationToken) {
    super(future);
    this.cancellationToken = cancellationToken;
    // If cancelled, complete the future with a cancellation error
    cancellationToken.onCancel(() => {
      this.future.reject(new CancellationError("Request was cancelled"));
    });
  }

  call(servant: ImageProcessorServant): void {
    // Check cancellation before starting
    if (this.cancellationToken.isCancelled) {
      this.future.reject(new CancellationError("Request was cancelled"));
      return;
    }
    try {
      // Execute with the cancellation token available
      const result = this.executeWithCancellation(servant);
      // Check cancellation after completion (the result may be stale)
      if (!this.cancellationToken.isCancelled) {
        this.future.resolve(result);
      }
    } catch (error) {
      // CancellationErrors and ordinary failures both reject the future
      this.future.reject(error as Error);
    }
  }

  abstract executeWithCancellation(servant: ImageProcessorServant): T;
}

/**
 * Cancellable implementation checking periodically
 */
class CancellableProcessImageRequest extends CancellableMethodRequest<ProcessedImage> {
  private readonly imageId: string;
  private readonly data: Buffer;

  constructor(
    imageId: string,
    data: Buffer,
    future: Future<ProcessedImage>,
    cancellationToken: CancellationToken
  ) {
    super(future, cancellationToken);
    this.imageId = imageId;
    this.data = data;
  }

  executeWithCancellation(servant: ImageProcessorServant): ProcessedImage {
    // Check before each expensive stage
    this.cancellationToken.throwIfCancelled();
    const decoded = servant.decodeImage(this.data);

    this.cancellationToken.throwIfCancelled();
    const filtered = servant.applyFilters(decoded);

    this.cancellationToken.throwIfCancelled();
    const resized = servant.resize(filtered);

    this.cancellationToken.throwIfCancelled();
    return servant.compress(resized);
  }
}

/**
 * Cancellable Proxy returning both Future and CancellationToken
 */
class CancellableImageProcessorProxy {
  private readonly queue: ActivationQueue;

  constructor(queue: ActivationQueue) {
    this.queue = queue;
  }

  processImage(imageId: string, data: Buffer): CancellableOperation<ProcessedImage> {
    const future = new Future<ProcessedImage>();
    const cancellationToken = new CancellationToken();
    const request = new CancellableProcessImageRequest(
      imageId, data, future, cancellationToken
    );
    void this.queue.enqueue(request); // Results flow back through the future
    return {
      result: future.toPromise(),
      cancel: () => cancellationToken.cancel()
    };
  }
}

interface CancellableOperation<T> {
  result: Promise<T>;
  cancel: () => void;
}

// Client usage
async function clientWithCancellation(queue: ActivationQueue, imageBuffer: Buffer) {
  const processor = new CancellableImageProcessorProxy(queue);
  const operation = processor.processImage("img-1", imageBuffer);

  // Set up a timeout that cancels the operation
  const timeout = setTimeout(() => {
    console.log("Operation timed out, cancelling...");
    operation.cancel();
  }, 5000);

  try {
    const result = await operation.result;
    console.log("Processing complete:", result);
  } catch (error) {
    if (error instanceof CancellationError) {
      console.log("Operation was cancelled");
    } else {
      throw error;
    }
  } finally {
    clearTimeout(timeout); // Always clear the timer, even on failure
  }
}
```

Cancellation is cooperative—the executing code must check the cancellation token periodically. Long-running operations without cancellation checks cannot be interrupted gracefully. Design your Servant methods to check cancellation at appropriate intervals (e.g., between processing stages, after each batch iteration).
Timeouts are closely related to cancellation but have subtle differences. A timeout automatically triggers cancellation after a deadline expires, but the Proxy can handle timeouts more elegantly by racing the operation against a timer.
```typescript
/**
 * Timeout-Aware Proxy
 * Wraps operations with timeout handling
 */
class TimeoutImageProcessorProxy {
  private readonly delegate: CancellableImageProcessorProxy;
  private readonly defaultTimeout: number;

  constructor(delegate: CancellableImageProcessorProxy, defaultTimeoutMs: number = 30000) {
    this.delegate = delegate;
    this.defaultTimeout = defaultTimeoutMs;
  }

  /**
   * Process with the default timeout
   */
  processImage(imageId: string, data: Buffer): Promise<ProcessedImage> {
    return this.processImageWithTimeout(imageId, data, this.defaultTimeout);
  }

  /**
   * Process with a custom timeout
   */
  async processImageWithTimeout(
    imageId: string,
    data: Buffer,
    timeoutMs: number
  ): Promise<ProcessedImage> {
    const operation = this.delegate.processImage(imageId, data);
    // Race the operation against the timeout
    return Promise.race([
      operation.result,
      this.createTimeout<ProcessedImage>(timeoutMs, () => operation.cancel())
    ]);
  }

  private createTimeout<T>(ms: number, onTimeout: () => void): Promise<T> {
    return new Promise((_, reject) => {
      setTimeout(() => {
        onTimeout();
        reject(new TimeoutError(`Operation timed out after ${ms}ms`));
      }, ms);
    });
  }
}

class TimeoutError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'TimeoutError';
  }
}

/**
 * Advanced: Deadline-based timeout with queue time consideration
 */
abstract class DeadlineAwareRequest<T> extends CancellableMethodRequest<T> {
  private readonly deadline: number; // Absolute timestamp

  constructor(
    future: Future<T>,
    cancellationToken: CancellationToken,
    deadlineMs: number // Relative timeout from now
  ) {
    super(future, cancellationToken);
    this.deadline = Date.now() + deadlineMs;
  }

  /**
   * Check if the deadline has passed (including time spent in the queue)
   */
  isExpired(): boolean {
    return Date.now() > this.deadline;
  }

  /**
   * Remaining time until the deadline
   */
  remainingMs(): number {
    return Math.max(0, this.deadline - Date.now());
  }
}

/**
 * Scheduler that respects deadlines
 */
class DeadlineAwareScheduler {
  private isRunning = true;

  constructor(
    private readonly activationQueue: ActivationQueue,
    private readonly servant: ImageProcessorServant
  ) {}

  async processLoop(): Promise<void> {
    while (this.isRunning) {
      const request = await this.activationQueue.dequeue();
      if (request === null) break;

      // Check if the request already expired while in the queue
      if (request instanceof DeadlineAwareRequest && request.isExpired()) {
        // Don't even start processing - it would be wasted work
        request.future.reject(new TimeoutError(
          "Request expired while waiting in queue"
        ));
        continue;
      }

      // Execute with the remaining deadline
      request.call(this.servant);
    }
  }
}
```

Timeouts should typically include both queue wait time and execution time. A request that waits 29 seconds in a queue and then starts executing with a 30-second timeout is already nearly expired. Deadline-based timeouts (absolute timestamps) naturally handle this, while relative timeouts applied only to the Future miss the queue wait time.
The Proxy can implement various patterns to enhance the client experience. Here are several advanced proxy patterns commonly used in production:
```typescript
import crypto from 'node:crypto'; // Used by the deduplicating proxy's hash()

/**
 * Pattern 1: Batching Proxy
 * Accumulates requests and submits them as a batch
 */
class BatchingImageProcessorProxy {
  private readonly queue: ActivationQueue;
  private pendingBatch: Array<{
    request: BatchableRequest;
    future: Future<ProcessedImage>;
  }> = [];
  private batchTimer: NodeJS.Timeout | null = null;
  private readonly batchSize: number;
  private readonly batchDelayMs: number;

  constructor(queue: ActivationQueue, batchSize = 10, batchDelayMs = 100) {
    this.queue = queue;
    this.batchSize = batchSize;
    this.batchDelayMs = batchDelayMs;
  }

  processImage(imageId: string, data: Buffer): Promise<ProcessedImage> {
    const future = new Future<ProcessedImage>();
    this.pendingBatch.push({ request: { imageId, data }, future });

    // Flush if the batch is full
    if (this.pendingBatch.length >= this.batchSize) {
      this.flushBatch();
    } else if (!this.batchTimer) {
      // Start a timer for the partial batch
      this.batchTimer = setTimeout(() => this.flushBatch(), this.batchDelayMs);
    }
    return future.toPromise();
  }

  private flushBatch(): void {
    if (this.batchTimer) {
      clearTimeout(this.batchTimer);
      this.batchTimer = null;
    }
    if (this.pendingBatch.length === 0) return;

    const batch = this.pendingBatch;
    this.pendingBatch = [];
    // Submit as a single batch request
    const batchRequest = new BatchProcessImageRequest(batch);
    void this.queue.enqueue(batchRequest);
  }
}

/**
 * Pattern 2: Deduplicating Proxy
 * Reuses in-flight requests for identical operations
 */
class DeduplicatingImageProcessorProxy {
  private readonly queue: ActivationQueue;
  private readonly inFlight: Map<string, Promise<ProcessedImage>> = new Map();

  constructor(queue: ActivationQueue) {
    this.queue = queue;
  }

  processImage(imageId: string, data: Buffer): Promise<ProcessedImage> {
    // Create the deduplication key
    const key = this.createKey(imageId, data);

    // Check if an identical request is in flight
    const existing = this.inFlight.get(key);
    if (existing) {
      console.log(`Deduplicating request for ${imageId}`);
      return existing;
    }

    // Create a new request
    const future = new Future<ProcessedImage>();
    const request = new ProcessImageRequest(imageId, data, future);
    void this.queue.enqueue(request);

    const promise = future.toPromise();
    // Track the in-flight request
    this.inFlight.set(key, promise);
    // Remove from tracking on settle, without creating an unhandled rejection
    promise.then(
      () => this.inFlight.delete(key),
      () => this.inFlight.delete(key)
    );
    return promise;
  }

  private createKey(imageId: string, data: Buffer): string {
    // Simple key based on imageId and a data hash
    return `${imageId}:${this.hash(data)}`;
  }

  private hash(data: Buffer): string {
    // Use a fast hash for deduplication
    return crypto.createHash('md5').update(data).digest('hex');
  }
}

/**
 * Pattern 3: Rate-Limiting Proxy
 * Controls request rate to prevent overwhelming the system
 */
interface RateLimiter {
  acquire(): Promise<void>;
}

class RateLimitingImageProcessorProxy {
  private readonly delegate: ImageProcessor;
  private readonly rateLimiter: RateLimiter;

  constructor(delegate: ImageProcessor, requestsPerSecond: number) {
    this.delegate = delegate;
    this.rateLimiter = new TokenBucketRateLimiter(requestsPerSecond);
  }

  async processImage(imageId: string, data: Buffer): Promise<ProcessedImage> {
    // Wait for a rate limiter token
    await this.rateLimiter.acquire();
    // Token is consumed; a new token is added after the refill interval
    return this.delegate.processImage(imageId, data);
  }
}

class TokenBucketRateLimiter implements RateLimiter {
  private tokens: number;
  private readonly maxTokens: number;
  private readonly refillInterval: number;
  private lastRefill: number;

  constructor(tokensPerSecond: number) {
    this.maxTokens = tokensPerSecond;
    this.tokens = tokensPerSecond;
    this.refillInterval = 1000 / tokensPerSecond;
    this.lastRefill = Date.now();
  }

  async acquire(): Promise<void> {
    while (true) {
      this.refill();
      if (this.tokens >= 1) {
        this.tokens -= 1;
        return;
      }
      // Wait for the next refill
      await new Promise(resolve =>
        setTimeout(resolve, this.refillInterval)
      );
    }
  }

  private refill(): void {
    const now = Date.now();
    const elapsed = now - this.lastRefill;
    const newTokens = elapsed / this.refillInterval;
    this.tokens = Math.min(this.maxTokens, this.tokens + newTokens);
    this.lastRefill = now;
  }
}
```

Production Active Object implementations need comprehensive monitoring to understand system health, identify bottlenecks, and debug issues. Key metrics and instrumentation points include:
| Metric | What It Tells You | Alert Threshold Example |
|---|---|---|
| Queue Depth | How much pending work exists; indicates whether production outpaces consumption | > 80% of queue capacity |
| Queue Wait Time | How long requests wait before execution starts; latency component | > 500ms p99 |
| Execution Time | How long each request takes to process; identifies slow operations | > 5s p99 |
| End-to-End Latency | Total time from invocation to result delivery; what clients experience | > 10s p99 |
| Throughput | Requests processed per second; capacity utilization | < 50% of expected capacity |
| Error Rate | Percentage of requests that fail; system health | > 1% of requests |
| Cancellation Rate | How often requests are cancelled; may indicate timeouts or user behavior | > 5% of requests |
```typescript
/**
 * Instrumented Scheduler with comprehensive monitoring
 */
interface ActiveObjectMetrics {
  queueDepth: Gauge;
  queueWaitTime: Histogram;
  executionTime: Histogram;
  requestsCompleted: Counter;
  requestsFailed: Counter;
  requestsCancelled: Counter;
}

class InstrumentedScheduler {
  private isRunning = true;

  constructor(
    private readonly metrics: ActiveObjectMetrics,
    private readonly servant: ImageProcessorServant,
    private readonly activationQueue: ActivationQueue
  ) {}

  async processLoop(): Promise<void> {
    while (this.isRunning) {
      // Update the queue depth metric
      this.metrics.queueDepth.set(await this.activationQueue.size());

      const request = await this.activationQueue.dequeue();
      if (request === null) break;

      // Queue wait time (assumes the queue stamps enqueueTime on each request)
      const queueWaitMs = Date.now() - request.enqueueTime;
      this.metrics.queueWaitTime.observe(queueWaitMs);

      // Execute and measure
      const executionStart = Date.now();
      try {
        if (request.guard(this.servant)) {
          request.call(this.servant);
          const executionMs = Date.now() - executionStart;
          this.metrics.executionTime.observe(executionMs);
          this.metrics.requestsCompleted.inc();

          // Log slow requests
          if (executionMs > 5000) {
            console.warn(`Slow request: ${request.constructor.name} took ${executionMs}ms`);
          }
        }
      } catch (error) {
        if (error instanceof CancellationError) {
          this.metrics.requestsCancelled.inc();
        } else {
          this.metrics.requestsFailed.inc();
          console.error(`Request failed: ${error}`);
        }
      }
    }
  }
}

interface HealthStatus {
  status: 'HEALTHY' | 'DEGRADED' | 'UNHEALTHY';
  reason?: string;
  metrics?: Record<string, number>;
}

/**
 * Health check endpoint for the Active Object
 */
class ActiveObjectHealthCheck {
  constructor(
    private readonly activationQueue: ActivationQueue,
    private readonly maxQueueDepth: number,
    private readonly maxQueueWaitMs: number
  ) {}

  async isHealthy(): Promise<HealthStatus> {
    const queueDepth = await this.activationQueue.size();
    const queueUtilization = queueDepth / this.maxQueueDepth;
    // Average queue wait time from recent samples
    const avgWaitMs = this.getAverageQueueWaitMs();

    if (queueUtilization > 0.9) {
      return {
        status: 'UNHEALTHY',
        reason: `Queue nearly full: ${(queueUtilization * 100).toFixed(1)}% utilization`
      };
    }
    if (avgWaitMs > this.maxQueueWaitMs) {
      return {
        status: 'DEGRADED',
        reason: `High queue wait time: ${avgWaitMs}ms average`
      };
    }
    return {
      status: 'HEALTHY',
      metrics: { queueDepth, queueUtilization, avgQueueWaitMs: avgWaitMs }
    };
  }

  private getAverageQueueWaitMs(): number {
    // Derived from recent queueWaitTime samples; implementation omitted
    return 0;
  }
}
```

For microservices, integrate with distributed tracing (e.g., OpenTelemetry, Jaeger). Pass trace context through the Method Request so that spans can be correlated across the queue boundary. This lets you trace a request from the client, through the queue wait, to execution, and back.
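As a sketch of that idea using the public @opentelemetry/api package (the TracedRequest wrapper itself is hypothetical): the Proxy snapshots the caller's active context when it enqueues, and the Scheduler restores that context around execution so the execution span is parented to the client's span across the queue boundary.

```typescript
import { context, trace, type Context } from '@opentelemetry/api';

const tracer = trace.getTracer('active-object');

// Hypothetical wrapper: snapshot the caller's trace context at enqueue time
class TracedRequest<T> {
  readonly parentContext: Context = context.active(); // Captured on the client's side
  constructor(readonly inner: MethodRequest<T>) {}
}

// Inside the Scheduler loop: restore the captured context around execution
function executeTraced(request: TracedRequest<unknown>, servant: ImageProcessorServant): void {
  const span = tracer.startSpan('active-object.execute', undefined, request.parentContext);
  context.with(trace.setSpan(request.parentContext, span), () => {
    try {
      request.inner.call(servant);
    } finally {
      span.end(); // Always close the span, even if the request throws
    }
  });
}
```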
What's next:
With the pattern's components mastered, we'll now explore real-world use cases and complete examples. The next page presents practical applications of Active Object in game engines, trading systems, UI frameworks, and distributed systems—showing how the pattern adapts to diverse requirements.
You've mastered advanced patterns for the Scheduler and Proxy components. You understand priority scheduling, thread pool integration, cancellation, timeouts, and how proxies can implement powerful cross-cutting concerns. Next, we'll see Active Object in action across real-world systems.