Skip to content

Commit 247ada6

Browse files
committed
Fixed test
1 parent c64632c commit 247ada6

2 files changed

Lines changed: 9 additions & 10 deletions

File tree

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
167167
extends StreamAction
168168

169169
/** Advance the trigger clock's time manually. */
170-
case class AdvanceManualClock(
171-
timeToAdd: Long, waitForStreamExecThreadToBlock: Boolean = true) extends StreamAction
170+
case class AdvanceManualClock(timeToAdd: Long) extends StreamAction
172171

173172
/** Signals that a failure is expected and should not kill the test. */
174173
case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction {
@@ -365,19 +364,19 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
365364
}
366365
})
367366

368-
case AdvanceManualClock(timeToAdd, waitForStreamExecThreadToBlock) =>
367+
case AdvanceManualClock(timeToAdd) =>
369368
verify(currentStream != null,
370369
"can not advance manual clock when a stream is not running")
371370
verify(currentStream.triggerClock.isInstanceOf[StreamManualClock],
372371
s"can not advance clock of type ${currentStream.triggerClock.getClass}")
373372
val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
374373
assert(manualClockExpectedTime >= 0)
375-
if (waitForStreamExecThreadToBlock) {
376-
// Make sure we don't advance ManualClock too early. See SPARK-16002.
377-
eventually("StreamManualClock has not yet entered the waiting state") {
378-
assert(clock.isStreamWaitingAt(manualClockExpectedTime))
379-
}
374+
375+
// Make sure we don't advance ManualClock too early. See SPARK-16002.
376+
eventually("StreamManualClock has not yet entered the waiting state") {
377+
assert(clock.isStreamWaitingAt(manualClockExpectedTime))
380378
}
379+
381380
clock.advance(timeToAdd)
382381
manualClockExpectedTime += timeToAdd
383382
verify(clock.getTimeMillis() === manualClockExpectedTime,

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
185185
AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),
186186

187187
// Test status while batch processing has completed
188-
AdvanceManualClock(500, waitForStreamExecThreadToBlock = false), // time = 1100 to unblock job
188+
AdvanceManualClock(500), // time = 1100 to unblock job
189189
AssertOnQuery { _ => clock.getTimeMillis() === 1100 },
190190
CheckAnswer(2),
191191
AssertOnQuery(_.status.isDataAvailable === true),
@@ -240,7 +240,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
240240
AssertStreamExecThreadToWaitForClock(),
241241
AssertOnQuery(_.status.isDataAvailable === false),
242242
AssertOnQuery(_.status.isTriggerActive === false),
243-
AssertOnQuery(_.status.message === "Waiting for data to arrive")
243+
AssertOnQuery(_.status.message === "Waiting for next trigger")
244244
)
245245
}
246246

0 commit comments

Comments
 (0)