From 41cf47ed8d14e879dbaeaadcf8f4b681b2791223 Mon Sep 17 00:00:00 2001 From: li-zhihui Date: Tue, 22 Jul 2014 16:15:40 +0800 Subject: [PATCH 1/5] Fix race condition at SchedulerBackend.isReady in standalone mode --- .../CoarseGrainedSchedulerBackend.scala | 29 +++++++++---------- .../cluster/SparkDeploySchedulerBackend.scala | 6 +++- docs/configuration.md | 13 +++++---- .../cluster/YarnClientSchedulerBackend.scala | 8 +++-- .../cluster/YarnClusterSchedulerBackend.scala | 9 ++++-- 5 files changed, 37 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 9f085eef4672..cb2dbd747bbc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = new AtomicInteger(0) - var totalExpectedExecutors = new AtomicInteger(0) + var totalExecutors = new AtomicInteger(0) + var totalExpectedResources = new AtomicInteger(0) val conf = scheduler.sc.conf private val timeout = AkkaUtils.askTimeout(conf) private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) - // Submit tasks only after (registered executors / total expected executors) + // Submit tasks only after (registered resources / total expected resources) // is equal to at least this value, that is double between 0 and 1. - var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0) + var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0) if (minRegisteredRatio > 1) minRegisteredRatio = 1 - // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds). + // Whatever minRegisteredRatio is arrived, submit tasks after the time(milliseconds). val maxRegisteredWaitingTime = - conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000) + conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000) val createTime = System.currentTimeMillis() - var ready = if (minRegisteredRatio <= 0) true else false class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor { private val executorActor = new HashMap[String, ActorRef] @@ -94,12 +94,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A executorAddress(executorId) = sender.path.address addressToExecutorId(sender.path.address) = executorId totalCoreCount.addAndGet(cores) - if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) { - ready = true - logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " + - executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() + - ", minRegisteredExecutorsRatio: " + minRegisteredRatio) - } + totalExecutors.addAndGet(1) makeOffers() } @@ -268,14 +263,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A } } + def checkRegisteredResources(): Boolean = true + override def isReady(): Boolean = { - if (ready) { + if (checkRegisteredResources) { + logInfo("SchedulerBackend is ready for scheduling beginning" + + ", total expected resources: " + totalExpectedResources.get() + + ", minRegisteredResourcesRatio: " + minRegisteredRatio) return true } if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) { - ready = true logInfo("SchedulerBackend is ready for scheduling beginning after waiting " + - "maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime) + "maxRegisteredResourcesWaitingTime(ms): " + maxRegisteredWaitingTime) return true } false diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 48aaaa54bdb3..c7cc283713c9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -36,6 +36,7 @@ private[spark] class SparkDeploySchedulerBackend( var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _ val maxCores = conf.getOption("spark.cores.max").map(_.toInt) + totalExpectedResources.getAndSet(maxCores.getOrElse(0)) override def start() { super.start() @@ -98,7 +99,6 @@ private[spark] class SparkDeploySchedulerBackend( override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int) { - totalExpectedExecutors.addAndGet(1) logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format( fullId, hostPort, cores, Utils.megabytesToString(memory))) } @@ -111,4 +111,8 @@ private[spark] class SparkDeploySchedulerBackend( logInfo("Executor %s removed: %s".format(fullId, message)) removeExecutor(fullId.split("/")(1), reason.toString) } + + override def checkRegisteredResources(): Boolean = { + totalCoreCount.get() >= totalExpectedResources.get() * minRegisteredRatio + } } diff --git a/docs/configuration.md b/docs/configuration.md index ea69057b5be1..9c4920a01236 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -746,21 +746,22 @@ Apart from these, the following properties are also available, and may be useful - spark.scheduler.minRegisteredExecutorsRatio + spark.scheduler.minRegisteredResourcesRatio 0 - The minimum ratio of registered executors (registered executors / total expected executors) + The minimum ratio of registered resources (registered resources / total expected resources) + (resources are executors in yarn mode, CPU cores in standalone and mesos mode) to wait for before scheduling begins. Specified as a double between 0 and 1. - Regardless of whether the minimum ratio of executors has been reached, + Regardless of whether the minimum ratio of resources has been reached, the maximum amount of time it will wait before scheduling begins is controlled by config - spark.scheduler.maxRegisteredExecutorsWaitingTime + spark.scheduler.maxRegisteredResourcesWaitingTime - spark.scheduler.maxRegisteredExecutorsWaitingTime + spark.scheduler.maxRegisteredResourcesWaitingTime 30000 - Maximum amount of time to wait for executors to register before scheduling begins + Maximum amount of time to wait for resources to register before scheduling begins (in milliseconds). diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index f8fb96b312f2..00a6bec57c63 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -30,9 +30,8 @@ private[spark] class YarnClientSchedulerBackend( extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) with Logging { - if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) { + if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { minRegisteredRatio = 0.8 - ready = false } var client: Client = null @@ -84,7 +83,7 @@ private[spark] class YarnClientSchedulerBackend( logDebug("ClientArguments called with: " + argsArrayBuf) val args = new ClientArguments(argsArrayBuf.toArray, conf) - totalExpectedExecutors.set(args.numExecutors) + totalExpectedResources.set(args.numExecutors) client = new Client(args, conf) appId = client.runApp() waitForApp() @@ -150,4 +149,7 @@ private[spark] class YarnClientSchedulerBackend( logInfo("Stopped") } + override def checkRegisteredResources(): Boolean = { + totalExecutors.get() >= totalExpectedResources.get() * minRegisteredRatio + } } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index 0ad1794d1953..cba717c62e78 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -27,9 +27,8 @@ private[spark] class YarnClusterSchedulerBackend( sc: SparkContext) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { - if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) { + if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { minRegisteredRatio = 0.8 - ready = false } override def start() { @@ -40,6 +39,10 @@ private[spark] class YarnClusterSchedulerBackend( } // System property can override environment variable. numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors) - totalExpectedExecutors.set(numExecutors) + totalExpectedResources.set(numExecutors) + } + + override def checkRegisteredResources(): Boolean = { + totalExecutors.get() >= totalExpectedResources.get() * minRegisteredRatio } } From 88c7dc6a64a817f22f4b2e3f8a723a8e515e089e Mon Sep 17 00:00:00 2001 From: li-zhihui Date: Wed, 23 Jul 2014 13:33:03 +0800 Subject: [PATCH 2/5] Few codes and docs refactor --- .../cluster/CoarseGrainedSchedulerBackend.scala | 11 ++++++----- .../cluster/SparkDeploySchedulerBackend.scala | 2 +- docs/configuration.md | 2 +- .../cluster/YarnClientSchedulerBackend.scala | 2 +- .../cluster/YarnClusterSchedulerBackend.scala | 2 +- 5 files changed, 10 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index cb2dbd747bbc..ac2581c4e041 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -54,9 +54,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) // Submit tasks only after (registered resources / total expected resources) // is equal to at least this value, that is double between 0 and 1. - var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0) - if (minRegisteredRatio > 1) minRegisteredRatio = 1 - // Whatever minRegisteredRatio is arrived, submit tasks after the time(milliseconds). + var minRegisteredRatio = + math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0)) + // Submit tasks after maxRegisteredWaitingTime milliseconds + // if minRegisteredRatio has not yet been reached val maxRegisteredWaitingTime = conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000) val createTime = System.currentTimeMillis() @@ -263,10 +264,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A } } - def checkRegisteredResources(): Boolean = true + def sufficientResourcesRegistered(): Boolean = true override def isReady(): Boolean = { - if (checkRegisteredResources) { + if (sufficientResourcesRegistered) { logInfo("SchedulerBackend is ready for scheduling beginning" + ", total expected resources: " + totalExpectedResources.get() + ", minRegisteredResourcesRatio: " + minRegisteredRatio) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index c7cc283713c9..0e64d99e26f6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -112,7 +112,7 @@ private[spark] class SparkDeploySchedulerBackend( removeExecutor(fullId.split("/")(1), reason.toString) } - override def checkRegisteredResources(): Boolean = { + override def sufficientResourcesRegistered(): Boolean = { totalCoreCount.get() >= totalExpectedResources.get() * minRegisteredRatio } } diff --git a/docs/configuration.md b/docs/configuration.md index 9c4920a01236..fa4216b026fb 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -750,7 +750,7 @@ Apart from these, the following properties are also available, and may be useful 0 The minimum ratio of registered resources (registered resources / total expected resources) - (resources are executors in yarn mode, CPU cores in standalone and mesos mode) + (resources are executors in yarn mode, CPU cores in standalone mode) to wait for before scheduling begins. Specified as a double between 0 and 1. Regardless of whether the minimum ratio of resources has been reached, the maximum amount of time it will wait before scheduling begins is controlled by config diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 00a6bec57c63..c1717f50d4ad 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -149,7 +149,7 @@ private[spark] class YarnClientSchedulerBackend( logInfo("Stopped") } - override def checkRegisteredResources(): Boolean = { + override def sufficientResourcesRegistered(): Boolean = { totalExecutors.get() >= totalExpectedResources.get() * minRegisteredRatio } } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index cba717c62e78..669f3c4c397b 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -42,7 +42,7 @@ private[spark] class YarnClusterSchedulerBackend( totalExpectedResources.set(numExecutors) } - override def checkRegisteredResources(): Boolean = { + override def sufficientResourcesRegistered(): Boolean = { totalExecutors.get() >= totalExpectedResources.get() * minRegisteredRatio } } From ca54bd950b4d5e2159d9cea86a3d38bb95fe7cf7 Mon Sep 17 00:00:00 2001 From: li-zhihui Date: Wed, 23 Jul 2014 14:58:02 +0800 Subject: [PATCH 3/5] Format log with String interpolation --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index ac2581c4e041..f3d02b9b4abb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -268,14 +268,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A override def isReady(): Boolean = { if (sufficientResourcesRegistered) { - logInfo("SchedulerBackend is ready for scheduling beginning" + - ", total expected resources: " + totalExpectedResources.get() + - ", minRegisteredResourcesRatio: " + minRegisteredRatio) + logInfo("SchedulerBackend is ready for scheduling beginning, total expected resources: " + + s"$totalExpectedResources, minRegisteredResourcesRatio: $minRegisteredRatio") return true } if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) { logInfo("SchedulerBackend is ready for scheduling beginning after waiting " + - "maxRegisteredResourcesWaitingTime(ms): " + maxRegisteredWaitingTime) + s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTime(ms)") return true } false From abf48606f7826324e5b53cec85bf26b774afd8bf Mon Sep 17 00:00:00 2001 From: Li Zhihui Date: Thu, 24 Jul 2014 09:37:20 +0800 Subject: [PATCH 4/5] Push down variable totalExpectedResources to children classes --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 5 ++--- .../scheduler/cluster/SparkDeploySchedulerBackend.scala | 4 ++-- .../scheduler/cluster/YarnClientSchedulerBackend.scala | 5 +++-- .../scheduler/cluster/YarnClusterSchedulerBackend.scala | 6 ++++-- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index f3d02b9b4abb..5349426345a0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -48,7 +48,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = new AtomicInteger(0) var totalExecutors = new AtomicInteger(0) - var totalExpectedResources = new AtomicInteger(0) val conf = scheduler.sc.conf private val timeout = AkkaUtils.askTimeout(conf) private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) @@ -268,8 +267,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A override def isReady(): Boolean = { if (sufficientResourcesRegistered) { - logInfo("SchedulerBackend is ready for scheduling beginning, total expected resources: " + - s"$totalExpectedResources, minRegisteredResourcesRatio: $minRegisteredRatio") + logInfo("SchedulerBackend is ready for scheduling beginning after " + + s"reached minRegisteredResourcesRatio: $minRegisteredRatio") return true } if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 0e64d99e26f6..97af9a5b26ba 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -36,7 +36,7 @@ private[spark] class SparkDeploySchedulerBackend( var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _ val maxCores = conf.getOption("spark.cores.max").map(_.toInt) - totalExpectedResources.getAndSet(maxCores.getOrElse(0)) + val totalExpectedCores = maxCores.getOrElse(0) override def start() { super.start() @@ -113,6 +113,6 @@ private[spark] class SparkDeploySchedulerBackend( } override def sufficientResourcesRegistered(): Boolean = { - totalCoreCount.get() >= totalExpectedResources.get() * minRegisteredRatio + totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio } } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index c1717f50d4ad..8217440a1418 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -38,6 +38,7 @@ private[spark] class YarnClientSchedulerBackend( var appId: ApplicationId = null var checkerThread: Thread = null var stopping: Boolean = false + var totalExpectedExecutors = 0 private[spark] def addArg(optionName: String, envVar: String, sysProp: String, arrayBuf: ArrayBuffer[String]) { @@ -83,7 +84,7 @@ private[spark] class YarnClientSchedulerBackend( logDebug("ClientArguments called with: " + argsArrayBuf) val args = new ClientArguments(argsArrayBuf.toArray, conf) - totalExpectedResources.set(args.numExecutors) + totalExpectedExecutors = args.numExecutors client = new Client(args, conf) appId = client.runApp() waitForApp() @@ -150,6 +151,6 @@ private[spark] class YarnClientSchedulerBackend( } override def sufficientResourcesRegistered(): Boolean = { - totalExecutors.get() >= totalExpectedResources.get() * minRegisteredRatio + totalExecutors.get() >= totalExpectedExecutors * minRegisteredRatio } } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index 669f3c4c397b..82ca4efded74 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -27,6 +27,8 @@ private[spark] class YarnClusterSchedulerBackend( sc: SparkContext) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { + var totalExpectedExecutors = 0 + if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { minRegisteredRatio = 0.8 } @@ -39,10 +41,10 @@ private[spark] class YarnClusterSchedulerBackend( } // System property can override environment variable. numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors) - totalExpectedResources.set(numExecutors) + totalExpectedExecutors = numExecutors } override def sufficientResourcesRegistered(): Boolean = { - totalExecutors.get() >= totalExpectedResources.get() * minRegisteredRatio + totalExecutors.get() >= totalExpectedExecutors * minRegisteredRatio } } From e9a630b5e107438e372f80f47751efe140ad125d Mon Sep 17 00:00:00 2001 From: Li Zhihui Date: Mon, 4 Aug 2014 09:11:46 +0800 Subject: [PATCH 5/5] Rename variable totalExecutors and clean codes --- .../cluster/CoarseGrainedSchedulerBackend.scala | 4 ++-- .../scheduler/cluster/YarnClientSchedulerBackend.scala | 2 +- .../cluster/YarnClusterSchedulerBackend.scala | 10 +++++----- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 5349426345a0..33500d967ebb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -47,7 +47,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = new AtomicInteger(0) - var totalExecutors = new AtomicInteger(0) + var totalRegisteredExecutors = new AtomicInteger(0) val conf = scheduler.sc.conf private val timeout = AkkaUtils.askTimeout(conf) private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) @@ -94,7 +94,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A executorAddress(executorId) = sender.path.address addressToExecutorId(sender.path.address) = executorId totalCoreCount.addAndGet(cores) - totalExecutors.addAndGet(1) + totalRegisteredExecutors.addAndGet(1) makeOffers() } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 8217440a1418..833e249f9f61 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -151,6 +151,6 @@ private[spark] class YarnClientSchedulerBackend( } override def sufficientResourcesRegistered(): Boolean = { - totalExecutors.get() >= totalExpectedExecutors * minRegisteredRatio + totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio } } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index 82ca4efded74..55665220a6f9 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -35,16 +35,16 @@ private[spark] class YarnClusterSchedulerBackend( override def start() { super.start() - var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS + totalExpectedExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) { - numExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).getOrElse(numExecutors) + totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")) + .getOrElse(totalExpectedExecutors) } // System property can override environment variable. - numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors) - totalExpectedExecutors = numExecutors + totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors) } override def sufficientResourcesRegistered(): Boolean = { - totalExecutors.get() >= totalExpectedExecutors * minRegisteredRatio + totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio } }