Commit 4a8baf2

store: Add Parquet reader for restore
Add `parquet/reader.rs` with `read_batches()` to read Parquet files back into Arrow RecordBatches. This is the foundation for the restore pipeline (step 1 of the restore plan).
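For orientation, a minimal sketch of how a restore-side caller might use this (hypothetical: `inspect_chunk` and the chunk path are illustrative, and the restore pipeline that will drive this is not part of this commit):

```rust
use std::path::Path;

use graph::components::store::StoreError;

use crate::parquet::reader::read_batches;

/// Hypothetical helper, not part of this commit: read one backup chunk
/// and report its shape.
fn inspect_chunk(chunk_path: &Path) -> Result<(), StoreError> {
    // `read_batches` reads every row group of the file eagerly into memory.
    let batches = read_batches(chunk_path)?;
    for batch in &batches {
        // Each batch retains the schema embedded in the Parquet file,
        // so column names and types survive the roundtrip.
        println!("{} rows x {} columns", batch.num_rows(), batch.num_columns());
    }
    Ok(())
}
```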
1 parent 9af691d commit 4a8baf2

2 files changed

Lines changed: 311 additions & 0 deletions

File tree

store/postgres/src/parquet/mod.rs
store/postgres/src/parquet/reader.rs

store/postgres/src/parquet/mod.rs

Lines changed: 1 addition & 0 deletions
```diff
@@ -1,3 +1,4 @@
 pub(crate) mod convert;
+pub(crate) mod reader;
 pub(crate) mod schema;
 pub(crate) mod writer;
```
store/postgres/src/parquet/reader.rs

Lines changed: 310 additions & 0 deletions (new file)
```rust
use std::fs;
use std::path::Path;

use arrow::array::RecordBatch;
use graph::components::store::StoreError;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

/// Read all record batches from a Parquet file.
///
/// Opens the file, reads all row groups, and returns them as a vector
/// of `RecordBatch`es. The batches retain the schema embedded in the
/// Parquet file.
#[allow(dead_code)]
pub fn read_batches(path: &Path) -> Result<Vec<RecordBatch>, StoreError> {
    let file = fs::File::open(path).map_err(|e| {
        StoreError::InternalError(format!(
            "failed to open parquet file {}: {e}",
            path.display()
        ))
    })?;

    let reader = ParquetRecordBatchReaderBuilder::try_new(file)
        .map_err(|e| {
            StoreError::InternalError(format!(
                "failed to create parquet reader for {}: {e}",
                path.display()
            ))
        })?
        .build()
        .map_err(|e| {
            StoreError::InternalError(format!(
                "failed to build parquet reader for {}: {e}",
                path.display()
            ))
        })?;

    reader
        .into_iter()
        .map(|batch| {
            batch.map_err(|e| {
                StoreError::InternalError(format!(
                    "failed to read record batch from {}: {e}",
                    path.display()
                ))
            })
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{
        Array, BinaryArray, BooleanArray, Int32Array, Int64Array, ListArray, StringArray,
        TimestampMicrosecondArray,
    };
    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use graph::data::store::scalar::{BigDecimal, Bytes, Timestamp};
    use std::str::FromStr;

    use super::*;
    use crate::parquet::convert::rows_to_record_batch;
    use crate::parquet::writer::ParquetChunkWriter;
    use crate::relational::value::{OidRow, OidValue};

    fn make_row(values: Vec<OidValue>) -> OidRow {
        values.into_iter().collect()
    }

    struct TempFile(std::path::PathBuf);

    impl TempFile {
        fn new(name: &str) -> Self {
            let path = std::env::temp_dir().join(format!(
                "graph_node_reader_test_{}_{name}.parquet",
                std::process::id()
            ));
            Self(path)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempFile {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.0);
        }
    }

    fn write_and_read(name: &str, schema: &Schema, rows: &[OidRow]) -> Vec<RecordBatch> {
        let tmp = TempFile::new(name);
        let batch = rows_to_record_batch(schema, rows).unwrap();

        let min_vid = if rows.is_empty() { 0 } else { 1 };
        let max_vid = rows.len() as i64;

        let mut writer = ParquetChunkWriter::new(
            tmp.path().to_path_buf(),
            "test/chunk_000000.parquet".into(),
            schema,
        )
        .unwrap();
        writer.write_batch(&batch, min_vid, max_vid).unwrap();
        writer.finish().unwrap();

        read_batches(tmp.path()).unwrap()
    }

    #[test]
    fn roundtrip_scalar_columns() {
        let schema = Schema::new(vec![
            Field::new("vid", DataType::Int64, false),
            Field::new("block$", DataType::Int32, false),
            Field::new("id", DataType::Utf8, false),
            Field::new("flag", DataType::Boolean, true),
            Field::new("data", DataType::Binary, true),
            Field::new("amount", DataType::Utf8, false),
            Field::new("ts", DataType::Timestamp(TimeUnit::Microsecond, None), true),
        ]);

        let rows = vec![
            make_row(vec![
                OidValue::Int8(1),
                OidValue::Int(100),
                OidValue::String("token-1".into()),
                OidValue::Bool(true),
                OidValue::Bytes(Bytes::from(vec![0xab, 0xcd])),
                OidValue::BigDecimal(BigDecimal::from_str("12345678901234567890").unwrap()),
                OidValue::Timestamp(Timestamp::from_microseconds_since_epoch(1_000_000).unwrap()),
            ]),
            make_row(vec![
                OidValue::Int8(2),
                OidValue::Int(101),
                OidValue::String("token-2".into()),
                OidValue::Null,
                OidValue::Null,
                OidValue::BigDecimal(BigDecimal::from_str("999").unwrap()),
                OidValue::Null,
            ]),
        ];

        let batches = write_and_read("scalar", &schema, &rows);
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 7);
        assert_eq!(batch.schema().fields().len(), schema.fields().len());

        // Verify schema matches
        for (got, expected) in batch.schema().fields().iter().zip(schema.fields().iter()) {
            assert_eq!(got.name(), expected.name());
            assert_eq!(got.data_type(), expected.data_type());
        }

        // Spot-check values
        let vid = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(vid.value(0), 1);
        assert_eq!(vid.value(1), 2);

        let id = batch
            .column(2)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(id.value(0), "token-1");
        assert_eq!(id.value(1), "token-2");

        let flag = batch
            .column(3)
            .as_any()
            .downcast_ref::<BooleanArray>()
            .unwrap();
        assert!(flag.value(0));
        assert!(flag.is_null(1));

        let data = batch
            .column(4)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(data.value(0), &[0xab, 0xcd]);
        assert!(data.is_null(1));

        let ts = batch
            .column(6)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert_eq!(ts.value(0), 1_000_000);
        assert!(ts.is_null(1));
    }

    #[test]
    fn roundtrip_list_columns() {
        let schema = Schema::new(vec![
            Field::new("vid", DataType::Int64, false),
            Field::new(
                "tags",
                DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
                false,
            ),
            Field::new(
                "scores",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            ),
        ]);

        let rows = vec![
            make_row(vec![
                OidValue::Int8(1),
                OidValue::StringArray(vec!["foo".into(), "bar".into()]),
                OidValue::Ints(vec![10, 20]),
            ]),
            make_row(vec![
                OidValue::Int8(2),
                OidValue::StringArray(vec![]),
                OidValue::Null,
            ]),
        ];

        let batches = write_and_read("list", &schema, &rows);
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 2);

        let tags = batch
            .column(1)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        let tags_0 = tags.value(0);
        let tags_0_str = tags_0.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(tags_0_str.len(), 2);
        assert_eq!(tags_0_str.value(0), "foo");
        assert_eq!(tags_0_str.value(1), "bar");

        let tags_1 = tags.value(1);
        let tags_1_str = tags_1.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(tags_1_str.len(), 0);

        let scores = batch
            .column(2)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        assert!(!scores.is_null(0));
        assert!(scores.is_null(1));
    }

    #[test]
    fn roundtrip_mutable_block_range() {
        let schema = Schema::new(vec![
            Field::new("vid", DataType::Int64, false),
            Field::new("block_range_start", DataType::Int32, false),
            Field::new("block_range_end", DataType::Int32, true),
            Field::new("id", DataType::Utf8, false),
        ]);

        let rows = vec![
            make_row(vec![
                OidValue::Int8(1),
                OidValue::Int(100),
                OidValue::Int(200),
                OidValue::String("a".into()),
            ]),
            make_row(vec![
                OidValue::Int8(2),
                OidValue::Int(150),
                OidValue::Null, // unbounded (current)
                OidValue::String("b".into()),
            ]),
        ];

        let batches = write_and_read("mutable", &schema, &rows);
        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 2);

        let start = batch
            .column(1)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert_eq!(start.value(0), 100);
        assert_eq!(start.value(1), 150);

        let end = batch
            .column(2)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert_eq!(end.value(0), 200);
        assert!(end.is_null(1));
    }

    #[test]
    fn nonexistent_file_returns_error() {
        let result = read_batches(Path::new("/tmp/nonexistent_graph_node_test.parquet"));
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("failed to open parquet file"));
    }
}
```
