From 0a5cb4035bc87a70373fbb1695da41a12dc1d677 Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Thu, 14 Aug 2025 15:16:52 +0200 Subject: [PATCH 01/21] Ask Thespian to spawn actors subprocess instead of using fork The main reason for this is that to be able to use threads in subprocesses we have to prevemt using any kind of fork function. --- esrally/actor.py | 120 +++++++++++++++++++++++++---------------------- pyproject.toml | 1 + 2 files changed, 65 insertions(+), 56 deletions(-) diff --git a/esrally/actor.py b/esrally/actor.py index 5faa404c5..141f1a036 100644 --- a/esrally/actor.py +++ b/esrally/actor.py @@ -14,17 +14,22 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from __future__ import annotations import logging +import os import socket import traceback +from typing import Any -import thespian.actors -import thespian.system.messages.status +import thespian.actors # type: ignore[import-untyped] +import thespian.system.messages.status # type: ignore[import-untyped] from esrally import exceptions, log from esrally.utils import console, net +LOG = logging.getLogger(__name__) + class BenchmarkFailure: """ @@ -92,7 +97,7 @@ def guard(self, msg, sender): return f(self, msg, sender) except BaseException: # log here as the full trace might get lost. - logging.getLogger(__name__).exception("Error in %s", actor_name) + LOG.exception("Error in %s", actor_name) # don't forward the exception as is because the main process might not have this class available on the load path # and will fail then while deserializing the cause. self.send(sender, BenchmarkFailure(traceback.format_exc())) @@ -103,7 +108,7 @@ def guard(self, msg, sender): class RallyActor(thespian.actors.ActorTypeDispatcher): def __init__(self, *args, **kw): super().__init__(*args, **kw) - self.children = [] + self.children: list[thespian.actors.ActorAddress] = [] self.received_responses = [] self.status = None log.post_configure_actor_logging() @@ -175,8 +180,9 @@ def send_to_children_and_transition(self, sender, msg, expected_status, new_stat if self.is_current_status_expected(expected_status): self.logger.debug("Transitioning from [%s] to [%s].", self.status, new_status) self.status = new_status - for m in filter(None, self.children): - self.send(m, msg) + for m in self.children: + if m: + self.send(m, msg) else: raise exceptions.RallyAssertionError( "Received [%s] from [%s] but we are in status [%s] instead of [%s]." % (type(msg), sender, self.status, expected_status) @@ -193,71 +199,73 @@ def is_current_status_expected(self, expected_status): return self.status == expected_status -def actor_system_already_running(ip="127.0.0.1"): - """ - Determines whether an actor system is already running by opening a socket connection. +def actor_system_already_running(system_base: str, ip: str = "127.0.0.1", port: int = 1900) -> bool: + """It determines whether an actor system is already running by opening a socket connection. Note: It may be possible that another system is running on the same port. """ - s = socket.socket() - try: - s.connect((ip, 1900)) - s.close() - return True - except Exception: + if system_base != "multiprocTCPBase": return False + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + try: + sock.connect((ip, port)) + return True + except ConnectionRefusedError: + return False -__SYSTEM_BASE = "multiprocTCPBase" +DEFAULT_SYSTEM_BASE: str = os.getenv("ESRALLY_ACTOR_SYSTEM_BASE", "").strip() or "multiprocTCPBase" +DEFAULT_PROCESS_STARTUP_METHOD: str = os.getenv("ESRALLY_ACTOR_PROCESS_STARTUP_METHOD", "").strip() or "spawn" def use_offline_actor_system(): - global __SYSTEM_BASE - __SYSTEM_BASE = "multiprocQueueBase" - - -def bootstrap_actor_system(try_join=False, prefer_local_only=False, local_ip=None, coordinator_ip=None): - logger = logging.getLogger(__name__) - system_base = __SYSTEM_BASE - try: - if try_join: - if actor_system_already_running(): - logger.debug("Joining already running actor system with system base [%s].", system_base) + global DEFAULT_SYSTEM_BASE + DEFAULT_SYSTEM_BASE = "multiprocQueueBase" + + +def bootstrap_actor_system( + try_join: bool = False, + prefer_local_only: bool = False, + local_ip: str = "127.0.0.1", + coordinator_ip: str | None = None, + system_base: str = DEFAULT_SYSTEM_BASE, + process_startup_method: str = DEFAULT_PROCESS_STARTUP_METHOD, +) -> thespian.actors.ActorSystem: + if prefer_local_only: + coordinator_ip = local_ip = "127.0.0.1" + elif not coordinator_ip: + coordinator_ip = local_ip + if try_join: + if actor_system_already_running(system_base, coordinator_ip): + LOG.debug("Joining already running actor system with system base [%s].", system_base) + try: return thespian.actors.ActorSystem(system_base) - else: - logger.debug("Creating new actor system with system base [%s] on coordinator node.", system_base) - # if we try to join we can only run on the coordinator... - return thespian.actors.ActorSystem(system_base, logDefs=log.load_configuration(), capabilities={"coordinator": True}) - elif prefer_local_only: - coordinator = True - if system_base != "multiprocQueueBase": - coordinator_ip = "127.0.0.1" - local_ip = "127.0.0.1" - else: - coordinator_ip = None - local_ip = None + except thespian.actors.ActorSystemException: + LOG.exception("Failed to join actor system [%s]", system_base) + + capabilities: dict[str, Any] = {} + if system_base in ("multiprocTCPBase", "multiprocUDPBase"): + if not local_ip: + raise exceptions.SystemSetupError("local IP is required") + if not coordinator_ip: + raise exceptions.SystemSetupError("coordinator IP is required") + # It always resolves the public IP here, even if a DNS name is given, otherwise Thespian will be unhappy. + coordinator = local_ip == coordinator_ip + if coordinator: + local_ip = coordinator_ip = net.resolve(local_ip) else: - if system_base not in ("multiprocTCPBase", "multiprocUDPBase"): - raise exceptions.SystemSetupError("Rally requires a network-capable system base but got [%s]." % system_base) - if not coordinator_ip: - raise exceptions.SystemSetupError("coordinator IP is required") - if not local_ip: - raise exceptions.SystemSetupError("local IP is required") - # always resolve the public IP here, even if a DNS name is given. Otherwise Thespian will be unhappy local_ip = net.resolve(local_ip) coordinator_ip = net.resolve(coordinator_ip) + capabilities["coordinator"] = coordinator + capabilities["ip"] = local_ip + capabilities["Convention Address.IPv4"] = f"{coordinator_ip}:1900" - coordinator = local_ip == coordinator_ip + if system_base != "simpleSystemBase" and process_startup_method: + capabilities["Process Startup Method"] = process_startup_method - capabilities = {"coordinator": coordinator} - if local_ip: - # just needed to determine whether to run benchmarks locally - capabilities["ip"] = local_ip - if coordinator_ip: - # Make the coordinator node the convention leader - capabilities["Convention Address.IPv4"] = "%s:1900" % coordinator_ip - logger.info("Starting actor system with system base [%s] and capabilities [%s].", system_base, capabilities) + LOG.info("Starting actor system with system base [%s] and capabilities [%s].", system_base, capabilities) + try: return thespian.actors.ActorSystem(system_base, logDefs=log.load_configuration(), capabilities=capabilities) except thespian.actors.ActorSystemException: - logger.exception("Could not initialize internal actor system.") + LOG.exception("Could not initialize internal actor system.") raise diff --git a/pyproject.toml b/pyproject.toml index d7824e9ea..628a56c5e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -210,6 +210,7 @@ files = ["esrally/", "it/", "tests/"] [[tool.mypy.overrides]] module = [ + "esrally.actor", "esrally.mechanic.team", "esrally.storage.*", "esrally.utils.cases", From 3c008387bd36be2926237050ab12cee91b58b9af Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Thu, 14 Aug 2025 16:14:01 +0200 Subject: [PATCH 02/21] Correct actor initialization procedure. --- esrally/actor.py | 4 ++-- esrally/rally.py | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/esrally/actor.py b/esrally/actor.py index 141f1a036..84335b656 100644 --- a/esrally/actor.py +++ b/esrally/actor.py @@ -199,7 +199,7 @@ def is_current_status_expected(self, expected_status): return self.status == expected_status -def actor_system_already_running(system_base: str, ip: str = "127.0.0.1", port: int = 1900) -> bool: +def actor_system_already_running(ip: str = "127.0.0.1", port: int = 1900, system_base: str = "multiprocTCPBase") -> bool: """It determines whether an actor system is already running by opening a socket connection. Note: It may be possible that another system is running on the same port. @@ -236,7 +236,7 @@ def bootstrap_actor_system( elif not coordinator_ip: coordinator_ip = local_ip if try_join: - if actor_system_already_running(system_base, coordinator_ip): + if actor_system_already_running(coordinator_ip, system_base=system_base): LOG.debug("Joining already running actor system with system base [%s].", system_base) try: return thespian.actors.ActorSystem(system_base) diff --git a/esrally/rally.py b/esrally/rally.py index c3729912f..6efa3a1a2 100644 --- a/esrally/rally.py +++ b/esrally/rally.py @@ -962,14 +962,15 @@ def race(cfg: types.Config, kill_running_processes=False): def with_actor_system(runnable, cfg: types.Config): logger = logging.getLogger(__name__) - already_running = actor.actor_system_already_running() - logger.info("Actor system already running locally? [%s]", str(already_running)) + already_running = False try: - actors = actor.bootstrap_actor_system(try_join=already_running, prefer_local_only=not already_running) + actors = actor.bootstrap_actor_system(try_join=True) # We can only support remote benchmarks if we have a dedicated daemon that is not only bound to 127.0.0.1 + coordinator_ip = actors.capabilities.get("Convention Address.IPv4") + already_running = coordinator_ip not in [None, "127.0.0.1"] and actor.actor_system_already_running(coordinator_ip) cfg.add(config.Scope.application, "system", "remote.benchmarking.supported", already_running) - # This happens when the admin process could not be started, e.g. because it could not open a socket. except thespian.actors.InvalidActorAddress: + # This happens when the admin process could not be started, e.g. because it could not open a socket. logger.info("Falling back to offline actor system.") actor.use_offline_actor_system() actors = actor.bootstrap_actor_system(try_join=True) From 903e4d1b4ec87e15265c8cb8804ab03b845d3076 Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Thu, 14 Aug 2025 16:18:11 +0200 Subject: [PATCH 03/21] revert changes to rally.py --- esrally/rally.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/esrally/rally.py b/esrally/rally.py index 6efa3a1a2..c3729912f 100644 --- a/esrally/rally.py +++ b/esrally/rally.py @@ -962,15 +962,14 @@ def race(cfg: types.Config, kill_running_processes=False): def with_actor_system(runnable, cfg: types.Config): logger = logging.getLogger(__name__) - already_running = False + already_running = actor.actor_system_already_running() + logger.info("Actor system already running locally? [%s]", str(already_running)) try: - actors = actor.bootstrap_actor_system(try_join=True) + actors = actor.bootstrap_actor_system(try_join=already_running, prefer_local_only=not already_running) # We can only support remote benchmarks if we have a dedicated daemon that is not only bound to 127.0.0.1 - coordinator_ip = actors.capabilities.get("Convention Address.IPv4") - already_running = coordinator_ip not in [None, "127.0.0.1"] and actor.actor_system_already_running(coordinator_ip) cfg.add(config.Scope.application, "system", "remote.benchmarking.supported", already_running) + # This happens when the admin process could not be started, e.g. because it could not open a socket. except thespian.actors.InvalidActorAddress: - # This happens when the admin process could not be started, e.g. because it could not open a socket. logger.info("Falling back to offline actor system.") actor.use_offline_actor_system() actors = actor.bootstrap_actor_system(try_join=True) From b8a8822590c4de9990aff8f64d4095edadacd683 Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Mon, 18 Aug 2025 09:51:15 +0200 Subject: [PATCH 04/21] Revert unrelated changes. --- esrally/actor.py | 118 ++++++++++++++++++++++---------------------- tests/actor_test.py | 0 2 files changed, 60 insertions(+), 58 deletions(-) create mode 100644 tests/actor_test.py diff --git a/esrally/actor.py b/esrally/actor.py index 84335b656..a4068c926 100644 --- a/esrally/actor.py +++ b/esrally/actor.py @@ -17,10 +17,8 @@ from __future__ import annotations import logging -import os import socket import traceback -from typing import Any import thespian.actors # type: ignore[import-untyped] import thespian.system.messages.status # type: ignore[import-untyped] @@ -28,8 +26,6 @@ from esrally import exceptions, log from esrally.utils import console, net -LOG = logging.getLogger(__name__) - class BenchmarkFailure: """ @@ -97,7 +93,7 @@ def guard(self, msg, sender): return f(self, msg, sender) except BaseException: # log here as the full trace might get lost. - LOG.exception("Error in %s", actor_name) + logging.getLogger(__name__).exception("Error in %s", actor_name) # don't forward the exception as is because the main process might not have this class available on the load path # and will fail then while deserializing the cause. self.send(sender, BenchmarkFailure(traceback.format_exc())) @@ -180,9 +176,11 @@ def send_to_children_and_transition(self, sender, msg, expected_status, new_stat if self.is_current_status_expected(expected_status): self.logger.debug("Transitioning from [%s] to [%s].", self.status, new_status) self.status = new_status - for m in self.children: - if m: - self.send(m, msg) + for child in self.children: + if not child: + continue + assert isinstance(child, thespian.actors.ActorAddress) + self.send(child, msg) else: raise exceptions.RallyAssertionError( "Received [%s] from [%s] but we are in status [%s] instead of [%s]." % (type(msg), sender, self.status, expected_status) @@ -199,73 +197,77 @@ def is_current_status_expected(self, expected_status): return self.status == expected_status -def actor_system_already_running(ip: str = "127.0.0.1", port: int = 1900, system_base: str = "multiprocTCPBase") -> bool: - """It determines whether an actor system is already running by opening a socket connection. +def actor_system_already_running(ip="127.0.0.1"): + """ + Determines whether an actor system is already running by opening a socket connection. Note: It may be possible that another system is running on the same port. """ - if system_base != "multiprocTCPBase": - return False - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + with socket.socket() as sock: try: - sock.connect((ip, port)) + sock.connect((ip, 1900)) return True - except ConnectionRefusedError: + except Exception: return False -DEFAULT_SYSTEM_BASE: str = os.getenv("ESRALLY_ACTOR_SYSTEM_BASE", "").strip() or "multiprocTCPBase" -DEFAULT_PROCESS_STARTUP_METHOD: str = os.getenv("ESRALLY_ACTOR_PROCESS_STARTUP_METHOD", "").strip() or "spawn" +__SYSTEM_BASE = "multiprocTCPBase" def use_offline_actor_system(): - global DEFAULT_SYSTEM_BASE - DEFAULT_SYSTEM_BASE = "multiprocQueueBase" - - -def bootstrap_actor_system( - try_join: bool = False, - prefer_local_only: bool = False, - local_ip: str = "127.0.0.1", - coordinator_ip: str | None = None, - system_base: str = DEFAULT_SYSTEM_BASE, - process_startup_method: str = DEFAULT_PROCESS_STARTUP_METHOD, -) -> thespian.actors.ActorSystem: - if prefer_local_only: - coordinator_ip = local_ip = "127.0.0.1" - elif not coordinator_ip: - coordinator_ip = local_ip - if try_join: - if actor_system_already_running(coordinator_ip, system_base=system_base): - LOG.debug("Joining already running actor system with system base [%s].", system_base) - try: + global __SYSTEM_BASE + __SYSTEM_BASE = "multiprocQueueBase" + + +def bootstrap_actor_system(try_join=False, prefer_local_only=False, local_ip=None, coordinator_ip=None): + logger = logging.getLogger(__name__) + system_base = __SYSTEM_BASE + try: + if try_join: + if actor_system_already_running(): + logger.debug("Joining already running actor system with system base [%s].", system_base) return thespian.actors.ActorSystem(system_base) - except thespian.actors.ActorSystemException: - LOG.exception("Failed to join actor system [%s]", system_base) - - capabilities: dict[str, Any] = {} - if system_base in ("multiprocTCPBase", "multiprocUDPBase"): - if not local_ip: - raise exceptions.SystemSetupError("local IP is required") - if not coordinator_ip: - raise exceptions.SystemSetupError("coordinator IP is required") - # It always resolves the public IP here, even if a DNS name is given, otherwise Thespian will be unhappy. - coordinator = local_ip == coordinator_ip - if coordinator: - local_ip = coordinator_ip = net.resolve(local_ip) + else: + logger.debug("Creating new actor system with system base [%s] on coordinator node.", system_base) + # if we try to join we can only run on the coordinator... + return thespian.actors.ActorSystem( + system_base, + logDefs=log.load_configuration(), + capabilities={ + "coordinator": True, + "Process Startup Method": "spawn", + }, + ) + elif prefer_local_only: + coordinator = True + if system_base != "multiprocQueueBase": + coordinator_ip = "127.0.0.1" + local_ip = "127.0.0.1" + else: + coordinator_ip = None + local_ip = None else: + if system_base not in ("multiprocTCPBase", "multiprocUDPBase"): + raise exceptions.SystemSetupError("Rally requires a network-capable system base but got [%s]." % system_base) + if not coordinator_ip: + raise exceptions.SystemSetupError("coordinator IP is required") + if not local_ip: + raise exceptions.SystemSetupError("local IP is required") + # always resolve the public IP here, even if a DNS name is given, otherwise Thespian will be unhappy local_ip = net.resolve(local_ip) coordinator_ip = net.resolve(coordinator_ip) - capabilities["coordinator"] = coordinator - capabilities["ip"] = local_ip - capabilities["Convention Address.IPv4"] = f"{coordinator_ip}:1900" - if system_base != "simpleSystemBase" and process_startup_method: - capabilities["Process Startup Method"] = process_startup_method + coordinator = local_ip == coordinator_ip - LOG.info("Starting actor system with system base [%s] and capabilities [%s].", system_base, capabilities) - try: + capabilities = {"coordinator": coordinator, "Process Startup Method": "spawn"} + if local_ip: + # just needed to determine whether to run benchmarks locally + capabilities["ip"] = local_ip + if coordinator_ip: + # Make the coordinator node the convention leader + capabilities["Convention Address.IPv4"] = "%s:1900" % coordinator_ip + logger.info("Starting actor system with system base [%s] and capabilities [%s].", system_base, capabilities) return thespian.actors.ActorSystem(system_base, logDefs=log.load_configuration(), capabilities=capabilities) except thespian.actors.ActorSystemException: - LOG.exception("Could not initialize internal actor system.") + logger.exception("Could not initialize internal actor system.") raise diff --git a/tests/actor_test.py b/tests/actor_test.py new file mode 100644 index 000000000..e69de29bb From 544b15719fe6507f29bb7126199c0c43eff239b7 Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Mon, 18 Aug 2025 09:52:50 +0200 Subject: [PATCH 05/21] Revert unrelated changes. --- esrally/actor.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/esrally/actor.py b/esrally/actor.py index a4068c926..b38dcc1b4 100644 --- a/esrally/actor.py +++ b/esrally/actor.py @@ -176,9 +176,8 @@ def send_to_children_and_transition(self, sender, msg, expected_status, new_stat if self.is_current_status_expected(expected_status): self.logger.debug("Transitioning from [%s] to [%s].", self.status, new_status) self.status = new_status - for child in self.children: - if not child: - continue + child: thespian.actors.ActorAddress + for child in filter(None, self.children): assert isinstance(child, thespian.actors.ActorAddress) self.send(child, msg) else: From 0e23b15d405e0483284b7839ec1210525226b671 Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Mon, 18 Aug 2025 09:53:08 +0200 Subject: [PATCH 06/21] Revert unrelated changes. --- esrally/actor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/esrally/actor.py b/esrally/actor.py index b38dcc1b4..971b86b89 100644 --- a/esrally/actor.py +++ b/esrally/actor.py @@ -178,7 +178,6 @@ def send_to_children_and_transition(self, sender, msg, expected_status, new_stat self.status = new_status child: thespian.actors.ActorAddress for child in filter(None, self.children): - assert isinstance(child, thespian.actors.ActorAddress) self.send(child, msg) else: raise exceptions.RallyAssertionError( From 7dae951b201eba5a2baf7fb8c9529dda294901eb Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Mon, 18 Aug 2025 09:53:08 +0200 Subject: [PATCH 07/21] Revert unrelated changes. --- esrally/actor.py | 139 +++++++++++++++++++++++++++++++---------------- 1 file changed, 91 insertions(+), 48 deletions(-) diff --git a/esrally/actor.py b/esrally/actor.py index b38dcc1b4..ea27a371a 100644 --- a/esrally/actor.py +++ b/esrally/actor.py @@ -16,9 +16,11 @@ # under the License. from __future__ import annotations +import enum import logging import socket import traceback +from typing import Any import thespian.actors # type: ignore[import-untyped] import thespian.system.messages.status # type: ignore[import-untyped] @@ -26,6 +28,8 @@ from esrally import exceptions, log from esrally.utils import console, net +LOG = logging.getLogger(__name__) + class BenchmarkFailure: """ @@ -178,7 +182,6 @@ def send_to_children_and_transition(self, sender, msg, expected_status, new_stat self.status = new_status child: thespian.actors.ActorAddress for child in filter(None, self.children): - assert isinstance(child, thespian.actors.ActorAddress) self.send(child, msg) else: raise exceptions.RallyAssertionError( @@ -196,77 +199,117 @@ def is_current_status_expected(self, expected_status): return self.status == expected_status -def actor_system_already_running(ip="127.0.0.1"): - """ - Determines whether an actor system is already running by opening a socket connection. +class SystemBase(enum.Enum): + MULTIPROC_TCP = "multiprocTCPBase" + MULTIPROC_UDP = "multiprocUDPBase" + MULTIPROC_QUEUE = "multiprocQueueBase" + + +__SYSTEM_BASE: SystemBase = SystemBase.MULTIPROC_TCP + + +def actor_system_already_running( + ip: str | None = None, + port: int | None = None, + system_base: SystemBase | None = None, +) -> bool: + """It determines whether an actor system is already running by opening a socket connection. - Note: It may be possible that another system is running on the same port. + Notes: + - It may be possible that another system is running on the same port. + - This is working only when system base is "multiprocTCPBase" """ - with socket.socket() as sock: + if system_base is None: + system_base = __SYSTEM_BASE + if system_base != SystemBase.MULTIPROC_TCP: + return False + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: try: - sock.connect((ip, 1900)) + ip = ip or "127.0.0.1" + port = port or 1900 + LOG.info("Looking for an already running actor system (ip='%s', port=%d)...", ip, port) + sock.connect((ip, port)) return True - except Exception: - return False - + except Exception as ex: + LOG.info("Failed to connect to already running actor system (ip='%s', port=%d): %s", ip, port, ex) -__SYSTEM_BASE = "multiprocTCPBase" + return False -def use_offline_actor_system(): +def use_offline_actor_system() -> None: global __SYSTEM_BASE - __SYSTEM_BASE = "multiprocQueueBase" + __SYSTEM_BASE = SystemBase.MULTIPROC_QUEUE + + +class ProcessStartupMethod(enum.StrEnum): + FORK = "fork" + FORK_SERVER = "forkserver" + SPAWN = "spawn" + +__PROCESS_STARTUP_METHOD: ProcessStartupMethod | None = None -def bootstrap_actor_system(try_join=False, prefer_local_only=False, local_ip=None, coordinator_ip=None): - logger = logging.getLogger(__name__) - system_base = __SYSTEM_BASE + +def use_startup_method(method: ProcessStartupMethod) -> None: + global __PROCESS_STARTUP_METHOD + __PROCESS_STARTUP_METHOD = method + + +def bootstrap_actor_system( + try_join: bool = False, + prefer_local_only: bool = False, + local_ip: str | None = None, + coordinator_ip: str | None = None, + coordinator_port: int | None = None, + system_base: SystemBase | None = None, + transient_unique: bool = False, +) -> thespian.actors.ActorSystem: + if system_base is None: + system_base = __SYSTEM_BASE + + process_startup_method: ProcessStartupMethod | None = __PROCESS_STARTUP_METHOD try: if try_join: - if actor_system_already_running(): - logger.debug("Joining already running actor system with system base [%s].", system_base) + if actor_system_already_running(ip=coordinator_ip, port=coordinator_port, system_base=system_base): + LOG.debug("Joining already running actor system with system base [%s].", system_base) return thespian.actors.ActorSystem(system_base) - else: - logger.debug("Creating new actor system with system base [%s] on coordinator node.", system_base) - # if we try to join we can only run on the coordinator... - return thespian.actors.ActorSystem( - system_base, - logDefs=log.load_configuration(), - capabilities={ - "coordinator": True, - "Process Startup Method": "spawn", - }, - ) elif prefer_local_only: - coordinator = True - if system_base != "multiprocQueueBase": - coordinator_ip = "127.0.0.1" - local_ip = "127.0.0.1" + if system_base in (SystemBase.MULTIPROC_TCP, SystemBase.MULTIPROC_UDP): + local_ip = coordinator_ip = "127.0.0.1" else: - coordinator_ip = None - local_ip = None + local_ip = coordinator_ip = None else: - if system_base not in ("multiprocTCPBase", "multiprocUDPBase"): - raise exceptions.SystemSetupError("Rally requires a network-capable system base but got [%s]." % system_base) + if system_base not in (SystemBase.MULTIPROC_TCP, SystemBase.MULTIPROC_UDP): + raise exceptions.SystemSetupError("Rally requires a network-capable system base but got '%s'." % system_base) if not coordinator_ip: - raise exceptions.SystemSetupError("coordinator IP is required") + raise exceptions.SystemSetupError("Coordinator IP is required") if not local_ip: - raise exceptions.SystemSetupError("local IP is required") + raise exceptions.SystemSetupError("Local IP is required") # always resolve the public IP here, even if a DNS name is given, otherwise Thespian will be unhappy - local_ip = net.resolve(local_ip) - coordinator_ip = net.resolve(coordinator_ip) - - coordinator = local_ip == coordinator_ip - capabilities = {"coordinator": coordinator, "Process Startup Method": "spawn"} + # if we try to join we can only run on the coordinator... + capabilities: dict[str, Any] = {} + if process_startup_method is not None: + capabilities["Process Startup Method"] = process_startup_method.value if local_ip: # just needed to determine whether to run benchmarks locally + local_ip = net.resolve(local_ip) capabilities["ip"] = local_ip if coordinator_ip: # Make the coordinator node the convention leader - capabilities["Convention Address.IPv4"] = "%s:1900" % coordinator_ip - logger.info("Starting actor system with system base [%s] and capabilities [%s].", system_base, capabilities) - return thespian.actors.ActorSystem(system_base, logDefs=log.load_configuration(), capabilities=capabilities) + coordinator_ip = net.resolve(coordinator_ip) + coordinator_port = coordinator_port or 1900 + capabilities["Convention Address.IPv4"] = f"{coordinator_ip}:{coordinator_port}" + coordinator = local_ip == coordinator_ip + capabilities["coordinator"] = coordinator + LOG.info("Starting actor system with system base [%s] and capabilities [%s].", system_base, capabilities) + return thespian.actors.ActorSystem( + systemBase=system_base.value, + capabilities=capabilities, + logDefs=log.load_configuration(), + transientUnique=transient_unique, + ) except thespian.actors.ActorSystemException: - logger.exception("Could not initialize internal actor system.") + LOG.exception("Could not initialize internal actor system.") raise From b3dd75108025d972c0dd363033ab43e57ec5928b Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Mon, 18 Aug 2025 15:42:34 +0200 Subject: [PATCH 08/21] Add test case for actor.bootstrap_actor_system --- esrally/actor.py | 48 +++++------ tests/actor_test.py | 191 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 211 insertions(+), 28 deletions(-) diff --git a/esrally/actor.py b/esrally/actor.py index ea27a371a..f4f546174 100644 --- a/esrally/actor.py +++ b/esrally/actor.py @@ -16,10 +16,10 @@ # under the License. from __future__ import annotations -import enum import logging import socket import traceback +import typing from typing import Any import thespian.actors # type: ignore[import-untyped] @@ -199,13 +199,10 @@ def is_current_status_expected(self, expected_status): return self.status == expected_status -class SystemBase(enum.Enum): - MULTIPROC_TCP = "multiprocTCPBase" - MULTIPROC_UDP = "multiprocUDPBase" - MULTIPROC_QUEUE = "multiprocQueueBase" +SystemBase = typing.Literal["multiprocQueueBase", "multiprocTCPBase", "multiprocUDPBase"] -__SYSTEM_BASE: SystemBase = SystemBase.MULTIPROC_TCP +__SYSTEM_BASE: SystemBase = "multiprocTCPBase" def actor_system_already_running( @@ -221,7 +218,7 @@ def actor_system_already_running( """ if system_base is None: system_base = __SYSTEM_BASE - if system_base != SystemBase.MULTIPROC_TCP: + if system_base != "multiprocTCPBase": return False with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: @@ -239,13 +236,14 @@ def actor_system_already_running( def use_offline_actor_system() -> None: global __SYSTEM_BASE - __SYSTEM_BASE = SystemBase.MULTIPROC_QUEUE + __SYSTEM_BASE = "multiprocQueueBase" -class ProcessStartupMethod(enum.StrEnum): - FORK = "fork" - FORK_SERVER = "forkserver" - SPAWN = "spawn" +ProcessStartupMethod = typing.Literal[ + "fork", + "forkserver", + "spawn", +] __PROCESS_STARTUP_METHOD: ProcessStartupMethod | None = None @@ -262,12 +260,8 @@ def bootstrap_actor_system( local_ip: str | None = None, coordinator_ip: str | None = None, coordinator_port: int | None = None, - system_base: SystemBase | None = None, - transient_unique: bool = False, ) -> thespian.actors.ActorSystem: - if system_base is None: - system_base = __SYSTEM_BASE - + system_base = __SYSTEM_BASE process_startup_method: ProcessStartupMethod | None = __PROCESS_STARTUP_METHOD try: if try_join: @@ -275,23 +269,23 @@ def bootstrap_actor_system( LOG.debug("Joining already running actor system with system base [%s].", system_base) return thespian.actors.ActorSystem(system_base) elif prefer_local_only: - if system_base in (SystemBase.MULTIPROC_TCP, SystemBase.MULTIPROC_UDP): + if system_base in ("multiprocTCPBase", "multiprocUDPBase"): local_ip = coordinator_ip = "127.0.0.1" else: local_ip = coordinator_ip = None else: - if system_base not in (SystemBase.MULTIPROC_TCP, SystemBase.MULTIPROC_UDP): - raise exceptions.SystemSetupError("Rally requires a network-capable system base but got '%s'." % system_base) + if str(system_base) not in ("multiprocTCPBase", "multiprocUDPBase"): + raise exceptions.SystemSetupError("Rally requires a network-capable system base but got [%s]." % system_base) if not coordinator_ip: - raise exceptions.SystemSetupError("Coordinator IP is required") + raise exceptions.SystemSetupError("coordinator IP is required") if not local_ip: - raise exceptions.SystemSetupError("Local IP is required") + raise exceptions.SystemSetupError("local IP is required") # always resolve the public IP here, even if a DNS name is given, otherwise Thespian will be unhappy # if we try to join we can only run on the coordinator... capabilities: dict[str, Any] = {} - if process_startup_method is not None: - capabilities["Process Startup Method"] = process_startup_method.value + if process_startup_method: + capabilities["Process Startup Method"] = process_startup_method if local_ip: # just needed to determine whether to run benchmarks locally local_ip = net.resolve(local_ip) @@ -301,14 +295,12 @@ def bootstrap_actor_system( coordinator_ip = net.resolve(coordinator_ip) coordinator_port = coordinator_port or 1900 capabilities["Convention Address.IPv4"] = f"{coordinator_ip}:{coordinator_port}" - coordinator = local_ip == coordinator_ip - capabilities["coordinator"] = coordinator + capabilities["coordinator"] = local_ip == coordinator_ip LOG.info("Starting actor system with system base [%s] and capabilities [%s].", system_base, capabilities) return thespian.actors.ActorSystem( - systemBase=system_base.value, + systemBase=system_base, capabilities=capabilities, logDefs=log.load_configuration(), - transientUnique=transient_unique, ) except thespian.actors.ActorSystemException: LOG.exception("Could not initialize internal actor system.") diff --git a/tests/actor_test.py b/tests/actor_test.py index e69de29bb..e9c12eedb 100644 --- a/tests/actor_test.py +++ b/tests/actor_test.py @@ -0,0 +1,191 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import dataclasses +import socket +import typing +from unittest import mock + +import pytest +import thespian.actors + +from esrally import actor, exceptions +from esrally.utils import cases, net + + +@pytest.fixture(autouse=True) +def mock_socket(monkeypatch: pytest.MonkeyPatch) -> socket.socket: + sock_cls = mock.create_autospec(socket.socket) + sock = sock_cls.return_value + assert isinstance(sock, socket.socket) + sock.__enter__.return_value = sock + sock_cls.return_value = sock + monkeypatch.setattr(socket, "socket", sock_cls) + return sock + + +@dataclasses.dataclass +class DummyActorSystem: + systemBase: str + capabilities: dict[str, typing.Any] | None = None + logDefs: typing.Any = None + transientUnique: bool = False + + +@pytest.fixture(autouse=True) +def dummy_actor_system(monkeypatch: pytest.MonkeyPatch): + monkeypatch.setattr(thespian.actors, "ActorSystem", DummyActorSystem) + + +def resolve(ip: str): + return f"{ip}!r" + + +@pytest.fixture(autouse=True) +def mock_net_resolve(monkeypatch: pytest.MonkeyPatch): + monkeypatch.setattr(net, "resolve", mock.create_autospec(net.resolve, side_effect=resolve)) + + +@dataclasses.dataclass +class BootstrapActorSystemCase: + system_base: str = "multiprocTCPBase" + offline: bool = False + already_running: bool = False + try_join: bool = False + prefer_local_only: bool = False + local_ip: str | None = None + coordinator_ip: str | None = None + coordinator_port: int | None = None + want_error: tuple[Exception, ...] = tuple() + want_connect: tuple[str, int] | None = None + want_system_base: str = "multiprocTCPBase" + want_capabilities: dict[str, typing.Any] | None = None + want_log_defs: bool = False + + +@cases.cases( + default=BootstrapActorSystemCase( + want_error=( + exceptions.SystemSetupError("coordinator IP is required"), + exceptions.SystemSetupError("local IP is required"), + ) + ), + tcp=BootstrapActorSystemCase( + system_base="multiprocTCPBase", + want_error=( + exceptions.SystemSetupError("coordinator IP is required"), + exceptions.SystemSetupError("local IP is required"), + ), + ), + udp=BootstrapActorSystemCase( + system_base="multiprocUDPBase", + want_error=( + exceptions.SystemSetupError("coordinator IP is required"), + exceptions.SystemSetupError("local IP is required"), + ), + ), + no_local_ip=BootstrapActorSystemCase(coordinator_ip="127.0.0.1", want_error=(exceptions.SystemSetupError("local IP is required"),)), + no_coordinator_ip=BootstrapActorSystemCase( + local_ip="127.0.0.1", want_error=(exceptions.SystemSetupError("coordinator IP is required"),) + ), + offline=BootstrapActorSystemCase( + offline=True, + want_error=(exceptions.SystemSetupError("Rally requires a network-capable system base but got [multiprocQueueBase]."),), + ), + try_join=BootstrapActorSystemCase( + try_join=True, + want_capabilities={"coordinator": True}, + want_log_defs=True, + want_connect=("127.0.0.1", 1900), + ), + try_join_already_running=BootstrapActorSystemCase( + try_join=True, + already_running=True, + want_connect=("127.0.0.1", 1900), + ), + prefer_local_only=BootstrapActorSystemCase( + prefer_local_only=True, + want_capabilities={"Convention Address.IPv4": f"{resolve('127.0.0.1')}:1900", "coordinator": True, "ip": resolve("127.0.0.1")}, + want_log_defs=True, + ), + prefer_local_only_offline=BootstrapActorSystemCase( + offline=True, + prefer_local_only=True, + want_capabilities={"coordinator": True}, + want_system_base="multiprocQueueBase", + want_log_defs=True, + ), + coordinator_node=BootstrapActorSystemCase( + local_ip="192.168.0.41", + coordinator_ip="192.168.0.41", + want_capabilities={ + "Convention Address.IPv4": f"{resolve('192.168.0.41')}:1900", + "coordinator": True, + "ip": resolve("192.168.0.41"), + }, + want_log_defs=True, + ), + non_coordinator_node=BootstrapActorSystemCase( + local_ip="192.168.0.42", + coordinator_ip="192.168.0.41", + want_capabilities={ + "Convention Address.IPv4": f"{resolve('192.168.0.41')}:1900", + "coordinator": False, + "ip": resolve("192.168.0.42"), + }, + want_log_defs=True, + ), + coordinator_port=BootstrapActorSystemCase( + coordinator_ip="192.168.0.1", + coordinator_port=1234, + local_ip="192.168.0.2", + want_capabilities={"Convention Address.IPv4": f"{resolve('192.168.0.1')}:1234", "coordinator": False, "ip": resolve("192.168.0.2")}, + want_log_defs=True, + ), +) +def test_bootstrap_actor_system( + monkeypatch: pytest.MonkeyPatch, + case: BootstrapActorSystemCase, + mock_socket: socket.socket, +) -> None: + if not case.already_running: + mock_socket.connect.side_effect = socket.error + + monkeypatch.setattr(actor, "__SYSTEM_BASE", case.system_base) + + if case.offline: + actor.use_offline_actor_system() + try: + got = actor.bootstrap_actor_system( + try_join=case.try_join, + prefer_local_only=case.prefer_local_only, + local_ip=case.local_ip, + coordinator_ip=case.coordinator_ip, + coordinator_port=case.coordinator_port, + ) + except tuple(type(e) for e in case.want_error) as ex: # pylint: disable=catching-non-exception + assert str(ex) in {str(e) for e in case.want_error} + return + assert got is not None + assert got.systemBase == case.want_system_base + assert got.capabilities == case.want_capabilities + assert bool(got.logDefs) == case.want_log_defs + if case.want_connect: + mock_socket.connect.assert_called_once_with(case.want_connect) + else: + mock_socket.connect.assert_not_called() From a85135d2468070003a1cfaebe0734fbf5a17053c Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Mon, 18 Aug 2025 16:02:22 +0200 Subject: [PATCH 09/21] Add test case for actor.actor_system_already_running --- esrally/actor.py | 2 +- tests/actor_test.py | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/esrally/actor.py b/esrally/actor.py index f4f546174..218da1a74 100644 --- a/esrally/actor.py +++ b/esrally/actor.py @@ -199,7 +199,7 @@ def is_current_status_expected(self, expected_status): return self.status == expected_status -SystemBase = typing.Literal["multiprocQueueBase", "multiprocTCPBase", "multiprocUDPBase"] +SystemBase = typing.Literal["simpleSystemBase", "multiprocQueueBase", "multiprocTCPBase", "multiprocUDPBase"] __SYSTEM_BASE: SystemBase = "multiprocTCPBase" diff --git a/tests/actor_test.py b/tests/actor_test.py index e9c12eedb..f57941c1c 100644 --- a/tests/actor_test.py +++ b/tests/actor_test.py @@ -189,3 +189,40 @@ def test_bootstrap_actor_system( mock_socket.connect.assert_called_once_with(case.want_connect) else: mock_socket.connect.assert_not_called() + + +@dataclasses.dataclass +class ActorSystemAlreadyRunningCase: + already_running: bool = False + ip: str | None = None + port: int | None = None + system_base: actor.SystemBase | None = None + want: bool = False + want_connect: tuple[str, int] | None = ("127.0.0.1", 1900) + + +@cases.cases( + default=ActorSystemAlreadyRunningCase(), + ip_and_port=ActorSystemAlreadyRunningCase(ip="10.0.0.1", port=1000, want_connect=("10.0.0.1", 1000)), + already_running=ActorSystemAlreadyRunningCase(already_running=True, want=True), + ip_and_port_already_running=ActorSystemAlreadyRunningCase( + ip="10.0.0.1", port=1000, already_running=True, want=True, want_connect=("10.0.0.1", 1000) + ), + system_base_queue=ActorSystemAlreadyRunningCase(already_running=False, want=False, system_base="multiprocQueueBase", want_connect=None), + system_base_tcp=ActorSystemAlreadyRunningCase(already_running=False, want=False, system_base="multiprocTCPBase"), + system_base_tcp_already_running=ActorSystemAlreadyRunningCase(already_running=True, want=True, system_base="multiprocTCPBase"), + system_base_udp=ActorSystemAlreadyRunningCase(already_running=False, want=False, system_base="multiprocUDPBase", want_connect=None), +) +def test_actor_system_already_running( + case: ActorSystemAlreadyRunningCase, + mock_socket: socket.socket, +): + if not case.already_running: + mock_socket.connect.side_effect = socket.error + + got = actor.actor_system_already_running(ip=case.ip, port=case.port, system_base=case.system_base) + assert got is case.want + if case.want_connect: + mock_socket.connect.assert_called_once_with(case.want_connect) + else: + mock_socket.connect.assert_not_called() From a9db7f303f4754b1d34c6b8bef3ff2797e556f68 Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Mon, 18 Aug 2025 16:17:31 +0200 Subject: [PATCH 10/21] Test setting process_startup_method when bootstrapping actor system. --- esrally/actor.py | 2 +- tests/actor_test.py | 50 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/esrally/actor.py b/esrally/actor.py index 218da1a74..5f6b4af01 100644 --- a/esrally/actor.py +++ b/esrally/actor.py @@ -249,7 +249,7 @@ def use_offline_actor_system() -> None: __PROCESS_STARTUP_METHOD: ProcessStartupMethod | None = None -def use_startup_method(method: ProcessStartupMethod) -> None: +def set_startup_method(method: ProcessStartupMethod) -> None: global __PROCESS_STARTUP_METHOD __PROCESS_STARTUP_METHOD = method diff --git a/tests/actor_test.py b/tests/actor_test.py index f57941c1c..73bb5a573 100644 --- a/tests/actor_test.py +++ b/tests/actor_test.py @@ -65,6 +65,7 @@ def mock_net_resolve(monkeypatch: pytest.MonkeyPatch): class BootstrapActorSystemCase: system_base: str = "multiprocTCPBase" offline: bool = False + process_startup_method: actor.ProcessStartupMethod | None = None already_running: bool = False try_join: bool = False prefer_local_only: bool = False @@ -118,6 +119,13 @@ class BootstrapActorSystemCase: already_running=True, want_connect=("127.0.0.1", 1900), ), + try_join_process_startup_method=BootstrapActorSystemCase( + try_join=True, + process_startup_method="spawn", + want_connect=("127.0.0.1", 1900), + want_capabilities={"coordinator": True, "Process Startup Method": "spawn"}, + want_log_defs=True, + ), prefer_local_only=BootstrapActorSystemCase( prefer_local_only=True, want_capabilities={"Convention Address.IPv4": f"{resolve('127.0.0.1')}:1900", "coordinator": True, "ip": resolve("127.0.0.1")}, @@ -157,6 +165,42 @@ class BootstrapActorSystemCase: want_capabilities={"Convention Address.IPv4": f"{resolve('192.168.0.1')}:1234", "coordinator": False, "ip": resolve("192.168.0.2")}, want_log_defs=True, ), + process_startup_method_fork=BootstrapActorSystemCase( + process_startup_method="fork", + local_ip="192.168.0.2", + coordinator_ip="192.168.0.1", + want_capabilities={ + "Convention Address.IPv4": f"{resolve('192.168.0.1')}:1900", + "Process Startup Method": "fork", + "coordinator": False, + "ip": resolve("192.168.0.2"), + }, + want_log_defs=True, + ), + process_startup_method_spawn=BootstrapActorSystemCase( + process_startup_method="spawn", + local_ip="192.168.0.2", + coordinator_ip="192.168.0.1", + want_capabilities={ + "Convention Address.IPv4": f"{resolve('192.168.0.1')}:1900", + "Process Startup Method": "spawn", + "coordinator": False, + "ip": resolve("192.168.0.2"), + }, + want_log_defs=True, + ), + process_startup_method_forkserver=BootstrapActorSystemCase( + process_startup_method="forkserver", + local_ip="192.168.0.2", + coordinator_ip="192.168.0.1", + want_capabilities={ + "Convention Address.IPv4": f"{resolve('192.168.0.1')}:1900", + "Process Startup Method": "forkserver", + "coordinator": False, + "ip": resolve("192.168.0.2"), + }, + want_log_defs=True, + ), ) def test_bootstrap_actor_system( monkeypatch: pytest.MonkeyPatch, @@ -167,9 +211,13 @@ def test_bootstrap_actor_system( mock_socket.connect.side_effect = socket.error monkeypatch.setattr(actor, "__SYSTEM_BASE", case.system_base) - if case.offline: actor.use_offline_actor_system() + + monkeypatch.setattr(actor, "__PROCESS_STARTUP_METHOD", None) + if case.process_startup_method: + actor.set_startup_method(case.process_startup_method) + try: got = actor.bootstrap_actor_system( try_join=case.try_join, From 5286cdf4760a79f00481fdfc9aa721fe25bb57b4 Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Mon, 18 Aug 2025 16:29:44 +0200 Subject: [PATCH 11/21] It allows configuring actor process startup method. --- esrally/actor.py | 2 +- esrally/rally.py | 9 +++++++++ esrally/types.py | 2 ++ tests/actor_test.py | 17 ++++++++++------- 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/esrally/actor.py b/esrally/actor.py index 5f6b4af01..1a46b3660 100644 --- a/esrally/actor.py +++ b/esrally/actor.py @@ -219,7 +219,7 @@ def actor_system_already_running( if system_base is None: system_base = __SYSTEM_BASE if system_base != "multiprocTCPBase": - return False + return True with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: try: diff --git a/esrally/rally.py b/esrally/rally.py index c3729912f..c9ffaba27 100644 --- a/esrally/rally.py +++ b/esrally/rally.py @@ -14,6 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from __future__ import annotations import argparse import datetime @@ -23,6 +24,7 @@ import shutil import sys import time +import typing import uuid from enum import Enum @@ -963,6 +965,13 @@ def race(cfg: types.Config, kill_running_processes=False): def with_actor_system(runnable, cfg: types.Config): logger = logging.getLogger(__name__) already_running = actor.actor_system_already_running() + + process_startup_method: actor.ProcessStartupMethod | None = cfg.opts("actor", "actor.process.startup.method", None, mandatory=False) + if process_startup_method is not None: + if process_startup_method not in typing.get_args(actor.ProcessStartupMethod): + raise ValueError("invalid value for 'actor.process.startup.method' option") + actor.set_startup_method(process_startup_method) + logger.info("Actor system already running locally? [%s]", str(already_running)) try: actors = actor.bootstrap_actor_system(try_join=already_running, prefer_local_only=not already_running) diff --git a/esrally/types.py b/esrally/types.py index dd6db6672..9b4453cf8 100644 --- a/esrally/types.py +++ b/esrally/types.py @@ -18,6 +18,7 @@ from typing import Any, Literal, Protocol, TypeVar Section = Literal[ + "actor", "benchmarks", "client", "defaults", @@ -42,6 +43,7 @@ "unit-test", ] Key = Literal[ + "actor.process.startup.method", "add.chart_name", "add.chart_type", "add.config.option", diff --git a/tests/actor_test.py b/tests/actor_test.py index 73bb5a573..c43e53617 100644 --- a/tests/actor_test.py +++ b/tests/actor_test.py @@ -246,20 +246,23 @@ class ActorSystemAlreadyRunningCase: port: int | None = None system_base: actor.SystemBase | None = None want: bool = False - want_connect: tuple[str, int] | None = ("127.0.0.1", 1900) + want_connect: tuple[str, int] | None = None @cases.cases( - default=ActorSystemAlreadyRunningCase(), + default=ActorSystemAlreadyRunningCase(want_connect=("127.0.0.1", 1900)), ip_and_port=ActorSystemAlreadyRunningCase(ip="10.0.0.1", port=1000, want_connect=("10.0.0.1", 1000)), - already_running=ActorSystemAlreadyRunningCase(already_running=True, want=True), + already_running=ActorSystemAlreadyRunningCase(already_running=True, want_connect=("127.0.0.1", 1900), want=True), ip_and_port_already_running=ActorSystemAlreadyRunningCase( ip="10.0.0.1", port=1000, already_running=True, want=True, want_connect=("10.0.0.1", 1000) ), - system_base_queue=ActorSystemAlreadyRunningCase(already_running=False, want=False, system_base="multiprocQueueBase", want_connect=None), - system_base_tcp=ActorSystemAlreadyRunningCase(already_running=False, want=False, system_base="multiprocTCPBase"), - system_base_tcp_already_running=ActorSystemAlreadyRunningCase(already_running=True, want=True, system_base="multiprocTCPBase"), - system_base_udp=ActorSystemAlreadyRunningCase(already_running=False, want=False, system_base="multiprocUDPBase", want_connect=None), + system_base_simple=ActorSystemAlreadyRunningCase(want=True, system_base="simpleSystemBase"), + system_base_queue=ActorSystemAlreadyRunningCase(want=True, system_base="multiprocQueueBase"), + system_base_tcp=ActorSystemAlreadyRunningCase(want=False, system_base="multiprocTCPBase", want_connect=("127.0.0.1", 1900)), + system_base_tcp_already_running=ActorSystemAlreadyRunningCase( + already_running=True, want=True, system_base="multiprocTCPBase", want_connect=("127.0.0.1", 1900) + ), + system_base_udp=ActorSystemAlreadyRunningCase(want=True, system_base="multiprocUDPBase"), ) def test_actor_system_already_running( case: ActorSystemAlreadyRunningCase, From 42b6bc431332a9d51e85e289d11ec5469486cf77 Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Mon, 18 Aug 2025 16:36:39 +0200 Subject: [PATCH 12/21] Test behaviour when joining actor system offline. --- tests/actor_test.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/actor_test.py b/tests/actor_test.py index c43e53617..8f913187f 100644 --- a/tests/actor_test.py +++ b/tests/actor_test.py @@ -63,7 +63,7 @@ def mock_net_resolve(monkeypatch: pytest.MonkeyPatch): @dataclasses.dataclass class BootstrapActorSystemCase: - system_base: str = "multiprocTCPBase" + system_base: actor.SystemBase = "multiprocTCPBase" offline: bool = False process_startup_method: actor.ProcessStartupMethod | None = None already_running: bool = False @@ -74,7 +74,7 @@ class BootstrapActorSystemCase: coordinator_port: int | None = None want_error: tuple[Exception, ...] = tuple() want_connect: tuple[str, int] | None = None - want_system_base: str = "multiprocTCPBase" + want_system_base: actor.SystemBase = "multiprocTCPBase" want_capabilities: dict[str, typing.Any] | None = None want_log_defs: bool = False @@ -114,6 +114,11 @@ class BootstrapActorSystemCase: want_log_defs=True, want_connect=("127.0.0.1", 1900), ), + try_join_offline=BootstrapActorSystemCase( + try_join=True, + offline=True, + want_system_base="multiprocQueueBase", + ), try_join_already_running=BootstrapActorSystemCase( try_join=True, already_running=True, From e3103fd0fce44c4d1a41884b7483b40874e20af9 Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Mon, 18 Aug 2025 16:55:41 +0200 Subject: [PATCH 13/21] actor_system_already_running returns `None` for unsupported system base. --- esrally/actor.py | 4 ++-- esrally/rally.py | 6 +++--- tests/actor_test.py | 18 ++++++++++-------- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/esrally/actor.py b/esrally/actor.py index 1a46b3660..72d961751 100644 --- a/esrally/actor.py +++ b/esrally/actor.py @@ -209,7 +209,7 @@ def actor_system_already_running( ip: str | None = None, port: int | None = None, system_base: SystemBase | None = None, -) -> bool: +) -> bool | None: """It determines whether an actor system is already running by opening a socket connection. Notes: @@ -219,7 +219,7 @@ def actor_system_already_running( if system_base is None: system_base = __SYSTEM_BASE if system_base != "multiprocTCPBase": - return True + return None with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: try: diff --git a/esrally/rally.py b/esrally/rally.py index c9ffaba27..de54dca82 100644 --- a/esrally/rally.py +++ b/esrally/rally.py @@ -964,7 +964,7 @@ def race(cfg: types.Config, kill_running_processes=False): def with_actor_system(runnable, cfg: types.Config): logger = logging.getLogger(__name__) - already_running = actor.actor_system_already_running() + already_running: bool | None = actor.actor_system_already_running() process_startup_method: actor.ProcessStartupMethod | None = cfg.opts("actor", "actor.process.startup.method", None, mandatory=False) if process_startup_method is not None: @@ -974,7 +974,7 @@ def with_actor_system(runnable, cfg: types.Config): logger.info("Actor system already running locally? [%s]", str(already_running)) try: - actors = actor.bootstrap_actor_system(try_join=already_running, prefer_local_only=not already_running) + actors = actor.bootstrap_actor_system(try_join=bool(already_running), prefer_local_only=not already_running) # We can only support remote benchmarks if we have a dedicated daemon that is not only bound to 127.0.0.1 cfg.add(config.Scope.application, "system", "remote.benchmarking.supported", already_running) # This happens when the admin process could not be started, e.g. because it could not open a socket. @@ -998,7 +998,7 @@ def with_actor_system(runnable, cfg: types.Config): try: runnable(cfg) finally: - # We only shutdown the actor system if it was not already running before + # We only shut down the actor system if it was not already running before if not already_running: shutdown_complete = False times_interrupted = 0 diff --git a/tests/actor_test.py b/tests/actor_test.py index 8f913187f..944e299b7 100644 --- a/tests/actor_test.py +++ b/tests/actor_test.py @@ -117,6 +117,8 @@ class BootstrapActorSystemCase: try_join_offline=BootstrapActorSystemCase( try_join=True, offline=True, + want_capabilities={"coordinator": True}, + want_log_defs=True, want_system_base="multiprocQueueBase", ), try_join_already_running=BootstrapActorSystemCase( @@ -250,24 +252,24 @@ class ActorSystemAlreadyRunningCase: ip: str | None = None port: int | None = None system_base: actor.SystemBase | None = None - want: bool = False + want: bool = None want_connect: tuple[str, int] | None = None @cases.cases( - default=ActorSystemAlreadyRunningCase(want_connect=("127.0.0.1", 1900)), - ip_and_port=ActorSystemAlreadyRunningCase(ip="10.0.0.1", port=1000, want_connect=("10.0.0.1", 1000)), + default=ActorSystemAlreadyRunningCase(want_connect=("127.0.0.1", 1900), want=False), + ip_and_port=ActorSystemAlreadyRunningCase(ip="10.0.0.1", port=1000, want_connect=("10.0.0.1", 1000), want=False), already_running=ActorSystemAlreadyRunningCase(already_running=True, want_connect=("127.0.0.1", 1900), want=True), ip_and_port_already_running=ActorSystemAlreadyRunningCase( ip="10.0.0.1", port=1000, already_running=True, want=True, want_connect=("10.0.0.1", 1000) ), - system_base_simple=ActorSystemAlreadyRunningCase(want=True, system_base="simpleSystemBase"), - system_base_queue=ActorSystemAlreadyRunningCase(want=True, system_base="multiprocQueueBase"), - system_base_tcp=ActorSystemAlreadyRunningCase(want=False, system_base="multiprocTCPBase", want_connect=("127.0.0.1", 1900)), + system_base_simple=ActorSystemAlreadyRunningCase(system_base="simpleSystemBase"), + system_base_queue=ActorSystemAlreadyRunningCase(system_base="multiprocQueueBase"), + system_base_tcp=ActorSystemAlreadyRunningCase(system_base="multiprocTCPBase", want=False, want_connect=("127.0.0.1", 1900)), system_base_tcp_already_running=ActorSystemAlreadyRunningCase( - already_running=True, want=True, system_base="multiprocTCPBase", want_connect=("127.0.0.1", 1900) + already_running=True, system_base="multiprocTCPBase", want=True, want_connect=("127.0.0.1", 1900) ), - system_base_udp=ActorSystemAlreadyRunningCase(want=True, system_base="multiprocUDPBase"), + system_base_udp=ActorSystemAlreadyRunningCase(system_base="multiprocUDPBase"), ) def test_actor_system_already_running( case: ActorSystemAlreadyRunningCase, From 48a11c04e68d47c51e67fda67e1cd80714e3a119 Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Mon, 18 Aug 2025 17:15:00 +0200 Subject: [PATCH 14/21] It joins the remote actor system with the requested IP and port. --- esrally/actor.py | 16 +++++++++++----- tests/actor_test.py | 28 ++++++++++++++++++++++++---- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/esrally/actor.py b/esrally/actor.py index 72d961751..c02048735 100644 --- a/esrally/actor.py +++ b/esrally/actor.py @@ -209,7 +209,7 @@ def actor_system_already_running( ip: str | None = None, port: int | None = None, system_base: SystemBase | None = None, -) -> bool | None: +) -> bool: """It determines whether an actor system is already running by opening a socket connection. Notes: @@ -219,7 +219,7 @@ def actor_system_already_running( if system_base is None: system_base = __SYSTEM_BASE if system_base != "multiprocTCPBase": - return None + raise ValueError(f"unsupported system base: {system_base}") with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: try: @@ -263,11 +263,18 @@ def bootstrap_actor_system( ) -> thespian.actors.ActorSystem: system_base = __SYSTEM_BASE process_startup_method: ProcessStartupMethod | None = __PROCESS_STARTUP_METHOD + capabilities: dict[str, Any] = {} try: if try_join: - if actor_system_already_running(ip=coordinator_ip, port=coordinator_port, system_base=system_base): + if system_base == "multiprocTCPBase" and actor_system_already_running( + ip=coordinator_ip, port=coordinator_port, system_base=system_base + ): + if coordinator_ip: + coordinator_ip = net.resolve(coordinator_ip) + coordinator_port = coordinator_port or 1900 + capabilities["Convention Address.IPv4"] = f"{coordinator_ip}:{coordinator_port}" LOG.debug("Joining already running actor system with system base [%s].", system_base) - return thespian.actors.ActorSystem(system_base) + return thespian.actors.ActorSystem(systemBase=system_base, capabilities=capabilities or None) elif prefer_local_only: if system_base in ("multiprocTCPBase", "multiprocUDPBase"): local_ip = coordinator_ip = "127.0.0.1" @@ -283,7 +290,6 @@ def bootstrap_actor_system( # always resolve the public IP here, even if a DNS name is given, otherwise Thespian will be unhappy # if we try to join we can only run on the coordinator... - capabilities: dict[str, Any] = {} if process_startup_method: capabilities["Process Startup Method"] = process_startup_method if local_ip: diff --git a/tests/actor_test.py b/tests/actor_test.py index 944e299b7..4e7e4814c 100644 --- a/tests/actor_test.py +++ b/tests/actor_test.py @@ -126,6 +126,15 @@ class BootstrapActorSystemCase: already_running=True, want_connect=("127.0.0.1", 1900), ), + try_join_already_running_coordinator_ip_and_port=BootstrapActorSystemCase( + try_join=True, + already_running=True, + coordinator_ip="10.0.0.1", + coordinator_port=900, + local_ip="10.0.0.2", + want_capabilities={"Convention Address.IPv4": f"{resolve("10.0.0.1")}:900"}, + want_connect=("10.0.0.1", 900), + ), try_join_process_startup_method=BootstrapActorSystemCase( try_join=True, process_startup_method="spawn", @@ -253,6 +262,7 @@ class ActorSystemAlreadyRunningCase: port: int | None = None system_base: actor.SystemBase | None = None want: bool = None + want_error: Exception | None = None want_connect: tuple[str, int] | None = None @@ -263,13 +273,19 @@ class ActorSystemAlreadyRunningCase: ip_and_port_already_running=ActorSystemAlreadyRunningCase( ip="10.0.0.1", port=1000, already_running=True, want=True, want_connect=("10.0.0.1", 1000) ), - system_base_simple=ActorSystemAlreadyRunningCase(system_base="simpleSystemBase"), - system_base_queue=ActorSystemAlreadyRunningCase(system_base="multiprocQueueBase"), + system_base_simple=ActorSystemAlreadyRunningCase( + system_base="simpleSystemBase", want_error=ValueError("unsupported system base: simpleSystemBase") + ), + system_base_queue=ActorSystemAlreadyRunningCase( + system_base="multiprocQueueBase", want_error=ValueError("unsupported system base: multiprocQueueBase") + ), system_base_tcp=ActorSystemAlreadyRunningCase(system_base="multiprocTCPBase", want=False, want_connect=("127.0.0.1", 1900)), system_base_tcp_already_running=ActorSystemAlreadyRunningCase( already_running=True, system_base="multiprocTCPBase", want=True, want_connect=("127.0.0.1", 1900) ), - system_base_udp=ActorSystemAlreadyRunningCase(system_base="multiprocUDPBase"), + system_base_udp=ActorSystemAlreadyRunningCase( + system_base="multiprocUDPBase", want_error=ValueError("unsupported system base: multiprocUDPBase") + ), ) def test_actor_system_already_running( case: ActorSystemAlreadyRunningCase, @@ -278,7 +294,11 @@ def test_actor_system_already_running( if not case.already_running: mock_socket.connect.side_effect = socket.error - got = actor.actor_system_already_running(ip=case.ip, port=case.port, system_base=case.system_base) + try: + got = actor.actor_system_already_running(ip=case.ip, port=case.port, system_base=case.system_base) + except type(case.want_error) if case.want_error else tuple() as ex: + assert str(ex) == str(case.want_error) + return assert got is case.want if case.want_connect: mock_socket.connect.assert_called_once_with(case.want_connect) From e8008e9505da39136ed9d5297b1416d806022f40 Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Mon, 18 Aug 2025 17:21:55 +0200 Subject: [PATCH 15/21] It wraps actor_system_already_running to keep the old behavior. --- esrally/rally.py | 14 ++++++++++---- esrally/rallyd.py | 15 +++++++++++---- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/esrally/rally.py b/esrally/rally.py index de54dca82..88ecc3e28 100644 --- a/esrally/rally.py +++ b/esrally/rally.py @@ -962,10 +962,16 @@ def race(cfg: types.Config, kill_running_processes=False): with_actor_system(racecontrol.run, cfg) +def actor_system_already_running() -> bool: + try: + return actor.actor_system_already_running() + except ValueError: + return False + + def with_actor_system(runnable, cfg: types.Config): logger = logging.getLogger(__name__) - already_running: bool | None = actor.actor_system_already_running() - + already_running = actor_system_already_running() process_startup_method: actor.ProcessStartupMethod | None = cfg.opts("actor", "actor.process.startup.method", None, mandatory=False) if process_startup_method is not None: if process_startup_method not in typing.get_args(actor.ProcessStartupMethod): @@ -974,7 +980,7 @@ def with_actor_system(runnable, cfg: types.Config): logger.info("Actor system already running locally? [%s]", str(already_running)) try: - actors = actor.bootstrap_actor_system(try_join=bool(already_running), prefer_local_only=not already_running) + actors = actor.bootstrap_actor_system(try_join=already_running, prefer_local_only=not already_running) # We can only support remote benchmarks if we have a dedicated daemon that is not only bound to 127.0.0.1 cfg.add(config.Scope.application, "system", "remote.benchmarking.supported", already_running) # This happens when the admin process could not be started, e.g. because it could not open a socket. @@ -1010,7 +1016,7 @@ def with_actor_system(runnable, cfg: types.Config): actors.shutdown() # note that this check will only evaluate to True for a TCP-based actor system. timeout = 15 - while actor.actor_system_already_running() and timeout > 0: + while actor_system_already_running() and timeout > 0: logger.info("Actor system is still running. Waiting...") time.sleep(1) timeout -= 1 diff --git a/esrally/rallyd.py b/esrally/rallyd.py index fecef88e5..6870bed22 100644 --- a/esrally/rallyd.py +++ b/esrally/rallyd.py @@ -34,8 +34,15 @@ from esrally.utils import console, process +def actor_system_already_running() -> bool: + try: + return actor.actor_system_already_running() + except ValueError: + return False + + def start(args): - if actor.actor_system_already_running(): + if actor_system_already_running(): raise exceptions.RallyError("An actor system appears to be already running.") actor.bootstrap_actor_system(local_ip=args.node_ip, coordinator_ip=args.coordinator_ip) console.info(f"Successfully started actor system on node [{args.node_ip}] with coordinator node IP [{args.coordinator_ip}].") @@ -51,7 +58,7 @@ def start(args): def stop(raise_errors=True): - if actor.actor_system_already_running(): + if actor_system_already_running(): # noinspection PyBroadException try: # TheSpian writes the following warning upon start (at least) on Mac OS X: @@ -65,7 +72,7 @@ def stop(raise_errors=True): running_system.shutdown() # await termination... console.info("Shutting down actor system.", end="", flush=True) - while actor.actor_system_already_running(): + while actor_system_already_running(): console.println(".", end="", flush=True) time.sleep(1) console.println(" [OK]") @@ -80,7 +87,7 @@ def stop(raise_errors=True): def status(): - if actor.actor_system_already_running(): + if actor_system_already_running(): console.println("Running") else: console.println("Stopped") From 3000678beb0682de2fb97e468e3f6e1c8e32897f Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Mon, 18 Aug 2025 17:34:51 +0200 Subject: [PATCH 16/21] Fix actor_test.py --- tests/actor_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/actor_test.py b/tests/actor_test.py index 4e7e4814c..978120f89 100644 --- a/tests/actor_test.py +++ b/tests/actor_test.py @@ -132,7 +132,7 @@ class BootstrapActorSystemCase: coordinator_ip="10.0.0.1", coordinator_port=900, local_ip="10.0.0.2", - want_capabilities={"Convention Address.IPv4": f"{resolve("10.0.0.1")}:900"}, + want_capabilities={"Convention Address.IPv4": f"{resolve('10.0.0.1')}:900"}, want_connect=("10.0.0.1", 900), ), try_join_process_startup_method=BootstrapActorSystemCase( From d3a078defab3ad9115a870512c51b860d5d178e4 Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Mon, 18 Aug 2025 17:50:31 +0200 Subject: [PATCH 17/21] Mock log.load_configuration in test_actor.py --- tests/actor_test.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/actor_test.py b/tests/actor_test.py index 978120f89..2879a32a4 100644 --- a/tests/actor_test.py +++ b/tests/actor_test.py @@ -24,7 +24,7 @@ import pytest import thespian.actors -from esrally import actor, exceptions +from esrally import actor, exceptions, log from esrally.utils import cases, net @@ -61,6 +61,11 @@ def mock_net_resolve(monkeypatch: pytest.MonkeyPatch): monkeypatch.setattr(net, "resolve", mock.create_autospec(net.resolve, side_effect=resolve)) +@pytest.fixture(autouse=True) +def mock_load_configuration(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(log, "load_configuration", lambda: {"some": "config"}) + + @dataclasses.dataclass class BootstrapActorSystemCase: system_base: actor.SystemBase = "multiprocTCPBase" From 09d0f93623887724b0b3aa23dbc131be6abbac02 Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Mon, 18 Aug 2025 18:17:03 +0200 Subject: [PATCH 18/21] Update configuration documentation. --- docs/configuration.rst | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/docs/configuration.rst b/docs/configuration.rst index e95fdf45c..daef85d3b 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -10,6 +10,17 @@ Rally stores its configuration in the file ``~/.rally/rally.ini`` which is autom The configuration file can use `${CONFIG_DIR}` to refer to the directory where Rally stores its configuration files. This is useful for configuring Rally in a portable way. This defaults to `~/.rally`, but can be overridden by setting the `RALLY_HOME` environment variable in your shell. + +actor +~~~~~ + +This section allows to configure how thespian actor library is being used from rally. + +* ``actor.process.startup.method``: It allows to configure how `thespian` actors library should specify how + `subprocessing` library should create processes for new actors. This can be used to prevent from using `fork` method + on Linux or OSX with the purpose, for instance, to use threads in some rally component. + + meta ~~~~ From 75b671164b8fc41c795cfe8f4b7e4cf2bffe9bee Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Tue, 19 Aug 2025 16:34:26 +0200 Subject: [PATCH 19/21] Fix/refactor bootstrap_actor_system function. --- esrally/actor.py | 99 +++++++++++++++++++--------------- esrally/rally.py | 32 ++++++----- tests/actor_test.py | 128 ++++++++++++++++++++++++++------------------ 3 files changed, 152 insertions(+), 107 deletions(-) diff --git a/esrally/actor.py b/esrally/actor.py index c02048735..7b0567b6d 100644 --- a/esrally/actor.py +++ b/esrally/actor.py @@ -26,7 +26,7 @@ import thespian.system.messages.status # type: ignore[import-untyped] from esrally import exceptions, log -from esrally.utils import console, net +from esrally.utils import console LOG = logging.getLogger(__name__) @@ -228,7 +228,7 @@ def actor_system_already_running( LOG.info("Looking for an already running actor system (ip='%s', port=%d)...", ip, port) sock.connect((ip, port)) return True - except Exception as ex: + except OSError as ex: LOG.info("Failed to connect to already running actor system (ip='%s', port=%d): %s", ip, port, ex) return False @@ -237,6 +237,7 @@ def actor_system_already_running( def use_offline_actor_system() -> None: global __SYSTEM_BASE __SYSTEM_BASE = "multiprocQueueBase" + LOG.info("Actor system base set to [%s]", __PROCESS_STARTUP_METHOD) ProcessStartupMethod = typing.Literal[ @@ -252,62 +253,76 @@ def use_offline_actor_system() -> None: def set_startup_method(method: ProcessStartupMethod) -> None: global __PROCESS_STARTUP_METHOD __PROCESS_STARTUP_METHOD = method + LOG.info("Actor process startup method set to [%s]", __PROCESS_STARTUP_METHOD) def bootstrap_actor_system( try_join: bool = False, prefer_local_only: bool = False, local_ip: str | None = None, + admin_port: int | None = None, coordinator_ip: str | None = None, coordinator_port: int | None = None, ) -> thespian.actors.ActorSystem: system_base = __SYSTEM_BASE - process_startup_method: ProcessStartupMethod | None = __PROCESS_STARTUP_METHOD capabilities: dict[str, Any] = {} - try: - if try_join: - if system_base == "multiprocTCPBase" and actor_system_already_running( - ip=coordinator_ip, port=coordinator_port, system_base=system_base - ): - if coordinator_ip: - coordinator_ip = net.resolve(coordinator_ip) - coordinator_port = coordinator_port or 1900 - capabilities["Convention Address.IPv4"] = f"{coordinator_ip}:{coordinator_port}" - LOG.debug("Joining already running actor system with system base [%s].", system_base) - return thespian.actors.ActorSystem(systemBase=system_base, capabilities=capabilities or None) - elif prefer_local_only: - if system_base in ("multiprocTCPBase", "multiprocUDPBase"): + log_defs: Any = None + if try_join and ( + system_base != "multiprocTCPBase" or actor_system_already_running(ip=local_ip, port=admin_port, system_base=system_base) + ): + LOG.info("Try joining already running actor system with system base [%s].", system_base) + else: + # All actor system are coordinator unless another coordinator is known to exist. + capabilities["coordinator"] = True + + if system_base in ("multiprocTCPBase", "multiprocUDPBase"): + if prefer_local_only: + LOG.info("Bootstrapping locally running actor system with system base [%s].", system_base) local_ip = coordinator_ip = "127.0.0.1" - else: - local_ip = coordinator_ip = None - else: - if str(system_base) not in ("multiprocTCPBase", "multiprocUDPBase"): - raise exceptions.SystemSetupError("Rally requires a network-capable system base but got [%s]." % system_base) - if not coordinator_ip: - raise exceptions.SystemSetupError("coordinator IP is required") - if not local_ip: - raise exceptions.SystemSetupError("local IP is required") - # always resolve the public IP here, even if a DNS name is given, otherwise Thespian will be unhappy - - # if we try to join we can only run on the coordinator... + + if local_ip: + local_ip, admin_port = resolve(local_ip, admin_port) + capabilities["ip"] = local_ip + + if admin_port: + capabilities["Admin Port"] = admin_port + + if coordinator_ip: + coordinator_ip, coordinator_port = resolve(coordinator_ip, coordinator_port) + if coordinator_port: + coordinator_port = int(coordinator_port) + if coordinator_port: + coordinator_ip += f":{coordinator_port}" + capabilities["Convention Address.IPv4"] = coordinator_ip + + if coordinator_ip and local_ip and coordinator_ip != local_ip: + capabilities["coordinator"] = False + + process_startup_method: ProcessStartupMethod | None = __PROCESS_STARTUP_METHOD if process_startup_method: capabilities["Process Startup Method"] = process_startup_method - if local_ip: - # just needed to determine whether to run benchmarks locally - local_ip = net.resolve(local_ip) - capabilities["ip"] = local_ip - if coordinator_ip: - # Make the coordinator node the convention leader - coordinator_ip = net.resolve(coordinator_ip) - coordinator_port = coordinator_port or 1900 - capabilities["Convention Address.IPv4"] = f"{coordinator_ip}:{coordinator_port}" - capabilities["coordinator"] = local_ip == coordinator_ip - LOG.info("Starting actor system with system base [%s] and capabilities [%s].", system_base, capabilities) - return thespian.actors.ActorSystem( + + log_defs = log.load_configuration() + LOG.info("Starting actor system with system base [%s] and capabilities [%s]...", system_base, capabilities) + + try: + actor_system = thespian.actors.ActorSystem( systemBase=system_base, capabilities=capabilities, - logDefs=log.load_configuration(), + logDefs=log_defs, ) except thespian.actors.ActorSystemException: - LOG.exception("Could not initialize internal actor system.") + LOG.exception("Could not initialize actor system with system base [%s] and capabilities [%s].", system_base, capabilities) raise + + LOG.info("Successfully initialized with system base [%s] and capabilities [%s].", system_base, actor_system.capabilities) + return actor_system + + +def resolve(host: str, port: int | None = None, family: int = socket.AF_INET, proto: int = socket.IPPROTO_TCP) -> tuple[str, int | None]: + address_info: tuple[Any, Any, Any, Any, tuple[Any, ...]] + for address_info in socket.getaddrinfo(host, port=port or None, family=family, proto=proto): + address = address_info[4] + if len(address) == 2 and isinstance(address[0], str) and isinstance(address[1], int): + host, port = address + return host, port or None diff --git a/esrally/rally.py b/esrally/rally.py index 88ecc3e28..4c1c1133a 100644 --- a/esrally/rally.py +++ b/esrally/rally.py @@ -54,6 +54,8 @@ from esrally.tracker import tracker from esrally.utils import console, convert, io, net, opts, process, versions +LOG = logging.getLogger(__name__) + class ExitStatus(Enum): SUCCESSFUL = 1 @@ -970,33 +972,35 @@ def actor_system_already_running() -> bool: def with_actor_system(runnable, cfg: types.Config): - logger = logging.getLogger(__name__) - already_running = actor_system_already_running() process_startup_method: actor.ProcessStartupMethod | None = cfg.opts("actor", "actor.process.startup.method", None, mandatory=False) if process_startup_method is not None: if process_startup_method not in typing.get_args(actor.ProcessStartupMethod): - raise ValueError("invalid value for 'actor.process.startup.method' option") + valid_options = ", ".join(str(v) for v in typing.get_args(actor.ProcessStartupMethod)) + raise ValueError( + f"Invalid value '{process_startup_method}' for 'actor.process.startup.method' option. Valid values are: {valid_options}" + ) actor.set_startup_method(process_startup_method) - logger.info("Actor system already running locally? [%s]", str(already_running)) + already_running = actor_system_already_running() + LOG.info("Actor system already running locally? [%s]", str(already_running)) try: actors = actor.bootstrap_actor_system(try_join=already_running, prefer_local_only=not already_running) # We can only support remote benchmarks if we have a dedicated daemon that is not only bound to 127.0.0.1 cfg.add(config.Scope.application, "system", "remote.benchmarking.supported", already_running) # This happens when the admin process could not be started, e.g. because it could not open a socket. except thespian.actors.InvalidActorAddress: - logger.info("Falling back to offline actor system.") + LOG.info("Falling back to offline actor system.") actor.use_offline_actor_system() actors = actor.bootstrap_actor_system(try_join=True) except KeyboardInterrupt: raise exceptions.UserInterrupted("User has cancelled the benchmark (detected whilst bootstrapping actor system).") from None except Exception as e: - logger.exception("Could not bootstrap actor system.") + LOG.exception("Could not bootstrap actor system.") if str(e) == "Unable to determine valid external socket address.": console.warn( - "Could not determine a socket address. Are you running without any network? Switching to degraded mode.", logger=logger + "Could not determine a socket address. Are you running without any network? Switching to degraded mode.", logger=LOG ) - logger.info("Falling back to offline actor system.") + LOG.info("Falling back to offline actor system.") actor.use_offline_actor_system() actors = actor.bootstrap_actor_system(try_join=True) else: @@ -1012,26 +1016,26 @@ def with_actor_system(runnable, cfg: types.Config): try: # give some time for any outstanding messages to be delivered to the actor system time.sleep(3) - logger.info("Attempting to shutdown internal actor system.") + LOG.info("Attempting to shutdown internal actor system.") actors.shutdown() # note that this check will only evaluate to True for a TCP-based actor system. timeout = 15 while actor_system_already_running() and timeout > 0: - logger.info("Actor system is still running. Waiting...") + LOG.info("Actor system is still running. Waiting...") time.sleep(1) timeout -= 1 if timeout > 0: shutdown_complete = True - logger.info("Shutdown completed.") + LOG.info("Shutdown completed.") else: - logger.warning("Shutdown timed out. Actor system is still running.") + LOG.warning("Shutdown timed out. Actor system is still running.") break except KeyboardInterrupt: times_interrupted += 1 - logger.warning("User interrupted shutdown of internal actor system.") + LOG.warning("User interrupted shutdown of internal actor system.") console.info("Please wait a moment for Rally's internal components to shutdown.") if not shutdown_complete and times_interrupted > 0: - logger.warning("Terminating after user has interrupted actor system shutdown explicitly for [%d] times.", times_interrupted) + LOG.warning("Terminating after user has interrupted actor system shutdown explicitly for [%d] times.", times_interrupted) console.println("") console.warn("Terminating now at the risk of leaving child processes behind.") console.println("") diff --git a/tests/actor_test.py b/tests/actor_test.py index 2879a32a4..2499cd7a7 100644 --- a/tests/actor_test.py +++ b/tests/actor_test.py @@ -24,8 +24,8 @@ import pytest import thespian.actors -from esrally import actor, exceptions, log -from esrally.utils import cases, net +from esrally import actor, log +from esrally.utils import cases @pytest.fixture(autouse=True) @@ -52,13 +52,13 @@ def dummy_actor_system(monkeypatch: pytest.MonkeyPatch): monkeypatch.setattr(thespian.actors, "ActorSystem", DummyActorSystem) -def resolve(ip: str): - return f"{ip}!r" +def resolve(host: str, port: int | None = None): + return f"{host}!r", port or None @pytest.fixture(autouse=True) -def mock_net_resolve(monkeypatch: pytest.MonkeyPatch): - monkeypatch.setattr(net, "resolve", mock.create_autospec(net.resolve, side_effect=resolve)) +def mock_resolve(monkeypatch: pytest.MonkeyPatch): + monkeypatch.setattr(actor, "resolve", mock.create_autospec(actor.resolve, side_effect=resolve)) @pytest.fixture(autouse=True) @@ -75,6 +75,7 @@ class BootstrapActorSystemCase: try_join: bool = False prefer_local_only: bool = False local_ip: str | None = None + admin_port: int | None = None coordinator_ip: str | None = None coordinator_port: int | None = None want_error: tuple[Exception, ...] = tuple() @@ -86,32 +87,64 @@ class BootstrapActorSystemCase: @cases.cases( default=BootstrapActorSystemCase( - want_error=( - exceptions.SystemSetupError("coordinator IP is required"), - exceptions.SystemSetupError("local IP is required"), - ) + want_capabilities={"coordinator": True}, + want_log_defs=True, ), tcp=BootstrapActorSystemCase( system_base="multiprocTCPBase", - want_error=( - exceptions.SystemSetupError("coordinator IP is required"), - exceptions.SystemSetupError("local IP is required"), - ), + want_capabilities={"coordinator": True}, + want_log_defs=True, ), udp=BootstrapActorSystemCase( system_base="multiprocUDPBase", - want_error=( - exceptions.SystemSetupError("coordinator IP is required"), - exceptions.SystemSetupError("local IP is required"), - ), + want_capabilities={"coordinator": True}, + want_log_defs=True, + want_system_base="multiprocUDPBase", + ), + queue=BootstrapActorSystemCase( + system_base="multiprocQueueBase", + want_capabilities={"coordinator": True}, + want_log_defs=True, + want_system_base="multiprocQueueBase", + ), + local_ip=BootstrapActorSystemCase( + local_ip="127.0.0.1", + want_capabilities={"coordinator": True, "ip": "127.0.0.1!r"}, + want_log_defs=True, ), - no_local_ip=BootstrapActorSystemCase(coordinator_ip="127.0.0.1", want_error=(exceptions.SystemSetupError("local IP is required"),)), - no_coordinator_ip=BootstrapActorSystemCase( - local_ip="127.0.0.1", want_error=(exceptions.SystemSetupError("coordinator IP is required"),) + admin_port=BootstrapActorSystemCase( + admin_port=1024, + want_capabilities={"coordinator": True, "Admin Port": 1024}, + want_log_defs=True, + ), + coordinator_ip=BootstrapActorSystemCase( + coordinator_ip="192.168.0.1", + want_capabilities={"Convention Address.IPv4": "192.168.0.1!r", "coordinator": True}, + want_log_defs=True, + ), + coordinator_ip_and_port=BootstrapActorSystemCase( + coordinator_ip="192.168.0.1", + coordinator_port=2000, + want_capabilities={"Convention Address.IPv4": "192.168.0.1!r:2000", "coordinator": True}, + want_log_defs=True, + ), + local_ip_is_coordinator_ip=BootstrapActorSystemCase( + coordinator_ip="192.168.0.1", + local_ip="192.168.0.1", + want_capabilities={"Convention Address.IPv4": "192.168.0.1!r", "coordinator": True, "ip": "192.168.0.1!r"}, + want_log_defs=True, + ), + local_ip_is_not_coordinator_ip=BootstrapActorSystemCase( + coordinator_ip="192.168.0.1", + local_ip="192.168.0.2", + want_capabilities={"Convention Address.IPv4": "192.168.0.1!r", "coordinator": False, "ip": "192.168.0.2!r"}, + want_log_defs=True, ), offline=BootstrapActorSystemCase( offline=True, - want_error=(exceptions.SystemSetupError("Rally requires a network-capable system base but got [multiprocQueueBase]."),), + want_system_base="multiprocQueueBase", + want_capabilities={"coordinator": True}, + want_log_defs=True, ), try_join=BootstrapActorSystemCase( try_join=True, @@ -122,8 +155,6 @@ class BootstrapActorSystemCase: try_join_offline=BootstrapActorSystemCase( try_join=True, offline=True, - want_capabilities={"coordinator": True}, - want_log_defs=True, want_system_base="multiprocQueueBase", ), try_join_already_running=BootstrapActorSystemCase( @@ -131,14 +162,18 @@ class BootstrapActorSystemCase: already_running=True, want_connect=("127.0.0.1", 1900), ), - try_join_already_running_coordinator_ip_and_port=BootstrapActorSystemCase( + try_join_offline_already_running=BootstrapActorSystemCase( + try_join=True, + offline=True, + already_running=True, + want_system_base="multiprocQueueBase", + ), + try_join_already_running_with_ip_and_port=BootstrapActorSystemCase( try_join=True, already_running=True, - coordinator_ip="10.0.0.1", - coordinator_port=900, local_ip="10.0.0.2", - want_capabilities={"Convention Address.IPv4": f"{resolve('10.0.0.1')}:900"}, - want_connect=("10.0.0.1", 900), + admin_port=2000, + want_connect=("10.0.0.2", 2000), ), try_join_process_startup_method=BootstrapActorSystemCase( try_join=True, @@ -149,7 +184,7 @@ class BootstrapActorSystemCase: ), prefer_local_only=BootstrapActorSystemCase( prefer_local_only=True, - want_capabilities={"Convention Address.IPv4": f"{resolve('127.0.0.1')}:1900", "coordinator": True, "ip": resolve("127.0.0.1")}, + want_capabilities={"Convention Address.IPv4": "127.0.0.1!r", "coordinator": True, "ip": "127.0.0.1!r"}, want_log_defs=True, ), prefer_local_only_offline=BootstrapActorSystemCase( @@ -163,9 +198,9 @@ class BootstrapActorSystemCase: local_ip="192.168.0.41", coordinator_ip="192.168.0.41", want_capabilities={ - "Convention Address.IPv4": f"{resolve('192.168.0.41')}:1900", + "Convention Address.IPv4": "192.168.0.41!r", "coordinator": True, - "ip": resolve("192.168.0.41"), + "ip": "192.168.0.41!r", }, want_log_defs=True, ), @@ -173,9 +208,9 @@ class BootstrapActorSystemCase: local_ip="192.168.0.42", coordinator_ip="192.168.0.41", want_capabilities={ - "Convention Address.IPv4": f"{resolve('192.168.0.41')}:1900", + "Convention Address.IPv4": "192.168.0.41!r", "coordinator": False, - "ip": resolve("192.168.0.42"), + "ip": "192.168.0.42!r", }, want_log_defs=True, ), @@ -183,42 +218,30 @@ class BootstrapActorSystemCase: coordinator_ip="192.168.0.1", coordinator_port=1234, local_ip="192.168.0.2", - want_capabilities={"Convention Address.IPv4": f"{resolve('192.168.0.1')}:1234", "coordinator": False, "ip": resolve("192.168.0.2")}, + want_capabilities={"Convention Address.IPv4": "192.168.0.1!r:1234", "coordinator": False, "ip": "192.168.0.2!r"}, want_log_defs=True, ), process_startup_method_fork=BootstrapActorSystemCase( process_startup_method="fork", - local_ip="192.168.0.2", - coordinator_ip="192.168.0.1", want_capabilities={ - "Convention Address.IPv4": f"{resolve('192.168.0.1')}:1900", "Process Startup Method": "fork", - "coordinator": False, - "ip": resolve("192.168.0.2"), + "coordinator": True, }, want_log_defs=True, ), process_startup_method_spawn=BootstrapActorSystemCase( process_startup_method="spawn", - local_ip="192.168.0.2", - coordinator_ip="192.168.0.1", want_capabilities={ - "Convention Address.IPv4": f"{resolve('192.168.0.1')}:1900", "Process Startup Method": "spawn", - "coordinator": False, - "ip": resolve("192.168.0.2"), + "coordinator": True, }, want_log_defs=True, ), process_startup_method_forkserver=BootstrapActorSystemCase( process_startup_method="forkserver", - local_ip="192.168.0.2", - coordinator_ip="192.168.0.1", want_capabilities={ - "Convention Address.IPv4": f"{resolve('192.168.0.1')}:1900", "Process Startup Method": "forkserver", - "coordinator": False, - "ip": resolve("192.168.0.2"), + "coordinator": True, }, want_log_defs=True, ), @@ -244,6 +267,7 @@ def test_bootstrap_actor_system( try_join=case.try_join, prefer_local_only=case.prefer_local_only, local_ip=case.local_ip, + admin_port=case.admin_port, coordinator_ip=case.coordinator_ip, coordinator_port=case.coordinator_port, ) @@ -252,7 +276,7 @@ def test_bootstrap_actor_system( return assert got is not None assert got.systemBase == case.want_system_base - assert got.capabilities == case.want_capabilities + assert (got.capabilities or {}) == (case.want_capabilities or {}) assert bool(got.logDefs) == case.want_log_defs if case.want_connect: mock_socket.connect.assert_called_once_with(case.want_connect) @@ -304,6 +328,8 @@ def test_actor_system_already_running( except type(case.want_error) if case.want_error else tuple() as ex: assert str(ex) == str(case.want_error) return + else: + assert case.want_error is None assert got is case.want if case.want_connect: mock_socket.connect.assert_called_once_with(case.want_connect) From d3d98a3da8f76cf5c0d9fb4de0c4ab0e2d57cd63 Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Tue, 19 Aug 2025 16:44:51 +0200 Subject: [PATCH 20/21] Remove wrappers for actor_system_already_running method. --- esrally/actor.py | 5 +++-- esrally/rally.py | 15 ++++----------- esrally/rallyd.py | 15 ++++----------- 3 files changed, 11 insertions(+), 24 deletions(-) diff --git a/esrally/actor.py b/esrally/actor.py index 7b0567b6d..626831297 100644 --- a/esrally/actor.py +++ b/esrally/actor.py @@ -209,7 +209,7 @@ def actor_system_already_running( ip: str | None = None, port: int | None = None, system_base: SystemBase | None = None, -) -> bool: +) -> bool | None: """It determines whether an actor system is already running by opening a socket connection. Notes: @@ -219,7 +219,8 @@ def actor_system_already_running( if system_base is None: system_base = __SYSTEM_BASE if system_base != "multiprocTCPBase": - raise ValueError(f"unsupported system base: {system_base}") + # This system is not supported yet. + return None with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: try: diff --git a/esrally/rally.py b/esrally/rally.py index 4c1c1133a..5227cd266 100644 --- a/esrally/rally.py +++ b/esrally/rally.py @@ -964,13 +964,6 @@ def race(cfg: types.Config, kill_running_processes=False): with_actor_system(racecontrol.run, cfg) -def actor_system_already_running() -> bool: - try: - return actor.actor_system_already_running() - except ValueError: - return False - - def with_actor_system(runnable, cfg: types.Config): process_startup_method: actor.ProcessStartupMethod | None = cfg.opts("actor", "actor.process.startup.method", None, mandatory=False) if process_startup_method is not None: @@ -981,10 +974,10 @@ def with_actor_system(runnable, cfg: types.Config): ) actor.set_startup_method(process_startup_method) - already_running = actor_system_already_running() - LOG.info("Actor system already running locally? [%s]", str(already_running)) + already_running = actor.actor_system_already_running() + LOG.info("Actor system already running locally? [%s]", already_running) try: - actors = actor.bootstrap_actor_system(try_join=already_running, prefer_local_only=not already_running) + actors = actor.bootstrap_actor_system(try_join=bool(already_running), prefer_local_only=not already_running) # We can only support remote benchmarks if we have a dedicated daemon that is not only bound to 127.0.0.1 cfg.add(config.Scope.application, "system", "remote.benchmarking.supported", already_running) # This happens when the admin process could not be started, e.g. because it could not open a socket. @@ -1020,7 +1013,7 @@ def with_actor_system(runnable, cfg: types.Config): actors.shutdown() # note that this check will only evaluate to True for a TCP-based actor system. timeout = 15 - while actor_system_already_running() and timeout > 0: + while actor.actor_system_already_running() and timeout > 0: LOG.info("Actor system is still running. Waiting...") time.sleep(1) timeout -= 1 diff --git a/esrally/rallyd.py b/esrally/rallyd.py index 6870bed22..fecef88e5 100644 --- a/esrally/rallyd.py +++ b/esrally/rallyd.py @@ -34,15 +34,8 @@ from esrally.utils import console, process -def actor_system_already_running() -> bool: - try: - return actor.actor_system_already_running() - except ValueError: - return False - - def start(args): - if actor_system_already_running(): + if actor.actor_system_already_running(): raise exceptions.RallyError("An actor system appears to be already running.") actor.bootstrap_actor_system(local_ip=args.node_ip, coordinator_ip=args.coordinator_ip) console.info(f"Successfully started actor system on node [{args.node_ip}] with coordinator node IP [{args.coordinator_ip}].") @@ -58,7 +51,7 @@ def start(args): def stop(raise_errors=True): - if actor_system_already_running(): + if actor.actor_system_already_running(): # noinspection PyBroadException try: # TheSpian writes the following warning upon start (at least) on Mac OS X: @@ -72,7 +65,7 @@ def stop(raise_errors=True): running_system.shutdown() # await termination... console.info("Shutting down actor system.", end="", flush=True) - while actor_system_already_running(): + while actor.actor_system_already_running(): console.println(".", end="", flush=True) time.sleep(1) console.println(" [OK]") @@ -87,7 +80,7 @@ def stop(raise_errors=True): def status(): - if actor_system_already_running(): + if actor.actor_system_already_running(): console.println("Running") else: console.println("Stopped") From bb605b1dcc008d74794e5a1567c300ac02ed2ca7 Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Tue, 19 Aug 2025 16:50:34 +0200 Subject: [PATCH 21/21] Fix actor_test.py --- tests/actor_test.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/tests/actor_test.py b/tests/actor_test.py index 2499cd7a7..cf60494c1 100644 --- a/tests/actor_test.py +++ b/tests/actor_test.py @@ -302,18 +302,14 @@ class ActorSystemAlreadyRunningCase: ip_and_port_already_running=ActorSystemAlreadyRunningCase( ip="10.0.0.1", port=1000, already_running=True, want=True, want_connect=("10.0.0.1", 1000) ), - system_base_simple=ActorSystemAlreadyRunningCase( - system_base="simpleSystemBase", want_error=ValueError("unsupported system base: simpleSystemBase") - ), - system_base_queue=ActorSystemAlreadyRunningCase( - system_base="multiprocQueueBase", want_error=ValueError("unsupported system base: multiprocQueueBase") - ), + system_base_simple=ActorSystemAlreadyRunningCase(system_base="simpleSystemBase"), + system_base_queue=ActorSystemAlreadyRunningCase(system_base="multiprocQueueBase"), system_base_tcp=ActorSystemAlreadyRunningCase(system_base="multiprocTCPBase", want=False, want_connect=("127.0.0.1", 1900)), system_base_tcp_already_running=ActorSystemAlreadyRunningCase( already_running=True, system_base="multiprocTCPBase", want=True, want_connect=("127.0.0.1", 1900) ), system_base_udp=ActorSystemAlreadyRunningCase( - system_base="multiprocUDPBase", want_error=ValueError("unsupported system base: multiprocUDPBase") + system_base="multiprocUDPBase", ), ) def test_actor_system_already_running(