Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
f1b8c08
Fundamental building blocks for the new pod construction architecture.
mccheah Mar 26, 2018
80e1562
Intermediate commit to move file.
mccheah Mar 26, 2018
c3460ae
Move basic driver configuration to new architecture.
mccheah Mar 26, 2018
6d1711b
Intermediate commit to move file
mccheah Mar 26, 2018
4036d72
Migrate mounting K8s credentials to the new architecture.
mccheah Mar 26, 2018
d46d671
Intermediate commit to move file.
mccheah Mar 26, 2018
2936aa5
Migrate creating the driver service to the new architecture.
mccheah Mar 26, 2018
d2751b6
Remove dependency resolution step entirely.
mccheah Mar 26, 2018
430fbb2
Move mounting driver secrets to new architecture.
mccheah Mar 26, 2018
fd3e8e6
Complete driver migration to new pod construction architecture.
mccheah Mar 26, 2018
f0ea6d9
Intermediate commit to move file
mccheah Mar 26, 2018
67e9ca1
Migrate executor pod construction to use the new architecture.
mccheah Mar 26, 2018
4c944c4
Manage args differently.
mccheah Mar 27, 2018
27b8634
Revert "Manage args differently."
mccheah Mar 27, 2018
9c67016
Make envs role-specific
mccheah Mar 27, 2018
f3540f8
Address comments.
mccheah Mar 27, 2018
33f9d56
Move executor env to KubernetesExecutorSpecificConf
mccheah Mar 27, 2018
9b6cc05
Fix import
mccheah Mar 27, 2018
a5f08bb
Merge remote-tracking branch 'apache/master' into spark-22839-incremental
mccheah Apr 3, 2018
fbde25d
Merge remote-tracking branch 'master' into spark-22839-incremental
mccheah Apr 3, 2018
dff0089
Fix indentation
mccheah Apr 3, 2018
02bbcbc
Fix closures
mccheah Apr 3, 2018
7d65875
Fix compilation
mccheah Apr 3, 2018
041a240
Address comments.
mccheah Apr 4, 2018
f446868
Merge remote-tracking branch 'apache/master' into spark-22839-incremental
mccheah Apr 4, 2018
df75a9c
Fix merge conflicts.
mccheah Apr 4, 2018
518fb2a
Move around some code.
mccheah Apr 4, 2018
7b339c3
Fix line breaks
mccheah Apr 4, 2018
dbe35fa
Simplify a line
mccheah Apr 4, 2018
4b92989
Rename KubernetesSpec -> KubernetesDriverSpec
mccheah Apr 11, 2018
7807c9c
Fix scalastyle
mccheah Apr 12, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps
package org.apache.spark.deploy.k8s

import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
import io.fabric8.kubernetes.api.model.HasMetadata

