From 3657b7c8fe63ff8cac2442228c26dbd26988342a Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 16 Jul 2024 09:40:06 -0700
Subject: [PATCH 1/7] SPARK-48921: ScalaUDF in subquery should run through analyzer

---
 .../sql/catalyst/analysis/Analyzer.scala      | 12 ++--
 .../analysis/ResolveSubquerySuite.scala       | 59 ++++++++++++++++++-
 2 files changed, 63 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 95e2ddd40af1f..b8d9455f77918 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2599,19 +2599,19 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
    */
   private def resolveSubQueries(plan: LogicalPlan, outer: LogicalPlan): LogicalPlan = {
     plan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION), ruleId) {
-      case s @ ScalarSubquery(sub, _, exprId, _, _, _) if !sub.resolved =>
+      case s @ ScalarSubquery(sub, _, exprId, _, _, _) if !sub.analyzed =>
         resolveSubQuery(s, outer)(ScalarSubquery(_, _, exprId))
-      case e @ Exists(sub, _, exprId, _, _) if !sub.resolved =>
+      case e @ Exists(sub, _, exprId, _, _) if !sub.analyzed =>
         resolveSubQuery(e, outer)(Exists(_, _, exprId))
-      case InSubquery(values, l @ ListQuery(_, _, exprId, _, _, _))
-          if values.forall(_.resolved) && !l.resolved =>
+      case InSubquery(values, l @ ListQuery(sub, _, exprId, _, _, _))
+          if values.forall(_.resolved) && !sub.analyzed =>
         val expr = resolveSubQuery(l, outer)((plan, exprs) => {
           ListQuery(plan, exprs, exprId, plan.output.length)
         })
         InSubquery(values, expr.asInstanceOf[ListQuery])
-      case s @ LateralSubquery(sub, _, exprId, _, _) if !sub.resolved =>
+      case s @ LateralSubquery(sub, _, exprId, _, _) if !sub.analyzed =>
         resolveSubQuery(s, outer)(LateralSubquery(_, _, exprId))
-      case a: FunctionTableSubqueryArgumentExpression if !a.plan.resolved =>
+      case a: FunctionTableSubqueryArgumentExpression if !a.plan.analyzed =>
         resolveSubQuery(a, outer)(
           (plan, outerAttrs) => a.copy(plan = plan, outerAttrs = outerAttrs))
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala
index 4e17f4624f7e0..c97d025c3caf1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, CreateArray, Expression, GetStructField, InSubquery, LambdaFunction, LateralSubquery, ListQuery, OuterReference, ScalarSubquery, UnresolvedNamedLambdaVariable}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{Alias, CreateArray, Exists, Expression, GetStructField, InSubquery, LambdaFunction, LateralSubquery, ListQuery, Literal, OuterReference, ScalarSubquery, ScalaUDF, UnresolvedNamedLambdaVariable}
 import org.apache.spark.sql.catalyst.expressions.aggregate.Count
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.types.{ArrayType, IntegerType}
+import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType}
 
 /**
  * Unit tests for [[ResolveSubquery]].
  */
@@ -299,4 +300,58 @@ class ResolveSubquerySuite extends AnalysisTest {
       "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.HIGHER_ORDER_FUNCTION",
       expectedMessageParameters = Map.empty[String, String])
   }
+
+  val testRelation = LocalRelation($"a".int, $"b".double)
+  val testRelation2 = LocalRelation($"c".int, $"d".string)
+
+  test("SPARK-48921: ScalaUDF in subquery should run through analyzer") {
+    val integer = testRelation2.output(0)
+    val udf = ScalaUDF((_: Int) => "x", StringType, integer :: Nil,
+      Option(ExpressionEncoder[Int]()) :: Nil)
+
+    var subPlan =
+      testRelation2
+        .where($"d" === udf)
+        .select(Literal(1)).analyze
+
+    // Manually remove the resolved encoder from the UDF.
+    // This simulates the case where the UDF is resolved during analysis
+    // but its encoders are not resolved.
+    subPlan = subPlan.transformUp {
+      case f: Filter =>
+        f.copy(condition = f.condition transform {
+          case s: ScalaUDF =>
+            val newUDF = s.copy(inputEncoders = Option(ExpressionEncoder[Int]()) :: Nil)
+            assert(newUDF.resolved)
+            newUDF
+        })
+    }
+
+    val existsSubquery =
+      testRelation
+        .where(Exists(subPlan))
+        .select($"a").analyze
+    assert(existsSubquery.resolved)
+
+    val existPlan = existsSubquery.collect {
+      case f: Filter =>
+        f.condition.collect {
+          case e: Exists =>
+            e.plan.collect {
+              case f: Filter => f
+            }
+        }
+    }.flatten.flatten.head
+
+    val udfs = existPlan.expressions.flatMap(e => e.collect {
+      case s: ScalaUDF =>
+        assert(s.inputEncoders.nonEmpty)
+        val encoder = s.inputEncoders.head
+        assert(encoder.isDefined)
+        assert(encoder.get.objDeserializer.resolved)
+
+        s
+    })
+    assert(udfs.size == 1)
+  }
 }

