Skip to content

Commit 1f4aac6

Browse files
authored
feat(scan): support data-level stats pruning in TableScan (#196)
1 parent 4ff1c11 commit 1f4aac6

5 files changed

Lines changed: 1142 additions & 126 deletions

File tree

crates/integration_tests/tests/read_tables.rs

Lines changed: 179 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,6 @@ async fn test_read_partitioned_table_with_filter() {
543543

544544
let catalog = create_file_system_catalog();
545545
let table = get_table_from_catalog(&catalog, "partitioned_log_table").await;
546-
// Build a filter: dt = '2024-01-01'
547546
let schema = table.schema();
548547
let pb = PredicateBuilder::new(schema.fields());
549548
let filter = pb
@@ -577,7 +576,6 @@ async fn test_read_multi_partitioned_table_with_filter() {
577576
let schema = table.schema();
578577
let pb = PredicateBuilder::new(schema.fields());
579578

580-
// Filter: dt = '2024-01-01' AND hr = 10
581579
let filter = Predicate::and(vec![
582580
pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
583581
pb.equal("hr", Datum::Int(10)).unwrap(),
@@ -600,16 +598,14 @@ async fn test_read_multi_partitioned_table_with_filter() {
600598
}
601599

602600
#[tokio::test]
603-
async fn test_read_partitioned_table_data_only_filter_preserves_all_partitions() {
601+
async fn test_read_partitioned_table_data_only_filter_prunes_all_files() {
604602
use paimon::spec::{Datum, PredicateBuilder};
605603

606604
let catalog = create_file_system_catalog();
607605
let table = get_table_from_catalog(&catalog, "partitioned_log_table").await;
608606
let schema = table.schema();
609607
let pb = PredicateBuilder::new(schema.fields());
610608

611-
// Data-only filter: id > 10 — should NOT prune any partitions,
612-
// and is still ignored at read level in Phase 2.
613609
let filter = pb
614610
.greater_than("id", Datum::Int(10))
615611
.expect("Failed to build predicate");
@@ -618,24 +614,18 @@ async fn test_read_partitioned_table_data_only_filter_preserves_all_partitions()
618614
let seen_partitions = extract_plan_partitions(&plan);
619615
assert_eq!(
620616
seen_partitions,
621-
HashSet::from(["2024-01-01".into(), "2024-01-02".into()]),
622-
"Data-only filter should not prune any partitions"
617+
HashSet::<String>::new(),
618+
"Data-only filter should prune all files when stats prove no match"
623619
);
624620

625621
let actual = extract_id_name(&batches);
626622
assert_eq!(
627623
actual,
628-
vec![
629-
(1, "alice".to_string()),
630-
(2, "bob".to_string()),
631-
(3, "carol".to_string()),
632-
],
633-
"Data predicate is not applied at read level; all rows are still returned"
624+
Vec::<(i32, String)>::new(),
625+
"No rows should be planned when stats prove the predicate is unsatisfiable"
634626
);
635627
}
636628

637-
/// Mixed AND: partition predicate prunes partitions, but data predicate is
638-
/// silently ignored — all rows from the matching partition are returned.
639629
#[tokio::test]
640630
async fn test_read_partitioned_table_mixed_and_filter() {
641631
use paimon::spec::{Datum, Predicate, PredicateBuilder};
@@ -645,8 +635,6 @@ async fn test_read_partitioned_table_mixed_and_filter() {
645635
let schema = table.schema();
646636
let pb = PredicateBuilder::new(schema.fields());
647637

648-
// dt = '2024-01-01' AND id > 10
649-
// Partition conjunct (dt) is applied; data conjunct (id) is NOT.
650638
let filter = Predicate::and(vec![
651639
pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
652640
pb.greater_than("id", Datum::Int(10)).unwrap(),
@@ -656,20 +644,92 @@ async fn test_read_partitioned_table_mixed_and_filter() {
656644
let seen_partitions = extract_plan_partitions(&plan);
657645
assert_eq!(
658646
seen_partitions,
659-
HashSet::from(["2024-01-01".into()]),
660-
"Only dt=2024-01-01 should survive"
647+
HashSet::<String>::new(),
648+
"The matching partition should also be pruned when file stats prove no match"
661649
);
662650

663651
let actual = extract_id_name(&batches);
664652
assert_eq!(
665653
actual,
666-
vec![(1, "alice".to_string()), (2, "bob".to_string())],
667-
"Data predicate (id > 10) is NOT applied — all rows from matching partition returned"
654+
Vec::<(i32, String)>::new(),
655+
"No rows should remain after partition pruning and data stats pruning"
656+
);
657+
}
658+
659+
#[tokio::test]
660+
async fn test_read_partitioned_table_data_only_filter_keeps_matching_partition() {
661+
use paimon::spec::{Datum, PredicateBuilder};
662+
663+
let catalog = create_file_system_catalog();
664+
let table = get_table_from_catalog(&catalog, "partitioned_log_table").await;
665+
let schema = table.schema();
666+
let pb = PredicateBuilder::new(schema.fields());
667+
668+
let filter = pb
669+
.greater_than("id", Datum::Int(2))
670+
.expect("Failed to build predicate");
671+
672+
let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
673+
let seen_partitions = extract_plan_partitions(&plan);
674+
assert_eq!(
675+
seen_partitions,
676+
HashSet::from(["2024-01-02".into()]),
677+
"Only files whose stats may satisfy the predicate should remain in the plan"
678+
);
679+
680+
let actual = extract_id_name(&batches);
681+
assert_eq!(
682+
actual,
683+
vec![(3, "carol".to_string())],
684+
"Only rows from files that survive stats pruning should be returned"
668685
);
669686
}
670687

671-
/// Mixed OR: `dt = '...' OR id > 10` cannot be split into a pure partition
672-
/// predicate, so no partitions should be pruned.
688+
/// Java-style inclusive projection can still extract partition predicates from
689+
/// an OR of mixed AND branches.
690+
#[tokio::test]
691+
async fn test_read_multi_partitioned_table_or_of_mixed_ands_prunes_partitions() {
692+
use paimon::spec::{Datum, Predicate, PredicateBuilder};
693+
694+
let catalog = create_file_system_catalog();
695+
let table = get_table_from_catalog(&catalog, "multi_partitioned_log_table").await;
696+
let schema = table.schema();
697+
let pb = PredicateBuilder::new(schema.fields());
698+
699+
let filter = Predicate::or(vec![
700+
Predicate::and(vec![
701+
pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
702+
pb.equal("hr", Datum::Int(10)).unwrap(),
703+
pb.greater_than("id", Datum::Int(10)).unwrap(),
704+
]),
705+
Predicate::and(vec![
706+
pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
707+
pb.equal("hr", Datum::Int(20)).unwrap(),
708+
]),
709+
]);
710+
711+
let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
712+
let seen_partitions = extract_plan_multi_partitions(&plan);
713+
assert_eq!(
714+
seen_partitions,
715+
HashSet::from([("2024-01-01".into(), 10), ("2024-01-01".into(), 20)]),
716+
"Inclusive projection should prune the dt=2024-01-02 partition"
717+
);
718+
719+
let actual = extract_id_name(&batches);
720+
assert_eq!(
721+
actual,
722+
vec![
723+
(1, "alice".to_string()),
724+
(2, "bob".to_string()),
725+
(3, "carol".to_string()),
726+
],
727+
"All rows from the surviving partitions should be returned"
728+
);
729+
}
730+
731+
/// A directly mixed OR like `dt = '...' OR id > 10` is still not safely
732+
/// splittable into a partition predicate, so no partitions should be pruned.
673733
#[tokio::test]
674734
async fn test_read_partitioned_table_mixed_or_filter_preserves_all() {
675735
use paimon::spec::{Datum, Predicate, PredicateBuilder};
@@ -679,7 +739,6 @@ async fn test_read_partitioned_table_mixed_or_filter_preserves_all() {
679739
let schema = table.schema();
680740
let pb = PredicateBuilder::new(schema.fields());
681741

682-
// dt = '2024-01-01' OR id > 10 — mixed OR is not safely splittable.
683742
let filter = Predicate::or(vec![
684743
pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
685744
pb.greater_than("id", Datum::Int(10)).unwrap(),
@@ -705,7 +764,7 @@ async fn test_read_partitioned_table_mixed_or_filter_preserves_all() {
705764
);
706765
}
707766

708-
/// Filter that matches no existing partition — all entries pruned, 0 splits.
767+
/// A filter that matches no partition should produce no splits.
709768
#[tokio::test]
710769
async fn test_read_partitioned_table_filter_matches_no_partition() {
711770
use paimon::spec::{Datum, PredicateBuilder};
@@ -715,7 +774,6 @@ async fn test_read_partitioned_table_filter_matches_no_partition() {
715774
let schema = table.schema();
716775
let pb = PredicateBuilder::new(schema.fields());
717776

718-
// dt = '9999-12-31' matches no partition.
719777
let filter = pb
720778
.equal("dt", Datum::String("9999-12-31".into()))
721779
.expect("Failed to build predicate");
@@ -744,8 +802,7 @@ async fn test_read_partitioned_table_eval_row_error_fails_plan() {
744802
.position(|f| f.name() == "dt")
745803
.expect("dt partition column should exist");
746804

747-
// Use an unsupported DataType in a partition leaf so remapping succeeds
748-
// but `eval_row` fails during partition pruning.
805+
// Use an unsupported partition type so remapping succeeds but `eval_row` fails.
749806
let filter = Predicate::Leaf {
750807
column: "dt".into(),
751808
index: dt_index,
@@ -1086,6 +1143,100 @@ async fn test_read_schema_evolution_type_promotion() {
10861143
);
10871144
}
10881145

1146+
/// Stats pruning should treat a newly added column as all-NULL for old files.
1147+
#[tokio::test]
1148+
async fn test_stats_pruning_schema_evolution_added_column_eq_prunes_old_files() {
1149+
use paimon::spec::{Datum, PredicateBuilder};
1150+
1151+
let catalog = create_file_system_catalog();
1152+
let table = get_table_from_catalog(&catalog, "schema_evolution_add_column").await;
1153+
let pb = PredicateBuilder::new(table.schema().fields());
1154+
let filter = pb
1155+
.equal("age", Datum::Int(30))
1156+
.expect("Failed to build predicate");
1157+
1158+
let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
1159+
assert_eq!(
1160+
plan.splits().len(),
1161+
1,
1162+
"Only the file written after ADD COLUMN should survive stats pruning"
1163+
);
1164+
1165+
let actual = extract_id_name(&batches);
1166+
assert_eq!(
1167+
actual,
1168+
vec![(3, "carol".to_string())],
1169+
"Old files missing 'age' and rows with age != 30 should be pruned"
1170+
);
1171+
}
1172+
1173+
/// Stats pruning should keep only old files for IS NULL on a newly added column.
1174+
#[tokio::test]
1175+
async fn test_stats_pruning_schema_evolution_added_column_is_null_prunes_new_files() {
1176+
use paimon::spec::PredicateBuilder;
1177+
1178+
let catalog = create_file_system_catalog();
1179+
let table = get_table_from_catalog(&catalog, "schema_evolution_add_column").await;
1180+
let pb = PredicateBuilder::new(table.schema().fields());
1181+
let filter = pb.is_null("age").expect("Failed to build predicate");
1182+
1183+
let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
1184+
assert_eq!(
1185+
plan.splits().len(),
1186+
1,
1187+
"Only files missing 'age' should survive stats pruning for age IS NULL"
1188+
);
1189+
1190+
let actual = extract_id_name(&batches);
1191+
assert_eq!(
1192+
actual,
1193+
vec![(1, "alice".to_string()), (2, "bob".to_string())],
1194+
"New files with non-null age should be pruned for age IS NULL"
1195+
);
1196+
}
1197+
1198+
/// Stats pruning should still work after INT -> BIGINT type promotion.
1199+
#[tokio::test]
1200+
async fn test_stats_pruning_schema_evolution_type_promotion_prunes_old_int_files() {
1201+
use paimon::spec::{Datum, PredicateBuilder};
1202+
1203+
let catalog = create_file_system_catalog();
1204+
let table = get_table_from_catalog(&catalog, "schema_evolution_type_promotion").await;
1205+
let pb = PredicateBuilder::new(table.schema().fields());
1206+
let filter = pb
1207+
.greater_than("value", Datum::Long(250))
1208+
.expect("Failed to build predicate");
1209+
1210+
let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
1211+
assert_eq!(
1212+
plan.splits().len(),
1213+
1,
1214+
"Old INT files should still be pruned using promoted BIGINT predicates"
1215+
);
1216+
1217+
let mut rows: Vec<(i32, i64)> = Vec::new();
1218+
for batch in &batches {
1219+
let id = batch
1220+
.column_by_name("id")
1221+
.and_then(|c| c.as_any().downcast_ref::<Int32Array>())
1222+
.expect("id");
1223+
let value = batch
1224+
.column_by_name("value")
1225+
.and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1226+
.expect("value");
1227+
for i in 0..batch.num_rows() {
1228+
rows.push((id.value(i), value.value(i)));
1229+
}
1230+
}
1231+
rows.sort_by_key(|(id, _)| *id);
1232+
1233+
assert_eq!(
1234+
rows,
1235+
vec![(3, 3_000_000_000i64)],
1236+
"Only the BIGINT file should remain after value > 250 pruning"
1237+
);
1238+
}
1239+
10891240
/// Test reading a data-evolution table after ALTER TABLE ADD COLUMNS.
10901241
/// Old files lack the new column; reader should fill nulls even in data evolution mode.
10911242
#[tokio::test]

crates/paimon/src/spec/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ mod partition_utils;
5757
pub(crate) use partition_utils::PartitionComputer;
5858
mod predicate;
5959
pub(crate) use predicate::eval_row;
60+
pub(crate) use predicate::extract_datum;
6061
pub use predicate::{
6162
field_idx_to_partition_idx, Datum, Predicate, PredicateBuilder, PredicateOperator,
6263
};

0 commit comments

Comments
 (0)