Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,14 @@ private[spark] class CoarseMesosSchedulerBackend(
private val slaveOfferConstraints =
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))

// Duration, in seconds, for which offers with mismatched constraints are rejected
private val rejectOfferDurationForUnmetConstraints =
getRejectOfferDurationForUnmetConstraints(sc)

// Duration, in seconds, for which offers are rejected once this framework has reached its
// maximum number of cores (spark.cores.max)
private val rejectOfferDurationForReachedMaxCores =
getRejectOfferDurationForReachedMaxCores(sc)

// A client for talking to the external shuffle service
private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
if (shuffleServiceEnabled) {
Expand Down Expand Up @@ -279,18 +283,32 @@ private[spark] class CoarseMesosSchedulerBackend(
}

/**
 * Declines every offer in `offers`, requesting a refusal period of
 * `rejectOfferDurationForUnmetConstraints` seconds for each one.
 *
 * @param d driver through which the declines are sent
 * @param offers offers to decline because their constraints were not met
 */
private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
  // Logging and filter construction live in declineOffer, so each offer is declined exactly once.
  offers.foreach { offer =>
    declineOffer(
      d,
      offer,
      reason = Some("unmet constraints"),
      refuseSeconds = Some(rejectOfferDurationForUnmetConstraints))
  }
}
private def declineOffer(
d: SchedulerDriver,
offer: Offer,
reason: Option[String] = None,
refuseSeconds: Option[Long] = None): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style:

private def declineOffer(
    d: SchedulerDriver,
    ...,
    refuseSeconds: Option[Long] = None): Unit = {
  ...
}


// Logs the offer being declined, then declines it, attaching a refuse filter only when
// `refuseSeconds` is defined.
val id = offer.getId.getValue
val offerAttributes = toAttributeMap(offer.getAttributesList)
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus")

// Unwrap the options before interpolating: embedding `refuseSeconds` directly would print
// "Some(120)" or "None" in the message.
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" +
  refuseSeconds.map(s => s" for $s seconds").getOrElse("") +
  reason.map(r => s" (reason: $r)").getOrElse(""))

refuseSeconds match {
  case Some(seconds) =>
    // Carry the requested duration in a refuse filter.
    val filters = Filters.newBuilder().setRefuseSeconds(seconds).build()
    d.declineOffer(offer.getId, filters)
  case None =>
    // No duration requested: decline without a filter.
    d.declineOffer(offer.getId)
}
}

Expand Down Expand Up @@ -326,11 +344,12 @@ private[spark] class CoarseMesosSchedulerBackend(
d.launchTasks(
Collections.singleton(offer.getId),
offerTasks.asJava)
} else { // decline
logDebug(s"Declining offer: $id with attributes: $offerAttributes " +
s"mem: $offerMem cpu: $offerCpus")

d.declineOffer(offer.getId)
} else if (totalCoresAcquired >= maxCores) {
// Reject an offer for a configurable amount of time to avoid starving other frameworks
declineOffer(d, offer, Some("reached spark.cores.max"),
Some(rejectOfferDurationForReachedMaxCores))
} else {
declineOffer(d, offer)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,4 +352,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
}

/**
 * Reads `spark.mesos.rejectOfferDurationForReachedMaxCores` from the context's configuration
 * as a number of seconds, falling back to "120s" when the key is not set.
 */
protected def getRejectOfferDurationForReachedMaxCores(sc: SparkContext): Long = {
  val configKey = "spark.mesos.rejectOfferDurationForReachedMaxCores"
  val fallback = "120s"
  sc.conf.getTimeAsSeconds(configKey, fallback)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,19 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
verifyDeclinedOffer(driver, createOfferId("o1"), true)
}

// Once spark.cores.max has been reached, a further offer must be declined with a filter.
test("mesos declines offers with a filter when reached spark.cores.max") {
val maxCores = 3
setBackend(Map("spark.cores.max" -> maxCores.toString))

val executorMemory = backend.executorMemory(sc)
// Two offers, each built with maxCores + 1 as its second component
// (presumably (memory, cores) pairs -- confirm against offerResources)
offerResources(List(
(executorMemory, maxCores + 1),
(executorMemory, maxCores + 1)))

// A task is expected on "o1" ...
verifyTaskLaunched("o1")
// ... while "o2" is expected to be declined; per the review discussion the trailing `true`
// requires that the decline was passed a filter
verifyDeclinedOffer(driver, createOfferId("o2"), true)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't test the new config var. This would have passed before the addition of this feature.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would have failed because the declined offer wouldn't have been passed a filter. It would have passed with verifyDeclinedOffer(driver, createOfferId("o2"), false).

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, then can you change the test description to "mesos declines offers with a filter"?

}

test("mesos assigns tasks round-robin on offers") {
val executorCores = 4
val maxCores = executorCores * 2
Expand Down