From e1cc521817c49fdf3448fa9290f50129d837d8bc Mon Sep 17 00:00:00 2001
From: hualiu
Date: Wed, 3 May 2017 11:30:00 -0700
Subject: [PATCH] add spark.yarn.launchContainer.count.simultaneously to cap #
 of executors to be launched simultaneously

---
 .../spark/deploy/yarn/YarnAllocator.scala | 38 +++++++++++++++----
 1 file changed, 31 insertions(+), 7 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index ed77a6e4a1c7..7ed4b894d8ef 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -257,7 +257,9 @@ private[yarn] class YarnAllocator(
    * This must be synchronized because variables read in this method are mutated by other methods.
    */
   def allocateResources(): Unit = synchronized {
-    updateResourceRequests()
+    val launchContainerCount =
+      sparkConf.getInt("spark.yarn.launchContainer.count.simultaneously", -1)
+    updateResourceRequests(launchContainerCount)
 
     val progressIndicator = 0.1f
     // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
@@ -292,9 +294,16 @@ private[yarn] class YarnAllocator(
    * Visible for testing.
    */
   def updateResourceRequests(): Unit = {
+    updateResourceRequests(-1)
+  }
+
+  def updateResourceRequests(maxCount: Int): Unit = {
     val pendingAllocate = getPendingAllocate
     val numPendingAllocate = pendingAllocate.size
-    val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
+    var missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
+    if (maxCount > 0) {
+      missing = math.min(missing, maxCount)
+    }
 
     if (missing > 0) {
       logInfo(s"Will request $missing executor container(s), each with " +
@@ -431,18 +440,33 @@ private[yarn] class YarnAllocator(
         remainingAfterOffRackMatches)
     }
 
-    if (!remainingAfterOffRackMatches.isEmpty) {
-      logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
+    val launchContainerCount =
+      sparkConf.getInt("spark.yarn.launchContainer.count.simultaneously", -1)
+    val containerNumToLaunch = {
+      if (launchContainerCount <= 0) {
+        containersToUse.size
+      } else {
+        math.min(launchContainerCount, containersToUse.size)
+      }
+    }
+    val filteredContainersToUse = containersToUse.drop(containersToUse.size - containerNumToLaunch)
+    val remainingAfterFilter =
+      containersToUse -- filteredContainersToUse ++ remainingAfterOffRackMatches
+
+    if (remainingAfterFilter.nonEmpty) {
+      logDebug(s"Releasing ${remainingAfterFilter.size} unneeded containers that were " +
         s"allocated to us")
-      for (container <- remainingAfterOffRackMatches) {
+      for (container <- remainingAfterFilter) {
         internalReleaseContainer(container)
       }
     }
 
-    runAllocatedContainers(containersToUse)
+    if (filteredContainersToUse.nonEmpty) {
+      runAllocatedContainers(filteredContainersToUse)
+    }
 
     logInfo("Received %d containers from YARN, launching executors on %d of them."
-      .format(allocatedContainers.size, containersToUse.size))
+      .format(allocatedContainers.size, filteredContainersToUse.size))
   }
 
   /**
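
Usage sketch: the new knob is read with sparkConf.getInt(..., -1), so -1 (the
default) leaves launching uncapped, while any positive value caps both the
number of container requests added per updateResourceRequests() pass and the
number of executors launched per allocateResources() call; surplus containers
that YARN has already granted are released and re-requested on a later
heartbeat. It can be supplied like any other Spark configuration; the class
and jar names below are placeholders:

    spark-submit \
      --master yarn \
      --deploy-mode cluster \
      --conf spark.yarn.launchContainer.count.simultaneously=10 \
      --class com.example.MyApp \
      myapp.jar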
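
For intuition, a self-contained sketch of the capping arithmetic from the
handleAllocatedContainers hunk (placeholder Int ids stand in for YARN
Container objects, and Seq.diff stands in for the buffer -- operator, which
behaves the same for these values):

    import scala.collection.mutable.ArrayBuffer

    object LaunchCapSketch {
      def main(args: Array[String]): Unit = {
        val containersToUse = ArrayBuffer(1, 2, 3, 4, 5) // 5 containers granted by YARN
        val launchContainerCount = 2 // the new config; <= 0 would mean "no cap"

        // Same arithmetic as the patch: cap at the configured count, else use all.
        val containerNumToLaunch =
          if (launchContainerCount <= 0) containersToUse.size
          else math.min(launchContainerCount, containersToUse.size)

        // drop(size - n) keeps the *last* n allocated containers for launching.
        val filteredContainersToUse =
          containersToUse.drop(containersToUse.size - containerNumToLaunch)

        // Everything else is handed back to YARN.
        val remainingAfterFilter = containersToUse.diff(filteredContainersToUse)

        println(s"launch: $filteredContainersToUse")  // ArrayBuffer(4, 5)
        println(s"release: $remainingAfterFilter")    // ArrayBuffer(1, 2, 3)
      }
    }

Note that drop(size - n) keeps the tail of containersToUse, which is filled
host-matched first and off-rack-matched last, so the capped launch set favors
the least-local containers; take(containerNumToLaunch) would favor the most
local ones instead.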