10 changes: 10 additions & 0 deletions docs/structured-streaming-programming-guide.md
@@ -1674,6 +1674,16 @@ Any of the stateful operation(s) after any of below stateful operations can have
As Spark cannot check the state function of `mapGroupsWithState`/`flatMapGroupsWithState`, Spark assumes that the state function
emits late rows if the operator uses Append mode.

Spark provides two ways to check the number of late rows on stateful operators, which can help you identify the issue:

1. On the Spark UI: check the metrics of stateful operator nodes on the query execution details page in the SQL tab
2. On the Streaming Query Listener: check "numLateInputs" in "stateOperators" in QueryProgressEvent.
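
As a minimal sketch of the listener approach (the `StreamingQueryListener` API already exists in Spark; only the `numLateInputs` field is new in this change, and the logging body is illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val spark = SparkSession.builder().appName("late-input-monitor").getOrCreate()

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // One entry per stateful operator in the query plan; a non-zero value
    // means the operator saw inputs behind the watermark.
    event.progress.stateOperators.zipWithIndex.foreach { case (op, i) =>
      if (op.numLateInputs > 0) {
        println(s"stateful operator $i: ${op.numLateInputs} late inputs")
      }
    }
  }
})
```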

Please note that the definition of "input" is relative: it doesn't always mean the original input rows of the operator.
For example, streaming aggregation pre-aggregates input rows and checks late inputs against the pre-aggregated rows,
hence the number is not the same as the number of original input rows. What matters is whether the value is zero
or non-zero.

There's a known workaround: split your streaming query into multiple queries, one per stateful operator, and ensure
end-to-end exactly-once semantics per query. Ensuring end-to-end exactly-once for the last query is optional; a rough
sketch follows.
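
The sketch below illustrates the split (a minimal sketch only, not part of this change: the rate source, the intermediate path, and the use of the file sink, which provides end-to-end exactly-once, are all assumptions):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

val spark = SparkSession.builder().appName("split-stateful-query").getOrCreate()
import spark.implicits._

// Stage 1: first stateful operator, written to an intermediate file sink.
val stage1 = spark.readStream
  .format("rate").load()                          // illustrative source
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window($"timestamp", "1 minute"))      // first stateful operator
  .count()

stage1.writeStream
  .format("parquet")
  .option("path", "/tmp/intermediate")            // illustrative path
  .option("checkpointLocation", "/tmp/ckpt-stage1")
  .outputMode("append")                           // file sink requires Append
  .start()

// Stage 2: reads the intermediate output and applies the next stateful
// operator; exactly-once on this last hop is optional.
val stage2 = spark.readStream
  .schema(stage1.schema)
  .parquet("/tmp/intermediate")
```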

5 changes: 3 additions & 2 deletions project/MimaExcludes.scala
@@ -46,10 +46,11 @@ object MimaExcludes {
// false positive, no binary incompatibility
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.feature.ChiSqSelectorModel"),
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.feature.ChiSqSelector"),

//[SPARK-31840] Add instance weight support in LogisticRegressionSummary
// weightCol in org.apache.spark.ml.classification.LogisticRegressionSummary is present only in current version
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.ml.classification.LogisticRegressionSummary.weightCol")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.ml.classification.LogisticRegressionSummary.weightCol"),
// [SPARK-24634] Add a new metric regarding number of inputs later than watermark plus allowed delay
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StateOperatorProgress.<init>$default$4")
)

// Exclude rules for 3.0.x
@@ -122,7 +122,7 @@ case class FlatMapGroupsWithStateExec(
// If timeout is based on event time, then filter late data based on watermark
val filteredIter = watermarkPredicateForData match {
case Some(predicate) if timeoutConf == EventTimeTimeout =>
iter.filter(row => !predicate.eval(row))
applyRemovingRowsOlderThanWatermark(iter, predicate)
case _ =>
iter
}
@@ -222,7 +222,7 @@ trait ProgressReporter extends Logging {
lastExecution.executedPlan.collect {
case p if p.isInstanceOf[StateStoreWriter] =>
val progress = p.asInstanceOf[StateStoreWriter].getProgress()
if (hasExecuted) progress else progress.copy(newNumRowsUpdated = 0)
if (hasExecuted) progress else progress.copy(newNumRowsUpdated = 0, newNumLateInputs = 0)
}
}

@@ -462,7 +462,7 @@ case class StreamingSymmetricHashJoinExec(
WatermarkSupport.watermarkExpression(watermarkAttribute, eventTimeWatermark) match {
case Some(watermarkExpr) =>
val predicate = Predicate.create(watermarkExpr, inputAttributes)
inputIter.filter { row => !predicate.eval(row) }
applyRemovingRowsOlderThanWatermark(inputIter, predicate)
case None =>
inputIter
}
@@ -77,6 +77,8 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numLateInputs" -> SQLMetrics.createMetric(sparkContext,
Member:
nit: maybe name it as numDroppedRowsByWatermark? Technically, we don't guarantee to drop all late inputs. So this metric doesn't report the accurate number of late inputs.

Contributor (author):
Thanks for the suggestion. Yeah, I guess I tried to explain the behavior, but the name still seems confusing to others. I agree the suggested name is clearer.

Btw, would it be better to have the accurate number of late inputs? (Just asking because #24936 covers this, and it can be applied orthogonally, e.g. number of late inputs vs. number of dropped inputs.)

"number of inputs which are later than watermark ('inputs' are relative to operators)"),
"numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
"numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"),
"allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to update"),
@@ -100,6 +102,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
numRowsTotal = longMetric("numTotalStateRows").value,
numRowsUpdated = longMetric("numUpdatedStateRows").value,
memoryUsedBytes = longMetric("stateMemory").value,
numLateInputs = longMetric("numLateInputs").value,
javaConvertedCustomMetrics
)
}
@@ -132,6 +135,16 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
}.toMap
}

protected def applyRemovingRowsOlderThanWatermark(
iter: Iterator[InternalRow],
predicateFilterOutLateInput: BasePredicate): Iterator[InternalRow] = {
iter.filterNot { row =>
val lateInput = predicateFilterOutLateInput.eval(row)
if (lateInput) longMetric("numLateInputs") += 1
lateInput
}
}
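
For illustration, the counting-filter pattern above reduces to plain Scala as follows (a sketch with our own names; a simple var stands in for the `SQLMetric`):

```scala
// Sketch: drop elements older than the watermark while counting the drops.
var numLateInputs = 0L

def removeOlderThanWatermark(iter: Iterator[Long], watermark: Long): Iterator[Long] =
  iter.filterNot { ts =>
    val late = ts < watermark
    if (late) numLateInputs += 1  // incremented lazily, as the iterator is consumed
    late
  }

val kept = removeOlderThanWatermark(Iterator(5L, 20L, 7L), watermark = 10L).toList
assert(kept == List(20L) && numLateInputs == 2L)
```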

/**
* Should the MicroBatchExecution run another batch based on this stateful operator and the
* current updated metadata.
@@ -328,7 +341,8 @@ case class StateStoreSaveExec(
// Assumption: watermark predicates must be non-empty if append mode is allowed
case Some(Append) =>
allUpdatesTimeMs += timeTakenMs {
val filteredIter = iter.filter(row => !watermarkPredicateForData.get.eval(row))
val filteredIter = applyRemovingRowsOlderThanWatermark(iter,
watermarkPredicateForData.get)
while (filteredIter.hasNext) {
val row = filteredIter.next().asInstanceOf[UnsafeRow]
stateManager.put(store, row)
@@ -371,7 +385,7 @@ case class StateStoreSaveExec(
new NextIterator[InternalRow] {
// Filter late data using watermark if specified
private[this] val baseIterator = watermarkPredicateForData match {
case Some(predicate) => iter.filter((row: InternalRow) => !predicate.eval(row))
case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, predicate)
case None => iter
}
private val updatesStartTimeNs = System.nanoTime
@@ -456,7 +470,7 @@ case class StreamingDeduplicateExec(
val commitTimeMs = longMetric("commitTimeMs")

val baseIterator = watermarkPredicateForData match {
case Some(predicate) => iter.filter(row => !predicate.eval(row))
case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, predicate)
case None => iter
}

@@ -43,6 +43,7 @@ class StateOperatorProgress private[sql](
val numRowsTotal: Long,
val numRowsUpdated: Long,
val memoryUsedBytes: Long,
val numLateInputs: Long,
val customMetrics: ju.Map[String, JLong] = new ju.HashMap()
) extends Serializable {

@@ -52,13 +53,17 @@ class StateOperatorProgress private[sql](
/** The pretty (i.e. indented) JSON representation of this progress. */
def prettyJson: String = pretty(render(jsonValue))

private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, customMetrics)
private[sql] def copy(
newNumRowsUpdated: Long,
newNumLateInputs: Long): StateOperatorProgress =
new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, newNumLateInputs,
customMetrics)

private[sql] def jsonValue: JValue = {
("numRowsTotal" -> JInt(numRowsTotal)) ~
("numRowsUpdated" -> JInt(numRowsUpdated)) ~
("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
("numLateInputs" -> JInt(numLateInputs)) ~
("customMetrics" -> {
if (!customMetrics.isEmpty) {
val keys = customMetrics.keySet.asScala.toSeq.sorted
@@ -298,9 +298,11 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
AddData(inputData, 25), // Advance watermark to 15 seconds
CheckNewAnswer((10, 5)),
assertNumStateRows(2),
assertNumLateInputs(0),
AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckNewAnswer(),
assertNumStateRows(2)
assertNumStateRows(2),
assertNumLateInputs(1)
)
}

@@ -321,12 +323,15 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
AddData(inputData, 25), // Advance watermark to 15 seconds
CheckNewAnswer((25, 1)),
assertNumStateRows(2),
assertNumLateInputs(0),
AddData(inputData, 10, 25), // Ignore 10 as it's less than watermark
CheckNewAnswer((25, 2)),
assertNumStateRows(2),
assertNumLateInputs(1),
AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckNewAnswer(),
assertNumStateRows(2)
assertNumStateRows(2),
assertNumLateInputs(1)
)
}

@@ -783,6 +788,18 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
true
}

private def assertNumLateInputs(numLateInputs: Long): AssertOnQuery = AssertOnQuery { q =>
q.processAllAvailable()
val progressWithData = q.recentProgress.filterNot { p =>
// filter out batches that fall into one of these categories:
// 1) the batch run was not executed
// 2) the batch had empty input
p.inputRowsPerSecond == 0
}.lastOption.get
assert(progressWithData.stateOperators(0).numLateInputs === numLateInputs)
true
}

/** Assert event stats generated on that last batch with data in it */
private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = {
Execute("AssertEventStats") { q =>
@@ -29,8 +29,12 @@ trait StateStoreMetricsTest extends StreamTest {
lastCheckedRecentProgressIndex = -1
}

def assertNumStateRows(total: Seq[Long], updated: Seq[Long]): AssertOnQuery =
AssertOnQuery(s"Check total state rows = $total, updated state rows = $updated") { q =>
def assertNumStateRows(
total: Seq[Long],
updated: Seq[Long],
lateInputs: Seq[Long]): AssertOnQuery =
AssertOnQuery(s"Check total state rows = $total, updated state rows = $updated" +
s", late inputs = $lateInputs") { q =>
// This assumes that the streaming query will not make any progress while the eventually
// is being executed.
eventually(timeout(streamingTimeout)) {
@@ -51,6 +55,9 @@ trait StateStoreMetricsTest extends StreamTest {
val allNumUpdatedRowsSinceLastCheck =
progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsUpdated))

val allNumLateInputsSinceLastCheck =
progressesSinceLastCheck.map(_.stateOperators.map(_.numLateInputs))

lazy val debugString = "recent progresses:\n" +
progressesSinceLastCheck.map(_.prettyJson).mkString("\n\n")

@@ -60,13 +67,22 @@ trait StateStoreMetricsTest extends StreamTest {
val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck, numStateOperators)
assert(numUpdatedRows === updated, s"incorrect updates rows, $debugString")

val numLateInputs = arraySum(allNumLateInputsSinceLastCheck, numStateOperators)
assert(numLateInputs === lateInputs, s"incorrect late inputs, $debugString")

lastCheckedRecentProgressIndex = recentProgress.length - 1
}
true
}

def assertNumStateRows(total: Long, updated: Long): AssertOnQuery =
assertNumStateRows(Seq(total), Seq(updated))
def assertNumStateRows(total: Seq[Long], updated: Seq[Long]): AssertOnQuery = {
assert(total.length === updated.length)
assertNumStateRows(total, updated, lateInputs = (0 until total.length).map(_ => 0L))
}

def assertNumStateRows(total: Long, updated: Long, lateInput: Long = 0): AssertOnQuery = {
assertNumStateRows(Seq(total), Seq(updated), Seq(lateInput))
}

def arraySum(arraySeq: Seq[Array[Long]], arrayLength: Int): Seq[Long] = {
if (arraySeq.isEmpty) return Seq.fill(arrayLength)(0L)
@@ -54,13 +54,13 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
testStream(result, Append)(
AddData(inputData, "a" -> 1),
CheckLastBatch("a" -> 1),
assertNumStateRows(total = 1, updated = 1),
assertNumStateRows(total = 1, updated = 1, lateInput = 0),
AddData(inputData, "a" -> 2), // Dropped
CheckLastBatch(),
assertNumStateRows(total = 1, updated = 0),
assertNumStateRows(total = 1, updated = 0, lateInput = 0),
AddData(inputData, "b" -> 1),
CheckLastBatch("b" -> 1),
assertNumStateRows(total = 2, updated = 1)
assertNumStateRows(total = 2, updated = 1, lateInput = 0)
)
}

@@ -102,7 +102,7 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {

AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckNewAnswer(),
assertNumStateRows(total = 1, updated = 0),
assertNumStateRows(total = 1, updated = 0, lateInput = 1),

AddData(inputData, 45), // Advance watermark to 35 seconds, no-data-batch drops row 25
CheckNewAnswer(45),
@@ -136,7 +136,7 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {

AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckLastBatch(),
assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L)),
assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L), lateInputs = Seq(0L, 1L)),

AddData(inputData, 40), // Advance watermark to 30 seconds
CheckLastBatch((15 -> 1), (25 -> 1)),
@@ -166,7 +166,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with

AddData(input1, 5),
CheckNewAnswer(), // Same reason as above
assertNumStateRows(total = 2, updated = 0)
assertNumStateRows(total = 2, updated = 0, lateInput = 1)
)
}

@@ -219,12 +219,12 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
// (1, 28) ==> passed filter, matched with left (1, 3) and (1, 5), added to state
AddData(rightInput, (1, 20), (1, 21), (1, 28)),
CheckNewAnswer((1, 3, 21), (1, 5, 21), (1, 3, 28), (1, 5, 28)),
assertNumStateRows(total = 5, updated = 1),
assertNumStateRows(total = 5, updated = 1, lateInput = 1),

// New data to left input with leftTime <= 20 should be filtered due to event time watermark
AddData(leftInput, (1, 20), (1, 21)),
CheckNewAnswer((1, 21, 28)),
assertNumStateRows(total = 6, updated = 1)
assertNumStateRows(total = 6, updated = 1, lateInput = 1)
)
}

@@ -293,7 +293,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with

AddData(leftInput, (1, 30), (1, 31)), // 30 should not be processed or added to state
CheckNewAnswer((1, 31, 26), (1, 31, 30), (1, 31, 31)),
assertNumStateRows(total = 11, updated = 1), // only 31 added
assertNumStateRows(total = 11, updated = 1, lateInput = 1), // only 31 added

// Advance the watermark
AddData(rightInput, (1, 80)),
@@ -307,7 +307,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with

AddData(rightInput, (1, 46), (1, 50)), // 46 should not be processed or added to state
CheckNewAnswer((1, 49, 50), (1, 50, 50)),
assertNumStateRows(total = 7, updated = 1) // 50 added
assertNumStateRows(total = 7, updated = 1, lateInput = 1) // 50 added
)
}

@@ -64,6 +64,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "numRowsTotal" : 0,
| "numRowsUpdated" : 1,
| "memoryUsedBytes" : 3,
| "numLateInputs" : 0,
| "customMetrics" : {
| "loadedMapCacheHitCount" : 1,
| "loadedMapCacheMissCount" : 0,
@@ -113,7 +114,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "stateOperators" : [ {
| "numRowsTotal" : 0,
| "numRowsUpdated" : 1,
| "memoryUsedBytes" : 2
| "memoryUsedBytes" : 2,
| "numLateInputs" : 0
| } ],
| "sources" : [ {
| "description" : "source",
@@ -321,7 +323,7 @@ object StreamingQueryStatusAndProgressSuite {
"avg" -> "2016-12-05T20:54:20.827Z",
"watermark" -> "2016-12-05T20:54:20.827Z").asJava),
stateOperators = Array(new StateOperatorProgress(
numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 3,
numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 3, numLateInputs = 0,
customMetrics = new java.util.HashMap(Map("stateOnCurrentVersionSizeBytes" -> 2L,
"loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L)
.mapValues(long2Long).asJava)
@@ -353,7 +355,7 @@ object StreamingQueryStatusAndProgressSuite {
// empty maps should be handled correctly
eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
stateOperators = Array(new StateOperatorProgress(
numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2)),
numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numLateInputs = 0)),
sources = Array(
new SourceProgress(
description = "source",