Skip to content

Commit dd71c58

Browse files
authored
feat: abstract FormatFileReader and introduce ORC & avro readers (#225)
1 parent cc13352 commit dd71c58

15 files changed

Lines changed: 2749 additions & 935 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@ rust-version = "1.86.0"
3030
[workspace.dependencies]
3131
arrow = "57.0"
3232
arrow-array = { version = "57.0", features = ["ffi"] }
33+
arrow-buffer = "57.0"
3334
arrow-schema = "57.0"
3435
arrow-cast = "57.0"
3536
arrow-ord = "57.0"

crates/integration_tests/tests/read_tables.rs

Lines changed: 289 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2292,3 +2292,292 @@ async fn test_read_data_evolution_table_only_row_id_with_row_ranges() {
22922292
"Row range filtered count ({total_rows}) should be <= full count ({full_count})"
22932293
);
22942294
}
2295+
2296+
// ---------------------------------------------------------------------------
2297+
// Full types integration tests (parquet + orc + avro)
2298+
// ---------------------------------------------------------------------------
2299+
2300+
// Integration test: reads `full_types_table` through `scan_and_read_with_fs_catalog`,
// checks that exactly 3 rows come back, flattens every row of every batch into one
// tuple per row, sorts the tuples by `id`, and then asserts every column value of the
// three rows (labelled parquet, orc and avro in the comments further down).
// Any missing column or unexpected Arrow array type panics through `unwrap()`,
// which fails the test.
#[tokio::test]
2301+
async fn test_read_full_types_table() {
2302+
// Array types needed only by this test; `Int32Array` and `StringArray` are used
// below as well and are expected to be in scope from outside this function.
use arrow_array::{
2303+
BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array,
2304+
Int16Array, Int64Array, Int8Array, ListArray, MapArray, StructArray,
2305+
TimestampMicrosecondArray,
2306+
};
2307+
2308+
// The first element of the returned pair is not needed here; only the batches are.
let (_, batches) = scan_and_read_with_fs_catalog("full_types_table", None).await;
2309+
2310+
// Row count is summed over all batches, so the check does not depend on how the
// rows are split between batches.
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
2311+
assert_eq!(total_rows, 3, "full_types_table should have 3 rows");
2312+
2313+
// Collect all rows sorted by id.
2314+
// We verify primitive types via a tuple, then complex/extra decimal types separately.
2315+
// Tuple index -> column, in the order the values are pushed below:
//   0 id            1 col_boolean     2 col_tinyint    3 col_smallint
//   4 col_int       5 col_bigint      6 col_float      7 col_double
//   8 col_decimal   9 col_decimal5   10 col_decimal38 11 col_string
//  12 col_binary   13 col_date       14 col_timestamp 15 col_timestamp_ltz
//  16 col_array    17 col_map (entries sorted by key) 18 col_struct (name, value)
#[allow(clippy::type_complexity)]
2316+
let mut rows: Vec<(
2317+
i32,
2318+
bool,
2319+
i8,
2320+
i16,
2321+
i32,
2322+
i64,
2323+
f32,
2324+
f64,
2325+
i128,
2326+
i128,
2327+
i128,
2328+
String,
2329+
Vec<u8>,
2330+
i32,
2331+
i64,
2332+
i64,
2333+
Vec<i32>,
2334+
Vec<(String, i32)>,
2335+
(String, i32),
2336+
)> = Vec::new();
2337+
// For each batch: look every column up by name and downcast it to the concrete
// Arrow array type the test expects. Both steps `unwrap()`, so a missing column
// or a type mismatch aborts the test immediately.
for batch in &batches {
2338+
let id = batch
2339+
.column_by_name("id")
2340+
.unwrap()
2341+
.as_any()
2342+
.downcast_ref::<Int32Array>()
2343+
.unwrap();
2344+
let col_boolean = batch
2345+
.column_by_name("col_boolean")
2346+
.unwrap()
2347+
.as_any()
2348+
.downcast_ref::<BooleanArray>()
2349+
.unwrap();
2350+
let col_tinyint = batch
2351+
.column_by_name("col_tinyint")
2352+
.unwrap()
2353+
.as_any()
2354+
.downcast_ref::<Int8Array>()
2355+
.unwrap();
2356+
let col_smallint = batch
2357+
.column_by_name("col_smallint")
2358+
.unwrap()
2359+
.as_any()
2360+
.downcast_ref::<Int16Array>()
2361+
.unwrap();
2362+
let col_int = batch
2363+
.column_by_name("col_int")
2364+
.unwrap()
2365+
.as_any()
2366+
.downcast_ref::<Int32Array>()
2367+
.unwrap();
2368+
let col_bigint = batch
2369+
.column_by_name("col_bigint")
2370+
.unwrap()
2371+
.as_any()
2372+
.downcast_ref::<Int64Array>()
2373+
.unwrap();
2374+
let col_float = batch
2375+
.column_by_name("col_float")
2376+
.unwrap()
2377+
.as_any()
2378+
.downcast_ref::<Float32Array>()
2379+
.unwrap();
2380+
let col_double = batch
2381+
.column_by_name("col_double")
2382+
.unwrap()
2383+
.as_any()
2384+
.downcast_ref::<Float64Array>()
2385+
.unwrap();
2386+
// All three decimal columns are read as `Decimal128Array`; their values are
// compared further down against scaled integer literals.
let col_decimal = batch
2387+
.column_by_name("col_decimal")
2388+
.unwrap()
2389+
.as_any()
2390+
.downcast_ref::<Decimal128Array>()
2391+
.unwrap();
2392+
let col_decimal5 = batch
2393+
.column_by_name("col_decimal5")
2394+
.unwrap()
2395+
.as_any()
2396+
.downcast_ref::<Decimal128Array>()
2397+
.unwrap();
2398+
let col_decimal38 = batch
2399+
.column_by_name("col_decimal38")
2400+
.unwrap()
2401+
.as_any()
2402+
.downcast_ref::<Decimal128Array>()
2403+
.unwrap();
2404+
let col_string = batch
2405+
.column_by_name("col_string")
2406+
.unwrap()
2407+
.as_any()
2408+
.downcast_ref::<StringArray>()
2409+
.unwrap();
2410+
let col_binary = batch
2411+
.column_by_name("col_binary")
2412+
.unwrap()
2413+
.as_any()
2414+
.downcast_ref::<BinaryArray>()
2415+
.unwrap();
2416+
let col_date = batch
2417+
.column_by_name("col_date")
2418+
.unwrap()
2419+
.as_any()
2420+
.downcast_ref::<Date32Array>()
2421+
.unwrap();
2422+
// Both timestamp columns are expected as microsecond-precision arrays.
let col_ts = batch
2423+
.column_by_name("col_timestamp")
2424+
.unwrap()
2425+
.as_any()
2426+
.downcast_ref::<TimestampMicrosecondArray>()
2427+
.unwrap();
2428+
let col_ts_ltz = batch
2429+
.column_by_name("col_timestamp_ltz")
2430+
.unwrap()
2431+
.as_any()
2432+
.downcast_ref::<TimestampMicrosecondArray>()
2433+
.unwrap();
2434+
let col_array = batch
2435+
.column_by_name("col_array")
2436+
.unwrap()
2437+
.as_any()
2438+
.downcast_ref::<ListArray>()
2439+
.unwrap();
2440+
let col_map = batch
2441+
.column_by_name("col_map")
2442+
.unwrap()
2443+
.as_any()
2444+
.downcast_ref::<MapArray>()
2445+
.unwrap();
2446+
let col_struct = batch
2447+
.column_by_name("col_struct")
2448+
.unwrap()
2449+
.as_any()
2450+
.downcast_ref::<StructArray>()
2451+
.unwrap();
2452+
2453+
// Convert row `i` of this batch into one owned tuple.
for i in 0..batch.num_rows() {
2454+
// Extract ARRAY<INT>
2455+
let list_values = col_array.value(i);
2456+
let int_arr = list_values.as_any().downcast_ref::<Int32Array>().unwrap();
2457+
let arr_vals: Vec<i32> = (0..int_arr.len()).map(|j| int_arr.value(j)).collect();
2458+
2459+
// Extract MAP<STRING, INT>
2460+
// The map value for row `i` is downcast to a struct array whose column 0
// is read as the keys and column 1 as the values.
let map_val = col_map.value(i);
2461+
let map_struct = map_val.as_any().downcast_ref::<StructArray>().unwrap();
2462+
let keys = map_struct
2463+
.column(0)
2464+
.as_any()
2465+
.downcast_ref::<StringArray>()
2466+
.unwrap();
2467+
let values = map_struct
2468+
.column(1)
2469+
.as_any()
2470+
.downcast_ref::<Int32Array>()
2471+
.unwrap();
2472+
let mut map_entries: Vec<(String, i32)> = (0..keys.len())
2473+
.map(|j| (keys.value(j).to_string(), values.value(j)))
2474+
.collect();
2475+
// Sort by key so the equality asserts below do not depend on the order in
// which the map entries were returned.
map_entries.sort_by(|a, b| a.0.cmp(&b.0));
2476+
2477+
// Extract STRUCT<name: STRING, value: INT>
2478+
// The struct's child arrays are fetched by field name and indexed with the
// same row index `i` when the tuple is built.
let struct_name = col_struct
2479+
.column_by_name("name")
2480+
.unwrap()
2481+
.as_any()
2482+
.downcast_ref::<StringArray>()
2483+
.unwrap();
2484+
let struct_value = col_struct
2485+
.column_by_name("value")
2486+
.unwrap()
2487+
.as_any()
2488+
.downcast_ref::<Int32Array>()
2489+
.unwrap();
2490+
2491+
// Push order defines the tuple indices used by the `r.N` asserts below.
rows.push((
2492+
id.value(i),
2493+
col_boolean.value(i),
2494+
col_tinyint.value(i),
2495+
col_smallint.value(i),
2496+
col_int.value(i),
2497+
col_bigint.value(i),
2498+
col_float.value(i),
2499+
col_double.value(i),
2500+
col_decimal.value(i),
2501+
col_decimal5.value(i),
2502+
col_decimal38.value(i),
2503+
col_string.value(i).to_string(),
2504+
col_binary.value(i).to_vec(),
2505+
col_date.value(i),
2506+
col_ts.value(i),
2507+
col_ts_ltz.value(i),
2508+
arr_vals,
2509+
map_entries,
2510+
(struct_name.value(i).to_string(), struct_value.value(i)),
2511+
));
2512+
}
2513+
}
2514+
// Order by id (tuple field 0) so rows[0], rows[1], rows[2] correspond to ids 1, 2, 3
// no matter which batch each row arrived in.
rows.sort_by_key(|r| r.0);
2515+
2516+
assert_eq!(rows.len(), 3);
2517+
2518+
// id=1 (parquet)
2519+
let r = &rows[0];
2520+
assert_eq!(r.0, 1);
2521+
assert!(r.1); // boolean = true
2522+
assert_eq!(r.2, 1i8); // tinyint
2523+
assert_eq!(r.3, 100i16); // smallint
2524+
assert_eq!(r.4, 1000); // int
2525+
assert_eq!(r.5, 100000i64); // bigint
2526+
// Floating-point columns are checked with an EPSILON tolerance instead of `==`.
assert!((r.6 - 1.5f32).abs() < f32::EPSILON); // float
2527+
assert!((r.7 - 2.5f64).abs() < f64::EPSILON); // double
2528+
assert_eq!(r.8, 12345); // decimal(10,2) = 123.45 * 100
2529+
assert_eq!(r.9, 12345); // decimal(5,0) = 12345
2530+
assert_eq!(r.10, 12_345_678_901_234_567_890_000i128); // decimal(38,18) 12345.678901234567890 * 10^18
2531+
assert_eq!(r.11, "parquet-hello"); // string
2532+
assert_eq!(r.12, vec![0xDE, 0xAD, 0xBE, 0xEF]); // binary
2533+
assert_eq!(r.13, 19723); // date: 2024-01-01 days since epoch
2534+
assert_eq!(r.14, 1_704_103_200_123_456); // ts micros
2535+
assert_eq!(r.15, 1_704_103_200_123_456); // ts_ltz micros
2536+
assert_eq!(r.16, vec![1, 2, 3]); // array
2537+
assert_eq!(r.17, vec![("a".into(), 10), ("b".into(), 20)]); // map
2538+
assert_eq!(r.18, ("alice".into(), 100)); // struct
2539+
2540+
// id=2 (orc)
2541+
let r = &rows[1];
2542+
assert_eq!(r.0, 2);
2543+
assert!(!r.1); // boolean = false
2544+
assert_eq!(r.2, 2i8);
2545+
assert_eq!(r.3, 200i16);
2546+
assert_eq!(r.4, 2000);
2547+
assert_eq!(r.5, 200000i64);
2548+
assert!((r.6 - 3.5f32).abs() < f32::EPSILON);
2549+
assert!((r.7 - 4.5f64).abs() < f64::EPSILON);
2550+
assert_eq!(r.8, 67890); // 678.90 * 100
2551+
assert_eq!(r.9, 99999); // decimal(5,0)
2552+
assert_eq!(r.10, 99_999_999_999_999_999_999_000i128); // decimal(38,18) 99999.999999999999999 * 10^18
2553+
assert_eq!(r.11, "orc-world");
2554+
assert_eq!(r.12, vec![0xCA, 0xFE, 0xBA, 0xBE]);
2555+
assert_eq!(r.13, 19889); // date: 2024-06-15 days since epoch
2556+
assert_eq!(r.14, 1_718_454_600_456_789);
2557+
assert_eq!(r.15, 1_718_454_600_456_789);
2558+
assert_eq!(r.16, vec![4, 5]); // array
2559+
assert_eq!(r.17, vec![("c".into(), 30)]); // map
2560+
assert_eq!(r.18, ("bob".into(), 200)); // struct
2561+
2562+
// id=3 (avro)
2563+
let r = &rows[2];
2564+
assert_eq!(r.0, 3);
2565+
assert!(r.1); // boolean = true
2566+
assert_eq!(r.2, 3i8);
2567+
assert_eq!(r.3, 300i16);
2568+
assert_eq!(r.4, 3000);
2569+
assert_eq!(r.5, 300000i64);
2570+
assert!((r.6 - 5.5f32).abs() < f32::EPSILON);
2571+
assert!((r.7 - 6.5f64).abs() < f64::EPSILON);
2572+
assert_eq!(r.8, 99999); // 999.99 * 100
2573+
assert_eq!(r.9, 0); // decimal(5,0)
2574+
assert_eq!(r.10, 1); // decimal(38,18) = 0.000000000000000001 * 10^18
2575+
assert_eq!(r.11, "avro-test");
2576+
assert_eq!(r.12, vec![0x01, 0x02, 0x03, 0x04]);
2577+
assert_eq!(r.13, 20453); // date: 2025-12-31 days since epoch
2578+
assert_eq!(r.14, 1_767_225_599_999_999);
2579+
assert_eq!(r.15, 1_767_225_599_999_999);
2580+
assert_eq!(r.16, vec![6]); // array
2581+
assert_eq!(r.17, vec![("d".into(), 40), ("e".into(), 50)]); // map
2582+
assert_eq!(r.18, ("carol".into(), 300)); // struct
2583+
}

