Skip to content
Closed
143 changes: 117 additions & 26 deletions lib/offline/OfflineStorage_SQLite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,40 +147,31 @@ namespace MAT_NS_BEGIN {
m_db->execute(command.c_str());
}

bool OfflineStorage_SQLite::StoreRecord(StorageRecord const& record)
bool OfflineStorage_SQLite::isValidRecord(StorageRecord const& record) const
{
// TODO: [MG] - this works, but may not play nicely with several LogManager instances
// static SqliteStatement sql_insert(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data);

if (record.id.empty() || record.tenantToken.empty() || static_cast<int>(record.latency) < 0 || record.timestamp <= 0) {
LOG_ERROR("Failed to store event %s:%s: Invalid parameters",
tenantTokenToId(record.tenantToken).c_str(), record.id.c_str());
m_observer->OnStorageFailed("Invalid parameters");
return false;
}
return true;
}

if (!m_db) {
LOG_ERROR("Failed to store event %s:%s: Database is not open",
bool OfflineStorage_SQLite::insertRecordUnsafe(StorageRecord const& record)
{
if (!SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast<int>(record.latency), static_cast<int>(record.persistence), record.timestamp, record.blob))
{
LOG_ERROR("Failed to store event %s:%s: database write failed",
tenantTokenToId(record.tenantToken).c_str(), record.id.c_str());
m_observer->OnStorageOpenFailed("Database is not open");
return false;
}
Comment on lines +163 to 168
m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size();
return true;
Comment on lines +161 to +170
}

{
#ifdef ENABLE_LOCKING
LOCKGUARD(m_lock);
DbTransaction transaction(m_db.get());
if (!transaction.locked)
{
LOG_ERROR("Failed to store event %s:%s: Database error", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str());
m_observer->OnStorageFailed("Database error");
return false;
}
#endif
SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast<int>(record.latency), static_cast<int>(record.persistence), record.timestamp, record.blob);
m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size();
}

void OfflineStorage_SQLite::checkStorageSizeLimits()
{
if ((m_DbSizeNotificationLimit != 0) && (m_DbSizeEstimate>m_DbSizeNotificationLimit))
{
auto now = PAL::getMonotonicTimeMs();
Expand Down Expand Up @@ -210,19 +201,119 @@ namespace MAT_NS_BEGIN {
m_resizing = false;
}
}
}

return true;
bool OfflineStorage_SQLite::StoreRecord(StorageRecord const& record)
{
// TODO: [MG] - this works, but may not play nicely with several LogManager instances
// static SqliteStatement sql_insert(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data);

if (!isValidRecord(record)) {
return false;
}

if (!m_db) {
LOG_ERROR("Failed to store event %s:%s: Database is not open",
tenantTokenToId(record.tenantToken).c_str(), record.id.c_str());
m_observer->OnStorageOpenFailed("Database is not open");
return false;
}

bool stored = false;
{
#ifdef ENABLE_LOCKING
LOCKGUARD(m_lock);
DbTransaction transaction(m_db.get());
if (!transaction.locked)
{
LOG_ERROR("Failed to store event %s:%s: Database error", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str());
m_observer->OnStorageFailed("Database error");
return false;
}
#endif
stored = insertRecordUnsafe(record);
}

if (!stored) {
// Report the write failure after the transaction has closed, so the
// observer callback never runs while BEGIN EXCLUSIVE is held.
m_observer->OnStorageFailed("Database write failed");
}

// Run the size-limit check after the transaction, matching the original
// per-record path (which ran it on every StoreRecord call).
checkStorageSizeLimits();

return stored;

}

