@@ -324,55 +324,20 @@ object SparkSubmit {
         // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
         args.mainClass = "org.apache.spark.deploy.PythonRunner"
         args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
-        args.files = mergeFileLists(args.files, args.primaryResource)
+        if (clusterManager != YARN) {
+          // The YARN backend distributes the primary file differently, so don't merge it.
+          args.files = mergeFileLists(args.files, args.primaryResource)
+        }
+      }
+      if (clusterManager != YARN) {
+        // The YARN backend handles python files differently, so don't merge the lists.
+        args.files = mergeFileLists(args.files, args.pyFiles)
       }
-      args.files = mergeFileLists(args.files, args.pyFiles)
       if (args.pyFiles != null) {
         sysProps("spark.submit.pyFiles") = args.pyFiles
       }
     }

-    // In yarn mode for a python app, add pyspark archives to files
-    // that can be distributed with the job
-    if (args.isPython && clusterManager == YARN) {
-      var pyArchives: String = null
-      val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH")
-      if (pyArchivesEnvOpt.isDefined) {
-        pyArchives = pyArchivesEnvOpt.get
-      } else {
-        if (!sys.env.contains("SPARK_HOME")) {
-          printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
-        }
-        val pythonPath = new ArrayBuffer[String]
-        for (sparkHome <- sys.env.get("SPARK_HOME")) {
-          val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
-          val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
-          if (!pyArchivesFile.exists()) {
-            printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
-          }
-          val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
-          if (!py4jFile.exists()) {
-            printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
-              "in yarn mode.")
-          }
-          pythonPath += pyArchivesFile.getAbsolutePath()
-          pythonPath += py4jFile.getAbsolutePath()
-        }
-        pyArchives = pythonPath.mkString(",")
-      }
-
-      pyArchives = pyArchives.split(",").map { localPath =>
-        val localURI = Utils.resolveURI(localPath)
-        if (localURI.getScheme != "local") {
-          args.files = mergeFileLists(args.files, localURI.toString)
-          new Path(localPath).getName
-        } else {
-          localURI.getPath
-        }
-      }.mkString(File.pathSeparator)
-      sysProps("spark.submit.pyArchives") = pyArchives
-    }
-
     // If we're running a R app, set the main class to our specific R runner
     if (args.isR && deployMode == CLIENT) {
       if (args.primaryResource == SPARKR_SHELL) {
@@ -386,19 +351,10 @@ object SparkSubmit {
       }
     }

-    if (isYarnCluster) {
-      // In yarn-cluster mode for a python app, add primary resource and pyFiles to files
-      // that can be distributed with the job
-      if (args.isPython) {
-        args.files = mergeFileLists(args.files, args.primaryResource)
-        args.files = mergeFileLists(args.files, args.pyFiles)
-      }
-
+    if (isYarnCluster && args.isR) {
       // In yarn-cluster mode for a R app, add primary resource to files
       // that can be distributed with the job
-      if (args.isR) {
-        args.files = mergeFileLists(args.files, args.primaryResource)
-      }
+      args.files = mergeFileLists(args.files, args.primaryResource)
     }

     // Special flag to avoid deprecation warnings at the client
@@ -515,17 +471,18 @@ object SparkSubmit {
       }
     }

+    // Let YARN know it's a pyspark app, so it distributes needed libraries.
+    if (clusterManager == YARN && args.isPython) {
+      sysProps.put("spark.yarn.isPython", "true")
+    }
+
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (isYarnCluster) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
       if (args.isPython) {
-        val mainPyFile = new Path(args.primaryResource).getName
-        childArgs += ("--primary-py-file", mainPyFile)
+        childArgs += ("--primary-py-file", args.primaryResource)
         if (args.pyFiles != null) {
-          // These files will be distributed to each machine's working directory, so strip the
-          // path prefix
-          val pyFilesNames = args.pyFiles.split(",").map(p => (new Path(p)).getName).mkString(",")
-          childArgs += ("--py-files", pyFilesNames)
+          childArgs += ("--py-files", args.pyFiles)
         }
         childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
       } else if (args.isR) {
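Note (reviewer illustration, not part of the patch): a minimal standalone Scala sketch of the behavioral change in the last hunk. The file paths and the object name are made up for the example; the point is that yarn-cluster Python submissions now forward the original primary-resource and --py-files values to org.apache.spark.deploy.yarn.Client unchanged, rather than stripping them to bare file names, since the YARN client now handles distribution itself.

import scala.collection.mutable.ArrayBuffer

// Hypothetical inputs, mirroring the new yarn-cluster Python branch above.
object YarnPySparkArgsSketch {
  def main(argv: Array[String]): Unit = {
    val primaryResource = "hdfs:///apps/jobs/main.py"                    // made-up primary .py file
    val pyFiles = "hdfs:///apps/jobs/deps.zip,hdfs:///apps/jobs/util.py" // made-up --py-files value

    val childArgs = new ArrayBuffer[String]()
    // After the change, the full paths/URIs are passed through; previously only
    // new Path(...).getName (the bare file names) was forwarded to yarn.Client.
    childArgs += ("--primary-py-file", primaryResource)
    if (pyFiles != null) {
      childArgs += ("--py-files", pyFiles)
    }
    childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")

    println(childArgs.mkString(" "))
  }
}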