Skip to content

Commit 96c8715

Browse files
feat: Support data evolution row id filter (#222)
1 parent b33b2d7 commit 96c8715

10 files changed

Lines changed: 956 additions & 20 deletions

File tree

crates/integration_tests/tests/read_tables.rs

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2064,3 +2064,231 @@ async fn test_bucket_predicate_filtering_long_string_key() {
20642064
actual.len()
20652065
);
20662066
}
2067+
2068+
// ---------------------------------------------------------------------------
2069+
// Data Evolution Row ID Range Filter integration tests
2070+
// ---------------------------------------------------------------------------
2071+
2072+
/// Plans and reads `table` with the given row-id ranges applied, returning
/// both the scan plan and the collected Arrow record batches.
async fn scan_and_read_with_row_ranges(
    table: &paimon::Table,
    row_ranges: Vec<paimon::RowRange>,
) -> (Plan, Vec<RecordBatch>) {
    let mut builder = table.new_read_builder();
    builder.with_row_ranges(row_ranges);

    // Plan first so the split list reflects the row-range pruning.
    let plan = builder.new_scan().plan().await.expect("Failed to plan scan");

    let reader = builder.new_read().expect("Failed to create read");
    let arrow_stream = reader
        .to_arrow(plan.splits())
        .expect("Failed to create arrow stream");
    let collected: Vec<RecordBatch> = arrow_stream
        .try_collect()
        .await
        .expect("Failed to collect batches");

    (plan, collected)
}
2092+
2093+
fn extract_id_name_value(batches: &[RecordBatch]) -> Vec<(i32, String, i32)> {
2094+
let mut rows = Vec::new();
2095+
for batch in batches {
2096+
let id = batch
2097+
.column_by_name("id")
2098+
.and_then(|c| c.as_any().downcast_ref::<Int32Array>())
2099+
.expect("id");
2100+
let name = batch
2101+
.column_by_name("name")
2102+
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
2103+
.expect("name");
2104+
let value = batch
2105+
.column_by_name("value")
2106+
.and_then(|c| c.as_any().downcast_ref::<Int32Array>())
2107+
.expect("value");
2108+
for i in 0..batch.num_rows() {
2109+
rows.push((id.value(i), name.value(i).to_string(), value.value(i)));
2110+
}
2111+
}
2112+
rows.sort_by_key(|(id, _, _)| *id);
2113+
rows
2114+
}
2115+
2116+
#[tokio::test]
2117+
async fn test_read_data_evolution_table_with_row_ranges() {
2118+
use paimon::RowRange;
2119+
2120+
let catalog = create_file_system_catalog();
2121+
let table = get_table_from_catalog(&catalog, "data_evolution_table").await;
2122+
2123+
let (full_plan, full_batches) = scan_and_read(&catalog, "data_evolution_table", None).await;
2124+
let full_rows = extract_id_name_value(&full_batches);
2125+
let full_row_count: usize = full_batches.iter().map(|b| b.num_rows()).sum();
2126+
assert!(full_row_count > 0);
2127+
2128+
let mut min_row_id = i64::MAX;
2129+
let mut max_row_id_exclusive = i64::MIN;
2130+
for split in full_plan.splits() {
2131+
for file in split.data_files() {
2132+
if let Some(fid) = file.first_row_id {
2133+
min_row_id = min_row_id.min(fid);
2134+
max_row_id_exclusive = max_row_id_exclusive.max(fid + file.row_count);
2135+
}
2136+
}
2137+
}
2138+
assert!(min_row_id < max_row_id_exclusive);
2139+
2140+
let mid = min_row_id + (max_row_id_exclusive - min_row_id) / 2;
2141+
let (filtered_plan, filtered_batches) =
2142+
scan_and_read_with_row_ranges(&table, vec![RowRange::new(min_row_id, mid)]).await;
2143+
2144+
let filtered_row_count: usize = filtered_batches.iter().map(|b| b.num_rows()).sum();
2145+
let filtered_rows = extract_id_name_value(&filtered_batches);
2146+
2147+
assert!(
2148+
filtered_row_count < full_row_count || mid >= max_row_id_exclusive,
2149+
"filtered={filtered_row_count}, full={full_row_count}"
2150+
);
2151+
for row in &filtered_rows {
2152+
assert!(
2153+
full_rows.contains(row),
2154+
"Filtered row {row:?} not in full result"
2155+
);
2156+
}
2157+
assert!(filtered_plan.splits().len() <= full_plan.splits().len());
2158+
}
2159+
2160+
#[tokio::test]
2161+
async fn test_read_data_evolution_table_with_empty_row_ranges() {
2162+
use paimon::RowRange;
2163+
2164+
let catalog = create_file_system_catalog();
2165+
let table = get_table_from_catalog(&catalog, "data_evolution_table").await;
2166+
2167+
let (plan, batches) =
2168+
scan_and_read_with_row_ranges(&table, vec![RowRange::new(999_999, 1_000_000)]).await;
2169+
2170+
let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
2171+
assert_eq!(row_count, 0);
2172+
assert!(plan.splits().is_empty());
2173+
}
2174+
2175+
#[tokio::test]
2176+
async fn test_read_data_evolution_table_with_full_row_ranges() {
2177+
use paimon::RowRange;
2178+
2179+
let catalog = create_file_system_catalog();
2180+
let table = get_table_from_catalog(&catalog, "data_evolution_table").await;
2181+
2182+
let (_, full_batches) = scan_and_read(&catalog, "data_evolution_table", None).await;
2183+
let full_rows = extract_id_name_value(&full_batches);
2184+
2185+
let (_, filtered_batches) =
2186+
scan_and_read_with_row_ranges(&table, vec![RowRange::new(0, i64::MAX)]).await;
2187+
let filtered_rows = extract_id_name_value(&filtered_batches);
2188+
2189+
assert_eq!(filtered_rows, full_rows);
2190+
}
2191+
2192+
#[tokio::test]
2193+
async fn test_read_data_evolution_table_with_row_id_projection() {
2194+
let catalog = create_file_system_catalog();
2195+
let table = get_table_from_catalog(&catalog, "data_evolution_table").await;
2196+
2197+
// Project _ROW_ID along with regular columns
2198+
let mut read_builder = table.new_read_builder();
2199+
read_builder.with_projection(&["_ROW_ID", "id", "name"]);
2200+
let scan = read_builder.new_scan();
2201+
let plan = scan.plan().await.expect("Failed to plan scan");
2202+
2203+
let read = read_builder.new_read().expect("Failed to create read");
2204+
let stream = read
2205+
.to_arrow(plan.splits())
2206+
.expect("Failed to create arrow stream");
2207+
let batches: Vec<RecordBatch> = stream
2208+
.try_collect()
2209+
.await
2210+
.expect("Failed to collect batches");
2211+
2212+
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
2213+
assert!(total_rows > 0, "Should have rows");
2214+
2215+
// Verify _ROW_ID column exists and contains non-negative values
2216+
let mut row_ids: Vec<i64> = Vec::new();
2217+
for batch in &batches {
2218+
let row_id_col = batch
2219+
.column_by_name("_ROW_ID")
2220+
.expect("_ROW_ID column should exist");
2221+
let row_id_array = row_id_col
2222+
.as_any()
2223+
.downcast_ref::<Int64Array>()
2224+
.expect("_ROW_ID should be Int64");
2225+
for i in 0..batch.num_rows() {
2226+
row_ids.push(row_id_array.value(i));
2227+
}
2228+
}
2229+
2230+
assert_eq!(row_ids.len(), total_rows);
2231+
assert!(
2232+
row_ids.iter().all(|&id| id >= 0),
2233+
"All _ROW_ID values should be non-negative"
2234+
);
2235+
// _ROW_ID values should be unique
2236+
let unique: std::collections::HashSet<i64> = row_ids.iter().copied().collect();
2237+
assert_eq!(
2238+
unique.len(),
2239+
row_ids.len(),
2240+
"_ROW_ID values should be unique"
2241+
);
2242+
}
2243+
2244+
#[tokio::test]
2245+
async fn test_read_data_evolution_table_only_row_id_with_row_ranges() {
2246+
use paimon::RowRange;
2247+
2248+
let catalog = create_file_system_catalog();
2249+
let table = get_table_from_catalog(&catalog, "data_evolution_table").await;
2250+
2251+
// Get full row ID range
2252+
let full_rb = table.new_read_builder();
2253+
let full_plan = full_rb.new_scan().plan().await.expect("plan");
2254+
let mut min_row_id = i64::MAX;
2255+
let mut max_row_id = i64::MIN;
2256+
for split in full_plan.splits() {
2257+
for file in split.data_files() {
2258+
if let Some(fid) = file.first_row_id {
2259+
min_row_id = min_row_id.min(fid);
2260+
max_row_id = max_row_id.max(fid + file.row_count - 1);
2261+
}
2262+
}
2263+
}
2264+
2265+
// Project only _ROW_ID with a partial row range
2266+
let mid = min_row_id + (max_row_id - min_row_id) / 2;
2267+
let mut read_builder = table.new_read_builder();
2268+
read_builder.with_projection(&["_ROW_ID"]);
2269+
read_builder.with_row_ranges(vec![RowRange::new(min_row_id, mid)]);
2270+
let scan = read_builder.new_scan();
2271+
let plan = scan.plan().await.expect("plan");
2272+
2273+
let read = read_builder.new_read().expect("read");
2274+
let stream = read.to_arrow(plan.splits()).expect("stream");
2275+
let batches: Vec<RecordBatch> = stream.try_collect().await.expect("collect");
2276+
2277+
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
2278+
assert!(total_rows > 0, "Should have rows");
2279+
// Should have fewer rows than full table due to row_ranges filtering
2280+
let full_read = table.new_read_builder().new_read().expect("read");
2281+
let full_count: usize = full_read
2282+
.to_arrow(full_plan.splits())
2283+
.expect("stream")
2284+
.try_collect::<Vec<_>>()
2285+
.await
2286+
.expect("collect")
2287+
.iter()
2288+
.map(|b| b.num_rows())
2289+
.sum();
2290+
assert!(
2291+
total_rows <= full_count,
2292+
"Row range filtered count ({total_rows}) should be <= full count ({full_count})"
2293+
);
2294+
}

