diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala index b25a4bfb501fb..70fafa9b0b303 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala @@ -17,6 +17,10 @@ package org.apache.spark.scheduler.cluster +import scala.annotation.meta.getter + +import com.fasterxml.jackson.annotation.JsonIgnore + import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef} /** @@ -29,9 +33,12 @@ import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef} * @param totalCores The total number of cores available to the executor */ private[cluster] class ExecutorData( + @(JsonIgnore @getter) val executorEndpoint: RpcEndpointRef, + @(JsonIgnore @getter) val executorAddress: RpcAddress, override val executorHost: String, + @(JsonIgnore @getter) var freeCores: Int, override val totalCores: Int, override val logUrlMap: Map[String, String] diff --git a/core/src/main/scala/org/apache/spark/status/AppStateListener.scala b/core/src/main/scala/org/apache/spark/status/AppStateListener.scala index 17a6b83810fd4..32ed6ad5fd011 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStateListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStateListener.scala @@ -47,6 +47,7 @@ private[spark] class AppStateListener( private var appInfo: v1.ApplicationInfo = null private var coresPerTask: Int = 1 + private var executorEventId: Long = 0L // How often to update live entities. -1 means "never update" when replaying applications, // meaning only the last write will happen. For live applications, this avoids a few @@ -100,6 +101,8 @@ private[spark] class AppStateListener( details("System Properties"), details("Classpath Entries")) + coresPerTask = envInfo.sparkProperties.toMap.get("spark.task.cpus").map(_.toInt) + .getOrElse(coresPerTask) kvstore.write(new ApplicationEnvironmentInfoWrapper(envInfo)) } @@ -135,6 +138,8 @@ private[spark] class AppStateListener( exec.maxTasks = event.executorInfo.totalCores / coresPerTask exec.executorLogs = event.executorInfo.logUrlMap liveUpdate(exec) + + writeExecutorEvent(event) } override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = { @@ -142,12 +147,19 @@ private[spark] class AppStateListener( exec.isActive = false update(exec) } + + writeExecutorEvent(event) } override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = { updateBlackListStatus(event.executorId, true) } + private def writeExecutorEvent(event: SparkListenerEvent): Unit = { + executorEventId += 1 + kvstore.write(new ExecutorEventData(executorEventId, event)) + } + override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = { updateBlackListStatus(event.executorId, false) } diff --git a/core/src/main/scala/org/apache/spark/status/AppStateStore.scala b/core/src/main/scala/org/apache/spark/status/AppStateStore.scala index ce22b6bb9d35e..aedd662413179 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStateStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStateStore.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.{JobExecutionStatus, SparkConf} import org.apache.spark.kvstore.{InMemoryStore, KVStore} -import org.apache.spark.scheduler.SparkListenerBus +import org.apache.spark.scheduler.{SparkListenerBus, SparkListenerEvent} import org.apache.spark.status.api.v1 import 
org.apache.spark.util.{Distribution, Utils} @@ -56,6 +56,15 @@ private[spark] class AppStateStore(store: KVStore) { .last(true).asScala.map(_.info).toSeq } + def executorSummary(executorId: String): Option[v1.ExecutorSummary] = { + try { + Some(store.read(classOf[ExecutorSummaryWrapper], executorId).info) + } catch { + case _: NoSuchElementException => + None + } + } + def stageList(statuses: JList[v1.StageStatus]): Seq[v1.StageData] = { val it = store.view(classOf[StageDataWrapper]).sorted().asScala.map(_.info) if (!statuses.isEmpty) { @@ -198,6 +207,10 @@ private[spark] class AppStateStore(store: KVStore) { store.read(classOf[RDDStorageInfoWrapper], rddId).info } + def executorEvents(): Seq[SparkListenerEvent] = { + store.view(classOf[ExecutorEventData]).asScala.map(_.event).toSeq + } + def close(): Unit = { store.close() } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala index 01f2a18122e6f..f4bc752d31f7e 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala @@ -20,22 +20,11 @@ import javax.ws.rs.{GET, Produces} import javax.ws.rs.core.MediaType import org.apache.spark.ui.SparkUI -import org.apache.spark.ui.exec.ExecutorsPage @Produces(Array(MediaType.APPLICATION_JSON)) private[v1] class AllExecutorListResource(ui: SparkUI) { @GET - def executorList(): Seq[ExecutorSummary] = { - val listener = ui.executorsListener - listener.synchronized { - // The follow codes should be protected by `listener` to make sure no executors will be - // removed before we query their status. See SPARK-12784. - (0 until listener.activeStorageStatusList.size).map { statusId => - ExecutorsPage.getExecInfo(listener, statusId, isActive = true) - } ++ (0 until listener.deadStorageStatusList.size).map { statusId => - ExecutorsPage.getExecInfo(listener, statusId, isActive = false) - } - } - } + def executorList(): Seq[ExecutorSummary] = ui.store.executorList(false) + } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala index ab53881594180..03f359262332b 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala @@ -20,21 +20,11 @@ import javax.ws.rs.{GET, Produces} import javax.ws.rs.core.MediaType import org.apache.spark.ui.SparkUI -import org.apache.spark.ui.exec.ExecutorsPage @Produces(Array(MediaType.APPLICATION_JSON)) private[v1] class ExecutorListResource(ui: SparkUI) { @GET - def executorList(): Seq[ExecutorSummary] = { - val listener = ui.executorsListener - listener.synchronized { - // The follow codes should be protected by `listener` to make sure no executors will be - // removed before we query their status. See SPARK-12784. 
- val storageStatusList = listener.activeStorageStatusList - (0 until storageStatusList.size).map { statusId => - ExecutorsPage.getExecInfo(listener, statusId, isActive = true) - } - } - } + def executorList(): Seq[ExecutorSummary] = ui.store.executorList(true) + } diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index e8f89fcbd4589..12ad4f383bebb 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -22,6 +22,7 @@ import java.lang.{Integer => JInteger, Long => JLong} import com.fasterxml.jackson.annotation.JsonIgnore import org.apache.spark.kvstore.KVIndex +import org.apache.spark.scheduler.SparkListenerEvent import org.apache.spark.status.api.v1._ import org.apache.spark.status.KVUtils._ @@ -132,3 +133,11 @@ private[spark] class ExecutorStageSummaryWrapper( private[this] val stage: Array[Int] = Array(stageId, stageAttemptId) } + +/** + * Store raw executor events so that the executor timeline can be drawn. The event is wrapped + * in a container so that a monotonically increasing ID can be added to it. + */ +private[spark] class ExecutorEventData( + @KVIndexParam val id: Long, + val event: SparkListenerEvent) diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 9b5837c6f6243..6422690f17cbe 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -29,7 +29,7 @@ import org.apache.spark.status.api.v1._ import org.apache.spark.storage.StorageStatusListener import org.apache.spark.ui.JettyUtils._ import org.apache.spark.ui.env.EnvironmentTab -import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab} +import org.apache.spark.ui.exec.ExecutorsTab import org.apache.spark.ui.jobs.{JobProgressListener, JobsTab, StagesTab} import org.apache.spark.ui.scope.RDDOperationGraphListener import org.apache.spark.ui.storage.{StorageListener, StorageTab} @@ -44,7 +44,6 @@ private[spark] class SparkUI private ( val conf: SparkConf, securityManager: SecurityManager, val storageStatusListener: StorageStatusListener, - val executorsListener: ExecutorsListener, val jobProgressListener: JobProgressListener, val storageListener: StorageListener, val operationGraphListener: RDDOperationGraphListener, @@ -66,7 +65,7 @@ private[spark] class SparkUI private ( def initialize() { val jobsTab = new JobsTab(this) attachTab(jobsTab) - val stagesTab = new StagesTab(this) + val stagesTab = new StagesTab(this, store) attachTab(stagesTab) attachTab(new StorageTab(this)) attachTab(new EnvironmentTab(this, store)) @@ -181,17 +180,15 @@ private[spark] object SparkUI { } val storageStatusListener = new StorageStatusListener(conf) - val executorsListener = new ExecutorsListener(storageStatusListener, conf) val storageListener = new StorageListener(storageStatusListener) val operationGraphListener = new RDDOperationGraphListener(conf) listenerBus.addListener(storageStatusListener) - listenerBus.addListener(executorsListener) listenerBus.addListener(storageListener) listenerBus.addListener(operationGraphListener) - new SparkUI(store, sc, conf, securityManager, storageStatusListener, executorsListener, - jobProgressListener, storageListener, operationGraphListener, appName, basePath, startTime) + new SparkUI(store, sc, conf, securityManager, storageStatusListener, jobProgressListener, + storageListener, 
operationGraphListener, appName, basePath, startTime) } } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala index 6ce3f511e89c7..483f94ed0fa05 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala @@ -22,11 +22,12 @@ import javax.servlet.http.HttpServletRequest import scala.xml.{Node, Text} -import org.apache.spark.ui.{UIUtils, WebUIPage} +import org.apache.spark.SparkContext +import org.apache.spark.ui.{SparkUITab, UIUtils, WebUIPage} -private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage("threadDump") { - - private val sc = parent.sc +private[ui] class ExecutorThreadDumpPage( + parent: SparkUITab, + sc: Option[SparkContext]) extends WebUIPage("threadDump") { def render(request: HttpServletRequest): Seq[Node] = { val executorId = Option(request.getParameter("executorId")).map { executorId => diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala deleted file mode 100644 index b7cbed468517c..0000000000000 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.ui.exec - -import javax.servlet.http.HttpServletRequest - -import scala.xml.Node - -import org.apache.spark.status.api.v1.{ExecutorSummary, MemoryMetrics} -import org.apache.spark.ui.{UIUtils, WebUIPage} - -// This isn't even used anymore -- but we need to keep it b/c of a MiMa false positive -private[ui] case class ExecutorSummaryInfo( - id: String, - hostPort: String, - rddBlocks: Int, - memoryUsed: Long, - diskUsed: Long, - activeTasks: Int, - failedTasks: Int, - completedTasks: Int, - totalTasks: Int, - totalDuration: Long, - totalInputBytes: Long, - totalShuffleRead: Long, - totalShuffleWrite: Long, - isBlacklisted: Int, - maxOnHeapMem: Long, - maxOffHeapMem: Long, - executorLogs: Map[String, String]) - - -private[ui] class ExecutorsPage( - parent: ExecutorsTab, - threadDumpEnabled: Boolean) - extends WebUIPage("") { - - def render(request: HttpServletRequest): Seq[Node] = { - val content = -
<div> - { - <div> - <span class="expand-additional-metrics"> - <span class="expand-additional-metrics-arrow arrow-closed"></span> - <a>Show Additional Metrics</a> - </span> - </div> ++ - <div id="active-executors" class="row-fluid"></div> ++ - <script src={UIUtils.prependBaseUri("/static/utils.js")}></script> ++ - <script src={UIUtils.prependBaseUri("/static/executorspage.js")}></script> ++ - <script>setThreadDumpEnabled({threadDumpEnabled})</script> - } - </div>
- - UIUtils.headerSparkPage("Executors", content, parent, useDataTables = true) - } -} - -private[spark] object ExecutorsPage { - private val ON_HEAP_MEMORY_TOOLTIP = "Memory used / total available memory for on heap " + - "storage of data like RDD partitions cached in memory." - private val OFF_HEAP_MEMORY_TOOLTIP = "Memory used / total available memory for off heap " + - "storage of data like RDD partitions cached in memory." - - /** Represent an executor's info as a map given a storage status index */ - def getExecInfo( - listener: ExecutorsListener, - statusId: Int, - isActive: Boolean): ExecutorSummary = { - val status = if (isActive) { - listener.activeStorageStatusList(statusId) - } else { - listener.deadStorageStatusList(statusId) - } - val execId = status.blockManagerId.executorId - val hostPort = status.blockManagerId.hostPort - val rddBlocks = status.numBlocks - val memUsed = status.memUsed - val maxMem = status.maxMem - val memoryMetrics = for { - onHeapUsed <- status.onHeapMemUsed - offHeapUsed <- status.offHeapMemUsed - maxOnHeap <- status.maxOnHeapMem - maxOffHeap <- status.maxOffHeapMem - } yield { - new MemoryMetrics(onHeapUsed, offHeapUsed, maxOnHeap, maxOffHeap) - } - - - val diskUsed = status.diskUsed - val taskSummary = listener.executorToTaskSummary.getOrElse(execId, ExecutorTaskSummary(execId)) - - new ExecutorSummary( - execId, - hostPort, - isActive, - rddBlocks, - memUsed, - diskUsed, - taskSummary.totalCores, - taskSummary.tasksMax, - taskSummary.tasksActive, - taskSummary.tasksFailed, - taskSummary.tasksComplete, - taskSummary.tasksActive + taskSummary.tasksFailed + taskSummary.tasksComplete, - taskSummary.duration, - taskSummary.jvmGCTime, - taskSummary.inputBytes, - taskSummary.shuffleRead, - taskSummary.shuffleWrite, - taskSummary.isBlacklisted, - maxMem, - taskSummary.executorLogs, - memoryMetrics - ) - } -} diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index aabf6e0c63c02..310d22d106773 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -17,192 +17,45 @@ package org.apache.spark.ui.exec -import scala.collection.mutable.{LinkedHashMap, ListBuffer} +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node -import org.apache.spark.{ExceptionFailure, Resubmitted, SparkConf, SparkContext} -import org.apache.spark.annotation.DeveloperApi import org.apache.spark.scheduler._ -import org.apache.spark.storage.{StorageStatus, StorageStatusListener} -import org.apache.spark.ui.{SparkUI, SparkUITab} +import org.apache.spark.ui.{SparkUI, SparkUITab, UIUtils, WebUIPage} private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") { - val listener = parent.executorsListener - val sc = parent.sc - val threadDumpEnabled = - sc.isDefined && parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true) - attachPage(new ExecutorsPage(this, threadDumpEnabled)) - if (threadDumpEnabled) { - attachPage(new ExecutorThreadDumpPage(this)) - } -} + init() -private[ui] case class ExecutorTaskSummary( - var executorId: String, - var totalCores: Int = 0, - var tasksMax: Int = 0, - var tasksActive: Int = 0, - var tasksFailed: Int = 0, - var tasksComplete: Int = 0, - var duration: Long = 0L, - var jvmGCTime: Long = 0L, - var inputBytes: Long = 0L, - var inputRecords: Long = 0L, - var outputBytes: Long = 0L, - var outputRecords: Long = 0L, - var shuffleRead: 
Long = 0L, - var shuffleWrite: Long = 0L, - var executorLogs: Map[String, String] = Map.empty, - var isAlive: Boolean = true, - var isBlacklisted: Boolean = false -) - -/** - * :: DeveloperApi :: - * A SparkListener that prepares information to be displayed on the ExecutorsTab - */ -@DeveloperApi -@deprecated("This class will be removed in a future release.", "2.2.0") -class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf) - extends SparkListener { - val executorToTaskSummary = LinkedHashMap[String, ExecutorTaskSummary]() - var executorEvents = new ListBuffer[SparkListenerEvent]() + private def init(): Unit = { + val threadDumpEnabled = + parent.sc.isDefined && parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true) - private val maxTimelineExecutors = conf.getInt("spark.ui.timeline.executors.maximum", 1000) - private val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100) - - def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList - - def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList - - override def onExecutorAdded( - executorAdded: SparkListenerExecutorAdded): Unit = synchronized { - val eid = executorAdded.executorId - val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid)) - taskSummary.executorLogs = executorAdded.executorInfo.logUrlMap - taskSummary.totalCores = executorAdded.executorInfo.totalCores - taskSummary.tasksMax = taskSummary.totalCores / conf.getInt("spark.task.cpus", 1) - executorEvents += executorAdded - if (executorEvents.size > maxTimelineExecutors) { - executorEvents.remove(0) + attachPage(new ExecutorsPage(this, threadDumpEnabled)) + if (threadDumpEnabled) { + attachPage(new ExecutorThreadDumpPage(this, parent.sc)) } - - val deadExecutors = executorToTaskSummary.filter(e => !e._2.isAlive) - if (deadExecutors.size > retainedDeadExecutors) { - val head = deadExecutors.head - executorToTaskSummary.remove(head._1) - } - } - - override def onExecutorRemoved( - executorRemoved: SparkListenerExecutorRemoved): Unit = synchronized { - executorEvents += executorRemoved - if (executorEvents.size > maxTimelineExecutors) { - executorEvents.remove(0) - } - executorToTaskSummary.get(executorRemoved.executorId).foreach(e => e.isAlive = false) } - override def onApplicationStart( - applicationStart: SparkListenerApplicationStart): Unit = { - applicationStart.driverLogs.foreach { logs => - val storageStatus = activeStorageStatusList.find { s => - s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER || - s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER - } - storageStatus.foreach { s => - val eid = s.blockManagerId.executorId - val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid)) - taskSummary.executorLogs = logs.toMap - } - } - } - - override def onTaskStart( - taskStart: SparkListenerTaskStart): Unit = synchronized { - val eid = taskStart.taskInfo.executorId - val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid)) - taskSummary.tasksActive += 1 - } - - override def onTaskEnd( - taskEnd: SparkListenerTaskEnd): Unit = synchronized { - val info = taskEnd.taskInfo - if (info != null) { - val eid = info.executorId - val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid)) - taskEnd.reason match { - case Resubmitted => - // Note: For resubmitted tasks, we continue to use the metrics that belong to the - 
// first attempt of this task. This may not be 100% accurate because the first attempt - // could have failed half-way through. The correct fix would be to keep track of the - // metrics added by each attempt, but this is much more complicated. - return - case _: ExceptionFailure => - taskSummary.tasksFailed += 1 - case _ => - taskSummary.tasksComplete += 1 - } - if (taskSummary.tasksActive >= 1) { - taskSummary.tasksActive -= 1 - } - taskSummary.duration += info.duration - - // Update shuffle read/write - val metrics = taskEnd.taskMetrics - if (metrics != null) { - taskSummary.inputBytes += metrics.inputMetrics.bytesRead - taskSummary.inputRecords += metrics.inputMetrics.recordsRead - taskSummary.outputBytes += metrics.outputMetrics.bytesWritten - taskSummary.outputRecords += metrics.outputMetrics.recordsWritten - - taskSummary.shuffleRead += metrics.shuffleReadMetrics.remoteBytesRead - taskSummary.shuffleWrite += metrics.shuffleWriteMetrics.bytesWritten - taskSummary.jvmGCTime += metrics.jvmGCTime - } - } - } - - private def updateExecutorBlacklist( - eid: String, - isBlacklisted: Boolean): Unit = { - val execTaskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid)) - execTaskSummary.isBlacklisted = isBlacklisted - } - - override def onExecutorBlacklisted( - executorBlacklisted: SparkListenerExecutorBlacklisted) - : Unit = synchronized { - updateExecutorBlacklist(executorBlacklisted.executorId, true) - } - - override def onExecutorUnblacklisted( - executorUnblacklisted: SparkListenerExecutorUnblacklisted) - : Unit = synchronized { - updateExecutorBlacklist(executorUnblacklisted.executorId, false) - } - - override def onNodeBlacklisted( - nodeBlacklisted: SparkListenerNodeBlacklisted) - : Unit = synchronized { - // Implicitly blacklist every executor associated with this node, and show this in the UI. - activeStorageStatusList.foreach { status => - if (status.blockManagerId.host == nodeBlacklisted.hostId) { - updateExecutorBlacklist(status.blockManagerId.executorId, true) - } - } - } +} - override def onNodeUnblacklisted( - nodeUnblacklisted: SparkListenerNodeUnblacklisted) - : Unit = synchronized { - // Implicitly unblacklist every executor associated with this node, regardless of how - // they may have been blacklisted initially (either explicitly through executor blacklisting - // or implicitly through node blacklisting). Show this in the UI. - activeStorageStatusList.foreach { status => - if (status.blockManagerId.host == nodeUnblacklisted.hostId) { - updateExecutorBlacklist(status.blockManagerId.executorId, false) - } - } +private[ui] class ExecutorsPage( + parent: SparkUITab, + threadDumpEnabled: Boolean) + extends WebUIPage("") { + + def render(request: HttpServletRequest): Seq[Node] = { + val content = +
<div> + { + <div id="active-executors" class="row-fluid"></div> ++ + <script src={UIUtils.prependBaseUri("/static/utils.js")}></script> ++ + <script src={UIUtils.prependBaseUri("/static/executorspage.js")}></script> ++ + <script>setThreadDumpEnabled({threadDumpEnabled})</script> + } + </div>
+ + UIUtils.headerSparkPage("Executors", content, parent, useDataTables = true) } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index 18be0870746e9..5818cc364432d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -358,9 +358,8 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { var content = summary - val executorListener = parent.executorListener content ++= makeTimeline(activeJobs ++ completedJobs ++ failedJobs, - executorListener.executorEvents, startTime) + parent.parent.store.executorEvents(), startTime) if (shouldShowActiveJobs) { content ++=
<span id="active" class="collapse-aggregated-activeJobs collapse-table" onClick="collapseTable('collapse-aggregated-activeJobs','aggregated-activeJobs')"> <h4> <a>Active Jobs ({activeJobs.size})</a> </h4> </span>
++ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 382a6f979f2e6..7cd625925c2c4 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -20,12 +20,17 @@ package org.apache.spark.ui.jobs import scala.collection.mutable import scala.xml.{Node, Unparsed} +import org.apache.spark.status.AppStateStore import org.apache.spark.ui.{ToolTips, UIUtils} import org.apache.spark.ui.jobs.UIData.StageUIData import org.apache.spark.util.Utils /** Stage summary grouped by executors. */ -private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: StagesTab) { +private[ui] class ExecutorTable( + stageId: Int, + stageAttemptId: Int, + parent: StagesTab, + store: AppStateStore) { private val listener = parent.progressListener def toNodeSeq: Seq[Node] = { @@ -123,9 +128,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
<div style="float: left">{k}</div> <div style="float: right"> { - val logs = parent.executorsListener.executorToTaskSummary.get(k) - .map(_.executorLogs).getOrElse(Map.empty) - logs.map { + store.executorSummary(k).map(_.executorLogs).getOrElse(Map.empty).map { case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div>
} } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index 3131c4a1eb7d4..dd7e11fef1ba6 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -321,11 +321,10 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { var content = summary val appStartTime = listener.startTime - val executorListener = parent.executorListener val operationGraphListener = parent.operationGraphListener content ++= makeTimeline(activeStages ++ completedStages ++ failedStages, - executorListener.executorEvents, appStartTime) + parent.parent.store.executorEvents(), appStartTime) content ++= UIUtils.showDagVizForJob( jobId, operationGraphListener.getOperationGraphForJob(jobId)) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala index 620c54c2dc0a5..13b2ba11f6bc6 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala @@ -23,11 +23,10 @@ import org.apache.spark.scheduler.SchedulingMode import org.apache.spark.ui.{SparkUI, SparkUITab} /** Web UI showing progress status of all jobs in the given SparkContext. */ -private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") { +private[ui] class JobsTab(val parent: SparkUI) extends SparkUITab(parent, "jobs") { val sc = parent.sc val killEnabled = parent.killEnabled val jobProgresslistener = parent.jobProgressListener - val executorListener = parent.executorsListener val operationGraphListener = parent.operationGraphListener def isFairScheduler: Boolean = diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 19325a2dc9169..db79dd90f1659 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -29,18 +29,17 @@ import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.SparkConf import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo, TaskLocality} +import org.apache.spark.status.AppStateStore import org.apache.spark.ui._ -import org.apache.spark.ui.exec.ExecutorsListener import org.apache.spark.ui.jobs.UIData._ import org.apache.spark.util.{Distribution, Utils} /** Page showing statistics and task list for a given stage */ -private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { +private[ui] class StagePage(parent: StagesTab, store: AppStateStore) extends WebUIPage("stage") { import StagePage._ private val progressListener = parent.progressListener private val operationGraphListener = parent.operationGraphListener - private val executorsListener = parent.executorsListener private val TIMELINE_LEGEND = {
<div>
@@ -302,7 +301,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { pageSize = taskPageSize, sortColumn = taskSortColumn, desc = taskSortDesc, - executorsListener = executorsListener + store = store ) (_taskTable, _taskTable.table(page)) } catch { @@ -561,7 +560,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { stripeRowsWithCss = false)) } - val executorTable = new ExecutorTable(stageId, stageAttemptId, parent) + val executorTable = new ExecutorTable(stageId, stageAttemptId, parent, store) val maybeAccumulableTable: Seq[Node] = if (hasAccumulators) {
<h4>Accumulators</h4>
++ accumulableTable } else Seq() @@ -865,7 +864,7 @@ private[ui] class TaskDataSource( pageSize: Int, sortColumn: String, desc: Boolean, - executorsListener: ExecutorsListener) extends PagedDataSource[TaskTableRowData](pageSize) { + store: AppStateStore) extends PagedDataSource[TaskTableRowData](pageSize) { import StagePage._ // Convert TaskUIData to TaskTableRowData which contains the final contents to show in the table @@ -1007,8 +1006,7 @@ private[ui] class TaskDataSource( None } - val logs = executorsListener.executorToTaskSummary.get(info.executorId) - .map(_.executorLogs).getOrElse(Map.empty) + val logs = store.executorSummary(info.executorId).map(_.executorLogs).getOrElse(Map.empty) new TaskTableRowData( info.index, info.taskId, @@ -1154,7 +1152,7 @@ private[ui] class TaskPagedTable( pageSize: Int, sortColumn: String, desc: Boolean, - executorsListener: ExecutorsListener) extends PagedTable[TaskTableRowData] { + store: AppStateStore) extends PagedTable[TaskTableRowData] { override def tableId: String = "task-table" @@ -1179,7 +1177,7 @@ private[ui] class TaskPagedTable( pageSize, sortColumn, desc, - executorsListener) + store) override def pageLink(page: Int): String = { val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala index 181465bdf9609..5c8e1aba2ee2f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala @@ -20,19 +20,21 @@ package org.apache.spark.ui.jobs import javax.servlet.http.HttpServletRequest import org.apache.spark.scheduler.SchedulingMode +import org.apache.spark.status.AppStateStore import org.apache.spark.ui.{SparkUI, SparkUITab} /** Web UI showing progress status of all stages in the given SparkContext. 
*/ -private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages") { +private[ui] class StagesTab(val parent: SparkUI, store: AppStateStore) + extends SparkUITab(parent, "stages") { + val sc = parent.sc val conf = parent.conf val killEnabled = parent.killEnabled val progressListener = parent.jobProgressListener val operationGraphListener = parent.operationGraphListener - val executorsListener = parent.executorsListener attachPage(new AllStagesPage(this)) - attachPage(new StagePage(this)) + attachPage(new StagePage(this, store)) attachPage(new PoolPage(this)) def isFairScheduler: Boolean = progressListener.schedulingMode == Some(SchedulingMode.FAIR) diff --git a/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json index 0f94e3b255dbc..0ad0ccca5c6e1 100644 --- a/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json @@ -1,6 +1,33 @@ [ { - "id" : "2", - "hostPort" : "172.22.0.167:51487", + "id" : "driver", + "hostPort" : "172.22.0.167:51475", + "isActive" : true, + "rddBlocks" : 0, + "memoryUsed" : 0, + "diskUsed" : 0, + "totalCores" : 0, + "maxTasks" : 0, + "activeTasks" : 0, + "failedTasks" : 0, + "completedTasks" : 0, + "totalTasks" : 0, + "totalDuration" : 0, + "totalGCTime" : 0, + "totalInputBytes" : 0, + "totalShuffleRead" : 0, + "totalShuffleWrite" : 0, + "isBlacklisted" : true, + "maxMemory" : 908381388, + "executorLogs" : { }, + "memoryMetrics" : { + "usedOnHeapStorageMemory" : 0, + "usedOffHeapStorageMemory" : 0, + "totalOnHeapStorageMemory" : 384093388, + "totalOffHeapStorageMemory" : 524288000 + } +}, { + "id" : "3", + "hostPort" : "172.22.0.167:51485", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, @@ -8,52 +35,55 @@ "totalCores" : 4, "maxTasks" : 4, "activeTasks" : 0, - "failedTasks" : 4, - "completedTasks" : 0, - "totalTasks" : 4, - "totalDuration" : 2537, - "totalGCTime" : 88, + "failedTasks" : 0, + "completedTasks" : 12, + "totalTasks" : 12, + "totalDuration" : 2453, + "totalGCTime" : 72, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "isBlacklisted" : true, "maxMemory" : 908381388, "executorLogs" : { - "stdout" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stdout", - "stderr" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stderr" + "stdout" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stdout", + "stderr" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stderr" }, "memoryMetrics": { - "usedOnHeapStorageMemory": 0, - "usedOffHeapStorageMemory": 0, - "totalOnHeapStorageMemory": 384093388, - "totalOffHeapStorageMemory": 524288000 + "usedOnHeapStorageMemory" : 0, + "usedOffHeapStorageMemory" : 0, + "totalOnHeapStorageMemory" : 384093388, + "totalOffHeapStorageMemory" : 524288000 } -}, { - "id" : "driver", - "hostPort" : "172.22.0.167:51475", +} ,{ + "id" : "2", + "hostPort" : "172.22.0.167:51487", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, "diskUsed" : 0, - "totalCores" : 0, - "maxTasks" : 0, + "totalCores" : 4, + "maxTasks" : 4, "activeTasks" : 0, - "failedTasks" : 0, + "failedTasks" : 4, "completedTasks" : 0, - "totalTasks" : 0, - "totalDuration" : 0, - "totalGCTime" : 0, + "totalTasks" : 4, + 
"totalDuration" : 2537, + "totalGCTime" : 88, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "isBlacklisted" : true, "maxMemory" : 908381388, - "executorLogs" : { }, + "executorLogs" : { + "stdout" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stdout", + "stderr" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stderr" + }, "memoryMetrics": { - "usedOnHeapStorageMemory": 0, - "usedOffHeapStorageMemory": 0, - "totalOnHeapStorageMemory": 384093388, - "totalOffHeapStorageMemory": 524288000 + "usedOnHeapStorageMemory" : 0, + "usedOffHeapStorageMemory" : 0, + "totalOnHeapStorageMemory" : 384093388, + "totalOffHeapStorageMemory" : 524288000 } }, { "id" : "1", @@ -110,39 +140,9 @@ "stderr" : "http://172.22.0.167:51465/logPage/?appId=app-20161116163331-0000&executorId=0&logType=stderr" }, "memoryMetrics": { - "usedOnHeapStorageMemory": 0, - "usedOffHeapStorageMemory": 0, - "totalOnHeapStorageMemory": 384093388, - "totalOffHeapStorageMemory": 524288000 - } -}, { - "id" : "3", - "hostPort" : "172.22.0.167:51485", - "isActive" : true, - "rddBlocks" : 0, - "memoryUsed" : 0, - "diskUsed" : 0, - "totalCores" : 4, - "maxTasks" : 4, - "activeTasks" : 0, - "failedTasks" : 0, - "completedTasks" : 12, - "totalTasks" : 12, - "totalDuration" : 2453, - "totalGCTime" : 72, - "totalInputBytes" : 0, - "totalShuffleRead" : 0, - "totalShuffleWrite" : 0, - "isBlacklisted" : true, - "maxMemory" : 908381388, - "executorLogs" : { - "stdout" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stdout", - "stderr" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stderr" - }, - "memoryMetrics": { - "usedOnHeapStorageMemory": 0, - "usedOffHeapStorageMemory": 0, - "totalOnHeapStorageMemory": 384093388, - "totalOffHeapStorageMemory": 524288000 + "usedOnHeapStorageMemory" : 0, + "usedOffHeapStorageMemory" : 0, + "totalOnHeapStorageMemory" : 384093388, + "totalOffHeapStorageMemory" : 524288000 } } ] diff --git a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json index 0f94e3b255dbc..7727bef0178bd 100644 --- a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json @@ -1,6 +1,33 @@ [ { - "id" : "2", - "hostPort" : "172.22.0.167:51487", + "id" : "driver", + "hostPort" : "172.22.0.167:51475", + "isActive" : true, + "rddBlocks" : 0, + "memoryUsed" : 0, + "diskUsed" : 0, + "totalCores" : 0, + "maxTasks" : 0, + "activeTasks" : 0, + "failedTasks" : 0, + "completedTasks" : 0, + "totalTasks" : 0, + "totalDuration" : 0, + "totalGCTime" : 0, + "totalInputBytes" : 0, + "totalShuffleRead" : 0, + "totalShuffleWrite" : 0, + "isBlacklisted" : true, + "maxMemory" : 908381388, + "executorLogs" : { }, + "memoryMetrics": { + "usedOnHeapStorageMemory" : 0, + "usedOffHeapStorageMemory" : 0, + "totalOnHeapStorageMemory" : 384093388, + "totalOffHeapStorageMemory" : 524288000 + } +}, { + "id" : "3", + "hostPort" : "172.22.0.167:51485", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, @@ -8,52 +35,55 @@ "totalCores" : 4, "maxTasks" : 4, "activeTasks" : 0, - "failedTasks" : 4, - "completedTasks" : 0, - "totalTasks" : 4, - "totalDuration" : 2537, - "totalGCTime" : 88, + "failedTasks" : 0, + 
"completedTasks" : 12, + "totalTasks" : 12, + "totalDuration" : 2453, + "totalGCTime" : 72, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "isBlacklisted" : true, "maxMemory" : 908381388, "executorLogs" : { - "stdout" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stdout", - "stderr" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stderr" + "stdout" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stdout", + "stderr" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stderr" }, "memoryMetrics": { - "usedOnHeapStorageMemory": 0, - "usedOffHeapStorageMemory": 0, - "totalOnHeapStorageMemory": 384093388, - "totalOffHeapStorageMemory": 524288000 + "usedOnHeapStorageMemory" : 0, + "usedOffHeapStorageMemory" : 0, + "totalOnHeapStorageMemory" : 384093388, + "totalOffHeapStorageMemory" : 524288000 } }, { - "id" : "driver", - "hostPort" : "172.22.0.167:51475", + "id" : "2", + "hostPort" : "172.22.0.167:51487", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, "diskUsed" : 0, - "totalCores" : 0, - "maxTasks" : 0, + "totalCores" : 4, + "maxTasks" : 4, "activeTasks" : 0, - "failedTasks" : 0, + "failedTasks" : 4, "completedTasks" : 0, - "totalTasks" : 0, - "totalDuration" : 0, - "totalGCTime" : 0, + "totalTasks" : 4, + "totalDuration" : 2537, + "totalGCTime" : 88, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "isBlacklisted" : true, "maxMemory" : 908381388, - "executorLogs" : { }, + "executorLogs" : { + "stdout" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stdout", + "stderr" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stderr" + }, "memoryMetrics": { - "usedOnHeapStorageMemory": 0, - "usedOffHeapStorageMemory": 0, - "totalOnHeapStorageMemory": 384093388, - "totalOffHeapStorageMemory": 524288000 + "usedOnHeapStorageMemory" : 0, + "usedOffHeapStorageMemory" : 0, + "totalOnHeapStorageMemory" : 384093388, + "totalOffHeapStorageMemory" : 524288000 } }, { "id" : "1", @@ -115,34 +145,4 @@ "totalOnHeapStorageMemory": 384093388, "totalOffHeapStorageMemory": 524288000 } -}, { - "id" : "3", - "hostPort" : "172.22.0.167:51485", - "isActive" : true, - "rddBlocks" : 0, - "memoryUsed" : 0, - "diskUsed" : 0, - "totalCores" : 4, - "maxTasks" : 4, - "activeTasks" : 0, - "failedTasks" : 0, - "completedTasks" : 12, - "totalTasks" : 12, - "totalDuration" : 2453, - "totalGCTime" : 72, - "totalInputBytes" : 0, - "totalShuffleRead" : 0, - "totalShuffleWrite" : 0, - "isBlacklisted" : true, - "maxMemory" : 908381388, - "executorLogs" : { - "stdout" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stdout", - "stderr" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stderr" - }, - "memoryMetrics": { - "usedOnHeapStorageMemory": 0, - "usedOffHeapStorageMemory": 0, - "totalOnHeapStorageMemory": 384093388, - "totalOffHeapStorageMemory": 524288000 - } } ] diff --git a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json index 92e249c851116..4a8539a8558bb 100644 --- a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json +++ 
b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json @@ -1,6 +1,27 @@ [ { - "id" : "2", - "hostPort" : "172.22.0.111:64539", + "id" : "driver", + "hostPort" : "172.22.0.111:64527", + "isActive" : true, + "rddBlocks" : 0, + "memoryUsed" : 0, + "diskUsed" : 0, + "totalCores" : 0, + "maxTasks" : 0, + "activeTasks" : 0, + "failedTasks" : 0, + "completedTasks" : 0, + "totalTasks" : 0, + "totalDuration" : 0, + "totalGCTime" : 0, + "totalInputBytes" : 0, + "totalShuffleRead" : 0, + "totalShuffleWrite" : 0, + "isBlacklisted" : false, + "maxMemory" : 384093388, + "executorLogs" : { } +}, { + "id" : "3", + "hostPort" : "172.22.0.111:64543", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, @@ -8,41 +29,44 @@ "totalCores" : 4, "maxTasks" : 4, "activeTasks" : 0, - "failedTasks" : 6, - "completedTasks" : 0, - "totalTasks" : 6, - "totalDuration" : 2792, - "totalGCTime" : 128, + "failedTasks" : 0, + "completedTasks" : 4, + "totalTasks" : 4, + "totalDuration" : 3457, + "totalGCTime" : 72, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "isBlacklisted" : false, "maxMemory" : 384093388, "executorLogs" : { - "stdout" : "http://172.22.0.111:64519/logPage/?appId=app-20161115172038-0000&executorId=2&logType=stdout", - "stderr" : "http://172.22.0.111:64519/logPage/?appId=app-20161115172038-0000&executorId=2&logType=stderr" + "stdout" : "http://172.22.0.111:64521/logPage/?appId=app-20161115172038-0000&executorId=3&logType=stdout", + "stderr" : "http://172.22.0.111:64521/logPage/?appId=app-20161115172038-0000&executorId=3&logType=stderr" } }, { - "id" : "driver", - "hostPort" : "172.22.0.111:64527", + "id" : "2", + "hostPort" : "172.22.0.111:64539", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, "diskUsed" : 0, - "totalCores" : 0, - "maxTasks" : 0, + "totalCores" : 4, + "maxTasks" : 4, "activeTasks" : 0, - "failedTasks" : 0, + "failedTasks" : 6, "completedTasks" : 0, - "totalTasks" : 0, - "totalDuration" : 0, - "totalGCTime" : 0, + "totalTasks" : 6, + "totalDuration" : 2792, + "totalGCTime" : 128, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "isBlacklisted" : false, "maxMemory" : 384093388, - "executorLogs" : { } + "executorLogs" : { + "stdout" : "http://172.22.0.111:64519/logPage/?appId=app-20161115172038-0000&executorId=2&logType=stdout", + "stderr" : "http://172.22.0.111:64519/logPage/?appId=app-20161115172038-0000&executorId=2&logType=stderr" + } }, { "id" : "1", "hostPort" : "172.22.0.111:64541", @@ -91,28 +115,4 @@ "stdout" : "http://172.22.0.111:64517/logPage/?appId=app-20161115172038-0000&executorId=0&logType=stdout", "stderr" : "http://172.22.0.111:64517/logPage/?appId=app-20161115172038-0000&executorId=0&logType=stderr" } -}, { - "id" : "3", - "hostPort" : "172.22.0.111:64543", - "isActive" : true, - "rddBlocks" : 0, - "memoryUsed" : 0, - "diskUsed" : 0, - "totalCores" : 4, - "maxTasks" : 4, - "activeTasks" : 0, - "failedTasks" : 0, - "completedTasks" : 4, - "totalTasks" : 4, - "totalDuration" : 3457, - "totalGCTime" : 72, - "totalInputBytes" : 0, - "totalShuffleRead" : 0, - "totalShuffleWrite" : 0, - "isBlacklisted" : false, - "maxMemory" : 384093388, - "executorLogs" : { - "stdout" : "http://172.22.0.111:64521/logPage/?appId=app-20161115172038-0000&executorId=3&logType=stdout", - "stderr" : "http://172.22.0.111:64521/logPage/?appId=app-20161115172038-0000&executorId=3&logType=stderr" - } } ] diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala index 499d47b13d702..4d8ef8ab97ef4 100644 --- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala @@ -22,13 +22,14 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node +import org.mockito.Matchers.anyString import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS} import org.apache.spark._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ +import org.apache.spark.status.AppStateStore import org.apache.spark.storage.StorageStatusListener -import org.apache.spark.ui.exec.ExecutorsListener import org.apache.spark.ui.jobs.{JobProgressListener, StagePage, StagesTab} import org.apache.spark.ui.scope.RDDOperationGraphListener @@ -55,20 +56,21 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext { * This also runs a dummy stage to populate the page with useful content. */ private def renderStagePage(conf: SparkConf): Seq[Node] = { + val store = mock(classOf[AppStateStore]) + when(store.executorSummary(anyString())).thenReturn(None) + val jobListener = new JobProgressListener(conf) val graphListener = new RDDOperationGraphListener(conf) - val executorsListener = new ExecutorsListener(new StorageStatusListener(conf), conf) val tab = mock(classOf[StagesTab], RETURNS_SMART_NULLS) val request = mock(classOf[HttpServletRequest]) when(tab.conf).thenReturn(conf) when(tab.progressListener).thenReturn(jobListener) when(tab.operationGraphListener).thenReturn(graphListener) - when(tab.executorsListener).thenReturn(executorsListener) when(tab.appName).thenReturn("testing") when(tab.headerTabs).thenReturn(Seq.empty) when(request.getParameter("id")).thenReturn("0") when(request.getParameter("attempt")).thenReturn("0") - val page = new StagePage(tab) + val page = new StagePage(tab, store) // Simulate a stage in job progress listener val stageInfo = new StageInfo(0, 0, "dummy", 1, Seq.empty, Seq.empty, "details") diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 724b5102b7b4c..4bd6c3e206e8e 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -40,6 +40,7 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ExecutorSummary.executorLogs"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.history.HistoryServer.getSparkUI"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ui.env.EnvironmentListener"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ui.exec.ExecutorsListener"), // [SPARK-20495][SQL] Add StorageLevel to cacheTable API ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.cacheTable") )
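
Note on the `@(JsonIgnore @getter)` annotations in ExecutorData: Scala attaches an annotation on a constructor `val` to the parameter or field by default, while Jackson discovers properties through the generated getters, so a bare `@JsonIgnore` would be invisible to it. The `@getter` meta-annotation moves `@JsonIgnore` onto the accessor, which is what keeps non-serializable members like the RPC endpoint reference out of any JSON rendering of the object. A minimal sketch of the mechanism, assuming Spark's jackson-module-scala setup (the `Wrapper` class and demo object below are illustrative, not part of this patch):

import scala.annotation.meta.getter

import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Stand-in for ExecutorData: `ref` is opaque and must be hidden from Jackson.
// @(JsonIgnore @getter) places @JsonIgnore on the generated `ref()` accessor,
// the member Jackson actually introspects.
class Wrapper(
    @(JsonIgnore @getter) val ref: Object,
    val name: String)

object JsonIgnoreSketch {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    // Prints {"name":"exec-1"}; `ref` is skipped. Without @getter, Jackson
    // would still try to serialize `ref` and fail on the plain Object.
    println(mapper.writeValueAsString(new Wrapper(new Object, "exec-1")))
  }
}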
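
Note on event ordering: ExecutorEventData exists only to give raw SparkListenerEvents a natural key. Since `executorEventId` increases monotonically and a KVStore view iterates in natural-index order, `AppStateStore.executorEvents()` hands the timeline code the events in the order they were written. A self-contained sketch of the same pattern against an InMemoryStore (`OrderedEntry` and the demo object are illustrative names, not part of this patch):

import scala.collection.JavaConverters._

import org.apache.spark.kvstore.InMemoryStore
import org.apache.spark.status.KVUtils._

// Mirrors ExecutorEventData: @KVIndexParam marks `id` as the natural index.
class OrderedEntry(@KVIndexParam val id: Long, val payload: String)

object TimelineOrderSketch {
  def main(args: Array[String]): Unit = {
    val store = new InMemoryStore()
    var nextId = 0L // plays the role of AppStateListener.executorEventId
    Seq("exec 1 added", "exec 2 added", "exec 1 removed").foreach { msg =>
      nextId += 1
      store.write(new OrderedEntry(nextId, msg))
    }
    // Same read path as AppStateStore.executorEvents(): entries come back
    // sorted by id, i.e. in the order the events arrived.
    store.view(classOf[OrderedEntry]).asScala.foreach { e =>
      println(s"${e.id}: ${e.payload}")
    }
    store.close()
  }
}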