Commit b801c44

store: Stream parquet batches lazily during restore
read_batches previously collected all row groups into a Vec, loading the entire parquet file into memory. Since dump writes all VidBatcher batches into a single file per table, that file can be very large. Return a lazy iterator instead, so that only one row group is in memory at a time.
1 parent eeb1f49 commit b801c44

2 files changed: 36 additions & 33 deletions
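
The heart of the change is having a fallible function return a lazy `impl Iterator` of fallible items. Below is a minimal sketch of that shape outside the parquet code, using `std::io` types in place of `StoreError` (`read_lines` is an illustrative name, not part of the commit):

```rust
use std::{
    fs,
    io::{self, BufRead, BufReader},
    path::Path,
};

// Sketch only: a lazily-reading function in the same shape as the new
// read_batches. Opening the file can fail up front; after that, each
// item is produced (and can fail) only when the caller asks for it.
fn read_lines(
    path: &Path,
) -> Result<impl Iterator<Item = Result<String, io::Error>>, io::Error> {
    // Owned copy of the path's display form: the returned iterator must
    // not borrow `path`, which only lives for the duration of this call.
    let display = path.display().to_string();
    let file = fs::File::open(path)?;
    Ok(BufReader::new(file).lines().map(move |line| {
        // Attach file context to each per-item error, as the commit does
        // for each record batch.
        line.map_err(|e| io::Error::new(e.kind(), format!("{display}: {e}")))
    }))
}
```

The owned `display` string and the `move` closure are what let the returned iterator outlive the `&Path` borrow; the commit uses the same trick for its error messages.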

store/postgres/src/parquet/reader.rs (30 additions & 29 deletions)
```diff
@@ -5,45 +5,45 @@ use arrow::array::RecordBatch;
 use graph::components::store::StoreError;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 
-/// Read all record batches from a Parquet file.
+/// Open a Parquet file and return a lazy iterator over its record batches.
 ///
-/// Opens the file, reads all row groups, and returns them as a vector
-/// of `RecordBatch`es. The batches retain the schema embedded in the
-/// Parquet file.
-pub fn read_batches(path: &Path) -> Result<Vec<RecordBatch>, StoreError> {
+/// Each call to `next()` on the returned iterator reads one row group
+/// from disk, so only one batch is in memory at a time. The caller is
+/// responsible for iterating and processing batches incrementally.
+pub fn read_batches(
+    path: &Path,
+) -> Result<impl Iterator<Item = Result<RecordBatch, StoreError>>, StoreError> {
+    let display = path.display().to_string();
     let file = fs::File::open(path).map_err(|e| {
-        StoreError::InternalError(format!(
-            "failed to open parquet file {}: {e}",
-            path.display()
-        ))
+        StoreError::InternalError(format!("failed to open parquet file {display}: {e}"))
     })?;
 
     let reader = ParquetRecordBatchReaderBuilder::try_new(file)
         .map_err(|e| {
             StoreError::InternalError(format!(
-                "failed to create parquet reader for {}: {e}",
-                path.display()
+                "failed to create parquet reader for {display}: {e}"
             ))
         })?
         .build()
         .map_err(|e| {
-            StoreError::InternalError(format!(
-                "failed to build parquet reader for {}: {e}",
-                path.display()
-            ))
+            StoreError::InternalError(format!("failed to build parquet reader for {display}: {e}"))
         })?;
 
-    reader
-        .into_iter()
-        .map(|batch| {
-            batch.map_err(|e| {
-                StoreError::InternalError(format!(
-                    "failed to read record batch from {}: {e}",
-                    path.display()
-                ))
-            })
+    Ok(reader.into_iter().map(move |batch| {
+        batch.map_err(|e| {
+            StoreError::InternalError(format!("failed to read record batch from {display}: {e}"))
         })
-        .collect()
+    }))
+}
+
+/// Read all record batches from a Parquet file into memory.
+///
+/// This is a convenience wrapper around `read_batches` that collects
+/// all batches. Prefer `read_batches` for production code to avoid
+/// loading the entire file at once.
+#[cfg(test)]
+pub fn read_all_batches(path: &Path) -> Result<Vec<RecordBatch>, StoreError> {
+    read_batches(path)?.collect()
 }
 
 #[cfg(test)]
@@ -105,7 +105,7 @@ mod tests {
         writer.write_batch(&batch, min_vid, max_vid).unwrap();
         writer.finish().unwrap();
 
-        read_batches(tmp.path()).unwrap()
+        read_all_batches(tmp.path()).unwrap()
     }
 
     #[test]
@@ -301,9 +301,10 @@ mod tests {
 
     #[test]
     fn nonexistent_file_returns_error() {
-        let result = read_batches(Path::new("/tmp/nonexistent_graph_node_test.parquet"));
-        assert!(result.is_err());
-        let err = result.unwrap_err().to_string();
+        let err = read_batches(Path::new("/tmp/nonexistent_graph_node_test.parquet"))
+            .err()
+            .expect("should fail for nonexistent file")
+            .to_string();
         assert!(err.contains("failed to open parquet file"));
     }
 }
```
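
With this API the caller drives the I/O. As a hedged sketch of the usage pattern (the `count_rows` helper is hypothetical, not in this commit):

```rust
use std::path::Path;

use arrow::array::RecordBatch;
use graph::components::store::StoreError;

// Hypothetical helper (not in the commit): stream a file's batches and
// count rows without ever holding more than one RecordBatch. Each loop
// iteration decodes one row group; the previous batch is dropped before
// the next is read, which is what keeps memory bounded.
fn count_rows(path: &Path) -> Result<usize, StoreError> {
    let mut total = 0;
    for batch in read_batches(path)? {
        let batch: RecordBatch = batch?; // per-batch read errors surface here
        total += batch.num_rows();
    }
    Ok(total)
}
```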

store/postgres/src/relational/restore.rs (6 additions & 4 deletions)
```diff
@@ -101,8 +101,9 @@ async fn import_entity_table(
     let chunk_path = dir.join(&chunk_info.file);
     let batches = read_batches(&chunk_path)?;
 
-    for batch in &batches {
-        let mut rows = record_batch_to_restore_rows(batch, table)?;
+    for batch in batches {
+        let batch = batch?;
+        let mut rows = record_batch_to_restore_rows(&batch, table)?;
 
         // Filter out already-imported rows (for boundary chunks on resume)
         if max_vid_db >= 0 {
@@ -190,8 +191,9 @@ async fn import_data_sources(
     let chunk_path = dir.join(&chunk_info.file);
     let batches = read_batches(&chunk_path)?;
 
-    for batch in &batches {
-        let rows = record_batch_to_data_source_rows(batch)?;
+    for batch in batches {
+        let batch = batch?;
+        let rows = record_batch_to_data_source_rows(&batch)?;
 
         for row in &rows {
             if max_vid_db >= 0 && row.vid <= max_vid_db {
```
