diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 1067cdc84ceb..4da2ae289a2d 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -827,6 +827,7 @@ private[spark] class AppStatusListener(
     event.blockUpdatedInfo.blockId match {
       case block: RDDBlockId => updateRDDBlock(event, block)
       case stream: StreamBlockId => updateStreamBlock(event, stream)
+      case broadcast: BroadcastBlockId => updateBroadcastBlock(event, broadcast)
       case _ =>
     }
   }
@@ -885,15 +886,7 @@ private[spark] class AppStatusListener(
     // Update the executor stats first, since they are used to calculate the free memory
     // on tracked RDD distributions.
     maybeExec.foreach { exec =>
-      if (exec.hasMemoryInfo) {
-        if (storageLevel.useOffHeap) {
-          exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta)
-        } else {
-          exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta)
-        }
-      }
-      exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta)
-      exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta)
+      updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta)
     }

     // Update the block entry in the RDD info, keeping track of the deltas above so that we
@@ -995,6 +988,39 @@ private[spark] class AppStatusListener(
     }
   }

+  private def updateBroadcastBlock(
+      event: SparkListenerBlockUpdated,
+      broadcast: BroadcastBlockId): Unit = {
+    val executorId = event.blockUpdatedInfo.blockManagerId.executorId
+    liveExecutors.get(executorId).foreach { exec =>
+      val now = System.nanoTime()
+      val storageLevel = event.blockUpdatedInfo.storageLevel
+
+      // Whether values are being added to or removed from the existing accounting.
+      val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1)
+      val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1)
+
+      updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta)
+      maybeUpdate(exec, now)
+    }
+  }
+
+  private def updateExecutorMemoryDiskInfo(
+      exec: LiveExecutor,
+      storageLevel: StorageLevel,
+      memoryDelta: Long,
+      diskDelta: Long): Unit = {
+    if (exec.hasMemoryInfo) {
+      if (storageLevel.useOffHeap) {
+        exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta)
+      } else {
+        exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta)
+      }
+    }
+    exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta)
+    exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta)
+  }
+
   private def getOrCreateStage(info: StageInfo): LiveStage = {
     val stage = liveStages.computeIfAbsent((info.stageId, info.attemptNumber),
       new Function[(Int, Int), LiveStage]() {
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index ede9466b3d63..e367b3034a52 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -938,6 +938,24 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     intercept[NoSuchElementException] {
       check[StreamBlockData](stream1.name) { _ => () }
     }
+
+    // Update a BroadcastBlock.
+    val broadcast1 = BroadcastBlockId(1L)
+    listener.onBlockUpdated(SparkListenerBlockUpdated(
+      BlockUpdatedInfo(bm1, broadcast1, level, 1L, 1L)))
+
+    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
+      assert(exec.info.memoryUsed === 1L)
+      assert(exec.info.diskUsed === 1L)
+    }
+
+    // Drop a BroadcastBlock.
+    listener.onBlockUpdated(SparkListenerBlockUpdated(
+      BlockUpdatedInfo(bm1, broadcast1, StorageLevel.NONE, 1L, 1L)))
+    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
+      assert(exec.info.memoryUsed === 0)
+      assert(exec.info.diskUsed === 0)
+    }
   }

   test("eviction of old data") {
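
Note for reviewers: the accounting in updateBroadcastBlock hinges on one sign convention. The reported memSize/diskSize are added while storageLevel.useMemory/useDisk hold, and subtracted once the block is dropped (e.g. via StorageLevel.NONE), so a store followed by a drop nets out to zero, which is exactly what the new test asserts. Below is a minimal standalone sketch of that convention; the Level case class is a hypothetical stand-in for org.apache.spark.storage.StorageLevel and is not part of the patch.

// Standalone sketch (not part of the patch) of the delta-sign convention
// used by updateBroadcastBlock. `Level` is a hypothetical stand-in for
// org.apache.spark.storage.StorageLevel.
case class Level(useMemory: Boolean, useDisk: Boolean)

object DeltaSignSketch {
  // Positive delta while the block is stored at the given level,
  // negative once it has been dropped from memory/disk.
  def deltas(level: Level, memSize: Long, diskSize: Long): (Long, Long) = {
    val memoryDelta = memSize * (if (level.useMemory) 1 else -1)
    val diskDelta = diskSize * (if (level.useDisk) 1 else -1)
    (memoryDelta, diskDelta)
  }

  def main(args: Array[String]): Unit = {
    val memoryAndDisk = Level(useMemory = true, useDisk = true)
    val dropped = Level(useMemory = false, useDisk = false) // like StorageLevel.NONE

    // Storing the broadcast block adds its sizes to the executor totals...
    val (memUp, diskUp) = deltas(memoryAndDisk, 1L, 1L)
    assert(memUp == 1L && diskUp == 1L)

    // ...and the StorageLevel.NONE update subtracts them again, which is
    // why the test expects memoryUsed === 0 and diskUsed === 0 after the drop.
    val (memDown, diskDown) = deltas(dropped, 1L, 1L)
    assert(memDown == -1L && diskDown == -1L)
  }
}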