diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 4f9cb22ee0057..eb32bfcecae7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -665,7 +665,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.Repartition(numPartitions, shuffle, child) =>
         if (shuffle) {
           ShuffleExchangeExec(RoundRobinPartitioning(numPartitions),
-            planLater(child), canChangeNumPartitions = false) :: Nil
+            planLater(child), noUserSpecifiedNumPartition = false) :: Nil
         } else {
           execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
         }
@@ -698,9 +698,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case r: logical.Range =>
         execution.RangeExec(r) :: Nil
       case r: logical.RepartitionByExpression =>
-        val canChangeNumParts = r.optNumPartitions.isEmpty
         exchange.ShuffleExchangeExec(
-          r.partitioning, planLater(r.child), canChangeNumParts) :: Nil
+          r.partitioning,
+          planLater(r.child),
+          noUserSpecifiedNumPartition = r.optNumPartitions.isEmpty) :: Nil
       case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
       case r: LogicalRDD =>
         RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
index 45fb36420e770..7bb9265e1717a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
@@ -140,9 +140,9 @@ object OptimizeLocalShuffleReader {
 
   def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match {
     case s: ShuffleQueryStageExec =>
-      s.shuffle.canChangeNumPartitions
+      s.shuffle.canChangeNumPartitions && s.mapStats.isDefined
     case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) =>
-      s.shuffle.canChangeNumPartitions && partitionSpecs.nonEmpty
+      s.shuffle.canChangeNumPartitions && s.mapStats.isDefined && partitionSpecs.nonEmpty
     case _ => false
   }
 }
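Reviewer note on the two planner call sites above: the contract the rename captures can be sanity-checked from the public API. A minimal, hypothetical harness (not part of this patch), assuming Spark 3.0+ with AQE enabled:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sanity check, not part of this patch: `repartition(5)` plans a
// ShuffleExchangeExec with noUserSpecifiedNumPartition = false, so AQE must
// keep exactly 5 partitions; `repartition($"id")` leaves AQE free to coalesce.
object RepartitionPinningSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .config("spark.sql.adaptive.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    val df = spark.range(1000).toDF("id")

    // User-specified partition count: AQE must not change it.
    assert(df.repartition(5).rdd.getNumPartitions == 5)

    // No user-specified count: AQE may pick a different number.
    val n = df.repartition($"id").rdd.getNumPartitions
    println(s"repartition by expression produced $n partitions")

    spark.stop()
  }
}
```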
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index 83fdafbadcb60..ed92af6adc186 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -92,9 +92,9 @@ object ShufflePartitionsUtil extends Logging {
     var coalescedSize = 0L
     var i = 0
 
-    def createPartitionSpec(): Unit = {
+    def createPartitionSpec(forceCreate: Boolean = false): Unit = {
       // Skip empty inputs, as it is a waste to launch an empty task.
-      if (coalescedSize > 0) {
+      if (coalescedSize > 0 || forceCreate) {
         partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
       }
     }
@@ -120,7 +120,8 @@ object ShufflePartitionsUtil extends Logging {
       }
       i += 1
     }
-    createPartitionSpec()
+    // Create at least one partition if all partitions are empty.
+    createPartitionSpec(partitionSpecs.isEmpty)
     partitionSpecs.toSeq
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 30c9f0ae1282d..6af4b098bee2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -83,7 +83,12 @@ trait ShuffleExchangeLike extends Exchange {
 case class ShuffleExchangeExec(
     override val outputPartitioning: Partitioning,
     child: SparkPlan,
-    canChangeNumPartitions: Boolean = true) extends ShuffleExchangeLike {
+    noUserSpecifiedNumPartition: Boolean = true) extends ShuffleExchangeLike {
+
+  // If users specify the num partitions via APIs like `repartition`, we shouldn't change it.
+  // For `SinglePartition`, it requires exactly one partition and we can't change it either.
+  override def canChangeNumPartitions: Boolean =
+    noUserSpecifiedNumPartition && outputPartitioning != SinglePartition
 
   private lazy val writeMetrics =
     SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
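The derived `canChangeNumPartitions` above is a pure function of the constructor flag and the output partitioning. A standalone sketch of its truth table, using stand-in types (the real `Partitioning` and `SinglePartition` live in `org.apache.spark.sql.catalyst.plans.physical`):

```scala
// Stand-in types for illustration only; not Catalyst's actual classes.
object CanChangeNumPartitionsSketch extends App {
  sealed trait Partitioning
  case object SinglePartition extends Partitioning
  final case class HashPartitioning(numPartitions: Int) extends Partitioning

  // Mirrors the new override: AQE may only change the partition count when
  // the user didn't pin it AND the plan doesn't require exactly one partition.
  def canChangeNumPartitions(
      noUserSpecifiedNumPartition: Boolean,
      outputPartitioning: Partitioning): Boolean =
    noUserSpecifiedNumPartition && outputPartitioning != SinglePartition

  assert(canChangeNumPartitions(noUserSpecifiedNumPartition = true, HashPartitioning(10)))
  assert(!canChangeNumPartitions(noUserSpecifiedNumPartition = false, HashPartitioning(10)))
  assert(!canChangeNumPartitions(noUserSpecifiedNumPartition = true, SinglePartition))
  assert(!canChangeNumPartitions(noUserSpecifiedNumPartition = false, SinglePartition))
}
```

Centralizing the `SinglePartition` check inside the exchange keeps every call site from having to remember it when constructing the node.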
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
index f5c3b7816f5ea..94e22a414a628 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
@@ -198,9 +198,11 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
 
       // the size of data is 0.
       val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
       val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
+      // Create at least one partition spec
+      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
       checkEstimation(
         Array(bytesByPartitionId1, bytesByPartitionId2),
-        Seq.empty, targetSize, minNumPartitions)
+        expectedPartitionSpecs, targetSize, minNumPartitions)
     }
 
@@ -248,16 +250,19 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
     val minNumPartitions = 2
 
     {
-      // 1 shuffle: All bytes per partition are 0, no partition spec created.
+      // 1 shuffle: All bytes per partition are 0, 1 empty partition spec created.
       val bytesByPartitionId = Array[Long](0, 0, 0, 0, 0)
-      checkEstimation(Array(bytesByPartitionId), Seq.empty, targetSize)
+      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
+      checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, targetSize)
     }
 
     {
-      // 2 shuffles: All bytes per partition are 0, no partition spec created.
+      // 2 shuffles: All bytes per partition are 0, 1 empty partition spec created.
       val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
       val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
-      checkEstimation(Array(bytesByPartitionId1, bytesByPartitionId2), Seq.empty, targetSize)
+      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
+      checkEstimation(
+        Array(bytesByPartitionId1, bytesByPartitionId2), expectedPartitionSpecs, targetSize)
     }
 
     {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 03471fb047260..7fdcbd0d089cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -213,7 +213,7 @@ class AdaptiveQueryExecSuite
     }
   }
 
-  test("Empty stage coalesced to 0-partition RDD") {
+  test("Empty stage coalesced to 1-partition RDD") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true") {
@@ -227,8 +227,8 @@ class AdaptiveQueryExecSuite
         val coalescedReaders = collect(plan) {
           case r: CustomShuffleReaderExec => r
         }
-        assert(coalescedReaders.length == 2)
-        coalescedReaders.foreach(r => assert(r.partitionSpecs.isEmpty))
+        assert(coalescedReaders.length == 3)
+        coalescedReaders.foreach(r => assert(r.partitionSpecs.length == 1))
       }
 
       withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
@@ -239,8 +239,8 @@ class AdaptiveQueryExecSuite
         val coalescedReaders = collect(plan) {
           case r: CustomShuffleReaderExec => r
         }
-        assert(coalescedReaders.length == 2, s"$plan")
-        coalescedReaders.foreach(r => assert(r.partitionSpecs.isEmpty))
+        assert(coalescedReaders.length == 3, s"$plan")
+        coalescedReaders.foreach(r => assert(r.isLocalReader || r.partitionSpecs.length == 1))
       }
     }
   }
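To see why `forceCreate` is needed, here is a minimal standalone model of the coalescing loop (single shuffle only; the real `ShufflePartitionsUtil` walks several shuffles in lock step). With all-zero partition sizes it now returns one spec covering the whole reducer range, which is exactly what the updated `ShufflePartitionsUtilSuite` cases assert:

```scala
// Simplified, single-shuffle model of ShufflePartitionsUtil.coalescePartitions;
// a sketch for illustration, not Spark's actual implementation.
object CoalesceSketch extends App {
  final case class CoalescedPartitionSpec(startReducerIndex: Int, endReducerIndex: Int)

  def coalesce(bytesByPartitionId: Array[Long], targetSize: Long): Seq[CoalescedPartitionSpec] = {
    val partitionSpecs = scala.collection.mutable.ArrayBuffer.empty[CoalescedPartitionSpec]
    var latestSplitPoint = 0
    var coalescedSize = 0L
    var i = 0

    def createPartitionSpec(forceCreate: Boolean = false): Unit = {
      // Skip empty inputs, unless forced: launching an empty task is a waste.
      if (coalescedSize > 0 || forceCreate) {
        partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
      }
    }

    while (i < bytesByPartitionId.length) {
      // Start a new coalesced partition once the target size would be exceeded.
      if (i > latestSplitPoint && coalescedSize + bytesByPartitionId(i) > targetSize) {
        createPartitionSpec()
        latestSplitPoint = i
        coalescedSize = 0L
      }
      coalescedSize += bytesByPartitionId(i)
      i += 1
    }
    // Create at least one partition if all partitions are empty.
    createPartitionSpec(partitionSpecs.isEmpty)
    partitionSpecs.toSeq
  }

  // All-empty input: previously Seq.empty, now one spec over [0, 5).
  assert(coalesce(Array(0L, 0L, 0L, 0L, 0L), targetSize = 100) ==
    Seq(CoalescedPartitionSpec(0, 5)))

  // Non-empty input is unaffected: [10, 10, 70] with target 64 splits at index 2.
  assert(coalesce(Array(10L, 10L, 70L), targetSize = 64) ==
    Seq(CoalescedPartitionSpec(0, 2), CoalescedPartitionSpec(2, 3)))
}
```

The one-partition floor is what lets downstream stages (and the updated `AdaptiveQueryExecSuite` assertions) rely on every coalesced reader having at least one partition spec.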