
Commit 9e91a10

dhruve authored and zsxwing committed
[SPARK-15703][SCHEDULER][CORE][WEBUI] Make ListenerBus event queue size configurable (branch 2.0)
## What changes were proposed in this pull request?

Backport #14269 to 2.0.

## How was this patch tested?

Jenkins.

Author: Dhruve Ashar <[email protected]>

Closes #15222 from zsxwing/SPARK-15703-2.0.
1 parent 5bc5b49 commit 9e91a10
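
For context, this change introduces one new configuration key, `spark.scheduler.listenerbus.eventqueue.size`, with a default of 10000. A minimal usage sketch, not part of the commit itself — the object name and the value 20000 are illustrative, not recommendations:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hedged sketch: raising the listener bus queue size for an event-heavy app.
// The key and its default (10000) come from this commit; 20000 is an
// arbitrary example value.
object ListenerBusQueueSizeExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("listener-bus-queue-example")
      .set("spark.scheduler.listenerbus.eventqueue.size", "20000")
    val sc = new SparkContext(conf)
    // ... run jobs; scheduler events are buffered in a queue of this size ...
    sc.stop()
  }
}
```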

File tree

9 files changed: +60 −37 lines changed


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 2 deletions
@@ -249,7 +249,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   def isStopped: Boolean = stopped.get()
 
   // An asynchronous listener bus for Spark events
-  private[spark] val listenerBus = new LiveListenerBus
+  private[spark] val listenerBus = new LiveListenerBus(this)
 
   // This function allows components created by SparkEnv to be mocked in unit tests:
   private[spark] def createSparkEnv(
@@ -2154,7 +2154,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       }
     }
 
-    listenerBus.start(this)
+    listenerBus.start()
     _listenerBusStarted = true
   }
 

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 5 additions & 0 deletions
@@ -112,4 +112,9 @@ package object config {
   // To limit how many applications are shown in the History Server summary ui
   private[spark] val HISTORY_UI_MAX_APPS =
     ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
+    ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
+      .intConf
+      .createWithDefault(10000)
 }
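
The entry above follows Spark's `ConfigBuilder` pattern: a typed key with a default, read internally as `sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)`. A hedged sketch of the equivalent lookup from user code, which only sees the string key — `getInt` is standard `SparkConf` API, and the override value 50000 is illustrative:

```scala
import org.apache.spark.SparkConf

object QueueSizeLookup extends App {
  val conf = new SparkConf()
    .set("spark.scheduler.listenerbus.eventqueue.size", "50000")

  // External code reads the raw string key; 10000 mirrors the entry's default.
  val queueSize = conf.getInt("spark.scheduler.listenerbus.eventqueue.size", 10000)
  assert(queueSize == 50000)
}
```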

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

Lines changed: 14 additions & 9 deletions
@@ -22,7 +22,8 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.util.DynamicVariable
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
 
 /**
@@ -32,18 +33,24 @@ import org.apache.spark.util.Utils
  * has started will events be actually propagated to all attached listeners. This listener bus
  * is stopped when `stop()` is called, and it will drop further events after stopping.
  */
-private[spark] class LiveListenerBus extends SparkListenerBus {
+private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
 
   self =>
 
   import LiveListenerBus._
 
-  private var sparkContext: SparkContext = null
-
   // Cap the capacity of the event queue so we get an explicit error (rather than
   // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
-  private val EVENT_QUEUE_CAPACITY = 10000
-  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
+  private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
+  private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
+
+  private def validateAndGetQueueSize(): Int = {
+    val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
+    if (queueSize <= 0) {
+      throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
+    }
+    queueSize
+  }
 
   // Indicate if `start()` is called
   private val started = new AtomicBoolean(false)
@@ -96,11 +103,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus {
    * listens for any additional events asynchronously while the listener bus is still running.
    * This should only be called once.
    *
-   * @param sc Used to stop the SparkContext in case the listener thread dies.
    */
-  def start(sc: SparkContext): Unit = {
+  def start(): Unit = {
     if (started.compareAndSet(false, true)) {
-      sparkContext = sc
       listenerThread.start()
     } else {
       throw new IllegalStateException(s"$name already started!")
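
Note the pattern in this hunk: the capacity becomes a `lazy val` because the conf can only be consulted once the constructor has a `SparkContext`, so sizing and validation happen on first touch of the queue. A self-contained sketch of that validate-on-first-use idiom — the `Settings` and `BoundedBus` names are hypothetical, and the real code throws `SparkException` rather than using `require`:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-in for SparkConf, to keep the sketch self-contained.
final class Settings(entries: Map[String, String]) {
  def getInt(key: String, default: Int): Int =
    entries.get(key).map(_.toInt).getOrElse(default)
}

// Mirrors the diff's shape: capacity is validated lazily, so a bad value
// fails fast with a clear message instead of building an unusable queue.
final class BoundedBus(settings: Settings) {
  private lazy val capacity: Int = {
    val n = settings.getInt("bus.queue.size", 10000)
    require(n > 0, "bus.queue.size must be > 0!")
    n
  }
  private lazy val queue = new LinkedBlockingQueue[String](capacity)

  def post(event: String): Boolean = queue.offer(event)
}

// Usage: the first post triggers sizing and validation.
object BoundedBusDemo extends App {
  val bus = new BoundedBus(new Settings(Map("bus.queue.size" -> "4")))
  assert(bus.post("event-1"))
}
```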

core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -142,14 +142,14 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     extraConf.foreach { case (k, v) => conf.set(k, v) }
     val logName = compressionCodec.map("test-" + _).getOrElse("test")
     val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
-    val listenerBus = new LiveListenerBus
+    val listenerBus = new LiveListenerBus(sc)
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
       125L, "Mickey", None)
     val applicationEnd = SparkListenerApplicationEnd(1000L)
 
     // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
     eventLogger.start()
-    listenerBus.start(sc)
+    listenerBus.start()
     listenerBus.addListener(eventLogger)
     listenerBus.postToAll(applicationStart)
     listenerBus.postToAll(applicationEnd)

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 16 additions & 14 deletions
@@ -37,13 +37,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
   val jobCompletionTime = 1421191296660L
 
   test("don't call sc.stop in listener") {
-    sc = new SparkContext("local", "SparkListenerSuite")
+    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
     val listener = new SparkContextStoppingListener(sc)
-    val bus = new LiveListenerBus
+    val bus = new LiveListenerBus(sc)
     bus.addListener(listener)
 
     // Starting listener bus should flush all buffered events
-    bus.start(sc)
+    bus.start()
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
 
@@ -52,16 +52,17 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
   }
 
   test("basic creation and shutdown of LiveListenerBus") {
+    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
     val counter = new BasicJobCounter
-    val bus = new LiveListenerBus
+    val bus = new LiveListenerBus(sc)
     bus.addListener(counter)
 
     // Listener bus hasn't started yet, so posting events should not increment counter
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
     assert(counter.count === 0)
 
     // Starting listener bus should flush all buffered events
-    bus.start(sc)
+    bus.start()
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(counter.count === 5)
 
@@ -72,14 +73,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
 
     // Listener bus must not be started twice
     intercept[IllegalStateException] {
-      val bus = new LiveListenerBus
-      bus.start(sc)
-      bus.start(sc)
+      val bus = new LiveListenerBus(sc)
+      bus.start()
+      bus.start()
     }
 
     // ... or stopped before starting
     intercept[IllegalStateException] {
-      val bus = new LiveListenerBus
+      val bus = new LiveListenerBus(sc)
       bus.stop()
     }
   }
@@ -106,12 +107,12 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
         drained = true
       }
     }
-
-    val bus = new LiveListenerBus
+    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
+    val bus = new LiveListenerBus(sc)
     val blockingListener = new BlockingListener
 
     bus.addListener(blockingListener)
-    bus.start(sc)
+    bus.start()
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
 
     listenerStarted.acquire()
@@ -353,13 +354,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     val badListener = new BadListener
     val jobCounter1 = new BasicJobCounter
     val jobCounter2 = new BasicJobCounter
-    val bus = new LiveListenerBus
+    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
+    val bus = new LiveListenerBus(sc)
 
     // Propagate events to bad listener first
     bus.addListener(badListener)
     bus.addListener(jobCounter1)
     bus.addListener(jobCounter2)
-    bus.start(sc)
+    bus.start()
 
     // Post events to all listeners, and wait until the queue is drained
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }

core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala

Lines changed: 7 additions & 2 deletions
@@ -38,7 +38,10 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage.StorageLevel._
 
 /** Testsuite that tests block replication in BlockManager */
-class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
+class BlockManagerReplicationSuite extends SparkFunSuite
+    with Matchers
+    with BeforeAndAfter
+    with LocalSparkContext {
 
   private val conf = new SparkConf(false).set("spark.app.id", "test")
   private var rpcEnv: RpcEnv = null
@@ -91,8 +94,10 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
     // to make cached peers refresh frequently
     conf.set("spark.storage.cachedPeersTtl", "10")
 
+    sc = new SparkContext("local", "test", conf)
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
-      new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+      new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+        new LiveListenerBus(sc))), conf, true)
     allStores.clear()
   }
 

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 4 additions & 2 deletions
@@ -49,7 +49,7 @@ import org.apache.spark.util._
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
-  with PrivateMethodTester with ResetSystemProperties {
+  with PrivateMethodTester with LocalSparkContext with ResetSystemProperties {
 
   import BlockManagerSuite._
 
@@ -107,8 +107,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
     conf.set("spark.driver.port", rpcEnv.address.port.toString)
 
+    sc = new SparkContext("local", "test", conf)
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
-      new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+      new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+        new LiveListenerBus(sc))), conf, true)
 
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()

core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala

Lines changed: 6 additions & 5 deletions
@@ -19,15 +19,14 @@ package org.apache.spark.ui.storage
 
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SparkConf, SparkFunSuite, Success}
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark._
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
 
 /**
  * Test various functionality in the StorageListener that supports the StorageTab.
 */
-class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
+class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter {
   private var bus: LiveListenerBus = _
   private var storageStatusListener: StorageStatusListener = _
   private var storageListener: StorageListener = _
@@ -43,8 +42,10 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
   private val bm1 = BlockManagerId("big", "dog", 1)
 
   before {
-    bus = new LiveListenerBus
-    storageStatusListener = new StorageStatusListener(new SparkConf())
+    val conf = new SparkConf()
+    sc = new SparkContext("local", "test", conf)
+    bus = new LiveListenerBus(sc)
+    storageStatusListener = new StorageStatusListener(conf)
     storageListener = new StorageListener(storageStatusListener)
     bus.addListener(storageStatusListener)
     bus.addListener(storageListener)

streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala

Lines changed: 4 additions & 1 deletion
@@ -48,6 +48,7 @@ class ReceivedBlockHandlerSuite
   extends SparkFunSuite
   with BeforeAndAfter
   with Matchers
+  with LocalSparkContext
   with Logging {
 
   import WriteAheadLogBasedBlockHandler._
@@ -78,8 +79,10 @@ class ReceivedBlockHandlerSuite
     rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
     conf.set("spark.driver.port", rpcEnv.address.port.toString)
 
+    sc = new SparkContext("local", "test", conf)
     blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
-      new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+      new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+        new LiveListenerBus(sc))), conf, true)
 
     storageLevel = StorageLevel.MEMORY_ONLY_SER
     blockManager = createBlockManager(blockManagerSize, conf)
