@@ -533,7 +533,7 @@ private[master] class Master(
533533
534534 /**
535535 * Schedule executors to be launched on the workers.
536- * Returns an array containing number of cores assigned to each worker (None if scheduling fails)
536+ * Returns an array containing number of cores assigned to each worker.
537537 *
538538 * There are two modes of launching executors. The first attempts to spread out an application's
539539 * executors on as many workers as possible, while the second does the opposite (i.e. launch them
@@ -563,41 +563,29 @@ private[master] class Master(
563563 val assignedCores = new Array [Int ](numUsable) // Number of cores to give to each worker
564564 val assignedMemory = new Array [Int ](numUsable) // Amount of memory to give to each worker
565565 var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
566- var pos = 0
567- var lastCoresToAssign = coresToAssign
568- if (spreadOutApps) {
569- // Try to spread out executors among workers (sparse scheduling)
570- while (coresToAssign > 0 ) {
571- if (usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
572- usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor) {
573- coresToAssign -= coresPerExecutor
574- assignedCores(pos) += coresPerExecutor
575- assignedMemory(pos) += memoryPerExecutor
576- }
577- pos = (pos + 1 ) % numUsable
578- if (pos == 0 ) {
579- if (lastCoresToAssign == coresToAssign) {
580- return assignedCores
581- }
582- lastCoresToAssign = coresToAssign
583- }
584- }
585- } else {
586- // Pack executors into as few workers as possible (dense scheduling)
587- while (coresToAssign > 0 ) {
588- while (usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
589- usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor &&
590- coresToAssign > 0 ) {
566+ var freeWorkers = (0 until numUsable).toIndexedSeq
567+
568+ def canLaunchExecutor (pos : Int ): Boolean = {
569+ usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
570+ usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor
571+ }
572+
573+ while (coresToAssign > 0 && freeWorkers.nonEmpty) {
574+ freeWorkers = freeWorkers.filter(canLaunchExecutor)
575+ freeWorkers.foreach { pos =>
576+ var keepScheduling = true
577+ while (keepScheduling && canLaunchExecutor(pos) && coresToAssign > 0 ) {
591578 coresToAssign -= coresPerExecutor
592579 assignedCores(pos) += coresPerExecutor
593580 assignedMemory(pos) += memoryPerExecutor
594- }
595- pos = (pos + 1 ) % numUsable
596- if (pos == 0 ) {
597- if (lastCoresToAssign == coresToAssign) {
598- return assignedCores
581+
582+ // Spreading out an application means spreading out its executors across as
583+ // many workers as possible. If we are not spreading out, then we should keep
584+ // scheduling executors on this worker until we use all of its resources.
585+ // Otherwise, just move on to the next worker.
586+ if (spreadOutApps) {
587+ keepScheduling = false
599588 }
600- lastCoresToAssign = coresToAssign
601589 }
602590 }
603591 }
0 commit comments