Commit 4ff1c11

feat: support schema evolution read with SchemaManager (#197)
* feat(arrow): support schema evolution read with SchemaManager and field-ID-based index mapping
1 parent: 30d76b9 · commit: 4ff1c11

16 files changed

Lines changed: 1231 additions & 235 deletions
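
The headline idea is field-ID-based index mapping: each column of the table's current schema is located in an older data file by its field ID rather than by position or name. The crate's real types don't appear in this diff, so the sketch below is only an illustration of the idea, with every name hypothetical:

    use std::collections::HashMap;

    // Illustrative only: resolve each field ID of the table's current (read)
    // schema to the column index holding that field in a given data file.
    fn index_mapping(table_field_ids: &[i32], file_field_ids: &[i32]) -> Vec<Option<usize>> {
        let by_id: HashMap<i32, usize> = file_field_ids
            .iter()
            .enumerate()
            .map(|(idx, id)| (*id, idx))
            .collect();
        table_field_ids
            .iter()
            .map(|id| by_id.get(id).copied()) // None => file predates the column
            .collect()
    }

    fn main() {
        // Table schema carries field IDs [1, 2, 3]; an old file only wrote 1 and 2,
        // so the column added later (ID 3) maps to None and is read as all-null.
        assert_eq!(
            index_mapping(&[1, 2, 3], &[1, 2]),
            vec![Some(0), Some(1), None]
        );
    }

A None entry tells the reader to synthesize an all-null column; a Some(idx) entry may still need a cast if the field's type was later promoted.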

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ rust-version = "1.86.0"
 [workspace.dependencies]
 arrow-array = { version = "57.0", features = ["ffi"] }
 arrow-schema = "57.0"
+arrow-cast = "57.0"
 arrow-select = "57.0"
 parquet = "57.0"
 tokio = "1.39.2"
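
The lone manifest change adds arrow-cast to the workspace dependencies, presumably to back the INT -> BIGINT promotion the new tests exercise. A minimal, self-contained sketch of arrow_cast::cast (not code from this commit):

    use std::sync::Arc;

    use arrow_array::{Array, ArrayRef, Int32Array, Int64Array};
    use arrow_schema::DataType;

    fn main() {
        // A column written as INT (Int32) under an old schema version...
        let old: ArrayRef = Arc::new(Int32Array::from(vec![Some(100), None, Some(200)]));
        // ...cast to the table's current BIGINT (Int64) type.
        let promoted = arrow_cast::cast(&old, &DataType::Int64)
            .expect("Int32 -> Int64 is a valid cast");
        let promoted = promoted.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(promoted.value(0), 100);
        assert!(promoted.is_null(1)); // nulls survive the cast
    }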

crates/integration_tests/tests/read_tables.rs

Lines changed: 220 additions & 1 deletion
@@ -17,7 +17,7 @@
 
 //! Integration tests for reading Paimon tables provisioned by Spark.
 
-use arrow_array::{Int32Array, RecordBatch, StringArray};
+use arrow_array::{Array, ArrowPrimitiveType, Int32Array, Int64Array, RecordBatch, StringArray};
 use futures::TryStreamExt;
 use paimon::api::ConfigResponse;
 use paimon::catalog::{Identifier, RESTCatalog};
@@ -999,3 +999,222 @@ async fn test_limit_pushdown() {
         );
     }
 }
+
+// ---------------------------------------------------------------------------
+// Schema Evolution integration tests
+// ---------------------------------------------------------------------------
+
+/// Test reading a table after ALTER TABLE ADD COLUMNS.
+/// Old data files lack the new column; reader should fill nulls.
+#[tokio::test]
+async fn test_read_schema_evolution_add_column() {
+    let (_, batches) = scan_and_read_with_fs_catalog("schema_evolution_add_column", None).await;
+
+    let mut rows: Vec<(i32, String, Option<i32>)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let name = batch
+            .column_by_name("name")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("name");
+        let age = batch
+            .column_by_name("age")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("age");
+        for i in 0..batch.num_rows() {
+            let age_val = if age.is_null(i) {
+                None
+            } else {
+                Some(age.value(i))
+            };
+            rows.push((id.value(i), name.value(i).to_string(), age_val));
+        }
+    }
+    rows.sort_by_key(|(id, _, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![
+            (1, "alice".into(), None),
+            (2, "bob".into(), None),
+            (3, "carol".into(), Some(30)),
+            (4, "dave".into(), Some(40)),
+        ],
+        "Old rows should have null for added column 'age'"
+    );
+}
+
+/// Test reading a table after ALTER TABLE ALTER COLUMN TYPE (INT -> BIGINT).
+/// Old data files have INT; reader should cast to BIGINT.
+#[tokio::test]
+async fn test_read_schema_evolution_type_promotion() {
+    let (_, batches) = scan_and_read_with_fs_catalog("schema_evolution_type_promotion", None).await;
+
+    // Verify the value column is Int64 (BIGINT) in all batches
+    for batch in &batches {
+        let value_col = batch.column_by_name("value").expect("value column");
+        assert_eq!(
+            value_col.data_type(),
+            &arrow_array::types::Int64Type::DATA_TYPE,
+            "value column should be Int64 (BIGINT) after type promotion"
+        );
+    }
+
+    let mut rows: Vec<(i32, i64)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let value = batch
+            .column_by_name("value")
+            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
+            .expect("value as Int64Array");
+        for i in 0..batch.num_rows() {
+            rows.push((id.value(i), value.value(i)));
+        }
+    }
+    rows.sort_by_key(|(id, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![(1, 100i64), (2, 200i64), (3, 3_000_000_000i64)],
+        "INT values should be promoted to BIGINT, including values > INT_MAX"
+    );
+}
+
+/// Test reading a data-evolution table after ALTER TABLE ADD COLUMNS.
+/// Old files lack the new column; reader should fill nulls even in data evolution mode.
+#[tokio::test]
+async fn test_read_data_evolution_add_column() {
+    let (_, batches) = scan_and_read_with_fs_catalog("data_evolution_add_column", None).await;
+
+    let mut rows: Vec<(i32, String, i32, Option<String>)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let name = batch
+            .column_by_name("name")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("name");
+        let value = batch
+            .column_by_name("value")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("value");
+        let extra = batch
+            .column_by_name("extra")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("extra");
+        for i in 0..batch.num_rows() {
+            let extra_val = if extra.is_null(i) {
+                None
+            } else {
+                Some(extra.value(i).to_string())
+            };
+            rows.push((
+                id.value(i),
+                name.value(i).to_string(),
+                value.value(i),
+                extra_val,
+            ));
+        }
+    }
+    rows.sort_by_key(|(id, _, _, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![
+            (1, "alice-v2".into(), 100, None),
+            (2, "bob".into(), 200, None),
+            (3, "carol".into(), 300, Some("new".into())),
+            (4, "dave".into(), 400, Some("new".into())),
+        ],
+        "Data evolution + add column: old rows should have null for 'extra', MERGE INTO updates name"
+    );
+}
+
+/// Test reading a data-evolution table after ALTER TABLE ALTER COLUMN TYPE (INT -> BIGINT).
+/// Old files have INT; reader should cast to BIGINT in data evolution mode.
+#[tokio::test]
+async fn test_read_data_evolution_type_promotion() {
+    let (_, batches) = scan_and_read_with_fs_catalog("data_evolution_type_promotion", None).await;
+
+    // Verify the value column is Int64 (BIGINT) in all batches
+    for batch in &batches {
+        let value_col = batch.column_by_name("value").expect("value column");
+        assert_eq!(
+            value_col.data_type(),
+            &arrow_array::types::Int64Type::DATA_TYPE,
+            "value column should be Int64 (BIGINT) after type promotion in data evolution mode"
+        );
+    }
+
+    let mut rows: Vec<(i32, i64)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let value = batch
+            .column_by_name("value")
+            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
+            .expect("value as Int64Array");
+        for i in 0..batch.num_rows() {
+            rows.push((id.value(i), value.value(i)));
+        }
+    }
+    rows.sort_by_key(|(id, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![(1, 999i64), (2, 200i64), (3, 3_000_000_000i64)],
+        "Data evolution + type promotion: INT should be cast to BIGINT, MERGE INTO updates value"
+    );
+}
+
+/// Test reading a table after ALTER TABLE DROP COLUMN.
+/// Old data files have the dropped column; reader should ignore it.
+#[tokio::test]
+async fn test_read_schema_evolution_drop_column() {
+    let (_, batches) = scan_and_read_with_fs_catalog("schema_evolution_drop_column", None).await;
+
+    // Verify the dropped column 'score' is not present in the output.
+    for batch in &batches {
+        assert!(
+            batch.column_by_name("score").is_none(),
+            "Dropped column 'score' should not appear in output"
+        );
+    }
+
+    let mut rows: Vec<(i32, String)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let name = batch
+            .column_by_name("name")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("name");
+        for i in 0..batch.num_rows() {
+            rows.push((id.value(i), name.value(i).to_string()));
+        }
+    }
+    rows.sort_by_key(|(id, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![
+            (1, "alice".into()),
+            (2, "bob".into()),
+            (3, "carol".into()),
+            (4, "dave".into()),
+        ],
+        "Old rows should be readable after DROP COLUMN, with only remaining columns"
+    );
+}
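
These tests pin down two reader behaviors: columns added after a file was written come back as nulls, and promoted columns come back already cast. The null-filling half has a standard Arrow building block, new_null_array; the sketch below shows it in isolation (illustrative only, not the reader's actual internals):

    use std::sync::Arc;

    use arrow_array::{new_null_array, Array, ArrayRef, Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};

    fn main() {
        // Evolved schema: `id INT NOT NULL, age INT`; an old file only contains `id`.
        let target = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("age", DataType::Int32, true),
        ]));
        let id: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
        // Column absent from the file: fill with an all-null array of the target type.
        let age = new_null_array(&DataType::Int32, id.len());
        let batch = RecordBatch::try_new(target, vec![id, age]).unwrap();
        assert!(batch.column(1).is_null(0) && batch.column(1).is_null(1));
    }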

crates/integrations/datafusion/src/lib.rs

Lines changed: 0 additions & 2 deletions
@@ -41,12 +41,10 @@ mod error;
 mod filter_pushdown;
 mod physical_plan;
 mod relation_planner;
-mod schema;
 mod table;
 
 pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider};
 pub use error::to_datafusion_error;
 pub use physical_plan::PaimonTableScan;
 pub use relation_planner::PaimonRelationPlanner;
-pub use schema::paimon_schema_to_arrow;
 pub use table::PaimonTableProvider;

crates/integrations/datafusion/src/schema.rs

Lines changed: 0 additions & 116 deletions
This file was deleted.

crates/integrations/datafusion/src/table/mod.rs

Lines changed: 2 additions & 2 deletions
@@ -32,7 +32,6 @@ use paimon::table::Table;
 use crate::error::to_datafusion_error;
 use crate::filter_pushdown::{build_pushed_predicate, classify_filter_pushdown};
 use crate::physical_plan::PaimonTableScan;
-use crate::schema::paimon_schema_to_arrow;
 
 /// Read-only table provider for a Paimon table.
 ///
@@ -50,7 +49,8 @@ impl PaimonTableProvider {
     /// Loads the table schema and converts it to Arrow for DataFusion.
     pub fn try_new(table: Table) -> DFResult<Self> {
         let fields = table.schema().fields();
-        let schema = paimon_schema_to_arrow(fields)?;
+        let schema =
+            paimon::arrow::build_target_arrow_schema(fields).map_err(to_datafusion_error)?;
         Ok(Self { table, schema })
     }
 
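
Design note: together with the outright deletion of the DataFusion-local schema.rs above, this leaves a single Paimon-to-Arrow schema path in paimon::arrow; judging by its name, build_target_arrow_schema produces the target schema that evolved data files are mapped onto.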
