This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Aggregation System
Relevant source files
- Cargo.lock
- Cargo.toml
- llkv-aggregate/src/lib.rs
- llkv-executor/Cargo.toml
- llkv-executor/src/lib.rs
- llkv-plan/src/plans.rs
- llkv-sql/src/lib.rs
- llkv-sql/src/sql_value.rs
The aggregation system evaluates SQL aggregate functions (COUNT, SUM, AVG, MIN, MAX, etc.) over Arrow RecordBatch streams. It consists of a planning layer that defines aggregate specifications and an execution layer that performs incremental accumulation with overflow checking and DISTINCT value tracking.
For information about scalar expression evaluation, see Scalar Evaluation and NumericKernels. For query execution orchestration, see Query Execution.
Architecture Overview
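The aggregation system spans three crates: llkv-plan defines the plan-level aggregate expressions, llkv-executor drives batch streaming and grouping, and llkv-aggregate implements the aggregate specifications and accumulators.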
Sources: llkv-aggregate/src/lib.rs:1-1935 llkv-executor/src/lib.rs:1-3599 llkv-plan/src/plans.rs:1-1458
The planner creates AggregateExpr instances from SQL AST nodes, which the executor converts to AggregateSpec descriptors. These specs initialize AggregateAccumulator instances that process batches incrementally, accumulating values in memory. The AggregateState wraps the accumulator with metadata (alias, override values) and produces the final output arrays.
Aggregate Specification
AggregateSpec Structure
AggregateSpec defines an aggregate operation at plan-time:
| Field | Type | Purpose |
|---|---|---|
| alias | String | Output column name for the aggregate result |
| kind | AggregateKind | Type of aggregate operation and its parameters |
Sources: llkv-aggregate/src/lib.rs:23-27
AggregateKind Variants
Sources: llkv-aggregate/src/lib.rs:30-67
Each variant captures the field ID to aggregate over, the expected data type, and operation-specific flags such as distinct or separator. The field_id is optional because COUNT(*) counts all rows regardless of column values and therefore has no input column.
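For illustration, the plan-time types can be approximated by a small Rust sketch. Only alias and kind come from the table above; the variant fields, the FieldId alias, and the local DataType enum (standing in for Arrow's DataType) are assumptions, so this is not llkv's actual definition.

```rust
/// Hypothetical stand-in for llkv's field identifier type.
type FieldId = u32;

/// Local stand-in for Arrow's `DataType`; only a few cases are modeled.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    Float64,
    Utf8,
}

/// Simplified sketch of the aggregate kinds (field layout is assumed).
#[derive(Debug, Clone, PartialEq)]
enum AggregateKind {
    /// `field_id == None` encodes COUNT(*).
    Count { field_id: Option<FieldId>, distinct: bool },
    Sum { field_id: FieldId, data_type: DataType, distinct: bool },
    Total { field_id: FieldId, data_type: DataType, distinct: bool },
    Avg { field_id: FieldId, data_type: DataType, distinct: bool },
    Min { field_id: FieldId, data_type: DataType },
    Max { field_id: FieldId, data_type: DataType },
    GroupConcat { field_id: FieldId, distinct: bool, separator: String },
}

/// Plan-time description of one aggregate output column.
#[derive(Debug, Clone, PartialEq)]
struct AggregateSpec {
    alias: String,
    kind: AggregateKind,
}

impl AggregateKind {
    /// Returns the input column, or `None` for COUNT(*).
    fn field_id(&self) -> Option<FieldId> {
        match self {
            AggregateKind::Count { field_id, .. } => *field_id,
            AggregateKind::Sum { field_id, .. }
            | AggregateKind::Total { field_id, .. }
            | AggregateKind::Avg { field_id, .. }
            | AggregateKind::Min { field_id, .. }
            | AggregateKind::Max { field_id, .. }
            | AggregateKind::GroupConcat { field_id, .. } => Some(*field_id),
        }
    }
}
```

Encoding COUNT(*) as a Count whose field_id is None is one way to express the optional field ID described above.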
Accumulator System
Accumulator Variants
AggregateAccumulator implements streaming accumulation for each aggregate type:
Sources: llkv-aggregate/src/lib.rs:92-247
graph TB
subgraph "COUNT Variants"
CS[CountStar\nvalue: i64]
CC[CountColumn\ncolumn_index: usize\nvalue: i64]
CDC[CountDistinctColumn\ncolumn_index: usize\nseen: FxHashSet]
end
subgraph "SUM Variants"
SI64[SumInt64\nvalue: Option-i64-\nhas_values: bool]
SDI64[SumDistinctInt64\nsum: Option-i64-\nseen: FxHashSet]
SF64[SumFloat64\nvalue: f64\nsaw_value: bool]
SDF64[SumDistinctFloat64\nsum: f64\nseen: FxHashSet]
SD128[SumDecimal128\nsum: i128\nprecision: u8\nscale: i8]
end
subgraph "AVG Variants"
AI64[AvgInt64\nsum: i64\ncount: i64]
ADI64[AvgDistinctInt64\nsum: i64\nseen: FxHashSet]
AF64[AvgFloat64\nsum: f64\ncount: i64]
end
subgraph "MIN/MAX Variants"
MinI64[MinInt64\nvalue: Option-i64-]
MaxI64[MaxInt64\nvalue: Option-i64-]
MinF64[MinFloat64\nvalue: Option-f64-]
MaxF64[MaxFloat64\nvalue: Option-f64-]
end
Each accumulator variant is specialized for its data type and operation semantics. Integer SUM accumulators track overflow with Option<i64> (None indicates that an addition overflowed), while float accumulators use a plain f64, which saturates to infinity instead of reporting an error. Distinct variants maintain an FxHashSet of seen values.
Accumulator Lifecycle
sequenceDiagram
participant Executor
participant AggregateSpec
participant AggregateAccumulator
participant RecordBatch
participant OutputArray
Executor->>AggregateSpec: new_with_projection_index()
AggregateSpec->>AggregateAccumulator: Create accumulator
loop For each batch
Executor->>RecordBatch: Stream next batch
RecordBatch->>AggregateAccumulator: update(batch)
Note over AggregateAccumulator: Accumulate values\nCheck overflow\nTrack distinct keys
end
Executor->>AggregateAccumulator: finalize()
AggregateAccumulator->>OutputArray: (Field, ArrayRef)
OutputArray->>Executor: Return result
Sources: llkv-aggregate/src/lib.rs:460-746 llkv-aggregate/src/lib.rs:748-1440 llkv-aggregate/src/lib.rs:1442-1934
The accumulator is initialized with a projection index indicating which column in the RecordBatch to aggregate. The update() method processes each batch incrementally, and finalize() produces the final Arrow array and field schema.
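The update/finalize lifecycle can be sketched without the Arrow dependency. In this hypothetical version, Batch is a Vec of nullable f64 columns standing in for a RecordBatch, projection_index selects the input column, and finalize returns the alias with a nullable value instead of a (Field, ArrayRef) pair. The value/saw_value pair mirrors SumFloat64 in the variant diagram; the constructor borrows the name new_with_projection_index from the lifecycle diagram, but its signature is invented.

```rust
/// Stand-in for an Arrow RecordBatch: one Vec per column, `None` = NULL.
type Batch = Vec<Vec<Option<f64>>>;

/// Hypothetical SUM(float) accumulator following the update/finalize lifecycle.
struct SumFloat64 {
    alias: String,
    projection_index: usize, // which batch column to read
    value: f64,
    saw_value: bool, // distinguishes "no input" (NULL) from a real 0.0 sum
}

impl SumFloat64 {
    fn new_with_projection_index(alias: &str, projection_index: usize) -> Self {
        SumFloat64 { alias: alias.to_string(), projection_index, value: 0.0, saw_value: false }
    }

    /// Folds one batch into the running state; NULLs are skipped.
    fn update(&mut self, batch: &Batch) {
        for v in batch[self.projection_index].iter().flatten() {
            self.value += *v;
            self.saw_value = true;
        }
    }

    /// Consumes the accumulator and yields (output name, nullable result).
    fn finalize(self) -> (String, Option<f64>) {
        let result = if self.saw_value { Some(self.value) } else { None };
        (self.alias, result)
    }
}
```

Because finalize takes self by value, an accumulator cannot be updated after it has produced its result.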
Distinct Value Tracking
DistinctKey Enumeration
The system tracks distinct values using a hash-based approach:
| Variant | Type | Purpose |
|---|---|---|
| Int(i64) | Integer values | Exact integer comparison |
| Float(u64) | Float bit pattern | Bitwise float equality |
| Str(String) | String values | Text comparison |
| Bool(bool) | Boolean values | True/false comparison |
| Date(i32) | Date32 values | Date comparison |
| Decimal(i128) | Decimal raw value | Exact decimal comparison |
Sources: llkv-aggregate/src/lib.rs:249-257 llkv-aggregate/src/lib.rs:259-333
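Float values are converted to their bit patterns with f64::to_bits() because f64 implements neither Eq nor Hash. Equality is therefore bitwise: NaN values with identical bit patterns collapse into a single key, and 0.0 and -0.0 produce different keys unless they are normalized first. Decimal values use the raw i128 representation, which compares exactly without scale conversion because every value in a Decimal128 column shares the same scale.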
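The table maps naturally onto a Rust enum that derives Hash and Eq. This sketch uses std's HashSet instead of FxHashSet (from the rustc-hash crate) to stay dependency-free, and the from_f64 helper is hypothetical.

```rust
use std::collections::HashSet;

/// Sketch of a hashable key for DISTINCT tracking, mirroring the table above.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum DistinctKey {
    Int(i64),
    Float(u64), // raw IEEE-754 bits, because f64 is neither Eq nor Hash
    Str(String),
    Bool(bool),
    Date(i32),
    Decimal(i128), // raw unscaled value; a column shares one scale
}

impl DistinctKey {
    /// Bitwise conversion: identical bit patterns compare equal.
    fn from_f64(v: f64) -> Self {
        DistinctKey::Float(v.to_bits())
    }
}
```

Inserting f64::NAN twice into a HashSet<DistinctKey> yields one key, while 0.0 and -0.0 yield two, which is the bitwise behavior described above.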
Distinct Accumulation Example
For COUNT(DISTINCT column), the accumulator inserts each non-null value into the hash set:
Sources: llkv-aggregate/src/lib.rs:785-798 llkv-aggregate/src/lib.rs:1465-1473
graph LR
Batch1[RecordBatch 1\nvalues: 1,2,3]
Batch2[RecordBatch 2\nvalues: 2,3,4]
Batch3[RecordBatch 3\nvalues: 1,4,5]
Batch1 --> HS1[seen: {1,2,3}]
Batch2 --> HS2[seen: {1,2,3,4}]
Batch3 --> HS3[seen: {1,2,3,4,5}]
HS3 --> Result[COUNT: 5]
The hash set automatically deduplicates values across batches. Only the set size is returned as the final count, avoiding materialization of the entire set in the output.
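A minimal COUNT(DISTINCT) accumulator over nullable Int64 batches reproduces the three-batch example. It uses std's HashSet in place of FxHashSet, a nullable slice stands in for an Arrow array, and the struct and method names are hypothetical.

```rust
use std::collections::HashSet;

/// Hypothetical COUNT(DISTINCT col) over nullable Int64 batches.
#[derive(Default)]
struct CountDistinctInt64 {
    seen: HashSet<i64>,
}

impl CountDistinctInt64 {
    /// Inserts every non-NULL value; duplicates across batches collapse.
    fn update(&mut self, column: &[Option<i64>]) {
        self.seen.extend(column.iter().flatten().copied());
    }

    /// Only the cardinality leaves the accumulator, not the set itself.
    fn finalize(&self) -> i64 {
        i64::try_from(self.seen.len()).expect("distinct count exceeds i64")
    }
}
```

Feeding the batches 1,2,3 then 2,3,4 then 1,4,5 leaves the set {1,2,3,4,5}, so finalize returns 5.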
Aggregate Functions
COUNT Family
| Function | Null Handling | Return Type | Overflow |
|---|---|---|---|
COUNT(*) | Counts all rows | Int64 | Checked |
COUNT(column) | Skips NULL values | Int64 | Checked |
COUNT(DISTINCT column) | Skips NULL, deduplicates | Int64 | Checked |
Sources: llkv-aggregate/src/lib.rs:467-485 llkv-aggregate/src/lib.rs:759-783 llkv-aggregate/src/lib.rs:1452-1473
COUNT operations use checked arithmetic, so a running count cannot silently exceed the i64 range. COUNT(*) adds each batch's row count directly without inspecting column values. COUNT(column) skips invalid (NULL) rows by testing array.is_valid(i). COUNT(DISTINCT) maintains a hash set and returns its size.
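The COUNT rules can be sketched with checked_add so that an overflow surfaces as an error instead of wrapping. Here a nullable slice stands in for an Arrow array, with Option::is_some playing the role of is_valid; the struct names echo the variant diagram, but the methods are hypothetical.

```rust
/// Hypothetical COUNT(*) state: only row counts matter.
#[derive(Default)]
struct CountStar {
    value: i64,
}

/// Hypothetical COUNT(col) state: only valid (non-NULL) entries count.
#[derive(Default)]
struct CountColumn {
    value: i64,
}

impl CountStar {
    /// Adds a batch's row count without looking at any column.
    fn update(&mut self, num_rows: usize) -> Result<(), String> {
        let rows = i64::try_from(num_rows).map_err(|_| "row count exceeds i64".to_string())?;
        self.value = self.value.checked_add(rows).ok_or("COUNT(*) overflow")?;
        Ok(())
    }
}

impl CountColumn {
    /// Counts entries that are not NULL.
    fn update(&mut self, column: &[Option<i64>]) -> Result<(), String> {
        let valid = column.iter().filter(|v| v.is_some()).count();
        let valid = i64::try_from(valid).map_err(|_| "row count exceeds i64".to_string())?;
        self.value = self.value.checked_add(valid).ok_or("COUNT overflow")?;
        Ok(())
    }
}
```

For a column holding 1, NULL, 3, COUNT(*) reaches 3 while COUNT(column) reaches 2.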
SUM and TOTAL
| Function | Overflow Behavior | Return Type | NULL Result |
|---|---|---|---|
SUM(int_column) | Returns error | Int64 | NULL if no values |
SUM(float_column) | Accumulates infinities | Float64 | NULL if no values |
TOTAL(int_column) | Converts to Float64 | Float64 | 0.0 if no values |
TOTAL(float_column) | Accumulates infinities | Float64 | 0.0 if no values |
Sources: llkv-aggregate/src/lib.rs:486-541 llkv-aggregate/src/lib.rs:799-824 llkv-aggregate/src/lib.rs:958-975
SUM uses checked_add for integers and returns an error on overflow. TOTAL never reports overflow because it accumulates in Float64 even for integer columns. The other difference is NULL handling: SUM returns NULL when there are no non-NULL input values, while TOTAL returns 0.0.
AVG (Average)
graph LR
Input[Input Column]
Sum[Accumulate Sum]
Count[Count Non-NULL]
Div[Divide sum/count]
Output[Float64 Result]
Input --> Sum
Input --> Count
Sum --> Div
Count --> Div
Div --> Output
Sources: llkv-aggregate/src/lib.rs:598-654 llkv-aggregate/src/lib.rs:1096-1121 llkv-aggregate/src/lib.rs:1635-1645
AVG maintains separate sum and count accumulators. During finalization, it divides sum / count to produce a Float64 result. Integer sums are converted to Float64 for the division. If count is zero, AVG returns NULL.
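The finalization rules for SUM, TOTAL, and AVG can be compared side by side with a hypothetical IntState that tracks an integer sum, a Float64 running total, and a count of non-NULL values. Overflow checking is deliberately omitted here (Rust panics on integer overflow in debug builds); the SUM overflow behavior is covered under Overflow Handling.

```rust
/// Hypothetical running state for an integer column.
struct IntState {
    sum: i64,   // integer sum for SUM and AVG (overflow checks omitted)
    total: f64, // TOTAL accumulates in Float64 from the start
    count: i64, // number of non-NULL values
}

impl IntState {
    fn from_values(values: &[Option<i64>]) -> Self {
        let mut state = IntState { sum: 0, total: 0.0, count: 0 };
        for v in values.iter().flatten() {
            state.sum += *v;
            state.total += *v as f64;
            state.count += 1;
        }
        state
    }

    /// SUM: NULL when no non-NULL input was seen.
    fn sum(&self) -> Option<i64> {
        if self.count == 0 { None } else { Some(self.sum) }
    }

    /// TOTAL: always Float64, 0.0 for empty input.
    fn total(&self) -> f64 {
        self.total
    }

    /// AVG: Float64 division of sum by count, NULL when count is zero.
    fn avg(&self) -> Option<f64> {
        if self.count == 0 { None } else { Some(self.sum as f64 / self.count as f64) }
    }
}
```

For the input 1, NULL, 2 the three results are Some(3), 3.0, and Some(1.5); for an all-NULL input they are None, 0.0, and None.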
MIN and MAX
| Data Type | Comparison Strategy | NULL Handling |
|---|---|---|
| Int64 | i64::min() / i64::max() | Skip NULL values |
| Float64 | partial_cmp() with NaN handling | Skip NULL values |
| Decimal128 | i128::min() / i128::max() on raw values | Skip NULL values |
| String | Numeric coercion via array_value_to_numeric() | Skip NULL values |
Sources: llkv-aggregate/src/lib.rs:656-710 llkv-aggregate/src/lib.rs:1259-1277 llkv-aggregate/src/lib.rs:1279-1300
MIN/MAX start with None and update to Some(value) on the first non-NULL entry. Subsequent values are compared using type-specific logic. Float comparisons use partial_cmp(), which returns None whenever either operand is NaN, so NaN inputs need an explicit policy rather than an ordinary ordering.
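The partial_cmp-based update can be sketched as follows. The text does not say how llkv treats NaN, so this sketch adopts an assumed policy of ignoring NaN inputs; the function names are hypothetical.

```rust
use std::cmp::Ordering;

/// Hypothetical MIN/MAX step for Float64. `want` is Less for MIN, Greater for MAX.
/// Assumed policy: NaN inputs are ignored and never become the result.
fn update_extreme(current: Option<f64>, candidate: f64, want: Ordering) -> Option<f64> {
    if candidate.is_nan() {
        return current; // partial_cmp would return None here
    }
    match current {
        None => Some(candidate), // first non-NULL, non-NaN value
        Some(best) => match candidate.partial_cmp(&best) {
            Some(ord) if ord == want => Some(candidate),
            _ => Some(best),
        },
    }
}

/// Computes (MIN, MAX) over a nullable column; NULLs are skipped.
fn min_max(values: &[Option<f64>]) -> (Option<f64>, Option<f64>) {
    let mut min = None;
    let mut max = None;
    for v in values.iter().flatten() {
        min = update_extreme(min, *v, Ordering::Less);
        max = update_extreme(max, *v, Ordering::Greater);
    }
    (min, max)
}
```

A column of only NULL and NaN values leaves both results at None under this policy.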
GROUP_CONCAT
GROUP_CONCAT concatenates string representations of column values with a separator:
graph LR
Values["Column Values:\n42, 'hello', 3.14"]
Convert[Convert to Strings:\n'42', 'hello', '3.14']
Join["Join with separator\n(default: ',')"]
Result["Result: '42,hello,3.14'"]
Values --> Convert
Convert --> Join
Join --> Result
Sources: llkv-aggregate/src/lib.rs:722-744 llkv-aggregate/src/lib.rs:1409-1437 llkv-aggregate/src/lib.rs:1847-1874
The accumulator collects string representations using array_value_to_string(), which coerces integers, floats, and booleans to text. DISTINCT variants track seen values in a hash set. Finalization joins the strings with the specified separator (default: ',').
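A sketch of the accumulator, operating on values that have already been converted to strings (the step array_value_to_string() performs). Skipping NULL inputs and returning NULL when nothing was collected are assumptions borrowed from SQLite's group_concat, and the struct is hypothetical.

```rust
use std::collections::HashSet;

/// Hypothetical GROUP_CONCAT accumulator over already-stringified values.
struct GroupConcat {
    separator: String,
    distinct: bool,
    seen: HashSet<String>, // used only when `distinct` is set
    parts: Vec<String>,    // insertion order is preserved
}

impl GroupConcat {
    fn new(separator: Option<&str>, distinct: bool) -> Self {
        GroupConcat {
            separator: separator.unwrap_or(",").to_string(), // default separator
            distinct,
            seen: HashSet::new(),
            parts: Vec::new(),
        }
    }

    /// NULL inputs (None) are skipped; DISTINCT keeps first occurrences only.
    fn update(&mut self, values: &[Option<String>]) {
        for v in values.iter().flatten() {
            if self.distinct && !self.seen.insert(v.clone()) {
                continue; // already collected once
            }
            self.parts.push(v.clone());
        }
    }

    /// NULL when nothing was collected, otherwise the joined string.
    fn finalize(&self) -> Option<String> {
        if self.parts.is_empty() {
            None
        } else {
            Some(self.parts.join(self.separator.as_str()))
        }
    }
}
```

With the default separator, the inputs "42", NULL, "hello", "3.14" finalize to "42,hello,3.14".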
Group-by Integration
Grouping Key Extraction
For GROUP BY queries, the executor extracts grouping keys from each row:
Sources: llkv-executor/src/lib.rs:1097-1173
sequenceDiagram
participant Executor
participant GroupMap
participant AggregateState
participant Accumulator
loop For each batch
Executor->>Executor: Extract group keys
loop For each group
Executor->>GroupMap: Get or create group
Executor->>AggregateState: Get accumulators for group
Executor->>Accumulator: Filter batch to group rows
Executor->>Accumulator: update(filtered_batch)
end
end
Executor->>GroupMap: Iterate all groups
loop For each group
Executor->>AggregateState: finalize()
AggregateState->>Executor: Return aggregate arrays
end
Each unique combination of group-by column values maps to a separate GroupAggregateState, which tracks the representative row and a list of matching row locations across batches.
Aggregate Accumulation per Group
Sources: llkv-executor/src/lib.rs:1174-1383
The executor maintains separate accumulators for each group. When processing a batch, it filters rows by group membership using RowIdFilter and updates each group's accumulators independently. This ensures that SUM(sales) for group 'USA' only accumulates sales records where country='USA'.
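Per-group isolation can be sketched with a hash map from group key to state. Note the swapped technique: instead of filtering each batch per group with RowIdFilter as described above, this hypothetical GroupBySum routes every row to its group in a single pass. Each group still owns its own accumulator and records a representative row as a (batch, row) location.

```rust
use std::collections::HashMap;

/// Hypothetical per-group state: where the group was first seen
/// (its representative row) plus a SUM accumulator for that group only.
struct GroupState {
    representative: (usize, usize), // (batch index, row index)
    sum: i64,
}

#[derive(Default)]
struct GroupBySum {
    order: Vec<String>, // first-seen order, for deterministic output
    groups: HashMap<String, GroupState>,
    batches_seen: usize,
}

impl GroupBySum {
    /// Routes each row of one batch to its group's accumulator.
    fn update(&mut self, keys: &[&str], values: &[i64]) {
        let batch = self.batches_seen;
        for (row, (key, value)) in keys.iter().zip(values).enumerate() {
            let order = &mut self.order;
            let state = self.groups.entry(key.to_string()).or_insert_with(|| {
                order.push(key.to_string());
                GroupState { representative: (batch, row), sum: 0 }
            });
            state.sum += value; // other groups' accumulators are untouched
        }
        self.batches_seen += 1;
    }

    /// One output row per group: (group key, representative location, SUM).
    fn finish(&self) -> Vec<(String, (usize, usize), i64)> {
        self.order
            .iter()
            .map(|k| {
                let s = &self.groups[k];
                (k.clone(), s.representative, s.sum)
            })
            .collect()
    }
}
```

For example, feeding ["USA", "Canada", "USA"] with [100, 50, 200] and then ["Canada", "USA"] with [25, 1] yields USA = 301 and Canada = 75, with representatives (0, 0) and (0, 1).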
Output Construction
After processing all batches, the executor constructs the output RecordBatch:
| Column Type | Source | Construction |
|---|---|---|
| Group-by columns | Representative rows | Gathered from original batches |
| Aggregate columns | Finalized accumulators | Converted to Arrow arrays |
Sources: llkv-executor/src/lib.rs:1384-1467
The system gathers one representative row per group for the group-by columns, then appends the finalized aggregate arrays as additional columns. This produces a result like:
+---------+------------+
| country | SUM(sales) |
+---------+------------+
| USA     | 1500000    |
| Canada  | 750000     |
+---------+------------+
Type System and Coercion
Numeric Coercion
The system performs SQLite-style type coercion for aggregates on string columns:
graph LR
StringCol["String Column\n'42', 'hello', '3.14'"]
Parse1["'42' → 42.0"]
Parse2["'hello' → 0.0"]
Parse3["'3.14' → 3.14"]
Sum[SUM: 45.14]
StringCol --> Parse1
StringCol --> Parse2
StringCol --> Parse3
Parse1 --> Sum
Parse2 --> Sum
Parse3 --> Sum
Sources: llkv-aggregate/src/lib.rs:398-447
The array_value_to_numeric() function attempts to parse strings as floats. Non-numeric strings coerce to 0.0, matching SQLite behavior. This enables SUM(string_column) where some values are numeric.
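A hypothetical coerce_to_numeric helper in the spirit of array_value_to_numeric() shows the rule. It parses the whole trimmed string with Rust's f64 parser, which also accepts text such as "inf" and "NaN"; SQLite's CAST to REAL instead uses the longest numeric prefix, so the two can differ on inputs like '12abc'.

```rust
/// Hypothetical coercion: text that parses as a float is used as-is,
/// anything else becomes 0.0. Whole-string parsing is an assumption.
fn coerce_to_numeric(text: &str) -> f64 {
    text.trim().parse::<f64>().unwrap_or(0.0)
}

/// SUM over a nullable string column using the coercion above.
/// Returns NULL (None) when every input is NULL.
fn sum_strings(values: &[Option<&str>]) -> Option<f64> {
    let mut sum = 0.0;
    let mut saw_value = false;
    for text in values.iter().flatten() {
        sum += coerce_to_numeric(text);
        saw_value = true;
    }
    if saw_value { Some(sum) } else { None }
}
```

Summing '42', 'hello', and '3.14' gives approximately 45.14, matching the diagram (the last digit is subject to normal floating-point rounding).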
Type-specific Accumulators
| Input Type | SUM Accumulator | AVG Accumulator | MIN/MAX Accumulator |
|---|---|---|---|
| Int64 | SumInt64 (i64 with overflow) | AvgInt64 (sum: i64, count: i64) | MinInt64 / MaxInt64 |
| Float64 | SumFloat64 (f64, never overflows) | AvgFloat64 (sum: f64, count: i64) | MinFloat64 / MaxFloat64 |
| Decimal128 | SumDecimal128 (i128 + precision/scale) | AvgDecimal128 (sum: i128, count: i64) | MinDecimal128 / MaxDecimal128 |
| Utf8 | SumFloat64 (numeric coercion) | AvgFloat64 (numeric coercion) | MinFloat64 / MaxFloat64 (numeric coercion) |
Sources: llkv-aggregate/src/lib.rs:486-710
Each data type uses a specialized accumulator to preserve precision and overflow semantics. Decimal aggregates maintain precision and scale metadata throughout accumulation.
Overflow Handling
Integer Overflow Strategy
graph TB
IntValue[Integer Value]
CheckedAdd["checked_add(value)"]
Overflow{Overflow?}
ErrorSUM[SUM: Return Error]
ContinueTOTAL[TOTAL: Continue as Float64]
IntValue --> CheckedAdd
CheckedAdd --> Overflow
Overflow -->|Yes + SUM| ErrorSUM
Overflow -->|Yes + TOTAL| ContinueTOTAL
Overflow -->|No| IntValue
Sources: llkv-aggregate/src/lib.rs:799-824 llkv-aggregate/src/lib.rs:958-975 llkv-aggregate/src/lib.rs:1474-1494
SUM uses checked_add() and sets the accumulator to None on overflow, returning an error during finalization. TOTAL avoids this by accumulating integers as Float64 from the start, trading precision for guaranteed completion.
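The SUM-versus-TOTAL overflow split can be sketched as follows. The value: Option<i64> and has_values: bool fields mirror SumInt64 in the variant diagram; the method names and the error message are hypothetical.

```rust
/// Hypothetical SUM(int) state: `value` becomes None once checked_add
/// overflows, and `has_values` separates empty input from overflow.
struct SumInt64 {
    value: Option<i64>,
    has_values: bool,
}

impl SumInt64 {
    fn new() -> Self {
        SumInt64 { value: Some(0), has_values: false }
    }

    fn update(&mut self, column: &[Option<i64>]) {
        for v in column.iter().flatten() {
            self.has_values = true;
            // and_then keeps the state at None after the first overflow
            self.value = self.value.and_then(|acc| acc.checked_add(*v));
        }
    }

    /// NULL for no input, an error if any addition overflowed.
    fn finalize(&self) -> Result<Option<i64>, String> {
        match (self.has_values, self.value) {
            (false, _) => Ok(None),
            (true, Some(sum)) => Ok(Some(sum)),
            (true, None) => Err("integer overflow in SUM".to_string()),
        }
    }
}

/// TOTAL accumulates in f64 from the start, so it cannot fail.
fn total(column: &[Option<i64>]) -> f64 {
    column.iter().flatten().map(|v| *v as f64).sum()
}
```

Adding 1 to i64::MAX makes SUM fail, while TOTAL returns 2^63 as a Float64 because the extra unit falls below f64 precision at that magnitude.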
Decimal Overflow
Decimal128 aggregates use checked_add() on the raw i128 values:
Sources: llkv-aggregate/src/lib.rs:915-932
When Decimal128 overflow occurs, the system returns an error immediately. There is no TOTAL-style fallback for decimals because precision requirements are explicit in the type signature.
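Because all values in a Decimal128 column share one scale, a SUM can add the raw i128 values directly. This hypothetical sum_decimal128 reports overflow as soon as checked_add fails. It only detects overflow of the i128 range; checking against the column's declared precision is not shown and may differ in llkv.

```rust
/// Hypothetical Decimal128 SUM over raw unscaled values (one shared scale).
/// Returns Ok(None) for all-NULL input and Err on the first overflow.
fn sum_decimal128(raw: &[Option<i128>]) -> Result<Option<i128>, String> {
    let mut sum: Option<i128> = None;
    for v in raw.iter().flatten() {
        let acc = sum.unwrap_or(0);
        // Fail immediately: there is no Float64 fallback for decimals.
        sum = Some(acc.checked_add(*v).ok_or("Decimal128 overflow in SUM")?);
    }
    Ok(sum)
}
```

With scale 2, the raw values 12345 and 655 represent 123.45 and 6.55, and their raw sum 13000 represents 130.00.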
Computed Aggregates
Aggregate Expressions in Projections
The executor handles aggregate function calls embedded in computed projections:
graph LR
Projection["Projection:\nSUM(price * quantity)"]
Extract[Extract Aggregate\nFunction Call]
Expression["price * quantity"]
Translate[Translate to\nScalarExpr]
EnsureProj[ensure_computed_projection]
Accumulate[Accumulate via\nAggregateAccumulator]
Projection --> Extract
Extract --> Expression
Expression --> Translate
Translate --> EnsureProj
EnsureProj --> Accumulate
Sources: llkv-executor/src/lib.rs:703-712 llkv-executor/src/lib.rs:735-798 llkv-executor/src/lib.rs:473-505
When a projection contains an aggregate like SUM(price * quantity), the executor:
- Detects the aggregate via expr_contains_aggregate()
- Translates the inner expression (price * quantity) to a ScalarExpr
- Creates a computed projection for the expression
- Initializes an accumulator for the projection index
- Accumulates values from the computed column
This allows complex aggregate expressions beyond simple column references.
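The steps can be sketched with a plain function pointer standing in for the translated ScalarExpr and a Vec standing in for the computed projection column. RowExpr, Row, and both functions are hypothetical; the point is the order of operations: evaluate the inner expression per row, then feed the resulting column to an ordinary SUM accumulator.

```rust
/// Hypothetical stand-in for a translated ScalarExpr: a function of one row.
type RowExpr = fn(&Row) -> Option<i64>;

/// One input row with nullable price and quantity columns.
struct Row {
    price: Option<i64>,
    quantity: Option<i64>,
}

/// price * quantity, NULL if either side is NULL (overflow checks omitted).
fn price_times_quantity(row: &Row) -> Option<i64> {
    Some(row.price? * row.quantity?)
}

/// Materializes the computed column, then accumulates it like any column.
fn sum_of_expr(rows: &[Row], expr: RowExpr) -> Option<i64> {
    let computed: Vec<Option<i64>> = rows.iter().map(expr).collect();
    let mut sum = None;
    for v in computed.iter().flatten() {
        sum = Some(sum.unwrap_or(0) + v);
    }
    sum
}
```

For rows (3, 4), (NULL, 10), and (5, 2), the computed column is 12, NULL, 10, so SUM(price * quantity) is 22.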
Performance Considerations
Memory Usage
Per group, each accumulator holds roughly the following state:
| Accumulator Type | Memory Per Group | Notes |
|---|---|---|
| COUNT(*) | 8 bytes (i64) | Constant size |
| SUM/AVG | 16-24 bytes | Value + metadata |
| MIN/MAX | 8-24 bytes | Single value + type info |
| COUNT(DISTINCT) | O(unique values) | Hash set grows with cardinality |
| GROUP_CONCAT | O(total string length) | Vector of strings |
Sources: llkv-aggregate/src/lib.rs:92-247
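DISTINCT aggregates grow with the number of unique values and GROUP_CONCAT grows with the total length of the collected strings, so memory for both is unbounded on large or high-cardinality inputs. The system does not implement spilling or approximate algorithms for these cases.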
Parallelization
Aggregates are accumulated serially within a single thread because:
- Accumulators maintain mutable state that is not thread-safe
- DISTINCT tracking requires synchronized hash set updates
- Sequential batch processing simplifies overflow detection
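Future work could parallelize accumulation by computing partial states per thread and merging them. Distributive aggregates (SUM, COUNT, MIN, MAX) merge directly; algebraic aggregates such as AVG must carry auxiliary state (a sum and a count) through the merge; DISTINCT aggregates must union their hash sets before counting, which adds memory and coordination cost.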
Sources: llkv-aggregate/src/lib.rs:748-1440
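As an illustration of the future-work idea only (the text states that llkv accumulates serially today), the following sketch computes partial COUNT/SUM/MIN/MAX states on scoped threads via std::thread::scope (Rust 1.63+) and merges them. The Partial struct and parallel_aggregate function are hypothetical; AVG is derived from the merged sum and count rather than merged directly, and DISTINCT merging is not shown.

```rust
use std::thread;

/// Hypothetical partial state produced by one worker.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Partial {
    count: i64,
    sum: i64,
    min: Option<i64>,
    max: Option<i64>,
}

impl Partial {
    fn from_chunk(chunk: &[i64]) -> Self {
        Partial {
            count: chunk.len() as i64,
            sum: chunk.iter().sum(),
            min: chunk.iter().copied().min(),
            max: chunk.iter().copied().max(),
        }
    }

    /// Distributive parts merge directly.
    fn merge(self, other: Partial) -> Partial {
        Partial {
            count: self.count + other.count,
            sum: self.sum + other.sum,
            min: match (self.min, other.min) {
                (Some(a), Some(b)) => Some(a.min(b)),
                (a, b) => a.or(b),
            },
            max: match (self.max, other.max) {
                (Some(a), Some(b)) => Some(a.max(b)),
                (a, b) => a.or(b),
            },
        }
    }

    /// AVG is algebraic: it is computed from the merged (sum, count).
    fn avg(&self) -> Option<f64> {
        if self.count == 0 { None } else { Some(self.sum as f64 / self.count as f64) }
    }
}

/// Accumulates each chunk on its own scoped thread, then merges the partials.
/// Panics if `chunk_size` is 0 (a requirement of `slice::chunks`).
fn parallel_aggregate(values: &[i64], chunk_size: usize) -> Partial {
    let empty = Partial { count: 0, sum: 0, min: None, max: None };
    thread::scope(|s| {
        // Collect first so every thread is spawned before any join.
        let handles: Vec<_> = values
            .chunks(chunk_size)
            .map(|chunk| s.spawn(move || Partial::from_chunk(chunk)))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).fold(empty, Partial::merge)
    })
}
```

Because each partial is independent, the merged result equals a serial pass over the same values, which is what makes these aggregates safe to split.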