This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
INSERT Buffering System
The INSERT Buffering System is a performance optimization that batches multiple consecutive INSERT ... VALUES statements into a single execution operation. It reduces planning and execution overhead by accumulating literal row data across statement boundaries and flushing it as one batch. This improves throughput for bulk-ingestion workloads while preserving per-statement result semantics.
For information about SQL statement execution in general, see SQL Interface. For details on how INSERT statements are planned and translated, see Query Planning.
Purpose and Scope
The buffering system operates at the SQL engine layer, intercepting INSERT statements before they reach the runtime executor. It analyzes each INSERT to determine if it contains literal values that can be safely accumulated, or if it requires immediate execution due to complex source expressions (e.g., INSERT ... SELECT). The system maintains buffering state across multiple execute() calls, making it suitable for workloads that stream SQL scripts containing thousands of INSERT statements.
In scope:
- Buffering of INSERT ... VALUES statements with literal row data
- Automatic flushing based on a row-count threshold and statement boundaries
- Per-statement result tracking for compatibility with test harnesses
- Conflict resolution action compatibility (REPLACE, IGNORE, etc.)
Out of scope:
- Buffering of INSERT statements with subqueries or SELECT sources
- Cross-table buffering (only same-table INSERTs are batched)
- Transaction-aware buffering strategies
System Architecture
Sources: llkv-sql/src/sql_engine.rs:1057-1114 llkv-sql/src/sql_engine.rs:1231-1338 llkv-sql/src/sql_engine.rs:1423-1557
Core Data Structures
classDiagram
class InsertBuffer {-String table_name\n-Vec~String~ columns\n-InsertConflictAction on_conflict\n-usize total_rows\n-Vec~usize~ statement_row_counts\n-Vec~Vec~PlanValue~~ rows\n+new() InsertBuffer\n+can_accept() bool\n+push_statement()\n+should_flush() bool}
class PreparedInsert {<<enumeration>>\nValues\nImmediate}
class SqlEngine {-RefCell~Option~InsertBuffer~~ insert_buffer\n-AtomicBool insert_buffering_enabled\n+buffer_insert() BufferedInsertResult\n+flush_buffer_results() Vec~SqlStatementResult~\n+set_insert_buffering()}
SqlEngine --> InsertBuffer : contains
SqlEngine --> PreparedInsert : produces
InsertBuffer
The InsertBuffer struct accumulates literal row data across multiple INSERT statements while tracking per-statement row counts for result reporting.
Field descriptions:
| Field | Type | Purpose |
|---|---|---|
| table_name | String | Target table identifier for compatibility checking |
| columns | Vec<String> | Column list (must match across buffered statements) |
| on_conflict | InsertConflictAction | Conflict resolution strategy (REPLACE, IGNORE, etc.) |
| total_rows | usize | Sum of all buffered rows across statements |
| statement_row_counts | Vec<usize> | Per-statement row counts for result splitting |
| rows | Vec<Vec<PlanValue>> | Flattened literal row data |
Sources: llkv-sql/src/sql_engine.rs:497-547
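Below is a minimal sketch of the buffer's bookkeeping. It uses simplified stand-ins for PlanValue and InsertConflictAction that model only a few variants; the real llkv types have more. The field names follow the table above, but the method bodies are illustrative and are not the crate's implementation.

```rust
/// Simplified stand-in for llkv's PlanValue (hypothetical subset of variants).
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
pub enum PlanValue {
    Null,
    Integer(i64),
    Float(f64),
    String(String),
}

/// Simplified stand-in for llkv's InsertConflictAction (hypothetical subset).
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InsertConflictAction {
    None,
    Replace,
    Ignore,
}

/// Accumulates literal rows from consecutive, compatible INSERT statements.
#[allow(dead_code)]
#[derive(Debug)]
pub struct InsertBuffer {
    table_name: String,
    columns: Vec<String>,
    on_conflict: InsertConflictAction,
    total_rows: usize,
    statement_row_counts: Vec<usize>,
    rows: Vec<Vec<PlanValue>>,
}

impl InsertBuffer {
    /// Starts a buffer from the first statement's target and rows.
    pub fn new(
        table_name: String,
        columns: Vec<String>,
        on_conflict: InsertConflictAction,
        rows: Vec<Vec<PlanValue>>,
    ) -> Self {
        let mut buffer = InsertBuffer {
            table_name,
            columns,
            on_conflict,
            total_rows: 0,
            statement_row_counts: Vec::new(),
            rows: Vec::new(),
        };
        buffer.push_statement(rows);
        buffer
    }

    /// Appends one statement's rows and records how many it contributed.
    pub fn push_statement(&mut self, rows: Vec<Vec<PlanValue>>) {
        self.total_rows += rows.len();
        self.statement_row_counts.push(rows.len());
        self.rows.extend(rows);
    }

    pub fn total_rows(&self) -> usize {
        self.total_rows
    }

    pub fn statement_row_counts(&self) -> &[usize] {
        &self.statement_row_counts
    }
}
```

The buffer records statement_row_counts at push time. That record is what later lets a single batched execution be reported back as one result per original statement.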
Configuration and Thresholds
MAX_BUFFERED_INSERT_ROWS
The buffering system enforces a maximum row threshold, MAX_BUFFERED_INSERT_ROWS (8192 rows), to prevent unbounded memory growth during bulk ingestion.
When total_rows reaches this threshold, the buffer flushes automatically and subsequent statements start a new buffer. This value balances memory usage against amortized planning overhead.
Sources: llkv-sql/src/sql_engine.rs:490
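This sketch assumes the constant is a plain usize set to the 8192-row default cited under Batch Size Impact. Under that assumption, the flush check is a comparison against the running total. The helper first_flush_index is hypothetical and exists only to show when the check fires. The real should_flush may be written differently.

```rust
/// Flush threshold; 8192 is the default value this page cites.
pub const MAX_BUFFERED_INSERT_ROWS: usize = 8192;

/// Returns true once the buffered total has reached the threshold.
pub fn should_flush(total_rows: usize) -> bool {
    total_rows >= MAX_BUFFERED_INSERT_ROWS
}

/// Hypothetical helper: feeds per-statement row counts through the check
/// and reports after which statement (0-based) the first flush fires.
pub fn first_flush_index(statement_rows: &[usize]) -> Option<usize> {
    let mut total = 0;
    for (i, rows) in statement_rows.iter().enumerate() {
        total += rows; // the statement is appended first...
        if should_flush(total) {
            return Some(i); // ...then the threshold is checked
        }
    }
    None
}
```

In this model the check runs after a statement is appended. A single large statement can therefore push the buffer past 8192 rows, so the threshold acts as a flush trigger rather than a hard cap.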
Enabling and Disabling Buffering
Buffering is disabled by default to preserve immediate execution semantics for unit tests and applications that depend on synchronous error reporting. Long-running workloads can opt in by calling set_insert_buffering(true). Calling set_insert_buffering(false) flushes any pending rows.
Sources: llkv-sql/src/sql_engine.rs:1022-1029 llkv-sql/src/sql_engine.rs:583
sequenceDiagram
participant App
participant SqlEngine
participant InsertBuffer
App->>SqlEngine: set_insert_buffering(true)
SqlEngine->>SqlEngine: Store enabled flag
Note over App,SqlEngine: Buffering now active
App->>SqlEngine: execute("INSERT INTO t VALUES (1)")
SqlEngine->>InsertBuffer: Create or append
App->>SqlEngine: execute("INSERT INTO t VALUES (2)")
SqlEngine->>InsertBuffer: Append to existing
App->>SqlEngine: set_insert_buffering(false)
SqlEngine->>InsertBuffer: flush_buffer_results()
InsertBuffer->>SqlEngine: Return per-statement results
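The opt-in flag can be modeled with an AtomicBool, as the class diagram suggests, and disabling the flag drains whatever is pending, as in the sequence above. The EngineModel type, its pending list of row counts, and the return values are hypothetical simplifications. The real set_insert_buffering may have a different signature.

```rust
use std::cell::RefCell;
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical, pared-down engine holding only buffering state.
pub struct EngineModel {
    insert_buffering_enabled: AtomicBool,
    /// Row counts of buffered statements awaiting a flush.
    pending: RefCell<Vec<usize>>,
}

impl EngineModel {
    pub fn new() -> Self {
        // Buffering starts disabled, matching the documented default.
        EngineModel {
            insert_buffering_enabled: AtomicBool::new(false),
            pending: RefCell::new(Vec::new()),
        }
    }

    /// Toggles buffering; disabling it drains and returns what was pending.
    pub fn set_insert_buffering(&self, enabled: bool) -> Vec<usize> {
        self.insert_buffering_enabled.store(enabled, Ordering::SeqCst);
        if enabled {
            Vec::new()
        } else {
            self.pending.take() // flush: move pending statements out
        }
    }

    /// Returns Some(rows) when executed immediately, None when buffered.
    pub fn insert(&self, rows: usize) -> Option<usize> {
        if self.insert_buffering_enabled.load(Ordering::SeqCst) {
            self.pending.borrow_mut().push(rows);
            None // placeholder; the real result is produced at flush time
        } else {
            Some(rows)
        }
    }
}
```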
Buffering Decision Flow
The buffer_insert method implements the core buffering logic, determining whether an INSERT should be accumulated or executed immediately:
Immediate Execution Conditions
Buffering is bypassed under the following conditions:
- Statement expectation is Error or Count: test harnesses use expectations to validate synchronous error reporting and exact row counts.
- Buffering disabled: the insert_buffering_enabled flag is false (the default state).
- Non-literal source: the INSERT uses a SELECT subquery or other dynamic source.
An INSERT that targets a different table, column list, or conflict action than the current buffer does not bypass buffering. Instead, the existing buffer is flushed and a new buffer is started with the incoming statement.
Sources: llkv-sql/src/sql_engine.rs:1231-1338
flowchart TD
START["buffer_insert(insert, expectation)"]
CHECK_EXPECT{"expectation ==\nError |Count?"}
CHECK_ENABLED{"buffering_enabled?"}
PREPARE["prepare_insert insert"]
CHECK_TYPE{"PreparedInsert type?"}
CHECK_COMPAT{"buffer.can_accept ?"}
CHECK_FULL{"buffer.should_flush ?"}
EXEC_IMM["Execute immediately Return flushed + current"]
CREATE_BUF["Create new InsertBuffer"]
APPEND_BUF["buffer.push_statement"]
FLUSH_BUF["flush_buffer_results"]
RETURN_PLACE["Return placeholder result"]
START --> CHECK_EXPECT
CHECK_EXPECT -->|Yes|EXEC_IMM
CHECK_EXPECT -->|No|CHECK_ENABLED
CHECK_ENABLED -->|No|EXEC_IMM
CHECK_ENABLED -->|Yes|PREPARE
PREPARE --> CHECK_TYPE
CHECK_TYPE -->|Immediate|EXEC_IMM
CHECK_TYPE -->|Values|CHECK_COMPAT
CHECK_COMPAT -->|No|FLUSH_BUF
CHECK_COMPAT -->|Yes|APPEND_BUF
FLUSH_BUF --> CREATE_BUF
CREATE_BUF --> RETURN_PLACE
APPEND_BUF --> CHECK_FULL
CHECK_FULL -->|Yes|FLUSH_BUF
CHECK_FULL -->|No| RETURN_PLACE
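The decision order in the flowchart can be condensed into a pure function. The enums and the boolean inputs here are hypothetical simplifications. The inputs stand in for the real expectation lookup and the results of prepare_insert and can_accept. The post-append threshold check is handled separately, as should_flush.

```rust
/// Mirrors the Ok / Error / Count expectation variants described on this page.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StatementExpectation {
    Ok,
    Error,
    Count(u64),
}

/// Hypothetical outcome of buffer_insert for one INSERT.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Action {
    /// Flush anything pending, then run this statement on its own.
    ExecuteImmediately,
    /// Append to the current buffer, creating it if none exists.
    Append,
    /// Flush the incompatible buffer, then start a new one with this INSERT.
    FlushAndRestart,
}

pub fn decide(
    expectation: StatementExpectation,
    buffering_enabled: bool,
    is_literal_values: bool,
    buffer_exists: bool,
    buffer_compatible: bool,
) -> Action {
    // Harness expectations need synchronous errors and exact row counts.
    if matches!(
        expectation,
        StatementExpectation::Error | StatementExpectation::Count(_)
    ) {
        return Action::ExecuteImmediately;
    }
    // Disabled buffering or a non-literal source (e.g. INSERT ... SELECT).
    if !buffering_enabled || !is_literal_values {
        return Action::ExecuteImmediately;
    }
    if buffer_exists && !buffer_compatible {
        Action::FlushAndRestart
    } else {
        Action::Append
    }
}
```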
Insert Canonicalization
The prepare_insert method analyzes the INSERT statement and converts it into one of two canonical forms:
PreparedInsert::Values
Literal VALUES clauses (including constant SELECT forms like SELECT 1, 2) are rewritten into Vec<Vec<PlanValue>> rows for buffering:
Conversion logic:
flowchart LR
SQL["INSERT INTO users (id, name)\nVALUES (1, 'Alice')"]
AST["sqlparser::ast::Insert"]
CHECK{"Source type?"}
VALUES["SetExpr::Values"]
SELECT["SetExpr::Select"]
CONST{"Constant SELECT?"}
CONVERT["Convert to Vec~Vec~PlanValue~~"]
PREP_VAL["PreparedInsert::Values"]
PREP_IMM["PreparedInsert::Immediate"]
SQL --> AST
AST --> CHECK
CHECK -->|VALUES| VALUES
CHECK -->|SELECT| SELECT
VALUES --> CONVERT
SELECT --> CONST
CONST -->|Yes| CONVERT
CONST -->|No| PREP_IMM
CONVERT --> PREP_VAL
The method iterates over each row in the VALUES clause, converting SQL expressions to PlanValue variants:
| SQL Type | PlanValue Variant |
|---|---|
| Integer literals | PlanValue::Integer(i64) |
| Float literals | PlanValue::Float(f64) |
| String literals | PlanValue::String(String) |
| NULL | PlanValue::Null |
| Date literals | PlanValue::Date32(i32) |
| DECIMAL literals | PlanValue::Decimal(DecimalValue) |
Sources: llkv-sql/src/sql_engine.rs:1504-1524
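The per-row conversion is essentially a match over literal expressions. In this sketch, the Literal enum is a hypothetical stand-in for the sqlparser expression types the real prepare_insert inspects. Only the integer, float, string, and NULL rows of the table are modeled; date and decimal literals are omitted. Any non-literal expression makes the whole statement fall back to immediate execution.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum PlanValue {
    Null,
    Integer(i64),
    Float(f64),
    String(String),
}

/// Hypothetical, simplified view of a VALUES-clause expression.
#[derive(Debug, Clone)]
pub enum Literal {
    Number(String),
    SingleQuoted(String),
    Null,
    /// Anything else: column references, function calls, subqueries...
    Other,
}

#[derive(Debug, PartialEq)]
pub enum PreparedInsert {
    Values(Vec<Vec<PlanValue>>),
    /// The real variant carries a fully planned InsertPlan; omitted here.
    Immediate,
}

fn convert(expr: &Literal) -> Option<PlanValue> {
    match expr {
        Literal::Null => Some(PlanValue::Null),
        Literal::SingleQuoted(s) => Some(PlanValue::String(s.clone())),
        // Try an integer first, then fall back to a float.
        Literal::Number(n) => n
            .parse::<i64>()
            .map(PlanValue::Integer)
            .or_else(|_| n.parse::<f64>().map(PlanValue::Float))
            .ok(),
        Literal::Other => None,
    }
}

/// Converts every row, or gives up on the first non-literal expression.
pub fn prepare_values(rows: &[Vec<Literal>]) -> PreparedInsert {
    let converted: Option<Vec<Vec<PlanValue>>> = rows
        .iter()
        .map(|row| row.iter().map(convert).collect::<Option<Vec<PlanValue>>>())
        .collect();
    match converted {
        Some(rows) => PreparedInsert::Values(rows),
        None => PreparedInsert::Immediate,
    }
}
```

Collecting into Option makes the fallback all-or-nothing. If a single expression cannot be converted, none of the statement is buffered.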
flowchart LR
COMPLEX["INSERT INTO t\nSELECT * FROM source"]
PLAN["build_select_plan()"]
WRAP["InsertPlan { source: InsertSource::Select }"]
PREP["PreparedInsert::Immediate"]
COMPLEX --> PLAN
PLAN --> WRAP
WRAP --> PREP
PreparedInsert::Immediate
Non-literal sources (subqueries, complex expressions) are wrapped in a fully planned InsertPlan for immediate execution.
Sources: llkv-sql/src/sql_engine.rs:1543-1552
flowchart TD
START["can_accept(table, columns, on_conflict)"]
CHECK_TABLE{"table_name == table?"}
CHECK_COLS{"columns == columns?"}
CHECK_CONFLICT{"on_conflict == on_conflict?"}
ACCEPT["Return true"]
REJECT["Return false"]
START --> CHECK_TABLE
CHECK_TABLE -->|No| REJECT
CHECK_TABLE -->|Yes| CHECK_COLS
CHECK_COLS -->|No| REJECT
CHECK_COLS -->|Yes| CHECK_CONFLICT
CHECK_CONFLICT -->|No| REJECT
CHECK_CONFLICT -->|Yes| ACCEPT
Buffer Compatibility Checking
The can_accept method ensures that only compatible INSERTs are batched together:
Compatibility requirements:
| Field | Requirement |
|---|---|
| table_name | Exact string match (case-sensitive) |
| columns | Exact column list match (order matters) |
| on_conflict | Same conflict resolution strategy |
Sources: llkv-sql/src/sql_engine.rs:528-535
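The compatibility check amounts to a field-by-field equality test. This sketch assumes the buffer stores the table name and column list as plain strings, as the field table suggests. BufferKey is a hypothetical name for just the identifying fields. Because every comparison is exact, `users` and `USERS`, or `(id, name)` and `(name, id)`, end up in different batches.

```rust
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InsertConflictAction {
    None,
    Replace,
    Ignore,
    Abort,
    Fail,
    Rollback,
}

/// Hypothetical: the identifying fields a new INSERT must match.
pub struct BufferKey {
    pub table_name: String,
    pub columns: Vec<String>,
    pub on_conflict: InsertConflictAction,
}

impl BufferKey {
    /// True only if table, column list (in order), and conflict action all match.
    pub fn can_accept(
        &self,
        table: &str,
        columns: &[String],
        on_conflict: InsertConflictAction,
    ) -> bool {
        self.table_name == table
            && self.columns.as_slice() == columns
            && self.on_conflict == on_conflict
    }
}
```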
flowchart TD
subgraph "During execute()"
STMT{"Statement Type"}
INSERT["INSERT"]
TRANS["Transaction Boundary\n(BEGIN/COMMIT/ROLLBACK)"]
OTHER["Other Statement\n(SELECT, UPDATE, etc.)"]
end
subgraph "Buffer State"
SIZE{"total_rows >=\nMAX_BUFFERED_INSERT_ROWS?"}
COMPAT{"can_accept()?"}
end
subgraph "Explicit Triggers"
DROP["SqlEngine::drop()"]
DISABLE["set_insert_buffering(false)"]
MANUAL["flush_pending_inserts()"]
end
FLUSH["flush_buffer_results()"]
STMT --> INSERT
STMT --> TRANS
STMT --> OTHER
INSERT --> SIZE
INSERT --> COMPAT
SIZE -->|Yes| FLUSH
COMPAT -->|No| FLUSH
TRANS --> FLUSH
OTHER --> FLUSH
DROP --> FLUSH
DISABLE --> FLUSH
MANUAL --> FLUSH
Flush Triggers
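The buffer flushes under several conditions to preserve correctness and keep memory usage predictable.
Automatic Flush Conditions
- The buffered total reaches MAX_BUFFERED_INSERT_ROWS.
- An incoming INSERT fails can_accept (different table, column list, or conflict action).
- A transaction boundary (BEGIN, COMMIT, ROLLBACK) is executed.
- Any other statement type (SELECT, UPDATE, etc.) is executed.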
Sources: llkv-sql/src/sql_engine.rs:544-546 llkv-sql/src/sql_engine.rs:1098-1111 llkv-sql/src/sql_engine.rs:592-596
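The automatic triggers in the diagram above can be summarized as a classification over statement kinds. The StatementKind enum and triggers_flush are hypothetical; they restate the diagram and do not reproduce the engine's code. The explicit triggers (drop, set_insert_buffering(false), flush_pending_inserts()) sit outside this function.

```rust
pub const MAX_BUFFERED_INSERT_ROWS: usize = 8192;

/// Hypothetical classification of the statement arriving at execute().
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StatementKind {
    Insert { compatible: bool, total_after_append: usize },
    Begin,
    Commit,
    Rollback,
    /// SELECT, UPDATE, DDL, and every other non-INSERT statement.
    Other,
}

/// Whether pending buffered rows must be flushed for this statement.
pub fn triggers_flush(kind: StatementKind) -> bool {
    match kind {
        StatementKind::Insert { compatible, total_after_append } => {
            !compatible || total_after_append >= MAX_BUFFERED_INSERT_ROWS
        }
        // Transaction boundaries and non-INSERT statements must observe
        // every earlier write, so the buffer is drained first.
        StatementKind::Begin
        | StatementKind::Commit
        | StatementKind::Rollback
        | StatementKind::Other => true,
    }
}
```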
Manual Flush API
Applications can force a flush by calling the public flush_pending_inserts method.
This is useful when the application needs to ensure all buffered data is persisted before performing a read operation or checkpoint.
Sources: llkv-sql/src/sql_engine.rs:1132-1134
sequenceDiagram
participant Buffer as InsertBuffer
participant Engine as SqlEngine
participant Runtime as RuntimeEngine
Note over Buffer: Buffer contains 3 statements:\n10, 20, 15 rows
Engine->>Buffer: Take buffer (45 total rows)
Buffer-->>Engine: InsertBuffer instance
Engine->>Engine: Build InsertPlan with all 45 rows
Engine->>Runtime: execute_statement(InsertPlan)
Runtime->>Runtime: Persist all rows atomically
Runtime-->>Engine: RuntimeStatementResult::Insert(45)
Engine->>Engine: Split into per-statement results
Note over Engine: [10 rows, 20 rows, 15 rows]
Engine-->>Engine: Return Vec~SqlStatementResult~
Flush Execution and Result Splitting
When the buffer flushes, all accumulated rows are executed as a single InsertPlan, then the result is split back into per-statement results:
Implementation details:
- The buffer is moved out of the RefCell to prevent reentrancy issues.
- A single InsertPlan is constructed with InsertSource::Rows(all_rows).
- The plan executes via RuntimeEngine::execute_statement.
- The returned row count is validated against total_rows.
- Per-statement results are reconstructed using statement_row_counts.
Sources: llkv-sql/src/sql_engine.rs:1342-1407
Result Reconstruction
The flush method maintains the illusion of per-statement execution by reconstructing one RuntimeStatementResult::Insert per buffered statement, each carrying that statement's row count.
This ensures compatibility with test harnesses and applications that inspect per-statement results.
Sources: llkv-sql/src/sql_engine.rs:1389-1406
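The splitting step needs only two inputs: the per-statement counts recorded at push time and the total that the runtime reports. In this sketch, InsertResult is a hypothetical stand-in for RuntimeStatementResult::Insert, and the error type is a plain String. The real code's validation and error reporting may differ.

```rust
/// Hypothetical stand-in for RuntimeStatementResult::Insert.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InsertResult {
    pub rows_inserted: usize,
}

/// Splits one batched insert result back into per-statement results.
pub fn split_results(
    statement_row_counts: &[usize],
    expected_total: usize,
    reported_total: usize,
) -> Result<Vec<InsertResult>, String> {
    // The runtime must have inserted exactly the rows that were buffered.
    if reported_total != expected_total {
        return Err(format!(
            "batched insert reported {reported_total} rows, expected {expected_total}"
        ));
    }
    Ok(statement_row_counts
        .iter()
        .map(|&rows_inserted| InsertResult { rows_inserted })
        .collect())
}
```

With the counts from the sequence diagram (10, 20, 15), a reported total of 45 yields three results. Any other total is rejected rather than being split inaccurately.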
flowchart TD
EXPECT["register_statement_expectation(expectation)"]
THREAD_LOCAL["PENDING_STATEMENT_EXPECTATIONS\n(thread-local queue)"]
EXEC["execute(sql)"]
NEXT["next_statement_expectation()"]
CHECK{"Expectation Type"}
OK["StatementExpectation::Ok\n(buffer eligible)"]
ERROR["StatementExpectation::Error\n(execute immediately)"]
COUNT["StatementExpectation::Count\n(execute immediately)"]
EXPECT --> THREAD_LOCAL
EXEC --> NEXT
NEXT --> THREAD_LOCAL
THREAD_LOCAL --> CHECK
CHECK --> OK
CHECK --> ERROR
CHECK --> COUNT
Statement Expectations Integration
The SQL Logic Test (SLT) harness uses statement expectations to validate error handling and row counts. The buffering system respects these expectations by bypassing the buffer when necessary:
Expectation types:
| Expectation | Behavior | Reason |
|---|---|---|
| Ok | Allow buffering | No specific validation needed |
| Error | Force immediate execution | Must observe synchronous error |
| Count(n) | Force immediate execution | Must validate exact row count |
Sources: llkv-sql/src/sql_engine.rs:375-391 llkv-sql/src/sql_engine.rs:1240-1250
Performance Characteristics
The buffering system reduces per-statement overhead for bulk-ingestion workloads.
Planning Overhead Reduction
Without buffering, each INSERT statement incurs:
- SQL parsing (via sqlparser)
- Plan construction (InsertPlan allocation)
- Schema validation (column name resolution)
- Runtime plan dispatch
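With buffering, plan construction and runtime dispatch happen once per flush rather than once per statement. Parsing is not amortized: buffer_insert receives an already-parsed INSERT, so each statement is still parsed individually.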
Batch Size Impact
The default threshold of 8192 rows balances several factors:
| Factor | Impact of Larger Batches | Impact of Smaller Batches |
|---|---|---|
| Memory usage | Increases linearly | Reduces proportionally |
| Planning amortization | Better (fewer flushes) | Worse (more flushes) |
| Latency to visibility | Higher (longer buffering) | Lower (frequent flushes) |
| Write throughput | Generally higher | Generally lower |
Sources: llkv-sql/src/sql_engine.rs:490
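The helper below makes the amortization concrete by counting how many batched executions a stream of INSERTs would produce under the threshold rule. It assumes that every statement is buffer-eligible and compatible, and that one final flush drains the remainder. It is a back-of-the-envelope model, not a benchmark, and batched_executions is a hypothetical name.

```rust
pub const MAX_BUFFERED_INSERT_ROWS: usize = 8192;

/// Counts runtime executions for a stream of INSERTs with the given row
/// counts, assuming all are buffered and a final flush drains the tail.
pub fn batched_executions(statement_rows: &[usize]) -> usize {
    let mut executions = 0;
    let mut buffered = 0;
    for &rows in statement_rows {
        buffered += rows;
        if buffered >= MAX_BUFFERED_INSERT_ROWS {
            executions += 1; // threshold flush
            buffered = 0;
        }
    }
    if buffered > 0 {
        executions += 1; // final flush (e.g. COMMIT, drop, explicit flush)
    }
    executions
}
```

For 100,000 single-row statements, this model gives 13 executions instead of 100,000: 12 full batches of 8,192 rows plus a final flush of 1,696 rows.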
sequenceDiagram
participant App
participant SqlEngine
participant Buffer as InsertBuffer
participant Runtime
App->>SqlEngine: execute("BEGIN TRANSACTION")
SqlEngine->>Buffer: flush_buffer_results()
Buffer-->>SqlEngine: (empty, no pending data)
SqlEngine->>Runtime: begin_transaction()
App->>SqlEngine: execute("INSERT INTO t VALUES (1)")
SqlEngine->>Buffer: Buffer statement
App->>SqlEngine: execute("COMMIT")
SqlEngine->>Buffer: flush_buffer_results()
Buffer->>Runtime: execute_statement(InsertPlan)
Runtime-->>Buffer: Success
SqlEngine->>Runtime: commit_transaction()
Integration with Transaction Boundaries
The buffering system automatically flushes at transaction boundaries to maintain ACID semantics.
This ensures that all buffered writes are persisted before the transaction commits, preventing data loss or inconsistency.
Sources: llkv-sql/src/sql_engine.rs:1094-1102
Conflict Resolution Compatibility
The buffer preserves conflict resolution semantics by tracking the on_conflict action:
| Action | SQLite Syntax | Buffering Behavior |
|---|---|---|
| None | (default) | Buffers with same action |
| Replace | REPLACE INTO / INSERT OR REPLACE | Buffers separately from other actions |
| Ignore | INSERT OR IGNORE | Buffers separately from other actions |
| Abort | INSERT OR ABORT | Buffers separately from other actions |
| Fail | INSERT OR FAIL | Buffers separately from other actions |
| Rollback | INSERT OR ROLLBACK | Buffers separately from other actions |
The buffer’s can_accept method ensures that only statements with identical conflict actions are batched together, preserving the semantic behavior of each INSERT.
Sources: llkv-sql/src/sql_engine.rs:1441-1455 llkv-sql/src/sql_engine.rs:528-535
Drop Safety
The SqlEngine implements a Drop handler to ensure that buffered data is persisted when the engine goes out of scope.
This prevents data loss when applications terminate without explicitly flushing or disabling buffering.
Sources: llkv-sql/src/sql_engine.rs:590-597
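In Rust, this pattern is a Drop impl that drains the buffer before the value is destroyed. The sketch below uses a hypothetical sink closure as a stand-in for the runtime and simple i64 rows as a stand-in for real row data. Drop cannot return a Result, and this page does not say how the real handler treats a failed final flush.

```rust
use std::cell::RefCell;

/// Hypothetical engine that hands buffered rows to `sink` on flush.
pub struct EngineModel<F: FnMut(Vec<i64>)> {
    insert_buffer: RefCell<Option<Vec<i64>>>,
    sink: F,
}

impl<F: FnMut(Vec<i64>)> EngineModel<F> {
    pub fn new(sink: F) -> Self {
        EngineModel { insert_buffer: RefCell::new(None), sink }
    }

    /// Appends a row, creating the buffer on first use.
    pub fn buffer_row(&self, value: i64) {
        self.insert_buffer
            .borrow_mut()
            .get_or_insert_with(Vec::new)
            .push(value);
    }

    pub fn flush_pending_inserts(&mut self) {
        // Move the buffer out of the RefCell first so no borrow is held
        // while the rows are executed.
        let pending = self.insert_buffer.borrow_mut().take();
        if let Some(rows) = pending {
            (self.sink)(rows);
        }
    }
}

impl<F: FnMut(Vec<i64>)> Drop for EngineModel<F> {
    fn drop(&mut self) {
        // Persist anything still buffered when the engine goes out of scope.
        self.flush_pending_inserts();
    }
}
```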
Key Code Entities
| Entity | Location | Role |
|---|---|---|
| InsertBuffer | llkv-sql/src/sql_engine.rs:497-547 | Accumulates row data and tracks per-statement counts |
| SqlEngine::insert_buffer | llkv-sql/src/sql_engine.rs:576 | Holds current buffer instance |
| SqlEngine::insert_buffering_enabled | llkv-sql/src/sql_engine.rs:583 | Global enable/disable flag |
| MAX_BUFFERED_INSERT_ROWS | llkv-sql/src/sql_engine.rs:490 | Flush threshold constant |
| PreparedInsert | llkv-sql/src/sql_engine.rs:555-563 | Canonical INSERT representation |
| buffer_insert | llkv-sql/src/sql_engine.rs:1231-1338 | Main buffering decision logic |
| prepare_insert | llkv-sql/src/sql_engine.rs:1423-1557 | INSERT canonicalization |
| flush_buffer_results | llkv-sql/src/sql_engine.rs:1342-1407 | Execution and result splitting |
| BufferedInsertResult | llkv-sql/src/sql_engine.rs:567-570 | Return type for buffering operations |
Sources: llkv-sql/src/sql_engine.rs:490-1557