Skip to content

Commit 022da71

Browse files
authored
feat: push down filters to parquet read path (#208)
1 parent 6b29f60 commit 022da71

16 files changed

Lines changed: 1855 additions & 334 deletions

File tree

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ arrow = "57.0"
3232
arrow-array = { version = "57.0", features = ["ffi"] }
3333
arrow-schema = "57.0"
3434
arrow-cast = "57.0"
35+
arrow-ord = "57.0"
3536
datafusion = "52.3.0"
3637
datafusion-ffi = "52.3.0"
3738
parquet = "57.0"
38-
tokio = "1.39.2"
39+
tokio = "1.39.2"

bindings/c/src/table.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,8 +349,10 @@ pub unsafe extern "C" fn paimon_table_read_to_arrow(
349349
let end = (offset.saturating_add(length)).min(all_splits.len());
350350
let selected = &all_splits[start..end];
351351

352-
// Create TableRead with the stored read_type (projection)
353-
let table_read = paimon::table::TableRead::new(&state.table, state.read_type.clone());
352+
// C bindings currently persist only the projection, so reconstructing the
353+
// read uses an empty predicate set.
354+
let table_read =
355+
paimon::table::TableRead::new(&state.table, state.read_type.clone(), Vec::new());
354356

355357
match table_read.to_arrow(selected) {
356358
Ok(stream) => {

crates/integrations/datafusion/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,10 @@ futures = "0.3"
3535
tokio = { workspace = true, features = ["rt", "time", "fs"] }
3636

3737
[dev-dependencies]
38+
arrow-array = { workspace = true }
39+
arrow-schema = { workspace = true }
40+
parquet = { workspace = true }
41+
serde = "1"
42+
serde_json = "1"
43+
tempfile = "3"
3844
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

crates/integrations/datafusion/src/physical_plan/scan.rs

Lines changed: 115 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
2626
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
2727
use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
2828
use futures::{StreamExt, TryStreamExt};
29+
use paimon::spec::Predicate;
2930
use paimon::table::Table;
3031
use paimon::DataSplit;
3132

@@ -41,6 +42,9 @@ pub struct PaimonTableScan {
4142
table: Table,
4243
/// Projected column names (if None, reads all columns).
4344
projected_columns: Option<Vec<String>>,
45+
/// Filter translated from DataFusion expressions and reused during execute()
46+
/// so reader-side pruning reaches the actual read path.
47+
pushed_predicate: Option<Predicate>,
4448
/// Pre-planned partition assignments: `planned_partitions[i]` contains the
4549
/// Paimon splits that DataFusion partition `i` will read.
4650
/// Wrapped in `Arc` to avoid deep-cloning `DataSplit` metadata in `execute()`.
@@ -55,6 +59,7 @@ impl PaimonTableScan {
5559
schema: ArrowSchemaRef,
5660
table: Table,
5761
projected_columns: Option<Vec<String>>,
62+
pushed_predicate: Option<Predicate>,
5863
planned_partitions: Vec<Arc<[DataSplit]>>,
5964
limit: Option<usize>,
6065
) -> Self {
@@ -67,6 +72,7 @@ impl PaimonTableScan {
6772
Self {
6873
table,
6974
projected_columns,
75+
pushed_predicate,
7076
planned_partitions,
7177
plan_properties,
7278
limit,
@@ -82,6 +88,11 @@ impl PaimonTableScan {
8288
&self.planned_partitions
8389
}
8490

91+
#[cfg(test)]
92+
pub(crate) fn pushed_predicate(&self) -> Option<&Predicate> {
93+
self.pushed_predicate.as_ref()
94+
}
95+
8596
pub fn limit(&self) -> Option<usize> {
8697
self.limit
8798
}
@@ -126,6 +137,7 @@ impl ExecutionPlan for PaimonTableScan {
126137
let table = self.table.clone();
127138
let schema = self.schema();
128139
let projected_columns = self.projected_columns.clone();
140+
let pushed_predicate = self.pushed_predicate.clone();
129141

130142
let fut = async move {
131143
let mut read_builder = table.new_read_builder();
@@ -134,6 +146,9 @@ impl ExecutionPlan for PaimonTableScan {
134146
let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
135147
read_builder.with_projection(&col_refs);
136148
}
149+
if let Some(filter) = pushed_predicate {
150+
read_builder.with_filter(filter);
151+
}
137152

138153
let read = read_builder.new_read().map_err(to_datafusion_error)?;
139154
let stream = read.to_arrow(&splits).map_err(to_datafusion_error)?;
@@ -173,11 +188,26 @@ impl DisplayAs for PaimonTableScan {
173188
#[cfg(test)]
174189
mod tests {
175190
use super::*;
176-
use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
191+
mod test_utils {
192+
include!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../test_utils.rs"));
193+
}
194+
195+
use datafusion::arrow::array::Int32Array;
196+
use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
177197
use datafusion::physical_plan::ExecutionPlan;
198+
use datafusion::prelude::SessionContext;
199+
use futures::TryStreamExt;
200+
use paimon::catalog::Identifier;
201+
use paimon::io::FileIOBuilder;
202+
use paimon::spec::{
203+
BinaryRow, DataType, Datum, IntType, PredicateBuilder, Schema as PaimonSchema, TableSchema,
204+
};
205+
use std::fs;
206+
use tempfile::tempdir;
207+
use test_utils::{local_file_path, test_data_file, write_int_parquet_file};
178208

179209
fn test_schema() -> ArrowSchemaRef {
180-
Arc::new(Schema::new(vec![Field::new(
210+
Arc::new(ArrowSchema::new(vec![Field::new(
181211
"id",
182212
ArrowDataType::Int32,
183213
false,
@@ -191,6 +221,7 @@ mod tests {
191221
schema,
192222
dummy_table(),
193223
None,
224+
None,
194225
vec![Arc::from(Vec::new())],
195226
None,
196227
);
@@ -205,19 +236,16 @@ mod tests {
205236
Arc::from(Vec::new()),
206237
Arc::from(Vec::new()),
207238
];
208-
let scan = PaimonTableScan::new(schema, dummy_table(), None, planned_partitions, None);
239+
let scan =
240+
PaimonTableScan::new(schema, dummy_table(), None, None, planned_partitions, None);
209241
assert_eq!(scan.properties().output_partitioning().partition_count(), 3);
210242
}
211243

212244
/// Constructs a minimal Table for testing (no real files needed since we
213245
/// only test PlanProperties, not actual reads).
214246
fn dummy_table() -> Table {
215-
use paimon::catalog::Identifier;
216-
use paimon::io::FileIOBuilder;
217-
use paimon::spec::{Schema, TableSchema};
218-
219247
let file_io = FileIOBuilder::new("file").build().unwrap();
220-
let schema = Schema::builder().build().unwrap();
248+
let schema = PaimonSchema::builder().build().unwrap();
221249
let table_schema = TableSchema::new(0, &schema);
222250
Table::new(
223251
file_io,
@@ -226,4 +254,83 @@ mod tests {
226254
table_schema,
227255
)
228256
}
257+
258+
#[tokio::test]
259+
async fn test_execute_applies_pushed_filter_during_read() {
260+
let tempdir = tempdir().unwrap();
261+
let table_path = local_file_path(tempdir.path());
262+
let bucket_dir = tempdir.path().join("bucket-0");
263+
fs::create_dir_all(&bucket_dir).unwrap();
264+
265+
write_int_parquet_file(
266+
&bucket_dir.join("data.parquet"),
267+
vec![("id", vec![1, 2, 3, 4]), ("value", vec![5, 20, 30, 40])],
268+
Some(2),
269+
);
270+
271+
let file_io = FileIOBuilder::new("file").build().unwrap();
272+
let table_schema = TableSchema::new(
273+
0,
274+
&paimon::spec::Schema::builder()
275+
.column("id", DataType::Int(IntType::new()))
276+
.column("value", DataType::Int(IntType::new()))
277+
.build()
278+
.unwrap(),
279+
);
280+
let table = Table::new(
281+
file_io,
282+
Identifier::new("default", "t"),
283+
table_path,
284+
table_schema,
285+
);
286+
287+
let split = paimon::DataSplitBuilder::new()
288+
.with_snapshot(1)
289+
.with_partition(BinaryRow::new(0))
290+
.with_bucket(0)
291+
.with_bucket_path(local_file_path(&bucket_dir))
292+
.with_total_buckets(1)
293+
.with_data_files(vec![test_data_file("data.parquet", 4)])
294+
.with_raw_convertible(true)
295+
.build()
296+
.unwrap();
297+
298+
let pushed_predicate = PredicateBuilder::new(table.schema().fields())
299+
.greater_or_equal("value", Datum::Int(10))
300+
.unwrap();
301+
302+
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
303+
"id",
304+
ArrowDataType::Int32,
305+
false,
306+
)]));
307+
let scan = PaimonTableScan::new(
308+
schema,
309+
table,
310+
Some(vec!["id".to_string()]),
311+
Some(pushed_predicate),
312+
vec![Arc::from(vec![split])],
313+
None,
314+
);
315+
316+
let ctx = SessionContext::new();
317+
let stream = scan
318+
.execute(0, ctx.task_ctx())
319+
.expect("execute should succeed");
320+
let batches = stream.try_collect::<Vec<_>>().await.unwrap();
321+
322+
let actual_ids: Vec<i32> = batches
323+
.iter()
324+
.flat_map(|batch| {
325+
let ids = batch
326+
.column(0)
327+
.as_any()
328+
.downcast_ref::<Int32Array>()
329+
.expect("id column should be Int32Array");
330+
(0..ids.len()).map(|idx| ids.value(idx)).collect::<Vec<_>>()
331+
})
332+
.collect();
333+
334+
assert_eq!(actual_ids, vec![2, 3, 4]);
335+
}
229336
}

crates/integrations/datafusion/src/table/mod.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,13 @@ use crate::runtime::await_with_runtime;
3636

3737
/// Read-only table provider for a Paimon table.
3838
///
39-
/// Supports full table scan, column projection, and partition predicate pushdown.
40-
/// Data-level filtering remains a residual DataFusion filter.
39+
/// Supports full table scan, column projection, and predicate pushdown for
40+
/// planning. Partition predicates prune splits eagerly, while supported
41+
/// non-partition data predicates may also be reused by the Parquet read path
42+
/// for row-group pruning and partial decode-time filtering.
43+
///
44+
/// DataFusion still treats pushed filters as inexact because unsupported
45+
/// predicates and non-Parquet reads remain residual filters.
4146
#[derive(Debug, Clone)]
4247
pub struct PaimonTableProvider {
4348
table: Table,
@@ -103,8 +108,9 @@ impl TableProvider for PaimonTableProvider {
103108
};
104109

105110
// Plan splits eagerly so we know partition count upfront.
111+
let pushed_predicate = build_pushed_predicate(filters, self.table.schema().fields());
106112
let mut read_builder = self.table.new_read_builder();
107-
if let Some(filter) = build_pushed_predicate(filters, self.table.schema().fields()) {
113+
if let Some(filter) = pushed_predicate.clone() {
108114
read_builder.with_filter(filter);
109115
}
110116
// Push the limit hint to paimon-core planning to reduce splits when possible.
@@ -141,6 +147,7 @@ impl TableProvider for PaimonTableProvider {
141147
projected_schema,
142148
self.table.clone(),
143149
projected_columns,
150+
pushed_predicate,
144151
planned_partitions,
145152
limit,
146153
)))
@@ -318,4 +325,27 @@ mod tests {
318325
BTreeSet::from([("2024-01-01".to_string(), 10)]),
319326
);
320327
}
328+
329+
#[tokio::test]
330+
async fn test_scan_keeps_pushed_predicate_for_execute() {
331+
let provider = create_provider("partitioned_log_table").await;
332+
let filter = col("id").gt(lit(1));
333+
334+
let config = SessionConfig::new().with_target_partitions(8);
335+
let ctx = SessionContext::new_with_config(config);
336+
let state = ctx.state();
337+
let plan = provider
338+
.scan(&state, None, std::slice::from_ref(&filter), None)
339+
.await
340+
.expect("scan() should succeed");
341+
let scan = plan
342+
.as_any()
343+
.downcast_ref::<PaimonTableScan>()
344+
.expect("Expected PaimonTableScan");
345+
346+
let expected = build_pushed_predicate(&[filter], provider.table().schema().fields())
347+
.expect("data filter should translate");
348+
349+
assert_eq!(scan.pushed_predicate(), Some(&expected));
350+
}
321351
}

crates/paimon/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ indexmap = "2.5.0"
5757
roaring = "0.11"
5858
arrow-array = { workspace = true }
5959
arrow-cast = { workspace = true }
60+
arrow-ord = { workspace = true }
6061
arrow-schema = { workspace = true }
6162
futures = "0.3"
6263
parquet = { workspace = true, features = ["async", "zstd"] }
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::arrow::schema_evolution::create_index_mapping;
19+
pub(crate) use crate::predicate_stats::{predicates_may_match_with_schema, StatsAccessor};
20+
use crate::spec::{DataField, Predicate, PredicateOperator};
21+
22+
pub(crate) fn reader_pruning_predicates(data_predicates: Vec<Predicate>) -> Vec<Predicate> {
23+
data_predicates
24+
.into_iter()
25+
.filter(predicate_supported_for_reader_pruning)
26+
.collect()
27+
}
28+
29+
pub(crate) fn build_field_mapping(
30+
table_fields: &[DataField],
31+
file_fields: &[DataField],
32+
) -> Vec<Option<usize>> {
33+
normalize_field_mapping(
34+
create_index_mapping(table_fields, file_fields),
35+
table_fields.len(),
36+
)
37+
}
38+
39+
fn predicate_supported_for_reader_pruning(predicate: &Predicate) -> bool {
40+
match predicate {
41+
Predicate::AlwaysFalse => true,
42+
Predicate::Leaf { op, .. } => {
43+
matches!(
44+
op,
45+
PredicateOperator::IsNull
46+
| PredicateOperator::IsNotNull
47+
| PredicateOperator::Eq
48+
| PredicateOperator::NotEq
49+
| PredicateOperator::Lt
50+
| PredicateOperator::LtEq
51+
| PredicateOperator::Gt
52+
| PredicateOperator::GtEq
53+
| PredicateOperator::In
54+
| PredicateOperator::NotIn
55+
)
56+
}
57+
Predicate::AlwaysTrue | Predicate::And(_) | Predicate::Or(_) | Predicate::Not(_) => false,
58+
}
59+
}
60+
61+
/// Identity mapping: table field `i` maps straight to file field `i`.
fn identity_field_mapping(num_fields: usize) -> Vec<Option<usize>> {
    let mut mapping = Vec::with_capacity(num_fields);
    for index in 0..num_fields {
        mapping.push(Some(index));
    }
    mapping
}
64+
65+
/// Converts a raw `i32` index mapping into `Option<usize>` entries.
///
/// A `None` mapping means no schema evolution occurred, so every one of the
/// `num_fields` table fields maps to itself. Within a present mapping, any
/// index that is not a valid `usize` (i.e. negative) becomes `None`.
fn normalize_field_mapping(mapping: Option<Vec<i32>>, num_fields: usize) -> Vec<Option<usize>> {
    match mapping {
        Some(indices) => indices
            .into_iter()
            .map(|index| usize::try_from(index).ok())
            .collect(),
        // No mapping recorded: fall back to the identity mapping.
        None => (0..num_fields).map(Some).collect(),
    }
}

crates/paimon/src/arrow/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
pub(crate) mod filtering;
1819
mod reader;
1920
pub(crate) mod schema_evolution;
2021

0 commit comments

Comments
 (0)