Actor state via messages #190
base: main
**New file** (`@@ -0,0 +1,86 @@`):

```python
import trio
import tractor


class Restart(Exception):
    """Restart signal"""


async def sleep_then_restart():
    actor = tractor.current_actor()
    print(f'{actor.uid} starting up!')
    await trio.sleep(0.5)
    raise Restart('This is a restart signal')


async def signal_restart_whole_actor():
    actor = tractor.current_actor()
    print(f'{actor.uid} starting up!')
    await trio.sleep(0.5)
    return 'restart_me'


async def respawn_remote_task(portal):
    # Start a task in the actor at the other end
    # of the provided portal; when it signals a restart,
    # restart it.

    # This is much more efficient than restarting the underlying
    # process over and over since the python interpreter runtime
    # stays up and we just submit a new task to run (which
    # is just the original one we submitted, repeatedly).
    while True:
        try:
            await portal.run(sleep_then_restart)
        except tractor.RemoteActorError as error:
            if 'Restart' in str(error):
                # respawn the actor task
                continue
            raise  # propagate anything that isn't a restart signal


async def supervisor():

    async with tractor.open_nursery() as tn:

        p0 = await tn.start_actor('task_restarter', enable_modules=[__name__])

        # Yes, you can do this from multiple tasks on one actor
        # or multiple lone tasks in multiple subactors.
        # We'll show both.

        async with trio.open_nursery() as n:
            # we'll do the first as a lone task restart in a daemon actor
            for i in range(4):
                n.start_soon(respawn_remote_task, p0)

            # Open another nursery that will respawn sub-actors
```
> **Owner (author):** nope x this. no extra nursery required.. originally it had that but we don't need it.
```python
            # spawn a set of subactors that will signal restart
            # of the group of processes on each failure
```
> **Owner (author):** still need to add a loop around the nursery to restart everything if we get a certain error raised. this starts getting into a more formal supervisor strategy api that we have yet to design.

> **Owner (author):** hmm we might have to plug into the nursery internals a bit more to get granular control on each sub-proc. needs a little more pondering fo sho.

> **Owner (author):** oh and common strat names from erlang:

> **Owner (author):** actually i guess you'd want to pass the nursery into the strat so that you get the granular control.. hmm that might compose nicely actually. Then in theory you could use a stack to compose multiple strats? Woo this actually will be interesting i'm thinking.
```python
            portals = []

            # start initial subactor set
            for i in range(4):
                p = await tn.run_in_actor(signal_restart_whole_actor)
                portals.append(p)

            # now wait on results and respawn actors
            # that request it
            while True:

                for i, p in enumerate(portals):
                    result = await p.result()

                    if result == 'restart_me':
                        print(f'restarting {p.channel.uid}')
                        await p.cancel_actor()
                        await trio.sleep(0.5)
                        # swap the fresh portal in for the cancelled one
                        portals[i] = await tn.run_in_actor(
                            signal_restart_whole_actor
                        )

            # this will block indefinitely so user must
            # cancel with ctrl-c


if __name__ == '__main__':
    trio.run(supervisor)
```
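The review thread above mentions Erlang's common supervisor strategy names without listing them. For reference, Erlang/OTP's standard restart policies are `one_for_one`, `one_for_all`, and `rest_for_one`. A minimal, tractor-free sketch of what each policy selects for restart (the function names are illustrative only, not a proposed tractor API):

```python
# Illustrative sketch of Erlang-style restart policies: given the
# index of a failed child and the ordered list of children, return
# the subset of children a supervisor must restart.

def one_for_one(failed: int, children: list) -> list:
    # restart only the child that failed
    return [children[failed]]


def one_for_all(failed: int, children: list) -> list:
    # restart every child whenever any one fails
    return list(children)


def rest_for_one(failed: int, children: list) -> list:
    # restart the failed child and every child started after it
    return children[failed:]


kids = ['a', 'b', 'c', 'd']
print(one_for_one(1, kids))   # ['b']
print(one_for_all(1, kids))   # ['a', 'b', 'c', 'd']
print(rest_for_one(1, kids))  # ['b', 'c', 'd']
```

Passing the nursery into such a strategy function, as the thread suggests, would then let the policy cancel and respawn exactly the returned subset.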
**New file** (`@@ -0,0 +1,64 @@`):

```python
from itertools import cycle
from pprint import pformat
from dataclasses import dataclass, field

import trio
import tractor


@dataclass
class MyProcessStateThing:
```
> **Owner (author):** Instead of this what are you after? A function that creates some object and then makes that object mutable from another inbound message?

> **Owner (author):** Maybe you want something like We can also accomplish this but it will require a slight bit more machinery.
```python
    state: dict = field(default_factory=dict)

    def update(self, msg: dict):
        self.state.update(msg)


_actor_state = MyProcessStateThing()


async def update_local_state(msg: dict):
    """Update process-local state from a sent message and exit.

    """
    actor = tractor.current_actor()

    global _actor_state

    print(f'Yo we got a message {msg}')

    # update the "actor state"
    _actor_state.update(msg)

    print(f'New local "state" for {actor.uid} is {pformat(_actor_state.state)}')

    # we're done so exit this task running in the subactor


async def main():
    # Main process/thread that spawns two sub-actors and sends
    # messages to them to update their state.

    actor_portals = []

    # XXX: a subactor can **not** outlive its parent, this is SC.
    async with tractor.open_nursery() as tn:

        portal = await tn.start_actor('even_boy', enable_modules=[__name__])
        actor_portals.append(portal)

        portal = await tn.start_actor('odd_boy', enable_modules=[__name__])
        actor_portals.append(portal)

        for i, (count, portal) in enumerate(
            zip(range(100), cycle(actor_portals))
        ):
            await portal.run(update_local_state, msg={f'msg_{i}': count})

        # blocks here indefinitely since we spawned "daemon actors" using
        # `.start_actor()`; you'll need to control-c to cancel.


if __name__ == '__main__':
    trio.run(main)
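The state-merging behavior in this example doesn't depend on tractor at all: each inbound message dict is merged into one accumulating dict. A minimal, runnable sketch of just that accumulator pattern (`ProcessState` is a hypothetical standalone stand-in for `MyProcessStateThing`):

```python
from dataclasses import dataclass, field


@dataclass
class ProcessState:
    # default_factory gives each instance its OWN dict; a plain
    # class-level dict would be shared across all instances.
    state: dict = field(default_factory=dict)

    def update(self, msg: dict) -> None:
        # merge each inbound message into the accumulated state
        self.state.update(msg)


s = ProcessState()
for i, count in enumerate([0, 1, 2]):
    s.update({f'msg_{i}': count})

print(s.state)  # {'msg_0': 0, 'msg_1': 1, 'msg_2': 2}
```

Note the contrast with the next example file, where the state dict is a class attribute precisely so that it *is* shared process-wide across instances.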
**New file** (`@@ -0,0 +1,153 @@`):

```python
import inspect
from typing import Any
from functools import partial
from contextlib import asynccontextmanager, AsyncExitStack
from itertools import cycle
from pprint import pformat

import trio
import tractor


log = tractor.log.get_logger(__name__)


class ActorState:
    """Singleton actor state per process.

    """
    # this is a class level variable and is thus both a
    # singleton across object instances and task safe.
    state: dict = {}

    def update(self, msg: dict) -> None:
        _actor = tractor.current_actor()

        print(f'Yo we got a message {msg}')
        self.state.update(msg)

        print(f'New local "state" for {_actor.uid} is {pformat(self.state)}')

    def close(self):
        # gives headers showing which process and task is active
        log.info('Actor state is closing')


# if we wanted to support spawning or talking to other
# actors we could do that using a portal map collection?
# _portals: dict = {}


async def _run_proxy_method(
    meth: str,
    msg: dict,
) -> Any:
    """Run a method on the process-local ``ActorState`` from a sent
    message and exit.

    """
    # Create a new actor-state instance per call.
    # We can make this persistent by storing it either
    # in a global var or as another class-scoped variable?
    # If you want it somehow persisted in another namespace
    # I'd be interested to know "where".
    actor = ActorState()
```
> **Owner (author):** I get this isn't ideal (though it really is no performance hit) in an idiomatic python sense, but the alternative is some other way to store this instance across function-task calls. The normal way would be a module level variable (since they're "globally scoped"), but I guess in theory you could have a function that stays alive and constantly passes the instance to other tasks over a memory channel (still in that case, how does the new task get access to the channel handle?). The alternative is a module level class which has a class level variable which is again globally scoped on the class.
```python
    if meth != 'close':
        return getattr(actor, meth)(msg)
    else:
        actor.close()

    # we're done so exit this task running in the subactor


class MethodProxy:
    def __init__(
        self,
        portal: tractor._portal.Portal
    ) -> None:
        self._portal = portal

    async def _run_method(
        self,
        *,
        meth: str,
        msg: dict,
    ) -> Any:
        return await self._portal.run(
            _run_proxy_method,
            meth=meth,
            msg=msg
        )


def get_method_proxy(portal, target=ActorState) -> MethodProxy:
```
> **Owner (author):** In case anyone gets cranky about this, from
```python
    proxy = MethodProxy(portal)

    # mirror all public remote methods onto the proxy
    for name, method in inspect.getmembers(
        target, predicate=inspect.isfunction
    ):
        if '_' == name[0]:
            # skip private methods
            continue

        else:
            setattr(proxy, name, partial(proxy._run_method, meth=name))

    return proxy


@asynccontextmanager
async def spawn_proxy_actor(name):

    # XXX: a subactor can **not** outlive its parent, this is SC.
    async with tractor.open_nursery(
        debug_mode=True,
        # loglevel='info',
    ) as tn:

        portal = await tn.start_actor(name, enable_modules=[__name__])

        proxy = get_method_proxy(portal)

        yield proxy

        await proxy.close(msg=None)


async def main():
    # Main process/thread that spawns two sub-actors and sends
    # messages to them to update their state.

    try:
        stack = AsyncExitStack()

        actors = []
        for name in ['even', 'odd']:

            actor_proxy = await stack.enter_async_context(
                spawn_proxy_actor(name + '_boy')
            )
            actors.append(actor_proxy)

        # spin through the actors and update their states
        for i, (count, actor) in enumerate(
            zip(range(100), cycle(actors))
        ):
            # Here we call the locally patched `.update()` method of the
            # remote instance.

            # NOTE: the instance created on each call here is currently
            # a new object - to persist it across `portal.run()` calls
            # we need to store it somewhere in memory for access by
            # a new task spawned in the remote actor process.
            await actor.update(msg={f'msg_{i}': count})

        # blocks here indefinitely since we spawned "daemon actors" using
        # `.start_actor()`; you'll need to control-c to cancel.

    finally:
        await stack.aclose()


if __name__ == '__main__':
    trio.run(main)
```
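The proxy construction in `get_method_proxy()` is itself independent of tractor: it just mirrors every public method of a target class onto a proxy object via `inspect.getmembers()` and `functools.partial`. A minimal synchronous sketch of that wiring, with local dispatch standing in for the portal round-trip (all class and method names here are illustrative):

```python
import inspect
from functools import partial


class Target:
    """Object whose public methods we want mirrored on a proxy."""
    def update(self, msg: dict) -> dict:
        return {'method': 'update', 'msg': msg}

    def close(self) -> str:
        return 'closed'

    def _private(self) -> None:
        ...


class Proxy:
    def _run_method(self, *, meth: str, **kwargs):
        # in the real example this forwards over a tractor portal;
        # here we simply dispatch locally to show the wiring.
        return getattr(Target(), meth)(**kwargs)


def make_proxy(target=Target) -> Proxy:
    proxy = Proxy()
    # mirror every public method of ``target`` onto the proxy
    for name, method in inspect.getmembers(
        target, predicate=inspect.isfunction
    ):
        if name.startswith('_'):
            # skip private methods
            continue
        setattr(proxy, name, partial(proxy._run_method, meth=name))
    return proxy


p = make_proxy()
print(p.update(msg={'a': 1}))  # {'method': 'update', 'msg': {'a': 1}}
print(p.close())               # closed
```

Because `inspect.getmembers()` with the `inspect.isfunction` predicate only matches plain functions on the class, dunder attributes and anything starting with `_` are filtered out, so only the public API crosses the boundary.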
> brutal typo

> it's clearly 3 tasks that are constantly restarted...