Skip to content

Commit 8f229a6

Browse files
authored
Merge branch 'main' into perf/array_resize
2 parents 9d390b4 + 6713439 commit 8f229a6

9 files changed

Lines changed: 781 additions & 265 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ hex = { version = "0.4.3" }
161161
indexmap = "2.13.0"
162162
insta = { version = "1.46.3", features = ["glob", "filters"] }
163163
itertools = "0.14"
164+
itoa = "1.0"
164165
liblzma = { version = "0.4.6", features = ["static"] }
165166
log = "^0.4"
166167
memchr = "2.8.0"

datafusion/core/src/physical_planner.rs

Lines changed: 221 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ use datafusion_common::tree_node::{
6969
Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
7070
};
7171
use datafusion_common::{
72-
DFSchema, ScalarValue, exec_err, internal_datafusion_err, internal_err, not_impl_err,
73-
plan_err,
72+
DFSchema, DFSchemaRef, ScalarValue, exec_err, internal_datafusion_err, internal_err,
73+
not_impl_err, plan_err,
7474
};
7575
use datafusion_common::{
7676
TableReference, assert_eq_or_internal_err, assert_or_internal_err,
@@ -157,6 +157,80 @@ pub trait ExtensionPlanner {
157157
physical_inputs: &[Arc<dyn ExecutionPlan>],
158158
session_state: &SessionState,
159159
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
160+
161+
/// Create a physical plan for a [`LogicalPlan::TableScan`].
162+
///
163+
/// This is useful for planning valid [`TableSource`]s that are not [`TableProvider`]s.
164+
///
165+
/// Returns:
166+
/// * `Ok(Some(plan))` if the planner knows how to plan the `scan`
167+
/// * `Ok(None)` if the planner does not know how to plan the `scan` and wants to delegate the planning to another [`ExtensionPlanner`]
168+
/// * `Err` if the planner knows how to plan the `scan` but errors while doing so
169+
///
170+
/// # Example
171+
///
172+
/// ```rust,ignore
173+
/// use std::sync::Arc;
174+
/// use datafusion::physical_plan::ExecutionPlan;
175+
/// use datafusion::logical_expr::TableScan;
176+
/// use datafusion::execution::context::SessionState;
177+
/// use datafusion::error::Result;
178+
/// use datafusion_physical_planner::{ExtensionPlanner, PhysicalPlanner};
179+
/// use async_trait::async_trait;
180+
///
181+
/// // Your custom table source type
182+
/// struct MyCustomTableSource { /* ... */ }
183+
///
184+
/// // Your custom execution plan
185+
/// struct MyCustomExec { /* ... */ }
186+
///
187+
/// struct MyExtensionPlanner;
188+
///
189+
/// #[async_trait]
190+
/// impl ExtensionPlanner for MyExtensionPlanner {
191+
/// async fn plan_extension(
192+
/// &self,
193+
/// _planner: &dyn PhysicalPlanner,
194+
/// _node: &dyn UserDefinedLogicalNode,
195+
/// _logical_inputs: &[&LogicalPlan],
196+
/// _physical_inputs: &[Arc<dyn ExecutionPlan>],
197+
/// _session_state: &SessionState,
198+
/// ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
199+
/// Ok(None)
200+
/// }
201+
///
202+
/// async fn plan_table_scan(
203+
/// &self,
204+
/// _planner: &dyn PhysicalPlanner,
205+
/// scan: &TableScan,
206+
/// _session_state: &SessionState,
207+
/// ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
208+
/// // Check if this is your custom table source
209+
/// if scan.source.as_any().is::<MyCustomTableSource>() {
210+
/// // Create a custom execution plan for your table source
211+
/// let exec = MyCustomExec::new(
212+
/// scan.table_name.clone(),
213+
/// Arc::clone(scan.projected_schema.inner()),
214+
/// );
215+
/// Ok(Some(Arc::new(exec)))
216+
/// } else {
217+
/// // Return None to let other extension planners handle it
218+
/// Ok(None)
219+
/// }
220+
/// }
221+
/// }
222+
/// ```
223+
///
224+
/// [`TableSource`]: datafusion_expr::TableSource
225+
/// [`TableProvider`]: datafusion_catalog::TableProvider
226+
async fn plan_table_scan(
227+
&self,
228+
_planner: &dyn PhysicalPlanner,
229+
_scan: &TableScan,
230+
_session_state: &SessionState,
231+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
232+
Ok(None)
233+
}
160234
}
161235

162236
/// Default single node physical query planner that converts a
@@ -278,7 +352,8 @@ struct LogicalNode<'a> {
278352

279353
impl DefaultPhysicalPlanner {
280354
/// Create a physical planner that uses `extension_planners` to
281-
/// plan user-defined logical nodes [`LogicalPlan::Extension`].
355+
/// plan user-defined logical nodes [`LogicalPlan::Extension`]
356+
/// or user-defined table sources in [`LogicalPlan::TableScan`].
282357
/// The planner uses the first [`ExtensionPlanner`] to return a non-`None`
283358
/// plan.
284359
pub fn with_extension_planners(
@@ -287,6 +362,24 @@ impl DefaultPhysicalPlanner {
287362
Self { extension_planners }
288363
}
289364

365+
fn ensure_schema_matches(
366+
&self,
367+
logical_schema: &DFSchemaRef,
368+
physical_plan: &Arc<dyn ExecutionPlan>,
369+
context: &str,
370+
) -> Result<()> {
371+
if !logical_schema.matches_arrow_schema(&physical_plan.schema()) {
372+
return plan_err!(
373+
"{} created an ExecutionPlan with mismatched schema. \
374+
LogicalPlan schema: {:?}, ExecutionPlan schema: {:?}",
375+
context,
376+
logical_schema,
377+
physical_plan.schema()
378+
);
379+
}
380+
Ok(())
381+
}
382+
290383
/// Create a physical plan from a logical plan
291384
async fn create_initial_plan(
292385
&self,
@@ -455,25 +548,53 @@ impl DefaultPhysicalPlanner {
455548
) -> Result<Arc<dyn ExecutionPlan>> {
456549
let exec_node: Arc<dyn ExecutionPlan> = match node {
457550
// Leaves (no children)
458-
LogicalPlan::TableScan(TableScan {
459-
source,
460-
projection,
461-
filters,
462-
fetch,
463-
..
464-
}) => {
465-
let source = source_as_provider(source)?;
466-
// Remove all qualifiers from the scan as the provider
467-
// doesn't know (nor should care) how the relation was
468-
// referred to in the query
469-
let filters = unnormalize_cols(filters.iter().cloned());
470-
let filters_vec = filters.into_iter().collect::<Vec<_>>();
471-
let opts = ScanArgs::default()
472-
.with_projection(projection.as_deref())
473-
.with_filters(Some(&filters_vec))
474-
.with_limit(*fetch);
475-
let res = source.scan_with_args(session_state, opts).await?;
476-
Arc::clone(res.plan())
551+
LogicalPlan::TableScan(scan) => {
552+
let TableScan {
553+
source,
554+
projection,
555+
filters,
556+
fetch,
557+
projected_schema,
558+
..
559+
} = scan;
560+
561+
if let Ok(source) = source_as_provider(source) {
562+
// Remove all qualifiers from the scan as the provider
563+
// doesn't know (nor should care) how the relation was
564+
// referred to in the query
565+
let filters = unnormalize_cols(filters.iter().cloned());
566+
let filters_vec = filters.into_iter().collect::<Vec<_>>();
567+
let opts = ScanArgs::default()
568+
.with_projection(projection.as_deref())
569+
.with_filters(Some(&filters_vec))
570+
.with_limit(*fetch);
571+
let res = source.scan_with_args(session_state, opts).await?;
572+
Arc::clone(res.plan())
573+
} else {
574+
let mut maybe_plan = None;
575+
for planner in &self.extension_planners {
576+
if maybe_plan.is_some() {
577+
break;
578+
}
579+
580+
maybe_plan =
581+
planner.plan_table_scan(self, scan, session_state).await?;
582+
}
583+
584+
let plan = match maybe_plan {
585+
Some(plan) => plan,
586+
None => {
587+
return plan_err!(
588+
"No installed planner was able to plan TableScan for custom TableSource: {:?}",
589+
scan.table_name
590+
);
591+
}
592+
};
593+
let context =
594+
format!("Extension planner for table scan {}", scan.table_name);
595+
self.ensure_schema_matches(projected_schema, &plan, &context)?;
596+
plan
597+
}
477598
}
478599
LogicalPlan::Values(Values { values, schema }) => {
479600
let exprs = values
@@ -1616,20 +1737,9 @@ impl DefaultPhysicalPlanner {
16161737
),
16171738
}?;
16181739

1619-
// Ensure the ExecutionPlan's schema matches the
1620-
// declared logical schema to catch and warn about
1621-
// logic errors when creating user defined plans.
1622-
if !node.schema().matches_arrow_schema(&plan.schema()) {
1623-
return plan_err!(
1624-
"Extension planner for {:?} created an ExecutionPlan with mismatched schema. \
1625-
LogicalPlan schema: {:?}, ExecutionPlan schema: {:?}",
1626-
node,
1627-
node.schema(),
1628-
plan.schema()
1629-
);
1630-
} else {
1631-
plan
1632-
}
1740+
let context = format!("Extension planner for {node:?}");
1741+
self.ensure_schema_matches(node.schema(), &plan, &context)?;
1742+
plan
16331743
}
16341744

16351745
// Other
@@ -2889,7 +2999,9 @@ mod tests {
28892999
use datafusion_execution::TaskContext;
28903000
use datafusion_execution::runtime_env::RuntimeEnv;
28913001
use datafusion_expr::builder::subquery_alias;
2892-
use datafusion_expr::{LogicalPlanBuilder, UserDefinedLogicalNodeCore, col, lit};
3002+
use datafusion_expr::{
3003+
LogicalPlanBuilder, TableSource, UserDefinedLogicalNodeCore, col, lit,
3004+
};
28933005
use datafusion_functions_aggregate::count::count_all;
28943006
use datafusion_functions_aggregate::expr_fn::sum;
28953007
use datafusion_physical_expr::EquivalenceProperties;
@@ -4413,4 +4525,76 @@ digraph {
44134525
assert_contains!(&err_str, "field nullability at index");
44144526
assert_contains!(&err_str, "field metadata at index");
44154527
}
4528+
4529+
#[derive(Debug)]
4530+
struct MockTableSource {
4531+
schema: SchemaRef,
4532+
}
4533+
4534+
impl TableSource for MockTableSource {
4535+
fn as_any(&self) -> &dyn Any {
4536+
self
4537+
}
4538+
4539+
fn schema(&self) -> SchemaRef {
4540+
Arc::clone(&self.schema)
4541+
}
4542+
}
4543+
4544+
struct MockTableScanExtensionPlanner;
4545+
4546+
#[async_trait]
4547+
impl ExtensionPlanner for MockTableScanExtensionPlanner {
4548+
async fn plan_extension(
4549+
&self,
4550+
_planner: &dyn PhysicalPlanner,
4551+
_node: &dyn UserDefinedLogicalNode,
4552+
_logical_inputs: &[&LogicalPlan],
4553+
_physical_inputs: &[Arc<dyn ExecutionPlan>],
4554+
_session_state: &SessionState,
4555+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
4556+
Ok(None)
4557+
}
4558+
4559+
async fn plan_table_scan(
4560+
&self,
4561+
_planner: &dyn PhysicalPlanner,
4562+
scan: &TableScan,
4563+
_session_state: &SessionState,
4564+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
4565+
if scan.source.as_any().is::<MockTableSource>() {
4566+
Ok(Some(Arc::new(EmptyExec::new(Arc::clone(
4567+
scan.projected_schema.inner(),
4568+
)))))
4569+
} else {
4570+
Ok(None)
4571+
}
4572+
}
4573+
}
4574+
4575+
#[tokio::test]
4576+
async fn test_table_scan_extension_planner() {
4577+
let session_state = make_session_state();
4578+
let planner = Arc::new(MockTableScanExtensionPlanner);
4579+
let physical_planner =
4580+
DefaultPhysicalPlanner::with_extension_planners(vec![planner]);
4581+
4582+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
4583+
4584+
let table_source = Arc::new(MockTableSource {
4585+
schema: Arc::clone(&schema),
4586+
});
4587+
let logical_plan = LogicalPlanBuilder::scan("test", table_source, None)
4588+
.unwrap()
4589+
.build()
4590+
.unwrap();
4591+
4592+
let plan = physical_planner
4593+
.create_physical_plan(&logical_plan, &session_state)
4594+
.await
4595+
.unwrap();
4596+
4597+
assert_eq!(plan.schema(), schema);
4598+
assert!(plan.as_any().is::<EmptyExec>());
4599+
}
44164600
}

datafusion/functions-nested/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ datafusion-macros = { workspace = true }
5959
datafusion-physical-expr-common = { workspace = true }
6060
hashbrown = { workspace = true }
6161
itertools = { workspace = true, features = ["use_std"] }
62+
itoa = { workspace = true }
6263
log = { workspace = true }
6364
paste = { workspace = true }
6465

@@ -98,6 +99,10 @@ name = "array_repeat"
9899
harness = false
99100
name = "array_set_ops"
100101

102+
[[bench]]
103+
harness = false
104+
name = "array_to_string"
105+
101106
[[bench]]
102107
harness = false
103108
name = "array_position"

0 commit comments

Comments
 (0)