35 changes: 23 additions & 12 deletions core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -103,6 +103,8 @@ private[spark] class AppStatusListener(
       details.getOrElse("System Properties", Nil),
       details.getOrElse("Classpath Entries", Nil))
 
+    coresPerTask = envInfo.sparkProperties.toMap.get("spark.task.cpus").map(_.toInt)
+      .getOrElse(coresPerTask)
     kvstore.write(new ApplicationEnvironmentInfoWrapper(envInfo))
   }
 
@@ -132,7 +134,7 @@ private[spark] class AppStatusListener(
   override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
     // This needs to be an update in case an executor re-registers after the driver has
     // marked it as "dead".
-    val exec = getOrCreateExecutor(event.executorId)
+    val exec = getOrCreateExecutor(event.executorId, event.time)
     exec.host = event.executorInfo.executorHost
     exec.isActive = true
     exec.totalCores = event.executorInfo.totalCores
@@ -144,6 +146,8 @@ private[spark] class AppStatusListener(
   override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
     liveExecutors.remove(event.executorId).foreach { exec =>
       exec.isActive = false
+      exec.removeTime = new Date(event.time)
+      exec.removeReason = event.reason
       update(exec, System.nanoTime())
     }
   }
@@ -357,18 +361,25 @@ private[spark] class AppStatusListener(
     }
 
     liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
-      if (event.taskMetrics != null) {
-        val readMetrics = event.taskMetrics.shuffleReadMetrics
-        exec.totalGcTime += event.taskMetrics.jvmGCTime
-        exec.totalInputBytes += event.taskMetrics.inputMetrics.bytesRead
-        exec.totalShuffleRead += readMetrics.localBytesRead + readMetrics.remoteBytesRead
-        exec.totalShuffleWrite += event.taskMetrics.shuffleWriteMetrics.bytesWritten
-      }
-
       exec.activeTasks -= 1
       exec.completedTasks += completedDelta
       exec.failedTasks += failedDelta
       exec.totalDuration += event.taskInfo.duration
+
+      // Note: For resubmitted tasks, we continue to use the metrics that belong to the
+      // first attempt of this task. This may not be 100% accurate because the first attempt
+      // could have failed half-way through. The correct fix would be to keep track of the
+      // metrics added by each attempt, but this is much more complicated.
+      if (event.reason != Resubmitted) {
+        if (event.taskMetrics != null) {
+          val readMetrics = event.taskMetrics.shuffleReadMetrics
+          exec.totalGcTime += event.taskMetrics.jvmGCTime
+          exec.totalInputBytes += event.taskMetrics.inputMetrics.bytesRead
+          exec.totalShuffleRead += readMetrics.localBytesRead + readMetrics.remoteBytesRead
+          exec.totalShuffleWrite += event.taskMetrics.shuffleWriteMetrics.bytesWritten
+        }
+      }
+
       maybeUpdate(exec, now)
     }
   }
@@ -409,7 +420,7 @@ private[spark] class AppStatusListener(
   override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
     // This needs to set fields that are already set by onExecutorAdded because the driver is
     // considered an "executor" in the UI, but does not have a SparkListenerExecutorAdded event.
-    val exec = getOrCreateExecutor(event.blockManagerId.executorId)
+    val exec = getOrCreateExecutor(event.blockManagerId.executorId, event.time)
     exec.hostPort = event.blockManagerId.hostPort
     event.maxOnHeapMem.foreach { _ =>
       exec.totalOnHeap = event.maxOnHeapMem.get
@@ -561,8 +572,8 @@
     }
   }
 
-  private def getOrCreateExecutor(executorId: String): LiveExecutor = {
-    liveExecutors.getOrElseUpdate(executorId, new LiveExecutor(executorId))
+  private def getOrCreateExecutor(executorId: String, addTime: Long): LiveExecutor = {
+    liveExecutors.getOrElseUpdate(executorId, new LiveExecutor(executorId, addTime))
  }
 
   private def getOrCreateStage(info: StageInfo): LiveStage = {
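Reviewer note (not part of the diff): the new Resubmitted guard in onTaskEnd exists because a resubmitted task delivers a second SparkListenerTaskEnd for an attempt whose metrics were already folded into the executor totals. A minimal sketch of the hazard, using toy stand-ins rather than the real Spark types:

    // Sketch only: hypothetical miniature of the accumulation logic above.
    object ResubmitSketch {
      final class ExecTotals { var totalGcTime = 0L }

      sealed trait Reason
      case object Success extends Reason
      case object Resubmitted extends Reason

      def onTaskEnd(exec: ExecTotals, reason: Reason, jvmGCTime: Long): Unit = {
        // Without this check, the first attempt's GC time would be counted twice,
        // once per task-end event. The trade-off, as the diff's comment says, is
        // that metrics from a half-finished first attempt are still kept.
        if (reason != Resubmitted) {
          exec.totalGcTime += jvmGCTime
        }
      }
    }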
18 changes: 16 additions & 2 deletions core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -56,8 +56,22 @@ private[spark] class AppStatusStore(store: KVStore) {
   }
 
   def executorList(activeOnly: Boolean): Seq[v1.ExecutorSummary] = {
-    store.view(classOf[ExecutorSummaryWrapper]).index("active").reverse().first(true)
-      .last(true).asScala.map(_.info).toSeq
+    val base = store.view(classOf[ExecutorSummaryWrapper])
+    val filtered = if (activeOnly) {
+      base.index("active").reverse().first(true).last(true)
+    } else {
+      base
+    }
+    filtered.asScala.map(_.info).toSeq
   }
 
+  def executorSummary(executorId: String): Option[v1.ExecutorSummary] = {
+    try {
+      Some(store.read(classOf[ExecutorSummaryWrapper], executorId).info)
+    } catch {
+      case _: NoSuchElementException =>
+        None
+    }
+  }
+
   def stageList(statuses: JList[v1.StageStatus]): Seq[v1.StageData] = {
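Reviewer note (not part of the diff): in executorList above, first(true).last(true) pins the KVStore view over the boolean "active" index to exactly the true range, which is what implements activeOnly; reverse() only affects iteration order. A usage sketch, assuming a statusStore: AppStatusStore in scope:

    // Sketch: exercising the two new reads.
    val activeExecs = statusStore.executorList(activeOnly = true)   // active executors only
    val allExecs = statusStore.executorList(activeOnly = false)     // active and removed

    // executorSummary converts the KVStore's not-found exception into an Option.
    statusStore.executorSummary("driver") match {
      case Some(info) => println(s"driver registered at ${info.addTime}")
      case None => println("no summary stored for the driver")
    }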
9 changes: 8 additions & 1 deletion core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -212,13 +212,17 @@ private class LiveTask(
 
 }
 
-private class LiveExecutor(val executorId: String) extends LiveEntity {
+private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity {
 
   var hostPort: String = null
   var host: String = null
   var isActive = true
   var totalCores = 0
 
+  val addTime = new Date(_addTime)
+  var removeTime: Date = null
+  var removeReason: String = null
+
   var rddBlocks = 0
   var memoryUsed = 0L
   var diskUsed = 0L
@@ -276,6 +280,9 @@ private class LiveExecutor(val executorId: String) extends LiveEntity {
       totalShuffleWrite,
       isBlacklisted,
       maxMemory,
+      addTime,
+      Option(removeTime),
+      Option(removeReason),
       executorLogs,
       memoryMetrics)
     new ExecutorSummaryWrapper(info)
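Reviewer note (not part of the diff): together with the AppStatusListener changes, these fields record a full lifecycle per executor. An illustrative sketch of the intended transitions (LiveExecutor is private to the status package, so this is for reading the diff, not for calling; the timestamps and reason are placeholders):

    val exec = new LiveExecutor("1", 1500000000000L)
    // On creation: addTime is fixed, removal fields are empty.
    assert(exec.isActive && exec.removeTime == null && exec.removeReason == null)

    // Later, onExecutorRemoved fills in the removal side (see the listener hunk above):
    exec.isActive = false
    exec.removeTime = new java.util.Date(1500000100000L)
    exec.removeReason = "Container preempted"  // example reason string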
core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala
@@ -20,22 +20,11 @@ import javax.ws.rs.{GET, Produces}
 import javax.ws.rs.core.MediaType
 
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.ui.exec.ExecutorsPage
 
 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class AllExecutorListResource(ui: SparkUI) {
 
   @GET
-  def executorList(): Seq[ExecutorSummary] = {
-    val listener = ui.executorsListener
-    listener.synchronized {
-      // The follow codes should be protected by `listener` to make sure no executors will be
-      // removed before we query their status. See SPARK-12784.
-      (0 until listener.activeStorageStatusList.size).map { statusId =>
-        ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
-      } ++ (0 until listener.deadStorageStatusList.size).map { statusId =>
-        ExecutorsPage.getExecInfo(listener, statusId, isActive = false)
-      }
-    }
-  }
+  def executorList(): Seq[ExecutorSummary] = ui.store.executorList(false)
+
 }
core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
@@ -20,21 +20,11 @@ import javax.ws.rs.{GET, Produces}
 import javax.ws.rs.core.MediaType
 
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.ui.exec.ExecutorsPage
 
 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class ExecutorListResource(ui: SparkUI) {
 
   @GET
-  def executorList(): Seq[ExecutorSummary] = {
-    val listener = ui.executorsListener
-    listener.synchronized {
-      // The follow codes should be protected by `listener` to make sure no executors will be
-      // removed before we query their status. See SPARK-12784.
-      val storageStatusList = listener.activeStorageStatusList
-      (0 until storageStatusList.size).map { statusId =>
-        ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
-      }
-    }
-  }
+  def executorList(): Seq[ExecutorSummary] = ui.store.executorList(true)
+
 }
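Reviewer note (not part of the diff): both resources keep their public REST URLs and only swap the data source from the ExecutorsListener to the KVStore-backed AppStatusStore, which is why the SPARK-12784 locking comment could go away; a read from the store needs no caller-side synchronization. A quick way to eyeball the JSON against a running app on the default UI port (the app id is a placeholder):

    // Sketch: fetching both endpoints with the standard library.
    import scala.io.Source

    val appId = "<your-app-id>"  // placeholder
    val base = s"http://localhost:4040/api/v1/applications/$appId"
    val active = Source.fromURL(s"$base/executors").mkString      // ExecutorListResource
    val all = Source.fromURL(s"$base/allexecutors").mkString      // AllExecutorListResource
    println(active)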
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -85,6 +85,9 @@ class ExecutorSummary private[spark](
     val totalShuffleWrite: Long,
     val isBlacklisted: Boolean,
     val maxMemory: Long,
+    val addTime: Date,
+    val removeTime: Option[Date],
+    val removeReason: Option[String],
     val executorLogs: Map[String, String],
     val memoryMetrics: Option[MemoryMetrics])
 
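Reviewer note (not part of the diff): making the removal fields Options means API consumers can distinguish live from removed executors without any extra flag. A small consumer-side sketch, assuming an ExecutorSummary value e:

    // Sketch: interpreting the new fields on the client side.
    def describe(e: ExecutorSummary): String = e.removeTime match {
      case Some(t) => s"removed at $t: ${e.removeReason.getOrElse("no reason given")}"
      case None => s"active since ${e.addTime}"
    }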
13 changes: 5 additions & 8 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -29,7 +29,7 @@ import org.apache.spark.status.api.v1._
 import org.apache.spark.storage.StorageStatusListener
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.ui.env.EnvironmentTab
-import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
+import org.apache.spark.ui.exec.ExecutorsTab
 import org.apache.spark.ui.jobs.{JobProgressListener, JobsTab, StagesTab}
 import org.apache.spark.ui.scope.RDDOperationGraphListener
 import org.apache.spark.ui.storage.{StorageListener, StorageTab}
@@ -44,7 +44,6 @@ private[spark] class SparkUI private (
     val conf: SparkConf,
     securityManager: SecurityManager,
     val storageStatusListener: StorageStatusListener,
-    val executorsListener: ExecutorsListener,
     val jobProgressListener: JobProgressListener,
     val storageListener: StorageListener,
     val operationGraphListener: RDDOperationGraphListener,
@@ -68,7 +67,7 @@
   def initialize() {
     val jobsTab = new JobsTab(this)
     attachTab(jobsTab)
-    val stagesTab = new StagesTab(this)
+    val stagesTab = new StagesTab(this, store)
     attachTab(stagesTab)
     attachTab(new StorageTab(this))
     attachTab(new EnvironmentTab(this, store))
@@ -189,18 +188,16 @@ private[spark] object SparkUI {
     }
 
     val storageStatusListener = new StorageStatusListener(conf)
-    val executorsListener = new ExecutorsListener(storageStatusListener, conf)
     val storageListener = new StorageListener(storageStatusListener)
     val operationGraphListener = new RDDOperationGraphListener(conf)
 
     addListenerFn(storageStatusListener)
-    addListenerFn(executorsListener)
     addListenerFn(storageListener)
     addListenerFn(operationGraphListener)
 
-    new SparkUI(store, sc, conf, securityManager, storageStatusListener, executorsListener,
-      jobProgressListener, storageListener, operationGraphListener, appName, basePath,
-      lastUpdateTime, startTime, appSparkVersion)
+    new SparkUI(store, sc, conf, securityManager, storageStatusListener, jobProgressListener,
+      storageListener, operationGraphListener, appName, basePath, lastUpdateTime, startTime,
+      appSparkVersion)
   }
 
}
core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
@@ -22,11 +22,12 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.{Node, Text}
 
-import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.SparkContext
+import org.apache.spark.ui.{SparkUITab, UIUtils, WebUIPage}
 
-private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage("threadDump") {
-
-  private val sc = parent.sc
+private[ui] class ExecutorThreadDumpPage(
+    parent: SparkUITab,
+    sc: Option[SparkContext]) extends WebUIPage("threadDump") {
 
   // stripXSS is called first to remove suspicious characters used in XSS attacks
   def render(request: HttpServletRequest): Seq[Node] = {