From 708b9591c25d390537167a73d1e11d8ea7bda078 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 17 Jul 2024 07:45:43 -0700
Subject: [PATCH 2/7] check and set analyzed

---
 .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index b8d9455f77918..995e73e03b431 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2578,6 +2578,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       executeSameContext(e.plan)
     }
 
+    checkAnalysis(newSubqueryPlan)
+
     // If the subquery plan is fully resolved, pull the outer references and record
     // them as children of SubqueryExpression.
     if (newSubqueryPlan.resolved) {

From c90e4634bd2577ec8c6306d81f73fd650c231e48 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 17 Jul 2024 14:24:11 -0700
Subject: [PATCH 3/7] Move ResolveEncodersInUDF rule

---
 .../sql/catalyst/analysis/Analyzer.scala      | 17 ++--
 .../analysis/ResolveEncodersInUDFSuite.scala  | 96 +++++++++++++++++++
 .../analysis/ResolveSubquerySuite.scala       | 59 +-----------
 3 files changed, 105 insertions(+), 67 deletions(-)
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveEncodersInUDFSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 995e73e03b431..65a3c8af8d07c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -325,6 +325,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       new ResolveIdentifierClause(earlyBatches) ::
       ResolveUnion ::
       ResolveRowLevelCommandAssignments ::
+      ResolveEncodersInUDF ::
       RewriteDeleteFromTable ::
       RewriteUpdateTable ::
       RewriteMergeIntoTable ::
@@ -352,8 +353,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       // operators may need null handling as well, so we should run these two rules repeatedly.
       HandleNullInputsForUDF,
       UpdateAttributeNullability),
-    Batch("UDF", Once,
-      ResolveEncodersInUDF),
     Batch("Subquery", Once,
       UpdateOuterReferences),
     Batch("Cleanup", fixedPoint,
@@ -2578,8 +2577,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       executeSameContext(e.plan)
     }
 
