Skip to content

Commit de15ad5

Browse files
committed
Fixed index propagation.
1 parent 928d6c0 commit de15ad5

5 files changed

Lines changed: 201 additions & 7 deletions

File tree

cpp/deeplake_pg/dl_catalog.cpp

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -457,9 +457,52 @@ std::vector<column_meta> load_columns(const std::string& root_path, const std::s
457457
}
458458
}
459459

460-
std::vector<index_meta> load_indexes(const std::string&, const std::string&, icm::string_map<>)
460+
std::vector<index_meta> load_indexes(const std::string& root_path, const std::string& db_name, icm::string_map<> creds)
461461
{
462-
return {};
462+
std::vector<index_meta> out;
463+
try {
464+
auto table = open_db_catalog_table(root_path, db_name, k_indexes_name, std::move(creds));
465+
if (!table) {
466+
return out;
467+
}
468+
auto snapshot = table->read().get_future().get();
469+
if (snapshot.row_count() == 0) {
470+
return out;
471+
}
472+
473+
for (const auto& row : snapshot.rows()) {
474+
auto table_id_it = row.find("table_id");
475+
auto column_names_it = row.find("column_names");
476+
auto index_type_it = row.find("index_type");
477+
auto order_type_it = row.find("order_type");
478+
479+
if (table_id_it == row.end() || column_names_it == row.end() || index_type_it == row.end()) {
480+
continue;
481+
}
482+
483+
index_meta meta;
484+
meta.table_id = deeplake_api::array_to_string(table_id_it->second);
485+
meta.column_names = deeplake_api::array_to_string(column_names_it->second);
486+
meta.index_type = deeplake_api::array_to_string(index_type_it->second);
487+
if (order_type_it != row.end()) {
488+
try {
489+
auto order_vec = load_vector<int32_t>(order_type_it->second);
490+
meta.order_type = order_vec.empty() ? 0 : order_vec.front();
491+
} catch (...) {
492+
meta.order_type = 0;
493+
}
494+
}
495+
496+
out.push_back(std::move(meta));
497+
}
498+
return out;
499+
} catch (const std::exception& e) {
500+
elog(DEBUG1, "Failed to load catalog indexes for db '%s': %s", db_name.c_str(), e.what());
501+
return out;
502+
} catch (...) {
503+
elog(DEBUG1, "Failed to load catalog indexes for db '%s': unknown error", db_name.c_str());
504+
return out;
505+
}
463506
}
464507

465508
std::vector<schema_meta> load_schemas(const std::string& root_path, const std::string& db_name, icm::string_map<> creds)
@@ -699,6 +742,25 @@ void upsert_columns(const std::string& root_path, const std::string& db_name, ic
699742
table->upsert_many(std::move(rows)).get_future().get();
700743
}
701744

