[SPARK-23257][K8S] Kerberos Support for Spark on K8S #21669
Changes from 3 commits
File: SparkSubmit.scala
@@ -336,7 +336,7 @@ private[spark] class SparkSubmit extends Logging {
    val targetDir = Utils.createTempDir()

    // assure a keytab is available from any place in a JVM
-   if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient) {
+   if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient || isKubernetesCluster) {
      if (args.principal != null) {
        if (args.keytab != null) {
          require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
@@ -644,7 +644,8 @@ private[spark] class SparkSubmit extends Logging {
        }
      }

-   if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
+   if ((clusterManager == MESOS || clusterManager == KUBERNETES)
+       && UserGroupInformation.isSecurityEnabled) {
      setRMPrincipal(sparkConf)
    }
@@ -755,7 +756,7 @@ private[spark] class SparkSubmit extends Logging {
  // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
  // renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos mode, we
- // must trick it into thinking we're YARN.
+ // must trick it into thinking we're YARN. The same applies to Kubernetes.
  private def setRMPrincipal(sparkConf: SparkConf): Unit = {
    val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
    val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
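The hunk cuts off before the body of setRMPrincipal completes. As a rough sketch of what the rest of the trick amounts to, assuming the method simply publishes the current short user name under the YARN ResourceManager principal key (the wrapper object below is invented for illustration; in Spark the method is private to SparkSubmit):

```scala
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration

import org.apache.spark.SparkConf

object RMPrincipalSketch {
  // Fool Hadoop's delegation-token fetching code into accepting the current
  // user as the "ResourceManager" renewer, since no real YARN RM exists in
  // Mesos or Kubernetes mode.
  def setRMPrincipal(sparkConf: SparkConf): Unit = {
    val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
    val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
    // e.g. spark.hadoop.yarn.resourcemanager.principal=alice
    sparkConf.set(key, shortUserName)
  }
}
```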
File: Config.scala
@@ -211,6 +211,51 @@ private[spark] object Config extends Logging {
      "Ensure that major Python version is either Python2 or Python3")
    .createWithDefault("2")

  val KUBERNETES_KERBEROS_SUPPORT =
    ConfigBuilder("spark.kubernetes.kerberos.enabled")
      .doc("Specify whether your job requires a delegation token to access secure HDFS")
      .booleanConf
      .createWithDefault(false)

  val KUBERNETES_KERBEROS_KEYTAB =
    ConfigBuilder("spark.kubernetes.kerberos.keytab")
      .doc("Specify the location of the keytab " +
        "for Kerberos in order to access secure HDFS")
      .stringConf
      .createOptional

  val KUBERNETES_KERBEROS_PRINCIPAL =
    ConfigBuilder("spark.kubernetes.kerberos.principal")
      .doc("Specify the principal " +
        "for Kerberos in order to access secure HDFS")
      .stringConf
      .createOptional

  val KUBERNETES_KERBEROS_RENEWER_PRINCIPAL =
    ConfigBuilder("spark.kubernetes.kerberos.renewer.principal")
      .doc("Specify the principal " +
        "you wish to renew and retrieve your Kerberos values with")
      .stringConf
      .createOptional

  val KUBERNETES_KERBEROS_DT_SECRET_NAME =
    ConfigBuilder("spark.kubernetes.kerberos.tokensecret.name")
      .doc("Specify the name of the secret where " +
        "your existing delegation token is stored. This removes the need " +
        "for the job user to provide any keytab for launching a job")
      .stringConf
      .createOptional

  val KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY =
    ConfigBuilder("spark.kubernetes.kerberos.tokensecret.itemkey")
      .doc("Specify the item key of the data where " +
        "your existing delegation token is stored. This removes the need " +
        "for the job user to provide any keytab for launching a job")
      .stringConf
      .createOptional

  val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
    "spark.kubernetes.authenticate.submission"
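To make the intent of these new options concrete, here is a hedged usage sketch; every value below (principal, keytab path, secret and item-key names) is invented for the example, and a job would normally pick either the keytab/principal pair or the pre-created token secret, not both:

```scala
import org.apache.spark.SparkConf

// Option 1: let Spark log in from a keytab and obtain delegation tokens itself.
val keytabBasedConf = new SparkConf()
  .set("spark.kubernetes.kerberos.enabled", "true")
  .set("spark.kubernetes.kerberos.keytab", "/mnt/secrets/spark.keytab")
  .set("spark.kubernetes.kerberos.principal", "spark-job@EXAMPLE.COM")

// Option 2: point Spark at a delegation token that already lives in a secret,
// so no keytab is ever passed to spark-submit.
val tokenSecretBasedConf = new SparkConf()
  .set("spark.kubernetes.kerberos.enabled", "true")
  .set("spark.kubernetes.kerberos.tokensecret.name", "hdfs-delegation-tokens")
  .set("spark.kubernetes.kerberos.tokensecret.itemkey", "hadoop-token-job-1")
```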
File: Constants.scala
@@ -65,11 +65,13 @@ private[spark] object Constants {
  val ENV_CLASSPATH = "SPARK_CLASSPATH"
  val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
  val ENV_SPARK_CONF_DIR = "SPARK_CONF_DIR"
  val ENV_SPARK_USER = "SPARK_USER"
Contributor
I guess this is for setting the correct user. But I think the Hadoop libs should pick the correct user, as in SparkContext where …
  // Spark app configs for containers
  val SPARK_CONF_VOLUME = "spark-conf-volume"
  val SPARK_CONF_DIR_INTERNAL = "/opt/spark/conf"
  val SPARK_CONF_FILE_NAME = "spark.properties"
  val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME"
  val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"

  // BINDINGS
  val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
@@ -81,4 +83,35 @@ private[spark] object Constants {
  val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
  val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
  val MEMORY_OVERHEAD_MIN_MIB = 384L

  // Hadoop Configuration
  val HADOOP_FILE_VOLUME = "hadoop-properties"
  val HADOOP_CONF_DIR_PATH = "/etc/hadoop/conf"
  val ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR"
  val HADOOP_CONF_DIR_LOC = "spark.kubernetes.hadoop.conf.dir"
  val HADOOP_CONFIG_MAP_SPARK_CONF_NAME =
    "spark.kubernetes.hadoop.executor.hadoopConfigMapName"

  // Kerberos Configuration
  val KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME =
    "spark.kubernetes.kerberos.delegation-token-secret-name"
  val KERBEROS_KEYTAB_SECRET_NAME =
    "spark.kubernetes.kerberos.key-tab-secret-name"
  val KERBEROS_KEYTAB_SECRET_KEY =
    "spark.kubernetes.kerberos.key-tab-secret-key"
  val KERBEROS_SPARK_USER_NAME =
    "spark.kubernetes.kerberos.spark-user-name"
  val KERBEROS_SECRET_LABEL_PREFIX =
    "hadoop-tokens"
  val SPARK_HADOOP_PREFIX = "spark.hadoop."
  val HADOOP_SECURITY_AUTHENTICATION =
    SPARK_HADOOP_PREFIX + "hadoop.security.authentication"

  // Kerberos Token-Refresh Server
  val KERBEROS_REFRESH_LABEL_KEY = "refresh-hadoop-tokens"
  val KERBEROS_REFRESH_LABEL_VALUE = "yes"

  // Hadoop credentials secrets for the Spark app.
  val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
  val SPARK_APP_HADOOP_SECRET_VOLUME_NAME = "hadoop-secret"
}
File: KubernetesConf.scala
@@ -23,6 +23,8 @@ import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferen
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features.hadoopsteps.HadoopStepsOrchestrator
import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config.ConfigEntry
@@ -59,7 +61,20 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
    roleSecretNamesToMountPaths: Map[String, String],
    roleSecretEnvNamesToKeyRefs: Map[String, String],
    roleEnvs: Map[String, String],
-   sparkFiles: Seq[String]) {
+   sparkFiles: Seq[String],
+   hadoopConfDir: Option[String]) {

  def getHadoopConfigMapName: String = s"$appResourceNamePrefix-hadoop-config"

  def getHadoopStepsOrchestrator: Option[HadoopStepsOrchestrator] = hadoopConfDir.map {
    hConf => new HadoopStepsOrchestrator(
      sparkConf,
      appResourceNamePrefix,
      hConf,
      getHadoopConfigMapName)}

  def getTokenManager: KubernetesHadoopDelegationTokenManager =
    new KubernetesHadoopDelegationTokenManager

  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
@@ -111,7 +126,8 @@ private[spark] object KubernetesConf {
      mainAppResource: Option[MainAppResource],
      mainClass: String,
      appArgs: Array[String],
-     maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
+     maybePyFiles: Option[String],
+     hadoopConfDir: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
    val sparkConfWithMainAppJar = sparkConf.clone()
    val additionalFiles = mutable.ArrayBuffer.empty[String]
    mainAppResource.foreach {
@@ -171,7 +187,8 @@ private[spark] object KubernetesConf {
      driverSecretNamesToMountPaths,
      driverSecretEnvNamesToKeyRefs,
      driverEnvs,
-     sparkFiles)
+     sparkFiles,
+     hadoopConfDir)
  }

  def createExecutorConf(
@@ -214,6 +231,7 @@ private[spark] object KubernetesConf {
      executorMountSecrets,
      executorEnvSecrets,
      executorEnv,
-     Seq.empty[String])
+     Seq.empty[String],
+     None)
  }
}
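A hedged sketch of how a consumer of the two new helpers might look; the helper function below is hypothetical and only illustrates that the Hadoop bootstrap steps are built when, and only when, a Hadoop configuration directory was supplied:

```scala
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf}

object HadoopStepsSketch {
  // Hypothetical consumer: an orchestrator exists (and yields Hadoop/Kerberos
  // bootstrap steps) only when hadoopConfDir, i.e. HADOOP_CONF_DIR, was provided.
  def hadoopStepsFor(conf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) =
    conf.getHadoopStepsOrchestrator match {
      case Some(orchestrator) => orchestrator.getHadoopSteps(conf.getTokenManager)
      case None => Seq.empty
    }
}
```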
File: HadoopConfExecutorFeatureStep.scala
@@ -0,0 +1,52 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s.features
Contributor
Why is this not under the …
import io.fabric8.kubernetes.api.model.HasMetadata

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features.hadoopsteps.HadoopBootstrapUtil
import org.apache.spark.internal.Logging

/**
 * This step is responsible for bootstrapping the container with ConfigMaps
 * containing Hadoop config files mounted as volumes and an ENV variable
 * pointing to the mounted file directory. This is run by both the driver
 * and executor, as they both require Hadoop config files.
 */
private[spark] class HadoopConfExecutorFeatureStep(
    kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
  extends KubernetesFeatureConfigStep with Logging {

  override def configurePod(pod: SparkPod): SparkPod = {
    val maybeHadoopConfDir = kubernetesConf.sparkConf.getOption(HADOOP_CONF_DIR_LOC)
    val maybeHadoopConfigMap = kubernetesConf.sparkConf.getOption(HADOOP_CONFIG_MAP_SPARK_CONF_NAME)
    require(maybeHadoopConfDir.isDefined && maybeHadoopConfigMap.isDefined,
      "Ensure that HADOOP_CONF_DIR is defined")
    logInfo("HADOOP_CONF_DIR defined. Mounting Hadoop specific files")
    HadoopBootstrapUtil.bootstrapHadoopConfDir(
      maybeHadoopConfDir.get,
      maybeHadoopConfigMap.get,
      kubernetesConf.getTokenManager,
      pod)
  }

  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty

Contributor
No need to address it here but it feels like these methods should have default implementations, given that lots of classes just don't do anything with them.

  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}
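HadoopBootstrapUtil.bootstrapHadoopConfDir itself is not shown in this diff. As a hedged sketch of what such a helper plausibly does with the constants introduced above (the simplified signature, volume wiring, and env handling are assumptions, not the PR's exact code), it mounts the Hadoop ConfigMap at HADOOP_CONF_DIR_PATH and exports HADOOP_CONF_DIR:

```scala
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

import org.apache.spark.deploy.k8s.SparkPod
import org.apache.spark.deploy.k8s.Constants._

// Sketch only: mount the ConfigMap holding the submitter's Hadoop config files
// into the pod and point HADOOP_CONF_DIR at the mount path.
object HadoopConfDirBootstrapSketch {
  def bootstrapHadoopConfDir(configMapName: String, pod: SparkPod): SparkPod = {
    val bootstrappedPod = new PodBuilder(pod.pod)
      .editOrNewSpec()
        .addNewVolume()
          .withName(HADOOP_FILE_VOLUME)
          .withNewConfigMap()
            .withName(configMapName)
            .endConfigMap()
          .endVolume()
        .endSpec()
      .build()
    val bootstrappedContainer = new ContainerBuilder(pod.container)
      .addNewVolumeMount()
        .withName(HADOOP_FILE_VOLUME)
        .withMountPath(HADOOP_CONF_DIR_PATH)
        .endVolumeMount()
      .addNewEnv()
        .withName(ENV_HADOOP_CONF_DIR)
        .withValue(HADOOP_CONF_DIR_PATH)
        .endEnv()
      .build()
    SparkPod(bootstrappedPod, bootstrappedContainer)
  }
}
```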
File: HadoopGlobalFeatureDriverStep.scala
@@ -0,0 +1,123 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s.features
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, ContainerBuilder, HasMetadata, PodBuilder}

import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesDriverSpecificConf
import org.apache.spark.deploy.k8s.features.hadoopsteps.{HadoopBootstrapUtil, HadoopConfigSpec, HadoopConfigurationStep}
import org.apache.spark.internal.Logging

/**
 * This is the main feature step that runs the hadoopConfigurationSteps defined
 * by the HadoopStepsOrchestrator. These steps are run to modify the
 * SparkPod and Kubernetes resources using the additive method of the feature steps.
 */
private[spark] class HadoopGlobalFeatureDriverStep(
    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
  extends KubernetesFeatureConfigStep with Logging {

  private val hadoopTestOrchestrator =
    kubernetesConf.getHadoopStepsOrchestrator
  require(kubernetesConf.hadoopConfDir.isDefined &&
    hadoopTestOrchestrator.isDefined, "Ensure that HADOOP_CONF_DIR is defined")

  private val hadoopSteps =
    hadoopTestOrchestrator
      .map(hto => hto.getHadoopSteps(kubernetesConf.getTokenManager))
      .getOrElse(Seq.empty[HadoopConfigurationStep])

  var currentHadoopSpec = HadoopConfigSpec(
    podVolumes = Seq.empty,
    containerEnvs = Seq.empty,
    containerVMs = Seq.empty,
    configMapProperties = Map.empty[String, String],
    dtSecret = None,
    dtSecretName = KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME,
    dtSecretItemKey = None,
    jobUserName = None)

  for (nextStep <- hadoopSteps) {
    currentHadoopSpec = nextStep.configureHadoopSpec(currentHadoopSpec)
  }

  override def configurePod(pod: SparkPod): SparkPod = {
    val hadoopBasedPod = new PodBuilder(pod.pod)
      .editSpec()
        .addAllToVolumes(currentHadoopSpec.podVolumes.asJava)
        .endSpec()
      .build()

    val hadoopBasedContainer = new ContainerBuilder(pod.container)
      .addAllToEnv(currentHadoopSpec.containerEnvs.asJava)
      .addAllToVolumeMounts(currentHadoopSpec.containerVMs.asJava)
      .build()

    val hadoopBasedSparkPod = HadoopBootstrapUtil.bootstrapHadoopConfDir(
      kubernetesConf.hadoopConfDir.get,
      kubernetesConf.getHadoopConfigMapName,
      kubernetesConf.getTokenManager,
      SparkPod(hadoopBasedPod, hadoopBasedContainer))

    val maybeKerberosModification =
      for {
        secretItemKey <- currentHadoopSpec.dtSecretItemKey
        userName <- currentHadoopSpec.jobUserName
      } yield {
        HadoopBootstrapUtil.bootstrapKerberosPod(
          currentHadoopSpec.dtSecretName,
          secretItemKey,
          userName,
          hadoopBasedSparkPod)
      }
    maybeKerberosModification.getOrElse(
      HadoopBootstrapUtil.bootstrapSparkUserPod(
        kubernetesConf.getTokenManager.getCurrentUser.getShortUserName,
        hadoopBasedSparkPod))
  }

  override def getAdditionalPodSystemProperties(): Map[String, String] = {
    val maybeKerberosConfValues =
      for {
        secretItemKey <- currentHadoopSpec.dtSecretItemKey
        userName <- currentHadoopSpec.jobUserName
      } yield {
        Map(KERBEROS_KEYTAB_SECRET_NAME -> currentHadoopSpec.dtSecretName,
          KERBEROS_KEYTAB_SECRET_KEY -> secretItemKey,
          KERBEROS_SPARK_USER_NAME -> userName)
      }
    val resolvedConfValues = maybeKerberosConfValues.getOrElse(
      Map(KERBEROS_SPARK_USER_NAME ->
        kubernetesConf.getTokenManager.getCurrentUser.getShortUserName)
    )
    Map(HADOOP_CONFIG_MAP_SPARK_CONF_NAME -> kubernetesConf.getHadoopConfigMapName,
      HADOOP_CONF_DIR_LOC -> kubernetesConf.hadoopConfDir.get) ++ resolvedConfValues
  }

  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
    val configMap =
      new ConfigMapBuilder()
        .withNewMetadata()
          .withName(kubernetesConf.getHadoopConfigMapName)
          .endMetadata()
        .addToData(currentHadoopSpec.configMapProperties.asJava)
        .build()
    Seq(configMap) ++ currentHadoopSpec.dtSecret.toSeq
  }
}
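Similarly, bootstrapKerberosPod and bootstrapSparkUserPod are outside this diff. A hedged guess at their effect, based only on the constants and call sites above (volume naming and the exact token path are assumptions): mount the delegation-token secret, point HADOOP_TOKEN_FILE_LOCATION at the mounted item, and stamp SPARK_USER; or, when no token secret exists, only propagate the user name:

```scala
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

import org.apache.spark.deploy.k8s.SparkPod
import org.apache.spark.deploy.k8s.Constants._

object KerberosPodBootstrapSketch {
  // Mount the secret holding the delegation token and tell Hadoop (and Spark)
  // where to find it and which user to run as.
  def bootstrapKerberosPod(
      dtSecretName: String,
      dtSecretItemKey: String,
      userName: String,
      pod: SparkPod): SparkPod = {
    val kerberizedPod = new PodBuilder(pod.pod)
      .editOrNewSpec()
        .addNewVolume()
          .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
          .withNewSecret()
            .withSecretName(dtSecretName)
            .endSecret()
          .endVolume()
        .endSpec()
      .build()
    val kerberizedContainer = new ContainerBuilder(pod.container)
      .addNewVolumeMount()
        .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
        .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
        .endVolumeMount()
      .addNewEnv()
        .withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
        .withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$dtSecretItemKey")
        .endEnv()
      .addNewEnv()
        .withName(ENV_SPARK_USER)
        .withValue(userName)
        .endEnv()
      .build()
    SparkPod(kerberizedPod, kerberizedContainer)
  }

  // Fallback when no delegation token is provided: only propagate the user.
  def bootstrapSparkUserPod(userName: String, pod: SparkPod): SparkPod =
    SparkPod(
      pod.pod,
      new ContainerBuilder(pod.container)
        .addNewEnv().withName(ENV_SPARK_USER).withValue(userName).endEnv()
        .build())
}
```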
This check has been restrictive for customers in the past. There are cases where spark-submit should not have the keytab file locally; for example, the keytab may need to be mounted as a secret within the cluster.
This check can be removed, but I included it since I believed that the keytab shouldn't be stored as a secret for security reasons and should instead be only accessible from the JVM.
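For readers who want to see what the secret-based alternative in this discussion could look like, a hedged sketch using the fabric8 client that this module already depends on; the secret name, item key, namespace, and token file path are all invented, and the PR does not prescribe how such a secret is created:

```scala
import java.nio.file.{Files, Paths}
import java.util.Base64

import io.fabric8.kubernetes.api.model.SecretBuilder
import io.fabric8.kubernetes.client.DefaultKubernetesClient

// Sketch: pre-create a secret holding serialized delegation tokens so that
// spark.kubernetes.kerberos.tokensecret.name / .itemkey can point at it and
// no keytab ever has to be handed to spark-submit.
object TokenSecretSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical path to previously serialized Hadoop credentials.
    val tokenBytes = Files.readAllBytes(Paths.get("/tmp/hadoop-tokens.bin"))
    val secret = new SecretBuilder()
      .withNewMetadata()
        .withName("hdfs-delegation-tokens")
        .addToLabels("refresh-hadoop-tokens", "yes") // KERBEROS_REFRESH_LABEL_KEY/VALUE
        .endMetadata()
      .addToData("hadoop-token-job-1", Base64.getEncoder.encodeToString(tokenBytes))
      .build()
    val client = new DefaultKubernetesClient()
    try {
      client.secrets().inNamespace("default").createOrReplace(secret)
    } finally {
      client.close()
    }
  }
}
```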