-
Notifications
You must be signed in to change notification settings - Fork 27
Expand file tree
/
Copy pathsvs.h
More file actions
762 lines (642 loc) · 32.3 KB
/
svs.h
File metadata and controls
762 lines (642 loc) · 32.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
/*
* Copyright (c) 2006-Present, Redis Ltd.
* All rights reserved.
*
* Licensed under your choice of the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3).
*/
#pragma once
#include "VecSim/vec_sim_index.h"
#include "VecSim/query_result_definitions.h"
#include "VecSim/utils/vec_utils.h"
#include <cmath>
#include <memory>
#include <cassert>
#include <limits>
#include <vector>
#include "svs/index/vamana/dynamic_index.h"
#include "svs/index/vamana/multi.h"
#include "spdlog/sinks/callback_sink.h"
#include "VecSim/algorithms/svs/svs_utils.h"
#include "VecSim/algorithms/svs/svs_batch_iterator.h"
#include "VecSim/algorithms/svs/svs_extensions.h"
#ifdef BUILD_TESTS
#include "svs_serializer.h"
#endif
// Template-independent interface of the SVS index: batch insertion/deletion, thread-pool
// controls, and the counter of vectors that were marked deleted.
struct SVSIndexBase
#ifdef BUILD_TESTS
    : public SVSSerializer
