@@ -85,11 +85,16 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
         childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
           .map(_.outputPartitioning.numPartitions)
       val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) {
-        // Here we pick the max number of partitions among these non-shuffle children as the
-        // expected number of shuffle partitions. However, if it's smaller than
-        // `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the
-        // expected number of shuffle partitions.
-        math.max(nonShuffleChildrenNumPartitions.max, conf.defaultNumShufflePartitions)
+        if (nonShuffleChildrenNumPartitions.length == childrenIndexes.length) {
+          // Here we pick the max number of partitions among these non-shuffle children.
+          nonShuffleChildrenNumPartitions.max
+        } else {
+          // Here we pick the max number of partitions among these non-shuffle children as the
+          // expected number of shuffle partitions. However, if it's smaller than
+          // `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the
+          // expected number of shuffle partitions.
+          math.max(nonShuffleChildrenNumPartitions.max, conf.defaultNumShufflePartitions)
+        }
       } else {
         childrenNumPartitions.max
       }
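
To make the change easier to follow outside the Spark codebase, here is a minimal, self-contained sketch of the decision the new branch encodes. The object name PickPartitionsSketch, the plain Seq[Int]/Int parameters, and the example values are illustrative stand-ins, not Spark APIs.

// Standalone sketch (not Spark code): models the partition-count choice above.
// nonShufflePartitions stands in for nonShuffleChildrenNumPartitions,
// allPartitions for childrenNumPartitions, numChildren for childrenIndexes.length,
// and defaultShufflePartitions for conf.defaultNumShufflePartitions.
object PickPartitionsSketch {
  def pickExpectedNumPartitions(
      nonShufflePartitions: Seq[Int],
      allPartitions: Seq[Int],
      numChildren: Int,
      defaultShufflePartitions: Int): Int = {
    if (nonShufflePartitions.nonEmpty) {
      if (nonShufflePartitions.length == numChildren) {
        // New branch: every child already has a concrete (non-shuffle) partitioning,
        // e.g. two bucketed scans, so keep the larger of their partition counts
        // instead of forcing the configured shuffle parallelism on them.
        nonShufflePartitions.max
      } else {
        // Previous behaviour, kept for the mixed case: at least one child is
        // shuffled anyway, so never go below the default shuffle parallelism.
        math.max(nonShufflePartitions.max, defaultShufflePartitions)
      }
    } else {
      allPartitions.max
    }
  }

  def main(args: Array[String]): Unit = {
    // Two bucketed children with 8 buckets and a shuffle-partition default of 200:
    // before the patch this evaluated to 200 (introducing a shuffle); now it is 8.
    println(pickExpectedNumPartitions(Seq(8, 8), Seq(8, 8), 2, 200))
  }
}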
@@ -843,4 +843,32 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than bucket number") {
+    withSQLConf(
+      SQLConf.SHUFFLE_PARTITIONS.key -> "9",
+      SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") {
+
+      val testSpec1 = BucketedTableTestSpec(
+        Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))),
+        numPartitions = 1,
+        expectedShuffle = false,
+        expectedSort = false)
+      val testSpec2 = BucketedTableTestSpec(
+        Some(BucketSpec(6, Seq("i", "j"), Seq("i", "j"))),
+        numPartitions = 1,
+        expectedShuffle = true,
+        expectedSort = true)
+      Seq(false, true).foreach { enableAdaptive =>
+        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> s"$enableAdaptive") {
+          Seq((testSpec1, testSpec2), (testSpec2, testSpec1)).foreach { specs =>
+            testBucketing(
+              bucketedTableTestSpecLeft = specs._1,
+              bucketedTableTestSpecRight = specs._2,
+              joinCondition = joinCondition(Seq("i", "j")))
+          }
+        }
+      }
+    }
+  }
 }
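
For context, a hedged end-to-end sketch of the scenario the new test exercises: with spark.sql.shuffle.partitions larger than the bucket count, a join of two identically bucketed tables should still avoid a shuffle after this change. The table names, schema, and row counts below are made up for illustration; spark.sql.shuffle.partitions and spark.sql.autoBroadcastJoinThreshold are real Spark configs.

import org.apache.spark.sql.SparkSession

// Illustrative reproduction only; not part of the PR.
object BucketJoinRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      // Larger than the bucket count (8); before this change it forced a shuffle.
      .config("spark.sql.shuffle.partitions", "9")
      // Disable broadcast joins so the join stays a sort-merge join and bucketing matters.
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()
    import spark.implicits._

    val df = (0 until 100).map(i => (i % 5, i % 13)).toDF("i", "j")
    df.write.bucketBy(8, "i", "j").sortBy("i", "j").saveAsTable("t1")
    df.write.bucketBy(8, "i", "j").sortBy("i", "j").saveAsTable("t2")

    val joined = spark.table("t1").join(spark.table("t2"), Seq("i", "j"))
    // With the fix, the physical plan should contain no Exchange on either side.
    joined.explain()
    spark.stop()
  }
}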