Skip to content

Commit 0bdb4e5

Browse files
GschiavonMarcelo Vanzin
authored andcommitted
[SPARK-22574][MESOS][SUBMIT] Check submission request parameters
## What changes were proposed in this pull request? PR closed with all the comments -> #19793 It solves the problem when submitting a wrong CreateSubmissionRequest to Spark Dispatcher was causing a bad state of Dispatcher and making it inactive as a Mesos framework. https://issues.apache.org/jira/browse/SPARK-22574 ## How was this patch tested? All spark test passed successfully. It was tested sending a wrong request (without appArgs) before and after the change. The point is easy, check if the value is null before being accessed. This was before the change, leaving the dispatcher inactive: ``` Exception in thread "Thread-22" java.lang.NullPointerException at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.getDriverCommandValue(MesosClusterScheduler.scala:444) at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.buildDriverCommand(MesosClusterScheduler.scala:451) at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.org$apache$spark$scheduler$cluster$mesos$MesosClusterScheduler$$createTaskInfo(MesosClusterScheduler.scala:538) at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler$$anonfun$scheduleTasks$1.apply(MesosClusterScheduler.scala:570) at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler$$anonfun$scheduleTasks$1.apply(MesosClusterScheduler.scala:555) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.scheduleTasks(MesosClusterScheduler.scala:555) at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.resourceOffers(MesosClusterScheduler.scala:621) ``` And after: ``` "message" : "Malformed request: org.apache.spark.deploy.rest.SubmitRestProtocolException: Validation of message CreateSubmissionRequest failed!\n\torg.apache.spark.deploy.rest.SubmitRestProtocolMessage.validate(SubmitRestProtocolMessage.scala:70)\n\torg.apache.spark.deploy.rest.SubmitRequestServlet.doPost(RestSubmissionServer.scala:272)\n\tjavax.servlet.http.HttpServlet.service(HttpServlet.java:707)\n\tjavax.servlet.http.HttpServlet.service(HttpServlet.java:790)\n\torg.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:845)\n\torg.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583)\n\torg.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)\n\torg.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)\n\torg.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)\n\torg.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)\n\torg.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)\n\torg.spark_project.jetty.server.Server.handle(Server.java:524)\n\torg.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:319)\n\torg.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:253)\n\torg.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)\n\torg.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:95)\n\torg.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)\n\torg.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)\n\torg.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)\n\torg.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)\n\torg.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)\n\torg.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)\n\tjava.lang.Thread.run(Thread.java:745)" ``` Author: German Schiavon <[email protected]> Closes #19966 from Gschiavon/fix-submission-request.
1 parent 1abcbed commit 0bdb4e5

File tree

3 files changed

+13
-4
lines changed

3 files changed

+13
-4
lines changed

core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ private[rest] class CreateSubmissionRequest extends SubmitRestProtocolRequest {
4646
super.doValidate()
4747
assert(sparkProperties != null, "No Spark properties set!")
4848
assertFieldIsSet(appResource, "appResource")
49+
assertFieldIsSet(appArgs, "appArgs")
50+
assertFieldIsSet(environmentVariables, "environmentVariables")
4951
assertPropertyIsSet("spark.app.name")
5052
assertPropertyIsBoolean("spark.driver.supervise")
5153
assertPropertyIsNumeric("spark.driver.cores")

core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ class SubmitRestProtocolSuite extends SparkFunSuite {
8686
message.clientSparkVersion = "1.2.3"
8787
message.appResource = "honey-walnut-cherry.jar"
8888
message.mainClass = "org.apache.spark.examples.SparkPie"
89+
message.appArgs = Array("two slices")
90+
message.environmentVariables = Map("PATH" -> "/dev/null")
8991
val conf = new SparkConf(false)
9092
conf.set("spark.app.name", "SparkPie")
9193
message.sparkProperties = conf.getAll.toMap

resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,17 @@ private[mesos] class MesosSubmitRequestServlet(
7777
private def buildDriverDescription(request: CreateSubmissionRequest): MesosDriverDescription = {
7878
// Required fields, including the main class because python is not yet supported
7979
val appResource = Option(request.appResource).getOrElse {
80-
throw new SubmitRestMissingFieldException("Application jar is missing.")
80+
throw new SubmitRestMissingFieldException("Application jar 'appResource' is missing.")
8181
}
8282
val mainClass = Option(request.mainClass).getOrElse {
83-
throw new SubmitRestMissingFieldException("Main class is missing.")
83+
throw new SubmitRestMissingFieldException("Main class 'mainClass' is missing.")
84+
}
85+
val appArgs = Option(request.appArgs).getOrElse {
86+
throw new SubmitRestMissingFieldException("Application arguments 'appArgs' are missing.")
87+
}
88+
val environmentVariables = Option(request.environmentVariables).getOrElse {
89+
throw new SubmitRestMissingFieldException("Environment variables 'environmentVariables' " +
90+
"are missing.")
8491
}
8592

8693
// Optional fields
@@ -91,8 +98,6 @@ private[mesos] class MesosSubmitRequestServlet(
9198
val superviseDriver = sparkProperties.get("spark.driver.supervise")
9299
val driverMemory = sparkProperties.get("spark.driver.memory")
93100
val driverCores = sparkProperties.get("spark.driver.cores")
94-
val appArgs = request.appArgs
95-
val environmentVariables = request.environmentVariables
96101
val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)
97102

98103
// Construct driver description

0 commit comments

Comments
 (0)