Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
0a5cb40
Ask Thespian to spawn actors subprocess instead of using fork
fressi-elastic Aug 14, 2025
3c00838
Correct actor initialization procedure.
fressi-elastic Aug 14, 2025
903e4d1
revert changes to rally.py
fressi-elastic Aug 14, 2025
b8a8822
Revert unrelated changes.
fressi-elastic Aug 18, 2025
544b157
Revert unrelated changes.
fressi-elastic Aug 18, 2025
0e23b15
Revert unrelated changes.
fressi-elastic Aug 18, 2025
7dae951
Revert unrelated changes.
fressi-elastic Aug 18, 2025
a9fb89a
Merge branch 'actor' of github.com:fressi-elastic/rally into actor
fressi-elastic Aug 18, 2025
b3dd751
Add test case for actor.bootstrap_actor_system
fressi-elastic Aug 18, 2025
a85135d
Add test case for actor.actor_system_already_running
fressi-elastic Aug 18, 2025
a9db7f3
Test setting process_startup_method when bootstrapping actor system.
fressi-elastic Aug 18, 2025
5286cdf
It allows configuring actor process startup method.
fressi-elastic Aug 18, 2025
42b6bc4
Test behaviour when joining actor system offline.
fressi-elastic Aug 18, 2025
e3103fd
actor_system_already_running returns `None` for unsupported system base.
fressi-elastic Aug 18, 2025
48a11c0
It joins the remote actor system with the requested IP and port.
fressi-elastic Aug 18, 2025
e8008e9
It wraps actor_system_already_running to keep the old behavior.
fressi-elastic Aug 18, 2025
3000678
Fix actor_test.py
fressi-elastic Aug 18, 2025
d3a078d
Mock log.load_configuration in test_actor.py
fressi-elastic Aug 18, 2025
09d0f93
Update configuration documentation.
fressi-elastic Aug 18, 2025
75b6711
Fix/refactor bootstrap_actor_system function.
fressi-elastic Aug 19, 2025
d3d98a3
Remove wrappers for actor_system_already_running method.
fressi-elastic Aug 19, 2025
bb605b1
Fix actor_test.py
fressi-elastic Aug 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@ Rally stores its configuration in the file ``~/.rally/rally.ini`` which is autom
The configuration file can use `${CONFIG_DIR}` to refer to the directory where Rally stores its configuration files. This is useful for configuring Rally in a portable way.
This defaults to `~/.rally`, but can be overridden by setting the `RALLY_HOME` environment variable in your shell.


actor
~~~~~

This section allows to configure how thespian actor library is being used from rally.

* ``actor.process.startup.method``: It allows to configure how `thespian` actors library should specify how
`subprocessing` library should create processes for new actors. This can be used to prevent from using `fork` method
on Linux or OSX with the purpose, for instance, to use threads in some rally component.


meta
~~~~

Expand Down
138 changes: 94 additions & 44 deletions esrally/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,22 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
import socket
import traceback
import typing
from typing import Any

import thespian.actors
import thespian.system.messages.status
import thespian.actors # type: ignore[import-untyped]
import thespian.system.messages.status # type: ignore[import-untyped]

from esrally import exceptions, log
from esrally.utils import console, net

LOG = logging.getLogger(__name__)


class BenchmarkFailure:
"""
Expand Down Expand Up @@ -103,7 +108,7 @@ def guard(self, msg, sender):
class RallyActor(thespian.actors.ActorTypeDispatcher):
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
self.children = []
self.children: list[thespian.actors.ActorAddress] = []
self.received_responses = []
self.status = None
log.post_configure_actor_logging()
Expand Down Expand Up @@ -175,8 +180,9 @@ def send_to_children_and_transition(self, sender, msg, expected_status, new_stat
if self.is_current_status_expected(expected_status):
self.logger.debug("Transitioning from [%s] to [%s].", self.status, new_status)
self.status = new_status
for m in filter(None, self.children):
self.send(m, msg)
child: thespian.actors.ActorAddress
for child in filter(None, self.children):
self.send(child, msg)
else:
raise exceptions.RallyAssertionError(
"Received [%s] from [%s] but we are in status [%s] instead of [%s]." % (type(msg), sender, self.status, expected_status)
Expand All @@ -193,71 +199,115 @@ def is_current_status_expected(self, expected_status):
return self.status == expected_status


def actor_system_already_running(ip="127.0.0.1"):
"""
Determines whether an actor system is already running by opening a socket connection.
SystemBase = typing.Literal["simpleSystemBase", "multiprocQueueBase", "multiprocTCPBase", "multiprocUDPBase"]


__SYSTEM_BASE: SystemBase = "multiprocTCPBase"


