Skip to content

Commit 8bbfe24

Browse files
committed
remove the max splits cofig in skewed join
1 parent a5efbb2 commit 8bbfe24

File tree

3 files changed

+18
-29
lines changed

3 files changed

+18
-29
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -447,14 +447,6 @@ object SQLConf {
447447
.intConf
448448
.createWithDefault(10)
449449

450-
val ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS =
451-
buildConf("spark.sql.adaptive.skewedJoinOptimization.skewedPartitionMaxSplits")
452-
.doc("Configures the maximum number of task to handle a skewed partition in adaptive skewed" +
453-
"join.")
454-
.intConf
455-
.checkValue( _ >= 1, "The split size at least be 1")
456-
.createWithDefault(5)
457-
458450
val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
459451
buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
460452
.doc("The relation with a non-empty partition ratio lower than this config will not be " +

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
7575
private def getMapStartIndices(stage: ShuffleQueryStageExec, partitionId: Int): Array[Int] = {
7676
val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
7777
val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
78-
val maxSplits = math.min(conf.getConf(
79-
SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS), mapPartitionSizes.length)
80-
val avgPartitionSize = mapPartitionSizes.sum / maxSplits
78+
val avgPartitionSize = mapPartitionSizes.sum / mapPartitionSizes.length
8179
val advisoryPartitionSize = math.max(avgPartitionSize,
8280
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD))
8381
val partitionStartIndices = ArrayBuffer[Int]()
@@ -95,9 +93,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
9593
i += 1
9694
}
9795

98-
if (partitionStartIndices.size > maxSplits) {
99-
partitionStartIndices.take(maxSplits).toArray
100-
} else partitionStartIndices.toArray
96+
partitionStartIndices.toArray
10197
}
10298

10399
private def getStatistics(stage: ShuffleQueryStageExec): MapOutputStatistics = {

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -609,7 +609,7 @@ class AdaptiveQueryExecSuite
609609
withSQLConf(
610610
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
611611
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
612-
SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key -> "100",
612+
SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key -> "2000",
613613
SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "700") {
614614
withTempView("skewData1", "skewData2") {
615615
spark
@@ -636,42 +636,43 @@ class AdaptiveQueryExecSuite
636636
"SELECT * FROM skewData1 join skewData2 ON key1 = key2")
637637
// left stats: [3496, 0, 0, 0, 4014]
638638
// right stats:[6292, 0, 0, 0, 0]
639-
// Partition 0: both left and right sides are skewed, and divide into 5 splits, so
640-
// 5 x 5 sub-partitions.
639+
// Partition 0: both left and right sides are skewed, left side is divided
640+
// into 2 splits and right side is divided into 4 splits, so
641+
// 2 x 4 sub-partitions.
641642
// Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
642-
// Partition 4: only left side is skewed, and divide into 5 splits, so
643-
// 5 sub-partitions.
644-
// So total (25 + 1 + 5) partitions.
643+
// Partition 4: only left side is skewed, and divide into 3 splits, so
644+
// 3 sub-partitions.
645+
// So total (8 + 1 + 3) partitions.
645646
val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan)
646-
checkSkewJoin(innerSmj, 25 + 1 + 5)
647+
checkSkewJoin(innerSmj, 8 + 1 + 3)
647648

648649
// skewed left outer join optimization
649650
val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
650651
"SELECT * FROM skewData1 left outer join skewData2 ON key1 = key2")
651652
// left stats: [3496, 0, 0, 0, 4014]
652653
// right stats:[6292, 0, 0, 0, 0]
653654
// Partition 0: both left and right sides are skewed, but left join can't split right side,
654-
// so only left side is divided into 5 splits, and thus 5 sub-partitions.
655+
// so only left side is divided into 2 splits, and thus 2 sub-partitions.
655656
// Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
656-
// Partition 4: only left side is skewed, and divide into 5 splits, so
657-
// 5 sub-partitions.
658-
// So total (5 + 1 + 5) partitions.
657+
// Partition 4: only left side is skewed, and divide into 3 splits, so
658+
// 3 sub-partitions.
659+
// So total (2 + 1 + 3) partitions.
659660
val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan)
660-
checkSkewJoin(leftSmj, 5 + 1 + 5)
661+
checkSkewJoin(leftSmj, 2 + 1 + 3)
661662

662663
// skewed right outer join optimization
663664
val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult(
664665
"SELECT * FROM skewData1 right outer join skewData2 ON key1 = key2")
665666
// left stats: [3496, 0, 0, 0, 4014]
666667
// right stats:[6292, 0, 0, 0, 0]
667668
// Partition 0: both left and right sides are skewed, but right join can't split left side,
668-
// so only right side is divided into 5 splits, and thus 5 sub-partitions.
669+
// so only right side is divided into 4 splits, and thus 4 sub-partitions.
669670
// Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
670671
// Partition 4: only left side is skewed, but right join can't split left side, so just
671672
// 1 partition.
672-
// So total (5 + 1 + 1) partitions.
673+
// So total (4 + 1 + 1) partitions.
673674
val rightSmj = findTopLevelSortMergeJoin(rightAdaptivePlan)
674-
checkSkewJoin(rightSmj, 5 + 1 + 1)
675+
checkSkewJoin(rightSmj, 4 + 1 + 1)
675676
}
676677
}
677678
}

0 commit comments

Comments
 (0)