268 changes: 80 additions & 188 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -145,24 +145,14 @@ private[spark] class TaskSetManager(
// Duplicates are handled in dequeueTaskFromList, which ensures that a
// task hasn't already started running before launching it.
Contributor:

I think most of this comment should be moved to the class PendingTasksByLocality, as it's shared between the regular and speculative versions. Maybe just a brief comment here: "Store tasks waiting to be scheduled by locality preferences" / "Store speculatable tasks by locality preferences"
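A minimal sketch of what this suggestion could look like at the two declaration sites (the exact wording is an assumption):

// Store tasks waiting to be scheduled by locality preferences
private[scheduler] val pendingTasks = new PendingTasksByLocality()

// Store speculatable tasks by locality preferences
private[scheduler] val pendingSpeculatableTasks = new PendingTasksByLocality()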

Author:

Done


private[scheduler] val pendingTasks = PendingTasksByLocality(
forExecutor = new HashMap[String, ArrayBuffer[Int]],
forHost = new HashMap[String, ArrayBuffer[Int]],
noPrefs = new ArrayBuffer[Int],
forRack = new HashMap[String, ArrayBuffer[Int]],
anyPrefs = new ArrayBuffer[Int])
private[scheduler] val pendingTasks = new PendingTasksByLocality()

// The HashSet here ensures that we do not add duplicate speculative tasks
private[scheduler] val speculatableTasks = new HashSet[Int]
Member:

I see; so now this is only used to guard against submitting duplicate speculatable tasks, right? But it used to be traversed to dequeue tasks across several locality levels, and that is why it slows down the job when it is large.

But from the comment above, I think there was previously an implicit agreement that speculatable tasks should be few in most cases (even for a large task set). That may be why they are not classified by locality preference the way non-speculatable tasks are.

So, just out of curiosity: about how many speculatable tasks spawn out of the stage (e.g. 100000 partitions)? And, assuming the cluster is healthy, why are there so many speculatable tasks? Do you have any specific configs? I'm wondering whether the current speculation mechanism has something wrong if nothing is abnormal in this case.

Author:

@Ngone51 that is a valid question indeed. This change was motivated by some jobs we saw on the cluster in which speculation was kicking in heavily because the cluster, while healthy at those points in time, was very busy. As documented in the findings at https://issues.apache.org/jira/browse/SPARK-26755, the task-result-getter threads were more often than not in BLOCKED state and job progress was slow.
The HashSet speculatableTasks was still holding only a small fraction of the tasks (at most 10 to 15%), but every call to dequeueSpeculativeTask would end up looping through each task 5 different times in the worst case, leading to unnecessary traversal and thread blocking.
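To make the cost concrete, here is a minimal sketch of the two schemes (hypothetical shapes for illustration, not the exact Spark code): the old path re-scans the whole speculatable set once per locality level on every resource offer, while the new path pre-buckets task indices by locality so an offer only touches the matching bucket.

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

// Old scheme: up to five full O(n) scans of the HashSet per resource offer,
// one per locality level (executor, host, noPref, rack, any).
def oldDequeue(speculatable: HashSet[Int], runsOnExecutor: Int => Boolean): Option[Int] =
  speculatable.find(runsOnExecutor) // one of up to five such scans

// New scheme: a constant-time map lookup followed by a traversal of only
// the bucket for this executor.
def newDequeue(forExecutor: HashMap[String, ArrayBuffer[Int]], execId: String): Option[Int] =
  forExecutor.get(execId).flatMap(_.lastOption)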

Contributor:

I think you've given some good evidence; it's easy to see how anything O(numSpeculatableTasks) on every resource offer would lead to really poor performance. But it would be great if you could share the size of the task set and the numSpeculatableTasks (probably easiest if you add a log line in checkSpeculatableTasks here, just for your internal repro: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L1071)

I think this hasn't been handled before simply because speculative execution with large task sets hasn't been looked at that closely before.
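For reference, the suggested repro log line could be as simple as the sketch below, assuming it is placed inside checkSpeculatableTasks where taskSet, numTasks, and speculatableTasks are in scope (names taken from this diff, placement assumed):

// Hypothetical debug line for the internal repro only.
logInfo(s"Stage ${taskSet.stageId}: numTasks=$numTasks, " +
  s"numSpeculatableTasks=${speculatableTasks.size}")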

Author:

So I ran a join query on a 10 TB dataset without this change, and out of 100000 tasks for the ShuffleMapStage, the maximum number of speculatable tasks observed was close to 7900-8000 at one point. That is when we start seeing the bottleneck on the scheduler lock.

Member:

@pgandhi999 Thanks for doing the experiment and posting the details. I think they're really helpful for further digging.

Author:

Thank you @Ngone51 once again for your valuable reviews and suggestions. Really appreciate it!


// Set of pending tasks marked as speculative for various levels of locality: executor, host,
// rack, noPrefs and anyPrefs
private[scheduler] val pendingSpeculatableTasks = PendingTasksByLocality(
forExecutor = new HashMap[String, ArrayBuffer[Int]],
forHost = new HashMap[String, ArrayBuffer[Int]],
noPrefs = new ArrayBuffer[Int],
forRack = new HashMap[String, ArrayBuffer[Int]],
anyPrefs = new ArrayBuffer[Int])
private[scheduler] val pendingSpeculatableTasks = new PendingTasksByLocality()
Member:

If speculationEnabled is false, we could even skip the instance creation here.

Author:

We were not doing this earlier, so not sure whether we should do it now.
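For reference, if one did want to avoid the allocation, a minimal sketch would be to make the field lazy so the buckets are only created the first time a task is marked speculatable (a hypothetical sketch; the PR keeps the eager allocation):

// Hypothetical sketch only; not what the PR does.
private[scheduler] lazy val pendingSpeculatableTasks = new PendingTasksByLocality()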


// Task index, start and finish time for each task attempt (indexed by task ID)
private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]
@@ -238,84 +228,41 @@ private[spark] class TaskSetManager(
index: Int,
resolveRacks: Boolean = true,
speculative: Boolean = false): Unit = {
Contributor:

Following on the comment above, this should be called "speculatable".

Author:

Done

if (speculative) {
for (loc <- tasks(index).preferredLocations) {
loc match {
case e: ExecutorCacheTaskLocation =>
pendingSpeculatableTasks.forExecutor.getOrElseUpdate(
e.executorId, new ArrayBuffer) += index
case _ =>
}
pendingSpeculatableTasks.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
for (rack <- sched.getRackForHost(loc.host)) {
pendingSpeculatableTasks.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index
}
}

if (tasks(index).preferredLocations == Nil) {
pendingSpeculatableTasks.noPrefs += index
}

pendingSpeculatableTasks.anyPrefs += index
} else {

for (loc <- tasks(index).preferredLocations) {
loc match {
case e: ExecutorCacheTaskLocation =>
pendingTasks.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
case e: HDFSCacheTaskLocation =>
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
case Some(set) =>
for (e <- set) {
pendingTasks.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
}
logInfo(s"Pending task $index has a cached location at ${e.host} " +
", where there are executors " + set.mkString(","))
case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
", but there are no executors alive there.")
}
case _ =>
}
pendingTasks.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index

if (resolveRacks) {
sched.getRackForHost(loc.host).foreach { rack =>
pendingTasks.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index
val pendingTaskSetToAddTo = if (speculative) pendingSpeculatableTasks else pendingTasks
// ... mostly the original code from `addPendingTask` here, just adding
// into pendingTaskSetToAddTo
Contributor:

I wouldn't leave this as a comment in the code -- comments should make sense for somebody reading the code without having to look at the history to figure out what they're referring to.

Author:

Done, removed the comment

for (loc <- tasks(index).preferredLocations) {
loc match {
case e: ExecutorCacheTaskLocation =>
pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
case e: HDFSCacheTaskLocation =>
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
case Some(set) =>
for (e <- set) {
pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
}
logInfo(s"Pending task $index has a cached location at ${e.host} " +
", where there are executors " + set.mkString(","))
case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
", but there are no executors alive there.")
}
}
case _ =>
}
pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index

if (tasks(index).preferredLocations == Nil) {
pendingTasks.noPrefs += index
if (resolveRacks) {
sched.getRackForHost(loc.host).foreach { rack =>
pendingTaskSetToAddTo.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index
}
}

pendingTasks.anyPrefs += index
}
}

/**
* Return the pending tasks list for a given executor ID, or an empty list if
* there is no map entry for that host
Member:

I think we can move these three methods and comment into PendingTasksByLocality, then there will be fewer code changes from the original dequeueTask to dequeueTaskHelper.

Author:

Not sure if we really need those three helper methods anymore; they might lead to unnecessary code duplication.

*/
private def getPendingTasksForExecutor(executorId: String): ArrayBuffer[Int] = {
pendingTasks.forExecutor.getOrElse(executorId, ArrayBuffer())
}

/**
* Return the pending tasks list for a given host, or an empty list if
* there is no map entry for that host
*/
private def getPendingTasksForHost(host: String): ArrayBuffer[Int] = {
pendingTasks.forHost.getOrElse(host, ArrayBuffer())
}
if (tasks(index).preferredLocations == Nil) {
pendingTaskSetToAddTo.noPrefs += index
}

/**
* Return the pending rack-local task list for a given rack, or an empty list if
* there is no map entry for that rack
*/
private def getPendingTasksForRack(rack: String): ArrayBuffer[Int] = {
pendingTasks.forRack.getOrElse(rack, ArrayBuffer())
pendingTaskSetToAddTo.anyPrefs += index
}

/**
@@ -329,30 +276,16 @@ private[spark] class TaskSetManager(
host: String,
list: ArrayBuffer[Int],
speculative: Boolean = false): Option[Int] = {
if (speculative) {
if (!list.isEmpty) {
for (index <- list) {
if (!isTaskBlacklistedOnExecOrNode(index, execId, host) &&
!hasAttemptOnHost(index, host)) {
// This should almost always be list.trimEnd(1) to remove tail
list -= index
if (!successful(index)) {
return Some(index)
}
}
}
}
} else {
var indexOffset = list.size
while (indexOffset > 0) {
indexOffset -= 1
val index = list(indexOffset)
if (!isTaskBlacklistedOnExecOrNode(index, execId, host)) {
// This should almost always be list.trimEnd(1) to remove tail
list.remove(indexOffset)
if (copiesRunning(index) == 0 && !successful(index)) {
return Some(index)
}
var indexOffset = list.size
while (indexOffset > 0) {
indexOffset -= 1
val index = list(indexOffset)
if (!isTaskBlacklistedOnExecOrNode(index, execId, host) &&
!(speculative && hasAttemptOnHost(index, host))) {
Contributor:

nit: double indent the continuation of the condition

Author:

Done

// This should almost always be list.trimEnd(1) to remove tail
list.remove(indexOffset)
if ((copiesRunning(index) == 0 || speculative) && !successful(index)) {
return Some(index)
}
}
}
@@ -371,107 +304,64 @@
}
}

/**
* Return a speculative task for a given executor if any are available. The task should not have
* an attempt running on this host, in case the host is slow. In addition, the task should meet
* the given locality constraint.
*/
// Labeled as protected to allow tests to override providing speculative tasks if necessary
protected def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value)] =
{
// Check for process-local tasks; note that tasks can be process-local
// on multiple nodes when we replicate cached blocks, as in Spark Streaming
for (index <- dequeueTaskFromList(
execId, host, pendingSpeculatableTasks.forExecutor.getOrElse(execId, ArrayBuffer()),
speculative = true)) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}

// Check for node-local tasks
if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
for (index <- dequeueTaskFromList(
execId, host, pendingSpeculatableTasks.forHost.getOrElse(host, ArrayBuffer()),
speculative = true)) {
return Some((index, TaskLocality.NODE_LOCAL))
}
}

// Check for no-preference tasks
if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) {
for (index <- dequeueTaskFromList(
execId, host, pendingSpeculatableTasks.noPrefs, speculative = true)) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}
}

// Check for rack-local tasks
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
for {
rack <- sched.getRackForHost(host)
index <- dequeueTaskFromList(
execId, host, pendingSpeculatableTasks.forRack.getOrElse(rack, ArrayBuffer()),
speculative = true)
} {
return Some((index, TaskLocality.RACK_LOCAL))
}
}

// Check for non-local tasks
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
for (index <- dequeueTaskFromList(execId, host, pendingSpeculatableTasks.anyPrefs,
speculative = true)) {
return Some((index, TaskLocality.ANY))
}
}

None
}

/**
* Dequeue a pending task for a given node and return its index and locality level.
* Only search for tasks matching the given locality constraint.
*
* @return An option containing (task index within the task set, locality, is speculative?)
*/
private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value, Boolean)] =
{
for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
: Option[(Int, TaskLocality.Value, Boolean)] = {
Contributor:

super-nit: if you're gonna touch this, I think it should switch to the multiline-method-def style, even if it's just the return type that spills over:
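The inline example in this comment did not survive, but the suggested style presumably looks like the following (a reconstruction, reusing the method body from this diff):

private def dequeueTask(
    execId: String,
    host: String,
    maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
  dequeueTaskHelper(execId, host, maxLocality, false).orElse(
    dequeueTaskHelper(execId, host, maxLocality, true))
}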

Author:

Done

// if we didn't schedule a regular task, try to schedule a speculative one
Contributor:

super-duper nit: if you're going to move this comment up here, before either of the calls to dequeueTaskHelper, then don't use past tense for one of them, e.g. "If we don't schedule a regular task, try to schedule a speculative one".

(or just delete the comment completely)

Author:

Makes sense, have modified the comment.

dequeueTaskHelper(execId, host, maxLocality, false).orElse(
dequeueTaskHelper(execId, host, maxLocality, true))
Member:

Should we do speculatableTasks -= index when this returns Some(index)?

Author:

Good catch, fixed it.
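A sketch of what that fix presumably looks like (exact placement in dequeueTask is an assumption):

// Hypothetical sketch: drop the index from the duplicate-guard set once a
// speculative copy is actually dequeued, so it cannot be resubmitted later.
val speculativeTask = dequeueTaskHelper(execId, host, maxLocality, true)
speculativeTask.foreach { case (index, _, _) => speculatableTasks -= index }
speculativeTask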

}

private def dequeueTaskHelper(
execId: String,
host: String,
maxLocality: TaskLocality.Value,
speculative: Boolean): Option[(Int, TaskLocality.Value, Boolean)] = {
Contributor:

nit: double indent method params

Author:

Done

if (speculative && speculatableTasks.isEmpty) {
return None
}
val pendingTaskSetToUse = if (speculative) pendingSpeculatableTasks else pendingTasks
def dequeue(list: ArrayBuffer[Int]): Option[Int] = {
dequeueTaskFromList(execId, host, list, speculative)
}

dequeue(pendingTaskSetToUse.forExecutor.getOrElse(execId, ArrayBuffer())).foreach { index =>
return Some((index, TaskLocality.PROCESS_LOCAL, speculative))
}

if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL, false))
dequeue(pendingTaskSetToUse.forHost.getOrElse(host, ArrayBuffer())).foreach { index =>
return Some((index, TaskLocality.NODE_LOCAL, speculative))
}
}

// Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
// Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
for (index <- dequeueTaskFromList(execId, host, pendingTasks.noPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
dequeue(pendingTaskSetToUse.noPrefs).foreach { index =>
return Some((index, TaskLocality.NO_PREF, speculative))
Member:

Why this change to NO_PREF? Doesn't this change the original behavior?

Contributor:

Hmm, this is a very good point. I traced this back to https://issues.apache.org/jira/browse/SPARK-2294 / 63bdb1f / #1313 (loooooooots of comments).

Changing this would affect the currentLocalityIndex here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L505-L507

and also will make its way into the UI related changes here:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala#L69

(and here, but this will probably be fine: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala#L608)

I am not sure whether the old behavior is right or not, but in any case I think we should not change that as part of this change, and should consider updating it separately.

Author:

Hmm, I see. I have updated the code patch; let me know if it still looks logically incorrect.

Member:

@squito Thanks for attaching those links. They're useful for figuring out what the right behavior is. But until that's settled, keeping the old behavior is fine.
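For clarity, the restored branch presumably keeps reporting noPref dequeues as PROCESS_LOCAL, matching the convention from SPARK-2294 (a sketch of the corrected code):

if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
  dequeue(pendingTaskSetToUse.noPrefs).foreach { index =>
    // Report PROCESS_LOCAL rather than NO_PREF to preserve the original
    // behavior that currentLocalityIndex and the UI depend on.
    return Some((index, TaskLocality.PROCESS_LOCAL, speculative))
  }
}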

}
}

if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
for {
rack <- sched.getRackForHost(host)
index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
index <- dequeue(pendingTaskSetToUse.forRack.getOrElse(rack, ArrayBuffer()))
} {
return Some((index, TaskLocality.RACK_LOCAL, false))
return Some((index, TaskLocality.RACK_LOCAL, speculative))
}
}

if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
for (index <- dequeueTaskFromList(execId, host, pendingTasks.anyPrefs)) {
return Some((index, TaskLocality.ANY, false))
dequeue(pendingTaskSetToUse.anyPrefs).foreach { index =>
return Some((index, TaskLocality.ANY, speculative))
}
}

// find a speculative task if all others tasks have been scheduled
dequeueSpeculativeTask(execId, host, maxLocality).map {
case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
None
}

/**
@@ -1084,7 +974,7 @@ private[spark] class TaskSetManager(
val info = taskInfos(tid)
val index = info.index
if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
!speculatableTasks.contains(index)) {
!speculatableTasks.contains(index)) {
addPendingTask(index, speculative = true)
logInfo(
"Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
@@ -1159,9 +1049,11 @@ private[spark] object TaskSetManager {
val TASK_SIZE_TO_WARN_KIB = 1000
}

case class PendingTasksByLocality(
forExecutor: HashMap[String, ArrayBuffer[Int]],
forHost: HashMap[String, ArrayBuffer[Int]],
noPrefs: ArrayBuffer[Int],
forRack: HashMap[String, ArrayBuffer[Int]],
anyPrefs: ArrayBuffer[Int])
private[scheduler] class PendingTasksByLocality {

val forExecutor: HashMap[String, ArrayBuffer[Int]] = new HashMap[String, ArrayBuffer[Int]]
val forHost: HashMap[String, ArrayBuffer[Int]] = new HashMap[String, ArrayBuffer[Int]]
val noPrefs: ArrayBuffer[Int] = new ArrayBuffer[Int]
val forRack: HashMap[String, ArrayBuffer[Int]] = new HashMap[String, ArrayBuffer[Int]]
val anyPrefs: ArrayBuffer[Int] = new ArrayBuffer[Int]
Member:

The declarations seem a little redundant. Maybe we can remove the explicit type declarations?

Member:

It seems a little confusing that we have noPrefs and anyPrefs at the same time. Maybe use the original all instead of anyPrefs?

Author:

Done
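Combining the two suggestions above, the class presumably ends up along these lines (a sketch; the inferred types and the all rename follow the comments, and the doc comments are assumptions):

private[scheduler] class PendingTasksByLocality {
  // Tasks with an executor-local preference, keyed by executor ID.
  val forExecutor = new HashMap[String, ArrayBuffer[Int]]
  // Tasks with a host-local preference, keyed by host.
  val forHost = new HashMap[String, ArrayBuffer[Int]]
  // Tasks with no locality preference.
  val noPrefs = new ArrayBuffer[Int]
  // Tasks with a rack-local preference, keyed by rack.
  val forRack = new HashMap[String, ArrayBuffer[Int]]
  // All pending tasks, regardless of preference (previously anyPrefs).
  val all = new ArrayBuffer[Int]
}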

}
@@ -107,7 +107,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
val taskSet = invoke.getArguments()(0).asInstanceOf[TaskSet]
new TaskSetManager(mockTaskScheduler, taskSet, 4) {
private var hasDequeuedSpeculatedTask = false
override def dequeueSpeculativeTask(execId: String,
def dequeueSpeculativeTask(execId: String,
Contributor:

Sorry I hadn't noticed this before -- this change is wrong, as now this whole setup isn't doing anything. To keep the same behavior as before, I think you need to change dequeueTaskHelper to be protected, and then override it here.

squito@366e916
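Based on that commit, the fix presumably makes dequeueTaskHelper protected and overrides it in the suite; a reconstruction along those lines (the exact body is an assumption):

new TaskSetManager(mockTaskScheduler, taskSet, 4) {
  private var hasDequeuedSpeculatedTask = false
  override protected def dequeueTaskHelper(
      execId: String,
      host: String,
      maxLocality: TaskLocality.Value,
      speculative: Boolean): Option[(Int, TaskLocality.Value, Boolean)] = {
    if (!speculative) {
      super.dequeueTaskHelper(execId, host, maxLocality, speculative)
    } else if (hasDequeuedSpeculatedTask) {
      None
    } else {
      hasDequeuedSpeculatedTask = true
      Some((0, TaskLocality.PROCESS_LOCAL, true))
    }
  }
}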

Author:

Makes sense, have updated the code. Sorry, I missed that one earlier. Thanks @squito.

host: String,
locality: TaskLocality.Value): Option[(Int, TaskLocality.Value)] = {
if (hasDequeuedSpeculatedTask) {
@@ -1168,7 +1168,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging
// Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be
// killed, so the FakeTaskScheduler is only told about the successful completion
// of the speculated task.
assert(sched.endedTasks(3) === Success)
assert(sched.endedTasks(4) === Success)
Member:

Why does this change to 4?

Author:

We changed the previous HashSet to an ArrayBuffer, so the dequeue logic remains the same, but the order of tasks stored in the ArrayBuffer is now different. Since dequeuing picks up the first task that satisfies all conditions, a different task gets dequeued now.
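A tiny illustration of why the asserted index can move, assuming only the iteration order changed:

import scala.collection.mutable.{ArrayBuffer, HashSet}

// A HashSet iterates in hash order, while an ArrayBuffer preserves insertion
// order (and dequeueTaskFromList scans it from the tail). With the same
// eligibility test, the first task that satisfies all conditions can differ.
val speculatableSet = HashSet(3, 4)      // iteration order not guaranteed
val speculatableBuf = ArrayBuffer(3, 4)  // the tail scan sees index 4 first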

Member:

Makes sense.

// also because the scheduler is a mock, our manager isn't notified about the task killed event,
// so we do that manually
manager.handleFailedTask(origTask.taskId, TaskState.KILLED, TaskKilled("test"))
@@ -1706,13 +1706,13 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging
val taskOption6 = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption5.isDefined)
val task5 = taskOption5.get
assert(task5.index === 2)
assert(task5.index === 3)
assert(task5.taskId === 4)
assert(task5.executorId === "exec1")
assert(task5.attemptNumber === 1)
assert(taskOption6.isDefined)
val task6 = taskOption6.get
assert(task6.index === 3)
assert(task6.index === 2)
assert(task6.taskId === 5)
assert(task6.executorId === "exec1")
assert(task6.attemptNumber === 1)