Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ package org.apache.spark.scheduler.cluster.k8s

import java.util.concurrent.ExecutorService

import io.fabric8.kubernetes.client.KubernetesClient
import scala.concurrent.{ExecutionContext, Future}

import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.SparkContext
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
Expand Down Expand Up @@ -60,6 +61,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
removeExecutor(executorId, reason)
}

/**
* Get an application ID associated with the job.
* This returns the string value of spark.app.id if set, otherwise
* the locally-generated ID from the superclass.
* @return The application ID
*/
override def applicationId(): String = {
conf.getOption("spark.app.id").map(_.toString).getOrElse {super.applicationId}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit style: spacing around { }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, this should use getOrElse(super.applicationId)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

}

override def start(): Unit = {
super.start()
if (!Utils.isDynamicAllocationEnabled(conf)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn

private val requestExecutorsService = new DeterministicScheduler()
private val sparkConf = new SparkConf(false)
.set("spark.executor.instances", "3")
.set("spark.executor.instances", "3").set("spark.app.id", TEST_SPARK_APP_ID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test this?


@Mock
private var sc: SparkContext = _
Expand Down Expand Up @@ -100,9 +100,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
podAllocator,
lifecycleEventHandler,
watchEvents,
pollEvents) {
override def applicationId(): String = TEST_SPARK_APP_ID
}
pollEvents)
}

test("Start all components") {
Expand Down