Skip to content
Closed
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.lang.{Long => JLong}
import java.util.UUID
import javax.servlet.http.HttpServletRequest

import scala.xml.{Node, Unparsed}
import scala.xml.{Node, NodeBuffer, Unparsed}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.streaming.ui.UIUtils._
Expand Down Expand Up @@ -126,6 +126,123 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
<br />
}

/**
 * Renders one table row per aggregated state-operator metric — total state rows, updated
 * state rows, state memory used, and rows dropped by watermark — each row holding a
 * timeline graph and a histogram graph over the recent progress window.
 *
 * Every metric is summed across all state operators of a progress update and keyed by the
 * progress timestamp, so it lines up with the batch timeline rendered by the caller.
 *
 * @param query        UI data of the query being rendered; `lastProgress` must be non-null.
 * @param minBatchTime left edge (ms) of the shared x-axis.
 * @param maxBatchTime right edge (ms) of the shared x-axis.
 * @param jsCollector  collector the generated graph JS snippets are registered with.
 * @return the `<tr>` elements to splice into the stats table, or an empty buffer when the
 *         query has no stateful operators.
 */
def generateAggregatedStateOperators(
    query: StreamingQueryUIData,
    minBatchTime: Long,
    maxBatchTime: Long,
    jsCollector: JsCollector): NodeBuffer = {
  // This is made sure on caller side but put it here to be defensive
  require(query.lastProgress != null)
  if (query.lastProgress.stateOperators.nonEmpty) {
    // Builds the timeline/histogram graph pair for one aggregated metric and registers its
    // data-generating JS with the collector. The y-axis always starts at 0 and tops out at
    // the largest observed value of the metric.
    def mkGraph(
        timelineDivId: String,
        histogramDivId: String,
        data: Seq[(Long, Double)],
        unitName: String): GraphUIData = {
      val graph = new GraphUIData(
        timelineDivId,
        histogramDivId,
        data,
        minBatchTime,
        maxBatchTime,
        0,
        data.maxBy(_._2)._2,
        unitName)
      graph.generateDataJs(jsCollector)
      graph
    }

    val graphUIDataForNumberTotalRows = mkGraph(
      "aggregated-num-total-rows-timeline",
      "aggregated-num-total-rows-histogram",
      query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
        p.stateOperators.map(_.numRowsTotal).sum.toDouble)),
      "records")

    val graphUIDataForNumberUpdatedRows = mkGraph(
      "aggregated-num-updated-rows-timeline",
      "aggregated-num-updated-rows-histogram",
      query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
        p.stateOperators.map(_.numRowsUpdated).sum.toDouble)),
      "records")

    val graphUIDataForMemoryUsedBytes = mkGraph(
      "aggregated-memory-used-bytes-timeline",
      "aggregated-memory-used-bytes-histogram",
      query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
        p.stateOperators.map(_.memoryUsedBytes).sum.toDouble)),
      "bytes")

    val graphUIDataForNumRowsDroppedByWatermark = mkGraph(
      "aggregated-num-rows-dropped-by-watermark-timeline",
      "aggregated-num-rows-dropped-by-watermark-histogram",
      query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
        p.stateOperators.map(_.numRowsDroppedByWatermark).sum.toDouble)),
      "records")

    // TODO(review): per review discussion, "rows dropped by watermark" are dropped *input*
    // rows to stateful operators, not state rows — the label below may be renamed in a
    // follow-up; confirm before relying on the wording.
    // scalastyle:off
    <tr>
      <td style="vertical-align: middle;">
        <div style="width: 160px;">
          <div><strong>Aggregated Number Of Total State Rows {SparkUIUtils.tooltip("Aggregated number of total state rows.", "right")}</strong></div>
        </div>
      </td>
      <td class={"aggregated-num-total-rows-timeline"}>{graphUIDataForNumberTotalRows.generateTimelineHtml(jsCollector)}</td>
      <td class={"aggregated-num-total-rows-histogram"}>{graphUIDataForNumberTotalRows.generateHistogramHtml(jsCollector)}</td>
    </tr>
    <tr>
      <td style="vertical-align: middle;">
        <div style="width: 160px;">
          <div><strong>Aggregated Number Of Updated State Rows {SparkUIUtils.tooltip("Aggregated number of updated state rows.", "right")}</strong></div>
        </div>
      </td>
      <td class={"aggregated-num-updated-rows-timeline"}>{graphUIDataForNumberUpdatedRows.generateTimelineHtml(jsCollector)}</td>
      <td class={"aggregated-num-updated-rows-histogram"}>{graphUIDataForNumberUpdatedRows.generateHistogramHtml(jsCollector)}</td>
    </tr>
    <tr>
      <td style="vertical-align: middle;">
        <div style="width: 160px;">
          <div><strong>Aggregated State Memory Used In Bytes {SparkUIUtils.tooltip("Aggregated state memory used in bytes.", "right")}</strong></div>
        </div>
      </td>
      <td class={"aggregated-memory-used-bytes-timeline"}>{graphUIDataForMemoryUsedBytes.generateTimelineHtml(jsCollector)}</td>
      <td class={"aggregated-memory-used-bytes-histogram"}>{graphUIDataForMemoryUsedBytes.generateHistogramHtml(jsCollector)}</td>
    </tr>
    <tr>
      <td style="vertical-align: middle;">
        <div style="width: 160px;">
          <div><strong>Aggregated Number Of State Rows Dropped By Watermark {SparkUIUtils.tooltip("Aggregated number of state rows dropped by watermark.", "right")}</strong></div>
        </div>
      </td>
      <td class={"aggregated-num-rows-dropped-by-watermark-timeline"}>{graphUIDataForNumRowsDroppedByWatermark.generateTimelineHtml(jsCollector)}</td>
      <td class={"aggregated-num-rows-dropped-by-watermark-histogram"}>{graphUIDataForNumRowsDroppedByWatermark.generateHistogramHtml(jsCollector)}</td>
    </tr>
    // scalastyle:on
  } else {
    new NodeBuffer()
  }
}

def generateStatTable(query: StreamingQueryUIData): Seq[Node] = {
val batchToTimestamps = withNoProgress(query,
query.recentProgress.map(p => (p.batchId, parseProgressTimestamp(p.timestamp))),
Expand Down Expand Up @@ -284,6 +401,7 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
</td>
<td class="duration-area-stack" colspan="2">{graphUIDataForDuration.generateAreaStackHtmlWithData(jsCollector, operationDurationData)}</td>
</tr>
{generateAggregatedStateOperators(query, minBatchTime, maxBatchTime, jsCollector)}
</tbody>
</table>
} else {
Expand Down