Skip to content

Commit 93f89f9

Browse files
xingchaozh (GitHub Enterprise)
authored and committed
[CARMEL-6587] Support Generic Skew Join Patterns (#1281)
* [CARMEL-6587] Support Generic Skew Join Patterns * fix ut * fix ut * fix ut
1 parent 81d76f6 commit 93f89f9

6 files changed

Lines changed: 906 additions & 69 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -697,6 +697,15 @@ object SQLConf {
697697
.booleanConf
698698
.createWithDefault(true)
699699

700+
val GENERIC_SKEW_JOIN_ENABLED =
701+
buildConf("spark.sql.adaptive.genericSkewJoin.enabled")
702+
.doc(s"When true and '${SKEW_JOIN_ENABLED.key}' is true, Spark dynamically " +
703+
"handles skew in sort-merge join by splitting (and replicating if needed) skewed " +
704+
"partitions.")
705+
.version("3.0.0")
706+
.booleanConf
707+
.createWithDefault(false)
708+
700709
val SKEW_BROADCAST_HASH_JOIN_ENABLED =
701710
buildConf("spark.sql.adaptive.broadcastHashJoinSkew.enabled")
702711
.doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark dynamically " +
@@ -712,7 +721,7 @@ object SQLConf {
712721
"handles skew in broadcast hash join leveraging local shuffle read.")
713722
.version("3.0.0")
714723
.booleanConf
715-
.createWithDefault(false)
724+
.createWithDefault(true)
716725

717726
val ALLOW_ADDITIONAL_SHUFFLE =
718727
buildConf("spark.sql.adaptive.allowAdditionalShuffle")

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ case class AdaptiveSparkPlanExec(
9999
DisableUnnecessaryBucketedScan,
100100
AdjustScanPartitionSizeDynamically,
101101
OptimizeSkewedJoin, // ensureRequirements
102+
OptimizeGenericSkewedJoin,
102103
EliminateSkewOptimzeIntroducedShuffle,
103104
removeRedundantSorts
104105
) ++ context.session.sessionState.queryStagePrepRules

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -146,12 +146,21 @@ case class CustomShuffleReaderExec private(
146146
driverAccumUpdates += (numPartitionsMetric.id -> partitionSpecs.length.toLong)
147147

148148
if (hasSkewedPartition) {
149-
val skewedMetric = metrics("numSkewedPartitions")
150-
val numSkewedPartitions = partitionSpecs.collect {
149+
val skewedSpecs = partitionSpecs.collect {
151150
case p: PartialReducerPartitionSpec => p.reducerIndex
152-
}.distinct.length
153-
skewedMetric.set(numSkewedPartitions)
154-
driverAccumUpdates += (skewedMetric.id -> numSkewedPartitions.toLong)
151+
}
152+
153+
val skewedPartitions = metrics("numSkewedPartitions")
154+
val skewedSplits = metrics("numSkewedSplits")
155+
156+
val numSkewedPartitions = skewedSpecs.distinct.length
157+
val numSplits = skewedSpecs.length
158+
159+
skewedPartitions.set(numSkewedPartitions)
160+
driverAccumUpdates += (skewedPartitions.id -> numSkewedPartitions)
161+
162+
skewedSplits.set(numSplits)
163+
driverAccumUpdates += (skewedSplits.id -> numSplits)
155164
}
156165

157166
partitionDataSizes.foreach { dataSizes =>
@@ -178,7 +187,9 @@ case class CustomShuffleReaderExec private(
178187
} ++ {
179188
if (hasSkewedPartition) {
180189
Map("numSkewedPartitions" ->
181-
SQLMetrics.createMetric(sparkContext, "number of skewed partitions"))
190+
SQLMetrics.createMetric(sparkContext, "number of skewed partitions"),
191+
"numSkewedSplits" ->
192+
SQLMetrics.createMetric(sparkContext, "number of skewed partition splits"))
182193
} else {
183194
Map.empty
184195
}

0 commit comments

Comments (0)