Skip to content

Commit 3bb5ea3

Browse files
committed
Simplify RDD union
Deduplicate logic to determine if `PartitionerAwareUnionRDD` should be used instead of `UnionRDD`.
1 parent 9f5149f commit 3bb5ea3

File tree

3 files changed

+5
-20
lines changed

3 files changed

+5
-20
lines changed

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -568,11 +568,7 @@ abstract class RDD[T: ClassTag](
568568
* times (use `.distinct()` to eliminate them).
569569
*/
570570
def union(other: RDD[T]): RDD[T] = withScope {
571-
if (partitioner.isDefined && other.partitioner == partitioner) {
572-
new PartitionerAwareUnionRDD(sc, Array(this, other))
573-
} else {
574-
new UnionRDD(sc, Array(this, other))
575-
}
571+
sc.union(this, other)
576572
}
577573

578574
/**

streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
2121
import scala.reflect.ClassTag
2222

2323
import org.apache.spark.SparkException
24-
import org.apache.spark.rdd.{RDD, UnionRDD, PartitionerAwareUnionRDD}
24+
import org.apache.spark.rdd.RDD
2525
import org.apache.spark.streaming.{Duration, Time}
2626

2727
private[streaming]
@@ -45,11 +45,7 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
4545
s" time $validTime")
4646
}
4747
if (rdds.nonEmpty) {
48-
if(rdds.forall(_.partitioner.isDefined) && rdds.flatMap(_.partitioner).toSet.size == 1) {
49-
Some(new PartitionerAwareUnionRDD(ssc.sc, rdds))
50-
} else {
51-
Some(new UnionRDD(ssc.sc, rdds))
52-
}
48+
Some(ssc.sc.union(rdds))
5349
} else {
5450
None
5551
}

streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.dstream
1919

2020
import scala.reflect.ClassTag
2121

22-
import org.apache.spark.rdd.{PartitionerAwareUnionRDD, RDD, UnionRDD}
22+
import org.apache.spark.rdd.RDD
2323
import org.apache.spark.storage.StorageLevel
2424
import org.apache.spark.streaming._
2525
import org.apache.spark.streaming.Duration
@@ -63,13 +63,6 @@ class WindowedDStream[T: ClassTag](
6363
override def compute(validTime: Time): Option[RDD[T]] = {
6464
val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
6565
val rddsInWindow = parent.slice(currentWindow)
66-
val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
67-
logDebug("Using partition aware union for windowing at " + validTime)
68-
new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
69-
} else {
70-
logDebug("Using normal union for windowing at " + validTime)
71-
new UnionRDD(ssc.sc, rddsInWindow)
72-
}
73-
Some(windowRDD)
66+
Some(ssc.sc.union(rddsInWindow))
7467
}
7568
}

0 commit comments

Comments
 (0)