Skip to content

Commit 0772110

Browse files
committed
store: Implement Parquet dump for subgraph data
Wire dump queries and Parquet writing into Layout::dump(). This is the core of the dump feature, covering both entity tables and the special data_sources$ table. For entity tables: build a DynamicSelectClause that selects vid, block columns (split into lower/upper for mutable tables), causality_region, and data columns. Use VidBatcher for adaptive batching, convert rows to Arrow RecordBatch, and write via ParquetChunkWriter. For data_sources$: use a concrete QueryableByName struct with raw SQL (fixed schema, no DynamicSelectClause needed). Check table existence via catalog::table_exists before attempting the dump. Metadata includes version, network, block pointers, entity count, graft info, health, indexes, and per-table chunk tracking. The metadata is written atomically via tmp+rename so its presence signals a complete dump. Add entity_count() helper in detail.rs. Wire dump() through DeploymentStore and SubgraphStore.
1 parent ff71687 commit 0772110

6 files changed

Lines changed: 716 additions & 1 deletion

File tree

store/postgres/src/deployment_store.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use std::collections::{BTreeMap, HashMap};
3030
use std::convert::Into;
3131
use std::ops::Bound;
3232
use std::ops::{Deref, Range};
33+
use std::path::PathBuf;
3334
use std::str::FromStr;
3435
use std::sync::{atomic::AtomicUsize, Arc, Mutex};
3536
use std::time::{Duration, Instant};
@@ -896,6 +897,18 @@ impl DeploymentStore {
896897

897898
Ok(relational::prune::Viewer::new(self.pool.clone(), layout))
898899
}
900+
901+
/// Dump the data of the deployment identified by `site` into `dir`.
///
/// Everything the dump needs is gathered over a single pooled
/// connection: the deployment's layout, its entity count and its index
/// list. The actual work is then delegated to the layout's `dump`,
/// which also receives the site's network name.
///
/// # Errors
///
/// Any `StoreError` raised while acquiring the connection, loading the
/// inputs, or running the dump itself is propagated to the caller.
pub(crate) async fn dump(&self, site: Arc<Site>, dir: PathBuf) -> Result<(), StoreError> {
    let mut conn = self.pool.get_permitted().await?;

    let deployment_layout = self.layout(&mut conn, site.cheap_clone()).await?;
    let num_entities = crate::detail::entity_count(&mut conn, &site).await?;

    // Loading the IndexList should happen inside dump, but the
    // interface does not allow it; should be changed
    let indexes = IndexList::load(&mut conn, site.cheap_clone(), self.clone()).await?;

    deployment_layout
        .dump(&mut conn, indexes, dir, &site.network, num_entities)
        .await
}
899912
}
900913

901914
/// Methods that back the trait `WritableStore`, but have small variations in their signatures

store/postgres/src/detail.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,18 @@ pub async fn deployment_entity(
657657
StoredDeploymentEntity(detail, manifest).into_subgraph_deployment_entity(schema)
658658
}
659659

660+
/// Load the entity count for a deployment from `subgraphs.head`.
661+
pub async fn entity_count(conn: &mut AsyncPgConnection, site: &Site) -> Result<usize, StoreError> {
662+
use subgraph_head as h;
663+
664+
let count: i64 = h::table
665+
.find(site.id)
666+
.select(h::entity_count)
667+
.first(conn)
668+
.await?;
669+
Ok(count as usize)
670+
}
671+
660672
#[derive(Queryable, Identifiable, Insertable)]
661673
#[diesel(table_name = graph_node_versions)]
662674
pub struct GraphNodeVersion {

store/postgres/src/parquet/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1-
#[allow(dead_code)]
1+
pub(crate) mod convert;
22
pub(crate) mod schema;
3+
pub(crate) mod writer;

store/postgres/src/relational.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ mod ddl_tests;
1515
mod query_tests;
1616

1717
pub(crate) mod dsl;
18+
pub(crate) mod dump;
1819
pub(crate) mod index;
1920
pub(crate) mod prune;
2021
pub(crate) mod rollup;

0 commit comments

Comments
 (0)