@@ -24,7 +24,15 @@ <h4 style="clear: left; display: inline-block;">Summary</h4>
<th></th>
<th>RDD Blocks</th>
<th><span data-toggle="tooltip"
title="Memory used / total available memory for storage of data like RDD partitions cached in memory. ">Storage Memory</span>
title="Memory used / total available memory for storage of data like RDD partitions cached in memory.">Storage Memory</span>
</th>
<th class="on_heap_memory">
<span data-toggle="tooltip"
title="Memory used / total available memory for on heap storage of data like RDD partitions cached in memory.">On Heap Storage Memory</span>
</th>
<th class="off_heap_memory">
<span data-toggle="tooltip"
title="Memory used / total available memory for off heap storage of data like RDD partitions cached in memory.">Off Heap Storage Memory</span>
</th>
<th>Disk Used</th>
<th>Cores</th>
@@ -73,6 +81,14 @@ <h4 style="clear: left; display: inline-block;">Executors</h4>
<span data-toggle="tooltip" data-placement="top"
title="Memory used / total available memory for storage of data like RDD partitions cached in memory.">
Storage Memory</span></th>
<th class="on_heap_memory">
<span data-toggle="tooltip" data-placement="top"
title="Memory used / total available memory for on heap storage of data like RDD partitions cached in memory.">
On Heap Storage Memory</span></th>
<th class="off_heap_memory">
<span data-toggle="tooltip"
title="Memory used / total available memory for off heap storage of data like RDD partitions cached in memory.">
Off Heap Storage Memory</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Disk Used">Disk Used</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Cores">Cores</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Active Tasks">Active Tasks</span></th>
103 changes: 101 additions & 2 deletions core/src/main/resources/org/apache/spark/ui/static/executorspage.js
@@ -190,6 +190,10 @@ $(document).ready(function () {
var allRDDBlocks = 0;
var allMemoryUsed = 0;
var allMaxMemory = 0;
var allOnHeapMemoryUsed = 0;
var allOnHeapMaxMemory = 0;
var allOffHeapMemoryUsed = 0;
var allOffHeapMaxMemory = 0;
var allDiskUsed = 0;
var allTotalCores = 0;
var allMaxTasks = 0;
@@ -208,6 +212,10 @@ $(document).ready(function () {
var activeRDDBlocks = 0;
var activeMemoryUsed = 0;
var activeMaxMemory = 0;
var activeOnHeapMemoryUsed = 0;
var activeOnHeapMaxMemory = 0;
var activeOffHeapMemoryUsed = 0;
var activeOffHeapMaxMemory = 0;
var activeDiskUsed = 0;
var activeTotalCores = 0;
var activeMaxTasks = 0;
@@ -226,6 +234,10 @@ $(document).ready(function () {
var deadRDDBlocks = 0;
var deadMemoryUsed = 0;
var deadMaxMemory = 0;
var deadOnHeapMemoryUsed = 0;
var deadOnHeapMaxMemory = 0;
var deadOffHeapMemoryUsed = 0;
var deadOffHeapMaxMemory = 0;
var deadDiskUsed = 0;
var deadTotalCores = 0;
var deadMaxTasks = 0;
@@ -240,11 +252,22 @@ $(document).ready(function () {
var deadTotalShuffleWrite = 0;
var deadTotalBlacklisted = 0;

response.forEach(function (exec) {
exec.onHeapMemoryUsed = exec.hasOwnProperty('onHeapMemoryUsed') ? exec.onHeapMemoryUsed : 0;
exec.maxOnHeapMemory = exec.hasOwnProperty('maxOnHeapMemory') ? exec.maxOnHeapMemory : 0;
exec.offHeapMemoryUsed = exec.hasOwnProperty('offHeapMemoryUsed') ? exec.offHeapMemoryUsed : 0;
exec.maxOffHeapMemory = exec.hasOwnProperty('maxOffHeapMemory') ? exec.maxOffHeapMemory : 0;
});

response.forEach(function (exec) {
allExecCnt += 1;
allRDDBlocks += exec.rddBlocks;
allMemoryUsed += exec.memoryUsed;
allMaxMemory += exec.maxMemory;
allOnHeapMemoryUsed += exec.onHeapMemoryUsed;
allOnHeapMaxMemory += exec.maxOnHeapMemory;
allOffHeapMemoryUsed += exec.offHeapMemoryUsed;
allOffHeapMaxMemory += exec.maxOffHeapMemory;
allDiskUsed += exec.diskUsed;
allTotalCores += exec.totalCores;
allMaxTasks += exec.maxTasks;
@@ -263,6 +286,10 @@ $(document).ready(function () {
activeRDDBlocks += exec.rddBlocks;
activeMemoryUsed += exec.memoryUsed;
activeMaxMemory += exec.maxMemory;
activeOnHeapMemoryUsed += exec.onHeapMemoryUsed;
activeOnHeapMaxMemory += exec.maxOnHeapMemory;
activeOffHeapMemoryUsed += exec.offHeapMemoryUsed;
activeOffHeapMaxMemory += exec.maxOffHeapMemory;
activeDiskUsed += exec.diskUsed;
activeTotalCores += exec.totalCores;
activeMaxTasks += exec.maxTasks;
@@ -281,6 +308,10 @@ $(document).ready(function () {
deadRDDBlocks += exec.rddBlocks;
deadMemoryUsed += exec.memoryUsed;
deadMaxMemory += exec.maxMemory;
deadOnHeapMemoryUsed += exec.onHeapMemoryUsed;
deadOnHeapMaxMemory += exec.maxOnHeapMemory;
deadOffHeapMemoryUsed += exec.offHeapMemoryUsed;
deadOffHeapMaxMemory += exec.maxOffHeapMemory;
deadDiskUsed += exec.diskUsed;
deadTotalCores += exec.totalCores;
deadMaxTasks += exec.maxTasks;
@@ -302,6 +333,10 @@ $(document).ready(function () {
"allRDDBlocks": allRDDBlocks,
"allMemoryUsed": allMemoryUsed,
"allMaxMemory": allMaxMemory,
"allOnHeapMemoryUsed": allOnHeapMemoryUsed,
"allOnHeapMaxMemory": allOnHeapMaxMemory,
"allOffHeapMemoryUsed": allOffHeapMemoryUsed,
"allOffHeapMaxMemory": allOffHeapMaxMemory,
"allDiskUsed": allDiskUsed,
"allTotalCores": allTotalCores,
"allMaxTasks": allMaxTasks,
@@ -321,6 +356,10 @@ $(document).ready(function () {
"allRDDBlocks": activeRDDBlocks,
"allMemoryUsed": activeMemoryUsed,
"allMaxMemory": activeMaxMemory,
"allOnHeapMemoryUsed": activeOnHeapMemoryUsed,
"allOnHeapMaxMemory": activeOnHeapMaxMemory,
"allOffHeapMemoryUsed": activeOffHeapMemoryUsed,
"allOffHeapMaxMemory": activeOffHeapMaxMemory,
"allDiskUsed": activeDiskUsed,
"allTotalCores": activeTotalCores,
"allMaxTasks": activeMaxTasks,
@@ -340,6 +379,10 @@ $(document).ready(function () {
"allRDDBlocks": deadRDDBlocks,
"allMemoryUsed": deadMemoryUsed,
"allMaxMemory": deadMaxMemory,
"allOnHeapMemoryUsed": deadOnHeapMemoryUsed,
"allOnHeapMaxMemory": deadOnHeapMaxMemory,
"allOffHeapMemoryUsed": deadOffHeapMemoryUsed,
"allOffHeapMaxMemory": deadOffHeapMaxMemory,
"allDiskUsed": deadDiskUsed,
"allTotalCores": deadTotalCores,
"allMaxTasks": deadMaxTasks,
@@ -378,7 +421,35 @@ $(document).ready(function () {
{data: 'rddBlocks'},
{
data: function (row, type) {
return type === 'display' ? (formatBytes(row.memoryUsed, type) + ' / ' + formatBytes(row.maxMemory, type)) : row.memoryUsed;
if (type !== 'display')
return row.memoryUsed;
else
return (formatBytes(row.memoryUsed, type) + ' / ' +
formatBytes(row.maxMemory, type));
}
},
{
data: function (row, type) {
if (type !== 'display')
return row.onHeapMemoryUsed;
else
return (formatBytes(row.onHeapMemoryUsed, type) + ' / ' +
formatBytes(row.maxOnHeapMemory, type));
},
"fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
$(nTd).addClass('on_heap_memory')
}
},
{
data: function (row, type) {
if (type !== 'display')
return row.offHeapMemoryUsed;
else
return (formatBytes(row.offHeapMemoryUsed, type) + ' / ' +
formatBytes(row.maxOffHeapMemory, type));
},
"fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
$(nTd).addClass('off_heap_memory')
}
},
{data: 'diskUsed', render: formatBytes},
@@ -450,7 +521,35 @@ $(document).ready(function () {
{data: 'allRDDBlocks'},
{
data: function (row, type) {
return type === 'display' ? (formatBytes(row.allMemoryUsed, type) + ' / ' + formatBytes(row.allMaxMemory, type)) : row.allMemoryUsed;
if (type !== 'display')
return row.allMemoryUsed
else
return (formatBytes(row.allMemoryUsed, type) + ' / ' +
formatBytes(row.allMaxMemory, type));
}
},
{
data: function (row, type) {
if (type !== 'display')
return row.allOnHeapMemoryUsed;
else
return (formatBytes(row.allOnHeapMemoryUsed, type) + ' / ' +
formatBytes(row.allOnHeapMaxMemory, type));
},
"fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
$(nTd).addClass('on_heap_memory')
}
},
{
data: function (row, type) {
if (type !== 'display')
return row.allOffHeapMemoryUsed;
else
return (formatBytes(row.allOffHeapMemoryUsed, type) + ' / ' +
formatBytes(row.allOffHeapMaxMemory, type));
},
"fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
$(nTd).addClass('off_heap_memory')
}
},
{data: 'allDiskUsed', render: formatBytes},
3 changes: 2 additions & 1 deletion core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -205,7 +205,8 @@ span.additional-metric-title {
/* Hide all additional metrics by default. This is done here rather than using JavaScript to
* avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
.scheduler_delay, .deserialization_time, .fetch_wait_time, .shuffle_read_remote,
.serialization_time, .getting_result_time, .peak_execution_memory {
.serialization_time, .getting_result_time, .peak_execution_memory,
.on_heap_memory, .off_heap_memory {
display: none;
}

@@ -87,8 +87,13 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(S
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long)
extends SparkListenerEvent
case class SparkListenerBlockManagerAdded(
time: Long,
blockManagerId: BlockManagerId,
maxMem: Long,
maxOnHeapMem: Option[Long] = None,
maxOffHeapMem: Option[Long] = None) extends SparkListenerEvent {
}

Review comment:

I assume that, with this, we get JSON serialization of SparkListenerBlockManagerAdded events for free? It's important that an external history server be able to see the new maxOnHeapMem and maxOffHeapMem values.

Contributor Author:

Yes, I think so.
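For illustration only (not part of this PR): a minimal Scala sketch of how an external consumer, such as a custom listener in a history server, could read the new optional fields. The class name MemoryBreakdownListener is hypothetical; the sketch assumes that events replayed from older event logs come back with None for both per-mode maxima.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockManagerAdded}

// Hypothetical listener: fall back to 0 when the per-mode maxima are absent,
// which is the case for events replayed from logs written by older Spark versions.
class MemoryBreakdownListener extends SparkListener {
  override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
    val onHeap = event.maxOnHeapMem.getOrElse(0L)
    val offHeap = event.maxOffHeapMem.getOrElse(0L)
    println(s"${event.blockManagerId}: total=${event.maxMem}, " +
      s"on-heap=$onHeap, off-heap=$offHeap")
  }
}
```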


@DeveloperApi
case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockManagerId)
@@ -70,7 +70,13 @@ private[spark] object AllRDDResource {
address = status.blockManagerId.hostPort,
memoryUsed = status.memUsedByRdd(rddId),
memoryRemaining = status.memRemaining,
diskUsed = status.diskUsedByRdd(rddId)
diskUsed = status.diskUsedByRdd(rddId),
onHeapMemoryUsed = Some(
if (!rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
offHeapMemoryUsed = Some(
if (rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
onHeapMemoryRemaining = status.onHeapMemRemaining,
offHeapMemoryRemaining = status.offHeapMemRemaining
) } )
} else {
None
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -75,7 +75,11 @@ class ExecutorSummary private[spark](
val totalShuffleWrite: Long,
val isBlacklisted: Boolean,
val maxMemory: Long,
val executorLogs: Map[String, String])
val executorLogs: Map[String, String],
val onHeapMemoryUsed: Option[Long],
val offHeapMemoryUsed: Option[Long],
val maxOnHeapMemory: Option[Long],
val maxOffHeapMemory: Option[Long])
Contributor Author:

These fields are now Option, so they will not be printed out for old event logs.
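As an editor's sketch (not code from this PR), a consumer of the REST API can mirror what executorspage.js does above and default the missing values to 0 when the payload comes from an executor or history server that predates this change; the helper name storageMemoryCells is made up for illustration.

```scala
import org.apache.spark.status.api.v1.ExecutorSummary

// Sketch: (onHeapUsed, maxOnHeap, offHeapUsed, maxOffHeap), defaulting to 0
// when the optional fields are absent from older data.
def storageMemoryCells(exec: ExecutorSummary): (Long, Long, Long, Long) = (
  exec.onHeapMemoryUsed.getOrElse(0L),
  exec.maxOnHeapMemory.getOrElse(0L),
  exec.offHeapMemoryUsed.getOrElse(0L),
  exec.maxOffHeapMemory.getOrElse(0L)
)
```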


class JobData private[spark](
val jobId: Int,
@@ -111,7 +115,11 @@ class RDDDataDistribution private[spark](
val address: String,
val memoryUsed: Long,
val memoryRemaining: Long,
val diskUsed: Long)
val diskUsed: Long,
val onHeapMemoryUsed: Option[Long],
val offHeapMemoryUsed: Option[Long],
val onHeapMemoryRemaining: Option[Long],
val offHeapMemoryRemaining: Option[Long])

class RDDPartitionInfo private[spark](
val blockName: String,
@@ -101,8 +101,8 @@ private[spark] class BlockManager(
// However, since we use this only for reporting and logging, what we actually want here is
// the absolute maximum value that `maxMemory` can ever possibly reach. We may need
// to revisit whether reporting this value as the "max" is intuitive to the user.
private val maxMemory =
memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory
private val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory
private val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory

// Port used by the external shuffle service. In Yarn mode, this may already be
// set through the Hadoop configuration as the server is launched in the Yarn NM.
@@ -180,7 +180,8 @@

val idFromMaster = master.registerBlockManager(
id,
maxMemory,
maxOnHeapMemory,
maxOffHeapMemory,
slaveEndpoint)

blockManagerId = if (idFromMaster != null) idFromMaster else id
@@ -258,7 +259,7 @@
def reregister(): Unit = {
// TODO: We might need to rate limit re-registering.
logInfo(s"BlockManager $blockManagerId re-registering with master")
master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
master.registerBlockManager(blockManagerId, maxOnHeapMemory, maxOffHeapMemory, slaveEndpoint)
reportAllBlocks()
}

@@ -57,11 +57,12 @@ class BlockManagerMaster(
*/
def registerBlockManager(
blockManagerId: BlockManagerId,
maxMemSize: Long,
maxOnHeapMemSize: Long,
maxOffHeapMemSize: Long,
slaveEndpoint: RpcEndpointRef): BlockManagerId = {
logInfo(s"Registering BlockManager $blockManagerId")
val updatedId = driverEndpoint.askSync[BlockManagerId](
RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
logInfo(s"Registered BlockManager $updatedId")
updatedId
}
@@ -71,8 +71,8 @@ class BlockManagerMasterEndpoint(
logInfo("BlockManagerMasterEndpoint up")

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
context.reply(register(blockManagerId, maxMemSize, slaveEndpoint))
case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))

case _updateBlockInfo @
UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
@@ -276,7 +276,8 @@

private def storageStatus: Array[StorageStatus] = {
blockManagerInfo.map { case (blockManagerId, info) =>
new StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala)
new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
Some(info.maxOffHeapMem), info.blocks.asScala)
Contributor Author:

Added one more field, maxMem. If an old event log is used to replay the storage info, only the maxMem field is valid; the other two fields (maxOnHeapMem and maxOffHeapMem) will be None.
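To illustrate that fallback (a hedged sketch, not part of the PR): a caller wanting a per-mode breakdown has to handle StorageStatus instances rebuilt from old event logs, where only the combined figure is available. The helper name remainingByMode is hypothetical, and the sketch assumes onHeapMemRemaining / offHeapMemRemaining return Option[Long], as their usage in AllRDDResource above suggests.

```scala
import org.apache.spark.storage.StorageStatus

// Sketch: prefer the per-mode remaining values when present, otherwise fall
// back to the combined figure that old event logs still provide.
def remainingByMode(status: StorageStatus): String =
  (status.onHeapMemRemaining, status.offHeapMemRemaining) match {
    case (Some(onHeap), Some(offHeap)) => s"on-heap: $onHeap, off-heap: $offHeap"
    case _ => s"combined: ${status.memRemaining}"
  }
```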

}.toArray
}

@@ -338,7 +339,8 @@
*/
private def register(
idWithoutTopologyInfo: BlockManagerId,
maxMemSize: Long,
maxOnHeapMemSize: Long,
maxOffHeapMemSize: Long,
slaveEndpoint: RpcEndpointRef): BlockManagerId = {
// the dummy id is not expected to contain the topology information.
// we get that info here and respond back with a more fleshed out block manager id
Expand All @@ -359,14 +361,15 @@ class BlockManagerMasterEndpoint(
case None =>
}
logInfo("Registering block manager %s with %s RAM, %s".format(
id.hostPort, Utils.bytesToString(maxMemSize), id))
id.hostPort, Utils.bytesToString(maxOnHeapMemSize + maxOffHeapMemSize), id))

blockManagerIdByExecutor(id.executorId) = id

blockManagerInfo(id) = new BlockManagerInfo(
id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
id, System.currentTimeMillis(), maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)
}
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
id
}

@@ -464,10 +467,13 @@ object BlockStatus {
private[spark] class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
val maxMem: Long,
val maxOnHeapMem: Long,
val maxOffHeapMem: Long,
val slaveEndpoint: RpcEndpointRef)
extends Logging {

val maxMem = maxOnHeapMem + maxOffHeapMem

private var _lastSeenMs: Long = timeMs
private var _remainingMem: Long = maxMem
