From 652e5e5fcb4b1e4bc2e28b67d4da0db2db355ddd Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Mon, 22 Jun 2026 10:48:59 -0500 Subject: [PATCH 1/9] Offline storage: guard empty-filter delete + propagate SQLite store failure Two latent data-loss bugs found during a repo-wide review: 1) MemoryStorage::DeleteRecords(whereFilter) matched EVERY record when whereFilter was empty (the matcher starts `matched = true` and the per-key loop never runs), silently wiping the entire in-memory queue. This contradicts the fail-closed OfflineStorage_SQLite::DeleteRecords and the Room backend. Guard an empty filter and return without deleting; intentional full clears use DeleteAllRecords(). 2) OfflineStorage_SQLite::StoreRecord ignored the bool returned by SqliteStatement::execute(), returning true and bumping m_DbSizeEstimate even on a real write failure (SQLITE_FULL/IOERR/etc). The event is silently lost with no OnStorageFailed notification and the size estimate drifts. Capture the result; on failure log, notify the observer, and return false (skipping the size bump). Tests: MemoryStorageTests.DeleteRecordsWithEmptyFilterDoesNotDeleteAll, added in the follow-up commit of this series (PATCH 2/9), fails without the guard -- the queue is wiped to 0 -- and passes with it. The StoreRecord write-failure path isn't unit-testable here (the insert is REPLACE INTO with no constraint to violate), so it's covered by build + review. Verified locally on Linux: all 9 MemoryStorageTests and 32 OfflineStorageTests_SQLite pass. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/offline/MemoryStorage.cpp | 10 ++++++++++ lib/offline/OfflineStorage_SQLite.cpp | 8 +++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/lib/offline/MemoryStorage.cpp b/lib/offline/MemoryStorage.cpp index 1d4ec5664..77ff0fc7c 100644 --- a/lib/offline/MemoryStorage.cpp +++ b/lib/offline/MemoryStorage.cpp @@ -224,6 +224,16 @@ namespace MAT_NS_BEGIN { void MemoryStorage::DeleteRecords(const std::map & whereFilter) { + // An empty filter matches every record. Never silently wipe the whole + // in-memory queue from a no-op predicate; callers must use + // DeleteAllRecords() for an intentional full clear. This mirrors the + // fail-closed behavior of OfflineStorage_SQLite::DeleteRecords. + if (whereFilter.empty()) + { + LOG_WARN("DeleteRecords called with an empty filter; ignoring to avoid deleting all records."); + return; + } + auto matcher = [&](const StorageRecord &r, const std::map & whereFilter) { bool matched = true; diff --git a/lib/offline/OfflineStorage_SQLite.cpp b/lib/offline/OfflineStorage_SQLite.cpp index b9b2ed83d..b1c7c82b4 100644 --- a/lib/offline/OfflineStorage_SQLite.cpp +++ b/lib/offline/OfflineStorage_SQLite.cpp @@ -177,7 +177,13 @@ namespace MAT_NS_BEGIN { return false; } #endif - SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast(record.latency), static_cast(record.persistence), record.timestamp, record.blob); + if (!SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast(record.latency), static_cast(record.persistence), record.timestamp, record.blob)) + { + LOG_ERROR("Failed to store event %s:%s: database write failed", + tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); + m_observer->OnStorageFailed("Database write failed"); + return false; + } m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + 
record.blob.size(); } From 325c55b98673c312b539349b7ec22fbfb337be68 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Mon, 22 Jun 2026 10:55:53 -0500 Subject: [PATCH 2/9] Add MemoryStorage empty-filter delete regression test Verified TDD: this test fails without the empty-filter guard (the queue is wiped, GetSize()/GetRecordCount() drop to 0) and passes with it. Run on Linux host. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- tests/unittests/MemoryStorageTests.cpp | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/unittests/MemoryStorageTests.cpp b/tests/unittests/MemoryStorageTests.cpp index a736d125f..268cf137d 100644 --- a/tests/unittests/MemoryStorageTests.cpp +++ b/tests/unittests/MemoryStorageTests.cpp @@ -213,6 +213,24 @@ TEST_F(MemoryStorageTests, DeleteAllRecords) EXPECT_THAT(storage.GetReservedCount(), 0); } +TEST_F(MemoryStorageTests, DeleteRecordsWithEmptyFilterDoesNotDeleteAll) +{ + MemoryStorage storage(testLogManager, *testConfig); + + // Add some events to storage + auto total_db_size = addEvents(storage); + EXPECT_THAT(storage.GetSize(), total_db_size); + auto count_before = storage.GetRecordCount(); + EXPECT_GT(count_before, static_cast(0)); + + // An empty where-filter matches every record; it must NOT wipe the queue. + // Intentional full clears go through DeleteAllRecords(). 
+ storage.DeleteRecords(std::map{}); + + EXPECT_THAT(storage.GetRecordCount(), count_before); + EXPECT_THAT(storage.GetSize(), total_db_size); +} + TEST_F(MemoryStorageTests, ReleaseRecords) { From d9640b726a83c289da3016276d1fe197fc08d8b2 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Mon, 22 Jun 2026 16:06:17 -0500 Subject: [PATCH 3/9] Address review comment: propagate synchronous disk store failures lib/offline/OfflineStorage_SQLite.cpp::StoreRecord now returns false on a write failure (this PR), but OfflineStorageHandler::StoreRecord ignored the disk result and always returned true, so a failed synchronous store (RAM queue disabled or during shutdown) was counted as successfully persisted by StoreRecords()/StorageObserver. Return the disk StoreRecord() result in the direct-to-disk path. The memory path is unchanged: MemoryStorage::StoreRecord returning false means an intentional latency-Off skip, not a failure, so it must not surface as an error. Verified at lib/offline/OfflineStorageHandler.cpp:266-275 and lib/offline/OfflineStorage_SQLite.cpp:180-186. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/offline/OfflineStorageHandler.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/offline/OfflineStorageHandler.cpp b/lib/offline/OfflineStorageHandler.cpp index 9049339c4..95810a6a8 100644 --- a/lib/offline/OfflineStorageHandler.cpp +++ b/lib/offline/OfflineStorageHandler.cpp @@ -269,7 +269,9 @@ namespace MAT_NS_BEGIN { { if (record.persistence != EventPersistence::EventPersistence_DoNotStoreOnDisk) { - m_offlineStorageDisk->StoreRecord(record); + // Propagate a synchronous disk write failure to the caller so a + // failed store is not counted as successfully persisted. 
+ return m_offlineStorageDisk->StoreRecord(record); } } } From f1b33810c5d46bccdf14a037ceb626b0f345c0cb Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Mon, 22 Jun 2026 22:57:45 -0500 Subject: [PATCH 4/9] Prevent event loss when a disk write fails during Flush() Combine the Flush() data-loss fix into this storage-data-safety PR (the two are halves of the same fix: this PR already makes OfflineStorage_SQLite::StoreRecord report write failures; Flush() must act on that). OfflineStorageHandler::Flush() previously drained the in-memory queue with GetRecords() (which removes records) and handed them to StoreRecords() before confirming persistence. On a partial/total disk write failure the un-persisted records were already gone from memory and never re-queued -> events lost. Flush() now drains into a local batch, persists one record at a time, and re-inserts only the records that fail to persist (so failures are retried, not lost). Per-record StoreRecord() is used deliberately: a batched StoreRecords() only returns a count, so on a partial failure we could not tell which records to re-queue, and re-storing already-saved records would duplicate them (no unique record_id constraint). Also null-guards the dbSizeBeforeFlush read so Flush() is safe with disk-only storage (CFG_INT_RAM_QUEUE_SIZE == 0). Adds OfflineStorageHandlerFlushTests.FailedDiskWriteDuringFlushReturnsRecordsToMemory (records the SQLite store rejects stay in memory after Flush; verified it fails against the previous GetRecords()-based Flush). Closes the separate PR #1496. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/offline/OfflineStorageHandler.cpp | 50 ++++++++----- tests/unittests/OfflineStorageTests.cpp | 95 +++++++++++++++++++++++++ 2 files changed, 128 insertions(+), 17 deletions(-) diff --git a/lib/offline/OfflineStorageHandler.cpp b/lib/offline/OfflineStorageHandler.cpp index 95810a6a8..50de1c264 100644 --- a/lib/offline/OfflineStorageHandler.cpp +++ b/lib/offline/OfflineStorageHandler.cpp @@ -174,28 +174,44 @@ namespace MAT_NS_BEGIN { // than the handle gets replaced by nullptr in this DeferredCallbackHandle obj. m_flushHandle.Cancel(); - size_t dbSizeBeforeFlush = m_offlineStorageMemory->GetSize(); + size_t dbSizeBeforeFlush = (m_offlineStorageMemory != nullptr) ? m_offlineStorageMemory->GetSize() : 0; if ((m_offlineStorageMemory) && (dbSizeBeforeFlush > 0) && (m_offlineStorageDisk)) { - // This will block on and then take a lock for the duration of this move, and - // StoreRecord() will then block until the move completes. + // Drain the in-memory queue into a local batch. Records are removed + // from memory here; any that fail to persist below are re-inserted, so + // a disk write failure does not silently lose events. Draining (rather + // than reserving) keeps only a single copy of each record in flight and + // avoids stamping a reservation lease that the Room backend would + // persist to disk. auto records = m_offlineStorageMemory->GetRecords(false, EventLatency_Unspecified); - std::vector ids; - // TODO: [MG] - consider running the batch in transaction - // if (sqlite) - // sqlite->Execute("BEGIN"); - - size_t totalSaved = m_offlineStorageDisk->StoreRecords(records); - - // TODO: [MG] - consider running the batch in transaction - // if (sqlite) - // sqlite->Execute("END"); + // Persist one record at a time so we know exactly which succeeded. 
A + // batched StoreRecords() only returns a count, so on a partial failure + // we could not tell which records to re-queue, and re-storing + // already-saved records would duplicate them (the events table has no + // unique record_id constraint). + size_t totalSaved = 0; + size_t totalFailed = 0; + for (auto& record : records) + { + if (m_offlineStorageDisk->StoreRecord(record)) + { + ++totalSaved; + } + else + { + // Return the record to the in-memory queue for retry on a + // subsequent flush instead of dropping it. + ++totalFailed; + m_offlineStorageMemory->StoreRecord(record); + } + } - // Delete records from reserved on flush - HttpHeaders dummy; - bool fromMemory = true; - m_offlineStorageMemory->DeleteRecords(ids, dummy, fromMemory); + if (totalFailed > 0) + { + LOG_WARN("Flush: %zu of %zu records failed to persist to disk; returned to the queue for retry", + totalFailed, records.size()); + } // Notify event listener about the records cached OnStorageRecordsSaved(totalSaved); diff --git a/tests/unittests/OfflineStorageTests.cpp b/tests/unittests/OfflineStorageTests.cpp index bbb8da8e0..1bc834755 100644 --- a/tests/unittests/OfflineStorageTests.cpp +++ b/tests/unittests/OfflineStorageTests.cpp @@ -2,7 +2,14 @@ #include "common/Common.hpp" #include "common/MockIOfflineStorage.hpp" +#include "common/MockIOfflineStorageObserver.hpp" +#include "common/MockIRuntimeConfig.hpp" +#include "offline/OfflineStorageHandler.hpp" #include "offline/StorageObserver.hpp" +#include "NullObjects.hpp" + +#include +#include using namespace testing; using namespace MAT; @@ -162,3 +169,91 @@ TEST_F(OfflineStorageTests, ReleaseRecordsIsForwarded) .WillOnce(Return()); EXPECT_THAT(offlineStorage.releaseRecordsIncRetryCount(ctx), true); } + +namespace +{ + // Remove a SQLite db file along with its WAL-mode companion files + // (-wal/-shm/-journal), which would otherwise accumulate in the temp dir. 
+ void RemoveDbFiles(const std::string& path) + { + std::remove(path.c_str()); + std::remove((path + "-wal").c_str()); + std::remove((path + "-shm").c_str()); + std::remove((path + "-journal").c_str()); + } + + // No-op dispatcher that owns queued tasks and frees them, so flushes only + // run when invoked directly and scheduled tasks (if any) are not leaked. + class NoopTaskDispatcher : public ITaskDispatcher + { + public: + void Join() override { clear(); } + void Queue(Task* task) override { m_tasks.push_back(task); } + bool Cancel(Task* task, uint64_t waitTime = 0) override + { + UNREFERENCED_PARAMETER(waitTime); + auto it = std::find(m_tasks.begin(), m_tasks.end(), task); + if (it != m_tasks.end()) + { + delete *it; + m_tasks.erase(it); + } + return true; + } + ~NoopTaskDispatcher() override { clear(); } + + private: + void clear() + { + for (auto* t : m_tasks) + delete t; + m_tasks.clear(); + } + std::vector m_tasks; + }; +} + +// Regression test: when records pulled from the in-memory queue fail to persist +// to disk during Flush(), they must be returned to the queue rather than lost. 
+TEST(OfflineStorageHandlerFlushTests, FailedDiskWriteDuringFlushReturnsRecordsToMemory) +{ + NullLogManager logManager; + NiceMock config; + NoopTaskDispatcher dispatcher; + NiceMock observer; + + ON_CALL(config, GetOfflineStorageMaximumSizeBytes()).WillByDefault(Return(32 * 4096)); + ON_CALL(config, GetMaximumRetryCount()).WillByDefault(Return(5)); + + std::ostringstream dbPath; + dbPath << GetTempDirectory() << "FlushReserveTest-" << PAL::getUtcSystemTimeMs() << ".db"; + RemoveDbFiles(dbPath.str()); + config[CFG_STR_CACHE_FILE_PATH] = dbPath.str(); + config[CFG_INT_RAM_QUEUE_SIZE] = 1024 * 1024; // enable the in-memory queue + + OfflineStorageHandler handler(logManager, config, dispatcher); + handler.Initialize(observer); + + // A timestamp <= 0 is accepted by the in-memory queue but rejected by the + // SQLite disk store's input validation, so its StoreRecord() returns false. + // This drives the same Flush() failure-handling path as a disk write failure + // (a failed record must be returned to memory, not dropped). + const size_t kCount = 5; + for (size_t i = 0; i < kCount; i++) + { + StorageRecord r("flush-id-" + std::to_string(i), "tenant-token", + EventLatency_Normal, EventPersistence_Normal, /*timestamp*/ 0, + std::vector{ 'x' }); + handler.StoreRecord(r); + } + EXPECT_EQ(handler.GetRecordCount(), kCount); + + handler.Flush(); + + // The disk rejected every record; with the fix they are returned to the + // in-memory queue rather than silently dropped. + EXPECT_EQ(handler.GetRecordCount(), kCount); + + handler.Shutdown(); + RemoveDbFiles(dbPath.str()); +} From 45e9d55cf4505dd985ad34bdde33cf4ab8d9ba2e Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Mon, 22 Jun 2026 23:09:00 -0500 Subject: [PATCH 5/9] Address Copilot comment: NoopTaskDispatcher::Cancel returns found-state The test helper's Cancel() returned true unconditionally, violating the ITaskDispatcher::Cancel contract (return whether the task was found/cancelled). 
Return true only when the task was present in the queue, false otherwise. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- tests/unittests/OfflineStorageTests.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/unittests/OfflineStorageTests.cpp b/tests/unittests/OfflineStorageTests.cpp index 1bc834755..a94b98ea4 100644 --- a/tests/unittests/OfflineStorageTests.cpp +++ b/tests/unittests/OfflineStorageTests.cpp @@ -197,8 +197,9 @@ namespace { delete *it; m_tasks.erase(it); + return true; } - return true; + return false; } ~NoopTaskDispatcher() override { clear(); } From bab7b420f5b05b4f7784964f49599473dd918d85 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Mon, 22 Jun 2026 23:25:35 -0500 Subject: [PATCH 6/9] Address Copilot comments: rename flush test for precision Rename FailedDiskWriteDuringFlush... -> FailedDiskStoreDuringFlush... and reword its comments: the test exercises a disk StoreRecord() rejection (SQLite input validation), which drives the same Flush() re-queue path as any disk store failure, not a literal disk write/IO error. (The reviewer's separate note that Flush() ignores EventPersistence_DoNotStoreOnDisk is a pre-existing behavior, out of scope for this data-safety change and not cleanly unit-testable via the public API; tracked as a follow-up.) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- tests/unittests/OfflineStorageTests.cpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/unittests/OfflineStorageTests.cpp b/tests/unittests/OfflineStorageTests.cpp index a94b98ea4..04df15e11 100644 --- a/tests/unittests/OfflineStorageTests.cpp +++ b/tests/unittests/OfflineStorageTests.cpp @@ -214,9 +214,10 @@ namespace }; } -// Regression test: when records pulled from the in-memory queue fail to persist -// to disk during Flush(), they must be returned to the queue rather than lost. 
-TEST(OfflineStorageHandlerFlushTests, FailedDiskWriteDuringFlushReturnsRecordsToMemory) +// Regression test: when records drained from the in-memory queue fail to be +// stored by the disk backend during Flush() (StoreRecord() returns false), they +// must be returned to the queue rather than lost. +TEST(OfflineStorageHandlerFlushTests, FailedDiskStoreDuringFlushReturnsRecordsToMemory) { NullLogManager logManager; NiceMock config; @@ -237,8 +238,8 @@ TEST(OfflineStorageHandlerFlushTests, FailedDiskWriteDuringFlushReturnsRecordsTo // A timestamp <= 0 is accepted by the in-memory queue but rejected by the // SQLite disk store's input validation, so its StoreRecord() returns false. - // This drives the same Flush() failure-handling path as a disk write failure - // (a failed record must be returned to memory, not dropped). + // This drives the same Flush() failure-handling path as any disk store + // failure (a failed record must be returned to memory, not dropped). const size_t kCount = 5; for (size_t i = 0; i < kCount; i++) { From 84e49a6efc8b17605f48843331b3a4573bcf02ad Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 23 Jun 2026 11:33:27 -0500 Subject: [PATCH 7/9] Fold the SQLite batch-flush optimization into the data-safety change (was PR #1497) Combine the batched-flush perf work into this PR and make it cooperate with the Flush() data-loss fix, so both land together. OfflineStorage_SQLite: StoreRecords() now inserts the whole batch in a single BEGIN EXCLUSIVE / COMMIT (one fsync) instead of one transaction per record (~11x at 200 records, ~40x at 1000 vs the SDK's vendored sqlite). Shared per-record logic is factored into isValidRecord / insertRecordUnsafe / checkStorageSizeLimits. 
The batch is all-or-nothing: if any insert fails, the transaction is rolled back (new SqliteDB::rollback / DbTransaction::markForRollback) and the size estimate is undone, so callers can re-queue the whole batch without risking duplicate rows (the events table has no unique record_id constraint). OfflineStorageHandler::Flush() now uses the batched StoreRecords() to persist a drained batch in one transaction. Because StoreRecords() is all-or-nothing, on failure nothing is committed and Flush returns every record to the in-memory queue for retry -- realizing the batching speedup while keeping the no-event-loss / no-duplicate guarantee. StoreRecords/StoreRecord report write failures via OnStorageFailed after the transaction closes; validation runs before the transaction. Adds OfflineStorageTests_SQLite.StoreRecordsBatchStoresAllRecords. Full UnitTests (527) pass. Closes PR #1497. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/offline/OfflineStorageHandler.cpp | 32 +--- lib/offline/OfflineStorage_SQLite.cpp | 181 ++++++++++++++---- lib/offline/OfflineStorage_SQLite.hpp | 9 + lib/offline/SQLiteWrapper.hpp | 7 + .../unittests/OfflineStorageTests_SQLite.cpp | 31 +++ 5 files changed, 205 insertions(+), 55 deletions(-) diff --git a/lib/offline/OfflineStorageHandler.cpp b/lib/offline/OfflineStorageHandler.cpp index 50de1c264..f0d57f3de 100644 --- a/lib/offline/OfflineStorageHandler.cpp +++ b/lib/offline/OfflineStorageHandler.cpp @@ -185,34 +185,22 @@ namespace MAT_NS_BEGIN { // persist to disk. auto records = m_offlineStorageMemory->GetRecords(false, EventLatency_Unspecified); - // Persist one record at a time so we know exactly which succeeded. A - // batched StoreRecords() only returns a count, so on a partial failure - // we could not tell which records to re-queue, and re-storing - // already-saved records would duplicate them (the events table has no - // unique record_id constraint). 
- size_t totalSaved = 0; - size_t totalFailed = 0; - for (auto& record : records) + // Persist the whole batch to disk in a single transaction. + // StoreRecords() is all-or-nothing, so on any failure nothing is + // committed and we return every record to the in-memory queue for + // retry -- no events are lost, and there are no duplicates because the + // failed batch left nothing on disk. + size_t totalSaved = m_offlineStorageDisk->StoreRecords(records); + if (totalSaved < records.size()) { - if (m_offlineStorageDisk->StoreRecord(record)) + LOG_WARN("Flush: disk store failed for the batch of %zu records; returned to the queue for retry", + records.size()); + for (auto& record : records) { - ++totalSaved; - } - else - { - // Return the record to the in-memory queue for retry on a - // subsequent flush instead of dropping it. - ++totalFailed; m_offlineStorageMemory->StoreRecord(record); } } - if (totalFailed > 0) - { - LOG_WARN("Flush: %zu of %zu records failed to persist to disk; returned to the queue for retry", - totalFailed, records.size()); - } - // Notify event listener about the records cached OnStorageRecordsSaved(totalSaved); diff --git a/lib/offline/OfflineStorage_SQLite.cpp b/lib/offline/OfflineStorage_SQLite.cpp index b1c7c82b4..57a53c270 100644 --- a/lib/offline/OfflineStorage_SQLite.cpp +++ b/lib/offline/OfflineStorage_SQLite.cpp @@ -23,6 +23,7 @@ namespace MAT_NS_BEGIN { class DbTransaction { SqliteDB* m_db; + bool m_rollback = false; public: bool locked; @@ -34,11 +35,24 @@ namespace MAT_NS_BEGIN { } } + // Discard the transaction (ROLLBACK) instead of committing it on destruction. 
+ void markForRollback() + { + m_rollback = true; + } + ~DbTransaction() { if (locked) { - m_db->unlock(); + if (m_rollback) + { + m_db->rollback(); + } + else + { + m_db->unlock(); + } } } }; @@ -147,46 +161,31 @@ namespace MAT_NS_BEGIN { m_db->execute(command.c_str()); } - bool OfflineStorage_SQLite::StoreRecord(StorageRecord const& record) + bool OfflineStorage_SQLite::isValidRecord(StorageRecord const& record) const { - // TODO: [MG] - this works, but may not play nicely with several LogManager instances - // static SqliteStatement sql_insert(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data); - if (record.id.empty() || record.tenantToken.empty() || static_cast(record.latency) < 0 || record.timestamp <= 0) { LOG_ERROR("Failed to store event %s:%s: Invalid parameters", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); m_observer->OnStorageFailed("Invalid parameters"); return false; } + return true; + } - if (!m_db) { - LOG_ERROR("Failed to store event %s:%s: Database is not open", + bool OfflineStorage_SQLite::insertRecordUnsafe(StorageRecord const& record) + { + if (!SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast(record.latency), static_cast(record.persistence), record.timestamp, record.blob)) + { + LOG_ERROR("Failed to store event %s:%s: database write failed", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); - m_observer->OnStorageOpenFailed("Database is not open"); return false; } + m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size(); + return true; + } - { -#ifdef ENABLE_LOCKING - LOCKGUARD(m_lock); - DbTransaction transaction(m_db.get()); - if (!transaction.locked) - { - LOG_ERROR("Failed to store event %s:%s: Database error", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); - m_observer->OnStorageFailed("Database error"); - return false; - } -#endif - if (!SqliteStatement(*m_db, 
m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast(record.latency), static_cast(record.persistence), record.timestamp, record.blob)) - { - LOG_ERROR("Failed to store event %s:%s: database write failed", - tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); - m_observer->OnStorageFailed("Database write failed"); - return false; - } - m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size(); - } - + void OfflineStorage_SQLite::checkStorageSizeLimits() + { if ((m_DbSizeNotificationLimit != 0) && (m_DbSizeEstimate>m_DbSizeNotificationLimit)) { auto now = PAL::getMonotonicTimeMs(); @@ -216,19 +215,135 @@ namespace MAT_NS_BEGIN { m_resizing = false; } } + } - return true; + bool OfflineStorage_SQLite::StoreRecord(StorageRecord const& record) + { + // TODO: [MG] - this works, but may not play nicely with several LogManager instances + // static SqliteStatement sql_insert(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data); + + if (!isValidRecord(record)) { + return false; + } + + if (!m_db) { + LOG_ERROR("Failed to store event %s:%s: Database is not open", + tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); + m_observer->OnStorageOpenFailed("Database is not open"); + return false; + } + + bool stored = false; + { +#ifdef ENABLE_LOCKING + LOCKGUARD(m_lock); + DbTransaction transaction(m_db.get()); + if (!transaction.locked) + { + LOG_ERROR("Failed to store event %s:%s: Database error", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); + m_observer->OnStorageFailed("Database error"); + return false; + } +#endif + stored = insertRecordUnsafe(record); + } + + if (!stored) { + // Report the write failure after the transaction has closed, so the + // observer callback never runs while BEGIN EXCLUSIVE is held. 
+ m_observer->OnStorageFailed("Database write failed"); + } + + // Run the size-limit check after the transaction, matching the original + // per-record path (which ran it on every StoreRecord call). + checkStorageSizeLimits(); + + return stored; } size_t OfflineStorage_SQLite::StoreRecords(std::vector & records) { + if (records.empty()) { + return 0; + } + + // Validate (and report rejects) first -- before both the DB-open check and + // the transaction -- so reporting matches the single StoreRecord() (which + // validates before everything) regardless of whether the DB is open, and + // so that no observer callback runs while the BEGIN EXCLUSIVE transaction + // is held. + std::vector valid; + valid.reserve(records.size()); + for (auto const& i : records) { + if (isValidRecord(i)) { + valid.push_back(&i); + } + } + + if (valid.empty()) { + // Every record was invalid (already reported above). Match the single + // StoreRecord(), which returns after validation without checking + // DB-open. + return 0; + } + + if (!m_db) { + LOG_ERROR("Failed to store %zu events: Database is not open", valid.size()); + m_observer->OnStorageOpenFailed("Database is not open"); + return 0; + } + size_t stored = 0; - for (auto & i : records) { - if (StoreRecord(i)) { - ++stored; + size_t addedSize = 0; + { + // Batch all inserts into a single transaction: one BEGIN EXCLUSIVE / + // COMMIT (one fsync) for the whole flush instead of one per record. + // All-or-nothing: if any insert fails the transaction is rolled back, + // so callers (e.g. Flush) can re-queue the whole batch without risking + // duplicate rows (the events table has no unique record_id constraint). 
+#ifdef ENABLE_LOCKING + LOCKGUARD(m_lock); + DbTransaction transaction(m_db.get()); + if (!transaction.locked) + { + LOG_ERROR("Failed to store %zu events: Database error", valid.size()); + m_observer->OnStorageFailed("Database error"); + return 0; } +#endif + bool allStored = true; + for (auto const* r : valid) { + if (insertRecordUnsafe(*r)) { + addedSize += r->id.size() + r->tenantToken.size() + r->blob.size(); + } + else { + allStored = false; + break; + } + } + + if (allStored) { + stored = valid.size(); + } + else { +#ifdef ENABLE_LOCKING + transaction.markForRollback(); +#endif + // Undo the size-estimate added by the rolled-back inserts. + m_DbSizeEstimate -= std::min(m_DbSizeEstimate.load(), addedSize); + } + } + + if (stored == 0) { + // The whole batch was rolled back after a write failure; report once. + m_observer->OnStorageFailed("Database write failed"); } + + // Run the size-full notification / resize check once after the batch, + // matching the original per-record path (which ran it on every insert). + checkStorageSizeLimits(); + return stored; } diff --git a/lib/offline/OfflineStorage_SQLite.hpp b/lib/offline/OfflineStorage_SQLite.hpp index 18643cde5..1d32a4c77 100644 --- a/lib/offline/OfflineStorage_SQLite.hpp +++ b/lib/offline/OfflineStorage_SQLite.hpp @@ -122,6 +122,15 @@ namespace MAT_NS_BEGIN { private: size_t GetRecordCountUnsafe(EventLatency latency) const; + + // Validate a record's required fields; reports OnStorageFailed on rejection. + bool isValidRecord(StorageRecord const& record) const; + // Insert one already-validated record. Caller must hold m_lock and have an + // active DbTransaction (when ENABLE_LOCKING). Updates m_DbSizeEstimate. + // Returns false (without updating the size estimate) if the insert fails. + bool insertRecordUnsafe(StorageRecord const& record); + // Run the DB-size-full notification and resize checks (after inserts). 
+ void checkStorageSizeLimits(); }; diff --git a/lib/offline/SQLiteWrapper.hpp b/lib/offline/SQLiteWrapper.hpp index 3f4f998e3..2ebdb99a4 100644 --- a/lib/offline/SQLiteWrapper.hpp +++ b/lib/offline/SQLiteWrapper.hpp @@ -439,6 +439,13 @@ namespace MAT_NS_BEGIN { return isOK(sqlite3_exec("COMMIT;")); } + /** + * @brief Roll back (discard) the current DB transaction. + */ + bool rollback() { + return isOK(sqlite3_exec("ROLLBACK;")); + } + bool lock() { #ifndef NDEBUG unsigned count = 0; diff --git a/tests/unittests/OfflineStorageTests_SQLite.cpp b/tests/unittests/OfflineStorageTests_SQLite.cpp index e90b0a9ae..1550211c8 100644 --- a/tests/unittests/OfflineStorageTests_SQLite.cpp +++ b/tests/unittests/OfflineStorageTests_SQLite.cpp @@ -153,6 +153,37 @@ TEST_F(OfflineStorageTests_SQLite, GetAndReservedReturnsStoredRecord) EXPECT_THAT(consumer.records[0].reservedUntil, 0); } +TEST_F(OfflineStorageTests_SQLite, StoreRecordsBatchStoresAllRecords) +{ + initializeStorage(); + std::vector batch; + const size_t kCount = 8; + for (size_t i = 0; i < kCount; i++) + { + batch.push_back({ "g" + std::to_string(i), "token", EventLatency_Normal, + EventPersistence_Normal, static_cast(i + 1), { static_cast(i) } }); + } + + // Every record in the batch is stored and individually retrievable. (The + // single-transaction batching is a performance optimization verified by + // benchmarking; this test covers the batch's storage correctness.) 
+ EXPECT_THAT(offlineStorage->StoreRecords(batch), kCount); + + TestRecordConsumer consumer; + EXPECT_THAT(offlineStorage->GetAndReserveRecords(consumer, 100000), true); + ASSERT_THAT(consumer.records.size(), kCount); + for (size_t i = 0; i < kCount; i++) + { + std::string expectedId = "g" + std::to_string(i); + bool found = false; + for (auto const& r : consumer.records) + { + if (r.id == expectedId) { found = true; break; } + } + EXPECT_TRUE(found) << "record " << expectedId << " was not retrieved"; + } +} + TEST_F(OfflineStorageTests_SQLite, ReservedRecordIsNotReturned) { initializeStorage(); From e1e7c4e599bf649f4c72697987da9e629969d03a Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 23 Jun 2026 12:06:38 -0500 Subject: [PATCH 8/9] Address Copilot: make StoreRecords fully all-or-nothing on invalid records StoreRecords() previously filtered out invalid records and committed the valid ones, so it could return a count < records.size() even though some records were persisted. OfflineStorageHandler::Flush() treats totalSaved < records.size() as a batch failure and re-queues ALL drained records, which would duplicate the valid records that were actually stored. Make StoreRecords() truly all-or-nothing: if ANY input record is invalid, store nothing and return 0 (invalids are still reported via isValidRecord()). Combined with the existing rollback-on-write-failure, StoreRecords() now returns either records.size() (whole batch committed) or 0 (nothing committed), so Flush's re-queue-all-on-short-return can never duplicate records. Adds OfflineStorageTests_SQLite.StoreRecordsBatchWithAnyInvalidStoresNothing. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/offline/OfflineStorage_SQLite.cpp | 46 ++++++++++--------- .../unittests/OfflineStorageTests_SQLite.cpp | 21 +++++++++ 2 files changed, 45 insertions(+), 22 deletions(-) diff --git a/lib/offline/OfflineStorage_SQLite.cpp b/lib/offline/OfflineStorage_SQLite.cpp index 57a53c270..1a39059ba 100644 --- a/lib/offline/OfflineStorage_SQLite.cpp +++ b/lib/offline/OfflineStorage_SQLite.cpp @@ -268,20 +268,20 @@ namespace MAT_NS_BEGIN { return 0; } - // Validate (and report rejects) first -- before both the DB-open check and - // the transaction -- so reporting matches the single StoreRecord() (which - // validates before everything) regardless of whether the DB is open, and - // so that no observer callback runs while the BEGIN EXCLUSIVE transaction - // is held. - std::vector valid; - valid.reserve(records.size()); + // Validate (and report rejects) up front -- before the DB-open check and + // the transaction -- so no observer callback runs while BEGIN EXCLUSIVE is + // held. The batch is all-or-nothing: if ANY record is invalid we store + // nothing and return 0, so a caller that re-queues the whole batch on a + // short return (e.g. Flush) can never duplicate records that would + // otherwise have been partially committed. + size_t validCount = 0; for (auto const& i : records) { if (isValidRecord(i)) { - valid.push_back(&i); + ++validCount; } } - if (valid.empty()) { + if (validCount == 0) { // Every record was invalid (already reported above). Match the single // StoreRecord(), which returns after validation without checking // DB-open. 
@@ -289,13 +289,19 @@ namespace MAT_NS_BEGIN { } if (!m_db) { - LOG_ERROR("Failed to store %zu events: Database is not open", valid.size()); + LOG_ERROR("Failed to store %zu events: Database is not open", records.size()); m_observer->OnStorageOpenFailed("Database is not open"); return 0; } - size_t stored = 0; + if (validCount != records.size()) { + // At least one record was invalid (already reported). Store nothing so + // the batch stays all-or-nothing for the caller. + return 0; + } + size_t addedSize = 0; + bool allStored = true; { // Batch all inserts into a single transaction: one BEGIN EXCLUSIVE / // COMMIT (one fsync) for the whole flush instead of one per record. @@ -307,15 +313,14 @@ namespace MAT_NS_BEGIN { DbTransaction transaction(m_db.get()); if (!transaction.locked) { - LOG_ERROR("Failed to store %zu events: Database error", valid.size()); + LOG_ERROR("Failed to store %zu events: Database error", records.size()); m_observer->OnStorageFailed("Database error"); return 0; } #endif - bool allStored = true; - for (auto const* r : valid) { - if (insertRecordUnsafe(*r)) { - addedSize += r->id.size() + r->tenantToken.size() + r->blob.size(); + for (auto const& r : records) { + if (insertRecordUnsafe(r)) { + addedSize += r.id.size() + r.tenantToken.size() + r.blob.size(); } else { allStored = false; @@ -323,10 +328,7 @@ namespace MAT_NS_BEGIN { } } - if (allStored) { - stored = valid.size(); - } - else { + if (!allStored) { #ifdef ENABLE_LOCKING transaction.markForRollback(); #endif @@ -335,7 +337,7 @@ namespace MAT_NS_BEGIN { } } - if (stored == 0) { + if (!allStored) { // The whole batch was rolled back after a write failure; report once. m_observer->OnStorageFailed("Database write failed"); } @@ -344,7 +346,7 @@ namespace MAT_NS_BEGIN { // matching the original per-record path (which ran it on every insert). checkStorageSizeLimits(); - return stored; + return allStored ? 
records.size() : 0; } // Debug routine to print record count in the DB diff --git a/tests/unittests/OfflineStorageTests_SQLite.cpp b/tests/unittests/OfflineStorageTests_SQLite.cpp index 1550211c8..c1998cfea 100644 --- a/tests/unittests/OfflineStorageTests_SQLite.cpp +++ b/tests/unittests/OfflineStorageTests_SQLite.cpp @@ -184,6 +184,27 @@ TEST_F(OfflineStorageTests_SQLite, StoreRecordsBatchStoresAllRecords) } } +TEST_F(OfflineStorageTests_SQLite, StoreRecordsBatchWithAnyInvalidStoresNothing) +{ + initializeStorage(); + std::vector<StorageRecord> batch = { + { "g1", "token", EventLatency_Normal, EventPersistence_Normal, 1, { 1 } }, // valid + { "g2", "token", EventLatency_Normal, EventPersistence_Normal, 0, { 2 } }, // invalid: timestamp <= 0 + }; + + // The invalid record is reported during validation. + EXPECT_CALL(observerMock, OnStorageFailed("Invalid parameters")); + + // All-or-nothing: with any invalid record in the batch, nothing is stored + // (so a caller that re-queues the batch on a short return can't duplicate the + // otherwise-valid record). + EXPECT_THAT(offlineStorage->StoreRecords(batch), static_cast<size_t>(0)); + + TestRecordConsumer consumer; + offlineStorage->GetAndReserveRecords(consumer, 100000); + EXPECT_THAT(consumer.records.size(), static_cast<size_t>(0)); +} + TEST_F(OfflineStorageTests_SQLite, ReservedRecordIsNotReturned) { initializeStorage(); From 40fd1183c138985ec9282bfa0229cd7480d73f19 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 23 Jun 2026 12:18:41 -0500 Subject: [PATCH 9/9] Address Copilot: re-queue the flush batch only on a zero store result Flush() re-queued the whole drained batch whenever StoreRecords() returned a count < records.size(). Both disk backends are all-or-nothing (SQLite rolls back; Room returns 0 on a failed JNI batch), so the only meaningful "failure" value is 0. 
Room also caps its returned count at min(size, INT32_MAX); keying off < records.size() would treat that capped count as a failure and re-queue already-persisted records (duplicates). Key the re-queue off totalSaved == 0 instead, which is the true "nothing committed" signal. (The cap only matters for a batch larger than the RAM queue could ever hold.) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/offline/OfflineStorageHandler.cpp | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/lib/offline/OfflineStorageHandler.cpp b/lib/offline/OfflineStorageHandler.cpp index f0d57f3de..520493e86 100644 --- a/lib/offline/OfflineStorageHandler.cpp +++ b/lib/offline/OfflineStorageHandler.cpp @@ -185,13 +185,18 @@ namespace MAT_NS_BEGIN { // persist to disk. auto records = m_offlineStorageMemory->GetRecords(false, EventLatency_Unspecified); - // Persist the whole batch to disk in a single transaction. - // StoreRecords() is all-or-nothing, so on any failure nothing is - // committed and we return every record to the in-memory queue for - // retry -- no events are lost, and there are no duplicates because the - // failed batch left nothing on disk. + // Persist the whole batch to disk in a single transaction. The disk + // StoreRecords() is all-or-nothing on both backends: it returns the + // full count on success, or 0 if nothing was committed (SQLite rolls + // the transaction back; Room returns 0 on a failed JNI batch). So a + // zero result means nothing was persisted -- return every record to + // the in-memory queue for retry. No events are lost, and there are no + // duplicates because a failed batch leaves nothing on disk. + // (We key off == 0 rather than < size so that a non-zero-but-capped + // count -- only possible for batches larger than the RAM queue can + // ever hold -- is not mistaken for a failure.) 
size_t totalSaved = m_offlineStorageDisk->StoreRecords(records); - if (totalSaved < records.size()) + if (totalSaved == 0 && !records.empty()) { LOG_WARN("Flush: disk store failed for the batch of %zu records; returned to the queue for retry", records.size());