Skip to content

Commit 43e0af2

Browse files
committed
Address review comments + update docs
1 parent aae7e87 commit 43e0af2

File tree

4 files changed

+26
-26
lines changed

4 files changed

+26
-26
lines changed

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -600,10 +600,10 @@ package object config {
600600
// Threshold above which we abort the TaskSet if a task could not be scheduled because of complete
601601
// blacklisting.
602602
private[spark] val UNSCHEDULABLE_TASKSET_TIMEOUT =
603-
ConfigBuilder("spark.scheduler.unschedulableTaskSetTimeout")
604-
.doc("The timeout in seconds to wait before aborting a TaskSet to acquire a new executor " +
605-
"and schedule a task which was previously unschedulable because of being completely " +
606-
"blacklisted.")
603+
ConfigBuilder("spark.scheduler.blacklist.unschedulableTaskSetTimeout")
604+
.doc("The timeout in seconds to wait to try to acquire a new executor and schedule a task " +
605+
"before aborting a TaskSet which was previously unschedulable because of being " +
606+
"completely blacklisted.")
607607
.timeConf(TimeUnit.SECONDS)
608608
.checkValue(v => v >= 0, "The value should be a non negative time value.")
609609
.createWithDefault(120)

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,9 @@ import java.util.{Locale, Timer, TimerTask}
2222
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
2323
import java.util.concurrent.atomic.AtomicLong
2424

25-
import scala.collection.Set
25+
import scala.collection.{Set, mutable}
2626
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
2727
import scala.util.Random
28-
2928
import org.apache.spark._
3029
import org.apache.spark.TaskState.TaskState
3130
import org.apache.spark.executor.ExecutorMetrics
@@ -118,7 +117,6 @@ private[spark] class TaskSchedulerImpl(
118117
protected val executorIdToHost = new HashMap[String, String]
119118

120119
private val abortTimer = new Timer(true)
121-
122120
private val clock = new SystemClock
123121

124122
protected val unschedulableTaskSetToExpiryTime = new HashMap[TaskSetManager, Long]
@@ -430,14 +428,13 @@ private[spark] class TaskSchedulerImpl(
430428
// executor. If we cannot find one, we abort immediately. Else we kill the idle
431429
// executor and kick off an abortTimer which after waiting will abort the taskSet if
432430
// we were unable to schedule any task from the taskSet.
433-
// Note 1: We keep a track of schedulability on a per taskSet basis rather than on a
434-
// per task basis.
431+
// Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
432+
// task basis.
435433
// Note 2: The taskSet can still be aborted when there are more than one idle
436-
// blacklisted executors and dynamic allocation is on. This is because we rely on the
437-
// ExecutorAllocationManager to acquire a new executor based on the pending tasks and
438-
// it won't release any blacklisted executors which idle timeout after we kill an
439-
// executor to acquire a new one, resulting in the abort timer to expire and abort the
440-
// taskSet.
434+
// blacklisted executors and dynamic allocation is on. This can happen when a killed
435+
// idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
436+
// pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
437+
// timer to expire and abort the taskSet.
441438
executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
442439
case Some (x) =>
443440
val executorId = x._1
@@ -465,18 +462,16 @@ private[spark] class TaskSchedulerImpl(
465462
}
466463
case _ => // Abort Immediately
467464
logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
468-
s" executors could be found. Aborting $taskSet." )
465+
s" executors can be found to kill. Aborting $taskSet." )
469466
taskSet.abortSinceCompletelyBlacklisted(taskIndex.get)
470467
}
471-
case _ => // Do nothing.
472-
}
473-
} else {
474-
// If a task was scheduled, we clear the expiry time for the taskSet. The abort timer
475-
// checks this entry to decide if we want to abort the taskSet.
476-
if (unschedulableTaskSetToExpiryTime.contains(taskSet)) {
477-
unschedulableTaskSetToExpiryTime.remove(taskSet)
478-
}
468+
case _ => // Do nothing if no tasks completely blacklisted.
479469
}
470+
} else {
471+
// If a task was scheduled, we clear the expiry time for the taskSet. The abort timer
472+
// checks this entry to decide if we want to abort the taskSet.
473+
unschedulableTaskSetToExpiryTime.remove(taskSet)
474+
}
480475

481476
if (launchedAnyTask && taskSet.isBarrier) {
482477
// Check whether the barrier tasks are partially launched.

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -530,8 +530,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
530530
)
531531
// Wait for the failed task to propagate.
532532
Thread.sleep(500)
533-
// taskScheduler.handleFailedTask(tsm, failedTask.taskId, TaskState.FAILED, TaskResultLost)
534-
// tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, TaskResultLost)
535533

536534
when(stageToMockTaskSetBlacklist(0).isExecutorBlacklistedForTask("executor0", failedTask.index))
537535
.thenReturn(true)
@@ -548,7 +546,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
548546
}
549547

550548
test("SPARK-22148 try to acquire a new executor when task is unschedulable with 1 executor") {
551-
552549
taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
553550
config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10")
554551

docs/configuration.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1583,6 +1583,14 @@ Apart from these, the following properties are also available, and may be useful
15831583
driver using more memory.
15841584
</td>
15851585
</tr>
1586+
<tr>
1587+
<td><code>spark.scheduler.blacklist.unschedulableTaskSetTimeout</code></td>
1588+
<td>120s</td>
1589+
<td>
1590+
The timeout in seconds to wait to try to acquire a new executor and schedule a task before
1591+
aborting a TaskSet which was previously unschedulable because of being completely blacklisted.
1592+
</td>
1593+
</tr>
15861594
<tr>
15871595
<td><code>spark.blacklist.enabled</code></td>
15881596
<td>

0 commit comments

Comments
 (0)