Commit ff71687

store: Add Parquet chunk file writer
Add ParquetChunkWriter that wraps Arrow's ArrowWriter to stream RecordBatches into a ZSTD-compressed Parquet file while tracking row count and vid range. Returns ChunkInfo metadata on finish for inclusion in metadata.json.
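
In rough outline, the intended call sequence looks like the sketch below; `dir`, `schema`, `batches`, and the surrounding loop are hypothetical stand-ins for the caller's dump logic, not part of this commit:

    let mut writer = ParquetChunkWriter::new(
        dir.join("Token/chunk_000000.parquet"),   // absolute path on disk
        "Token/chunk_000000.parquet".to_string(), // relative path recorded in ChunkInfo
        &schema,
    )?;
    for (batch, min_vid, max_vid) in batches {
        // The caller supplies the vid bounds of each batch; empty batches are skipped.
        writer.write_batch(&batch, min_vid, max_vid)?;
    }
    let info = writer.finish()?; // ChunkInfo destined for metadata.json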
1 parent 070c981 commit ff71687

1 file changed

Lines changed: 354 additions & 0 deletions

@@ -0,0 +1,354 @@
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;

use arrow::array::RecordBatch;
use arrow::datatypes::Schema;
use graph::components::store::StoreError;
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties;

use serde::{Deserialize, Serialize};

/// Per-chunk metadata recorded in `metadata.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkInfo {
    pub file: String,
    pub min_vid: i64,
    pub max_vid: i64,
    pub row_count: usize,
}

/// Writes `RecordBatch`es into a single Parquet file using ZSTD compression.
///
/// Tracks row count and vid range across all written batches. Call
/// `finish()` to flush and close the file, returning a `ChunkInfo`
/// summary.
pub struct ParquetChunkWriter {
    writer: ArrowWriter<fs::File>,
    /// Relative path from the dump directory (e.g. `"Token/chunk_000000.parquet"`).
    relative_path: String,
    row_count: usize,
    min_vid: i64,
    max_vid: i64,
}

impl ParquetChunkWriter {
    /// Create a new writer for a Parquet chunk file.
    ///
    /// `path` is the absolute file path. `relative_path` is stored in the
    /// resulting `ChunkInfo` (e.g. `"Token/chunk_000000.parquet"`).
    pub fn new(path: PathBuf, relative_path: String, schema: &Schema) -> Result<Self, StoreError> {
        let props = WriterProperties::builder()
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .build();

        let file = fs::File::create(&path).map_err(|e| {
            StoreError::InternalError(format!(
                "failed to create parquet file {}: {e}",
                path.display()
            ))
        })?;

        let writer =
            ArrowWriter::try_new(file, Arc::new(schema.clone()), Some(props)).map_err(|e| {
                StoreError::InternalError(format!(
                    "failed to create ArrowWriter for {}: {e}",
                    path.display()
                ))
            })?;

        Ok(Self {
            writer,
            relative_path,
            row_count: 0,
            min_vid: i64::MAX,
            max_vid: i64::MIN,
        })
    }

    /// Write a `RecordBatch` and update tracking stats.
    ///
    /// `batch_min_vid` and `batch_max_vid` are the vid bounds of this
    /// batch (typically the first and last vid values).
    pub fn write_batch(
        &mut self,
        batch: &RecordBatch,
        batch_min_vid: i64,
        batch_max_vid: i64,
    ) -> Result<(), StoreError> {
        if batch.num_rows() == 0 {
            return Ok(());
        }
        self.writer
            .write(batch)
            .map_err(|e| StoreError::InternalError(format!("failed to write RecordBatch: {e}")))?;
        self.row_count += batch.num_rows();
        self.min_vid = self.min_vid.min(batch_min_vid);
        self.max_vid = self.max_vid.max(batch_max_vid);
        Ok(())
    }

    /// Flush and close the Parquet file, returning chunk metadata.
    pub fn finish(self) -> Result<ChunkInfo, StoreError> {
        self.writer.close().map_err(|e| {
            StoreError::InternalError(format!("failed to close parquet writer: {e}"))
        })?;
        Ok(ChunkInfo {
            file: self.relative_path,
            min_vid: self.min_vid,
            max_vid: self.max_vid,
            row_count: self.row_count,
        })
    }
}

#[cfg(test)]
mod tests {
    use arrow::array::{
        Array, BinaryArray, BooleanArray, Int32Array, Int64Array, StringArray,
        TimestampMicrosecondArray,
    };
    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use std::sync::Arc;

    use super::*;

    /// Create a unique temp file path for a test. The caller is responsible
    /// for cleanup via the returned `TempFile` guard.
    struct TempFile(PathBuf);

    impl TempFile {
        fn new(name: &str) -> Self {
            let path = std::env::temp_dir().join(format!(
                "graph_node_test_{}_{name}.parquet",
                std::process::id()
            ));
            Self(path)
        }

        fn path(&self) -> &std::path::Path {
            &self.0
        }
    }

    impl Drop for TempFile {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.0);
        }
    }

    fn test_schema() -> Schema {
        Schema::new(vec![
            Field::new("vid", DataType::Int64, false),
            Field::new("block$", DataType::Int32, false),
            Field::new("id", DataType::Utf8, false),
            Field::new("flag", DataType::Boolean, true),
            Field::new("data", DataType::Binary, true),
            Field::new("ts", DataType::Timestamp(TimeUnit::Microsecond, None), true),
        ])
    }

    fn test_batch(schema: &Schema) -> RecordBatch {
        RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(Int32Array::from(vec![100, 101, 102])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
                Arc::new(BooleanArray::from(vec![Some(true), None, Some(false)])),
                Arc::new(BinaryArray::from_opt_vec(vec![
                    Some(b"\xab\xcd"),
                    None,
                    Some(b"\xff"),
                ])),
                Arc::new(TimestampMicrosecondArray::from(vec![
                    Some(1_000_000),
                    Some(2_000_000),
                    None,
                ])),
            ],
        )
        .unwrap()
    }

    /// Read all record batches from a parquet file.
    fn read_parquet(path: &std::path::Path) -> Vec<RecordBatch> {
        let file = std::fs::File::open(path).unwrap();
        let reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();
        reader.map(|r| r.unwrap()).collect()
    }

    #[test]
    fn write_and_read_back() {
        let schema = test_schema();
        let batch = test_batch(&schema);
        let tmp = TempFile::new("write_read");

        let mut writer = ParquetChunkWriter::new(
            tmp.path().to_path_buf(),
            "Test/chunk_000000.parquet".into(),
            &schema,
        )
        .unwrap();
        writer.write_batch(&batch, 1, 3).unwrap();
        let chunk_info = writer.finish().unwrap();

        assert_eq!(chunk_info.file, "Test/chunk_000000.parquet");
        assert_eq!(chunk_info.min_vid, 1);
        assert_eq!(chunk_info.max_vid, 3);
        assert_eq!(chunk_info.row_count, 3);

        // Read back and verify
        let batches = read_parquet(tmp.path());
        assert_eq!(batches.len(), 1);
        let read_batch = &batches[0];

        assert_eq!(read_batch.num_rows(), 3);
        assert_eq!(read_batch.num_columns(), 6);

        let vid = read_batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(vid.values(), &[1, 2, 3]);

        let block = read_batch
            .column(1)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert_eq!(block.values(), &[100, 101, 102]);

        let id = read_batch
            .column(2)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(id.value(0), "a");
        assert_eq!(id.value(2), "c");

        let flag = read_batch
            .column(3)
            .as_any()
            .downcast_ref::<BooleanArray>()
            .unwrap();
        assert!(flag.value(0));
        assert!(flag.is_null(1));
        assert!(!flag.value(2));

        let data = read_batch
            .column(4)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(data.value(0), b"\xab\xcd");
        assert!(data.is_null(1));

        let ts = read_batch
            .column(5)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert_eq!(ts.value(0), 1_000_000);
        assert!(ts.is_null(2));
    }

    #[test]
    fn multiple_batches_accumulate_stats() {
        let schema = test_schema();

        let batch1 = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(Int64Array::from(vec![10, 20])),
                Arc::new(Int32Array::from(vec![1, 1])),
                Arc::new(StringArray::from(vec!["x", "y"])),
                Arc::new(BooleanArray::from(vec![true, false])),
                Arc::new(BinaryArray::from_vec(vec![b"a", b"b"])),
                Arc::new(TimestampMicrosecondArray::from(vec![100, 200])),
            ],
        )
        .unwrap();

        let batch2 = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(Int64Array::from(vec![30])),
                Arc::new(Int32Array::from(vec![2])),
                Arc::new(StringArray::from(vec!["z"])),
                Arc::new(BooleanArray::from(vec![true])),
                Arc::new(BinaryArray::from_vec(vec![b"c"])),
                Arc::new(TimestampMicrosecondArray::from(vec![300])),
            ],
        )
        .unwrap();

        let tmp = TempFile::new("multi_batch");

        let mut writer = ParquetChunkWriter::new(
            tmp.path().to_path_buf(),
            "Foo/chunk_000000.parquet".into(),
            &schema,
        )
        .unwrap();
        writer.write_batch(&batch1, 10, 20).unwrap();
        writer.write_batch(&batch2, 30, 30).unwrap();
        let chunk_info = writer.finish().unwrap();

        assert_eq!(chunk_info.min_vid, 10);
        assert_eq!(chunk_info.max_vid, 30);
        assert_eq!(chunk_info.row_count, 3);

        // Verify all 3 rows readable
        let batches = read_parquet(tmp.path());
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 3);
    }

    #[test]
    fn empty_batch_is_noop() {
        let schema = test_schema();
        let empty = RecordBatch::new_empty(Arc::new(schema.clone()));
        let tmp = TempFile::new("empty_batch");

        let mut writer = ParquetChunkWriter::new(
            tmp.path().to_path_buf(),
            "X/chunk_000000.parquet".into(),
            &schema,
        )
        .unwrap();
        writer.write_batch(&empty, 0, 0).unwrap();
        let chunk_info = writer.finish().unwrap();

        assert_eq!(chunk_info.row_count, 0);
        // min_vid/max_vid stay at initial sentinel values when nothing was written
        assert_eq!(chunk_info.min_vid, i64::MAX);
        assert_eq!(chunk_info.max_vid, i64::MIN);
    }

    #[test]
    fn chunk_info_serialization() {
        let info = ChunkInfo {
            file: "Token/chunk_000000.parquet".into(),
            min_vid: 0,
            max_vid: 50000,
            row_count: 50000,
        };
        let json = serde_json::to_string_pretty(&info).unwrap();
        assert!(json.contains("Token/chunk_000000.parquet"));
        assert!(json.contains("50000"));

        let deserialized: ChunkInfo = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.file, info.file);
        assert_eq!(deserialized.min_vid, info.min_vid);
        assert_eq!(deserialized.max_vid, info.max_vid);
        assert_eq!(deserialized.row_count, info.row_count);
    }
}
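
The vid range recorded in each `ChunkInfo` makes range-based chunk pruning possible on the read side. A minimal sketch, assuming `metadata.json` holds a list of these entries; the `chunks_for_range` helper is hypothetical and not part of this commit:

    /// Select the chunks whose vid range overlaps the closed window [from, to].
    fn chunks_for_range(chunks: &[ChunkInfo], from: i64, to: i64) -> Vec<&ChunkInfo> {
        chunks
            .iter()
            // Two closed intervals overlap iff each starts no later than the
            // other ends.
            .filter(|c| c.min_vid <= to && c.max_vid >= from)
            .collect()
    }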
