Skip to content

Commit 3425231

Browse files
committed
Address TD's comments
1 parent 4ace2bd commit 3425231

3 files changed

Lines changed: 14 additions & 10 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class StreamExecution(
6363

6464
private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
6565

66-
private val noDataReportInterval = sparkSession.sessionState.conf.streamingNoDataReportInterval
66+
private val noDataEventInterval = sparkSession.sessionState.conf.streamingNoDataEventInterval
6767

6868
/**
6969
* A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
@@ -199,7 +199,7 @@ class StreamExecution(
199199
SparkSession.setActiveSession(sparkSession)
200200

201201
// The timestamp we report an event that has no input data
202-
var noDataReportTimestamp = Long.MinValue
202+
var noDataEventTimestamp = Long.MinValue
203203

204204
triggerExecutor.execute(() => {
205205
startTrigger()
@@ -224,11 +224,13 @@ class StreamExecution(
224224
// Report trigger as finished and construct progress object.
225225
finishTrigger(dataAvailable)
226226
if (dataAvailable) {
227+
// Reset noDataEventTimestamp if we processed any data
228+
noDataEventTimestamp = Long.MinValue
227229
postEvent(new QueryProgressEvent(lastProgress))
228230
} else {
229231
val now = triggerClock.getTimeMillis()
230-
if (now - noDataReportInterval >= noDataReportTimestamp) {
231-
noDataReportTimestamp = now
232+
if (now - noDataEventInterval >= noDataEventTimestamp) {
233+
noDataEventTimestamp = now
232234
postEvent(new QueryProgressEvent(lastProgress))
233235
}
234236
}

sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -603,8 +603,8 @@ object SQLConf {
603603
.timeConf(TimeUnit.MILLISECONDS)
604604
.createWithDefault(10L)
605605

606-
val STREAMING_NO_DATA_REPORT_INTERVAL =
607-
SQLConfigBuilder("spark.sql.streaming.noDataReportInterval")
606+
val STREAMING_NO_DATA_EVENT_INTERVAL =
607+
SQLConfigBuilder("spark.sql.streaming.noDataEventInterval")
608608
.internal()
609609
.doc("How long to wait between two progress events when there is no data")
610610
.timeConf(TimeUnit.MILLISECONDS)
@@ -691,7 +691,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
691691

692692
def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)
693693

694-
def streamingNoDataReportInterval: Long = getConf(STREAMING_NO_DATA_REPORT_INTERVAL)
694+
def streamingNoDataEventInterval: Long = getConf(STREAMING_NO_DATA_EVENT_INTERVAL)
695695

696696
def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED)
697697

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
192192
assert(queryQueryTerminated.exception === newQueryTerminated.exception)
193193
}
194194

195-
test("noDataReportInterval") {
196-
withSQLConf(SQLConf.STREAMING_NO_DATA_REPORT_INTERVAL.key -> "100ms") {
195+
test("only one progress event per interval when no data") {
196+
// This test will start a query but not push any data, and then check if we push too many events
197+
withSQLConf(SQLConf.STREAMING_NO_DATA_EVENT_INTERVAL.key -> "100ms") {
197198
@volatile var progressEventCount = 0
198199

199200
val listener = new StreamingQueryListener {
@@ -220,13 +221,14 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
220221
for (_ <- 1 to 100) {
221222
actions += AdvanceManualClock(10)
222223
actions += AssertOnQuery { _ =>
223-
// Sleep so that if the config `noDataReportInterval` doesn't work, it has enough time
224+
// Sleep so that if the config `noDataEventInterval` doesn't work, it has enough time
224225
// to report too many events.
225226
Thread.sleep(10)
226227
true
227228
}
228229
}
229230
testStream(MemoryStream[Int].toDS)(actions: _*)
231+
// 11 is the max value of the possible numbers of events.
230232
assert(progressEventCount <= 11)
231233
} finally {
232234
spark.streams.removeListener(listener)

0 commit comments

Comments
 (0)