Skip to content

Commit 37187e0

Browse files
committed
fix supervise for mesos in cluster mode
1 parent 113399b commit 37187e0

2 files changed

Lines changed: 20 additions & 3 deletions

File tree

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,8 @@ private[spark] class MesosClusterScheduler(
369369
}
370370

371371
/**
 * Builds the framework ID string for the driver described by `desc`.
 *
 * The result is `<frameworkId>-<submissionId>`. In the added version of the body, when
 * `desc.retryState` is defined, `-retry-<retries>` is appended as well.
 * NOTE(review): presumably this gives a relaunched (supervised) driver a different
 * framework ID from its previous attempt -- confirm against the callers.
 *
 * @param desc description of the submitted driver; `submissionId` and `retryState` are read
 * @return the framework ID string for this driver
 */
private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
372-
// Removed side of the diff: the ID carried no retry information.
s"${frameworkId}-${desc.submissionId}"
372+
// Added side of the diff: empty suffix when there is no retry state, else "-retry-<n>".
val retries = desc.retryState.map { d => s"-retry-${d.retries.toString}" }.getOrElse("")
373+
s"${frameworkId}-${desc.submissionId}${retries}"
373374
}
374375

375376
private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster.mesos
1919

2020
import java.io.File
2121
import java.util.{Collections, List => JList}
22+
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
2223
import java.util.concurrent.locks.ReentrantLock
2324

2425
import scala.collection.JavaConverters._
@@ -170,6 +171,15 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
170171

171172
override def start() {
172173
super.start()
174+
175+
val startedBefore = IdHelper.startedBefore.getAndSet(true)
176+
177+
val suffix = if (startedBefore) {
178+
f"-${IdHelper.nextSCNumber.incrementAndGet()}%04d"
179+
} else {
180+
""
181+
}
182+
173183
val driver = createSchedulerDriver(
174184
master,
175185
MesosCoarseGrainedSchedulerBackend.this,
@@ -179,10 +189,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
179189
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
180190
None,
181191
Some(sc.conf.get(DRIVER_FAILOVER_TIMEOUT)),
182-
sc.conf.getOption("spark.mesos.driver.frameworkId")
192+
sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
183193
)
184194

185-
unsetFrameworkID(sc)
186195
startScheduler(driver)
187196
}
188197

@@ -271,6 +280,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
271280
driver: org.apache.mesos.SchedulerDriver,
272281
frameworkId: FrameworkID,
273282
masterInfo: MasterInfo) {
283+
274284
this.appId = frameworkId.getValue
275285
this.mesosExternalShuffleClient.foreach(_.init(appId))
276286
this.schedulerDriver = driver
@@ -672,3 +682,9 @@ private class Slave(val hostname: String) {
672682
var taskFailures = 0
673683
var shuffleRegistered = false
674684
}
685+
686+
/**
 * Singleton state used by `MesosCoarseGrainedSchedulerBackend.start()` to derive a suffix
 * for the configured `spark.mesos.driver.frameworkId`.
 *
 * The first call to `start()` flips `startedBefore` to true and uses an empty suffix.
 * Every later call increments `nextSCNumber` and appends it as a zero-padded four-digit
 * suffix (`-0001`, `-0002`, ...).
 */
object IdHelper {
687+
// Use atomic values since Spark contexts can be initialized in parallel
688+
// Counter for the numeric suffix; starts at 0 and is read via incrementAndGet(),
// so the first suffix handed out is 1.
private[mesos] val nextSCNumber = new AtomicLong(0)
689+
// False until the first backend start; start() reads and sets it with getAndSet(true).
private[mesos] val startedBefore = new AtomicBoolean(false)
690+
}

0 commit comments

Comments
 (0)