Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
0a5cb40
Ask Thespian to spawn actors subprocess instead of using fork
fressi-elastic Aug 14, 2025
3c00838
Correct actor initialization procedure.
fressi-elastic Aug 14, 2025
903e4d1
revert changes to rally.py
fressi-elastic Aug 14, 2025
b8a8822
Revert unrelated changes.
fressi-elastic Aug 18, 2025
544b157
Revert unrelated changes.
fressi-elastic Aug 18, 2025
0e23b15
Revert unrelated changes.
fressi-elastic Aug 18, 2025
7dae951
Revert unrelated changes.
fressi-elastic Aug 18, 2025
a9fb89a
Merge branch 'actor' of github.com:fressi-elastic/rally into actor
fressi-elastic Aug 18, 2025
b3dd751
Add test case for actor.bootstrap_actor_system
fressi-elastic Aug 18, 2025
a85135d
Add test case for actor.actor_system_already_running
fressi-elastic Aug 18, 2025
a9db7f3
Test setting process_startup_method when bootstrapping actor system.
fressi-elastic Aug 18, 2025
5286cdf
It allows configuring actor process startup method.
fressi-elastic Aug 18, 2025
42b6bc4
Test behaviour when joining actor system offline.
fressi-elastic Aug 18, 2025
e3103fd
actor_system_already_running returns `None` for unsupported system base.
fressi-elastic Aug 18, 2025
48a11c0
It joins the remote actor system with the requested IP and port.
fressi-elastic Aug 18, 2025
e8008e9
It wraps actor_system_already_running to keep the old behavior.
fressi-elastic Aug 18, 2025
3000678
Fix actor_test.py
fressi-elastic Aug 18, 2025
d3a078d
Mock log.load_configuration in test_actor.py
fressi-elastic Aug 18, 2025
09d0f93
Update configuration documentation.
fressi-elastic Aug 18, 2025
75b6711
Fix/refactor bootstrap_actor_system function.
fressi-elastic Aug 19, 2025
d3d98a3
Remove wrappers for actor_system_already_running method.
fressi-elastic Aug 19, 2025
bb605b1
Fix actor_test.py
fressi-elastic Aug 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@ Rally stores its configuration in the file ``~/.rally/rally.ini`` which is autom
The configuration file can use `${CONFIG_DIR}` to refer to the directory where Rally stores its configuration files. This is useful for configuring Rally in a portable way.
This defaults to `~/.rally`, but can be overridden by setting the `RALLY_HOME` environment variable in your shell.


actor
~~~~~

This section configures how Rally uses the `thespian` actor library.

* ``actor.process.startup.method``: Selects the start method (``fork``, ``forkserver`` or ``spawn``) that the `thespian`
actor library asks Python's `multiprocessing` module to use when it creates processes for new actors. This can be used,
for instance, to avoid the ``fork`` method on Linux or macOS so that Rally components can safely use threads.


meta
~~~~

Expand Down
186 changes: 126 additions & 60 deletions esrally/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,21 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
import socket
import traceback
import typing
from typing import Any

import thespian.actors
import thespian.system.messages.status
import thespian.actors # type: ignore[import-untyped]
import thespian.system.messages.status # type: ignore[import-untyped]

from esrally import exceptions, log
from esrally.utils import console, net
from esrally.utils import console

LOG = logging.getLogger(__name__)


