Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
287 changes: 285 additions & 2 deletions ext/duckdb/function_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ static struct callback_request *g_request_list = NULL;
static VALUE g_executor_thread = Qnil;
static int g_executor_started = 0;

/*
* GC-protection array holding every live per-worker proxy Ruby thread.
* Proxies are created from non-Ruby init hooks (via the global executor) and
* are not reachable from any marked object, so without this array the GC could
* collect a proxy thread while DuckDB still dispatches callbacks to it.
*/
static VALUE g_proxy_threads = Qnil;

/* Data passed to the executor wait function */
struct executor_wait_data {
struct callback_request *request;
Expand Down Expand Up @@ -166,6 +174,11 @@ void rbduckdb_function_executor_ensure_started(void) {
}
#endif

if (g_proxy_threads == Qnil) {
g_proxy_threads = rb_ary_new();
rb_global_variable(&g_proxy_threads);
}

g_executor_thread = rb_thread_create(executor_thread_func, NULL);
rb_global_variable(&g_executor_thread);
g_executor_started = 1;
Expand Down Expand Up @@ -242,7 +255,270 @@ static void *callback_with_gvl(void *data) {
return NULL;
}

void rbduckdb_function_executor_dispatch(rbduckdb_function_callback_t cb, void *user_data) {
/*
* ============================================================================
* Per-worker proxy thread
* ============================================================================
*
* One dedicated Ruby thread per DuckDB worker thread. Same hand-off protocol as
* the global executor (mutex + condvars), but private to a single worker so
* that callbacks from different workers no longer serialize through one queue.
*
* Pattern follows the FFI gem's async callback dispatcher:
* https://github.com/ffi/ffi/blob/master/ext/ffi_c/Function.c
*/
struct worker_proxy {
VALUE ruby_thread;
volatile int stop_requested;
rbduckdb_function_callback_t callback_func;
void *callback_data;
volatile int has_request;
volatile int request_done;
volatile int thread_exited;
#ifdef _MSC_VER
CRITICAL_SECTION lock;
CONDITION_VARIABLE request_cond;
CONDITION_VARIABLE request_done_cond;
CONDITION_VARIABLE thread_exit_cond;
#else
pthread_mutex_t lock;
pthread_cond_t request_cond;
pthread_cond_t request_done_cond;
pthread_cond_t thread_exit_cond;
#endif
};

/* Runs without GVL: the proxy waits for a callback request */
static void *proxy_wait_func(void *data) {
struct worker_proxy *proxy = (struct worker_proxy *)data;

#ifdef _MSC_VER
EnterCriticalSection(&proxy->lock);
while (!proxy->stop_requested && !proxy->has_request) {
SleepConditionVariableCS(&proxy->request_cond, &proxy->lock, INFINITE);
}
LeaveCriticalSection(&proxy->lock);
#else
pthread_mutex_lock(&proxy->lock);
while (!proxy->stop_requested && !proxy->has_request) {
pthread_cond_wait(&proxy->request_cond, &proxy->lock);
}
pthread_mutex_unlock(&proxy->lock);
#endif

return NULL;
}

/* Unblock function for the proxy thread (VM shutdown or Thread#kill) */
static void proxy_stop_func(void *data) {
struct worker_proxy *proxy = (struct worker_proxy *)data;

#ifdef _MSC_VER
EnterCriticalSection(&proxy->lock);
proxy->stop_requested = 1;
WakeConditionVariable(&proxy->request_cond);
LeaveCriticalSection(&proxy->lock);
#else
pthread_mutex_lock(&proxy->lock);
proxy->stop_requested = 1;
pthread_cond_signal(&proxy->request_cond);
pthread_mutex_unlock(&proxy->lock);
#endif
}

/* The proxy thread main loop (Ruby thread) */
static VALUE proxy_thread_func(void *data) {
struct worker_proxy *proxy = (struct worker_proxy *)data;

while (!proxy->stop_requested) {
/* Release the GVL and wait for a request */
rb_thread_call_without_gvl(proxy_wait_func, proxy, proxy_stop_func, proxy);

if (proxy->stop_requested) break;

if (proxy->has_request) {
/* Execute the callback with the GVL held */
proxy->callback_func(proxy->callback_data);

/* Signal completion to the DuckDB worker thread */
#ifdef _MSC_VER
EnterCriticalSection(&proxy->lock);
proxy->has_request = 0;
proxy->request_done = 1;
WakeConditionVariable(&proxy->request_done_cond);
LeaveCriticalSection(&proxy->lock);
#else
pthread_mutex_lock(&proxy->lock);
proxy->has_request = 0;
proxy->request_done = 1;
pthread_cond_signal(&proxy->request_done_cond);
pthread_mutex_unlock(&proxy->lock);
#endif
}
}

/* Stop being GC-protected now that we are about to exit */
if (g_proxy_threads != Qnil) {
rb_ary_delete(g_proxy_threads, proxy->ruby_thread);
}

/*
* Signal that this thread has finished and no longer touches the proxy
* struct. Only after this may rbduckdb_worker_proxy_destroy free it.
*/
#ifdef _MSC_VER
EnterCriticalSection(&proxy->lock);
proxy->thread_exited = 1;
WakeConditionVariable(&proxy->thread_exit_cond);
LeaveCriticalSection(&proxy->lock);
#else
pthread_mutex_lock(&proxy->lock);
proxy->thread_exited = 1;
pthread_cond_signal(&proxy->thread_exit_cond);
pthread_mutex_unlock(&proxy->lock);
#endif

return Qnil;
}

struct worker_proxy *rbduckdb_worker_proxy_create(void) {
/*
* Use calloc (not xcalloc): rbduckdb_worker_proxy_destroy frees the struct
* from a non-Ruby thread where xfree is unsafe.
*/
struct worker_proxy *proxy = calloc(1, sizeof(struct worker_proxy));
if (proxy == NULL) {
rb_raise(rb_eNoMemError, "failed to allocate worker_proxy");
}

proxy->stop_requested = 0;
proxy->has_request = 0;
proxy->request_done = 0;
proxy->thread_exited = 0;

#ifdef _MSC_VER
InitializeCriticalSection(&proxy->lock);
InitializeConditionVariable(&proxy->request_cond);
InitializeConditionVariable(&proxy->request_done_cond);
InitializeConditionVariable(&proxy->thread_exit_cond);
#else
pthread_mutex_init(&proxy->lock, NULL);
pthread_cond_init(&proxy->request_cond, NULL);
pthread_cond_init(&proxy->request_done_cond, NULL);
pthread_cond_init(&proxy->thread_exit_cond, NULL);
#endif

proxy->ruby_thread = rb_thread_create(proxy_thread_func, proxy);

if (g_proxy_threads != Qnil) {
rb_ary_push(g_proxy_threads, proxy->ruby_thread);
}

return proxy;
}

/*
* Hand a callback to a proxy and block until it completes.
* Called from the DuckDB worker thread (non-Ruby thread) that owns this proxy.
*/
static void dispatch_callback_to_proxy(struct worker_proxy *proxy, rbduckdb_function_callback_t cb, void *user_data) {
#ifdef _MSC_VER
EnterCriticalSection(&proxy->lock);
proxy->callback_func = cb;
proxy->callback_data = user_data;
proxy->request_done = 0;
proxy->has_request = 1;
WakeConditionVariable(&proxy->request_cond);
LeaveCriticalSection(&proxy->lock);

EnterCriticalSection(&proxy->lock);
while (!proxy->request_done) {
SleepConditionVariableCS(&proxy->request_done_cond, &proxy->lock, INFINITE);
}
LeaveCriticalSection(&proxy->lock);
#else
pthread_mutex_lock(&proxy->lock);
proxy->callback_func = cb;
proxy->callback_data = user_data;
proxy->request_done = 0;
proxy->has_request = 1;
pthread_cond_signal(&proxy->request_cond);
pthread_mutex_unlock(&proxy->lock);

pthread_mutex_lock(&proxy->lock);
while (!proxy->request_done) {
pthread_cond_wait(&proxy->request_done_cond, &proxy->lock);
}
pthread_mutex_unlock(&proxy->lock);
#endif
}

/* Blocks until the proxy thread has fully exited. Runs without the GVL. */
static void *proxy_join_func(void *data) {
struct worker_proxy *proxy = (struct worker_proxy *)data;

#ifdef _MSC_VER
EnterCriticalSection(&proxy->lock);
while (!proxy->thread_exited) {
SleepConditionVariableCS(&proxy->thread_exit_cond, &proxy->lock, INFINITE);
}
LeaveCriticalSection(&proxy->lock);
#else
pthread_mutex_lock(&proxy->lock);
while (!proxy->thread_exited) {
pthread_cond_wait(&proxy->thread_exit_cond, &proxy->lock);
}
pthread_mutex_unlock(&proxy->lock);
#endif

return NULL;
}

void rbduckdb_worker_proxy_destroy(void *data) {
struct worker_proxy *proxy = (struct worker_proxy *)data;
if (proxy == NULL) return;

/* Ask the proxy thread to stop. */
#ifdef _MSC_VER
EnterCriticalSection(&proxy->lock);
proxy->stop_requested = 1;
WakeConditionVariable(&proxy->request_cond);
LeaveCriticalSection(&proxy->lock);
#else
pthread_mutex_lock(&proxy->lock);
proxy->stop_requested = 1;
pthread_cond_signal(&proxy->request_cond);
pthread_mutex_unlock(&proxy->lock);
#endif

/*
* Wait until the proxy thread has fully exited. Before exiting it runs Ruby
* code (removing itself from the GC-protection array), which needs the GVL.
* DuckDB may invoke this destructor either from a worker thread (no GVL) or
* — depending on when it tears down the local state — from a Ruby thread
* that holds the GVL. In the latter case we must release the GVL while
* waiting, or the proxy thread could never acquire it and we would deadlock.
*/
if (ruby_native_thread_p() && ruby_thread_has_gvl_p()) {
rb_thread_call_without_gvl(proxy_join_func, proxy, NULL, NULL);
} else {
proxy_join_func(proxy);
}

/* The proxy thread is gone; tear down OS primitives and free the struct. */
#ifdef _MSC_VER
DeleteCriticalSection(&proxy->lock);
#else
pthread_cond_destroy(&proxy->thread_exit_cond);
pthread_cond_destroy(&proxy->request_done_cond);
pthread_cond_destroy(&proxy->request_cond);
pthread_mutex_destroy(&proxy->lock);
#endif

free(proxy);
}

void rbduckdb_function_executor_dispatch_via_proxy(rbduckdb_function_callback_t cb, void *user_data, struct worker_proxy *proxy) {
if (ruby_native_thread_p()) {
if (ruby_thread_has_gvl_p()) {
/* Case 1: Ruby thread with GVL - call directly */
Expand All @@ -254,8 +530,15 @@ void rbduckdb_function_executor_dispatch(rbduckdb_function_callback_t cb, void *
arg.user_data = user_data;
rb_thread_call_with_gvl(callback_with_gvl, &arg);
}
} else if (proxy != NULL) {
/* Case 3a: Non-Ruby thread with a per-worker proxy */
dispatch_callback_to_proxy(proxy, cb, user_data);
} else {
/* Case 3: Non-Ruby thread - dispatch to executor */
/* Case 3b: Non-Ruby thread - dispatch to the global executor */
dispatch_callback_to_executor(cb, user_data);
}
}

void rbduckdb_function_executor_dispatch(rbduckdb_function_callback_t cb, void *user_data) {
rbduckdb_function_executor_dispatch_via_proxy(cb, user_data, NULL);
}
39 changes: 39 additions & 0 deletions ext/duckdb/function_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,43 @@ void rbduckdb_function_executor_ensure_started(void);
*/
void rbduckdb_function_executor_dispatch(rbduckdb_function_callback_t cb, void *user_data);

/*
* ============================================================================
* Per-worker proxy threads (DuckDB >= 1.5.0)
* ============================================================================
*
* The global executor above serializes every non-Ruby-thread callback through
* a single Ruby thread. A per-worker proxy instead gives each DuckDB worker
* thread its own dedicated Ruby thread, so callbacks from different workers can
* run concurrently — they compete for the GVL in round-robin fashion, which
* helps when callbacks release the GVL (e.g. on I/O).
*
* Proxies are created lazily from DuckDB's per-worker init hook and stored in
* DuckDB's thread-local state; the global executor remains the fallback.
*/

/* Opaque per-worker proxy handle. */
struct worker_proxy;

/*
* Create a per-worker proxy thread. Must be called with the GVL held
* (typically by dispatching this through the global executor from a per-worker
* init callback, which itself runs on a non-Ruby thread).
*/
struct worker_proxy *rbduckdb_worker_proxy_create(void);

/*
* Destroy a per-worker proxy. The signature matches duckdb_delete_callback_t so
* it can be handed directly to DuckDB. Safe to call from a non-Ruby thread: it
* touches only OS primitives and frees memory allocated with calloc.
*/
void rbduckdb_worker_proxy_destroy(void *proxy);

/*
* Like rbduckdb_function_executor_dispatch, but on the non-Ruby-thread path
* (Case 3) it routes through the given per-worker proxy when non-NULL, falling
* back to the global executor when NULL. Cases 1 and 2 are unchanged.
*/
void rbduckdb_function_executor_dispatch_via_proxy(rbduckdb_function_callback_t cb, void *user_data, struct worker_proxy *proxy);

#endif
Loading
Loading