-    checkAnalysis(newSubqueryPlan)
-
     // If the subquery plan is fully resolved, pull the outer references and record
     // them as children of SubqueryExpression.
     if (newSubqueryPlan.resolved) {
@@ -2601,19 +2598,19 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
    */
   private def resolveSubQueries(plan: LogicalPlan, outer: LogicalPlan): LogicalPlan = {
     plan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION), ruleId) {
-      case s @ ScalarSubquery(sub, _, exprId, _, _, _) if !sub.analyzed =>
+      case s @ ScalarSubquery(sub, _, exprId, _, _, _) if !sub.resolved =>
         resolveSubQuery(s, outer)(ScalarSubquery(_, _, exprId))
-      case e @ Exists(sub, _, exprId, _, _) if !sub.analyzed =>
+      case e @ Exists(sub, _, exprId, _, _) if !sub.resolved =>
         resolveSubQuery(e, outer)(Exists(_, _, exprId))
-      case InSubquery(values, l @ ListQuery(sub, _, exprId, _, _, _))
-          if values.forall(_.resolved) && !sub.analyzed =>
+      case InSubquery(values, l @ ListQuery(_, _, exprId, _, _, _))
+          if values.forall(_.resolved) && !l.resolved =>
         val expr = resolveSubQuery(l, outer)((plan, exprs) => {
           ListQuery(plan, exprs, exprId, plan.output.length)
         })
         InSubquery(values, expr.asInstanceOf[ListQuery])
-      case s @ LateralSubquery(sub, _, exprId, _, _) if !sub.analyzed =>
+      case s @ LateralSubquery(sub, _, exprId, _, _) if !sub.resolved =>
         resolveSubQuery(s, outer)(LateralSubquery(_, _, exprId))
-      case a: FunctionTableSubqueryArgumentExpression if !a.plan.analyzed =>
+      case a: FunctionTableSubqueryArgumentExpression if !a.plan.resolved =>
         resolveSubQuery(a, outer)(
           (plan, outerAttrs) => a.copy(plan = plan, outerAttrs = outerAttrs))
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveEncodersInUDFSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveEncodersInUDFSuite.scala
new file mode 100644
index 0000000000000..e50aa8c3a3f73
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveEncodersInUDFSuite.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Exists, ScalaUDF}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, MergeIntoTable, ReplaceData, UpdateAction}
+import org.apache.spark.sql.catalyst.trees.TreePattern
+import org.apache.spark.sql.connector.catalog.InMemoryRowLevelOperationTable
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class ResolveEncodersInUDFSuite extends AnalysisTest {
+  test("SPARK-48921: ScalaUDF encoders in subquery should be resolved for MergeInto") {
+    val table = new InMemoryRowLevelOperationTable("table",
+      StructType(StructField("a", IntegerType) ::
+        StructField("b", DoubleType) ::
+        StructField("c", StringType) :: Nil),
+      Array.empty,
+      new java.util.HashMap[String, String]()
+    )
+    val relation = DataSourceV2Relation(table,
+      Seq(AttributeReference("a", IntegerType)(),
+        AttributeReference("b", DoubleType)(),
+        AttributeReference("c", StringType)()),
+      None,
+      None,
+      CaseInsensitiveStringMap.empty()
+    )
+
+
+    val string = relation.output(2)
+    val udf = ScalaUDF((_: String) => "x", StringType, string :: Nil,
+      Option(ExpressionEncoder[String]()) :: Nil)
+
+    val mergeIntoSource =
+      relation
+        .where($"c" === udf)
+        .select($"a", $"b")
+        .limit(1)
+    val cond = mergeIntoSource.output(0) == relation.output(0) &&
+      mergeIntoSource.output(1) == relation.output(1)
+
+    val mergePlan = MergeIntoTable(
+      relation,
+      mergeIntoSource,
+      cond,
+      Seq(UpdateAction(None,
+        Seq(Assignment(relation.output(0), relation.output(0)),
+          Assignment(relation.output(1), relation.output(1)),
+          Assignment(relation.output(2), relation.output(2))))),
+      Seq.empty,
+      Seq.empty,
+      withSchemaEvolution = false)
+
+    val replaceData = mergePlan.analyze.asInstanceOf[ReplaceData]
+
+    val existsPlans = replaceData.groupFilterCondition.map(_.collect {
+      case e: Exists =>
+        e.plan.collect {
+          case f: Filter if f.containsPattern(TreePattern.SCALA_UDF) => f
+        }
+    }.flatten)
+
+    assert(existsPlans.isDefined)
+
+    val udfs = existsPlans.get.map(_.expressions.flatMap(e => e.collect {
+      case s: ScalaUDF =>
+        assert(s.inputEncoders.nonEmpty)
+        val encoder = s.inputEncoders.head
+        assert(encoder.isDefined)
+        assert(encoder.get.objDeserializer.resolved)
+
+        s
+    })).flatten
+    assert(udfs.size == 1)
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala
index c97d025c3caf1..4e17f4624f7e0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala
@@ -19,12 +19,11 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{Alias, CreateArray, Exists, Expression, GetStructField, InSubquery, LambdaFunction, LateralSubquery, ListQuery, Literal, OuterReference, ScalarSubquery, ScalaUDF, UnresolvedNamedLambdaVariable}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CreateArray, Expression, GetStructField, InSubquery, LambdaFunction, LateralSubquery, ListQuery, OuterReference, ScalarSubquery, UnresolvedNamedLambdaVariable}
 import org.apache.spark.sql.catalyst.expressions.aggregate.Count
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType}
+import org.apache.spark.sql.types.{ArrayType, IntegerType}
 
 /**
  * Unit tests for [[ResolveSubquery]].
  */
@@ -300,58 +299,4 @@ class ResolveSubquerySuite extends AnalysisTest {
       "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.HIGHER_ORDER_FUNCTION",
       expectedMessageParameters = Map.empty[String, String])
   }
