Commit 259216f

store: Add resumable entity data import for restore
Add import_data() to the restore module that reads Parquet chunks and inserts entity data into PostgreSQL tables. Supports resumability by checking max(vid) in each table and skipping already-imported rows. Entity tables use InsertQuery::for_restore() for efficient batch inserts. The data_sources$ table uses raw SQL with bind parameters since it has a fixed schema outside the Layout.
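
To make the resumability contract concrete, here is a minimal caller sketch. Only import_data() and its signature come from this commit; the retry wrapper and its name are illustrative:

async fn import_with_retry(
    conn: &mut AsyncPgConnection,
    layout: &Layout,
    metadata: &Metadata,
    dir: &Path,
    logger: &Logger,
) -> Result<(), StoreError> {
    // Hypothetical wrapper, not part of the commit: retry once after an
    // interruption. The second call is safe because import_data() checks
    // max(vid) per table and skips rows that are already present.
    match import_data(conn, layout, metadata, dir, logger).await {
        Ok(()) => Ok(()),
        Err(e) => {
            info!(logger, "import interrupted ({}), resuming", e);
            import_data(conn, layout, metadata, dir, logger).await
        }
    }
}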
1 parent 6bb9c78 commit 259216f

1 file changed: 221 additions & 6 deletions

store/postgres/src/relational/restore.rs

@@ -10,21 +10,29 @@ use std::fs;
 use std::path::Path;
 use std::sync::Arc;
 
-use diesel_async::SimpleAsyncConnection;
-
+use diesel::sql_types::{BigInt, Binary, Integer, Nullable, Text};
+use diesel_async::{RunQueryDsl, SimpleAsyncConnection};
 use graph::blockchain::BlockHash;
 use graph::data::subgraph::schema::{DeploymentCreate, SubgraphManifestEntity};
-use graph::prelude::{BlockPtr as GraphBlockPtr, StoreError};
+use graph::prelude::{info, BlockPtr as GraphBlockPtr, Logger, StoreError};
 use graph::schema::{EntityType, InputSchema};
 use graph::semver::Version;
 
 use crate::deployment::create_deployment;
 use crate::dynds::DataSourcesTable;
+use crate::parquet::convert::{
+    record_batch_to_data_source_rows, record_batch_to_restore_rows, DataSourceRestoreRow,
+};
+use crate::parquet::reader::read_batches;
 use crate::primary::Site;
-use crate::relational::dump::Metadata;
-use crate::relational::Layout;
+use crate::relational::dump::{Metadata, TableInfo};
+use crate::relational::{Layout, Table};
+use crate::relational_queries::InsertQuery;
+use crate::vid_batcher::VidRange;
 use crate::AsyncPgConnection;
 
+const DATA_SOURCES_TABLE: &str = "data_sources$";
+
 /// Convert a dump `BlockPtr` (hex hash string) to a graph `BlockPtr`.
 fn to_graph_block_ptr(bp: &super::dump::BlockPtr) -> Result<GraphBlockPtr, StoreError> {
     let hash = BlockHash::try_from(bp.hash.as_str())
@@ -35,6 +43,167 @@ fn to_graph_block_ptr(bp: &super::dump::BlockPtr) -> Result<GraphBlockPtr, Store
     })
 }
 
+/// Query the current max(vid) for a table. Returns -1 if the table is empty.
+async fn current_max_vid(
+    conn: &mut AsyncPgConnection,
+    qualified_name: &str,
+) -> Result<i64, StoreError> {
+    let query = format!(
+        "select coalesce(min(vid), 0)::int8 as min_vid, \
+         coalesce(max(vid), -1)::int8 as max_vid \
+         from {}",
+        qualified_name
+    );
+    let range: VidRange = diesel::sql_query(&query)
+        .get_result(conn)
+        .await
+        .map_err(StoreError::from)?;
+    Ok(range.max)
+}
+
+/// Import a single entity table from Parquet chunks.
+///
+/// Supports resumability: checks the current max(vid) in the DB table
+/// and skips already-imported rows.
+async fn import_entity_table(
+    conn: &mut AsyncPgConnection,
+    table: &Table,
+    table_info: &TableInfo,
+    dir: &Path,
+    logger: &Logger,
+) -> Result<usize, StoreError> {
+    if table_info.chunks.is_empty() || table_info.max_vid < 0 {
+        return Ok(0);
+    }
+
+    let max_vid_db = current_max_vid(conn, table.qualified_name.as_str()).await?;
+    if max_vid_db >= table_info.max_vid {
+        info!(
+            logger,
+            "Table {} already fully restored, skipping",
+            table.object.as_str()
+        );
+        return Ok(0);
+    }
+
+    let chunk_size = InsertQuery::chunk_size(table);
+    let mut total_inserted = 0usize;
+
+    for chunk_info in &table_info.chunks {
+        // Skip chunks that are fully imported
+        if chunk_info.max_vid <= max_vid_db {
+            continue;
+        }
+
+        let chunk_path = dir.join(&chunk_info.file);
+        let batches = read_batches(&chunk_path)?;
+
+        for batch in &batches {
+            let mut rows = record_batch_to_restore_rows(batch, table)?;
+
+            // Filter out already-imported rows (for boundary chunks on resume)
+            if max_vid_db >= 0 {
+                rows.retain(|row| row.vid > max_vid_db);
+            }
+
+            if rows.is_empty() {
+                continue;
+            }
+
+            // Split into InsertQuery-sized chunks and execute
+            for chunk in rows.chunks(chunk_size) {
+                InsertQuery::for_restore(table, chunk)?
+                    .execute(conn)
+                    .await?;
+                total_inserted += chunk.len();
+            }
+        }
+    }
+
+    info!(
+        logger,
+        "Restored {} rows into {}",
+        total_inserted,
+        table.object.as_str()
+    );
+    Ok(total_inserted)
+}
+
+/// Insert a single data_sources$ row via raw SQL.
+async fn insert_data_source_row(
+    conn: &mut AsyncPgConnection,
+    qualified_table: &str,
+    row: &DataSourceRestoreRow,
+) -> Result<(), StoreError> {
+    let query = format!(
+        "INSERT INTO {} (vid, block_range, causality_region, manifest_idx, \
+         parent, id, param, context, done_at) \
+         VALUES ($1, int4range($2, $3), $4, $5, $6, $7, $8, $9::jsonb, $10)",
+        qualified_table,
+    );
+    diesel::sql_query(&query)
+        .bind::<BigInt, _>(row.vid)
+        .bind::<Integer, _>(row.block_range_start)
+        .bind::<Nullable<Integer>, _>(row.block_range_end)
+        .bind::<Integer, _>(row.causality_region)
+        .bind::<Integer, _>(row.manifest_idx)
+        .bind::<Nullable<Integer>, _>(row.parent)
+        .bind::<Nullable<Binary>, _>(row.id.as_deref())
+        .bind::<Nullable<Binary>, _>(row.param.as_deref())
+        .bind::<Nullable<Text>, _>(row.context.as_deref())
+        .bind::<Nullable<Integer>, _>(row.done_at)
+        .execute(conn)
+        .await
+        .map_err(StoreError::from)?;
+    Ok(())
+}
+
+/// Import the `data_sources$` table from Parquet chunks.
+async fn import_data_sources(
+    conn: &mut AsyncPgConnection,
+    namespace: &str,
+    table_info: &TableInfo,
+    dir: &Path,
+    logger: &Logger,
+) -> Result<usize, StoreError> {
+    if table_info.chunks.is_empty() || table_info.max_vid < 0 {
+        return Ok(0);
+    }
+
+    let qualified = format!("\"{}\".\"{DATA_SOURCES_TABLE}\"", namespace);
+    let max_vid_db = current_max_vid(conn, &qualified).await?;
+    if max_vid_db >= table_info.max_vid {
+        info!(logger, "data_sources$ already fully restored, skipping");
+        return Ok(0);
+    }
+
+    let mut total_inserted = 0usize;
+
+    for chunk_info in &table_info.chunks {
+        if chunk_info.max_vid <= max_vid_db {
+            continue;
+        }
+
+        let chunk_path = dir.join(&chunk_info.file);
+        let batches = read_batches(&chunk_path)?;
+
+        for batch in &batches {
+            let rows = record_batch_to_data_source_rows(batch)?;
+
+            for row in &rows {
+                if max_vid_db >= 0 && row.vid <= max_vid_db {
+                    continue;
+                }
+                insert_data_source_row(conn, &qualified, row).await?;
+                total_inserted += 1;
+            }
+        }
+    }
+
+    info!(logger, "Restored {} data_sources$ rows", total_inserted);
+    Ok(total_inserted)
+}
+
 /// Restore a subgraph deployment's schema and metadata from a dump
 /// directory.
 ///
@@ -132,11 +301,57 @@ pub async fn create_schema(
         .await?;
 
     // 8. Create data_sources$ table if present in dump
-    if metadata.tables.contains_key("data_sources$") {
+    if metadata.tables.contains_key(DATA_SOURCES_TABLE) {
         let ds_table = DataSourcesTable::new(site.namespace.clone());
         let ddl = ds_table.as_ddl();
         conn.batch_execute(&ddl).await?;
     }
 
     Ok(layout)
 }
+
+/// Import entity data and data_sources$ from Parquet files into the
+/// database tables created by `create_schema`.
+///
+/// This is resumable: if interrupted, it can be called again and will
+/// skip already-imported rows by checking the current max(vid) in each
+/// table.
+#[allow(dead_code)]
+pub async fn import_data(
+    conn: &mut AsyncPgConnection,
+    layout: &Layout,
+    metadata: &Metadata,
+    dir: &Path,
+    logger: &Logger,
+) -> Result<(), StoreError> {
+    // Import entity tables (sorted by name for determinism)
+    let mut table_names: Vec<_> = metadata
+        .tables
+        .keys()
+        .filter(|name| name.as_str() != DATA_SOURCES_TABLE)
+        .collect();
+    table_names.sort();
+
+    for table_name in table_names {
+        let table_info = &metadata.tables[table_name];
+        let table = layout
+            .tables
+            .values()
+            .find(|t| t.object.as_str() == table_name)
+            .ok_or_else(|| {
+                StoreError::InternalError(format!(
+                    "table '{}' from dump not found in layout",
+                    table_name,
+                ))
+            })?;
+        import_entity_table(conn, table, table_info, dir, logger).await?;
+    }
+
+    // Import data_sources$ if present
+    if let Some(ds_info) = metadata.tables.get(DATA_SOURCES_TABLE) {
+        let namespace = layout.site.namespace.as_str();
+        import_data_sources(conn, namespace, ds_info, dir, logger).await?;
+    }
+
+    Ok(())
+}
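
The skip rule both import paths apply can be shown in isolation. A self-contained sketch, where the ChunkVids type and chunks_to_import helper are illustrative stand-ins for the commit's TableInfo chunk metadata, not committed code:

/// Stand-in for the dump's per-chunk metadata (the commit reads
/// `chunk_info.max_vid` from `TableInfo`).
struct ChunkVids {
    max_vid: i64,
}

/// Mirrors the resume logic in `import_entity_table`: chunks entirely at
/// or below the database's current max(vid) are already imported and are
/// skipped; later chunks are read, and within them any row with
/// `vid <= max_vid_db` is filtered out (the boundary chunk on resume).
fn chunks_to_import(chunks: &[ChunkVids], max_vid_db: i64) -> Vec<usize> {
    chunks
        .iter()
        .enumerate()
        .filter(|(_, c)| c.max_vid > max_vid_db)
        .map(|(i, _)| i)
        .collect()
}

fn main() {
    // Dump chunks end at vids 100, 200, 300; a previous run got through vid 150.
    let chunks = [
        ChunkVids { max_vid: 100 },
        ChunkVids { max_vid: 200 },
        ChunkVids { max_vid: 300 },
    ];
    // Chunk 0 is skipped outright; chunk 1 is the boundary chunk (only rows
    // 151..=200 survive the filter); chunk 2 is imported in full.
    assert_eq!(chunks_to_import(&chunks, 150), vec![1, 2]);
}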

0 commit comments