-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-6766][Streaming] Fix issue about StreamingListenerBatchSubmitted and StreamingListenerBatchStarted #5414
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
493f978
74aed99
fc3a2a1
79b4fed
ca0955b
2f85060
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,6 +46,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { | |
| val collector = new BatchInfoCollector | ||
| ssc.addStreamingListener(collector) | ||
| runStreams(ssc, input.size, input.size) | ||
| ssc.awaitTerminationOrTimeout(5000) should be (true) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why this line? Run streams should have already stopped the ssc, isnt it?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry that I misread |
||
|
|
||
| val batchInfos = collector.batchInfos | ||
| batchInfos should have size 4 | ||
|
|
||
|
|
@@ -61,6 +63,32 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { | |
| isInIncreasingOrder(batchInfos.map(_.submissionTime)) should be (true) | ||
| isInIncreasingOrder(batchInfos.map(_.processingStartTime.get)) should be (true) | ||
| isInIncreasingOrder(batchInfos.map(_.processingEndTime.get)) should be (true) | ||
|
|
||
| // SPARK-6766: batch info should be submitted | ||
| val batchInfosSubmitted = collector.batchInfosSubmitted | ||
| batchInfosSubmitted should have size 4 | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: these tests for submitted and started should come before completed :) |
||
|
|
||
| batchInfosSubmitted.foreach(info => { | ||
| info.schedulingDelay should be (None) | ||
| info.processingDelay should be (None) | ||
| info.totalDelay should be (None) | ||
| }) | ||
|
|
||
| isInIncreasingOrder(batchInfosSubmitted.map(_.submissionTime)) should be (true) | ||
|
|
||
| // SPARK-6766: processingStartTime of batch info should not be None when starting | ||
| val batchInfosStarted = collector.batchInfosStarted | ||
| batchInfosStarted should have size 4 | ||
|
|
||
| batchInfosStarted.foreach(info => { | ||
| info.schedulingDelay should not be None | ||
| info.schedulingDelay.get should be >= 0L | ||
| info.processingDelay should be (None) | ||
| info.totalDelay should be (None) | ||
| }) | ||
|
|
||
| isInIncreasingOrder(batchInfosStarted.map(_.submissionTime)) should be (true) | ||
| isInIncreasingOrder(batchInfosStarted.map(_.processingStartTime.get)) should be (true) | ||
| } | ||
|
|
||
| test("receiver info reporting") { | ||
|
|
@@ -100,6 +128,17 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { | |
| /** Listener that collects information on processed batches */ | ||
| class BatchInfoCollector extends StreamingListener { | ||
| val batchInfos = new ArrayBuffer[BatchInfo] | ||
| val batchInfosStarted = new ArrayBuffer[BatchInfo] | ||
| val batchInfosSubmitted = new ArrayBuffer[BatchInfo] | ||
|
|
||
| override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { | ||
| batchInfosSubmitted += batchSubmitted.batchInfo | ||
| } | ||
|
|
||
| override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { | ||
| batchInfosStarted += batchStarted.batchInfo | ||
| } | ||
|
|
||
| override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { | ||
| batchInfos += batchCompleted.batchInfo | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.streaming.ui | ||
|
|
||
| import org.scalatest.Matchers | ||
|
|
||
| import org.apache.spark.streaming.dstream.DStream | ||
| import org.apache.spark.streaming.scheduler._ | ||
| import org.apache.spark.streaming.{Time, Milliseconds, TestSuiteBase} | ||
|
|
||
| class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { | ||
|
|
||
| val input = (1 to 4).map(Seq(_)).toSeq | ||
| val operation = (d: DStream[Int]) => d.map(x => x) | ||
|
|
||
| override def batchDuration = Milliseconds(100) | ||
|
|
||
| test("onBatchSubmitted, onBatchStarted, onBatchCompleted, " + | ||
| "onReceiverStarted, onReceiverError, onReceiverStopped") { | ||
| val ssc = setupStreams(input, operation) | ||
| val listener = new StreamingJobProgressListener(ssc) | ||
|
|
||
| val receivedBlockInfo = Map( | ||
| 0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)), | ||
| 1 -> Array(ReceivedBlockInfo(1, 300, null)) | ||
| ) | ||
|
|
||
| // onBatchSubmitted | ||
| val batchInfoSubmitted = BatchInfo(Time(1000), receivedBlockInfo, 1000, None, None) | ||
| listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted)) | ||
| listener.waitingBatches should be(List(batchInfoSubmitted)) | ||
|
|
||
| // onBatchStarted | ||
| val batchInfoStarted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None) | ||
| listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted)) | ||
| listener.runningBatches should be(List(batchInfoStarted)) | ||
| listener.waitingBatches should be(Nil) | ||
| listener.numTotalReceivedRecords should be(600) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this test be made after submitted as well? Also you should also test whether the other total counts are zero or not. |
||
|
|
||
| // onBatchCompleted | ||
| val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None) | ||
| listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted)) | ||
| listener.runningBatches should be (Nil) | ||
| listener.waitingBatches should be (Nil) | ||
| listener.lastCompletedBatch should be (Some(batchInfoCompleted)) | ||
| listener.retainedCompletedBatches should be (List(batchInfoCompleted)) | ||
| listener.numTotalCompletedBatches should be (1) | ||
| listener.numTotalProcessedRecords should be (600) | ||
| listener.numTotalReceivedRecords should be (600) | ||
|
|
||
| // onReceiverStarted | ||
| val receiverInfoStarted = ReceiverInfo(0, "test", null, true, "localhost") | ||
| listener.onReceiverStarted(StreamingListenerReceiverStarted(receiverInfoStarted)) | ||
| listener.receiverInfo(0) should be (Some(receiverInfoStarted)) | ||
| listener.receiverInfo(1) should be (None) | ||
|
|
||
| // onReceiverError | ||
| val receiverInfoError = ReceiverInfo(1, "test", null, true, "localhost") | ||
| listener.onReceiverError(StreamingListenerReceiverError(receiverInfoError)) | ||
| listener.receiverInfo(0) should be (Some(receiverInfoStarted)) | ||
| listener.receiverInfo(1) should be (Some(receiverInfoError)) | ||
| listener.receiverInfo(2) should be (None) | ||
|
|
||
| // onReceiverStopped | ||
| val receiverInfoStopped = ReceiverInfo(2, "test", null, true, "localhost") | ||
| listener.onReceiverStopped(StreamingListenerReceiverStopped(receiverInfoStopped)) | ||
| listener.receiverInfo(0) should be (Some(receiverInfoStarted)) | ||
| listener.receiverInfo(1) should be (Some(receiverInfoError)) | ||
| listener.receiverInfo(2) should be (Some(receiverInfoStopped)) | ||
| listener.receiverInfo(3) should be (None) | ||
| } | ||
|
|
||
| test("Remove the old completed batches when exceeding the limit") { | ||
| val ssc = setupStreams(input, operation) | ||
| val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100) | ||
| val listener = new StreamingJobProgressListener(ssc) | ||
|
|
||
| val receivedBlockInfo = Map( | ||
| 0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)), | ||
| 1 -> Array(ReceivedBlockInfo(1, 300, null)) | ||
| ) | ||
| val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None) | ||
|
|
||
| for(_ <- 0 until (limit + 10)) { | ||
| listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted)) | ||
| } | ||
|
|
||
| listener.retainedCompletedBatches.size should be (limit) | ||
| listener.numTotalCompletedBatches should be(limit + 10) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. OMG!