Commit 571a6f0
[SPARK-23146][K8S] Support client mode.
## What changes were proposed in this pull request?

Support client mode for the Kubernetes scheduler. Client mode works more or less identically to cluster mode. However, in client mode, the Spark Context needs to be manually bootstrapped with certain properties which would have otherwise been set up by spark-submit in cluster mode. Specifically:

- If the user doesn't provide a driver pod name, we don't add an owner reference. This is for usage when the driver is not running in a pod in the cluster. In such a case, the driver can only make a best effort to clean up the executors when it exits, but cleaning up the resources is not guaranteed. The executor JVMs should exit if the driver JVM exits, but the pods will still remain in the cluster in a COMPLETED or FAILED state.
- The user must provide a host (`spark.driver.host`) and port (`spark.driver.port`) that the executors can connect to. When using spark-submit in cluster mode, spark-submit generates the headless service automatically; in client mode, the user is responsible for setting up their own connectivity.

We also change the authentication configuration prefixes for client mode.

## How was this patch tested?

An integration test was added to exercise client mode support.

Author: mcheah <mcheah@palantir.com>

Closes #21748 from mccheah/k8s-client-mode.
1 parent c44eb56 commit 571a6f0

17 files changed

Lines changed: 290 additions & 76 deletions

docs/running-on-kubernetes.md

Lines changed: 110 additions & 28 deletions
```diff
@@ -117,6 +117,45 @@ If the local proxy is running at localhost:8001, `--master k8s://http://127.0.0.
 spark-submit. Finally, notice that in the above example we specify a jar with a specific URI with a scheme of `local://`.
 This URI is the location of the example jar that is already in the Docker image.
 
+## Client Mode
+
+Starting with Spark 2.4.0, it is possible to run Spark applications on Kubernetes in client mode. When your application
+runs in client mode, the driver can run inside a pod or on a physical host. When running an application in client mode,
+it is recommended to account for the following factors:
+
+### Client Mode Networking
+
+Spark executors must be able to connect to the Spark driver over a hostname and a port that is routable from the Spark
+executors. The specific network configuration required for Spark to work in client mode will vary per setup. If you run
+your driver inside a Kubernetes pod, you can use a
+[headless service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services) to allow your
+driver pod to be routable from the executors by a stable hostname. When deploying your headless service, ensure that
+the service's label selector will only match the driver pod and no other pods; it is recommended to assign your driver
+pod a sufficiently unique label and to use that label in the label selector of the headless service. Specify the driver's
+hostname via `spark.driver.host` and the driver's port via `spark.driver.port`.
+
+### Client Mode Executor Pod Garbage Collection
+
+If you run your Spark driver in a pod, it is highly recommended to set `spark.kubernetes.driver.pod.name` to the name of
+that pod. When this property is set, the Spark scheduler will deploy the executor pods with an
+[OwnerReference](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/), which in turn will
+ensure that once the driver pod is deleted from the cluster, all of the application's executor pods will also be deleted.
+The driver will look for a pod with the given name in the namespace specified by `spark.kubernetes.namespace`, and
+an OwnerReference pointing to that pod will be added to each executor pod's OwnerReferences list. Be careful to avoid
+setting the OwnerReference to a pod that is not actually the driver pod, or else the executors may be terminated
+prematurely when the wrong pod is deleted.
+
+If your application is not running inside a pod, or if `spark.kubernetes.driver.pod.name` is not set when your application
+is actually running in a pod, keep in mind that the executor pods may not be properly deleted from the cluster when the
+application exits. The Spark scheduler attempts to delete these pods, but if the network request to the API server fails
+for any reason, these pods will remain in the cluster. The executor processes should exit when they cannot reach the
+driver, so the executor pods should not consume compute resources (CPU and memory) in the cluster after your application
+exits.
+
+### Authentication Parameters
+
+Use the exact prefix `spark.kubernetes.authenticate` for Kubernetes authentication parameters in client mode.
+
 ## Dependency Management
 
 If your application's dependencies are all hosted in remote locations like HDFS or HTTP servers, they may be referred to
```
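The client-mode bootstrap described above can be sketched as plain Scala that assembles the properties a client-mode driver must set itself. This is a hedged illustration, not code from this commit; the service hostname, port, and pod name are hypothetical placeholders.

```scala
// Sketch: properties a client-mode driver must set manually, since spark-submit
// only generates these automatically in cluster mode. Hostname/pod name below
// are hypothetical placeholders.
def clientModeConf(
    driverHost: String,          // hostname routable from the executors, e.g. a headless service
    driverPort: Int,
    driverPodName: Option[String]): Map[String, String] = {
  val base = Map(
    "spark.master" -> "k8s://https://kubernetes.default.svc",
    "spark.submit.deployMode" -> "client",
    "spark.driver.host" -> driverHost,
    "spark.driver.port" -> driverPort.toString)
  // Only set the pod name when the driver actually runs in a pod;
  // this is what enables executor pod garbage collection.
  base ++ driverPodName.map("spark.kubernetes.driver.pod.name" -> _)
}

val conf = clientModeConf("spark-driver-svc.default.svc", 7078, Some("my-driver-pod"))
```

Leaving `driverPodName` as `None` models the "driver outside the cluster" case, where no owner reference is added and executor pod cleanup is best effort.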
```diff
@@ -258,10 +297,6 @@ RBAC authorization and how to configure Kubernetes service accounts for pods, pl
 [Using RBAC Authorization](https://kubernetes.io/docs/admin/authorization/rbac/) and
 [Configure Service Accounts for Pods](https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/).
 
-## Client Mode
-
-Client mode is not currently supported.
-
 ## Future Work
 
 There are several Spark on Kubernetes features that are currently being incubated in a fork -
@@ -354,7 +389,7 @@ specific to Spark on Kubernetes.
   <td>
     Path to the CA cert file for connecting to the Kubernetes API server over TLS when starting the driver. This file
     must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide
-    a scheme).
+    a scheme). In client mode, use <code>spark.kubernetes.authenticate.caCertFile</code> instead.
   </td>
 </tr>
 <tr>
@@ -363,7 +398,7 @@ specific to Spark on Kubernetes.
   <td>
     Path to the client key file for authenticating against the Kubernetes API server when starting the driver. This file
     must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide
-    a scheme).
+    a scheme). In client mode, use <code>spark.kubernetes.authenticate.clientKeyFile</code> instead.
   </td>
 </tr>
 <tr>
@@ -372,7 +407,7 @@ specific to Spark on Kubernetes.
   <td>
     Path to the client cert file for authenticating against the Kubernetes API server when starting the driver. This
     file must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not
-    provide a scheme).
+    provide a scheme). In client mode, use <code>spark.kubernetes.authenticate.clientCertFile</code> instead.
   </td>
 </tr>
 <tr>
@@ -381,7 +416,7 @@ specific to Spark on Kubernetes.
   <td>
     OAuth token to use when authenticating against the Kubernetes API server when starting the driver. Note
     that unlike the other authentication options, this is expected to be the exact string value of the token to use for
-    the authentication.
+    the authentication. In client mode, use <code>spark.kubernetes.authenticate.oauthToken</code> instead.
   </td>
 </tr>
 <tr>
@@ -390,7 +425,7 @@ specific to Spark on Kubernetes.
   <td>
     Path to the OAuth token file containing the token to use when authenticating against the Kubernetes API server when starting the driver.
     This file must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not
-    provide a scheme).
+    provide a scheme). In client mode, use <code>spark.kubernetes.authenticate.oauthTokenFile</code> instead.
   </td>
 </tr>
 <tr>
@@ -399,18 +434,18 @@ specific to Spark on Kubernetes.
   <td>
     Path to the CA cert file for connecting to the Kubernetes API server over TLS from the driver pod when requesting
     executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod.
-    Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
+    Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use
+    <code>spark.kubernetes.authenticate.caCertFile</code> instead.
   </td>
 </tr>
 <tr>
   <td><code>spark.kubernetes.authenticate.driver.clientKeyFile</code></td>
   <td>(none)</td>
   <td>
     Path to the client key file for authenticating against the Kubernetes API server from the driver pod when requesting
-    executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod.
-    Specify this as a path as opposed to a URI (i.e. do not provide a scheme). If this is specified, it is highly
-    recommended to set up TLS for the driver submission server, as this value is sensitive information that would be
-    passed to the driver pod in plaintext otherwise.
+    executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod as
+    a Kubernetes secret. Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
+    In client mode, use <code>spark.kubernetes.authenticate.clientKeyFile</code> instead.
   </td>
 </tr>
 <tr>
@@ -419,7 +454,8 @@ specific to Spark on Kubernetes.
   <td>
     Path to the client cert file for authenticating against the Kubernetes API server from the driver pod when
     requesting executors. This file must be located on the submitting machine's disk, and will be uploaded to the
-    driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
+    driver pod as a Kubernetes secret. Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
+    In client mode, use <code>spark.kubernetes.authenticate.clientCertFile</code> instead.
   </td>
 </tr>
 <tr>
@@ -428,9 +464,8 @@ specific to Spark on Kubernetes.
   <td>
     OAuth token to use when authenticating against the Kubernetes API server from the driver pod when
     requesting executors. Note that unlike the other authentication options, this must be the exact string value of
-    the token to use for the authentication. This token value is uploaded to the driver pod. If this is specified, it is
-    highly recommended to set up TLS for the driver submission server, as this value is sensitive information that would
-    be passed to the driver pod in plaintext otherwise.
+    the token to use for the authentication. This token value is uploaded to the driver pod as a Kubernetes secret.
+    In client mode, use <code>spark.kubernetes.authenticate.oauthToken</code> instead.
   </td>
 </tr>
 <tr>
@@ -439,9 +474,8 @@ specific to Spark on Kubernetes.
   <td>
     Path to the OAuth token file containing the token to use when authenticating against the Kubernetes API server from the driver pod when
     requesting executors. Note that unlike the other authentication options, this file must contain the exact string value of
-    the token to use for the authentication. This token value is uploaded to the driver pod. If this is specified, it is
-    highly recommended to set up TLS for the driver submission server, as this value is sensitive information that would
-    be passed to the driver pod in plaintext otherwise.
+    the token to use for the authentication. This token value is uploaded to the driver pod as a secret. In client mode, use
+    <code>spark.kubernetes.authenticate.oauthTokenFile</code> instead.
   </td>
 </tr>
 <tr>
@@ -450,7 +484,8 @@ specific to Spark on Kubernetes.
   <td>
     Path to the CA cert file for connecting to the Kubernetes API server over TLS from the driver pod when requesting
     executors. This path must be accessible from the driver pod.
-    Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
+    Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use
+    <code>spark.kubernetes.authenticate.caCertFile</code> instead.
   </td>
 </tr>
 <tr>
@@ -459,7 +494,8 @@ specific to Spark on Kubernetes.
   <td>
     Path to the client key file for authenticating against the Kubernetes API server from the driver pod when requesting
     executors. This path must be accessible from the driver pod.
-    Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
+    Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use
+    <code>spark.kubernetes.authenticate.clientKeyFile</code> instead.
   </td>
 </tr>
 <tr>
@@ -468,7 +504,8 @@ specific to Spark on Kubernetes.
   <td>
     Path to the client cert file for authenticating against the Kubernetes API server from the driver pod when
     requesting executors. This path must be accessible from the driver pod.
-    Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
+    Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use
+    <code>spark.kubernetes.authenticate.clientCertFile</code> instead.
  </td>
 </tr>
 <tr>
@@ -477,7 +514,8 @@ specific to Spark on Kubernetes.
   <td>
     Path to the file containing the OAuth token to use when authenticating against the Kubernetes API server from the driver pod when
     requesting executors. This path must be accessible from the driver pod.
-    Note that unlike the other authentication options, this file must contain the exact string value of the token to use for the authentication.
+    Note that unlike the other authentication options, this file must contain the exact string value of the token to use
+    for the authentication. In client mode, use <code>spark.kubernetes.authenticate.oauthTokenFile</code> instead.
   </td>
 </tr>
 <tr>
@@ -486,7 +524,48 @@ specific to Spark on Kubernetes.
   <td>
     Service account that is used when running the driver pod. The driver pod uses this service account when requesting
     executor pods from the API server. Note that this cannot be specified alongside a CA cert file, client key file,
-    client cert file, and/or OAuth token.
+    client cert file, and/or OAuth token. In client mode, use <code>spark.kubernetes.authenticate.serviceAccountName</code> instead.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.authenticate.caCertFile</code></td>
+  <td>(none)</td>
+  <td>
+    In client mode, path to the CA cert file for connecting to the Kubernetes API server over TLS when
+    requesting executors. Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.authenticate.clientKeyFile</code></td>
+  <td>(none)</td>
+  <td>
+    In client mode, path to the client key file for authenticating against the Kubernetes API server
+    when requesting executors. Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.authenticate.clientCertFile</code></td>
+  <td>(none)</td>
+  <td>
+    In client mode, path to the client cert file for authenticating against the Kubernetes API server
+    when requesting executors. Specify this as a path as opposed to a URI (i.e. do not provide a scheme).
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.authenticate.oauthToken</code></td>
+  <td>(none)</td>
+  <td>
+    In client mode, the OAuth token to use when authenticating against the Kubernetes API server when
+    requesting executors. Note that unlike the other authentication options, this must be the exact string value of
+    the token to use for the authentication.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.authenticate.oauthTokenFile</code></td>
+  <td>(none)</td>
+  <td>
+    In client mode, path to the file containing the OAuth token to use when authenticating against the Kubernetes API
+    server when requesting executors.
   </td>
 </tr>
 <tr>
@@ -529,8 +608,11 @@ specific to Spark on Kubernetes.
   <td><code>spark.kubernetes.driver.pod.name</code></td>
   <td>(none)</td>
   <td>
-    Name of the driver pod. If not set, the driver pod name is set to "spark.app.name" suffixed by the current timestamp
-    to avoid name conflicts.
+    Name of the driver pod. In cluster mode, if this is not set, the driver pod name is set to "spark.app.name"
+    suffixed by the current timestamp to avoid name conflicts. In client mode, if your application is running
+    inside a pod, it is highly recommended to set this to the name of the pod your driver is running in. Setting this
+    value in client mode allows the driver to become the owner of its executor pods, which in turn allows the executor
+    pods to be garbage collected by the cluster.
   </td>
 </tr>
 <tr>
```

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -65,6 +65,7 @@ private[spark] object Config extends Logging {
     "spark.kubernetes.authenticate.driver"
   val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
     "spark.kubernetes.authenticate.driver.mounted"
+  val KUBERNETES_AUTH_CLIENT_MODE_PREFIX = "spark.kubernetes.authenticate"
   val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
   val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
   val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
@@ -90,7 +91,7 @@ private[spark] object Config extends Logging {
     ConfigBuilder("spark.kubernetes.submitInDriver")
       .internal()
       .booleanConf
-      .createOptional
+      .createWithDefault(false)
 
   val KUBERNETES_EXECUTOR_LIMIT_CORES =
     ConfigBuilder("spark.kubernetes.executor.limit.cores")
```
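The new `KUBERNETES_AUTH_CLIENT_MODE_PREFIX` gives client mode its own bare prefix alongside the existing driver prefixes. A minimal sketch (not the actual Spark resolution code) of how a prefix and a shared suffix compose into a full property name:

```scala
// Sketch: the prefix constants from Config.scala, combined with a shared
// suffix to produce the full configuration key for each mode.
val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver"
val KUBERNETES_AUTH_CLIENT_MODE_PREFIX = "spark.kubernetes.authenticate"
val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"

// Build the full property name for a given mode's prefix.
def authKey(prefix: String, suffix: String): String = s"$prefix.$suffix"
```

For example, the same `oauthTokenFile` suffix yields `spark.kubernetes.authenticate.driver.oauthTokenFile` in cluster mode and `spark.kubernetes.authenticate.oauthTokenFile` in client mode.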

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -43,7 +43,7 @@ private[spark] case class KubernetesDriverSpecificConf(
  */
 private[spark] case class KubernetesExecutorSpecificConf(
     executorId: String,
-    driverPod: Pod)
+    driverPod: Option[Pod])
   extends KubernetesRoleSpecificConf
 
 /**
@@ -186,7 +186,7 @@ private[spark] object KubernetesConf {
       sparkConf: SparkConf,
       executorId: String,
       appId: String,
-      driverPod: Pod): KubernetesConf[KubernetesExecutorSpecificConf] = {
+      driverPod: Option[Pod]): KubernetesConf[KubernetesExecutorSpecificConf] = {
     val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
       sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
     require(
```

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -58,4 +58,6 @@ private[spark] object KubernetesUtils {
       case _ => uri
     }
   }
+
+  def parseMasterUrl(url: String): String = url.substring("k8s://".length)
 }
```
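The new helper simply strips the `k8s://` scheme that SparkSubmit has already validated. A standalone copy with a usage example (the API server URL is a placeholder):

```scala
// Standalone copy of the helper added above: drop the "k8s://" prefix,
// leaving the Kubernetes API server URL.
def parseMasterUrl(url: String): String = url.substring("k8s://".length)

val master = parseMasterUrl("k8s://https://kubernetes.example.com:6443")
// master == "https://kubernetes.example.com:6443"
```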

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala

Lines changed: 9 additions & 8 deletions
```diff
@@ -152,19 +152,20 @@ private[spark] class BasicExecutorFeatureStep(
         .build()
     }.getOrElse(executorContainer)
     val driverPod = kubernetesConf.roleSpecificConf.driverPod
+    val ownerReference = driverPod.map(pod =>
+      new OwnerReferenceBuilder()
+        .withController(true)
+        .withApiVersion(pod.getApiVersion)
+        .withKind(pod.getKind)
+        .withName(pod.getMetadata.getName)
+        .withUid(pod.getMetadata.getUid)
+        .build())
     val executorPod = new PodBuilder(pod.pod)
       .editOrNewMetadata()
         .withName(name)
         .withLabels(kubernetesConf.roleLabels.asJava)
         .withAnnotations(kubernetesConf.roleAnnotations.asJava)
-        .withOwnerReferences()
-        .addNewOwnerReference()
-          .withController(true)
-          .withApiVersion(driverPod.getApiVersion)
-          .withKind(driverPod.getKind)
-          .withName(driverPod.getMetadata.getName)
-          .withUid(driverPod.getMetadata.getUid)
-          .endOwnerReference()
+        .addToOwnerReferences(ownerReference.toSeq: _*)
         .endMetadata()
       .editOrNewSpec()
         .withHostname(hostname)
```
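The key change above is that `driverPod` is now an `Option[Pod]`, and `addToOwnerReferences(ownerReference.toSeq: _*)` attaches either zero or one references. A simplified model of that logic, with plain case classes standing in for the fabric8 `Pod` and `OwnerReference` types (those stand-ins are assumptions for illustration, not the real API):

```scala
// Simplified stand-ins for the fabric8 model classes.
case class Pod(apiVersion: String, kind: String, name: String, uid: String)
case class OwnerReference(apiVersion: String, kind: String, name: String, uid: String)

// Mirror of the new logic: when there is no driver pod (client mode with the
// driver outside the cluster), Option.toSeq is empty, so no owner reference
// is attached and executor pods are not garbage collected with the driver.
def ownerReferences(driverPod: Option[Pod]): Seq[OwnerReference] =
  driverPod.map(p => OwnerReference(p.apiVersion, p.kind, p.name, p.uid)).toSeq
```

This is why setting `spark.kubernetes.driver.pod.name` in client mode matters: it is the only way the executor pods gain an owner and become eligible for cluster garbage collection.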

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkApplication
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkKubernetesClientFactory}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkKubernetesClientFactory}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
@@ -228,7 +228,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     val namespace = kubernetesConf.namespace()
     // The master URL has been checked for validity already in SparkSubmit.
     // We just need to get rid of the "k8s://" prefix here.
-    val master = sparkConf.get("spark.master").substring("k8s://".length)
+    val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
     val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None
 
     val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
```

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala

Lines changed: 9 additions & 4 deletions
```diff
@@ -46,13 +46,18 @@ private[spark] class ExecutorPodsAllocator(
 
   private val podCreationTimeout = math.max(podAllocationDelay * 5, 60000)
 
+  private val namespace = conf.get(KUBERNETES_NAMESPACE)
+
   private val kubernetesDriverPodName = conf
     .get(KUBERNETES_DRIVER_POD_NAME)
-    .getOrElse(throw new SparkException("Must specify the driver pod name"))
 
-  private val driverPod = kubernetesClient.pods()
-    .withName(kubernetesDriverPodName)
-    .get()
+  private val driverPod = kubernetesDriverPodName
+    .map(name => Option(kubernetesClient.pods()
+      .withName(name)
+      .get())
+      .getOrElse(throw new SparkException(
+        s"No pod was found named $kubernetesDriverPodName in the cluster in the " +
+        s"namespace $namespace (this was supposed to be the driver pod.).")))
 
   // Executor IDs that have been requested from Kubernetes but have not been detected in any
   // snapshot yet. Mapped to the timestamp when they were created.
```
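The allocator now tolerates a missing `spark.kubernetes.driver.pod.name`, but still fails fast when a name is given and no matching pod exists. A sketch of that lookup pattern with the Kubernetes client replaced by a plain `Map` (the fake cluster and exception type are assumptions for illustration):

```scala
// Fake "cluster": pod name -> pod UID. Stands in for kubernetesClient.pods().
val podsInCluster = Map("my-driver-pod" -> "uid-123")

// Mirror of the new behavior: no name configured -> None (client mode, driver
// outside the cluster); name configured but pod missing -> fail fast.
def lookupDriverPod(driverPodName: Option[String]): Option[String] =
  driverPodName.map { name =>
    podsInCluster.getOrElse(name,
      throw new IllegalStateException(
        s"No pod was found named $name in the cluster (this was supposed to be the driver pod)."))
  }
```

The `Option`-of-`Option` collapse via `getOrElse(throw …)` distinguishes "not configured" (allowed) from "configured but absent" (an error), which is exactly the split client mode introduces.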
