25 changes: 25 additions & 0 deletions core/pom.xml
@@ -357,6 +357,31 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
</dependency>

<!--
Testing Hive reflection needs hive on the test classpath only.
It doesn't need the spark hive modules, so the -Phive flag is not checked.
-->
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-exec</artifactId>
Author (@mgummelt), Apr 19, 2017:

I have a problem here that I could use some help on.

hive-exec has a dependency on pentaho that doesn't exist in maven central anymore. This is why we have an entry for it in spark-parent's dependencyManagement section that excludes pentaho via calcite-core.

But the scope in spark-parent's dependencyManagement entry for hive-exec is compile, so if I make it test here, which is what it was in YARN, then it fails to match that entry anymore, and thus doesn't inherit the excludes. This shouldn't be a problem, because dependencies in the test scope aren't transitive, so we should never be trying to fetch the missing pentaho dependency. But sbt builds don't seem to recognize this transitivity, so if I add the test scope, I get this error:

[warn] 	Note: Unresolved dependencies path:
[warn] 		org.pentaho:pentaho-aggdesigner-algorithm:5.1.5-jhyde
[warn] 		  +- org.apache.calcite:calcite-core:1.2.0-incubating ((com.typesafe.sbt.pom.MavenHelper) MavenHelper.scala#L76)
[warn] 		  +- org.spark-project.hive:hive-exec:1.2.1.spark2 ((com.typesafe.sbt.pom.MavenHelper) MavenHelper.scala#L76)
[warn] 		  +- org.apache.spark:spark-core_2.11:2.2.0-SNAPSHOT ((com.typesafe.sbt.pom.MavenHelper) MavenHelper.scala#L76)
[warn] 		  +- org.apache.spark:spark-catalyst_2.11:2.2.0-SNAPSHOT ((com.typesafe.sbt.pom.MavenHelper) MavenHelper.scala#L76)
[warn] 		  +- org.apache.spark:spark-sql_2.11:2.2.0-SNAPSHOT ((com.typesafe.sbt.pom.MavenHelper) MavenHelper.scala#L76)
[warn] 		  +- org.apache.spark:spark-hive_2.11:2.2.0-SNAPSHOT

What I can't understand is how this wasn't a problem when these dependencies were in YARN. For some reason sbt dependencyTree on master shows that calcite-core is excluded from the hive-exec dependency tree, but the same command on this PR tries to resolve calcite-core.

Let me know if you have any ideas here. My best idea right now is to just try to duplicate the excludes in spark-parent.
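
For illustration only, a minimal sketch of how such an exclusion could be expressed on the sbt side (hypothetical, not part of this PR; the coordinates are taken from the warning above, and Spark's sbt build is generated from the POMs, so the real fix would more likely live in spark-parent's dependencyManagement):

```scala
// Hypothetical build.sbt fragment: globally exclude the pentaho artifact that
// calcite-core (via hive-exec) drags in, since it can no longer be resolved
// from Maven Central.
excludeDependencies += ExclusionRule("org.pentaho", "pentaho-aggdesigner-algorithm")
```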

<scope>test</scope>
</dependency>
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-metastore</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -0,0 +1,3 @@
org.apache.spark.deploy.security.HadoopFSCredentialProvider
org.apache.spark.deploy.security.HBaseCredentialProvider
org.apache.spark.deploy.security.HiveCredentialProvider
18 changes: 15 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -30,6 +30,7 @@ import scala.util.Properties
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
Contributor:

Hmm... you're adding a YARN dependency in core, which should be able to build without YARN... but you're not actually adding a new dependency to the POM, so I guess the dependency is already there indirectly.

Author:

Yea it looks like this should require hadoop-yarn-api. I'll add this dependency to core unless you object, in which case I suppose I could just use the raw string instead of YarnConfiguration.RM_PRINCIPAL, but that seems hacky.

FYI, we originally talked about placing the code below into the Mesos scheduler, but that seems to be too late. The SparkContext constructor creates a copy of the configuration, so we need to set the required YARN property before SparkContext is created, which means before user code gets run, which probably means somewhere in SparkSubmit.

Contributor:

I don't think you need the explicit dependency (otherwise this would not be compiling). This is probably being brought by hadoop-client already.

Author:

Better to explicitly declare a dependency rather than rely on transitivity, right?

import org.apache.ivy.Ivy
import org.apache.ivy.core.LogOptions
import org.apache.ivy.core.module.descriptor._
@@ -45,6 +46,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBibl
import org.apache.spark._
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.rest._
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util._

@@ -63,7 +65,7 @@ private[deploy] object SparkSubmitAction extends Enumeration {
* This program handles setting up the classpath with relevant Spark dependencies and provides
* a layer over the different cluster managers and deploy modes that Spark supports.
*/
object SparkSubmit extends CommandLineUtils {
object SparkSubmit extends CommandLineUtils with Logging {

// Cluster managers
private val YARN = 1
@@ -564,12 +566,22 @@ object SparkSubmit extends CommandLineUtils {
// properties and then loaded by SparkConf
sysProps.put("spark.yarn.keytab", args.keytab)
sysProps.put("spark.yarn.principal", args.principal)

UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
}
}


// [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
// renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos mode, we
// must trick it into thinking we're YARN.
if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
Contributor:

Does this work in the user impersonation scenario? Here shortUserName is the real user, while HadoopRDD may execute as a proxy user.

Author:

Is it? It looks like getCurrentUser returns the subject from the current AccessControlContext, which is set by the doAs when --proxy-user is set.

Regardless, the renewer specified here actually has no effect, since we aren't renewing yet. Once I add renewal, I'll need to revisit this to make sure it's consistent with the renewal we're going to do in MesosSecurity. I'm only setting this now to avoid an Exception that gets thrown in the hadoop library if the master principal is not configured. See the JIRA in the above code comment for details.

Contributor:

The renewer for delegation tokens should be the user that created them (if the service is not doing it for the user, like in the YARN case); only the logged in user can create tokens, so this has to be the current user (not the proxy user, which happens later).

The tokens should be created in the proxy user's name (so user = "proxy" renewer = "real user"), when you care to support that.

> Once I add renewal

So do you mean that currently this only works until the delegation tokens need renewal (which is 1 day by default - a lot shorter than the max life time)?

Author:

> So do you mean that currently this only works until the delegation tokens need renewal (which is 1 day by default - a lot shorter than the max life time)?

Yes
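
For context, here is a minimal sketch (not part of this PR; names are illustrative) of how token acquisition under impersonation is typically done with the Hadoop UGI API, with the proxy user as the token owner and the real logged-in user as the renewer:

```scala
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// Hypothetical helper: obtain HDFS delegation tokens on behalf of a proxy user.
// The tokens are owned by the proxy user, while the renewer is the real
// (Kerberos-authenticated) user logged in via keytab.
def obtainTokensAsProxy(proxyUser: String, hadoopConf: Configuration): Credentials = {
  val realUser = UserGroupInformation.getCurrentUser
  val proxyUgi = UserGroupInformation.createProxyUser(proxyUser, realUser)
  val creds = new Credentials()
  proxyUgi.doAs(new PrivilegedExceptionAction[Unit] {
    override def run(): Unit = {
      FileSystem.get(hadoopConf).addDelegationTokens(realUser.getShortUserName, creds)
    }
  })
  creds
}
```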


logDebug(s"Setting ${key} to ${shortUserName}.")
sysProps.put(key, shortUserName)
}

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.deploy.yarn.security
package org.apache.spark.deploy.security

import java.util.ServiceLoader

@@ -41,15 +41,17 @@ import org.apache.spark.util.Utils
* For example, Hive's credential provider [[HiveCredentialProvider]] can be enabled/disabled by
* the configuration spark.yarn.security.credentials.hive.enabled.
*/
private[yarn] final class ConfigurableCredentialManager(
private[spark] class ConfigurableCredentialManager(
sparkConf: SparkConf, hadoopConf: Configuration) extends Logging {
private val deprecatedProviderEnabledConfig = "spark.yarn.security.tokens.%s.enabled"
Contributor:

Shall we also rename the configurations? Currently they are "spark.yarn.security.*".

Contributor:

No, these are deprecated configs which need to keep the old names for compatibility.

private val providerEnabledConfig = "spark.yarn.security.credentials.%s.enabled"
Contributor:

Now these configs probably should be updated to the new namespace, with code added for backwards compatibility...

Author:

Done
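
As a rough illustration of the rename-with-fallback approach (the key names are illustrative, not necessarily what the PR ends up using):

```scala
// Hypothetical sketch: prefer a new, scheduler-neutral key but fall back to the
// deprecated spark.yarn.* key so existing configurations keep working.
private def isServiceEnabled(sparkConf: SparkConf, service: String): Boolean = {
  val key = s"spark.security.credentials.$service.enabled"
  val deprecatedKey = s"spark.yarn.security.credentials.$service.enabled"
  sparkConf.getOption(key)
    .orElse(sparkConf.getOption(deprecatedKey))
    .map(_.toBoolean)
    .getOrElse(true)
}
```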


// Maintain all the registered credential providers
private val credentialProviders = {
val providers = ServiceLoader.load(classOf[ServiceCredentialProvider],
Utils.getContextOrSparkClassLoader).asScala
private val credentialProviders = getCredentialProviders()
logDebug(s"Using the following credential providers: ${credentialProviders.keys.mkString(", ")}.")

private def getCredentialProviders(): Map[String, ServiceCredentialProvider] = {
val providers = loadCredentialProviders

// Filter out credentials in which spark.yarn.security.credentials.{service}.enabled is false.
providers.filter { p =>
@@ -64,15 +66,22 @@ private[yarn] final class ConfigurableCredentialManager(
}.map { p => (p.serviceName, p) }.toMap
}

/**
protected def loadCredentialProviders: List[ServiceCredentialProvider] = {
ServiceLoader.load(classOf[ServiceCredentialProvider], Utils.getContextOrSparkClassLoader)
.asScala.toList
}

/**
* Get credential provider for the specified service.
*/
def getServiceCredentialProvider(service: String): Option[ServiceCredentialProvider] = {
credentialProviders.get(service)
}

/**
* Obtain credentials from all the registered providers.
* Writes delegation tokens to creds. Delegation tokens are fetched from all registered
* providers.
*
* @return nearest time of next renewal, Long.MaxValue if all the credentials aren't renewable,
* otherwise the nearest renewal time of any credentials will be returned.
*/
@@ -87,21 +96,4 @@ private[yarn] final class ConfigurableCredentialManager(
}
}.foldLeft(Long.MaxValue)(math.min)
}

/**
* Create an [[AMCredentialRenewer]] instance, caller should be responsible to stop this
* instance when it is not used. AM will use it to renew credentials periodically.
*/
def credentialRenewer(): AMCredentialRenewer = {
new AMCredentialRenewer(sparkConf, hadoopConf, this)
}

/**
* Create an [[CredentialUpdater]] instance, caller should be resposible to stop this intance
* when it is not used. Executors and driver (client mode) will use it to update credentials.
* periodically.
*/
def credentialUpdater(): CredentialUpdater = {
new CredentialUpdater(sparkConf, hadoopConf, this)
}
}
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.deploy.yarn.security
package org.apache.spark.deploy.security

import scala.reflect.runtime.universe
import scala.util.control.NonFatal
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.security

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging

private[deploy] class HadoopFSCredentialProvider
extends ServiceCredentialProvider with Logging {
// Token renewal interval. This value is set on the first call; None means that no token
// renewer was specified or that no token can be renewed, so the renewal interval cannot
// be determined.
private var tokenRenewalInterval: Option[Long] = null

override val serviceName: String = "hadoopfs"

override def obtainCredentials(
hadoopConf: Configuration,
sparkConf: SparkConf,
creds: Credentials): Option[Long] = {
// NameNode to access, used to get tokens from different FileSystems
val tmpCreds = new Credentials()
val tokenRenewer = getTokenRenewer(hadoopConf)
hadoopFSsToAccess(hadoopConf, sparkConf).foreach { dst =>
val dstFs = dst.getFileSystem(hadoopConf)
logInfo("getting token for: " + dst)
dstFs.addDelegationTokens(tokenRenewer, tmpCreds)
}

// Get the token renewal interval if it is not set. It will only be called once.
if (tokenRenewalInterval == null) {
tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf)
}

// Get the time of next renewal.
val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
val nextRenewalDates = tmpCreds.getAllTokens.asScala
.filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
.map { t =>
val identifier = t.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
identifier.getIssueDate + interval
}
if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
}

creds.addAll(tmpCreds)
nextRenewalDate
}

protected def getTokenRenewalInterval(
hadoopConf: Configuration,
sparkConf: SparkConf): Option[Long] = None

protected def getTokenRenewer(hadoopConf: Configuration): String = {
UserGroupInformation.getCurrentUser.getShortUserName
}

protected def hadoopFSsToAccess(hadoopConf: Configuration, sparkConf: SparkConf): Set[Path] = {
Set(FileSystem.get(hadoopConf).getHomeDirectory)
}
}
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.deploy.yarn.security
package org.apache.spark.deploy.security

import java.lang.reflect.UndeclaredThrowableException
import java.security.PrivilegedExceptionAction
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
Contributor:

Will this move not break all existing ServiceCredentialProviders? (Since the service discovery via META-INF/services will be affected now.)
@jerryshao and @vanzin can comment more since they wrote/reviewed this IIRC.

Contributor:

The code in the "yarn" module is sort of considered "private", so anyone extending those classes is walking on thin ice... the internal services files need to be moved / updated, but I don't think we need to worry about external implementations.

This also means we need to be more careful here when making classes / traits public. Maybe add @InterfaceStability.Evolving annotations if they haven't been added, or something.

(Haven't looked at the rest of the code yet, kinda waiting for the PR builder to be happy first.)

Author:

I've moved all the ServiceCredentialProvider subclasses to core, and I've updated the references in yarn's META-INF.

Contributor:

@vanzin Since this is documented in running-on-yarn.md, I don't think it can be considered an internal implementation detail anymore; it is part of our exposed contract.

@mgummelt I did see that all Spark implementations have been moved - but since this is an exposed contract, there can be (and are) external implementations which rely on it.
If we are breaking API compatibility, we should be very explicit about it: I wanted to make sure it is called out and evaluated accordingly (and not treated as an internal implementation detail within yarn/core).

Contributor:

Hmm... that's unfortunate, not just because of this change, but because yarn/ is not included in mima checks, so if the class changed in incompatible ways we wouldn't notice.

I really doubt anyone has custom implementations of that interface at this point. And making it a proper API in core is better going forward, IMO. If you really think that it's important to keep backwards compatibility, we could have the old interface present, extending the new one, and some compatibility code to keep both working. But that sounds like a lot of work for something I doubt anyone is currently using...

Contributor (@mridulm), Apr 18, 2017:

@vanzin Would be good to keep backward compatibility since there are users of the interface.
I did not realize yarn was excluded from MIMA - that is unfortunate.

A cursory look at ServiceLoader seemed to indicate it might not be very involved to provide backward compatibility (the yarn interface extends from core, load both interfaces, and use both as the core impl will do? Or did I miss something here?)

Contributor:

For backwards compat you'd have to call ServiceLoader twice, to load implementations of the two interfaces. That should be most of it, really, assuming the interface itself is not changing.

Author:

OK I'll try to unbreak backwards compatibility.

Author:

OK, I:

  • Re-added deploy.yarn.security.ServiceCredentialProvider, extending deploy.security.ServiceCredentialProvider, and deprecated it.
  • Added a new YARNConfigurableCredentialProvider, which extends ConfigurableCredentialProvider, with the added behavior that it also loads providers from deploy.yarn.security.ServiceCredentialProvider.

Author:

I'm not quite sure this is correct though. As it is, if a user has a service listed in the resources entry for both deploy.security.ServiceCredentialProvider and deploy.yarn.security.ServiceCredentialProvider, then they will fetch tokens twice for each service. This should only occur if the user's workflow is to drop a duplicated, but modified version of the deploy.yarn.security.ServiceCredentialProvider resources file into META-INF. If they have a fork of Spark that they merge, or if they have an entry in some other jar for just their customized service, there should be no duplication problem.

Thoughts?
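
A minimal sketch of the load-both-then-de-duplicate idea (hypothetical; it assumes the code lives somewhere both interfaces are visible, e.g. the YARN-side manager, and that de-duplicating by serviceName is acceptable, which would also address the double-fetch concern above):

```scala
import java.util.ServiceLoader

import scala.collection.JavaConverters._

import org.apache.spark.util.Utils

// Hypothetical: load providers registered under both the new core interface and
// the deprecated YARN one, keeping only one provider per service name.
private def loadAllProviders(): Map[String, ServiceCredentialProvider] = {
  val loader = Utils.getContextOrSparkClassLoader
  val newProviders =
    ServiceLoader.load(classOf[ServiceCredentialProvider], loader).asScala.toSeq
  val oldProviders = ServiceLoader
    .load(classOf[org.apache.spark.deploy.yarn.security.ServiceCredentialProvider], loader)
    .asScala.toSeq
  // Entries from the new resource file win over duplicates in the deprecated one.
  (oldProviders ++ newProviders).map(p => p.serviceName -> p).toMap
}
```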


import org.apache.spark.SparkConf

/**
* A credential provider for a service. User must implement this if they need to access a
* secure service from Spark.
*/
trait ServiceCredentialProvider {
Contributor:

No annotation means "Unstable" IIRC, but might as well be explicit here and add a proper InterfaceStability annotation.

Author:

How does InterfaceStability relate to Scala-language-level access control (public, private, etc.)? Every public interface must maintain backwards compatibility within a major version, right? So does it make sense to have a public trait that's unstable?

Contributor:

It's not a Scala thing, it's a Spark API thing. When adding a public API that might change, we annotate it appropriately so that people can know that it's not really stable. Flip side, if you change an API that is tagged with a "Stable" annotation, you'll be yelled at.

It also shows up in docs:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.util.QueryExecutionListener

(Basically this is the new version of @DeveloperApi and @Experimental.)

Author:

> When adding a public API that might change

Ah, thanks. I didn't know this was allowed at all.

Author:

Done
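
For reference, a sketch of what the annotation mentioned above might look like on this trait (the exact stability level is a judgment call; Unstable is shown purely as an example):

```scala
import org.apache.spark.annotation.InterfaceStability

// Hypothetical: mark the public trait so its stability expectations are explicit.
@InterfaceStability.Unstable
trait ServiceCredentialProvider {
  def serviceName: String
  // ... rest of the trait as defined below ...
}
```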


/**
* Name of the service for which credentials are provided. This name should be unique; Spark
* uses it internally to differentiate credential providers.
*/
def serviceName: String

/**
* Decides whether credentials are required for this service. By default this is based on
* whether Hadoop security is enabled.
*/
def credentialsRequired(hadoopConf: Configuration): Boolean = {
UserGroupInformation.isSecurityEnabled
}

/**
* Obtain credentials for this service and get the time of the next renewal.
* @param hadoopConf Configuration of current Hadoop Compatible system.
* @param sparkConf Spark configuration.
* @param creds Credentials to add tokens and security keys to.
* @return If this Credential is renewable and can be renewed, return the time of the next
* renewal, otherwise None should be returned.
*/
def obtainCredentials(
hadoopConf: Configuration,
sparkConf: SparkConf,
creds: Credentials): Option[Long]
}
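
To make the contract concrete, a hypothetical user-side implementation might look like the following (class name and service name are made up; it would be registered through a META-INF/services entry for this trait):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf

// Hypothetical provider for an external secure service.
class MyServiceCredentialProvider extends ServiceCredentialProvider {

  override def serviceName: String = "myservice"

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Fetch tokens for the external service and add them to `creds` here.
    // Returning None indicates the tokens are not renewable.
    None
  }
}
```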
@@ -21,11 +21,14 @@ import java.net.URL
import java.nio.ByteBuffer
import java.util.Locale
import java.util.concurrent.atomic.AtomicBoolean
import javax.xml.bind.DatatypeConverter

import scala.collection.mutable
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
@@ -174,6 +177,24 @@ private[spark] class CoarseGrainedExecutorBackend(

private[spark] object CoarseGrainedExecutorBackend extends Logging {

private def addMesosDelegationTokens(driverConf: SparkConf) {
val value = driverConf.get("spark.mesos.kerberos.userCredentials")
Contributor:

You really should be using the config constant you create here.

But a bigger issue is that this value will be written to event logs and shown in the UI. Instead of using the config for this, would it be too hard to add this as a field in SparkAppConfig (see CoarseGrainedClusterMessage.scala)?

Author:

Yea, I'll go ahead and change this to an RPC.

Author:

Done
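
For illustration, the SparkAppConfig-based alternative discussed above could look roughly like this (the added field name is hypothetical, and the existing fields are assumed from the current message definition):

```scala
// Hypothetical sketch of the SparkAppConfig message with an extra field that carries
// serialized delegation tokens to executors during registration, instead of shipping
// them through a SparkConf entry that ends up in event logs and the UI.
case class SparkAppConfig(
    sparkProperties: Seq[(String, String)],
    ioEncryptionKey: Option[Array[Byte]],
    hadoopDelegationCreds: Option[Array[Byte]]) // added field (hypothetical)
```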

val tokens = DatatypeConverter.parseBase64Binary(value)

logDebug(s"Found delegation tokens of ${tokens.length} bytes.")

// Use tokens for HDFS login.
val hadoopConf = SparkHadoopUtil.get.newConfiguration(driverConf)
hadoopConf.set("hadoop.security.authentication", "Token")
UserGroupInformation.setConfiguration(hadoopConf)

// Decode tokens and add them to the current user's credentials.
val creds = UserGroupInformation.getCurrentUser.getCredentials
val tokensBuf = new java.io.ByteArrayInputStream(tokens)
creds.readTokenStorageStream(new java.io.DataInputStream(tokensBuf))
UserGroupInformation.getCurrentUser.addCredentials(creds)
}
Contributor:

This is relying on an immutable initial credential populated in the driver conf.
How is the renewal scenario expected to be handled here?

Author:

In my pending renewal PR, I've implemented it to use a new UpdateDelegationToken RPC call. This is just used for the initial transmission. I guess it makes sense to consolidate.

Contributor:

That would be great, thanks - I am trying to get a sense of how renewal works in this case.
There is some ongoing work on having different ways to update credentials, and I was hoping this (from what I understand, not using HDFS but a direct RPC) would be another way to do it - allowing for multiple common implementations which can be leveraged across all schedulers depending on requirements.

Contributor:

To elaborate: one potential use of it is to do token acquisition and token distribution without needing to provide a principal/keytab to the Spark application (other than keeping track of the last credential update to account for AM failures).
This is WIP though, but if the Mesos approach is different from YARN's, that would be a great way to iterate on the latter aspect (token distribution) and ensure it is extensible enough for future requirements/implementations.

Author:

Yes, I believe @jerryshao is working on a standard method of delegating tokens. We can plug that in once it exists.


private def run(
driverUrl: String,
executorId: String,
@@ -220,6 +241,10 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
SparkHadoopUtil.get.startCredentialUpdater(driverConf)
}

if (driverConf.contains("spark.mesos.kerberos.userCredentials")) {
addMesosDelegationTokens(driverConf)
}

val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)

@@ -0,0 +1 @@
org.apache.spark.deploy.security.TestCredentialProvider
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.deploy.yarn.security
package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
@@ -24,7 +24,6 @@ import org.apache.hadoop.security.token.Token
import org.scalatest.{BeforeAndAfter, Matchers}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.yarn.config._

class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
private var credentialManager: ConfigurableCredentialManager = null