Offline storage: batch flush inserts in one transaction#1497
Offline storage: batch flush inserts in one transaction#1497bmehta001 wants to merge 10 commits into
Conversation
StoreRecords() previously inserted each record in its own transaction
(StoreRecord -> DbTransaction -> BEGIN EXCLUSIVE/COMMIT), so a flush of N records
performed N transactions (N WAL fsyncs). Batch the whole vector into a single
transaction, sharing the per-record work via extracted helpers
(isValidRecord / insertRecordUnsafe / checkStorageSizeLimits). StoreRecord()
keeps its own single-record transaction; behavior is unchanged. This implements
the existing in-code TODO ("consider running the batch in transaction").
Measured against the SDK's vendored sqlite (WAL, synchronous=NORMAL,
BEGIN EXCLUSIVE): N=200 ~11x, N=1000 ~40x faster for the insert batch.
MemoryStorage::GetAndReserveRecords: when leaseTimeMs==0 (the flush-read path,
where the record is consumed and popped unconditionally), move the record
straight to the consumer instead of deep-copying it first.
Adds OfflineStorageTests_SQLite.StoreRecordsBatchStoresAllRecordsInOneTransaction.
Full UnitTests suite (524) passes.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
This PR optimizes the offline-storage “flush” path by batching SQLite inserts into a single exclusive transaction and by eliminating an extra deep-copy on the in-memory flush-read path, aiming to reduce fsync overhead and improve throughput without changing functional behavior.
Changes:
- Batch
OfflineStorage_SQLite::StoreRecords()inserts into a singleBEGIN EXCLUSIVE/COMMITcycle via extracted helpers. - Refactor per-record validation/insert/size-limit logic into private helpers (
isValidRecord,insertRecordUnsafe,checkStorageSizeLimits). - Optimize
MemoryStorage::GetAndReserveRecords()to move records directly to the consumer whenleaseTimeMs == 0, avoiding a deep copy.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| tests/unittests/OfflineStorageTests_SQLite.cpp | Adds a new unit test covering batch storage behavior. |
| lib/offline/OfflineStorage_SQLite.hpp | Declares new private helper methods to support batched inserts. |
| lib/offline/OfflineStorage_SQLite.cpp | Implements single-transaction batch insert path and refactors record storage logic. |
| lib/offline/MemoryStorage.cpp | Avoids a deep copy on the no-reservation (flush-read) path by moving records to the consumer. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // The whole batch is committed in a single transaction. | ||
| EXPECT_THAT(offlineStorage->StoreRecords(batch), kCount); |
| if (!m_db) { | ||
| LOG_ERROR("Failed to store %zu events: Database is not open", records.size()); | ||
| m_observer->OnStorageOpenFailed("Database is not open"); | ||
| return 0; | ||
| } |
| if (stored) { | ||
| checkStorageSizeLimits(); | ||
| } |
| EXPECT_THAT(consumer.records[0].reservedUntil, 0); | ||
| } | ||
|
|
||
| TEST_F(OfflineStorageTests_SQLite, StoreRecordsBatchStoresAllRecordsInOneTransaction) |
- Rename StoreRecords batch test (drop the "InOneTransaction" claim it did not verify) and reword its comment: the test covers batch storage correctness; the single-transaction batching is a perf optimization verified by benchmarking. - Document the intended batching side-effect changes: the DB-open failure is now reported once for the whole batch (was once per record), and the size-full notification / resize check runs once after the batch commit (m_DbSizeEstimate already reflects all inserts; the resize result is unchanged). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
| bool wantMore = consumer(std::move(record)); | ||
| m_records[latency].pop_back(); | ||
| m_size -= std::min(m_size, recordSize); |
| if (!m_db) { | ||
| // Report once for the whole batch (the previous per-record loop | ||
| // reported once per record). | ||
| LOG_ERROR("Failed to store %zu events: Database is not open", records.size()); | ||
| m_observer->OnStorageOpenFailed("Database is not open"); | ||
| return 0; | ||
| } |
| // Every record in the batch is stored and individually retrievable. (The | ||
| // single-transaction batching is a performance optimization verified by | ||
| // benchmarking; this test covers the batch's storage correctness.) | ||
| EXPECT_THAT(offlineStorage->StoreRecords(batch), kCount); | ||
|
|
||
| TestRecordConsumer consumer; | ||
| EXPECT_THAT(offlineStorage->GetAndReserveRecords(consumer, 100000), true); | ||
| ASSERT_THAT(consumer.records.size(), kCount); | ||
| } |
- Revert the MemoryStorage::GetAndReserveRecords leaseTimeMs==0 move-instead-of- copy optimization. It changed behavior if a leaseTimeMs==0 consumer ever returned false (the record would be consumed instead of kept). It is low-value and only safe under the current "leaseTimeMs==0 consumers never reject" contract, so drop it; this PR is now focused on the transaction batching. - Tighten StoreRecordsBatchStoresAllRecords to assert each record id is individually retrieved (matching the comment), not just the total count. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
| for (auto & i : records) { | ||
| if (!isValidRecord(i)) { | ||
| continue; | ||
| } |
| void OfflineStorage_SQLite::insertRecordUnsafe(StorageRecord const& record) | ||
| { | ||
| SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast<int>(record.latency), static_cast<int>(record.persistence), record.timestamp, record.blob); | ||
| m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size(); | ||
| } |
- Validate all records BEFORE opening the transaction in StoreRecords(): build the list of valid records first (reporting rejects via OnStorageFailed), then open the single BEGIN EXCLUSIVE transaction and insert them. This keeps any observer callback out of the held transaction (matching the single StoreRecord(), which already validates before its transaction) and avoids re-entrancy/lock-duration hazards. - insertRecordUnsafe() now returns whether the INSERT actually succeeded and only increments m_DbSizeEstimate on success. StoreRecords() counts only successful inserts and StoreRecord() returns the real result, so a failed write no longer inflates the stored count or the size estimate. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
| { | ||
| #ifdef ENABLE_LOCKING | ||
| LOCKGUARD(m_lock); | ||
| DbTransaction transaction(m_db.get()); | ||
| if (!transaction.locked) | ||
| { | ||
| LOG_ERROR("Failed to store event %s:%s: Database error", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); | ||
| m_observer->OnStorageFailed("Database error"); | ||
| return false; | ||
| } | ||
| #endif | ||
| if (!insertRecordUnsafe(record)) { | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| checkStorageSizeLimits(); | ||
|
|
||
| return true; |
| if (!SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast<int>(record.latency), static_cast<int>(record.persistence), record.timestamp, record.blob)) | ||
| { | ||
| LOG_ERROR("Failed to store event %s:%s: database write failed", | ||
| tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); | ||
| m_observer->OnStorageOpenFailed("Database is not open"); | ||
| return false; | ||
| } |
| if (!m_db) { | ||
| // Report once for the whole batch (the previous per-record loop | ||
| // reported once per record). | ||
| LOG_ERROR("Failed to store %zu events: Database is not open", records.size()); | ||
| m_observer->OnStorageOpenFailed("Database is not open"); |
…cords Move the validation pass ahead of the !m_db check so StoreRecords() reports invalid records consistently with the single StoreRecord() (which validates first) even when the database is not open. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
| { | ||
| #ifdef ENABLE_LOCKING | ||
| LOCKGUARD(m_lock); | ||
| DbTransaction transaction(m_db.get()); | ||
| if (!transaction.locked) | ||
| { | ||
| LOG_ERROR("Failed to store event %s:%s: Database error", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); | ||
| m_observer->OnStorageFailed("Database error"); | ||
| return false; | ||
| } | ||
| #endif | ||
| if (!insertRecordUnsafe(record)) { | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| checkStorageSizeLimits(); | ||
|
|
||
| return true; |
| size_t stored = 0; | ||
| { | ||
| // Batch all inserts into a single transaction: one BEGIN EXCLUSIVE / | ||
| // COMMIT (one fsync) for the whole flush instead of one per record. | ||
| #ifdef ENABLE_LOCKING | ||
| LOCKGUARD(m_lock); | ||
| DbTransaction transaction(m_db.get()); | ||
| if (!transaction.locked) | ||
| { | ||
| LOG_ERROR("Failed to store %zu events: Database error", records.size()); | ||
| m_observer->OnStorageFailed("Database error"); | ||
| return 0; | ||
| } | ||
| #endif | ||
| for (auto * r : valid) { | ||
| if (insertRecordUnsafe(*r)) { | ||
| ++stored; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Run the size-full notification / resize check once after the batch | ||
| // commit (instead of after every record): m_DbSizeEstimate already | ||
| // reflects all inserts and the resize result is the same. | ||
| if (stored) { |
insertRecordUnsafe() returns false on a failed INSERT but did not notify the
observer, so a write failure could be silently dropped. Both StoreRecord() and
StoreRecords() now call m_observer->OnStorageFailed("Database write failed") when
an insert fails -- after the transaction has closed, so the observer callback
never runs while BEGIN EXCLUSIVE is held.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
| size_t GetRecordCountUnsafe(EventLatency latency) const; | ||
|
|
||
| // Validate a record's required fields; reports OnStorageFailed on rejection. | ||
| bool isValidRecord(StorageRecord const& record); |
| } | ||
|
|
||
| bool OfflineStorage_SQLite::StoreRecord(StorageRecord const& record) | ||
| bool OfflineStorage_SQLite::isValidRecord(StorageRecord const& record) |
isValidRecord does not modify object state; mark it const for const-correctness. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
| if (!stored) { | ||
| // Report the write failure after the transaction has closed, so the | ||
| // observer callback never runs while BEGIN EXCLUSIVE is held. | ||
| m_observer->OnStorageFailed("Database write failed"); | ||
| return false; | ||
| } |
| if (stored) { | ||
| checkStorageSizeLimits(); | ||
| } |
| std::vector<StorageRecord*> valid; | ||
| valid.reserve(records.size()); | ||
| for (auto & i : records) { | ||
| if (StoreRecord(i)) { | ||
| ++stored; | ||
| if (isValidRecord(i)) { | ||
| valid.push_back(&i); |
| bool OfflineStorage_SQLite::insertRecordUnsafe(StorageRecord const& record) | ||
| { | ||
| if (!SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast<int>(record.latency), static_cast<int>(record.persistence), record.timestamp, record.blob)) | ||
| { | ||
| LOG_ERROR("Failed to store event %s:%s: database write failed", | ||
| tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); | ||
| m_observer->OnStorageOpenFailed("Database is not open"); | ||
| return false; | ||
| } | ||
| m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size(); | ||
| return true; |
- Always run checkStorageSizeLimits() even when an insert fails. My earlier early-return on failure skipped it, so a full DB (SQLITE_FULL) could no longer trigger ResizeDb() and storage could get stuck failing writes. StoreRecord() and StoreRecords() now run the size/resize check regardless of insert success. - Use const pointers for the batch validation list (records are not modified). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
| if (!m_db) { | ||
| LOG_ERROR("Failed to store %zu events: Database is not open", records.size()); | ||
| m_observer->OnStorageOpenFailed("Database is not open"); | ||
| return 0; | ||
| } | ||
|
|
||
| if (valid.empty()) { | ||
| return 0; | ||
| } |
| if (!stored) { | ||
| // Report the write failure after the transaction has closed, so the | ||
| // observer callback never runs while BEGIN EXCLUSIVE is held. | ||
| m_observer->OnStorageFailed("Database write failed"); | ||
| } | ||
|
|
||
| // Always run the size-limit check, even on failure: a full-DB error | ||
| // (e.g. SQLITE_FULL) must still be able to trigger ResizeDb() so storage | ||
| // can recover. | ||
| checkStorageSizeLimits(); |
| if (failed) { | ||
| // Report write failures once, after the transaction has closed. | ||
| m_observer->OnStorageFailed("Database write failed"); | ||
| } | ||
|
|
||
| // Always run the size-full notification / resize check once after the | ||
| // batch (even if every insert failed): a full DB must still be able to | ||
| // trigger ResizeDb() to recover. m_DbSizeEstimate already reflects all | ||
| // successful inserts. | ||
| checkStorageSizeLimits(); |
- In StoreRecords(), return early when no records are valid (already reported) before the DB-open check, matching StoreRecord() which returns after validation without checking DB-open. Log the valid count. - Stop overclaiming "recovery" in the size-check comments: checkStorageSizeLimits() runs after the batch to match the original per-record behavior; reliable full-DB recovery (refreshing the size estimate on failure) is a pre-existing concern out of scope for this batching change. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
| if (!transaction.locked) | ||
| { | ||
| LOG_ERROR("Failed to store %zu events: Database error", records.size()); | ||
| m_observer->OnStorageFailed("Database error"); | ||
| return 0; | ||
| } |
| if (failed) { | ||
| // Report write failures once, after the transaction has closed. | ||
| m_observer->OnStorageFailed("Database write failed"); | ||
| } |
| DbTransaction transaction(m_db.get()); | ||
| if (!transaction.locked) | ||
| { | ||
| LOG_ERROR("Failed to store %zu events: Database error", records.size()); |
The transaction-lock failure message in StoreRecords() logged records.size(), but the batch has already been filtered to valid records; log valid.size(). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…(was PR microsoft#1497) Combine the batched-flush perf work into this PR and make it cooperate with the Flush() data-loss fix, so both land together. OfflineStorage_SQLite: StoreRecords() now inserts the whole batch in a single BEGIN EXCLUSIVE / COMMIT (one fsync) instead of one transaction per record (~11x at 200 records, ~40x at 1000 vs the SDK's vendored sqlite). Shared per-record logic is factored into isValidRecord / insertRecordUnsafe / checkStorageSizeLimits. The batch is all-or-nothing: if any insert fails, the transaction is rolled back (new SqliteDB::rollback / DbTransaction::markForRollback) and the size estimate is undone, so callers can re-queue the whole batch without risking duplicate rows (the events table has no unique record_id constraint). OfflineStorageHandler::Flush() now uses the batched StoreRecords() to persist a drained batch in one transaction. Because StoreRecords() is all-or-nothing, on failure nothing is committed and Flush returns every record to the in-memory queue for retry -- realizing the batching speedup while keeping the no-event-loss / no-duplicate guarantee. StoreRecords/StoreRecord report write failures via OnStorageFailed after the transaction closes; validation runs before the transaction. Adds OfflineStorageTests_SQLite.StoreRecordsBatchStoresAllRecords. Full UnitTests (527) pass. Closes PR microsoft#1497. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
Folded into #1491. The batched StoreRecords() optimization is combined with #1491's Flush() data-loss fix: StoreRecords() is now all-or-nothing (rolls back on any failure) and Flush() uses it, so the batch speedup and the no-loss/no-duplicate guarantee cooperate instead of conflicting. Driven to a clean Copilot review here over ~9 rounds; the final code lives in #1491. |
Summary
OfflineStorage_SQLite::StoreRecords()loopedStoreRecord(), and eachStoreRecord()opens its ownDbTransaction(BEGIN EXCLUSIVE…COMMIT). So a flush of N records did N transactions = N WAL fsyncs.This batches the whole vector into a single transaction, sharing the per-record logic via extracted private helpers (
isValidRecord,insertRecordUnsafe,checkStorageSizeLimits). Implements the existing in-code TODO (// consider running the batch in transaction).Measured against the SDK's own vendored sqlite (WAL,
synchronous=NORMAL,BEGIN EXCLUSIVE):Behavior notes
The batching keeps validation/reporting semantics equivalent to the old per-record path, with these intentional consequences:
BEGIN EXCLUSIVEis held.insertRecordUnsafe()now checks theexecute()result: a failed insert is not counted instoredand does not inflatem_DbSizeEstimate, and the write failure is reported viaOnStorageFailed("Database write failed")after the transaction closes. (Previously the per-record insert result was ignored — this is a correctness improvement that overlaps the separate data-safety PR Offline storage: data-safety fixes + batched flush (empty-filter guard, store-failure propagation, event-loss fix, one-transaction flush) #1491; the two will need a coordinated merge.)ResizeDb()check runs once after the batch (and still runs even if inserts failed, so a full DB can recover).Tests
OfflineStorageTests_SQLite.StoreRecordsBatchStoresAllRecords(asserts each record is individually retrievable).UnitTests: 524/524 pass.