Skip to content

Commit 41e606e

Browse files
authored
Merge pull request #3130 from activeloopai/stateless-v2
Stateless support and switch to new deeplake-api.
2 parents a758af2 + 46b5cfd commit 41e606e

18 files changed

Lines changed: 1106 additions & 105 deletions

DEEPLAKE_API_VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
4.5.0
1+
4.5.1

cpp/deeplake_pg/dl_catalog.cpp

Lines changed: 264 additions & 57 deletions
Large diffs are not rendered by default.

cpp/deeplake_pg/dl_catalog.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ std::vector<table_meta> load_tables(const std::string& root_path, icm::string_ma
4242
std::vector<column_meta> load_columns(const std::string& root_path, icm::string_map<> creds);
4343
std::vector<index_meta> load_indexes(const std::string& root_path, icm::string_map<> creds);
4444

45+
// Load tables and columns in parallel for better performance
46+
std::pair<std::vector<table_meta>, std::vector<column_meta>>
47+
load_tables_and_columns(const std::string& root_path, icm::string_map<> creds);
48+
4549
void upsert_table(const std::string& root_path, icm::string_map<> creds, const table_meta& meta);
4650
void upsert_columns(const std::string& root_path, icm::string_map<> creds, const std::vector<column_meta>& columns);
4751

cpp/deeplake_pg/duckdb_deeplake_convert.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "utils.hpp"
55

66
#include <codecs/compression.hpp>
7+
#include <icm/vector.hpp>
78
#include <nd/adapt.hpp>
89
#include <nd/none.hpp>
910

@@ -85,7 +86,7 @@ T to_cpp_value(const duckdb::Value& val)
8586

8687
nd::array to_deeplake_value_as_array_list(const duckdb::vector<duckdb::Value>& values)
8788
{
88-
std::vector<nd::array> arr;
89+
icm::vector<nd::array> arr;
8990
arr.reserve(values.size());
9091
for (const auto& v : values) {
9192
arr.push_back(pg::to_deeplake_value(v));
@@ -105,7 +106,7 @@ nd::array to_deeplake_value(const duckdb::LogicalType& duckdb_type, const duckdb
105106
}
106107
return switch_duckdb_type(duckdb_type, [&values]<typename T>() {
107108
if constexpr (std::is_same_v<T, bytea_type>) {
108-
std::vector<nd::array> arr;
109+
icm::vector<nd::array> arr;
109110
arr.reserve(values.size());
110111
for (const auto& val : values) {
111112
duckdb::string_t blob_data = duckdb::StringValue::Get(val);

cpp/deeplake_pg/duckdb_deeplake_scan.cpp

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ struct deeplake_scan_bind_data final : public duckdb::TableFunctionData
4343
struct deeplake_scan_global_state final : public duckdb::GlobalTableFunctionState
4444
{
4545
duckdb::vector<duckdb::column_t> column_ids;
46-
std::vector<base::function<async::promise<std::vector<icm::roaring>>()>> index_searchers;
46+
icm::vector<base::function<async::promise<icm::vector<icm::roaring>>()>> index_searchers;
4747
duckdb::unique_ptr<duckdb::Expression> filter_expr;
4848
std::mutex index_search_mutex;
4949
heimdall::dataset_view_ptr index_search_result;
@@ -205,10 +205,10 @@ duckdb::unique_ptr<duckdb::FunctionData> deeplake_scan_bind(duckdb::ClientContex
205205
return duckdb::make_uniq<deeplake_scan_bind_data>(td, return_types);
206206
}
207207

208-
base::function<async::promise<std::vector<icm::roaring>>()>
208+
base::function<async::promise<icm::vector<icm::roaring>>()>
209209
try_get_index_searcher(heimdall::column_view_ptr column_view, const duckdb::ConstantFilter& filter)
210210
{
211-
base::function<async::promise<std::vector<icm::roaring>>()> result;
211+
base::function<async::promise<icm::vector<icm::roaring>>()> result;
212212
auto index_holder = column_view->index_holder();
213213
ASSERT(index_holder != nullptr);
214214
auto constant = pg::to_deeplake_value(filter.constant);
@@ -250,7 +250,7 @@ try_get_index_searcher(heimdall::column_view_ptr column_view, const duckdb::Cons
250250
query_core::text_search_info info;
251251
info.column_name = column_view->name();
252252
info.type = query_core::text_search_info::search_type::equals;
253-
info.search_values.push_back(std::vector<std::string>{filter.constant.ToString()});
253+
info.search_values.push_back(icm::vector<std::string>{filter.constant.ToString()});
254254
if (index_holder->can_run_query(info)) {
255255
result = [index_holder, si = std::move(info)]() {
256256
return index_holder->run_query(si);
@@ -261,7 +261,7 @@ try_get_index_searcher(heimdall::column_view_ptr column_view, const duckdb::Cons
261261
return result;
262262
}
263263

264-
base::function<async::promise<std::vector<icm::roaring>>()>
264+
base::function<async::promise<icm::vector<icm::roaring>>()>
265265
try_get_index_searcher(heimdall::column_view_ptr column_view, const duckdb::InFilter& filter)
266266
{
267267
query_core::inverted_index_search_info info;
@@ -277,10 +277,10 @@ try_get_index_searcher(heimdall::column_view_ptr column_view, const duckdb::InFi
277277
};
278278
}
279279

280-
base::function<async::promise<std::vector<icm::roaring>>()>
280+
base::function<async::promise<icm::vector<icm::roaring>>()>
281281
try_get_index_searcher(heimdall::column_view_ptr column_view, const duckdb::TableFilter& filter)
282282
{
283-
base::function<async::promise<std::vector<icm::roaring>>()> result;
283+
base::function<async::promise<icm::vector<icm::roaring>>()> result;
284284
ASSERT(column_view != nullptr);
285285
if (column_view->index_holder() == nullptr) {
286286
return result;
@@ -887,14 +887,14 @@ class deeplake_scan_function_helper
887887
if (is_index_search_done()) {
888888
return;
889889
}
890-
std::vector<async::promise<icm::roaring>> promises;
890+
icm::vector<async::promise<icm::roaring>> promises;
891891
for (auto& is : global_state_.index_searchers) {
892-
promises.push_back(is().then_any([](std::vector<icm::roaring>&& results) {
892+
promises.push_back(is().then_any([](icm::vector<icm::roaring>&& results) {
893893
ASSERT(results.size() == 1);
894894
return std::move(results.front());
895895
}));
896896
}
897-
auto combined_promise = async::combine(std::move(promises)).then_any([](std::vector<icm::roaring>&& results) {
897+
auto combined_promise = async::combine(std::move(promises)).then_any([](icm::vector<icm::roaring>&& results) {
898898
ASSERT(!results.empty());
899899
icm::roaring& combined = results[0];
900900
for (size_t i = 1; i < results.size(); ++i) {
@@ -903,7 +903,7 @@ class deeplake_scan_function_helper
903903
return std::move(combined);
904904
});
905905
auto indices = combined_promise.get_future().get();
906-
std::vector<int64_t> indices_vec;
906+
icm::vector<int64_t> indices_vec;
907907
indices_vec.reserve(indices.cardinality());
908908
for (auto x : indices) {
909909
indices_vec.push_back(x);
@@ -955,7 +955,7 @@ class deeplake_scan_function_helper
955955
}
956956

957957
ASSERT(output_.ColumnCount() == global_state_.column_ids.size());
958-
std::vector<async::promise<void>> column_promises;
958+
icm::vector<async::promise<void>> column_promises;
959959
// Fill output vectors column by column using table_data streamers
960960
for (unsigned i = 0; i < global_state_.column_ids.size(); ++i) {
961961
const auto col_idx = global_state_.column_ids[i];

cpp/deeplake_pg/extension_init.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1197,10 +1197,18 @@ static void process_utility(PlannedStmt* pstmt,
11971197
}
11981198
// When root_path is set, auto-discover tables from the deeplake catalog
11991199
if (vstmt->name != nullptr && pg_strcasecmp(vstmt->name, "deeplake.root_path") == 0) {
1200-
// Reload table metadata from the catalog at the new root_path
1201-
// This enables stateless multi-instance support where tables are
1202-
// auto-discovered when pointing to a shared root_path
1203-
pg::table_storage::instance().force_load_table_metadata();
1200+
// Track the previous root_path to detect actual changes
1201+
static thread_local std::string last_root_path;
1202+
auto current_root_path = pg::session_credentials::get_root_path();
1203+
1204+
if (current_root_path != last_root_path) {
1205+
// Path changed - force full reload
1206+
last_root_path = current_root_path;
1207+
pg::table_storage::instance().force_load_table_metadata();
1208+
} else {
1209+
// Same path - just check for catalog updates (fast path)
1210+
pg::table_storage::instance().load_table_metadata();
1211+
}
12041212
}
12051213
}
12061214
}

cpp/deeplake_pg/hybrid_query_merge.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ inline query_core::query_result merge_query_results(
126126
// Take top_k results
127127
size_t result_size = std::min(top_k, final_scores.size());
128128
std::vector<float> top_scores;
129-
std::vector<int64_t> top_indices;
129+
icm::vector<int64_t> top_indices;
130130
top_scores.reserve(result_size);
131131
top_indices.reserve(result_size);
132132

cpp/deeplake_pg/index_search.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ struct scan_opaque
108108

109109
query_core::query_result run_index_search(nd::array input_array, std::string func_name, const std::string& column_name, pg::index_info& idx_info)
110110
{
111-
std::vector<query_core::expr> args;
111+
icm::vector<query_core::expr> args;
112112
args.emplace_back(query_core::expr::make_column_ref(column_name, std::string{}));
113113
args.emplace_back(query_core::expr::make_literal_array(std::move(input_array)));
114114
const bool is_cosine_similarity = (func_name == "COSINE_SIMILARITY");
@@ -194,7 +194,7 @@ icm::roaring run_exact_text_search(std::string text_value, StrategyNumber strate
194194
return {};
195195
}
196196
info.column_name = idx_info.column_name();
197-
info.search_values.emplace_back(std::vector<std::string>{std::move(text_value)});
197+
info.search_values.emplace_back(icm::vector<std::string>{std::move(text_value)});
198198
return idx_info.run_query(std::move(info));
199199
}
200200

@@ -563,7 +563,7 @@ void collect_index_data(IndexScanDesc scan, ScanKey keys, int32_t nkeys, ScanKey
563563
}
564564
}
565565
if (nkeys > 0) {
566-
std::vector<int64_t> row_numbers;
566+
icm::vector<int64_t> row_numbers;
567567
row_numbers.reserve(result.cardinality());
568568
std::transform(result.begin(), result.end(), std::back_inserter(row_numbers), [](auto v) {
569569
return static_cast<int64_t>(v);

cpp/deeplake_pg/nd_utils.hpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "exceptions.hpp"
44

55
#include <icm/shape.hpp>
6+
#include <icm/vector.hpp>
67
#include <nd/adapt.hpp>
78
#include <nd/array.hpp>
89
#include <nd/none.hpp>
@@ -124,7 +125,7 @@ inline pg::array_type pg_to_nd_typed(ArrayType* array, bool copy_data = true)
124125
const auto nrows = dims[0];
125126
const auto ncols = dims[1];
126127
if (copy_data) {
127-
std::vector<nd::array> data_vector;
128+
icm::vector<nd::array> data_vector;
128129
data_vector.reserve(static_cast<size_t>(nrows));
129130
for (int i = 0; i < nrows; ++i) {
130131
data_vector.emplace_back(nd::adapt(std::vector<T>(data + static_cast<size_t>(i) * static_cast<size_t>(ncols),
@@ -564,7 +565,7 @@ inline nd::array datum_to_nd(Datum value, Oid attr_typeid, int32_t typmod)
564565
return nd::none(nd::dtype::byte, 0);
565566
} else {
566567
int nelems = ArrayGetNItems(ARR_NDIM(arr), ARR_DIMS(arr));
567-
std::vector<nd::array> elements;
568+
icm::vector<nd::array> elements;
568569
elements.reserve(static_cast<size_t>(nelems));
569570

570571
Datum* datums = nullptr;
@@ -590,7 +591,7 @@ inline nd::array datum_to_nd(Datum value, Oid attr_typeid, int32_t typmod)
590591
return nd::none(nd::dtype::string, 1);
591592
} else {
592593
int nelems = ArrayGetNItems(ARR_NDIM(arr), ARR_DIMS(arr));
593-
std::vector<nd::array> elements;
594+
icm::vector<nd::array> elements;
594595
elements.reserve(static_cast<size_t>(nelems));
595596
Datum* datums = nullptr;
596597
bool* nulls = nullptr;
@@ -903,7 +904,7 @@ nd::array eval_with_nones(nd::array arr)
903904
return nd::eval(arr);
904905
} catch (const nd::invalid_dynamic_eval&) {
905906
}
906-
std::vector<nd::array> result_elements;
907+
icm::vector<nd::array> result_elements;
907908
result_elements.reserve(arr.size());
908909
for (auto a : arr) {
909910
if (a.is_none()) {

cpp/deeplake_pg/sync_worker.cpp

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ void deeplake_sync_worker_sighup(SIGNAL_ARGS)
6767
*/
6868
void deeplake_sync_tables_from_catalog(const std::string& root_path, icm::string_map<> creds)
6969
{
70-
auto catalog_tables = pg::dl_catalog::load_tables(root_path, creds);
71-
auto catalog_columns = pg::dl_catalog::load_columns(root_path, creds);
70+
// Load tables and columns in parallel for better performance
71+
auto [catalog_tables, catalog_columns] = pg::dl_catalog::load_tables_and_columns(root_path, creds);
7272

7373
for (const auto& meta : catalog_tables) {
7474
// Skip tables marked as dropping
@@ -168,6 +168,8 @@ PGDLLEXPORT void deeplake_sync_worker_main(Datum main_arg)
168168
elog(LOG, "pg_deeplake sync worker started");
169169

170170
int64_t last_catalog_version = 0;
171+
std::string last_root_path; // Track root_path to detect changes
172+
bool catalog_ensured = false;
171173

172174
while (!got_sigterm) {
173175
// Handle SIGHUP - reload configuration
@@ -199,10 +201,15 @@ PGDLLEXPORT void deeplake_sync_worker_main(Datum main_arg)
199201
if (!root_path.empty()) {
200202
auto creds = pg::session_credentials::get_credentials();
201203

202-
// Ensure catalog exists
203-
pg::dl_catalog::ensure_catalog(root_path, creds);
204+
// Only ensure catalog on first call or when root_path changes
205+
if (!catalog_ensured || root_path != last_root_path) {
206+
pg::dl_catalog::ensure_catalog(root_path, creds);
207+
catalog_ensured = true;
208+
last_root_path = root_path;
209+
last_catalog_version = 0; // Reset version when path changes
210+
}
204211

205-
// Use existing catalog version API to check for changes
212+
// Use existing catalog version API to check for changes (now fast with cache)
206213
int64_t current_version = pg::dl_catalog::get_catalog_version(root_path, creds);
207214

208215
if (current_version != last_catalog_version) {

0 commit comments

Comments
 (0)