Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 33 additions & 17 deletions lib/offline/OfflineStorageHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,28 +174,44 @@ namespace MAT_NS_BEGIN {
// than the handle gets replaced by nullptr in this DeferredCallbackHandle obj.
m_flushHandle.Cancel();

size_t dbSizeBeforeFlush = m_offlineStorageMemory->GetSize();
size_t dbSizeBeforeFlush = (m_offlineStorageMemory != nullptr) ? m_offlineStorageMemory->GetSize() : 0;
if ((m_offlineStorageMemory) && (dbSizeBeforeFlush > 0) && (m_offlineStorageDisk))
{
// This will block on and then take a lock for the duration of this move, and
// StoreRecord() will then block until the move completes.
// Drain the in-memory queue into a local batch. Records are removed
// from memory here; any that fail to persist below are re-inserted, so
// a disk write failure does not silently lose events. Draining (rather
// than reserving) keeps only a single copy of each record in flight and
// avoids stamping a reservation lease that the Room backend would
// persist to disk.
auto records = m_offlineStorageMemory->GetRecords(false, EventLatency_Unspecified);
std::vector<StorageRecordId> ids;

// TODO: [MG] - consider running the batch in transaction
// if (sqlite)
// sqlite->Execute("BEGIN");

size_t totalSaved = m_offlineStorageDisk->StoreRecords(records);

// TODO: [MG] - consider running the batch in transaction
// if (sqlite)
// sqlite->Execute("END");
// Persist one record at a time so we know exactly which succeeded. A
// batched StoreRecords() only returns a count, so on a partial failure
// we could not tell which records to re-queue, and re-storing
// already-saved records would duplicate them (the events table has no
// unique record_id constraint).
size_t totalSaved = 0;
size_t totalFailed = 0;
for (auto& record : records)
{
if (m_offlineStorageDisk->StoreRecord(record))
{
++totalSaved;
}
else
{
// Return the record to the in-memory queue for retry on a
// subsequent flush instead of dropping it.
++totalFailed;
m_offlineStorageMemory->StoreRecord(record);
Comment on lines +188 to +206
}
}

// Delete records from reserved on flush
HttpHeaders dummy;
bool fromMemory = true;
m_offlineStorageMemory->DeleteRecords(ids, dummy, fromMemory);
if (totalFailed > 0)
{
LOG_WARN("Flush: %zu of %zu records failed to persist to disk; returned to the queue for retry",
totalFailed, records.size());
}

// Notify event listener about the records cached
OnStorageRecordsSaved(totalSaved);
Expand Down
8 changes: 7 additions & 1 deletion lib/offline/OfflineStorage_SQLite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,13 @@ namespace MAT_NS_BEGIN {
return false;
}
#endif
SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast<int>(record.latency), static_cast<int>(record.persistence), record.timestamp, record.blob);
if (!SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast<int>(record.latency), static_cast<int>(record.persistence), record.timestamp, record.blob))
{
LOG_ERROR("Failed to store event %s:%s: database write failed",
tenantTokenToId(record.tenantToken).c_str(), record.id.c_str());
m_observer->OnStorageFailed("Database write failed");
return false;
Comment on lines +180 to +185
}
m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size();
}

Expand Down
95 changes: 95 additions & 0 deletions tests/unittests/OfflineStorageTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@

#include "common/Common.hpp"
#include "common/MockIOfflineStorage.hpp"
#include "common/MockIOfflineStorageObserver.hpp"
#include "common/MockIRuntimeConfig.hpp"
#include "offline/OfflineStorageHandler.hpp"
#include "offline/StorageObserver.hpp"
#include "NullObjects.hpp"

#include <cstdio>
#include <sstream>

using namespace testing;
using namespace MAT;
Expand Down Expand Up @@ -162,3 +169,91 @@ TEST_F(OfflineStorageTests, ReleaseRecordsIsForwarded)
.WillOnce(Return());
EXPECT_THAT(offlineStorage.releaseRecordsIncRetryCount(ctx), true);
}

