Skip to content

Commit c8bee93

Browse files
cxzl25, tgravescs
authored and committed
[SPARK-24677][CORE] Avoid NoSuchElementException from MedianHeap
## What changes were proposed in this pull request? When speculation is enabled, TaskSetManager#markPartitionCompleted should write the successful task's duration to the MedianHeap, not just increment tasksSuccessful. Otherwise, when TaskSetManager#checkSpeculatableTasks runs, tasksSuccessful is non-zero but the MedianHeap is empty, so successfulTaskDurations.median throws java.util.NoSuchElementException: MedianHeap is empty, which ultimately stops the SparkContext. ## How was this patch tested? Unit test in TaskSetManagerSuite.scala: [SPARK-24677] MedianHeap should not be empty when speculation is enabled Author: sychen <sychen@ctrip.com> Closes #21656 from cxzl25/fix_MedianHeap_empty.
1 parent fc0c8c9 commit c8bee93

3 files changed

Lines changed: 59 additions & 4 deletions

File tree

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -697,9 +697,12 @@ private[spark] class TaskSchedulerImpl(
697697
* do not also submit those same tasks. That also means that a task completion from an earlier
698698
* attempt can lead to the entire stage getting marked as successful.
699699
*/
700-
private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
700+
private[scheduler] def markPartitionCompletedInAllTaskSets(
701+
stageId: Int,
702+
partitionId: Int,
703+
taskInfo: TaskInfo) = {
701704
taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
702-
tsm.markPartitionCompleted(partitionId)
705+
tsm.markPartitionCompleted(partitionId, taskInfo)
703706
}
704707
}
705708

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -758,7 +758,7 @@ private[spark] class TaskSetManager(
758758
}
759759
// There may be multiple tasksets for this stage -- we let all of them know that the partition
760760
// was completed. This may result in some of the tasksets getting completed.
761-
sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
761+
sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info)
762762
// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
763763
// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
764764
// "deserialize" the value when holding a lock to avoid blocking other threads. So we call
@@ -769,9 +769,12 @@ private[spark] class TaskSetManager(
769769
maybeFinishTaskSet()
770770
}
771771

772-
private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
772+
private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
773773
partitionToIndex.get(partitionId).foreach { index =>
774774
if (!successful(index)) {
775+
if (speculationEnabled && !isZombie) {
776+
successfulTaskDurations.insert(taskInfo.duration)
777+
}
775778
tasksSuccessful += 1
776779
successful(index) = true
777780
if (tasksSuccessful == numTasks) {

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1365,6 +1365,55 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
13651365
assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
13661366
}
13671367

1368+
test("[SPARK-24677] Avoid NoSuchElementException from MedianHeap") {
1369+
val conf = new SparkConf().set("spark.speculation", "true")
1370+
sc = new SparkContext("local", "test", conf)
1371+
// Set the speculation multiplier to be 0 so speculative tasks are launched immediately
1372+
sc.conf.set("spark.speculation.multiplier", "0.0")
1373+
sc.conf.set("spark.speculation.quantile", "0.1")
1374+
sc.conf.set("spark.speculation", "true")
1375+
1376+
sched = new FakeTaskScheduler(sc)
1377+
sched.initialize(new FakeSchedulerBackend())
1378+
1379+
val dagScheduler = new FakeDAGScheduler(sc, sched)
1380+
sched.setDAGScheduler(dagScheduler)
1381+
1382+
val taskSet1 = FakeTask.createTaskSet(10)
1383+
val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet1.tasks.map { task =>
1384+
task.metrics.internalAccums
1385+
}
1386+
1387+
sched.submitTasks(taskSet1)
1388+
sched.resourceOffers(
1389+
(0 until 10).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) })
1390+
1391+
val taskSetManager1 = sched.taskSetManagerForAttempt(0, 0).get
1392+
1393+
// fail fetch
1394+
taskSetManager1.handleFailedTask(
1395+
taskSetManager1.taskAttempts.head.head.taskId, TaskState.FAILED,
1396+
FetchFailed(null, 0, 0, 0, "fetch failed"))
1397+
1398+
assert(taskSetManager1.isZombie)
1399+
assert(taskSetManager1.runningTasks === 9)
1400+
1401+
val taskSet2 = FakeTask.createTaskSet(10, stageAttemptId = 1)
1402+
sched.submitTasks(taskSet2)
1403+
sched.resourceOffers(
1404+
(11 until 20).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) })
1405+
1406+
// Complete 2 tasks and leave the other 8 tasks running
1407+
for (id <- Set(0, 1)) {
1408+
taskSetManager1.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
1409+
assert(sched.endedTasks(id) === Success)
1410+
}
1411+
1412+
val taskSetManager2 = sched.taskSetManagerForAttempt(0, 1).get
1413+
assert(!taskSetManager2.successfulTaskDurations.isEmpty())
1414+
taskSetManager2.checkSpeculatableTasks(0)
1415+
}
1416+
13681417
private def createTaskResult(
13691418
id: Int,
13701419
accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {

0 commit comments

Comments
 (0)