core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
@@ -22,7 +22,7 @@ import org.apache.spark.annotation.DeveloperApi
@DeveloperApi
object TaskLocality extends Enumeration {
// Process local is expected to be used ONLY within TaskSetManager for now.
val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

type TaskLocality = Value

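Editor's note (not part of the diff): TaskLocality is an Enumeration, so the declaration order above is also the ranking the scheduler compares against, and isAllowed is assumed to be a simple ordering check (a condition is allowed if it is no more relaxed than the constraint). Placing NO_PREF between NODE_LOCAL and RACK_LOCAL therefore means no-preference tasks become schedulable before the scheduler falls back to rack-local or arbitrary placement. A minimal, self-contained sketch of that comparison:

    object LocalityOrderingSketch {
      // Mirrors the enum above; isAllowed is assumed to compare by declaration order.
      object TaskLocality extends Enumeration {
        val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
        def isAllowed(constraint: Value, condition: Value): Boolean = condition <= constraint
      }

      def main(args: Array[String]): Unit = {
        import TaskLocality._
        println(isAllowed(RACK_LOCAL, NO_PREF))  // true: a rack-level offer may take no-pref tasks
        println(isAllowed(NODE_LOCAL, NO_PREF))  // false: a node-local offer may not
      }
    }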
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -89,11 +89,11 @@ private[spark] class TaskSchedulerImpl(

// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
private val executorsByHost = new HashMap[String, HashSet[String]]
protected val executorsByHost = new HashMap[String, HashSet[String]]

protected val hostsByRack = new HashMap[String, HashSet[String]]

private val executorIdToHost = new HashMap[String, String]
protected val executorIdToHost = new HashMap[String, String]

// Listener object to pass upcalls into
var dagScheduler: DAGScheduler = null
@@ -249,6 +249,7 @@ private[spark] class TaskSchedulerImpl(

// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
do {
@@ -265,7 +266,7 @@
activeExecutorIds += execId
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert (availableCpus(i) >= 0)
assert(availableCpus(i) >= 0)
launchedTask = true
}
}
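Editor's note (not part of the diff): the loop above offers every TaskSet its resources once per locality level, in increasing order of relaxation, so with this change a NO_PREF pass now happens after NODE_LOCAL but before RACK_LOCAL. A toy, self-contained illustration of that sweep; the task names and their best achievable localities below are made up for the example:

    import scala.collection.mutable

    object OfferSweepSketch {
      // Locality levels in the order noted in the comment above.
      val levels = Vector("PROCESS_LOCAL", "NODE_LOCAL", "NO_PREF", "RACK_LOCAL", "ANY")

      def main(args: Array[String]): Unit = {
        // Hypothetical pending tasks -> the most local level each can achieve on this offer.
        val pending = mutable.LinkedHashMap("t1" -> "NODE_LOCAL", "t2" -> "NO_PREF", "t3" -> "RACK_LOCAL")
        for (maxLocality <- levels) {
          val launched = pending.collect {
            case (task, best) if levels.indexOf(best) <= levels.indexOf(maxLocality) => task
          }.toList
          launched.foreach(pending.remove)
          if (launched.nonEmpty) println(s"offered at $maxLocality -> launched $launched")
        }
      }
    }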
109 changes: 60 additions & 49 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -79,6 +79,7 @@ private[spark] class TaskSetManager(
private val numFailures = new Array[Int](numTasks)
// key is taskId, value is a Map of executor id to when it failed
private val failedExecutors = new HashMap[Int, HashMap[String, Long]]()

val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
var tasksSuccessful = 0

@@ -179,26 +180,17 @@
}
}

var hadAliveLocations = false
for (loc <- tasks(index).preferredLocations) {
for (execId <- loc.executorId) {
addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
}
if (sched.hasExecutorsAliveOnHost(loc.host)) {
hadAliveLocations = true
}
addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
for (rack <- sched.getRackForHost(loc.host)) {
addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
if(sched.hasHostAliveOnRack(rack)){
hadAliveLocations = true
}
}
}

if (!hadAliveLocations) {
// Even though the task might've had preferred locations, all of those hosts or executors
// are dead; put it in the no-prefs list so we can schedule it elsewhere right away.
if (tasks(index).preferredLocations == Nil) {
addTo(pendingTasksWithNoPrefs)
}
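Editor's note (not part of the diff): the net effect of this hunk is that a task lands in pendingTasksWithNoPrefs only when it genuinely has no preferred locations; a task whose preferred executors or hosts are currently dead stays in the executor/host/rack lists and is reached through the NODE_LOCAL, RACK_LOCAL, or ANY levels once delay scheduling relaxes that far. A tiny sketch of the new classification rule, using a stand-in type rather than Spark's TaskLocation:

    object NoPrefClassificationSketch {
      // Stand-in for a task with preferred locations (Spark uses Seq[TaskLocation]).
      final case class PendingTask(id: Int, preferredHosts: Seq[String])

      // New rule: only an empty preference list makes a task a true NO_PREF task,
      // regardless of whether its preferred hosts happen to be alive right now.
      def isNoPref(task: PendingTask): Boolean = task.preferredHosts.isEmpty

      def main(args: Array[String]): Unit = {
        println(isNoPref(PendingTask(1, Nil)))                 // true
        println(isNoPref(PendingTask(2, Seq("dead-host-42")))) // false, even if the host is down
      }
    }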

@@ -239,7 +231,6 @@
*/
private def findTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = {
var indexOffset = list.size

while (indexOffset > 0) {
indexOffset -= 1
val index = list(indexOffset)
@@ -288,12 +279,12 @@
!hasAttemptOnHost(index, host) && !executorIsBlacklisted(execId, index)

if (!speculatableTasks.isEmpty) {
// Check for process-local or preference-less tasks; note that tasks can be process-local
// Check for process-local tasks; note that tasks can be process-local
// on multiple nodes when we replicate cached blocks, as in Spark Streaming
for (index <- speculatableTasks if canRunOnHost(index)) {
val prefs = tasks(index).preferredLocations
val executors = prefs.flatMap(_.executorId)
if (prefs.size == 0 || executors.contains(execId)) {
if (executors.contains(execId)) {
Contributor:
In the comment above this, remove "preference-less tasks" since we've moved them below

speculatableTasks -= index
return Some((index, TaskLocality.PROCESS_LOCAL))
}
@@ -310,6 +301,17 @@
}
}

// Check for no-preference tasks
if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) {
Contributor:
Add a comment above this line to say // Check for no-pref tasks

for (index <- speculatableTasks if canRunOnHost(index)) {
val locations = tasks(index).preferredLocations
if (locations.size == 0) {
speculatableTasks -= index
return Some((index, TaskLocality.PROCESS_LOCAL))
}
}
}

// Check for rack-local tasks
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
for (rack <- sched.getRackForHost(host)) {
@@ -341,20 +343,27 @@
*
* @return An option containing (task index within the task set, locality, is speculative?)
*/
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
private def findTask(execId: String, host: String, maxLocality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value, Boolean)] =
{
for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}

if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL, false))
}
}

if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
// Look for noPref tasks after NODE_LOCAL, to minimize cross-rack traffic
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}
}

if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
for {
rack <- sched.getRackForHost(host)
index <- findTaskFromList(execId, getPendingTasksForRack(rack))
@@ -363,25 +372,27 @@
}
}

// Look for no-pref tasks after rack-local tasks since they can run anywhere.
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}

if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
for (index <- findTaskFromList(execId, allPendingTasks)) {
return Some((index, TaskLocality.ANY, false))
}
}

// Finally, if all else has failed, find a speculative task
findSpeculativeTask(execId, host, locality).map { case (taskIndex, allowedLocality) =>
(taskIndex, allowedLocality, true)
}
// Finally, find a speculative task if all other tasks have been scheduled
findSpeculativeTask(execId, host, maxLocality).map {
case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
}

/**
* Respond to an offer of a single executor from the scheduler by finding a task
*
* NOTE: this function is either called with a maxLocality that has been adjusted by the
* delay scheduling algorithm, or with the special NO_PREF locality, which is never modified
*
* @param execId the executor Id of the offered resource
* @param host the host Id of the offered resource
* @param maxLocality the maximum locality we want to schedule the tasks at
*/
def resourceOffer(
execId: String,
@@ -392,9 +403,14 @@
if (!isZombie) {
val curTime = clock.getTime()

var allowedLocality = getAllowedLocalityLevel(curTime)
if (allowedLocality > maxLocality) {
allowedLocality = maxLocality // We're not allowed to search for farther-away tasks
var allowedLocality = maxLocality

if (maxLocality != TaskLocality.NO_PREF) {
allowedLocality = getAllowedLocalityLevel(curTime)
if (allowedLocality > maxLocality) {
// We're not allowed to search for farther-away tasks
allowedLocality = maxLocality
}
}

findTask(execId, host, allowedLocality) match {
@@ -410,8 +426,11 @@
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
// Update our locality level for delay scheduling
currentLocalityIndex = getLocalityIndex(taskLocality)
lastLaunchTime = curTime
// NO_PREF will not affect the variables related to delay scheduling
if (maxLocality != TaskLocality.NO_PREF) {
currentLocalityIndex = getLocalityIndex(taskLocality)
lastLaunchTime = curTime
}
// Serialize and return the task
val startTime = clock.getTime()
// We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
@@ -639,8 +658,7 @@
override def executorLost(execId: String, host: String) {
logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)

// Re-enqueue pending tasks for this host based on the status of the cluster -- for example, a
// task that used to have locations on only this host might now go to the no-prefs list. Note
// Re-enqueue pending tasks for this host based on the status of the cluster. Note
// that it's okay if we add a task to the same queue twice (if it had multiple preferred
// locations), because findTaskFromList will skip already-running tasks.
for (index <- getPendingTasksForExecutor(execId)) {
@@ -671,6 +689,9 @@
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure)
}
// Recalculate valid locality levels and waits when an executor is lost
myLocalityLevels = computeValidLocalityLevels()
localityWaits = myLocalityLevels.map(getLocalityWait)
}

/**
@@ -722,17 +743,17 @@
conf.get("spark.locality.wait.node", defaultWait).toLong
case TaskLocality.RACK_LOCAL =>
conf.get("spark.locality.wait.rack", defaultWait).toLong
case TaskLocality.ANY =>
0L
case _ => 0L
}
}
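Editor's note (not part of the diff): getLocalityWait reads one base wait plus per-level overrides, and a wait of 0 is what computeValidLocalityLevels below uses to treat a level as disabled. A usage sketch with illustrative values in milliseconds (the app name, master, and job are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    object LocalityWaitConfigSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("locality-wait-sketch")
          .setMaster("local[2]")
          .set("spark.locality.wait", "3000")       // base wait (ms), the default for each level
          .set("spark.locality.wait.node", "0")     // don't wait at all for node-local slots
          .set("spark.locality.wait.rack", "1000")  // wait up to 1s for rack-local slots
        val sc = new SparkContext(conf)
        try {
          sc.parallelize(1 to 10).count()  // any job; the waits only matter on a real cluster
        } finally {
          sc.stop()
        }
      }
    }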

/**
* Compute the locality levels used in this TaskSet. Assumes that all tasks have already been
* added to queues using addPendingTask.
*
*/
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
val levels = new ArrayBuffer[TaskLocality.TaskLocality]
if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
@@ -742,6 +763,9 @@
pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
levels += NODE_LOCAL
}
if (!pendingTasksWithNoPrefs.isEmpty) {
levels += NO_PREF
Contributor:
Why was this stuff deleted?

Contributor Author:
Because in the current code, pendingTasksWithNoPrefs only contains true "NO_PREF" tasks, rather than also holding tasks whose preferred locations are not currently available; see the changes here: https://github.com/apache/spark/pull/1313/files#diff-bad3987c83bd22d46416d3dd9d208e76L199. So we no longer need to recompute pendingTasksWithNoPrefs every time a new executor appears.

Contributor:
Ah okay, makes sense.

}
if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
levels += RACK_LOCAL
@@ -751,20 +775,7 @@
levels.toArray
}

// Re-compute pendingTasksWithNoPrefs since new preferred locations may become available
def executorAdded() {
def newLocAvail(index: Int): Boolean = {
for (loc <- tasks(index).preferredLocations) {
if (sched.hasExecutorsAliveOnHost(loc.host) ||
(sched.getRackForHost(loc.host).isDefined &&
sched.hasHostAliveOnRack(sched.getRackForHost(loc.host).get))) {
return true
}
}
false
}
logInfo("Re-computing pending task lists.")
pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.filter(!newLocAvail(_))
myLocalityLevels = computeValidLocalityLevels()
localityWaits = myLocalityLevels.map(getLocalityWait)
}
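Editor's note (not part of the diff): the reason resourceOffer skips getAllowedLocalityLevel for a NO_PREF offer, and does not update currentLocalityIndex or lastLaunchTime for it, is that delay scheduling only tracks how long the TaskSet has been waiting to launch at a preferred locality; launching a task that never had a preference should not reset that clock. A toy, self-contained model of the delay-scheduling idea (names and numbers are illustrative, not Spark's exact implementation; NO_PREF is left out because such offers bypass this mechanism):

    object DelaySchedulingSketch {
      // Stay at the most local level until its wait expires, then relax one level.
      val levels = Vector("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
      val waits  = Vector(3000L, 3000L, 3000L, 0L)  // per-level waits, e.g. spark.locality.wait.*

      var currentLevelIndex = 0
      var lastLaunchTime = 0L

      def allowedLevel(now: Long): String = {
        while (currentLevelIndex < levels.size - 1 && now - lastLaunchTime >= waits(currentLevelIndex)) {
          lastLaunchTime += waits(currentLevelIndex)  // credit the time already spent waiting here
          currentLevelIndex += 1                      // fall back to the next, less local level
        }
        levels(currentLevelIndex)
      }

      def main(args: Array[String]): Unit = {
        println(allowedLevel(now = 1000L))  // PROCESS_LOCAL: still within the first wait
        println(allowedLevel(now = 4000L))  // NODE_LOCAL: the process-local wait has expired
      }
    }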