diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 6a1268ef8cdbe..005e5776d3ae6 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::fmt; use std::fmt::Display; use std::hash::{Hash, Hasher}; use std::iter::Peekable; @@ -709,7 +710,7 @@ impl EquivalenceProperties { /// c ASC: Node {None, HashSet{a ASC}} /// ``` fn construct_dependency_map(&self, mapping: &ProjectionMapping) -> DependencyMap { - let mut dependency_map = IndexMap::new(); + let mut dependency_map = DependencyMap::new(); for ordering in self.normalized_oeq_class().iter() { for (idx, sort_expr) in ordering.iter().enumerate() { let target_sort_expr = @@ -731,13 +732,11 @@ impl EquivalenceProperties { let dependency = idx.checked_sub(1).map(|a| &ordering[a]); // Add sort expressions that can be projected or referred to // by any of the projection expressions to the dependency map: - dependency_map - .entry(sort_expr.clone()) - .or_insert_with(|| DependencyNode { - target_sort_expr: target_sort_expr.clone(), - dependencies: IndexSet::new(), - }) - .insert_dependency(dependency); + dependency_map.insert( + sort_expr, + target_sort_expr.as_ref(), + dependency, + ); } if !is_projected { // If we can not project, stop constructing the dependency @@ -1257,7 +1256,7 @@ fn referred_dependencies( // Associate `PhysicalExpr`s with `PhysicalSortExpr`s that contain them: let mut expr_to_sort_exprs = IndexMap::::new(); for sort_expr in dependency_map - .keys() + .sort_exprs() .filter(|sort_expr| expr_refers(source, &sort_expr.expr)) { let key = ExprWrapper(Arc::clone(&sort_expr.expr)); @@ -1270,10 +1269,16 @@ fn referred_dependencies( // Generate all valid dependencies for the source. For example, if the source // is `a + b` and the map is `[a -> (a ASC, a DESC), b -> (b ASC)]`, we get // `vec![HashSet(a ASC, b ASC), HashSet(a DESC, b ASC)]`. - expr_to_sort_exprs - .values() + let dependencies = expr_to_sort_exprs + .into_values() + .map(Dependencies::into_inner) + .collect::>(); + dependencies + .iter() .multi_cartesian_product() - .map(|referred_deps| referred_deps.into_iter().cloned().collect()) + .map(|referred_deps| { + Dependencies::new_from_iter(referred_deps.into_iter().cloned()) + }) .collect() } @@ -1296,7 +1301,9 @@ fn construct_prefix_orderings( dependency_map: &DependencyMap, ) -> Vec { let mut dep_enumerator = DependencyEnumerator::new(); - dependency_map[relevant_sort_expr] + dependency_map + .get(relevant_sort_expr) + .expect("no relevant sort expr found") .dependencies .iter() .flat_map(|dep| dep_enumerator.construct_orderings(dep, dependency_map)) @@ -1433,13 +1440,161 @@ impl DependencyNode { } } -// Using `IndexMap` and `IndexSet` makes sure to generate consistent results across different executions for the same query. -// We could have used `HashSet`, `HashMap` in place of them without any loss of functionality. -// As an example, if existing orderings are `[a ASC, b ASC]`, `[c ASC]` for output ordering -// both `[a ASC, b ASC, c ASC]` and `[c ASC, a ASC, b ASC]` are valid (e.g. concatenated version of the alternative orderings). -// When using `HashSet`, `HashMap` it is not guaranteed to generate consistent result, among the possible 2 results in the example above. -type DependencyMap = IndexMap; -type Dependencies = IndexSet; +impl Display for DependencyNode { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if let Some(target) = &self.target_sort_expr { + write!(f, "(target: {}, ", target)?; + } else { + write!(f, "(")?; + } + write!(f, "dependencies: [{}])", self.dependencies) + } +} + +/// Maps an expression --> DependencyNode +/// +/// # Debugging / deplaying `DependencyMap` +/// +/// This structure implements `Display` to assist debugging. For example: +/// +/// ```text +/// DependencyMap: { +/// a@0 ASC --> (target: a@0 ASC, dependencies: [[]]) +/// b@1 ASC --> (target: b@1 ASC, dependencies: [[a@0 ASC, c@2 ASC]]) +/// c@2 ASC --> (target: c@2 ASC, dependencies: [[b@1 ASC, a@0 ASC]]) +/// d@3 ASC --> (target: d@3 ASC, dependencies: [[c@2 ASC, b@1 ASC]]) +/// } +/// ``` +/// +/// # Note on IndexMap Rationale +/// +/// Using `IndexMap` (which preserves insert order) to ensure consistent results +/// across different executions for the same query. We could have used +/// `HashSet`, `HashMap` in place of them without any loss of functionality. +/// +/// As an example, if existing orderings are +/// 1. `[a ASC, b ASC]` +/// 2. `[c ASC]` for +/// +/// Then both the following output orderings are valid +/// 1. `[a ASC, b ASC, c ASC]` +/// 2. `[c ASC, a ASC, b ASC]` +/// +/// (this are both valid as they are concatenated versions of the alternative +/// orderings). When using `HashSet`, `HashMap` it is not guaranteed to generate +/// consistent result, among the possible 2 results in the example above. +#[derive(Debug)] +struct DependencyMap { + inner: IndexMap, +} + +impl DependencyMap { + fn new() -> Self { + Self { + inner: IndexMap::new(), + } + } + + /// Insert a new dependency `sort_expr` --> `dependency` into the map. + /// + /// If `target_sort_expr` is none, a new entry is created with empty dependencies. + fn insert( + &mut self, + sort_expr: &PhysicalSortExpr, + target_sort_expr: Option<&PhysicalSortExpr>, + dependency: Option<&PhysicalSortExpr>, + ) { + self.inner + .entry(sort_expr.clone()) + .or_insert_with(|| DependencyNode { + target_sort_expr: target_sort_expr.cloned(), + dependencies: Dependencies::new(), + }) + .insert_dependency(dependency) + } + + /// Iterator over (sort_expr, DependencyNode) pairs + fn iter(&self) -> impl Iterator { + self.inner.iter() + } + + /// iterator over all sort exprs + fn sort_exprs(&self) -> impl Iterator { + self.inner.keys() + } + + /// Return the dependency node for the given sort expression, if any + fn get(&self, sort_expr: &PhysicalSortExpr) -> Option<&DependencyNode> { + self.inner.get(sort_expr) + } +} + +impl Display for DependencyMap { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!(f, "DependencyMap: {{")?; + for (sort_expr, node) in self.inner.iter() { + writeln!(f, " {sort_expr} --> {node}")?; + } + writeln!(f, "}}") + } +} + +/// A list of sort expressions that can be calculated from a known set of +/// dependencies. +#[derive(Debug, Default, Clone, PartialEq, Eq)] +struct Dependencies { + inner: IndexSet, +} + +impl Display for Dependencies { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "[")?; + let mut iter = self.inner.iter(); + if let Some(dep) = iter.next() { + write!(f, "{}", dep)?; + } + for dep in iter { + write!(f, ", {}", dep)?; + } + write!(f, "]") + } +} + +impl Dependencies { + /// Create a new empty `Dependencies` instance. + fn new() -> Self { + Self { + inner: IndexSet::new(), + } + } + + /// Create a new `Dependencies` from an iterator of `PhysicalSortExpr`. + fn new_from_iter(iter: impl IntoIterator) -> Self { + Self { + inner: iter.into_iter().collect(), + } + } + + /// Insert a new dependency into the set. + fn insert(&mut self, sort_expr: PhysicalSortExpr) { + self.inner.insert(sort_expr); + } + + /// Iterator over dependencies in the set + fn iter(&self) -> impl Iterator + Clone { + self.inner.iter() + } + + /// Return the inner set of dependencies + fn into_inner(self) -> IndexSet { + self.inner + } + + /// Returns true if there are no dependencies + fn is_empty(&self) -> bool { + self.inner.is_empty() + } +} /// Contains a mapping of all dependencies we have processed for each sort expr struct DependencyEnumerator<'a> { @@ -1487,8 +1642,9 @@ impl<'a> DependencyEnumerator<'a> { referred_sort_expr: &'a PhysicalSortExpr, dependency_map: &'a DependencyMap, ) -> Vec { - // We are sure that `referred_sort_expr` is inside `dependency_map`. - let node = &dependency_map[referred_sort_expr]; + let node = dependency_map + .get(referred_sort_expr) + .expect("`referred_sort_expr` should be inside `dependency_map`"); // Since we work on intermediate nodes, we are sure `val.target_sort_expr` // exists. let target_sort_expr = node.target_sort_expr.as_ref().unwrap(); @@ -1506,6 +1662,7 @@ impl<'a> DependencyEnumerator<'a> { } else { vec![] }; + for ordering in orderings.iter_mut() { ordering.push(target_sort_expr.clone()) }