Skip to content

Commit 2a3aec1

Browse files
zwangshengyaooqinn
authored andcommitted
[SPARK-44906][K8S] Make Kubernetes[Driver|Executor]Conf.annotations substitute annotations instead of feature steps
### What changes were proposed in this pull request? Move `Utils. SubstituteAppNExecIds` logic into `KubernetesConf.annotations` as the default logic, ### Why are the changes needed? Easy for users to reuse, rather than to rewrite it again at the same logic. When user write custom feature step and using annotations, before this pr, they should call `Utils. SubstituteAppNExecIds` once. ### Does this PR introduce _any_ user-facing change? Yes, but no sense for user to use annotations. ### How was this patch tested? Add unit test ### Was this patch authored or co-authored using generative AI tooling? No Closes #42600 from zwangsheng/SPARK-44906. Lead-authored-by: zwangsheng <2213335496@qq.com> Co-authored-by: Kent Yao <yao@apache.org> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
1 parent 2d0a0a0 commit 2a3aec1

4 files changed

Lines changed: 15 additions & 9 deletions

File tree

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ private[spark] class KubernetesDriverConf(
117117

118118
override def annotations: Map[String, String] = {
119119
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
120+
.map { case (k, v) => (k, Utils.substituteAppNExecIds(v, appId, "")) }
120121
}
121122

122123
def serviceLabels: Map[String, String] = {
@@ -188,6 +189,7 @@ private[spark] class KubernetesExecutorConf(
188189

189190
override def annotations: Map[String, String] = {
190191
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
192+
.map { case(k, v) => (k, Utils.substituteAppNExecIds(v, appId, executorId)) }
191193
}
192194

193195
override def secretNamesToMountPaths: Map[String, String] = {

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
143143
.editOrNewMetadata()
144144
.withName(driverPodName)
145145
.addToLabels(conf.labels.asJava)
146-
.addToAnnotations(conf.annotations.map { case (k, v) =>
147-
(k, Utils.substituteAppNExecIds(v, conf.appId, "")) }.asJava)
146+
.addToAnnotations(conf.annotations.asJava)
148147
.endMetadata()
149148
.editOrNewSpec()
150149
.withRestartPolicy("Never")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -255,14 +255,12 @@ private[spark] class BasicExecutorFeatureStep(
255255
case "statefulset" => "Always"
256256
case _ => "Never"
257257
}
258-
val annotations = kubernetesConf.annotations.map { case (k, v) =>
259-
(k, Utils.substituteAppNExecIds(v, kubernetesConf.appId, kubernetesConf.executorId))
260-
}
258+
261259
val executorPodBuilder = new PodBuilder(pod.pod)
262260
.editOrNewMetadata()
263261
.withName(name)
264262
.addToLabels(kubernetesConf.labels.asJava)
265-
.addToAnnotations(annotations.asJava)
263+
.addToAnnotations(kubernetesConf.annotations.asJava)
266264
.addToOwnerReferences(ownerReference.toSeq: _*)
267265
.endMetadata()
268266
.editOrNewSpec()

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.apache.spark.deploy.k8s.Config._
2424
import org.apache.spark.deploy.k8s.Constants._
2525
import org.apache.spark.deploy.k8s.submit._
2626
import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
27+
import org.apache.spark.util.Utils
2728

2829
class KubernetesConfSuite extends SparkFunSuite {
2930

@@ -42,7 +43,9 @@ class KubernetesConfSuite extends SparkFunSuite {
4243
"customLabel2Key" -> "customLabel2Value")
4344
private val CUSTOM_ANNOTATIONS = Map(
4445
"customAnnotation1Key" -> "customAnnotation1Value",
45-
"customAnnotation2Key" -> "customAnnotation2Value")
46+
"customAnnotation2Key" -> "customAnnotation2Value",
47+
"customAnnotation3Key" -> "{{APP_ID}}",
48+
"customAnnotation4Key" -> "{{EXECUTOR_ID}}")
4649
private val SECRET_NAMES_TO_MOUNT_PATHS = Map(
4750
"secret1" -> "/mnt/secrets/secret1",
4851
"secret2" -> "/mnt/secrets/secret2")
@@ -93,7 +96,9 @@ class KubernetesConfSuite extends SparkFunSuite {
9396
SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(conf.appName),
9497
SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++
9598
CUSTOM_LABELS)
96-
assert(conf.annotations === CUSTOM_ANNOTATIONS)
99+
assert(conf.annotations === CUSTOM_ANNOTATIONS.map {
100+
case (k, v) => (k, Utils.substituteAppNExecIds(v, conf.appId, ""))
101+
})
97102
assert(conf.secretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
98103
assert(conf.secretEnvNamesToKeyRefs === SECRET_ENV_VARS)
99104
assert(conf.environment === CUSTOM_ENVS)
@@ -161,7 +166,9 @@ class KubernetesConfSuite extends SparkFunSuite {
161166
SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(conf.appName),
162167
SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE,
163168
SPARK_RESOURCE_PROFILE_ID_LABEL -> DEFAULT_RESOURCE_PROFILE_ID.toString) ++ CUSTOM_LABELS)
164-
assert(conf.annotations === CUSTOM_ANNOTATIONS)
169+
assert(conf.annotations === CUSTOM_ANNOTATIONS.map {
170+
case (k, v) => (k, Utils.substituteAppNExecIds(v, conf.appId, EXECUTOR_ID))
171+
})
165172
assert(conf.secretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
166173
assert(conf.secretEnvNamesToKeyRefs === SECRET_ENV_VARS)
167174
}

0 commit comments

Comments
 (0)