From b2e02f171d5cfafbffb6ace8960fcca23fef79aa Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Fri, 6 Dec 2024 22:28:37 +0800 Subject: [PATCH 1/5] refactor: replace Vec with IndexMap for expression mappings in ProjectionMapping and EquivalenceGroup --- .../physical-expr/src/equivalence/class.rs | 17 ++--- .../src/equivalence/projection.rs | 72 ++++++++++--------- 2 files changed, 43 insertions(+), 46 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 1812844d98c9..c0af4d1a3d35 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -27,7 +27,7 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::JoinType; use datafusion_physical_expr_common::physical_expr::format_physical_expr_list; -use indexmap::IndexSet; +use indexmap::{IndexMap, IndexSet}; /// A structure representing a expression known to be constant in a physical execution plan. /// @@ -546,20 +546,15 @@ impl EquivalenceGroup { .collect::>(); (new_class.len() > 1).then_some(EquivalenceClass::new(new_class)) }); - // TODO: Convert the algorithm below to a version that uses `HashMap`. - // once `Arc` can be stored in `HashMap`. - // See issue: https://github.com/apache/datafusion/issues/8027 - let mut new_classes = vec![]; + let mut new_classes: IndexMap, Vec>> = + IndexMap::new(); for (source, target) in mapping.iter() { - if new_classes.is_empty() { - new_classes.push((source, vec![Arc::clone(target)])); - } - if let Some((_, values)) = - new_classes.iter_mut().find(|(key, _)| *key == source) - { + if let Some(values) = new_classes.get_mut(source) { if !physical_exprs_contains(values, target) { values.push(Arc::clone(target)); } + } else { + new_classes.insert(Arc::clone(source), vec![Arc::clone(target)]); } } // Only add equivalence classes with at least two members as singleton diff --git a/datafusion/physical-expr/src/equivalence/projection.rs b/datafusion/physical-expr/src/equivalence/projection.rs index 25a05a2a5918..4dbbdf078634 100644 --- a/datafusion/physical-expr/src/equivalence/projection.rs +++ b/datafusion/physical-expr/src/equivalence/projection.rs @@ -23,6 +23,7 @@ use crate::PhysicalExpr; use arrow::datatypes::SchemaRef; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{internal_err, Result}; +use indexmap::IndexMap; /// Stores the mapping between source expressions and target expressions for a /// projection. @@ -30,7 +31,7 @@ use datafusion_common::{internal_err, Result}; pub struct ProjectionMapping { /// Mapping between source expressions and target expressions. /// Vector indices correspond to the indices after projection. - pub map: Vec<(Arc, Arc)>, + pub map: IndexMap, Arc>, } impl ProjectionMapping { @@ -52,34 +53,38 @@ impl ProjectionMapping { input_schema: &SchemaRef, ) -> Result { // Construct a map from the input expressions to the output expression of the projection: - expr.iter() - .enumerate() - .map(|(expr_idx, (expression, name))| { - let target_expr = Arc::new(Column::new(name, expr_idx)) as _; - Arc::clone(expression) - .transform_down(|e| match e.as_any().downcast_ref::() { - Some(col) => { - // Sometimes, an expression and its name in the input_schema - // doesn't match. This can cause problems, so we make sure - // that the expression name matches with the name in `input_schema`. - // Conceptually, `source_expr` and `expression` should be the same. - let idx = col.index(); - let matching_input_field = input_schema.field(idx); - if col.name() != matching_input_field.name() { - return internal_err!("Input field name {} does not match with the projection expression {}", - matching_input_field.name(),col.name()) - } - let matching_input_column = - Column::new(matching_input_field.name(), idx); - Ok(Transformed::yes(Arc::new(matching_input_column))) - } - None => Ok(Transformed::no(e)), - }) - .data() - .map(|source_expr| (source_expr, target_expr)) - }) - .collect::>>() - .map(|map| Self { map }) + let mut map = IndexMap::new(); + + for (expr_idx, (expression, name)) in expr.iter().enumerate() { + let target_expr = Arc::new(Column::new(name, expr_idx)) as _; + let transformed_expr = Arc::clone(expression).transform_down(|e| { + if let Some(col) = e.as_any().downcast_ref::() { + // Sometimes, an expression and its name in the input_schema + // doesn't match. This can cause problems, so we make sure + // that the expression name matches with the name in `input_schema`. + // Conceptually, `source_expr` and `expression` should be the same. + let idx = col.index(); + let matching_input_field = input_schema.field(idx); + if col.name() != matching_input_field.name() { + return internal_err!( + "Input field name {} does not match with the projection expression {}", + matching_input_field.name(), + col.name() + ); + } + let matching_input_column = Column::new(matching_input_field.name(), idx); + Ok(Transformed::yes(Arc::new(matching_input_column))) + } else { + Ok(Transformed::no(e)) + } + }); + + let _ = transformed_expr + .data() + .map(|source_expr| map.insert(source_expr, target_expr)); + } + + Ok(Self { map }) } /// Constructs a subset mapping using the provided indices. @@ -91,10 +96,10 @@ impl ProjectionMapping { ProjectionMapping::try_new(&projection_exprs, schema) } - /// Iterate over pairs of (source, target) expressions + /// Iterate over the mapping. pub fn iter( &self, - ) -> impl Iterator, Arc)> + '_ { + ) -> impl Iterator, &Arc)> { self.map.iter() } @@ -112,10 +117,7 @@ impl ProjectionMapping { &self, expr: &Arc, ) -> Option> { - self.map - .iter() - .find(|(source, _)| source.eq(expr)) - .map(|(_, target)| Arc::clone(target)) + self.map.get(expr).map(Arc::clone) } } From 762c2a9639ddc477f4c84ff3df013370230c2ed9 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Fri, 6 Dec 2024 22:32:15 +0800 Subject: [PATCH 2/5] chore --- datafusion/physical-expr/src/equivalence/projection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/equivalence/projection.rs b/datafusion/physical-expr/src/equivalence/projection.rs index 4dbbdf078634..118ea26481de 100644 --- a/datafusion/physical-expr/src/equivalence/projection.rs +++ b/datafusion/physical-expr/src/equivalence/projection.rs @@ -96,7 +96,7 @@ impl ProjectionMapping { ProjectionMapping::try_new(&projection_exprs, schema) } - /// Iterate over the mapping. + /// Iterate over pairs of (source, target) expressions pub fn iter( &self, ) -> impl Iterator, &Arc)> { From 297e1cd69be0318c3ab293ec8c2205c8c5e5b402 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Fri, 6 Dec 2024 23:16:47 +0800 Subject: [PATCH 3/5] chore: Fix CI --- .../src/equivalence/projection.rs | 70 +++++++++---------- 1 file changed, 34 insertions(+), 36 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/projection.rs b/datafusion/physical-expr/src/equivalence/projection.rs index 118ea26481de..25a05a2a5918 100644 --- a/datafusion/physical-expr/src/equivalence/projection.rs +++ b/datafusion/physical-expr/src/equivalence/projection.rs @@ -23,7 +23,6 @@ use crate::PhysicalExpr; use arrow::datatypes::SchemaRef; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{internal_err, Result}; -use indexmap::IndexMap; /// Stores the mapping between source expressions and target expressions for a /// projection. @@ -31,7 +30,7 @@ use indexmap::IndexMap; pub struct ProjectionMapping { /// Mapping between source expressions and target expressions. /// Vector indices correspond to the indices after projection. - pub map: IndexMap, Arc>, + pub map: Vec<(Arc, Arc)>, } impl ProjectionMapping { @@ -53,38 +52,34 @@ impl ProjectionMapping { input_schema: &SchemaRef, ) -> Result { // Construct a map from the input expressions to the output expression of the projection: - let mut map = IndexMap::new(); - - for (expr_idx, (expression, name)) in expr.iter().enumerate() { - let target_expr = Arc::new(Column::new(name, expr_idx)) as _; - let transformed_expr = Arc::clone(expression).transform_down(|e| { - if let Some(col) = e.as_any().downcast_ref::() { - // Sometimes, an expression and its name in the input_schema - // doesn't match. This can cause problems, so we make sure - // that the expression name matches with the name in `input_schema`. - // Conceptually, `source_expr` and `expression` should be the same. - let idx = col.index(); - let matching_input_field = input_schema.field(idx); - if col.name() != matching_input_field.name() { - return internal_err!( - "Input field name {} does not match with the projection expression {}", - matching_input_field.name(), - col.name() - ); - } - let matching_input_column = Column::new(matching_input_field.name(), idx); - Ok(Transformed::yes(Arc::new(matching_input_column))) - } else { - Ok(Transformed::no(e)) - } - }); - - let _ = transformed_expr - .data() - .map(|source_expr| map.insert(source_expr, target_expr)); - } - - Ok(Self { map }) + expr.iter() + .enumerate() + .map(|(expr_idx, (expression, name))| { + let target_expr = Arc::new(Column::new(name, expr_idx)) as _; + Arc::clone(expression) + .transform_down(|e| match e.as_any().downcast_ref::() { + Some(col) => { + // Sometimes, an expression and its name in the input_schema + // doesn't match. This can cause problems, so we make sure + // that the expression name matches with the name in `input_schema`. + // Conceptually, `source_expr` and `expression` should be the same. + let idx = col.index(); + let matching_input_field = input_schema.field(idx); + if col.name() != matching_input_field.name() { + return internal_err!("Input field name {} does not match with the projection expression {}", + matching_input_field.name(),col.name()) + } + let matching_input_column = + Column::new(matching_input_field.name(), idx); + Ok(Transformed::yes(Arc::new(matching_input_column))) + } + None => Ok(Transformed::no(e)), + }) + .data() + .map(|source_expr| (source_expr, target_expr)) + }) + .collect::>>() + .map(|map| Self { map }) } /// Constructs a subset mapping using the provided indices. @@ -99,7 +94,7 @@ impl ProjectionMapping { /// Iterate over pairs of (source, target) expressions pub fn iter( &self, - ) -> impl Iterator, &Arc)> { + ) -> impl Iterator, Arc)> + '_ { self.map.iter() } @@ -117,7 +112,10 @@ impl ProjectionMapping { &self, expr: &Arc, ) -> Option> { - self.map.get(expr).map(Arc::clone) + self.map + .iter() + .find(|(source, _)| source.eq(expr)) + .map(|(_, target)| Arc::clone(target)) } } From 072e7d04005396545ca01e54bd0fc283114e9906 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Sat, 7 Dec 2024 11:34:24 +0800 Subject: [PATCH 4/5] chore: comment --- datafusion/physical-expr/src/equivalence/class.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index c0af4d1a3d35..1ab7a50892ba 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -546,9 +546,11 @@ impl EquivalenceGroup { .collect::>(); (new_class.len() > 1).then_some(EquivalenceClass::new(new_class)) }); + // the key is the source expression and the value is the target expression from the ProjectionMapping let mut new_classes: IndexMap, Vec>> = IndexMap::new(); for (source, target) in mapping.iter() { + // Check if the source expression already exists in the new_classes map if let Some(values) = new_classes.get_mut(source) { if !physical_exprs_contains(values, target) { values.push(Arc::clone(target)); From 85fd38669c6e1c25d42c57b00ba886b1bcd2db99 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Sat, 7 Dec 2024 11:59:03 +0800 Subject: [PATCH 5/5] chore: simplify --- .../physical-expr/src/equivalence/class.rs | 27 ++++++++----------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 1ab7a50892ba..49f522d7bafc 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -17,8 +17,8 @@ use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping}; use crate::{ - expressions::Column, physical_exprs_contains, LexOrdering, LexRequirement, - PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, PhysicalSortRequirement, + expressions::Column, LexOrdering, LexRequirement, PhysicalExpr, PhysicalExprRef, + PhysicalSortExpr, PhysicalSortRequirement, }; use std::fmt::Display; use std::sync::Arc; @@ -546,25 +546,20 @@ impl EquivalenceGroup { .collect::>(); (new_class.len() > 1).then_some(EquivalenceClass::new(new_class)) }); - // the key is the source expression and the value is the target expression from the ProjectionMapping - let mut new_classes: IndexMap, Vec>> = + // the key is the source expression and the value is the EquivalenceClass that contains the target expression of the source expression. + let mut new_classes: IndexMap, EquivalenceClass> = IndexMap::new(); - for (source, target) in mapping.iter() { - // Check if the source expression already exists in the new_classes map - if let Some(values) = new_classes.get_mut(source) { - if !physical_exprs_contains(values, target) { - values.push(Arc::clone(target)); - } - } else { - new_classes.insert(Arc::clone(source), vec![Arc::clone(target)]); - } - } + mapping.iter().for_each(|(source, target)| { + new_classes + .entry(Arc::clone(source)) + .or_insert_with(EquivalenceClass::new_empty) + .push(Arc::clone(target)); + }); // Only add equivalence classes with at least two members as singleton // equivalence classes are meaningless. let new_classes = new_classes .into_iter() - .filter_map(|(_, values)| (values.len() > 1).then_some(values)) - .map(EquivalenceClass::new); + .filter_map(|(_, cls)| (cls.len() > 1).then_some(cls)); let classes = projected_classes.chain(new_classes).collect(); Self::new(classes)