diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index f1a99a7714ac4..fc93cc7e282e3 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -81,7 +81,7 @@ use datafusion_expr::{ WindowFrameBound, WriteOp, }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; -use datafusion_physical_expr::expressions::Literal; +use datafusion_physical_expr::expressions::{Column, Literal}; use datafusion_physical_expr::LexOrdering; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::execution_plan::InvariantLevel; @@ -2006,7 +2006,8 @@ impl DefaultPhysicalPlanner { input: &Arc, expr: &[Expr], ) -> Result> { - let input_schema = input.as_ref().schema(); + let input_logical_schema = input.as_ref().schema(); + let input_physical_schema = input_exec.schema(); let physical_exprs = expr .iter() .map(|e| { @@ -2025,7 +2026,7 @@ impl DefaultPhysicalPlanner { // This depends on the invariant that logical schema field index MUST match // with physical schema field index. 
let physical_name = if let Expr::Column(col) = e { - match input_schema.index_of_column(col) { + match input_logical_schema.index_of_column(col) { Ok(idx) => { // index physical field using logical field index Ok(input_exec.schema().field(idx).name().to_string()) @@ -2038,10 +2039,14 @@ impl DefaultPhysicalPlanner { physical_name(e) }; - tuple_err(( - self.create_physical_expr(e, input_schema, session_state), - physical_name, - )) + let physical_expr = + self.create_physical_expr(e, input_logical_schema, session_state); + + // Check for possible column name mismatches between the expression and the physical input schema + let final_physical_expr = + maybe_fix_physical_column_name(physical_expr, &input_physical_schema); + + tuple_err((final_physical_expr, physical_name)) }) .collect::>>()?; @@ -2061,6 +2066,40 @@ fn tuple_err(value: (Result, Result)) -> Result<(T, R)> { } } +// Handle the case where the name of a physical column expression does not match the name of the corresponding physical input field. +// Physical column names are derived from the physical schema, whereas physical column expressions are derived from the logical column names. +// +// This is a special case that applies only to column expressions. Logical plans may slightly modify column names by appending a suffix (e.g., using ':'), +// to avoid duplicates—since DFSchemas do not allow duplicate names. For example: `count(Int64(1)):1`. When a `Column` expression is named like the physical field at its index plus such a ':' suffix, it is replaced by a `Column` with the physical field name and the same index; every other input (including an `Err`) is returned unchanged. NOTE(review): assumes `column.index()` is in range for `input_physical_schema` — confirm. +fn maybe_fix_physical_column_name( + expr: Result>, + input_physical_schema: &SchemaRef, +) -> Result> { + if let Ok(e) = &expr { + if let Some(column) = e.as_any().downcast_ref::() { + let physical_field = input_physical_schema.field(column.index()); + let expr_col_name = column.name(); + let physical_name = physical_field.name(); + + if physical_name != expr_col_name { + // handle edge cases where the physical_name contains ':': skip as many ':' in the expression name as the physical name has, so the split lands on the suffix separator.
+ let colon_count = physical_name.matches(':').count(); + let mut splits = expr_col_name.match_indices(':'); + let split_pos = splits.nth(colon_count); + + if let Some((idx, _)) = split_pos { + let base_name = &expr_col_name[..idx]; + if base_name == physical_name { + let updated_column = Column::new(physical_name, column.index()); + return Ok(Arc::new(updated_column)); + } + } + } + } + } + expr +} + struct OptimizationInvariantChecker<'a> { rule: &'a Arc, } @@ -2656,6 +2695,30 @@ mod tests { } } + #[tokio::test] + async fn test_maybe_fix_colon_in_physical_name() { + // The physical schema has a field whose own name already contains a colon + let schema = Schema::new(vec![Field::new("metric:avg", DataType::Int32, false)]); + let schema_ref: SchemaRef = Arc::new(schema); + + // The logical column name as it might look after deduplication appended a ':1' suffix + let logical_col_name = "metric:avg:1"; + let expr_with_suffix = + Arc::new(Column::new(logical_col_name, 0)) as Arc; + let expr_result = Ok(expr_with_suffix); + + // Call function under test (NOTE(review): nothing in this test awaits, so a plain `#[test]` may suffice) + let fixed_expr = + maybe_fix_physical_column_name(expr_result, &schema_ref).unwrap(); + + // Downcast back to Column so we can check that the name lost its suffix + let col = fixed_expr + .as_any() + .downcast_ref::() + .expect("Column"); + + assert_eq!(col.name(), "metric:avg"); + } struct ErrorExtensionPlanner {} #[async_trait] diff --git a/datafusion/physical-expr/src/equivalence/projection.rs b/datafusion/physical-expr/src/equivalence/projection.rs index d10243fbab452..a33339091c85d 100644 --- a/datafusion/physical-expr/src/equivalence/projection.rs +++ b/datafusion/physical-expr/src/equivalence/projection.rs @@ -22,7 +22,7 @@ use crate::PhysicalExpr; use arrow::datatypes::SchemaRef; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::Result; +use datafusion_common::{internal_err, Result}; /// Stores the mapping between source expressions and target expressions for a /// projection. 
@@ -66,8 +66,8 @@ impl ProjectionMapping { let idx = col.index(); let matching_input_field = input_schema.field(idx); if col.name() != matching_input_field.name() { - let fixed_col = Column::new(col.name(), idx); - return Ok(Transformed::yes(Arc::new(fixed_col))); + return internal_err!("Input field name {} does not match with the projection expression {}", + matching_input_field.name(),col.name()) } let matching_input_column = Column::new(matching_input_field.name(), idx);