From 2079ec09241a923001c0018f435af347f21e5ceb Mon Sep 17 00:00:00 2001 From: otegami Date: Sat, 30 May 2026 11:47:22 +0800 Subject: [PATCH 1/3] feat: add per-worker proxy thread infrastructure to executor Add a per-worker proxy: one dedicated Ruby thread per DuckDB worker thread, using the same mutex/condvar hand-off protocol as the global executor but private to a single worker. This lets callbacks from different workers run concurrently instead of serializing through the one global executor queue. rbduckdb_function_executor_dispatch_via_proxy() routes the non-Ruby thread path (Case 3) through a given proxy when non-NULL, falling back to the global executor when NULL; the existing dispatch() now delegates to it with NULL, so behavior is unchanged. Live proxies are held in a GC-protected array. The new symbols are unused until table function integration lands, so this commit is behavior-preserving (full suite green). --- ext/duckdb/function_executor.c | 261 ++++++++++++++++++++++++++++++++- ext/duckdb/function_executor.h | 39 +++++ 2 files changed, 298 insertions(+), 2 deletions(-) diff --git a/ext/duckdb/function_executor.c b/ext/duckdb/function_executor.c index 84620287..425f9a3f 100644 --- a/ext/duckdb/function_executor.c +++ b/ext/duckdb/function_executor.c @@ -68,6 +68,14 @@ static struct callback_request *g_request_list = NULL; static VALUE g_executor_thread = Qnil; static int g_executor_started = 0; +/* + * GC-protection array holding every live per-worker proxy Ruby thread. + * Proxies are created from non-Ruby init hooks (via the global executor) and + * are not reachable from any marked object, so without this array the GC could + * collect a proxy thread while DuckDB still dispatches callbacks to it. + */ +static VALUE g_proxy_threads = Qnil; + /* Data passed to the executor wait function */ struct executor_wait_data { struct callback_request *request; @@ -166,6 +174,11 @@ void rbduckdb_function_executor_ensure_started(void) { } #endif + if (g_proxy_threads == Qnil) { + g_proxy_threads = rb_ary_new(); + rb_global_variable(&g_proxy_threads); + } + g_executor_thread = rb_thread_create(executor_thread_func, NULL); rb_global_variable(&g_executor_thread); g_executor_started = 1; @@ -242,7 +255,244 @@ static void *callback_with_gvl(void *data) { return NULL; } -void rbduckdb_function_executor_dispatch(rbduckdb_function_callback_t cb, void *user_data) { +/* + * ============================================================================ + * Per-worker proxy thread + * ============================================================================ + * + * One dedicated Ruby thread per DuckDB worker thread. Same hand-off protocol as + * the global executor (mutex + condvars), but private to a single worker so + * that callbacks from different workers no longer serialize through one queue. + * + * Pattern follows the FFI gem's async callback dispatcher: + * https://github.com/ffi/ffi/blob/master/ext/ffi_c/Function.c + */ +struct worker_proxy { + VALUE ruby_thread; + volatile int stop_requested; + rbduckdb_function_callback_t callback_func; + void *callback_data; + volatile int has_request; + volatile int request_done; + volatile int thread_exited; +#ifdef _MSC_VER + CRITICAL_SECTION lock; + CONDITION_VARIABLE request_cond; + CONDITION_VARIABLE request_done_cond; + CONDITION_VARIABLE thread_exit_cond; +#else + pthread_mutex_t lock; + pthread_cond_t request_cond; + pthread_cond_t request_done_cond; + pthread_cond_t thread_exit_cond; +#endif +}; + +/* Runs without GVL: the proxy waits for a callback request */ +static void *proxy_wait_func(void *data) { + struct worker_proxy *proxy = (struct worker_proxy *)data; + +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + while (!proxy->stop_requested && !proxy->has_request) { + SleepConditionVariableCS(&proxy->request_cond, &proxy->lock, INFINITE); + } + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + while (!proxy->stop_requested && !proxy->has_request) { + pthread_cond_wait(&proxy->request_cond, &proxy->lock); + } + pthread_mutex_unlock(&proxy->lock); +#endif + + return NULL; +} + +/* Unblock function for the proxy thread (VM shutdown or Thread#kill) */ +static void proxy_stop_func(void *data) { + struct worker_proxy *proxy = (struct worker_proxy *)data; + +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->stop_requested = 1; + WakeConditionVariable(&proxy->request_cond); + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + proxy->stop_requested = 1; + pthread_cond_signal(&proxy->request_cond); + pthread_mutex_unlock(&proxy->lock); +#endif +} + +/* The proxy thread main loop (Ruby thread) */ +static VALUE proxy_thread_func(void *data) { + struct worker_proxy *proxy = (struct worker_proxy *)data; + + while (!proxy->stop_requested) { + /* Release the GVL and wait for a request */ + rb_thread_call_without_gvl(proxy_wait_func, proxy, proxy_stop_func, proxy); + + if (proxy->stop_requested) break; + + if (proxy->has_request) { + /* Execute the callback with the GVL held */ + proxy->callback_func(proxy->callback_data); + + /* Signal completion to the DuckDB worker thread */ +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->has_request = 0; + proxy->request_done = 1; + WakeConditionVariable(&proxy->request_done_cond); + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + proxy->has_request = 0; + proxy->request_done = 1; + pthread_cond_signal(&proxy->request_done_cond); + pthread_mutex_unlock(&proxy->lock); +#endif + } + } + + /* Stop being GC-protected now that we are about to exit */ + if (g_proxy_threads != Qnil) { + rb_ary_delete(g_proxy_threads, proxy->ruby_thread); + } + + /* + * Signal that this thread has finished and no longer touches the proxy + * struct. Only after this may rbduckdb_worker_proxy_destroy free it. + */ +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->thread_exited = 1; + WakeConditionVariable(&proxy->thread_exit_cond); + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + proxy->thread_exited = 1; + pthread_cond_signal(&proxy->thread_exit_cond); + pthread_mutex_unlock(&proxy->lock); +#endif + + return Qnil; +} + +struct worker_proxy *rbduckdb_worker_proxy_create(void) { + /* + * Use calloc (not xcalloc): rbduckdb_worker_proxy_destroy frees the struct + * from a non-Ruby thread where xfree is unsafe. + */ + struct worker_proxy *proxy = calloc(1, sizeof(struct worker_proxy)); + if (proxy == NULL) { + rb_raise(rb_eNoMemError, "failed to allocate worker_proxy"); + } + + proxy->stop_requested = 0; + proxy->has_request = 0; + proxy->request_done = 0; + proxy->thread_exited = 0; + +#ifdef _MSC_VER + InitializeCriticalSection(&proxy->lock); + InitializeConditionVariable(&proxy->request_cond); + InitializeConditionVariable(&proxy->request_done_cond); + InitializeConditionVariable(&proxy->thread_exit_cond); +#else + pthread_mutex_init(&proxy->lock, NULL); + pthread_cond_init(&proxy->request_cond, NULL); + pthread_cond_init(&proxy->request_done_cond, NULL); + pthread_cond_init(&proxy->thread_exit_cond, NULL); +#endif + + proxy->ruby_thread = rb_thread_create(proxy_thread_func, proxy); + + if (g_proxy_threads != Qnil) { + rb_ary_push(g_proxy_threads, proxy->ruby_thread); + } + + return proxy; +} + +/* + * Hand a callback to a proxy and block until it completes. + * Called from the DuckDB worker thread (non-Ruby thread) that owns this proxy. + */ +static void dispatch_callback_to_proxy(struct worker_proxy *proxy, rbduckdb_function_callback_t cb, void *user_data) { +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->callback_func = cb; + proxy->callback_data = user_data; + proxy->request_done = 0; + proxy->has_request = 1; + WakeConditionVariable(&proxy->request_cond); + LeaveCriticalSection(&proxy->lock); + + EnterCriticalSection(&proxy->lock); + while (!proxy->request_done) { + SleepConditionVariableCS(&proxy->request_done_cond, &proxy->lock, INFINITE); + } + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + proxy->callback_func = cb; + proxy->callback_data = user_data; + proxy->request_done = 0; + proxy->has_request = 1; + pthread_cond_signal(&proxy->request_cond); + pthread_mutex_unlock(&proxy->lock); + + pthread_mutex_lock(&proxy->lock); + while (!proxy->request_done) { + pthread_cond_wait(&proxy->request_done_cond, &proxy->lock); + } + pthread_mutex_unlock(&proxy->lock); +#endif +} + +void rbduckdb_worker_proxy_destroy(void *data) { + struct worker_proxy *proxy = (struct worker_proxy *)data; + if (proxy == NULL) return; + + /* Ask the proxy thread to stop, then wait until it has fully exited */ +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->stop_requested = 1; + WakeConditionVariable(&proxy->request_cond); + LeaveCriticalSection(&proxy->lock); + + EnterCriticalSection(&proxy->lock); + while (!proxy->thread_exited) { + SleepConditionVariableCS(&proxy->thread_exit_cond, &proxy->lock, INFINITE); + } + LeaveCriticalSection(&proxy->lock); + + DeleteCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + proxy->stop_requested = 1; + pthread_cond_signal(&proxy->request_cond); + pthread_mutex_unlock(&proxy->lock); + + pthread_mutex_lock(&proxy->lock); + while (!proxy->thread_exited) { + pthread_cond_wait(&proxy->thread_exit_cond, &proxy->lock); + } + pthread_mutex_unlock(&proxy->lock); + + pthread_cond_destroy(&proxy->thread_exit_cond); + pthread_cond_destroy(&proxy->request_done_cond); + pthread_cond_destroy(&proxy->request_cond); + pthread_mutex_destroy(&proxy->lock); +#endif + + free(proxy); +} + +void rbduckdb_function_executor_dispatch_via_proxy(rbduckdb_function_callback_t cb, void *user_data, struct worker_proxy *proxy) { if (ruby_native_thread_p()) { if (ruby_thread_has_gvl_p()) { /* Case 1: Ruby thread with GVL - call directly */ @@ -254,8 +504,15 @@ void rbduckdb_function_executor_dispatch(rbduckdb_function_callback_t cb, void * arg.user_data = user_data; rb_thread_call_with_gvl(callback_with_gvl, &arg); } + } else if (proxy != NULL) { + /* Case 3a: Non-Ruby thread with a per-worker proxy */ + dispatch_callback_to_proxy(proxy, cb, user_data); } else { - /* Case 3: Non-Ruby thread - dispatch to executor */ + /* Case 3b: Non-Ruby thread - dispatch to the global executor */ dispatch_callback_to_executor(cb, user_data); } } + +void rbduckdb_function_executor_dispatch(rbduckdb_function_callback_t cb, void *user_data) { + rbduckdb_function_executor_dispatch_via_proxy(cb, user_data, NULL); +} diff --git a/ext/duckdb/function_executor.h b/ext/duckdb/function_executor.h index 0b04297d..7404f169 100644 --- a/ext/duckdb/function_executor.h +++ b/ext/duckdb/function_executor.h @@ -43,4 +43,43 @@ void rbduckdb_function_executor_ensure_started(void); */ void rbduckdb_function_executor_dispatch(rbduckdb_function_callback_t cb, void *user_data); +/* + * ============================================================================ + * Per-worker proxy threads (DuckDB >= 1.5.0) + * ============================================================================ + * + * The global executor above serializes every non-Ruby-thread callback through + * a single Ruby thread. A per-worker proxy instead gives each DuckDB worker + * thread its own dedicated Ruby thread, so callbacks from different workers can + * run concurrently — they compete for the GVL in round-robin fashion, which + * helps when callbacks release the GVL (e.g. on I/O). + * + * Proxies are created lazily from DuckDB's per-worker init hook and stored in + * DuckDB's thread-local state; the global executor remains the fallback. + */ + +/* Opaque per-worker proxy handle. */ +struct worker_proxy; + +/* + * Create a per-worker proxy thread. Must be called with the GVL held + * (typically by dispatching this through the global executor from a per-worker + * init callback, which itself runs on a non-Ruby thread). + */ +struct worker_proxy *rbduckdb_worker_proxy_create(void); + +/* + * Destroy a per-worker proxy. The signature matches duckdb_delete_callback_t so + * it can be handed directly to DuckDB. Safe to call from a non-Ruby thread: it + * touches only OS primitives and frees memory allocated with calloc. + */ +void rbduckdb_worker_proxy_destroy(void *proxy); + +/* + * Like rbduckdb_function_executor_dispatch, but on the non-Ruby-thread path + * (Case 3) it routes through the given per-worker proxy when non-NULL, falling + * back to the global executor when NULL. Cases 1 and 2 are unchanged. + */ +void rbduckdb_function_executor_dispatch_via_proxy(rbduckdb_function_callback_t cb, void *user_data, struct worker_proxy *proxy); + #endif From 188ca01e9098821440b6f4c0c27a131e4a37bb4f Mon Sep 17 00:00:00 2001 From: otegami Date: Sun, 31 May 2026 12:45:55 +0800 Subject: [PATCH 2/3] fix: release the GVL while joining a proxy thread on destroy rbduckdb_worker_proxy_destroy waits for the proxy thread to exit, but the proxy thread must run Ruby code (removing itself from the GC-protection array) before exiting, which needs the GVL. DuckDB may invoke this destructor either from a worker thread (no GVL) or from a Ruby thread that holds the GVL, depending on when it tears down the local state. In the latter case waiting with the GVL held would deadlock: the proxy thread can never acquire the GVL to finish. Split the wait into proxy_join_func and run it via rb_thread_call_without_gvl when the caller holds the GVL; wait directly otherwise. The proxy is still dead code at this point (no integration yet), so behavior is unchanged. --- ext/duckdb/function_executor.c | 52 +++++++++++++++++++++++++--------- 1 file changed, 39 insertions(+), 13 deletions(-) diff --git a/ext/duckdb/function_executor.c b/ext/duckdb/function_executor.c index 425f9a3f..2840bee6 100644 --- a/ext/duckdb/function_executor.c +++ b/ext/duckdb/function_executor.c @@ -453,36 +453,62 @@ static void dispatch_callback_to_proxy(struct worker_proxy *proxy, rbduckdb_func #endif } -void rbduckdb_worker_proxy_destroy(void *data) { +/* Blocks until the proxy thread has fully exited. Runs without the GVL. */ +static void *proxy_join_func(void *data) { struct worker_proxy *proxy = (struct worker_proxy *)data; - if (proxy == NULL) return; - /* Ask the proxy thread to stop, then wait until it has fully exited */ #ifdef _MSC_VER - EnterCriticalSection(&proxy->lock); - proxy->stop_requested = 1; - WakeConditionVariable(&proxy->request_cond); - LeaveCriticalSection(&proxy->lock); - EnterCriticalSection(&proxy->lock); while (!proxy->thread_exited) { SleepConditionVariableCS(&proxy->thread_exit_cond, &proxy->lock, INFINITE); } LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + while (!proxy->thread_exited) { + pthread_cond_wait(&proxy->thread_exit_cond, &proxy->lock); + } + pthread_mutex_unlock(&proxy->lock); +#endif - DeleteCriticalSection(&proxy->lock); + return NULL; +} + +void rbduckdb_worker_proxy_destroy(void *data) { + struct worker_proxy *proxy = (struct worker_proxy *)data; + if (proxy == NULL) return; + + /* Ask the proxy thread to stop. */ +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->stop_requested = 1; + WakeConditionVariable(&proxy->request_cond); + LeaveCriticalSection(&proxy->lock); #else pthread_mutex_lock(&proxy->lock); proxy->stop_requested = 1; pthread_cond_signal(&proxy->request_cond); pthread_mutex_unlock(&proxy->lock); +#endif - pthread_mutex_lock(&proxy->lock); - while (!proxy->thread_exited) { - pthread_cond_wait(&proxy->thread_exit_cond, &proxy->lock); + /* + * Wait until the proxy thread has fully exited. Before exiting it runs Ruby + * code (removing itself from the GC-protection array), which needs the GVL. + * DuckDB may invoke this destructor either from a worker thread (no GVL) or + * — depending on when it tears down the local state — from a Ruby thread + * that holds the GVL. In the latter case we must release the GVL while + * waiting, or the proxy thread could never acquire it and we would deadlock. + */ + if (ruby_native_thread_p() && ruby_thread_has_gvl_p()) { + rb_thread_call_without_gvl(proxy_join_func, proxy, NULL, NULL); + } else { + proxy_join_func(proxy); } - pthread_mutex_unlock(&proxy->lock); + /* The proxy thread is gone; tear down OS primitives and free the struct. */ +#ifdef _MSC_VER + DeleteCriticalSection(&proxy->lock); +#else pthread_cond_destroy(&proxy->thread_exit_cond); pthread_cond_destroy(&proxy->request_done_cond); pthread_cond_destroy(&proxy->request_cond); From 7f65ec72fb340642efd3bc84fe77eccad35f9d47 Mon Sep 17 00:00:00 2001 From: otegami Date: Sat, 30 May 2026 11:53:12 +0800 Subject: [PATCH 3/3] feat: per-worker proxy for table function execute callback Wire the execute path to per-worker proxy threads on DuckDB >= 1.5.0. A local_init callback registered via duckdb_table_function_set_local_init runs once per worker thread, creates a proxy (allocating its Ruby thread under the GVL through the global executor, since local_init runs on a non-Ruby thread), and stores it as thread-local init data. The execute callback retrieves that proxy and dispatches through it via rbduckdb_function_executor_dispatch_via_proxy, so callbacks from different workers run concurrently instead of serializing on the single global executor. DuckDB frees each proxy through rbduckdb_worker_proxy_destroy. bind and init stay on the global executor (not on the hot path). On DuckDB < 1.5.0 the local_init hook is absent and the execute callback keeps using the global executor unchanged. Verified: with SET threads=4 plus cardinality/max_threads hints, a GVL-releasing callback reaches max_concurrent=4 (vs 2 on the global executor) for a ~2x speedup; results are identical. The added test asserts correctness of the local_init -> proxy -> destroy lifecycle under multi-threaded execution (throughput is checked manually to avoid CI flakiness). --- ext/duckdb/table_function.c | 51 ++++++++++++++++++++- test/duckdb_test/table_function_test.rb | 60 +++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 1 deletion(-) diff --git a/ext/duckdb/table_function.c b/ext/duckdb/table_function.c index c5273a15..ba9559ba 100644 --- a/ext/duckdb/table_function.c +++ b/ext/duckdb/table_function.c @@ -21,6 +21,11 @@ static VALUE rbduckdb_table_function_set_init(VALUE self); static void table_function_init_callback(duckdb_init_info info); static VALUE rbduckdb_table_function_set_execute(VALUE self); static void table_function_execute_callback(duckdb_function_info info, duckdb_data_chunk output); +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 +/* Thread detection (declared in function_executor.c); used to skip the proxy on Ruby threads. */ +extern int ruby_native_thread_p(void); +static void table_function_local_init_callback(duckdb_init_info info); +#endif static const rb_data_type_t table_function_data_type = { "DuckDB/TableFunction", @@ -358,6 +363,10 @@ static VALUE rbduckdb_table_function_set_execute(VALUE self) { ctx->execute_proc = rb_block_proc(); duckdb_table_function_set_function(ctx->table_function, table_function_execute_callback); +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 + /* Per-worker proxy threads for the execute path (DuckDB >= 1.5.0). */ + duckdb_table_function_set_local_init(ctx->table_function, table_function_local_init_callback); +#endif rbduckdb_function_executor_ensure_started(); @@ -405,6 +414,7 @@ static void execute_execute_callback_protected(void *user_data) { static void table_function_execute_callback(duckdb_function_info info, duckdb_data_chunk output) { rubyDuckDBTableFunction *ctx; struct execute_dispatch_arg darg; + struct worker_proxy *proxy = NULL; ctx = (rubyDuckDBTableFunction *)duckdb_function_get_extra_info(info); if (!ctx || ctx->execute_proc == Qnil) return; @@ -413,9 +423,48 @@ static void table_function_execute_callback(duckdb_function_info info, duckdb_da darg.info = info; darg.output = output; - rbduckdb_function_executor_dispatch(execute_execute_callback_protected, &darg); +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 + /* On DuckDB >= 1.5.0 each worker thread carries its own proxy (see local_init). */ + proxy = (struct worker_proxy *)duckdb_function_get_local_init_data(info); +#endif + rbduckdb_function_executor_dispatch_via_proxy(execute_execute_callback_protected, &darg, proxy); } +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 +/* + * Per-worker init for the execute path (DuckDB >= 1.5.0). + * + * DuckDB calls this once on each worker thread that will run the execute + * callback. We create a per-worker proxy (allocating its Ruby thread under the + * GVL via the global executor, since this runs on a non-Ruby thread) and store + * it as thread-local init data. The execute callback then dispatches through it + * instead of the shared global executor, so workers run callbacks concurrently. + * DuckDB invokes rbduckdb_worker_proxy_destroy when the local state is freed. + */ +struct table_proxy_create_arg { + struct worker_proxy *proxy; +}; + +static void table_create_proxy_callback(void *user_data) { + struct table_proxy_create_arg *arg = (struct table_proxy_create_arg *)user_data; + arg->proxy = rbduckdb_worker_proxy_create(); +} + +static void table_function_local_init_callback(duckdb_init_info info) { + struct table_proxy_create_arg arg; + + /* A Ruby calling thread runs the callback inline (Case 1/2); no proxy needed. */ + if (ruby_native_thread_p()) return; + + arg.proxy = NULL; + rbduckdb_function_executor_dispatch(table_create_proxy_callback, &arg); + + if (arg.proxy != NULL) { + duckdb_init_set_init_data(info, arg.proxy, rbduckdb_worker_proxy_destroy); + } +} +#endif + rubyDuckDBTableFunction *get_struct_table_function(VALUE self) { rubyDuckDBTableFunction *ctx; TypedData_Get_Struct(self, rubyDuckDBTableFunction, &table_function_data_type, ctx); diff --git a/test/duckdb_test/table_function_test.rb b/test/duckdb_test/table_function_test.rb index 102b542d..7303ca84 100644 --- a/test/duckdb_test/table_function_test.rb +++ b/test/duckdb_test/table_function_test.rb @@ -199,6 +199,66 @@ def test_symbol_columns db.close end + # Per-worker proxy (GH-1136): exercises the local_init -> proxy -> destroy + # lifecycle under real multi-threaded execution (SET threads=4). We assert + # correctness rather than a concurrency count: the proxy path must neither + # deadlock nor corrupt results when DuckDB distributes the scan across + # workers. Parallel *throughput* is verified separately by + # letters/issue-1136/table_function_parallel_check.rb (kept out of CI to + # avoid scheduler-dependent flakiness). Requires DuckDB >= 1.5.0 + # (duckdb_table_function_set_local_init). + def test_execute_runs_correctly_under_multiple_threads + if ::DuckDBTest.duckdb_library_version < Gem::Version.new('1.5.0') + skip 'per-worker proxy requires DuckDB >= 1.5.0' + end + + chunks = 64 + rows_per_chunk = 100 + remaining = chunks + mutex = Mutex.new + + db = DuckDB::Database.open + conn = db.connect + conn.execute('SET threads=4') + + tf = DuckDB::TableFunction.new + tf.name = 'parallel_emitter' + tf.bind do |bind_info| + bind_info.add_result_column('v', DuckDB::LogicalType::BIGINT) + # Tell the planner there is real work so it distributes across workers. + bind_info.set_cardinality(chunks * rows_per_chunk, false) + end + tf.init do |init_info| + # Without this DuckDB assigns a single worker and the proxy never fires. + init_info.max_threads = 4 + end + tf.execute do |_info, output| + has_work = mutex.synchronize do + next false if remaining.zero? + + remaining -= 1 + true + end + + unless has_work + output.size = 0 + next + end + + rows_per_chunk.times { |i| output.set_value(0, i, 1) } + output.size = rows_per_chunk + sleep 0.001 # release the GVL so workers can overlap + end + + conn.register_table_function(tf) + result = conn.query('SELECT COUNT(*), SUM(v) FROM parallel_emitter()').each.to_a + + assert_equal [chunks * rows_per_chunk, chunks * rows_per_chunk], result.first + + conn.disconnect + db.close + end + private def setup_incomplete_function