This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

INSERT Buffering System

The INSERT Buffering System is a performance optimization that batches multiple consecutive INSERT ... VALUES statements into a single execution operation. It accumulates literal row data across statement boundaries and flushes the rows together, which reduces planning and execution overhead and raises throughput for bulk ingestion workloads while preserving per-statement result semantics.

For information about SQL statement execution in general, see SQL Interface. For details on how INSERT statements are planned and translated, see Query Planning.


Purpose and Scope

The buffering system operates at the SQL engine layer, intercepting INSERT statements before they reach the runtime executor. It analyzes each INSERT to determine if it contains literal values that can be safely accumulated, or if it requires immediate execution due to complex source expressions (e.g., INSERT ... SELECT). The system maintains buffering state across multiple execute() calls, making it suitable for workloads that stream SQL scripts containing thousands of INSERT statements.

In scope:

  • Buffering of INSERT ... VALUES statements with literal row data
  • Automatic flushing based on configurable thresholds and statement boundaries
  • Per-statement result tracking for compatibility with test harnesses
  • Conflict resolution action compatibility (REPLACE, IGNORE, etc.)

Out of scope:

  • Buffering of INSERT statements with subqueries or SELECT sources
  • Cross-table buffering (only same-table INSERTs are batched)
  • Transaction-aware buffering strategies

System Architecture

Sources: llkv-sql/src/sql_engine.rs:1057-1114 llkv-sql/src/sql_engine.rs:1231-1338 llkv-sql/src/sql_engine.rs:1423-1557


Core Data Structures

classDiagram
    class InsertBuffer {
        -String table_name
        -Vec~String~ columns
        -InsertConflictAction on_conflict
        -usize total_rows
        -Vec~usize~ statement_row_counts
        -Vec~Vec~PlanValue~~ rows
        +new() InsertBuffer
        +can_accept() bool
        +push_statement()
        +should_flush() bool
    }

    class PreparedInsert {
        <<enumeration>>
        Values
        Immediate
    }

    class SqlEngine {
        -RefCell~Option~InsertBuffer~~ insert_buffer
        -AtomicBool insert_buffering_enabled
        +buffer_insert() BufferedInsertResult
        +flush_buffer_results() Vec~SqlStatementResult~
        +set_insert_buffering()
    }

    SqlEngine --> InsertBuffer : contains
    SqlEngine --> PreparedInsert : produces

InsertBuffer

The InsertBuffer struct accumulates literal row data across multiple INSERT statements while tracking per-statement row counts for result reporting.

Field descriptions:

| Field | Type | Purpose |
|---|---|---|
| table_name | String | Target table identifier for compatibility checking |
| columns | Vec<String> | Column list (must match across buffered statements) |
| on_conflict | InsertConflictAction | Conflict resolution strategy (REPLACE, IGNORE, etc.) |
| total_rows | usize | Sum of all buffered rows across statements |
| statement_row_counts | Vec<usize> | Per-statement row counts for result splitting |
| rows | Vec<Vec<PlanValue>> | Flattened literal row data |

Sources: llkv-sql/src/sql_engine.rs:497-547
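The field list above can be sketched as a small struct. This is a minimal illustration, not the definition in llkv-sql: PlanValue and InsertConflictAction are cut-down stand-ins, and the constructor signature and the two accessor methods are assumptions made so the example is self-contained.

```rust
// Cut-down stand-ins for the real llkv types (assumed for illustration).
#[derive(Debug, Clone, PartialEq)]
pub enum PlanValue {
    Null,
    Integer(i64),
    String(String),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InsertConflictAction {
    None,
    Replace,
    Ignore,
}

// Field names follow the table above; the metadata fields are only stored here.
#[allow(dead_code)]
pub struct InsertBuffer {
    table_name: String,
    columns: Vec<String>,
    on_conflict: InsertConflictAction,
    total_rows: usize,
    statement_row_counts: Vec<usize>,
    rows: Vec<Vec<PlanValue>>,
}

impl InsertBuffer {
    /// Starts a buffer from the first statement's rows (signature assumed).
    pub fn new(
        table_name: String,
        columns: Vec<String>,
        on_conflict: InsertConflictAction,
        rows: Vec<Vec<PlanValue>>,
    ) -> Self {
        let count = rows.len();
        Self {
            table_name,
            columns,
            on_conflict,
            total_rows: count,
            statement_row_counts: vec![count],
            rows,
        }
    }

    /// Appends one statement's rows and records how many rows it contributed.
    pub fn push_statement(&mut self, rows: Vec<Vec<PlanValue>>) {
        let count = rows.len();
        self.rows.extend(rows);
        self.total_rows += count;
        self.statement_row_counts.push(count);
    }

    /// Hypothetical accessor: total buffered rows.
    pub fn total_rows(&self) -> usize {
        self.total_rows
    }

    /// Hypothetical accessor: rows contributed by each buffered statement.
    pub fn statement_row_counts(&self) -> &[usize] {
        &self.statement_row_counts
    }
}
```

Keeping statement_row_counts next to the flattened rows is what later allows one batched result to be split back into per-statement results.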


Configuration and Thresholds

MAX_BUFFERED_INSERT_ROWS

The buffering system enforces a maximum row threshold, MAX_BUFFERED_INSERT_ROWS (8192 rows by default), to prevent unbounded memory growth during bulk ingestion.

When total_rows reaches this threshold, the buffer automatically flushes before accepting additional statements. This value balances memory usage against amortized planning overhead.
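As a rough sketch of the threshold check, assuming the 8192-row default quoted in the Batch Size Impact section and the `>=` comparison shown in the flush-trigger diagram; the free function here is a simplification of the should_flush method on InsertBuffer.

```rust
/// Flush threshold; 8192 is the default value quoted in this document.
pub const MAX_BUFFERED_INSERT_ROWS: usize = 8192;

/// Returns true once the buffered row count has reached the threshold.
pub fn should_flush(total_rows: usize) -> bool {
    total_rows >= MAX_BUFFERED_INSERT_ROWS
}
```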

Sources: llkv-sql/src/sql_engine.rs:490

Enabling and Disabling Buffering

Buffering is disabled by default to preserve immediate execution semantics for unit tests and applications that depend on synchronous error reporting. Long-running workloads can opt in by calling set_insert_buffering(true); calling set_insert_buffering(false) flushes any buffered rows.

Sources: llkv-sql/src/sql_engine.rs:1022-1029 llkv-sql/src/sql_engine.rs:583

sequenceDiagram
    participant App
    participant SqlEngine
    participant InsertBuffer
    
    App->>SqlEngine: set_insert_buffering(true)
    SqlEngine->>SqlEngine: Store enabled flag
    
    Note over App,SqlEngine: Buffering now active
    
    App->>SqlEngine: execute("INSERT INTO t VALUES (1)")
    SqlEngine->>InsertBuffer: Create or append
    App->>SqlEngine: execute("INSERT INTO t VALUES (2)")
    SqlEngine->>InsertBuffer: Append to existing
    
    App->>SqlEngine: set_insert_buffering(false)
    SqlEngine->>InsertBuffer: flush_buffer_results()
    InsertBuffer->>SqlEngine: Return per-statement results
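The sequence above can be imitated with a toy engine. MiniEngine is hypothetical and tracks only per-statement row counts; the return type of set_insert_buffering and the insert helper are assumptions chosen to make the flush observable, not the SqlEngine API.

```rust
use std::cell::RefCell;
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical miniature engine that buffers per-statement row counts only.
pub struct MiniEngine {
    insert_buffering_enabled: AtomicBool,
    pending: RefCell<Vec<usize>>,
}

impl MiniEngine {
    /// Buffering starts disabled, matching the documented default.
    pub fn new() -> Self {
        Self {
            insert_buffering_enabled: AtomicBool::new(false),
            pending: RefCell::new(Vec::new()),
        }
    }

    /// Toggles buffering. Disabling it flushes and returns the per-statement
    /// counts that were pending (return type assumed for this sketch).
    pub fn set_insert_buffering(&self, enabled: bool) -> Vec<usize> {
        self.insert_buffering_enabled.store(enabled, Ordering::SeqCst);
        if enabled {
            Vec::new()
        } else {
            self.flush()
        }
    }

    /// Returns Some(row_count) when executed immediately, None when buffered.
    pub fn insert(&self, row_count: usize) -> Option<usize> {
        if self.insert_buffering_enabled.load(Ordering::SeqCst) {
            self.pending.borrow_mut().push(row_count);
            None
        } else {
            Some(row_count)
        }
    }

    /// Drains the pending per-statement counts.
    fn flush(&self) -> Vec<usize> {
        std::mem::take(&mut *self.pending.borrow_mut())
    }
}
```

With buffering off, insert reports its row count straight away; after set_insert_buffering(true) it returns None until the buffer is flushed.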

Buffering Decision Flow

The buffer_insert method implements the core buffering logic: it decides whether an INSERT is accumulated in the buffer or executed immediately.

Immediate Execution Conditions

Buffering is bypassed under the following conditions:

  1. Statement Expectation is Error or Count: Test harnesses use expectations to validate synchronous error reporting and exact row counts
  2. Buffering Disabled: The insert_buffering_enabled flag is false (default state)
  3. Non-literal Source: The INSERT uses a SELECT subquery or other dynamic source
  4. Buffer Incompatibility: The INSERT targets a different table, column list, or conflict action

Sources: llkv-sql/src/sql_engine.rs:1231-1338
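The conditions above can be condensed into one decision function. This is a sketch under assumptions: the Decision enum, the PreparedKind enum, and the decide function are hypothetical names, the payload type of Count is a guess, and an incompatible buffer is modeled the way the decision flowchart in this section draws it (flush, then start a new buffer).

```rust
/// Expectation attached to a statement by a test harness.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StatementExpectation {
    Ok,
    Error,
    Count(u64), // payload type assumed
}

/// Outcome of canonicalization: literal rows, or a plan that must run now.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PreparedKind {
    Values,
    Immediate,
}

/// Hypothetical summary of what happens to one INSERT.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Decision {
    ExecuteImmediately,
    AppendToBuffer,
    FlushThenStartNewBuffer,
}

pub fn decide(
    expectation: StatementExpectation,
    buffering_enabled: bool,
    prepared: PreparedKind,
    buffer_can_accept: bool,
) -> Decision {
    // 1. Error and Count expectations need synchronous results.
    if matches!(
        expectation,
        StatementExpectation::Error | StatementExpectation::Count(_)
    ) {
        return Decision::ExecuteImmediately;
    }
    // 2. Buffering is opt-in.
    if !buffering_enabled {
        return Decision::ExecuteImmediately;
    }
    // 3. Non-literal sources cannot be accumulated.
    if prepared == PreparedKind::Immediate {
        return Decision::ExecuteImmediately;
    }
    // 4. Different table, column list, or conflict action: flush first.
    if buffer_can_accept {
        Decision::AppendToBuffer
    } else {
        Decision::FlushThenStartNewBuffer
    }
}
```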

flowchart TD
    START["buffer_insert(insert, expectation)"]
    CHECK_EXPECT{"expectation ==\nError | Count?"}
    CHECK_ENABLED{"buffering_enabled?"}
    PREPARE["prepare_insert insert"]
    CHECK_TYPE{"PreparedInsert type?"}
    CHECK_COMPAT{"buffer.can_accept ?"}
    CHECK_FULL{"buffer.should_flush ?"}
    EXEC_IMM["Execute immediately Return flushed + current"]
    CREATE_BUF["Create new InsertBuffer"]
    APPEND_BUF["buffer.push_statement"]
    FLUSH_BUF["flush_buffer_results"]
    RETURN_PLACE["Return placeholder result"]
    START --> CHECK_EXPECT
    CHECK_EXPECT -->|Yes| EXEC_IMM
    CHECK_EXPECT -->|No| CHECK_ENABLED
    CHECK_ENABLED -->|No| EXEC_IMM
    CHECK_ENABLED -->|Yes| PREPARE
    PREPARE --> CHECK_TYPE
    CHECK_TYPE -->|Immediate| EXEC_IMM
    CHECK_TYPE -->|Values| CHECK_COMPAT
    CHECK_COMPAT -->|No| FLUSH_BUF
    CHECK_COMPAT -->|Yes| APPEND_BUF
    FLUSH_BUF --> CREATE_BUF
    CREATE_BUF --> RETURN_PLACE
    APPEND_BUF --> CHECK_FULL
    CHECK_FULL -->|Yes| FLUSH_BUF
    CHECK_FULL -->|No| RETURN_PLACE

Insert Canonicalization

The prepare_insert method analyzes the INSERT statement and converts it into one of two canonical forms:

PreparedInsert::Values

Literal VALUES clauses (including constant SELECT forms like SELECT 1, 2) are rewritten into Vec<Vec<PlanValue>> rows for buffering:

Conversion logic:

flowchart LR
    SQL["INSERT INTO users (id, name)\nVALUES (1, 'Alice')"]
    AST["sqlparser::ast::Insert"]
    CHECK{"Source type?"}
    VALUES["SetExpr::Values"]
    SELECT["SetExpr::Select"]
    CONST{"Constant SELECT?"}
    CONVERT["Convert to Vec~Vec~PlanValue~~"]
    PREP_VAL["PreparedInsert::Values"]
    PREP_IMM["PreparedInsert::Immediate"]
    SQL --> AST
    AST --> CHECK
    CHECK -->|VALUES| VALUES
    CHECK -->|SELECT| SELECT
    VALUES --> CONVERT
    SELECT --> CONST
    CONST -->|Yes| CONVERT
    CONST -->|No| PREP_IMM
    CONVERT --> PREP_VAL

The method iterates over each row in the VALUES clause, converting SQL expressions to PlanValue variants:

| SQL Type | PlanValue Variant |
|---|---|
| Integer literals | PlanValue::Integer(i64) |
| Float literals | PlanValue::Float(f64) |
| String literals | PlanValue::String(String) |
| NULL | PlanValue::Null |
| Date literals | PlanValue::Date32(i32) |
| DECIMAL literals | PlanValue::Decimal(DecimalValue) |

Sources: llkv-sql/src/sql_engine.rs:1504-1524
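The row-by-row conversion can be sketched with a simplified literal type in place of the sqlparser AST. Literal, literal_to_plan_value, and convert_rows are hypothetical names; only four of the PlanValue variants from the table are modeled, and dates and decimals are left out.

```rust
/// Hypothetical stand-in for a parsed SQL literal expression.
#[derive(Debug, Clone, PartialEq)]
pub enum Literal {
    /// Numeric text exactly as written in the SQL source.
    Number(String),
    Text(String),
    Null,
}

/// Subset of the PlanValue variants listed in the table above.
#[derive(Debug, Clone, PartialEq)]
pub enum PlanValue {
    Null,
    Integer(i64),
    Float(f64),
    String(String),
}

/// Converts one literal, trying i64 before falling back to f64.
pub fn literal_to_plan_value(lit: &Literal) -> Result<PlanValue, String> {
    match lit {
        Literal::Null => Ok(PlanValue::Null),
        Literal::Text(s) => Ok(PlanValue::String(s.clone())),
        Literal::Number(text) => {
            if let Ok(i) = text.parse::<i64>() {
                Ok(PlanValue::Integer(i))
            } else if let Ok(f) = text.parse::<f64>() {
                Ok(PlanValue::Float(f))
            } else {
                Err(format!("invalid numeric literal: {}", text))
            }
        }
    }
}

/// Converts every row of a VALUES clause, stopping at the first error.
pub fn convert_rows(rows: &[Vec<Literal>]) -> Result<Vec<Vec<PlanValue>>, String> {
    rows.iter()
        .map(|row| {
            row.iter()
                .map(literal_to_plan_value)
                .collect::<Result<Vec<PlanValue>, String>>()
        })
        .collect()
}
```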

PreparedInsert::Immediate

Non-literal sources (subqueries, complex expressions) are wrapped in a fully-planned InsertPlan for immediate execution:

flowchart LR
    COMPLEX["INSERT INTO t\nSELECT * FROM source"]
    PLAN["build_select_plan()"]
    WRAP["InsertPlan { source: InsertSource::Select }"]
    PREP["PreparedInsert::Immediate"]
    COMPLEX --> PLAN
    PLAN --> WRAP
    WRAP --> PREP

Sources: llkv-sql/src/sql_engine.rs:1543-1552
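The two canonical forms can be illustrated with a toy classifier. SourceClause is a hypothetical simplification of the parsed source, rows hold plain i64 values, and the Immediate variant carries raw SQL text instead of a planned InsertPlan, so this shows only the shape of the split.

```rust
/// Hypothetical, simplified view of an INSERT's source clause.
#[derive(Debug, Clone, PartialEq)]
pub enum SourceClause {
    /// VALUES (1, 2), (3, 4)
    Values(Vec<Vec<i64>>),
    /// A constant SELECT such as SELECT 1, 2: one literal row.
    ConstantSelect(Vec<i64>),
    /// Anything that needs planning, kept here as raw SQL text.
    Query(String),
}

/// Two canonical forms, mirroring the variant names used in this document.
#[derive(Debug, Clone, PartialEq)]
pub enum PreparedInsert {
    Values { rows: Vec<Vec<i64>> },
    Immediate { query: String },
}

/// Literal sources become bufferable rows; everything else runs immediately.
pub fn prepare_insert(source: SourceClause) -> PreparedInsert {
    match source {
        SourceClause::Values(rows) => PreparedInsert::Values { rows },
        SourceClause::ConstantSelect(row) => PreparedInsert::Values { rows: vec![row] },
        SourceClause::Query(query) => PreparedInsert::Immediate { query },
    }
}
```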


Buffer Compatibility Checking

The can_accept method ensures that only compatible INSERTs are batched together:

flowchart TD
    START["can_accept(table, columns, on_conflict)"]
    CHECK_TABLE{"table_name == table?"}
    CHECK_COLS{"columns == columns?"}
    CHECK_CONFLICT{"on_conflict == on_conflict?"}
    ACCEPT["Return true"]
    REJECT["Return false"]
    START --> CHECK_TABLE
    CHECK_TABLE -->|No| REJECT
    CHECK_TABLE -->|Yes| CHECK_COLS
    CHECK_COLS -->|No| REJECT
    CHECK_COLS -->|Yes| CHECK_CONFLICT
    CHECK_CONFLICT -->|No| REJECT
    CHECK_CONFLICT -->|Yes| ACCEPT

Compatibility requirements:

| Field | Requirement |
|---|---|
| table_name | Exact string match (case-sensitive) |
| columns | Exact column list match (order matters) |
| on_conflict | Same conflict resolution strategy |

Sources: llkv-sql/src/sql_engine.rs:528-535
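A minimal sketch of the three-way comparison, assuming plain equality on each field. BufferKey and its constructor are hypothetical; in the document the check is a method on InsertBuffer itself, and the conflict-action enum here lists only three of the actions.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InsertConflictAction {
    None,
    Replace,
    Ignore,
}

/// Hypothetical holder for the three fields that decide compatibility.
pub struct BufferKey {
    table_name: String,
    columns: Vec<String>,
    on_conflict: InsertConflictAction,
}

impl BufferKey {
    pub fn new(table: &str, columns: &[&str], on_conflict: InsertConflictAction) -> Self {
        Self {
            table_name: table.to_string(),
            columns: columns.iter().map(|c| c.to_string()).collect(),
            on_conflict,
        }
    }

    /// True only when table, column list (including order), and conflict
    /// action all match exactly.
    pub fn can_accept(
        &self,
        table: &str,
        columns: &[String],
        on_conflict: InsertConflictAction,
    ) -> bool {
        self.table_name == table
            && self.columns.as_slice() == columns
            && self.on_conflict == on_conflict
    }
}
```

Because the comparison is exact, statements that name the same columns in a different order, or spell the table name with different casing, land in separate batches.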


Flush Triggers

The buffer flushes under multiple conditions to maintain correctness and predictable memory usage:

Automatic Flush Conditions

flowchart TD
    subgraph "During execute()"
        STMT{"Statement Type"}
        INSERT["INSERT"]
        TRANS["Transaction Boundary\n(BEGIN/COMMIT/ROLLBACK)"]
        OTHER["Other Statement\n(SELECT, UPDATE, etc.)"]
    end

    subgraph "Buffer State"
        SIZE{"total_rows >=\nMAX_BUFFERED_INSERT_ROWS?"}
        COMPAT{"can_accept()?"}
    end

    subgraph "Explicit Triggers"
        DROP["SqlEngine::drop()"]
        DISABLE["set_insert_buffering(false)"]
        MANUAL["flush_pending_inserts()"]
    end

    FLUSH["flush_buffer_results()"]
    STMT --> INSERT
    STMT --> TRANS
    STMT --> OTHER
    INSERT --> SIZE
    INSERT --> COMPAT
    SIZE -->|Yes| FLUSH
    COMPAT -->|No| FLUSH
    TRANS --> FLUSH
    OTHER --> FLUSH
    DROP --> FLUSH
    DISABLE --> FLUSH
    MANUAL --> FLUSH

Sources: llkv-sql/src/sql_engine.rs:544-546 llkv-sql/src/sql_engine.rs:1098-1111 llkv-sql/src/sql_engine.rs:592-596
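The automatic triggers can be summarized in one predicate. Incoming and triggers_flush are hypothetical names, the threshold uses the 8192-row default quoted elsewhere in this document, and the explicit triggers (drop, disabling buffering, manual flush) are not modeled.

```rust
/// Flush threshold; 8192 is the default value quoted in this document.
pub const MAX_BUFFERED_INSERT_ROWS: usize = 8192;

/// Hypothetical classification of the statement arriving at execute().
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Incoming {
    /// A bufferable INSERT; `compatible` stands for the can_accept result.
    Insert { compatible: bool },
    /// BEGIN, COMMIT, or ROLLBACK.
    TransactionBoundary,
    /// SELECT, UPDATE, and any other statement.
    Other,
}

/// Returns true when the incoming statement causes the buffer to flush.
pub fn triggers_flush(buffered_rows: usize, incoming: Incoming) -> bool {
    match incoming {
        Incoming::Insert { compatible } => {
            !compatible || buffered_rows >= MAX_BUFFERED_INSERT_ROWS
        }
        Incoming::TransactionBoundary | Incoming::Other => true,
    }
}
```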

Manual Flush API

Applications can force a flush by calling the public flush_pending_inserts method.

This is useful when the application needs to ensure all buffered data is persisted before performing a read operation or checkpoint.

Sources: llkv-sql/src/sql_engine.rs:1132-1134


Flush Execution and Result Splitting

When the buffer flushes, all accumulated rows are executed as a single InsertPlan, then the result is split back into per-statement results:

sequenceDiagram
    participant Buffer as InsertBuffer
    participant Engine as SqlEngine
    participant Runtime as RuntimeEngine

    Note over Buffer: Buffer contains 3 statements:\n10, 20, 15 rows

    Engine->>Buffer: Take buffer (45 total rows)
    Buffer-->>Engine: InsertBuffer instance

    Engine->>Engine: Build InsertPlan with all 45 rows
    Engine->>Runtime: execute_statement(InsertPlan)
    Runtime->>Runtime: Persist all rows atomically
    Runtime-->>Engine: RuntimeStatementResult::Insert(45)

    Engine->>Engine: Split into per-statement results
    Note over Engine: [10 rows, 20 rows, 15 rows]

    Engine-->>Engine: Return Vec~SqlStatementResult~

Implementation details:

  1. The buffer is moved out of the RefCell to prevent reentrancy issues
  2. A single InsertPlan is constructed with InsertSource::Rows(all_rows)
  3. The plan executes via RuntimeEngine::execute_statement
  4. The returned row count is validated against total_rows
  5. Per-statement results are reconstructed using statement_row_counts

Sources: llkv-sql/src/sql_engine.rs:1342-1407

Result Reconstruction

The flush method maintains the illusion of per-statement execution by reconstructing an individual RuntimeStatementResult::Insert for each buffered statement, using the counts recorded in statement_row_counts.

This ensures compatibility with test harnesses and applications that inspect per-statement results.

Sources: llkv-sql/src/sql_engine.rs:1389-1406
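The validation and splitting steps can be sketched with plain counts. split_insert_result is a hypothetical helper that returns one row count per statement; the real code builds RuntimeStatementResult::Insert values, and its error type is not shown in this document, so a String is used here.

```rust
/// Validates the batched row count, then returns one count per statement.
pub fn split_insert_result(
    reported_rows: usize,
    statement_row_counts: &[usize],
) -> Result<Vec<usize>, String> {
    let expected: usize = statement_row_counts.iter().sum();
    if reported_rows != expected {
        return Err(format!(
            "batched insert reported {} rows, expected {}",
            reported_rows, expected
        ));
    }
    // Each buffered statement gets back exactly the rows it contributed.
    Ok(statement_row_counts.to_vec())
}
```

For the example used in this section, a batch of 45 rows built from statements of 10, 20, and 15 rows splits back into those three counts, while any other reported total is rejected.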


Statement Expectations Integration

The SQL Logic Test (SLT) harness uses statement expectations to validate error handling and row counts. The buffering system respects these expectations by bypassing the buffer when necessary:

flowchart TD
    EXPECT["register_statement_expectation(expectation)"]
    THREAD_LOCAL["PENDING_STATEMENT_EXPECTATIONS\n(thread-local queue)"]
    EXEC["execute(sql)"]
    NEXT["next_statement_expectation()"]
    CHECK{"Expectation Type"}
    OK["StatementExpectation::Ok\n(buffer eligible)"]
    ERROR["StatementExpectation::Error\n(execute immediately)"]
    COUNT["StatementExpectation::Count\n(execute immediately)"]
    EXPECT --> THREAD_LOCAL
    EXEC --> NEXT
    NEXT --> THREAD_LOCAL
    THREAD_LOCAL --> CHECK
    CHECK --> OK
    CHECK --> ERROR
    CHECK --> COUNT

Expectation types:

| Expectation | Behavior | Reason |
|---|---|---|
| Ok | Allow buffering | No specific validation needed |
| Error | Force immediate execution | Must observe synchronous error |
| Count(n) | Force immediate execution | Must validate exact row count |

Sources: llkv-sql/src/sql_engine.rs:375-391 llkv-sql/src/sql_engine.rs:1240-1250
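A thread-local expectation queue of this kind might look as follows. The function and static names come from the diagram in this section, but the signatures, the payload type of Count, and the fallback to Ok when the queue is empty are assumptions; must_execute_immediately is a hypothetical helper.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StatementExpectation {
    Ok,
    Error,
    Count(u64), // payload type assumed
}

thread_local! {
    // One FIFO queue per thread, so concurrent test threads do not interfere.
    static PENDING_STATEMENT_EXPECTATIONS: RefCell<VecDeque<StatementExpectation>> =
        RefCell::new(VecDeque::new());
}

/// Queues the expectation for the next statement executed on this thread.
pub fn register_statement_expectation(expectation: StatementExpectation) {
    PENDING_STATEMENT_EXPECTATIONS.with(|queue| queue.borrow_mut().push_back(expectation));
}

/// Pops the oldest expectation; an empty queue yields Ok (assumed default).
pub fn next_statement_expectation() -> StatementExpectation {
    PENDING_STATEMENT_EXPECTATIONS
        .with(|queue| queue.borrow_mut().pop_front())
        .unwrap_or(StatementExpectation::Ok)
}

/// Error and Count expectations bypass the buffer.
pub fn must_execute_immediately(expectation: StatementExpectation) -> bool {
    matches!(
        expectation,
        StatementExpectation::Error | StatementExpectation::Count(_)
    )
}
```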


Performance Characteristics

For bulk ingestion workloads, the buffering system improves throughput by amortizing per-statement overhead across each batch:

Planning Overhead Reduction

Without buffering, each INSERT statement incurs:

  1. SQL parsing (via sqlparser)
  2. Plan construction (InsertPlan allocation)
  3. Schema validation (column name resolution)
  4. Runtime plan dispatch

With buffering, these costs are amortized across all buffered statements, executing once per flush rather than once per statement.
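As a back-of-the-envelope illustration of the amortization, the number of plan executions can be estimated with a ceiling division. This assumes every statement inserts a single row into the same table and that only the row threshold triggers flushes; flush_count is a hypothetical helper.

```rust
/// Number of flushes needed to ingest `total_rows` when the buffer flushes
/// every `max_buffered_rows` rows (ceiling division).
pub fn flush_count(total_rows: usize, max_buffered_rows: usize) -> usize {
    assert!(max_buffered_rows > 0, "threshold must be positive");
    (total_rows + max_buffered_rows - 1) / max_buffered_rows
}
```

Under those assumptions, 100,000 single-row INSERT statements with the 8192-row threshold result in 13 batched executions (12 full batches plus one partial batch) rather than 100,000 individual ones.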

Batch Size Impact

The default threshold of 8192 rows balances several factors:

| Factor | Impact of Larger Batches | Impact of Smaller Batches |
|---|---|---|
| Memory usage | Increases linearly | Reduces proportionally |
| Planning amortization | Better (fewer flushes) | Worse (more flushes) |
| Latency to visibility | Higher (longer buffering) | Lower (frequent flushes) |
| Write throughput | Generally higher | Generally lower |

Sources: llkv-sql/src/sql_engine.rs:490


Integration with Transaction Boundaries

The buffering system automatically flushes at transaction boundaries to maintain ACID semantics:

sequenceDiagram
    participant App
    participant SqlEngine
    participant Buffer as InsertBuffer
    participant Runtime

    App->>SqlEngine: execute("BEGIN TRANSACTION")
    SqlEngine->>Buffer: flush_buffer_results()
    Buffer-->>SqlEngine: (empty, no pending data)
    SqlEngine->>Runtime: begin_transaction()

    App->>SqlEngine: execute("INSERT INTO t VALUES (1)")
    SqlEngine->>Buffer: Buffer statement

    App->>SqlEngine: execute("COMMIT")
    SqlEngine->>Buffer: flush_buffer_results()
    Buffer->>Runtime: execute_statement(InsertPlan)
    Runtime-->>Buffer: Success
    SqlEngine->>Runtime: commit_transaction()

This ensures that all buffered writes are persisted before the transaction commits, preventing data loss or inconsistency.

Sources: llkv-sql/src/sql_engine.rs:1094-1102


Conflict Resolution Compatibility

The buffer preserves conflict resolution semantics by tracking the on_conflict action:

| Action | SQLite Syntax | Buffering Behavior |
|---|---|---|
| None | (default) | Buffers with same action |
| Replace | REPLACE INTO / INSERT OR REPLACE | Buffers separately from other actions |
| Ignore | INSERT OR IGNORE | Buffers separately from other actions |
| Abort | INSERT OR ABORT | Buffers separately from other actions |
| Fail | INSERT OR FAIL | Buffers separately from other actions |
| Rollback | INSERT OR ROLLBACK | Buffers separately from other actions |

The buffer’s can_accept method ensures that only statements with identical conflict actions are batched together, preserving the semantic behavior of each INSERT.

Sources: llkv-sql/src/sql_engine.rs:1441-1455 llkv-sql/src/sql_engine.rs:528-535


Drop Safety

SqlEngine implements a Drop handler that flushes the buffer, so buffered data is persisted when the engine goes out of scope.

This prevents data loss when applications terminate without explicitly flushing or disabling buffering.

Sources: llkv-sql/src/sql_engine.rs:590-597
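The flush-on-drop pattern can be sketched with a toy engine that writes to shared in-memory storage. MiniEngine, buffer_row, and the Rc-based storage are hypothetical; only the idea of calling the flush from Drop is taken from this section. Because Rust's drop cannot return a value, a real handler has to deal with flush errors internally, and how SqlEngine does so is not covered here.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical engine that buffers rows before writing to shared storage.
pub struct MiniEngine {
    pending: RefCell<Vec<i64>>,
    storage: Rc<RefCell<Vec<i64>>>,
}

impl MiniEngine {
    pub fn new(storage: Rc<RefCell<Vec<i64>>>) -> Self {
        Self {
            pending: RefCell::new(Vec::new()),
            storage,
        }
    }

    /// Holds the row in memory until the next flush.
    pub fn buffer_row(&self, value: i64) {
        self.pending.borrow_mut().push(value);
    }

    /// Moves every pending row into storage; a no-op when nothing is pending.
    pub fn flush_pending_inserts(&self) {
        let rows = std::mem::take(&mut *self.pending.borrow_mut());
        self.storage.borrow_mut().extend(rows);
    }
}

impl Drop for MiniEngine {
    fn drop(&mut self) {
        // Last chance to persist rows that were never flushed explicitly.
        self.flush_pending_inserts();
    }
}
```

The same flush_pending_inserts call doubles as the manual flush: an application can invoke it before a read or checkpoint instead of waiting for the engine to be dropped.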


Key Code Entities

| Entity | Location | Role |
|---|---|---|
| InsertBuffer | llkv-sql/src/sql_engine.rs:497-547 | Accumulates row data and tracks per-statement counts |
| SqlEngine::insert_buffer | llkv-sql/src/sql_engine.rs:576 | Holds current buffer instance |
| SqlEngine::insert_buffering_enabled | llkv-sql/src/sql_engine.rs:583 | Global enable/disable flag |
| MAX_BUFFERED_INSERT_ROWS | llkv-sql/src/sql_engine.rs:490 | Flush threshold constant |
| PreparedInsert | llkv-sql/src/sql_engine.rs:555-563 | Canonical INSERT representation |
| buffer_insert | llkv-sql/src/sql_engine.rs:1231-1338 | Main buffering decision logic |
| prepare_insert | llkv-sql/src/sql_engine.rs:1423-1557 | INSERT canonicalization |
| flush_buffer_results | llkv-sql/src/sql_engine.rs:1342-1407 | Execution and result splitting |
| BufferedInsertResult | llkv-sql/src/sql_engine.rs:567-570 | Return type for buffering operations |

Sources: llkv-sql/src/sql_engine.rs:490-1557
