[SPARK-27394][WebUI]Flush LiveEntity if necessary when receiving SparkListenerExecutorMetricsUpdate #24303
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,6 +58,14 @@ private[spark] class AppStatusListener( | |
| // operations that we can live without when rapidly processing incoming task events. | ||
| private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L | ||
|
|
||
| /** | ||
| * A time limit before we force to flush all live entities. If the time since the last flush | ||
| * does not exceed this limit, the UI will not trigger a heavy flush to sync the states, since | ||
| * that may slow down Spark event processing significantly. Otherwise, the UI will try to flush | ||
| * when receiving the next executor heartbeat. | ||
| */ | ||
| private val liveUpdateStalenessLimit = conf.get(LIVE_ENTITY_UPDATE_STALENESS_LIMIT) | ||
|
|
||
| private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE) | ||
| private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES) | ||
|
|
||
|
|
@@ -76,6 +84,9 @@ private[spark] class AppStatusListener( | |
| // around liveExecutors. | ||
| @volatile private var activeExecutorCount = 0 | ||
|
|
||
| /** The time of the last flush of `LiveEntity`s. This is to avoid flushing too frequently. */ | ||
| private var lastFlushTimeNs = System.nanoTime() | ||
|
|
||
| kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS)) | ||
| { count => cleanupExecutors(count) } | ||
|
|
||
|
|
@@ -89,7 +100,8 @@ private[spark] class AppStatusListener( | |
|
|
||
| kvstore.onFlush { | ||
| if (!live) { | ||
| flush() | ||
| val now = System.nanoTime() | ||
| flush(update(_, now)) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -831,6 +843,14 @@ private[spark] class AppStatusListener( | |
| } | ||
| } | ||
| } | ||
| // Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush | ||
| // here to ensure the staleness of Spark UI doesn't last more than | ||
| // `max(heartbeat interval, liveUpdateStalenessLimit)`. | ||
| if (now - lastFlushTimeNs > liveUpdateStalenessLimit) { | ||
| flush(maybeUpdate(_, now)) | ||
|
Contributor

Hmm... in the bug you mention that job-level data is not being updated. Is that the only case? Because if that's it, then this looks like overkill. You could e.g. update the jobs in the code that handles that event. Doing a full flush here seems like overkill and a little expensive when you think about many heartbeats arriving in a short period (even when considering the staleness limit).

Member
Author

I also noticed that executor active tasks sometimes could be wrong. That's why I decided to flush everything, to make sure we don't miss any places. It's also hard to maintain if we need to manually flush in every place. Ideally, we should flush periodically so that it doesn't depend on receiving a Spark event, but then I would need to add a new event type and post it to the listener bus, which is overkill.

There will be at least 100ms between each flush. As long as we process heartbeats very fast, most of them won't trigger the flush.

Contributor

If the goal is to use the heartbeats as some trigger for flushing, how about using some ratio of the heartbeat period instead? Really large apps can get a little backed up when processing heartbeats from lots and lots of busy executors, and this would make it a little worse.

Member

The update only happens in the live UI, which should be fine in general. For really large apps, would it help to set a larger live update period?

Contributor

The "don't write to the store all the time" thing was added specifically to speed up live UIs, because copying + writing the data (even to the memory store) becomes really expensive when you have event storms (think thousands of tasks starting and stopping in a very short period).

We should avoid requiring configuration tweaks for things not to break, when possible. |
||
| // Re-get the current system time because `flush` may be slow and `now` is stale. | ||
| lastFlushTimeNs = System.nanoTime() | ||
| } | ||
| } | ||
|
|
||
| override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { | ||
|
|
@@ -856,18 +876,17 @@ private[spark] class AppStatusListener( | |
| } | ||
| } | ||
|
|
||
| /** Flush all live entities' data to the underlying store. */ | ||
| private def flush(): Unit = { | ||
| val now = System.nanoTime() | ||
| /** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. */ | ||
| private def flush(entityFlushFunc: LiveEntity => Unit): Unit = { | ||
| liveStages.values.asScala.foreach { stage => | ||
| update(stage, now) | ||
| stage.executorSummaries.values.foreach(update(_, now)) | ||
| entityFlushFunc(stage) | ||
| stage.executorSummaries.values.foreach(entityFlushFunc) | ||
| } | ||
| liveJobs.values.foreach(update(_, now)) | ||
| liveExecutors.values.foreach(update(_, now)) | ||
| liveTasks.values.foreach(update(_, now)) | ||
| liveRDDs.values.foreach(update(_, now)) | ||
| pools.values.foreach(update(_, now)) | ||
| liveJobs.values.foreach(entityFlushFunc) | ||
| liveExecutors.values.foreach(entityFlushFunc) | ||
| liveTasks.values.foreach(entityFlushFunc) | ||
| liveRDDs.values.foreach(entityFlushFunc) | ||
| pools.values.foreach(entityFlushFunc) | ||
| } | ||
|
|
||
| /** | ||
|
|
||
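For readers skimming the hunks above, here is a small, self-contained sketch of the pattern this file now follows: `flush` takes the per-entity write function as a parameter, so the `kvstore.onFlush` path can force a full `update` while the executor-heartbeat path uses the rate-limited `maybeUpdate`, all guarded by a staleness check against the time of the last flush. This is plain Scala with invented names (`FlushSketch`, `DemoEntity`, `write`, `register`), not the actual Spark classes.

```scala
import scala.collection.mutable

// Minimal stand-in for a LiveEntity: it remembers when it was last written to the store.
abstract class LiveEntity {
  var lastWriteTime: Long = -1L
  def write(now: Long): Unit
}

class FlushSketch(liveUpdatePeriodNs: Long, stalenessLimitNs: Long) {
  private val liveEntities = mutable.Buffer.empty[LiveEntity]
  private var lastFlushTimeNs = System.nanoTime()

  def register(entity: LiveEntity): Unit = liveEntities += entity

  // Forced write, used e.g. when the store is flushed at application end.
  private def update(entity: LiveEntity, now: Long): Unit = {
    entity.write(now)
    entity.lastWriteTime = now
  }

  // Rate-limited write: skip entities whose data is newer than the live-update period.
  private def maybeUpdate(entity: LiveEntity, now: Long): Unit = {
    if (liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) {
      update(entity, now)
    }
  }

  // The refactored flush: the caller chooses the per-entity write strategy.
  private def flush(entityFlushFunc: LiveEntity => Unit): Unit = {
    liveEntities.foreach(entityFlushFunc)
  }

  // Called from the executor heartbeat handler: only pay for a flush when the UI is stale.
  def onExecutorHeartbeat(): Unit = {
    val now = System.nanoTime()
    if (now - lastFlushTimeNs > stalenessLimitNs) {
      flush(maybeUpdate(_, now))
      // Re-read the clock: the flush itself may have been slow, so `now` is stale.
      lastFlushTimeNs = System.nanoTime()
    }
  }

  // Forced flush of everything, mirroring the kvstore.onFlush hook for a finished application.
  def flushAll(): Unit = {
    val now = System.nanoTime()
    flush(update(_, now))
  }
}

// Tiny demo entity that just records when it was written.
class DemoEntity(name: String) extends LiveEntity {
  def write(now: Long): Unit = println(s"$name written at $now ns")
}

object FlushSketchDemo extends App {
  val listener = new FlushSketch(
    liveUpdatePeriodNs = 100L * 1000 * 1000,    // 100ms, like the default live-update period
    stalenessLimitNs = 1L * 1000 * 1000 * 1000) // 1s staleness limit, chosen for the demo
  listener.register(new DemoEntity("job-0"))
  listener.onExecutorHeartbeat() // no flush: nothing is stale yet
  Thread.sleep(1100)
  listener.onExecutorHeartbeat() // past the staleness limit, so stale entities get written
}
```

The point of the two layers is what the review thread above debates: the staleness limit bounds how often a flush is even attempted on heartbeats, and `maybeUpdate` then skips entities that were written recently anyway, so heartbeat storms do not turn into store-write storms.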
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -100,14 +100,18 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B | |
| * Create a test SparkContext with the SparkUI enabled. | ||
| * It is safe to `get` the SparkUI directly from the SparkContext returned here. | ||
| */ | ||
| private def newSparkContext(killEnabled: Boolean = true): SparkContext = { | ||
| private def newSparkContext( | ||
| killEnabled: Boolean = true, | ||
| master: String = "local", | ||
| additionalConfs: Map[String, String] = Map.empty): SparkContext = { | ||
| val conf = new SparkConf() | ||
| .setMaster("local") | ||
| .setMaster(master) | ||
| .setAppName("test") | ||
| .set(UI_ENABLED, true) | ||
| .set(UI_PORT, 0) | ||
| .set(UI_KILL_ENABLED, killEnabled) | ||
| .set(MEMORY_OFFHEAP_SIZE.key, "64m") | ||
| additionalConfs.foreach { case (k, v) => conf.set(k, v) } | ||
| val sc = new SparkContext(conf) | ||
| assert(sc.ui.isDefined) | ||
| sc | ||
|
|
@@ -725,6 +729,30 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B | |
| } | ||
| } | ||
|
|
||
| test("Staleness of Spark UI should not last minutes or hours") { | ||
| withSpark(newSparkContext( | ||
| master = "local[2]", | ||
| // Set a small heartbeat interval to make the test fast | ||
| additionalConfs = Map( | ||
| EXECUTOR_HEARTBEAT_INTERVAL.key -> "10ms", | ||
| LIVE_ENTITY_UPDATE_STALENESS_LIMIT.key -> "10ms"))) { sc => | ||
| val f = sc.parallelize(1 to 1000, 1000).foreachAsync { _ => | ||
| // Make the task never finish so there won't be any task start/end events after the first 2 | ||
| // tasks start. | ||
| Thread.sleep(300000) | ||
|
Contributor

Nit: what about a sleep of less than 5 minutes here, something comparable with the `eventually` timeout?

Member
Author

I turned on

Contributor

I have checked and the thread leaks are gone. |
||
| } | ||
| try { | ||
| eventually(timeout(10.seconds)) { | ||
| val jobsJson = getJson(sc.ui.get, "jobs") | ||
| jobsJson.children.length should be (1) | ||
| (jobsJson.children.head \ "numActiveTasks").extract[Int] should be (2) | ||
| } | ||
| } finally { | ||
| f.cancel() | ||
| } | ||
| } | ||
| } | ||
|
|
||
| def goToUi(sc: SparkContext, path: String): Unit = { | ||
| goToUi(sc.ui.get, path) | ||
| } | ||
|
|
||
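A side note on the assertion style in the new test: it inspects the REST API response with json4s. The snippet below is a tiny standalone sketch of that extraction pattern; the sample payload is invented and only mimics the shape of the `/api/v1/applications/<appId>/jobs` response the test fetches through `getJson`.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

object JobsJsonSketch {
  // DefaultFormats is required for .extract[...]
  implicit val formats: Formats = DefaultFormats

  def main(args: Array[String]): Unit = {
    // Made-up, trimmed-down stand-in for the jobs endpoint payload.
    val jobsJson = parse("""[{"jobId": 0, "name": "foreachAsync", "numActiveTasks": 2}]""")

    // Same shape of assertions as the new test, expressed with plain asserts.
    assert(jobsJson.children.length == 1)
    assert((jobsJson.children.head \ "numActiveTasks").extract[Int] == 2)
    println("assertions passed")
  }
}
```

The expected value of 2 active tasks matches the test setup: with `local[2]` and tasks that never finish, exactly two tasks stay running, and the point of the test is that the live UI reflects them without waiting for further task start/end events.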
Why internal? Spark doesn't set it itself. If anyone is going to change it, it will be users.
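That last comment refers to the new configuration entry being marked as internal. For context, a time-based entry in Spark's config registry is declared roughly as sketched below; the key string, the default value, and the exact placement of `.internal()` are assumptions for illustration, not copied from this PR, and the snippet only compiles inside the Spark source tree because `ConfigBuilder` is `private[spark]`.

```scala
package org.apache.spark.internal.config

import java.util.concurrent.TimeUnit

// Sketch of a Status.scala-style declaration; key and default are assumed, not from the diff.
private[spark] object StatusConfigSketch {
  val LIVE_ENTITY_UPDATE_STALENESS_LIMIT =
    ConfigBuilder("spark.ui.liveUpdate.stalenessLimit") // hypothetical key
      .internal()                                       // marks the entry as internal (not a public, documented option)
      .timeConf(TimeUnit.NANOSECONDS)
      .createWithDefaultString("1s")                    // assumed default
}
```

An internal entry signals that users are not expected to tune it, which is exactly the reviewer's objection: Spark never sets this value itself, so if anyone changes it, it will be users.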