Henry Ford revolutionized manufacturing with the assembly line: rather than one worker building an entire car, workers specialize—one installs the engine, another the wheels, another the doors. Each station works concurrently on different cars, maximizing throughput while each car still progresses through all steps in order.
This industrial insight translates directly to concurrent programming as the pipeline pattern. Instead of processing each data item sequentially through all stages, we have multiple stages executing concurrently, each handling one step of the processing. As one stage finishes with item N, it passes the result downstream and immediately begins processing item N+1.
By the end of this page, you will understand the pipeline pattern deeply: its structure and semantics, how it achieves parallelism through staging, the critical role of buffers between stages, throughput and latency analysis, implementation patterns across languages, and where pipelines appear in real systems from compilers to video processing.
The pipeline pattern organizes concurrent processing as a sequence of stages connected by channels. Each stage performs a specific transformation on data items, passing results to the next stage.
Formal Definition:
A pipeline consists of a source that produces input items, an ordered sequence of stages that each apply one transformation, channels connecting consecutive stages, and a sink that consumes the final output.
Key Characteristics:
| Component | Responsibility | Concurrency Property |
|---|---|---|
| Stage | Perform one step of processing | Runs in dedicated thread/goroutine |
| Channel | Buffer data between stages | Thread-safe queue (producer-consumer) |
| Source | Generate or receive input data | May be external (I/O) or internal |
| Sink | Consume final output | May be external (I/O) or aggregation |
The Pipeline Invariant:
Each data item passes through all stages in order, but multiple items are processed concurrently across different stages.
This is the key insight: while item ordering is preserved (item 1 exits before item 2), stages process different items simultaneously. With N stages, up to N items can be "in flight" at once.
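To make the invariant concrete, here is a minimal sketch (with hypothetical stage names) of a two-stage pipeline in Go: once the pipeline fills, both stages work on different items at the same time, yet output order is preserved.

```go
package main

import "fmt"

func main() {
	// Stage 1: emit items 1..3
	nums := make(chan int)
	go func() {
		for i := 1; i <= 3; i++ {
			nums <- i
		}
		close(nums)
	}()

	// Stage 2: square each item; runs concurrently with stage 1
	squares := make(chan int)
	go func() {
		for n := range nums {
			squares <- n * n
		}
		close(squares)
	}()

	// Sink: items exit in the same order they entered
	for s := range squares {
		fmt.Println(s)
	}
	// prints 1, 4, 9 in order
}
```

While the sink prints item N, stage 2 may already be squaring item N+1 and stage 1 emitting item N+2 — three items in flight across three positions.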
Comparison with Sequential Processing:
Pipelines improve THROUGHPUT (items per second), not LATENCY (time for one item). Each item still passes through all stages sequentially. But after the pipeline fills, we complete one item per slowest-stage-time instead of one item per total-time. For large workloads, this is a massive improvement.
Understanding pipeline performance requires careful analysis of throughput, latency, and the impact of stage imbalance.
Key Performance Metrics:
Latency (time for one item to traverse the entire pipeline):
Latency = Σ(stage_time[i]) + Σ(queue_wait_time[i])
For a pipeline with stages taking t₁, t₂, t₃ time, the minimum latency for one item is t₁ + t₂ + t₃, plus any time spent queued between stages.
Throughput (items processed per unit time):
Throughput = 1 / max(stage_time[i]) // Bottleneck determines throughput
The slowest stage (bottleneck) determines overall throughput. Faster stages must wait.
```python
# Example: 3-stage pipeline

# Stage processing times (seconds per item)
stage_1_time = 0.1  # Parse: 100ms
stage_2_time = 0.3  # Transform: 300ms (BOTTLENECK)
stage_3_time = 0.1  # Validate: 100ms

# Sequential processing
sequential_time_per_item = stage_1_time + stage_2_time + stage_3_time
# = 0.5 seconds per item
# For 1000 items: 500 seconds total

# Pipeline processing
pipeline_latency = stage_1_time + stage_2_time + stage_3_time
# = 0.5 seconds (same as sequential for one item)

pipeline_throughput = 1 / max(stage_1_time, stage_2_time, stage_3_time)
# = 1 / 0.3 = 3.33 items per second
# For 1000 items: ~300 seconds + 0.5 seconds startup

# Speedup = sequential_time / pipeline_time
# = 500 / 300.5 ≈ 1.66x improvement

# With balanced stages (all 0.167s):
balanced_throughput = 1 / 0.167
# = 6 items per second
# For 1000 items: ~167 seconds
# Speedup = 500 / 167 ≈ 3x improvement (equals number of stages!)
```

The Bottleneck Problem:
The slowest stage determines throughput. Faster stages spend time idle, waiting for the bottleneck to accept output or provide input.
Example with imbalanced stages:
| Stage | Time/Item | Utilization | Idle Time/Cycle |
|---|---|---|---|
| Stage 1 (Parse) | 100ms | 33% | 200ms waiting |
| Stage 2 (Transform) | 300ms | 100% | 0ms (bottleneck) |
| Stage 3 (Validate) | 100ms | 33% | 200ms waiting |
Strategies to Address Bottlenecks:

- Replicate the bottleneck stage: run multiple parallel instances and merge their output (fan-out/fan-in).
- Split the slow stage into smaller sub-stages to rebalance the pipeline.
- Optimize the stage itself (better algorithms, caching, batching).

With 3 parallel instances of Stage 2 (300ms / 3 = 100ms effective), every stage now takes 100ms: the pipeline is balanced, and throughput rises from 3.33 to 10 items per second.
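As a quick sanity check on this arithmetic, the sketch below (helper name `effectiveBottleneck` is mine, not from the text) computes the effective bottleneck and throughput before and after replicating the slow stage, using the stage times from the earlier example:

```go
package main

import "fmt"

// effectiveBottleneck returns the slowest effective stage time in ms,
// where each stage's time is divided by its replica count.
func effectiveBottleneck(timesMs, replicas []float64) float64 {
	worst := 0.0
	for i, t := range timesMs {
		if eff := t / replicas[i]; eff > worst {
			worst = eff
		}
	}
	return worst
}

func main() {
	times := []float64{100, 300, 100} // Parse, Transform, Validate

	before := effectiveBottleneck(times, []float64{1, 1, 1})
	after := effectiveBottleneck(times, []float64{1, 3, 1}) // 3x Stage 2

	fmt.Printf("before: %.0fms bottleneck, %.2f items/s\n", before, 1000/before)
	fmt.Printf("after:  %.0fms bottleneck, %.2f items/s\n", after, 1000/after)
	// before: 300ms bottleneck, 3.33 items/s
	// after:  100ms bottleneck, 10.00 items/s
}
```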
Maximum speedup from pipelining = number of stages. A 4-stage pipeline can achieve at most 4x throughput vs sequential. But this only happens with perfectly balanced stages. In practice, imbalance limits gains, and the slowest stage dominates performance.
The channels between pipeline stages are bounded buffers. Their sizing profoundly affects pipeline behavior, memory usage, and latency characteristics.
Buffer Sizing Considerations:
| Buffer Size | Memory | Latency | Throughput | Behavior |
|---|---|---|---|---|
| 0 (synchronous) | Minimal | Lower (no queueing) | Limited by slowest combo | Stages synchronize on every handoff |
| 1 | Low | Low | Allows some decoupling | Minimal buffering |
| N (moderate) | Moderate | Moderate | Absorbs burst variations | Good balance |
| ∞ (unbounded) | Unbounded | Potentially huge | Producers never block | Risk of memory exhaustion |
Back-Pressure Mechanism:
Bounded buffers create back-pressure—when a downstream stage is slow, its input buffer fills, causing the upstream stage to block. This propagates upstream, eventually throttling the source.
Back-pressure is essential for:

- Bounding memory usage: items cannot accumulate without limit.
- Pacing the source: producers slow down to match the bottleneck's consumption rate.
- Keeping latency predictable: items do not languish in ever-growing queues.
```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Buffered channels create back-pressure
	stage1_to_2 := make(chan int, 5) // Buffer size 5
	stage2_to_3 := make(chan int, 5)
	done := make(chan bool)

	// Stage 1: Fast producer (10ms per item)
	go func() {
		for i := 0; i < 100; i++ {
			time.Sleep(10 * time.Millisecond)
			stage1_to_2 <- i // Blocks if buffer full
			fmt.Printf("Stage 1: produced %d\n", i)
		}
		close(stage1_to_2)
	}()

	// Stage 2: Slow processor (100ms per item) - BOTTLENECK
	go func() {
		for item := range stage1_to_2 {
			time.Sleep(100 * time.Millisecond) // Slow processing
			stage2_to_3 <- item * 2
		}
		close(stage2_to_3)
	}()

	// Stage 3: Fast consumer (10ms per item)
	go func() {
		for item := range stage2_to_3 {
			fmt.Printf("Stage 3: consumed %d\n", item)
		}
		done <- true
	}()

	<-done

	// Without back-pressure (unbounded buffers):
	// Stage 1 would produce all 100 items immediately
	// Memory usage spikes, items wait ages in buffer

	// With back-pressure (bounded buffers):
	// Stage 1 blocks after producing ~5 items
	// Production paces with bottleneck consumption
	// Steady memory usage, predictable latency
}
```

Buffer Sizing Guidelines:
Start Small: Begin with buffer size 1 or a few items. Add capacity only if profiling shows blocking.
Match to Variance: Larger buffers help when stage processing times are variable. They absorb bursts without blocking.
Consider Memory Footprint: If items are large (images, documents), even small buffer counts consume significant memory.
Profile Under Load: Optimal buffer size depends on actual workload characteristics. Measure contention and throughput.
Prefer Bounded: Almost always use bounded buffers. Unbounded buffers hide backpressure problems that will eventually cause failures.
Unbounded buffers are seductive—they never block producers. But they hide problems. If producers outpace consumers, memory grows without bound. Eventually, the system dies from memory exhaustion or unbounded latency. Bounded buffers force you to confront and solve rate mismatches.
Go's channels and goroutines are ideally suited for pipeline implementation. The language's concurrency primitives map directly to pipeline concepts.
Complete Pipeline Example:
```go
package main

import (
	"fmt"
	"strings"
)

// Pipeline stage type: takes input channel, returns output channel
type Stage func(<-chan string) <-chan string

// Stage 1: Read lines (source)
func source(lines []string) <-chan string {
	out := make(chan string)
	go func() {
		for _, line := range lines {
			out <- line
		}
		close(out) // Signal completion
	}()
	return out
}

// Stage 2: Trim whitespace
func trimStage(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		for line := range in {
			out <- strings.TrimSpace(line)
		}
		close(out)
	}()
	return out
}

// Stage 3: Convert to uppercase
func upperStage(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		for line := range in {
			out <- strings.ToUpper(line)
		}
		close(out)
	}()
	return out
}

// Stage 4: Filter empty lines
func filterEmptyStage(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		for line := range in {
			if len(line) > 0 {
				out <- line
			}
		}
		close(out)
	}()
	return out
}

// Stage 5: Add line numbers
func numberStage(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		n := 1
		for line := range in {
			out <- fmt.Sprintf("%d: %s", n, line)
			n++
		}
		close(out)
	}()
	return out
}

// Compose pipeline from stages
func pipeline(input []string, stages ...Stage) <-chan string {
	ch := source(input)
	for _, stage := range stages {
		ch = stage(ch)
	}
	return ch
}

func main() {
	input := []string{
		"  hello world  ",
		"",
		"  foo bar  ",
		"   ",
		"baz qux",
	}

	// Build pipeline: source -> trim -> upper -> filter -> number
	results := pipeline(input,
		trimStage,
		upperStage,
		filterEmptyStage,
		numberStage,
	)

	// Consume results
	for result := range results {
		fmt.Println(result)
	}
	// Output:
	// 1: HELLO WORLD
	// 2: FOO BAR
	// 3: BAZ QUX
}
```

Key Go Pipeline Idioms:
Channel Closure Propagates: When a stage closes its output channel, downstream stages' range loops terminate, cascading shutdown through the pipeline.
Each Stage Owns Its Output Channel: The stage that creates a channel is responsible for closing it.
Composable Design: Stages have uniform signature (channel in → channel out), enabling flexible composition.
No Explicit Thread Management: Goroutines are lightweight; spawn one per stage without concern.
A common extension is fan-out (one stage feeding multiple parallel workers) and fan-in (multiple sources merging into one channel). This enables parallel processing of a bottleneck stage while maintaining the pipeline structure. Go's select statement makes fan-in straightforward.
```go
package main

import "sync"

// process is a placeholder for the heavy per-item work
func process(item int) int {
	return item * item
}

// Fan-out: split work across multiple workers
func fanOut(in <-chan int, numWorkers int) []<-chan int {
	workers := make([]<-chan int, numWorkers)
	for i := 0; i < numWorkers; i++ {
		workers[i] = worker(in) // Each worker reads from same input
	}
	return workers
}

// Worker processes items
func worker(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for item := range in {
			out <- process(item) // Heavy processing...
		}
		close(out)
	}()
	return out
}

// Fan-in: merge multiple channels into one
func fanIn(channels ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for _, ch := range channels {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for item := range c {
				out <- item
			}
		}(ch)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// Usage: parallelize slow stage
func parallelPipeline(in <-chan int) <-chan int {
	workers := fanOut(in, 4)    // 4 parallel workers
	merged := fanIn(workers...) // Merge results
	return merged
}
```

While Go's channels are particularly well-suited for pipelines, the pattern is implementable in any language with concurrency support.
```python
import queue
import threading
from typing import Callable, Iterator

class PipelineStage:
    """A pipeline stage that processes items from input queue to output queue."""

    def __init__(self, func: Callable, input_q: queue.Queue, output_q: queue.Queue):
        self.func = func
        self.input_q = input_q
        self.output_q = output_q
        self.thread = threading.Thread(target=self._run)

    def _run(self):
        while True:
            item = self.input_q.get()
            if item is None:  # Sentinel for shutdown
                self.output_q.put(None)
                break
            result = self.func(item)
            if result is not None:
                self.output_q.put(result)

    def start(self):
        self.thread.start()

    def join(self):
        self.thread.join()

def create_pipeline(source: Iterator, *stages: Callable, buffer_size: int = 10):
    """Create a pipeline from a source and series of stage functions."""
    queues = [queue.Queue(maxsize=buffer_size) for _ in range(len(stages) + 1)]
    stage_objects = []

    # Create stages
    for i, func in enumerate(stages):
        stage = PipelineStage(func, queues[i], queues[i + 1])
        stage_objects.append(stage)
        stage.start()

    # Feed source
    def feed_source():
        for item in source:
            queues[0].put(item)
        queues[0].put(None)  # Sentinel

    feed_thread = threading.Thread(target=feed_source)
    feed_thread.start()

    # Return output iterator
    def output_iterator():
        while True:
            item = queues[-1].get()
            if item is None:
                break
            yield item
        for stage in stage_objects:
            stage.join()
        feed_thread.join()

    return output_iterator()

# Example usage
def trim(s):
    return s.strip()

def upper(s):
    return s.upper()

def filter_empty(s):
    return s if len(s) > 0 else None

lines = ["  hello  ", "", "  world  ", "   ", "foo"]
results = create_pipeline(iter(lines), trim, upper, filter_empty)
for result in results:
    print(result)  # HELLO, WORLD, FOO
```

Production pipelines must handle errors gracefully and support cancellation. Naively ignoring these concerns leads to resource leaks and hung goroutines.
Error Handling Strategies:
```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// Result type for error-aware pipeline
type Result struct {
	Value int
	Err   error
}

// Stage with error handling and cancellation
func parseNumbers(ctx context.Context, in <-chan string) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return // Pipeline cancelled
			case s, ok := <-in:
				if !ok {
					return // Input exhausted
				}
				n, err := strconv.Atoi(s)
				select {
				case out <- Result{Value: n, Err: err}:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

func doubleNumbers(ctx context.Context, in <-chan Result) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case r, ok := <-in:
				if !ok {
					return
				}
				if r.Err != nil {
					out <- r // Propagate error
					continue
				}
				// Process valid value
				out <- Result{Value: r.Value * 2, Err: nil}
			}
		}
	}()
	return out
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Source
	source := make(chan string)
	go func() {
		defer close(source)
		for _, s := range []string{"1", "2", "three", "4", "5"} {
			select {
			case source <- s:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Pipeline
	parsed := parseNumbers(ctx, source)
	doubled := doubleNumbers(ctx, parsed)

	// Consume
	errorCount := 0
	for r := range doubled {
		if r.Err != nil {
			fmt.Printf("Error: %v\n", r.Err)
			errorCount++
			if errorCount > 2 {
				cancel() // Too many errors, cancel pipeline
			}
		} else {
			fmt.Printf("Result: %d\n", r.Value)
		}
	}
}
```

Go's context package is the idiomatic way to propagate cancellation through pipelines. Every stage checks ctx.Done() in its select. When the context is cancelled, all stages return promptly, closing their output channels and allowing goroutines to be garbage collected.
The pipeline pattern appears throughout computing systems. Recognizing these applications helps you apply the pattern appropriately.
Unix shell pipes are the canonical example: `cat file | grep pattern | sort | uniq`. Each command is a stage; pipes are channels. The original pipeline pattern implementation!

The CPU itself uses pipelining: Fetch → Decode → Execute → Memory → Writeback. Multiple instructions are in flight at different stages, dramatically increasing throughput. Pipeline stalls (hazards) occur when dependencies force waiting—exactly like software pipeline bottlenecks.
The pipeline pattern is a powerful approach to structuring concurrent computation. It improves throughput by processing multiple items simultaneously across stages while maintaining orderly, comprehensible code. Let's consolidate the key insights:

- Pipelines improve throughput, not per-item latency; the slowest stage sets the throughput ceiling.
- The maximum speedup equals the number of stages, and only perfectly balanced stages achieve it.
- Bounded buffers between stages provide back-pressure, keeping memory usage and latency predictable.
- Bottleneck stages can be parallelized with fan-out/fan-in while preserving the pipeline structure.
- Production pipelines need explicit error propagation and cancellation (in Go, via the context package).
What's Next:
Having explored sequential processing patterns (producer-consumer, reader-writer, pipeline), we'll next examine the map-reduce pattern—a powerful approach for parallel data processing that partitions work, processes in parallel, and aggregates results. This pattern underlies distributed computing frameworks from Hadoop to modern stream processing systems.
You now deeply understand the pipeline pattern—from its assembly-line inspiration through performance analysis to production implementation. Pipelines are essential for high-throughput data processing, appearing everywhere from shell commands to video encoding. Next, we'll explore map-reduce for parallel aggregation.