diff --git a/tests/test_discovery.py b/tests/test_discovery.py index b76e3706a..fcf731564 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -1,7 +1,7 @@ -""" -Discovery subsys. +''' +Discovery subsystem via a "registrar" actor scenarios. -""" +''' import os import signal import platform @@ -163,7 +163,10 @@ async def unpack_reg( else: msg = await actor_or_portal.run_from_ns('self', 'get_registry') - return {tuple(key.split('.')): val for key, val in msg.items()} + return { + tuple(key.split('.')): val + for key, val in msg.items() + } async def spawn_and_check_registry( @@ -356,20 +359,24 @@ async def close_chans_before_nursery( try: get_reg = partial(unpack_reg, aportal) - async with tractor.open_nursery() as tn: - portal1 = await tn.start_actor( - name='consumer1', enable_modules=[__name__]) - portal2 = await tn.start_actor( - 'consumer2', enable_modules=[__name__]) - - # TODO: compact this back as was in last commit once - # 3.9+, see https://github.com/goodboy/tractor/issues/207 - async with portal1.open_stream_from( - stream_forever - ) as agen1: - async with portal2.open_stream_from( + async with tractor.open_nursery() as an: + portal1 = await an.start_actor( + name='consumer1', + enable_modules=[__name__], + ) + portal2 = await an.start_actor( + 'consumer2', + enable_modules=[__name__], + ) + + async with ( + portal1.open_stream_from( + stream_forever + ) as agen1, + portal2.open_stream_from( stream_forever - ) as agen2: + ) as agen2, + ): async with ( collapse_eg(), trio.open_nursery() as tn, @@ -392,6 +399,7 @@ async def close_chans_before_nursery( # also kill off channels cuz why not await agen1.aclose() await agen2.aclose() + finally: with trio.CancelScope(shield=True): await trio.sleep(1) @@ -427,7 +435,7 @@ def test_close_channel_explicit( @pytest.mark.parametrize('use_signal', [False, True]) -def test_close_channel_explicit_remote_arbiter( +def test_close_channel_explicit_remote_registrar( daemon: subprocess.Popen, start_method: str, use_signal: bool, @@ -448,3 +456,65 @@ def test_close_channel_explicit_remote_arbiter( remote_arbiter=True, ), ) + + +@tractor.context +async def kill_transport( + ctx: tractor.Context, +) -> None: + + await ctx.started() + actor: tractor.Actor = tractor.current_actor() + actor.ipc_server.cancel() + await trio.sleep_forever() + + + +# @pytest.mark.parametrize('use_signal', [False, True]) +def test_stale_entry_is_deleted( + debug_mode: bool, + daemon: subprocess.Popen, + start_method: str, + reg_addr: tuple, +): + ''' + Ensure that when a stale entry is detected in the registrar's + table that the `find_actor()` API takes care of deleting the + stale entry and not delivering a bad portal. + + ''' + async def main(): + + name: str = 'transport_fails_actor' + _reg_ptl: tractor.Portal + an: tractor.ActorNursery + async with ( + tractor.open_nursery( + debug_mode=debug_mode, + registry_addrs=[reg_addr], + ) as an, + tractor.get_registry(reg_addr) as _reg_ptl, + ): + ptl: tractor.Portal = await an.start_actor( + name, + enable_modules=[__name__], + ) + async with ptl.open_context( + kill_transport, + ) as (first, ctx): + async with tractor.find_actor( + name, + registry_addrs=[reg_addr], + ) as maybe_portal: + # because the transitive + # `._discovery.maybe_open_portal()` call should + # fail and implicitly call `.delete_addr()` + assert maybe_portal is None + registry: dict = await unpack_reg(_reg_ptl) + assert ptl.chan.aid.uid not in registry + + # should fail since we knocked out the IPC tpt XD + await ptl.cancel_actor() + await an.cancel() + + trio.run(main) diff --git a/tractor/_discovery.py b/tractor/_discovery.py index bf4d066a4..11673267c 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -60,7 +60,7 @@ async def get_registry( addr: UnwrappedAddress|None = None, ) -> AsyncGenerator[ - Portal | LocalPortal | None, + Portal|LocalPortal|None, None, ]: ''' @@ -153,21 +153,27 @@ async def query_actor( regaddr: UnwrappedAddress|None = None, ) -> AsyncGenerator[ - UnwrappedAddress|None, + tuple[UnwrappedAddress|None, Portal|LocalPortal|None], None, ]: ''' Lookup a transport address (by actor name) via querying a registrar listening @ `regaddr`. - Returns the transport protocol (socket) address or `None` if no - entry under that name exists. + Yields a `tuple` of `(addr, reg_portal)` where, + - `addr` is the transport protocol (socket) address or `None` if + no entry under that name exists, + - `reg_portal` is the `Portal` (or `LocalPortal` when the + current actor is the registrar) used for the lookup (or + `None` when the peer was found locally via + `get_peer_by_name()`). ''' actor: Actor = current_actor() if ( name == 'registrar' - and actor.is_registrar + and + actor.is_registrar ): raise RuntimeError( 'The current actor IS the registry!?' @@ -175,10 +181,10 @@ async def query_actor( maybe_peers: list[Channel]|None = get_peer_by_name(name) if maybe_peers: - yield maybe_peers[0].raddr + yield maybe_peers[0].raddr, None return - reg_portal: Portal + reg_portal: Portal|LocalPortal regaddr: Address = wrap_address(regaddr) or actor.reg_addrs[0] async with get_registry(regaddr) as reg_portal: # TODO: return portals to all available actors - for now @@ -188,8 +194,7 @@ async def query_actor( 'find_actor', name=name, ) - yield addr - + yield addr, reg_portal @acm async def maybe_open_portal( @@ -204,15 +209,49 @@ async def maybe_open_portal( async with query_actor( name=name, regaddr=addr, - ) as addr: - pass + ) as (addr, reg_portal): + if not addr: + yield None + return - if addr: - async with _connect_chan(addr) as chan: - async with open_portal(chan) as portal: - yield portal - else: - yield None + try: + async with _connect_chan(addr) as chan: + async with open_portal(chan) as portal: + yield portal + + # most likely we were unable to connect the + # transport and there is likely a stale entry in + # the registry actor's table, thus we need to + # instruct it to clear that stale entry and then + # more silently (pretend there was no reason but + # to) indicate that the target actor can't be + # contacted at that addr. + except OSError: + # NOTE: ensure we delete the stale entry + # from the registrar actor when available. + if reg_portal is not None: + uid: tuple[str, str]|None = await reg_portal.run_from_ns( + 'self', + 'delete_addr', + addr=addr, + ) + if uid: + log.warning( + f'Deleted stale registry entry !\n' + f'addr: {addr!r}\n' + f'uid: {uid!r}\n' + ) + else: + log.warning( + f'No registry entry found for addr: {addr!r}' + ) + else: + log.warning( + f'Connection to {addr!r} failed' + f' and no registry portal available' + f' to delete stale entry.' + ) + yield None @acm @@ -280,7 +319,7 @@ async def find_actor( if not any(portals): if raise_on_none: raise RuntimeError( - f'No actor "{name}" found registered @ {registry_addrs}' + f'No actor {name!r} found registered @ {registry_addrs!r}' ) yield None return diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 93fb68fda..e0174f0c1 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -68,6 +68,7 @@ from types import ModuleType import warnings +from bidict import bidict import trio from trio._core import _run as trio_runtime from trio import ( @@ -1920,10 +1921,10 @@ def __init__( **kwargs, ) -> None: - self._registry: dict[ + self._registry: bidict[ tuple[str, str], UnwrappedAddress, - ] = {} + ] = bidict({}) self._waiters: dict[ str, # either an event to sync to receiving an actor uid (which @@ -2012,7 +2013,13 @@ async def register_actor( # should never be 0-dynamic-os-alloc await debug.pause() - self._registry[uid] = addr + # XXX NOTE, value must also be hashable AND since + # `._registry` is a `bidict` values must be unique; use + # `.forceput()` to replace any prior (stale) entries + # that might map a different uid to the same addr (e.g. + # after an unclean shutdown or actor-restart reusing + # the same address). + self._registry.forceput(uid, tuple(addr)) # pop and signal all waiter events events = self._waiters.pop(name, []) @@ -2029,4 +2036,29 @@ async def unregister_actor( uid = (str(uid[0]), str(uid[1])) entry: tuple = self._registry.pop(uid, None) if entry is None: - log.warning(f'Request to de-register {uid} failed?') + log.warning( + f'Request to de-register {uid!r} failed?' + ) + + async def delete_addr( + self, + addr: tuple[str, int|str]|list[str|int], + ) -> tuple[str, str]|None: + # NOTE: `addr` arrives as a `list` over IPC + # (msgpack deserializes tuples -> lists) so + # coerce to `tuple` for the bidict hash lookup. + uid: tuple[str, str]|None = self._registry.inverse.pop( + tuple(addr), + None, + ) + if uid: + report: str = 'Deleting registry-entry for,\n' + else: + report: str = 'No registry entry for,\n' + + log.warning( + report + + + f'{addr!r}@{uid!r}' + ) + return uid