@@ -533,21 +533,17 @@ private[master] class Master(
533533 }
534534
535535 /**
536- * This functions starts one or more executors on each worker.
537- *
538- * It traverses the available worker list. In spreadOutApps mode, it allocates at most
539- * spark.executor.cores (multiple executors per worker) or 1 core(s) (one executor per worker)
540- * for each visit of the worker (can be less than it when the worker does not have enough cores
541- * or the demand is less than it) and app.desc.memoryPerExecutorMB megabytes memory and tracks
542- * the resource allocation in a 2d array for each visit; Otherwise, it allocates at most
543- * spark.executor.cores (multiple executors per worker) or worker.freeCores (one executor per
544- * worker) cores and app.desc.memoryPerExecutorMB megabytes to each executor.
536+ * The resource allocator spread out each app among all the workers until it has all its cores in
537+ * spreadOut mode otherwise packs each app into as few workers as possible until it has assigned
538+ * all its cores. User can define spark.deploy.maxCoresPerExecutor per application to
539+ * limit the maximum number of cores to allocate to each executor on each worker; if the parameter
540+ * is not defined, then only one executor will be launched on a worker.
545541 */
546542 private def startExecutorsOnWorkers () {
547543 // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
548544 // in the queue, then the second app, etc.
549545 if (spreadOutApps) {
550- // Try to spread out each app among all the nodes , until it has all its cores
546+ // Try to spread out each app among all the workers , until it has all its cores
551547 for (app <- waitingApps if app.coresLeft > 0 ) {
552548 val maxCoresPerExecutor = app.desc.maxCorePerExecutor.getOrElse(Int .MaxValue )
553549 val usableWorkers = workers.toArray.filter(_.state == WorkerState .ALIVE )
@@ -565,35 +561,43 @@ private[master] class Master(
565561 }
566562 // Now that we've decided how many cores to give on each node, let's actually give them
567563 for (pos <- 0 until numUsable) {
568- while (assigned(pos) > 0 ) {
569- val coresForThisExecutor = math.min(maxCoresPerExecutor, assigned(pos))
570- val exec = app.addExecutor(usableWorkers(pos), coresForThisExecutor)
571- assigned(pos) -= coresForThisExecutor
572- launchExecutor(usableWorkers(pos), exec)
573- app.state = ApplicationState .RUNNING
574- }
564+ allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos))
575565 }
576566 }
577567 } else {
578- // Pack each app into as few nodes as possible until we've assigned all its cores
568+ // Pack each app into as few workers as possible until we've assigned all its cores
579569 for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState .ALIVE ) {
580570 for (app <- waitingApps if app.coresLeft > 0 ) {
581- val maxCoresPerExecutor = app.desc.maxCorePerExecutor.getOrElse(Int .MaxValue )
582- if (canUse(app, worker)) {
583- var coresToAssign = math.min(worker.coresFree, app.coresLeft)
584- while (coresToAssign > 0 ) {
585- val coresForThisExecutor = math.min(maxCoresPerExecutor, coresToAssign)
586- val exec = app.addExecutor(worker, coresForThisExecutor)
587- coresToAssign -= coresForThisExecutor
588- launchExecutor(worker, exec)
589- app.state = ApplicationState .RUNNING
590- }
591- }
571+ allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
592572 }
593573 }
594574 }
595575 }
596576
577+ /**
578+ * allocate resources in a certain worker to one or more executors
579+ * @param app the info of the application which the executors belong to
580+ * @param coresDemand the total number of cores to be allocated to this application
581+ * @param worker the worker info
582+ */
583+ private def allocateWorkerResourceToExecutors (
584+ app : ApplicationInfo ,
585+ coresDemand : Int ,
586+ worker : WorkerInfo ): Unit = {
587+ if (canUse(app, worker)) {
588+ val memoryPerExecutor = app.desc.memoryPerExecutorMB
589+ val maxCoresPerExecutor = app.desc.maxCorePerExecutor.getOrElse(Int .MaxValue )
590+ var coresToAssign = coresDemand
591+ while (coresToAssign > 0 && worker.memoryFree >= memoryPerExecutor) {
592+ val coresForThisExecutor = math.min(maxCoresPerExecutor, coresToAssign)
593+ val exec = app.addExecutor(worker, coresForThisExecutor)
594+ coresToAssign -= coresForThisExecutor
595+ launchExecutor(worker, exec)
596+ app.state = ApplicationState .RUNNING
597+ }
598+ }
599+ }
600+
597601 private def startDriversOnWorkers (): Unit = {
598602 val shuffledWorkers = Random .shuffle(workers) // Randomization helps balance drivers
599603 for (worker <- shuffledWorkers if worker.state == WorkerState .ALIVE ) {
0 commit comments