diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 47f03cbb1bfed..97b7e5761e4fc 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -27,13 +27,9 @@ use crate::file_groups::FileGroup; #[allow(unused_imports)] use crate::schema_adapter::SchemaAdapterFactory; use crate::{ - display::FileGroupsDisplay, - file::FileSource, - file_compression_type::FileCompressionType, - file_stream::FileStream, - source::{DataSource, DataSourceExec}, - statistics::MinMaxStatistics, - PartitionedFile, + display::FileGroupsDisplay, file::FileSource, + file_compression_type::FileCompressionType, file_stream::FileStream, + source::DataSource, statistics::MinMaxStatistics, PartitionedFile, }; use arrow::datatypes::FieldRef; use arrow::{ @@ -57,11 +53,12 @@ use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; +use datafusion_physical_plan::projection::ProjectionExpr; use datafusion_physical_plan::{ display::{display_orderings, ProjectSchemaDisplay}, metrics::ExecutionPlanMetricsSet, - projection::{all_alias_free_columns, new_projections_for_columns, ProjectionExec}, - DisplayAs, DisplayFormatType, ExecutionPlan, + projection::{all_alias_free_columns, new_projections_for_columns}, + DisplayAs, DisplayFormatType, }; use datafusion_physical_plan::{ filter::collect_columns_from_predicate, filter_pushdown::FilterPushdownPropagation, @@ -139,6 +136,9 @@ use log::{debug, warn}; /// // create an execution plan from the config /// let plan: Arc = DataSourceExec::from_data_source(config); /// ``` +/// +/// [`DataSourceExec`]: crate::source::DataSourceExec +/// [`DataSourceExec::from_data_source`]: crate::source::DataSourceExec::from_data_source 
#[derive(Clone)] pub struct FileScanConfig { /// Object store URL, used to get an [`ObjectStore`] instance from @@ -159,6 +159,8 @@ pub struct FileScanConfig { /// Note that this is **not** the schema of the physical files. /// This is the schema that the physical file schema will be /// mapped onto, and the schema that the [`DataSourceExec`] will return. + /// + /// [`DataSourceExec`]: crate::source::DataSourceExec pub file_schema: SchemaRef, /// List of files to be processed, grouped into partitions /// @@ -258,9 +260,10 @@ pub struct FileScanConfigBuilder { /// This is usually the same as the table schema as specified by the `TableProvider` minus any partition columns. /// /// This probably would be better named `table_schema` + /// + /// [`DataSourceExec`]: crate::source::DataSourceExec file_schema: SchemaRef, file_source: Arc, - limit: Option, projection: Option>, table_partition_cols: Vec, @@ -631,12 +634,12 @@ impl DataSource for FileScanConfig { fn try_swapping_with_projection( &self, - projection: &ProjectionExec, - ) -> Result>> { + projection: &[ProjectionExpr], + ) -> Result>> { // This process can be moved into CsvExec, but it would be an overlap of their responsibility. // Must be all column references, with no table partition columns (which can not be projected) - let partitioned_columns_in_proj = projection.expr().iter().any(|(expr, _)| { + let partitioned_columns_in_proj = projection.iter().any(|(expr, _)| { expr.as_any() .downcast_ref::() .map(|expr| expr.index() >= self.file_schema.fields().len()) @@ -644,7 +647,7 @@ impl DataSource for FileScanConfig { }); // If there is any non-column or alias-carrier expression, Projection should not be removed. 
- let no_aliases = all_alias_free_columns(projection.expr()); + let no_aliases = all_alias_free_columns(projection); Ok((no_aliases && !partitioned_columns_in_proj).then(|| { let file_scan = self.clone(); @@ -656,7 +659,8 @@ impl DataSource for FileScanConfig { .clone() .unwrap_or_else(|| (0..self.file_schema.fields().len()).collect()), ); - DataSourceExec::from_data_source( + + Arc::new( FileScanConfigBuilder::from(file_scan) // Assign projected statistics to source .with_projection(Some(new_projections)) diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index 673c1b9dd45d4..f83af7aa06609 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -37,11 +37,11 @@ use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; use datafusion_physical_plan::memory::MemoryStream; use datafusion_physical_plan::projection::{ - all_alias_free_columns, new_projections_for_columns, ProjectionExec, + all_alias_free_columns, new_projections_for_columns, ProjectionExpr, }; use datafusion_physical_plan::{ - common, ColumnarValue, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, - PhysicalExpr, SendableRecordBatchStream, Statistics, + common, ColumnarValue, DisplayAs, DisplayFormatType, Partitioning, PhysicalExpr, + SendableRecordBatchStream, Statistics, }; use async_trait::async_trait; @@ -213,11 +213,11 @@ impl DataSource for MemorySourceConfig { fn try_swapping_with_projection( &self, - projection: &ProjectionExec, - ) -> Result>> { + projection: &[ProjectionExpr], + ) -> Result>> { // If there is any non-column or alias-carrier expression, Projection should not be removed. // This process can be moved into MemoryExec, but it would be an overlap of their responsibility. 
- all_alias_free_columns(projection.expr()) + all_alias_free_columns(projection) .then(|| { let all_projections = (0..self.schema.fields().len()).collect(); let new_projections = new_projections_for_columns( @@ -225,12 +225,12 @@ impl DataSource for MemorySourceConfig { self.projection().as_ref().unwrap_or(&all_projections), ); - MemorySourceConfig::try_new_exec( + MemorySourceConfig::try_new( self.partitions(), self.original_schema(), Some(new_projections), ) - .map(|e| e as _) + .map(|s| Arc::new(s) as Arc) }) .transpose() } @@ -835,6 +835,7 @@ mod tests { use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::lit; + use datafusion_physical_plan::ExecutionPlan; use futures::StreamExt; #[tokio::test] diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 60be39bc637d2..20d9a1d6e53f0 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -22,13 +22,12 @@ use std::fmt; use std::fmt::{Debug, Formatter}; use std::sync::Arc; -use datafusion_physical_expr::equivalence::ProjectionMapping; use datafusion_physical_plan::execution_plan::{ Boundedness, EmissionType, SchedulingType, }; use datafusion_physical_plan::metrics::SplitMetrics; use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; -use datafusion_physical_plan::projection::ProjectionExec; +use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::stream::BatchSplitStream; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, @@ -161,8 +160,8 @@ pub trait DataSource: Send + Sync + Debug { } fn try_swapping_with_projection( &self, - _projection: &ProjectionExec, - ) -> Result>>; + _projection: &[ProjectionExpr], + ) -> Result>>; /// Try to push down filters into this DataSource. /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details. 
/// @@ -318,36 +317,12 @@ impl ExecutionPlan for DataSourceExec { &self, projection: &ProjectionExec, ) -> Result>> { - match self.data_source.try_swapping_with_projection(projection)? { - Some(new_plan) => { - if let Some(new_data_source_exec) = - new_plan.as_any().downcast_ref::() - { - let projection_mapping = ProjectionMapping::try_new( - projection.expr().iter().cloned(), - &self.schema(), - )?; - - // Project the equivalence properties to the new schema - let projected_eq_properties = self - .cache - .eq_properties - .project(&projection_mapping, new_data_source_exec.schema()); - - let preserved_exec = DataSourceExec { - data_source: Arc::clone(&new_data_source_exec.data_source), - cache: PlanProperties::new( - projected_eq_properties, - new_data_source_exec.cache.partitioning.clone(), - new_data_source_exec.cache.emission_type, - new_data_source_exec.cache.boundedness, - ) - .with_scheduling_type(new_data_source_exec.cache.scheduling_type), - }; - Ok(Some(Arc::new(preserved_exec))) - } else { - Ok(Some(new_plan)) - } + match self + .data_source + .try_swapping_with_projection(projection.expr())? 
+ { + Some(new_data_source) => { + Ok(Some(Arc::new(DataSourceExec::new(new_data_source)))) } None => Ok(None), } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index cf8400f4aceee..770ebd8b0ecbb 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -61,7 +61,7 @@ use log::trace; #[derive(Debug, Clone)] pub struct ProjectionExec { /// The projection expressions stored as tuples of (expression, output column name) - pub(crate) expr: Vec<(Arc, String)>, + pub(crate) expr: Vec, /// The schema once the projection has been applied to the input schema: SchemaRef, /// The input plan @@ -75,7 +75,7 @@ pub struct ProjectionExec { impl ProjectionExec { /// Create a projection on an input pub fn try_new( - expr: Vec<(Arc, String)>, + expr: Vec, input: Arc, ) -> Result { let input_schema = input.schema(); @@ -115,7 +115,7 @@ impl ProjectionExec { } /// The projection expressions stored as tuples of (expression, output column name) - pub fn expr(&self) -> &[(Arc, String)] { + pub fn expr(&self) -> &[ProjectionExpr] { &self.expr } @@ -147,6 +147,8 @@ impl ProjectionExec { } } +pub type ProjectionExpr = (Arc, String); + impl DisplayAs for ProjectionExec { fn fmt_as( &self, @@ -566,7 +568,7 @@ fn is_projection_removable(projection: &ProjectionExec) -> bool { /// Given the expression set of a projection, checks if the projection causes /// any renaming or constructs a non-`Column` physical expression. -pub fn all_alias_free_columns(exprs: &[(Arc, String)]) -> bool { +pub fn all_alias_free_columns(exprs: &[ProjectionExpr]) -> bool { exprs.iter().all(|(expr, alias)| { expr.as_any() .downcast_ref::() @@ -579,11 +581,10 @@ pub fn all_alias_free_columns(exprs: &[(Arc, String)]) -> bool /// projection operator's expressions. To use this function safely, one must /// ensure that all expressions are `Column` expressions without aliases. 
pub fn new_projections_for_columns( - projection: &ProjectionExec, + projection: &[ProjectionExpr], source: &[usize], ) -> Vec { projection - .expr() .iter() .filter_map(|(expr, _)| { expr.as_any() @@ -604,7 +605,7 @@ pub fn make_with_child( } /// Returns `true` if all the expressions in the argument are `Column`s. -pub fn all_columns(exprs: &[(Arc, String)]) -> bool { +pub fn all_columns(exprs: &[ProjectionExpr]) -> bool { exprs.iter().all(|(expr, _)| expr.as_any().is::()) } @@ -627,7 +628,7 @@ pub fn all_columns(exprs: &[(Arc, String)]) -> bool { /// `a@0`, but `b@2` results in `None` since the projection does not include `b`. pub fn update_expr( expr: &Arc, - projected_exprs: &[(Arc, String)], + projected_exprs: &[ProjectionExpr], sync_with_child: bool, ) -> Result>> { #[derive(Debug, PartialEq)] @@ -692,7 +693,7 @@ pub fn update_expr( /// expressions using the [`update_expr`] function. pub fn update_ordering( ordering: LexOrdering, - projected_exprs: &[(Arc, String)], + projected_exprs: &[ProjectionExpr], ) -> Result> { let mut updated_exprs = vec![]; for mut sort_expr in ordering.into_iter() { @@ -710,7 +711,7 @@ pub fn update_ordering( /// expressions using the [`update_expr`] function. pub fn update_ordering_requirement( reqs: LexRequirement, - projected_exprs: &[(Arc, String)], + projected_exprs: &[ProjectionExpr], ) -> Result> { let mut updated_exprs = vec![]; for mut sort_expr in reqs.into_iter() { @@ -727,7 +728,7 @@ pub fn update_ordering_requirement( /// Downcasts all the expressions in `exprs` to `Column`s. If any of the given /// expressions is not a `Column`, returns `None`. pub fn physical_to_column_exprs( - exprs: &[(Arc, String)], + exprs: &[ProjectionExpr], ) -> Option> { exprs .iter() @@ -952,7 +953,7 @@ fn try_unifying_projections( } /// Collect all column indices from the given projection expressions. 
-fn collect_column_indices(exprs: &[(Arc, String)]) -> Vec { +fn collect_column_indices(exprs: &[ProjectionExpr]) -> Vec { // Collect indices and remove duplicates. let mut indices = exprs .iter() @@ -1314,7 +1315,7 @@ mod tests { // of output schema columns < input schema columns and hence if we use the last few columns // from the input schema in the expressions here, bounds_check would fail on them if output // schema is supplied to the partitions_statistics method. - let exprs: Vec<(Arc, String)> = vec![ + let exprs: Vec = vec![ ( Arc::new(Column::new("c", 2)) as Arc, "c_renamed".to_string(), diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index d4e6ba4c96c79..f9a7feb9e726e 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -299,7 +299,7 @@ impl ExecutionPlan for StreamingTableExec { let streaming_table_projections = self.projection().as_ref().map(|i| i.as_ref().to_vec()); let new_projections = new_projections_for_columns( - projection, + projection.expr(), &streaming_table_projections .unwrap_or_else(|| (0..self.schema().fields().len()).collect()), ); diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 49e58cb116510..7558b05c93e69 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -218,6 +218,13 @@ impl LazyBatchGenerator for MyBatchGenerator { See [#17200](https://github.com/apache/datafusion/pull/17200) for details. +### Refactored `DataSource::try_swapping_with_projection` + +We refactored `DataSource::try_swapping_with_projection` to simplify the method and minimize leakage across the ExecutionPlan <-> DataSource abstraction layer. +Reimplementation for any custom `DataSource` should be relatively straightforward; see [#17395] for more details. 
+ +[#17395]: https://github.com/apache/datafusion/pull/17395/ + ## DataFusion `49.0.0` ### `MSRV` updated to 1.85.1