Skip to content
Merged
102 changes: 84 additions & 18 deletions tests/test_discovery.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Discovery subsys.
'''
Discovery subsystem scenarios using a "registrar" actor.

"""
'''
import os
import signal
import platform
Expand Down Expand Up @@ -163,7 +163,10 @@ async def unpack_reg(
else:
msg = await actor_or_portal.run_from_ns('self', 'get_registry')

return {tuple(key.split('.')): val for key, val in msg.items()}
return {
tuple(key.split('.')): val
for key, val in msg.items()
}


async def spawn_and_check_registry(
Expand Down Expand Up @@ -356,20 +359,24 @@ async def close_chans_before_nursery(
try:
get_reg = partial(unpack_reg, aportal)

async with tractor.open_nursery() as tn:
portal1 = await tn.start_actor(
name='consumer1', enable_modules=[__name__])
portal2 = await tn.start_actor(
'consumer2', enable_modules=[__name__])

# TODO: compact this back as was in last commit once
# 3.9+, see https://github.com/goodboy/tractor/issues/207
async with portal1.open_stream_from(
stream_forever
) as agen1:
async with portal2.open_stream_from(
async with tractor.open_nursery() as an:
portal1 = await an.start_actor(
name='consumer1',
enable_modules=[__name__],
)
portal2 = await an.start_actor(
'consumer2',
enable_modules=[__name__],
)

async with (
portal1.open_stream_from(
stream_forever
) as agen1,
portal2.open_stream_from(
stream_forever
) as agen2:
) as agen2,
):
async with (
collapse_eg(),
trio.open_nursery() as tn,
Expand All @@ -392,6 +399,7 @@ async def close_chans_before_nursery(
# also kill off channels cuz why not
await agen1.aclose()
await agen2.aclose()

finally:
with trio.CancelScope(shield=True):
await trio.sleep(1)
Expand Down Expand Up @@ -427,7 +435,7 @@ def test_close_channel_explicit(


@pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit_remote_arbiter(
def test_close_channel_explicit_remote_registrar(
daemon: subprocess.Popen,
start_method: str,
use_signal: bool,
Expand All @@ -448,3 +456,61 @@ def test_close_channel_explicit_remote_arbiter(
remote_arbiter=True,
),
)


@tractor.context
async def kill_transport(
    ctx: tractor.Context,
) -> None:
    '''
    Context endpoint which reports "started" to the caller, then
    cancels the current actor's IPC server and sleeps forever.

    '''
    # sync with the opener of this context before doing any damage.
    await ctx.started()

    # knock out this actor's IPC server; presumably this is what
    # makes later transport connects to this actor fail - TODO confirm.
    this_actor: tractor.Actor = tractor.current_actor()
    server = this_actor.ipc_server
    server.cancel()

    # never return on our own; block until cancelled from outside.
    await trio.sleep_forever()



# @pytest.mark.parametrize('use_signal', [False, True])
def test_stale_entry_is_deleted(
debug_mode: bool,
daemon: subprocess.Popen,
start_method: str,
reg_addr: tuple,
):
'''
Ensure that when a stale entry is detected in the registrar's
table that the `find_actor()` API takes care of deleting the
stale entry and not delivering a bad portal.

'''
async def main():

name: str = 'transport_fails_actor'
_reg_ptl: tractor.Portal
an: tractor.ActorNursery
async with (
tractor.open_nursery(
debug_mode=debug_mode,
) as an,
tractor.get_registry(reg_addr) as _reg_ptl,
):
Comment on lines +492 to +497
Copy link
Copy Markdown
Owner Author

@goodboy goodboy Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 response authored by claude-code

Good point - added explicit registry_addrs=[reg_addr] to both the open_nursery() and find_actor() calls so the test is guaranteed to use the remote registrar daemon started by the daemon fixture.

📎 fixed in 85457cb8

ptl: tractor.Portal = await an.start_actor(
name,
enable_modules=[__name__],
)
async with ptl.open_context(
kill_transport,
) as (first, ctx):
async with tractor.find_actor(name) as maybe_portal:
# because the transitive
# `._discovery.maybe_open_portal()` call should
# fail and implicitly call `.delete_addr()`
assert maybe_portal is None
registry: dict = await unpack_reg(_reg_ptl)
assert ptl.chan.aid.uid not in registry

# should fail since we knocked out the IPC tpt XD
await ptl.cancel_actor()
await an.cancel()

trio.run(main)
59 changes: 44 additions & 15 deletions tractor/_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
async def get_registry(
addr: UnwrappedAddress|None = None,
) -> AsyncGenerator[
Portal | LocalPortal | None,
Portal|LocalPortal|None,
None,
]:
'''
Expand Down Expand Up @@ -153,7 +153,7 @@ async def query_actor(
regaddr: UnwrappedAddress|None = None,

) -> AsyncGenerator[
UnwrappedAddress|None,
tuple[UnwrappedAddress|None, Portal|None],
None,
]:
Comment on lines 155 to 158
Copy link
Copy Markdown
Owner Author

@goodboy goodboy Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 response authored by claude-code

Updated the docstring to document the new (addr, reg_portal) yield shape - explains when reg_portal is None (peer found locally via get_peer_by_name()) vs when it's a Portal to the registrar.

📎 fixed in 85457cb8

'''
Expand All @@ -167,15 +167,16 @@ async def query_actor(
actor: Actor = current_actor()
if (
name == 'registrar'
and actor.is_registrar
and
actor.is_registrar
Comment on lines 174 to +176
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unusual line break placement. The 'and' operator is isolated on its own line (line 170), which is inconsistent with common Python style guides. Consider either keeping the condition on one line or breaking it more conventionally (e.g., after the 'and' on the previous line).

Copilot uses AI. Check for mistakes.
):
raise RuntimeError(
'The current actor IS the registry!?'
)

maybe_peers: list[Channel]|None = get_peer_by_name(name)
if maybe_peers:
yield maybe_peers[0].raddr
yield maybe_peers[0].raddr, None
return

reg_portal: Portal
Expand All @@ -188,8 +189,7 @@ async def query_actor(
'find_actor',
name=name,
)
yield addr

yield addr, reg_portal

@acm
async def maybe_open_portal(
Expand All @@ -204,15 +204,44 @@ async def maybe_open_portal(
async with query_actor(
name=name,
regaddr=addr,
) as addr:
pass
) as (addr, reg_portal):
if not addr:
yield None
return

if addr:
async with _connect_chan(addr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
try:
async with _connect_chan(addr) as chan:
async with open_portal(chan) as portal:
yield portal

# most likely we were unable to connect the
# transport and there is likely a stale entry in
# the registry actor's table, thus we need to
# instruct it to clear that stale entry and then
# more silently (pretend there was no reason but
# to) indicate that the target actor can't be
# contacted at that addr.
except OSError:
# NOTE: ensure we delete the stale entry
# from the registrar actor when available.
if reg_portal is not None:
uid: tuple[str, str] = await reg_portal.run_from_ns(
Copy link
Copy Markdown
Owner Author

@goodboy goodboy Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 response authored by claude-code

Valid - fixed the annotation to tuple[str, str]|None and now branch on the return value: log "Deleted stale registry entry" when a uid is returned, or "No registry entry found for addr" when None.

📎 fixed in 85457cb8

'self',
'delete_addr',
addr=addr,
)
log.warning(
f'Deleted stale registry entry !\n'
f'addr: {addr!r}\n'
f'uid: {uid!r}\n'
)
else:
log.warning(
f'Connection to {addr!r} failed'
f' and no registry portal available'
f' to delete stale entry.'
)
yield None


@acm
Expand Down Expand Up @@ -280,7 +309,7 @@ async def find_actor(
if not any(portals):
if raise_on_none:
raise RuntimeError(
f'No actor "{name}" found registered @ {registry_addrs}'
f'No actor {name!r} found registered @ {registry_addrs!r}'
)
yield None
return
Expand Down
32 changes: 28 additions & 4 deletions tractor/_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
from types import ModuleType
import warnings

from bidict import bidict
import trio
from trio._core import _run as trio_runtime
from trio import (
Expand Down Expand Up @@ -1920,10 +1921,10 @@ def __init__(
**kwargs,
) -> None:

self._registry: dict[
self._registry: bidict[
tuple[str, str],
UnwrappedAddress,
] = {}
] = bidict({})
self._waiters: dict[
str,
# either an event to sync to receiving an actor uid (which
Expand Down Expand Up @@ -2012,7 +2013,8 @@ async def register_actor(
# should never be 0-dynamic-os-alloc
await debug.pause()

self._registry[uid] = addr
# XXX NOTE, value must also be hashable.
self._registry[uid] = tuple(addr)
Copy link
Copy Markdown
Owner Author

@goodboy goodboy Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 response authored by claude-code

Good catch 👍 - switched to bidict.forceput() which silently overwrites any prior (stale) entry mapping a different uid to the same addr. This handles the unclean-shutdown / actor-restart-reusing-addr case without needing explicit exception handling.

📎 fixed in 85457cb8


# pop and signal all waiter events
events = self._waiters.pop(name, [])
Expand All @@ -2029,4 +2031,26 @@ async def unregister_actor(
uid = (str(uid[0]), str(uid[1]))
entry: tuple = self._registry.pop(uid, None)
if entry is None:
log.warning(f'Request to de-register {uid} failed?')
log.warning(
f'Request to de-register {uid!r} failed?'
)

async def delete_addr(
    self,
    addr: tuple[str, int|str]|list[str|int],
) -> tuple[str, str]|None:
    '''
    Drop the registry entry whose registered address equals `addr`.

    Returns the uid that was mapped to `addr`, or `None` when no
    entry was registered for that address. A warning is logged in
    both cases.

    '''
    # XXX, `.register_actor()` stores every value as `tuple(addr)`
    # so the inverse mapping is keyed by tuples; coerce here as well
    # since a `list` input is unhashable and would raise `TypeError`
    # instead of matching the stored entry.
    # NOTE(review): presumably `addr` can arrive as a `list` after
    # wire (de)serialization - confirm against the IPC codec.
    key: tuple = tuple(addr)

    uid: tuple[str, str]|None = self._registry.inverse.pop(
        key,
        None,
    )
    if uid:
        report: str = 'Deleting registry-entry for,\n'
    else:
        report: str = 'No registry entry for,\n'

    log.warning(
        report
        +
        f'{addr!r}@{uid!r}'
    )
    return uid
Loading