diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 3cfafeb951105..96e4b53b24181 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -348,15 +348,23 @@ private[spark] class SecurityManager( */ def initializeAuth(): Unit = { import SparkMasterRegex._ + val k8sRegex = "k8s.*".r if (!sparkConf.get(NETWORK_AUTH_ENABLED)) { return } + // TODO: this really should be abstracted somewhere else. val master = sparkConf.get(SparkLauncher.SPARK_MASTER, "") - master match { + val storeInUgi = master match { case "yarn" | "local" | LOCAL_N_REGEX(_) | LOCAL_N_FAILURES_REGEX(_, _) => - // Secret generation allowed here + true + + case k8sRegex() => + // Don't propagate the secret through the user's credentials in kubernetes. That conflicts + // with the way k8s handles propagation of delegation tokens. + false + case _ => require(sparkConf.contains(SPARK_AUTH_SECRET_CONF), s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config.") @@ -364,9 +372,12 @@ private[spark] class SecurityManager( } secretKey = Utils.createSecret(sparkConf) - val creds = new Credentials() - creds.addSecretKey(SECRET_LOOKUP_KEY, secretKey.getBytes(UTF_8)) - UserGroupInformation.getCurrentUser().addCredentials(creds) + + if (storeInUgi) { + val creds = new Credentials() + creds.addSecretKey(SECRET_LOOKUP_KEY, secretKey.getBytes(UTF_8)) + UserGroupInformation.getCurrentUser().addCredentials(creds) + } } // Default SecurityManager only has a single secret key, so ignore appId. diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala index e357299770a2e..eec8004fc94f4 100644 --- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala @@ -395,15 +395,23 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties { assert(keyFromEnv === new SecurityManager(conf2).getSecretKey()) } - test("secret key generation") { - Seq( - ("yarn", true), - ("local", true), - ("local[*]", true), - ("local[1, 2]", true), - ("local-cluster[2, 1, 1024]", false), - ("invalid", false) - ).foreach { case (master, shouldGenerateSecret) => + // How is the secret expected to be generated and stored. 
+ object SecretTestType extends Enumeration { + val MANUAL, AUTO, UGI = Value + } + + import SecretTestType._ + + Seq( + ("yarn", UGI), + ("local", UGI), + ("local[*]", UGI), + ("local[1, 2]", UGI), + ("k8s://127.0.0.1", AUTO), + ("local-cluster[2, 1, 1024]", MANUAL), + ("invalid", MANUAL) + ).foreach { case (master, secretType) => + test(s"secret key generation: master '$master'") { val conf = new SparkConf() .set(NETWORK_AUTH_ENABLED, true) .set(SparkLauncher.SPARK_MASTER, master) @@ -412,19 +420,26 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties { UserGroupInformation.createUserForTesting("authTest", Array()).doAs( new PrivilegedExceptionAction[Unit]() { override def run(): Unit = { - if (shouldGenerateSecret) { - mgr.initializeAuth() - val creds = UserGroupInformation.getCurrentUser().getCredentials() - val secret = creds.getSecretKey(SecurityManager.SECRET_LOOKUP_KEY) - assert(secret != null) - assert(new String(secret, UTF_8) === mgr.getSecretKey()) - } else { - intercept[IllegalArgumentException] { + secretType match { + case UGI => + mgr.initializeAuth() + val creds = UserGroupInformation.getCurrentUser().getCredentials() + val secret = creds.getSecretKey(SecurityManager.SECRET_LOOKUP_KEY) + assert(secret != null) + assert(new String(secret, UTF_8) === mgr.getSecretKey()) + + case AUTO => mgr.initializeAuth() - } - intercept[IllegalArgumentException] { - mgr.getSecretKey() - } + val creds = UserGroupInformation.getCurrentUser().getCredentials() + assert(creds.getSecretKey(SecurityManager.SECRET_LOOKUP_KEY) === null) + + case MANUAL => + intercept[IllegalArgumentException] { + mgr.initializeAuth() + } + intercept[IllegalArgumentException] { + mgr.getSecretKey() + } } } } diff --git a/docs/security.md b/docs/security.md index be4834660fb7a..2a4f3c074c1e5 100644 --- a/docs/security.md +++ b/docs/security.md @@ -26,21 +26,29 @@ not documented, Spark does not support. Spark currently supports authentication for RPC channels using a shared secret. Authentication can be turned on by setting the `spark.authenticate` configuration parameter. -The exact mechanism used to generate and distribute the shared secret is deployment-specific. +The exact mechanism used to generate and distribute the shared secret is deployment-specific. Unless +specified below, the secret must be defined by setting the `spark.authenticate.secret` config +option. The same secret is shared by all Spark applications and daemons in that case, which limits +the security of these deployments, especially on multi-tenant clusters. -For Spark on [YARN](running-on-yarn.html) and local deployments, Spark will automatically handle -generating and distributing the shared secret. Each application will use a unique shared secret. In +The REST Submission Server and the MesosClusterDispatcher do not support authentication. You should +ensure that all network access to the REST API & MesosClusterDispatcher (ports 6066 and 7077, +respectively, by default) is restricted to hosts that are trusted to submit jobs. + +### YARN + +For Spark on [YARN](running-on-yarn.html), Spark will automatically handle generating and +distributing the shared secret. Each application will use a unique shared secret. In the case of YARN, this feature relies on YARN RPC encryption being enabled for the distribution of secrets to be secure. -For other resource managers, `spark.authenticate.secret` must be configured on each of the nodes.
-This secret will be shared by all the daemons and applications, so this deployment configuration is -not as secure as the above, especially when considering multi-tenant clusters. In this -configuration, a user with the secret can effectively impersonate any other user. +### Kubernetes -The Rest Submission Server and the MesosClusterDispatcher do not support authentication. You should -ensure that all network access to the REST API & MesosClusterDispatcher (port 6066 and 7077 -respectively by default) are restricted to hosts that are trusted to submit jobs. +On Kubernetes, Spark will also automatically generate an authentication secret unique to each +application. The secret is propagated to executor pods using environment variables. This means +that any user who can list pods in the namespace where the Spark application is running can +also see that application's authentication secret. Access control rules should be properly set up +by the Kubernetes admin to ensure that Spark authentication is secure. @@ -738,10 +746,10 @@ tokens for supported will be created. ## Secure Interaction with Kubernetes When talking to Hadoop-based services behind Kerberos, it was noted that Spark needs to obtain delegation tokens -so that non-local processes can authenticate. These delegation tokens in Kubernetes are stored in Secrets that are -shared by the Driver and its Executors. As such, there are three ways of submitting a Kerberos job: +so that non-local processes can authenticate. These delegation tokens in Kubernetes are stored in Secrets that are +shared by the Driver and its Executors. As such, there are three ways of submitting a Kerberos job: -In all cases you must define the environment variable: `HADOOP_CONF_DIR` or +In all cases you must define the environment variable: `HADOOP_CONF_DIR` or `spark.kubernetes.hadoop.configMapName.` It is also important to note that the KDC needs to be visible from inside the containers.
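The `SecurityManager.initializeAuth()` change above boils down to a three-way policy keyed off the master URL: YARN and local masters keep storing the generated secret in the UGI credentials, `k8s://` masters generate a secret but deliberately skip the UGI (it would collide with how Kubernetes propagates delegation tokens), and everything else still requires `spark.authenticate.secret`. A minimal standalone sketch of that dispatch — the `local[...]` patterns approximate Spark's internal `SparkMasterRegex` and are assumptions, not the exact internals:

```scala
object SecretStoragePolicy {
  private val k8sRegex = "k8s.*".r
  private val localNRegex = """local\[([0-9]+|\*)\]""".r
  private val localNFailuresRegex = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

  sealed trait Policy
  case object StoreInUgi extends Policy    // generate a secret and stash it in UGI credentials
  case object GenerateOnly extends Policy  // generate a secret; propagate it out of band (k8s)
  case object RequireManual extends Policy // user must set spark.authenticate.secret

  def forMaster(master: String): Policy = master match {
    case "yarn" | "local" | localNRegex(_) | localNFailuresRegex(_, _) => StoreInUgi
    case k8sRegex() => GenerateOnly
    case _ => RequireManual
  }
}
```

For example, `SecretStoragePolicy.forMaster("k8s://127.0.0.1")` yields `GenerateOnly`, which is exactly the branch the new `k8sRegex()` case selects.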
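The `SecurityManagerSuite` rewrite also changes how the cases are declared: the `Seq(...).foreach` now runs at class-construction time and registers one named test per master URL, so a failure report points at the specific master instead of a single monolithic "secret key generation" test. A stripped-down illustration of the pattern using a current ScalaTest `AnyFunSuite` — `classify` is a hypothetical stand-in for the `SecurityManager` behavior under test, not the real suite:

```scala
import org.scalatest.funsuite.AnyFunSuite

class SecretPolicySuite extends AnyFunSuite {
  object SecretTestType extends Enumeration {
    val MANUAL, AUTO, UGI = Value
  }
  import SecretTestType._

  // Hypothetical stand-in for the behavior being verified.
  private def classify(master: String): SecretTestType.Value = master match {
    case "yarn" | "local" => UGI
    case m if m.startsWith("k8s") => AUTO
    case _ => MANUAL
  }

  Seq(
    ("yarn", UGI),
    ("k8s://127.0.0.1", AUTO),
    ("invalid", MANUAL)
  ).foreach { case (master, expected) =>
    // foreach runs during construction, so each tuple registers its own test case.
    test(s"secret key generation: master '$master'") {
      assert(classify(master) === expected)
    }
  }
}
```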
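The new `security.md` section states that on Kubernetes the secret reaches executors through environment variables, which is why it warns about pod-list access: an env var set in a pod spec is readable by anyone allowed to get or list pods. A hedged sketch of what that exposure looks like with the fabric8 client — the namespace and the `_SPARK_AUTH_SECRET` variable name are assumptions here; the real name is the `SecurityManager.ENV_AUTH_SECRET` constant used by the `BasicExecutorFeatureStep` change below:

```scala
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.client.DefaultKubernetesClient

object PodEnvPeek {
  def main(args: Array[String]): Unit = {
    val client = new DefaultKubernetesClient()
    try {
      // Anyone with get/list on pods can read env-var-propagated secrets off the spec.
      client.pods().inNamespace("spark-apps").list().getItems.asScala.foreach { pod =>
        pod.getSpec.getContainers.asScala.flatMap(_.getEnv.asScala)
          .filter(_.getName == "_SPARK_AUTH_SECRET") // assumed env var name
          .foreach(e => println(s"${pod.getMetadata.getName}: ${e.getValue}"))
      }
    } finally {
      client.close()
    }
  }
}
```

This is the trade-off the docs call out: env vars are simpler than mounting a Secret volume, but they make namespace-level RBAC part of the authentication story.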
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 8bf315248388f..939aa88b07973 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -20,7 +20,7 @@ import scala.collection.JavaConverters._ import io.fabric8.kubernetes.api.model._ -import org.apache.spark.SparkException +import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ @@ -29,11 +29,12 @@ import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils -private[spark] class BasicExecutorFeatureStep(kubernetesConf: KubernetesExecutorConf) +private[spark] class BasicExecutorFeatureStep( + kubernetesConf: KubernetesExecutorConf, + secMgr: SecurityManager) extends KubernetesFeatureConfigStep { // Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf - private val executorExtraClasspath = kubernetesConf.get(EXECUTOR_CLASS_PATH) private val executorContainerImage = kubernetesConf .get(EXECUTOR_CONTAINER_IMAGE) .getOrElse(throw new SparkException("Must specify the executor container image")) @@ -87,44 +88,61 @@ private[spark] class BasicExecutorFeatureStep(kubernetesConf: KubernetesExecutor val executorCpuQuantity = new QuantityBuilder(false) .withAmount(executorCoresRequest) .build() - val executorExtraClasspathEnv = executorExtraClasspath.map { cp => - new EnvVarBuilder() - .withName(ENV_CLASSPATH) - .withValue(cp) - .build() - } - val executorExtraJavaOptionsEnv = kubernetesConf - .get(EXECUTOR_JAVA_OPTIONS) - .map { opts => - val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId, - kubernetesConf.executorId) - val delimitedOpts = Utils.splitCommandString(subsOpts) - delimitedOpts.zipWithIndex.map { - case (opt, index) => - new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build() + + val executorEnv: Seq[EnvVar] = { + (Seq( + (ENV_DRIVER_URL, driverUrl), + (ENV_EXECUTOR_CORES, executorCores.toString), + (ENV_EXECUTOR_MEMORY, executorMemoryString), + (ENV_APPLICATION_ID, kubernetesConf.appId), + // This is to set the SPARK_CONF_DIR to be /opt/spark/conf + (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL), + (ENV_EXECUTOR_ID, kubernetesConf.executorId) + ) ++ kubernetesConf.environment).map { case (k, v) => + new EnvVarBuilder() + .withName(k) + .withValue(v) + .build() } - }.getOrElse(Seq.empty[EnvVar]) - val executorEnv = (Seq( - (ENV_DRIVER_URL, driverUrl), - (ENV_EXECUTOR_CORES, executorCores.toString), - (ENV_EXECUTOR_MEMORY, executorMemoryString), - (ENV_APPLICATION_ID, kubernetesConf.appId), - // This is to set the SPARK_CONF_DIR to be /opt/spark/conf - (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL), - (ENV_EXECUTOR_ID, kubernetesConf.executorId)) ++ - kubernetesConf.environment) - .map(env => new EnvVarBuilder() - .withName(env._1) - .withValue(env._2) - .build() - ) ++ Seq( - new EnvVarBuilder() - .withName(ENV_EXECUTOR_POD_IP) - .withValueFrom(new EnvVarSourceBuilder() - .withNewFieldRef("v1", "status.podIP") + } ++ { + Seq(new 
EnvVarBuilder() + .withName(ENV_EXECUTOR_POD_IP) + .withValueFrom(new EnvVarSourceBuilder() + .withNewFieldRef("v1", "status.podIP") + .build()) .build()) - .build() - ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq + } ++ { + Option(secMgr.getSecretKey()).map { authSecret => + new EnvVarBuilder() + .withName(SecurityManager.ENV_AUTH_SECRET) + .withValue(authSecret) + .build() + } + } ++ { + kubernetesConf.get(EXECUTOR_CLASS_PATH).map { cp => + new EnvVarBuilder() + .withName(ENV_CLASSPATH) + .withValue(cp) + .build() + } + } ++ { + val userOpts = kubernetesConf.get(EXECUTOR_JAVA_OPTIONS).toSeq.flatMap { opts => + val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId, + kubernetesConf.executorId) + Utils.splitCommandString(subsOpts) + } + + val sparkOpts = Utils.sparkJavaOpts(kubernetesConf.sparkConf, + SparkConf.isExecutorStartupConf) + + (userOpts ++ sparkOpts).zipWithIndex.map { case (opt, index) => + new EnvVarBuilder() + .withName(s"$ENV_JAVA_OPT_PREFIX$index") + .withValue(opt) + .build() + } + } + val requiredPorts = Seq( (BLOCK_MANAGER_PORT_NAME, blockManagerPort)) .map { case (name, port) => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index ef4cbdf162c6c..27377f12ab134 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.PodBuilder import io.fabric8.kubernetes.client.KubernetesClient import scala.collection.mutable -import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.KubernetesConf @@ -31,6 +31,7 @@ import org.apache.spark.util.{Clock, Utils} private[spark] class ExecutorPodsAllocator( conf: SparkConf, + secMgr: SecurityManager, executorBuilder: KubernetesExecutorBuilder, kubernetesClient: KubernetesClient, snapshotsStore: ExecutorPodsSnapshotsStore, @@ -133,7 +134,7 @@ private[spark] class ExecutorPodsAllocator( newExecutorId.toString, applicationId, driverPod) - val executorPod = executorBuilder.buildFromFeatures(executorConf) + val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr) val podWithAttachedContainer = new PodBuilder(executorPod.pod) .editOrNewSpec() .addToContainers(executorPod.container) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index ce10f766334ff..b31fbb420ed6d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -94,6 +94,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit val executorPodsAllocator = new ExecutorPodsAllocator( sc.conf, + sc.env.securityManager, KubernetesExecutorBuilder(kubernetesClient, sc.conf), 
kubernetesClient, snapshotsStore, @@ -110,7 +111,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit new KubernetesClusterSchedulerBackend( scheduler.asInstanceOf[TaskSchedulerImpl], - sc.env.rpcEnv, + sc, kubernetesClient, requestExecutorsService, snapshotsStore, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index 6356b58645806..68f6f2e46e316 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService import io.fabric8.kubernetes.client.KubernetesClient import scala.concurrent.{ExecutionContext, Future} +import org.apache.spark.SparkContext import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.rpc.{RpcAddress, RpcEnv} @@ -30,7 +31,7 @@ import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class KubernetesClusterSchedulerBackend( scheduler: TaskSchedulerImpl, - rpcEnv: RpcEnv, + sc: SparkContext, kubernetesClient: KubernetesClient, requestExecutorsService: ExecutorService, snapshotsStore: ExecutorPodsSnapshotsStore, @@ -38,7 +39,7 @@ private[spark] class KubernetesClusterSchedulerBackend( lifecycleEventHandler: ExecutorPodsLifecycleManager, watchEvents: ExecutorPodsWatchSnapshotSource, pollEvents: ExecutorPodsPollingSnapshotSource) - extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( requestExecutorsService) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index d24ff0d1e6600..ba273cad6a8e5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -20,14 +20,14 @@ import java.io.File import io.fabric8.kubernetes.client.KubernetesClient -import org.apache.spark.SparkConf +import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.features._ private[spark] class KubernetesExecutorBuilder( - provideBasicStep: (KubernetesExecutorConf => BasicExecutorFeatureStep) = - new BasicExecutorFeatureStep(_), + provideBasicStep: (KubernetesExecutorConf, SecurityManager) => BasicExecutorFeatureStep = + new BasicExecutorFeatureStep(_, _), provideSecretsStep: (KubernetesConf => MountSecretsFeatureStep) = new MountSecretsFeatureStep(_), provideEnvSecretsStep: (KubernetesConf => EnvSecretsFeatureStep) = @@ -44,13 +44,16 @@ private[spark] class KubernetesExecutorBuilder( new HadoopSparkUserExecutorFeatureStep(_), provideInitialPod: () => SparkPod = () => SparkPod.initialPod()) { - def buildFromFeatures(kubernetesConf: KubernetesExecutorConf): 
SparkPod = { + def buildFromFeatures( + kubernetesConf: KubernetesExecutorConf, + secMgr: SecurityManager): SparkPod = { val sparkConf = kubernetesConf.sparkConf val maybeHadoopConfigMap = sparkConf.getOption(HADOOP_CONFIG_MAP_NAME) val maybeDTSecretName = sparkConf.getOption(KERBEROS_DT_SECRET_NAME) val maybeDTDataItem = sparkConf.getOption(KERBEROS_DT_SECRET_KEY) - val baseFeatures = Seq(provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf)) + val baseFeatures = Seq(provideBasicStep(kubernetesConf, secMgr), + provideLocalDirsStep(kubernetesConf)) val secretFeature = if (kubernetesConf.secretNamesToMountPaths.nonEmpty) { Seq(provideSecretsStep(kubernetesConf)) } else Nil diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index d6003c977937c..6aa862643c788 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -21,13 +21,14 @@ import scala.collection.JavaConverters._ import io.fabric8.kubernetes.api.model._ import org.scalatest.BeforeAndAfter -import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.config._ import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.util.Utils class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { @@ -63,7 +64,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { private var baseConf: SparkConf = _ before { - baseConf = new SparkConf() + baseConf = new SparkConf(false) .set(KUBERNETES_DRIVER_POD_NAME, DRIVER_POD_NAME) .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, RESOURCE_NAME_PREFIX) .set(CONTAINER_IMAGE, EXECUTOR_IMAGE) @@ -84,7 +85,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { } test("basic executor pod has reasonable defaults") { - val step = new BasicExecutorFeatureStep(newExecutorConf()) + val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf)) val executor = step.configurePod(SparkPod.initialPod()) // The executor pod name and default labels. 
@@ -106,7 +107,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { assert(executor.pod.getSpec.getNodeSelector.isEmpty) assert(executor.pod.getSpec.getVolumes.isEmpty) - checkEnv(executor, Map()) + checkEnv(executor, baseConf, Map()) checkOwnerReferences(executor.pod, DRIVER_POD_UID) } @@ -114,7 +115,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { val longPodNamePrefix = "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple" baseConf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, longPodNamePrefix) - val step = new BasicExecutorFeatureStep(newExecutorConf()) + val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf)) assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63) } @@ -122,10 +123,10 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { baseConf.set(EXECUTOR_JAVA_OPTIONS, "foo=bar") baseConf.set(EXECUTOR_CLASS_PATH, "bar=baz") val kconf = newExecutorConf(environment = Map("qux" -> "quux")) - val step = new BasicExecutorFeatureStep(kconf) + val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(baseConf)) val executor = step.configurePod(SparkPod.initialPod()) - checkEnv(executor, + checkEnv(executor, baseConf, Map("SPARK_JAVA_OPT_0" -> "foo=bar", ENV_CLASSPATH -> "bar=baz", "qux" -> "quux")) @@ -136,12 +137,27 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { baseConf.set("spark.kubernetes.resource.type", "python") baseConf.set(PYSPARK_EXECUTOR_MEMORY, 42L) - val step = new BasicExecutorFeatureStep(newExecutorConf()) + val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf)) val executor = step.configurePod(SparkPod.initialPod()) // This is checking that basic executor + executorMemory = 1408 + 42 = 1450 assert(executor.container.getResources.getRequests.get("memory").getAmount === "1450Mi") } + test("auth secret propagation") { + val conf = baseConf.clone() + .set(NETWORK_AUTH_ENABLED, true) + .set("spark.master", "k8s://127.0.0.1") + + val secMgr = new SecurityManager(conf) + secMgr.initializeAuth() + + val step = new BasicExecutorFeatureStep(KubernetesTestConf.createExecutorConf(sparkConf = conf), + secMgr) + + val executor = step.configurePod(SparkPod.initialPod()) + checkEnv(executor, conf, Map(SecurityManager.ENV_AUTH_SECRET -> secMgr.getSecretKey())) + } + // There is always exactly one controller reference, and it points to the driver pod. private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = { assert(executor.getMetadata.getOwnerReferences.size() === 1) @@ -150,7 +166,10 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { } // Check that the expected environment variables are present. 
- private def checkEnv(executorPod: SparkPod, additionalEnvVars: Map[String, String]): Unit = { + private def checkEnv( + executorPod: SparkPod, + conf: SparkConf, + additionalEnvVars: Map[String, String]): Unit = { val defaultEnvs = Map( ENV_EXECUTOR_ID -> "1", ENV_DRIVER_URL -> DRIVER_ADDRESS.toString, @@ -160,10 +179,15 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL, ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars - assert(executorPod.container.getEnv.size() === defaultEnvs.size) + val extraJavaOptsStart = additionalEnvVars.keys.count(_.startsWith(ENV_JAVA_OPT_PREFIX)) + val extraJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf) + val extraJavaOptsEnvs = extraJavaOpts.zipWithIndex.map { case (opt, ind) => + s"$ENV_JAVA_OPT_PREFIX${ind + extraJavaOptsStart}" -> opt + }.toMap + val mapEnvs = executorPod.container.getEnv.asScala.map { x => (x.getName, x.getValue) }.toMap - assert(defaultEnvs === mapEnvs) + assert((defaultEnvs ++ extraJavaOptsEnvs) === mapEnvs) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index ddf9f67a0727d..cd77145b4138d 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -20,13 +20,13 @@ import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder} import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.dsl.PodResource import org.mockito.{ArgumentMatcher, Matchers, Mock, MockitoAnnotations} -import org.mockito.Matchers.any +import org.mockito.Matchers.{any, eq => meq} import org.mockito.Mockito.{never, times, verify, when} import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer import org.scalatest.BeforeAndAfter -import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ @@ -52,6 +52,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) private val podCreationTimeout = math.max(podAllocationDelay * 5, 60000L) + private val secMgr = new SecurityManager(conf) private var waitForExecutorPodsClock: ManualClock = _ @@ -79,12 +80,12 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { when(kubernetesClient.pods()).thenReturn(podOperations) when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations) when(driverPodOperations.get).thenReturn(driverPod) - when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]))) + when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr))) .thenAnswer(executorPodAnswer()) snapshotsStore = new DeterministicExecutorPodsSnapshotsStore() waitForExecutorPodsClock = new ManualClock(0L) podsAllocatorUnderTest = new ExecutorPodsAllocator( - conf, 
executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) + conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID) } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala index 52e7a12dbaf06..75232f7b98b04 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala @@ -23,7 +23,7 @@ import org.mockito.Matchers.{eq => mockitoEq} import org.mockito.Mockito.{never, verify, when} import org.scalatest.BeforeAndAfter -import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite} import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.Fabric8Aliases._ import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv} @@ -41,6 +41,9 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn @Mock private var sc: SparkContext = _ + @Mock + private var env: SparkEnv = _ + @Mock private var rpcEnv: RpcEnv = _ @@ -81,6 +84,8 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn MockitoAnnotations.initMocks(this) when(taskScheduler.sc).thenReturn(sc) when(sc.conf).thenReturn(sparkConf) + when(sc.env).thenReturn(env) + when(env.rpcEnv).thenReturn(rpcEnv) driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint]) when(rpcEnv.setupEndpoint( mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture())) @@ -88,7 +93,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn when(kubernetesClient.pods()).thenReturn(podOperations) schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend( taskScheduler, - rpcEnv, + sc, kubernetesClient, requestExecutorsService, eventQueue, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index b6a75b15af85a..ef521fd801e97 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.{Config => _, _} import io.fabric8.kubernetes.client.KubernetesClient import org.mockito.Mockito.{mock, never, verify} -import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.features._ @@ -39,6 +39,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { private val KERBEROS_CONF_STEP_TYPE = "kerberos-step" private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes" + private val secMgr = new SecurityManager(new SparkConf(false)) + private val basicFeatureStep = 
KubernetesFeaturesTestUtils.getMockConfigStepForStepType( BASIC_STEP_TYPE, classOf[BasicExecutorFeatureStep]) private val mountSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( @@ -57,7 +59,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep]) private val builderUnderTest = new KubernetesExecutorBuilder( - _ => basicFeatureStep, + (_, _) => basicFeatureStep, _ => mountSecretsStep, _ => envSecretsStep, _ => localDirsStep, @@ -69,7 +71,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { test("Basic steps are consistently applied.") { val conf = KubernetesTestConf.createExecutorConf() validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE) + builderUnderTest.buildFromFeatures(conf, secMgr), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE) } test("Apply secrets step if secrets are present.") { @@ -77,7 +79,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { secretEnvNamesToKeyRefs = Map("secret-name" -> "secret-key"), secretNamesToMountPaths = Map("secret" -> "secretMountPath")) validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf), + builderUnderTest.buildFromFeatures(conf, secMgr), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, SECRETS_STEP_TYPE, @@ -94,7 +96,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { val conf = KubernetesTestConf.createExecutorConf( volumes = Seq(volumeSpec)) validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf), + builderUnderTest.buildFromFeatures(conf, secMgr), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, MOUNT_VOLUMES_STEP_TYPE) @@ -107,7 +109,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { .set(HADOOP_CONFIG_MAP_NAME, "hadoop-conf-map-name") .set(KRB5_CONFIG_MAP_NAME, "krb5-conf-map-name")) validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf), + builderUnderTest.buildFromFeatures(conf, secMgr), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, HADOOP_CONF_STEP_TYPE, @@ -123,7 +125,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { .set(KERBEROS_DT_SECRET_NAME, "dt-secret") .set(KERBEROS_DT_SECRET_KEY, "dt-key" )) validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf), + builderUnderTest.buildFromFeatures(conf, secMgr), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, HADOOP_CONF_STEP_TYPE, @@ -154,7 +156,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { .endMetadata() .build())) val sparkPod = KubernetesExecutorBuilder(kubernetesClient, sparkConf) - .buildFromFeatures(kubernetesConf) + .buildFromFeatures(kubernetesConf, secMgr) PodBuilderSuiteUtils.verifyPodWithSupportedFeatures(sparkPod) } } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index b746a01eb5294..f8f4b4177f3bd 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -36,6 +36,7 @@ import org.apache.spark.{SPARK_VERSION, SparkFunSuite} import org.apache.spark.deploy.k8s.integrationtest.TestConstants._ import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory} 
import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ class KubernetesSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite @@ -138,6 +139,7 @@ class KubernetesSuite extends SparkFunSuite .set("spark.kubernetes.driver.pod.name", driverPodName) .set("spark.kubernetes.driver.label.spark-app-locator", appLocator) .set("spark.kubernetes.executor.label.spark-app-locator", appLocator) + .set(NETWORK_AUTH_ENABLED.key, "true") if (!kubernetesTestComponents.hasUserSpecifiedNamespace) { kubernetesTestComponents.createNamespace() }
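The heart of the `BasicExecutorFeatureStep` refactor above is how `executorEnv` is assembled: required entries first, then optional contributions (the `status.podIP` field ref, the auth secret when `secMgr.getSecretKey()` is non-null, the extra classpath), and finally user java options plus the `Utils.sparkJavaOpts(...)` executor-startup configs flattened into indexed variables. The shape of that pattern, distilled into plain tuples instead of fabric8 builders — the env var names are assumptions standing in for Spark's `ENV_*` constants:

```scala
// Distilled shape of the executorEnv assembly: required entries, Option-driven
// optional entries, and indexed entries appended last. Seq ++ Option works via
// Scala's Option-to-Iterable conversion, which is what the real code relies on.
def buildEnv(
    required: Seq[(String, String)],
    authSecret: Option[String],
    classPath: Option[String],
    javaOpts: Seq[String]): Seq[(String, String)] = {
  required ++
    authSecret.map("_SPARK_AUTH_SECRET" -> _) ++ // assumed name for ENV_AUTH_SECRET
    classPath.map("SPARK_CLASSPATH" -> _) ++     // assumed name for ENV_CLASSPATH
    javaOpts.zipWithIndex.map { case (opt, i) => s"SPARK_JAVA_OPT_$i" -> opt }
}
```

Folding the `sparkJavaOpts` startup configs into the indexed options is a behavior change, which is why `checkEnv` in the test suite now computes the expected `SPARK_JAVA_OPT_N` entries from the `SparkConf` instead of asserting a fixed map.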
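`KubernetesExecutorBuilder` shows the injection style used throughout these builders: every feature step is reached through a constructor parameter that is a factory function with a production default, so the signature change from `KubernetesExecutorConf => BasicExecutorFeatureStep` to `(KubernetesExecutorConf, SecurityManager) => BasicExecutorFeatureStep` ripples into tests as nothing more than `(_, _) => basicFeatureStep`. A minimal sketch of the pattern with simplified stand-in types:

```scala
// Simplified stand-ins for KubernetesExecutorConf, SecurityManager, and the steps.
case class Conf(appId: String)
case class Secret(value: String)

trait Step { def configure(pod: List[String]): List[String] }

class BasicStep(conf: Conf, secret: Secret) extends Step {
  def configure(pod: List[String]): List[String] =
    pod :+ s"basic(${conf.appId}, ${secret.value})"
}

class LocalDirsStep(conf: Conf) extends Step {
  def configure(pod: List[String]): List[String] = pod :+ "localDirs"
}

// Factory functions with production defaults, mirroring provideBasicStep above.
class ExecutorBuilder(
    provideBasic: (Conf, Secret) => Step = new BasicStep(_, _),
    provideLocalDirs: Conf => Step = new LocalDirsStep(_)) {

  def build(conf: Conf, secret: Secret): List[String] =
    Seq(provideBasic(conf, secret), provideLocalDirs(conf))
      .foldLeft(List.empty[String])((pod, step) => step.configure(pod))
}

// A test swaps in a stub without touching the wiring:
//   new ExecutorBuilder(provideBasic = (_, _) => stubStep)
```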
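One subtle detail in the `ExecutorPodsAllocatorSuite` change: the import renames Mockito's matcher (`eq => meq`) because `eq` clashes with Scala's built-in reference equality, and the stub then mixes `any(...)` with `meq(secMgr)`. Mockito requires that once any argument of a stubbed call goes through a matcher, all of them must — you cannot pass a raw value next to `any()`. An illustrative stub with a hypothetical trait, using the same Mockito 1.x `org.mockito.Matchers` API the diff imports:

```scala
import org.mockito.Matchers.{any, eq => meq}
import org.mockito.Mockito.{mock, when}

trait Builder {
  def build(conf: String, secMgr: AnyRef): String
}

object MatcherDemo {
  def main(args: Array[String]): Unit = {
    val builder = mock(classOf[Builder])
    val secMgr = new Object

    // Every argument must be a matcher once one of them is:
    // any(...) for the conf, meq(...) to pin this exact instance.
    when(builder.build(any(classOf[String]), meq(secMgr))).thenReturn("pod")

    println(builder.build("whatever", secMgr)) // prints "pod"
  }
}
```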