Prevent event loss when a disk write fails during Flush()#1496
Prevent event loss when a disk write fails during Flush()#1496bmehta001 wants to merge 7 commits into
Conversation
OfflineStorageHandler::Flush() moved records out of the in-memory queue with GetRecords() (which removes them) before handing them to the disk store via StoreRecords(). If the disk write failed for some or all records (e.g. a SQLite write error), those records had already been removed from memory and were never re-queued, so events were silently lost -- only the smaller totalSaved count hinted at it. Reserve-then-confirm-delete instead: - Reserve the records in memory (GetAndReserveRecords with a nominal lease) so the originals stay retrievable rather than being dropped outright. - Persist them one at a time, recording exactly which ids succeeded and failed. - DeleteRecords() only the persisted ids (now safely on disk). - ReleaseRecords() the failed ids back into the in-memory queue so they are retried on a subsequent flush instead of being lost. Added OfflineStorageHandlerFlushTests.FailedDiskWriteDuringFlushReturnsRecordsToMemory: records that the SQLite store rejects (timestamp<=0) remain in memory after Flush(). Verified the test fails against the previous GetRecords()-based Flush (GetRecordCount()==0) and passes with this change. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
This PR fixes a data-loss scenario in OfflineStorageHandler::Flush() where records were removed from the in-memory queue before being safely persisted to the SQLite-backed disk store. The new approach reserves records first, persists them, then deletes only the successfully persisted records while releasing failed ones back to memory for retry.
Changes:
- Update
OfflineStorageHandler::Flush()to reserve records in memory, persist them individually to disk, then delete/release by ID based on per-record success. - Add a regression unit test ensuring records rejected by the SQLite store during
Flush()are returned to the in-memory queue. - Add supporting test scaffolding (observer/config mocks and a dispatcher that prevents background flush execution).
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
lib/offline/OfflineStorageHandler.cpp |
Switch Flush to reserve→persist→confirm-delete/release, preventing silent loss on disk write failures. |
tests/unittests/OfflineStorageTests.cpp |
Add regression test covering failed disk writes during Flush and ensuring records remain in memory. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| #include "NullObjects.hpp" | ||
|
|
||
| #include <sstream> | ||
|
|
| std::ostringstream dbPath; | ||
| dbPath << GetTempDirectory() << "FlushReserveTest.db"; | ||
| std::remove(dbPath.str().c_str()); | ||
| config[CFG_STR_CACHE_FILE_PATH] = dbPath.str(); | ||
| config[CFG_INT_RAM_QUEUE_SIZE] = 1024 * 1024; // enable the in-memory queue | ||
|
|
| handler.Shutdown(); | ||
| std::remove(dbPath.str().c_str()); | ||
| } |
OfflineStorageTests.cpp: add explicit #include <cstdio> for std::remove (was relying on transitive includes), and clean up the SQLite WAL companion files (-wal/-shm/-journal) in addition to the main .db via a RemoveDbFiles() helper, so the test does not leave temp files accumulating in the temp directory. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
| size_t dbSizeBeforeFlush = m_offlineStorageMemory->GetSize(); | ||
| if ((m_offlineStorageMemory) && (dbSizeBeforeFlush > 0) && (m_offlineStorageDisk)) | ||
| { |
| // Persist to disk one record at a time, tracking exactly which records | ||
| // were stored so we only drop the persisted ones from memory. | ||
| std::vector<StorageRecordId> persistedIds; | ||
| std::vector<StorageRecordId> failedIds; | ||
| persistedIds.reserve(records.size()); | ||
| for (auto& record : records) | ||
| { | ||
| if (m_offlineStorageDisk->StoreRecord(record)) | ||
| persistedIds.push_back(record.id); | ||
| else | ||
| failedIds.push_back(record.id); | ||
| } |
| std::ostringstream dbPath; | ||
| dbPath << GetTempDirectory() << "FlushReserveTest.db"; | ||
| RemoveDbFiles(dbPath.str()); |
OfflineStorageHandler::Flush(): null-guard the m_offlineStorageMemory->GetSize() read so Flush() is safe when the RAM queue is disabled (CFG_INT_RAM_QUEUE_SIZE==0) and only disk storage exists. OfflineStorageTests.cpp: use a unique per-run temp DB filename (append getUtcSystemTimeMs()) to avoid cross-run/parallel interference; keep cleanup best-effort. Per-record StoreRecord() in Flush is kept deliberately and now documented: a batched StoreRecords() only returns a count, so on a partial failure we could not tell which records to delete vs. retry, and re-storing already-persisted records would duplicate them (events table has no unique record_id constraint). The Room batched-JNI efficiency trade-off is noted for a possible follow-up (a batch disk-store API that reports per-record success). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
| class NoopTaskDispatcher : public ITaskDispatcher | ||
| { | ||
| public: | ||
| void Join() override {} | ||
| void Queue(Task* task) override { UNREFERENCED_PARAMETER(task); } | ||
| bool Cancel(Task* task, uint64_t waitTime = 0) override | ||
| { | ||
| UNREFERENCED_PARAMETER(task); | ||
| UNREFERENCED_PARAMETER(waitTime); | ||
| return true; | ||
| } | ||
| }; |
| { | ||
| if (m_offlineStorageDisk->StoreRecord(record)) | ||
| persistedIds.push_back(record.id); | ||
| else | ||
| failedIds.push_back(record.id); |
OfflineStorage_SQLite::StoreRecord: return false when the INSERT execute() fails (was ignoring the result and returning true even on a real write failure). The Flush() reserve/confirm-delete logic in this PR relies on the disk backend reporting per-record failure; without this, a failed sqlite3_step would still be treated as persisted and dropped from memory. (This is the same one-line fix as PR microsoft#1491, included here so this PR is correct on its own; trivial overlap that resolves cleanly at merge.) OfflineStorageTests.cpp NoopTaskDispatcher: own queued tasks and delete them on Cancel()/Join()/destruction instead of dropping the Task* (PAL::scheduleTask allocates with new and expects the dispatcher to take ownership), so the helper cannot leak. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
| for (auto& record : records) | ||
| { | ||
| if (m_offlineStorageDisk->StoreRecord(record)) | ||
| persistedIds.push_back(record.id); | ||
| else |
Flush() reserves records with a non-zero lease, which stamps reservedUntil on the records handed to the consumer. SQLite ignores reservedUntil on insert, but the Room backend persists it, so freshly flushed records would be written as "reserved" for ~2 minutes and become temporarily ineligible for upload selection. Clear record.reservedUntil before the disk StoreRecord() so the in-memory lease never leaks to durable storage. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
| // timestamp <= 0 is accepted by the in-memory queue but rejected by the | ||
| // SQLite disk store, simulating a disk write failure during flush. | ||
| const size_t kCount = 5; |
The test comment said it simulates a "disk write failure"; in fact StoreRecord() fails via the SQLite backend's input validation (timestamp <= 0), not a literal disk I/O error. Reword to describe the actual mechanism while noting it exercises the same Flush() failure-handling path (a failed record is returned to memory). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
| // Persist to disk one record at a time, tracking exactly which records | ||
| // were stored so we only drop the persisted ones from memory. This | ||
| // intentionally favors correctness over batching: a batched StoreRecords() | ||
| // only returns a count, so on a partial failure we could not tell which | ||
| // records to keep vs. retry, and re-storing already-saved records would | ||
| // duplicate them (the events table has no unique record_id constraint). | ||
| std::vector<StorageRecordId> persistedIds; |
| // Reserve the records in memory (with a nominal lease) instead of | ||
| // removing them outright. The lease duration is not time-critical | ||
| // here because the records are persisted and resolved synchronously | ||
| // under m_flushLock; reserving simply keeps the originals retrievable | ||
| // so any that fail to persist can be returned to the queue. | ||
| const unsigned reserveLeaseMs = 120000; | ||
| std::vector<StorageRecord> records; | ||
| auto consumer = [&records](StorageRecord&& record) -> bool { | ||
| records.push_back(std::move(record)); |
Replace the reserve/confirm-delete approach with the simpler drain-and-re-insert variant Copilot originally suggested. Flush() now drains the in-memory queue into a single local batch (GetRecords), persists each record, and re-inserts only the records that fail to persist. This keeps the data-loss fix (failed records are returned to memory, not dropped) while resolving two issues with the reserve approach: - Peak memory: it no longer holds two full copies of every record (reserved map + local vector) during the disk-write loop -- only a single copy is in flight. - It no longer sets a reservation lease, so there is no reservedUntil to clear and no risk of the Room backend persisting a stale reservation. Per-record StoreRecord() is still used deliberately (documented): a batched StoreRecords() only returns a count, so on partial failure we could not tell which records to re-queue, and re-storing already-saved records would duplicate them (no unique record_id constraint). The Room per-record JNI cost remains a possible follow-up that would need a public IOfflineStorage batch-with-results API. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
| // Persist one record at a time so we know exactly which succeeded. A | ||
| // batched StoreRecords() only returns a count, so on a partial failure | ||
| // we could not tell which records to re-queue, and re-storing | ||
| // already-saved records would duplicate them (the events table has no | ||
| // unique record_id constraint). | ||
| size_t totalSaved = 0; | ||
| size_t totalFailed = 0; | ||
| for (auto& record : records) | ||
| { | ||
| if (m_offlineStorageDisk->StoreRecord(record)) | ||
| { | ||
| ++totalSaved; | ||
| } | ||
| else | ||
| { | ||
| // Return the record to the in-memory queue for retry on a | ||
| // subsequent flush instead of dropping it. | ||
| ++totalFailed; | ||
| m_offlineStorageMemory->StoreRecord(record); |
| if (!SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast<int>(record.latency), static_cast<int>(record.persistence), record.timestamp, record.blob)) | ||
| { | ||
| LOG_ERROR("Failed to store event %s:%s: database write failed", | ||
| tenantTokenToId(record.tenantToken).c_str(), record.id.c_str()); | ||
| m_observer->OnStorageFailed("Database write failed"); | ||
| return false; |
Combine the Flush() data-loss fix into this storage-data-safety PR (the two are halves of the same fix: this PR already makes OfflineStorage_SQLite::StoreRecord report write failures; Flush() must act on that). OfflineStorageHandler::Flush() previously drained the in-memory queue with GetRecords() (which removes records) and handed them to StoreRecords() before confirming persistence. On a partial/total disk write failure the un-persisted records were already gone from memory and never re-queued -> events lost. Flush() now drains into a local batch, persists one record at a time, and re-inserts only the records that fail to persist (so failures are retried, not lost). Per-record StoreRecord() is used deliberately: a batched StoreRecords() only returns a count, so on a partial failure we could not tell which records to re-queue, and re-storing already-saved records would duplicate them (no unique record_id constraint). Also null-guards the dbSizeBeforeFlush read so Flush() is safe with disk-only storage (CFG_INT_RAM_QUEUE_SIZE == 0). Adds OfflineStorageHandlerFlushTests.FailedDiskWriteDuringFlushReturnsRecordsToMemory (records the SQLite store rejects stay in memory after Flush; verified it fails against the previous GetRecords()-based Flush). Closes the separate PR microsoft#1496. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
Folded into #1491 — the Flush() event-loss fix and the StoreRecord-failure-reporting fix are two halves of the same offline-storage data-safety change, so they're now combined in #1491 (which already contained the StoreRecord half). The Flush logic here (drain-and-re-insert-failed) and its regression test were moved verbatim into #1491. |
Summary
Fixes a latent event-loss bug in
OfflineStorageHandler::Flush(), surfaced by Copilot's review of #1491.Flush()pulled records out of the in-memory queue withGetRecords()— which removes them — before writing them to the disk store viaStoreRecords(). If the disk write failed for some or all records (e.g. a SQLite write error), those records had already been dropped from memory and were never re-queued, so the events were silently lost; only a smallertotalSavedcount hinted at it.Fix — reserve, then confirm-delete
GetRecords()(removes immediately)GetAndReserveRecords()with a nominal lease (originals retained in the reserved set)StoreRecords()(count only)StoreRecord(), tracking which ids succeeded/failedDeleteRecords(persistedIds)— dropped only once safely on diskReleaseRecords(failedIds)— returned to the in-memory queue, retried next flushThe reserve lease is nominal: records are reserved and resolved synchronously under
m_flushLock, andreservedUntilhas no reaper inMemoryStorage.Test
OfflineStorageHandlerFlushTests.FailedDiskWriteDuringFlushReturnsRecordsToMemory: records the SQLite store rejects (timestamp <= 0, accepted by the RAM queue but rejected on disk) must still be in memory afterFlush().GetRecords()-based Flush (GetRecordCount() == 0— records lost).GetRecordCount() == 5).Validation
WSL
UnitTests: storage + LogManager suites 82/82 pass, including the new test.Context
Split out of the #1491 review as requested, keeping #1491 scoped to its original empty-filter-delete guard and synchronous-store failure propagation.