Commit ad014e5

[SPARK-41781][K8S] Add the ability to create pvc before creating driver/executor pod

1 parent 15a0f55

10 files changed
Lines changed: 293 additions & 53 deletions

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExecutorSpec.scala

Lines changed: 1 addition & 0 deletions

@@ -20,4 +20,5 @@ import io.fabric8.kubernetes.api.model.HasMetadata
 
 private[spark] case class KubernetesExecutorSpec(
     pod: SparkPod,
+    executorPreKubernetesResources: Seq[HasMetadata],
     executorKubernetesResources: Seq[HasMetadata])

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala

Lines changed: 49 additions & 1 deletion

@@ -23,7 +23,7 @@ import java.util.{Collections, UUID}
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, OwnerReferenceBuilder, Pod, PodBuilder, Quantity}
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, OwnerReferenceBuilder, PersistentVolumeClaim, Pod, PodBuilder, Quantity}
 import io.fabric8.kubernetes.client.KubernetesClient
 import org.apache.commons.codec.binary.Hex
 import org.apache.hadoop.fs.{FileSystem, Path}

@@ -414,4 +414,52 @@ object KubernetesUtils extends Logging {
         .build()
     }
   }
+
+  /**
+   * Create pre-resource in need before pod creation
+   */
+  @Since("3.4.0")
+  def createPreResource(
+      client: KubernetesClient,
+      resource: HasMetadata,
+      namespace: String): Unit = {
+    resource match {
+      case pvc: PersistentVolumeClaim =>
+        client.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
+      case other =>
+        client.resourceList(Seq(other): _*).createOrReplace()
+    }
+  }
+
+  /**
+   * Refresh OwnerReference in the given resource
+   * making the driver or executor pod an owner of them
+   */
+  @Since("3.4.0")
+  def refreshOwnerReferenceInResource(
+      client: KubernetesClient,
+      resource: HasMetadata,
+      namespace: String,
+      pod: Pod): Unit = {
+    resource match {
+      case pvc: PersistentVolumeClaim =>
+        val createdPVC =
+          client
+            .persistentVolumeClaims()
+            .inNamespace(namespace)
+            .withName(pvc.getMetadata.getName)
+            .get()
+        addOwnerReference(pod, Seq(createdPVC))
+        logDebug(s"Trying to refresh PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
+          s"OwnerReference ${pvc.getMetadata.getOwnerReferences}")
+        client
+          .persistentVolumeClaims()
+          .inNamespace(namespace)
+          .withName(pvc.getMetadata.getName)
+          .patch(pvc)
+      case other =>
+        addOwnerReference(pod, Seq(other))
+        client.resourceList(Seq(other): _*).createOrReplace()
+    }
+  }
 }
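
Taken together, the two new helpers split resource setup into a create phase (before the pod exists) and an owner-refresh phase (after it does, once the pod's UID is known). A minimal sketch of a caller driving both phases; the claim name, storage class, and the provisionAndAdopt helper itself are illustrative, not part of the commit:

import io.fabric8.kubernetes.api.model.{PersistentVolumeClaim, PersistentVolumeClaimBuilder, Pod, Quantity}
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.spark.deploy.k8s.KubernetesUtils.{createPreResource, refreshOwnerReferenceInResource}

// Hypothetical two-phase flow: create the claim first so the pod can mount
// it at creation time, then parent the claim to the pod once the pod exists.
def provisionAndAdopt(
    client: KubernetesClient,
    namespace: String,
    createPod: () => Pod): Unit = {
  val pvc: PersistentVolumeClaim = new PersistentVolumeClaimBuilder()
    .withNewMetadata()
      .withName("demo-scratch-pvc")             // illustrative name
    .endMetadata()
    .withNewSpec()
      .withAccessModes("ReadWriteOnce")
      .withStorageClassName("standard")         // assumed storage class
      .withNewResources()
        .addToRequests("storage", new Quantity("1Gi"))
      .endResources()
    .endSpec()
    .build()

  // Phase 1: the PVC exists before any pod references it.
  createPreResource(client, pvc, namespace)

  // The pod is created between the two phases, mounting "demo-scratch-pvc".
  val createdPod = createPod()

  // Phase 2: make the pod the claim's owner so Kubernetes garbage-collects
  // the claim when the pod is deleted.
  refreshOwnerReferenceInResource(client, pvc, namespace, createdPod)
}

Note that the PVC branch of createPreResource uses create() rather than createOrReplace(), so a name collision with an existing claim fails fast instead of silently replacing it.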

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala

Lines changed: 4 additions & 4 deletions

@@ -29,7 +29,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
   extends KubernetesFeatureConfigStep {
   import MountVolumesFeatureStep._
 
-  val additionalResources = ArrayBuffer.empty[HasMetadata]
+  val additionalPreResources = ArrayBuffer.empty[HasMetadata]
 
   override def configurePod(pod: SparkPod): SparkPod = {
     val (volumeMounts, volumes) = constructVolumes(conf.volumes).unzip

@@ -82,7 +82,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
             .replaceAll(PVC_ON_DEMAND, s"${conf.resourceNamePrefix}-driver$PVC_POSTFIX-$i")
         }
         if (storageClass.isDefined && size.isDefined) {
-          additionalResources.append(new PersistentVolumeClaimBuilder()
+          additionalPreResources.append(new PersistentVolumeClaimBuilder()
             .withKind(PVC)
             .withApiVersion("v1")
             .withNewMetadata()

@@ -119,8 +119,8 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
     }
   }
 
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
-    additionalResources.toSeq
+  override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = {
+    additionalPreResources.toSeq
   }
 
   private def checkPVCClaimName(claimName: String): Unit = {
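
With this change, MountVolumesFeatureStep reports its on-demand PVCs through the pre-creation hook instead of getAdditionalKubernetesResources(), so they are created before the pod that mounts them. A sketch of a custom feature step targeting the new hook; the step and claim are hypothetical, and this assumes KubernetesFeatureConfigStep gains a default getAdditionalPreKubernetesResources() in one of the changed files not shown above:

import io.fabric8.kubernetes.api.model.{HasMetadata, PersistentVolumeClaimBuilder, Quantity}
import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep

// Hypothetical step: anything returned from getAdditionalPreKubernetesResources()
// is created before the driver/executor pod, so the pod can mount it on start.
private[spark] class ScratchPvcFeatureStep(conf: KubernetesConf)
  extends KubernetesFeatureConfigStep {

  override def configurePod(pod: SparkPod): SparkPod = pod  // pod left unchanged here

  override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = Seq(
    new PersistentVolumeClaimBuilder()
      .withNewMetadata()
        .withName(s"${conf.resourceNamePrefix}-scratch")    // hypothetical name
      .endMetadata()
      .withNewSpec()
        .withAccessModes("ReadWriteOnce")
        .withStorageClassName("standard")                   // assumed storage class
        .withNewResources()
          .addToRequests("storage", new Quantity("1Gi"))
        .endResources()
      .endSpec()
      .build())
}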

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala

Lines changed: 21 additions & 17 deletions

@@ -30,7 +30,7 @@ import org.apache.spark.deploy.SparkApplication
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
+import org.apache.spark.deploy.k8s.KubernetesUtils.{addOwnerReference, createPreResource, refreshOwnerReferenceInResource}
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils

@@ -136,14 +136,16 @@ private[spark] class Client(
 
     // setup resources before pod creation
     val preKubernetesResources = resolvedDriverSpec.driverPreKubernetesResources
-    try {
-      kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace()
-    } catch {
-      case NonFatal(e) =>
-        logError("Please check \"kubectl auth can-i create [resource]\" first." +
-          " It should be yes. And please also check your feature step implementation.")
-        kubernetesClient.resourceList(preKubernetesResources: _*).delete()
-        throw e
+    preKubernetesResources.foreach { resource =>
+      try {
+        createPreResource(kubernetesClient, resource, conf.namespace)
+      } catch {
+        case NonFatal(e) =>
+          logError("Please check \"kubectl auth can-i create [resource]\" first." +
+            " It should be yes. And please also check feature step implementation.")
+          kubernetesClient.resourceList(Seq(resource): _*).delete()
+          throw e
+      }
     }
 
     var watch: Watch = null

@@ -159,14 +161,16 @@ private[spark] class Client(
     }
 
     // Refresh all pre-resources' owner references
-    try {
-      addOwnerReference(createdDriverPod, preKubernetesResources)
-      kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace()
-    } catch {
-      case NonFatal(e) =>
-        kubernetesClient.pods().resource(createdDriverPod).delete()
-        kubernetesClient.resourceList(preKubernetesResources: _*).delete()
-        throw e
+    preKubernetesResources.foreach { resource =>
+      try {
+        refreshOwnerReferenceInResource(kubernetesClient, resource, conf.namespace,
+          createdDriverPod)
+      } catch {
+        case NonFatal(e) =>
+          kubernetesClient.pods().resource(createdDriverPod).delete()
+          kubernetesClient.resourceList(Seq(resource): _*).delete()
+          throw e
+      }
     }
 
     // setup resources after pod creation, and refresh all resources' owner references
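
Stripped of error handling, the submission ordering these hunks implement reduces to four steps. A condensed sketch (not the actual Client code; watches and the failure cleanup shown in the diff are omitted):

import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.spark.deploy.k8s.KubernetesDriverSpec
import org.apache.spark.deploy.k8s.KubernetesUtils.{addOwnerReference, createPreResource, refreshOwnerReferenceInResource}

def submitDriver(
    client: KubernetesClient,
    spec: KubernetesDriverSpec,
    namespace: String): Unit = {
  // 1. Pre-resources (e.g. PVCs) exist before the driver pod is created.
  spec.driverPreKubernetesResources.foreach(createPreResource(client, _, namespace))

  // 2. Create the driver pod; it can now mount the pre-created claims.
  val driverPod = client.pods().inNamespace(namespace).resource(spec.pod.pod).create()

  // 3. Parent each pre-resource to the now-existing driver pod.
  spec.driverPreKubernetesResources.foreach(
    refreshOwnerReferenceInResource(client, _, namespace, driverPod))

  // 4. Post-creation resources are created already owned by the pod
  //    (the "setup resources after pod creation" block referenced above).
  addOwnerReference(driverPod, spec.driverKubernetesResources)
  client.resourceList(spec.driverKubernetesResources: _*).createOrReplace()
}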

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala

Lines changed: 46 additions & 21 deletions

@@ -31,7 +31,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.KubernetesConf
-import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
+import org.apache.spark.deploy.k8s.KubernetesUtils.{createPreResource, refreshOwnerReferenceInResource}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, DYN_ALLOCATION_MAX_EXECUTORS, EXECUTOR_INSTANCES}
 import org.apache.spark.resource.ResourceProfile

@@ -434,34 +434,59 @@ class ExecutorPodsAllocator(
         .addToContainers(executorPod.container)
         .endSpec()
         .build()
-      val resources = replacePVCsIfNeeded(
-        podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
-      val createdExecutorPod =
-        kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
-      try {
-        addOwnerReference(createdExecutorPod, resources)
-        resources
-          .filter(_.getKind == "PersistentVolumeClaim")
-          .foreach { resource =>
-            if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) {
-              addOwnerReference(driverPod.get, Seq(resource))
-            }
-            val pvc = resource.asInstanceOf[PersistentVolumeClaim]
-            logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
-              s"StorageClass ${pvc.getSpec.getStorageClassName}")
-            kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
+      val preResources = replacePVCsIfNeeded(
+        podWithAttachedContainer, resolvedExecutorSpec.executorPreKubernetesResources, reusablePVCs)
+
+      preResources.foreach { resource =>
+        try {
+          createPreResource(kubernetesClient, resource, namespace)
+          if (resource.isInstanceOf[PersistentVolumeClaim]) {
             PVC_COUNTER.incrementAndGet()
           }
+        } catch {
+          case NonFatal(e) =>
+            logError("Please check \"kubectl auth can-i create [resource]\" first." +
+              " It should be yes. And please also check feature step implementation.")
+            kubernetesClient.resourceList(Seq(resource): _*).delete()
+            throw e
+        }
+      }
+
+      var createdExecutorPod: Pod = null
+      try {
+        createdExecutorPod =
+          kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
         newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
         logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
       } catch {
         case NonFatal(e) =>
-          kubernetesClient.pods()
-            .inNamespace(namespace)
-            .resource(createdExecutorPod)
-            .delete()
+          kubernetesClient.resourceList(preResources: _*).delete()
+          logError("Please check \"kubectl auth can-i create pod\" first. It should be yes.")
           throw e
       }
+
+      // Refresh all pre-resources' owner references
+      preResources.foreach { resource =>
+        try {
+          if (resource.isInstanceOf[PersistentVolumeClaim]) {
+            if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) {
+              refreshOwnerReferenceInResource(kubernetesClient, resource, namespace,
+                driverPod.get)
+            } else {
+              refreshOwnerReferenceInResource(kubernetesClient, resource, namespace,
+                createdExecutorPod)
+            }
+          } else {
+            refreshOwnerReferenceInResource(kubernetesClient, resource, namespace,
+              createdExecutorPod)
+          }
+        } catch {
+          case NonFatal(e) =>
+            kubernetesClient.pods().inNamespace(namespace).resource(createdExecutorPod).delete()
+            kubernetesClient.resourceList(Seq(resource): _*).delete()
+            throw e
+        }
+      }
     }
   }
 
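The nested conditionals in the refresh loop encode one rule: a PVC is parented to the driver pod when spark.kubernetes.driver.ownPersistentVolumeClaim (KUBERNETES_DRIVER_OWN_PVC) is enabled and a driver pod is known, so the claim can outlive individual executors and be picked up again through the reusable-PVC path (replacePVCsIfNeeded); everything else is owned by the executor pod and garbage-collected with it. A hypothetical helper making the rule explicit, assuming the allocator's conf and driverPod fields (the diff inlines this logic):

import io.fabric8.kubernetes.api.model.{HasMetadata, PersistentVolumeClaim, Pod}

// Hypothetical refactoring of the inline conditionals: pick the pod that
// should own a freshly created pre-resource.
private def ownerFor(resource: HasMetadata, createdExecutorPod: Pod): Pod =
  resource match {
    // Driver-owned PVCs survive executor churn and can be reused.
    case _: PersistentVolumeClaim
        if conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty =>
      driverPod.get
    // Everything else lives and dies with its executor pod.
    case _ =>
      createdExecutorPod
  }
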
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala

Lines changed: 3 additions & 0 deletions

@@ -75,15 +75,18 @@ private[spark] class KubernetesExecutorBuilder {
 
     val spec = KubernetesExecutorSpec(
       initialPod,
+      executorPreKubernetesResources = Seq.empty,
       executorKubernetesResources = Seq.empty)
 
     // If using a template this will always get the resources from that and combine
     // them with any Spark conf or ResourceProfile resources.
     features.foldLeft(spec) { case (spec, feature) =>
      val configuredPod = feature.configurePod(spec.pod)
+      val addedPreResources = feature.getAdditionalPreKubernetesResources()
       val addedResources = feature.getAdditionalKubernetesResources()
       KubernetesExecutorSpec(
         configuredPod,
+        spec.executorPreKubernetesResources ++ addedPreResources,
         spec.executorKubernetesResources ++ addedResources)
     }
   }

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetPodsAllocator.scala

Lines changed: 2 additions & 2 deletions

@@ -127,10 +127,10 @@ class StatefulSetPodsAllocator(
     val meta = executorPod.pod.getMetadata()
 
     // Resources that need to be created, volumes are per-pod which is all we care about here.
-    val resources = resolvedExecutorSpec.executorKubernetesResources
+    val preResources = resolvedExecutorSpec.executorPreKubernetesResources
     // We'll let PVCs be handled by the statefulset. Note user is responsible for
     // cleaning up PVCs. Future work: integrate with KEP1847 once stabilized.
-    val dynamicVolumeClaims = resources.filter(_.getKind == "PersistentVolumeClaim")
+    val dynamicVolumeClaims = preResources.filter(_.getKind == "PersistentVolumeClaim")
       .map(_.asInstanceOf[PersistentVolumeClaim])
     // Remove the dynamic volumes from our pod
     val dynamicVolumeClaimNames: Set[String] = dynamicVolumeClaims.map(_.getMetadata().getName())
