Skip to content

Commit ca0955b

Browse files
committed
Combine unit tests
1 parent 79b4fed commit ca0955b

2 files changed

Lines changed: 71 additions & 118 deletions

File tree

streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala

Lines changed: 39 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
4646
val collector = new BatchInfoCollector
4747
ssc.addStreamingListener(collector)
4848
runStreams(ssc, input.size, input.size)
49+
ssc.awaitTerminationOrTimeout(5000) should be (true)
50+
4951
val batchInfos = collector.batchInfos
5052
batchInfos should have size 4
5153

@@ -61,6 +63,32 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
6163
isInIncreasingOrder(batchInfos.map(_.submissionTime)) should be (true)
6264
isInIncreasingOrder(batchInfos.map(_.processingStartTime.get)) should be (true)
6365
isInIncreasingOrder(batchInfos.map(_.processingEndTime.get)) should be (true)
66+
67+
// SPARK-6766: batch info should be submitted
68+
val batchInfosSubmitted = collector.batchInfosSubmitted
69+
batchInfosSubmitted should have size 4
70+
71+
batchInfosSubmitted.foreach(info => {
72+
info.schedulingDelay should be (None)
73+
info.processingDelay should be (None)
74+
info.totalDelay should be (None)
75+
})
76+
77+
isInIncreasingOrder(batchInfosSubmitted.map(_.submissionTime)) should be (true)
78+
79+
// SPARK-6766: processingStartTime of batch info should not be None when starting
80+
val batchInfosStarted = collector.batchInfosStarted
81+
batchInfosStarted should have size 4
82+
83+
batchInfosStarted.foreach(info => {
84+
info.schedulingDelay should not be None
85+
info.schedulingDelay.get should be >= 0L
86+
info.processingDelay should be (None)
87+
info.totalDelay should be (None)
88+
})
89+
90+
isInIncreasingOrder(batchInfosStarted.map(_.submissionTime)) should be (true)
91+
isInIncreasingOrder(batchInfosStarted.map(_.processingStartTime.get)) should be (true)
6492
}
6593

6694
test("receiver info reporting") {
@@ -88,58 +116,6 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
88116
}
89117
}
90118

91-
test("SPARK-6766: batch info should be submitted") {
92-
val ssc = setupStreams(input, operation)
93-
val collector = new StreamingListener {
94-
val batchInfos = new ArrayBuffer[BatchInfo]
95-
96-
override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) {
97-
batchInfos += batchSubmitted.batchInfo
98-
}
99-
}
100-
ssc.addStreamingListener(collector)
101-
runStreams(ssc, input.size, input.size)
102-
ssc.awaitTerminationOrTimeout(5000) should be (true)
103-
104-
val batchInfos = collector.batchInfos
105-
batchInfos should have size 4
106-
107-
batchInfos.foreach(info => {
108-
info.schedulingDelay should be (None)
109-
info.processingDelay should be (None)
110-
info.totalDelay should be (None)
111-
})
112-
113-
isInIncreasingOrder(batchInfos.map(_.submissionTime)) should be (true)
114-
}
115-
116-
test("SPARK-6766: processingStartTime of batch info should not be None when starting") {
117-
val ssc = setupStreams(input, operation)
118-
val collector = new StreamingListener {
119-
val batchInfos = new ArrayBuffer[BatchInfo]
120-
121-
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) {
122-
batchInfos += batchStarted.batchInfo
123-
}
124-
}
125-
ssc.addStreamingListener(collector)
126-
runStreams(ssc, input.size, input.size)
127-
ssc.awaitTerminationOrTimeout(5000) should be (true)
128-
129-
val batchInfos = collector.batchInfos
130-
batchInfos should have size 4
131-
132-
batchInfos.foreach(info => {
133-
info.schedulingDelay should not be None
134-
info.schedulingDelay.get should be >= 0L
135-
info.processingDelay should be (None)
136-
info.totalDelay should be (None)
137-
})
138-
139-
isInIncreasingOrder(batchInfos.map(_.submissionTime)) should be (true)
140-
isInIncreasingOrder(batchInfos.map(_.processingStartTime.get)) should be (true)
141-
}
142-
143119
/** Check if a sequence of numbers is in increasing order */
144120
def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
145121
for(i <- 1 until seq.size) {
@@ -152,6 +128,17 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
152128
/** Listener that collects information on processed batches */
153129
class BatchInfoCollector extends StreamingListener {
154130
val batchInfos = new ArrayBuffer[BatchInfo]
131+
val batchInfosStarted = new ArrayBuffer[BatchInfo]
132+
val batchInfosSubmitted = new ArrayBuffer[BatchInfo]
133+
134+
override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) {
135+
batchInfosSubmitted += batchSubmitted.batchInfo
136+
}
137+
138+
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) {
139+
batchInfosStarted += batchStarted.batchInfo
140+
}
141+
155142
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
156143
batchInfos += batchCompleted.batchInfo
157144
}

streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala

Lines changed: 32 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -30,62 +30,62 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
3030

3131
override def batchDuration = Milliseconds(100)
3232

