From 2b385b5014d28d03a883d369a6528e2a0f4e5fa1 Mon Sep 17 00:00:00 2001
From: Venkata krishnan Sowrirajan
Date: Tue, 21 Apr 2020 16:14:52 -0700
Subject: [PATCH 01/12] [SPARK-31418][SCHEDULER] Request more executors when
 dynamic allocation is enabled and a task becomes unschedulable due to Spark's
 blacklisting feature.

With this change, when dynamic allocation is enabled, instead of immediately
aborting an unschedulable blacklisted task we post an
UnschedulableBlacklistTaskSubmitted event through the SparkListener bus. The
event is handled by ExecutorAllocationManager, which requests more executors
so that the unschedulable blacklisted task can be scheduled. Once the event is
sent, we start the abortTimer, similar to [SPARK-22148][SPARK-15815].

Currently this has been manually tested in our clusters; unit tests are still
being worked out.
---
 .../apache/spark/SparkFirehoseListener.java   |  5 +++
 .../spark/ExecutorAllocationManager.scala     | 45 +++++++++++++++++--
 .../apache/spark/scheduler/DAGScheduler.scala | 20 +++++++++
 .../spark/scheduler/DAGSchedulerEvent.scala   |  3 ++
 .../spark/scheduler/SparkListener.scala       | 14 ++++++
 .../spark/scheduler/SparkListenerBus.scala    |  2 +
 .../spark/scheduler/TaskSchedulerImpl.scala   | 36 +++++++++++++--
 .../ExecutorAllocationManagerSuite.scala      | 33 ++++++++++++++
 8 files changed, 151 insertions(+), 7 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index 579e7ff320f5c..618746396bce3 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -162,6 +162,11 @@ public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted spe
     onEvent(speculativeTask);
   }
 
+  public void onUnschedulableBlacklistTaskSubmitted(
+      SparkListenerUnschedulableBlacklistTaskSubmitted blacklistTask) {
+    onEvent(blacklistTask);
+  }
+
   @Override
   public void onResourceProfileAdded(SparkListenerResourceProfileAdded event) {
     onEvent(event);
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 620a6fe2f9d72..6f8e6f479fa31 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -622,6 +622,12 @@ private[spark] class ExecutorAllocationManager(
    private val resourceProfileIdToStageAttempt =
      new mutable.HashMap[Int, mutable.Set[StageAttempt]]

+    // Keep track of unschedulable task sets due to blacklisting. This is a Set of StageAttempt's
+    // because we'll only take the last unschedulable task in a taskset although there can be more.
+    // This is done in order to avoid costly loops in the scheduling.
+    // Check TaskSetManager#getCompletelyBlacklistedTaskIfAny for more details.
+ private val unschedulableTaskSets = new mutable.HashSet[StageAttempt] + // stageAttempt to tuple (the number of task with locality preferences, a map where each pair // is a node and the number of tasks that would like to be scheduled on that node, and // the resource profile id) map, @@ -696,7 +702,9 @@ private[spark] class ExecutorAllocationManager( // If this is the last stage with pending tasks, mark the scheduler queue as empty // This is needed in case the stage is aborted for any reason - if (stageAttemptToNumTasks.isEmpty && stageAttemptToNumSpeculativeTasks.isEmpty) { + if (stageAttemptToNumTasks.isEmpty && + stageAttemptToNumSpeculativeTasks.isEmpty && + unschedulableTaskSets.isEmpty) { allocationManager.onSchedulerQueueEmpty() } } @@ -789,6 +797,23 @@ private[spark] class ExecutorAllocationManager( } } + override def onUnschedulableBlacklistTaskSubmitted + (blacklistedTask: SparkListenerUnschedulableBlacklistTaskSubmitted): Unit = { + val stageId = blacklistedTask.stageId + val stageAttemptId = blacklistedTask.stageAttemptId + allocationManager.synchronized { + (stageId, stageAttemptId) match { + case (Some(stageId), Some(stageAttemptId)) => + val stageAttempt = StageAttempt(stageId, stageAttemptId) + unschedulableTaskSets.add(stageAttempt) + case (None, None) => + // Clear unschedulableTaskSets since atleast one task becomes schedulable now + unschedulableTaskSets.clear() + } + allocationManager.onSchedulerBacklogged() + } + } + /** * An estimate of the total number of pending tasks remaining for currently running stages. Does * not account for tasks which may have failed and been resubmitted. @@ -829,12 +854,26 @@ private[spark] class ExecutorAllocationManager( numTotalTasks - numRunning } + def pendingUnschedulableTasksPerResourceProfile(rp: Int): Int = { + val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq + attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size + } + + def hasPendingUnschedulableTasks: Boolean = { + val attemptSets = resourceProfileIdToStageAttempt.values + attemptSets.exists { attempts => + attempts.exists(unschedulableTaskSets.contains(_)) + } + } + def hasPendingTasks: Boolean = { - hasPendingSpeculativeTasks || hasPendingRegularTasks + hasPendingSpeculativeTasks || hasPendingRegularTasks || hasPendingUnschedulableTasks } def totalPendingTasksPerResourceProfile(rp: Int): Int = { - pendingTasksPerResourceProfile(rp) + pendingSpeculativeTasksPerResourceProfile(rp) + pendingTasksPerResourceProfile(rp) + + pendingSpeculativeTasksPerResourceProfile(rp) + + pendingUnschedulableTasksPerResourceProfile(rp) } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index cb024d0852d06..ddf1b29b1b2bc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -311,6 +311,16 @@ private[spark] class DAGScheduler( eventProcessLoop.post(SpeculativeTaskSubmitted(task)) } + /** + * Called by the TaskSetManager when there is an unschedulable blacklist task and dynamic + * allocation is enabled + */ + def unschedulableBlacklistTaskSubmitted( + stageId: Option[Int], + stageAttemptId: Option[Int]): Unit = { + eventProcessLoop.post(UnschedulableBlacklistTaskSubmitted(stageId, stageAttemptId)) + } + private[scheduler] def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized { // Note: this doesn't use `getOrElse()` 
because this method is called O(num tasks) times @@ -1014,6 +1024,13 @@ private[spark] class DAGScheduler( listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId)) } + private[scheduler] def handleUnschedulableBlacklistTaskSubmitted( + stageId: Option[Int], + stageAttemptId: Option[Int]): Unit = { + listenerBus.post( + SparkListenerUnschedulableBlacklistTaskSubmitted(stageId, stageAttemptId)) + } + private[scheduler] def handleTaskSetFailed( taskSet: TaskSet, reason: String, @@ -2287,6 +2304,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler case SpeculativeTaskSubmitted(task) => dagScheduler.handleSpeculativeTaskSubmitted(task) + case UnschedulableBlacklistTaskSubmitted(stageId, stageAttemptId) => + dagScheduler.handleUnschedulableBlacklistTaskSubmitted(stageId, stageAttemptId) + case GettingResultEvent(taskInfo) => dagScheduler.handleGetTaskResult(taskInfo) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index 78d458338e8fb..e4576f5ed4332 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -97,3 +97,6 @@ private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent private[scheduler] case class SpeculativeTaskSubmitted(task: Task[_]) extends DAGSchedulerEvent +private[scheduler] +case class UnschedulableBlacklistTaskSubmitted(stageId: Option[Int], stageAttemptId: Option[Int]) + extends DAGSchedulerEvent diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 62d54f3b74a47..005421068e072 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -157,6 +157,11 @@ case class SparkListenerNodeBlacklisted( case class SparkListenerNodeUnblacklisted(time: Long, hostId: String) extends SparkListenerEvent +@DeveloperApi +case class SparkListenerUnschedulableBlacklistTaskSubmitted( + stageId: Option[Int], + stageAttemptId: Option[Int]) extends SparkListenerEvent + @DeveloperApi case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends SparkListenerEvent @@ -339,6 +344,12 @@ private[spark] trait SparkListenerInterface { */ def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit + /** + * Called when both dynamic allocation is enabled and there is an unschedulable blacklist task + */ + def onUnschedulableBlacklistTaskSubmitted( + blacklistTask: SparkListenerUnschedulableBlacklistTaskSubmitted): Unit + /** * Called when the driver receives a block update info. 
*/ @@ -425,6 +436,9 @@ abstract class SparkListener extends SparkListenerInterface { override def onNodeUnblacklisted( nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { } + override def onUnschedulableBlacklistTaskSubmitted( + blacklistTask: SparkListenerUnschedulableBlacklistTaskSubmitted): Unit = { } + override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { } override def onSpeculativeTaskSubmitted( diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index 3d316c948db7e..e99c0722612b0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -79,6 +79,8 @@ private[spark] trait SparkListenerBus listener.onBlockUpdated(blockUpdated) case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted => listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted) + case blacklistTaskSubmitted: SparkListenerUnschedulableBlacklistTaskSubmitted => + listener.onUnschedulableBlacklistTaskSubmitted(blacklistTaskSubmitted) case resourceProfileAdded: SparkListenerResourceProfileAdded => listener.onResourceProfileAdded(resourceProfileAdded) case _ => listener.onOtherEvent(event) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 12bd93286d736..dcde00f48351f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -660,10 +660,24 @@ private[spark] class TaskSchedulerImpl( abortTimer.schedule( createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout) } - case None => // Abort Immediately - logInfo("Cannot schedule any task because of complete blacklisting. No idle" + - s" executors can be found to kill. Aborting $taskSet." ) - taskSet.abortSinceCompletelyBlacklisted(taskIndex) + case None => + // Notify ExecutorAllocationManager about the unschedulable task set, + // in order to provision more executors to make them schedulable + if (Utils.isDynamicAllocationEnabled(conf)) { + if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { + logInfo(s"Requesting additional executors to schedule unschedulable" + + s" blacklisted task before aborting $taskSet.") + dagScheduler.unschedulableBlacklistTaskSubmitted( + Some(taskSet.taskSet.stageId), + Some(taskSet.taskSet.stageAttemptId)) + updateUnschedulableTaskSetTimeoutAndStartAbortTimer(taskSet, taskIndex) + } + } else { + // Abort Immediately + logInfo("Cannot schedule any task because of complete blacklisting. No idle" + + s" executors can be found to kill. 
Aborting $taskSet.") + taskSet.abortSinceCompletelyBlacklisted(taskIndex) + } } } } else { @@ -676,6 +690,9 @@ private[spark] class TaskSchedulerImpl( if (unschedulableTaskSetToExpiryTime.nonEmpty) { logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " + "recently scheduled.") + // Notify ExecutorAllocationManager as well as other subscribers that a task now + // recently becomes schedulable + dagScheduler.unschedulableBlacklistTaskSubmitted(None, None) unschedulableTaskSetToExpiryTime.clear() } } @@ -722,6 +739,17 @@ private[spark] class TaskSchedulerImpl( return tasks.map(_.toSeq) } + private def updateUnschedulableTaskSetTimeoutAndStartAbortTimer( + taskSet: TaskSetManager, + taskIndex: Int): Unit = { + val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000 + unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout + logInfo(s"Waiting for $timeout ms for completely " + + s"blacklisted task to be schedulable again before aborting $taskSet.") + abortTimer.schedule( + createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout) + } + private def createUnschedulableTaskSetAbortTimer( taskSet: TaskSetManager, taskIndex: Int): TimerTask = { diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 8037f4a9447dd..a1f82efe2bf87 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -501,6 +501,39 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { assert(numExecutorsToAddForDefaultProfile(manager) === 1) } + test("SPARK-31418: add executors when unschedulable tasks added") { + val manager = createManager(createConf(0, 10, 0)) + + val updatesNeeded = + new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates] + + post(SparkListenerStageSubmitted(createStageInfo(1, 2))) + // Verify that we're capped at number of tasks including the unschedulable ones in the stage + post(SparkListenerUnschedulableBlacklistTaskSubmitted(Some(1), Some(1))) + assert(numExecutorsTargetForDefaultProfileId(manager) === 0) + assert(numExecutorsToAddForDefaultProfile(manager) === 1) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1) + doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) + post(SparkListenerUnschedulableBlacklistTaskSubmitted(None, None)) + assert(numExecutorsTargetForDefaultProfileId(manager) === 1) + assert(numExecutorsToAddForDefaultProfile(manager) === 2) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1) + doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) + assert(numExecutorsTargetForDefaultProfileId(manager) === 2) + assert(numExecutorsToAddForDefaultProfile(manager) === 1) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0) + doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) + assert(numExecutorsTargetForDefaultProfileId(manager) === 2) + assert(numExecutorsToAddForDefaultProfile(manager) === 2) + + // Verify that running a task doesn't affect the target + post(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) + assert(numExecutorsTargetForDefaultProfileId(manager) === 2) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0) + doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) + 
assert(numExecutorsToAddForDefaultProfile(manager) === 2) + } + test("SPARK-30511 remove executors when speculative tasks end") { val clock = new ManualClock() val stage = createStageInfo(0, 40) From 213bbef01e57532ba2b09d2fff0d767973bba3b8 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Sat, 20 Jun 2020 09:45:31 -0700 Subject: [PATCH 02/12] better log message --- .../apache/spark/scheduler/TaskSchedulerImpl.scala | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index dcde00f48351f..27abc225dd45f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -652,21 +652,15 @@ private[spark] class TaskSchedulerImpl( case Some ((executorId, _)) => if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId)) - - val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000 - unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout - logInfo(s"Waiting for $timeout ms for completely " - + s"blacklisted task to be schedulable again before aborting $taskSet.") - abortTimer.schedule( - createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout) + updateUnschedulableTaskSetTimeoutAndStartAbortTimer(taskSet, taskIndex) } case None => // Notify ExecutorAllocationManager about the unschedulable task set, // in order to provision more executors to make them schedulable if (Utils.isDynamicAllocationEnabled(conf)) { if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { - logInfo(s"Requesting additional executors to schedule unschedulable" + - s" blacklisted task before aborting $taskSet.") + logInfo(s"Notifying ExecutorAllocationManager to add executors to" + + s" schedule the unschedulable blacklisted task before aborting $taskSet.") dagScheduler.unschedulableBlacklistTaskSubmitted( Some(taskSet.taskSet.stageId), Some(taskSet.taskSet.stageAttemptId)) From de05589342aac46fb89ae8452af01a0218cc1006 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Mon, 22 Jun 2020 23:32:37 -0700 Subject: [PATCH 03/12] Added one more test --- .../spark/ExecutorAllocationManager.scala | 2 +- .../ExecutorAllocationManagerSuite.scala | 73 ++++++++++++++++++- 2 files changed, 73 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 6f8e6f479fa31..bf30bdbac1de4 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -806,7 +806,7 @@ private[spark] class ExecutorAllocationManager( case (Some(stageId), Some(stageAttemptId)) => val stageAttempt = StageAttempt(stageId, stageAttemptId) unschedulableTaskSets.add(stageAttempt) - case (None, None) => + case (None, _) => // Clear unschedulableTaskSets since atleast one task becomes schedulable now unschedulableTaskSets.clear() } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index a1f82efe2bf87..10b134b029926 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -509,7 +509,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { post(SparkListenerStageSubmitted(createStageInfo(1, 2))) // Verify that we're capped at number of tasks including the unschedulable ones in the stage - post(SparkListenerUnschedulableBlacklistTaskSubmitted(Some(1), Some(1))) + post(SparkListenerUnschedulableBlacklistTaskSubmitted(Some(1), Some(0))) assert(numExecutorsTargetForDefaultProfileId(manager) === 0) assert(numExecutorsToAddForDefaultProfile(manager) === 1) assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1) @@ -534,6 +534,77 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { assert(numExecutorsToAddForDefaultProfile(manager) === 2) } + test("SPARK-31418: remove executors after unschedulable tasks end") { + val clock = new ManualClock() + val stage = createStageInfo(0, 10) + val conf = createConf(0, 6, 0).set(config.EXECUTOR_CORES, 2) + val manager = createManager(conf, clock = clock) + val updatesNeeded = + new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates] + + post(SparkListenerStageSubmitted(stage)) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1) + doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2) + doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2) + doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0) + doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) + + (0 to 4).foreach(execId => onExecutorAddedDefaultProfile(manager, execId.toString)) + (0 to 9).map { i => createTaskInfo(i, i, executorId = s"${i / 2}") }.foreach { + info => post(SparkListenerTaskStart(0, 0, info)) + } + assert(numExecutorsTarget(manager, defaultProfile.id) === 5) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 5) + + // 8 tasks (0 - 7) finished + (0 to 7).map { i => createTaskInfo(i, i, executorId = s"${i / 2}") }.foreach { + info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null)) + } + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + assert(numExecutorsTarget(manager, defaultProfile.id) === 1) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1) + (0 to 3).foreach { i => assert(removeExecutorDefaultProfile(manager, i.toString)) } + (0 to 3).foreach { i => onExecutorRemoved(manager, i.toString) } + + // Now due to blacklisting, the task becomes unschedulable + post(SparkListenerUnschedulableBlacklistTaskSubmitted(Some(0), Some(0))) + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + assert(numExecutorsTarget(manager, defaultProfile.id) === 2) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 2) + + // New executor got added + onExecutorAddedDefaultProfile(manager, "5") + + // Now once the task becomes schedulable, clear the unschedulableTaskSets + // by posting unschedulable event with (None, None) + post(SparkListenerUnschedulableBlacklistTaskSubmitted(None, None)) + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + assert(numExecutorsTarget(manager, 
defaultProfile.id) === 1) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1) + post(SparkListenerTaskEnd(0, 0, null, Success, + createTaskInfo(9, 9, "4"), new ExecutorMetrics, null)) + // Unschedulable task successfully ran on the new executor provisioned + post(SparkListenerTaskEnd(0, 0, null, Success, + createTaskInfo(8, 8, "5"), new ExecutorMetrics, null)) + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + post(SparkListenerStageCompleted(stage)) + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + assert(numExecutorsTarget(manager, defaultProfile.id) === 0) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 0) + assert(removeExecutorDefaultProfile(manager, "4")) + onExecutorRemoved(manager, "4") + assert(removeExecutorDefaultProfile(manager, "5")) + onExecutorRemoved(manager, "5") + } + test("SPARK-30511 remove executors when speculative tasks end") { val clock = new ManualClock() val stage = createStageInfo(0, 40) From 7f0fba15ca5f0d1cdf6a1b7ba7321865fd91f5d4 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Tue, 23 Jun 2020 11:00:48 -0700 Subject: [PATCH 04/12] fix to avoid the corner case where we might not request any executor --- .../org/apache/spark/ExecutorAllocationManager.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index bf30bdbac1de4..bcbdea7e7be6e 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -856,7 +856,14 @@ private[spark] class ExecutorAllocationManager( def pendingUnschedulableTasksPerResourceProfile(rp: Int): Int = { val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq - attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size + val numUnschedulables = attempts + .filter(attempt => unschedulableTaskSets.contains(attempt)).size + val maxTasksPerExecutor = + resourceProfileManager.resourceProfileFromId(rp).maxTasksPerExecutor(conf) + // Need an additional executor since the unschedulableTasks cannot be currently + // scheduled on the available executors. This is to ensure that we'll always + // request for an additional executor. 
+ numUnschedulables + maxTasksPerExecutor } def hasPendingUnschedulableTasks: Boolean = { From 060b37e72e8730fcaa695ba69ffddea8da105dce Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Wed, 24 Jun 2020 20:26:44 -0700 Subject: [PATCH 05/12] Handle corner cases --- .../spark/ExecutorAllocationManager.scala | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index bcbdea7e7be6e..51d20a1bcb2b9 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -281,6 +281,7 @@ private[spark] class ExecutorAllocationManager( private def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = { val pending = listener.totalPendingTasksPerResourceProfile(rpId) val pendingSpeculative = listener.pendingSpeculativeTasksPerResourceProfile(rpId) + val numUnschedulables = listener.pendingUnschedulableTasksPerResourceProfile(rpId) val running = listener.totalRunningTasksPerResourceProfile(rpId) val numRunningOrPendingTasks = pending + running val rp = resourceProfileManager.resourceProfileFromId(rpId) @@ -289,13 +290,23 @@ private[spark] class ExecutorAllocationManager( s" tasksperexecutor: $tasksPerExecutor") val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio / tasksPerExecutor).toInt - if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) { + val totalNeed = if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) { // If we have pending speculative tasks and only need a single executor, allocate one more // to satisfy the locality requirements of speculation maxNeeded + 1 } else { maxNeeded } + + // Request additional executors to schedule the unschedulable tasks as well + if (numUnschedulables > 0) { + val maxNeededForUnschedulables = math.ceil(numUnschedulables * executorAllocationRatio / + tasksPerExecutor).toInt + math.max(totalNeed, executorMonitor.executorCountWithResourceProfile(rpId)) + + maxNeededForUnschedulables + } else { + totalNeed + } } private def totalRunningTasksPerResourceProfile(id: Int): Int = synchronized { @@ -856,14 +867,7 @@ private[spark] class ExecutorAllocationManager( def pendingUnschedulableTasksPerResourceProfile(rp: Int): Int = { val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq - val numUnschedulables = attempts - .filter(attempt => unschedulableTaskSets.contains(attempt)).size - val maxTasksPerExecutor = - resourceProfileManager.resourceProfileFromId(rp).maxTasksPerExecutor(conf) - // Need an additional executor since the unschedulableTasks cannot be currently - // scheduled on the available executors. This is to ensure that we'll always - // request for an additional executor. 
- numUnschedulables + maxTasksPerExecutor + attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size } def hasPendingUnschedulableTasks: Boolean = { @@ -879,8 +883,7 @@ private[spark] class ExecutorAllocationManager( def totalPendingTasksPerResourceProfile(rp: Int): Int = { pendingTasksPerResourceProfile(rp) + - pendingSpeculativeTasksPerResourceProfile(rp) + - pendingUnschedulableTasksPerResourceProfile(rp) + pendingSpeculativeTasksPerResourceProfile(rp) } /** From 432e3cd9b469bcc2a05b72c27c1d643a3288a459 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Wed, 1 Jul 2020 17:53:29 -0700 Subject: [PATCH 06/12] addressed Erik's comments --- .../spark/ExecutorAllocationManager.scala | 13 +- .../ExecutorAllocationManagerSuite.scala | 116 ++++++++++++++---- 2 files changed, 100 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 51d20a1bcb2b9..bfa44be650524 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -290,7 +290,8 @@ private[spark] class ExecutorAllocationManager( s" tasksperexecutor: $tasksPerExecutor") val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio / tasksPerExecutor).toInt - val totalNeed = if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) { + val maxNeededWithSpeculationLocalityOffset = + if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) { // If we have pending speculative tasks and only need a single executor, allocate one more // to satisfy the locality requirements of speculation maxNeeded + 1 @@ -298,14 +299,16 @@ private[spark] class ExecutorAllocationManager( maxNeeded } - // Request additional executors to schedule the unschedulable tasks as well + // Since the maxNeededWithSpeculationLocalityOffset already includes the num executors needed + // to run unschedulable tasks, we would only try to add when the + // maxNeededWithSpeculationLocalityOffset is lesser than the active executors if (numUnschedulables > 0) { val maxNeededForUnschedulables = math.ceil(numUnschedulables * executorAllocationRatio / tasksPerExecutor).toInt - math.max(totalNeed, executorMonitor.executorCountWithResourceProfile(rpId)) + - maxNeededForUnschedulables + math.max(maxNeededWithSpeculationLocalityOffset, + executorMonitor.executorCountWithResourceProfile(rpId) + maxNeededForUnschedulables) } else { - totalNeed + maxNeededWithSpeculationLocalityOffset } } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 10b134b029926..395c4a28a92c3 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -21,15 +21,15 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable -import org.mockito.ArgumentMatchers.{any, eq => meq} -import org.mockito.Mockito.{mock, never, times, verify, when} +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito._ import org.scalatest.PrivateMethodTester import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.config import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL import org.apache.spark.metrics.MetricsSystem -import 
org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder, ResourceProfileManager, TaskResourceRequests} +import org.apache.spark.resource._ import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.ExecutorInfo @@ -501,37 +501,105 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { assert(numExecutorsToAddForDefaultProfile(manager) === 1) } - test("SPARK-31418: add executors when unschedulable tasks added") { - val manager = createManager(createConf(0, 10, 0)) - + test("SPARK-31418: one stage being unschedulable") { + val clock = new ManualClock() + val conf = createConf(0, 5, 0).set(config.EXECUTOR_CORES, 2) + val manager = createManager(conf, clock = clock) val updatesNeeded = new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates] - post(SparkListenerStageSubmitted(createStageInfo(1, 2))) - // Verify that we're capped at number of tasks including the unschedulable ones in the stage - post(SparkListenerUnschedulableBlacklistTaskSubmitted(Some(1), Some(0))) - assert(numExecutorsTargetForDefaultProfileId(manager) === 0) - assert(numExecutorsToAddForDefaultProfile(manager) === 1) + post(SparkListenerStageSubmitted(createStageInfo(0, 2))) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1) doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0) + + onExecutorAddedDefaultProfile(manager, "0") + val t1 = createTaskInfo(0, 0, executorId = s"0") + val t2 = createTaskInfo(1, 1, executorId = s"0") + post(SparkListenerTaskStart(0, 0, t1)) + post(SparkListenerTaskStart(0, 0, t2)) + + assert(numExecutorsTarget(manager, defaultProfile.id) === 1) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1) + + // Stage 0 becomes unschedulable due to blacklisting + post(SparkListenerUnschedulableBlacklistTaskSubmitted(Some(0), Some(0))) + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + // Assert that we are getting additional executor to schedule unschedulable tasks + assert(numExecutorsTarget(manager, defaultProfile.id) === 2) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 2) + + // Add a new executor + onExecutorAddedDefaultProfile(manager, "1") + // Now once the task becomes schedulable, clear the unschedulableTaskSets + // by posting unschedulable event with (None, None) post(SparkListenerUnschedulableBlacklistTaskSubmitted(None, None)) - assert(numExecutorsTargetForDefaultProfileId(manager) === 1) - assert(numExecutorsToAddForDefaultProfile(manager) === 2) + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + assert(numExecutorsTarget(manager, defaultProfile.id) === 1) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1) + } + + test("SPARK-31418: multiple stages being unschedulable") { + val clock = new ManualClock() + val conf = createConf(0, 10, 0).set(config.EXECUTOR_CORES, 2) + val manager = createManager(conf, clock = clock) + val updatesNeeded = + new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates] + + post(SparkListenerStageSubmitted(createStageInfo(0, 2))) + post(SparkListenerStageSubmitted(createStageInfo(1, 2))) + post(SparkListenerStageSubmitted(createStageInfo(2, 2))) + 
assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1) doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) - assert(numExecutorsTargetForDefaultProfileId(manager) === 2) - assert(numExecutorsToAddForDefaultProfile(manager) === 1) - assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2) doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) - assert(numExecutorsTargetForDefaultProfileId(manager) === 2) - assert(numExecutorsToAddForDefaultProfile(manager) === 2) - - // Verify that running a task doesn't affect the target - post(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) - assert(numExecutorsTargetForDefaultProfileId(manager) === 2) assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0) - doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) - assert(numExecutorsToAddForDefaultProfile(manager) === 2) + + // Add necessary executors + (0 to 2).foreach(execId => onExecutorAddedDefaultProfile(manager, execId.toString)) + + // Start all the tasks + (0 to 2).foreach { + i => + val t1Info = createTaskInfo(0, (i * 2) + 1, executorId = s"${i / 2}") + val t2Info = createTaskInfo(1, (i * 2) + 2, executorId = s"${i / 2}") + post(SparkListenerTaskStart(i, 0, t1Info)) + post(SparkListenerTaskStart(i, 0, t2Info)) + } + assert(numExecutorsTarget(manager, defaultProfile.id) === 3) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 3) + + // Complete the stage 0 tasks. + val t1Info = createTaskInfo(0, 0, executorId = s"0") + val t2Info = createTaskInfo(1, 1, executorId = s"0") + post(SparkListenerTaskEnd(0, 0, null, Success, t1Info, new ExecutorMetrics, null)) + post(SparkListenerTaskEnd(0, 0, null, Success, t2Info, new ExecutorMetrics, null)) + post(SparkListenerStageCompleted(createStageInfo(0, 2))) + + // Stage 1 and 2 becomes unschedulable now due to blacklisting + post(SparkListenerUnschedulableBlacklistTaskSubmitted(Some(1), Some(0))) + post(SparkListenerUnschedulableBlacklistTaskSubmitted(Some(2), Some(0))) + + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + // Assert that we are getting additional executor to schedule unschedulable tasks + assert(numExecutorsTarget(manager, defaultProfile.id) === 4) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 4) + + // Add a new executor + onExecutorAddedDefaultProfile(manager, "3") + + // Now once the task becomes schedulable, clear the unschedulableTaskSets + // by posting unschedulable event with (None, None) + post(SparkListenerUnschedulableBlacklistTaskSubmitted(None, None)) + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + assert(numExecutorsTarget(manager, defaultProfile.id) === 2) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 2) } test("SPARK-31418: remove executors after unschedulable tasks end") { From 8a00d5b60799f1354728f39c08627296fc83c449 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Sat, 4 Jul 2020 12:27:12 -0700 Subject: [PATCH 07/12] Addressed Mridul's comments --- .../apache/spark/SparkFirehoseListener.java | 6 +++--- .../spark/ExecutorAllocationManager.scala | 18 ++++++++++-------- .../apache/spark/scheduler/DAGScheduler.scala | 2 +- .../apache/spark/scheduler/SparkListener.scala | 10 +++++----- .../spark/scheduler/SparkListenerBus.scala | 4 
++-- .../spark/ExecutorAllocationManagerSuite.scala | 14 +++++++------- 6 files changed, 28 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java index 618746396bce3..9e4601114fd7b 100644 --- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java +++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java @@ -162,9 +162,9 @@ public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted spe onEvent(speculativeTask); } - public void onUnschedulableBlacklistTaskSubmitted( - SparkListenerUnschedulableBlacklistTaskSubmitted blacklistTask) { - onEvent(blacklistTask); + public void onUnschedulableTaskSet( + SparkListenerUnschedulableTaskSet unschedulableTaskSet) { + onEvent(unschedulableTaskSet); } @Override diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index bfa44be650524..6b242fe33cff9 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -281,7 +281,7 @@ private[spark] class ExecutorAllocationManager( private def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = { val pending = listener.totalPendingTasksPerResourceProfile(rpId) val pendingSpeculative = listener.pendingSpeculativeTasksPerResourceProfile(rpId) - val numUnschedulables = listener.pendingUnschedulableTasksPerResourceProfile(rpId) + val numUnschedulables = listener.pendingUnschedulableTaskSetsPerResourceProfile(rpId) val running = listener.totalRunningTasksPerResourceProfile(rpId) val numRunningOrPendingTasks = pending + running val rp = resourceProfileManager.resourceProfileFromId(rpId) @@ -717,8 +717,7 @@ private[spark] class ExecutorAllocationManager( // If this is the last stage with pending tasks, mark the scheduler queue as empty // This is needed in case the stage is aborted for any reason if (stageAttemptToNumTasks.isEmpty && - stageAttemptToNumSpeculativeTasks.isEmpty && - unschedulableTaskSets.isEmpty) { + stageAttemptToNumSpeculativeTasks.isEmpty) { allocationManager.onSchedulerQueueEmpty() } } @@ -811,10 +810,10 @@ private[spark] class ExecutorAllocationManager( } } - override def onUnschedulableBlacklistTaskSubmitted - (blacklistedTask: SparkListenerUnschedulableBlacklistTaskSubmitted): Unit = { - val stageId = blacklistedTask.stageId - val stageAttemptId = blacklistedTask.stageAttemptId + override def onUnschedulableTaskSet + (unschedulableTaskSet: SparkListenerUnschedulableTaskSet): Unit = { + val stageId = unschedulableTaskSet.stageId + val stageAttemptId = unschedulableTaskSet.stageAttemptId allocationManager.synchronized { (stageId, stageAttemptId) match { case (Some(stageId), Some(stageAttemptId)) => @@ -868,7 +867,10 @@ private[spark] class ExecutorAllocationManager( numTotalTasks - numRunning } - def pendingUnschedulableTasksPerResourceProfile(rp: Int): Int = { + // Currently TaskSetManager.getCompletelyBlacklistedTaskIfAny only takes the first + // unschedulable task due to blacklisting. So keeping track of unschedulableTaskSets + // should be enough as we'll always have no more than a task unschedulable at any time. 
+ def pendingUnschedulableTaskSetsPerResourceProfile(rp: Int): Int = { val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index ddf1b29b1b2bc..46944cdccfe64 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1028,7 +1028,7 @@ private[spark] class DAGScheduler( stageId: Option[Int], stageAttemptId: Option[Int]): Unit = { listenerBus.post( - SparkListenerUnschedulableBlacklistTaskSubmitted(stageId, stageAttemptId)) + SparkListenerUnschedulableTaskSet(stageId, stageAttemptId)) } private[scheduler] def handleTaskSetFailed( diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 005421068e072..83aa438e95f1b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -158,7 +158,7 @@ case class SparkListenerNodeUnblacklisted(time: Long, hostId: String) extends SparkListenerEvent @DeveloperApi -case class SparkListenerUnschedulableBlacklistTaskSubmitted( +case class SparkListenerUnschedulableTaskSet( stageId: Option[Int], stageAttemptId: Option[Int]) extends SparkListenerEvent @@ -347,8 +347,8 @@ private[spark] trait SparkListenerInterface { /** * Called when both dynamic allocation is enabled and there is an unschedulable blacklist task */ - def onUnschedulableBlacklistTaskSubmitted( - blacklistTask: SparkListenerUnschedulableBlacklistTaskSubmitted): Unit + def onUnschedulableTaskSet( + blacklistTask: SparkListenerUnschedulableTaskSet): Unit /** * Called when the driver receives a block update info. 
@@ -436,8 +436,8 @@ abstract class SparkListener extends SparkListenerInterface { override def onNodeUnblacklisted( nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { } - override def onUnschedulableBlacklistTaskSubmitted( - blacklistTask: SparkListenerUnschedulableBlacklistTaskSubmitted): Unit = { } + override def onUnschedulableTaskSet( + unschedulableTaskSet: SparkListenerUnschedulableTaskSet): Unit = { } override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index e99c0722612b0..71e1086d2149c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -79,8 +79,8 @@ private[spark] trait SparkListenerBus listener.onBlockUpdated(blockUpdated) case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted => listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted) - case blacklistTaskSubmitted: SparkListenerUnschedulableBlacklistTaskSubmitted => - listener.onUnschedulableBlacklistTaskSubmitted(blacklistTaskSubmitted) + case unschedulableTaskSet: SparkListenerUnschedulableTaskSet => + listener.onUnschedulableTaskSet(unschedulableTaskSet) case resourceProfileAdded: SparkListenerResourceProfileAdded => listener.onResourceProfileAdded(resourceProfileAdded) case _ => listener.onOtherEvent(event) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 395c4a28a92c3..c1104783d4c25 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -524,7 +524,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1) // Stage 0 becomes unschedulable due to blacklisting - post(SparkListenerUnschedulableBlacklistTaskSubmitted(Some(0), Some(0))) + post(SparkListenerUnschedulableTaskSet(Some(0), Some(0))) clock.advance(1000) manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) // Assert that we are getting additional executor to schedule unschedulable tasks @@ -535,7 +535,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { onExecutorAddedDefaultProfile(manager, "1") // Now once the task becomes schedulable, clear the unschedulableTaskSets // by posting unschedulable event with (None, None) - post(SparkListenerUnschedulableBlacklistTaskSubmitted(None, None)) + post(SparkListenerUnschedulableTaskSet(None, None)) clock.advance(1000) manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) assert(numExecutorsTarget(manager, defaultProfile.id) === 1) @@ -581,8 +581,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { post(SparkListenerStageCompleted(createStageInfo(0, 2))) // Stage 1 and 2 becomes unschedulable now due to blacklisting - post(SparkListenerUnschedulableBlacklistTaskSubmitted(Some(1), Some(0))) - post(SparkListenerUnschedulableBlacklistTaskSubmitted(Some(2), Some(0))) + post(SparkListenerUnschedulableTaskSet(Some(1), Some(0))) + post(SparkListenerUnschedulableTaskSet(Some(2), Some(0))) clock.advance(1000) manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) @@ -595,7 +595,7 @@ class ExecutorAllocationManagerSuite 
extends SparkFunSuite { // Now once the task becomes schedulable, clear the unschedulableTaskSets // by posting unschedulable event with (None, None) - post(SparkListenerUnschedulableBlacklistTaskSubmitted(None, None)) + post(SparkListenerUnschedulableTaskSet(None, None)) clock.advance(1000) manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) assert(numExecutorsTarget(manager, defaultProfile.id) === 2) @@ -639,7 +639,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { (0 to 3).foreach { i => onExecutorRemoved(manager, i.toString) } // Now due to blacklisting, the task becomes unschedulable - post(SparkListenerUnschedulableBlacklistTaskSubmitted(Some(0), Some(0))) + post(SparkListenerUnschedulableTaskSet(Some(0), Some(0))) clock.advance(1000) manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) assert(numExecutorsTarget(manager, defaultProfile.id) === 2) @@ -650,7 +650,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { // Now once the task becomes schedulable, clear the unschedulableTaskSets // by posting unschedulable event with (None, None) - post(SparkListenerUnschedulableBlacklistTaskSubmitted(None, None)) + post(SparkListenerUnschedulableTaskSet(None, None)) clock.advance(1000) manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) assert(numExecutorsTarget(manager, defaultProfile.id) === 1) From 7b4140b14cd02b4b424416f85fb1f04a7c1a9182 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Wed, 8 Jul 2020 16:48:47 -0700 Subject: [PATCH 08/12] Addressed tgraves and mridulm comments --- .../apache/spark/SparkFirehoseListener.java | 11 ++-- .../spark/ExecutorAllocationManager.scala | 50 +++++++++++-------- .../apache/spark/scheduler/DAGScheduler.scala | 42 ++++++++++++---- .../spark/scheduler/DAGSchedulerEvent.scala | 7 ++- .../spark/scheduler/SparkListener.scala | 32 +++++++++--- .../spark/scheduler/SparkListenerBus.scala | 6 ++- .../spark/scheduler/TaskSchedulerImpl.scala | 23 +++++---- .../ExecutorAllocationManagerSuite.scala | 21 ++++---- 8 files changed, 123 insertions(+), 69 deletions(-) diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java index 9e4601114fd7b..c0e72b57d48bd 100644 --- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java +++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java @@ -162,9 +162,14 @@ public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted spe onEvent(speculativeTask); } - public void onUnschedulableTaskSet( - SparkListenerUnschedulableTaskSet unschedulableTaskSet) { - onEvent(unschedulableTaskSet); + public void onUnschedulableTaskSetAdded( + SparkListenerUnschedulableTaskSetAdded unschedulableTaskSetAdded) { + onEvent(unschedulableTaskSetAdded); + } + + public void onUnschedulableTaskSetRemoved( + SparkListenerUnschedulableTaskSetRemoved unschedulableTaskSetRemoved) { + onEvent(unschedulableTaskSetRemoved); } @Override diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 6b242fe33cff9..c5121df5c7a60 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -281,7 +281,7 @@ private[spark] class ExecutorAllocationManager( private def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = { val pending = 
listener.totalPendingTasksPerResourceProfile(rpId)
     val pendingSpeculative = listener.pendingSpeculativeTasksPerResourceProfile(rpId)
-    val numUnschedulables = listener.pendingUnschedulableTaskSetsPerResourceProfile(rpId)
+    val unschedulableTaskSets = listener.pendingUnschedulableTaskSetsPerResourceProfile(rpId)
     val running = listener.totalRunningTasksPerResourceProfile(rpId)
     val numRunningOrPendingTasks = pending + running
     val rp = resourceProfileManager.resourceProfileFromId(rpId)
@@ -290,6 +290,7 @@ private[spark] class ExecutorAllocationManager(
       s" tasksperexecutor: $tasksPerExecutor")
     val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
       tasksPerExecutor).toInt
+
     val maxNeededWithSpeculationLocalityOffset =
       if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
       // If we have pending speculative tasks and only need a single executor, allocate one more
@@ -299,11 +300,11 @@
        maxNeeded
      }

-    // Since the maxNeededWithSpeculationLocalityOffset already includes the num executors needed
-    // to run unschedulable tasks, we would only try to add when the
-    // maxNeededWithSpeculationLocalityOffset is lesser than the active executors
-    if (numUnschedulables > 0) {
-      val maxNeededForUnschedulables = math.ceil(numUnschedulables * executorAllocationRatio /
+    if (unschedulableTaskSets > 0) {
+      // Request additional executors only if maxNeededWithSpeculationLocalityOffset is less than
+      // current active executors, if not requesting maxNeededWithSpeculationLocalityOffset should
+      // be enough to make the unschedulable tasks schedulable now.
+      val maxNeededForUnschedulables = math.ceil(unschedulableTaskSets * executorAllocationRatio /
        tasksPerExecutor).toInt
      math.max(maxNeededWithSpeculationLocalityOffset,
        executorMonitor.executorCountWithResourceProfile(rpId) + maxNeededForUnschedulables)
@@ -810,20 +811,25 @@ private[spark] class ExecutorAllocationManager(
      }
    }

-    override def onUnschedulableTaskSet
-    (unschedulableTaskSet: SparkListenerUnschedulableTaskSet): Unit = {
-      val stageId = unschedulableTaskSet.stageId
-      val stageAttemptId = unschedulableTaskSet.stageAttemptId
+    override def onUnschedulableTaskSetAdded(
+        unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit = {
+      val stageId = unschedulableTaskSetAdded.stageId
+      val stageAttemptId = unschedulableTaskSetAdded.stageAttemptId
      allocationManager.synchronized {
-        (stageId, stageAttemptId) match {
-          case (Some(stageId), Some(stageAttemptId)) =>
-            val stageAttempt = StageAttempt(stageId, stageAttemptId)
-            unschedulableTaskSets.add(stageAttempt)
-          case (None, _) =>
-            // Clear unschedulableTaskSets since atleast one task becomes schedulable now
-            unschedulableTaskSets.clear()
-        }
-        allocationManager.onSchedulerBacklogged()
+        val stageAttempt = StageAttempt(stageId, stageAttemptId)
+        unschedulableTaskSets.add(stageAttempt)
+      }
+      allocationManager.onSchedulerBacklogged()
+    }
+
+    override def onUnschedulableTaskSetRemoved(
+        unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit = {
+      val stageId = unschedulableTaskSetRemoved.stageId
+      val stageAttemptId = unschedulableTaskSetRemoved.stageAttemptId
+      allocationManager.synchronized {
+        // Clear unschedulableTaskSets since at least one task becomes schedulable now
+        val stageAttempt = StageAttempt(stageId, stageAttemptId)
+        unschedulableTaskSets.remove(stageAttempt)
      }
    }

@@ -867,9 +873,9 @@ private[spark] class ExecutorAllocationManager(
      numTotalTasks - numRunning
    }

-    // Currently TaskSetManager.getCompletelyBlacklistedTaskIfAny only takes the first
-    // unschedulable task due to blacklisting. So keeping track of unschedulableTaskSets
-    // should be enough as we'll always have no more than a task unschedulable at any time.
+    // Currently TaskSetManager.getCompletelyBlacklistedTaskIfAny only takes the first unschedulable
+    // task found due to blacklisting. This way we only need to keep track of the unschedulable
+    // tasksets which is an indirect way to get the current number of unschedulable tasks.
    def pendingUnschedulableTaskSetsPerResourceProfile(rp: Int): Int = {
      val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq
      attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 46944cdccfe64..502e4ec336846 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -312,13 +312,23 @@ private[spark] class DAGScheduler(
  }

  /**
-   * Called by the TaskSetManager when there is an unschedulable blacklist task and dynamic
+   * Called by the TaskSetManager when a taskset becomes unschedulable due to blacklisting and
+   * dynamic allocation is enabled
+   */
+  def unschedulableTaskSetAdded(
+    stageId: Int,
+    stageAttemptId: Int): Unit = {
+    eventProcessLoop.post(UnschedulableTaskSetAdded(stageId, stageAttemptId))
+  }
+
+  /**
+   * Called by the TaskSetManager when an unschedulable taskset becomes schedulable and dynamic
   * allocation is enabled
   */
-  def unschedulableBlacklistTaskSubmitted(
-      stageId: Option[Int],
-      stageAttemptId: Option[Int]): Unit = {
-    eventProcessLoop.post(UnschedulableBlacklistTaskSubmitted(stageId, stageAttemptId))
+  def unschedulableTaskSetRemoved(
+    stageId: Int,
+    stageAttemptId: Int): Unit = {
+    eventProcessLoop.post(UnschedulableTaskSetRemoved(stageId, stageAttemptId))
  }

  private[scheduler]
@@ -1024,11 +1034,18 @@ private[spark] class DAGScheduler(
    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
  }

-  private[scheduler] def handleUnschedulableBlacklistTaskSubmitted(
-      stageId: Option[Int],
-      stageAttemptId: Option[Int]): Unit = {
+  private[scheduler] def handleUnschedulableTaskSetAdded(
+    stageId: Int,
+    stageAttemptId: Int): Unit = {
    listenerBus.post(
-      SparkListenerUnschedulableTaskSet(stageId, stageAttemptId))
+      SparkListenerUnschedulableTaskSetAdded(stageId, stageAttemptId))
+  }
+
+  private[scheduler] def handleUnschedulableTaskSetRemoved(
+    stageId: Int,
+    stageAttemptId: Int): Unit = {
+    listenerBus.post(
+      SparkListenerUnschedulableTaskSetRemoved(stageId, stageAttemptId))
  }

  private[scheduler] def handleTaskSetFailed(
@@ -2304,8 +2321,11 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
    case SpeculativeTaskSubmitted(task) =>
      dagScheduler.handleSpeculativeTaskSubmitted(task)

-    case UnschedulableBlacklistTaskSubmitted(stageId, stageAttemptId) =>
-      dagScheduler.handleUnschedulableBlacklistTaskSubmitted(stageId, stageAttemptId)
+    case UnschedulableTaskSetAdded(stageId, stageAttemptId) =>
+      dagScheduler.handleUnschedulableTaskSetAdded(stageId, stageAttemptId)
+
+    case UnschedulableTaskSetRemoved(stageId, stageAttemptId) =>
+      dagScheduler.handleUnschedulableTaskSetRemoved(stageId, stageAttemptId)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index e4576f5ed4332..d226fe88614d2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -98,5 +98,10 @@ private[scheduler]
 case class SpeculativeTaskSubmitted(task: Task[_]) extends DAGSchedulerEvent

 private[scheduler]
-case class UnschedulableBlacklistTaskSubmitted(stageId: Option[Int], stageAttemptId: Option[Int])
+case class UnschedulableTaskSetAdded(stageId: Int, stageAttemptId: Int)
  extends DAGSchedulerEvent
+
+private[scheduler]
+case class UnschedulableTaskSetRemoved(stageId: Int, stageAttemptId: Int)
+  extends DAGSchedulerEvent
+
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 83aa438e95f1b..da6ff096cc01c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -158,9 +158,14 @@ case class SparkListenerNodeUnblacklisted(time: Long, hostId: String)
  extends SparkListenerEvent

 @DeveloperApi
-case class SparkListenerUnschedulableTaskSet(
-    stageId: Option[Int],
-    stageAttemptId: Option[Int]) extends SparkListenerEvent
+case class SparkListenerUnschedulableTaskSetAdded(
+    stageId: Int,
+    stageAttemptId: Int) extends SparkListenerEvent
+
+@DeveloperApi
+case class SparkListenerUnschedulableTaskSetRemoved(
+    stageId: Int,
+    stageAttemptId: Int) extends SparkListenerEvent

 @DeveloperApi
 case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends SparkListenerEvent
@@ -345,10 +350,18 @@ private[spark] trait SparkListenerInterface {
  def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit

  /**
-   * Called when both dynamic allocation is enabled and there is an unschedulable blacklist task
+   * Called when a taskset becomes unschedulable due to blacklisting and dynamic allocation
+   * is enabled
+   */
+  def onUnschedulableTaskSetAdded(
+      unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit
+
+  /**
+   * Called when an unschedulable taskset becomes schedulable and dynamic allocation
+   * is enabled
   */
-  def onUnschedulableTaskSet(
-      blacklistTask: SparkListenerUnschedulableTaskSet): Unit
+  def onUnschedulableTaskSetRemoved(
+      unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit

  /**
   * Called when the driver receives a block update info.
@@ -436,8 +449,11 @@ abstract class SparkListener extends SparkListenerInterface {
  override def onNodeUnblacklisted(
      nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }

-  override def onUnschedulableTaskSet(
-      unschedulableTaskSet: SparkListenerUnschedulableTaskSet): Unit = { }
+  override def onUnschedulableTaskSetAdded(
+      unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit = { }
+
+  override def onUnschedulableTaskSetRemoved(
+      unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit = { }

  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }

diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 71e1086d2149c..13e65f4291fd0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -79,8 +79,10 @@ private[spark] trait SparkListenerBus
        listener.onBlockUpdated(blockUpdated)
      case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
        listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
-      case unschedulableTaskSet: SparkListenerUnschedulableTaskSet =>
-        listener.onUnschedulableTaskSet(unschedulableTaskSet)
+      case unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded =>
+        listener.onUnschedulableTaskSetAdded(unschedulableTaskSetAdded)
+      case unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved =>
+        listener.onUnschedulableTaskSetRemoved(unschedulableTaskSetRemoved)
      case resourceProfileAdded: SparkListenerResourceProfileAdded =>
        listener.onResourceProfileAdded(resourceProfileAdded)
      case _ => listener.onOtherEvent(event)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 27abc225dd45f..83c00dd4f248e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -637,10 +637,9 @@ private[spark] class TaskSchedulerImpl(
        if (!launchedAnyTask) {
          taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
            // If the taskSet is unschedulable we try to find an existing idle blacklisted
-            // executor. If we cannot find one, we abort immediately. Else we kill the idle
-            // executor and kick off an abortTimer which if it doesn't schedule a task within the
-            // the timeout will abort the taskSet if we were unable to schedule any task from the
-            // taskSet.
+            // executor and kill the idle executor and kick off an abortTimer which if it doesn't
+            // schedule a task within the timeout will abort the taskSet if we were unable to
+            // schedule any task from the taskSet.
            // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
            // task basis.
            // Note 2: The taskSet can still be aborted when there are more than one idle
            // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
            // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
            // timer to expire and abort the taskSet.
+            //
+            // If there are no idle executors and dynamic allocation is enabled, then we would
+            // notify ExecutorAllocationManager to allocate more executors to schedule the
+            // unschedulable tasks else we will abort immediately.
            executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
              case Some ((executorId, _)) =>
                if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
@@ -659,11 +662,10 @@ private[spark] class TaskSchedulerImpl(
                  // in order to provision more executors to make them schedulable
                  if (Utils.isDynamicAllocationEnabled(conf)) {
                    if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
-                      logInfo(s"Notifying ExecutorAllocationManager to add executors to" +
-                        s" schedule the unschedulable blacklisted task before aborting $taskSet.")
-                      dagScheduler.unschedulableBlacklistTaskSubmitted(
-                        Some(taskSet.taskSet.stageId),
-                        Some(taskSet.taskSet.stageAttemptId))
+                      logInfo(s"Notifying ExecutorAllocationManager to allocate more executors to" +
+                        s" schedule the unschedulable task before aborting $taskSet.")
+                      dagScheduler.unschedulableTaskSetAdded(taskSet.taskSet.stageId,
+                        taskSet.taskSet.stageAttemptId)
                      updateUnschedulableTaskSetTimeoutAndStartAbortTimer(taskSet, taskIndex)
                    }
                  } else {
@@ -686,7 +688,8 @@ private[spark] class TaskSchedulerImpl(
              "recently scheduled.")
            // Notify ExecutorAllocationManager as well as other subscribers that a task now
            // recently becomes schedulable
-            dagScheduler.unschedulableBlacklistTaskSubmitted(None, None)
+            dagScheduler.unschedulableTaskSetRemoved(taskSet.taskSet.stageId,
+              taskSet.taskSet.stageAttemptId)
            unschedulableTaskSetToExpiryTime.clear()
          }
        }
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index c1104783d4c25..ea6e010ef29a7 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -524,7 +524,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1)

    // Stage 0 becomes unschedulable due to blacklisting
-    post(SparkListenerUnschedulableTaskSet(Some(0), Some(0)))
+    post(SparkListenerUnschedulableTaskSetAdded(0, 0))
    clock.advance(1000)
    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
    // Assert that we are getting additional executor to schedule unschedulable tasks
@@ -534,8 +534,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
    // Add a new executor
    onExecutorAddedDefaultProfile(manager, "1")
    // Now once the task becomes schedulable, clear the unschedulableTaskSets
-    // by posting unschedulable event with (None, None)
-    post(SparkListenerUnschedulableTaskSet(None, None))
+    post(SparkListenerUnschedulableTaskSetRemoved(0, 0))
    clock.advance(1000)
    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
    assert(numExecutorsTarget(manager, defaultProfile.id) === 1)
@@ -581,8 +580,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
    post(SparkListenerStageCompleted(createStageInfo(0, 2)))

    // Stage 1 and 2 becomes unschedulable now due to blacklisting
-    post(SparkListenerUnschedulableTaskSet(Some(1), Some(0)))
-    post(SparkListenerUnschedulableTaskSet(Some(2), Some(0)))
+    post(SparkListenerUnschedulableTaskSetAdded(1, 0))
+    post(SparkListenerUnschedulableTaskSetAdded(2, 0))
    clock.advance(1000)
    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())

@@ -594,12 +593,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
    onExecutorAddedDefaultProfile(manager, "3")

    // Now once the task becomes schedulable, clear the unschedulableTaskSets
-    // by posting unschedulable event with (None, None)
-    post(SparkListenerUnschedulableTaskSet(None, None))
+    post(SparkListenerUnschedulableTaskSetRemoved(1, 0))
    clock.advance(1000)
    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
-    assert(numExecutorsTarget(manager, defaultProfile.id) === 2)
-    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 2)
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 4)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 5)
  }

  test("SPARK-31418: remove executors after unschedulable tasks end") {
@@ -639,7 +637,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
    (0 to 3).foreach { i => onExecutorRemoved(manager, i.toString) }

    // Now due to blacklisting, the task becomes unschedulable
-    post(SparkListenerUnschedulableTaskSet(Some(0), Some(0)))
+    post(SparkListenerUnschedulableTaskSetAdded(0, 0))
    clock.advance(1000)
    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
    assert(numExecutorsTarget(manager, defaultProfile.id) === 2)
@@ -649,8 +647,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
    onExecutorAddedDefaultProfile(manager, "5")

    // Now once the task becomes schedulable, clear the unschedulableTaskSets
-    // by posting unschedulable event with (None, None)
-    post(SparkListenerUnschedulableTaskSet(None, None))
+    post(SparkListenerUnschedulableTaskSetRemoved(0, 0))
    clock.advance(1000)
    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
    assert(numExecutorsTarget(manager, defaultProfile.id) === 1)

From 6fc74b300ed859407fbd957fb2a6fa86f4e9cafa Mon Sep 17 00:00:00 2001
From: Venkata krishnan Sowrirajan
Date: Wed, 15 Jul 2020 10:14:11 -0700
Subject: [PATCH 09/12] Address tgraves review comments

---
 .../spark/ExecutorAllocationManager.scala     | 28 +++++++--------
 .../apache/spark/scheduler/DAGScheduler.scala | 10 +++---
 .../spark/scheduler/SparkListener.scala       |  4 +--
 .../scheduler/TaskSchedulerImplSuite.scala    | 35 +++++++++++++++++++
 4 files changed, 55 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index c5121df5c7a60..c3ec589ea00ee 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -301,9 +301,9 @@ private[spark] class ExecutorAllocationManager(
    }

    if (unschedulableTaskSets > 0) {
-      // Request additional executors only if maxNeededWithSpeculationLocalityOffset is less than
-      // current active executors, if not requesting maxNeededWithSpeculationLocalityOffset should
-      // be enough to make the unschedulable tasks schedulable now.
+      // Request additional executors to account for task sets having tasks that are unschedulable
+      // due to blacklisting when the active executor count has already reached the max needed
+      // which we would normally get.
      val maxNeededForUnschedulables = math.ceil(unschedulableTaskSets * executorAllocationRatio /
        tasksPerExecutor).toInt
      math.max(maxNeededWithSpeculationLocalityOffset,
@@ -717,8 +717,7 @@ private[spark] class ExecutorAllocationManager(

      // If this is the last stage with pending tasks, mark the scheduler queue as empty
      // This is needed in case the stage is aborted for any reason
-      if (stageAttemptToNumTasks.isEmpty &&
-        stageAttemptToNumSpeculativeTasks.isEmpty) {
+      if (stageAttemptToNumTasks.isEmpty && stageAttemptToNumSpeculativeTasks.isEmpty) {
        allocationManager.onSchedulerQueueEmpty()
      }
    }
@@ -815,20 +814,20 @@ private[spark] class ExecutorAllocationManager(
        unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit = {
      val stageId = unschedulableTaskSetAdded.stageId
      val stageAttemptId = unschedulableTaskSetAdded.stageAttemptId
+      val stageAttempt = StageAttempt(stageId, stageAttemptId)
      allocationManager.synchronized {
-        val stageAttempt = StageAttempt(stageId, stageAttemptId)
        unschedulableTaskSets.add(stageAttempt)
+        allocationManager.onSchedulerBacklogged()
      }
-      allocationManager.onSchedulerBacklogged()
    }

    override def onUnschedulableTaskSetRemoved(
        unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit = {
      val stageId = unschedulableTaskSetRemoved.stageId
      val stageAttemptId = unschedulableTaskSetRemoved.stageAttemptId
+      val stageAttempt = StageAttempt(stageId, stageAttemptId)
      allocationManager.synchronized {
        // Clear unschedulableTaskSets since at least one task becomes schedulable now
-        val stageAttempt = StageAttempt(stageId, stageAttemptId)
        unschedulableTaskSets.remove(stageAttempt)
      }
    }
@@ -873,9 +872,11 @@ private[spark] class ExecutorAllocationManager(
      numTotalTasks - numRunning
    }

-    // Currently TaskSetManager.getCompletelyBlacklistedTaskIfAny only takes the first unschedulable
-    // task found due to blacklisting. This way we only need to keep track of the unschedulable
-    // tasksets which is an indirect way to get the current number of unschedulable tasks.
+    /**
+     * Currently we only know when a task set has an unschedulable task; we don't know
+     * the exact number and since the allocation manager isn't tied closely with the scheduler,
+     * we use the number of task sets that are unschedulable as a heuristic to add more executors.
+     */
    def pendingUnschedulableTaskSetsPerResourceProfile(rp: Int): Int = {
      val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq
      attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size
@@ -889,12 +890,11 @@ private[spark] class ExecutorAllocationManager(
    }

    def hasPendingTasks: Boolean = {
-      hasPendingSpeculativeTasks || hasPendingRegularTasks || hasPendingUnschedulableTasks
+      hasPendingSpeculativeTasks || hasPendingRegularTasks
    }

    def totalPendingTasksPerResourceProfile(rp: Int): Int = {
-      pendingTasksPerResourceProfile(rp) +
-        pendingSpeculativeTasksPerResourceProfile(rp)
+      pendingTasksPerResourceProfile(rp) + pendingSpeculativeTasksPerResourceProfile(rp)
    }

    /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 502e4ec336846..d7b574365d779 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -313,7 +313,7 @@ private[spark] class DAGScheduler(
  /**
   * Called by the TaskSetManager when a taskset becomes unschedulable due to blacklisting and
-   * dynamic allocation is enabled
+   * dynamic allocation is enabled.
   */
  def unschedulableTaskSetAdded(
    stageId: Int,
@@ -323,7 +323,7 @@ private[spark] class DAGScheduler(
  /**
   * Called by the TaskSetManager when an unschedulable taskset becomes schedulable and dynamic
-   * allocation is enabled
+   * allocation is enabled.
   */
  def unschedulableTaskSetRemoved(
    stageId: Int,
@@ -1037,15 +1037,13 @@ private[spark] class DAGScheduler(
  private[scheduler] def handleUnschedulableTaskSetAdded(
    stageId: Int,
    stageAttemptId: Int): Unit = {
-    listenerBus.post(
-      SparkListenerUnschedulableTaskSetAdded(stageId, stageAttemptId))
+    listenerBus.post(SparkListenerUnschedulableTaskSetAdded(stageId, stageAttemptId))
  }

  private[scheduler] def handleUnschedulableTaskSetRemoved(
    stageId: Int,
    stageAttemptId: Int): Unit = {
-    listenerBus.post(
-      SparkListenerUnschedulableTaskSetRemoved(stageId, stageAttemptId))
+    listenerBus.post(SparkListenerUnschedulableTaskSetRemoved(stageId, stageAttemptId))
  }

  private[scheduler] def handleTaskSetFailed(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index da6ff096cc01c..8119215b8b74f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -351,14 +351,14 @@ private[spark] trait SparkListenerInterface {
  /**
   * Called when a taskset becomes unschedulable due to blacklisting and dynamic allocation
-   * is enabled
+   * is enabled.
   */
  def onUnschedulableTaskSetAdded(
      unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit

  /**
   * Called when an unschedulable taskset becomes schedulable and dynamic allocation
-   * is enabled
+   * is enabled.
   */
  def onUnschedulableTaskSetRemoved(
      unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index e43be60e956be..d86f0dbc8cd85 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -1000,6 +1000,41 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
    assert(!tsm.isZombie)
  }

+  test("SPARK-31418 abort timer should kick in when task is completely blacklisted &" +
+    " allocation manager could not acquire a new executor before the timeout") {
+    // set the abort timer to fail immediately
+    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
+      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0",
+      config.DYN_ALLOCATION_ENABLED.key -> "true")
+
+    // We have only 1 task remaining with 1 executor
+    val taskSet = FakeTask.createTaskSet(numTasks = 1)
+    taskScheduler.submitTasks(taskSet)
+    val tsm = stageToMockTaskSetManager(0)
+
+    // submit an offer with one executor
+    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 1)
+    )).flatten
+
+    // Fail the running task
+    val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get
+    failTask(failedTask.taskId, TaskState.FAILED, UnknownReason, tsm)
+    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+      "executor0", failedTask.index)).thenReturn(true)
+
+    // make an offer on the blacklisted executor. We won't schedule anything, and set the abort
+    // timer to kick in immediately
+    assert(taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 1)
+    )).flatten.size === 0)
+    // Wait for the abort timer to kick in. Even though we configure the timeout to be 0, there is a
+    // slight delay as the abort timer is launched in a separate thread.
+    eventually(timeout(500.milliseconds)) {
+      assert(tsm.isZombie)
+    }
+  }
+
  /**
   * Helper for performance tests. Takes the explicitly blacklisted nodes and executors; verifies
   * that the blacklists are used efficiently to ensure scheduling is not O(numPendingTasks).
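Aside (not part of the patch series): because SparkListenerUnschedulableTaskSetAdded and SparkListenerUnschedulableTaskSetRemoved are @DeveloperApi events, any SparkListener can observe when a task set gets blocked by blacklisting, not only ExecutorAllocationManager. The sketch below is illustrative only: the class name, log messages, and the `sc` handle are assumptions; the event classes and callback names come from the patches above.

    import org.apache.spark.scheduler.{SparkListener, SparkListenerUnschedulableTaskSetAdded, SparkListenerUnschedulableTaskSetRemoved}

    // Minimal sketch of a listener consuming the two new events (hypothetical class name).
    class UnschedulableTaskSetLogger extends SparkListener {
      override def onUnschedulableTaskSetAdded(
          event: SparkListenerUnschedulableTaskSetAdded): Unit = {
        // This stage attempt currently has a task that cannot run on any live executor
        // because every candidate executor is blacklisted for it.
        println(s"Task set ${event.stageId}.${event.stageAttemptId} is unschedulable")
      }

      override def onUnschedulableTaskSetRemoved(
          event: SparkListenerUnschedulableTaskSetRemoved): Unit = {
        // The same stage attempt can be scheduled again, e.g. a new executor registered.
        println(s"Task set ${event.stageId}.${event.stageAttemptId} is schedulable again")
      }
    }

    // Hypothetical registration on an existing SparkContext `sc`:
    // sc.addSparkListener(new UnschedulableTaskSetLogger)

Splitting the original single event (with Option fields) into separate add and remove events is what lets the allocation manager track several unschedulable task sets at once, as exercised in the ExecutorAllocationManagerSuite changes above.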
From ac7b6121c47ba8f2570285b0610359889eb0cb27 Mon Sep 17 00:00:00 2001
From: Venkata krishnan Sowrirajan
Date: Wed, 15 Jul 2020 12:57:33 -0700
Subject: [PATCH 10/12] fix the test changes

---
 .../scheduler/TaskSchedulerImplSuite.scala | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index d86f0dbc8cd85..9ca3ce9d43ca5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -1007,21 +1007,22 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0",
      config.DYN_ALLOCATION_ENABLED.key -> "true")

-    // We have only 1 task remaining with 1 executor
-    val taskSet = FakeTask.createTaskSet(numTasks = 1)
+    // We have 2 tasks remaining with 1 executor
+    val taskSet = FakeTask.createTaskSet(numTasks = 2)
    taskScheduler.submitTasks(taskSet)
    val tsm = stageToMockTaskSetManager(0)

    // submit an offer with one executor
-    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
-      WorkerOffer("executor0", "host0", 1)
-    )).flatten
+    taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("executor0", "host0", 2))).flatten

    // Fail the running task
-    val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get
-    failTask(failedTask.taskId, TaskState.FAILED, UnknownReason, tsm)
+    failTask(0, TaskState.FAILED, UnknownReason, tsm)
    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
-      "executor0", failedTask.index)).thenReturn(true)
+      "executor0", 0)).thenReturn(true)
+
+    // If the executor is busy, then dynamic allocation should kick in and try
+    // to acquire additional executors to schedule the blacklisted task
+    assert(taskScheduler.isExecutorBusy("executor0"))

    // make an offer on the blacklisted executor. We won't schedule anything, and set the abort
    // timer to kick in immediately

From d9f473d748b206426106a9c4e97e153aec2f508f Mon Sep 17 00:00:00 2001
From: Venkata krishnan Sowrirajan
Date: Tue, 21 Jul 2020 15:12:03 -0700
Subject: [PATCH 11/12] Address tgraves review comments

---
 .../scala/org/apache/spark/ExecutorAllocationManager.scala | 7 -------
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala    | 4 ++--
 2 files changed, 2 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index c3ec589ea00ee..85409d599ccaa 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -882,13 +882,6 @@ private[spark] class ExecutorAllocationManager(
      attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size
    }

-    def hasPendingUnschedulableTasks: Boolean = {
-      val attemptSets = resourceProfileIdToStageAttempt.values
-      attemptSets.exists { attempts =>
-        attempts.exists(unschedulableTaskSets.contains(_))
-      }
-    }
-
    def hasPendingTasks: Boolean = {
      hasPendingSpeculativeTasks || hasPendingRegularTasks
    }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d7b574365d779..9cac5a8b4a064 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1041,8 +1041,8 @@ private[spark] class DAGScheduler(
  }

  private[scheduler] def handleUnschedulableTaskSetRemoved(
-    stageId: Int,
-    stageAttemptId: Int): Unit = {
+      stageId: Int,
+      stageAttemptId: Int): Unit = {
    listenerBus.post(SparkListenerUnschedulableTaskSetRemoved(stageId, stageAttemptId))
  }

From d6f1e73ae49d9641a8efcb1f5e9016bd378a70c6 Mon Sep 17 00:00:00 2001
From: Venkata krishnan Sowrirajan
Date: Wed, 22 Jul 2020 10:55:56 -0700
Subject: [PATCH 12/12] Addressed indentation review comments

---
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 9cac5a8b4a064..e9fb0759c2746 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -316,8 +316,8 @@ private[spark] class DAGScheduler(
   * dynamic allocation is enabled.
   */
  def unschedulableTaskSetAdded(
-    stageId: Int,
-    stageAttemptId: Int): Unit = {
+      stageId: Int,
+      stageAttemptId: Int): Unit = {
    eventProcessLoop.post(UnschedulableTaskSetAdded(stageId, stageAttemptId))
  }

@@ -326,8 +326,8 @@ private[spark] class DAGScheduler(
   * allocation is enabled.
   */
  def unschedulableTaskSetRemoved(
-    stageId: Int,
-    stageAttemptId: Int): Unit = {
+      stageId: Int,
+      stageAttemptId: Int): Unit = {
    eventProcessLoop.post(UnschedulableTaskSetRemoved(stageId, stageAttemptId))
  }
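Aside (not part of the patch series): the executor-sizing rule that ExecutorAllocationManager ends up with can be illustrated by a small, self-contained sketch. All numbers below are made up, and the speculation/locality offset from the real code is omitted; only the ceil/max arithmetic mirrors the change.

    // Simplified sketch of the per-resource-profile target with unschedulable task sets.
    val tasksPerExecutor = 4
    val executorAllocationRatio = 1.0
    val numRunningOrPendingTasks = 8
    val unschedulableTaskSets = 1   // task sets currently blocked by blacklisting
    val activeExecutors = 2         // executors already held for this resource profile

    val maxNeeded =
      math.ceil(numRunningOrPendingTasks * executorAllocationRatio / tasksPerExecutor).toInt
    val target =
      if (unschedulableTaskSets > 0) {
        // Ask for roughly one extra executor per unschedulable task set on top of the executors
        // already held, so the blacklisted task gets a fresh executor before the abort timer fires.
        val maxNeededForUnschedulables =
          math.ceil(unschedulableTaskSets * executorAllocationRatio / tasksPerExecutor).toInt
        math.max(maxNeeded, activeExecutors + maxNeededForUnschedulables)
      } else {
        maxNeeded
      }
    // Here maxNeeded == 2 and the unschedulable term is 1, so target == max(2, 2 + 1) == 3.

Because the extra request is expressed relative to the executors currently held rather than to pending tasks, the target falls back to the normal pending-plus-running estimate as soon as the unschedulable task set is removed, which is what the "remove executors after unschedulable tasks end" test above checks.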