-
-  val testRelation = LocalRelation($"a".int, $"b".double)
-  val testRelation2 = LocalRelation($"c".int, $"d".string)
-
-  test("SPARK-48921: ScalaUDF in subquery should run through analyzer") {
-    val integer = testRelation2.output(0)
-    val udf = ScalaUDF((_: Int) => "x", StringType, integer :: Nil,
-      Option(ExpressionEncoder[Int]()) :: Nil)
-
-    var subPlan =
-      testRelation2
-        .where($"d" === udf)
-        .select(Literal(1)).analyze
-
-    // Manually remove the resolved encoder from the UDF.
-    // This simulates the case where the UDF is resolved during analysis
-    // but its encoders are not resolved.
-    subPlan = subPlan.transformUp {
-      case f: Filter =>
-        f.copy(condition = f.condition transform {
-          case s: ScalaUDF =>
-            val newUDF = s.copy(inputEncoders = Option(ExpressionEncoder[Int]()) :: Nil)
-            assert(newUDF.resolved)
-            newUDF
-        })
-    }
-
-    val existsSubquery =
-      testRelation
-        .where(Exists(subPlan))
-        .select($"a").analyze
-    assert(existsSubquery.resolved)
-
-    val existPlan = existsSubquery.collect {
-      case f: Filter =>
-        f.condition.collect {
-          case e: Exists =>
-            e.plan.collect {
-              case f: Filter => f
-            }
-        }
-    }.flatten.flatten.head
-
-    val udfs = existPlan.expressions.flatMap(e => e.collect {
-      case s: ScalaUDF =>
-        assert(s.inputEncoders.nonEmpty)
-        val encoder = s.inputEncoders.head
-        assert(encoder.isDefined)
-        assert(encoder.get.objDeserializer.resolved)
-
-        s
-    })
-    assert(udfs.size == 1)
-  }
 }

From 544828d95cec1de73db2ba3a0157f65ad8df19bc Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 17 Jul 2024 18:42:59 -0700
Subject: [PATCH 4/7] For review

---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 65a3c8af8d07c..0dd550a25fa42 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -325,10 +325,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       new ResolveIdentifierClause(earlyBatches) ::
       ResolveUnion ::
       ResolveRowLevelCommandAssignments ::
-      ResolveEncodersInUDF ::
-      RewriteDeleteFromTable ::
-      RewriteUpdateTable ::
-      RewriteMergeIntoTable ::
       MoveParameterizedQueriesDown ::
       BindParameters ::
       typeCoercionRules() ++
@@ -349,6 +349,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       // operators may need null handling as well, so we should run these two rules repeatedly.
       HandleNullInputsForUDF,
       UpdateAttributeNullability),
+    Batch("UDF", Once,
+      ResolveEncodersInUDF),
+    Batch("DML rewrite", fixedPoint,
+      RewriteDeleteFromTable ::
+      RewriteUpdateTable ::
+      RewriteMergeIntoTable :: Nil),
     Batch("Subquery", Once,
       UpdateOuterReferences),
     Batch("Cleanup", fixedPoint,

From fd17bd4b86fcfc6ccd85e499d61ef58013b6c26f Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 17 Jul 2024 19:27:21 -0700
Subject: [PATCH 5/7] Add comment

---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0dd550a25fa42..dc8759a14214d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -351,6 +351,10 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       UpdateAttributeNullability),
     Batch("UDF", Once,
      ResolveEncodersInUDF),
+    // The rewrite rules might move resolved query plan into subquery. Once the resolved plan
+    // contains ScalaUDF, their encoders won't be resolved if `ResolveEncodersInUDF` is not
+    // applied before the rewrite rules. So we need to apply `ResolveEncodersInUDF` before the
+    // rewrite rules.
     Batch("DML rewrite", fixedPoint,
       RewriteDeleteFromTable ::
       RewriteUpdateTable ::
       RewriteMergeIntoTable :: Nil),

From 0722454fac96bcae16ceed1f5b3801505a982e9a Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 17 Jul 2024 22:26:13 -0700
Subject: [PATCH 6/7] Fix

---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index dc8759a14214d..43b5732bc0337 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -356,9 +356,9 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     // applied before the rewrite rules. So we need to apply `ResolveEncodersInUDF` before the
     // rewrite rules.
     Batch("DML rewrite", fixedPoint,
-      RewriteDeleteFromTable ::
-      RewriteUpdateTable ::
-      RewriteMergeIntoTable :: Nil),
+      RewriteDeleteFromTable,
+        RewriteUpdateTable,
+        RewriteMergeIntoTable),
     Batch("Subquery", Once,
       UpdateOuterReferences),
     Batch("Cleanup", fixedPoint,

From c93cac5bab306dd1049b689903824f6ea677f191 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Thu, 18 Jul 2024 23:20:30 +0800
Subject: [PATCH 7/7] Update sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 43b5732bc0337..1b194da5ab0a2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -357,8 +357,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     // rewrite rules.
Batch("DML rewrite", fixedPoint, RewriteDeleteFromTable, - RewriteUpdateTable, - RewriteMergeIntoTable), + RewriteUpdateTable, + RewriteMergeIntoTable), Batch("Subquery", Once, UpdateOuterReferences), Batch("Cleanup", fixedPoint,