namespace
{
// Delete the SQLite database file at `path` plus the WAL-mode side files
// SQLite may leave next to it (-wal, -shm, -journal), so repeated test runs
// do not pile up files in the temp directory. The result of each removal is
// ignored, so files that do not exist are simply skipped.
void RemoveDbFiles(const std::string& path)
{
    // Empty suffix first: the database file itself, then its companions.
    static const char* const kSuffixes[] = { "", "-wal", "-shm", "-journal" };
    for (const char* suffix : kSuffixes)
    {
        const std::string target = path + suffix;
        std::remove(target.c_str());
    }
}

// Test-only ITaskDispatcher that never runs anything. Tasks passed to Queue()
// are held and owned by the dispatcher until they are cancelled, Join() is
// called, or the dispatcher is destroyed; at that point they are deleted.
// As a result flushes only run when a test invokes them directly, and any
// task that does get scheduled is not leaked.
class NoopTaskDispatcher : public ITaskDispatcher
{
public:
    NoopTaskDispatcher() = default;

    // The dispatcher deletes the raw Task pointers it stores, so a copy would
    // delete every queued task a second time. Copying is therefore forbidden
    // (which also suppresses the implicit move operations).
    NoopTaskDispatcher(const NoopTaskDispatcher&) = delete;
    NoopTaskDispatcher& operator=(const NoopTaskDispatcher&) = delete;

    ~NoopTaskDispatcher() override { clear(); }

    // Nothing is ever executed, so joining just discards the pending tasks.
    void Join() override { clear(); }

    // Takes ownership of `task`; it is stored but never run.
    void Queue(Task* task) override { m_tasks.push_back(task); }

    // Removes and deletes `task` if it is still queued. `waitTime` is unused
    // because no task is ever in progress. Always returns true, including
    // when `task` was not found in the queue.
    bool Cancel(Task* task, uint64_t waitTime = 0) override
    {
        UNREFERENCED_PARAMETER(waitTime);
        auto it = std::find(m_tasks.begin(), m_tasks.end(), task);
        if (it != m_tasks.end())
        {
            delete *it;
            m_tasks.erase(it);
        }
        return true;
    }

private:
    // Deletes every task still held and empties the queue.
    void clear()
    {
        for (auto* t : m_tasks)
            delete t;
        m_tasks.clear();
    }

    std::vector<Task*> m_tasks;  // owned; released in Cancel() / clear()
};
}

// Regression coverage for Flush(): records taken out of the in-memory queue
// that the disk store refuses to persist must end up back in the queue
// instead of being discarded.
TEST(OfflineStorageHandlerFlushTests, FailedDiskWriteDuringFlushReturnsRecordsToMemory)
{
    NullLogManager logManager;
    NiceMock<MockIRuntimeConfig> config;
    NoopTaskDispatcher dispatcher;
    NiceMock<MockIOfflineStorageObserver> observer;

    // Point the handler at a uniquely named database file in the temp
    // directory, clearing any leftovers that might carry the same name.
    std::ostringstream dbPath;
    dbPath << GetTempDirectory() << "FlushReserveTest-" << PAL::getUtcSystemTimeMs() << ".db";
    const std::string dbFile = dbPath.str();
    RemoveDbFiles(dbFile);

    ON_CALL(config, GetMaximumRetryCount()).WillByDefault(Return(5));
    ON_CALL(config, GetOfflineStorageMaximumSizeBytes()).WillByDefault(Return(32 * 4096));
    config[CFG_INT_RAM_QUEUE_SIZE] = 1024 * 1024; // enables the in-memory queue
    config[CFG_STR_CACHE_FILE_PATH] = dbFile;

    OfflineStorageHandler handler(logManager, config, dispatcher);
    handler.Initialize(observer);

    // Every record gets timestamp 0. The in-memory queue takes such records,
    // while the SQLite disk store's input validation rejects a timestamp <= 0
    // and its StoreRecord() returns false. That exercises the same Flush()
    // failure handling as a genuine disk write failure would.
    const size_t kCount = 5;
    for (size_t index = 0; index < kCount; ++index)
    {
        StorageRecord rejectedRecord("flush-id-" + std::to_string(index), "tenant-token",
                                     EventLatency_Normal, EventPersistence_Normal, /*timestamp*/ 0,
                                     std::vector<uint8_t>{ 'x' });
        handler.StoreRecord(rejectedRecord);
    }
    EXPECT_EQ(handler.GetRecordCount(), kCount);

    handler.Flush();

    // Nothing could be written to disk, so all records must still be counted:
    // they were handed back to the in-memory queue rather than dropped.
    EXPECT_EQ(handler.GetRecordCount(), kCount);

    handler.Shutdown();
    RemoveDbFiles(dbFile);
}
Loading