244 changes: 82 additions & 162 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -131,7 +131,8 @@ private[spark] class TaskSetManager(
// same time for a barrier stage.
private[scheduler] def isBarrier = taskSet.tasks.nonEmpty && taskSet.tasks(0).isBarrier

// Set of pending tasks for each executor. These collections are actually
// Set of pending tasks for various levels of locality: executor, host, rack,
// noPrefs and anyPrefs. These collections are actually
// treated as stacks, in which new tasks are added to the end of the
// ArrayBuffer and removed from the end. This makes it faster to detect
// tasks that repeatedly fail because whenever a task failed, it is put
@@ -143,25 +144,16 @@ private[spark] class TaskSetManager(
// of failures.
// Duplicates are handled in dequeueTaskFromList, which ensures that a
// task hasn't already started running before launching it.
Contributor: I think most of this comment should be moved to the class PendingTasksByLocality, as it's shared by the regular & speculative versions. Maybe just a brief comment here: "Store tasks waiting to be scheduled by locality preferences" / "Store speculatable tasks by locality preferences".

Author: Done
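A sketch of the brief comments the reviewer suggested, applied to the two fields (the wording is taken from the suggestion above, not from the final patch):

```scala
// Store tasks waiting to be scheduled by locality preferences
private[scheduler] val pendingTasks = new PendingTasksByLocality()

// Store speculatable tasks by locality preferences
private[scheduler] val pendingSpeculatableTasks = new PendingTasksByLocality()
```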

private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]

// Set of pending tasks for each host. Similar to pendingTasksForExecutor,
// but at host level.
Member: Shall we move these comments into PendingTasksByLocality? I think it would always be good to keep the original comments if there is no problem with them.

Author: Done

private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
private[scheduler] val pendingTasks = new PendingTasksByLocality()

// Set of pending tasks for each rack -- similar to the above.
private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]

// Set containing pending tasks with no locality preferences.
private[scheduler] var pendingTasksWithNoPrefs = new ArrayBuffer[Int]

// Set containing all pending tasks (also used as a stack, as above).
private val allPendingTasks = new ArrayBuffer[Int]

// Tasks that can be speculated. Since these will be a small fraction of total
// tasks, we'll just hold them in a HashSet.
Member: What's wrong with this comment? We could just append the new comment after it rather than removing it.

Author: Done

// The HashSet here ensures that we do not add duplicate speculative tasks
private[scheduler] val speculatableTasks = new HashSet[Int]
Member: I see; so now this is only used to guard against submitting duplicate speculatable tasks, right? It used to be dequeued from by traversing it once per locality level, and that is why it slows the job down when it is large.

But from the comment above, I think there was a prior assumption that speculatable tasks would be few in most cases (even for a large task set), and that may be why they were not classified by locality preference the way non-speculatable tasks are.

So, just out of curiosity: roughly how many speculatable tasks spawn out of the stage (e.g. 100000 partitions)? And, assuming the cluster is healthy, why are there so many speculatable tasks? Do you have any specific configs? I'm wondering whether the current speculation mechanism has something wrong if nothing is abnormal in this case.

Author: @Ngone51 that is a valid question indeed. This change was motivated by jobs we saw on the cluster where speculation was kicking in heavily because the cluster, although healthy, was really busy at those points in time. As documented in https://issues.apache.org/jira/browse/SPARK-26755, the task-result-getter threads were more often than not in BLOCKED state and job progress was slow.
The HashSet speculatableTasks was still holding a small fraction of the tasks (at most 10 to 15%), but every call to dequeueSpeculativeTask would end up looping through each task five separate times in the worst case, leading to unnecessary traversal and thread blocking.

Contributor: I think you've given some good evidence; it's easy to see how anything O(numSpeculatableTasks) on every resource offer would lead to really poor performance. But it would be great if you could share the size of the task set & the numSpeculatableTasks (probably easiest if you add a logline in checkSpeculatableTasks here, just for your internal repro: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L1071).

I think this hasn't been handled before just because speculative execution with large task sets hasn't been looked at that closely before.
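A minimal sketch of such a repro logline, assuming it sits inside checkSpeculatableTasks where speculatableTasks, numTasks, and taskSet are in scope (hypothetical, not part of the PR):

```scala
// Hypothetical diagnostic logline for the internal repro only.
logInfo(s"checkSpeculatableTasks: ${speculatableTasks.size} speculatable tasks " +
  s"out of $numTasks total in task set ${taskSet.id}")
```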

Author: So I ran a join query on a 10 TB dataset without this change, and out of 100000 tasks for the ShuffleMapStage, the maximum number of speculatable tasks observed was close to 7900-8000 at one point. That is when we start seeing the bottleneck on the scheduler lock.

Member: @pgandhi999 Thanks for doing the experiment and posting the details. I think they're really helpful for further digging.

Author: Thank you @Ngone51 once again for your valuable reviews and suggestions. Really appreciate it!


// Set of pending tasks marked as speculative for various levels of locality: executor, host,
// rack, noPrefs and anyPrefs
private[scheduler] val pendingSpeculatableTasks = new PendingTasksByLocality()
Member: If speculationEnabled is false, we could even skip the instance creation here.

Author: We were not doing this earlier, so I'm not sure whether we should do it now.
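For reference, a minimal sketch of one way to avoid the allocation when speculation never kicks in (hypothetical; the PR keeps eager creation):

```scala
// Defer allocation until a speculatable task is actually added; if speculation is
// disabled the structure is never touched, so the lazy val is never materialized.
private[scheduler] lazy val pendingSpeculatableTasks = new PendingTasksByLocality()
```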


// Task index, start and finish time for each task attempt (indexed by task ID)
private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]

@@ -197,11 +189,11 @@ private[spark] class TaskSetManager(
}
// Resolve the rack for each host. This can be slow, so de-dupe the list of hosts,
// and assign the rack to all relevant task indices.
val (hosts, indicesForHosts) = pendingTasksForHost.toSeq.unzip
val (hosts, indicesForHosts) = pendingTasks.forHost.toSeq.unzip
val racks = sched.getRacksForHosts(hosts)
racks.zip(indicesForHosts).foreach {
case (Some(rack), indices) =>
pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) ++= indices
pendingTasks.forRack.getOrElseUpdate(rack, new ArrayBuffer) ++= indices
case (None, _) => // no rack, nothing to do
}
}
@@ -234,63 +226,43 @@ private[spark] class TaskSetManager(
/** Add a task to all the pending-task lists that it should be on. */
private[spark] def addPendingTask(
index: Int,
resolveRacks: Boolean = true): Unit = {
resolveRacks: Boolean = true,
speculative: Boolean = false): Unit = {
Contributor: Following on the comment above, this should be called "speculatable".

Author: Done
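A sketch of the suggested rename (signature only; the body is assumed unchanged from the diff below):

```scala
private[spark] def addPendingTask(
    index: Int,
    resolveRacks: Boolean = true,
    speculatable: Boolean = false): Unit = {
  // route into pendingSpeculatableTasks when speculatable is true, else pendingTasks
}
```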

val pendingTaskSetToAddTo = if (speculative) pendingSpeculatableTasks else pendingTasks
// ... mostly the original code from `addPendingTask` here, just adding
// into pendingTaskSetToAddTo
Contributor: I wouldn't leave this as a comment in the code -- comments should make sense to somebody reading the code without having to dig through the history to figure out what they refer to.

Author: Done, removed the comment.

for (loc <- tasks(index).preferredLocations) {
loc match {
case e: ExecutorCacheTaskLocation =>
pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
case e: HDFSCacheTaskLocation =>
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
case Some(set) =>
for (e <- set) {
pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
}
logInfo(s"Pending task $index has a cached location at ${e.host} " +
", where there are executors " + set.mkString(","))
case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
", but there are no executors alive there.")
", but there are no executors alive there.")
}
case _ =>
}
pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index

if (resolveRacks) {
sched.getRackForHost(loc.host).foreach { rack =>
pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
pendingTaskSetToAddTo.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index
}
}
}

if (tasks(index).preferredLocations == Nil) {
pendingTasksWithNoPrefs += index
pendingTaskSetToAddTo.noPrefs += index
}

allPendingTasks += index // No point scanning this whole list to find the old task there
}

/**
* Return the pending tasks list for a given executor ID, or an empty list if
* there is no map entry for that host
Member: I think we can move these three methods and the comment into PendingTasksByLocality; then there will be fewer code changes from the original dequeueTask to dequeueTaskHelper.

Author: Not sure we really need those three helper methods anymore; they might lead to unnecessary code duplication.

*/
private def getPendingTasksForExecutor(executorId: String): ArrayBuffer[Int] = {
pendingTasksForExecutor.getOrElse(executorId, ArrayBuffer())
}

/**
* Return the pending tasks list for a given host, or an empty list if
* there is no map entry for that host
*/
private def getPendingTasksForHost(host: String): ArrayBuffer[Int] = {
pendingTasksForHost.getOrElse(host, ArrayBuffer())
}

/**
* Return the pending rack-local task list for a given rack, or an empty list if
* there is no map entry for that rack
*/
private def getPendingTasksForRack(rack: String): ArrayBuffer[Int] = {
pendingTasksForRack.getOrElse(rack, ArrayBuffer())
pendingTaskSetToAddTo.anyPrefs += index
}

/**
@@ -302,15 +274,17 @@ private[spark] class TaskSetManager(
private def dequeueTaskFromList(
execId: String,
host: String,
list: ArrayBuffer[Int]): Option[Int] = {
list: ArrayBuffer[Int],
speculative: Boolean = false): Option[Int] = {
var indexOffset = list.size
while (indexOffset > 0) {
indexOffset -= 1
val index = list(indexOffset)
if (!isTaskBlacklistedOnExecOrNode(index, execId, host)) {
if (!isTaskBlacklistedOnExecOrNode(index, execId, host) &&
!(speculative && hasAttemptOnHost(index, host))) {
Contributor: nit: double indent the continuation of the condition.

Author: Done
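For illustration, the continuation double-indented as requested (formatting sketch only):

```scala
if (!isTaskBlacklistedOnExecOrNode(index, execId, host) &&
    !(speculative && hasAttemptOnHost(index, host))) {
  // ...
}
```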

// This should almost always be list.trimEnd(1) to remove tail
list.remove(indexOffset)
if (copiesRunning(index) == 0 && !successful(index)) {
if ((copiesRunning(index) == 0 || speculative) && !successful(index)) {
return Some(index)
}
}
@@ -330,128 +304,64 @@
}
}

/**
* Return a speculative task for a given executor if any are available. The task should not have
* an attempt running on this host, in case the host is slow. In addition, the task should meet
* the given locality constraint.
*/
// Labeled as protected to allow tests to override providing speculative tasks if necessary
protected def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value)] =
{
speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set

def canRunOnHost(index: Int): Boolean = {
!hasAttemptOnHost(index, host) &&
!isTaskBlacklistedOnExecOrNode(index, execId, host)
}

if (!speculatableTasks.isEmpty) {
// Check for process-local tasks; note that tasks can be process-local
// on multiple nodes when we replicate cached blocks, as in Spark Streaming
for (index <- speculatableTasks if canRunOnHost(index)) {
val prefs = tasks(index).preferredLocations
val executors = prefs.flatMap(_ match {
case e: ExecutorCacheTaskLocation => Some(e.executorId)
case _ => None
})
if (executors.contains(execId)) {
speculatableTasks -= index
return Some((index, TaskLocality.PROCESS_LOCAL))
}
}

// Check for node-local tasks
if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
for (index <- speculatableTasks if canRunOnHost(index)) {
val locations = tasks(index).preferredLocations.map(_.host)
if (locations.contains(host)) {
speculatableTasks -= index
return Some((index, TaskLocality.NODE_LOCAL))
}
}
}

// Check for no-preference tasks
if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) {
for (index <- speculatableTasks if canRunOnHost(index)) {
val locations = tasks(index).preferredLocations
if (locations.size == 0) {
speculatableTasks -= index
return Some((index, TaskLocality.PROCESS_LOCAL))
}
}
}

// Check for rack-local tasks
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
for (rack <- sched.getRackForHost(host)) {
for (index <- speculatableTasks if canRunOnHost(index)) {
val racks = tasks(index).preferredLocations.map(_.host).flatMap(sched.getRackForHost)
if (racks.contains(rack)) {
speculatableTasks -= index
return Some((index, TaskLocality.RACK_LOCAL))
}
}
}
}

// Check for non-local tasks
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
for (index <- speculatableTasks if canRunOnHost(index)) {
speculatableTasks -= index
return Some((index, TaskLocality.ANY))
}
}
}

None
}

/**
* Dequeue a pending task for a given node and return its index and locality level.
* Only search for tasks matching the given locality constraint.
*
* @return An option containing (task index within the task set, locality, is speculative?)
*/
private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value, Boolean)] =
{
for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
: Option[(Int, TaskLocality.Value, Boolean)] = {
Contributor: super-nit: if you're going to touch this, I think it should switch to the multiline method-def style, even if it's just the return type that spills over.

Author: Done
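A sketch of the multiline method-def style being asked for, assuming the current parameter list and body:

```scala
private def dequeueTask(
    execId: String,
    host: String,
    maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
  // body unchanged; shown only to illustrate the parameter and return-type layout
  dequeueTaskHelper(execId, host, maxLocality, false).orElse(
    dequeueTaskHelper(execId, host, maxLocality, true))
}
```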

// if we didn't schedule a regular task, try to schedule a speculative one
Contributor: super-duper nit: if you're going to move this comment up here, before either of the calls to dequeueTaskHelper, then don't use the past tense for one of them, e.g. "If we don't schedule a regular task, try to schedule a speculative one" (or just delete the comment completely).

Author: Makes sense, have modified the comment.

dequeueTaskHelper(execId, host, maxLocality, false).orElse(
dequeueTaskHelper(execId, host, maxLocality, true))
Member: Do speculatableTasks -= index when this returns Some(index)?

Author: Good catch, fixed it.
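One way the fix could look, dropping the index from speculatableTasks once a speculative copy is actually dequeued (a sketch; the PR's final change may differ):

```scala
dequeueTaskHelper(execId, host, maxLocality, false).orElse(
  dequeueTaskHelper(execId, host, maxLocality, true).map { result =>
    // The speculative copy is being scheduled, so stop tracking it as pending.
    speculatableTasks -= result._1
    result
  })
```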

}

private def dequeueTaskHelper(
execId: String,
host: String,
maxLocality: TaskLocality.Value,
speculative: Boolean): Option[(Int, TaskLocality.Value, Boolean)] = {
Contributor: nit: double indent the method params.

Author: Done

if (speculative && speculatableTasks.isEmpty) {
return None
}
val pendingTaskSetToUse = if (speculative) pendingSpeculatableTasks else pendingTasks
def dequeue(list: ArrayBuffer[Int]): Option[Int] = {
dequeueTaskFromList(execId, host, list, speculative)
}

dequeue(pendingTaskSetToUse.forExecutor.getOrElse(execId, ArrayBuffer())).foreach { index =>
return Some((index, TaskLocality.PROCESS_LOCAL, speculative))
}

if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL, false))
dequeue(pendingTaskSetToUse.forHost.getOrElse(host, ArrayBuffer())).foreach { index =>
return Some((index, TaskLocality.NODE_LOCAL, speculative))
}
}

// Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
// Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
dequeue(pendingTaskSetToUse.noPrefs).foreach { index =>
return Some((index, TaskLocality.NO_PREF, speculative))
Member: Why this change to NO_PREF? Doesn't this change the original behavior?

Contributor: Hmm, this is a very good point. I traced this back to https://issues.apache.org/jira/browse/SPARK-2294 / 63bdb1f / #1313 (lots of comments).

Changing this would affect the currentLocalityIndex here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L505-L507

and would also make its way into the UI-related code here:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala#L69

(and here, but this one will probably be fine: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala#L608)

I am not sure whether the old behavior is right or not, but in any case I think we should not change it as part of this change, and should consider updating it separately.

Author: Hmm, I see; I have updated the code patch, let me know if it still looks logically incorrect.

Member: @squito Thanks for attaching those links. They're useful for figuring out what the right behavior is. But until then, keeping the old behavior is fine.
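To make the retained behavior concrete, a sketch of the no-pref branch with the original locality level preserved (assuming the updated patch reverts to reporting PROCESS_LOCAL for no-pref tasks, as the old code did):

```scala
// Look for noPref tasks after NODE_LOCAL, to minimize cross-rack traffic.
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
  dequeue(pendingTaskSetToUse.noPrefs).foreach { index =>
    // Keep the original behavior: report no-pref tasks as PROCESS_LOCAL so that
    // currentLocalityIndex and the UI locality summary are unaffected.
    return Some((index, TaskLocality.PROCESS_LOCAL, speculative))
  }
}
```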

}
}

if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
for {
rack <- sched.getRackForHost(host)
index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
index <- dequeue(pendingTaskSetToUse.forRack.getOrElse(rack, ArrayBuffer()))
} {
return Some((index, TaskLocality.RACK_LOCAL, false))
return Some((index, TaskLocality.RACK_LOCAL, speculative))
}
}

if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {
return Some((index, TaskLocality.ANY, false))
dequeue(pendingTaskSetToUse.anyPrefs).foreach { index =>
return Some((index, TaskLocality.ANY, speculative))
}
}

// find a speculative task if all others tasks have been scheduled
dequeueSpeculativeTask(execId, host, maxLocality).map {
case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
None
}

/**
@@ -616,10 +526,10 @@ private[spark] class TaskSetManager(

while (currentLocalityIndex < myLocalityLevels.length - 1) {
val moreTasks = myLocalityLevels(currentLocalityIndex) match {
case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasks.forExecutor)
case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasks.forHost)
case TaskLocality.NO_PREF => pendingTasks.noPrefs.nonEmpty
case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasks.forRack)
}
if (!moreTasks) {
// This is a performance optimization: if there are no more tasks that can
@@ -686,13 +596,13 @@ private[spark] class TaskSetManager(
// from each list, we may need to go deeper in the list. We poll from the end because
// failed tasks are put back at the end of allPendingTasks, so we're more likely to find
// an unschedulable task this way.
val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet =>
val indexOffset = pendingTasks.anyPrefs.lastIndexWhere { indexInTaskSet =>
copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet)
}
if (indexOffset == -1) {
None
} else {
Some(allPendingTasks(indexOffset))
Some(pendingTasks.anyPrefs(indexOffset))
}
}

@@ -1064,7 +974,8 @@ private[spark] class TaskSetManager(
val info = taskInfos(tid)
val index = info.index
if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
!speculatableTasks.contains(index)) {
!speculatableTasks.contains(index)) {
addPendingTask(index, speculative = true)
logInfo(
"Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
.format(index, taskSet.id, info.host, threshold))
Contributor: Thinking a bit more about my comment that you should change this logline... if you have a ton of speculative tasks, isn't this horribly verbose? I am wondering if it should be dropped to logDebug. But then I guess we lose all info about speculatable assignments, which would be a bummer. At the very least, since this comment is here, it's probably worth also throwing in the total number of speculatable tasks, e.g. "... ran more than 100 ms (145 speculatable tasks in this taskset now)".

Author: Makes sense, have updated the comment.
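A sketch of the logline with the suggested count appended (hypothetical wording; the "+ 1" assumes the current index has not yet been inserted into speculatableTasks at this point, per the guard above):

```scala
logInfo(
  ("Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms" +
    " (%d speculatable tasks in this taskset now)")
    .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1))
```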

@@ -1100,19 +1011,19 @@ private[spark] class TaskSetManager(
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
val levels = new ArrayBuffer[TaskLocality.TaskLocality]
if (!pendingTasksForExecutor.isEmpty &&
pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
if (!pendingTasks.forExecutor.isEmpty &&
pendingTasks.forExecutor.keySet.exists(sched.isExecutorAlive(_))) {
levels += PROCESS_LOCAL
}
if (!pendingTasksForHost.isEmpty &&
pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
if (!pendingTasks.forHost.isEmpty &&
pendingTasks.forHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
levels += NODE_LOCAL
}
if (!pendingTasksWithNoPrefs.isEmpty) {
if (!pendingTasks.noPrefs.isEmpty) {
levels += NO_PREF
}
if (!pendingTasksForRack.isEmpty &&
pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
if (!pendingTasks.forRack.isEmpty &&
pendingTasks.forRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
levels += RACK_LOCAL
}
levels += ANY
@@ -1137,3 +1048,12 @@ private[spark] object TaskSetManager {
// this.
val TASK_SIZE_TO_WARN_KIB = 1000
}

private[scheduler] class PendingTasksByLocality {

val forExecutor: HashMap[String, ArrayBuffer[Int]] = new HashMap[String, ArrayBuffer[Int]]
val forHost: HashMap[String, ArrayBuffer[Int]] = new HashMap[String, ArrayBuffer[Int]]
val noPrefs: ArrayBuffer[Int] = new ArrayBuffer[Int]
val forRack: HashMap[String, ArrayBuffer[Int]] = new HashMap[String, ArrayBuffer[Int]]
val anyPrefs: ArrayBuffer[Int] = new ArrayBuffer[Int]
Member: The declarations seem a little redundant. Maybe we can remove the explicit type annotations?

Member: It seems a little confusing that we have both noPrefs & anyPrefs. Maybe use the original all instead of anyPrefs?

Author: Done
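Combining both suggestions, the class could look like this (a sketch; final naming in the merged code may differ):

```scala
private[scheduler] class PendingTasksByLocality {
  // Pending task indices keyed by preferred executor, host and rack, plus the flat
  // lists of tasks with no locality preference and of all pending tasks.
  val forExecutor = new HashMap[String, ArrayBuffer[Int]]
  val forHost = new HashMap[String, ArrayBuffer[Int]]
  val noPrefs = new ArrayBuffer[Int]
  val forRack = new HashMap[String, ArrayBuffer[Int]]
  val all = new ArrayBuffer[Int]
}
```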

}