diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs index 55400e2192832..1c560be6d08a6 100644 --- a/datafusion-examples/examples/advanced_parquet_index.rs +++ b/datafusion-examples/examples/advanced_parquet_index.rs @@ -502,7 +502,7 @@ impl TableProvider for IndexTableProvider { let file_scan_config = FileScanConfigBuilder::new(object_store_url, schema, file_source) .with_limit(limit) - .with_projection(projection.cloned()) + .with_projection_indices(projection.cloned()) .with_file(partitioned_file) .build(); diff --git a/datafusion-examples/examples/csv_json_opener.rs b/datafusion-examples/examples/csv_json_opener.rs index 1a2c2cbff4183..8abed90238d40 100644 --- a/datafusion-examples/examples/csv_json_opener.rs +++ b/datafusion-examples/examples/csv_json_opener.rs @@ -60,7 +60,7 @@ async fn csv_opener() -> Result<()> { Arc::clone(&schema), Arc::new(CsvSource::default()), ) - .with_projection(Some(vec![12, 0])) + .with_projection_indices(Some(vec![12, 0])) .with_limit(Some(5)) .with_file(PartitionedFile::new(path.display().to_string(), 10)) .build(); @@ -126,7 +126,7 @@ async fn json_opener() -> Result<()> { schema, Arc::new(JsonSource::default()), ) - .with_projection(Some(vec![1, 0])) + .with_projection_indices(Some(vec![1, 0])) .with_limit(Some(5)) .with_file(PartitionedFile::new(path.to_string(), 10)) .build(); diff --git a/datafusion-examples/examples/default_column_values.rs b/datafusion-examples/examples/default_column_values.rs index 43e2d4ca09884..d3a7d2ec67f3c 100644 --- a/datafusion-examples/examples/default_column_values.rs +++ b/datafusion-examples/examples/default_column_values.rs @@ -260,7 +260,7 @@ impl TableProvider for DefaultValueTableProvider { self.schema.clone(), Arc::new(parquet_source), ) - .with_projection(projection.cloned()) + .with_projection_indices(projection.cloned()) .with_limit(limit) .with_file_group(file_group) .with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _)); diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs index afc3b279f4a9f..127c55da982c8 100644 --- a/datafusion-examples/examples/parquet_index.rs +++ b/datafusion-examples/examples/parquet_index.rs @@ -246,7 +246,7 @@ impl TableProvider for IndexTableProvider { let source = Arc::new(ParquetSource::default().with_predicate(predicate)); let mut file_scan_config_builder = FileScanConfigBuilder::new(object_store_url, self.schema(), source) - .with_projection(projection.cloned()) + .with_projection_indices(projection.cloned()) .with_limit(limit); // Transform to the format needed to pass to DataSourceExec diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index e9ac1bf097a22..95f9523d4401c 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -499,7 +499,7 @@ impl TableProvider for ListingTable { .with_file_groups(partitioned_file_lists) .with_constraints(self.constraints.clone()) .with_statistics(statistics) - .with_projection(projection) + .with_projection_indices(projection) .with_limit(limit) .with_output_ordering(output_ordering) .with_table_partition_cols(table_partition_cols) diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index e165707c2eb0e..4881783eeba69 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -90,7 +90,7 @@ pub(crate) mod test_util { ) .with_file_groups(file_groups) .with_statistics(statistics) - .with_projection(projection) + .with_projection_indices(projection) .with_limit(limit) .build(), ) diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index 8a00af959ccc9..9068c9758179d 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -88,7 +88,7 @@ mod tests { source, ) .with_file(meta.into()) - .with_projection(Some(vec![0, 1, 2])) + .with_projection_indices(Some(vec![0, 1, 2])) .build(); let source_exec = DataSourceExec::from_data_source(conf); @@ -160,7 +160,7 @@ mod tests { let source = Arc::new(AvroSource::new()); let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source) .with_file(meta.into()) - .with_projection(projection) + .with_projection_indices(projection) .build(); let source_exec = DataSourceExec::from_data_source(conf); @@ -231,7 +231,7 @@ mod tests { let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source) // select specific columns of the files as well as the partitioning // column which is supposed to be the last column in the table schema. - .with_projection(projection) + .with_projection_indices(projection) .with_file(partitioned_file) .with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)]) .build(); diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index b2ef51a76f89a..4f46a57d8b137 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -118,7 +118,7 @@ mod tests { )) .with_file_compression_type(file_compression_type) .with_newlines_in_values(false) - .with_projection(Some(vec![0, 2, 4])) + .with_projection_indices(Some(vec![0, 2, 4])) .build(); assert_eq!(13, config.file_schema().fields().len()); @@ -183,7 +183,7 @@ mod tests { )) .with_newlines_in_values(false) .with_file_compression_type(file_compression_type.to_owned()) - .with_projection(Some(vec![4, 0, 2])) + .with_projection_indices(Some(vec![4, 0, 2])) .build(); assert_eq!(13, config.file_schema().fields().len()); let csv = DataSourceExec::from_data_source(config); @@ -373,7 +373,7 @@ mod tests { .with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)]) // We should be able to project on the partition column // Which is supposed to be after the file fields - .with_projection(Some(vec![0, num_file_schema_fields])) + .with_projection_indices(Some(vec![0, num_file_schema_fields])) .build(); // we don't have `/date=xx/` in the path but that is ok because diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 0d45711c76fb0..f7d5c710bf48a 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -297,7 +297,7 @@ mod tests { let source = Arc::new(JsonSource::new()); let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source) .with_file_groups(file_groups) - .with_projection(Some(vec![0, 2])) + .with_projection_indices(Some(vec![0, 2])) .with_file_compression_type(file_compression_type.to_owned()) .build(); let exec = DataSourceExec::from_data_source(conf); @@ -345,7 +345,7 @@ mod tests { let source = Arc::new(JsonSource::new()); let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source) .with_file_groups(file_groups) - .with_projection(Some(vec![3, 0, 2])) + .with_projection_indices(Some(vec![3, 0, 2])) .with_file_compression_type(file_compression_type.to_owned()) .build(); let exec = DataSourceExec::from_data_source(conf); diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 10a475c1cc9a6..6df5cd7ac68ff 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -201,7 +201,7 @@ mod tests { source, ) .with_file_group(file_group) - .with_projection(self.projection.clone()) + .with_projection_indices(self.projection.clone()) .build(); DataSourceExec::from_data_source(base_config) } @@ -1655,7 +1655,7 @@ mod tests { let config = FileScanConfigBuilder::new(object_store_url, schema.clone(), source) .with_file(partitioned_file) // file has 10 cols so index 12 should be month and 13 should be day - .with_projection(Some(vec![0, 1, 2, 12, 13])) + .with_projection_indices(Some(vec![0, 1, 2, 12, 13])) .with_table_partition_cols(vec![ Field::new("year", DataType::Utf8, false), Field::new("month", DataType::UInt8, false), diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs index 59cbf4b0872ea..9be391a9108e6 100644 --- a/datafusion/core/tests/parquet/schema_coercion.rs +++ b/datafusion/core/tests/parquet/schema_coercion.rs @@ -126,7 +126,7 @@ async fn multi_parquet_coercion_projection() { Arc::new(ParquetSource::default()), ) .with_file_group(file_group) - .with_projection(Some(vec![1, 0, 2])) + .with_projection_indices(Some(vec![1, 0, 2])) .build(); let parquet_exec = DataSourceExec::from_data_source(config); diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index f05f3f00281d6..54e8e7bf04da5 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -165,7 +165,7 @@ impl FileSource for TestSource { fn with_projection(&self, config: &FileScanConfig) -> Arc { Arc::new(TestSource { - projection: config.projection.clone(), + projection: config.projection_exprs.as_ref().map(|p| p.column_indices()), ..self.clone() }) } diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs index c51a5e02c9c33..8631613c3925e 100644 --- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs @@ -390,7 +390,7 @@ fn create_simple_csv_exec() -> Arc { Arc::new(CsvSource::new(false, 0, 0)), ) .with_file(PartitionedFile::new("x".to_string(), 100)) - .with_projection(Some(vec![0, 1, 2, 3, 4])) + .with_projection_indices(Some(vec![0, 1, 2, 3, 4])) .build(); DataSourceExec::from_data_source(config) @@ -409,7 +409,7 @@ fn create_projecting_csv_exec() -> Arc { Arc::new(CsvSource::new(false, 0, 0)), ) .with_file(PartitionedFile::new("x".to_string(), 100)) - .with_projection(Some(vec![3, 2, 1])) + .with_projection_indices(Some(vec![3, 2, 1])) .build(); DataSourceExec::from_data_source(config) @@ -1596,7 +1596,7 @@ fn partitioned_data_source() -> Arc { ) .with_file(PartitionedFile::new("x".to_string(), 100)) .with_table_partition_cols(vec![Field::new("partition_col", DataType::Utf8, true)]) - .with_projection(Some(vec![0, 1, 2])) + .with_projection_indices(Some(vec![0, 1, 2])) .build(); DataSourceExec::from_data_source(config) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 695252803bae7..c52397d9a7cc6 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -44,18 +44,20 @@ use datafusion_execution::{ object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext, }; use datafusion_expr::Operator; -use datafusion_physical_expr::expressions::BinaryExpr; -use datafusion_physical_expr::{expressions::Column, utils::reassign_expr_columns}; +use datafusion_physical_expr::expressions::{BinaryExpr, Column}; +use datafusion_physical_expr::projection::ProjectionExprs; +use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr::{split_conjunction, EquivalenceProperties, Partitioning}; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::LexOrdering; -use datafusion_physical_plan::projection::ProjectionExpr; +use datafusion_physical_plan::projection::{ + all_alias_free_columns, new_projections_for_columns, ProjectionExpr, +}; use datafusion_physical_plan::{ display::{display_orderings, ProjectSchemaDisplay}, filter_pushdown::FilterPushdownPropagation, metrics::ExecutionPlanMetricsSet, - projection::{all_alias_free_columns, new_projections_for_columns}, DisplayAs, DisplayFormatType, }; use std::{ @@ -124,7 +126,7 @@ use log::{debug, warn}; /// let file_source = Arc::new(ParquetSource::new()); /// let config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source) /// .with_limit(Some(1000)) // read only the first 1000 records -/// .with_projection(Some(vec![2, 3])) // project columns 2 and 3 +/// .with_projection_indices(Some(vec![2, 3])) // project columns 2 and 3 /// // Read /tmp/file1.parquet with known size of 1234 bytes in a single group /// .with_file(PartitionedFile::new("file1.parquet", 1234)) /// // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes @@ -175,9 +177,12 @@ pub struct FileScanConfig { pub file_groups: Vec, /// Table constraints pub constraints: Constraints, - /// Columns on which to project the data. Indexes that are higher than the - /// number of columns of `file_schema` refer to `table_partition_cols`. - pub projection: Option>, + /// Physical expressions defining the projection to apply when reading data. + /// + /// Each expression in the projection can reference columns from both the file + /// schema and table partition columns. If `None`, all columns from the table + /// schema are projected. + pub projection_exprs: Option, /// The maximum number of records to read from this plan. If `None`, /// all records after filtering are returned. pub limit: Option, @@ -229,7 +234,7 @@ pub struct FileScanConfig { /// // Set a limit of 1000 rows /// .with_limit(Some(1000)) /// // Project only the first column -/// .with_projection(Some(vec![0])) +/// .with_projection_indices(Some(vec![0])) /// // Add partition columns /// .with_table_partition_cols(vec![ /// Field::new("date", DataType::Utf8, false), @@ -261,7 +266,7 @@ pub struct FileScanConfigBuilder { table_schema: TableSchema, file_source: Arc, limit: Option, - projection: Option>, + projection_indices: Option>, constraints: Option, file_groups: Vec, statistics: Option, @@ -294,7 +299,7 @@ impl FileScanConfigBuilder { file_compression_type: None, new_lines_in_values: None, limit: None, - projection: None, + projection_indices: None, constraints: None, batch_size: None, expr_adapter_factory: None, @@ -317,10 +322,25 @@ impl FileScanConfigBuilder { self } + pub fn table_schema(&self) -> &SchemaRef { + self.table_schema.table_schema() + } + /// Set the columns on which to project the data. Indexes that are higher than the /// number of columns of `file_schema` refer to `table_partition_cols`. - pub fn with_projection(mut self, projection: Option>) -> Self { - self.projection = projection; + /// + /// # Deprecated + /// Use [`Self::with_projection_indices`] instead. This method will be removed in a future release. + #[deprecated(since = "51.0.0", note = "Use with_projection_indices instead")] + pub fn with_projection(self, indices: Option>) -> Self { + self.with_projection_indices(indices) + } + + /// Set the columns on which to project the data using column indices. + /// + /// Indexes that are higher than the number of columns of `file_schema` refer to `table_partition_cols`. + pub fn with_projection_indices(mut self, indices: Option>) -> Self { + self.projection_indices = indices; self } @@ -433,7 +453,7 @@ impl FileScanConfigBuilder { table_schema, file_source, limit, - projection, + projection_indices, constraints, file_groups, statistics, @@ -455,12 +475,18 @@ impl FileScanConfigBuilder { file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED); let new_lines_in_values = new_lines_in_values.unwrap_or(false); + // Convert projection indices to ProjectionExprs using the final table schema + // (which now includes partition columns if they were added) + let projection_exprs = projection_indices.map(|indices| { + ProjectionExprs::from_indices(&indices, table_schema.table_schema()) + }); + FileScanConfig { object_store_url, table_schema, file_source, limit, - projection, + projection_exprs, constraints, file_groups, output_ordering, @@ -484,7 +510,9 @@ impl From for FileScanConfigBuilder { file_compression_type: Some(config.file_compression_type), new_lines_in_values: Some(config.new_lines_in_values), limit: config.limit, - projection: config.projection, + projection_indices: config + .projection_exprs + .map(|p| p.ordered_column_indices()), constraints: Some(config.constraints), batch_size: config.batch_size, expr_adapter_factory: config.expr_adapter_factory, @@ -673,15 +701,16 @@ impl DataSource for FileScanConfig { let new_projections = new_projections_for_columns( projection, &file_scan - .projection - .clone() + .projection_exprs + .as_ref() + .map(|p| p.ordered_column_indices()) .unwrap_or_else(|| (0..self.file_schema().fields().len()).collect()), ); Arc::new( FileScanConfigBuilder::from(file_scan) // Assign projected statistics to source - .with_projection(Some(new_projections)) + .with_projection_indices(Some(new_projections)) .with_source(source) .build(), ) as _ @@ -727,8 +756,8 @@ impl FileScanConfig { } fn projection_indices(&self) -> Vec { - match &self.projection { - Some(proj) => proj.clone(), + match &self.projection_exprs { + Some(proj) => proj.ordered_column_indices(), None => (0..self.file_schema().fields().len() + self.table_partition_cols().len()) .collect(), @@ -825,7 +854,7 @@ impl FileScanConfig { /// Project the schema, constraints, and the statistics on the given column indices pub fn project(&self) -> (SchemaRef, Constraints, Statistics, Vec) { - if self.projection.is_none() && self.table_partition_cols().is_empty() { + if self.projection_exprs.is_none() && self.table_partition_cols().is_empty() { return ( Arc::clone(self.file_schema()), self.constraints.clone(), @@ -844,12 +873,17 @@ impl FileScanConfig { } pub fn projected_file_column_names(&self) -> Option> { - self.projection.as_ref().map(|p| { - p.iter() - .filter(|col_idx| **col_idx < self.file_schema().fields().len()) - .map(|col_idx| self.file_schema().field(*col_idx).name()) + let fields = self.file_schema().fields(); + + self.projection_exprs.as_ref().map(|p| { + let column_indices = p.ordered_column_indices(); + + column_indices + .iter() + .filter(|&&col_i| col_i < fields.len()) + .map(|&col_i| self.file_schema().field(col_i).name()) .cloned() - .collect() + .collect::>() }) } @@ -875,11 +909,11 @@ impl FileScanConfig { } pub fn file_column_projection_indices(&self) -> Option> { - self.projection.as_ref().map(|p| { - p.iter() - .filter(|col_idx| **col_idx < self.file_schema().fields().len()) - .copied() - .collect() + self.projection_exprs.as_ref().map(|p| { + p.ordered_column_indices() + .into_iter() + .filter(|&i| i < self.file_schema().fields().len()) + .collect::>() }) } @@ -1415,10 +1449,15 @@ fn get_projected_output_ordering( return false; } + let indices = base_config + .projection_exprs + .as_ref() + .map(|p| p.ordered_column_indices()); + let statistics = match MinMaxStatistics::new_from_files( &new_ordering, projected_schema, - base_config.projection.as_deref(), + indices.as_deref(), group.iter(), ) { Ok(statistics) => statistics, @@ -1479,7 +1518,7 @@ mod tests { use datafusion_common::{assert_batches_eq, internal_err}; use datafusion_expr::{Operator, SortExpr}; use datafusion_physical_expr::create_physical_sort_expr; - use datafusion_physical_expr::expressions::{BinaryExpr, Literal}; + use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; /// Returns the column names on the schema @@ -2143,7 +2182,7 @@ mod tests { file_schema, Arc::new(MockSource::default()), ) - .with_projection(projection) + .with_projection_indices(projection) .with_statistics(statistics) .with_table_partition_cols(table_partition_cols) .build() @@ -2196,7 +2235,7 @@ mod tests { // Build with various configurations let config = builder .with_limit(Some(1000)) - .with_projection(Some(vec![0, 1])) + .with_projection_indices(Some(vec![0, 1])) .with_table_partition_cols(vec![Field::new( "date", wrap_partition_type_in_dict(DataType::Utf8), @@ -2219,7 +2258,10 @@ mod tests { assert_eq!(config.object_store_url, object_store_url); assert_eq!(*config.file_schema(), file_schema); assert_eq!(config.limit, Some(1000)); - assert_eq!(config.projection, Some(vec![0, 1])); + assert_eq!( + config.projection_exprs.as_ref().map(|p| p.column_indices()), + Some(vec![0, 1]) + ); assert_eq!(config.table_partition_cols().len(), 1); assert_eq!(config.table_partition_cols()[0].name(), "date"); assert_eq!(config.file_groups.len(), 1); @@ -2253,7 +2295,7 @@ mod tests { Arc::clone(&file_schema), Arc::clone(&file_source), ) - .with_projection(Some(vec![0, 1, 2])) + .with_projection_indices(Some(vec![0, 1, 2])) .build(); // Simulate projection being updated. Since the filter has already been pushed down, @@ -2302,7 +2344,10 @@ mod tests { assert_eq!(config.object_store_url, object_store_url); assert_eq!(*config.file_schema(), file_schema); assert_eq!(config.limit, None); - assert_eq!(config.projection, None); + assert_eq!( + config.projection_exprs.as_ref().map(|p| p.column_indices()), + None + ); assert!(config.table_partition_cols().is_empty()); assert!(config.file_groups.is_empty()); assert_eq!( @@ -2357,7 +2402,7 @@ mod tests { Arc::clone(&schema), Arc::clone(&file_source), ) - .with_projection(Some(vec![0, 2])) + .with_projection_indices(Some(vec![0, 2])) .with_limit(Some(10)) .with_table_partition_cols(partition_cols.clone()) .with_file(file.clone()) @@ -2375,7 +2420,13 @@ mod tests { let partition_cols = partition_cols.into_iter().map(Arc::new).collect::>(); assert_eq!(new_config.object_store_url, object_store_url); assert_eq!(*new_config.file_schema(), schema); - assert_eq!(new_config.projection, Some(vec![0, 2])); + assert_eq!( + new_config + .projection_exprs + .as_ref() + .map(|p| p.column_indices()), + Some(vec![0, 2]) + ); assert_eq!(new_config.limit, Some(10)); assert_eq!(*new_config.table_partition_cols(), partition_cols); assert_eq!(new_config.file_groups.len(), 1); @@ -2594,7 +2645,7 @@ mod tests { Arc::clone(&schema), Arc::new(MockSource::default()), ) - .with_projection(Some(vec![0, 2])) // Only project columns 0 and 2 + .with_projection_indices(Some(vec![0, 2])) // Only project columns 0 and 2 .with_file_groups(vec![file_group]) .build(); diff --git a/datafusion/datasource/src/table_schema.rs b/datafusion/datasource/src/table_schema.rs index 8e95585ce873b..863c123e3b1d2 100644 --- a/datafusion/datasource/src/table_schema.rs +++ b/datafusion/datasource/src/table_schema.rs @@ -132,6 +132,10 @@ impl TableSchema { table_partition_cols: Vec, ) -> TableSchema { self.table_partition_cols = table_partition_cols; + // Rebuild the table schema with the new partition columns + let mut builder = SchemaBuilder::from(self.file_schema.as_ref()); + builder.extend(self.table_partition_cols.iter().cloned()); + self.table_schema = Arc::new(builder.finish()); self } diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index e35bfbb3a20de..fc972d644e677 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -100,24 +100,24 @@ impl From for (Arc, String) { /// representing a complete projection operation and provides /// methods to manipulate and analyze the projection as a whole. #[derive(Debug, Clone)] -pub struct Projection { +pub struct ProjectionExprs { exprs: Vec, } -impl std::fmt::Display for Projection { +impl std::fmt::Display for ProjectionExprs { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let exprs: Vec = self.exprs.iter().map(|e| e.to_string()).collect(); write!(f, "Projection[{}]", exprs.join(", ")) } } -impl From> for Projection { +impl From> for ProjectionExprs { fn from(value: Vec) -> Self { Self { exprs: value } } } -impl From<&[ProjectionExpr]> for Projection { +impl From<&[ProjectionExpr]> for ProjectionExprs { fn from(value: &[ProjectionExpr]) -> Self { Self { exprs: value.to_vec(), @@ -125,15 +125,83 @@ impl From<&[ProjectionExpr]> for Projection { } } -impl AsRef<[ProjectionExpr]> for Projection { +impl FromIterator for ProjectionExprs { + fn from_iter>(exprs: T) -> Self { + Self { + exprs: exprs.into_iter().collect::>(), + } + } +} + +impl AsRef<[ProjectionExpr]> for ProjectionExprs { fn as_ref(&self) -> &[ProjectionExpr] { &self.exprs } } -impl Projection { - pub fn new(exprs: Vec) -> Self { - Self { exprs } +impl ProjectionExprs { + pub fn new(exprs: I) -> Self + where + I: IntoIterator, + { + Self { + exprs: exprs.into_iter().collect::>(), + } + } + + /// Creates a [`ProjectionExpr`] from a list of column indices. + /// + /// This is a convenience method for creating simple column-only projections, where each projection expression is a reference to a column + /// in the input schema. + /// + /// # Behavior + /// - Ordering: the output projection preserves the exact order of indices provided in the input slice + /// For example, `[2, 0, 1]` will produce projections for columns 2, 0, then 1 in that order + /// - Duplicates: Duplicate indices are allowed and will create multiple projection expressions referencing the same source column + /// For example, `[0, 0]` creates 2 separate projections both referencing column 0 + /// + /// # Panics + /// Panics if any index in `indices` is out of bounds for the provided schema. + /// + /// # Example + /// + /// ```rust + /// use std::sync::Arc; + /// use arrow::datatypes::{Schema, Field, DataType}; + /// use datafusion_physical_expr::projection::ProjectionExprs; + /// + /// // Create a schema with three columns + /// let schema = Arc::new(Schema::new(vec![ + /// Field::new("a", DataType::Int32, false), + /// Field::new("b", DataType::Utf8, false), + /// Field::new("c", DataType::Float64, false), + /// ])); + /// + /// // Project columns at indices 2 and 0 (c and a) - ordering is preserved + /// let projection = ProjectionExprs::from_indices(&[2, 0], &schema); + /// + /// // This creates: SELECT c@2 AS c, a@0 AS a + /// assert_eq!(projection.as_ref().len(), 2); + /// assert_eq!(projection.as_ref()[0].alias, "c"); + /// assert_eq!(projection.as_ref()[1].alias, "a"); + /// + /// // Duplicate indices are allowed + /// let projection_with_dups = ProjectionExprs::from_indices(&[0, 0, 1], &schema); + /// assert_eq!(projection_with_dups.as_ref().len(), 3); + /// assert_eq!(projection_with_dups.as_ref()[0].alias, "a"); + /// assert_eq!(projection_with_dups.as_ref()[1].alias, "a"); // duplicate + /// assert_eq!(projection_with_dups.as_ref()[2].alias, "b"); + /// ``` + pub fn from_indices(indices: &[usize], schema: &SchemaRef) -> Self { + let projection_exprs = indices.iter().map(|&i| { + let field = schema.field(i); + ProjectionExpr { + expr: Arc::new(Column::new(field.name(), i)), + alias: field.name().clone(), + } + }); + + Self::from_iter(projection_exprs) } /// Returns an iterator over the projection expressions @@ -167,7 +235,7 @@ impl Projection { /// /// ```rust /// use std::sync::Arc; - /// use datafusion_physical_expr::projection::{Projection, ProjectionExpr}; + /// use datafusion_physical_expr::projection::{ProjectionExprs, ProjectionExpr}; /// use datafusion_physical_expr::expressions::{Column, BinaryExpr, Literal}; /// use datafusion_common::{Result, ScalarValue}; /// use datafusion_expr::Operator; @@ -175,7 +243,7 @@ impl Projection { /// fn main() -> Result<()> { /// // Example from the docstring: /// // Base projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z - /// let base = Projection::new(vec![ + /// let base = ProjectionExprs::new(vec![ /// ProjectionExpr { /// expr: Arc::new(Column::new("c", 2)), /// alias: "x".to_string(), @@ -191,7 +259,7 @@ impl Projection { /// ]); /// /// // Top projection: SELECT x@0 + 1 AS c1, y@1 + z@2 AS c2 - /// let top = Projection::new(vec![ + /// let top = ProjectionExprs::new(vec![ /// ProjectionExpr { /// expr: Arc::new(BinaryExpr::new( /// Arc::new(Column::new("x", 0)), @@ -224,7 +292,7 @@ impl Projection { /// # Errors /// This function returns an error if any expression in the `other` projection cannot be /// applied on top of this projection. - pub fn try_merge(&self, other: &Projection) -> Result { + pub fn try_merge(&self, other: &ProjectionExprs) -> Result { let mut new_exprs = Vec::with_capacity(other.exprs.len()); for proj_expr in &other.exprs { let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)? @@ -240,7 +308,7 @@ impl Projection { alias: proj_expr.alias.clone(), }); } - Ok(Projection::new(new_exprs)) + Ok(ProjectionExprs::new(new_exprs)) } /// Extract the column indices used in this projection. @@ -256,6 +324,46 @@ impl Projection { .collect_vec() } + /// Extract the ordered column indices for a column-only projection. + /// + /// This function assumes that all expressions in the projection are simple column references. + /// It returns the column indices in the order they appear in the projection. + /// + /// # Panics + /// + /// Panics if any expression in the projection is not a simple column reference. This includes: + /// - Computed expressions (e.g., `a + 1`, `CAST(a AS INT)`) + /// - Function calls (e.g., `UPPER(name)`, `SUM(amount)`) + /// - Literals (e.g., `42`, `'hello'`) + /// - Complex nested expressions (e.g., `CASE WHEN ... THEN ... END`) + /// + /// # Returns + /// + /// A vector of column indices in projection order. Unlike [`column_indices()`](Self::column_indices), + /// this function: + /// - Preserves the projection order (does not sort) + /// - Preserves duplicates (does not deduplicate) + /// + /// # Example + /// + /// For a projection `SELECT c, a, c` where `a` is at index 0 and `c` is at index 2, + /// this function would return `[2, 0, 2]`. + /// + /// Use [`column_indices()`](Self::column_indices) instead if the projection may contain + /// non-column expressions or if you need a deduplicated sorted list. + pub fn ordered_column_indices(&self) -> Vec { + self.exprs + .iter() + .map(|e| { + e.expr + .as_any() + .downcast_ref::() + .expect("Expected column reference in projection") + .index() + }) + .collect() + } + /// Project a schema according to this projection. /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1, /// if the input schema is `[a: Int32, b: Int32, c: Int32]`, the output schema would be `[x: Int32, y: Int32]`. @@ -327,7 +435,7 @@ impl Projection { } } -impl<'a> IntoIterator for &'a Projection { +impl<'a> IntoIterator for &'a ProjectionExprs { type Item = &'a ProjectionExpr; type IntoIter = std::slice::Iter<'a, ProjectionExpr>; @@ -336,7 +444,7 @@ impl<'a> IntoIterator for &'a Projection { } } -impl IntoIterator for Projection { +impl IntoIterator for ProjectionExprs { type Item = ProjectionExpr; type IntoIter = std::vec::IntoIter; @@ -1570,7 +1678,7 @@ pub(crate) mod tests { let source = get_stats(); let schema = get_schema(); - let projection = Projection::new(vec![ + let projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("col1", 1)), alias: "col1".to_string(), @@ -1612,7 +1720,7 @@ pub(crate) mod tests { let source = get_stats(); let schema = get_schema(); - let projection = Projection::new(vec![ + let projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("col2", 2)), alias: "col2".to_string(), @@ -1663,7 +1771,7 @@ pub(crate) mod tests { alias: "b".to_string(), }, ]; - let projection = Projection::new(exprs.clone()); + let projection = ProjectionExprs::new(exprs.clone()); assert_eq!(projection.as_ref().len(), 2); Ok(()) } @@ -1674,7 +1782,7 @@ pub(crate) mod tests { expr: Arc::new(Column::new("x", 0)), alias: "x".to_string(), }]; - let projection: Projection = exprs.clone().into(); + let projection: ProjectionExprs = exprs.clone().into(); assert_eq!(projection.as_ref().len(), 1); Ok(()) } @@ -1691,7 +1799,7 @@ pub(crate) mod tests { alias: "col2".to_string(), }, ]; - let projection = Projection::new(exprs); + let projection = ProjectionExprs::new(exprs); let as_ref: &[ProjectionExpr] = projection.as_ref(); assert_eq!(as_ref.len(), 2); Ok(()) @@ -1700,7 +1808,7 @@ pub(crate) mod tests { #[test] fn test_column_indices_multiple_columns() -> Result<()> { // Test with reversed column order to ensure proper reordering - let projection = Projection::new(vec![ + let projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("c", 5)), alias: "c".to_string(), @@ -1722,7 +1830,7 @@ pub(crate) mod tests { #[test] fn test_column_indices_duplicates() -> Result<()> { // Test that duplicate column indices appear only once - let projection = Projection::new(vec![ + let projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("a", 1)), alias: "a".to_string(), @@ -1743,7 +1851,7 @@ pub(crate) mod tests { #[test] fn test_column_indices_unsorted() -> Result<()> { // Test that column indices are sorted in the output - let projection = Projection::new(vec![ + let projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("c", 5)), alias: "c".to_string(), @@ -1769,7 +1877,7 @@ pub(crate) mod tests { Operator::Plus, Arc::new(Column::new("b", 4)), )); - let projection = Projection::new(vec![ + let projection = ProjectionExprs::new(vec![ ProjectionExpr { expr, alias: "sum".to_string(), @@ -1786,7 +1894,7 @@ pub(crate) mod tests { #[test] fn test_column_indices_empty() -> Result<()> { - let projection = Projection::new(vec![]); + let projection = ProjectionExprs::new(vec![]); assert_eq!(projection.column_indices(), Vec::::new()); Ok(()) } @@ -1794,7 +1902,7 @@ pub(crate) mod tests { #[test] fn test_merge_simple_columns() -> Result<()> { // First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z - let base_projection = Projection::new(vec![ + let base_projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("c", 2)), alias: "x".to_string(), @@ -1810,7 +1918,7 @@ pub(crate) mod tests { ]); // Second projection: SELECT y@1 AS col2, x@0 AS col1 - let top_projection = Projection::new(vec![ + let top_projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("y", 1)), alias: "col2".to_string(), @@ -1831,7 +1939,7 @@ pub(crate) mod tests { #[test] fn test_merge_with_expressions() -> Result<()> { // First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z - let base_projection = Projection::new(vec![ + let base_projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("c", 2)), alias: "x".to_string(), @@ -1847,7 +1955,7 @@ pub(crate) mod tests { ]); // Second projection: SELECT y@1 + z@2 AS c2, x@0 + 1 AS c1 - let top_projection = Projection::new(vec![ + let top_projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(BinaryExpr::new( Arc::new(Column::new("y", 1)), @@ -1876,7 +1984,7 @@ pub(crate) mod tests { #[test] fn try_merge_error() { // Create a base projection - let base = Projection::new(vec![ + let base = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("a", 0)), alias: "x".to_string(), @@ -1888,7 +1996,7 @@ pub(crate) mod tests { ]); // Create a top projection that references a non-existent column index - let top = Projection::new(vec![ProjectionExpr { + let top = ProjectionExprs::new(vec![ProjectionExpr { expr: Arc::new(Column::new("z", 5)), // Invalid index alias: "result".to_string(), }]); @@ -1907,7 +2015,7 @@ pub(crate) mod tests { let input_schema = get_schema(); // Projection: SELECT col2 AS c, col0 AS a - let projection = Projection::new(vec![ + let projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("col2", 2)), alias: "c".to_string(), @@ -1940,7 +2048,7 @@ pub(crate) mod tests { let input_schema = get_schema(); // Projection: SELECT col0 + 1 AS incremented - let projection = Projection::new(vec![ProjectionExpr { + let projection = ProjectionExprs::new(vec![ProjectionExpr { expr: Arc::new(BinaryExpr::new( Arc::new(Column::new("col0", 0)), Operator::Plus, @@ -1974,7 +2082,7 @@ pub(crate) mod tests { ]); // Projection: SELECT col0 AS renamed - let projection = Projection::new(vec![ProjectionExpr { + let projection = ProjectionExprs::new(vec![ProjectionExpr { expr: Arc::new(Column::new("col0", 0)), alias: "renamed".to_string(), }]); @@ -1994,7 +2102,7 @@ pub(crate) mod tests { #[test] fn test_project_schema_empty() -> Result<()> { let input_schema = get_schema(); - let projection = Projection::new(vec![]); + let projection = ProjectionExprs::new(vec![]); let output_schema = projection.project_schema(&input_schema)?; @@ -2009,7 +2117,7 @@ pub(crate) mod tests { let input_schema = get_schema(); // Projection: SELECT col1 AS text, col0 AS num - let projection = Projection::new(vec![ + let projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("col1", 1)), alias: "text".to_string(), @@ -2057,7 +2165,7 @@ pub(crate) mod tests { let input_schema = get_schema(); // Projection with expression: SELECT col0 + 1 AS incremented, col1 AS text - let projection = Projection::new(vec![ + let projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(BinaryExpr::new( Arc::new(Column::new("col0", 0)), @@ -2105,7 +2213,7 @@ pub(crate) mod tests { let input_schema = get_schema(); // Projection with only primitive width columns: SELECT col2 AS f, col0 AS i - let projection = Projection::new(vec![ + let projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("col2", 2)), alias: "f".to_string(), @@ -2136,7 +2244,7 @@ pub(crate) mod tests { let input_stats = get_stats(); let input_schema = get_schema(); - let projection = Projection::new(vec![]); + let projection = ProjectionExprs::new(vec![]); let output_stats = projection.project_statistics(input_stats, &input_schema)?; diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 4dc88bc566310..2c84570b33d9d 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -53,7 +53,9 @@ use datafusion_physical_expr_common::physical_expr::{fmt_sql, PhysicalExprRef}; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; // Re-exported from datafusion-physical-expr for backwards compatibility // We recommend updating your imports to use datafusion-physical-expr directly -pub use datafusion_physical_expr::projection::{update_expr, Projection, ProjectionExpr}; +pub use datafusion_physical_expr::projection::{ + update_expr, ProjectionExpr, ProjectionExprs, +}; use futures::stream::{Stream, StreamExt}; use log::trace; @@ -65,7 +67,7 @@ use log::trace; #[derive(Debug, Clone)] pub struct ProjectionExec { /// The projection expressions stored as tuples of (expression, output column name) - projection: Projection, + projection: ProjectionExprs, /// The schema once the projection has been applied to the input schema: SchemaRef, /// The input plan @@ -130,7 +132,7 @@ impl ProjectionExec { let input_schema = input.schema(); // convert argument to Vec let expr_vec = expr.into_iter().map(Into::into).collect::>(); - let projection = Projection::new(expr_vec); + let projection = ProjectionExprs::new(expr_vec); let schema = Arc::new(projection.project_schema(&input_schema)?); diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 7c4b9e55b8137..2a3906d493476 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -545,7 +545,7 @@ pub fn parse_protobuf_file_scan_config( .with_file_groups(file_groups) .with_constraints(constraints) .with_statistics(statistics) - .with_projection(Some(projection)) + .with_projection_indices(Some(projection)) .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize)) .with_table_partition_cols(table_partition_cols) .with_output_ordering(output_ordering) diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 399c234191aa7..dc0a78dbccf11 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -532,9 +532,10 @@ pub fn serialize_file_scan_config( statistics: Some((&conf.file_source.statistics().unwrap()).into()), limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }), projection: conf - .projection + .projection_exprs .as_ref() - .unwrap_or(&(0..schema.fields().len()).collect::>()) + .map(|p| p.column_indices()) + .unwrap_or((0..schema.fields().len()).collect::>()) .iter() .map(|n| *n as u32) .collect(), diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index a0456e2031be0..c8b2bc02e447b 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -920,7 +920,7 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> { schema, file_source, ) - .with_projection(Some(vec![0, 1])) + .with_projection_indices(Some(vec![0, 1])) .with_file_group(FileGroup::new(vec![file_group])) .with_table_partition_cols(vec![Field::new( "part".to_string(), @@ -1814,7 +1814,7 @@ async fn roundtrip_projection_source() -> Result<()> { 1024, )])]) .with_statistics(statistics) - .with_projection(Some(vec![0, 1, 2])) + .with_projection_indices(Some(vec![0, 1, 2])) .build(); let filter = Arc::new( diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs index ecf465dd3f18d..45a19cea80cfc 100644 --- a/datafusion/substrait/src/physical_plan/consumer.rs +++ b/datafusion/substrait/src/physical_plan/consumer.rs @@ -151,8 +151,8 @@ pub async fn from_substrait_rel( .iter() .map(|item| item.field as usize) .collect(); - base_config_builder = - base_config_builder.with_projection(Some(column_indices)); + base_config_builder = base_config_builder + .with_projection_indices(Some(column_indices)); } } diff --git a/datafusion/substrait/src/physical_plan/producer.rs b/datafusion/substrait/src/physical_plan/producer.rs index 63abd14d6f5e1..20d41c2e6112a 100644 --- a/datafusion/substrait/src/physical_plan/producer.rs +++ b/datafusion/substrait/src/physical_plan/producer.rs @@ -92,11 +92,12 @@ pub fn to_substrait_rel( }; let mut select_struct = None; - if let Some(projection) = file_config.projection.as_ref() { + if let Some(projection) = file_config.projection_exprs.as_ref() { let struct_items = projection - .iter() + .column_indices() + .into_iter() .map(|index| StructItem { - field: *index as i32, + field: index as i32, // FIXME: duckdb sets this to None, but it's not clear why. // https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1191 child: None, diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 4174fef7a6922..c568b8b28e1f9 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -125,6 +125,57 @@ Users may need to update their paths to account for these changes. See [issue #17713] for more details. +### `FileScanConfig::projection` renamed to `FileScanConfig::projection_exprs` + +The `projection` field in `FileScanConfig` has been renamed to `projection_exprs` and its type has changed from `Option>` to `Option`. This change enables more powerful projection pushdown capabilities by supporting arbitrary physical expressions rather than just column indices. + +**Impact on direct field access:** + +If you directly access the `projection` field: + +```rust +# /* comment to avoid running +let config: FileScanConfig = ...; +let projection = config.projection; +# */ +``` + +You should update to: + +```rust +# /* comment to avoid running +let config: FileScanConfig = ...; +let projection_exprs = config.projection_exprs; +# */ +``` + +**Impact on builders:** + +The `FileScanConfigBuilder::with_projection()` method has been deprecated in favor of `with_projection_indices()`: + +```diff +let config = FileScanConfigBuilder::new(url, schema, file_source) +- .with_projection(Some(vec![0, 2, 3])) ++ .with_projection_indices(Some(vec![0, 2, 3])) + .build(); +``` + +Note: `with_projection()` still works but is deprecated and will be removed in a future release. + +**What is `ProjectionExprs`?** + +`ProjectionExprs` is a new type that represents a list of physical expressions for projection. While it can be constructed from column indices (which is what `with_projection_indices` does internally), it also supports arbitrary physical expressions, enabling advanced features like expression evaluation during scanning. + +You can access column indices from `ProjectionExprs` using its methods if needed: + +```rust +# /* comment to avoid running +let projection_exprs: ProjectionExprs = ...; +// Get the column indices if the projection only contains simple column references +let indices = projection_exprs.column_indices(); +# */ +``` + ### `DESCRIBE query` support `DESCRIBE query` was previously an alias for `EXPLAIN query`, which outputs the