Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions examples/advanced_faults/ipc_failure_during_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
MsgStream,
_testing,
trionics,
TransportClosed,
)
import trio
import pytest
Expand Down Expand Up @@ -208,12 +209,16 @@ async def main(
# TODO: is this needed or no?
raise

except trio.ClosedResourceError:
except (
trio.ClosedResourceError,
TransportClosed,
) as _tpt_err:
# NOTE: don't send if we already broke the
# connection to avoid raising a closed-error
# such that we drop through to the ctl-c
# mashing by user.
await trio.sleep(0.01)
with trio.CancelScope(shield=True):
await trio.sleep(0.01)

# timeout: int = 1
# with trio.move_on_after(timeout) as cs:
Expand Down Expand Up @@ -247,6 +252,7 @@ async def main(
await stream.send(i)
pytest.fail('stream not closed?')
except (
TransportClosed,
trio.ClosedResourceError,
trio.EndOfChannel,
) as send_err:
Expand Down
34 changes: 32 additions & 2 deletions tests/devx/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
'''
from __future__ import annotations
import time
import signal
from typing import (
Callable,
TYPE_CHECKING,
Expand Down Expand Up @@ -69,12 +70,15 @@ def unset_colors():
import os
os.environ['PYTHON_COLORS'] = '0'

spawned: PexpectSpawner|None = None

def _spawn(
cmd: str,
**mkcmd_kwargs,
) -> pty_spawn.spawn:
nonlocal spawned
unset_colors()
return testdir.spawn(
spawned = testdir.spawn(
cmd=mk_cmd(
cmd,
**mkcmd_kwargs,
Expand All @@ -84,9 +88,35 @@ def _spawn(
# ^TODO? get `pytest` core to expose underlying
# `pexpect.spawn()` stuff?
)
return spawned

# such that test-dep can pass input script name.
return _spawn # the `PexpectSpawner`, type alias.
yield _spawn # the `PexpectSpawner`, type alias.

if (
spawned
and
(ptyproc := spawned.ptyproc)
):
start: float = time.time()
timeout: float = 5
while (
ptyproc.isalive()
and
(
(_time_took := (time.time() - start))
<
timeout
)
):
ptyproc.kill(signal.SIGINT)
time.sleep(0.01)

if ptyproc.isalive():
ptyproc.kill(signal.SIGKILL)

# TODO? ensure we've cleaned up any UDS-paths?
# breakpoint()


@pytest.fixture(
Expand Down
5 changes: 4 additions & 1 deletion tests/devx/test_debugger.py
Original file line number Diff line number Diff line change
Expand Up @@ -1138,7 +1138,10 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
['peer IPC channel closed abruptly?',
'another task closed this fd',
'Debug lock request was CANCELLED?',
"TransportClosed: 'MsgpackUDSStream' was already closed locally ?",]
"'MsgpackUDSStream' was already closed locally?",
"TransportClosed: 'MsgpackUDSStream' was already closed 'by peer'?",
# ?TODO^? match depending on `tpt_proto(s)`?
]

# XXX races on whether these show/hit?
# 'Failed to REPl via `_pause()` You called `tractor.pause()` from an already cancelled scope!',
Expand Down
20 changes: 15 additions & 5 deletions tests/test_advanced_faults.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ def test_ipc_channel_break_during_stream(
expect_final_exc = TransportClosed

mod: ModuleType = import_path(
examples_dir() / 'advanced_faults'
examples_dir()
/ 'advanced_faults'
/ 'ipc_failure_during_stream.py',
root=examples_dir(),
consider_namespace_packages=False,
Expand All @@ -113,8 +114,9 @@ def test_ipc_channel_break_during_stream(
if (
# only expect EoC if trans is broken on the child side,
ipc_break['break_child_ipc_after'] is not False
and
# AND we tell the child to call `MsgStream.aclose()`.
and pre_aclose_msgstream
pre_aclose_msgstream
):
# expect_final_exc = trio.EndOfChannel
# ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this
Expand Down Expand Up @@ -160,7 +162,8 @@ def test_ipc_channel_break_during_stream(
ipc_break['break_child_ipc_after'] is not False
and (
ipc_break['break_parent_ipc_after']
> ipc_break['break_child_ipc_after']
>
ipc_break['break_child_ipc_after']
)
):
if pre_aclose_msgstream:
Expand Down Expand Up @@ -248,8 +251,15 @@ def test_ipc_channel_break_during_stream(
# get raw instance from pytest wrapper
value = excinfo.value
if isinstance(value, ExceptionGroup):
excs = value.exceptions
assert len(excs) == 1
excs: tuple[Exception] = value.exceptions
assert (
len(excs) <= 2
and
all(
isinstance(exc, TransportClosed)
for exc in excs
)
)
final_exc = excs[0]
assert isinstance(final_exc, expect_final_exc)

Expand Down
27 changes: 20 additions & 7 deletions tests/test_inter_peer_cancellation.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
import tractor
from tractor import ( # typing
Actor,
current_actor,
open_nursery,
Portal,
Context,
ContextCancelled,
MsgStream,
Portal,
RemoteActorError,
current_actor,
open_nursery,
)
from tractor._testing import (
# tractor_test,
Expand Down Expand Up @@ -796,8 +797,8 @@ async def basic_echo_server(

) -> None:
'''
Just the simplest `MsgStream` echo server which resays what
you told it but with its uid in front ;)
Just the simplest `MsgStream` echo server which resays what you
told it but with its uid in front ;)

