Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions lib/api/capi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ evt_status_t mat_open_core(
{
if ((config == nullptr) || (config[0] == 0))
{
// Invalid configuration
// Invalid configuration. ctx is guaranteed non-null by the callers
// (mat_open / mat_open_with_params); set result and a known-invalid
// handle so callers don't observe a stale ctx->handle on error.
ctx->result = static_cast<evt_status_t>(EFAULT);
ctx->handle = 0;
return EFAULT;
}

Expand All @@ -91,7 +95,12 @@ evt_status_t mat_open_core(
{
if (client->ctx_data == config)
{
// Guest instance with the same config is already open
// Guest instance with the same config is already open.
// Return its handle so the caller still gets a usable handle
// (rather than leaving ctx->handle uninitialized), and set
// ctx->result to match the returned status like the other paths.
ctx->handle = code;
Comment on lines +98 to +102
ctx->result = static_cast<evt_status_t>(EALREADY);
return EALREADY;
}
// hash code is assigned to another client, increment and retry
Expand Down Expand Up @@ -143,6 +152,11 @@ evt_status_t mat_open_core(
}
catch (...)
{
// Roll back the partially-populated client so a later open with the
// same config does not find stale half-initialized state.
remove_client(code);
ctx->result = static_cast<evt_status_t>(EFAULT);
ctx->handle = 0;
return EFAULT;
}
}
Expand All @@ -158,6 +172,11 @@ evt_status_t mat_open_core(
}
catch (...)
{
// Roll back the partially-populated client so a later open with the
// same config does not find stale half-initialized state.
remove_client(code);
ctx->result = static_cast<evt_status_t>(EFAULT);
ctx->handle = 0;
return EFAULT;
}
}
Expand Down
13 changes: 8 additions & 5 deletions lib/offline/OfflineStorageHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ namespace MAT_NS_BEGIN {
m_shutdownStarted(false),
m_memoryDbSize(0),
m_queryDbSize(0),
m_cacheMemorySizeLimitInBytes(0),
m_isStorageFullNotificationSend(false)
{
// TODO: [MG] - OfflineStorage_SQLite.cpp is performing similar checks
Expand Down Expand Up @@ -83,7 +84,7 @@ namespace MAT_NS_BEGIN {
void OfflineStorageHandler::Initialize(IOfflineStorageObserver& observer)
{
m_observer = &observer;
uint32_t cacheMemorySizeLimitInBytes = m_config[CFG_INT_RAM_QUEUE_SIZE];
m_cacheMemorySizeLimitInBytes = m_config[CFG_INT_RAM_QUEUE_SIZE];

m_offlineStorageDisk = OfflineStorageFactory::Create(m_logManager, m_config);
if (m_offlineStorageDisk)
Expand All @@ -94,7 +95,7 @@ namespace MAT_NS_BEGIN {
// TODO: [MG] - consider passing m_offlineStorageDisk to m_offlineStorageMemory,
// so that the Flush() op on memory storage leads to saving unflushed events to
// disk.
if (cacheMemorySizeLimitInBytes > 0)
if (m_cacheMemorySizeLimitInBytes > 0)
{
m_offlineStorageMemory.reset(new MemoryStorage(m_logManager, m_config));
m_offlineStorageMemory->Initialize(*this);
Expand Down Expand Up @@ -174,7 +175,7 @@ namespace MAT_NS_BEGIN {
// than the handle gets replaced by nullptr in this DeferredCallbackHandle obj.
m_flushHandle.Cancel();

size_t dbSizeBeforeFlush = m_offlineStorageMemory->GetSize();
size_t dbSizeBeforeFlush = (m_offlineStorageMemory != nullptr) ? m_offlineStorageMemory->GetSize() : 0;
if ((m_offlineStorageMemory) && (dbSizeBeforeFlush > 0) && (m_offlineStorageDisk))
{
// This will block on and then take a lock for the duration of this move, and
Expand Down Expand Up @@ -233,8 +234,10 @@ namespace MAT_NS_BEGIN {
return false;
}

// Check cache size only once at start
static uint32_t cacheMemorySizeLimitInBytes = m_config[CFG_INT_RAM_QUEUE_SIZE];
// Cache size limit is per-instance config computed once in Initialize();
// it must NOT be a function-local static, which would share the first
// LogManager's value with every other LogManager instance.
uint32_t cacheMemorySizeLimitInBytes = m_cacheMemorySizeLimitInBytes;

if (nullptr != m_offlineStorageMemory && !m_shutdownStarted)
{
Expand Down
1 change: 1 addition & 0 deletions lib/offline/OfflineStorageHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ namespace MAT_NS_BEGIN {
unsigned m_memoryDbSize;
unsigned m_memoryDbSizeNotificationLimit;
unsigned m_queryDbSize;
uint32_t m_cacheMemorySizeLimitInBytes;
bool m_isStorageFullNotificationSend;

protected:
Expand Down
14 changes: 13 additions & 1 deletion lib/pal/TaskDispatcher_CAPI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <algorithm>
#include <atomic>
#include <exception>
#include <memory>
#include <mutex>
#include <map>
Expand Down Expand Up @@ -37,7 +38,18 @@ namespace PAL_NS_BEGIN {
void OnCallback()
{
if (m_task) {
(*m_task)();
// The task is host/user code running on the external dispatcher's
// thread; an exception escaping here would terminate the process.
// Log it (mirroring WorkerThread) instead of swallowing silently.
try {
(*m_task)();
}
catch (const std::exception& ex) {
LOG_ERROR("Unhandled exception in CAPI task: %s", ex.what());
}
catch (...) {
LOG_ERROR("Unhandled non-standard exception in CAPI task");
}
}
ReleaseItem();
}
Expand Down
16 changes: 15 additions & 1 deletion lib/pal/WorkerThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include "pal/WorkerThread.hpp"
#include "pal/PAL.hpp"

#include <exception>

#if defined(MATSDK_PAL_CPP11) || defined(MATSDK_PAL_WIN32)

/* Maximum scheduler interval for SDK is 1 hour required for clamping in case of monotonic clock drift */
Expand Down Expand Up @@ -238,7 +240,19 @@ namespace PAL_NS_BEGIN {
// Item wasn't cancelled before it could be executed
if (self->m_itemInProgress != nullptr) {
LOG_TRACE("%10llu Execute item=%p type=%s\n", wakeupCount, item.get(), item.get()->TypeName.c_str() );
(*item)();
// A task can run arbitrary work (storage I/O, HTTP encode, and
// user DebugEventListener callbacks). An exception escaping here
// would unwind out of the thread entry function and call
// std::terminate, killing the host process. Contain it.
try {
(*item)();
}
catch (const std::exception& ex) {
LOG_ERROR("Unhandled exception in worker task: %s", ex.what());
}
catch (...) {
LOG_ERROR("Unhandled non-standard exception in worker task");
Comment on lines +250 to +254
}
self->m_itemInProgress = nullptr;
}

Expand Down
41 changes: 41 additions & 0 deletions tests/unittests/PalTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@

#include "common/Common.hpp"
#include "pal/PseudoRandomGenerator.hpp"
#include "pal/TaskDispatcher.hpp"
#include "pal/WorkerThread.hpp"
#include "Version.hpp"

#include <atomic>
#include <cstdint>
#include <stdexcept>
#include <string>

#ifdef HAVE_MAT_LOGGING
Expand Down Expand Up @@ -180,6 +184,43 @@ TEST_F(PalTests, SdkVersion)
EXPECT_THAT(PAL::getSdkVersion(), Eq(v));
}

namespace
{
class ThrowingTaskHelper
{
public:
void ThrowStdException() { throw std::runtime_error("worker task boom"); }
void ThrowNonStdException() { throw 123; }
void Signal(std::atomic<bool>* ran) { ran->store(true); }
};
}

// A task throwing an exception must be contained by the worker thread loop;
// otherwise the exception unwinds out of the thread entry function and calls
// std::terminate, killing the host process.
TEST_F(PalTests, WorkerThreadContainsThrowingTask)
{
auto dispatcher = PAL::WorkerThreadFactory::Create();
ThrowingTaskHelper helper;
std::atomic<bool> ranAfterStdThrow(false);
std::atomic<bool> ranAfterNonStdThrow(false);

PAL::dispatchTask(dispatcher.get(), &helper, &ThrowingTaskHelper::ThrowStdException);
PAL::dispatchTask(dispatcher.get(), &helper, &ThrowingTaskHelper::Signal, &ranAfterStdThrow);

PAL::dispatchTask(dispatcher.get(), &helper, &ThrowingTaskHelper::ThrowNonStdException);
PAL::dispatchTask(dispatcher.get(), &helper, &ThrowingTaskHelper::Signal, &ranAfterNonStdThrow);

// Wait for the follow-up tasks to run, proving the thread survived each throw.
for (int i = 0; i < 500 && !(ranAfterStdThrow.load() && ranAfterNonStdThrow.load()); ++i)
PAL::sleep(10);

EXPECT_TRUE(ranAfterStdThrow.load());
EXPECT_TRUE(ranAfterNonStdThrow.load());

dispatcher->Join();
}

#ifdef HAVE_MAT_LOGGING
class LogInitTest : public Test
{
Expand Down
21 changes: 21 additions & 0 deletions tests/unittests/TaskDispatcherCAPITests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include "pal/typename.hpp"
#include "mat.h"

#include <stdexcept>

using namespace testing;
using namespace MAT;
using namespace PAL;
Expand Down Expand Up @@ -227,3 +229,22 @@ TEST(TaskDispatcherCAPITests, Join)
EXPECT_EQ(wasJoined, true);
}

TEST(TaskDispatcherCAPITests, ExecuteCallbackThatThrowsIsContained)
{
TaskDispatcher_CAPI taskDispatcher(&OnTaskDispatcherQueue, &OnTaskDispatcherCancel, &OnTaskDispatcherJoin);

AutoTestHelper testHelper;
testHelper->SetShouldExecute(true);

bool wasExecuted = false;
testHelper->SetCallbackValidation([&wasExecuted](int /*param1*/, int /*param2*/) {
wasExecuted = true;
throw std::runtime_error("task threw");
});

// The dispatcher must contain the exception so it never escapes back into
// the host's dispatcher thread (which would terminate the process).
EXPECT_NO_THROW(dispatchTask(&taskDispatcher, testHelper.get(), &TestHelper::Callback, 10 /*param1*/, 20 /*param2*/));
EXPECT_EQ(wasExecuted, true);
}

Loading