13 changes: 10 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -311,7 +311,7 @@ object SparkSubmit {
// In Mesos cluster mode, non-local python files are automatically downloaded by Mesos.
if (args.isPython && !isYarnCluster && !isMesosCluster) {
if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
printErrorAndExit(s"Only local python files are supported: $args.primaryResource")
printErrorAndExit(s"Only local python files are supported: ${args.primaryResource}")
}
val nonLocalPyFiles = Utils.nonLocalPaths(args.pyFiles).mkString(",")
if (nonLocalPyFiles.nonEmpty) {
@@ -322,7 +322,7 @@ object SparkSubmit {
// Require all R files to be local
if (args.isR && !isYarnCluster) {
if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
printErrorAndExit(s"Only local R files are supported: $args.primaryResource")
printErrorAndExit(s"Only local R files are supported: ${args.primaryResource}")
}
}
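Both hunks above fix the same Scala string-interpolation slip: inside an s-interpolator, `$args.primaryResource` expands only the identifier `args` (via its toString) and then appends the literal text ".primaryResource"; the braces in `${args.primaryResource}` make the field access part of the interpolation. A minimal, self-contained sketch of the difference (the `Args` case class here is a hypothetical stand-in for SparkSubmitArguments, not Spark code):

```scala
object InterpolationDemo {
  // Hypothetical stand-in for SparkSubmitArguments, for illustration only.
  case class Args(primaryResource: String) {
    override def toString: String = "Args(...)"
  }

  def main(argv: Array[String]): Unit = {
    val args = Args("hdfs:///tmp/app.py")

    // Without braces: interpolates args.toString, then the literal ".primaryResource".
    println(s"Only local python files are supported: $args.primaryResource")
    // => Only local python files are supported: Args(...).primaryResource

    // With braces: interpolates the field itself.
    println(s"Only local python files are supported: ${args.primaryResource}")
    // => Only local python files are supported: hdfs:///tmp/app.py
  }
}
```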

@@ -633,7 +633,14 @@ object SparkSubmit {
// explicitly sets `spark.submit.pyFiles` in his/her default properties file.
sysProps.get("spark.submit.pyFiles").foreach { pyFiles =>
val resolvedPyFiles = Utils.resolveURIs(pyFiles)
-val formattedPyFiles = PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
+val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
+  PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
+} else {
+  // Skip formatting the python path in yarn and mesos cluster modes: both modes
+  // support remote python files and take care of distributing and adding them locally.
+  resolvedPyFiles
+}
sysProps("spark.submit.pyFiles") = formattedPyFiles
}

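A hedged sketch of the decision the hunk above encodes, under the assumption stated in its comment: only client-side launches need spark.submit.pyFiles rewritten into local PYTHONPATH-style entries, while yarn and mesos cluster modes can distribute remote python files themselves and therefore receive the resolved URIs untouched. `formatLocalPaths` below is a hypothetical stand-in for PythonRunner.formatPaths, not its real implementation:

```scala
object PyFilesDemo {
  // Hypothetical stand-in for PythonRunner.formatPaths: strips a local file:
  // scheme so the entry can go on a local PYTHONPATH. Illustration only.
  def formatLocalPaths(resolved: String): Seq[String] =
    resolved.split(",").toSeq.map(_.stripPrefix("file:"))

  // Mirrors the branch added in the hunk above.
  def choosePyFiles(resolvedPyFiles: String,
                    isYarnCluster: Boolean,
                    isMesosCluster: Boolean): String = {
    if (!isYarnCluster && !isMesosCluster) {
      formatLocalPaths(resolvedPyFiles).mkString(",")
    } else {
      // Cluster backends handle remote python files; pass the URIs through.
      resolvedPyFiles
    }
  }

  def main(args: Array[String]): Unit = {
    // Client mode: local entries are reformatted.
    println(choosePyFiles("file:/tmp/a.py,file:/tmp/b.py",
      isYarnCluster = false, isMesosCluster = false))
    // Yarn cluster mode: remote URIs survive unchanged.
    println(choosePyFiles("hdfs:///tmp/a.py,hdfs:///tmp/b.py",
      isYarnCluster = true, isMesosCluster = false))
  }
}
```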
19 changes: 19 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -582,6 +582,25 @@ class SparkSubmitSuite
val sysProps3 = SparkSubmit.prepareSubmitEnvironment(appArgs3)._3
sysProps3("spark.submit.pyFiles") should be(
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+
+// Test remote python files
+val f4 = File.createTempFile("test-submit-remote-python-files", "", tmpDir)
+val writer4 = new PrintWriter(f4)
+val remotePyFiles = "hdfs:///tmp/file1.py,hdfs:///tmp/file2.py"
+writer4.println("spark.submit.pyFiles " + remotePyFiles)
+writer4.close()
+val clArgs4 = Seq(
+  "--master", "yarn",
+  "--deploy-mode", "cluster",
+  "--properties-file", f4.getPath,
+  "hdfs:///tmp/mister.py"
+)
+val appArgs4 = new SparkSubmitArguments(clArgs4)
+val sysProps4 = SparkSubmit.prepareSubmitEnvironment(appArgs4)._3
+// Should not format python path for yarn cluster mode
+sysProps4("spark.submit.pyFiles") should be(
+  Utils.resolveURIs(remotePyFiles)
+)
}

test("user classpath first in driver") {
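The new test leans on the resolution step leaving fully qualified remote URIs alone: assuming Utils.resolveURIs only adds a file: scheme to local, scheme-less paths and passes through entries that already carry a scheme such as hdfs://, resolving the remote list is the identity, which is exactly what the assertion expects in yarn cluster mode. `resolveUris` below is a hypothetical stand-in written from that assumption, not Spark's implementation:

```scala
import java.io.File
import java.net.URI

object ResolveDemo {
  // Hypothetical stand-in for Utils.resolveURIs, based on the assumption above:
  // scheme-less local paths gain a file: scheme, everything else is untouched.
  def resolveUris(paths: String): String =
    paths.split(",").map { p =>
      if (new URI(p).getScheme != null) p
      else new File(p).getAbsoluteFile.toURI.toString
    }.mkString(",")

  def main(args: Array[String]): Unit = {
    val remotePyFiles = "hdfs:///tmp/file1.py,hdfs:///tmp/file2.py"
    // Remote URIs pass through unchanged, so the test's expected value equals the input.
    assert(resolveUris(remotePyFiles) == remotePyFiles)
    println(resolveUris(remotePyFiles))
  }
}
```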