Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions docs/structured-streaming-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1503,8 +1503,6 @@ Additional details on supported joins:

- Cannot use streaming aggregations before joins.

- Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins.


### Streaming Deduplication
You can deduplicate records in data streams using a unique identifier in the events. This is exactly same as deduplication on static using a unique identifier column. The query will store the necessary amount of data from previous records such that it can filter duplicate records. Similar to aggregations, you can use deduplication with or without watermarking.
Expand Down Expand Up @@ -1616,6 +1614,8 @@ this configuration judiciously.
### Arbitrary Stateful Operations
Many usecases require more advanced stateful operations than aggregations. For example, in many usecases, you have to track sessions from data streams of events. For doing such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation `mapGroupsWithState` and the more powerful operation `flatMapGroupsWithState`. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.GroupState)/[Java](api/java/org/apache/spark/sql/streaming/GroupState.html)) and the examples ([Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java)).

Though Spark cannot check and force it, state function should be implemented with respect of semantic of output mode. e.g. In update mode Spark doesn't expect state function will emit rows which are older than current watermark, whereas in Append mode state function can emit these rows.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

respect of -> respect to
semantic -> the semantics
e.g. -> For example,
update -> Update
expect state -> expect that the state
state function -> the state function


### Unsupported Operations
There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets.
Some of them are as follows.
Expand Down Expand Up @@ -1647,6 +1647,27 @@ For example, sorting on the input stream is not supported, as it requires keepin
track of all the data received in the stream. This is therefore fundamentally hard to execute
efficiently.

### Limitation of global watermark

In some circumstance, some stateful operations could emit rows older than current watermark (with allowed delay),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"In some circumstances, if a stateful operation emits rows ..."
Is it clearer to say something like "the current watermark plus allowed late record delay"?
uses global -> uses a global
and these -> , then these
could bring -> can cause a

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also can we say anything more about the 'circumstances'? how can users know if this affects them? is it what is described below? then you could say so explicitly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can just remove "In some circumstance", as that's expected behavior as always.

  • Streaming aggregation in Append mode: only evicted rows are emitted by nature, as it should wait for inputs until watermark passes by, to ensure there's no more rows to aggregate with such key.
  • Outer Join (any mode): it basically emits matched rows, but it also emits evicted rows if the row haven't matched with other side of row. That's what outer join is, so it emits evicted rows conditionally, but mostly expected.
  • FlatMapGroupsWithState in Append mode: it strongly depends on the implementation of state function, but if state function respects the semantic of Append mode, it could high likely emit evicted rows to ensure there's no further input rows to affect emitted rows.

In fact, emitting evicted rows are tied to the semantic of Append mode.

how can users know if this affects them?

We'll log warning message during checking unsupported operations. We can easily change the behavior to block the query as unsupported as well, so it's up to the community (mostly committers/PMCs) decision.

is it what is described below?

