Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
73f2777
initial Driver logic for Hadoop and Kerberos Support
ifilonenko Jun 29, 2018
6069be5
add executors... still need to refactor to use sparkConf exclusivley
ifilonenko Jul 1, 2018
000120f
refactored executor logic preparing for e2e testing
ifilonenko Jul 2, 2018
13b3adc
resolved initial comments
ifilonenko Jul 7, 2018
0939738
merge conflicts
ifilonenko Jul 30, 2018
347536e
Merge branch 'spark-master' into secure-hdfs
ifilonenko Aug 7, 2018
c30ad8c
launching driver with kerberos authentication instead of simple
ifilonenko Aug 7, 2018
1697e74
merge conflicts and addition of security context
ifilonenko Aug 20, 2018
4a000d2
fix dockerfile
ifilonenko Aug 20, 2018
719b059
non-effective attempt to solve null UnixUsername error
ifilonenko Aug 29, 2018
fb9e810
move credential get
ifilonenko Aug 29, 2018
e7935f8
current working solution
ifilonenko Sep 4, 2018
aa3779c
merge conflicts
ifilonenko Sep 4, 2018
32c408c
merge conflicts
ifilonenko Sep 7, 2018
3cf644e
Merge branch 'spark-master' into secure-hdfs
ifilonenko Sep 13, 2018
583a52c
merge conflicts and various additions
ifilonenko Sep 21, 2018
6ae3def
Merge branch 'spark-master' into secure-hdfs
ifilonenko Sep 21, 2018
78953e6
fixes so tests pass
ifilonenko Sep 21, 2018
73f157f
refactor to handle login logic being used in spark-submit
ifilonenko Sep 26, 2018
367e65b
Merge branch 'spark-master' into secure-hdfs
ifilonenko Sep 27, 2018
5f52a1a
resolve comments and add documentation
ifilonenko Sep 27, 2018
6548ef9
resolved comments
ifilonenko Oct 6, 2018
7f72af5
resolved rest of comments
ifilonenko Oct 6, 2018
4ce00a5
small doc addition
ifilonenko Oct 6, 2018
89063fd
fixes to pass kerberos tests
ifilonenko Oct 7, 2018
e303048
resolve comments
ifilonenko Oct 8, 2018
69840a8
resolve comments
ifilonenko Oct 9, 2018
2108154
style and indentation
ifilonenko Oct 9, 2018
a987a70
resolving comments
ifilonenko Oct 9, 2018
e2f8063
hopefully final comment resolution
ifilonenko Oct 9, 2018
f3a0ffb
style issues
ifilonenko Oct 10, 2018
a958920
included new ability to bake krb5.conf into your docker images and no…
ifilonenko Oct 10, 2018
dd95fca
style check
ifilonenko Oct 10, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
roleEnvs: Map[String, String],
roleVolumes: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]],
sparkFiles: Seq[String],
hadoopConfDir: Option[HadoopConfSpec]) {
hadoopConfSpec: Option[HadoopConfSpec]) {

def hadoopConfigMapName: String = s"$appResourceNamePrefix-hadoop-config"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ private[spark] class HadoopConfExecutorFeatureStep(

override def configurePod(pod: SparkPod): SparkPod = {
val sparkConf = kubernetesConf.sparkConf
val maybeHadoopConfDir = sparkConf.getOption(HADOOP_CONFIG_MAP_NAME)
require(maybeHadoopConfDir.isDefined,
val hadoopConfDirCMapName = sparkConf.getOption(HADOOP_CONFIG_MAP_NAME)
require(hadoopConfDirCMapName.isDefined,
"Ensure that the env `HADOOP_CONF_DIR` is defined either in the client or " +
" using pre-existing ConfigMaps")
logInfo("HADOOP_CONF_DIR defined")
HadoopBootstrapUtil.bootstrapHadoopConfDir(None, None, maybeHadoopConfDir, pod)
HadoopBootstrapUtil.bootstrapHadoopConfDir(None, None, hadoopConfDirCMapName, pod)
}

override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to address it here but it feels like these methods should have default implementations, given that lots of classes just don't do anything with them.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ private[spark] class KerberosConfDriverFeatureStep(
kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
extends KubernetesFeatureConfigStep with Logging {

require(kubernetesConf.hadoopConfDir.isDefined,
require(kubernetesConf.hadoopConfSpec.isDefined,
"Ensure that HADOOP_CONF_DIR is defined either via env or a pre-defined ConfigMap")
private val hadoopConfDirSpec = kubernetesConf.hadoopConfDir.get
private val hadoopConfDirSpec = kubernetesConf.hadoopConfSpec.get
private val conf = kubernetesConf.sparkConf
private val principal = conf.get(org.apache.spark.internal.config.PRINCIPAL)
private val keytab = conf.get(org.apache.spark.internal.config.KEYTAB)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,8 @@ private[spark] object HadoopKerberosLogin {
submissionSparkConf,
hadoopConf)
require(tokenData.nonEmpty, "Did not obtain any delegation tokens")
val currentTime = tokenManager.getCurrentTime
val initialTokenDataKeyName = s"$KERBEROS_SECRET_KEY_PREFIX-$currentTime"
val newSecretName =
s"$kubernetesResourceNamePrefix-$KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME.$currentTime"
val initialTokenDataKeyName = KERBEROS_SECRET_KEY_PREFIX
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: you don't need this variable and you can rename KERBEROS_SECRET_KEY_PREFIX to KERBEROS_SECRET_KEY.

val newSecretName = s"$kubernetesResourceNamePrefix-$KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME"
val secretDT =
new SecretBuilder()
.withNewMetadata()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.util.{Clock, SystemClock}

/**
* The KubernetesHadoopDelegationTokenManager fetches Hadoop delegation tokens
Expand All @@ -37,14 +36,12 @@ private[spark] class KubernetesHadoopDelegationTokenManager(
tokenManager: HadoopDelegationTokenManager) extends Logging {

// HadoopUGI Util methods
private val clock: Clock = new SystemClock()
def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
def getShortUserName : String = getCurrentUser.getShortUserName
def getFileSystem(hadoopConf: Configuration) : FileSystem = FileSystem.get(hadoopConf)
def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled
def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String): UserGroupInformation =
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
def getCurrentTime: Long = clock.getTimeMillis()
def serializeCreds(creds: Credentials): Array[Byte] = SparkHadoopUtil.get.serialize(creds)
def nextRT(rt: Long, conf: SparkConf): Long = SparkHadoopUtil.nextCredentialRenewalTime(rt, conf)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private[spark] class KubernetesDriverBuilder(
.getOrElse(provideJavaStep(kubernetesConf))

val maybeHadoopConfigStep =
kubernetesConf.hadoopConfDir.map { _ =>
kubernetesConf.hadoopConfSpec.map { _ =>
provideHadoopGlobalStep(kubernetesConf)}

val allFeatures: Seq[KubernetesFeatureConfigStep] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
DRIVER_ENVS,
Nil,
Seq.empty[String],
hadoopConfDir = None)
hadoopConfSpec = None)

val featureStep = new BasicDriverFeatureStep(kubernetesConf)
val basePod = SparkPod.initialPod()
Expand Down Expand Up @@ -157,7 +157,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
DRIVER_ENVS,
Nil,
Seq.empty[String],
hadoopConfDir = None)
hadoopConfSpec = None)

val pythonKubernetesConf = KubernetesConf(
pythonSparkConf,
Expand All @@ -175,7 +175,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
DRIVER_ENVS,
Nil,
Seq.empty[String],
hadoopConfDir = None)
hadoopConfSpec = None)
val javaFeatureStep = new BasicDriverFeatureStep(javaKubernetesConf)
val pythonFeatureStep = new BasicDriverFeatureStep(pythonKubernetesConf)
val basePod = SparkPod.initialPod()
Expand Down Expand Up @@ -205,7 +205,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
DRIVER_ENVS,
Nil,
allFiles,
hadoopConfDir = None)
hadoopConfSpec = None)

val step = new BasicDriverFeatureStep(kubernetesConf)
val additionalProperties = step.getAdditionalPodSystemProperties()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class BasicExecutorFeatureStepSuite
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None))
hadoopConfSpec = None))
val executor = step.configurePod(SparkPod.initialPod())

// The executor pod name and default labels.
Expand Down Expand Up @@ -133,7 +133,7 @@ class BasicExecutorFeatureStepSuite
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None))
hadoopConfSpec = None))
assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63)
}

Expand All @@ -155,7 +155,7 @@ class BasicExecutorFeatureStepSuite
Map("qux" -> "quux"),
Nil,
Seq.empty[String],
hadoopConfDir = None))
hadoopConfSpec = None))
val executor = step.configurePod(SparkPod.initialPod())

checkEnv(executor,
Expand Down Expand Up @@ -183,7 +183,7 @@ class BasicExecutorFeatureStepSuite
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None))
hadoopConfSpec = None))
val executor = step.configurePod(SparkPod.initialPod())
// This is checking that basic executor + executorMemory = 1408 + 42 = 1450
assert(executor.container.getResources.getRequests.get("memory").getAmount === "1450Mi")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None)
hadoopConfSpec = None)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
assert(kubernetesCredentialsStep.getAdditionalPodSystemProperties().isEmpty)
Expand Down Expand Up @@ -96,7 +96,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None)
hadoopConfSpec = None)

val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
Expand Down Expand Up @@ -136,7 +136,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None)
hadoopConfSpec = None)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties()
val expectedSparkConf = Map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None))
hadoopConfSpec = None))
assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
assert(configurationStep.getAdditionalKubernetesResources().size === 1)
assert(configurationStep.getAdditionalKubernetesResources().head.isInstanceOf[Service])
Expand Down Expand Up @@ -102,7 +102,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None))
hadoopConfSpec = None))
val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
val expectedHostName = s"$expectedServiceName.my-namespace.svc"
Expand All @@ -125,7 +125,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None))
hadoopConfSpec = None))
val resolvedService = configurationStep
.getAdditionalKubernetesResources()
.head
Expand Down Expand Up @@ -157,7 +157,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None),
hadoopConfSpec = None),
clock)
val driverService = configurationStep
.getAdditionalKubernetesResources()
Expand Down Expand Up @@ -186,7 +186,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None),
hadoopConfSpec = None),
clock)
fail("The driver bind address should not be allowed.")
} catch {
Expand All @@ -213,7 +213,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None),
hadoopConfSpec = None),
clock)
fail("The driver host address should not be allowed.")
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class EnvSecretsFeatureStepSuite extends SparkFunSuite{
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None)
hadoopConfSpec = None)

val step = new EnvSecretsFeatureStep(kubernetesConf)
val driverContainerWithEnvSecrets = step.configurePod(baseDriverPod).container
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None)
hadoopConfSpec = None)
}

test("Resolve to default local dir if neither env nor configuration are set") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite {
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None)
hadoopConfSpec = None)

val step = new MountSecretsFeatureStep(kubernetesConf)
val driverPodWithSecretsMounted = step.configurePod(baseDriverPod).pod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
roleEnvs = Map.empty,
roleVolumes = Nil,
sparkFiles = Nil,
hadoopConfDir = None)
hadoopConfSpec = None)

test("Mounts hostPath volumes") {
val volumeConf = KubernetesVolumeSpec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class JavaDriverFeatureStepSuite extends SparkFunSuite {
roleEnvs = Map.empty,
roleVolumes = Nil,
sparkFiles = Seq.empty[String],
hadoopConfDir = None)
hadoopConfSpec = None)

val step = new JavaDriverFeatureStep(kubernetesConf)
val driverPod = step.configurePod(baseDriverPod).pod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
roleEnvs = Map.empty,
roleVolumes = Nil,
sparkFiles = Seq.empty[String],
hadoopConfDir = None)
hadoopConfSpec = None)

val step = new PythonDriverFeatureStep(kubernetesConf)
val driverPod = step.configurePod(baseDriverPod).pod
Expand Down Expand Up @@ -92,7 +92,7 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
roleEnvs = Map.empty,
roleVolumes = Nil,
sparkFiles = Seq.empty[String],
hadoopConfDir = None)
hadoopConfSpec = None)
val step = new PythonDriverFeatureStep(kubernetesConf)
val driverContainerwithPySpark = step.configurePod(baseDriverPod).container
val args = driverContainerwithPySpark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class RDriverFeatureStepSuite extends SparkFunSuite {
roleEnvs = Map.empty,
roleVolumes = Seq.empty,
sparkFiles = Seq.empty[String],
hadoopConfDir = None)
hadoopConfSpec = None)

val step = new RDriverFeatureStep(kubernetesConf)
val driverContainerwithR = step.configurePod(baseDriverPod).container
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None)
hadoopConfSpec = None)
when(driverBuilder.buildFromFeatures(kubernetesConf)).thenReturn(BUILT_KUBERNETES_SPEC)
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withName(POD_NAME)).thenReturn(namedPods)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None)
hadoopConfSpec = None)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
Expand All @@ -126,7 +126,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None)
hadoopConfSpec = None)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
Expand Down Expand Up @@ -155,7 +155,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None)
hadoopConfSpec = None)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
Expand All @@ -182,7 +182,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None)
hadoopConfSpec = None)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
Expand Down Expand Up @@ -214,7 +214,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
volumeSpec :: Nil,
Seq.empty[String],
hadoopConfDir = None)
hadoopConfSpec = None)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
Expand Down Expand Up @@ -242,7 +242,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = None)
hadoopConfSpec = None)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
Expand All @@ -269,7 +269,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = Some(
hadoopConfSpec = Some(
HadoopConfSpec(
Some("/var/hadoop-conf"),
None)))
Expand Down Expand Up @@ -300,7 +300,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
Nil,
Seq.empty[String],
hadoopConfDir = Some(
hadoopConfSpec = Some(
HadoopConfSpec(
None,
Some("pre-defined-configMapName"))))
Expand Down