From f8aa8262f8fd3640d0eb9283a21b7f5c13a698e8 Mon Sep 17 00:00:00 2001 From: Xu Lijian Date: Sat, 12 Apr 2014 20:01:34 +0800 Subject: [PATCH] Add role and checkpoint support for Mesos Change-Id: I3aeaf1600ff096d4023f655863cb833e2c8dd132 --- .../mesos/CoarseMesosSchedulerBackend.scala | 35 ++++++++---- .../cluster/mesos/MesosSchedulerBackend.scala | 53 ++++++++++++------- docs/_config.yml | 2 +- docs/configuration.md | 18 +++++++ docs/running-on-mesos.md | 12 +++++ pom.xml | 2 +- 6 files changed, 93 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 87e181e773fd..c1eb2464e5b7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -95,8 +95,16 @@ private[spark] class CoarseMesosSchedulerBackend( setDaemon(true) override def run() { val scheduler = CoarseMesosSchedulerBackend.this - val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(sc.appName).build() - driver = new MesosSchedulerDriver(scheduler, fwInfo, master) + val fwBuilder = FrameworkInfo.newBuilder().setUser("").setName(sc.appName) + val role = sc.conf.get("spark.mesos.role", null) + val checkpoint = sc.conf.get("spark.mesos.checkpoint", null) + if (role != null) { + fwBuilder.setRole(role) + } + if (checkpoint != null) { + fwBuilder.setCheckpoint(checkpoint.toBoolean) + } + driver = new MesosSchedulerDriver(scheduler, fwBuilder.build(), master) try { { val ret = driver.run() logInfo("driver.run() returned with code " + ret) @@ -196,8 +204,12 @@ private[spark] class CoarseMesosSchedulerBackend( for (offer <- offers) { val slaveId = offer.getSlaveId.toString - val mem = getResource(offer.getResourcesList, "mem") - val cpus = getResource(offer.getResourcesList, 
"cpus").toInt + val memResource = getResource(offer.getResourcesList, "mem") + val mem = memResource.getScalar.getValue + val memRole = memResource.getRole + val cpusResource = getResource(offer.getResourcesList, "cpus") + val cpus = cpusResource.getScalar.getValue.toInt + val cpusRole = cpusResource.getRole if (totalCoresAcquired < maxCores && mem >= sc.executorMemory && cpus >= 1 && failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && !slaveIdsWithExecutors.contains(slaveId)) { @@ -213,8 +225,8 @@ private[spark] class CoarseMesosSchedulerBackend( .setSlaveId(offer.getSlaveId) .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave)) .setName("Task " + taskId) - .addResources(createResource("cpus", cpusToUse)) - .addResources(createResource("mem", sc.executorMemory)) + .addResources(createResource("cpus", cpusToUse, cpusRole)) + .addResources(createResource("mem", sc.executorMemory, memRole)) .build() d.launchTasks( Collections.singleton(offer.getId), Collections.singletonList(task), filters) @@ -228,20 +240,25 @@ private[spark] class CoarseMesosSchedulerBackend( } /** Helper function to pull out a resource from a Mesos Resources protobuf */ - private def getResource(res: JList[Resource], name: String): Double = { + private def getResource(res: JList[Resource], name: String): Resource = { for (r <- res if r.getName == name) { - return r.getScalar.getValue + return r } // If we reached here, no resource with the required name was present throw new IllegalArgumentException("No resource called " + name + " in " + res) } /** Build a Mesos resource protobuf object */ - private def createResource(resourceName: String, quantity: Double): Protos.Resource = { + private def createResource( + resourceName: String, + quantity: Double, + role: String): Protos.Resource = { + Resource.newBuilder() .setName(resourceName) .setType(Value.Type.SCALAR) .setScalar(Value.Scalar.newBuilder().setValue(quantity).build()) + .setRole(role) .build() } diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 67ee4d66f151..406cf3d75cc8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -53,8 +53,9 @@ private[spark] class MesosSchedulerBackend( // Driver for talking to Mesos var driver: SchedulerDriver = null - // Which slave IDs we have executors on - val slaveIdsWithExecutors = new HashSet[String] + // Executors with different resources roles can't share the same ID, + // so we save the ExecutorInfo for each slave in memory. + val slaveIdToExecutorInfo = new HashMap[String, ExecutorInfo] val taskIdToSlaveId = new HashMap[Long, String] // An ExecutorInfo for our tasks @@ -70,8 +71,16 @@ private[spark] class MesosSchedulerBackend( setDaemon(true) override def run() { val scheduler = MesosSchedulerBackend.this - val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(sc.appName).build() - driver = new MesosSchedulerDriver(scheduler, fwInfo, master) + val fwBuilder = FrameworkInfo.newBuilder().setUser("").setName(sc.appName) + val role = sc.conf.get("spark.mesos.role", null) + val checkpoint = sc.conf.get("spark.mesos.checkpoint", null) + if (role != null) { + fwBuilder.setRole(role) + } + if (checkpoint != null) { + fwBuilder.setCheckpoint(checkpoint.toBoolean) + } + driver = new MesosSchedulerDriver(scheduler, fwBuilder.build(), master) try { val ret = driver.run() logInfo("driver.run() returned with code " + ret) @@ -85,7 +94,11 @@ private[spark] class MesosSchedulerBackend( } } - def createExecutorInfo(execId: String): ExecutorInfo = { + def createExecutorInfo(offer: Offer): ExecutorInfo = { + val slaveId = offer.getSlaveId.getValue + if (slaveIdToExecutorInfo.contains(slaveId)) { + return slaveIdToExecutorInfo(slaveId) + } val 
executorSparkHome = sc.conf.getOption("spark.mesos.executor.home") .orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility .getOrElse { @@ -128,13 +141,16 @@ private[spark] class MesosSchedulerBackend( .setName("mem") .setType(Value.Type.SCALAR) .setScalar(Value.Scalar.newBuilder().setValue(sc.executorMemory).build()) + .setRole(getResource(offer.getResourcesList, "mem").getRole) .build() - ExecutorInfo.newBuilder() - .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) + val executorInfo = ExecutorInfo.newBuilder() + .setExecutorId(ExecutorID.newBuilder().setValue(slaveId).build()) .setCommand(command) .setData(ByteString.copyFrom(createExecArg())) .addResources(memory) .build() + slaveIdToExecutorInfo(slaveId) = executorInfo + executorInfo } /** @@ -205,9 +221,9 @@ private[spark] class MesosSchedulerBackend( val offerableIndices = new HashMap[String, Int] def enoughMemory(o: Offer) = { - val mem = getResource(o.getResourcesList, "mem") + val mem = getResource(o.getResourcesList, "mem").getScalar.getValue val slaveId = o.getSlaveId.getValue - mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId) + mem >= sc.executorMemory || slaveIdToExecutorInfo.contains(slaveId) } for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) { @@ -215,7 +231,7 @@ private[spark] class MesosSchedulerBackend( offerableWorkers += new WorkerOffer( offer.getSlaveId.getValue, offer.getHostname, - getResource(offer.getResourcesList, "cpus").toInt) + getResource(offer.getResourcesList, "cpus").getScalar.getValue.toInt) } // Call into the TaskSchedulerImpl @@ -228,9 +244,8 @@ private[spark] class MesosSchedulerBackend( for (taskDesc <- taskList) { val slaveId = taskDesc.executorId val offerNum = offerableIndices(slaveId) - slaveIdsWithExecutors += slaveId taskIdToSlaveId(taskDesc.taskId) = slaveId - mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId)) + mesosTasks(offerNum).add(createMesosTask(taskDesc, 
offers(offerNum))) } } } @@ -247,26 +262,28 @@ private[spark] class MesosSchedulerBackend( } /** Helper function to pull out a resource from a Mesos Resources protobuf */ - def getResource(res: JList[Resource], name: String): Double = { + def getResource(res: JList[Resource], name: String): Resource = { for (r <- res if r.getName == name) { - return r.getScalar.getValue + return r } // If we reached here, no resource with the required name was present throw new IllegalArgumentException("No resource called " + name + " in " + res) } /** Turn a Spark TaskDescription into a Mesos task */ - def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = { + def createMesosTask(task: TaskDescription, offer: Offer): MesosTaskInfo = { val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build() + val slaveId = offer.getSlaveId.getValue val cpuResource = Resource.newBuilder() .setName("cpus") .setType(Value.Type.SCALAR) .setScalar(Value.Scalar.newBuilder().setValue(scheduler.CPUS_PER_TASK).build()) + .setRole(getResource(offer.getResourcesList, "cpus").getRole) .build() MesosTaskInfo.newBuilder() .setTaskId(taskId) .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) - .setExecutor(createExecutorInfo(slaveId)) + .setExecutor(createExecutorInfo(offer)) .setName(task.name) .addResources(cpuResource) .setData(ByteString.copyFrom(task.serializedTask)) @@ -289,7 +306,7 @@ private[spark] class MesosSchedulerBackend( synchronized { if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) { // We lost the executor on this slave, so remember that it's gone - slaveIdsWithExecutors -= taskIdToSlaveId(tid) + slaveIdToExecutorInfo -= taskIdToSlaveId(tid) } if (isFinished(status.getState)) { taskIdToSlaveId.remove(tid) @@ -328,7 +345,7 @@ private[spark] class MesosSchedulerBackend( try { logInfo("Mesos slave lost: " + slaveId.getValue) synchronized { - slaveIdsWithExecutors -= slaveId.getValue + slaveIdToExecutorInfo -= 
slaveId.getValue } scheduler.executorLost(slaveId.getValue, reason) } finally { diff --git a/docs/_config.yml b/docs/_config.yml index 45b78fe724a5..387f5c6d8d17 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -7,6 +7,6 @@ SPARK_VERSION: 1.0.0-SNAPSHOT SPARK_VERSION_SHORT: 1.0.0 SCALA_BINARY_VERSION: "2.10" SCALA_VERSION: "2.10.4" -MESOS_VERSION: 0.18.1 +MESOS_VERSION: 0.19.1 SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK SPARK_GITHUB_URL: https://github.com/apache/spark diff --git a/docs/configuration.md b/docs/configuration.md index 65a422caabb7..3f10355f6a81 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -764,6 +764,24 @@ Apart from these, the following properties are also available, and may be useful for the whole duration of the Spark job. + + spark.mesos.role + (not set) + + Allocation role of the framework when running on Mesos. If not set, the default value of the Mesos + framework will be used. + + + + spark.mesos.checkpoint + (not set) + + Whether to checkpoint task information to disk when running on Mesos, using the Mesos + slave recovery feature. + If not set, the default value of the Mesos framework will be used. + Note: if you use this, Spark will only accept offers from Mesos slaves with checkpointing enabled. + + spark.speculation false diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 1073abb202c5..977bae71fd9a 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -138,6 +138,18 @@ it does not need to be redundantly passed in as a system property. {% endhighlight %} +# Mesos Configuration Properties + +When running on Mesos, Spark supports the role and checkpointing features like other +Mesos frameworks. Just set the `spark.mesos.role` and `spark.mesos.checkpoint` +[config properties](configuration.html#spark-properties). 
For example: + +{% highlight scala %} +conf.set("spark.mesos.role", "spark") +conf.set("spark.mesos.checkpoint", "true") +{% endhighlight %} + + # Mesos Run Modes Spark can run over Mesos in two modes: "fine-grained" (default) and "coarse-grained". diff --git a/pom.xml b/pom.xml index 556b9da3d6d9..5ecfae558543 100644 --- a/pom.xml +++ b/pom.xml @@ -115,7 +115,7 @@ 2.10.4 2.10 2.0.1 - 0.18.1 + 0.19.1 shaded-protobuf org.spark-project.akka 2.2.3-shaded-protobuf