[SPARK-31706][SQL] add back the support of streaming update mode #28523
Conversation
-1 As we agreed when this was removed, it is ambiguous and should not be in DSv2. Let's add this feature the right way.
@rdblue I absolutely agree that Update mode requires major improvements and we should fix them for DSv2. However, we CANNOT release a Spark 3.0 where a streaming query that worked by default in Spark 2.4 no longer works by default. Update mode is used extensively in streaming aggregation queries with foreachBatch, foreach, Kafka, and even the console sink for debugging purposes. To make them work with Update mode in Spark 3.0, as far as I can see, there are the following two options

Given how late we are in the 3.0 release timeline, I am inclined to do option 2 to absolutely minimize risk. Furthermore, from the Spark user's point of view, they do not care much whether the built-in sources are implemented with DSv1 or DSv2. They just want their queries to work. So Update mode suddenly not working in Spark 3.0 without any alternative is not okay. The only way to roll out the alternative API (say, Upsert mode) is to
So we have to keep supporting Update mode, irrespective of what implementation is used underneath, until Spark 4.0.
Okay, so it sounds like the problem is that this wasn't used in v2 sources when we removed it, but sources have since moved to v2 by default and are missing updates as a result. I think the safest option is what you suggest as #2 -- let's add the feature using private extensions so that built-in sources work as expected. The alternative -- allowing sources to have diverging behavior until at least Spark 4.0 -- was already discussed and rejected when we removed the ambiguous mode. And I think it is unlikely that this would be fixed even in the 4.0 release, because we will want to avoid breaking third-party sources.
@rdblue yes, those are fair concerns, and we have to discuss further to figure out a good way to roll out a better Update mode. That's for future discussion. Thanks for understanding. I know this is not the ideal solution, but it is something we have to do in the interest of 3.0, given that v2 sources have been the default. It would be great if you could review this PR -- you are the one most familiar with these code paths. :)
```scala
case Update =>
  // Although no v2 sinks really support Update mode now, but during tests we do want them
  // to pretend to support Update mode, and treat Update mode same as Append mode.
  if (Utils.isTesting) {
```
@cloud-fan thank you for removing this.
Is this what prevented tests from catching that v2 sinks didn't support update?
I find it very concerning that the default implementations were changed to v2 when sinks don't actually support update.
How do we plan to verify that the v2 path works, since we don't have reliable test coverage? Maybe we should move back to v1 after all.
Spark 2.4.x does not have this condition, and therefore all of Spark 2.4.x since Nov 2018 has been using the v2 code paths. That is a lot of production hours in v2 sinks and not in v1 sinks, and it dwarfs the testing we can do with unit test coverage.
Just to confirm my understanding: does DSv1 even support update mode in the sink natively? I guess not. If so, even moving back to v1 is not a solution.
As I commented earlier in the other PR, the issue is that the streaming output mode is not 100% tied to the output mode of the sink. The streaming output mode is applied to the "result table" of an aggregation, which is logical and not materialized. The streaming output mode works properly for the result table -- one could argue the tests themselves are correct, except on the sink side.

I think the issue is more complicated than it seems, because the result table is flat (no information on keys), and end users can apply arbitrary operations on the result table to produce any form of data before writing to the sink. The semantics of the streaming output mode can be lost in any operation in between -- we don't restrict anything, and it's up to the end users to respect the semantics when shaping the final output.

(flatMapGroupsWithState is even one of the things where we don't restrict anything w.r.t. the streaming output mode, even for the result table. End users can emit multiple outputs for a key in append mode across batches -- it's up to them to implement their logic correctly.)

If we call this "freedom" and don't want to restrict end users from applying their own logic, then it's not weird to have different semantics for the streaming output mode and the sink output mode -- say, update mode for the streaming output mode and append for the sink output mode. It's up to end users to deal with upsert, and that's what Spark has been providing for the whole lifetime of Spark 2.x. AFAIK, in fact, the sink output mode has always been append, regardless of the streaming output mode.

(Maybe we want to represent the sink output mode the same way as in batch queries if we'd like to separate the two.)

So the boundary of the streaming output mode is actually misleading, and I don't think just adding a method in the interface to define the key is enough to address this. (It's much more complicated than that.) I'm not sure there's a plan to force respecting the semantics between the mode on the result table and the mode on the sink, but at least as of now, the streaming output mode is only effective when creating the result table. The result table itself doesn't carry the semantics, and any further operation can break them.
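The point about arbitrary operations erasing the update semantics can be illustrated with a sketch. This is hypothetical code using the standard Structured Streaming API, not code from this PR; the bucketing expression and column names are invented for illustration:

```scala
// Hypothetical sketch: an update-mode aggregation whose key information is
// erased by a user transformation before the sink sees the rows.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object UpdateModeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    import spark.implicits._

    // Keyed "result table": counts per bucket, updated as new rows arrive.
    val counts = spark.readStream
      .format("rate").load()                  // built-in test source: (timestamp, value)
      .groupBy(($"value" % 10).as("bucket"))
      .count()

    // Arbitrary post-aggregation transform: the sink now receives a single
    // string column and can no longer tell which key each row updates.
    val flattened = counts.select(concat_ws(":", $"bucket", $"count").as("line"))

    flattened.writeStream
      .outputMode("update")                   // applies to the result table, not the sink
      .format("console")
      .start()
      .awaitTermination()
  }
}
```

The sink receives the updated rows, but nothing in the data tells it which rows replace which earlier rows -- exactly the ambiguity discussed above.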
Test build #122596 has finished for PR 28523 at commit

retest this please
Regarding how to evolve to a better update mode, I have a rough idea. To me, the missing piece of the current update mode is that there is no update condition. This makes the behavior ambiguous. We can add a new API in
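One possible shape for such an API -- purely a hypothetical sketch, since the comment above is truncated and no concrete design exists in this PR. The trait and method names below are invented for illustration and do not exist in Spark:

```scala
// Hypothetical sketch: an explicit update condition in the DSv2 write path.
// Neither the trait nor the methods exist in Spark; they only illustrate the
// idea of removing the ambiguity by making the update keys (or a full
// condition) explicit, instead of leaving "update" undefined.
import org.apache.spark.sql.connector.write.WriteBuilder

trait SupportsUpdateCondition extends WriteBuilder {
  // Rows matching on these key columns would be overwritten; others appended.
  def updateByKeys(keys: Array[String]): WriteBuilder

  // More general form, e.g. an expression such as "key1 = key2 + 10".
  def updateByCondition(condition: String): WriteBuilder
}
```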
Test build #122606 has finished for PR 28523 at commit

retest this please
Test build #122617 has finished for PR 28523 at commit

Test build #5003 has started for PR 28523 at commit
YES. Adding the UPSERT mode is what we should try in the Spark 3.1 release. Then we can add the DELETE semantics like the other streaming systems. cc @xccui Could you try it?
Hi all, I am not quite familiar with the version history of the streaming sink, but I would like to share some of my thoughts here. Please correct me if I misunderstood.

I'll try to make some improvements to the related issues.
Thanks for the great input, @xccui. Basically I agree with your input -- it's the same as my understanding as I commented before (#28523 (comment)). To summarize my previous comment: I also don't know how the streaming output mode was designed, but from my understanding it's effective only on the result table for stateful aggregation operators. It's not even applied to all stateful operators; e.g., the mode doesn't affect stream-stream join. It doesn't guarantee that the final output respects the semantics, and then there's no meaning in applying the same on the sink side.

Another concern that comes to mind is complete mode. Complete mode is also effective on the result table. It may sound sensible to support complete mode in the sink as truncate-and-insert, but it leads to data loss in the case where the result table is unioned with another stream which does not create a "result table". (I haven't seen such a query, but it's technically possible.) Complete mode will not care about the other stream, and in every batch the previous output from the other stream will be lost. I think complete mode is a weird one for streaming, and it would be better to discontinue supporting it; I wouldn't expect any production query to use this mode, but please let me know if there is one.

Anyway, I think the streaming update mode technically doesn't couple with the capabilities of the sink. It should be left as it is, though we'll probably have to fix the guide doc, as it says the mode applies to the result table "as well as" to the sink. The description of the streaming output mode on the sink side should be corrected as well -- sinks are not dependent on the streaming output mode, and as of now only append is possible.

ps. We may need to revisit the operators and streaming output modes to see any flaws, similarly to how I went through them via the discussion thread and #24890. One thing would be flatMapGroupsWithState with append mode.
@HeartSaVioR Yes. It seems the output mode option was mainly designed for stateful aggregations, which means it actually works in a restricted way. Ideally, to support complete mode, all the operators must be capable of outputting the "complete" result seen so far for each epoch. Personally, I'm in favor of removing this mode in a future version. But for now, I propose adding more restrictions in the plan check (e.g., disallowing the union situation you mentioned) and also a note to the documentation. IMO, the mode of the result table should only be decided by the operators in the plan, and it could be either "append" or "update" (including the current "complete" mode). Basically, the designated sink should match the mode of the result table. Usually, supporting "update" needs more effort, and that means only some of the sinks could be chosen for a plan containing an aggregation or certain kinds of joins.
Thanks for the good discussion! It would be great if you could create a JIRA ticket for the better output mode design and put the insights there. This PR just keeps things as they were in 2.4, to unblock 3.0.
I'm curious how the SS committers think about the comments above -- if they agree with them, then the issue is rather a design issue of the streaming output mode, and I think the right way to fix it is decoupling the streaming output mode from the sink.

Would it break backward compatibility? If you look back at branch-2.4, you'll be very surprised to find that most built-in sink implementations "ignore" the output mode. The output mode provided via StreamWriteSupport.createStreamWriter is not used at all, with only one exception. The only exception among the built-in sinks is the memory sink, and it doesn't deal with the difference between append mode and update mode; the mode is only used to truncate the memory for complete mode. Given that the sink exists mostly for testing, it only helps to make tests easier to implement, nothing else.

Also, I'm strongly in favor of dropping complete mode, as I already described a data loss issue, and I don't think it's production-worthy. I don't even think custom sinks have been respecting the output mode, as the API lacks information on update mode, and complete mode is not production-worthy.

The major problem is the time. I totally understand that such a change may feel a bit huge to go into a release which has already had an RC1... I hope we address it in the major release (that's a good rationalization), but if we really want to minimize the changes for now, what about adding

As both

That would make custom sinks only able to append, as we won't expose these abilities, but that's what I expect so far. Even with the new DSv2, implementing truncate in a streaming sink looks to be very limited, as truncation should take place when committing, which means write tasks cannot write to the destination directly -- not scalable.

Does my proposal make sense?

cc. to @tdas @zsxwing @jose-torres @brkyvz @jerryshao @gaborgsomogyi to hear their voices as well. Please cc. more people if you have anyone who can help take a look.
retest this please
I created a JIRA ticket (https://issues.apache.org/jira/browse/SPARK-31724) for that. Let's move the discussion there.
Test build #122685 has finished for PR 28523 at commit
My proposal targets Spark 3.0 -- in short, let's not expose the availability of complete mode in the sink (streaming truncate) in the public API (custom sinks would only be able to append), so that we keep open the possibility of dropping complete mode afterwards without having to break the public API. This is the same workaround path as for streaming update, though the reason for the streaming truncate workaround is to remove it afterwards.
retest this please

Test build #122787 has finished for PR 28523 at commit

retest this please

Test build #122801 has finished for PR 28523 at commit

ping @tdas @rdblue @zsxwing @jose-torres
@rdblue @tdas @jose-torres tests pass and it's ready for review
This is the last blocker issue of Spark 3.0. Can we speed up the review?
Without going into the nitty-gritty arguments about output modes (that requires a different venue), I am okay with the changes. I think there are two ways to move forward
I think the major advantage of 1 over 2 is that the branches will stay in sync for the time being, which makes backporting of fixes etc. much easier. Furthermore, we will restore the reliability of the unit tests, because they will test the same thing that runs in production. I think the only disadvantage of 1 over 2 is that, for the time being, even if marked internal, we are adding update mode back to the public DSv2 API in master. Personally I think the advantage of 1 over 2 outweighs the disadvantage. The disadvantage is a relatively minor one, because this can always be changed in master in a principled way after further discussion. Until then, it's best to keep 3.0 and master in sync to minimize the impact on the Spark developer community, which is a much larger community than the DSv2 developer community (who anyway should know the risks of depending on unreleased master and internal APIs for developing their DSv2 sources). Hence I propose merging this PR to unblock 3.0 and avoid API regressions in it.
Personally I don't see any risk in option 1, because merging this in means we are not making changes in the major release, while master is for the next minor release, which can absorb more risk of breakage. Option 2 means we decide to break things again in Spark 3.1, which may only make sense if we make progress on SPARK-31724 in Spark 3.1; but SPARK-31724 doesn't necessarily need to depend on this, so there is no point except forcing us to deal with it within Spark 3.1.
LGTM. Primarily because all tests seem to work without the hack in StreamExecution that forces v1 sink execution.
merging to master/3.0, thanks for the review!
### What changes were proposed in this pull request?

This PR adds a private `WriteBuilder` mixin trait: `SupportsStreamingUpdate`, so that the built-in v2 streaming sinks can still support the update mode.

Note: it's private because we don't have a proper design yet. I didn't take the proposal in #23702 (comment) because we may want something more general, like updating by an expression `key1 = key2 + 10`.

### Why are the changes needed?

In Spark 2.4, all built-in v2 streaming sinks support all streaming output modes, and v2 sinks are enabled by default; see https://issues.apache.org/jira/browse/SPARK-22911

It's too risky for 3.0 to go back to v1 sinks, so I propose to add a private trait to fix the built-in v2 sinks and keep backward compatibility.

### Does this PR introduce _any_ user-facing change?

Yes, now all the built-in v2 streaming sinks support all streaming output modes, which is the same as 2.4.

### How was this patch tested?

Existing tests.

Closes #28523 from cloud-fan/update.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 34414ac)
Signed-off-by: Wenchen Fan <[email protected]>
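Sketched below is roughly what such a private mixin looks like. This is a simplified reconstruction based on the description above, not the exact source; the package location and the `private[sql]` visibility are assumptions:

```scala
// Simplified sketch of the private mixin described in this PR; the actual
// file location and exact signature in Spark's source tree may differ.
import org.apache.spark.sql.connector.write.WriteBuilder

// Built-in v2 streaming sinks mix this in to accept the Update output mode.
// It is kept private to Spark because the public design is still undecided.
private[sql] trait SupportsStreamingUpdate extends WriteBuilder {
  def update(): WriteBuilder
}
```

Because the trait is internal, third-party DSv2 sinks cannot opt into update mode through it, which is exactly the compromise discussed above: built-in sinks keep 2.4 behavior while the public API stays unchanged.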
@cloud-fan, please stop merging pull requests that have a standing veto. It is better to ask for a review. |
I thought you had canceled your veto in #28523 (comment), as what you agreed on is exactly what this PR is doing. Hopefully you read the PR description before leaving a -1. BTW, if someone leaves a veto, please be active in defending it. You can't just leave a veto and then disappear, assuming no one will merge the PR. This is a 3.0 blocker and your last review was 7 days ago. If you are busy with other stuff, please let people know when you will be able to come back to review.
This PR wasn't ready for review until late Monday where I am, and it was merged less than a day later. I had also assumed that the fix would be in a separate PR, which is why I didn't follow this one too closely. In any case, if you want to merge a PR that has been vetoed, make sure the concerns are satisfied. Being "active" is not a requirement and is poorly defined. That's not a valid excuse.
This PR (including the code diff and PR description) is verbatim as it was when created. I thought you had misread this PR, because your comment right after the veto actually agreed with this PR. If you prefer to review a PR after all tests pass, please let me know. Then I'll ping you after Jenkins passes, to involve you later and save time.
@rdblue It looks like it became a different proposal as you and other people discussed and suggested there, one which you didn't technically vote on. It seems to have been reviewed properly by other committers, and I see you were pinged multiple times. I am surprised people actually thumbed up a comment that simply misunderstood the PR.
@cloud-fan, sorry for my confusion here. From the summary, I didn't realize that this actually added back the streaming modes using private interfaces, and I mistakenly thought you were trying to add back the public interfaces. That's what I intended to -1, and I didn't realize the mistake until you pointed out that the code hadn't changed. I took the early lack of changes as simply a lack of progress, which led to my surprise when this was merged. Next time, let's try to communicate more clearly. Perhaps you didn't realize why I was confused, too, but it would have been helpful to point out that the solution we agreed on was what was already implemented. And it is still necessary to have a veto reversed; there is no timeout, and inferring that a veto has been reversed is unreliable compared to asking for clarification.
@rdblue, the first sentence says "This PR adds a private ..." though. I don't fully understand how a -1 can be valid when it's based purely on a misunderstanding.
We should probably raise our voices if a -1 based purely on misunderstanding can stand, and think about revising the policy to clarify it, @rdblue WDYT? At least I can see the intention of describing a possible veto-invalidation case in the documentation you pointed out. But I fully agree that we should better clarify potential confusions ahead of time, and that's also what I targeted in #25310; hopefully people carefully read and follow it, or we can improve it more. I didn't aim to force people to do it, but probably we should consider that, given that I still see many poorly written descriptions that can cause confusion.
Ryan, also I don't think you're a Spark PMC member yet who holds a binding vote.
I agree that if it is a misunderstanding, then a veto isn't binding. And the way to handle that situation is to discuss why the justification was based on a misunderstanding and make sure there is agreement. Misunderstanding goes both ways, so I don't think it would be safe to simply ignore someone if you think they missed something. Also, I don't think that "qualified voter" refers only to PMC members. That's not how it has worked in my experience, though this can vary between communities. A qualified voter is normally a committer, but could also be a contributor -- someone it's a good idea to listen to if they raise a serious concern. The purpose is to reach consensus by taking the time to discuss the problem. And in this case, I was pointing out that (I thought) this PR went against a previous community decision not to expose this as a public API.
I think the meaning of "binding" should be consistent -- committers can give +1 and merge a code change, because they have a "binding vote" on code changes. A -1 applies exactly the same way. If I understand correctly, the major differences between committers and PMC members mainly come from their roles and responsibilities. Excluding the things which occur on the private mailing list, only PMC members have binding votes on the "release process" (and maybe decisions about the direction of the project, according to the R&R), but not elsewhere. The role denotes what someone can (and should) do, rather than saying one person is more powerful than another. E.g., there's the unusual case where a PMC member is not a committer -- I don't think they have a binding vote on code changes, according to the guides for committers and PMC members. (Not explicitly documented, but it's straightforward if we focus on R&R.) https://www.apache.org/dev/new-committers-guide.html
I guess we agree that a vote clearly based on misunderstanding should not stand in practice. Well, I agree with all of it. What I would like to discuss further is the fact that this case is still being pointed out as a violation of ASF policies -- does that look clear to all of you?

I am talking about the case where the vote was clearly based on a misunderstanding -- it was talking about something the proposal doesn't do, it blocked the RC, and people had to wait indefinitely, just pinging. I assume the argument is that a justification clearly based on a falsehood is valid, which doesn't make sense to me. Maybe we should clarify it in the documentation if this looks like a policy violation. If we're going to argue about the policy, it at least looks unclear to me if I read it literally from the documentation -- does it look clear to all of you?
First of all, I'd rather say I'm not in favor of "veto" due to the drawbacks already described -- personally I have rarely voted -1 on a code change, even when I was active enough to watch the PR daily. As documented in the ASF doc, a veto is not something that can be disregarded by a project-specific policy, and its effect is also clearly documented: it kills the process. I mean "technically". So is that a great right/power for qualified voters? Well, yes, but with great responsibility, from two perspectives,

I guess this is describing the case where a -1 is placed without any explanation. In practice, when veto voters know the great responsibility of a veto, an incorrect justification can be corrected by others (they should be open to disagreement) and the veto will eventually be cancelled. Time matters.

For me, I agree it's a confusing one -- I think most vote processes follow the sentence (hence the sentence stays there), but at least it doesn't apply to votes on code changes. Suppose only PMC members had binding votes on code changes; then a +1 from committers could not allow the code change to pass the vote and be merged, because they could only cast non-binding votes. That said, it makes more sense to say committers have binding votes on code changes, which means committers have the right to veto code changes.
I agree with all the practical points. I'm not aiming to change our existing practices -- I believe what you said is what the Apache Spark community is already doing pretty well. If this case is going to be picked up as an example of a violation of ASF policy, I disagree; it does not look very clear to me. If it should be, maybe we should revise the ASF policy to make it clearer.