Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,11 @@ class ExecutorPodsAllocator(
.getItems
.asScala

val reusablePVCs = createdPVCs.filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
val now = Instant.now().toEpochMilli
val reusablePVCs = createdPVCs
.filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
.filter(pvc => now - Instant.parse(pvc.getMetadata.getCreationTimestamp).toEpochMilli
> podAllocationDelay)
logInfo(s"Found ${reusablePVCs.size} reusable PVCs from ${createdPVCs.size} PVCs")
reusablePVCs
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.spark.scheduler.cluster.k8s

import java.time.Instant
import java.time.temporal.ChronoUnit.MILLIS
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -721,8 +722,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
.set(s"$prefix.option.sizeLimit", "200Gi")
.set(s"$prefix.option.storageClass", "gp2")

when(persistentVolumeClaimList.getItems)
.thenReturn(Seq(persistentVolumeClaim("pvc-0", "gp2", "200Gi")).asJava)
val pvc = persistentVolumeClaim("pvc-0", "gp2", "200Gi")
pvc.getMetadata
.setCreationTimestamp(Instant.now().minus(podAllocationDelay + 1, MILLIS).toString)
Copy link
Member Author

@dongjoon-hyun dongjoon-hyun Dec 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a previous test case for SPARK-35416: Support PersistentVolumeClaim Reuse.

Since our test framework does't fill CreationTimestamp, this PR added it properly.

.withNewMetadata()
.withName(claimName)
.addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)
.endMetadata()

when(persistentVolumeClaimList.getItems).thenReturn(Seq(pvc).asJava)
when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
meq(kubernetesClient), any(classOf[ResourceProfile])))
.thenAnswer((invocation: InvocationOnMock) => {
Expand Down Expand Up @@ -791,6 +794,21 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
podsAllocatorUnderTest invokePrivate getReusablePVCs("appId", Seq.empty[String])
}

test("SPARK-41388: getReusablePVCs should ignore recently created PVCs in the previous batch") {
val getReusablePVCs =
PrivateMethod[mutable.Buffer[PersistentVolumeClaim]](Symbol("getReusablePVCs"))

val pvc1 = persistentVolumeClaim("pvc-0", "gp2", "200Gi")
val pvc2 = persistentVolumeClaim("pvc-1", "gp2", "200Gi")

val now = Instant.now()
pvc1.getMetadata.setCreationTimestamp(now.minus(2 * podAllocationDelay, MILLIS).toString)
pvc2.getMetadata.setCreationTimestamp(now.toString)

when(persistentVolumeClaimList.getItems).thenReturn(Seq(pvc1, pvc2).asJava)
podsAllocatorUnderTest invokePrivate getReusablePVCs("appId", Seq("pvc-1"))
}

private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
(invocation: InvocationOnMock) => {
val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
Expand Down