Skip to content

Commit 0784dc3

Browse files
committed
Addressed tgraves and mridulm comments
1 parent 947cc16 commit 0784dc3

8 files changed

Lines changed: 123 additions & 69 deletions

File tree

core/src/main/java/org/apache/spark/SparkFirehoseListener.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,14 @@ public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted spe
162162
onEvent(speculativeTask);
163163
}
164164

165-
public void onUnschedulableTaskSet(
166-
SparkListenerUnschedulableTaskSet unschedulableTaskSet) {
167-
onEvent(unschedulableTaskSet);
165+
public void onUnschedulableTaskSetAdded(
166+
SparkListenerUnschedulableTaskSetAdded unschedulableTaskSetAdded) {
167+
onEvent(unschedulableTaskSetAdded);
168+
}
169+
170+
public void onUnschedulableTaskSetRemoved(
171+
SparkListenerUnschedulableTaskSetRemoved unschedulableTaskSetRemoved) {
172+
onEvent(unschedulableTaskSetRemoved);
168173
}
169174

170175
@Override

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ private[spark] class ExecutorAllocationManager(
281281
private def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = {
282282
val pending = listener.totalPendingTasksPerResourceProfile(rpId)
283283
val pendingSpeculative = listener.pendingSpeculativeTasksPerResourceProfile(rpId)
284-
val numUnschedulables = listener.pendingUnschedulableTaskSetsPerResourceProfile(rpId)
284+
val unschedulableTaskSets = listener.pendingUnschedulableTaskSetsPerResourceProfile(rpId)
285285
val running = listener.totalRunningTasksPerResourceProfile(rpId)
286286
val numRunningOrPendingTasks = pending + running
287287
val rp = resourceProfileManager.resourceProfileFromId(rpId)
@@ -290,6 +290,7 @@ private[spark] class ExecutorAllocationManager(
290290
s" tasksperexecutor: $tasksPerExecutor")
291291
val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
292292
tasksPerExecutor).toInt
293+
293294
val maxNeededWithSpeculationLocalityOffset =
294295
if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
295296
// If we have pending speculative tasks and only need a single executor, allocate one more
@@ -299,11 +300,11 @@ private[spark] class ExecutorAllocationManager(
299300
maxNeeded
300301
}
301302

302-
// Since the maxNeededWithSpeculationLocalityOffset already includes the num executors needed
303-
// to run unschedulable tasks, we would only try to add when the
304-
// maxNeededWithSpeculationLocalityOffset is lesser than the active executors
305-
if (numUnschedulables > 0) {
306-
val maxNeededForUnschedulables = math.ceil(numUnschedulables * executorAllocationRatio /
303+
if (unschedulableTaskSets > 0) {
304+
// Request additional executors only if maxNeededWithSpeculationLocalityOffset is less than
305+
// current active executors; otherwise, requesting maxNeededWithSpeculationLocalityOffset should
306+
// be enough to make the unschedulable tasks schedulable now.
307+
val maxNeededForUnschedulables = math.ceil(unschedulableTaskSets * executorAllocationRatio /
307308
tasksPerExecutor).toInt
308309
math.max(maxNeededWithSpeculationLocalityOffset,
309310
executorMonitor.executorCountWithResourceProfile(rpId) + maxNeededForUnschedulables)
@@ -810,20 +811,25 @@ private[spark] class ExecutorAllocationManager(
810811
}
811812
}
812813

813-
override def onUnschedulableTaskSet
814-
(unschedulableTaskSet: SparkListenerUnschedulableTaskSet): Unit = {
815-
val stageId = unschedulableTaskSet.stageId
816-
val stageAttemptId = unschedulableTaskSet.stageAttemptId
814+
override def onUnschedulableTaskSetAdded(
815+
unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit = {
816+
val stageId = unschedulableTaskSetAdded.stageId
817+
val stageAttemptId = unschedulableTaskSetAdded.stageAttemptId
817818
allocationManager.synchronized {
818-
(stageId, stageAttemptId) match {
819-
case (Some(stageId), Some(stageAttemptId)) =>
820-
val stageAttempt = StageAttempt(stageId, stageAttemptId)
821-
unschedulableTaskSets.add(stageAttempt)
822-
case (None, _) =>
823-
// Clear unschedulableTaskSets since atleast one task becomes schedulable now
824-
unschedulableTaskSets.clear()
825-
}
826-
allocationManager.onSchedulerBacklogged()
819+
val stageAttempt = StageAttempt(stageId, stageAttemptId)
820+
unschedulableTaskSets.add(stageAttempt)
821+
}
822+
allocationManager.onSchedulerBacklogged()
823+
}
824+
825+
override def onUnschedulableTaskSetRemoved(
826+
unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit = {
827+
val stageId = unschedulableTaskSetRemoved.stageId
828+
val stageAttemptId = unschedulableTaskSetRemoved.stageAttemptId
829+
allocationManager.synchronized {
830+
// Remove the stage attempt from unschedulableTaskSets since at least one task becomes schedulable now
831+
val stageAttempt = StageAttempt(stageId, stageAttemptId)
832+
unschedulableTaskSets.remove(stageAttempt)
827833
}
828834
}
829835

@@ -867,9 +873,9 @@ private[spark] class ExecutorAllocationManager(
867873
numTotalTasks - numRunning
868874
}
869875

870-
// Currently TaskSetManager.getCompletelyBlacklistedTaskIfAny only takes the first
871-
// unschedulable task due to blacklisting. So keeping track of unschedulableTaskSets
872-
// should be enough as we'll always have no more than a task unschedulable at any time.
876+
// Currently TaskSetManager.getCompletelyBlacklistedTaskIfAny only takes the first unschedulable
877+
// task found due to blacklisting. This way we only need to keep track of the unschedulable
878+
// tasksets which is an indirect way to get the current number of unschedulable tasks.
873879
def pendingUnschedulableTaskSetsPerResourceProfile(rp: Int): Int = {
874880
val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq
875881
attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -312,13 +312,23 @@ private[spark] class DAGScheduler(
312312
}
313313

314314
/**
315-
* Called by the TaskSetManager when there is an unschedulable blacklist task and dynamic
315+
* Called by the TaskSetManager when a taskset becomes unschedulable due to blacklisting and
316+
* dynamic allocation is enabled
317+
*/
318+
def unschedulableTaskSetAdded(
319+
stageId: Int,
320+
stageAttemptId: Int): Unit = {
321+
eventProcessLoop.post(UnschedulableTaskSetAdded(stageId, stageAttemptId))
322+
}
323+
324+
/**
325+
* Called by the TaskSetManager when an unschedulable taskset becomes schedulable and dynamic
316326
* allocation is enabled
317327
*/
318-
def unschedulableBlacklistTaskSubmitted(
319-
stageId: Option[Int],
320-
stageAttemptId: Option[Int]): Unit = {
321-
eventProcessLoop.post(UnschedulableBlacklistTaskSubmitted(stageId, stageAttemptId))
328+
def unschedulableTaskSetRemoved(
329+
stageId: Int,
330+
stageAttemptId: Int): Unit = {
331+
eventProcessLoop.post(UnschedulableTaskSetRemoved(stageId, stageAttemptId))
322332
}
323333

324334
private[scheduler]
@@ -1024,11 +1034,18 @@ private[spark] class DAGScheduler(
10241034
listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
10251035
}
10261036

1027-
private[scheduler] def handleUnschedulableBlacklistTaskSubmitted(
1028-
stageId: Option[Int],
1029-
stageAttemptId: Option[Int]): Unit = {
1037+
private[scheduler] def handleUnschedulableTaskSetAdded(
1038+
stageId: Int,
1039+
stageAttemptId: Int): Unit = {
10301040
listenerBus.post(
1031-
SparkListenerUnschedulableTaskSet(stageId, stageAttemptId))
1041+
SparkListenerUnschedulableTaskSetAdded(stageId, stageAttemptId))
1042+
}
1043+
1044+
private[scheduler] def handleUnschedulableTaskSetRemoved(
1045+
stageId: Int,
1046+
stageAttemptId: Int): Unit = {
1047+
listenerBus.post(
1048+
SparkListenerUnschedulableTaskSetRemoved(stageId, stageAttemptId))
10321049
}
10331050

10341051
private[scheduler] def handleTaskSetFailed(
@@ -2304,8 +2321,11 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
23042321
case SpeculativeTaskSubmitted(task) =>
23052322
dagScheduler.handleSpeculativeTaskSubmitted(task)
23062323

2307-
case UnschedulableBlacklistTaskSubmitted(stageId, stageAttemptId) =>
2308-
dagScheduler.handleUnschedulableBlacklistTaskSubmitted(stageId, stageAttemptId)
2324+
case UnschedulableTaskSetAdded(stageId, stageAttemptId) =>
2325+
dagScheduler.handleUnschedulableTaskSetAdded(stageId, stageAttemptId)
2326+
2327+
case UnschedulableTaskSetRemoved(stageId, stageAttemptId) =>
2328+
dagScheduler.handleUnschedulableTaskSetRemoved(stageId, stageAttemptId)
23092329

23102330
case GettingResultEvent(taskInfo) =>
23112331
dagScheduler.handleGetTaskResult(taskInfo)

core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,5 +98,10 @@ private[scheduler]
9898
case class SpeculativeTaskSubmitted(task: Task[_]) extends DAGSchedulerEvent
9999

100100
private[scheduler]
101-
case class UnschedulableBlacklistTaskSubmitted(stageId: Option[Int], stageAttemptId: Option[Int])
101+
case class UnschedulableTaskSetAdded(stageId: Int, stageAttemptId: Int)
102102
extends DAGSchedulerEvent
103+
104+
private[scheduler]
105+
case class UnschedulableTaskSetRemoved(stageId: Int, stageAttemptId: Int)
106+
extends DAGSchedulerEvent
107+

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,14 @@ case class SparkListenerNodeUnblacklisted(time: Long, hostId: String)
158158
extends SparkListenerEvent
159159

160160
@DeveloperApi
161-
case class SparkListenerUnschedulableTaskSet(
162-
stageId: Option[Int],
163-
stageAttemptId: Option[Int]) extends SparkListenerEvent
161+
case class SparkListenerUnschedulableTaskSetAdded(
162+
stageId: Int,
163+
stageAttemptId: Int) extends SparkListenerEvent
164+
165+
@DeveloperApi
166+
case class SparkListenerUnschedulableTaskSetRemoved(
167+
stageId: Int,
168+
stageAttemptId: Int) extends SparkListenerEvent
164169

165170
@DeveloperApi
166171
case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends SparkListenerEvent
@@ -345,10 +350,18 @@ private[spark] trait SparkListenerInterface {
345350
def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit
346351

347352
/**
348-
* Called when both dynamic allocation is enabled and there is an unschedulable blacklist task
353+
* Called when a taskset becomes unschedulable due to blacklisting and dynamic allocation
354+
* is enabled
355+
*/
356+
def onUnschedulableTaskSetAdded(
357+
unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit
358+
359+
/**
360+
* Called when an unschedulable taskset becomes schedulable and dynamic allocation
361+
* is enabled
349362
*/
350-
def onUnschedulableTaskSet(
351-
blacklistTask: SparkListenerUnschedulableTaskSet): Unit
363+
def onUnschedulableTaskSetRemoved(
364+
unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit
352365

353366
/**
354367
* Called when the driver receives a block update info.
@@ -436,8 +449,11 @@ abstract class SparkListener extends SparkListenerInterface {
436449
override def onNodeUnblacklisted(
437450
nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }
438451

439-
override def onUnschedulableTaskSet(
440-
unschedulableTaskSet: SparkListenerUnschedulableTaskSet): Unit = { }
452+
override def onUnschedulableTaskSetAdded(
453+
unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit = { }
454+
455+
override def onUnschedulableTaskSetRemoved(
456+
unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit = { }
441457

442458
override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }
443459

core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,10 @@ private[spark] trait SparkListenerBus
7979
listener.onBlockUpdated(blockUpdated)
8080
case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
8181
listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
82-
case unschedulableTaskSet: SparkListenerUnschedulableTaskSet =>
83-
listener.onUnschedulableTaskSet(unschedulableTaskSet)
82+
case unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded =>
83+
listener.onUnschedulableTaskSetAdded(unschedulableTaskSetAdded)
84+
case unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved =>
85+
listener.onUnschedulableTaskSetRemoved(unschedulableTaskSetRemoved)
8486
case resourceProfileAdded: SparkListenerResourceProfileAdded =>
8587
listener.onResourceProfileAdded(resourceProfileAdded)
8688
case _ => listener.onOtherEvent(event)

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -637,17 +637,20 @@ private[spark] class TaskSchedulerImpl(
637637
if (!launchedAnyTask) {
638638
taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
639639
// If the taskSet is unschedulable we try to find an existing idle blacklisted
640-
// executor. If we cannot find one, we abort immediately. Else we kill the idle
641-
// executor and kick off an abortTimer which if it doesn't schedule a task within the
642-
// the timeout will abort the taskSet if we were unable to schedule any task from the
643-
// taskSet.
640+
// executor and kill the idle executor and kick off an abortTimer which if it doesn't
641+
// schedule a task within the timeout will abort the taskSet if we were unable to
642+
// schedule any task from the taskSet.
644643
// Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
645644
// task basis.
646645
// Note 2: The taskSet can still be aborted when there are more than one idle
647646
// blacklisted executors and dynamic allocation is on. This can happen when a killed
648647
// idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
649648
// pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
650649
// timer to expire and abort the taskSet.
650+
//
651+
// If there are no idle executors and dynamic allocation is enabled, then we would
652+
// notify ExecutorAllocationManager to allocate more executors to schedule the
653+
// unschedulable tasks else we will abort immediately.
651654
executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
652655
case Some ((executorId, _)) =>
653656
if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
@@ -659,11 +662,10 @@ private[spark] class TaskSchedulerImpl(
659662
// in order to provision more executors to make them schedulable
660663
if (Utils.isDynamicAllocationEnabled(conf)) {
661664
if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
662-
logInfo(s"Notifying ExecutorAllocationManager to add executors to" +
663-
s" schedule the unschedulable blacklisted task before aborting $taskSet.")
664-
dagScheduler.unschedulableBlacklistTaskSubmitted(
665-
Some(taskSet.taskSet.stageId),
666-
Some(taskSet.taskSet.stageAttemptId))
665+
logInfo(s"Notifying ExecutorAllocationManager to allocate more executors to" +
666+
s" schedule the unschedulable task before aborting $taskSet.")
667+
dagScheduler.unschedulableTaskSetAdded(taskSet.taskSet.stageId,
668+
taskSet.taskSet.stageAttemptId)
667669
updateUnschedulableTaskSetTimeoutAndStartAbortTimer(taskSet, taskIndex)
668670
}
669671
} else {
@@ -686,7 +688,8 @@ private[spark] class TaskSchedulerImpl(
686688
"recently scheduled.")
687689
// Notify ExecutorAllocationManager as well as other subscribers that a task now
688690
// recently becomes schedulable
689-
dagScheduler.unschedulableBlacklistTaskSubmitted(None, None)
691+
dagScheduler.unschedulableTaskSetRemoved(taskSet.taskSet.stageId,
692+
taskSet.taskSet.stageAttemptId)
690693
unschedulableTaskSetToExpiryTime.clear()
691694
}
692695
}

core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
524524
assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1)
525525

526526
// Stage 0 becomes unschedulable due to blacklisting
527-
post(SparkListenerUnschedulableTaskSet(Some(0), Some(0)))
527+
post(SparkListenerUnschedulableTaskSetAdded(0, 0))
528528
clock.advance(1000)
529529
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
530530
// Assert that we are getting additional executor to schedule unschedulable tasks
@@ -534,8 +534,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
534534
// Add a new executor
535535
onExecutorAddedDefaultProfile(manager, "1")
536536
// Now once the task becomes schedulable, clear the unschedulableTaskSets
537-
// by posting unschedulable event with (None, None)
538-
post(SparkListenerUnschedulableTaskSet(None, None))
537+
post(SparkListenerUnschedulableTaskSetRemoved(0, 0))
539538
clock.advance(1000)
540539
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
541540
assert(numExecutorsTarget(manager, defaultProfile.id) === 1)
@@ -581,8 +580,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
581580
post(SparkListenerStageCompleted(createStageInfo(0, 2)))
582581

583582
// Stage 1 and 2 becomes unschedulable now due to blacklisting
584-
post(SparkListenerUnschedulableTaskSet(Some(1), Some(0)))
585-
post(SparkListenerUnschedulableTaskSet(Some(2), Some(0)))
583+
post(SparkListenerUnschedulableTaskSetAdded(1, 0))
584+
post(SparkListenerUnschedulableTaskSetAdded(2, 0))
586585

587586
clock.advance(1000)
588587
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
@@ -594,12 +593,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
594593
onExecutorAddedDefaultProfile(manager, "3")
595594

596595
// Now once the task becomes schedulable, clear the unschedulableTaskSets
597-
// by posting unschedulable event with (None, None)
598-
post(SparkListenerUnschedulableTaskSet(None, None))
596+
post(SparkListenerUnschedulableTaskSetRemoved(1, 0))
599597
clock.advance(1000)
600598
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
601-
assert(numExecutorsTarget(manager, defaultProfile.id) === 2)
602-
assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 2)
599+
assert(numExecutorsTarget(manager, defaultProfile.id) === 4)
600+
assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 5)
603601
}
604602

605603
test("SPARK-31418: remove executors after unschedulable tasks end") {
@@ -639,7 +637,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
639637
(0 to 3).foreach { i => onExecutorRemoved(manager, i.toString) }
640638

641639
// Now due to blacklisting, the task becomes unschedulable
642-
post(SparkListenerUnschedulableTaskSet(Some(0), Some(0)))
640+
post(SparkListenerUnschedulableTaskSetAdded(0, 0))
643641
clock.advance(1000)
644642
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
645643
assert(numExecutorsTarget(manager, defaultProfile.id) === 2)
@@ -649,8 +647,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
649647
onExecutorAddedDefaultProfile(manager, "5")
650648

651649
// Now once the task becomes schedulable, clear the unschedulableTaskSets
652-
// by posting unschedulable event with (None, None)
653-
post(SparkListenerUnschedulableTaskSet(None, None))
650+
post(SparkListenerUnschedulableTaskSetRemoved(0, 0))
654651
clock.advance(1000)
655652
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
656653
assert(numExecutorsTarget(manager, defaultProfile.id) === 1)

0 commit comments

Comments
 (0)