[SPARK-28074][SS] Log warn message on possible correctness issue for multiple stateful operations in single query #24890
|
Test build #106575 has finished for PR 24890 at commit
|
|
Test build #106576 has finished for PR 24890 at commit
|
| - One of "alternative" approach is breaking down your query into multiple chained queries, each per stateful operation. | ||
| - Each query must guarantee "end-to-end" exactly once, otherwise intermediate outputs can be duplicated which leads to incorrect outputs. | ||
| - Only 'Append mode' can be "semantically" correct for a query having multiple stateful operations. | ||
| - In 'Update mode', downstream stateful operator cannot distinguish whether the input is new, or updated. |
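The Update mode caveat in the last bullet above can be illustrated with a minimal sketch in plain Scala. It does not use Spark; `UpdateModeSketch`, `upstreamUpdates`, and `downstreamSum` are hypothetical names chosen for illustration only. The upstream operator keeps a running count per key and re-emits the latest count on every trigger, while the downstream operator sums whatever it receives because it cannot tell an update from a new row.

```scala
object UpdateModeSketch {
  // Upstream running count per key, modeled after "Update mode": after each
  // trigger (batch), emit the latest count for every key seen in that batch.
  def upstreamUpdates(batches: Seq[Seq[String]]): Seq[(String, Int)] = {
    val counts = scala.collection.mutable.Map.empty[String, Int]
    batches.flatMap { batch =>
      batch.foreach(k => counts(k) = counts.getOrElse(k, 0) + 1)
      batch.distinct.map(k => k -> counts(k))
    }
  }

  // Downstream operator that treats every input row as new and sums the values.
  def downstreamSum(rows: Seq[(String, Int)]): Map[String, Int] =
    rows.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }
}
```

With the batches `Seq(Seq("a", "a"), Seq("a"))`, key `a` occurs three times, but the downstream operator receives the updates 2 and 3 and reports 5, since nothing retracts the earlier value.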
Hi, @HeartSaVioR .
Could you add a screenshot of the generated HTML page to the PR description, please?
Meh, I don't know if it matters. We can evaluate the text and formatting pretty well here. That said there are too many bullet-pointed sentences here. Can't it just be prose?
Honestly, I agree with @srowen regarding the screenshot, as the change doesn't draw any UI element such as a table. It's just bullet points with proper indentation, and if I add a screenshot to the PR description I would have to update it whenever I address review comments, which sounds redundant.
Regarding the bullet-pointed sentences, I agree that some of the bullet points are unnecessary. I'll merge them. Any feedback on nuance, writing style, etc. is welcome, as I'm not a native speaker of English.
| - After `coalesce`, the number of (reduced) tasks will be kept unless another shuffle happens. | ||
| - `spark.sql.streaming.stateStore.providerClass`: To read the previous state of the query properly, the class of state store provider should be unchanged. | ||
| - `spark.sql.streaming.multipleWatermarkPolicy`: Modification of this would lead inconsistent watermark value when query contains multiple watermarks, hence the policy should be unchanged. | ||
| - Structured Streaming uses `global watermark` which might impact on query having multiple stateful operations. |
a global watermark? it also shouldn't be code-formatted
impact on query -> impact a query.
How would it impact the query?
Will fix the first two review comments above.
How would it impact the query?
The answer is in the line below: "Fail to answer above questions might lead to incorrect outputs - e.g. intermediate outputs being discarded." I'm not sure the flow looks natural, so you might want to revise the format/flow of the content.
| - How global watermark is calculated on your query? | ||
| - How global watermark is applied to each stateful operator? | ||
| - Is there any intermediate output being discarded as "late input" due to watermark? | ||
| - Fail to answer above questions might lead to incorrect outputs - e.g. intermediate outputs being discarded. |
This is probably an important topic but I'm not clear what the guidance is here. If my answers are what then what do I do? what do you mean fail to answer the question? can this be more direct? what's an example of a problem, cause and solution?
https://issues.apache.org/jira/browse/SPARK-28094 is one example of the problem: I tried to help resolve that issue (on the user mailing list) and found the correctness issue.
If my answers are what then what do I do?
The questions ask users to check for themselves whether they fully understand the details of the watermark, because it can cause rows to be discarded "unexpectedly" between stateful operators, which is a correctness issue.
So it's like an exam that keeps end users from "just doing it because Spark allows it" without fully understanding what they're doing. If they can't answer the questions for their query, they should find an alternative approach.
what do you mean fail to answer the question?
Literally. If they are unsure about any part of the answers to these questions, they fail. I know that's hard on end users, but it's better than discovering incorrect outputs in a production environment.
can this be more direct? what's an example of a problem, cause and solution?
I'm happy to do it, but I'm not sure how much detail I need to go into. If we want a detailed explanation with examples, it may be better to have a separate chapter. It might be long enough for a blog post. This is an advanced topic (the book "Streaming Systems" devotes a chapter to watermarks), so it may not be feasible to add the details to the guide page, but given its importance we should mention it. So it looks like a dilemma.
To be honest, there's no "solution", only a workaround, since the problem is due to missing features: watermark propagation and stateful operator-wise watermark. https://issues.apache.org/jira/browse/SPARK-26655 tries to address this, but not much effort has gone into it due to lack of interest.
Yes, this might be too much for the docs here, but can we explain more clearly what type of incorrect usage may cause what problem? so that users know what to look for, in both their causes and symptoms. Otherwise it's just kind of saying "there are problems here if you're not careful".
what type of incorrect usage may cause what problem
Incorrect usages can't be grouped into a fixed set of types: Spark allows arbitrary operations on state via map/flatMapGroupsWithState, so we can't enumerate all usages and pick out the incorrect ones. Depending on their query, users can run into the problem whenever they use multiple stateful operations in a query.
One symptom would be incorrect outputs, as SPARK-26655 shows, but Spark doesn't provide any information to trace the issue back, so I had to analyze the query by answering these questions for one of the queries in SPARK-26655.
Maybe we shouldn't call the case "incorrect usage" even when users run into it, since the issue comes from the design and limitations of the watermark in Structured Streaming. It's not their fault and they're not doing anything wrong: Spark just doesn't support it.
As I commented below, maybe disallowing these cases (and pointing to the alternative) until we make them correct would be a better approach, but I'd defer to committers/PMCs on how to handle this.
|
IMHO the clearer solution is just preventing any possible pairs and documenting the alternative approach in the guide doc, as I don't expect many end users could answer these questions. We don't need to start with a long and complicated explanation that would scare beginners or light users. I'm being careful only because I would be proposing to drop support for existing features (though I'm unsure these have been used). |
|
I have not yet considered the problem in depth, but as a warm-up question: is it possible to block the scenario (with an exception) or print a warning in such cases? |
If you mean whether Spark can analyze the query and check whether the global watermark causes a correctness issue for it, I'm not sure we can. Instead, Spark could measure the number of discarded rows in each stateful operator (that said, #21617 may be reopened), which would help end users spot the issue in their query. I don't think that's sufficient though, as they would notice only while the query is running, when it has already pushed incorrect outputs to the sink, and they would have to stop the query and do non-trivial work (analyze it, apply a workaround) to resolve the issue. |
|
Now I'm trying to narrow down the issue... As we've figured out, the global watermark is the root cause (it discards intermediate rows or evicts rows from state incorrectly), so the conditions that might cause a correctness issue are: 1) the query has multiple stateful operators, and 2) more than one stateful operator uses the watermark to discard input rows or evict state. Note that these conditions apply to "Append mode". For "Update mode" and "Complete mode", the behavior of multiple stateful operators is not properly defined (in most cases they need retraction, which is not supported, to correct the outputs for a given key), hence we could define the condition simply as having multiple stateful operators. Please also note that not all such cases produce incorrect outputs. That makes it hard to say what the best approach is on the Spark side to avoid the correctness issue. Maybe we have some options here:
Would like to hear your opinions about these options. |
|
I've invested time and now I understand the issue. I can confirm Spark discards data.
I agree with this. It is not sufficient if users see the problem only after the queries have started. |
|
After consideration I would vote on point 2 because of the following:
|
| - `spark.sql.streaming.multipleWatermarkPolicy`: Modification of this would lead inconsistent watermark value when query contains multiple watermarks, hence the policy should be unchanged. | ||
| - Structured Streaming uses `global watermark` which might impact on query having multiple stateful operations. | ||
| - Stateful operators: aggregation, deduplication, stream-stream join, (flat)mapGroupsWithState | ||
| - You should be able to answer below questions for your query to get correct outputs: |
Not sure this will be understood by users (I know the details since I've played with it a lot, but I'm also struggling a bit :)). I would focus on the global watermark and maybe the simplest discard example, and mention that this may apply to other situations.
All in all I think the content is good, but somehow this style of presentation is odd to me.
|
cc. @tdas @zsxwing @jose-torres This would also require either #24936 or #21617 (if we prefer #21617 I'll rework and submit again). |
|
Ping. |
|
retest this, please |
|
Test build #110706 has finished for PR 24890 at commit
|
741701b to
9555bba
Compare
|
Test build #110711 has finished for PR 24890 at commit
|
3f1cd11 to
3c0ff7f
Compare
…erations in single query
3c0ff7f to
fff18ff
Compare
|
I've revisited this and revised the patch. Now Spark will log a warning message if the query contains a pattern that can lose intermediate outputs. I've also updated the docs to remove content that would be too hard for end users to understand. I'm not sure the nuance is right, so please review this again and correct the sentences. Thanks! |
|
Test build #111472 has finished for PR 24890 at commit
|
|
Test build #111471 has finished for PR 24890 at commit
|
|
retest this, please |
|
Test build #111475 has finished for PR 24890 at commit
|
|
Test build #111477 has finished for PR 24890 at commit
|
| ### Arbitrary Stateful Operations | ||
| Many usecases require more advanced stateful operations than aggregations. For example, in many usecases, you have to track sessions from data streams of events. For doing such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation `mapGroupsWithState` and the more powerful operation `flatMapGroupsWithState`. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.GroupState)/[Java](api/java/org/apache/spark/sql/streaming/GroupState.html)) and the examples ([Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java)). | ||
|
|
||
| Though Spark cannot check and force it, state function should be implemented with respect of semantic of output mode. e.g. In update mode Spark doesn't expect state function will emit rows which are older than current watermark, whereas in Append mode state function can emit these rows. |
respect of -> respect to
semantic -> the semantics
e.g. -> For example,
update -> Update
expect state -> expect that the state
state function -> the state function
|
|
||
| ### Limitation of global watermark | ||
|
|
||
| In some circumstance, some stateful operations could emit rows older than current watermark (with allowed delay), |
"In some circumstances, if a stateful operation emits rows ..."
Is it clearer to say something like "the current watermark plus allowed late record delay"?
uses global -> uses a global
and these -> , then these
could bring -> can cause a
Also can we say anything more about the 'circumstances'? how can users know if this affects them? is it what is described below? then you could say so explicitly.
I think we can just remove "In some circumstance", as that is always the expected behavior.
- Streaming aggregation in Append mode: by nature only evicted rows are emitted, as the operator has to wait for inputs until the watermark passes, to ensure there are no more rows to aggregate for that key.
- Outer join (any mode): it basically emits matched rows, but it also emits evicted rows if a row hasn't matched any row on the other side. That's what an outer join is, so it emits evicted rows conditionally, but this is mostly expected.
- FlatMapGroupsWithState in Append mode: it strongly depends on the implementation of the state function, but if the state function respects the semantics of Append mode, it will very likely emit evicted rows, to ensure no further input rows can affect the emitted rows.
In fact, emitting evicted rows is tied to the semantics of Append mode.
how can users know if this affects them?
We'll log a warning message while checking for unsupported operations. We could easily change the behavior to reject the query as unsupported instead, so it's up to the community (mostly committers/PMCs) to decide.
is it what is described below?
Yes. And with SPARK-24634 we can measure the number of late rows per stateful operator at runtime. (For now there's no information on late rows. Spark discards them with no metrics.) The first stateful operator could have late rows, but the following stateful operators shouldn't.
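The interaction described in this comment can be sketched as a toy model in plain Scala. This is not Spark code: `GlobalWatermarkSketch`, `Row`, `evictedWindows`, and `dropLate` are hypothetical names, and the model assumes (as a simplification) that an emitted window row carries the window start as its event time. The upstream aggregation emits a window only after the watermark has passed its end, and the downstream operator, sharing the same global watermark, then sees that row as late.

```scala
object GlobalWatermarkSketch {
  final case class Row(key: String, eventTime: Long)

  // Upstream Append-mode aggregation (toy model): a window [start, start + size)
  // is emitted only once the watermark has passed its end, so every emitted row
  // is already older than the watermark.
  def evictedWindows(rows: Seq[Row], windowSize: Long, watermark: Long): Seq[Row] =
    rows.groupBy(r => (r.key, r.eventTime / windowSize * windowSize))
      .keys.toSeq
      .filter { case (_, start) => start + windowSize <= watermark }
      .map { case (key, start) => Row(key, start) }
      .sortBy(r => (r.key, r.eventTime))

  // Downstream stateful operator using the same global watermark: rows older
  // than the watermark count as late input. Returns (kept, dropped).
  def dropLate(rows: Seq[Row], watermark: Long): (Seq[Row], Seq[Row]) =
    rows.partition(_.eventTime >= watermark)
}
```

For input rows at event times 1, 7, and 12 for key `a`, a window size of 10, and a watermark of 15, the upstream emits only the window starting at 0, and the downstream drops it because 0 is older than 15. Counting the dropped side per operator is the kind of metric the comment refers to.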
Replaced In some circumstance with In Append mode for clarification.
| which are "late rows" in downstream stateful operations (as Spark uses global watermark) and these rows can be discarded. | ||
| This could bring correctness issue. | ||
|
|
||
| This is a limitation of global watermark and operator-wise watermark is not yet supported. Before Spark will support |
"of a global watermark, as operator-wise..."
I'm not sure what the next sentence means, is this a statement about potential future changes? I would omit it.
Yes, it describes a potential future change. I left the content in because operator-wise watermark is something Spark is still missing, so it might be better to offer some promise, but it's not even an ongoing effort (it may require a new SPIP), so omitting it is better to avoid sending the wrong signal. I'll omit it.
| This is a limitation of global watermark and operator-wise watermark is not yet supported. Before Spark will support | ||
| operator-wise watermark, Spark will check the logical plan of query and log warning when Spark detects such pattern. | ||
|
|
||
| Any stateful operation(s) after any of below stateful operations are possibly having issue: |
any of below -> any of the following
are possibly having issue -> can have this issue
|
|
||
| * streaming aggregation in Append mode | ||
| * stream-stream outer join | ||
| * mapGroupsWithState and flatMapGroupsWithState in Append mode (depending on the implementation of state function) |
Maybe back-tick-quote the method names?
state function -> the state function
| * stream-stream outer join | ||
| * mapGroupsWithState and flatMapGroupsWithState in Append mode (depending on the implementation of state function) | ||
|
|
||
| As Spark cannot check the state function of mapGroupsWithState/flatMapGroupsWithState, Spark assumes that state function |
that state -> that the state
is append mode -> uses Append mode
...lyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
| if (isStatefulOperation(subPlan)) { | ||
| subPlan.find { p => | ||
| (p ne subPlan) && isStatefulOperationPossiblyEmitLateRows(p) | ||
| } match { |
foreach on the Option instead of match?
| case Some(_) => | ||
| val errorMsg = "Detected pattern of possible 'correctness' issue " + | ||
| "due to global watermark. " + | ||
| "The query contains stateful operation which can possibly emit late rows, and " + |
See above suggestions about editing this text
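The check under review in this hunk, together with the suggestion to use `foreach` on the `Option` instead of `match`, can be sketched on a simplified plan tree. This is only an illustration under stated assumptions: `PlanCheckSketch`, `Node`, and `warnings` are hypothetical and are not Spark's `LogicalPlan` or `UnsupportedOperationChecker`; the flags `stateful` and `mayEmitLateRows` stand in for the predicates used in the diff.

```scala
object PlanCheckSketch {
  // Hypothetical, simplified plan node; children are the upstream inputs.
  // In this model a node with mayEmitLateRows = true is also stateful.
  final case class Node(
      name: String,
      stateful: Boolean = false,
      mayEmitLateRows: Boolean = false,
      children: Seq[Node] = Nil) {

    // Pre-order search for the first node (this node included) matching p.
    def find(p: Node => Boolean): Option[Node] =
      if (p(this)) Some(this)
      else children.foldLeft(Option.empty[Node])((acc, c) => acc.orElse(c.find(p)))

    // Apply f to this node and then to every descendant.
    def foreach(f: Node => Unit): Unit = {
      f(this)
      children.foreach(_.foreach(f))
    }
  }

  // One warning per stateful node that has a different node beneath it
  // which may emit late rows.
  def warnings(plan: Node): Seq[String] = {
    val buf = scala.collection.mutable.ArrayBuffer.empty[String]
    plan.foreach { subPlan =>
      if (subPlan.stateful) {
        subPlan
          .find(p => (p ne subPlan) && p.mayEmitLateRows)
          .foreach { p =>
            buf += s"${subPlan.name} is downstream of ${p.name}, which may emit late rows"
          }
      }
    }
    buf.toSeq
  }
}
```

Using `foreach` on the `Option` returned by `find` removes the need for an explicit `case None` branch, which is the point of the review suggestion.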
|
Test build #111499 has finished for PR 24890 at commit
|
|
Test build #111500 has finished for PR 24890 at commit
|
|
Test build #111506 has finished for PR 24890 at commit
|
| ### Arbitrary Stateful Operations | ||
| Many usecases require more advanced stateful operations than aggregations. For example, in many usecases, you have to track sessions from data streams of events. For doing such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation `mapGroupsWithState` and the more powerful operation `flatMapGroupsWithState`. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.GroupState)/[Java](api/java/org/apache/spark/sql/streaming/GroupState.html)) and the examples ([Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java)). | ||
|
|
||
| Though Spark cannot check and force it, state function should be implemented with respect to the semantics of output mode. For example, in Update mode Spark doesn't expect that the state function will emit rows which are older than current watermark plus allowed late record delay, whereas in Append mode the state function can emit these rows. |
state function -> the state function
of output -> of the output
| ### Limitation of global watermark | ||
|
|
||
| In Append mode, some stateful operations could emit rows older than current watermark plus allowed late record delay, | ||
| which are "late rows" in downstream stateful operations (as Spark uses global watermark) and these rows can be discarded. |
can be discarded implies that it's OK to discard them. Are you saying "may be discarded"?
Then I'd say "if a stateful operation emits rows older ... note that these rows may be discarded"
can be discarded implies that it's OK to discard them. Are you saying "may be discarded"?
Ah, thanks for correcting the details. If the former carries that nuance, the latter is correct.
|
|
||
| In Append mode, some stateful operations could emit rows older than current watermark plus allowed late record delay, | ||
| which are "late rows" in downstream stateful operations (as Spark uses global watermark) and these rows can be discarded. | ||
| This is a limitation of global watermark, and it could bring correctness issue. |
of global -> of a global
could bring correctness -> can potentially cause a correctness..
| which are "late rows" in downstream stateful operations (as Spark uses global watermark) and these rows can be discarded. | ||
| This is a limitation of global watermark, and it could bring correctness issue. | ||
|
|
||
| Spark will check the logical plan of query and log warning when Spark detects such pattern. |
log warning -> log a warning
such pattern -> such a pattern
But above don't you say that Spark can't check this?
The above sentence means "Spark can't check whether the state function is implemented according to the semantics of the output mode", because we don't restrict anything about how the state function is implemented (FlatMapGroupsWithState even knows the output mode but doesn't do anything with it), whereas this check inspects the logical plan to find the pattern where the issue is possible, assuming the state function is implemented according to the semantics of the output mode.
|
|
||
| Spark will check the logical plan of query and log warning when Spark detects such pattern. | ||
|
|
||
| Any of the following stateful operation(s) after any of below stateful operations can have this issue: |
You refer to the below operations twice here
Ah OK, I didn't mean "following" in the same sense as "below", but it's pretty confusing. As we mention "after" here, "following" should be removed. Thanks!
|
|
||
| * streaming aggregation in Append mode | ||
| * stream-stream outer join | ||
| * `mapGroupsWithState` and `flatMapGroupsWithState` in Append mode (depending on the implementation of state function) |
state function -> the state function
|
Test build #111543 has finished for PR 24890 at commit
|
srowen
left a comment
The text looks good. Anyone else want to evaluate the test/claim here?
gaborgsomogyi
left a comment
Just revisited and LGTM.
|
Merged to master |
|
Thanks all for reviewing and merging! |
What changes were proposed in this pull request?
Please refer to the link on the dev mailing list for the rationale behind this patch.
This patch adds functionality to detect a possible correctness issue with multiple stateful operations in a single streaming query, and logs a warning message to inform end users.
This patch also documents the caveats of using multiple stateful operations in a single query, and provides one known alternative.
How was this patch tested?
Added new UTs in UnsupportedOperationsSuite to test various combinations of stateful operators in a streaming query.