Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/debugging/fast_error_in_root_after_spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async def sleep(


async def open_ctx(
n: tractor._supervise.ActorNursery
n: tractor.runtime._supervise.ActorNursery
):

# spawn both actors
Expand Down
2 changes: 1 addition & 1 deletion examples/service_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ async def main(service_name):
await an.start_actor(service_name)

async with tractor.get_registry() as portal:
print(f"Arbiter is listening on {portal.channel}")
print(f"Registrar is listening on {portal.channel}")

async with tractor.wait_for_actor(service_name) as sockaddr:
print(f"my_service is found at {sockaddr}")
Expand Down
72 changes: 72 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import signal
import platform
import time
from pathlib import Path
from typing import Literal

import pytest
import tractor
Expand Down Expand Up @@ -52,6 +54,76 @@
)


def get_cpu_state(
icpu: int = 0,
setting: Literal[
'scaling_governor',
'*_pstate_max_freq',
'scaling_max_freq',
# 'scaling_cur_freq',
] = '*_pstate_max_freq',
) -> tuple[
Path,
str|int,
]|None:
'''
Attempt to read the (first) CPU's setting according
to the set `setting` from under the file-sys,

/sys/devices/system/cpu/cpu0/cpufreq/{setting}

Useful to determine latency headroom for various perf affected
test suites.

'''
try:
# Read governor for core 0 (usually same for all)
setting_path: Path = list(
Path(f'/sys/devices/system/cpu/cpu{icpu}/cpufreq/')
.glob(f'{setting}')
)[0] # <- XXX must be single match!
with open(
setting_path,
'r',
) as f:
return (
setting_path,
f.read().strip(),
)
except (FileNotFoundError, IndexError):
return None


def cpu_scaling_factor() -> float:
'''
Return a latency-headroom multiplier (>= 1.0) reflecting how
much to inflate time-limits when CPU-freq scaling is active on
linux.

When no scaling info is available (non-linux, missing sysfs),
returns 1.0 (i.e. no headroom adjustment needed).

'''
if _non_linux:
return 1.

mx = get_cpu_state()
cur = get_cpu_state(setting='scaling_max_freq')
if mx is None or cur is None:
return 1.

_mx_pth, max_freq = mx
_cur_pth, cur_freq = cur
cpu_scaled: float = int(cur_freq) / int(max_freq)

if cpu_scaled != 1.:
return 1. / (
cpu_scaled * 2 # <- bc likely "dual threaded"
)

return 1.


def pytest_addoption(
parser: pytest.Parser,
):
Expand Down
2 changes: 1 addition & 1 deletion tests/devx/test_tooling.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def test_shield_pause(
child.pid,
signal.SIGINT,
)
from tractor._supervise import _shutdown_msg
from tractor.runtime._supervise import _shutdown_msg
expect(
child,
# 'Shutting down actor runtime',
Expand Down
11 changes: 5 additions & 6 deletions tests/ipc/test_each_tpt.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,16 @@
import pytest
import trio
import tractor
from tractor import (
Actor,
_state,
_addr,
)
from tractor import Actor
from tractor.runtime import _state
from tractor.discovery import _addr


@pytest.fixture
def bindspace_dir_str() -> str:

rt_dir: Path = tractor._state.get_rt_dir()
from tractor.runtime._state import get_rt_dir
rt_dir: Path = get_rt_dir()
bs_dir: Path = rt_dir / 'doggy'
bs_dir_str: str = str(bs_dir)
assert not bs_dir.is_dir()
Expand Down
4 changes: 2 additions & 2 deletions tests/ipc/test_multi_tpt.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
Portal,
ipc,
msg,
_state,
_addr,
)
from tractor.runtime import _state
from tractor.discovery import _addr

