
Commit bfdc361

ArtRand authored and Marcelo Vanzin committed

[SPARK-16742] Mesos Kerberos Support

## What changes were proposed in this pull request?

Add Kerberos support to Mesos. This includes kinit and --keytab support, but does not include delegation token renewal.

## How was this patch tested?

Manually, against a secure DC/OS Apache HDFS cluster.

Author: ArtRand <arand@soe.ucsc.edu>
Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes #18519 from mgummelt/SPARK-16742-kerberos.

1 parent 6aad02d commit bfdc361

9 files changed: 117 additions & 32 deletions

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 24 additions & 5 deletions

@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, IOException}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date, Locale}
@@ -147,14 +147,18 @@ class SparkHadoopUtil extends Logging {
 
   def isYarnMode(): Boolean = { false }
 
-  def getCurrentUserCredentials(): Credentials = { null }
-
-  def addCurrentUserCredentials(creds: Credentials) {}
-
   def addSecretKeyToUserCredentials(key: String, secret: String) {}
 
   def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }
 
+  def getCurrentUserCredentials(): Credentials = {
+    UserGroupInformation.getCurrentUser().getCredentials()
+  }
+
+  def addCurrentUserCredentials(creds: Credentials): Unit = {
+    UserGroupInformation.getCurrentUser.addCredentials(creds)
+  }
+
   def loginUserFromKeytab(principalName: String, keytabFilename: String): Unit = {
     if (!new File(keytabFilename).exists()) {
       throw new SparkException(s"Keytab file: ${keytabFilename} does not exist")
@@ -425,6 +429,21 @@ class SparkHadoopUtil extends Logging {
         s"${if (status.isDirectory) "d" else "-"}$perm")
       false
   }
+
+  def serialize(creds: Credentials): Array[Byte] = {
+    val byteStream = new ByteArrayOutputStream
+    val dataStream = new DataOutputStream(byteStream)
+    creds.writeTokenStorageToStream(dataStream)
+    byteStream.toByteArray
+  }
+
+  def deserialize(tokenBytes: Array[Byte]): Credentials = {
+    val tokensBuf = new ByteArrayInputStream(tokenBytes)
+
+    val creds = new Credentials()
+    creds.readTokenStorageStream(new DataInputStream(tokensBuf))
+    creds
+  }
 }
 
 object SparkHadoopUtil {
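As a quick illustration of the new helpers (a minimal sketch, not part of the commit; the key alias and secret are made-up placeholders), `serialize` and `deserialize` round-trip a `Credentials` object through Hadoop's token-storage wire format, which is how the driver will ship tokens to executors as raw bytes:

```scala
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.Credentials
import org.apache.spark.deploy.SparkHadoopUtil

// Build a Credentials object; in real use this holds HDFS/Hive delegation tokens.
val creds = new Credentials()
creds.addSecretKey(new Text("example-alias"), "example-secret".getBytes("UTF-8"))

// Driver side: Credentials -> Array[Byte]; executor side: Array[Byte] -> Credentials.
val bytes = SparkHadoopUtil.get.serialize(creds)
val restored = SparkHadoopUtil.get.deserialize(bytes)
assert(restored.getSecretKey(new Text("example-alias")) != null)
```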

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 29 additions & 9 deletions

@@ -34,6 +34,7 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.ivy.Ivy
 import org.apache.ivy.core.LogOptions
 import org.apache.ivy.core.module.descriptor._
@@ -49,6 +50,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}
 import org.apache.spark._
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.rest._
+import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util._
 
@@ -556,19 +558,25 @@ object SparkSubmit extends CommandLineUtils {
     }
 
     // assure a keytab is available from any place in a JVM
-    if (clusterManager == YARN || clusterManager == LOCAL) {
+    if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) {
       if (args.principal != null) {
-        require(args.keytab != null, "Keytab must be specified when principal is specified")
-        SparkHadoopUtil.get.loginUserFromKeytab(args.principal, args.keytab)
-        // Add keytab and principal configurations in sysProps to make them available
-        // for later use; e.g. in spark sql, the isolated class loader used to talk
-        // to HiveMetastore will use these settings. They will be set as Java system
-        // properties and then loaded by SparkConf
-        sysProps.put("spark.yarn.keytab", args.keytab)
-        sysProps.put("spark.yarn.principal", args.principal)
+        if (args.keytab != null) {
+          require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
+          // Add keytab and principal configurations in sysProps to make them available
+          // for later use; e.g. in spark sql, the isolated class loader used to talk
+          // to HiveMetastore will use these settings. They will be set as Java system
+          // properties and then loaded by SparkConf
+          sysProps.put("spark.yarn.keytab", args.keytab)
+          sysProps.put("spark.yarn.principal", args.principal)
+          UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+        }
       }
     }
 
+    if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
+      setRMPrincipal(sysProps)
+    }
+
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (isYarnCluster) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
@@ -653,6 +661,18 @@ object SparkSubmit extends CommandLineUtils {
     (childArgs, childClasspath, sysProps, childMainClass)
   }
 
+  // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
+  // renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos mode, we
+  // must trick it into thinking we're YARN.
+  private def setRMPrincipal(sysProps: HashMap[String, String]): Unit = {
+    val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
+    val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
+    // scalastyle:off println
+    printStream.println(s"Setting ${key} to ${shortUserName}")
+    // scalastyle:on println
+    sysProps.put(key, shortUserName)
+  }
+
   /**
    * Run the main method of the child class using the provided launch environment.
   *
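For context on the `setRMPrincipal` workaround above, here is a hedged sketch of its observable effect (the user name `alice` is a hypothetical placeholder): it plants the current user's short name under Hadoop's `yarn.resourcemanager.principal` key, so the token-fetching code finds a renewer even though no ResourceManager exists in a Mesos deployment:

```scala
import scala.collection.mutable.HashMap
import org.apache.hadoop.yarn.conf.YarnConfiguration

// YarnConfiguration.RM_PRINCIPAL == "yarn.resourcemanager.principal"
val sysProps = new HashMap[String, String]()
sysProps.put(s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}", "alice")
// Spark copies spark.hadoop.* entries into the Hadoop Configuration it builds,
// so HadoopRDD's delegation-token fetch sees "alice" as the renewer principal.
```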

core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala

Lines changed: 8 additions & 0 deletions

@@ -55,6 +55,14 @@ private[spark] class HadoopDelegationTokenManager(
   logDebug(s"Using the following delegation token providers: " +
     s"${delegationTokenProviders.keys.mkString(", ")}.")
 
+  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
+  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
+    this(
+      sparkConf,
+      hadoopConf,
+      hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
+  }
+
   private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
     val providers = List(new HadoopFSDelegationTokenProvider(fileSystems),
       new HiveDelegationTokenProvider,
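A minimal driver-side sketch of the new auxiliary constructor in use (illustrative only; assumes Kerberos is enabled, and must live inside the spark package since the class is `private[spark]`):

```scala
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.security.HadoopDelegationTokenManager

val sparkConf = new SparkConf()
val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)

// Two-arg constructor: tokens are fetched for the default (home-directory)
// filesystem, with no need to enumerate filesystems by hand.
val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf)
val creds = UserGroupInformation.getCurrentUser.getCredentials
manager.obtainDelegationTokens(hadoopConf, creds)  // creds now holds the fetched tokens
```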

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 7 additions & 0 deletions

@@ -26,6 +26,8 @@ import scala.collection.mutable
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -219,6 +221,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         SparkHadoopUtil.get.startCredentialUpdater(driverConf)
       }
 
+      cfg.hadoopDelegationCreds.foreach { hadoopCreds =>
+        val creds = SparkHadoopUtil.get.deserialize(hadoopCreds)
+        SparkHadoopUtil.get.addCurrentUserCredentials(creds)
+      }
+
       val env = SparkEnv.createExecutorEnv(
         driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 2 additions & 1 deletion

@@ -32,7 +32,8 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case class SparkAppConfig(
       sparkProperties: Seq[(String, String)],
-      ioEncryptionKey: Option[Array[Byte]])
+      ioEncryptionKey: Option[Array[Byte]],
+      hadoopDelegationCreds: Option[Array[Byte]])
     extends CoarseGrainedClusterMessage
 
   case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 29 additions & 4 deletions

@@ -24,7 +24,11 @@ import javax.annotation.concurrent.GuardedBy
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Future
 
+import org.apache.hadoop.security.UserGroupInformation
+
 import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
@@ -42,8 +46,8 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils}
  */
 private[spark]
 class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
-  extends ExecutorAllocationClient with SchedulerBackend with Logging
-{
+  extends ExecutorAllocationClient with SchedulerBackend with Logging {
+
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   protected val totalCoreCount = new AtomicInteger(0)
   // Total number of executors that are currently registered
@@ -95,6 +99,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // The num of current max ExecutorId used to re-register appMaster
   @volatile protected var currentExecutorIdCounter = 0
 
+  // hadoop token manager used by some sub-classes (e.g. Mesos)
+  def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None
+
+  // Hadoop delegation tokens to be sent to the executors.
+  val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds()
+
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
@@ -223,8 +233,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         context.reply(true)
 
       case RetrieveSparkAppConfig =>
-        val reply = SparkAppConfig(sparkProperties,
-          SparkEnv.get.securityManager.getIOEncryptionKey())
+        val reply = SparkAppConfig(
+          sparkProperties,
+          SparkEnv.get.securityManager.getIOEncryptionKey(),
+          hadoopDelegationCreds)
         context.reply(reply)
     }
 
@@ -675,6 +687,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     driverEndpoint.send(KillExecutorsOnHost(host))
     true
   }
+
+  protected def getHadoopDelegationCreds(): Option[Array[Byte]] = {
+    if (UserGroupInformation.isSecurityEnabled && hadoopDelegationTokenManager.isDefined) {
+      hadoopDelegationTokenManager.map { manager =>
+        val creds = UserGroupInformation.getCurrentUser.getCredentials
+        val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+        manager.obtainDelegationTokens(hadoopConf, creds)
+        SparkHadoopUtil.get.serialize(creds)
+      }
+    } else {
+      None
+    }
+  }
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {
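The subclass opt-in pattern, sketched with a hypothetical backend (the Mesos change below is the commit's real instance of this): returning `Some(manager)` makes the base class fetch tokens once at construction and hand them to executors via `SparkAppConfig`:

```scala
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

// Hypothetical subclass; `conf` is the SparkConf field inherited from the base class.
class MyKerberizedBackend(scheduler: TaskSchedulerImpl, rpcEnv: RpcEnv)
  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {

  override def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
    Some(new HadoopDelegationTokenManager(conf, SparkHadoopUtil.get.newConfiguration(conf)))
}
```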

resource-managers/mesos/pom.xml

Lines changed: 11 additions & 0 deletions

@@ -74,6 +74,17 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-exec</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive -->
     <dependency>
       <groupId>com.google.guava</groupId>

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 7 additions & 5 deletions

@@ -22,15 +22,15 @@ import java.util.{Collections, List => JList}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.locks.ReentrantLock
 
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+import org.apache.mesos.SchedulerDriver
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.Future
 
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-import org.apache.mesos.SchedulerDriver
-
 import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
 import org.apache.spark.deploy.mesos.config._
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -55,8 +55,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     master: String,
     securityManager: SecurityManager)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
-    with org.apache.mesos.Scheduler
-    with MesosSchedulerUtils {
+    with org.apache.mesos.Scheduler with MesosSchedulerUtils {
+
+  override def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
+    Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
 
   // Blacklist a slave after this many failures
   private val MAX_SLAVE_FAILURES = 2

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 0 additions & 8 deletions

@@ -74,14 +74,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
   }
 
-  override def getCurrentUserCredentials(): Credentials = {
-    UserGroupInformation.getCurrentUser().getCredentials()
-  }
-
-  override def addCurrentUserCredentials(creds: Credentials) {
-    UserGroupInformation.getCurrentUser().addCredentials(creds)
-  }
-
   override def addSecretKeyToUserCredentials(key: String, secret: String) {
     val creds = new Credentials()
     creds.addSecretKey(new Text(key), secret.getBytes(UTF_8))
