SPARK-11063: TaskSetManager should use Receiver RDD's preferredLocations (changes from all commits, closed pull request)
ReceiverSchedulingPolicy.scala

```diff
@@ -21,6 +21,7 @@ import scala.collection.Map
 import scala.collection.mutable
 
 import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils
 
 /**
  * A class that tries to schedule receivers with evenly distributed. There are two phases for
@@ -79,7 +80,7 @@ private[streaming] class ReceiverSchedulingPolicy {
       return receivers.map(_.streamId -> Seq.empty).toMap
     }
 
-    val hostToExecutors = executors.groupBy(_.split(":")(0))
+    val hostToExecutors = executors.groupBy(executor => Utils.parseHostPort(executor)._1)
     val scheduledExecutors = Array.fill(receivers.length)(new mutable.ArrayBuffer[String])
     val numReceiversOnExecutor = mutable.HashMap[String, Int]()
     // Set the initial value to 0
```
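For context on this hunk: scheduled executors are identified by `host:port` strings, and the grouping must key on the host alone. Below is a minimal, self-contained sketch of that grouping; the `parseHostPort` helper is a simplified stand-in for `org.apache.spark.util.Utils.parseHostPort`, not Spark's actual implementation, and the executor strings are made up.

```scala
// Simplified stand-in for Utils.parseHostPort (assumption: the real Spark
// utility handles more edge cases); illustrates grouping "host:port"
// executor strings by their host component.
object HostPortGrouping {
  def parseHostPort(hostPort: String): (String, Int) = {
    // Split on the last ':' so the port still parses if the host contains ':'.
    val idx = hostPort.lastIndexOf(':')
    require(idx >= 0, s"Expected host:port, got $hostPort")
    (hostPort.substring(0, idx), hostPort.substring(idx + 1).toInt)
  }

  def main(args: Array[String]): Unit = {
    val executors = Seq("host1:34512", "host1:34513", "host2:34512") // hypothetical values
    // Grouping by host, as the patched ReceiverSchedulingPolicy does:
    val hostToExecutors = executors.groupBy(e => parseHostPort(e)._1)
    println(hostToExecutors)
    // Map(host1 -> List(host1:34512, host1:34513), host2 -> List(host2:34512))
  }
}
```

The old `_.split(":")(0)` form keys on everything before the first colon, which happens to work for plain hostnames but duplicates the parsing that `Utils.parseHostPort` already centralizes.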
ReceiverTracker.scala

```diff
@@ -551,7 +551,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         if (scheduledExecutors.isEmpty) {
           ssc.sc.makeRDD(Seq(receiver), 1)
         } else {
-          ssc.sc.makeRDD(Seq(receiver -> scheduledExecutors))
+          val preferredLocations =
+            scheduledExecutors.map(hostPort => Utils.parseHostPort(hostPort)._1).distinct
+          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
         }
       receiverRDD.setName(s"Receiver $receiverId")
       ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
```
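This hunk is the core fix. `SparkContext.makeRDD`'s location-aware overload takes `Seq[(T, Seq[String])]`, where each string is read as a hostname when the scheduler computes locality. The old code passed raw `host:port` executor strings, which match no node, so the receiver task fell back to `TaskLocality.ANY`. A hedged sketch of the difference follows; the hostnames are made up, and the inline port-stripping is a stand-in for `Utils.parseHostPort`.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: preferred locations must be plain hostnames. "host1:34512"-style
// strings never match a real node, so locality silently degrades to ANY.
object PreferredLocationsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    val scheduledExecutors = Seq("host1:34512", "host2:34513") // hypothetical executors
    // Before the patch: host:port strings as "locations" (matches nothing).
    val bad = sc.makeRDD(Seq("receiver-0" -> scheduledExecutors))
    // After the patch: strip ports and de-duplicate, leaving real hostnames.
    val hosts = scheduledExecutors.map(hp => hp.substring(0, hp.lastIndexOf(':'))).distinct
    val good = sc.makeRDD(Seq("receiver-0" -> hosts))
    println(bad.preferredLocations(bad.partitions(0)))   // List(host1:34512, host2:34513)
    println(good.preferredLocations(good.partitions(0))) // List(host1, host2)
    sc.stop()
  }
}
```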
ReceiverTrackerSuite.scala

```diff
@@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart, TaskLocality}
+import org.apache.spark.scheduler.TaskLocality.TaskLocality
 import org.apache.spark.storage.{StorageLevel, StreamBlockId}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
@@ -80,6 +82,28 @@ class ReceiverTrackerSuite extends TestSuiteBase {
       }
     }
   }
+
+  test("SPARK-11063: TaskSetManager should use Receiver RDD's preferredLocations") {
+    // Use ManualClock to prevent from starting batches so that we can make sure the only task is
+    // for starting the Receiver
+    val _conf = conf.clone.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
+    withStreamingContext(new StreamingContext(_conf, Milliseconds(100))) { ssc =>
+      @volatile var receiverTaskLocality: TaskLocality = null
+      ssc.sparkContext.addSparkListener(new SparkListener {
+        override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+          receiverTaskLocality = taskStart.taskInfo.taskLocality
+        }
+      })
+      val input = ssc.receiverStream(new TestReceiver)
+      val output = new TestOutputStream(input)
+      output.register()
+      ssc.start()
+      eventually(timeout(10 seconds), interval(10 millis)) {
+        // If preferredLocations is set correctly, receiverTaskLocality should be NODE_LOCAL
+        assert(receiverTaskLocality === TaskLocality.NODE_LOCAL)
+      }
+    }
+  }
 }
 
 /** An input DStream with for testing rate controlling */
```

Review thread on the `assert(receiverTaskLocality === TaskLocality.NODE_LOCAL)` line:

Contributor: What will it be if it is not set correctly? That is, without the fix above, what would the locality be?

Member (Author): It's `TaskLocality.ANY` without this patch.