Data Formats and Arrow Integration

This page explains how Apache Arrow serves as the foundational columnar data format throughout LLKV, covering the types supported, serialization strategies, and integration points across the system. Arrow provides the in-memory representation for all data operations, from SQL query results to storage layer batches.

For information about how Arrow data flows through query execution, see Query Execution. For details on how the storage layer persists Arrow arrays, see Column Storage and ColumnStore.


RecordBatch as the Universal Data Container

The arrow::record_batch::RecordBatch type is the primary data exchange unit across all LLKV layers. A RecordBatch represents a columnar slice of data with a fixed schema, where each column is an ArrayRef (type-erased Arrow array).

Sources: llkv-table/src/table.rs:6-9 llkv-table/examples/test_streaming.rs:1-9

graph TB
    RecordBatch["RecordBatch"]
    Schema["Schema\n(arrow::datatypes::Schema)"]
    Columns["Vec<ArrayRef>\n(arrow::array::ArrayRef)"]
    RecordBatch --> Schema
    RecordBatch --> Columns
    Schema --> Fields["Vec<Field>\nColumn Names + Types"]
    Columns --> Array1["Column 0: ArrayRef"]
    Columns --> Array2["Column 1: ArrayRef"]
    Columns --> Array3["Column N: ArrayRef"]
    Array1 --> UInt64Array["UInt64Array\n(row_id column)"]
    Array2 --> Int64Array["Int64Array\n(user data)"]
    Array3 --> StringArray["StringArray\n(text data)"]

RecordBatch Construction Pattern

All data inserted into tables must arrive as a RecordBatch with specific metadata:

| Required Element | Description | Metadata Key |
|---|---|---|
| row_id column | UInt64Array with unique row identifiers | None (system column) |
| Field ID metadata | Maps each user column to a FieldId | "field_id" |
| Arrow data type | Native Arrow type (Int64, Utf8, etc.) | N/A |
| Column name | Human-readable identifier | Field name |

Sources: llkv-table/src/table.rs:231-438 llkv-table/examples/test_streaming.rs:18-58
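
The snippet below is a minimal sketch of this construction pattern using the arrow crate directly. The column name, field id value, and row contents are illustrative placeholders, not values taken from LLKV's sources; only the "field_id" metadata key and the mandatory row_id column follow the convention described above.

```rust
use std::{collections::HashMap, sync::Arc};

use arrow::array::{ArrayRef, Int64Array, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

/// Sketch: a batch with the mandatory `row_id` column plus one user
/// column whose Field carries the "field_id" metadata entry.
fn example_batch() -> Result<RecordBatch, arrow::error::ArrowError> {
    let row_id = Field::new("row_id", DataType::UInt64, false);

    // Hypothetical user column "score" mapped to field id 42.
    let mut meta = HashMap::new();
    meta.insert("field_id".to_string(), "42".to_string());
    let user_col = Field::new("score", DataType::Int64, true).with_metadata(meta);

    let schema = Arc::new(Schema::new(vec![row_id, user_col]));
    RecordBatch::try_new(
        schema,
        vec![
            Arc::new(UInt64Array::from(vec![1u64, 2, 3])) as ArrayRef,
            Arc::new(Int64Array::from(vec![10i64, 20, 30])) as ArrayRef,
        ],
    )
}
```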


Schema and Field Metadata

The arrow::datatypes::Schema describes the structure of a RecordBatch. LLKV extends Arrow’s schema system with custom metadata to track column identity and table ownership.

Field Metadata Convention

Each user column’s Field carries a field_id metadata entry that encodes either:

  • User columns: A raw FieldId (0-2047) that gets namespaced to a LogicalFieldId
  • MVCC columns: A pre-computed LogicalFieldId for created_by / deleted_by

Sources: llkv-table/src/table.rs:243-320 llkv-table/constants.rs (FIELD_ID_META_KEY)

graph TB
    Field["arrow::datatypes::Field"]
    FieldName["name: String"]
    DataType["data_type: DataType"]
    Nullable["nullable: bool"]
    Metadata["metadata: HashMap<String, String>"]
    Field --> FieldName
    Field --> DataType
    Field --> Nullable
    Field --> Metadata
    Metadata --> FieldIdKey["'field_id' → '42'\n(user column)"]
    Metadata --> LFidKey["'field_id' → '8589934634'\n(MVCC column)"]
    FieldIdKey -.converted to.-> UserLFid["LogicalFieldId::for_user(table_id, 42)"]
    LFidKey -.direct use.-> SystemLFid["LogicalFieldId::for_mvcc_created_by(table_id)"]
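
A short sketch of how the metadata entry can be read back from a Field using only Arrow APIs. The "field_id" key follows the convention above; the namespacing into a LogicalFieldId is performed inside LLKV and is only indicated by a comment here.

```rust
use arrow::datatypes::Field;

/// Sketch: extract the raw field id stored under the "field_id"
/// metadata key. Namespacing the value into a LogicalFieldId
/// (e.g. LogicalFieldId::for_user(table_id, field_id)) happens in
/// LLKV and is not reproduced here.
fn raw_field_id(field: &Field) -> Option<u64> {
    field
        .metadata()
        .get("field_id")
        .and_then(|v| v.parse::<u64>().ok())
}
```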

Schema Reconstruction from Catalog

The Table::schema() method reconstructs an Arrow Schema by querying the system catalog for column names and combining them with type information from the column store:

Sources: llkv-table/src/table.rs:519-549

sequenceDiagram
    participant Caller
    participant Table
    participant ColumnStore
    participant SysCatalog
    
    Caller->>Table: schema()
    Table->>ColumnStore: user_field_ids_for_table(table_id)
    ColumnStore-->>Table: Vec<LogicalFieldId>
    Table->>SysCatalog: get_cols_meta(field_ids)
    SysCatalog-->>Table: Vec<ColMeta>
    Table->>ColumnStore: data_type(lfid) for each
    ColumnStore-->>Table: DataType
    Table->>Table: Build Field with metadata
    Table-->>Caller: Arc<Schema>
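
A rough sketch of the final assembly step, assuming the catalog has already yielded column names and the column store has yielded Arrow data types. The (field_id, name, data_type) triple is a placeholder shape, not an LLKV type.

```rust
use std::{collections::HashMap, sync::Arc};

use arrow::datatypes::{DataType, Field, Schema};

/// Sketch: combine catalog-provided names with store-provided types
/// into an Arrow Schema, re-attaching the "field_id" metadata.
fn rebuild_schema(columns: &[(u64, String, DataType)]) -> Arc<Schema> {
    let fields: Vec<Field> = columns
        .iter()
        .map(|(fid, name, dt)| {
            let mut meta = HashMap::new();
            meta.insert("field_id".to_string(), fid.to_string());
            Field::new(name.as_str(), dt.clone(), true).with_metadata(meta)
        })
        .collect();
    Arc::new(Schema::new(fields))
}
```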

Supported Arrow Data Types

LLKV’s column store supports the following Arrow data types for user columns:

| Arrow Type | Rust Type Mapping | Storage Encoding | Notes |
|---|---|---|---|
| UInt64 | u64 | Native | Row IDs, MVCC transaction IDs |
| UInt32 | u32 | Native | - |
| UInt16 | u16 | Native | - |
| UInt8 | u8 | Native | - |
| Int64 | i64 | Native | - |
| Int32 | i32 | Native | - |
| Int16 | i16 | Native | - |
| Int8 | i8 | Native | - |
| Float64 | f64 | IEEE 754 | - |
| Float32 | f32 | IEEE 754 | - |
| Date32 | i32 (days since epoch) | Native | - |
| Date64 | i64 (ms since epoch) | Native | - |
| Decimal128 | i128 | Native + precision/scale metadata | Fixed-point decimal |
| Utf8 | String (i32 offsets) | Length-prefixed UTF-8 | Variable-length strings |
| LargeUtf8 | String (i64 offsets) | Length-prefixed UTF-8 | Large strings |
| Binary | Vec<u8> (i32 offsets) | Length-prefixed bytes | Variable-length binary |
| LargeBinary | Vec<u8> (i64 offsets) | Length-prefixed bytes | Large binary |
| Boolean | bool | Bit-packed | - |
| Struct | Nested fields | Passthrough (no specialized gather) | Limited support |

Sources: llkv-column-map/src/store/projection.rs:135-185 llkv-column-map/src/gather.rs:10-18

Type Dispatch Strategy

The column store uses Rust enums to dispatch operations per data type, ensuring type-safe handling without runtime reflection:

Sources: llkv-column-map/src/store/projection.rs:99-236 llkv-column-map/src/store/projection.rs:237-395

graph TB
    ColumnOutputBuilder["ColumnOutputBuilder\n(enum per type)"]
ColumnOutputBuilder --> Utf8["Utf8 { builder, len_capacity, value_capacity }"]
ColumnOutputBuilder --> LargeUtf8["LargeUtf8 { builder, len_capacity, value_capacity }"]
ColumnOutputBuilder --> Binary["Binary { builder, len_capacity, value_capacity }"]
ColumnOutputBuilder --> LargeBinary["LargeBinary { builder, len_capacity, value_capacity }"]
ColumnOutputBuilder --> Boolean["Boolean { builder, len_capacity }"]
ColumnOutputBuilder --> Decimal128["Decimal128 { builder, precision, scale }"]
ColumnOutputBuilder --> Primitive["Primitive(PrimitiveBuilderKind)"]
ColumnOutputBuilder --> Passthrough["Passthrough (Struct arrays)"]
Primitive --> UInt64Builder["UInt64 { builder, len_capacity }"]
Primitive --> Int64Builder["Int64 { builder, len_capacity }"]
Primitive --> Float64Builder["Float64 { builder, len_capacity }"]
Primitive --> OtherPrimitives["... (10 more primitive types)"]
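
The enum below is a heavily reduced illustration of this dispatch style, not LLKV's actual ColumnOutputBuilder: two variants instead of 18+, but the same idea of matching once per batch on a concrete builder rather than using dynamic reflection.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Builder, StringBuilder};

/// Sketch: a tiny per-type builder enum in the spirit of
/// ColumnOutputBuilder.
enum OutputBuilder {
    Int64(Int64Builder),
    Utf8(StringBuilder),
}

impl OutputBuilder {
    fn append_null(&mut self) {
        match self {
            OutputBuilder::Int64(b) => b.append_null(),
            OutputBuilder::Utf8(b) => b.append_null(),
        }
    }

    fn finish(&mut self) -> ArrayRef {
        match self {
            OutputBuilder::Int64(b) => Arc::new(b.finish()) as ArrayRef,
            OutputBuilder::Utf8(b) => Arc::new(b.finish()) as ArrayRef,
        }
    }
}
```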

Array Builders and Construction

Arrow provides builder types (arrow::array::*Builder) for incrementally constructing arrays. LLKV’s projection layer wraps these builders with capacity management to minimize allocations during multi-batch operations.

Capacity Pre-Allocation Strategy

The ColumnOutputBuilder tracks allocated capacity separately from Arrow’s builder to avoid repeated re-allocations:

Sources: llkv-column-map/src/store/projection.rs:189-234 llkv-column-map/src/store/projection.rs:398-457

graph LR
    RequestedLen["Requested Len"]
    ValueBytesHint["Value Bytes Hint"]
    RequestedLen --> Check["len_capacity < len?"]
    ValueBytesHint --> Check2["value_capacity < bytes?"]
    Check -->|Yes| Reallocate["Recreate Builder\nwith_capacity(len, bytes)"]
    Check2 -->|Yes| Reallocate
    Check -->|No| Reuse["Reuse Existing Builder"]
    Check2 -->|No| Reuse
    Reallocate --> UpdateCapacity["Update cached capacities"]
    UpdateCapacity --> Append["append_value() or append_null()"]
    Reuse --> Append

String and Binary Builder Pattern

Variable-length types require two capacity dimensions:

  • Length capacity: Number of values
  • Value capacity: Total bytes across all values

Sources: llkv-column-map/src/store/projection.rs:398-411 llkv-column-map/src/store/projection.rs:413-426
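
A sketch of the two-dimensional capacity check for a string column. The struct and method names are illustrative; only the use of StringBuilder::with_capacity(item_capacity, data_capacity) comes from Arrow's builder API.

```rust
use arrow::array::StringBuilder;

/// Sketch: recreate the builder only when either the value count or
/// the total byte budget outgrows what was previously reserved.
struct Utf8Output {
    builder: StringBuilder,
    len_capacity: usize,
    value_capacity: usize,
}

impl Utf8Output {
    fn reserve(&mut self, len: usize, value_bytes: usize) {
        if self.len_capacity < len || self.value_capacity < value_bytes {
            // with_capacity(item_capacity, data_capacity) pre-sizes both
            // the offsets buffer and the value buffer.
            self.builder = StringBuilder::with_capacity(len, value_bytes);
            self.len_capacity = len;
            self.value_capacity = value_bytes;
        }
    }
}
```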


graph TB
    subgraph "In-Memory Layer"
        RecordBatch1["RecordBatch\n(user input)"]
        ArrayRef1["ArrayRef\n(Int64Array, StringArray, etc.)"]
    end

    subgraph "Serialization Layer"
        Serialize["serialize_array()\n(Arrow IPC encoding)"]
        Deserialize["deserialize_array()\n(Arrow IPC decoding)"]
    end

    subgraph "Storage Layer"
        ChunkBlob["EntryHandle\n(byte blob)"]
        Pager["Pager::batch_put()"]
        PhysicalKey["PhysicalKey\n(u64 identifier)"]
    end

    RecordBatch1 --> ArrayRef1
    ArrayRef1 --> Serialize
    Serialize --> ChunkBlob
    ChunkBlob --> Pager
    Pager --> PhysicalKey
    PhysicalKey -.read.-> Pager
    Pager -.read.-> ChunkBlob
    ChunkBlob --> Deserialize
    Deserialize --> ArrayRef1

Serialization and Storage Integration

Arrow arrays are serialized to byte blobs using Arrow’s IPC format before persistence in the key-value store. The column store maintains a separation between logical Arrow representation and physical storage:

Sources: llkv-column-map/src/serialization.rs (deserialize_array function), llkv-column-map/src/store/projection.rs:17
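
A sketch of round-tripping a single array through Arrow's IPC stream format in memory. LLKV's serialize_array / deserialize_array helpers live in llkv-column-map/src/serialization.rs; the code below only illustrates the underlying Arrow APIs, and the single-column wrapper schema is an assumption for the example.

```rust
use std::sync::Arc;

use arrow::array::ArrayRef;
use arrow::datatypes::{Field, Schema};
use arrow::error::ArrowError;
use arrow::ipc::reader::StreamReader;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

/// Sketch: wrap an array in a one-column batch, encode it with the
/// IPC stream format, then decode it back into an ArrayRef.
fn ipc_round_trip(array: ArrayRef) -> Result<ArrayRef, ArrowError> {
    let field = Field::new("col", array.data_type().clone(), true);
    let schema = Arc::new(Schema::new(vec![field]));
    let batch = RecordBatch::try_new(schema.clone(), vec![array])?;

    // Encode into an in-memory byte buffer.
    let mut buf = Vec::new();
    {
        let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
        writer.write(&batch)?;
        writer.finish()?;
    }

    // Decode the buffer back into a RecordBatch and take the column.
    let mut reader = StreamReader::try_new(buf.as_slice(), None)?;
    let decoded = reader.next().expect("one batch")?;
    Ok(decoded.column(0).clone())
}
```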

Chunk Organization

Large columns are split into multiple chunks (default 65,536 rows per chunk). Each chunk stores:

  • Value array: Serialized Arrow array with actual data
  • Row ID array: Separate UInt64Array mapping row IDs to array indices

Sources: llkv-column-map/src/store/descriptor.rs (ChunkMetadata, ColumnDescriptor), llkv-column-map/src/store/mod.rs (DEFAULT_CHUNK_TARGET_ROWS)
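
A sketch of the chunking split using RecordBatch::slice. The 65,536-row target mirrors DEFAULT_CHUNK_TARGET_ROWS; the pairing of each chunk with its row-id array is handled inside the column store and is not shown here.

```rust
use arrow::record_batch::RecordBatch;

/// Sketch: split a batch into chunks of at most `target_rows` rows.
/// slice() is zero-copy; it only adjusts offsets into shared buffers.
fn chunk_batch(batch: &RecordBatch, target_rows: usize) -> Vec<RecordBatch> {
    let mut chunks = Vec::new();
    let mut offset = 0;
    while offset < batch.num_rows() {
        let len = target_rows.min(batch.num_rows() - offset);
        chunks.push(batch.slice(offset, len));
        offset += len;
    }
    chunks
}
```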


graph TB
    GatherNullPolicy["GatherNullPolicy"]
GatherNullPolicy --> ErrorOnMissing["ErrorOnMissing\nMissing row → Error"]
GatherNullPolicy --> IncludeNulls["IncludeNulls\nMissing row → NULL in output"]
GatherNullPolicy --> DropNulls["DropNulls\nMissing row → omit from output"]
ErrorOnMissing -.used by.-> InnerJoin["Inner Joins\nStrict row matches"]
IncludeNulls -.used by.-> OuterJoin["Outer Joins\nNULL padding"]
DropNulls -.used by.-> Projection["Filtered Projections\nRemove incomplete rows"]

Null Handling

Arrow’s nullable columns use a separate validity bitmap. LLKV’s GatherNullPolicy enum controls how nulls propagate during row gathering:

Sources: llkv-column-map/src/store/projection.rs:40-48 llkv-table/src/table.rs:589-645

Null Bitmap Semantics

Arrow arrays maintain a separate NullBuffer (bit-packed). When gathering rows, the system must:

  1. Check if the source array position is valid (array.is_valid(idx))
  2. Append null to the builder if invalid, or the value if valid

Sources: llkv-column-map/src/gather.rs:369-402
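
A sketch of the per-row validity check for a single Int64 column. The function and its inputs are illustrative; copying nulls through the builder corresponds roughly to the IncludeNulls policy described above.

```rust
use arrow::array::{Array, Int64Array, Int64Builder};

/// Sketch: gather `indices` out of `source`, preserving nulls instead
/// of dropping them or raising an error.
fn gather_int64(source: &Int64Array, indices: &[usize]) -> Int64Array {
    let mut builder = Int64Builder::with_capacity(indices.len());
    for &idx in indices {
        if source.is_valid(idx) {
            builder.append_value(source.value(idx));
        } else {
            builder.append_null();
        }
    }
    builder.finish()
}
```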


graph LR
    subgraph "CSV Export"
        ScanStream["Table::scan_stream()"]
        RecordBatch2["RecordBatch stream"]
        ArrowCSV["arrow::csv::Writer"]
        CSVFile["CSV File"]
        ScanStream --> RecordBatch2
        RecordBatch2 --> ArrowCSV
        ArrowCSV --> CSVFile
    end

    subgraph "CSV Import"
        CSVInput["CSV File"]
        Parser["csv-core parser"]
        Builders["Array Builders"]
        RecordBatch3["RecordBatch"]
        TableAppend["Table::append()"]
        CSVInput --> Parser
        Parser --> Builders
        Builders --> RecordBatch3
        RecordBatch3 --> TableAppend
    end

Arrow Integration Points

CSV Import and Export

The llkv-csv crate uses Arrow’s native CSV writer (arrow::csv::WriterBuilder) for exports and custom parsing for imports:

Sources: llkv-csv/src/writer.rs:196-267 llkv-csv/src/reader.rs
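
A sketch of the export side using Arrow's CSV writer (requires the arrow crate's csv feature). The file path and batch source are placeholders, and the builder is left at its defaults rather than configured the way llkv-csv configures it.

```rust
use std::fs::File;

use arrow::csv::WriterBuilder;
use arrow::record_batch::RecordBatch;

/// Sketch: stream RecordBatches into a CSV file.
fn write_csv(path: &str, batches: &[RecordBatch]) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::create(path)?;
    let mut writer = WriterBuilder::new().build(file);
    for batch in batches {
        writer.write(batch)?;
    }
    Ok(())
}
```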

SQL Query Results

All SQL query results are returned as RecordBatch instances, enabling zero-copy integration with Arrow-based tools:

| Statement Type | Output Format | Arrow Schema Source |
|---|---|---|
| SELECT | Stream of RecordBatch | Query projection list |
| INSERT | RecordBatch (row count) | Fixed schema: { affected_rows: UInt64 } |
| UPDATE | RecordBatch (row count) | Fixed schema: { affected_rows: UInt64 } |
| DELETE | RecordBatch (row count) | Fixed schema: { affected_rows: UInt64 } |
| CREATE TABLE | RecordBatch (success) | Empty schema or metadata |

Sources: llkv-runtime/src/lib.rs (RuntimeStatementResult enum), llkv-sql/src/lib.rs (SqlEngine::execute)

graph TB
    AggregateExpr["Aggregate Expression\n(SUM, AVG, MIN, MAX)"]
AggregateExpr --> ArrowKernel["arrow::compute kernels\n(sum, min, max)"]
AggregateExpr --> CustomAccumulator["Custom Accumulator\n(COUNT DISTINCT, etc.)"]
ArrowKernel --> PrimitiveArray["PrimitiveArray&lt;T&gt;"]
CustomAccumulator --> AnyArray["ArrayRef (any type)"]
PrimitiveArray -.vectorized ops.-> SIMDPath["SIMD-accelerated compute"]
AnyArray -.type-erased dispatch.-> SlowPath["Generic accumulation"]

Arrow-Native Aggregations

Aggregate functions operate directly on Arrow arrays using arrow::compute kernels where possible:

Sources: llkv-aggregate/src/lib.rs (aggregate accumulators), llkv-compute/src/kernels.rs (NumericKernels)
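
A sketch of the kernel path for a primitive column; arrow::compute::sum, min, and max are the vectorized kernels referenced above, and each returns None when every value is null.

```rust
use arrow::array::Int64Array;
use arrow::compute;

/// Sketch: aggregate a primitive column with Arrow compute kernels.
fn aggregate(values: &Int64Array) -> (Option<i64>, Option<i64>, Option<i64>) {
    let sum = compute::sum(values);
    let min = compute::min(values);
    let max = compute::max(values);
    (sum, min, max)
}
```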


Data Type Conversion Table

The following table maps SQL types to Arrow types as materialized in the system:

| SQL Type | Arrow Type | Notes |
|---|---|---|
| INTEGER, INT | Int64 | 64-bit signed |
| BIGINT | Int64 | Same as INTEGER |
| SMALLINT | Int32 | Promoted to Int32 internally |
| TINYINT | Int32 | Promoted to Int32 internally |
| FLOAT, REAL | Float32 | Single precision |
| DOUBLE | Float64 | Double precision |
| TEXT, VARCHAR | Utf8 | Variable-length, i32 offsets |
| BLOB | Binary | Variable-length bytes |
| BOOLEAN, BOOL | Boolean | Bit-packed |
| DATE | Date32 | Days since Unix epoch |
| TIMESTAMP | Date64 | Milliseconds since Unix epoch |
| DECIMAL(p, s) | Decimal128(p, s) | Fixed-point, stores precision/scale |

Sources: llkv-plan/src/types.rs (SQL type mapping), llkv-types/src/arrow.rs (DataType conversion)
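
A sketch of this mapping as a plain match on upper-cased type names, following the table above. LLKV's real mapping lives in llkv-plan/src/types.rs and may differ in detail; decimal precision and scale are omitted here for brevity.

```rust
use arrow::datatypes::DataType;

/// Sketch: map an upper-cased SQL type name to its Arrow DataType.
fn sql_to_arrow(sql_type: &str) -> Option<DataType> {
    match sql_type {
        "INTEGER" | "INT" | "BIGINT" => Some(DataType::Int64),
        "SMALLINT" | "TINYINT" => Some(DataType::Int32),
        "FLOAT" | "REAL" => Some(DataType::Float32),
        "DOUBLE" => Some(DataType::Float64),
        "TEXT" | "VARCHAR" => Some(DataType::Utf8),
        "BLOB" => Some(DataType::Binary),
        "BOOLEAN" | "BOOL" => Some(DataType::Boolean),
        "DATE" => Some(DataType::Date32),
        "TIMESTAMP" => Some(DataType::Date64),
        _ => None,
    }
}
```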


RecordBatch Flow Summary

Sources: llkv-table/src/table.rs:231-438 llkv-column-map/src/store/mod.rs (append logic)


Performance Considerations

Vectorization Benefits

Arrow’s columnar layout enables SIMD operations in compute kernels:

  • Primitive aggregations: arrow::compute::sum, min, max use SIMD
  • Filtering: Bit-packed boolean arrays enable fast predicate evaluation
  • Null bitmap checks: Batch validity checks avoid per-row branches
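
For example, a predicate evaluated into a bit-packed BooleanArray can be applied with the filter kernel; the arrays below are illustrative, and in practice the predicate would come from a comparison kernel rather than being built by hand.

```rust
use arrow::array::{BooleanArray, Int64Array};
use arrow::compute::filter;
use arrow::error::ArrowError;

/// Sketch: keep only the rows whose predicate bit is set.
fn apply_predicate(values: &Int64Array, keep: &BooleanArray) -> Result<Int64Array, ArrowError> {
    let filtered = filter(values, keep)?;
    Ok(filtered
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("filter preserves the input type")
        .clone())
}
```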

Memory Efficiency

Columnar storage provides superior compression and cache locality:

  • Compression ratio: Homogeneous columns compress better than row-based formats
  • Cache efficiency: Scanning a single column loads contiguous memory
  • Null optimization: Sparse nulls use minimal space (1 bit per value)

graph TB
    GatherContextPool["GatherContextPool"]
    GatherContextPool --> AcquireContext["acquire(field_ids)"]
    AcquireContext --> CheckCache["Check HashMap\nfor cached context"]
    CheckCache -->|Hit| ReuseContext["Reuse MultiGatherContext"]
    CheckCache -->|Miss| CreateNew["Create new context"]
    ReuseContext --> Execute["Execute gather operation"]
    CreateNew --> Execute
    Execute --> ReleaseContext["Release context back to pool"]
    ReleaseContext --> StoreInPool["Store in HashMap\n(max 4 per field set)"]

Builder Reuse Strategy

The MultiGatherContext pools ColumnOutputBuilder instances to avoid repeated allocations during streaming scans:

Sources: llkv-column-map/src/store/projection.rs:651-691
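
A heavily simplified sketch of a pool keyed by the projected field ids. The real MultiGatherContext caches more state (builders, row-id lookups) and the type names below are placeholders; only the acquire/release shape and the cap of four pooled entries per key mirror the diagram above.

```rust
use std::collections::HashMap;

/// Placeholder for the per-gather state that gets reused.
struct GatherContext;

const MAX_POOLED: usize = 4;

/// Sketch: reuse contexts per field-id set, keeping at most
/// MAX_POOLED of them alive for each key.
struct ContextPool {
    pools: HashMap<Vec<u64>, Vec<GatherContext>>,
}

impl ContextPool {
    fn acquire(&mut self, field_ids: &[u64]) -> GatherContext {
        self.pools
            .get_mut(field_ids)
            .and_then(|v| v.pop())
            .unwrap_or(GatherContext)
    }

    fn release(&mut self, field_ids: Vec<u64>, ctx: GatherContext) {
        let slot = self.pools.entry(field_ids).or_default();
        if slot.len() < MAX_POOLED {
            slot.push(ctx);
        }
    }
}
```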


Key Takeaways:

  • RecordBatch is the universal data container for all operations
  • Field metadata tracks column identity via field_id keys
  • Type dispatch uses Rust enums to handle 18+ Arrow data types efficiently
  • Serialization uses Arrow IPC format for persistence
  • Null handling respects Arrow’s validity bitmaps with configurable policies
  • Integration is native: CSV, SQL results, and storage all speak Arrow