#endif
{
    SVSIndexBase() = default;
    virtual ~SVSIndexBase() = default;

    // Batch insertion / deletion of `n` vectors identified by `labels`.
    virtual int addVectors(const void *vectors_data, const labelType *labels, size_t n) = 0;
    virtual int deleteVectors(const labelType *labels, size_t n) = 0;

    // Size of the underlying index storage.
    virtual size_t indexStorageSize() const = 0;

    // Thread-pool controls.
    virtual size_t getNumThreads() const = 0;
    virtual void setNumThreads(size_t numThreads) = 0;
    virtual size_t getThreadPoolCapacity() const = 0;

    // Whether the index stores vectors in a compressed form.
    virtual bool isCompressed() const = 0;

    // Current value of the marked-deleted counter.
    size_t getNumMarkedDeleted() const { return num_marked_deleted; }

#ifdef BUILD_TESTS
    virtual svs::logging::logger_ptr getLogger() const = 0;
#endif

protected:
    // Counts vectors marked deleted; used to trigger re-indexing once it exceeds a threshold.
    // Maintained by markIndexUpdate().
    size_t num_marked_deleted{0};
};
/** Thread Management Strategy:
* - addVector(): Requires numThreads == 1
* - addVectors(): Allows any numThreads value, but prohibits n=1 with numThreads>1
* - Callers are responsible for setting appropriate thread counts
**/
template <typename MetricType, typename DataType, bool isMulti, size_t QuantBits,
size_t ResidualBits, bool IsLeanVec>
class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, float>,
public SVSIndexBase {
protected:
// Element type of the vectors and the distance functor used by the SVS index.
using data_type = DataType;
using distance_f = MetricType;
// VecSim base class and the index-components bundle it is constructed from.
using Base = VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, float>;
using index_component_t = IndexComponents<svs_details::vecsim_dt<DataType>, float>;
// Storage selection: the traits pick the dataset type from the quantization template arguments.
using storage_traits_t = SVSStorageTraits<DataType, QuantBits, ResidualBits, IsLeanVec>;
using index_storage_type = typename storage_traits_t::index_storage_type;
// Graph builder and the graph type it produces.
using graph_builder_t = SVSGraphBuilder<uint32_t>;
using graph_type = typename graph_builder_t::graph_type;
// Concrete SVS index type: the multi-value variant when isMulti is set, the plain mutable
// Vamana index otherwise.
using impl_type = std::conditional_t<
isMulti,
svs::index::vamana::MultiMutableVamanaIndex<graph_type, index_storage_type, distance_f>,
svs::index::vamana::MutableVamanaIndex<graph_type, index_storage_type, distance_f>>;
// When set, preprocessForBatchStorage() copies and preprocesses incoming vectors instead of
// passing the caller's buffer through untouched.
bool forcePreprocessing;
// Index build parameters
svs::index::vamana::VamanaBuildParameters buildParams;
// Index search parameters
size_t search_window_size;
size_t search_buffer_capacity;
// LeanVec dataset dimension
// This parameter allows to tune LeanVec dimension if LeanVec is enabled
size_t leanvec_dim;
// Default epsilon used by rangeQuery() when the query does not supply one.
double epsilon;
// Check if the dataset is Two-level LVQ
// This allows to tune default window capacity during search
bool is_two_level_lvq;
// SVS thread pool
VecSimSVSThreadPool threadpool_;
// Logger handed to SVS; created by makeLogger() in the constructor body.
svs::logging::logger_ptr logger_;
// SVS Index implementation instance
// Stays null until the first insertion (see addVectorsImpl) and is reset when the index
// becomes empty (see markIndexUpdate).
std::unique_ptr<impl_type> impl_;
// Maps a raw SVS distance value to the VecSim distance for this index's metric.
static double toVecSimDistance(float v) {
    return svs_details::toVecSimDistance<distance_f>(v);
}
// Builds the spdlog logger handed to SVS. Every record is forwarded to the VecSim log callback
// (when one is registered) after translating the spdlog level to a VecSim level string.
svs::logging::logger_ptr makeLogger() {
    spdlog::custom_log_callback forward_to_vecsim = [this](const spdlog::details::log_msg &msg) {
        // Nothing to forward to when no callback function was provided.
        if (!VecSimIndexInterface::logCallback) {
            return;
        }

        // Translate the spdlog severity to the matching VecSim level string.
        const char *vecsim_level = "UNKNOWN";
        switch (msg.level) {
        case spdlog::level::trace:
            vecsim_level = VecSimCommonStrings::LOG_DEBUG_STRING;
            break;
        case spdlog::level::debug:
            vecsim_level = VecSimCommonStrings::LOG_VERBOSE_STRING;
            break;
        case spdlog::level::info:
            vecsim_level = VecSimCommonStrings::LOG_NOTICE_STRING;
            break;
        case spdlog::level::warn:
        case spdlog::level::err:
        case spdlog::level::critical:
            vecsim_level = VecSimCommonStrings::LOG_WARNING_STRING;
            break;
        default:
            break;
        }

        // The payload is a (pointer, size) view; copy it into a string to get a C string.
        const std::string text(msg.payload.data(), msg.payload.size());
        VecSimIndexInterface::logCallback(this->logCallbackCtx, vecsim_level, text.c_str());
    };

    // A logger with a single sink that invokes the callback above.
    auto callback_sink = std::make_shared<spdlog::sinks::callback_sink_mt>(forward_to_vecsim);
    auto svs_logger = std::make_shared<spdlog::logger>("SVSIndex", callback_sink);
    // Let every message through to VecSim.
    svs_logger->set_level(spdlog::level::trace);
    return svs_logger;
}
// Create SVS index instance with initial data
// Data should not be empty
// Creates the SVS index instance (impl_) from the first batch of vectors.
// `points` holds the initial vectors and must not be empty; `ids` holds their external labels.
// Steps: build the (optionally compressed) storage, compute the entry point, build the initial
// Vamana graph, construct the mutable index, then copy the build and search parameters into it.
template <svs::data::ImmutableMemoryDataset Dataset>
void initImpl(const Dataset &points, std::span<const labelType> ids) {
    svs::threads::ThreadPoolHandle threadpool_handle{VecSimSVSThreadPool{threadpool_}};
    // Construct SVS index initial storage with compression if needed
    auto data = storage_traits_t::create_storage(points, this->blockSize, threadpool_handle,
                                                 this->getAllocator(), this->leanvec_dim);
    // Compute the entry point.
    auto entry_point =
        svs::index::vamana::extensions::compute_entry_point(data, threadpool_handle);

    // Perform graph construction.
    auto distance = distance_f{};
    // Fixed: the declarator was garbled ("¶meters"); it is a const reference named `parameters`.
    const auto &parameters = this->buildParams;

    // Construct initial Vamana Graph
    auto graph =
        graph_builder_t::build_graph(parameters, data, distance, threadpool_, entry_point,
                                     this->blockSize, this->getAllocator(), logger_);

    // Create SVS MutableIndex instance
    impl_ = std::make_unique<impl_type>(std::move(graph), std::move(data), entry_point,
                                        std::move(distance), ids, threadpool_, logger_);

    // Set SVS MutableIndex build parameters to be used in future updates
    impl_->set_construction_window_size(parameters.window_size);
    impl_->set_max_candidates(parameters.max_candidate_pool_size);
    impl_->set_prune_to(parameters.prune_to);
    impl_->set_alpha(parameters.alpha);
    impl_->set_full_search_history(parameters.use_full_search_history);

    // Configure default search parameters
    auto sp = impl_->get_search_parameters();
    sp.buffer_config({this->search_window_size, this->search_buffer_capacity});
    impl_->set_search_parameters(sp);
    impl_->reset_performance_parameters();
}
// Preprocess batch of vectors
// Produces the blob that is handed to SVS storage for a batch of `n` vectors.
// Without forced preprocessing the caller's buffer is wrapped as-is with a no-op deleter.
// Otherwise the batch is copied into an allocator-owned buffer and each vector is
// preprocessed in place inside that copy.
MemoryUtils::unique_blob preprocessForBatchStorage(const void *original_data, size_t n) const {
    if (!this->forcePreprocessing) {
        // No aligned buffer is needed for storage: the SVS index copies the data anyway.
        return MemoryUtils::unique_blob{const_cast<void *>(original_data), [](void *) {}};
    }

    const auto batch_bytes = this->getStoredDataSize() * n;
    auto batch_copy =
        MemoryUtils::unique_blob{this->allocator->allocate(batch_bytes),
                                 [this](void *ptr) { this->allocator->free_allocation(ptr); }};

    // The raw copy below relies on input blobs and stored blobs having the same size.
    assert(this->getInputBlobSize() == this->getStoredDataSize());
    memcpy(batch_copy.get(), original_data, batch_bytes);

    // Walk the copy vector by vector and preprocess each one in place.
    auto *first_vector = static_cast<DataType *>(batch_copy.get());
    for (size_t idx = 0; idx < n; ++idx) {
        this->preprocessStorageInPlace(first_vector + idx * this->dim);
    }
    return batch_copy;
}
// Thread-count contract: the caller must have set numThreads to the value appropriate for
// this operation (reflecting the number of available threads) before calling this function.
// Important NOTE: For single vector operations (n=1), numThreads should be 1.
// For bulk operations (n>1), numThreads should reflect the number of available threads.
int addVectorsImpl(const void *vectors_data, const labelType *labels, size_t n) {
if (n == 0) {
return 0;
}
int deleted_num = 0;
if constexpr (!isMulti) {
// SVS index does not support overriding vectors with the same label
// so we have to delete them first if needed
deleted_num = deleteVectorsImpl(labels, n);
}
std::span<const labelType> ids(labels, n);
auto processed_blob = this->preprocessForBatchStorage(vectors_data, n);
auto typed_vectors_data = static_cast<DataType *>(processed_blob.get());
// Wrap data into SVS SimpleDataView for SVS API
auto points = svs::data::SimpleDataView<DataType>{typed_vectors_data, n, this->dim};
if (!impl_) {
// SVS index instance cannot be empty, so we have to construct it at first rows
initImpl(points, ids);
} else {
// Add new points to existing SVS index
impl_->add_points(points, ids);
}
return n - deleted_num;
}
int deleteVectorsImpl(const labelType *labels, size_t n) {
if (indexSize() == 0) {
return 0;
}
// SVS fails if we try to delete non-existing entries
std::vector<labelType> entries_to_delete;
entries_to_delete.reserve(n);
for (size_t i = 0; i < n; i++) {
if (impl_->has_id(labels[i])) {
entries_to_delete.push_back(labels[i]);
}
}
if (entries_to_delete.size() == 0) {
return 0;
}
// If entries_to_delete.size() == 1, we should ensure single-threading
const size_t current_num_threads = getNumThreads();
if (n == 1 && current_num_threads > 1) {
setNumThreads(1);
}
const auto deleted_num = impl_->delete_entries(entries_to_delete);
// Restore multi-threading if needed
if (n == 1 && current_num_threads > 1) {
setNumThreads(current_num_threads);
}
this->markIndexUpdate(deleted_num);
return deleted_num;
}
// Count deletions and consolidate index if needed
// Accounts for `n` deletions and consolidates the SVS index once the marked-deleted counter
// exceeds half of the current index size. Drops the SVS instance when the index is empty.
void markIndexUpdate(size_t n = 1) {
    if (!impl_) {
        return;
    }

    // An SVS index instance must not be empty: release it and restart the counter.
    if (indexSize() == 0) {
        num_marked_deleted = 0;
        this->impl_.reset();
        return;
    }

    num_marked_deleted += n;

    // Consolidation is triggered when the number of changes exceeds 50% of the index size.
    const float consolidation_threshold = .5f;
    // The empty-index case returned above, so the division below is safe.
    assert(indexSize() > 0);
    // Note: when called from deleteVectorsImpl, indexSize() already reflects the deletion.
    const float deleted_ratio = static_cast<float>(num_marked_deleted) / indexSize();
    if (deleted_ratio > consolidation_threshold) {
        impl_->consolidate();
        num_marked_deleted = 0;
    }
}
// True for the quantization modes this index treats as two-level LVQ
// (4x4, 4x8, 4x8_LeanVec and 8x8_LeanVec); false for every other mode.
bool isTwoLevelLVQ(const VecSimSvsQuantBits &qbits) {
    return qbits == VecSimSvsQuant_4x4 || qbits == VecSimSvsQuant_4x8 ||
           qbits == VecSimSvsQuant_4x8_LeanVec || qbits == VecSimSvsQuant_8x8_LeanVec;
}
public:
SVSIndex(const SVSParams ¶ms, const AbstractIndexInitParams &abstractInitParams,
const index_component_t &components, bool force_preprocessing)
: Base{abstractInitParams, components}, forcePreprocessing{force_preprocessing},
buildParams{svs_details::makeVamanaBuildParameters(params)},
search_window_size{svs_details::getOrDefault(params.search_window_size,
SVS_VAMANA_DEFAULT_SEARCH_WINDOW_SIZE)},
search_buffer_capacity{
svs_details::getOrDefault(params.search_buffer_capacity, search_window_size)},
leanvec_dim{
svs_details::getOrDefault(params.leanvec_dim, SVS_VAMANA_DEFAULT_LEANVEC_DIM)},
epsilon{svs_details::getOrDefault(params.epsilon, SVS_VAMANA_DEFAULT_EPSILON)},
is_two_level_lvq{isTwoLevelLVQ(params.quantBits)},
threadpool_{std::max(size_t{SVS_VAMANA_DEFAULT_NUM_THREADS}, params.num_threads)},
impl_{nullptr} {
logger_ = makeLogger();
}
~SVSIndex() = default;
// Size reported by the SVS index, or 0 while no SVS instance exists.
size_t indexSize() const override {
    if (!impl_) {
        return 0;
    }
    return impl_->size();
}
// Size of the SVS data view, or 0 while no SVS instance exists.
size_t indexStorageSize() const override {
    if (!impl_) {
        return 0;
    }
    return impl_->view_data().size();
}
// Storage capacity as reported by the storage traits, or 0 while no SVS instance exists.
size_t indexCapacity() const override {
    if (!impl_) {
        return 0;
    }
    return storage_traits_t::storage_capacity(impl_->view_data());
}
// Number of labels: the SVS label count for multi-value indices (0 without an SVS
// instance), otherwise the same value as indexSize().
size_t indexLabelCount() const override {
    if constexpr (!isMulti) {
        return indexSize();
    } else {
        if (!impl_) {
            return 0;
        }
        return impl_->labelcount();
    }
}
// Collects every id visited by the SVS index's on_ids() into a set built on this index's
// allocator. The set is empty while no SVS instance exists.
vecsim_stl::set<size_t> getLabelsSet() const override {
    vecsim_stl::set<size_t> collected(this->allocator);
    if (!impl_) {
        return collected;
    }
    impl_->on_ids([&collected](size_t id) { collected.insert(id); });
    return collected;
}
// Basic info of the base index, tagged as a non-tiered SVS index.
VecSimIndexBasicInfo basicInfo() const override {
    VecSimIndexBasicInfo basic = this->getBasicInfo();
    basic.isTiered = false;
    basic.algo = VecSimAlgo_SVS;
    return basic;
}
// Fills the debug-info structure: common info (with the algorithm set to SVS) plus the
// SVS-specific build, threading, deletion and search settings of this index.
VecSimIndexDebugInfo debugInfo() const override {
    VecSimIndexDebugInfo info;
    info.commonInfo = this->getCommonInfo();
    info.commonInfo.basicInfo.algo = VecSimAlgo_SVS;

    const auto &build = this->buildParams;
    info.svsInfo = svsInfoStruct{.quantBits = getCompressionMode(),
                                 .alpha = build.alpha,
                                 .graphMaxDegree = build.graph_max_degree,
                                 .constructionWindowSize = build.window_size,
                                 .maxCandidatePoolSize = build.max_candidate_pool_size,
                                 .pruneTo = build.prune_to,
                                 .useSearchHistory = build.use_full_search_history,
                                 // numThreads is the pool capacity; lastReservedThreads is
                                 // the pool's current size.
                                 .numThreads = this->getThreadPoolCapacity(),
                                 .lastReservedThreads = this->getNumThreads(),
                                 .numberOfMarkedDeletedNodes = this->num_marked_deleted,
                                 .searchWindowSize = this->search_window_size,
                                 .searchBufferCapacity = this->search_buffer_capacity,
                                 .leanvecDim = this->leanvec_dim,
                                 .epsilon = this->epsilon};
    return info;
}
// Builds an iterator over the debug-info fields of this index. The fields are appended in a
// fixed order: algorithm, common info, block size, then the SVS-specific settings.
// The returned iterator is allocated with `new`.
VecSimDebugInfoIterator *debugInfoIterator() const override {
    VecSimIndexDebugInfo info = this->debugInfo();
    const auto &svs_info = info.svsInfo;

    // For readability. Update this number when needed.
    size_t numberOfInfoFields = 23;
    auto *infoIterator = new VecSimDebugInfoIterator(numberOfInfoFields, this->allocator);

    // Small helpers, one per field kind, so that each field below is a single line.
    const auto addString = [infoIterator](const char *name, auto value) {
        infoIterator->addInfoField(
            VecSim_InfoField{.fieldName = name,
                             .fieldType = INFOFIELD_STRING,
                             .fieldValue = {FieldValue{.stringValue = value}}});
    };
    const auto addUint = [infoIterator](const char *name, auto value) {
        infoIterator->addInfoField(
            VecSim_InfoField{.fieldName = name,
                             .fieldType = INFOFIELD_UINT64,
                             .fieldValue = {FieldValue{.uintegerValue = value}}});
    };
    const auto addFloat = [infoIterator](const char *name, auto value) {
        infoIterator->addInfoField(
            VecSim_InfoField{.fieldName = name,
                             .fieldType = INFOFIELD_FLOAT64,
                             .fieldValue = {FieldValue{.floatingPointValue = value}}});
    };

    addString(VecSimCommonStrings::ALGORITHM_STRING,
              VecSimAlgo_ToString(info.commonInfo.basicInfo.algo));
    this->addCommonInfoToIterator(infoIterator, info.commonInfo);
    addUint(VecSimCommonStrings::BLOCK_SIZE_STRING, info.commonInfo.basicInfo.blockSize);

    // SVS-specific fields.
    addString(VecSimCommonStrings::SVS_QUANT_BITS_STRING,
              VecSimQuantBits_ToString(svs_info.quantBits));
    addFloat(VecSimCommonStrings::SVS_ALPHA_STRING, svs_info.alpha);
    addUint(VecSimCommonStrings::SVS_GRAPH_MAX_DEGREE_STRING, svs_info.graphMaxDegree);
    addUint(VecSimCommonStrings::SVS_CONSTRUCTION_WS_STRING, svs_info.constructionWindowSize);
    addUint(VecSimCommonStrings::SVS_MAX_CANDIDATE_POOL_SIZE_STRING,
            svs_info.maxCandidatePoolSize);
    addUint(VecSimCommonStrings::SVS_PRUNE_TO_STRING, svs_info.pruneTo);
    addUint(VecSimCommonStrings::SVS_USE_SEARCH_HISTORY_STRING, svs_info.useSearchHistory);
    addUint(VecSimCommonStrings::SVS_NUM_THREADS_STRING, svs_info.numThreads);
    addUint(VecSimCommonStrings::SVS_LAST_RESERVED_THREADS_STRING,
            svs_info.lastReservedThreads);
    addUint(VecSimCommonStrings::NUM_MARKED_DELETED, svs_info.numberOfMarkedDeletedNodes);
    addUint(VecSimCommonStrings::SVS_SEARCH_WS_STRING, svs_info.searchWindowSize);
    addUint(VecSimCommonStrings::SVS_SEARCH_BC_STRING, svs_info.searchBufferCapacity);
    addUint(VecSimCommonStrings::SVS_LEANVEC_DIM_STRING, svs_info.leanvecDim);
    addFloat(VecSimCommonStrings::EPSILON_STRING, svs_info.epsilon);

    return infoIterator;
}
int addVector(const void *vector_data, labelType label) override {
// Enforce single-threaded execution for single vector operations to ensure optimal
// performance and consistent behavior. Callers must set numThreads=1 before calling this
// method.
assert(getNumThreads() == 1 && "Can't use more than one thread to insert a single vector");
return addVectorsImpl(vector_data, &label, 1);
}
// Inserts a batch of `n` vectors.
// A batch of exactly one vector must not be inserted with more than one thread; that misuse
// is caught by an assertion (single inserts are expected to go through addVector()).
int addVectors(const void *vectors_data, const labelType *labels, size_t n) override {
    assert(!(n == 1 && getNumThreads() > 1) &&
           "Can't use more than one thread to insert a single vector");
    const int added = addVectorsImpl(vectors_data, labels, n);
    return added;
}
// Deletes the entries stored under a single label; returns what deleteVectorsImpl() returns.
int deleteVector(labelType label) override {
    const labelType *single_label = &label;
    return deleteVectorsImpl(single_label, 1);
}
// Deletes the entries stored under the `n` given labels; thin forwarder to the shared
// implementation.
int deleteVectors(const labelType *labels, size_t n) override {
    const int deleted = deleteVectorsImpl(labels, n);
    return deleted;
}
// Current size of the SVS thread pool.
size_t getNumThreads() const override {
    return threadpool_.size();
}
// Resizes the SVS thread pool to `numThreads`.
void setNumThreads(size_t numThreads) override {
    threadpool_.resize(numThreads);
}
// Capacity of the SVS thread pool.
size_t getThreadPoolCapacity() const override {
    return threadpool_.capacity();
}
// Whether the selected storage type is a compressed one, as reported by the storage traits.
bool isCompressed() const override {
    return storage_traits_t::is_compressed();
}
// Compression mode of the selected storage type, as reported by the storage traits.
VecSimSvsQuantBits getCompressionMode() const {
    const VecSimSvsQuantBits mode = storage_traits_t::get_compression_mode();
    return mode;
}
// Distance between the vector stored under `label` and `vector_data`, converted to the
// VecSim distance convention. Returns NaN when there is no SVS instance or the label is
// not present in it.
double getDistanceFrom_Unsafe(labelType label, const void *vector_data) const override {
    const bool label_exists = impl_ && impl_->has_id(label);
    if (!label_exists) {
        return std::numeric_limits<double>::quiet_NaN();
    }
    // View the raw blob as `dim` elements of the index data type.
    const auto *typed_query = static_cast<const DataType *>(vector_data);
    auto query_datum = std::span{typed_query, this->dim};
    return toVecSimDistance(impl_->get_distance(label, query_datum));
}
// Runs a top-k search for `queryBlob`.
// Returns a reply allocated with `new`. It is empty when k is 0 or the index is empty, and
// carries the TimedOut code (with no results) when the timeout check fires after the search.
// Otherwise it holds the neighbors sorted by score, then by id.
VecSimQueryReply *topKQuery(const void *queryBlob, size_t k,
                            VecSimQueryParams *queryParams) const override {
    auto *reply = new VecSimQueryReply(this->allocator);
    this->lastMode = STANDARD_KNN;
    if (this->indexSize() == 0 || k == 0) {
        return reply;
    }

    // Never ask for more results than there are labels in the index.
    k = std::min(k, this->indexLabelCount());

    // Wrap the preprocessed query as a one-row dataset for the SVS search API.
    auto query_holder = this->preprocessQuery(queryBlob);
    const auto *typed_query = static_cast<const DataType *>(query_holder.get());
    auto query = svs::data::ConstSimpleDataView<DataType>{typed_query, 1, this->dim};
    auto result = svs::QueryResult<size_t>{query.size(), k};

    auto sp = svs_details::joinSearchParams(impl_->get_search_parameters(), queryParams,
                                            is_two_level_lvq);

    // Timeout predicate, also passed to SVS as the cancellation callback.
    auto timeoutCtx = queryParams ? queryParams->timeoutCtx : nullptr;
    auto cancel = [timeoutCtx]() { return VECSIM_TIMEOUT(timeoutCtx); };

    impl_->search(result.view(), query, sp, cancel);
    if (cancel()) {
        reply->code = VecSim_QueryReply_TimedOut;
        return reply;
    }

    assert(result.n_queries() == 1);

    const auto found = result.n_neighbors();
    reply->results.reserve(found);
    for (size_t pos = 0; pos < found; ++pos) {
        reply->results.push_back(
            VecSimQueryResult{result.index(0, pos), toVecSimDistance(result.distance(0, pos))});
    }

    // Workaround for VecSim merge_results() that expects results to be sorted
    // by score, then by id from both indices.
    // TODO: remove this workaround when merge_results() is fixed.
    sort_results_by_score_then_id(reply);
    return reply;
}
// Range search built on the SVS batch iterator.
// Batches are pulled until one contains a neighbor farther than radius * (1 + |epsilon|),
// or the iterator has nothing cached. Neighbors with distance <= radius are collected.
// Returns a reply allocated with `new`. It is empty when radius is 0 or the index is empty,
// and carries the TimedOut code when the timeout check fires after fetching a batch.
VecSimQueryReply *rangeQuery(const void *queryBlob, double radius,
                             VecSimQueryParams *queryParams) const override {
    auto *reply = new VecSimQueryReply(this->allocator);
    this->lastMode = RANGE_QUERY;
    if (this->indexSize() == 0 || radius == 0) {
        return reply;
    }

    auto timeoutCtx = queryParams ? queryParams->timeoutCtx : nullptr;
    auto cancel = [timeoutCtx]() { return VECSIM_TIMEOUT(timeoutCtx); };

    // Prepare the query blob for SVS.
    auto query_holder = this->preprocessQuery(queryBlob);
    const auto *typed_query = static_cast<const data_type *>(query_holder.get());
    std::span<const data_type> query{typed_query, this->dim};

    // Base search parameters for the SVS iterator schedule.
    auto sp = svs_details::joinSearchParams(impl_->get_search_parameters(), queryParams,
                                            is_two_level_lvq);
    // The batch size defaults to the search window size.
    const size_t batch_size = sp.buffer_config_.get_search_window_size();

    // The iterator caches each batch of results; the loop below reads that cache.
    auto svs_it = impl_->make_batch_iterator(query);
    svs_it.next(batch_size, cancel);
    if (cancel()) {
        reply->code = VecSim_QueryReply_TimedOut;
        return reply;
    }

    // A non-zero runtime epsilon overrides the index default.
    const bool has_runtime_epsilon = queryParams && queryParams->svsRuntimeParams.epsilon != 0;
    const auto effective_epsilon =
        has_runtime_epsilon ? queryParams->svsRuntimeParams.epsilon : this->epsilon;
    const auto range_search_boundaries = radius * (1.0 + std::abs(effective_epsilon));

    bool keep_searching = true;
    // Continue while the cache is non-empty and no neighbor exceeded radius + epsilon margin.
    while (keep_searching && svs_it.size() > 0) {
        for (auto &neighbor : svs_it) {
            const auto dist = toVecSimDistance(neighbor.distance());
            if (dist <= radius) {
                reply->results.push_back(VecSimQueryResult{neighbor.id(), dist});
            } else if (dist > range_search_boundaries) {
                keep_searching = false;
            }
        }
        if (!keep_searching) {
            break;
        }
        // The boundary was not exceeded: ask the iterator for the next batch.
        svs_it.next(batch_size, cancel);
        if (cancel()) {
            reply->code = VecSim_QueryReply_TimedOut;
            return reply;
        }
    }

    // Workaround for VecSim merge_results() that expects results to be sorted
    // by score, then by id from both indices.
    // TODO: remove this workaround when merge_results() is fixed.
    sort_results_by_score_then_id(reply);
    return reply;
}
// Creates a batch iterator for `queryBlob`: a null iterator when the index is empty,
// otherwise an SVS-backed one bound to the current SVS instance.
// The iterator receives a released copy of the preprocessed query blob and is expected to
// free it at the end.
VecSimBatchIterator *newBatchIterator(const void *queryBlob,
                                      VecSimQueryParams *queryParams) const override {
    // Second argument is force_copy == true; release() hands the copy over to the iterator.
    auto *queryBlobCopyPtr = this->preprocessQuery(queryBlob, true).release();

    if (indexSize() == 0) {
        return new (this->getAllocator())
            NullSVS_BatchIterator(queryBlobCopyPtr, queryParams, this->getAllocator());
    }
    return new (this->getAllocator()) SVS_BatchIterator<impl_type, data_type>(
        queryBlobCopyPtr, impl_.get(), queryParams, this->getAllocator(), is_two_level_lvq);
}
// Heuristic choice between ad-hoc brute force (returns true) and batched search (false)
// for hybrid queries, based on the subset/index size ratio, the index size and k.
// Also records the chosen mode in lastMode; `initial_check` selects which ad-hoc mode
// is recorded.
bool preferAdHocSearch(size_t subsetSize, size_t k, bool initial_check) const override {
    const size_t index_size = this->indexSize();

    // Ratio of the subset size to the total index size (0 for an empty index).
    double subsetRatio = 0.f;
    if (index_size != 0) {
        subsetRatio = static_cast<double>(subsetSize) / index_size;
    }

    // Heuristic thresholds
    const double smallSubsetThreshold = 0.07;  // Subset is small if less than 7% of index.
    const double largeSubsetThreshold = 0.21;  // Subset is large if more than 21% of index.
    const double smallIndexThreshold = 75000;  // Index is small if size is less than 75k.
    const double largeIndexThreshold = 750000; // Index is large if size is more than 750k.

    const bool index_is_small = index_size < smallIndexThreshold;
    const bool index_is_not_large = index_size < largeIndexThreshold;

    bool use_adhoc;
    if (subsetRatio < smallSubsetThreshold) {
        // Small subset: ad-hoc unless the index is large.
        use_adhoc = index_is_not_large;
    } else if (subsetRatio < largeSubsetThreshold) {
        // Medium subset: ad-hoc if the index is small or k is big.
        use_adhoc = index_is_small || (k > 12);
    } else {
        // Large subset: ad-hoc only if the index is small.
        use_adhoc = index_is_small;
    }

    if (!use_adhoc) {
        this->lastMode = HYBRID_BATCHES;
    } else if (initial_check) {
        this->lastMode = HYBRID_ADHOC_BF;
    } else {
        this->lastMode = HYBRID_BATCHES_TO_ADHOC_BF;
    }
    return use_adhoc;
}
// Consolidates and then compacts the SVS index (when one exists) and resets the
// marked-deleted counter.
void runGC() override {
    if (auto *index = impl_.get(); index != nullptr) {
        // consolidate() documentation:
        // https://intel.github.io/ScalableVectorSearch/python/dynamic.html#svs.DynamicVamana.consolidate
        index->consolidate();
        // compact() documentation:
        // https://intel.github.io/ScalableVectorSearch/python/dynamic.html#svs.DynamicVamana.compact
        index->compact();
    }
    num_marked_deleted = 0;
}
#ifdef BUILD_TESTS
private:
// Test-only serialization and integrity hooks. They are only declared here.
// NOTE(review): presumably defined in svs_serializer_impl.h, which is included at the end of
// this file under BUILD_TESTS — confirm.
void saveIndexIMP(std::ofstream &output) override;
void impl_save(const std::string &location) override;
void saveIndexFields(std::ofstream &output) const override;
bool compareMetadataFile(const std::string &metadataFilePath) const override;
void loadIndex(const std::string &folder_path) override;
bool checkIntegrity() const override;
public:
// Intentionally a no-op for the SVS index.
void fitMemory() override {
}
// Reported as the same value as indexCapacity().
size_t indexMetaDataCapacity() const override {
    const size_t capacity = this->indexCapacity();
    return capacity;
}
// Test helper: returns a raw byte copy of every vector stored under `label`.
// Throws std::runtime_error for compressed/quantized indices, where the stored data is in
// a compressed format and cannot be returned as raw bytes.
// For a multi-value index an unknown label yields an empty result.
std::vector<std::vector<char>> getStoredVectorDataByLabel(labelType label) const override {
    if constexpr (QuantBits > 0 || ResidualBits > 0) {
        throw std::runtime_error(
            "getStoredVectorDataByLabel is not supported for compressed/quantized indices");
    } else {
        std::vector<std::vector<char>> vectors_output;

        // Copies getStoredDataSize() bytes of one datum into a new entry of the output.
        // For uncompressed data the datum is expected to be a simple span.
        const auto append_raw_copy = [this, &vectors_output](auto &&datum) {
            const char *raw_begin = reinterpret_cast<const char *>(datum.data());
            std::vector<char> raw_bytes(this->getStoredDataSize());
            std::memcpy(raw_bytes.data(), raw_begin, this->getStoredDataSize());
            vectors_output.push_back(std::move(raw_bytes));
        };

        if constexpr (!isMulti) {
            // Single-value index: exactly one datum per label.
            append_raw_copy(impl_->get_datum(label));
        } else {
            // Multi-value index: copy every vector registered under this label.
            auto found = impl_->get_label_to_external_lookup().find(label);
            if (found != impl_->get_label_to_external_lookup().end()) {
                for (auto external_id : found->second) {
                    append_raw_copy(impl_->get_parent_index().get_datum(external_id));
                }
            }
        }
        return vectors_output;
    }
}
// Not implemented for the SVS index: asserts unconditionally and leaves `vectors_output`
// untouched.
void getDataByLabel(
    labelType label,
    std::vector<std::vector<svs_details::vecsim_dt<DataType>>> &vectors_output) const override {
    assert(false && "Not implemented");
}
// Test accessor for the logger that is handed to SVS.
svs::logging::logger_ptr getLogger() const override {
    return logger_;
}
#endif
};
#ifdef BUILD_TESTS
// Including implementations for Serializer base
#include "svs_serializer_impl.h"
#endif