diff --git a/Makefile b/Makefile index a1c88729f..000956060 100644 --- a/Makefile +++ b/Makefile @@ -172,7 +172,7 @@ clean-docs: venv # It runs unit tests using the default python interpreter version. test: venv - uv run -- pytest -s $(or $(ARGS), tests/) + uv run -- pytest -s --full-trace $(or $(ARGS), tests/) # It runs unit tests using all supported python versions. test-all: test-3.10 test-3.11 test-3.12 test-3.13 diff --git a/esrally/actors/__init__.py b/esrally/actors/__init__.py new file mode 100644 index 000000000..190ab0049 --- /dev/null +++ b/esrally/actors/__init__.py @@ -0,0 +1,41 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from thespian.actors import ( # type: ignore[import-untyped] + Actor, + ActorAddress, + ActorExitRequest, + ActorSystem, + PoisonMessage, + WakeupMessage, +) + +from esrally.actors._actor import AsyncActor, get_actor, respond +from esrally.actors._config import ActorConfig +from esrally.actors._context import ( + ActorContext, + ActorContextError, + create_actor, + create_task, + get_actor_context, + ping, + request, + send, + shutdown, + wait_for, +) +from esrally.actors._system import SystemBase, get_actor_system, init_actor_system diff --git a/esrally/actors/_actor.py b/esrally/actors/_actor.py new file mode 100644 index 000000000..c8f293ed0 --- /dev/null +++ b/esrally/actors/_actor.py @@ -0,0 +1,372 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import asyncio +import collections +import dataclasses +import inspect +import logging +import uuid +from collections.abc import Awaitable, Coroutine +from typing import Any, TypeVar + +from thespian import actors # type: ignore[import-untyped] + +from esrally import config, exceptions +from esrally.actors._config import ActorConfig +from esrally.actors._context import ( + ActorContext, + ActorContextError, + enter_actor_context, + get_actor_context, + set_actor_context, +) +from esrally.actors._proto import ( + ActorInitRequest, + CancelledResponse, + CancelRequest, + MessageRequest, + PingRequest, + PongResponse, + Request, + Response, + RunningTaskResponse, +) + +LOG = logging.getLogger(__name__) + + +def get_actor() -> actors.Actor: + """It returns the actor where the actual message is being received.""" + return get_actor_context().actor + + +def get_actor_request_context() -> "ActorRequestContext": + """It retrieve details about the context where the actual message is being received.""" + ctx = get_actor_context() + if not isinstance(ctx, ActorRequestContext): + raise ActorContextError(f"Context is not a AsyncActorContext: {ctx!r}") + return ctx + + +def respond(status: Any = None, error: Exception | None = None) -> None: + """It sends a response message to the sender actor.""" + get_actor_request_context().respond(status=status, error=error) + + +R = TypeVar("R") + + +@dataclasses.dataclass +class ActorRequestContext(ActorContext): + """Actor context being used while some received messages is being processed by an actor.""" + + # sender represents the address of the actor that sent current request message. + sender: actors.ActorAddress | None = None + + # req_id represents the unique request ID carried by current request message. + req_id: str = "" + + # pending_tasks contains sets of asyncio tasks (indexed by req_id) to be cancelled in case of a CancelRequest is + # received. + pending_tasks: dict[str, set[asyncio.Task]] = dataclasses.field(default_factory=lambda: collections.defaultdict(set)) + + # responded will be true after current request message has been responded. The purpose of this flag is avoiding + # returning multiple responses to the same 'req_id'. + responded: bool = False + + @property + def actor(self) -> "AsyncActor": + """actor property returns the local actor where the current messages are being received.""" + assert isinstance(self.handler, AsyncActor) + return self.handler + + @property + def details(self) -> dict[str, Any]: + return {"sender": self.sender, "address": self.actor.myAddress, "req_id": self.req_id, **super().details} + + def create_task(self, coro: Coroutine[None, None, R], *, name: str | None = None) -> asyncio.Task[R]: + """create_task is a wrapper around asyncio.AbstractEventLoop.create_task + + While processing a request message will register task for cancellation in case a CancelRequest message + is received. + + Please note that while processing a request from inside an actor, all tasks created by calling this method will + be cancelled in case of a CancelRequest message. This could also include requests that have been forwarded to + other actors. To prevent this to happen, please use asyncio.create_task function instead. + + :param coro: The coroutine to wrap. + :param name: The name of the task. + :return: The wrapper task. + """ + task = super().create_task(coro, name=name) + if self.req_id: + + def remove_task(f: asyncio.Task) -> None: + tasks = self.pending_tasks.get(self.req_id) + if tasks is not None: + tasks.discard(f) + if not tasks: + del self.pending_tasks[self.req_id] + + task.add_done_callback(remove_task) + + self.pending_tasks[self.req_id].add(task) + self.send(self.sender, RunningTaskResponse(req_id=self.req_id, name=task.get_name())) + return task + + def receive_message(self, message: Any, sender: actors.ActorAddress) -> bool: + """receive_message is called by the actor when receiving a new message.""" + # It processes responses. + if super().receive_message(message, sender): + return True + + assert get_actor_context() is self, "Actor context not registered." + assert not (self.req_id or self.sender or self.responded), "Actor context already used." + + self.sender = sender # Destination address for `actors.respond` function. + try: + if isinstance(message, Request): + # It handles the request message. + response = self.receive_request(message) + else: + # It dispatches the message back to the actor. + response = self.dispatch_message(message) + + # This allows AsyncActors to answer even if request message was sent without calling request method. + # This is intended for AsyncActor to be able to answer requests from non async actors too. + self.respond(response) + except Exception as error: + # In case req_id is not set, it expects Thespian to send back a PoisonMessage as a response. + self.respond(error=error) + return True + + def receive_request(self, request: Request) -> Any: + """It processes a request message.""" + # This will be later used from respond method for creating a response. + self.req_id = request.req_id + + if isinstance(request, CancelRequest): + # It cancels all tasks of a previous request. + self.cancel_request(request.message) + # It sends confirmation back. + return CancelledResponse(req_id=self.req_id, status=request.message) + + if isinstance(request, PingRequest): + # It processes a ping request. + if request.destination in [None, self.actor.myAddress]: + # It responds to the ping request. + return PongResponse(req_id=request.req_id, status=request.message) + # It forwards the ping request to the destination actor. + return self.request(request.destination, request) + + if isinstance(request, ActorInitRequest): + # It receives configuration after actor creation. + self.cfg = request.cfg + response = self.dispatch_message(request.cfg) + if response is not None: + raise TypeError(f"Unexpected response from actor configuration handler: {response!r}, want None") + + if isinstance(request, MessageRequest): + # It dispatches the inner request message (if any). + return self.dispatch_message(request.message) + + return None + + def dispatch_message(self, message: Any) -> Any: + """It dispatches the message back to the actor.""" + if message is None: + return None + return actors.ActorTypeDispatcher.receiveMessage(self.actor, message, self.sender) + + def respond(self, status: Any = None, error: Exception | None = None) -> None: + """It sends a response message to the sender actor.""" + # It ensures a request gets responded only once in the scope of this context. + if self.responded: + if error is not None: + # The error will eventually reach requester in the form of a PoisonMessage. + raise error + if status is not None: + raise ActorContextError("Already responded.") + + if error is None and inspect.isawaitable(status): + # It schedules a task to wait for the actual status before responding. + # Please note that the final status could be another awaitable that would + # make it spawning another task to wait for it. This should ensure there + # should always be a response for every request which processing come to + # an end. All tasks will be cancelled in case of a CancelRequest message is received + # or once this task finishes running. + self.create_task(self.respond_later(status), name=f"respond_later({status!r})") + return + + if self.req_id: + # The status or the error will eventually reach requester in the form of a Response message, + # so that the requester can match the target future by using its req_id. + response = Response.from_status(self.req_id, status, error) + elif error is None: + # The status will eventually reach requester in the form of a standalone message without any Response + # message envelope. + response = status + else: + # The error will eventually reach requester in the form of a PoisonMessage. + raise error + + if response is not None: + # It finally sends a response. + self.send(self.sender, response) + LOG.debug("Sent response to actor %s: %r", self.sender, response) + + # Reaching this point it mean there is nothing more to respond to the request and all pending tasks can be + # cancelled. + self.responded = True + self.cancel_request() + + def cancel_request(self, message: Any = None) -> None: + """It cancels all pending tasks of the current request.""" + for t in self.pending_tasks.pop(self.req_id, []): + if not t.done(): + t.cancel(message) + + async def respond_later(self, status: Awaitable) -> None: + """respond_later awaits for a response to get ready. + + It makes sures a response is always sent to the request sender, even if the awaited task is cancelled. + """ + try: + self.respond(status=await status) + except Exception as error: + self.respond(error=error) + except asyncio.CancelledError as error: + self.respond(CancelledResponse(status=str(error), req_id=self.req_id)) + raise + + +class AsyncActor(actors.ActorTypeDispatcher): + """Override the thespian ActorTypeDispatcher with some few additional features. + + Additional features include: + - It uses its own `asyncio` event loop to run asynchronous tasks (co-routines). The loop is set as current during + messages processing. + - It periodically run pending tasks from its event loop. + - The methods processing a message type can be async coroutines, on which case they will be scheduled for execution + as an async task of the actor event loop. While messages are being processed by these co-routines, other messages + and loop events can be processed from the actor, making the actor truly asynchronous. + - It implements `request` method, an awaitable version of `Actor.send` method. It sends a message with a unique + request ID, and until a response with the same ID is received, from the target actor, other messages and loop + events are being processed from the actor. + - When receiving an ActorConfig message it inits context configuration with it. + - When receiving a PoisonErrorMessage as response of `request` method, it translates it to a PoisonError and raises + it as a possible outcome of waiting for a response. + - When receiving an ActorExitRequest, it cancels all pending tasks from the loop, then stops the loop, so that + request senders should receive CancelledError while waiting for a response. + - It creates its own logger. + """ + + def __init__(self) -> None: + super().__init__() + self._pending_results: dict[str, asyncio.Future] = {} + self._pending_tasks: dict[str, set[asyncio.Task]] = collections.defaultdict(set) + self._pending_task_timer_id: str = "" + self._loop: asyncio.AbstractEventLoop = asyncio.get_event_loop() or asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + logging.setLogRecordFactory(ActorLogRecord) + # A default configuration is required when using "spawn" process creation method for parts that requires it + # (like ActorRequestContext). When using "fork" the parent process already set one for us. + try: + config.get_config() + except exceptions.ConfigError: + config.init_config(ActorConfig()) + + def __str__(self) -> str: + return f"{self.__class__.__name__}@{self.myAddress}" + + def receiveMessage(self, message: Any, sender: actors.ActorAddress) -> Any: + """It receives actor messages inside a fresh new ActorRequestContext which will process it before dispatching. + + :param message: + :param sender: + :return: + """ + ctx = ActorRequestContext(handler=self, pending_results=self._pending_results, loop=self._loop, pending_tasks=self._pending_tasks) + with enter_actor_context(ctx): + ctx.receive_message(message, sender) + + def receiveUnrecognizedMessage(self, message: Any, sender: actors.ActorAddress) -> None: + """This will eventually let know sender actor his message reached the wrong destination.""" + raise TypeError(f"Received unrecognized message: {message}") + + def receiveMsg_ActorConfig(self, cfg: ActorConfig, sender: actors.ActorAddress) -> None: + """It receives configuration from an actor initialization message. + + It registers the configuration for the current process. + """ + LOG.debug("Received configuration message: %s", cfg) + config.init_config(cfg, force=True) + + # It starts the pending task timer. + self._pending_task_timer_id = f"pending_tasks_timer:{uuid.uuid4()}" + self.wakeupAfter(cfg.loop_interval, self._pending_task_timer_id) + + def receiveMsg_WakeupMessage(self, message: actors.WakeupMessage, sender: actors.ActorAddress) -> None: + """It executes pending tasks on a scheduled time period.""" + if message.payload is None: + return None + if isinstance(message.payload, str) and message.payload.startswith("pending_tasks_timer:"): + if message.payload == self._pending_task_timer_id: + try: + self._loop.run_until_complete(nop()) + finally: + self.wakeupAfter(message.delayPeriod, self._pending_task_timer_id) + return None + # It dispatches payload message as a standalone message. + self.receiveMessage(message.payload, sender) + + def receiveMsg_ActorExitRequest(self, request: actors.ActorExitRequest, sender: actors.ActorAddress) -> None: + """It cancels all pending tasks in the event loop, then stops the loop and finally unlink the current context.""" + LOG.debug("Received ActorExitRequest message.") + + # It stops pending task timer. + self._pending_task_timer_id = "" + + # It cancels every request pending task one by one. + while self._pending_tasks: + _, tasks = self._pending_tasks.popitem() + for task in tasks: + task.cancel("Actor exit request.") + + # It stops the event loop. + if self._loop is not None: + self._loop.stop() + asyncio.set_event_loop(None) + + # It cleans up the actor context. + logging.setLogRecordFactory(logging.LogRecord) + set_actor_context(None) + + +async def nop() -> None: ... + + +class ActorLogRecord(logging.LogRecord): + """Logging record with actor context name extra field.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.actor_context: dict[str, Any] = {} + try: + self.actor_context.update(get_actor_context().details) + except ActorContextError: + pass diff --git a/esrally/actors/_config.py b/esrally/actors/_config.py new file mode 100644 index 000000000..9cb30f7bb --- /dev/null +++ b/esrally/actors/_config.py @@ -0,0 +1,208 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import sys +from typing import Literal, cast, get_args + +from esrally import config +from esrally.utils import convert + +# SystemBase is the type of actor system to be created in the application. +SystemBase = Literal[ + # "multiprocTCPBase" is recommended in most of the cases. + # Faster and safer as it uses no threads, so it can be used with fork. + "multiprocTCPBase", + # multiprocQueueBase is provided as fallback mode for 'multiprocTCPBase'. + # Because it uses threads for listening the queue it is not recommended using it with 'fork'. + "multiprocQueueBase", +] + +# ProcessStartupMethod values are used to specify the way actor processes have to be created. +ProcessStartupMethod = Literal[ + # "fork" is the fastest spawning process method. It calls fork function to create each a new actor process. + # It is not recommended using it with "multiprocQueueBase". + "fork", + # "spawn" is much slower than "fork". A process is executed from scratch to create every new actor. + # It is recommended for "multiprocQueueBase" because it could have problems with "fork". + "spawn", +] + +DEFAULT_SYSTEM_BASE: SystemBase | None = None +DEFAULT_FALLBACK_SYSTEM_BASE: SystemBase | None = None + +DEFAULT_IP: str = "127.0.0.1" +DEFAULT_ADMIN_PORTS: range | None = None +DEFAULT_COORDINATOR_IP: str | None = None +DEFAULT_PROCESS_STARTUP_METHOD: ProcessStartupMethod | None = None +DEFAULT_LOOP_INTERVAL: float = 0.01 + + +class ActorConfig(config.Config): + """Configuration class defining properties to read and set '[actors'] section.""" + + @property + def system_base(self) -> SystemBase: + """The actor system base used to initialize Thespian actor system. + "multiprocTCPBase" is recommended on most of the cases. + "multiprocQueueBase" is only provided as fallback method. + """ + value: str | None = self.opts("actors", "actors.system_base", default_value=DEFAULT_SYSTEM_BASE, mandatory=False) + if isinstance(value, str): + value = value.strip() + if value: + if value in get_args(SystemBase): + return cast(SystemBase, value) + raise ValueError(f"Invalid value for 'actors.system_base': '{value}', it must be one of {get_args(SystemBase)} or None.") + return "multiprocTCPBase" + + @system_base.setter + def system_base(self, value: SystemBase | None) -> None: + self.add(config.Scope.applicationOverride, "actors", "actors.system_base", value) + + @property + def fallback_system_base(self) -> SystemBase: + """The alternative system base used to initialize Thespian actor system. + + This value is intended to be used in case it fails initializing with other `system_base` option value. + """ + value = self.opts("actors", "actors.fallback_system_base", default_value=DEFAULT_FALLBACK_SYSTEM_BASE, mandatory=False) + if isinstance(value, str): + value = value.strip() + if value: + if value in get_args(SystemBase): + return cast(SystemBase, value) + raise ValueError( + f"Invalid value for 'actors.fallback_system_base': '{value}', it must be one of {get_args(SystemBase)} or None." + ) + if self.system_base == "multiprocQueueBase": + return "multiprocTCPBase" + return "multiprocQueueBase" + + @fallback_system_base.setter + def fallback_system_base(self, value: SystemBase | None) -> None: + self.add(config.Scope.applicationOverride, "actors", "actors.fallback_system_base", value) + + @property + def ip(self) -> str: + """The local host IP used to open the Thespian administrator service. + + It is only used with "multiprocTCPBase" system base. + """ + return self.opts("actors", "actors.ip", default_value=DEFAULT_IP, mandatory=False).strip() + + @ip.setter + def ip(self, value: str) -> None: + self.add(config.Scope.applicationOverride, "actors", "actors.ip", value.strip()) + + @property + def admin_ports(self) -> range | None: + """The range of ports where to try opening one for the Thespian administrator service. + + It is only used with "multiprocTCPBase" system base. + + In case it is None, a random port will be used. It is only used with "multiprocTCPBase" system base. + In case it is a range, it will try using every port in the range starting from the smallest to the biggest in + the range. + + To try joining a running actor system this value should be set to the same port used for starting the target + actor system (i.e. "1900"). + """ + value = self.opts("actors", "actors.admin_ports", default_value=DEFAULT_ADMIN_PORTS, mandatory=False) + if isinstance(value, str): + value = value.strip() + if value: + return convert.to_port_range(value) + if value: + if isinstance(value, range): + return value + raise ValueError(f"Invalid value for 'actors.admin_ports' option: {value}") + return None + + @admin_ports.setter + def admin_ports(self, value: str | range | None) -> None: + self.add(config.Scope.applicationOverride, "actors", "actors.admin_ports", value) + + @property + def coordinator_ip(self) -> str | None: + """The IP address of the host where rally coordinator actors are running. + + It is only used with "multiprocTCPBase" system base. + + It is passed directly to Thespian as it is. So to specify a port other than the default one you should use + the following string format: + + : + """ + value = self.opts("actors", "actors.coordinator_ip", default_value=DEFAULT_COORDINATOR_IP, mandatory=False) + if isinstance(value, str): + value = value.strip() + if value: + return value + return None + + @coordinator_ip.setter + def coordinator_ip(self, value: str | None) -> None: + self.add(config.Scope.applicationOverride, "actors", "actors.coordinator_ip", value) + + @property + def process_startup_method(self) -> ProcessStartupMethod | None: + """The method used to starts actor sub-processes in Rally. + + By default, "fork" is being used (which is the fastest and the recommended). + Others methods are being provided to overcome potential race conditions with the use of 'fork' in presence of threads. + + It is recommended to use "spawn" with "multiprocQueueBase" because it uses threads, + """ + value = self.opts("actors", "actors.process_startup_method", default_value=DEFAULT_PROCESS_STARTUP_METHOD, mandatory=False) + if isinstance(value, str): + value = value.strip() + if value: + if value in get_args(ProcessStartupMethod): + return cast(ProcessStartupMethod, value) + raise ValueError(f"Invalid process startup method '{value}', must be one of {get_args(ProcessStartupMethod)}") + if self.system_base == "multiprocQueueBase": + # multiprocQueueBase is using threads so fork could create problems. + return "spawn" + + if sys.platform == "darwin" and sys.version_info < (3, 12): + # Old versions of Python on OSX have known problems with fork. + return "spawn" + + # In general fork is expected to be the most performant tu be used. + return "fork" + + @process_startup_method.setter + def process_startup_method(self, value: ProcessStartupMethod | None) -> None: + self.add(config.Scope.applicationOverride, "actors", "actors.process_startup_method", value) + + @property + def loop_interval(self) -> float: + """It specifies the ideal interval of time used to listen for actor messages. + + Every actor wait this interval of time (in seconds) before processing the 'asyncio' event loop again. + """ + value = self.opts("actors", "actors.loop_interval", DEFAULT_LOOP_INTERVAL, False) + if isinstance(value, str): + value = value.strip() + if value: + return float(value) + return DEFAULT_LOOP_INTERVAL + + @loop_interval.setter + def loop_interval(self, value: float | None) -> None: + if value is None: + value = DEFAULT_LOOP_INTERVAL + self.add(config.Scope.applicationOverride, "actors", "actors.loop_interval", value) diff --git a/esrally/actors/_context.py b/esrally/actors/_context.py new file mode 100644 index 000000000..1ac1efe79 --- /dev/null +++ b/esrally/actors/_context.py @@ -0,0 +1,382 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import asyncio +import contextlib +import contextvars +import dataclasses +import logging +import uuid +from collections.abc import Coroutine, Generator +from typing import Any, Optional, TypeVar + +from thespian import actors # type: ignore[import-untyped] + +from esrally import types +from esrally.actors._config import ActorConfig +from esrally.actors._proto import ( + ActorInitRequest, + CancelRequest, + DoneResponse, + PingRequest, + PoisonError, + Request, + Response, + RunningTaskResponse, +) + +LOG = logging.getLogger(__name__) + + +CONTEXT = contextvars.ContextVar[Optional["ActorContext"]]("actors.context", default=None) + + +async def create_actor(cls: type[actors.Actor], *, requirements: dict[str, Any] | None = None, **kwargs: Any) -> actors.ActorAddress: + return await get_actor_context().create_actor(cls=cls, requirements=requirements, **kwargs) + + +R = TypeVar("R") + + +def create_task(coro: Coroutine[None, None, R], *, name: str | None = None) -> asyncio.Task[R]: + """It creates a task and registers it for cancellation with the current request. + + :param coro: the task coroutine + :param name: the task name + :return: created task. + """ + return get_actor_context().create_task(coro, name=name) + + +def send(destination: actors.ActorAddress, message: Any) -> None: + get_actor_context().send(destination, message) + + +def request(destination: actors.ActorAddress, message: Any, *, timeout: float | None = None) -> asyncio.Future[Any]: + return get_actor_context().request(destination, message, timeout=timeout) + + +def ping(destination: actors.ActorAddress, *, message: Any = None, timeout: float | None = None) -> asyncio.Future[Any]: + return request(destination, PingRequest(message=message), timeout=timeout) + + +def shutdown() -> None: + try: + ctx = get_actor_context() + except ActorContextError: + return + set_actor_context(None) + ctx.shutdown() + + +class ActorContextError(RuntimeError): + pass + + +def get_actor_context() -> "ActorContext": + ctx = CONTEXT.get() + if not ctx: + raise ActorContextError("No actor context set.") + return ctx + + +def set_actor_context(ctx: Optional["ActorContext"]) -> None: + CONTEXT.set(ctx) + + +C = TypeVar("C", bound="ActorContext") + + +@contextlib.contextmanager +def enter_actor_context(ctx: C) -> Generator[C]: + token = CONTEXT.set(ctx) + try: + yield ctx + finally: + CONTEXT.reset(token) + + +@dataclasses.dataclass +class ActorContext: + """Base class that implements context oriented global methods wrapping an actor or an actor system. + + It is used for interacting with an actor system in case it is used from outside an actor. + When inside an actor ActorRequestContext subclass will be used instead. + """ + + # handler represents the target actor or actor system this context is wrapping. + handler: actors.ActorTypeDispatcher | actors.ActorSystem | None + + # pending_results maps the future results by request ID used for receiving request responses. + pending_results: dict[str, asyncio.Future[Any]] = dataclasses.field(default_factory=dict) + + # loop represents the event loop to be used while this context is active. + loop: asyncio.AbstractEventLoop = dataclasses.field(default_factory=asyncio.get_event_loop) + + # The configuration object to be used for the actor system and for creating actors. + # A copy of it will be forwarded to created actors so that it is going to be propagated to the whole actor system. + cfg: ActorConfig = dataclasses.field(default_factory=ActorConfig.from_config) + + @property + def actor(self) -> actors.Actor: + if not isinstance(self.handler, actors.Actor): + raise ActorContextError("Actor context handler is not an actor") + return self.handler + + @property + def actor_system(self) -> actors.ActorSystem: + if not isinstance(self.handler, actors.ActorSystem): + raise ActorContextError("Actor context handler is not an actor system") + return self.handler + + @property + def details(self) -> dict[str, Any]: + return {"cls": type(self.handler).__name__} + + def shutdown(self): + """It shuts down the actor o actor system handler of this context.""" + if self.handler is None: + return + try: + if isinstance(self.handler, actors.ActorSystem): + LOG.warning("Shutting down actor system: %s...", self.handler) + self.handler.shutdown() + return + + if isinstance(self.handler, actors.Actor): + LOG.warning("Shutting down actor: %s...", self.handler) + self.handler.send(self.handler.myAddress, actors.ActorExitRequest()) + return + + raise NotImplementedError("Cannot shutdown actor context") + finally: + self.handler = None + + async def create_actor( + self, + cls: type[actors.Actor], + *, + requirements: dict[str, Any] | None = None, + cfg: types.Config | None = None, + message: Any | None = None, + ) -> actors.ActorAddress: + """It creates a new actor in the current actor context.""" + assert self.handler is not None + + # It obtains an ActorConfig object to forward it to the new actor. + if cfg is None: + cfg = self.cfg + else: + cfg = ActorConfig.from_config(cfg) + assert isinstance(cfg, ActorConfig) + + LOG.debug("Creating actor of type %s (requirements=%r)...", cls.__name__, requirements) + address = self.handler.createActor(cls, requirements) + if hasattr(cls, "receiveMsg_ActorConfig"): + LOG.debug("Initializing the new actor (address=%s, cfg=%s)...", address, cfg) + try: + await self.request(address, ActorInitRequest(cfg=cfg, message=message)) + except BaseException: + LOG.exception( + "Error initializing actor (cls=%s, address=%s, cfg=%s). Send it ActorExitRequest.", + cls.__name__, + address, + cfg, + exc_info=True, + ) + self.send(address, actors.ActorExitRequest()) + raise + LOG.debug("New actor ready (address=%s, cls=%s).", address, cls.__name__) + return address + + def create_task(self, coro: Coroutine[None, None, R], *, name: str | None = None) -> asyncio.Task[R]: + """It creates an async task in the event loop of the current actor context.""" + return self.loop.create_task(coro, name=f"{name or coro}@{self.handler}") + + def send(self, destination: actors.ActorAddress, message: Any) -> None: + """It sends a message using the actor or actor system handler of this context.""" + if isinstance(self.handler, actors.ActorSystem): + return self.handler.tell(destination, message) + if isinstance(self.handler, actors.Actor): + return self.handler.send(destination, message) + raise NotImplementedError(f"Handler type {type(self.handler)} not implemented.") + + def request(self, destination: actors.ActorAddress, message: Any, *, timeout: float | None = None) -> asyncio.Future: + """It sends a request using the actor or actor system handler of this context and returns an awaitable future result.""" + assert self.handler is not None + + req_id = getattr(message, "req_id", "") or str(uuid.uuid4()) + request: Request = Request.from_message(message, timeout=timeout, req_id=req_id) + future = self.pending_results.get(req_id) + if future is None: + self.pending_results[req_id] = future = self.loop.create_future() + + original_cancel = future.cancel + + def cancel_wrapper(msg: Any | None = None) -> bool: + if self.pending_results.pop(request.req_id, None): + self.send(destination, CancelRequest(message=msg, req_id=request.req_id)) + return original_cancel(msg) + + future.cancel = cancel_wrapper # type: ignore[method-assign] + future.add_done_callback(lambda f: self.pending_results.pop(request.req_id, None)) + + if future.done(): + # It could be this request has been already sent before and for some reason it is retrying it again. + # Being the request done, it avoids resending it. To send a new request and avoid race condition it is + # expected the req_id to be different from previous ones. In this way it works around the situation caller + # retries to execute a request that eventually is already in progress. + return future + + if timeout is not None: + # It implements the timeout by wrapping the future with a waiter async task so that it can raise a + # TimeoutError on the requester side, while sending a CancelRequest to the destination actor so it will + # eventually cancel async tasks created while processing this request. + task_name = f"request({destination!s}, {message!r}, timeout={timeout!r})" + future = self.create_task(wait_for(future, timeout=timeout, cancel_message=task_name), name=task_name) + else: + task_name = f"request({destination!s}, {message!r}" + + if isinstance(self.handler, actors.Actor): + # When running inside an actor, it relies on the asynchronous thespian actor implementation. The event + # loop will be run using a periodic actor wakeup message. + self.send(destination, request) + return future + + if isinstance(self.handler, actors.ActorSystem): + # It avoids blocking in SystemActor.ask and SystemActor.listen methods by setting a timeout long enough to + # send the request, but short enough to periodically run the event loop and process request timeouts, async + # tasks and futures callbacks. + loop_interval = self.cfg.loop_interval + response = self.handler.ask(destination, request, timeout=min_timeout(request.timeout, loop_interval)) + self.receive_message(response, destination) + if future.done(): + return future + + async def listen_for_result() -> Any: + try: + # It consumes response messages or poison errors until we get the response for this request. + while not future.done(): + if self.handler is None: + raise ActorContextError("Actor system has been shut down.") + await asyncio.sleep(0) # It runs the event loop before listening for messages again. + response = self.handler.listen(timeout=min_timeout(request.timeout, loop_interval)) + self.receive_message(response, destination) + return await future + except Exception as ex: + future.set_exception(ex) + raise + except BaseException as ex: + future.cancel(str(ex)) + raise + + # It will listen for incoming messages later in the event loop the next time the caller will await for some + # incoming event. This should allow gathering multiple request responses ant the same time. + self.create_task(listen_for_result(), name=task_name) + return future + + raise NotImplementedError(f"Cannot send request to actor: invalid handler: {self.handler}.") + + def receive_message(self, message: Any, sender: actors.ActorAddress) -> bool: + """It dispatches the handling of a message received from an Actor or an external ActorSystem.""" + if message is None: + return True + if isinstance(message, Response) and self.receive_response(message, sender): + return True + if isinstance(message, actors.PoisonMessage) and self.receive_poison_message(message, sender): + return True + if isinstance(self.handler, actors.ActorSystem): + LOG.warning("Ignored message from actor %s while waiting for response: %r.", sender, message) + return True + # The message hasn't been consumed. + return False + + def receive_poison_message(self, message: actors.PoisonMessage, sender: actors.ActorAddress) -> bool: + """It handles a poison message from an Actor or an external ActorSystem matching it with a pending request response.""" + if isinstance(message.poisonMessage, Request): + future = self.pending_results.pop(message.poisonMessage.req_id, None) + if future and not future.done(): + future.set_exception(PoisonError.from_poison_message(message)) + return True + # The message hasn't been consumed. + return False + + def receive_response(self, response: Response, sender: actors.ActorAddress) -> bool: + """It looks for a pending future result matching this request response.""" + if isinstance(response, DoneResponse) and self.receive_result_response(response, sender): + return True + if isinstance(response, RunningTaskResponse) and self.receive_running_task_response(response, sender): + return True + # The message hasn't been consumed. + return False + + def receive_result_response(self, response: DoneResponse, sender: actors.ActorAddress) -> bool: + """It looks for a pending future result matching this request response.""" + future = self.pending_results.pop(response.req_id, None) + if future and not future.done(): + # It extracts the result from the response and transfer it to the async future object. + try: + future.set_result(response.result()) + except Exception as error: + future.set_exception(error) + return True + # The message hasn't been consumed. + return False + + def receive_running_task_response(self, response: RunningTaskResponse, sender: actors.ActorAddress) -> bool: + if response.req_id in self.pending_results: + LOG.debug("Waiting for actor %s task completion: %s", sender, response.name) + return True + # The message hasn't been consumed. + return False + + def __str__(self) -> str: + details = ", ".join((f"{k}={v}" for k, v in sorted(self.details.items()) if v is not None)) + return f"{type(self).__name__}<{details}>" + + +async def wait_for(future: asyncio.Future[R], *, timeout: float | None = None, cancel: bool = True, cancel_message: Any = None) -> R: + """It waits for a future results and extract its value. + + In the case no value is available before timeout seconds, then it cancels the future (if not done) and raises TimeoutError + + :param future: The future to wait for. + :param timeout: The timeout to wait for (in seconds). + :param cancel: If True, it will cancel the future before raising TimeoutException. + :param cancel_message: The message to be used for cancelling the future. + :return: the future result value if any is set before timeout. + :raises TimeoutError: if timeout is reached before the future is done. + :raises Exception: if the future has been set with an exception value. + """ + if future.done() or timeout is None: + return await future + + await asyncio.wait([future], timeout=timeout) + + if not future.done(): + if cancel: + future.cancel(cancel_message) + raise TimeoutError(str(cancel_message)) + + return await future + + +def min_timeout(*timeouts: float | None) -> float | None: + """It picks the minimal non None timeout time duration between given values.""" + real_timeouts = [t for t in timeouts if t is not None] + if real_timeouts: + return max(0.0, min(real_timeouts)) + return None diff --git a/esrally/actors/_proto.py b/esrally/actors/_proto.py new file mode 100644 index 000000000..3c56c16f4 --- /dev/null +++ b/esrally/actors/_proto.py @@ -0,0 +1,178 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import asyncio.exceptions +import copy +import dataclasses +import time +import traceback +from typing import Any + +from thespian import actors # type: ignore[import-untyped] +from typing_extensions import Self + +from esrally.actors._config import ActorConfig + + +@dataclasses.dataclass +class Request: + req_id: str = "" + deadline: float | None = None + + @classmethod + def from_message(cls, message: Any, *, timeout: float | None = None, req_id: str | None = None) -> Self | "MessageRequest": + deadline: float | None = None + if timeout is not None: + deadline = max(0.0, time.monotonic() + timeout) + if req_id is None: + req_id = "" + req_id = req_id.strip() + if not isinstance(message, cls): + return MessageRequest(message=message, deadline=deadline, req_id=req_id) + if message.deadline != deadline or message.req_id != req_id: + message = copy.deepcopy(message) + message.deadline = deadline + message.req_id = req_id + return message + + @property + def timeout(self) -> float | None: + if self.deadline is None: + return None + return self.deadline - time.monotonic() + + +@dataclasses.dataclass +class MessageRequest(Request): + """Message envelope sent to async actors requiring a response. + + As soon the actor receive this request it sends a PendingResponse with the same req_id to notify it. + Then after processing carried message, it will send one of the following responses: + - DoneResponse: in case the message processing succeeded without any resulting status. + - StatusResponse: in case the message processing succeeded with a non-None resulting status. + - ErrorResponse: in case the message processing failed with an exception message. + All response messages send by target actor will have the same req_id as this message. + """ + + message: Any = None + + +@dataclasses.dataclass +class ActorInitRequest(MessageRequest): + """It is sent by create_actor function just after an actor accepting ActorInitRequest is created.""" + + cfg: ActorConfig = dataclasses.field(default_factory=ActorConfig.from_config) + + +@dataclasses.dataclass +class CancelRequest(MessageRequest): + """It is sent to an actor to cancel all pending asynchronous tasks started by a request with the same req_id.""" + + +@dataclasses.dataclass +class Response: + """It is sent by the actor to notify sender has just finished processing a request.""" + + # req_id is required to match the request. + req_id: str + + @classmethod + def from_status(cls, req_id: str, status: Any = None, error: Exception | None = None) -> "Response": + """Given a status and an error it will return a Response of one of the following types: + - ErrorResponse: in case error is not None + - StatusResponse: in case error is None and status is not None + - DoneResponse: in case error and status are both None + """ + if error is not None: + return ErrorResponse(req_id, error, traceback.format_exc()) + if isinstance(status, cls): + if req_id != status.req_id: + status = copy.deepcopy(status) + status.req_id = req_id + return status + if status is None: + return DoneResponse(req_id) + return ResultResponse(req_id, status) + + +@dataclasses.dataclass +class RunningTaskResponse(Response): + """It is sent by actor after a background task is created as a result of processing a request.""" + + name: str + + +class DoneResponse(Response): + + def result(self) -> Any: + return None + + +@dataclasses.dataclass +class ResultResponse(DoneResponse): + """It is sent by the actor to notify sender has just finished processing a response with a non-none resulting status.""" + + status: Any + + def result(self) -> Any: + return self.status + + +@dataclasses.dataclass +class CancelledResponse(ResultResponse): + """CancelledResponse is sent by the actor to notify sender that the request has been cancelled.""" + + def result(self) -> Any: + raise asyncio.exceptions.CancelledError(self.status is not None and str(self.status) or "") + + +@dataclasses.dataclass +class ErrorResponse(DoneResponse): + """It is sent by the actor to notify sender has just finished processing a response raising an Exception.""" + + error: Exception + details: str + + def result(self) -> Any: + cause: BaseException | None = None + if self.details: + # Attach error details (which it should include original error traceback) as a cause of the error, + # so that both stack traces (local and remote) will be visible when logging the exception. + message = f"{self.error}\n{self.details}" + cause = type(self.error)(message) + raise self.error from cause + + +class ActorRequestError(Exception): + pass + + +class PoisonError(ActorRequestError): + """It is used to translate a PoisonMessage to an Exception instance.""" + + @classmethod + def from_poison_message(cls, poison: actors.PoisonMessage) -> Self: + return cls(f"poison message: {poison.poisonMessage!r}, details:\n{poison.details!r}") + + +@dataclasses.dataclass +class PingRequest(MessageRequest): + destination: actors.ActorAddress | None = None + + +@dataclasses.dataclass +class PongResponse(ResultResponse): + pass diff --git a/esrally/actors/_system.py b/esrally/actors/_system.py new file mode 100644 index 000000000..18639f178 --- /dev/null +++ b/esrally/actors/_system.py @@ -0,0 +1,247 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import atexit +import copy +import itertools +import logging +import os +import socket +from collections.abc import Iterable +from typing import Any, get_args + +from thespian import actors # type: ignore[import-untyped] +from thespian.system.logdirector import ( # type: ignore[import-untyped] + ThespianLogForwarder, +) + +from esrally import log, types +from esrally.actors._config import ( + DEFAULT_COORDINATOR_IP, + DEFAULT_IP, + ActorConfig, + SystemBase, +) +from esrally.actors._context import ( + ActorContext, + ActorContextError, + get_actor_context, + set_actor_context, +) + +LOG = logging.getLogger(__name__) + + +def get_actor_system() -> actors.ActorSystem: + """It returns the last actor system initialized using init_actor_system()""" + return get_actor_context().actor_system + + +def init_actor_system(cfg: types.Config | None = None, auto_clean: bool = True) -> actors.ActorSystem: + """It initializes the actor system using given configuration. + + To provide a custom configuration create and customize one with ActorConfig class. Example: + + cfg = ActorConfig.from_config() + cfg.system_base = "multiprocTCPBase" + cfg.admin_ports = "1900" + system = actors.init_actor_system(cfg) + + After the actor system is initialized, the actor system reference will be available using get_actor_system() + function. For creating actors and sending messages, you can then use `create_actor`, `send` and `request` global + functions. + + address = actors.create_actor(MyActorClass) + response = await actors.request(address, MyRequest()) + + To finally shut down the actor system, use the `shutdown` function. + + actors.shutdown() + + :param cfg: Optional configuration object. + :return: An initialized actor system + """ + try: + system = get_actor_system() + except ActorContextError: + pass + else: + LOG.warning("ActorSystem already initialized.") + return system + + LOG.info("Initializing actor system...") + ctx = context_from_config(cfg) + if not isinstance(ctx.handler, actors.ActorSystem): + raise ActorContextError("Context handler is not an ActorSystem") + + if auto_clean: + atexit.register(ctx.shutdown) + + set_actor_context(ctx) + LOG.info("Actor system initialized.") + return ctx.handler + + +def context_from_config(cfg: types.Config | None = None) -> ActorContext: + """Creates a new actor context, with its actor system, from given configuration. + + :param cfg: configuration object to be used. + :return: Created actor context. + """ + cfg = ActorConfig.from_config(cfg) + + # It will try configured system base first, fall back one later, if different. + system_bases = [cfg.system_base] + if cfg.fallback_system_base not in system_bases: + system_bases.append(cfg.fallback_system_base) + + first_error: Exception | None = None + for system_base in system_bases: + + admin_ports: Iterable[int | None] = [None] + if system_base == "multiprocTCPBase" and cfg.admin_ports: + # It will try using provided ports first, then a random one as fallback in case of issues. + admin_ports = itertools.chain(cfg.admin_ports, admin_ports) + + for admin_port in admin_ports: + try: + system = create_system( + system_base=system_base, + ip=cfg.ip, + admin_port=admin_port, + coordinator_ip=cfg.coordinator_ip, + process_startup_method=cfg.process_startup_method, + ) + except actors.InvalidActorAddress as ex: + + first_error = first_error or ex + if admin_port is not None: + LOG.exception("Failed setting up actor system with system base '%s' and admin port %s", system_base, admin_port) + continue # It tries the next port + break # It tries the next system base + except Exception as ex: + LOG.exception("Failed setting up actor system with system base '%s'", system_base) + first_error = first_error or ex + break # It tries the next system base + + # Update ActorConfig instance so that it can be used to join the actor system in sub-processes. + cfg = copy.deepcopy(cfg) + system_base = system.capabilities.get("Thespian ActorSystem Name") + if system_base: + cfg.system_base = system_base + ip = system.capabilities.get("ip") + if ip: + cfg.ip = ip + admin_port = system.capabilities.get("Admin Port") + if admin_port: + cfg.admin_ports = range(admin_port, admin_port + 1) + process_startup_method = system.capabilities.get("Process Startup Method") + if process_startup_method: + cfg.process_startup_method = process_startup_method + + # It succeeded crating an actor system. The configuration passed to the context will be forwarded to all + # actors created within this context. + return ActorContext(handler=system, cfg=cfg) + + raise first_error or RuntimeError(f"Could not initialize actor system with system base '{cfg.system_base}'") + + +def create_system( + system_base: SystemBase = "multiprocTCPBase", + ip: str = "127.0.0.1", + admin_port: int | None = None, + coordinator_ip: str | None = None, + process_startup_method: str | None = None, +) -> actors.ActorSystem: + """It creates a new actor system using given configuration. + + :param system_base: + :param ip: + :param admin_port: + :param coordinator_ip: + :param process_startup_method: + :return: the new actor system from Thespian. + """ + if system_base not in get_args(SystemBase): + raise ValueError(f"invalid system base value: '{system_base}', valid options are: {get_args(SystemBase)}") + + capabilities: dict[str, Any] = {"coordinator": True} + if system_base == "multiprocTCPBase": + ip = ip or DEFAULT_IP + try: + capabilities["ip"] = ip = resolve(ip) + except Exception: + LOG.error("Failed to resolve actor system IP address '%s', using 127.0.0.1.", ip) + ip = DEFAULT_IP + assert ip + + admin_port = admin_port or find_unused_random_port(ip) + assert admin_port + + capabilities["Admin Port"] = admin_port + + if coordinator_ip: + try: + coordinator_ip = resolve(coordinator_ip) + except Exception: + coordinator_ip = DEFAULT_COORDINATOR_IP + capabilities["Convention Address.IPv4"] = coordinator_ip + if ip and coordinator_ip != ip: + capabilities["coordinator"] = False + + if process_startup_method: + capabilities["Process Startup Method"] = process_startup_method + + if system_base == "multiprocQueueBase": + if process_startup_method: + capabilities["Process Startup Method"] = process_startup_method + + log_defs = None + if not isinstance(logging.root, ThespianLogForwarder): + conf_path = log.log_config_path() + if os.path.isfile(conf_path): + log_defs = log.load_configuration() + else: + LOG.warning("File not found: %s", conf_path) + LOG.debug( + "Creating actor system:\n - systemBase: %r\n - capabilities: %r\n - logDefs: %r\n", + system_base, + capabilities, + log_defs, + ) + system = actors.ActorSystem(systemBase=system_base, capabilities=capabilities, logDefs=log_defs, transientUnique=True) + LOG.debug("Actor system created:\n - capabilities: %r\n", system.capabilities) + return system + + +def resolve(address: str, port: int | None = None) -> str: + addrinfo = socket.getaddrinfo(address, port or 1900, 0, 0, socket.IPPROTO_TCP) + for family, _, _, _, sockaddr in addrinfo: + # We're interested in the IPv4 address + if family == socket.AddressFamily.AF_INET: + ip, _ = sockaddr[:2] + return str(ip) + raise ValueError(f"Invalid hostname or ip address: '{address}'") + + +def find_unused_random_port(ip: str) -> int: + with socket.socket() as sock: + try: + sock.bind((ip, 0)) + except OSError: + sock.bind(("0.0.0.0", 0)) + _, port = sock.getsockname() + return port diff --git a/esrally/config.py b/esrally/config.py index 9f6c35cbd..8d33b96a2 100644 --- a/esrally/config.py +++ b/esrally/config.py @@ -16,15 +16,21 @@ # under the License. import configparser +import contextvars import logging import os.path import shutil +import typing from enum import Enum from string import Template +from typing_extensions import Self + from esrally import PROGRAM_NAME, exceptions, paths, types from esrally.utils import io +LOG = logging.getLogger(__name__) + class Scope(Enum): # Valid for all benchmarks, typically read from the configuration file @@ -40,6 +46,7 @@ class Scope(Enum): class ConfigFile: + def __init__(self, config_name=None, **kwargs): self.config_name = config_name @@ -124,22 +131,66 @@ def auto_load_local_config(base_config, additional_sections=None, config_file_cl return cfg -class Config: - EARLIEST_SUPPORTED_VERSION = 17 +CONFIG = contextvars.ContextVar[typing.Optional[types.Config]](f"{__name__}.config", default=None) + + +def get_config() -> types.Config: + cfg = CONFIG.get() + if cfg is None: + raise exceptions.ConfigError("Config not initialized.") + return cfg + + +def init_config(cfg: types.Config, *, force=False) -> types.Config: + if not force and CONFIG.get(): + raise exceptions.ConfigError(f"Config already set: {cfg}") + cfg = Config.from_config(cfg) + CONFIG.set(cfg) + return cfg - CURRENT_CONFIG_VERSION = 17 +def clear_config() -> None: + CONFIG.set(None) + + +class Config(types.Config): """ Config is the main entry point to retrieve and set benchmark properties. It provides multiple scopes to allow overriding of values on different levels (e.g. a command line flag can override the same configuration property in the config file). These levels are transparently resolved when a property is retrieved and the value on the most specific level is returned. """ - def __init__(self, config_name=None, config_file_class=ConfigFile, **kwargs): + EARLIEST_SUPPORTED_VERSION = 17 + + CURRENT_CONFIG_VERSION = 17 + + @classmethod + def from_config(cls, cfg: types.Config | None = None) -> Self: + if cfg is None: + cfg = get_config() + if isinstance(cfg, cls): + return cfg + if isinstance(cfg, types.Config): + return cls(opts_from=cfg) + raise TypeError(f"unexpected cfg: got type {type(cfg).__name__}, expected types.Config") + + def __init__(self, config_name: str | None = None, config_file_class=ConfigFile, copy_from: types.Config | None = None, **kwargs): self.name = config_name self.config_file = config_file_class(config_name, **kwargs) self._opts = {} - self._clear_config() + if copy_from is not None: + self.update(copy_from) + self._override_config() + + def update(self, cfg: types.Config): + if isinstance(cfg, Config): + self.name = cfg.name + self.config_file = cfg.config_file + self._opts.update(cfg._opts) # pylint: disable=protected-access + return + for section in cfg.all_sections(): + for name, value in cfg.all_opts(section).items(): + self.add(Scope.application, section, name, value) def add(self, scope, section: types.Section, key: types.Key, value): """ @@ -185,7 +236,10 @@ def opts(self, section: types.Section, key: types.Key, default_value=None, manda else: raise exceptions.ConfigError(f"No value for mandatory configuration: section='{section}', key='{key}'") - def all_opts(self, section: types.Section): + def all_sections(self) -> list[types.Section]: + return list(typing.get_args(types.Section)) + + def all_opts(self, section: types.Section) -> dict[str, typing.Any]: """ Finds all options in a section and returns them in a dict. @@ -233,21 +287,24 @@ def load_config(self, auto_upgrade=False): def _do_load_config(self): config = self.config_file.load() # It's possible that we just reload the configuration - self._clear_config() + self._opts = {} + self._override_config() self._fill_from_config_file(config) - def _clear_config(self): + def _override_config(self): # This map contains default options that we don't want to sprinkle all over the source code but we don't want users to change # them either - self._opts = { - (Scope.application, "source", "distribution.dir"): "distributions", - (Scope.application, "benchmarks", "track.repository.dir"): "tracks", - (Scope.application, "benchmarks", "track.default.repository"): "default", - (Scope.application, "provisioning", "node.name.prefix"): "rally-node", - (Scope.application, "provisioning", "node.http.port"): 39200, - (Scope.application, "mechanic", "team.repository.dir"): "teams", - (Scope.application, "mechanic", "team.default.repository"): "default", - } + self._opts.update( + { + (Scope.application, "source", "distribution.dir"): "distributions", + (Scope.application, "benchmarks", "track.repository.dir"): "tracks", + (Scope.application, "benchmarks", "track.default.repository"): "default", + (Scope.application, "provisioning", "node.name.prefix"): "rally-node", + (Scope.application, "provisioning", "node.http.port"): 39200, + (Scope.application, "mechanic", "team.repository.dir"): "teams", + (Scope.application, "mechanic", "team.default.repository"): "default", + } + ) def _fill_from_config_file(self, config): for section in config.sections(): @@ -279,6 +336,15 @@ def _k(self, scope, section: types.Section, key: types.Key): else: return scope, section, key + def __eq__(self, other): + if not isinstance(other, Config): + return False + if other.name != self.name: + return False + if other._opts != self._opts: + return False + return True + def migrate(config_file, current_version, target_version, out=print, i=input): logger = logging.getLogger(__name__) diff --git a/esrally/types.py b/esrally/types.py index 9b4453cf8..93def7028 100644 --- a/esrally/types.py +++ b/esrally/types.py @@ -15,10 +15,11 @@ # specific language governing permissions and limitations # under the License. -from typing import Any, Literal, Protocol, TypeVar +from typing import Any, Literal, Protocol, runtime_checkable Section = Literal[ "actor", + "actors", "benchmarks", "client", "defaults", @@ -44,6 +45,13 @@ ] Key = Literal[ "actor.process.startup.method", + "actors.admin_ports", + "actors.coordinator_ip", + "actors.fallback_system_base", + "actors.ip", + "actors.loop_interval", + "actors.process_startup_method", + "actors.system_base", "add.chart_name", "add.chart_type", "add.config.option", @@ -178,16 +186,21 @@ "user.tags", "values", ] -_Config = TypeVar("_Config", bound="Config") +@runtime_checkable class Config(Protocol): + + name: str | None = None + def add(self, scope, section: Section, key: Key, value: Any) -> None: ... - def add_all(self, source: _Config, section: Section) -> None: ... + def add_all(self, source: "Config", section: Section) -> None: ... def opts(self, section: Section, key: Key, default_value=None, mandatory: bool = True) -> Any: ... + def all_sections(self) -> list[Section]: ... + def all_opts(self, section: Section) -> dict: ... def exists(self, section: Section, key: Key) -> bool: ... diff --git a/esrally/utils/convert.py b/esrally/utils/convert.py index 9d2a8a40f..a58711902 100644 --- a/esrally/utils/convert.py +++ b/esrally/utils/convert.py @@ -230,3 +230,30 @@ def to_bool(value: str | bool) -> bool: elif value in ["False", "false", "No", "no", "f", "n", "0", False]: return False raise ValueError(f"Cannot convert [{value}] to bool.") + + +def to_range(value: str | range | int, *, min_value: int | None = None, max_value: int | None = None) -> range: + if isinstance(value, range): + start, stop = value.start, value.stop + elif isinstance(value, int): + start, stop = value, value + 1 + elif isinstance(value, str): + value = value.replace(" ", "") + if "-" in value: + start, last = (int(p) for p in value.split("-", 1)) + stop = last + 1 + else: + start, stop = int(value), int(value) + 1 + else: + raise TypeError(f"Cannot convert [{value}] to range.") + + start = max(int(v) for v in [start, min_value] if v is not None) + stop = min(int(v) for v in [stop, max_value] if v is not None) + + if start >= stop: + raise ValueError(f"Invalid range: start value must be less than stop value: {start}-{stop-1}") + return range(start, stop) + + +def to_port_range(value: str | range | int, *, min_value: int = 1, max_value: int = 65535) -> range: + return to_range(value, min_value=min_value, max_value=max_value) diff --git a/pyproject.toml b/pyproject.toml index 14c8a3512..824c3a4fa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -209,6 +209,7 @@ files = ["esrally/", "it/", "tests/"] [[tool.mypy.overrides]] module = [ "esrally.actor", + "esrally.actors.*", "esrally.mechanic.team", "esrally.storage.*", "esrally.utils.cases", diff --git a/tests/actors/__init__.py b/tests/actors/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/actors/actor_test.py b/tests/actors/actor_test.py new file mode 100644 index 000000000..97f9a4b26 --- /dev/null +++ b/tests/actors/actor_test.py @@ -0,0 +1,347 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import asyncio +import dataclasses +import functools +import random +from collections.abc import Callable, Generator +from typing import Any, Literal, get_args + +import pytest + +from esrally import actors, config, types +from esrally.actors import ActorConfig, _actor, _context, _proto + + +@pytest.fixture(scope="function", autouse=True) +def event_loop() -> Generator[asyncio.AbstractEventLoop, Any, None]: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + yield loop + loop.close() + asyncio.set_event_loop(None) + + +@pytest.fixture(scope="function", params=get_args(actors.SystemBase)) +def system_base(request) -> Generator[actors.SystemBase]: + yield request.param + + +@pytest.fixture(scope="function", autouse=True) +def cfg(system_base: actors.SystemBase) -> Generator[types.Config]: + cfg = actors.ActorConfig() + cfg.system_base = system_base + config.init_config(cfg, force=True) + yield cfg + config.clear_config() + + +@pytest.fixture(scope="function", autouse=True) +def system() -> Generator[actors.ActorSystem]: + actors.shutdown() + system = actors.init_actor_system() + yield system + actors.shutdown() + + +@pytest.fixture(scope="function") +def dummy_actor(event_loop: asyncio.AbstractEventLoop) -> Generator[actors.ActorAddress]: + address = event_loop.run_until_complete(actors.create_actor(DummyActor)) + yield address + actors.send(address, actors.ActorExitRequest()) + + +# It indicates if a test case has to be executed from an outside or from inside an actor. +ExecuteFrom = Literal["from_external", "from_actor"] + + +# This fixtures patches the test function so that its being executed from inside an AsyncActor in case param is "from_actor". +@pytest.fixture(scope="function", params=get_args(ExecuteFrom)) +def execute_from(request, monkeypatch) -> Generator[ExecuteFrom, None, None]: + if request.param == "from_actor": + + original_func = getattr(request.module, request.function.__name__) + + @functools.wraps(request.function) + async def func_wrapper(*args, **kwargs): + actor = await actors.create_actor(ExecutorActor, message=ExecuteRequest(original_func, args=args, kwargs=kwargs)) + request.addfinalizer(lambda: actors.send(actor, actors.ActorExitRequest())) + + assert isinstance(request.node, pytest.Function) + request.node.obj = func_wrapper + + yield request.param + + +@pytest.mark.asyncio +async def test_get_actor_context(execute_from: ExecuteFrom): + ctx = actors.get_actor_context() + assert isinstance(ctx, actors.ActorContext) + assert ctx.pending_results == {} + + if execute_from == "from_external": + assert not isinstance(ctx, _actor.ActorRequestContext) + assert isinstance(ctx.handler, actors.ActorSystem) + assert ctx.handler is actors.get_actor_system() + assert ctx.handler is ctx.actor_system + assert ctx.details == {"cls": "ActorSystem"} + assert str(ctx) == "ActorContext", f"invalid value: 'str(ctx)' -> {ctx}" + + elif execute_from == "from_actor": + assert isinstance(ctx, _actor.ActorRequestContext), "Actor request context initialized." + assert ctx.handler is ctx.actor, "Actor request context initialized." + assert isinstance(ctx.actor, ExecutorActor), "This is being executed from inside a executor actor." + assert ctx.actor is actors.get_actor(), "Actor request context initialized." + assert ctx.req_id, "req_id not set" + assert ctx.sender is not None, "sender not set" + assert not ctx.responded, "Actor request is not responded." + assert ctx.pending_tasks.get(ctx.req_id), "test method has no tasks" + assert ctx.details == { + "cls": "ExecutorActor", + "req_id": ctx.req_id, + "sender": ctx.sender, + "address": ctx.actor.myAddress, + }, f"Invalid actor context details: {ctx.details}" + assert ctx.details["cls"] == ExecutorActor.__name__ + assert ( + str(ctx) == f"ActorRequestContext" + ), f"invalid value: 'str(ctx)' -> {ctx}" + + else: + raise NotImplementedError() + + +def test_ask_result(system: actors.ActorSystem, dummy_actor: actors.ActorAddress) -> None: + """It tests ask function receives a result from remote actor.""" + result = random.random() + assert result == system.ask(dummy_actor, ResponseRequest(result)) + + +def test_ask_poison(system: actors.ActorSystem, dummy_actor: actors.ActorAddress) -> None: + """It tests request function raises an error that has been captured while processing request.""" + error = RuntimeError(f"some ramdom error: {random.random()}") + response = system.ask(dummy_actor, ResponseRequest(error=error)) + assert isinstance(response, actors.PoisonMessage) + assert isinstance(response.poisonMessage, ResponseRequest) + assert str(response.poisonMessage.error) == str(error) + + +@pytest.mark.asyncio +async def test_request_result(execute_from: ExecuteFrom, dummy_actor: actors.ActorAddress) -> None: + """It tests request function receives a result from remote actor.""" + result = random.random() + assert result == await actors.request(dummy_actor, ResponseRequest(result)) + + +@pytest.mark.asyncio +async def test_request_respond(execute_from: ExecuteFrom, dummy_actor: actors.ActorAddress) -> None: + """It tests request function receives a result when actor responds using actors.respond function.""" + result = random.random() + assert result == await actors.request(dummy_actor, ResponseRequest(result, explicit=True)) + + +@pytest.mark.asyncio +async def test_request_raises(execute_from: ExecuteFrom, dummy_actor: actors.ActorAddress) -> None: + """It tests request function raises an error that has been captured while processing request.""" + error = RuntimeError(f"some ramdom error: {random.random()}") + with pytest.raises(RuntimeError): + await actors.request(dummy_actor, ResponseRequest(error=error)) + + +@pytest.mark.asyncio +async def test_request_respond_error(execute_from: ExecuteFrom, dummy_actor: actors.ActorAddress) -> None: + """It tests request function raises an error that has been sent using respond function.""" + error = ValueError(f"some ramdom error: {random.random()}") + with pytest.raises(ValueError): + await actors.request(dummy_actor, ResponseRequest(error=error, explicit=True)) + + +@pytest.mark.asyncio +async def test_timeout_request(execute_from: ExecuteFrom, dummy_actor: actors.ActorAddress) -> None: + """This verifies a blocking task would not block the actor from processing further messages.""" + future = actors.request(dummy_actor, BlockingRequest(timeout=10.0), timeout=0.1) + value = random.random() + assert value == await actors.ping(dummy_actor, message=value) + with pytest.raises(TimeoutError): + await future + + +@pytest.mark.asyncio +async def test_cancel_request(execute_from: ExecuteFrom, dummy_actor: actors.ActorAddress) -> None: + ctx = actors.get_actor_context() + assert ctx.pending_results == {} + + blocking = actors.request(dummy_actor, BlockingRequest(timeout=300.0)) + assert not blocking.done() + if execute_from == "from_actor": + assert not blocking.done() + assert len(ctx.pending_results) == 1 + + value = random.random() + assert value == await actors.ping(dummy_actor, message=value) + + blocking.cancel() + with pytest.raises(asyncio.CancelledError): + await blocking + + assert blocking.cancelled() + assert ctx.pending_results == {} + + +@pytest.mark.asyncio +async def test_blocking_request(execute_from: ExecuteFrom, dummy_actor: actors.ActorAddress) -> None: + blocking = actors.request(dummy_actor, BlockingRequest(timeout=300.0)) + value = random.random() + assert value == await actors.ping(dummy_actor, message=value) + with pytest.raises(TimeoutError): + await actors.wait_for(blocking, timeout=0.1, cancel_message="too much time!") + + +@pytest.mark.asyncio +async def test_actor_config(execute_from: ExecuteFrom, dummy_actor: actors.ActorAddress) -> None: + got = await actors.request(dummy_actor, GetConfigRequest()) + want = actors.get_actor_context().cfg + assert want == got, "Parent actor received configuration from system context." + + +@pytest.mark.asyncio +async def test_request(execute_from: ExecuteFrom, dummy_actor: actors.ActorAddress) -> None: + ctx = actors.get_actor_context() + assert ctx.pending_results == {} + + request = _proto.PingRequest(message=random.random()) + future = actors.request(dummy_actor, request) + if execute_from == "from_actor": + assert not future.done() + assert len(ctx.pending_results) == 1 + + response = await future + assert response == request.message + assert ctx.pending_results == {} + + +@pytest.mark.asyncio +async def test_create_actor(execute_from: ExecuteFrom) -> None: + # It uses this special configuration object to check it is going to be propagated to child actor as it is. + special_config = actors.ActorConfig() + special_config.admin_ports = range(100, 200) + actor_address = await actors.create_actor(DummyActor, cfg=special_config) + try: + actor_config = await actors.request(actor_address, GetConfigRequest()) + assert isinstance(actor_config, ActorConfig) + assert actor_config.admin_ports == special_config.admin_ports + finally: + actors.send(actor_address, actors.ActorExitRequest()) + + +@pytest.mark.asyncio +async def test_create_task(execute_from: ExecuteFrom) -> None: + result = asyncio.get_event_loop().create_future() + + async def task() -> Any: + return await result + + coro = task() + task = actors.create_task(coro, name="some-name") + assert isinstance(task, asyncio.Task) + assert not task.cancelled() + assert not task.done() + + assert task.get_name() == f"some-name@{actors.get_actor_context().handler}", f"unexpected task name: {task.get_name()}" + + if execute_from == "from_actor": + ctx = actors.get_actor_context() + assert isinstance(ctx, _actor.ActorRequestContext) + assert task in ctx.pending_tasks[ctx.req_id] + + result.set_result(random.random()) + assert await result == await task + assert not task.cancelled() + assert task.done() + + if execute_from == "from_actor": + ctx = actors.get_actor_context() + assert isinstance(ctx, _actor.ActorRequestContext) + assert task not in ctx.pending_tasks[ctx.req_id] + + +class CheckActorContextRequest: + pass + + +@dataclasses.dataclass +class ResponseRequest: + status: Any = None + error: Exception | None = None + explicit: bool = False + + +class CreateChildRequest: + pass + + +class GetConfigRequest: + pass + + +@dataclasses.dataclass +class BlockingRequest: + timeout: float + destination: actors.ActorAddress | None = None + + +class DummyActor(actors.AsyncActor): + + def receiveMsg_ResponseRequest(self, request: ResponseRequest, sender: actors.ActorAddress) -> Any: + if request.explicit: + ctx = _context.get_actor_context() + assert isinstance(ctx, _actor.ActorRequestContext), "Actor request context initialized." + assert not ctx.responded, "Actor responded flag not set." + actors.respond(status=request.status, error=request.error) + assert ctx.responded, "Actor responded flag set." + elif request.error is not None: + raise request.error + else: + return request.status + + def receiveMsg_CreateChildRequest(self, message: GetConfigRequest, sender: actors.ActorAddress) -> actors.ActorAddress: + return actors.create_actor(DummyActor) + + def receiveMsg_GetConfigRequest(self, message: GetConfigRequest, sender: actors.ActorAddress) -> types.Config: + return config.get_config() + + async def receiveMsg_BlockingRequest(self, request: BlockingRequest, sender: actors.ActorAddress) -> None: + if request.destination in [None, self.myAddress]: + await asyncio.sleep(request.timeout) + return + return await actors.request(request.destination, request) + + +@dataclasses.dataclass +class ExecuteRequest: + func: Callable + args: tuple = tuple() + kwargs: dict = dataclasses.field(default_factory=dict) + + def __call__(self) -> Any: + return self.func(*self.args, **self.kwargs) + + +class ExecutorActor(actors.AsyncActor): + + async def receiveMsg_ExecuteRequest(self, request: ExecuteRequest, sender: actors.ActorAddress) -> Any: + return request() diff --git a/tests/actors/config_test.py b/tests/actors/config_test.py new file mode 100644 index 000000000..403ec80b0 --- /dev/null +++ b/tests/actors/config_test.py @@ -0,0 +1,92 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import dataclasses +from typing import get_args + +from esrally.actors._config import ( + DEFAULT_ADMIN_PORTS, + DEFAULT_COORDINATOR_IP, + DEFAULT_FALLBACK_SYSTEM_BASE, + DEFAULT_IP, + DEFAULT_LOOP_INTERVAL, + DEFAULT_PROCESS_STARTUP_METHOD, + DEFAULT_SYSTEM_BASE, + ActorConfig, + SystemBase, +) +from esrally.utils import cases + + +@dataclasses.dataclass +class FromConfigCase: + name: str | None = None + system_base: SystemBase = DEFAULT_SYSTEM_BASE + fallback_system_base: SystemBase | None = DEFAULT_FALLBACK_SYSTEM_BASE + ip: str = DEFAULT_IP + admin_ports: range | None = DEFAULT_ADMIN_PORTS + coordinator_ip: str = DEFAULT_COORDINATOR_IP + process_startup_method: str | None = DEFAULT_PROCESS_STARTUP_METHOD + want_name: str | None = None + loop_interval: float | None = DEFAULT_LOOP_INTERVAL + + +@cases.cases( + default=FromConfigCase(), + with_name=FromConfigCase(name="some_name", want_name="some_name"), + system_base=FromConfigCase(system_base="multiprocQueueBase"), + fallback_system_base=FromConfigCase(fallback_system_base="multiprocTCPBase"), + fallback_system_base_none=FromConfigCase(fallback_system_base=None), + ip=FromConfigCase(ip="some_ip"), + admin_ports=FromConfigCase(admin_ports=range(1234, 4321)), + coordinator_ip=FromConfigCase(coordinator_ip="some_ip"), + fork=FromConfigCase(process_startup_method="fork"), + spawn=FromConfigCase(process_startup_method="spawn"), + loop_interval=FromConfigCase(loop_interval=1.0), +) +def test_from_config(case: FromConfigCase) -> None: + cfg = ActorConfig(case.name) + if case.system_base != DEFAULT_SYSTEM_BASE: + cfg.system_base = case.system_base + if case.fallback_system_base != DEFAULT_FALLBACK_SYSTEM_BASE: + cfg.fallback_system_base = case.fallback_system_base + if case.ip != DEFAULT_IP: + cfg.ip = case.ip + if case.admin_ports != DEFAULT_ADMIN_PORTS: + cfg.admin_ports = case.admin_ports + if case.coordinator_ip != DEFAULT_COORDINATOR_IP: + cfg.coordinator_ip = case.coordinator_ip + if case.loop_interval != DEFAULT_LOOP_INTERVAL: + cfg.loop_interval = case.loop_interval + assert isinstance(cfg, ActorConfig) + assert cfg.name == case.want_name + assert cfg.system_base in get_args(SystemBase) + if case.system_base is None: + assert cfg.system_base == "multiprocTCPBase" + else: + assert cfg.system_base == case.system_base + assert cfg.fallback_system_base in (get_args(SystemBase) + (None,)) + if case.fallback_system_base is None: + if cfg.system_base == "multiprocQueueBase": + assert cfg.fallback_system_base == "multiprocTCPBase" + else: + assert cfg.fallback_system_base == "multiprocQueueBase" + else: + assert cfg.fallback_system_base == case.fallback_system_base + assert cfg.ip == case.ip + assert cfg.admin_ports == case.admin_ports + assert cfg.coordinator_ip == case.coordinator_ip + assert cfg.loop_interval == case.loop_interval diff --git a/tests/actors/example_test.py b/tests/actors/example_test.py new file mode 100644 index 000000000..2e81bd9ab --- /dev/null +++ b/tests/actors/example_test.py @@ -0,0 +1,147 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import asyncio +import dataclasses +import logging +from collections.abc import Generator + +import pytest + +from esrally import actors, config + +LOG = logging.getLogger(__name__) + + +@pytest.fixture(scope="module", autouse=True) +def actor_system() -> Generator[actors.ActorSystem]: + config.init_config(config.Config()) + system = actors.init_actor_system() + LOG.info("Actor system initialized: capabilities {%s}", system.capabilities) + + yield system + + actors.shutdown() + LOG.info("Actor system shut down.") + + +@pytest.mark.asyncio +async def test_example(): + """It creates an actor, it sends it a request, and it finally waits for a response.""" + + parent = await actors.create_actor(ParentActor) + LOG.info("Parent actor created: %s", parent) + + LOG.info("About to send request to parent actor to create children: %s", parent) + response = await actors.request(parent, CreateChildrenRequest(["qui", "quo", "qua"])) + + assert response == "All OK" + LOG.info("All done.") + + +class ParentActor(actors.AsyncActor): + + async def receiveMsg_CreateChildrenRequest(self, request: "CreateChildrenRequest", sender: actors.ActorAddress) -> str: + """It creates some child actors, teach each one its name and tell to greet each others. + + It performs the operations for every child actor in parallel, without blocking the actor before returning the final message. + It also performs verification all child actors do it right before returning the final message. + + The purpose is to demonstrate how simple would become implementing a workflow involving multiple actors interactions + with AsyncActor class without falling in the complexity of keeping the workflow state in actors instance attributes. + + This also demonstrate the reliability of an asynchronous workflow that always performs operations in the same, + order thanks the fact actors are using queues to communicate with each other. + + :param request: the request containing the name of the child actor. + :param sender: + :return: + """ + # It creates the child actors and map them by name. + addresses = await asyncio.gather(*[actors.create_actor(ChildActor) for _ in request.names]) + children = dict(zip(request.names, addresses)) + LOG.info("Created child actors: %s.", children) + + # It assigns each of them a name. It doesn't wait for the request to complete. + for name, child in children.items(): + actors.send(child, AssignNameRequest(name)) + LOG.info("Assigned name to child actors.") + + # It verifies each child learned his names. + assert request.names == await asyncio.gather(*[actors.request(child, AskNameRequest()) for child in children.values()]) + LOG.info("Each child know his name.") + + # It says each child to greet his sibling (including itself) and it gathers what they hear in response. + assert await asyncio.gather(*[actors.request(child, GreetSiblingRequest(children)) for child in children.values()]) == [ + [f"Hello {name2}, it's {name1}" for name1 in children] for name2 in children + ] + LOG.info("Each child greeted each of other child actors (including himself).") + + return "All OK" + + +class ChildActor(actors.AsyncActor): + def __int__(self): + super().__init__() + self.name: str | None = None + + def receiveMsg_AssignNameRequest(self, request: "AssignNameRequest", sender: actors.ActorAddress) -> None: + """It receives the actor name.""" + self.name = request.name + + def receiveMsg_AskNameRequest(self, request: "AskNameRequest", sender: actors.ActorAddress) -> str: + """It gets the actor name.""" + assert self.name is not None + return self.name + + def receiveMsg_GreetSiblingRequest(self, request: "GreetSiblingRequest", sender: actors.ActorAddress) -> asyncio.Future[list[str]]: + """It GreetMessage to all sibling actors (including itself).""" + # This demonstrates an async task will be created to gather all the answers from the requests sent here. + # No actor will be blocked waiting at any time while this is in execution. + assert isinstance(self.name, str) + return asyncio.gather( + *[actors.request(sibling, GreetMessage(sender=self.name, receiver=name)) for name, sibling in request.sibling.items()] + ) + + def receiveMsg_GreetMessage(self, message: "GreetMessage", sender: actors.ActorAddress) -> str: + assert self.name == message.receiver + return f"Hello {message.sender}, it's {self.name}" + + +@dataclasses.dataclass +class CreateChildrenRequest: + names: list[str] + + +@dataclasses.dataclass +class AssignNameRequest: + name: str + + +@dataclasses.dataclass +class AskNameRequest: + pass + + +@dataclasses.dataclass +class GreetSiblingRequest: + sibling: dict[str, actors.ActorAddress] + + +@dataclasses.dataclass +class GreetMessage: + sender: str + receiver: str diff --git a/tests/actors/system_test.py b/tests/actors/system_test.py new file mode 100644 index 000000000..37a037a24 --- /dev/null +++ b/tests/actors/system_test.py @@ -0,0 +1,208 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import asyncio +import copy +import dataclasses +import random +import sys +from collections.abc import Generator +from typing import Any + +import pytest + +from esrally import actors, config, types +from esrally.actors._config import ( + DEFAULT_ADMIN_PORTS, + DEFAULT_COORDINATOR_IP, + DEFAULT_FALLBACK_SYSTEM_BASE, + DEFAULT_IP, + DEFAULT_LOOP_INTERVAL, + DEFAULT_PROCESS_STARTUP_METHOD, + DEFAULT_SYSTEM_BASE, + ActorConfig, + ProcessStartupMethod, + SystemBase, +) +from esrally.actors._context import CONTEXT +from esrally.actors._proto import PingRequest +from esrally.utils import cases + +SAME_IP: str = "192.168.23.3" +OTHER_IP: str = "192.168.23.2" + + +@pytest.fixture(scope="function", autouse=True) +def clean_event_loop() -> Generator[None, Any, None]: + asyncio.set_event_loop(None) + yield + try: + loop = asyncio.get_event_loop() + asyncio.set_event_loop(None) + loop.close() + except RuntimeError: + pass + + +@pytest.fixture(scope="function", autouse=True) +def clean_context(): + token = CONTEXT.set(None) + try: + yield + finally: + actors.shutdown() + CONTEXT.reset(token) + + +@pytest.fixture(scope="function", autouse=True) +def clean_config(): + token = config.CONFIG.set(None) + try: + yield + finally: + config.clear_config() + config.CONFIG.reset(token) + + +WANT_CAPABILITIES = { + # "Thespian ActorSystem Name": DEFAULT_SYSTEM_BASE, + "coordinator": True, +} + + +@dataclasses.dataclass +class SystemCase: + cfg: types.Config = None + system_base: str = DEFAULT_SYSTEM_BASE + fallback_system_base: SystemBase | None = DEFAULT_FALLBACK_SYSTEM_BASE + ip: str = DEFAULT_IP + admin_ports: range = DEFAULT_ADMIN_PORTS + coordinator_ip: str = DEFAULT_COORDINATOR_IP + process_startup_method: ProcessStartupMethod = DEFAULT_PROCESS_STARTUP_METHOD + loop_interval: float = DEFAULT_LOOP_INTERVAL + want_capabilities: dict[str, Any] = dataclasses.field(default_factory=dict) + want_error: Exception | None = None + + +@cases.cases( + default=SystemCase(), + multiprocQueueBase=SystemCase( + system_base="multiprocQueueBase", + want_capabilities={ + "Thespian ActorSystem Name": "multiprocQueueBase", + }, + ), + multiprocTCPBase=SystemCase( + system_base="multiprocTCPBase", want_capabilities={"Thespian ActorSystem Name": "multiprocTCPBase", "ip": DEFAULT_IP} + ), + ip=SystemCase(system_base="multiprocTCPBase", ip="0.0.0.0", want_capabilities={"ip": "0.0.0.0"}), + admin_ports=SystemCase(system_base="multiprocTCPBase", admin_ports=range(12340, 12345)), + coordinator_ip=SystemCase( + system_base="multiprocTCPBase", + coordinator_ip=OTHER_IP, + want_capabilities={"coordinator": False, "Convention Address.IPv4": OTHER_IP}, + ), + same_coordinator_ip=SystemCase( + system_base="multiprocTCPBase", + ip=SAME_IP, + coordinator_ip=SAME_IP, + want_capabilities={"coordinator": True, "ip": SAME_IP, "Convention Address.IPv4": SAME_IP}, + ), + fork=SystemCase( + system_base="multiprocTCPBase", + process_startup_method="fork", + want_capabilities={"Process Startup Method": "fork"}, + ), + spawn=SystemCase( + system_base="multiprocQueueBase", + process_startup_method="spawn", + want_capabilities={"Process Startup Method": "spawn"}, + ), + loop_interval=SystemCase( + loop_interval=2.0, + ), +) +@pytest.mark.asyncio +async def test_system(case: SystemCase, event_loop: asyncio.AbstractEventLoop) -> None: + cfg = ActorConfig() + if case.system_base != DEFAULT_SYSTEM_BASE: + cfg.system_base = case.system_base + if case.fallback_system_base != DEFAULT_FALLBACK_SYSTEM_BASE: + cfg.fallback_system_base = case.fallback_system_base + if case.ip != DEFAULT_IP: + cfg.ip = case.ip + if case.admin_ports != DEFAULT_ADMIN_PORTS: + cfg.admin_ports = case.admin_ports + if case.coordinator_ip != DEFAULT_COORDINATOR_IP: + cfg.coordinator_ip = case.coordinator_ip + if case.process_startup_method != DEFAULT_PROCESS_STARTUP_METHOD: + cfg.process_startup_method = case.process_startup_method + if case.loop_interval != DEFAULT_LOOP_INTERVAL: + cfg.loop_interval = case.loop_interval + config.init_config(cfg) + + if sys.platform == "darwin" and sys.version_info < (3, 12) and cfg.process_startup_method == "fork": + # Old versions of Python on OSX have known problems with fork. + pytest.skip("There are known issues with OSX, Python < 3.12 and fork.") + + with pytest.raises(actors.ActorContextError): + actors.get_actor_system() + + system = actors.init_actor_system() + + assert isinstance(system, actors.ActorSystem) + assert system is actors.get_actor_system() + try: + want_capabilities = copy.deepcopy(WANT_CAPABILITIES) + want_capabilities.update(case.want_capabilities) + + for name, value in want_capabilities.items(): + assert system.capabilities.get(name) == value + + if case.admin_ports: + want_ports = set(cfg.admin_ports or []) + assert system.capabilities.get("Admin Port") in want_ports + + ctx = actors.get_actor_context() + + # It verifies capabilities are copied to the cfg object to be sent to actor subprocesses. + assert system.capabilities["Thespian ActorSystem Name"] == ctx.cfg.system_base + if "ip" in system.capabilities: + assert system.capabilities["ip"] == ctx.cfg.ip + assert range(system.capabilities["Admin Port"], system.capabilities["Admin Port"] + 1) == ctx.cfg.admin_ports + assert system.capabilities["Process Startup Method"] == ctx.cfg.process_startup_method + + assert ctx.cfg.loop_interval == case.loop_interval + + assert isinstance(system, actors.ActorSystem) + destination = await actors.create_actor(actors.AsyncActor) + assert isinstance(destination, actors.ActorAddress) + + request = PingRequest(message=random.random()) + response = await actors.request(destination, request) + if isinstance(response, actors.PoisonMessage): + pytest.fail(f"Actor responded with poison message: message={response.poisonMessage!r}, details={response.details!r}") + assert response == request.message + + value = random.random() + assert [value] == await asyncio.gather(actors.ping(destination, message=value)) # type: ignore[comparison-overlap] + + assert system is actors.get_actor_system() + finally: + actors.shutdown() + + with pytest.raises(actors.ActorContextError): + actors.get_actor_system() diff --git a/tests/config_test.py b/tests/config_test.py index f4afcbe7d..1f042d08c 100644 --- a/tests/config_test.py +++ b/tests/config_test.py @@ -16,6 +16,7 @@ # under the License. import configparser +from collections.abc import Generator import pytest @@ -306,3 +307,37 @@ def test_migrate_from_earliest_supported_to_latest(self): if config.Config.EARLIEST_SUPPORTED_VERSION < config.Config.CURRENT_CONFIG_VERSION: assert config_file.backup_created assert config_file.config["meta"]["config.version"] == str(config.Config.CURRENT_CONFIG_VERSION) + + +@pytest.fixture(autouse=True) +def clear_context() -> Generator[None]: + token = config.CONFIG.set(None) + yield + config.CONFIG.reset(token) + + +def test_config_context(): + with pytest.raises(exceptions.ConfigError): + config.get_config() + with pytest.raises(exceptions.ConfigError): + config.Config.from_config() + + cfg = config.Config() + assert cfg is config.init_config(cfg) + assert cfg is config.get_config() + assert cfg is config.Config.from_config() + assert cfg is config.Config.from_config(cfg) + + with pytest.raises(exceptions.ConfigError): + config.init_config(config.Config()) + + cfg2 = config.init_config(config.Config(), force=True) + assert cfg2 is not cfg + assert cfg2 is config.get_config() + assert cfg2 is config.Config.from_config() + assert cfg is config.Config.from_config(cfg) + assert cfg2 is config.Config.from_config(cfg2) + + config.clear_config() + with pytest.raises(exceptions.ConfigError): + config.get_config() diff --git a/tests/driver/driver_test.py b/tests/driver/driver_test.py index 70cb3787c..2b5de19ed 100644 --- a/tests/driver/driver_test.py +++ b/tests/driver/driver_test.py @@ -1676,7 +1676,7 @@ async def perform_request(*args, **kwargs): test_track = track.Track(name="unittest", description="unittest track", indices=None, challenges=None) # in one second (0.5 warmup + 0.5 measurement) we should get 1000 [ops/s] / 4 [clients] = 250 samples - for target_throughput, bounds in {10: [2, 4], 100: [24, 26], 1000: [235, 255]}.items(): + for target_throughput, bounds in {10: [2, 4], 100: [23, 27], 1000: [230, 270]}.items(): task = track.Task( "time-based", track.Operation( diff --git a/tests/types_test.py b/tests/types_test.py index 7305c094d..498333f1b 100644 --- a/tests/types_test.py +++ b/tests/types_test.py @@ -122,5 +122,5 @@ def assert_annotations(obj, ident, *expects): class TestConfigTypeHint: def test_esrally_module_annotations(self): for module in glob_modules(project_root, "esrally/**/*.py"): - assert_annotations(module, "cfg", types.Config, "types.Config", "types.Config | None") + assert_annotations(module, "cfg", types.Config, "types.Config", "types.Config | None", types.Config | None) assert_annotations(module, "config", types.Config, Optional[types.Config], ConfigParser) diff --git a/tests/utils/convert_test.py b/tests/utils/convert_test.py index 6cc40c699..d66c14dfb 100644 --- a/tests/utils/convert_test.py +++ b/tests/utils/convert_test.py @@ -214,3 +214,20 @@ def test_size(case: SizeCase): assert got.mb() == case.want / (1024 * 1024) assert got.gb() == case.want / (1024 * 1024 * 1024) assert got.tb() == case.want / (1024 * 1024 * 1024 * 1024) + + +@dataclass +class ToRangeCase: + value: str | int | range + want: range + + +@cases( + from_range=ToRangeCase(range(12, 14), range(12, 14)), + from_int=ToRangeCase(5, range(5, 6)), + from_str1=ToRangeCase("7", range(7, 8)), + from_str2=ToRangeCase("1-10", range(1, 11)), +) +def test_to_range(case: ToRangeCase): + got = convert.to_range(case.value) + assert got == case.want