Skip to content

Commit 5d1a930

Browse files
authored (author name lost in page extraction)
feat(scan): add bucket pruning, DV/postpone filtering, and DE group pruning (#205)
* feat(scan): add bucket predicate filtering, DV/postpone filtering, and data evolution group pruning
1 parent abb0a43 commit 5d1a930

18 files changed

Lines changed: 3019 additions & 1526 deletions

File tree

crates/integration_tests/tests/read_tables.rs

Lines changed: 539 additions & 6 deletions
Large diffs are not rendered by default.

crates/integrations/datafusion/src/physical_plan/scan.rs

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,16 @@
1616
// under the License.
1717

1818
use std::any::Any;
19-
use std::pin::Pin;
2019
use std::sync::Arc;
2120

22-
use datafusion::arrow::array::RecordBatch;
2321
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
2422
use datafusion::error::Result as DFResult;
2523
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
2624
use datafusion::physical_expr::EquivalenceProperties;
2725
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
2826
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
2927
use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
30-
use futures::{Stream, StreamExt, TryStreamExt};
28+
use futures::{StreamExt, TryStreamExt};
3129
use paimon::table::Table;
3230
use paimon::DataSplit;
3331

@@ -128,7 +126,6 @@ impl ExecutionPlan for PaimonTableScan {
128126
let table = self.table.clone();
129127
let schema = self.schema();
130128
let projected_columns = self.projected_columns.clone();
131-
let limit = self.limit;
132129

133130
let fut = async move {
134131
let mut read_builder = table.new_read_builder();
@@ -148,31 +145,9 @@ impl ExecutionPlan for PaimonTableScan {
148145
))
149146
};
150147

151-
let stream = futures::stream::once(fut).try_flatten();
152-
153-
// Enforce the final LIMIT at the DataFusion execution layer.
154-
let limited_stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> =
155-
if let Some(limit) = limit {
156-
let mut remaining = limit;
157-
Box::pin(stream.try_filter_map(move |batch| {
158-
futures::future::ready(if remaining == 0 {
159-
Ok(None)
160-
} else if batch.num_rows() <= remaining {
161-
remaining -= batch.num_rows();
162-
Ok(Some(batch))
163-
} else {
164-
let limited_batch = batch.slice(0, remaining);
165-
remaining = 0;
166-
Ok(Some(limited_batch))
167-
})
168-
}))
169-
} else {
170-
Box::pin(stream)
171-
};
172-
173148
Ok(Box::pin(RecordBatchStreamAdapter::new(
174149
self.schema(),
175-
limited_stream,
150+
futures::stream::once(fut).try_flatten(),
176151
)))
177152
}
178153
}

0 commit comments

Comments (0)