Skip to content

Commit 8da3f70

Browse files
Michael Gummeltsrowen
authored andcommitted
[SPARK-21000][MESOS] Add Mesos labels support to the Spark Dispatcher
## What changes were proposed in this pull request? Add Mesos labels support to the Spark Dispatcher ## How was this patch tested? unit tests Author: Michael Gummelt <mgummelt@mesosphere.io> Closes #18220 from mgummelt/SPARK-21000-dispatcher-labels.
1 parent dc4c351 commit 8da3f70

File tree

8 files changed

+157
-53
lines changed

8 files changed

+157
-53
lines changed

docs/running-on-mesos.md

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -382,8 +382,9 @@ See the [configuration page](configuration.html) for information on Spark config
382382
<td>(none)</td>
383383
<td>
384384
Set the Mesos labels to add to each task. Labels are free-form key-value pairs.
385-
Key-value pairs should be separated by a colon, and commas used to list more than one.
386-
Ex. key:value,key2:value2.
385+
Key-value pairs should be separated by a colon, and commas used to
386+
list more than one. If your label includes a colon or comma, you
387+
can escape it with a backslash. Ex. key:value,key2:a\:b.
387388
</td>
388389
</tr>
389390
<tr>
@@ -468,6 +469,15 @@ See the [configuration page](configuration.html) for information on Spark config
468469
If unset it will point to Spark's internal web UI.
469470
</td>
470471
</tr>
472+
<tr>
473+
<td><code>spark.mesos.driver.labels</code></td>
474+
<td><code>(none)</code></td>
475+
<td>
476+
Mesos labels to add to the driver. See <code>spark.mesos.task.labels</code>
477+
for formatting information.
478+
</td>
479+
</tr>
480+
471481
<tr>
472482
<td><code>spark.mesos.driverEnv.[EnvironmentVariableName]</code></td>
473483
<td><code>(none)</code></td>

resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,11 @@ package object config {
5656
.stringConf
5757
.createOptional
5858

59+
private [spark] val DRIVER_LABELS =
60+
ConfigBuilder("spark.mesos.driver.labels")
61+
.doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value" +
62+
"pairs should be separated by a colon, and commas used to list more than one." +
63+
"Ex. key:value,key2:value2")
64+
.stringConf
65+
.createOptional
5966
}

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,13 @@ import org.apache.mesos.Protos.Environment.Variable
3030
import org.apache.mesos.Protos.TaskStatus.Reason
3131

3232
import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
33+
import org.apache.spark.deploy.mesos.config
3334
import org.apache.spark.deploy.mesos.MesosDriverDescription
3435
import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
3536
import org.apache.spark.metrics.MetricsSystem
3637
import org.apache.spark.util.Utils
3738

39+
3840
/**
3941
* Tracks the current state of a Mesos Task that runs a Spark driver.
4042
* @param driverDescription Submitted driver description from
@@ -525,15 +527,17 @@ private[spark] class MesosClusterScheduler(
525527
offer.remainingResources = finalResources.asJava
526528

527529
val appName = desc.conf.get("spark.app.name")
528-
val taskInfo = TaskInfo.newBuilder()
530+
531+
TaskInfo.newBuilder()
529532
.setTaskId(taskId)
530533
.setName(s"Driver for ${appName}")
531534
.setSlaveId(offer.offer.getSlaveId)
532535
.setCommand(buildDriverCommand(desc))
533536
.addAllResources(cpuResourcesToUse.asJava)
534537
.addAllResources(memResourcesToUse.asJava)
535-
taskInfo.setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf))
536-
taskInfo.build
538+
.setLabels(MesosProtoUtils.mesosLabels(desc.conf.get(config.DRIVER_LABELS).getOrElse("")))
539+
.setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf))
540+
.build
537541
}
538542

539543
/**

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -419,16 +419,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
419419
.setSlaveId(offer.getSlaveId)
420420
.setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
421421
.setName(s"${sc.appName} $taskId")
422-
423-
taskBuilder.addAllResources(resourcesToUse.asJava)
424-
taskBuilder.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
425-
426-
val labelsBuilder = taskBuilder.getLabelsBuilder
427-
val labels = buildMesosLabels().asJava
428-
429-
labelsBuilder.addAllLabels(labels)
430-
431-
taskBuilder.setLabels(labelsBuilder)
422+
.setLabels(MesosProtoUtils.mesosLabels(taskLabels))
423+
.addAllResources(resourcesToUse.asJava)
424+
.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
432425

433426
tasks(offer.getId) ::= taskBuilder.build()
434427
remainingResources(offerId) = resourcesLeft.asJava
@@ -444,21 +437,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
444437
tasks.toMap
445438
}
446439

447-
private def buildMesosLabels(): List[Label] = {
448-
taskLabels.split(",").flatMap(label =>
449-
label.split(":") match {
450-
case Array(key, value) =>
451-
Some(Label.newBuilder()
452-
.setKey(key)
453-
.setValue(value)
454-
.build())
455-
case _ =>
456-
logWarning(s"Unable to parse $label into a key:value label for the task.")
457-
None
458-
}
459-
).toList
460-
}
461-
462440
/** Extracts task needed resources from a list of available resources. */
463441
private def partitionTaskResources(
464442
resources: JList[Resource],
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.scheduler.cluster.mesos
19+
20+
import scala.collection.JavaConverters._
21+
22+
import org.apache.mesos.Protos
23+
24+
import org.apache.spark.SparkException
25+
import org.apache.spark.internal.Logging
26+
27+
object MesosProtoUtils extends Logging {
28+
29+
/** Parses a label string of the format specified in spark.mesos.task.labels. */
30+
def mesosLabels(labelsStr: String): Protos.Labels.Builder = {
31+
val labels: Seq[Protos.Label] = if (labelsStr == "") {
32+
Seq()
33+
} else {
34+
labelsStr.split("""(?<!\\),""").toSeq.map { labelStr =>
35+
val parts = labelStr.split("""(?<!\\):""")
36+
if (parts.length != 2) {
37+
throw new SparkException(s"Malformed label: ${labelStr}")
38+
}
39+
40+
val cleanedParts = parts
41+
.map(part => part.replaceAll("""\\,""", ","))
42+
.map(part => part.replaceAll("""\\:""", ":"))
43+
44+
Protos.Label.newBuilder()
45+
.setKey(cleanedParts(0))
46+
.setValue(cleanedParts(1))
47+
.build()
48+
}
49+
}
50+
51+
Protos.Labels.newBuilder().addAllLabels(labels.asJava)
52+
}
53+
}

resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,33 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
248248
assert(networkInfos.get(0).getName == "test-network-name")
249249
}
250250

