diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index ca1074fcf6fc..02cb6f29622f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -249,4 +249,16 @@ object StaticSQLConf {
.version("3.1.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefault(-1)
+
+ val ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST =
+ buildStaticConf("spark.sql.streaming.ui.enabledCustomMetricList")
+ .internal()
+ .doc("Configures a list of custom metrics on Structured Streaming UI, which are enabled. " +
+ "The list contains the name of the custom metrics separated by comma. In aggregation " +
+ "only sum used. The list of supported custom metrics is state store provider specific " +
+ "and it can be found out for example from query progress log entry.")
+ .version("3.1.0")
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
index f48672afb41f..77b1e61d587a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
@@ -19,18 +19,32 @@ package org.apache.spark.sql.streaming.ui
import java.{util => ju}
import java.lang.{Long => JLong}
-import java.util.UUID
+import java.util.{Locale, UUID}
import javax.servlet.http.HttpServletRequest
+import scala.collection.JavaConverters._
import scala.xml.{Node, NodeBuffer, Unparsed}
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.state.StateStoreProvider
+import org.apache.spark.sql.internal.SQLConf.STATE_STORE_PROVIDER_CLASS
+import org.apache.spark.sql.internal.StaticSQLConf.ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST
import org.apache.spark.sql.streaming.ui.UIUtils._
import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage}
private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
extends WebUIPage("statistics") with Logging {
+ // A state store provider is instantiated here only to read its supported custom metrics, so
+ // implementations must keep the constructor cheap and do heavyweight initialization in init.
+ private val supportedCustomMetrics = StateStoreProvider.create(
+ parent.parent.conf.get(STATE_STORE_PROVIDER_CLASS)).supportedCustomMetrics
+ logDebug(s"Supported custom metrics: $supportedCustomMetrics")
+
+ // Enabled names are lower-cased (Locale.ROOT) so that lookups can be case-insensitive.
+ private val enabledCustomMetrics =
+ parent.parent.conf.get(ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST).map(_.toLowerCase(Locale.ROOT))
+ logDebug(s"Enabled custom metrics: $enabledCustomMetrics")
+
def generateLoadResources(request: HttpServletRequest): Seq[Node] = {
// scalastyle:off
@@ -199,49 +213,100 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
"records")
graphUIDataForNumRowsDroppedByWatermark.generateDataJs(jsCollector)
- // scalastyle:off
-
-
-
- Aggregated Number Of Total State Rows {SparkUIUtils.tooltip("Aggregated number of total state rows.", "right")}
-
- |
- {graphUIDataForNumberTotalRows.generateTimelineHtml(jsCollector)} |
- {graphUIDataForNumberTotalRows.generateHistogramHtml(jsCollector)} |
-
-
-
-
- Aggregated Number Of Updated State Rows {SparkUIUtils.tooltip("Aggregated number of updated state rows.", "right")}
-
- |
- {graphUIDataForNumberUpdatedRows.generateTimelineHtml(jsCollector)} |
- {graphUIDataForNumberUpdatedRows.generateHistogramHtml(jsCollector)} |
-
-
-
-
- Aggregated State Memory Used In Bytes {SparkUIUtils.tooltip("Aggregated state memory used in bytes.", "right")}
-
- |
- {graphUIDataForMemoryUsedBytes.generateTimelineHtml(jsCollector)} |
- {graphUIDataForMemoryUsedBytes.generateHistogramHtml(jsCollector)} |
-
-
-
-
- Aggregated Number Of Rows Dropped By Watermark {SparkUIUtils.tooltip("Accumulates all input rows being dropped in stateful operators by watermark. 'Inputs' are relative to operators.", "right")}
-
- |
- {graphUIDataForNumRowsDroppedByWatermark.generateTimelineHtml(jsCollector)} |
- {graphUIDataForNumRowsDroppedByWatermark.generateHistogramHtml(jsCollector)} |
-
- // scalastyle:on
+ val result =
+ // scalastyle:off
+
+
+
+ Aggregated Number Of Total State Rows {SparkUIUtils.tooltip("Aggregated number of total state rows.", "right")}
+
+ |
+ {graphUIDataForNumberTotalRows.generateTimelineHtml(jsCollector)} |
+ {graphUIDataForNumberTotalRows.generateHistogramHtml(jsCollector)} |
+
+
+
+
+ Aggregated Number Of Updated State Rows {SparkUIUtils.tooltip("Aggregated number of updated state rows.", "right")}
+
+ |
+ {graphUIDataForNumberUpdatedRows.generateTimelineHtml(jsCollector)} |
+ {graphUIDataForNumberUpdatedRows.generateHistogramHtml(jsCollector)} |
+
+
+
+
+ Aggregated State Memory Used In Bytes {SparkUIUtils.tooltip("Aggregated state memory used in bytes.", "right")}
+
+ |
+ {graphUIDataForMemoryUsedBytes.generateTimelineHtml(jsCollector)} |
+ {graphUIDataForMemoryUsedBytes.generateHistogramHtml(jsCollector)} |
+
+
+
+
+ Aggregated Number Of Rows Dropped By Watermark {SparkUIUtils.tooltip("Accumulates all input rows being dropped in stateful operators by watermark. 'Inputs' are relative to operators.", "right")}
+
+ |
+ {graphUIDataForNumRowsDroppedByWatermark.generateTimelineHtml(jsCollector)} |
+ {graphUIDataForNumRowsDroppedByWatermark.generateHistogramHtml(jsCollector)} |
+
+ // scalastyle:on
+
+ if (enabledCustomMetrics.nonEmpty) {
+ result ++= generateAggregatedCustomMetrics(query, minBatchTime, maxBatchTime, jsCollector)
+ }
+ result
} else {
new NodeBuffer()
}
}
+ /**
+ * Builds one row (timeline plus histogram) per enabled custom state store metric.
+ *
+ * Candidate metric names are the custom metric keys of the first state operator in the
+ * query's last progress. A name is rendered only if it is in the enabled list (compared
+ * case-insensitively) and the configured provider declares it in its supported metrics.
+ * For every entry of the recent progress the metric is summed over all state operators.
+ *
+ * @param query UI data of the streaming query; its last progress must have state operators
+ * @param minBatchTime passed through to [[GraphUIData]] together with `maxBatchTime`
+ * @param maxBatchTime passed through to [[GraphUIData]] together with `minBatchTime`
+ * @param jsCollector receives the generated data JS and is used to render the graphs
+ * @return the rows generated for the displayable metrics; empty when there are none
+ * @throws IllegalArgumentException if the last progress has no state operators
+ */
+ def generateAggregatedCustomMetrics(
+ query: StreamingQueryUIData,
+ minBatchTime: Long,
+ maxBatchTime: Long,
+ jsCollector: JsCollector): NodeBuffer = {
+ val result: NodeBuffer = new NodeBuffer
+
+ // This is made sure on caller side but put it here to be defensive
+ require(query.lastProgress.stateOperators.nonEmpty)
+ query.lastProgress.stateOperators.head.customMetrics.keySet().asScala
+ .filter(m => enabledCustomMetrics.contains(m.toLowerCase(Locale.ROOT)))
+ // Skip metrics the provider does not declare rather than failing on Option.get.
+ .flatMap(m => supportedCustomMetrics.find(_.name.equalsIgnoreCase(m)).map(m -> _))
+ .foreach { case (metricName, metric) =>
+ val data = query.recentProgress.map { p =>
+ // Map.get yields null when an operator lacks the metric; count that as zero.
+ val total = p.stateOperators
+ .map(op => Option(op.customMetrics.get(metricName)).map(_.toDouble).getOrElse(0.0)).sum
+ (parseProgressTimestamp(p.timestamp), total)
+ }
+ val max = data.maxBy(_._2)._2
+
+ val graphUIData =
+ new GraphUIData(
+ s"aggregated-$metricName-timeline",
+ s"aggregated-$metricName-histogram",
+ data,
+ minBatchTime,
+ maxBatchTime,
+ 0,
+ max,
+ "")
+ graphUIData.generateDataJs(jsCollector)
+
+ result ++=
+ // scalastyle:off
+
+
+
+ Aggregated Custom Metric {s"$metricName"} {SparkUIUtils.tooltip(metric.desc, "right")}
+
+ |
+ {graphUIData.generateTimelineHtml(jsCollector)} |
+ {graphUIData.generateHistogramHtml(jsCollector)} |
+
+ // scalastyle:on
+ }
+
+ result
+ }
+
def generateStatTable(query: StreamingQueryUIData): Seq[Node] = {
val batchToTimestamps = withNoProgress(query,
query.recentProgress.map(p => (p.batchId, parseProgressTimestamp(p.timestamp))),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
index 640c21c52a14..c2b6688faf0e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
@@ -24,8 +24,10 @@ import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
import org.scalatest.BeforeAndAfter
import scala.xml.Node
+import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.StreamingQueryProgress
import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.ui.SparkUI
class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
@@ -65,10 +67,13 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
val request = mock(classOf[HttpServletRequest])
val tab = mock(classOf[StreamingQueryTab], RETURNS_SMART_NULLS)
val statusListener = mock(classOf[StreamingQueryStatusListener], RETURNS_SMART_NULLS)
+ val ui = mock(classOf[SparkUI])
when(request.getParameter("id")).thenReturn(id.toString)
when(tab.appName).thenReturn("testing")
when(tab.headerTabs).thenReturn(Seq.empty)
when(tab.statusListener).thenReturn(statusListener)
+ when(ui.conf).thenReturn(new SparkConf())
+ when(tab.parent).thenReturn(ui)
val streamQuery = createStreamQueryUIData(id)
when(statusListener.allQueryStatus).thenReturn(Seq(streamQuery))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala
index 307479db3394..94844c4e87a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.config.UI.{UI_ENABLED, UI_PORT}
import org.apache.spark.sql.LocalSparkSession.withSparkSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.internal.StaticSQLConf.ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST
import org.apache.spark.sql.streaming.StreamingQueryException
import org.apache.spark.ui.SparkUICssErrorHandler
@@ -53,6 +54,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
.setAppName("ui-test")
.set(UI_ENABLED, true)
.set(UI_PORT, 0)
+ .set(ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST, Seq("stateOnCurrentVersionSizeBytes"))
additionalConfs.foreach { case (k, v) => conf.set(k, v) }
val spark = SparkSession.builder().master(master).config(conf).getOrCreate()
assert(spark.sparkContext.ui.isDefined)
@@ -140,6 +142,10 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
summaryText should contain ("Aggregated Number Of Updated State Rows (?)")
summaryText should contain ("Aggregated State Memory Used In Bytes (?)")
summaryText should contain ("Aggregated Number Of Rows Dropped By Watermark (?)")
+ summaryText should contain ("Aggregated Custom Metric stateOnCurrentVersionSizeBytes" +
+ " (?)")
+ summaryText should not contain ("Aggregated Custom Metric loadedMapCacheHitCount (?)")
+ summaryText should not contain ("Aggregated Custom Metric loadedMapCacheMissCount (?)")
}
} finally {
spark.streams.active.foreach(_.stop())