Commit 547f157

[SPARK-28699][CORE] Fix a corner case for aborting indeterminate stage
Change how the indeterminate stages are collected: while handling a FetchFailed, we should look at the stages starting from `mapStage`, not from `failedStage`.

In the fetch-failure handling logic, the original code collected the indeterminate stages starting from the stage in which the fetch failure occurred. When the fetch failure happens in the first task of this stage, that logic causes the indeterminate stage to be resubmitted only partially, which can eventually produce a correctness bug.

This change makes the corner case abort the indeterminate stage as expected.

Tested with a new UT in DAGSchedulerSuite. In addition, running the integration test below with `local-cluster[5, 2, 5120]` and `spark.sql.execution.sortBeforeRepartition`=false aborts the indeterminate stage as expected:

```
import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).map{ x => (x._1 + 1, x._2)}.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
```

Closes apache#25498 from xuanyuanking/SPARK-28699-followup.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 0d3a783)
Signed-off-by: Yuanjian Li <xyliyuanjian@gmail.com>
1 parent 75076ff commit 547f157

2 files changed

Lines changed: 7 additions & 24 deletions

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 3 additions & 3 deletions
@@ -1505,13 +1505,13 @@ private[spark] class DAGScheduler(
             // guaranteed to be determinate, so the input data of the reducers will not change
             // even if the map tasks are re-tried.
             if (mapStage.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE) {
-              // It's a little tricky to find all the succeeding stages of `failedStage`, because
+              // It's a little tricky to find all the succeeding stages of `mapStage`, because
               // each stage only know its parents not children. Here we traverse the stages from
               // the leaf nodes (the result stages of active jobs), and rollback all the stages
-              // in the stage chains that connect to the `failedStage`. To speed up the stage
+              // in the stage chains that connect to the `mapStage`. To speed up the stage
               // traversing, we collect the stages to rollback first. If a stage needs to
               // rollback, all its succeeding stages need to rollback to.
-              val stagesToRollback = scala.collection.mutable.HashSet(failedStage)
+              val stagesToRollback = HashSet[Stage](mapStage)
 
               def collectStagesToRollback(stageChain: List[Stage]): Unit = {
                 if (stagesToRollback.contains(stageChain.head)) {
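
The effect of changing the seed of `stagesToRollback` can be illustrated with a small standalone sketch. It is loosely modeled on the traversal described in the comment above, not copied from `DAGScheduler`: `ToyStage`, `RollbackSketch`, `RollbackDemo`, and the four-stage DAG are hypothetical names and data invented for the illustration, and the DAG is not claimed to be the exact corner case from the commit message.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a scheduler stage: it knows its parents, not its children.
final class ToyStage(val id: Int, val parents: List[ToyStage])

object RollbackSketch {
  // Returns the ids of `seed` and of every stage on a chain from a leaf down to `seed`,
  // found by walking parent links starting at the leaf stages.
  def stagesToRollback(seed: ToyStage, leaves: Seq[ToyStage]): Set[Int] = {
    val rollback = mutable.HashSet[ToyStage](seed)

    def visit(chain: List[ToyStage]): Unit = {
      if (rollback.contains(chain.head)) {
        // Every stage between the leaf and this one consumes its output.
        chain.drop(1).foreach(s => rollback += s)
      } else {
        chain.head.parents.foreach(p => visit(p :: chain))
      }
    }

    leaves.foreach(leaf => visit(leaf :: Nil))
    rollback.map(_.id).toSet
  }
}

object RollbackDemo {
  // Assumed toy DAG: stage 0 feeds stages 1 and 2, which both feed result stage 3.
  val mapStage = new ToyStage(0, Nil)
  val failedStage = new ToyStage(1, List(mapStage))
  val sibling = new ToyStage(2, List(mapStage))
  val resultStage = new ToyStage(3, List(failedStage, sibling))

  def main(args: Array[String]): Unit = {
    // Seeding with the stage that saw the fetch failure: prints List(1, 3)
    println(RollbackSketch.stagesToRollback(failedStage, Seq(resultStage)).toList.sorted)
    // Seeding with the stage that produced the lost output: prints List(0, 1, 2, 3)
    println(RollbackSketch.stagesToRollback(mapStage, Seq(resultStage)).toList.sorted)
  }
}
```

In this toy DAG, seeding with `failedStage` leaves out both the map stage itself and the sibling stage that also reads its output, whereas seeding with `mapStage` covers every stage downstream of the rerun.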

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 4 additions & 21 deletions
@@ -2704,27 +2704,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       FetchFailed(makeBlockManagerId("hostC"), shuffleId2, 0, 0, "ignored"),
       null))
 
-    val failedStages = scheduler.failedStages.toSeq
-    assert(failedStages.length == 2)
-    // Shuffle blocks of "hostC" is lost, so first task of the `shuffleMapRdd2` needs to retry.
-    assert(failedStages.collect {
-      case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
-    }.head.findMissingPartitions() == Seq(0))
-    // The result stage is still waiting for its 2 tasks to complete
-    assert(failedStages.collect {
-      case stage: ResultStage => stage
-    }.head.findMissingPartitions() == Seq(0, 1))
-
-    scheduler.resubmitFailedStages()
-
-    // The first task of the `shuffleMapRdd2` failed with fetch failure
-    runEvent(makeCompletionEvent(
-      taskSets(3).tasks(0),
-      FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0, 0, "ignored"),
-      null))
-
-    // The job should fail because Spark can't rollback the shuffle map stage.
-    assert(failure != null && failure.getMessage.contains("Spark cannot rollback"))
+    // The second shuffle map stage need to rerun, the job will abort for the indeterminate
+    // stage rerun.
+    assert(failure != null && failure.getMessage
+      .contains("Spark cannot rollback the ShuffleMapStage 1"))
   }
 
   private def assertResultStageFailToRollback(mapRdd: MyRDD): Unit = {
