From fd5ab12909fb7da30998aedac2c0237f9335dba8 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 19 Nov 2024 21:54:10 -0800 Subject: [PATCH 1/5] Initial commit --- .../src/equivalence/properties.rs | 7 +++-- datafusion/sqllogictest/test_files/order.slt | 31 +++++++++++++++++++ 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 06f1e24ed202..7c14e3699c6e 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -28,7 +28,7 @@ use crate::equivalence::{ collapse_lex_req, EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping, }; -use crate::expressions::{with_new_schema, CastExpr, Column, Literal}; +use crate::expressions::{with_new_schema, CastExpr, Column, Literal, CaseExpr}; use crate::{ physical_exprs_contains, ConstExpr, LexOrdering, LexRequirement, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, PhysicalSortRequirement, @@ -1208,7 +1208,10 @@ fn is_constant_recurse( return true; } let children = expr.children(); - !children.is_empty() && children.iter().all(|c| is_constant_recurse(constants, c)) + // When expression contains branch even if all children are constant + // final result may not be constant + let is_branched = expr.as_any().is::<CaseExpr>(); + !children.is_empty() && children.iter().all(|c| is_constant_recurse(constants, c)) && !is_branched } /// This function examines whether a referring expression directly refers to a diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index d5f0521407c5..e7f2c2fcdd54 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -1260,3 +1260,34 @@ limit 2; statement ok drop table ordered_table; + +query TT +EXPLAIN SELECT + CASE + WHEN name = 'name1' THEN 0.0 + WHEN name = 'name2' THEN 0.5 + 
END AS a +FROM ( + SELECT 'name1' AS name + UNION ALL + SELECT 'name2' +) +ORDER BY a DESC; +---- +logical_plan +01)Sort: a DESC NULLS FIRST +02)--Projection: CASE WHEN name = Utf8("name1") THEN Float64(0) WHEN name = Utf8("name2") THEN Float64(0.5) END AS a +03)----Union +04)------Projection: Utf8("name1") AS name +05)--------EmptyRelation +06)------Projection: Utf8("name2") AS name +07)--------EmptyRelation +physical_plan +01)SortPreservingMergeExec: [a@0 DESC] +02)--SortExec: expr=[a@0 DESC], preserve_partitioning=[true] +03)----ProjectionExec: expr=[CASE WHEN name@0 = name1 THEN 0 WHEN name@0 = name2 THEN 0.5 END as a] +04)------UnionExec +05)--------ProjectionExec: expr=[name1 as name] +06)----------PlaceholderRowExec +07)--------ProjectionExec: expr=[name2 as name] +08)----------PlaceholderRowExec From 59f056e2cafbe7091f515d2dea0c3d9657349667 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 19 Nov 2024 22:38:45 -0800 Subject: [PATCH 2/5] Fix formatting --- datafusion/physical-expr/src/equivalence/properties.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 7c14e3699c6e..2974a9154f0f 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -28,7 +28,7 @@ use crate::equivalence::{ collapse_lex_req, EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping, }; -use crate::expressions::{with_new_schema, CastExpr, Column, Literal, CaseExpr}; +use crate::expressions::{with_new_schema, CaseExpr, CastExpr, Column, Literal}; use crate::{ physical_exprs_contains, ConstExpr, LexOrdering, LexRequirement, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, PhysicalSortRequirement, @@ -1211,7 +1211,9 @@ fn is_constant_recurse( // When expression contains branch even if all children are constant // final result may not be constant let 
is_branched = expr.as_any().is::<CaseExpr>(); - !children.is_empty() && children.iter().all(|c| is_constant_recurse(constants, c)) && !is_branched + !children.is_empty() + && children.iter().all(|c| is_constant_recurse(constants, c)) + && !is_branched } /// This function examines whether a referring expression directly refers to a From 50005f31d75076cf779e0e234da74fd20ee7312e Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Wed, 20 Nov 2024 20:41:26 -0800 Subject: [PATCH 3/5] Add across partitions check --- .../src/equivalence/properties.rs | 46 +++++++++++++++---- datafusion/sqllogictest/test_files/order.slt | 14 +++--- 2 files changed, 44 insertions(+), 16 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 2974a9154f0f..fe866450b2b2 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -28,7 +28,7 @@ use crate::equivalence::{ collapse_lex_req, EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping, }; -use crate::expressions::{with_new_schema, CaseExpr, CastExpr, Column, Literal}; +use crate::expressions::{with_new_schema, CastExpr, Column, Literal}; use crate::{ physical_exprs_contains, ConstExpr, LexOrdering, LexRequirement, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, PhysicalSortRequirement, @@ -883,9 +883,11 @@ impl EquivalenceProperties { if self.is_expr_constant(source) && !const_exprs_contains(&projected_constants, target) { + let across_partitions = self.is_expr_constant_accross_partitions(source); // Expression evaluates to single value - projected_constants - .push(ConstExpr::from(target).with_across_partitions(true)); + projected_constants.push( + ConstExpr::from(target).with_across_partitions(across_partitions), + ); } } projected_constants @@ -1014,6 +1016,37 @@ impl EquivalenceProperties { is_constant_recurse(&normalized_constants, &normalized_expr) } + /// 
This function determines whether the provided expression is constant + /// across partitions based on the known constants. + /// + /// # Arguments + /// + /// - `expr`: A reference to an `Arc<dyn PhysicalExpr>` representing the + /// expression to be checked. + /// + /// # Returns + /// + /// Returns `true` if the expression is constant across all partitions according + /// to equivalence group, `false` otherwise. + pub fn is_expr_constant_accross_partitions( + &self, + expr: &Arc<dyn PhysicalExpr>, + ) -> bool { + // As an example, assume that we know columns `a` and `b` are constant. + // Then, `a`, `b` and `a + b` will all return `true` whereas `c` will + // return `false`. + let const_exprs = self.constants.iter().flat_map(|const_expr| { + if const_expr.across_partitions() { + Some(Arc::clone(const_expr.expr())) + } else { + None + } + }); + let normalized_constants = self.eq_group.normalize_exprs(const_exprs); + let normalized_expr = self.eq_group.normalize_expr(Arc::clone(expr)); + is_constant_recurse(&normalized_constants, &normalized_expr) + } + /// Retrieves the properties for a given physical expression. 
/// + /// This function constructs an [`ExprProperties`] object for the given @@ -1208,12 +1241,7 @@ fn is_constant_recurse( return true; } let children = expr.children(); - // When expression contains branch even if all children are constant - // final result may not be constant - let is_branched = expr.as_any().is::<CaseExpr>(); - !children.is_empty() - && children.iter().all(|c| is_constant_recurse(constants, c)) - && !is_branched + !children.is_empty() && children.iter().all(|c| is_constant_recurse(constants, c)) } /// This function examines whether a referring expression directly refers to a diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index e7f2c2fcdd54..bfdf77ddb502 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -1284,10 +1284,10 @@ logical_plan 07)--------EmptyRelation physical_plan 01)SortPreservingMergeExec: [a@0 DESC] -02)--SortExec: expr=[a@0 DESC], preserve_partitioning=[true] -03)----ProjectionExec: expr=[CASE WHEN name@0 = name1 THEN 0 WHEN name@0 = name2 THEN 0.5 END as a] -04)------UnionExec -05)--------ProjectionExec: expr=[name1 as name] -06)----------PlaceholderRowExec -07)--------ProjectionExec: expr=[name2 as name] -08)----------PlaceholderRowExec +02)--ProjectionExec: expr=[CASE WHEN name@0 = name1 THEN 0 WHEN name@0 = name2 THEN 0.5 END as a] +03)----UnionExec +04)------ProjectionExec: expr=[name1 as name] +05)--------PlaceholderRowExec +06)------ProjectionExec: expr=[name2 as name] +07)--------PlaceholderRowExec + From fa9284a6d8c95073e53a0fb38760f7cf3056ed5d Mon Sep 17 00:00:00 2001 From: Mustafa Akur <33904309+akurmustafa@users.noreply.github.com> Date: Sat, 23 Nov 2024 14:32:17 -0800 Subject: [PATCH 4/5] Add new test case Add a new test case --- datafusion/sqllogictest/test_files/order.slt | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/datafusion/sqllogictest/test_files/order.slt 
b/datafusion/sqllogictest/test_files/order.slt index bfdf77ddb502..4c54803f086e 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -1291,3 +1291,18 @@ physical_plan 06)------ProjectionExec: expr=[name2 as name] 07)--------PlaceholderRowExec +query I +SELECT + CASE + WHEN name = 'name1' THEN 0.0 + WHEN name = 'name2' THEN 0.5 + END AS a +FROM ( + SELECT 'name1' AS name + UNION ALL + SELECT 'name2' +) +ORDER BY a DESC; +---- +0.5 +0 From 6fbfeb5c8b9e4255c37cf2fbc464620b3260c0f8 Mon Sep 17 00:00:00 2001 From: Mustafa Akur <33904309+akurmustafa@users.noreply.github.com> Date: Sat, 23 Nov 2024 15:28:10 -0800 Subject: [PATCH 5/5] Fix buggy test --- datafusion/sqllogictest/test_files/order.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 4c54803f086e..a46040aa532e 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -1291,7 +1291,7 @@ physical_plan 06)------ProjectionExec: expr=[name2 as name] 07)--------PlaceholderRowExec -query I +query R SELECT CASE WHEN name = 'name1' THEN 0.0