crates/integrations/datafusion/src/table/mod.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,17 @@ impl PaimonTableProvider {
5454
///
5555
/// Loads the table schema and converts it to Arrow for DataFusion.
5656
pub fn try_new(table: Table) -> DFResult<Self> {
57-
let fields = table.schema().fields();
57+
let mut fields = table.schema().fields().to_vec();
58+
let core_options = paimon::spec::CoreOptions::new(table.schema().options());
59+
if core_options.data_evolution_enabled() {
60+
fields.push(paimon::spec::DataField::new(
61+
paimon::spec::ROW_ID_FIELD_ID,
62+
paimon::spec::ROW_ID_FIELD_NAME.to_string(),
63+
paimon::spec::DataType::BigInt(paimon::spec::BigIntType::with_nullable(true)),
64+
));
65+
}
5866
let schema =
59-
paimon::arrow::build_target_arrow_schema(fields).map_err(to_datafusion_error)?;
67+
paimon::arrow::build_target_arrow_schema(&fields).map_err(to_datafusion_error)?;
6068
Ok(Self { table, schema })
6169
}
6270

@@ -95,7 +103,6 @@ impl TableProvider for PaimonTableProvider {
95103
filters: &[Expr],
96104
limit: Option<usize>,
97105
) -> DFResult<Arc<dyn ExecutionPlan>> {
98-
// Convert projection indices to column names and compute projected schema
99106
let (projected_schema, projected_columns) = if let Some(indices) = projection {
100107
let fields: Vec<Field> = indices
101108
.iter()
@@ -104,7 +111,13 @@ impl TableProvider for PaimonTableProvider {
104111
let column_names: Vec<String> = fields.iter().map(|f| f.name().clone()).collect();
105112
(Arc::new(Schema::new(fields)), Some(column_names))
106113
} else {
107-
(self.schema.clone(), None)
114+
let column_names: Vec<String> = self
115+
.schema
116+
.fields()
117+
.iter()
118+
.map(|f| f.name().clone())
119+
.collect();
120+
(self.schema.clone(), Some(column_names))
108121
};
109122

110123
// Plan splits eagerly so we know partition count upfront.

crates/integrations/datafusion/tests/read_tables.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -680,3 +680,77 @@ async fn test_read_complex_type_table_via_datafusion() {
680680
assert_eq!(rows[2].2, "{}");
681681
assert_eq!(rows[2].3, "{name: carol, value: 300}");
682682
}
683+
684+
#[tokio::test]
685+
async fn test_select_row_id_from_data_evolution_table() {
686+
use datafusion::arrow::array::Int64Array;
687+
688+
let ctx = create_context("data_evolution_table").await;
689+
690+
let batches = ctx
691+
.sql(r#"SELECT "_ROW_ID", id, name FROM data_evolution_table"#)
692+
.await
693+
.expect("SQL should parse")
694+
.collect()
695+
.await
696+
.expect("query should execute");
697+
698+
assert!(!batches.is_empty());
699+
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
700+
assert!(total_rows > 0);
701+
702+
for batch in &batches {
703+
let row_id_col = batch
704+
.column_by_name("_ROW_ID")
705+
.expect("_ROW_ID column should exist");
706+
let row_id_array = row_id_col
707+
.as_any()
708+
.downcast_ref::<Int64Array>()
709+
.expect("_ROW_ID should be Int64");
710+
for i in 0..batch.num_rows() {
711+
assert!(
712+
row_id_array.is_valid(i),
713+
"_ROW_ID should not be null for data evolution table"
714+
);
715+
assert!(row_id_array.value(i) >= 0);
716+
}
717+
}
718+
}
719+
720+
#[tokio::test]
721+
async fn test_filter_row_id_from_data_evolution_table() {
722+
use datafusion::arrow::array::Int64Array;
723+
724+
let ctx = create_context("data_evolution_table").await;
725+
726+
let all_batches = ctx
727+
.sql(r#"SELECT "_ROW_ID" FROM data_evolution_table"#)
728+
.await
729+
.expect("SQL")
730+
.collect()
731+
.await
732+
.expect("collect");
733+
let all_count: usize = all_batches.iter().map(|b| b.num_rows()).sum();
734+
735+
let filtered_batches = ctx
736+
.sql(r#"SELECT "_ROW_ID", id FROM data_evolution_table WHERE "_ROW_ID" = 0"#)
737+
.await
738+
.expect("SQL")
739+
.collect()
740+
.await
741+
.expect("collect");
742+
let filtered_count: usize = filtered_batches.iter().map(|b| b.num_rows()).sum();
743+
744+
assert!(filtered_count <= all_count);
745+
for batch in &filtered_batches {
746+
let row_id_array = batch
747+
.column_by_name("_ROW_ID")
748+
.expect("_ROW_ID")
749+
.as_any()
750+
.downcast_ref::<Int64Array>()
751+
.expect("Int64");
752+
for i in 0..batch.num_rows() {
753+
assert_eq!(row_id_array.value(i), 0);
754+
}
755+
}
756+
}

0 commit comments

Comments
 (0)