SPARK-1676: Cache Hadoop UGIs by default to prevent FileSystem leak #621
Changes from 1 commit
CoarseGrainedExecutorBackend.scala

```diff
@@ -22,8 +22,9 @@ import java.nio.ByteBuffer
 import akka.actor._
 import akka.remote._
 
-import org.apache.spark.{SecurityManager, SparkConf, Logging}
+import org.apache.spark.{SparkContext, Logging, SecurityManager, SparkConf}
 import org.apache.spark.TaskState.TaskState
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{AkkaUtils, Utils}
```
```diff
@@ -94,25 +95,32 @@ private[spark] class CoarseGrainedExecutorBackend(
 private[spark] object CoarseGrainedExecutorBackend {
   def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
-    workerUrl: Option[String]) {
-    // Debug code
-    Utils.checkHost(hostname)
-
-    val conf = new SparkConf
-    // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
-    // before getting started with all our system properties, etc
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
-      indestructible = true, conf = conf, new SecurityManager(conf))
-    // set it
-    val sparkHostPort = hostname + ":" + boundPort
-    actorSystem.actorOf(
-      Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
-        sparkHostPort, cores),
-      name = "Executor")
-    workerUrl.foreach{ url =>
-      actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+      workerUrl: Option[String]) {
+
+    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
```
Contributor: We could potentially just put this resolution of SPARK_USER inside runAsUser (calling it maybe runAsSparkUser), to avoid duplication of this logic and the weird SPARK_UNKNOWN_USER value.

Contributor: Yes, that's probably better than having this logic duplicated.

Contributor: I believe runAsUser is not actually used anywhere else. We can probably just make it intended for only this exact use case, so as not to confuse users with its over-generality until it's necessary.
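A rough sketch of the runAsSparkUser shape proposed above. The body is an assumption: it mirrors what a Hadoop-UGI-based runAsUser plausibly does, not the code that was actually merged in this PR.

```scala
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

// Hypothetical helper folding the SPARK_USER lookup into the wrapper itself,
// so call sites need neither the duplicated resolution logic nor the
// SPARK_UNKNOWN_USER sentinel.
def runAsSparkUser(func: () => Unit) {
  Option(System.getenv("SPARK_USER")) match {
    case Some(user) =>
      // Execute func as a UGI for the requested user; Hadoop RPC picks up
      // the identity from the thread-local current user set by doAs.
      val ugi = UserGroupInformation.createRemoteUser(user)
      ugi.doAs(new PrivilegedExceptionAction[Unit] {
        def run(): Unit = func()
      })
    case None =>
      // SPARK_USER unset: run as the current JVM user, no UGI switch needed.
      func()
  }
}
```

Each executor backend's entry point would then shrink to a single `runAsSparkUser { () => ... }` wrapper around its existing body.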
```diff
+    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+
+      // Debug code
+      Utils.checkHost(hostname)
+
+      val conf = new SparkConf
+      // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
+      // before getting started with all our system properties, etc
+      val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
+        indestructible = true, conf = conf, new SecurityManager(conf))
+      // set it
+      val sparkHostPort = hostname + ":" + boundPort
+      actorSystem.actorOf(
+        Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
+          sparkHostPort, cores),
+        name = "Executor")
+      workerUrl.foreach {
+        url =>
+          actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+      }
+      actorSystem.awaitTermination()
     }
-    actorSystem.awaitTermination()
   }
 
   def main(args: Array[String]) {
```
MesosExecutorBackend.scala

```diff
@@ -23,10 +23,10 @@ import com.google.protobuf.ByteString
 import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
 import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
 
-import org.apache.spark.Logging
-import org.apache.spark.TaskState
+import org.apache.spark.{SparkContext, Logging, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.util.Utils
+import org.apache.spark.deploy.SparkHadoopUtil
 
 private[spark] class MesosExecutorBackend
   extends MesosExecutor
```
```diff
@@ -95,9 +95,13 @@ private[spark] class MesosExecutorBackend
  */
 private[spark] object MesosExecutorBackend {
   def main(args: Array[String]) {
-    MesosNativeLibrary.load()
-    // Create a new Executor and start it running
-    val runner = new MesosExecutorBackend()
-    new MesosExecutorDriver(runner).run()
+    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
```
Contributor: We should probably add a Utils function for this.
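A plausible shape for that shared helper. The name `getSparkUser` and its home in `Utils` are assumptions here; only the fallback expression is taken from the diff above.

```scala
import org.apache.spark.SparkContext

// Hypothetical Utils method factoring out the SPARK_USER lookup that
// CoarseGrainedExecutorBackend and MesosExecutorBackend currently duplicate.
def getSparkUser(): String = {
  Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
}
```

Both call sites could then read `SparkHadoopUtil.get.runAsUser(Utils.getSparkUser()) { () => ... }`.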
```diff
+    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+      MesosNativeLibrary.load()
+      // Create a new Executor and start it running
+      val runner = new MesosExecutorBackend()
+      new MesosExecutorDriver(runner).run()
+    }
   }
 }
```
Since the usage of this is now distributed to many places throughout Spark, can we add a comment for people who have no clue why it's there? Just something like: "Runs the given function with a Hadoop UserGroupInformation as a thread local variable (distributed to child threads), used for authenticating HDFS and YARN calls."
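For concreteness, here is roughly how the documented helper might read. The scaladoc text is the reviewer's own wording; the method body is an assumed UGI doAs wrapper, since SparkHadoopUtil.runAsUser's actual implementation is not shown in this diff.

```scala
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

/**
 * Runs the given function with a Hadoop UserGroupInformation as a thread
 * local variable (distributed to child threads), used for authenticating
 * HDFS and YARN calls.
 */
def runAsUser(user: String)(func: () => Unit) {
  // Sketch of the assumed body: create a UGI for `user` and run func under
  // it, so HDFS/YARN RPCs made inside func authenticate as that user.
  val ugi = UserGroupInformation.createRemoteUser(user)
  ugi.doAs(new PrivilegedExceptionAction[Unit] {
    def run(): Unit = func()
  })
}
```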
Nit: "repeated" should be "repeatedly".