Imagine a factory assembly line: raw materials enter at one end, pass through successive processing stations, and finished products emerge at the other end. Workers at each station don't wait for all materials to be fully processed by previous stations—they work continuously as materials flow through. This is pipelining, and it's the key to efficient query execution.
Without pipelining, query execution would materialize complete intermediate results at each step—writing millions of temporary tuples to disk, only to read them back for the next operator. With pipelining, tuples flow through the operator tree like water through pipes, with minimal buffering and no unnecessary materialization.
By the end of this page, you will understand how database engines execute operator trees using pipelining. You'll master the classic iterator (Volcano) model, understand blocking operators that break pipelines, explore push-based alternatives, and learn how modern vectorized and compiled execution push pipelining to its limits.
Before diving into pipelining mechanisms, let's understand what we're avoiding: full materialization.
Materialized Execution:
In a materialized execution model, each operator:
Query: SELECT name FROM employees WHERE salary > 100000 ORDER BY name;
Materialized execution:
1. Scan employees → write all 1M rows to temp1
2. Filter(salary > 100K) on temp1 → write 10K rows to temp2
3. Project(name) on temp2 → write 10K names to temp3
4. Sort(name) on temp3 → write sorted names to temp4
5. Return temp4
Total I/O: 1M (scan) + 1M (write temp1) + 1M (read temp1) + 6 × 10K (write/read temp2–temp4) ≈ 3M reads/writes
This is catastrophically inefficient—we're writing and reading data multiple times that could have been processed in a single pass.
Pipelined Execution:
In pipelined execution, operators pass tuples directly to one another without intermediate materialization:
Pipelined execution:
1. Scan next row from employees
2. Apply filter: salary > 100K?
3. If passes, project: extract name
4. Pass to sort buffer (sort is blocking, but buffers only 10K qualifying rows)
5. After scan complete, sort outputs sorted names
Total I/O: 1M (scan) + ~20K (sort, if spills to disk) = ~1M
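The pipelined flow above can be sketched with Python generators, where each operator lazily pulls one row at a time from its child and nothing is materialized until the blocking sort (illustrative data and operator names, not from any specific engine):

```python
# Each stage is a generator: rows flow through one at a time,
# with no intermediate table ever written out.
def table_scan(rows):
    for row in rows:
        yield row

def filter_op(child, predicate):
    for row in child:
        if predicate(row):
            yield row

def project(child, column):
    for row in child:
        yield row[column]

# Hypothetical employees table as (name, salary) dicts.
employees = [
    {"name": "Ada", "salary": 150_000},
    {"name": "Bob", "salary": 80_000},
    {"name": "Cyd", "salary": 120_000},
]

pipeline = project(
    filter_op(table_scan(employees), lambda r: r["salary"] > 100_000),
    "name",
)

# sorted() is the blocking step: it drains the (already filtered)
# pipeline into a buffer before emitting ordered output.
result = sorted(pipeline)
print(result)  # ['Ada', 'Cyd']
```

Note that the sort buffer holds only the qualifying rows, exactly as in step 4 above.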
Benefits:
| Aspect | Materialized | Pipelined |
|---|---|---|
| Intermediate storage | Required for every operator | Minimal (in-memory buffers) |
| Memory usage | Proportional to largest intermediate | Proportional to tuple size |
| First tuple latency | After all operators complete | After first tuple passes all operators |
| I/O operations | O(sum of intermediate sizes) | O(input + output + blocking) |
| Cache efficiency | Poor (data evicted between operators) | Excellent (data in cache) |
| Implementation | Simple | Requires coordination protocol |
Despite its inefficiency, materialization is sometimes necessary: blocking operators (sort, hash build) must buffer their entire input, memory pressure can force intermediate results to spill to disk, and a result that is consumed multiple times (a rescanned subquery, for instance) is cheaper to compute once and store.
Modern systems use selective materialization—materialize only when necessary, pipeline everywhere else.
The Iterator Model, pioneered in Goetz Graefe's Volcano system, is the classic approach to pipelined query execution. It's elegant, simple, and remains influential in modern systems.
Core Concept:
Each operator implements a simple interface:
Interface Iterator:
open() → Initialize operator, open child iterators
next() → Return next tuple, or end-of-stream marker
close() → Clean up resources, close child iterators
Operators are composed into a tree. The root operator's next() is called repeatedly, which recursively pulls tuples up through the tree.
Execution Flow:
1. The client calls root.next() repeatedly.
2. Each operator calls child.next() to get its input tuple, so control flows down the tree.
3. Children call their own children's next() recursively, and tuples flow back up through the returns.
4. Execution ends when the root's next() returns the end-of-stream marker.
```
// Base iterator interface
interface Iterator:
    open(): void
    next(): Tuple | END_OF_STREAM
    close(): void

// Table scan operator
class TableScan implements Iterator:
    table: Table
    cursor: Cursor

    open():
        cursor = table.openScan()

    next() -> Tuple | END_OF_STREAM:
        if cursor.hasNext():
            return cursor.next()
        else:
            return END_OF_STREAM

    close():
        cursor.close()

// Filter (selection) operator
class Filter implements Iterator:
    child: Iterator
    predicate: Expression

    open():
        child.open()

    next() -> Tuple | END_OF_STREAM:
        while true:
            tuple = child.next()
            if tuple == END_OF_STREAM:
                return END_OF_STREAM
            if predicate.evaluate(tuple):
                return tuple
            // else: tuple filtered out, get next

    close():
        child.close()

// Project operator
class Project implements Iterator:
    child: Iterator
    columns: List<Column>

    open():
        child.open()

    next() -> Tuple | END_OF_STREAM:
        tuple = child.next()
        if tuple == END_OF_STREAM:
            return END_OF_STREAM
        return extractColumns(tuple, columns)

    close():
        child.close()
```

Composability:
The beauty of the iterator model is composability. Any operator can be connected to any other, as long as schemas match:
Query: SELECT name FROM employees WHERE salary > 100000
Plan tree:
Project(name)
Filter(salary > 100K)
TableScan(employees)
Execution:
project.next() calls filter.next() calls scan.next()
Tuple flows up: scan → filter → project → client
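A minimal runnable version of this pull loop in Python, with END_OF_STREAM as a sentinel object; the class names mirror the pseudocode above, and the sample rows are illustrative:

```python
END_OF_STREAM = object()  # sentinel marking end of input

class TableScan:
    def __init__(self, rows):
        self.rows = rows
    def open(self):
        self.pos = 0
    def next(self):
        if self.pos < len(self.rows):
            row = self.rows[self.pos]
            self.pos += 1
            return row
        return END_OF_STREAM
    def close(self):
        pass

class Filter:
    def __init__(self, child, predicate):
        self.child, self.predicate = child, predicate
    def open(self):
        self.child.open()
    def next(self):
        # Keep pulling until a tuple passes or input is exhausted.
        while True:
            t = self.child.next()
            if t is END_OF_STREAM or self.predicate(t):
                return t
    def close(self):
        self.child.close()

class Project:
    def __init__(self, child, column):
        self.child, self.column = child, column
    def open(self):
        self.child.open()
    def next(self):
        t = self.child.next()
        return t if t is END_OF_STREAM else t[self.column]
    def close(self):
        self.child.close()

# Compose: Project(name) <- Filter(salary > 100K) <- TableScan(employees)
plan = Project(
    Filter(TableScan([("Ada", 150_000), ("Bob", 80_000)]),
           lambda t: t[1] > 100_000),
    0)

plan.open()
out = []
while (t := plan.next()) is not END_OF_STREAM:
    out.append(t)
plan.close()
print(out)  # ['Ada']
```

The driver loop at the bottom is exactly the "call root.next() until end-of-stream" protocol.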
Advantages: a single uniform interface for every operator, arbitrary composability, a memory footprint of one tuple per operator, and demand-driven execution — a LIMIT simply stops calling next().
The model is named after the Volcano query processing system developed by Goetz Graefe in the early 1990s. It's also called the 'pull-based' model because parent operators pull tuples from children, or the 'iterator model' because of its next() interface. Most classic RDBMS implementations (PostgreSQL, MySQL) use this model or close variants.
Not all operators can produce output as soon as they receive input. Blocking operators must consume all (or a significant portion) of their input before producing any output. These operators break the pipeline, requiring materialization of their input.
Common Blocking Operators:
1. Sort: Must see all tuples to determine ordering
2. Hash Aggregation: Must build complete hash table before output
3. Hash Join (build side): Must complete hash table before probing
4. Set Operations (INTERSECT, EXCEPT): Must buffer one side
| Operator | Blocking Behavior | Pipeline Status |
|---|---|---|
| Table Scan | Non-blocking | Fully pipelined ✓ |
| Index Scan | Non-blocking | Fully pipelined ✓ |
| Filter (Selection) | Non-blocking | Fully pipelined ✓ |
| Projection | Non-blocking | Fully pipelined ✓ |
| Nested Loop Join | Non-blocking (inner side rescanned per outer tuple) | Pipelined (outer side) ✓ |
| Sort-Merge Join | Blocking | Pipeline breaker ✗ |
| Hash Join | Partially blocking | Build blocks, probe streams |
| Sort | Fully blocking | Pipeline breaker ✗ |
| Hash Aggregation | Fully blocking | Pipeline breaker ✗ |
| DISTINCT (hash) | Non-blocking | Streams first occurrences ✓ |
| LIMIT | Non-blocking | Fully pipelined ✓ |
| UNION ALL | Non-blocking | Fully pipelined ✓ |
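The streaming/blocking distinction in the table is easy to observe with Python generators: a LIMIT stops pulling from its child after a handful of rows, while a sort must drain the entire input before it can emit anything (a small counting sketch):

```python
pulled = 0  # counts how many rows the scan actually produced

def scan(rows):
    global pulled
    for row in rows:
        pulled += 1
        yield row

rows = list(range(1_000))

# LIMIT 5 is fully pipelined: it pulls exactly 5 rows, then stops.
pulled = 0
first_five = [r for _, r in zip(range(5), scan(rows))]
assert pulled == 5            # only 5 rows ever scanned

# Sort is a pipeline breaker: sorted() drains the whole scan
# before emitting even its first output row.
pulled = 0
ordered = sorted(scan(rows), reverse=True)
assert pulled == 1_000        # every row scanned before any output
```

The same early-termination property is why LIMIT over a pipelined plan can be dramatically cheaper than LIMIT over a sorted one.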
Pipeline Breakers:
When a blocking operator appears in a plan, it creates a pipeline breaker—the point where tuples must be materialized (at least in memory). The query plan naturally divides into pipeline segments separated by breakers:
Query: SELECT dept, SUM(salary) FROM employees
WHERE status='active' GROUP BY dept ORDER BY dept;
Plan:
Sort(dept) ← Pipeline breaker #2
HashAggregate(dept) ← Pipeline breaker #1
Filter(status='active')
TableScan(employees)
Pipeline segments:
Segment 1: Scan → Filter → HashAggregate (build)
[Materialization: hash table]
Segment 2: HashAggregate (output) → Sort (input)
[Materialization: sort buffer]
Segment 3: Sort (output) → Client
Pipeline breakers are where memory management becomes critical. A hash aggregation with millions of groups may not fit in memory, requiring spilling strategies. Query optimizers estimate intermediate sizes to allocate memory budgets and choose between in-memory and external algorithms. Memory pressure at one breaker can force spilling, dramatically increasing I/O cost.
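The segment structure above can be mirrored directly in Python (illustrative data; the hash table and the sort buffer are the two materialization points):

```python
from collections import defaultdict

# Hypothetical employees rows: (dept, salary, status)
employees = [("eng", 100, "active"), ("eng", 120, "active"),
             ("hr", 90, "inactive"), ("hr", 95, "active")]

# Segment 1: Scan -> Filter -> HashAggregate (build).
# Tuples stream straight into the hash table; nothing else is buffered.
sums = defaultdict(int)
for dept, salary, status in employees:
    if status == "active":      # pipelined filter
        sums[dept] += salary    # breaker #1: hash table materializes here

# Segment 2: HashAggregate (output) -> Sort (input).
# Breaker #2: the sort buffer holds one row per group, not per input row.
result = sorted(sums.items())

# Segment 3: sorted rows stream to the client.
print(result)  # [('eng', 220), ('hr', 95)]
```

Note how the buffers shrink as data moves up the plan: millions of input rows, but only one hash-table entry and one sort-buffer row per department.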
The iterator model is a pull-based approach: parents pull tuples from children. An alternative is the push-based model where children push tuples to parents.
Pull-Based (Iterator/Volcano):
Control flow: Top → Down (next() calls)
Data flow: Bottom → Up (tuples returned)
parent.next():
tuple = child.next() // pull from child
process(tuple)
return result
Push-Based:
Control flow: Bottom → Up (produce() calls)
Data flow: Bottom → Up (tuples passed)
child.produce():
while hasNext():
tuple = getNextTuple()
parent.consume(tuple) // push to parent
```
// Push-based operator interface
interface PushOperator:
    produce(): void          // Start producing tuples
    consume(tuple): void     // Receive tuple from child

// Table scan in push model
class TableScanPush implements PushOperator:
    table: Table
    parent: PushOperator

    produce():
        for page in table.pages:
            for tuple in page.tuples:
                parent.consume(tuple)    // Push to parent
        parent.consume(END_OF_STREAM)

// Filter in push model
class FilterPush implements PushOperator:
    predicate: Expression
    parent: PushOperator
    child: PushOperator

    produce():
        child.produce()    // Tell child to start pushing

    consume(tuple):
        if tuple == END_OF_STREAM:
            parent.consume(END_OF_STREAM)
        elif predicate.evaluate(tuple):
            parent.consume(tuple)    // Push passing tuples
        // else: silently drop filtered tuples

// Execution starts from the bottom
scan.produce()    // Initiates data flow

// Push model advantages:
// - Fewer function calls (no return, just forward call)
// - Better inlining opportunities for JIT
// - Natural for producer-consumer parallelism
```

Modern systems often use hybrid models:
For example, within a scan-filter-project pipeline, push gives tight loops. But pull is used to coordinate with a blocking sort downstream.
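A minimal push-based sketch in Python, mirroring the pseudocode above: control flow starts at the leaf scan, tuples are pushed upward, and END is a sentinel (all names illustrative):

```python
END = object()  # end-of-stream sentinel

class Output:
    """Root sink: collects whatever is pushed into it."""
    def __init__(self):
        self.rows = []
    def consume(self, t):
        if t is not END:
            self.rows.append(t)

class FilterPush:
    def __init__(self, predicate, parent):
        self.predicate, self.parent = predicate, parent
    def consume(self, t):
        # Forward end-of-stream; push only passing tuples upward.
        if t is END or self.predicate(t):
            self.parent.consume(t)

class ScanPush:
    def __init__(self, rows, parent):
        self.rows, self.parent = rows, parent
    def produce(self):
        for t in self.rows:
            self.parent.consume(t)    # push each tuple to parent
        self.parent.consume(END)

sink = Output()
scan = ScanPush(range(10), FilterPush(lambda x: x % 2 == 0, sink))
scan.produce()      # execution starts at the leaf, not the root
print(sink.rows)    # [0, 2, 4, 6, 8]
```

Contrast with the pull model: here the scan's loop drives the whole pipeline, and the filter is just a forwarded call inside that loop.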
The traditional iterator model processes one tuple at a time. This is elegant but incurs significant overhead: a virtual function call per tuple per operator, poor instruction- and data-cache locality as control bounces between operators, and no opportunity for SIMD, since each call handles a single value.
Vectorized execution addresses this by processing batches of tuples (vectors) at a time, typically 1,000-10,000 tuples per batch.
Key Changes:
```
// Vectorized iterator interface
interface VectorizedIterator:
    next() -> Batch | END_OF_STREAM    // Returns a batch of tuples

class Batch:
    size: int               // Number of tuples in batch
    columns: Column[]       // Column-oriented data
    selection: BitVector    // Which rows are active (after filter)

// Vectorized filter
class VectorizedFilter implements VectorizedIterator:
    child: VectorizedIterator
    predicate: CompiledExpression

    next() -> Batch:
        batch = child.next()
        if batch == END_OF_STREAM:
            return END_OF_STREAM
        // Evaluate predicate on an entire column at once using SIMD
        // Example: salary > 100000
        // Input:  salary    = [50K, 120K, 80K, 150K, 90K, 200K, 70K, 110K]
        // Output: selection = [0,   1,    0,   1,    0,   1,    0,   1]
        batch.selection = simd_compare_gt(batch.columns['salary'], 100000)
        return batch

// SIMD comparison (conceptual)
simd_compare_gt(column, threshold) -> BitVector:
    result = new BitVector(column.length)
    // Process 8 values at a time with AVX-512
    for i = 0 to column.length step 8:
        vec = simd_load_8_int64(column, i)
        mask = simd_cmp_gt_8_int64(vec, threshold_vec)
        simd_store_8_bits(result, i, mask)
    return result
```

Selection Vectors:
Instead of physically removing filtered tuples (expensive copying), vectorized engines use selection vectors—bitmaps indicating which tuples are "active". Downstream operators only process selected tuples.
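In plain Python the idea looks like this — a list of booleans stands in for the hardware bitmask, and the tight per-column loop is the pattern a real engine would evaluate with SIMD, 8 or more values per instruction:

```python
# A batch holds columns plus a selection bitmap; the filter never
# copies rows, it only flips bits.
salary = [50_000, 120_000, 80_000, 150_000, 90_000, 200_000]
name   = ["a", "b", "c", "d", "e", "f"]
selection = [True] * len(salary)

# Vectorized filter: a tight loop over a single column.
for i in range(len(salary)):
    selection[i] = selection[i] and salary[i] > 100_000

# Downstream projection touches only selected rows.
out = [name[i] for i in range(len(name)) if selection[i]]
print(out)  # ['b', 'd', 'f']
```

A second filter would AND its result into the same bitmap, so tuples are "removed" without ever moving any data.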
Benefits of Vectorized Execution: interface-call overhead is amortized over thousands of tuples, tight per-column loops enable SIMD and compiler auto-vectorization, and batches sized to fit the CPU cache keep data hot between operators.
Real-World Impact:
Systems like DuckDB, ClickHouse, and Velox achieve 10-100x speedup over tuple-at-a-time processing for analytical workloads through vectorization.
Batch size is a critical tuning parameter: too small, and per-batch dispatch overhead dominates; too large, and the batch no longer fits in the CPU cache, losing the locality benefit.
Optimal batch size is typically 1,000-4,000 tuples, depending on tuple width and cache hierarchy. Some systems adaptively tune batch size based on query characteristics.
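The amortization itself is easy to quantify: with N tuples and batch size B, the engine crosses the operator boundary N times in the tuple-at-a-time model but only about N/B times when batched. A counting sketch (the function names are illustrative):

```python
calls = 0  # counts operator-boundary crossings

def next_tuple(data, pos):
    # One call per tuple: the Volcano-style interface.
    global calls
    calls += 1
    return data[pos]

def next_batch(data, pos, batch_size):
    # One call per batch amortizes overhead across batch_size tuples.
    global calls
    calls += 1
    return data[pos:pos + batch_size]

data = list(range(100_000))

calls = 0
total_tuple = sum(next_tuple(data, i) for i in range(len(data)))
tuple_calls = calls    # 100,000 boundary crossings

calls = 0
total_batch = 0
for i in range(0, len(data), 1_000):
    total_batch += sum(next_batch(data, i, 1_000))
batch_calls = calls    # 100 boundary crossings

assert total_tuple == total_batch   # same answer, 1000x fewer calls
```

In a real engine each crossing is a virtual call plus cache disruption, so the 1000x reduction in crossings translates into a large constant-factor speedup.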
The ultimate form of query execution optimization is query compilation: generating specialized machine code for each query, eliminating interpretation overhead entirely.
The Problem with Interpretation:
Even vectorized execution involves interpretation: the engine still dispatches between operators for every batch, expressions are evaluated by walking an expression tree, and intermediate result vectors must be materialized between primitives.
Compiled Execution:
Instead of interpreting operations, compile the entire query into native code:
Query: SELECT name FROM employees WHERE salary > 100000
Compiled to machine code equivalent of:
for (page : employees.pages) {
for (i = 0; i < page.count; i++) {
if (page.salary[i] > 100000) {
output(page.name[i]);
}
}
}
No virtual function calls, no operator abstraction—just tight, optimized loops.
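The same idea can be demonstrated in miniature with Python's own compile/exec: generate query-specific source with the predicate constant inlined, compile it once, then run the resulting tight loop. This is a toy stand-in for LLVM-based code generation, and all names here are illustrative:

```python
def compile_query(column, threshold, out_column):
    # "Code generation": emit source specialized to this exact query.
    src = f"""
def run(pages):
    out = []
    for page in pages:
        col = page[{column!r}]
        names = page[{out_column!r}]
        for i in range(len(col)):
            if col[i] > {threshold}:   # predicate inlined as a constant
                out.append(names[i])
    return out
"""
    namespace = {}
    # "JIT": compile the generated source once, reuse the function many times.
    exec(compile(src, "<query>", "exec"), namespace)
    return namespace["run"]

# Hypothetical column-oriented pages.
pages = [{"salary": [50_000, 120_000], "name": ["a", "b"]},
         {"salary": [150_000, 90_000], "name": ["c", "d"]}]

run = compile_query("salary", 100_000, "name")
print(run(pages))  # ['b', 'c']
```

The generated `run` has no operator objects and no per-tuple dispatch — the whole scan-filter-project pipeline collapsed into one loop, which is exactly what a compiling engine does at the machine-code level.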
```
// Query compilation pipeline
COMPILE_QUERY(query_plan):
    // Step 1: Generate intermediate representation (IR)
    ir = generate_LLVM_IR(query_plan)

    // Step 2: Optimize IR
    optimized_ir = llvm_optimize(ir)

    // Step 3: Compile to machine code
    machine_code = llvm_compile(optimized_ir)

    // Step 4: Return callable function
    return machine_code

// Example: Compiling a scan-filter-project pipeline
generate_for_pipeline(scan, filter, project):
    emit_code("""
        // Outer loop: iterate pages
        for (page_idx = 0; page_idx < num_pages; page_idx++) {
            page = load_page(table, page_idx);
            // Inner loop: iterate tuples in page
            for (i = 0; i < page.tuple_count; i++) {
                // Inlined filter evaluation
                salary = page.salary_column[i];
                if (salary > 100000) {    // Constant folded
                    // Inlined projection
                    result = page.name_column[i];
                    output_buffer.append(result);
                }
            }
        }
    """)

// The generated code is as fast as hand-written C
// with full optimization: inlining, constant folding,
// register allocation, vectorization by the C compiler
```

Compilation Strategies:
1. Full Query Compilation: compile each pipeline of the plan into a single specialized function (the HyPer approach). Fastest execution, but the highest compilation latency.
2. Operator-at-a-Time Compilation: compile each operator separately and link them together. Cheaper to compile and easier to debug, but operator boundaries and their call overhead remain.
3. Hybrid Vectorized + Compiled: run a vectorized interpreter and JIT-compile only hot expressions or pipelines, trading a little peak throughput for low startup cost.
Compilation Overhead:
Query compilation takes time (10ms-100ms). For short-running queries, compilation overhead may exceed execution time. Solutions: cache compiled code for repeated query shapes, begin executing in an interpreter and switch to compiled code once it's ready (adaptive execution), and compile only queries whose estimated cost justifies the upfront expense.
LLVM (Low-Level Virtual Machine) made query compilation practical by providing: a well-documented intermediate representation to target, a mature library of optimization passes (inlining, constant folding, loop vectorization), and JIT backends that emit native code for many CPU architectures.
Before LLVM, database teams would have to implement their own code generators—a massive undertaking. Now, databases like PostgreSQL can add JIT compilation with moderate engineering effort.
Modern multi-core CPUs offer massive parallelism, but exploiting it in query execution is challenging. Pipeline parallelism and morsel-driven execution are techniques that scale query execution across many cores.
Traditional Parallelism Approaches:
1. Inter-Query Parallelism: run different queries on different cores. Improves throughput, but does nothing for the latency of a single query.
2. Inter-Operator Parallelism: run different operators of one plan on different threads (e.g., a scan feeding a join). Limited by the plan's shape and prone to load imbalance.
3. Intra-Operator Parallelism: partition the data so many threads run the same operator on different chunks. This is the approach that scales, and the one morsel-driven execution generalizes.
Morsel-Driven Parallelism:
The HyPer database introduced morsel-driven execution—a sophisticated approach that achieves excellent parallel scalability:
```
// Morsel-driven parallel execution
MORSEL_DRIVEN_EXECUTE(query_plan, num_workers):
    // Divide input into small chunks called "morsels" (e.g., 100K tuples each)
    morsels = partition_input(query_plan.input, MORSEL_SIZE)
    morsel_queue = create_work_queue(morsels)

    // Create shared pipeline state (e.g., hash table for join)
    shared_state = create_shared_state(query_plan)

    // Launch worker threads
    results = []
    parallel_for worker_id in range(num_workers):
        while true:
            morsel = morsel_queue.try_dequeue()
            if morsel == null:
                break    // No more work

            // Process morsel through entire pipeline
            local_result = process_pipeline(morsel, query_plan, shared_state)

            // Merge local results
            synchronized:
                results.append(local_result)

    return merge(results)

// Key insight: Each worker processes the complete pipeline on its morsel
// No inter-thread communication during morsel processing
// Thread-local state + lock-free data structures minimize contention

// Hash join with morsel-driven execution:
BUILD_PHASE(build_morsels, shared_hash_table):
    parallel_for morsel in build_morsels:
        for tuple in morsel:
            bucket = hash(tuple.key) % num_buckets
            lock(bucket)    // Fine-grained locking
            bucket.append(tuple)
            unlock(bucket)

PROBE_PHASE(probe_morsels, shared_hash_table):
    parallel_for morsel in probe_morsels:
        local_results = []
        for tuple in morsel:
            bucket = hash(tuple.key) % num_buckets
            // No lock needed during probe (read-only)
            for match in bucket:
                if match.key == tuple.key:
                    local_results.append(join(tuple, match))
        output(local_results)
```

Key Properties of Morsel-Driven Execution:
Full Pipeline Processing: Each morsel is processed through the entire pipeline, maximizing cache locality.
Elasticity: Workers can be dynamically added/removed; morsel queue provides natural load balancing.
NUMA-Awareness: Morsels can be assigned to workers on the same NUMA node as the data.
Minimal Synchronization: Most work is thread-local; shared state uses fine-grained locking.
Tail Latency Control: Small morsel size prevents single slow morsel from blocking completion.
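These properties can be seen in a compact Python sketch of the morsel loop: threads pull fixed-size chunks from a shared work queue, run the whole pipeline with thread-local state, and synchronize only briefly to merge (morsel size and worker count are illustrative):

```python
import queue
import threading

MORSEL_SIZE = 1_000
data = list(range(10_000))

# Work queue of morsels: each worker pulls a chunk and runs the entire
# scan -> filter -> aggregate pipeline on it.
morsels = queue.Queue()
for i in range(0, len(data), MORSEL_SIZE):
    morsels.put(data[i:i + MORSEL_SIZE])

results = []
lock = threading.Lock()

def worker():
    while True:
        try:
            morsel = morsels.get_nowait()
        except queue.Empty:
            return                      # no more work: worker exits
        # Thread-local pipeline: no shared state touched here.
        local = sum(x for x in morsel if x % 2 == 0)
        with lock:                      # brief sync only to merge
            results.append(local)

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

total = sum(results)   # same answer regardless of worker count
```

Because workers grab morsels on demand, a fast thread simply processes more chunks than a slow one — the queue itself is the load balancer.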
Scaling Results:
Well-implemented morsel-driven engines achieve near-linear scaling up to core counts in the hundreds, processing billions of rows per second.
On multi-socket servers, Non-Uniform Memory Access (NUMA) means memory access time depends on which CPU accesses which memory region. Cross-socket memory access can be 2-3x slower. NUMA-aware query execution allocates table partitions on specific NUMA nodes, schedules workers on the socket that holds their data, and keeps morsel processing node-local whenever possible.
Ignoring NUMA can cost 30-50% performance on large servers.
Pipelining is the technique that transforms database query execution from sequential materialization into efficient, streaming data flow. To consolidate: the iterator (Volcano) model pulls tuples one at a time through a composable operator tree; blocking operators such as sort and hash aggregation break the plan into pipeline segments with materialization at the breakers; push-based execution inverts control flow for tighter loops; vectorization amortizes per-tuple overhead across batches and unlocks SIMD; query compilation removes interpretation entirely; and morsel-driven parallelism scales pipelines across cores with minimal synchronization.
Module Complete:
This concludes our exploration of physical operators. From basic access methods through join algorithms to aggregation, set operations, sorting, and pipelining, you now understand how database systems transform abstract query plans into efficient physical execution. These techniques represent decades of database research and engineering, enabling the remarkable performance of modern database systems.
Congratulations! You've completed the module on Other Physical Operators. You now understand the full executor toolkit: aggregation, duplicate elimination, set operations, sorting, and the pipelining techniques that make it all work together efficiently. This knowledge is essential for understanding query performance, reading execution plans, and designing efficient database applications.