Skip to content

Commit 77d046e

Browse files
megaserg authored and srowen committed
[SPARK-21782][CORE] Repartition creates skews when numPartitions is a power of 2
## Problem When an RDD (particularly with a low item-per-partition ratio) is repartitioned to numPartitions = power of 2, the resulting partitions are very uneven-sized, due to using fixed seed to initialize PRNG, and using the PRNG only once. See details in https://issues.apache.org/jira/browse/SPARK-21782 ## What changes were proposed in this pull request? Instead of directly using `0, 1, 2,...` seeds to initialize `Random`, hash them with `scala.util.hashing.byteswap32()`. ## How was this patch tested? `build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.rdd.RDDSuite test` Author: Sergey Serebryakov <sserebryakov@tesla.com> Closes #18990 from megaserg/repartition-skew.
1 parent 28a6cca commit 77d046e

2 files changed

Lines changed: 6 additions & 3 deletions

File tree

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
2424
import scala.io.Codec
2525
import scala.language.implicitConversions
2626
import scala.reflect.{classTag, ClassTag}
27+
import scala.util.hashing
2728

2829
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
2930
import org.apache.hadoop.io.{BytesWritable, NullWritable, Text}
@@ -448,7 +449,7 @@ abstract class RDD[T: ClassTag](
448449
if (shuffle) {
449450
/** Distributes elements evenly across output partitions, starting from a random partition. */
450451
val distributePartition = (index: Int, items: Iterator[T]) => {
451-
var position = (new Random(index)).nextInt(numPartitions)
452+
var position = (new Random(hashing.byteswap32(index))).nextInt(numPartitions)
452453
items.map { t =>
453454
// Note that the hash code of the key will just be the key itself. The HashPartitioner
454455
// will mod it with the number of total partitions.

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -347,16 +347,18 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
347347
val partitions = repartitioned.glom().collect()
348348
// assert all elements are present
349349
assert(repartitioned.collect().sortWith(_ > _).toSeq === input.toSeq.sortWith(_ > _).toSeq)
350-
// assert no bucket is overloaded
350+
// assert no bucket is overloaded or empty
351351
for (partition <- partitions) {
352352
val avg = input.size / finalPartitions
353353
val maxPossible = avg + initialPartitions
354-
assert(partition.length <= maxPossible)
354+
assert(partition.length <= maxPossible)
355+
assert(!partition.isEmpty)
355356
}
356357
}
357358

358359
testSplitPartitions(Array.fill(100)(1), 10, 20)
359360
testSplitPartitions(Array.fill(10000)(1) ++ Array.fill(10000)(2), 20, 100)
361+
testSplitPartitions(Array.fill(1000)(1), 250, 128)
360362
}
361363

362364
test("coalesced RDDs") {

0 commit comments

Comments
 (0)