Skip to content

Commit d01f813

Browse files
committed
Added migration for stateless.
1 parent 28b4990 commit d01f813

5 files changed

Lines changed: 892 additions & 66 deletions

File tree

cpp/deeplake_pg/dl_catalog.cpp

Lines changed: 132 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
#include <icm/vector.hpp>
1313

1414
#include <algorithm>
15+
#include <chrono>
16+
#include <filesystem>
1517
#include <unordered_map>
1618

1719
extern "C" {
@@ -67,12 +69,6 @@ struct catalog_table_cache
6769
}
6870
};
6971

70-
std::shared_ptr<deeplake_api::catalog_table>
71-
open_or_create_table(const std::string& path, deeplake_api::catalog_table_schema schema, icm::string_map<> creds)
72-
{
73-
return deeplake_api::open_or_create_catalog_table(path, schema, std::move(creds)).get_future().get();
74-
}
75-
7672
int64_t now_ms()
7773
{
7874
using namespace std::chrono;
@@ -137,63 +133,137 @@ void ensure_catalog(const std::string& root_path, icm::string_map<> creds)
137133
const auto indexes_path = join_path(root_path, k_indexes_name);
138134
const auto meta_path = join_path(root_path, k_meta_name);
139135

140-
// Build schemas for all catalog tables
141-
deeplake_api::catalog_table_schema tables_schema;
142-
tables_schema.add("table_id", deeplake_core::type::text(codecs::compression::null))
143-
.add("schema_name", deeplake_core::type::text(codecs::compression::null))
144-
.add("table_name", deeplake_core::type::text(codecs::compression::null))
145-
.add("dataset_path", deeplake_core::type::text(codecs::compression::null))
146-
.add("state", deeplake_core::type::text(codecs::compression::null))
147-
.add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64)))
148-
.set_primary_key("table_id");
149-
150-
deeplake_api::catalog_table_schema columns_schema;
151-
columns_schema.add("column_id", deeplake_core::type::text(codecs::compression::null))
152-
.add("table_id", deeplake_core::type::text(codecs::compression::null))
153-
.add("column_name", deeplake_core::type::text(codecs::compression::null))
154-
.add("pg_type", deeplake_core::type::text(codecs::compression::null))
155-
.add("dl_type_json", deeplake_core::type::text(codecs::compression::null))
156-
.add("nullable", deeplake_core::type::generic(nd::type::scalar(nd::dtype::boolean)))
157-
.add("position", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int32)))
158-
.set_primary_key("column_id");
159-
160-
deeplake_api::catalog_table_schema indexes_schema;
161-
indexes_schema.add("table_id", deeplake_core::type::text(codecs::compression::null))
162-
.add("column_names", deeplake_core::type::text(codecs::compression::null))
163-
.add("index_type", deeplake_core::type::text(codecs::compression::null))
164-
.add("order_type", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int32)))
165-
.set_primary_key("table_id");
166-
167-
deeplake_api::catalog_table_schema meta_schema;
168-
meta_schema.add("catalog_version", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64)))
169-
.add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64)))
170-
.set_primary_key("catalog_version");
171-
172-
// Launch all 4 open_or_create operations in parallel
173-
icm::vector<async::promise<std::shared_ptr<deeplake_api::catalog_table>>> promises;
174-
promises.reserve(4);
175-
promises.push_back(
176-
deeplake_api::open_or_create_catalog_table(tables_path, std::move(tables_schema), icm::string_map<>(creds)));
177-
promises.push_back(
178-
deeplake_api::open_or_create_catalog_table(columns_path, std::move(columns_schema), icm::string_map<>(creds)));
179-
promises.push_back(
180-
deeplake_api::open_or_create_catalog_table(indexes_path, std::move(indexes_schema), icm::string_map<>(creds)));
181-
promises.push_back(
182-
deeplake_api::open_or_create_catalog_table(meta_path, std::move(meta_schema), icm::string_map<>(creds)));
183-
184-
// Wait for all to complete
185-
auto results = async::combine(std::move(promises)).get_future().get();
186-
187-
// Initialize meta table if empty (index 3 is meta)
188-
auto& meta_table = results[3];
189-
if (meta_table) {
190-
auto snapshot = meta_table->read().get_future().get();
191-
if (snapshot.row_count() == 0) {
192-
icm::string_map<nd::array> row;
193-
row["catalog_version"] = nd::adapt(static_cast<int64_t>(1));
194-
row["updated_at"] = nd::adapt(now_ms());
195-
meta_table->insert(std::move(row)).get_future().get();
136+
auto migrate_legacy_path_if_needed = [&](const std::string& path) {
137+
const bool is_remote_path = path.find("://") != std::string::npos;
138+
139+
if (!is_remote_path) {
140+
// Local path migration: only remove clear path collisions (e.g. file at table path).
141+
std::error_code ec;
142+
if (!std::filesystem::exists(path, ec)) {
143+
return;
144+
}
145+
if (std::filesystem::is_directory(path, ec)) {
146+
// Keep directories intact; open_or_create handles valid local catalog dirs.
147+
return;
148+
}
149+
150+
elog(WARNING,
151+
"Catalog path %s is a non-directory filesystem artifact. Removing it before catalog initialization.",
152+
path.c_str());
153+
if (!std::filesystem::remove(path, ec) && ec) {
154+
elog(ERROR, "Failed to migrate local catalog path %s: %s", path.c_str(), ec.message().c_str());
155+
}
156+
return;
157+
}
158+
159+
// Remote path migration for legacy non-catalog datasets.
160+
bool exists = false;
161+
try {
162+
exists = deeplake_api::exists(path, icm::string_map<>(creds)).get_future().get();
163+
} catch (...) {
164+
exists = false;
196165
}
166+
if (!exists) {
167+
return;
168+
}
169+
170+
bool is_catalog = false;
171+
try {
172+
is_catalog = deeplake_api::is_catalog_table(path, icm::string_map<>(creds)).get_future().get();
173+
} catch (...) {
174+
is_catalog = false;
175+
}
176+
if (!is_catalog) {
177+
elog(WARNING,
178+
"Existing remote catalog path %s is not a catalog table. Recreating catalog table.",
179+
path.c_str());
180+
try {
181+
deeplake_api::delete_dataset(path, icm::string_map<>(creds)).get_future().get();
182+
} catch (const std::exception& e) {
183+
elog(ERROR, "Failed to migrate remote catalog path %s: %s", path.c_str(), e.what());
184+
} catch (...) {
185+
elog(ERROR, "Failed to migrate remote catalog path %s: unknown error", path.c_str());
186+
}
187+
}
188+
};
189+
190+
try {
191+
// Handle legacy non-catalog artifacts before creating tables.
192+
migrate_legacy_path_if_needed(tables_path);
193+
migrate_legacy_path_if_needed(columns_path);
194+
migrate_legacy_path_if_needed(indexes_path);
195+
migrate_legacy_path_if_needed(meta_path);
196+
197+
// Build schemas for all catalog tables
198+
deeplake_api::catalog_table_schema tables_schema;
199+
tables_schema.add("table_id", deeplake_core::type::text(codecs::compression::null))
200+
.add("schema_name", deeplake_core::type::text(codecs::compression::null))
201+
.add("table_name", deeplake_core::type::text(codecs::compression::null))
202+
.add("dataset_path", deeplake_core::type::text(codecs::compression::null))
203+
.add("state", deeplake_core::type::text(codecs::compression::null))
204+
.add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64)))
205+
.set_primary_key("table_id");
206+
207+
deeplake_api::catalog_table_schema columns_schema;
208+
columns_schema.add("column_id", deeplake_core::type::text(codecs::compression::null))
209+
.add("table_id", deeplake_core::type::text(codecs::compression::null))
210+
.add("column_name", deeplake_core::type::text(codecs::compression::null))
211+
.add("pg_type", deeplake_core::type::text(codecs::compression::null))
212+
.add("dl_type_json", deeplake_core::type::text(codecs::compression::null))
213+
.add("nullable", deeplake_core::type::generic(nd::type::scalar(nd::dtype::boolean)))
214+
.add("position", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int32)))
215+
.set_primary_key("column_id");
216+
217+
deeplake_api::catalog_table_schema indexes_schema;
218+
indexes_schema.add("table_id", deeplake_core::type::text(codecs::compression::null))
219+
.add("column_names", deeplake_core::type::text(codecs::compression::null))
220+
.add("index_type", deeplake_core::type::text(codecs::compression::null))
221+
.add("order_type", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int32)))
222+
.set_primary_key("table_id");
223+
224+
deeplake_api::catalog_table_schema meta_schema;
225+
meta_schema.add("catalog_version", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64)))
226+
.add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64)))
227+
.set_primary_key("catalog_version");
228+
229+
// Launch all 4 open_or_create operations in parallel
230+
icm::vector<async::promise<std::shared_ptr<deeplake_api::catalog_table>>> promises;
231+
promises.reserve(4);
232+
promises.push_back(
233+
deeplake_api::open_or_create_catalog_table(tables_path, std::move(tables_schema), icm::string_map<>(creds)));
234+
promises.push_back(
235+
deeplake_api::open_or_create_catalog_table(columns_path, std::move(columns_schema), icm::string_map<>(creds)));
236+
promises.push_back(
237+
deeplake_api::open_or_create_catalog_table(indexes_path, std::move(indexes_schema), icm::string_map<>(creds)));
238+
promises.push_back(
239+
deeplake_api::open_or_create_catalog_table(meta_path, std::move(meta_schema), icm::string_map<>(creds)));
240+
241+
// Wait for all to complete
242+
auto results = async::combine(std::move(promises)).get_future().get();
243+
if (results.size() != 4) {
244+
elog(ERROR,
245+
"Failed to initialize catalog at %s: expected 4 catalog tables, got %zu",
246+
root_path.c_str(),
247+
static_cast<size_t>(results.size()));
248+
}
249+
250+
// Initialize meta table if empty (index 3 is meta)
251+
auto& meta_table = results[3];
252+
if (meta_table) {
253+
auto snapshot = meta_table->read().get_future().get();
254+
if (snapshot.row_count() == 0) {
255+
icm::string_map<nd::array> row;
256+
row["catalog_version"] = nd::adapt(static_cast<int64_t>(1));
257+
row["updated_at"] = nd::adapt(now_ms());
258+
meta_table->insert(std::move(row)).get_future().get();
259+
}
260+
}
261+
} catch (const std::exception& e) {
262+
catalog_table_cache::instance().invalidate();
263+
elog(ERROR, "Failed to ensure catalog at %s: %s", root_path.c_str(), e.what());
264+
} catch (...) {
265+
catalog_table_cache::instance().invalidate();
266+
elog(ERROR, "Failed to ensure catalog at %s: unknown error", root_path.c_str());
197267
}
198268
}
199269

0 commit comments

Comments (0)