Skip to content

Commit 1154bdb

Browse files
authored
feat: support limit push down in datafusion (#177)
1 parent aa2cb75 commit 1154bdb

7 files changed

Lines changed: 317 additions & 12 deletions

File tree

crates/integration_tests/tests/read_tables.rs

Lines changed: 52 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -947,3 +947,55 @@ async fn test_read_data_evolution_table_with_projection() {
947947
"Projected data evolution read should return correct values"
948948
);
949949
}
950+
951+
// ---------------------------------------------------------------------------
952+
// Limit pushdown integration tests
953+
// ---------------------------------------------------------------------------
954+
955+
/// Helper function to scan and read with limit pushdown.
956+
async fn plan_table(table: &paimon::Table, limit: Option<usize>) -> Plan {
957+
let mut read_builder = table.new_read_builder();
958+
if let Some(limit) = limit {
959+
read_builder.with_limit(limit);
960+
}
961+
let scan = read_builder.new_scan();
962+
scan.plan().await.expect("Failed to plan scan")
963+
}
964+
965+
/// Test limit pushdown: when limit is smaller than total rows, fewer data files may be generated.
966+
#[tokio::test]
967+
async fn test_limit_pushdown() {
968+
let catalog = create_file_system_catalog();
969+
970+
// Test limit pushdown for data evolution table
971+
let table = get_table_from_catalog(&catalog, "data_evolution_table").await;
972+
973+
// Get full plan without limit
974+
let full_plan = plan_table(&table, None).await;
975+
let full_data_split_count: usize = full_plan.splits().iter().count();
976+
977+
// Get the plan with limit = 2
978+
let limited_plan = plan_table(&table, Some(2)).await;
979+
let limited_data_split_count: usize = limited_plan.splits().iter().count();
980+
981+
// For data evolution tables, limit pushdown at split level uses merged_row_count
982+
// The limited data split count should be < full data split count
983+
assert!(
984+
limited_data_split_count < full_data_split_count,
985+
"Limit pushdown should reduce data split count for data evolution table: limited={limited_data_split_count}, full={full_data_split_count}"
986+
);
987+
988+
// Verify data evolution splits have merged_row_count
989+
for split in full_plan.splits() {
990+
let merged_count = split.merged_row_count().expect(
991+
"Data evolution table should have merged_row_count (all files should have first_row_id)",
992+
);
993+
// merged_row_count should be < row_count (overlapping ranges reduce count)
994+
assert!(
995+
merged_count < split.row_count(),
996+
"merged_row_count ({}) should be < row_count ({})",
997+
merged_count,
998+
split.row_count()
999+
);
1000+
}
1001+
}

crates/integrations/datafusion/src/physical_plan/scan.rs

Lines changed: 47 additions & 5 deletions
Original file line number · Diff line number · Diff line change
@@ -16,16 +16,18 @@
1616
// under the License.
1717

1818
use std::any::Any;
19+
use std::pin::Pin;
1920
use std::sync::Arc;
2021

22+
use datafusion::arrow::array::RecordBatch;
2123
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
2224
use datafusion::error::Result as DFResult;
2325
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
2426
use datafusion::physical_expr::EquivalenceProperties;
2527
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
2628
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
2729
use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
28-
use futures::{StreamExt, TryStreamExt};
30+
use futures::{Stream, StreamExt, TryStreamExt};
2931
use paimon::table::Table;
3032
use paimon::DataSplit;
3133

@@ -46,6 +48,8 @@ pub struct PaimonTableScan {
4648
/// Wrapped in `Arc` to avoid deep-cloning `DataSplit` metadata in `execute()`.
4749
planned_partitions: Vec<Arc<[DataSplit]>>,
4850
plan_properties: PlanProperties,
51+
/// Optional limit on the number of rows to return.
52+
limit: Option<usize>,
4953
}
5054

