@@ -208,14 +208,20 @@ object SparkSubmit extends CommandLineUtils {
208208
209209 /**
210210 * Prepare the environment for submitting an application.
211- * This returns a 4-tuple:
212- * (1) the arguments for the child process,
213- * (2) a list of classpath entries for the child,
214- * (3) a map of system properties, and
215- * (4) the main class for the child
211+ *
212+ * @param args the parsed SparkSubmitArguments used for environment preparation.
213+ * @param conf the Hadoop Configuration, this argument will only be set in unit test.
214+ * @return a 4-tuple:
215+ * (1) the arguments for the child process,
216+ * (2) a list of classpath entries for the child,
217+ * (3) a map of system properties, and
218+ * (4) the main class for the child
219+ *
216220 * Exposed for testing.
217221 */
218- private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
222+ private[deploy] def prepareSubmitEnvironment(
223+     args: SparkSubmitArguments,
224+     conf: Option[HadoopConfiguration] = None)
219225 : (Seq[String], Seq[String], Map[String, String], String) = {
220226 // Return values
221227 val childArgs = new ArrayBuffer[String]()
@@ -311,12 +317,16 @@ object SparkSubmit extends CommandLineUtils {
311317 }
312318
313319 // In client mode, download remote files.
320+ var localPrimaryResource: String = null
321+ var localJars: String = null
322+ var localPyFiles: String = null
323+ var localFiles: String = null
314324 if (deployMode == CLIENT ) {
315- val hadoopConf = new HadoopConfiguration()
316- args.primaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull
317- args.jars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull
318- args.pyFiles = Option(args.pyFiles).map(downloadFileList(_, hadoopConf)).orNull
319- args.files = Option(args.files).map(downloadFileList(_, hadoopConf)).orNull
325+ val hadoopConf = conf.getOrElse(new HadoopConfiguration())
326+ localPrimaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull
327+ localJars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull
328+ localPyFiles = Option(args.pyFiles).map(downloadFileList(_, hadoopConf)).orNull
329+ localFiles = Option(args.files).map(downloadFileList(_, hadoopConf)).orNull
320330 }
321331
322332 // Require all python files to be local, so we can add them to the PYTHONPATH
@@ -366,7 +376,7 @@ object SparkSubmit extends CommandLineUtils {
366376 // If a python file is provided, add it to the child arguments and list of files to deploy.
367377 // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
368378 args.mainClass = "org.apache.spark.deploy.PythonRunner"
369- args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
379+ args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
370380 if (clusterManager != YARN ) {
371381 // The YARN backend distributes the primary file differently, so don't merge it.
372382 args.files = mergeFileLists(args.files, args.primaryResource)
@@ -376,8 +386,8 @@ object SparkSubmit extends CommandLineUtils {
376386 // The YARN backend handles python files differently, so don't merge the lists.
377387 args.files = mergeFileLists(args.files, args.pyFiles)
378388 }
379- if (args.pyFiles != null) {
380- sysProps("spark.submit.pyFiles") = args.pyFiles
389+ if (localPyFiles != null) {
390+ sysProps("spark.submit.pyFiles") = localPyFiles
381391 }
382392 }
383393
@@ -431,7 +441,7 @@ object SparkSubmit extends CommandLineUtils {
431441 // If an R file is provided, add it to the child arguments and list of files to deploy.
432442 // Usage: RRunner <main R file> [app arguments]
433443 args.mainClass = " org.apache.spark.deploy.RRunner"
434- args.childArgs = ArrayBuffer(args.primaryResource) ++ args.childArgs
444+ args.childArgs = ArrayBuffer(localPrimaryResource) ++ args.childArgs
435445 args.files = mergeFileLists(args.files, args.primaryResource)
436446 }
437447 }
@@ -468,6 +478,7 @@ object SparkSubmit extends CommandLineUtils {
468478 OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
469479 OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
470480 sysProp = "spark.executor.instances"),
481+ OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.pyFiles"),
471482 OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"),
472483 OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"),
473484 OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"),
@@ -491,15 +502,28 @@ object SparkSubmit extends CommandLineUtils {
491502 sysProp = "spark.driver.cores"),
492503 OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
493504 sysProp = "spark.driver.supervise"),
494- OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
505+ OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
506+
507+ // An internal option used only for spark-shell to add user jars to repl's classloader,
508+ // previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to
509+ // remote jars, so adding a new option to only specify local jars for spark-shell internally.
510+ OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.repl.local.jars")
495511 )
496512
497513 // In client mode, launch the application main class directly
498514 // In addition, add the main application jar and any added jars (if any) to the classpath
499- // Also add the main application jar and any added jars to classpath in case YARN client
500- // requires these jars.
501- if (deployMode == CLIENT || isYarnCluster) {
515+ if (deployMode == CLIENT) {
502516 childMainClass = args.mainClass
517+ if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
518+ childClasspath += localPrimaryResource
519+ }
520+ if (localJars != null) { childClasspath ++= localJars.split(",") }
521+ }
522+ // Add the main application jar and any added jars to classpath in case YARN client
523+ // requires these jars.
524+ // This assumes both primaryResource and user jars are local jars, otherwise it will not be
525+ // added to the classpath of YARN client.
526+ if (isYarnCluster) {
503527 if (isUserJar(args.primaryResource)) {
504528 childClasspath += args.primaryResource
505529 }
@@ -556,10 +580,6 @@ object SparkSubmit extends CommandLineUtils {
556580 if (args.isPython) {
557581 sysProps.put("spark.yarn.isPython", "true")
558582 }
559-
560- if (args.pyFiles != null) {
561- sysProps("spark.submit.pyFiles") = args.pyFiles
562- }
563583 }
564584
565585 // assure a keytab is available from any place in a JVM