This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Data Formats and Arrow Integration
Relevant source files
- Cargo.lock
- Cargo.toml
- llkv-column-map/Cargo.toml
- llkv-column-map/src/gather.rs
- llkv-column-map/src/store/projection.rs
- llkv-csv/src/writer.rs
- llkv-sql/tests/pager_io_tests.rs
- llkv-table/examples/direct_comparison.rs
- llkv-table/examples/performance_benchmark.rs
- llkv-table/examples/test_streaming.rs
- llkv-table/src/table.rs
This page explains how Apache Arrow serves as the foundational columnar data format throughout LLKV, covering the types supported, serialization strategies, and integration points across the system. Arrow provides the in-memory representation for all data operations, from SQL query results to storage layer batches.
For information about how Arrow data flows through query execution, see Query Execution. For details on how the storage layer persists Arrow arrays, see Column Storage and ColumnStore.
RecordBatch as the Universal Data Container
The arrow::record_batch::RecordBatch type is the primary data exchange unit across all LLKV layers. A RecordBatch represents a columnar slice of data with a fixed schema, where each column is an ArrayRef (type-erased Arrow array).
Sources: llkv-table/src/table.rs:6-9 llkv-table/examples/test_streaming.rs:1-9
```mermaid
graph TB
RecordBatch["RecordBatch"]
Schema["Schema\n(arrow::datatypes::Schema)"]
Columns["Vec<ArrayRef>\n(arrow::array::ArrayRef)"]
RecordBatch --> Schema
RecordBatch --> Columns
Schema --> Fields["Vec<Field>\nColumn Names + Types"]
Columns --> Array1["Column 0: ArrayRef"]
Columns --> Array2["Column 1: ArrayRef"]
Columns --> Array3["Column N: ArrayRef"]
Array1 --> UInt64Array["UInt64Array\n(row_id column)"]
Array2 --> Int64Array["Int64Array\n(user data)"]
Array3 --> StringArray["StringArray\n(text data)"]
```
RecordBatch Construction Pattern
All data inserted into tables must arrive as a RecordBatch with specific metadata:
| Required Element | Description | Metadata Key |
|---|---|---|
| row_id column | UInt64Array with unique row identifiers | None (system column) |
| Field ID metadata | Maps each user column to a FieldId | "field_id" |
| Arrow data type | Native Arrow type (Int64, Utf8, etc.) | N/A |
| Column name | Human-readable identifier | Field name |
Sources: llkv-table/src/table.rs:231-438 llkv-table/examples/test_streaming.rs:18-58
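The requirements above translate into ordinary arrow-rs code. The following is a minimal, self-contained sketch; the score column name and the field id value 42 are illustrative rather than taken from LLKV, and a real write would hand the resulting batch to Table::append.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // System row_id column: UInt64, non-nullable, no field_id metadata.
    let row_id_field = Field::new("row_id", DataType::UInt64, false);

    // User column: the "field_id" metadata entry maps it to a FieldId (42 is illustrative).
    let mut meta = HashMap::new();
    meta.insert("field_id".to_string(), "42".to_string());
    let score_field = Field::new("score", DataType::Int64, true).with_metadata(meta);

    let schema = Arc::new(Schema::new(vec![row_id_field, score_field]));

    let row_ids: ArrayRef = Arc::new(UInt64Array::from(vec![1u64, 2, 3]));
    let scores: ArrayRef = Arc::new(Int64Array::from(vec![Some(10i64), None, Some(30)]));

    let batch = RecordBatch::try_new(schema, vec![row_ids, scores])?;
    assert_eq!(batch.num_rows(), 3);
    Ok(())
}
```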
Schema and Field Metadata
The arrow::datatypes::Schema describes the structure of a RecordBatch. LLKV extends Arrow’s schema system with custom metadata to track column identity and table ownership.
Field Metadata Convention
Each user column’s Field carries a field_id metadata entry that encodes either:
- User columns: a raw FieldId (0-2047) that gets namespaced to a LogicalFieldId
- MVCC columns: a pre-computed LogicalFieldId for created_by / deleted_by
Sources: llkv-table/src/table.rs:243-320 llkv-table/constants.rs (FIELD_ID_META_KEY)
```mermaid
graph TB
Field["arrow::datatypes::Field"]
FieldName["name: String"]
DataType["data_type: DataType"]
Nullable["nullable: bool"]
Metadata["metadata: HashMap<String, String>"]
Field --> FieldName
Field --> DataType
Field --> Nullable
Field --> Metadata
Metadata --> FieldIdKey["'field_id' → '42'\n(user column)"]
Metadata --> LFidKey["'field_id' → '8589934634'\n(MVCC column)"]
FieldIdKey -.converted to.-> UserLFid["LogicalFieldId::for_user(table_id, 42)"]
LFidKey -.direct use.-> SystemLFid["LogicalFieldId::for_mvcc_created_by(table_id)"]
```
Schema Reconstruction from Catalog
The Table::schema() method reconstructs an Arrow Schema by querying the system catalog for column names and combining them with type information from the column store:
Sources: llkv-table/src/table.rs:519-549
```mermaid
sequenceDiagram
participant Caller
participant Table
participant ColumnStore
participant SysCatalog
Caller->>Table: schema()
Table->>ColumnStore: user_field_ids_for_table(table_id)
ColumnStore-->>Table: Vec<LogicalFieldId>
Table->>SysCatalog: get_cols_meta(field_ids)
SysCatalog-->>Table: Vec<ColMeta>
Table->>ColumnStore: data_type(lfid) for each
ColumnStore-->>Table: DataType
Table->>Table: Build Field with metadata
Table-->>Caller: Arc<Schema>
```
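The final step of the sequence, building fields with metadata, can be illustrated with plain arrow-rs. The column names, types, and field ids below are hard-coded stand-ins for the SysCatalog and ColumnStore lookups shown above.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // Stand-ins for catalog column names, store data types, and field ids.
    let columns = vec![
        ("id", DataType::Int64, 1u32),
        ("name", DataType::Utf8, 2u32),
    ];

    let fields: Vec<Field> = columns
        .into_iter()
        .map(|(name, dt, field_id)| {
            // Attach the field_id metadata entry, mirroring the "Build Field with metadata" step.
            let mut meta = HashMap::new();
            meta.insert("field_id".to_string(), field_id.to_string());
            Field::new(name, dt, true).with_metadata(meta)
        })
        .collect();

    let schema = Arc::new(Schema::new(fields));
    assert_eq!(schema.fields().len(), 2);
}
```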
Supported Arrow Data Types
LLKV’s column store supports the following Arrow data types for user columns:
| Arrow Type | Rust Type Mapping | Storage Encoding | Notes |
|---|---|---|---|
| UInt64 | u64 | Native | Row IDs, MVCC transaction IDs |
| UInt32 | u32 | Native | - |
| UInt16 | u16 | Native | - |
| UInt8 | u8 | Native | - |
| Int64 | i64 | Native | - |
| Int32 | i32 | Native | - |
| Int16 | i16 | Native | - |
| Int8 | i8 | Native | - |
| Float64 | f64 | IEEE 754 | - |
| Float32 | f32 | IEEE 754 | - |
| Date32 | i32 (days since epoch) | Native | - |
| Date64 | i64 (ms since epoch) | Native | - |
| Decimal128 | i128 | Native + precision/scale metadata | Fixed-point decimal |
| Utf8 | String (i32 offsets) | Length-prefixed UTF-8 | Variable-length strings |
| LargeUtf8 | String (i64 offsets) | Length-prefixed UTF-8 | Large strings |
| Binary | Vec<u8> (i32 offsets) | Length-prefixed bytes | Variable-length binary |
| LargeBinary | Vec<u8> (i64 offsets) | Length-prefixed bytes | Large binary |
| Boolean | bool | Bit-packed | - |
| Struct | Nested fields | Passthrough (no specialized gather) | Limited support |
Sources: llkv-column-map/src/store/projection.rs:135-185 llkv-column-map/src/gather.rs:10-18
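Consumers of these columns typically receive them as type-erased ArrayRef values and downcast based on DataType. The helper below is illustrative only; it covers just two of the listed types and is not LLKV's internal dispatch code.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use arrow::datatypes::DataType;

// Inspect a type-erased column by matching on its DataType and downcasting.
fn describe(col: &ArrayRef) -> String {
    match col.data_type() {
        DataType::Int64 => {
            let a = col.as_any().downcast_ref::<Int64Array>().unwrap();
            format!("Int64 column with {} rows", a.len())
        }
        DataType::Utf8 => {
            let a = col.as_any().downcast_ref::<StringArray>().unwrap();
            format!("Utf8 column, first value: {:?}", a.value(0))
        }
        other => format!("unhandled Arrow type: {other:?}"),
    }
}

fn main() {
    let ints: ArrayRef = Arc::new(Int64Array::from(vec![1i64, 2, 3]));
    let strs: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
    println!("{}", describe(&ints));
    println!("{}", describe(&strs));
}
```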
Type Dispatch Strategy
The column store uses Rust enums to dispatch operations per data type, ensuring type-safe handling without runtime reflection:
Sources: llkv-column-map/src/store/projection.rs:99-236 llkv-column-map/src/store/projection.rs:237-395
```mermaid
graph TB
ColumnOutputBuilder["ColumnOutputBuilder\n(enum per type)"]
ColumnOutputBuilder --> Utf8["Utf8 { builder, len_capacity, value_capacity }"]
ColumnOutputBuilder --> LargeUtf8["LargeUtf8 { builder, len_capacity, value_capacity }"]
ColumnOutputBuilder --> Binary["Binary { builder, len_capacity, value_capacity }"]
ColumnOutputBuilder --> LargeBinary["LargeBinary { builder, len_capacity, value_capacity }"]
ColumnOutputBuilder --> Boolean["Boolean { builder, len_capacity }"]
ColumnOutputBuilder --> Decimal128["Decimal128 { builder, precision, scale }"]
ColumnOutputBuilder --> Primitive["Primitive(PrimitiveBuilderKind)"]
ColumnOutputBuilder --> Passthrough["Passthrough (Struct arrays)"]
Primitive --> UInt64Builder["UInt64 { builder, len_capacity }"]
Primitive --> Int64Builder["Int64 { builder, len_capacity }"]
Primitive --> Float64Builder["Float64 { builder, len_capacity }"]
Primitive --> OtherPrimitives["... (10 more primitive types)"]
```
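A heavily simplified sketch of this enum-per-type pattern is shown below; it covers only two variants and omits the capacity tracking that the real ColumnOutputBuilder performs.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Builder, StringBuilder};
use arrow::datatypes::DataType;

// One enum variant per supported output type; dispatch is a plain match.
enum OutputBuilder {
    Int64(Int64Builder),
    Utf8(StringBuilder),
}

impl OutputBuilder {
    fn for_type(dt: &DataType) -> Option<Self> {
        match dt {
            DataType::Int64 => Some(Self::Int64(Int64Builder::new())),
            DataType::Utf8 => Some(Self::Utf8(StringBuilder::new())),
            _ => None, // the real ColumnOutputBuilder covers many more variants
        }
    }

    fn append_null(&mut self) {
        match self {
            Self::Int64(b) => b.append_null(),
            Self::Utf8(b) => b.append_null(),
        }
    }

    fn finish(self) -> ArrayRef {
        match self {
            Self::Int64(mut b) => Arc::new(b.finish()),
            Self::Utf8(mut b) => Arc::new(b.finish()),
        }
    }
}

fn main() {
    let mut builder = OutputBuilder::for_type(&DataType::Utf8).expect("supported type");
    builder.append_null();
    let array = builder.finish();
    assert_eq!(array.len(), 1);
}
```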
Array Builders and Construction
Arrow provides builder types (arrow::array::*Builder) for incrementally constructing arrays. LLKV’s projection layer wraps these builders with capacity management to minimize allocations during multi-batch operations.
Capacity Pre-Allocation Strategy
The ColumnOutputBuilder tracks allocated capacity separately from Arrow’s builder to avoid repeated re-allocations:
Sources: llkv-column-map/src/store/projection.rs:189-234 llkv-column-map/src/store/projection.rs:398-457
```mermaid
graph LR
RequestedLen["Requested Len"]
ValueBytesHint["Value Bytes Hint"]
RequestedLen --> Check["len_capacity < len?"]
ValueBytesHint --> Check2["value_capacity < bytes?"]
Check -->|Yes| Reallocate["Recreate Builder\nwith_capacity(len, bytes)"]
Check2 -->|Yes| Reallocate
Check -->|No| Reuse["Reuse Existing Builder"]
Check2 -->|No| Reuse
Reallocate --> UpdateCapacity["Update cached capacities"]
UpdateCapacity --> Append["append_value()\nor append_null()"]
Reuse --> Append
```
String and Binary Builder Pattern
Variable-length types require two capacity dimensions:
- Length capacity: Number of values
- Value capacity: Total bytes across all values
Sources: llkv-column-map/src/store/projection.rs:398-411 llkv-column-map/src/store/projection.rs:413-426
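The following sketch, assuming a Utf8 output, combines both ideas: capacities are cached alongside the builder, and StringBuilder::with_capacity(rows, bytes) is only invoked when a hint exceeds what was previously allocated. Field names mirror the diagram; the actual implementation lives in llkv-column-map/src/store/projection.rs.

```rust
use arrow::array::StringBuilder;

// Track both capacity dimensions next to the Arrow builder.
struct Utf8Output {
    builder: StringBuilder,
    len_capacity: usize,
    value_capacity: usize,
}

impl Utf8Output {
    fn with_capacity(len: usize, value_bytes: usize) -> Self {
        Self {
            builder: StringBuilder::with_capacity(len, value_bytes),
            len_capacity: len,
            value_capacity: value_bytes,
        }
    }

    fn prepare(&mut self, len: usize, value_bytes: usize) {
        if self.len_capacity < len || self.value_capacity < value_bytes {
            // Reallocate once with the larger hints instead of growing incrementally.
            self.builder = StringBuilder::with_capacity(len, value_bytes);
            self.len_capacity = len;
            self.value_capacity = value_bytes;
        }
    }
}

fn main() {
    let mut out = Utf8Output::with_capacity(16, 256);
    out.prepare(8, 128); // within capacity: the existing builder is reused
    out.prepare(1024, 64 * 1024); // exceeds capacity: the builder is recreated
    out.builder.append_value("hello");
}
```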
Serialization and Storage Integration
Arrow arrays are serialized to byte blobs using Arrow’s IPC format before persistence in the key-value store. The column store maintains a separation between logical Arrow representation and physical storage:
Sources: llkv-column-map/src/serialization.rs (deserialize_array function), llkv-column-map/src/store/projection.rs:17
```mermaid
graph TB
subgraph "In-Memory Layer"
RecordBatch1["RecordBatch\n(user input)"]
ArrayRef1["ArrayRef\n(Int64Array, StringArray, etc.)"]
end
subgraph "Serialization Layer"
Serialize["serialize_array()\n(Arrow IPC encoding)"]
Deserialize["deserialize_array()\n(Arrow IPC decoding)"]
end
subgraph "Storage Layer"
ChunkBlob["EntryHandle\n(byte blob)"]
Pager["Pager::batch_put()"]
PhysicalKey["PhysicalKey\n(u64 identifier)"]
end
RecordBatch1 --> ArrayRef1
ArrayRef1 --> Serialize
Serialize --> ChunkBlob
ChunkBlob --> Pager
Pager --> PhysicalKey
PhysicalKey -.read.-> Pager
Pager -.read.-> ChunkBlob
ChunkBlob --> Deserialize
Deserialize --> ArrayRef1
```
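LLKV's serialize_array/deserialize_array helpers are internal to llkv-column-map, but the round trip can be approximated with Arrow's stream IPC writer and reader. The sketch below wraps a single array in a one-column batch and uses an in-memory Vec<u8> as a stand-in for the pager's EntryHandle; the exact framing LLKV uses may differ.

```rust
use std::io::Cursor;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::reader::StreamReader;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let array: ArrayRef = Arc::new(Int64Array::from(vec![1i64, 2, 3]));
    let batch = RecordBatch::try_new(schema.clone(), vec![array])?;

    // Serialize to an in-memory byte blob.
    let mut blob = Vec::new();
    {
        let mut writer = StreamWriter::try_new(&mut blob, &schema)?;
        writer.write(&batch)?;
        writer.finish()?;
    }

    // Deserialize the blob back into a RecordBatch.
    let mut reader = StreamReader::try_new(Cursor::new(blob), None)?;
    let restored = reader.next().expect("one batch")?;
    assert_eq!(restored.num_rows(), 3);
    Ok(())
}
```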
Chunk Organization
Large columns are split into multiple chunks (default 65,536 rows per chunk). Each chunk stores:
- Value array: Serialized Arrow array with actual data
- Row ID array: Separate UInt64Array mapping row IDs to array indices
Sources: llkv-column-map/src/store/descriptor.rs (ChunkMetadata, ColumnDescriptor), llkv-column-map/src/store/mod.rs (DEFAULT_CHUNK_TARGET_ROWS)
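A rough illustration of the row-based chunking, assuming a chunk_batch helper that simply slices an incoming batch at the 65,536-row boundary; the real store additionally persists the per-chunk row-id array and chunk metadata.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

// Split a batch into fixed-size row chunks using zero-copy slices.
fn chunk_batch(batch: &RecordBatch, rows_per_chunk: usize) -> Vec<RecordBatch> {
    assert!(rows_per_chunk > 0);
    (0..batch.num_rows())
        .step_by(rows_per_chunk)
        .map(|start| batch.slice(start, rows_per_chunk.min(batch.num_rows() - start)))
        .collect()
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("row_id", DataType::UInt64, false)]));
    let row_ids: ArrayRef = Arc::new(UInt64Array::from((0u64..200_000).collect::<Vec<_>>()));
    let batch = RecordBatch::try_new(schema, vec![row_ids])?;

    let chunks = chunk_batch(&batch, 65_536);
    assert_eq!(chunks.len(), 4); // 3 full chunks + 1 partial chunk
    Ok(())
}
```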
Null Handling
Arrow’s nullable columns use a separate validity bitmap. LLKV’s GatherNullPolicy enum controls how nulls propagate during row gathering:
Sources: llkv-column-map/src/store/projection.rs:40-48 llkv-table/src/table.rs:589-645
```mermaid
graph TB
GatherNullPolicy["GatherNullPolicy"]
GatherNullPolicy --> ErrorOnMissing["ErrorOnMissing\nMissing row → Error"]
GatherNullPolicy --> IncludeNulls["IncludeNulls\nMissing row → NULL in output"]
GatherNullPolicy --> DropNulls["DropNulls\nMissing row → omit from output"]
ErrorOnMissing -.used by.-> InnerJoin["Inner Joins\nStrict row matches"]
IncludeNulls -.used by.-> OuterJoin["Outer Joins\nNULL padding"]
DropNulls -.used by.-> Projection["Filtered Projections\nRemove incomplete rows"]
```
Null Bitmap Semantics
Arrow arrays maintain a separate NullBuffer (bit-packed). When gathering rows, the system must:
- Check if the source array position is valid (array.is_valid(idx))
- Append null to the builder if invalid, or the value if valid
Sources: llkv-column-map/src/gather.rs:369-402
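A minimal illustration of that rule for a single Int64 column; gather here is a toy helper, not the gather.rs implementation.

```rust
use arrow::array::{Array, Int64Array, Int64Builder};

// Copy the value when the source slot is valid, otherwise append a null.
fn gather(src: &Int64Array, indices: &[usize]) -> Int64Array {
    let mut out = Int64Builder::with_capacity(indices.len());
    for &idx in indices {
        if src.is_valid(idx) {
            out.append_value(src.value(idx));
        } else {
            out.append_null();
        }
    }
    out.finish()
}

fn main() {
    let src = Int64Array::from(vec![Some(1i64), None, Some(3)]);
    let gathered = gather(&src, &[2, 1, 0]);
    assert_eq!(gathered.null_count(), 1);
}
```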
Arrow Integration Points
CSV Import and Export
The llkv-csv crate uses Arrow’s native CSV writer (arrow::csv::WriterBuilder) for exports and custom parsing for imports:
Sources: llkv-csv/src/writer.rs:196-267 llkv-csv/src/reader.rs
```mermaid
graph LR
subgraph "CSV Export"
ScanStream["Table::scan_stream()"]
RecordBatch2["RecordBatch stream"]
ArrowCSV["arrow::csv::Writer"]
CSVFile["CSV File"]
ScanStream --> RecordBatch2
RecordBatch2 --> ArrowCSV
ArrowCSV --> CSVFile
end
subgraph "CSV Import"
CSVInput["CSV File"]
Parser["csv-core parser"]
Builders["Array Builders"]
RecordBatch3["RecordBatch"]
TableAppend["Table::append()"]
CSVInput --> Parser
Parser --> Builders
Builders --> RecordBatch3
RecordBatch3 --> TableAppend
end
```
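A small export sketch using Arrow's CSV writer directly; the batch is built inline as a stand-in for Table::scan_stream() output, and llkv-csv applies its own WriterBuilder configuration that is omitted here.

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::csv::WriterBuilder;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    let ids: ArrayRef = Arc::new(Int64Array::from(vec![1i64, 2]));
    let names: ArrayRef = Arc::new(StringArray::from(vec!["alice", "bob"]));
    let batch = RecordBatch::try_new(schema, vec![ids, names])?;

    // Arrow's CSV writer consumes RecordBatches directly.
    let mut writer = WriterBuilder::new().build(File::create("out.csv")?);
    writer.write(&batch)?;
    Ok(())
}
```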
SQL Query Results
All SQL query results are returned as RecordBatch instances, enabling zero-copy integration with Arrow-based tools:
| Statement Type | Output Format | Arrow Schema Source |
|---|---|---|
| SELECT | Stream of RecordBatch | Query projection list |
| INSERT | RecordBatch (row count) | Fixed schema: { affected_rows: UInt64 } |
| UPDATE | RecordBatch (row count) | Fixed schema: { affected_rows: UInt64 } |
| DELETE | RecordBatch (row count) | Fixed schema: { affected_rows: UInt64 } |
| CREATE TABLE | RecordBatch (success) | Empty schema or metadata |
Sources: llkv-runtime/src/lib.rs (RuntimeStatementResult enum), llkv-sql/src/lib.rs (SqlEngine::execute)
Arrow-Native Aggregations
Aggregate functions operate directly on Arrow arrays using arrow::compute kernels where possible:
Sources: llkv-aggregate/src/lib.rs (aggregate accumulators), llkv-compute/src/kernels.rs (NumericKernels)
```mermaid
graph TB
AggregateExpr["Aggregate Expression\n(SUM, AVG, MIN, MAX)"]
AggregateExpr --> ArrowKernel["arrow::compute kernels\n(sum, min, max)"]
AggregateExpr --> CustomAccumulator["Custom Accumulator\n(COUNT DISTINCT, etc.)"]
ArrowKernel --> PrimitiveArray["PrimitiveArray<T>"]
CustomAccumulator --> AnyArray["ArrayRef (any type)"]
PrimitiveArray -.vectorized ops.-> SIMDPath["SIMD-accelerated compute"]
AnyArray -.type-erased dispatch.-> SlowPath["Generic accumulation"]
```
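A minimal example of the arrow::compute aggregate kernels referenced above; null slots are skipped, matching SQL aggregate semantics.

```rust
use arrow::array::Int64Array;
use arrow::compute::{max, min, sum};

fn main() {
    // The null entry is ignored by all three kernels.
    let col = Int64Array::from(vec![Some(3i64), None, Some(7), Some(-1)]);
    assert_eq!(sum(&col), Some(9));
    assert_eq!(min(&col), Some(-1));
    assert_eq!(max(&col), Some(7));
}
```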
Data Type Conversion Table
The following table maps SQL types to Arrow types as materialized in the system:
| SQL Type | Arrow Type | Notes |
|---|---|---|
| INTEGER, INT | Int64 | 64-bit signed |
| BIGINT | Int64 | Same as INTEGER |
| SMALLINT | Int32 | Promoted to Int32 internally |
| TINYINT | Int32 | Promoted to Int32 internally |
| FLOAT, REAL | Float32 | Single precision |
| DOUBLE | Float64 | Double precision |
| TEXT, VARCHAR | Utf8 | Variable-length, i32 offsets |
| BLOB | Binary | Variable-length bytes |
| BOOLEAN, BOOL | Boolean | Bit-packed |
| DATE | Date32 | Days since Unix epoch |
| TIMESTAMP | Date64 | Milliseconds since Unix epoch |
| DECIMAL(p, s) | Decimal128(p, s) | Fixed-point, stores precision/scale |
Sources: llkv-plan/src/types.rs (SQL type mapping), llkv-types/src/arrow.rs (DataType conversion)
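An illustrative mapping function derived from the table above; LLKV's actual conversion lives in llkv-plan/llkv-types and may handle more types and type parameters (for example DECIMAL precision and scale).

```rust
use arrow::datatypes::DataType;

// Map an SQL type name to its Arrow materialization, following the table above.
fn sql_to_arrow(sql_type: &str) -> Option<DataType> {
    match sql_type.to_ascii_uppercase().as_str() {
        "INTEGER" | "INT" | "BIGINT" => Some(DataType::Int64),
        "SMALLINT" | "TINYINT" => Some(DataType::Int32),
        "FLOAT" | "REAL" => Some(DataType::Float32),
        "DOUBLE" => Some(DataType::Float64),
        "TEXT" | "VARCHAR" => Some(DataType::Utf8),
        "BLOB" => Some(DataType::Binary),
        "BOOLEAN" | "BOOL" => Some(DataType::Boolean),
        "DATE" => Some(DataType::Date32),
        "TIMESTAMP" => Some(DataType::Date64),
        _ => None,
    }
}

fn main() {
    assert_eq!(sql_to_arrow("varchar"), Some(DataType::Utf8));
    assert_eq!(sql_to_arrow("TIMESTAMP"), Some(DataType::Date64));
}
```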
RecordBatch Flow Summary
Sources: llkv-table/src/table.rs:231-438 llkv-column-map/src/store/mod.rs (append logic)
Performance Considerations
Vectorization Benefits
Arrow’s columnar layout enables SIMD operations in compute kernels:
- Primitive aggregations: arrow::compute::sum, min, and max use SIMD
- Filtering: Bit-packed boolean arrays enable fast predicate evaluation
- Null bitmap checks: Batch validity checks avoid per-row branches
Memory Efficiency
Columnar storage provides superior compression and cache locality:
- Compression ratio: Homogeneous columns compress better than row-based formats
- Cache efficiency: Scanning a single column loads contiguous memory
- Null optimization: Sparse nulls use minimal space (1 bit per value)
Builder Reuse Strategy
The MultiGatherContext pools ColumnOutputBuilder instances to avoid repeated allocations during streaming scans:
Sources: llkv-column-map/src/store/projection.rs:651-691
```mermaid
graph TB
GatherContextPool["GatherContextPool"]
GatherContextPool --> AcquireContext["acquire(field_ids)"]
AcquireContext --> CheckCache["Check HashMap\nfor cached context"]
CheckCache -->|Hit| ReuseContext["Reuse MultiGatherContext"]
CheckCache -->|Miss| CreateNew["Create new context"]
ReuseContext --> Execute["Execute gather operation"]
CreateNew --> Execute
Execute --> ReleaseContext["Release context back to pool"]
ReleaseContext --> StoreInPool["Store in HashMap\n(max 4 per field set)"]
```
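A simplified sketch of the pooling pattern, with a String standing in for the real MultiGatherContext (which owns Arrow builders and chunk caches); the cap of four pooled contexts per field set follows the diagram.

```rust
use std::collections::HashMap;

// Reuse gather state keyed by the requested field set instead of rebuilding it per scan.
struct ContextPool {
    pools: HashMap<Vec<u64>, Vec<String>>, // key: sorted logical field ids
    max_per_key: usize,                    // capped at 4 per field set in the diagram
}

impl ContextPool {
    fn new(max_per_key: usize) -> Self {
        Self { pools: HashMap::new(), max_per_key }
    }

    fn acquire(&mut self, field_ids: &[u64]) -> String {
        let mut key = field_ids.to_vec();
        key.sort_unstable();
        self.pools
            .get_mut(&key)
            .and_then(|cached| cached.pop())
            .unwrap_or_else(|| format!("fresh context for {key:?}")) // cache miss
    }

    fn release(&mut self, field_ids: &[u64], ctx: String) {
        let mut key = field_ids.to_vec();
        key.sort_unstable();
        let slot = self.pools.entry(key).or_default();
        if slot.len() < self.max_per_key {
            slot.push(ctx); // otherwise drop the context instead of growing the pool
        }
    }
}

fn main() {
    let mut pool = ContextPool::new(4);
    let ctx = pool.acquire(&[42, 7]);
    pool.release(&[7, 42], ctx.clone()); // same field set, different order: same pool slot
    assert_eq!(pool.acquire(&[7, 42]), ctx); // cache hit returns the pooled context
}
```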
Key Takeaways:
- RecordBatch is the universal data container for all operations
- Field metadata tracks column identity via field_id keys
- Type dispatch uses Rust enums to handle 18+ Arrow data types efficiently
- Serialization uses Arrow IPC format for persistence
- Null handling respects Arrow’s validity bitmaps with configurable policies
- Integration is native: CSV, SQL results, and storage all speak Arrow