745+
void upsert_indexes(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const std::vector<index_meta>& indexes)
746+
{
747+
if (indexes.empty()) {
748+
return;
749+
}
750+
auto table = open_db_catalog_table(root_path, db_name, k_indexes_name, std::move(creds));
751+
icm::vector<icm::string_map<nd::array>> rows;
752+
rows.reserve(indexes.size());
753+
for (const auto& idx : indexes) {
754+
icm::string_map<nd::array> row;
755+
row["table_id"] = nd::adapt(idx.table_id);
756+
row["column_names"] = nd::adapt(idx.column_names);
757+
row["index_type"] = nd::adapt(idx.index_type);
758+
row["order_type"] = nd::adapt(idx.order_type);
759+
rows.push_back(std::move(row));
760+
}
761+
table->upsert_many(std::move(rows)).get_future().get();
762+
}
763+
702764
std::vector<database_meta> load_databases(const std::string& root_path, icm::string_map<> creds)
703765
{
704766
std::vector<database_meta> out;

cpp/deeplake_pg/dl_catalog.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ void upsert_schema(const std::string& root_path, const std::string& db_name, icm
8282
// Per-database upserts (write to {root}/{db_name}/__deeplake_catalog/)
8383
void upsert_table(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const table_meta& meta);
8484
void upsert_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const std::vector<column_meta>& columns);
85+
void upsert_indexes(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const std::vector<index_meta>& indexes);
8586

8687
// Shared (cluster-wide) database catalog
8788
std::vector<database_meta> load_databases(const std::string& root_path, icm::string_map<> creds);

cpp/deeplake_pg/pg_deeplake.cpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#include "pg_deeplake.hpp"
2+
#include "dl_catalog.hpp"
23
#include "logger.hpp"
34
#include "table_storage.hpp"
5+
#include "utils.hpp"
46

57
#include <deeplake_api/deeplake_api.hpp>
68
#include <deeplake_core/deeplake_index_type.hpp>
@@ -9,6 +11,8 @@
911
extern "C" {
1012
#endif
1113

14+
#include <commands/dbcommands.h>
15+
#include <miscadmin.h>
1216
#include <storage/ipc.h>
1317

1418
#ifdef __cplusplus
@@ -260,6 +264,39 @@ void save_index_metadata(Oid oid)
260264
if (SPI_execute(buf.data, false, 0) != SPI_OK_INSERT) {
261265
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to save metadata")));
262266
}
267+
268+
// Persist index to shared catalog for stateless multi-instance sync
269+
if (pg::stateless_enabled) {
270+
try {
271+
auto root_dir = pg::session_credentials::get_root_path();
272+
if (root_dir.empty()) {
273+
root_dir = pg::utils::get_deeplake_root_directory();
274+
}
275+
if (!root_dir.empty()) {
276+
auto creds = pg::session_credentials::get_credentials();
277+
const char* dbname = get_database_name(MyDatabaseId);
278+
std::string db_name = dbname ? dbname : "postgres";
279+
if (dbname) pfree(const_cast<char*>(dbname));
280+
281+
const std::string& table_id = idx_info.table_name(); // already schema-qualified
282+
283+
pg::dl_catalog::index_meta idx_meta;
284+
idx_meta.table_id = table_id;
285+
idx_meta.column_names = idx_info.get_column_names_string();
286+
idx_meta.index_type = std::string(deeplake_core::deeplake_index_type::to_string(idx_info.index_type()));
287+
idx_meta.order_type = static_cast<int32_t>(idx_info.order_type());
288+
289+
std::vector<pg::dl_catalog::index_meta> indexes = {idx_meta};
290+
pg::dl_catalog::upsert_indexes(root_dir, db_name, creds, indexes);
291+
pg::dl_catalog::bump_db_catalog_version(root_dir, db_name, creds);
292+
pg::dl_catalog::bump_catalog_version(root_dir, creds);
293+
}
294+
} catch (const std::exception& e) {
295+
elog(DEBUG1, "pg_deeplake: failed to persist index to shared catalog: %s", e.what());
296+
} catch (...) {
297+
elog(DEBUG1, "pg_deeplake: failed to persist index to shared catalog: unknown error");
298+
}
299+
}
263300
}
264301

265302
void load_index_metadata()

cpp/deeplake_pg/sync_worker.cpp

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -346,9 +346,34 @@ void deeplake_sync_schemas_for_db(const std::string& db_name,
346346
* Sync tables for a specific database from pre-loaded catalog data via libpq.
347347
* Creates missing tables in the target database.
348348
*/
349+
/**
 * Parse a comma-separated column-names string into a vector.
 * The column_names string uses trailing comma format: "col1,col2,".
 * Empty tokens (consecutive or leading/trailing commas) are dropped.
 */
std::vector<std::string> parse_column_names(const std::string& column_names)
{
    std::vector<std::string> tokens;
    std::size_t start = 0;
    // Scan comma-to-comma instead of char-by-char accumulation.
    while (start <= column_names.size()) {
        std::size_t comma = column_names.find(',', start);
        if (comma == std::string::npos) {
            comma = column_names.size();
        }
        if (comma > start) {
            tokens.emplace_back(column_names, start, comma - start);
        }
        start = comma + 1;
    }
    return tokens;
}
372+
349373
void deeplake_sync_tables_for_db(const std::string& db_name,
350374
const std::vector<pg::dl_catalog::table_meta>& tables,
351-
const std::vector<pg::dl_catalog::column_meta>& columns)
375+
const std::vector<pg::dl_catalog::column_meta>& columns,
376+
const std::vector<pg::dl_catalog::index_meta>& indexes)
352377
{
353378
for (const auto& meta : tables) {
354379
if (meta.state == "dropping") {
@@ -373,6 +398,24 @@ void deeplake_sync_tables_for_db(const std::string& db_name,
373398
continue;
374399
}
375400

401+
// Find indexes for this table
402+
std::vector<pg::dl_catalog::index_meta> table_indexes;
403+
for (const auto& idx : indexes) {
404+
if (idx.table_id == meta.table_id) {
405+
table_indexes.push_back(idx);
406+
}
407+
}
408+
409+
// Determine which columns are part of a primary key (inverted_index on non-nullable columns)
410+
// The primary key columns are stored as comma-separated names in column_names
411+
std::vector<std::string> pk_columns;
412+
for (const auto& idx : table_indexes) {
413+
if (idx.index_type == "inverted_index") {
414+
pk_columns = parse_column_names(idx.column_names);
415+
break;
416+
}
417+
}
418+
376419
const char* qschema = quote_identifier(meta.schema_name.c_str());
377420
const char* qtable = quote_identifier(meta.table_name.c_str());
378421

@@ -390,6 +433,19 @@ void deeplake_sync_tables_for_db(const std::string& db_name,
390433
first = false;
391434
appendStringInfo(&buf, "%s %s", quote_identifier(col.column_name.c_str()), col.pg_type.c_str());
392435
}
436+
437+
// Add PRIMARY KEY table constraint if we have PK columns
438+
if (!pk_columns.empty()) {
439+
appendStringInfoString(&buf, ", PRIMARY KEY (");
440+
for (size_t i = 0; i < pk_columns.size(); ++i) {
441+
if (i > 0) {
442+
appendStringInfoString(&buf, ", ");
443+
}
444+
appendStringInfoString(&buf, quote_identifier(pk_columns[i].c_str()));
445+
}
446+
appendStringInfoChar(&buf, ')');
447+
}
448+
393449
appendStringInfo(&buf, ") USING deeplake");
394450

395451
if (execute_via_libpq(db_name.c_str(), buf.data)) {
@@ -487,9 +543,10 @@ void sync_all_databases(
487543
}
488544

489545
auto [tables, columns] = pg::dl_catalog::load_tables_and_columns(root_path, db_name, creds);
490-
deeplake_sync_tables_for_db(db_name, tables, columns);
491-
elog(LOG, "pg_deeplake sync: synced %zu schemas, %zu tables for database '%s'",
492-
schemas.size(), tables.size(), db_name.c_str());
546+
auto indexes = pg::dl_catalog::load_indexes(root_path, db_name, creds);
547+
deeplake_sync_tables_for_db(db_name, tables, columns, indexes);
548+
elog(LOG, "pg_deeplake sync: synced %zu schemas, %zu tables, %zu indexes for database '%s'",
549+
schemas.size(), tables.size(), indexes.size(), db_name.c_str());
493550
} catch (const std::exception& e) {
494551
elog(WARNING, "pg_deeplake sync: failed to sync database '%s': %s", db_name.c_str(), e.what());
495552
} catch (...) {

cpp/deeplake_pg/table_storage.cpp

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,8 +353,9 @@ void table_storage::load_table_metadata()
353353
}
354354
tables_loaded_ = true;
355355

356-
// Load tables and columns in parallel from per-database path
356+
// Load tables, columns, and indexes from per-database path
357357
auto [catalog_tables, catalog_columns] = pg::dl_catalog::load_tables_and_columns(root_dir, db_name, creds);
358+
auto catalog_indexes = pg::dl_catalog::load_indexes(root_dir, db_name, creds);
358359

359360
if (!catalog_tables.empty()) {
360361
for (const auto& meta : catalog_tables) {
@@ -387,6 +388,29 @@ void table_storage::load_table_metadata()
387388
continue;
388389
}
389390

391+
// Find primary key columns from indexes
392+
std::vector<std::string> pk_columns;
393+
for (const auto& idx : catalog_indexes) {
394+
if (idx.table_id == meta.table_id && idx.index_type == "inverted_index") {
395+
// Parse comma-separated column names
396+
std::string current;
397+
for (char c : idx.column_names) {
398+
if (c == ',') {
399+
if (!current.empty()) {
400+
pk_columns.push_back(current);
401+
current.clear();
402+
}
403+
} else {
404+
current += c;
405+
}
406+
}
407+
if (!current.empty()) {
408+
pk_columns.push_back(current);
409+
}
410+
break;
411+
}
412+
}
413+
390414
// Build CREATE TABLE IF NOT EXISTS from catalog metadata.
391415
// Wrap in a subtransaction so that if another backend concurrently
392416
// creates the same table (race on composite type), the error is
@@ -406,6 +430,19 @@ void table_storage::load_table_metadata()
406430
first = false;
407431
appendStringInfo(&buf, "%s %s", quote_identifier(col.column_name.c_str()), col.pg_type.c_str());
408432
}
433+
434+
// Add PRIMARY KEY table constraint if found in catalog indexes
435+
if (!pk_columns.empty()) {
436+
appendStringInfoString(&buf, ", PRIMARY KEY (");
437+
for (size_t i = 0; i < pk_columns.size(); ++i) {
438+
if (i > 0) {
439+
appendStringInfoString(&buf, ", ");
440+
}
441+
appendStringInfoString(&buf, quote_identifier(pk_columns[i].c_str()));
442+
}
443+
appendStringInfoChar(&buf, ')');
444+
}
445+
409446
appendStringInfo(&buf, ") USING deeplake");
410447

411448
MemoryContext saved_context = CurrentMemoryContext;

0 commit comments

Comments
 (0)