This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Aggregation System

The aggregation system evaluates SQL aggregate functions (COUNT, SUM, AVG, MIN, MAX, etc.) over Arrow RecordBatch streams. It consists of a planning layer that defines aggregate specifications and an execution layer that performs incremental accumulation with overflow checking and DISTINCT value tracking.

For information about scalar expression evaluation, see Scalar Evaluation and NumericKernels. For query execution orchestration, see Query Execution.

Architecture Overview

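The aggregation system spans three crates: llkv-plan, where the planner defines AggregateExpr; llkv-executor, which converts plans into AggregateSpec descriptors and drives execution (including GROUP BY); and llkv-aggregate, which defines AggregateSpec and the AggregateAccumulator types that perform the accumulation.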

Sources: llkv-aggregate/src/lib.rs:1-1935 llkv-executor/src/lib.rs:1-3599 llkv-plan/src/plans.rs:1-1458

The planner creates AggregateExpr instances from SQL AST nodes, which the executor converts to AggregateSpec descriptors. These specs initialize AggregateAccumulator instances that process batches incrementally, accumulating values in memory. The AggregateState wraps the accumulator with metadata (alias, override values) and produces the final output arrays.

Aggregate Specification

AggregateSpec Structure

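AggregateSpec defines an aggregate operation at plan time:

+-------+---------------+------------------------------------------------+
| Field | Type          | Purpose                                        |
+-------+---------------+------------------------------------------------+
| alias | String        | Output column name for the aggregate result    |
| kind  | AggregateKind | Type of aggregate operation and its parameters |
+-------+---------------+------------------------------------------------+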

Sources: llkv-aggregate/src/lib.rs:23-27

AggregateKind Variants

Sources: llkv-aggregate/src/lib.rs:30-67

Each variant captures the field ID to aggregate over, the expected data type, and operation-specific flags such as distinct or separator. The field_id is optional for COUNT(*), which counts all rows regardless of column values.
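A simplified Rust model of this shape may help. It is a sketch only. The FieldId alias, the DataKind enum, and the exact variant layout are assumptions made for illustration, and they do not reproduce the real definitions in llkv-aggregate/src/lib.rs.

```rust
/// Hypothetical stand-in for the engine's field identifier type.
pub type FieldId = u32;

/// Hypothetical, reduced set of input types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataKind {
    Int64,
    Float64,
    Utf8,
}

/// Illustrative aggregate kinds; the real AggregateKind has more variants.
#[derive(Debug, Clone, PartialEq)]
pub enum AggregateKind {
    /// COUNT(*) when field_id is None, COUNT(col) otherwise.
    Count { field_id: Option<FieldId>, distinct: bool },
    Sum { field_id: FieldId, data_type: DataKind, distinct: bool },
    Avg { field_id: FieldId, data_type: DataKind, distinct: bool },
    Min { field_id: FieldId, data_type: DataKind },
    Max { field_id: FieldId, data_type: DataKind },
    GroupConcat { field_id: FieldId, distinct: bool, separator: String },
}

/// Plan-time description of one aggregate: output name plus operation.
#[derive(Debug, Clone, PartialEq)]
pub struct AggregateSpec {
    pub alias: String,
    pub kind: AggregateKind,
}

impl AggregateSpec {
    /// Returns the input field, or None for COUNT(*).
    pub fn field_id(&self) -> Option<FieldId> {
        match &self.kind {
            AggregateKind::Count { field_id, .. } => *field_id,
            AggregateKind::Sum { field_id, .. }
            | AggregateKind::Avg { field_id, .. }
            | AggregateKind::Min { field_id, .. }
            | AggregateKind::Max { field_id, .. }
            | AggregateKind::GroupConcat { field_id, .. } => Some(*field_id),
        }
    }
}
```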

Accumulator System

Accumulator Variants

AggregateAccumulator implements streaming accumulation for each aggregate type:

Sources: llkv-aggregate/src/lib.rs:92-247

graph TB
    subgraph "COUNT Variants"
        CS[CountStar\nvalue: i64]
        CC[CountColumn\ncolumn_index: usize\nvalue: i64]
        CDC[CountDistinctColumn\ncolumn_index: usize\nseen: FxHashSet]
    end
    
    subgraph "SUM Variants"
        SI64[SumInt64\nvalue: Option-i64-\nhas_values: bool]
        SDI64[SumDistinctInt64\nsum: Option-i64-\nseen: FxHashSet]
        SF64[SumFloat64\nvalue: f64\nsaw_value: bool]
        SDF64[SumDistinctFloat64\nsum: f64\nseen: FxHashSet]
        SD128[SumDecimal128\nsum: i128\nprecision: u8\nscale: i8]
    end
    
    subgraph "AVG Variants"
        AI64[AvgInt64\nsum: i64\ncount: i64]
        ADI64[AvgDistinctInt64\nsum: i64\nseen: FxHashSet]
        AF64[AvgFloat64\nsum: f64\ncount: i64]
    end
    
    subgraph "MIN/MAX Variants"
        MinI64[MinInt64\nvalue: Option-i64-]
        MaxI64[MaxInt64\nvalue: Option-i64-]
        MinF64[MinFloat64\nvalue: Option-f64-]
        MaxF64[MaxFloat64\nvalue: Option-f64-]
    end

Each accumulator variant is specialized for its data type and operation semantics. Integer accumulators track overflow using Option<i64> (None indicates overflow). Float accumulators use f64, which never raises an overflow error: out-of-range sums become ±infinity instead. Distinct variants maintain an FxHashSet of seen values.

Accumulator Lifecycle

sequenceDiagram
    participant Executor
    participant AggregateSpec
    participant AggregateAccumulator
    participant RecordBatch
    participant OutputArray

    Executor->>AggregateSpec: new_with_projection_index()
    AggregateSpec->>AggregateAccumulator: Create accumulator

    loop For each batch
        Executor->>RecordBatch: Stream next batch
        RecordBatch->>AggregateAccumulator: update(batch)
        Note over AggregateAccumulator: Accumulate values\nCheck overflow\nTrack distinct keys
    end

    Executor->>AggregateAccumulator: finalize()
    AggregateAccumulator->>OutputArray: (Field, ArrayRef)
    OutputArray->>Executor: Return result

Sources: llkv-aggregate/src/lib.rs:460-746 llkv-aggregate/src/lib.rs:748-1440 llkv-aggregate/src/lib.rs:1442-1934

The accumulator is initialized with a projection index indicating which column in the RecordBatch to aggregate. The update() method processes each batch incrementally, and finalize() produces the final Arrow array and field schema.
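The sketch below walks through the same lifecycle for the two COUNT accumulators in plain Rust. To stay self-contained, it replaces Arrow with a hypothetical Batch type (columns of Option<i64>). It also returns an i64 instead of a (Field, ArrayRef) pair. CountAccumulator, Batch, and new() are illustrative names, not llkv APIs.

```rust
use std::convert::TryFrom;

/// Hypothetical stand-in for an Arrow RecordBatch: columns of nullable i64.
pub struct Batch {
    pub columns: Vec<Vec<Option<i64>>>,
}

impl Batch {
    pub fn num_rows(&self) -> usize {
        self.columns.first().map_or(0, |c| c.len())
    }
}

/// Simplified mirrors of the CountStar and CountColumn accumulators.
pub enum CountAccumulator {
    CountStar { value: i64 },
    CountColumn { column_index: usize, value: i64 },
}

impl CountAccumulator {
    /// Initialization from a projection index (None models COUNT(*)).
    pub fn new(projection_index: Option<usize>) -> Self {
        match projection_index {
            None => CountAccumulator::CountStar { value: 0 },
            Some(column_index) => CountAccumulator::CountColumn { column_index, value: 0 },
        }
    }

    /// Folds one batch into the running count, failing if i64 would overflow.
    pub fn update(&mut self, batch: &Batch) -> Result<(), String> {
        let (value, delta) = match self {
            // COUNT(*): every row counts.
            CountAccumulator::CountStar { value } => (value, batch.num_rows()),
            // COUNT(col): only non-NULL rows count (the analogue of array.is_valid(i)).
            CountAccumulator::CountColumn { column_index, value } => {
                let valid = batch.columns[*column_index]
                    .iter()
                    .filter(|v| v.is_some())
                    .count();
                (value, valid)
            }
        };
        let delta = i64::try_from(delta).map_err(|_| "row count exceeds i64".to_string())?;
        *value = value
            .checked_add(delta)
            .ok_or_else(|| "COUNT overflow".to_string())?;
        Ok(())
    }

    /// Produces the final count; the real accumulator emits an Arrow array.
    pub fn finalize(&self) -> i64 {
        match self {
            CountAccumulator::CountStar { value }
            | CountAccumulator::CountColumn { value, .. } => *value,
        }
    }
}
```

If the same three-row batch [1, NULL, 3] is fed in twice, the result is 6 for COUNT(*) and 4 for COUNT(column).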

Distinct Value Tracking

DistinctKey Enumeration

The system tracks distinct values using a hash-based approach:

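+---------------+-------------------+--------------------------+
| Variant       | Holds             | Purpose                  |
+---------------+-------------------+--------------------------+
| Int(i64)      | Integer values    | Exact integer comparison |
| Float(u64)    | Float bit pattern | Bitwise float equality   |
| Str(String)   | String values     | Text comparison          |
| Bool(bool)    | Boolean values    | True/false comparison    |
| Date(i32)     | Date32 values     | Date comparison          |
| Decimal(i128) | Decimal raw value | Exact decimal comparison |
+---------------+-------------------+--------------------------+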

Sources: llkv-aggregate/src/lib.rs:249-257 llkv-aggregate/src/lib.rs:259-333

f64 implements neither Eq nor Hash, so float values are keyed by their bit patterns (f64::to_bits()). This lets NaN and infinities be hashed and deduplicated like any other value. Decimal values use the raw i128 representation for exact comparison without scale conversion.
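Here is a minimal sketch of bit-pattern keys. It assumes a reduced DistinctKey with three variants and uses std's HashSet in place of FxHashSet, which comes from the rustc-hash crate. The sketch exposes one consequence of purely bitwise equality: two NaNs with identical bits collapse to one key, while 0.0 and -0.0 stay distinct. Whether llkv normalizes such cases is not covered on this page.

```rust
use std::collections::HashSet;

/// Reduced mirror of the DistinctKey idea (only three variants).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DistinctKey {
    Int(i64),
    /// f64 is neither Eq nor Hash, so the key stores the raw bit pattern.
    Float(u64),
    Str(String),
}

impl DistinctKey {
    pub fn from_f64(v: f64) -> Self {
        DistinctKey::Float(v.to_bits())
    }
}

/// Counts distinct non-NULL floats by bit pattern.
pub fn count_distinct_floats(values: &[Option<f64>]) -> usize {
    let mut seen: HashSet<DistinctKey> = HashSet::new();
    for v in values.iter().flatten() {
        seen.insert(DistinctKey::from_f64(*v));
    }
    seen.len()
}
```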

Distinct Accumulation Example

For COUNT(DISTINCT column), the accumulator inserts each non-null value into the hash set:

Sources: llkv-aggregate/src/lib.rs:785-798 llkv-aggregate/src/lib.rs:1465-1473

graph LR
    Batch1[RecordBatch 1\nvalues: 1,2,3]
    Batch2[RecordBatch 2\nvalues: 2,3,4]
    Batch3[RecordBatch 3\nvalues: 1,4,5]

    Batch1 --> HS1["seen: {1,2,3}"]
    Batch2 --> HS2["seen: {1,2,3,4}"]
    Batch3 --> HS3["seen: {1,2,3,4,5}"]

    HS3 --> Result[COUNT: 5]

The hash set automatically deduplicates values across batches. Only the set size is returned as the final count, avoiding materialization of the entire set in the output.
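The same flow for an Int64 column can be sketched as a hypothetical CountDistinctInt64 built on std's HashSet. The real accumulators use an FxHashSet of DistinctKey values.

```rust
use std::collections::HashSet;

/// Sketch of a COUNT(DISTINCT col) accumulator over nullable i64 batches.
#[derive(Default)]
pub struct CountDistinctInt64 {
    seen: HashSet<i64>,
}

impl CountDistinctInt64 {
    /// Inserts every non-NULL value; values already in the set are absorbed.
    pub fn update(&mut self, column: &[Option<i64>]) {
        self.seen.extend(column.iter().flatten().copied());
    }

    /// Reports only the set size, never the values themselves.
    pub fn finalize(&self) -> i64 {
        self.seen.len() as i64
    }
}
```

Feeding it the three batches from the diagram (1,2,3; 2,3,4; 1,4,5) yields 5.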

Aggregate Functions

COUNT Family

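+------------------------+--------------------------+-------------+----------+
| Function               | NULL Handling            | Return Type | Overflow |
+------------------------+--------------------------+-------------+----------+
| COUNT(*)               | Counts all rows          | Int64       | Checked  |
| COUNT(column)          | Skips NULL values        | Int64       | Checked  |
| COUNT(DISTINCT column) | Skips NULL, deduplicates | Int64       | Checked  |
+------------------------+--------------------------+-------------+----------+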

Sources: llkv-aggregate/src/lib.rs:467-485 llkv-aggregate/src/lib.rs:759-783 llkv-aggregate/src/lib.rs:1452-1473

COUNT operations verify that the result fits in the i64 range. COUNT(*) adds each batch's row count directly. COUNT(column) skips NULL rows by checking array.is_valid(i). COUNT(DISTINCT) maintains a hash set and returns its size.

SUM and TOTAL

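+---------------------+------------------------+-------------+-------------------+
| Function            | Overflow Behavior      | Return Type | Empty Input       |
+---------------------+------------------------+-------------+-------------------+
| SUM(int_column)     | Returns error          | Int64       | NULL if no values |
| SUM(float_column)   | Accumulates infinities | Float64     | NULL if no values |
| TOTAL(int_column)   | Converts to Float64    | Float64     | 0.0 if no values  |
| TOTAL(float_column) | Accumulates infinities | Float64     | 0.0 if no values  |
+---------------------+------------------------+-------------+-------------------+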

Sources: llkv-aggregate/src/lib.rs:486-541 llkv-aggregate/src/lib.rs:799-824 llkv-aggregate/src/lib.rs:958-975

SUM uses checked_add() for integers and returns an error on overflow. TOTAL never raises an overflow error because it accumulates as Float64, even for integer columns. The other difference is empty-input handling: SUM returns NULL for empty input, while TOTAL returns 0.0.

AVG (Average)

graph LR
    Input[Input Column]
    Sum[Accumulate Sum]
    Count[Count Non-NULL]
    Div[Divide sum/count]
    Output[Float64 Result]

    Input --> Sum
    Input --> Count
    Sum --> Div
    Count --> Div
    Div --> Output

Sources: llkv-aggregate/src/lib.rs:598-654 llkv-aggregate/src/lib.rs:1096-1121 llkv-aggregate/src/lib.rs:1635-1645

AVG maintains separate sum and count accumulators. During finalization, it divides sum / count to produce a Float64 result. Integer sums are converted to Float64 for the division. If count is zero, AVG returns NULL.
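The sum/count pattern for integer input can be sketched as follows. AvgInt64 here is an illustrative stand-in. Returning an error when the running i64 sum overflows is an assumption, because this page does not say how AvgInt64 handles overflow.

```rust
/// Sketch of an AvgInt64-style accumulator: separate sum and count.
#[derive(Default)]
pub struct AvgInt64 {
    sum: i64,
    count: i64,
}

impl AvgInt64 {
    /// Adds non-NULL values to the sum and counts them.
    pub fn update(&mut self, column: &[Option<i64>]) -> Result<(), String> {
        for v in column.iter().flatten() {
            // Assumption: overflow of the running sum is reported as an error.
            self.sum = self.sum.checked_add(*v).ok_or("AVG sum overflow")?;
            self.count += 1;
        }
        Ok(())
    }

    /// Converts to f64 for the division; None (SQL NULL) when count is zero.
    pub fn finalize(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum as f64 / self.count as f64)
        }
    }
}
```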

MIN and MAX

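+------------+-----------------------------------------------+------------------+
| Data Type  | Comparison Strategy                           | NULL Handling    |
+------------+-----------------------------------------------+------------------+
| Int64      | i64::min() / i64::max()                       | Skip NULL values |
| Float64    | partial_cmp() with NaN handling               | Skip NULL values |
| Decimal128 | i128::min() / i128::max() on raw values       | Skip NULL values |
| String     | Numeric coercion via array_value_to_numeric() | Skip NULL values |
+------------+-----------------------------------------------+------------------+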

Sources: llkv-aggregate/src/lib.rs:656-710 llkv-aggregate/src/lib.rs:1259-1277 llkv-aggregate/src/lib.rs:1279-1300

MIN/MAX start with None and update to Some(value) on the first non-NULL entry. Subsequent values are compared using type-specific logic. Float comparisons use partial_cmp() because f64 has no total order once NaN is involved.
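Here is a sketch of the None-then-Some pattern for a float MIN. The NaN policy is an assumption: this page does not spell out how NaN is ordered. The sketch simply skips NaN inputs and uses partial_cmp() only between ordinary values.

```rust
use std::cmp::Ordering;

/// Sketch of a MinFloat64-style accumulator.
#[derive(Default)]
pub struct MinFloat64 {
    value: Option<f64>,
}

impl MinFloat64 {
    pub fn update(&mut self, column: &[Option<f64>]) {
        // flatten() skips NULLs.
        for &v in column.iter().flatten() {
            // Assumed policy for this sketch: NaN is ignored, not ordered.
            if v.is_nan() {
                continue;
            }
            self.value = match self.value {
                // The first non-NULL value initializes the state.
                None => Some(v),
                // Later values replace the state only if strictly smaller.
                Some(current) => match v.partial_cmp(&current) {
                    Some(Ordering::Less) => Some(v),
                    _ => Some(current),
                },
            };
        }
    }

    /// None (SQL NULL) if no non-NULL, non-NaN value was seen.
    pub fn finalize(&self) -> Option<f64> {
        self.value
    }
}
```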

GROUP_CONCAT

GROUP_CONCAT concatenates string representations of column values with a separator:

graph LR
    Values["Column Values:\n42, 'hello', 3.14"]
    Convert["Convert to Strings:\n'42', 'hello', '3.14'"]
    Join["Join with separator\n(default: ',')"]
    Result["Result: '42,hello,3.14'"]

    Values --> Convert
    Convert --> Join
    Join --> Result

Sources: llkv-aggregate/src/lib.rs:722-744 llkv-aggregate/src/lib.rs:1409-1437 llkv-aggregate/src/lib.rs:1847-1874

The accumulator collects string representations using array_value_to_string(), which coerces integers, floats, and booleans to text. DISTINCT variants track seen values in a hash set. Finalization joins the strings with the specified separator (default: ',').
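The collect-then-join approach can be sketched over values that have already been converted to strings, standing in for array_value_to_string(). GroupConcat is a hypothetical name. Returning None (NULL) when nothing was collected is an assumption borrowed from SQLite's group_concat().

```rust
use std::collections::HashSet;

/// Sketch of a GROUP_CONCAT accumulator over already-stringified values.
pub struct GroupConcat {
    parts: Vec<String>,
    separator: String,
    /// Present only for GROUP_CONCAT(DISTINCT ...).
    seen: Option<HashSet<String>>,
}

impl GroupConcat {
    pub fn new(separator: Option<&str>, distinct: bool) -> Self {
        GroupConcat {
            parts: Vec::new(),
            separator: separator.unwrap_or(",").to_string(),
            seen: if distinct { Some(HashSet::new()) } else { None },
        }
    }

    /// Collects non-NULL values, skipping repeats in DISTINCT mode.
    pub fn update(&mut self, column: &[Option<String>]) {
        for s in column.iter().flatten() {
            if let Some(seen) = self.seen.as_mut() {
                // insert() returns false when the value was already present.
                if !seen.insert(s.clone()) {
                    continue;
                }
            }
            self.parts.push(s.clone());
        }
    }

    /// Joins with the separator; None (NULL) when nothing was collected (assumption).
    pub fn finalize(&self) -> Option<String> {
        if self.parts.is_empty() {
            None
        } else {
            Some(self.parts.join(self.separator.as_str()))
        }
    }
}
```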

Group-by Integration

Grouping Key Extraction

For GROUP BY queries, the executor extracts grouping keys from each row:

Sources: llkv-executor/src/lib.rs:1097-1173

sequenceDiagram
    participant Executor
    participant GroupMap
    participant AggregateState
    participant Accumulator
    
    loop For each batch
        Executor->>Executor: Extract group keys
        loop For each group
            Executor->>GroupMap: Get or create group
            Executor->>AggregateState: Get accumulators for group
            Executor->>Accumulator: Filter batch to group rows
            Executor->>Accumulator: update(filtered_batch)
        end
    end
    
    Executor->>GroupMap: Iterate all groups
    loop For each group
        Executor->>AggregateState: finalize()
        AggregateState->>Executor: Return aggregate arrays
    end

Each unique combination of group-by column values maps to a separate GroupAggregateState, which tracks a representative row and the locations of matching rows across batches.

Aggregate Accumulation per Group

Sources: llkv-executor/src/lib.rs:1174-1383

The executor maintains separate accumulators for each group. When processing a batch, it filters rows by group membership using RowIdFilter and updates each group's accumulators independently. This ensures that SUM(sales) for group 'USA' only accumulates sales records where country='USA'.
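The per-group idea can be sketched with a hash map from group key to accumulator state. This is a simplification. The executor filters batches with RowIdFilter and stores representative rows; the hypothetical GroupedSum instead routes each (key, value) row straight to its group's SUM state and keeps groups in first-seen order.

```rust
use std::collections::HashMap;

/// Hypothetical grouped SUM: one Option<i64> state per group key.
#[derive(Default)]
pub struct GroupedSum {
    /// Maps a group key to its slot in `groups`.
    index: HashMap<String, usize>,
    /// (group key, running SUM) in first-seen order; None means no non-NULL input yet.
    groups: Vec<(String, Option<i64>)>,
}

impl GroupedSum {
    /// Routes each (group key, value) row of a batch to that group's SUM state.
    pub fn update(&mut self, batch: &[(&str, Option<i64>)]) -> Result<(), String> {
        for &(key, value) in batch {
            // Get or create the group's slot.
            let slot = match self.index.get(key).copied() {
                Some(slot) => slot,
                None => {
                    self.groups.push((key.to_string(), None));
                    let slot = self.groups.len() - 1;
                    self.index.insert(key.to_string(), slot);
                    slot
                }
            };
            // NULL values create the group but leave its sum unchanged.
            if let Some(v) = value {
                let acc = &mut self.groups[slot].1;
                *acc = Some(acc.unwrap_or(0).checked_add(v).ok_or("SUM overflow")?);
            }
        }
        Ok(())
    }

    /// One output row per group: the key plus its finalized SUM (None = NULL).
    pub fn finalize(self) -> Vec<(String, Option<i64>)> {
        self.groups
    }
}
```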

Output Construction

After processing all batches, the executor constructs the output RecordBatch:

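+-------------------+------------------------+--------------------------------+
| Column Type       | Source                 | Construction                   |
+-------------------+------------------------+--------------------------------+
| Group-by columns  | Representative rows    | Gathered from original batches |
| Aggregate columns | Finalized accumulators | Converted to Arrow arrays      |
+-------------------+------------------------+--------------------------------+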

Sources: llkv-executor/src/lib.rs:1384-1467

The system gathers one representative row per group for the group-by columns, then appends the finalized aggregate arrays as additional columns. This produces a result like:

+---------+------------+
| country | SUM(sales) |
+---------+------------+
| USA     | 1500000    |
| Canada  | 750000     |
+---------+------------+

Type System and Coercion

Numeric Coercion

The system performs SQLite-style type coercion for aggregates on string columns:

graph LR
    StringCol["String Column\n'42', 'hello', '3.14'"]
    Parse1["'42' → 42.0"]
    Parse2["'hello' → 0.0"]
    Parse3["'3.14' → 3.14"]
    Sum[SUM: 45.14]

    StringCol --> Parse1
    StringCol --> Parse2
    StringCol --> Parse3

    Parse1 --> Sum
    Parse2 --> Sum
    Parse3 --> Sum

Sources: llkv-aggregate/src/lib.rs:398-447

The array_value_to_numeric() function attempts to parse strings as floats. Non-numeric strings coerce to 0.0, matching SQLite behavior. This enables SUM(string_column) where some values are numeric.
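Here is a simplified sketch of the coercion, assuming whole-string parsing with Rust's str::parse::<f64>(). coerce_to_f64 and sum_strings are hypothetical names. The exact rules of array_value_to_numeric() may differ. Rust's parser also accepts spellings such as "inf" and "NaN", which a stricter implementation might reject.

```rust
/// Parses the whole (trimmed) string as f64; anything unparsable becomes 0.0.
pub fn coerce_to_f64(s: &str) -> f64 {
    s.trim().parse::<f64>().unwrap_or(0.0)
}

/// SUM over a nullable string column using the coercion above.
/// Returns None (SQL NULL) when every input is NULL.
pub fn sum_strings(column: &[Option<&str>]) -> Option<f64> {
    let mut total = 0.0;
    let mut saw_value = false;
    for s in column.iter().flatten() {
        total += coerce_to_f64(s);
        saw_value = true;
    }
    if saw_value {
        Some(total)
    } else {
        None
    }
}
```

With the diagram's inputs, sum_strings returns approximately 45.14. Floating-point addition does not guarantee bit-for-bit equality with the literal 45.14.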

Type-specific Accumulators

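+------------+----------------------------------------+---------------------------------------+-------------------------------+
| Input Type | SUM Accumulator                        | AVG Accumulator                       | MIN/MAX Accumulator           |
+------------+----------------------------------------+---------------------------------------+-------------------------------+
| Int64      | SumInt64 (checked i64)                 | AvgInt64 (sum: i64, count: i64)       | MinInt64 / MaxInt64           |
| Float64    | SumFloat64 (f64, no overflow error)    | AvgFloat64 (sum: f64, count: i64)     | MinFloat64 / MaxFloat64       |
| Decimal128 | SumDecimal128 (i128 + precision/scale) | AvgDecimal128 (sum: i128, count: i64) | MinDecimal128 / MaxDecimal128 |
| Utf8       | SumFloat64 (numeric coercion)          | AvgFloat64 (numeric coercion)         | MinFloat64 (numeric coercion) |
+------------+----------------------------------------+---------------------------------------+-------------------------------+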

Sources: llkv-aggregate/src/lib.rs:486-710

Each data type uses a specialized accumulator to preserve precision and overflow semantics. Decimal aggregates maintain precision and scale metadata throughout accumulation.

Overflow Handling

Integer Overflow Strategy

graph TB
    IntValue[Integer Value]
    CheckedAdd[checked_add-value-]
    Overflow{Overflow?}
    ErrorSUM[SUM: Return Error]
    ContinueTOTAL[TOTAL: Continue as Float64]

    IntValue --> CheckedAdd
    CheckedAdd --> Overflow
    Overflow -->|Yes + SUM| ErrorSUM
    Overflow -->|Yes + TOTAL| ContinueTOTAL
    Overflow -->|No| IntValue

Sources: llkv-aggregate/src/lib.rs:799-824 llkv-aggregate/src/lib.rs:958-975 llkv-aggregate/src/lib.rs:1474-1494

SUM uses checked_add() and sets the accumulator to None on overflow, returning an error during finalization. TOTAL avoids this by accumulating integers as Float64 from the start, trading precision for guaranteed completion.
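Two small functions can sketch the contrast between SUM and TOTAL. Their names are hypothetical. They also encode "no input" and "overflow" differently from SumInt64, which uses None to flag overflow: here Ok(None) means no non-NULL input, and Err means overflow.

```rust
/// SUM(int): checked i64 addition. Ok(None) = no non-NULL input (SQL NULL); Err = overflow.
pub fn sum_int64(values: &[Option<i64>]) -> Result<Option<i64>, String> {
    let mut acc: Option<i64> = None;
    for &v in values.iter().flatten() {
        let next = acc.unwrap_or(0).checked_add(v).ok_or("integer overflow in SUM")?;
        acc = Some(next);
    }
    Ok(acc)
}

/// TOTAL(int): accumulates in f64 from the start, so it never errors; empty input gives 0.0.
pub fn total_int64(values: &[Option<i64>]) -> f64 {
    values
        .iter()
        .flatten()
        .fold(0.0_f64, |acc, &v| acc + v as f64)
}
```

For the input [i64::MAX, 1], sum_int64 returns an error while total_int64 returns 9223372036854775808.0 (2^63). The lost +1 shows the precision that TOTAL trades away.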

Decimal Overflow

Decimal128 aggregates use checked_add() on the raw i128 values.

Sources: llkv-aggregate/src/lib.rs:915-932

When Decimal128 overflow occurs, the system returns an error immediately. There is no TOTAL-style fallback for decimals because precision requirements are explicit in the type signature.
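Here is a sketch of the Decimal128 case. It assumes every raw value shares the column's precision and scale, so a raw 12345 with scale 2 means 123.45. DecimalSum and sum_decimal128 are hypothetical. The sketch only detects i128 overflow; it does not check whether the result still fits the declared precision.

```rust
/// Hypothetical finalized Decimal128 SUM: a raw i128 plus the column's precision and scale.
#[derive(Debug, PartialEq)]
pub struct DecimalSum {
    pub raw: i128,
    pub precision: u8,
    pub scale: i8,
}

/// Sums raw Decimal128 values with checked_add; the first overflow is returned as an error.
pub fn sum_decimal128(
    raw_values: &[Option<i128>],
    precision: u8,
    scale: i8,
) -> Result<Option<DecimalSum>, String> {
    let mut sum: Option<i128> = None;
    for &v in raw_values.iter().flatten() {
        // No TOTAL-style fallback: overflow aborts the aggregate immediately.
        let next = sum
            .unwrap_or(0)
            .checked_add(v)
            .ok_or("Decimal128 overflow in SUM")?;
        sum = Some(next);
    }
    // Every input shares one scale, so the raw sum needs no rescaling.
    Ok(sum.map(|raw| DecimalSum { raw, precision, scale }))
}
```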

Computed Aggregates

Aggregate Expressions in Projections

The executor handles aggregate function calls embedded in computed projections:

graph LR
    Projection["Projection:\nSUM(price * quantity)"]
    Extract[Extract Aggregate\nFunction Call]
    Expression["price * quantity"]
    Translate[Translate to\nScalarExpr]
    EnsureProj[ensure_computed_projection]
    Accumulate[Accumulate via\nAggregateAccumulator]

    Projection --> Extract
    Extract --> Expression
    Expression --> Translate
    Translate --> EnsureProj
    EnsureProj --> Accumulate

Sources: llkv-executor/src/lib.rs:703-712 llkv-executor/src/lib.rs:735-798 llkv-executor/src/lib.rs:473-505

When a projection contains an aggregate like SUM(price * quantity), the executor:

  1. Detects the aggregate via expr_contains_aggregate()
  2. Translates the inner expression (price * quantity) to a ScalarExpr
  3. Creates a computed projection for the expression
  4. Initializes an accumulator for the projection index
  5. Accumulates values from the computed column

This allows complex aggregate expressions beyond simple column references.
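The five steps can be sketched without the executor's machinery: evaluate the inner expression into a computed column, then accumulate that column like any other. Row, price_times_quantity, and sum_computed are hypothetical stand-ins for the projection, the translated ScalarExpr, and the accumulator. The sketch assumes standard SQL NULL propagation for the multiplication.

```rust
/// One input row with the two columns used by the example expression.
pub struct Row {
    pub price: Option<i64>,
    pub quantity: Option<i64>,
}

/// Stand-in for the translated ScalarExpr `price * quantity` (NULL if either side is NULL).
fn price_times_quantity(row: &Row) -> Result<Option<i64>, String> {
    match (row.price, row.quantity) {
        (Some(p), Some(q)) => p
            .checked_mul(q)
            .map(Some)
            .ok_or_else(|| "overflow in price * quantity".to_string()),
        _ => Ok(None),
    }
}

/// SUM(price * quantity): materialize the computed column, then accumulate it.
pub fn sum_computed(rows: &[Row]) -> Result<Option<i64>, String> {
    // Step 1: evaluate the inner expression into a computed column.
    let computed: Vec<Option<i64>> = rows
        .iter()
        .map(price_times_quantity)
        .collect::<Result<_, _>>()?;
    // Step 2: feed the computed column to an ordinary checked SUM.
    let mut acc: Option<i64> = None;
    for &v in computed.iter().flatten() {
        acc = Some(acc.unwrap_or(0).checked_add(v).ok_or("SUM overflow")?);
    }
    Ok(acc)
}
```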

Performance Considerations

Memory Usage

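Per-group accumulator state is approximately:

+------------------+------------------------+---------------------------------+
| Accumulator Type | Memory Per Group       | Notes                           |
+------------------+------------------------+---------------------------------+
| COUNT(*)         | 8 bytes (i64)          | Constant size                   |
| SUM/AVG          | 16-24 bytes            | Value + metadata                |
| MIN/MAX          | 8-24 bytes             | Single value + type info        |
| COUNT(DISTINCT)  | O(unique values)       | Hash set grows with cardinality |
| GROUP_CONCAT     | O(total string length) | Vector of strings               |
+------------------+------------------------+---------------------------------+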

Sources: llkv-aggregate/src/lib.rs:92-247

Memory for DISTINCT aggregates and GROUP_CONCAT grows without bound as cardinality (or total string length) increases. The system does not implement spilling or approximate algorithms for these cases.

Parallelization

Aggregates are accumulated serially within a single thread because:

  1. Accumulators maintain mutable state that is not thread-safe
  2. DISTINCT tracking requires synchronized hash set updates
  3. Sequential batch processing simplifies overflow detection

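Future work could introduce parallel accumulation, in which each worker builds partial state and the partial states are merged. This is straightforward for distributive aggregates (SUM, COUNT, MIN, MAX). Algebraic aggregates such as AVG must carry their sum and count through the merge, and DISTINCT aggregates must union their hash sets, which adds complexity.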
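Here is a sketch of what such merging could look like, using a hypothetical Partial struct that one worker might build over its share of the data. COUNT, SUM, MIN, and MAX merge directly. AVG is recomputed from the merged sum and count rather than from two partial averages. Unlike the real accumulators, this sketch omits overflow checks for brevity.

```rust
/// Partial state one worker could build over its share of the batches.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Partial {
    pub count: i64,
    pub sum: i64,
    pub min: Option<i64>,
    pub max: Option<i64>,
}

impl Partial {
    /// Builds partial state from a worker's values (no overflow checks in this sketch).
    pub fn from_values(values: &[i64]) -> Self {
        Partial {
            count: values.len() as i64,
            sum: values.iter().sum(),
            min: values.iter().copied().min(),
            max: values.iter().copied().max(),
        }
    }

    /// COUNT and SUM merge by addition, MIN/MAX by comparison (distributive).
    pub fn merge(self, other: Partial) -> Partial {
        Partial {
            count: self.count + other.count,
            sum: self.sum + other.sum,
            min: match (self.min, other.min) {
                (Some(a), Some(b)) => Some(a.min(b)),
                (a, b) => a.or(b),
            },
            max: match (self.max, other.max) {
                (Some(a), Some(b)) => Some(a.max(b)),
                (a, b) => a.or(b),
            },
        }
    }

    /// AVG is derived from the merged sum and count, never by averaging averages.
    pub fn avg(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum as f64 / self.count as f64)
        }
    }
}
```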

Sources: llkv-aggregate/src/lib.rs:748-1440