Data is the lifeblood of modern applications. Whether ingesting clickstream events, transforming logs for analysis, aggregating IoT sensor readings, or synchronizing data between systems, data processing pipelines are essential infrastructure components. Serverless computing has revolutionized how we build these pipelines.
Traditional data pipelines required provisioning Hadoop clusters, managing Spark installations, or maintaining dedicated ETL servers. Serverless pipelines eliminate this operational burden. Data flows through managed services—Kinesis, Lambda, Firehose, S3—with automatic scaling, pay-per-use pricing, and minimal maintenance.
This page provides a comprehensive guide to building serverless data processing pipelines. We'll cover streaming versus batch processing, common pipeline patterns, transformation strategies, error handling, and integration with analytics and machine learning services.
By the end of this page, you will understand:
1. The difference between streaming and batch processing in serverless
2. How to design ingestion, transformation, and delivery stages
3. Common serverless data pipeline patterns
4. Error handling and retry strategies for data processing
5. Integration with analytics services like Athena and Redshift
6. Best practices for production data pipelines
Data pipelines generally fall into two categories: streaming (real-time) and batch (periodic). Serverless computing supports both, with different services optimized for each.
Streaming (Real-Time) Processing:
Data is processed as it arrives, with latency measured in seconds or milliseconds. Use cases:
Serverless streaming services:
Batch Processing:
Data is collected over a period and processed in bulk at scheduled intervals. Use cases:
Serverless batch services:
| Aspect | Streaming | Batch |
|---|---|---|
| Latency | Seconds to milliseconds | Minutes to hours |
| Data volume per operation | Single records or small batches | Large datasets |
| Processing pattern | Continuous, event-driven | Periodic, scheduled |
| State management | Complex (windowing, aggregation) | Simpler (full dataset access) |
| Cost model | Per invocation/record | Per invocation/duration |
| Failure impact | Affects individual records | May require full reprocessing |
| Use case fit | Time-sensitive, reactive | Analytical, comprehensive |
Lambda's event source mappings with Kinesis or DynamoDB Streams provide a hybrid model: the platform batches records (up to 10,000 or a time window) before invoking your function. This gives you streaming semantics with batched efficiency. Tune batch size and window based on latency vs. efficiency trade-offs.
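For illustration, here is a minimal sketch of creating an event source mapping with these knobs via the AWS SDK for JavaScript; the function name and stream ARN are placeholders, and in practice the same settings are usually expressed in CloudFormation, SAM, or CDK.

```typescript
import { LambdaClient, CreateEventSourceMappingCommand } from "@aws-sdk/client-lambda";

const lambda = new LambdaClient({});

// Hypothetical names; adjust for your stack.
export async function createClickstreamMapping(): Promise<void> {
  await lambda.send(new CreateEventSourceMappingCommand({
    FunctionName: "clickstream-processor",
    EventSourceArn: "arn:aws:kinesis:us-east-1:123456789012:stream/clickstream",
    StartingPosition: "LATEST",
    BatchSize: 1000,                     // up to 10,000 records per invocation
    MaximumBatchingWindowInSeconds: 30,  // wait up to 30s to fill a batch
    ParallelizationFactor: 2             // concurrent batches per shard (1-10)
  }));
}
```

Smaller batches and a zero-second window favor latency; larger batches and longer windows favor per-invocation efficiency and cost.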
Well-designed data pipelines follow a consistent architectural pattern with distinct stages: Ingestion, Transformation, Storage, and Consumption. Each stage has specific responsibilities and design considerations.
Stage 1: Ingestion
Collecting data from various sources and feeding it into the pipeline:
Ingestion must handle:
```typescript
import { APIGatewayProxyHandlerV2 } from "aws-lambda";
import { KinesisClient, PutRecordCommand } from "@aws-sdk/client-kinesis";
import { z } from "zod";

const kinesis = new KinesisClient({});
const STREAM_NAME = process.env.KINESIS_STREAM!;

// Define event schema
const EventSchema = z.object({
  eventType: z.enum(["page_view", "click", "purchase", "signup"]),
  userId: z.string().uuid(),
  timestamp: z.string().datetime(),
  properties: z.record(z.unknown()).optional(),
  sessionId: z.string().optional()
});

type Event = z.infer<typeof EventSchema>;

export const handler: APIGatewayProxyHandlerV2 = async (event) => {
  try {
    // Parse and validate incoming event
    const body = JSON.parse(event.body || "{}");
    const validatedEvent = EventSchema.parse(body);

    // Enrich with ingestion metadata
    const enrichedEvent = {
      ...validatedEvent,
      ingestedAt: new Date().toISOString(),
      sourceIp: event.requestContext.http.sourceIp,
      userAgent: event.headers["user-agent"]
    };

    // Write to Kinesis (partition by userId for ordering)
    await kinesis.send(new PutRecordCommand({
      StreamName: STREAM_NAME,
      Data: Buffer.from(JSON.stringify(enrichedEvent)),
      PartitionKey: validatedEvent.userId
    }));

    return {
      statusCode: 202,
      body: JSON.stringify({ accepted: true })
    };
  } catch (error) {
    if (error instanceof z.ZodError) {
      return {
        statusCode: 400,
        body: JSON.stringify({
          error: "Validation failed",
          details: error.errors
        })
      };
    }
    throw error;
  }
};
```

Stage 2: Transformation
Processing raw data into usable formats:
Stage 3: Storage
Persisting processed data for consumption:
Stage 4: Consumption
Accessing processed data:
A popular pattern organizes data into Bronze (raw), Silver (cleaned/validated), and Gold (aggregated/business-ready) tiers. Each tier has different SLAs, access patterns, and retention policies. Serverless pipelines naturally support this pattern with separate Lambda functions for each transformation stage.
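As a rough sketch of the medallion idea, the function below promotes objects from a hypothetical bronze/ prefix to a silver/ prefix, keeping only records that parse and pass a basic validation rule; the prefix names and the validation check are assumptions.

```typescript
import { S3Handler } from "aws-lambda";
import { S3Client, GetObjectCommand, PutObjectCommand } from "@aws-sdk/client-s3";

const s3 = new S3Client({});

// Hypothetical tier prefixes within a single data-lake bucket.
const BRONZE_PREFIX = "bronze/";
const SILVER_PREFIX = "silver/";

// Promote validated records from the Bronze (raw) tier to the Silver (cleaned) tier.
export const handler: S3Handler = async (event) => {
  for (const record of event.Records) {
    const bucket = record.s3.bucket.name;
    const key = decodeURIComponent(record.s3.object.key.replace(/\+/g, " "));
    if (!key.startsWith(BRONZE_PREFIX)) continue;

    const raw = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
    const lines = (await raw.Body!.transformToString()).split("\n").filter(Boolean);

    // Keep only records that parse and carry the fields downstream consumers need.
    const cleaned = lines
      .map(line => { try { return JSON.parse(line); } catch { return null; } })
      .filter((r): r is Record<string, unknown> => r !== null && typeof r.userId === "string");

    await s3.send(new PutObjectCommand({
      Bucket: bucket,
      Key: key.replace(BRONZE_PREFIX, SILVER_PREFIX),
      Body: cleaned.map(r => JSON.stringify(r)).join("\n"),
      ContentType: "application/x-ndjson"
    }));
  }
};
```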
Kinesis Data Streams is AWS's core streaming service, and Lambda's integration with it enables powerful real-time processing. Understanding the integration's configuration options is key to building efficient pipelines.
Event Source Mapping Configuration:
Lambda polls Kinesis and invokes your function with batches of records. Key settings:
```typescript
import { KinesisStreamEvent, KinesisStreamRecord, KinesisStreamBatchResponse } from "aws-lambda";
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";

const s3 = new S3Client({});
const BUCKET = process.env.OUTPUT_BUCKET!;

interface ProcessedEvent {
  eventType: string;
  userId: string;
  timestamp: string;
  sessionId?: string;
  // Enriched fields
  processedAt: string;
  hourBucket: string;
  isNewUser: boolean;
}

export const handler = async (event: KinesisStreamEvent): Promise<KinesisStreamBatchResponse> => {
  console.log(`Processing ${event.Records.length} records`);

  const processedEvents: ProcessedEvent[] = [];
  const failures: Array<{ record: KinesisStreamRecord; error: string }> = [];

  for (const record of event.Records) {
    try {
      // Decode and parse record
      const payload = Buffer.from(record.kinesis.data, "base64").toString();
      const data = JSON.parse(payload);

      // Transform and enrich
      const processed = await transformEvent(data);
      processedEvents.push(processed);
    } catch (error) {
      console.error(`Failed to process record ${record.kinesis.sequenceNumber}`, error);
      failures.push({ record, error: (error as Error).message });
    }
  }

  // Write batch to S3 (partitioned by hour)
  if (processedEvents.length > 0) {
    await writeToS3(processedEvents);
  }

  // Report failures (partial batch failure)
  if (failures.length > 0) {
    console.error(`${failures.length} records failed processing`);
    // Return failed records for retry (requires ReportBatchItemFailures on the event source mapping)
    return {
      batchItemFailures: failures.map(f => ({
        itemIdentifier: f.record.kinesis.sequenceNumber
      }))
    };
  }

  console.log(`Successfully processed ${processedEvents.length} events`);
  return { batchItemFailures: [] };
};

async function transformEvent(raw: any): Promise<ProcessedEvent> {
  const timestamp = new Date(raw.timestamp);

  return {
    eventType: raw.eventType,
    userId: raw.userId,
    timestamp: raw.timestamp,
    sessionId: raw.sessionId,
    processedAt: new Date().toISOString(),
    // Partition key for S3/Athena queries
    hourBucket: timestamp.toISOString().slice(0, 13).replace("T", "-"),
    // Enrichment: check if user is new (simplified)
    isNewUser: await checkIfNewUser(raw.userId)
  };
}

// Placeholder enrichment lookup; in practice this would query DynamoDB or a cache.
async function checkIfNewUser(userId: string): Promise<boolean> {
  return false;
}

async function writeToS3(events: ProcessedEvent[]): Promise<void> {
  // Group by hour for partitioned storage
  const byHour = events.reduce((acc, event) => {
    const hour = event.hourBucket;
    if (!acc[hour]) acc[hour] = [];
    acc[hour].push(event);
    return acc;
  }, {} as Record<string, ProcessedEvent[]>);

  // Write each partition
  for (const [hour, hourEvents] of Object.entries(byHour)) {
    const key = `processed/dt=${hour}/batch-${Date.now()}.json`;
    const body = hourEvents.map(e => JSON.stringify(e)).join("\n");

    await s3.send(new PutObjectCommand({
      Bucket: BUCKET,
      Key: key,
      Body: body,
      ContentType: "application/x-ndjson"
    }));
  }
}
```

Partial Batch Failure Handling:
By default, if your Lambda throws an error, the entire batch is retried—including records that succeeded. This can cause duplicate processing. Enable ReportBatchItemFailures to return specific failed records:
```typescript
// Return only failed sequence numbers
return {
  batchItemFailures: [
    { itemIdentifier: "123456" },
    { itemIdentifier: "123457" }
  ]
};
```
Only failed records are retried; successful ones are checkpointed.
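Partial batch responses only take effect when the event source mapping is configured for them. A minimal sketch of enabling this with the AWS SDK (the mapping UUID is a placeholder):

```typescript
import { LambdaClient, UpdateEventSourceMappingCommand } from "@aws-sdk/client-lambda";

const lambda = new LambdaClient({});

// Without this setting, returned batchItemFailures are ignored and any
// thrown error causes the entire batch to be retried.
export async function enablePartialBatchResponses(mappingUuid: string): Promise<void> {
  await lambda.send(new UpdateEventSourceMappingCommand({
    UUID: mappingUuid,
    FunctionResponseTypes: ["ReportBatchItemFailures"]
  }));
}
```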
| Setting | Low Latency | High Throughput | Cost Optimized |
|---|---|---|---|
| Batch Size | 1-10 | 1,000-10,000 | 10,000 |
| Batch Window | 0s | 30-60s | 60-300s |
| Parallelization Factor | 10 | 1-5 | 1 |
| Memory | 512MB-1GB | 1-3GB | 512MB |
| Timeout | 30s | 5m | 15m |
The Kinesis IteratorAge metric shows how far behind your processor is from the stream head. If iterator age grows continuously, your Lambda can't keep up with the ingestion rate. Solutions: increase the parallelization factor, add shards, optimize the Lambda code, or increase memory (which also increases CPU).
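As one possible safeguard, the sketch below creates a CloudWatch alarm on the function's IteratorAge metric; the function name, SNS topic ARN, and thresholds are assumptions to adjust for your pipeline.

```typescript
import { CloudWatchClient, PutMetricAlarmCommand } from "@aws-sdk/client-cloudwatch";

const cloudwatch = new CloudWatchClient({});

// Alarm when the processor falls more than 5 minutes behind the stream head.
export async function createIteratorAgeAlarm(): Promise<void> {
  await cloudwatch.send(new PutMetricAlarmCommand({
    AlarmName: "clickstream-processor-iterator-age",
    Namespace: "AWS/Lambda",
    MetricName: "IteratorAge",
    Dimensions: [{ Name: "FunctionName", Value: "clickstream-processor" }],
    Statistic: "Maximum",
    Period: 60,                  // evaluate per minute
    EvaluationPeriods: 5,        // sustained for 5 minutes
    Threshold: 5 * 60 * 1000,    // IteratorAge is reported in milliseconds
    ComparisonOperator: "GreaterThanThreshold",
    AlarmActions: ["arn:aws:sns:us-east-1:123456789012:pipeline-alerts"]
  }));
}
```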
S3 event notifications trigger Lambda functions when objects are created, modified, or deleted. This pattern is fundamental to many data pipelines—files land in S3, triggering processing workflows.
Common S3 Processing Patterns:
```typescript
import { S3Handler, S3Event } from "aws-lambda";
import { S3Client, GetObjectCommand, PutObjectCommand } from "@aws-sdk/client-s3";
import * as readline from "readline";

const s3 = new S3Client({});

interface LogEntry {
  timestamp: string;
  level: string;
  message: string;
  service?: string;
  traceId?: string;
}

export const handler: S3Handler = async (event: S3Event) => {
  for (const record of event.Records) {
    const bucket = record.s3.bucket.name;
    const key = decodeURIComponent(record.s3.object.key.replace(/\+/g, " "));

    console.log(`Processing s3://${bucket}/${key}`);

    // Skip if not a log file
    if (!key.endsWith(".log") && !key.endsWith(".json")) {
      console.log(`Skipping non-log file: ${key}`);
      continue;
    }

    try {
      // Get the object
      const response = await s3.send(new GetObjectCommand({
        Bucket: bucket,
        Key: key
      }));

      // Process line by line (memory efficient for large files)
      const processedEntries = await processLogFile(response.Body as NodeJS.ReadableStream);

      // Write processed output
      const outputKey = key
        .replace("raw-logs/", "processed-logs/")
        .replace(".log", ".json");

      await s3.send(new PutObjectCommand({
        Bucket: process.env.OUTPUT_BUCKET!,
        Key: outputKey,
        Body: JSON.stringify(processedEntries, null, 2),
        ContentType: "application/json"
      }));

      console.log(`Processed ${processedEntries.length} entries to ${outputKey}`);
    } catch (error) {
      console.error(`Failed to process ${key}:`, error);
      throw error;
    }
  }
};

async function processLogFile(stream: NodeJS.ReadableStream): Promise<LogEntry[]> {
  const entries: LogEntry[] = [];

  const rl = readline.createInterface({
    input: stream,
    crlfDelay: Infinity
  });

  for await (const line of rl) {
    try {
      // Parse log line (example: structured JSON logs)
      const entry = JSON.parse(line) as LogEntry;

      // Enrich and validate
      if (isValidEntry(entry)) {
        entries.push({
          ...entry,
          timestamp: normalizeTimestamp(entry.timestamp)
        });
      }
    } catch {
      // Skip malformed lines, log for debugging
      console.warn(`Malformed log line: ${line.substring(0, 100)}`);
    }
  }

  return entries;
}

function isValidEntry(entry: any): entry is LogEntry {
  return entry &&
    typeof entry.timestamp === "string" &&
    typeof entry.message === "string";
}

function normalizeTimestamp(ts: string): string {
  return new Date(ts).toISOString();
}
```

Handling Large Files:
Lambda's /tmp storage starts at 512 MB and can be configured up to 10 GB. For files larger than available memory or disk, use streaming techniques, as in the sketch below:
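One way to do this, sketched here under the assumption of NDJSON input, is to pipe the S3 read stream through a line-by-line transform and into a multipart upload via @aws-sdk/lib-storage, so neither memory nor /tmp ever has to hold the whole file.

```typescript
import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage";
import { PassThrough, Readable } from "stream";
import * as readline from "readline";

const s3 = new S3Client({});

// Stream-transform a large NDJSON object without buffering it in memory or /tmp.
export async function transformLargeObject(bucket: string, key: string, outKey: string): Promise<void> {
  const source = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
  const output = new PassThrough();

  // The multipart upload consumes the PassThrough as transformed lines are written into it.
  const upload = new Upload({
    client: s3,
    params: { Bucket: bucket, Key: outKey, Body: output, ContentType: "application/x-ndjson" }
  });

  const rl = readline.createInterface({ input: source.Body as Readable, crlfDelay: Infinity });
  for await (const line of rl) {
    if (!line.trim()) continue;
    const record = JSON.parse(line);
    output.write(JSON.stringify({ ...record, processedAt: new Date().toISOString() }) + "\n");
  }
  output.end();

  await upload.done();
}
```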
Best Practices for S3 Triggers:
```typescript
import { S3Client, SelectObjectContentCommand } from "@aws-sdk/client-s3";

const s3 = new S3Client({});

// Query specific records from a large CSV without downloading entire file
async function queryLargeFile(bucket: string, key: string): Promise<any[]> {
  const response = await s3.send(new SelectObjectContentCommand({
    Bucket: bucket,
    Key: key,
    ExpressionType: "SQL",
    Expression: `
      SELECT s.user_id, s.event_type, s.timestamp
      FROM s3object s
      WHERE s.event_type = 'purchase'
        AND CAST(s.amount AS DECIMAL) > 100
    `,
    InputSerialization: {
      CSV: {
        FileHeaderInfo: "USE",
        RecordDelimiter: "\n",
        FieldDelimiter: ","
      }
    },
    OutputSerialization: {
      JSON: {
        RecordDelimiter: "\n"
      }
    }
  }));

  // Collect results from stream
  const results: any[] = [];

  for await (const event of response.Payload || []) {
    if (event.Records?.Payload) {
      const chunk = Buffer.from(event.Records.Payload).toString();
      const lines = chunk.split("\n").filter(l => l.trim());
      results.push(...lines.map(l => JSON.parse(l)));
    }
  }

  return results;
}
```

Instead of triggering Lambda directly from S3, route events through SQS first. S3 → SQS → Lambda provides built-in buffering, dead letter queues, and message visibility timeout. This improves resilience and simplifies failure handling.
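A minimal sketch of the consuming side of that pattern: an SQS-triggered function that unwraps the S3 notification from each message body. The queue and bucket wiring are assumed to be configured separately.

```typescript
import { SQSHandler, S3Event } from "aws-lambda";
import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3";

const s3 = new S3Client({});

// SQS-triggered consumer: each SQS message body is an S3 event notification.
export const handler: SQSHandler = async (event) => {
  for (const message of event.Records) {
    const s3Event = JSON.parse(message.body) as S3Event;

    // S3 sends a test event with no Records when the notification is first configured.
    if (!s3Event.Records) continue;

    for (const record of s3Event.Records) {
      const bucket = record.s3.bucket.name;
      const key = decodeURIComponent(record.s3.object.key.replace(/\+/g, " "));
      const object = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
      console.log(`Fetched s3://${bucket}/${key} (${object.ContentLength} bytes)`);
      // ...process the object; throwing here returns the message to the queue
      // and routes it to the DLQ after maxReceiveCount attempts.
    }
  }
};
```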
ETL (Extract, Transform, Load) pipelines move data between systems while reshaping it. Serverless ETL combines Lambda functions, Step Functions, and managed services like Glue for powerful data transformation.
Pattern 1: Lambda-Native ETL
For moderate data volumes (<500MB per file, <15 minute processing), Lambda can handle the entire ETL process:
```typescript
import { ScheduledHandler } from "aws-lambda";
import { DynamoDBClient, BatchWriteItemCommand } from "@aws-sdk/client-dynamodb";
import { marshall } from "@aws-sdk/util-dynamodb";

const dynamodb = new DynamoDBClient({});

interface ExternalCustomer {
  id: string;
  first_name: string;
  last_name: string;
  email_address: string;
  phone_number: string;
  created_at: string;
  loyalty_points: number;
}

interface InternalCustomer {
  customerId: string;
  fullName: string;
  email: string;
  phone: string;
  createdAt: string;
  loyaltyTier: "bronze" | "silver" | "gold" | "platinum";
  updatedAt: string;
}

export const handler: ScheduledHandler = async () => {
  // EXTRACT: Fetch data from external API
  const externalData = await extractFromExternalAPI();
  console.log(`Extracted ${externalData.length} customers`);

  // TRANSFORM: Convert to internal format
  const transformedData = externalData.map(transformCustomer);
  console.log(`Transformed ${transformedData.length} customers`);

  // LOAD: Write to DynamoDB in batches
  await loadToDynamoDB(transformedData);
  console.log(`Loaded ${transformedData.length} customers to DynamoDB`);
};

async function extractFromExternalAPI(): Promise<ExternalCustomer[]> {
  // Fetch from external CRM API
  const response = await fetch(`${process.env.CRM_API_URL}/customers`, {
    headers: {
      "Authorization": `Bearer ${process.env.CRM_API_KEY}`
    }
  });

  if (!response.ok) {
    throw new Error(`CRM API error: ${response.status}`);
  }

  return response.json();
}

function transformCustomer(external: ExternalCustomer): InternalCustomer {
  return {
    customerId: external.id,
    fullName: `${external.first_name} ${external.last_name}`.trim(),
    email: external.email_address.toLowerCase(),
    phone: normalizePhone(external.phone_number),
    createdAt: new Date(external.created_at).toISOString(),
    loyaltyTier: calculateTier(external.loyalty_points),
    updatedAt: new Date().toISOString()
  };
}

// Minimal normalization: strip everything except digits and a leading "+"
function normalizePhone(phone: string): string {
  return phone.replace(/[^\d+]/g, "");
}

function calculateTier(points: number): InternalCustomer["loyaltyTier"] {
  if (points >= 10000) return "platinum";
  if (points >= 5000) return "gold";
  if (points >= 1000) return "silver";
  return "bronze";
}

async function loadToDynamoDB(customers: InternalCustomer[]): Promise<void> {
  const TABLE = process.env.CUSTOMERS_TABLE!;

  // Batch write in chunks of 25 (DynamoDB limit)
  for (let i = 0; i < customers.length; i += 25) {
    const batch = customers.slice(i, i + 25);

    await dynamodb.send(new BatchWriteItemCommand({
      RequestItems: {
        [TABLE]: batch.map(customer => ({
          PutRequest: { Item: marshall(customer) }
        }))
      }
    }));
  }
}
```

Pattern 2: AWS Glue for Large-Scale ETL
For larger datasets (gigabytes to terabytes), AWS Glue provides managed Spark-based ETL:
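In this pattern Lambda typically acts as the trigger rather than the worker. A hedged sketch of kicking off a Glue job from Lambda; the job name, arguments, and S3 paths are hypothetical and must match your Glue script.

```typescript
import { GlueClient, StartJobRunCommand } from "@aws-sdk/client-glue";

const glue = new GlueClient({});

// Kick off a Glue job once a day's raw data has landed.
export async function startDailyEtl(processingDate: string): Promise<string> {
  const result = await glue.send(new StartJobRunCommand({
    JobName: "daily-events-etl",
    Arguments: {
      "--processing_date": processingDate,
      "--source_path": "s3://my-data-lake/raw/events/",
      "--target_path": "s3://my-data-lake/curated/events/"
    }
  }));
  return result.JobRunId!;
}
```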
Pattern 3: Step Functions Orchestrated ETL
For complex multi-stage ETL with error handling and human approval:
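A small sketch of the triggering side, assuming a state machine named etl-pipeline already chains the extract, transform, and load Lambdas with Retry/Catch blocks and an optional approval state; the ARN and input shape are placeholders.

```typescript
import { SFNClient, StartExecutionCommand } from "@aws-sdk/client-sfn";

const sfn = new SFNClient({});

// Start one ETL workflow run per batch.
export async function startEtlWorkflow(batchId: string): Promise<void> {
  await sfn.send(new StartExecutionCommand({
    stateMachineArn: "arn:aws:states:us-east-1:123456789012:stateMachine:etl-pipeline",
    name: `etl-${batchId}`, // a fixed name per batch prevents accidental duplicate runs
    input: JSON.stringify({ batchId, triggeredAt: new Date().toISOString() })
  }));
}
```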
Pattern 4: Kinesis Firehose for Continuous ETL
Firehose provides near-zero operations streaming ETL:
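When Firehose is configured with a transformation Lambda, it hands your function a batch of base64-encoded records and expects each one back marked Ok, Dropped, or ProcessingFailed. A minimal sketch of such a transform (the field selection is an example):

```typescript
import { FirehoseTransformationHandler } from "aws-lambda";

// Inline transformation Lambda attached to a Firehose delivery stream.
// Firehose batches records, invokes this function, then delivers the
// transformed output to S3/Redshift/OpenSearch on your behalf.
export const handler: FirehoseTransformationHandler = async (event) => {
  const records = event.records.map(record => {
    try {
      const payload = JSON.parse(Buffer.from(record.data, "base64").toString());

      // Light-touch transform: keep a few fields, add a processing timestamp,
      // and emit newline-delimited JSON so delivered files are Athena-friendly.
      const transformed = {
        eventType: payload.eventType,
        userId: payload.userId,
        timestamp: payload.timestamp,
        processedAt: new Date().toISOString()
      };

      return {
        recordId: record.recordId,
        result: "Ok" as const,
        data: Buffer.from(JSON.stringify(transformed) + "\n").toString("base64")
      };
    } catch {
      // Malformed records are marked as failed; Firehose writes them to the error prefix.
      return { recordId: record.recordId, result: "ProcessingFailed" as const, data: record.data };
    }
  });

  return { records };
};
```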
Firehose is ideal when:
| Pattern | Data Size | Complexity | Use Case |
|---|---|---|---|
| Lambda-only | <500MB | Simple transforms | API sync, format conversion |
| Kinesis Firehose | Any (streaming) | Light transforms | Log delivery, metrics ingestion |
| Step Functions | Variable | Complex workflows | Multi-step with approvals |
| AWS Glue | GB-TB | Complex joins/aggregations | Data warehouse loading |
| EMR Serverless | TB+ | Custom Spark/Presto | ML pipelines, complex analytics |
ETL jobs may be retried on failure. Design for idempotency: use upserts instead of inserts, use atomic operations, and include timestamps to identify stale data. This ensures that running the same ETL twice produces correct results.
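A sketch of what that looks like in practice with DynamoDB: an upsert guarded by an updatedAt condition, so replays neither duplicate rows nor clobber newer data. The table and attribute names are assumptions.

```typescript
import { DynamoDBClient, UpdateItemCommand } from "@aws-sdk/client-dynamodb";

const dynamodb = new DynamoDBClient({});

// Upsert a customer row: replaying the same batch rewrites identical values
// instead of inserting duplicates, and the condition discards stale updates.
export async function upsertCustomer(customer: {
  customerId: string;
  fullName: string;
  email: string;
  updatedAt: string; // ISO timestamp, so string comparison preserves ordering
}): Promise<void> {
  try {
    await dynamodb.send(new UpdateItemCommand({
      TableName: process.env.CUSTOMERS_TABLE!,
      Key: { customerId: { S: customer.customerId } },
      UpdateExpression: "SET #name = :name, #email = :email, #updatedAt = :updatedAt",
      ConditionExpression: "attribute_not_exists(#updatedAt) OR #updatedAt <= :updatedAt",
      ExpressionAttributeNames: {
        "#name": "fullName",
        "#email": "email",
        "#updatedAt": "updatedAt"
      },
      ExpressionAttributeValues: {
        ":name": { S: customer.fullName },
        ":email": { S: customer.email },
        ":updatedAt": { S: customer.updatedAt }
      }
    }));
  } catch (error) {
    // A conditional check failure means newer data already exists; safe to skip on replay.
    if ((error as Error).name !== "ConditionalCheckFailedException") throw error;
  }
}
```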
Serverless data pipelines ultimately serve analytics and business intelligence. AWS provides several serverless analytics services that integrate seamlessly with Lambda-processed data.
Amazon Athena: Serverless SQL on S3
Athena enables SQL queries directly on S3 data without loading it into a database:
```typescript
import {
  AthenaClient,
  StartQueryExecutionCommand,
  GetQueryExecutionCommand,
  GetQueryResultsCommand,
  GetQueryResultsCommandOutput
} from "@aws-sdk/client-athena";

const athena = new AthenaClient({});
const DATABASE = "analytics";
const OUTPUT_BUCKET = process.env.ATHENA_OUTPUT_BUCKET!;

export async function runAthenaQuery(query: string): Promise<any[]> {
  // Start query execution
  const startResult = await athena.send(new StartQueryExecutionCommand({
    QueryString: query,
    QueryExecutionContext: { Database: DATABASE },
    ResultConfiguration: {
      OutputLocation: `s3://${OUTPUT_BUCKET}/athena-results/`
    }
  }));

  const executionId = startResult.QueryExecutionId!;

  // Wait for completion
  await waitForQuery(executionId);

  // Get results
  const results = await athena.send(new GetQueryResultsCommand({
    QueryExecutionId: executionId
  }));

  // Parse results into objects
  return parseAthenaResults(results);
}

async function waitForQuery(executionId: string): Promise<void> {
  const maxAttempts = 60;

  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const status = await athena.send(new GetQueryExecutionCommand({
      QueryExecutionId: executionId
    }));

    const state = status.QueryExecution?.Status?.State;

    if (state === "SUCCEEDED") return;
    if (state === "FAILED" || state === "CANCELLED") {
      throw new Error(`Query ${state}: ${status.QueryExecution?.Status?.StateChangeReason}`);
    }

    await new Promise(resolve => setTimeout(resolve, 1000));
  }

  throw new Error("Query timed out");
}

// Convert Athena's row/column result set into plain objects keyed by column name
function parseAthenaResults(results: GetQueryResultsCommandOutput): any[] {
  const rows = results.ResultSet?.Rows ?? [];
  if (rows.length === 0) return [];

  // The first row contains the column headers
  const headers = rows[0].Data?.map(d => d.VarCharValue ?? "") ?? [];

  return rows.slice(1).map(row =>
    Object.fromEntries((row.Data ?? []).map((d, i) => [headers[i], d.VarCharValue]))
  );
}

// Example: Generate daily summary
export async function getDailySummary(date: string): Promise<any> {
  const query = `
    SELECT
      event_type,
      COUNT(*) as event_count,
      COUNT(DISTINCT user_id) as unique_users
    FROM events
    WHERE date(timestamp) = date('${date}')
    GROUP BY event_type
    ORDER BY event_count DESC
  `;

  return runAthenaQuery(query);
}
```

Optimizing Athena Queries:
Athena charges by data scanned, so optimization saves money:
- Partition data by commonly filtered columns (e.g., a dt=2024-01-15/ prefix) so queries scan only the relevant prefixes
- Store data in a columnar, compressed format such as Parquet rather than raw JSON

Amazon Redshift Serverless:
For more demanding analytics workloads:
| Service | Query Latency | Concurrency | Best For |
|---|---|---|---|
| Athena | Seconds-minutes | Moderate | Ad-hoc queries, data exploration |
| Redshift Serverless | Sub-second to seconds | High | Dashboards, BI tools, complex analytics |
| OpenSearch | Milliseconds | Very high | Log search, full-text, time-series |
| Timestream | Milliseconds | High | IoT metrics, time-series data |
| QuickSight | Sub-second (cached) | High | Business dashboards, embedded analytics |
Proper partitioning can reduce Athena costs by 90%+. If you frequently query by date, partition by date. If you frequently query by region, partition by region within date. Your pipeline's output stage should write data in a partitioned structure that matches common query patterns.
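One way to expose that layout to Athena, sketched here with partition projection so new dt= prefixes become queryable without running MSCK REPAIR TABLE or a Glue crawler; the bucket, columns, and date range are assumptions, and the statement can be run with the runAthenaQuery helper from earlier.

```typescript
// DDL for a partitioned table over the pipeline's output prefix (run once).
const createEventsTable = `
  CREATE EXTERNAL TABLE IF NOT EXISTS analytics.events (
    event_type      string,
    user_id         string,
    event_timestamp string
  )
  PARTITIONED BY (dt string)
  ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
  LOCATION 's3://my-pipeline-output/processed/'
  TBLPROPERTIES (
    'projection.enabled'        = 'true',
    'projection.dt.type'        = 'date',
    'projection.dt.range'       = '2024-01-01,NOW',
    'projection.dt.format'      = 'yyyy-MM-dd',
    'storage.location.template' = 's3://my-pipeline-output/processed/dt=\${dt}/'
  )
`;
```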
Data pipelines must be resilient. Data is often irreplaceable, and processing failures can have cascading effects. Robust error handling ensures data isn't lost and processing can recover from failures.
Dead Letter Queues (DLQ) for Data Pipelines:
Every pipeline stage should have a DLQ for failed records:
DLQs should contain enough context to diagnose and replay:
```typescript
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";

const sqs = new SQSClient({});

interface DLQMessage {
  originalEvent: any;
  processingAttempts: number;
  lastError: {
    name: string;
    message: string;
    stack?: string;
  };
  failedAt: string;
  sourceFunction: string;
  context: {
    requestId: string;
    traceId?: string;
  };
}

async function sendToDLQ(
  originalEvent: any,
  error: Error,
  context: { requestId: string; traceId?: string }
): Promise<void> {
  const dlqMessage: DLQMessage = {
    originalEvent,
    processingAttempts: 1,
    lastError: {
      name: error.name,
      message: error.message,
      stack: error.stack
    },
    failedAt: new Date().toISOString(),
    sourceFunction: process.env.AWS_LAMBDA_FUNCTION_NAME!,
    context
  };

  await sqs.send(new SendMessageCommand({
    QueueUrl: process.env.DLQ_URL!,
    MessageBody: JSON.stringify(dlqMessage),
    MessageAttributes: {
      ErrorType: {
        DataType: "String",
        StringValue: error.name
      },
      OriginalTimestamp: {
        DataType: "String",
        StringValue: originalEvent.timestamp || new Date().toISOString()
      }
    }
  }));
}
```

Replay Strategies:
After fixing issues, you need to replay failed or missing data:
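A hedged sketch of the replay side: drain the DLQ (whose messages carry the originalEvent captured above) and re-inject each event into the stream, deleting the message only after the write succeeds. The queue URL and stream name are supplied by the operator.

```typescript
import { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } from "@aws-sdk/client-sqs";
import { KinesisClient, PutRecordCommand } from "@aws-sdk/client-kinesis";

const sqs = new SQSClient({});
const kinesis = new KinesisClient({});

// Re-inject failed events into the stream after a fix is deployed.
export async function replayDlq(queueUrl: string, streamName: string): Promise<number> {
  let replayed = 0;

  while (true) {
    const { Messages } = await sqs.send(new ReceiveMessageCommand({
      QueueUrl: queueUrl,
      MaxNumberOfMessages: 10,
      WaitTimeSeconds: 2
    }));
    if (!Messages || Messages.length === 0) break;

    for (const message of Messages) {
      const { originalEvent } = JSON.parse(message.Body!);

      // Re-inject with the original partition key so per-user ordering is preserved.
      await kinesis.send(new PutRecordCommand({
        StreamName: streamName,
        Data: Buffer.from(JSON.stringify(originalEvent)),
        PartitionKey: originalEvent.userId ?? "replay"
      }));

      // Delete only after the record is safely back in the stream.
      await sqs.send(new DeleteMessageCommand({
        QueueUrl: queueUrl,
        ReceiptHandle: message.ReceiptHandle!
      }));
      replayed++;
    }
  }
  return replayed;
}
```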
Data Validation and Alerting:
Monitor pipeline health with specific metrics:
```typescript
import { CloudWatchClient, PutMetricDataCommand } from "@aws-sdk/client-cloudwatch";

const cloudwatch = new CloudWatchClient({});

interface PipelineMetrics {
  recordsReceived: number;
  recordsProcessed: number;
  recordsFailed: number;
  processingDurationMs: number;
  bytesProcessed: number;
}

async function emitPipelineMetrics(
  pipelineName: string,
  metrics: PipelineMetrics
): Promise<void> {
  await cloudwatch.send(new PutMetricDataCommand({
    Namespace: "DataPipelines",
    MetricData: [
      {
        MetricName: "RecordsReceived",
        Value: metrics.recordsReceived,
        Unit: "Count",
        Dimensions: [{ Name: "Pipeline", Value: pipelineName }]
      },
      {
        MetricName: "RecordsProcessed",
        Value: metrics.recordsProcessed,
        Unit: "Count",
        Dimensions: [{ Name: "Pipeline", Value: pipelineName }]
      },
      {
        MetricName: "RecordsFailed",
        Value: metrics.recordsFailed,
        Unit: "Count",
        Dimensions: [{ Name: "Pipeline", Value: pipelineName }]
      },
      {
        MetricName: "ProcessingDuration",
        Value: metrics.processingDurationMs,
        Unit: "Milliseconds",
        Dimensions: [{ Name: "Pipeline", Value: pipelineName }]
      },
      {
        MetricName: "SuccessRate",
        Value: metrics.recordsReceived > 0
          ? (metrics.recordsProcessed / metrics.recordsReceived) * 100
          : 100,
        Unit: "Percent",
        Dimensions: [{ Name: "Pipeline", Value: pipelineName }]
      }
    ]
  }));
}
```

In data pipelines, silent data loss is the worst outcome. Design for visibility: every record should either succeed, land in a DLQ, or generate an alert. Unknown states—where records disappear without trace—must be eliminated through comprehensive error handling and monitoring.
Serverless data processing pipelines combine the flexibility of Lambda with the power of managed streaming and storage services. By understanding streaming versus batch patterns, pipeline architecture, and integration with analytics services, you can build data infrastructure that scales automatically and costs nothing when idle.
Let's consolidate the key takeaways:
What's Next:
With data processing pipelines covered, we'll explore the final serverless pattern in this module: Real-Time File Processing. You'll learn how to build sophisticated file processing workflows—image manipulation, video transcoding, document processing—using serverless components.
You now have comprehensive knowledge of building serverless data processing pipelines. From streaming with Kinesis to batch with S3 triggers, from Lambda transformations to Athena analytics—these patterns enable you to build sophisticated data infrastructure that scales automatically and integrates with the broader AWS analytics ecosystem.