[SPARK-41410][K8S] Support PVC-oriented executor pod allocation #38943
@@ -33,8 +33,9 @@ import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.KubernetesConf
 import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT
+import org.apache.spark.internal.config.{DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, DYN_ALLOCATION_MAX_EXECUTORS, EXECUTOR_INSTANCES}
 import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.scheduler.cluster.SchedulerBackendUtils.DEFAULT_NUMBER_EXECUTORS
 import org.apache.spark.util.{Clock, Utils}

 class ExecutorPodsAllocator(
@@ -47,6 +48,17 @@ class ExecutorPodsAllocator(

   private val EXECUTOR_ID_COUNTER = new AtomicInteger(0)

+  private val PVC_COUNTER = new AtomicInteger(0)
+
+  private val maxPVCs = if (Utils.isDynamicAllocationEnabled(conf)) {
+    conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+  } else {
+    conf.getInt(EXECUTOR_INSTANCES.key, DEFAULT_NUMBER_EXECUTORS)
+  }
+
+  private val reusePVC = conf.get(KUBERNETES_DRIVER_OWN_PVC) &&
+    conf.get(KUBERNETES_DRIVER_REUSE_PVC) && conf.get(KUBERNETES_DRIVER_WAIT_TO_REUSE_PVC)
+
   // ResourceProfile id -> total expected executors per profile, currently we don't remove
   // any resource profiles - https://issues.apache.org/jira/browse/SPARK-30749
   private val totalExpectedExecutorsPerResourceProfileId = new ConcurrentHashMap[Int, Int]()
@@ -398,6 +410,10 @@ class ExecutorPodsAllocator(
     // Check reusable PVCs for this executor allocation batch
     val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)
     for ( _ <- 0 until numExecutorsToAllocate) {
+      if (reusablePVCs.isEmpty && reusePVC && maxPVCs <= PVC_COUNTER.get()) {

  private def getReusablePVCs(applicationId: String, pvcsInUse: Seq[String]) = {
    if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && conf.get(KUBERNETES_DRIVER_REUSE_PVC) &&
        driverPod.nonEmpty) {
      try {
        val createdPVCs = kubernetesClient
          .persistentVolumeClaims
          .inNamespace(namespace)
          .withLabel("spark-app-selector", applicationId)
          .list()
          .getItems
          .asScala
        val now = Instant.now().toEpochMilli
        val reusablePVCs = createdPVCs
          .filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
          .filter(pvc => now - Instant.parse(pvc.getMetadata.getCreationTimestamp).toEpochMilli
            > podAllocationDelay)
        logInfo(s"Found ${reusablePVCs.size} reusable PVCs from ${createdPVCs.size} PVCs")
        reusablePVCs
Also, previously, Spark created new pods and PVCs whenever some executors died. In that case, a few extra PVCs could be created.
So even when maxPVCs <= PVC_COUNTER.get(), the driver will continue executor pod allocation if reusablePVCs is not empty?
Yes, correct! When we have reusablePVCs, PVC-oriented executor pod allocation doesn't need to be blocked. We halt executor allocation only when there are no available PVCs and PVC_COUNTER is greater than or equal to the maximum.
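To make the condition discussed above concrete, here is a minimal, self-contained sketch of the gating predicate. `PvcGate`, `recordNewPvc`, and `shouldHalt` are hypothetical names introduced only for illustration; they are not part of `ExecutorPodsAllocator`, and the real allocator tracks considerably more state.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the allocator state; not the actual ExecutorPodsAllocator.
final class PvcGate(reusePVC: Boolean, maxPVCs: Int) {
  private val pvcCounter = new AtomicInteger(0)

  /** Record that a brand-new PVC was created for an executor. */
  def recordNewPvc(): Int = pvcCounter.incrementAndGet()

  /** True only when the feature is on, nothing is reusable, and the cap is reached. */
  def shouldHalt(reusablePvcCount: Int): Boolean =
    reusePVC && reusablePvcCount == 0 && maxPVCs <= pvcCounter.get()
}

object PvcGateDemo {
  def main(args: Array[String]): Unit = {
    val gate = new PvcGate(reusePVC = true, maxPVCs = 2)
    gate.recordNewPvc()
    println(gate.shouldHalt(reusablePvcCount = 0)) // false: still below the cap
    gate.recordNewPvc()
    println(gate.shouldHalt(reusablePvcCount = 0)) // true: cap reached, nothing reusable
    println(gate.shouldHalt(reusablePvcCount = 1)) // false: cap reached, but a PVC is reusable
  }
}
```

The last call corresponds to the reviewer's question: a non-empty set of reusable PVCs keeps allocation going even after the counter has reached the maximum.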
Why not mention "PVC-oriented executor pod allocation" in the config description? I think it makes it clearer what this feature is.
Yes, initially I tried to use it as a config name, but "PVC-oriented executor pod allocation" is achieved by three configurations together. I'll add a K8s documentation section with that name.