Skip to content

Commit ff0292c

Browse files
committed
store: Add restore finalization with vid sequence reset and head block update
1 parent 259216f commit ff0292c

1 file changed

Lines changed: 112 additions & 1 deletion

File tree

store/postgres/src/relational/restore.rs

Lines changed: 112 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ use std::fs;
1010
use std::path::Path;
1111
use std::sync::Arc;
1212

13+
use diesel::dsl::update;
14+
use diesel::prelude::{ExpressionMethods, QueryDsl};
1315
use diesel::sql_types::{BigInt, Binary, Integer, Nullable, Text};
1416
use diesel_async::{RunQueryDsl, SimpleAsyncConnection};
1517
use graph::blockchain::BlockHash;
@@ -18,6 +20,7 @@ use graph::prelude::{info, BlockPtr as GraphBlockPtr, Logger, StoreError};
1820
use graph::schema::{EntityType, InputSchema};
1921
use graph::semver::Version;
2022

23+
use crate::catalog;
2124
use crate::deployment::create_deployment;
2225
use crate::dynds::DataSourcesTable;
2326
use crate::parquet::convert::{
@@ -26,7 +29,7 @@ use crate::parquet::convert::{
2629
use crate::parquet::reader::read_batches;
2730
use crate::primary::Site;
2831
use crate::relational::dump::{Metadata, TableInfo};
29-
use crate::relational::{Layout, Table};
32+
use crate::relational::{Layout, Table, VID_COLUMN};
3033
use crate::relational_queries::InsertQuery;
3134
use crate::vid_batcher::VidRange;
3235
use crate::AsyncPgConnection;
@@ -355,3 +358,111 @@ pub async fn import_data(
355358

356359
Ok(())
357360
}
361+
362+
/// Finalize a restored deployment by resetting vid sequences and setting
363+
/// the head block pointer.
364+
///
365+
/// This must be called after `import_data` has completed successfully.
366+
/// Setting the head block is the very last operation — it marks the
367+
/// deployment as "ready".
368+
#[allow(dead_code)]
369+
pub async fn finalize(
370+
conn: &mut AsyncPgConnection,
371+
layout: &Layout,
372+
metadata: &Metadata,
373+
logger: &Logger,
374+
) -> Result<(), StoreError> {
375+
let nsp = layout.site.namespace.as_str();
376+
377+
// 1. Reset vid sequences for entity tables that use bigserial.
378+
// Tables where has_vid_seq() is true use plain bigint (no sequence).
379+
let mut table_names: Vec<_> = metadata
380+
.tables
381+
.keys()
382+
.filter(|name| name.as_str() != DATA_SOURCES_TABLE)
383+
.collect();
384+
table_names.sort();
385+
386+
for table_name in table_names {
387+
let table_info = &metadata.tables[table_name];
388+
if table_info.max_vid < 0 {
389+
continue;
390+
}
391+
392+
let table = layout
393+
.tables
394+
.values()
395+
.find(|t| t.object.as_str() == table_name)
396+
.ok_or_else(|| {
397+
StoreError::InternalError(format!(
398+
"table '{}' from dump not found in layout",
399+
table_name,
400+
))
401+
})?;
402+
403+
if table.object.has_vid_seq() {
404+
continue;
405+
}
406+
407+
let vid_seq = catalog::seq_name(&table.name, VID_COLUMN);
408+
let query = format!(
409+
"SELECT setval('\"{nsp}\".\"{vid_seq}\"', {})",
410+
table_info.max_vid
411+
);
412+
conn.batch_execute(&query).await.map_err(|e| {
413+
StoreError::InternalError(format!("reset vid seq for {table_name}: {e}"))
414+
})?;
415+
}
416+
417+
// 2. Reset data_sources$ vid sequence if present
418+
if let Some(ds_info) = metadata.tables.get(DATA_SOURCES_TABLE) {
419+
if ds_info.max_vid >= 0 {
420+
let qualified = format!("\"{nsp}\".\"{DATA_SOURCES_TABLE}\"");
421+
let query = format!(
422+
"SELECT setval(pg_get_serial_sequence('{qualified}', 'vid'), {})",
423+
ds_info.max_vid
424+
);
425+
conn.batch_execute(&query).await.map_err(|e| {
426+
StoreError::InternalError(format!("reset data_sources$ vid seq: {e}"))
427+
})?;
428+
}
429+
}
430+
431+
// 3. Update earliest_block_number (may differ from start_block after
432+
// pruning) and set the head block pointer. Setting the head block
433+
// is the very last step: it makes the deployment "ready".
434+
{
435+
use crate::deployment::deployment as d;
436+
use crate::deployment::head as h;
437+
438+
update(d::table.filter(d::id.eq(layout.site.id)))
439+
.set(d::earliest_block_number.eq(metadata.earliest_block_number))
440+
.execute(conn)
441+
.await
442+
.map_err(StoreError::from)?;
443+
444+
if let Some(head) = &metadata.head_block {
445+
let head_ptr = to_graph_block_ptr(head)?;
446+
update(h::table.filter(h::id.eq(layout.site.id)))
447+
.set((
448+
h::block_number.eq(head_ptr.number),
449+
h::block_hash.eq(head_ptr.hash_slice()),
450+
h::entity_count.eq(metadata.entity_count as i64),
451+
))
452+
.execute(conn)
453+
.await
454+
.map_err(StoreError::from)?;
455+
456+
info!(
457+
logger,
458+
"Finalized restore: head block #{}, entity count {}",
459+
head_ptr.number,
460+
metadata.entity_count
461+
);
462+
} else {
463+
info!(logger, "Finalized restore (no head block in dump)");
464+
}
465+
}
466+
467+
Ok(())
468+
}

0 commit comments

Comments
 (0)