Skip to content

Commit b33b2d7

Browse files
authored
feat: Implement statistics for data fusion scan (#217)
* feat: Implement statistics for data fusion scan
1 parent 6635792 commit b33b2d7

1 file changed

Lines changed: 47 additions & 1 deletion

File tree

  • crates/integrations/datafusion/src/physical_plan

crates/integrations/datafusion/src/physical_plan/scan.rs

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ use std::any::Any;
1919
use std::sync::Arc;
2020

2121
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
22+
use datafusion::common::stats::Precision;
23+
use datafusion::common::Statistics;
2224
use datafusion::error::Result as DFResult;
2325
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
2426
use datafusion::physical_expr::EquivalenceProperties;
@@ -165,6 +167,34 @@ impl ExecutionPlan for PaimonTableScan {
165167
futures::stream::once(fut).try_flatten(),
166168
)))
167169
}
170+
171+
fn statistics(&self) -> DFResult<Statistics> {
172+
self.partition_statistics(None)
173+
}
174+
175+
fn partition_statistics(&self, partition: Option<usize>) -> DFResult<Statistics> {
176+
let partitions: &[Arc<[DataSplit]>] = match partition {
177+
Some(idx) => std::slice::from_ref(&self.planned_partitions[idx]),
178+
None => &self.planned_partitions,
179+
};
180+
181+
let mut total_rows: usize = 0;
182+
let mut total_bytes: usize = 0;
183+
for splits in partitions {
184+
for split in splits.iter() {
185+
total_rows += split.merged_row_count().unwrap_or(split.row_count()) as usize;
186+
for file in split.data_files() {
187+
total_bytes += file.file_size as usize;
188+
}
189+
}
190+
}
191+
192+
Ok(Statistics {
193+
num_rows: Precision::Inexact(total_rows),
194+
total_byte_size: Precision::Inexact(total_bytes),
195+
column_statistics: Statistics::unknown_column(&self.schema()),
196+
})
197+
}
168198
}
169199

170200
impl DisplayAs for PaimonTableScan {
@@ -173,11 +203,27 @@ impl DisplayAs for PaimonTableScan {
173203
_t: datafusion::physical_plan::DisplayFormatType,
174204
f: &mut std::fmt::Formatter,
175205
) -> std::fmt::Result {
206+
write!(f, "PaimonTableScan: table={}", self.table.identifier())?;
207+
208+
let total_splits: usize = self.planned_partitions.iter().map(|p| p.len()).sum();
209+
let total_files: usize = self
210+
.planned_partitions
211+
.iter()
212+
.flat_map(|p| p.iter())
213+
.map(|s| s.data_files().len())
214+
.sum();
176215
write!(
177216
f,
178-
"PaimonTableScan: partitions={}",
217+
", partitions={}, splits={total_splits}, files={total_files}",
179218
self.planned_partitions.len()
180219
)?;
220+
221+
if let Some(ref columns) = self.projected_columns {
222+
write!(f, ", projection=[{}]", columns.join(", "))?;
223+
}
224+
if let Some(ref predicate) = self.pushed_predicate {
225+
write!(f, ", predicate={predicate}")?;
226+
}
181227
if let Some(limit) = self.limit {
182228
write!(f, ", limit={limit}")?;
183229
}

0 commit comments

Comments
 (0)