
Commit 012f904

andrewor14 authored and pwendell committed
[SPARK-1774] Respect SparkSubmit --jars on YARN (client)
SparkSubmit ignores `--jars` in yarn-client mode. This is a bug. This PR also automatically adds the application jar to `spark.jars`. Previously, when running in yarn-client mode, you had to specify the jar additionally through `--files` (because `--jars` didn't work). Now you don't have to explicitly specify it through either. Tested on a YARN cluster.

Author: Andrew Or <andrewor14@gmail.com>

Closes #710 from andrewor14/yarn-jars and squashes the following commits:

35d1928 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-jars
c27bf6c [Andrew Or] For yarn-cluster and python, do not add primaryResource to spark.jar
c92c5bf [Andrew Or] Minor cleanups
269f9f3 [Andrew Or] Fix format
013d840 [Andrew Or] Fix tests
1407474 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-jars
3bb75e8 [Andrew Or] Allow SparkSubmit --jars to take effect in yarn-client mode

(cherry picked from commit 83e0424)
Signed-off-by: Patrick Wendell <pwendell@gmail.com>
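Editor's aside, not part of the commit: to make the new behavior concrete, here is a minimal, self-contained Scala sketch of what `spark.jars` ends up containing in yarn-client mode after this change. The jar names are taken from the assertion this commit adds to SparkSubmitSuite below; the object and value names are illustrative only.

object JarsMergeSketch {
  def main(args: Array[String]): Unit = {
    // SparkSubmit (simplified): merge the --jars list with the application jar.
    val jarsFromCli = Seq("one.jar", "two.jar", "three.jar") // --jars one.jar,two.jar,three.jar
    val primaryResource = "thejar.jar"                       // the application jar argument
    val sparkJars = (jarsFromCli :+ primaryResource).mkString(",")
    // Matches the new assertion in SparkSubmitSuite:
    assert(sparkJars == "one.jar,two.jar,three.jar,thejar.jar")
    println(s"spark.jars = $sparkJars")
  }
}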
1 parent c7253da

Showing 4 changed files with 102 additions and 53 deletions.

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -917,7 +917,7 @@ class SparkContext(config: SparkConf) extends Logging {
     if (SparkHadoopUtil.get.isYarnMode() &&
         (master == "yarn-standalone" || master == "yarn-cluster")) {
       // In order for this to work in yarn-cluster mode the user must specify the
-      // --addjars option to the client to upload the file into the distributed cache
+      // --addJars option to the client to upload the file into the distributed cache
       // of the AM to make it show up in the current working directory.
       val fileName = new Path(uri.getPath).getName()
       try {

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 23 additions & 16 deletions
@@ -67,8 +67,7 @@ object SparkSubmit {
   private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
 
   /**
-   * @return
-   * a tuple containing the arguments for the child, a list of classpath
+   * @return a tuple containing the arguments for the child, a list of classpath
    * entries for the child, a list of system propertes, a list of env vars
    * and the main class for the child
    */
@@ -115,13 +114,16 @@ object SparkSubmit {
     val sysProps = new HashMap[String, String]()
     var childMainClass = ""
 
+    val isPython = args.isPython
+    val isYarnCluster = clusterManager == YARN && deployOnCluster
+
     if (clusterManager == MESOS && deployOnCluster) {
       printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
     }
 
     // If we're running a Python app, set the Java class to run to be our PythonRunner, add
     // Python files to deployment list, and pass the main file and Python path to PythonRunner
-    if (args.isPython) {
+    if (isPython) {
       if (deployOnCluster) {
         printErrorAndExit("Cannot currently run Python driver programs on cluster")
       }
@@ -161,35 +163,35 @@ object SparkSubmit {
     val options = List[OptionAssigner](
       OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"),
+      OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
       OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
         sysProp = "spark.driver.extraClassPath"),
       OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
         sysProp = "spark.driver.extraJavaOptions"),
       OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
         sysProp = "spark.driver.extraLibraryPath"),
       OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
-      OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
+      OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
+      OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
       OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
       OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
       OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
       OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
       OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
       OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
         sysProp = "spark.executor.memory"),
-      OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
-      OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
       OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
       OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
       OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
         sysProp = "spark.cores.max"),
       OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
       OptionAssigner(args.files, YARN, true, clOption = "--files"),
+      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
+      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
       OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
       OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
       OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
-      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
-      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
-      OptionAssigner(args.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars")
+      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars")
     )
 
     // For client mode make any added jars immediately visible on the classpath
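Editor's aside, not part of the diff: the OptionAssigner used throughout this list is, in this version of SparkSubmit, essentially a small case class pairing an argument value with the cluster managers and deploy mode it applies to. A sketch reconstructed from its call sites above — the exact field names are assumptions:

// Sketch only; reconstructed from the call sites above, not copied from the file.
private[spark] case class OptionAssigner(
    value: String,            // the user-supplied value, e.g. args.jars
    clusterManager: Int,      // bitmask over LOCAL | STANDALONE | MESOS | YARN
    deployOnCluster: Boolean, // whether this applies when deploying on the cluster
    clOption: String = null,  // forwarded as a command-line flag to the child process
    sysProp: String = null)   // or set as a Java system property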
@@ -212,21 +214,22 @@ object SparkSubmit {
       }
     }
 
-    // For standalone mode, add the application jar automatically so the user doesn't have to
-    // call sc.addJar. TODO: Standalone mode in the cluster
-    if (clusterManager == STANDALONE) {
+    // Add the application jar automatically so the user doesn't have to call sc.addJar
+    // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
+    // For python files, the primary resource is already distributed as a regular file
+    if (!isYarnCluster && !isPython) {
       var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
       if (args.primaryResource != RESERVED_JAR_NAME) {
         jars = jars ++ Seq(args.primaryResource)
       }
       sysProps.put("spark.jars", jars.mkString(","))
     }
 
+    // Standalone cluster specific configurations
     if (deployOnCluster && clusterManager == STANDALONE) {
       if (args.supervise) {
         childArgs += "--supervise"
       }
-
       childMainClass = "org.apache.spark.deploy.Client"
       childArgs += "launch"
       childArgs += (args.master, args.primaryResource, args.mainClass)
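In words: the application jar is now appended to spark.jars in every mode except YARN cluster (where the jar already ships to each node as "app.jar") and Python apps (whose primary resource is a plain file, not a jar). A condensed, self-contained restatement of that guard — a sketch, not the committed code:

// RESERVED_JAR_NAME is SparkSubmit's placeholder for "no jar supplied".
def shouldAddPrimaryJar(
    isYarnCluster: Boolean,
    isPython: Boolean,
    primaryResource: String,
    reservedJarName: String): Boolean = {
  !isYarnCluster && !isPython && primaryResource != reservedJarName
}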
@@ -243,16 +246,20 @@ object SparkSubmit {
       }
     }
 
+    // Read from default spark properties, if any
     for ((k, v) <- args.getDefaultSparkProperties) {
       if (!sysProps.contains(k)) sysProps(k) = v
     }
 
     (childArgs, childClasspath, sysProps, childMainClass)
   }
 
-  private def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
-      sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false)
-  {
+  private def launch(
+      childArgs: ArrayBuffer[String],
+      childClasspath: ArrayBuffer[String],
+      sysProps: Map[String, String],
+      childMainClass: String,
+      verbose: Boolean = false) {
     if (verbose) {
       printStream.println(s"Main class:\n$childMainClass")
       printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 77 additions & 33 deletions
@@ -87,25 +87,41 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handles arguments with --key=val") {
-    val clArgs = Seq("--jars=one.jar,two.jar,three.jar", "--name=myApp")
+    val clArgs = Seq(
+      "--jars=one.jar,two.jar,three.jar",
+      "--name=myApp")
     val appArgs = new SparkSubmitArguments(clArgs)
     appArgs.jars should be ("one.jar,two.jar,three.jar")
     appArgs.name should be ("myApp")
   }
 
   test("handles arguments to user program") {
-    val clArgs = Seq("--name", "myApp", "--class", "Foo", "userjar.jar", "some", "--weird", "args")
+    val clArgs = Seq(
+      "--name", "myApp",
+      "--class", "Foo",
+      "userjar.jar",
+      "some",
+      "--weird", "args")
     val appArgs = new SparkSubmitArguments(clArgs)
     appArgs.childArgs should be (Seq("some", "--weird", "args"))
   }
 
   test("handles YARN cluster mode") {
-    val clArgs = Seq("--deploy-mode", "cluster",
-      "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
-      "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
-      "--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt",
-      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "--name", "beauty",
-      "thejar.jar", "arg1", "arg2")
+    val clArgs = Seq(
+      "--deploy-mode", "cluster",
+      "--master", "yarn",
+      "--executor-memory", "5g",
+      "--executor-cores", "5",
+      "--class", "org.SomeClass",
+      "--jars", "one.jar,two.jar,three.jar",
+      "--driver-memory", "4g",
+      "--queue", "thequeue",
+      "--files", "file1.txt,file2.txt",
+      "--archives", "archive1.txt,archive2.txt",
+      "--num-executors", "6",
+      "--name", "beauty",
+      "thejar.jar",
+      "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     val childArgsStr = childArgs.mkString(" ")
@@ -127,12 +143,21 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handles YARN client mode") {
-    val clArgs = Seq("--deploy-mode", "client",
-      "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
-      "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
-      "--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt",
-      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "--name", "trill",
-      "thejar.jar", "arg1", "arg2")
+    val clArgs = Seq(
+      "--deploy-mode", "client",
+      "--master", "yarn",
+      "--executor-memory", "5g",
+      "--executor-cores", "5",
+      "--class", "org.SomeClass",
+      "--jars", "one.jar,two.jar,three.jar",
+      "--driver-memory", "4g",
+      "--queue", "thequeue",
+      "--files", "file1.txt,file2.txt",
+      "--archives", "archive1.txt,archive2.txt",
+      "--num-executors", "6",
+      "--name", "trill",
+      "thejar.jar",
+      "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
@@ -142,6 +167,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     classpath should contain ("two.jar")
     classpath should contain ("three.jar")
     sysProps("spark.app.name") should be ("trill")
+    sysProps("spark.jars") should be ("one.jar,two.jar,three.jar,thejar.jar")
     sysProps("spark.executor.memory") should be ("5g")
     sysProps("spark.executor.cores") should be ("5")
     sysProps("spark.yarn.queue") should be ("thequeue")
@@ -152,9 +178,15 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handles standalone cluster mode") {
-    val clArgs = Seq("--deploy-mode", "cluster",
-      "--master", "spark://h:p", "--class", "org.SomeClass",
-      "--supervise", "--driver-memory", "4g", "--driver-cores", "5", "thejar.jar", "arg1", "arg2")
+    val clArgs = Seq(
+      "--deploy-mode", "cluster",
+      "--master", "spark://h:p",
+      "--class", "org.SomeClass",
+      "--supervise",
+      "--driver-memory", "4g",
+      "--driver-cores", "5",
+      "thejar.jar",
+      "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     val childArgsStr = childArgs.mkString(" ")
@@ -166,9 +198,15 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handles standalone client mode") {
-    val clArgs = Seq("--deploy-mode", "client",
-      "--master", "spark://h:p", "--executor-memory", "5g", "--total-executor-cores", "5",
-      "--class", "org.SomeClass", "--driver-memory", "4g", "thejar.jar", "arg1", "arg2")
+    val clArgs = Seq(
+      "--deploy-mode", "client",
+      "--master", "spark://h:p",
+      "--executor-memory", "5g",
+      "--total-executor-cores", "5",
+      "--class", "org.SomeClass",
+      "--driver-memory", "4g",
+      "thejar.jar",
+      "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
@@ -179,9 +217,15 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handles mesos client mode") {
-    val clArgs = Seq("--deploy-mode", "client",
-      "--master", "mesos://h:p", "--executor-memory", "5g", "--total-executor-cores", "5",
-      "--class", "org.SomeClass", "--driver-memory", "4g", "thejar.jar", "arg1", "arg2")
+    val clArgs = Seq(
+      "--deploy-mode", "client",
+      "--master", "mesos://h:p",
+      "--executor-memory", "5g",
+      "--total-executor-cores", "5",
+      "--class", "org.SomeClass",
+      "--driver-memory", "4g",
+      "thejar.jar",
+      "arg1", "arg2")
    val appArgs = new SparkSubmitArguments(clArgs)
    val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
    childArgs.mkString(" ") should be ("arg1 arg2")
@@ -192,15 +236,17 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("launch simple application with spark-submit") {
-    runSparkSubmit(
-      Seq(
-        "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
-        "--name", "testApp",
-        "--master", "local",
-        "unUsed.jar"))
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val args = Seq(
+      "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
+      "--name", "testApp",
+      "--master", "local",
+      unusedJar.toString)
+    runSparkSubmit(args)
   }
 
   test("spark submit includes jars passed in through --jar") {
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
     val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
     val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
     val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
@@ -209,7 +255,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
       "--name", "testApp",
       "--master", "local-cluster[2,1,512]",
       "--jars", jarsString,
-      "unused.jar")
+      unusedJar.toString)
     runSparkSubmit(args)
   }
 
@@ -227,7 +273,7 @@ object JarCreationTest {
   def main(args: Array[String]) {
     val conf = new SparkConf()
     val sc = new SparkContext(conf)
-    val result = sc.makeRDD(1 to 100, 10).mapPartitions{ x =>
+    val result = sc.makeRDD(1 to 100, 10).mapPartitions { x =>
       var foundClasses = false
       try {
         Class.forName("SparkSubmitClassA", true, Thread.currentThread().getContextClassLoader)
@@ -248,7 +294,6 @@ object SimpleApplicationTest {
   def main(args: Array[String]) {
     val conf = new SparkConf()
     val sc = new SparkContext(conf)
-
     val configs = Seq("spark.master", "spark.app.name")
     for (config <- configs) {
       val masterValue = conf.get(config)
@@ -266,6 +311,5 @@ object SimpleApplicationTest {
          s"Master had $config=$masterValue but executor had $config=$executorValue")
       }
     }
-
   }
 }

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala

Lines changed: 1 addition & 3 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.deploy.yarn
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 
 import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
+import org.apache.spark.scheduler.InputFormatInfo
 import org.apache.spark.util.IntParam
 import org.apache.spark.util.MemoryParam
 
@@ -40,9 +40,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
   var amMemory: Int = 512 // MB
   var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
   var appName: String = "Spark"
-  // TODO
   var inputFormatInfo: List[InputFormatInfo] = null
-  // TODO(harvey)
   var priority = 0
 
   parseArgs(args.toList)
