Skip to content
Merged
106 changes: 88 additions & 18 deletions tests/test_discovery.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Discovery subsys.
'''
Discovery subsystem via a "registrar" actor scenarios.

"""
'''
import os
import signal
import platform
Expand Down Expand Up @@ -163,7 +163,10 @@ async def unpack_reg(
else:
msg = await actor_or_portal.run_from_ns('self', 'get_registry')

return {tuple(key.split('.')): val for key, val in msg.items()}
return {
tuple(key.split('.')): val
for key, val in msg.items()
}


async def spawn_and_check_registry(
Expand Down Expand Up @@ -356,20 +359,24 @@ async def close_chans_before_nursery(
try:
get_reg = partial(unpack_reg, aportal)

async with tractor.open_nursery() as tn:
portal1 = await tn.start_actor(
name='consumer1', enable_modules=[__name__])
portal2 = await tn.start_actor(
'consumer2', enable_modules=[__name__])

# TODO: compact this back as was in last commit once
# 3.9+, see https://github.com/goodboy/tractor/issues/207
async with portal1.open_stream_from(
stream_forever
) as agen1:
async with portal2.open_stream_from(
async with tractor.open_nursery() as an:
portal1 = await an.start_actor(
name='consumer1',
enable_modules=[__name__],
)
portal2 = await an.start_actor(
'consumer2',
enable_modules=[__name__],
)

async with (
portal1.open_stream_from(
stream_forever
) as agen1,
portal2.open_stream_from(
stream_forever
) as agen2:
) as agen2,
):
async with (
collapse_eg(),
trio.open_nursery() as tn,
Expand All @@ -392,6 +399,7 @@ async def close_chans_before_nursery(
# also kill off channels cuz why not
await agen1.aclose()
await agen2.aclose()

finally:
with trio.CancelScope(shield=True):
await trio.sleep(1)
Expand Down Expand Up @@ -427,7 +435,7 @@ def test_close_channel_explicit(


@pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit_remote_arbiter(
def test_close_channel_explicit_remote_registrar(
daemon: subprocess.Popen,
start_method: str,
use_signal: bool,
Expand All @@ -448,3 +456,65 @@ def test_close_channel_explicit_remote_arbiter(
remote_arbiter=True,
),
)


@tractor.context
async def kill_transport(
ctx: tractor.Context,
) -> None:

await ctx.started()
actor: tractor.Actor = tractor.current_actor()
actor.ipc_server.cancel()
await trio.sleep_forever()



# @pytest.mark.parametrize('use_signal', [False, True])
def test_stale_entry_is_deleted(
debug_mode: bool,
daemon: subprocess.Popen,
start_method: str,
reg_addr: tuple,
):
'''
Ensure that when a stale entry is detected in the registrar's
table that the `find_actor()` API takes care of deleting the
stale entry and not delivering a bad portal.

'''
async def main():

name: str = 'transport_fails_actor'
_reg_ptl: tractor.Portal
an: tractor.ActorNursery
async with (
tractor.open_nursery(
debug_mode=debug_mode,
registry_addrs=[reg_addr],
) as an,
tractor.get_registry(reg_addr) as _reg_ptl,
):
Comment on lines +492 to +497
Copy link
Copy Markdown
Owner Author

@goodboy goodboy Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 response authored by claude-code

Good point - added explicit registry_addrs=[reg_addr] to both the open_nursery() and find_actor() calls so the test is guaranteed to use the remote registrar daemon started by the daemon fixture.

📎 fixed in 85457cb8

ptl: tractor.Portal = await an.start_actor(
name,
enable_modules=[__name__],
)
async with ptl.open_context(
kill_transport,
) as (first, ctx):
async with tractor.find_actor(
name,
registry_addrs=[reg_addr],
) as maybe_portal:
# because the transitive
# `._discovery.maybe_open_portal()` call should
# fail and implicitly call `.delete_addr()`
assert maybe_portal is None
registry: dict = await unpack_reg(_reg_ptl)
assert ptl.chan.aid.uid not in registry

# should fail since we knocked out the IPC tpt XD
await ptl.cancel_actor()
await an.cancel()

trio.run(main)
72 changes: 55 additions & 17 deletions tractor/_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
async def get_registry(
addr: UnwrappedAddress|None = None,
) -> AsyncGenerator[
Portal | LocalPortal | None,
Portal|LocalPortal|None,
None,
]:
'''
Expand Down Expand Up @@ -153,29 +153,34 @@ async def query_actor(
regaddr: UnwrappedAddress|None = None,

) -> AsyncGenerator[
UnwrappedAddress|None,
tuple[UnwrappedAddress|None, Portal|None],
None,
]:
Comment on lines 155 to 158
Copy link
Copy Markdown
Owner Author

@goodboy goodboy Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 response authored by claude-code

Updated the docstring to document the new (addr, reg_portal) yield shape - explains when reg_portal is None (peer found locally via get_peer_by_name()) vs when it's a Portal to the registrar.

📎 fixed in 85457cb8

'''
Lookup a transport address (by actor name) via querying a registrar
listening @ `regaddr`.

Returns the transport protocol (socket) address or `None` if no
entry under that name exists.
Yields a `tuple` of `(addr, reg_portal)` where,
- `addr` is the transport protocol (socket) address or `None` if
no entry under that name exists,
- `reg_portal` is the `Portal` to the registrar used for the
lookup (or `None` when the peer was found locally via
`get_peer_by_name()`).
Comment on lines 155 to +169
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

query_actor() can yield a LocalPortal from get_registry() when current_actor().is_registrar is true, but the yield type is annotated as Portal|None and the docstring refers to a Portal. Please widen the annotation (and docstring) to include LocalPortal so the type contract matches actual behavior.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Owner Author

@goodboy goodboy Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 response authored by claude-code

Valid — LocalPortal is a standalone class (not a Portal subclass), and get_registry() yields one when the current actor is the registrar querying for non-registrar names.

Widened the yield type annotation, reg_portal local annotation, and docstring to include LocalPortal.

📎 fixed in e71eec07


'''
actor: Actor = current_actor()
if (
name == 'registrar'
and actor.is_registrar
and
actor.is_registrar
Comment on lines 174 to +176
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unusual line break placement. The 'and' operator is isolated on its own line (line 170), which is inconsistent with common Python style guides. Consider either keeping the condition on one line or breaking it more conventionally (e.g., after the 'and' on the previous line).

Copilot uses AI. Check for mistakes.
):
raise RuntimeError(
'The current actor IS the registry!?'
)

maybe_peers: list[Channel]|None = get_peer_by_name(name)
if maybe_peers:
yield maybe_peers[0].raddr
yield maybe_peers[0].raddr, None
return

reg_portal: Portal
Expand All @@ -188,8 +193,7 @@ async def query_actor(
'find_actor',
name=name,
)
yield addr

yield addr, reg_portal

@acm
async def maybe_open_portal(
Expand All @@ -204,15 +208,49 @@ async def maybe_open_portal(
async with query_actor(
name=name,
regaddr=addr,
) as addr:
pass
) as (addr, reg_portal):
if not addr:
yield None
return

if addr:
async with _connect_chan(addr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
try:
async with _connect_chan(addr) as chan:
async with open_portal(chan) as portal:
yield portal

# most likely we were unable to connect the
# transport and there is likely a stale entry in
# the registry actor's table, thus we need to
# instruct it to clear that stale entry and then
# more silently (pretend there was no reason but
# to) indicate that the target actor can't be
# contacted at that addr.
except OSError:
# NOTE: ensure we delete the stale entry
# from the registrar actor when available.
if reg_portal is not None:
uid: tuple[str, str]|None = await reg_portal.run_from_ns(
'self',
'delete_addr',
addr=addr,
)
if uid:
log.warning(
f'Deleted stale registry entry !\n'
f'addr: {addr!r}\n'
f'uid: {uid!r}\n'
)
else:
log.warning(
f'No registry entry found for addr: {addr!r}'
)
else:
log.warning(
f'Connection to {addr!r} failed'
f' and no registry portal available'
f' to delete stale entry.'
)
yield None


@acm
Expand Down Expand Up @@ -280,7 +318,7 @@ async def find_actor(
if not any(portals):
if raise_on_none:
raise RuntimeError(
f'No actor "{name}" found registered @ {registry_addrs}'
f'No actor {name!r} found registered @ {registry_addrs!r}'
)
yield None
return
Expand Down
40 changes: 36 additions & 4 deletions tractor/_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
from types import ModuleType
import warnings

from bidict import bidict
import trio
from trio._core import _run as trio_runtime
from trio import (
Expand Down Expand Up @@ -1920,10 +1921,10 @@ def __init__(
**kwargs,
) -> None:

self._registry: dict[
self._registry: bidict[
tuple[str, str],
UnwrappedAddress,
] = {}
] = bidict({})
self._waiters: dict[
str,
# either an event to sync to receiving an actor uid (which
Expand Down Expand Up @@ -2012,7 +2013,13 @@ async def register_actor(
# should never be 0-dynamic-os-alloc
await debug.pause()

self._registry[uid] = addr
# XXX NOTE, value must also be hashable AND since
# `._registry` is a `bidict` values must be unique; use
# `.forceput()` to replace any prior (stale) entries
# that might map a different uid to the same addr (e.g.
# after an unclean shutdown or actor-restart reusing
# the same address).
self._registry.forceput(uid, tuple(addr))

# pop and signal all waiter events
events = self._waiters.pop(name, [])
Expand All @@ -2029,4 +2036,29 @@ async def unregister_actor(
uid = (str(uid[0]), str(uid[1]))
entry: tuple = self._registry.pop(uid, None)
if entry is None:
log.warning(f'Request to de-register {uid} failed?')
log.warning(
f'Request to de-register {uid!r} failed?'
)

async def delete_addr(
self,
addr: tuple[str, int|str],
) -> tuple[str, str]|None:
# NOTE: `addr` arrives as a `list` over IPC
# (msgpack deserializes tuples -> lists) so
# coerce to `tuple` for the bidict hash lookup.
uid: tuple | None = self._registry.inverse.pop(
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In delete_addr(), uid is annotated as tuple | None, but the function return type is tuple[str, str] | None and _registry.inverse is keyed to the UID tuple. Tighten the local variable annotation to tuple[str, str] | None (and consider broadening the addr parameter type if it can arrive as a list over IPC, as noted in the comment) to keep typing consistent.

Suggested change
addr: tuple[str, int|str],
) -> tuple[str, str]|None:
# NOTE: `addr` arrives as a `list` over IPC
# (msgpack deserializes tuples -> lists) so
# coerce to `tuple` for the bidict hash lookup.
uid: tuple | None = self._registry.inverse.pop(
addr: tuple[str, int | str] | list[str | int],
) -> tuple[str, str] | None:
# NOTE: `addr` arrives as a `list` over IPC
# (msgpack deserializes tuples -> lists) so
# coerce to `tuple` for the bidict hash lookup.
uid: tuple[str, str] | None = self._registry.inverse.pop(

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Owner Author

@goodboy goodboy Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 response authored by claude-code

Good catch — tightened uid annotation to tuple[str, str]|None and broadened addr param type to tuple[str, int|str]|list[str|int] to document the msgpack list-over-IPC reality (runtime coercion via tuple(addr) was already in place from b557ec20).

📎 fixed in e71eec07

tuple(addr),
None,
)
if uid:
report: str = 'Deleting registry-entry for,\n'
else:
report: str = 'No registry entry for,\n'

log.warning(
report
+
f'{addr!r}@{uid!r}'
)
return uid
Loading