Skip to content

Commit 47d76a7

Browse files
committed
store: Add restore orchestration in SubgraphStore and DeploymentStore
Wire up the restore pipeline through SubgraphStore::restore() and DeploymentStore::restore(). The pipeline uses plan_restore() to decide whether to create or replace the deployment site, validates that the target shard exists, resolves the subgraph name for deployment-rule matching, and assigns the restored deployment to a node.

Changes:
- DeploymentStore::restore() coordinates schema creation, data import, and finalization
- Inner::restore() handles conflict resolution, site allocation, and node assignment via deployment rules
- Expose create_site() and find_active_site() on primary::Connection
- Make create_site() accept an `active` parameter
1 parent 4e8f7bf commit 47d76a7

8 files changed

Lines changed: 166 additions & 16 deletions

File tree

store/postgres/src/deployment_store.rs

Lines changed: 16 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,7 @@ use std::collections::{BTreeMap, HashMap};
3030
use std::convert::Into;
3131
use std::ops::Bound;
3232
use std::ops::{Deref, Range};
33-
use std::path::PathBuf;
33+
use std::path::{Path, PathBuf};
3434
use std::str::FromStr;
3535
use std::sync::{atomic::AtomicUsize, Arc, Mutex};
3636
use std::time::{Duration, Instant};
@@ -909,6 +909,21 @@ impl DeploymentStore {
909909
.dump(&mut conn, index_list, dir, &site.network, entity_count)
910910
.await
911911
}
912+
913+
pub(crate) async fn restore(
914+
&self,
915+
site: Arc<Site>,
916+
dir: &Path,
917+
metadata: &crate::relational::dump::Metadata,
918+
) -> Result<(), StoreError> {
919+
let mut conn = self.pool.get_permitted().await?;
920+
let layout =
921+
crate::relational::restore::create_schema(&mut conn, site, metadata, dir).await?;
922+
crate::relational::restore::import_data(&mut conn, &layout, metadata, dir, &self.logger)
923+
.await?;
924+
crate::relational::restore::finalize(&mut conn, &layout, metadata, &self.logger).await?;
925+
Ok(())
926+
}
912927
}
913928

914929
/// Methods that back the trait `WritableStore`, but have small variations in their signatures

store/postgres/src/parquet/convert.rs

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -238,7 +238,6 @@ impl ColumnBuilder {
238238
}
239239

