" +
"");
reselectCheckboxesBasedOnTaskTableState();
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c32c674d64e0..5aff6682bfdd 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -68,7 +68,7 @@ import org.apache.spark.shuffle.api.ShuffleDriverComponents
import org.apache.spark.status.{AppStatusSource, AppStatusStore}
import org.apache.spark.status.api.v1.ThreadStackTrace
import org.apache.spark.storage._
-import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
+import org.apache.spark.storage.BlockManagerMessages.{TriggerHeapHistogram, TriggerThreadDump}
import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
import org.apache.spark.util._
import org.apache.spark.util.logging.DriverLogger
@@ -750,6 +750,30 @@ class SparkContext(config: SparkConf) extends Logging {
}
}
+ /**
+ * Called by the web UI to obtain the executor heap histogram.
+ */
+ private[spark] def getExecutorHeapHistogram(executorId: String): Option[Array[String]] = {
+ try {
+ if (executorId == SparkContext.DRIVER_IDENTIFIER) {
+ Some(Utils.getHeapHistogram())
+ } else {
+ env.blockManager.master.getExecutorEndpointRef(executorId) match {
+ case Some(endpointRef) =>
+ Some(endpointRef.askSync[Array[String]](TriggerHeapHistogram))
+ case None =>
+ logWarning(s"Executor $executorId might already have stopped and " +
+ "can not request heap histogram from it.")
+ None
+ }
+ }
+ } catch {
+ case e: Exception =>
+ logError(s"Exception getting heap histogram from executor $executorId", e)
+ None
+ }
+ }
+
private[spark] def getLocalProperties: Properties = localProperties.get()
private[spark] def setLocalProperties(props: Properties): Unit = {
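For reviewers trying this locally, a minimal sketch of driving the new helper (the method name comes from this patch; the executor ID and setup are illustrative, and the call must live in the org.apache.spark package since the method is private[spark]):

    // Illustrative only: fetch and print the top rows of an executor's histogram.
    // "1" is a sample executor ID; DRIVER_IDENTIFIER returns the driver's own histogram.
    val histogram: Option[Array[String]] = sc.getExecutorHeapHistogram("1")
    histogram.foreach(_.take(10).foreach(println))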
diff --git a/core/src/main/scala/org/apache/spark/internal/config/UI.scala b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
index a32e60de2a45..d0db5a908548 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/UI.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
@@ -20,6 +20,8 @@ package org.apache.spark.internal.config
import java.util.Locale
import java.util.concurrent.TimeUnit
+import org.apache.commons.lang3.{JavaVersion, SystemUtils}
+
import org.apache.spark.network.util.ByteUnit
private[spark] object UI {
@@ -97,6 +99,11 @@ private[spark] object UI {
.booleanConf
.createWithDefault(true)
+ val UI_HEAP_HISTOGRAM_ENABLED = ConfigBuilder("spark.ui.heapHistogramEnabled")
+ .version("3.5.0")
+ .booleanConf
+ .createWithDefault(SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_11))
+
val UI_PROMETHEUS_ENABLED = ConfigBuilder("spark.ui.prometheus.enabled")
.internal()
.doc("Expose executor metrics at /metrics/executors/prometheus. " +
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 24d0f239f731..7fb145556a11 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -56,6 +56,11 @@ private[spark] object BlockManagerMessages {
*/
case object TriggerThreadDump extends ToBlockManagerMasterStorageEndpoint
+ /**
+ * Driver to Executor message to get a heap histogram.
+ */
+ case object TriggerHeapHistogram extends ToBlockManagerMasterStorageEndpoint
+
//////////////////////////////////////////////////////////////////////////////////
// Messages from storage endpoints to the master.
//////////////////////////////////////////////////////////////////////////////////
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
index d4c631e59a1a..476be80e67df 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
@@ -78,6 +78,9 @@ class BlockManagerStorageEndpoint(
case TriggerThreadDump =>
context.reply(Utils.getThreadDump())
+ case TriggerHeapHistogram =>
+ context.reply(Utils.getHeapHistogram())
+
case ReplicateBlock(blockId, replicas, maxReplicas) =>
context.reply(blockManager.replicateBlock(blockId, replicas.toSet, maxReplicas))
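Taken together with the message added above, the round trip is a plain blocking ask; a sketch assuming an endpoint ref for the target executor has already been resolved, as in SparkContext.getExecutorHeapHistogram:

    // Driver side: block until the executor's storage endpoint replies with the histogram rows.
    val rows: Array[String] = endpointRef.askSync[Array[String]](TriggerHeapHistogram)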
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorHeapHistogramPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorHeapHistogramPage.scala
new file mode 100644
index 000000000000..6964711a7889
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorHeapHistogramPage.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.exec
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{Node, Text}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.ui.{SparkUITab, UIUtils, WebUIPage}
+
+private[ui] class ExecutorHeapHistogramPage(
+ parent: SparkUITab,
+ sc: Option[SparkContext]) extends WebUIPage("heapHistogram") {
+
+ // Match the lines containing object information
+ val pattern = """\s*([0-9]+):\s+([0-9]+)\s+([0-9]+)\s+(\S+)(.*)""".r
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val executorId = Option(request.getParameter("executorId")).map { executorId =>
+ UIUtils.decodeURLParameter(executorId)
+ }.getOrElse {
+ throw new IllegalArgumentException(s"Missing executorId parameter")
+ }
+ val time = System.currentTimeMillis()
+ val maybeHeapHistogram = sc.get.getExecutorHeapHistogram(executorId)
+
+ val content = maybeHeapHistogram.map { heapHistogram =>
+ val rows = heapHistogram.map { row =>
+ row match {
+ case pattern(rank, instances, bytes, name, module) =>
+            <tr class="accordion-heading">
+              <td>{rank}</td>
+              <td>{instances}</td>
+              <td>{bytes}</td>
+              <td>{name}</td>
+              <td>{module}</td>
+            </tr>
+          case pattern(rank, instances, bytes, name) =>
+            <tr class="accordion-heading">
+              <td>{rank}</td>
+              <td>{instances}</td>
+              <td>{bytes}</td>
+              <td>{name}</td>
+              <td></td>
+            </tr>
+          case _ =>
+            // Ignore the first two lines and the last line
+            //
+            // num     #instances         #bytes  class name (module)
+            // -------------------------------------------------------
+            // ...
+            // Total       1267867       72845688
+        }
+      }
+      <div class="row">
+        <div class="col-12">
+          <p>Updated at {UIUtils.formatDate(time)}</p>
+          <table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}>
+            <thead>
+              <th>Rank</th>
+              <th>Instances</th>
+              <th>Bytes</th>
+              <th>Class Name</th>
+              <th>Module</th>
+            </thead>
+            <tbody>
+              {rows}
+            </tbody>
+          </table>
+        </div>
+      </div>
+ }.getOrElse(Text("Error fetching heap histogram"))
+ UIUtils.headerSparkPage(request, s"Heap Histogram for Executor $executorId", content, parent)
+ }
+}
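To see what the pattern above accepts, a self-contained sketch against a made-up jmap row (the sample line and its values are illustrative, not taken from the patch):

    // Mirrors ExecutorHeapHistogramPage.pattern against a typical `jmap -histo:live` row.
    val pattern = """\s*([0-9]+):\s+([0-9]+)\s+([0-9]+)\s+(\S+)(.*)""".r
    "   1:        123456        7890123  [B (java.base@11.0.19)" match {
      case pattern(rank, instances, bytes, name, module) =>
        println(s"rank=$rank instances=$instances bytes=$bytes name=$name module=${module.trim}")
      case _ =>
        println("header/footer line, ignored")
    }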
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 7a857e57ceeb..b92c5e67989d 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -31,18 +31,24 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec
private def init(): Unit = {
val threadDumpEnabled =
parent.sc.isDefined && parent.conf.get(UI_THREAD_DUMPS_ENABLED)
+ val heapHistogramEnabled =
+ parent.sc.isDefined && parent.conf.get(UI_HEAP_HISTOGRAM_ENABLED)
- attachPage(new ExecutorsPage(this, threadDumpEnabled))
+ attachPage(new ExecutorsPage(this, threadDumpEnabled, heapHistogramEnabled))
if (threadDumpEnabled) {
attachPage(new ExecutorThreadDumpPage(this, parent.sc))
}
+ if (heapHistogramEnabled) {
+ attachPage(new ExecutorHeapHistogramPage(this, parent.sc))
+ }
}
}
private[ui] class ExecutorsPage(
parent: SparkUITab,
- threadDumpEnabled: Boolean)
+ threadDumpEnabled: Boolean,
+ heapHistogramEnabled: Boolean)
extends WebUIPage("") {
def render(request: HttpServletRequest): Seq[Node] = {
@@ -52,6 +58,7 @@ private[ui] class ExecutorsPage(
           <script src={UIUtils.prependBaseUri(request, "/static/executorspage.js")}></script> ++
-          <script>setThreadDumpEnabled({threadDumpEnabled})</script>
+          <script>setThreadDumpEnabled({threadDumpEnabled})</script> ++
+          <script>setHeapHistogramEnabled({heapHistogramEnabled})</script>
         }
UIUtils.headerSparkPage(request, "Executors", content, parent, useDataTables = true)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 6e8f2c496e8b..ee74eacb84f7 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2287,6 +2287,23 @@ private[spark] object Utils extends Logging with SparkClassUtils {
}.map(threadInfoToThreadStackTrace)
}
+  /** Return a heap histogram. Used to capture histograms for the web UI */
+  def getHeapHistogram(): Array[String] = {
+    // From Java 9+, we can use 'ProcessHandle.current().pid()'
+    val pid = getProcessName().split("@").head
+    val builder = new ProcessBuilder("jmap", "-histo:live", pid)
+    builder.redirectErrorStream(true)
+    val p = builder.start()
+    val rows = ArrayBuffer.empty[String]
+    // Drain the process output and close the reader even if jmap fails.
+    tryWithResource(new BufferedReader(new InputStreamReader(p.getInputStream()))) { r =>
+      var line = r.readLine()
+      while (line != null) {
+        if (line.nonEmpty) rows += line
+        line = r.readLine()
+      }
+    }
+    rows.toArray
+  }
+
def getThreadDumpForThread(threadId: Long): Option[ThreadStackTrace] = {
if (threadId <= 0) {
None
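As the comment in getHeapHistogram notes, on Java 9+ the pid can be obtained without parsing the JMX runtime name; a hedged sketch of that alternative (not part of this patch):

    // Java 9+: avoids splitting the "pid@hostname" string from the runtime MXBean.
    val pid = ProcessHandle.current().pid().toString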