Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions lib/offline/MemoryStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,16 @@ namespace MAT_NS_BEGIN {

void MemoryStorage::DeleteRecords(const std::map<std::string, std::string> & whereFilter)
{
// An empty filter matches every record. Never silently wipe the whole
// in-memory queue from a no-op predicate; callers must use
// DeleteAllRecords() for an intentional full clear. This mirrors the
// fail-closed behavior of OfflineStorage_SQLite::DeleteRecords.
if (whereFilter.empty())
{
LOG_WARN("DeleteRecords called with an empty filter; ignoring to avoid deleting all records.");
return;
}

auto matcher = [&](const StorageRecord &r, const std::map<std::string, std::string> & whereFilter)
{
bool matched = true;
Expand Down
47 changes: 29 additions & 18 deletions lib/offline/OfflineStorageHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,28 +174,37 @@ namespace MAT_NS_BEGIN {
// than the handle gets replaced by nullptr in this DeferredCallbackHandle obj.
m_flushHandle.Cancel();

size_t dbSizeBeforeFlush = m_offlineStorageMemory->GetSize();
size_t dbSizeBeforeFlush = (m_offlineStorageMemory != nullptr) ? m_offlineStorageMemory->GetSize() : 0;
if ((m_offlineStorageMemory) && (dbSizeBeforeFlush > 0) && (m_offlineStorageDisk))
{
// This will block on and then take a lock for the duration of this move, and
// StoreRecord() will then block until the move completes.
// Drain the in-memory queue into a local batch. Records are removed
// from memory here; any that fail to persist below are re-inserted, so
// a disk write failure does not silently lose events. Draining (rather
// than reserving) keeps only a single copy of each record in flight and
// avoids stamping a reservation lease that the Room backend would
// persist to disk.
auto records = m_offlineStorageMemory->GetRecords(false, EventLatency_Unspecified);
std::vector<StorageRecordId> ids;

// TODO: [MG] - consider running the batch in transaction
// if (sqlite)
// sqlite->Execute("BEGIN");

// Persist the whole batch to disk in a single transaction. The disk
// StoreRecords() is all-or-nothing on both backends: it returns the
// full count on success, or 0 if nothing was committed (SQLite rolls
// the transaction back; Room returns 0 on a failed JNI batch). So a
// zero result means nothing was persisted -- return every record to
// the in-memory queue for retry. No events are lost, and there are no
// duplicates because a failed batch leaves nothing on disk.
// (We key off == 0 rather than < size so that a non-zero-but-capped
// count -- only possible for batches larger than the RAM queue can
// ever hold -- is not mistaken for a failure.)
size_t totalSaved = m_offlineStorageDisk->StoreRecords(records);

// TODO: [MG] - consider running the batch in transaction
// if (sqlite)
// sqlite->Execute("END");

// Delete records from reserved on flush
HttpHeaders dummy;
bool fromMemory = true;
m_offlineStorageMemory->DeleteRecords(ids, dummy, fromMemory);
if (totalSaved == 0 && !records.empty())
{
LOG_WARN("Flush: disk store failed for the batch of %zu records; returned to the queue for retry",
records.size());
for (auto& record : records)
{
m_offlineStorageMemory->StoreRecord(record);
}
}

// Notify event listener about the records cached
OnStorageRecordsSaved(totalSaved);
Expand Down Expand Up @@ -269,7 +278,9 @@ namespace MAT_NS_BEGIN {
{
if (record.persistence != EventPersistence::EventPersistence_DoNotStoreOnDisk)
{
m_offlineStorageDisk->StoreRecord(record);
// Propagate a synchronous disk write failure to the caller so a
// failed store is not counted as successfully persisted.
return m_offlineStorageDisk->StoreRecord(record);
Comment on lines +281 to +283
Comment on lines +281 to +283
}
}
}
Expand Down
181 changes: 152 additions & 29 deletions lib/offline/OfflineStorage_SQLite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace MAT_NS_BEGIN {

class DbTransaction {
SqliteDB* m_db;
bool m_rollback = false;
public:
bool locked;

Expand All @@ -34,11 +35,24 @@ namespace MAT_NS_BEGIN {
}
}

// Request that this transaction be discarded (ROLLBACK) instead of committed
// on destruction. This only sets m_rollback; no SQL is issued here. The
// destructor consults the flag, when `locked` is true, to select its
// m_db->rollback() branch.
void markForRollback()
{
m_rollback = true;
}

// Ends the transaction, if one was successfully begun (`locked`).
//
// Exactly one terminating call is made:
//   - m_db->rollback() when markForRollback() was called, or
//   - m_db->unlock() otherwise (the default path).
//
// unlock() must NOT be called before the m_rollback check: doing so ends the
// transaction first, so the rollback branch has nothing left to discard, and
// the default path would call unlock() twice.
~DbTransaction()
{
    if (!locked)
    {
        // The transaction was never begun; there is nothing to end.
        return;
    }

    if (m_rollback)
    {
        m_db->rollback();
    }
    else
    {
        m_db->unlock();
    }
}
};
Expand Down Expand Up @@ -147,40 +161,31 @@ namespace MAT_NS_BEGIN {
m_db->execute(command.c_str());
}

bool OfflineStorage_SQLite::StoreRecord(StorageRecord const& record)
// Checks the fields a record must carry before it can be stored.
//
// A record is rejected when its id or tenantToken is empty, when its latency
// converts to a negative int, or when its timestamp is not positive. A
// rejection is logged and reported to the observer as "Invalid parameters".
//
// Returns true when the record is acceptable, false otherwise.
bool OfflineStorage_SQLite::isValidRecord(StorageRecord const& record) const
{
    const bool missingIdentity = record.id.empty() || record.tenantToken.empty();
    const bool outOfRange = static_cast<int>(record.latency) < 0 || record.timestamp <= 0;

    if (!missingIdentity && !outOfRange)
    {
        return true;
    }

    LOG_ERROR("Failed to store event %s:%s: Invalid parameters",
        tenantTokenToId(record.tenantToken).c_str(), record.id.c_str());
    m_observer->OnStorageFailed("Invalid parameters");
    return false;
}

if (!m_db) {
LOG_ERROR("Failed to store event %s:%s: Database is not open",
// Inserts one already-validated record using the prepared insert statement.
//
// On success the sizes of the record's id, tenantToken and blob are added to
// m_DbSizeEstimate and true is returned. On failure the error is logged, the
// size estimate is left untouched and false is returned.
//
// This helper deliberately does NOT notify m_observer. It is called from
// inside the caller's transaction scope, and both StoreRecord() and
// StoreRecords() already report OnStorageFailed("Database write failed")
// themselves once that scope has closed. Reporting here as well would run a
// callback inside the transaction, double-report the failure, and (as
// "Database is not open") misdescribe a write failure as an open failure.
bool OfflineStorage_SQLite::insertRecordUnsafe(StorageRecord const& record)
{
    if (!SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast<int>(record.latency), static_cast<int>(record.persistence), record.timestamp, record.blob))
    {
        LOG_ERROR("Failed to store event %s:%s: database write failed",
            tenantTokenToId(record.tenantToken).c_str(), record.id.c_str());
        return false;
    }

    m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size();
    return true;
}

{
#ifdef ENABLE_LOCKING
LOCKGUARD(m_lock);
DbTransaction transaction(m_db.get());
if (!transaction.locked)
{
LOG_ERROR("Failed to store event %s:%s: Database error", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str());
m_observer->OnStorageFailed("Database error");
return false;
}
#endif
SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast<int>(record.latency), static_cast<int>(record.persistence), record.timestamp, record.blob);
m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size();
}

void OfflineStorage_SQLite::checkStorageSizeLimits()
{
if ((m_DbSizeNotificationLimit != 0) && (m_DbSizeEstimate>m_DbSizeNotificationLimit))
{
auto now = PAL::getMonotonicTimeMs();
Expand Down Expand Up @@ -210,20 +215,138 @@ namespace MAT_NS_BEGIN {
m_resizing = false;
}
}
}

return true;
// Stores a single record.
//
// Steps, in order:
//   1. Validate the record (isValidRecord reports any rejection itself).
//   2. Fail with OnStorageOpenFailed if the database is not open.
//   3. Insert the record inside a scoped block; when ENABLE_LOCKING is
//      defined that block holds m_lock and a DbTransaction.
//   4. After the block has closed, report a write failure and run the
//      size-limit check.
//
// Returns true only if the record was inserted.
bool OfflineStorage_SQLite::StoreRecord(StorageRecord const& record)
{
    // TODO: [MG] - this works, but may not play nicely with several LogManager instances
    // static SqliteStatement sql_insert(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data);

    if (!isValidRecord(record))
    {
        return false;
    }

    if (!m_db)
    {
        LOG_ERROR("Failed to store event %s:%s: Database is not open",
            tenantTokenToId(record.tenantToken).c_str(), record.id.c_str());
        m_observer->OnStorageOpenFailed("Database is not open");
        return false;
    }

    bool inserted = false;
    {
        // Everything that needs the lock / transaction lives in this scope so
        // that both are released before any later observer callback.
#ifdef ENABLE_LOCKING
        LOCKGUARD(m_lock);
        DbTransaction txn(m_db.get());
        if (!txn.locked)
        {
            LOG_ERROR("Failed to store event %s:%s: Database error", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str());
            m_observer->OnStorageFailed("Database error");
            return false;
        }
#endif
        inserted = insertRecordUnsafe(record);
    }

    // The transaction scope has closed, so this callback does not run while
    // the transaction is held.
    if (!inserted)
    {
        m_observer->OnStorageFailed("Database write failed");
    }

    // The size-limit check runs on every call, whether or not the insert
    // succeeded.
    checkStorageSizeLimits();

    return inserted;
}

// Stores a batch of records as a single all-or-nothing unit.
//
// Returns records.size() when every record was inserted, and 0 when nothing
// was stored: empty input, any invalid record, database not open, transaction
// could not be started, or any insert failed.
//
// There is intentionally no per-record StoreRecord() loop here. Storing
// records one by one would give each its own transaction and allow a
// partially-committed batch.
size_t OfflineStorage_SQLite::StoreRecords(std::vector<StorageRecord> & records)
{
    if (records.empty()) {
        return 0;
    }

    // Validate up front, before the DB-open check and before the transaction
    // is opened. isValidRecord() reports each reject to the observer, so the
    // loop visits every record rather than stopping at the first bad one.
    size_t validCount = 0;
    for (auto const& candidate : records) {
        if (isValidRecord(candidate)) {
            ++validCount;
        }
    }

    if (validCount == 0) {
        // Every record was invalid (already reported above). Match the single
        // StoreRecord(), which returns after validation without checking
        // DB-open.
        return 0;
    }

    if (!m_db) {
        LOG_ERROR("Failed to store %zu events: Database is not open", records.size());
        m_observer->OnStorageOpenFailed("Database is not open");
        return 0;
    }

    if (validCount != records.size()) {
        // At least one record was invalid (already reported). Store nothing so
        // the batch stays all-or-nothing for the caller.
        return 0;
    }

    size_t addedSize = 0;
    bool allStored = true;
    {
        // Batch all inserts into a single transaction instead of one per
        // record. If any insert fails the transaction is marked for rollback,
        // so a caller that re-queues the whole batch does not create
        // duplicate rows.
        // NOTE(review): without ENABLE_LOCKING no DbTransaction is opened in
        // this function, so a mid-batch failure cannot be rolled back from
        // here -- confirm that configuration is not relied on for
        // all-or-nothing semantics.
#ifdef ENABLE_LOCKING
        LOCKGUARD(m_lock);
        DbTransaction transaction(m_db.get());
        if (!transaction.locked)
        {
            LOG_ERROR("Failed to store %zu events: Database error", records.size());
            m_observer->OnStorageFailed("Database error");
            return 0;
        }
#endif
        for (auto const& r : records) {
            if (!insertRecordUnsafe(r)) {
                allStored = false;
                break;
            }
            addedSize += r.id.size() + r.tenantToken.size() + r.blob.size();
        }

        if (!allStored) {
#ifdef ENABLE_LOCKING
            transaction.markForRollback();
#endif
            // Undo the size-estimate added by the rolled-back inserts; the
            // std::min clamp keeps the subtraction from going below zero.
            m_DbSizeEstimate -= std::min(m_DbSizeEstimate.load(), addedSize);
        }
    }

    if (!allStored) {
        // Report the batch write failure once, after the transaction scope
        // has closed.
        m_observer->OnStorageFailed("Database write failed");
    }

    // Run the size-full notification / resize check once after the batch.
    checkStorageSizeLimits();

    return allStored ? records.size() : 0;
}

// Debug routine to print record count in the DB
Expand Down
9 changes: 9 additions & 0 deletions lib/offline/OfflineStorage_SQLite.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ namespace MAT_NS_BEGIN {

private:
size_t GetRecordCountUnsafe(EventLatency latency) const;

// Validate a record's required fields: a non-empty id and tenantToken, a
// non-negative latency and a positive timestamp. On rejection it logs the
// error, calls OnStorageFailed("Invalid parameters") on the observer and
// returns false; otherwise it returns true.
bool isValidRecord(StorageRecord const& record) const;
// Insert one already-validated record. Caller must hold m_lock and have an
// active DbTransaction (when ENABLE_LOCKING). On success, adds the sizes of
// the record's id, tenantToken and blob to m_DbSizeEstimate and returns true.
// Returns false (without updating the size estimate) if the insert fails.
bool insertRecordUnsafe(StorageRecord const& record);
// Run the DB-size-full notification and resize checks (after inserts).
// StoreRecord() and StoreRecords() call this once their transaction scope
// has closed.
void checkStorageSizeLimits();
};


Expand Down
7 changes: 7 additions & 0 deletions lib/offline/SQLiteWrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,13 @@ namespace MAT_NS_BEGIN {
return isOK(sqlite3_exec("COMMIT;"));
}

/**
 * @brief Roll back (discard) the current DB transaction.
 *
 * Executes the SQL statement "ROLLBACK;" through sqlite3_exec().
 *
 * @return The value isOK() yields for that statement's result -- presumably
 *         true when the rollback succeeded (confirm against isOK()).
 */
bool rollback() {
return isOK(sqlite3_exec("ROLLBACK;"));
}

bool lock() {
#ifndef NDEBUG
unsigned count = 0;
Expand Down
18 changes: 18 additions & 0 deletions tests/unittests/MemoryStorageTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,24 @@ TEST_F(MemoryStorageTests, DeleteAllRecords)
EXPECT_THAT(storage.GetReservedCount(), 0);
}

// Regression test: DeleteRecords() with an empty where-filter must be a no-op.
// An empty filter matches every record, so it must not wipe the queue;
// intentional full clears go through DeleteAllRecords().
TEST_F(MemoryStorageTests, DeleteRecordsWithEmptyFilterDoesNotDeleteAll)
{
    MemoryStorage storage(testLogManager, *testConfig);

    // Populate the storage and capture its size and record count as a baseline.
    auto sizeBefore = addEvents(storage);
    EXPECT_THAT(storage.GetSize(), sizeBefore);
    auto recordsBefore = storage.GetRecordCount();
    EXPECT_GT(recordsBefore, static_cast<size_t>(0));

    // Issue the delete with a filter that contains no conditions at all.
    const std::map<std::string, std::string> emptyFilter{};
    storage.DeleteRecords(emptyFilter);

    // Nothing may have been removed: both count and size match the baseline.
    EXPECT_THAT(storage.GetRecordCount(), recordsBefore);
    EXPECT_THAT(storage.GetSize(), sizeBefore);
}


TEST_F(MemoryStorageTests, ReleaseRecords)
{
Expand Down
Loading
Loading