
Commit 4c694e4

scwf authored and zsxwing committed
[SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure
| Time | Thread 1, Job 1 | Thread 2, Job 2 |
|:----:|:----------------|:----------------|
| 1 | abort stage due to FetchFailed | |
| 2 | failedStages += failedStage | |
| 3 | | task failed due to FetchFailed |
| 4 | | cannot post ResubmitFailedStages because failedStages is not empty |

Then Job 2 on Thread 2 never resubmits the failed stage and hangs. We should not add to `failedStages` when `abortStage` is called for a fetch failure.

A unit test is added.

Author: w00228970 <[email protected]>
Author: wangfei <[email protected]>

Closes #15213 from scwf/dag-resubmit.

(cherry picked from commit 46d1203)
Signed-off-by: Shixiong Zhu <[email protected]>
1 parent 4d73d5c · commit 4c694e4

2 files changed: 70 additions & 12 deletions
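To see why a stale `failedStages` entry hangs the second job, here is a minimal, self-contained sketch of the scheduling gate described in the commit message. It is illustrative only: the names mirror `DAGScheduler`, but the real class is event-loop driven and far larger.

```scala
import scala.collection.mutable

// A minimal sketch of the resubmission gate, assuming a single-threaded
// event loop; illustrative only, not the real DAGScheduler.
object ResubmitGateSketch {
  private val failedStages = mutable.Set[String]()
  private val pendingEvents = mutable.Queue[String]()

  // Retryable fetch failure: schedule a resubmit only on the
  // empty -> non-empty transition, since otherwise an event is
  // assumed to already be pending.
  def handleRetryableFetchFailure(failedStage: String, mapStage: String): Unit = {
    if (failedStages.isEmpty) {
      pendingEvents.enqueue("ResubmitFailedStages")
    }
    failedStages += failedStage
    failedStages += mapStage
  }

  // Pre-fix behavior: the abort path also added to failedStages but never
  // posted ResubmitFailedStages. Afterwards failedStages was non-empty with
  // no event pending, so handleRetryableFetchFailure from another job would
  // skip the enqueue above, and that job would hang.
  def abortStagePreFix(failedStage: String, mapStage: String): Unit = {
    failedStages += failedStage // the additions this commit removes
    failedStages += mapStage
  }
}
```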


core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 13 additions & 11 deletions
```diff
@@ -1277,18 +1277,20 @@ class DAGScheduler(
             s"has failed the maximum allowable number of " +
             s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
             s"Most recent failure reason: ${failureMessage}", None)
-        } else if (failedStages.isEmpty) {
-          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
-          // in that case the event will already have been scheduled.
-          // TODO: Cancel running tasks in the stage
-          logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
-            s"$failedStage (${failedStage.name}) due to fetch failure")
-          messageScheduler.schedule(new Runnable {
-            override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
-          }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+        } else {
+          if (failedStages.isEmpty) {
+            // Don't schedule an event to resubmit failed stages if failed isn't empty, because
+            // in that case the event will already have been scheduled.
+            // TODO: Cancel running tasks in the stage
+            logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
+              s"$failedStage (${failedStage.name}) due to fetch failure")
+            messageScheduler.schedule(new Runnable {
+              override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
+            }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+          }
+          failedStages += failedStage
+          failedStages += mapStage
         }
-        failedStages += failedStage
-        failedStages += mapStage
         // Mark the map whose fetch failed as broken in the map stage
         if (mapId != -1) {
           mapStage.removeOutputLoc(mapId, bmAddress)
```
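The `isEmpty` gate above is only safe if the consumer of `ResubmitFailedStages` drains the set. A paraphrased sketch of that consumer follows; it is not a verbatim copy of `DAGScheduler.resubmitFailedStages`, and details vary across Spark versions:

```scala
// Paraphrased sketch of the event consumer, assuming `failedStages`,
// `submitStage`, `clearCacheLocs`, and `logInfo` as in DAGScheduler;
// not verbatim, details vary across Spark versions.
private[scheduler] def resubmitFailedStages(): Unit = {
  if (failedStages.nonEmpty) {
    logInfo("Resubmitting failed stages")
    clearCacheLocs()
    val failedStagesCopy = failedStages.toArray
    failedStages.clear() // restores the "empty <=> no event pending" invariant
    for (stage <- failedStagesCopy.sortBy(_.firstJobId)) {
      submitStage(stage)
    }
  }
}
```

With the fix, entries reach `failedStages` only together with a scheduled `ResubmitFailedStages`, so the set can never stay non-empty forever.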

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 57 additions & 1 deletion
```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.annotation.meta.param
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
@@ -31,7 +32,7 @@ import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
-import org.apache.spark.shuffle.MetadataFetchFailedException
+import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
 import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils}
 
@@ -2058,6 +2059,61 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assertDataStructuresEmpty()
   }
 
+  test("SPARK-17644: After one stage is aborted for too many failed attempts, subsequent stages" +
+    "still behave correctly on fetch failures") {
+    // Runs a job that always encounters a fetch failure, so should eventually be aborted
+    def runJobWithPersistentFetchFailure: Unit = {
+      val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
+      val shuffleHandle =
+        rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
+      rdd1.map {
+        case (x, _) if (x == 1) =>
+          throw new FetchFailedException(
+            BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
+        case (x, _) => x
+      }.count()
+    }
+
+    // Runs a job that encounters a single fetch failure but succeeds on the second attempt
+    def runJobWithTemporaryFetchFailure: Unit = {
+      object FailThisAttempt {
+        val _fail = new AtomicBoolean(true)
+      }
+      val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
+      val shuffleHandle =
+        rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
+      rdd1.map {
+        case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) =>
+          throw new FetchFailedException(
+            BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
+      }
+    }
+
+    failAfter(10.seconds) {
+      val e = intercept[SparkException] {
+        runJobWithPersistentFetchFailure
+      }
+      assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
+    }
+
+    // Run a second job that will fail due to a fetch failure.
+    // This job will hang without the fix for SPARK-17644.
+    failAfter(10.seconds) {
+      val e = intercept[SparkException] {
+        runJobWithPersistentFetchFailure
+      }
+      assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
+    }
+
+    failAfter(10.seconds) {
+      try {
+        runJobWithTemporaryFetchFailure
+      } catch {
+        case e: Throwable => fail("A job with one fetch failure should eventually succeed")
+      }
+    }
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
```
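The temporary-failure job relies on `AtomicBoolean.getAndSet` to fail exactly one task attempt. The same pattern, isolated as a standalone sketch (hypothetical demo names, not part of the patch):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// The fail-once pattern from runJobWithTemporaryFetchFailure, isolated.
// getAndSet(false) returns true only on the first call, so only the first
// attempt throws; Spark's task retry then succeeds.
object FailOnceDemo {
  private val shouldFail = new AtomicBoolean(true)

  def maybeFail(): Unit = {
    if (shouldFail.getAndSet(false)) {
      throw new RuntimeException("injected failure on the first attempt only")
    }
  }

  def main(args: Array[String]): Unit = {
    try maybeFail() catch {
      case e: RuntimeException => println(s"first attempt: ${e.getMessage}")
    }
    maybeFail() // second attempt: no exception
    println("second attempt succeeded")
  }
}
```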
