Skip to content
Closed
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Executors we have requested the cluster manager to kill that have not died yet
private val executorsPendingToRemove = new HashSet[String]

// Executors we have requested the cluster manager to replace with new ones that have killed
private val executorsToReplace = new HashSet[String]

// Number of executors requested from the cluster manager that have not replaced yet
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "have not been replaced"

private var numReplacingExecutors = 0

// A map to store hostname with its possible task number running on it
protected var hostToLocalTaskCount: Map[String, Int] = Map.empty

Expand Down Expand Up @@ -147,6 +153,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
if (numReplacingExecutors > 0) {
numReplacingExecutors -= 1
logDebug(s"Decremented number of replaceing executors ($numReplacingExecutors left)")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "number of executors being replaced"

}
}
// Note: some tests expect the reply to come after we put the executor in the map
context.reply(RegisteredExecutor)
Expand Down Expand Up @@ -236,6 +246,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
addressToExecutorId -= executorInfo.executorAddress
executorDataMap -= executorId
executorsPendingToRemove -= executorId
if (executorsToReplace.contains(executorId)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: if (executorsToReplace.remove(executorId)) { ... }

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this code is not need, thanks.

executorsToReplace -= executorId
if (numReplacingExecutors > 0) {
numReplacingExecutors -= 1
}
}
}
totalCoreCount.addAndGet(-executorInfo.totalCores)
totalRegisteredExecutors.addAndGet(-1)
Expand Down Expand Up @@ -431,7 +447,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// take into account executors that are pending to be added or removed.
if (!replace) {
doRequestTotalExecutors(
numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
+ numReplacingExecutors)
} else {
executorsToReplace ++= knownExecutors
numReplacingExecutors += knownExecutors.size
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need another variable? Can't we just do

} else {
  numPendingExecutors += knownExecutors.size
}

This makes sense on a high level too; if we replace an executor we expect to get one back, so it should be pending in the mean time.

}

doKillExecutors(executorsToKill)
Expand Down