ExecutorPodsWatchSnapshotSource.scala

@@ -25,6 +25,7 @@ import io.fabric8.kubernetes.client.Watcher.Action
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
 import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER
+import org.apache.spark.deploy.k8s.Config.KUBERNETES_NAMESPACE
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
@@ -46,6 +47,8 @@ class ExecutorPodsWatchSnapshotSource(
   private var watchConnection: Closeable = _
   private val enableWatching = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER)
 
+  private val namespace = conf.get(KUBERNETES_NAMESPACE)
+
   // If we're constructed with the old API get the SparkConf from the running SparkContext.
   def this(snapshotsStore: ExecutorPodsSnapshotsStore, kubernetesClient: KubernetesClient) = {
     this(snapshotsStore, kubernetesClient, SparkContext.getOrCreate().conf)
@@ -58,6 +61,7 @@ class ExecutorPodsWatchSnapshotSource(
logDebug(s"Starting to watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," +
s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.")
watchConnection = kubernetesClient.pods()
.inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.watch(new ExecutorPodsWatcher())
KubernetesClusterSchedulerBackend.scala

@@ -19,7 +19,6 @@ package org.apache.spark.scheduler.cluster.k8s
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.JavaConverters._
 import scala.concurrent.Future
 
 import io.fabric8.kubernetes.api.model.Pod
@@ -69,6 +68,8 @@ private[spark] class KubernetesClusterSchedulerBackend(

   private val defaultProfile = scheduler.sc.resourceProfileManager.defaultResourceProfile
 
+  private val namespace = conf.get(KUBERNETES_NAMESPACE)
+
   // Allow removeExecutor to be accessible by ExecutorPodsLifecycleEventHandler
   private[k8s] def doRemoveExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
     removeExecutor(executorId, reason)
@@ -77,15 +78,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private def setUpExecutorConfigMap(driverPod: Option[Pod]): Unit = {
     val configMapName = KubernetesClientUtils.configMapNameExecutor
     val resolvedExecutorProperties =
-      Map(KUBERNETES_NAMESPACE.key -> conf.get(KUBERNETES_NAMESPACE))
+      Map(KUBERNETES_NAMESPACE.key -> namespace)
     val confFilesMap = KubernetesClientUtils
       .buildSparkConfDirFilesMap(configMapName, conf, resolvedExecutorProperties) ++
       resolvedExecutorProperties
     val labels =
       Map(SPARK_APP_ID_LABEL -> applicationId(), SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE)
     val configMap = KubernetesClientUtils.buildConfigMap(configMapName, confFilesMap, labels)
     KubernetesUtils.addOwnerReference(driverPod.orNull, Seq(configMap))
-    kubernetesClient.configMaps().inAnyNamespace().resource(configMap).create()
+    kubernetesClient.configMaps().inNamespace(namespace).resource(configMap).create()
   }
 
   /**
@@ -136,6 +137,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     Utils.tryLogNonFatalError {
       kubernetesClient
         .services()
+        .inNamespace(namespace)
         .withLabel(SPARK_APP_ID_LABEL, applicationId())
         .delete()
     }
@@ -145,6 +147,7 @@
     Utils.tryLogNonFatalError {
       kubernetesClient
         .persistentVolumeClaims()
+        .inNamespace(namespace)
         .withLabel(SPARK_APP_ID_LABEL, applicationId())
         .delete()
     }
@@ -158,6 +161,7 @@
     Utils.tryLogNonFatalError {
       kubernetesClient
         .configMaps()
+        .inNamespace(namespace)
         .withLabel(SPARK_APP_ID_LABEL, applicationId())
         .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
         .delete()
@@ -193,22 +197,19 @@ private[spark] class KubernetesClusterSchedulerBackend(
     conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL).foreach { label =>
       val labelTask = new Runnable() {
         override def run(): Unit = Utils.tryLogNonFatalError {
-
-          val podsToLabel = kubernetesClient.pods()
+          kubernetesClient.pods()
+            .inNamespace(namespace)
             .withLabel(SPARK_APP_ID_LABEL, applicationId())
             .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
             .withLabelIn(SPARK_EXECUTOR_ID_LABEL, execIds: _*)
-            .list().getItems().asScala
-
-          podsToLabel.foreach { pod =>
-            kubernetesClient.pods()
-              .inNamespace(pod.getMetadata.getNamespace)
-              .withName(pod.getMetadata.getName)
-              .edit({p: Pod => new PodBuilder(p).editMetadata()
-                .addToLabels(label,
-                  conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE).getOrElse(""))
-                .endMetadata()
-                .build()})
+            .resources()
+            .forEach { podResource =>
+              podResource.edit({ p: Pod =>
+                new PodBuilder(p).editOrNewMetadata()
+                  .addToLabels(label,
+                    conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE).getOrElse(""))
+                  .endMetadata()
+                  .build()})
             }
           }
         }
Review comment from the PR author, attached to the `.resources()` line:

Adding the KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE label was earlier done in two phases:

  • Using `list().getItems().asScala`, which resulted in a Scala `List` of `Pod`.
  • Iterating over that list and fetching each `Pod` again one by one, by name (using `withName`) and namespace (using `inNamespace`).

Now, with `resources()` and `forEach`, the two phases are merged into a single DSL expression.
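A minimal sketch of the merged form, assuming the fabric8 kubernetes-client 6.x DSL; the object name, method name, and label key/values below are illustrative stand-ins, not code from this PR:

```scala
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient

object RelabelSketch {
  // resources() yields a java.util.stream.Stream[PodResource] for the pods that
  // matched the label filter, so each one can be edited in place; no second
  // lookup by name and namespace is required.
  def labelMatchingPods(client: KubernetesClient, namespace: String): Unit = {
    client.pods()
      .inNamespace(namespace)
      .withLabel("spark-app-selector", "spark-app-1") // illustrative label only
      .resources()
      .forEach { podResource =>
        podResource.edit({ p: Pod =>
          new PodBuilder(p).editOrNewMetadata()
            .addToLabels("decommissioning", "true")   // illustrative label only
            .endMetadata()
            .build()
        })
      }
  }
}
```

The design point: `resources()` hands back namespace-scoped `PodResource` handles straight from the label query, so the pod's name and namespace never need to be re-derived from its metadata.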

Expand Down Expand Up @@ -246,6 +247,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       override def run(): Unit = Utils.tryLogNonFatalError {
         val running = kubernetesClient
           .pods()
+          .inNamespace(namespace)
           .withField("status.phase", "Running")
           .withLabel(SPARK_APP_ID_LABEL, applicationId())
           .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
@@ -302,6 +304,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       override def run(): Unit = Utils.tryLogNonFatalError {
         // Label the pod with it's exec ID
         kubernetesClient.pods()
+          .inNamespace(namespace)
           .withName(x.podName)
           .edit({p: Pod => new PodBuilder(p).editMetadata()
             .addToLabels(SPARK_EXECUTOR_ID_LABEL, newId)
Fabric8Aliases.scala

@@ -19,14 +19,15 @@ package org.apache.spark.deploy.k8s
 import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapList, HasMetadata, PersistentVolumeClaim, PersistentVolumeClaimList, Pod, PodList}
 import io.fabric8.kubernetes.api.model.apps.StatefulSet
 import io.fabric8.kubernetes.api.model.apps.StatefulSetList
-import io.fabric8.kubernetes.client.dsl.{AnyNamespaceOperation, FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, NonNamespaceOperation, PodResource, Resource, RollableScalableResource}
+import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, NonNamespaceOperation, PodResource, Resource, RollableScalableResource}
 
 object Fabric8Aliases {
   type PODS = MixedOperation[Pod, PodList, PodResource]
+  type PODS_WITH_NAMESPACE = NonNamespaceOperation[Pod, PodList, PodResource]
   type CONFIG_MAPS = MixedOperation[
     ConfigMap, ConfigMapList, Resource[ConfigMap]]
-  type CONFIG_MAPS_OPERATION = AnyNamespaceOperation[ConfigMap, ConfigMapList, Resource[ConfigMap]]
+  type CONFIG_MAPS_WITH_NAMESPACE =
+    NonNamespaceOperation[ConfigMap, ConfigMapList, Resource[ConfigMap]]
   type CONFIG_MAPS_RESOURCE = Resource[ConfigMap]
   type LABELED_PODS = FilterWatchListDeletable[Pod, PodList, PodResource]
   type LABELED_CONFIG_MAPS = FilterWatchListDeletable[ConfigMap, ConfigMapList, Resource[ConfigMap]]
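The alias rename reflects that each step of the namespaced call chain has its own DSL type, which is why the suites below stub the chain one step at a time. A minimal sketch of that pattern, assuming fabric8 kubernetes-client 6.x types and Mockito; all value names here are illustrative:

```scala
import io.fabric8.kubernetes.api.model.{Pod, PodList}
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.{MixedOperation, NonNamespaceOperation, PodResource}
import org.mockito.Mockito.{mock, when}

object NamespaceMockChainSketch {
  // pods() returns a MixedOperation (aliased as PODS); inNamespace(...) narrows
  // it to a NonNamespaceOperation (aliased as PODS_WITH_NAMESPACE), so each
  // step of the chain needs its own mock.
  val client: KubernetesClient = mock(classOf[KubernetesClient])
  val pods: MixedOperation[Pod, PodList, PodResource] =
    mock(classOf[MixedOperation[Pod, PodList, PodResource]])
  val podsInNamespace: NonNamespaceOperation[Pod, PodList, PodResource] =
    mock(classOf[NonNamespaceOperation[Pod, PodList, PodResource]])

  when(client.pods()).thenReturn(pods)
  when(pods.inNamespace("default")).thenReturn(podsInNamespace)
}
```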
ExecutorPodsWatchSnapshotSourceSuite.scala

@@ -41,6 +41,9 @@ class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndA
   @Mock
   private var podOperations: PODS = _
 
+  @Mock
+  private var podsWithNamespace: PODS_WITH_NAMESPACE = _
+
   @Mock
   private var appIdLabeledPods: LABELED_PODS = _
 
@@ -58,7 +61,8 @@
     MockitoAnnotations.openMocks(this).close()
     watch = ArgumentCaptor.forClass(classOf[Watcher[Pod]])
     when(kubernetesClient.pods()).thenReturn(podOperations)
-    when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+    when(podOperations.inNamespace("default")).thenReturn(podsWithNamespace)
+    when(podsWithNamespace.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
       .thenReturn(appIdLabeledPods)
     when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
       .thenReturn(executorRoleLabeledPods)
KubernetesClusterSchedulerBackendSuite.scala

@@ -19,9 +19,9 @@ package org.apache.spark.scheduler.cluster.k8s
 import java.util.Arrays
 import java.util.concurrent.TimeUnit
 
-import io.fabric8.kubernetes.api.model.{ConfigMap, ObjectMeta, Pod, PodList}
+import io.fabric8.kubernetes.api.model.{ConfigMap, Pod, PodList}
 import io.fabric8.kubernetes.client.KubernetesClient
-import io.fabric8.kubernetes.client.dsl.{NonNamespaceOperation, PodResource}
+import io.fabric8.kubernetes.client.dsl.PodResource
 import org.jmock.lib.concurrent.DeterministicScheduler
 import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
 import org.mockito.ArgumentMatchers.{any, eq => mockitoEq}
@@ -66,14 +66,17 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   @Mock
   private var podOperations: PODS = _
 
+  @Mock
+  private var podsWithNamespace: PODS_WITH_NAMESPACE = _
+
   @Mock
   private var labeledPods: LABELED_PODS = _
 
   @Mock
   private var configMapsOperations: CONFIG_MAPS = _
 
   @Mock
-  private var configMap: CONFIG_MAPS_OPERATION = _
+  private var configMapsWithNamespace: CONFIG_MAPS_WITH_NAMESPACE = _
 
   @Mock
   private var configMapResource: CONFIG_MAPS_RESOURCE = _
@@ -123,9 +126,10 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
       driverEndpoint.capture()))
       .thenReturn(driverEndpointRef)
     when(kubernetesClient.pods()).thenReturn(podOperations)
+    when(podOperations.inNamespace("default")).thenReturn(podsWithNamespace)
     when(kubernetesClient.configMaps()).thenReturn(configMapsOperations)
-    when(configMapsOperations.inAnyNamespace()).thenReturn(configMap)
-    when(configMap.resource(any[ConfigMap]())).thenReturn(configMapResource)
+    when(configMapsOperations.inNamespace("default")).thenReturn(configMapsWithNamespace)
+    when(configMapsWithNamespace.resource(any[ConfigMap]())).thenReturn(configMapResource)
     when(podAllocator.driverPod).thenReturn(None)
     schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
       taskScheduler,
@@ -154,9 +158,9 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   }
 
   test("Stop all components") {
-    when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
+    when(podsWithNamespace.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
     when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
-    when(configMapsOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+    when(configMapsWithNamespace.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
       .thenReturn(labeledConfigMaps)
     when(labeledConfigMaps.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
       .thenReturn(labeledConfigMaps)
@@ -185,36 +189,14 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
test("Kill executors") {
schedulerBackendUnderTest.start()

val operation = mock(classOf[NonNamespaceOperation[
Pod, PodList, PodResource]])

when(podOperations.inNamespace(any())).thenReturn(operation)
when(podOperations.withField(any(), any())).thenReturn(labeledPods)
when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
when(podsWithNamespace.withField(any(), any())).thenReturn(labeledPods)
when(podsWithNamespace.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
when(labeledPods.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
when(labeledPods.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2")).thenReturn(labeledPods)

val pod1 = mock(classOf[Pod])
val pod1Metadata = mock(classOf[ObjectMeta])
when(pod1Metadata.getNamespace).thenReturn("coffeeIsLife")
when(pod1Metadata.getName).thenReturn("pod1")
when(pod1.getMetadata).thenReturn(pod1Metadata)

val pod2 = mock(classOf[Pod])
val pod2Metadata = mock(classOf[ObjectMeta])
when(pod2Metadata.getNamespace).thenReturn("coffeeIsLife")
when(pod2Metadata.getName).thenReturn("pod2")
when(pod2.getMetadata).thenReturn(pod2Metadata)

val pod1op = mock(classOf[PodResource])
val pod2op = mock(classOf[PodResource])
when(operation.withName("pod1")).thenReturn(pod1op)
when(operation.withName("pod2")).thenReturn(pod2op)

val podList = mock(classOf[PodList])
when(labeledPods.list()).thenReturn(podList)
when(podList.getItems()).thenReturn(Arrays.asList[Pod]())
when(labeledPods.resources()).thenReturn(Arrays.asList[PodResource]().stream)
schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
TimeUnit.MILLISECONDS)
verify(labeledPods, never()).delete()
@@ -235,7 +217,13 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     verify(pod2op, never()).edit(any(
       classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
 
-    when(podList.getItems()).thenReturn(Arrays.asList(pod1))
+    when(labeledPods.resources()).thenReturn(Arrays.asList(pod1op).stream)
+    val podList = mock(classOf[PodList])
+    when(labeledPods.list()).thenReturn(podList)
+    val pod1 = mock(classOf[Pod])
+    val pod2 = mock(classOf[Pod])
+    when(podList.getItems).thenReturn(Arrays.asList(pod1, pod2))
 
     schedulerBackendUnderTest.doKillExecutors(Seq("1", "2"))
     verify(labeledPods, never()).delete()
     schedulerExecutorService.runUntilIdle()