diff --git a/lib/offline/MemoryStorage.cpp b/lib/offline/MemoryStorage.cpp index 1d4ec5664..77ff0fc7c 100644 --- a/lib/offline/MemoryStorage.cpp +++ b/lib/offline/MemoryStorage.cpp @@ -224,6 +224,16 @@ namespace MAT_NS_BEGIN { void MemoryStorage::DeleteRecords(const std::map & whereFilter) { + // An empty filter matches every record. Never silently wipe the whole + // in-memory queue from a no-op predicate; callers must use + // DeleteAllRecords() for an intentional full clear. This mirrors the + // fail-closed behavior of OfflineStorage_SQLite::DeleteRecords. + if (whereFilter.empty()) + { + LOG_WARN("DeleteRecords called with an empty filter; ignoring to avoid deleting all records."); + return; + } + auto matcher = [&](const StorageRecord &r, const std::map & whereFilter) { bool matched = true; diff --git a/lib/offline/OfflineStorageHandler.cpp b/lib/offline/OfflineStorageHandler.cpp index 9049339c4..520493e86 100644 --- a/lib/offline/OfflineStorageHandler.cpp +++ b/lib/offline/OfflineStorageHandler.cpp @@ -174,28 +174,37 @@ namespace MAT_NS_BEGIN { // than the handle gets replaced by nullptr in this DeferredCallbackHandle obj. m_flushHandle.Cancel(); - size_t dbSizeBeforeFlush = m_offlineStorageMemory->GetSize(); + size_t dbSizeBeforeFlush = (m_offlineStorageMemory != nullptr) ? m_offlineStorageMemory->GetSize() : 0; if ((m_offlineStorageMemory) && (dbSizeBeforeFlush > 0) && (m_offlineStorageDisk)) { - // This will block on and then take a lock for the duration of this move, and - // StoreRecord() will then block until the move completes. + // Drain the in-memory queue into a local batch. Records are removed + // from memory here; any that fail to persist below are re-inserted, so + // a disk write failure does not silently lose events. Draining (rather + // than reserving) keeps only a single copy of each record in flight and + // avoids stamping a reservation lease that the Room backend would + // persist to disk. 
auto records = m_offlineStorageMemory->GetRecords(false, EventLatency_Unspecified); - std::vector ids; - - // TODO: [MG] - consider running the batch in transaction - // if (sqlite) - // sqlite->Execute("BEGIN"); + // Persist the whole batch to disk in a single transaction. The disk + // StoreRecords() is all-or-nothing on both backends: it returns the + // full count on success, or 0 if nothing was committed (SQLite rolls + // the transaction back; Room returns 0 on a failed JNI batch). So a + // zero result means nothing was persisted -- return every record to + // the in-memory queue for retry. No events are lost, and there are no + // duplicates because a failed batch leaves nothing on disk. + // (We key off == 0 rather than < size so that a non-zero-but-capped + // count -- only possible for batches larger than the RAM queue can + // ever hold -- is not mistaken for a failure.) size_t totalSaved = m_offlineStorageDisk->StoreRecords(records); - - // TODO: [MG] - consider running the batch in transaction - // if (sqlite) - // sqlite->Execute("END"); - - // Delete records from reserved on flush - HttpHeaders dummy; - bool fromMemory = true; - m_offlineStorageMemory->DeleteRecords(ids, dummy, fromMemory); + if (totalSaved == 0 && !records.empty()) + { + LOG_WARN("Flush: disk store failed for the batch of %zu records; returned to the queue for retry", + records.size()); + for (auto& record : records) + { + m_offlineStorageMemory->StoreRecord(record); + } + } // Notify event listener about the records cached OnStorageRecordsSaved(totalSaved); @@ -269,7 +278,9 @@ namespace MAT_NS_BEGIN { { if (record.persistence != EventPersistence::EventPersistence_DoNotStoreOnDisk) { - m_offlineStorageDisk->StoreRecord(record); + // Propagate a synchronous disk write failure to the caller so a + // failed store is not counted as successfully persisted. 
+ return m_offlineStorageDisk->StoreRecord(record); } } } diff --git a/lib/offline/OfflineStorage_SQLite.cpp b/lib/offline/OfflineStorage_SQLite.cpp index b9b2ed83d..1a39059ba 100644 --- a/lib/offline/OfflineStorage_SQLite.cpp +++ b/lib/offline/OfflineStorage_SQLite.cpp @@ -23,6 +23,7 @@ namespace MAT_NS_BEGIN { class DbTransaction { SqliteDB* m_db; + bool m_rollback = false; public: bool locked; @@ -34,11 +35,24 @@ namespace MAT_NS_BEGIN { } } + // Discard the transaction (ROLLBACK) instead of committing it on destruction. + void markForRollback() + { + m_rollback = true; + } + ~DbTransaction() { if (locked) { - m_db->unlock(); + if (m_rollback) + { + m_db->rollback(); + } + else + { + m_db->unlock(); + } } } }; @@ -147,40 +161,31 @@ namespace MAT_NS_BEGIN { m_db->execute(command.c_str()); } - bool OfflineStorage_SQLite::StoreRecord(StorageRecord const& record) + bool OfflineStorage_SQLite::isValidRecord(StorageRecord const& record) const { - // TODO: [MG] - this works, but may not play nicely with several LogManager instances - // static SqliteStatement sql_insert(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data); - if (record.id.empty() || record.tenantToken.empty() || static_cast(record.latency) < 0 || record.timestamp <= 0) { LOG_ERROR("Failed to store event %s:%s: Invalid parameters", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); m_observer->OnStorageFailed("Invalid parameters"); return false; } + return true; + } - if (!m_db) { - LOG_ERROR("Failed to store event %s:%s: Database is not open", + bool OfflineStorage_SQLite::insertRecordUnsafe(StorageRecord const& record) + { + if (!SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast(record.latency), static_cast(record.persistence), record.timestamp, record.blob)) + { + LOG_ERROR("Failed to store event %s:%s: database write failed", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); - 
m_observer->OnStorageOpenFailed("Database is not open"); return false; } + m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size(); + return true; + } - { -#ifdef ENABLE_LOCKING - LOCKGUARD(m_lock); - DbTransaction transaction(m_db.get()); - if (!transaction.locked) - { - LOG_ERROR("Failed to store event %s:%s: Database error", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); - m_observer->OnStorageFailed("Database error"); - return false; - } -#endif - SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast(record.latency), static_cast(record.persistence), record.timestamp, record.blob); - m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size(); - } - + void OfflineStorage_SQLite::checkStorageSizeLimits() + { if ((m_DbSizeNotificationLimit != 0) && (m_DbSizeEstimate>m_DbSizeNotificationLimit)) { auto now = PAL::getMonotonicTimeMs(); @@ -210,20 +215,138 @@ namespace MAT_NS_BEGIN { m_resizing = false; } } + } - return true; + bool OfflineStorage_SQLite::StoreRecord(StorageRecord const& record) + { + // TODO: [MG] - this works, but may not play nicely with several LogManager instances + // static SqliteStatement sql_insert(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data); + + if (!isValidRecord(record)) { + return false; + } + + if (!m_db) { + LOG_ERROR("Failed to store event %s:%s: Database is not open", + tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); + m_observer->OnStorageOpenFailed("Database is not open"); + return false; + } + + bool stored = false; + { +#ifdef ENABLE_LOCKING + LOCKGUARD(m_lock); + DbTransaction transaction(m_db.get()); + if (!transaction.locked) + { + LOG_ERROR("Failed to store event %s:%s: Database error", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); + m_observer->OnStorageFailed("Database error"); + return false; + } +#endif + stored = insertRecordUnsafe(record); + } + + 
if (!stored) { + // Report the write failure after the transaction has closed, so the + // observer callback never runs while BEGIN EXCLUSIVE is held. + m_observer->OnStorageFailed("Database write failed"); + } + + // Run the size-limit check after the transaction, matching the original + // per-record path (which ran it on every StoreRecord call). + checkStorageSizeLimits(); + + return stored; } size_t OfflineStorage_SQLite::StoreRecords(std::vector & records) { - size_t stored = 0; - for (auto & i : records) { - if (StoreRecord(i)) { - ++stored; + if (records.empty()) { + return 0; + } + + // Validate (and report rejects) up front -- before the DB-open check and + // the transaction -- so no observer callback runs while BEGIN EXCLUSIVE is + // held. The batch is all-or-nothing: if ANY record is invalid we store + // nothing and return 0, so a caller that re-queues the whole batch on a + // short return (e.g. Flush) can never duplicate records that would + // otherwise have been partially committed. + size_t validCount = 0; + for (auto const& i : records) { + if (isValidRecord(i)) { + ++validCount; } } - return stored; + + if (validCount == 0) { + // Every record was invalid (already reported above). Match the single + // StoreRecord(), which returns after validation without checking + // DB-open. + return 0; + } + + if (!m_db) { + LOG_ERROR("Failed to store %zu events: Database is not open", records.size()); + m_observer->OnStorageOpenFailed("Database is not open"); + return 0; + } + + if (validCount != records.size()) { + // At least one record was invalid (already reported). Store nothing so + // the batch stays all-or-nothing for the caller. + return 0; + } + + size_t addedSize = 0; + bool allStored = true; + { + // Batch all inserts into a single transaction: one BEGIN EXCLUSIVE / + // COMMIT (one fsync) for the whole flush instead of one per record. + // All-or-nothing: if any insert fails the transaction is rolled back, + // so callers (e.g. 
Flush) can re-queue the whole batch without risking + // duplicate rows (the events table has no unique record_id constraint). +#ifdef ENABLE_LOCKING + LOCKGUARD(m_lock); + DbTransaction transaction(m_db.get()); + if (!transaction.locked) + { + LOG_ERROR("Failed to store %zu events: Database error", records.size()); + m_observer->OnStorageFailed("Database error"); + return 0; + } +#endif + for (auto const& r : records) { + if (insertRecordUnsafe(r)) { + addedSize += r.id.size() + r.tenantToken.size() + r.blob.size(); + } + else { + allStored = false; + break; + } + } + + if (!allStored) { +#ifdef ENABLE_LOCKING + transaction.markForRollback(); +#endif + // Undo the size-estimate added by the rolled-back inserts. + m_DbSizeEstimate -= std::min(m_DbSizeEstimate.load(), addedSize); + } + } + + if (!allStored) { + // The whole batch was rolled back after a write failure; report once. + m_observer->OnStorageFailed("Database write failed"); + } + + // Run the size-full notification / resize check once after the batch, + // matching the original per-record path (which ran it on every insert). + checkStorageSizeLimits(); + + return allStored ? records.size() : 0; } // Debug routine to print record count in the DB diff --git a/lib/offline/OfflineStorage_SQLite.hpp b/lib/offline/OfflineStorage_SQLite.hpp index 18643cde5..1d32a4c77 100644 --- a/lib/offline/OfflineStorage_SQLite.hpp +++ b/lib/offline/OfflineStorage_SQLite.hpp @@ -122,6 +122,15 @@ namespace MAT_NS_BEGIN { private: size_t GetRecordCountUnsafe(EventLatency latency) const; + + // Validate a record's required fields; reports OnStorageFailed on rejection. + bool isValidRecord(StorageRecord const& record) const; + // Insert one already-validated record. Caller must hold m_lock and have an + // active DbTransaction (when ENABLE_LOCKING). Updates m_DbSizeEstimate. + // Returns false (without updating the size estimate) if the insert fails. 
+ bool insertRecordUnsafe(StorageRecord const& record); + // Run the DB-size-full notification and resize checks (after inserts). + void checkStorageSizeLimits(); }; diff --git a/lib/offline/SQLiteWrapper.hpp b/lib/offline/SQLiteWrapper.hpp index 3f4f998e3..2ebdb99a4 100644 --- a/lib/offline/SQLiteWrapper.hpp +++ b/lib/offline/SQLiteWrapper.hpp @@ -439,6 +439,13 @@ namespace MAT_NS_BEGIN { return isOK(sqlite3_exec("COMMIT;")); } + /** + * @brief Roll back (discard) the current DB transaction. + */ + bool rollback() { + return isOK(sqlite3_exec("ROLLBACK;")); + } + bool lock() { #ifndef NDEBUG unsigned count = 0; diff --git a/tests/unittests/MemoryStorageTests.cpp b/tests/unittests/MemoryStorageTests.cpp index a736d125f..268cf137d 100644 --- a/tests/unittests/MemoryStorageTests.cpp +++ b/tests/unittests/MemoryStorageTests.cpp @@ -213,6 +213,24 @@ TEST_F(MemoryStorageTests, DeleteAllRecords) EXPECT_THAT(storage.GetReservedCount(), 0); } +TEST_F(MemoryStorageTests, DeleteRecordsWithEmptyFilterDoesNotDeleteAll) +{ + MemoryStorage storage(testLogManager, *testConfig); + + // Add some events to storage + auto total_db_size = addEvents(storage); + EXPECT_THAT(storage.GetSize(), total_db_size); + auto count_before = storage.GetRecordCount(); + EXPECT_GT(count_before, static_cast(0)); + + // An empty where-filter matches every record; it must NOT wipe the queue. + // Intentional full clears go through DeleteAllRecords(). 
+ storage.DeleteRecords(std::map{}); + + EXPECT_THAT(storage.GetRecordCount(), count_before); + EXPECT_THAT(storage.GetSize(), total_db_size); +} + TEST_F(MemoryStorageTests, ReleaseRecords) { diff --git a/tests/unittests/OfflineStorageTests.cpp b/tests/unittests/OfflineStorageTests.cpp index bbb8da8e0..04df15e11 100644 --- a/tests/unittests/OfflineStorageTests.cpp +++ b/tests/unittests/OfflineStorageTests.cpp @@ -2,7 +2,14 @@ #include "common/Common.hpp" #include "common/MockIOfflineStorage.hpp" +#include "common/MockIOfflineStorageObserver.hpp" +#include "common/MockIRuntimeConfig.hpp" +#include "offline/OfflineStorageHandler.hpp" #include "offline/StorageObserver.hpp" +#include "NullObjects.hpp" + +#include +#include using namespace testing; using namespace MAT; @@ -162,3 +169,93 @@ TEST_F(OfflineStorageTests, ReleaseRecordsIsForwarded) .WillOnce(Return()); EXPECT_THAT(offlineStorage.releaseRecordsIncRetryCount(ctx), true); } + +namespace +{ + // Remove a SQLite db file along with its WAL-mode companion files + // (-wal/-shm/-journal), which would otherwise accumulate in the temp dir. + void RemoveDbFiles(const std::string& path) + { + std::remove(path.c_str()); + std::remove((path + "-wal").c_str()); + std::remove((path + "-shm").c_str()); + std::remove((path + "-journal").c_str()); + } + + // No-op dispatcher that owns queued tasks and frees them, so flushes only + // run when invoked directly and scheduled tasks (if any) are not leaked. 
+ class NoopTaskDispatcher : public ITaskDispatcher + { + public: + void Join() override { clear(); } + void Queue(Task* task) override { m_tasks.push_back(task); } + bool Cancel(Task* task, uint64_t waitTime = 0) override + { + UNREFERENCED_PARAMETER(waitTime); + auto it = std::find(m_tasks.begin(), m_tasks.end(), task); + if (it != m_tasks.end()) + { + delete *it; + m_tasks.erase(it); + return true; + } + return false; + } + ~NoopTaskDispatcher() override { clear(); } + + private: + void clear() + { + for (auto* t : m_tasks) + delete t; + m_tasks.clear(); + } + std::vector m_tasks; + }; +} + +// Regression test: when records drained from the in-memory queue fail to be +// stored by the disk backend during Flush() (StoreRecords() returns 0), they +// must be returned to the queue rather than lost. +TEST(OfflineStorageHandlerFlushTests, FailedDiskStoreDuringFlushReturnsRecordsToMemory) +{ + NullLogManager logManager; + NiceMock config; + NoopTaskDispatcher dispatcher; + NiceMock observer; + + ON_CALL(config, GetOfflineStorageMaximumSizeBytes()).WillByDefault(Return(32 * 4096)); + ON_CALL(config, GetMaximumRetryCount()).WillByDefault(Return(5)); + + std::ostringstream dbPath; + dbPath << GetTempDirectory() << "FlushReserveTest-" << PAL::getUtcSystemTimeMs() << ".db"; + RemoveDbFiles(dbPath.str()); + config[CFG_STR_CACHE_FILE_PATH] = dbPath.str(); + config[CFG_INT_RAM_QUEUE_SIZE] = 1024 * 1024; // enable the in-memory queue + + OfflineStorageHandler handler(logManager, config, dispatcher); + handler.Initialize(observer); + + // A timestamp <= 0 is accepted by the in-memory queue but rejected by the + // SQLite disk store's input validation, so its StoreRecords() returns 0. + // This drives the same Flush() failure-handling path as any disk store + // failure (a failed record must be returned to memory, not dropped). 
+ const size_t kCount = 5; + for (size_t i = 0; i < kCount; i++) + { + StorageRecord r("flush-id-" + std::to_string(i), "tenant-token", + EventLatency_Normal, EventPersistence_Normal, /*timestamp*/ 0, + std::vector{ 'x' }); + handler.StoreRecord(r); + } + EXPECT_EQ(handler.GetRecordCount(), kCount); + + handler.Flush(); + + // The disk rejected every record; with the fix they are returned to the + // in-memory queue rather than silently dropped. + EXPECT_EQ(handler.GetRecordCount(), kCount); + + handler.Shutdown(); + RemoveDbFiles(dbPath.str()); +} diff --git a/tests/unittests/OfflineStorageTests_SQLite.cpp b/tests/unittests/OfflineStorageTests_SQLite.cpp index e90b0a9ae..c1998cfea 100644 --- a/tests/unittests/OfflineStorageTests_SQLite.cpp +++ b/tests/unittests/OfflineStorageTests_SQLite.cpp @@ -153,6 +153,58 @@ TEST_F(OfflineStorageTests_SQLite, GetAndReservedReturnsStoredRecord) EXPECT_THAT(consumer.records[0].reservedUntil, 0); } +TEST_F(OfflineStorageTests_SQLite, StoreRecordsBatchStoresAllRecords) +{ + initializeStorage(); + std::vector batch; + const size_t kCount = 8; + for (size_t i = 0; i < kCount; i++) + { + batch.push_back({ "g" + std::to_string(i), "token", EventLatency_Normal, + EventPersistence_Normal, static_cast(i + 1), { static_cast(i) } }); + } + + // Every record in the batch is stored and individually retrievable. (The + // single-transaction batching is a performance optimization verified by + // benchmarking; this test covers the batch's storage correctness.) 
+ EXPECT_THAT(offlineStorage->StoreRecords(batch), kCount); + + TestRecordConsumer consumer; + EXPECT_THAT(offlineStorage->GetAndReserveRecords(consumer, 100000), true); + ASSERT_THAT(consumer.records.size(), kCount); + for (size_t i = 0; i < kCount; i++) + { + std::string expectedId = "g" + std::to_string(i); + bool found = false; + for (auto const& r : consumer.records) + { + if (r.id == expectedId) { found = true; break; } + } + EXPECT_TRUE(found) << "record " << expectedId << " was not retrieved"; + } +} + +TEST_F(OfflineStorageTests_SQLite, StoreRecordsBatchWithAnyInvalidStoresNothing) +{ + initializeStorage(); + std::vector batch = { + { "g1", "token", EventLatency_Normal, EventPersistence_Normal, 1, { 1 } }, // valid + { "g2", "token", EventLatency_Normal, EventPersistence_Normal, 0, { 2 } }, // invalid: timestamp <= 0 + }; + + // The invalid record is reported during validation. + EXPECT_CALL(observerMock, OnStorageFailed("Invalid parameters")); + + // All-or-nothing: with any invalid record in the batch, nothing is stored + // (so a caller that re-queues the batch on a short return can't duplicate the + // otherwise-valid record). + EXPECT_THAT(offlineStorage->StoreRecords(batch), static_cast(0)); + + TestRecordConsumer consumer; + offlineStorage->GetAndReserveRecords(consumer, 100000); + EXPECT_THAT(consumer.records.size(), static_cast(0)); +} + TEST_F(OfflineStorageTests_SQLite, ReservedRecordIsNotReturned) { initializeStorage();