diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 4d1fd756510d2..413b809c4e6ab 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -45,7 +45,7 @@ use crate::physical_plan::joins::{ CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec, }; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; -use crate::physical_plan::projection::ProjectionExec; +use crate::physical_plan::projection::{ProjectionExec, ProjectionExpr}; use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::union::UnionExec; @@ -2185,17 +2185,25 @@ impl DefaultPhysicalPlanner { PlannedExprResult::ExprWithName(physical_exprs), input_physical_schema.as_ref(), )? { - PlanAsyncExpr::Sync(PlannedExprResult::ExprWithName(physical_exprs)) => Ok( - Arc::new(ProjectionExec::try_new(physical_exprs, input_exec)?), - ), + PlanAsyncExpr::Sync(PlannedExprResult::ExprWithName(physical_exprs)) => { + let proj_exprs: Vec = physical_exprs + .into_iter() + .map(|(expr, alias)| ProjectionExpr { expr, alias }) + .collect(); + Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input_exec)?)) + } PlanAsyncExpr::Async( async_map, PlannedExprResult::ExprWithName(physical_exprs), ) => { let async_exec = AsyncFuncExec::try_new(async_map.async_exprs, input_exec)?; + let proj_exprs: Vec = physical_exprs + .into_iter() + .map(|(expr, alias)| ProjectionExpr { expr, alias }) + .collect(); let new_proj_exec = - ProjectionExec::try_new(physical_exprs, Arc::new(async_exec))?; + ProjectionExec::try_new(proj_exprs, Arc::new(async_exec))?; Ok(Arc::new(new_proj_exec)) } _ => internal_err!("Unexpected PlanAsyncExpressions variant"), @@ -2700,7 +2708,7 @@ mod tests { let execution_plan = plan(&logical_plan).await?; // verify that the plan correctly adds cast from Int64(1) to Utf8, and the const will be evaluated. 
- let expected = "expr: [(BinaryExpr { left: BinaryExpr { left: Column { name: \"c1\", index: 0 }, op: Eq, right: Literal { value: Utf8(\"a\"), field: Field { name: \"lit\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }, op: Or, right: BinaryExpr { left: Column { name: \"c1\", index: 0 }, op: Eq, right: Literal { value: Utf8(\"1\"), field: Field { name: \"lit\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }, fail_on_overflow: false }"; + let expected = "expr: [ProjectionExpr { expr: BinaryExpr { left: BinaryExpr { left: Column { name: \"c1\", index: 0 }, op: Eq, right: Literal { value: Utf8(\"a\"), field: Field { name: \"lit\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }, op: Or, right: BinaryExpr { left: Column { name: \"c1\", index: 0 }, op: Eq, right: Literal { value: Utf8(\"1\"), field: Field { name: \"lit\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }, fail_on_overflow: false }"; let actual = format!("{execution_plan:?}"); assert!(actual.contains(expected), "{}", actual); diff --git a/datafusion/core/tests/execution/coop.rs b/datafusion/core/tests/execution/coop.rs index f2c4fde2c0143..a775ef4270f3e 100644 --- a/datafusion/core/tests/execution/coop.rs +++ b/datafusion/core/tests/execution/coop.rs @@ -46,7 +46,7 @@ use datafusion_physical_plan::coop::make_cooperative; use datafusion_physical_plan::filter::FilterExec; use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec}; use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec}; -use datafusion_physical_plan::projection::ProjectionExec; +use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::repartition::RepartitionExec; use 
datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; @@ -651,7 +651,7 @@ async fn join_agg_yields( // Project only one column (“value” from the left side) because we just want to sum that let input_schema = join.schema(); - let proj_expr = vec![( + let proj_expr = vec![ProjectionExpr::new( Arc::new(Column::new_with_schema("value", &input_schema)?) as _, "value".to_string(), )]; diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 7e0528581ce9f..e0826c90dd8d2 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -62,7 +62,7 @@ use datafusion_physical_plan::expressions::col; use datafusion_physical_plan::filter::FilterExec; use datafusion_physical_plan::joins::utils::JoinOn; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; -use datafusion_physical_plan::projection::ProjectionExec; +use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::union::UnionExec; use datafusion_physical_plan::{ @@ -243,7 +243,10 @@ fn projection_exec_with_alias( ) -> Arc { let mut exprs = vec![]; for (column, alias) in alias_pairs.iter() { - exprs.push((col(column, &input.schema()).unwrap(), alias.to_string())); + exprs.push(ProjectionExpr { + expr: col(column, &input.schema()).unwrap(), + alias: alias.to_string(), + }); } Arc::new(ProjectionExec::try_new(exprs, input).unwrap()) } @@ -2207,14 +2210,14 @@ fn repartition_does_not_destroy_sort_more_complex() -> Result<()> { #[test] fn repartition_transitively_with_projection() -> Result<()> { let schema = schema(); - let proj_exprs = vec![( - Arc::new(BinaryExpr::new( + let proj_exprs = vec![ProjectionExpr { + expr: 
Arc::new(BinaryExpr::new( col("a", &schema)?, Operator::Plus, col("b", &schema)?, )) as _, - "sum".to_string(), - )]; + alias: "sum".to_string(), + }]; // non sorted input let proj = Arc::new(ProjectionExec::try_new(proj_exprs, parquet_exec())?); let sort_key = [PhysicalSortExpr { diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index ee647e0019613..7ae1d6e50dc3f 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -235,12 +235,12 @@ async fn test_join_with_swap() { .expect("A proj is required to swap columns back to their original order"); assert_eq!(swapping_projection.expr().len(), 2); - let (col, name) = &swapping_projection.expr()[0]; - assert_eq!(name, "big_col"); - assert_col_expr(col, "big_col", 1); - let (col, name) = &swapping_projection.expr()[1]; - assert_eq!(name, "small_col"); - assert_col_expr(col, "small_col", 0); + let proj_expr = &swapping_projection.expr()[0]; + assert_eq!(proj_expr.alias, "big_col"); + assert_col_expr(&proj_expr.expr, "big_col", 1); + let proj_expr = &swapping_projection.expr()[1]; + assert_eq!(proj_expr.alias, "small_col"); + assert_col_expr(&proj_expr.expr, "small_col", 0); let swapped_join = swapping_projection .input() @@ -526,12 +526,12 @@ async fn test_nl_join_with_swap(join_type: JoinType) { .expect("A proj is required to swap columns back to their original order"); assert_eq!(swapping_projection.expr().len(), 2); - let (col, name) = &swapping_projection.expr()[0]; - assert_eq!(name, "big_col"); - assert_col_expr(col, "big_col", 1); - let (col, name) = &swapping_projection.expr()[1]; - assert_eq!(name, "small_col"); - assert_col_expr(col, "small_col", 0); + let proj_expr = &swapping_projection.expr()[0]; + assert_eq!(proj_expr.alias, "big_col"); + assert_col_expr(&proj_expr.expr, "big_col", 1); + let proj_expr = &swapping_projection.expr()[1]; + 
assert_eq!(proj_expr.alias, "small_col"); + assert_col_expr(&proj_expr.expr, "small_col", 0); let swapped_join = swapping_projection .input() diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index df1032e0652e1..7e9d5bf1b901b 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -47,7 +47,7 @@ mod test { use datafusion_physical_plan::joins::CrossJoinExec; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; - use datafusion_physical_plan::projection::ProjectionExec; + use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::union::UnionExec; @@ -235,8 +235,10 @@ mod test { async fn test_statistics_by_partition_of_projection() -> Result<()> { let scan = create_scan_exec_with_statistics(None, Some(2)).await; // Add projection execution plan - let exprs: Vec<(Arc, String)> = - vec![(Arc::new(Column::new("id", 0)), "id".to_string())]; + let exprs = vec![ProjectionExpr { + expr: Arc::new(Column::new("id", 0)) as Arc, + alias: "id".to_string(), + }]; let projection: Arc = Arc::new(ProjectionExec::try_new(exprs, scan)?); let statistics = (0..projection.output_partitioning().partition_count()) diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs index 149c50557c3a3..7160ed4184b04 100644 --- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs @@ -51,7 +51,7 @@ use datafusion_physical_plan::joins::{ HashJoinExec, NestedLoopJoinExec, PartitionMode, 
StreamJoinPartitionMode, SymmetricHashJoinExec, }; -use datafusion_physical_plan::projection::{update_expr, ProjectionExec}; +use datafusion_physical_plan::projection::{update_expr, ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; @@ -224,8 +224,12 @@ fn test_update_matching_exprs() -> Result<()> { )?), ]; + let child_exprs: Vec = child + .iter() + .map(|(expr, alias)| ProjectionExpr::new(expr.clone(), alias.clone())) + .collect(); for (expr, expected_expr) in exprs.into_iter().zip(expected_exprs.into_iter()) { - assert!(update_expr(&expr, &child, true)? + assert!(update_expr(&expr, &child_exprs, true)? .unwrap() .eq(&expected_expr)); } @@ -359,8 +363,12 @@ fn test_update_projected_exprs() -> Result<()> { )?), ]; + let proj_exprs: Vec = projected_exprs + .iter() + .map(|(expr, alias)| ProjectionExpr::new(expr.clone(), alias.clone())) + .collect(); for (expr, expected_expr) in exprs.into_iter().zip(expected_exprs.into_iter()) { - assert!(update_expr(&expr, &projected_exprs, false)? + assert!(update_expr(&expr, &proj_exprs, false)? 
.unwrap() .eq(&expected_expr)); } @@ -424,8 +432,8 @@ fn test_csv_after_projection() -> Result<()> { let csv = create_projecting_csv_exec(); let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ - (Arc::new(Column::new("b", 2)), "b".to_string()), - (Arc::new(Column::new("d", 0)), "d".to_string()), + ProjectionExpr::new(Arc::new(Column::new("b", 2)), "b".to_string()), + ProjectionExpr::new(Arc::new(Column::new("d", 0)), "d".to_string()), ], csv.clone(), )?); @@ -461,9 +469,9 @@ fn test_memory_after_projection() -> Result<()> { let memory = create_projecting_memory_exec(); let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ - (Arc::new(Column::new("d", 2)), "d".to_string()), - (Arc::new(Column::new("e", 3)), "e".to_string()), - (Arc::new(Column::new("a", 1)), "a".to_string()), + ProjectionExpr::new(Arc::new(Column::new("d", 2)), "d".to_string()), + ProjectionExpr::new(Arc::new(Column::new("e", 3)), "e".to_string()), + ProjectionExpr::new(Arc::new(Column::new("a", 1)), "a".to_string()), ], memory.clone(), )?); @@ -567,9 +575,9 @@ fn test_streaming_table_after_projection() -> Result<()> { )?; let projection = Arc::new(ProjectionExec::try_new( vec![ - (Arc::new(Column::new("d", 3)), "d".to_string()), - (Arc::new(Column::new("e", 2)), "e".to_string()), - (Arc::new(Column::new("a", 0)), "a".to_string()), + ProjectionExpr::new(Arc::new(Column::new("d", 3)), "d".to_string()), + ProjectionExpr::new(Arc::new(Column::new("e", 2)), "e".to_string()), + ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a".to_string()), ], Arc::new(streaming_table) as _, )?) 
as _; @@ -634,17 +642,17 @@ fn test_projection_after_projection() -> Result<()> { let csv = create_simple_csv_exec(); let child_projection: Arc = Arc::new(ProjectionExec::try_new( vec![ - (Arc::new(Column::new("c", 2)), "c".to_string()), - (Arc::new(Column::new("e", 4)), "new_e".to_string()), - (Arc::new(Column::new("a", 0)), "a".to_string()), - (Arc::new(Column::new("b", 1)), "new_b".to_string()), + ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c".to_string()), + ProjectionExpr::new(Arc::new(Column::new("e", 4)), "new_e".to_string()), + ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a".to_string()), + ProjectionExpr::new(Arc::new(Column::new("b", 1)), "new_b".to_string()), ], csv.clone(), )?); let top_projection: Arc = Arc::new(ProjectionExec::try_new( vec![ - (Arc::new(Column::new("new_b", 3)), "new_b".to_string()), - ( + ProjectionExpr::new(Arc::new(Column::new("new_b", 3)), "new_b".to_string()), + ProjectionExpr::new( Arc::new(BinaryExpr::new( Arc::new(Column::new("c", 0)), Operator::Plus, @@ -652,7 +660,10 @@ fn test_projection_after_projection() -> Result<()> { )), "binary".to_string(), ), - (Arc::new(Column::new("new_b", 3)), "newest_b".to_string()), + ProjectionExpr::new( + Arc::new(Column::new("new_b", 3)), + "newest_b".to_string(), + ), ], child_projection.clone(), )?); @@ -720,9 +731,9 @@ fn test_output_req_after_projection() -> Result<()> { )); let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ - (Arc::new(Column::new("c", 2)), "c".to_string()), - (Arc::new(Column::new("a", 0)), "new_a".to_string()), - (Arc::new(Column::new("b", 1)), "b".to_string()), + ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c".to_string()), + ProjectionExpr::new(Arc::new(Column::new("a", 0)), "new_a".to_string()), + ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b".to_string()), ], sort_req.clone(), )?); @@ -812,9 +823,9 @@ fn test_coalesce_partitions_after_projection() -> Result<()> { Arc::new(CoalescePartitionsExec::new(csv)); let projection: 
Arc = Arc::new(ProjectionExec::try_new( vec![ - (Arc::new(Column::new("b", 1)), "b".to_string()), - (Arc::new(Column::new("a", 0)), "a_new".to_string()), - (Arc::new(Column::new("d", 3)), "d".to_string()), + ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b".to_string()), + ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a_new".to_string()), + ProjectionExpr::new(Arc::new(Column::new("d", 3)), "d".to_string()), ], coalesce_partitions, )?); @@ -869,9 +880,9 @@ fn test_filter_after_projection() -> Result<()> { let filter = Arc::new(FilterExec::try_new(predicate, csv)?); let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ - (Arc::new(Column::new("a", 0)), "a_new".to_string()), - (Arc::new(Column::new("b", 1)), "b".to_string()), - (Arc::new(Column::new("d", 3)), "d".to_string()), + ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a_new".to_string()), + ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b".to_string()), + ProjectionExpr::new(Arc::new(Column::new("d", 3)), "d".to_string()), ], filter.clone(), )?) as _; @@ -964,11 +975,17 @@ fn test_join_after_projection() -> Result<()> { )?); let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ - (Arc::new(Column::new("c", 2)), "c_from_left".to_string()), - (Arc::new(Column::new("b", 1)), "b_from_left".to_string()), - (Arc::new(Column::new("a", 0)), "a_from_left".to_string()), - (Arc::new(Column::new("a", 5)), "a_from_right".to_string()), - (Arc::new(Column::new("c", 7)), "c_from_right".to_string()), + ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c_from_left".to_string()), + ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b_from_left".to_string()), + ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a_from_left".to_string()), + ProjectionExpr::new( + Arc::new(Column::new("a", 5)), + "a_from_right".to_string(), + ), + ProjectionExpr::new( + Arc::new(Column::new("c", 7)), + "c_from_right".to_string(), + ), ], join, )?) 
as _; @@ -1089,16 +1106,16 @@ fn test_join_after_required_projection() -> Result<()> { )?); let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ - (Arc::new(Column::new("a", 5)), "a".to_string()), - (Arc::new(Column::new("b", 6)), "b".to_string()), - (Arc::new(Column::new("c", 7)), "c".to_string()), - (Arc::new(Column::new("d", 8)), "d".to_string()), - (Arc::new(Column::new("e", 9)), "e".to_string()), - (Arc::new(Column::new("a", 0)), "a".to_string()), - (Arc::new(Column::new("b", 1)), "b".to_string()), - (Arc::new(Column::new("c", 2)), "c".to_string()), - (Arc::new(Column::new("d", 3)), "d".to_string()), - (Arc::new(Column::new("e", 4)), "e".to_string()), + ProjectionExpr::new(Arc::new(Column::new("a", 5)), "a".to_string()), + ProjectionExpr::new(Arc::new(Column::new("b", 6)), "b".to_string()), + ProjectionExpr::new(Arc::new(Column::new("c", 7)), "c".to_string()), + ProjectionExpr::new(Arc::new(Column::new("d", 8)), "d".to_string()), + ProjectionExpr::new(Arc::new(Column::new("e", 9)), "e".to_string()), + ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a".to_string()), + ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b".to_string()), + ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c".to_string()), + ProjectionExpr::new(Arc::new(Column::new("d", 3)), "d".to_string()), + ProjectionExpr::new(Arc::new(Column::new("e", 4)), "e".to_string()), ], join, )?) as _; @@ -1178,7 +1195,7 @@ fn test_nested_loop_join_after_projection() -> Result<()> { )?) as _; let projection: Arc = Arc::new(ProjectionExec::try_new( - vec![(col_left_c, "c".to_string())], + vec![ProjectionExpr::new(col_left_c, "c".to_string())], Arc::clone(&join), )?) 
as _; let initial = displayable(projection.as_ref()).indent(true).to_string(); @@ -1268,10 +1285,13 @@ fn test_hash_join_after_projection() -> Result<()> { )?); let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ - (Arc::new(Column::new("c", 2)), "c_from_left".to_string()), - (Arc::new(Column::new("b", 1)), "b_from_left".to_string()), - (Arc::new(Column::new("a", 0)), "a_from_left".to_string()), - (Arc::new(Column::new("c", 7)), "c_from_right".to_string()), + ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c_from_left".to_string()), + ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b_from_left".to_string()), + ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a_from_left".to_string()), + ProjectionExpr::new( + Arc::new(Column::new("c", 7)), + "c_from_right".to_string(), + ), ], join.clone(), )?) as _; @@ -1307,10 +1327,10 @@ fn test_hash_join_after_projection() -> Result<()> { let projection = Arc::new(ProjectionExec::try_new( vec![ - (Arc::new(Column::new("a", 0)), "a".to_string()), - (Arc::new(Column::new("b", 1)), "b".to_string()), - (Arc::new(Column::new("c", 2)), "c".to_string()), - (Arc::new(Column::new("c", 7)), "c".to_string()), + ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a".to_string()), + ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b".to_string()), + ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c".to_string()), + ProjectionExpr::new(Arc::new(Column::new("c", 7)), "c".to_string()), ], join.clone(), )?); @@ -1351,9 +1371,9 @@ fn test_repartition_after_projection() -> Result<()> { )?); let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ - (Arc::new(Column::new("b", 1)), "b_new".to_string()), - (Arc::new(Column::new("a", 0)), "a".to_string()), - (Arc::new(Column::new("d", 3)), "d_new".to_string()), + ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b_new".to_string()), + ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a".to_string()), + ProjectionExpr::new(Arc::new(Column::new("d", 3)), 
"d_new".to_string()), ], repartition, )?) as _; @@ -1421,9 +1441,9 @@ fn test_sort_after_projection() -> Result<()> { ); let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ - (Arc::new(Column::new("c", 2)), "c".to_string()), - (Arc::new(Column::new("a", 0)), "new_a".to_string()), - (Arc::new(Column::new("b", 1)), "b".to_string()), + ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c".to_string()), + ProjectionExpr::new(Arc::new(Column::new("a", 0)), "new_a".to_string()), + ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b".to_string()), ], Arc::new(sort_exec), )?) as _; @@ -1475,9 +1495,9 @@ fn test_sort_preserving_after_projection() -> Result<()> { ); let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ - (Arc::new(Column::new("c", 2)), "c".to_string()), - (Arc::new(Column::new("a", 0)), "new_a".to_string()), - (Arc::new(Column::new("b", 1)), "b".to_string()), + ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c".to_string()), + ProjectionExpr::new(Arc::new(Column::new("a", 0)), "new_a".to_string()), + ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b".to_string()), ], Arc::new(sort_exec), )?) as _; @@ -1518,9 +1538,9 @@ fn test_union_after_projection() -> Result<()> { let union = Arc::new(UnionExec::new(vec![csv.clone(), csv.clone(), csv])); let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ - (Arc::new(Column::new("c", 2)), "c".to_string()), - (Arc::new(Column::new("a", 0)), "new_a".to_string()), - (Arc::new(Column::new("b", 1)), "b".to_string()), + ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c".to_string()), + ProjectionExpr::new(Arc::new(Column::new("a", 0)), "new_a".to_string()), + ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b".to_string()), ], union.clone(), )?) 
as _; @@ -1589,15 +1609,15 @@ fn test_partition_col_projection_pushdown() -> Result<()> { let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ - ( + ProjectionExpr::new( col("string_col", partitioned_schema.as_ref())?, "string_col".to_string(), ), - ( + ProjectionExpr::new( col("partition_col", partitioned_schema.as_ref())?, "partition_col".to_string(), ), - ( + ProjectionExpr::new( col("int_col", partitioned_schema.as_ref())?, "int_col".to_string(), ), @@ -1630,11 +1650,11 @@ fn test_partition_col_projection_pushdown_expr() -> Result<()> { let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ - ( + ProjectionExpr::new( col("string_col", partitioned_schema.as_ref())?, "string_col".to_string(), ), - ( + ProjectionExpr::new( // CAST(partition_col, Utf8View) cast( col("partition_col", partitioned_schema.as_ref())?, @@ -1643,7 +1663,7 @@ fn test_partition_col_projection_pushdown_expr() -> Result<()> { )?, "partition_col".to_string(), ), - ( + ProjectionExpr::new( col("int_col", partitioned_schema.as_ref())?, "int_col".to_string(), ), diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index 7f7926060edc4..6ed886af7b13b 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -56,7 +56,7 @@ use datafusion_physical_plan::filter::FilterExec; use datafusion_physical_plan::joins::utils::{JoinFilter, JoinOn}; use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec}; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; -use datafusion_physical_plan::projection::ProjectionExec; +use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; @@ -381,7 +381,11 @@ 
pub fn projection_exec( expr: Vec<(Arc, String)>, input: Arc, ) -> Result> { - Ok(Arc::new(ProjectionExec::try_new(expr, input)?)) + let proj_exprs: Vec = expr + .into_iter() + .map(|(expr, alias)| ProjectionExpr { expr, alias }) + .collect(); + Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?)) } /// A test [`ExecutionPlan`] whose requirements can be configured. diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 97b7e5761e4fc..82d7057a98608 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -639,8 +639,10 @@ impl DataSource for FileScanConfig { // This process can be moved into CsvExec, but it would be an overlap of their responsibility. // Must be all column references, with no table partition columns (which can not be projected) - let partitioned_columns_in_proj = projection.iter().any(|(expr, _)| { - expr.as_any() + let partitioned_columns_in_proj = projection.iter().any(|proj_expr| { + proj_expr + .expr + .as_any() .downcast_ref::() .map(|expr| expr.index() >= self.file_schema.fields().len()) .unwrap_or(false) diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index 6c44c8fe86c5e..672317060d902 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -22,7 +22,7 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::Result; use datafusion_physical_plan::aggregates::AggregateExec; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; -use datafusion_physical_plan::projection::ProjectionExec; +use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::udaf::{AggregateFunctionExpr, StatisticsArgs}; use datafusion_physical_plan::{expressions, ExecutionPlan}; use 
std::sync::Arc; @@ -67,8 +67,10 @@ impl PhysicalOptimizerRule for AggregateStatistics { if let Some((optimizable_statistic, name)) = take_optimizable_value_from_statistics(&statistics_args, expr) { - projections - .push((expressions::lit(optimizable_statistic), name.to_owned())); + projections.push(ProjectionExpr { + expr: expressions::lit(optimizable_statistic), + alias: name.to_owned(), + }); } else { // TODO: we need all aggr_expr to be resolved (cf TODO fullres) break; diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 88dcd4c523cf4..898386e2f9880 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -50,7 +50,7 @@ use datafusion_physical_plan::execution_plan::EmissionType; use datafusion_physical_plan::joins::{ CrossJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec, }; -use datafusion_physical_plan::projection::ProjectionExec; +use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::tree_node::PlanContext; @@ -407,7 +407,11 @@ pub fn adjust_input_keys_ordering( // For Projection, we need to transform the requirements to the columns before the Projection // And then to push down the requirements // Construct a mapping from new name to the original Column - let new_required = map_columns_before_projection(&requirements.data, expr); + let proj_exprs: Vec<_> = expr + .iter() + .map(|p| (Arc::clone(&p.expr), p.alias.clone())) + .collect(); + let new_required = map_columns_before_projection(&requirements.data, &proj_exprs); if new_required.len() == requirements.data.len() { requirements.children[0].data = new_required; } else { @@ -544,7 +548,10 @@ pub fn reorder_aggregate_keys( .map(|col| { let name 
= col.name(); let index = agg_schema.index_of(name)?; - Ok((Arc::new(Column::new(name, index)) as _, name.to_owned())) + Ok(ProjectionExpr { + expr: Arc::new(Column::new(name, index)) as _, + alias: name.to_owned(), + }) }) .collect::>>()?; let agg_fields = agg_schema.fields(); @@ -553,7 +560,10 @@ pub fn reorder_aggregate_keys( { let name = field.name(); let plan = Arc::new(Column::new(name, idx)) as _; - proj_exprs.push((plan, name.clone())) + proj_exprs.push(ProjectionExpr { + expr: plan, + alias: name.clone(), + }) } return ProjectionExec::try_new(proj_exprs, new_final_agg).map(|p| { PlanWithKeyRequirements::new(Arc::new(p), vec![], vec![agg_node]) diff --git a/datafusion/physical-optimizer/src/topk_aggregation.rs b/datafusion/physical-optimizer/src/topk_aggregation.rs index bff0b1e49684f..b7505f0df4edb 100644 --- a/datafusion/physical-optimizer/src/topk_aggregation.rs +++ b/datafusion/physical-optimizer/src/topk_aggregation.rs @@ -110,11 +110,12 @@ impl TopKAggregation { } } else if let Some(proj) = plan.as_any().downcast_ref::() { // track renames due to successive projections - for (src_expr, proj_name) in proj.expr() { - let Some(src_col) = src_expr.as_any().downcast_ref::() else { + for proj_expr in proj.expr() { + let Some(src_col) = proj_expr.expr.as_any().downcast_ref::() + else { continue; }; - if *proj_name == cur_col_name { + if proj_expr.alias == cur_col_name { cur_col_name = src_col.name().to_string(); } } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 066b7a95e9cef..0a811a8826818 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -34,7 +34,7 @@ use crate::filter_pushdown::{ }; use crate::projection::{ make_with_child, try_embed_projection, update_expr, EmbeddedProjection, - ProjectionExec, + ProjectionExec, ProjectionExpr, }; use crate::{ metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, @@ -516,11 +516,11 @@ impl ExecutionPlan for 
FilterExec { .iter() .map(|p| { let field = filter_child_schema.field(*p).clone(); - ( - Arc::new(Column::new(field.name(), *p)) + ProjectionExpr { + expr: Arc::new(Column::new(field.name(), *p)) as Arc, - field.name().to_string(), - ) + alias: field.name().to_string(), + } }) .collect::>(); Some(Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index cf665f2a5ad0d..d392650f88dda 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -28,7 +28,7 @@ use std::task::{Context, Poll}; use crate::joins::SharedBitmapBuilder; use crate::metrics::{self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder}; -use crate::projection::ProjectionExec; +use crate::projection::{ProjectionExec, ProjectionExpr}; use crate::{ ColumnStatistics, ExecutionPlan, ExecutionPlanProperties, Partitioning, Statistics, }; @@ -1589,20 +1589,27 @@ pub fn reorder_output_after_swap( fn swap_reverting_projection( left_schema: &Schema, right_schema: &Schema, -) -> Vec<(Arc, String)> { - let right_cols = right_schema.fields().iter().enumerate().map(|(i, f)| { - ( - Arc::new(Column::new(f.name(), i)) as Arc, - f.name().to_owned(), - ) - }); +) -> Vec { + let right_cols = + right_schema + .fields() + .iter() + .enumerate() + .map(|(i, f)| ProjectionExpr { + expr: Arc::new(Column::new(f.name(), i)) as Arc, + alias: f.name().to_owned(), + }); let right_len = right_cols.len(); - let left_cols = left_schema.fields().iter().enumerate().map(|(i, f)| { - ( - Arc::new(Column::new(f.name(), right_len + i)) as Arc, - f.name().to_owned(), - ) - }); + let left_cols = + left_schema + .fields() + .iter() + .enumerate() + .map(|(i, f)| ProjectionExpr { + expr: Arc::new(Column::new(f.name(), right_len + i)) + as Arc, + alias: f.name().to_owned(), + }); left_cols.chain(right_cols).collect() } @@ -2676,17 +2683,17 @@ mod tests { assert_eq!(proj.len(), 
3); - let (col, name) = &proj[0]; - assert_eq!(name, "a"); - assert_col_expr(col, "a", 1); + let proj_expr = &proj[0]; + assert_eq!(proj_expr.alias, "a"); + assert_col_expr(&proj_expr.expr, "a", 1); - let (col, name) = &proj[1]; - assert_eq!(name, "b"); - assert_col_expr(col, "b", 2); + let proj_expr = &proj[1]; + assert_eq!(proj_expr.alias, "b"); + assert_col_expr(&proj_expr.expr, "b", 2); - let (col, name) = &proj[2]; - assert_eq!(name, "c"); - assert_col_expr(col, "c", 0); + let proj_expr = &proj[2]; + assert_eq!(proj_expr.alias, "c"); + assert_col_expr(&proj_expr.expr, "c", 0); } fn assert_col_expr(expr: &Arc, name: &str, index: usize) { diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 770ebd8b0ecbb..6eea70e1176d3 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -20,12 +20,6 @@ //! of a projection on table `t1` where the expressions `a`, `b`, and `a+b` are the //! projection expressions. `SELECT` without `FROM` will only evaluate expressions. 
-use std::any::Any; -use std::collections::HashMap; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - use super::expressions::{Column, Literal}; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{ @@ -39,6 +33,11 @@ use crate::filter_pushdown::{ }; use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef}; use crate::{ColumnStatistics, DisplayFormatType, ExecutionPlan, PhysicalExpr}; +use std::any::Any; +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::{RecordBatch, RecordBatchOptions}; @@ -57,7 +56,10 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use futures::stream::{Stream, StreamExt}; use log::trace; -/// Execution plan for a projection +/// [`ExecutionPlan`] for a projection +/// +/// Computes a set of scalar value expressions for each input row, producing one +/// output row for each input row. 
#[derive(Debug, Clone)] pub struct ProjectionExec { /// The projection expressions stored as tuples of (expression, output column name) @@ -74,21 +76,72 @@ impl ProjectionExec { /// Create a projection on an input - pub fn try_new( - expr: Vec, - input: Arc, - ) -> Result { + /// + /// # Example: + /// Create a `ProjectionExec` to compute `SELECT a, a+b AS sum_ab FROM t1`: + /// + /// ``` + /// # use std::sync::Arc; + /// # use arrow_schema::{Schema, Field, DataType}; + /// # use datafusion_expr::Operator; + /// # use datafusion_physical_plan::ExecutionPlan; + /// # use datafusion_physical_expr::expressions::{col, binary}; + /// # use datafusion_physical_plan::empty::EmptyExec; + /// # use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; + /// # fn schema() -> Arc { + /// # Arc::new(Schema::new(vec![ + /// # Field::new("a", DataType::Int32, false), + /// # Field::new("b", DataType::Int32, false), + /// # ])) + /// # } + /// # + /// # fn input() -> Arc { + /// # Arc::new(EmptyExec::new(schema())) + /// # } + /// # + /// # fn main() { + /// let schema = schema(); + /// // Create PhysicalExprs + /// let a = col("a", &schema).unwrap(); + /// let b = col("b", &schema).unwrap(); + /// let a_plus_b = binary(Arc::clone(&a), Operator::Plus, b, &schema).unwrap(); + /// // create ProjectionExec + /// let proj = ProjectionExec::try_new([ + /// ProjectionExpr { + /// // expr a produces the column named "a" + /// expr: a, + /// alias: "a".to_string(), + /// }, + /// ProjectionExpr { + /// // expr: a + b produces the column named "sum_ab" + /// expr: a_plus_b, + /// alias: "sum_ab".to_string(), + /// } + /// ], input()).unwrap(); + /// # } + /// ``` + pub fn try_new(expr: I, input: Arc) -> Result + where + I: IntoIterator, + E: Into, + { let input_schema = input.schema(); + // convert argument to Vec + let expr = expr.into_iter().map(Into::into).collect::>(); let fields: Result> = expr .iter() - .map(|(e, name)| { - let metadata =
e.return_field(&input_schema)?.metadata().clone(); + .map(|proj_expr| { + let metadata = proj_expr + .expr + .return_field(&input_schema)? + .metadata() + .clone(); let field = Field::new( - name, - e.data_type(&input_schema)?, - e.nullable(&input_schema)?, + &proj_expr.alias, + proj_expr.expr.data_type(&input_schema)?, + proj_expr.expr.nullable(&input_schema)?, ) .with_metadata(metadata); @@ -102,7 +155,10 @@ impl ProjectionExec { )); // Construct a map from the input expressions to the output expression of the Projection - let projection_mapping = ProjectionMapping::try_new(expr.clone(), &input_schema)?; + let projection_mapping = ProjectionMapping::try_new( + expr.iter().map(|p| (Arc::clone(&p.expr), p.alias.clone())), + &input_schema, + )?; let cache = Self::compute_properties(&input, &projection_mapping, Arc::clone(&schema))?; Ok(Self { @@ -147,7 +203,34 @@ impl ProjectionExec { } } -pub type ProjectionExpr = (Arc, String); +/// A projection expression that is evaluated by [`ProjectionExec`] +/// +/// The expression is evaluated and the result is stored in a column +/// with the name specified by `alias`. +/// +/// For example, the SQL expression `a + b AS sum_ab` would be represented +/// as a `ProjectionExpr` where `expr` is the expression `a + b` +/// and `alias` is the string `sum_ab`. +#[derive(Debug, Clone)] +pub struct ProjectionExpr { + /// The expression that will be evaluated. + pub expr: Arc, + /// The name of the output column for use in the output schema.
+ pub alias: String, +} + +impl ProjectionExpr { + /// Create a new projection expression + pub fn new(expr: Arc, alias: String) -> Self { + Self { expr, alias } + } +} + +impl From<(Arc, String)> for ProjectionExpr { + fn from(value: (Arc, String)) -> Self { + Self::new(value.0, value.1) + } +} impl DisplayAs for ProjectionExec { fn fmt_as( @@ -160,10 +243,10 @@ impl DisplayAs for ProjectionExec { let expr: Vec = self .expr .iter() - .map(|(e, alias)| { - let e = e.to_string(); - if &e != alias { - format!("{e} as {alias}") + .map(|proj_expr| { + let e = proj_expr.expr.to_string(); + if e != proj_expr.alias { + format!("{e} as {}", proj_expr.alias) } else { e } @@ -173,12 +256,12 @@ impl DisplayAs for ProjectionExec { write!(f, "ProjectionExec: expr=[{}]", expr.join(", ")) } DisplayFormatType::TreeRender => { - for (i, (e, alias)) in self.expr().iter().enumerate() { - let expr_sql = fmt_sql(e.as_ref()); - if &e.to_string() == alias { + for (i, proj_expr) in self.expr().iter().enumerate() { + let expr_sql = fmt_sql(proj_expr.expr.as_ref()); + if proj_expr.expr.to_string() == proj_expr.alias { writeln!(f, "expr{i}={expr_sql}")?; } else { - writeln!(f, "{alias}={expr_sql}")?; + writeln!(f, "{}={expr_sql}", proj_expr.alias)?; } } @@ -208,10 +291,10 @@ impl ExecutionPlan for ProjectionExec { } fn benefits_from_input_partitioning(&self) -> Vec { - let all_simple_exprs = self - .expr - .iter() - .all(|(e, _)| e.as_any().is::() || e.as_any().is::()); + let all_simple_exprs = self.expr.iter().all(|proj_expr| { + proj_expr.expr.as_any().is::() + || proj_expr.expr.as_any().is::() + }); // If expressions are all either column_expr or Literal, then all computations in this projection are reorder or rename, // and projection would not benefit from the repartition, benefits_from_input_partitioning will return false. 
vec![!all_simple_exprs] @@ -237,7 +320,7 @@ impl ExecutionPlan for ProjectionExec { trace!("Start ProjectionExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id()); Ok(Box::pin(ProjectionStream { schema: Arc::clone(&self.schema), - expr: self.expr.iter().map(|x| Arc::clone(&x.0)).collect(), + expr: self.expr.iter().map(|x| Arc::clone(&x.expr)).collect(), input: self.input.execute(partition, context)?, baseline_metrics: BaselineMetrics::new(&self.metrics, partition), })) @@ -255,7 +338,9 @@ impl ExecutionPlan for ProjectionExec { let input_stats = self.input.partition_statistics(partition)?; stats_projection( input_stats, - self.expr.iter().map(|(e, _)| Arc::clone(e)), + self.expr + .iter() + .map(|proj_expr| Arc::clone(&proj_expr.expr)), Arc::clone(&self.input.schema()), ) } @@ -428,22 +513,25 @@ pub fn try_embed_projection( let embed_project_exprs = projection_index .iter() .zip(new_execution_plan.schema().fields()) - .map(|(index, field)| { - ( - Arc::new(Column::new(field.name(), *index)) as Arc, - field.name().to_owned(), - ) + .map(|(index, field)| ProjectionExpr { + expr: Arc::new(Column::new(field.name(), *index)) as Arc, + alias: field.name().to_owned(), }) .collect::>(); let mut new_projection_exprs = Vec::with_capacity(projection.expr().len()); - for (expr, alias) in projection.expr() { + for proj_expr in projection.expr() { // update column index for projection expression since the input schema has been changed. - let Some(expr) = update_expr(expr, embed_project_exprs.as_slice(), false)? else { + let Some(expr) = + update_expr(&proj_expr.expr, embed_project_exprs.as_slice(), false)? 
+ else { return Ok(None); }; - new_projection_exprs.push((expr, alias.clone())); + new_projection_exprs.push(ProjectionExpr { + expr, + alias: proj_expr.alias.clone(), + }); } // Old projection may contain some alias or expression such as `a + 1` and `CAST('true' AS BOOLEAN)`, but our projection_exprs in hash join just contain column, so we need to create the new projection to keep the original projection. let new_projection = Arc::new(ProjectionExec::try_new( @@ -558,21 +646,23 @@ pub fn remove_unnecessary_projections( /// but `SELECT b, a` and `SELECT a+1, b` and `SELECT a AS c, b` are not. fn is_projection_removable(projection: &ProjectionExec) -> bool { let exprs = projection.expr(); - exprs.iter().enumerate().all(|(idx, (expr, alias))| { - let Some(col) = expr.as_any().downcast_ref::() else { + exprs.iter().enumerate().all(|(idx, proj_expr)| { + let Some(col) = proj_expr.expr.as_any().downcast_ref::() else { return false; }; - col.name() == alias && col.index() == idx + col.name() == proj_expr.alias && col.index() == idx }) && exprs.len() == projection.input().schema().fields().len() } /// Given the expression set of a projection, checks if the projection causes /// any renaming or constructs a non-`Column` physical expression. pub fn all_alias_free_columns(exprs: &[ProjectionExpr]) -> bool { - exprs.iter().all(|(expr, alias)| { - expr.as_any() + exprs.iter().all(|proj_expr| { + proj_expr + .expr + .as_any() .downcast_ref::() - .map(|column| column.name() == alias) + .map(|column| column.name() == proj_expr.alias) .unwrap_or(false) }) } @@ -586,8 +676,10 @@ pub fn new_projections_for_columns( ) -> Vec { projection .iter() - .filter_map(|(expr, _)| { - expr.as_any() + .filter_map(|proj_expr| { + proj_expr + .expr + .as_any() .downcast_ref::() .map(|expr| source[expr.index()]) }) @@ -606,7 +698,9 @@ pub fn make_with_child( /// Returns `true` if all the expressions in the argument are `Column`s. 
pub fn all_columns(exprs: &[ProjectionExpr]) -> bool { - exprs.iter().all(|(expr, _)| expr.as_any().is::()) + exprs + .iter() + .all(|proj_expr| proj_expr.expr.as_any().is::()) } /// The function operates in two modes: @@ -657,7 +751,7 @@ pub fn update_expr( state = RewriteState::RewrittenValid; // Update the index of `column`: Ok(Transformed::yes(Arc::clone( - &projected_exprs[column.index()].0, + &projected_exprs[column.index()].expr, ))) } else { // default to invalid, in case we can't find the relevant column @@ -666,14 +760,14 @@ pub fn update_expr( projected_exprs .iter() .enumerate() - .find_map(|(index, (projected_expr, alias))| { - projected_expr.as_any().downcast_ref::().and_then( + .find_map(|(index, proj_expr)| { + proj_expr.expr.as_any().downcast_ref::().and_then( |projected_column| { (column.name().eq(projected_column.name()) && column.index() == projected_column.index()) .then(|| { state = RewriteState::RewrittenValid; - Arc::new(Column::new(alias, index)) as _ + Arc::new(Column::new(&proj_expr.alias, index)) as _ }) }, ) @@ -732,10 +826,12 @@ pub fn physical_to_column_exprs( ) -> Option> { exprs .iter() - .map(|(expr, alias)| { - expr.as_any() + .map(|proj_expr| { + proj_expr + .expr + .as_any() .downcast_ref::() - .map(|col| (col.clone(), alias.clone())) + .map(|col| (col.clone(), proj_expr.alias.clone())) }) .collect() } @@ -753,13 +849,10 @@ pub fn new_join_children( let new_left = ProjectionExec::try_new( projection_as_columns[0..=far_right_left_col_ind as _] .iter() - .map(|(col, alias)| { - ( - Arc::new(Column::new(col.name(), col.index())) as _, - alias.clone(), - ) - }) - .collect(), + .map(|(col, alias)| ProjectionExpr { + expr: Arc::new(Column::new(col.name(), col.index())) as _, + alias: alias.clone(), + }), Arc::clone(left_child), )?; let left_size = left_child.schema().fields().len() as i32; @@ -767,17 +860,16 @@ pub fn new_join_children( projection_as_columns[far_left_right_col_ind as _..] 
.iter() .map(|(col, alias)| { - ( - Arc::new(Column::new( + ProjectionExpr { + expr: Arc::new(Column::new( col.name(), // Align projected expressions coming from the right // table with the new right child projection: (col.index() as i32 - left_size) as _, )) as _, - alias.clone(), - ) - }) - .collect(), + alias: alias.clone(), + } + }), Arc::clone(right_child), )?; @@ -919,34 +1011,39 @@ fn try_unifying_projections( let mut column_ref_map: HashMap = HashMap::new(); // Collect the column references usage in the outer projection. - projection.expr().iter().for_each(|(expr, _)| { - expr.apply(|expr| { - Ok({ - if let Some(column) = expr.as_any().downcast_ref::() { - *column_ref_map.entry(column.clone()).or_default() += 1; - } - TreeNodeRecursion::Continue + projection.expr().iter().for_each(|proj_expr| { + proj_expr + .expr + .apply(|expr| { + Ok({ + if let Some(column) = expr.as_any().downcast_ref::() { + *column_ref_map.entry(column.clone()).or_default() += 1; + } + TreeNodeRecursion::Continue + }) }) - }) - .unwrap(); + .unwrap(); }); // Merging these projections is not beneficial, e.g // If an expression is not trivial and it is referred more than 1, unifies projections will be // beneficial as caching mechanism for non-trivial computations. // See discussion in: https://github.com/apache/datafusion/issues/8296 if column_ref_map.iter().any(|(column, count)| { - *count > 1 && !is_expr_trivial(&Arc::clone(&child.expr()[column.index()].0)) + *count > 1 && !is_expr_trivial(&Arc::clone(&child.expr()[column.index()].expr)) }) { return Ok(None); } - for (expr, alias) in projection.expr() { + for proj_expr in projection.expr() { // If there is no match in the input projection, we cannot unify these // projections. This case will arise if the projection expression contains // a `PhysicalExpr` variant `update_expr` doesn't support. - let Some(expr) = update_expr(expr, child.expr(), true)? 
else { + let Some(expr) = update_expr(&proj_expr.expr, child.expr(), true)? else { return Ok(None); }; - projected_exprs.push((expr, alias.clone())); + projected_exprs.push(ProjectionExpr { + expr, + alias: proj_expr.alias.clone(), + }); } ProjectionExec::try_new(projected_exprs, Arc::clone(child.input())) .map(|e| Some(Arc::new(e) as _)) @@ -957,7 +1054,7 @@ fn collect_column_indices(exprs: &[ProjectionExpr]) -> Vec { // Collect indices and remove duplicates. let mut indices = exprs .iter() - .flat_map(|(expr, _)| collect_columns(expr)) + .flat_map(|proj_expr| collect_columns(&proj_expr.expr)) .map(|x| x.index()) .collect::>() .into_iter() @@ -1063,7 +1160,7 @@ mod tests { use datafusion_common::ScalarValue; use datafusion_expr::Operator; - use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal}; + use datafusion_physical_expr::expressions::{col, BinaryExpr, Column, Literal}; #[test] fn test_collect_column_indices() -> Result<()> { @@ -1076,7 +1173,10 @@ mod tests { Arc::new(Column::new("a", 1)), )), )); - let column_indices = collect_column_indices(&[(expr, "b-(1+a)".to_string())]); + let column_indices = collect_column_indices(&[ProjectionExpr { + expr, + alias: "b-(1+a)".to_string(), + }]); assert_eq!(column_indices, vec![1, 7]); Ok(()) } @@ -1143,7 +1243,7 @@ mod tests { let exec = test::scan_partitioned(1); let expected = collect(exec.execute(0, Arc::clone(&task_ctx))?).await?; - let projection = ProjectionExec::try_new(vec![], exec)?; + let projection = ProjectionExec::try_new(vec![] as Vec, exec)?; let stream = projection.execute(0, Arc::clone(&task_ctx))?; let output = collect(stream).await?; assert_eq!(output.len(), expected.len()); @@ -1151,6 +1251,23 @@ mod tests { Ok(()) } + #[tokio::test] + async fn project_old_syntax() { + let exec = test::scan_partitioned(1); + let schema = exec.schema(); + let expr = col("i", &schema).unwrap(); + ProjectionExec::try_new( + vec![ + // use From impl of ProjectionExpr to create ProjectionExpr + // to 
test old syntax + (expr, "c".to_string()), + ], + exec, + ) + // expect this to succeed + .unwrap(); + } + fn get_stats() -> Statistics { Statistics { num_rows: Precision::Exact(5), @@ -1316,18 +1433,18 @@ mod tests { // from the input schema in the expressions here, bounds_check would fail on them if output // schema is supplied to the partitions_statistics method. let exprs: Vec = vec![ - ( - Arc::new(Column::new("c", 2)) as Arc, - "c_renamed".to_string(), - ), - ( - Arc::new(BinaryExpr::new( + ProjectionExpr { + expr: Arc::new(Column::new("c", 2)) as Arc, + alias: "c_renamed".to_string(), + }, + ProjectionExpr { + expr: Arc::new(BinaryExpr::new( Arc::new(Column::new("e", 4)), Operator::Plus, Arc::new(Column::new("f", 5)), )) as Arc, - "e_plus_f".to_string(), - ), + alias: "e_plus_f".to_string(), + }, ]; let projection = ProjectionExec::try_new(exprs, input).unwrap(); diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 4c991544f877b..1bb19654f8626 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -1223,7 +1223,7 @@ mod tests { use crate::common::collect; use crate::expressions::PhysicalSortExpr; - use crate::projection::ProjectionExec; + use crate::projection::{ProjectionExec, ProjectionExpr}; use crate::streaming::{PartitionStream, StreamingTableExec}; use crate::test::TestMemoryExec; use crate::windows::{ @@ -1402,7 +1402,11 @@ mod tests { (expr, name) }) .collect::>(); - Ok(Arc::new(ProjectionExec::try_new(exprs, input)?)) + let proj_exprs: Vec = exprs + .into_iter() + .map(|(expr, alias)| ProjectionExpr { expr, alias }) + .collect(); + Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?)) } fn task_context_helper() -> TaskContext { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 
1a1b369fabee1..9ec542a6c30bd 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -82,7 +82,7 @@ use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode}; use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion::physical_plan::memory::LazyMemoryExec; use datafusion::physical_plan::placeholder_row::PlaceholderRowExec; -use datafusion::physical_plan::projection::ProjectionExec; +use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; @@ -582,7 +582,11 @@ impl protobuf::PhysicalPlanNode { )) }) .collect::, String)>>>()?; - Ok(Arc::new(ProjectionExec::try_new(exprs, input)?)) + let proj_exprs: Vec = exprs + .into_iter() + .map(|(expr, alias)| ProjectionExpr { expr, alias }) + .collect(); + Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?)) } fn try_into_filter_physical_plan( @@ -1983,9 +1987,13 @@ impl protobuf::PhysicalPlanNode { let expr = exec .expr() .iter() - .map(|expr| serialize_physical_expr(&expr.0, extension_codec)) + .map(|proj_expr| serialize_physical_expr(&proj_expr.expr, extension_codec)) .collect::>>()?; - let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect(); + let expr_name = exec + .expr() + .iter() + .map(|proj_expr| proj_expr.alias.clone()) + .collect(); Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Projection(Box::new( protobuf::ProjectionExecNode { diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 7ea4a5cb308c4..fe6f0573ff2af 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -80,7 +80,7 @@ use 
datafusion::physical_plan::joins::{ }; use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion::physical_plan::placeholder_row::PlaceholderRowExec; -use datafusion::physical_plan::projection::ProjectionExec; +use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::union::{InterleaveExec, UnionExec}; @@ -211,7 +211,10 @@ fn roundtrip_date_time_interval() -> Result<()> { let date_time_interval_expr = binary(date_expr, Operator::Plus, literal_expr, &schema)?; let plan = Arc::new(ProjectionExec::try_new( - vec![(date_time_interval_expr, "result".to_string())], + vec![ProjectionExpr { + expr: date_time_interval_expr, + alias: "result".to_string(), + }], input, )?); roundtrip_test(plan) @@ -1100,8 +1103,13 @@ fn roundtrip_scalar_udf() -> Result<()> { Arc::new(ConfigOptions::default()), ); - let project = - ProjectionExec::try_new(vec![(Arc::new(expr), "a".to_string())], input)?; + let project = ProjectionExec::try_new( + vec![ProjectionExpr { + expr: Arc::new(expr), + alias: "a".to_string(), + }], + input, + )?; let ctx = SessionContext::new(); @@ -1401,7 +1409,10 @@ fn roundtrip_like() -> Result<()> { &schema, )?; let plan = Arc::new(ProjectionExec::try_new( - vec![(like_expr, "result".to_string())], + vec![ProjectionExpr { + expr: like_expr, + alias: "result".to_string(), + }], input, )?); roundtrip_test(plan) diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 7558b05c93e69..ca55bde55899a 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -84,6 +84,38 @@ impl AsyncScalarUDFImpl for AskLLM { See [#16896](https://github.com/apache/datafusion/issues/16896) for more details. 
+### `ProjectionExpr` changed from type alias to struct + +`ProjectionExpr` has been changed from a type alias to a struct with named fields to improve code clarity and maintainability. + +**Before:** + +```rust,ignore +pub type ProjectionExpr = (Arc, String); +``` + +**After:** + +```rust,ignore +#[derive(Debug, Clone)] +pub struct ProjectionExpr { + pub expr: Arc, + pub alias: String, +} +``` + +To upgrade your code: + +- Replace tuple construction `(expr, alias)` with `ProjectionExpr::new(expr, alias)` or `ProjectionExpr { expr, alias }` +- Replace tuple field access `.0` and `.1` with `.expr` and `.alias` +- Update pattern matching from `(expr, alias)` to `ProjectionExpr { expr, alias }` + +This mainly impacts use of `ProjectionExec`. + +This change was done in [#17398] + +[#17398]: https://github.com/apache/datafusion/pull/17398 + ### `SessionState`, `SessionConfig`, and `OptimizerConfig` returns `&Arc` instead of `&ConfigOptions` To provide broader access to `ConfigOptions` and reduce required clones, some