@@ -17,12 +17,11 @@
package org.apache.spark.deploy.k8s

import java.io.File
import java.util.UUID

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
@@ -205,4 +204,9 @@ private[spark] object KubernetesUtils extends Logging {
def formatTime(time: String): String = {
if (time != null) time else "N/A"
}

def generateAppId(): String = {
s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
}

}
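
For context, a minimal illustration of the ID format this helper produces (not part of the diff; the example value below is made up):

// Illustration only: the generated ID is "spark-" followed by the 32 hex
// characters of a UUID with its dashes stripped, 38 characters in total,
// which stays within the 63-character Kubernetes label value limit noted
// in the next file.
val appId = KubernetesUtils.generateAppId()
// e.g. "spark-1b4f7c2d9e3a4b5c8d6e7f0a1b2c3d4e" (made-up value)
assert(appId.length == 38)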
@@ -207,7 +207,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
// to be added as a label to group resources belonging to the same application. Label values are
// considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
// a unique app ID (captured by spark.app.id) in the format below.
val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
val kubernetesAppId = KubernetesUtils.generateAppId()
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
val kubernetesConf = KubernetesConf.createDriverConf(
sparkConf,
@@ -18,12 +18,14 @@ package org.apache.spark.scheduler.cluster.k8s

import java.util.concurrent.ExecutorService

import io.fabric8.kubernetes.client.KubernetesClient
import scala.concurrent.{ExecutionContext, Future}

import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.SparkContext
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesUtils
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
@@ -60,6 +62,43 @@ private[spark] class KubernetesClusterSchedulerBackend(
removeExecutor(executorId, reason)
}

/**
* Get an application ID associated with the job.
* This returns the string value of [[appId]] if set; otherwise it binds
* an ID via [[bindApplicationId]] and returns that.
* @return The application ID
*/

Contributor: Unnecessary empty line.

var appId: Option[String] = None;
Contributor: You don't really need this var. The overridden applicationId can use a local var.


override def applicationId(): String = {

Contributor: Unnecessary empty line.

appId.map(_.toString).getOrElse {
logInfo("Initializing Application ID.")
Contributor: I don't think this log line is really needed.

bindApplicationId();
appId.get
}
}

def bindApplicationId(): Unit = {
val appIdString = {
val wasSparkSubmittedInClusterMode = conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK)

// cluster mode: get appId from driver env
if (wasSparkSubmittedInClusterMode) {
Contributor: Instead of checking wasSparkSubmittedInClusterMode, I think you can simply check spark.app.id: use its value if it's set, and otherwise use super.applicationId. In client mode, since KubernetesClientApplication is not invoked, how the application ID is generated doesn't really matter, so using super.applicationId is fine for client mode. So the logic is simple: if spark.app.id is set, use its value; otherwise use super.applicationId. (A rough sketch of this follows bindApplicationId below.)

val sparkAppId = conf.getOption("spark.app.id")
sparkAppId.map(_.toString).getOrElse {
logWarning("Application ID is not initialized yet in cluster mode.")
super.applicationId
}
} else {
// client mode: generate new appId
KubernetesUtils.generateAppId()
}
}
appId = Some(appIdString)
}
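
A rough sketch of the simplification suggested in the review comments above (illustrative only, not part of this diff; it relies on conf and super.applicationId already available in CoarseGrainedSchedulerBackend):

// Sketch of the reviewer's suggestion: no appId var and no cluster-mode check.
// In cluster mode KubernetesClientApplication sets spark.app.id, so use it
// when present; otherwise fall back to the superclass-generated ID.
override def applicationId(): String = {
  conf.getOption("spark.app.id").getOrElse(super.applicationId)
}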

override def start(): Unit = {
super.start()
if (!Utils.isDynamicAllocationEnabled(conf)) {