-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-32736][CORE] Avoid caching the removed decommissioned executors in TaskSchedulerImpl #29579
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
d5bc756
404f92b
47da0d7
3152e9b
a0bc4f6
75a14a6
b6490fc
bea465b
c12c82d
90f1fd1
84df735
0c0749e
6e8b57e
a39ba8e
ff02621
9096cb9
58add67
d246840
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -175,15 +175,15 @@ private[spark] class StandaloneAppClient( | |
| cores)) | ||
| listener.executorAdded(fullId, workerId, hostPort, cores, memory) | ||
|
|
||
| case ExecutorUpdated(id, state, message, exitStatus, workerLost) => | ||
| case ExecutorUpdated(id, state, message, exitStatus, workerHost) => | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Personally, I would still be okay with workerLost being an Option[String] instead of a Boolean. Obviously, had it been called "workerIsLost" then we would have to rename it. But I am also fine with the new name workerHost as well. I don't particularly think that the name workerLost must connote a boolean. This ExecutorUpdated message is a case in point where the "lost" part is meaningful because it refers to the "worker that is lost" as opposed to some random worker-host. But no strong feelings on this and I am happy with the choice workerHost. |
||
| val fullId = appId + "/" + id | ||
| val messageText = message.map(s => " (" + s + ")").getOrElse("") | ||
| logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText)) | ||
| if (ExecutorState.isFinished(state)) { | ||
| listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost) | ||
| listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerHost) | ||
| } else if (state == ExecutorState.DECOMMISSIONED) { | ||
| listener.executorDecommissioned(fullId, | ||
| ExecutorDecommissionInfo(message.getOrElse(""), isHostDecommissioned = workerLost)) | ||
| ExecutorDecommissionInfo(message.getOrElse(""), workerHost)) | ||
| } | ||
|
|
||
| case WorkerRemoved(id, host, message) => | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -308,7 +308,7 @@ private[deploy] class Master( | |
| appInfo.resetRetryCount() | ||
| } | ||
|
|
||
| exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false)) | ||
| exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, None)) | ||
|
|
||
| if (ExecutorState.isFinished(state)) { | ||
| // Remove this executor from the worker and app | ||
|
|
@@ -909,9 +909,10 @@ private[deploy] class Master( | |
| exec.application.driver.send(ExecutorUpdated( | ||
| exec.id, ExecutorState.DECOMMISSIONED, | ||
| Some("worker decommissioned"), None, | ||
| // workerLost is being set to true here to let the driver know that the host (aka. worker) | ||
| // is also being decommissioned. | ||
| workerLost = true)) | ||
| // worker host is being set here to let the driver know that the host (aka. worker) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: can you reword the comment to be more accurate now :-)
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated a little bit. |
||
| // is also being decommissioned. So the driver can unregister all the shuffle map | ||
| // statues located at this host when it receives the executor lost event. | ||
| Some(worker.host))) | ||
| exec.state = ExecutorState.DECOMMISSIONED | ||
| exec.application.removeExecutor(exec) | ||
| } | ||
|
|
@@ -932,7 +933,7 @@ private[deploy] class Master( | |
| for (exec <- worker.executors.values) { | ||
| logInfo("Telling app of lost executor: " + exec.id) | ||
| exec.application.driver.send(ExecutorUpdated( | ||
| exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true)) | ||
| exec.id, ExecutorState.LOST, Some("worker lost"), None, Some(worker.host))) | ||
| exec.state = ExecutorState.LOST | ||
| exec.application.removeExecutor(exec) | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -53,14 +53,15 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los | |
|
|
||
| /** | ||
| * @param _message human readable loss reason | ||
| * @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service) | ||
| * @param workerHost it's defined when the host is confirmed lost too (i.e. including | ||
| * shuffle service) | ||
| * @param causedByApp whether the loss of the executor is the fault of the running app. | ||
| * (assumed true by default unless known explicitly otherwise) | ||
| */ | ||
| private[spark] | ||
| case class ExecutorProcessLost( | ||
| _message: String = "Executor Process Lost", | ||
| workerLost: Boolean = false, | ||
| workerHost: Option[String] = None, | ||
| causedByApp: Boolean = true) | ||
| extends ExecutorLossReason(_message) | ||
|
|
||
|
|
@@ -69,5 +70,8 @@ case class ExecutorProcessLost( | |
| * | ||
| * This is used by the task scheduler to remove state associated with the executor, but | ||
| * not yet fail any tasks that were running in the executor before the executor is "fully" lost. | ||
| * | ||
| * @param workerHost it's defined when the worker is decommissioned too | ||
|
||
| */ | ||
| private [spark] object ExecutorDecommission extends ExecutorLossReason("Executor decommission.") | ||
| private [spark] case class ExecutorDecommission(workerHost: Option[String] = None) | ||
| extends ExecutorLossReason("Executor decommission.") | ||
Uh oh!
There was an error while loading. Please reload this page.