Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
107 commits
Select commit Hold shift + click to select a range
665557f
Revert "Allow configuring the process startup method to be used for c…
fressi-elastic Aug 26, 2025
ba07027
Refactor actor system initialization and retrieval.
fressi-elastic Sep 8, 2025
5a021dd
Implements AsyncActor
fressi-elastic Sep 18, 2025
1771a5d
Revert unintended changes.
fressi-elastic Sep 18, 2025
4767108
Revert unintended changes.
fressi-elastic Sep 18, 2025
b7f7e4e
Revert unintended changes.
fressi-elastic Sep 18, 2025
de24507
Create method for getting Config subclass from context and add a test…
fressi-elastic Sep 18, 2025
41a544a
Merge branch 'master' of github.com:elastic/rally
fressi-elastic Sep 18, 2025
b78f157
Fix issues with configuration context.
fressi-elastic Sep 18, 2025
0931be4
Improve configuration and actor system tests.
fressi-elastic Sep 18, 2025
3fb3f49
Use a port range as default actor admin port for TCP and UDP system b…
fressi-elastic Sep 18, 2025
dd2e114
Revert unrelated changes
fressi-elastic Sep 19, 2025
ef89968
Ignore problems loading logging configuration.
fressi-elastic Sep 19, 2025
9a13e3e
Pass all typing annotations checks in the new esrally.actors package.
fressi-elastic Sep 19, 2025
b594887
Improve request context implementation.
fressi-elastic Sep 19, 2025
127715f
Rename Context as ActorContext.
fressi-elastic Sep 19, 2025
5951451
Move implementation of create method inside of the context implementa…
fressi-elastic Sep 19, 2025
aa4dc82
Add test cases for async actor implementation.
fressi-elastic Sep 22, 2025
50e43ba
Merge branch 'master' of github.com:elastic/rally into actors
fressi-elastic Sep 22, 2025
9cf79a6
Ensures blocking requests can be cancelled.
fressi-elastic Sep 22, 2025
c0db245
It update code style and documentation.
fressi-elastic Sep 22, 2025
7f4e555
Add support for ping request inside the base async actor class.
fressi-elastic Sep 23, 2025
15ddc82
Fixing support for python 3.9
fressi-elastic Sep 23, 2025
30732d2
Remove support for UDP actor system base.
fressi-elastic Sep 23, 2025
67d0632
Some further refactory to reduce code duplication and support task ca…
fressi-elastic Sep 25, 2025
c69486c
Make ActorSystem.ask asynchronousish.
fressi-elastic Sep 26, 2025
4816c0e
Add some documentation in the code (_actor.py)
fressi-elastic Sep 26, 2025
6816e5e
Add some documentation in the code (_config.py)
fressi-elastic Sep 26, 2025
27ee00a
Write a new fixture to run test functions from inside of an AsyncActo…
fressi-elastic Sep 28, 2025
7aabfc0
Remove pending tasks and some other smaller refactory in actor context.
fressi-elastic Sep 29, 2025
4e5dc18
Ensure pending results is clean after request cancellation.
fressi-elastic Sep 29, 2025
3bb4cc6
Just return the wrapped future instead of the task after performing t…
fressi-elastic Sep 29, 2025
482e9ad
Add actor type name and request id to task name if no name is given t…
fressi-elastic Sep 29, 2025
1a1f603
It adds some extra documentation in the code.
fressi-elastic Sep 29, 2025
d6321d1
Add option to allow picking up a random unused port for Thespian admi…
fressi-elastic Sep 29, 2025
0925770
Merge branch 'master' of github.com:elastic/rally
fressi-elastic Sep 29, 2025
d32aee7
Add support for Python3.13 (pass unit tests).
fressi-elastic Sep 29, 2025
dff998d
Revert unnecessary changes.
fressi-elastic Sep 30, 2025
8e4f56f
Revert unnecessary changes.
fressi-elastic Sep 30, 2025
df04aa1
Revert unnecessary changes.
fressi-elastic Sep 30, 2025
91ae14a
Update the uv.lock file.
fressi-elastic Sep 30, 2025
f2965c7
Add entries for Python v3.13 to the CI testing matrix.
fressi-elastic Sep 30, 2025
ec0c82b
Merge branch 'master' of github.com:elastic/rally into actors
fressi-elastic Sep 30, 2025
5d32663
Workaround issue when log definition files is not present.
fressi-elastic Sep 30, 2025
80ed753
Use `None` as failback value for logDefs when logging configuration f…
fressi-elastic Sep 30, 2025
e37d7b9
It drops support for Python 3.9
fressi-elastic Sep 30, 2025
7f4fee3
Fix docs/migrate.rst
fressi-elastic Sep 30, 2025
4ca911d
Remove statement: from __future__ import annotations
fressi-elastic Sep 30, 2025
cb01f3a
Fix esrally/track/params.py and update uv.lock file.
fressi-elastic Sep 30, 2025
001f33b
Revert changes to esrally/storage/_range.py
fressi-elastic Sep 30, 2025
78d294f
Skip testing version 3.12 in buildkite pipeline.
fressi-elastic Sep 30, 2025
4f20116
Fix some other annotation and one pylint founding.
fressi-elastic Sep 30, 2025
11d5159
Merge branch 'python3.13' into actors
fressi-elastic Sep 30, 2025
272b72c
Remove lines: from __future__ import annotations
fressi-elastic Sep 30, 2025
a140c78
Workaround issue blocking when sending external request.
fressi-elastic Oct 1, 2025
f5658ff
It removes 'from __future__ import annotations' statements.
fressi-elastic Oct 2, 2025
9e217ef
Update CI python versions file.
fressi-elastic Oct 2, 2025
23e1314
Remove unnecessary changes.
fressi-elastic Oct 2, 2025
ab11c9b
Update CI python versions.
fressi-elastic Oct 2, 2025
4e06935
Set Python3.13 as the default version.
fressi-elastic Oct 2, 2025
45a9015
Let `make test` and `make it` use python 3.13
fressi-elastic Oct 2, 2025
0e9cc67
Add standard-imghdr because imghdr has been removed from python 3.13
fressi-elastic Oct 2, 2025
48f3da0
Update docker image version.
fressi-elastic Oct 2, 2025
0699feb
Use managed python version for preparing venv directory.
fressi-elastic Oct 2, 2025
8c75c66
Refactor Makefile:
fressi-elastic Oct 2, 2025
073d5f4
Add goal to run IT for serverless.
fressi-elastic Oct 2, 2025
210565c
Add goal for rally_tracks_compat
fressi-elastic Oct 2, 2025
bf47095
Fix PHONY goals.
fressi-elastic Oct 2, 2025
7ff36ad
Update python versions for mypy and black
fressi-elastic Oct 2, 2025
6cbea73
Update .buildkite pipeline files.
fressi-elastic Oct 2, 2025
e3f2857
Fix UV installation step.
fressi-elastic Oct 2, 2025
0678a19
Debug CI scripts
fressi-elastic Oct 2, 2025
6fbff06
Fix Dockerfile and CI scripts.
fressi-elastic Oct 2, 2025
d2ffdb5
Fix VENV path in Dockerfile.
fressi-elastic Oct 2, 2025
a92dbf6
Use levacy VIRTUAL_ENV as name for VENV_DIR
fressi-elastic Oct 2, 2025
5ceb63e
Force Python version when running it_serverless test cases.
fressi-elastic Oct 3, 2025
35a216f
Export PY_VERSION as an environment variable.
fressi-elastic Oct 3, 2025
930ac3a
Update uv version in Docerfile.
fressi-elastic Oct 3, 2025
caea1a3
It drops the check for max python version.
fressi-elastic Oct 4, 2025
9dea73e
Drop dependency on tox/nox.
fressi-elastic Oct 4, 2025
a08fa72
It ensures the rally plugin is installed before using for tests.
fressi-elastic Oct 4, 2025
f408e14
It renames Makefile goal rally_tracks_compat -> it_tracks_compat
fressi-elastic Oct 4, 2025
a7a0e6e
It adds some comment in the Makefile.
fressi-elastic Oct 4, 2025
7f2915f
Merge branch 'python3.13' into actors
fressi-elastic Oct 5, 2025
9b38493
Remove unnecessary requirements and update wheel version.
fressi-elastic Oct 6, 2025
70a88c2
Update Makefile error message.
fressi-elastic Oct 6, 2025
3f943fe
Update add_missing_loggers_to_config to make it a little easier to read.
fressi-elastic Oct 6, 2025
432cda8
Merge branch 'python3.13' into actors
fressi-elastic Oct 6, 2025
86f2357
It gets few fixes back from storage.manager branch.
fressi-elastic Oct 6, 2025
929dc0f
Add an example test to demonstrate the new AsyncActor at work.
fressi-elastic Oct 6, 2025
8b2ba23
Improve example readability.
fressi-elastic Oct 6, 2025
bbb7d9d
Merge branch 'master' of github.com:elastic/rally into actors
fressi-elastic Oct 6, 2025
1953694
Revert changes to net.resolve.
fressi-elastic Oct 6, 2025
5743a2b
Update and test convert.range function.
fressi-elastic Oct 6, 2025
711e211
Relax acceptance levels of test_execute_schedule_throughput_throttled.
fressi-elastic Oct 7, 2025
07ac669
Starting final clean up and some simplifications.
fressi-elastic Oct 9, 2025
2f6d948
Fix actor_test.py.
fressi-elastic Oct 9, 2025
7384a34
Clean 3 left over.
fressi-elastic Oct 9, 2025
a299647
Provide a default configuration for actors process when starting befo…
fressi-elastic Oct 10, 2025
541a407
Add some additional documentation in the code.
fressi-elastic Oct 10, 2025
814e9a3
Add more code documentation and other small cleaning of the code.
fressi-elastic Oct 10, 2025
2bc32cd
Fix system_test.
fressi-elastic Oct 10, 2025
e0be984
Merge branch 'master' of github.com:elastic/rally into actors
fressi-elastic Oct 10, 2025
c62c62e
Make sure actor system is shut down at process termination by default.
fressi-elastic Oct 10, 2025
f83b2a5
Handle actor system shut down in listen_for_result() closure.
fressi-elastic Oct 10, 2025
ccb578f
Ensure actor subprocesses have conviguration details to eventually jo…
fressi-elastic Oct 10, 2025
f47cc37
Fix test stability on Python-3.10
fressi-elastic Oct 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ clean-docs: venv

