ApplicationMaster.scala
@@ -77,6 +77,8 @@ private[spark] class ApplicationMaster(
@volatile private var allocator: YarnAllocator = _
private val allocatorLock = new Object()

@volatile private var backend: CoarseGrainedSchedulerBackend = _

// Fields used in client mode.
private var rpcEnv: RpcEnv = null
private var amEndpoint: RpcEndpointRef = _
@@ -218,11 +220,13 @@ private[spark] class ApplicationMaster(
}
}

private def sparkContextInitialized(sc: SparkContext) = {
private def sparkContextInitialized(sc: SparkContext, backend: CoarseGrainedSchedulerBackend) = {
sparkContextRef.synchronized {
sparkContextRef.compareAndSet(null, sc)
sparkContextRef.notifyAll()
}
this.backend = backend
if (null != allocator) allocator.setScheduler(backend)
}

private def sparkContextStopped(sc: SparkContext) = {
@@ -252,6 +256,7 @@ private[spark] class ApplicationMaster(
uiAddress,
historyAddress,
securityMgr)
if (null != backend) allocator.setScheduler(backend)

allocator.allocateResources()
reporterThread = launchReporterThread()
@@ -612,8 +617,9 @@ object ApplicationMaster extends Logging {
}
}

private[spark] def sparkContextInitialized(sc: SparkContext): Unit = {
master.sparkContextInitialized(sc)
private[spark] def sparkContextInitialized(sc: SparkContext,
backend: CoarseGrainedSchedulerBackend): Unit = {
master.sparkContextInitialized(sc, backend)
}

private[spark] def sparkContextStopped(sc: SparkContext): Boolean = {
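The two null checks added above (in sparkContextInitialized, and right after the YarnAllocator is created) make the backend wiring order-independent: whichever of the allocator and the backend becomes available last performs the setScheduler call, and both fields are @volatile so each side sees the other's latest write. A minimal sketch of that handshake, using hypothetical stand-in types (Backend, Allocator, Master) rather than the real Spark classes:

class Backend
class Allocator {
  @volatile private var backend: Backend = _
  def setScheduler(b: Backend): Unit = { backend = b }
}

class Master {
  @volatile private var allocator: Allocator = _
  @volatile private var backend: Backend = _

  // Driver side: called once the SparkContext (and its scheduler backend) exists.
  def sparkContextInitialized(b: Backend): Unit = {
    backend = b
    if (allocator != null) allocator.setScheduler(b)     // allocator was created first
  }

  // AM side: called when registration with the ResourceManager creates the allocator.
  def allocatorCreated(): Unit = {
    allocator = new Allocator
    if (backend != null) allocator.setScheduler(backend) // backend arrived first
  }
}

Because both writes are volatile, at least one of the two guards is guaranteed to observe the other side's reference, so setScheduler runs no matter which thread gets there first.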
YarnAllocator.scala
@@ -36,6 +36,7 @@ import org.apache.log4j.{Level, Logger}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

/**
* YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -88,6 +89,10 @@ private[yarn] class YarnAllocator(
// Visible for testing.
private[yarn] val executorIdToContainer = new HashMap[String, Container]

private var numUnexpectedContainerRelease = 0L
private var backend: CoarseGrainedSchedulerBackend = _
private val containerIdToExecutorId = new HashMap[ContainerId, String]

// Executor memory in MB.
protected val executorMemory = args.executorMemory
// Additional memory overhead.
@@ -165,6 +170,7 @@ private[yarn] class YarnAllocator(
def killExecutor(executorId: String): Unit = synchronized {
if (executorIdToContainer.contains(executorId)) {
val container = executorIdToContainer.remove(executorId).get
containerIdToExecutorId.remove(container.getId)
internalReleaseContainer(container)
numExecutorsRunning -= 1
} else {
@@ -353,6 +359,7 @@ private[yarn] class YarnAllocator(

logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
executorIdToContainer(executorId) = container
containerIdToExecutorId(container.getId) = executorId

val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId])
@@ -384,6 +391,7 @@ private[yarn] class YarnAllocator(
for (completedContainer <- completedContainers) {
val containerId = completedContainer.getContainerId

var needNotify = false
if (releasedContainers.contains(containerId)) {
// Already marked the container for release, so remove it from
// `releasedContainers`.
Expand Down Expand Up @@ -415,6 +423,7 @@ private[yarn] class YarnAllocator(
". Diagnostics: " + completedContainer.getDiagnostics)
numExecutorsFailed += 1
}
needNotify = true
}

if (allocatedContainerToHostMap.containsKey(containerId)) {
@@ -430,6 +439,15 @@

allocatedContainerToHostMap.remove(containerId)
}

val executorIdOpt = containerIdToExecutorId.remove(containerId)
if (executorIdOpt.isDefined) executorIdToContainer.remove(executorIdOpt.get)

if (needNotify && executorIdOpt.isDefined) {
// The executor could have gone away (no route to host, node failure, etc.).
// Notify the backend about the failure of the executor.
notifyBackend(executorIdOpt.get, containerId)
}
}
}

@@ -438,6 +456,19 @@
amClient.releaseAssignedContainer(container.getId())
}

private[yarn] def notifyBackend(executorId: String, containerId: ContainerId): Unit = {
numUnexpectedContainerRelease += 1
if (null != backend) {
backend.removeExecutor(executorId,
"Yarn deallocated the executor (" + executorId + ") container " + containerId)
}
}

private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease

private[yarn] def setScheduler(backend: CoarseGrainedSchedulerBackend): Unit = synchronized {
this.backend = backend
}
}

private object YarnAllocator {
YarnClusterScheduler.scala
@@ -29,7 +29,8 @@ private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnSchedule
logInfo("Created YarnClusterScheduler")

override def postStartHook() {
ApplicationMaster.sparkContextInitialized(sc)
ApplicationMaster.sparkContextInitialized(sc,
this.backend.asInstanceOf[CoarseGrainedSchedulerBackend])
super.postStartHook()
logInfo("YarnClusterScheduler.postStartHook done")
}
YarnAllocatorSuite.scala
@@ -231,6 +231,30 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.getNumPendingAllocate should be (1)
}

test("lost executor removed from backend") {
val handler = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (4)

val container1 = createContainer("host1")
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2))

handler.requestTotalExecutors(2)

val statuses = Seq(container1, container2).map { c =>
ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1)
}
handler.updateResourceRequests()
handler.processCompletedContainers(statuses.toSeq)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (2)
handler.getNumExecutorsFailed should be (2)
handler.getNumUnexpectedContainerRelease should be (2)
}
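
The test above checks getNumUnexpectedContainerRelease but leaves the allocator's backend unset, so the removeExecutor call inside notifyBackend is not directly observed. A possible follow-up assertion, sketched under the assumption that Mockito is available to this suite and that a mocked CoarseGrainedSchedulerBackend can simply be handed to setScheduler; mockBackend is a hypothetical local value, and handler and the container helpers are the ones already used in the test:

import org.mockito.Mockito.{mock, times, verify}
import org.mockito.Matchers.anyString
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

// Wire a mocked backend before completing the containers ...
val mockBackend = mock(classOf[CoarseGrainedSchedulerBackend])
handler.setScheduler(mockBackend)
// ... then, after processCompletedContainers(statuses), every unexpectedly
// lost container should have triggered a removeExecutor call on the backend.
verify(mockBackend, times(2)).removeExecutor(anyString(), anyString())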

test("memory exceeded diagnostic regexes") {
val diagnostics =
"Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +