diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs
index 7a2cf403fd8d..519f4c54b274 100644
--- a/datafusion/datasource/src/file.rs
+++ b/datafusion/datasource/src/file.rs
@@ -29,9 +29,14 @@ use crate::schema_adapter::SchemaAdapterFactory;
 use arrow::datatypes::SchemaRef;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{not_impl_err, Result, Statistics};
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::projection::ProjectionExprs;
 use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
 use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion_physical_plan::projection::{
+    all_alias_free_columns, new_projections_for_columns,
+};
 use datafusion_physical_plan::DisplayFormatType;
 use object_store::ObjectStore;
 
@@ -129,6 +134,47 @@ pub trait FileSource: Send + Sync {
         ))
     }
 
+    fn try_pushdown_projections(
+        &self,
+        projection_exprs: &ProjectionExprs,
+        file_schema: &SchemaRef,
+        current_projection: Option<&[usize]>,
+    ) -> Result<ProjectionPushdownResult> {
+        let projection_slice: Vec<_> = projection_exprs.iter().cloned().collect();
+
+        // check if there are any partition columns in projection (columns beyond file schema)
+        let partitioned_columns_in_proj = projection_slice.iter().any(|proj_expr| {
+            proj_expr
+                .expr
+                .as_any()
+                .downcast_ref::<Column>()
+                .map(|expr| expr.index() >= file_schema.fields().len())
+                .unwrap_or(false)
+        });
+
+        // if there are any non-column or alias-carrier expressions, projection should not be removed
+        let no_aliases = all_alias_free_columns(&projection_slice);
+
+        if !no_aliases || partitioned_columns_in_proj {
+            return Ok(ProjectionPushdownResult::None);
+        }
+
+        let all_projections: Vec<usize> = (0..file_schema.fields().len()).collect();
+        let source_projection = current_projection.unwrap_or(&all_projections);
+
+        let new_projection_indices =
+            new_projections_for_columns(&projection_slice, source_projection);
+
+        // return a partial projection with the new projection indices
+        // if `new_file_source` is None, it means the file source doesn't change,
+        // rather the new projection is updated in `FileScanConfig`
+        Ok(ProjectionPushdownResult::Partial {
+            new_file_source: None,
+            remaining_projections: None,
+            new_projection_indices: Some(new_projection_indices),
+        })
+    }
+
     /// Set optional schema adapter factory.
     ///
     /// [`SchemaAdapterFactory`] allows user to specify how fields from the
@@ -155,3 +201,12 @@ pub trait FileSource: Send + Sync {
         None
     }
 }
+
+pub enum ProjectionPushdownResult {
+    None,
+    Partial {
+        new_file_source: Option<Arc<dyn FileSource>>,
+        remaining_projections: Option<ProjectionExprs>,
+        new_projection_indices: Option<Vec<usize>>,
+    },
+}
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index c52397d9a7cc..13faa208482a 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -18,6 +18,7 @@
 //! [`FileScanConfig`] to configure scanning of possibly partitioned
 //! file sources.
 
+use crate::file::ProjectionPushdownResult;
 use crate::file_groups::FileGroup;
 #[allow(unused_imports)]
 use crate::schema_adapter::SchemaAdapterFactory;
@@ -44,16 +45,14 @@ use datafusion_execution::{
     object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
 };
 use datafusion_expr::Operator;
-use datafusion_physical_expr::expressions::{BinaryExpr, Column};
+use datafusion_physical_expr::expressions::BinaryExpr;
 use datafusion_physical_expr::projection::ProjectionExprs;
 use datafusion_physical_expr::utils::reassign_expr_columns;
 use datafusion_physical_expr::{split_conjunction, EquivalenceProperties, Partitioning};
 use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