# It runs unit tests using the default python interpreter version.
test: venv
uv run -- pytest -s $(or $(ARGS), tests/)
uv run -- pytest -s --full-trace $(or $(ARGS), tests/)

# It runs unit tests using all supported python versions.
test-all: test-3.10 test-3.11 test-3.12 test-3.13
Expand Down
41 changes: 41 additions & 0 deletions esrally/actors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from thespian.actors import ( # type: ignore[import-untyped]
Actor,
ActorAddress,
ActorExitRequest,
ActorSystem,
PoisonMessage,
WakeupMessage,
)

from esrally.actors._actor import AsyncActor, get_actor, respond
from esrally.actors._config import ActorConfig
from esrally.actors._context import (
ActorContext,
ActorContextError,
create_actor,
create_task,
get_actor_context,
ping,
request,
send,
shutdown,
wait_for,
)
from esrally.actors._system import SystemBase, get_actor_system, init_actor_system
372 changes: 372 additions & 0 deletions esrally/actors/_actor.py

Large diffs are not rendered by default.

208 changes: 208 additions & 0 deletions esrally/actors/_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import sys
from typing import Literal, cast, get_args

from esrally import config
from esrally.utils import convert

# SystemBase is the type of actor system to be created in the application.
SystemBase = Literal[
# "multiprocTCPBase" is recommended in most of the cases.
# Faster and safer as it uses no threads, so it can be used with fork.
"multiprocTCPBase",
# multiprocQueueBase is provided as fallback mode for 'multiprocTCPBase'.
# Because it uses threads for listening the queue it is not recommended using it with 'fork'.
"multiprocQueueBase",
]

