@@ -109,10 +109,14 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val slaveOfferConstraints =
     parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))

-  // reject offers with mismatched constraints in seconds
+  // Reject offers with mismatched constraints in seconds
   private val rejectOfferDurationForUnmetConstraints =
     getRejectOfferDurationForUnmetConstraints(sc)

+  // Reject offers when we reached the maximum number of cores for this framework
+  private val rejectOfferDurationForReachedMaxCores =
+    getRejectOfferDurationForReachedMaxCores(sc)
+
   // A client for talking to the external shuffle service
   private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
     if (shuffleServiceEnabled) {
@@ -279,18 +283,28 @@ private[spark] class CoarseMesosSchedulerBackend(
   }

   private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
-    for (offer <- offers) {
-      val id = offer.getId.getValue
-      val offerAttributes = toAttributeMap(offer.getAttributesList)
-      val mem = getResource(offer.getResourcesList, "mem")
-      val cpus = getResource(offer.getResourcesList, "cpus")
-      val filters = Filters.newBuilder()
-        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+    offers.foreach { offer =>
+      declineOffer(d, offer, Some("unmet constraints"),
+        Some(rejectOfferDurationForUnmetConstraints))
+    }
   }

+  private def declineOffer(d: SchedulerDriver, offer: Offer, reason: Option[String] = None,
+    refuseSeconds: Option[Long] = None): Unit = {
Review comment (Contributor): style:

    private def declineOffer(
        d: SchedulerDriver,
        ...,
        refuseSeconds: Option[Long] = None): Unit = {
      ...
    }

logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+ s" for $rejectOfferDurationForUnmetConstraints seconds")
val id = offer.getId.getValue
val offerAttributes = toAttributeMap(offer.getAttributesList)
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus")

d.declineOffer(offer.getId, filters)
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem"
+ s" cpu: $cpus for $refuseSeconds seconds" + reason.fold("")(r => s" (reason: $r)"))
Review comment (Contributor):
Just use map with getOrElse here. It's easier to understand than fold.
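For context, the two spellings behave identically on an Option[String]; a minimal sketch of both (illustrative value names, not part of the patch):

    // fold, as written in the patch: empty suffix when no reason is given.
    val suffixViaFold = reason.fold("")(r => s" (reason: $r)")
    // The suggested map/getOrElse form, which reads left to right.
    val suffixViaMap = reason.map(r => s" (reason: $r)").getOrElse("")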


+    refuseSeconds match {
+      case Some(seconds) =>
+        val filters = Filters.newBuilder().setRefuseSeconds(seconds).build()
+        d.declineOffer(offer.getId, filters)
+      case _ => d.declineOffer(offer.getId)
+    }
   }

@@ -326,11 +340,12 @@ private[spark] class CoarseMesosSchedulerBackend(
         d.launchTasks(
           Collections.singleton(offer.getId),
           offerTasks.asJava)
-      } else { // decline
-        logDebug(s"Declining offer: $id with attributes: $offerAttributes " +
-          s"mem: $offerMem cpu: $offerCpus")
-
-        d.declineOffer(offer.getId)
+      } else if (totalCoresAcquired >= maxCores) {
+        // Reject an offer for a configurable amount of time to avoid starving other frameworks
+        declineOffer(d, offer, Some("reached spark.cores.max"),
+          Some(rejectOfferDurationForReachedMaxCores))
+      } else {
+        declineOffer(d, offer)
       }
     }
   }
@@ -352,4 +352,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
   }

+  protected def getRejectOfferDurationForReachedMaxCores(sc: SparkContext): Long = {
+    sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
+  }
+
 }
@@ -147,6 +147,19 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     verifyDeclinedOffer(driver, createOfferId("o1"), true)
   }

+  test("mesos declines offers with a filter when reached spark.cores.max") {
+    val maxCores = 3
+    setBackend(Map("spark.cores.max" -> maxCores.toString))
+
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List(
+      (executorMemory, maxCores + 1),
+      (executorMemory, maxCores + 1)))
+
+    verifyTaskLaunched("o1")
+    verifyDeclinedOffer(driver, createOfferId("o2"), true)
Review comment:
This doesn't test the new config var. This would have passed before the addition of this feature.

Review comment (Contributor Author):
It would have failed because the declined offer wouldn't have been passed a filter. It would have passed with verifyDeclinedOffer(driver, createOfferId("o2"), false).

Review comment:
Ah, then can you change the test description to "mesos declines offers with a filter"?

+  }
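For readers following this thread: the boolean argument controls whether the helper expects the decline to have carried a Filters message. The helper's body is not part of this diff; a minimal sketch of what verifyDeclinedOffer presumably checks, assuming Mockito-style matchers (meq, any) are in scope (hypothetical, not the suite's actual code):

    private def verifyDeclinedOffer(driver: SchedulerDriver, offerId: OfferID,
        filter: Boolean = false): Unit = {
      if (filter) {
        // Declined together with a refuse-seconds filter, as when spark.cores.max is reached.
        verify(driver, times(1)).declineOffer(meq(offerId), any[Filters]())
      } else {
        // Declined with no filter, so Mesos may re-offer the resources right away.
        verify(driver, times(1)).declineOffer(meq(offerId))
      }
    }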

test("mesos assigns tasks round-robin on offers") {
val executorCores = 4
val maxCores = executorCores * 2
docs/running-on-mesos.md (14 additions & 0 deletions)
@@ -406,6 +406,20 @@ See the [configuration page](configuration.html) for information on Spark config
     If unset it will point to Spark's internal web UI.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.rejectOfferDurationForUnmetConstraints</code></td>
+  <td><code>120s</code></td>
+  <td>
+    Set the amount of time for which offers are rejected when constraints are unmet. See <code>spark.mesos.constraints</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.rejectOfferDurationForReachedMaxCores</code></td>
Review comment (Contributor):
I would actually not document these configs. Doing so would require us to maintain backward compatibility. I can't think of any strong use case where someone would want to change these values, so I don't think it's worth the maintenance burden.
<td><code>120s</code></td>
<td>
Set the amount of time for which offers are rejected when the app already acquired <code>spark.cores.max</code> cores.
Review comment:
Can you add "This is used to prevent starvation of other frameworks."?

Review comment (Contributor Author):
Done. I added that comment to spark.mesos.rejectOfferDurationForUnmetConstraints as well, since it's the same idea.
</td>
</tr>
</table>

 # Troubleshooting and Debugging