diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 1776d23607f7..0c41bfd590c0 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1674,6 +1674,16 @@ Any of the stateful operation(s) after any of below stateful operations can have
 As Spark cannot check the state function of `mapGroupsWithState`/`flatMapGroupsWithState`, Spark
 assumes that the state function emits late rows if the operator uses Append mode.
 
+Spark provides two ways to check the number of late rows on stateful operators, which can help you identify the issue:
+
+1. On the Spark UI: check the metrics in the stateful operator nodes on the query execution details page in the SQL tab
+2. On the Streaming Query Listener: check "numLateInputs" in "stateOperators" in QueryProgressEvent.
+
+Please note that the definition of "input" is relative: it doesn't always mean "input rows" for the operator.
+Streaming aggregation pre-aggregates input rows and checks late inputs against the pre-aggregated inputs,
+hence the number is not the same as the number of original input rows. You may want to check whether the value is zero
+or non-zero.
+
 There's a known workaround: split your streaming query into multiple queries per stateful operator, and
 ensure end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
 
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 57fbb125dc47..addb2d815218 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -46,10 +46,11 @@ object MimaExcludes {
 
     // false positive, no binary incompatibility
     ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.feature.ChiSqSelectorModel"),
     ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.feature.ChiSqSelector"),
-    //[SPARK-31840] Add instance weight support in LogisticRegressionSummary
     // weightCol in org.apache.spark.ml.classification.LogisticRegressionSummary is present only in current version
-    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.ml.classification.LogisticRegressionSummary.weightCol")
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.ml.classification.LogisticRegressionSummary.weightCol"),
+    // [SPARK-24634] Add a new metric regarding number of inputs later than watermark plus allowed delay
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StateOperatorProgress.$default$4")
   )
 
   // Exclude rules for 3.0.x
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index d65c4ffbb7a2..eb8b8af7950b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -122,7 +122,7 @@ case class FlatMapGroupsWithStateExec(
       // If timeout is based on event time, then filter late data based on watermark
       val filteredIter = watermarkPredicateForData match {
         case Some(predicate) if timeoutConf == EventTimeTimeout =>
-          iter.filter(row => !predicate.eval(row))
+          applyRemovingRowsOlderThanWatermark(iter, predicate)
         case _ =>
           iter
       }
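The guide change above points readers at the Streaming Query Listener. A minimal sketch of such a listener (the class name and println reporting are illustrative; `StreamingQueryListener`, `QueryProgressEvent`, and `StateOperatorProgress.numLateInputs` are the APIs this patch exposes):

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Reports any stateful operator that dropped late inputs in a finished batch.
class LateInputAlertListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.stateOperators.zipWithIndex.foreach { case (op, i) =>
      // Per the guide text, treat the value as a zero/non-zero signal rather
      // than an exact row count, since inputs may be pre-aggregated.
      if (op.numLateInputs > 0) {
        println(s"batch ${event.progress.batchId}: operator #$i dropped " +
          s"${op.numLateInputs} late input(s)")
      }
    }
  }
}

// Registration, given an active SparkSession `spark`:
// spark.streams.addListener(new LateInputAlertListener)
```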
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 0dff1c2fe576..bdfdc31e508c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -222,7 +222,7 @@ trait ProgressReporter extends Logging {
     lastExecution.executedPlan.collect {
       case p if p.isInstanceOf[StateStoreWriter] =>
         val progress = p.asInstanceOf[StateStoreWriter].getProgress()
-        if (hasExecuted) progress else progress.copy(newNumRowsUpdated = 0)
+        if (hasExecuted) progress else progress.copy(newNumRowsUpdated = 0, newNumLateInputs = 0)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
index 198e17db419a..dc5fc2e43143 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -462,7 +462,7 @@ case class StreamingSymmetricHashJoinExec(
       WatermarkSupport.watermarkExpression(watermarkAttribute, eventTimeWatermark) match {
         case Some(watermarkExpr) =>
           val predicate = Predicate.create(watermarkExpr, inputAttributes)
-          inputIter.filter { row => !predicate.eval(row) }
+          applyRemovingRowsOlderThanWatermark(inputIter, predicate)
         case None =>
           inputIter
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 1bec924ba219..073266bd621d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -77,6 +77,8 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+    "numLateInputs" -> SQLMetrics.createMetric(sparkContext,
+      "number of inputs which are later than watermark ('inputs' are relative to operators)"),
     "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
     "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"),
     "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to update"),
@@ -100,6 +102,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
       numRowsTotal = longMetric("numTotalStateRows").value,
       numRowsUpdated = longMetric("numUpdatedStateRows").value,
       memoryUsedBytes = longMetric("stateMemory").value,
+      numLateInputs = longMetric("numLateInputs").value,
       javaConvertedCustomMetrics
     )
   }
@@ -132,6 +135,16 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
     }.toMap
   }
 
+  protected def applyRemovingRowsOlderThanWatermark(
+      iter: Iterator[InternalRow],
+      predicateFilterOutLateInput: BasePredicate): Iterator[InternalRow] = {
+    iter.filterNot { row =>
+      val lateInput = predicateFilterOutLateInput.eval(row)
+      if (lateInput) longMetric("numLateInputs") += 1
+      lateInput
+    }
+  }
+
   /**
    * Should the MicroBatchExecution run another batch based on this stateful operator and the
    * current updated metadata.
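One subtlety of the helper above: `filterNot` is lazy, so the metric is only incremented as downstream operators consume rows, which lines up with SQL metrics being collected after the batch completes. A self-contained sketch of the same count-while-filtering pattern (all names here are illustrative):

```scala
object LateFilterSketch {
  // Drop elements matching `isLate` while counting how many were dropped,
  // mirroring applyRemovingRowsOlderThanWatermark above.
  def dropCounting[T](iter: Iterator[T])(isLate: T => Boolean): (Iterator[T], () => Long) = {
    var dropped = 0L
    val filtered = iter.filterNot { elem =>
      val late = isLate(elem)
      if (late) dropped += 1 // side effect runs lazily, as the iterator is consumed
      late
    }
    (filtered, () => dropped)
  }

  def main(args: Array[String]): Unit = {
    val (kept, droppedSoFar) = dropCounting(Iterator(5, 17, 3, 42))(_ < 10)
    println(droppedSoFar()) // 0 -- nothing consumed yet, so nothing counted
    println(kept.toList)    // List(17, 42)
    println(droppedSoFar()) // 2 -- accurate only after consumption, like the SQL metric
  }
}
```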
@@ -328,7 +341,8 @@ case class StateStoreSaveExec(
         // Assumption: watermark predicates must be non-empty if append mode is allowed
         case Some(Append) =>
           allUpdatesTimeMs += timeTakenMs {
-            val filteredIter = iter.filter(row => !watermarkPredicateForData.get.eval(row))
+            val filteredIter = applyRemovingRowsOlderThanWatermark(iter,
+              watermarkPredicateForData.get)
             while (filteredIter.hasNext) {
               val row = filteredIter.next().asInstanceOf[UnsafeRow]
               stateManager.put(store, row)
@@ -371,7 +385,7 @@ case class StateStoreSaveExec(
         new NextIterator[InternalRow] {
           // Filter late date using watermark if specified
           private[this] val baseIterator = watermarkPredicateForData match {
-            case Some(predicate) => iter.filter((row: InternalRow) => !predicate.eval(row))
+            case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, predicate)
             case None => iter
           }
           private val updatesStartTimeNs = System.nanoTime
@@ -456,7 +470,7 @@ case class StreamingDeduplicateExec(
     val commitTimeMs = longMetric("commitTimeMs")
 
     val baseIterator = watermarkPredicateForData match {
-      case Some(predicate) => iter.filter(row => !predicate.eval(row))
+      case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, predicate)
       case None => iter
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 13b506b60a12..22bae76ef422 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -43,6 +43,7 @@ class StateOperatorProgress private[sql](
     val numRowsTotal: Long,
     val numRowsUpdated: Long,
     val memoryUsedBytes: Long,
+    val numLateInputs: Long,
     val customMetrics: ju.Map[String, JLong] = new ju.HashMap()
   ) extends Serializable {
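With the new field on `StateOperatorProgress`, callers can read the metric straight off a query's progress. A small sketch, assuming `query` is a started `StreamingQuery` (note that `lastProgress` is null until the first progress update is posted):

```scala
import org.apache.spark.sql.streaming.StreamingQuery

def reportLateInputs(query: StreamingQuery): Unit = {
  // lastProgress is null before the first completed batch.
  Option(query.lastProgress).foreach { progress =>
    progress.stateOperators.zipWithIndex.foreach { case (op, i) =>
      println(s"operator #$i: totalRows=${op.numRowsTotal}, " +
        s"updatedRows=${op.numRowsUpdated}, lateInputs=${op.numLateInputs}")
    }
  }
}
```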
@@ -52,13 +53,17 @@ class StateOperatorProgress private[sql](
   /** The pretty (i.e. indented) JSON representation of this progress. */
   def prettyJson: String = pretty(render(jsonValue))
 
-  private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
-    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, customMetrics)
+  private[sql] def copy(
+      newNumRowsUpdated: Long,
+      newNumLateInputs: Long): StateOperatorProgress =
+    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, newNumLateInputs,
+      customMetrics)
 
   private[sql] def jsonValue: JValue = {
     ("numRowsTotal" -> JInt(numRowsTotal)) ~
     ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
     ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+    ("numLateInputs" -> JInt(numLateInputs)) ~
     ("customMetrics" -> {
       if (!customMetrics.isEmpty) {
         val keys = customMetrics.keySet.asScala.toSeq.sorted
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 6486e1aee864..329196a5cfef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -298,9 +298,11 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       AddData(inputData, 25), // Advance watermark to 15 seconds
       CheckNewAnswer((10, 5)),
       assertNumStateRows(2),
+      assertNumLateInputs(0),
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckNewAnswer(),
-      assertNumStateRows(2)
+      assertNumStateRows(2),
+      assertNumLateInputs(1)
     )
   }
 
@@ -321,12 +323,15 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       AddData(inputData, 25), // Advance watermark to 15 seconds
       CheckNewAnswer((25, 1)),
       assertNumStateRows(2),
+      assertNumLateInputs(0),
       AddData(inputData, 10, 25), // Ignore 10 as its less than watermark
       CheckNewAnswer((25, 2)),
       assertNumStateRows(2),
+      assertNumLateInputs(1),
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckNewAnswer(),
-      assertNumStateRows(2)
+      assertNumStateRows(2),
+      assertNumLateInputs(1)
     )
   }
 
@@ -783,6 +788,18 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     true
   }
 
+  private def assertNumLateInputs(numLateInputs: Long): AssertOnQuery = AssertOnQuery { q =>
+    q.processAllAvailable()
+    val progressWithData = q.recentProgress.filterNot { p =>
+      // filter out batches which fall into one of these types:
+      // 1) the batch didn't actually run
+      // 2) the batch had empty input
+      p.inputRowsPerSecond == 0
+    }.lastOption.get
+    assert(progressWithData.stateOperators(0).numLateInputs === numLateInputs)
+    true
+  }
+
   /** Assert event stats generated on that last batch with data in it */
   private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = {
     Execute("AssertEventStats") { q =>
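For context, the query shape these tests exercise is an event-time window aggregation behind `withWatermark`. A runnable sketch using the built-in `rate` source (that source emits ordered timestamps, so `numLateInputs` will normally stay zero here; out-of-order sources such as Kafka are where non-zero values appear):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

object LateInputsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("numLateInputs-demo")
      .getOrCreate()
    import spark.implicits._

    // Windowed count with a 10-second watermark: rows arriving more than 10s
    // behind the max event time seen so far count as late inputs.
    val counts = spark.readStream
      .format("rate") // emits (timestamp, value) rows
      .option("rowsPerSecond", "5")
      .load()
      .withWatermark("timestamp", "10 seconds")
      .groupBy(window($"timestamp", "5 seconds"))
      .count()

    val query = counts.writeStream
      .format("console")
      .outputMode("append")
      .start()

    // Let a few batches run, then inspect the metric on the last progress.
    query.awaitTermination(30000)
    Option(query.lastProgress).foreach { p =>
      println(p.stateOperators.map(_.numLateInputs).mkString(","))
    }
    query.stop()
    spark.stop()
  }
}
```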
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
index fb5d13d09fb0..640f5181aa52 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
@@ -29,8 +29,12 @@ trait StateStoreMetricsTest extends StreamTest {
     lastCheckedRecentProgressIndex = -1
   }
 
-  def assertNumStateRows(total: Seq[Long], updated: Seq[Long]): AssertOnQuery =
-    AssertOnQuery(s"Check total state rows = $total, updated state rows = $updated") { q =>
+  def assertNumStateRows(
+      total: Seq[Long],
+      updated: Seq[Long],
+      lateInputs: Seq[Long]): AssertOnQuery =
+    AssertOnQuery(s"Check total state rows = $total, updated state rows = $updated" +
+      s", late inputs = $lateInputs") { q =>
       // This assumes that the streaming query will not make any progress while the eventually
       // is being executed.
       eventually(timeout(streamingTimeout)) {
@@ -51,6 +55,9 @@
       val allNumUpdatedRowsSinceLastCheck =
         progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsUpdated))
 
+      val allNumLateInputsSinceLastCheck =
+        progressesSinceLastCheck.map(_.stateOperators.map(_.numLateInputs))
+
       lazy val debugString = "recent progresses:\n" +
         progressesSinceLastCheck.map(_.prettyJson).mkString("\n\n")
 
@@ -60,13 +67,22 @@
       val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck, numStateOperators)
       assert(numUpdatedRows === updated, s"incorrect updates rows, $debugString")
 
+      val numLateInputs = arraySum(allNumLateInputsSinceLastCheck, numStateOperators)
+      assert(numLateInputs === lateInputs, s"incorrect late inputs, $debugString")
+
       lastCheckedRecentProgressIndex = recentProgress.length - 1
     }
     true
   }
 
-  def assertNumStateRows(total: Long, updated: Long): AssertOnQuery =
-    assertNumStateRows(Seq(total), Seq(updated))
+  def assertNumStateRows(total: Seq[Long], updated: Seq[Long]): AssertOnQuery = {
+    assert(total.length === updated.length)
+    assertNumStateRows(total, updated, lateInputs = (0 until total.length).map(_ => 0L))
+  }
+
+  def assertNumStateRows(total: Long, updated: Long, lateInput: Long = 0): AssertOnQuery = {
+    assertNumStateRows(Seq(total), Seq(updated), Seq(lateInput))
+  }
 
   def arraySum(arraySeq: Seq[Array[Long]], arrayLength: Int): Seq[Long] = {
     if (arraySeq.isEmpty) return Seq.fill(arrayLength)(0L)
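The per-operator checks above lean on `arraySum`, which sums each operator's metric column-wise across the progresses seen since the last check. A standalone illustration of that aggregation (this body is a sketch, not the suite's verbatim implementation):

```scala
// Sum index i across a sequence of equal-length arrays: one array per
// progress, one slot per stateful operator.
def arraySum(arraySeq: Seq[Array[Long]], arrayLength: Int): Seq[Long] = {
  if (arraySeq.isEmpty) return Seq.fill(arrayLength)(0L)
  (0 until arrayLength).map(i => arraySeq.map(_(i)).sum)
}

// Two batches, two operators: only the second operator saw late inputs.
val lateInputsPerBatch = Seq(Array(0L, 1L), Array(0L, 1L))
println(arraySum(lateInputsPerBatch, 2)) // Vector(0, 2)
```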
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index f63778aef5a7..ceb6775f79cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -54,13 +54,13 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
     testStream(result, Append)(
       AddData(inputData, "a" -> 1),
       CheckLastBatch("a" -> 1),
-      assertNumStateRows(total = 1, updated = 1),
+      assertNumStateRows(total = 1, updated = 1, lateInput = 0),
       AddData(inputData, "a" -> 2), // Dropped
       CheckLastBatch(),
-      assertNumStateRows(total = 1, updated = 0),
+      assertNumStateRows(total = 1, updated = 0, lateInput = 0),
       AddData(inputData, "b" -> 1),
       CheckLastBatch("b" -> 1),
-      assertNumStateRows(total = 2, updated = 1)
+      assertNumStateRows(total = 2, updated = 1, lateInput = 0)
     )
   }
 
@@ -102,7 +102,7 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckNewAnswer(),
-      assertNumStateRows(total = 1, updated = 0),
+      assertNumStateRows(total = 1, updated = 0, lateInput = 1),
 
       AddData(inputData, 45), // Advance watermark to 35 seconds, no-data-batch drops row 25
       CheckNewAnswer(45),
@@ -136,7 +136,7 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckLastBatch(),
-      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L)),
+      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L), lateInputs = Seq(0L, 1L)),
 
       AddData(inputData, 40), // Advance watermark to 30 seconds
       CheckLastBatch((15 -> 1), (25 -> 1)),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 3f218c9cb7fd..1f6d0a994568 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -166,7 +166,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       AddData(input1, 5),
       CheckNewAnswer(), // Same reason as above
-      assertNumStateRows(total = 2, updated = 0)
+      assertNumStateRows(total = 2, updated = 0, lateInput = 1)
     )
   }
 
@@ -219,12 +219,12 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       // (1, 28) ==> passed filter, matched with left (1, 3) and (1, 5), added to state
       AddData(rightInput, (1, 20), (1, 21), (1, 28)),
       CheckNewAnswer((1, 3, 21), (1, 5, 21), (1, 3, 28), (1, 5, 28)),
-      assertNumStateRows(total = 5, updated = 1),
+      assertNumStateRows(total = 5, updated = 1, lateInput = 1),
 
       // New data to left input with leftTime <= 20 should be filtered due to event time watermark
       AddData(leftInput, (1, 20), (1, 21)),
       CheckNewAnswer((1, 21, 28)),
-      assertNumStateRows(total = 6, updated = 1)
+      assertNumStateRows(total = 6, updated = 1, lateInput = 1)
     )
   }
 
@@ -293,7 +293,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       AddData(leftInput, (1, 30), (1, 31)), // 30 should not be processed or added to state
       CheckNewAnswer((1, 31, 26), (1, 31, 30), (1, 31, 31)),
-      assertNumStateRows(total = 11, updated = 1), // only 31 added
+      assertNumStateRows(total = 11, updated = 1, lateInput = 1), // only 31 added
 
       // Advance the watermark
       AddData(rightInput, (1, 80)),
@@ -307,7 +307,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       AddData(rightInput, (1, 46), (1, 50)), // 46 should not be processed or added to state
       CheckNewAnswer((1, 49, 50), (1, 50, 50)),
-      assertNumStateRows(total = 7, updated = 1) // 50 added
+      assertNumStateRows(total = 7, updated = 1, lateInput = 1) // 50 added
     )
   }
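The suite that follows pins down the JSON shape, and external tooling consuming `StreamingQueryProgress.json` can pick the new field up with any JSON library. A sketch with json4s (the sample document is fabricated to match the expected shape):

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

val sample =
  """{ "stateOperators" : [
    |  { "numRowsTotal" : 0, "numRowsUpdated" : 1,
    |    "memoryUsedBytes" : 2, "numLateInputs" : 0 }
    |] }""".stripMargin

implicit val formats: Formats = DefaultFormats
// Pull numLateInputs out of each state operator entry.
val lateInputs = (parse(sample) \ "stateOperators").children
  .map(op => (op \ "numLateInputs").extract[Long])
println(lateInputs) // List(0)
```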
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 08b3644745f9..79028a6c442d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -64,6 +64,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
       |   "numRowsTotal" : 0,
       |   "numRowsUpdated" : 1,
       |   "memoryUsedBytes" : 3,
+      |   "numLateInputs" : 0,
       |   "customMetrics" : {
       |     "loadedMapCacheHitCount" : 1,
       |     "loadedMapCacheMissCount" : 0,
@@ -113,7 +114,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
       |   "stateOperators" : [ {
       |     "numRowsTotal" : 0,
       |     "numRowsUpdated" : 1,
-      |     "memoryUsedBytes" : 2
+      |     "memoryUsedBytes" : 2,
+      |     "numLateInputs" : 0
       |   } ],
       |   "sources" : [ {
       |     "description" : "source",
@@ -321,7 +323,7 @@ object StreamingQueryStatusAndProgressSuite {
       "avg" -> "2016-12-05T20:54:20.827Z",
       "watermark" -> "2016-12-05T20:54:20.827Z").asJava),
     stateOperators = Array(new StateOperatorProgress(
-      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 3,
+      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 3, numLateInputs = 0,
       customMetrics = new java.util.HashMap(Map("stateOnCurrentVersionSizeBytes" -> 2L,
         "loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L)
         .mapValues(long2Long).asJava)
@@ -353,7 +355,7 @@ object StreamingQueryStatusAndProgressSuite {
     // empty maps should be handled correctly
     eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
     stateOperators = Array(new StateOperatorProgress(
-      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2)),
+      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numLateInputs = 0)),
     sources = Array(
       new SourceProgress(
         description = "source",