Skip to content

Commit 4a1f657

Browse files
committed
only remove the active listener for an interrupt
1 parent 0a44c06 commit 4a1f657

3 files changed

Lines changed: 47 additions & 26 deletions

File tree

core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -85,28 +85,18 @@ private class AsyncEventQueue(
8585
}
8686

8787
private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
88-
try {
89-
var next: SparkListenerEvent = eventQueue.take()
90-
while (next != POISON_PILL) {
91-
val ctx = processingTime.time()
92-
try {
93-
super.postToAll(next)
94-
} finally {
95-
ctx.stop()
96-
}
97-
eventCount.decrementAndGet()
98-
next = eventQueue.take()
88+
var next: SparkListenerEvent = eventQueue.take()
89+
while (next != POISON_PILL) {
90+
val ctx = processingTime.time()
91+
try {
92+
super.postToAll(next)
93+
} finally {
94+
ctx.stop()
9995
}
10096
eventCount.decrementAndGet()
101-
} catch {
102-
case ie: InterruptedException =>
103-
logInfo(s"Stopping listener queue $name.", ie)
104-
stopped.set(true)
105-
bus.removeQueue(name)
106-
// we're not going to process any more events in this queue, so might as well clear it
107-
eventQueue.clear()
108-
eventCount.set(0)
97+
next = eventQueue.take()
10998
}
99+
eventCount.decrementAndGet()
110100
}
111101

112102
override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {
@@ -139,7 +129,11 @@ private class AsyncEventQueue(
139129
eventCount.incrementAndGet()
140130
eventQueue.put(POISON_PILL)
141131
}
142-
dispatchThread.join()
132+
// this thread might be trying to stop itself as part of error handling -- we can't join
133+
// in that case.
134+
if (Thread.currentThread() != dispatchThread) {
135+
dispatchThread.join()
136+
}
143137
}
144138

145139
def post(event: SparkListenerEvent): Unit = {
@@ -196,6 +190,12 @@ private class AsyncEventQueue(
196190
true
197191
}
198192

193+
override def removeListenerOnError(listener: SparkListenerInterface): Unit = {
194+
// the listener failed in an unrecoverable way, so we want to remove it from the entire
195+
// LiveListenerBus (potentially stopping a queue if it's empty)
196+
bus.removeListener(listener)
197+
}
198+
199199
}
200200

201201
private object AsyncEventQueue {

core/src/main/scala/org/apache/spark/util/ListenerBus.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,15 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
6060
}
6161
}
6262

63+
/**
64+
* This can be overridden by subclasses if there is any extra cleanup to do when removing a
65+
* listener. In particular AsyncEventQueues can clean up queues in the LiveListenerBus.
66+
*/
67+
def removeListenerOnError(listener: L): Unit = {
68+
removeListener(listener)
69+
}
70+
71+
6372
/**
6473
* Post the event to all registered listeners. The `postToAll` caller should guarantee calling
6574
* `postToAll` in the same thread for all events.
@@ -80,6 +89,11 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
8089
}
8190
try {
8291
doPostEvent(listener, event)
92+
if (Thread.interrupted()) {
93+
logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}. " +
94+
s"Removing that listener.")
95+
removeListenerOnError(listener)
96+
}
8397
} catch {
8498
case NonFatal(e) if !isIgnorableException(e) =>
8599
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -495,25 +495,32 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
495495
val bus = new LiveListenerBus(conf)
496496
val counter1 = new BasicJobCounter()
497497
val counter2 = new BasicJobCounter()
498-
val interruptingListener = new InterruptingListener
498+
val interruptingListener1 = new InterruptingListener
499+
val interruptingListener2 = new InterruptingListener
499500
bus.addToSharedQueue(counter1)
500-
bus.addToSharedQueue(interruptingListener)
501+
bus.addToSharedQueue(interruptingListener1)
501502
bus.addToStatusQueue(counter2)
502-
assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
503+
bus.addToEventLogQueue(interruptingListener2)
504+
assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE, EVENT_LOG_QUEUE))
503505
assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
506+
assert(bus.findListenersByClass[InterruptingListener]().size === 2)
504507

505508
bus.start(mockSparkContext, mockMetricsSystem)
506509

507-
// after we post one event, the shared queue should stop because of the interrupt
510+
// after we post one event, both interrupting listeners should get removed, and the
511+
// event log queue should be removed
508512
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
509513
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
510-
assert(bus.activeQueues() === Set(APP_STATUS_QUEUE))
511-
assert(bus.findListenersByClass[BasicJobCounter]().size === 1)
514+
assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
515+
assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
516+
assert(bus.findListenersByClass[InterruptingListener]().size === 0)
517+
assert(counter1.count === 1)
512518
assert(counter2.count === 1)
513519

514520
// posting more events should be fine, they'll just get processed from the OK queue.
515521
(0 until 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
516522
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
523+
assert(counter1.count === 6)
517524
assert(counter2.count === 6)
518525

519526
// Make sure stopping works -- this requires putting a poison pill in all active queues, which

0 commit comments

Comments
 (0)