diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 9bdbfb33bf54..58d686681709 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -35,15 +35,28 @@ class SparkHadoopUtil {
   val conf: Configuration = newConfiguration()
   UserGroupInformation.setConfiguration(conf)
 
-  def runAsUser(user: String)(func: () => Unit) {
+  /** Creates a UserGroupInformation for Spark based on the SPARK_USER environment variable. */
+  def createSparkUser(): Option[UserGroupInformation] = {
+    val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
     if (user != SparkContext.SPARK_UNKNOWN_USER) {
-      val ugi = UserGroupInformation.createRemoteUser(user)
-      transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
-      ugi.doAs(new PrivilegedExceptionAction[Unit] {
-        def run: Unit = func()
-      })
+      Some(UserGroupInformation.createRemoteUser(user))
     } else {
-      func()
+      None
+    }
+  }
+
+  /**
+   * If a user is specified, runs the function as that user. We additionally transfer
+   * Spark's tokens to the given UGI to ensure it has access to data written by Spark.
+   */
+  def runAsUser(user: Option[UserGroupInformation])(func: () => Unit) {
+    user match {
+      case Some(ugi) =>
+        transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
+        ugi.doAs(new PrivilegedExceptionAction[Unit] {
+          def run: Unit = func()
+        })
+      case None => func()
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 272bcda5f8f2..e2a83ba42513 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -128,7 +128,13 @@ private[spark] class Executor(
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
-  val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
+  // NB: Workaround for SPARK-1676. Caching UGIs prevents continuously creating FileSystem
+  // objects with "unique" UGIs, but is not a good solution if real UGIs and tokens are needed,
+  // mainly because expired tokens cannot be removed from the UGI.
+  val cacheUgi = conf.getBoolean("spark.user.cacheUserGroupInformation", true)
+
+  val cachedSparkUser = SparkHadoopUtil.get.createSparkUser()
+  def getSparkUser = if (cacheUgi) cachedSparkUser else SparkHadoopUtil.get.createSparkUser()
 
   def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
     val tr = new TaskRunner(context, taskId, serializedTask)
@@ -172,7 +178,7 @@
       }
     }
 
-    override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+    override def run(): Unit = SparkHadoopUtil.get.runAsUser(getSparkUser) { () =>
       val startTime = System.currentTimeMillis()
       SparkEnv.set(env)
       Thread.currentThread.setContextClassLoader(replClassLoader)
diff --git a/docs/configuration.md b/docs/configuration.md
index e7e1dd56cf12..af3c3e32fc98 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -679,6 +679,17 @@ Apart from these, the following properties are also available, and may be useful
     Set a special library path to use when launching executor JVM's.
   </td>
 </tr>
+
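
For reference, a minimal sketch of how the pieces in this patch fit together, assuming only the definitions shown above (`createSparkUser`, `runAsUser`, and the `spark.user.cacheUserGroupInformation` flag). It mirrors, rather than reproduces, what `Executor.run()` does:

```scala
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.deploy.SparkHadoopUtil

// Resolve SPARK_USER to an optional UGI once. With
// spark.user.cacheUserGroupInformation=true (the default), the Executor reuses
// this cached UGI, so repeated tasks do not create FileSystem objects keyed by
// ever-new "unique" UGIs.
val sparkUser: Option[UserGroupInformation] = SparkHadoopUtil.get.createSparkUser()

// Run the task body as that user: runAsUser first copies the current user's
// Hadoop credentials onto the UGI, then invokes the closure inside ugi.doAs.
SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
  // Hadoop FileSystem calls here execute as SPARK_USER with Spark's tokens.
}
```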