Skip to content
Closed
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Executors we have requested the cluster manager to kill that have not died yet
private val executorsPendingToRemove = new HashSet[String]

// Number of executors requested from the cluster manager that have not replaced yet
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "have not been replaced"

private var numReplacingExecutors = 0

// A map to store hostname with its possible task number running on it
protected var hostToLocalTaskCount: Map[String, Int] = Map.empty

Expand Down Expand Up @@ -147,6 +150,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
if (numReplacingExecutors > 0) {
numReplacingExecutors -= 1
logDebug(s"Decremented number of replaceing executors ($numReplacingExecutors left)")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "number of executors being replaced"

}
}
// Note: some tests expect the reply to come after we put the executor in the map
context.reply(RegisteredExecutor)
Expand Down Expand Up @@ -431,7 +438,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// take into account executors that are pending to be added or removed.
if (!replace) {
doRequestTotalExecutors(
numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
+ numReplacingExecutors)
} else {
numReplacingExecutors += knownExecutors.size
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need another variable? Can't we just do

} else {
  numPendingExecutors += knownExecutors.size
}

This makes sense on a high level too; if we replace an executor we expect to get one back, so it should be pending in the mean time.

}

doKillExecutors(executorsToKill)
Expand Down