[SPARK-18838][CORE] Introduce blocking strategy for LiveListener #18004
Conversation
Jenkins, test this please

@markhamstra, @vanzin: Can I have a review please?
## What changes were proposed in this pull request?
When the queue of the LiveListener is full, events are dropped in an arbitrary fashion. But some of them can be crucial for the correct execution of the job. The job does not in general fail immediately, but it presents an incoherent state before dying. In this changelist: the queue implementation is reworked to slightly improve performance (less object allocation and less lock usage), and a "waiting for space" strategy is introduced that can be used instead of the current default "dropping" strategy. The new strategy, even if it does not resolve the "real" problem (the low dequeuing rate), brings a lot more stability to jobs with a huge event production rate (i.e. jobs with many partitions).
## How was this patch tested?
Existing unit tests have been enriched to run with both strategies, and manual tests have been run on the cluster.
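As a minimal sketch of the difference between the two enqueue strategies described above, consider a plain bounded queue; `BoundedEventQueue` and `blockWhenFull` are illustrative names invented here, not the PR's actual classes or settings.

```scala
import java.util.concurrent.LinkedBlockingQueue

import org.apache.spark.scheduler.SparkListenerEvent

// Illustrative only: a bounded event queue that either drops or blocks when
// full, mirroring the "dropping" vs. "waiting for space" strategies.
class BoundedEventQueue(capacity: Int, blockWhenFull: Boolean) {
  private val queue = new LinkedBlockingQueue[SparkListenerEvent](capacity)

  /** Returns false only when the event had to be dropped. */
  def post(event: SparkListenerEvent): Boolean = {
    if (blockWhenFull) {
      queue.put(event) // waits until the consumer frees a slot
      true
    } else {
      queue.offer(event) // returns immediately; false means the event was dropped
    }
  }

  /** Called by the single dispatch thread that feeds the listeners. */
  def take(): SparkListenerEvent = queue.take()
}
```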
@cloud-fan Can I have a review please?
I'm fine to improve the performance, but I hesitate to change the behavior from "drop" to "block". Is it really safe to do so?
I'm more fond of an approach like #16291, but the owner of that PR seems to have abandoned it. You seem to be singling out the event log listener here, while other listeners can be just as harmful; the UI listeners do synchronization and can block the bus when a UI request comes in, for example. Having separate buses for groups of listeners is a more generic way of doing what you did for the event logger here. Then you could potentially have different configs for each group (e.g. allow the event log thread to back up a bit more), although maybe that's overkill. BTW you don't even call out the event logging listener changes in your PR description.
## What changes were proposed in this pull request?
An asynchronous version of the EventLoggingListener has been introduced. This listener is the slowest of all the standard ones and is clearly the bottleneck of the LiveListenerBus dequeuing process.
## How was this patch tested?
Existing unit tests have been enriched to run with both versions, and manual tests have been run on the cluster.
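A minimal sketch of the asynchronous-writing idea follows; it is not the PR's actual EventLoggingListener change, and `AsyncLogWriter` and its methods are invented here purely for illustration.

```scala
import java.io.Writer
import java.util.concurrent.{Executors, TimeUnit}

// Illustrative only: move the (slow) file write off the listener-bus thread
// onto a single background thread, preserving the order of the log lines.
class AsyncLogWriter(out: Writer) {
  private val executor = Executors.newSingleThreadExecutor()

  /** Called on the bus thread: only enqueues the write, never blocks on I/O. */
  def logLine(json: String): Unit = {
    executor.submit(new Runnable {
      override def run(): Unit = out.write(json + "\n")
    })
  }

  /** Drains the pending writes before the application finishes. */
  def stop(): Unit = {
    executor.shutdown()
    executor.awaitTermination(30, TimeUnit.SECONDS)
    out.flush()
  }
}
```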
@cloud-fan The block strategy is not the default one! It is safe to block as long as no listener adds messages to the queue (which is the case currently). I am not a fan of blocking the queue either, but in my mind this strategy is a safety net until all the performance issues of the listener bus have been solved. It is a temporary and clearly not ideal solution (the ideal one being to solve all the performance issues), but it is better than dropping messages without failing the job. @vanzin I think that an approach like #16291 is a very bad idea. It makes the design more complex, forces queues to be duplicated, changes the synchronization paradigm between listeners, ... The current design is the right one, I think. It is a kind of "reactor pattern", which works pretty well (see nodejs, vertx.io, ...), but the golden rule is to not block the dequeuing process, and this is unfortunately the responsibility of each listener. We can try to enforce this by providing an abstract asynchronous listener, for example, or by measuring each listener's execution time in real time and logging an error or warning for the most time-consuming listeners. Some listeners that do trivial work (in terms of performance) do not need to be asynchronous (overkill, and less efficient); this is the case for every listener that reacts only to very rare messages (like only job start, executor add or remove, ...). So listeners should be migrated to an asynchronous version only after a case-by-case analysis.
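A rough sketch of the "measure each listener's execution time and warn about the slow ones" idea mentioned in the comment above; the object name, threshold, and logging mechanism below are assumptions for illustration, not anything Spark or this PR actually provides.

```scala
// Illustrative only: time every listener invocation on the bus thread and
// warn when a listener is slow enough to threaten the dequeuing rate.
object TimedDispatch {
  def timed(listenerName: String, eventName: String, slowThresholdMs: Long)
           (call: => Unit): Unit = {
    val start = System.nanoTime()
    call
    val elapsedMs = (System.nanoTime() - start) / 1000000
    if (elapsedMs > slowThresholdMs) {
      System.err.println(
        s"WARN: listener $listenerName took $elapsedMs ms to handle $eventName")
    }
  }
}

// Hypothetical use inside the bus dispatch loop:
// TimedDispatch.timed(listener.getClass.getName, event.getClass.getSimpleName, 100) {
//   // ... the actual dispatch call to the listener goes here ...
// }
```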
That's exactly why the other approach is better. It's more complex and may use more memory, but it allows you to choose "special" listeners that cannot miss events and place them in a separate queue. For example, if the dynamic allocation listener loses events, things go south pretty quick. And a user-installed listener that misbehaves may negatively affect that. Multiple queues mitigate that issue; the user listener will have limited effect on the dynamic allocation listeners (it will still fight for CPU, but most probably things will behave better).
Then why are you against the other approach? It's basically making listeners "asynchronous" without having to bake that logic into every listener.
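A rough sketch of the multi-queue idea argued for here (one bounded queue and dispatch thread per group of listeners); `ListenerGroup` and its shape are illustrative assumptions, not the code from #16291.

```scala
import java.util.concurrent.LinkedBlockingQueue

import org.apache.spark.scheduler.{SparkListenerEvent, SparkListenerInterface}

// Illustrative only: each group of listeners gets its own bounded queue and
// dispatch thread, so a slow user listener cannot starve, say, the dynamic
// allocation listener sitting in another group.
class ListenerGroup(
    name: String,
    capacity: Int,
    listeners: Seq[SparkListenerInterface],
    dispatch: (SparkListenerInterface, SparkListenerEvent) => Unit) {

  private val queue = new LinkedBlockingQueue[SparkListenerEvent](capacity)

  private val thread = new Thread(s"listener-group-$name") {
    setDaemon(true)
    override def run(): Unit = {
      while (true) {
        val event = queue.take()
        listeners.foreach(l => dispatch(l, event))
      }
    }
  }
  thread.start()

  /** Posting never blocks on another group's consumers; false means dropped. */
  def post(event: SparkListenerEvent): Boolean = queue.offer(event)
}
```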
I am against the other approach for multiple reasons, but the key one is that it will change the synchronization paradigm, and it will clearly change the behavior of the current listeners, possibly causing bugs. For example, the StorageStatusListener and the StorageListener are dependent: the second one uses the "result" of the first one. If you put them in different threads, it will for sure change the current behavior. Whether it will cause fatal bugs, I do not know. For the other significant (in terms of performance) listener - the StorageListener - all of the key aspects are very different. The key thing in the event logging listener is the ability to not queue the blockUpdated messages and so be able to "not consider" them.
If you read the discussion in the other PR, we pushed back and the current design avoids exactly those issues. That's why there isn't a queue for each listener, but multiple listeners share a single queue.
You're talking policy, not mechanism. Asynchronous means asynchronous; there's a producer and a consumer. If you think that different consumers need different policies for how to consume/block/discard data, you can implement that.
That's such a small gain that I'm not sure why it's such a big deal... in any case, see above, policy vs. mechanism. Your current approach pushes all the burden of handling things asynchronously to the listeners. That's a huge source of duplicated code if you plan to do this for multiple listeners.
I'd just like to point out that in a separate bug I'm basically deleting all these UI listeners and replacing them with a new one, so if you're spending too much time looking at them, you'll probably be wasting a lot of that effort.
There is no global issue with the current queue / listeners implementation. On the contrary, it works pretty well and it is pretty simple!
Fixing only one of the two specific performance issues will greatly increase the dequeuing rate, allowing Spark 2.x to be used with a pretty decent volume of data (which is clearly not the case right now). For the EventLogListener, I think I have in this changelist a pretty simple fix, very localized, with no impact on the rest of the code, ..., which completely fixes the performance issue in a quite standard way (writing asynchronously to a file is very standard). For the StorageStatusListener / StorageListener couple, I indeed noticed that the classes are flagged as deprecated. I do not plan to modify them yet (I do not really need that); I will wait for your new listener, and I will be able to test its performance on my test case (10 000 partitions, 1 000 executors) if you want. As for the issue that could appear with external listeners.
You can't make that statement. Listeners are a public API, and anyone can install their own listeners, and they may not be aware of the damage they're doing. It may very well be that internal listeners are mostly well behaved, but at least separating user listeners from internal listeners is a worthy goal. Also, the current UI listeners do synchronization. If someone makes a UI / API request in the middle of an event storm, that will block the listener bus and cause a lot of events to be lost. I've seen the stage page take several seconds in the render code while holding that lock.
I don't have an issue with adding code that makes it easy to write asynchronous listeners, but I do have an issue that you've made that change non-reusable. If instead you make that into an abstract class, or move the logic to some place where other listeners can make use of it, that's a much better change, and serves as a building block for fixing other issues with the event bus. (That's not saying your current implementation does not have problems, just that regardless of it having problems, it should not live inside a single listener implementation.)
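To make the synchronization point mentioned above concrete, here is a small, purely illustrative example of how a listener that synchronizes its handlers can stall the single bus thread while a UI/API request holds the same lock; `UiStateListener` and its methods are invented for this sketch.

```scala
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Illustrative only: the bus thread and a UI request thread contend on the
// same intrinsic lock, so a slow page render stalls event processing.
class UiStateListener extends SparkListener {
  private val tasksPerStage = mutable.Map[Int, Long]()

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    // Runs on the bus thread; waits here if a render is in progress.
    tasksPerStage(taskEnd.stageId) = tasksPerStage.getOrElse(taskEnd.stageId, 0L) + 1
  }

  def renderStagePage(): String = synchronized {
    // Runs on a UI thread; while it holds the lock, onTaskEnd (and therefore
    // the whole single-threaded bus) is blocked and the queue can fill up.
    tasksPerStage.map { case (id, n) => s"stage $id: $n tasks finished" }.mkString("\n")
  }
}
```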
Honestly, your last comment is so unfair. You commented on a single phrase out of its context, as if I had not mentioned what is in my mind the reasonable first step for the external listeners. For the UI listeners, you said yourself in your last comments that it is not worth improving them because you will change them soon. I did some tests to avoid the synchronization you mentioned (by using an atomic reference instead); it is in my fork, but I did not push it in this PR because I am not so satisfied with that commit, it deserves another PR, and you will change those listeners soon. As for extracting a basic building block to easily write asynchronous listeners, I am ready to do that, but I do not see how to do it in an easy and useful way. Constructive comments on the code to achieve that concretely are more than welcome!
Wait, which comment? I pointed out places where your current design has limitations. I'm asking for a way to make it easy to create asynchronous listeners, because that makes it easy to solve important problems (like separating user listeners from others, or singling out very sensitive listeners like the dynamic allocation one that would benefit from being on its own separate thread). I mentioned the UI listeners because you mentioned that "the existing listeners behave well" when that's not true. Yeah, I'm changing them in some separate stuff I'm working on, but that doesn't mean the code I'm adding magically solves all issues. I'd still like to have certain listeners not be affected by some other listener suddenly running into a slow path. And the very last comment I made is still true. I've actually looked at your code and it does have issues (I noticed at least one race after a very quick look). But I'm trying to focus on the architecture first before doing a thorough code review.
Define the features you want and create an abstract class that implements that. So far I've seen you mention (i) process events in a separate thread and (ii) be able to filter which events get queued in that separate thread. Start from there. Or, basically, instead of extending the event logging listener, create a base class for the new functionality you want, and make the existing listener extend that class. a.k.a. the opposite of what you're doing.
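A sketch of the "base class first, then make the existing listener extend it" shape suggested here; `AsyncSparkListener`, `shouldQueue`, `AsyncEventLogger`, and the capacity value are all assumptions made for illustration, not code from this PR or from Spark.

```scala
import java.util.concurrent.LinkedBlockingQueue

import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockUpdated, SparkListenerEvent}

// Illustrative only: the reusable part (queue + worker thread + optional event
// filter) lives in a base class instead of inside one listener implementation.
// Wiring the typed SparkListener callbacks to enqueue(...) is omitted here.
abstract class AsyncSparkListener(capacity: Int) extends SparkListener {
  private val queue = new LinkedBlockingQueue[SparkListenerEvent](capacity)

  /** Subclasses can skip events they do not care about before queueing. */
  protected def shouldQueue(event: SparkListenerEvent): Boolean = true

  /** Subclasses do their (possibly slow) work here, off the bus thread. */
  protected def handle(event: SparkListenerEvent): Unit

  /** Called from the bus thread; only filters and enqueues. */
  final def enqueue(event: SparkListenerEvent): Unit = {
    if (shouldQueue(event)) queue.put(event)
  }

  private val worker = new Thread("async-listener-worker") {
    setDaemon(true)
    override def run(): Unit = while (true) handle(queue.take())
  }
  worker.start()
}

// The existing event logging logic would then extend the base class instead
// of embedding the asynchronous machinery itself:
class AsyncEventLogger extends AsyncSparkListener(capacity = 10000) {
  // Mirrors the "do not even queue block updates" idea mentioned earlier.
  override protected def shouldQueue(event: SparkListenerEvent): Boolean =
    !event.isInstanceOf[SparkListenerBlockUpdated]

  override protected def handle(event: SparkListenerEvent): Unit = {
    // write the event to the log file here (the slow part)
  }
}
```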
You mentioned the issue with the external listeners as if I did not mention this case. But I clearly explained what I think is the best option for this potential issue: log very clearly which external listener is causing the problem. The performance of a custom listener is the responsibility of the user and/or the writer of that listener, not of Spark. But helping to quickly identify the culprit is, I think, a good idea and should not be very expensive.
I am the one who proposes to NOT change the global design of the listener bus; you want to change it. This design (reactor pattern) has proven itself in a lot of other software (nodejs, vertx.io) with one known limitation: listeners must be carefully implemented. That is why I want to improve things at the listener level. Do we agree on that, or are we still discussing the design with multiple duplicated queues?
Please point out the race condition that you found. For the concrete implementation of the asynchronous event logging listener, I preferred to factor it with the synchronous one instead of creating an abstract asynchronous listener above it. This choice is indeed disputable, but you omitted this factorization in your comment.
Well, you stated your opinion, and I disagree with it. I've tried to explain why I disagree with it and propose solutions that I consider better, but all you've done so far is refuse to accept any feedback.
That's unrealistic. I'll go back again to my favorite example, the dynamic allocation listener. Even if all listeners behave well and don't block for arbitrary periods of time, you still have a problem: the more listeners you add, by definition, the more expensive the listener bus becomes, and having everything in a single thread will eventually lead to issues. And that goes back to why I've been suggesting the different approach in the first place. It's bad design to force all these constraints on a whole set of code, some of which is not even controlled by Spark, just so that one single listener can operate well. Instead, why not have a system where that single listener - and any other really - can be treated specially and avoid all of those issues? Allowing arbitrary listeners to be asynchronous (basically making your code generic, or implementing the solution in the other PR I mentioned) solves that issue. The dynamic allocation listener extends whatever class makes it asynchronous, or registers itself with the bus via whatever API gives it a separate "event queue", and now it's not affected by other listeners.
Of course it's related. Those synchronized methods are being called by the listener bus; meaning they can block the listener bus and cause events to be dropped if they're arriving at a high rate.
How is that related to the limitations of your current design? I've given you example after example of why fixing just the event logging listener is the wrong approach but for some reason you don't see that.
As I said, I don't want to review the code because I disagree with your current approach. Pointing out individual issues with the code will not help at this point in time. Once we can agree on how to actually solve the problem you're trying to solve in a more generic and maintainable manner, then we can look at the code.
Sorry, forgot one comment.
Yes, the implementation might become a little more complex (I disagree with "much more"), but as I've tried to say multiple times, there are concrete needs and it's a much better solution to the more general problem. It also happens to make the code more unit testable by isolating that behavior in a separate class.
So.

So can this PR be closed now?

yes
What changes were proposed in this pull request?
When the queue of the LiveListener is full, events are dropped in an arbitrary fashion.
But some of them can be crucial for the correct execution of the job. The job does not in general fail immediately, but it presents an incoherent state before dying.
In this pull request:
- The queue implementation was refactored to slightly improve performance (less object allocation and less lock usage).
- A "waiting for space" strategy is introduced and can be used instead of the current default "dropping" strategy. The new strategy, even if it does not resolve the "real" problem (the low dequeuing rate), brings a lot more stability to jobs with a huge event production rate (i.e. jobs with many partitions).
- The EventLogListener, which is the most time-consuming listener (see the attached files in the Jira issue for timings), is now asynchronous (not as the default implementation, but as a new version that can be enabled with a simple parameter).
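For illustration, enabling the two opt-in behaviors above might look like the snippet below. The first key is the existing queue-capacity setting in Spark 2.x; the other two keys are hypothetical placeholders, since the real parameter names are defined by the patch itself.

```scala
import org.apache.spark.SparkConf

// The last two keys are hypothetical, shown only to illustrate how the
// opt-in behaviors described above might be switched on.
val conf = new SparkConf()
  .set("spark.scheduler.listenerbus.eventqueue.size", "100000")         // existing queue-capacity setting
  .set("spark.scheduler.listenerbus.eventqueue.blockWhenFull", "true")  // hypothetical: "waiting for space" strategy
  .set("spark.eventLog.asyncWriter.enabled", "true")                    // hypothetical: asynchronous event logger
```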
How was this patch tested?
Existing unit tests have been enriched to run with both strategies.