docs/configuration.md (2 additions, 1 deletion)
@@ -1541,7 +1541,8 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
 <td><code>spark.dynamicAllocation.maxExecutors</code></td>
-<td>infinity</td>
+<td>For YARN, depends on the cluster's total VCores;
+infinity for standalone and Mesos modes</td>
 <td>
   Upper bound for the number of executors if dynamic allocation is enabled.
 </td>
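For illustration, a minimal sketch (not taken from the diff) of pinning the cap explicitly; as the Client change below shows, the YARN-derived default is only applied while the value is still at its default:

import org.apache.spark.SparkConf

// An explicit user value: the YARN-derived default will not overwrite it.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.maxExecutors", "20")  // arbitrary example value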
org/apache/spark/deploy/yarn/Client.scala
@@ -144,12 +144,8 @@ private[spark] class Client(
   def submitApplication(): ApplicationId = {
     var appId: ApplicationId = null
     try {
-      launcherBackend.connect()
-      // Setup the credentials before doing anything else,
-      // so we have don't have issues at any point.
-      setupCredentials()
-      yarnClient.init(yarnConf)
-      yarnClient.start()
+      init()

       logInfo("Requesting a new application from cluster with %d NodeManagers"
         .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
@@ -1193,6 +1189,37 @@ private[spark] class Client(
    }
  }

  def init(): Unit = {
    launcherBackend.connect()
    // Set up the credentials before doing anything else,
    // so we don't have issues at any point.
    setupCredentials()
    yarnClient.init(yarnConf)
    yarnClient.start()

    setMaxNumExecutors()
  }

  /**
   * If dynamic allocation is enabled and the user has not set
   * spark.dynamicAllocation.maxExecutors, set the maximum number of executors
   * based on the YARN cluster's total VCores.
   * If dynamic allocation is disabled, leave the setting untouched.
   */
  private def setMaxNumExecutors(): Unit = {
    if (Utils.isDynamicAllocationEnabled(sparkConf)) {
      val defaultMaxNumExecutors = DYN_ALLOCATION_MAX_EXECUTORS.defaultValue.get
      if (defaultMaxNumExecutors == sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS)) {
        val executorCores = sparkConf.getInt("spark.executor.cores", 1)
        val maxNumExecutors = yarnClient.getNodeReports().asScala.
          filter(_.getNodeState == NodeState.RUNNING).
          map(_.getCapability.getVirtualCores / executorCores).sum

        sparkConf.set(DYN_ALLOCATION_MAX_EXECUTORS, maxNumExecutors)
      }
    }
  }
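To make the arithmetic concrete, a small worked example using the values from the test below (4 NodeManagers with 8 VCores each, spark.executor.cores = 3); note that integer division is applied per node before summing:

// Per-node executor count, then summed: (8 / 3) * 4 = 2 * 4 = 8.
val nodeVcores = Seq(8, 8, 8, 8)  // 4 running NodeManagers, 8 VCores each
val executorCores = 3             // spark.executor.cores
val maxNumExecutors = nodeVcores.map(_ / executorCores).sum  // == 8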
Contributor:
Shouldn't we take the queue's maxResources amount into account from the ResourceManager REST APIs?

Member (Author):
Good suggestion. I will try the API first. Pseudo code:

import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

val yarnConf = new YarnConfiguration()
val yarnClient = YarnClient.createYarnClient
yarnClient.init(yarnConf)
yarnClient.start()
// QueueInfo for every root queue: name, capacities, state, child queues.
val rootQueues = yarnClient.getRootQueueInfos
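A hedged sketch of how that queue information could bound the cap, continuing from the snippet above (it assumes the CapacityScheduler convention that getMaximumCapacity is a fraction of the cluster, and reuses the node-report sum from setMaxNumExecutors):

import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.api.records.NodeState

// Total VCores across running nodes, as in setMaxNumExecutors above.
val totalVcores = yarnClient.getNodeReports(NodeState.RUNNING).asScala
  .map(_.getCapability.getVirtualCores).sum

// Root queues report capacity as a fraction of the cluster, so a rough
// per-queue upper bound in VCores is maximumCapacity * totalVcores.
rootQueues.asScala.foreach { queue =>
  val queueVcoreCap = (queue.getMaximumCapacity * totalVcores).toInt
  println(s"queue ${queue.getQueueName}: at most ~$queueVcoreCap VCores")
}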


}

private object Client extends Logging {
org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -30,18 +30,21 @@ import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.MRJobConfig
+import org.apache.hadoop.service.Service.STATE
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.YarnClientApplication
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.server.MiniYARNCluster
 import org.apache.hadoop.yarn.util.Records
 import org.mockito.Matchers.{eq => meq, _}
 import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfterAll, Matchers}

 import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils}
 import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
 import org.apache.spark.util.{ResetSystemProperties, SparkConfWithEnv, Utils}

 class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
@@ -227,6 +230,74 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
    appContext.getMaxAppAttempts should be (42)
  }

test("Dynamic set spark.dynamicAllocation.maxExecutors if dynamicAllocation enabled") {
val numNodeManagers = 4
val conf = new YarnConfiguration
val yarnCluster = new MiniYARNCluster(classOf[ClientSuite].getName, numNodeManagers, 1, 1)
yarnCluster.init(conf)
yarnCluster.start()
assert(null != yarnCluster)
assert(STATE.STARTED == yarnCluster.getServiceState)

val args = new ClientArguments(Array())
// dynamicAllocation enabled
val sparkConfEnabled = new SparkConf()
.set("spark.dynamicAllocation.enabled", "true")
val clientEnabled = new Client(args, yarnCluster.getConfig, sparkConfEnabled)
assert(null != clientEnabled)

val nodeManagerCores =
clientEnabled.hadoopConf.get("yarn.nodemanager.resource.cpu-vcores").toInt
assert(8 == nodeManagerCores)

assert(Int.MaxValue == clientEnabled.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
clientEnabled.init()
assert(numNodeManagers * nodeManagerCores ==
clientEnabled.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
clientEnabled.stop()

// dynamicAllocation disabled
val sparkConfDisabled = new SparkConf()
.set("spark.dynamicAllocation.enabled", "false")
val clientDisabled = new Client(args, yarnCluster.getConfig, sparkConfDisabled)
assert(null != clientDisabled)

assert(Int.MaxValue == clientDisabled.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
clientDisabled.init()
assert(Int.MaxValue == clientDisabled.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
clientDisabled.stop()

// dynamicAllocation enabled and user set spark.dynamicAllocation.maxExecutors
val maxExecutors = 10
val sparkConfSetMaxExes = new SparkConf()
.set("spark.dynamicAllocation.enabled", "true")
.set(DYN_ALLOCATION_MAX_EXECUTORS.key, maxExecutors.toString)
val clientEnabledSetMaxExes = new Client(args, yarnCluster.getConfig, sparkConfSetMaxExes)
assert(null != clientEnabledSetMaxExes)

assert(maxExecutors == clientEnabledSetMaxExes.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
clientEnabledSetMaxExes.init()
assert(maxExecutors == clientEnabledSetMaxExes.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
clientEnabledSetMaxExes.stop()

// dynamicAllocation enabled and user set spark.executor.cores
val executorCores = 3
val sparkConfSetCores = new SparkConf()
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.executor.cores", executorCores.toString)
val clientEnabledSetCores = new Client(args, yarnCluster.getConfig, sparkConfSetCores)
assert(null != clientEnabledSetCores)

assert(Int.MaxValue == clientEnabledSetCores.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
clientEnabledSetCores.init()
// (8 / 3) * 4 = 8
val expectNumExecutors = (nodeManagerCores / executorCores) * numNodeManagers
assert(expectNumExecutors == clientEnabledSetCores.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
clientEnabledSetCores.stop()

yarnCluster.stop()
}

test("spark.yarn.jars with multiple paths and globs") {
val libs = Utils.createTempDir()
val single = Utils.createTempDir()
Expand Down