-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-28574][CORE] Allow to config different sizes for event queues #25307
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
add to whitelist |
|
ok to test |
jiangxb1987
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good only some nits
| test("event queue size can be configued through spark conf") { | ||
| val conf = new SparkConf(false) | ||
| .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5) | ||
| .set("spark.scheduler.listenerbus.eventqueue.shared.capacity", "1") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I would use s${SHARED_QUEUE}.capacity
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
| val conf = new SparkConf(false) | ||
| .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5) | ||
| .set("spark.scheduler.listenerbus.eventqueue.shared.capacity", "1") | ||
| .set("spark.scheduler.listenerbus.eventqueue.eventLog.capacity", "2") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similarly, I would use s${EVENT_LOG_QUEUE}.capacity
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
| val counter2 = new BasicJobCounter() | ||
| val counter3 = new BasicJobCounter() | ||
|
|
||
| bus.addToSharedQueue(counter1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add comment to explain this is just to trigger add a new Queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added comments
| // if no such conf is specified, use the value specified in | ||
| // LISTENER_BUS_EVENT_QUEUE_CAPACITY | ||
| protected def capacity: Int = conf.getInt( | ||
| s"spark.scheduler.listenerbus.eventqueue.${name}.capacity", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shall assert the capacity is > 0, and add test to cover the case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add assertion
|
|
||
| // For testing only. | ||
| private[scheduler] def getQueueCapacity(name: String): Int = { | ||
| queues.asScala.find(_.name == name) match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe queues.asScala.find(_.name == name).map(_.capacity).getOrElse(-1) ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed
|
the |
|
add to whitelist |
|
ok to test |
|
Test build #108483 has finished for PR 25307 at commit
|
|
Test build #108485 has finished for PR 25307 at commit
|
zsxwing
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good overall. Left some nits.
| } | ||
|
|
||
| // For testing only. | ||
| private[scheduler] def getQueueCapacity(name: String): Int = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could you change the return type to Option[Int] and use None to indicate an unknown queue rather than -1?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed
| .set(s"spark.scheduler.listenerbus.eventqueue.${SHARED_QUEUE}.capacity", "1") | ||
| .set(s"spark.scheduler.listenerbus.eventqueue.${EVENT_LOG_QUEUE}.capacity", "2") | ||
|
|
||
| val bus = new LiveListenerBus(conf) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: bus.stop() is missing. It should be called in finally to stop the internal thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The bus is actually never started, so no need to stop.
| private[scheduler] def capacity: Int = { | ||
| val queuesize = conf.getInt(s"spark.scheduler.listenerbus.eventqueue.${name}.capacity", | ||
| conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) | ||
| assert(queuesize > 0, s"capacity for event queue $name must be greater than 0," + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: missing space after ,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
Since names of the queues are private, it's fine to not document them to keep them internal right now. |
|
LGTM pending tests |
|
Test build #108574 has finished for PR 25307 at commit
|
|
Test build #108576 has finished for PR 25307 at commit
|
|
retest this please |
|
LGTM |
|
Test build #108579 has finished for PR 25307 at commit
|
|
Thanks! Merging to master. |
| // LISTENER_BUS_EVENT_QUEUE_CAPACITY | ||
| private[scheduler] def capacity: Int = { | ||
| val queuesize = conf.getInt(s"spark.scheduler.listenerbus.eventqueue.${name}.capacity", | ||
| conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: indent.
| conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) | ||
| // The capacity can be configured by spark.scheduler.listenerbus.eventqueue.${name}.capacity, | ||
| // if no such conf is specified, use the value specified in | ||
| // LISTENER_BUS_EVENT_QUEUE_CAPACITY |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to update the conf description of LISTENER_BUS_EVENT_QUEUE_CAPACITY.
| // if no such conf is specified, use the value specified in | ||
| // LISTENER_BUS_EVENT_QUEUE_CAPACITY | ||
| private[scheduler] def capacity: Int = { | ||
| val queuesize = conf.getInt(s"spark.scheduler.listenerbus.eventqueue.${name}.capacity", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of hard-coded here, can we define it in core/src/main/scala/org/apache/spark/internal/config/package.scala ?
What changes were proposed in this pull request?
Add configuration spark.scheduler.listenerbus.eventqueue.${name}.capacity to allow configuration of different event queue size.
How was this patch tested?
Unit test in core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala