diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 574a67fecf936..7f5a5e3c2333c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -297,8 +297,10 @@ object QueryExecution {
     Seq(
       PlanDynamicPruningFilters(sparkSession),
       PlanSubqueries(sparkSession),
-      RemoveRedundantSorts(sparkSession.sessionState.conf),
       EnsureRequirements(sparkSession.sessionState.conf),
+      // `RemoveRedundantSorts` needs to be added after `EnsureRequirements` to guarantee the
+      // same number of output partitions when instantiating PartitioningCollection.
+      RemoveRedundantSorts(sparkSession.sessionState.conf),
       ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
         sparkSession.sessionState.columnarRules),
       CollapseCodegenStages(sparkSession.sessionState.conf),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index ead8c00031112..062aa69b3adb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -135,7 +135,12 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   def longMetric(name: String): SQLMetric = metrics(name)
 
   // TODO: Move to `DistributedPlan`
-  /** Specifies how data is partitioned across different nodes in the cluster. */
+  /**
+   * Specifies how data is partitioned across different nodes in the cluster.
+   * Note this method may fail if it is invoked before `EnsureRequirements` is applied
+   * since `PartitioningCollection` requires all its partitionings to have
+   * the same number of partitions.
+   */
   def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG WIDTH!
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 4e73f06f36f1d..187827ca6005e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -90,8 +90,8 @@ case class AdaptiveSparkPlanExec(
   // plan should reach a final status of query stages (i.e., no more addition or removal of
   // Exchange nodes) after running these rules.
   private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
-    removeRedundantSorts,
-    ensureRequirements
+    ensureRequirements,
+    removeRedundantSorts
   ) ++ context.session.sessionState.queryStagePrepRules
 
   // A list of physical optimizer rules to be applied to a new stage before its execution.
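Note on the reordering above: the constraint that forces it lives in `PartitioningCollection` (in org.apache.spark.sql.catalyst.plans.physical), whose constructor `require`s that every member partitioning report the same `numPartitions`. Before `EnsureRequirements` has inserted the exchanges that align the two sides of a join, the sides can disagree, so any rule that asks for `outputPartitioning` too early can hit that `require`. A minimal sketch of the failure in isolation, not part of this patch (the object name is made up, and it assumes the catalyst classes are on the classpath):

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, PartitioningCollection}
import org.apache.spark.sql.types.LongType

// Illustrative only: demonstrates the require() that makes early calls to
// outputPartitioning unsafe.
object PartitioningCollectionSketch {
  def main(args: Array[String]): Unit = {
    val key = AttributeReference("key", LongType)()
    // Two join sides hash-partitioned on the same key but with different
    // partition counts, as can happen before EnsureRequirements has run.
    val left = HashPartitioning(Seq(key), 2)
    val right = HashPartitioning(Seq(key), 200)
    // Throws IllegalArgumentException: PartitioningCollection requires all
    // of its partitionings to have the same number of partitions.
    PartitioningCollection(Seq(left, right))
  }
}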
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
index 1978d22cb87bb..9cea81239f37d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
@@ -18,7 +18,9 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.catalyst.plans.physical.{RangePartitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 
 abstract class RemoveRedundantSortsSuiteBase
@@ -99,6 +101,29 @@ abstract class RemoveRedundantSortsSuiteBase
       }
     }
   }
+
+  test("SPARK-33472: shuffled join with different left and right side partition numbers") {
+    withTempView("t1", "t2") {
+      spark.range(0, 100, 1, 2).select('id as "key").createOrReplaceTempView("t1")
+      (0 to 100).toDF("key").createOrReplaceTempView("t2")
+
+      val query = """
+        |SELECT /*+ MERGE(t1) */ t1.key
+        |FROM t1 JOIN t2 ON t1.key = t2.key
+        |WHERE t1.key > 10 AND t2.key < 50
+        |ORDER BY t1.key ASC
+      """.stripMargin
+
+      val df = sql(query)
+      val sparkPlan = df.queryExecution.sparkPlan
+      val join = sparkPlan.collect { case j: SortMergeJoinExec => j }.head
+      val leftPartitioning = join.left.outputPartitioning
+      assert(leftPartitioning.isInstanceOf[RangePartitioning])
+      assert(leftPartitioning.numPartitions == 2)
+      assert(join.right.outputPartitioning == UnknownPartitioning(0))
+      checkSorts(query, 3, 3)
+    }
+  }
 }
 
 class RemoveRedundantSortsSuite extends RemoveRedundantSortsSuiteBase
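For context on what the new test asserts: at the `sparkPlan` stage no preparation rules have run yet, so the left join side (a two-slice range) already reports `RangePartitioning` with 2 partitions while the right side (a local relation) still reports `UnknownPartitioning(0)`; asking the join for a combined `PartitioningCollection` over those mismatched sides is exactly what used to fail when `RemoveRedundantSorts` ran ahead of `EnsureRequirements`. The final `checkSorts(query, 3, 3)` uses the helper defined earlier in this suite, which (as I read it) compares the number of sort nodes with the rule enabled and disabled. A rough spark-shell reproduction of the same scenario, assuming a running `spark` session (illustrative, not taken from the patch):

import spark.implicits._
import org.apache.spark.sql.execution.joins.SortMergeJoinExec

// Left side: a range with exactly 2 slices; right side: a local relation.
spark.range(0, 100, 1, 2).selectExpr("id AS key").createOrReplaceTempView("t1")
(0 to 100).toDF("key").createOrReplaceTempView("t2")

val df = spark.sql(
  """SELECT /*+ MERGE(t1) */ t1.key
    |FROM t1 JOIN t2 ON t1.key = t2.key
    |WHERE t1.key > 10 AND t2.key < 50
    |ORDER BY t1.key ASC""".stripMargin)

// sparkPlan is the physical plan before the preparation rules run, so no
// exchanges have been inserted yet and the join sides disagree on the
// number of partitions (2 vs 0).
val join = df.queryExecution.sparkPlan.collect { case j: SortMergeJoinExec => j }.head
println(join.left.outputPartitioning)  // a RangePartitioning with numPartitions == 2
println(join.right.outputPartitioning) // UnknownPartitioning(0)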