Skip to content

Commit 6ebbe73

Browse files
committed
store: Add InsertQuery::for_restore() for Parquet restore
Add a new constructor to InsertQuery that accepts RestoreRow data from Parquet files, bypassing the WriteChunk/EntityWrite pipeline. This is the insertion foundation for the restore path. Also add From<i32> impl for CausalityRegion to allow constructing values from deserialized Parquet data.
1 parent 234ec91 commit 6ebbe73

2 files changed

Lines changed: 99 additions & 0 deletions

File tree

graph/src/data_source/causality_region.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ impl ToSql<Integer, Pg> for CausalityRegion {
4545
}
4646
}
4747

48+
impl From<i32> for CausalityRegion {
49+
fn from(value: i32) -> Self {
50+
CausalityRegion(value)
51+
}
52+
}
53+
4854
impl CausalityRegion {
4955
/// The causality region of all onchain data sources.
5056
pub const ONCHAIN: CausalityRegion = CausalityRegion(0);

store/postgres/src/relational_queries.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use std::str::FromStr;
4141
use std::string::ToString;
4242

4343
use crate::block_range::{BoundSide, EntityBlockRange};
44+
use crate::parquet::convert::RestoreRow;
4445
use crate::relational::dsl::AtBlock;
4546
use crate::relational::{
4647
dsl, rollup::Rollup, Column, ColumnType, Layout, SqlName, Table, BYTE_ARRAY_PREFIX_SIZE,
@@ -2348,6 +2349,74 @@ impl<'a> InsertRow<'a> {
23482349
vid,
23492350
})
23502351
}
2352+
2353+
/// Build an `InsertRow` from a `RestoreRow` (Parquet restore path).
2354+
///
2355+
/// Unlike `new()`, this looks up values by SQL column name rather than
2356+
/// entity field name, since `RestoreRow.values` is keyed by SQL name.
2357+
/// Fulltext columns are regenerated from their source fields.
2358+
#[allow(dead_code)]
2359+
fn from_restore(
2360+
columns: &[&'a Column],
2361+
row: &'a RestoreRow,
2362+
table: &'a Table,
2363+
) -> Result<Self, StoreError> {
2364+
let mut values = Vec::with_capacity(columns.len());
2365+
for column in columns {
2366+
let iv = if let Some(fields) = column.fulltext_fields.as_ref() {
2367+
// Fulltext columns: `fields` contains GraphQL field names,
2368+
// but `RestoreRow.values` is keyed by SQL column names.
2369+
// Resolve via the table's columns.
2370+
let fulltext_field_values: Vec<_> = fields
2371+
.iter()
2372+
.filter_map(|field_name| {
2373+
table
2374+
.columns
2375+
.iter()
2376+
.find(|c| c.field.as_str() == field_name.as_str())
2377+
.and_then(|src_col| {
2378+
row.values
2379+
.iter()
2380+
.find(|(w, _)| w.as_str() == src_col.name.as_str())
2381+
.map(|(_, v)| v)
2382+
})
2383+
})
2384+
.map(|value| match value {
2385+
Value::String(s) => Ok(s),
2386+
_ => Err(internal_error!(
2387+
"fulltext fields must be strings but got {:?}",
2388+
value
2389+
)),
2390+
})
2391+
.collect::<Result<_, _>>()?;
2392+
if let ColumnType::TSVector(config) = &column.column_type {
2393+
InsertValue::Fulltext(fulltext_field_values, config)
2394+
} else {
2395+
return Err(StoreError::FulltextColumnMissingConfig);
2396+
}
2397+
} else {
2398+
let value = row
2399+
.values
2400+
.iter()
2401+
.find(|(w, _)| w.as_str() == column.name.as_str())
2402+
.map(|(_, v)| v)
2403+
.unwrap_or(&NULL);
2404+
let qv = QueryValue::new(value, &column.column_type)?;
2405+
InsertValue::Value(qv)
2406+
};
2407+
values.push(iv);
2408+
}
2409+
let end = row.block_range_end.flatten();
2410+
let br_value = BlockRangeValue::new(table, row.block, end);
2411+
let causality_region = CausalityRegion::from(row.causality_region.unwrap_or(0));
2412+
let vid = row.vid;
2413+
Ok(Self {
2414+
values,
2415+
br_value,
2416+
causality_region,
2417+
vid,
2418+
})
2419+
}
23512420
}
23522421

23532422
#[derive(Debug)]
@@ -2386,6 +2455,30 @@ impl<'a> InsertQuery<'a> {
23862455
})
23872456
}
23882457

2458+
/// Build an `InsertQuery` from restore rows (Parquet restore path).
2459+
///
2460+
/// All data columns from the dump are present, so `unique_columns`
2461+
/// includes every column in the table. Fulltext columns are
2462+
/// regenerated from their source fields.
2463+
#[allow(dead_code)]
2464+
pub fn for_restore(
2465+
table: &'a Table,
2466+
rows: &'a [RestoreRow],
2467+
) -> Result<InsertQuery<'a>, StoreError> {
2468+
let unique_columns: Vec<&Column> = table.columns.iter().collect();
2469+
2470+
let rows: Vec<_> = rows
2471+
.iter()
2472+
.map(|row| InsertRow::from_restore(&unique_columns, row, table))
2473+
.collect::<Result<_, _>>()?;
2474+
2475+
Ok(InsertQuery {
2476+
table,
2477+
rows,
2478+
unique_columns,
2479+
})
2480+
}
2481+
23892482
/// Build the column name list using the subset of all keys among present entities.
23902483
fn unique_columns(table: &'a Table, rows: &'a WriteChunk<'a>) -> Vec<&'a Column> {
23912484
table

0 commit comments

Comments
 (0)