@@ -66,7 +66,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
6666 // Lower and upper bounds on the number of executors. These are required.
6767 private val minNumExecutors = conf.getInt(" spark.dynamicAllocation.minExecutors" , - 1 )
6868 private val maxNumExecutors = conf.getInt(" spark.dynamicAllocation.maxExecutors" , - 1 )
69- verifyBounds()
7069
7170 // How long tasks must remain backlogged before an addition is triggered
7271 private val schedulerBacklogTimeout = conf.getLong(
@@ -77,9 +76,14 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
7776 " spark.dynamicAllocation.sustainedSchedulerBacklogTimeout" , schedulerBacklogTimeout)
7877
7978 // How long an executor must be idle before it is removed
80- private val removeThresholdSeconds = conf.getLong(
79+ private val executorIdleTimeout = conf.getLong(
8180 " spark.dynamicAllocation.executorIdleTimeout" , 600 )
8281
82+ // During testing, the methods to actually kill and add executors are mocked out
83+ private val testing = conf.getBoolean(" spark.dynamicAllocation.testing" , false )
84+
85+ validateSettings()
86+
8387 // Number of executors to add in the next round
8488 private var numExecutorsToAdd = 1
8589
@@ -103,17 +107,14 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
103107 // Polling loop interval (ms)
104108 private val intervalMillis : Long = 100
105109
106- // Whether we are testing this class. This should only be used internally.
107- private val testing = conf.getBoolean(" spark.dynamicAllocation.testing" , false )
108-
109110 // Clock used to schedule when executors should be added and removed
110111 private var clock : Clock = new RealClock
111112
112113 /**
113- * Verify that the lower and upper bounds on the number of executors are valid.
114+ * Verify that the settings specified through the config are valid.
114115 * If not, throw an appropriate exception.
115116 */
116- private def verifyBounds (): Unit = {
117+ private def validateSettings (): Unit = {
117118 if (minNumExecutors < 0 || maxNumExecutors < 0 ) {
118119 throw new SparkException (" spark.dynamicAllocation.{min/max}Executors must be set!" )
119120 }
@@ -124,6 +125,22 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
124125 throw new SparkException (s " spark.dynamicAllocation.minExecutors ( $minNumExecutors) must " +
125126 s " be less than or equal to spark.dynamicAllocation.maxExecutors ( $maxNumExecutors)! " )
126127 }
128+ if (schedulerBacklogTimeout <= 0 ) {
129+ throw new SparkException (" spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!" )
130+ }
131+ if (sustainedSchedulerBacklogTimeout <= 0 ) {
132+ throw new SparkException (
133+ " spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!" )
134+ }
135+ if (executorIdleTimeout <= 0 ) {
136+ throw new SparkException (" spark.dynamicAllocation.executorIdleTimeout must be > 0!" )
137+ }
138+ // Require the external shuffle service for dynamic allocation;
139+ // otherwise, we may lose shuffle files when killing executors.
140+ if (! conf.getBoolean(" spark.shuffle.service.enabled" , false ) && ! testing) {
141+ throw new SparkException (" Dynamic allocation of executors requires the external " +
142+ " shuffle service. You may enable this through spark.shuffle.service.enabled." )
143+ }
127144 }
128145
129146 /**
@@ -254,7 +271,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
254271 val removeRequestAcknowledged = testing || sc.killExecutor(executorId)
255272 if (removeRequestAcknowledged) {
256273 logInfo(s " Removing executor $executorId because it has been idle for " +
257- s " $removeThresholdSeconds seconds (new desired total will be ${numExistingExecutors - 1 }) " )
274+ s " $executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1 }) " )
258275 executorsPendingToRemove.add(executorId)
259276 true
260277 } else {
@@ -329,8 +346,8 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
329346 private def onExecutorIdle (executorId : String ): Unit = synchronized {
330347 if (! removeTimes.contains(executorId) && ! executorsPendingToRemove.contains(executorId)) {
331348 logDebug(s " Starting idle timer for $executorId because there are no more tasks " +
332- s " scheduled to run on the executor (to expire in $removeThresholdSeconds seconds) " )
333- removeTimes(executorId) = clock.getTimeMillis + removeThresholdSeconds * 1000
349+ s " scheduled to run on the executor (to expire in $executorIdleTimeout seconds) " )
350+ removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
334351 }
335352 }
336353
0 commit comments