Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import io.fabric8.kubernetes.client.Watcher.Action
import scala.collection.JavaConverters._

import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
import org.apache.spark.internal.Logging

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,24 @@ import org.scalatest.concurrent.PatienceConfiguration
import org.scalatest.time.{Minutes, Seconds, Span}

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.kubernetes.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}

private[spark] class KubernetesSuite extends SparkFunSuite {
private var kubernetesTestClient: KubernetesTestClient = _
private var testBackend: IntegrationTestBackend = _
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If initialization is done in a separate method and not the constructor, then this can become a val and set inline here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm misunderstanding this. I still need to keep a reference to it to later cleanUp after the tests correct?

Copy link

@mccheah mccheah Apr 29, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right but this can be a val instead of a var that's set in beforeAll():

private val testBackend = IntegrationTestBackendFactory...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah! Got it. Thanks. Fixed.


override def beforeAll(): Unit = {
kubernetesTestClient = new KubernetesTestClient()
testBackend = IntegrationTestBackendFactory.getTestBackend()
testBackend.initialize()
}

override def afterAll(): Unit = {
kubernetesTestClient.cleanUp()
testBackend.cleanUp()
}

override def nestedSuites: scala.collection.immutable.IndexedSeq[Suite] = {
Vector(
new KubernetesV1Suite(kubernetesTestClient),
new KubernetesV2Suite(kubernetesTestClient))
new KubernetesV1Suite(testBackend),
new KubernetesV2Suite(testBackend))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.scalatest.concurrent.Eventually
import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
import org.apache.spark.deploy.rest.kubernetes.v1.HttpClientUtil

private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) {
Expand Down Expand Up @@ -109,45 +109,4 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl
field.setAccessible(false)
}
}
}

object KubernetesTestBackend extends Enumeration {
val SingleNode, MultiNode = Value
}

/**
* Creates and holds a Kubernetes client for executing tests.
* When master and driver/executor images are supplied, we return a client
* for that cluster. By default, we return a Minikube client
*/

