From 856cbbed94af44216a22f10c63fd45c0e8529770 Mon Sep 17 00:00:00 2001
From: "Qian.Sun"
Date: Wed, 27 Apr 2022 17:17:27 +0800
Subject: [PATCH 1/2] [SPARK-39006][K8S] Show a directional error message for PVC Dynamic Allocation Failure

---
 .../features/MountVolumesFeatureStep.scala    | 16 +++++++++++++
 .../MountVolumesFeatureStepSuite.scala        | 23 ++++++++++++++++++-
 2 files changed, 38 insertions(+), 1 deletion(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
index 78dd6ec21ed3..f681d359c08a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
@@ -23,6 +23,7 @@ import io.fabric8.kubernetes.api.model._
 
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Constants.{ENV_EXECUTOR_ID, SPARK_APP_ID_LABEL}
+import org.apache.spark.internal.config.EXECUTOR_INSTANCES
 
 private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
   extends KubernetesFeatureConfigStep {
@@ -71,6 +72,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
       case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size) =>
         val claimName = conf match {
           case c: KubernetesExecutorConf =>
+            checkPVCClaimNameWhenMultiExecutors(claimNameTemplate)
             claimNameTemplate
               .replaceAll(PVC_ON_DEMAND,
                 s"${conf.resourceNamePrefix}-exec-${c.executorId}$PVC_POSTFIX-$i")
@@ -120,6 +122,20 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
   override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
     additionalResources.toSeq
   }
+
+  private def checkPVCClaimNameWhenMultiExecutors(claimName: String): Unit = {
+    val invalidClaimName =
+      if (!claimName.contains(PVC_ON_DEMAND) && !claimName.contains(ENV_EXECUTOR_ID)) true
+      else false
+
+    val executorInstances = conf.get(EXECUTOR_INSTANCES)
+    if (executorInstances.isEmpty) return
+    if (invalidClaimName && executorInstances.get > 1) {
+      throw new IllegalArgumentException("PVC ClaimName should contain " +
+        PVC_ON_DEMAND + " or " + ENV_EXECUTOR_ID +
+        " when multiple executors are required")
+    }
+  }
 }
 
 private[spark] object MountVolumesFeatureStep {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
index 468d1dde9fb6..2500bbd95e2c 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
@@ -18,8 +18,9 @@ package org.apache.spark.deploy.k8s.features
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
+import org.apache.spark.internal.config.EXECUTOR_INSTANCES
 
 class MountVolumesFeatureStepSuite extends SparkFunSuite {
   test("Mounts hostPath volumes") {
@@ -148,6 +149,26 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
     assert(executorPVC.getClaimName.endsWith("-exec-1-pvc-0"))
   }
 
+  test("SPARK-39006 Show a directional error message for PVC Dynamic Allocation Failure") {
+    val volumeConf = KubernetesVolumeSpec(
+      "testVolume",
+      "/tmp",
+      "",
+      mountReadOnly = true,
+      KubernetesPVCVolumeConf("testClaimName")
+    )
+    val conf = new SparkConf().set(EXECUTOR_INSTANCES, 2)
+    val executorConf =
+      KubernetesTestConf.createExecutorConf(sparkConf = conf, volumes = Seq(volumeConf))
+    val executorStep = new MountVolumesFeatureStep(executorConf)
+    assertThrows[IllegalArgumentException] {
+      executorStep.configurePod(SparkPod.initialPod())
+    }
+    assert(intercept[IllegalArgumentException] {
+      executorStep.configurePod(SparkPod.initialPod())
+    }.getMessage.contains("PVC ClaimName should contain OnDemand or SPARK_EXECUTOR_ID"))
+  }
+
   test("Mounts emptyDir") {
     val volumeConf = KubernetesVolumeSpec(
       "testVolume",

From 5ed2a175a367b6270268ac8da1affd69116d5997 Mon Sep 17 00:00:00 2001
From: "Qian.Sun"
Date: Fri, 29 Apr 2022 17:35:34 +0800
Subject: [PATCH 2/2] address comments

---
 .../features/MountVolumesFeatureStep.scala    | 22 ++++++++--------
 .../MountVolumesFeatureStepSuite.scala        | 26 +++++++++++++++----
 2 files changed, 32 insertions(+), 16 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
index f681d359c08a..d47024ca9fe0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
@@ -72,7 +72,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
       case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size) =>
         val claimName = conf match {
           case c: KubernetesExecutorConf =>
-            checkPVCClaimNameWhenMultiExecutors(claimNameTemplate)
+            checkPVCClaimName(claimNameTemplate)
             claimNameTemplate
               .replaceAll(PVC_ON_DEMAND,
                 s"${conf.resourceNamePrefix}-exec-${c.executorId}$PVC_POSTFIX-$i")
@@ -123,17 +123,17 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
     additionalResources.toSeq
   }
 
-  private def checkPVCClaimNameWhenMultiExecutors(claimName: String): Unit = {
-    val invalidClaimName =
-      if (!claimName.contains(PVC_ON_DEMAND) && !claimName.contains(ENV_EXECUTOR_ID)) true
-      else false
-
+  private def checkPVCClaimName(claimName: String): Unit = {
     val executorInstances = conf.get(EXECUTOR_INSTANCES)
-    if (executorInstances.isEmpty) return
-    if (invalidClaimName && executorInstances.get > 1) {
-      throw new IllegalArgumentException("PVC ClaimName should contain " +
-        PVC_ON_DEMAND + " or " + ENV_EXECUTOR_ID +
-        " when multiple executors are required")
+    if (executorInstances.isDefined && executorInstances.get > 1) {
+      // PVC ClaimName should contain OnDemand or SPARK_EXECUTOR_ID
+      // when requiring multiple executors.
+      // Otherwise, Spark keeps trying to create the executor pod.
+      if (!claimName.contains(PVC_ON_DEMAND) && !claimName.contains(ENV_EXECUTOR_ID)) {
+        throw new IllegalArgumentException(s"PVC ClaimName: $claimName " +
+          s"should contain $PVC_ON_DEMAND or $ENV_EXECUTOR_ID " +
+          "when requiring multiple executors")
+      }
     }
   }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
index 2500bbd95e2c..e428e54d661b 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.deploy.k8s.features
 
+import java.util.UUID
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -149,24 +151,38 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
     assert(executorPVC.getClaimName.endsWith("-exec-1-pvc-0"))
   }
 
-  test("SPARK-39006 Show a directional error message for PVC Dynamic Allocation Failure") {
+  test("SPARK-39006: Check PVC ClaimName") {
+    val claimName = s"pvc-${UUID.randomUUID().toString}"
     val volumeConf = KubernetesVolumeSpec(
       "testVolume",
       "/tmp",
       "",
       mountReadOnly = true,
-      KubernetesPVCVolumeConf("testClaimName")
+      KubernetesPVCVolumeConf(claimName)
     )
+    // A fixed claimName must be rejected when multiple executors are requested
    val conf = new SparkConf().set(EXECUTOR_INSTANCES, 2)
-    val executorConf =
+    var executorConf =
       KubernetesTestConf.createExecutorConf(sparkConf = conf, volumes = Seq(volumeConf))
-    val executorStep = new MountVolumesFeatureStep(executorConf)
+    var executorStep = new MountVolumesFeatureStep(executorConf)
     assertThrows[IllegalArgumentException] {
       executorStep.configurePod(SparkPod.initialPod())
     }
     assert(intercept[IllegalArgumentException] {
       executorStep.configurePod(SparkPod.initialPod())
-    }.getMessage.contains("PVC ClaimName should contain OnDemand or SPARK_EXECUTOR_ID"))
+    }.getMessage.equals(s"PVC ClaimName: $claimName " +
+      "should contain OnDemand or SPARK_EXECUTOR_ID when requiring multiple executors"))
+
+    // With a single executor, any claimName is accepted and the PVC is mounted as-is
+    conf.set(EXECUTOR_INSTANCES, 1)
+    executorConf =
+      KubernetesTestConf.createExecutorConf(sparkConf = conf, volumes = Seq(volumeConf))
+    executorStep = new MountVolumesFeatureStep(executorConf)
+    val executorPod = executorStep.configurePod(SparkPod.initialPod())
+
+    assert(executorPod.pod.getSpec.getVolumes.size() === 1)
+    val executorPVC = executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
+    assert(executorPVC.getClaimName.equals(claimName))
   }
 
   test("Mounts emptyDir") {
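
Illustrative note (not part of the patch): the new check only fires when spark.executor.instances is greater than 1 and the PVC claimName is a fixed string. Below is a minimal Scala sketch of a driver-side configuration that passes the validation, assuming the spark.kubernetes.executor.volumes.persistentVolumeClaim.* option keys documented for Spark on Kubernetes; the volume name "data", storage class, size, and mount path are made-up illustrative values.

  import org.apache.spark.SparkConf

  // Sketch only: "OnDemand" asks Spark to create one PVC per executor, so the
  // claimName check in checkPVCClaimName passes even with multiple executors.
  val conf = new SparkConf()
    .set("spark.executor.instances", "2")
    .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName", "OnDemand")
    .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.storageClass", "standard") // illustrative
    .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.sizeLimit", "100Gi")       // illustrative
    .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path", "/data")              // illustrative
    .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.readOnly", "false")

A claimName containing the literal SPARK_EXECUTOR_ID placeholder would also satisfy the check, per the condition in checkPVCClaimName above.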