[SPARK-28074][SS] Log warn message on possible correctness issue for multiple stateful operations in single query #24890
@@ -1505,7 +1505,6 @@ Additional details on supported joins:
- Cannot use `mapGroupsWithState` and `flatMapGroupsWithState` in Update mode before joins.
### Streaming Deduplication
You can deduplicate records in data streams using a unique identifier in the events. This is exactly the same as deduplication on static data using a unique identifier column. The query will store the necessary amount of data from previous records so that it can filter duplicate records. Similar to aggregations, you can use deduplication with or without watermarking.
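A minimal sketch of both variants (the source path, schema, and the `guid`/`eventTime` column names are illustrative assumptions, not taken from this guide):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StreamingDedupSketch").getOrCreate()

// Hypothetical streaming source with a unique identifier column `guid`
// and an event-time column `eventTime`.
val streamingDf = spark.readStream
  .schema("guid STRING, eventTime TIMESTAMP, payload STRING")
  .json("/path/to/input")

// Without watermark: the query keeps state for every guid it has ever seen.
val dedupedUnbounded = streamingDf.dropDuplicates("guid")

// With watermark: state older than the watermark bound can be dropped, so
// duplicates arriving more than 10 minutes late are no longer guaranteed to be removed.
val dedupedBounded = streamingDf
  .withWatermark("eventTime", "10 minutes")
  .dropDuplicates("guid", "eventTime")
```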
@@ -1616,6 +1615,8 @@ this configuration judiciously.
### Arbitrary Stateful Operations
Many use cases require more advanced stateful operations than aggregations. For example, in many use cases, you have to track sessions from data streams of events. To do such sessionization, you will have to save arbitrary types of data as state and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation `mapGroupsWithState` and the more powerful operation `flatMapGroupsWithState`. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.GroupState)/[Java](api/java/org/apache/spark/sql/streaming/GroupState.html)) and the examples ([Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java)).
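As a rough sketch of `mapGroupsWithState` (the `Event` and `RunningCount` types, schema, and source path are hypothetical, not from the linked examples), each key carries an arbitrary user-defined state object that is loaded, updated, and stored back on every trigger:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class Event(userId: String, action: String)   // hypothetical input row
case class RunningCount(count: Long)                // arbitrary per-key state

val spark = SparkSession.builder.appName("ArbitraryStateSketch").getOrCreate()
import spark.implicits._

val events = spark.readStream
  .schema("userId STRING, action STRING")
  .json("/path/to/events")
  .as[Event]

// For each userId: load (or initialize) the state, fold the new events into it,
// store it back, and emit one updated row per key and trigger.
val counts = events
  .groupByKey(_.userId)
  .mapGroupsWithState[RunningCount, (String, Long)](GroupStateTimeout.NoTimeout) {
    (userId: String, newEvents: Iterator[Event], state: GroupState[RunningCount]) =>
      val previous = state.getOption.getOrElse(RunningCount(0L))
      val updated = RunningCount(previous.count + newEvents.size)
      state.update(updated)
      (userId, updated.count)
  }
```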
Though Spark cannot check and enforce it, the state function should be implemented with respect to the semantics of the output mode. For example, in Update mode Spark does not expect the state function to emit rows older than the current watermark plus the allowed late record delay, whereas in Append mode the state function can emit these rows.
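For illustration, continuing the hypothetical sketch above (the session types and timeout value are assumptions): the output mode the state function is written for is declared on `flatMapGroupsWithState` itself, and the function body is responsible for honoring it.

```scala
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class SessionState(numEvents: Long)                    // hypothetical per-key state
case class SessionSummary(userId: String, numEvents: Long)  // hypothetical output row

// OutputMode.Append() below is a declaration of intent; Spark cannot verify
// that the function body actually follows Append-mode semantics.
val sessions = events
  .groupByKey(_.userId)
  .flatMapGroupsWithState[SessionState, SessionSummary](
      OutputMode.Append(), GroupStateTimeout.ProcessingTimeTimeout) {
    (userId: String, newEvents: Iterator[Event], state: GroupState[SessionState]) =>
      if (state.hasTimedOut) {
        // Emitting the finished session here may produce rows older than the
        // current watermark plus the allowed lateness -- acceptable in Append
        // mode, but not something Spark expects under Update mode.
        val finished = state.get
        state.remove()
        Iterator(SessionSummary(userId, finished.numEvents))
      } else {
        val current = state.getOption.getOrElse(SessionState(0L))
        state.update(SessionState(current.numEvents + newEvents.size))
        state.setTimeoutDuration("10 minutes")
        Iterator.empty
      }
  }
```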
### Unsupported Operations
There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets.
Some of them are as follows.
@@ -1647,6 +1648,26 @@ For example, sorting on the input stream is not supported, as it requires keeping
track of all the data received in the stream. This is therefore fundamentally hard to execute
efficiently.
### Limitation of Global Watermark
In Append mode, some stateful operations can emit rows older than the current watermark plus the allowed late record delay.
Because Spark uses a single global watermark, these rows are "late rows" to downstream stateful operations and may be discarded.
This is a limitation of the global watermark, and it can lead to a correctness issue.
Spark checks the logical plan of the query and logs a warning when it detects such a pattern.
Any stateful operation placed after any of the following stateful operations can have this issue (see the sketch after the list):
* streaming aggregation in Append mode
* stream-stream outer join
* `mapGroupsWithState` and `flatMapGroupsWithState` in Append mode (depending on the implementation of the state function)
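As a hedged illustration of such a pattern (the column names, paths, watermarks, and time bounds are hypothetical), consider a stream-stream left outer join followed by a windowed aggregation in the same query: the null-padded rows that the outer join emits only after the watermark has passed can then be dropped as late data by the downstream aggregation.

```scala
import org.apache.spark.sql.functions._
import spark.implicits._   // assumes an existing SparkSession named `spark`

val impressions = spark.readStream
  .schema("impressionAdId STRING, impressionTime TIMESTAMP")
  .json("/path/to/impressions")
  .withWatermark("impressionTime", "10 minutes")

val clicks = spark.readStream
  .schema("clickAdId STRING, clickTime TIMESTAMP")
  .json("/path/to/clicks")
  .withWatermark("clickTime", "20 minutes")

// First stateful operator: stream-stream LEFT OUTER join. Unmatched impressions
// are emitted (null-padded) only once the watermark moves past the join window.
val joined = impressions.join(
  clicks,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  "leftOuter")

// Second stateful operator in the same query: the late, null-padded rows from
// the join may be discarded here, because the single global watermark has
// already advanced past their event time.
val counts = joined
  .groupBy(window($"impressionTime", "1 hour"), $"impressionAdId")
  .count()
```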
Since Spark cannot inspect the state function of `mapGroupsWithState`/`flatMapGroupsWithState`, it assumes that the state function
can emit late rows if the operator uses Append mode.
There is a known workaround: split the streaming query into multiple queries, one per stateful operator, and ensure
end-to-end exactly-once delivery per query. Ensuring end-to-end exactly-once delivery for the last query is optional.
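A sketch of that workaround, building on the hypothetical join/aggregation example above (paths and checkpoint locations are assumptions): the first query persists the join output to files, and a second, independent query reads it back as a stream and runs the aggregation with its own watermark.

```scala
// First query: only the stream-stream outer join; its output is persisted.
val firstQuery = joined.writeStream
  .format("parquet")
  .option("path", "/data/joined")
  .option("checkpointLocation", "/checkpoints/join")
  .start()

// Second query: read the persisted join output back as a stream and run the
// downstream stateful aggregation, now governed by its own watermark.
val secondQuery = spark.readStream
  .schema(joined.schema)
  .parquet("/data/joined")
  .withWatermark("impressionTime", "10 minutes")
  .groupBy(window($"impressionTime", "1 hour"), $"impressionAdId")
  .count()
  .writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", "/data/counts")
  .option("checkpointLocation", "/checkpoints/counts")
  .start()
```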
## Starting Streaming Queries
Once you have defined the final result DataFrame/Dataset, all that is left is for you to start the streaming computation. To do that, you have to use the `DataStreamWriter`
([Scala](api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter)/[Java](api/java/org/apache/spark/sql/streaming/DataStreamWriter.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamWriter) docs)
state function -> the state function
of output -> of the output