
This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Query Execution


Purpose and Scope

This document describes the query execution layer that transforms query plans into result data. The executor sits between the query planner (Query Planning) and the storage layer (Storage Layer), dispatching work to specialized components based on plan characteristics. This page provides a high-level overview of execution architecture and strategy selection. For detailed information about specific execution modes, see TablePlanner and TableExecutor, Scan Execution and Optimization, and Filter Evaluation.

Architecture Overview

The query execution layer is implemented primarily in the llkv-executor crate, with the QueryExecutor struct serving as the main orchestrator. The executor receives SelectPlan structures from the planner and produces SelectExecution results containing Arrow RecordBatch data.

Execution Strategy Dispatch Flow

graph TB
    subgraph "Planning Layer"
        PLAN["SelectPlan\n(from llkv-plan)"]
    end

    subgraph "Execution Layer - llkv-executor"
        EXECUTOR["QueryExecutor&lt;P&gt;"]
        DISPATCH{Execution\nStrategy\nDispatch}
        COMPOUND["Compound SELECT\nUNION/EXCEPT/INTERSECT"]
        NOTABLE["No-Table Execution\nSELECT constant"]
        GROUPBY["Group By Execution\nAggregation + Grouping"]
        CROSS["Cross Product\nMultiple Tables"]
        AGGREGATE["Aggregate Execution\nSUM/AVG/COUNT"]
        PROJECTION["Projection Execution\nColumn Selection"]
    end

    subgraph "Storage Layer"
        PROVIDER["ExecutorTableProvider&lt;P&gt;"]
        TABLE["ExecutorTable&lt;P&gt;"]
        SCAN["Scan Operations\n(llkv-scan)"]
    end

    subgraph "Result"
        EXECUTION["SelectExecution&lt;P&gt;"]
        BATCH["RecordBatch[]\nArrow Data"]
    end

    PLAN --> EXECUTOR
    EXECUTOR --> DISPATCH

    DISPATCH -->|compound query| COMPOUND
    DISPATCH -->|no FROM clause| NOTABLE
    DISPATCH -->|GROUP BY present| GROUPBY
    DISPATCH -->|multiple tables| CROSS
    DISPATCH -->|aggregates only| AGGREGATE
    DISPATCH -->|default path| PROJECTION

    EXECUTOR --> PROVIDER
    PROVIDER --> TABLE
    TABLE --> SCAN

    COMPOUND --> EXECUTION
    NOTABLE --> EXECUTION
    GROUPBY --> EXECUTION
    CROSS --> EXECUTION
    AGGREGATE --> EXECUTION
    PROJECTION --> EXECUTION

    EXECUTION --> BATCH

The executor examines plan characteristics to select an appropriate execution strategy. Each strategy is optimized for specific query patterns.

Sources: llkv-executor/src/lib.rs:504-563 llkv-executor/src/lib.rs:584-695

QueryExecutor Structure

The QueryExecutor<P> struct is the primary entry point for executing SELECT queries. It is generic over a Pager type P to support different storage backends.

| Component | Type | Purpose |
|---|---|---|
| provider | Arc&lt;dyn ExecutorTableProvider&lt;P&gt;&gt; | Provides access to tables and their metadata |

The provider abstraction allows the executor to remain decoupled from specific table implementations, enabling testing with mock providers and supporting different storage configurations.

QueryExecutor and Provider Relationship

graph LR
    subgraph "Executor Core"
        QE["QueryExecutor&lt;P&gt;"]
    end

    subgraph "Provider Interface"
        PROVIDER["ExecutorTableProvider&lt;P&gt;"]
        GET_TABLE["get_table(name)\n→ ExecutorTable&lt;P&gt;"]
    end

    subgraph "Table Interface"
        ETABLE["ExecutorTable&lt;P&gt;"]
        SCHEMA["schema: ExecutorSchema"]
        STORAGE["storage: TableStorageAdapter&lt;P&gt;"]
    end

    QE --> PROVIDER
    PROVIDER --> GET_TABLE
    GET_TABLE --> ETABLE
    ETABLE --> SCHEMA
    ETABLE --> STORAGE

The provider pattern enables dependency injection, allowing the executor to work with different table sources without tight coupling to storage implementations.
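As a rough sketch of this injection pattern, the executor can be written against a provider trait and handed a mock implementation in tests. The trait, struct, and field names below are illustrative stand-ins, not the crate's actual definitions:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Minimal stand-in for the executor-side table metadata.
struct ExecutorTable {
    columns: Vec<String>,
}

// The provider trait the executor depends on instead of a concrete catalog.
trait ExecutorTableProvider {
    fn get_table(&self, name: &str) -> Option<Arc<ExecutorTable>>;
}

// A mock provider, useful for exercising the executor in isolation.
struct MockProvider {
    tables: HashMap<String, Arc<ExecutorTable>>,
}

impl ExecutorTableProvider for MockProvider {
    fn get_table(&self, name: &str) -> Option<Arc<ExecutorTable>> {
        self.tables.get(name).cloned()
    }
}

// The executor holds only the trait object, so any provider can be injected.
struct QueryExecutor {
    provider: Arc<dyn ExecutorTableProvider>,
}

impl QueryExecutor {
    fn resolve(&self, table: &str) -> Option<Arc<ExecutorTable>> {
        self.provider.get_table(table)
    }
}
```

Swapping `MockProvider` for a catalog-backed provider requires no change to the executor itself.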

Sources: llkv-executor/src/lib.rs:504-517 llkv-executor/src/types.rs:1-100

Execution Entry Points

The executor provides two primary entry points for executing SELECT plans:

execute_select

Executes a SELECT plan without additional filtering constraints. This is the standard path used when no external row filtering is required.

execute_select_with_filter

Executes a SELECT plan with an optional row ID filter. The RowIdFilter trait allows callers to specify a predicate that determines which row IDs should be considered during execution. This is used for implementing transaction isolation (MVCC filtering) and other row-level visibility constraints.

Both methods extract limit and offset from the plan and apply them to the final SelectExecution result, ensuring consistent pagination behavior across all execution strategies.
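The shape of the filtered entry point can be sketched as follows; `RowIdFilter`'s real signature may differ, and here it is modeled as a simple row-visibility predicate with a hypothetical MVCC-style implementation:

```rust
// A caller-supplied predicate over row IDs (illustrative signature).
trait RowIdFilter {
    fn is_visible(&self, row_id: u64) -> bool;
}

// A snapshot-style filter that hides rows beyond a visibility horizon,
// roughly what MVCC filtering needs.
struct SnapshotFilter {
    max_visible_row: u64,
}

impl RowIdFilter for SnapshotFilter {
    fn is_visible(&self, row_id: u64) -> bool {
        row_id <= self.max_visible_row
    }
}

// execute_select is then just the filtered path with no filter supplied.
fn filter_rows(rows: &[u64], filter: Option<&dyn RowIdFilter>) -> Vec<u64> {
    rows.iter()
        .copied()
        .filter(|&r| filter.map_or(true, |f| f.is_visible(r)))
        .collect()
}
```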

Sources: llkv-executor/src/lib.rs:519-563

Execution Strategy Dispatch

The executor examines the characteristics of a SelectPlan to determine the most efficient execution strategy. The dispatch logic follows a priority hierarchy:

Execution Strategy Decision Tree

graph TD
    START["execute_select_with_filter(plan)"]
    CHECK_COMPOUND{plan.compound\nis Some?}
    CHECK_TABLES{plan.tables\nis empty?}
    CHECK_GROUPBY{plan.group_by\nnot empty?}
    CHECK_MULTI{plan.tables.len()\n&gt; 1?}
    CHECK_AGG{plan.aggregates\nnot empty?}
    CHECK_COMPUTED{has computed\naggregates?}
    COMPOUND["execute_compound_select\nUNION/EXCEPT/INTERSECT"]
    NOTABLE["execute_select_without_table\nConstant evaluation"]
    GROUPBY_SINGLE["execute_group_by_single_table\nGROUP BY aggregation"]
    GROUPBY_CROSS["execute_cross_product\nMulti-table GROUP BY"]
    CROSS["execute_cross_product\nCartesian product"]
    AGGREGATE["execute_aggregates\nSingle-table aggregates"]
    COMPUTED_AGG["execute_computed_aggregates\nAggregates in expressions"]
    PROJECTION["execute_projection\nSimple column selection"]
    RESULT["SelectExecution&lt;P&gt;\nwith limit/offset"]

    START --> CHECK_COMPOUND
    CHECK_COMPOUND -->|Yes| COMPOUND
    CHECK_COMPOUND -->|No| CHECK_TABLES

    CHECK_TABLES -->|Yes| NOTABLE
    CHECK_TABLES -->|No| CHECK_GROUPBY

    CHECK_GROUPBY -->|Yes| CHECK_MULTI
    CHECK_MULTI -->|Yes| GROUPBY_CROSS
    CHECK_MULTI -->|No| GROUPBY_SINGLE

    CHECK_GROUPBY -->|No| CHECK_MULTI
    CHECK_MULTI -->|Yes| CROSS
    CHECK_MULTI -->|No| CHECK_AGG

    CHECK_AGG -->|Yes| AGGREGATE
    CHECK_AGG -->|No| CHECK_COMPUTED

    CHECK_COMPUTED -->|Yes| COMPUTED_AGG
    CHECK_COMPUTED -->|No| PROJECTION

    COMPOUND --> RESULT
    NOTABLE --> RESULT
    GROUPBY_SINGLE --> RESULT
    GROUPBY_CROSS --> RESULT
    CROSS --> RESULT
    AGGREGATE --> RESULT
    COMPUTED_AGG --> RESULT
    PROJECTION --> RESULT

The executor prioritizes specialized execution paths over generic ones, enabling optimizations tailored to specific query patterns.
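The priority hierarchy above can be sketched as a single chain of checks where the first matching characteristic wins. The `SelectPlan` fields and strategy names here are illustrative simplifications of the real plan structure:

```rust
// Simplified stand-in for the planner's SelectPlan (field names assumed).
#[derive(Default)]
struct SelectPlan {
    compound: Option<()>, // UNION / EXCEPT / INTERSECT payload
    tables: Vec<String>,
    group_by: Vec<String>,
    aggregates: Vec<String>,
    has_computed_aggregates: bool,
}

#[derive(Debug, PartialEq)]
enum Strategy {
    Compound,
    NoTable,
    GroupBySingle,
    GroupByCross,
    CrossProduct,
    Aggregate,
    ComputedAggregate,
    Projection,
}

// Checks run in priority order: specialized paths before the generic default.
fn choose_strategy(plan: &SelectPlan) -> Strategy {
    if plan.compound.is_some() {
        Strategy::Compound
    } else if plan.tables.is_empty() {
        Strategy::NoTable
    } else if !plan.group_by.is_empty() {
        if plan.tables.len() > 1 {
            Strategy::GroupByCross
        } else {
            Strategy::GroupBySingle
        }
    } else if plan.tables.len() > 1 {
        Strategy::CrossProduct
    } else if !plan.aggregates.is_empty() {
        Strategy::Aggregate
    } else if plan.has_computed_aggregates {
        Strategy::ComputedAggregate
    } else {
        Strategy::Projection
    }
}
```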

Strategy Descriptions

| Strategy | Plan Characteristics | Implementation |
|---|---|---|
| Compound SELECT | plan.compound.is_some() | Executes UNION, EXCEPT, or INTERSECT operations by evaluating component queries and combining results with deduplication for DISTINCT quantifiers |
| No-Table Execution | plan.tables.is_empty() | Evaluates constant expressions like SELECT 1, 2, 3 without accessing storage |
| Group By (Single Table) | !plan.group_by.is_empty() && plan.tables.len() == 1 | Performs grouped aggregation on a single table with efficient column scanning |
| Group By (Cross Product) | !plan.group_by.is_empty() && plan.tables.len() > 1 | Computes Cartesian product before grouping |
| Cross Product | plan.tables.len() > 1 | Joins multiple tables using nested loop or hash join |
| Aggregate Execution | !plan.aggregates.is_empty() | Computes aggregates (COUNT, SUM, AVG, etc.) over a single table |
| Computed Aggregates | Aggregates within computed expressions | Evaluates expressions containing aggregate functions |
| Projection Execution | Default path | Performs column selection with optional filtering |

Sources: llkv-executor/src/lib.rs:523-563

Result Representation

The executor returns results as SelectExecution<P> instances, which encapsulate one or more Arrow RecordBatch objects along with metadata.

SelectExecution Result Types

graph TB
    subgraph "SelectExecution&lt;P&gt;"
        EXEC["SelectExecution"]
        SCHEMA["schema: Arc&lt;Schema&gt;"]
        DISPLAY["display_name: String"]
        MODE{Execution\nMode}
    end

    subgraph "Single Batch Mode"
        SINGLE["Single RecordBatch"]
        BATCH1["RecordBatch\nMaterialized data"]
    end

    subgraph "Multi Batch Mode"
        MULTI["Vec&lt;RecordBatch&gt;"]
        BATCH2["RecordBatch[]\nMultiple batches"]
    end

    subgraph "Streaming Mode"
        STREAM["Scan Stream"]
        LAZY["Lazy evaluation"]
        ITER["Iterator-based"]
    end

    subgraph "Post-Processing"
        LIMIT["limit: Option&lt;usize&gt;"]
        OFFSET["offset: Option&lt;usize&gt;"]
        APPLY["Applied during\nmaterialization"]
    end

    EXEC --> SCHEMA
    EXEC --> DISPLAY
    EXEC --> MODE

    MODE -->|Materialized| SINGLE
    MODE -->|Compound/Sorted| MULTI
    MODE -->|Large tables| STREAM

    SINGLE --> BATCH1
    MULTI --> BATCH2
    STREAM --> LAZY
    STREAM --> ITER

    EXEC --> LIMIT
    EXEC --> OFFSET
    LIMIT --> APPLY
    OFFSET --> APPLY
The SelectExecution type supports multiple result modes optimized for different query patterns. The with_limit and with_offset methods attach pagination parameters that are applied when materializing results.

Result Materialization

Callers can materialize results in several ways:

  • into_rows() : Converts all batches into a Vec<CanonicalRow> representation, applying limit and offset
  • stream(callback) : Invokes a callback for each batch, enabling memory-efficient processing of large result sets
  • into_record_batch() : Consolidates results into a single RecordBatch, useful for small result sets
  • into_batches() : Returns all batches as a vector

The streaming API is particularly important for large queries where materializing all results at once would exceed memory limits.
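The contrast between the two consumption styles can be sketched as follows. The method names mirror the API described above, but the bodies are illustrative, with batch contents simplified to plain integers in place of Arrow record batches:

```rust
// Simplified stand-in for SelectExecution: a set of batches plus pagination.
struct Execution {
    batches: Vec<Vec<i64>>, // stand-in for Vec<RecordBatch>
    offset: usize,
    limit: Option<usize>,
}

impl Execution {
    // Materialize everything, applying offset and limit once at the end.
    fn into_rows(self) -> Vec<i64> {
        let take = self.limit.unwrap_or(usize::MAX);
        self.batches
            .into_iter()
            .flatten()
            .skip(self.offset)
            .take(take)
            .collect()
    }

    // Push each batch through a callback without collecting them all,
    // keeping memory bounded for large result sets.
    fn stream(self, mut callback: impl FnMut(&[i64])) {
        for batch in &self.batches {
            callback(batch);
        }
    }
}
```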

Sources: llkv-executor/src/lib.rs:519-563 llkv-executor/src/scan.rs:1-100

Execution Phases

Most execution strategies follow a two-phase pattern optimized for columnar storage:

Phase 1: Row ID Collection

The executor first identifies which rows satisfy the query’s filter predicates without fetching the full column data. This phase produces a bitmap or set of row IDs that match the criteria.

Row ID Collection Phase

sequenceDiagram
    participant EX as QueryExecutor
    participant TBL as ExecutorTable
    participant SCAN as Scan Operations
    participant STORE as Column Store
    
    EX->>TBL: filter_row_ids(predicate)
    TBL->>SCAN: evaluate_filter
    SCAN->>STORE: Load chunk metadata
    
    Note over SCAN,STORE: Chunk pruning using\nmin/max values
    
    SCAN->>STORE: Load matching chunks
    SCAN->>SCAN: Vectorized predicate\nevaluation (SIMD)
    SCAN-->>TBL: Bitmap of matching row IDs
    TBL-->>EX: Row ID set

Predicate evaluation uses chunk metadata to skip irrelevant data (Scan Execution and Optimization) and vectorized kernels for efficient matching (Filter Evaluation).
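The chunk-pruning step in the diagram can be sketched in miniature. `ChunkMeta` is a stand-in for the per-chunk statistics the column store keeps; the predicate shape (`value > threshold`) is chosen just for illustration:

```rust
// Per-chunk statistics recorded at write time (illustrative).
struct ChunkMeta {
    min: i64,
    max: i64,
}

// For a predicate `value > threshold`, a chunk can be skipped entirely
// when its max value cannot exceed the threshold; only surviving chunks
// proceed to vectorized predicate evaluation.
fn chunks_to_scan(chunks: &[ChunkMeta], threshold: i64) -> Vec<usize> {
    chunks
        .iter()
        .enumerate()
        .filter(|(_, c)| c.max > threshold) // prune chunks entirely below the bound
        .map(|(i, _)| i)
        .collect()
}
```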

Phase 2: Data Gathering

Once the matching row IDs are known, the executor fetches only the required columns for those specific rows. This minimizes I/O by avoiding unnecessary column reads.

Data Gathering Phase

sequenceDiagram
    participant EX as QueryExecutor
    participant TBL as ExecutorTable
    participant STORE as Column Store
    participant PAGER as Storage Pager

    EX->>TBL: scan_stream(projections, row_ids)

    loop For each projection
        TBL->>STORE: gather_rows(field_id, row_ids)
        STORE->>STORE: Identify chunks containing\nrequested row IDs
        STORE->>PAGER: batch_get(chunk_keys)
        PAGER-->>STORE: Chunk data
        STORE->>STORE: Construct Arrow array
        STORE-->>TBL: ArrayRef
    end

    TBL->>TBL: Construct RecordBatch\nfrom arrays
    TBL-->>EX: RecordBatch

The gather operation reconstructs Arrow arrays from chunked columnar storage, fetching only the columns referenced in the query's projections.
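A toy version of the gather step looks like this; the fixed chunk size and helper name are assumptions for illustration, not the llkv-column-map layout:

```rust
// Rows are stored in fixed-size chunks of a single column (assumed layout).
const CHUNK_SIZE: u64 = 4;

// Fetch values for the requested row IDs, locating each row's chunk and
// offset so only chunks that actually contain a requested row are touched.
fn gather(chunks: &[Vec<i64>], row_ids: &[u64]) -> Vec<i64> {
    row_ids
        .iter()
        .map(|&rid| {
            let chunk = (rid / CHUNK_SIZE) as usize;
            let offset = (rid % CHUNK_SIZE) as usize;
            chunks[chunk][offset]
        })
        .collect()
}
```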

Phase 3: Post-Processing

After data gathering, the executor applies sorting, aggregation, or other transformations as required by the plan:

| Operation | When Applied | Implementation |
|---|---|---|
| Sorting | ORDER BY clause present | Uses Arrow's lexsort_to_indices with custom NULLS FIRST/LAST handling |
| Limiting | LIMIT clause present | Truncates result set to specified row count |
| Offset | OFFSET clause present | Skips specified number of rows before returning results |
| Aggregation | GROUP BY or aggregate functions | Materializes groups and computes aggregate values |
| Distinct | SELECT DISTINCT | Hash-based deduplication using row encoding |
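The NULLS FIRST ordering used by the sort step can be illustrated with a scalar equivalent over an `Option` column (Arrow's lexsort_to_indices performs the same comparison vectorized over whole arrays):

```rust
use std::cmp::Ordering;

// Sort an Option-valued column with NULLs before all non-NULL values,
// mirroring the NULLS FIRST post-processing rule.
fn sort_nulls_first(mut values: Vec<Option<i64>>) -> Vec<Option<i64>> {
    values.sort_by(|a, b| match (a, b) {
        (None, None) => Ordering::Equal,
        (None, Some(_)) => Ordering::Less, // NULL sorts first
        (Some(_), None) => Ordering::Greater,
        (Some(x), Some(y)) => x.cmp(y),
    });
    values
}
```

A NULLS LAST variant simply flips the two mixed arms of the match.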

Sources: llkv-executor/src/lib.rs:584-1000 llkv-executor/src/scan.rs:1-500

Subquery Execution

The executor handles subqueries through recursive evaluation, supporting both scalar subqueries and EXISTS predicates.

Scalar Subquery Evaluation

Scalar subqueries are evaluated lazily during expression computation. The executor maintains a cache (scalar_subquery_cache) to avoid re-executing identical subqueries with the same correlated bindings:

Scalar Subquery Evaluation with Caching

graph TD
    EXPR["Evaluate Expression\ncontaining subquery"]
    COLLECT["Collect correlated\ncolumn bindings"]
    ENCODE["Encode bindings\nas cache key"]
    CHECK{Cache\nhit?}
    CACHED["Return cached\nLiteral"]
    EXECUTE["Execute subquery\nwith bindings"]
    VALIDATE["Validate result:\n1 column, ≤1 row"]
    STORE["Store in cache"]
    RETURN["Return Literal"]

    EXPR --> COLLECT
    COLLECT --> ENCODE
    ENCODE --> CHECK
    CHECK -->|Yes| CACHED
    CHECK -->|No| EXECUTE
    EXECUTE --> VALIDATE
    VALIDATE --> STORE
    STORE --> RETURN
    CACHED --> RETURN

The caching mechanism is essential for performance when a subquery is evaluated multiple times in a cross product or aggregate context.
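A minimal sketch of a binding-keyed cache follows; the key encoding and cache shape are assumptions for illustration (the real scalar_subquery_cache may differ), with an integer standing in for the scalar Literal:

```rust
use std::collections::HashMap;

// Encode the correlated bindings as a stable cache key.
fn cache_key(bindings: &[(String, i64)]) -> String {
    bindings
        .iter()
        .map(|(col, val)| format!("{col}={val}"))
        .collect::<Vec<_>>()
        .join(";")
}

struct SubqueryCache {
    results: HashMap<String, i64>, // key -> cached scalar result
    executions: usize,             // how many times the subquery actually ran
}

impl SubqueryCache {
    // Return the cached result for these bindings, running the subquery
    // (represented by `run`) only on a cache miss.
    fn evaluate(&mut self, bindings: &[(String, i64)], run: impl Fn() -> i64) -> i64 {
        let key = cache_key(bindings);
        if let Some(&v) = self.results.get(&key) {
            return v; // cache hit: skip re-execution
        }
        self.executions += 1;
        let v = run();
        self.results.insert(key, v);
        v
    }
}
```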

Parallel Subquery Execution

For queries that require evaluating the same correlated subquery across many rows, the executor batches the work and executes it in parallel using Rayon:

// Fan the pending bindings out across the shared Rayon thread pool,
// evaluating the subquery once per distinct binding set.
let job_results: Vec<ExecutorResult<Literal>> = with_thread_pool(|| {
    pending_bindings
        .par_iter()
        .map(|bindings| self.evaluate_scalar_subquery_with_bindings(subquery, bindings))
        .collect()
});

This parallelization significantly reduces execution time for subquery-heavy queries.

Sources: llkv-executor/src/lib.rs:787-961

Integration with Runtime

The SqlEngine in llkv-sql orchestrates the entire execution pipeline, bridging SQL text to query results:

SqlEngine and Executor Integration

graph LR
    subgraph "SqlEngine"
        PARSE["Parse SQL\n(sqlparser)"]
        PLAN["Build Plan\n(llkv-plan)"]
        EXEC["Execute Plan"]
    end

    subgraph "RuntimeEngine"
        CONTEXT["RuntimeContext"]
        SESSION["RuntimeSession"]
        CATALOG["CatalogManager"]
    end

    subgraph "Executor Layer"
        QEXEC["QueryExecutor"]
        PROVIDER["TableProvider"]
    end

    PARSE --> PLAN
    PLAN --> EXEC
    EXEC --> CONTEXT
    CONTEXT --> SESSION
    CONTEXT --> CATALOG

    EXEC --> QEXEC
    QEXEC --> PROVIDER
    PROVIDER --> CATALOG

The RuntimeEngine provides the execution context, including transaction state, catalog access, and session configuration, while the QueryExecutor focuses solely on transforming plans into results.

Sources: llkv-sql/src/sql_engine.rs:572-745 llkv-executor/src/lib.rs:504-563