class KubernetesTestClient {
var defaultClient: DefaultKubernetesClient = _
var testBackend: KubernetesTestBackend.Value = _

Option(System.getProperty("spark.kubernetes.test.master")).map {
master =>
var k8ConfBuilder = new ConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(resolveK8sMaster(master))
val k8ClientConfig = k8ConfBuilder.build
defaultClient = new DefaultKubernetesClient(k8ClientConfig)
testBackend = KubernetesTestBackend.MultiNode
}.getOrElse {
Minikube.startMinikube()
new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
defaultClient = Minikube.getKubernetesClient
testBackend = KubernetesTestBackend.SingleNode
}

def getClient(): DefaultKubernetesClient = {
defaultClient
}

def cleanUp(): Unit = {
if (testBackend == KubernetesTestBackend.SingleNode
&& !System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) {
Minikube.deleteMinikube()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,23 @@ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.deploy.kubernetes.SSLUtils
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.constants.{GCE_TEST_BACKEND, MINIKUBE_TEST_BACKEND}
import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1
import org.apache.spark.deploy.kubernetes.submit.v1.{Client, ExternalSuppliedUrisDriverServiceManager}
import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus}
import org.apache.spark.util.Utils

@DoNotDiscover
private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClient)
private[spark] class KubernetesV1Suite(testBackend: IntegrationTestBackend)
extends SparkFunSuite with BeforeAndAfter {

private var kubernetesTestComponents: KubernetesTestComponents = _
private var sparkConf: SparkConf = _

override def beforeAll(): Unit = {
kubernetesTestComponents = new KubernetesTestComponents(kubernetesTestClient.getClient())
kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
kubernetesTestComponents.createNamespace()
}

Expand Down Expand Up @@ -170,7 +172,7 @@ private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClien
}

test("Enable SSL on the driver submit server") {
assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

val (keyStoreFile, trustStoreFile) = SSLUtils.generateKeyStoreTrustStorePair(
Minikube.getMinikubeIp,
Expand All @@ -192,7 +194,7 @@ private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClien
}

test("Enable SSL on the driver submit server using PEM files") {
assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

val (keyPem, certPem) = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp)
sparkConf.set(DRIVER_SUBMIT_SSL_KEY_PEM, s"file://${keyPem.getAbsolutePath}")
Expand All @@ -207,7 +209,7 @@ private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClien
}

test("Added files should exist on the driver.") {
assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

sparkConf.set("spark.files", KubernetesSuite.TEST_EXISTENCE_FILE.getAbsolutePath)
sparkConf.setAppName("spark-file-existence-test")
Expand Down Expand Up @@ -265,7 +267,7 @@ private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClien
}

test("Use external URI provider") {
assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

val externalUriProviderWatch =
new ExternalUriProviderWatch(kubernetesTestComponents.kubernetesClient)
Expand Down Expand Up @@ -298,7 +300,7 @@ private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClien
}

test("Mount the Kubernetes credentials onto the driver pod") {
assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

sparkConf.set(KUBERNETES_DRIVER_CA_CERT_FILE,
kubernetesTestComponents.clientConfig.getCaCertFile)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ import java.util.UUID
import org.scalatest.{BeforeAndAfter, DoNotDiscover}
import org.scalatest.concurrent.Eventually

import org.apache.spark.{SSLOptions, SparkConf, SparkFunSuite}
import org.apache.spark._
import org.apache.spark.deploy.kubernetes.SSLUtils
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND
import org.apache.spark.deploy.kubernetes.submit.v2.{MountedDependencyManagerProviderImpl, SubmissionKubernetesClientProviderImpl}

@DoNotDiscover
private[spark] class KubernetesV2Suite(kubernetesTestClient: KubernetesTestClient)
private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend)
extends SparkFunSuite with BeforeAndAfter {

private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "")
Expand All @@ -37,7 +39,7 @@ private[spark] class KubernetesV2Suite(kubernetesTestClient: KubernetesTestClien
private var resourceStagingServerLauncher: ResourceStagingServerLauncher = _

override def beforeAll(): Unit = {
kubernetesTestComponents = new KubernetesTestComponents(kubernetesTestClient.getClient())
kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
resourceStagingServerLauncher = new ResourceStagingServerLauncher(
kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace))
}
Expand All @@ -55,14 +57,14 @@ private[spark] class KubernetesV2Suite(kubernetesTestClient: KubernetesTestClien
}

test("Use submission v2.") {
assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

launchStagingServer(SSLOptions())
runSparkAppAndVerifyCompletion(KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE)
}

test("Enable SSL on the submission server") {
assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

val (keyStore, trustStore) = SSLUtils.generateKeyStoreTrustStorePair(
ipAddress = Minikube.getMinikubeIp,
Expand All @@ -86,7 +88,7 @@ private[spark] class KubernetesV2Suite(kubernetesTestClient: KubernetesTestClien
}

test("Use container-local resources without the resource staging server") {
assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

sparkConf.setJars(Seq(
KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE,
Expand All @@ -95,7 +97,7 @@ private[spark] class KubernetesV2Suite(kubernetesTestClient: KubernetesTestClien
}

private def launchStagingServer(resourceStagingServerSslOptions: SSLOptions): Unit = {
assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

val resourceStagingServerPort = resourceStagingServerLauncher.launchStagingServer(
resourceStagingServerSslOptions)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.integrationtest.backend.GCE

import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}

import org.apache.spark.deploy.kubernetes.config.resolveK8sMaster
import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
import org.apache.spark.deploy.kubernetes.integrationtest.constants.GCE_TEST_BACKEND

private[spark] class GCETestBackend(val master: String) extends IntegrationTestBackend {
private var defaultClient: DefaultKubernetesClient = _
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the GCE case it's probably fine to initialize in the constructor - create a KubernetesClient should be fairly lightweight. What we wanted to avoid in the Minikube case was to have side effects in the constructor.


override def initialize(): Unit = {
var k8ConfBuilder = new ConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(resolveK8sMaster(master))
defaultClient = new DefaultKubernetesClient(k8ConfBuilder.build)
}

override def getKubernetesClient(): DefaultKubernetesClient = {
defaultClient
}

override def name(): String = GCE_TEST_BACKEND
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.kubernetes.integrationtest.backend

import io.fabric8.kubernetes.client.DefaultKubernetesClient

import org.apache.spark.deploy.kubernetes.integrationtest.backend.GCE.GCETestBackend
import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.{Minikube, MinikubeTestBackend}
import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder

private[spark] trait IntegrationTestBackend {
def name(): String
def initialize(): Unit
def getKubernetesClient(): DefaultKubernetesClient
def cleanUp(): Unit = {}
}

private[spark] object IntegrationTestBackendFactory {
def getTestBackend(): IntegrationTestBackend = {
Option(System.getProperty("spark.kubernetes.test.master")).map {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pass the system property's value into the constructor of GCEBackend, and GCEBackend shouldn't look up the system property itself.

master =>
new GCETestBackend(master)
}.getOrElse {
new MinikubeTestBackend()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.integrationtest.minikube
package org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube

import java.nio.file.Paths

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube

import io.fabric8.kubernetes.client.DefaultKubernetesClient

import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND
import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder

private[spark] class MinikubeTestBackend extends IntegrationTestBackend {
private var defaultClient: DefaultKubernetesClient = _

override def initialize(): Unit = {
Minikube.startMinikube()
new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
defaultClient = Minikube.getKubernetesClient
}

override def getKubernetesClient(): DefaultKubernetesClient = {
defaultClient
}

override def cleanUp(): Unit = {
if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) {
Minikube.deleteMinikube()
}
}

override def name(): String = MINIKUBE_TEST_BACKEND


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.integrationtest

package object constants {
val MINIKUBE_TEST_BACKEND = "minikube"
val GCE_TEST_BACKEND = "gce"
}