Skip to content

Commit 5a3d846

Browse files
committed
SPARK-7103: Fix crash with SparkContext.union when at least one RDD has no partitioner
1 parent 4c722d7 commit 5a3d846

3 files changed

Lines changed: 23 additions & 1 deletion

File tree

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1055,7 +1055,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
1055 1055
/** Build the union of a list of RDDs. */
1056 1056
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
1057 1057
val partitioners = rdds.flatMap(_.partitioner).toSet
1058-
if (partitioners.size == 1) {
1058+
if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
1059 1059
new PartitionerAwareUnionRDD(this, rdds)
1060 1060
} else {
1061 1061
new UnionRDD(this, rdds)

core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ class PartitionerAwareUnionRDD[T: ClassTag](
60 60
var rdds: Seq[RDD[T]]
61 61
) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
62 62
require(rdds.length > 0)
63+
require(rdds.forall(_.partitioner.isDefined))
63 64
require(rdds.flatMap(_.partitioner).toSet.size == 1,
64 65
"Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))
65 66

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,27 @@ class RDDSuite extends FunSuite with SharedSparkContext {
99 99
assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
100 100
}
101 101

102+
test("SparkContext.union creates UnionRDD if at least one RDD has no partitioner") {
103+
val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
104+
val rddWithNoPartitioner = sc.parallelize(Seq(2->true))
105+
val unionRdd = sc.union(rddWithNoPartitioner, rddWithPartitioner)
106+
assert(unionRdd.isInstanceOf[UnionRDD[_]])
107+
}
108+
109+
test("SparkContext.union creates PartitionAwareUnionRDD if all RDDs have partitioners") {
110+
val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
111+
val unionRdd = sc.union(rddWithPartitioner, rddWithPartitioner)
112+
assert(unionRdd.isInstanceOf[PartitionerAwareUnionRDD[_]])
113+
}
114+
115+
test("PartitionAwareUnionRDD raises exception if at least one RDD has no partitioner") {
116+
val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
117+
val rddWithNoPartitioner = sc.parallelize(Seq(2->true))
118+
intercept[IllegalArgumentException] {
119+
new PartitionerAwareUnionRDD(sc, Seq(rddWithNoPartitioner, rddWithPartitioner))
120+
}
121+
}
122+
102 123
test("partitioner aware union") {
103 124
def makeRDDWithPartitioner(seq: Seq[Int]): RDD[Int] = {
104 125
sc.makeRDD(seq, 1)

0 commit comments

Comments
 (0)