diff --git a/lib/offline/OfflineStorage_SQLite.cpp b/lib/offline/OfflineStorage_SQLite.cpp index b9b2ed83d..02134e5c4 100644 --- a/lib/offline/OfflineStorage_SQLite.cpp +++ b/lib/offline/OfflineStorage_SQLite.cpp @@ -147,40 +147,31 @@ namespace MAT_NS_BEGIN { m_db->execute(command.c_str()); } - bool OfflineStorage_SQLite::StoreRecord(StorageRecord const& record) + bool OfflineStorage_SQLite::isValidRecord(StorageRecord const& record) const { - // TODO: [MG] - this works, but may not play nicely with several LogManager instances - // static SqliteStatement sql_insert(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data); - if (record.id.empty() || record.tenantToken.empty() || static_cast(record.latency) < 0 || record.timestamp <= 0) { LOG_ERROR("Failed to store event %s:%s: Invalid parameters", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); m_observer->OnStorageFailed("Invalid parameters"); return false; } + return true; + } - if (!m_db) { - LOG_ERROR("Failed to store event %s:%s: Database is not open", + bool OfflineStorage_SQLite::insertRecordUnsafe(StorageRecord const& record) + { + if (!SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast(record.latency), static_cast(record.persistence), record.timestamp, record.blob)) + { + LOG_ERROR("Failed to store event %s:%s: database write failed", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); - m_observer->OnStorageOpenFailed("Database is not open"); return false; } + m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size(); + return true; + } - { -#ifdef ENABLE_LOCKING - LOCKGUARD(m_lock); - DbTransaction transaction(m_db.get()); - if (!transaction.locked) - { - LOG_ERROR("Failed to store event %s:%s: Database error", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); - m_observer->OnStorageFailed("Database error"); - return false; - } -#endif - SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast(record.latency), static_cast(record.persistence), record.timestamp, record.blob); - m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size(); - } - + void OfflineStorage_SQLite::checkStorageSizeLimits() + { if ((m_DbSizeNotificationLimit != 0) && (m_DbSizeEstimate>m_DbSizeNotificationLimit)) { auto now = PAL::getMonotonicTimeMs(); @@ -210,19 +201,119 @@ namespace MAT_NS_BEGIN { m_resizing = false; } } + } - return true; + bool OfflineStorage_SQLite::StoreRecord(StorageRecord const& record) + { + // TODO: [MG] - this works, but may not play nicely with several LogManager instances + // static SqliteStatement sql_insert(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data); + + if (!isValidRecord(record)) { + return false; + } + + if (!m_db) { + LOG_ERROR("Failed to store event %s:%s: Database is not open", + tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); + m_observer->OnStorageOpenFailed("Database is not open"); + return false; + } + + bool stored = false; + { +#ifdef ENABLE_LOCKING + LOCKGUARD(m_lock); + DbTransaction transaction(m_db.get()); + if (!transaction.locked) + { + LOG_ERROR("Failed to store event %s:%s: Database error", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); + m_observer->OnStorageFailed("Database error"); + return false; + } +#endif + stored = insertRecordUnsafe(record); + } + + if (!stored) { + // Report the write failure after the transaction has closed, so the + // observer callback never runs while BEGIN EXCLUSIVE is held. + m_observer->OnStorageFailed("Database write failed"); + } + + // Run the size-limit check after the transaction, matching the original + // per-record path (which ran it on every StoreRecord call). + checkStorageSizeLimits(); + + return stored; } size_t OfflineStorage_SQLite::StoreRecords(std::vector & records) { + if (records.empty()) { + return 0; + } + + // Validate (and report rejects) first -- before both the DB-open check and + // the transaction -- so reporting matches the single StoreRecord() (which + // validates before everything) regardless of whether the DB is open, and + // so that no observer callback runs while the BEGIN EXCLUSIVE transaction + // is held. + std::vector valid; + valid.reserve(records.size()); + for (auto const& i : records) { + if (isValidRecord(i)) { + valid.push_back(&i); + } + } + + if (valid.empty()) { + // Every record was invalid (already reported above). Match the single + // StoreRecord(), which returns after validation without checking + // DB-open. + return 0; + } + + if (!m_db) { + LOG_ERROR("Failed to store %zu events: Database is not open", valid.size()); + m_observer->OnStorageOpenFailed("Database is not open"); + return 0; + } + size_t stored = 0; - for (auto & i : records) { - if (StoreRecord(i)) { - ++stored; + size_t failed = 0; + { + // Batch all inserts into a single transaction: one BEGIN EXCLUSIVE / + // COMMIT (one fsync) for the whole flush instead of one per record. +#ifdef ENABLE_LOCKING + LOCKGUARD(m_lock); + DbTransaction transaction(m_db.get()); + if (!transaction.locked) + { + LOG_ERROR("Failed to store %zu events: Database error", valid.size()); + m_observer->OnStorageFailed("Database error"); + return 0; } +#endif + for (auto const* r : valid) { + if (insertRecordUnsafe(*r)) { + ++stored; + } + else { + ++failed; + } + } + } + + if (failed) { + // Report write failures once, after the transaction has closed. + m_observer->OnStorageFailed("Database write failed"); } + + // Run the size-full notification / resize check once after the batch, + // matching the original per-record path (which ran it on every insert). + checkStorageSizeLimits(); + return stored; } diff --git a/lib/offline/OfflineStorage_SQLite.hpp b/lib/offline/OfflineStorage_SQLite.hpp index 18643cde5..1d32a4c77 100644 --- a/lib/offline/OfflineStorage_SQLite.hpp +++ b/lib/offline/OfflineStorage_SQLite.hpp @@ -122,6 +122,15 @@ namespace MAT_NS_BEGIN { private: size_t GetRecordCountUnsafe(EventLatency latency) const; + + // Validate a record's required fields; reports OnStorageFailed on rejection. + bool isValidRecord(StorageRecord const& record) const; + // Insert one already-validated record. Caller must hold m_lock and have an + // active DbTransaction (when ENABLE_LOCKING). Updates m_DbSizeEstimate. + // Returns false (without updating the size estimate) if the insert fails. + bool insertRecordUnsafe(StorageRecord const& record); + // Run the DB-size-full notification and resize checks (after inserts). + void checkStorageSizeLimits(); }; diff --git a/tests/unittests/OfflineStorageTests_SQLite.cpp b/tests/unittests/OfflineStorageTests_SQLite.cpp index e90b0a9ae..1550211c8 100644 --- a/tests/unittests/OfflineStorageTests_SQLite.cpp +++ b/tests/unittests/OfflineStorageTests_SQLite.cpp @@ -153,6 +153,37 @@ TEST_F(OfflineStorageTests_SQLite, GetAndReservedReturnsStoredRecord) EXPECT_THAT(consumer.records[0].reservedUntil, 0); } +TEST_F(OfflineStorageTests_SQLite, StoreRecordsBatchStoresAllRecords) +{ + initializeStorage(); + std::vector batch; + const size_t kCount = 8; + for (size_t i = 0; i < kCount; i++) + { + batch.push_back({ "g" + std::to_string(i), "token", EventLatency_Normal, + EventPersistence_Normal, static_cast(i + 1), { static_cast(i) } }); + } + + // Every record in the batch is stored and individually retrievable. (The + // single-transaction batching is a performance optimization verified by + // benchmarking; this test covers the batch's storage correctness.) + EXPECT_THAT(offlineStorage->StoreRecords(batch), kCount); + + TestRecordConsumer consumer; + EXPECT_THAT(offlineStorage->GetAndReserveRecords(consumer, 100000), true); + ASSERT_THAT(consumer.records.size(), kCount); + for (size_t i = 0; i < kCount; i++) + { + std::string expectedId = "g" + std::to_string(i); + bool found = false; + for (auto const& r : consumer.records) + { + if (r.id == expectedId) { found = true; break; } + } + EXPECT_TRUE(found) << "record " << expectedId << " was not retrieved"; + } +} + TEST_F(OfflineStorageTests_SQLite, ReservedRecordIsNotReturned) { initializeStorage();