Skip to content

Commit e59df34

Browse files
committed
Bucket join should work if SHUFFLE_PARTITIONS larger than bucket number
1 parent a410658 commit e59df34

2 files changed

Lines changed: 40 additions & 5 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,16 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
8585
childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
8686
.map(_.outputPartitioning.numPartitions)
8787
val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) {
88-
// Here we pick the max number of partitions among these non-shuffle children as the
89-
// expected number of shuffle partitions. However, if it's smaller than
90-
// `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the
91-
// expected number of shuffle partitions.
92-
math.max(nonShuffleChildrenNumPartitions.max, conf.defaultNumShufflePartitions)
88+
if (nonShuffleChildrenNumPartitions.length == childrenIndexes.length) {
89+
// Here we pick the max number of partitions among these non-shuffle children.
90+
nonShuffleChildrenNumPartitions.max
91+
} else {
92+
// Here we pick the max number of partitions among these non-shuffle children as the
93+
// expected number of shuffle partitions. However, if it's smaller than
94+
// `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the
95+
// expected number of shuffle partitions.
96+
math.max(nonShuffleChildrenNumPartitions.max, conf.defaultNumShufflePartitions)
97+
}
9398
} else {
9499
childrenNumPartitions.max
95100
}

sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -875,6 +875,36 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
875875
}
876876
}
877877

878+
test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than bucket number") {
879+
withSQLConf(
880+
SQLConf.SHUFFLE_PARTITIONS.key -> "9",
881+
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") {
882+
883+
val testSpec1 = BucketedTableTestSpec(
884+
Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))),
885+
numPartitions = 1,
886+
expectedShuffle = false,
887+
expectedSort = false,
888+
expectedNumOutputPartitions = Some(8))
889+
val testSpec2 = BucketedTableTestSpec(
890+
Some(BucketSpec(6, Seq("i", "j"), Seq("i", "j"))),
891+
numPartitions = 1,
892+
expectedShuffle = true,
893+
expectedSort = true,
894+
expectedNumOutputPartitions = Some(8))
895+
Seq(false, true).foreach { enableAdaptive =>
896+
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> s"$enableAdaptive") {
897+
Seq((testSpec1, testSpec2), (testSpec2, testSpec1)).foreach { specs =>
898+
testBucketing(
899+
bucketedTableTestSpecLeft = specs._1,
900+
bucketedTableTestSpecRight = specs._2,
901+
joinCondition = joinCondition(Seq("i", "j")))
902+
}
903+
}
904+
}
905+
}
906+
}
907+
878908
test("bucket coalescing eliminates shuffle") {
879909
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
880910
// The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions.

0 commit comments

Comments (0)