From fdca59ac9eaca23754e94e3cd758c2588154697b Mon Sep 17 00:00:00 2001
From: Clement Michaud
Date: Sun, 10 Feb 2019 13:59:32 +0100
Subject: [PATCH] [SPARK-17454][MESOS] Use Mesos disk resources for executors.

Before this change, there was no way to allocate a given amount of disk
when using the Mesos scheduler. That is good enough with the default
isolation options, but not when the XFS isolator is enabled with a hard
limit in order to properly isolate all containers: in that case, the
executor is killed by Mesos while it downloads the Spark executor
archive. This change therefore introduces a Mesos-specific configuration
flag declaring the amount of disk required by each executor, so that
Mesos no longer kills the container for exceeding the XFS hard limit.
---
 docs/running-on-mesos.md                      | 11 ++++-
 .../apache/spark/deploy/mesos/config.scala    | 10 +++++
 .../MesosCoarseGrainedSchedulerBackend.scala  | 26 ++++++++---
 .../cluster/mesos/MesosSchedulerUtils.scala   |  4 ++
 .../mesos/MesosClusterSchedulerSuite.scala    |  4 +-
 ...osCoarseGrainedSchedulerBackendSuite.scala | 45 ++++++++++++++++++-
 .../spark/scheduler/cluster/mesos/Utils.scala |  7 +++
 7 files changed, 97 insertions(+), 10 deletions(-)

diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index a07773c1c71e..41b90d76d87c 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -702,7 +702,16 @@ See the [configuration page](configuration.html) for information on Spark config
     Set the maximum number GPU resources to acquire for this job. Note that executors will still launch when no GPU
     resources are found since this configuration is just an upper limit and not a guaranteed amount.
   </td>
-</tr>
+</tr>
+<tr>
+  <td><code>spark.mesos.disk</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Set the amount of disk, in MB, to acquire for each executor in this job. You might need to set this value depending on the
+    type of disk isolation set up in Mesos. For instance, setting an amount of disk is required when the XFS isolator is enabled
+    with a hard limit enforced; otherwise the isolator will kill the Mesos executor while it downloads the Spark executor archive.
+  </td>
+</tr>
 <tr>
   <td><code>spark.mesos.network.name</code></td>
   <td><code>(none)</code></td>
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index 79a113792de9..115c1c2d71ae 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -321,6 +321,16 @@ package object config {
       .intConf
       .createWithDefault(0)
 
+  private[spark] val EXECUTOR_DISK =
+    ConfigBuilder("spark.mesos.disk")
+      .doc("Set the amount of disk, in MB, to acquire for each executor in this job. You " +
+        "might need to set this value depending on the type of disk isolation set up in " +
+        "Mesos. For instance, setting an amount of disk is required when the XFS isolator " +
+        "is enabled with a hard limit enforced; otherwise the isolator will kill the Mesos " +
+        "executor while it downloads the Spark executor archive.")
+      .intConf
+      .createOptional
+
   private[spark] val TASK_LABELS =
     ConfigBuilder("spark.mesos.task.labels")
       .doc("Set the Mesos labels to add to each task. Labels are free-form key-value pairs. " +
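A minimal usage sketch for the new key (illustrative only, not part of the diff: the master
URL, application name, and the 2048 MB figure are placeholders; the value is an integer and
is matched against the Mesos "disk" scalar resource, which Mesos expresses in MB):

import org.apache.spark.SparkConf

// Hypothetical job configuration: ask Mesos for 2048 MB of disk per executor so that
// fetching the Spark executor archive stays within the XFS hard limit.
val conf = new SparkConf()
  .setMaster("mesos://zk://zk1:2181/mesos")   // placeholder Mesos master URL
  .setAppName("xfs-isolated-job")             // placeholder application name
  .set("spark.mesos.disk", "2048")            // key introduced by this patch
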
" + diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 8bd61c230d8e..193705317ca5 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -79,6 +79,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( private val useFetcherCache = conf.get(ENABLE_FETCHER_CACHE) private val maxGpus = conf.get(MAX_GPUS) + private val diskPerExecutor = conf.get(EXECUTOR_DISK) private val taskLabels = conf.get(TASK_LABELS) @@ -399,6 +400,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( val offerMem = getResource(offer.getResourcesList, "mem") val offerCpus = getResource(offer.getResourcesList, "cpus") val offerPorts = getRangeResource(offer.getResourcesList, "ports") + val offerDisk = getResource(offer.getResourcesList, "disk") val offerReservationInfo = offer .getResourcesList .asScala @@ -411,7 +413,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + offerReservationInfo.map(resInfo => s"reservation info: ${resInfo.getReservation.toString}").getOrElse("") + - s"mem: $offerMem cpu: $offerCpus ports: $offerPorts " + + s"mem: $offerMem cpu: $offerCpus ports: $offerPorts disk: $offerDisk " + s"resources: ${offer.getResourcesList.asScala.mkString(",")}." + s" Launching ${offerTasks.size} Mesos tasks.") @@ -419,10 +421,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( val taskId = task.getTaskId val mem = getResource(task.getResourcesList, "mem") val cpus = getResource(task.getResourcesList, "cpus") + val disk = getResource(task.getResourcesList, "disk") val ports = getRangeResource(task.getResourcesList, "ports").mkString(",") logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" + - s" ports: $ports" + s" on slave with slave id: ${task.getSlaveId.getValue} ") + s" disk: $disk ports: $ports on slave with slave id: ${task.getSlaveId.getValue} ") } driver.launchTasks( @@ -497,11 +500,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( val taskCPUs = executorCores(offerCPUs) val taskMemory = executorMemory(sc) + val taskDisk = diskPerExecutor slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId) val (resourcesLeft, resourcesToUse) = - partitionTaskResources(resources, taskCPUs, taskMemory, taskGPUs) + partitionTaskResources(resources, taskCPUs, taskMemory, taskGPUs, taskDisk) val taskBuilder = MesosTaskInfo.newBuilder() .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) @@ -534,7 +538,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( resources: JList[Resource], taskCPUs: Int, taskMemory: Int, - taskGPUs: Int) + taskGPUs: Int, + taskDisk: Option[Int]) : (List[Resource], List[Resource]) = { // partition cpus & mem @@ -550,14 +555,24 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( val (nonPortResources, portResourcesToUse) = partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterGPUResources) - (nonPortResources, + var (remainingResources, resourcesToUse) = (nonPortResources, cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse ++ gpuResourcesToUse) + + if 
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 06993712035f..b871cc246fd5 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -180,6 +180,10 @@ trait MesosSchedulerUtils extends Logging {
     res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum
   }
 
+  def resourceExists(res: JList[Resource], name: String): Boolean = {
+    res.asScala.exists(_.getName == name)
+  }
+
   /**
    * Transforms a range resource to a list of ranges
    *
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index f26ff04a9a89..661925b7d535 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -314,8 +314,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
       Utils.createTextAttribute("c2", "b"))
     val offers = List(
       Utils.createOffer("o1", "s1", mem, cpu, None, 0),
-      Utils.createOffer("o2", "s2", mem, cpu, None, 0, s2Attributes),
-      Utils.createOffer("o3", "s3", mem, cpu, None, 0, s3Attributes))
+      Utils.createOffer("o2", "s2", mem, cpu, None, 0, None, s2Attributes),
+      Utils.createOffer("o3", "s3", mem, cpu, None, 0, None, s3Attributes))
 
     def submitDriver(driverConstraints: String): Unit = {
       val response = scheduler.submitDriver(
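The new resourceExists helper lets the "unset spark.mesos.disk" test below assert that no
disk resource was attached to the task, which getResource alone cannot show since it returns
0.0 for a missing scalar. A standalone sketch of that distinction (illustrative only; it
restates the two helpers' one-liners locally instead of instantiating the trait, and the
resource values are made up):

import scala.collection.JavaConverters._
import org.apache.mesos.Protos.{Resource, Value}

object ResourceLookupSketch {
  def scalar(name: String, value: Double): Resource =
    Resource.newBuilder()
      .setName(name)
      .setType(Value.Type.SCALAR)
      .setScalar(Value.Scalar.newBuilder().setValue(value))
      .build()

  def main(args: Array[String]): Unit = {
    val withDisk = List(scalar("mem", 1024), scalar("disk", 100)).asJava
    val withoutDisk = List(scalar("mem", 1024)).asJava

    // Same logic as MesosSchedulerUtils.getResource: sums the named scalar values.
    def getResource(res: java.util.List[Resource], name: String): Double =
      res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum

    // Same logic as the new MesosSchedulerUtils.resourceExists.
    def resourceExists(res: java.util.List[Resource], name: String): Boolean =
      res.asScala.exists(_.getName == name)

    println(getResource(withoutDisk, "disk"))     // 0.0 -- ambiguous: missing or zero?
    println(resourceExists(withoutDisk, "disk"))  // false -- clearly missing
    println(resourceExists(withDisk, "disk"))     // true
  }
}
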
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 37c0f5f45075..8fa34f19694d 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -136,6 +136,46 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(cpus == offerCores)
   }
 
+  test("mesos supports spark.mesos.disk") {
+    val claimedDisk = 40
+    setBackend(Map("spark.mesos.disk" -> claimedDisk.toString))
+
+    val executorMemory = backend.executorMemory(sc)
+    val offers = List(Resources(executorMemory, 1, 0, Some(100)))
+    offerResources(offers)
+
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.length == 1)
+
+    val taskDisk = backend.getResource(taskInfos.head.getResourcesList, "disk")
+    assert(taskDisk == claimedDisk)
+  }
+
+  test("mesos supports unset spark.mesos.disk") {
+    setBackend()
+
+    val executorMemory = backend.executorMemory(sc)
+    val offers = List(Resources(executorMemory, 1, 0, Some(100)))
+    offerResources(offers)
+
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.length == 1)
+
+    val taskDiskExist = backend.resourceExists(taskInfos.head.getResourcesList, "disk")
+    assert(!taskDiskExist)
+  }
+
+  test("mesos declines offer if not enough disk available") {
+    val claimedDisk = 400
+    setBackend(Map("spark.mesos.disk" -> claimedDisk.toString))
+
+    val executorMemory = backend.executorMemory(sc)
+    val offers = List(Resources(executorMemory, 1, 0, Some(100)))
+    offerResources(offers)
+
+    verifyDeclinedOffer(driver, createOfferId("o1"))
+  }
+
   test("mesos does not acquire more than spark.cores.max") {
     val maxCores = 10
     setBackend(Map(CORES_MAX.key -> maxCores.toString))
@@ -686,7 +726,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     verifyTaskLaunched(driver, "o1")
   }
 
-  private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
+  private case class Resources(mem: Int, cpus: Int, gpus: Int = 0, disk: Option[Int] = None)
 
   private def registerMockExecutor(executorId: String, slaveId: String, cores: Integer) = {
     val mockEndpointRef = mock[RpcEndpointRef]
@@ -709,7 +749,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
   private def offerResources(offers: List[Resources], startId: Int = 1): Unit = {
     val mesosOffers = offers.zipWithIndex.map {case (offer, i) =>
-      createOffer(s"o${i + startId}", s"s${i + startId}", offer.mem, offer.cpus, None, offer.gpus)}
+      createOffer(s"o${i + startId}", s"s${i + startId}", offer.mem, offer.cpus, None, offer.gpus,
+        offer.disk)}
 
     backend.resourceOffers(driver, mesosOffers.asJava)
   }
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
index 65e595e3cf2b..9fc7998fe47b 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -50,6 +50,7 @@ object Utils {
       cpus: Int,
       ports: Option[(Long, Long)] = None,
       gpus: Int = 0,
+      disk: Option[Int] = None,
       attributes: List[Attribute] = List.empty): Offer = {
     val builder = Offer.newBuilder()
     builder.addResourcesBuilder()
@@ -73,6 +74,12 @@ object Utils {
         .setType(Value.Type.SCALAR)
         .setScalar(Scalar.newBuilder().setValue(gpus))
     }
+    if (disk.isDefined) {
+      builder.addResourcesBuilder()
+        .setName("disk")
+        .setType(Value.Type.SCALAR)
+        .setScalar(Scalar.newBuilder().setValue(disk.get))
+    }
     builder.setId(createOfferId(offerId))
       .setFrameworkId(FrameworkID.newBuilder()
         .setValue("f1"))