Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2875,6 +2875,15 @@ object SQLConf {
.stringConf
.createWithDefault("")

val DISABLED_STREAMING_UI_CUSTOM_METRICS_LIST =
buildConf("spark.sql.streaming.disabledUICustomMetricsList")
.internal()
.doc("Configures a list of custom metrics on Structured Streaming UI, which are disabled. " +
"The list contains the name of the custom metrics separated by comma.")
.version("3.1.0")
.stringConf
.createWithDefault("loadedMapCacheHitCount")

/**
* Holds information about keys that have been deprecated.
*
Expand Down Expand Up @@ -3512,6 +3521,9 @@ class SQLConf extends Serializable with Logging {

def disabledJdbcConnectionProviders: String = getConf(SQLConf.DISABLED_JDBC_CONN_PROVIDER_LIST)

def disabledStreamingCustomMetrics: String =
getConf(SQLConf.DISABLED_STREAMING_UI_CUSTOM_METRICS_LIST)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ import java.lang.{Long => JLong}
import java.util.UUID
import javax.servlet.http.HttpServletRequest

import scala.collection.JavaConverters._
import scala.xml.{Node, NodeBuffer, Unparsed}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.ui.UIUtils._
import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage}
import org.apache.spark.util.Utils

private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
extends WebUIPage("statistics") with Logging {
Expand Down Expand Up @@ -199,49 +202,98 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
"records")
graphUIDataForNumRowsDroppedByWatermark.generateDataJs(jsCollector)

// scalastyle:off
<tr>
<td style="vertical-align: middle;">
<div style="width: 160px;">
<div><strong>Aggregated Number Of Total State Rows {SparkUIUtils.tooltip("Aggregated number of total state rows.", "right")}</strong></div>
</div>
</td>
<td class={"aggregated-num-total-state-rows-timeline"}>{graphUIDataForNumberTotalRows.generateTimelineHtml(jsCollector)}</td>
<td class={"aggregated-num-total-state-rows-histogram"}>{graphUIDataForNumberTotalRows.generateHistogramHtml(jsCollector)}</td>
</tr>
<tr>
<td style="vertical-align: middle;">
<div style="width: 160px;">
<div><strong>Aggregated Number Of Updated State Rows {SparkUIUtils.tooltip("Aggregated number of updated state rows.", "right")}</strong></div>
</div>
</td>
<td class={"aggregated-num-updated-state-rows-timeline"}>{graphUIDataForNumberUpdatedRows.generateTimelineHtml(jsCollector)}</td>
<td class={"aggregated-num-updated-state-rows-histogram"}>{graphUIDataForNumberUpdatedRows.generateHistogramHtml(jsCollector)}</td>
</tr>
<tr>
<td style="vertical-align: middle;">
<div style="width: 160px;">
<div><strong>Aggregated State Memory Used In Bytes {SparkUIUtils.tooltip("Aggregated state memory used in bytes.", "right")}</strong></div>
</div>
</td>
<td class={"aggregated-state-memory-used-bytes-timeline"}>{graphUIDataForMemoryUsedBytes.generateTimelineHtml(jsCollector)}</td>
<td class={"aggregated-state-memory-used-bytes-histogram"}>{graphUIDataForMemoryUsedBytes.generateHistogramHtml(jsCollector)}</td>
</tr>
val result =
// scalastyle:off
<tr>
<td style="vertical-align: middle;">
<div style="width: 160px;">
<div><strong>Aggregated Number Of State Rows Dropped By Watermark {SparkUIUtils.tooltip("Aggregated number of state rows dropped by watermark.", "right")}</strong></div>
<div><strong>Aggregated Number Of Total State Rows {SparkUIUtils.tooltip("Aggregated number of total state rows.", "right")}</strong></div>
</div>
</td>
<td class={"aggregated-num-state-rows-dropped-by-watermark-timeline"}>{graphUIDataForNumRowsDroppedByWatermark.generateTimelineHtml(jsCollector)}</td>
<td class={"aggregated-num-state-rows-dropped-by-watermark-histogram"}>{graphUIDataForNumRowsDroppedByWatermark.generateHistogramHtml(jsCollector)}</td>
<td class={"aggregated-num-total-state-rows-timeline"}>{graphUIDataForNumberTotalRows.generateTimelineHtml(jsCollector)}</td>
<td class={"aggregated-num-total-state-rows-histogram"}>{graphUIDataForNumberTotalRows.generateHistogramHtml(jsCollector)}</td>
</tr>
// scalastyle:on
<tr>
<td style="vertical-align: middle;">
<div style="width: 160px;">
<div><strong>Aggregated Number Of Updated State Rows {SparkUIUtils.tooltip("Aggregated number of updated state rows.", "right")}</strong></div>
</div>
</td>
<td class={"aggregated-num-updated-state-rows-timeline"}>{graphUIDataForNumberUpdatedRows.generateTimelineHtml(jsCollector)}</td>
<td class={"aggregated-num-updated-state-rows-histogram"}>{graphUIDataForNumberUpdatedRows.generateHistogramHtml(jsCollector)}</td>
</tr>
<tr>
<td style="vertical-align: middle;">
<div style="width: 160px;">
<div><strong>Aggregated State Memory Used In Bytes {SparkUIUtils.tooltip("Aggregated state memory used in bytes.", "right")}</strong></div>
</div>
</td>
<td class={"aggregated-state-memory-used-bytes-timeline"}>{graphUIDataForMemoryUsedBytes.generateTimelineHtml(jsCollector)}</td>
<td class={"aggregated-state-memory-used-bytes-histogram"}>{graphUIDataForMemoryUsedBytes.generateHistogramHtml(jsCollector)}</td>
</tr>
<tr>
<td style="vertical-align: middle;">
<div style="width: 160px;">
<div><strong>Aggregated Number Of State Rows Dropped By Watermark {SparkUIUtils.tooltip("Aggregated number of state rows dropped by watermark.", "right")}</strong></div>
</div>
</td>
<td class={"aggregated-num-state-rows-dropped-by-watermark-timeline"}>{graphUIDataForNumRowsDroppedByWatermark.generateTimelineHtml(jsCollector)}</td>
<td class={"aggregated-num-state-rows-dropped-by-watermark-histogram"}>{graphUIDataForNumRowsDroppedByWatermark.generateHistogramHtml(jsCollector)}</td>
</tr>
// scalastyle:on

result ++= generateAggregatedCustomMetrics(query, minBatchTime, maxBatchTime, jsCollector)
result
} else {
new NodeBuffer()
}
}

def generateAggregatedCustomMetrics(
query: StreamingQueryUIData,
minBatchTime: Long,
maxBatchTime: Long,
jsCollector: JsCollector): NodeBuffer = {
val result: NodeBuffer = new NodeBuffer

// This is made sure on caller side but put it here to be defensive
require(query.lastProgress.stateOperators.nonEmpty)
val disabledCustomMetrics = Utils.stringToSeq(SQLConf.get.disabledStreamingCustomMetrics)
query.lastProgress.stateOperators.head.customMetrics.keySet().asScala
.filterNot(disabledCustomMetrics.contains(_)).map { metricName =>
val data = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
p.stateOperators.map(_.customMetrics.get(metricName).toDouble).sum))
val max = data.maxBy(_._2)._2

val graphUIData =
new GraphUIData(
s"aggregated-$metricName-timeline",
s"aggregated-$metricName-histogram",
data,
minBatchTime,
maxBatchTime,
0,
max,
"")
graphUIData.generateDataJs(jsCollector)

result ++=
// scalastyle:off
<tr>
<td style="vertical-align: middle;">
<div style="width: 240px;">
<div><strong>Aggregated Custom Metric {s"$metricName"} {SparkUIUtils.tooltip("Custom metric.", "right")}</strong></div>
</div>
</td>
<td class={s"aggregated-$metricName-timeline"}>{graphUIData.generateTimelineHtml(jsCollector)}</td>
<td class={s"aggregated-$metricName-histogram"}>{graphUIData.generateHistogramHtml(jsCollector)}</td>
</tr>
// scalastyle:on
}

result
}

def generateStatTable(query: StreamingQueryUIData): Seq[Node] = {
val batchToTimestamps = withNoProgress(query,
query.recentProgress.map(p => (p.batchId, parseProgressTimestamp(p.timestamp))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
summaryText should contain ("Aggregated Number Of Updated State Rows (?)")
summaryText should contain ("Aggregated State Memory Used In Bytes (?)")
summaryText should contain ("Aggregated Number Of State Rows Dropped By Watermark (?)")
summaryText should contain ("Aggregated Custom Metric stateOnCurrentVersionSizeBytes" +
" (?)")
summaryText should not contain ("Aggregated Custom Metric loadedMapCacheHitCount (?)")
summaryText should contain ("Aggregated Custom Metric loadedMapCacheMissCount (?)")
}
} finally {
spark.streams.active.foreach(_.stop())
Expand Down