This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Scan Execution and Optimization
Relevant source files
- llkv-column-map/Cargo.toml
- llkv-column-map/src/gather.rs
- llkv-column-map/src/store/projection.rs
- llkv-csv/src/writer.rs
- llkv-sql/tests/pager_io_tests.rs
- llkv-table/examples/direct_comparison.rs
- llkv-table/examples/performance_benchmark.rs
- llkv-table/examples/test_streaming.rs
- llkv-table/src/table.rs
Purpose and Scope
This document details the table scan execution flow in LLKV, covering how queries retrieve data from columnar storage through optimized fast paths and streaming strategies. The scan subsystem bridges query planning and physical storage, coordinating row ID collection, data gathering, and result materialization while minimizing memory allocations.
For query planning that produces scan operations, see TablePlanner and TableExecutor. For predicate evaluation during filtering, see Filter Evaluation. For the underlying column storage mechanisms, see Column Storage and ColumnStore.
Scan Entry Points
The Table struct provides two primary scan entry points that execute projections with filtering:
| Method | Signature | Purpose |
|---|---|---|
| scan_stream | &self, projections: I, filter_expr: &Expr, options: ScanStreamOptions, on_batch: F | Accepts flexible projection inputs, converts to ScanProjection |
| scan_stream_with_exprs | &self, projections: &[ScanProjection], filter_expr: &Expr, options: ScanStreamOptions, on_batch: F | Direct scan with pre-built ScanProjection values |
Both methods stream results as Arrow RecordBatches to a callback function, avoiding full materialization in memory. The ScanStreamOptions struct controls scan behavior:
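The struct definition is not reproduced on this page; as a rough sketch, assuming the fields implied by the fast-path criteria below (include_nulls and row_id_filter are referenced later on this page, while the ordering field is an assumption):

```rust
// Hypothetical placeholder types for illustration; only `include_nulls` and
// `row_id_filter` are named elsewhere on this page.
pub struct ScanOrderSpec; // e.g. order-by column and direction
pub struct RowIdFilter;   // e.g. an extra row-ID predicate or bitmap

/// Sketch of ScanStreamOptions; the real field set lives in llkv-table.
pub struct ScanStreamOptions {
    pub include_nulls: bool,                // emit rows whose projected values are NULL
    pub order: Option<ScanOrderSpec>,       // ORDER BY specification, if any
    pub row_id_filter: Option<RowIdFilter>, // additional row-ID filtering
}
```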
The on_batch callback receives each batch as it is produced, allowing streaming processing without memory accumulation.
Sources: llkv-table/src/table.rs:447-488 llkv-scan/src/lib.rs
Two-Phase Scan Execution Model
Two-Phase Architecture
LLKV scans execute in two distinct phases to minimize data movement:
Phase 1: Row ID Collection - Evaluates filter predicates against stored data to produce a set of matching row IDs. This phase:
- Visits column chunks using the visitor pattern
- Applies predicates via metadata-based pruning (min/max values)
- Collects matching row IDs into a Treemap (bitmap) or Vec<RowId>
- Avoids loading actual column values during filtering
Phase 2: Data Gathering - Retrieves column values for the matching row IDs identified in Phase 1. This phase:
- Loads only chunks containing matched rows
- Assembles Arrow arrays via type-specific gathering functions
- Streams results in fixed-size batches (default 4096 rows)
- Supports two execution paths: fast streaming or full materialization
This separation ensures predicates are evaluated without loading unnecessary column data, significantly reducing I/O and memory when filters are selective.
Sources: llkv-scan/src/execute.rs llkv-table/src/table.rs:479-488 llkv-scan/src/row_stream.rs
Fast Path Optimizations
The scan executor detects specific patterns that enable zero-copy streaming optimizations:
Fast Path Criteria
graph TD
ScanRequest["Scan Request"]
CheckCols{"Single\nProjection?"}
CheckFilter{"Unbounded\nFilter?"}
CheckNulls{"include_nulls\n= false?"}
CheckOrder{"No ORDER BY?"}
CheckRowFilter{"No row_id_filter?"}
FastPath["✓ Fast Path\nDirect ScanBuilder streaming"]
SlowPath["✗ Materialization Path\nMultiGatherContext gathering"]
ScanRequest --> CheckCols
CheckCols -->|Yes| CheckFilter
CheckCols -->|No| SlowPath
CheckFilter -->|Yes| CheckNulls
CheckFilter -->|No| SlowPath
CheckNulls -->|Yes| CheckOrder
CheckNulls -->|No| SlowPath
CheckOrder -->|Yes| CheckRowFilter
CheckOrder -->|No| SlowPath
CheckRowFilter -->|Yes| FastPath
CheckRowFilter -->|No| SlowPath
The fast path activates when all conditions are met:
| Condition | Why Required |
|---|---|
| Single projection | Multi-column requires coordination across field plans |
| Unbounded filter | Complex filters require row-by-row evaluation |
| include_nulls = false | Null handling requires inspection of each row |
| No ORDER BY | Sorting requires full materialization |
| No row_id_filter | Custom filters require additional filtering logic |
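Expressed as a predicate, the gate looks roughly like the sketch below; ScanRequest and its fields are illustrative stand-ins rather than the executor's actual types:

```rust
/// Illustrative fast-path gate mirroring the table above; the real check
/// lives in llkv-scan and uses different types.
struct ScanRequest {
    projection_count: usize,
    filter_is_unbounded: bool,
    include_nulls: bool,
    has_order_by: bool,
    has_row_id_filter: bool,
}

fn can_use_fast_path(req: &ScanRequest) -> bool {
    req.projection_count == 1      // single projection
        && req.filter_is_unbounded // no bounded/complex predicate
        && !req.include_nulls      // no per-row null inspection
        && !req.has_order_by       // no sort, so no full materialization
        && !req.has_row_id_filter  // no extra row-ID predicate
}
```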
Performance Characteristics
When the fast path activates, scans execute directly via ScanBuilder against the ColumnStore, achieving throughputs exceeding 100M rows/second for simple numeric columns. The materialization path adds 2-5x overhead due to:
- Context preparation and cache management
- Chunk coordinate computation across multiple fields
- Arrow array assembly and concatenation
Test examples demonstrate the performance difference:
- Single-column unbounded scan: ~1-2ms for 1M rows
- Multi-column scan: ~5-10ms for 1M rows
- Complex filters: ~10-20ms for 1M rows
Sources: llkv-scan/src/execute.rs llkv-table/examples/test_streaming.rs:83-110 llkv-table/examples/performance_benchmark.rs:126-151
Row ID Collection Infrastructure
RowIdScanCollector
The RowIdScanCollector accumulates matching row IDs into a Treemap bitmap during Phase 1 filtering.
It implements multiple visitor traits to handle different scan modes:
- PrimitiveVisitor - Ignores value chunks (row IDs not yet available)
- PrimitiveSortedVisitor - Ignores sorted runs
- PrimitiveWithRowIdsVisitor - Collects row IDs from chunks with row ID arrays
- PrimitiveSortedWithRowIdsVisitor - Collects row IDs from sorted runs
The collector extracts row IDs from UInt64Array instances passed alongside value arrays, adding them to the bitmap for efficient deduplication and range queries.
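A minimal sketch of that accumulation step, assuming the roaring crate's RoaringTreemap as the Treemap type and omitting the visitor-trait plumbing:

```rust
use arrow::array::UInt64Array;
use roaring::RoaringTreemap;

/// Simplified stand-in for RowIdScanCollector: accumulate the row IDs that
/// accompany each visited chunk into a 64-bit roaring bitmap.
struct RowIdCollector {
    row_ids: RoaringTreemap,
}

impl RowIdCollector {
    fn new() -> Self {
        Self { row_ids: RoaringTreemap::new() }
    }

    /// Called once per chunk or sorted run that carries a row-ID array.
    fn collect_chunk(&mut self, rids: &UInt64Array) {
        for &rid in rids.values().iter() {
            self.row_ids.insert(rid); // the bitmap deduplicates automatically
        }
    }
}
```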
Sources: llkv-table/src/table.rs:805-858
RowIdChunkEmitter
For streaming scenarios, RowIdChunkEmitter emits row IDs in fixed-size chunks to a callback rather than accumulating them.
This approach:
- Minimizes memory usage by processing row IDs in batches
- Enables early termination on errors
- Supports reverse iteration for descending scans
- Optimizes for cases where row IDs arrive in contiguous chunks
The emitter implements the same visitor traits as the collector but invokes the callback when the buffer reaches chunk_size, then clears the buffer for the next batch.
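A simplified sketch of that behavior (names and the error type are illustrative, not the RowIdChunkEmitter API):

```rust
/// Simplified stand-in for RowIdChunkEmitter: buffer row IDs and hand them to
/// the callback in fixed-size chunks instead of accumulating them all.
struct ChunkEmitter<F>
where
    F: FnMut(&[u64]) -> Result<(), String>,
{
    buf: Vec<u64>,
    chunk_size: usize,
    on_chunk: F,
}

impl<F> ChunkEmitter<F>
where
    F: FnMut(&[u64]) -> Result<(), String>,
{
    fn push(&mut self, rid: u64) -> Result<(), String> {
        self.buf.push(rid);
        if self.buf.len() >= self.chunk_size {
            (self.on_chunk)(&self.buf)?; // error allows early termination
            self.buf.clear();            // reuse the buffer for the next batch
        }
        Ok(())
    }

    /// Flush any remaining row IDs at the end of the scan.
    fn finish(&mut self) -> Result<(), String> {
        if !self.buf.is_empty() {
            (self.on_chunk)(&self.buf)?;
            self.buf.clear();
        }
        Ok(())
    }
}
```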
Sources: llkv-table/src/table.rs:860-1017
Visitor Pattern Integration
Both collectors integrate with the column-map scan infrastructure via visitor traits. During a scan:
- ScanBuilder iterates over column chunks
- For each chunk, it detects whether row IDs are available
- It invokes the appropriate visitor method based on chunk characteristics:
  - _chunk_with_rids for unsorted chunks with row IDs
  - _run_with_rids for sorted runs with row IDs
- The collector/emitter extracts row IDs and accumulates/emits them
This pattern decouples row ID collection from the storage format, allowing specialized handling for sorted vs. unsorted data.
Sources: llkv-column-map/src/scan/mod.rs llkv-table/src/table.rs:832-858
classDiagram
class MultiGatherContext {
    +field_infos: FieldInfos
    +plans: FieldPlans
    +chunk_cache: FxHashMap~PhysicalKey, ArrayRef~
    +row_index: FxHashMap~u64, usize~
    +row_scratch: Vec~Option~
    +builders: Vec~ColumnOutputBuilder~
    +epoch: u64
    +new() MultiGatherContext
    +update_field_infos_and_plans()
    +matches_field_ids() bool
    +schema_for_nullability() Schema
    +chunk_span_for_row() Option
}
class FieldPlan {
    +dtype: DataType
    +value_metas: Vec~ChunkMetadata~
    +row_metas: Vec~ChunkMetadata~
    +candidate_indices: Vec~usize~
    +avg_value_bytes_per_row: f64
}
class ColumnOutputBuilder {
    <<enum>>
    Utf8
    Binary
    Boolean
    Decimal128
    Primitive
    Passthrough
}
class GatherContextPool {
    +inner: Mutex~FxHashMap~
    +max_per_key: usize
    +acquire() GatherContextGuard
}
MultiGatherContext --> FieldPlan: contains
MultiGatherContext --> ColumnOutputBuilder: contains
GatherContextPool --> MultiGatherContext: pools
Data Gathering Infrastructure
MultiGatherContext
The MultiGatherContext coordinates multi-column data gathering during Phase 2:
Context Fields
| Field | Purpose |
|---|---|
| field_infos | Maps LogicalFieldId to DataType for each projected column |
| plans | Contains FieldPlan with chunk metadata for each column |
| chunk_cache | Caches decoded Arrow arrays to avoid redundant deserialization |
| row_index | Maps row IDs to output indices during gathering |
| row_scratch | Scratch buffer mapping output positions to chunk coordinates |
| builders | Type-specific Arrow array builders for each output column |
| epoch | Tracks storage version for cache invalidation |
Gathering Flow
- Context preparation: Load column descriptors and chunk metadata for all fields
- Candidate selection: Identify chunks containing requested row IDs based on min/max values
- Chunk loading: Batch-fetch identified chunks from storage
- Row mapping: Build index from row ID to output position
- Value extraction: For each output position, locate value in cached chunk and append to builder
- Array finalization: Convert builders to Arrow arrays and assemble a RecordBatch
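The candidate-selection step amounts to a range-overlap test against per-chunk row-ID metadata. A sketch under that assumption, where ChunkMeta is a hypothetical stand-in for the real chunk metadata:

```rust
/// Hypothetical per-chunk metadata; the real ChunkMetadata in llkv-column-map
/// carries more fields than the row-ID bounds shown here.
struct ChunkMeta {
    min_row_id: u64,
    max_row_id: u64,
}

/// Keep only chunks whose row-ID range overlaps the (sorted) requested IDs.
fn candidate_chunks(metas: &[ChunkMeta], sorted_row_ids: &[u64]) -> Vec<usize> {
    let (Some(&first), Some(&last)) = (sorted_row_ids.first(), sorted_row_ids.last()) else {
        return Vec::new(); // no requested rows, no candidates
    };
    metas
        .iter()
        .enumerate()
        .filter(|(_, m)| m.max_row_id >= first && m.min_row_id <= last)
        .map(|(i, _)| i)
        .collect()
}
```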
Sources: llkv-column-map/src/store/projection.rs:460-649
GatherContextPool
The GatherContextPool reuses MultiGatherContext instances across scans to amortize setup costs:
Pooling Strategy
- Contexts are keyed by the list of projected LogicalFieldId values
- Up to max_per_key contexts are retained per projection set (default: 4)
- Contexts are reset (chunk cache cleared) when returned to the pool
- Epoch tracking invalidates cached chunk arrays when storage changes
When a scan requests a context:
- Pool checks for matching contexts by field ID list
- If found, pops a context from the pool
- If not found or pool empty, creates a new context
- The context is wrapped in GatherContextGuard for automatic return to the pool
This pattern is critical for high-frequency scans where context setup (descriptor loading, metadata parsing) dominates execution time. Pooling reduces per-scan overhead from ~10ms to <1ms for repeated projections.
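A minimal sketch of this strategy (simplified stand-in types; the real pool also tracks an epoch and hands contexts back through a GatherContextGuard that returns them on drop):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical stand-ins: a logical field ID and a gather context that can
// be reset (chunk cache cleared) between scans.
type LogicalFieldId = u64;

#[derive(Default)]
struct GatherContext { /* chunk cache, field plans, builders, ... */ }

impl GatherContext {
    fn reset(&mut self) { /* clear the chunk cache, keep allocations */ }
}

/// Sketch of the pooling strategy: contexts are keyed by the projected field
/// IDs and at most `max_per_key` are retained per key.
struct ContextPool {
    inner: Mutex<HashMap<Vec<LogicalFieldId>, Vec<GatherContext>>>,
    max_per_key: usize,
}

impl ContextPool {
    fn acquire(&self, fields: &[LogicalFieldId]) -> GatherContext {
        let mut map = self.inner.lock().unwrap();
        map.get_mut(fields)
            .and_then(|v| v.pop()) // reuse a prepared context if one is pooled
            .unwrap_or_default()   // otherwise build a fresh one
    }

    fn release(&self, fields: Vec<LogicalFieldId>, mut ctx: GatherContext) {
        ctx.reset(); // drop cached chunks before pooling
        let mut map = self.inner.lock().unwrap();
        let slot = map.entry(fields).or_default();
        if slot.len() < self.max_per_key {
            slot.push(ctx); // retain up to max_per_key contexts per key
        }
    }
}
```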
Sources: llkv-column-map/src/store/projection.rs:651-720
Type-Specific Gathering Functions
LLKV provides specialized gathering functions for each Arrow type to optimize performance:
| Function | Arrow Type | Optimization |
|---|---|---|
| gather_rows_from_chunks_string | GenericStringArray<O> | Direct slice reuse for contiguous sorted rows |
| gather_rows_from_chunks_binary | GenericBinaryArray<O> | Same as string with binary validation |
| gather_rows_from_chunks_bool | BooleanArray | Bitmap-based gathering |
| gather_rows_from_chunks_decimal128 | Decimal128Array | Precision-preserving assembly |
| gather_rows_from_chunks_struct | StructArray | Recursive gathering per child field |
| gather_rows_from_chunks | PrimitiveArray<T> | Fast path for single-chunk contiguous access |
Each function implements a common pattern:
Fast Path Detection
All gathering functions check for a fast path where:
- Only one chunk contains the requested row IDs
- Row IDs are sorted ascending
- Row IDs form a contiguous range in the chunk
When detected, the function returns a slice of the cached chunk array via Arrow’s zero-copy slice() method, avoiding builder allocation and value copying entirely.
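A sketch of that check, assuming the chunk's starting row ID is known from its metadata (the per-type gathering functions in llkv-column-map/src/gather.rs implement the real versions):

```rust
use arrow::array::{Array, ArrayRef};

/// Sketch of the zero-copy fast path: if the row IDs form one ascending,
/// contiguous run inside a single cached chunk, return an Arrow slice of that
/// chunk instead of copying values into a builder.
/// `chunk_first_row_id` is assumed to be the row ID of the chunk's first row.
fn try_contiguous_slice(
    chunk: &ArrayRef,
    chunk_first_row_id: u64,
    row_ids: &[u64],
) -> Option<ArrayRef> {
    let (&first, &last) = (row_ids.first()?, row_ids.last()?);
    // Ascending and contiguous: each ID is exactly one more than the previous.
    if !row_ids.windows(2).all(|w| w[1] == w[0] + 1) {
        return None;
    }
    let offset = first.checked_sub(chunk_first_row_id)? as usize;
    let len = (last - first + 1) as usize;
    if offset + len > chunk.len() {
        return None; // the run does not fall entirely inside this chunk
    }
    Some(chunk.slice(offset, len)) // Arrow zero-copy slice, no value copying
}
```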
Sources: llkv-column-map/src/gather.rs:283-403 llkv-column-map/src/gather.rs:405-526 llkv-column-map/src/gather.rs:529-638
flowchart LR
RowIds["Row ID Source\nBitmap or Vec"]
Chunk1["Batch 1\n0..4096"]
Chunk2["Batch 2\n4096..8192"]
ChunkN["Batch N\n(N-1)*4096..M"]
Gather1["gather_rows()"]
Gather2["gather_rows()"]
GatherN["gather_rows()"]
Batch1["RecordBatch 1"]
Batch2["RecordBatch 2"]
BatchN["RecordBatch N"]
Callback["on_batch\ncallback"]
RowIds -->|split into chunks| Chunk1
RowIds --> Chunk2
RowIds --> ChunkN
Chunk1 --> Gather1
Chunk2 --> Gather2
ChunkN --> GatherN
Gather1 --> Batch1
Gather2 --> Batch2
GatherN --> BatchN
Batch1 --> Callback
Batch2 --> Callback
BatchN --> Callback
Streaming Strategies
Batch-Based Streaming
LLKV streams scan results in fixed-size batches to balance memory usage and callback overhead:
Batch Size Configuration
The default batch size is 4096 rows, defined in llkv-table/src/constants.rs:
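(The streaming steps below refer to this constant as STREAM_BATCH_ROWS; the declaration here is a paraphrase rather than a copy of the source file.)

```rust
/// Number of rows per streamed RecordBatch, per the value stated on this page.
pub const STREAM_BATCH_ROWS: usize = 4096;
```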
This value balances:
- Memory footprint: Smaller batches reduce peak memory but increase callback overhead
- Allocation efficiency: Builders pre-allocate for batch size, minimizing reallocation
- Cache locality: Larger batches improve CPU cache utilization during gathering
Streaming Execution
The RowStreamBuilder manages batch-based streaming:
- Accept a RowIdSource (bitmap or vector)
- Split row IDs into windows of STREAM_BATCH_ROWS
- For each window:
  - Call gather_rows_with_reusable_context() with the window's row IDs
  - Invoke the on_batch callback with the resulting RecordBatch
  - Reuse the MultiGatherContext for the next window
This approach ensures constant memory usage regardless of result set size, as only one batch exists in memory at a time.
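A condensed sketch of that loop, with the gather closure standing in for gather_rows_with_reusable_context() (whose actual signature differs):

```rust
use arrow::record_batch::RecordBatch;

const STREAM_BATCH_ROWS: usize = 4096;

/// Illustrative streaming loop: gather and emit one window at a time so only
/// a single RecordBatch is alive at any moment.
fn stream_rows<G, F>(row_ids: &[u64], mut gather: G, mut on_batch: F) -> Result<(), String>
where
    G: FnMut(&[u64]) -> Result<RecordBatch, String>, // window -> RecordBatch
    F: FnMut(RecordBatch) -> Result<(), String>,     // user-provided callback
{
    for window in row_ids.chunks(STREAM_BATCH_ROWS) {
        let batch = gather(window)?; // reuses the pooled gather context
        on_batch(batch)?;            // batch is dropped before the next window
    }
    Ok(())
}
```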
Sources: llkv-table/src/constants.rs llkv-scan/src/row_stream.rs llkv-table/src/table.rs:588-645
Memory Efficiency
The streaming architecture minimizes memory allocations through several mechanisms:
Builder Reuse - Arrow array builders are pre-allocated to the batch size and reused across batches. Builders track their capacity and avoid reallocation when the next batch has the same or smaller size.
Context Pooling - The GatherContextPool retains prepared contexts, eliminating the need to reload descriptors and rebuild field plans for repeated projections.
Chunk Caching - Within a context, decoded Arrow arrays are cached by physical key. Chunks that span multiple batches are deserialized once and reused, reducing I/O and deserialization overhead.
Scratch Buffer Reuse - The row_scratch buffer that maps output indices to chunk coordinates is allocated once per context and cleared (not deallocated) between batches.
Visitor State Management - Row ID collectors and emitters maintain minimal state, typically just a bitmap or small buffer, avoiding large intermediate structures.
These optimizations enable LLKV to sustain multi-million row/second scan rates with sub-100MB memory footprints even for billion-row tables.
Sources: llkv-column-map/src/store/projection.rs:562-568 llkv-column-map/src/store/projection.rs:709-720
graph TB
subgraph "llkv-table layer"
ScanStream["Table::scan_stream()"]
end
subgraph "llkv-scan layer"
ExecuteScan["execute_scan()"]
CheckFast{"Can Use\nFast Path?"}
FastLogic["build_fast_path_stream()"]
SlowLogic["collect_row_ids_for_table()\nthen build_row_stream()"]
FastStream["Direct ScanBuilder\nstreaming"]
SlowStream["RowStreamBuilder\nwith MultiGatherContext"]
end
subgraph "llkv-column-map layer"
ScanBuilder["ScanBuilder::run()"]
ColumnStore["ColumnStore::gather_rows()"]
end
Callback["User-provided\non_batch callback"]
ScanStream --> ExecuteScan
ExecuteScan --> CheckFast
CheckFast -->|Yes| FastLogic
CheckFast -->|No| SlowLogic
FastLogic --> FastStream
SlowLogic --> SlowStream
FastStream --> ScanBuilder
SlowStream --> ColumnStore
ScanBuilder --> Callback
ColumnStore --> Callback
Integration with llkv-scan
The llkv-scan crate provides the execute_scan function that orchestrates the full scan execution: it checks fast-path eligibility, collects row IDs when the materialization path is required, and streams result batches to the caller's on_batch callback.
ScanStorage Trait
The ScanStorage trait abstracts over storage implementations, allowing execute_scan to work with both Table and direct ColumnStore access:
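The trait itself is not reproduced on this page. Purely as an illustration of the shape such an abstraction might take (the method names and signatures below are hypothetical, not the crate's API):

```rust
use arrow::record_batch::RecordBatch;

// Placeholder types so the sketch stands alone; both are hypothetical.
struct FilterExpr;
type FieldId = u64;

/// Hypothetical shape of a scan-storage abstraction. The actual ScanStorage
/// trait in llkv-scan/src/storage.rs defines its own (different) methods.
trait ScanStorageLike {
    type Error;

    /// Phase 1: evaluate the filter and return matching row IDs.
    fn collect_row_ids(&self, filter: &FilterExpr) -> Result<Vec<u64>, Self::Error>;

    /// Phase 2: gather the projected columns for one window of row IDs.
    fn gather(
        &self,
        row_ids: &[u64],
        projections: &[FieldId],
    ) -> Result<RecordBatch, Self::Error>;
}
```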
This abstraction enables unit testing of scan logic without requiring full table setup and allows specialized storage implementations to optimize row ID collection.
Sources: llkv-scan/src/execute.rs llkv-scan/src/storage.rs llkv-table/src/table.rs:479-488
Performance Monitoring
Benchmarking Scan Performance
The repository includes examples that measure scan performance across different scenarios:
| Example | Purpose | Key Measurement |
|---|---|---|
| test_streaming.rs | Verify streaming optimization activates | Throughput (rows/sec) |
| direct_comparison.rs | Compare ColumnStore vs Table layer overhead | Execution time ratio |
| performance_benchmark.rs | Profile different scan configurations | Latency distribution |
Representative Results (1M row table, Int64 column):
| Scenario | Throughput | Notes |
|---|---|---|
| Single column, unbounded | 100-200M rows/sec | Fast path active |
| Multi-column | 20-50M rows/sec | Materialization path |
| Bounded filter | 10-30M rows/sec | Predicate evaluation overhead |
| With nulls | 15-40M rows/sec | Null inspection required |
These benchmarks guide optimization efforts and validate that changes maintain performance characteristics.
Sources: llkv-table/examples/test_streaming.rs llkv-table/examples/direct_comparison.rs:276-399 llkv-table/examples/performance_benchmark.rs:82-211
Pager I/O Instrumentation
The InstrumentedPager wrapper tracks storage operations during scans:
Tracked Metrics
- physical_gets: Total number of key-value reads
- get_batches: Number of batch read operations
- physical_puts: Total number of key-value writes
- put_batches: Number of batch write operations
For a SELECT COUNT(*) on a 3-row table, typical I/O patterns show:
- ~36 physical gets for descriptor and chunk loads
- ~23 batch operations due to pipelined chunk fetching
This instrumentation identifies I/O hotspots and validates that scan optimizations reduce storage access.
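As a minimal sketch of the bookkeeping (counters only; the real InstrumentedPager wraps an inner pager and updates these on each delegated call):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Counter block matching the metrics listed above.
#[derive(Default)]
struct PagerCounters {
    physical_gets: AtomicU64, // individual key-value reads
    get_batches: AtomicU64,   // batched read calls
    physical_puts: AtomicU64, // individual key-value writes
    put_batches: AtomicU64,   // batched write calls
}

impl PagerCounters {
    /// Record one batched read covering `keys` physical gets.
    fn record_get_batch(&self, keys: usize) {
        self.get_batches.fetch_add(1, Ordering::Relaxed);
        self.physical_gets.fetch_add(keys as u64, Ordering::Relaxed);
    }

    /// Record one batched write covering `keys` physical puts.
    fn record_put_batch(&self, keys: usize) {
        self.put_batches.fetch_add(1, Ordering::Relaxed);
        self.physical_puts.fetch_add(keys as u64, Ordering::Relaxed);
    }
}
```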
Sources: llkv-sql/tests/pager_io_tests.rs:17-73 llkv-storage/src/pager/instrumented.rs