Skip to content

Commit 16c580b

Browse files
cxzl25
authored and venkata91 committed
[SPARK-24677][CORE] Avoid NoSuchElementException from MedianHeap
Ref: LIHADOOP-52383 When speculation is enabled, TaskSetManager#markPartitionCompleted should write the successful task duration to MedianHeap, not just increase tasksSuccessful. Otherwise, when TaskSetManager#checkSpeculatableTasks is called, tasksSuccessful is non-zero but MedianHeap is empty. Then successfulTaskDurations.median throws java.util.NoSuchElementException: MedianHeap is empty. This finally led to stopping the SparkContext. TaskSetManagerSuite.scala unit test: [SPARK-24677] MedianHeap should not be empty when speculation is enabled. Author: sychen <sychen@ctrip.com> Closes apache#21656 from cxzl25/fix_MedianHeap_empty.
1 parent 599d552 commit 16c580b

3 files changed

Lines changed: 68 additions & 15 deletions

File tree

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -764,9 +764,12 @@ private[spark] class TaskSchedulerImpl(
764764
* do not also submit those same tasks. That also means that a task completion from an earlier
765765
* attempt can lead to the entire stage getting marked as successful.
766766
*/
767-
private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
767+
private[scheduler] def markPartitionCompletedInAllTaskSets(
768+
stageId: Int,
769+
partitionId: Int,
770+
taskInfo: TaskInfo) = {
768771
taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
769-
tsm.markPartitionCompleted(partitionId)
772+
tsm.markPartitionCompleted(partitionId, taskInfo)
770773
}
771774
}
772775

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -764,7 +764,7 @@ private[spark] class TaskSetManager(
764764
}
765765
// There may be multiple tasksets for this stage -- we let all of them know that the partition
766766
// was completed. This may result in some of the tasksets getting completed.
767-
sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
767+
sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info)
768768
// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
769769
// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
770770
// "deserialize" the value when holding a lock to avoid blocking other threads. So we call
@@ -775,9 +775,12 @@ private[spark] class TaskSetManager(
775775
maybeFinishTaskSet()
776776
}
777777

778-
private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
778+
private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
779779
partitionToIndex.get(partitionId).foreach { index =>
780780
if (!successful(index)) {
781+
if (speculationEnabled && !isZombie) {
782+
successfulTaskDurations.insert(taskInfo.duration)
783+
}
781784
tasksSuccessful += 1
782785
successful(index) = true
783786
if (tasksSuccessful == numTasks) {

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 58 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,18 @@ package org.apache.spark.scheduler
1919

2020
import java.util.{Properties, Random}
2121

22-
import scala.collection.mutable
23-
import scala.collection.mutable.ArrayBuffer
24-
22+
import org.apache.spark._
23+
import org.apache.spark.internal.{Logging, config}
24+
import org.apache.spark.serializer.SerializerInstance
25+
import org.apache.spark.storage.BlockManagerId
26+
import org.apache.spark.util.{AccumulatorV2, ManualClock}
2527
import org.mockito.Matchers.{any, anyInt, anyString}
26-
import org.mockito.Mockito.{mock, never, spy, times, verify, when}
28+
import org.mockito.Mockito._
2729
import org.mockito.invocation.InvocationOnMock
2830
import org.mockito.stubbing.Answer
2931

30-
import org.apache.spark._
31-
import org.apache.spark.internal.Logging
32-
import org.apache.spark.internal.config
33-
import org.apache.spark.serializer.SerializerInstance
34-
import org.apache.spark.storage.BlockManagerId
35-
import org.apache.spark.util.{AccumulatorV2, ManualClock, Utils}
32+
import scala.collection.mutable
33+
import scala.collection.mutable.ArrayBuffer
3634

3735
class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
3836
extends DAGScheduler(sc) {
@@ -162,7 +160,7 @@ class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0, 0) {
162160
}
163161

164162
class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging {
165-
import TaskLocality.{ANY, PROCESS_LOCAL, NO_PREF, NODE_LOCAL, RACK_LOCAL}
163+
import TaskLocality._
166164

167165
private val conf = new SparkConf
168166

@@ -1362,6 +1360,55 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
13621360
assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
13631361
}
13641362

1363+
test("[SPARK-24677] Avoid NoSuchElementException from MedianHeap") {
1364+
val conf = new SparkConf().set("spark.speculation", "true")
1365+
sc = new SparkContext("local", "test", conf)
1366+
// Set the speculation multiplier to be 0 so speculative tasks are launched immediately
1367+
sc.conf.set("spark.speculation.multiplier", "0.0")
1368+
sc.conf.set("spark.speculation.quantile", "0.1")
1369+
sc.conf.set("spark.speculation", "true")
1370+
1371+
sched = new FakeTaskScheduler(sc)
1372+
sched.initialize(new FakeSchedulerBackend())
1373+
1374+
val dagScheduler = new FakeDAGScheduler(sc, sched)
1375+
sched.setDAGScheduler(dagScheduler)
1376+
1377+
val taskSet1 = FakeTask.createTaskSet(10)
1378+
val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet1.tasks.map { task =>
1379+
task.metrics.internalAccums
1380+
}
1381+
1382+
sched.submitTasks(taskSet1)
1383+
sched.resourceOffers(
1384+
(0 until 10).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) })
1385+
1386+
val taskSetManager1 = sched.taskSetManagerForAttempt(0, 0).get
1387+
1388+
// fail fetch
1389+
taskSetManager1.handleFailedTask(
1390+
taskSetManager1.taskAttempts.head.head.taskId, TaskState.FAILED,
1391+
FetchFailed(null, 0, 0, 0, "fetch failed"))
1392+
1393+
assert(taskSetManager1.isZombie)
1394+
assert(taskSetManager1.runningTasks === 9)
1395+
1396+
val taskSet2 = FakeTask.createTaskSet(10, stageAttemptId = 1)
1397+
sched.submitTasks(taskSet2)
1398+
sched.resourceOffers(
1399+
(11 until 20).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) })
1400+
1401+
// Complete the 2 tasks and leave 8 task in running
1402+
for (id <- Set(0, 1)) {
1403+
taskSetManager1.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
1404+
assert(sched.endedTasks(id) === Success)
1405+
}
1406+
1407+
val taskSetManager2 = sched.taskSetManagerForAttempt(0, 1).get
1408+
assert(!taskSetManager2.successfulTaskDurations.isEmpty())
1409+
taskSetManager2.checkSpeculatableTasks(0)
1410+
}
1411+
13651412
test("SPARK-24755 Executor loss can cause task to not be resubmitted") {
13661413
val conf = new SparkConf().set("spark.speculation", "true")
13671414
sc = new SparkContext("local", "test", conf)

0 commit comments

Comments
 (0)