240240
/// A row extracted from a Parquet file, ready for insertion into an entity table.
241-
#[allow(dead_code)]
242241
pub struct RestoreRow {
243242
pub vid: i64,
244243
pub block: BlockNumber,
@@ -253,7 +252,6 @@ pub struct RestoreRow {
253252
///
254253
/// The batch must follow the column layout produced by `schema::arrow_schema`:
255254
/// `vid`, block tracking columns, optional `causality_region`, then data columns.
256-
#[allow(dead_code)]
257255
pub fn record_batch_to_restore_rows(
258256
batch: &RecordBatch,
259257
table: &Table,
@@ -337,7 +335,6 @@ pub fn record_batch_to_restore_rows(
337335
}
338336

339337
/// A row extracted from a `data_sources$` Parquet file.
340-
#[allow(dead_code)]
341338
pub struct DataSourceRestoreRow {
342339
pub vid: i64,
343340
pub block_range_start: BlockNumber,
@@ -356,7 +353,6 @@ pub struct DataSourceRestoreRow {
356353
///
357354
/// The batch must follow the fixed column layout from
358355
/// `schema::data_sources_arrow_schema`.
359-
#[allow(dead_code)]
360356
pub fn record_batch_to_data_source_rows(
361357
batch: &RecordBatch,
362358
) -> Result<Vec<DataSourceRestoreRow>, StoreError> {

store/postgres/src/parquet/reader.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,6 @@ use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
1010
/// Opens the file, reads all row groups, and returns them as a vector
1111
/// of `RecordBatch`es. The batches retain the schema embedded in the
1212
/// Parquet file.
13-
#[allow(dead_code)]
1413
pub fn read_batches(path: &Path) -> Result<Vec<RecordBatch>, StoreError> {
1514
let file = fs::File::open(path).map_err(|e| {
1615
StoreError::InternalError(format!(

store/postgres/src/primary.rs

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1391,7 +1391,7 @@ impl Connection {
13911391
/// function only performs the basic operations for creation, and the
13921392
/// caller must check that other conditions (like whether there already
13931393
/// is an active site for the deployment) are met
1394-
async fn create_site(
1394+
pub(crate) async fn create_site(
13951395
&mut self,
13961396
shard: Shard,
13971397
deployment: DeploymentHash,
@@ -1467,6 +1467,13 @@ impl Connection {
14671467
.map(|site| (site, site_was_created))
14681468
}
14691469

1470+
pub async fn find_active_site(
1471+
&mut self,
1472+
subgraph: &DeploymentHash,
1473+
) -> Result<Option<Site>, StoreError> {
1474+
queries::find_active_site(&mut self.conn, subgraph).await
1475+
}
1476+
14701477
pub async fn assigned_node(&mut self, site: &Site) -> Result<Option<NodeId>, StoreError> {
14711478
queries::assigned_node(&mut self.conn, site).await
14721479
}

store/postgres/src/relational/dump.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -167,7 +167,6 @@ pub(crate) struct Metadata {
167167

168168
impl Metadata {
169169
/// Read and validate a dump's `metadata.json`.
170-
#[allow(dead_code)]
171170
pub fn from_file(path: &Path) -> Result<Self, StoreError> {
172171
let content = fs::read_to_string(path).map_err(|e| {
173172
StoreError::InternalError(format!("failed to read {}: {e}", path.display()))

store/postgres/src/relational/restore.rs

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -220,7 +220,6 @@ async fn import_data_sources(
220220
/// - Obtaining the right shard connection
221221
///
222222
/// Entity data import and finalization are handled separately.
223-
#[allow(dead_code)]
224223
pub async fn create_schema(
225224
conn: &mut AsyncPgConnection,
226225
site: Arc<Site>,
@@ -319,7 +318,6 @@ pub async fn create_schema(
319318
/// This is resumable: if interrupted, it can be called again and will
320319
/// skip already-imported rows by checking the current max(vid) in each
321320
/// table.
322-
#[allow(dead_code)]
323321
pub async fn import_data(
324322
conn: &mut AsyncPgConnection,
325323
layout: &Layout,
@@ -365,7 +363,6 @@ pub async fn import_data(
365363
/// This must be called after `import_data` has completed successfully.
366364
/// Setting the head block is the very last operation — it marks the
367365
/// deployment as "ready".
368-
#[allow(dead_code)]
369366
pub async fn finalize(
370367
conn: &mut AsyncPgConnection,
371368
layout: &Layout,

store/postgres/src/relational_queries.rs

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2355,7 +2355,6 @@ impl<'a> InsertRow<'a> {
23552355
/// Unlike `new()`, this looks up values by SQL column name rather than
23562356
/// entity field name, since `RestoreRow.values` is keyed by SQL name.
23572357
/// Fulltext columns are regenerated from their source fields.
2358-
#[allow(dead_code)]
23592358
fn from_restore(
23602359
columns: &[&'a Column],
23612360
row: &'a RestoreRow,
@@ -2460,7 +2459,6 @@ impl<'a> InsertQuery<'a> {
24602459
/// All data columns from the dump are present, so `unique_columns`
24612460
/// includes every column in the table. Fulltext columns are
24622461
/// regenerated from their source fields.
2463-
#[allow(dead_code)]
24642462
pub fn for_restore(
24652463
table: &'a Table,
24662464
rows: &'a [RestoreRow],

store/postgres/src/subgraph_store.rs

Lines changed: 142 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -18,8 +18,8 @@ use graph::{
1818
components::{
1919
server::index_node::VersionInfo,
2020
store::{
21-
self, BlockPtrForNumber, BlockStore, DeploymentLocator, EnsLookup as EnsLookupTrait,
22-
PruneReporter, PruneRequest, SubgraphFork,
21+
self, BlockPtrForNumber, BlockStore, DeploymentLocator, DeploymentSchemaVersion,
22+
EnsLookup as EnsLookupTrait, PruneReporter, PruneRequest, SubgraphFork,
2323
},
2424
},
2525
data::{
@@ -42,7 +42,9 @@ use graph::{derive::CheapClone, futures03::future::join_all, prelude::alloy::pri
4242
use crate::{
4343
catalog::Catalog,
4444
deployment::{OnSync, SubgraphHealth},
45-
primary::{self, DeploymentId, Mirror as PrimaryMirror, Primary, Site},
45+
primary::{
46+
self, DeploymentId, Mirror as PrimaryMirror, Primary, RestoreAction, RestoreMode, Site,
47+
},
4648
relational::{
4749
self,
4850
index::{IndexList, Method},
@@ -735,6 +737,35 @@ impl Inner {
735737
}
736738
}
737739

740+
/// Determine the target node for a deployment using the configured
741+
/// deployment rules, ignoring the shard selection. Returns an error
742+
/// if no rule matches.
743+
async fn node_for_deployment(
744+
&self,
745+
name: &SubgraphName,
746+
network: &str,
747+
) -> Result<NodeId, StoreError> {
748+
let placement = self
749+
.placer
750+
.place(name.as_str(), network)
751+
.map_err(|msg| internal_error!("illegal indexer name in deployment rule: {}", msg))?;
752+
753+
match placement {
754+
Some((_, nodes)) if !nodes.is_empty() => {
755+
if nodes.len() == 1 {
756+
Ok(nodes.into_iter().next().unwrap())
757+
} else {
758+
let mut pconn = self.primary_conn().await?;
759+
// unwrap: nodes is not empty
760+
Ok(pconn.least_assigned_node(&nodes).await?.unwrap())
761+
}
762+
}
763+
_ => Err(StoreError::InternalError(
764+
"no deployment rule matches this deployment".into(),
765+
)),
766+
}
767+
}
768+
738769
pub async fn copy_deployment(
739770
&self,
740771
src: &DeploymentLocator,
@@ -1446,6 +1477,114 @@ impl Inner {
14461477

14471478
store.dump(site, directory).await
14481479
}
1480+
1481+
pub async fn restore(
1482+
&self,
1483+
dir: &std::path::Path,
1484+
shard: Shard,
1485+
name: Option<SubgraphName>,
1486+
mode: RestoreMode,
1487+
) -> Result<(), StoreError> {
1488+
use crate::relational::dump::Metadata;
1489+
1490+
let metadata_path = dir.join("metadata.json");
1491+
let metadata = Metadata::from_file(&metadata_path)?;
1492+
1493+
// Validate that the target shard exists before making any DB changes
1494+
self.stores
1495+
.get(&shard)
1496+
.ok_or_else(|| StoreError::UnknownShard(shard.to_string()))?;
1497+
1498+
// Resolve the subgraph name for deployment rule matching. If not
1499+
// supplied, look up an existing name from the DB; error if none.
1500+
let name = match name {
1501+
Some(n) => n,
1502+
None => {
1503+
let names = self
1504+
.mirror
1505+
.subgraphs_by_deployment_hash(metadata.deployment.as_str())
1506+
.await?;
1507+
let (name, _) = names.into_iter().next().ok_or_else(|| {
1508+
StoreError::InternalError(
1509+
"no subgraph name found for this deployment; use --name to specify one"
1510+
.into(),
1511+
)
1512+
})?;
1513+
SubgraphName::new(name).map_err(|n| {
1514+
StoreError::InternalError(format!("invalid subgraph name `{n}` in database"))
1515+
})?
1516+
}
1517+
};
1518+
1519+
// Use deployment rules to determine which node should index this
1520+
// deployment. The rules also return candidate shards, but we ignore
1521+
// those since the shard is user-specified for restore.
1522+
let node = self.node_for_deployment(&name, &metadata.network).await?;
1523+
1524+
let mut pconn = self.primary_conn().await?;
1525+
let action = pconn
1526+
.plan_restore(&shard, &metadata.deployment, &mode)
1527+
.await?;
1528+
1529+
// Determine schema_version the same way allocate_site does
1530+
let schema_version = match metadata.graft_base.as_ref() {
1531+
Some(graft_base) => {
1532+
let base_site = pconn.find_active_site(graft_base).await?.ok_or_else(|| {
1533+
StoreError::DeploymentNotFound("graft_base not found".to_string())
1534+
})?;
1535+
base_site.schema_version
1536+
}
1537+
None => DeploymentSchemaVersion::LATEST,
1538+
};
1539+
1540+
let site = match action {
1541+
RestoreAction::Create { active } => {
1542+
pconn
1543+
.create_site(
1544+
shard,
1545+
metadata.deployment.clone(),
1546+
metadata.network.clone(),
1547+
schema_version,
1548+
active,
1549+
)
1550+
.await?
1551+
}
1552+
RestoreAction::Replace { existing } => {
1553+
let was_active = existing.active;
1554+
let existing = Arc::new(existing);
1555+
let store = self.for_site(&existing)?;
1556+
store.drop_deployment(&existing).await?;
1557+
pconn.drop_site(&existing).await?;
1558+
// Drop and re-acquire the primary connection to avoid pool
1559+
// deadlock: drop_deployment above used a separate connection
1560+
// from the same pool, and create_site below needs one too.
1561+
drop(pconn);
1562+
let mut pconn = self.primary_conn().await?;
1563+
pconn
1564+
.create_site(
1565+
shard,
1566+
metadata.deployment.clone(),
1567+
metadata.network.clone(),
1568+
schema_version,
1569+
was_active,
1570+
)
1571+
.await?
1572+
}
1573+
};
1574+
1575+
let site = Arc::new(site);
1576+
let store = self.for_site(&site)?;
1577+
store.restore(site.cheap_clone(), dir, &metadata).await?;
1578+
1579+
// Assign the restored deployment to the node determined by
1580+
// deployment rules
1581+
let mut pconn = self.primary_conn().await?;
1582+
let changes = pconn.assign_subgraph(&site, &node).await?;
1583+
let event = StoreEvent::new(changes);
1584+
pconn.send_store_event(&self.sender, &event).await?;
1585+
1586+
Ok(())
1587+
}
14491588
}
14501589

14511590
const STATE_ENS_NOT_CHECKED: u8 = 0;

0 commit comments

Comments (0)