From 7441afe794135eadfa9a629148bb34e60d2a94e8 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 22 Oct 2025 15:31:54 -0500 Subject: [PATCH 01/10] Have FileScanConfig own table_schema --- datafusion/datasource/src/file_scan_config.rs | 42 +++++++++---------- datafusion/datasource/src/table_schema.rs | 14 +++++++ 2 files changed, 34 insertions(+), 22 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index d557a99274eab..4dfb6a4ec3d33 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -156,6 +156,11 @@ pub struct FileScanConfig { /// Schema information including the file schema, table partition columns, /// and the combined table schema. /// + /// The table schema (file schema + partition columns) is the schema exposed + /// upstream of [`FileScanConfig`] (e.g. in [`DataSourceExec`]). + /// + /// See [`TableSchema`] for more information. + /// /// [`DataSourceExec`]: crate::source::DataSourceExec pub table_schema: TableSchema, /// List of files to be processed, grouped into partitions @@ -244,23 +249,19 @@ pub struct FileScanConfig { #[derive(Clone)] pub struct FileScanConfigBuilder { object_store_url: ObjectStoreUrl, - /// Table schema before any projections or partition columns are applied. + /// Schema information including the file schema, table partition columns, + /// and the combined table schema. /// - /// This schema is used to read the files, but is **not** necessarily the - /// schema of the physical files. Rather this is the schema that the + /// This schema is used to read the files, but the file schema is **not** necessarily + /// the schema of the physical files. Rather this is the schema that the /// physical file schema will be mapped onto, and the schema that the /// [`DataSourceExec`] will return. /// - /// This is usually the same as the table schema as specified by the `TableProvider` minus any partition columns. - /// - /// This probably would be better named `table_schema` - /// /// [`DataSourceExec`]: crate::source::DataSourceExec - file_schema: SchemaRef, + table_schema: TableSchema, file_source: Arc, limit: Option, projection: Option>, - table_partition_cols: Vec, constraints: Option, file_groups: Vec, statistics: Option, @@ -285,7 +286,7 @@ impl FileScanConfigBuilder { ) -> Self { Self { object_store_url, - file_schema, + table_schema: TableSchema::from_file_schema(file_schema), file_source, file_groups: vec![], statistics: None, @@ -294,7 +295,6 @@ impl FileScanConfigBuilder { new_lines_in_values: None, limit: None, projection: None, - table_partition_cols: vec![], constraints: None, batch_size: None, expr_adapter_factory: None, @@ -326,10 +326,13 @@ impl FileScanConfigBuilder { /// Set the partitioning columns pub fn with_table_partition_cols(mut self, table_partition_cols: Vec) -> Self { - self.table_partition_cols = table_partition_cols + let table_partition_cols: Vec = table_partition_cols .into_iter() .map(|f| Arc::new(f) as FieldRef) .collect(); + self.table_schema = self + .table_schema + .with_table_partition_cols(table_partition_cols); self } @@ -427,11 +430,10 @@ impl FileScanConfigBuilder { pub fn build(self) -> FileScanConfig { let Self { object_store_url, - file_schema, + table_schema, file_source, limit, projection, - table_partition_cols, constraints, file_groups, statistics, @@ -443,19 +445,16 @@ impl FileScanConfigBuilder { } = self; let constraints = constraints.unwrap_or_default(); - let statistics = - statistics.unwrap_or_else(|| Statistics::new_unknown(&file_schema)); + let statistics = statistics + .unwrap_or_else(|| Statistics::new_unknown(table_schema.file_schema())); let file_source = file_source .with_statistics(statistics.clone()) - .with_schema(Arc::clone(&file_schema)); + .with_schema(Arc::clone(table_schema.file_schema())); let file_compression_type = file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED); let new_lines_in_values = new_lines_in_values.unwrap_or(false); - // Create TableSchema from file_schema and table_partition_cols - let table_schema = TableSchema::new(file_schema, table_partition_cols); - FileScanConfig { object_store_url, table_schema, @@ -477,7 +476,7 @@ impl From for FileScanConfigBuilder { fn from(config: FileScanConfig) -> Self { Self { object_store_url: config.object_store_url, - file_schema: Arc::clone(config.table_schema.file_schema()), + table_schema: config.table_schema, file_source: Arc::::clone(&config.file_source), file_groups: config.file_groups, statistics: config.file_source.statistics().ok(), @@ -486,7 +485,6 @@ impl From for FileScanConfigBuilder { new_lines_in_values: Some(config.new_lines_in_values), limit: config.limit, projection: config.projection, - table_partition_cols: config.table_schema.table_partition_cols().clone(), constraints: Some(config.constraints), batch_size: config.batch_size, expr_adapter_factory: config.expr_adapter_factory, diff --git a/datafusion/datasource/src/table_schema.rs b/datafusion/datasource/src/table_schema.rs index 9413bd9ef20bf..8e95585ce873b 100644 --- a/datafusion/datasource/src/table_schema.rs +++ b/datafusion/datasource/src/table_schema.rs @@ -121,6 +121,20 @@ impl TableSchema { } } + /// Create a new TableSchema from a file schema with no partition columns. + pub fn from_file_schema(file_schema: SchemaRef) -> Self { + Self::new(file_schema, vec![]) + } + + /// Set the table partition columns and rebuild the table schema. + pub fn with_table_partition_cols( + mut self, + table_partition_cols: Vec, + ) -> TableSchema { + self.table_partition_cols = table_partition_cols; + self + } + /// Get the file schema (without partition columns). /// /// This is the schema of the actual data files on disk. From 2621f4e7b370de420d35b71dc0ddc341012c6ed8 Mon Sep 17 00:00:00 2001 From: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com> Date: Thu, 23 Oct 2025 12:37:13 -0400 Subject: [PATCH 02/10] Nit: rename Projection -> ProjectionExprs --- datafusion/physical-expr/src/projection.rs | 68 +++++++++++----------- datafusion/physical-plan/src/projection.rs | 8 ++- 2 files changed, 39 insertions(+), 37 deletions(-) diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index e35bfbb3a20de..147e976fad3f4 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -100,24 +100,24 @@ impl From for (Arc, String) { /// representing a complete projection operation and provides /// methods to manipulate and analyze the projection as a whole. #[derive(Debug, Clone)] -pub struct Projection { +pub struct ProjectionExprs { exprs: Vec, } -impl std::fmt::Display for Projection { +impl std::fmt::Display for ProjectionExprs { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let exprs: Vec = self.exprs.iter().map(|e| e.to_string()).collect(); write!(f, "Projection[{}]", exprs.join(", ")) } } -impl From> for Projection { +impl From> for ProjectionExprs { fn from(value: Vec) -> Self { Self { exprs: value } } } -impl From<&[ProjectionExpr]> for Projection { +impl From<&[ProjectionExpr]> for ProjectionExprs { fn from(value: &[ProjectionExpr]) -> Self { Self { exprs: value.to_vec(), @@ -125,13 +125,13 @@ impl From<&[ProjectionExpr]> for Projection { } } -impl AsRef<[ProjectionExpr]> for Projection { +impl AsRef<[ProjectionExpr]> for ProjectionExprs { fn as_ref(&self) -> &[ProjectionExpr] { &self.exprs } } -impl Projection { +impl ProjectionExprs { pub fn new(exprs: Vec) -> Self { Self { exprs } } @@ -224,7 +224,7 @@ impl Projection { /// # Errors /// This function returns an error if any expression in the `other` projection cannot be /// applied on top of this projection. - pub fn try_merge(&self, other: &Projection) -> Result { + pub fn try_merge(&self, other: &ProjectionExprs) -> Result { let mut new_exprs = Vec::with_capacity(other.exprs.len()); for proj_expr in &other.exprs { let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)? @@ -240,7 +240,7 @@ impl Projection { alias: proj_expr.alias.clone(), }); } - Ok(Projection::new(new_exprs)) + Ok(ProjectionExprs::new(new_exprs)) } /// Extract the column indices used in this projection. @@ -327,7 +327,7 @@ impl Projection { } } -impl<'a> IntoIterator for &'a Projection { +impl<'a> IntoIterator for &'a ProjectionExprs { type Item = &'a ProjectionExpr; type IntoIter = std::slice::Iter<'a, ProjectionExpr>; @@ -336,7 +336,7 @@ impl<'a> IntoIterator for &'a Projection { } } -impl IntoIterator for Projection { +impl IntoIterator for ProjectionExprs { type Item = ProjectionExpr; type IntoIter = std::vec::IntoIter; @@ -1570,7 +1570,7 @@ pub(crate) mod tests { let source = get_stats(); let schema = get_schema(); - let projection = Projection::new(vec![ + let projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("col1", 1)), alias: "col1".to_string(), @@ -1612,7 +1612,7 @@ pub(crate) mod tests { let source = get_stats(); let schema = get_schema(); - let projection = Projection::new(vec![ + let projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("col2", 2)), alias: "col2".to_string(), @@ -1663,7 +1663,7 @@ pub(crate) mod tests { alias: "b".to_string(), }, ]; - let projection = Projection::new(exprs.clone()); + let projection = ProjectionExprs::new(exprs.clone()); assert_eq!(projection.as_ref().len(), 2); Ok(()) } @@ -1674,7 +1674,7 @@ pub(crate) mod tests { expr: Arc::new(Column::new("x", 0)), alias: "x".to_string(), }]; - let projection: Projection = exprs.clone().into(); + let projection: ProjectionExprs = exprs.clone().into(); assert_eq!(projection.as_ref().len(), 1); Ok(()) } @@ -1691,7 +1691,7 @@ pub(crate) mod tests { alias: "col2".to_string(), }, ]; - let projection = Projection::new(exprs); + let projection = ProjectionExprs::new(exprs); let as_ref: &[ProjectionExpr] = projection.as_ref(); assert_eq!(as_ref.len(), 2); Ok(()) @@ -1700,7 +1700,7 @@ pub(crate) mod tests { #[test] fn test_column_indices_multiple_columns() -> Result<()> { // Test with reversed column order to ensure proper reordering - let projection = Projection::new(vec![ + let projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("c", 5)), alias: "c".to_string(), @@ -1722,7 +1722,7 @@ pub(crate) mod tests { #[test] fn test_column_indices_duplicates() -> Result<()> { // Test that duplicate column indices appear only once - let projection = Projection::new(vec![ + let projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("a", 1)), alias: "a".to_string(), @@ -1743,7 +1743,7 @@ pub(crate) mod tests { #[test] fn test_column_indices_unsorted() -> Result<()> { // Test that column indices are sorted in the output - let projection = Projection::new(vec![ + let projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("c", 5)), alias: "c".to_string(), @@ -1769,7 +1769,7 @@ pub(crate) mod tests { Operator::Plus, Arc::new(Column::new("b", 4)), )); - let projection = Projection::new(vec![ + let projection = ProjectionExprs::new(vec![ ProjectionExpr { expr, alias: "sum".to_string(), @@ -1786,7 +1786,7 @@ pub(crate) mod tests { #[test] fn test_column_indices_empty() -> Result<()> { - let projection = Projection::new(vec![]); + let projection = ProjectionExprs::new(vec![]); assert_eq!(projection.column_indices(), Vec::::new()); Ok(()) } @@ -1794,7 +1794,7 @@ pub(crate) mod tests { #[test] fn test_merge_simple_columns() -> Result<()> { // First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z - let base_projection = Projection::new(vec![ + let base_projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("c", 2)), alias: "x".to_string(), @@ -1810,7 +1810,7 @@ pub(crate) mod tests { ]); // Second projection: SELECT y@1 AS col2, x@0 AS col1 - let top_projection = Projection::new(vec![ + let top_projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("y", 1)), alias: "col2".to_string(), @@ -1831,7 +1831,7 @@ pub(crate) mod tests { #[test] fn test_merge_with_expressions() -> Result<()> { // First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z - let base_projection = Projection::new(vec![ + let base_projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("c", 2)), alias: "x".to_string(), @@ -1847,7 +1847,7 @@ pub(crate) mod tests { ]); // Second projection: SELECT y@1 + z@2 AS c2, x@0 + 1 AS c1 - let top_projection = Projection::new(vec![ + let top_projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(BinaryExpr::new( Arc::new(Column::new("y", 1)), @@ -1876,7 +1876,7 @@ pub(crate) mod tests { #[test] fn try_merge_error() { // Create a base projection - let base = Projection::new(vec![ + let base = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("a", 0)), alias: "x".to_string(), @@ -1888,7 +1888,7 @@ pub(crate) mod tests { ]); // Create a top projection that references a non-existent column index - let top = Projection::new(vec![ProjectionExpr { + let top = ProjectionExprs::new(vec![ProjectionExpr { expr: Arc::new(Column::new("z", 5)), // Invalid index alias: "result".to_string(), }]); @@ -1907,7 +1907,7 @@ pub(crate) mod tests { let input_schema = get_schema(); // Projection: SELECT col2 AS c, col0 AS a - let projection = Projection::new(vec![ + let projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("col2", 2)), alias: "c".to_string(), @@ -1940,7 +1940,7 @@ pub(crate) mod tests { let input_schema = get_schema(); // Projection: SELECT col0 + 1 AS incremented - let projection = Projection::new(vec![ProjectionExpr { + let projection = ProjectionExprs::new(vec![ProjectionExpr { expr: Arc::new(BinaryExpr::new( Arc::new(Column::new("col0", 0)), Operator::Plus, @@ -1974,7 +1974,7 @@ pub(crate) mod tests { ]); // Projection: SELECT col0 AS renamed - let projection = Projection::new(vec![ProjectionExpr { + let projection = ProjectionExprs::new(vec![ProjectionExpr { expr: Arc::new(Column::new("col0", 0)), alias: "renamed".to_string(), }]); @@ -1994,7 +1994,7 @@ pub(crate) mod tests { #[test] fn test_project_schema_empty() -> Result<()> { let input_schema = get_schema(); - let projection = Projection::new(vec![]); + let projection = ProjectionExprs::new(vec![]); let output_schema = projection.project_schema(&input_schema)?; @@ -2009,7 +2009,7 @@ pub(crate) mod tests { let input_schema = get_schema(); // Projection: SELECT col1 AS text, col0 AS num - let projection = Projection::new(vec![ + let projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("col1", 1)), alias: "text".to_string(), @@ -2057,7 +2057,7 @@ pub(crate) mod tests { let input_schema = get_schema(); // Projection with expression: SELECT col0 + 1 AS incremented, col1 AS text - let projection = Projection::new(vec![ + let projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(BinaryExpr::new( Arc::new(Column::new("col0", 0)), @@ -2105,7 +2105,7 @@ pub(crate) mod tests { let input_schema = get_schema(); // Projection with only primitive width columns: SELECT col2 AS f, col0 AS i - let projection = Projection::new(vec![ + let projection = ProjectionExprs::new(vec![ ProjectionExpr { expr: Arc::new(Column::new("col2", 2)), alias: "f".to_string(), @@ -2136,7 +2136,7 @@ pub(crate) mod tests { let input_stats = get_stats(); let input_schema = get_schema(); - let projection = Projection::new(vec![]); + let projection = ProjectionExprs::new(vec![]); let output_stats = projection.project_statistics(input_stats, &input_schema)?; diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 4dc88bc566310..2c84570b33d9d 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -53,7 +53,9 @@ use datafusion_physical_expr_common::physical_expr::{fmt_sql, PhysicalExprRef}; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; // Re-exported from datafusion-physical-expr for backwards compatibility // We recommend updating your imports to use datafusion-physical-expr directly -pub use datafusion_physical_expr::projection::{update_expr, Projection, ProjectionExpr}; +pub use datafusion_physical_expr::projection::{ + update_expr, ProjectionExpr, ProjectionExprs, +}; use futures::stream::{Stream, StreamExt}; use log::trace; @@ -65,7 +67,7 @@ use log::trace; #[derive(Debug, Clone)] pub struct ProjectionExec { /// The projection expressions stored as tuples of (expression, output column name) - projection: Projection, + projection: ProjectionExprs, /// The schema once the projection has been applied to the input schema: SchemaRef, /// The input plan @@ -130,7 +132,7 @@ impl ProjectionExec { let input_schema = input.schema(); // convert argument to Vec let expr_vec = expr.into_iter().map(Into::into).collect::>(); - let projection = Projection::new(expr_vec); + let projection = ProjectionExprs::new(expr_vec); let schema = Arc::new(projection.project_schema(&input_schema)?); From 2d1ad86bb475d7a44b4af59a53d3e78e2145e56d Mon Sep 17 00:00:00 2001 From: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com> Date: Thu, 23 Oct 2025 12:40:16 -0400 Subject: [PATCH 03/10] Have FileScanConfig own a ProjectionExprs --- .../filter_pushdown/util.rs | 2 +- datafusion/datasource/src/file_scan_config.rs | 74 +++++++++++++------ datafusion/datasource/src/table_schema.rs | 4 + datafusion/physical-expr/src/projection.rs | 45 ++++++++++- .../proto/src/physical_plan/to_proto.rs | 3 +- .../substrait/src/physical_plan/producer.rs | 5 +- 6 files changed, 103 insertions(+), 30 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index f05f3f00281d6..50a7d3af2c540 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -165,7 +165,7 @@ impl FileSource for TestSource { fn with_projection(&self, config: &FileScanConfig) -> Arc { Arc::new(TestSource { - projection: config.projection.clone(), + projection: config.projection.as_ref().map(|p| p.column_indices()), ..self.clone() }) } diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 4dfb6a4ec3d33..83120dd991979 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -44,18 +44,20 @@ use datafusion_execution::{ object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext, }; use datafusion_expr::Operator; -use datafusion_physical_expr::expressions::BinaryExpr; -use datafusion_physical_expr::{expressions::Column, utils::reassign_expr_columns}; +use datafusion_physical_expr::expressions::{BinaryExpr, Column}; +use datafusion_physical_expr::projection::ProjectionExprs; +use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr::{split_conjunction, EquivalenceProperties, Partitioning}; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::LexOrdering; -use datafusion_physical_plan::projection::ProjectionExpr; +use datafusion_physical_plan::projection::{ + all_alias_free_columns, new_projections_for_columns, ProjectionExpr, +}; use datafusion_physical_plan::{ display::{display_orderings, ProjectSchemaDisplay}, filter_pushdown::FilterPushdownPropagation, metrics::ExecutionPlanMetricsSet, - projection::{all_alias_free_columns, new_projections_for_columns}, DisplayAs, DisplayFormatType, }; use std::{ @@ -177,7 +179,7 @@ pub struct FileScanConfig { pub constraints: Constraints, /// Columns on which to project the data. Indexes that are higher than the /// number of columns of `file_schema` refer to `table_partition_cols`. - pub projection: Option>, + pub projection: Option, /// The maximum number of records to read from this plan. If `None`, /// all records after filtering are returned. pub limit: Option, @@ -317,10 +319,14 @@ impl FileScanConfigBuilder { self } + pub fn table_schema(&self) -> &SchemaRef { + self.table_schema.table_schema() + } + /// Set the columns on which to project the data. Indexes that are higher than the /// number of columns of `file_schema` refer to `table_partition_cols`. - pub fn with_projection(mut self, projection: Option>) -> Self { - self.projection = projection; + pub fn with_projection(mut self, indices: Option>) -> Self { + self.projection = indices; self } @@ -455,6 +461,10 @@ impl FileScanConfigBuilder { file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED); let new_lines_in_values = new_lines_in_values.unwrap_or(false); + let projection = projection.as_ref().map(|indices| { + ProjectionExprs::from_indices(indices, table_schema.table_schema()) + }); + FileScanConfig { object_store_url, table_schema, @@ -484,7 +494,7 @@ impl From for FileScanConfigBuilder { file_compression_type: Some(config.file_compression_type), new_lines_in_values: Some(config.new_lines_in_values), limit: config.limit, - projection: config.projection, + projection: config.projection.map(|p| p.ordered_column_indices()), constraints: Some(config.constraints), batch_size: config.batch_size, expr_adapter_factory: config.expr_adapter_factory, @@ -643,7 +653,8 @@ impl DataSource for FileScanConfig { projection, &file_scan .projection - .clone() + .as_ref() + .map(|p| p.ordered_column_indices()) .unwrap_or_else(|| (0..self.file_schema().fields().len()).collect()), ); @@ -697,7 +708,7 @@ impl FileScanConfig { fn projection_indices(&self) -> Vec { match &self.projection { - Some(proj) => proj.clone(), + Some(proj) => proj.ordered_column_indices(), None => (0..self.file_schema().fields().len() + self.table_partition_cols().len()) .collect(), @@ -813,12 +824,18 @@ impl FileScanConfig { } pub fn projected_file_column_names(&self) -> Option> { + let fields = self.file_schema().fields(); + self.projection.as_ref().map(|p| { - p.iter() - .filter(|col_idx| **col_idx < self.file_schema().fields().len()) - .map(|col_idx| self.file_schema().field(*col_idx).name()) + let column_indicies = p.ordered_column_indices(); + + column_indicies + .iter() + .filter_map(|&col_i| { + (col_i < fields.len()).then(|| self.file_schema().field(col_i).name()) + }) .cloned() - .collect() + .collect::>() }) } @@ -845,10 +862,10 @@ impl FileScanConfig { pub fn file_column_projection_indices(&self) -> Option> { self.projection.as_ref().map(|p| { - p.iter() - .filter(|col_idx| **col_idx < self.file_schema().fields().len()) - .copied() - .collect() + p.ordered_column_indices() + .into_iter() + .filter(|&i| i < self.file_schema().fields().len()) + .collect::>() }) } @@ -1384,10 +1401,15 @@ fn get_projected_output_ordering( return false; } + let indices = base_config + .projection + .as_ref() + .map(|p| p.ordered_column_indices()); + let statistics = match MinMaxStatistics::new_from_files( &new_ordering, projected_schema, - base_config.projection.as_deref(), + indices.as_deref(), group.iter(), ) { Ok(statistics) => statistics, @@ -1448,7 +1470,7 @@ mod tests { use datafusion_common::{assert_batches_eq, internal_err}; use datafusion_expr::{Operator, SortExpr}; use datafusion_physical_expr::create_physical_sort_expr; - use datafusion_physical_expr::expressions::{BinaryExpr, Literal}; + use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; /// Returns the column names on the schema @@ -2188,7 +2210,10 @@ mod tests { assert_eq!(config.object_store_url, object_store_url); assert_eq!(*config.file_schema(), file_schema); assert_eq!(config.limit, Some(1000)); - assert_eq!(config.projection, Some(vec![0, 1])); + assert_eq!( + config.projection.as_ref().map(|p| p.column_indices()), + Some(vec![0, 1]) + ); assert_eq!(config.table_partition_cols().len(), 1); assert_eq!(config.table_partition_cols()[0].name(), "date"); assert_eq!(config.file_groups.len(), 1); @@ -2271,7 +2296,7 @@ mod tests { assert_eq!(config.object_store_url, object_store_url); assert_eq!(*config.file_schema(), file_schema); assert_eq!(config.limit, None); - assert_eq!(config.projection, None); + assert_eq!(config.projection.as_ref().map(|p| p.column_indices()), None); assert!(config.table_partition_cols().is_empty()); assert!(config.file_groups.is_empty()); assert_eq!( @@ -2344,7 +2369,10 @@ mod tests { let partition_cols = partition_cols.into_iter().map(Arc::new).collect::>(); assert_eq!(new_config.object_store_url, object_store_url); assert_eq!(*new_config.file_schema(), schema); - assert_eq!(new_config.projection, Some(vec![0, 2])); + assert_eq!( + new_config.projection.as_ref().map(|p| p.column_indices()), + Some(vec![0, 2]) + ); assert_eq!(new_config.limit, Some(10)); assert_eq!(*new_config.table_partition_cols(), partition_cols); assert_eq!(new_config.file_groups.len(), 1); diff --git a/datafusion/datasource/src/table_schema.rs b/datafusion/datasource/src/table_schema.rs index 8e95585ce873b..863c123e3b1d2 100644 --- a/datafusion/datasource/src/table_schema.rs +++ b/datafusion/datasource/src/table_schema.rs @@ -132,6 +132,10 @@ impl TableSchema { table_partition_cols: Vec, ) -> TableSchema { self.table_partition_cols = table_partition_cols; + // Rebuild the table schema with the new partition columns + let mut builder = SchemaBuilder::from(self.file_schema.as_ref()); + builder.extend(self.table_partition_cols.iter().cloned()); + self.table_schema = Arc::new(builder.finish()); self } diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index 147e976fad3f4..3443d72ebde06 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -125,6 +125,14 @@ impl From<&[ProjectionExpr]> for ProjectionExprs { } } +impl FromIterator for ProjectionExprs { + fn from_iter>(exprs: T) -> Self { + Self { + exprs: exprs.into_iter().collect::>(), + } + } +} + impl AsRef<[ProjectionExpr]> for ProjectionExprs { fn as_ref(&self) -> &[ProjectionExpr] { &self.exprs @@ -132,8 +140,25 @@ impl AsRef<[ProjectionExpr]> for ProjectionExprs { } impl ProjectionExprs { - pub fn new(exprs: Vec) -> Self { - Self { exprs } + pub fn new(exprs: I) -> Self + where + I: IntoIterator, + { + Self { + exprs: exprs.into_iter().collect::>(), + } + } + + pub fn from_indices(indices: &[usize], schema: &SchemaRef) -> Self { + let projection_exprs = indices.into_iter().map(|&i| { + let field = schema.field(i); + ProjectionExpr { + expr: Arc::new(Column::new(field.name(), i)), + alias: field.name().clone(), + } + }); + + Self::from_iter(projection_exprs) } /// Returns an iterator over the projection expressions @@ -251,11 +276,25 @@ impl ProjectionExprs { self.exprs .iter() .flat_map(|e| collect_columns(&e.expr).into_iter().map(|col| col.index())) - .sorted_unstable() .dedup() + .sorted_unstable() .collect_vec() } + /// Extract the ordered column indices for a column-only projection. + pub fn ordered_column_indices(&self) -> Vec { + self.exprs + .iter() + .map(|e| { + e.expr + .as_any() + .downcast_ref::() + .expect("Expected column reference in projection") + .index() + }) + .collect() + } + /// Project a schema according to this projection. /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1, /// if the input schema is `[a: Int32, b: Int32, c: Int32]`, the output schema would be `[x: Int32, y: Int32]`. diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 399c234191aa7..02c612ee37743 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -534,7 +534,8 @@ pub fn serialize_file_scan_config( projection: conf .projection .as_ref() - .unwrap_or(&(0..schema.fields().len()).collect::>()) + .map(|p| p.column_indices()) + .unwrap_or((0..schema.fields().len()).collect::>()) .iter() .map(|n| *n as u32) .collect(), diff --git a/datafusion/substrait/src/physical_plan/producer.rs b/datafusion/substrait/src/physical_plan/producer.rs index 63abd14d6f5e1..dab0548de3886 100644 --- a/datafusion/substrait/src/physical_plan/producer.rs +++ b/datafusion/substrait/src/physical_plan/producer.rs @@ -94,9 +94,10 @@ pub fn to_substrait_rel( let mut select_struct = None; if let Some(projection) = file_config.projection.as_ref() { let struct_items = projection - .iter() + .column_indices() + .into_iter() .map(|index| StructItem { - field: *index as i32, + field: index as i32, // FIXME: duckdb sets this to None, but it's not clear why. // https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1191 child: None, From 9e4344ac29668f7902ac835200a7c5fc0ca2e6a9 Mon Sep 17 00:00:00 2001 From: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com> Date: Thu, 23 Oct 2025 15:45:09 -0400 Subject: [PATCH 04/10] Fix documentation, typos, clippy --- datafusion/datasource/src/file_scan_config.rs | 26 ++++++++++--------- datafusion/physical-expr/src/projection.rs | 10 +++---- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 83120dd991979..8aa14ad964558 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -177,8 +177,11 @@ pub struct FileScanConfig { pub file_groups: Vec, /// Table constraints pub constraints: Constraints, - /// Columns on which to project the data. Indexes that are higher than the - /// number of columns of `file_schema` refer to `table_partition_cols`. + /// Physical expressions defining the projection to apply when reading data. + /// + /// Each expression in the projection can reference columns from both the file + /// schema and table partition columns. If `None`, all columns from the table + /// schema are projected. pub projection: Option, /// The maximum number of records to read from this plan. If `None`, /// all records after filtering are returned. @@ -263,7 +266,7 @@ pub struct FileScanConfigBuilder { table_schema: TableSchema, file_source: Arc, limit: Option, - projection: Option>, + projection_indices: Option>, constraints: Option, file_groups: Vec, statistics: Option, @@ -296,7 +299,7 @@ impl FileScanConfigBuilder { file_compression_type: None, new_lines_in_values: None, limit: None, - projection: None, + projection_indices: None, constraints: None, batch_size: None, expr_adapter_factory: None, @@ -326,7 +329,7 @@ impl FileScanConfigBuilder { /// Set the columns on which to project the data. Indexes that are higher than the /// number of columns of `file_schema` refer to `table_partition_cols`. pub fn with_projection(mut self, indices: Option>) -> Self { - self.projection = indices; + self.projection_indices = indices; self } @@ -439,7 +442,7 @@ impl FileScanConfigBuilder { table_schema, file_source, limit, - projection, + projection_indices: projection, constraints, file_groups, statistics, @@ -494,7 +497,7 @@ impl From for FileScanConfigBuilder { file_compression_type: Some(config.file_compression_type), new_lines_in_values: Some(config.new_lines_in_values), limit: config.limit, - projection: config.projection.map(|p| p.ordered_column_indices()), + projection_indices: config.projection.map(|p| p.ordered_column_indices()), constraints: Some(config.constraints), batch_size: config.batch_size, expr_adapter_factory: config.expr_adapter_factory, @@ -827,13 +830,12 @@ impl FileScanConfig { let fields = self.file_schema().fields(); self.projection.as_ref().map(|p| { - let column_indicies = p.ordered_column_indices(); + let column_indices = p.ordered_column_indices(); - column_indicies + column_indices .iter() - .filter_map(|&col_i| { - (col_i < fields.len()).then(|| self.file_schema().field(col_i).name()) - }) + .filter(|&&col_i| col_i < fields.len()) + .map(|&col_i| self.file_schema().field(col_i).name()) .cloned() .collect::>() }) diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index 3443d72ebde06..18023fc97f9af 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -150,7 +150,7 @@ impl ProjectionExprs { } pub fn from_indices(indices: &[usize], schema: &SchemaRef) -> Self { - let projection_exprs = indices.into_iter().map(|&i| { + let projection_exprs = indices.iter().map(|&i| { let field = schema.field(i); ProjectionExpr { expr: Arc::new(Column::new(field.name(), i)), @@ -192,7 +192,7 @@ impl ProjectionExprs { /// /// ```rust /// use std::sync::Arc; - /// use datafusion_physical_expr::projection::{Projection, ProjectionExpr}; + /// use datafusion_physical_expr::projection::{ProjectionExprs, ProjectionExpr}; /// use datafusion_physical_expr::expressions::{Column, BinaryExpr, Literal}; /// use datafusion_common::{Result, ScalarValue}; /// use datafusion_expr::Operator; @@ -200,7 +200,7 @@ impl ProjectionExprs { /// fn main() -> Result<()> { /// // Example from the docstring: /// // Base projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z - /// let base = Projection::new(vec![ + /// let base = ProjectionExprs::new(vec![ /// ProjectionExpr { /// expr: Arc::new(Column::new("c", 2)), /// alias: "x".to_string(), @@ -216,7 +216,7 @@ impl ProjectionExprs { /// ]); /// /// // Top projection: SELECT x@0 + 1 AS c1, y@1 + z@2 AS c2 - /// let top = Projection::new(vec![ + /// let top = ProjectionExprs::new(vec![ /// ProjectionExpr { /// expr: Arc::new(BinaryExpr::new( /// Arc::new(Column::new("x", 0)), @@ -276,8 +276,8 @@ impl ProjectionExprs { self.exprs .iter() .flat_map(|e| collect_columns(&e.expr).into_iter().map(|col| col.index())) - .dedup() .sorted_unstable() + .dedup() .collect_vec() } From e20b1a031093f393e7dd3fd939531eae8bb238b8 Mon Sep 17 00:00:00 2001 From: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com> Date: Mon, 27 Oct 2025 11:20:11 -0400 Subject: [PATCH 05/10] Rename FileScanConfig::projection -> FileScanConfig::projection_exprs --- .../filter_pushdown/util.rs | 2 +- datafusion/datasource/src/file_scan_config.rs | 32 ++++++++++++------- .../proto/src/physical_plan/to_proto.rs | 2 +- .../substrait/src/physical_plan/producer.rs | 2 +- 4 files changed, 23 insertions(+), 15 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index 50a7d3af2c540..54e8e7bf04da5 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -165,7 +165,7 @@ impl FileSource for TestSource { fn with_projection(&self, config: &FileScanConfig) -> Arc { Arc::new(TestSource { - projection: config.projection.as_ref().map(|p| p.column_indices()), + projection: config.projection_exprs.as_ref().map(|p| p.column_indices()), ..self.clone() }) } diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 37a116b850d6b..02f0f32c93da7 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -182,7 +182,7 @@ pub struct FileScanConfig { /// Each expression in the projection can reference columns from both the file /// schema and table partition columns. If `None`, all columns from the table /// schema are projected. - pub projection: Option, + pub projection_exprs: Option, /// The maximum number of records to read from this plan. If `None`, /// all records after filtering are returned. pub limit: Option, @@ -473,7 +473,7 @@ impl FileScanConfigBuilder { table_schema, file_source, limit, - projection, + projection_exprs: projection, constraints, file_groups, output_ordering, @@ -497,7 +497,9 @@ impl From for FileScanConfigBuilder { file_compression_type: Some(config.file_compression_type), new_lines_in_values: Some(config.new_lines_in_values), limit: config.limit, - projection_indices: config.projection.map(|p| p.ordered_column_indices()), + projection_indices: config + .projection_exprs + .map(|p| p.ordered_column_indices()), constraints: Some(config.constraints), batch_size: config.batch_size, expr_adapter_factory: config.expr_adapter_factory, @@ -655,7 +657,7 @@ impl DataSource for FileScanConfig { let new_projections = new_projections_for_columns( projection, &file_scan - .projection + .projection_exprs .as_ref() .map(|p| p.ordered_column_indices()) .unwrap_or_else(|| (0..self.file_schema().fields().len()).collect()), @@ -710,7 +712,7 @@ impl FileScanConfig { } fn projection_indices(&self) -> Vec { - match &self.projection { + match &self.projection_exprs { Some(proj) => proj.ordered_column_indices(), None => (0..self.file_schema().fields().len() + self.table_partition_cols().len()) @@ -808,7 +810,7 @@ impl FileScanConfig { /// Project the schema, constraints, and the statistics on the given column indices pub fn project(&self) -> (SchemaRef, Constraints, Statistics, Vec) { - if self.projection.is_none() && self.table_partition_cols().is_empty() { + if self.projection_exprs.is_none() && self.table_partition_cols().is_empty() { return ( Arc::clone(self.file_schema()), self.constraints.clone(), @@ -829,7 +831,7 @@ impl FileScanConfig { pub fn projected_file_column_names(&self) -> Option> { let fields = self.file_schema().fields(); - self.projection.as_ref().map(|p| { + self.projection_exprs.as_ref().map(|p| { let column_indices = p.ordered_column_indices(); column_indices @@ -863,7 +865,7 @@ impl FileScanConfig { } pub fn file_column_projection_indices(&self) -> Option> { - self.projection.as_ref().map(|p| { + self.projection_exprs.as_ref().map(|p| { p.ordered_column_indices() .into_iter() .filter(|&i| i < self.file_schema().fields().len()) @@ -1404,7 +1406,7 @@ fn get_projected_output_ordering( } let indices = base_config - .projection + .projection_exprs .as_ref() .map(|p| p.ordered_column_indices()); @@ -2213,7 +2215,7 @@ mod tests { assert_eq!(*config.file_schema(), file_schema); assert_eq!(config.limit, Some(1000)); assert_eq!( - config.projection.as_ref().map(|p| p.column_indices()), + config.projection_exprs.as_ref().map(|p| p.column_indices()), Some(vec![0, 1]) ); assert_eq!(config.table_partition_cols().len(), 1); @@ -2298,7 +2300,10 @@ mod tests { assert_eq!(config.object_store_url, object_store_url); assert_eq!(*config.file_schema(), file_schema); assert_eq!(config.limit, None); - assert_eq!(config.projection.as_ref().map(|p| p.column_indices()), None); + assert_eq!( + config.projection_exprs.as_ref().map(|p| p.column_indices()), + None + ); assert!(config.table_partition_cols().is_empty()); assert!(config.file_groups.is_empty()); assert_eq!( @@ -2372,7 +2377,10 @@ mod tests { assert_eq!(new_config.object_store_url, object_store_url); assert_eq!(*new_config.file_schema(), schema); assert_eq!( - new_config.projection.as_ref().map(|p| p.column_indices()), + new_config + .projection_exprs + .as_ref() + .map(|p| p.column_indices()), Some(vec![0, 2]) ); assert_eq!(new_config.limit, Some(10)); diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 02c612ee37743..dc0a78dbccf11 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -532,7 +532,7 @@ pub fn serialize_file_scan_config( statistics: Some((&conf.file_source.statistics().unwrap()).into()), limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }), projection: conf - .projection + .projection_exprs .as_ref() .map(|p| p.column_indices()) .unwrap_or((0..schema.fields().len()).collect::>()) diff --git a/datafusion/substrait/src/physical_plan/producer.rs b/datafusion/substrait/src/physical_plan/producer.rs index dab0548de3886..20d41c2e6112a 100644 --- a/datafusion/substrait/src/physical_plan/producer.rs +++ b/datafusion/substrait/src/physical_plan/producer.rs @@ -92,7 +92,7 @@ pub fn to_substrait_rel( }; let mut select_struct = None; - if let Some(projection) = file_config.projection.as_ref() { + if let Some(projection) = file_config.projection_exprs.as_ref() { let struct_items = projection .column_indices() .into_iter() From f5091c30792b115739f55db83c565826e8ca9857 Mon Sep 17 00:00:00 2001 From: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com> Date: Mon, 27 Oct 2025 11:25:51 -0400 Subject: [PATCH 06/10] Add comment for ProjectionExpr::from_indices --- datafusion/physical-expr/src/projection.rs | 43 ++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index 18023fc97f9af..e991772da3eef 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -149,6 +149,49 @@ impl ProjectionExprs { } } + /// Creates a [`ProjectionExpr`] from a list of column indices. + /// + /// This is a convenience method for creating simple column-only projections, where each projection expression is a reference to a column + /// in the input schema. + /// + /// # Behavior + /// - Ordering: the output projection preserves the exact order of indices provided in the input slice + /// For example, `[2, 0, 1]` will produce projections for columns 2, 0, then 1 in that order + /// - Duplicates: Duplicate indices are allowed and will create multiple projection expressions referencing the same source column + /// For example, `[0, 0]` creates 2 separate projections both referencing column 0 + /// + /// # Panics + /// Panics if any index in `indices` is out of bounds for the provided schema. + /// + /// # Example + /// + /// ```rust + /// use std::sync::Arc; + /// use arrow::datatypes::{Schema, Field, DataType}; + /// use datafusion_physical_expr::projection::ProjectionExprs; + /// + /// // Create a schema with three columns + /// let schema = Arc::new(Schema::new(vec![ + /// Field::new("a", DataType::Int32, false), + /// Field::new("b", DataType::Utf8, false), + /// Field::new("c", DataType::Float64, false), + /// ])); + /// + /// // Project columns at indices 2 and 0 (c and a) - ordering is preserved + /// let projection = ProjectionExprs::from_indices(&[2, 0], &schema); + /// + /// // This creates: SELECT c@2 AS c, a@0 AS a + /// assert_eq!(projection.as_ref().len(), 2); + /// assert_eq!(projection.as_ref()[0].alias, "c"); + /// assert_eq!(projection.as_ref()[1].alias, "a"); + /// + /// // Duplicate indices are allowed + /// let projection_with_dups = ProjectionExprs::from_indices(&[0, 0, 1], &schema); + /// assert_eq!(projection_with_dups.as_ref().len(), 3); + /// assert_eq!(projection_with_dups.as_ref()[0].alias, "a"); + /// assert_eq!(projection_with_dups.as_ref()[1].alias, "a"); // duplicate + /// assert_eq!(projection_with_dups.as_ref()[2].alias, "b"); + /// ``` pub fn from_indices(indices: &[usize], schema: &SchemaRef) -> Self { let projection_exprs = indices.iter().map(|&i| { let field = schema.field(i); From 59513012806aeb4e2b35f506aaa825489d547e79 Mon Sep 17 00:00:00 2001 From: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com> Date: Mon, 27 Oct 2025 11:46:44 -0400 Subject: [PATCH 07/10] Add comments for ProjectionExprs::ordered_column_indices --- datafusion/physical-expr/src/projection.rs | 26 ++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index e991772da3eef..fc972d644e677 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -325,6 +325,32 @@ impl ProjectionExprs { } /// Extract the ordered column indices for a column-only projection. + /// + /// This function assumes that all expressions in the projection are simple column references. + /// It returns the column indices in the order they appear in the projection. + /// + /// # Panics + /// + /// Panics if any expression in the projection is not a simple column reference. This includes: + /// - Computed expressions (e.g., `a + 1`, `CAST(a AS INT)`) + /// - Function calls (e.g., `UPPER(name)`, `SUM(amount)`) + /// - Literals (e.g., `42`, `'hello'`) + /// - Complex nested expressions (e.g., `CASE WHEN ... THEN ... END`) + /// + /// # Returns + /// + /// A vector of column indices in projection order. Unlike [`column_indices()`](Self::column_indices), + /// this function: + /// - Preserves the projection order (does not sort) + /// - Preserves duplicates (does not deduplicate) + /// + /// # Example + /// + /// For a projection `SELECT c, a, c` where `a` is at index 0 and `c` is at index 2, + /// this function would return `[2, 0, 2]`. + /// + /// Use [`column_indices()`](Self::column_indices) instead if the projection may contain + /// non-column expressions or if you need a deduplicated sorted list. pub fn ordered_column_indices(&self) -> Vec { self.exprs .iter() From 6d6776ace7b6e923ae9e722a15777e5763c72078 Mon Sep 17 00:00:00 2001 From: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com> Date: Mon, 27 Oct 2025 12:42:25 -0400 Subject: [PATCH 08/10] Deprecate with_projection --- .../examples/advanced_parquet_index.rs | 2 +- .../examples/csv_json_opener.rs | 4 +-- .../examples/default_column_values.rs | 2 +- datafusion-examples/examples/parquet_index.rs | 2 +- datafusion/catalog-listing/src/table.rs | 2 +- .../core/src/datasource/file_format/mod.rs | 2 +- .../core/src/datasource/physical_plan/avro.rs | 6 ++-- .../core/src/datasource/physical_plan/csv.rs | 6 ++-- .../core/src/datasource/physical_plan/json.rs | 4 +-- .../src/datasource/physical_plan/parquet.rs | 4 +-- .../core/tests/parquet/schema_coercion.rs | 2 +- .../physical_optimizer/projection_pushdown.rs | 6 ++-- datafusion/datasource/src/file_scan_config.rs | 35 +++++++++++++------ .../proto/src/physical_plan/from_proto.rs | 2 +- .../tests/cases/roundtrip_physical_plan.rs | 4 +-- .../substrait/src/physical_plan/consumer.rs | 4 +-- 16 files changed, 50 insertions(+), 37 deletions(-) diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs index 55400e2192832..1c560be6d08a6 100644 --- a/datafusion-examples/examples/advanced_parquet_index.rs +++ b/datafusion-examples/examples/advanced_parquet_index.rs @@ -502,7 +502,7 @@ impl TableProvider for IndexTableProvider { let file_scan_config = FileScanConfigBuilder::new(object_store_url, schema, file_source) .with_limit(limit) - .with_projection(projection.cloned()) + .with_projection_indices(projection.cloned()) .with_file(partitioned_file) .build(); diff --git a/datafusion-examples/examples/csv_json_opener.rs b/datafusion-examples/examples/csv_json_opener.rs index 1a2c2cbff4183..8abed90238d40 100644 --- a/datafusion-examples/examples/csv_json_opener.rs +++ b/datafusion-examples/examples/csv_json_opener.rs @@ -60,7 +60,7 @@ async fn csv_opener() -> Result<()> { Arc::clone(&schema), Arc::new(CsvSource::default()), ) - .with_projection(Some(vec![12, 0])) + .with_projection_indices(Some(vec![12, 0])) .with_limit(Some(5)) .with_file(PartitionedFile::new(path.display().to_string(), 10)) .build(); @@ -126,7 +126,7 @@ async fn json_opener() -> Result<()> { schema, Arc::new(JsonSource::default()), ) - .with_projection(Some(vec![1, 0])) + .with_projection_indices(Some(vec![1, 0])) .with_limit(Some(5)) .with_file(PartitionedFile::new(path.to_string(), 10)) .build(); diff --git a/datafusion-examples/examples/default_column_values.rs b/datafusion-examples/examples/default_column_values.rs index 43e2d4ca09884..d3a7d2ec67f3c 100644 --- a/datafusion-examples/examples/default_column_values.rs +++ b/datafusion-examples/examples/default_column_values.rs @@ -260,7 +260,7 @@ impl TableProvider for DefaultValueTableProvider { self.schema.clone(), Arc::new(parquet_source), ) - .with_projection(projection.cloned()) + .with_projection_indices(projection.cloned()) .with_limit(limit) .with_file_group(file_group) .with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _)); diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs index afc3b279f4a9f..127c55da982c8 100644 --- a/datafusion-examples/examples/parquet_index.rs +++ b/datafusion-examples/examples/parquet_index.rs @@ -246,7 +246,7 @@ impl TableProvider for IndexTableProvider { let source = Arc::new(ParquetSource::default().with_predicate(predicate)); let mut file_scan_config_builder = FileScanConfigBuilder::new(object_store_url, self.schema(), source) - .with_projection(projection.cloned()) + .with_projection_indices(projection.cloned()) .with_limit(limit); // Transform to the format needed to pass to DataSourceExec diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index e9ac1bf097a22..95f9523d4401c 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -499,7 +499,7 @@ impl TableProvider for ListingTable { .with_file_groups(partitioned_file_lists) .with_constraints(self.constraints.clone()) .with_statistics(statistics) - .with_projection(projection) + .with_projection_indices(projection) .with_limit(limit) .with_output_ordering(output_ordering) .with_table_partition_cols(table_partition_cols) diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index e165707c2eb0e..4881783eeba69 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -90,7 +90,7 @@ pub(crate) mod test_util { ) .with_file_groups(file_groups) .with_statistics(statistics) - .with_projection(projection) + .with_projection_indices(projection) .with_limit(limit) .build(), ) diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index 8a00af959ccc9..9068c9758179d 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -88,7 +88,7 @@ mod tests { source, ) .with_file(meta.into()) - .with_projection(Some(vec![0, 1, 2])) + .with_projection_indices(Some(vec![0, 1, 2])) .build(); let source_exec = DataSourceExec::from_data_source(conf); @@ -160,7 +160,7 @@ mod tests { let source = Arc::new(AvroSource::new()); let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source) .with_file(meta.into()) - .with_projection(projection) + .with_projection_indices(projection) .build(); let source_exec = DataSourceExec::from_data_source(conf); @@ -231,7 +231,7 @@ mod tests { let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source) // select specific columns of the files as well as the partitioning // column which is supposed to be the last column in the table schema. - .with_projection(projection) + .with_projection_indices(projection) .with_file(partitioned_file) .with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)]) .build(); diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index b2ef51a76f89a..4f46a57d8b137 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -118,7 +118,7 @@ mod tests { )) .with_file_compression_type(file_compression_type) .with_newlines_in_values(false) - .with_projection(Some(vec![0, 2, 4])) + .with_projection_indices(Some(vec![0, 2, 4])) .build(); assert_eq!(13, config.file_schema().fields().len()); @@ -183,7 +183,7 @@ mod tests { )) .with_newlines_in_values(false) .with_file_compression_type(file_compression_type.to_owned()) - .with_projection(Some(vec![4, 0, 2])) + .with_projection_indices(Some(vec![4, 0, 2])) .build(); assert_eq!(13, config.file_schema().fields().len()); let csv = DataSourceExec::from_data_source(config); @@ -373,7 +373,7 @@ mod tests { .with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)]) // We should be able to project on the partition column // Which is supposed to be after the file fields - .with_projection(Some(vec![0, num_file_schema_fields])) + .with_projection_indices(Some(vec![0, num_file_schema_fields])) .build(); // we don't have `/date=xx/` in the path but that is ok because diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 0d45711c76fb0..f7d5c710bf48a 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -297,7 +297,7 @@ mod tests { let source = Arc::new(JsonSource::new()); let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source) .with_file_groups(file_groups) - .with_projection(Some(vec![0, 2])) + .with_projection_indices(Some(vec![0, 2])) .with_file_compression_type(file_compression_type.to_owned()) .build(); let exec = DataSourceExec::from_data_source(conf); @@ -345,7 +345,7 @@ mod tests { let source = Arc::new(JsonSource::new()); let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source) .with_file_groups(file_groups) - .with_projection(Some(vec![3, 0, 2])) + .with_projection_indices(Some(vec![3, 0, 2])) .with_file_compression_type(file_compression_type.to_owned()) .build(); let exec = DataSourceExec::from_data_source(conf); diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 10a475c1cc9a6..6df5cd7ac68ff 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -201,7 +201,7 @@ mod tests { source, ) .with_file_group(file_group) - .with_projection(self.projection.clone()) + .with_projection_indices(self.projection.clone()) .build(); DataSourceExec::from_data_source(base_config) } @@ -1655,7 +1655,7 @@ mod tests { let config = FileScanConfigBuilder::new(object_store_url, schema.clone(), source) .with_file(partitioned_file) // file has 10 cols so index 12 should be month and 13 should be day - .with_projection(Some(vec![0, 1, 2, 12, 13])) + .with_projection_indices(Some(vec![0, 1, 2, 12, 13])) .with_table_partition_cols(vec![ Field::new("year", DataType::Utf8, false), Field::new("month", DataType::UInt8, false), diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs index 59cbf4b0872ea..9be391a9108e6 100644 --- a/datafusion/core/tests/parquet/schema_coercion.rs +++ b/datafusion/core/tests/parquet/schema_coercion.rs @@ -126,7 +126,7 @@ async fn multi_parquet_coercion_projection() { Arc::new(ParquetSource::default()), ) .with_file_group(file_group) - .with_projection(Some(vec![1, 0, 2])) + .with_projection_indices(Some(vec![1, 0, 2])) .build(); let parquet_exec = DataSourceExec::from_data_source(config); diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs index c51a5e02c9c33..8631613c3925e 100644 --- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs @@ -390,7 +390,7 @@ fn create_simple_csv_exec() -> Arc { Arc::new(CsvSource::new(false, 0, 0)), ) .with_file(PartitionedFile::new("x".to_string(), 100)) - .with_projection(Some(vec![0, 1, 2, 3, 4])) + .with_projection_indices(Some(vec![0, 1, 2, 3, 4])) .build(); DataSourceExec::from_data_source(config) @@ -409,7 +409,7 @@ fn create_projecting_csv_exec() -> Arc { Arc::new(CsvSource::new(false, 0, 0)), ) .with_file(PartitionedFile::new("x".to_string(), 100)) - .with_projection(Some(vec![3, 2, 1])) + .with_projection_indices(Some(vec![3, 2, 1])) .build(); DataSourceExec::from_data_source(config) @@ -1596,7 +1596,7 @@ fn partitioned_data_source() -> Arc { ) .with_file(PartitionedFile::new("x".to_string(), 100)) .with_table_partition_cols(vec![Field::new("partition_col", DataType::Utf8, true)]) - .with_projection(Some(vec![0, 1, 2])) + .with_projection_indices(Some(vec![0, 1, 2])) .build(); DataSourceExec::from_data_source(config) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 02f0f32c93da7..5e71830a3f0e9 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -126,7 +126,7 @@ use log::{debug, warn}; /// let file_source = Arc::new(ParquetSource::new()); /// let config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source) /// .with_limit(Some(1000)) // read only the first 1000 records -/// .with_projection(Some(vec![2, 3])) // project columns 2 and 3 +/// .with_projection_indices(Some(vec![2, 3])) // project columns 2 and 3 /// // Read /tmp/file1.parquet with known size of 1234 bytes in a single group /// .with_file(PartitionedFile::new("file1.parquet", 1234)) /// // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes @@ -234,7 +234,7 @@ pub struct FileScanConfig { /// // Set a limit of 1000 rows /// .with_limit(Some(1000)) /// // Project only the first column -/// .with_projection(Some(vec![0])) +/// .with_projection_indices(Some(vec![0])) /// // Add partition columns /// .with_table_partition_cols(vec![ /// Field::new("date", DataType::Utf8, false), @@ -328,7 +328,18 @@ impl FileScanConfigBuilder { /// Set the columns on which to project the data. Indexes that are higher than the /// number of columns of `file_schema` refer to `table_partition_cols`. - pub fn with_projection(mut self, indices: Option>) -> Self { + /// + /// # Deprecated + /// Use [`Self::with_projection_indices`] instead. This method will be removed in a future release. + #[deprecated(since = "51.0.0", note = "Use with_projection_indices instead")] + pub fn with_projection(self, indices: Option>) -> Self { + self.with_projection_indices(indices) + } + + /// Set the columns on which to project the data using column indices. + /// + /// Indexes that are higher than the number of columns of `file_schema` refer to `table_partition_cols`. + pub fn with_projection_indices(mut self, indices: Option>) -> Self { self.projection_indices = indices; self } @@ -464,8 +475,10 @@ impl FileScanConfigBuilder { file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED); let new_lines_in_values = new_lines_in_values.unwrap_or(false); - let projection = projection_indices.as_ref().map(|indices| { - ProjectionExprs::from_indices(indices, table_schema.table_schema()) + // Convert projection indices to ProjectionExprs using the final table schema + // (which now includes partition columns if they were added) + let projection_exprs = projection_indices.map(|indices| { + ProjectionExprs::from_indices(&indices, table_schema.table_schema()) }); FileScanConfig { @@ -473,7 +486,7 @@ impl FileScanConfigBuilder { table_schema, file_source, limit, - projection_exprs: projection, + projection_exprs, constraints, file_groups, output_ordering, @@ -666,7 +679,7 @@ impl DataSource for FileScanConfig { Arc::new( FileScanConfigBuilder::from(file_scan) // Assign projected statistics to source - .with_projection(Some(new_projections)) + .with_projection_indices(Some(new_projections)) .with_source(source) .build(), ) as _ @@ -2138,7 +2151,7 @@ mod tests { file_schema, Arc::new(MockSource::default()), ) - .with_projection(projection) + .with_projection_indices(projection) .with_statistics(statistics) .with_table_partition_cols(table_partition_cols) .build() @@ -2191,7 +2204,7 @@ mod tests { // Build with various configurations let config = builder .with_limit(Some(1000)) - .with_projection(Some(vec![0, 1])) + .with_projection_indices(Some(vec![0, 1])) .with_table_partition_cols(vec![Field::new( "date", wrap_partition_type_in_dict(DataType::Utf8), @@ -2251,7 +2264,7 @@ mod tests { Arc::clone(&file_schema), Arc::clone(&file_source), ) - .with_projection(Some(vec![0, 1, 2])) + .with_projection_indices(Some(vec![0, 1, 2])) .build(); // Simulate projection being updated. Since the filter has already been pushed down, @@ -2358,7 +2371,7 @@ mod tests { Arc::clone(&schema), Arc::clone(&file_source), ) - .with_projection(Some(vec![0, 2])) + .with_projection_indices(Some(vec![0, 2])) .with_limit(Some(10)) .with_table_partition_cols(partition_cols.clone()) .with_file(file.clone()) diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 7c4b9e55b8137..2a3906d493476 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -545,7 +545,7 @@ pub fn parse_protobuf_file_scan_config( .with_file_groups(file_groups) .with_constraints(constraints) .with_statistics(statistics) - .with_projection(Some(projection)) + .with_projection_indices(Some(projection)) .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize)) .with_table_partition_cols(table_partition_cols) .with_output_ordering(output_ordering) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index a0456e2031be0..c8b2bc02e447b 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -920,7 +920,7 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> { schema, file_source, ) - .with_projection(Some(vec![0, 1])) + .with_projection_indices(Some(vec![0, 1])) .with_file_group(FileGroup::new(vec![file_group])) .with_table_partition_cols(vec![Field::new( "part".to_string(), @@ -1814,7 +1814,7 @@ async fn roundtrip_projection_source() -> Result<()> { 1024, )])]) .with_statistics(statistics) - .with_projection(Some(vec![0, 1, 2])) + .with_projection_indices(Some(vec![0, 1, 2])) .build(); let filter = Arc::new( diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs index ecf465dd3f18d..45a19cea80cfc 100644 --- a/datafusion/substrait/src/physical_plan/consumer.rs +++ b/datafusion/substrait/src/physical_plan/consumer.rs @@ -151,8 +151,8 @@ pub async fn from_substrait_rel( .iter() .map(|item| item.field as usize) .collect(); - base_config_builder = - base_config_builder.with_projection(Some(column_indices)); + base_config_builder = base_config_builder + .with_projection_indices(Some(column_indices)); } } From 8eedb93f230c07e433b3150949c39418b9975abf Mon Sep 17 00:00:00 2001 From: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com> Date: Mon, 27 Oct 2025 12:53:20 -0400 Subject: [PATCH 09/10] Add notes to upgrading guide --- docs/source/library-user-guide/upgrading.md | 50 +++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 8b03193e7f992..241e189ef7008 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -116,6 +116,56 @@ Users may need to update their paths to account for these changes. See [issue #17713] for more details. +### `FileScanConfig::projection` renamed to `FileScanConfig::projection_exprs` + +The `projection` field in `FileScanConfig` has been renamed to `projection_exprs` and its type has changed from `Option>` to `Option`. This change enables more powerful projection pushdown capabilities by supporting arbitrary physical expressions rather than just column indices. + +**Impact on direct field access:** + +If you directly access the `projection` field: + +```rust +# /* comment to avoid running +let config: FileScanConfig = ...; +let projection = config.projection; +# */ +``` + +You should update to: + +```rust +# /* comment to avoid running +let config: FileScanConfig = ...; +let projection_exprs = config.projection_exprs; +# */ +``` + +**Impact on builders:** + +The `FileScanConfigBuilder::with_projection()` method has been deprecated in favor of `with_projection_indices()`: + +```diff +let config = FileScanConfigBuilder::new(url, schema, file_source) +- .with_projection(Some(vec![0, 2, 3])) ++ .with_projection_indices(Some(vec![0, 2, 3])) + .build(); +``` + +Note: `with_projection()` still works but is deprecated and will be removed in a future release. + +**What is `ProjectionExprs`?** + +`ProjectionExprs` is a new type that represents a list of physical expressions for projection. While it can be constructed from column indices (which is what `with_projection_indices` does internally), it also supports arbitrary physical expressions, enabling advanced features like expression evaluation during scanning. + +You can access column indices from `ProjectionExprs` using its methods if needed: + +```rust +# /* comment to avoid running +let projection_exprs: ProjectionExprs = ...; +// Get the column indices if the projection only contains simple column references +let indices = projection_exprs.column_indices(); +# */ +``` ## DataFusion `50.0.0` ### ListingTable automatically detects Hive Partitioned tables From c6549ddd3171ee65cc58d0f87ba2690a2261ee20 Mon Sep 17 00:00:00 2001 From: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com> Date: Mon, 27 Oct 2025 13:24:56 -0400 Subject: [PATCH 10/10] Fix up clippy --- datafusion/datasource/src/file_scan_config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 7fe83cf0779ee..c52397d9a7cc6 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -2645,7 +2645,7 @@ mod tests { Arc::clone(&schema), Arc::new(MockSource::default()), ) - .with_projection(Some(vec![0, 2])) // Only project columns 0 and 2 + .with_projection_indices(Some(vec![0, 2])) // Only project columns 0 and 2 .with_file_groups(vec![file_group]) .build();