Scan Execution and Optimization

Purpose and Scope

This document details the table scan execution flow in LLKV, covering how queries retrieve data from columnar storage through optimized fast paths and streaming strategies. The scan subsystem bridges query planning and physical storage, coordinating row ID collection, data gathering, and result materialization while minimizing memory allocations.

For query planning that produces scan operations, see TablePlanner and TableExecutor. For predicate evaluation during filtering, see Filter Evaluation. For the underlying column storage mechanisms, see Column Storage and ColumnStore.

Scan Entry Points

The Table struct provides two primary scan entry points that execute projections with filtering:

| Method | Signature | Purpose |
| --- | --- | --- |
| `scan_stream` | `&self, projections: I, filter_expr: &Expr, options: ScanStreamOptions, on_batch: F` | Accepts flexible projection inputs, converting them to `ScanProjection` |
| `scan_stream_with_exprs` | `&self, projections: &[ScanProjection], filter_expr: &Expr, options: ScanStreamOptions, on_batch: F` | Direct scan with pre-built `ScanProjection` values |

Both methods stream results as Arrow RecordBatches to a callback function, avoiding full materialization in memory. The ScanStreamOptions struct controls scan behavior such as null inclusion, ordering, and an optional row ID filter.

The on_batch callback receives each batch as it is produced, allowing streaming processing without memory accumulation.
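
A call then looks roughly like the hedged sketch below; the projection and filter construction and the `ScanStreamOptions` fields shown here are assumptions for illustration, not the verified API.

```rust
// Illustrative only: projection/filter construction and option fields are
// assumed for this sketch and may not match the actual LLKV API.
let mut total_rows = 0usize;
table.scan_stream(
    projections,                   // anything convertible into ScanProjection values
    &filter_expr,                  // predicate evaluated during Phase 1
    ScanStreamOptions::default(),  // null handling, ordering, optional row ID filter
    |batch| {
        // Each Arrow RecordBatch is handled here and then dropped; nothing
        // accumulates across batches.
        total_rows += batch.num_rows();
    },
)?;
```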

Sources: llkv-table/src/table.rs:447-488 llkv-scan/src/lib.rs

Two-Phase Scan Execution Model

Two-Phase Architecture

LLKV scans execute in two distinct phases to minimize data movement:

Phase 1: Row ID Collection - Evaluates filter predicates against stored data to produce a set of matching row IDs. This phase:

  • Visits column chunks using the visitor pattern
  • Applies predicates via metadata-based pruning (min/max values)
  • Collects matching row IDs into a Treemap (bitmap) or Vec<RowId>
  • Avoids loading actual column values during filtering

Phase 2: Data Gathering - Retrieves column values for the matching row IDs identified in Phase 1. This phase:

  • Loads only chunks containing matched rows
  • Assembles Arrow arrays via type-specific gathering functions
  • Streams results in fixed-size batches (default 4096 rows)
  • Supports two execution paths: fast streaming or full materialization

This separation ensures predicates are evaluated without loading unnecessary column data, significantly reducing I/O and memory when filters are selective.
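
The separation can be illustrated with a small, self-contained sketch over an in-memory column (not the LLKV API, which operates on chunked columnar storage):

```rust
// Toy illustration of the two-phase model over a plain in-memory column.
fn two_phase_scan(values: &[i64], mut on_batch: impl FnMut(&[i64])) {
    const BATCH: usize = 4096;

    // Phase 1: evaluate the predicate, keeping only matching row IDs.
    let row_ids: Vec<usize> = values
        .iter()
        .enumerate()
        .filter(|(_, v)| **v > 100) // stand-in predicate
        .map(|(i, _)| i)
        .collect();

    // Phase 2: gather values for those row IDs in fixed-size windows,
    // streaming each window to the callback.
    for window in row_ids.chunks(BATCH) {
        let batch: Vec<i64> = window.iter().map(|&i| values[i]).collect();
        on_batch(&batch);
    }
}
```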

Sources: llkv-scan/src/execute.rs llkv-table/src/table.rs:479-488 llkv-scan/src/row_stream.rs

Fast Path Optimizations

The scan executor detects specific patterns that enable zero-copy streaming optimizations:

Fast Path Criteria

```mermaid
graph TD
    ScanRequest["Scan Request"]
    CheckCols{"Single\nProjection?"}
    CheckFilter{"Unbounded\nFilter?"}
    CheckNulls{"include_nulls\n= false?"}
    CheckOrder{"No ORDER BY?"}
    CheckRowFilter{"No row_id_filter?"}
    FastPath["✓ Fast Path\nDirect ScanBuilder streaming"]
    SlowPath["✗ Materialization Path\nMultiGatherContext gathering"]

    ScanRequest --> CheckCols
    CheckCols -->|Yes| CheckFilter
    CheckCols -->|No| SlowPath
    CheckFilter -->|Yes| CheckNulls
    CheckFilter -->|No| SlowPath
    CheckNulls -->|Yes| CheckOrder
    CheckNulls -->|No| SlowPath
    CheckOrder -->|Yes| CheckRowFilter
    CheckOrder -->|No| SlowPath
    CheckRowFilter -->|Yes| FastPath
    CheckRowFilter -->|No| SlowPath
```

The fast path activates when all conditions are met:

| Condition | Why Required |
| --- | --- |
| Single projection | Multi-column output requires coordination across field plans |
| Unbounded filter | Complex filters require row-by-row evaluation |
| `include_nulls = false` | Null handling requires inspection of each row |
| No ORDER BY | Sorting requires full materialization |
| No `row_id_filter` | Custom filters require additional filtering logic |
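
A minimal sketch of this gate, assuming a simplified summary of the plan (the struct and field names here are illustrative, not the actual llkv-scan types):

```rust
// Hypothetical summary of the properties the scan executor inspects.
struct ScanPlanSummary {
    projection_count: usize,
    filter_is_unbounded: bool,
    include_nulls: bool,
    has_order_by: bool,
    has_row_id_filter: bool,
}

// Returns true when the zero-copy streaming fast path may be used.
fn can_use_fast_path(plan: &ScanPlanSummary) -> bool {
    plan.projection_count == 1
        && plan.filter_is_unbounded
        && !plan.include_nulls
        && !plan.has_order_by
        && !plan.has_row_id_filter
}
```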

Performance Characteristics

When the fast path activates, scans execute directly via ScanBuilder against the ColumnStore, achieving throughputs exceeding 100M rows/second for simple numeric columns. The materialization path adds 2-5x overhead due to:

  • Context preparation and cache management
  • Chunk coordinate computation across multiple fields
  • Arrow array assembly and concatenation

Test examples demonstrate the performance difference:

  • Single-column unbounded scan: ~1-2ms for 1M rows
  • Multi-column scan: ~5-10ms for 1M rows
  • Complex filters: ~10-20ms for 1M rows

Sources: llkv-scan/src/execute.rs llkv-table/examples/test_streaming.rs:83-110 llkv-table/examples/performance_benchmark.rs:126-151

Row ID Collection Infrastructure

RowIdScanCollector

The RowIdScanCollector accumulates matching row IDs into a Treemap bitmap during Phase 1 filtering.

It implements multiple visitor traits to handle different scan modes:

  • PrimitiveVisitor - Ignores value chunks (row IDs not yet available)
  • PrimitiveSortedVisitor - Ignores sorted runs
  • PrimitiveWithRowIdsVisitor - Collects row IDs from chunks with row ID arrays
  • PrimitiveSortedWithRowIdsVisitor - Collects row IDs from sorted runs

The collector extracts row IDs from UInt64Array instances passed alongside value arrays, adding them to the bitmap for efficient deduplication and range queries.
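
A simplified sketch of the accumulation step, using the `roaring` crate's `RoaringTreemap` as a stand-in for the bitmap type and a flattened row ID slice in place of the `UInt64Array`:

```rust
use roaring::RoaringTreemap;

// Hypothetical, simplified collector; the real RowIdScanCollector implements
// the column-map visitor traits and receives Arrow UInt64Array row IDs.
struct RowIdCollector {
    row_ids: RoaringTreemap,
}

impl RowIdCollector {
    // Invoked for each chunk that carries a parallel row ID array.
    fn on_chunk_with_row_ids(&mut self, rids: &[u64]) {
        for &rid in rids {
            // The bitmap deduplicates and keeps IDs ordered for later range queries.
            self.row_ids.insert(rid);
        }
    }
}
```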

Sources: llkv-table/src/table.rs:805-858

RowIdChunkEmitter

For streaming scenarios, RowIdChunkEmitter emits row IDs in fixed-size chunks to a callback rather than accumulating them.

This approach:

  • Minimizes memory usage by processing row IDs in batches
  • Enables early termination on errors
  • Supports reverse iteration for descending scans
  • Optimizes for cases where row IDs arrive in contiguous chunks

The emitter implements the same visitor traits as the collector but invokes the callback when the buffer reaches chunk_size, then clears the buffer for the next batch.
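
A hedged sketch of the buffer-and-flush behavior (a hypothetical type, not the actual RowIdChunkEmitter definition):

```rust
// Buffers row IDs and flushes them to the callback whenever the buffer
// reaches `chunk_size`; errors from the callback terminate the scan early.
struct ChunkEmitter<F: FnMut(&[u64]) -> Result<(), String>> {
    buffer: Vec<u64>,
    chunk_size: usize,
    on_chunk: F,
}

impl<F: FnMut(&[u64]) -> Result<(), String>> ChunkEmitter<F> {
    fn push(&mut self, rid: u64) -> Result<(), String> {
        self.buffer.push(rid);
        if self.buffer.len() >= self.chunk_size {
            (self.on_chunk)(&self.buffer)?; // propagate errors for early termination
            self.buffer.clear();            // keep the allocation for the next chunk
        }
        Ok(())
    }

    fn finish(&mut self) -> Result<(), String> {
        if !self.buffer.is_empty() {
            (self.on_chunk)(&self.buffer)?;
            self.buffer.clear();
        }
        Ok(())
    }
}
```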

Sources: llkv-table/src/table.rs:860-1017

Visitor Pattern Integration

Both collectors integrate with the column-map scan infrastructure via visitor traits. During a scan:

  1. ScanBuilder iterates over column chunks
  2. For each chunk, it detects whether row IDs are available
  3. It invokes the appropriate visitor method based on chunk characteristics:
    • _chunk_with_rids for unsorted chunks with row IDs
    • _run_with_rids for sorted runs with row IDs
  4. The collector/emitter extracts row IDs and accumulates/emits them

This pattern decouples row ID collection from the storage format, allowing specialized handling for sorted vs. unsorted data.

Sources: llkv-column-map/src/scan/mod.rs llkv-table/src/table.rs:832-858

```mermaid
classDiagram
    class MultiGatherContext {
        +field_infos: FieldInfos
        +plans: FieldPlans
        +chunk_cache: FxHashMap~PhysicalKey, ArrayRef~
        +row_index: FxHashMap~u64, usize~
        +row_scratch: Vec~Option~
        +builders: Vec~ColumnOutputBuilder~
        +epoch: u64
        +new() MultiGatherContext
        +update_field_infos_and_plans()
        +matches_field_ids() bool
        +schema_for_nullability() Schema
        +chunk_span_for_row() Option
    }

    class FieldPlan {
        +dtype: DataType
        +value_metas: Vec~ChunkMetadata~
        +row_metas: Vec~ChunkMetadata~
        +candidate_indices: Vec~usize~
        +avg_value_bytes_per_row: f64
    }

    class ColumnOutputBuilder {
        <<enum>>
        Utf8
        Binary
        Boolean
        Decimal128
        Primitive
        Passthrough
    }

    class GatherContextPool {
        +inner: Mutex~FxHashMap~
        +max_per_key: usize
        +acquire() GatherContextGuard
    }

    MultiGatherContext --> FieldPlan : contains
    MultiGatherContext --> ColumnOutputBuilder : contains
    GatherContextPool --> MultiGatherContext : pools
```

Data Gathering Infrastructure

MultiGatherContext

The MultiGatherContext coordinates multi-column data gathering during Phase 2:

Context Fields

| Field | Purpose |
| --- | --- |
| `field_infos` | Maps `LogicalFieldId` to `DataType` for each projected column |
| `plans` | Contains a `FieldPlan` with chunk metadata for each column |
| `chunk_cache` | Caches decoded Arrow arrays to avoid redundant deserialization |
| `row_index` | Maps row IDs to output indices during gathering |
| `row_scratch` | Scratch buffer mapping output positions to chunk coordinates |
| `builders` | Type-specific Arrow array builders for each output column |
| `epoch` | Tracks the storage version for cache invalidation |

Gathering Flow

  1. Context preparation: Load column descriptors and chunk metadata for all fields
  2. Candidate selection: Identify chunks containing requested row IDs based on min/max values
  3. Chunk loading: Batch-fetch identified chunks from storage
  4. Row mapping: Build index from row ID to output position
  5. Value extraction: For each output position, locate value in cached chunk and append to builder
  6. Array finalization: Convert builders to Arrow arrays and assemble RecordBatch
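
Step 2 (candidate selection) can be sketched as a min/max pruning pass over chunk metadata; the struct and field names here are assumptions, not the actual `ChunkMetadata` layout:

```rust
// Hypothetical, simplified metadata: the minimum and maximum row ID in a chunk.
struct ChunkMeta {
    min_row_id: u64,
    max_row_id: u64,
}

// Returns indices of chunks that can contain at least one requested row ID.
// `sorted_row_ids` must be sorted ascending.
fn candidate_chunks(metas: &[ChunkMeta], sorted_row_ids: &[u64]) -> Vec<usize> {
    metas
        .iter()
        .enumerate()
        .filter(|(_, m)| {
            // Smallest requested row ID that is >= the chunk's minimum...
            let idx = sorted_row_ids.partition_point(|&rid| rid < m.min_row_id);
            // ...must also be <= the chunk's maximum for the chunk to qualify.
            idx < sorted_row_ids.len() && sorted_row_ids[idx] <= m.max_row_id
        })
        .map(|(i, _)| i)
        .collect()
}
```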

Sources: llkv-column-map/src/store/projection.rs:460-649

GatherContextPool

The GatherContextPool reuses MultiGatherContext instances across scans to amortize setup costs:

Pooling Strategy

  • Contexts are keyed by the list of projected LogicalFieldId values
  • Up to max_per_key contexts are retained per projection set (default: 4)
  • Contexts are reset (chunk cache cleared) when returned to the pool
  • Epoch tracking invalidates cached chunk arrays when storage changes

When a scan requests a context:

  1. Pool checks for matching contexts by field ID list
  2. If found, pops a context from the pool
  3. If not found or pool empty, creates a new context
  4. Context is wrapped in GatherContextGuard for automatic return to pool

This pattern is critical for high-frequency scans where context setup (descriptor loading, metadata parsing) dominates execution time. Pooling reduces per-scan overhead from ~10ms to <1ms for repeated projections.
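
A minimal sketch of the keyed pooling pattern, with a simplified context type (the real implementation also tracks a storage epoch and hands contexts back via a GatherContextGuard on drop):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

#[derive(Default)]
struct Context; // stand-in for MultiGatherContext (chunk cache, plans, builders)

struct ContextPool {
    inner: Mutex<HashMap<Vec<u64>, Vec<Context>>>, // keyed by projected field IDs
    max_per_key: usize,
}

impl ContextPool {
    fn acquire(&self, field_ids: &[u64]) -> Context {
        let mut pools = self.inner.lock().unwrap();
        pools
            .get_mut(field_ids)
            .and_then(|bucket| bucket.pop())
            .unwrap_or_default() // nothing pooled: build a fresh context
    }

    fn release(&self, field_ids: Vec<u64>, ctx: Context) {
        let mut pools = self.inner.lock().unwrap();
        let bucket = pools.entry(field_ids).or_default();
        if bucket.len() < self.max_per_key {
            bucket.push(ctx); // retain up to max_per_key contexts per projection set
        }
        // otherwise the context is simply dropped
    }
}
```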

Sources: llkv-column-map/src/store/projection.rs:651-720

Type-Specific Gathering Functions

LLKV provides specialized gathering functions for each Arrow type to optimize performance:

| Function | Arrow Type | Optimization |
| --- | --- | --- |
| `gather_rows_from_chunks_string` | `GenericStringArray<O>` | Direct slice reuse for contiguous sorted rows |
| `gather_rows_from_chunks_binary` | `GenericBinaryArray<O>` | Same as string, with binary validation |
| `gather_rows_from_chunks_bool` | `BooleanArray` | Bitmap-based gathering |
| `gather_rows_from_chunks_decimal128` | `Decimal128Array` | Precision-preserving assembly |
| `gather_rows_from_chunks_struct` | `StructArray` | Recursive gathering per child field |
| `gather_rows_from_chunks` | `PrimitiveArray<T>` | Fast path for single-chunk contiguous access |

Each function implements a common pattern:

Fast Path Detection

All gathering functions check for a fast path where:

  • Only one chunk contains the requested row IDs
  • Row IDs are sorted ascending
  • Row IDs form a contiguous range in the chunk

When detected, the function returns a slice of the cached chunk array via Arrow’s zero-copy slice() method, avoiding builder allocation and value copying entirely.
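
A hedged sketch of that check, assuming the chunk's row IDs are dense and start at a known `chunk_first_row_id` (the helper's shape is illustrative, not the actual gathering code):

```rust
use arrow::array::{Array, ArrayRef};

// Returns a zero-copy slice of the cached chunk when the requested row IDs
// (sorted ascending, deduplicated) form one contiguous range inside a single chunk.
fn try_zero_copy_gather(
    chunk: &ArrayRef,
    chunk_first_row_id: u64,
    requested: &[u64],
) -> Option<ArrayRef> {
    let first = *requested.first()?;
    let last = *requested.last()?;
    // Contiguous: the IDs cover the dense range [first, last] with no gaps.
    let contiguous = (last - first) as usize + 1 == requested.len();
    let inside_chunk = first >= chunk_first_row_id
        && ((last - chunk_first_row_id) as usize) < chunk.len();
    if contiguous && inside_chunk {
        let offset = (first - chunk_first_row_id) as usize;
        // Arrow slices share the underlying buffers, so no values are copied.
        Some(chunk.slice(offset, requested.len()))
    } else {
        None
    }
}
```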

Sources: llkv-column-map/src/gather.rs:283-403 llkv-column-map/src/gather.rs:405-526 llkv-column-map/src/gather.rs:529-638

```mermaid
flowchart LR
    RowIds["Row ID Source\nBitmap or Vec"]
    Chunk1["Batch 1\n0..4096"]
    Chunk2["Batch 2\n4096..8192"]
    ChunkN["Batch N\n(N-1)*4096..M"]
    Gather1["gather_rows()"]
    Gather2["gather_rows()"]
    GatherN["gather_rows()"]
    Batch1["RecordBatch 1"]
    Batch2["RecordBatch 2"]
    BatchN["RecordBatch N"]
    Callback["on_batch\ncallback"]

    RowIds -->|split into chunks| Chunk1
    RowIds --> Chunk2
    RowIds --> ChunkN
    Chunk1 --> Gather1
    Chunk2 --> Gather2
    ChunkN --> GatherN
    Gather1 --> Batch1
    Gather2 --> Batch2
    GatherN --> BatchN
    Batch1 --> Callback
    Batch2 --> Callback
    BatchN --> Callback
```

Streaming Strategies

Batch-Based Streaming

LLKV streams scan results in fixed-size batches to balance memory usage and callback overhead:

Batch Size Configuration

The default batch size is 4096 rows, defined as STREAM_BATCH_ROWS in llkv-table/src/constants.rs.
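
The declaration presumably looks like the following sketch; the exact form in the repository may differ.

```rust
// Sketch of the constant; verify against llkv-table/src/constants.rs.
pub const STREAM_BATCH_ROWS: usize = 4096;
```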

This value balances:

  • Memory footprint: Smaller batches reduce peak memory but increase callback overhead
  • Allocation efficiency: Builders pre-allocate for the batch size, minimizing reallocation
  • Cache locality: Larger batches improve CPU cache utilization during gathering

Streaming Execution

The RowStreamBuilder manages batch-based streaming:

  1. Accept a RowIdSource (bitmap or vector)
  2. Split row IDs into windows of STREAM_BATCH_ROWS
  3. For each window:
    • Call gather_rows_with_reusable_context() with the window’s row IDs
    • Invoke the on_batch callback with the resulting RecordBatch
    • Reuse the MultiGatherContext for the next window

This approach ensures constant memory usage regardless of result set size, as only one batch exists in memory at a time.

Sources: llkv-table/src/constants.rs llkv-scan/src/row_stream.rs llkv-table/src/table.rs:588-645

Memory Efficiency

The streaming architecture minimizes memory allocations through several mechanisms:

Builder Reuse - Arrow array builders are pre-allocated to the batch size and reused across batches. Builders track their capacity and avoid reallocation when the next batch has the same or smaller size.

Context Pooling - The GatherContextPool retains prepared contexts, eliminating the need to reload descriptors and rebuild field plans for repeated projections.

Chunk Caching - Within a context, decoded Arrow arrays are cached by physical key. Chunks that span multiple batches are deserialized once and reused, reducing I/O and deserialization overhead.

Scratch Buffer Reuse - The row_scratch buffer that maps output indices to chunk coordinates is allocated once per context and cleared (not deallocated) between batches.

Visitor State Management - Row ID collectors and emitters maintain minimal state, typically just a bitmap or small buffer, avoiding large intermediate structures.

These optimizations enable LLKV to sustain multi-million row/second scan rates with sub-100MB memory footprints even for billion-row tables.

Sources: llkv-column-map/src/store/projection.rs:562-568 llkv-column-map/src/store/projection.rs:709-720

```mermaid
graph TB
    subgraph "llkv-table layer"
        ScanStream["Table::scan_stream()"]
    end

    subgraph "llkv-scan layer"
        ExecuteScan["execute_scan()"]
        CheckFast{"Can Use\nFast Path?"}
        FastLogic["build_fast_path_stream()"]
        SlowLogic["collect_row_ids_for_table()\nthen build_row_stream()"]
        FastStream["Direct ScanBuilder\nstreaming"]
        SlowStream["RowStreamBuilder\nwith MultiGatherContext"]
    end

    subgraph "llkv-column-map layer"
        ScanBuilder["ScanBuilder::run()"]
        ColumnStore["ColumnStore::gather_rows()"]
    end

    Callback["User-provided\non_batch callback"]

    ScanStream --> ExecuteScan
    ExecuteScan --> CheckFast
    CheckFast -->|Yes| FastLogic
    CheckFast -->|No| SlowLogic
    FastLogic --> FastStream
    SlowLogic --> SlowStream
    FastStream --> ScanBuilder
    SlowStream --> ColumnStore
    ScanBuilder --> Callback
    ColumnStore --> Callback
```

Integration with llkv-scan

The llkv-scan crate provides the execute_scan function that orchestrates the full scan execution:

execute_scan Function Signature
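
The exact signature is not reproduced here; a plausible shape, inferred from the parameters described elsewhere in this page and hedged as an assumption, is sketched below.

```rust
// Hypothetical signature sketch only: names, generics, and the error type are
// assumptions inferred from the surrounding documentation, not the real code.
pub fn execute_scan<S, F>(
    storage: &S,
    projections: &[ScanProjection],
    filter_expr: &Expr,
    options: ScanStreamOptions,
    on_batch: F,
) -> Result<(), Error>
where
    S: ScanStorage,
    F: FnMut(RecordBatch),
{
    // 1. Decide between the fast streaming path and the materialization path.
    // 2. Phase 1: collect matching row IDs via the storage abstraction.
    // 3. Phase 2: gather projected columns and stream RecordBatches to on_batch.
    unimplemented!()
}
```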

ScanStorage Trait

The ScanStorage trait abstracts over storage implementations, allowing execute_scan to work with both Table and direct ColumnStore access.

This abstraction enables unit testing of scan logic without requiring full table setup and allows specialized storage implementations to optimize row ID collection.
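
A hedged sketch of what such an abstraction can look like (the names, associated types, and method shapes are assumptions, not the actual trait in llkv-scan/src/storage.rs):

```rust
use arrow::record_batch::RecordBatch;

// Hypothetical shape of the storage abstraction consumed by execute_scan.
trait ScanStorage {
    type Filter;
    type Projection;
    type Error;

    /// Phase 1: produce the row IDs matching `filter` without loading values.
    fn collect_row_ids(&self, filter: &Self::Filter) -> Result<Vec<u64>, Self::Error>;

    /// Phase 2: materialize the projected columns for a window of row IDs.
    fn gather_rows(
        &self,
        projections: &[Self::Projection],
        row_ids: &[u64],
    ) -> Result<RecordBatch, Self::Error>;
}
```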

Sources: llkv-scan/src/execute.rs llkv-scan/src/storage.rs llkv-table/src/table.rs:479-488

Performance Monitoring

Benchmarking Scan Performance

The repository includes examples that measure scan performance across different scenarios:

| Example | Purpose | Key Measurement |
| --- | --- | --- |
| `test_streaming.rs` | Verify that the streaming optimization activates | Throughput (rows/sec) |
| `direct_comparison.rs` | Compare ColumnStore vs. Table layer overhead | Execution time ratio |
| `performance_benchmark.rs` | Profile different scan configurations | Latency distribution |

Representative Results (1M row table, Int64 column):

| Scenario | Throughput | Notes |
| --- | --- | --- |
| Single column, unbounded | 100-200M rows/sec | Fast path active |
| Multi-column | 20-50M rows/sec | Materialization path |
| Bounded filter | 10-30M rows/sec | Predicate evaluation overhead |
| With nulls | 15-40M rows/sec | Null inspection required |

These benchmarks guide optimization efforts and validate that changes maintain performance characteristics.

Sources: llkv-table/examples/test_streaming.rs llkv-table/examples/direct_comparison.rs:276-399 llkv-table/examples/performance_benchmark.rs:82-211

Pager I/O Instrumentation

The InstrumentedPager wrapper tracks storage operations during scans:

Tracked Metrics

  • physical_gets: Total number of key-value reads
  • get_batches: Number of batch read operations
  • physical_puts: Total number of key-value writes
  • put_batches: Number of batch write operations
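
A simplified sketch of how such counters can be maintained (the actual InstrumentedPager wraps the pager abstraction from llkv-storage and may differ in structure):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Counters incremented by an instrumented pager wrapper around batched reads/writes.
#[derive(Default)]
struct PagerStats {
    physical_gets: AtomicU64,
    get_batches: AtomicU64,
    physical_puts: AtomicU64,
    put_batches: AtomicU64,
}

impl PagerStats {
    fn record_get_batch(&self, keys_read: usize) {
        self.get_batches.fetch_add(1, Ordering::Relaxed);
        self.physical_gets.fetch_add(keys_read as u64, Ordering::Relaxed);
    }

    fn record_put_batch(&self, keys_written: usize) {
        self.put_batches.fetch_add(1, Ordering::Relaxed);
        self.physical_puts.fetch_add(keys_written as u64, Ordering::Relaxed);
    }
}
```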

For a SELECT COUNT(*) on a 3-row table, typical I/O patterns show:

  • ~36 physical gets for descriptor and chunk loads
  • ~23 batch operations due to pipelined chunk fetching

This instrumentation identifies I/O hotspots and validates that scan optimizations reduce storage access.

Sources: llkv-sql/tests/pager_io_tests.rs:17-73 llkv-storage/src/pager/instrumented.rs