@tractor.context
async def chk_tpts(
Expand Down
2 changes: 1 addition & 1 deletion tests/msg/test_pldrx_limiting.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def maybe_expect_raises(
Async wrapper for ensuring errors propagate from the inner scope.

'''
if tractor._state.debug_mode():
if tractor.debug_mode():
timeout += 999

with trio.fail_after(timeout):
Expand Down
9 changes: 8 additions & 1 deletion tests/test_cancellation.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ def test_cancel_via_SIGINT(
"""Ensure that a control-C (SIGINT) signal cancels both the parent and
child processes in trionic fashion
"""
pid = os.getpid()
pid: int = os.getpid()

async def main():
with trio.fail_after(2):
Expand All @@ -517,6 +517,8 @@ def test_cancel_via_SIGINT_other_task(
started from a seperate ``trio`` child task.

'''
from .conftest import cpu_scaling_factor

pid: int = os.getpid()
timeout: float = (
4 if _non_linux
Expand All @@ -525,6 +527,11 @@ def test_cancel_via_SIGINT_other_task(
if _friggin_windows: # smh
timeout += 1

# add latency headroom for CPU freq scaling (auto-cpufreq et al.)
headroom: float = cpu_scaling_factor()
if headroom != 1.:
timeout *= headroom

async def spawn_and_sleep_forever(
task_status=trio.TASK_STATUS_IGNORED
):
Expand Down
15 changes: 14 additions & 1 deletion tests/test_clustering.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,20 @@
MESSAGE = 'tractoring at full speed'


def test_empty_mngrs_input_raises() -> None:
def test_empty_mngrs_input_raises(
tpt_proto: str,
) -> None:
# TODO, the `open_actor_cluster()` teardown hangs
# intermittently on UDS when `gather_contexts(mngrs=())`
# raises `ValueError` mid-setup; likely a race in the
# actor-nursery cleanup vs UDS socket shutdown. Needs
# a deeper look at `._clustering`/`._supervise` teardown
# paths with the UDS transport.
if tpt_proto == 'uds':
pytest.skip(
'actor-cluster teardown hangs intermittently on UDS'
)

async def main():
with trio.fail_after(3):
async with (
Expand Down
4 changes: 2 additions & 2 deletions tests/test_context_stream_semantics.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
StreamOverrun,
ContextCancelled,
)
from tractor._state import current_ipc_ctx
from tractor.runtime._state import current_ipc_ctx

from tractor._testing import (
tractor_test,
Expand Down Expand Up @@ -939,7 +939,7 @@ def test_one_end_stream_not_opened(

'''
overrunner, buf_size_increase, entrypoint = overrun_by
from tractor._runtime import Actor
from tractor.runtime._runtime import Actor
buf_size = buf_size_increase + Actor.msg_buffer_size

timeout: float = (
Expand Down
24 changes: 12 additions & 12 deletions tests/test_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async def test_reg_then_unreg(
reg_addr: tuple,
):
actor = tractor.current_actor()
assert actor.is_arbiter
assert actor.is_registrar
assert len(actor._registry) == 1 # only self is registered

async with tractor.open_nursery(
Expand All @@ -35,7 +35,7 @@ async def test_reg_then_unreg(
uid = portal.channel.aid.uid

async with tractor.get_registry(reg_addr) as aportal:
# this local actor should be the arbiter
# this local actor should be the registrar
assert actor is aportal.actor

async with tractor.wait_for_actor('actor'):
Expand Down Expand Up @@ -154,7 +154,7 @@ async def unpack_reg(
actor_or_portal: tractor.Portal|tractor.Actor,
):
'''
Get and unpack a "registry" RPC request from the "arbiter" registry
Get and unpack a "registry" RPC request from the registrar
system.

'''
Expand Down Expand Up @@ -197,15 +197,15 @@ async def spawn_and_check_registry(
actor = tractor.current_actor()

if remote_arbiter:
assert not actor.is_arbiter
assert not actor.is_registrar

if actor.is_arbiter:
extra = 1 # arbiter is local root actor
if actor.is_registrar:
extra = 1 # registrar is local root actor
get_reg = partial(unpack_reg, actor)

else:
get_reg = partial(unpack_reg, portal)
extra = 2 # local root actor + remote arbiter
extra = 2 # local root actor + remote registrar

# ensure current actor is registered
registry: dict = await get_reg()
Expand Down Expand Up @@ -285,7 +285,7 @@ def test_subactors_unregister_on_cancel(
):
'''
Verify that cancelling a nursery results in all subactors
deregistering themselves with the arbiter.
deregistering themselves with the registrar.

'''
with pytest.raises(KeyboardInterrupt):
Expand Down Expand Up @@ -314,7 +314,7 @@ def test_subactors_unregister_on_cancel_remote_daemon(
'''
Verify that cancelling a nursery results in all subactors
deregistering themselves with a **remote** (not in the local
process tree) arbiter.
process tree) registrar.

'''
with pytest.raises(KeyboardInterrupt):
Expand Down Expand Up @@ -387,7 +387,7 @@ async def close_chans_before_nursery(
await streamer(agen2)
finally:
# Kill the root nursery thus resulting in
# normal arbiter channel ops to fail during
# normal registrar channel ops to fail during
# teardown. It doesn't seem like this is
# reliably triggered by an external SIGINT.
# tractor.current_actor()._root_nursery.cancel_scope.cancel()
Expand Down Expand Up @@ -420,7 +420,7 @@ def test_close_channel_explicit(
'''
Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter.
results in subactor(s) deregistering from the registrar.

'''
with pytest.raises(KeyboardInterrupt):
Expand All @@ -444,7 +444,7 @@ def test_close_channel_explicit_remote_registrar(
'''
Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter.
results in subactor(s) deregistering from the registrar.

'''
with pytest.raises(KeyboardInterrupt):
Expand Down
13 changes: 11 additions & 2 deletions tests/test_docs_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ def run(script_code):
for f in p[2]

if (
'__' not in f
and f[0] != '_'
'__' not in f # ignore any pkg-mods
# ignore any `__pycache__` subdir
and '__pycache__' not in str(p[0])
and f[0] != '_' # ignore any WIP "examplel mods"
and 'debugging' not in p[0]
and 'integration' not in p[0]
and 'advanced_faults' not in p[0]
Expand Down Expand Up @@ -143,12 +145,19 @@ def test_example(
'This test does run just fine "in person" however..'
)

from .conftest import cpu_scaling_factor

timeout: float = (
60
if ci_env and _non_linux
else 16
)

# add latency headroom for CPU freq scaling (auto-cpufreq et al.)
headroom: float = cpu_scaling_factor()
if headroom != 1.:
timeout *= headroom

with open(ex_file, 'r') as ex:
code = ex.read()

Expand Down
2 changes: 1 addition & 1 deletion tests/test_infected_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
to_asyncio,
RemoteActorError,
ContextCancelled,
_state,
)
from tractor.runtime import _state
from tractor.trionics import BroadcastReceiver
from tractor._testing import expect_ctxc

Expand Down
6 changes: 3 additions & 3 deletions tests/test_inter_peer_cancellation.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ async def stream_from_peer(
) -> None:

# sanity
assert tractor._state.debug_mode() == debug_mode
assert tractor.debug_mode() == debug_mode

peer: Portal
try:
Expand Down Expand Up @@ -841,7 +841,7 @@ async def serve_subactors(
async with open_nursery() as an:

# sanity
assert tractor._state.debug_mode() == debug_mode
assert tractor.debug_mode() == debug_mode

await ctx.started(peer_name)
async with ctx.open_stream() as ipc:
Expand Down Expand Up @@ -880,7 +880,7 @@ async def client_req_subactor(
) -> None:
# sanity
if debug_mode:
assert tractor._state.debug_mode()
assert tractor.debug_mode()

# TODO: other cases to do with sub lifetimes:
# -[ ] test that we can have the server spawn a sub
Expand Down
Loading
Loading