This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Aggregation System
The Aggregation System implements SQL aggregate functions (COUNT, SUM, AVG, MIN, MAX, etc.) across the LLKV query pipeline. It handles both simple aggregates (SELECT COUNT(*) FROM table) and grouped aggregations (SELECT col, SUM(amount) FROM table GROUP BY col), with support for the DISTINCT modifier and expression-based aggregates like SUM(col1 + col2).
For information about scalar expression evaluation (non-aggregate), see Scalar Evaluation and NumericKernels. For query planning that produces aggregate plans, see Plan Structures.
System Architecture
The aggregation system spans four layers in the codebase, each with distinct responsibilities:
Diagram: Aggregation System Layering
graph TB
subgraph "Expression Layer (llkv-expr)"
AGG_CALL["AggregateCall<F>\nCountStar, Count, Sum, Avg, Min, Max"]
SCALAR_EXPR["ScalarExpr<F>\nAggregate(AggregateCall)"]
end
subgraph "Plan Layer (llkv-plan)"
AGG_EXPR["AggregateExpr\nCountStar, Column"]
AGG_FUNC["AggregateFunction\nCount, SumInt64, MinInt64, etc."]
SELECT_PLAN["SelectPlan\naggregates: Vec<AggregateExpr>\ngroup_by: Vec<String>"]
end
subgraph "Execution Layer (llkv-executor)"
EXECUTOR["QueryExecutor::execute_aggregates\nQueryExecutor::execute_group_by_single_table"]
AGG_VALUE["AggregateValue\nNull, Int64, Float64, Decimal128, String"]
GROUP_STATE["GroupAggregateState\nrepresentative_batch_idx\nrepresentative_row\nrow_locations"]
end
subgraph "Accumulator Layer (llkv-aggregate)"
AGG_ACCUMULATOR["AggregateAccumulator\nInterface for aggregate computation"]
AGG_KIND["AggregateKind\nType classification"]
AGG_SPEC["AggregateSpec\nConfiguration"]
AGG_STATE["AggregateState\nRuntime state"]
end
SCALAR_EXPR --> AGG_CALL
SELECT_PLAN --> AGG_EXPR
AGG_EXPR --> AGG_FUNC
EXECUTOR --> AGG_VALUE
EXECUTOR --> GROUP_STATE
EXECUTOR --> AGG_ACCUMULATOR
AGG_ACCUMULATOR --> AGG_KIND
AGG_ACCUMULATOR --> AGG_SPEC
AGG_ACCUMULATOR --> AGG_STATE
AGG_CALL -.translates to.-> AGG_EXPR
SELECT_PLAN -.executes via.-> EXECUTOR
Sources:
- llkv-expr/src/expr.rs:184-215
- llkv-plan/src/plans.rs:1036-1061
- llkv-executor/src/lib.rs:109-151
- llkv-executor/src/lib.rs:19
Expression-Level Aggregates
Aggregate functions are represented in the expression AST via the AggregateCall<F> enum, which enables aggregates to appear within computed projections (e.g., COUNT(*) + 1 or SUM(col1) / AVG(col2)). Each variant captures the specific aggregate semantics:
| Variant | Description | Example SQL |
|---|---|---|
| CountStar | Count all rows (including NULLs) | COUNT(*) |
| Count { expr, distinct } | Count non-NULL values of expression | COUNT(col), COUNT(DISTINCT col) |
| Sum { expr, distinct } | Sum numeric expression values | SUM(amount), SUM(DISTINCT col) |
| Total { expr, distinct } | Sum with NULL-to-zero coercion | TOTAL(amount) |
| Avg { expr, distinct } | Arithmetic mean of expression | AVG(price) |
| Min(expr) | Minimum value | MIN(created_at) |
| Max(expr) | Maximum value | MAX(score) |
| CountNulls(expr) | Count NULL occurrences | COUNT_NULLS(optional_field) |
| GroupConcat { expr, distinct, separator } | Concatenate strings | GROUP_CONCAT(name, ',') |
Each aggregate operates on a ScalarExpr<F>, not just a column name, which allows complex expressions like SUM(price * quantity) or AVG(col1 + col2).
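The variants in the table can be pictured as a Rust enum. The following is a minimal sketch and not the definition in llkv-expr: the `ScalarExpr` stand-in, the boxed field types, the `Option<String>` separator, and the `is_distinct` and `sum_price_times_quantity` helpers are assumptions made for illustration.

```rust
/// Simplified stand-in for the crate's `ScalarExpr<F>` (assumption: the real
/// type has many more variants).
#[derive(Debug, Clone, PartialEq)]
pub enum ScalarExpr<F> {
    Column(F),
    Literal(i64),
    Multiply(Box<ScalarExpr<F>>, Box<ScalarExpr<F>>),
}

/// Sketch of the aggregate variants listed in the table above.
#[derive(Debug, Clone, PartialEq)]
pub enum AggregateCall<F> {
    CountStar,
    Count { expr: Box<ScalarExpr<F>>, distinct: bool },
    Sum { expr: Box<ScalarExpr<F>>, distinct: bool },
    Total { expr: Box<ScalarExpr<F>>, distinct: bool },
    Avg { expr: Box<ScalarExpr<F>>, distinct: bool },
    Min(Box<ScalarExpr<F>>),
    Max(Box<ScalarExpr<F>>),
    CountNulls(Box<ScalarExpr<F>>),
    GroupConcat { expr: Box<ScalarExpr<F>>, distinct: bool, separator: Option<String> },
}

impl<F> AggregateCall<F> {
    /// Hypothetical helper: does this call carry the DISTINCT modifier?
    pub fn is_distinct(&self) -> bool {
        match self {
            AggregateCall::Count { distinct, .. }
            | AggregateCall::Sum { distinct, .. }
            | AggregateCall::Total { distinct, .. }
            | AggregateCall::Avg { distinct, .. }
            | AggregateCall::GroupConcat { distinct, .. } => *distinct,
            // CountStar, Min, Max, and CountNulls have no distinct flag in the table.
            _ => false,
        }
    }
}

/// Builds the call for `SUM(price * quantity)` over column names.
pub fn sum_price_times_quantity() -> AggregateCall<String> {
    AggregateCall::Sum {
        expr: Box::new(ScalarExpr::Multiply(
            Box::new(ScalarExpr::Column("price".to_string())),
            Box::new(ScalarExpr::Column("quantity".to_string())),
        )),
        distinct: false,
    }
}
```

Because the argument is a full expression tree rather than a column name, the same enum covers `SUM(amount)` and `SUM(price * quantity)` without a separate variant.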
Plan-Level Representation
The query planner converts SQL aggregate syntax into AggregateExpr instances stored in SelectPlan::aggregates. The plan layer uses a simplified representation compared to the expression layer:
Diagram: Plan-Level Aggregate Structure
graph LR
SELECT_PLAN["SelectPlan"]
AGG_LIST["aggregates: Vec<AggregateExpr>"]
GROUP_BY["group_by: Vec<String>"]
SELECT_PLAN --> AGG_LIST
SELECT_PLAN --> GROUP_BY
AGG_LIST --> COUNT_STAR["AggregateExpr::CountStar\nalias: String\ndistinct: bool"]
AGG_LIST --> COLUMN["AggregateExpr::Column\ncolumn: String\nalias: String\nfunction: AggregateFunction\ndistinct: bool"]
COLUMN --> FUNC["AggregateFunction::\nCount, SumInt64, TotalInt64,\nMinInt64, MaxInt64,\nCountNulls, GroupConcat"]
The planner distinguishes between:
- Non-grouped aggregates: Empty group_by vector, producing a single result row
- Grouped aggregates: Populated group_by vector, producing one row per distinct group
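The plan-level shapes named in the diagram can be sketched in Rust as follows. This is an illustrative approximation: the field names come from the diagram, but the derives, the `is_grouped` helper, and the `example_grouped_plan` constructor are assumptions, and the real `SelectPlan` carries more fields than the two shown here.

```rust
/// Aggregate function tags as listed in the plan-level diagram.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggregateFunction {
    Count,
    SumInt64,
    TotalInt64,
    MinInt64,
    MaxInt64,
    CountNulls,
    GroupConcat,
}

/// Plan-level aggregate: either COUNT(*) or a function applied to a named column.
#[derive(Debug, Clone, PartialEq)]
pub enum AggregateExpr {
    CountStar { alias: String, distinct: bool },
    Column { column: String, alias: String, function: AggregateFunction, distinct: bool },
}

/// Only the two fields discussed in this section (assumption: trimmed down).
#[derive(Debug, Clone, Default)]
pub struct SelectPlan {
    pub aggregates: Vec<AggregateExpr>,
    pub group_by: Vec<String>,
}

impl SelectPlan {
    /// Hypothetical helper: a populated group_by vector means one row per group.
    pub fn is_grouped(&self) -> bool {
        !self.group_by.is_empty()
    }
}

/// Plan fragment for `SELECT col, SUM(amount) FROM table GROUP BY col`.
pub fn example_grouped_plan() -> SelectPlan {
    SelectPlan {
        aggregates: vec![AggregateExpr::Column {
            column: "amount".to_string(),
            alias: "sum_amount".to_string(), // hypothetical alias
            function: AggregateFunction::SumInt64,
            distinct: false,
        }],
        group_by: vec!["col".to_string()],
    }
}
```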
Execution Strategy Selection
The executor chooses different code paths based on query structure, optimizing for common patterns:
Diagram: Aggregate Execution Decision Tree
graph TD
START["QueryExecutor::execute_select"]
CHECK_COMPOUND{"plan.compound.is_some()"}
CHECK_EMPTY_TABLES{"plan.tables.is_empty()"}
CHECK_GROUP_BY{"!plan.group_by.is_empty()"}
CHECK_MULTI_TABLE{"plan.tables.len() > 1"}
CHECK_AGGREGATES{"!plan.aggregates.is_empty()"}
CHECK_COMPUTED{"has_computed_aggregates(&plan)"}
START --> CHECK_COMPOUND
CHECK_COMPOUND -->|Yes| COMPOUND["execute_compound_select"]
CHECK_COMPOUND -->|No| CHECK_EMPTY_TABLES
CHECK_EMPTY_TABLES -->|Yes| NO_TABLE["execute_select_without_table"]
CHECK_EMPTY_TABLES -->|No| CHECK_GROUP_BY
CHECK_GROUP_BY -->|Yes| CHECK_MULTI_TABLE
CHECK_MULTI_TABLE -->|Multi| CROSS_PROD["execute_cross_product"]
CHECK_MULTI_TABLE -->|Single| GROUP_BY_SINGLE["execute_group_by_single_table"]
CHECK_GROUP_BY -->|No| CHECK_MULTI_TABLE_2{"plan.tables.len() > 1"}
CHECK_MULTI_TABLE_2 -->|Yes| CROSS_PROD
CHECK_MULTI_TABLE_2 -->|No| CHECK_AGGREGATES
CHECK_AGGREGATES -->|Yes| EXEC_AGG["execute_aggregates"]
CHECK_AGGREGATES -->|No| CHECK_COMPUTED
CHECK_COMPUTED -->|Yes| COMPUTED_AGG["execute_computed_aggregates"]
CHECK_COMPUTED -->|No| PROJECTION["execute_projection"]
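The decision tree can be condensed into a single function. The sketch below is an assumption-laden paraphrase rather than the executor's code: `PlanShape` and `Strategy` are hypothetical types that capture only the properties the diagram inspects, and the two multi-table checks in the diagram are collapsed into one because both lead to the cross-product path.

```rust
/// Execution paths named in the decision tree.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Strategy {
    CompoundSelect,
    SelectWithoutTable,
    CrossProduct,
    GroupBySingleTable,
    Aggregates,
    ComputedAggregates,
    Projection,
}

/// Hypothetical summary of the plan properties the tree looks at.
#[derive(Debug, Clone, Copy, Default)]
pub struct PlanShape {
    pub has_compound: bool,
    pub table_count: usize,
    pub has_group_by: bool,
    pub has_aggregates: bool,
    pub has_computed_aggregates: bool,
}

/// Mirrors the diagram top to bottom; the first matching check wins.
pub fn choose_strategy(plan: &PlanShape) -> Strategy {
    if plan.has_compound {
        return Strategy::CompoundSelect;
    }
    if plan.table_count == 0 {
        return Strategy::SelectWithoutTable;
    }
    // Multi-table plans go to the cross-product path with or without GROUP BY.
    if plan.table_count > 1 {
        return Strategy::CrossProduct;
    }
    if plan.has_group_by {
        return Strategy::GroupBySingleTable;
    }
    if plan.has_aggregates {
        return Strategy::Aggregates;
    }
    if plan.has_computed_aggregates {
        return Strategy::ComputedAggregates;
    }
    Strategy::Projection
}
```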
Non-Grouped Aggregate Execution
execute_aggregates processes queries without GROUP BY clauses. All rows are treated as a single group:
- Projection Planning: Build ScanProjection list for columns needed by aggregate expressions
- Expression Translation: Convert ScalarExpr<String> to ScalarExpr<FieldId> using table schema
- Data Streaming: Scan table and accumulate values via AggregateAccumulator
- Result Assembly: Finalize accumulators and construct single-row RecordBatch
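The expression translation step can be illustrated with a small recursive rewrite from column names to field identifiers. This is a sketch under stated assumptions: the three-variant `ScalarExpr`, the `u32` alias for `FieldId`, the `HashMap` schema, and the `translate` function name are all placeholders, not the executor's actual types or signatures.

```rust
use std::collections::HashMap;

/// Assumption: field identifiers are plain integers in this sketch.
pub type FieldId = u32;

/// Tiny stand-in for the expression tree, generic over the column reference type.
#[derive(Debug, Clone, PartialEq)]
pub enum ScalarExpr<F> {
    Column(F),
    Literal(i64),
    Add(Box<ScalarExpr<F>>, Box<ScalarExpr<F>>),
}

/// Rewrites every column name to its FieldId, failing on unknown columns.
pub fn translate(
    expr: &ScalarExpr<String>,
    schema: &HashMap<String, FieldId>,
) -> Result<ScalarExpr<FieldId>, String> {
    match expr {
        ScalarExpr::Column(name) => schema
            .get(name)
            .copied()
            .map(ScalarExpr::Column)
            .ok_or_else(|| format!("unknown column: {}", name)),
        ScalarExpr::Literal(v) => Ok(ScalarExpr::Literal(*v)),
        ScalarExpr::Add(left, right) => Ok(ScalarExpr::Add(
            Box::new(translate(left, schema)?),
            Box::new(translate(right, schema)?),
        )),
    }
}
```

Resolving names once, before the scan starts, means the streaming loop only ever deals with field identifiers.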
Grouped Aggregate Execution
execute_group_by_single_table handles queries with GROUP BY clauses:
- Full Scan: Load all table rows into memory (required for grouping)
- Group Key Extraction: Evaluate GROUP BY expressions for each row, producing GroupKeyValue instances
- Group State Tracking: Build FxHashMap<Vec<GroupKeyValue>, GroupAggregateState> mapping group keys to row locations
- Per-Group Accumulation: For each group, process its rows through aggregate accumulators
- HAVING Filter: Apply post-aggregation filter if present
- Result Construction: Build output RecordBatch with one row per group
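The steps above can be sketched end to end for one concrete query shape, `SELECT key, SUM(amount) ... GROUP BY key HAVING SUM(amount) >= n`. This is a simplified illustration, not the executor's implementation: it uses the standard library `HashMap` in place of `FxHashMap`, a slice of tuples in place of record batches, and a hypothetical function name, and it omits overflow handling.

```rust
use std::collections::HashMap;

/// Groups `(key, amount)` rows, sums non-NULL amounts per group, and keeps
/// groups whose sum is at least `having_min`. Groups are returned in
/// first-seen order so the output is deterministic.
pub fn group_sum_having(rows: &[(String, Option<i64>)], having_min: i64) -> Vec<(String, i64)> {
    // Group state tracking: map each key to the indices of its rows.
    let mut groups: HashMap<&str, Vec<usize>> = HashMap::new();
    let mut order: Vec<&str> = Vec::new();
    for (idx, (key, _)) in rows.iter().enumerate() {
        groups
            .entry(key.as_str())
            .or_insert_with(|| {
                order.push(key.as_str());
                Vec::new()
            })
            .push(idx);
    }

    let mut out = Vec::new();
    for key in order {
        // Per-group accumulation: SUM skips NULLs and stays NULL if all are NULL.
        let mut sum: Option<i64> = None;
        for &idx in &groups[key] {
            if let Some(v) = rows[idx].1 {
                sum = Some(sum.unwrap_or(0) + v);
            }
        }
        // HAVING filter: a NULL sum never satisfies the comparison.
        if let Some(total) = sum {
            if total >= having_min {
                out.push((key.to_string(), total));
            }
        }
    }
    out
}
```

Storing row indices rather than copied values keeps the grouping pass independent of which aggregates are later computed over each group.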
Accumulator Interface
The llkv-aggregate crate (imported at llkv-executor/src/lib.rs:19) provides the AggregateAccumulator trait, which abstracts the computation logic for individual aggregate functions. Each accumulator maintains incremental state as it processes rows:
Diagram: Accumulator Lifecycle
sequenceDiagram
participant Executor
participant Accumulator as AggregateAccumulator
participant State as AggregateState
Executor->>Accumulator: new(AggregateSpec)
Accumulator->>State: initialize()
loop For each batch
Executor->>Accumulator: update(batch, row_indices)
Accumulator->>State: accumulate values
end
Executor->>Accumulator: finalize()
Accumulator->>State: compute final value
Accumulator-->>Executor: AggregateValue
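The lifecycle in the sequence diagram (create, update per batch, finalize) can be sketched with a small trait. The method signatures, the `Option<i64>` batch representation, the two example accumulators, and the `run` driver below are assumptions for illustration, and construction uses `Default` instead of an `AggregateSpec`; the actual llkv-aggregate interface may differ.

```rust
/// Reduced result type for this sketch (the real enum has more variants).
#[derive(Debug, Clone, PartialEq)]
pub enum AggregateValue {
    Null,
    Int64(i64),
}

/// Hypothetical accumulator interface following the diagram's update/finalize steps.
pub trait AggregateAccumulator {
    /// Folds the selected rows of one batch into the running state.
    fn update(&mut self, batch: &[Option<i64>], row_indices: &[usize]);
    /// Consumes the accumulator and produces the final value.
    fn finalize(self: Box<Self>) -> AggregateValue;
}

/// SUM over integers: skips NULLs, yields Null if no non-NULL value was seen.
/// Overflow handling is omitted in this sketch.
#[derive(Default)]
pub struct SumInt64 {
    total: Option<i64>,
}

impl AggregateAccumulator for SumInt64 {
    fn update(&mut self, batch: &[Option<i64>], row_indices: &[usize]) {
        for &i in row_indices {
            if let Some(v) = batch[i] {
                self.total = Some(self.total.unwrap_or(0) + v);
            }
        }
    }
    fn finalize(self: Box<Self>) -> AggregateValue {
        self.total.map_or(AggregateValue::Null, AggregateValue::Int64)
    }
}

/// COUNT(expr): counts non-NULL values.
#[derive(Default)]
pub struct CountNonNull {
    count: i64,
}

impl AggregateAccumulator for CountNonNull {
    fn update(&mut self, batch: &[Option<i64>], row_indices: &[usize]) {
        self.count += row_indices.iter().filter(|&&i| batch[i].is_some()).count() as i64;
    }
    fn finalize(self: Box<Self>) -> AggregateValue {
        AggregateValue::Int64(self.count)
    }
}

/// Drives one accumulator over every row of every batch, then finalizes it.
pub fn run(mut acc: Box<dyn AggregateAccumulator>, batches: &[Vec<Option<i64>>]) -> AggregateValue {
    for batch in batches {
        let all_rows: Vec<usize> = (0..batch.len()).collect();
        acc.update(batch, &all_rows);
    }
    acc.finalize()
}
```

Passing `row_indices` separately from the batch lets the same accumulator serve both the non-grouped path (all rows) and the grouped path (only the rows of one group).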
The executor wraps accumulator results in AggregateValue, which handles type conversions between the accumulator’s output type and the plan’s expected type:
| AggregateValue Variant | Usage |
|---|---|
| Null | No rows matched, or all values were NULL |
| Int64(i64) | Integer aggregates (COUNT, SUM for integers) |
| Float64(f64) | Floating-point aggregates (AVG, SUM for floats) |
| Decimal128 { value: i128, scale: i8 } | Precise decimal aggregates |
| String(String) | String aggregates (GROUP_CONCAT) |
Distinct Value Tracking
When an aggregate includes the DISTINCT modifier (e.g., COUNT(DISTINCT col)), the executor must deduplicate values before accumulation. This is handled via hash-based tracking:
Diagram: DISTINCT Aggregate Processing
graph TD
START["Aggregate with distinct=true"]
INIT["Initialize FxHashSet<Vec<u8>>\nfor distinct tracking"]
LOOP_START["For each input row"]
EXTRACT["Extract aggregate expression value"]
ENCODE["Encode value as byte vector\nusing encode_row()"]
CHECK_SEEN{"Value already\nin set?"}
SKIP["Skip row\n(duplicate)"]
INSERT["Insert into set"]
ACCUMULATE["Pass to accumulator"]
LOOP_END["Next row"]
FINALIZE["Finalize accumulator"]
START --> INIT
INIT --> LOOP_START
LOOP_START --> EXTRACT
EXTRACT --> ENCODE
ENCODE --> CHECK_SEEN
CHECK_SEEN -->|Yes| SKIP
CHECK_SEEN -->|No| INSERT
INSERT --> ACCUMULATE
SKIP --> LOOP_END
ACCUMULATE --> LOOP_END
LOOP_END --> LOOP_START
LOOP_END -.all rows.-> FINALIZE
The encode_row function (referenced throughout llkv-executor/src/lib.rs) converts values to a canonical byte representation suitable for hash-based deduplication.
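The DISTINCT flow can be sketched for `COUNT(DISTINCT col)` as follows. This is an illustration only: `Value` and `encode_value` are hypothetical stand-ins for the executor's value type and its `encode_row` function, whose real byte layout is not shown in this document, and the standard library `HashSet` replaces `FxHashSet`.

```rust
use std::collections::HashSet;

/// Hypothetical value type for this sketch.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Null,
    Int(i64),
    Str(String),
}

/// Stand-in for `encode_row`: a type tag byte followed by the payload, so that
/// the integer 1 and the string "1" never collide.
pub fn encode_value(v: &Value) -> Vec<u8> {
    match v {
        Value::Null => vec![0],
        Value::Int(i) => {
            let mut out = vec![1];
            out.extend_from_slice(&i.to_be_bytes());
            out
        }
        Value::Str(s) => {
            let mut out = vec![2];
            out.extend_from_slice(s.as_bytes());
            out
        }
    }
}

/// COUNT(DISTINCT expr): each distinct non-NULL value is counted once.
pub fn count_distinct(values: &[Value]) -> i64 {
    let mut seen: HashSet<Vec<u8>> = HashSet::new();
    let mut count = 0;
    for v in values {
        // COUNT ignores NULLs, so they are not tracked at all.
        if matches!(v, Value::Null) {
            continue;
        }
        // `insert` returns false for a duplicate, which is then skipped.
        if seen.insert(encode_value(v)) {
            count += 1;
        }
    }
    count
}
```

The set grows with the number of distinct values, which is the memory cost noted under Performance Considerations.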
Expression-Based Aggregates
Unlike simple column aggregates, expression-based aggregates (e.g., SUM(col1 * col2) or AVG(CASE WHEN x > 0 THEN x ELSE 0 END)) require evaluating the expression for each row before accumulating:
Diagram: Expression Aggregate Evaluation
graph LR
INPUT["Input Batch"]
subgraph "Expression Evaluation"
TRANSLATE["translate_scalar\n(String → FieldId)"]
EVAL_NUMERIC["NumericKernels::evaluate_numeric"]
RESULT_ARRAY["Computed ArrayRef"]
end
subgraph "Accumulation"
EXTRACT["Extract values from array"]
ACCUMULATE["AggregateAccumulator::update"]
end
INPUT --> TRANSLATE
TRANSLATE --> EVAL_NUMERIC
EVAL_NUMERIC --> RESULT_ARRAY
RESULT_ARRAY --> EXTRACT
EXTRACT --> ACCUMULATE
The executor uses ensure_computed_projection to translate expression trees and infer result data types. This helper adds each expression to the scan projection list only once (via caching), avoiding redundant computation when multiple aggregates reference the same expression.
Simple vs Complex Column Extraction
The function try_extract_simple_column optimizes aggregate evaluation by detecting when an aggregate expression is equivalent to a direct column reference. This optimization allows the executor to skip expression evaluation machinery for common cases, reading column data directly from the column store.
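The idea behind this check can be sketched as a pattern match over the expression tree. The code below is a guess at the general shape, not the real function: the `ScalarExpr` variants are placeholders, the function name carries a `_sketch` suffix to mark it as hypothetical, and which rewrites the real helper treats as "equivalent to a column" is not stated in this document. Here a bare column and the additive identity (`col + 0`, `0 + col`) are assumed purely for illustration.

```rust
/// Placeholder expression tree over column names.
#[derive(Debug, Clone, PartialEq)]
pub enum ScalarExpr {
    Column(String),
    Literal(i64),
    Add(Box<ScalarExpr>, Box<ScalarExpr>),
    Multiply(Box<ScalarExpr>, Box<ScalarExpr>),
}

/// Returns the column name when the expression is just that column in disguise.
pub fn try_extract_simple_column_sketch(expr: &ScalarExpr) -> Option<&str> {
    match expr {
        ScalarExpr::Column(name) => Some(name.as_str()),
        // Assumed identity: adding a literal zero leaves the value unchanged.
        ScalarExpr::Add(left, right) => match (left.as_ref(), right.as_ref()) {
            (inner, ScalarExpr::Literal(0)) | (ScalarExpr::Literal(0), inner) => {
                try_extract_simple_column_sketch(inner)
            }
            _ => None,
        },
        // Anything else needs the full expression evaluator.
        _ => None,
    }
}
```

When the function returns `Some`, the caller can scan that column directly; `None` signals that the expression must go through the computed-projection path.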
Aggregate Result Types and Conversions
AggregateValue provides conversion methods to satisfy different consumer requirements:
Diagram: AggregateValue Type Conversions
graph TD
AGG_VALUE["AggregateValue"]
AS_I64["as_i64() → Option<i64>"]
AS_F64["as_f64() → Option<f64>"]
AGG_VALUE --> AS_I64
AGG_VALUE --> AS_F64
AS_I64 --> NULL_CHECK1{"Null?"}
NULL_CHECK1 -->|Yes| NONE1["None"]
NULL_CHECK1 -->|No| TYPE_CHECK1{"Type?"}
TYPE_CHECK1 -->|Int64| DIRECT_I64["Some(value)"]
TYPE_CHECK1 -->|Float64| TRUNC["Some(value as i64)"]
TYPE_CHECK1 -->|Decimal128| SCALE_DOWN["Some(value / 10^scale)"]
TYPE_CHECK1 -->|String| PARSE_I64["s.parse::<i64>().ok()"]
AS_F64 --> NULL_CHECK2{"Null?"}
NULL_CHECK2 -->|Yes| NONE2["None"]
NULL_CHECK2 -->|No| TYPE_CHECK2{"Type?"}
TYPE_CHECK2 -->|Int64| PROMOTE["Some(value as f64)"]
TYPE_CHECK2 -->|Float64| DIRECT_F64["Some(value)"]
TYPE_CHECK2 -->|Decimal128| DIVIDE["Some(value / 10.0^scale)"]
TYPE_CHECK2 -->|String| PARSE_F64["s.parse::<f64>().ok()"]
These conversions enable:
- ORDER BY: Converting aggregate results to sortable numeric types
- HAVING Filters: Evaluating post-aggregate predicates that compare aggregate values
- Nested Aggregates: Using one aggregate’s result in another’s computation (rare, but supported in computed projections)
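The conversion rules in the AggregateValue Type Conversions diagram translate almost directly into two match expressions. The sketch below follows the diagram branch by branch; the one deliberate deviation, labeled as an assumption, is that `as_i64` returns `None` for a negative decimal scale or a result that does not fit in `i64`, cases the diagram does not address.

```rust
/// Variants as listed for AggregateValue in this document.
#[derive(Debug, Clone, PartialEq)]
pub enum AggregateValue {
    Null,
    Int64(i64),
    Float64(f64),
    Decimal128 { value: i128, scale: i8 },
    String(String),
}

impl AggregateValue {
    /// Integer view of the value, following the as_i64 branch of the diagram.
    pub fn as_i64(&self) -> Option<i64> {
        match self {
            AggregateValue::Null => None,
            AggregateValue::Int64(v) => Some(*v),
            // `as` truncates toward zero (and saturates at the i64 bounds).
            AggregateValue::Float64(v) => Some(*v as i64),
            AggregateValue::Decimal128 { value, scale } => {
                // Assumption: negative scales and out-of-range results yield None.
                let divisor = 10i128.checked_pow(u32::try_from(*scale).ok()?)?;
                i64::try_from(*value / divisor).ok()
            }
            AggregateValue::String(s) => s.parse::<i64>().ok(),
        }
    }

    /// Floating-point view of the value, following the as_f64 branch.
    pub fn as_f64(&self) -> Option<f64> {
        match self {
            AggregateValue::Null => None,
            AggregateValue::Int64(v) => Some(*v as f64),
            AggregateValue::Float64(v) => Some(*v),
            AggregateValue::Decimal128 { value, scale } => {
                Some(*value as f64 / 10f64.powi(i32::from(*scale)))
            }
            AggregateValue::String(s) => s.parse::<f64>().ok(),
        }
    }
}
```

A HAVING predicate such as `SUM(x) > 10` can then call `as_f64` on whichever variant the accumulator produced and compare plain numbers.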
Group Key Representation
For grouped aggregations, the executor encodes group-by expressions into GroupKeyValue instances, which form composite keys in the group state map:
Diagram: Group Key Encoding
graph LR
EXPR["GROUP BY expression"]
EVAL["Evaluate for each row"]
subgraph "GroupKeyValue Variants"
NULL_VAL["Null"]
INT_VAL["Int(i64)"]
BOOL_VAL["Bool(bool)"]
STRING_VAL["String(String)"]
end
ENCODE["encode_row()\nVec<GroupKeyValue> → Vec<u8>"]
MAP["FxHashMap<Vec<u8>, GroupAggregateState>"]
EXPR --> EVAL
EVAL --> NULL_VAL
EVAL --> INT_VAL
EVAL --> BOOL_VAL
EVAL --> STRING_VAL
NULL_VAL --> ENCODE
INT_VAL --> ENCODE
BOOL_VAL --> ENCODE
STRING_VAL --> ENCODE
ENCODE --> MAP
The GroupAggregateState struct tracks which rows belong to each group through its representative_batch_idx, representative_row, and row_locations fields. This representation enables efficient accumulation: for each group, the executor iterates row_locations and passes those rows to the aggregate accumulators.
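The group key encoding and per-group state can be sketched together. The variant and field names below come from the diagrams in this document; everything else is an assumption made for illustration, including the field types, the tag-plus-payload byte layout in `encode_key` (a stand-in for `encode_row`), the `group_rows` driver, and the use of the standard library `HashMap` in place of `FxHashMap`.

```rust
use std::collections::HashMap;

/// Key component variants named in the Group Key Encoding diagram.
#[derive(Debug, Clone, PartialEq)]
pub enum GroupKeyValue {
    Null,
    Int(i64),
    Bool(bool),
    String(String),
}

/// Per-group bookkeeping (field types are assumptions).
#[derive(Debug, Clone, PartialEq)]
pub struct GroupAggregateState {
    pub representative_batch_idx: usize,
    pub representative_row: usize,
    /// (batch index, row index) of every row in the group.
    pub row_locations: Vec<(usize, usize)>,
}

/// Hypothetical byte layout: a tag per component, with strings length-prefixed
/// so that adjacent components cannot run together.
pub fn encode_key(key: &[GroupKeyValue]) -> Vec<u8> {
    let mut out = Vec::new();
    for part in key {
        match part {
            GroupKeyValue::Null => out.push(0),
            GroupKeyValue::Int(v) => {
                out.push(1);
                out.extend_from_slice(&v.to_be_bytes());
            }
            GroupKeyValue::Bool(b) => {
                out.push(2);
                out.push(u8::from(*b));
            }
            GroupKeyValue::String(s) => {
                out.push(3);
                out.extend_from_slice(&(s.len() as u64).to_be_bytes());
                out.extend_from_slice(s.as_bytes());
            }
        }
    }
    out
}

/// Assigns every row (given as its already-evaluated key) to a group.
pub fn group_rows(batches: &[Vec<Vec<GroupKeyValue>>]) -> HashMap<Vec<u8>, GroupAggregateState> {
    let mut groups: HashMap<Vec<u8>, GroupAggregateState> = HashMap::new();
    for (batch_idx, batch) in batches.iter().enumerate() {
        for (row_idx, key) in batch.iter().enumerate() {
            groups
                .entry(encode_key(key))
                // The first row seen for a key becomes the group's representative.
                .or_insert_with(|| GroupAggregateState {
                    representative_batch_idx: batch_idx,
                    representative_row: row_idx,
                    row_locations: Vec::new(),
                })
                .row_locations
                .push((batch_idx, row_idx));
        }
    }
    groups
}
```

The representative row gives the executor one place to read the group's key columns from when building output, while `row_locations` drives accumulation.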
Computed Aggregates in Projections
When a SELECT list includes computed expressions containing aggregate functions (e.g., SELECT COUNT(*) * 2, SUM(x) + AVG(y)), the executor uses execute_computed_aggregates:
Diagram: Computed Aggregate Flow
sequenceDiagram
participant Exec as QueryExecutor
participant Scan as Table Scan
participant Accum as Accumulators
participant Eval as NumericKernels
Exec->>Scan: Scan all rows
Scan-->>Exec: RecordBatch
Exec->>Exec: Identify aggregate calls\nin projections
loop For each aggregate
Exec->>Accum: create accumulator
Exec->>Accum: update(batch)
Accum-->>Exec: finalized value
end
Exec->>Exec: Inject aggregate values\nas synthetic columns
loop For each projection
Exec->>Eval: evaluate_numeric()\nwith synthetic columns
Eval-->>Exec: computed ArrayRef
end
Exec->>Exec: Construct final RecordBatch
This execution path:
- Scans the table once to collect all rows
- Evaluates aggregate functions to produce scalar values
- Injects those scalars into a temporary evaluation context as synthetic columns
- Evaluates the projection expressions referencing those synthetic columns
- Assembles the final result batch
This approach allows arbitrary nesting of aggregates within expressions while maintaining correctness.
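The two-phase idea (compute aggregates first, then evaluate the surrounding arithmetic against the results) can be sketched as follows. This is a deliberately reduced model, not the executor's code: all types and function names are hypothetical, every aggregate runs over a single NULL-free `f64` column, and a `HashMap` keyed by aggregate kind plays the role of the synthetic columns.

```rust
use std::collections::HashMap;

/// Aggregate kinds supported by this sketch, all over the same column.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Agg {
    CountStar,
    Sum,
    Avg,
}

/// Projection expression that may contain aggregates.
#[derive(Debug, Clone)]
pub enum Expr {
    Literal(f64),
    Aggregate(Agg),
    Add(Box<Expr>, Box<Expr>),
    Mul(Box<Expr>, Box<Expr>),
}

/// Phase 1a: find every aggregate call inside the projection.
fn collect_aggs(expr: &Expr, out: &mut Vec<Agg>) {
    match expr {
        Expr::Literal(_) => {}
        Expr::Aggregate(a) => out.push(*a),
        Expr::Add(l, r) | Expr::Mul(l, r) => {
            collect_aggs(l, out);
            collect_aggs(r, out);
        }
    }
}

/// Phase 1b: reduce the column to one scalar per aggregate (None means NULL).
fn compute(agg: Agg, column: &[f64]) -> Option<f64> {
    match agg {
        Agg::CountStar => Some(column.len() as f64),
        Agg::Sum if column.is_empty() => None,
        Agg::Sum => Some(column.iter().sum()),
        Agg::Avg if column.is_empty() => None,
        Agg::Avg => Some(column.iter().sum::<f64>() / column.len() as f64),
    }
}

/// Phase 2: evaluate the expression, reading aggregates from the synthetic map.
fn eval(expr: &Expr, synthetic: &HashMap<Agg, Option<f64>>) -> Option<f64> {
    match expr {
        Expr::Literal(v) => Some(*v),
        Expr::Aggregate(a) => synthetic.get(a).copied().flatten(),
        Expr::Add(l, r) => Some(eval(l, synthetic)? + eval(r, synthetic)?),
        Expr::Mul(l, r) => Some(eval(l, synthetic)? * eval(r, synthetic)?),
    }
}

/// Runs both phases; NULL propagates through arithmetic as in SQL.
pub fn execute_computed(expr: &Expr, column: &[f64]) -> Option<f64> {
    let mut aggs = Vec::new();
    collect_aggs(expr, &mut aggs);
    let synthetic: HashMap<Agg, Option<f64>> =
        aggs.into_iter().map(|a| (a, compute(a, column))).collect();
    eval(expr, &synthetic)
}
```

Because the aggregates are resolved before the arithmetic runs, the second phase is ordinary scalar evaluation over a single row of values.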
Performance Considerations
The aggregation system makes several trade-offs:
| Strategy | Benefit | Cost |
|---|---|---|
| AggregateAccumulator trait abstraction | Pluggable aggregate implementations | Indirect call overhead |
| Full batch materialization for GROUP BY | Simple implementation, works for any key type | High memory usage for large result sets |
| Hash-based DISTINCT tracking | Correct deduplication | Memory proportional to cardinality |
| Expression evaluation per row | Supports complex aggregates | Cannot leverage predicate pushdown |
| FxHashMap for grouping | Fast hashing for typical keys | Collision risk with adversarial inputs |
For aggregates over large datasets, consider:
- Predicate pushdown: Filter rows before aggregation
- Projection pruning: Only scan columns needed by aggregate expressions
- Index-assisted aggregation: Use indexes for MIN/MAX when possible (not currently implemented)