Commit 7e2c870
[SPARK-31485][CORE] Avoid application hang if only partial barrier tasks launched
Use `dagScheduler.taskSetFailed` to abort a barrier stage instead of throwing an exception within `resourceOffers`.

Any non-fatal exception thrown within the Spark RPC framework can be swallowed: https://github.com/apache/spark/blob/100fc58da54e026cda87832a10e2d06eaeccdf87/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala#L202-L211

The method `TaskSchedulerImpl.resourceOffers` also runs within the scope of the Spark RPC framework, so throwing an exception inside `resourceOffers` won't fail the application. As a result, if a barrier stage fails the check at `require(addressesWithDescs.size == taskSet.numTasks, ...)`, it will fail the check again and again until all tasks from the `TaskSetManager` are dequeued. But since the barrier stage is never actually executed, the application hangs.

The issue can be reproduced by the following test:

```scala
initLocalClusterSparkContext(2)
val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
val dep = new OneToOneDependency[Int](rdd0)
val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq("executor_h_0"), Seq("executor_h_0")))
rdd.barrier().mapPartitions { iter =>
  BarrierTaskContext.get().barrier()
  iter
}.collect()
```

Previously the application would hang; after this fix it fails fast.

Added a regression test.

Closes apache#28257 from Ngone51/fix_barrier_abort.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
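To see why the thrown exception disappears, here is a minimal standalone sketch of the catch-and-log pattern described above; it is an assumed shape for illustration, not Spark's actual `Inbox` code (`safelyCall` here is a hypothetical stand-in for the RPC message loop):

```scala
import scala.util.control.NonFatal

object SwallowedExceptionDemo {
  // Stand-in for the RPC message loop: it guards handler code with a
  // NonFatal catch that only logs, so the failure never propagates.
  def safelyCall(action: => Unit): Unit = {
    try {
      action
    } catch {
      case NonFatal(e) =>
        println(s"swallowed non-fatal exception: ${e.getMessage}")
    }
  }

  def main(args: Array[String]): Unit = {
    safelyCall {
      // Stands in for resourceOffers failing its `require` check.
      throw new IllegalStateException("only partial barrier tasks launched")
    }
    // Control reaches here: the application keeps running, which is why the
    // barrier stage could retry the failed check forever instead of aborting.
    println("still running")
  }
}
```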
1 parent a602ab6 commit 7e2c870

3 files changed: 43 additions & 13 deletions


core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -63,7 +63,7 @@ private[spark] class ShuffleMapStage(
   /**
    * Returns the list of active jobs,
    * i.e. map-stage jobs that were submitted to execute this stage independently (if any).
-   */
+   */
   def mapStageJobs: Seq[ActiveJob] = _mapStageJobs

   /** Adds the job to the active job list. */
```

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 24 additions & 10 deletions
```diff
@@ -57,6 +57,11 @@ import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils}
  *  * Periodic revival of all offers from the CoarseGrainedSchedulerBackend, to accommodate delay
  *    scheduling
  *  * task-result-getter threads
+ *
+ * CAUTION: Any non fatal exception thrown within Spark RPC framework can be swallowed.
+ * Thus, throwing exception in methods like resourceOffers, statusUpdate won't fail
+ * the application, but could lead to undefined behavior. Instead, we shall use method like
+ * TaskSetManger.abort() to abort a stage and then fail the application (SPARK-31485).
  */
 private[spark] class TaskSchedulerImpl(
     val sc: SparkContext,
@@ -356,9 +361,7 @@ private[spark] class TaskSchedulerImpl(
         // addresses are the same as that we allocated in taskSet.resourceOffer() since it's
         // synchronized. We don't remove the exact addresses allocated because the current
         // approach produces the identical result with less time complexity.
-        availableResources(i).getOrElse(rName,
-          throw new SparkException(s"Try to acquire resource $rName that doesn't exist."))
-          .remove(0, rInfo.addresses.size)
+        availableResources(i)(rName).remove(0, rInfo.addresses.size)
       }
       // Only update hosts for a barrier task.
       if (taskSet.isBarrier) {
@@ -516,11 +519,18 @@ private[spark] class TaskSchedulerImpl(
       // Check whether the barrier tasks are partially launched.
       // TODO SPARK-24818 handle the assert failure case (that can happen when some locality
       // requirements are not fulfilled, and we should revert the launched tasks).
-      require(addressesWithDescs.size == taskSet.numTasks,
-        s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
-          s"because only ${addressesWithDescs.size} out of a total number of " +
-          s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " +
-          "been blacklisted or cannot fulfill task locality requirements.")
+      if (addressesWithDescs.size != taskSet.numTasks) {
+        val errorMsg =
+          s"Fail resource offers for barrier stage ${taskSet.stageId} because only " +
+            s"${addressesWithDescs.size} out of a total number of ${taskSet.numTasks}" +
+            s" tasks got resource offers. This happens because barrier execution currently " +
+            s"does not work gracefully with delay scheduling. We highly recommend you to " +
+            s"disable delay scheduling by setting spark.locality.wait=0 as a workaround if " +
+            s"you see this error frequently."
+        logWarning(errorMsg)
+        taskSet.abort(errorMsg)
+        throw new SparkException(errorMsg)
+      }

       // materialize the barrier coordinator.
       maybeInitBarrierCoordinator()
@@ -582,8 +592,12 @@ private[spark] class TaskSchedulerImpl(
           if (state == TaskState.LOST) {
             // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
             // where each executor corresponds to a single task, so mark the executor as failed.
-            val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
-              "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
+            val execId = taskIdToExecutorId.getOrElse(tid, {
+              val errorMsg =
+                "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"
+              taskSet.abort(errorMsg)
+              throw new SparkException(errorMsg)
+            })
             if (executorIdToRunningTaskIds.contains(execId)) {
               reason = Some(
                 SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
```

core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala

Lines changed: 18 additions & 2 deletions
```diff
@@ -26,11 +26,11 @@ import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY

 class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {

-  def initLocalClusterSparkContext(): Unit = {
+  def initLocalClusterSparkContext(numWorker: Int = 4): Unit = {
     val conf = new SparkConf()
       // Init local cluster here so each barrier task runs in a separated process, thus `barrier()`
       // call is actually useful.
-      .setMaster("local-cluster[4, 1, 1024]")
+      .setMaster(s"local-cluster[$numWorker, 1, 1024]")
       .setAppName("test-cluster")
       .set(TEST_NO_STAGE_RETRY, true)
     sc = new SparkContext(conf)
@@ -276,4 +276,20 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
     initLocalClusterSparkContext()
     testBarrierTaskKilled(interruptOnKill = true)
   }
+
+  test("SPARK-31485: barrier stage should fail if only partial tasks are launched") {
+    initLocalClusterSparkContext(2)
+    val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
+    val dep = new OneToOneDependency[Int](rdd0)
+    // set up a barrier stage with 2 tasks and both tasks prefer executor 0 (only 1 core) for
+    // scheduling. So, one of tasks won't be scheduled in one round of resource offer.
+    val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq("executor_h_0"), Seq("executor_h_0")))
+    val errorMsg = intercept[SparkException] {
+      rdd.barrier().mapPartitions { iter =>
+        BarrierTaskContext.get().barrier()
+        iter
+      }.collect()
+    }.getMessage
+    assert(errorMsg.contains("Fail resource offers for barrier stage"))
+  }
 }
```
