Skip to content

Commit 4af142f

Browse files
zsxwing authored and tdas committed
[SPARK-18722][SS] Move no data rate limit from StreamExecution to ProgressReporter
## What changes were proposed in this pull request? Move no data rate limit from StreamExecution to ProgressReporter to make `recentProgresses` and listener events consistent. ## How was this patch tested? Jenkins Author: Shixiong Zhu <[email protected]> Closes #16155 from zsxwing/SPARK-18722.
1 parent 508de38 commit 4af142f

File tree

3 files changed

+33
-24
lines changed

3 files changed

+33
-24
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
2727
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
2828
import org.apache.spark.sql.execution.QueryExecution
2929
import org.apache.spark.sql.streaming._
30+
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
3031
import org.apache.spark.util.Clock
3132

3233
/**
@@ -56,6 +57,7 @@ trait ProgressReporter extends Logging {
5657
protected def offsetSeqMetadata: OffsetSeqMetadata
5758
protected def currentBatchId: Long
5859
protected def sparkSession: SparkSession
60+
protected def postEvent(event: StreamingQueryListener.Event): Unit
5961

6062
// Local timestamps and counters.
6163
private var currentTriggerStartTimestamp = -1L
@@ -70,6 +72,12 @@ trait ProgressReporter extends Logging {
7072
/** Holds the most recent query progress updates. Accesses must lock on the queue itself. */
7173
private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
7274

75+
private val noDataProgressEventInterval =
76+
sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
77+
78+
// The timestamp we report an event that has no input data
79+
private var lastNoDataProgressEventTime = Long.MinValue
80+
7381
@volatile
7482
protected var currentStatus: StreamingQueryStatus = {
7583
new StreamingQueryStatus(
@@ -100,6 +108,17 @@ trait ProgressReporter extends Logging {
100108
currentDurationsMs.clear()
101109
}
102110

111+
private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
112+
progressBuffer.synchronized {
113+
progressBuffer += newProgress
114+
while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) {
115+
progressBuffer.dequeue()
116+
}
117+
}
118+
postEvent(new QueryProgressEvent(newProgress))
119+
logInfo(s"Streaming query made progress: $newProgress")
120+
}
121+
103122
/** Finalizes the query progress and adds it to list of recent status updates. */
104123
protected def finishTrigger(hasNewData: Boolean): Unit = {
105124
currentTriggerEndTimestamp = triggerClock.getTimeMillis()
@@ -145,14 +164,18 @@ trait ProgressReporter extends Logging {
145164
sources = sourceProgress.toArray,
146165
sink = sinkProgress)
147166

148-
progressBuffer.synchronized {
149-
progressBuffer += newProgress
150-
while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) {
151-
progressBuffer.dequeue()
167+
if (hasNewData) {
168+
// Reset noDataEventTimestamp if we processed any data
169+
lastNoDataProgressEventTime = Long.MinValue
170+
updateProgress(newProgress)
171+
} else {
172+
val now = triggerClock.getTimeMillis()
173+
if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
174+
lastNoDataProgressEventTime = now
175+
updateProgress(newProgress)
152176
}
153177
}
154178

155-
logInfo(s"Streaming query made progress: $newProgress")
156179
currentStatus = currentStatus.copy(isTriggerActive = false)
157180
}
158181

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,6 @@ class StreamExecution(
5858

5959
private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
6060

61-
private val noDataProgressEventInterval =
62-
sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
63-
6461
/**
6562
* A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
6663
*/
@@ -217,9 +214,6 @@ class StreamExecution(
217214
// While active, repeatedly attempt to run batches.
218215
SparkSession.setActiveSession(sparkSession)
219216

220-
// The timestamp we report an event that has no input data
221-
var lastNoDataProgressEventTime = Long.MinValue
222-
223217
triggerExecutor.execute(() => {
224218
startTrigger()
225219

@@ -242,18 +236,6 @@ class StreamExecution(
242236

243237
// Report trigger as finished and construct progress object.
244238
finishTrigger(dataAvailable)
245-
if (dataAvailable) {
246-
// Reset noDataEventTimestamp if we processed any data
247-
lastNoDataProgressEventTime = Long.MinValue
248-
postEvent(new QueryProgressEvent(lastProgress))
249-
} else {
250-
val now = triggerClock.getTimeMillis()
251-
if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
252-
lastNoDataProgressEventTime = now
253-
postEvent(new QueryProgressEvent(lastProgress))
254-
}
255-
}
256-
257239
if (dataAvailable) {
258240
// We'll increase currentBatchId after we complete processing current batch's data
259241
currentBatchId += 1
@@ -504,7 +486,7 @@ class StreamExecution(
504486
}
505487
}
506488

507-
private def postEvent(event: StreamingQueryListener.Event) {
489+
override protected def postEvent(event: StreamingQueryListener.Event): Unit = {
508490
sparkSession.streams.postListenerEvent(event)
509491
}
510492

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,10 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
237237
}
238238
true
239239
}
240+
// `recentProgresses` should not receive too many no data events
241+
actions += AssertOnQuery { q =>
242+
q.recentProgresses.size > 1 && q.recentProgresses.size <= 11
243+
}
240244
testStream(input.toDS)(actions: _*)
241245
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
242246
// 11 is the max value of the possible numbers of events.

0 commit comments

Comments (0)