
Commit faf103e

Fixed per-db tables, columns, and indices.

Parent: c57b418

8 files changed: 447 additions & 223 deletions


cpp/deeplake_pg/dl_catalog.cpp

Lines changed: 213 additions & 81 deletions
Large diffs are not rendered by default.
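
Since this file's diff is collapsed, here is a sketch, not the actual implementation, of the path convention it presumably encodes; it is inferred purely from the "{root}/{db_name}/__deeplake_catalog/" comment in dl_catalog.hpp below:

    // Illustrative only: dl_catalog.cpp is not rendered above, so this helper
    // is an assumption based on the header comment that per-db catalog objects
    // live under {root}/{db_name}/__deeplake_catalog/.
    static std::string db_catalog_path(const std::string& root, const std::string& db_name)
    {
        return root + "/" + db_name + "/__deeplake_catalog";
    }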

cpp/deeplake_pg/dl_catalog.hpp

Lines changed: 26 additions & 6 deletions
@@ -3,9 +3,12 @@
 #include <icm/string_map.hpp>
 
 #include <cstdint>
+#include <memory>
 #include <string>
 #include <vector>
 
+namespace deeplake_api { class catalog_table; }
+
 namespace pg::dl_catalog {
 
 struct table_meta
@@ -15,6 +18,7 @@ struct table_meta
     std::string table_name;
     std::string dataset_path;
     std::string state;
+    std::string db_name;
     int64_t updated_at = 0;
 };
 
@@ -48,23 +52,39 @@ struct database_meta
     int64_t updated_at = 0;
 };
 
+// Shared (cluster-wide) catalog: meta + databases
 int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds);
 
-std::vector<table_meta> load_tables(const std::string& root_path, icm::string_map<> creds);
-std::vector<column_meta> load_columns(const std::string& root_path, icm::string_map<> creds);
-std::vector<index_meta> load_indexes(const std::string& root_path, icm::string_map<> creds);
+// Per-database catalog: tables + columns + indexes + meta
+int64_t ensure_db_catalog(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
+
+// Per-database loaders (read from {root}/{db_name}/__deeplake_catalog/)
+std::vector<table_meta> load_tables(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
+std::vector<column_meta> load_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
+std::vector<index_meta> load_indexes(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
 
 // Load tables and columns in parallel for better performance
 std::pair<std::vector<table_meta>, std::vector<column_meta>>
-load_tables_and_columns(const std::string& root_path, icm::string_map<> creds);
+load_tables_and_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
 
-void upsert_table(const std::string& root_path, icm::string_map<> creds, const table_meta& meta);
-void upsert_columns(const std::string& root_path, icm::string_map<> creds, const std::vector<column_meta>& columns);
+// Per-database upserts (write to {root}/{db_name}/__deeplake_catalog/)
+void upsert_table(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const table_meta& meta);
+void upsert_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const std::vector<column_meta>& columns);
 
+// Shared (cluster-wide) database catalog
 std::vector<database_meta> load_databases(const std::string& root_path, icm::string_map<> creds);
 void upsert_database(const std::string& root_path, icm::string_map<> creds, const database_meta& meta);
 
+// Global (shared) catalog version
 int64_t get_catalog_version(const std::string& root_path, icm::string_map<> creds);
 void bump_catalog_version(const std::string& root_path, icm::string_map<> creds);
 
+// Per-database catalog version
+int64_t get_db_catalog_version(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
+void bump_db_catalog_version(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
+
+// Open the per-database meta table handle (for parallel .version() calls in sync worker)
+std::shared_ptr<deeplake_api::catalog_table>
+open_db_meta_table(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
+
 } // namespace pg::dl_catalog
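
To make the split concrete, here is a minimal usage sketch of the new per-database API, assuming a hypothetical database name ("app_db") and table; field values are illustrative and error handling is omitted:

    // Hypothetical caller: register a table in the per-db catalog and signal the change.
    void register_table_example(const std::string& root, icm::string_map<> creds)
    {
        // Creates {root}/app_db/__deeplake_catalog/ on first use ("app_db" is a placeholder)
        pg::dl_catalog::ensure_db_catalog(root, "app_db", creds);

        pg::dl_catalog::table_meta meta;
        meta.schema_name = "public";
        meta.table_name = "events";
        meta.db_name = "app_db";
        meta.state = "ready";
        pg::dl_catalog::upsert_table(root, "app_db", creds, meta);

        // Bump the per-db version so sync workers notice the change on their next poll.
        pg::dl_catalog::bump_db_catalog_version(root, "app_db", creds);
    }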

cpp/deeplake_pg/extension_init.cpp

Lines changed: 2 additions & 0 deletions
@@ -691,6 +691,7 @@ static void process_utility(PlannedStmt* pstmt,
         }
         if (!root_path.empty()) {
             auto creds = pg::session_credentials::get_credentials();
+            pg::dl_catalog::ensure_catalog(root_path, creds);
             pg::dl_catalog::database_meta db_meta;
             db_meta.db_name = dbstmt->dbname;
             db_meta.state = "dropping";
@@ -727,6 +728,7 @@ static void process_utility(PlannedStmt* pstmt,
         }
         if (!root_path.empty()) {
             auto creds = pg::session_credentials::get_credentials();
+            pg::dl_catalog::ensure_catalog(root_path, creds);
             pg::dl_catalog::database_meta db_meta;
             db_meta.db_name = dbstmt->dbname;
             db_meta.state = "ready";
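
Both hunks apply the same fix: guarantee the shared catalog exists before the utility hook writes a database row. Condensed, and with the follow-up call assumed from the dl_catalog.hpp API rather than shown in these hunks, the flow is roughly:

    auto creds = pg::session_credentials::get_credentials();
    pg::dl_catalog::ensure_catalog(root_path, creds);   // no-op if the shared catalog already exists

    pg::dl_catalog::database_meta db_meta;
    db_meta.db_name = dbstmt->dbname;
    db_meta.state = "ready";                            // "dropping" in the DROP DATABASE hunk
    // Assumed follow-up (outside the rendered hunks), per the dl_catalog.hpp API:
    pg::dl_catalog::upsert_database(root_path, creds, db_meta);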

cpp/deeplake_pg/sync_worker.cpp

Lines changed: 142 additions & 102 deletions
@@ -32,8 +32,13 @@ extern "C" {
 #include "table_storage.hpp"
 #include "utils.hpp"
 
+#include <async/promise.hpp>
+#include <deeplake_api/catalog_table.hpp>
+#include <icm/vector.hpp>
+
 #include <algorithm>
 #include <cstring>
+#include <unordered_map>
 #include <vector>
 
 // GUC variables
@@ -305,98 +310,151 @@ void deeplake_sync_databases_from_catalog(const std::string& root_path, icm::string_map<> creds)
 }
 
 /**
- * Sync tables from the deeplake catalog to PostgreSQL.
- *
- * This function checks the catalog for tables that exist in the deeplake
- * catalog but not in PostgreSQL, and creates them.
+ * Sync tables for a specific database from pre-loaded catalog data via libpq.
+ * Creates missing tables in the target database.
  */
-void deeplake_sync_tables_from_catalog(const std::string& root_path, icm::string_map<> creds)
+void deeplake_sync_tables_for_db(const std::string& db_name,
+                                 const std::vector<pg::dl_catalog::table_meta>& tables,
+                                 const std::vector<pg::dl_catalog::column_meta>& columns)
 {
-    // Load tables and columns in parallel for better performance
-    auto [catalog_tables, catalog_columns] = pg::dl_catalog::load_tables_and_columns(root_path, creds);
-
-    for (const auto& meta : catalog_tables) {
-        // Skip tables marked as dropping
+    for (const auto& meta : tables) {
         if (meta.state == "dropping") {
             continue;
         }
 
         const std::string qualified_name = meta.schema_name + "." + meta.table_name;
 
-        // Check if table exists in PostgreSQL
-        auto* rel = makeRangeVar(pstrdup(meta.schema_name.c_str()), pstrdup(meta.table_name.c_str()), -1);
-        Oid relid = RangeVarGetRelid(rel, NoLock, true);
-
-        if (!OidIsValid(relid)) {
-            // Gather columns for this table, sorted by position
-            std::vector<pg::dl_catalog::column_meta> table_columns;
-            for (const auto& col : catalog_columns) {
-                if (col.table_id == meta.table_id) {
-                    table_columns.push_back(col);
-                }
+        // Gather columns for this table, sorted by position
+        std::vector<pg::dl_catalog::column_meta> table_columns;
+        for (const auto& col : columns) {
+            if (col.table_id == meta.table_id) {
+                table_columns.push_back(col);
             }
-            std::sort(table_columns.begin(), table_columns.end(),
-                      [](const auto& a, const auto& b) { return a.position < b.position; });
+        }
+        std::sort(table_columns.begin(), table_columns.end(),
+                  [](const auto& a, const auto& b) { return a.position < b.position; });
 
-            if (table_columns.empty()) {
-                elog(DEBUG1, "pg_deeplake sync: no columns found for table %s, skipping", qualified_name.c_str());
-                continue;
-            }
+        if (table_columns.empty()) {
+            elog(DEBUG1, "pg_deeplake sync: no columns for %s in db %s, skipping",
+                 qualified_name.c_str(), db_name.c_str());
+            continue;
+        }
 
-            const char* qschema = quote_identifier(meta.schema_name.c_str());
-            const char* qtable = quote_identifier(meta.table_name.c_str());
+        const char* qschema = quote_identifier(meta.schema_name.c_str());
+        const char* qtable = quote_identifier(meta.table_name.c_str());
 
-            // Build CREATE TABLE IF NOT EXISTS statement
-            StringInfoData buf;
-            initStringInfo(&buf);
-            appendStringInfo(&buf, "CREATE TABLE IF NOT EXISTS %s.%s (", qschema, qtable);
+        // Combine schema + table creation into a single SQL statement
+        StringInfoData buf;
+        initStringInfo(&buf);
+        appendStringInfo(&buf, "CREATE SCHEMA IF NOT EXISTS %s; ", qschema);
+        appendStringInfo(&buf, "CREATE TABLE IF NOT EXISTS %s.%s (", qschema, qtable);
 
-            bool first = true;
-            for (const auto& col : table_columns) {
-                if (!first) {
-                    appendStringInfoString(&buf, ", ");
-                }
-                first = false;
-                appendStringInfo(&buf, "%s %s", quote_identifier(col.column_name.c_str()), col.pg_type.c_str());
+        bool first = true;
+        for (const auto& col : table_columns) {
+            if (!first) {
+                appendStringInfoString(&buf, ", ");
             }
-            appendStringInfo(&buf, ") USING deeplake");
-
-            // Wrap in subtransaction so that if another backend concurrently
-            // creates the same table (race on composite type), the error is
-            // caught and we continue instead of aborting the sync cycle.
-            MemoryContext saved_context = CurrentMemoryContext;
-            ResourceOwner saved_owner = CurrentResourceOwner;
-
-            BeginInternalSubTransaction(NULL);
-            PG_TRY();
-            {
-                pg::utils::spi_connector connector;
-
-                // Create schema if needed
-                StringInfoData schema_buf;
-                initStringInfo(&schema_buf);
-                appendStringInfo(&schema_buf, "CREATE SCHEMA IF NOT EXISTS %s", qschema);
-                SPI_execute(schema_buf.data, false, 0);
-                pfree(schema_buf.data);
-
-                if (SPI_execute(buf.data, false, 0) == SPI_OK_UTILITY) {
-                    elog(LOG, "pg_deeplake sync: successfully created table %s", qualified_name.c_str());
-                }
+            first = false;
+            appendStringInfo(&buf, "%s %s", quote_identifier(col.column_name.c_str()), col.pg_type.c_str());
+        }
+        appendStringInfo(&buf, ") USING deeplake");
 
-                ReleaseCurrentSubTransaction();
-            }
-            PG_CATCH();
-            {
-                // Another backend created this table concurrently — not an error.
-                MemoryContextSwitchTo(saved_context);
-                CurrentResourceOwner = saved_owner;
-                RollbackAndReleaseCurrentSubTransaction();
-                FlushErrorState();
-                elog(DEBUG1, "pg_deeplake sync: concurrent creation of %s, skipping", qualified_name.c_str());
+        if (execute_via_libpq(db_name.c_str(), buf.data)) {
+            elog(LOG, "pg_deeplake sync: created table %s in database %s",
+                 qualified_name.c_str(), db_name.c_str());
+        }
+
+        pfree(buf.data);
+    }
+}
+
+/**
+ * Sync all databases: check per-db versions in parallel, load changed ones,
+ * create missing tables via libpq.
+ *
+ * Called OUTSIDE transaction context.
+ */
+void sync_all_databases(
+    const std::string& root_path,
+    icm::string_map<> creds,
+    std::unordered_map<std::string, int64_t>& last_db_versions)
+{
+    // Step 1: Sync databases (create missing ones, install extension)
+    deeplake_sync_databases_from_catalog(root_path, creds);
+
+    // Step 2: Get list of all databases from the shared catalog
+    auto databases = pg::dl_catalog::load_databases(root_path, creds);
+
+    // Always include "postgres" which may not be in the databases catalog
+    bool has_postgres = false;
+    for (const auto& db : databases) {
+        if (db.db_name == "postgres") { has_postgres = true; break; }
+    }
+    if (!has_postgres) {
+        pg::dl_catalog::database_meta pg_meta;
+        pg_meta.db_name = "postgres";
+        pg_meta.state = "ready";
+        databases.push_back(std::move(pg_meta));
+    }
+
+    // Step 3: Open per-db meta tables and check versions in parallel
+    std::vector<std::string> db_names;
+    std::vector<std::shared_ptr<deeplake_api::catalog_table>> meta_handles;
+
+    for (const auto& db : databases) {
+        if (db.db_name == "template0" || db.db_name == "template1") {
+            continue;
+        }
+        try {
+            auto handle = pg::dl_catalog::open_db_meta_table(root_path, db.db_name, creds);
+            if (handle) {
+                db_names.push_back(db.db_name);
+                meta_handles.push_back(std::move(handle));
             }
-            PG_END_TRY();
+        } catch (...) {
+            // Per-db catalog may not exist yet — skip silently
+            elog(DEBUG1, "pg_deeplake sync: no per-db catalog for '%s', skipping", db.db_name.c_str());
+        }
+    }
+
+    if (db_names.empty()) {
+        return;
+    }
 
-            pfree(buf.data);
+    // Fire all version() promises in parallel (1 round-trip wall-clock)
+    icm::vector<async::promise<uint64_t>> version_promises;
+    version_promises.reserve(db_names.size());
+    for (auto& handle : meta_handles) {
+        version_promises.push_back(handle->version());
+    }
+    auto versions = async::combine(std::move(version_promises)).get_future().get();
+
+    // Step 4: Identify databases whose version changed since last sync
+    std::vector<std::string> changed_dbs;
+    for (size_t i = 0; i < db_names.size(); ++i) {
+        int64_t ver = static_cast<int64_t>(versions[i]);
+        auto it = last_db_versions.find(db_names[i]);
+        if (it == last_db_versions.end() || it->second != ver) {
+            changed_dbs.push_back(db_names[i]);
+            last_db_versions[db_names[i]] = ver;
+        }
+    }
+
+    if (changed_dbs.empty()) {
+        return;
+    }
+
+    // Step 5: For each changed database, load tables+columns and sync
+    for (const auto& db_name : changed_dbs) {
+        try {
+            auto [tables, columns] = pg::dl_catalog::load_tables_and_columns(root_path, db_name, creds);
+            deeplake_sync_tables_for_db(db_name, tables, columns);
+            elog(LOG, "pg_deeplake sync: synced %zu tables for database '%s'",
+                 tables.size(), db_name.c_str());
+        } catch (const std::exception& e) {
+            elog(WARNING, "pg_deeplake sync: failed to sync database '%s': %s", db_name.c_str(), e.what());
        } catch (...) {
+            elog(WARNING, "pg_deeplake sync: failed to sync database '%s': unknown error", db_name.c_str());
        }
    }
 }
@@ -421,6 +479,7 @@ PGDLLEXPORT void deeplake_sync_worker_main(Datum main_arg)
 
     int64_t last_catalog_version = 0;
     std::string last_root_path; // Track root_path to detect changes
+    std::unordered_map<std::string, int64_t> last_db_versions;
 
 
     while (!got_sigterm) {
@@ -467,15 +526,16 @@ PGDLLEXPORT void deeplake_sync_worker_main(Datum main_arg)
             if (!last_root_path.empty()) {
                 pg::table_storage::instance().reset_and_load_table_metadata();
                 last_catalog_version = 0;
+                last_db_versions.clear();
             }
             last_root_path = root_path;
         }
 
-        // Use existing catalog version API to check for changes (now fast with cache)
+        // Fast global version check (single HEAD request via cached meta table)
        int64_t current_version = pg::dl_catalog::get_catalog_version(root_path, creds);
 
        if (current_version != last_catalog_version) {
-            // Save state for database sync (which happens outside transaction)
+            // Save state for sync (which happens outside transaction)
            sync_root_path = root_path;
            sync_creds = creds;
            need_sync = true;
@@ -494,36 +554,16 @@ PGDLLEXPORT void deeplake_sync_worker_main(Datum main_arg)
        PopActiveSnapshot();
        CommitTransactionCommand();
 
-        // Sync databases via libpq OUTSIDE transaction context
-        // (CREATE DATABASE cannot run inside a transaction block)
+        // All sync happens OUTSIDE transaction context via libpq
        if (need_sync && !sync_root_path.empty()) {
            try {
-                deeplake_sync_databases_from_catalog(sync_root_path, sync_creds);
+                sync_all_databases(sync_root_path, sync_creds, last_db_versions);
+                elog(DEBUG1, "pg_deeplake sync: completed (global version %ld)", last_catalog_version);
            } catch (const std::exception& e) {
-                elog(WARNING, "pg_deeplake sync: database sync failed: %s", e.what());
+                elog(WARNING, "pg_deeplake sync: sync failed: %s", e.what());
            } catch (...) {
-                elog(WARNING, "pg_deeplake sync: database sync failed: unknown error");
+                elog(WARNING, "pg_deeplake sync: sync failed: unknown error");
            }
-
-            // Re-enter transaction for table sync
-            SetCurrentStatementStartTimestamp();
-            StartTransactionCommand();
-            PushActiveSnapshot(GetTransactionSnapshot());
-
-            PG_TRY();
-            {
-                deeplake_sync_tables_from_catalog(sync_root_path, sync_creds);
-                elog(DEBUG1, "pg_deeplake sync: synced (catalog version %ld)", last_catalog_version);
-            }
-            PG_CATCH();
-            {
-                EmitErrorReport();
-                FlushErrorState();
-            }
-            PG_END_TRY();
-
-            PopActiveSnapshot();
-            CommitTransactionCommand();
        }
 
        pgstat_report_stat(true);