Skip to content

Commit b065c94

Browse files
dcoliversun authored
and dongjoon-hyun committed
[SPARK-39006][K8S] Show a directional error message for executor PVC dynamic allocation failure
### What changes were proposed in this pull request? This PR aims to show a directional error message for executor PVC dynamic allocation failure. ### Why are the changes needed? apache#29846 supports dynamic PVC creation/deletion for K8s executors. apache#29557 supports the execId placeholder in the executor PVC conf. If `spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName` is not set to `onDemand` or `SPARK_EXECUTOR_ID`, Spark keeps retrying to create the executor pod. After this PR, Spark shows a directional error message for this situation. ```plain ERROR ExecutorPodsSnapshotsStoreImpl: Going to stop due to IllegalArgumentException java.lang.IllegalArgumentException: PVC ClaimName: <claimName> should contain OnDemand or SPARK_EXECUTOR_ID when requiring multiple executors ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a unit test. Closes apache#36374 from dcoliversun/SPARK-39006. Authored-by: Qian.Sun <qian.sun2020@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent fdcbc8c commit b065c94

2 files changed

Lines changed: 54 additions & 1 deletion

File tree

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import io.fabric8.kubernetes.api.model._
2323

2424
import org.apache.spark.deploy.k8s._
2525
import org.apache.spark.deploy.k8s.Constants.{ENV_EXECUTOR_ID, SPARK_APP_ID_LABEL}
26+
import org.apache.spark.internal.config.EXECUTOR_INSTANCES
2627

2728
private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
2829
extends KubernetesFeatureConfigStep {
@@ -71,6 +72,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
7172
case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size) =>
7273
val claimName = conf match {
7374
case c: KubernetesExecutorConf =>
75+
checkPVCClaimName(claimNameTemplate)
7476
claimNameTemplate
7577
.replaceAll(PVC_ON_DEMAND,
7678
s"${conf.resourceNamePrefix}-exec-${c.executorId}$PVC_POSTFIX-$i")
@@ -120,6 +122,20 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
120122
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
121123
additionalResources.toSeq
122124
}
125+
126+
/**
 * Validates an executor PVC claim-name template against the requested executor count.
 *
 * When more than one executor instance is configured, a claim name without the
 * `OnDemand` or `SPARK_EXECUTOR_ID` placeholder would resolve to the same PVC for
 * every executor, so pod creation keeps failing and retrying. Fail fast with a
 * directional message instead.
 *
 * @param claimName the (possibly templated) PVC claim name from the volume conf
 * @throws IllegalArgumentException if multiple executors are requested but the
 *         claim name contains neither placeholder
 */
private def checkPVCClaimName(claimName: String): Unit = {
  conf.get(EXECUTOR_INSTANCES) match {
    case Some(instances) if instances > 1 =>
      val hasPerExecutorPlaceholder =
        claimName.contains(PVC_ON_DEMAND) || claimName.contains(ENV_EXECUTOR_ID)
      if (!hasPerExecutorPlaceholder) {
        throw new IllegalArgumentException(s"PVC ClaimName: $claimName " +
          s"should contain $PVC_ON_DEMAND or $ENV_EXECUTOR_ID " +
          "when requiring multiple executors")
      }
    case _ => // zero or one executor: any fixed claim name is safe
  }
}
123139
}
124140

125141
private[spark] object MountVolumesFeatureStep {

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@
1616
*/
1717
package org.apache.spark.deploy.k8s.features
1818

19+
import java.util.UUID
20+
1921
import scala.collection.JavaConverters._
2022

21-
import org.apache.spark.SparkFunSuite
23+
import org.apache.spark.{SparkConf, SparkFunSuite}
2224
import org.apache.spark.deploy.k8s._
25+
import org.apache.spark.internal.config.EXECUTOR_INSTANCES
2326

2427
class MountVolumesFeatureStepSuite extends SparkFunSuite {
2528
test("Mounts hostPath volumes") {
@@ -148,6 +151,40 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
148151
assert(executorPVC.getClaimName.endsWith("-exec-1-pvc-0"))
149152
}
150153

154+
// SPARK-39006: a fixed PVC claim name (no OnDemand / SPARK_EXECUTOR_ID placeholder)
// must be rejected when multiple executors are requested, and accepted as-is when
// only a single executor is requested.
test("SPARK-39006: Check PVC ClaimName") {
  val claimName = s"pvc-${UUID.randomUUID().toString}"
  val volumeConf = KubernetesVolumeSpec(
    "testVolume",
    "/tmp",
    "",
    mountReadOnly = true,
    KubernetesPVCVolumeConf(claimName)
  )
  // Multiple executors: configuring the pod must fail with a directional message.
  // (Single intercept both proves the exception type and pins the message —
  // no need to trigger configurePod twice.)
  val conf = new SparkConf().set(EXECUTOR_INSTANCES, 2)
  var executorConf =
    KubernetesTestConf.createExecutorConf(sparkConf = conf, volumes = Seq(volumeConf))
  var executorStep = new MountVolumesFeatureStep(executorConf)
  val error = intercept[IllegalArgumentException] {
    executorStep.configurePod(SparkPod.initialPod())
  }
  assert(error.getMessage === s"PVC ClaimName: $claimName " +
    "should contain OnDemand or SPARK_EXECUTOR_ID when requiring multiple executors")

  // Single executor: any claim name is accepted and mounted unchanged.
  conf.set(EXECUTOR_INSTANCES, 1)
  executorConf =
    KubernetesTestConf.createExecutorConf(sparkConf = conf, volumes = Seq(volumeConf))
  executorStep = new MountVolumesFeatureStep(executorConf)
  val executorPod = executorStep.configurePod(SparkPod.initialPod())

  assert(executorPod.pod.getSpec.getVolumes.size() === 1)
  val executorPVC = executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
  assert(executorPVC.getClaimName === claimName)
}
187+
151188
test("Mounts emptyDir") {
152189
val volumeConf = KubernetesVolumeSpec(
153190
"testVolume",

0 commit comments

Comments
 (0)