size_t OfflineStorage_SQLite::StoreRecords(std::vector<StorageRecord> & records)
{
if (records.empty()) {
return 0;
}

// Validate (and report rejects) first -- before both the DB-open check and
// the transaction -- so reporting matches the single StoreRecord() (which
// validates before everything) regardless of whether the DB is open, and
// so that no observer callback runs while the BEGIN EXCLUSIVE transaction
// is held.
std::vector<const StorageRecord*> valid;
valid.reserve(records.size());
for (auto const& i : records) {
if (isValidRecord(i)) {
valid.push_back(&i);
}
}

if (valid.empty()) {
// Every record was invalid (already reported above). Match the single
// StoreRecord(), which returns after validation without checking
// DB-open.
return 0;
}

if (!m_db) {
LOG_ERROR("Failed to store %zu events: Database is not open", valid.size());
m_observer->OnStorageOpenFailed("Database is not open");
return 0;
}

size_t stored = 0;
for (auto & i : records) {
if (StoreRecord(i)) {
++stored;
size_t failed = 0;
{
// Batch all inserts into a single transaction: one BEGIN EXCLUSIVE /
// COMMIT (one fsync) for the whole flush instead of one per record.
#ifdef ENABLE_LOCKING
LOCKGUARD(m_lock);
DbTransaction transaction(m_db.get());
if (!transaction.locked)
{
LOG_ERROR("Failed to store %zu events: Database error", valid.size());
m_observer->OnStorageFailed("Database error");
return 0;
}
#endif
for (auto const* r : valid) {
if (insertRecordUnsafe(*r)) {
++stored;
}
else {
++failed;
}
}
}

if (failed) {
// Report write failures once, after the transaction has closed.
m_observer->OnStorageFailed("Database write failed");
}
Comment on lines +308 to 311

// Run the size-full notification / resize check once after the batch,
// matching the original per-record path (which ran it on every insert).
checkStorageSizeLimits();

return stored;
}

Expand Down
9 changes: 9 additions & 0 deletions lib/offline/OfflineStorage_SQLite.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ namespace MAT_NS_BEGIN {

private:
size_t GetRecordCountUnsafe(EventLatency latency) const;

// Validate a record's required fields; reports OnStorageFailed on rejection.
bool isValidRecord(StorageRecord const& record) const;
// Insert one already-validated record. Caller must hold m_lock and have an
// active DbTransaction (when ENABLE_LOCKING). Updates m_DbSizeEstimate.
// Returns false (without updating the size estimate) if the insert fails.
bool insertRecordUnsafe(StorageRecord const& record);
// Run the DB-size-full notification and resize checks (after inserts).
void checkStorageSizeLimits();
};


Expand Down
31 changes: 31 additions & 0 deletions tests/unittests/OfflineStorageTests_SQLite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,37 @@ TEST_F(OfflineStorageTests_SQLite, GetAndReservedReturnsStoredRecord)
EXPECT_THAT(consumer.records[0].reservedUntil, 0);
}

TEST_F(OfflineStorageTests_SQLite, StoreRecordsBatchStoresAllRecords)
{
initializeStorage();
std::vector<StorageRecord> batch;
const size_t kCount = 8;
for (size_t i = 0; i < kCount; i++)
{
batch.push_back({ "g" + std::to_string(i), "token", EventLatency_Normal,
EventPersistence_Normal, static_cast<int64_t>(i + 1), { static_cast<uint8_t>(i) } });
}

// Every record in the batch is stored and individually retrievable. (The
// single-transaction batching is a performance optimization verified by
// benchmarking; this test covers the batch's storage correctness.)
EXPECT_THAT(offlineStorage->StoreRecords(batch), kCount);

TestRecordConsumer consumer;
EXPECT_THAT(offlineStorage->GetAndReserveRecords(consumer, 100000), true);
ASSERT_THAT(consumer.records.size(), kCount);
for (size_t i = 0; i < kCount; i++)
{
std::string expectedId = "g" + std::to_string(i);
bool found = false;
for (auto const& r : consumer.records)
{
if (r.id == expectedId) { found = true; break; }
}
EXPECT_TRUE(found) << "record " << expectedId << " was not retrieved";
}
}

TEST_F(OfflineStorageTests_SQLite, ReservedRecordIsNotReturned)
{
initializeStorage();
Expand Down
Loading