Offline storage: batch flush inserts in one transaction #1497

Closed
bmehta001 wants to merge 10 commits into microsoft:main from bmehta001:bhamehta/perf-storage-flush

Conversation

@bmehta001

@bmehta001 bmehta001 commented Jun 23, 2026

Summary

OfflineStorage_SQLite::StoreRecords() looped over StoreRecord(), and each StoreRecord() opens its own DbTransaction (BEGIN EXCLUSIVE/COMMIT). So a flush of N records did N transactions = N WAL fsyncs.

This batches the whole vector into a single transaction, sharing the per-record logic via extracted private helpers (isValidRecord, insertRecordUnsafe, checkStorageSizeLimits). Implements the existing in-code TODO (// consider running the batch in transaction).

Measured against the SDK's own vendored sqlite (WAL, synchronous=NORMAL, BEGIN EXCLUSIVE):

| N records | per-record txn | single txn | speedup |
|---|---|---|---|
| 200 | ~3.3 ms | ~0.3 ms | ~11× |
| 1000 | ~64 ms | ~1.5 ms | ~40× |
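
To make the transaction arithmetic concrete, here is a minimal sketch rather than the SDK's code. `FakeDb`, `storePerRecord`, and `storeBatched` are hypothetical names, and the model assumes only that each commit carries a fixed overhead, which is the cost the batching removes.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for the SQLite layer: it only counts commits and rows,
// on the assumption that every COMMIT carries a fixed overhead.
struct FakeDb {
    std::size_t commits = 0;
    std::size_t rows = 0;
    void begin() {}
    void insert(const std::string&) { ++rows; }
    void commit() { ++commits; }
};

// Old shape: one transaction per record, so N records cost N commits.
inline void storePerRecord(FakeDb& db, const std::vector<std::string>& records) {
    for (const auto& r : records) {
        db.begin();
        db.insert(r);
        db.commit();
    }
}

// New shape: one transaction around the whole batch, so N records cost 1 commit.
inline void storeBatched(FakeDb& db, const std::vector<std::string>& records) {
    if (records.empty()) {
        return;
    }
    db.begin();
    for (const auto& r : records) {
        db.insert(r);
    }
    db.commit();
}
```

Both functions insert the same rows; only the number of commits differs, which is why the measured gap widens as N grows.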

Behavior notes

The batching keeps validation/reporting semantics equivalent to the old per-record path, with these intentional consequences:

  • Validation runs up front (reporting rejects) before the transaction, so no observer callback runs while BEGIN EXCLUSIVE is held.
  • insertRecordUnsafe() now checks the execute() result: a failed insert is not counted in stored and does not inflate m_DbSizeEstimate, and the write failure is reported via OnStorageFailed("Database write failed") after the transaction closes. Previously the per-record insert result was ignored, so this is a correctness improvement. It overlaps the separate data-safety PR #1491 ("Offline storage: data-safety fixes + batched flush (empty-filter guard, store-failure propagation, event-loss fix, one-transaction flush)"); the two will need a coordinated merge.
  • The size-full notification / ResizeDb() check runs once after the batch (and still runs even if inserts failed, so a full DB can recover).
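
The ordering described in these notes can be sketched as follows. This is an illustrative model under stated assumptions, not the PR's implementation: `Storage`, `Record`, the `log` trace, and the `failWrites` switch are all hypothetical, and observer callbacks are represented as log entries so their position relative to the transaction is visible.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// All names below are hypothetical stand-ins, not the SDK's real types.
struct Record {
    std::string id;
    std::string blob;
};

struct Storage {
    std::vector<std::string> log;  // ordered trace of what happened
    std::vector<Record> rows;
    bool failWrites = false;       // simulate a failing INSERT

    bool isValid(const Record& r) {
        if (r.id.empty()) {
            log.push_back("observer:invalid");
            return false;
        }
        return true;
    }

    std::size_t storeRecords(const std::vector<Record>& records) {
        // 1. Validate up front; rejects are reported before any lock is taken.
        std::vector<const Record*> valid;
        valid.reserve(records.size());
        for (const auto& r : records) {
            if (isValid(r)) {
                valid.push_back(&r);
            }
        }
        if (valid.empty()) {
            return 0;
        }

        std::size_t stored = 0;
        bool failed = false;
        {
            // 2. One transaction for the whole batch; no observer calls inside.
            log.push_back("begin");
            for (const Record* r : valid) {
                if (failWrites) {
                    failed = true;
                    continue;
                }
                rows.push_back(*r);
                ++stored;
            }
            log.push_back("commit");
        }
        // 3. Report write failures once, after the transaction has closed.
        if (failed) {
            log.push_back("observer:write-failed");
        }
        // 4. The size check runs once per batch, regardless of insert success.
        log.push_back("size-check");
        return stored;
    }
};
```

Reading the `log` vector after a call shows every observer entry falling either before "begin" or after "commit", which is the property the first two bullets describe.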

Tests

  • Adds OfflineStorageTests_SQLite.StoreRecordsBatchStoresAllRecords (asserts each record is individually retrievable).
  • Full WSL UnitTests: 524/524 pass.

Note: an earlier revision also moved (instead of copied) the record on the leaseTimeMs==0 read path; that was dropped as low-value and slightly fragile, leaving this PR focused on the transaction batching.

StoreRecords() previously inserted each record in its own transaction
(StoreRecord -> DbTransaction -> BEGIN EXCLUSIVE/COMMIT), so a flush of N records
performed N transactions (N WAL fsyncs). Batch the whole vector into a single
transaction, sharing the per-record work via extracted helpers
(isValidRecord / insertRecordUnsafe / checkStorageSizeLimits). StoreRecord()
keeps its own single-record transaction; behavior is unchanged. This implements
the existing in-code TODO ("consider running the batch in transaction").

Measured against the SDK's vendored sqlite (WAL, synchronous=NORMAL,
BEGIN EXCLUSIVE): N=200 ~11x, N=1000 ~40x faster for the insert batch.

MemoryStorage::GetAndReserveRecords: when leaseTimeMs==0 (the flush-read path,
where the record is consumed and popped unconditionally), move the record
straight to the consumer instead of deep-copying it first.

Adds OfflineStorageTests_SQLite.StoreRecordsBatchStoresAllRecordsInOneTransaction.
Full UnitTests suite (524) passes.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@bmehta001 bmehta001 requested a review from a team as a code owner June 23, 2026 05:32
@bmehta001 bmehta001 requested a review from Copilot June 23, 2026 05:38

Copilot AI left a comment

Pull request overview

This PR optimizes the offline-storage “flush” path by batching SQLite inserts into a single exclusive transaction and by eliminating an extra deep-copy on the in-memory flush-read path, aiming to reduce fsync overhead and improve throughput without changing functional behavior.

Changes:

  • Batch OfflineStorage_SQLite::StoreRecords() inserts into a single BEGIN EXCLUSIVE/COMMIT cycle via extracted helpers.
  • Refactor per-record validation/insert/size-limit logic into private helpers (isValidRecord, insertRecordUnsafe, checkStorageSizeLimits).
  • Optimize MemoryStorage::GetAndReserveRecords() to move records directly to the consumer when leaseTimeMs == 0, avoiding a deep copy.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.

| File | Description |
|---|---|
| tests/unittests/OfflineStorageTests_SQLite.cpp | Adds a new unit test covering batch storage behavior. |
| lib/offline/OfflineStorage_SQLite.hpp | Declares new private helper methods to support batched inserts. |
| lib/offline/OfflineStorage_SQLite.cpp | Implements single-transaction batch insert path and refactors record storage logic. |
| lib/offline/MemoryStorage.cpp | Avoids a deep copy on the no-reservation (flush-read) path by moving records to the consumer. |

Comment on lines +167 to +168
// The whole batch is committed in a single transaction.
EXPECT_THAT(offlineStorage->StoreRecords(batch), kCount);
Comment on lines +242 to +246
if (!m_db) {
    LOG_ERROR("Failed to store %zu events: Database is not open", records.size());
    m_observer->OnStorageOpenFailed("Database is not open");
    return 0;
}
Comment thread lib/offline/OfflineStorage_SQLite.cpp Outdated
Comment on lines +271 to +273
if (stored) {
    checkStorageSizeLimits();
}
EXPECT_THAT(consumer.records[0].reservedUntil, 0);
}

TEST_F(OfflineStorageTests_SQLite, StoreRecordsBatchStoresAllRecordsInOneTransaction)
- Rename StoreRecords batch test (drop the "InOneTransaction" claim it did not
  verify) and reword its comment: the test covers batch storage correctness; the
  single-transaction batching is a perf optimization verified by benchmarking.
- Document the intended batching side-effect changes: the DB-open failure is now
  reported once for the whole batch (was once per record), and the size-full
  notification / resize check runs once after the batch commit (m_DbSizeEstimate
  already reflects all inserts; the resize result is unchanged).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

Copilot AI left a comment

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

Comment thread lib/offline/MemoryStorage.cpp Outdated
Comment on lines +163 to +165
bool wantMore = consumer(std::move(record));
m_records[latency].pop_back();
m_size -= std::min(m_size, recordSize);
Comment on lines +242 to +248
if (!m_db) {
    // Report once for the whole batch (the previous per-record loop
    // reported once per record).
    LOG_ERROR("Failed to store %zu events: Database is not open", records.size());
    m_observer->OnStorageOpenFailed("Database is not open");
    return 0;
}
Comment on lines +167 to +175
// Every record in the batch is stored and individually retrievable. (The
// single-transaction batching is a performance optimization verified by
// benchmarking; this test covers the batch's storage correctness.)
EXPECT_THAT(offlineStorage->StoreRecords(batch), kCount);

TestRecordConsumer consumer;
EXPECT_THAT(offlineStorage->GetAndReserveRecords(consumer, 100000), true);
ASSERT_THAT(consumer.records.size(), kCount);
}
- Revert the MemoryStorage::GetAndReserveRecords leaseTimeMs==0 move-instead-of-
  copy optimization. It changed behavior if a leaseTimeMs==0 consumer ever
  returned false (the record would be consumed instead of kept). It is low-value
  and only safe under the current "leaseTimeMs==0 consumers never reject"
  contract, so drop it; this PR is now focused on the transaction batching.
- Tighten StoreRecordsBatchStoresAllRecords to assert each record id is
  individually retrieved (matching the comment), not just the total count.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
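
The behavior change behind this revert can be illustrated with a small sketch. It is an assumption-laden model, not MemoryStorage's code: `Record`, `Consumer`, `offerCopy`, and `offerMove` are hypothetical, and `offerMove` mirrors the reviewed snippet in popping unconditionally after the move. Both helpers assume a non-empty queue.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical record and consumer types for illustration only.
struct Record {
    std::string id;
    std::string blob;
};
using Consumer = std::function<bool(Record&&)>;

// Copy-first: the queued record stays in place if the consumer rejects it.
// Precondition: queue is not empty.
inline bool offerCopy(std::vector<Record>& queue, const Consumer& consumer) {
    Record copy = queue.back();
    bool wantMore = consumer(std::move(copy));
    if (wantMore) {
        queue.pop_back();
    }
    return wantMore;
}

// Move-first, as in the reverted revision: move, then pop unconditionally.
// Precondition: queue is not empty.
inline bool offerMove(std::vector<Record>& queue, const Consumer& consumer) {
    bool wantMore = consumer(std::move(queue.back()));
    queue.pop_back();  // the record is gone even if the consumer said "no"
    return wantMore;
}
```

With a consumer that returns false, `offerCopy` leaves the record queued while `offerMove` drops it, so the move variant is only equivalent while consumers on this path never reject.
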
@bmehta001 bmehta001 changed the title from "Offline storage: batch flush inserts in one transaction (+ avoid a read-path copy)" to "Offline storage: batch flush inserts in one transaction" Jun 23, 2026
@bmehta001 bmehta001 requested a review from Copilot June 23, 2026 06:22

Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

Comment thread lib/offline/OfflineStorage_SQLite.cpp Outdated
Comment on lines +264 to +267
for (auto & i : records) {
    if (!isValidRecord(i)) {
        continue;
    }
Comment thread lib/offline/OfflineStorage_SQLite.cpp Outdated
Comment on lines +161 to +165
void OfflineStorage_SQLite::insertRecordUnsafe(StorageRecord const& record)
{
    SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data)
        .execute(record.id, record.tenantToken, static_cast<int>(record.latency),
                 static_cast<int>(record.persistence), record.timestamp, record.blob);
    m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size();
}
- Validate all records BEFORE opening the transaction in StoreRecords(): build
  the list of valid records first (reporting rejects via OnStorageFailed), then
  open the single BEGIN EXCLUSIVE transaction and insert them. This keeps any
  observer callback out of the held transaction (matching the single
  StoreRecord(), which already validates before its transaction) and avoids
  re-entrancy/lock-duration hazards.
- insertRecordUnsafe() now returns whether the INSERT actually succeeded and only
  increments m_DbSizeEstimate on success. StoreRecords() counts only successful
  inserts and StoreRecord() returns the real result, so a failed write no longer
  inflates the stored count or the size estimate.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
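
A minimal sketch of the accounting rule in the second bullet, assuming a statement wrapper whose `execute()` returns a success flag. `FakeStatement`, `Store`, and `Record` are hypothetical stand-ins for SqliteStatement, OfflineStorage_SQLite, and StorageRecord, and the helper takes the statement as a parameter purely to keep the example self-contained.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical stand-ins; the real code uses SqliteStatement and StorageRecord.
struct Record {
    std::string id;
    std::string tenantToken;
    std::string blob;
};

struct FakeStatement {
    bool succeed = true;
    bool execute(const Record&) const { return succeed; }
};

class Store {
public:
    // Returns whether the INSERT succeeded; the size estimate grows only then.
    bool insertRecordUnsafe(const FakeStatement& stmt, const Record& r) {
        if (!stmt.execute(r)) {
            return false;
        }
        m_sizeEstimate += r.id.size() + r.tenantToken.size() + r.blob.size();
        return true;
    }

    std::size_t sizeEstimate() const { return m_sizeEstimate; }

private:
    std::size_t m_sizeEstimate = 0;
};
```

A caller that counts only `true` results gets a stored count and a size estimate that both exclude failed writes.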

Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

Comment thread lib/offline/OfflineStorage_SQLite.cpp Outdated
Comment on lines 222 to 240
    {
#ifdef ENABLE_LOCKING
        LOCKGUARD(m_lock);
        DbTransaction transaction(m_db.get());
        if (!transaction.locked)
        {
            LOG_ERROR("Failed to store event %s:%s: Database error", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str());
            m_observer->OnStorageFailed("Database error");
            return false;
        }
#endif
        if (!insertRecordUnsafe(record)) {
            return false;
        }
    }

    checkStorageSizeLimits();

    return true;
Comment on lines +163 to 168
if (!SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data)
         .execute(record.id, record.tenantToken, static_cast<int>(record.latency),
                  static_cast<int>(record.persistence), record.timestamp, record.blob))
{
    LOG_ERROR("Failed to store event %s:%s: database write failed",
              tenantTokenToId(record.tenantToken).c_str(), record.id.c_str());
    m_observer->OnStorageOpenFailed("Database is not open");
    return false;
}

Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

Comment thread lib/offline/OfflineStorage_SQLite.cpp Outdated
Comment on lines +250 to +254
if (!m_db) {
    // Report once for the whole batch (the previous per-record loop
    // reported once per record).
    LOG_ERROR("Failed to store %zu events: Database is not open", records.size());
    m_observer->OnStorageOpenFailed("Database is not open");
…cords

Move the validation pass ahead of the !m_db check so StoreRecords() reports
invalid records consistently with the single StoreRecord() (which validates
first) even when the database is not open.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

Comment thread lib/offline/OfflineStorage_SQLite.cpp Outdated
Comment on lines 222 to 240
    {
#ifdef ENABLE_LOCKING
        LOCKGUARD(m_lock);
        DbTransaction transaction(m_db.get());
        if (!transaction.locked)
        {
            LOG_ERROR("Failed to store event %s:%s: Database error", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str());
            m_observer->OnStorageFailed("Database error");
            return false;
        }
#endif
        if (!insertRecordUnsafe(record)) {
            return false;
        }
    }

    checkStorageSizeLimits();

    return true;
Comment thread lib/offline/OfflineStorage_SQLite.cpp Outdated
Comment on lines +273 to +297
size_t stored = 0;
{
    // Batch all inserts into a single transaction: one BEGIN EXCLUSIVE /
    // COMMIT (one fsync) for the whole flush instead of one per record.
#ifdef ENABLE_LOCKING
    LOCKGUARD(m_lock);
    DbTransaction transaction(m_db.get());
    if (!transaction.locked)
    {
        LOG_ERROR("Failed to store %zu events: Database error", records.size());
        m_observer->OnStorageFailed("Database error");
        return 0;
    }
#endif
    for (auto * r : valid) {
        if (insertRecordUnsafe(*r)) {
            ++stored;
        }
    }
}

// Run the size-full notification / resize check once after the batch
// commit (instead of after every record): m_DbSizeEstimate already
// reflects all inserts and the resize result is the same.
if (stored) {
insertRecordUnsafe() returns false on a failed INSERT but did not notify the
observer, so a write failure could be silently dropped. Both StoreRecord() and
StoreRecords() now call m_observer->OnStorageFailed("Database write failed") when
an insert fails -- after the transaction has closed, so the observer callback
never runs while BEGIN EXCLUSIVE is held.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

Comment thread lib/offline/OfflineStorage_SQLite.hpp Outdated
size_t GetRecordCountUnsafe(EventLatency latency) const;

// Validate a record's required fields; reports OnStorageFailed on rejection.
bool isValidRecord(StorageRecord const& record);
Comment thread lib/offline/OfflineStorage_SQLite.cpp Outdated
}

- bool OfflineStorage_SQLite::StoreRecord(StorageRecord const& record)
+ bool OfflineStorage_SQLite::isValidRecord(StorageRecord const& record)
isValidRecord does not modify object state; mark it const for const-correctness.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
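
A short sketch of why the helper can be const even though it reports through the observer: in a const member function the pointer member becomes const, not the object it points to. The `Observer`, `Record`, and `Storage` types and the empty-field checks are hypothetical; the thread does not show the real validation criteria.

```cpp
#include <cassert>
#include <string>

// Hypothetical observer interface that just counts failure notifications.
struct Observer {
    int failures = 0;
    void OnStorageFailed(const std::string&) { ++failures; }
};

struct Record {
    std::string id;
    std::string tenantToken;
};

class Storage {
public:
    explicit Storage(Observer* observer) : m_observer(observer) {}

    // const: reads only its argument and calls through the observer pointer.
    // The pointee is not const, so notifying the observer is still allowed.
    bool isValidRecord(const Record& r) const {
        if (r.id.empty() || r.tenantToken.empty()) {
            m_observer->OnStorageFailed("Invalid parameters");
            return false;
        }
        return true;
    }

private:
    Observer* m_observer;
};
```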

Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.

Comment on lines +237 to +242
if (!stored) {
    // Report the write failure after the transaction has closed, so the
    // observer callback never runs while BEGIN EXCLUSIVE is held.
    m_observer->OnStorageFailed("Database write failed");
    return false;
}
Comment thread lib/offline/OfflineStorage_SQLite.cpp Outdated
Comment on lines +312 to +314
if (stored) {
    checkStorageSizeLimits();
}
Comment thread lib/offline/OfflineStorage_SQLite.cpp Outdated
Comment on lines +261 to +265
  std::vector<StorageRecord*> valid;
  valid.reserve(records.size());
  for (auto & i : records) {
-     if (StoreRecord(i)) {
-         ++stored;
      if (isValidRecord(i)) {
          valid.push_back(&i);
Comment on lines +161 to +170
bool OfflineStorage_SQLite::insertRecordUnsafe(StorageRecord const& record)
{
    if (!SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data)
             .execute(record.id, record.tenantToken, static_cast<int>(record.latency),
                      static_cast<int>(record.persistence), record.timestamp, record.blob))
    {
        LOG_ERROR("Failed to store event %s:%s: database write failed",
                  tenantTokenToId(record.tenantToken).c_str(), record.id.c_str());
        m_observer->OnStorageOpenFailed("Database is not open");
        return false;
    }
    m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size();
    return true;
- Always run checkStorageSizeLimits() even when an insert fails. My earlier
  early-return on failure skipped it, so a full DB (SQLITE_FULL) could no longer
  trigger ResizeDb() and storage could get stuck failing writes. StoreRecord()
  and StoreRecords() now run the size/resize check regardless of insert success.
- Use const pointers for the batch validation list (records are not modified).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
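
The difference between the early-return and always-check shapes can be sketched like this. Everything here is hypothetical (`Storage`, the byte-count `insert`, and a `checkStorageSizeLimits` that halves the estimate to stand in for ResizeDb()); it shows only that the check is reached, not that a real full database would recover.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical model: sizes are plain byte counts and "resize" halves the estimate.
struct Storage {
    std::size_t sizeEstimate = 0;
    std::size_t sizeLimit = 100;
    bool writesFail = false;
    int resizeCalls = 0;

    bool insert(std::size_t bytes) {
        if (writesFail) {
            return false;
        }
        sizeEstimate += bytes;
        return true;
    }

    void checkStorageSizeLimits() {
        if (sizeEstimate >= sizeLimit) {
            ++resizeCalls;                 // stand-in for ResizeDb()
            sizeEstimate = sizeLimit / 2;
        }
    }

    // Early return on failure: the size check is skipped when the write fails.
    bool storeEarlyReturn(std::size_t bytes) {
        if (!insert(bytes)) {
            return false;
        }
        checkStorageSizeLimits();
        return true;
    }

    // Always check: the size check runs whether or not the write succeeded.
    bool storeAlwaysCheck(std::size_t bytes) {
        bool ok = insert(bytes);
        checkStorageSizeLimits();
        return ok;
    }
};
```

When the store is already at its limit and writes fail, only the second shape reaches the resize path.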

Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

Comment thread lib/offline/OfflineStorage_SQLite.cpp Outdated
Comment on lines +271 to +279
if (!m_db) {
    LOG_ERROR("Failed to store %zu events: Database is not open", records.size());
    m_observer->OnStorageOpenFailed("Database is not open");
    return 0;
}

if (valid.empty()) {
    return 0;
}
Comment on lines +237 to +246
if (!stored) {
    // Report the write failure after the transaction has closed, so the
    // observer callback never runs while BEGIN EXCLUSIVE is held.
    m_observer->OnStorageFailed("Database write failed");
}

// Always run the size-limit check, even on failure: a full-DB error
// (e.g. SQLITE_FULL) must still be able to trigger ResizeDb() so storage
// can recover.
checkStorageSizeLimits();
Comment on lines +306 to +315
if (failed) {
    // Report write failures once, after the transaction has closed.
    m_observer->OnStorageFailed("Database write failed");
}

// Always run the size-full notification / resize check once after the
// batch (even if every insert failed): a full DB must still be able to
// trigger ResizeDb() to recover. m_DbSizeEstimate already reflects all
// successful inserts.
checkStorageSizeLimits();
- In StoreRecords(), return early when no records are valid (already reported)
  before the DB-open check, matching StoreRecord() which returns after
  validation without checking DB-open. Log the valid count.
- Stop overclaiming "recovery" in the size-check comments: checkStorageSizeLimits()
  runs after the batch to match the original per-record behavior; reliable full-DB
  recovery (refreshing the size estimate on failure) is a pre-existing concern out
  of scope for this batching change.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

Comment on lines +291 to 296
if (!transaction.locked)
{
    LOG_ERROR("Failed to store %zu events: Database error", records.size());
    m_observer->OnStorageFailed("Database error");
    return 0;
}
Comment on lines +308 to 311
if (failed) {
    // Report write failures once, after the transaction has closed.
    m_observer->OnStorageFailed("Database write failed");
}

Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

Comment thread lib/offline/OfflineStorage_SQLite.cpp Outdated
DbTransaction transaction(m_db.get());
if (!transaction.locked)
{
    LOG_ERROR("Failed to store %zu events: Database error", records.size());
The transaction-lock failure message in StoreRecords() logged records.size(),
but the batch has already been filtered to valid records; log valid.size().

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated no new comments.

bmehta001 added a commit to bmehta001/cpp_client_telemetry that referenced this pull request Jun 23, 2026
…(was PR microsoft#1497)

Combine the batched-flush perf work into this PR and make it cooperate with the
Flush() data-loss fix, so both land together.

OfflineStorage_SQLite: StoreRecords() now inserts the whole batch in a single
BEGIN EXCLUSIVE / COMMIT (one fsync) instead of one transaction per record
(~11x at 200 records, ~40x at 1000 vs the SDK's vendored sqlite). Shared per-record
logic is factored into isValidRecord / insertRecordUnsafe / checkStorageSizeLimits.
The batch is all-or-nothing: if any insert fails, the transaction is rolled back
(new SqliteDB::rollback / DbTransaction::markForRollback) and the size estimate is
undone, so callers can re-queue the whole batch without risking duplicate rows
(the events table has no unique record_id constraint).

OfflineStorageHandler::Flush() now uses the batched StoreRecords() to persist a
drained batch in one transaction. Because StoreRecords() is all-or-nothing, on
failure nothing is committed and Flush returns every record to the in-memory queue
for retry -- realizing the batching speedup while keeping the no-event-loss /
no-duplicate guarantee.

StoreRecords/StoreRecord report write failures via OnStorageFailed after the
transaction closes; validation runs before the transaction. Adds
OfflineStorageTests_SQLite.StoreRecordsBatchStoresAllRecords. Full UnitTests (527)
pass. Closes PR microsoft#1497.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
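
The all-or-nothing contract and the re-queue on failure can be sketched as below. This is a model under stated assumptions, not the code in #1491: the commit message names SqliteDB::rollback and DbTransaction::markForRollback but does not show their signatures, so `FakeDb`, `Transaction`, `storeAllOrNothing`, and `flush` are hypothetical, and an empty string stands in for a failed INSERT.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-memory table with a staged (uncommitted) area.
struct FakeDb {
    std::vector<std::string> committed;
    std::vector<std::string> staged;
    void insert(const std::string& r) { staged.push_back(r); }
    void commit() {
        committed.insert(committed.end(), staged.begin(), staged.end());
        staged.clear();
    }
    void rollback() { staged.clear(); }
};

// RAII guard: commits on scope exit unless it was marked for rollback.
class Transaction {
public:
    explicit Transaction(FakeDb& db) : m_db(db) {}
    ~Transaction() {
        if (m_rollback) {
            m_db.rollback();
        } else {
            m_db.commit();
        }
    }
    Transaction(const Transaction&) = delete;
    Transaction& operator=(const Transaction&) = delete;
    void markForRollback() { m_rollback = true; }

private:
    FakeDb& m_db;
    bool m_rollback = false;
};

// Returns the number of records stored: either all of them or zero.
inline std::size_t storeAllOrNothing(FakeDb& db, const std::vector<std::string>& batch) {
    Transaction tx(db);
    for (const auto& r : batch) {
        if (r.empty()) {  // stand-in for a failed INSERT
            tx.markForRollback();
            return 0;
        }
        db.insert(r);
    }
    return batch.size();
}

// Flush-style caller: on failure, every drained record returns to the queue.
inline void flush(FakeDb& db, std::vector<std::string>& queue) {
    std::vector<std::string> drained;
    drained.swap(queue);
    if (storeAllOrNothing(db, drained) != drained.size()) {
        queue = std::move(drained);
    }
}
```

Because a failed batch commits nothing, re-queuing the whole batch cannot produce duplicate rows on the retry.
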
@bmehta001

Folded into #1491. The batched StoreRecords() optimization is combined with #1491's Flush() data-loss fix: StoreRecords() is now all-or-nothing (rolls back on any failure) and Flush() uses it, so the batch speedup and the no-loss/no-duplicate guarantee cooperate instead of conflicting. Driven to a clean Copilot review here over ~9 rounds; the final code lives in #1491.

@bmehta001 bmehta001 closed this Jun 23, 2026