def actor_system_already_running(
ip: str | None = None,
port: int | None = None,
system_base: SystemBase | None = None,
) -> bool:
"""It determines whether an actor system is already running by opening a socket connection.

Note: It may be possible that another system is running on the same port.
Notes:
- It may be possible that another system is running on the same port.
- This is working only when system base is "multiprocTCPBase"
"""
s = socket.socket()
try:
s.connect((ip, 1900))
s.close()
return True
except Exception:
return False
if system_base is None:
system_base = __SYSTEM_BASE
if system_base != "multiprocTCPBase":
raise ValueError(f"unsupported system base: {system_base}")

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
try:
ip = ip or "127.0.0.1"
port = port or 1900
LOG.info("Looking for an already running actor system (ip='%s', port=%d)...", ip, port)
sock.connect((ip, port))
return True
except Exception as ex:
LOG.info("Failed to connect to already running actor system (ip='%s', port=%d): %s", ip, port, ex)

__SYSTEM_BASE = "multiprocTCPBase"
return False


def use_offline_actor_system():
def use_offline_actor_system() -> None:
global __SYSTEM_BASE
__SYSTEM_BASE = "multiprocQueueBase"


def bootstrap_actor_system(try_join=False, prefer_local_only=False, local_ip=None, coordinator_ip=None):
logger = logging.getLogger(__name__)
ProcessStartupMethod = typing.Literal[
"fork",
"forkserver",
"spawn",
]


__PROCESS_STARTUP_METHOD: ProcessStartupMethod | None = None


def set_startup_method(method: ProcessStartupMethod) -> None:
global __PROCESS_STARTUP_METHOD
__PROCESS_STARTUP_METHOD = method


def bootstrap_actor_system(
try_join: bool = False,
prefer_local_only: bool = False,
local_ip: str | None = None,
coordinator_ip: str | None = None,
coordinator_port: int | None = None,
) -> thespian.actors.ActorSystem:
system_base = __SYSTEM_BASE
process_startup_method: ProcessStartupMethod | None = __PROCESS_STARTUP_METHOD
capabilities: dict[str, Any] = {}
try:
if try_join:
if actor_system_already_running():
logger.debug("Joining already running actor system with system base [%s].", system_base)
return thespian.actors.ActorSystem(system_base)
else:
logger.debug("Creating new actor system with system base [%s] on coordinator node.", system_base)
# if we try to join we can only run on the coordinator...
return thespian.actors.ActorSystem(system_base, logDefs=log.load_configuration(), capabilities={"coordinator": True})
if system_base == "multiprocTCPBase" and actor_system_already_running(
ip=coordinator_ip, port=coordinator_port, system_base=system_base
):
if coordinator_ip:
coordinator_ip = net.resolve(coordinator_ip)
coordinator_port = coordinator_port or 1900
capabilities["Convention Address.IPv4"] = f"{coordinator_ip}:{coordinator_port}"
LOG.debug("Joining already running actor system with system base [%s].", system_base)
return thespian.actors.ActorSystem(systemBase=system_base, capabilities=capabilities or None)
elif prefer_local_only:
coordinator = True
if system_base != "multiprocQueueBase":
coordinator_ip = "127.0.0.1"
local_ip = "127.0.0.1"
if system_base in ("multiprocTCPBase", "multiprocUDPBase"):
local_ip = coordinator_ip = "127.0.0.1"
else:
coordinator_ip = None
local_ip = None
local_ip = coordinator_ip = None
else:
if system_base not in ("multiprocTCPBase", "multiprocUDPBase"):
if str(system_base) not in ("multiprocTCPBase", "multiprocUDPBase"):
raise exceptions.SystemSetupError("Rally requires a network-capable system base but got [%s]." % system_base)
if not coordinator_ip:
raise exceptions.SystemSetupError("coordinator IP is required")
if not local_ip:
raise exceptions.SystemSetupError("local IP is required")
# always resolve the public IP here, even if a DNS name is given. Otherwise Thespian will be unhappy
local_ip = net.resolve(local_ip)
coordinator_ip = net.resolve(coordinator_ip)

coordinator = local_ip == coordinator_ip
# always resolve the public IP here, even if a DNS name is given, otherwise Thespian will be unhappy

