
This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

TablePlanner and TableExecutor

Purpose and Scope

This page documents the table-level query execution interface provided by the Table struct, along with its optimization strategies. The table layer acts as a bridge between high-level query plans (covered in Query Planning) and low-level columnar storage operations (covered in Column Storage and ColumnStore). It orchestrates scan execution, manages field ID namespacing, handles MVCC visibility, and selects execution strategies based on query characteristics.

For details about the lower-level scan execution mechanics and streaming strategies, see Scan Execution and Optimization. For filter expression evaluation, see Filter Evaluation.

Architecture Overview

The Table struct serves as the primary interface for executing queries against table data. Rather than implementing query execution logic directly, it orchestrates lower-level components and applies table-specific concerns like schema management, MVCC column injection, and field ID translation.

graph TB
    subgraph "Table Layer"
        API["Table API\nscan_stream, filter_row_ids"]
        SCHEMA["Schema Management\nField ID → LogicalFieldId"]
        MVCC["MVCC Integration\ncreated_by, deleted_by"]
        OPTIONS["Execution Options\nScanStreamOptions"]
    end

    subgraph "Scan Execution Layer"
        EXECUTE["execute_scan()\nllkv-scan"]
        STREAM["RowStreamBuilder\nBatch streaming"]
        FILTER["Filter Evaluation\nPredicate matching"]
    end

    subgraph "Storage Layer"
        STORE["ColumnStore\nPhysical storage"]
        GATHER["Column Gathering\nRecordBatch assembly"]
    end

    API --> SCHEMA
    API --> MVCC
    API --> OPTIONS

    SCHEMA --> EXECUTE
    MVCC --> EXECUTE
    OPTIONS --> EXECUTE

    EXECUTE --> STREAM
    STREAM --> FILTER
    FILTER --> GATHER
    GATHER --> STORE

    STORE -.results.-> GATHER
    GATHER -.batches.-> STREAM
    STREAM -.batches.-> API

Table Layer Responsibilities

Sources: llkv-table/src/table.rs:60-69 llkv-table/src/table.rs:447-488

Table Struct and Core Methods

The Table<P> struct wraps a ColumnStore with table-specific metadata and caching:

| Component | Type | Purpose |
| --- | --- | --- |
| store | Arc<ColumnStore<P>> | Physical columnar storage |
| table_id | TableId | Unique table identifier |
| mvcc_cache | RwLock<Option<MvccColumnCache>> | Cached MVCC column presence |

Primary execution methods:

  • scan_stream: Main scan interface accepting flexible projection types
  • scan_stream_with_exprs: Direct execution with resolved ScanProjection expressions
  • filter_row_ids: Collect row IDs matching a filter predicate
  • stream_columns: Stream raw column data without expression evaluation
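
As a reading aid, here is a condensed sketch of the struct and its main entry point. The field names and types come from the table above; the trait bounds and the exact scan_stream signature are simplified assumptions, not the definitions in llkv-table/src/table.rs.

```rust
use std::sync::{Arc, RwLock};

// Condensed sketch; component names and types follow the table above,
// while bounds and the scan_stream signature are assumptions.
pub struct Table<P> {
    store: Arc<ColumnStore<P>>,                  // physical columnar storage
    table_id: TableId,                           // unique table identifier
    mvcc_cache: RwLock<Option<MvccColumnCache>>, // cached MVCC column presence
}

impl<P> Table<P> {
    /// Main scan interface: accepts flexible projection types, resolves them
    /// to ScanProjection values, and delegates to scan_stream_with_exprs.
    pub fn scan_stream<F>(
        &self,
        projections: Vec<ScanProjection>,
        filter: &Expr,
        options: ScanStreamOptions,
        on_batch: F,
    ) -> Result<()>
    where
        F: FnMut(RecordBatch),
    {
        // ... resolve projections, then execute_scan() in llkv-scan ...
        unimplemented!()
    }
}
```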

Sources: llkv-table/src/table.rs:60-69 llkv-table/src/table.rs:447-462 llkv-table/src/table.rs:469-488 llkv-table/src/table.rs:490-496 llkv-table/src/table.rs:589-645

Scan Execution Flow

High-Level Execution Pipeline

Sources: llkv-table/src/table.rs:447-488 llkv-table/examples/performance_benchmark.rs:1-211

Scan Configuration: ScanStreamOptions

ScanStreamOptions controls execution behavior:

| Field | Type | Purpose |
| --- | --- | --- |
| include_nulls | bool | Include rows where all projected columns are null |
| order | Option<ScanOrderSpec> | Ordering specification (ASC/DESC) |
| row_id_filter | Option<Arc<dyn RowIdFilter>> | Pre-filtered row ID set (e.g., MVCC visibility) |
| include_row_ids | bool | Include row_id column in output |
| ranges | Option<Vec<(Bound<u64>, Bound<u64>)>> | Row ID range restrictions |
| driving_column | Option<LogicalFieldId> | Column to drive scan ordering |
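
A hedged construction example follows; the field names match the table above, but the exact field order and the availability of sensible defaults are assumptions.

```rust
// Sketch: options for a forward full scan with MVCC visibility filtering.
// `visibility` is a hypothetical Arc<dyn RowIdFilter> built elsewhere.
let options = ScanStreamOptions {
    include_nulls: false,            // drop rows where all projected columns are null
    order: None,                     // no ordering requirement
    row_id_filter: Some(visibility), // restrict to MVCC-visible rows
    include_row_ids: false,          // omit the row_id column from output
    ranges: None,                    // no row ID range restriction
    driving_column: None,            // let the executor choose the scan driver
};
```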

Sources: llkv-table/src/table.rs:43-46

Projection Types: ScanProjection

Projections specify what data to retrieve:

  • Column(ColumnProjectionInfo): Direct column access with optional alias
    • logical_field_id: Column to read
    • data_type: Expected Arrow data type
    • output_name: Column name in result
  • Expression(ProjectionEval): Computed expressions over columns
    • Arithmetic operations, functions, literals
    • Evaluated during result assembly

Conversion: Projection (from llkv-column-map) can be converted to ScanProjection via the Into trait.
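
A sketch of building a mixed projection list; only the Into conversion is documented above, so the constructor names used here (Projection::new, ScanProjection::expression) are illustrative.

```rust
// Sketch: one direct column projection plus one computed expression.
// Constructor names are illustrative; the Into conversion from
// Projection is the documented route.
let projections: Vec<ScanProjection> = vec![
    Projection::new(price_lfid).into(),              // direct column via Into
    ScanProjection::expression(total_expr, "total"), // evaluated during assembly
];
```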

Sources: llkv-table/src/table.rs:40-46

Optimization Strategies

Strategy Selection Logic

Sources: llkv-table/examples/performance_benchmark.rs:26-79 llkv-table/examples/test_streaming.rs:26-174

Fast Path: Direct Streaming

When the fast-path conditions are met, scan execution bypasses row ID collection and materializes chunks directly from ScanBuilder:

Eligibility criteria:

  1. Exactly one column projection
  2. Unbounded range filter (Bound::Unbounded on both ends)
  3. No null inclusion (include_nulls = false)
  4. No ordering requirements (order = None)
  5. No row ID filter (row_id_filter = None)

Performance characteristics:

  • 10-100x faster than standard path for large scans
  • Zero-copy chunk streaming when possible
  • Minimal memory overhead (streaming chunks, not full materialization)

Example usage:
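
Below is a minimal sketch of a fast-path-eligible scan. The unbounded-range predicate constructors are assumptions; the shape of the call follows the eligibility criteria above.

```rust
use std::ops::Bound;

// Sketch: a single-column scan meeting all five fast-path criteria.
// Expr/Filter/Operator construction is assumed, not copied from the crate.
let projection: ScanProjection = Projection::new(value_lfid).into(); // one column
let filter = Expr::Pred(Filter {
    field_id: value_lfid,
    op: Operator::Range {
        lower: Bound::Unbounded, // criterion 2: unbounded on both ends
        upper: Bound::Unbounded,
    },
});

let mut total_rows = 0usize;
table.scan_stream(
    vec![projection],
    &filter,
    ScanStreamOptions::default(), // no nulls, no ordering, no row ID filter
    |batch| total_rows += batch.num_rows(), // chunks stream directly from ScanBuilder
)?;
```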

Sources: llkv-table/examples/test_streaming.rs:65-110 llkv-table/examples/performance_benchmark.rs:145-151

Standard Path: Row ID Collection and Gathering

When the fast-path conditions are not met, execution falls back to a two-phase approach:

Phase 1: Row ID Collection

  • Evaluate filter expression against the table
  • Build a bitmap (Treemap) of matching row IDs
  • Filter by MVCC visibility if row_id_filter is provided
  • Apply range restrictions if ranges is specified

Phase 2: Column Gathering

  • Split row IDs into fixed-size windows
  • For each window:
    • Gather all projected columns
    • Evaluate computed expressions
    • Assemble into RecordBatch
    • Stream batch to callback

Batch size: Controlled by the STREAM_BATCH_ROWS constant (default varies by workload).
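
The two phases, compressed into a sketch; collect_matching_row_ids, retain_visible, and gather_window are hypothetical stand-ins for internal code paths.

```rust
// Phase 1: build a compressed bitmap of matching row IDs.
// Helper names below are hypothetical.
let mut row_ids: Treemap = collect_matching_row_ids(&table, &filter)?;
if let Some(visibility) = options.row_id_filter.as_deref() {
    retain_visible(&mut row_ids, visibility)?; // MVCC visibility (assumed API)
}

// Phase 2: gather projected columns window by window and stream batches.
let ids: Vec<u64> = row_ids.iter().collect();
for window in ids.chunks(STREAM_BATCH_ROWS) {
    let batch: RecordBatch = gather_window(&table, &projections, window)?; // gather + expressions
    on_batch(batch); // stream each assembled batch to the caller's callback
}
```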

Sources: llkv-table/src/table.rs:490-496 llkv-table/src/table.rs:589-645

Performance Comparison: Layer Overhead

ColumnStore Direct Access vs Table Layer

Measured overhead (1M row single-column scan):

  • Direct ColumnStore: ~5-10ms baseline
  • Table Layer (fast path): ~10-20ms (1.5-2x overhead)
  • Table Layer (standard path): ~50-100ms (5-10x overhead)

The fast path optimization significantly reduces the gap by bypassing row ID collection and expression evaluation when not needed.

Sources: llkv-table/examples/direct_comparison.rs:276-399

Row ID Filtering and Collection

RowIdScanCollector

Internal visitor for collecting row IDs that match filter predicates:

Implementation strategy:

  • Implements PrimitiveWithRowIdsVisitor and PrimitiveSortedWithRowIdsVisitor
  • Accumulates row IDs into a Treemap (compressed bitmap)
  • Ignores actual column values (focus on row IDs only)
  • Handles both chunked and sorted run formats

Key methods:

  • extend_from_array: Add row IDs from a chunk
  • extend_from_slice: Add a slice of row IDs
  • into_inner: Extract the final bitmap
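
A sketch of the collector's shape with the visitor trait plumbing elided; treating Treemap as croaring's 64-bit roaring bitmap is an assumption.

```rust
// Sketch: accumulate matching row IDs into a compressed bitmap.
// Visitor trait impls (PrimitiveWithRowIdsVisitor, ...) are elided.
struct RowIdScanCollector {
    row_ids: Treemap, // compressed bitmap of matching row IDs
}

impl RowIdScanCollector {
    // Add row IDs carried alongside a value chunk; the values themselves
    // are ignored, since only row ID membership matters here.
    fn extend_from_array(&mut self, row_ids: &UInt64Array) {
        for rid in row_ids.values() {
            self.row_ids.add(*rid);
        }
    }

    // Add a contiguous slice of row IDs, e.g. from a sorted run.
    fn extend_from_slice(&mut self, row_ids: &[u64]) {
        for rid in row_ids {
            self.row_ids.add(*rid);
        }
    }

    // Hand the finished bitmap back to the caller.
    fn into_inner(self) -> Treemap {
        self.row_ids
    }
}
```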

Sources: llkv-table/src/table.rs:805-858

RowIdChunkEmitter

Streaming alternative that emits row IDs in fixed-size chunks without materializing the full set:

Use case: Memory-efficient processing when the full row ID set is not needed.

Features:

  • Configurable chunk size
  • Optional reverse ordering for sorted runs
  • Error propagation via error field
  • Zero allocation when chunks align with storage chunks

Methods:

  • extend_from_array: Process a chunk of row IDs
  • extend_sorted_run: Handle sorted run with optional reversal
  • flush: Emit current buffer
  • finish: Final flush and error check
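
A sketch of the emitter's buffering and error-deferral behavior; the callback type and field layout are assumptions based on the feature and method lists above.

```rust
// Sketch: emit row IDs in fixed-size chunks, deferring errors to finish().
struct RowIdChunkEmitter<F: FnMut(&[u64]) -> Result<(), Error>> {
    buffer: Vec<u64>,     // pending row IDs, at most chunk_size long
    chunk_size: usize,    // configurable chunk size
    on_chunk: F,          // downstream consumer
    error: Option<Error>, // first error seen, surfaced by finish()
}

impl<F: FnMut(&[u64]) -> Result<(), Error>> RowIdChunkEmitter<F> {
    // Buffer IDs, emitting whenever a full chunk accumulates.
    fn extend_from_array(&mut self, row_ids: &[u64]) {
        for &rid in row_ids {
            self.buffer.push(rid);
            if self.buffer.len() == self.chunk_size {
                self.flush();
            }
        }
    }

    // Emit whatever is buffered, even a partial chunk.
    fn flush(&mut self) {
        if !self.buffer.is_empty() && self.error.is_none() {
            if let Err(e) = (self.on_chunk)(&self.buffer) {
                self.error = Some(e); // propagated later by finish()
            }
            self.buffer.clear();
        }
    }

    // Final flush, then report any deferred error.
    fn finish(mut self) -> Result<(), Error> {
        self.flush();
        self.error.map_or(Ok(()), Err)
    }
}
```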

Sources: llkv-table/src/table.rs:860-1017

Integration with MVCC and Transactions

The table layer handles MVCC visibility transparently:

MVCC Column Injection

When appending data, the Table::append method automatically injects MVCC columns if not present:

Columns added:

  • created_by (UInt64): Transaction ID that created the row (defaults to 1 for auto-commit)
  • deleted_by (UInt64): Transaction ID that deleted the row (defaults to 0 for not deleted)

Field ID assignment:

  • MVCC columns get reserved logical field IDs via LogicalFieldId::for_mvcc_created_by and LogicalFieldId::for_mvcc_deleted_by
  • Separate from user field IDs to avoid collisions

Caching: The MvccColumnCache stores whether MVCC columns exist to avoid repeated schema inspections.
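
A sketch of the injection step using arrow-rs directly; the column names and defaults are documented above, while the helper itself is illustrative rather than the actual Table::append code.

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Sketch: append created_by/deleted_by columns to a user batch that
// lacks them, following the defaults listed above.
fn inject_mvcc_columns(batch: &RecordBatch, txn_id: u64) -> Result<RecordBatch, ArrowError> {
    let n = batch.num_rows();
    let created_by: ArrayRef = Arc::new(UInt64Array::from(vec![txn_id; n])); // 1 = auto-commit
    let deleted_by: ArrayRef = Arc::new(UInt64Array::from(vec![0u64; n]));   // 0 = not deleted

    let mut fields: Vec<Field> = batch
        .schema()
        .fields()
        .iter()
        .map(|f| f.as_ref().clone())
        .collect();
    fields.push(Field::new("created_by", DataType::UInt64, false));
    fields.push(Field::new("deleted_by", DataType::UInt64, false));

    let mut columns = batch.columns().to_vec();
    columns.push(created_by);
    columns.push(deleted_by);
    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
}
```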

Sources: llkv-table/src/table.rs:50-56 llkv-table/src/table.rs:231-438

Transaction Visibility Filtering

Query execution can filter by transaction visibility using the row_id_filter option:

The filter is applied during the row ID collection phase, ensuring that only visible rows are included in results.
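
A sketch of what such a visibility check might look like; the RowIdFilter trait's actual methods are not shown on this page, so the shape below is an assumption built from the created_by/deleted_by semantics described earlier.

```rust
// Sketch: snapshot visibility for MVCC rows. Only the
// Option<Arc<dyn RowIdFilter>> wiring is documented above.
struct SnapshotFilter {
    snapshot_txn: u64, // the reading transaction's ID
}

impl SnapshotFilter {
    // A row is visible if it was created at or before the snapshot and is
    // either undeleted (deleted_by == 0) or deleted by a later transaction.
    fn is_visible(&self, created_by: u64, deleted_by: u64) -> bool {
        created_by <= self.snapshot_txn
            && (deleted_by == 0 || deleted_by > self.snapshot_txn)
    }
}
```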

Sources: llkv-table/src/table.rs:43-46

Field ID Translation and Namespacing

The table layer translates user-visible field IDs into logical field IDs that incorporate the table ID:

Translation Process

graph LR
    USER["User Field ID\n(FieldId: u32)"]
    META["Field Metadata\n'field_id' key"]
    TABLE["Table ID\n(TableId: u32)"]
    LOGICAL["LogicalFieldId\n(u64)"]
    COMPOSE["LogicalFieldId::for_user()"]

    USER --> COMPOSE
    TABLE --> COMPOSE
    COMPOSE --> LOGICAL
    META -.annotates.-> USER

Why namespacing?

  • Multiple tables can have the same user field IDs
  • ColumnStore operates on LogicalFieldId to avoid collisions
  • Table layer encapsulates this translation

Key functions:

  • LogicalFieldId::for_user(table_id, field_id): Create user data field
  • LogicalFieldId::for_mvcc_created_by(table_id): Create MVCC created_by field
  • LogicalFieldId::for_mvcc_deleted_by(table_id): Create MVCC deleted_by field
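
To make the namespacing concrete, here is one plausible packing scheme. The real LogicalFieldId also reserves bits for namespaces (user data vs. MVCC columns), so treat this as an illustration of the idea, not the actual bit layout.

```rust
// Illustration only: pack a u32 table ID and u32 field ID into a u64.
// The real encoding also carries a namespace tag for MVCC columns.
fn for_user(table_id: u32, field_id: u32) -> u64 {
    ((table_id as u64) << 32) | (field_id as u64)
}

fn main() {
    let lfid = for_user(7, 42);
    assert_eq!(lfid >> 32, 7);          // table ID recoverable
    assert_eq!(lfid & 0xFFFF_FFFF, 42); // field ID recoverable
    // The same field ID in a different table yields a different LogicalFieldId.
    assert_ne!(lfid, for_user(8, 42));
}
```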

Sources: llkv-table/src/table.rs:231-438 llkv-table/src/table.rs:589-645

Schema Management

Schema Construction

The Table::schema() method builds an Arrow schema from catalog metadata:

Process:

  1. Query ColumnStore for all logical fields belonging to this table
  2. Sort fields by field ID for consistent ordering
  3. Retrieve column names from SysCatalog
  4. Build Arrow Field with data type and metadata
  5. Add row_id field first, then user fields

Schema metadata: Each field includes field_id metadata for round-trip compatibility.
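
A sketch of the assembly using arrow-rs; the field_id metadata key and the row_id-first ordering are documented above, while the row_id data type and nullability choices here are assumptions.

```rust
use std::{collections::HashMap, sync::Arc};
use arrow::datatypes::{DataType, Field, Schema};

// Sketch: row_id first, then user fields (already sorted by field ID),
// each annotated with "field_id" metadata for round-tripping.
fn build_schema(user_fields: &[(u32, String, DataType)]) -> Arc<Schema> {
    let mut fields = vec![Field::new("row_id", DataType::UInt64, false)];
    for (field_id, name, data_type) in user_fields {
        let metadata = HashMap::from([("field_id".to_string(), field_id.to_string())]);
        fields.push(Field::new(name.as_str(), data_type.clone(), true).with_metadata(metadata));
    }
    Arc::new(Schema::new(fields))
}
```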

Sources: llkv-table/src/table.rs:519-549

Schema as RecordBatch

For display purposes, schema_recordbatch() formats the schema as a table:

| Column | Type | Contents |
| --- | --- | --- |
| name | Utf8 | Column name |
| field_id | UInt32 | User field ID |
| data_type | Utf8 | Arrow data type string |

Sources: llkv-table/src/table.rs:554-586

Example: CSV Export Integration

The CSV export writer demonstrates table-level query execution in practice:

Export Pipeline

Key steps:

  1. Convert CsvExportColumn to ScanProjection with field ID translation
  2. Resolve aliases from catalog or use defaults
  3. Create unbounded filter for full table scan
  4. Stream batches directly to Arrow CSV writer
  5. No intermediate materialization
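
A sketch of that loop; the scan_stream call shape follows the earlier sketches on this page, while the CSV writer calls follow arrow-rs.

```rust
use std::fs::File;
use arrow::csv::WriterBuilder;

// Sketch: stream each batch straight into the Arrow CSV writer.
// `projections` and `unbounded_filter` are assumed to be built as in
// the fast-path example above.
let file = File::create("export.csv")?;
let mut writer = WriterBuilder::new().with_header(true).build(file);

table.scan_stream(
    projections,
    &unbounded_filter,
    ScanStreamOptions::default(),
    |batch| {
        writer.write(&batch).expect("CSV write failed"); // no intermediate buffering
    },
)?;
```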

Sources: llkv-csv/src/writer.rs:139-268

Summary: Optimization Decision Matrix

| Query Characteristic | Fast Path | Standard Path |
| --- | --- | --- |
| Single column projection | ✓ | |
| Multiple columns | | ✓ |
| Unbounded filter | ✓ | |
| Bounded/complex filter | | ✓ |
| Include nulls | | ✓ |
| Exclude nulls | ✓ | |
| No ordering | ✓ | |
| Ordered results | | ✓ |
| No MVCC filter | ✓ | |
| MVCC filtering | | ✓ |
| No range restrictions | ✓ | |
| Range restrictions | | ✓ |

Performance impact:

  • Fast path: 1.5-2x slower than direct ColumnStore access
  • Standard path: 5-10x slower than direct ColumnStore access
  • Still significantly faster than alternatives due to columnar storage and zero-copy optimizations

Sources: llkv-table/examples/performance_benchmark.rs:1-211 llkv-table/examples/direct_comparison.rs:276-399
