Skip to content

Commit be8bfe5

Browse files
author
jinxing
committed
[SPARK-19263] DAGScheduler should avoid sending conflicting task set.
1 parent b79cc7c commit be8bfe5

File tree

2 files changed

+77
-3
lines changed

2 files changed

+77
-3
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1193,7 +1193,14 @@ class DAGScheduler(
11931193
}
11941194

11951195
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
1196-
markStageAsFinished(shuffleStage)
1196+
val noActiveTaskSetManager =
1197+
taskScheduler.rootPool == null ||
1198+
!taskScheduler.rootPool.getSortedTaskSetQueue.exists {
1199+
tsm => tsm.stageId == stageId && !tsm.isZombie
1200+
}
1201+
if (shuffleStage.isAvailable || noActiveTaskSetManager) {
1202+
markStageAsFinished(shuffleStage)
1203+
}
11971204
logInfo("looking for newly runnable stages")
11981205
logInfo("running: " + runningStages)
11991206
logInfo("waiting: " + waitingStages)
@@ -1218,7 +1225,9 @@ class DAGScheduler(
12181225
logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
12191226
") because some of its tasks had failed: " +
12201227
shuffleStage.findMissingPartitions().mkString(", "))
1221-
submitStage(shuffleStage)
1228+
if (noActiveTaskSetManager) {
1229+
submitStage(shuffleStage)
1230+
}
12221231
} else {
12231232
// Mark any map-stage jobs waiting on this stage as finished
12241233
if (shuffleStage.mapStageJobs.nonEmpty) {

core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import org.apache.spark.rdd.RDD
3838
import org.apache.spark.util.{CallSite, ThreadUtils, Utils}
3939

4040
/**
41-
* Tests for the entire scheduler code -- DAGScheduler, TaskSchedulerImpl, TaskSets,
41+
* Tests for the entire scheduler code -- DAGScheduler, TaskSchedulerImpl, TaskSets,
4242
* TaskSetManagers.
4343
*
4444
* Test cases are configured by providing a set of jobs to submit, and then simulating interaction
@@ -648,4 +648,69 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
648648
}
649649
assertDataStructuresEmpty(noFailure = false)
650650
}
651+
652+
testScheduler("[SPARK-19263] DAGScheduler shouldn't resubmit active taskSet.") {
653+
val a = new MockRDD(sc, 2, Nil)
654+
val b = shuffle(2, a)
655+
val shuffleId = b.shuffleDeps.head.shuffleId
656+
657+
def runBackend(): Unit = {
658+
val (taskDescription, task) = backend.beginTask()
659+
task.stageId match {
660+
// ShuffleMapTask
661+
case 0 =>
662+
val stageAttempt = task.stageAttemptId
663+
val partitionId = task.partitionId
664+
(stageAttempt, partitionId) match {
665+
case (0, 0) =>
666+
val fetchFailed = FetchFailed(
667+
DAGSchedulerSuite.makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored")
668+
backend.taskFailed(taskDescription, fetchFailed)
669+
case (0, 1) =>
670+
// Wait until stage resubmission caused by FetchFailed is finished.
671+
waitUntilConditionBecomeTrue(taskScheduler.runningTaskSets.size==2, 5000,
672+
"Wait until stage is resubmitted caused by fetch failed")
673+
674+
// Task(stageAttempt=0, partition=1) will be bogus, because both
675+
// tasks(stageAttempt=0, partition=0, 1) run on hostA.
676+
// Pending partitions are (0, 1) after stage resubmission,
677+
// then shrink to just partition 0 after this bogus task succeeds.
678+
backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostA", 2))
679+
case (1, 1) =>
680+
// Wait long enough until Success of task(stageAttempt=1 and partition=0)
681+
// is handled by DAGScheduler.
682+
Thread.sleep(5000)
683+
// Task(stageAttempt=1 and partition=0) will cause stage resubmission,
684+
// because shuffleStage.pendingPartitions.isEmpty is true,
685+
// but shuffleStage.isAvailable is false.
686+
backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostB", 2))
687+
case _ =>
688+
backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostB", 2))
689+
}
690+
// ResultTask
691+
case 1 => backend.taskSuccess(taskDescription, 10)
692+
}
693+
}
694+
695+
withBackend(runBackend _) {
696+
val jobFuture = submit(b, (0 until 2).toArray)
697+
val duration = Duration(15, SECONDS)
698+
awaitJobTermination(jobFuture, duration)
699+
}
700+
assert(results === (0 until 2).map { _ -> 10}.toMap)
701+
}
702+
703+
def waitUntilConditionBecomeTrue(condition: => Boolean, timeout: Long, msg: String): Unit = {
704+
val finishTime = System.currentTimeMillis() + timeout
705+
while (System.currentTimeMillis() < finishTime) {
706+
if (condition) {
707+
return
708+
}
709+
// Sleep rather than using wait/notify, because this is used only for testing and wait/notify
710+
// add overhead in the general case.
711+
Thread.sleep(10)
712+
}
713+
throw new TimeoutException(
714+
s"Condition '$msg' failed to become true before $timeout milliseconds elapsed")
715+
}
651716
}

0 commit comments

Comments
 (0)