Skip to content

Commit f68d024

Browse files
author
Marcelo Vanzin
committed
[SPARK-7736] [CORE] [YARN] Make pyspark fail YARN app on failure.
The YARN backend doesn't like when user code calls `System.exit`, since it cannot know the exit status and thus cannot set an appropriate final status for the application. So, for pyspark, avoid that call and instead throw an exception with the exit code. SparkSubmit handles that exception and exits with the given exit code, while YARN uses the exit code as the failure code for the Spark app. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #7751 from vanzin/SPARK-9416.
1 parent ed092a0 commit f68d024

4 files changed

Lines changed: 40 additions & 8 deletions

File tree

core/src/main/scala/org/apache/spark/SparkException.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,10 @@ class SparkException(message: String, cause: Throwable)
3030
*/
3131
private[spark] class SparkDriverExecutionException(cause: Throwable)
3232
extends SparkException("Execution error", cause)
33+
34+
/**
35+
* Exception thrown when the main user code is run as a child process (e.g. pyspark) and we want
36+
* the parent SparkSubmit process to exit with the same exit code.
37+
*/
38+
private[spark] case class SparkUserAppException(exitCode: Int)
39+
extends SparkException(s"User application exited with $exitCode")

core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
2424
import scala.collection.JavaConversions._
2525
import scala.util.Try
2626

27+
import org.apache.spark.SparkUserAppException
2728
import org.apache.spark.api.python.PythonUtils
2829
import org.apache.spark.util.{RedirectThread, Utils}
2930

@@ -46,7 +47,14 @@ object PythonRunner {
4647
// Launch a Py4J gateway server for the process to connect to; this will let it see our
4748
// Java system properties and such
4849
val gatewayServer = new py4j.GatewayServer(null, 0)
49-
gatewayServer.start()
50+
val thread = new Thread(new Runnable() {
51+
override def run(): Unit = Utils.logUncaughtExceptions {
52+
gatewayServer.start()
53+
}
54+
})
55+
thread.setName("py4j-gateway")
56+
thread.setDaemon(true)
57+
thread.start()
5058

5159
// Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
5260
// python directories in SPARK_HOME (if set), and any files in the pyFiles argument
@@ -64,11 +72,18 @@ object PythonRunner {
6472
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
6573
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
6674
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
67-
val process = builder.start()
75+
try {
76+
val process = builder.start()
6877

69-
new RedirectThread(process.getInputStream, System.out, "redirect output").start()
78+
new RedirectThread(process.getInputStream, System.out, "redirect output").start()
7079

71-
System.exit(process.waitFor())
80+
val exitCode = process.waitFor()
81+
if (exitCode != 0) {
82+
throw new SparkUserAppException(exitCode)
83+
}
84+
} finally {
85+
gatewayServer.shutdown()
86+
}
7287
}
7388

7489
/**

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ import org.apache.ivy.plugins.matcher.GlobPatternMatcher
3939
import org.apache.ivy.plugins.repository.file.FileRepository
4040
import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
4141

42+
import org.apache.spark.{SparkUserAppException, SPARK_VERSION}
4243
import org.apache.spark.api.r.RUtils
43-
import org.apache.spark.SPARK_VERSION
4444
import org.apache.spark.deploy.rest._
4545
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
4646

@@ -672,7 +672,13 @@ object SparkSubmit {
672672
mainMethod.invoke(null, childArgs.toArray)
673673
} catch {
674674
case t: Throwable =>
675-
throw findCause(t)
675+
findCause(t) match {
676+
case SparkUserAppException(exitCode) =>
677+
System.exit(exitCode)
678+
679+
case t: Throwable =>
680+
throw t
681+
}
676682
}
677683
}
678684

yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ import org.apache.hadoop.yarn.api.records._
3030
import org.apache.hadoop.yarn.conf.YarnConfiguration
3131

3232
import org.apache.spark.rpc._
33-
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
34-
import org.apache.spark.SparkException
33+
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv,
34+
SparkException, SparkUserAppException}
3535
import org.apache.spark.deploy.SparkHadoopUtil
3636
import org.apache.spark.deploy.history.HistoryServer
3737
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
@@ -530,6 +530,10 @@ private[spark] class ApplicationMaster(
530530
e.getCause match {
531531
case _: InterruptedException =>
532532
// Reporter thread can interrupt to stop user class
533+
case SparkUserAppException(exitCode) =>
534+
val msg = s"User application exited with status $exitCode"
535+
logError(msg)
536+
finish(FinalApplicationStatus.FAILED, exitCode, msg)
533537
case cause: Throwable =>
534538
logError("User class threw exception: " + cause, cause)
535539
finish(FinalApplicationStatus.FAILED,

0 commit comments

Comments
 (0)