diff --git a/lib/offline/OfflineStorageHandler.cpp b/lib/offline/OfflineStorageHandler.cpp index 9049339c4..90f9c4a27 100644 --- a/lib/offline/OfflineStorageHandler.cpp +++ b/lib/offline/OfflineStorageHandler.cpp @@ -174,28 +174,44 @@ namespace MAT_NS_BEGIN { // than the handle gets replaced by nullptr in this DeferredCallbackHandle obj. m_flushHandle.Cancel(); - size_t dbSizeBeforeFlush = m_offlineStorageMemory->GetSize(); + size_t dbSizeBeforeFlush = (m_offlineStorageMemory != nullptr) ? m_offlineStorageMemory->GetSize() : 0; if ((m_offlineStorageMemory) && (dbSizeBeforeFlush > 0) && (m_offlineStorageDisk)) { - // This will block on and then take a lock for the duration of this move, and - // StoreRecord() will then block until the move completes. + // Drain the in-memory queue into a local batch. Records are removed + // from memory here; any that fail to persist below are re-inserted, so + // a disk write failure does not silently lose events. Draining (rather + // than reserving) keeps only a single copy of each record in flight and + // avoids stamping a reservation lease that the Room backend would + // persist to disk. auto records = m_offlineStorageMemory->GetRecords(false, EventLatency_Unspecified); - std::vector ids; - // TODO: [MG] - consider running the batch in transaction - // if (sqlite) - // sqlite->Execute("BEGIN"); - - size_t totalSaved = m_offlineStorageDisk->StoreRecords(records); - - // TODO: [MG] - consider running the batch in transaction - // if (sqlite) - // sqlite->Execute("END"); + // Persist one record at a time so we know exactly which succeeded. A + // batched StoreRecords() only returns a count, so on a partial failure + // we could not tell which records to re-queue, and re-storing + // already-saved records would duplicate them (the events table has no + // unique record_id constraint). 
+ size_t totalSaved = 0; + size_t totalFailed = 0; + for (auto& record : records) + { + if (m_offlineStorageDisk->StoreRecord(record)) + { + ++totalSaved; + } + else + { + // Return the record to the in-memory queue for retry on a + // subsequent flush instead of dropping it. + ++totalFailed; + m_offlineStorageMemory->StoreRecord(record); + } + } - // Delete records from reserved on flush - HttpHeaders dummy; - bool fromMemory = true; - m_offlineStorageMemory->DeleteRecords(ids, dummy, fromMemory); + if (totalFailed > 0) + { + LOG_WARN("Flush: %zu of %zu records failed to persist to disk; returned to the queue for retry", + totalFailed, records.size()); + } // Notify event listener about the records cached OnStorageRecordsSaved(totalSaved); diff --git a/lib/offline/OfflineStorage_SQLite.cpp b/lib/offline/OfflineStorage_SQLite.cpp index b9b2ed83d..b1c7c82b4 100644 --- a/lib/offline/OfflineStorage_SQLite.cpp +++ b/lib/offline/OfflineStorage_SQLite.cpp @@ -177,7 +177,13 @@ namespace MAT_NS_BEGIN { return false; } #endif - SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast(record.latency), static_cast(record.persistence), record.timestamp, record.blob); + if (!SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast(record.latency), static_cast(record.persistence), record.timestamp, record.blob)) + { + LOG_ERROR("Failed to store event %s:%s: database write failed", + tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); + m_observer->OnStorageFailed("Database write failed"); + return false; + } m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size(); } diff --git a/tests/unittests/OfflineStorageTests.cpp b/tests/unittests/OfflineStorageTests.cpp index bbb8da8e0..1bc834755 100644 --- a/tests/unittests/OfflineStorageTests.cpp +++ b/tests/unittests/OfflineStorageTests.cpp @@ -2,7 +2,14 @@ #include 
"common/Common.hpp"
 #include "common/MockIOfflineStorage.hpp"
+#include "common/MockIOfflineStorageObserver.hpp"
+#include "common/MockIRuntimeConfig.hpp"
+#include "offline/OfflineStorageHandler.hpp"
 #include "offline/StorageObserver.hpp"
+#include "NullObjects.hpp"
+
+#include <algorithm>
+#include <cstdio>
 
 using namespace testing;
 using namespace MAT;
@@ -162,3 +169,91 @@ TEST_F(OfflineStorageTests, ReleaseRecordsIsForwarded)
         .WillOnce(Return());
     EXPECT_THAT(offlineStorage.releaseRecordsIncRetryCount(ctx), true);
 }
+
+namespace
+{
+    // Deletes the database file at `path` plus the "-wal", "-shm" and "-journal"
+    // companion files next to it, so test runs do not accumulate files in the temp dir.
+    void RemoveDbFiles(const std::string& path)
+    {
+        std::remove(path.c_str());
+        std::remove((path + "-wal").c_str());
+        std::remove((path + "-shm").c_str());
+        std::remove((path + "-journal").c_str());
+    }
+
+    // Dispatcher that never runs a task: Queue() only stores the pointer, and the task
+    // is deleted by Cancel(), Join() or the destructor; Cancel() always returns true.
+    class NoopTaskDispatcher : public ITaskDispatcher
+    {
+    public:
+        void Join() override { clear(); }
+        void Queue(Task* task) override { m_tasks.push_back(task); }
+        bool Cancel(Task* task, uint64_t waitTime = 0) override
+        {
+            UNREFERENCED_PARAMETER(waitTime);
+            auto it = std::find(m_tasks.begin(), m_tasks.end(), task);
+            if (it != m_tasks.end())
+            {
+                delete *it;
+                m_tasks.erase(it);
+            }
+            return true;
+        }
+        ~NoopTaskDispatcher() override { clear(); }
+
+    private:
+        void clear()
+        {
+            for (auto* t : m_tasks)
+                delete t;
+            m_tasks.clear();
+        }
+        std::vector<Task*> m_tasks;
+    };
+}
+
+// Regression test: when records pulled from the in-memory queue fail to persist
+// to disk during Flush(), they must be returned to the queue rather than lost.
+TEST(OfflineStorageHandlerFlushTests, FailedDiskWriteDuringFlushReturnsRecordsToMemory)
+{
+    NullLogManager logManager;
+    NiceMock<MockIRuntimeConfig> config;
+    NoopTaskDispatcher dispatcher;
+    NiceMock<MockIOfflineStorageObserver> observer;
+
+    ON_CALL(config, GetOfflineStorageMaximumSizeBytes()).WillByDefault(Return(32 * 4096));
+    ON_CALL(config, GetMaximumRetryCount()).WillByDefault(Return(5));
+
+    std::ostringstream dbPath;
+    dbPath << GetTempDirectory() << "FlushReserveTest-" << PAL::getUtcSystemTimeMs() << ".db";
+    RemoveDbFiles(dbPath.str());
+    config[CFG_STR_CACHE_FILE_PATH] = dbPath.str();
+    config[CFG_INT_RAM_QUEUE_SIZE] = 1024 * 1024; // enable the in-memory queue
+
+    OfflineStorageHandler handler(logManager, config, dispatcher);
+    handler.Initialize(observer);
+
+    // A timestamp <= 0 is accepted by the in-memory queue but rejected by the
+    // SQLite disk store's input validation, so its StoreRecord() returns false and
+    // Flush() takes the same failure path as it would for a real disk write error.
+    // NOTE(review): such a record can never persist, so each Flush() re-queues it - confirm intended.
+    const size_t kCount = 5;
+    for (size_t i = 0; i < kCount; i++)
+    {
+        StorageRecord r("flush-id-" + std::to_string(i), "tenant-token",
+                        EventLatency_Normal, EventPersistence_Normal, /*timestamp*/ 0,
+                        std::vector<uint8_t>{ 'x' });
+        handler.StoreRecord(r);
+    }
+    EXPECT_EQ(handler.GetRecordCount(), kCount);
+
+    handler.Flush();
+
+    // The disk rejected every record, so all of them must have been put back into
+    // the in-memory queue instead of being silently dropped: the count is unchanged.
+    EXPECT_EQ(handler.GetRecordCount(), kCount);
+
+    handler.Shutdown();
+    RemoveDbFiles(dbPath.str());
+}