1 file changed, +15 −3 lines changed

yarn/src/main/scala/org/apache/spark/scheduler/cluster

@@ -125,8 +125,20 @@ private[spark] abstract class YarnSchedulerBackend(
    * This includes executors already pending or running.
    */
   override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
-    yarnSchedulerEndpointRef.askWithRetry[Boolean](
-      RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
+    val r = RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount)
+    yarnSchedulerEndpoint.amEndpoint match {
+      case Some(am) =>
+        try {
+          am.askWithRetry[Boolean](r)
+        } catch {
+          case NonFatal(e) =>
+            logError(s"Sending $r to AM was unsuccessful", e)
+            return false
+        }
+      case None =>
+        logWarning("Attempted to request executors before the AM has registered!")
+        return false
+    }
   }

   /**

@@ -209,7 +221,7 @@ private[spark] abstract class YarnSchedulerBackend(
    */
   private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
     extends ThreadSafeRpcEndpoint with Logging {
-    private var amEndpoint: Option[RpcEndpointRef] = None
+    var amEndpoint: Option[RpcEndpointRef] = None

     private val askAmThreadPool =
       ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
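In short, the change stops sending the ask to yarnSchedulerEndpointRef unconditionally: doRequestTotalExecutors now routes the RequestExecutors message through the AM endpoint only if one has registered, and turns both a missing AM and a non-fatal RPC failure into a false return instead of a propagated exception (which is why amEndpoint is made non-private in the second hunk). Below is a minimal, self-contained sketch of that guard pattern; AmRef, Request, and the println-based logging are hypothetical stand-ins for Spark's RpcEndpointRef and Logging, not the actual Spark API.

// Minimal sketch (not Spark code): guard an RPC ask on an Option-al endpoint
// reference and treat any non-fatal failure as an unsuccessful request.
import scala.util.control.NonFatal

object RequestGuardSketch {
  final case class Request(total: Int)

  // Stand-in for an RpcEndpointRef whose ask may throw.
  final class AmRef {
    def ask(r: Request): Boolean = {
      if (r.total < 0) throw new IllegalArgumentException("negative total")
      true
    }
  }

  def requestTotal(amEndpoint: Option[AmRef], requestedTotal: Int): Boolean = {
    val r = Request(requestedTotal)
    amEndpoint match {
      case Some(am) =>
        try {
          am.ask(r)
        } catch {
          case NonFatal(e) =>
            // Log and report failure to the caller instead of rethrowing.
            println(s"Sending $r to AM was unsuccessful: ${e.getMessage}")
            false
        }
      case None =>
        // No AM registered yet: the request cannot be honored.
        println("Attempted to request executors before the AM has registered!")
        false
    }
  }

  def main(args: Array[String]): Unit = {
    println(requestTotal(None, 3))             // false: AM not registered yet
    println(requestTotal(Some(new AmRef), 3))  // true: ask succeeds
    println(requestTotal(Some(new AmRef), -1)) // false: non-fatal failure handled
  }
}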