Make worker action cancellable #1472
Conversation
giskard/utils/__init__.py
```python
return
LOGGER.info("Starting worker pool...")
self.pool = ProcessPoolExecutor(*args, **kwargs)
self.max_workers = min(max_workers, 2) if max_workers is not None else os.cpu_count()
```
Small improvement: it'd be better to move the 2 into giskard.settings.Settings; that way we'd be able to change it from an env variable.
Done. I also changed min to max, since we always want at least min_workers.
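For reference, a minimal sketch of what that could look like, assuming a pydantic-style Settings class; the min_workers field name and the env prefix are illustrative, not the actual giskard code:

```python
import os
from concurrent.futures import ProcessPoolExecutor

from pydantic import BaseSettings


class Settings(BaseSettings):
    # Floor on the pool size; overridable via the GSK_MIN_WORKERS env variable
    min_workers: int = 2

    class Config:
        env_prefix = "GSK_"


settings = Settings()


def start_pool(max_workers=None):
    # max() instead of min(): we always want at least min_workers processes
    workers = max(max_workers, settings.min_workers) if max_workers is not None else os.cpu_count()
    return ProcessPoolExecutor(max_workers=workers)
```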
```python
# Create an independent process pool.
# If we kill a running process, it breaks the process pool, making it unusable.
one_shot_executor = ProcessPoolExecutor(max_workers=1)
pid = one_shot_executor.submit(os.getpid).result(timeout=5)
```
I don't get it, the comment above said:
> `.result(timeout=timeout)` => Not working with WSL and python 3.10

but here it's expected to work?
I should remove the timeout here anyway; there's no reason for a basic os.getpid call to fail or take a long time.
Basically, when I tested it in the killer thread, the timeout was not respected and it never stopped, so I had to do the loop.
Btw, the timeout was working on macOS with Python 3.11.
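For context, roughly the workaround being described: instead of trusting future.result(timeout=...), the killer thread polls the future itself. This is an illustrative sketch, not the PR's actual code; it assumes we already hold the worker's PID:

```python
import os
import signal
import time


def watch(future, worker_pid, timeout, poll_interval=0.1):
    # Poll instead of calling future.result(timeout=...), since the latter
    # reportedly hung on WSL with Python 3.10
    deadline = time.monotonic() + timeout
    while not future.done():
        if time.monotonic() >= deadline:
            # POSIX-only; the one-shot pool this worker belongs to becomes
            # unusable after the kill and must be discarded
            os.kill(worker_pid, signal.SIGKILL)
            return
        time.sleep(poll_interval)
```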
I think the overall killing mechanism could be simplified to avoid having a killer thread and a cancellable counter.
We could use an inter-process data structure to communicate PIDs between a spawned worker process and the main process: multiprocessing.Queue, for example.
In that case, the first thing the pool process would do is identify its PID and add it to the queue, then do the actual work.
In the main process we could use concurrent.futures.as_completed to wrap the future and add a timeout to it, and then call .add_done_callback(result_handler) as we do now.
When the timeout expires, we'd find the PID related to the given task and kill it from the main process.
WDYT?
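A rough sketch of that idea, assuming a one-shot pool per task; names like _traced and run_with_timeout are illustrative. Note that a plain multiprocessing.Queue cannot be pickled into a ProcessPoolExecutor task, hence the Manager queue:

```python
import os
import signal
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, TimeoutError, as_completed


def _traced(pid_queue, fn, *args, **kwargs):
    # First thing the pooled process does: publish its own PID, then run the task
    pid_queue.put(os.getpid())
    return fn(*args, **kwargs)


def run_with_timeout(fn, args, timeout, result_handler):
    manager = mp.Manager()
    pid_queue = manager.Queue()
    pool = ProcessPoolExecutor(max_workers=1)
    future = pool.submit(_traced, pid_queue, fn, *args)
    future.add_done_callback(result_handler)
    try:
        next(as_completed([future], timeout=timeout))
    except TimeoutError:
        # Deadline expired: fetch the worker's PID and kill it from the main process
        os.kill(pid_queue.get(), signal.SIGKILL)  # POSIX-only
    finally:
        pool.shutdown(wait=False)
```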
@Hartorn , actually, it looks like https://pypi.org/project/Pebble/ does exactly what we need:

```python
from pebble import ProcessPool

pool = ProcessPool(max_workers=5)
pool.schedule(long_fn, args=(a, b, c), timeout=1).add_done_callback(done_callback)
```
> I think the overall killing mechanism could be simplified to avoid having a killer thread and a cancellable counter.
> We could use an inter-process data structure to communicate PIDs between a spawned worker process and the main process: multiprocessing.Queue, for example. In that case, the first thing the pool process would do is identify its PID and add it to the queue, then do the actual work. In the main process we could use concurrent.futures.as_completed to wrap the future and add a timeout to it, and then call .add_done_callback(result_handler) as we do now. When the timeout expires, we'd find the PID related to the given task and kill it from the main process.
> WDYT?
The cancellable counter is only for logs; we can remove it if we want.
I just wanted to check that it was working, and eventually to avoid having too many processes launched.
For the callback of a future, you need everything to be picklable, so you cannot have a lock, a process pool, and so on.
So I don't know how to properly shut down the one-shot executor we ran.
Also, it cannot be the main executor, since killing a process breaks the pool, which only raises BrokenProcessPool after that.
I'm pretty sure we could avoid the thread if the whole code we were running was async, since a coroutine could do this job, but here I would not be confident.
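For readers following along, a minimal repro of that "killing breaks the pool" behaviour (illustrative, not part of the PR):

```python
import os
import signal
import time
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool


def sleepy():
    time.sleep(60)


if __name__ == "__main__":
    pool = ProcessPoolExecutor(max_workers=1)
    worker_pid = pool.submit(os.getpid).result()
    future = pool.submit(sleepy)
    time.sleep(1)  # let the worker pick up the task
    os.kill(worker_pid, signal.SIGKILL)  # POSIX-only
    try:
        future.result()
    except BrokenProcessPool:
        # Not just this task: the whole pool is unusable from now on,
        # which is why a separate one-shot executor is used
        print("pool is broken")
```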
> @Hartorn , actually, it looks like https://pypi.org/project/Pebble/ does exactly what we need:
>
> ```python
> from pebble import ProcessPool
>
> pool = ProcessPool(max_workers=5)
> pool.schedule(long_fn, args=(a, b, c), timeout=1).add_done_callback(done_callback)
> ```
Looking at it!
From what I saw, they are kind of doing the same thing: when using a separate process, they have a handler thread watching it and handling the timeout.
Should we get this merged and then change to use it, or do you want me to switch now?
Although I'm a bit concerned that it's not that widely used, the code looks clean.
Actually, while it has a clean API and is easy to use, Pebble is an LGPL library, so we won't be able to use it (which might be the reason it's not that widely adopted).
I also read their code and found similarities with your implementation. I suggest we stick to your current code (and merge it, since we need these changes ASAP).
As an improvement, I think in a separate PR we could take inspiration from Pebble's API, encapsulate call_in_pool into our custom ProcessPool implementation, and have a schedule method; a rough sketch is below. WDYT?
In the end, Pebble is just a ~5k LOC library.
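Something like the following shape, perhaps; WorkerPool and the timeout wiring here are hypothetical placeholders for this PR's kill mechanism, just to illustrate the Pebble-style surface:

```python
import threading
from concurrent.futures import Future, ProcessPoolExecutor


class WorkerPool:
    """Hypothetical Pebble-style facade over our own process pool."""

    def __init__(self, max_workers=None):
        self._executor = ProcessPoolExecutor(max_workers=max_workers)

    def schedule(self, fn, args=(), timeout=None) -> Future:
        future = self._executor.submit(fn, *args)
        if timeout is not None:
            # Placeholder: the real implementation would arm the one-shot
            # executor + killer thread from this PR instead of a bare cancel
            # (cancel() cannot stop an already-running task)
            threading.Timer(timeout, future.cancel).start()
        return future


# Usage would mirror Pebble:
# pool = WorkerPool(max_workers=5)
# pool.schedule(long_fn, args=(a, b, c), timeout=1).add_done_callback(done_callback)
```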
Shame, the code was working well with it too.
Sure, having our own worker implementation seems like a good idea. I'm sure it will also help us improve the performance of the lib.
Closes #GSK-1863
LGTM
Kudos, SonarCloud Quality Gate passed!

- Bug: A, 0 Bugs
- Vulnerability: A, 0 Vulnerabilities
- Security Hotspot: A, 0 Security Hotspots
- Code Smell: A, 0 Code Smells
- Coverage: No Coverage information
- Duplication: 0.0%
Closes #GSK-1863
Description
Related Issue
Type of Change
Checklist
- I've read the CODE_OF_CONDUCT.md document.
- I've read the CONTRIBUTING.md guide.
- I've run `make codestyle`.