'''
actor: Actor = tractor.current_actor()
Expand Down Expand Up @@ -966,9 +967,14 @@ async def tell_little_bro(

caller: str = '',
err_after: float|None = None,
rng_seed: int = 50,
rng_seed: int = 100,
# NOTE, ensure ^ is large enough (on fast hw anyway)
# to ensure the peer cancel req arrives before the
# echoing dialog does itself Bp
):
# contact target actor, do a stream dialog.
lb: Portal
echo_ipc: MsgStream
async with (
tractor.wait_for_actor(
name=actor_name
Expand All @@ -983,7 +989,6 @@ async def tell_little_bro(
else None
),
) as (sub_ctx, first),

sub_ctx.open_stream() as echo_ipc,
):
actor: Actor = current_actor()
Expand All @@ -994,6 +999,7 @@ async def tell_little_bro(
i,
)
await echo_ipc.send(msg)
await trio.sleep(0.001)
resp = await echo_ipc.receive()
print(
f'{caller} => {actor_name}: {msg}\n'
Expand All @@ -1006,6 +1012,9 @@ async def tell_little_bro(
assert sub_uid != uid
assert _i == i

# XXX, usually should never get here!
# await tractor.pause()


@pytest.mark.parametrize(
'raise_client_error',
Expand All @@ -1020,6 +1029,9 @@ def test_peer_spawns_and_cancels_service_subactor(
raise_client_error: str,
reg_addr: tuple[str, int],
raise_sub_spawn_error_after: float|None,
loglevel: str,
# ^XXX, set to 'warning' to see masked-exc warnings
# that may transpire during actor-nursery teardown.
):
# NOTE: this tests for the modden `mod wks open piker` bug
# discovered as part of implementing workspace ctx
Expand Down Expand Up @@ -1049,6 +1061,7 @@ async def main():
# NOTE: to halt the peer tasks on ctxc, uncomment this.
debug_mode=debug_mode,
registry_addrs=[reg_addr],
loglevel=loglevel,
) as an:
server: Portal = await an.start_actor(
(server_name := 'spawn_server'),
Expand Down
2 changes: 1 addition & 1 deletion tests/test_multi_program.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ async def main():

async with sub_ptl.open_context(
get_root_portal,
) as (ctx, first):
) as (ctx, _):
print('Waiting for `sub` to connect back to us..')

await an.cancel()
Expand Down
6 changes: 2 additions & 4 deletions tractor/_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
MsgTypeError,
RemoteActorError,
StreamOverrun,
TransportClosed,
pack_from_raise,
unpack_error,
)
Expand Down Expand Up @@ -2428,10 +2429,7 @@ async def open_context_from_portal(
try:
# await pause(shield=True)
await ctx.cancel()
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
):
except TransportClosed:
log.warning(
'IPC connection for context is broken?\n'
f'task: {ctx.cid}\n'
Expand Down
14 changes: 11 additions & 3 deletions tractor/_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,13 @@ async def get_registry(


@acm
async def get_root(
**kwargs,
) -> AsyncGenerator[Portal, None]:
async def get_root(**kwargs) -> AsyncGenerator[Portal, None]:
'''
Deliver the current actor's "root process" actor (yes in actor
and proc tree terms) by delivering a `Portal` from the spawn-time
provided contact address.

'''
# TODO: rename mailbox to `_root_maddr` when we finally
# add and impl libp2p multi-addrs?
addr = _runtime_vars['_root_mailbox']
Expand Down Expand Up @@ -193,6 +196,11 @@ async def maybe_open_portal(
addr: UnwrappedAddress,
name: str,
):
'''
Open a `Portal` to the actor serving @ `addr` or `None` if no
peer can be contacted or found.

'''
async with query_actor(
name=name,
regaddr=addr,
Expand Down
13 changes: 1 addition & 12 deletions tractor/_portal.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,18 +329,7 @@ async def cancel_actor(
# if we get here some weird cancellation case happened
return False

except (
# XXX, should never really get raised unless we aren't
# wrapping them in the below type by mistake?
#
# Leaving the catch here for now until we're very sure
# all the cases (for various tpt protos) have indeed been
# re-wrapped ;p
trio.ClosedResourceError,
trio.BrokenResourceError,

TransportClosed,
) as tpt_err:
except TransportClosed as tpt_err:
ipc_borked_report: str = (
f'IPC for actor already closed/broken?\n\n'
f'\n'
Expand Down
2 changes: 1 addition & 1 deletion tractor/_root.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ async def ping_tpt_socket(
# ?TODO, per-OS non-network-proto alt options?
# -[ ] on linux we should be able to always use UDS?
#
raddrs: list[Address] = _state._runtime_vars['_root_addrs']
raddrs: list[UnwrappedAddress] = _state._runtime_vars['_root_addrs']
raddrs.extend(
accept_addrs,
)
Expand Down
Loading