Skip to content

Commit 6bb9c78

Browse files
committed
store: Add core restore module for schema & deployment creation
1 parent 9a1e8b4 commit 6bb9c78

2 files changed

Lines changed: 143 additions & 0 deletions

File tree

store/postgres/src/relational.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub(crate) mod dsl;
1818
pub(crate) mod dump;
1919
pub(crate) mod index;
2020
pub(crate) mod prune;
21+
pub(crate) mod restore;
2122
pub(crate) mod rollup;
2223
pub(crate) mod value;
2324

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
//! Restore a subgraph deployment from a dump directory.
2+
//!
3+
//! The dump directory must contain:
4+
//! - `metadata.json` — deployment metadata and per-table state
5+
//! - `schema.graphql` — raw GraphQL schema text
6+
//! - Per-entity Parquet files in subdirectories
7+
8+
use std::collections::BTreeSet;
9+
use std::fs;
10+
use std::path::Path;
11+
use std::sync::Arc;
12+
13+
use diesel_async::SimpleAsyncConnection;
14+
15+
use graph::blockchain::BlockHash;
16+
use graph::data::subgraph::schema::{DeploymentCreate, SubgraphManifestEntity};
17+
use graph::prelude::{BlockPtr as GraphBlockPtr, StoreError};
18+
use graph::schema::{EntityType, InputSchema};
19+
use graph::semver::Version;
20+
21+
use crate::deployment::create_deployment;
22+
use crate::dynds::DataSourcesTable;
23+
use crate::primary::Site;
24+
use crate::relational::dump::Metadata;
25+
use crate::relational::Layout;
26+
use crate::AsyncPgConnection;
27+
28+
/// Convert a dump `BlockPtr` (hex hash string) to a graph `BlockPtr`.
29+
fn to_graph_block_ptr(bp: &super::dump::BlockPtr) -> Result<GraphBlockPtr, StoreError> {
30+
let hash = BlockHash::try_from(bp.hash.as_str())
31+
.map_err(|e| StoreError::InternalError(format!("invalid block hash '{}': {e}", bp.hash)))?;
32+
Ok(GraphBlockPtr {
33+
number: bp.number,
34+
hash,
35+
})
36+
}
37+
38+
/// Restore a subgraph deployment's schema and metadata from a dump
39+
/// directory.
40+
///
41+
/// This creates the deployment metadata rows (`subgraphs.head`,
42+
/// `subgraphs.deployment`, `subgraphs.subgraph_manifest`), entity
43+
/// tables, and optionally the `data_sources$` table.
44+
///
45+
/// The caller is responsible for:
46+
/// - Reading `metadata.json` via `Metadata::from_file()`
47+
/// - Site allocation and conflict resolution (force-drop)
48+
/// - Obtaining the right shard connection
49+
///
50+
/// Entity data import and finalization are handled separately.
51+
#[allow(dead_code)]
52+
pub async fn create_schema(
53+
conn: &mut AsyncPgConnection,
54+
site: Arc<Site>,
55+
metadata: &Metadata,
56+
dir: &Path,
57+
) -> Result<Layout, StoreError> {
58+
// 1. Read schema.graphql
59+
let schema_path = dir.join("schema.graphql");
60+
let schema_text = fs::read_to_string(&schema_path).map_err(|e| {
61+
StoreError::InternalError(format!("failed to read {}: {e}", schema_path.display()))
62+
})?;
63+
64+
// 2. Read subgraph.yaml (optional)
65+
let yaml_path = dir.join("subgraph.yaml");
66+
let raw_yaml = fs::read_to_string(&yaml_path).ok();
67+
68+
// 3. Parse schema
69+
let spec_version = Version::parse(&metadata.manifest.spec_version).map_err(|e| {
70+
StoreError::InternalError(format!(
71+
"invalid spec_version '{}': {e}",
72+
metadata.manifest.spec_version
73+
))
74+
})?;
75+
let input_schema = InputSchema::parse(&spec_version, &schema_text, site.deployment.clone())?;
76+
77+
// 4. Resolve entities_with_causality_region from names
78+
let entities_with_causality_region: BTreeSet<EntityType> = metadata
79+
.manifest
80+
.entities_with_causality_region
81+
.iter()
82+
.map(|name| input_schema.entity_type(name))
83+
.collect::<Result<_, _>>()
84+
.map_err(StoreError::from)?;
85+
86+
// 5. Build SubgraphManifestEntity for create_deployment
87+
let manifest_entity = SubgraphManifestEntity {
88+
spec_version: metadata.manifest.spec_version.clone(),
89+
description: metadata.manifest.description.clone(),
90+
repository: metadata.manifest.repository.clone(),
91+
features: metadata.manifest.features.clone(),
92+
schema: schema_text,
93+
raw_yaml,
94+
entities_with_causality_region: entities_with_causality_region.iter().cloned().collect(),
95+
history_blocks: metadata.manifest.history_blocks,
96+
};
97+
98+
let start_block = metadata
99+
.start_block
100+
.as_ref()
101+
.map(to_graph_block_ptr)
102+
.transpose()?;
103+
let graft_block = metadata
104+
.graft_block
105+
.as_ref()
106+
.map(to_graph_block_ptr)
107+
.transpose()?;
108+
109+
let create = DeploymentCreate {
110+
manifest: manifest_entity,
111+
start_block,
112+
graft_base: metadata.graft_base.clone(),
113+
graft_block,
114+
debug_fork: metadata.debug_fork.clone(),
115+
history_blocks_override: None,
116+
};
117+
118+
// 6. Create deployment metadata rows
119+
create_deployment(conn, &site, create, false, false).await?;
120+
121+
// 7. Create database schema and entity tables
122+
let query = format!("create schema {}", &site.namespace);
123+
conn.batch_execute(&query).await?;
124+
125+
let layout = Layout::create_relational_schema(
126+
conn,
127+
site.clone(),
128+
&input_schema,
129+
entities_with_causality_region,
130+
None,
131+
)
132+
.await?;
133+
134+
// 8. Create data_sources$ table if present in dump
135+
if metadata.tables.contains_key("data_sources$") {
136+
let ds_table = DataSourcesTable::new(site.namespace.clone());
137+
let ddl = ds_table.as_ddl();
138+
conn.batch_execute(&ddl).await?;
139+
}
140+
141+
Ok(layout)
142+
}

0 commit comments

Comments
 (0)