class BenchmarkFailure:
Expand Down Expand Up @@ -103,7 +108,7 @@ def guard(self, msg, sender):
class RallyActor(thespian.actors.ActorTypeDispatcher):
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
self.children = []
self.children: list[thespian.actors.ActorAddress] = []
self.received_responses = []
self.status = None
log.post_configure_actor_logging()
Expand Down Expand Up @@ -175,8 +180,9 @@ def send_to_children_and_transition(self, sender, msg, expected_status, new_stat
if self.is_current_status_expected(expected_status):
self.logger.debug("Transitioning from [%s] to [%s].", self.status, new_status)
self.status = new_status
for m in filter(None, self.children):
self.send(m, msg)
child: thespian.actors.ActorAddress
for child in filter(None, self.children):
self.send(child, msg)
else:
raise exceptions.RallyAssertionError(
"Received [%s] from [%s] but we are in status [%s] instead of [%s]." % (type(msg), sender, self.status, expected_status)
Expand All @@ -193,71 +199,131 @@ def is_current_status_expected(self, expected_status):
return self.status == expected_status


def actor_system_already_running(ip="127.0.0.1"):
"""
Determines whether an actor system is already running by opening a socket connection.
SystemBase = typing.Literal["simpleSystemBase", "multiprocQueueBase", "multiprocTCPBase", "multiprocUDPBase"]


__SYSTEM_BASE: SystemBase = "multiprocTCPBase"


def actor_system_already_running(
ip: str | None = None,
port: int | None = None,
system_base: SystemBase | None = None,
) -> bool | None:
"""It determines whether an actor system is already running by opening a socket connection.

Note: It may be possible that another system is running on the same port.
Notes:
- It may be possible that another system is running on the same port.
- This is working only when system base is "multiprocTCPBase"
"""
s = socket.socket()
try:
s.connect((ip, 1900))
s.close()
return True
except Exception:
return False
if system_base is None:
system_base = __SYSTEM_BASE
if system_base != "multiprocTCPBase":
# This system is not supported yet.
return None

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
try:
ip = ip or "127.0.0.1"
port = port or 1900
LOG.info("Looking for an already running actor system (ip='%s', port=%d)...", ip, port)
sock.connect((ip, port))
return True
except OSError as ex:
LOG.info("Failed to connect to already running actor system (ip='%s', port=%d): %s", ip, port, ex)

__SYSTEM_BASE = "multiprocTCPBase"
return False


def use_offline_actor_system():
def use_offline_actor_system() -> None:
    """Switch the module-wide actor system base to ``multiprocQueueBase``.

    The value is stored in the module-level ``__SYSTEM_BASE`` variable, which
    ``bootstrap_actor_system`` reads when it creates the actor system.
    """
    global __SYSTEM_BASE
    __SYSTEM_BASE = "multiprocQueueBase"
    # Log the value that was just assigned; the message is about the system
    # base, not about the process startup method.
    LOG.info("Actor system base set to [%s]", __SYSTEM_BASE)


ProcessStartupMethod = typing.Literal[
"fork",
"forkserver",
"spawn",
]

def bootstrap_actor_system(try_join=False, prefer_local_only=False, local_ip=None, coordinator_ip=None):
logger = logging.getLogger(__name__)

__PROCESS_STARTUP_METHOD: ProcessStartupMethod | None = None


def set_startup_method(method: ProcessStartupMethod) -> None:
    """Remember the process startup method to request for new actor processes.

    The value is stored in the module-level ``__PROCESS_STARTUP_METHOD``
    variable and logged.

    :param method: one of the ``ProcessStartupMethod`` literals
        (``"fork"``, ``"forkserver"`` or ``"spawn"``).
    """
    global __PROCESS_STARTUP_METHOD
    __PROCESS_STARTUP_METHOD = method
    # ``method`` and the global now hold the same value.
    LOG.info("Actor process startup method set to [%s]", method)


def bootstrap_actor_system(
try_join: bool = False,
prefer_local_only: bool = False,
local_ip: str | None = None,
admin_port: int | None = None,
coordinator_ip: str | None = None,
coordinator_port: int | None = None,
) -> thespian.actors.ActorSystem:
system_base = __SYSTEM_BASE
capabilities: dict[str, Any] = {}
log_defs: Any = None
if try_join and (
system_base != "multiprocTCPBase" or actor_system_already_running(ip=local_ip, port=admin_port, system_base=system_base)
):
LOG.info("Try joining already running actor system with system base [%s].", system_base)
else:
# All actor system are coordinator unless another coordinator is known to exist.
capabilities["coordinator"] = True

if system_base in ("multiprocTCPBase", "multiprocUDPBase"):
if prefer_local_only:
LOG.info("Bootstrapping locally running actor system with system base [%s].", system_base)
local_ip = coordinator_ip = "127.0.0.1"

if local_ip:
local_ip, admin_port = resolve(local_ip, admin_port)
capabilities["ip"] = local_ip

if admin_port:
capabilities["Admin Port"] = admin_port

if coordinator_ip:
coordinator_ip, coordinator_port = resolve(coordinator_ip, coordinator_port)
if coordinator_port:
coordinator_port = int(coordinator_port)
if coordinator_port:
coordinator_ip += f":{coordinator_port}"
capabilities["Convention Address.IPv4"] = coordinator_ip

if coordinator_ip and local_ip and coordinator_ip != local_ip:
capabilities["coordinator"] = False

process_startup_method: ProcessStartupMethod | None = __PROCESS_STARTUP_METHOD
if process_startup_method:
capabilities["Process Startup Method"] = process_startup_method

log_defs = log.load_configuration()
LOG.info("Starting actor system with system base [%s] and capabilities [%s]...", system_base, capabilities)

try:
if try_join:
if actor_system_already_running():
logger.debug("Joining already running actor system with system base [%s].", system_base)
return thespian.actors.ActorSystem(system_base)
else:
logger.debug("Creating new actor system with system base [%s] on coordinator node.", system_base)
# if we try to join we can only run on the coordinator...
return thespian.actors.ActorSystem(system_base, logDefs=log.load_configuration(), capabilities={"coordinator": True})
elif prefer_local_only:
coordinator = True
if system_base != "multiprocQueueBase":
coordinator_ip = "127.0.0.1"
local_ip = "127.0.0.1"
else:
coordinator_ip = None
local_ip = None
else:
if system_base not in ("multiprocTCPBase", "multiprocUDPBase"):
raise exceptions.SystemSetupError("Rally requires a network-capable system base but got [%s]." % system_base)
if not coordinator_ip:
raise exceptions.SystemSetupError("coordinator IP is required")
if not local_ip:
raise exceptions.SystemSetupError("local IP is required")
# always resolve the public IP here, even if a DNS name is given. Otherwise Thespian will be unhappy
local_ip = net.resolve(local_ip)
coordinator_ip = net.resolve(coordinator_ip)

coordinator = local_ip == coordinator_ip

capabilities = {"coordinator": coordinator}
if local_ip:
# just needed to determine whether to run benchmarks locally
capabilities["ip"] = local_ip
if coordinator_ip:
# Make the coordinator node the convention leader
capabilities["Convention Address.IPv4"] = "%s:1900" % coordinator_ip
logger.info("Starting actor system with system base [%s] and capabilities [%s].", system_base, capabilities)
return thespian.actors.ActorSystem(system_base, logDefs=log.load_configuration(), capabilities=capabilities)
actor_system = thespian.actors.ActorSystem(
systemBase=system_base,
capabilities=capabilities,
logDefs=log_defs,
)
except thespian.actors.ActorSystemException:
logger.exception("Could not initialize internal actor system.")
LOG.exception("Could not initialize actor system with system base [%s] and capabilities [%s].", system_base, capabilities)
raise

LOG.info("Successfully initialized with system base [%s] and capabilities [%s].", system_base, actor_system.capabilities)
return actor_system


def resolve(host: str, port: int | None = None, family: int = socket.AF_INET, proto: int = socket.IPPROTO_TCP) -> tuple[str, int | None]:
address_info: tuple[Any, Any, Any, Any, tuple[Any, ...]]
for address_info in socket.getaddrinfo(host, port=port or None, family=family, proto=proto):
address = address_info[4]
if len(address) == 2 and isinstance(address[0], str) and isinstance(address[1], int):
host, port = address
return host, port or None
40 changes: 26 additions & 14 deletions esrally/rally.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import argparse
import datetime
Expand All @@ -23,6 +24,7 @@
import shutil
import sys
import time
import typing
import uuid
from enum import Enum

Expand Down Expand Up @@ -52,6 +54,8 @@
from esrally.tracker import tracker
from esrally.utils import console, convert, io, net, opts, process, versions

LOG = logging.getLogger(__name__)


class ExitStatus(Enum):
SUCCESSFUL = 1
Expand Down Expand Up @@ -961,62 +965,70 @@ def race(cfg: types.Config, kill_running_processes=False):


def with_actor_system(runnable, cfg: types.Config):
logger = logging.getLogger(__name__)
process_startup_method: actor.ProcessStartupMethod | None = cfg.opts("actor", "actor.process.startup.method", None, mandatory=False)
if process_startup_method is not None:
if process_startup_method not in typing.get_args(actor.ProcessStartupMethod):
valid_options = ", ".join(str(v) for v in typing.get_args(actor.ProcessStartupMethod))
raise ValueError(
f"Invalid value '{process_startup_method}' for 'actor.process.startup.method' option. Valid values are: {valid_options}"
)
actor.set_startup_method(process_startup_method)

already_running = actor.actor_system_already_running()
logger.info("Actor system already running locally? [%s]", str(already_running))
LOG.info("Actor system already running locally? [%s]", already_running)
try:
actors = actor.bootstrap_actor_system(try_join=already_running, prefer_local_only=not already_running)
actors = actor.bootstrap_actor_system(try_join=bool(already_running), prefer_local_only=not already_running)
# We can only support remote benchmarks if we have a dedicated daemon that is not only bound to 127.0.0.1
cfg.add(config.Scope.application, "system", "remote.benchmarking.supported", already_running)
# This happens when the admin process could not be started, e.g. because it could not open a socket.
except thespian.actors.InvalidActorAddress:
logger.info("Falling back to offline actor system.")
LOG.info("Falling back to offline actor system.")
actor.use_offline_actor_system()
actors = actor.bootstrap_actor_system(try_join=True)
except KeyboardInterrupt:
raise exceptions.UserInterrupted("User has cancelled the benchmark (detected whilst bootstrapping actor system).") from None
except Exception as e:
logger.exception("Could not bootstrap actor system.")
LOG.exception("Could not bootstrap actor system.")
if str(e) == "Unable to determine valid external socket address.":
console.warn(
"Could not determine a socket address. Are you running without any network? Switching to degraded mode.", logger=logger
"Could not determine a socket address. Are you running without any network? Switching to degraded mode.", logger=LOG
)
logger.info("Falling back to offline actor system.")
LOG.info("Falling back to offline actor system.")
actor.use_offline_actor_system()
actors = actor.bootstrap_actor_system(try_join=True)
else:
raise
try:
runnable(cfg)
finally:
# We only shutdown the actor system if it was not already running before
# We only shut down the actor system if it was not already running before
if not already_running:
shutdown_complete = False
times_interrupted = 0
while not shutdown_complete and times_interrupted < 2:
try:
# give some time for any outstanding messages to be delivered to the actor system
time.sleep(3)
logger.info("Attempting to shutdown internal actor system.")
LOG.info("Attempting to shutdown internal actor system.")
actors.shutdown()
# note that this check will only evaluate to True for a TCP-based actor system.
timeout = 15
while actor.actor_system_already_running() and timeout > 0:
logger.info("Actor system is still running. Waiting...")
LOG.info("Actor system is still running. Waiting...")
time.sleep(1)
timeout -= 1
if timeout > 0:
shutdown_complete = True
logger.info("Shutdown completed.")
LOG.info("Shutdown completed.")
else:
logger.warning("Shutdown timed out. Actor system is still running.")
LOG.warning("Shutdown timed out. Actor system is still running.")
break
except KeyboardInterrupt:
times_interrupted += 1
logger.warning("User interrupted shutdown of internal actor system.")
LOG.warning("User interrupted shutdown of internal actor system.")
console.info("Please wait a moment for Rally's internal components to shutdown.")
if not shutdown_complete and times_interrupted > 0:
logger.warning("Terminating after user has interrupted actor system shutdown explicitly for [%d] times.", times_interrupted)
LOG.warning("Terminating after user has interrupted actor system shutdown explicitly for [%d] times.", times_interrupted)
console.println("")
console.warn("Terminating now at the risk of leaving child processes behind.")
console.println("")
Expand Down
2 changes: 2 additions & 0 deletions esrally/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from typing import Any, Literal, Protocol, TypeVar

Section = Literal[
"actor",
"benchmarks",
"client",
"defaults",
Expand All @@ -42,6 +43,7 @@
"unit-test",
]
Key = Literal[
"actor.process.startup.method",
"add.chart_name",
"add.chart_type",
"add.config.option",
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ files = ["esrally/", "it/", "tests/"]

[[tool.mypy.overrides]]
module = [
"esrally.actor",
"esrally.mechanic.team",
"esrally.storage.*",
"esrally.utils.cases",
Expand Down
Loading