6 changes: 3 additions & 3 deletions bin/utils.sh
@@ -26,14 +26,14 @@ function gatherSparkSubmitOpts() {
exit 1
fi

- # NOTE: If you add or remove spark-sumbmit options,
+ # NOTE: If you add or remove spark-submit options,
# modify NOT ONLY this script but also SparkSubmitArgument.scala
SUBMISSION_OPTS=()
APPLICATION_OPTS=()
while (($#)); do
case "$1" in
- --master | --deploy-mode | --class | --name | --jars | --py-files | --files | \
- --conf | --properties-file | --driver-memory | --driver-java-options | \
+ --master | --deploy-mode | --class | --name | --jars | --maven | --py-files | --files | \
Review comment (Contributor): For this one maybe we could call it --packages. IMO maven is a little confusing because it's also the name of a piece of software. I'd also just say --repositories below.

+ --conf | --maven_repos | --properties-file | --driver-memory | --driver-java-options | \
Review comment (Contributor): Rename this to --maven-repos with a dash instead of an underscore; everything else has a dash.

--driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \
--total-executor-cores | --executor-cores | --queue | --num-executors | --archives)
if [[ $# -lt 2 ]]; then
5 changes: 5 additions & 0 deletions core/pom.xml
@@ -224,6 +224,11 @@
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.ivy</groupId>
<artifactId>ivy</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-client</artifactId>
101 changes: 100 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -21,10 +21,19 @@ import java.io.{File, PrintStream}
import java.lang.reflect.{Modifier, InvocationTargetException}
import java.net.URL

import org.apache.ivy.Ivy
import org.apache.ivy.core.module.descriptor.{DefaultDependencyDescriptor, DefaultModuleDescriptor}
import org.apache.ivy.core.module.id.ModuleRevisionId
import org.apache.ivy.core.report.ResolveReport
import org.apache.ivy.core.resolve.ResolveOptions
import org.apache.ivy.core.retrieve.RetrieveOptions
import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.resolver.IBiblioResolver

import scala.collection.mutable.{ArrayBuffer, HashMap, Map}

import org.apache.spark.executor.ExecutorURLClassLoader
- import org.apache.spark.util.Utils
+ import org.apache.spark.util.{MavenCoordinate, Utils}

/**
* Main gateway of launching a Spark application.
@@ -56,6 +65,12 @@ object SparkSubmit {

private val CLASS_NOT_FOUND_EXIT_STATUS = 101

// Directories for caching downloads through ivy and storing the jars when maven coordinates are
// supplied to spark-submit
// TODO: Take these as arguments? For example, on AWS /mnt/ is a better location.
private val IVY_CACHE = new File("ivy/cache")
private val MAVEN_JARS = new File("ivy/jars")

// Exposed for testing
private[spark] var exitFn: () => Unit = () => System.exit(-1)
private[spark] var printStream: PrintStream = System.err
@@ -168,6 +183,16 @@ object SparkSubmit {
// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"

// Resolve maven dependencies if there are any and add the jars to the classpath
val resolvedMavenCoordinates = resolveMavenCoordinates(args.maven, args.maven_repos)
if (!resolvedMavenCoordinates.trim.isEmpty) {
if (args.jars == null || args.jars.trim.isEmpty) {
args.jars = resolvedMavenCoordinates
} else {
args.jars += s",$resolvedMavenCoordinates"
}
}

// A list of rules to map each argument to system properties or command-line options in
// each deploy mode; we iterate through these below
val options = List[OptionAssigner](
@@ -429,6 +454,80 @@
.mkString(",")
if (merged == "") null else merged
}

/**
* Resolves any dependencies that were supplied through maven coordinates
* @param coordinates Comma-delimited string of maven coordinates
* @param remoteRepos Comma-delimited string of remote repositories other than maven central
* @return The comma-delimited path to the jars of the given maven artifacts including their
* transitive dependencies
*/
private def resolveMavenCoordinates(coordinates: String, remoteRepos: String): String = {
if (coordinates == null || coordinates.trim.isEmpty) {
""
} else {
val artifacts = coordinates.split(",").map { p =>
val splits = p.split(":")
require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
s"'groupId:artifactId:version'. The coordinate provided is: $p")
new MavenCoordinate(splits(0), splits(1), splits(2))
}
// create an ivy instance
val ivySettings: IvySettings = new IvySettings
ivySettings.setDefaultCache(IVY_CACHE)

// the biblio resolver resolves POM declared dependencies
val br: IBiblioResolver = new IBiblioResolver
br.setM2compatible(true)
br.setUsepoms(true)
br.setName("central")
ivySettings.addResolver(br)
// add any other remote repositories other than maven central
if (remoteRepos != null && !remoteRepos.trim.isEmpty) {
remoteRepos.split(",").foreach { repo =>
val brr: IBiblioResolver = new IBiblioResolver
brr.setM2compatible(true)
brr.setUsepoms(true)
brr.setRoot(repo)
ivySettings.addResolver(brr)
}
}
ivySettings.setDefaultResolver(br.getName)

val ivy = Ivy.newInstance(ivySettings)
// Set resolve options to download transitive dependencies as well
val ro = new ResolveOptions
ro.setTransitive(true)
ro.setDownload(true)
// A Module descriptor must be specified. Entries are dummy strings
val md = DefaultModuleDescriptor.newDefaultInstance(
ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-envelope", "1.0"))

artifacts.foreach { mvn =>
val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version)
val dd = new DefaultDependencyDescriptor(ri, false, false)
dd.addDependencyConfiguration("default", "default")
md.addDependency(dd)
}
// resolve dependencies
val rr: ResolveReport = ivy.resolve(md, ro)
if (rr.hasError) {
throw new RuntimeException(rr.getAllProblemMessages.toString)
}
// retrieve all resolved dependencies
val m = rr.getModuleDescriptor
Review comment (Contributor): I find a lot of these single-letter variable names to be really hard to read. In a few cases, such as this one, I think they're not necessary: we can just inline this on the next line, for example.

ivy.retrieve(m.getModuleRevisionId,
MAVEN_JARS.getAbsolutePath + "/[artifact](-[classifier]).[ext]",
new RetrieveOptions().setConfs(Array("default")))

// output downloaded jars to classpath (will append to jars). The name of the jar is given
// after a `!` by Ivy.
rr.getArtifacts.toArray.map { case artifactInfo =>
val artifactString = artifactInfo.toString
MAVEN_JARS.getAbsolutePath + "/" + artifactString.drop(artifactString.lastIndexOf("!") + 1)
}.mkString(",")
}
}
}

/**
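Aside on the `!` parsing in the retrieve step above: Ivy renders a resolved artifact roughly as organisation#name;revision!filename.ext, and resolveMavenCoordinates keeps only the tail after the last `!` to build each classpath entry. A minimal, self-contained sketch of that string manipulation (the artifact string below is a hypothetical example of the format, not real resolution output):

object IvyArtifactNameSketch extends App {
  // Hypothetical rendering of a resolved artifact; the real value comes from
  // artifactInfo.toString inside resolveMavenCoordinates.
  val artifactString = "org.example#foo;1.0!foo.jar"
  // Keep everything after the last '!': the file name Ivy retrieved.
  val jarName = artifactString.drop(artifactString.lastIndexOf("!") + 1)
  // Prepending the retrieval directory yields the classpath entry.
  println("ivy/jars/" + jarName) // ivy/jars/foo.jar
}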
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -50,6 +50,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
var name: String = null
var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
var jars: String = null
var maven: String = null
var maven_repos: String = null
var verbose: Boolean = false
var isPython: Boolean = false
var pyFiles: String = null
@@ -224,6 +226,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
| name $name
| childArgs [${childArgs.mkString(" ")}]
| jars $jars
| maven $maven
| maven_repos $maven_repos
| verbose $verbose
|
|Spark properties used, including those specified through
@@ -330,6 +334,14 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
jars = Utils.resolveURIs(value)
parse(tail)

case ("--maven") :: value :: tail =>
maven = value
parse(tail)

case ("--maven_repos") :: value :: tail =>
maven_repos = value
parse(tail)

case ("--conf" | "-c") :: value :: tail =>
value.split("=", 2).toSeq match {
case Seq(k, v) => sparkProperties(k) = v
@@ -380,6 +392,12 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
| --name NAME A name of your application.
| --jars JARS Comma-separated list of local jars to include on the driver
| and executor classpaths.
| --maven Comma-separated list of maven coordinates of jars to include
| on the driver and executor classpaths. Will search the local
| maven repo, then maven central and any additional remote
| repositories given by --maven_repos.
Review comment (Contributor): Instead of taking one parameter with a list of all Maven packages, we might want to allow separate packages to be passed with separate --maven args. Dunno, @pwendell / @mengxr what do you think? It's just going to be annoying for people to write a giant comma-separated string. Same with repos, actually.

Review comment (Contributor): Also this should say the format, i.e. groupId:artifactId:version.

| --maven_repos Supply additional remote repositories to search for the
| maven coordinates given with --maven.
Review comment (Contributor): You should say this is a comma-separated list.

| --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
| on the PYTHONPATH for Python apps.
| --files FILES Comma-separated list of files to be placed in the working
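Following up on the review comments above about documenting the format: --maven coordinates must take the form groupId:artifactId:version. A minimal sketch of the validation that resolveMavenCoordinates performs on each coordinate (the coordinate value below is hypothetical):

object CoordinateFormatSketch extends App {
  val coordinate = "com.example:demo_2.10:0.1" // hypothetical coordinate
  val splits = coordinate.split(":")
  // Same check as in SparkSubmit.resolveMavenCoordinates.
  require(splits.length == 3, "Provided Maven Coordinates must be in the form " +
    s"'groupId:artifactId:version'. The coordinate provided is: $coordinate")
  println(s"group=${splits(0)}, artifact=${splits(1)}, version=${splits(2)}")
}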
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1873,6 +1873,14 @@ private[spark] object Utils extends Logging {
}
}

/**
* Represents a Maven Coordinate
* @param groupId the groupId of the coordinate
* @param artifactId the artifactId of the coordinate
* @param version the version of the coordinate
*/
private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String)

/**
* A utility class to redirect the child process's stdout or stderr.
*/
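For context on how this case class is consumed, a short sketch of the mapping from MavenCoordinate onto Ivy's ModuleRevisionId, mirroring resolveMavenCoordinates above (the coordinate values are hypothetical, and the case class is repeated here so the snippet stands alone):

import org.apache.ivy.core.module.id.ModuleRevisionId

// Repeated from Utils.scala so this sketch is self-contained.
case class MavenCoordinate(groupId: String, artifactId: String, version: String)

object CoordinateToIvySketch extends App {
  val mvn = MavenCoordinate("org.apache.spark", "spark-core_2.10", "1.2.0")
  // groupId/artifactId/version map directly onto Ivy's organisation/name/revision.
  val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version)
  println(ri) // org.apache.spark#spark-core_2.10;1.2.0
}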