diff --git a/docs/configuration.rst b/docs/configuration.rst index e95fdf45c..daef85d3b 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -10,6 +10,17 @@ Rally stores its configuration in the file ``~/.rally/rally.ini`` which is autom The configuration file can use `${CONFIG_DIR}` to refer to the directory where Rally stores its configuration files. This is useful for configuring Rally in a portable way. This defaults to `~/.rally`, but can be overridden by setting the `RALLY_HOME` environment variable in your shell. + +actor +~~~~~ + +This section allows to configure how thespian actor library is being used from rally. + +* ``actor.process.startup.method``: It allows to configure how `thespian` actors library should specify how + `subprocessing` library should create processes for new actors. This can be used to prevent from using `fork` method + on Linux or OSX with the purpose, for instance, to use threads in some rally component. + + meta ~~~~ diff --git a/esrally/actor.py b/esrally/actor.py index 5faa404c5..626831297 100644 --- a/esrally/actor.py +++ b/esrally/actor.py @@ -14,16 +14,21 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from __future__ import annotations import logging import socket import traceback +import typing +from typing import Any -import thespian.actors -import thespian.system.messages.status +import thespian.actors # type: ignore[import-untyped] +import thespian.system.messages.status # type: ignore[import-untyped] from esrally import exceptions, log -from esrally.utils import console, net +from esrally.utils import console + +LOG = logging.getLogger(__name__) class BenchmarkFailure: @@ -103,7 +108,7 @@ def guard(self, msg, sender): class RallyActor(thespian.actors.ActorTypeDispatcher): def __init__(self, *args, **kw): super().__init__(*args, **kw) - self.children = [] + self.children: list[thespian.actors.ActorAddress] = [] self.received_responses = [] self.status = None log.post_configure_actor_logging() @@ -175,8 +180,9 @@ def send_to_children_and_transition(self, sender, msg, expected_status, new_stat if self.is_current_status_expected(expected_status): self.logger.debug("Transitioning from [%s] to [%s].", self.status, new_status) self.status = new_status - for m in filter(None, self.children): - self.send(m, msg) + child: thespian.actors.ActorAddress + for child in filter(None, self.children): + self.send(child, msg) else: raise exceptions.RallyAssertionError( "Received [%s] from [%s] but we are in status [%s] instead of [%s]." % (type(msg), sender, self.status, expected_status) @@ -193,71 +199,131 @@ def is_current_status_expected(self, expected_status): return self.status == expected_status -def actor_system_already_running(ip="127.0.0.1"): - """ - Determines whether an actor system is already running by opening a socket connection. +SystemBase = typing.Literal["simpleSystemBase", "multiprocQueueBase", "multiprocTCPBase", "multiprocUDPBase"] + + +__SYSTEM_BASE: SystemBase = "multiprocTCPBase" + + +def actor_system_already_running( + ip: str | None = None, + port: int | None = None, + system_base: SystemBase | None = None, +) -> bool | None: + """It determines whether an actor system is already running by opening a socket connection. - Note: It may be possible that another system is running on the same port. + Notes: + - It may be possible that another system is running on the same port. + - This is working only when system base is "multiprocTCPBase" """ - s = socket.socket() - try: - s.connect((ip, 1900)) - s.close() - return True - except Exception: - return False + if system_base is None: + system_base = __SYSTEM_BASE + if system_base != "multiprocTCPBase": + # This system is not supported yet. + return None + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + try: + ip = ip or "127.0.0.1" + port = port or 1900 + LOG.info("Looking for an already running actor system (ip='%s', port=%d)...", ip, port) + sock.connect((ip, port)) + return True + except OSError as ex: + LOG.info("Failed to connect to already running actor system (ip='%s', port=%d): %s", ip, port, ex) -__SYSTEM_BASE = "multiprocTCPBase" + return False -def use_offline_actor_system(): +def use_offline_actor_system() -> None: global __SYSTEM_BASE __SYSTEM_BASE = "multiprocQueueBase" + LOG.info("Actor system base set to [%s]", __PROCESS_STARTUP_METHOD) + +ProcessStartupMethod = typing.Literal[ + "fork", + "forkserver", + "spawn", +] -def bootstrap_actor_system(try_join=False, prefer_local_only=False, local_ip=None, coordinator_ip=None): - logger = logging.getLogger(__name__) + +__PROCESS_STARTUP_METHOD: ProcessStartupMethod | None = None + + +def set_startup_method(method: ProcessStartupMethod) -> None: + global __PROCESS_STARTUP_METHOD + __PROCESS_STARTUP_METHOD = method + LOG.info("Actor process startup method set to [%s]", __PROCESS_STARTUP_METHOD) + + +def bootstrap_actor_system( + try_join: bool = False, + prefer_local_only: bool = False, + local_ip: str | None = None, + admin_port: int | None = None, + coordinator_ip: str | None = None, + coordinator_port: int | None = None, +) -> thespian.actors.ActorSystem: system_base = __SYSTEM_BASE + capabilities: dict[str, Any] = {} + log_defs: Any = None + if try_join and ( + system_base != "multiprocTCPBase" or actor_system_already_running(ip=local_ip, port=admin_port, system_base=system_base) + ): + LOG.info("Try joining already running actor system with system base [%s].", system_base) + else: + # All actor system are coordinator unless another coordinator is known to exist. + capabilities["coordinator"] = True + + if system_base in ("multiprocTCPBase", "multiprocUDPBase"): + if prefer_local_only: + LOG.info("Bootstrapping locally running actor system with system base [%s].", system_base) + local_ip = coordinator_ip = "127.0.0.1" + + if local_ip: + local_ip, admin_port = resolve(local_ip, admin_port) + capabilities["ip"] = local_ip + + if admin_port: + capabilities["Admin Port"] = admin_port + + if coordinator_ip: + coordinator_ip, coordinator_port = resolve(coordinator_ip, coordinator_port) + if coordinator_port: + coordinator_port = int(coordinator_port) + if coordinator_port: + coordinator_ip += f":{coordinator_port}" + capabilities["Convention Address.IPv4"] = coordinator_ip + + if coordinator_ip and local_ip and coordinator_ip != local_ip: + capabilities["coordinator"] = False + + process_startup_method: ProcessStartupMethod | None = __PROCESS_STARTUP_METHOD + if process_startup_method: + capabilities["Process Startup Method"] = process_startup_method + + log_defs = log.load_configuration() + LOG.info("Starting actor system with system base [%s] and capabilities [%s]...", system_base, capabilities) + try: - if try_join: - if actor_system_already_running(): - logger.debug("Joining already running actor system with system base [%s].", system_base) - return thespian.actors.ActorSystem(system_base) - else: - logger.debug("Creating new actor system with system base [%s] on coordinator node.", system_base) - # if we try to join we can only run on the coordinator... - return thespian.actors.ActorSystem(system_base, logDefs=log.load_configuration(), capabilities={"coordinator": True}) - elif prefer_local_only: - coordinator = True - if system_base != "multiprocQueueBase": - coordinator_ip = "127.0.0.1" - local_ip = "127.0.0.1" - else: - coordinator_ip = None - local_ip = None - else: - if system_base not in ("multiprocTCPBase", "multiprocUDPBase"): - raise exceptions.SystemSetupError("Rally requires a network-capable system base but got [%s]." % system_base) - if not coordinator_ip: - raise exceptions.SystemSetupError("coordinator IP is required") - if not local_ip: - raise exceptions.SystemSetupError("local IP is required") - # always resolve the public IP here, even if a DNS name is given. Otherwise Thespian will be unhappy - local_ip = net.resolve(local_ip) - coordinator_ip = net.resolve(coordinator_ip) - - coordinator = local_ip == coordinator_ip - - capabilities = {"coordinator": coordinator} - if local_ip: - # just needed to determine whether to run benchmarks locally - capabilities["ip"] = local_ip - if coordinator_ip: - # Make the coordinator node the convention leader - capabilities["Convention Address.IPv4"] = "%s:1900" % coordinator_ip - logger.info("Starting actor system with system base [%s] and capabilities [%s].", system_base, capabilities) - return thespian.actors.ActorSystem(system_base, logDefs=log.load_configuration(), capabilities=capabilities) + actor_system = thespian.actors.ActorSystem( + systemBase=system_base, + capabilities=capabilities, + logDefs=log_defs, + ) except thespian.actors.ActorSystemException: - logger.exception("Could not initialize internal actor system.") + LOG.exception("Could not initialize actor system with system base [%s] and capabilities [%s].", system_base, capabilities) raise + + LOG.info("Successfully initialized with system base [%s] and capabilities [%s].", system_base, actor_system.capabilities) + return actor_system + + +def resolve(host: str, port: int | None = None, family: int = socket.AF_INET, proto: int = socket.IPPROTO_TCP) -> tuple[str, int | None]: + address_info: tuple[Any, Any, Any, Any, tuple[Any, ...]] + for address_info in socket.getaddrinfo(host, port=port or None, family=family, proto=proto): + address = address_info[4] + if len(address) == 2 and isinstance(address[0], str) and isinstance(address[1], int): + host, port = address + return host, port or None diff --git a/esrally/rally.py b/esrally/rally.py index c3729912f..5227cd266 100644 --- a/esrally/rally.py +++ b/esrally/rally.py @@ -14,6 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from __future__ import annotations import argparse import datetime @@ -23,6 +24,7 @@ import shutil import sys import time +import typing import uuid from enum import Enum @@ -52,6 +54,8 @@ from esrally.tracker import tracker from esrally.utils import console, convert, io, net, opts, process, versions +LOG = logging.getLogger(__name__) + class ExitStatus(Enum): SUCCESSFUL = 1 @@ -961,27 +965,35 @@ def race(cfg: types.Config, kill_running_processes=False): def with_actor_system(runnable, cfg: types.Config): - logger = logging.getLogger(__name__) + process_startup_method: actor.ProcessStartupMethod | None = cfg.opts("actor", "actor.process.startup.method", None, mandatory=False) + if process_startup_method is not None: + if process_startup_method not in typing.get_args(actor.ProcessStartupMethod): + valid_options = ", ".join(str(v) for v in typing.get_args(actor.ProcessStartupMethod)) + raise ValueError( + f"Invalid value '{process_startup_method}' for 'actor.process.startup.method' option. Valid values are: {valid_options}" + ) + actor.set_startup_method(process_startup_method) + already_running = actor.actor_system_already_running() - logger.info("Actor system already running locally? [%s]", str(already_running)) + LOG.info("Actor system already running locally? [%s]", already_running) try: - actors = actor.bootstrap_actor_system(try_join=already_running, prefer_local_only=not already_running) + actors = actor.bootstrap_actor_system(try_join=bool(already_running), prefer_local_only=not already_running) # We can only support remote benchmarks if we have a dedicated daemon that is not only bound to 127.0.0.1 cfg.add(config.Scope.application, "system", "remote.benchmarking.supported", already_running) # This happens when the admin process could not be started, e.g. because it could not open a socket. except thespian.actors.InvalidActorAddress: - logger.info("Falling back to offline actor system.") + LOG.info("Falling back to offline actor system.") actor.use_offline_actor_system() actors = actor.bootstrap_actor_system(try_join=True) except KeyboardInterrupt: raise exceptions.UserInterrupted("User has cancelled the benchmark (detected whilst bootstrapping actor system).") from None except Exception as e: - logger.exception("Could not bootstrap actor system.") + LOG.exception("Could not bootstrap actor system.") if str(e) == "Unable to determine valid external socket address.": console.warn( - "Could not determine a socket address. Are you running without any network? Switching to degraded mode.", logger=logger + "Could not determine a socket address. Are you running without any network? Switching to degraded mode.", logger=LOG ) - logger.info("Falling back to offline actor system.") + LOG.info("Falling back to offline actor system.") actor.use_offline_actor_system() actors = actor.bootstrap_actor_system(try_join=True) else: @@ -989,7 +1001,7 @@ def with_actor_system(runnable, cfg: types.Config): try: runnable(cfg) finally: - # We only shutdown the actor system if it was not already running before + # We only shut down the actor system if it was not already running before if not already_running: shutdown_complete = False times_interrupted = 0 @@ -997,26 +1009,26 @@ def with_actor_system(runnable, cfg: types.Config): try: # give some time for any outstanding messages to be delivered to the actor system time.sleep(3) - logger.info("Attempting to shutdown internal actor system.") + LOG.info("Attempting to shutdown internal actor system.") actors.shutdown() # note that this check will only evaluate to True for a TCP-based actor system. timeout = 15 while actor.actor_system_already_running() and timeout > 0: - logger.info("Actor system is still running. Waiting...") + LOG.info("Actor system is still running. Waiting...") time.sleep(1) timeout -= 1 if timeout > 0: shutdown_complete = True - logger.info("Shutdown completed.") + LOG.info("Shutdown completed.") else: - logger.warning("Shutdown timed out. Actor system is still running.") + LOG.warning("Shutdown timed out. Actor system is still running.") break except KeyboardInterrupt: times_interrupted += 1 - logger.warning("User interrupted shutdown of internal actor system.") + LOG.warning("User interrupted shutdown of internal actor system.") console.info("Please wait a moment for Rally's internal components to shutdown.") if not shutdown_complete and times_interrupted > 0: - logger.warning("Terminating after user has interrupted actor system shutdown explicitly for [%d] times.", times_interrupted) + LOG.warning("Terminating after user has interrupted actor system shutdown explicitly for [%d] times.", times_interrupted) console.println("") console.warn("Terminating now at the risk of leaving child processes behind.") console.println("") diff --git a/esrally/types.py b/esrally/types.py index dd6db6672..9b4453cf8 100644 --- a/esrally/types.py +++ b/esrally/types.py @@ -18,6 +18,7 @@ from typing import Any, Literal, Protocol, TypeVar Section = Literal[ + "actor", "benchmarks", "client", "defaults", @@ -42,6 +43,7 @@ "unit-test", ] Key = Literal[ + "actor.process.startup.method", "add.chart_name", "add.chart_type", "add.config.option", diff --git a/pyproject.toml b/pyproject.toml index d7824e9ea..628a56c5e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -210,6 +210,7 @@ files = ["esrally/", "it/", "tests/"] [[tool.mypy.overrides]] module = [ + "esrally.actor", "esrally.mechanic.team", "esrally.storage.*", "esrally.utils.cases", diff --git a/tests/actor_test.py b/tests/actor_test.py new file mode 100644 index 000000000..cf60494c1 --- /dev/null +++ b/tests/actor_test.py @@ -0,0 +1,333 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import dataclasses +import socket +import typing +from unittest import mock + +import pytest +import thespian.actors + +from esrally import actor, log +from esrally.utils import cases + + +@pytest.fixture(autouse=True) +def mock_socket(monkeypatch: pytest.MonkeyPatch) -> socket.socket: + sock_cls = mock.create_autospec(socket.socket) + sock = sock_cls.return_value + assert isinstance(sock, socket.socket) + sock.__enter__.return_value = sock + sock_cls.return_value = sock + monkeypatch.setattr(socket, "socket", sock_cls) + return sock + + +@dataclasses.dataclass +class DummyActorSystem: + systemBase: str + capabilities: dict[str, typing.Any] | None = None + logDefs: typing.Any = None + transientUnique: bool = False + + +@pytest.fixture(autouse=True) +def dummy_actor_system(monkeypatch: pytest.MonkeyPatch): + monkeypatch.setattr(thespian.actors, "ActorSystem", DummyActorSystem) + + +def resolve(host: str, port: int | None = None): + return f"{host}!r", port or None + + +@pytest.fixture(autouse=True) +def mock_resolve(monkeypatch: pytest.MonkeyPatch): + monkeypatch.setattr(actor, "resolve", mock.create_autospec(actor.resolve, side_effect=resolve)) + + +@pytest.fixture(autouse=True) +def mock_load_configuration(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(log, "load_configuration", lambda: {"some": "config"}) + + +@dataclasses.dataclass +class BootstrapActorSystemCase: + system_base: actor.SystemBase = "multiprocTCPBase" + offline: bool = False + process_startup_method: actor.ProcessStartupMethod | None = None + already_running: bool = False + try_join: bool = False + prefer_local_only: bool = False + local_ip: str | None = None + admin_port: int | None = None + coordinator_ip: str | None = None + coordinator_port: int | None = None + want_error: tuple[Exception, ...] = tuple() + want_connect: tuple[str, int] | None = None + want_system_base: actor.SystemBase = "multiprocTCPBase" + want_capabilities: dict[str, typing.Any] | None = None + want_log_defs: bool = False + + +@cases.cases( + default=BootstrapActorSystemCase( + want_capabilities={"coordinator": True}, + want_log_defs=True, + ), + tcp=BootstrapActorSystemCase( + system_base="multiprocTCPBase", + want_capabilities={"coordinator": True}, + want_log_defs=True, + ), + udp=BootstrapActorSystemCase( + system_base="multiprocUDPBase", + want_capabilities={"coordinator": True}, + want_log_defs=True, + want_system_base="multiprocUDPBase", + ), + queue=BootstrapActorSystemCase( + system_base="multiprocQueueBase", + want_capabilities={"coordinator": True}, + want_log_defs=True, + want_system_base="multiprocQueueBase", + ), + local_ip=BootstrapActorSystemCase( + local_ip="127.0.0.1", + want_capabilities={"coordinator": True, "ip": "127.0.0.1!r"}, + want_log_defs=True, + ), + admin_port=BootstrapActorSystemCase( + admin_port=1024, + want_capabilities={"coordinator": True, "Admin Port": 1024}, + want_log_defs=True, + ), + coordinator_ip=BootstrapActorSystemCase( + coordinator_ip="192.168.0.1", + want_capabilities={"Convention Address.IPv4": "192.168.0.1!r", "coordinator": True}, + want_log_defs=True, + ), + coordinator_ip_and_port=BootstrapActorSystemCase( + coordinator_ip="192.168.0.1", + coordinator_port=2000, + want_capabilities={"Convention Address.IPv4": "192.168.0.1!r:2000", "coordinator": True}, + want_log_defs=True, + ), + local_ip_is_coordinator_ip=BootstrapActorSystemCase( + coordinator_ip="192.168.0.1", + local_ip="192.168.0.1", + want_capabilities={"Convention Address.IPv4": "192.168.0.1!r", "coordinator": True, "ip": "192.168.0.1!r"}, + want_log_defs=True, + ), + local_ip_is_not_coordinator_ip=BootstrapActorSystemCase( + coordinator_ip="192.168.0.1", + local_ip="192.168.0.2", + want_capabilities={"Convention Address.IPv4": "192.168.0.1!r", "coordinator": False, "ip": "192.168.0.2!r"}, + want_log_defs=True, + ), + offline=BootstrapActorSystemCase( + offline=True, + want_system_base="multiprocQueueBase", + want_capabilities={"coordinator": True}, + want_log_defs=True, + ), + try_join=BootstrapActorSystemCase( + try_join=True, + want_capabilities={"coordinator": True}, + want_log_defs=True, + want_connect=("127.0.0.1", 1900), + ), + try_join_offline=BootstrapActorSystemCase( + try_join=True, + offline=True, + want_system_base="multiprocQueueBase", + ), + try_join_already_running=BootstrapActorSystemCase( + try_join=True, + already_running=True, + want_connect=("127.0.0.1", 1900), + ), + try_join_offline_already_running=BootstrapActorSystemCase( + try_join=True, + offline=True, + already_running=True, + want_system_base="multiprocQueueBase", + ), + try_join_already_running_with_ip_and_port=BootstrapActorSystemCase( + try_join=True, + already_running=True, + local_ip="10.0.0.2", + admin_port=2000, + want_connect=("10.0.0.2", 2000), + ), + try_join_process_startup_method=BootstrapActorSystemCase( + try_join=True, + process_startup_method="spawn", + want_connect=("127.0.0.1", 1900), + want_capabilities={"coordinator": True, "Process Startup Method": "spawn"}, + want_log_defs=True, + ), + prefer_local_only=BootstrapActorSystemCase( + prefer_local_only=True, + want_capabilities={"Convention Address.IPv4": "127.0.0.1!r", "coordinator": True, "ip": "127.0.0.1!r"}, + want_log_defs=True, + ), + prefer_local_only_offline=BootstrapActorSystemCase( + offline=True, + prefer_local_only=True, + want_capabilities={"coordinator": True}, + want_system_base="multiprocQueueBase", + want_log_defs=True, + ), + coordinator_node=BootstrapActorSystemCase( + local_ip="192.168.0.41", + coordinator_ip="192.168.0.41", + want_capabilities={ + "Convention Address.IPv4": "192.168.0.41!r", + "coordinator": True, + "ip": "192.168.0.41!r", + }, + want_log_defs=True, + ), + non_coordinator_node=BootstrapActorSystemCase( + local_ip="192.168.0.42", + coordinator_ip="192.168.0.41", + want_capabilities={ + "Convention Address.IPv4": "192.168.0.41!r", + "coordinator": False, + "ip": "192.168.0.42!r", + }, + want_log_defs=True, + ), + coordinator_port=BootstrapActorSystemCase( + coordinator_ip="192.168.0.1", + coordinator_port=1234, + local_ip="192.168.0.2", + want_capabilities={"Convention Address.IPv4": "192.168.0.1!r:1234", "coordinator": False, "ip": "192.168.0.2!r"}, + want_log_defs=True, + ), + process_startup_method_fork=BootstrapActorSystemCase( + process_startup_method="fork", + want_capabilities={ + "Process Startup Method": "fork", + "coordinator": True, + }, + want_log_defs=True, + ), + process_startup_method_spawn=BootstrapActorSystemCase( + process_startup_method="spawn", + want_capabilities={ + "Process Startup Method": "spawn", + "coordinator": True, + }, + want_log_defs=True, + ), + process_startup_method_forkserver=BootstrapActorSystemCase( + process_startup_method="forkserver", + want_capabilities={ + "Process Startup Method": "forkserver", + "coordinator": True, + }, + want_log_defs=True, + ), +) +def test_bootstrap_actor_system( + monkeypatch: pytest.MonkeyPatch, + case: BootstrapActorSystemCase, + mock_socket: socket.socket, +) -> None: + if not case.already_running: + mock_socket.connect.side_effect = socket.error + + monkeypatch.setattr(actor, "__SYSTEM_BASE", case.system_base) + if case.offline: + actor.use_offline_actor_system() + + monkeypatch.setattr(actor, "__PROCESS_STARTUP_METHOD", None) + if case.process_startup_method: + actor.set_startup_method(case.process_startup_method) + + try: + got = actor.bootstrap_actor_system( + try_join=case.try_join, + prefer_local_only=case.prefer_local_only, + local_ip=case.local_ip, + admin_port=case.admin_port, + coordinator_ip=case.coordinator_ip, + coordinator_port=case.coordinator_port, + ) + except tuple(type(e) for e in case.want_error) as ex: # pylint: disable=catching-non-exception + assert str(ex) in {str(e) for e in case.want_error} + return + assert got is not None + assert got.systemBase == case.want_system_base + assert (got.capabilities or {}) == (case.want_capabilities or {}) + assert bool(got.logDefs) == case.want_log_defs + if case.want_connect: + mock_socket.connect.assert_called_once_with(case.want_connect) + else: + mock_socket.connect.assert_not_called() + + +@dataclasses.dataclass +class ActorSystemAlreadyRunningCase: + already_running: bool = False + ip: str | None = None + port: int | None = None + system_base: actor.SystemBase | None = None + want: bool = None + want_error: Exception | None = None + want_connect: tuple[str, int] | None = None + + +@cases.cases( + default=ActorSystemAlreadyRunningCase(want_connect=("127.0.0.1", 1900), want=False), + ip_and_port=ActorSystemAlreadyRunningCase(ip="10.0.0.1", port=1000, want_connect=("10.0.0.1", 1000), want=False), + already_running=ActorSystemAlreadyRunningCase(already_running=True, want_connect=("127.0.0.1", 1900), want=True), + ip_and_port_already_running=ActorSystemAlreadyRunningCase( + ip="10.0.0.1", port=1000, already_running=True, want=True, want_connect=("10.0.0.1", 1000) + ), + system_base_simple=ActorSystemAlreadyRunningCase(system_base="simpleSystemBase"), + system_base_queue=ActorSystemAlreadyRunningCase(system_base="multiprocQueueBase"), + system_base_tcp=ActorSystemAlreadyRunningCase(system_base="multiprocTCPBase", want=False, want_connect=("127.0.0.1", 1900)), + system_base_tcp_already_running=ActorSystemAlreadyRunningCase( + already_running=True, system_base="multiprocTCPBase", want=True, want_connect=("127.0.0.1", 1900) + ), + system_base_udp=ActorSystemAlreadyRunningCase( + system_base="multiprocUDPBase", + ), +) +def test_actor_system_already_running( + case: ActorSystemAlreadyRunningCase, + mock_socket: socket.socket, +): + if not case.already_running: + mock_socket.connect.side_effect = socket.error + + try: + got = actor.actor_system_already_running(ip=case.ip, port=case.port, system_base=case.system_base) + except type(case.want_error) if case.want_error else tuple() as ex: + assert str(ex) == str(case.want_error) + return + else: + assert case.want_error is None + assert got is case.want + if case.want_connect: + mock_socket.connect.assert_called_once_with(case.want_connect) + else: + mock_socket.connect.assert_not_called()