@@ -32,6 +32,17 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
3232 val input = (1 to 4 ).map(Seq (_)).toSeq
3333 val operation = (d : DStream [Int ]) => d.map(x => x)
3434
35+ private def createJobStart (
36+ batchTime : Time , outputOpId : Int , jobId : Int ): SparkListenerJobStart = {
37+ val properties = new Properties ()
38+ properties.setProperty(JobScheduler .BATCH_TIME_PROPERTY_KEY , batchTime.milliseconds.toString)
39+ properties.setProperty(JobScheduler .OUTPUT_OP_ID_PROPERTY_KEY , outputOpId.toString)
40+ SparkListenerJobStart (jobId = jobId,
41+ 0L , // unused
42+ Nil , // unused
43+ properties)
44+ }
45+
3546 override def batchDuration : Duration = Milliseconds (100 )
3647
3748 test(" onBatchSubmitted, onBatchStarted, onBatchCompleted, " +
@@ -69,40 +80,16 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
6980 listener.numTotalReceivedRecords should be (600 )
7081
7182 // onJobStart
72- val properties1 = new Properties ()
73- properties1.setProperty(JobScheduler .BATCH_TIME_PROPERTY_KEY , Time (1000 ).milliseconds.toString)
74- properties1.setProperty(JobScheduler .OUTPUT_OP_ID_PROPERTY_KEY , 0 .toString)
75- val jobStart1 = SparkListenerJobStart (jobId = 0 ,
76- 0L , // unused
77- Nil , // unused
78- properties1)
83+ val jobStart1 = createJobStart(Time (1000 ), outputOpId = 0 , jobId = 0 )
7984 listener.onJobStart(jobStart1)
8085
81- val properties2 = new Properties ()
82- properties2.setProperty(JobScheduler .BATCH_TIME_PROPERTY_KEY , Time (1000 ).milliseconds.toString)
83- properties2.setProperty(JobScheduler .OUTPUT_OP_ID_PROPERTY_KEY , 0 .toString)
84- val jobStart2 = SparkListenerJobStart (jobId = 1 ,
85- 0L , // unused
86- Nil , // unused
87- properties2)
86+ val jobStart2 = createJobStart(Time (1000 ), outputOpId = 0 , jobId = 1 )
8887 listener.onJobStart(jobStart2)
8988
90- val properties3 = new Properties ()
91- properties3.setProperty(JobScheduler .BATCH_TIME_PROPERTY_KEY , Time (1000 ).milliseconds.toString)
92- properties3.setProperty(JobScheduler .OUTPUT_OP_ID_PROPERTY_KEY , 1 .toString)
93- val jobStart3 = SparkListenerJobStart (jobId = 0 ,
94- 0L , // unused
95- Nil , // unused
96- properties3)
89+ val jobStart3 = createJobStart(Time (1000 ), outputOpId = 1 , jobId = 0 )
9790 listener.onJobStart(jobStart3)
9891
99- val properties4 = new Properties ()
100- properties4.setProperty(JobScheduler .BATCH_TIME_PROPERTY_KEY , Time (1000 ).milliseconds.toString)
101- properties4.setProperty(JobScheduler .OUTPUT_OP_ID_PROPERTY_KEY , 1 .toString)
102- val jobStart4 = SparkListenerJobStart (jobId = 1 ,
103- 0L , // unused
104- Nil , // unused
105- properties4)
92+ val jobStart4 = createJobStart(Time (1000 ), outputOpId = 1 , jobId = 1 )
10693 listener.onJobStart(jobStart4)
10794
10895 val batchUIData = listener.getBatchUIData(Time (1000 ))
@@ -172,7 +159,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
172159 listener.numTotalCompletedBatches should be(limit + 10 )
173160 }
174161
175- test(" disorder onJobStart and onBatchXXX" ) {
162+ test(" out-of-order onJobStart and onBatchXXX" ) {
176163 val ssc = setupStreams(input, operation)
177164 val limit = ssc.conf.getInt(" spark.streaming.ui.retainedBatches" , 100 )
178165 val listener = new StreamingJobProgressListener (ssc)
@@ -181,25 +168,13 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
181168 for (i <- 0 until limit) {
182169 val batchInfoCompleted =
183170 BatchInfo (Time (1000 + i * 100 ), Map .empty, 1000 + i * 100 , Some (2000 + i * 100 ), None )
184- val properties = new Properties ()
185- properties.setProperty(JobScheduler .BATCH_TIME_PROPERTY_KEY , (1000 + i * 100 ).toString)
186- properties.setProperty(JobScheduler .OUTPUT_OP_ID_PROPERTY_KEY , " 0" )
187- val jobStart = SparkListenerJobStart (jobId = 1 ,
188- 0L , // unused
189- Nil , // unused
190- properties)
191171 listener.onBatchCompleted(StreamingListenerBatchCompleted (batchInfoCompleted))
172+ val jobStart = createJobStart(Time (1000 + i * 100 ), outputOpId = 0 , jobId = 1 )
192173 listener.onJobStart(jobStart)
193174 }
194175
195176 // onJobStart happens before onBatchSubmitted
196- val properties = new Properties ()
197- properties.setProperty(JobScheduler .BATCH_TIME_PROPERTY_KEY , (1000 + limit * 100 ).toString)
198- properties.setProperty(JobScheduler .OUTPUT_OP_ID_PROPERTY_KEY , " 0" )
199- val jobStart = SparkListenerJobStart (jobId = 0 ,
200- 0L , // unused
201- Nil , // unused
202- properties)
177+ val jobStart = createJobStart(Time (1000 + limit * 100 ), outputOpId = 0 , jobId = 0 )
203178 listener.onJobStart(jobStart)
204179
205180 val batchInfoSubmitted =
@@ -225,13 +200,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
225200 }
226201
227202 for (i <- limit + 1 to limit * 2 ) {
228- val properties = new Properties ()
229- properties.setProperty(JobScheduler .BATCH_TIME_PROPERTY_KEY , (1000 + i * 100 ).toString)
230- properties.setProperty(JobScheduler .OUTPUT_OP_ID_PROPERTY_KEY , " 0" )
231- val jobStart = SparkListenerJobStart (jobId = 1 ,
232- 0L , // unused
233- Nil , // unused
234- properties)
203+ val jobStart = createJobStart(Time (1000 + i * 100 ), outputOpId = 0 , jobId = 1 )
235204 listener.onJobStart(jobStart)
236205 }
237206
0 commit comments