diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 524ab0c845c6..d8ae910b1aec 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -373,7 +373,13 @@ class ExecutorPodsAllocator(
           .getItems
           .asScala
 
-        val reusablePVCs = createdPVCs.filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
+        // Besides PVCs that are in use, skip PVCs created within the last `podAllocationDelay`
+        // milliseconds, so PVCs created by the previous allocation batch are not reused yet.
+        val now = Instant.now().toEpochMilli
+        val reusablePVCs = createdPVCs
+          .filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
+          .filter(pvc => now - Instant.parse(pvc.getMetadata.getCreationTimestamp).toEpochMilli
+            > podAllocationDelay)
         logInfo(s"Found ${reusablePVCs.size} reusable PVCs from ${createdPVCs.size} PVCs")
         reusablePVCs
       } catch {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index caec9ef92012..c526bf0968e4 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.scheduler.cluster.k8s
 
 import java.time.Instant
+import java.time.temporal.ChronoUnit.MILLIS
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
@@ -721,8 +722,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
       .set(s"$prefix.option.sizeLimit", "200Gi")
       .set(s"$prefix.option.storageClass", "gp2")
 
-    when(persistentVolumeClaimList.getItems)
-      .thenReturn(Seq(persistentVolumeClaim("pvc-0", "gp2", "200Gi")).asJava)
+    val pvc = persistentVolumeClaim("pvc-0", "gp2", "200Gi")
+    pvc.getMetadata
+      .setCreationTimestamp(Instant.now().minus(podAllocationDelay + 1, MILLIS).toString)
+    when(persistentVolumeClaimList.getItems).thenReturn(Seq(pvc).asJava)
     when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
         meq(kubernetesClient), any(classOf[ResourceProfile])))
       .thenAnswer((invocation: InvocationOnMock) => {
@@ -791,6 +794,25 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     podsAllocatorUnderTest invokePrivate getReusablePVCs("appId", Seq.empty[String])
   }
 
+  test("SPARK-41388: getReusablePVCs should ignore recently created PVCs in the previous batch") {
+    val getReusablePVCs =
+      PrivateMethod[mutable.Buffer[PersistentVolumeClaim]](Symbol("getReusablePVCs"))
+
+    val pvc1 = persistentVolumeClaim("pvc-0", "gp2", "200Gi")
+    val pvc2 = persistentVolumeClaim("pvc-1", "gp2", "200Gi")
+
+    // pvc-0 is older than podAllocationDelay; pvc-1 has only just been created.
+    val now = Instant.now()
+    pvc1.getMetadata.setCreationTimestamp(now.minus(2 * podAllocationDelay, MILLIS).toString)
+    pvc2.getMetadata.setCreationTimestamp(now.toString)
+
+    when(persistentVolumeClaimList.getItems).thenReturn(Seq(pvc1, pvc2).asJava)
+    // No PVC is marked as in use, so pvc-1 can only be excluded by the creation-time check.
+    val reusablePVCs =
+      podsAllocatorUnderTest invokePrivate getReusablePVCs("appId", Seq.empty[String])
+    assert(reusablePVCs.map(_.getMetadata.getName) === Seq("pvc-0"))
+  }
+
   private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
     (invocation: InvocationOnMock) => {
       val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)