6 changes: 5 additions & 1 deletion bin/spark-submit
@@ -38,6 +38,10 @@ while (($#)); do
export SPARK_SUBMIT_CLASSPATH=$2
elif [ "$1" = "--driver-java-options" ]; then
export SPARK_SUBMIT_OPTS=$2
elif [ "$1" = "--packages" ]; then
export SPARK_SUBMIT_PACKAGES=$2
elif [ "$1" = "--repositories" ]; then
export SPARK_SUBMIT_REPOSITORIES=$2
elif [ "$1" = "--master" ]; then
export MASTER=$2
fi
@@ -65,7 +69,7 @@ if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FI
grep -e "spark.driver.extra*\|spark.driver.memory" "$SPARK_SUBMIT_PROPERTIES_FILE" | \
grep -v "^[[:space:]]*#"
)
if [ -n "$contains_special_configs" ]; then
if [[ -n "$contains_special_configs" || -n "$SPARK_SUBMIT_PACKAGES" ]]; then
export SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
fi
fi
7 changes: 7 additions & 0 deletions bin/spark-submit2.cmd
@@ -49,6 +49,10 @@ if [%1] == [] goto continue
set SPARK_SUBMIT_CLASSPATH=%2
) else if [%1] == [--driver-java-options] (
set SPARK_SUBMIT_OPTS=%2
) else if [%1] == [--packages] (
set SPARK_SUBMIT_PACKAGES=%2
) else if [%1] == [--repositories] (
set SPARK_SUBMIT_REPOSITORIES=%2
) else if [%1] == [--master] (
set MASTER=%2
)
@@ -72,6 +76,9 @@ if [%SPARK_SUBMIT_DEPLOY_MODE%] == [client] (
%SPARK_SUBMIT_PROPERTIES_FILE%') do (
set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
)
if [%SPARK_SUBMIT_PACKAGES%] NEQ [] (
set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
)
)
)

89 changes: 53 additions & 36 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -251,24 +251,19 @@ object SparkSubmit {
}

val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER

// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
// too for packages that include Python code
val resolvedMavenCoordinates =
SparkSubmitUtils.resolveMavenCoordinates(
args.packages, Option(args.repositories), Option(args.ivyRepoPath))
if (!resolvedMavenCoordinates.trim.isEmpty) {
if (args.jars == null || args.jars.trim.isEmpty) {
args.jars = resolvedMavenCoordinates
val packagesResolved =
if (args.packagesResolved != null) {
// SparkSubmitDriverBootstrapper already downloaded the jars for us
args.packagesResolved
} else {
args.jars += s",$resolvedMavenCoordinates"
SparkSubmitUtils.resolveMavenCoordinates(args.packages, Option(args.repositories),
Option(args.ivyRepoPath)).mkString(",")
}

if (packagesResolved.nonEmpty) {
args.jars = mergeFileLists(args.jars, packagesResolved)
if (args.isPython) {
if (args.pyFiles == null || args.pyFiles.trim.isEmpty) {
args.pyFiles = resolvedMavenCoordinates
} else {
args.pyFiles += s",$resolvedMavenCoordinates"
}
args.pyFiles = mergeFileLists(args.pyFiles, packagesResolved)
}
Review comment (Contributor): Looks like there is an opportunity for abstracting the two cases here:

val packagesResolved =
  if (args.packagesResolved != null) {
    // SparkSubmitDriverBootstrapper already downloaded the jars for us
    args.packagesResolved
  } else {
    SparkSubmitUtils.resolveMavenCoordinates(...).mkString(",")
  }

if (packagesResolved.nonEmpty) {
  args.jars = mergeFileLists(args.jars, packagesResolved)
  if (args.isPython) {
    args.pyFiles = mergeFileLists(args.pyFiles, packagesResolved)
  }
}

}

@@ -655,8 +650,7 @@ private[spark] object SparkSubmitUtils {

/**
* Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
* in the format `groupId:artifactId:version` or `groupId/artifactId:version`. The latter provides
* simplicity for Spark Package users.
* in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
* @param coordinates Comma-delimited string of maven coordinates
* @return Sequence of Maven coordinates
*/
@@ -721,17 +715,17 @@ private[spark] object SparkSubmitUtils {
* after a '!' by Ivy. It also sometimes contains '(bundle)' after '.jar'. Remove that as well.
* @param artifacts Sequence of dependencies that were resolved and retrieved
* @param cacheDirectory directory where jars are cached
* @return a comma-delimited list of paths for the dependencies
* @return A sequence of paths for the dependencies
*/
private[spark] def resolveDependencyPaths(
artifacts: Array[AnyRef],
cacheDirectory: File): String = {
cacheDirectory: File): Seq[String] = {
Review comment (Contributor): Same here, update the Javadoc.

artifacts.map { artifactInfo =>
val artifactString = artifactInfo.toString
val jarName = artifactString.drop(artifactString.lastIndexOf("!") + 1)
cacheDirectory.getAbsolutePath + File.separator +
jarName.substring(0, jarName.lastIndexOf(".jar") + 4)
}.mkString(",")
}
}

/** Adds the given maven coordinates to Ivy's module descriptor. */
@@ -748,6 +742,35 @@ private[spark] object SparkSubmitUtils {
}
}

/** Add exclusion rules for dependencies already included in the spark-assembly */
private[spark] def addExclusionRules(
ivySettings: IvySettings,
ivyConfName: String,
md: DefaultModuleDescriptor): Unit = {
// Add scala exclusion rule
val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
val scalaDependencyExcludeRule =
new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
scalaDependencyExcludeRule.addConfiguration(ivyConfName)
md.addExcludeRule(scalaDependencyExcludeRule)

// We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and
// other spark-streaming utility components. Underscore is there to differentiate between
// spark-streaming_2.1x and spark-streaming-kafka-assembly_2.1x
val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
"sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")

components.foreach { comp =>
val sparkArtifacts =
new ArtifactId(new ModuleId("org.apache.spark", s"spark-$comp*"), "*", "*", "*")
val sparkDependencyExcludeRule =
new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
sparkDependencyExcludeRule.addConfiguration(ivyConfName)

md.addExcludeRule(sparkDependencyExcludeRule)
}
}

/** A nice function to use in tests as well. Values are dummy strings. */
private[spark] def getModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-parent", "1.0"))
@@ -757,17 +780,20 @@ private[spark] object SparkSubmitUtils {
* @param coordinates Comma-delimited string of maven coordinates
* @param remoteRepos Comma-delimited string of remote repositories other than maven central
* @param ivyPath The path to the local ivy repository
* @return The comma-delimited path to the jars of the given maven artifacts including their
* @return A sequence of paths to the jars of the given maven artifacts including their
* transitive dependencies
*/
private[spark] def resolveMavenCoordinates(
coordinates: String,
remoteRepos: Option[String],
ivyPath: Option[String],
isTest: Boolean = false): String = {
isTest: Boolean = false): Seq[String] = {
Review comment (Contributor): You'll need to update the Javadoc to reflect this change. Right now it still says comma-delimited path.
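For illustration only, a minimal caller-side sketch of the new Seq[String] return type; it assumes code inside the org.apache.spark.deploy package, and the coordinate string and value names are just examples:

// The resolver now returns individual jar paths instead of a pre-joined string.
val resolvedJars: Seq[String] = SparkSubmitUtils.resolveMavenCoordinates(
  "com.databricks:spark-csv_2.10:0.1", None, None)
// Join with "," when appending to --jars / --py-files, or with the platform
// path separator when extending the driver classpath.
val jarsArg = resolvedJars.mkString(",")
val classpathSuffix = resolvedJars.mkString(sys.props("path.separator"))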

if (coordinates == null || coordinates.trim.isEmpty) {
""
Seq.empty
} else {
val sysOut = System.out
// To prevent ivy from logging to system out
System.setOut(printStream)
val artifacts = extractMavenCoordinates(coordinates)
// Default configuration name for ivy
val ivyConfName = "default"
@@ -811,19 +837,9 @@ private[spark] object SparkSubmitUtils {
val md = getModuleDescriptor
md.setDefaultConf(ivyConfName)

// Add an exclusion rule for Spark and Scala Library
val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
val sparkDependencyExcludeRule =
new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
sparkDependencyExcludeRule.addConfiguration(ivyConfName)
val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
val scalaDependencyExcludeRule =
new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
scalaDependencyExcludeRule.addConfiguration(ivyConfName)

// Exclude any Spark dependencies, and add all supplied maven artifacts as dependencies
md.addExcludeRule(sparkDependencyExcludeRule)
md.addExcludeRule(scalaDependencyExcludeRule)
// Add exclusion rules for Spark and Scala Library
addExclusionRules(ivySettings, ivyConfName, md)
// add all supplied maven artifacts as dependencies
addDependenciesToIvy(md, artifacts, ivyConfName)

// resolve dependencies
@@ -836,6 +852,7 @@ private[spark] object SparkSubmitUtils {
packagesDirectory.getAbsolutePath + File.separator + "[artifact](-[classifier]).[ext]",
retrieveOptions.setConfs(Array(ivyConfName)))

System.setOut(sysOut)
resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
}
}
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -50,6 +50,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
var jars: String = null
var packages: String = null
var packagesResolved: String = null
var repositories: String = null
var ivyRepoPath: String = null
var verbose: Boolean = false
@@ -395,6 +396,12 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
packages = value
parse(tail)

// SPARK-6031: Internal flag to receive the resolved maven jars and add them to --jars.
// This is only passed through the Bootstrapper
case ("--packages-resolved") :: value :: tail =>
packagesResolved = Utils.resolveURIs(value)
parse(tail)

case ("--repositories") :: value :: tail =>
repositories = value
parse(tail)
core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -44,7 +44,7 @@ private[spark] object SparkSubmitDriverBootstrapper {
System.exit(1)
}

val submitArgs = args
var submitArgs = args
val runner = sys.env("RUNNER")
val classpath = sys.env("CLASSPATH")
val javaOpts = sys.env("JAVA_OPTS")
@@ -58,6 +58,8 @@ private[spark] object SparkSubmitDriverBootstrapper {
val submitLibraryPath = sys.env.get("SPARK_SUBMIT_LIBRARY_PATH")
val submitClasspath = sys.env.get("SPARK_SUBMIT_CLASSPATH")
val submitJavaOpts = sys.env.get("SPARK_SUBMIT_OPTS")
val submitPackages = sys.env.getOrElse("SPARK_SUBMIT_PACKAGES", "")
val submitRepositories = sys.env.get("SPARK_SUBMIT_REPOSITORIES")

assume(runner != null, "RUNNER must be set")
assume(classpath != null, "CLASSPATH must be set")
@@ -73,6 +75,7 @@ private[spark] object SparkSubmitDriverBootstrapper {
val confLibraryPath = properties.get("spark.driver.extraLibraryPath")
val confClasspath = properties.get("spark.driver.extraClassPath")
val confJavaOpts = properties.get("spark.driver.extraJavaOptions")
val confIvyRepo = properties.get("spark.jars.ivy")

// Favor Spark submit arguments over the equivalent configs in the properties file.
// Note that we do not actually use the Spark submit values for library path, classpath,
@@ -82,13 +85,25 @@ private[spark] object SparkSubmitDriverBootstrapper {
.orElse(confDriverMemory)
.getOrElse(defaultDriverMemory)

val newClasspath =
var newClasspath =
if (submitClasspath.isDefined) {
classpath
} else {
classpath + confClasspath.map(sys.props("path.separator") + _).getOrElse("")
}

// Resolve maven dependencies if there are any and add them to classpath. Also send them
// to SparkSubmit so that they can be shipped to executors.
val resolvedMavenCoordinates =
SparkSubmitUtils.resolveMavenCoordinates(
submitPackages, submitRepositories, confIvyRepo)
if (resolvedMavenCoordinates.nonEmpty) {
newClasspath += sys.props("path.separator") +
resolvedMavenCoordinates.mkString(sys.props("path.separator"))
submitArgs =
Array("--packages-resolved", resolvedMavenCoordinates.mkString(",")) ++ submitArgs
Review comment (Contributor): Can we thread this through using an environment variable _PACKAGES_RESOLVED? Having this as an extra flag forces you to make args here mutable, which is sort of strange.
}
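For illustration only, a rough sketch of the reviewer's suggestion above, assuming a hypothetical _PACKAGES_RESOLVED environment variable; the object and method names below are invented for the sketch and are not part of this PR:

import scala.collection.JavaConverters._

object PackagesResolvedEnvSketch {
  // Hypothetical variable name taken from the review comment above.
  val EnvKey = "_PACKAGES_RESOLVED"

  // Bootstrapper side: set the variable on the child SparkSubmit process
  // instead of prepending "--packages-resolved" to submitArgs.
  def launchSubmit(command: Seq[String], resolvedJars: Seq[String]): Process = {
    val builder = new ProcessBuilder(command.asJava)
    if (resolvedJars.nonEmpty) {
      builder.environment().put(EnvKey, resolvedJars.mkString(","))
    }
    builder.inheritIO().start()
  }

  // SparkSubmitArguments side: read the variable back, so the incoming args can stay immutable.
  def resolvedPackagesFromEnv: Option[String] = sys.env.get(EnvKey)
}

The PR as written instead prepends an internal --packages-resolved flag, which is why submitArgs changed from a val to a var above.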

val newJavaOpts =
if (submitJavaOpts.isDefined) {
// SPARK_SUBMIT_OPTS is already captured in JAVA_OPTS
core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -91,21 +91,22 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {
val ivyPath = "dummy/ivy"
val md = SparkSubmitUtils.getModuleDescriptor
val artifacts = for (i <- 0 until 3) yield new MDArtifact(md, s"jar-$i", "jar", "jar")
var jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(ivyPath))
var jPaths =
SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(ivyPath)).mkString(",")
for (i <- 0 until 3) {
val index = jPaths.indexOf(ivyPath)
assert(index >= 0)
jPaths = jPaths.substring(index + ivyPath.length)
}
// end to end
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
"com.databricks:spark-csv_2.10:0.1", None, Option(ivyPath), true)
"com.databricks:spark-csv_2.10:0.1", None, Option(ivyPath), true).mkString(",")
assert(jarPath.indexOf(ivyPath) >= 0, "should use non-default ivy path")
}

test("search for artifact at other repositories") {
val path = SparkSubmitUtils.resolveMavenCoordinates("com.agimatec:agimatec-validation:0.9.3",
Option("https://oss.sonatype.org/content/repositories/agimatec/"), None, true)
Option("https://oss.sonatype.org/content/repositories/agimatec/"), None, true).mkString(",")
assert(path.indexOf("agimatec-validation") >= 0, "should find package. If it doesn't, check" +
"if package still exists. If it has been removed, replace the example in this test.")
}
@@ -117,8 +118,20 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {
}

test("neglects Spark and Spark's dependencies") {
val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
"sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")

val coordinates =
components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") +
",org.apache.spark:spark-core_fake:1.2.0"
val path = SparkSubmitUtils.resolveMavenCoordinates(
"org.apache.spark:spark-core_2.10:1.2.0", None, None, true)
coordinates, None, None, true).mkString(",")
assert(path === "", "should return empty path")
// Should not exclude the following dependency. Will throw an error, because it doesn't exist,
// but the fact that it is checking means that it wasn't excluded.
intercept[RuntimeException] {
SparkSubmitUtils.resolveMavenCoordinates(coordinates +
",org.apache.spark:spark-streaming-kafka-assembly_2.10:1.2.0", None, None, true)
}
}
}