File tree Expand file tree Collapse file tree
streaming/src/test/scala/org/apache/spark/streaming Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -152,11 +152,9 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
152152 stopSparkContext : Boolean
153153 ): Seq [Seq [V ]] = {
154154 try {
155- val batchDuration = ssc.graph.batchDuration
156155 val batchCounter = new BatchCounter (ssc)
157156 ssc.start()
158157 val clock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
159- val currentTime = clock.getTimeMillis()
160158
161159 logInfo(" Manual clock before advancing = " + clock.getTimeMillis())
162160 clock.setTime(targetBatchTime.milliseconds)
@@ -171,7 +169,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
171169
172170 eventually(timeout(10 seconds)) {
173171 val checkpointFilesOfLatestTime = Checkpoint .getCheckpointFiles(checkpointDir).filter {
174- _.toString .contains(clock.getTimeMillis.toString)
172+ _.getName .contains(clock.getTimeMillis.toString)
175173 }
176174 // Checkpoint files are written twice for every batch interval. So assert that both
177175 // are written to make sure that both of them have been written.
You can’t perform that action at this time.
0 commit comments