This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Query Execution
Relevant source files
- llkv-executor/Cargo.toml
- llkv-executor/src/lib.rs
- llkv-join/Cargo.toml
- llkv-sql/src/sql_engine.rs
- llkv-table/Cargo.toml
Purpose and Scope
This document describes the query execution layer that transforms query plans into result data. The executor sits between the query planner (Query Planning) and the storage layer (Storage Layer), dispatching work to specialized components based on plan characteristics. This page provides a high-level overview of execution architecture and strategy selection. For detailed information about specific execution modes, see TablePlanner and TableExecutor, Scan Execution and Optimization, and Filter Evaluation.
Architecture Overview
The query execution layer is implemented primarily in the llkv-executor crate, with the QueryExecutor struct serving as the main orchestrator. The executor receives SelectPlan structures from the planner and produces SelectExecution results containing Arrow RecordBatch data.
Execution Strategy Dispatch Flow
graph TB
subgraph "Planning Layer"
PLAN["SelectPlan\n(from llkv-plan)"]
end
subgraph "Execution Layer - llkv-executor"
EXECUTOR["QueryExecutor<P>"]
DISPATCH{Execution\nStrategy\nDispatch}
COMPOUND["Compound SELECT\nUNION/EXCEPT/INTERSECT"]
NOTABLE["No-Table Execution\nSELECT constant"]
GROUPBY["Group By Execution\nAggregation + Grouping"]
CROSS["Cross Product\nMultiple Tables"]
AGGREGATE["Aggregate Execution\nSUM/AVG/COUNT"]
PROJECTION["Projection Execution\nColumn Selection"]
end
subgraph "Storage Layer"
PROVIDER["ExecutorTableProvider<P>"]
TABLE["ExecutorTable<P>"]
SCAN["Scan Operations\n(llkv-scan)"]
end
subgraph "Result"
EXECUTION["SelectExecution<P>"]
BATCH["RecordBatch[]\nArrow Data"]
end
PLAN --> EXECUTOR
EXECUTOR --> DISPATCH
DISPATCH -->|compound query| COMPOUND
DISPATCH -->|no FROM clause| NOTABLE
DISPATCH -->|GROUP BY present| GROUPBY
DISPATCH -->|multiple tables| CROSS
DISPATCH -->|aggregates only| AGGREGATE
DISPATCH -->|default path| PROJECTION
EXECUTOR --> PROVIDER
PROVIDER --> TABLE
TABLE --> SCAN
COMPOUND --> EXECUTION
NOTABLE --> EXECUTION
GROUPBY --> EXECUTION
CROSS --> EXECUTION
AGGREGATE --> EXECUTION
PROJECTION --> EXECUTION
EXECUTION --> BATCH
The executor examines plan characteristics to select an appropriate execution strategy. Each strategy is optimized for specific query patterns.
Sources: llkv-executor/src/lib.rs:504-563 llkv-executor/src/lib.rs:584-695
QueryExecutor Structure
The QueryExecutor<P> struct is the primary entry point for executing SELECT queries. It is generic over a Pager type P to support different storage backends.
| Component | Type | Purpose |
|---|---|---|
| provider | Arc<dyn ExecutorTableProvider<P>> | Provides access to tables and their metadata |
The provider abstraction allows the executor to remain decoupled from specific table implementations, enabling testing with mock providers and supporting different storage configurations.
QueryExecutor and Provider Relationship
graph LR
subgraph "Executor Core"
QE["QueryExecutor<P>"]
end
subgraph "Provider Interface"
PROVIDER["ExecutorTableProvider<P>"]
GET_TABLE["get_table(name)\n→ ExecutorTable<P>"]
end
subgraph "Table Interface"
ETABLE["ExecutorTable<P>"]
SCHEMA["schema: ExecutorSchema"]
STORAGE["storage: TableStorageAdapter<P>"]
end
QE --> PROVIDER
PROVIDER --> GET_TABLE
GET_TABLE --> ETABLE
ETABLE --> SCHEMA
ETABLE --> STORAGE
The provider pattern enables dependency injection, allowing the executor to work with different table sources without tight coupling to storage implementations.
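To make the provider pattern concrete, here is a minimal sketch in plain Rust. The type and method names (ExecutorTable, get_table, a mock provider) mirror the shapes described above but are simplified stand-ins; the real types in llkv-executor are generic over a Pager type P and expose richer schema and storage handles.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Illustrative stand-in for the executor-side table handle.
struct ExecutorTable {
    columns: Vec<String>,
}

// Simplified provider trait; the real one is generic over a Pager `P`.
trait ExecutorTableProvider {
    fn get_table(&self, name: &str) -> Option<Arc<ExecutorTable>>;
}

// The executor holds only the provider, staying decoupled from storage.
struct QueryExecutor {
    provider: Arc<dyn ExecutorTableProvider>,
}

impl QueryExecutor {
    fn table_columns(&self, name: &str) -> Option<Vec<String>> {
        self.provider.get_table(name).map(|t| t.columns.clone())
    }
}

// A mock provider, as might be used in executor tests.
struct MockProvider {
    tables: HashMap<String, Arc<ExecutorTable>>,
}

impl ExecutorTableProvider for MockProvider {
    fn get_table(&self, name: &str) -> Option<Arc<ExecutorTable>> {
        self.tables.get(name).cloned()
    }
}

fn main() {
    let mut tables = HashMap::new();
    tables.insert(
        "users".to_string(),
        Arc::new(ExecutorTable {
            columns: vec!["id".to_string(), "email".to_string()],
        }),
    );
    let executor = QueryExecutor {
        provider: Arc::new(MockProvider { tables }),
    };
    println!("{:?}", executor.table_columns("users"));
}
```

Because the executor only sees the trait, swapping MockProvider for a catalog-backed provider requires no executor changes.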
Sources: llkv-executor/src/lib.rs:504-517 llkv-executor/src/types.rs:1-100
Execution Entry Points
The executor provides two primary entry points for executing SELECT plans:
execute_select
Executes a SELECT plan without additional filtering constraints. This is the standard path used when no external row filtering is required.
execute_select_with_filter
Executes a SELECT plan with an optional row ID filter. The RowIdFilter trait allows callers to specify a predicate that determines which row IDs should be considered during execution. This is used for implementing transaction isolation (MVCC filtering) and other row-level visibility constraints.
Both methods extract limit and offset from the plan and apply them to the final SelectExecution result, ensuring consistent pagination behavior across all execution strategies.
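As a rough illustration of how the two entry points relate, here is a simplified sketch. The SelectPlan fields and the predicate-based stand-in for RowIdFilter are assumptions for illustration; only the method names and the shared limit/offset handling come from the description above.

```rust
// Simplified sketch of the two entry points. `RowIdFilter` is modeled as a
// plain predicate over row IDs; the real plan and result types are richer.
type RowId = u64;

struct SelectPlan {
    rows: Vec<RowId>, // stand-in for whatever the plan ultimately selects
    limit: Option<usize>,
    offset: Option<usize>,
}

// Standard path: no external row filtering.
fn execute_select(plan: &SelectPlan) -> Vec<RowId> {
    execute_select_with_filter(plan, None)
}

// Filtered path: callers (e.g. MVCC visibility checks) supply a predicate
// deciding which row IDs may be considered.
fn execute_select_with_filter(
    plan: &SelectPlan,
    filter: Option<&dyn Fn(RowId) -> bool>,
) -> Vec<RowId> {
    plan.rows
        .iter()
        .copied()
        .filter(|&r| filter.map_or(true, |f| f(r)))
        // Limit and offset are applied uniformly, after strategy execution.
        .skip(plan.offset.unwrap_or(0))
        .take(plan.limit.unwrap_or(usize::MAX))
        .collect()
}

fn main() {
    let plan = SelectPlan { rows: (1..=10).collect(), limit: Some(3), offset: Some(2) };
    let even = |r: RowId| r % 2 == 0;
    println!("{:?}", execute_select_with_filter(&plan, Some(&even)));
}
```

Note how the unfiltered path simply delegates with `None`, which is why pagination behaves identically on both paths.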
Sources: llkv-executor/src/lib.rs:519-563
Execution Strategy Dispatch
The executor examines the characteristics of a SelectPlan to determine the most efficient execution strategy. The dispatch logic follows a priority hierarchy:
Execution Strategy Decision Tree
graph TD
START["execute_select_with_filter(plan)"]
CHECK_COMPOUND{plan.compound\nis Some?}
CHECK_TABLES{plan.tables\nis empty?}
CHECK_GROUPBY{plan.group_by\nnot empty?}
CHECK_MULTI{plan.tables.len()\n> 1?}
CHECK_AGG{plan.aggregates\nnot empty?}
CHECK_COMPUTED{has computed\naggregates?}
COMPOUND["execute_compound_select\nUNION/EXCEPT/INTERSECT"]
NOTABLE["execute_select_without_table\nConstant evaluation"]
GROUPBY_SINGLE["execute_group_by_single_table\nGROUP BY aggregation"]
GROUPBY_CROSS["execute_cross_product\nMulti-table GROUP BY"]
CROSS["execute_cross_product\nCartesian product"]
AGGREGATE["execute_aggregates\nSingle-table aggregates"]
COMPUTED_AGG["execute_computed_aggregates\nAggregates in expressions"]
PROJECTION["execute_projection\nSimple column selection"]
RESULT["SelectExecution<P>\nwith limit/offset"]
START --> CHECK_COMPOUND
CHECK_COMPOUND -->|Yes| COMPOUND
CHECK_COMPOUND -->|No| CHECK_TABLES
CHECK_TABLES -->|Yes| NOTABLE
CHECK_TABLES -->|No| CHECK_GROUPBY
CHECK_GROUPBY -->|Yes| CHECK_MULTI
CHECK_MULTI -->|Yes| GROUPBY_CROSS
CHECK_MULTI -->|No| GROUPBY_SINGLE
CHECK_GROUPBY -->|No| CHECK_MULTI
CHECK_MULTI -->|Yes| CROSS
CHECK_MULTI -->|No| CHECK_AGG
CHECK_AGG -->|Yes| AGGREGATE
CHECK_AGG -->|No| CHECK_COMPUTED
CHECK_COMPUTED -->|Yes| COMPUTED_AGG
CHECK_COMPUTED -->|No| PROJECTION
COMPOUND --> RESULT
NOTABLE --> RESULT
GROUPBY_SINGLE --> RESULT
GROUPBY_CROSS --> RESULT
CROSS --> RESULT
AGGREGATE --> RESULT
COMPUTED_AGG --> RESULT
PROJECTION --> RESULT
The executor prioritizes specialized execution paths over generic ones, enabling optimizations tailored to specific query patterns.
Strategy Descriptions
| Strategy | Plan Characteristics | Implementation |
|---|---|---|
| Compound SELECT | plan.compound.is_some() | Executes UNION, EXCEPT, or INTERSECT operations by evaluating component queries and combining results with deduplication for DISTINCT quantifiers |
| No-Table Execution | plan.tables.is_empty() | Evaluates constant expressions like SELECT 1, 2, 3 without accessing storage |
| Group By (Single Table) | !plan.group_by.is_empty() && plan.tables.len() == 1 | Performs grouped aggregation on a single table with efficient column scanning |
| Group By (Cross Product) | !plan.group_by.is_empty() && plan.tables.len() > 1 | Computes Cartesian product before grouping |
| Cross Product | plan.tables.len() > 1 | Joins multiple tables using nested loop or hash join |
| Aggregate Execution | !plan.aggregates.is_empty() | Computes aggregates (COUNT, SUM, AVG, etc.) over a single table |
| Computed Aggregates | Aggregates within computed expressions | Evaluates expressions containing aggregate functions |
| Projection Execution | Default path | Performs column selection with optional filtering |
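The priority hierarchy in the table can be sketched as a straightforward if/else chain. The SelectPlan fields shown here are simplified stand-ins inferred from the plan-characteristics column; the actual dispatch lives in llkv-executor/src/lib.rs.

```rust
// Simplified SelectPlan mirroring the characteristics column above.
#[derive(Default)]
struct SelectPlan {
    compound: Option<()>,          // Some(_) for UNION/EXCEPT/INTERSECT
    tables: Vec<String>,
    group_by: Vec<String>,
    aggregates: Vec<String>,
    has_computed_aggregates: bool, // aggregates nested inside expressions
}

#[derive(Debug, PartialEq)]
enum Strategy {
    Compound,
    NoTable,
    GroupBySingle,
    GroupByCross,
    CrossProduct,
    Aggregate,
    ComputedAggregate,
    Projection,
}

// Priority order matches the decision tree: specialized paths first,
// falling through to plain projection as the default.
fn dispatch(plan: &SelectPlan) -> Strategy {
    if plan.compound.is_some() {
        Strategy::Compound
    } else if plan.tables.is_empty() {
        Strategy::NoTable
    } else if !plan.group_by.is_empty() {
        if plan.tables.len() > 1 {
            Strategy::GroupByCross
        } else {
            Strategy::GroupBySingle
        }
    } else if plan.tables.len() > 1 {
        Strategy::CrossProduct
    } else if !plan.aggregates.is_empty() {
        Strategy::Aggregate
    } else if plan.has_computed_aggregates {
        Strategy::ComputedAggregate
    } else {
        Strategy::Projection
    }
}

fn main() {
    // No tables at all, e.g. SELECT 1, 2, 3.
    println!("{:?}", dispatch(&SelectPlan::default()));
}
```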
Sources: llkv-executor/src/lib.rs:523-563
Result Representation
The executor returns results as SelectExecution<P> instances, which encapsulate one or more Arrow RecordBatch objects along with metadata.
SelectExecution Result Types
graph TB
subgraph "SelectExecution<P>"
EXEC["SelectExecution"]
SCHEMA["schema: Arc<Schema>"]
DISPLAY["display_name: String"]
MODE{Execution\nMode}
end
subgraph "Single Batch Mode"
SINGLE["Single RecordBatch"]
BATCH1["RecordBatch\nMaterialized data"]
end
subgraph "Multi Batch Mode"
MULTI["Vec<RecordBatch>"]
BATCH2["RecordBatch[]\nMultiple batches"]
end
subgraph "Streaming Mode"
STREAM["Scan Stream"]
LAZY["Lazy evaluation"]
ITER["Iterator-based"]
end
subgraph "Post-Processing"
LIMIT["limit: Option<usize>"]
OFFSET["offset: Option<usize>"]
APPLY["Applied during\nmaterialization"]
end
EXEC --> SCHEMA
EXEC --> DISPLAY
EXEC --> MODE
MODE -->|Materialized| SINGLE
MODE -->|Compound/Sorted| MULTI
MODE -->|Large tables| STREAM
SINGLE --> BATCH1
MULTI --> BATCH2
STREAM --> LAZY
STREAM --> ITER
EXEC --> LIMIT
EXEC --> OFFSET
LIMIT --> APPLY
OFFSET --> APPLY
The SelectExecution type supports multiple result modes optimized for different query patterns. The with_limit and with_offset methods attach pagination parameters that are applied when materializing results.
Result Materialization
Callers can materialize results in several ways:
- into_rows() : Converts all batches into a Vec<CanonicalRow> representation, applying limit and offset
- stream(callback) : Invokes a callback for each batch, enabling memory-efficient processing of large result sets
- into_record_batch() : Consolidates results into a single RecordBatch, useful for small result sets
- into_batches() : Returns all batches as a vector
The streaming API is particularly important for large queries where materializing all results at once would exceed memory limits.
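A simplified model of these materialization choices, using plain vectors in place of Arrow RecordBatches (the method names follow the list above; the struct layout is an assumption):

```rust
// Toy model of SelectExecution: batches are Vec<i64> rather than Arrow
// RecordBatches, but the limit/offset and streaming behavior has the same shape.
struct SelectExecution {
    batches: Vec<Vec<i64>>,
    limit: Option<usize>,
    offset: Option<usize>,
}

impl SelectExecution {
    // into_rows(): flatten every batch, then apply offset and limit.
    fn into_rows(self) -> Vec<i64> {
        self.batches
            .into_iter()
            .flatten()
            .skip(self.offset.unwrap_or(0))
            .take(self.limit.unwrap_or(usize::MAX))
            .collect()
    }

    // stream(callback): visit one batch at a time; nothing is fully materialized.
    fn stream(self, mut callback: impl FnMut(&[i64])) {
        for batch in &self.batches {
            callback(batch);
        }
    }
}

fn main() {
    let exec = SelectExecution {
        batches: vec![vec![1, 2], vec![3, 4, 5]],
        limit: Some(2),
        offset: Some(1),
    };
    println!("{:?}", exec.into_rows());
}
```

With streaming, peak memory stays proportional to one batch rather than the whole result set, which is the property the text above relies on for large queries.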
Sources: llkv-executor/src/lib.rs:519-563 llkv-executor/src/scan.rs:1-100
Execution Phases
Most execution strategies follow a common phased pattern optimized for columnar storage: collect matching row IDs, gather only the needed column data, then apply any post-processing the plan requires:
Phase 1: Row ID Collection
The executor first identifies which rows satisfy the query’s filter predicates without fetching the full column data. This phase produces a bitmap or set of row IDs that match the criteria.
Row ID Collection Phase
sequenceDiagram
participant EX as QueryExecutor
participant TBL as ExecutorTable
participant SCAN as Scan Operations
participant STORE as Column Store
EX->>TBL: filter_row_ids(predicate)
TBL->>SCAN: evaluate_filter
SCAN->>STORE: Load chunk metadata
Note over SCAN,STORE: Chunk pruning using\nmin/max values
SCAN->>STORE: Load matching chunks
SCAN->>SCAN: Vectorized predicate\nevaluation (SIMD)
SCAN-->>TBL: Bitmap of matching row IDs
TBL-->>EX: Row ID set
Predicate evaluation uses chunk metadata to skip irrelevant data (Scan Execution and Optimization) and vectorized kernels for efficient matching (Filter Evaluation).
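The chunk-pruning step can be sketched as follows. Chunk and its min/max fields are illustrative stand-ins for the real chunk metadata, and the per-row loop stands in for the vectorized kernels:

```rust
// Toy chunk with min/max metadata, standing in for llkv's chunk headers.
struct Chunk {
    min: i64,
    max: i64,
    rows: Vec<(u64, i64)>, // (row_id, value)
}

// Collect row IDs where value == target, pruning whole chunks whose
// min/max range proves they cannot match (their data is never "loaded").
fn filter_row_ids(chunks: &[Chunk], target: i64) -> Vec<u64> {
    let mut matches = Vec::new();
    for chunk in chunks {
        if target < chunk.min || target > chunk.max {
            continue; // chunk pruned by metadata alone
        }
        // In the real scan this comparison is a vectorized (SIMD) kernel.
        for &(row_id, value) in &chunk.rows {
            if value == target {
                matches.push(row_id);
            }
        }
    }
    matches
}

fn main() {
    let chunks = vec![
        Chunk { min: 0, max: 5, rows: vec![(0, 1), (1, 5)] },
        Chunk { min: 6, max: 10, rows: vec![(2, 6), (3, 10)] },
    ];
    // Only the second chunk's rows are actually scanned.
    println!("{:?}", filter_row_ids(&chunks, 6));
}
```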
Phase 2: Data Gathering
Once the matching row IDs are known, the executor fetches only the required columns for those specific rows. This minimizes I/O by avoiding unnecessary column reads.
Data Gathering Phase
sequenceDiagram
participant EX as QueryExecutor
participant TBL as ExecutorTable
participant STORE as Column Store
participant PAGER as Storage Pager
EX->>TBL: scan_stream(projections, row_ids)
loop For each projection
TBL->>STORE: gather_rows(field_id, row_ids)
STORE->>STORE: Identify chunks containing\nrequested row IDs
STORE->>PAGER: batch_get(chunk_keys)
PAGER-->>STORE: Chunk data
STORE->>STORE: Construct Arrow array
STORE-->>TBL: ArrayRef
end
TBL->>TBL: Construct RecordBatch\nfrom arrays
TBL-->>EX: RecordBatch
The gather operation reconstructs Arrow arrays from chunked columnar storage, fetching only the columns referenced in the query’s projections.
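A minimal model of the gather step, with columns represented as row_id-to-value maps instead of chunked Arrow storage (gather_rows follows the diagram; the data layout is an assumption):

```rust
use std::collections::HashMap;

// Gather one projected column for the matched row IDs. Columns are modeled
// as row_id -> value maps; the real store rebuilds Arrow arrays from chunks.
fn gather_rows(column: &HashMap<u64, i64>, row_ids: &[u64]) -> Vec<Option<i64>> {
    // Row IDs absent from the column surface as NULL (None).
    row_ids.iter().map(|id| column.get(id).copied()).collect()
}

// One gathered "array" per projection; together they form the batch.
fn gather_batch(columns: &[&HashMap<u64, i64>], row_ids: &[u64]) -> Vec<Vec<Option<i64>>> {
    columns.iter().map(|col| gather_rows(col, row_ids)).collect()
}

fn main() {
    let mut price = HashMap::new();
    price.insert(1u64, 100);
    price.insert(3u64, 300);
    let mut qty = HashMap::new();
    qty.insert(1u64, 2);
    qty.insert(3u64, 4);
    // Only rows 1 and 3 matched the filter; only these two columns are read.
    println!("{:?}", gather_batch(&[&price, &qty], &[1, 3]));
}
```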
Phase 3: Post-Processing
After data gathering, the executor applies sorting, aggregation, or other transformations as required by the plan:
| Operation | When Applied | Implementation |
|---|---|---|
| Sorting | ORDER BY clause present | Uses Arrow’s lexsort_to_indices with custom NULLS FIRST/LAST handling |
| Limiting | LIMIT clause present | Truncates result set to specified row count |
| Offset | OFFSET clause present | Skips specified number of rows before returning results |
| Aggregation | GROUP BY or aggregate functions | Materializes groups and computes aggregate values |
| Distinct | SELECT DISTINCT | Hash-based deduplication using row encoding |
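The Distinct row of the table can be illustrated with a toy row encoding; the separator-join used here is a stand-in for the executor's real binary row encoding:

```rust
use std::collections::HashSet;

// Deduplicate rows by encoding each one into a hashable key, keeping the
// first occurrence of every distinct row.
fn distinct(rows: Vec<Vec<String>>) -> Vec<Vec<String>> {
    let mut seen = HashSet::new();
    rows.into_iter()
        .filter(|row| {
            let key = row.join("\u{1f}"); // unit-separator as toy delimiter
            seen.insert(key) // true only the first time this key appears
        })
        .collect()
}

fn main() {
    let rows = vec![
        vec!["a".to_string(), "1".to_string()],
        vec!["a".to_string(), "1".to_string()],
        vec!["b".to_string(), "2".to_string()],
    ];
    println!("{:?}", distinct(rows));
}
```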
Sources: llkv-executor/src/lib.rs:584-1000 llkv-executor/src/scan.rs:1-500
Subquery Execution
The executor handles subqueries through recursive evaluation, supporting both scalar subqueries and EXISTS predicates.
Scalar Subquery Evaluation
Scalar subqueries are evaluated lazily during expression computation. The executor maintains a cache (scalar_subquery_cache) to avoid re-executing identical subqueries with the same correlated bindings:
Scalar Subquery Evaluation with Caching
graph TD
EXPR["Evaluate Expression\ncontaining subquery"]
COLLECT["Collect correlated\ncolumn bindings"]
ENCODE["Encode bindings\nas cache key"]
CHECK{Cache\nhit?}
CACHED["Return cached\nLiteral"]
EXECUTE["Execute subquery\nwith bindings"]
VALIDATE["Validate result:\n1 column, ≤1 row"]
STORE["Store in cache"]
RETURN["Return Literal"]
EXPR --> COLLECT
COLLECT --> ENCODE
ENCODE --> CHECK
CHECK -->|Yes| CACHED
CHECK -->|No| EXECUTE
EXECUTE --> VALIDATE
VALIDATE --> STORE
STORE --> RETURN
CACHED --> RETURN
The caching mechanism is essential for performance when a subquery is evaluated multiple times in a cross product or aggregate context.
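A minimal sketch of that caching discipline, with an inlined stand-in subquery body (the struct and field names are illustrative; only scalar_subquery_cache's role comes from the text above):

```rust
use std::collections::HashMap;

// Toy scalar-subquery cache. Correlated bindings are encoded into a string
// key; the subquery body runs only on a cache miss. `executions` counts how
// many times the body actually ran.
struct SubqueryCache {
    cache: HashMap<String, i64>,
    executions: usize,
}

impl SubqueryCache {
    fn new() -> Self {
        Self { cache: HashMap::new(), executions: 0 }
    }

    fn evaluate(&mut self, bindings: &[(&str, i64)]) -> i64 {
        // Encode the correlated column bindings as the cache key.
        let key = bindings
            .iter()
            .map(|(name, v)| format!("{name}={v}"))
            .collect::<Vec<_>>()
            .join(";");
        if let Some(&cached) = self.cache.get(&key) {
            return cached; // identical bindings: reuse the prior result
        }
        self.executions += 1;
        // Stand-in subquery body: sum of the correlated binding values.
        let result: i64 = bindings.iter().map(|&(_, v)| v).sum();
        self.cache.insert(key, result);
        result
    }
}

fn main() {
    let mut cache = SubqueryCache::new();
    cache.evaluate(&[("x", 1)]);
    cache.evaluate(&[("x", 1)]); // hit: body not re-run
    cache.evaluate(&[("x", 2)]);
    println!("executed {} times", cache.executions);
}
```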
Parallel Subquery Execution
For queries that require evaluating the same correlated subquery across many rows, the executor batches the work and executes it in parallel using Rayon:
```rust
// Fan the pending correlated bindings out across the shared Rayon pool,
// evaluating the same subquery once per binding set.
let job_results: Vec<ExecutorResult<Literal>> = with_thread_pool(|| {
    pending_bindings
        .par_iter()
        .map(|bindings| self.evaluate_scalar_subquery_with_bindings(subquery, bindings))
        .collect()
});
```
This parallelization significantly reduces execution time for subquery-heavy queries.
Sources: llkv-executor/src/lib.rs:787-961
Integration with Runtime
The SqlEngine in llkv-sql orchestrates the entire execution pipeline, bridging SQL text to query results:
SqlEngine and Executor Integration
graph LR
subgraph "SqlEngine"
PARSE["Parse SQL\n(sqlparser)"]
PLAN["Build Plan\n(llkv-plan)"]
EXEC["Execute Plan"]
end
subgraph "RuntimeEngine"
CONTEXT["RuntimeContext"]
SESSION["RuntimeSession"]
CATALOG["CatalogManager"]
end
subgraph "Executor Layer"
QEXEC["QueryExecutor"]
PROVIDER["TableProvider"]
end
PARSE --> PLAN
PLAN --> EXEC
EXEC --> CONTEXT
CONTEXT --> SESSION
CONTEXT --> CATALOG
EXEC --> QEXEC
QEXEC --> PROVIDER
PROVIDER --> CATALOG
The RuntimeEngine provides the execution context, including transaction state, catalog access, and session configuration, while the QueryExecutor focuses solely on transforming plans into results.
Sources: llkv-sql/src/sql_engine.rs:572-745 llkv-executor/src/lib.rs:504-563