Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ private[spark] class TaskSetManager(
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
// Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
for (index <- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
return Some((index, TaskLocality.NO_PREF, false))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@CodingCat could you take a look at this change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, will do tomorrow morning

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a significant bug in the core. If so probably go into a separate patch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And how is this related to the main bug we are considering in this PR?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not related. I will move it in a separate PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm....it's a typo but would not take any effect

see discussions here: #3816 (only look at the comments after Dec 30, 2014 is enough)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't notice this PR. Thanks.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.collection.Map
import scala.collection.mutable

import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.util.Utils

/**
* A class that tries to schedule receivers with evenly distributed. There are two phases for
Expand Down Expand Up @@ -79,7 +80,7 @@ private[streaming] class ReceiverSchedulingPolicy {
return receivers.map(_.streamId -> Seq.empty).toMap
}

val hostToExecutors = executors.groupBy(_.split(":")(0))
val hostToExecutors = executors.groupBy(executor => Utils.parseHostPort(executor)._1)
val scheduledExecutors = Array.fill(receivers.length)(new mutable.ArrayBuffer[String])
val numReceiversOnExecutor = mutable.HashMap[String, Int]()
// Set the initial value to 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
if (scheduledExecutors.isEmpty) {
ssc.sc.makeRDD(Seq(receiver), 1)
} else {
ssc.sc.makeRDD(Seq(receiver -> scheduledExecutors))
val preferredLocations =
scheduledExecutors.map(hostPort => Utils.parseHostPort(hostPort)._1).distinct
ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
}
receiverRDD.setName(s"Receiver $receiverId")
ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart, TaskLocality}
import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.ReceiverInputDStream
Expand Down Expand Up @@ -80,6 +82,28 @@ class ReceiverTrackerSuite extends TestSuiteBase {
}
}
}

test("SPARK-11063: TaskSetManager should use Receiver RDD's preferredLocations") {
// Use ManualClock to prevent from starting batches so that we can make sure the only task is
// for starting the Receiver
val _conf = conf.clone.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
withStreamingContext(new StreamingContext(_conf, Milliseconds(100))) { ssc =>
@volatile var receiverTaskLocality: TaskLocality = null
ssc.sparkContext.addSparkListener(new SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
receiverTaskLocality = taskStart.taskInfo.taskLocality
}
})
val input = ssc.receiverStream(new TestReceiver)
val output = new TestOutputStream(input)
output.register()
ssc.start()
eventually(timeout(10 seconds), interval(10 millis)) {
// If preferredLocations is set correctly, receiverTaskLocality should be NODE_LOCAL
assert(receiverTaskLocality === TaskLocality.NODE_LOCAL)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will it be if it is not set correctly? That is, without the fix above, what would locality be

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's TaskLocality.ANY without this patch.

}
}
}
}

/** An input DStream with for testing rate controlling */
Expand Down