251+
test("supports spark.mesos.driver.labels") {
252+
setScheduler()
253+
254+
val mem = 1000
255+
val cpu = 1
256+
257+
val response = scheduler.submitDriver(
258+
new MesosDriverDescription("d1", "jar", mem, cpu, true,
259+
command,
260+
Map("spark.mesos.executor.home" -> "test",
261+
"spark.app.name" -> "test",
262+
"spark.mesos.driver.labels" -> "key:value"),
263+
"s1",
264+
new Date()))
265+
266+
assert(response.success)
267+
268+
val offer = Utils.createOffer("o1", "s1", mem, cpu)
269+
scheduler.resourceOffers(driver, List(offer).asJava)
270+
271+
val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
272+
val labels = launchedTasks.head.getLabels
273+
assert(labels.getLabelsCount == 1)
274+
assert(labels.getLabels(0).getKey == "key")
275+
assert(labels.getLabels(0).getValue == "value")
276+
}
277+
251278
test("can kill supervised drivers") {
252279
val conf = new SparkConf()
253280
conf.setMaster("mesos://localhost:5050")

resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -532,29 +532,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
532532
assert(launchedTasks.head.getLabels.equals(taskLabels))
533533
}
534534

535-
test("mesos ignored invalid labels and sets configurable labels on tasks") {
536-
val taskLabelsString = "mesos:test,label:test,incorrect:label:here"
537-
setBackend(Map(
538-
"spark.mesos.task.labels" -> taskLabelsString
539-
))
540-
541-
// Build up the labels
542-
val taskLabels = Protos.Labels.newBuilder()
543-
.addLabels(Protos.Label.newBuilder()
544-
.setKey("mesos").setValue("test").build())
545-
.addLabels(Protos.Label.newBuilder()
546-
.setKey("label").setValue("test").build())
547-
.build()
548-
549-
val offers = List(Resources(backend.executorMemory(sc), 1))
550-
offerResources(offers)
551-
val launchedTasks = verifyTaskLaunched(driver, "o1")
552-
553-
val labels = launchedTasks.head.getLabels
554-
555-
assert(launchedTasks.head.getLabels.equals(taskLabels))
556-
}
557-
558535
test("mesos supports spark.mesos.network.name") {
559536
setBackend(Map(
560537
"spark.mesos.network.name" -> "test-network-name"
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.scheduler.cluster.mesos
19+
20+
import org.apache.spark.SparkFunSuite
21+
22+
class MesosProtoUtilsSuite extends SparkFunSuite {
23+
test("mesosLabels") {
24+
val labels = MesosProtoUtils.mesosLabels("key:value")
25+
assert(labels.getLabelsCount == 1)
26+
val label = labels.getLabels(0)
27+
assert(label.getKey == "key")
28+
assert(label.getValue == "value")
29+
30+
val labels2 = MesosProtoUtils.mesosLabels("key:value\\:value")
31+
assert(labels2.getLabelsCount == 1)
32+
val label2 = labels2.getLabels(0)
33+
assert(label2.getKey == "key")
34+
assert(label2.getValue == "value:value")
35+
36+
val labels3 = MesosProtoUtils.mesosLabels("key:value,key2:value2")
37+
assert(labels3.getLabelsCount == 2)
38+
assert(labels3.getLabels(0).getKey == "key")
39+
assert(labels3.getLabels(0).getValue == "value")
40+
assert(labels3.getLabels(1).getKey == "key2")
41+
assert(labels3.getLabels(1).getValue == "value2")
42+
43+
val labels4 = MesosProtoUtils.mesosLabels("key:value\\,value")
44+
assert(labels4.getLabelsCount == 1)
45+
assert(labels4.getLabels(0).getKey == "key")
46+
assert(labels4.getLabels(0).getValue == "value,value")
47+
}
48+
}

0 commit comments

Comments
 (0)