5155
impl PaimonTableScan {
@@ -54,6 +58,7 @@ impl PaimonTableScan {
5458
table: Table,
5559
projected_columns: Option<Vec<String>>,
5660
planned_partitions: Vec<Arc<[DataSplit]>>,
61+
limit: Option<usize>,
5762
) -> Self {
5863
let plan_properties = PlanProperties::new(
5964
EquivalenceProperties::new(schema.clone()),
@@ -66,6 +71,7 @@ impl PaimonTableScan {
6671
projected_columns,
6772
planned_partitions,
6873
plan_properties,
74+
limit,
6975
}
7076
}
7177

@@ -77,6 +83,10 @@ impl PaimonTableScan {
7783
pub(crate) fn planned_partitions(&self) -> &[Arc<[DataSplit]>] {
7884
&self.planned_partitions
7985
}
86+
87+
pub fn limit(&self) -> Option<usize> {
88+
self.limit
89+
}
8090
}
8191

8292
impl ExecutionPlan for PaimonTableScan {
@@ -118,6 +128,7 @@ impl ExecutionPlan for PaimonTableScan {
118128
let table = self.table.clone();
119129
let schema = self.schema();
120130
let projected_columns = self.projected_columns.clone();
131+
let limit = self.limit;
121132

122133
let fut = async move {
123134
let mut read_builder = table.new_read_builder();
@@ -138,9 +149,30 @@ impl ExecutionPlan for PaimonTableScan {
138149
};
139150

140151
let stream = futures::stream::once(fut).try_flatten();
152+
153+
// Enforce the final LIMIT at the DataFusion execution layer.
154+
let limited_stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> =
155+
if let Some(limit) = limit {
156+
let mut remaining = limit;
157+
Box::pin(stream.try_filter_map(move |batch| {
158+
futures::future::ready(if remaining == 0 {
159+
Ok(None)
160+
} else if batch.num_rows() <= remaining {
161+
remaining -= batch.num_rows();
162+
Ok(Some(batch))
163+
} else {
164+
let limited_batch = batch.slice(0, remaining);
165+
remaining = 0;
166+
Ok(Some(limited_batch))
167+
})
168+
}))
169+
} else {
170+
Box::pin(stream)
171+
};
172+
141173
Ok(Box::pin(RecordBatchStreamAdapter::new(
142174
self.schema(),
143-
stream,
175+
limited_stream,
144176
)))
145177
}
146178
}
@@ -155,7 +187,11 @@ impl DisplayAs for PaimonTableScan {
155187
f,
156188
"PaimonTableScan: partitions={}",
157189
self.planned_partitions.len()
158-
)
190+
)?;
191+
if let Some(limit) = self.limit {
192+
write!(f, ", limit={limit}")?;
193+
}
194+
Ok(())
159195
}
160196
}
161197

@@ -176,7 +212,13 @@ mod tests {
176212
#[test]
177213
fn test_partition_count_empty_plan() {
178214
let schema = test_schema();
179-
let scan = PaimonTableScan::new(schema, dummy_table(), None, vec![Arc::from(Vec::new())]);
215+
let scan = PaimonTableScan::new(
216+
schema,
217+
dummy_table(),
218+
None,
219+
vec![Arc::from(Vec::new())],
220+
None,
221+
);
180222
assert_eq!(scan.properties().output_partitioning().partition_count(), 1);
181223
}
182224

@@ -188,7 +230,7 @@ mod tests {
188230
Arc::from(Vec::new()),
189231
Arc::from(Vec::new()),
190232
];
191-
let scan = PaimonTableScan::new(schema, dummy_table(), None, planned_partitions);
233+
let scan = PaimonTableScan::new(schema, dummy_table(), None, planned_partitions, None);
192234
assert_eq!(scan.properties().output_partitioning().partition_count(), 3);
193235
}
194236

crates/integrations/datafusion/src/table/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -100,7 +100,7 @@ impl TableProvider for PaimonTableProvider {
100100
state: &dyn Session,
101101
projection: Option<&Vec<usize>>,
102102
filters: &[Expr],
103-
_limit: Option<usize>,
103+
limit: Option<usize>,
104104
) -> DFResult<Arc<dyn ExecutionPlan>> {
105105
// Convert projection indices to column names and compute projected schema
106106
let (projected_schema, projected_columns) = if let Some(indices) = projection {
@@ -119,6 +119,11 @@ impl TableProvider for PaimonTableProvider {
119119
if let Some(filter) = build_pushed_predicate(filters, self.table.schema().fields()) {
120120
read_builder.with_filter(filter);
121121
}
122+
// Push the limit hint to paimon-core planning to reduce splits when possible.
123+
// DataFusion still enforces the final LIMIT semantics.
124+
if let Some(limit) = limit {
125+
read_builder.with_limit(limit);
126+
}
122127
let scan = read_builder.new_scan();
123128
let plan = scan.plan().await.map_err(to_datafusion_error)?;
124129

@@ -143,6 +148,7 @@ impl TableProvider for PaimonTableProvider {
143148
self.table.clone(),
144149
projected_columns,
145150
planned_partitions,
151+
limit,
146152
)))
147153
}
148154
}

