Skip to content

Commit 4216405

Browse files
committed
Fix UTs
1 parent 88aa0b3 commit 4216405

2 files changed

Lines changed: 10 additions & 2 deletions

File tree

sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -790,7 +790,12 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
790790

791791
private def assertNumLateInputs(numLateInputs: Long): AssertOnQuery = AssertOnQuery { q =>
792792
q.processAllAvailable()
793-
val progressWithData = q.recentProgress.lastOption.get
793+
val progressWithData = q.recentProgress.filterNot { p =>
794+
// filter out batches which are falling into one of types:
795+
// 1) doesn't execute the batch run
796+
// 2) empty input batch
797+
p.inputRowsPerSecond == 0
798+
}.lastOption.get
794799
assert(progressWithData.stateOperators(0).numLateInputs === numLateInputs)
795800
true
796801
}

sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ trait StateStoreMetricsTest extends StreamTest {
5555
val allNumUpdatedRowsSinceLastCheck =
5656
progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsUpdated))
5757

58+
val allNumLateInputsSinceLastCheck =
59+
progressesSinceLastCheck.map(_.stateOperators.map(_.numLateInputs))
60+
5861
lazy val debugString = "recent progresses:\n" +
5962
progressesSinceLastCheck.map(_.prettyJson).mkString("\n\n")
6063

@@ -64,7 +67,7 @@ trait StateStoreMetricsTest extends StreamTest {
6467
val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck, numStateOperators)
6568
assert(numUpdatedRows === updated, s"incorrect updates rows, $debugString")
6669

67-
val numLateInputs = recentProgress.last.stateOperators.map(_.numLateInputs)
70+
val numLateInputs = arraySum(allNumLateInputsSinceLastCheck, numStateOperators)
6871
assert(numLateInputs === lateInputs, s"incorrect late inputs, $debugString")
6972

7073
lastCheckedRecentProgressIndex = recentProgress.length - 1

0 commit comments

Comments
 (0)