Skip to content

Commit e6bef7d

Browse files
keypointt authored and zsxwing committed
[SPARK-17038][STREAMING] fix metrics retrieval source of 'lastReceivedBatch'
https://issues.apache.org/jira/browse/SPARK-17038

## What changes were proposed in this pull request?

StreamingSource's lastReceivedBatch_submissionTime, lastReceivedBatch_processingStartTime, and lastReceivedBatch_processingEndTime all use data from lastCompletedBatch instead of lastReceivedBatch. In particular, this makes it impossible to match lastReceivedBatch_records with a batchID/submission time. This is apparent when looking at StreamingSource.scala, lines 89-94.

## How was this patch tested?

Manually running unit tests on local laptop

Author: Xin Ren <iamshrek@126.com>

Closes #14681 from keypointt/SPARK-17038.
1 parent d60af8f commit e6bef7d

File tree

2 files changed

+6
-3
lines changed

2 files changed

+6
-3
lines changed

streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -87,11 +87,11 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
8787
// Gauge for last received batch, useful for monitoring the streaming job's running status,
8888
// displayed data -1 for any abnormal condition.
8989
registerGaugeWithOption("lastReceivedBatch_submissionTime",
90-
_.lastCompletedBatch.map(_.submissionTime), -1L)
90+
_.lastReceivedBatch.map(_.submissionTime), -1L)
9191
registerGaugeWithOption("lastReceivedBatch_processingStartTime",
92-
_.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
92+
_.lastReceivedBatch.flatMap(_.processingStartTime), -1L)
9393
registerGaugeWithOption("lastReceivedBatch_processingEndTime",
94-
_.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
94+
_.lastReceivedBatch.flatMap(_.processingEndTime), -1L)
9595

9696
// Gauge for last received batch records.
9797
registerGauge("lastReceivedBatch_records", _.lastReceivedBatchRecords.values.sum, 0L)

streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -68,6 +68,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
6868
listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted)))
6969
listener.runningBatches should be (Nil)
7070
listener.retainedCompletedBatches should be (Nil)
71+
listener.lastReceivedBatch should be (Some(BatchUIData(batchInfoSubmitted)))
7172
listener.lastCompletedBatch should be (None)
7273
listener.numUnprocessedBatches should be (1)
7374
listener.numTotalCompletedBatches should be (0)
@@ -81,6 +82,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
8182
listener.waitingBatches should be (Nil)
8283
listener.runningBatches should be (List(BatchUIData(batchInfoStarted)))
8384
listener.retainedCompletedBatches should be (Nil)
85+
listener.lastReceivedBatch should be (Some(BatchUIData(batchInfoStarted)))
8486
listener.lastCompletedBatch should be (None)
8587
listener.numUnprocessedBatches should be (1)
8688
listener.numTotalCompletedBatches should be (0)
@@ -123,6 +125,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
123125
listener.waitingBatches should be (Nil)
124126
listener.runningBatches should be (Nil)
125127
listener.retainedCompletedBatches should be (List(BatchUIData(batchInfoCompleted)))
128+
listener.lastReceivedBatch should be (Some(BatchUIData(batchInfoCompleted)))
126129
listener.lastCompletedBatch should be (Some(BatchUIData(batchInfoCompleted)))
127130
listener.numUnprocessedBatches should be (0)
128131
listener.numTotalCompletedBatches should be (1)

0 commit comments

Comments
 (0)