Yes. And with SPARK-24634 we can measure the number of late rows per stateful operator in runtime. (For now there's no information on late rows. Spark discards them with no metrics.) First stateful operator could have late rows, but following state operators shouldn't.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced In some circumstance with In Append mode for clarification.

which are "late rows" in downstream stateful operations (as Spark uses global watermark) and these rows can be discarded.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be discarded implies that it's OK to discard them. Are you saying "may be discarded"?
Then I'd say "if a stateful operation emits rows older ... note that these rows may be discarded"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be discarded implies that it's OK to discard them. Are you saying "may be discarded"?

Ah thanks for correcting the details. If former interprets as that nuance, latter is correct.

This could bring correctness issue.

This is a limitation of global watermark and operator-wise watermark is not yet supported. Before Spark will support
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"of a global watermark, as operator-wise..."
I'm not sure what the next sentence means, is this a statement about potential future changes? I would omit it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it describes potential future change. I left the content because operator-wise watermark is what Spark is left behind so it might be better to provide some promise, but it's not even ongoing effort (may require a new SPIP) so omitting it would be better to not provide wrong signal. I'll omit it.

operator-wise watermark, Spark will check the logical plan of query and log warning when Spark detects such pattern.

Any stateful operation(s) after any of below stateful operations are possibly having issue:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any of below -> any of the following
are possibly having issue -> can have this issue


* streaming aggregation in Append mode
* stream-stream outer join
* mapGroupsWithState and flatMapGroupsWithState in Append mode (depending on the implementation of state function)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe back-tick-quote the method names?
state function -> the state function


As Spark cannot check the state function of mapGroupsWithState/flatMapGroupsWithState, Spark assumes that state function
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that state -> that the state
is append mode -> uses Append mode

could emit late rows if the operator is append mode.

There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure
end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.

## Starting Streaming Queries
Once you have defined the final result DataFrame/Dataset, all that is left is for you to start the streaming computation. To do that, you have to use the `DataStreamWriter`
([Scala](api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter)/[Java](api/java/org/apache/spark/sql/streaming/DataStreamWriter.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamWriter) docs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, CurrentDate, CurrentTimestamp, MonotonicallyIncreasingID}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
Expand All @@ -30,7 +31,7 @@ import org.apache.spark.sql.streaming.OutputMode
/**
* Analyzes the presence of unsupported operations in a logical plan.
*/
object UnsupportedOperationChecker {
object UnsupportedOperationChecker extends Logging {

def checkForBatch(plan: LogicalPlan): Unit = {
plan.foreachUp {
Expand All @@ -41,8 +42,57 @@ object UnsupportedOperationChecker {
}
}

def checkForStreaming(plan: LogicalPlan, outputMode: OutputMode): Unit = {
def checkStreamingQueryGlobalWatermarkLimit(
plan: LogicalPlan,
outputMode: OutputMode,
failWhenDetected: Boolean): Unit = {
def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
case s: Aggregate
if s.isStreaming && outputMode == InternalOutputModes.Append => true
case Join(left, right, joinType, _, _)
if left.isStreaming && right.isStreaming && joinType != Inner => true
case f: FlatMapGroupsWithState
if f.isStreaming && f.outputMode == OutputMode.Append() => true
case _ => false
}

def isStatefulOperation(p: LogicalPlan): Boolean = p match {
case s: Aggregate if s.isStreaming => true
case _ @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => true
case f: FlatMapGroupsWithState if f.isStreaming => true
case d: Deduplicate if d.isStreaming => true
case _ => false
}

var loggedWarnMessage = false
plan.foreach { subPlan =>
if (isStatefulOperation(subPlan)) {
subPlan.find { p =>
(p ne subPlan) && isStatefulOperationPossiblyEmitLateRows(p)
} match {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

foreach on the Option instead of match?

case Some(_) =>
val errorMsg = "Detected pattern of possible 'correctness' issue " +
"due to global watermark. " +
"The query contains stateful operation which can possibly emit late rows, and " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above suggestions about editing this text

"downstream stateful operation which drop emitted late rows. " +
"Please refer the programming guide doc for more details."

if (failWhenDetected) {
throwError(errorMsg)(plan)
} else {
if (!loggedWarnMessage) {
logWarning(s"$errorMsg;\n$plan")
loggedWarnMessage = true
}
}

case _ =>
}
}
}
}

def checkForStreaming(plan: LogicalPlan, outputMode: OutputMode): Unit = {
if (!plan.isStreaming) {
throwError(
"Queries without streaming sources cannot be executed with writeStream.start()")(plan)
Expand Down Expand Up @@ -339,6 +389,8 @@ object UnsupportedOperationChecker {
// Check if there are unsupported expressions in streaming query plan.
checkUnsupportedExpressions(subPlan)
}

checkStreamingQueryGlobalWatermarkLimit(plan, outputMode, failWhenDetected = false)
}

def checkForContinuous(plan: LogicalPlan, outputMode: OutputMode): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,153 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
null,
new TestStreamingRelationV2(attribute)), OutputMode.Append())

// streaming aggregation
{
assertPassOnGlobalWatermarkLimit(
"single streaming aggregation in Append mode",
streamRelation.groupBy("a")(count("*")),
OutputMode.Append())

assertFailOnGlobalWatermarkLimit(
"chained streaming aggregations in Append mode",
streamRelation.groupBy("a")(count("*")).groupBy()(count("*")),
OutputMode.Append())

Seq(Inner, LeftOuter, RightOuter).foreach { joinType =>
val plan = streamRelation.join(streamRelation.groupBy("a")(count("*")), joinType = joinType)
assertFailOnGlobalWatermarkLimit(
s"$joinType join after streaming aggregation in Append mode",
streamRelation.join(streamRelation.groupBy("a")(count("*")), joinType = joinType),
OutputMode.Append())
}

assertFailOnGlobalWatermarkLimit(
"deduplicate after streaming aggregation in Append mode",
Deduplicate(Seq(attribute), streamRelation.groupBy("a")(count("*"))),
OutputMode.Append())

assertFailOnGlobalWatermarkLimit(
"FlatMapGroupsWithState after streaming aggregation in Append mode",
FlatMapGroupsWithState(
null, att, att, Seq(att), Seq(att), att, null, Append,
isMapGroupsWithState = false, null,
streamRelation.groupBy("a")(count("*"))),
OutputMode.Append())
}

// stream-stream join
// stream-stream inner join doesn't emit late rows, whereas outer joins could
Seq((Inner, false), (LeftOuter, true), (RightOuter, true)).map { case (joinType, expectFailure) =>
assertPassOnGlobalWatermarkLimit(
s"single $joinType join in Append mode",
streamRelation.join(streamRelation, joinType = RightOuter,
condition = Some(attributeWithWatermark === attribute)),
OutputMode.Append())

testGlobalWatermarkLimit(
s"streaming aggregation after stream-stream $joinType join in Append mode",
streamRelation.join(streamRelation, joinType = joinType,
condition = Some(attributeWithWatermark === attribute))
.groupBy("a")(count("*")),
OutputMode.Append(),
expectFailure = expectFailure)

Seq(Inner, LeftOuter, RightOuter).map { joinType2 =>
testGlobalWatermarkLimit(
s"streaming-stream $joinType2 after stream-stream $joinType join in Append mode",
streamRelation.join(
streamRelation.join(streamRelation, joinType = joinType,
condition = Some(attributeWithWatermark === attribute)),
joinType = joinType2,
condition = Some(attributeWithWatermark === attribute)),
OutputMode.Append(),
expectFailure = expectFailure)
}

testGlobalWatermarkLimit(
s"FlatMapGroupsWithState after stream-stream $joinType join in Append mode",
FlatMapGroupsWithState(
null, att, att, Seq(att), Seq(att), att, null, Append,
isMapGroupsWithState = false, null,
streamRelation.join(streamRelation, joinType = joinType,
condition = Some(attributeWithWatermark === attribute))),
OutputMode.Append(),
expectFailure = expectFailure)

testGlobalWatermarkLimit(
s"deduplicate after stream-stream $joinType join in Append mode",
Deduplicate(Seq(attribute), streamRelation.join(streamRelation, joinType = joinType,
condition = Some(attributeWithWatermark === attribute))),
OutputMode.Append(),
expectFailure = expectFailure)
}

// FlatMapGroupsWithState
{
assertPassOnGlobalWatermarkLimit(
"single FlatMapGroupsWithState in Append mode",
FlatMapGroupsWithState(
null, att, att, Seq(att), Seq(att), att, null, Append,
isMapGroupsWithState = false, null, streamRelation),
OutputMode.Append())

assertFailOnGlobalWatermarkLimit(
"streaming aggregation after FlatMapGroupsWithState in Append mode",
FlatMapGroupsWithState(
null, att, att, Seq(att), Seq(att), att, null, Append,
isMapGroupsWithState = false, null, streamRelation).groupBy("*")(count("*")),
OutputMode.Append())

Seq(Inner, LeftOuter, RightOuter).map { joinType =>
assertFailOnGlobalWatermarkLimit(
s"stream-stream $joinType after FlatMapGroupsWithState in Append mode",
streamRelation.join(
FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append,
isMapGroupsWithState = false, null, streamRelation), joinType = joinType,
condition = Some(attributeWithWatermark === attribute)),
OutputMode.Append())
}

assertFailOnGlobalWatermarkLimit(
"FlatMapGroupsWithState after FlatMapGroupsWithState in Append mode",
FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append,
isMapGroupsWithState = false, null,
FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append,
isMapGroupsWithState = false, null, streamRelation)),
OutputMode.Append())

assertFailOnGlobalWatermarkLimit(
s"deduplicate after FlatMapGroupsWithState in Append mode",
Deduplicate(Seq(attribute),
FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append,
isMapGroupsWithState = false, null, streamRelation)),
OutputMode.Append())
}