/**
* Represents a step in configuring the Spark driver pod.
*/
private[spark] trait DriverConfigurationStep {
private[k8s] case class KubernetesSpec(
pod: SparkPod,
additionalDriverKubernetesResources: Seq[HasMetadata],
podJavaSystemProperties: Map[String, String])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we shorten the name to just systemProperties? One of the most frequent types of comments I got while working on the upstreaming was to use short names.


/**
* Apply some transformation to the previous state of the driver to add a new feature to it.
*/
def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec
private[k8s] object KubernetesSpec {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably should just be private[spark]

def initialSpec(initialProps: Map[String, String]): KubernetesSpec = KubernetesSpec(
SparkPod.initialPod(),
Seq.empty,
initialProps)
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@ import scala.util.control.NonFatal

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkApplication
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkKubernetesClientFactory}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.util.Utils

/**
Expand Down Expand Up @@ -80,40 +78,31 @@ private[spark] object ClientArguments {
* watcher that monitors and logs the application status. Waits for the application to terminate if
* spark.kubernetes.submission.waitAppCompletion is true.
*
* @param submissionSteps steps that collectively configure the driver
* @param sparkConf the submission client Spark configuration
* @param builder Responsible for building the base driver pod based on a composition of
* implemented features.
* @param kubernetesConf application configuration
* @param kubernetesClient the client to talk to the Kubernetes API server
* @param waitForAppCompletion a flag indicating whether the client should wait for the application
* to complete
* @param appName the application name
* @param watcher a watcher that monitors and logs the application status
*/
private[spark] class Client(
submissionSteps: Seq[DriverConfigurationStep],
sparkConf: SparkConf,
kubernetesClient: KubernetesClient,
waitForAppCompletion: Boolean,
appName: String,
watcher: LoggingPodStatusWatcher,
kubernetesResourceNamePrefix: String) extends Logging {
builder: KubernetesDriverBuilder,
kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf],
kubernetesClient: KubernetesClient,
waitForAppCompletion: Boolean,
appName: String,
watcher: LoggingPodStatusWatcher,
kubernetesResourceNamePrefix: String) extends Logging {

/**
* Run command that initializes a DriverSpec that will be updated after each
* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec
* will be used to build the Driver Container, Driver Pod, and Kubernetes Resources
*/
def run(): Unit = {
var currentDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf)
// submissionSteps contain steps necessary to take, to resolve varying
// client arguments that are passed in, created by orchestrator
for (nextStep <- submissionSteps) {
currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
}
val resolvedDriverSpec = builder.buildFromFeatures(kubernetesConf)
val configMapName = s"$kubernetesResourceNamePrefix-driver-conf-map"
val configMap = buildConfigMap(configMapName, currentDriverSpec.driverSparkConf)
val configMap = buildConfigMap(configMapName, resolvedDriverSpec.podJavaSystemProperties)
// The include of the ENV_VAR for "SPARK_CONF_DIR" is to allow for the
// Spark command builder to pickup on the Java Options present in the ConfigMap
val resolvedDriverContainer = new ContainerBuilder(currentDriverSpec.driverContainer)
val resolvedDriverContainer = new ContainerBuilder(resolvedDriverSpec.pod.container)
.addNewEnv()
.withName(ENV_SPARK_CONF_DIR)
.withValue(SPARK_CONF_DIR_INTERNAL)
Expand All @@ -123,7 +112,7 @@ private[spark] class Client(
.withMountPath(SPARK_CONF_DIR_INTERNAL)
.endVolumeMount()
.build()
val resolvedDriverPod = new PodBuilder(currentDriverSpec.driverPod)
val resolvedDriverPod = new PodBuilder(resolvedDriverSpec.pod.pod)
.editSpec()
.addToContainers(resolvedDriverContainer)
.addNewVolume()
Expand All @@ -141,12 +130,10 @@ private[spark] class Client(
.watch(watcher)) { _ =>
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
try {
if (currentDriverSpec.otherKubernetesResources.nonEmpty) {
val otherKubernetesResources =
currentDriverSpec.otherKubernetesResources ++ Seq(configMap)
addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
}
val otherKubernetesResources =
resolvedDriverSpec.additionalDriverKubernetesResources ++ Seq(configMap)
addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdDriverPod)
Expand Down Expand Up @@ -180,20 +167,17 @@ private[spark] class Client(
}

// Build a Config Map that will house spark conf properties in a single file for spark-submit
private def buildConfigMap(configMapName: String, conf: SparkConf): ConfigMap = {
private def buildConfigMap(configMapName: String, conf: Map[String, String]): ConfigMap = {
val properties = new Properties()
conf.getAll.foreach { case (k, v) =>
conf.foreach { case (k, v) =>
properties.setProperty(k, v)
}
val propertiesWriter = new StringWriter()
properties.store(propertiesWriter,
s"Java properties built from Kubernetes config map with name: $configMapName")

val namespace = conf.get(KUBERNETES_NAMESPACE)
new ConfigMapBuilder()
.withNewMetadata()
.withName(configMapName)
.withNamespace(namespace)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not necessary to set namespaces on these objects because the Kubernetes client itself is namespaced.

.endMetadata()
.addToData(SPARK_CONF_FILE_NAME, propertiesWriter.toString)
.build()
Expand All @@ -211,34 +195,34 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
}

private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
// For constructing the app ID, we can't use the Spark application name, as the app ID is going
// to be added as a label to group resources belonging to the same application. Label values are
// considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
// a unique app ID (captured by spark.app.id) in the format below.
val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
val launchTime = System.currentTimeMillis()
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
val kubernetesResourceNamePrefix = {
s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
}
val kubernetesConf = KubernetesConf.createDriverConf(
sparkConf,
appName,
kubernetesResourceNamePrefix,
kubernetesAppId,
clientArguments.mainAppResource,
clientArguments.mainClass,
clientArguments.driverArgs)
val orchestrator = new KubernetesDriverBuilder
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variable should be named builder.

val namespace = kubernetesConf.namespace()
// The master URL has been checked for validity already in SparkSubmit.
// We just need to get rid of the "k8s://" prefix here.
val master = sparkConf.get("spark.master").substring("k8s://".length)
val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None

val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)

val orchestrator = new DriverConfigOrchestrator(
kubernetesAppId,
kubernetesResourceNamePrefix,
clientArguments.mainAppResource,
appName,
clientArguments.mainClass,
clientArguments.driverArgs,
sparkConf)

Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
master,
Some(namespace),
Expand All @@ -247,8 +231,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
None,
None)) { kubernetesClient =>
val client = new Client(
orchestrator.getAllConfigurationSteps,
sparkConf,
orchestrator,
kubernetesConf,
kubernetesClient,
waitForAppCompletion,
appName,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, KubernetesSpec}
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, MountSecretsFeatureStep}

/**
 * Builds the driver's [[KubernetesSpec]] by running a sequence of feature steps against an
 * initial spec.
 *
 * Every constructor parameter is a factory that creates one feature step from a
 * [[KubernetesConf]]; each factory defaults to calling the matching step's constructor.
 *
 * @param provideBasicStep factory for the [[BasicDriverFeatureStep]]
 * @param provideCredentialsStep factory for the [[DriverKubernetesCredentialsFeatureStep]]
 * @param provideServiceStep factory for the [[DriverServiceFeatureStep]]
 * @param provideSecretsStep factory for the [[MountSecretsFeatureStep]]; only invoked when
 *                           `roleSecretNamesToMountPaths` of the configuration is non-empty
 */
private[spark] class KubernetesDriverBuilder(
provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep =
new BasicDriverFeatureStep(_),
provideCredentialsStep: (KubernetesConf[KubernetesDriverSpecificConf])
=> DriverKubernetesCredentialsFeatureStep =
new DriverKubernetesCredentialsFeatureStep(_),
provideServiceStep: (KubernetesConf[KubernetesDriverSpecificConf]) => DriverServiceFeatureStep =
new DriverServiceFeatureStep(_),
provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
=> MountSecretsFeatureStep) =
new MountSecretsFeatureStep(_)) {

/**
 * Applies the basic, credentials and service steps, in that order, followed by the secrets
 * step when `kubernetesConf.roleSecretNamesToMountPaths` is non-empty.
 *
 * @param kubernetesConf driver configuration passed to every step factory
 * @return a spec holding the pod returned by the last step, plus the Kubernetes resources and
 *         system properties accumulated across all steps
 */
def buildFromFeatures(
kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesSpec = {
val baseFeatures = Seq(
provideBasicStep(kubernetesConf),
provideCredentialsStep(kubernetesConf),
provideServiceStep(kubernetesConf))
// The secrets step is appended last, and only when roleSecretNamesToMountPaths is non-empty.
val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
} else baseFeatures
// Seed the spec's system properties with every entry of the Spark configuration.
var spec = KubernetesSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a new line before this line?

for (feature <- allFeatures) {
// Each step receives the pod produced by the previous step. Resources are appended, and
// system properties are merged with `++`, so a later step's value replaces an earlier one
// for the same key.
val configuredPod = feature.configurePod(spec.pod)
val addedSystemProperties = feature.getAdditionalPodSystemProperties()
val addedResources = feature.getAdditionalKubernetesResources()
spec = KubernetesSpec(
configuredPod,
spec.additionalDriverKubernetesResources ++ addedResources,
spec.podJavaSystemProperties ++ addedSystemProperties)
}
spec
}
}
Loading