-use datafusion_physical_plan::projection::{
-    all_alias_free_columns, new_projections_for_columns, ProjectionExpr,
-};
+use datafusion_physical_plan::projection::ProjectionExpr;
 use datafusion_physical_plan::{
     display::{display_orderings, ProjectSchemaDisplay},
     filter_pushdown::FilterPushdownPropagation,
@@ -679,42 +678,42 @@ impl DataSource for FileScanConfig {
     fn try_swapping_with_projection(
         &self,
         projection: &[ProjectionExpr],
-    ) -> Result<Option<Arc<dyn DataSource>>> {
-        // This process can be moved into CsvExec, but it would be an overlap of their responsibility.
-
-        // Must be all column references, with no table partition columns (which can not be projected)
-        let partitioned_columns_in_proj = projection.iter().any(|proj_expr| {
-            proj_expr
-                .expr
-                .as_any()
-                .downcast_ref::<Column>()
-                .map(|expr| expr.index() >= self.file_schema().fields().len())
-                .unwrap_or(false)
-        });
+    ) -> Result<crate::source::ProjectionPushdownResult> {
+        let new_projection_exprs = ProjectionExprs::from(projection);
+
+        // get current projection indices if they exist
+        let current_projection = self
+            .projection_exprs
+            .as_ref()
+            .map(|p| p.ordered_column_indices());
+
+        // pass the new projections to the file source, along with the current projection
+        // the file source will merge them if possible
+        let res = self.file_source().try_pushdown_projections(
+            &new_projection_exprs,
+            self.file_schema(),
+            current_projection.as_deref(),
+        )?;
 
-        // If there is any non-column or alias-carrier expression, Projection should not be removed.
-        let no_aliases = all_alias_free_columns(projection);
-
-        Ok((no_aliases && !partitioned_columns_in_proj).then(|| {
-            let file_scan = self.clone();
-            let source = Arc::clone(&file_scan.file_source);
-            let new_projections = new_projections_for_columns(
-                projection,
-                &file_scan
-                    .projection_exprs
-                    .as_ref()
-                    .map(|p| p.ordered_column_indices())
-                    .unwrap_or_else(|| (0..self.file_schema().fields().len()).collect()),
-            );
+        match res {
+            ProjectionPushdownResult::None => Ok(None),
+            ProjectionPushdownResult::Partial {
+                new_file_source,
+                remaining_projections,
+                new_projection_indices,
+            } => {
+                let mut builder = FileScanConfigBuilder::from(self.clone());
+
+                if let Some(new_source) = new_file_source {
+                    builder = builder.with_source(new_source);
+                }
+
+                builder = builder.with_projection_indices(new_projection_indices);
 
-            Arc::new(
-                FileScanConfigBuilder::from(file_scan)
-                    // Assign projected statistics to source
-                    .with_projection_indices(Some(new_projections))
-                    .with_source(source)
-                    .build(),
-            ) as _
-        }))
+                let new_config = Arc::new(builder.build()) as Arc<dyn DataSource>;
+                Ok(Some((new_config, remaining_projections)))
+            }
+        }
     }
 
     fn try_pushdown_filters(
@@ -2300,7 +2299,7 @@ mod tests {
 
         // Simulate projection being updated. Since the filter has already been pushed down,
         // the new projection won't include the filtered column.
-        let data_source = config
+        let (data_source, _remaining_projections) = config
            .try_swapping_with_projection(&[ProjectionExpr::new(
                 col("c3", &file_schema).unwrap(),
                 "c3".to_string(),
diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs
index 7d5c8c4834ea..605796ae3665 100644
--- a/datafusion/datasource/src/memory.rs
+++ b/datafusion/datasource/src/memory.rs
@@ -35,9 +35,7 @@ use datafusion_physical_expr::equivalence::project_orderings;
 use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
 use datafusion_physical_plan::memory::MemoryStream;
-use datafusion_physical_plan::projection::{
-    all_alias_free_columns, new_projections_for_columns, ProjectionExpr,
-};
+use datafusion_physical_plan::projection::ProjectionExpr;
 use datafusion_physical_plan::{
     common, ColumnarValue, DisplayAs, DisplayFormatType, Partitioning, PhysicalExpr,
     SendableRecordBatchStream, Statistics,
@@ -228,25 +226,29 @@ impl DataSource for MemorySourceConfig {
     fn try_swapping_with_projection(
         &self,
         projection: &[ProjectionExpr],
-    ) -> Result<Option<Arc<dyn DataSource>>> {
+    ) -> Result<crate::source::ProjectionPushdownResult> {
+        use datafusion_physical_plan::projection::{
+            all_alias_free_columns, new_projections_for_columns,
+        };
+
         // If there is any non-column or alias-carrier expression, Projection should not be removed.
-        // This process can be moved into MemoryExec, but it would be an overlap of their responsibility.
-        all_alias_free_columns(projection)
-            .then(|| {
-                let all_projections = (0..self.schema.fields().len()).collect();
-                let new_projections = new_projections_for_columns(
-                    projection,
-                    self.projection().as_ref().unwrap_or(&all_projections),
-                );
+        if !all_alias_free_columns(projection) {
+            return Ok(None);
+        }
 
-                MemorySourceConfig::try_new(
-                    self.partitions(),
-                    self.original_schema(),
-                    Some(new_projections),
-                )
-                .map(|s| Arc::new(s) as Arc<dyn DataSource>)
-            })
-            .transpose()
+        let all_projections: Vec<usize> = (0..self.schema.fields().len()).collect();
+        let new_projections = new_projections_for_columns(
+            projection,
+            self.projection().as_ref().unwrap_or(&all_projections),
+        );
+
+        let new_source = MemorySourceConfig::try_new(
+            self.partitions(),
+            self.original_schema(),
+            Some(new_projections),
+        )?;
+
+        Ok(Some((Arc::new(new_source) as Arc<dyn DataSource>, None)))
     }
 }
 
diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs
index 11a8a3867b80..ff556900cbf0 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -22,6 +22,7 @@ use std::fmt;
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
 
+use datafusion_physical_expr::projection::ProjectionExprs;
 use datafusion_physical_plan::execution_plan::{
     Boundedness, EmissionType, SchedulingType,
 };
@@ -175,7 +176,7 @@ pub trait DataSource: Send + Sync + Debug {
     fn try_swapping_with_projection(
         &self,
         _projection: &[ProjectionExpr],
-    ) -> Result<Option<Arc<dyn DataSource>>>;
+    ) -> Result<ProjectionPushdownResult>;
     /// Try to push down filters into this DataSource.
     /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
     ///
@@ -191,6 +192,9 @@
     }
 }
 
+pub type ProjectionPushdownResult =
+    Option<(Arc<dyn DataSource>, Option<ProjectionExprs>)>;
+
 /// [`ExecutionPlan`] that reads one or more files
 ///
 /// `DataSourceExec` implements common functionality such as applying
@@ -321,8 +325,16 @@ impl ExecutionPlan for DataSourceExec {
             .data_source
             .try_swapping_with_projection(projection.expr())?
         {
-            Some(new_data_source) => {
-                Ok(Some(Arc::new(DataSourceExec::new(new_data_source))))
+            Some((new_data_source, remaining_projections)) => {
+                let new_exec = Arc::new(DataSourceExec::new(new_data_source));
+                if let Some(remaining_projections) = remaining_projections {
+                    let new_projection_exec =
+                        ProjectionExec::try_new(remaining_projections, new_exec)?;
+
+                    return Ok(Some(Arc::new(new_projection_exec)));
+                }
+
+                Ok(Some(new_exec))
             }
             None => Ok(None),
         }