// deduplicate
{
assertPassOnGlobalWatermarkLimit(
"streaming aggregation after deduplicate in Append mode",
Deduplicate(Seq(attribute), streamRelation).groupBy("a")(count("*")),
OutputMode.Append())

Seq(Inner, LeftOuter, RightOuter).map { joinType =>
assertPassOnGlobalWatermarkLimit(
s"$joinType join after deduplicate in Append mode",
streamRelation.join(Deduplicate(Seq(attribute), streamRelation), joinType = joinType,
condition = Some(attributeWithWatermark === attribute)),
OutputMode.Append())
}

assertPassOnGlobalWatermarkLimit(
"FlatMapGroupsWithState after deduplicate in Append mode",
FlatMapGroupsWithState(
null, att, att, Seq(att), Seq(att), att, null, Append,
isMapGroupsWithState = false, null,
Deduplicate(Seq(attribute), streamRelation)),
OutputMode.Append())
}

/*
=======================================================================================
TESTING FUNCTIONS
Expand Down Expand Up @@ -839,6 +986,40 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
}
}


def assertPassOnGlobalWatermarkLimit(
testNamePostfix: String,
plan: LogicalPlan,
outputMode: OutputMode): Unit = {
testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = false)
}

def assertFailOnGlobalWatermarkLimit(
testNamePostfix: String,
plan: LogicalPlan,
outputMode: OutputMode): Unit = {
testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = true)
}

def testGlobalWatermarkLimit(
testNamePostfix: String,
plan: LogicalPlan,
outputMode: OutputMode,
expectFailure: Boolean): Unit = {
test(s"Global watermark limit - $testNamePostfix") {
if (expectFailure) {
val e = intercept[AnalysisException] {
UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit(
wrapInStreaming(plan), outputMode, failWhenDetected = true)
}
assert(e.message.contains("Detected pattern of possible 'correctness' issue"))
} else {
UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit(
wrapInStreaming(plan), outputMode, failWhenDetected = true)
}
}
}

/**
* Test whether the body of code will fail. If it does fail, then check if it has expected
* messages.
Expand Down