Skip to content
Closed
43 changes: 33 additions & 10 deletions core/src/main/resources/org/apache/spark/ui/static/executorspage.js
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ $(document).ready(function () {
var summary = [];
var allExecCnt = 0;
var allRDDBlocks = 0;
var allMemoryUsed = 0;
var allMaxMemory = 0;
var allOnHeapMemoryUsed = 0;
var allOnHeapMaxMemory = 0;
var allOffHeapMemoryUsed = 0;
Expand All @@ -208,6 +210,8 @@ $(document).ready(function () {

var activeExecCnt = 0;
var activeRDDBlocks = 0;
var activeMemoryUsed = 0;
var activeMaxMemory = 0;
var activeOnHeapMemoryUsed = 0;
var activeOnHeapMaxMemory = 0;
var activeOffHeapMemoryUsed = 0;
Expand All @@ -228,6 +232,8 @@ $(document).ready(function () {

var deadExecCnt = 0;
var deadRDDBlocks = 0;
var deadMemoryUsed = 0;
var deadMaxMemory = 0;
var deadOnHeapMemoryUsed = 0;
var deadOnHeapMaxMemory = 0;
var deadOffHeapMemoryUsed = 0;
Expand All @@ -246,9 +252,18 @@ $(document).ready(function () {
var deadTotalShuffleWrite = 0;
var deadTotalBlacklisted = 0;

response.forEach(function (exec) {
exec.onHeapMemoryUsed = exec.hasOwnProperty('onHeapMemoryUsed') ? exec.onHeapMemoryUsed : 0;
exec.maxOnHeapMemory = exec.hasOwnProperty('maxOnHeapMemory') ? exec.maxOnHeapMemory : 0;
exec.offHeapMemoryUsed = exec.hasOwnProperty('offHeapMemoryUsed') ? exec.offHeapMemoryUsed : 0;
exec.maxOffHeapMemory = exec.hasOwnProperty('maxOffHeapMemory') ? exec.maxOffHeapMemory : 0;
});

response.forEach(function (exec) {
allExecCnt += 1;
allRDDBlocks += exec.rddBlocks;
allMemoryUsed += exec.memoryUsed;
allMaxMemory += exec.maxMemory;
allOnHeapMemoryUsed += exec.onHeapMemoryUsed;
allOnHeapMaxMemory += exec.maxOnHeapMemory;
allOffHeapMemoryUsed += exec.offHeapMemoryUsed;
Expand All @@ -269,6 +284,8 @@ $(document).ready(function () {
if (exec.isActive) {
activeExecCnt += 1;
activeRDDBlocks += exec.rddBlocks;
activeMemoryUsed += exec.memoryUsed;
activeMaxMemory += exec.maxMemory;
activeOnHeapMemoryUsed += exec.onHeapMemoryUsed;
activeOnHeapMaxMemory += exec.maxOnHeapMemory;
activeOffHeapMemoryUsed += exec.offHeapMemoryUsed;
Expand All @@ -291,6 +308,10 @@ $(document).ready(function () {
deadRDDBlocks += exec.rddBlocks;
deadMemoryUsed += exec.memoryUsed;
deadMaxMemory += exec.maxMemory;
deadOnHeapMemoryUsed += exec.onHeapMemoryUsed;
deadOnHeapMaxMemory += exec.maxOnHeapMemory;
deadOffHeapMemoryUsed += exec.offHeapMemoryUsed;
deadOffHeapMaxMemory += exec.maxOffHeapMemory;
deadDiskUsed += exec.diskUsed;
deadTotalCores += exec.totalCores;
deadMaxTasks += exec.maxTasks;
Expand All @@ -310,6 +331,8 @@ $(document).ready(function () {
var totalSummary = {
"execCnt": ( "Total(" + allExecCnt + ")"),
"allRDDBlocks": allRDDBlocks,
"allMemoryUsed": allMemoryUsed,
"allMaxMemory": allMaxMemory,
"allOnHeapMemoryUsed": allOnHeapMemoryUsed,
"allOnHeapMaxMemory": allOnHeapMaxMemory,
"allOffHeapMemoryUsed": allOffHeapMemoryUsed,
Expand All @@ -331,6 +354,8 @@ $(document).ready(function () {
var activeSummary = {
"execCnt": ( "Active(" + activeExecCnt + ")"),
"allRDDBlocks": activeRDDBlocks,
"allMemoryUsed": activeMemoryUsed,
"allMaxMemory": activeMaxMemory,
"allOnHeapMemoryUsed": activeOnHeapMemoryUsed,
"allOnHeapMaxMemory": activeOnHeapMaxMemory,
"allOffHeapMemoryUsed": activeOffHeapMemoryUsed,
Expand All @@ -352,6 +377,8 @@ $(document).ready(function () {
var deadSummary = {
"execCnt": ( "Dead(" + deadExecCnt + ")" ),
"allRDDBlocks": deadRDDBlocks,
"allMemoryUsed": deadMemoryUsed,
"allMaxMemory": deadMaxMemory,
"allOnHeapMemoryUsed": deadOnHeapMemoryUsed,
"allOnHeapMaxMemory": deadOnHeapMaxMemory,
"allOffHeapMemoryUsed": deadOffHeapMemoryUsed,
Expand Down Expand Up @@ -395,12 +422,10 @@ $(document).ready(function () {
{
data: function (row, type) {
if (type !== 'display')
return row.onHeapMemoryUsed + row.offHeapMemoryUsed;
return row.memoryUsed;
else
var memoryUsed = row.onHeapMemoryUsed + row.offHeapMemoryUsed;
var maxMemory = row.maxOnHeapMemory + row.maxOffHeapMemory;
return (formatBytes(memoryUsed, type) + ' / ' +
formatBytes(maxMemory, type));
return (formatBytes(row.memoryUsed, type) + ' / ' +
formatBytes(row.maxMemory, type));
}
},
{
Expand Down Expand Up @@ -497,12 +522,10 @@ $(document).ready(function () {
{
data: function (row, type) {
if (type !== 'display')
return row.allOnHeapMemoryUsed + row.allOffHeapMemoryUsed;
return row.allMemoryUsed
else
var memoryUsed = row.allOnHeapMemoryUsed + row.allOffHeapMemoryUsed;
var maxMemory = row.allOnHeapMaxMemory + row.allOffHeapMaxMemory;
return (formatBytes(memoryUsed, type) + ' / ' +
formatBytes(maxMemory, type));
return (formatBytes(row.allMemoryUsed, type) + ' / ' +
formatBytes(row.allMaxMemory, type));
}
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ private[spark] object AllRDDResource {
if (!rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L,
offHeapMemoryUsed =
if (rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L,
onHeapMemoryRemaining = status.onHeapMemRemaining,
offHeapMemoryRemaining = status.offHeapMemRemaining
// Get RDDStorageInfo REST API will return RDD information only in the Live UI, so
// we assume this two fields always exist.
onHeapMemoryRemaining = status.onHeapMemRemaining.get,
offHeapMemoryRemaining = status.offHeapMemRemaining.get
) } )
} else {
None
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ class ExecutorSummary private[spark](
val isBlacklisted: Boolean,
val maxMemory: Long,
val executorLogs: Map[String, String],
val onHeapMemoryUsed: Long,
val offHeapMemoryUsed: Long,
val maxOnHeapMemory: Long,
val maxOffHeapMemory: Long)
val onHeapMemoryUsed: Option[Long],
val offHeapMemoryUsed: Option[Long],
val maxOnHeapMemory: Option[Long],
val maxOffHeapMemory: Option[Long])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These fields now change to Option, it will not be printed out for old event log.


class JobData private[spark](
val jobId: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ class BlockManagerMasterEndpoint(

private def storageStatus: Array[StorageStatus] = {
blockManagerInfo.map { case (blockManagerId, info) =>
new StorageStatus(blockManagerId, info.maxOnHeapMem, info.maxOffHeapMem, info.blocks.asScala)
new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
Some(info.maxOffHeapMem), info.blocks.asScala)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add one more field maxMem. If using old event log to replay the storage info, only maxMem field is valid. Other two fields (maxOnHeapMem and maxOffHeapMem) will be None.

}.toArray
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager)
_.getStorageStatus.map(_.maxMem).sum)

registerGauge(MetricRegistry.name("memory", "maxOnHeapMem_MB"),
_.getStorageStatus.map(_.maxOnHeapMem).sum)
_.getStorageStatus.map(_.maxOnHeapMem.getOrElse(0L)).sum)

registerGauge(MetricRegistry.name("memory", "maxOffHeapMem_MB"),
_.getStorageStatus.map(_.maxOffHeapMem).sum)
_.getStorageStatus.map(_.maxOffHeapMem.getOrElse(0L)).sum)

registerGauge(MetricRegistry.name("memory", "remainingMem_MB"),
_.getStorageStatus.map(_.memRemaining).sum)

registerGauge(MetricRegistry.name("memory", "remainingOnHeapMem_MB"),
_.getStorageStatus.map(_.onHeapMemRemaining).sum)
_.getStorageStatus.map(_.onHeapMemRemaining.getOrElse(0L)).sum)

registerGauge(MetricRegistry.name("memory", "remainingOffHeapMem_MB"),
_.getStorageStatus.map(_.offHeapMemRemaining).sum)
_.getStorageStatus.map(_.offHeapMemRemaining.getOrElse(0L)).sum)

registerGauge(MetricRegistry.name("memory", "memUsed_MB"),
_.getStorageStatus.map(_.memUsed).sum)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
}

def deadStorageStatusList: Seq[StorageStatus] = synchronized {
deadExecutorStorageStatus.toSeq
deadExecutorStorageStatus
}

/** Update storage status list to reflect updated block statuses */
Expand Down Expand Up @@ -74,11 +74,8 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
synchronized {
val blockManagerId = blockManagerAdded.blockManagerId
val executorId = blockManagerId.executorId
// This two fields are compatible with old event logs, in which there only has max on heap
// memory in the event log. So maxOnHeapMem will use maxMem, maxOffHeapMem will set to 0.
val maxOnHeapMem = blockManagerAdded.maxOnHeapMem.getOrElse(blockManagerAdded.maxMem)
val maxOffHeapMem = blockManagerAdded.maxOffHeapMem.getOrElse(0L)
val storageStatus = new StorageStatus(blockManagerId, maxOnHeapMem, maxOffHeapMem)
val storageStatus = new StorageStatus(blockManagerId, blockManagerAdded.maxMem,
blockManagerAdded.maxOnHeapMem, blockManagerAdded.maxOffHeapMem)
executorIdToStorageStatus(executorId) = storageStatus

// Try to remove the dead storage status if same executor register the block manager twice.
Expand Down
22 changes: 13 additions & 9 deletions core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ import org.apache.spark.internal.Logging
@DeveloperApi
class StorageStatus(
val blockManagerId: BlockManagerId,
val maxOnHeapMem: Long,
val maxOffHeapMem: Long) {
// Explicitly adding this maxMemory field to handle maxOnHeapMem and maxOffHeapMem not
// existing issue, this happened when trying to replay old event log.
val maxMemory: Long,
val maxOnHeapMem: Option[Long],
val maxOffHeapMem: Option[Long]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the comment you have here a little confusing. I'd get rid of it and put in something more like what you have where you create the StorageStatus in BlockManagerMasterEndpoint, something like

The onHeap and offHeap memory are always defined for new applications, but they can be missing if we are replaying old event logs.


/**
* Internal representation of the blocks stored in this block manager.
Expand All @@ -59,10 +62,11 @@ class StorageStatus(
/** Create a storage status with an initial set of blocks, leaving the source unmodified. */
def this(
bmid: BlockManagerId,
maxOnHeapMem: Long,
maxOffHeapMem: Long,
maxMemory: Long,
maxOnHeapMem: Option[Long],
maxOffHeapMem: Option[Long],
initialBlocks: Map[BlockId, BlockStatus]) {
this(bmid, maxOnHeapMem, maxOffHeapMem)
this(bmid, maxMemory, maxOnHeapMem, maxOffHeapMem)
initialBlocks.foreach { case (bid, bstatus) => addBlock(bid, bstatus) }
}

Expand Down Expand Up @@ -175,10 +179,10 @@ class StorageStatus(
def numRddBlocksById(rddId: Int): Int = _rddBlocks.get(rddId).map(_.size).getOrElse(0)

/** Return the max memory can be used by this block manager. */
def maxMem: Long = maxOnHeapMem + maxOffHeapMem
def maxMem: Long = maxMemory

/** Return the memory remaining in this block manager. */
def memRemaining: Long = onHeapMemRemaining + offHeapMemRemaining
def memRemaining: Long = maxMem - memUsed

/** Return the memory used by caching RDDs */
def cacheSize: Long = onHeapCacheSize + offHeapCacheSize
Expand All @@ -187,10 +191,10 @@ class StorageStatus(
def memUsed: Long = onHeapMemUsed + offHeapMemUsed

/** Return the on-heap memory remaining in this block manager. */
def onHeapMemRemaining: Long = maxOnHeapMem - onHeapMemUsed
def onHeapMemRemaining: Option[Long] = maxOnHeapMem.map(_ - onHeapMemUsed)

/** Return the off-heap memory remaining in this block manager. */
def offHeapMemRemaining: Long = maxOffHeapMem - offHeapMemUsed
def offHeapMemRemaining: Option[Long] = maxOffHeapMem.map(_ - offHeapMemUsed)

/** Return the on-heap memory used by this block manager. */
def onHeapMemUsed: Long = _nonRddStorageInfo.onHeapUsage + onHeapCacheSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ private[spark] object ExecutorsPage {
val rddBlocks = status.numBlocks
val memUsed = status.memUsed
val maxMem = status.maxMem
val onHeapMemUsed = status.onHeapMemUsed
val offHeapMemUsed = status.offHeapMemUsed
// Only maxOnHeapMem and maxOffHeapMem are defined these two fields are not None.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this comment. Maybe you mean "status.onHeapMemUsed is only valid if maxOnHeapMem is defined"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, sorry about the confuse. Let me clarify the comments.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this comment. Maybe you mean "status.onHeapMemUsed is only valid if maxOnHeapMem is defined"?

val onHeapMemUsed = status.maxOnHeapMem.map(_ => status.onHeapMemUsed)
val offHeapMemUsed = status.maxOffHeapMem.map(_ => status.offHeapMemUsed)
val maxOnHeapMem = status.maxOnHeapMem
val maxOffHeapMem = status.maxOffHeapMem
val diskUsed = status.diskUsed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,5 @@
"totalShuffleWrite" : 13180,
"isBlacklisted" : false,
"maxMemory" : 278302556,
"executorLogs" : { },
"onHeapMemoryUsed": 0,
"offHeapMemoryUsed": 0,
"maxOnHeapMemory": 278302556,
"maxOffHeapMemory": 0
"executorLogs" : { }
} ]
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@
"executorLogs" : {
"stdout" : "http://172.22.0.111:64519/logPage/?appId=app-20161115172038-0000&executorId=2&logType=stdout",
"stderr" : "http://172.22.0.111:64519/logPage/?appId=app-20161115172038-0000&executorId=2&logType=stderr"
},
"onHeapMemoryUsed" : 0,
"offHeapMemoryUsed" : 0,
"maxOnHeapMemory" : 384093388,
"maxOffHeapMemory" : 0
}
}, {
"id" : "driver",
"hostPort" : "172.22.0.111:64527",
Expand All @@ -46,11 +42,7 @@
"totalShuffleWrite" : 0,
"isBlacklisted" : false,
"maxMemory" : 384093388,
"executorLogs" : { },
"onHeapMemoryUsed" : 0,
"offHeapMemoryUsed" : 0,
"maxOnHeapMemory" : 384093388,
"maxOffHeapMemory" : 0
"executorLogs" : { }
}, {
"id" : "1",
"hostPort" : "172.22.0.111:64541",
Expand All @@ -74,11 +66,7 @@
"executorLogs" : {
"stdout" : "http://172.22.0.111:64518/logPage/?appId=app-20161115172038-0000&executorId=1&logType=stdout",
"stderr" : "http://172.22.0.111:64518/logPage/?appId=app-20161115172038-0000&executorId=1&logType=stderr"
},
"onHeapMemoryUsed" : 0,
"offHeapMemoryUsed" : 0,
"maxOnHeapMemory" : 384093388,
"maxOffHeapMemory" : 0
}
}, {
"id" : "0",
"hostPort" : "172.22.0.111:64540",
Expand All @@ -102,11 +90,7 @@
"executorLogs" : {
"stdout" : "http://172.22.0.111:64517/logPage/?appId=app-20161115172038-0000&executorId=0&logType=stdout",
"stderr" : "http://172.22.0.111:64517/logPage/?appId=app-20161115172038-0000&executorId=0&logType=stderr"
},
"onHeapMemoryUsed" : 0,
"offHeapMemoryUsed" : 0,
"maxOnHeapMemory" : 384093388,
"maxOffHeapMemory" : 0
}
}, {
"id" : "3",
"hostPort" : "172.22.0.111:64543",
Expand All @@ -130,9 +114,5 @@
"executorLogs" : {
"stdout" : "http://172.22.0.111:64521/logPage/?appId=app-20161115172038-0000&executorId=3&logType=stdout",
"stderr" : "http://172.22.0.111:64521/logPage/?appId=app-20161115172038-0000&executorId=3&logType=stderr"
},
"onHeapMemoryUsed" : 0,
"offHeapMemoryUsed" : 0,
"maxOnHeapMemory" : 384093388,
"maxOffHeapMemory" : 0
}
} ]
Loading