Skip to content

Commit b90ac4d

Browse files
committed
address review
1 parent 8098145 commit b90ac4d

6 files changed

Lines changed: 67 additions & 57 deletions

File tree

src/VecSim/algorithms/svs/svs.h

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,6 @@ struct SVSIndexBase
3838
SVSIndexBase() : num_marked_deleted{0} {};
3939
virtual ~SVSIndexBase() = default;
4040

41-
// Singleton accessor for the shared SVS thread pool.
42-
// Always valid — initialized with size 1 (write-in-place mode: 0 worker threads,
43-
// only the calling thread participates). Resized on VecSim_UpdateThreadPoolSize() calls.
44-
static std::shared_ptr<VecSimSVSThreadPoolImpl> getSharedThreadPool() {
45-
static auto shared_pool = std::make_shared<VecSimSVSThreadPoolImpl>(1);
46-
return shared_pool;
47-
}
4841
virtual int addVectors(const void *vectors_data, const labelType *labels, size_t n) = 0;
4942
virtual int deleteVectors(const labelType *labels, size_t n) = 0;
5043
virtual bool isLabelExists(labelType label) const = 0;
@@ -368,8 +361,8 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
368361
leanvec_dim{
369362
svs_details::getOrDefault(params.leanvec_dim, SVS_VAMANA_DEFAULT_LEANVEC_DIM)},
370363
epsilon{svs_details::getOrDefault(params.epsilon, SVS_VAMANA_DEFAULT_EPSILON)},
371-
is_two_level_lvq{isTwoLevelLVQ(params.quantBits)},
372-
threadpool_{SVSIndexBase::getSharedThreadPool(), this->logCallbackCtx}, impl_{nullptr} {
364+
is_two_level_lvq{isTwoLevelLVQ(params.quantBits)}, threadpool_{this->logCallbackCtx},
365+
impl_{nullptr} {
373366
logger_ = makeLogger();
374367
if (params.num_threads != 0) {
375368
this->log(VecSimCommonStrings::LOG_WARNING_STRING,

src/VecSim/algorithms/svs/svs_utils.h

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -403,11 +403,11 @@ class RentedThreads {
403403
// * Multiple callers can rent disjoint subsets of threads concurrently
404404
// * Shrinking while threads are rented is safe (shared_ptr lifecycle)
405405
class VecSimSVSThreadPoolImpl {
406-
public:
407406
// Create a pool with `num_threads` total parallelism (including the calling thread).
408407
// Spawns `num_threads - 1` worker OS threads. num_threads must be >= 1.
409408
// In write-in-place mode, the pool is created with num_threads == 1 (0 worker threads,
410409
// only the calling thread participates).
410+
// This constructor is private — obtain the pool through instance(), the shared singleton accessor.
411411
explicit VecSimSVSThreadPoolImpl(size_t num_threads = 1) {
412412
assert(num_threads && "VecSimSVSThreadPoolImpl should not be created with 0 threads");
413413
slots_.reserve(num_threads - 1);
@@ -416,6 +416,16 @@ class VecSimSVSThreadPoolImpl {
416416
}
417417
}
418418

419+
public:
420+
// Singleton accessor for the shared SVS thread pool.
421+
// Always valid — initialized with size 1 (write-in-place mode: 0 worker threads,
422+
// only the calling thread participates). Resized on VecSim_UpdateThreadPoolSize() calls.
423+
static std::shared_ptr<VecSimSVSThreadPoolImpl> instance() {
424+
static auto shared_pool =
425+
std::shared_ptr<VecSimSVSThreadPoolImpl>(new VecSimSVSThreadPoolImpl(1));
426+
return shared_pool;
427+
}
428+
419429
// Total parallelism: worker slots + 1 (the calling thread always participates).
420430
size_t size() const {
421431
std::lock_guard lock{pool_mutex_};
@@ -468,10 +478,9 @@ class VecSimSVSThreadPoolImpl {
468478

469479
// Rent n-1 worker threads
470480
auto rented = rent(n - 1, log_ctx);
471-
size_t num_workers = rented.count();
472481

473482
// Assign work to rented workers (partitions 1..n-1)
474-
for (size_t i = 0; i < num_workers; ++i) {
483+
for (size_t i = 0; i < rented.count(); ++i) {
475484
rented[i].assign({&f, i + 1});
476485
}
477486

@@ -485,7 +494,7 @@ class VecSimSVSThreadPoolImpl {
485494

486495
// Wait for all rented workers and collect errors.
487496
// RentedThreads destructor will release the slots after this block.
488-
manage_workers_after_run(main_thread_error, rented, num_workers);
497+
manage_workers_after_run(main_thread_error, rented);
489498
}
490499

491500
private:
@@ -526,8 +535,7 @@ class VecSimSVSThreadPoolImpl {
526535

527536
// Wait for all rented workers to finish. If any worker (or the main thread) threw,
528537
// restart crashed workers and throw a combined exception.
529-
void manage_workers_after_run(const std::string &main_thread_error, RentedThreads &rented,
530-
size_t rented_count) {
538+
void manage_workers_after_run(const std::string &main_thread_error, RentedThreads &rented) {
531539
auto message = std::string{};
532540
auto inserter = std::back_inserter(message);
533541
bool has_error = !main_thread_error.empty();
@@ -536,7 +544,7 @@ class VecSimSVSThreadPoolImpl {
536544
fmt::format_to(inserter, "Thread 0: {}\n", main_thread_error);
537545
}
538546

539-
for (size_t i = 0; i < rented_count; ++i) {
547+
for (size_t i = 0; i < rented.count(); ++i) {
540548
auto &thread = rented[i];
541549
thread.wait();
542550
if (!thread.is_okay()) {
@@ -582,16 +590,16 @@ class VecSimSVSThreadPool {
582590
void *log_ctx_ = nullptr; // per-index log context
583591

584592
public:
585-
// Construct with reference to the shared pool singleton.
593+
// Construct using the shared pool singleton.
586594
// parallelism_ starts at 1 (the calling thread always participates), matching the
587595
// pool's minimum size. Safe for immediate use in write-in-place mode without an
588596
// explicit setParallelism() call.
589-
explicit VecSimSVSThreadPool(std::shared_ptr<VecSimSVSThreadPoolImpl> pool,
590-
void *log_ctx = nullptr)
591-
: pool_(std::move(pool)), parallelism_(std::make_shared<std::atomic<size_t>>(1)),
592-
log_ctx_(log_ctx) {
593-
assert(pool_ && "Pool must not be null");
594-
}
597+
explicit VecSimSVSThreadPool(void *log_ctx = nullptr)
598+
: pool_(VecSimSVSThreadPoolImpl::instance()),
599+
parallelism_(std::make_shared<std::atomic<size_t>>(1)), log_ctx_(log_ctx) {}
600+
601+
// Resize the shared pool singleton. Forwards new_size to VecSimSVSThreadPoolImpl::instance()->resize().
602+
static void resize(size_t new_size) { VecSimSVSThreadPoolImpl::instance()->resize(new_size); }
595603

596604
// Set the degree of parallelism for this index's next operation.
597605
// n must be the number of threads actually reserved by the caller (i.e., the

src/VecSim/vec_sim.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
#include "VecSim/vec_sim_index.h"
1616
#include "VecSim/vec_sim_adhoc_bf_ctx.h"
1717
#include "VecSim/types/bfloat16.h"
18-
#include "VecSim/algorithms/svs/svs.h"
18+
#include "VecSim/algorithms/svs/svs_utils.h"
1919
#include <cassert>
2020
#include "memory.h"
2121

@@ -44,7 +44,7 @@ extern "C" void VecSim_UpdateThreadPoolSize(size_t new_size) {
4444
// Resize the shared SVS thread pool. resize() clamps to minimum size 1.
4545
// new_size == 0 → pool size 1 (only calling thread, write-in-place).
4646
// new_size > 0 → pool size new_size (new_size - 1 worker threads).
47-
SVSIndexBase::getSharedThreadPool()->resize(std::max(new_size, size_t{1}));
47+
VecSimSVSThreadPool::resize(std::max(new_size, size_t{1}));
4848
}
4949

5050
static VecSimResolveCode _ResolveParams_EFRuntime(VecSimAlgo index_type, VecSimRawParam rparam,

tests/unit/test_svs.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3305,9 +3305,8 @@ TEST(SVSTest, compute_distance) {
33053305
// ---------------------------------------------------------------------------
33063306
TEST(SVSTest, NumThreadsParamIgnored) {
33073307
// Resize the shared singleton pool to a known size.
3308-
auto pool = SVSIndexBase::getSharedThreadPool();
3309-
pool->resize(2);
3310-
ASSERT_EQ(pool->size(), 2);
3308+
VecSimSVSThreadPool::resize(2);
3309+
ASSERT_EQ(VecSimSVSThreadPoolImpl::instance()->size(), 2);
33113310

33123311
// Capture warning logs emitted by VecSim.
33133312
std::string captured_log;
@@ -3330,7 +3329,7 @@ TEST(SVSTest, NumThreadsParamIgnored) {
33303329
ASSERT_NE(index, nullptr);
33313330

33323331
// The shared pool size must remain at 2 — num_threads was ignored.
3333-
ASSERT_EQ(pool->size(), 2);
3332+
ASSERT_EQ(VecSimSVSThreadPoolImpl::instance()->size(), 2);
33343333

33353334
// The index reports the shared pool size, not the deprecated param value.
33363335
VecSimIndexDebugInfo info = VecSimIndex_DebugInfo(index);
@@ -3359,7 +3358,7 @@ TEST(SVSTest, NumThreadsParamIgnored) {
33593358
VecSimIndex_Free(index2);
33603359

33613360
// Restore pool to default size and clear log callback.
3362-
pool->resize(1);
3361+
VecSimSVSThreadPool::resize(1);
33633362
VecSimIndexInterface::logCallback = nullptr;
33643363
}
33653364

tests/unit/test_svs_threadpool.cpp

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,11 @@ class SVSThreadPoolTest : public ::testing::Test {
4545
saved_callback_ = VecSimIndexInterface::logCallback;
4646
VecSimIndexInterface::logCallback = nullptr;
4747
}
48-
void TearDown() override { VecSimIndexInterface::logCallback = saved_callback_; }
48+
void TearDown() override {
49+
// Reset the shared singleton pool to size 1 so tests don't leak state.
50+
VecSimSVSThreadPool::resize(1);
51+
VecSimIndexInterface::logCallback = saved_callback_;
52+
}
4953

5054
private:
5155
logCallbackFunction saved_callback_ = nullptr;
@@ -55,7 +59,7 @@ class SVSThreadPoolTest : public ::testing::Test {
5559
// Test 1: VecSimSVSThreadPoolImpl::resize — grow and shrink
5660
// ---------------------------------------------------------------------------
5761
TEST_F(SVSThreadPoolTest, ResizeGrowAndShrink) {
58-
auto pool = std::make_shared<VecSimSVSThreadPoolImpl>(1);
62+
auto pool = VecSimSVSThreadPoolImpl::instance();
5963
ASSERT_EQ(pool->size(), 1);
6064

6165
// Grow 1 → 4
@@ -91,11 +95,12 @@ TEST_F(SVSThreadPoolTest, ResizeGrowAndShrink) {
9195
// ---------------------------------------------------------------------------
9296
TEST_F(SVSThreadPoolTest, ShrinkWhileRented) {
9397
// Pool size 5: 4 worker slots [s0, s1, s2, s3].
94-
auto pool = std::make_shared<VecSimSVSThreadPoolImpl>(5);
98+
auto pool = VecSimSVSThreadPoolImpl::instance();
99+
pool->resize(5);
95100
ASSERT_EQ(pool->size(), 5);
96101

97102
// Wrapper A uses parallelism 3 → rents 2 workers (s0, s1).
98-
VecSimSVSThreadPool wrapperA(pool);
103+
VecSimSVSThreadPool wrapperA;
99104
wrapperA.setParallelism(3);
100105

101106
std::latch hold(1); // blocks rented workers
@@ -133,7 +138,7 @@ TEST_F(SVSThreadPoolTest, ShrinkWhileRented) {
133138

134139
// While wrapperA's threads are still alive (blocked on latch), run
135140
// parallel_for on the shrunk pool with a second wrapper using a free slot.
136-
VecSimSVSThreadPool wrapperB(pool);
141+
VecSimSVSThreadPool wrapperB;
137142
// Parallelism 2 = 1 rented worker + calling thread. The pool has 3 slots
138143
// [s0, s1, s2] after shrink; s0 and s1 are occupied by wrapperA, so the
139144
// single rented worker will get s2 (the only free slot).
@@ -158,11 +163,12 @@ TEST_F(SVSThreadPoolTest, ShrinkWhileRented) {
158163
// ---------------------------------------------------------------------------
159164
TEST_F(SVSThreadPoolTest, GrowWhileRented) {
160165
// Pool size 3: 2 worker slots [s0, s1].
161-
auto pool = std::make_shared<VecSimSVSThreadPoolImpl>(3);
166+
auto pool = VecSimSVSThreadPoolImpl::instance();
167+
pool->resize(3);
162168
ASSERT_EQ(pool->size(), 3);
163169

164170
// Wrapper A uses parallelism 3 → rents 2 workers (s0, s1).
165-
VecSimSVSThreadPool wrapperA(pool);
171+
VecSimSVSThreadPool wrapperA;
166172
wrapperA.setParallelism(3);
167173

168174
std::latch hold(1); // blocks rented workers
@@ -198,7 +204,7 @@ TEST_F(SVSThreadPoolTest, GrowWhileRented) {
198204
// Wrapper B uses parallelism 3 → rents 2 workers. s0, s1 are still occupied by
199205
// wrapperA, so both of wrapperB's workers must come from the slots created by
200206
// the grow (e.g. s2, s3); any further free slot is simply left unused.
201-
VecSimSVSThreadPool wrapperB(pool);
207+
VecSimSVSThreadPool wrapperB;
202208
wrapperB.setParallelism(3);
203209
std::atomic_int resultB{0};
204210
wrapperB.parallel_for([&](size_t) { resultB++; }, 3);
@@ -227,9 +233,9 @@ TEST_F(SVSThreadPoolTest, GrowWhileRented) {
227233
// the original is visible to SVS's copy when it calls size().
228234
// ---------------------------------------------------------------------------
229235
TEST_F(SVSThreadPoolTest, ParallelismPropagationAcrossCopies) {
230-
auto pool = std::make_shared<VecSimSVSThreadPoolImpl>(8);
236+
VecSimSVSThreadPool::resize(8);
231237

232-
VecSimSVSThreadPool original(pool);
238+
VecSimSVSThreadPool original;
233239
original.setParallelism(2);
234240
ASSERT_EQ(original.size(), 2);
235241

@@ -259,10 +265,10 @@ TEST_F(SVSThreadPoolTest, ParallelismPropagationAcrossCopies) {
259265
// sequentially using disjoint thread budgets.
260266
// ---------------------------------------------------------------------------
261267
TEST_F(SVSThreadPoolTest, TwoIndexesIndependentParallelism) {
262-
auto pool = std::make_shared<VecSimSVSThreadPoolImpl>(8);
268+
VecSimSVSThreadPool::resize(8);
263269

264-
VecSimSVSThreadPool wrapperA(pool);
265-
VecSimSVSThreadPool wrapperB(pool);
270+
VecSimSVSThreadPool wrapperA;
271+
VecSimSVSThreadPool wrapperB;
266272

267273
wrapperA.setParallelism(2);
268274
wrapperB.setParallelism(5);
@@ -306,7 +312,7 @@ TEST_F(SVSThreadPoolTest, TwoIndexesIndependentParallelism) {
306312
// The C API sets write mode and resizes the shared singleton pool.
307313
// ---------------------------------------------------------------------------
308314
TEST_F(SVSThreadPoolTest, UpdateThreadPoolSizeModeTransitions) {
309-
auto pool = SVSIndexBase::getSharedThreadPool();
315+
auto pool = VecSimSVSThreadPoolImpl::instance();
310316

311317
// 0 → 4: switch to async mode, pool resizes to 4.
312318
VecSim_UpdateThreadPoolSize(4);
@@ -345,11 +351,11 @@ TEST_F(SVSThreadPoolTest, UpdateThreadPoolSizeModeTransitions) {
345351
// ---------------------------------------------------------------------------
346352
TEST_F(SVSThreadPoolTest, ConcurrentRentalFromTwoIndexes) {
347353
// Pool size 8: wrappers A (4) and B (4) sum to exactly 8.
348-
auto pool = std::make_shared<VecSimSVSThreadPoolImpl>(8);
354+
VecSimSVSThreadPool::resize(8);
349355

350-
VecSimSVSThreadPool wrapperA(pool);
356+
VecSimSVSThreadPool wrapperA;
351357
wrapperA.setParallelism(4);
352-
VecSimSVSThreadPool wrapperB(pool);
358+
VecSimSVSThreadPool wrapperB;
353359
wrapperB.setParallelism(4);
354360

355361
std::atomic_int resultA{0};
@@ -398,7 +404,7 @@ TEST_F(SVSThreadPoolTest, ConcurrentRentalFromTwoIndexes) {
398404
<< "Timed out waiting for concurrent parallel_for calls. "
399405
"resultA="
400406
<< resultA << ", resultB=" << resultB << ", idsA.size=" << idsA.size()
401-
<< ", idsB.size=" << idsB.size() << ", pool_size=" << pool->size();
407+
<< ", idsB.size=" << idsB.size() << ", pool_size=" << wrapperA.poolSize();
402408

403409
ASSERT_EQ(resultA, 4);
404410
ASSERT_EQ(resultB, 4);
@@ -424,9 +430,9 @@ TEST_F(SVSThreadPoolTest, ConcurrentRentalFromTwoIndexes) {
424430
// ---------------------------------------------------------------------------
425431
TEST_F(SVSThreadPoolTest, AllThreadsOccupied) {
426432
// Pool size 4 (3 worker slots). Wrapper A rents all 3.
427-
auto pool = std::make_shared<VecSimSVSThreadPoolImpl>(4);
433+
VecSimSVSThreadPool::resize(4);
428434

429-
VecSimSVSThreadPool wrapperA(pool);
435+
VecSimSVSThreadPool wrapperA;
430436
wrapperA.setParallelism(4);
431437

432438
std::latch hold(1);
@@ -449,10 +455,10 @@ TEST_F(SVSThreadPoolTest, AllThreadsOccupied) {
449455
ASSERT_TRUE(wait_with_timeout(workers_ready, kTestTimeout))
450456
<< "Timed out waiting for wrapper A's 3 workers to start. "
451457
"resultA="
452-
<< resultA << ", pool_size=" << pool->size();
458+
<< resultA << ", pool_size=" << wrapperA.poolSize();
453459

454460
// All 3 worker slots are occupied. Wrapper B tries to rent 1 worker.
455-
VecSimSVSThreadPool wrapperB(pool);
461+
VecSimSVSThreadPool wrapperB;
456462
wrapperB.setParallelism(2);
457463

458464
#ifdef NDEBUG

tests/unit/test_svs_tiered.cpp

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3801,8 +3801,8 @@ TYPED_TEST(SVSTieredIndexTestBasic, testDeletedJournalMulti) {
38013801
TEST(SVSTieredIndexTest, testThreadPool) {
38023802
// Test VecSimSVSThreadPool with shared pool
38033803
const size_t num_threads = 4;
3804-
auto shared_pool = std::make_shared<VecSimSVSThreadPoolImpl>(num_threads);
3805-
auto pool = VecSimSVSThreadPool(shared_pool);
3804+
VecSimSVSThreadPool::resize(num_threads);
3805+
VecSimSVSThreadPool pool;
38063806
ASSERT_EQ(pool.poolSize(), num_threads);
38073807
ASSERT_EQ(pool.size(), 1); // parallelism starts at 1 (calling thread)
38083808
ASSERT_EQ(pool.getParallelism(), 1);
@@ -3846,8 +3846,8 @@ TEST(SVSTieredIndexTest, testThreadPool) {
38463846
#endif
38473847

38483848
// Test write-in-place mode (pool with size 1)
3849-
auto inplace_shared = std::make_shared<VecSimSVSThreadPoolImpl>(1);
3850-
auto inplace_pool = VecSimSVSThreadPool(inplace_shared);
3849+
VecSimSVSThreadPool::resize(1);
3850+
VecSimSVSThreadPool inplace_pool;
38513851
inplace_pool.setParallelism(1);
38523852
ASSERT_EQ(inplace_pool.size(), 1);
38533853
ASSERT_EQ(inplace_pool.poolSize(), 1);
@@ -3856,7 +3856,8 @@ TEST(SVSTieredIndexTest, testThreadPool) {
38563856
ASSERT_EQ(counter, 1);
38573857

38583858
// parallel_for works immediately with default parallelism 1
3859-
auto default_pool = VecSimSVSThreadPool(shared_pool);
3859+
VecSimSVSThreadPool::resize(num_threads);
3860+
VecSimSVSThreadPool default_pool;
38603861
counter = 0;
38613862
default_pool.parallel_for(task, 1);
38623863
ASSERT_EQ(counter, 1); // 0+1 = 1
@@ -3868,6 +3869,9 @@ TEST(SVSTieredIndexTest, testThreadPool) {
38683869
pool.setParallelism(num_threads);
38693870
ASSERT_THROW(pool.parallel_for(err_task, 1), svs::threads::ThreadingException);
38703871
ASSERT_THROW(pool.parallel_for(err_task, num_threads), svs::threads::ThreadingException);
3872+
3873+
// Restore pool to default size so we don't leak state to other tests.
3874+
VecSimSVSThreadPool::resize(1);
38713875
}
38723876

38733877
#else // HAVE_SVS

0 commit comments

Comments
 (0)