Changes shown from 12 of 31 commits.

Commits:
f1b8c08  Fundamental building blocks for the new pod construction architecture. (mccheah, Mar 26, 2018)
80e1562  Intermediate commit to move file. (mccheah, Mar 26, 2018)
c3460ae  Move basic driver configuration to new architecture. (mccheah, Mar 26, 2018)
6d1711b  Intermediate commit to move file (mccheah, Mar 26, 2018)
4036d72  Migrate mounting K8s credentials to the new architecture. (mccheah, Mar 26, 2018)
d46d671  Intermediate commit to move file. (mccheah, Mar 26, 2018)
2936aa5  Migrate creating the driver service to the new architecture. (mccheah, Mar 26, 2018)
d2751b6  Remove dependency resolution step entirely. (mccheah, Mar 26, 2018)
430fbb2  Move mounting driver secrets to new architecture. (mccheah, Mar 26, 2018)
fd3e8e6  Complete driver migration to new pod construction architecture. (mccheah, Mar 26, 2018)
f0ea6d9  Intermediate commit to move file (mccheah, Mar 26, 2018)
67e9ca1  Migrate executor pod construction to use the new architecture. (mccheah, Mar 26, 2018)
4c944c4  Manage args differently. (mccheah, Mar 27, 2018)
27b8634  Revert "Manage args differently." (mccheah, Mar 27, 2018)
9c67016  Make envs role-specific (mccheah, Mar 27, 2018)
f3540f8  Address comments. (mccheah, Mar 27, 2018)
33f9d56  Move executor env to KubernetesExecutorSpecificConf (mccheah, Mar 27, 2018)
9b6cc05  Fix import (mccheah, Mar 27, 2018)
a5f08bb  Merge remote-tracking branch 'apache/master' into spark-22839-increme… (mccheah, Apr 3, 2018)
fbde25d  Merge remote-tracking branch 'master' into spark-22839-incremental (mccheah, Apr 3, 2018)
dff0089  Fix indentation (mccheah, Apr 3, 2018)
02bbcbc  Fix closures (mccheah, Apr 3, 2018)
7d65875  Fix compilation (mccheah, Apr 3, 2018)
041a240  Address comments. (mccheah, Apr 4, 2018)
f446868  Merge remote-tracking branch 'apache/master' into spark-22839-increme… (mccheah, Apr 4, 2018)
df75a9c  Fix merge conflicts. (mccheah, Apr 4, 2018)
518fb2a  Move around some code. (mccheah, Apr 4, 2018)
7b339c3  Fix line breaks (mccheah, Apr 4, 2018)
dbe35fa  Simplify a line (mccheah, Apr 4, 2018)
4b92989  Rename KubernetesSpec -> KubernetesDriverSpec (mccheah, Apr 11, 2018)
7807c9c  Fix scalastyle (mccheah, Apr 12, 2018)
@@ -0,0 +1,158 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.Pod

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource}
import org.apache.spark.internal.config.ConfigEntry

private[spark] sealed trait KubernetesRoleSpecificConf

private[spark] case class KubernetesDriverSpecificConf(
    mainAppResource: Option[MainAppResource],
    mainClass: String,
    appName: String,
    appArgs: Seq[String]) extends KubernetesRoleSpecificConf

private[spark] case class KubernetesExecutorSpecificConf(
    executorId: String, driverPod: Pod)
  extends KubernetesRoleSpecificConf

private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf](
Comment (mccheah, PR author): Maybe this should be a case class? It is a struct-like object, which makes a case class seem more idiomatic here.
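A minimal sketch of the case-class variant being suggested (illustrative, not code from this diff; the body methods below would carry over unchanged):

// Hypothetical sketch: same fields as the class in this diff, but as a case
// class, which adds equals/hashCode/copy and pattern matching for free.
private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
    sparkConf: SparkConf,
    roleSpecificConf: T,
    appResourceNamePrefix: String,
    appId: String,
    roleLabels: Map[String, String],
    roleAnnotations: Map[String, String],
    roleSecretNamesToMountPaths: Map[String, String])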

    val sparkConf: SparkConf,
    val roleSpecificConf: T,
    val appResourceNamePrefix: String,
    val appId: String,
    val roleLabels: Map[String, String],
    val roleAnnotations: Map[String, String],
    val roleSecretNamesToMountPaths: Map[String, String]) {

  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)

  def sparkJars(): Seq[String] = sparkConf
    .getOption("spark.jars")
    .map(str => str.split(",").toSeq)
    .getOrElse(Seq.empty[String])

  def sparkFiles(): Seq[String] = sparkConf
    .getOption("spark.files")
    .map(str => str.split(",").toSeq)
    .getOrElse(Seq.empty[String])

  def driverCustomEnvs(): Seq[(String, String)] =
Comment (liyinan926, Mar 26, 2018): This is driver-specific and probably should not be here. What about making the custom envs a constructor argument of the class, similar to labels and annotations? createDriverConf below would then fetch the driver custom envs and pass them in. This also works for the executor environment variables specified via spark.executorEnv.*.
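A hedged sketch of the suggested refactor (the roleEnvs name is hypothetical, not from this PR):

// Illustrative only: the env map becomes a constructor argument, populated
// per role by the factory methods in the companion object.
private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf](
    val sparkConf: SparkConf,
    val roleSpecificConf: T,
    val appResourceNamePrefix: String,
    val appId: String,
    val roleLabels: Map[String, String],
    val roleAnnotations: Map[String, String],
    val roleSecretNamesToMountPaths: Map[String, String],
    val roleEnvs: Map[String, String])  // hypothetical new argument

// createDriverConf would pass sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toMap,
// and createExecutorConf would pass the spark.executorEnv.* values, e.g.
// sparkConf.getExecutorEnv.toMap.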

    sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq

  def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)

  def nodeSelector(): Map[String, String] =
    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)

  def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)

  def get(conf: String, defaultValue: String): String = sparkConf.get(conf, defaultValue)

  def getOption(key: String): Option[String] = sparkConf.getOption(key)

Comment (Contributor): Extra new line.

Comment (mccheah, PR author): Sorry, do you mean we should remove this newline, or that one should be added here?

Comment (Contributor): Oh, I meant removing the extra newline.

Comment (mccheah, PR author): OK, will address in the next patch after others review.

}

private[spark] object KubernetesConf {
  def createDriverConf(
      sparkConf: SparkConf,
      appName: String,
      appResourceNamePrefix: String,
      appId: String,
      mainAppResource: Option[MainAppResource],
      mainClass: String,
      appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
    val sparkConfWithMainAppJar = sparkConf.clone()
    mainAppResource.foreach {
      case JavaMainAppResource(res) =>
        val previousJars = sparkConf
          .getOption("spark.jars")
          .map(_.split(","))
          .getOrElse(Array.empty)
        if (!previousJars.contains(res)) {
          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
        }
    }
    val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
Comment (Contributor): Can you add a blank line before this line?

      sparkConf,
      KUBERNETES_DRIVER_LABEL_PREFIX)
    require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
      s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
      "operations.")
    require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " +
      s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " +
      "operations.")
    val driverLabels = driverCustomLabels ++ Map(
      SPARK_APP_ID_LABEL -> appId,
      SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
    val driverAnnotations =
      KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
    val driverSecretNamesToMountPaths =
Comment (Contributor): The line-breaking style is inconsistent here, i.e., breaking after = in some places and after the opening ( in others.

      KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
    new KubernetesConf(
Comment (Contributor): Can you add a blank line before new KubernetesConf?

      sparkConfWithMainAppJar,
      KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs),
      appResourceNamePrefix,
      appId,
      driverLabels,
      driverAnnotations,
      driverSecretNamesToMountPaths)
  }

  def createExecutorConf(
      sparkConf: SparkConf,
      executorId: String,
      appId: String,
      driverPod: Pod): KubernetesConf[KubernetesExecutorSpecificConf] = {
    val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
      sparkConf,
      KUBERNETES_EXECUTOR_LABEL_PREFIX)
    require(
      !executorCustomLabels.contains(SPARK_APP_ID_LABEL),
      s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
    require(
      !executorCustomLabels.contains(SPARK_EXECUTOR_ID_LABEL),
      s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
        " Spark.")
    require(
      !executorCustomLabels.contains(SPARK_ROLE_LABEL),
      s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.")
    val executorLabels = Map(
      SPARK_EXECUTOR_ID_LABEL -> executorId,
      SPARK_APP_ID_LABEL -> appId,
      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
      executorCustomLabels
    val executorAnnotations =
      KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
    val executorSecrets =
      KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
    new KubernetesConf(
Comment (Contributor): Ditto; blank lines separating the code a bit would improve readability.

      sparkConf.clone(),
      KubernetesExecutorSpecificConf(executorId, driverPod),
      sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX),
      appId,
      executorLabels,
      executorAnnotations,
      executorSecrets)
  }
}
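For orientation, a hedged sketch of how a caller might use these factories (all argument values below are made up for the example):

// Illustrative usage only; the values are hypothetical.
val driverConf = KubernetesConf.createDriverConf(
  sparkConf = new SparkConf(),
  appName = "example-app",
  appResourceNamePrefix = "example-app-1234",
  appId = "spark-app-id",
  mainAppResource = Some(JavaMainAppResource("local:///opt/app.jar")),
  mainClass = "org.example.Main",
  appArgs = Array("input-arg"))

// Role-specific data is reached through roleSpecificConf:
val mainClass = driverConf.roleSpecificConf.mainClass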
@@ -14,25 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.k8s.submit.steps
+package org.apache.spark.deploy.k8s

-import org.apache.spark.deploy.k8s.MountSecretsBootstrap
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import io.fabric8.kubernetes.api.model.HasMetadata

-/**
- * A driver configuration step for mounting user-specified secrets onto user-specified paths.
- *
- * @param bootstrap a utility actually handling mounting of the secrets.
- */
-private[spark] class DriverMountSecretsStep(
-    bootstrap: MountSecretsBootstrap) extends DriverConfigurationStep {
+private[k8s] case class KubernetesSpec(
+    pod: SparkPod,
+    additionalDriverKubernetesResources: Seq[HasMetadata],
+    podJavaSystemProperties: Map[String, String])
Comment (Contributor): Can we shorten the name to just systemProperties? One of the most frequent types of comments I got while upstreaming was to use short names.


-  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    val pod = bootstrap.addSecretVolumes(driverSpec.driverPod)
-    val container = bootstrap.mountSecrets(driverSpec.driverContainer)
-    driverSpec.copy(
-      driverPod = pod,
-      driverContainer = container
-    )
-  }
+private[k8s] object KubernetesSpec {
Comment (mccheah, PR author): This should probably just be private[spark].

+  def initialSpec(initialProps: Map[String, String]): KubernetesSpec = KubernetesSpec(
+    SparkPod.initialPod(),
+    Seq.empty,
+    initialProps)
+}
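A hedged usage sketch (the property map below is illustrative):

// Illustrative only: each driver build starts from an empty pod plus the
// initial Java system properties; feature steps then transform the spec.
val spec = KubernetesSpec.initialSpec(Map("spark.app.id" -> "spark-app-id"))
assert(spec.additionalDriverKubernetesResources.isEmpty)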

This file was deleted.

@@ -14,17 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.k8s.submit.steps
+package org.apache.spark.deploy.k8s

-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}

-/**
- * Represents a step in configuring the Spark driver pod.
- */
-private[spark] trait DriverConfigurationStep {
+private[spark] case class SparkPod(pod: Pod, container: Container)

-  /**
-   * Apply some transformation to the previous state of the driver to add a new feature to it.
-   */
-  def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec
+private[spark] object SparkPod {
+  def initialPod(): SparkPod = {
+    SparkPod(
+      new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(),
Comment (Contributor): Do you need .withNewMetadata().endMetadata().withNewSpec().endSpec() here?

Comment (mccheah, PR author): Sort of. It allows everything that consumes one of these to use .editMetadata() or .editOrNewMetadata() when creating features. If you don't initialize the metadata and spec, and a downstream caller then invokes editMetadata(), we throw an NPE.
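A minimal sketch of the failure mode described above (illustrative; uses the fabric8 PodBuilder API):

import io.fabric8.kubernetes.api.model.PodBuilder

// A pod built without sections leaves metadata and spec null.
val bare = new PodBuilder().build()
// new PodBuilder(bare).editMetadata()  // NPE, per the explanation above

// Initializing empty sections up front makes editMetadata() safe downstream:
val initialized = new PodBuilder()
  .withNewMetadata().endMetadata()
  .withNewSpec().endSpec()
  .build()
val labeled = new PodBuilder(initialized)
  .editMetadata()
    .addToLabels("spark-role", "driver")
  .endMetadata()
  .build()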

Comment (Contributor): Got it.

Comment (reviewer): I am not sure about Spark code style, but shouldn't we put one call per line for builders? I.e.:

new PodBuilder()
  .withNewMetadata()
  .endMetadata()
  .withNewSpec()
  .endSpec()
  .build()

+      new ContainerBuilder().build())
+  }
+}