crates/integrations/datafusion/tests/read_tables.rs

Lines changed: 48 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -286,6 +286,54 @@ async fn test_mixed_and_filter_keeps_residual_datafusion_filter() {
286286
assert_eq!(actual_rows, vec![(2, "bob".to_string())]);
287287
}
288288

289+
/// Test limit pushdown: ensures that LIMIT queries return the correct number of rows.
290+
#[tokio::test]
291+
async fn test_limit_pushdown() {
292+
// Test append-only table (simple_log_table)
293+
{
294+
let batches = collect_query(
295+
"simple_log_table",
296+
"SELECT id, name FROM simple_log_table LIMIT 2",
297+
)
298+
.await
299+
.expect("Limit query should succeed");
300+
301+
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
302+
assert_eq!(total_rows, 2, "LIMIT 2 should return exactly 2 rows");
303+
}
304+
305+
// Test data evolution table
306+
{
307+
let batches = collect_query(
308+
"data_evolution_table",
309+
"SELECT id, name FROM data_evolution_table LIMIT 3",
310+
)
311+
.await
312+
.expect("Limit query on data evolution table should succeed");
313+
314+
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
315+
assert_eq!(
316+
total_rows, 3,
317+
"LIMIT 3 should return exactly 3 rows for data evolution table"
318+
);
319+
320+
// Verify the data is from the merged result (not raw files)
321+
let mut rows = extract_id_name_rows(&batches);
322+
rows.sort_by_key(|(id, _)| *id);
323+
324+
// LIMIT 3 returns ids 1, 2, 3 with merged values
325+
assert_eq!(
326+
rows,
327+
vec![
328+
(1, "alice-v2".to_string()),
329+
(2, "bob".to_string()),
330+
(3, "carol-v2".to_string()),
331+
],
332+
"Data evolution table LIMIT 3 should return merged rows"
333+
);
334+
}
335+
}
336+
289337
// ======================= Catalog Provider Tests =======================
290338
#[tokio::test]
291339
async fn test_query_via_catalog_provider() {

crates/paimon/src/table/read_builder.rs

Lines changed: 16 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -36,6 +36,7 @@ pub struct ReadBuilder<'a> {
3636
table: &'a Table,
3737
projected_fields: Option<Vec<String>>,
3838
filter: Option<Predicate>,
39+
limit: Option<usize>,
3940
}
4041

4142
impl<'a> ReadBuilder<'a> {
@@ -44,6 +45,7 @@ impl<'a> ReadBuilder<'a> {
4445
table,
4546
projected_fields: None,
4647
filter: None,
48+
limit: None,
4749
}
4850
}
4951

@@ -72,9 +74,22 @@ impl<'a> ReadBuilder<'a> {
7274
self
7375
}
7476

77+
/// Push a row-limit hint down to scan planning.
78+
///
79+
/// This allows the scan to generate fewer splits when possible. The hint is
80+
/// applied based on the `merged_row_count()` of each split.
81+
///
82+
/// Note: This method does not guarantee that exactly `limit` rows will be
83+
/// returned by [`TableRead`]. It is only a pushdown hint for planning.
84+
/// Callers or query engines are responsible for enforcing the final LIMIT.
85+
pub fn with_limit(&mut self, limit: usize) -> &mut Self {
86+
self.limit = Some(limit);
87+
self
88+
}
89+
7590
/// Create a table scan. Call [TableScan::plan] to get splits.
7691
pub fn new_scan(&self) -> TableScan<'a> {
77-
TableScan::new(self.table, self.filter.clone())
92+
TableScan::new(self.table, self.filter.clone(), self.limit)
7893
}
7994

8095
/// Create a table read for consuming splits (e.g. from a scan plan).

0 commit comments

Comments (0)