From 5d33e3e9e51de80c66233d8dc1fb4551e2c696b2 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 20 Jan 2017 17:09:59 -0800 Subject: [PATCH 1/3] [SPARK-18750][yarn] Avoid using "mapValues" when allocating containers. That method is prone to stack overflows when the input map is really large; instead, use plain "map". Also includes a unit test that was verified to cause stack overflows without the fix. --- ...yPreferredContainerPlacementStrategy.scala | 11 +-- .../yarn/LocalityPlacementStrategySuite.scala | 77 +++++++++++++++++++ 2 files changed, 83 insertions(+), 5 deletions(-) create mode 100644 resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala index fb2d61f621c8..f2b6324db619 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala @@ -129,9 +129,9 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy( val largestRatio = updatedHostToContainerCount.values.max // Round the ratio of preferred locality to the number of locality required container // number, which is used for locality preferred host calculating. 
- var preferredLocalityRatio = updatedHostToContainerCount.mapValues { ratio => + var preferredLocalityRatio = updatedHostToContainerCount.map { case(k, ratio) => val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio - adjustedRatio.ceil.toInt + (k, adjustedRatio.ceil.toInt) } for (i <- 0 until requiredLocalityAwareContainerNum) { @@ -145,7 +145,7 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy( // Minus 1 each time when the host is used. When the current ratio is 0, // which means all the required ratio is satisfied, this host will not be allocated again. - preferredLocalityRatio = preferredLocalityRatio.mapValues(_ - 1) + preferredLocalityRatio = preferredLocalityRatio.map { case (k, v) => (k, v - 1) } } } @@ -218,7 +218,8 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy( val possibleTotalContainerNum = pendingHostToContainerCount.values.sum val localityMatchedPendingNum = localityMatchedPendingAllocations.size.toDouble - pendingHostToContainerCount.mapValues(_ * localityMatchedPendingNum / possibleTotalContainerNum) - .toMap + pendingHostToContainerCount.map { case (k, v) => + (k, v * localityMatchedPendingNum / possibleTotalContainerNum) + }.toMap } } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala new file mode 100644 index 000000000000..7691c19df493 --- /dev/null +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable.{HashMap, HashSet, Set} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest +import org.mockito.Mockito._ + +import org.apache.spark.{SparkConf, SparkFunSuite} + +class LocalityPlacementStrategySuite extends SparkFunSuite { + + test("handle large number of containers and tasks (SPARK-18750)") { + // Run the test in a thread with a small stack size, since the original issue + // surfaced as a StackOverflowError. 
+ var error: Throwable = null + + val runnable = new Runnable() { + override def run(): Unit = try { + runTest() + } catch { + case e: Throwable => error = e + } + } + + val thread = new Thread(new ThreadGroup("test"), runnable, "test-thread", 32 * 1024) + thread.start() + thread.join() + + assert(error === null) + } + + private def runTest(): Unit = { + val resource = Resource.newInstance(8 * 1024, 4) + val strategy = new LocalityPreferredContainerPlacementStrategy(new SparkConf(), + new Configuration(), resource) + + val totalTasks = 32 * 1024 + val totalContainers = totalTasks / 16 + val totalHosts = totalContainers / 16 + + val hosts = (1 to totalHosts).map { i => (s"host_$i", totalTasks % i) }.toMap + val containers = (1 to totalContainers).map { i => + ContainerId.fromString(s"container_12345678_0001_01_$i") + } + val count = containers.size / hosts.size / 2 + + val hostToContainerMap = new HashMap[String, Set[ContainerId]]() + hosts.keys.take(hosts.size / 2).zipWithIndex.foreach { case (host, i) => + val hostContainers = new HashSet[ContainerId]() + containers.drop(count * i).take(i).foreach { c => hostContainers += c } + hostToContainerMap(host) = hostContainers + } + + strategy.localityOfRequestedContainers(containers.size * 2, totalTasks, hosts, + hostToContainerMap, Nil) + } + +} From 16a99fcff20a2527d95d54d94c1c348dbd638f26 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 20 Jan 2017 20:04:08 -0800 Subject: [PATCH 2/3] Mock ContainerId to avoid cross-hadoop-version issues. 
--- .../yarn/LocalityPlacementStrategySuite.scala | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala index 7691c19df493..6d5e778004eb 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala @@ -19,9 +19,11 @@ package org.apache.spark.deploy.yarn import scala.collection.mutable.{HashMap, HashSet, Set} -import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.CommonConfigurationKeysPublic +import org.apache.hadoop.net.DNSToSwitchMapping import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest +import org.apache.hadoop.yarn.conf.YarnConfiguration import org.mockito.Mockito._ import org.apache.spark.{SparkConf, SparkFunSuite} @@ -49,18 +51,22 @@ class LocalityPlacementStrategySuite extends SparkFunSuite { } private def runTest(): Unit = { + val yarnConf = new YarnConfiguration() + yarnConf.setClass( + CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + classOf[MockResolver], classOf[DNSToSwitchMapping]) + val resource = Resource.newInstance(8 * 1024, 4) val strategy = new LocalityPreferredContainerPlacementStrategy(new SparkConf(), - new Configuration(), resource) + yarnConf, resource) val totalTasks = 32 * 1024 val totalContainers = totalTasks / 16 val totalHosts = totalContainers / 16 + val mockId = mock(classOf[ContainerId]) val hosts = (1 to totalHosts).map { i => (s"host_$i", totalTasks % i) }.toMap - val containers = (1 to totalContainers).map { i => - ContainerId.fromString(s"container_12345678_0001_01_$i") - } + val containers = (1 to totalContainers).map { i => 
mockId } val count = containers.size / hosts.size / 2 val hostToContainerMap = new HashMap[String, Set[ContainerId]]() From 68c8925df66a2ada06cbbe7d7b008a376b233b36 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 23 Jan 2017 10:22:59 -0800 Subject: [PATCH 3/3] Feedback. --- .../spark/deploy/yarn/LocalityPlacementStrategySuite.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala index 6d5e778004eb..fb80ff9f3132 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala @@ -22,7 +22,6 @@ import scala.collection.mutable.{HashMap, HashSet, Set} import org.apache.hadoop.fs.CommonConfigurationKeysPublic import org.apache.hadoop.net.DNSToSwitchMapping import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.conf.YarnConfiguration import org.mockito.Mockito._ @@ -56,6 +55,11 @@ class LocalityPlacementStrategySuite extends SparkFunSuite { CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, classOf[MockResolver], classOf[DNSToSwitchMapping]) + // The numbers below have been chosen to balance being large enough to replicate the + // original issue while not taking too long to run when the issue is fixed. The main + // goal is to create enough requests for localized containers (so there should be many + // tasks on several hosts that have no allocated containers). + val resource = Resource.newInstance(8 * 1024, 4) val strategy = new LocalityPreferredContainerPlacementStrategy(new SparkConf(), yarnConf, resource)