docs/running-on-mesos.md (20 additions, 0 deletions)

@@ -516,6 +516,26 @@ See the [configuration page](configuration.html) for information on Spark config
Fetcher Cache</a>
</td>
</tr>
<tr>
<td><code>spark.mesos.checkpoint</code></td>
<td>false</td>
<td>
If set, agents running tasks started by this framework will write the framework pid, executor pids and status updates to disk.
If the agent exits (e.g., due to a crash or as part of upgrading Mesos), this checkpointed data allows the restarted agent to
reconnect to executors that were started by the old instance of the agent. Enabling checkpointing improves fault tolerance,
at the cost of a (usually small) increase in disk I/O.
</td>
</tr>
<tr>
<td><code>spark.mesos.failoverTimeout</code></td>
<td>0.0</td>
<td>
The amount of time (in seconds) that the master will wait for thescheduler to failover before it tears down the framework

[Review comment] Typo: thescheduler -> the scheduler

by killing all its tasks/executors. This should be non-zero if aframework expects to reconnect after a failure and not lose

[Review comment] Typo: aframework -> a framework

its tasks/executors.
    NOTE: To avoid accidental destruction of tasks, production frameworks typically set this to a large value (e.g., 1 week).
</td>
</tr>
</table>

# Troubleshooting and Debugging
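For reference, a minimal sketch of how an application could opt into these settings from the driver side; only the two `spark.mesos.*` keys come from the table above, while the master URL, app name, and timeout value are illustrative placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver configuration: the two spark.mesos.* keys are the
// options documented above; all other values are placeholders.
val conf = new SparkConf()
  .setMaster("mesos://zk://zk1:2181/mesos")
  .setAppName("checkpointed-app")
  .set("spark.mesos.checkpoint", "true")         // agents checkpoint framework/executor state
  .set("spark.mesos.failoverTimeout", "604800")  // one week, in seconds

val sc = new SparkContext(conf)
```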
MesosCoarseGrainedSchedulerBackend.scala

@@ -158,8 +158,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.appName,
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
-     None,
-     None,
+     sc.conf.getOption("spark.mesos.checkpoint").map(_.toBoolean),
+     sc.conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble),
sc.conf.getOption("spark.mesos.driver.frameworkId")
)

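For context, a hedged sketch of how `createSchedulerDriver` presumably forwards these options to Mesos via the `Protos.FrameworkInfo` builder (this also relates to a review question further down about inspecting the `FrameworkInfo`). The helper name `buildFrameworkInfo` and its exact shape are illustrative, not taken from the Spark source.

```scala
import org.apache.mesos.Protos.FrameworkInfo

// Illustrative only: assumes the optional settings end up on the
// FrameworkInfo builder roughly like this inside createSchedulerDriver.
def buildFrameworkInfo(
    sparkUser: String,
    appName: String,
    checkpoint: Option[Boolean],
    failoverTimeout: Option[Double]): FrameworkInfo = {
  val builder = FrameworkInfo.newBuilder()
    .setUser(sparkUser)
    .setName(appName)
  checkpoint.foreach(c => builder.setCheckpoint(c))            // from spark.mesos.checkpoint
  failoverTimeout.foreach(t => builder.setFailoverTimeout(t))  // from spark.mesos.failoverTimeout
  builder.build()
}
```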
MesosFineGrainedSchedulerBackend.scala

@@ -78,8 +78,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
sc.appName,
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
-     Option.empty,
-     Option.empty,
+     sc.conf.getOption("spark.mesos.checkpoint").map(_.toBoolean),
+     sc.conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble),
sc.conf.getOption("spark.mesos.driver.frameworkId")
)

MesosCoarseGrainedSchedulerBackendSuite.scala

@@ -139,6 +139,42 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(cpus == offerCores)
}

test("mesos supports checkpointing") {

val checkpoint = true
val failoverTimeout = 10
setBackend(Map("spark.mesos.checkpoint" -> checkpoint.toString,
"spark.mesos.failoverTimeout" -> failoverTimeout.toString))

val taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)
val driver = mock[SchedulerDriver]
when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
val securityManager = mock[SecurityManager]

val backend = new MesosCoarseGrainedSchedulerBackend(
taskScheduler, sc, "master", securityManager) {
override protected def createSchedulerDriver(
masterUrl: String,
scheduler: Scheduler,
sparkUser: String,
appName: String,
conf: SparkConf,
webuiUrl: Option[String] = None,
checkpoint: Option[Boolean] = None,
failoverTimeout: Option[Double] = None,
frameworkId: Option[String] = None): SchedulerDriver = {
markRegistered()
assert(checkpoint.equals(true))
assert(failoverTimeout.equals(10))

[Review comment] I suppose these checks are sufficient to ensure that the values are passed correctly to createSchedulerDriver. We don't want to check if the MesosSchedulerDriver is created with the correct values? I.e., by inspecting the Protos.FrameworkInfo?

[Review comment] Wait, does this work? checkpoint in createSchedulerDriver is Option[Boolean], not Boolean. Similarly for failoverTimeout.

[Review comment, author] Oops, I am still trying to run dev/run-tests on the entire project, but it keeps failing on some random Hive tests.

[Review comment] They should probably be:

    assert(checkpoint.get.equals(true))
    assert(failoverTimeout.get.equals(10.0))

Note that failoverTimeout, when "unboxed", is a Double.

driver
}
}

backend.start()

}

test("mesos does not acquire more than spark.cores.max") {
val maxCores = 10
setBackend(Map("spark.cores.max" -> maxCores.toString))
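To illustrate the point raised in the review thread above: a Scala `Option` never equals its unwrapped value, and a boxed `Double` never equals a boxed `Int`, so assertions of the form `option.equals(value)` always fail; `Option.contains` is an idiomatic alternative to `.get.equals`. The snippet below is a standalone illustration, not part of the diff.

```scala
// REPL-style illustration of why the original assertions cannot pass.
Some(true).equals(true)        // false: Option[Boolean] compared with Boolean
Some(true).get.equals(true)    // true
Some(true).contains(true)      // true, and avoids the .get

Some(10.0).equals(10)          // false: Option[Double] compared with Int
Some(10.0).get.equals(10)      // false: boxed Double compared with boxed Integer
Some(10.0).get.equals(10.0)    // true
Some(10.0).contains(10.0)      // true
```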
MesosFineGrainedSchedulerBackendSuite.scala

@@ -79,6 +79,42 @@ class MesosFineGrainedSchedulerBackendSuite
backend.start()
}

test("mesos supports checkpointing") {
val conf = new SparkConf
conf.set("spark.mesos.checkpoint", "true")
conf.set("spark.mesos.failoverTimeout", "10")

val sc = mock[SparkContext]
when(sc.conf).thenReturn(conf)
when(sc.sparkUser).thenReturn("sparkUser1")
when(sc.appName).thenReturn("appName1")

val taskScheduler = mock[TaskSchedulerImpl]
val driver = mock[SchedulerDriver]
when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)

val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") {
override protected def createSchedulerDriver(
masterUrl: String,
scheduler: Scheduler,
sparkUser: String,
appName: String,
conf: SparkConf,
webuiUrl: Option[String] = None,
checkpoint: Option[Boolean] = None,
failoverTimeout: Option[Double] = None,
frameworkId: Option[String] = None): SchedulerDriver = {
markRegistered()
assert(checkpoint.equals(true))
assert(failoverTimeout.equals(10))

[Review comment] This should probably be:

    assert(failoverTimeout.get.equals(10.0))

driver
}
}

backend.start()

}

test("Use configured mesosExecutor.cores for ExecutorInfo") {
val mesosExecutorCores = 3
val conf = new SparkConf
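On the earlier review question about inspecting `Protos.FrameworkInfo`: a hedged sketch of what such a check could assert, using the protobuf getters generated from the Mesos proto. It builds the `FrameworkInfo` directly for illustration, whereas the tests above instead assert on the `Option` arguments passed to `createSchedulerDriver`.

```scala
import org.apache.mesos.Protos.FrameworkInfo

// Hypothetical check on a FrameworkInfo built with the values used in the
// tests above; getCheckpoint/getFailoverTimeout are generated proto getters.
val info = FrameworkInfo.newBuilder()
  .setUser("sparkUser1")
  .setName("appName1")
  .setCheckpoint(true)
  .setFailoverTimeout(10.0)
  .build()

assert(info.getCheckpoint)
assert(info.getFailoverTimeout == 10.0)
```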