
Commit f88163a

Author: Marcelo Vanzin (committed)
[SPARK-20646][core] Port executors page to new UI backend.
The executors page is built on top of the REST API, so the page itself was easy to hook up to the new code. Some other pages depend on the `ExecutorsListener` class that is being removed, though, so they needed to be modified to use data from the new store. Fortunately, all they seemed to need is the map of executor logs, so that was somewhat easy too.

The executor timeline graph required adding some properties to the ExecutorSummary API type. Instead of following the previous code, which stored all the listener events in memory, the timeline is now created based on the data available from the API.

I had to change some of the test golden files because the old code would return executors in "random" order (it used a mutable Map instead of something that returns a sorted list), while the new code returns executors in id order.

Tested with existing unit tests.
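Since the timeline is now derived from stored data instead of replayed listener events, its inputs reduce to the add/remove timestamps carried by each summary. A rough sketch of that idea, assuming the summaries come from the new store (`timelineIntervals` is a hypothetical helper, not code from this commit; the real page renders a JavaScript timeline):

```scala
import java.util.Date

import org.apache.spark.status.api.v1.ExecutorSummary

// Map each executor to an (id, added, removed) interval for a timeline view.
// removeTime is None while the executor is alive, so the interval stays
// open-ended until an executor-removed event has been written to the store.
def timelineIntervals(
    executors: Seq[ExecutorSummary]): Seq[(String, Date, Option[Date])] = {
  executors.map { e => (e.id, e.addTime, e.removeTime) }
}
```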
1 parent 65a8bf6 commit f88163a

22 files changed: +359 −624 lines

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 30 additions & 12 deletions
@@ -88,6 +88,15 @@ private[spark] class AppStatusListener(
     kvstore.write(new ApplicationInfoWrapper(appInfo))
   }
 
+  override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
+    val details = event.environmentDetails
+
+    coresPerTask = details.getOrElse("Spark Properties", Nil).toMap
+      .get("spark.task.cpus")
+      .map(_.toInt)
+      .getOrElse(coresPerTask)
+  }
+
   override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
     val old = appInfo.attempts.head
     val attempt = new v1.ApplicationAttemptInfo(
@@ -114,7 +123,7 @@ private[spark] class AppStatusListener(
   override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
     // This needs to be an update in case an executor re-registers after the driver has
     // marked it as "dead".
-    val exec = getOrCreateExecutor(event.executorId)
+    val exec = getOrCreateExecutor(event.executorId, event.time)
     exec.host = event.executorInfo.executorHost
     exec.isActive = true
     exec.totalCores = event.executorInfo.totalCores
@@ -126,6 +135,8 @@ private[spark] class AppStatusListener(
   override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
     liveExecutors.remove(event.executorId).foreach { exec =>
       exec.isActive = false
+      exec.removeTime = new Date(event.time)
+      exec.removeReason = event.reason
       update(exec, System.nanoTime())
     }
   }
@@ -339,18 +350,25 @@ private[spark] class AppStatusListener(
     }
 
     liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
-      if (event.taskMetrics != null) {
-        val readMetrics = event.taskMetrics.shuffleReadMetrics
-        exec.totalGcTime += event.taskMetrics.jvmGCTime
-        exec.totalInputBytes += event.taskMetrics.inputMetrics.bytesRead
-        exec.totalShuffleRead += readMetrics.localBytesRead + readMetrics.remoteBytesRead
-        exec.totalShuffleWrite += event.taskMetrics.shuffleWriteMetrics.bytesWritten
-      }
-
       exec.activeTasks -= 1
       exec.completedTasks += completedDelta
       exec.failedTasks += failedDelta
       exec.totalDuration += event.taskInfo.duration
+
+      // Note: For resubmitted tasks, we continue to use the metrics that belong to the
+      // first attempt of this task. This may not be 100% accurate because the first attempt
+      // could have failed half-way through. The correct fix would be to keep track of the
+      // metrics added by each attempt, but this is much more complicated.
+      if (event.reason != Resubmitted) {
+        if (event.taskMetrics != null) {
+          val readMetrics = event.taskMetrics.shuffleReadMetrics
+          exec.totalGcTime += event.taskMetrics.jvmGCTime
+          exec.totalInputBytes += event.taskMetrics.inputMetrics.bytesRead
+          exec.totalShuffleRead += readMetrics.localBytesRead + readMetrics.remoteBytesRead
+          exec.totalShuffleWrite += event.taskMetrics.shuffleWriteMetrics.bytesWritten
+        }
+      }
+
       maybeUpdate(exec, now)
     }
   }
@@ -391,7 +409,7 @@ private[spark] class AppStatusListener(
   override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
     // This needs to set fields that are already set by onExecutorAdded because the driver is
     // considered an "executor" in the UI, but does not have a SparkListenerExecutorAdded event.
-    val exec = getOrCreateExecutor(event.blockManagerId.executorId)
+    val exec = getOrCreateExecutor(event.blockManagerId.executorId, event.time)
     exec.hostPort = event.blockManagerId.hostPort
     event.maxOnHeapMem.foreach { _ =>
       exec.totalOnHeap = event.maxOnHeapMem.get
@@ -543,8 +561,8 @@ private[spark] class AppStatusListener(
     }
   }
 
-  private def getOrCreateExecutor(executorId: String): LiveExecutor = {
-    liveExecutors.getOrElseUpdate(executorId, new LiveExecutor(executorId))
+  private def getOrCreateExecutor(executorId: String, addTime: Long): LiveExecutor = {
+    liveExecutors.getOrElseUpdate(executorId, new LiveExecutor(executorId, addTime))
   }
 
   private def getOrCreateStage(info: StageInfo): LiveStage = {
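The new `onEnvironmentUpdate` handler above reads `spark.task.cpus` out of the environment details, keeping the previous value when the property is absent. A self-contained sketch of that lookup chain, with invented sample data:

```scala
// Shaped like SparkListenerEnvironmentUpdate.environmentDetails.
val details: Map[String, Seq[(String, String)]] =
  Map("Spark Properties" -> Seq("spark.task.cpus" -> "2"))

var coresPerTask = 1  // previous value, kept as the fallback

// A missing section or missing key falls through to the old value.
coresPerTask = details.getOrElse("Spark Properties", Nil).toMap
  .get("spark.task.cpus")
  .map(_.toInt)
  .getOrElse(coresPerTask)

assert(coresPerTask == 2)
```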

core/src/main/scala/org/apache/spark/status/AppStatusStore.scala

Lines changed: 16 additions & 2 deletions
@@ -47,8 +47,22 @@ private[spark] class AppStatusStore(store: KVStore) {
   }
 
   def executorList(activeOnly: Boolean): Seq[v1.ExecutorSummary] = {
-    store.view(classOf[ExecutorSummaryWrapper]).index("active").reverse().first(true)
-      .last(true).asScala.map(_.info).toSeq
+    val base = store.view(classOf[ExecutorSummaryWrapper])
+    val filtered = if (activeOnly) {
+      base.index("active").reverse().first(true).last(true)
+    } else {
+      base
+    }
+    filtered.asScala.map(_.info).toSeq
+  }
+
+  def executorSummary(executorId: String): Option[v1.ExecutorSummary] = {
+    try {
+      Some(store.read(classOf[ExecutorSummaryWrapper], executorId).info)
+    } catch {
+      case _: NoSuchElementException =>
+        None
+    }
   }
 
   def stageList(statuses: JList[v1.StageStatus]): Seq[v1.StageData] = {
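A minimal usage sketch of the two accessors, assuming `statusStore` is an `AppStatusStore` wrapping a populated KVStore (the executor id below is invented):

```scala
import org.apache.spark.status.api.v1

val active: Seq[v1.ExecutorSummary] = statusStore.executorList(activeOnly = true)
active.foreach { exec =>
  // id, totalCores and activeTasks are pre-existing ExecutorSummary fields.
  println(s"${exec.id}: ${exec.totalCores} cores, ${exec.activeTasks} active tasks")
}

// Unlike a bare KVStore.read, executorSummary turns an unknown id into None
// rather than letting NoSuchElementException escape.
statusStore.executorSummary("42") match {
  case Some(info) => println(s"executor 42 added at ${info.addTime}")
  case None => println("no executor with id 42")
}
```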

core/src/main/scala/org/apache/spark/status/LiveEntity.scala

Lines changed: 8 additions & 1 deletion
@@ -212,13 +212,17 @@ private class LiveTask(
 
 }
 
-private class LiveExecutor(val executorId: String) extends LiveEntity {
+private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity {
 
   var hostPort: String = null
   var host: String = null
   var isActive = true
   var totalCores = 0
 
+  val addTime = new Date(_addTime)
+  var removeTime: Date = null
+  var removeReason: String = null
+
   var rddBlocks = 0
   var memoryUsed = 0L
   var diskUsed = 0L
@@ -276,6 +280,9 @@ private class LiveExecutor(val executorId: String) extends LiveEntity {
       totalShuffleWrite,
       isBlacklisted,
       maxMemory,
+      addTime,
+      Option(removeTime),
+      Option(removeReason),
       executorLogs,
       memoryMetrics)
     new ExecutorSummaryWrapper(info)

core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala

Lines changed: 2 additions & 13 deletions
@@ -20,22 +20,11 @@ import javax.ws.rs.{GET, Produces}
 import javax.ws.rs.core.MediaType
 
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.ui.exec.ExecutorsPage
 
 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class AllExecutorListResource(ui: SparkUI) {
 
   @GET
-  def executorList(): Seq[ExecutorSummary] = {
-    val listener = ui.executorsListener
-    listener.synchronized {
-      // The follow codes should be protected by `listener` to make sure no executors will be
-      // removed before we query their status. See SPARK-12784.
-      (0 until listener.activeStorageStatusList.size).map { statusId =>
-        ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
-      } ++ (0 until listener.deadStorageStatusList.size).map { statusId =>
-        ExecutorsPage.getExecInfo(listener, statusId, isActive = false)
-      }
-    }
-  }
+  def executorList(): Seq[ExecutorSummary] = ui.store.executorList(false)
+
 }

core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala

Lines changed: 2 additions & 12 deletions
@@ -20,21 +20,11 @@ import javax.ws.rs.{GET, Produces}
 import javax.ws.rs.core.MediaType
 
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.ui.exec.ExecutorsPage
 
 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class ExecutorListResource(ui: SparkUI) {
 
   @GET
-  def executorList(): Seq[ExecutorSummary] = {
-    val listener = ui.executorsListener
-    listener.synchronized {
-      // The follow codes should be protected by `listener` to make sure no executors will be
-      // removed before we query their status. See SPARK-12784.
-      val storageStatusList = listener.activeStorageStatusList
-      (0 until storageStatusList.size).map { statusId =>
-        ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
-      }
-    }
-  }
+  def executorList(): Seq[ExecutorSummary] = ui.store.executorList(true)
+
 }
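Both resources now answer from the same store: `AllExecutorListResource` (the `allexecutors` endpoint) passes `activeOnly = false` so dead executors are included, while `ExecutorListResource` (the `executors` endpoint) passes `true`. The `listener.synchronized` guard against executors being removed mid-query (SPARK-12784) is no longer needed, since reads go against `ExecutorSummary` objects already written to the KVStore.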

core/src/main/scala/org/apache/spark/status/api/v1/api.scala

Lines changed: 3 additions & 0 deletions
@@ -85,6 +85,9 @@ class ExecutorSummary private[spark](
     val totalShuffleWrite: Long,
     val isBlacklisted: Boolean,
     val maxMemory: Long,
+    val addTime: Date,
+    val removeTime: Option[Date],
+    val removeReason: Option[String],
     val executorLogs: Map[String, String],
     val memoryMetrics: Option[MemoryMetrics])
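On the wire, the new fields would appear in an executors endpoint entry roughly as below (a hypothetical fragment with invented values, not captured output; `removeTime` and `removeReason` are only populated once the executor has been removed):

```
{
  "id" : "1",
  "isBlacklisted" : false,
  "maxMemory" : 384093388,
  "addTime" : "2017-08-22T18:05:04.312GMT",
  "removeTime" : "2017-08-22T18:35:09.109GMT",
  "removeReason" : "Executor killed by driver.",
  ...
}
```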

core/src/main/scala/org/apache/spark/ui/SparkUI.scala

Lines changed: 4 additions & 7 deletions
@@ -30,7 +30,7 @@ import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo,
 import org.apache.spark.storage.StorageStatusListener
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab}
-import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
+import org.apache.spark.ui.exec.ExecutorsTab
 import org.apache.spark.ui.jobs.{JobProgressListener, JobsTab, StagesTab}
 import org.apache.spark.ui.scope.RDDOperationGraphListener
 import org.apache.spark.ui.storage.{StorageListener, StorageTab}
@@ -46,7 +46,6 @@ private[spark] class SparkUI private (
     securityManager: SecurityManager,
     val environmentListener: EnvironmentListener,
     val storageStatusListener: StorageStatusListener,
-    val executorsListener: ExecutorsListener,
     val jobProgressListener: JobProgressListener,
     val storageListener: StorageListener,
     val operationGraphListener: RDDOperationGraphListener,
@@ -70,7 +69,7 @@ private[spark] class SparkUI private (
   def initialize() {
     val jobsTab = new JobsTab(this)
     attachTab(jobsTab)
-    val stagesTab = new StagesTab(this)
+    val stagesTab = new StagesTab(this, store)
     attachTab(stagesTab)
     attachTab(new StorageTab(this))
     attachTab(new EnvironmentTab(this))
@@ -186,19 +185,17 @@ private[spark] object SparkUI {
     }
     val environmentListener = new EnvironmentListener
     val storageStatusListener = new StorageStatusListener(conf)
-    val executorsListener = new ExecutorsListener(storageStatusListener, conf)
     val storageListener = new StorageListener(storageStatusListener)
     val operationGraphListener = new RDDOperationGraphListener(conf)
 
     addListenerFn(environmentListener)
     addListenerFn(storageStatusListener)
-    addListenerFn(executorsListener)
     addListenerFn(storageListener)
     addListenerFn(operationGraphListener)
 
     new SparkUI(store, sc, conf, securityManager, environmentListener, storageStatusListener,
-      executorsListener, jobProgressListener, storageListener, operationGraphListener,
-      appName, basePath, lastUpdateTime, startTime, appSparkVersion)
+      jobProgressListener, storageListener, operationGraphListener, appName, basePath,
+      lastUpdateTime, startTime, appSparkVersion)
   }
 
 }

core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala

Lines changed: 5 additions & 4 deletions
@@ -22,11 +22,12 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.{Node, Text}
 
-import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.SparkContext
+import org.apache.spark.ui.{SparkUITab, UIUtils, WebUIPage}
 
-private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage("threadDump") {
-
-  private val sc = parent.sc
+private[ui] class ExecutorThreadDumpPage(
+    parent: SparkUITab,
+    sc: Option[SparkContext]) extends WebUIPage("threadDump") {
 
   // stripXSS is called first to remove suspicious characters used in XSS attacks
   def render(request: HttpServletRequest): Seq[Node] = {
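Decoupling the page from `ExecutorsTab` means any `SparkUITab` can now host it by passing itself and the UI's optional `SparkContext` (something like `attachPage(new ExecutorThreadDumpPage(this, parent.sc))` from inside the tab), so the page no longer reaches into `ExecutorsTab` for listener state.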
