Skip to content

Commit fc104f2

Browse files
authored
Merge pull request apache#14 from WenboZhao/manual-allocation
Support manual executor allocation
2 parents 51d7c36 + fb22ffc commit fc104f2

3 files changed

Lines changed: 197 additions & 45 deletions

File tree

core/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@
3535
<url>http://spark.apache.org/</url>
3636
<dependencies>
3737
<dependency>
38-
<groupId>com.twosigma</groupId>
38+
<groupId>twosigma</groupId>
3939
<artifactId>cook-jobclient</artifactId>
40-
<version>0.1.0</version>
40+
<version>0.1.2-SNAPSHOT</version>
4141
</dependency>
4242
<dependency>
4343
<groupId>org.apache.avro</groupId>

core/src/main/scala/org/apache/spark/scheduler/CoarseCookSchedulerBackend.scala

Lines changed: 50 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,6 @@ object CoarseCookSchedulerBackend {
6363
}
6464
}
6565

66-
67-
6866
/**
6967
* A SchedulerBackend that runs tasks using Cook, using "coarse-grained" tasks, where it holds
7068
* onto Cook instances for the duration of the Spark job instead of relinquishing cores whenever
@@ -79,27 +77,11 @@ class CoarseCookSchedulerBackend(
7977
cookPort: Int)
8078
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) with Logging {
8179

82-
val maxCores = conf.getInt("spark.cores.max", 0)
83-
val maxCoresPerJob = conf.getInt("spark.executor.cores", 1)
84-
val priority = conf.getInt("spark.cook.priority", 75)
85-
val jobNamePrefix = conf.get("spark.cook.job.name.prefix", "sparkjob")
86-
val maxFailures = conf.getInt("spark.executor.failures", 5)
87-
val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
88-
89-
if (conf.contains("spark.cores.max") && dynamicAllocationEnabled) {
90-
logWarning("spark.cores.max is ignored when dynamic allocation is enabled. Use spark.dynamicAllocation.maxExecutors instead")
91-
}
92-
93-
def currentInstancesToRequest: Int = (executorsToRequest - totalInstancesRequested)
94-
var executorsToRequest: Int = if (dynamicAllocationEnabled) {
95-
conf.getInt("spark.dynamicAllocation.minExecutors", 0)
96-
} else {
97-
maxCores / maxCoresPerJob
98-
}
99-
var totalInstancesRequested = 0
100-
var totalFailures = 0
101-
val jobIds = mutable.Set[UUID]()
102-
val abortedJobIds = mutable.Set[UUID]()
80+
private[this] val schedulerConf = CookSchedulerConfiguration.conf(conf)
81+
private[this] var executorsRequested = 0
82+
private[this] var totalFailures = 0
83+
private[this] val jobIds = mutable.Set[UUID]()
84+
private[this] val abortedJobIds = mutable.Set[UUID]()
10385

10486
private[this] val jobClient = new JobClient.Builder()
10587
.setHost(cookHost)
@@ -117,7 +99,7 @@ class CoarseCookSchedulerBackend(
11799
val isAborted = abortedJobIds.contains(job.getUUID)
118100

119101
if (isCompleted) {
120-
totalInstancesRequested -= 1
102+
executorsRequested -= 1
121103
abortedJobIds -= job.getUUID
122104
jobIds -= job.getUUID
123105

@@ -127,18 +109,21 @@ class CoarseCookSchedulerBackend(
127109

128110
if (!job.isSuccess && !isAborted) {
129111
totalFailures += 1
130-
logWarning(s"Job ${job.getUUID} has died. Failure ($totalFailures/$maxFailures)")
112+
logWarning(s"Job ${job.getUUID} has died. " +
113+
s"Failure ($totalFailures/${schedulerConf.getMaximumExecutorFailures})")
131114
jobIds -= job.getUUID
132-
if (totalFailures >= maxFailures) {
115+
if (totalFailures >= schedulerConf.getMaximumExecutorFailures) {
133116
// TODO should we abort the outstanding tasks now?
134-
logError(s"We have exceeded our maximum failures ($maxFailures)" +
117+
logError(s"We have exceeded our maximum failures " +
118+
s"(${schedulerConf.getMaximumExecutorFailures})" +
135119
" and will not relaunch any more tasks")
136120
}
137121
}
138122
}
139123
}
140124
}
141-
def executorUUIDWriter: UUID => Unit =
125+
126+
private def executorUUIDWriter: UUID => Unit =
142127
conf.getOption("spark.cook.executoruuid.log").fold { _: UUID => () } { _file =>
143128
def file(ct: Int) = s"${_file}.$ct"
144129
def path(ct: Int) = Paths.get(file(ct))
@@ -167,13 +152,13 @@ class CoarseCookSchedulerBackend(
167152
}
168153
}
169154

170-
val sparkMesosScheduler =
155+
private[this] val sparkMesosScheduler =
171156
new CoarseMesosSchedulerBackend(scheduler, sc, "", sc.env.securityManager)
172157

173158
override def applicationId(): String = conf.get("spark.cook.applicationId", super.applicationId())
174159
override def applicationAttemptId(): Option[String] = Some(applicationId())
175160

176-
def createJob(numCores: Double): Job = {
161+
private def createJob(numCores: Double): Job = {
177162
import CoarseCookSchedulerBackend.fetchUri
178163

179164
val jobId = UUID.randomUUID()
@@ -272,11 +257,11 @@ class CoarseCookSchedulerBackend(
272257

273258
val builder = new Job.Builder()
274259
.setUUID(jobId)
275-
.setName(jobNamePrefix)
260+
.setName(schedulerConf.getPrefixOfCookJobName)
276261
.setCommand(cmds.mkString("; "))
277262
.setMemory(sparkMesosScheduler.calculateTotalMemory(sc).toDouble)
278263
.setCpus(numCores)
279-
.setPriority(priority)
264+
.setPriority(schedulerConf.getPriorityPerCookJob)
280265

281266
val container = conf.get("spark.executor.cook.container", null)
282267
if(container != null) {
@@ -288,7 +273,8 @@ class CoarseCookSchedulerBackend(
288273
builder.build()
289274
}
290275

291-
private[this] val minExecutorsNecessary = currentInstancesToRequest * minRegisteredRatio
276+
private[this] val minExecutorsNecessary =
277+
schedulerConf.getExecutorsToRequest(0) * minRegisteredRatio
292278

293279
override def sufficientResourcesRegistered(): Boolean =
294280
totalRegisteredExecutors.get >= minExecutorsNecessary
@@ -307,7 +293,7 @@ class CoarseCookSchedulerBackend(
307293
ret
308294
}
309295

310-
// In our fake offer mesos adds some autoincrementing ID per job but
296+
// In our fake offer mesos adds some auto-incrementing ID per job but
311297
// this sticks around in the executorId so we strip it out to get the actual executor ID
312298
private def instanceIdFromExecutorId(executorId: String): UUID = {
313299
UUID.fromString(executorId.split('/')(0))
@@ -369,8 +355,8 @@ class CoarseCookSchedulerBackend(
369355

370356
override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
371357
logInfo(s"Setting total amount of executors to request to $requestedTotal")
372-
executorsToRequest = requestedTotal
373-
requestRemainingInstances()
358+
schedulerConf.setMaximumCores(requestedTotal)
359+
requestExecutorsIfNecessary()
374360
true
375361
}
376362

@@ -384,21 +370,39 @@ class CoarseCookSchedulerBackend(
384370
@annotation.tailrec
385371
def loop(instancesRemaining: Double, jobs: List[Job]): List[Job] =
386372
if (instancesRemaining <= 0) jobs
387-
else loop(instancesRemaining - 1, createJob(maxCoresPerJob) :: jobs)
388-
loop(currentInstancesToRequest, Nil).reverse
373+
else loop(instancesRemaining - 1, createJob(schedulerConf.getCoresPerCookJob) :: jobs)
374+
375+
loop(schedulerConf.getExecutorsToRequest(executorsRequested), Nil).reverse
389376
}
390377

391378
/**
392-
* Request cores from Cook via cook jobs.
379+
* Kill the extra executors if necessary.
393380
*/
394-
private[this] def requestRemainingInstances(): Unit = {
381+
private[this] def killExecutorsIfNecessary(): Unit = {
382+
val executorsToKill = schedulerConf.getExecutorsToKil(executorsRequested)
383+
if (executorsToKill > 0) {
384+
val jobIdsToKill = jobIds.take(executorsToKill)
385+
Try[Unit](jobClient.abort(jobIdsToKill.asJava)) match {
386+
case Failure(e) =>
387+
logWarning("Failed to abort redundant jobs", e)
388+
case Success(_) =>
389+
logInfo(s"Successfully aborted $executorsToKill jobs.")
390+
jobIdsToKill.foreach(abortedJobIds.add)
391+
}
392+
}
393+
}
394+
395+
/**
396+
* Request more executors from Cook via cook jobs if necessary.
397+
*/
398+
private[this] def requestExecutorsIfNecessary(): Unit = {
395399
val jobs = createRemainingJobs()
396400
if (jobs.nonEmpty) {
397401
Try[Unit](jobClient.submit(jobs.asJava, jobListener)) match {
398402
case Failure(e) => logWarning("Can't request more instances", e)
399403
case Success(_) => {
400404
logInfo(s"Successfully requested ${jobs.size} instances")
401-
totalInstancesRequested += jobs.size
405+
executorsRequested += jobs.size
402406
jobs.map(_.getUUID).foreach(jobIds.add)
403407
}
404408
}
@@ -421,9 +425,12 @@ class CoarseCookSchedulerBackend(
421425
override def start(): Unit = {
422426
super.start()
423427

424-
requestRemainingInstances()
428+
requestExecutorsIfNecessary()
425429
resourceManagerService.scheduleAtFixedRate(new Runnable() {
426-
override def run(): Unit = requestRemainingInstances()
430+
override def run(): Unit = {
431+
requestExecutorsIfNecessary()
432+
killExecutorsIfNecessary()
433+
}
427434
}, 10, 10, TimeUnit.SECONDS)
428435
}
429436

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.scheduler
19+
20+
import org.apache.spark.SparkConf
21+
import org.apache.spark.Logging
22+
23+
/**
 * Mutable configuration for the Cook scheduler backend, initialized from a [[SparkConf]].
 *
 * One Cook job corresponds to one Spark executor, so executor counts are derived
 * from `maximumCores / coresPerCookJob`.
 *
 * To use this configuration in PySpark, one could do
 * {{{
 * >>> cook_conf_obj = sc._jvm.org.apache.spark.scheduler.CookSchedulerConfiguration
 * >>> cook_conf = cook_conf_obj.conf()
 * >>> cook_conf.setMaximumCores(8)
 * }}}
 */
class CookSchedulerConfiguration(
    @transient val conf: SparkConf
) extends Logging {

  private[this] val SPARK_MAX_CORES = "spark.cores.max"
  private[this] val SPARK_EXECUTOR_CORES = "spark.executor.cores"
  private[this] val SPARK_EXECUTOR_FAILURES = "spark.executor.failures"
  private[this] val SPARK_DYNAMICALLOCATION_ENABLED =
    "spark.dynamicAllocation.enabled"
  private[this] val SPARK_DYNAMICALLOCATION_MIN_EXECUTORS =
    "spark.dynamicAllocation.minExecutors"
  private[this] val SPARK_DYNAMICALLOCATION_MAX_EXECUTORS =
    "spark.dynamicAllocation.maxExecutors"
  private[this] val SPARK_COOK_PRIORITY = "spark.cook.priority"
  private[this] val SPARK_COOK_JOB_NAME_PREFIX = "spark.cook.job.name.prefix"

  private[this] val dynamicAllocationEnabled =
    conf.getBoolean(SPARK_DYNAMICALLOCATION_ENABLED, defaultValue = false)
  // Cores requested per Cook job; one Cook job backs one Spark executor.
  private[this] val coresPerCookJob = conf.getInt(SPARK_EXECUTOR_CORES, 1)

  // ==========================================================================
  // Mutable config options (adjustable at runtime through the setters below)
  private[this] var maximumCores = if (dynamicAllocationEnabled) {
    // With dynamic allocation, start from the configured minimum executor count.
    conf.getInt(SPARK_DYNAMICALLOCATION_MIN_EXECUTORS, 0) * coresPerCookJob
  } else {
    conf.getInt(SPARK_MAX_CORES, 0)
  }
  private[this] var maximumExecutorFailures =
    conf.getInt(SPARK_EXECUTOR_FAILURES, 5)
  private[this] var priorityOfCookJob = conf.getInt(SPARK_COOK_PRIORITY, 75)
  private[this] var prefixOfCookJobName =
    conf.get(SPARK_COOK_JOB_NAME_PREFIX, "sparkjob")

  // ==========================================================================

  if (conf.getOption(SPARK_MAX_CORES).isDefined && dynamicAllocationEnabled) {
    logWarning(
      s"$SPARK_MAX_CORES is ignored when dynamic allocation is enabled. " +
        s"Use $SPARK_DYNAMICALLOCATION_MAX_EXECUTORS instead.")
  }

  def getCoresPerCookJob: Int = coresPerCookJob

  def getMaximumCores: Int = maximumCores

  /** Sets the total number of cores to request from Cook. Returns `this` for chaining. */
  def setMaximumCores(cores: Int): CookSchedulerConfiguration = {
    require(cores >= 0, "The maximum number of cores should be non-negative.")
    maximumCores = cores
    logInfo(
      s"The maximum cores of Cook scheduler has been set to $maximumCores")
    this
  }

  def getPriorityPerCookJob: Int = priorityOfCookJob

  /** Sets the Cook job priority; must lie in (0, 100]. Returns `this` for chaining. */
  def setPriorityPerCookJob(priority: Int): CookSchedulerConfiguration = {
    require(
      0 < priority && priority <= 100,
      "The priority of Cook job must be within range of (0, 100]."
    )
    priorityOfCookJob = priority
    logInfo(
      s"The priority of jobs in Cook scheduler has been set to $priorityOfCookJob")
    this
  }

  def getPrefixOfCookJobName: String = prefixOfCookJobName

  /** Sets the name prefix used for submitted Cook jobs. Returns `this` for chaining. */
  def setPrefixOfCookJobName(prefix: String): CookSchedulerConfiguration = {
    prefixOfCookJobName = prefix
    logInfo(
      s"The name prefix of jobs in Cook scheduler has been set to $prefixOfCookJobName")
    this
  }

  def getMaximumExecutorFailures: Int = maximumExecutorFailures

  /** Sets how many executor (Cook job) failures are tolerated before giving up. */
  def setMaximumExecutorFailures(maxExecutorFailures: Int): CookSchedulerConfiguration = {
    require(
      maxExecutorFailures > 0,
      "The maximum executor failures must be positive."
    )
    maximumExecutorFailures = maxExecutorFailures
    this
  }

  /**
   * Number of additional executors to request so that `executorsRequested`
   * reaches the target implied by `maximumCores`; never negative.
   */
  def getExecutorsToRequest(executorsRequested: Int = 0): Int =
    Math.max(maximumCores / coresPerCookJob - executorsRequested, 0)

  /**
   * Number of surplus executors to kill given `executorsRequested` already
   * outstanding; never negative.
   */
  def getExecutorsToKill(executorsRequested: Int = 0): Int =
    Math.max(executorsRequested - maximumCores / coresPerCookJob, 0)

  /** Misspelled alias of [[getExecutorsToKill]], kept for backward compatibility. */
  @deprecated("Use getExecutorsToKill instead.", "0.1.2")
  def getExecutorsToKil(executorsRequested: Int = 0): Int =
    getExecutorsToKill(executorsRequested)

}
124+
125+
/**
 * Companion holding a process-wide singleton [[CookSchedulerConfiguration]],
 * so that PySpark callers can reach the live configuration via the JVM gateway.
 */
object CookSchedulerConfiguration {

  // Double-checked locking: @volatile guarantees the fully-constructed instance
  // is visible to other threads after publication.
  @volatile
  private[this] var configuration: CookSchedulerConfiguration = _

  /**
   * Returns the singleton configuration, creating it from `conf` on first call.
   * Subsequent calls ignore `conf` and return the existing instance.
   */
  private[spark] def conf(conf: SparkConf): CookSchedulerConfiguration = {
    if (configuration == null) {
      this.synchronized {
        if (configuration == null) {
          configuration = new CookSchedulerConfiguration(conf)
        }
      }
    }
    configuration
  }

  /**
   * Returns the singleton configuration; it must already have been initialized
   * through `conf(SparkConf)` (done when the scheduler backend starts).
   */
  def conf(): CookSchedulerConfiguration = {
    require(configuration != null,
      "CookSchedulerConfiguration has not been initialized yet.")
    configuration
  }
}

0 commit comments

Comments
 (0)