[SPARK-16742] Mesos Kerberos Support #18519
Changes from 17 commits
```diff
@@ -24,7 +24,11 @@ import javax.annotation.concurrent.GuardedBy
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Future
 
+import org.apache.hadoop.security.UserGroupInformation
+
 import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
```
```diff
@@ -41,9 +45,11 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils}
  * (spark.deploy.*).
  */
 private[spark]
-class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
-  extends ExecutorAllocationClient with SchedulerBackend with Logging
-{
+class CoarseGrainedSchedulerBackend(
+    scheduler: TaskSchedulerImpl,
+    val rpcEnv: RpcEnv)
+  extends ExecutorAllocationClient with SchedulerBackend with Logging {
+
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   protected val totalCoreCount = new AtomicInteger(0)
   // Total number of executors that are currently registered
```
```diff
@@ -95,6 +101,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // The num of current max ExecutorId used to re-register appMaster
   @volatile protected var currentExecutorIdCounter = 0
 
+  // hadoop token manager used by some sub-classes (e.g. Mesos)
+  protected val hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None
+
+  // Hadoop delegation tokens to be sent to the executors.
+  protected val hadoopDelegationCreds: Option[Array[Byte]] = None
+
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
```
```diff
@@ -223,8 +235,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       context.reply(true)
 
     case RetrieveSparkAppConfig =>
-      val reply = SparkAppConfig(sparkProperties,
-        SparkEnv.get.securityManager.getIOEncryptionKey())
+      val reply = SparkAppConfig(
+        sparkProperties,
+        SparkEnv.get.securityManager.getIOEncryptionKey(),
+        hadoopDelegationCreds)
       context.reply(reply)
   }
```
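With this hunk, `SparkAppConfig` carries the serialized delegation tokens to executors along with the Spark properties and the IO encryption key. The executor-side consumption is not shown in this excerpt; the sketch below is only an illustration of how such bytes can be restored with Hadoop's standard `Credentials` API (the helper name `addSerializedCreds` is made up for the example, not the PR's code):

```scala
import java.io.{ByteArrayInputStream, DataInputStream}

import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// Illustrative helper (not from the PR): restore serialized token bytes and
// attach them to the current user, so Hadoop clients on the executor side can
// authenticate with the delegation tokens obtained by the driver.
def addSerializedCreds(tokenBytes: Array[Byte]): Unit = {
  val creds = new Credentials()
  // readTokenStorageStream is the standard counterpart of
  // Credentials#writeTokenStorageStream, which byte-level serializers of
  // Credentials typically wrap.
  creds.readTokenStorageStream(new DataInputStream(new ByteArrayInputStream(tokenBytes)))
  UserGroupInformation.getCurrentUser.addCredentials(creds)
}
```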
```diff
@@ -675,6 +689,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     driverEndpoint.send(KillExecutorsOnHost(host))
     true
   }
 
+  protected def getHadoopDelegationCreds(): Option[Array[Byte]] = {
+    if (UserGroupInformation.isSecurityEnabled && hadoopDelegationTokenManager.isDefined) {
+      hadoopDelegationTokenManager.map { manager =>
+        val creds = UserGroupInformation.getCurrentUser.getCredentials
+        val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+        manager.obtainDelegationTokens(hadoopConf, creds)
+        SparkHadoopUtil.get.serialize(creds)
+      }
+    } else {
+      None
+    }
+  }
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {
```
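Taken together, these hunks define an opt-in pattern: `getHadoopDelegationCreds()` returns `None` unless Kerberos security is enabled and a subclass supplies a token manager. A sketch of the extension point, mirroring what the Mesos backend later in this diff does (`MyBackend` is a hypothetical name used only for illustration):

```scala
// Hypothetical subclass showing the opt-in pattern introduced above; the real
// example is MesosCoarseGrainedSchedulerBackend further down in this PR.
class MyBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

  // Supplying a manager is what makes getHadoopDelegationCreds() return tokens.
  override val hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
    Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))

  // Tokens are obtained once at construction and shipped via SparkAppConfig.
  override val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds()
}
```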
```diff
@@ -74,6 +74,17 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-exec</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive -->
     <dependency>
       <groupId>com.google.guava</groupId>
```

Review thread on the added `hive-exec` dependency:

Contributor
Is this really needed? I don't see you adding specific tests for this, so I wonder why you need the explicit dependency when other modules that depend on

Author
Yes,

Contributor
Hmm, ok... the credential manager code should be safe when Hive classes aren't present, but if there's a problem in that area it's not your fault.
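The review point that "the credential manager code should be safe when Hive classes aren't present" typically comes down to a runtime classpath probe, since the dependencies above are `provided`-scope and are only guaranteed at compile time. A sketch of such a guard (the method name and probed class are illustrative, not the PR's code):

```scala
// Illustrative guard, not from this PR: skip the Hive token provider when
// Hive isn't on the runtime classpath (provided-scope deps may be absent).
def hiveClassesArePresent: Boolean =
  try {
    Class.forName("org.apache.hadoop.hive.conf.HiveConf")
    true
  } catch {
    case _: ClassNotFoundException => false
  }
```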
```diff
@@ -21,15 +21,15 @@ import java.io.File
 import java.util.{Collections, List => JList}
 import java.util.concurrent.locks.ReentrantLock
 
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-import org.apache.mesos.SchedulerDriver
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.Future
 
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+import org.apache.mesos.SchedulerDriver
+
 import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
 import org.apache.spark.deploy.mesos.config._
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
```
```diff
@@ -53,9 +53,16 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     sc: SparkContext,
     master: String,
     securityManager: SecurityManager)
-  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
-    with org.apache.mesos.Scheduler
-    with MesosSchedulerUtils {
+  extends CoarseGrainedSchedulerBackend(
+    scheduler,
+    sc.env.rpcEnv)
+    with org.apache.mesos.Scheduler
+    with MesosSchedulerUtils {
+
+  override val hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
+    Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
+
+  override val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds()
 
   // Blacklist a slave after this many failures
   private val MAX_SLAVE_FAILURES = 2
```
Review thread:

Reviewer
Do you want this to be printed out every time someone runs spark-submit? Sounds a bit noisy.

Author
It only prints when UserGroupInformation.isSecurityEnabled, and I think it's useful information whenever a job is run.
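For concreteness, the gating the author describes looks like the sketch below; the log statement itself isn't visible in this excerpt, so the message text is illustrative:

```scala
// Only log in secure (Kerberos) mode, so non-secure spark-submit runs stay quiet.
if (UserGroupInformation.isSecurityEnabled) {
  logInfo("Security is enabled; obtaining Hadoop delegation tokens.")
}
```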