
Commit bf3c060

jasonmoore2k authored and srowen committed
[SPARK-14915][CORE] Don't re-queue a task if another attempt has already succeeded
Don't re-queue a task if another attempt has already succeeded. This currently happens when a speculative task is denied from committing its result because another copy of the task has already succeeded.

I'm running a job with enough skew in per-task processing time that speculation triggers in the last quarter of tasks (default settings), causing many commit-denied exceptions to be thrown. Previously, these tasks were retried over and over again until the stage eventually completed, wasting compute resources on superfluous attempts. With this change (applied to the 1.6 branch), they are no longer retried and the stage completes successfully without the extra task attempts.

Author: Jason Moore <jasonmoore2k@outlook.com>

Closes apache#12751 from jasonmoore2k/SPARK-14915.

(cherry picked from commit 77361a4)
Signed-off-by: Sean Owen <sowen@cloudera.com>
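The fix boils down to checking whether some attempt of the task has already succeeded before re-queuing a failed attempt. A minimal standalone sketch of that decision follows; the names `successful`, `pending`, and `addPendingTask` mirror fields and methods of `TaskSetManager`, but this is an illustration, not Spark's actual class:

```scala
// Sketch of the re-queue decision added to TaskSetManager.handleFailedTask.
// successful(index) is true once any attempt of task `index` has succeeded,
// e.g. because a speculative copy finished first and this attempt was then
// denied the output commit. Illustrative only; not the real TaskSetManager.
object RequeueSketch {
  // One entry per task index; true once some attempt has succeeded.
  val successful = scala.collection.mutable.ArrayBuffer(false, true)
  // Queue of task indices waiting to be re-scheduled.
  val pending = scala.collection.mutable.Queue.empty[Int]

  def addPendingTask(index: Int): Unit = pending.enqueue(index)

  def handleFailedTask(index: Int): String = {
    if (successful(index)) {
      // Another attempt already succeeded: don't waste resources retrying.
      s"task $index failed, but another attempt succeeded; not re-queuing"
    } else {
      addPendingTask(index)
      s"task $index re-queued"
    }
  }
}
```

With this sketch, a failed attempt of task 1 (whose other attempt succeeded) is dropped, while a failed attempt of task 0 is put back on the pending queue as before.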
1 parent 2db19a3 commit bf3c060

1 file changed

Lines changed: 19 additions & 1 deletion

File tree

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

```diff
@@ -720,7 +720,25 @@ private[spark] class TaskSetManager(
     failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
       put(info.executorId, clock.getTimeMillis())
     sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
-    addPendingTask(index)
+
+    if (successful(index)) {
+      logInfo(
+        s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " +
+        "but another instance of the task has already succeeded, " +
+        "so not re-queuing the task to be re-executed.")
+    } else {
+      addPendingTask(index)
+    }
+
+    if (successful(index)) {
+      logInfo(
+        s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " +
+        "but another instance of the task has already succeeded, " +
+        "so not re-queuing the task to be re-executed.")
+    } else {
+      addPendingTask(index)
+    }
+
     if (!isZombie && state != TaskState.KILLED
       && reason.isInstanceOf[TaskFailedReason]
       && reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) {
```
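For context, the speculation behaviour described in the commit message ("trigger in the last quarter, default settings") is governed by a few Spark configuration properties; a minimal example, with values that I believe are the defaults in the 1.6 line:

```properties
spark.speculation            true   # off by default; must be enabled explicitly
spark.speculation.quantile   0.75   # fraction of tasks that must finish before speculating
spark.speculation.multiplier 1.5    # how much slower than the median a task must be
```

With the default quantile of 0.75, speculative copies are launched only once 75% of the tasks in a stage have finished, which is why the skewed tail of the author's job produced many speculative attempts and hence many commit-denied failures.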

0 commit comments