# ProcessStartupMethod values are used to specify the way actor processes have to be created.
ProcessStartupMethod = Literal[
# "fork" is the fastest spawning process method. It calls fork function to create each a new actor process.
# It is not recommended using it with "multiprocQueueBase".
"fork",
# "spawn" is much slower than "fork". A process is executed from scratch to create every new actor.
# It is recommended for "multiprocQueueBase" because it could have problems with "fork".
"spawn",
]

DEFAULT_SYSTEM_BASE: SystemBase | None = None
DEFAULT_FALLBACK_SYSTEM_BASE: SystemBase | None = None

DEFAULT_IP: str = "127.0.0.1"
DEFAULT_ADMIN_PORTS: range | None = None
DEFAULT_COORDINATOR_IP: str | None = None
DEFAULT_PROCESS_STARTUP_METHOD: ProcessStartupMethod | None = None
DEFAULT_LOOP_INTERVAL: float = 0.01


class ActorConfig(config.Config):
"""Configuration class defining properties to read and set '[actors'] section."""

@property
def system_base(self) -> SystemBase:
"""The actor system base used to initialize Thespian actor system.
"multiprocTCPBase" is recommended on most of the cases.
"multiprocQueueBase" is only provided as fallback method.
"""
value: str | None = self.opts("actors", "actors.system_base", default_value=DEFAULT_SYSTEM_BASE, mandatory=False)
if isinstance(value, str):
value = value.strip()
if value:
if value in get_args(SystemBase):
return cast(SystemBase, value)
raise ValueError(f"Invalid value for 'actors.system_base': '{value}', it must be one of {get_args(SystemBase)} or None.")
return "multiprocTCPBase"

@system_base.setter
def system_base(self, value: SystemBase | None) -> None:
self.add(config.Scope.applicationOverride, "actors", "actors.system_base", value)

