@@ -21,7 +21,7 @@ import java.io.File
 import java.net.URI
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{HashMap, ListBuffer}
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.yarn.api._
@@ -44,9 +44,9 @@ trait ExecutorRunnableUtil extends Logging {
       hostname: String,
       executorMemory: Int,
       executorCores: Int,
-      localResources: HashMap[String, LocalResource]) = {
+      localResources: HashMap[String, LocalResource]): List[String] = {
     // Extra options for the JVM
-    var JAVA_OPTS = ""
+    val JAVA_OPTS = ListBuffer[String]()
     // Set the JVM memory
     val executorMemoryString = executorMemory + "m"
     JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
@@ -56,10 +56,21 @@ trait ExecutorRunnableUtil extends Logging {
       JAVA_OPTS += opts
     }
 
-    JAVA_OPTS += " -Djava.io.tmpdir=" +
-      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
+    JAVA_OPTS += "-Djava.io.tmpdir=" +
+      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+    JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
 
+    // Certain configs need to be passed here because they are needed before the Executor
+    // registers with the Scheduler and transfers the spark configs. Since the Executor backend
+    // uses Akka to connect to the scheduler, the akka settings are needed as well as the
+    // authentication settings.
+    sparkConf.getAll.
+      filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
+      foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+
+    sparkConf.getAkkaConf.
+      foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+
     // Commenting it out for now - so that people can refer to the properties if required. Remove
     // it once cpuset version is pushed out.
     // The context is, default gc for server class machines end up using all cores to do gc - hence
@@ -85,25 +96,25 @@ trait ExecutorRunnableUtil extends Logging {
       }
     */
 
-    val commands = List[String](
-      Environment.JAVA_HOME.$() + "/bin/java" +
-      " -server " +
+    val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java",
+      "-server",
       // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
       // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
       // an inconsistent state.
       // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
       // 'something' to fail job ... akin to blacklisting trackers in mapred ?
-      " -XX:OnOutOfMemoryError='kill %p' " +
-      JAVA_OPTS +
-      " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
-      masterAddress + " " +
-      slaveId + " " +
-      hostname + " " +
-      executorCores +
-      " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
-      " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
-
-    commands
+      "-XX:OnOutOfMemoryError='kill %p'") ++
+      JAVA_OPTS ++
+      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
+        masterAddress.toString,
+        slaveId.toString,
+        hostname.toString,
+        executorCores.toString,
+        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
+        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+
+    // TODO: it would be nicer to just make sure there are no null commands here
+    commands.map(s => if (s == null) "null" else s).toList
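The motivation for switching from string concatenation to a token list can be sketched in isolation. This is a hypothetical, minimal example (names like javaOpts and the option values are illustrative, not from the patch):

```scala
import scala.collection.mutable.ListBuffer

// Sketch of the pattern the diff adopts: accumulate JVM options as a token
// list instead of one concatenated string, so a missing or doubled space
// between arguments can no longer corrupt the final command line.
object CommandSketch {
  def main(args: Array[String]): Unit = {
    val javaOpts = ListBuffer[String]()
    javaOpts += "-Xms512m"
    javaOpts += "-Xmx512m"
    javaOpts += "-Djava.io.tmpdir=/tmp"

    // Tokens are joined exactly once, with exactly one space between them.
    val commands = Seq("java", "-server") ++ javaOpts ++
      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend")
    println(commands.mkString(" "))
  }
}
```

With the old string-building approach, each appended fragment had to carry its own leading and trailing spaces, which is exactly the "space issue" mentioned in the discussion below.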
Contributor: Does it make sense to exclude null commands altogether? Maybe something like commands.flatMap(Option(_)), or, if that's too much magic, commands.filter(_ != null)?

Contributor Author: This is copied from ClientBase so that the command building stays the same. I'd prefer to take any changes to it in a separate JIRA, since I assume there was a reason it wasn't done initially, and I've put a comment there.

Contributor Author: Note, I can take out the changes that make it a list if you'd like and make that a separate JIRA; I just ran into the space issue, so I thought I'd make it the same. Really, we should perhaps make a utility function for this, but we don't have any util classes shared across the client and app master right now.

Contributor: Yes, I think we should keep the list, since you don't have to worry about spaces here and there.

For the null thing, I'm trying to understand what it does. If I have a command Seq("java", "null", "-cp", "some.jar", "SomeClass"), doesn't this get compiled to "java null -cp some.jar SomeClass"? It seems the consequences are undefined if we leave arbitrary "null" strings in there. Also, I didn't realize it was copied from ClientBase, which makes this all the stranger.
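The two alternatives under discussion behave differently when an argument is legitimately null. A hypothetical sketch (the argument values are illustrative, not taken from the patch):

```scala
// Sketch of the trade-off discussed in this thread: dropping nulls shifts or
// removes positional arguments, while mapping them to the literal "null"
// preserves each argument's position so a bad value fails visibly.
object NullHandlingSketch {
  def main(args: Array[String]): Unit = {
    val commands = Seq("java", "-cp", "some.jar", "SomeClass", "--jar", null)

    // filter: the "--jar" flag is left with no value at all.
    val dropped = commands.filter(_ != null)

    // map (what the patch keeps): a placeholder keeps the arity intact.
    val kept = commands.map(s => if (s == null) "null" else s)

    println(dropped.mkString(" "))  // java -cp some.jar SomeClass --jar
    println(kept.mkString(" "))     // java -cp some.jar SomeClass --jar null
  }
}
```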

Contributor Author: I agree, it's a bit odd; I just didn't have time to investigate at this point, and figured someone put the comment there thinking an argument that was literally "null" might be OK. One thing I thought of: if one of the args is null, like the hostname or masterAddress, and you just strip it out, then the usage is wrong and it might not be obvious why it's failing. Hopefully it would complain somewhere on startup that a null was passed, which would make it easier to debug.

Contributor: I believe that --jar is required, but can be "null" (e.g. when using spark-shell or pyspark).

Contributor: From YarnClientSchedulerBackend.scala::start():

val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += (
  "--class", "notused",
  "--jar", null,
  "--args", hostport,
  "--am-class", classOf[ExecutorLauncher].getName
)

Maybe that explains the need?

}

private def setupDistributedCache(