From bbdce05de3151091a55b350e8de2e8e29d6143da Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 25 Apr 2014 11:07:28 -0700 Subject: [PATCH 1/8] [SPARK-1395] Fix "local:" URI support in Yarn mode (again). Recent changes ignored the fact that path may be defined with "local:" URIs, which means they need to be explicitly added to the classpath everywhere a remote process is started. This change fixes that by: - Using the correct methods to add paths to the classpath - Creating SparkConf settings for the Spark jar itself and for the user's jar - Propagating those two settings to the remote processes where needed This ensures that both in client and in cluster mode, the driver has the necessary info to build the executor's classpath and have things still work when they contain "local:" references. On the cleanup front, I removed the hacky way that log4j configuration was being propagated to handle the "local:" case. It's much more cleanly (and generically) handled by using spark-submit arguments (--files to upload a config file, or setting spark.executor.extraJavaOptions to pass JVM arguments and use a local file). --- .../apache/spark/deploy/yarn/ClientBase.scala | 165 ++++++++++-------- .../deploy/yarn/ExecutorRunnableUtil.scala | 8 +- .../spark/deploy/yarn/ClientBaseSuite.scala | 84 ++++++++- .../spark/deploy/yarn/ExecutorRunnable.scala | 1 + 4 files changed, 181 insertions(+), 77 deletions(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 8f2267599914..c741062c4d7e 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -154,7 +154,7 @@ trait ClientBase extends Logging { } /** Copy the file into HDFS if needed. */ - private def copyRemoteFile( + private[yarn] def copyRemoteFile( dstDir: Path, originalPath: Path, replication: Short, @@ -213,10 +213,10 @@ trait ClientBase extends Logging { val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - Map( - ClientBase.SPARK_JAR -> ClientBase.getSparkJar, ClientBase.APP_JAR -> args.userJar, - ClientBase.LOG4J_PROP -> System.getenv(ClientBase.LOG4J_CONF_ENV_KEY) - ).foreach { case(destName, _localPath) => + List( + (ClientBase.SPARK_JAR, ClientBase.sparkJar(sparkConf), ClientBase.CONF_SPARK_JAR), + (ClientBase.APP_JAR, args.userJar, ClientBase.CONF_SPARK_USER_JAR) + ).foreach { case(destName, _localPath, confKey) => val localPath: String = if (_localPath != null) _localPath.trim() else "" if (! 
localPath.isEmpty()) { val localURI = new URI(localPath) @@ -225,6 +225,8 @@ trait ClientBase extends Logging { val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions) distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, destName, statCache) + } else { + sparkConf.set(confKey, localPath) } } } @@ -246,6 +248,8 @@ trait ClientBase extends Logging { if (addToClasspath) { cachedSecondaryJarLinks += linkname } + } else if (addToClasspath) { + cachedSecondaryJarLinks += file.trim() } } } @@ -265,14 +269,10 @@ trait ClientBase extends Logging { val env = new HashMap[String, String]() val extraCp = sparkConf.getOption("spark.driver.extraClassPath") - val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY) - ClientBase.populateClasspath(yarnConf, sparkConf, log4jConf, env, extraCp) + ClientBase.populateClasspath(args, yarnConf, sparkConf, env, extraCp) env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_STAGING_DIR") = stagingDir env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() - if (log4jConf != null) { - env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf - } // Set the environment variables to be passed on to the executors. distCacheMgr.setDistFilesEnv(env) @@ -286,6 +286,7 @@ trait ClientBase extends Logging { env("SPARK_YARN_USER_ENV") = userEnvs } + logInfo(s"ApplicationMaster environment: $env") env } @@ -364,7 +365,6 @@ trait ClientBase extends Logging { sys.props.get("spark.driver.extraJavaOptions").foreach(opts => javaOpts += opts) sys.props.get("spark.driver.libraryPath").foreach(p => javaOpts += s"-Djava.library.path=$p") } - javaOpts += ClientBase.getLog4jConfiguration(localResources) // Command for the ApplicationMaster val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++ @@ -391,12 +391,31 @@ trait ClientBase extends Logging { object ClientBase extends Logging { val SPARK_JAR: String = "__spark__.jar" val APP_JAR: String = "__app__.jar" - val LOG4J_PROP: String = "log4j.properties" - val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF" val LOCAL_SCHEME = "local" + val CONF_SPARK_JAR = "spark.yarn.jar" + val CONF_SPARK_USER_JAR = "spark.yarn.user.jar" val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars" + val ENV_SPARK_JAR = "SPARK_JAR" - def getSparkJar = sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head) + /** + * Find the user-defined Spark jar if configured, or return the jar containing this + * class if not. + * + * This method first looks in the SparkConf object for the CONF_SPARK_JAR key, and in the + * user environment if that is not found (for backwards compatibility). + */ + def sparkJar(conf: SparkConf) = { + if (conf.contains(CONF_SPARK_JAR)) { + conf.get(CONF_SPARK_JAR) + } else if (System.getenv(ENV_SPARK_JAR) != null) { + logWarning( + s"$ENV_SPARK_JAR detected in the system environment. 
This variable has been deprecated " + + s"in favor of the $CONF_SPARK_JAR configuration variable.") + System.getenv(ENV_SPARK_JAR) + } else { + SparkContext.jarOfClass(this.getClass).head + } + } def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) = { val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf) @@ -469,71 +488,75 @@ object ClientBase extends Logging { triedDefault.toOption } + def populateClasspath(args: ClientArguments, conf: Configuration, sparkConf: SparkConf, + env: HashMap[String, String], extraClassPath: Option[String] = None) { + extraClassPath.foreach(addClasspathEntry(_, env)) + addClasspathEntry(Environment.PWD.$(), env) + + // Normally the users app.jar is last in case conflicts with spark jars + if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) { + addUserClasspath(args, sparkConf, env) + addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env) + ClientBase.populateHadoopClasspath(conf, env) + } else { + addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env) + ClientBase.populateHadoopClasspath(conf, env) + addUserClasspath(args, sparkConf, env) + } + + // Append all class files and jar files under the working directory to the classpath. + addFileToClasspath("*", null, env) + } /** - * Returns the java command line argument for setting up log4j. If there is a log4j.properties - * in the given local resources, it is used, otherwise the SPARK_LOG4J_CONF environment variable - * is checked. + * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly + * to the classpath. */ - def getLog4jConfiguration(localResources: HashMap[String, LocalResource]): String = { - var log4jConf = LOG4J_PROP - if (!localResources.contains(log4jConf)) { - log4jConf = System.getenv(LOG4J_CONF_ENV_KEY) match { - case conf: String => - val confUri = new URI(conf) - if (ClientBase.LOCAL_SCHEME.equals(confUri.getScheme())) { - "file://" + confUri.getPath() - } else { - ClientBase.LOG4J_PROP - } - case null => "log4j-spark-container.properties" + private def addUserClasspath(args: ClientArguments, conf: SparkConf, + env: HashMap[String, String]) = { + if (args != null) { + addFileToClasspath(args.userJar, APP_JAR, env) + if (args.addJars != null) { + args.addJars.split(",").foreach { case file: String => + addFileToClasspath(file, null, env) + } } + } else { + val userJar = conf.getOption(CONF_SPARK_USER_JAR).getOrElse(null) + addFileToClasspath(userJar, APP_JAR, env) + + val cachedSecondaryJarLinks = + conf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",") + cachedSecondaryJarLinks.foreach(jar => addFileToClasspath(jar, null, env)) } - " -Dlog4j.configuration=" + log4jConf } - def populateClasspath(conf: Configuration, sparkConf: SparkConf, log4jConf: String, - env: HashMap[String, String], extraClassPath: Option[String] = None) { - - if (log4jConf != null) { - // If a custom log4j config file is provided as a local: URI, add its parent directory to the - // classpath. Note that this only works if the custom config's file name is - // "log4j.properties". - val localPath = getLocalPath(log4jConf) - if (localPath != null) { - val parentPath = new File(localPath).getParent() - YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, parentPath, - File.pathSeparator) + /** + * Adds the given path to the classpath, handling "local:" URIs correctly. 
+ * + * If an alternate name for the file is given, and it's not a "local:" file, the alternate + * name will be added to the classpath (relative to the job's work directory). + * + * If not a "local:" file and no alternate name, the environment is not modified. + * + * @param path Path to add to classpath (optional). + * @param fileName Alternate name for the file (optional). + * @param env Map holding the environment variables. + */ + private def addFileToClasspath(path: String, fileName: String, + env: HashMap[String, String]) : Unit = { + if (path != null) { + scala.util.control.Exception.ignoring(classOf[URISyntaxException]) { + val localPath = getLocalPath(path) + if (localPath != null) { + addClasspathEntry(localPath, env) + return + } } } - - /** Add entry to the classpath. */ - def addClasspathEntry(path: String) = YarnSparkHadoopUtil.addToEnvironment(env, - Environment.CLASSPATH.name, path, File.pathSeparator) - /** Add entry to the classpath. Interpreted as a path relative to the working directory. */ - def addPwdClasspathEntry(entry: String) = - addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + entry) - - extraClassPath.foreach(addClasspathEntry) - - val cachedSecondaryJarLinks = - sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",") - .filter(_.nonEmpty) - // Normally the users app.jar is last in case conflicts with spark jars - if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) { - addPwdClasspathEntry(APP_JAR) - cachedSecondaryJarLinks.foreach(addPwdClasspathEntry) - addPwdClasspathEntry(SPARK_JAR) - ClientBase.populateHadoopClasspath(conf, env) - } else { - addPwdClasspathEntry(SPARK_JAR) - ClientBase.populateHadoopClasspath(conf, env) - addPwdClasspathEntry(APP_JAR) - cachedSecondaryJarLinks.foreach(addPwdClasspathEntry) + if (fileName != null) { + addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + fileName, env); } - // Append all class files and jar files under the working directory to the classpath. - addClasspathEntry(Environment.PWD.$()) - addPwdClasspathEntry("*") } /** @@ -547,4 +570,8 @@ object ClientBase extends Logging { null } + private def addClasspathEntry(path: String, env: HashMap[String, String]) = + YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, path, + File.pathSeparator) + } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala index 43dbb2464f92..b456c19c29af 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala @@ -58,7 +58,6 @@ trait ExecutorRunnableUtil extends Logging { javaOpts += "-Djava.io.tmpdir=" + new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) - javaOpts += ClientBase.getLog4jConfiguration(localResources) // Certain configs need to be passed here because they are needed before the Executor // registers with the Scheduler and transfers the spark configs. 
Since the Executor backend @@ -166,13 +165,8 @@ trait ExecutorRunnableUtil extends Logging { def prepareEnvironment: HashMap[String, String] = { val env = new HashMap[String, String]() - val extraCp = sparkConf.getOption("spark.executor.extraClassPath") - val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY) - ClientBase.populateClasspath(yarnConf, sparkConf, log4jConf, env, extraCp) - if (log4jConf != null) { - env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf - } + ClientBase.populateClasspath(null, yarnConf, sparkConf, env, extraCp) // Allow users to specify some environment variables YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"), diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala index 608c6e92624c..7b29985388fc 100644 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala +++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala @@ -17,13 +17,20 @@ package org.apache.spark.deploy.yarn +import java.io.File import java.net.URI +import com.google.common.io.Files import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.MRJobConfig import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.api.ApplicationConstants.Environment - +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.mockito.Matchers._ +import org.mockito.Mockito._ import org.scalatest.FunSuite import org.scalatest.matchers.ShouldMatchers._ @@ -31,6 +38,8 @@ import scala.collection.JavaConversions._ import scala.collection.mutable.{ HashMap => MutableHashMap } import scala.util.Try +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils class ClientBaseSuite extends FunSuite { @@ -68,6 +77,65 @@ class ClientBaseSuite extends FunSuite { } } + private val SPARK = "local:/sparkJar" + private val USER = "local:/userJar" + private val ADDED = "local:/addJar1,local:/addJar2,/addJar3" + + test("Local jar URIs") { + val conf = new Configuration() + val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK) + val env = new MutableHashMap[String, String]() + val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) + + ClientBase.populateClasspath(args, conf, sparkConf, env, None) + + val jars = env("CLASSPATH").split(File.pathSeparator) + s"$SPARK,$USER,$ADDED".split(",").foreach({ jar => + val uri = new URI(jar) + if (ClientBase.LOCAL_SCHEME.equals(uri.getScheme())) { + jars should contain (uri.getPath()) + } else { + jars should not contain (uri.getPath()) + } + }) + jars should not contain (ClientBase.SPARK_JAR) + jars should not contain (ClientBase.APP_JAR) + } + + test("Jar path propagation through SparkConf") { + val conf = new Configuration() + val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK) + val yarnConf = new YarnConfiguration() + val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) + + val client = spy(new DummyClient(args, conf, sparkConf, yarnConf)) + doReturn(new Path("/")).when(client).copyRemoteFile(any(classOf[Path]), + any(classOf[Path]), any(classOf[Short]), any(classOf[Boolean])) + + var tempDir = Files.createTempDir(); + try { + 
client.prepareLocalResources(tempDir.getAbsolutePath()) + sparkConf.getOption(ClientBase.CONF_SPARK_USER_JAR) should be (Some(USER)) + + // The non-local path should be propagated by name only, since it will end up in the app's + // staging dir. + val expected = ADDED.split(",") + .map(p => { + val uri = new URI(p) + if (ClientBase.LOCAL_SCHEME == uri.getScheme()) { + p + } else { + Option(uri.getFragment()).getOrElse(new File(p).getName()) + } + }) + .mkString(",") + + sparkConf.getOption(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS) should be (Some(expected)) + } finally { + Utils.deleteRecursively(tempDir) + } + } + object Fixtures { val knownDefYarnAppCP: Seq[String] = @@ -109,4 +177,18 @@ class ClientBaseSuite extends FunSuite { def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B = Try(clazz.getField(field)).map(_.get(null).asInstanceOf[A]).toOption.map(mapTo).getOrElse(defaults) + private class DummyClient( + val args: ClientArguments, + val conf: Configuration, + val sparkConf: SparkConf, + val yarnConf: YarnConfiguration) extends ClientBase { + + override def calculateAMMemory(newApp: GetNewApplicationResponse): Int = + throw new UnsupportedOperationException() + + override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = + throw new UnsupportedOperationException() + + } + } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 117b33f466f8..07ba0a4b30bd 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -81,6 +81,7 @@ class ExecutorRunnable( val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores, localResources) + logInfo(s"Setting up executor with environment: $env") logInfo("Setting up executor with commands: " + commands) ctx.setCommands(commands) From 1dfbb40e9b7669bde7a3723bbb0abc3c9ac41f40 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 4 Jun 2014 13:52:15 -0700 Subject: [PATCH 2/8] Add documentation for spark.yarn.jar. --- docs/running-on-yarn.md | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index fecd8f2cc2d4..e773c023e7b3 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -95,10 +95,19 @@ Most of the configs are the same for Spark on YARN as for other deployment modes The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. + + spark.yarn.jar + (none) + + The location of the Spark jar file, in case overriding the default location is desired. + By default, Spark on YARN will use a Spark jar installed locally, but the Spark jar can also be + in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't + need to be distributed each time an application runs. To point to a jar on HDFS, for example, + set this configuration to "hdfs:///some/path". + + -By default, Spark on YARN will use a Spark jar installed locally, but the Spark JAR can also be in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't need to be distributed each time an application runs. To point to a JAR on HDFS, `export SPARK_JAR=hdfs:///some/path`. 
- # Launching Spark on YARN Ensure that `HADOOP_CONF_DIR` or `YARN_CONF_DIR` points to the directory which contains the (client side) configuration files for the Hadoop cluster. From e5c682d3265942d44f3f08b72c5e087dbb4e44a0 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 6 Jun 2014 11:32:26 -0700 Subject: [PATCH 3/8] Fix cluster mode, restore SPARK_LOG4J_CONF. Also add documentation about logging to the Yarn guide. In cluster mode, the change modifies some code added in fb98488f to treat both client and cluster modes as mostly the same. Previously, cluster mode was only forwarding system properties that started with "spark", which caused it to ignore anything that SparkSubmit sets directly in the SparkConf object. --- docs/running-on-yarn.md | 15 +++++++- .../apache/spark/deploy/yarn/ClientBase.scala | 35 ++++++++++--------- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index e773c023e7b3..43b06df99d9c 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -165,7 +165,20 @@ all environment variables used for launching each container. This process is use classpath problems in particular. (Note that enabling this requires admin privileges on cluster settings and a restart of all node managers. Thus, this is not applicable to hosted clusters). -# Important Notes +To use a custom log4j configuration for the application master or executors, there are two options: + +- upload a custom log4j.properties using spark-submit, by adding it to the "--files" list of files + to be uploaded with the application. +- add "-Dlog4j.configuration=<location of configuration file>" to "spark.driver.extraJavaOptions" + (for the driver) or "spark.executor.extraJavaOptions" (for executors). Note that if using a file, + the "file:" protocol should be explicitly provided, and the file needs to exist locally on all + the nodes. + +Note that for the first option, both executors and the application master will share the same +log4j configuration, which may cause issues when they run on the same node (e.g. trying to write +to the same log file). + +# Important notes - Before Hadoop 2.2, YARN does not support cores in container resource requests. Thus, when running against an earlier version, the numbers of cores given via command line arguments cannot be passed to YARN. Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured. - The local directories used by Spark executors will be the local directories configured for YARN (Hadoop YARN config `yarn.nodemanager.local-dirs`). If the user specifies `spark.local.dir`, it will be ignored. diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index c741062c4d7e..37ff292f02df 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -213,9 +213,18 @@ trait ClientBase extends Logging { val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() + val oldLog4jConf = Option(System.getenv("SPARK_LOG4J_CONF")) + if (oldLog4jConf.isDefined) { + logWarning( + "SPARK_LOG4J_CONF detected in the system environment. This variable has been " + + "deprecated. 
Please refer to the \"Launching Spark on YARN\" documentation " + + "for alternatives.") + } + List( (ClientBase.SPARK_JAR, ClientBase.sparkJar(sparkConf), ClientBase.CONF_SPARK_JAR), - (ClientBase.APP_JAR, args.userJar, ClientBase.CONF_SPARK_USER_JAR) + (ClientBase.APP_JAR, args.userJar, ClientBase.CONF_SPARK_USER_JAR), + ("log4j.properties", oldLog4jConf.getOrElse(null), null) ).foreach { case(destName, _localPath, confKey) => val localPath: String = if (_localPath != null) _localPath.trim() else "" if (! localPath.isEmpty()) { @@ -225,7 +234,7 @@ trait ClientBase extends Logging { val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions) distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, destName, statCache) - } else { + } else if (confKey != null) { sparkConf.set(confKey, localPath) } } @@ -348,20 +357,13 @@ trait ClientBase extends Logging { sparkConf.set("spark.driver.extraJavaOptions", opts) } + // Forward the Spark configuration to the application master / executors. // TODO: it might be nicer to pass these as an internal environment variable rather than // as Java options, due to complications with string parsing of nested quotes. - if (args.amClass == classOf[ExecutorLauncher].getName) { - // If we are being launched in client mode, forward the spark-conf options - // onto the executor launcher - for ((k, v) <- sparkConf.getAll) { - javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" - } - } else { - // If we are being launched in standalone mode, capture and forward any spark - // system properties (e.g. set by spark-class). - for ((k, v) <- sys.props.filterKeys(_.startsWith("spark"))) { - javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" - } + for ((k, v) <- sparkConf.getAll) { + javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" + } + if (args.amClass == classOf[ApplicationMaster].getName) { sys.props.get("spark.driver.extraJavaOptions").foreach(opts => javaOpts += opts) sys.props.get("spark.driver.libraryPath").foreach(p => javaOpts += s"-Djava.library.path=$p") } @@ -522,11 +524,10 @@ object ClientBase extends Logging { } } } else { - val userJar = conf.getOption(CONF_SPARK_USER_JAR).getOrElse(null) + val userJar = conf.get(CONF_SPARK_USER_JAR, null) addFileToClasspath(userJar, APP_JAR, env) - val cachedSecondaryJarLinks = - conf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",") + val cachedSecondaryJarLinks = conf.get(CONF_SPARK_YARN_SECONDARY_JARS, "").split(",") cachedSecondaryJarLinks.foreach(jar => addFileToClasspath(jar, null, env)) } } From 93c3f85c2e41353d4e8ef9dc893554ef636e235a Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 13 Jun 2014 13:06:55 -0700 Subject: [PATCH 4/8] Fix ClassCastException in test. 
--- .../scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala index 7b29985388fc..b13509a70da4 100644 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala +++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala @@ -110,7 +110,7 @@ class ClientBaseSuite extends FunSuite { val client = spy(new DummyClient(args, conf, sparkConf, yarnConf)) doReturn(new Path("/")).when(client).copyRemoteFile(any(classOf[Path]), - any(classOf[Path]), any(classOf[Short]), any(classOf[Boolean])) + any(classOf[Path]), anyShort(), anyBoolean()) var tempDir = Files.createTempDir(); try { From b2e377f0df12ee9a04142a5622ab835eca419c74 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 17 Jun 2014 09:33:43 -0700 Subject: [PATCH 5/8] Review feedback. --- .../apache/spark/deploy/yarn/ClientBase.scala | 12 +++++++++-- .../spark/deploy/yarn/ClientBaseSuite.scala | 20 ++++++++++--------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 37ff292f02df..e14e10666b0e 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -395,7 +395,15 @@ object ClientBase extends Logging { val APP_JAR: String = "__app__.jar" val LOCAL_SCHEME = "local" val CONF_SPARK_JAR = "spark.yarn.jar" + /** + * This is an internal config used to propagate the location of the user's jar file to the + * driver/executors. + */ val CONF_SPARK_USER_JAR = "spark.yarn.user.jar" + /** + * This is an internal config used to propagate the list of extra jars to add to the classpath + * of executors. + */ val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars" val ENV_SPARK_JAR = "SPARK_JAR" @@ -506,8 +514,8 @@ object ClientBase extends Logging { addUserClasspath(args, sparkConf, env) } - // Append all class files and jar files under the working directory to the classpath. - addFileToClasspath("*", null, env) + // Append all jar files under the working directory to the classpath. 
+ addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env); } /** diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala index b13509a70da4..113cd41ad832 100644 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala +++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala @@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalatest.FunSuite -import org.scalatest.matchers.ShouldMatchers._ +import org.scalatest.Matchers import scala.collection.JavaConversions._ import scala.collection.mutable.{ HashMap => MutableHashMap } @@ -41,7 +41,7 @@ import scala.util.Try import org.apache.spark.SparkConf import org.apache.spark.util.Utils -class ClientBaseSuite extends FunSuite { +class ClientBaseSuite extends FunSuite with Matchers { test("default Yarn application classpath") { ClientBase.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP)) @@ -89,17 +89,19 @@ class ClientBaseSuite extends FunSuite { ClientBase.populateClasspath(args, conf, sparkConf, env, None) - val jars = env("CLASSPATH").split(File.pathSeparator) - s"$SPARK,$USER,$ADDED".split(",").foreach({ jar => - val uri = new URI(jar) + val cp = env("CLASSPATH").split(File.pathSeparator) + s"$SPARK,$USER,$ADDED".split(",").foreach({ entry => + val uri = new URI(entry) if (ClientBase.LOCAL_SCHEME.equals(uri.getScheme())) { - jars should contain (uri.getPath()) + cp should contain (uri.getPath()) } else { - jars should not contain (uri.getPath()) + cp should not contain (uri.getPath()) } }) - jars should not contain (ClientBase.SPARK_JAR) - jars should not contain (ClientBase.APP_JAR) + cp should contain (s"$$PWD") + cp should contain (s"$$PWD${File.separator}*") + cp should not contain (ClientBase.SPARK_JAR) + cp should not contain (ClientBase.APP_JAR) } test("Jar path propagation through SparkConf") { From 6dd5943e5a7ba1e78542d920118892b9344d7a6c Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 17 Jun 2014 10:35:15 -0700 Subject: [PATCH 6/8] Fix propagation of config options to driver / executor. ClientBase disagreed with itself about how to propagate config options. Some places used the SparkConf object, others relied on system properties. This led to certain properties not being propagated, mainly in cluster mode. So standardize on using SparkConf for that. To maintain compatibility with SPARK_JAVA_OPTS, remove the hack in ClientBase and just call SparkConf.validateSettings(), which does the right thing and correctly warns the user to stop using the env variable in the future.
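In practice, client and cluster mode now go through the same code path: every entry in the SparkConf is forwarded to the remote JVM as a -D system property. As a rough sketch (the helper name below is only for illustration; in ClientBase the loop is inlined into the launch-context code):

    import scala.collection.mutable.ListBuffer
    import org.apache.spark.SparkConf

    // Turn every Spark configuration entry into a -D option for the
    // ApplicationMaster / executor launcher command line.
    def configJavaOpts(sparkConf: SparkConf): Seq[String] = {
      val javaOpts = new ListBuffer[String]()
      for ((k, v) <- sparkConf.getAll) {
        javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
      }
      javaOpts.toSeq
    }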
--- .../scala/org/apache/spark/deploy/yarn/Client.scala | 1 + .../org/apache/spark/deploy/yarn/ClientBase.scala | 11 +++-------- .../scala/org/apache/spark/deploy/yarn/Client.scala | 1 + 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 82f79d88a300..64c7b732ccc7 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -178,6 +178,7 @@ object Client { System.setProperty("SPARK_YARN_MODE", "true") val sparkConf = new SparkConf + sparkConf.validateSettings() try { val args = new ClientArguments(argStrings, sparkConf) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index e14e10666b0e..5ce2b7083f50 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -351,12 +351,6 @@ trait ClientBase extends Logging { javaOpts += "-XX:CMSIncrementalDutyCycle=10" } - // SPARK_JAVA_OPTS is deprecated, but for backwards compatibility: - sys.env.get("SPARK_JAVA_OPTS").foreach { opts => - sparkConf.set("spark.executor.extraJavaOptions", opts) - sparkConf.set("spark.driver.extraJavaOptions", opts) - } - // Forward the Spark configuration to the application master / executors. // TODO: it might be nicer to pass these as an internal environment variable rather than // as Java options, due to complications with string parsing of nested quotes. @@ -364,8 +358,9 @@ trait ClientBase extends Logging { javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" } if (args.amClass == classOf[ApplicationMaster].getName) { - sys.props.get("spark.driver.extraJavaOptions").foreach(opts => javaOpts += opts) - sys.props.get("spark.driver.libraryPath").foreach(p => javaOpts += s"-Djava.library.path=$p") + sparkConf.getOption("spark.driver.extraJavaOptions").foreach(opts => javaOpts += opts) + sparkConf.getOption("spark.driver.libraryPath") + .foreach(p => javaOpts += s"-Djava.library.path=$p") } // Command for the ApplicationMaster diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 15f3c4f180ea..a3cf003f063b 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -185,6 +185,7 @@ object Client { // see Client#setupLaunchEnv(). System.setProperty("SPARK_YARN_MODE", "true") val sparkConf = new SparkConf() + sparkConf.validateSettings() try { val args = new ClientArguments(argStrings, sparkConf) From 6a454ead7f1fab5eefa41075348e5255a6d5a404 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 17 Jun 2014 14:56:05 -0700 Subject: [PATCH 7/8] Use constants for PWD in test. 
--- .../scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala index 113cd41ad832..686714dc3648 100644 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala +++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala @@ -98,8 +98,8 @@ class ClientBaseSuite extends FunSuite with Matchers { cp should not contain (uri.getPath()) } }) - cp should contain (s"$$PWD") - cp should contain (s"$$PWD${File.separator}*") + cp should contain (Environment.PWD.$()) + cp should contain (s"${Environment.PWD.$()}${File.separator}*") cp should not contain (ClientBase.SPARK_JAR) cp should not contain (ClientBase.APP_JAR) } From 4e7f066414c6ee5a54f3258457ae8994ea7c394c Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 19 Jun 2014 14:27:07 -0700 Subject: [PATCH 8/8] Correctly propagate SPARK_JAVA_OPTS to driver/executor. Users expected it to be possible to set spark.* config options using SPARK_JAVA_OPTS, but that's not possible when trying to propagate the env variable using spark.*.extraJavaOptions. So instead, in Yarn mode, propagate the env variable itself. Also make sure that, in cluster mode, the warning about SPARK_JAVA_OPTS being deprecated is printed to the logs. --- .../org/apache/spark/deploy/yarn/Client.scala | 1 - .../apache/spark/deploy/yarn/ClientBase.scala | 43 +++++++++++++++++-- .../deploy/yarn/ExecutorRunnableUtil.scala | 3 ++ .../org/apache/spark/deploy/yarn/Client.scala | 1 - 4 files changed, 42 insertions(+), 6 deletions(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 64c7b732ccc7..82f79d88a300 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -178,7 +178,6 @@ object Client { System.setProperty("SPARK_YARN_MODE", "true") val sparkConf = new SparkConf - sparkConf.validateSettings() try { val args = new ClientArguments(argStrings, sparkConf) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 5ce2b7083f50..556f49342977 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -294,8 +294,6 @@ trait ClientBase extends Logging { // Pass SPARK_YARN_USER_ENV itself to the AM so it can use it to set up executor environments. env("SPARK_YARN_USER_ENV") = userEnvs } - - logInfo(s"ApplicationMaster environment: $env") env } @@ -320,6 +318,37 @@ trait ClientBase extends Logging { logInfo("Setting up container launch context") val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) amContainer.setLocalResources(localResources) + + // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to + // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's + // SparkContext will not let that set spark* system properties, which is expected behavior for + // Yarn clients. So propagate it through the environment. 
+ // + // Note that to warn the user about the deprecation in cluster mode, some code from + // SparkConf#validateSettings() is duplicated here (to avoid triggering the condition + // described above). + if (args.amClass == classOf[ApplicationMaster].getName) { + sys.env.get("SPARK_JAVA_OPTS").foreach { value => + val warning = + s""" + |SPARK_JAVA_OPTS was detected (set to '$value'). + |This is deprecated in Spark 1.0+. + | + |Please instead use: + | - ./spark-submit with conf/spark-defaults.conf to set defaults for an application + | - ./spark-submit with --driver-java-options to set -X options for a driver + | - spark.executor.extraJavaOptions to set -X options for executors + """.stripMargin + logWarning(warning) + for (proc <- Seq("driver", "executor")) { + val key = s"spark.$proc.extraJavaOptions" + if (sparkConf.contains(key)) { + throw new SparkException(s"Found both $key and SPARK_JAVA_OPTS. Use only the former.") + } + } + env("SPARK_JAVA_OPTS") = value + } + } amContainer.setEnvironment(env) val amMemory = calculateAMMemory(newApp) @@ -357,8 +386,11 @@ trait ClientBase extends Logging { for ((k, v) <- sparkConf.getAll) { javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" } + if (args.amClass == classOf[ApplicationMaster].getName) { - sparkConf.getOption("spark.driver.extraJavaOptions").foreach(opts => javaOpts += opts) + sparkConf.getOption("spark.driver.extraJavaOptions") + .orElse(sys.env.get("SPARK_JAVA_OPTS")) + .foreach(opts => javaOpts += opts) sparkConf.getOption("spark.driver.libraryPath") .foreach(p => javaOpts += s"-Djava.library.path=$p") } @@ -374,7 +406,10 @@ trait ClientBase extends Logging { "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") - logInfo("Command for starting the Spark ApplicationMaster: " + commands) + logInfo("Yarn AM launch context:") + logInfo(s" class: ${args.amClass}") + logInfo(s" env: $env") + logInfo(s" command: ${commands.mkString(" ")}") // TODO: it would be nicer to just make sure there are no null commands here val printableCommands = commands.map(s => if (s == null) "null" else s).toList diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala index b456c19c29af..4ba7133a959e 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala @@ -55,6 +55,9 @@ trait ExecutorRunnableUtil extends Logging { sys.props.get("spark.executor.extraJavaOptions").foreach { opts => javaOpts += opts } + sys.env.get("SPARK_JAVA_OPTS").foreach { opts => + javaOpts += opts + } javaOpts += "-Djava.io.tmpdir=" + new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index a3cf003f063b..15f3c4f180ea 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -185,7 +185,6 @@ object Client { // see Client#setupLaunchEnv(). System.setProperty("SPARK_YARN_MODE", "true") val sparkConf = new SparkConf() - sparkConf.validateSettings() try { val args = new ClientArguments(argStrings, sparkConf)
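A closing usage sketch (illustrative only; the jar paths below are made up): with this series applied, the Spark jar location is an ordinary configuration key rather than the SPARK_JAR environment variable, so it can be set like any other Spark configuration (e.g. in spark-defaults.conf or with --conf on spark-submit); in code it would look like:

    import org.apache.spark.SparkConf

    // A jar on HDFS lets YARN cache it on the nodes instead of uploading it
    // with every submission; a "local:" URI points at a copy that is already
    // installed on every node and is added to the classpath as-is.
    val conf = new SparkConf()
      .set("spark.yarn.jar", "hdfs:///some/path/spark-assembly.jar")
    // or, for a locally installed jar:
    //   .set("spark.yarn.jar", "local:/opt/spark/lib/spark-assembly.jar")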