@property
def fallback_system_base(self) -> SystemBase:
"""The alternative system base used to initialize Thespian actor system.

This value is intended to be used in case it fails initializing with other `system_base` option value.
"""
value = self.opts("actors", "actors.fallback_system_base", default_value=DEFAULT_FALLBACK_SYSTEM_BASE, mandatory=False)
if isinstance(value, str):
value = value.strip()
if value:
if value in get_args(SystemBase):
return cast(SystemBase, value)
raise ValueError(
f"Invalid value for 'actors.fallback_system_base': '{value}', it must be one of {get_args(SystemBase)} or None."
)
if self.system_base == "multiprocQueueBase":
return "multiprocTCPBase"
return "multiprocQueueBase"

@fallback_system_base.setter
def fallback_system_base(self, value: SystemBase | None) -> None:
self.add(config.Scope.applicationOverride, "actors", "actors.fallback_system_base", value)

@property
def ip(self) -> str:
"""The local host IP used to open the Thespian administrator service.

It is only used with "multiprocTCPBase" system base.
"""
return self.opts("actors", "actors.ip", default_value=DEFAULT_IP, mandatory=False).strip()

@ip.setter
def ip(self, value: str) -> None:
self.add(config.Scope.applicationOverride, "actors", "actors.ip", value.strip())

@property
def admin_ports(self) -> range | None:
"""The range of ports where to try opening one for the Thespian administrator service.

It is only used with "multiprocTCPBase" system base.

In case it is None, a random port will be used. It is only used with "multiprocTCPBase" system base.
In case it is a range, it will try using every port in the range starting from the smallest to the biggest in
the range.

To try joining a running actor system this value should be set to the same port used for starting the target
actor system (i.e. "1900").
"""
value = self.opts("actors", "actors.admin_ports", default_value=DEFAULT_ADMIN_PORTS, mandatory=False)
if isinstance(value, str):
value = value.strip()
if value:
return convert.to_port_range(value)
if value:
if isinstance(value, range):
return value
raise ValueError(f"Invalid value for 'actors.admin_ports' option: {value}")
return None

@admin_ports.setter
def admin_ports(self, value: str | range | None) -> None:
self.add(config.Scope.applicationOverride, "actors", "actors.admin_ports", value)

@property
def coordinator_ip(self) -> str | None:
"""The IP address of the host where rally coordinator actors are running.

It is only used with "multiprocTCPBase" system base.

It is passed directly to Thespian as it is. So to specify a port other than the default one you should use
the following string format:

<coordinator_ip>:<coordinator_port>
"""
value = self.opts("actors", "actors.coordinator_ip", default_value=DEFAULT_COORDINATOR_IP, mandatory=False)
if isinstance(value, str):
value = value.strip()
if value:
return value
return None

@coordinator_ip.setter
def coordinator_ip(self, value: str | None) -> None:
self.add(config.Scope.applicationOverride, "actors", "actors.coordinator_ip", value)

@property
def process_startup_method(self) -> ProcessStartupMethod | None:
"""The method used to starts actor sub-processes in Rally.

By default, "fork" is being used (which is the fastest and the recommended).
Others methods are being provided to overcome potential race conditions with the use of 'fork' in presence of threads.

It is recommended to use "spawn" with "multiprocQueueBase" because it uses threads,
"""
value = self.opts("actors", "actors.process_startup_method", default_value=DEFAULT_PROCESS_STARTUP_METHOD, mandatory=False)
if isinstance(value, str):
value = value.strip()
if value:
if value in get_args(ProcessStartupMethod):
return cast(ProcessStartupMethod, value)
raise ValueError(f"Invalid process startup method '{value}', must be one of {get_args(ProcessStartupMethod)}")
if self.system_base == "multiprocQueueBase":
# multiprocQueueBase is using threads so fork could create problems.
return "spawn"

if sys.platform == "darwin" and sys.version_info < (3, 12):
# Old versions of Python on OSX have known problems with fork.
return "spawn"

# In general fork is expected to be the most performant tu be used.
return "fork"

@process_startup_method.setter
def process_startup_method(self, value: ProcessStartupMethod | None) -> None:
self.add(config.Scope.applicationOverride, "actors", "actors.process_startup_method", value)

@property
def loop_interval(self) -> float:
"""It specifies the ideal interval of time used to listen for actor messages.

Every actor wait this interval of time (in seconds) before processing the 'asyncio' event loop again.
"""
value = self.opts("actors", "actors.loop_interval", DEFAULT_LOOP_INTERVAL, False)
if isinstance(value, str):
value = value.strip()
if value:
return float(value)
return DEFAULT_LOOP_INTERVAL

@loop_interval.setter
def loop_interval(self, value: float | None) -> None:
if value is None:
value = DEFAULT_LOOP_INTERVAL
self.add(config.Scope.applicationOverride, "actors", "actors.loop_interval", value)
Loading
Loading