@@ -21,6 +21,7 @@ import java.util.Collections
2121
2222import scala .collection .JavaConverters ._
2323import scala .collection .mutable .ArrayBuffer
24+ import scala .reflect .ClassTag
2425
2526import org .apache .mesos .{Protos , Scheduler , SchedulerDriver }
2627import org .apache .mesos .Protos ._
@@ -33,6 +34,7 @@ import org.scalatest.BeforeAndAfter
3334import org .apache .spark .{LocalSparkContext , SecurityManager , SparkConf , SparkContext , SparkFunSuite }
3435import org .apache .spark .network .shuffle .mesos .MesosExternalShuffleClient
3536import org .apache .spark .rpc .RpcEndpointRef
37+ import org .apache .spark .scheduler .cluster .CoarseGrainedClusterMessages .RemoveExecutor
3638import org .apache .spark .scheduler .TaskSchedulerImpl
3739import org .apache .spark .scheduler .cluster .mesos .Utils ._
3840
@@ -47,6 +49,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
4749 private var backend : MesosCoarseGrainedSchedulerBackend = _
4850 private var externalShuffleClient : MesosExternalShuffleClient = _
4951 private var driverEndpoint : RpcEndpointRef = _
52+ @ volatile private var stopCalled = false
5053
5154 test(" mesos supports killing and limiting executors" ) {
5255 setBackend()
@@ -341,6 +344,32 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
341344 assert(! dockerInfo.getForcePullImage)
342345 }
343346
347+ test(" Do not call parent methods like removeExecutor() after backend is stopped" ) {
348+ setBackend()
349+
350+ // launches a task on a valid offer
351+ val offers = List ((backend.executorMemory(sc), 1 ))
352+ offerResources(offers)
353+ verifyTaskLaunched(" o1" )
354+
355+ // launches a thread simulating status update
356+ val statusUpdateThread = new Thread {
357+ override def run (): Unit = {
358+ while (! stopCalled) {
359+ Thread .sleep(100 )
360+ }
361+
362+ val status = createTaskStatus(" 0" , " s1" , TaskState .TASK_FINISHED )
363+ backend.statusUpdate(driver, status)
364+ }
365+ }.start
366+
367+ backend.stop
368+ // Any method of the backend involving sending messages to the driver endpoint should not
369+ // be called after the backend is stopped.
370+ verify(driverEndpoint, never()).askWithRetry(isA(classOf [RemoveExecutor ]))(any[ClassTag [_]])
371+ }
372+
344373 private def verifyDeclinedOffer (driver : SchedulerDriver ,
345374 offerId : OfferID ,
346375 filter : Boolean = false ): Unit = {
@@ -396,6 +425,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
396425 mesosDriver = newDriver
397426 }
398427
428+ override def stopExecutors (): Unit = {
429+ stopCalled = true
430+ }
431+
399432 markRegistered()
400433 }
401434 backend.start()
0 commit comments