Skip to content

Commit ee74bd0

Browse files
ulysses-youcloud-fan
authored andcommitted
[SPARK-38832][SQL] Remove unnecessary distinct in aggregate expression by distinctKeys
### What changes were proposed in this pull request? Make `EliminateDistinct` support eliminate distinct by child distinct keys. ### Why are the changes needed? We can remove the distinct in aggregate expression if the distinct semantics is guaranteed by child. For example: ```sql SELECT count(distinct c) FROM ( SELECT c FROM t GROUP BY c ) ``` ### Does this PR introduce _any_ user-facing change? improve performance ### How was this patch tested? add test in `EliminateDistinctSuite` Closes #36117 from ulysses-you/remove-distinct. Authored-by: ulysses-you <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent eb699ec commit ee74bd0

File tree

3 files changed

+44
-7
lines changed

3 files changed

+44
-7
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
146146
PushDownPredicates) :: Nil
147147
}
148148

149-
val batches = (Batch("Eliminate Distinct", Once, EliminateDistinct) ::
149+
val batches = (
150150
// Technically some of the rules in Finish Analysis are not optimizer rules and belong more
151151
// in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
152152
// However, because we also use the analyzer to canonicalized queries (for view definition),
@@ -166,6 +166,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
166166
//////////////////////////////////////////////////////////////////////////////////////////
167167
// Optimizer rules start here
168168
//////////////////////////////////////////////////////////////////////////////////////////
169+
Batch("Eliminate Distinct", Once, EliminateDistinct) ::
169170
// - Do the first call of CombineUnions before starting the major Optimizer rules,
170171
// since it can reduce the number of iteration and the other rules could add/move
171172
// extra operators between two adjacent Union operators.
@@ -411,14 +412,26 @@ abstract class Optimizer(catalogManager: CatalogManager)
411412
}
412413

413414
/**
414-
* Remove useless DISTINCT for MAX and MIN.
415+
* Remove useless DISTINCT:
416+
* 1. For some aggregate expression, e.g.: MAX and MIN.
417+
* 2. If the distinct semantics is guaranteed by child.
418+
*
415419
* This rule should be applied before RewriteDistinctAggregates.
416420
*/
417421
object EliminateDistinct extends Rule[LogicalPlan] {
418-
override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressionsWithPruning(
419-
_.containsPattern(AGGREGATE_EXPRESSION)) {
420-
case ae: AggregateExpression if ae.isDistinct && isDuplicateAgnostic(ae.aggregateFunction) =>
421-
ae.copy(isDistinct = false)
422+
override def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
423+
_.containsPattern(AGGREGATE)) {
424+
case agg: Aggregate =>
425+
agg.transformExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) {
426+
case ae: AggregateExpression if ae.isDistinct &&
427+
isDuplicateAgnostic(ae.aggregateFunction) =>
428+
ae.copy(isDistinct = false)
429+
430+
case ae: AggregateExpression if ae.isDistinct &&
431+
agg.child.distinctKeys.exists(
432+
_.subsetOf(ExpressionSet(ae.aggregateFunction.children.filterNot(_.foldable)))) =>
433+
ae.copy(isDistinct = false)
434+
}
422435
}
423436

424437
def isDuplicateAgnostic(af: AggregateFunction): Boolean = af match {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@ import org.apache.spark.sql.internal.SQLConf.PROPAGATE_DISTINCT_KEYS_ENABLED
2929
*/
3030
trait LogicalPlanDistinctKeys { self: LogicalPlan =>
3131
lazy val distinctKeys: Set[ExpressionSet] = {
32-
if (conf.getConf(PROPAGATE_DISTINCT_KEYS_ENABLED)) DistinctKeyVisitor.visit(self) else Set.empty
32+
if (conf.getConf(PROPAGATE_DISTINCT_KEYS_ENABLED)) {
33+
val keys = DistinctKeyVisitor.visit(self)
34+
require(keys.forall(_.nonEmpty))
35+
keys
36+
} else {
37+
Set.empty
38+
}
3339
}
3440
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class EliminateDistinctSuite extends PlanTest {
3333
}
3434

3535
val testRelation = LocalRelation($"a".int)
36+
val testRelation2 = LocalRelation($"a".int, $"b".string)
3637

3738
Seq(
3839
Max(_),
@@ -71,4 +72,21 @@ class EliminateDistinctSuite extends PlanTest {
7172
comparePlans(Optimize.execute(query), answer)
7273
}
7374
}
75+
76+
test("SPARK-38832: Remove unnecessary distinct in aggregate expression by distinctKeys") {
77+
val q1 = testRelation2.groupBy($"a")($"a")
78+
.rebalance().groupBy()(countDistinct($"a") as "x", sumDistinct($"a") as "y").analyze
79+
val r1 = testRelation2.groupBy($"a")($"a")
80+
.rebalance().groupBy()(count($"a") as "x", sum($"a") as "y").analyze
81+
comparePlans(Optimize.execute(q1), r1)
82+
83+
// not a subset of distinct attr
84+
val q2 = testRelation2.groupBy($"a", $"b")($"a", $"b")
85+
.rebalance().groupBy()(countDistinct($"a") as "x", sumDistinct($"a") as "y").analyze
86+
comparePlans(Optimize.execute(q2), q2)
87+
88+
// child distinct key is empty
89+
val q3 = testRelation2.groupBy($"a")(countDistinct($"a") as "x").analyze
90+
comparePlans(Optimize.execute(q3), q3)
91+
}
7492
}

0 commit comments

Comments
 (0)