Skip to content

Commit 35b6386

Browse files
authored
feat: support complex types (#202)
1 parent 1f4aac6 commit 35b6386

4 files changed

Lines changed: 208 additions & 7 deletions

File tree

crates/integration_tests/tests/read_tables.rs

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717

1818
//! Integration tests for reading Paimon tables provisioned by Spark.
1919
20-
use arrow_array::{Array, ArrowPrimitiveType, Int32Array, Int64Array, RecordBatch, StringArray};
20+
use arrow_array::{
21+
Array, ArrowPrimitiveType, Int32Array, Int64Array, ListArray, MapArray, RecordBatch,
22+
StringArray, StructArray,
23+
};
2124
use futures::TryStreamExt;
2225
use paimon::api::ConfigResponse;
2326
use paimon::catalog::{Identifier, RESTCatalog};
@@ -1369,3 +1372,98 @@ async fn test_read_schema_evolution_drop_column() {
13691372
"Old rows should be readable after DROP COLUMN, with only remaining columns"
13701373
);
13711374
}
1375+
1376+
// ---------------------------------------------------------------------------
1377+
// Complex type integration tests
1378+
// ---------------------------------------------------------------------------
1379+
1380+
/// Test reading a table with complex types: ARRAY<INT>, MAP<STRING, INT>, STRUCT<name: STRING, value: INT>.
1381+
#[tokio::test]
1382+
async fn test_read_complex_type_table() {
1383+
let (_, batches) = scan_and_read_with_fs_catalog("complex_type_table", None).await;
1384+
1385+
#[allow(clippy::type_complexity)]
1386+
let mut rows: Vec<(i32, Vec<i32>, Vec<(String, i32)>, (String, i32))> = Vec::new();
1387+
for batch in &batches {
1388+
let id = batch
1389+
.column_by_name("id")
1390+
.and_then(|c| c.as_any().downcast_ref::<Int32Array>())
1391+
.expect("id");
1392+
let int_array = batch
1393+
.column_by_name("int_array")
1394+
.and_then(|c| c.as_any().downcast_ref::<ListArray>())
1395+
.expect("int_array as ListArray");
1396+
let string_map = batch
1397+
.column_by_name("string_map")
1398+
.and_then(|c| c.as_any().downcast_ref::<MapArray>())
1399+
.expect("string_map as MapArray");
1400+
let row_field = batch
1401+
.column_by_name("row_field")
1402+
.and_then(|c| c.as_any().downcast_ref::<StructArray>())
1403+
.expect("row_field as StructArray");
1404+
1405+
for i in 0..batch.num_rows() {
1406+
// Extract ARRAY<INT>
1407+
let list_values = int_array.value(i);
1408+
let int_arr = list_values
1409+
.as_any()
1410+
.downcast_ref::<Int32Array>()
1411+
.expect("list element as Int32Array");
1412+
let arr_vals: Vec<i32> = (0..int_arr.len()).map(|j| int_arr.value(j)).collect();
1413+
1414+
// Extract MAP<STRING, INT>
1415+
let map_val = string_map.value(i);
1416+
let map_struct = map_val
1417+
.as_any()
1418+
.downcast_ref::<StructArray>()
1419+
.expect("map entries as StructArray");
1420+
let keys = map_struct
1421+
.column(0)
1422+
.as_any()
1423+
.downcast_ref::<StringArray>()
1424+
.expect("map keys");
1425+
let values = map_struct
1426+
.column(1)
1427+
.as_any()
1428+
.downcast_ref::<Int32Array>()
1429+
.expect("map values");
1430+
let mut map_entries: Vec<(String, i32)> = (0..keys.len())
1431+
.map(|j| (keys.value(j).to_string(), values.value(j)))
1432+
.collect();
1433+
map_entries.sort_by(|a, b| a.0.cmp(&b.0));
1434+
1435+
// Extract STRUCT<name: STRING, value: INT>
1436+
let struct_name = row_field
1437+
.column_by_name("name")
1438+
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
1439+
.expect("struct name");
1440+
let struct_value = row_field
1441+
.column_by_name("value")
1442+
.and_then(|c| c.as_any().downcast_ref::<Int32Array>())
1443+
.expect("struct value");
1444+
1445+
rows.push((
1446+
id.value(i),
1447+
arr_vals,
1448+
map_entries,
1449+
(struct_name.value(i).to_string(), struct_value.value(i)),
1450+
));
1451+
}
1452+
}
1453+
rows.sort_by_key(|(id, _, _, _)| *id);
1454+
1455+
assert_eq!(
1456+
rows,
1457+
vec![
1458+
(
1459+
1,
1460+
vec![1, 2, 3],
1461+
vec![("a".into(), 10), ("b".into(), 20)],
1462+
("alice".into(), 100),
1463+
),
1464+
(2, vec![4, 5], vec![("c".into(), 30)], ("bob".into(), 200),),
1465+
(3, vec![], vec![], ("carol".into(), 300),),
1466+
],
1467+
"Complex type table should return correct ARRAY, MAP, and STRUCT values"
1468+
);
1469+
}

