Commit fa88651

attilapiros authored and Dongjoon Hyun committed
[SPARK-40458][K8S] Bump Kubernetes Client Version to 6.1.1
### What changes were proposed in this pull request?

Bump the kubernetes-client version from 5.12.3 to 6.1.1 and clean up all the deprecations.

### Why are the changes needed?

To keep up with kubernetes-client [changes](fabric8io/kubernetes-client@v5.12.3...v6.1.1). As this is an upgrade where the major version changed, all the deprecations have been cleaned up as well.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

#### Unit tests

#### Manual tests for submit and application management

Started an application in a non-default namespace (`bla`):

```
➜ spark git:(SPARK-40458) ✗ ./bin/spark-submit \
    --master k8s://http://127.0.0.1:8001 \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.namespace=bla \
    --conf spark.kubernetes.container.image=docker.io/kubespark/spark:3.4.0-SNAPSHOT_064A99CC-57AF-46D5-B743-5B12692C260D \
    local:///opt/spark/examples/jars/spark-examples_2.12-3.4.0-SNAPSHOT.jar 200000
```

Check that we cannot find it in the default namespace, even with a glob, when no namespace is given:

```
➜ spark git:(SPARK-40458) ✗ minikube kubectl -- config set-context --current --namespace=default
Context "minikube" modified.
➜ spark git:(SPARK-40458) ✗ ./bin/spark-submit --status "spark-pi-*" --master k8s://http://127.0.0.1:8001
Submitting a request for the status of submission spark-pi-* in k8s://http://127.0.0.1:8001.
No applications found.
```

Then check that we can find it by specifying the namespace:

```
➜ spark git:(SPARK-40458) ✗ ./bin/spark-submit --status "bla:spark-pi-*" --master k8s://http://127.0.0.1:8001
Submitting a request for the status of submission bla:spark-pi-* in k8s://http://127.0.0.1:8001.
Application status (driver):
	 pod name: spark-pi-4c4e70837c86ae1a-driver
	 namespace: bla
	 labels: spark-app-name -> spark-pi, spark-app-selector -> spark-c95a9a0888214c01a286eb7ba23980a0, spark-role -> driver, spark-version -> 3.4.0-SNAPSHOT
	 pod uid: 0be8952e-3e00-47a3-9082-9cb45278ed6d
	 creation time: 2022-09-27T01:19:06Z
	 service account name: default
	 volumes: spark-local-dir-1, spark-conf-volume-driver, kube-api-access-wxnqw
	 node name: minikube
	 start time: 2022-09-27T01:19:06Z
	 phase: Running
	 container status:
		 container name: spark-kubernetes-driver
		 container image: kubespark/spark:3.4.0-SNAPSHOT_064A99CC-57AF-46D5-B743-5B12692C260D
		 container state: running
		 container started at: 2022-09-27T01:19:07Z
```

Changing the namespace to `bla` with `kubectl`:

```
➜ spark git:(SPARK-40458) ✗ minikube kubectl -- config set-context --current --namespace=bla
Context "minikube" modified.
```

Checking that we can find it without specifying the namespace (still using a glob):

```
➜ spark git:(SPARK-40458) ✗ ./bin/spark-submit --status "spark-pi-*" --master k8s://http://127.0.0.1:8001
Submitting a request for the status of submission spark-pi-* in k8s://http://127.0.0.1:8001.
Application status (driver):
	 pod name: spark-pi-4c4e70837c86ae1a-driver
	 namespace: bla
	 labels: spark-app-name -> spark-pi, spark-app-selector -> spark-c95a9a0888214c01a286eb7ba23980a0, spark-role -> driver, spark-version -> 3.4.0-SNAPSHOT
	 pod uid: 0be8952e-3e00-47a3-9082-9cb45278ed6d
	 creation time: 2022-09-27T01:19:06Z
	 service account name: default
	 volumes: spark-local-dir-1, spark-conf-volume-driver, kube-api-access-wxnqw
	 node name: minikube
	 start time: 2022-09-27T01:19:06Z
	 phase: Running
	 container status:
		 container name: spark-kubernetes-driver
		 container image: kubespark/spark:3.4.0-SNAPSHOT_064A99CC-57AF-46D5-B743-5B12692C260D
		 container state: running
		 container started at: 2022-09-27T01:19:07Z
```

Killing the app:

```
➜ spark git:(SPARK-40458) ✗ ./bin/spark-submit --kill "spark-pi-*" --master k8s://http://127.0.0.1:8001
Submitting a request to kill submission spark-pi-* in k8s://http://127.0.0.1:8001.
Grace period in secs: not set.
Deleting driver pod: spark-pi-4c4e70837c86ae1a-driver.
```

Closes #37990 from attilapiros/SPARK-40458.

Authored-by: attilapiros <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
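The `--status` and `--kill` invocations above accept an optional `namespace:` prefix followed by a pod-name glob whose trailing `*` acts as a prefix match. A minimal sketch of that matching logic (hypothetical helper names, not Spark's actual implementation):

```java
import java.util.List;
import java.util.stream.Collectors;

public class SubmissionGlob {
    // Split an optional "namespace:pod-glob" argument into {namespace or null, glob}.
    static String[] parse(String submissionId) {
        int colon = submissionId.indexOf(':');
        return colon < 0
            ? new String[] {null, submissionId}
            : new String[] {submissionId.substring(0, colon), submissionId.substring(colon + 1)};
    }

    // A trailing "*" means prefix match; anything else must match exactly.
    static boolean matches(String glob, String podName) {
        return glob.endsWith("*")
            ? podName.startsWith(glob.substring(0, glob.length() - 1))
            : podName.equals(glob);
    }

    public static void main(String[] args) {
        String[] parsed = parse("bla:spark-pi-*");
        List<String> pods = List.of("spark-pi-4c4e70837c86ae1a-driver", "other-app-driver");
        List<String> hits = pods.stream()
            .filter(p -> matches(parsed[1], p))
            .collect(Collectors.toList());
        System.out.println("namespace=" + parsed[0] + " matched=" + hits);
    }
}
```

With no `namespace:` prefix the lookup falls back to the client's current namespace, which is what the `kubectl config set-context` steps above are exercising.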
1 parent 03ef022 commit fa88651

23 files changed

Lines changed: 455 additions & 349 deletions

dev/deps/spark-deps-hadoop-2-hive-2.3

Lines changed: 24 additions & 23 deletions
```diff
@@ -24,7 +24,6 @@ arrow-memory-core/9.0.0//arrow-memory-core-9.0.0.jar
 arrow-memory-netty/9.0.0//arrow-memory-netty-9.0.0.jar
 arrow-vector/9.0.0//arrow-vector-9.0.0.jar
 audience-annotations/0.5.0//audience-annotations-0.5.0.jar
-automaton/1.11-8//automaton-1.11-8.jar
 avro-ipc/1.11.1//avro-ipc-1.11.1.jar
 avro-mapred/1.11.1//avro-mapred-1.11.1.jar
 avro/1.11.1//avro-1.11.1.jar
@@ -69,7 +68,6 @@ error_prone_annotations/2.10.0//error_prone_annotations-2.10.0.jar
 failureaccess/1.0.1//failureaccess-1.0.1.jar
 flatbuffers-java/1.12.0//flatbuffers-java-1.12.0.jar
 gcs-connector/hadoop2-2.2.7/shaded/gcs-connector-hadoop2-2.2.7-shaded.jar
-generex/1.0.2//generex-1.0.2.jar
 gmetric4j/1.0.10//gmetric4j-1.0.10.jar
 grpc-api/1.47.0//grpc-api-1.47.0.jar
 grpc-context/1.47.0//grpc-context-1.47.0.jar
@@ -175,27 +173,30 @@ jsr305/3.0.0//jsr305-3.0.0.jar
 jta/1.1//jta-1.1.jar
 jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
 kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
-kubernetes-client/5.12.3//kubernetes-client-5.12.3.jar
-kubernetes-model-admissionregistration/5.12.3//kubernetes-model-admissionregistration-5.12.3.jar
-kubernetes-model-apiextensions/5.12.3//kubernetes-model-apiextensions-5.12.3.jar
-kubernetes-model-apps/5.12.3//kubernetes-model-apps-5.12.3.jar
-kubernetes-model-autoscaling/5.12.3//kubernetes-model-autoscaling-5.12.3.jar
-kubernetes-model-batch/5.12.3//kubernetes-model-batch-5.12.3.jar
-kubernetes-model-certificates/5.12.3//kubernetes-model-certificates-5.12.3.jar
-kubernetes-model-common/5.12.3//kubernetes-model-common-5.12.3.jar
-kubernetes-model-coordination/5.12.3//kubernetes-model-coordination-5.12.3.jar
-kubernetes-model-core/5.12.3//kubernetes-model-core-5.12.3.jar
-kubernetes-model-discovery/5.12.3//kubernetes-model-discovery-5.12.3.jar
-kubernetes-model-events/5.12.3//kubernetes-model-events-5.12.3.jar
-kubernetes-model-extensions/5.12.3//kubernetes-model-extensions-5.12.3.jar
-kubernetes-model-flowcontrol/5.12.3//kubernetes-model-flowcontrol-5.12.3.jar
-kubernetes-model-metrics/5.12.3//kubernetes-model-metrics-5.12.3.jar
-kubernetes-model-networking/5.12.3//kubernetes-model-networking-5.12.3.jar
-kubernetes-model-node/5.12.3//kubernetes-model-node-5.12.3.jar
-kubernetes-model-policy/5.12.3//kubernetes-model-policy-5.12.3.jar
-kubernetes-model-rbac/5.12.3//kubernetes-model-rbac-5.12.3.jar
-kubernetes-model-scheduling/5.12.3//kubernetes-model-scheduling-5.12.3.jar
-kubernetes-model-storageclass/5.12.3//kubernetes-model-storageclass-5.12.3.jar
+kubernetes-client-api/6.1.1//kubernetes-client-api-6.1.1.jar
+kubernetes-client/6.1.1//kubernetes-client-6.1.1.jar
+kubernetes-httpclient-okhttp/6.1.1//kubernetes-httpclient-okhttp-6.1.1.jar
+kubernetes-model-admissionregistration/6.1.1//kubernetes-model-admissionregistration-6.1.1.jar
+kubernetes-model-apiextensions/6.1.1//kubernetes-model-apiextensions-6.1.1.jar
+kubernetes-model-apps/6.1.1//kubernetes-model-apps-6.1.1.jar
+kubernetes-model-autoscaling/6.1.1//kubernetes-model-autoscaling-6.1.1.jar
+kubernetes-model-batch/6.1.1//kubernetes-model-batch-6.1.1.jar
+kubernetes-model-certificates/6.1.1//kubernetes-model-certificates-6.1.1.jar
+kubernetes-model-common/6.1.1//kubernetes-model-common-6.1.1.jar
+kubernetes-model-coordination/6.1.1//kubernetes-model-coordination-6.1.1.jar
+kubernetes-model-core/6.1.1//kubernetes-model-core-6.1.1.jar
+kubernetes-model-discovery/6.1.1//kubernetes-model-discovery-6.1.1.jar
+kubernetes-model-events/6.1.1//kubernetes-model-events-6.1.1.jar
+kubernetes-model-extensions/6.1.1//kubernetes-model-extensions-6.1.1.jar
+kubernetes-model-flowcontrol/6.1.1//kubernetes-model-flowcontrol-6.1.1.jar
+kubernetes-model-gatewayapi/6.1.1//kubernetes-model-gatewayapi-6.1.1.jar
+kubernetes-model-metrics/6.1.1//kubernetes-model-metrics-6.1.1.jar
+kubernetes-model-networking/6.1.1//kubernetes-model-networking-6.1.1.jar
+kubernetes-model-node/6.1.1//kubernetes-model-node-6.1.1.jar
+kubernetes-model-policy/6.1.1//kubernetes-model-policy-6.1.1.jar
+kubernetes-model-rbac/6.1.1//kubernetes-model-rbac-6.1.1.jar
+kubernetes-model-scheduling/6.1.1//kubernetes-model-scheduling-6.1.1.jar
+kubernetes-model-storageclass/6.1.1//kubernetes-model-storageclass-6.1.1.jar
 lapack/3.0.2//lapack-3.0.2.jar
 leveldbjni-all/1.8//leveldbjni-all-1.8.jar
 libfb303/0.9.3//libfb303-0.9.3.jar
```

dev/deps/spark-deps-hadoop-3-hive-2.3

Lines changed: 24 additions & 23 deletions
```diff
@@ -23,7 +23,6 @@ arrow-memory-core/9.0.0//arrow-memory-core-9.0.0.jar
 arrow-memory-netty/9.0.0//arrow-memory-netty-9.0.0.jar
 arrow-vector/9.0.0//arrow-vector-9.0.0.jar
 audience-annotations/0.5.0//audience-annotations-0.5.0.jar
-automaton/1.11-8//automaton-1.11-8.jar
 avro-ipc/1.11.1//avro-ipc-1.11.1.jar
 avro-mapred/1.11.1//avro-mapred-1.11.1.jar
 avro/1.11.1//avro-1.11.1.jar
@@ -66,7 +65,6 @@ error_prone_annotations/2.10.0//error_prone_annotations-2.10.0.jar
 failureaccess/1.0.1//failureaccess-1.0.1.jar
 flatbuffers-java/1.12.0//flatbuffers-java-1.12.0.jar
 gcs-connector/hadoop3-2.2.7/shaded/gcs-connector-hadoop3-2.2.7-shaded.jar
-generex/1.0.2//generex-1.0.2.jar
 gmetric4j/1.0.10//gmetric4j-1.0.10.jar
 grpc-api/1.47.0//grpc-api-1.47.0.jar
 grpc-context/1.47.0//grpc-context-1.47.0.jar
@@ -159,27 +157,30 @@ jsr305/3.0.0//jsr305-3.0.0.jar
 jta/1.1//jta-1.1.jar
 jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
 kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
-kubernetes-client/5.12.3//kubernetes-client-5.12.3.jar
-kubernetes-model-admissionregistration/5.12.3//kubernetes-model-admissionregistration-5.12.3.jar
-kubernetes-model-apiextensions/5.12.3//kubernetes-model-apiextensions-5.12.3.jar
-kubernetes-model-apps/5.12.3//kubernetes-model-apps-5.12.3.jar
-kubernetes-model-autoscaling/5.12.3//kubernetes-model-autoscaling-5.12.3.jar
-kubernetes-model-batch/5.12.3//kubernetes-model-batch-5.12.3.jar
-kubernetes-model-certificates/5.12.3//kubernetes-model-certificates-5.12.3.jar
-kubernetes-model-common/5.12.3//kubernetes-model-common-5.12.3.jar
-kubernetes-model-coordination/5.12.3//kubernetes-model-coordination-5.12.3.jar
-kubernetes-model-core/5.12.3//kubernetes-model-core-5.12.3.jar
-kubernetes-model-discovery/5.12.3//kubernetes-model-discovery-5.12.3.jar
-kubernetes-model-events/5.12.3//kubernetes-model-events-5.12.3.jar
-kubernetes-model-extensions/5.12.3//kubernetes-model-extensions-5.12.3.jar
-kubernetes-model-flowcontrol/5.12.3//kubernetes-model-flowcontrol-5.12.3.jar
-kubernetes-model-metrics/5.12.3//kubernetes-model-metrics-5.12.3.jar
-kubernetes-model-networking/5.12.3//kubernetes-model-networking-5.12.3.jar
-kubernetes-model-node/5.12.3//kubernetes-model-node-5.12.3.jar
-kubernetes-model-policy/5.12.3//kubernetes-model-policy-5.12.3.jar
-kubernetes-model-rbac/5.12.3//kubernetes-model-rbac-5.12.3.jar
-kubernetes-model-scheduling/5.12.3//kubernetes-model-scheduling-5.12.3.jar
-kubernetes-model-storageclass/5.12.3//kubernetes-model-storageclass-5.12.3.jar
+kubernetes-client-api/6.1.1//kubernetes-client-api-6.1.1.jar
+kubernetes-client/6.1.1//kubernetes-client-6.1.1.jar
+kubernetes-httpclient-okhttp/6.1.1//kubernetes-httpclient-okhttp-6.1.1.jar
+kubernetes-model-admissionregistration/6.1.1//kubernetes-model-admissionregistration-6.1.1.jar
+kubernetes-model-apiextensions/6.1.1//kubernetes-model-apiextensions-6.1.1.jar
+kubernetes-model-apps/6.1.1//kubernetes-model-apps-6.1.1.jar
+kubernetes-model-autoscaling/6.1.1//kubernetes-model-autoscaling-6.1.1.jar
+kubernetes-model-batch/6.1.1//kubernetes-model-batch-6.1.1.jar
+kubernetes-model-certificates/6.1.1//kubernetes-model-certificates-6.1.1.jar
+kubernetes-model-common/6.1.1//kubernetes-model-common-6.1.1.jar
+kubernetes-model-coordination/6.1.1//kubernetes-model-coordination-6.1.1.jar
+kubernetes-model-core/6.1.1//kubernetes-model-core-6.1.1.jar
+kubernetes-model-discovery/6.1.1//kubernetes-model-discovery-6.1.1.jar
+kubernetes-model-events/6.1.1//kubernetes-model-events-6.1.1.jar
+kubernetes-model-extensions/6.1.1//kubernetes-model-extensions-6.1.1.jar
+kubernetes-model-flowcontrol/6.1.1//kubernetes-model-flowcontrol-6.1.1.jar
+kubernetes-model-gatewayapi/6.1.1//kubernetes-model-gatewayapi-6.1.1.jar
+kubernetes-model-metrics/6.1.1//kubernetes-model-metrics-6.1.1.jar
+kubernetes-model-networking/6.1.1//kubernetes-model-networking-6.1.1.jar
+kubernetes-model-node/6.1.1//kubernetes-model-node-6.1.1.jar
+kubernetes-model-policy/6.1.1//kubernetes-model-policy-6.1.1.jar
+kubernetes-model-rbac/6.1.1//kubernetes-model-rbac-6.1.1.jar
+kubernetes-model-scheduling/6.1.1//kubernetes-model-scheduling-6.1.1.jar
+kubernetes-model-storageclass/6.1.1//kubernetes-model-storageclass-6.1.1.jar
 lapack/3.0.2//lapack-3.0.2.jar
 leveldbjni-all/1.8//leveldbjni-all-1.8.jar
 libfb303/0.9.3//libfb303-0.9.3.jar
```

pom.xml

Lines changed: 1 addition & 1 deletion
```diff
@@ -216,7 +216,7 @@
     <arrow.version>9.0.0</arrow.version>
     <!-- org.fusesource.leveldbjni will be used except on arm64 platform. -->
     <leveldbjni.group>org.fusesource.leveldbjni</leveldbjni.group>
-    <kubernetes-client.version>5.12.3</kubernetes-client.version>
+    <kubernetes-client.version>6.1.1</kubernetes-client.version>

     <test.java.home>${java.home}</test.java.home>

```

resource-managers/kubernetes/core/pom.xml

Lines changed: 5 additions & 0 deletions
```diff
@@ -75,6 +75,11 @@
       <scope>test</scope>
     </dependency>

+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-httpclient-okhttp</artifactId>
+      <version>${kubernetes-client.version}</version>
+    </dependency>
     <dependency>
       <groupId>io.fabric8</groupId>
       <artifactId>kubernetes-client</artifactId>
```

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala

Lines changed: 5 additions & 2 deletions
```diff
@@ -21,7 +21,7 @@ import java.io.File
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.base.Charsets
 import com.google.common.io.Files
-import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
+import io.fabric8.kubernetes.client.{ConfigBuilder, KubernetesClient, KubernetesClientBuilder}
 import io.fabric8.kubernetes.client.Config.KUBERNETES_REQUEST_RETRY_BACKOFFLIMIT_SYSTEM_PROPERTY
 import io.fabric8.kubernetes.client.Config.autoConfigure
 import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory
@@ -115,7 +115,10 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
     }
     logDebug("Kubernetes client config: " +
       new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(config))
-    new DefaultKubernetesClient(factoryWithCustomDispatcher.createHttpClient(config), config)
+    new KubernetesClientBuilder()
+      .withHttpClientFactory(factoryWithCustomDispatcher)
+      .withConfig(config)
+      .build()
   }

   private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder)
```

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala

Lines changed: 13 additions & 24 deletions
```diff
@@ -19,9 +19,9 @@ package org.apache.spark.deploy.k8s.submit
 import scala.collection.JavaConverters._

 import K8SSparkSubmitOperation.getGracePeriod
-import io.fabric8.kubernetes.api.model.{Pod, PodList}
+import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.KubernetesClient
-import io.fabric8.kubernetes.client.dsl.{NonNamespaceOperation, PodResource}
+import io.fabric8.kubernetes.client.dsl.PodResource

 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkSubmitOperation
@@ -32,25 +32,23 @@ import org.apache.spark.deploy.k8s.KubernetesUtils.formatPodState
 import org.apache.spark.util.{CommandLineLoggingUtils, Utils}

 private sealed trait K8sSubmitOp extends CommandLineLoggingUtils {
-  type NON_NAMESPACED_PODS =
-    NonNamespaceOperation[Pod, PodList, PodResource[Pod]]
   def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
      (implicit client: KubernetesClient): Unit
   def executeOnGlob(pods: List[Pod], ns: Option[String], sparkConf: SparkConf)
      (implicit client: KubernetesClient): Unit
-  def listPodsInNameSpace(namespace: Option[String])
-     (implicit client: KubernetesClient): NON_NAMESPACED_PODS = {
+  def getPod(namespace: Option[String], name: String)
+     (implicit client: KubernetesClient): PodResource = {
     namespace match {
-      case Some(ns) => client.pods.inNamespace(ns)
-      case None => client.pods
+      case Some(ns) => client.pods.inNamespace(ns).withName(name)
+      case None => client.pods.withName(name)
     }
   }
 }

 private class KillApplication extends K8sSubmitOp {
   override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
      (implicit client: KubernetesClient): Unit = {
-    val podToDelete = listPodsInNameSpace(namespace).withName(pName)
+    val podToDelete = getPod(namespace, pName)

     if (Option(podToDelete).isDefined) {
       getGracePeriod(sparkConf) match {
@@ -66,19 +64,11 @@ private class KillApplication extends K8sSubmitOp {
      (implicit client: KubernetesClient): Unit = {
     if (pods.nonEmpty) {
       pods.foreach { pod => printMessage(s"Deleting driver pod: ${pod.getMetadata.getName}.") }
-      val listedPods = listPodsInNameSpace(namespace)
-
       getGracePeriod(sparkConf) match {
         case Some(period) =>
-          // this is not using the batch api because no option is provided
-          // when using the grace period.
-          pods.foreach { pod =>
-            listedPods
-              .withName(pod.getMetadata.getName)
-              .withGracePeriod(period)
-              .delete()
-          }
-        case _ => listedPods.delete(pods.asJava)
+          client.resourceList(pods.asJava).withGracePeriod(period).delete()
+        case _ =>
+          client.resourceList(pods.asJava).delete()
       }
     } else {
       printMessage("No applications found.")
@@ -89,7 +79,7 @@ private class KillApplication extends K8sSubmitOp {
 private class ListStatus extends K8sSubmitOp {
   override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
      (implicit client: KubernetesClient): Unit = {
-    val pod = listPodsInNameSpace(namespace).withName(pName).get()
+    val pod = getPod(namespace, pName).get()
     if (Option(pod).isDefined) {
       printMessage("Application status (driver): " +
         Option(pod).map(formatPodState).getOrElse("unknown."))
@@ -145,13 +135,12 @@ private[spark] class K8SSparkSubmitOperation extends SparkSubmitOperation
         .pods
     }
     val pods = ops
+      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
       .list()
       .getItems
       .asScala
       .filter { pod =>
-        val meta = pod.getMetadata
-        meta.getName.startsWith(pName.stripSuffix("*")) &&
-          meta.getLabels.get(SPARK_ROLE_LABEL) == SPARK_POD_DRIVER_ROLE
+        pod.getMetadata.getName.startsWith(pName.stripSuffix("*"))
       }.toList
     op.executeOnGlob(pods, namespace, sparkConf)
   } else {
```

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala

Lines changed: 5 additions & 3 deletions
```diff
@@ -149,7 +149,8 @@ private[spark] class Client(
     var watch: Watch = null
     var createdDriverPod: Pod = null
     try {
-      createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+      createdDriverPod =
+        kubernetesClient.pods().inNamespace(conf.namespace).resource(resolvedDriverPod).create()
     } catch {
       case NonFatal(e) =>
         kubernetesClient.resourceList(preKubernetesResources: _*).delete()
@@ -163,7 +164,7 @@ private[spark] class Client(
       kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace()
     } catch {
       case NonFatal(e) =>
-        kubernetesClient.pods().delete(createdDriverPod)
+        kubernetesClient.pods().resource(createdDriverPod).delete()
         kubernetesClient.resourceList(preKubernetesResources: _*).delete()
         throw e
     }
@@ -175,7 +176,7 @@ private[spark] class Client(
       kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
     } catch {
       case NonFatal(e) =>
-        kubernetesClient.pods().delete(createdDriverPod)
+        kubernetesClient.pods().resource(createdDriverPod).delete()
         throw e
     }

@@ -185,6 +186,7 @@ private[spark] class Client(
     while (true) {
       val podWithName = kubernetesClient
         .pods()
+        .inNamespace(conf.namespace)
         .withName(driverPodName)
       // Reset resource to old before we start the watch, this is important for race conditions
       watcher.reset()
```

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala

Lines changed: 13 additions & 3 deletions
```diff
@@ -76,6 +76,7 @@ class ExecutorPodsAllocator(

   val driverPod = kubernetesDriverPodName
     .map(name => Option(kubernetesClient.pods()
+      .inNamespace(namespace)
      .withName(name)
      .get())
     .getOrElse(throw new SparkException(
@@ -112,6 +113,7 @@ class ExecutorPodsAllocator(
       Utils.tryLogNonFatalError {
         kubernetesClient
           .pods()
+          .inNamespace(namespace)
           .withName(pod.getMetadata.getName)
           .waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
       }
@@ -185,6 +187,7 @@ class ExecutorPodsAllocator(
       Utils.tryLogNonFatalError {
         kubernetesClient
           .pods()
+          .inNamespace(namespace)
           .withLabel(SPARK_APP_ID_LABEL, applicationId)
           .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
           .withLabelIn(SPARK_EXECUTOR_ID_LABEL, timedOut.toSeq.map(_.toString): _*)
@@ -299,6 +302,7 @@ class ExecutorPodsAllocator(
     Utils.tryLogNonFatalError {
       kubernetesClient
         .pods()
+        .inNamespace(namespace)
         .withField("status.phase", "Pending")
        .withLabel(SPARK_APP_ID_LABEL, applicationId)
        .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
@@ -363,6 +367,7 @@ class ExecutorPodsAllocator(
     try {
       val createdPVCs = kubernetesClient
         .persistentVolumeClaims
+        .inNamespace(namespace)
         .withLabel("spark-app-selector", applicationId)
         .list()
         .getItems
@@ -406,7 +411,8 @@ class ExecutorPodsAllocator(
         .build()
       val resources = replacePVCsIfNeeded(
         podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
-      val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)
+      val createdExecutorPod =
+        kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
       try {
         addOwnerReference(createdExecutorPod, resources)
         resources
@@ -418,13 +424,16 @@ class ExecutorPodsAllocator(
           val pvc = resource.asInstanceOf[PersistentVolumeClaim]
           logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
             s"StorageClass ${pvc.getSpec.getStorageClassName}")
-          kubernetesClient.persistentVolumeClaims().create(pvc)
+          kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
         }
         newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
         logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
       } catch {
         case NonFatal(e) =>
-          kubernetesClient.pods().delete(createdExecutorPod)
+          kubernetesClient.pods()
+            .inNamespace(namespace)
+            .resource(createdExecutorPod)
+            .delete()
           throw e
       }
     }
@@ -475,6 +484,7 @@ class ExecutorPodsAllocator(
     Utils.tryLogNonFatalError {
       kubernetesClient
         .pods()
+        .inNamespace(namespace)
        .withLabel(SPARK_APP_ID_LABEL, applicationId)
        .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
        .delete()
```
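A recurring pattern in the ExecutorPodsAllocator changes is that every pod and PVC operation now names its namespace explicitly via `inNamespace(namespace)` rather than relying on whatever default the client was constructed with. The effective namespace can be modeled as an explicit-first fallback (a toy model, not the fabric8 API):

```java
import java.util.Optional;

public class NamespaceResolution {
    // Effective namespace for an operation: an explicitly supplied namespace
    // wins; otherwise fall back to the client's configured default.
    static String effectiveNamespace(Optional<String> explicit, String clientDefault) {
        return explicit.orElse(clientDefault);
    }

    public static void main(String[] args) {
        // Relying on the client default couples every call site to client
        // construction...
        System.out.println(effectiveNamespace(Optional.empty(), "default"));
        // ...while passing the namespace explicitly is unambiguous, which is
        // why the diff threads `namespace` through each operation.
        System.out.println(effectiveNamespace(Optional.of("bla"), "default"));
    }
}
```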
