@@ -31,16 +31,17 @@ private[spark] class SparkDeploySchedulerBackend(
3131 with AppClientListener
3232 with Logging {
3333
34- var client : AppClient = null
35- var stopping = false
36- var shutdownCallback : (SparkDeploySchedulerBackend ) => Unit = _
37- @ volatile var appId : String = _
34+ private var client : AppClient = null
35+ private var stopping = false
36+ private val shutdownCallbackLock = new Object ()
37+ private var shutdownCallback : (SparkDeploySchedulerBackend ) => Unit = _
38+ @ volatile private var appId : String = _
3839
39- val registrationLock = new Object ()
40- var registrationDone = false
40+ private val registrationLock = new Object ()
41+ private var registrationDone = false
4142
42- val maxCores = conf.getOption(" spark.cores.max" ).map(_.toInt)
43- val totalExpectedCores = maxCores.getOrElse(0 )
43+ private val maxCores = conf.getOption(" spark.cores.max" ).map(_.toInt)
44+ private val totalExpectedCores = maxCores.getOrElse(0 )
4445
4546 override def start () {
4647 super .start()
@@ -82,8 +83,11 @@ private[spark] class SparkDeploySchedulerBackend(
8283 stopping = true
8384 super .stop()
8485 client.stop()
85- if (shutdownCallback != null ) {
86- shutdownCallback(this )
86+
87+ shutdownCallbackLock.synchronized {
88+ if (shutdownCallback != null ) {
89+ shutdownCallback(this )
90+ }
8791 }
8892 }
8993
@@ -135,6 +139,12 @@ private[spark] class SparkDeploySchedulerBackend(
135139 super .applicationId
136140 }
137141
142+ def setShutdownCallback (f : SparkDeploySchedulerBackend => Unit ) {
143+ shutdownCallbackLock.synchronized {
144+ shutdownCallback = f
145+ }
146+ }
147+
138148 private def waitForRegistration () = {
139149 registrationLock.synchronized {
140150 while (! registrationDone) {
0 commit comments