Skip to content
Closed
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.scheduler
import java.io.{FileInputStream, InputStream}
import java.util.{NoSuchElementException, Properties}

import scala.util.control.NonFatal
import scala.xml.{Node, XML}

import org.apache.spark.SparkConf
Expand Down Expand Up @@ -69,19 +70,31 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
val DEFAULT_WEIGHT = 1

override def buildPools() {
var is: Option[InputStream] = None
var fileData: Option[(InputStream, String)] = None
try {
is = Option {
schedulerAllocFile.map { f =>
new FileInputStream(f)
}.getOrElse {
Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
fileData = schedulerAllocFile.map { f =>
val fis = new FileInputStream(f)
logInfo(s"Creating Fair Scheduler pools from $f")
Some((fis, f))
}.getOrElse {
val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
if (is != null) {
logInfo(s"Creating Fair Scheduler pools from default file: $DEFAULT_SCHEDULER_FILE")
Some((is, DEFAULT_SCHEDULER_FILE))
} else {
logWarning("Fair Scheduler configuration file not found so jobs will be scheduled " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add "($DEFAULT_SCHEDULER_FILE)" after "file "?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case happens when the spark.scheduler.allocation.file property is not set and the default scheduler file does not exist on the classpath, so the warning message is not specific to the default file only. I think the current generic message looks more suitable, WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm ok what about adding at the end "To use fair scheduling, configure pools in $DEFAULT_SCHEDULER_FILE, or set spark.scheduler.allocation.file to a file that contains the configuration." I think it's nice to give users as much info as possible about how to fix the problem, although I don't feel strongly so if you prefer the current message, that's fine too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly, I totally agree with informing the user about how to fix the problem. Addressed by adding this information to the message.

"in FIFO order")
None
}
}

is.foreach { i => buildFairSchedulerPool(i) }
fileData.foreach { case (is, fileName) => buildFairSchedulerPool(is, fileName) }
} catch {
case NonFatal(t) =>
logError("Error while building the fair scheduler pools: ", t)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add the filename (if it's defined) to this error message?

throw t
} finally {
is.foreach(_.close())
fileData.foreach { case (is, fileName) => is.close() }
}

// finally create "default" pool
Expand All @@ -93,36 +106,40 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
logInfo("Created default pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}

private def buildFairSchedulerPool(is: InputStream) {
private def buildFairSchedulerPool(is: InputStream, fileName: String) {
val xml = XML.load(is)
for (poolNode <- (xml \\ POOLS_PROPERTY)) {

val poolName = (poolNode \ POOL_NAME_PROPERTY).text

val schedulingMode = getSchedulingModeValue(poolNode, poolName, DEFAULT_SCHEDULING_MODE)
val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY, DEFAULT_MINIMUM_SHARE)
val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY, DEFAULT_WEIGHT)
val schedulingMode = getSchedulingModeValue(poolNode, poolName,
DEFAULT_SCHEDULING_MODE, fileName)
val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY,
DEFAULT_MINIMUM_SHARE, fileName)
val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY,
DEFAULT_WEIGHT, fileName)

rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight))

logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, schedulingMode, minShare, weight))
}
}

private def getSchedulingModeValue(
poolNode: Node,
poolName: String,
defaultValue: SchedulingMode): SchedulingMode = {
defaultValue: SchedulingMode, fileName: String): SchedulingMode = {

val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase
val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode, using the default " +
s"schedulingMode: $defaultValue for pool: $poolName"
val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode found in " +
s"Fair Scheduler configuration file: $fileName, using " +
s"the default schedulingMode: $defaultValue for pool: $poolName"
try {
if (SchedulingMode.withName(xmlSchedulingMode) != SchedulingMode.NONE) {
SchedulingMode.withName(xmlSchedulingMode)
Expand All @@ -140,14 +157,15 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
private def getIntValue(
poolNode: Node,
poolName: String,
propertyName: String, defaultValue: Int): Int = {
propertyName: String,
defaultValue: Int, fileName: String): Int = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you fix the spacing here? (fileName should be on its own line)


val data = (poolNode \ propertyName).text.trim
try {
data.toInt
} catch {
case e: NumberFormatException =>
logWarning(s"Error while loading scheduler allocation file. " +
logWarning(s"Error while loading fair scheduler configuration from $fileName: " +
s"$propertyName is blank or invalid: $data, using the default $propertyName: " +
s"$defaultValue for pool: $poolName")
defaultValue
Expand All @@ -166,7 +184,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}
Expand Down