Commit 7eb9cbc
[SPARK-3072] YARN - Exit when reach max number failed executors
In some cases on Hadoop 2.x the Spark application master doesn't properly exit and hangs around for 10 minutes after it's really done. We should make sure it exits properly and stops the driver.

Author: Thomas Graves <[email protected]>

Closes #2022 from tgravescs/SPARK-3072 and squashes the following commits:

665701d [Thomas Graves] Exit when reach max number failed executors
Parent: cd0720c
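
The change is the same in all four files: every allocation and reporter loop gains an extra !isFinished guard, and a checkNumExecutorsFailed() helper fails the application master and stops the SparkContext once too many executors have died. The following is a minimal, self-contained sketch of that pattern, not the actual Spark source; the allocator stub, the failure threshold of 3, and the println stand-ins are all hypothetical.

import java.util.concurrent.atomic.AtomicBoolean

object ShutdownSketch {
  // Set once; every polling loop also checks it, so a finished AM stops looping.
  private val isFinished = new AtomicBoolean(false)
  private val maxNumExecutorFailures = 3  // hypothetical default
  @volatile var numExecutorsFailed = 0    // stands in for yarnAllocator.getNumExecutorsFailed

  // Idempotent finish: safe to call from both the launch loop and the reporter thread.
  def finishApplicationMaster(status: String, diagnostics: String): Unit = {
    if (isFinished.compareAndSet(false, true)) {
      println(s"unregistering AM: $status ($diagnostics)")  // the real code talks to the YARN RM
    }
  }

  def checkNumExecutorsFailed(): Unit = {
    if (numExecutorsFailed >= maxNumExecutorFailures) {
      finishApplicationMaster("FAILED", "max number of executor failures reached")
      // The actual patch also calls sparkContext.stop() here so the user thread exits.
    }
  }

  // Shaped like the reporter loops in the diffs below: the !isFinished guard
  // lets the thread exit promptly instead of waiting for the user thread to die.
  def reporterLoop(userThreadAlive: => Boolean, intervalMs: Long): Unit = {
    while (userThreadAlive && !isFinished.get()) {
      checkNumExecutorsFailed()
      Thread.sleep(intervalMs)
    }
  }
}

Because the flag is set exactly once, calling checkNumExecutorsFailed() repeatedly from several loops is harmless, which is what the diffs below rely on.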

4 files changed: 40 additions, 19 deletions

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 22 additions & 11 deletions
@@ -267,12 +267,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       // TODO: This is a bit ugly. Can we make it nicer?
       // TODO: Handle container failure
 
-      // Exists the loop if the user thread exits.
-      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
-        if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-          finishApplicationMaster(FinalApplicationStatus.FAILED,
-            "max number of executor failures reached")
-        }
+      // Exits the loop if the user thread exits.
+      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive
+          && !isFinished) {
+        checkNumExecutorsFailed()
         yarnAllocator.allocateContainers(
           math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
         Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
@@ -303,11 +301,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
     val t = new Thread {
       override def run() {
-        while (userThread.isAlive) {
-          if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-            finishApplicationMaster(FinalApplicationStatus.FAILED,
-              "max number of executor failures reached")
-          }
+        while (userThread.isAlive && !isFinished) {
+          checkNumExecutorsFailed()
           val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
           if (missingExecutorCount > 0) {
             logInfo("Allocating %d containers to make up for (potentially) lost containers".
@@ -327,6 +322,22 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     t
   }
 
+  private def checkNumExecutorsFailed() {
+    if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      logInfo("max number of executor failures reached")
+      finishApplicationMaster(FinalApplicationStatus.FAILED,
+        "max number of executor failures reached")
+      // make sure to stop the user thread
+      val sparkContext = ApplicationMaster.sparkContextRef.get()
+      if (sparkContext != null) {
+        logInfo("Invoking sc stop from checkNumExecutorsFailed")
+        sparkContext.stop()
+      } else {
+        logError("sparkContext is null when should shutdown")
+      }
+    }
+  }
+
   private def sendProgress() {
     logDebug("Sending progress")
     // Simulated with an allocate request with no nodes requested ...
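
The sparkContext.stop() call is the heart of the fix: the loops above only terminate once the user thread dies, and that thread is typically blocked inside a running Spark job, so unregistering the AM alone could leave it hanging for the 10 minutes described in the commit message. Below is a toy sketch of the mechanism, with hypothetical names and no Spark dependency; thread interruption plays the role that stopping the context plays for a blocked job.

object StopUserThreadSketch extends App {
  // Stands in for the user thread blocked inside a Spark action.
  val userThread = new Thread(() =>
    try Thread.sleep(Long.MaxValue)               // a "job" that never finishes on its own
    catch { case _: InterruptedException => () }  // unwinds once the job is failed
  )
  userThread.start()

  userThread.interrupt()       // plays the role of sparkContext.stop() failing the job
  userThread.join()
  assert(!userThread.isAlive)  // the userThread.isAlive guards above now fall through
}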

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

Lines changed: 3 additions & 2 deletions
@@ -249,7 +249,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // Wait until all containers have finished
     // TODO: This is a bit ugly. Can we make it nicer?
     // TODO: Handle container failure
-    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed) &&
+        !isFinished) {
       yarnAllocator.allocateContainers(
         math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
       checkNumExecutorsFailed()
@@ -271,7 +272,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 
     val t = new Thread {
       override def run() {
-        while (!driverClosed) {
+        while (!driverClosed && !isFinished) {
          checkNumExecutorsFailed()
          val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
          if (missingExecutorCount > 0) {

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 12 additions & 4 deletions
@@ -247,13 +247,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       yarnAllocator.allocateResources()
       // Exits the loop if the user thread exits.
 
-      var iters = 0
-      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
+      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive
+          && !isFinished) {
        checkNumExecutorsFailed()
        allocateMissingExecutor()
        yarnAllocator.allocateResources()
        Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
-        iters += 1
       }
     }
     logInfo("All executors have launched.")
@@ -271,8 +270,17 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
   private def checkNumExecutorsFailed() {
     if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      logInfo("max number of executor failures reached")
       finishApplicationMaster(FinalApplicationStatus.FAILED,
         "max number of executor failures reached")
+      // make sure to stop the user thread
+      val sparkContext = ApplicationMaster.sparkContextRef.get()
+      if (sparkContext != null) {
+        logInfo("Invoking sc stop from checkNumExecutorsFailed")
+        sparkContext.stop()
+      } else {
+        logError("sparkContext is null when should shutdown")
+      }
     }
   }
 
@@ -289,7 +297,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
     val t = new Thread {
       override def run() {
-        while (userThread.isAlive) {
+        while (userThread.isAlive && !isFinished) {
          checkNumExecutorsFailed()
          allocateMissingExecutor()
          logDebug("Sending progress")

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

Lines changed: 3 additions & 2 deletions
@@ -217,7 +217,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // Wait until all containers have launched
     yarnAllocator.addResourceRequests(args.numExecutors)
     yarnAllocator.allocateResources()
-    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed) &&
+        !isFinished) {
       checkNumExecutorsFailed()
       allocateMissingExecutor()
       yarnAllocator.allocateResources()
@@ -249,7 +250,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 
     val t = new Thread {
      override def run() {
-        while (!driverClosed) {
+        while (!driverClosed && !isFinished) {
          checkNumExecutorsFailed()
          allocateMissingExecutor()
          logDebug("Sending progress")
