Your e-commerce platform stores orders in DynamoDB. Business requirements demand that when an order status changes to 'shipped,' three things must happen:

1. The customer receives a shipping notification email.
2. The inventory system is updated to reflect the outbound stock.
3. An analytics event is published for reporting.
You could build a synchronous orchestrator that makes all three calls after every order update. But that creates tight coupling, increases latency, and means order updates fail if the email service is down.
DynamoDB Streams provides a better architecture. Every change to your DynamoDB table is captured as a stream record, which can trigger downstream consumers asynchronously. The order write succeeds immediately, and the notification, inventory update, and analytics happen independently—resilient to failures and able to scale on their own.
By the end of this page, you will understand:

- What DynamoDB Streams are and how they capture changes
- Stream record types and their contents
- Integrating Streams with AWS Lambda and Kinesis
- Common patterns like materialized views, replication, and auditing
- Ordering guarantees and exactly-once processing considerations
- Operational best practices for production stream processing
DynamoDB Streams is a fully managed change data capture (CDC) feature that records every modification to items in a DynamoDB table. When enabled, every insert, update, or delete generates a stream record that contains information about the change.
Think of Streams as a transaction log for your table—a time-ordered sequence of all changes that downstream consumers can process.
```
┌─────────────────────────────────────────────────────────────────────┐
│                    DynamoDB Streams Architecture                    │
└─────────────────────────────────────────────────────────────────────┘

  Application                     DynamoDB                 Consumers
      │
      │ PutItem/UpdateItem/
      │ DeleteItem
      ▼
 ┌──────────┐               ┌─────────────┐
 │  Write   │ ────────────► │   Table     │
 │ Request  │               │  Partition  │
 └──────────┘               └──────┬──────┘
                                   │
                   ┌───────────────┴───────────────┐
                   │   Stream Record Generated     │
                   │  (async, within milliseconds) │
                   └───────────────┬───────────────┘
                                   ▼
                           ┌───────────────┐
                           │  Stream Shard │
                           │  (ordered by  │
                           │   partition)  │
                           └───────┬───────┘
                                   │
         ┌─────────────────────────┼──────────────────────────┐
         │                         │                          │
         ▼                         ▼                          ▼
┌─────────────────┐    ┌─────────────────────┐     ┌─────────────────┐
│   AWS Lambda    │    │    Kinesis Data     │     │   Custom App    │
│ (Event-Driven)  │    │  Streams/Firehose   │     │    (DynamoDB    │
│                 │    │   (Analytics/S3)    │     │  Streams API)   │
└─────────────────┘    └─────────────────────┘     └─────────────────┘

  Stream records retained for 24 hours
  Multiple consumers can process the same stream independently
```

DynamoDB offers two streaming options: DynamoDB Streams (native, 24-hour retention, free) and Kinesis Data Streams for DynamoDB (1-365 day retention, more consumer options, additional cost). This page focuses on native DynamoDB Streams. Choose the Kinesis integration when you need longer retention, enhanced fan-out, or integration with existing Kinesis infrastructure.
When you enable Streams on a table, you specify a StreamViewType that determines what information each stream record contains. Choose based on your processing needs:
| View Type | Contents | Use Case |
|---|---|---|
| KEYS_ONLY | Only the key attributes of the modified item | Trigger processing without data; minimal cost |
| NEW_IMAGE | The entire item as it appears after modification | Sync current state to other systems |
| OLD_IMAGE | The entire item as it appeared before modification | Audit trails, compliance logging |
| NEW_AND_OLD_IMAGES | Both before and after images | Diff detection, change tracking, rollback capability |
```typescript
import { AttributeValue } from "@aws-sdk/client-dynamodb";

// Stream Record structure for NEW_AND_OLD_IMAGES view
interface DynamoDBStreamRecord {
  eventID: string;            // Unique identifier for stream record
  eventName: "INSERT" | "MODIFY" | "REMOVE";
  eventVersion: string;       // "1.0" or "1.1"
  eventSource: "aws:dynamodb";
  awsRegion: string;
  dynamodb: {
    Keys: Record<string, AttributeValue>;       // Primary key
    NewImage?: Record<string, AttributeValue>;  // After state
    OldImage?: Record<string, AttributeValue>;  // Before state
    SequenceNumber: string;   // Order within shard
    SizeBytes: number;        // Record size
    StreamViewType: "KEYS_ONLY" | "NEW_IMAGE" | "OLD_IMAGE" | "NEW_AND_OLD_IMAGES";
    ApproximateCreationDateTime: number;        // Epoch timestamp
  };
  eventSourceARN: string;     // Stream ARN
}

// Example: Order status change record
const orderStatusChangeRecord: DynamoDBStreamRecord = {
  eventID: "abc123...",
  eventName: "MODIFY",
  eventVersion: "1.1",
  eventSource: "aws:dynamodb",
  awsRegion: "us-east-1",
  dynamodb: {
    Keys: { orderId: { S: "ORD-12345" } },
    OldImage: {
      orderId: { S: "ORD-12345" },
      customerId: { S: "CUST-001" },
      status: { S: "processing" },
      total: { N: "149.99" },
      updatedAt: { S: "2024-06-15T10:00:00Z" }
    },
    NewImage: {
      orderId: { S: "ORD-12345" },
      customerId: { S: "CUST-001" },
      status: { S: "shipped" },                // Changed!
      total: { N: "149.99" },
      trackingNumber: { S: "1Z999..." },       // New attribute!
      updatedAt: { S: "2024-06-15T14:30:00Z" }
    },
    SequenceNumber: "111222333444555",
    SizeBytes: 256,
    StreamViewType: "NEW_AND_OLD_IMAGES",
    ApproximateCreationDateTime: 1718456000
  },
  eventSourceARN: "arn:aws:dynamodb:us-east-1:123456789:table/Orders/stream/..."
};
```

Stream record size affects Lambda invocation payload size (max 6 MB per batch) and data transfer costs. If you only need to know that something changed (not what changed), use KEYS_ONLY. If you only need the current state, use NEW_IMAGE. Reserve NEW_AND_OLD_IMAGES for when you genuinely need to compute diffs or track changes.
The most common way to consume DynamoDB Streams is through AWS Lambda. This integration is first-class: Lambda automatically polls the stream, manages shard iterators, handles failures with retries, and scales with throughput.
```typescript
import { DynamoDBStreamEvent, DynamoDBRecord, Context } from "aws-lambda";
import { unmarshall } from "@aws-sdk/util-dynamodb";
import { SESClient, SendEmailCommand } from "@aws-sdk/client-ses";

const ses = new SESClient({});

// Lambda handler for DynamoDB Stream
export async function handler(
  event: DynamoDBStreamEvent,
  context: Context
): Promise<void> {
  console.log(`Processing ${event.Records.length} stream records`);
  for (const record of event.Records) {
    await processRecord(record);
  }
}

async function processRecord(record: DynamoDBRecord): Promise<void> {
  const { eventName, dynamodb } = record;
  if (!dynamodb) return;

  // Unmarshall DynamoDB format to plain JavaScript objects
  const keys = dynamodb.Keys ? unmarshall(dynamodb.Keys) : null;
  const newImage = dynamodb.NewImage ? unmarshall(dynamodb.NewImage) : null;
  const oldImage = dynamodb.OldImage ? unmarshall(dynamodb.OldImage) : null;

  switch (eventName) {
    case "INSERT":
      console.log("New item created:", keys);
      await handleInsert(newImage);
      break;
    case "MODIFY":
      console.log("Item modified:", keys);
      await handleModify(oldImage, newImage);
      break;
    case "REMOVE":
      console.log("Item deleted:", keys);
      await handleRemove(oldImage);
      break;
  }
}

// ============================================
// Example: Order Status Change Handler
// ============================================
async function handleModify(
  oldImage: Record<string, any> | null,
  newImage: Record<string, any> | null
): Promise<void> {
  if (!oldImage || !newImage) return;

  // Detect status change
  if (oldImage.status !== newImage.status) {
    console.log(`Order ${newImage.orderId} status: ${oldImage.status} → ${newImage.status}`);

    // React to specific status transitions
    if (newImage.status === "shipped") {
      await sendShippingNotification(newImage);
      await updateInventorySystem(newImage);
      await publishToAnalytics("order_shipped", newImage);
    }

    if (newImage.status === "delivered") {
      await sendDeliveryConfirmation(newImage);
      await triggerReviewRequest(newImage);
    }

    if (newImage.status === "cancelled") {
      await refundPayment(newImage);
      await restoreInventory(newImage);
    }
  }
}

// Notification service call
async function sendShippingNotification(order: Record<string, any>): Promise<void> {
  await ses.send(new SendEmailCommand({
    Destination: { ToAddresses: [order.customerEmail] },
    Message: {
      Subject: { Data: `Your order ${order.orderId} has shipped!` },
      Body: { Text: { Data: `Tracking number: ${order.trackingNumber}` } }
    },
    Source: "orders@example.com"
  }));
}

// handleInsert, handleRemove, and the remaining service calls
// (updateInventorySystem, publishToAnalytics, etc.) are elided here.
```

The Lambda event source mapping that connects the stream to your function exposes several tuning settings:

| Setting | Description | Recommended Value |
|---|---|---|
| BatchSize | Max records per Lambda invocation | 100-1000 (balance throughput vs latency) |
| StartingPosition | Where to start reading | TRIM_HORIZON (oldest) or LATEST |
| MaximumBatchingWindowInSeconds | Wait time to fill batch | 0-300 (0 = immediate trigger) |
| ParallelizationFactor | Concurrent batches per shard | 1-10 (higher = more parallelism) |
| MaximumRetryAttempts | Retries before sending to DLQ | -1 (default, retry until the record expires) or lower for faster failure |
| BisectBatchOnFunctionError | Split batch on error to isolate bad records | true (recommended) |
| MaximumRecordAgeInSeconds | Skip records older than this | 86400 (24 hours = max retention) |
If your Lambda throws an error, the ENTIRE batch is retried. This can cause repeated processing of successful records. Design for idempotency: use conditional writes, track processed record IDs, or leverage exactly-once semantics with DynamoDB transactions. Configure a Dead Letter Queue (DLQ) to capture persistently failing records.
DynamoDB Streams enable powerful architectural patterns. Let's examine the most common production use cases:
```typescript
import { DynamoDBRecord } from "aws-lambda";
import { unmarshall } from "@aws-sdk/util-dynamodb";
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import {
  DynamoDBDocumentClient,
  PutCommand,
  DeleteCommand,
} from "@aws-sdk/lib-dynamodb";

const docClient = DynamoDBDocumentClient.from(new DynamoDBClient({}));

// Source Table: Orders (PK: customerId, SK: orderTimestamp)
// Materialized View: ProductOrders (PK: productId, SK: orderTimestamp)

async function handleOrderChange(record: DynamoDBRecord): Promise<void> {
  const { eventName, dynamodb } = record;
  const newImage = dynamodb?.NewImage ? unmarshall(dynamodb.NewImage) : null;
  const oldImage = dynamodb?.OldImage ? unmarshall(dynamodb.OldImage) : null;

  switch (eventName) {
    case "INSERT":
      if (!newImage) break;
      // New order → Add to materialized view for each product
      for (const item of newImage.items) {
        await docClient.send(new PutCommand({
          TableName: "ProductOrders",
          Item: {
            productId: item.productId,
            orderTimestamp: newImage.orderTimestamp,
            orderId: newImage.orderId,
            customerId: newImage.customerId,
            quantity: item.quantity
          }
        }));
      }
      break;

    case "REMOVE":
      if (!oldImage) break;
      // Order deleted → Remove from view
      for (const item of oldImage.items) {
        await docClient.send(new DeleteCommand({
          TableName: "ProductOrders",
          Key: {
            productId: item.productId,
            orderTimestamp: oldImage.orderTimestamp
          }
        }));
      }
      break;

    case "MODIFY":
      // Handle product list changes (full reconciliation)
      // ... diff oldImage.items vs newImage.items
      break;
  }
}
```

A related pattern aggregates metrics in real time from the same stream:

```typescript
// Aggregate order metrics in real-time from stream
// (same client setup as above, plus UpdateCommand from @aws-sdk/lib-dynamodb)

async function updateOrderMetrics(record: DynamoDBRecord): Promise<void> {
  const { eventName, dynamodb } = record;
  const newImage = dynamodb?.NewImage ? unmarshall(dynamodb.NewImage) : null;

  if (eventName === "INSERT" && newImage) {
    // New order: increment daily counter and revenue
    const orderDate = newImage.orderTimestamp.substring(0, 10); // YYYY-MM-DD

    await docClient.send(new UpdateCommand({
      TableName: "OrderMetrics",
      Key: { metricDate: orderDate, metricType: "DAILY_SUMMARY" },
      UpdateExpression: `
        ADD orderCount :one, totalRevenue :revenue
        SET updatedAt = :now
      `,
      ExpressionAttributeValues: {
        ":one": 1,
        ":revenue": newImage.total,
        ":now": new Date().toISOString()
      }
    }));

    // Also update per-product metrics
    for (const item of newImage.items) {
      await docClient.send(new UpdateCommand({
        TableName: "ProductMetrics",
        Key: { productId: item.productId, metricDate: orderDate },
        UpdateExpression: `ADD unitsSold :qty, revenue :itemRevenue`,
        ExpressionAttributeValues: {
          ":qty": item.quantity,
          ":itemRevenue": item.quantity * item.unitPrice
        }
      }));
    }
  }
}
```

Understanding DynamoDB Streams' ordering guarantees is crucial for building correct stream processors.
Ordering Guarantees:
Within a partition key: Stream records are strictly ordered. If you update item A, then item A again, the records appear in that order.
Across partition keys: No ordering guarantee. Modifications to items with different partition keys may appear in any order, even if one logically happened before another.
Within a shard: Records are strictly ordered by sequence number. Shards correspond roughly (but not exactly) to table partitions.
Implication: If your processing depends on order (e.g., "process parent before children"), you must either:

- Model related items under the same partition key, so their changes arrive in order on the same shard, or
- Enforce ordering in the consumer itself, for example by comparing sequence numbers or timestamps and deferring records that arrive early.
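One way to respect per-key ordering while still parallelizing across keys can be sketched as follows (the record shape is simplified for illustration):

```typescript
// Preserve per-partition-key order while processing different keys concurrently.
// Records for the same key run sequentially, in stream order; distinct keys
// run in parallel — mirroring the guarantee DynamoDB Streams actually gives.
interface SimpleRecord {
  partitionKey: string;
  sequenceNumber: string;
}

export function groupByPartitionKey(
  records: SimpleRecord[]
): Map<string, SimpleRecord[]> {
  const groups = new Map<string, SimpleRecord[]>();
  for (const record of records) {
    const bucket = groups.get(record.partitionKey) ?? [];
    bucket.push(record); // stream order within a key is preserved
    groups.set(record.partitionKey, bucket);
  }
  return groups;
}

export async function processInKeyOrder(
  records: SimpleRecord[],
  handle: (r: SimpleRecord) => Promise<void>
): Promise<void> {
  const groups = groupByPartitionKey(records);
  await Promise.all(
    [...groups.values()].map(async (group) => {
      for (const record of group) {
        await handle(record); // sequential within the key
      }
    })
  );
}
```

This is essentially what Lambda's ParallelizationFactor does for you within a shard; the sketch is useful when you batch-process records yourself.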
```typescript
// DynamoDB Streams provides at-least-once delivery
// Your processor may see the same record multiple times on retry
// DESIGN FOR IDEMPOTENCY!

import { DynamoDBRecord } from "aws-lambda";
import { unmarshall } from "@aws-sdk/util-dynamodb";
import { GetCommand, PutCommand } from "@aws-sdk/lib-dynamodb";
// (docClient and actuallyProcessRecord are set up as in earlier examples)

interface ProcessedRecord {
  streamRecordId: string;
  processedAt: string;
  ttl: number; // Auto-cleanup after 24 hours
}

async function processRecordIdempotently(record: DynamoDBRecord): Promise<void> {
  const recordId = record.eventID!;

  // Step 1: Check if we've already processed this record
  const existing = await docClient.send(new GetCommand({
    TableName: "ProcessedStreamRecords",
    Key: { streamRecordId: recordId },
    ConsistentRead: true
  }));

  if (existing.Item) {
    console.log(`Record ${recordId} already processed, skipping`);
    return;
  }

  // Step 2: Process the record
  await actuallyProcessRecord(record);

  // Step 3: Mark as processed (with TTL for auto-cleanup)
  const ttl = Math.floor(Date.now() / 1000) + 86400; // 24 hours from now
  await docClient.send(new PutCommand({
    TableName: "ProcessedStreamRecords",
    Item: {
      streamRecordId: recordId,
      processedAt: new Date().toISOString(),
      ttl: ttl
    },
    ConditionExpression: "attribute_not_exists(streamRecordId)"
  }));
}

// ============================================
// Alternative: Use Conditional Writes for Idempotency
// ============================================
// If your target supports conditional writes, use them instead of tracking

async function updateMaterializedViewIdempotently(
  record: DynamoDBRecord
): Promise<void> {
  const newImage = unmarshall(record.dynamodb!.NewImage!);
  const sequenceNumber = record.dynamodb!.SequenceNumber!;

  // Only update if this is newer than what we have
  try {
    await docClient.send(new PutCommand({
      TableName: "MaterializedView",
      Item: { ...newImage, lastSequenceNumber: sequenceNumber },
      // Only write if we haven't seen a newer record
      // (caveat: this compares sequence numbers as strings, which is only
      // safe when the numeric strings have consistent length)
      ConditionExpression:
        "attribute_not_exists(lastSequenceNumber) OR lastSequenceNumber < :seq",
      ExpressionAttributeValues: { ":seq": sequenceNumber }
    }));
  } catch (error: any) {
    if (error.name === "ConditionalCheckFailedException") {
      // A newer version already exists; safe to ignore
      console.log("Skipping older record - newer version exists");
    } else {
      throw error;
    }
  }
}
```

Lambda may invoke your function multiple times with the same record batch due to retries, scaling events, or transient errors. NEVER assume exactly-once delivery. Always design your processor to be idempotent—producing the same result even when the same input is processed multiple times.
For advanced use cases, DynamoDB can stream changes directly to Amazon Kinesis Data Streams, providing additional capabilities beyond native DynamoDB Streams.
| Feature | Native DynamoDB Streams | Kinesis Integration |
|---|---|---|
| Retention | 24 hours (fixed) | 1-365 days (configurable) |
| Consumers | 2 concurrent readers max | Unlimited (with enhanced fan-out) |
| Cost | Free (part of DynamoDB) | Kinesis pricing applies |
| Throughput | Auto-scales with table | Shard-based provisioning |
| Consumer types | Lambda only (managed) | Lambda, KCL, Kinesis Analytics, etc. |
| Replay capability | Limited (24h, from iterator) | Full replay within retention |
When to Use Kinesis Integration:

- You need retention beyond 24 hours (up to 365 days) or full replay capability
- You have more than two concurrent consumers and need enhanced fan-out
- You want to feed existing Kinesis infrastructure (KCL applications, Firehose delivery to S3/Redshift, Kinesis Data Analytics)
```typescript
// Consuming the Kinesis stream with the AWS SDK. This gives more control
// than Lambda but requires more infrastructure; a production app would
// typically use the Kinesis Client Library (KCL) for shard management.

import { KinesisClient, GetRecordsCommand } from "@aws-sdk/client-kinesis";
import { unmarshall } from "@aws-sdk/util-dynamodb";

class DynamoDBStreamKinesisConsumer {
  private kinesis = new KinesisClient({ region: "us-east-1" });
  private streamName = "dynamodb-orders-kinesis-stream";

  async processRecords(shardIterator: string): Promise<string | undefined> {
    const response = await this.kinesis.send(new GetRecordsCommand({
      ShardIterator: shardIterator,
      Limit: 100
    }));

    for (const record of response.Records ?? []) {
      if (record.Data) {
        const dynamoRecord = JSON.parse(
          Buffer.from(record.Data).toString("utf-8")
        );
        // Process the DynamoDB change record
        await this.handleChange(dynamoRecord);
      }
    }

    // Return next iterator for continued polling
    return response.NextShardIterator;
  }

  private async handleChange(dynamoRecord: any): Promise<void> {
    const eventName = dynamoRecord.eventName;
    const newImage = dynamoRecord.dynamodb?.NewImage
      ? unmarshall(dynamoRecord.dynamodb.NewImage)
      : null;

    console.log(`[Kinesis] Processing ${eventName} for item:`, newImage?.orderId);

    // Your processing logic here
    // Kinesis gives you more flexibility than Lambda triggers
  }
}

// ============================================
// Fan-out to Kinesis Data Firehose for S3/Redshift
// ============================================
// Common pattern: DynamoDB → Kinesis → Firehose → S3 → Redshift

// Firehose transformation Lambda
export async function firehoseTransformer(event: any): Promise<any> {
  return {
    records: event.records.map((record: any) => {
      const payload = JSON.parse(
        Buffer.from(record.data, "base64").toString("utf-8")
      );

      // Transform DynamoDB format to flat JSON for analytics
      const flattened = flattenForAnalytics(payload);

      return {
        recordId: record.recordId,
        result: "Ok",
        data: Buffer.from(
          JSON.stringify(flattened) + "\n" // Newline-delimited JSON
        ).toString("base64")
      };
    })
  };
}

function flattenForAnalytics(dynamoRecord: any): Record<string, any> {
  const newImage = dynamoRecord.dynamodb?.NewImage
    ? unmarshall(dynamoRecord.dynamodb.NewImage)
    : {};

  return {
    event_name: dynamoRecord.eventName,
    event_time: dynamoRecord.dynamodb?.ApproximateCreationDateTime,
    ...newImage // Flattened item attributes
  };
}
```

Running DynamoDB Streams in production requires attention to monitoring, error handling, and capacity planning. Here are essential operational practices:
```typescript
import { DynamoDBStreamEvent, DynamoDBRecord, Context } from "aws-lambda";

interface ProcessingResult {
  batchItemFailures: Array<{ itemIdentifier: string }>;
}

// Use partial batch response for granular error handling
export async function handler(
  event: DynamoDBStreamEvent,
  context: Context
): Promise<ProcessingResult> {
  const failedRecords: string[] = [];

  for (const record of event.Records) {
    try {
      await processWithTimeout(record, context);
    } catch (error) {
      console.error(`Failed to process record ${record.eventID}:`, error);
      // Report this specific record as failed; Lambda retries from the
      // oldest failed record rather than re-running the whole batch
      failedRecords.push(record.eventID!);
    }
  }

  // Return failed record IDs for retry
  return {
    batchItemFailures: failedRecords.map(id => ({ itemIdentifier: id }))
  };
}

async function processWithTimeout(
  record: DynamoDBRecord,
  context: Context
): Promise<void> {
  // Reserve time for cleanup
  const remainingTime = context.getRemainingTimeInMillis();
  const processingTimeout = remainingTime - 5000; // 5s buffer

  if (processingTimeout < 1000) {
    throw new Error("Insufficient time remaining");
  }

  // Wrap processing in timeout
  await Promise.race([
    actuallyProcessRecord(record),
    new Promise((_, reject) =>
      setTimeout(() => reject(new Error("Processing timeout")), processingTimeout)
    )
  ]);
}

async function actuallyProcessRecord(record: DynamoDBRecord): Promise<void> {
  // Your actual processing logic
  // - Check dependencies are healthy (circuit breaker)
  // - Process idempotently
  // - Handle specific error types differently
}
```

Returning batchItemFailures (as shown above) tells Lambda exactly which records failed. Lambda then retries from the oldest failed record onward rather than re-running the entire batch. This prevents one bad record from blocking processing of all subsequent batches—a common production issue with all-or-nothing batch processing. Note that this requires ReportBatchItemFailures to be enabled in the event source mapping.
DynamoDB Streams transform DynamoDB from a database into an event source. Key insights to consolidate:

- Streams are a managed CDC log: every insert, update, and delete becomes an ordered stream record, retained for 24 hours.
- Choose the StreamViewType that matches your processing needs; reserve NEW_AND_OLD_IMAGES for genuine diff computation.
- Lambda is the first-class consumer: tune batch size, retries, and bisect-on-error, and return partial batch failures.
- Delivery is at-least-once and ordered only per partition key, so every processor must be idempotent.
- Use the Kinesis integration when you need longer retention, more consumers, or analytics fan-out.
What's Next
With Streams mastered, we conclude our DynamoDB deep dive with When to Use DynamoDB—a systematic framework for deciding whether DynamoDB is the right choice for your system. We'll cover ideal use cases, warning signs that DynamoDB might not fit, migration considerations, and how DynamoDB compares to alternatives like Aurora, MongoDB, and Cassandra.
You now understand DynamoDB Streams—how they capture changes, integrate with Lambda and Kinesis, enable event-driven architectures, and their operational requirements. You can build robust stream processors for materialized views, cross-service synchronization, and real-time analytics while handling the complexities of at-least-once delivery.