33-
test("onBatchSubmitted") {
33+
test("onBatchSubmitted, onBatchStarted, onBatchCompleted, " +
34+
"onReceiverStarted, onReceiverError, onReceiverStopped") {
3435
val ssc = setupStreams(input, operation)
3536
val listener = new StreamingJobProgressListener(ssc)
3637

37-
val batchInfoSubmitted = BatchInfo(Time(1000), Map(), 1000, None, None)
38-
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
39-
40-
listener.waitingBatches should be(List(batchInfoSubmitted))
41-
}
42-
43-
test("onBatchStarted") {
44-
val ssc = setupStreams(input, operation)
45-
val listener = new StreamingJobProgressListener(ssc)
46-
47-
val batchInfoSubmitted = BatchInfo(Time(1000), Map(), 1000, None, None)
48-
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
49-
5038
val receivedBlockInfo = Map(
5139
0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)),
5240
1 -> Array(ReceivedBlockInfo(1, 300, null))
5341
)
54-
val batchInfoStarted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
55-
listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
5642

57-
listener.runningBatches should be(List(batchInfoStarted))
58-
listener.waitingBatches should be(Nil)
59-
listener.numTotalReceivedRecords should be(600)
60-
}
61-
62-
test("onBatchCompleted") {
63-
val ssc = setupStreams(input, operation)
64-
val listener = new StreamingJobProgressListener(ssc)
65-
66-
val batchInfoSubmitted = BatchInfo(Time(1000), Map(), 1000, None, None)
43+
// onBatchSubmitted
44+
val batchInfoSubmitted = BatchInfo(Time(1000), receivedBlockInfo, 1000, None, None)
6745
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
46+
listener.waitingBatches should be(List(batchInfoSubmitted))
6847

69-
val receivedBlockInfo = Map(
70-
0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)),
71-
1 -> Array(ReceivedBlockInfo(1, 300, null))
72-
)
48+
// onBatchStarted
7349
val batchInfoStarted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
7450
listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
51+
listener.runningBatches should be(List(batchInfoStarted))
52+
listener.waitingBatches should be(Nil)
53+
listener.numTotalReceivedRecords should be(600)
7554

55+
// onBatchCompleted
7656
val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
7757
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
78-
7958
listener.runningBatches should be (Nil)
8059
listener.waitingBatches should be (Nil)
8160
listener.lastCompletedBatch should be (Some(batchInfoCompleted))
8261
listener.retainedCompletedBatches should be (List(batchInfoCompleted))
8362
listener.numTotalCompletedBatches should be (1)
8463
listener.numTotalProcessedRecords should be (600)
8564
listener.numTotalReceivedRecords should be (600)
65+
66+
// onReceiverStarted
67+
val receiverInfoStarted = ReceiverInfo(0, "test", null, true, "localhost")
68+
listener.onReceiverStarted(StreamingListenerReceiverStarted(receiverInfoStarted))
69+
listener.receiverInfo(0) should be (Some(receiverInfoStarted))
70+
listener.receiverInfo(1) should be (None)
71+
72+
// onReceiverError
73+
val receiverInfoError = ReceiverInfo(1, "test", null, true, "localhost")
74+
listener.onReceiverError(StreamingListenerReceiverError(receiverInfoError))
75+
listener.receiverInfo(0) should be (Some(receiverInfoStarted))
76+
listener.receiverInfo(1) should be (Some(receiverInfoError))
77+
listener.receiverInfo(2) should be (None)
78+
79+
// onReceiverStopped
80+
val receiverInfoStopped = ReceiverInfo(2, "test", null, true, "localhost")
81+
listener.onReceiverStopped(StreamingListenerReceiverStopped(receiverInfoStopped))
82+
listener.receiverInfo(0) should be (Some(receiverInfoStarted))
83+
listener.receiverInfo(1) should be (Some(receiverInfoError))
84+
listener.receiverInfo(2) should be (Some(receiverInfoStopped))
85+
listener.receiverInfo(3) should be (None)
8686
}
8787

88-
test("retain completed batch") {
88+
test("Remove the old completed batches when exceeding the limit") {
8989
val ssc = setupStreams(input, operation)
9090
val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
9191
val listener = new StreamingJobProgressListener(ssc)
@@ -103,38 +103,4 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
103103
listener.retainedCompletedBatches.size should be (limit)
104104
listener.numTotalCompletedBatches should be(limit + 10)
105105
}
106-
107-
test("onReceiverStarted") {
108-
val ssc = setupStreams(input, operation)
109-
val listener = new StreamingJobProgressListener(ssc)
110-
111-
val receiverInfo = ReceiverInfo(0, "test", null, true, "localhost")
112-
listener.onReceiverStarted(StreamingListenerReceiverStarted(receiverInfo))
113-
114-
listener.receiverInfo(0) should be (Some(receiverInfo))
115-
listener.receiverInfo(1) should be (None)
116-
}
117-
118-
test("onReceiverError") {
119-
val ssc = setupStreams(input, operation)
120-
val listener = new StreamingJobProgressListener(ssc)
121-
122-
val receiverInfo = ReceiverInfo(0, "test", null, true, "localhost")
123-
listener.onReceiverError(StreamingListenerReceiverError(receiverInfo))
124-
125-
listener.receiverInfo(0) should be (Some(receiverInfo))
126-
listener.receiverInfo(1) should be (None)
127-
}
128-
129-
test("onReceiverStopped") {
130-
val ssc = setupStreams(input, operation)
131-
val listener = new StreamingJobProgressListener(ssc)
132-
133-
val receiverInfo = ReceiverInfo(0, "test", null, true, "localhost")
134-
listener.onReceiverStopped(StreamingListenerReceiverStopped(receiverInfo))
135-
136-
listener.receiverInfo(0) should be (Some(receiverInfo))
137-
listener.receiverInfo(1) should be (None)
138-
}
139-
140106
}

0 commit comments

Comments
 (0)