From b5cbd73164e495ddc3c1133a84410dd092a7c1f5 Mon Sep 17 00:00:00 2001 From: Dereck Li Date: Sat, 26 Feb 2022 06:46:33 +0800 Subject: [PATCH 1/3] [SPARK-38333][SQL] DPP cause DataSourceScanExec java.lang.NullPointerException --- .../spark/sql/catalyst/expressions/EquivalentExpressions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala index 1dfff412d9a8e..be90d3fb0361c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala @@ -147,7 +147,7 @@ class EquivalentExpressions { expr.find(_.isInstanceOf[LambdaVariable]).isDefined || // `PlanExpression` wraps query plan. To compare query plans of `PlanExpression` on executor, // can cause error like NPE. - (expr.isInstanceOf[PlanExpression[_]] && TaskContext.get != null) + (expr.find(_.isInstanceOf[PlanExpression[_]]).isDefined && TaskContext.get != null) if (!skip && !addFunc(expr)) { childrenToRecurse(expr).foreach(addExprTree(_, addFunc)) From 801ee509749210e53e6e59addbc5f47e5ac84075 Mon Sep 17 00:00:00 2001 From: Dereck Li Date: Mon, 28 Feb 2022 23:14:42 +0800 Subject: [PATCH 2/3] add UT --- .../expressions/SubexpressionEliminationSuite.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala index 65671d253dc53..78c063b31302d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.SparkFunSuite + +import org.apache.spark.sql.catalyst.analysis.DummyCommand import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BinaryType, DataType, IntegerType} @@ -255,6 +257,13 @@ class SubexpressionEliminationSuite extends SparkFunSuite with ExpressionEvalHel assert(equivalence2.getAllEquivalentExprs.count(_.size == 2) == 0) } + test("SPARK-38333: DPP expression should not be eliminated") { + val equivalence = new EquivalentExpressions + val expression = DynamicPruningExpression(Exists(DummyCommand())) + equivalence.addExprTree(expression) + assert(equivalence.getEquivalentExprs(expression).size == 0) + } + test("SPARK-34723: Correct parameter type for subexpression elimination under whole-stage") { withSQLConf(SQLConf.CODEGEN_METHOD_SPLIT_THRESHOLD.key -> "1") { val str = BoundReference(0, BinaryType, false) @@ -315,3 +324,4 @@ case class CodegenFallbackExpression(child: Expression) extends UnaryExpression with CodegenFallback { override def dataType: DataType = child.dataType } + From 708a16859245c21b5f9c5dbcd9b986d3f95a2dba Mon Sep 17 00:00:00 2001 From: Dereck Li Date: Tue, 1 Mar 2022 00:16:56 +0800 Subject: [PATCH 3/3] fix code style --- .../SubexpressionEliminationSuite.scala | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala index 78c063b31302d..b80843fee7881 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala @@ -16,10 +16,9 @@ */ package org.apache.spark.sql.catalyst.expressions -import org.apache.spark.SparkFunSuite - -import org.apache.spark.sql.catalyst.analysis.DummyCommand +import org.apache.spark.{SparkFunSuite, TaskContext, TaskContextImpl} import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.plans.logical.Command import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BinaryType, DataType, IntegerType} @@ -258,10 +257,18 @@ class SubexpressionEliminationSuite extends SparkFunSuite with ExpressionEvalHel } test("SPARK-38333: DPP expression should not be eliminated") { - val equivalence = new EquivalentExpressions - val expression = DynamicPruningExpression(Exists(DummyCommand())) - equivalence.addExprTree(expression) - assert(equivalence.getEquivalentExprs(expression).size == 0) + try { + // support we in executor + val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null) + TaskContext.setTaskContext(context1) + + val equivalence = new EquivalentExpressions + val expression = DynamicPruningExpression(Exists(TestCommand("foo"))) + equivalence.addExprTree(expression) + assert(equivalence.getEquivalentExprs(expression).size == 0) + } finally { + TaskContext.unset() + } } test("SPARK-34723: Correct parameter type for subexpression elimination under whole-stage") { @@ -325,3 +332,4 @@ case class CodegenFallbackExpression(child: Expression) override def dataType: DataType = child.dataType } +case class TestCommand(foo: String) extends Command