crates/integrations/datafusion/src/physical_plan/scan.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -313,6 +313,7 @@ mod tests {
313313
vec![("id", vec![1, 2, 3, 4]), ("value", vec![5, 20, 30, 40])],
314314
Some(2),
315315
);
316+
let file_size = fs::metadata(bucket_dir.join("data.parquet")).unwrap().len() as i64;
316317

317318
let file_io = FileIOBuilder::new("file").build().unwrap();
318319
let table_schema = TableSchema::new(
@@ -336,7 +337,7 @@ mod tests {
336337
.with_bucket(0)
337338
.with_bucket_path(local_file_path(&bucket_dir))
338339
.with_total_buckets(1)
339-
.with_data_files(vec![test_data_file("data.parquet", 4)])
340+
.with_data_files(vec![test_data_file("data.parquet", 4, file_size)])
340341
.with_raw_convertible(true)
341342
.build()
342343
.unwrap();

crates/paimon/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -56,11 +56,13 @@ serde_avro_fast = { version = "2.0.2", features = ["snappy", "zstandard"] }
5656
indexmap = "2.5.0"
5757
roaring = "0.11"
5858
arrow-array = { workspace = true }
59+
arrow-buffer = { workspace = true }
5960
arrow-cast = { workspace = true }
6061
arrow-ord = { workspace = true }
6162
arrow-schema = { workspace = true }
6263
futures = "0.3"
6364
parquet = { workspace = true, features = ["async", "zstd"] }
65+
orc-rust = "0.7.0"
6466
async-stream = "0.3.6"
6567
reqwest = { version = "0.12", features = ["json"] }
6668
# DLF authentication dependencies

0 commit comments

Comments
 (0)