Skip to content

Commit 3cf068d

Browse files
author
Eric Vandenberg
committed
[SPARK-21219][scheduler] Fix race condition between adding task to pending list and updating black list state.
1 parent c228100 commit 3cf068d

File tree

2 files changed

+57
-11
lines changed

2 files changed

+57
-11
lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ private[spark] class TaskSetManager(
198198
private[scheduler] var emittedTaskSizeWarning = false
199199

200200
/** Add a task to all the pending-task lists that it should be on. */
201-
private def addPendingTask(index: Int) {
201+
private[spark] def addPendingTask(index: Int) {
202202
for (loc <- tasks(index).preferredLocations) {
203203
loc match {
204204
case e: ExecutorCacheTaskLocation =>
@@ -832,15 +832,6 @@ private[spark] class TaskSetManager(
832832

833833
sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
834834

835-
if (successful(index)) {
836-
logInfo(s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, but the task will not" +
837-
s" be re-executed (either because the task failed with a shuffle data fetch failure," +
838-
s" so the previous stage needs to be re-run, or because a different copy of the task" +
839-
s" has already succeeded).")
840-
} else {
841-
addPendingTask(index)
842-
}
843-
844835
if (!isZombie && reason.countTowardsTaskFailures) {
845836
taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
846837
info.host, info.executorId, index))
@@ -854,6 +845,16 @@ private[spark] class TaskSetManager(
854845
return
855846
}
856847
}
848+
849+
if (successful(index)) {
850+
logInfo(s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, but the task will not" +
851+
s" be re-executed (either because the task failed with a shuffle data fetch failure," +
852+
s" so the previous stage needs to be re-run, or because a different copy of the task" +
853+
s" has already succeeded).")
854+
} else {
855+
addPendingTask(index)
856+
}
857+
857858
maybeFinishTaskSet()
858859
}
859860

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import scala.collection.mutable
2323
import scala.collection.mutable.ArrayBuffer
2424

2525
import org.mockito.Matchers.{any, anyInt, anyString}
26-
import org.mockito.Mockito.{mock, never, spy, verify, when}
26+
import org.mockito.Mockito.{mock, never, spy, verify, when, times}
2727
import org.mockito.invocation.InvocationOnMock
2828
import org.mockito.stubbing.Answer
2929

@@ -1172,6 +1172,51 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
11721172
assert(blacklistTracker.isNodeBlacklisted("host1"))
11731173
}
11741174

1175+
test("update blacklist before adding pending task to avoid race condition") {
1176+
// When a task fails, it should apply the blacklist policy prior to
1177+
// retrying the task otherwise there's a race condition where run on
1178+
// the same executor that it was intended to be black listed from.
1179+
val conf = new SparkConf().
1180+
set(config.BLACKLIST_ENABLED, true).
1181+
set(config.MAX_TASK_ATTEMPTS_PER_EXECUTOR, 1)
1182+
1183+
// Create a task with two executors.
1184+
sc = new SparkContext("local", "test", conf)
1185+
val exec = "executor1"
1186+
val host = "host1"
1187+
val exec2 = "executor2"
1188+
val host2 = "host2"
1189+
sched = new FakeTaskScheduler(sc, (exec, host), (exec2, host2))
1190+
val taskSet = FakeTask.createTaskSet(1)
1191+
1192+
val clock = new ManualClock
1193+
val mockListenerBus = mock(classOf[LiveListenerBus])
1194+
val blacklistTracker = new BlacklistTracker(mockListenerBus, conf, None, clock)
1195+
val taskSetManager = new TaskSetManager(sched, taskSet, 1, Some(blacklistTracker))
1196+
val taskSetManagerSpy = spy(taskSetManager)
1197+
1198+
val taskDesc = taskSetManagerSpy.resourceOffer(exec, host, TaskLocality.ANY)
1199+
1200+
// Assert the task has been black listed on the executor it was last executed on.
1201+
when(taskSetManagerSpy.addPendingTask(anyInt())).thenAnswer(
1202+
new Answer[Unit] {
1203+
override def answer(invocationOnMock: InvocationOnMock): Unit = {
1204+
val task = invocationOnMock.getArgumentAt(0, classOf[Int])
1205+
assert(taskSetManager.taskSetBlacklistHelperOpt.get.
1206+
isExecutorBlacklistedForTask(exec, task))
1207+
null
1208+
}
1209+
}
1210+
)
1211+
1212+
// Simulate an out of memory error
1213+
val e = new OutOfMemoryError
1214+
taskSetManagerSpy.handleFailedTask(
1215+
taskDesc.get.taskId, TaskState.FAILED, new ExceptionFailure(e, Seq()))
1216+
1217+
verify(taskSetManagerSpy, times(1)).addPendingTask(anyInt())
1218+
}
1219+
11751220
private def createTaskResult(
11761221
id: Int,
11771222
accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {

0 commit comments

Comments
 (0)