crates/integrations/datafusion/tests/read_tables.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,3 +523,79 @@ async fn test_data_evolution_drop_column_null_fill() {
523523
"Old rows should have extra=NULL, new row should have extra='new'"
524524
);
525525
}
526+
527+
// ======================= Complex Type Tests =======================
528+
529+
#[tokio::test]
530+
async fn test_read_complex_type_table_via_datafusion() {
531+
let batches = collect_query(
532+
"complex_type_table",
533+
"SELECT id, int_array, string_map, row_field FROM complex_type_table ORDER BY id",
534+
)
535+
.await
536+
.expect("Complex type query should succeed");
537+
538+
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
539+
assert_eq!(total_rows, 3, "Expected 3 rows from complex_type_table");
540+
541+
// Verify column types exist and are correct
542+
for batch in &batches {
543+
let schema = batch.schema();
544+
assert!(
545+
schema.field_with_name("int_array").is_ok(),
546+
"int_array column should exist"
547+
);
548+
assert!(
549+
schema.field_with_name("string_map").is_ok(),
550+
"string_map column should exist"
551+
);
552+
assert!(
553+
schema.field_with_name("row_field").is_ok(),
554+
"row_field column should exist"
555+
);
556+
}
557+
558+
// Extract and verify data using Arrow arrays
559+
let mut rows: Vec<(i32, String, String, String)> = Vec::new();
560+
for batch in &batches {
561+
let id_array = batch
562+
.column_by_name("id")
563+
.and_then(|c| c.as_any().downcast_ref::<Int32Array>())
564+
.expect("Expected Int32Array for id");
565+
let int_array_col = batch.column_by_name("int_array").expect("int_array");
566+
let string_map_col = batch.column_by_name("string_map").expect("string_map");
567+
let row_field_col = batch.column_by_name("row_field").expect("row_field");
568+
569+
for i in 0..batch.num_rows() {
570+
use datafusion::arrow::util::display::ArrayFormatter;
571+
let fmt_opts = datafusion::arrow::util::display::FormatOptions::default();
572+
573+
let arr_fmt = ArrayFormatter::try_new(int_array_col.as_ref(), &fmt_opts).unwrap();
574+
let map_fmt = ArrayFormatter::try_new(string_map_col.as_ref(), &fmt_opts).unwrap();
575+
let row_fmt = ArrayFormatter::try_new(row_field_col.as_ref(), &fmt_opts).unwrap();
576+
577+
rows.push((
578+
id_array.value(i),
579+
arr_fmt.value(i).to_string(),
580+
map_fmt.value(i).to_string(),
581+
row_fmt.value(i).to_string(),
582+
));
583+
}
584+
}
585+
rows.sort_by_key(|(id, _, _, _)| *id);
586+
587+
assert_eq!(rows[0].0, 1);
588+
assert_eq!(rows[0].1, "[1, 2, 3]");
589+
assert_eq!(rows[0].2, "{a: 10, b: 20}");
590+
assert_eq!(rows[0].3, "{name: alice, value: 100}");
591+
592+
assert_eq!(rows[1].0, 2);
593+
assert_eq!(rows[1].1, "[4, 5]");
594+
assert_eq!(rows[1].2, "{c: 30}");
595+
assert_eq!(rows[1].3, "{name: bob, value: 200}");
596+
597+
assert_eq!(rows[2].0, 3);
598+
assert_eq!(rows[2].1, "[]");
599+
assert_eq!(rows[2].2, "{}");
600+
assert_eq!(rows[2].3, "{name: carol, value: 300}");
601+
}

crates/paimon/src/arrow/reader.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -283,16 +283,23 @@ fn read_single_file_stream(
283283

284284
let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
285285

286-
// Only project columns that exist in this file.
286+
// Project columns by root-level index to correctly handle complex types
287+
// (ARRAY, MAP, STRUCT). `ProjectionMask::columns` matches leaf column names
288+
// which doesn't work for nested types; `ProjectionMask::roots` uses top-level
289+
// field indices instead.
287290
let parquet_schema = batch_stream_builder.parquet_schema().clone();
288-
let file_column_names: Vec<&str> = parquet_schema.columns().iter().map(|c| c.name()).collect();
289-
let available_columns: Vec<&str> = parquet_column_names
291+
let root_schema = parquet_schema.root_schema();
292+
let root_indices: Vec<usize> = parquet_column_names
290293
.iter()
291-
.filter(|name| file_column_names.contains(&name.as_str()))
292-
.map(String::as_str)
294+
.filter_map(|name| {
295+
root_schema
296+
.get_fields()
297+
.iter()
298+
.position(|f| f.name() == name)
299+
})
293300
.collect();
294301

295-
let mask = ProjectionMask::columns(&parquet_schema, available_columns.iter().copied());
302+
let mask = ProjectionMask::roots(&parquet_schema, root_indices);
296303
batch_stream_builder = batch_stream_builder.with_projection(mask);
297304

298305
if let Some(ref dv) = dv {

dev/spark/provision.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,26 @@ def main():
515515
"""
516516
)
517517

518+
# ===== Complex Types table: ARRAY, MAP, STRUCT =====
519+
spark.sql(
520+
"""
521+
CREATE TABLE IF NOT EXISTS complex_type_table (
522+
id INT,
523+
int_array ARRAY<INT>,
524+
string_map MAP<STRING, INT>,
525+
row_field STRUCT<name: STRING, value: INT>
526+
) USING paimon
527+
"""
528+
)
529+
spark.sql(
530+
"""
531+
INSERT INTO complex_type_table VALUES
532+
(1, array(1, 2, 3), map('a', 10, 'b', 20), named_struct('name', 'alice', 'value', 100)),
533+
(2, array(4, 5), map('c', 30), named_struct('name', 'bob', 'value', 200)),
534+
(3, array(), map(), named_struct('name', 'carol', 'value', 300))
535+
"""
536+
)
537+
518538

519539
if __name__ == "__main__":
520540
main()

0 commit comments

Comments
 (0)