From f3f7f7fbf12b4108666fe781cbdcbf06d0645c8c Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 23 Jun 2026 00:31:58 -0500 Subject: [PATCH 01/10] Offline storage: batch flush inserts and avoid a copy on the read path StoreRecords() previously inserted each record in its own transaction (StoreRecord -> DbTransaction -> BEGIN EXCLUSIVE/COMMIT), so a flush of N records performed N transactions (N WAL fsyncs). Batch the whole vector into a single transaction, sharing the per-record work via extracted helpers (isValidRecord / insertRecordUnsafe / checkStorageSizeLimits). StoreRecord() keeps its own single-record transaction; behavior is unchanged. This implements the existing in-code TODO ("consider running the batch in transaction"). Measured against the SDK's vendored sqlite (WAL, synchronous=NORMAL, BEGIN EXCLUSIVE): N=200 ~11x, N=1000 ~40x faster for the insert batch. MemoryStorage::GetAndReserveRecords: when leaseTimeMs==0 (the flush-read path, where the record is consumed and popped unconditionally), move the record straight to the consumer instead of deep-copying it first. Adds OfflineStorageTests_SQLite.StoreRecordsBatchStoresAllRecordsInOneTransaction. Full UnitTests suite (524) passes. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/offline/MemoryStorage.cpp | 24 +++- lib/offline/OfflineStorage_SQLite.cpp | 103 +++++++++++++----- lib/offline/OfflineStorage_SQLite.hpp | 8 ++ .../unittests/OfflineStorageTests_SQLite.cpp | 19 ++++ 4 files changed, 121 insertions(+), 33 deletions(-) diff --git a/lib/offline/MemoryStorage.cpp b/lib/offline/MemoryStorage.cpp index 1d4ec5664..d47ab89c0 100644 --- a/lib/offline/MemoryStorage.cpp +++ b/lib/offline/MemoryStorage.cpp @@ -154,20 +154,32 @@ namespace MAT_NS_BEGIN { StorageRecord & record = m_records[latency].back(); size_t recordSize = record.blob.size() + sizeof(record); - StorageRecord forConsumer(record); - if (leaseTimeMs) + + if (leaseTimeMs == 0) { - forConsumer.reservedUntil = PAL::getUtcSystemTimeMs() + leaseTimeMs; + // No reservation requested (e.g. the flush-read path): the + // record is consumed and removed unconditionally, so move it + // straight to the consumer instead of deep-copying it first. + bool wantMore = consumer(std::move(record)); + m_records[latency].pop_back(); + m_size -= std::min(m_size, recordSize); + maxCount--; + m_lastReadCount++; + if (!wantMore) { + return true; + } + continue; } + StorageRecord forConsumer(record); + forConsumer.reservedUntil = PAL::getUtcSystemTimeMs() + leaseTimeMs; + bool wantMore = consumer(std::move(forConsumer)); // move to consumer if (!wantMore) { return true; } - if (leaseTimeMs) { - m_reserved_records[record.id] = std::move(record); // move to reserved - } + m_reserved_records[record.id] = std::move(record); // move to reserved m_records[latency].pop_back(); m_size -= std::min(m_size, recordSize); maxCount--; diff --git a/lib/offline/OfflineStorage_SQLite.cpp b/lib/offline/OfflineStorage_SQLite.cpp index b9b2ed83d..0ea64afce 100644 --- a/lib/offline/OfflineStorage_SQLite.cpp +++ b/lib/offline/OfflineStorage_SQLite.cpp @@ -147,40 +147,25 @@ namespace MAT_NS_BEGIN { m_db->execute(command.c_str()); } - bool OfflineStorage_SQLite::StoreRecord(StorageRecord const& record) + bool OfflineStorage_SQLite::isValidRecord(StorageRecord const& record) { - // TODO: [MG] - this works, but may not play nicely with several LogManager instances - // static SqliteStatement sql_insert(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data); - if (record.id.empty() || record.tenantToken.empty() || static_cast(record.latency) < 0 || record.timestamp <= 0) { LOG_ERROR("Failed to store event %s:%s: Invalid parameters", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); m_observer->OnStorageFailed("Invalid parameters"); return false; } + return true; + } - if (!m_db) { - LOG_ERROR("Failed to store event %s:%s: Database is not open", - tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); - m_observer->OnStorageOpenFailed("Database is not open"); - return false; - } - - { -#ifdef ENABLE_LOCKING - LOCKGUARD(m_lock); - DbTransaction transaction(m_db.get()); - if (!transaction.locked) - { - LOG_ERROR("Failed to store event %s:%s: Database error", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); - m_observer->OnStorageFailed("Database error"); - return false; - } -#endif - SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast(record.latency), static_cast(record.persistence), record.timestamp, record.blob); - m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size(); - } + void OfflineStorage_SQLite::insertRecordUnsafe(StorageRecord const& record) + { + SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast(record.latency), static_cast(record.persistence), record.timestamp, record.blob); + m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size(); + } + void OfflineStorage_SQLite::checkStorageSizeLimits() + { if ((m_DbSizeNotificationLimit != 0) && (m_DbSizeEstimate>m_DbSizeNotificationLimit)) { auto now = PAL::getMonotonicTimeMs(); @@ -210,6 +195,39 @@ namespace MAT_NS_BEGIN { m_resizing = false; } } + } + + bool OfflineStorage_SQLite::StoreRecord(StorageRecord const& record) + { + // TODO: [MG] - this works, but may not play nicely with several LogManager instances + // static SqliteStatement sql_insert(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data); + + if (!isValidRecord(record)) { + return false; + } + + if (!m_db) { + LOG_ERROR("Failed to store event %s:%s: Database is not open", + tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); + m_observer->OnStorageOpenFailed("Database is not open"); + return false; + } + + { +#ifdef ENABLE_LOCKING + LOCKGUARD(m_lock); + DbTransaction transaction(m_db.get()); + if (!transaction.locked) + { + LOG_ERROR("Failed to store event %s:%s: Database error", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); + m_observer->OnStorageFailed("Database error"); + return false; + } +#endif + insertRecordUnsafe(record); + } + + checkStorageSizeLimits(); return true; @@ -217,12 +235,43 @@ namespace MAT_NS_BEGIN { size_t OfflineStorage_SQLite::StoreRecords(std::vector & records) { + if (records.empty()) { + return 0; + } + + if (!m_db) { + LOG_ERROR("Failed to store %zu events: Database is not open", records.size()); + m_observer->OnStorageOpenFailed("Database is not open"); + return 0; + } + size_t stored = 0; - for (auto & i : records) { - if (StoreRecord(i)) { + { + // Batch all inserts into a single transaction: one BEGIN EXCLUSIVE / + // COMMIT (one fsync) for the whole flush instead of one per record. +#ifdef ENABLE_LOCKING + LOCKGUARD(m_lock); + DbTransaction transaction(m_db.get()); + if (!transaction.locked) + { + LOG_ERROR("Failed to store %zu events: Database error", records.size()); + m_observer->OnStorageFailed("Database error"); + return 0; + } +#endif + for (auto & i : records) { + if (!isValidRecord(i)) { + continue; + } + insertRecordUnsafe(i); ++stored; } } + + if (stored) { + checkStorageSizeLimits(); + } + return stored; } diff --git a/lib/offline/OfflineStorage_SQLite.hpp b/lib/offline/OfflineStorage_SQLite.hpp index 18643cde5..3b6e684ce 100644 --- a/lib/offline/OfflineStorage_SQLite.hpp +++ b/lib/offline/OfflineStorage_SQLite.hpp @@ -122,6 +122,14 @@ namespace MAT_NS_BEGIN { private: size_t GetRecordCountUnsafe(EventLatency latency) const; + + // Validate a record's required fields; reports OnStorageFailed on rejection. + bool isValidRecord(StorageRecord const& record); + // Insert one already-validated record. Caller must hold m_lock and have an + // active DbTransaction (when ENABLE_LOCKING). Updates m_DbSizeEstimate. + void insertRecordUnsafe(StorageRecord const& record); + // Run the DB-size-full notification and resize checks (after inserts). + void checkStorageSizeLimits(); }; diff --git a/tests/unittests/OfflineStorageTests_SQLite.cpp b/tests/unittests/OfflineStorageTests_SQLite.cpp index e90b0a9ae..13e0565e0 100644 --- a/tests/unittests/OfflineStorageTests_SQLite.cpp +++ b/tests/unittests/OfflineStorageTests_SQLite.cpp @@ -153,6 +153,25 @@ TEST_F(OfflineStorageTests_SQLite, GetAndReservedReturnsStoredRecord) EXPECT_THAT(consumer.records[0].reservedUntil, 0); } +TEST_F(OfflineStorageTests_SQLite, StoreRecordsBatchStoresAllRecordsInOneTransaction) +{ + initializeStorage(); + std::vector batch; + const size_t kCount = 8; + for (size_t i = 0; i < kCount; i++) + { + batch.push_back({ "g" + std::to_string(i), "token", EventLatency_Normal, + EventPersistence_Normal, static_cast(i + 1), { static_cast(i) } }); + } + + // The whole batch is committed in a single transaction. + EXPECT_THAT(offlineStorage->StoreRecords(batch), kCount); + + TestRecordConsumer consumer; + EXPECT_THAT(offlineStorage->GetAndReserveRecords(consumer, 100000), true); + ASSERT_THAT(consumer.records.size(), kCount); +} + TEST_F(OfflineStorageTests_SQLite, ReservedRecordIsNotReturned) { initializeStorage(); From 2c18ed99c6e8188c5abb8a475676e6ee0d11820c Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 23 Jun 2026 01:07:18 -0500 Subject: [PATCH 02/10] Address Copilot round-1 comments - Rename StoreRecords batch test (drop the "InOneTransaction" claim it did not verify) and reword its comment: the test covers batch storage correctness; the single-transaction batching is a perf optimization verified by benchmarking. - Document the intended batching side-effect changes: the DB-open failure is now reported once for the whole batch (was once per record), and the size-full notification / resize check runs once after the batch commit (m_DbSizeEstimate already reflects all inserts; the resize result is unchanged). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/offline/OfflineStorage_SQLite.cpp | 5 +++++ tests/unittests/OfflineStorageTests_SQLite.cpp | 6 ++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/offline/OfflineStorage_SQLite.cpp b/lib/offline/OfflineStorage_SQLite.cpp index 0ea64afce..36cd02829 100644 --- a/lib/offline/OfflineStorage_SQLite.cpp +++ b/lib/offline/OfflineStorage_SQLite.cpp @@ -240,6 +240,8 @@ namespace MAT_NS_BEGIN { } if (!m_db) { + // Report once for the whole batch (the previous per-record loop + // reported once per record). LOG_ERROR("Failed to store %zu events: Database is not open", records.size()); m_observer->OnStorageOpenFailed("Database is not open"); return 0; @@ -268,6 +270,9 @@ namespace MAT_NS_BEGIN { } } + // Run the size-full notification / resize check once after the batch + // commit (instead of after every record): m_DbSizeEstimate already + // reflects all inserts and the resize result is the same. if (stored) { checkStorageSizeLimits(); } diff --git a/tests/unittests/OfflineStorageTests_SQLite.cpp b/tests/unittests/OfflineStorageTests_SQLite.cpp index 13e0565e0..f46c302c3 100644 --- a/tests/unittests/OfflineStorageTests_SQLite.cpp +++ b/tests/unittests/OfflineStorageTests_SQLite.cpp @@ -153,7 +153,7 @@ TEST_F(OfflineStorageTests_SQLite, GetAndReservedReturnsStoredRecord) EXPECT_THAT(consumer.records[0].reservedUntil, 0); } -TEST_F(OfflineStorageTests_SQLite, StoreRecordsBatchStoresAllRecordsInOneTransaction) +TEST_F(OfflineStorageTests_SQLite, StoreRecordsBatchStoresAllRecords) { initializeStorage(); std::vector batch; @@ -164,7 +164,9 @@ TEST_F(OfflineStorageTests_SQLite, StoreRecordsBatchStoresAllRecordsInOneTransac EventPersistence_Normal, static_cast(i + 1), { static_cast(i) } }); } - // The whole batch is committed in a single transaction. + // Every record in the batch is stored and individually retrievable. (The + // single-transaction batching is a performance optimization verified by + // benchmarking; this test covers the batch's storage correctness.) EXPECT_THAT(offlineStorage->StoreRecords(batch), kCount); TestRecordConsumer consumer; From 1f9cf639eea2f0e6c5eb9ab3a3bd3e1e09d45d4e Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 23 Jun 2026 01:21:45 -0500 Subject: [PATCH 03/10] Address Copilot round-2: drop the read-path move; tighten batch test - Revert the MemoryStorage::GetAndReserveRecords leaseTimeMs==0 move-instead-of- copy optimization. It changed behavior if a leaseTimeMs==0 consumer ever returned false (the record would be consumed instead of kept). It is low-value and only safe under the current "leaseTimeMs==0 consumers never reject" contract, so drop it; this PR is now focused on the transaction batching. - Tighten StoreRecordsBatchStoresAllRecords to assert each record id is individually retrieved (matching the comment), not just the total count. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/offline/MemoryStorage.cpp | 24 +++++-------------- .../unittests/OfflineStorageTests_SQLite.cpp | 10 ++++++++ 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/lib/offline/MemoryStorage.cpp b/lib/offline/MemoryStorage.cpp index d47ab89c0..1d4ec5664 100644 --- a/lib/offline/MemoryStorage.cpp +++ b/lib/offline/MemoryStorage.cpp @@ -154,32 +154,20 @@ namespace MAT_NS_BEGIN { StorageRecord & record = m_records[latency].back(); size_t recordSize = record.blob.size() + sizeof(record); - - if (leaseTimeMs == 0) + StorageRecord forConsumer(record); + if (leaseTimeMs) { - // No reservation requested (e.g. the flush-read path): the - // record is consumed and removed unconditionally, so move it - // straight to the consumer instead of deep-copying it first. - bool wantMore = consumer(std::move(record)); - m_records[latency].pop_back(); - m_size -= std::min(m_size, recordSize); - maxCount--; - m_lastReadCount++; - if (!wantMore) { - return true; - } - continue; + forConsumer.reservedUntil = PAL::getUtcSystemTimeMs() + leaseTimeMs; } - StorageRecord forConsumer(record); - forConsumer.reservedUntil = PAL::getUtcSystemTimeMs() + leaseTimeMs; - bool wantMore = consumer(std::move(forConsumer)); // move to consumer if (!wantMore) { return true; } - m_reserved_records[record.id] = std::move(record); // move to reserved + if (leaseTimeMs) { + m_reserved_records[record.id] = std::move(record); // move to reserved + } m_records[latency].pop_back(); m_size -= std::min(m_size, recordSize); maxCount--; diff --git a/tests/unittests/OfflineStorageTests_SQLite.cpp b/tests/unittests/OfflineStorageTests_SQLite.cpp index f46c302c3..1550211c8 100644 --- a/tests/unittests/OfflineStorageTests_SQLite.cpp +++ b/tests/unittests/OfflineStorageTests_SQLite.cpp @@ -172,6 +172,16 @@ TEST_F(OfflineStorageTests_SQLite, StoreRecordsBatchStoresAllRecords) TestRecordConsumer consumer; EXPECT_THAT(offlineStorage->GetAndReserveRecords(consumer, 100000), true); ASSERT_THAT(consumer.records.size(), kCount); + for (size_t i = 0; i < kCount; i++) + { + std::string expectedId = "g" + std::to_string(i); + bool found = false; + for (auto const& r : consumer.records) + { + if (r.id == expectedId) { found = true; break; } + } + EXPECT_TRUE(found) << "record " << expectedId << " was not retrieved"; + } } TEST_F(OfflineStorageTests_SQLite, ReservedRecordIsNotReturned) From aeeb52e4545b6273b9354d382f35dd72eab7232b Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 23 Jun 2026 01:37:35 -0500 Subject: [PATCH 04/10] Address Copilot round-3 comments - Validate all records BEFORE opening the transaction in StoreRecords(): build the list of valid records first (reporting rejects via OnStorageFailed), then open the single BEGIN EXCLUSIVE transaction and insert them. This keeps any observer callback out of the held transaction (matching the single StoreRecord(), which already validates before its transaction) and avoids re-entrancy/lock-duration hazards. - insertRecordUnsafe() now returns whether the INSERT actually succeeded and only increments m_DbSizeEstimate on success. StoreRecords() counts only successful inserts and StoreRecord() returns the real result, so a failed write no longer inflates the stored count or the size estimate. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/offline/OfflineStorage_SQLite.cpp | 37 +++++++++++++++++++++------ lib/offline/OfflineStorage_SQLite.hpp | 3 ++- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/lib/offline/OfflineStorage_SQLite.cpp b/lib/offline/OfflineStorage_SQLite.cpp index 36cd02829..533542bca 100644 --- a/lib/offline/OfflineStorage_SQLite.cpp +++ b/lib/offline/OfflineStorage_SQLite.cpp @@ -158,10 +158,16 @@ namespace MAT_NS_BEGIN { return true; } - void OfflineStorage_SQLite::insertRecordUnsafe(StorageRecord const& record) + bool OfflineStorage_SQLite::insertRecordUnsafe(StorageRecord const& record) { - SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast(record.latency), static_cast(record.persistence), record.timestamp, record.blob); + if (!SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast(record.latency), static_cast(record.persistence), record.timestamp, record.blob)) + { + LOG_ERROR("Failed to store event %s:%s: database write failed", + tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); + return false; + } m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size(); + return true; } void OfflineStorage_SQLite::checkStorageSizeLimits() @@ -224,7 +230,9 @@ namespace MAT_NS_BEGIN { return false; } #endif - insertRecordUnsafe(record); + if (!insertRecordUnsafe(record)) { + return false; + } } checkStorageSizeLimits(); @@ -247,6 +255,21 @@ namespace MAT_NS_BEGIN { return 0; } + // Validate (and report rejects) before opening the transaction, so that + // no observer callback runs while the BEGIN EXCLUSIVE transaction is held + // (this matches the single StoreRecord(), which validates before its + // transaction). + std::vector valid; + valid.reserve(records.size()); + for (auto & i : records) { + if (isValidRecord(i)) { + valid.push_back(&i); + } + } + if (valid.empty()) { + return 0; + } + size_t stored = 0; { // Batch all inserts into a single transaction: one BEGIN EXCLUSIVE / @@ -261,12 +284,10 @@ namespace MAT_NS_BEGIN { return 0; } #endif - for (auto & i : records) { - if (!isValidRecord(i)) { - continue; + for (auto * r : valid) { + if (insertRecordUnsafe(*r)) { + ++stored; } - insertRecordUnsafe(i); - ++stored; } } diff --git a/lib/offline/OfflineStorage_SQLite.hpp b/lib/offline/OfflineStorage_SQLite.hpp index 3b6e684ce..05be6acf8 100644 --- a/lib/offline/OfflineStorage_SQLite.hpp +++ b/lib/offline/OfflineStorage_SQLite.hpp @@ -127,7 +127,8 @@ namespace MAT_NS_BEGIN { bool isValidRecord(StorageRecord const& record); // Insert one already-validated record. Caller must hold m_lock and have an // active DbTransaction (when ENABLE_LOCKING). Updates m_DbSizeEstimate. - void insertRecordUnsafe(StorageRecord const& record); + // Returns false (without updating the size estimate) if the insert fails. + bool insertRecordUnsafe(StorageRecord const& record); // Run the DB-size-full notification and resize checks (after inserts). void checkStorageSizeLimits(); }; From 0435f1a240f9a686ddadd5c4b02e7d4bf9f7fc8d Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 23 Jun 2026 01:58:54 -0500 Subject: [PATCH 05/10] Address Copilot round-4: validate before the DB-open check in StoreRecords Move the validation pass ahead of the !m_db check so StoreRecords() reports invalid records consistently with the single StoreRecord() (which validates first) even when the database is not open. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/offline/OfflineStorage_SQLite.cpp | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/lib/offline/OfflineStorage_SQLite.cpp b/lib/offline/OfflineStorage_SQLite.cpp index 533542bca..cbe4d265b 100644 --- a/lib/offline/OfflineStorage_SQLite.cpp +++ b/lib/offline/OfflineStorage_SQLite.cpp @@ -247,18 +247,11 @@ namespace MAT_NS_BEGIN { return 0; } - if (!m_db) { - // Report once for the whole batch (the previous per-record loop - // reported once per record). - LOG_ERROR("Failed to store %zu events: Database is not open", records.size()); - m_observer->OnStorageOpenFailed("Database is not open"); - return 0; - } - - // Validate (and report rejects) before opening the transaction, so that - // no observer callback runs while the BEGIN EXCLUSIVE transaction is held - // (this matches the single StoreRecord(), which validates before its - // transaction). + // Validate (and report rejects) first -- before both the DB-open check and + // the transaction -- so reporting matches the single StoreRecord() (which + // validates before everything) regardless of whether the DB is open, and + // so that no observer callback runs while the BEGIN EXCLUSIVE transaction + // is held. std::vector valid; valid.reserve(records.size()); for (auto & i : records) { @@ -266,6 +259,13 @@ namespace MAT_NS_BEGIN { valid.push_back(&i); } } + + if (!m_db) { + LOG_ERROR("Failed to store %zu events: Database is not open", records.size()); + m_observer->OnStorageOpenFailed("Database is not open"); + return 0; + } + if (valid.empty()) { return 0; } From 08cd5629d4cafbd29e7fe1165bdc83395e07b491 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 23 Jun 2026 02:10:36 -0500 Subject: [PATCH 06/10] Address Copilot round-5: report disk write failures to the observer insertRecordUnsafe() returns false on a failed INSERT but did not notify the observer, so a write failure could be silently dropped. Both StoreRecord() and StoreRecords() now call m_observer->OnStorageFailed("Database write failed") when an insert fails -- after the transaction has closed, so the observer callback never runs while BEGIN EXCLUSIVE is held. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/offline/OfflineStorage_SQLite.cpp | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/lib/offline/OfflineStorage_SQLite.cpp b/lib/offline/OfflineStorage_SQLite.cpp index cbe4d265b..7e7ebf207 100644 --- a/lib/offline/OfflineStorage_SQLite.cpp +++ b/lib/offline/OfflineStorage_SQLite.cpp @@ -219,6 +219,7 @@ namespace MAT_NS_BEGIN { return false; } + bool stored = false; { #ifdef ENABLE_LOCKING LOCKGUARD(m_lock); @@ -230,9 +231,14 @@ namespace MAT_NS_BEGIN { return false; } #endif - if (!insertRecordUnsafe(record)) { - return false; - } + stored = insertRecordUnsafe(record); + } + + if (!stored) { + // Report the write failure after the transaction has closed, so the + // observer callback never runs while BEGIN EXCLUSIVE is held. + m_observer->OnStorageFailed("Database write failed"); + return false; } checkStorageSizeLimits(); @@ -271,6 +277,7 @@ namespace MAT_NS_BEGIN { } size_t stored = 0; + size_t failed = 0; { // Batch all inserts into a single transaction: one BEGIN EXCLUSIVE / // COMMIT (one fsync) for the whole flush instead of one per record. @@ -288,9 +295,17 @@ namespace MAT_NS_BEGIN { if (insertRecordUnsafe(*r)) { ++stored; } + else { + ++failed; + } } } + if (failed) { + // Report write failures once, after the transaction has closed. + m_observer->OnStorageFailed("Database write failed"); + } + // Run the size-full notification / resize check once after the batch // commit (instead of after every record): m_DbSizeEstimate already // reflects all inserts and the resize result is the same. From ac1d5ac21b82aa9f8a182e815c137f04a0aa6f97 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 23 Jun 2026 02:20:01 -0500 Subject: [PATCH 07/10] Address Copilot round-6: mark isValidRecord const isValidRecord does not modify object state; mark it const for const-correctness. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/offline/OfflineStorage_SQLite.cpp | 2 +- lib/offline/OfflineStorage_SQLite.hpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/offline/OfflineStorage_SQLite.cpp b/lib/offline/OfflineStorage_SQLite.cpp index 7e7ebf207..273e74217 100644 --- a/lib/offline/OfflineStorage_SQLite.cpp +++ b/lib/offline/OfflineStorage_SQLite.cpp @@ -147,7 +147,7 @@ namespace MAT_NS_BEGIN { m_db->execute(command.c_str()); } - bool OfflineStorage_SQLite::isValidRecord(StorageRecord const& record) + bool OfflineStorage_SQLite::isValidRecord(StorageRecord const& record) const { if (record.id.empty() || record.tenantToken.empty() || static_cast(record.latency) < 0 || record.timestamp <= 0) { LOG_ERROR("Failed to store event %s:%s: Invalid parameters", diff --git a/lib/offline/OfflineStorage_SQLite.hpp b/lib/offline/OfflineStorage_SQLite.hpp index 05be6acf8..1d32a4c77 100644 --- a/lib/offline/OfflineStorage_SQLite.hpp +++ b/lib/offline/OfflineStorage_SQLite.hpp @@ -124,7 +124,7 @@ namespace MAT_NS_BEGIN { size_t GetRecordCountUnsafe(EventLatency latency) const; // Validate a record's required fields; reports OnStorageFailed on rejection. - bool isValidRecord(StorageRecord const& record); + bool isValidRecord(StorageRecord const& record) const; // Insert one already-validated record. Caller must hold m_lock and have an // active DbTransaction (when ENABLE_LOCKING). Updates m_DbSizeEstimate. // Returns false (without updating the size estimate) if the insert fails. From a2408189b627510a6c1539cb6020d705be733d58 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 23 Jun 2026 02:31:12 -0500 Subject: [PATCH 08/10] Address Copilot round-7 comments - Always run checkStorageSizeLimits() even when an insert fails. My earlier early-return on failure skipped it, so a full DB (SQLITE_FULL) could no longer trigger ResizeDb() and storage could get stuck failing writes. StoreRecord() and StoreRecords() now run the size/resize check regardless of insert success. - Use const pointers for the batch validation list (records are not modified). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/offline/OfflineStorage_SQLite.cpp | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/lib/offline/OfflineStorage_SQLite.cpp b/lib/offline/OfflineStorage_SQLite.cpp index 273e74217..b4ba5bec4 100644 --- a/lib/offline/OfflineStorage_SQLite.cpp +++ b/lib/offline/OfflineStorage_SQLite.cpp @@ -238,12 +238,14 @@ namespace MAT_NS_BEGIN { // Report the write failure after the transaction has closed, so the // observer callback never runs while BEGIN EXCLUSIVE is held. m_observer->OnStorageFailed("Database write failed"); - return false; } + // Always run the size-limit check, even on failure: a full-DB error + // (e.g. SQLITE_FULL) must still be able to trigger ResizeDb() so storage + // can recover. checkStorageSizeLimits(); - return true; + return stored; } @@ -258,9 +260,9 @@ namespace MAT_NS_BEGIN { // validates before everything) regardless of whether the DB is open, and // so that no observer callback runs while the BEGIN EXCLUSIVE transaction // is held. - std::vector valid; + std::vector valid; valid.reserve(records.size()); - for (auto & i : records) { + for (auto const& i : records) { if (isValidRecord(i)) { valid.push_back(&i); } @@ -291,7 +293,7 @@ namespace MAT_NS_BEGIN { return 0; } #endif - for (auto * r : valid) { + for (auto const* r : valid) { if (insertRecordUnsafe(*r)) { ++stored; } @@ -306,12 +308,11 @@ namespace MAT_NS_BEGIN { m_observer->OnStorageFailed("Database write failed"); } - // Run the size-full notification / resize check once after the batch - // commit (instead of after every record): m_DbSizeEstimate already - // reflects all inserts and the resize result is the same. - if (stored) { - checkStorageSizeLimits(); - } + // Always run the size-full notification / resize check once after the + // batch (even if every insert failed): a full DB must still be able to + // trigger ResizeDb() to recover. m_DbSizeEstimate already reflects all + // successful inserts. + checkStorageSizeLimits(); return stored; } From 0171a712a5c199a0c4efd360e188e1a9d24bdd07 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 23 Jun 2026 02:42:45 -0500 Subject: [PATCH 09/10] Address Copilot round-8: ordering + accurate comments - In StoreRecords(), return early when no records are valid (already reported) before the DB-open check, matching StoreRecord() which returns after validation without checking DB-open. Log the valid count. - Stop overclaiming "recovery" in the size-check comments: checkStorageSizeLimits() runs after the batch to match the original per-record behavior; reliable full-DB recovery (refreshing the size estimate on failure) is a pre-existing concern out of scope for this batching change. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/offline/OfflineStorage_SQLite.cpp | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/offline/OfflineStorage_SQLite.cpp b/lib/offline/OfflineStorage_SQLite.cpp index b4ba5bec4..b36337dac 100644 --- a/lib/offline/OfflineStorage_SQLite.cpp +++ b/lib/offline/OfflineStorage_SQLite.cpp @@ -240,9 +240,8 @@ namespace MAT_NS_BEGIN { m_observer->OnStorageFailed("Database write failed"); } - // Always run the size-limit check, even on failure: a full-DB error - // (e.g. SQLITE_FULL) must still be able to trigger ResizeDb() so storage - // can recover. + // Run the size-limit check after the transaction, matching the original + // per-record path (which ran it on every StoreRecord call). checkStorageSizeLimits(); return stored; @@ -268,13 +267,16 @@ namespace MAT_NS_BEGIN { } } - if (!m_db) { - LOG_ERROR("Failed to store %zu events: Database is not open", records.size()); - m_observer->OnStorageOpenFailed("Database is not open"); + if (valid.empty()) { + // Every record was invalid (already reported above). Match the single + // StoreRecord(), which returns after validation without checking + // DB-open. return 0; } - if (valid.empty()) { + if (!m_db) { + LOG_ERROR("Failed to store %zu events: Database is not open", valid.size()); + m_observer->OnStorageOpenFailed("Database is not open"); return 0; } @@ -308,10 +310,8 @@ namespace MAT_NS_BEGIN { m_observer->OnStorageFailed("Database write failed"); } - // Always run the size-full notification / resize check once after the - // batch (even if every insert failed): a full DB must still be able to - // trigger ResizeDb() to recover. m_DbSizeEstimate already reflects all - // successful inserts. + // Run the size-full notification / resize check once after the batch, + // matching the original per-record path (which ran it on every insert). checkStorageSizeLimits(); return stored; From d1f86ec9f25f28114badd20d08fd005be90745cb Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 23 Jun 2026 09:54:48 -0500 Subject: [PATCH 10/10] Address Copilot round-9: log valid count on lock failure The transaction-lock failure message in StoreRecords() logged records.size(), but the batch has already been filtered to valid records; log valid.size(). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/offline/OfflineStorage_SQLite.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/offline/OfflineStorage_SQLite.cpp b/lib/offline/OfflineStorage_SQLite.cpp index b36337dac..02134e5c4 100644 --- a/lib/offline/OfflineStorage_SQLite.cpp +++ b/lib/offline/OfflineStorage_SQLite.cpp @@ -290,7 +290,7 @@ namespace MAT_NS_BEGIN { DbTransaction transaction(m_db.get()); if (!transaction.locked) { - LOG_ERROR("Failed to store %zu events: Database error", records.size()); + LOG_ERROR("Failed to store %zu events: Database error", valid.size()); m_observer->OnStorageFailed("Database error"); return 0; }