Skip to content

Commit aebabf0

Browse files
Ngone51cloud-fan
authored andcommitted
[SPARK-30729][CORE] Eagerly filter out zombie TaskSetManager before offering resources
### What changes were proposed in this pull request? Eagerly filter out zombie `TaskSetManager` before offering resources to reduce any overhead as possible. And this PR also avoid doing `recomputeLocality` and `addPendingTask` when `TaskSetManager` is zombie. ### Why are the changes needed? Zombie `TaskSetManager` could still exist in Pool's `schedulableQueue` when it has running tasks. Offering resources on a zombie `TaskSetManager` could bring unnecessary overhead and is meaningless. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Pass Jenkins. Closes #27455 from Ngone51/exclude-zombie-tsm. Authored-by: yi.wu <yi.wu@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent b95ccb1 commit aebabf0

2 files changed

Lines changed: 5 additions & 1 deletion

File tree

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ private[spark] class TaskSchedulerImpl(
430430
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
431431
val availableResources = shuffledOffers.map(_.resources).toArray
432432
val availableCpus = shuffledOffers.map(o => o.cores).toArray
433-
val sortedTaskSets = rootPool.getSortedTaskSetQueue
433+
val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
434434
for (taskSet <- sortedTaskSets) {
435435
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
436436
taskSet.parent.name, taskSet.name, taskSet.runningTasks))

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,8 @@ private[spark] class TaskSetManager(
229229
index: Int,
230230
resolveRacks: Boolean = true,
231231
speculatable: Boolean = false): Unit = {
232+
// A zombie TaskSetManager may reach here while handling failed task.
233+
if (isZombie) return
232234
val pendingTaskSetToAddTo = if (speculatable) pendingSpeculatableTasks else pendingTasks
233235
for (loc <- tasks(index).preferredLocations) {
234236
loc match {
@@ -1082,6 +1084,8 @@ private[spark] class TaskSetManager(
10821084
}
10831085

10841086
def recomputeLocality(): Unit = {
1087+
// A zombie TaskSetManager may reach here while executorLost happens
1088+
if (isZombie) return
10851089
val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
10861090
myLocalityLevels = computeValidLocalityLevels()
10871091
localityWaits = myLocalityLevels.map(getLocalityWait)

0 commit comments

Comments
 (0)