capabilities = {"coordinator": coordinator}
# if we try to join we can only run on the coordinator...
if process_startup_method:
capabilities["Process Startup Method"] = process_startup_method
if local_ip:
# just needed to determine whether to run benchmarks locally
local_ip = net.resolve(local_ip)
capabilities["ip"] = local_ip
if coordinator_ip:
# Make the coordinator node the convention leader
capabilities["Convention Address.IPv4"] = "%s:1900" % coordinator_ip
logger.info("Starting actor system with system base [%s] and capabilities [%s].", system_base, capabilities)
return thespian.actors.ActorSystem(system_base, logDefs=log.load_configuration(), capabilities=capabilities)
coordinator_ip = net.resolve(coordinator_ip)
coordinator_port = coordinator_port or 1900
capabilities["Convention Address.IPv4"] = f"{coordinator_ip}:{coordinator_port}"
capabilities["coordinator"] = local_ip == coordinator_ip
LOG.info("Starting actor system with system base [%s] and capabilities [%s].", system_base, capabilities)
return thespian.actors.ActorSystem(
systemBase=system_base,
capabilities=capabilities,
logDefs=log.load_configuration(),
)
except thespian.actors.ActorSystemException:
logger.exception("Could not initialize internal actor system.")
LOG.exception("Could not initialize internal actor system.")
raise
21 changes: 18 additions & 3 deletions esrally/rally.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import argparse
import datetime
Expand All @@ -23,6 +24,7 @@
import shutil
import sys
import time
import typing
import uuid
from enum import Enum

Expand Down Expand Up @@ -960,9 +962,22 @@ def race(cfg: types.Config, kill_running_processes=False):
with_actor_system(racecontrol.run, cfg)


def actor_system_already_running() -> bool:
try:
return actor.actor_system_already_running()
except ValueError:
return False


def with_actor_system(runnable, cfg: types.Config):
logger = logging.getLogger(__name__)
already_running = actor.actor_system_already_running()
already_running = actor_system_already_running()
process_startup_method: actor.ProcessStartupMethod | None = cfg.opts("actor", "actor.process.startup.method", None, mandatory=False)
if process_startup_method is not None:
if process_startup_method not in typing.get_args(actor.ProcessStartupMethod):
raise ValueError("invalid value for 'actor.process.startup.method' option")
actor.set_startup_method(process_startup_method)

logger.info("Actor system already running locally? [%s]", str(already_running))
try:
actors = actor.bootstrap_actor_system(try_join=already_running, prefer_local_only=not already_running)
Expand All @@ -989,7 +1004,7 @@ def with_actor_system(runnable, cfg: types.Config):
try:
runnable(cfg)
finally:
# We only shutdown the actor system if it was not already running before
# We only shut down the actor system if it was not already running before
if not already_running:
shutdown_complete = False
times_interrupted = 0
Expand All @@ -1001,7 +1016,7 @@ def with_actor_system(runnable, cfg: types.Config):
actors.shutdown()
# note that this check will only evaluate to True for a TCP-based actor system.
timeout = 15
while actor.actor_system_already_running() and timeout > 0:
while actor_system_already_running() and timeout > 0:
logger.info("Actor system is still running. Waiting...")
time.sleep(1)
timeout -= 1
Expand Down
15 changes: 11 additions & 4 deletions esrally/rallyd.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,15 @@
from esrally.utils import console, process


def actor_system_already_running() -> bool:
try:
return actor.actor_system_already_running()
except ValueError:
return False


def start(args):
if actor.actor_system_already_running():
if actor_system_already_running():
raise exceptions.RallyError("An actor system appears to be already running.")
actor.bootstrap_actor_system(local_ip=args.node_ip, coordinator_ip=args.coordinator_ip)
console.info(f"Successfully started actor system on node [{args.node_ip}] with coordinator node IP [{args.coordinator_ip}].")
Expand All @@ -51,7 +58,7 @@ def start(args):


def stop(raise_errors=True):
if actor.actor_system_already_running():
if actor_system_already_running():
# noinspection PyBroadException
try:
# TheSpian writes the following warning upon start (at least) on Mac OS X:
Expand All @@ -65,7 +72,7 @@ def stop(raise_errors=True):
running_system.shutdown()
# await termination...
console.info("Shutting down actor system.", end="", flush=True)
while actor.actor_system_already_running():
while actor_system_already_running():
console.println(".", end="", flush=True)
time.sleep(1)
console.println(" [OK]")
Expand All @@ -80,7 +87,7 @@ def stop(raise_errors=True):


def status():
if actor.actor_system_already_running():
if actor_system_already_running():
console.println("Running")
else:
console.println("Stopped")
Expand Down
2 changes: 2 additions & 0 deletions esrally/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from typing import Any, Literal, Protocol, TypeVar

Section = Literal[
"actor",
"benchmarks",
"client",
"defaults",
Expand All @@ -42,6 +43,7 @@
"unit-test",
]
Key = Literal[
"actor.process.startup.method",
"add.chart_name",
"add.chart_type",
"add.config.option",
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ files = ["esrally/", "it/", "tests/"]

[[tool.mypy.overrides]]
module = [
"esrally.actor",
"esrally.mechanic.team",
"esrally.storage.*",
"esrally.utils.cases",
Expand Down
Loading