1616 */
1717package org .apache .spark .scheduler .cluster .k8s
1818
19+ import java .net .InetAddress
1920import java .util .concurrent .{ScheduledExecutorService , TimeUnit }
2021import java .util .concurrent .atomic .AtomicInteger
2122
@@ -35,8 +36,7 @@ import org.apache.spark.deploy.security.HadoopDelegationTokenManager
3536import org .apache .spark .internal .config .SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
3637import org .apache .spark .resource .ResourceProfile
3738import org .apache .spark .rpc .{RpcAddress , RpcCallContext }
38- import org .apache .spark .scheduler .{ExecutorDecommissionInfo , ExecutorKilled , ExecutorLossReason ,
39- TaskSchedulerImpl }
39+ import org .apache .spark .scheduler .{ExecutorDecommissionInfo , ExecutorKilled , ExecutorLossReason , TaskSchedulerImpl }
4040import org .apache .spark .scheduler .cluster .{CoarseGrainedSchedulerBackend , SchedulerBackendUtils }
4141import org .apache .spark .scheduler .cluster .CoarseGrainedClusterMessages .RegisterExecutor
4242import org .apache .spark .util .{ThreadUtils , Utils }
@@ -84,10 +84,19 @@ private[spark] class KubernetesClusterSchedulerBackend(
8484 val labels =
8585 Map (SPARK_APP_ID_LABEL -> applicationId(), SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE )
8686 val configMap = KubernetesClientUtils .buildConfigMap(configMapName, confFilesMap, labels)
87- KubernetesUtils .addOwnerReference(driverPod.orNull , Seq (configMap))
87+ KubernetesUtils .addOwnerReference(getDriverPodOrLocalPod( driverPod) , Seq (configMap))
8888 kubernetesClient.configMaps().create(configMap)
8989 }
9090
91+ private def getDriverPodOrLocalPod (driverPod : Option [Pod ]): Pod = {
92+ if (driverPod.isDefined) {
93+ return driverPod.get
94+ }
95+ val podName = InetAddress .getLocalHost.getHostName
96+ logInfo(s " LocalPod={podName= $podName} " )
97+ kubernetesClient.pods().withName(podName).get()
98+ }
99+
91100 /**
92101 * Get an application ID associated with the job.
93102 * This returns the string value of spark.app.id if set, otherwise
0 commit comments