From fcee8b0f84c6f2b76516830c7ca9d15872840042 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Fri, 9 Dec 2022 13:32:15 +0100 Subject: [PATCH] [SPARK-41468][SQL] Fix PlanExpression handling in EquivalentExpressions --- .../expressions/EquivalentExpressions.scala | 33 +++++++++++++------ 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala index 2bbde304c281..3ffd9f9d8875 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala @@ -142,6 +142,20 @@ class EquivalentExpressions { case _ => Nil } + private def supportedExpression(e: Expression) = { + !e.exists { + // `LambdaVariable` is usually used as a loop variable, which can't be evaluated ahead of the + // loop. So we can't evaluate sub-expressions containing `LambdaVariable` at the beginning. + case _: LambdaVariable => true + + // `PlanExpression` wraps query plan. To compare query plans of `PlanExpression` on executor, + // can cause error like NPE. + case _: PlanExpression[_] => Utils.isInRunningSparkTask + + case _ => false + } + } + /** * Adds the expression to this data structure recursively. Stops if a matching expression * is found. That is, if `expr` has already been added, its children are not added. @@ -149,21 +163,16 @@ class EquivalentExpressions { def addExprTree( expr: Expression, map: mutable.HashMap[ExpressionEquals, ExpressionStats] = equivalenceMap): Unit = { - updateExprTree(expr, map) + if (supportedExpression(expr)) { + updateExprTree(expr, map) + } } private def updateExprTree( expr: Expression, map: mutable.HashMap[ExpressionEquals, ExpressionStats] = equivalenceMap, useCount: Int = 1): Unit = { - val skip = useCount == 0 || - expr.isInstanceOf[LeafExpression] || - // `LambdaVariable` is usually used as a loop variable, which can't be evaluated ahead of the - // loop. So we can't evaluate sub-expressions containing `LambdaVariable` at the beginning. - expr.exists(_.isInstanceOf[LambdaVariable]) || - // `PlanExpression` wraps query plan. To compare query plans of `PlanExpression` on executor, - // can cause error like NPE. - (expr.exists(_.isInstanceOf[PlanExpression[_]]) && Utils.isInRunningSparkTask) + val skip = useCount == 0 || expr.isInstanceOf[LeafExpression] if (!skip && !updateExprInMap(expr, map, useCount)) { val uc = useCount.signum @@ -177,7 +186,11 @@ class EquivalentExpressions { * equivalent expressions. */ def getExprState(e: Expression): Option[ExpressionStats] = { - equivalenceMap.get(ExpressionEquals(e)) + if (supportedExpression(e)) { + equivalenceMap.get(ExpressionEquals(e)) + } else { + None + } } // Exposed for testing.