This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Aggregation System

The Aggregation System implements SQL aggregate functions (COUNT, SUM, AVG, MIN, MAX, etc.) across the LLKV query pipeline. It handles both simple aggregates (SELECT COUNT(*) FROM table) and grouped aggregations (SELECT col, SUM(amount) FROM table GROUP BY col), with support for the DISTINCT modifier and expression-based aggregates like SUM(col1 + col2).

For information about scalar expression evaluation (non-aggregate), see Scalar Evaluation and NumericKernels. For query planning that produces aggregate plans, see Plan Structures.


System Architecture

The aggregation system spans four layers in the codebase, each with distinct responsibilities:

Diagram: Aggregation System Layering

graph TB
    subgraph "Expression Layer (llkv-expr)"
        AGG_CALL["AggregateCall<F>\nCountStar, Count, Sum, Avg, Min, Max"]
        SCALAR_EXPR["ScalarExpr<F>\nAggregate(AggregateCall)"]
    end

    subgraph "Plan Layer (llkv-plan)"
        AGG_EXPR["AggregateExpr\nCountStar, Column"]
        AGG_FUNC["AggregateFunction\nCount, SumInt64, MinInt64, etc."]
        SELECT_PLAN["SelectPlan\naggregates: Vec<AggregateExpr>\ngroup_by: Vec<String>"]
    end

    subgraph "Execution Layer (llkv-executor)"
        EXECUTOR["QueryExecutor::execute_aggregates\nQueryExecutor::execute_group_by_single_table"]
        AGG_VALUE["AggregateValue\nNull, Int64, Float64, Decimal128, String"]
        GROUP_STATE["GroupAggregateState\nrepresentative_batch_idx\nrepresentative_row\nrow_locations"]
    end

    subgraph "Accumulator Layer (llkv-aggregate)"
        AGG_ACCUMULATOR["AggregateAccumulator\nInterface for aggregate computation"]
        AGG_KIND["AggregateKind\nType classification"]
        AGG_SPEC["AggregateSpec\nConfiguration"]
        AGG_STATE["AggregateState\nRuntime state"]
    end

    SCALAR_EXPR --> AGG_CALL
    SELECT_PLAN --> AGG_EXPR
    AGG_EXPR --> AGG_FUNC
    EXECUTOR --> AGG_VALUE
    EXECUTOR --> GROUP_STATE
    EXECUTOR --> AGG_ACCUMULATOR
    AGG_ACCUMULATOR --> AGG_KIND
    AGG_ACCUMULATOR --> AGG_SPEC
    AGG_ACCUMULATOR --> AGG_STATE

    AGG_CALL -.translates to.-> AGG_EXPR
    SELECT_PLAN -.executes via.-> EXECUTOR


Expression-Level Aggregates

Aggregate functions are represented in the expression AST via the AggregateCall<F> enum, which enables aggregates to appear within computed projections (e.g., COUNT(*) + 1 or SUM(col1) / AVG(col2)). Each variant captures the specific aggregate semantics:

| Variant | Description | Example SQL |
| --- | --- | --- |
| CountStar | Count all rows (including NULLs) | COUNT(*) |
| Count { expr, distinct } | Count non-NULL values of expression | COUNT(col), COUNT(DISTINCT col) |
| Sum { expr, distinct } | Sum numeric expression values | SUM(amount), SUM(DISTINCT col) |
| Total { expr, distinct } | Sum with NULL-to-zero coercion | TOTAL(amount) |
| Avg { expr, distinct } | Arithmetic mean of expression | AVG(price) |
| Min(expr) | Minimum value | MIN(created_at) |
| Max(expr) | Maximum value | MAX(score) |
| CountNulls(expr) | Count NULL occurrences | COUNT_NULLS(optional_field) |
| GroupConcat { expr, distinct, separator } | Concatenate strings | GROUP_CONCAT(name, ',') |

Each aggregate operates on a ScalarExpr<F>, not just a column name, which allows complex expressions like SUM(price * quantity) or AVG(col1 + col2).
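
To make the shape of this AST concrete, the following is a minimal sketch of how an aggregate can wrap an arbitrary scalar expression. The definitions are simplified stand-ins written for illustration: the variant set is reduced, and the helper names `sum_price_times_quantity`, `count_star_plus_one`, and `contains_aggregate` are hypothetical rather than taken from llkv-expr.

```rust
// Illustrative sketch only: simplified stand-ins, not the actual llkv-expr types.

/// A tiny scalar expression tree, generic over the field identifier `F`.
#[derive(Debug, Clone, PartialEq)]
pub enum ScalarExpr<F> {
    Column(F),
    Literal(i64),
    Add(Box<ScalarExpr<F>>, Box<ScalarExpr<F>>),
    Multiply(Box<ScalarExpr<F>>, Box<ScalarExpr<F>>),
    /// An aggregate call embedded inside a larger expression.
    Aggregate(Box<AggregateCall<F>>),
}

/// A reduced set of aggregate variants; each one carries a full expression.
#[derive(Debug, Clone, PartialEq)]
pub enum AggregateCall<F> {
    CountStar,
    Count { expr: ScalarExpr<F>, distinct: bool },
    Sum { expr: ScalarExpr<F>, distinct: bool },
    Min(ScalarExpr<F>),
    Max(ScalarExpr<F>),
}

/// Builds the tree for `SUM(price * quantity)`.
pub fn sum_price_times_quantity() -> ScalarExpr<String> {
    let product = ScalarExpr::Multiply(
        Box::new(ScalarExpr::Column("price".to_string())),
        Box::new(ScalarExpr::Column("quantity".to_string())),
    );
    ScalarExpr::Aggregate(Box::new(AggregateCall::Sum {
        expr: product,
        distinct: false,
    }))
}

/// Builds the tree for `COUNT(*) + 1`.
pub fn count_star_plus_one() -> ScalarExpr<String> {
    ScalarExpr::Add(
        Box::new(ScalarExpr::Aggregate(Box::new(AggregateCall::CountStar))),
        Box::new(ScalarExpr::Literal(1)),
    )
}

/// Returns true if an aggregate call appears anywhere in the expression.
pub fn contains_aggregate<F>(expr: &ScalarExpr<F>) -> bool {
    match expr {
        ScalarExpr::Aggregate(_) => true,
        ScalarExpr::Add(l, r) | ScalarExpr::Multiply(l, r) => {
            contains_aggregate(l) || contains_aggregate(r)
        }
        ScalarExpr::Column(_) | ScalarExpr::Literal(_) => false,
    }
}
```

Because the aggregate holds a `ScalarExpr` rather than a column name, the same tree type covers both `SUM(price * quantity)` and a projection such as `COUNT(*) + 1`.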


Plan-Level Representation

The query planner converts SQL aggregate syntax into AggregateExpr instances stored in SelectPlan::aggregates. The plan layer uses a simplified representation compared to the expression layer:

Diagram: Plan-Level Aggregate Structure

graph LR
    SELECT_PLAN["SelectPlan"]
    AGG_LIST["aggregates: Vec&lt;AggregateExpr&gt;"]
    GROUP_BY["group_by: Vec&lt;String&gt;"]

    SELECT_PLAN --> AGG_LIST
    SELECT_PLAN --> GROUP_BY

    AGG_LIST --> COUNT_STAR["AggregateExpr::CountStar\nalias: String\ndistinct: bool"]
    AGG_LIST --> COLUMN["AggregateExpr::Column\ncolumn: String\nalias: String\nfunction: AggregateFunction\ndistinct: bool"]
    COLUMN --> FUNC["AggregateFunction::\nCount, SumInt64, TotalInt64,\nMinInt64, MaxInt64,\nCountNulls, GroupConcat"]

The planner distinguishes between:

  • Non-grouped aggregates: an empty group_by vector, producing a single result row
  • Grouped aggregates: a populated group_by vector, producing one row per distinct group
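
A rough sketch of this plan shape follows. Field names follow the diagram above, but the types are reduced for illustration; the `is_grouped` method and the `grouped_sum_plan` constructor are hypothetical helpers, not part of llkv-plan.

```rust
// Hypothetical, simplified mirror of the plan-level types described above.

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum AggregateFunction {
    Count,
    SumInt64,
    MinInt64,
    MaxInt64,
}

#[derive(Debug, Clone, PartialEq)]
pub enum AggregateExpr {
    CountStar {
        alias: String,
        distinct: bool,
    },
    Column {
        column: String,
        alias: String,
        function: AggregateFunction,
        distinct: bool,
    },
}

#[derive(Debug, Default)]
pub struct SelectPlan {
    pub aggregates: Vec<AggregateExpr>,
    pub group_by: Vec<String>,
}

impl SelectPlan {
    /// Grouped plans have at least one GROUP BY entry (assumed helper).
    pub fn is_grouped(&self) -> bool {
        !self.group_by.is_empty()
    }
}

/// Plan for `SELECT region, SUM(amount) AS total FROM sales GROUP BY region`.
pub fn grouped_sum_plan() -> SelectPlan {
    SelectPlan {
        aggregates: vec![AggregateExpr::Column {
            column: "amount".to_string(),
            alias: "total".to_string(),
            function: AggregateFunction::SumInt64,
            distinct: false,
        }],
        group_by: vec!["region".to_string()],
    }
}
```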

Execution Strategy Selection

The executor chooses different code paths based on query structure, optimizing for common patterns:

Diagram: Aggregate Execution Decision Tree

graph TD
    START["QueryExecutor::execute_select"]
    CHECK_COMPOUND{"plan.compound.is_some()"}
    CHECK_EMPTY_TABLES{"plan.tables.is_empty()"}
    CHECK_GROUP_BY{"!plan.group_by.is_empty()"}
    CHECK_MULTI_TABLE{"plan.tables.len() > 1"}
    CHECK_AGGREGATES{"!plan.aggregates.is_empty()"}
    CHECK_COMPUTED{"has_computed_aggregates(&plan)"}

    START --> CHECK_COMPOUND
    CHECK_COMPOUND -->|Yes| COMPOUND["execute_compound_select"]
    CHECK_COMPOUND -->|No| CHECK_EMPTY_TABLES

    CHECK_EMPTY_TABLES -->|Yes| NO_TABLE["execute_select_without_table"]
    CHECK_EMPTY_TABLES -->|No| CHECK_GROUP_BY

    CHECK_GROUP_BY -->|Yes| CHECK_MULTI_TABLE
    CHECK_MULTI_TABLE -->|Multi| CROSS_PROD["execute_cross_product"]
    CHECK_MULTI_TABLE -->|Single| GROUP_BY_SINGLE["execute_group_by_single_table"]
    CHECK_GROUP_BY -->|No| CHECK_MULTI_TABLE_2{"plan.tables.len() > 1"}
    CHECK_MULTI_TABLE_2 -->|Yes| CROSS_PROD
    CHECK_MULTI_TABLE_2 -->|No| CHECK_AGGREGATES

    CHECK_AGGREGATES -->|Yes| EXEC_AGG["execute_aggregates"]
    CHECK_AGGREGATES -->|No| CHECK_COMPUTED

    CHECK_COMPUTED -->|Yes| COMPUTED_AGG["execute_computed_aggregates"]
    CHECK_COMPUTED -->|No| PROJECTION["execute_projection"]
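
The decision tree can be restated as a plain function. This is a sketch under assumptions: `PlanShape`, `ExecutionPath`, and `choose_path` are hypothetical names that only summarize the checks shown in the diagram, in the same order, and do not reproduce the executor's actual code.

```rust
// Hypothetical summary of the branch order in the decision tree above.

#[derive(Debug, PartialEq)]
pub enum ExecutionPath {
    CompoundSelect,
    SelectWithoutTable,
    CrossProduct,
    GroupBySingleTable,
    Aggregates,
    ComputedAggregates,
    Projection,
}

/// The plan properties the decision tree inspects (assumed flattening).
pub struct PlanShape {
    pub has_compound: bool,
    pub table_count: usize,
    pub has_group_by: bool,
    pub has_aggregates: bool,
    pub has_computed_aggregates: bool,
}

pub fn choose_path(plan: &PlanShape) -> ExecutionPath {
    if plan.has_compound {
        return ExecutionPath::CompoundSelect;
    }
    if plan.table_count == 0 {
        return ExecutionPath::SelectWithoutTable;
    }
    if plan.has_group_by {
        // Multi-table GROUP BY goes through the cross-product path.
        return if plan.table_count > 1 {
            ExecutionPath::CrossProduct
        } else {
            ExecutionPath::GroupBySingleTable
        };
    }
    if plan.table_count > 1 {
        return ExecutionPath::CrossProduct;
    }
    if plan.has_aggregates {
        return ExecutionPath::Aggregates;
    }
    if plan.has_computed_aggregates {
        return ExecutionPath::ComputedAggregates;
    }
    ExecutionPath::Projection
}
```

Under this reading, a single-table query with plain aggregates and no GROUP BY reaches `Aggregates`, while the same query with a GROUP BY clause reaches `GroupBySingleTable`.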

Non-Grouped Aggregate Execution

execute_aggregates processes queries without GROUP BY clauses. All rows are treated as a single group:

  1. Projection Planning: Build the ScanProjection list for columns needed by aggregate expressions
  2. Expression Translation: Convert ScalarExpr<String> to ScalarExpr<FieldId> using the table schema
  3. Data Streaming: Scan the table and accumulate values via AggregateAccumulator
  4. Result Assembly: Finalize accumulators and construct a single-row RecordBatch

Grouped Aggregate Execution

execute_group_by_single_table handles queries with GROUP BY clauses:

  1. Full Scan: Load all table rows into memory (required for grouping)
  2. Group Key Extraction: Evaluate GROUP BY expressions for each row, producing GroupKeyValue instances
  3. Group State Tracking: Build FxHashMap<Vec<GroupKeyValue>, GroupAggregateState> mapping group keys to row locations
  4. Per-Group Accumulation: For each group, process its rows through aggregate accumulators
  5. HAVING Filter: Apply the post-aggregation filter if present
  6. Result Construction: Build the output RecordBatch with one row per group
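
The steps above can be sketched for a query like `SELECT key, SUM(amount) FROM t GROUP BY key HAVING SUM(amount) > n`. This is a simplified illustration, not the executor's code: it uses the standard library `HashMap` in place of `FxHashMap`, plain tuples in place of record batches, and the function name `sum_by_group` is hypothetical.

```rust
use std::collections::HashMap;

/// Groups `(key, amount)` rows, sums each group, and keeps groups whose
/// total exceeds `having_min`. Output is sorted so results are deterministic.
pub fn sum_by_group(rows: &[(&str, i64)], having_min: i64) -> Vec<(String, i64)> {
    // Steps 2-3: map each group key to the indices of the rows it contains.
    let mut groups: HashMap<String, Vec<usize>> = HashMap::new();
    for (idx, (key, _)) in rows.iter().enumerate() {
        groups.entry((*key).to_string()).or_default().push(idx);
    }

    let mut out: Vec<(String, i64)> = groups
        .into_iter()
        // Step 4: accumulate over the rows that belong to each group.
        .map(|(key, locations)| {
            let total: i64 = locations.iter().map(|&i| rows[i].1).sum();
            (key, total)
        })
        // Step 5: HAVING runs after aggregation, on the aggregate value.
        .filter(|(_, total)| *total > having_min)
        .collect();

    // Step 6: one output row per surviving group.
    out.sort();
    out
}
```

For the rows `("east", 10)`, `("west", 5)`, `("east", 7)`, `("north", 1)` and a threshold of 4, the sketch keeps `east` with 17 and `west` with 5, and drops `north`.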


Accumulator Interface

The llkv-aggregate crate (imported at llkv-executor/src/lib.rs:19) provides the AggregateAccumulator trait, which abstracts the computation logic for individual aggregate functions. Each accumulator maintains incremental state as it processes rows:

Diagram: Accumulator Lifecycle

sequenceDiagram
    participant Executor
    participant Accumulator as AggregateAccumulator
    participant State as AggregateState
    
    Executor->>Accumulator: new(AggregateSpec)
    Accumulator->>State: initialize()
    
    loop For each batch
        Executor->>Accumulator: update(batch, row_indices)
        Accumulator->>State: accumulate values
    end
    
    Executor->>Accumulator: finalize()
    Accumulator->>State: compute final value
    Accumulator-->>Executor: AggregateValue
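
The lifecycle in the diagram (create, update per batch, finalize) can be illustrated with a small trait. The trait name `Accumulator`, its method signatures, and the two implementations below are assumptions made for this sketch; the real AggregateAccumulator interface in llkv-aggregate may differ.

```rust
// Hypothetical accumulator interface; not the llkv-aggregate API.
pub trait Accumulator {
    /// Folds one batch of nullable values into the running state.
    fn update(&mut self, values: &[Option<i64>]);
    /// Produces the final value; `None` stands for SQL NULL.
    fn finalize(&self) -> Option<i64>;
}

/// SUM over integers: NULL inputs are skipped, and the result is NULL
/// when no non-NULL value was seen. Overflow handling is omitted here.
#[derive(Default)]
pub struct SumAccumulator {
    total: i64,
    seen_non_null: bool,
}

impl Accumulator for SumAccumulator {
    fn update(&mut self, values: &[Option<i64>]) {
        for v in values.iter().flatten() {
            self.total += v;
            self.seen_non_null = true;
        }
    }

    fn finalize(&self) -> Option<i64> {
        if self.seen_non_null {
            Some(self.total)
        } else {
            None
        }
    }
}

/// COUNT(col): counts non-NULL values and never returns NULL.
#[derive(Default)]
pub struct CountAccumulator {
    count: i64,
}

impl Accumulator for CountAccumulator {
    fn update(&mut self, values: &[Option<i64>]) {
        self.count += values.iter().flatten().count() as i64;
    }

    fn finalize(&self) -> Option<i64> {
        Some(self.count)
    }
}

/// Drives an accumulator across batches, mirroring the loop in the diagram.
pub fn run<A: Accumulator>(mut acc: A, batches: &[Vec<Option<i64>>]) -> Option<i64> {
    for batch in batches {
        acc.update(batch);
    }
    acc.finalize()
}
```

Keeping the state incremental means the driver never needs more than one batch in memory at a time for non-grouped aggregates.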

The executor wraps accumulator results in AggregateValue, which handles type conversions between the accumulator’s output type and the plan’s expected type:

| AggregateValue Variant | Usage |
| --- | --- |
| Null | No rows matched, or all values were NULL |
| Int64(i64) | Integer aggregates (COUNT, SUM for integers) |
| Float64(f64) | Floating-point aggregates (AVG, SUM for floats) |
| Decimal128 { value: i128, scale: i8 } | Precise decimal aggregates |
| String(String) | String aggregates (GROUP_CONCAT) |


Distinct Value Tracking

When an aggregate includes the DISTINCT modifier (e.g., COUNT(DISTINCT col)), the executor must deduplicate values before accumulation. This is handled via hash-based tracking:

Diagram: DISTINCT Aggregate Processing

graph TD
    START["Aggregate with distinct=true"]
    INIT["Initialize FxHashSet&lt;Vec&lt;u8&gt;&gt;\nfor distinct tracking"]
    LOOP_START["For each input row"]
    EXTRACT["Extract aggregate expression value"]
    ENCODE["Encode value as byte vector\nusing encode_row()"]
    CHECK_SEEN{"Value already\nin set?"}
    SKIP["Skip row\n(duplicate)"]
    INSERT["Insert into set"]
    ACCUMULATE["Pass to accumulator"]
    LOOP_END["Next row"]
    FINALIZE["Finalize accumulator"]

    START --> INIT
    INIT --> LOOP_START
    LOOP_START --> EXTRACT
    EXTRACT --> ENCODE
    ENCODE --> CHECK_SEEN
    CHECK_SEEN -->|Yes| SKIP
    CHECK_SEEN -->|No| INSERT
    INSERT --> ACCUMULATE
    SKIP --> LOOP_END
    ACCUMULATE --> LOOP_END
    LOOP_END --> LOOP_START
    LOOP_END -.all rows.-> FINALIZE

The encode_row function (referenced throughout llkv-executor/src/lib.rs) converts values to a canonical byte representation suitable for hash-based deduplication.
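
As a rough illustration of this deduplication loop, the sketch below counts distinct non-NULL integers. It assumes a standard library `HashSet` in place of `FxHashSet`, and `encode_value` is a hypothetical stand-in for `encode_row`, whose real encoding is not shown here.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for `encode_row`: a type tag byte followed by the
/// big-endian bytes of the value, so equal values yield equal byte vectors.
fn encode_value(value: i64) -> Vec<u8> {
    let mut bytes = vec![1u8];
    bytes.extend_from_slice(&value.to_be_bytes());
    bytes
}

/// Sketch of `COUNT(DISTINCT col)` over nullable integers.
pub fn count_distinct(values: &[Option<i64>]) -> i64 {
    let mut seen: HashSet<Vec<u8>> = HashSet::new();
    let mut count = 0;
    for value in values.iter().flatten() {
        // `insert` returns false when the encoded value is already present,
        // which corresponds to the "skip row (duplicate)" branch.
        if seen.insert(encode_value(*value)) {
            count += 1;
        }
    }
    count
}
```

The set grows with the number of distinct values, which is why memory use for DISTINCT aggregates scales with column cardinality rather than row count.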


Expression-Based Aggregates

Unlike simple column aggregates, expression-based aggregates (e.g., SUM(col1 * col2) or AVG(CASE WHEN x > 0 THEN x ELSE 0 END)) require evaluating the expression for each row before accumulating:

Diagram: Expression Aggregate Evaluation

graph LR
    INPUT["Input Batch"]

    subgraph "Expression Evaluation"
        TRANSLATE["translate_scalar\n(String → FieldId)"]
        EVAL_NUMERIC["NumericKernels::evaluate_numeric"]
        RESULT_ARRAY["Computed ArrayRef"]
    end

    subgraph "Accumulation"
        EXTRACT["Extract values from array"]
        ACCUMULATE["AggregateAccumulator::update"]
    end

    INPUT --> TRANSLATE
    TRANSLATE --> EVAL_NUMERIC
    EVAL_NUMERIC --> RESULT_ARRAY
    RESULT_ARRAY --> EXTRACT
    EXTRACT --> ACCUMULATE

The executor uses ensure_computed_projection to translate expression trees and infer result data types.

This helper ensures the expression is added to the scan projection list only once (via caching), avoiding redundant computation when multiple aggregates reference the same expression.
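
The caching behavior can be sketched as a lookup keyed by the expression. Everything here is an assumption made for illustration: the real helper's signature and cache key are not shown in this document, so `ProjectionCache`, its `ensure` method, and the use of the expression text as the key are hypothetical.

```rust
use std::collections::HashMap;

/// Hypothetical projection list with a cache keyed by expression text.
#[derive(Default)]
pub struct ProjectionCache {
    projections: Vec<String>,
    index_by_expr: HashMap<String, usize>,
}

impl ProjectionCache {
    /// Returns the projection index for `expr`, adding it only on first use.
    pub fn ensure(&mut self, expr: &str) -> usize {
        if let Some(&idx) = self.index_by_expr.get(expr) {
            return idx;
        }
        let idx = self.projections.len();
        self.projections.push(expr.to_string());
        self.index_by_expr.insert(expr.to_string(), idx);
        idx
    }

    /// Number of distinct expressions registered so far.
    pub fn len(&self) -> usize {
        self.projections.len()
    }
}
```

With this shape, `SUM(price * quantity)` and `AVG(price * quantity)` in the same query would share one computed projection instead of evaluating the product twice.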


Simple vs Complex Column Extraction

The function try_extract_simple_column optimizes aggregate evaluation by detecting when an aggregate expression is equivalent to a direct column reference.

This optimization allows the executor to skip expression evaluation machinery for common cases, reading column data directly from the column store.
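
One way such a check could look is sketched below. Which equivalences the real function recognizes is not stated here, so treating `col + 0` and `0 + col` as column-equivalent is purely an assumption for illustration, and the `Expr` type and `extract_simple_column` name are hypothetical.

```rust
// Hypothetical expression type and check; not the executor's implementation.
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    Column(String),
    Literal(i64),
    Add(Box<Expr>, Box<Expr>),
}

/// Returns the column name when `expr` reduces to a bare column reference.
pub fn extract_simple_column(expr: &Expr) -> Option<&str> {
    match expr {
        Expr::Column(name) => Some(name.as_str()),
        // Assumed identity rewrite: adding zero does not change the value.
        Expr::Add(left, right) => match (left.as_ref(), right.as_ref()) {
            (inner, Expr::Literal(0)) | (Expr::Literal(0), inner) => {
                extract_simple_column(inner)
            }
            _ => None,
        },
        Expr::Literal(_) => None,
    }
}
```

A caller would take the direct column-read path when this returns `Some`, and fall back to full expression evaluation otherwise.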


Aggregate Result Types and Conversions

AggregateValue provides conversion methods to satisfy different consumer requirements:

Diagram: AggregateValue Type Conversions

graph TD
    AGG_VALUE["AggregateValue"]
    AS_I64["as_i64() → Option&lt;i64&gt;"]
    AS_F64["as_f64() → Option&lt;f64&gt;"]

    AGG_VALUE --> AS_I64
    AGG_VALUE --> AS_F64

    AS_I64 --> NULL_CHECK1{"Null?"}
    NULL_CHECK1 -->|Yes| NONE1["None"]
    NULL_CHECK1 -->|No| TYPE_CHECK1{"Type?"}
    TYPE_CHECK1 -->|Int64| DIRECT_I64["Some(value)"]
    TYPE_CHECK1 -->|Float64| TRUNC["Some(value as i64)"]
    TYPE_CHECK1 -->|Decimal128| SCALE_DOWN["Some(value / 10^scale)"]
    TYPE_CHECK1 -->|String| PARSE_I64["s.parse::&lt;i64&gt;().ok()"]

    AS_F64 --> NULL_CHECK2{"Null?"}
    NULL_CHECK2 -->|Yes| NONE2["None"]
    NULL_CHECK2 -->|No| TYPE_CHECK2{"Type?"}
    TYPE_CHECK2 -->|Int64| PROMOTE["Some(value as f64)"]
    TYPE_CHECK2 -->|Float64| DIRECT_F64["Some(value)"]
    TYPE_CHECK2 -->|Decimal128| DIVIDE["Some(value / 10.0^scale)"]
    TYPE_CHECK2 -->|String| PARSE_F64["s.parse::&lt;f64&gt;().ok()"]

These conversions enable:

  • ORDER BY: Converting aggregate results to sortable numeric types
  • HAVING Filters: Evaluating post-aggregate predicates that compare aggregate values
  • Nested Aggregates: Using one aggregate’s result in another’s computation (rare, but supported in computed projections)
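
The conversion rules for AggregateValue can be written out as a small sketch. The variant shapes follow the table of variants given earlier; the method bodies are an interpretation of the conversion rules rather than the executor's code. In particular, returning `None` for a negative decimal scale or an out-of-range integer result is an assumption of this sketch.

```rust
// Sketch of the conversions; behavior on edge cases is assumed, not sourced.
#[derive(Debug, Clone, PartialEq)]
pub enum AggregateValue {
    Null,
    Int64(i64),
    Float64(f64),
    Decimal128 { value: i128, scale: i8 },
    String(String),
}

impl AggregateValue {
    pub fn as_i64(&self) -> Option<i64> {
        match self {
            AggregateValue::Null => None,
            AggregateValue::Int64(v) => Some(*v),
            // `as` truncates toward zero and saturates at the i64 bounds.
            AggregateValue::Float64(v) => Some(*v as i64),
            AggregateValue::Decimal128 { value, scale } => {
                // Assumption: negative scales are not supported here.
                let exponent = u32::try_from(*scale).ok()?;
                let divisor = 10_i128.checked_pow(exponent)?;
                i64::try_from(value / divisor).ok()
            }
            AggregateValue::String(s) => s.parse::<i64>().ok(),
        }
    }

    pub fn as_f64(&self) -> Option<f64> {
        match self {
            AggregateValue::Null => None,
            AggregateValue::Int64(v) => Some(*v as f64),
            AggregateValue::Float64(v) => Some(*v),
            AggregateValue::Decimal128 { value, scale } => {
                Some(*value as f64 / 10f64.powi(i32::from(*scale)))
            }
            AggregateValue::String(s) => s.parse::<f64>().ok(),
        }
    }
}
```

A HAVING predicate such as `SUM(amount) > 100` could then compare `as_f64()` results without caring which variant the accumulator produced.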

Group Key Representation

For grouped aggregations, the executor encodes group-by expressions into GroupKeyValue instances, which form composite keys in the group state map:

Diagram: Group Key Encoding

graph LR
    EXPR["GROUP BY expression"]
    EVAL["Evaluate for each row"]

    subgraph "GroupKeyValue Variants"
        NULL_VAL["Null"]
        INT_VAL["Int(i64)"]
        BOOL_VAL["Bool(bool)"]
        STRING_VAL["String(String)"]
    end

    ENCODE["encode_row()\nVec&lt;GroupKeyValue&gt; → Vec&lt;u8&gt;"]
    MAP["FxHashMap&lt;Vec&lt;u8&gt;, GroupAggregateState&gt;"]

    EXPR --> EVAL
    EVAL --> NULL_VAL
    EVAL --> INT_VAL
    EVAL --> BOOL_VAL
    EVAL --> STRING_VAL

    NULL_VAL --> ENCODE
    INT_VAL --> ENCODE
    BOOL_VAL --> ENCODE
    STRING_VAL --> ENCODE

    ENCODE --> MAP

The GroupAggregateState struct tracks which rows belong to each group through its representative_batch_idx, representative_row, and row_locations fields.

This representation enables efficient accumulation: for each group, the executor iterates row_locations and passes those rows to the aggregate accumulators.
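
A minimal sketch of this bookkeeping follows. The field names come from the layering diagram near the top of this page, but their types (`usize` indices and `(batch, row)` pairs) are assumptions, as is the `build_groups` helper. The sketch keys the map directly by `Vec<GroupKeyValue>` using the standard library `HashMap`, and handles a single GROUP BY column for brevity.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum GroupKeyValue {
    Null,
    Int(i64),
    Bool(bool),
    String(String),
}

/// Field names follow the diagram; the types are assumed for this sketch.
#[derive(Debug)]
pub struct GroupAggregateState {
    /// Location of the first row seen for the group.
    pub representative_batch_idx: usize,
    pub representative_row: usize,
    /// Every `(batch index, row index)` pair that belongs to the group.
    pub row_locations: Vec<(usize, usize)>,
}

/// Builds per-group state from batches of single-column group keys.
pub fn build_groups(
    batches: &[Vec<GroupKeyValue>],
) -> HashMap<Vec<GroupKeyValue>, GroupAggregateState> {
    let mut groups = HashMap::new();
    for (batch_idx, batch) in batches.iter().enumerate() {
        for (row_idx, key) in batch.iter().enumerate() {
            groups
                .entry(vec![key.clone()])
                .or_insert_with(|| GroupAggregateState {
                    representative_batch_idx: batch_idx,
                    representative_row: row_idx,
                    row_locations: Vec::new(),
                })
                .row_locations
                .push((batch_idx, row_idx));
        }
    }
    groups
}
```

The representative row gives the executor one concrete row from which to read the group's key columns when building the output, while `row_locations` drives accumulation.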


Computed Aggregates in Projections

When a SELECT list includes computed expressions containing aggregate functions (e.g., SELECT COUNT(*) * 2, SUM(x) + AVG(y)), the executor uses execute_computed_aggregates:

Diagram: Computed Aggregate Flow

sequenceDiagram
    participant Exec as QueryExecutor
    participant Scan as Table Scan
    participant Accum as Accumulators
    participant Eval as NumericKernels

    Exec->>Scan: Scan all rows
    Scan-->>Exec: RecordBatch

    Exec->>Exec: Identify aggregate calls\nin projections

    loop For each aggregate
        Exec->>Accum: create accumulator
        Exec->>Accum: update(batch)
        Accum-->>Exec: finalized value
    end

    Exec->>Exec: Inject aggregate values\nas synthetic columns

    loop For each projection
        Exec->>Eval: evaluate_numeric()\nwith synthetic columns
        Eval-->>Exec: computed ArrayRef
    end

    Exec->>Exec: Construct final RecordBatch

This execution path:

  1. Scans the table once to collect all rows
  2. Evaluates aggregate functions to produce scalar values
  3. Injects those scalars into a temporary evaluation context as synthetic columns
  4. Evaluates the projection expressions referencing those synthetic columns
  5. Assembles the final result batch

This approach allows arbitrary nesting of aggregates within expressions while maintaining correctness.
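
The synthetic-column idea can be illustrated with a toy evaluator for `COUNT(*) * 2 + SUM(x)`. All names here (`Projection`, `evaluate`, the `__agg_N` column names) are hypothetical, and the sketch uses `f64` scalars rather than Arrow arrays. It also does not model SQL NULL for SUM over an empty input.

```rust
use std::collections::HashMap;

/// Projection expression in which aggregates were replaced by references
/// to synthetic columns (hypothetical representation).
pub enum Projection {
    Synthetic(String),
    Literal(f64),
    Add(Box<Projection>, Box<Projection>),
    Multiply(Box<Projection>, Box<Projection>),
}

/// Evaluates a projection against precomputed aggregate values.
/// Returns `None` if a referenced synthetic column is missing.
pub fn evaluate(expr: &Projection, synthetic: &HashMap<String, f64>) -> Option<f64> {
    match expr {
        Projection::Synthetic(name) => synthetic.get(name).copied(),
        Projection::Literal(v) => Some(*v),
        Projection::Add(l, r) => Some(evaluate(l, synthetic)? + evaluate(r, synthetic)?),
        Projection::Multiply(l, r) => Some(evaluate(l, synthetic)? * evaluate(r, synthetic)?),
    }
}

/// Evaluates `COUNT(*) * 2 + SUM(x)` over the values of `x`.
pub fn count_times_two_plus_sum(xs: &[f64]) -> Option<f64> {
    // Steps 1-3: compute each aggregate once and store it under a synthetic name.
    let mut synthetic = HashMap::new();
    synthetic.insert("__agg_0".to_string(), xs.len() as f64);
    synthetic.insert("__agg_1".to_string(), xs.iter().sum::<f64>());

    // Step 4: the projection now refers only to synthetic columns and literals.
    let expr = Projection::Add(
        Box::new(Projection::Multiply(
            Box::new(Projection::Synthetic("__agg_0".to_string())),
            Box::new(Projection::Literal(2.0)),
        )),
        Box::new(Projection::Synthetic("__agg_1".to_string())),
    );
    evaluate(&expr, &synthetic)
}
```

For `x` values 1, 2, and 3 the sketch computes 3 * 2 + 6. Separating aggregate evaluation from expression evaluation lets the ordinary scalar evaluator handle the surrounding arithmetic unchanged.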


Performance Considerations

The aggregation system makes several trade-offs:

| Strategy | Benefit | Cost |
| --- | --- | --- |
| AggregateAccumulator trait abstraction | Pluggable aggregate implementations | Indirect call overhead |
| Full batch materialization for GROUP BY | Simple implementation, works for any key type | High memory usage for large tables |
| Hash-based DISTINCT tracking | Correct deduplication | Memory proportional to cardinality |
| Expression evaluation per row | Supports complex aggregates | Cannot leverage predicate pushdown |
| FxHashMap for grouping | Fast hashing for typical keys | Collision risk with adversarial inputs |

For aggregates over large datasets, consider:

  • Predicate pushdown: Filter rows before aggregation
  • Projection pruning: Only scan columns needed by aggregate expressions
  • Index-assisted aggregation: Use indexes for MIN/MAX when possible (not currently implemented)
