[SPARK-19820] [core] Allow reason to be specified for task kill #17166
Changes from 2 commits
SparkContext.scala:

```diff
@@ -2249,6 +2249,25 @@ class SparkContext(config: SparkConf) extends Logging {
     dagScheduler.cancelStage(stageId, None)
   }
 
+  /**
+   * Kill a given task. It will be retried.
+   *
+   * @param taskId the task ID to kill
+   */
+  def killTask(taskId: Long): Unit = {
+    killTask(taskId, "cancelled")
+  }
+
+  /**
+   * Kill a given task. It will be retried.
+   *
+   * @param taskId the task ID to kill
+   * @param reason the reason for killing the task, which should be a short string
+   */
+  def killTask(taskId: Long, reason: String): Unit = {
```
Contributor: hm i don't think we should automatically retry just by providing a reason. Perhaps this

Contributor: same thing for the lower level dag scheduler api.

Contributor (Author): Well, it turns out there's not a good reason to not retry. The task will get retried eventually anyway unless the stage is cancelled. The previous code seems to be just a performance optimization to avoid calling reviveOffers for speculative task completions.

Contributor: ah ok. for some reason i read it as killTask(long) is kill without retry, and killTask(long, string) is with.

Contributor: What about calling it "killAndRescheduleTask" then? Otherwise kill is a little misleading, since where we use it elsewhere (to kill a stage) it implies no retry.

Contributor: I am unclear about the expectations from the API.

Contributor: Given Mridul's points, maybe killTaskAttempt is a better name? IMO specifying "attempt" in the name makes it sound less permanent than killTask (which to me sounds like it won't be retried).

Contributor (Author): The current task attempt (which is uniquely identified by the task ID). I updated the docs as suggested here. Went with killTaskAttempt. For now, you can look at the Spark UI, find the task ID, and call killTaskAttempt on it. It would be nice to have this as a button on the executor page in a follow-up. You can also have a listener that kills tasks as suggested.
```diff
+    dagScheduler.killTask(taskId, reason)
+  }
+
   /**
    * Clean a closure to make it ready to serialized and send to tasks
    * (removes unreferenced variables in $outer's, updates REPL variables)
```
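As a sketch of the listener-driven kill the author mentions above: the snippet below assumes the killTaskAttempt name the thread converged on, with the interruptThread and reason parameters of the eventual public method (at the commit shown here the method is still SparkContext.killTask(taskId, reason)). The StageTaskKiller class and its kill-everything-in-one-stage policy are purely illustrative.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

// Illustrative listener that kills every task attempt of one problem stage.
class StageTaskKiller(sc: SparkContext, badStageId: Int) extends SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    if (taskStart.stageId == badStageId) {
      // Kill this attempt; the scheduler will retry it elsewhere.
      sc.killTaskAttempt(
        taskStart.taskInfo.taskId,
        interruptThread = false,
        reason = "killed by StageTaskKiller")
    }
  }
}
```

Such a listener would be registered with sc.addSparkListener(new StageTaskKiller(sc, stageId)).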
TaskEndReason.scala:

```diff
@@ -61,6 +61,9 @@ sealed trait TaskFailedReason extends TaskEndReason {
    * on was killed.
    */
   def countTowardsTaskFailures: Boolean = true
+
+  /** Whether this task should be retried by the scheduler. */
+  def shouldRetry: Boolean = false
 }
 
 /**
@@ -212,8 +215,8 @@ case object TaskResultLost extends TaskFailedReason {
  * Task was killed intentionally and needs to be rescheduled.
  */
 @DeveloperApi
-case object TaskKilled extends TaskFailedReason {
-  override def toErrorString: String = "TaskKilled (killed intentionally)"
+case class TaskKilled(reason: String, override val shouldRetry: Boolean) extends TaskFailedReason {
+  override def toErrorString: String = s"TaskKilled ($reason)"
```
Contributor: Since this was part of DeveloperApi, what is the impact of making this change? If it does introduce backward incompatible changes, is there a way to mitigate this?

Contributor (Author): This is unfortunately not backwards compatible. I've looked into this, and the issue seems to be that we are converting a case object into a case class. If

Contributor: That is unfortunate, but looks like it can't be helped if we need this feature. Thx for clarifying.
```diff
   override def countTowardsTaskFailures: Boolean = false
 }
```
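To make the incompatibility concrete, a minimal sketch of the source-level break (the two-field TaskKilled shape matches this commit; describe is a hypothetical caller):

```scala
import org.apache.spark.{TaskEndReason, TaskKilled}

// Code written against Spark <= 2.1 matched the singleton:
//   case TaskKilled => ...
// After this patch TaskKilled is a case class, so that pattern no longer
// compiles; callers must destructure or type-match instead:
def describe(reason: TaskEndReason): String = reason match {
  case TaskKilled(why, retry) => s"task killed: $why (will retry: $retry)"
  case other => other.toString
}
```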
Executor.scala:

```diff
@@ -158,7 +158,8 @@ private[spark] class Executor(
     threadPool.execute(tr)
   }
 
-  def killTask(taskId: Long, interruptThread: Boolean): Unit = {
+  def killTask(
+      taskId: Long, interruptThread: Boolean, reason: String, shouldRetry: Boolean): Unit = {
     val taskRunner = runningTasks.get(taskId)
     if (taskRunner != null) {
       if (taskReaperEnabled) {
@@ -168,7 +169,9 @@
             case Some(existingReaper) => interruptThread && !existingReaper.interruptThread
           }
           if (shouldCreateReaper) {
-            val taskReaper = new TaskReaper(taskRunner, interruptThread = interruptThread)
+            val taskReaper = new TaskReaper(
+              taskRunner, interruptThread = interruptThread, reason = reason,
+              shouldRetry = shouldRetry)
             taskReaperForTask(taskId) = taskReaper
             Some(taskReaper)
           } else {
@@ -178,7 +181,8 @@
         // Execute the TaskReaper from outside of the synchronized block.
         maybeNewTaskReaper.foreach(taskReaperPool.execute)
       } else {
-        taskRunner.kill(interruptThread = interruptThread)
+        taskRunner.kill(
+          interruptThread = interruptThread, reason = reason, shouldRetry = shouldRetry)
       }
     }
   }
@@ -189,8 +193,9 @@
    * tasks instead of taking the JVM down.
    * @param interruptThread whether to interrupt the task thread
    */
-  def killAllTasks(interruptThread: Boolean) : Unit = {
-    runningTasks.keys().asScala.foreach(t => killTask(t, interruptThread = interruptThread))
+  def killAllTasks(interruptThread: Boolean, reason: String) : Unit = {
+    runningTasks.keys().asScala.foreach(t =>
+      killTask(t, interruptThread = interruptThread, reason = reason, shouldRetry = false))
   }
 
   def stop(): Unit = {
@@ -220,6 +225,12 @@
     /** Whether this task has been killed. */
     @volatile private var killed = false
 
+    /** The reason this task was killed. */
+    @volatile private var killReason: String = null
+
+    /** Whether to retry this killed task. */
+    @volatile private var retryIfKilled: Boolean = false
+
     @volatile private var threadId: Long = -1
 
     def getThreadId: Long = threadId
@@ -239,8 +250,10 @@
      */
     @volatile var task: Task[Any] = _
 
-    def kill(interruptThread: Boolean): Unit = {
-      logInfo(s"Executor is trying to kill $taskName (TID $taskId)")
+    def kill(interruptThread: Boolean, reason: String, shouldRetry: Boolean): Unit = {
+      logInfo(s"Executor is trying to kill $taskName (TID $taskId), reason: $reason")
+      retryIfKilled = shouldRetry
+      killReason = reason
       killed = true
       if (task != null) {
         synchronized {
@@ -427,14 +440,17 @@
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
         case _: TaskKilledException =>
-          logInfo(s"Executor killed $taskName (TID $taskId)")
+          logInfo(s"Executor killed $taskName (TID $taskId), reason: $killReason")
           setTaskFinishedAndClearInterruptStatus()
-          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
+          execBackend.statusUpdate(
+            taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason, retryIfKilled)))
 
         case _: InterruptedException if task.killed =>
-          logInfo(s"Executor interrupted and killed $taskName (TID $taskId)")
+          logInfo(
+            s"Executor interrupted and preempted $taskName (TID $taskId), reason: $killReason")
           setTaskFinishedAndClearInterruptStatus()
-          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
+          execBackend.statusUpdate(
+            taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason, retryIfKilled)))
 
         case CausedBy(cDE: CommitDeniedException) =>
           val reason = cDE.toTaskFailedReason
@@ -512,7 +528,9 @@
    */
   private class TaskReaper(
       taskRunner: TaskRunner,
-      val interruptThread: Boolean)
+      val interruptThread: Boolean,
+      val reason: String,
+      val shouldRetry: Boolean)
     extends Runnable {
 
     private[this] val taskId: Long = taskRunner.taskId
@@ -533,7 +551,8 @@
         // Only attempt to kill the task once. If interruptThread = false then a second kill
         // attempt would be a no-op and if interruptThread = true then it may not be safe or
         // effective to interrupt multiple times:
-        taskRunner.kill(interruptThread = interruptThread)
+        taskRunner.kill(
+          interruptThread = interruptThread, reason = reason, shouldRetry = shouldRetry)
         // Monitor the killed task until it exits. The synchronization logic here is complicated
         // because we don't want to synchronize on the taskRunner while possibly taking a thread
         // dump, but we also need to be careful to avoid races between checking whether the task
```
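For context on how a kill surfaces in user code: a kill with interruptThread = true interrupts the task thread, and the TaskRunner catch blocks above translate the resulting TaskKilledException or InterruptedException into a TaskKilled(reason, retry) status update. A hedged sketch of record-level cooperation follows (cooperative and slowStep are illustrative names; TaskContext.isInterrupted and TaskKilledException are existing Spark API):

```scala
import scala.reflect.ClassTag
import org.apache.spark.{TaskContext, TaskKilledException}
import org.apache.spark.rdd.RDD

// Wraps per-record work so a long-running task notices a kill promptly,
// even when interruptThread = false; Spark's own interruptible iterators
// perform the same per-record check.
def cooperative[T: ClassTag](rdd: RDD[T], slowStep: T => T): RDD[T] =
  rdd.mapPartitions { iter =>
    val ctx = TaskContext.get()
    iter.map { x =>
      // Bail out once the executor has marked this attempt as killed.
      if (ctx.isInterrupted()) throw new TaskKilledException
      slowStep(x)
    }
  }
```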
DAGScheduler.scala:

```diff
@@ -731,6 +731,13 @@ class DAGScheduler(
     eventProcessLoop.post(StageCancelled(stageId, reason))
   }
 
+  /**
+   * Kill a given task. It will be retried.
+   */
+  def killTask(taskId: Long, reason: String): Unit = {
```
Contributor: similar to the public api, we should separate retry from reason...
```diff
+    taskScheduler.killTask(taskId, interruptThread = true, reason, shouldRetry = true)
+  }
+
   /**
    * Resubmit any failed stages. Ordinarily called after a small amount of time has passed since
    * the last fetch failure.
@@ -1345,7 +1352,7 @@
       case TaskResultLost =>
         // Do nothing here; the TaskScheduler handles these failures and resubmits the task.
 
-      case _: ExecutorLostFailure | TaskKilled | UnknownReason =>
+      case _: ExecutorLostFailure | _: TaskKilled | UnknownReason =>
         // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
         // will abort the job.
     }
```
SchedulerBackend.scala:

```diff
@@ -30,8 +30,20 @@ private[spark] trait SchedulerBackend {
   def reviveOffers(): Unit
   def defaultParallelism(): Int
 
-  def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
+  /**
+   * Requests that an executor kills a running task.
+   *
+   * @param taskId Id of the task.
+   * @param executorId Id of the executor the task is running on.
+   * @param interruptThread Whether the executor should interrupt the task thread.
+   * @param reason The reason for the task kill.
+   * @param shouldRetry Whether the scheduler should retry the task.
+   */
+  def killTask(
+      taskId: Long, executorId: String, interruptThread: Boolean, reason: String,
+      shouldRetry: Boolean): Unit =
     throw new UnsupportedOperationException
 
   def isReady(): Boolean = true
 
   /**
```
TaskScheduler.scala:

```diff
@@ -54,6 +54,9 @@ private[spark] trait TaskScheduler {
   // Cancel a stage.
   def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
 
+  // Kill a task.
+  def killTask(taskId: Long, interruptThread: Boolean, reason: String, shouldRetry: Boolean): Unit
+
   // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
   def setDAGScheduler(dagScheduler: DAGScheduler): Unit
```
TaskSchedulerImpl.scala:

```diff
@@ -239,14 +239,23 @@ private[spark] class TaskSchedulerImpl private[scheduler](
         // simply abort the stage.
         tsm.runningTasksSet.foreach { tid =>
           val execId = taskIdToExecutorId(tid)
-          backend.killTask(tid, execId, interruptThread)
+          backend.killTask(
+            tid, execId, interruptThread, reason = "stage cancelled", shouldRetry = false)
         }
         tsm.abort("Stage %s cancelled".format(stageId))
         logInfo("Stage %d was cancelled".format(stageId))
       }
     }
   }
 
+  override def killTask(
+      taskId: Long, interruptThread: Boolean, reason: String, shouldRetry: Boolean): Unit = {
+    logInfo(s"Killing task ($reason): $taskId")
+    val execId = taskIdToExecutorId.getOrElse(
+      taskId, throw new IllegalArgumentException("Task not found: " + taskId))
+    backend.killTask(taskId, execId, interruptThread, reason, shouldRetry)
+  }
+
   /**
    * Called to indicate that all task attempts (including speculated tasks) associated with the
    * given TaskSetManager have completed, so state associated with the TaskSetManager should be
@@ -467,7 +476,7 @@
       taskState: TaskState,
       reason: TaskFailedReason): Unit = synchronized {
     taskSetManager.handleFailedTask(tid, taskState, reason)
-    if (!taskSetManager.isZombie && taskState != TaskState.KILLED) {
+    if (!taskSetManager.isZombie && reason.shouldRetry) {
       // Need to revive offers again now that the task set manager state has been updated to
       // reflect failed tasks that need to be re-run.
       backend.reviveOffers()
```
CoarseGrainedClusterMessages.scala:

```diff
@@ -40,7 +40,9 @@ private[spark] object CoarseGrainedClusterMessages {
   // Driver to executors
   case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
 
-  case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
+  case class KillTask(
+      taskId: Long, executor: String, interruptThread: Boolean, reason: String,
+      shouldRetry: Boolean)
     extends CoarseGrainedClusterMessage
 
   case class KillExecutorsOnHost(host: String)
```
On the default "cancelled" reason in SparkContext.killTask:

Contributor: "killed by user via SparkContext.killTask"? These things can be hard to debug when they're unexpected and "cancelled" isn't very helpful.

Contributor: Also, why not make this a default argument below and then have just one method?

Contributor (Author): The only issue here is that the UI is not great at rendering long strings (it tends to cut them off). I'd prefer to keep it something concise for now.
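A sketch of the single-method alternative raised above, under the assumption that only the API shape matters (Scheduler and Context are stand-ins for DAGScheduler and SparkContext, and the default reason string is illustrative):

```scala
// Stand-in for the DAGScheduler dependency.
trait Scheduler {
  def killTask(taskId: Long, reason: String): Unit
}

// One method with a defaulted, self-describing reason instead of two
// overloads whose no-reason variant hardcodes "cancelled".
class Context(scheduler: Scheduler) {
  def killTask(
      taskId: Long,
      reason: String = "killed by user via SparkContext.killTask"): Unit = {
    scheduler.killTask(taskId, reason)
  }
}
```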