
Commit 11c0241

Revert "[SPARK-31953][SS] Add Spark Structured Streaming History Server Support"
This reverts commit 4f96670.
1 parent e88f0d4 commit 11c0241

17 files changed: +158 -693 lines changed

dev/.rat-excludes

Lines changed: 0 additions & 1 deletion
@@ -123,7 +123,6 @@ SessionHandler.java
 GangliaReporter.java
 application_1578436911597_0052
 config.properties
-local-1596020211915
 app-20200706201101-0003
 py.typed
 _metadata
Lines changed: 0 additions & 1 deletion

@@ -1,2 +1 @@
 org.apache.spark.sql.execution.ui.SQLHistoryServerPlugin
-org.apache.spark.sql.execution.ui.StreamingQueryHistoryServerPlugin
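The deleted line above unregisters the plugin from a META-INF/services registration file (the file's path is not shown in this view). Java's ServiceLoader mechanism, which the History Server relies on for plugin discovery, instantiates every class listed in such a file, one fully-qualified name per line. A minimal, self-contained sketch of that discovery pattern, with a hypothetical HistoryPlugin trait standing in for Spark's real org.apache.spark.status.AppHistoryServerPlugin:

import java.util.ServiceLoader

// Hypothetical interface; implementations are listed in
// META-INF/services/<interface-FQN> on the classpath.
trait HistoryPlugin {
  def setup(): Unit
}

object PluginLoader {
  // ServiceLoader reads the services file and instantiates each
  // implementation named there -- the file format edited in the hunk above.
  def loadAll(): Seq[HistoryPlugin] = {
    val loaded = ServiceLoader.load(classOf[HistoryPlugin]).iterator()
    val buf = scala.collection.mutable.Buffer.empty[HistoryPlugin]
    while (loaded.hasNext) buf += loaded.next()
    buf.toSeq
  }
}

Removing the line is therefore enough to make the History Server stop loading the streaming plugin; no other code has to reference it by name.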

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala

Lines changed: 7 additions & 19 deletions
@@ -31,21 +31,16 @@ import org.apache.spark.util.ListenerBus
  * Spark listener bus, so that it can receive [[StreamingQueryListener.Event]]s and dispatch them
  * to StreamingQueryListeners.
  *
- * Note 1: Each bus and its registered listeners are associated with a single SparkSession
+ * Note that each bus and its registered listeners are associated with a single SparkSession
  * and StreamingQueryManager. So this bus will dispatch events to registered listeners for only
  * those queries that were started in the associated SparkSession.
- *
- * Note 2: To rebuild Structured Streaming UI in SHS, this bus will be registered into
- * [[org.apache.spark.scheduler.ReplayListenerBus]]. We check `sparkListenerBus` defined or not to
- * determine how to process [[StreamingQueryListener.Event]]. If false, it means this bus is used to
- * replay all streaming query event from eventLog.
  */
-class StreamingQueryListenerBus(sparkListenerBus: Option[LiveListenerBus])
+class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
   extends SparkListener with ListenerBus[StreamingQueryListener, StreamingQueryListener.Event] {
 
   import StreamingQueryListener._
 
-  sparkListenerBus.foreach(_.addToQueue(this, StreamingQueryListenerBus.STREAM_EVENT_QUERY))
+  sparkListenerBus.addToQueue(this, StreamingQueryListenerBus.STREAM_EVENT_QUERY)
 
   /**
    * RunIds of active queries whose events are supposed to be forwarded by this ListenerBus
@@ -72,11 +67,11 @@ class StreamingQueryListenerBus(sparkListenerBus: Option[LiveListenerBus])
     event match {
       case s: QueryStartedEvent =>
         activeQueryRunIds.synchronized { activeQueryRunIds += s.runId }
-        sparkListenerBus.foreach(bus => bus.post(s))
+        sparkListenerBus.post(s)
         // post to local listeners to trigger callbacks
         postToAll(s)
       case _ =>
-        sparkListenerBus.foreach(bus => bus.post(event))
+        sparkListenerBus.post(event)
     }
   }
 
@@ -100,11 +95,7 @@ class StreamingQueryListenerBus(sparkListenerBus: Option[LiveListenerBus])
       // synchronously and the ones attached to LiveListenerBus asynchronously. Therefore,
       // we need to ignore QueryStartedEvent if this method is called within SparkListenerBus
       // thread
-      //
-      // When loaded by Spark History Server, we should process all event coming from replay
-      // listener bus.
-      if (sparkListenerBus.isEmpty || !LiveListenerBus.withinListenerThread.value ||
-        !e.isInstanceOf[QueryStartedEvent]) {
+      if (!LiveListenerBus.withinListenerThread.value || !e.isInstanceOf[QueryStartedEvent]) {
         postToAll(e)
       }
     case _ =>
@@ -119,10 +110,7 @@ class StreamingQueryListenerBus(sparkListenerBus: Option[LiveListenerBus])
       listener: StreamingQueryListener,
       event: StreamingQueryListener.Event): Unit = {
     def shouldReport(runId: UUID): Boolean = {
-      // When loaded by Spark History Server, we should process all event coming from replay
-      // listener bus.
-      sparkListenerBus.isEmpty ||
-        activeQueryRunIds.synchronized { activeQueryRunIds.contains(runId) }
+      activeQueryRunIds.synchronized { activeQueryRunIds.contains(runId) }
     }
 
     event match {
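For context on what these hunks remove: with Option[LiveListenerBus], a None bus signaled replay mode, where events arrive from an event log rather than a live application, and every event is processed instead of only those of queries started locally. A self-contained toy model of that pattern, using stand-in types rather than Spark's classes:

import scala.collection.mutable

sealed trait Event
final case class QueryStarted(runId: String) extends Event
final case class QueryProgress(runId: String) extends Event

class LiveBus { def post(e: Event): Unit = println(s"live bus got $e") }

// The reverted shape: the upstream bus is optional, and None means
// "replaying an event log in the History Server".
class ReplayAwareBus(liveBus: Option[LiveBus]) {
  private val activeRunIds = mutable.Set.empty[String]

  def post(e: Event): Unit = e match {
    case s: QueryStarted =>
      activeRunIds.synchronized { activeRunIds += s.runId }
      liveBus.foreach(_.post(s))   // forward only when a live bus exists
    case other =>
      liveBus.foreach(_.post(other))
  }

  // In replay mode every runId is reported: the replayed log can mention
  // queries this process never started itself.
  def shouldReport(runId: String): Boolean =
    liveBus.isEmpty || activeRunIds.synchronized { activeRunIds.contains(runId) }
}

object ReplayAwareBusDemo extends App {
  val live = new ReplayAwareBus(Some(new LiveBus))   // normal application
  val replay = new ReplayAwareBus(None)              // History Server replay
  live.post(QueryStarted("run-1"))
  println(replay.shouldReport("unknown-run"))        // true: replay reports everything
  println(live.shouldReport("unknown-run"))          // false: not started here
}

The revert collapses this back to the unconditional form, so the bus always forwards to a required LiveListenerBus and only reports locally started queries.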

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala

Lines changed: 0 additions & 43 deletions
This file was deleted.

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala

Lines changed: 0 additions & 53 deletions
This file was deleted.
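The bodies of the two deleted files are not shown, but their role can be inferred from the call sites removed elsewhere in this commit: the plugin let the Spark History Server recreate StreamingQueryStatusListener over a replayed ElementTrackingStore and attach a StreamingQueryTab backed by the KVStore-reading StreamingQueryStatusStore. A hypothetical reconstruction of roughly what the 43-line plugin contained, inferred from Spark's AppHistoryServerPlugin interface and the constructor calls visible in the SharedState hunk below; treat every body line as an assumption, not the verbatim deleted code:

package org.apache.spark.sql.execution.ui

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, StreamingQueryTab}
import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore}
import org.apache.spark.ui.SparkUI

// Hypothetical reconstruction of the deleted plugin, not its verbatim contents.
class StreamingQueryHistoryServerPlugin extends AppHistoryServerPlugin {

  // Recreate the write-side listener when the History Server replays an
  // application's event log into its KV store.
  override def createListeners(
      conf: SparkConf,
      store: ElementTrackingStore): Seq[SparkListener] = {
    Seq(new StreamingQueryStatusListener(conf, store))
  }

  // Attach the Structured Streaming tab, reading from the replayed store
  // (assumes StreamingQueryStatusStore wraps the UI's underlying KVStore).
  override def setupUI(ui: SparkUI): Unit = {
    new StreamingQueryTab(new StreamingQueryStatusStore(ui.store.store), ui)
  }
}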

sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala

Lines changed: 4 additions & 4 deletions
@@ -34,7 +34,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.CacheManager
 import org.apache.spark.sql.execution.streaming.StreamExecution
-import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab, StreamingQueryStatusStore}
+import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab}
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, StreamingQueryTab}
 import org.apache.spark.status.ElementTrackingStore
@@ -111,9 +111,9 @@ private[sql] class SharedState(
   lazy val streamingQueryStatusListener: Option[StreamingQueryStatusListener] = {
     sparkContext.ui.flatMap { ui =>
       if (conf.get(STREAMING_UI_ENABLED)) {
-        val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
-        new StreamingQueryTab(new StreamingQueryStatusStore(kvStore), ui)
-        Some(new StreamingQueryStatusListener(conf, kvStore))
+        val statusListener = new StreamingQueryStatusListener(conf)
+        new StreamingQueryTab(statusListener, ui)
+        Some(statusListener)
       } else {
         None
       }
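This hunk captures the architectural revert in miniature: before, status flowed listener -> shared ElementTrackingStore -> store facade -> tab, so a History Server replay could rebuild the same store; after, the listener holds status in memory and the tab reads it directly. A toy model of the two wirings, with stand-in types rather than Spark's classes:

import scala.collection.mutable

final case class QueryStatus(runId: String, isActive: Boolean)

// Post-revert wiring: the listener owns in-memory state and the tab reads it.
class InMemoryStatusListener {
  private val statuses = mutable.Buffer.empty[QueryStatus]
  def onEvent(s: QueryStatus): Unit = statuses += s
  def allQueryStatus: Seq[QueryStatus] = statuses.toSeq
}

// Pre-revert wiring: the listener writes into a shared KV store and the tab
// reads through a store facade -- which still works when the store was
// rebuilt from an event log instead of a live application.
class ToyKVStore { val rows = mutable.Buffer.empty[QueryStatus] }
class StoreWritingListener(store: ToyKVStore) {
  def onEvent(s: QueryStatus): Unit = store.rows += s
}
class StatusStoreFacade(store: ToyKVStore) {
  def allQueryUIData: Seq[QueryStatus] = store.rows.toSeq
}

The in-memory form is simpler, but its state dies with the driver, which is exactly why the History Server support needed the store-backed form.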

sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala

Lines changed: 1 addition & 2 deletions
@@ -49,8 +49,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
 
   private[sql] val stateStoreCoordinator =
     StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
-  private val listenerBus =
-    new StreamingQueryListenerBus(Some(sparkSession.sparkContext.listenerBus))
+  private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus)
 
   @GuardedBy("activeQueriesSharedLock")
   private val activeQueries = new mutable.HashMap[UUID, StreamingQuery]

sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala

Lines changed: 22 additions & 22 deletions
@@ -40,8 +40,8 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
   }
 
   private def generateStreamingQueryTable(request: HttpServletRequest): Seq[Node] = {
-    val (activeQueries, inactiveQueries) =
-      parent.store.allQueryUIData.partition(_.summary.isActive)
+    val (activeQueries, inactiveQueries) = parent.statusListener.allQueryStatus
+      .partition(_.isActive)
 
     val content = mutable.ListBuffer[Node]()
     // show active queries table only if there is at least one active query
@@ -176,7 +176,7 @@ class StreamingQueryPagedTable(
     val streamingQuery = query.streamingUIData
     val statisticsLink = "%s/%s/statistics?id=%s"
       .format(SparkUIUtils.prependBaseUri(request, parent.basePath), parent.prefix,
-        streamingQuery.summary.runId)
+        streamingQuery.runId)
 
     def details(detail: Any): Seq[Node] = {
       if (isActive) {
@@ -194,14 +194,14 @@
     <tr>
       <td>{UIUtils.getQueryName(streamingQuery)}</td>
       <td>{UIUtils.getQueryStatus(streamingQuery)}</td>
-      <td>{streamingQuery.summary.id}</td>
-      <td><a href={statisticsLink}>{streamingQuery.summary.runId}</a></td>
-      <td>{SparkUIUtils.formatDate(streamingQuery.summary.startTimestamp)}</td>
+      <td>{streamingQuery.id}</td>
+      <td><a href={statisticsLink}>{streamingQuery.runId}</a></td>
+      <td>{SparkUIUtils.formatDate(streamingQuery.startTimestamp)}</td>
       <td>{SparkUIUtils.formatDurationVerbose(query.duration)}</td>
       <td>{withNoProgress(streamingQuery, {query.avgInput.formatted("%.2f")}, "NaN")}</td>
       <td>{withNoProgress(streamingQuery, {query.avgProcess.formatted("%.2f")}, "NaN")}</td>
       <td>{withNoProgress(streamingQuery, {streamingQuery.lastProgress.batchId}, "NaN")}</td>
-      {details(streamingQuery.summary.exception.getOrElse("-"))}
+      {details(streamingQuery.exception.getOrElse("-"))}
     </tr>
   }
 }
@@ -222,32 +222,32 @@ class StreamingQueryDataSource(uiData: Seq[StreamingQueryUIData], sortColumn: St
 
   override def sliceData(from: Int, to: Int): Seq[StructuredStreamingRow] = data.slice(from, to)
 
-  private def streamingRow(uiData: StreamingQueryUIData): StructuredStreamingRow = {
+  private def streamingRow(query: StreamingQueryUIData): StructuredStreamingRow = {
     val duration = if (isActive) {
-      System.currentTimeMillis() - uiData.summary.startTimestamp
+      System.currentTimeMillis() - query.startTimestamp
     } else {
-      withNoProgress(uiData, {
-        val endTimeMs = uiData.lastProgress.timestamp
-        parseProgressTimestamp(endTimeMs) - uiData.summary.startTimestamp
+      withNoProgress(query, {
+        val endTimeMs = query.lastProgress.timestamp
+        parseProgressTimestamp(endTimeMs) - query.startTimestamp
       }, 0)
     }
 
-    val avgInput = (uiData.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum /
-      uiData.recentProgress.length)
+    val avgInput = (query.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum /
+      query.recentProgress.length)
 
-    val avgProcess = (uiData.recentProgress.map(p =>
-      withNumberInvalid(p.processedRowsPerSecond)).sum / uiData.recentProgress.length)
+    val avgProcess = (query.recentProgress.map(p =>
+      withNumberInvalid(p.processedRowsPerSecond)).sum / query.recentProgress.length)
 
-    StructuredStreamingRow(duration, avgInput, avgProcess, uiData)
+    StructuredStreamingRow(duration, avgInput, avgProcess, query)
   }
 
   private def ordering(sortColumn: String, desc: Boolean): Ordering[StructuredStreamingRow] = {
     val ordering: Ordering[StructuredStreamingRow] = sortColumn match {
-      case "Name" => Ordering.by(row => UIUtils.getQueryName(row.streamingUIData))
-      case "Status" => Ordering.by(row => UIUtils.getQueryStatus(row.streamingUIData))
-      case "ID" => Ordering.by(_.streamingUIData.summary.id)
-      case "Run ID" => Ordering.by(_.streamingUIData.summary.runId)
-      case "Start Time" => Ordering.by(_.streamingUIData.summary.startTimestamp)
+      case "Name" => Ordering.by(q => UIUtils.getQueryName(q.streamingUIData))
+      case "Status" => Ordering.by(q => UIUtils.getQueryStatus(q.streamingUIData))
+      case "ID" => Ordering.by(_.streamingUIData.id)
+      case "Run ID" => Ordering.by(_.streamingUIData.runId)
+      case "Start Time" => Ordering.by(_.streamingUIData.startTimestamp)
       case "Duration" => Ordering.by(_.duration)
       case "Avg Input /sec" => Ordering.by(_.avgInput)
       case "Avg Process /sec" => Ordering.by(_.avgProcess)

sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala

Lines changed: 13 additions & 14 deletions
@@ -58,8 +58,8 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
     val parameterId = request.getParameter("id")
     require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
 
-    val query = parent.store.allQueryUIData.find { uiData =>
-      uiData.summary.runId.equals(UUID.fromString(parameterId))
+    val query = parent.statusListener.allQueryStatus.find { case q =>
+      q.runId.equals(UUID.fromString(parameterId))
     }.getOrElse(throw new IllegalArgumentException(s"Failed to find streaming query $parameterId"))
 
     val resources = generateLoadResources(request)
@@ -109,35 +109,34 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
     <script>{Unparsed(js)}</script>
   }
 
-  def generateBasicInfo(uiData: StreamingQueryUIData): Seq[Node] = {
-    val duration = if (uiData.summary.isActive) {
-      val durationMs = System.currentTimeMillis() - uiData.summary.startTimestamp
-      SparkUIUtils.formatDurationVerbose(durationMs)
+  def generateBasicInfo(query: StreamingQueryUIData): Seq[Node] = {
+    val duration = if (query.isActive) {
+      SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.startTimestamp)
     } else {
-      withNoProgress(uiData, {
-        val end = uiData.lastProgress.timestamp
-        val start = uiData.recentProgress.head.timestamp
+      withNoProgress(query, {
+        val end = query.lastProgress.timestamp
+        val start = query.recentProgress.head.timestamp
         SparkUIUtils.formatDurationVerbose(
           parseProgressTimestamp(end) - parseProgressTimestamp(start))
       }, "-")
     }
 
-    val name = UIUtils.getQueryName(uiData)
-    val numBatches = withNoProgress(uiData, { uiData.lastProgress.batchId + 1L }, 0)
+    val name = UIUtils.getQueryName(query)
+    val numBatches = withNoProgress(query, { query.lastProgress.batchId + 1L }, 0)
     <div>Running batches for
       <strong>
        {duration}
      </strong>
      since
      <strong>
-        {SparkUIUtils.formatDate(uiData.summary.startTimestamp)}
+        {SparkUIUtils.formatDate(query.startTimestamp)}
      </strong>
      (<strong>{numBatches}</strong> completed batches)
    </div>
    <br />
    <div><strong>Name: </strong>{name}</div>
-    <div><strong>Id: </strong>{uiData.summary.id}</div>
-    <div><strong>RunId: </strong>{uiData.summary.runId}</div>
+    <div><strong>Id: </strong>{query.id}</div>
+    <div><strong>RunId: </strong>{query.runId}</div>
    <br />
  }
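The duration shown for a finished query is the gap between its first and last progress timestamps. A small worked example of that arithmetic, assuming parseProgressTimestamp converts the ISO-8601 timestamp strings carried by streaming progress events into epoch milliseconds (an assumption consistent with its use above):

import java.time.Instant

// Assumed behavior of parseProgressTimestamp: ISO-8601 string -> epoch millis.
def parseProgressTimestamp(ts: String): Long = Instant.parse(ts).toEpochMilli

val start = "2020-07-06T20:11:01.000Z"   // first recentProgress entry
val end = "2020-07-06T20:12:31.000Z"     // lastProgress entry
println(parseProgressTimestamp(end) - parseProgressTimestamp(start))  // 90000 ms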
