@@ -597,6 +597,14 @@ package object config {
.checkValue(v => v > 0, "The value should be a positive time value.")
.createWithDefaultString("365d")

private[spark] val UNSCHEDULABLE_TASKSET_TIMEOUT =
ConfigBuilder("spark.scheduler.blacklist.unschedulableTaskSetTimeout")
.doc("The timeout in seconds to wait to acquire a new executor and schedule a task " +
"before aborting a TaskSet which is unschedulable because of being completely blacklisted.")
.timeConf(TimeUnit.SECONDS)
.checkValue(v => v >= 0, "The value should be a non negative time value.")
.createWithDefault(120)

private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL =
ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval")
.doc("Time in seconds to wait between a max concurrent tasks check failure and the next " +
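A minimal usage sketch for the new spark.scheduler.blacklist.unschedulableTaskSetTimeout setting defined above, using the standard SparkConf string API; the 300s value here is only an example (the default above is 120 seconds):

import org.apache.spark.SparkConf

// Example only: raise the unschedulable-taskset timeout from the 120s default.
val conf = new SparkConf()
  .set("spark.scheduler.blacklist.unschedulableTaskSetTimeout", "300s")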
@@ -146,21 +146,31 @@ private[scheduler] class BlacklistTracker (
nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry)
}

private def killExecutor(exec: String, msg: String): Unit = {
allocationClient match {
case Some(a) =>
logInfo(msg)
a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
force = true)
case None =>
logInfo(s"Not attempting to kill blacklisted executor id $exec " +
s"since allocation client is not defined.")
}
}

private def killBlacklistedExecutor(exec: String): Unit = {
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
allocationClient match {
case Some(a) =>
logInfo(s"Killing blacklisted executor id $exec " +
s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
force = true)
case None =>
logWarning(s"Not attempting to kill blacklisted executor id $exec " +
s"since allocation client is not defined.")
}
killExecutor(exec,
s"Killing blacklisted executor id $exec since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
}
}

private[scheduler] def killBlacklistedIdleExecutor(exec: String): Unit = {
killExecutor(exec,
s"Killing blacklisted idle executor id $exec because of task unschedulability and trying " +
"to acquire a new executor.")
}

private def killExecutorsOnBlacklistedNode(node: String): Unit = {
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
allocationClient match {
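A condensed, self-contained sketch of the BlacklistTracker refactor above: both kill paths now share one private helper. The AllocClient trait below is a simplified stand-in for Spark's allocation client, not the real API:

trait AllocClient { def killExecutors(ids: Seq[String]): Unit } // stand-in, for illustration only

class TrackerSketch(client: Option[AllocClient], killEnabled: Boolean) {
  // Shared helper: kill through the allocation client if one is defined.
  private def killExecutor(exec: String, msg: String): Unit = client match {
    case Some(a) => println(msg); a.killExecutors(Seq(exec))
    case None    => println(s"Not attempting to kill $exec: no allocation client.")
  }

  // Existing path, gated on the blacklist-kill config.
  def killBlacklistedExecutor(exec: String): Unit =
    if (killEnabled) killExecutor(exec, s"Killing blacklisted executor $exec")

  // New path: kill an idle blacklisted executor so a replacement can be acquired.
  def killBlacklistedIdleExecutor(exec: String): Unit =
    killExecutor(exec, s"Killing blacklisted idle executor $exec to try to acquire a new one")
}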
@@ -35,7 +35,7 @@ import org.apache.spark.rpc.RpcEndpoint
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils}

/**
* Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
@@ -117,6 +117,11 @@ private[spark] class TaskSchedulerImpl(

protected val executorIdToHost = new HashMap[String, String]

private val abortTimer = new Timer(true)
private val clock = new SystemClock
// Exposed for testing
val unschedulableTaskSetToExpiryTime = new HashMap[TaskSetManager, Long]

// Listener object to pass upcalls into
var dagScheduler: DAGScheduler = null

@@ -415,9 +420,54 @@ private[spark] class TaskSchedulerImpl(
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}

if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
case Some(taskIndex) => // Returns the taskIndex which was unschedulable

// If the taskSet is unschedulable we try to find an existing idle blacklisted
// executor. If we cannot find one, we abort immediately. Else we kill the idle

Contributor:
I'm a little worried that the idle condition will be too strict in some scenarios: if there is a large backlog of tasks from another taskset, or if, whatever the error is, the tasks take a while to fail (e.g. you've really got a bad executor, but it's not apparent until after network timeouts or something). That could happen if you're doing a big join and, while preparing the input on the map side, one side just has one straggler left but the other side still has a big backlog of tasks. Or, in a jobserver-style situation, there are always other tasksets coming in.

That said, I don't have any better ideas at the moment, and I still think this is an improvement.

Contributor Author:
By clearing the abort timer as soon as a task is launched, we are relaxing this situation. If there is a large backlog of tasks:

  • If we acquire new executors or launch new tasks, we will defer the check.
  • If we cannot acquire new executors, and we are running long-running tasks such that no new tasks can be launched, and we have fewer executors than max failures, then this will end up being harsh. This can happen, but it seems like a very specific edge case.

Contributor:
I don't think this is true: if there is no idle executor here, you abort the taskset immediately and you're not starting any timer, via the case lower down: case _ => // Abort Immediately.

I think to do what you described, you would instead need to do something different in that case, like start the same abortTimer and also set a flag needToKillIdleExecutor, and then on every call to resourceOffer, check that flag and potentially find an executor to kill. (However, I haven't totally thought that through and am not sure it would really work. Again, I'm not saying this has to be addressed now; just thinking this through.)
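A rough, self-contained sketch of what that suggestion could look like (hypothetical names and structure; this illustrates the reviewer's idea, not what the PR implements):

// Hypothetical sketch of the reviewer's alternative: remember that a kill is
// still owed, and retry it on every subsequent round of resource offers.
class PendingKillSketch {
  @volatile private var needToKillIdleExecutor = false

  // Called from the "no idle executor found" branch instead of aborting immediately.
  def markKillPending(): Unit = { needToKillIdleExecutor = true }

  // Called at the start of each round of resource offers.
  def maybeKillIdle(runningTasks: Map[String, Set[Long]], kill: String => Unit): Unit =
    if (needToKillIdleExecutor) {
      runningTasks.collectFirst { case (execId, tasks) if tasks.isEmpty => execId }.foreach { execId =>
        kill(execId)
        needToKillIdleExecutor = false
      }
    }
}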

// executor and kick off an abortTimer which, if it doesn't schedule a task within
// the timeout, will abort the taskSet if we were unable to schedule any task from the
// taskSet.
// Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
// task basis.
// Note 2: The taskSet can still be aborted when there are more than one idle
// blacklisted executors and dynamic allocation is on. This can happen when a killed
// idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
// pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
// timer to expire and abort the taskSet.
executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {

Contributor:
isExecutorBusy is going to probe the same hashmap again; you could just do executorIdToRunningTaskIds.find(_._2.isEmpty).

Contributor Author:
I was preferring the code to be more readable. As this isn't a frequently run scenario, maybe we could keep it. Thoughts?

Contributor:
Sure. I thought the name executorIdToRunningTaskIds made the other version clear enough, but I don't feel strongly about it.

case Some ((executorId, _)) =>
if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))

val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
logInfo(s"Waiting for $timeout ms for completely "
+ s"blacklisted task to be schedulable again before aborting $taskSet.")
abortTimer.schedule(
createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
}
case _ => // Abort Immediately

Contributor:
Really minor: I think it's a bit clearer if you say case None here (otherwise it takes me a second to figure out what other patterns would fall under this catch-all).

Contributor Author:
Makes sense. Will update it.

logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
s" executors can be found to kill. Aborting $taskSet." )
taskSet.abortSinceCompletelyBlacklisted(taskIndex)
}
case _ => // Do nothing if no tasks completely blacklisted.

Contributor:
You can remove this case if instead, above, you do:

taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>

Contributor Author:
I have seen this style earlier in the code base. Is this the norm (just curious)? I have read about a few scenarios where this would be better. However, personally, every time I read a foreach, it's instinctive to think of the entity it's being invoked on as an iterable rather than an Option, so it feels a bit odd.

Contributor:
It doesn't matter a ton; I think it's just a Scala-ism that takes a while to get used to. My rough guideline is: use pattern matching if you're doing something distinct in both the Some and None cases, or if you can make use of more complex patterns to avoid nesting (e.g. case Some(x) if x.isFoo() =>). If you're only doing something in the Some branch, then generally prefer map, foreach, filter, etc.

My reason for wanting it here is that when I looked at this code, I needed to scroll back to figure out what you were even matching on and to make sure you weren't ignoring something important. When I see the match up above, I assume something is going to happen in both branches. On the other hand, if there were a foreach, I would know right away that you're ignoring None.

Again, this is really minor and I don't actually care that much; just explaining my thinking.
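A tiny, self-contained illustration of that guideline (all names below are made up):

def killIdle(exec: String): Unit = println(s"killing $exec")
val maybeIdle: Option[String] = Some("exec-1")

// Pattern match: both branches do distinct work.
maybeIdle match {
  case Some(exec) => killIdle(exec)
  case None       => println("no idle executor to kill; abort instead")
}

// Option.foreach: None is deliberately a no-op, and the reader sees that immediately.
maybeIdle.foreach(killIdle)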

}
} else {
// We want to defer killing any taskSets as long as we have a non blacklisted executor
// which can be used to schedule a task from any active taskSets. This ensures that the
// job can make progress and if we encounter a flawed taskSet it will eventually either
// fail or abort due to being completely blacklisted.

Contributor:
I think you should say here that you may have a job wait indefinitely, if it has effectively blacklisted the entire cluster but other jobs keep coming in and keeping resources occupied, so the cluster stays busy. So it's not really accurate to say that it will be aborted eventually; we are not actually guaranteeing that (if I understood things correctly).

Since it's folded now, let me reference the prior discussion on this: #22288 (comment)

Want to make sure I understand this part, and why you aren't only clearing the timer for the taskset you just scheduled a task for. If you have multiple tasksets running simultaneously, and one is making progress but the other is totally blacklisted, I guess you do not want to kill anything, because that would mess with the taskset that is working correctly? Instead you'll just let the taskset which is totally blacklisted eventually fail from the timeout? I guess that makes sense, because if one taskset is progressing, it means the failing taskset is probably flawed, not the executors.

If that's right, it would be good to include something along those lines in the comment (personally, I don't find a comment about how it's related to the timer that useful; that's obvious from the code).

dhruve (Contributor):

That is correct. It also covers another scenario that @tgravescs originally pointed out.

Let's say you have multiple taskSets running which are completely blacklisted. If you were able to get an executor, you would just clear the timer for that specific taskSet. Now, due to resource constraints, if you weren't able to obtain another executor within the timeout for the other taskSet, you would abort that taskSet even though you could actually wait for it to be scheduled on the newly obtained executor.

So clearing the timer for all the taskSets ensures that we currently aren't in a completely blacklisted state and should try to run to completion. However, if the taskset itself is flawed, we will eventually fail. This could result in wasted effort, but we don't have a way to determine that yet, so this should be okay.

Contributor Author:
Your understanding is correct. I will update the comment.

Contributor:
@tgravescs, since we've been back and forth on the discussion of the cases here, I just want to make sure you're aware of the possibility of waiting indefinitely here.

Contributor:
Thanks for pointing this out, but if I'm reading the discussion properly, I don't think you will actually wait indefinitely. Eventually you will either abort immediately or fail due to the max number of task failures. Let me know if I'm missing something from the scenario.

Let's say you have taskset1 that is blacklisted on all nodes (say we have 3). Three cases can happen at this point:

  • taskset2 hasn't started, so it tries to kill an executor and starts the timer.
  • taskset2 has started; if it's running on all nodes, then we abort immediately because there are no executors to kill.
  • taskset2 has started but it's not running on all blacklisted nodes; then we will kill an executor.

At this point, let's say we didn't abort, so we killed an executor. Taskset1 will get a chance to run on the new executor and either work or have a task failure. If it has a task failure and gets blacklisted, we go back into the cases above, but the number of task failures gets one step closer to the limit.

So it seems like you would either abort immediately if there aren't any executors to kill, or you would eventually fail with the max number of task attempts.

Contributor:
Here's the scenario I'm worried about:

  1. taskset1 and taskset2 are both running currently. taskset1 has enough failures to get blacklisted everywhere.
  2. There is an idle executor, even though taskset2 is running (e.g. the available executor doesn't meet the locality preferences of taskset2). So the abort timer is started.
  3. The idle executor is killed, and you get a new one.
  4. Just by luck, taskset2 gets a hold of the new idle executor (e.g. the executor is on a node blacklisted by taskset1, or taskset2 just has a higher priority). The abort timer is cleared.
  5. taskset2 finishes, but meanwhile taskset3 has been launched and it uses the idle executor; and so on for taskSetN. So you keep launching tasks and the abort timer keeps getting cleared, but nothing ever gets scheduled from taskset1.

Admittedly, this would not be the normal scenario: you'd need more tasksets to keep coming, and tight enough resource constraints that taskset1 never gets a hold of anything, even the new executor.

Contributor:
OK, yeah, it seems like it would have to be very timing-dependent for taskset1 to never get a chance at that executor; really, that would just be a normal indefinite-postponement problem in the scheduler, regardless of blacklisting. I don't think it's a problem with FIFO, since the first taskset should always be first. With the fair scheduler perhaps it could be, but that probably depends on a much more specific scenario.

I guess I'm OK with this if you are.

Contributor:
It's a bit worse than regular starvation from having competing tasksets, since in this case you might actually have resources available on your cluster but never ask for them, because the executor allocation manager thinks you have enough based on the number of pending tasks.

In any case, I agree this is a stretch, and overall it's an improvement, so I'm OK with it.

if (unschedulableTaskSetToExpiryTime.nonEmpty) {
logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +
"recently scheduled.")
unschedulableTaskSetToExpiryTime.clear()
}
}

if (launchedAnyTask && taskSet.isBarrier) {
// Check whether the barrier tasks are partially launched.
// TODO SPARK-24818 handle the assert failure case (that can happen when some locality
@@ -453,6 +503,23 @@ private[spark] class TaskSchedulerImpl(
return tasks
}

private def createUnschedulableTaskSetAbortTimer(
taskSet: TaskSetManager,
taskIndex: Int): TimerTask = {
new TimerTask() {
override def run() {
if (unschedulableTaskSetToExpiryTime.contains(taskSet) &&
unschedulableTaskSetToExpiryTime(taskSet) <= clock.getTimeMillis()) {
logInfo("Cannot schedule any task because of complete blacklisting. " +
s"Wait time for scheduling expired. Aborting $taskSet.")
taskSet.abortSinceCompletelyBlacklisted(taskIndex)
} else {
this.cancel()
}
}
}
}

/**
* Shuffle offers around to avoid always placing tasks on the same workers. Exposed to allow
* overriding in tests, so it can be deterministic.
@@ -590,6 +657,7 @@ private[spark] class TaskSchedulerImpl(
barrierCoordinator.stop()
}
starvationTimer.cancel()
abortTimer.cancel()
}

override def defaultParallelism(): Int = backend.defaultParallelism()
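To make the abortTimer lifecycle in the TaskSchedulerImpl changes above easier to follow, here is a stand-alone sketch of the same guard pattern with the Spark types stripped out (hypothetical names): the scheduled task only aborts if its expiry entry is still present and has passed; if a task was scheduled in the meantime, the map entry is gone and the timer fires harmlessly.

import java.util.{Timer, TimerTask}
import scala.collection.mutable

object AbortTimerSketch {
  private val expiry = mutable.HashMap[String, Long]()
  private val timer = new Timer(true) // daemon timer, like abortTimer above

  def arm(key: String, timeoutMs: Long)(abort: () => Unit): Unit = {
    expiry(key) = System.currentTimeMillis() + timeoutMs
    timer.schedule(new TimerTask {
      override def run(): Unit =
        if (expiry.get(key).exists(_ <= System.currentTimeMillis())) abort()
        else cancel() // entry was cleared: something got scheduled, so do nothing
    }, timeoutMs)
  }

  // Clearing the map (as resourceOffers does for all unschedulable taskSets) disarms pending aborts.
  def disarmAll(): Unit = expiry.clear()
}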
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala (23 additions, 18 deletions)
@@ -623,8 +623,8 @@ private[spark] class TaskSetManager(
*
* It is possible that this taskset has become impossible to schedule *anywhere* due to the
* blacklist. The most common scenario would be if there are fewer executors than
* spark.task.maxFailures. We need to detect this so we can fail the task set, otherwise the job
* will hang.
* spark.task.maxFailures. We need to detect this so we can prevent the job from hanging.
* We try to acquire new executor(s) by killing an existing idle blacklisted executor.
*
* There's a tradeoff here: we could make sure all tasks in the task set are schedulable, but that
* would add extra time to each iteration of the scheduling loop. Here, we take the approach of
@@ -635,9 +635,9 @@ private[spark] class TaskSetManager(
* failures (this is because the method picks one unscheduled task, and then iterates through each
* executor until it finds one that the task isn't blacklisted on).
*/
private[scheduler] def abortIfCompletelyBlacklisted(
hostToExecutors: HashMap[String, HashSet[String]]): Unit = {
taskSetBlacklistHelperOpt.foreach { taskSetBlacklist =>
private[scheduler] def getCompletelyBlacklistedTaskIfAny(
hostToExecutors: HashMap[String, HashSet[String]]): Option[Int] = {
taskSetBlacklistHelperOpt.flatMap { taskSetBlacklist =>
val appBlacklist = blacklistTracker.get
// Only look for unschedulable tasks when at least one executor has registered. Otherwise,
// task sets will be (unnecessarily) aborted in cases when no executors have registered yet.
@@ -658,11 +658,11 @@ private[spark] class TaskSetManager(
}
}

pendingTask.foreach { indexInTaskSet =>
pendingTask.find { indexInTaskSet =>
// try to find some executor this task can run on. Its possible that some *other*
// task isn't schedulable anywhere, but we will discover that in some later call,
// when that unschedulable task is the last task remaining.
val blacklistedEverywhere = hostToExecutors.forall { case (host, execsOnHost) =>
hostToExecutors.forall { case (host, execsOnHost) =>
// Check if the task can run on the node
val nodeBlacklisted =
appBlacklist.isNodeBlacklisted(host) ||
@@ -679,22 +679,27 @@ private[spark] class TaskSetManager(
}
}
}
if (blacklistedEverywhere) {
val partition = tasks(indexInTaskSet).partitionId
abort(s"""
|Aborting $taskSet because task $indexInTaskSet (partition $partition)
|cannot run anywhere due to node and executor blacklist.
|Most recent failure:
|${taskSetBlacklist.getLatestFailureReason}
|
|Blacklisting behavior can be configured via spark.blacklist.*.
|""".stripMargin)
}
}
} else {
None
}
}
}

private[scheduler] def abortSinceCompletelyBlacklisted(indexInTaskSet: Int): Unit = {
taskSetBlacklistHelperOpt.foreach { taskSetBlacklist =>
val partition = tasks(indexInTaskSet).partitionId
abort(s"""
|Aborting $taskSet because task $indexInTaskSet (partition $partition)
|cannot run anywhere due to node and executor blacklist.
|Most recent failure:
|${taskSetBlacklist.getLatestFailureReason}
|
|Blacklisting behavior can be configured via spark.blacklist.*.
|""".stripMargin)
}
}

/**
* Marks the task as getting result and notifies the DAG Scheduler
*/
@@ -96,15 +96,16 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
assertDataStructuresEmpty(noFailure = true)
}

// Make sure that if we've failed on all executors, but haven't hit task.maxFailures yet, the job
// doesn't hang
// Make sure that if we've failed on all executors, but haven't hit task.maxFailures yet, we try
// to acquire a new executor and if we aren't able to get one, the job doesn't hang and we abort
testScheduler(
"SPARK-15865 Progress with fewer executors than maxTaskFailures",
extraConfs = Seq(
config.BLACKLIST_ENABLED.key -> "true",
"spark.testing.nHosts" -> "2",
"spark.testing.nExecutorsPerHost" -> "1",
"spark.testing.nCoresPerExecutor" -> "1"
"spark.testing.nCoresPerExecutor" -> "1",
"spark.scheduler.blacklist.unschedulableTaskSetTimeout" -> "0s"
)
) {
def runBackend(): Unit = {
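For comparison, a hypothetical variant of the config block above (not part of the PR) that would exercise the wait-then-abort path instead of aborting immediately; the literal "spark.blacklist.enabled" key is assumed to be equivalent to config.BLACKLIST_ENABLED.key:

// Hypothetical test configuration, illustrative only: give the scheduler 10s
// to acquire a replacement executor before the taskset is aborted.
val extraConfs = Seq(
  "spark.blacklist.enabled" -> "true",
  "spark.testing.nHosts" -> "2",
  "spark.testing.nExecutorsPerHost" -> "1",
  "spark.testing.nCoresPerExecutor" -> "1",
  "spark.scheduler.blacklist.unschedulableTaskSetTimeout" -> "10s"
)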