diff --git a/rclpy/CMakeLists.txt b/rclpy/CMakeLists.txt index 389bdf44c..c398f646e 100644 --- a/rclpy/CMakeLists.txt +++ b/rclpy/CMakeLists.txt @@ -90,6 +90,11 @@ pybind11_add_module(_rclpy_pybind11 src/rclpy/destroyable.cpp src/rclpy/duration.cpp src/rclpy/clock_event.cpp + src/rclpy/events_executor/delayed_event_thread.cpp + src/rclpy/events_executor/events_executor.cpp + src/rclpy/events_executor/events_queue.cpp + src/rclpy/events_executor/rcl_support.cpp + src/rclpy/events_executor/timers_manager.cpp src/rclpy/exceptions.cpp src/rclpy/graph.cpp src/rclpy/guard_condition.cpp @@ -182,6 +187,7 @@ if(BUILD_TESTING) test/test_create_node.py test/test_create_while_spinning.py test/test_destruction.py + test/test_events_executor.py test/test_executor.py test/test_expand_topic_name.py test/test_guard_condition.py diff --git a/rclpy/rclpy/experimental/__init__.py b/rclpy/rclpy/experimental/__init__.py new file mode 100644 index 000000000..a39b6cb34 --- /dev/null +++ b/rclpy/rclpy/experimental/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2024-2025 Brad Martin +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .events_executor import EventsExecutor # noqa: F401 diff --git a/rclpy/rclpy/experimental/events_executor.py b/rclpy/rclpy/experimental/events_executor.py new file mode 100644 index 000000000..c6807bf65 --- /dev/null +++ b/rclpy/rclpy/experimental/events_executor.py @@ -0,0 +1,43 @@ +# Copyright 2024-2025 Brad Martin +# Copyright 2024 Merlin Labs, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import faulthandler +import typing + +import rclpy.executors +from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy + + +# Try to look like we inherit from the rclpy Executor for type checking purposes without +# getting any of the code from the base class. +def EventsExecutor(*, context: typing.Optional[rclpy.Context] = None) -> rclpy.executors.Executor: + if context is None: + context = rclpy.get_default_context() + + # For debugging purposes, if anything goes wrong in C++ make sure we also get a + # Python backtrace dumped with the crash. + faulthandler.enable() + + ex = typing.cast(rclpy.executors.Executor, _rclpy.EventsExecutor(context)) + + # rclpy.Executor does this too. Note, the context itself is smart enough to check + # for bound methods, and check whether the instances they're bound to still exist at + # callback time, so we don't have to worry about tearing down this stale callback at + # destruction time. 
+    # TODO(bmartin427) This should really be done inside of the EventsExecutor
+    # implementation itself, but I'm unable to figure out a pybind11 incantation that
+    # allows me to pass this bound method call from C++.
+    context.on_shutdown(ex.wake)
+
+    return ex
diff --git a/rclpy/rclpy/task.py b/rclpy/rclpy/task.py
index 996448e05..888e91baf 100644
--- a/rclpy/rclpy/task.py
+++ b/rclpy/rclpy/task.py
@@ -207,6 +207,20 @@ def add_done_callback(self, callback: Callable[['Future[T]'], None]) -> None:
         if invoke:
             callback(self)
 
+    def remove_done_callback(self, callback: Callable[['Future[T]'], None]) -> bool:
+        """
+        Remove a previously-added done callback.
+
+        Return True if the given callback was found and removed. Always returns False if the
+        Future has already completed, since done callbacks are discarded once invoked.
+        """
+        with self._lock:
+            try:
+                self._callbacks.remove(callback)
+            except ValueError:
+                return False
+        return True
+
 
 class Task(Future[T]):
     """
diff --git a/rclpy/src/rclpy/_rclpy_pybind11.cpp b/rclpy/src/rclpy/_rclpy_pybind11.cpp
index 4da96a5d5..f33fdaa2f 100644
--- a/rclpy/src/rclpy/_rclpy_pybind11.cpp
+++ b/rclpy/src/rclpy/_rclpy_pybind11.cpp
@@ -32,6 +32,7 @@
 #include "duration.hpp"
 #include "clock_event.hpp"
 #include "event_handle.hpp"
+#include "events_executor/events_executor.hpp"
 #include "exceptions.hpp"
 #include "graph.hpp"
 #include "guard_condition.hpp"
@@ -247,4 +248,6 @@ PYBIND11_MODULE(_rclpy_pybind11, m) {
   rclpy::define_signal_handler_api(m);
   rclpy::define_clock_event(m);
   rclpy::define_lifecycle_api(m);
+
+  rclpy::events_executor::define_events_executor(m);
 }
diff --git a/rclpy/src/rclpy/events_executor/delayed_event_thread.cpp b/rclpy/src/rclpy/events_executor/delayed_event_thread.cpp
new file mode 100644
index 000000000..0d090fc44
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/delayed_event_thread.cpp
@@ -0,0 +1,72 @@
+// Copyright 2025 Brad Martin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#include "events_executor/delayed_event_thread.hpp"
+
+#include <utility>
+
+namespace rclpy
+{
+namespace events_executor
+{
+
+DelayedEventThread::DelayedEventThread(EventsQueue * events_queue)
+: events_queue_(events_queue), thread_([this]() {RunThread();})
+{
+}
+
+DelayedEventThread::~DelayedEventThread()
+{
+  {
+    std::unique_lock lock(mutex_);
+    done_ = true;
+  }
+  cv_.notify_one();
+  thread_.join();
+}
+
+void DelayedEventThread::EnqueueAt(
+  std::chrono::steady_clock::time_point when, std::function<void()> handler)
+{
+  {
+    std::unique_lock lock(mutex_);
+    when_ = when;
+    handler_ = handler;
+  }
+  cv_.notify_one();
+}
+
+void DelayedEventThread::RunThread()
+{
+  std::unique_lock lock(mutex_);
+  while (!done_) {
+    if (handler_) {
+      // Make sure we don't dispatch a stale wait if it changes while we're waiting.
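+      // We re-check handler_ and when_ after waking: EnqueueAt() may have replaced the handler
+      // (and possibly pushed the deadline later) while we were waiting, and the wakeup may also
+      // have been a spurious or deliberate notify rather than a real timeout.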
+      const auto latched_when = when_;
+      if (
+        (std::cv_status::timeout == cv_.wait_until(lock, latched_when)) && handler_ &&
+        (when_ <= latched_when))
+      {
+        auto handler = std::move(handler_);
+        handler_ = {};
+        events_queue_->Enqueue(std::move(handler));
+      }
+    } else {
+      // Wait indefinitely until we get signaled that there's something worth looking at.
+      cv_.wait(lock);
+    }
+  }
+}
+
+}  // namespace events_executor
+}  // namespace rclpy
diff --git a/rclpy/src/rclpy/events_executor/delayed_event_thread.hpp b/rclpy/src/rclpy/events_executor/delayed_event_thread.hpp
new file mode 100644
index 000000000..087423829
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/delayed_event_thread.hpp
@@ -0,0 +1,63 @@
+// Copyright 2025 Brad Martin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef RCLPY__EVENTS_EXECUTOR__DELAYED_EVENT_THREAD_HPP_
+#define RCLPY__EVENTS_EXECUTOR__DELAYED_EVENT_THREAD_HPP_
+
+#include <chrono>
+#include <condition_variable>
+#include <functional>
+#include <mutex>
+#include <thread>
+
+#include "events_executor/events_queue.hpp"
+
+namespace rclpy
+{
+namespace events_executor
+{
+
+/// This object manages posting an event handler to an EventsQueue after a specified delay. The
+/// current delay may be changed or canceled at any time. This is done by way of a self-contained
+/// child thread that performs the wait.
+class DelayedEventThread
+{
+public:
+  /// The pointer is aliased and must live for the lifetime of this object.
+  explicit DelayedEventThread(EventsQueue *);
+  ~DelayedEventThread();
+
+  /// Schedules an event handler to be enqueued at the specified time point. Replaces any previous
+  /// wait and handler, which will never be dispatched if it has not been already.
+  void EnqueueAt(std::chrono::steady_clock::time_point when, std::function<void()> handler);
+
+  /// Cancels any previously-scheduled handler.
+  void Cancel() {EnqueueAt({}, {});}
+
+private:
+  void RunThread();
+
+  EventsQueue * const events_queue_;
+  std::mutex mutex_;
+  bool done_{};
+  std::condition_variable cv_;
+  std::chrono::steady_clock::time_point when_;
+  std::function<void()> handler_;
+  std::thread thread_;
+};
+
+}  // namespace events_executor
+}  // namespace rclpy
+
+#endif  // RCLPY__EVENTS_EXECUTOR__DELAYED_EVENT_THREAD_HPP_
diff --git a/rclpy/src/rclpy/events_executor/events_executor.cpp b/rclpy/src/rclpy/events_executor/events_executor.cpp
new file mode 100644
index 000000000..4884395d7
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/events_executor.cpp
@@ -0,0 +1,929 @@
+// Copyright 2024-2025 Brad Martin
+// Copyright 2024 Merlin Labs, Inc.
+//
+// Based on a similar approach to the iRobot rclcpp EventsExecutor implementation:
+// https://github.com/ros2/rclcpp/blob/7907b2fee0b1381dc21900efd1745e11f5caa670/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp
+// Original copyright for that file is:
+// Copyright 2023 iRobot Corporation.
+//
+// Also borrows some code from the original rclpy Executor implementations:
+// https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/executors.py
+// Original copyright for that file is:
+// Copyright 2017 Open Source Robotics Foundation, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#include "events_executor/events_executor.hpp"
+
+#include <chrono>
+#include <functional>
+#include <memory>
+#include <string>
+
+#include <pybind11/pybind11.h>
+
+#include <rcl/error_handling.h>
+#include <rcl/rcl.h>
+
+#include "client.hpp"
+#include "context.hpp"
+#include "service.hpp"
+#include "subscription.hpp"
+
+namespace pl = std::placeholders;
+namespace py = pybind11;
+
+namespace rclpy
+{
+namespace events_executor
+{
+
+EventsExecutor::EventsExecutor(py::object context)
+: rclpy_context_(context),
+  inspect_iscoroutine_(py::module_::import("inspect").attr("iscoroutine")),
+  inspect_signature_(py::module_::import("inspect").attr("signature")),
+  rclpy_task_(py::module_::import("rclpy.task").attr("Task")),
+  rclpy_timer_timer_info_(py::module_::import("rclpy.timer").attr("TimerInfo")),
+  signal_callback_([this]() {events_queue_.Stop();}),
+  rcl_callback_manager_(&events_queue_),
+  timers_manager_(
+    &events_queue_, std::bind(&EventsExecutor::HandleTimerReady, this, pl::_1, pl::_2))
+{
+}
+
+EventsExecutor::~EventsExecutor() {shutdown();}
+
+pybind11::object EventsExecutor::create_task(
+  py::object callback, py::args args, const py::kwargs & kwargs)
+{
+  // Create and return a rclpy.task.Task() object, and schedule it to be called later.
+  using py::literals::operator""_a;
+  py::object task = rclpy_task_(callback, args, kwargs, "executor"_a = py::cast(this));
+  // The Task needs to be owned at least until we invoke it from the callback we post. However, we
+  // can't pass a bare py::object, because that would try to do Python refcounting while preparing
+  // to go into or coming back from the callback, while the GIL is not held. We'll do manual
+  // refcounting on it instead.
+  py::handle cb_task_handle = task;
+  cb_task_handle.inc_ref();
+  events_queue_.Enqueue(std::bind(&EventsExecutor::IterateTask, this, cb_task_handle));
+  return task;
+}
+
+bool EventsExecutor::shutdown(std::optional<double> timeout)
+{
+  // NOTE: The rclpy context can invoke this method with a lock on the context held. Therefore we
+  // must not access that context during this method, or we can deadlock.
+  // https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/context.py#L101-L103
+
+  events_queue_.Stop();
+
+  // Block until spinning is done, or timeout. Release the GIL while we block though.
+  {
+    py::gil_scoped_release gil_release;
+    std::unique_lock spin_lock(spinning_mutex_, std::defer_lock);
+    if (timeout) {
+      if (!spin_lock.try_lock_for(std::chrono::duration<double>(*timeout))) {
+        return false;
+      }
+    } else {
+      spin_lock.lock();
+    }
+  }
+
+  // Tear down any callbacks we still have registered.
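+  // (Iterate over a copy of the node set, since remove_node() mutates it as we go.)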
+  for (py::handle node : py::list(nodes_)) {
+    remove_node(node);
+  }
+  UpdateEntitiesFromNodes(true);
+  return true;
+}
+
+bool EventsExecutor::add_node(py::object node)
+{
+  if (nodes_.contains(node)) {
+    return false;
+  }
+  nodes_.add(node);
+  // Caution: the Node executor setter method calls executor.add_node() again, making this
+  // reentrant.
+  node.attr("executor") = py::cast(this);
+  wake();
+  return true;
+}
+
+void EventsExecutor::remove_node(py::handle node)
+{
+  if (!nodes_.contains(node)) {
+    return;
+  }
+  // Why does pybind11 provide a C++ method for add() but not discard() or remove()?
+  nodes_.attr("remove")(node);
+  // It's not clear why rclpy doesn't reset node.executor at this point.
+  wake();
+}
+
+void EventsExecutor::wake()
+{
+  if (!wake_pending_.exchange(true)) {
+    // Update tracked entities.
+    events_queue_.Enqueue(
+      [this]() {
+        py::gil_scoped_acquire gil_acquire;
+        UpdateEntitiesFromNodes(!py::cast<bool>(rclpy_context_.attr("ok")()));
+      });
+  }
+}
+
+py::list EventsExecutor::get_nodes() const {return nodes_;}
+
+// NOTE: The timeouts on the below two methods are always realtime, even if we're running in debug
+// time. This is true of other executors too, because debug time is always associated with a
+// specific node, and more than one node may be connected to an executor instance.
+// https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/executors.py#L184-L185
+
+void EventsExecutor::spin(std::optional<double> timeout_sec, bool stop_after_user_callback)
+{
+  {
+    std::unique_lock spin_lock(spinning_mutex_, std::try_to_lock);
+    if (!spin_lock) {
+      throw std::runtime_error("Attempt to spin an already-spinning Executor");
+    }
+    stop_after_user_callback_ = stop_after_user_callback;
+    // Any blocked tasks may have become unblocked while we weren't looking.
+    PostOutstandingTasks();
+    // Release the GIL while we block. Any callbacks on the events queue that want to touch Python
+    // will need to reacquire it though.
+    py::gil_scoped_release gil_release;
+    if (timeout_sec) {
+      const auto timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
+        std::chrono::duration<double>(*timeout_sec));
+      const auto end = std::chrono::steady_clock::now() + timeout_ns;
+      events_queue_.RunUntil(end);
+    } else {
+      events_queue_.Run();
+    }
+    events_queue_.Restart();
+  }
+
+  const bool ok = py::cast<bool>(rclpy_context_.attr("ok")());
+  if (!ok) {
+    Raise(py::module_::import("rclpy.executors").attr("ExternalShutdownException")());
+  }
+}
+
+void EventsExecutor::spin_until_future_complete(
+  py::handle future, std::optional<double> timeout_sec, bool stop_after_user_callback)
+{
+  py::cpp_function cb([this](py::handle) {events_queue_.Stop();});
+  future.attr("add_done_callback")(cb);
+  spin(timeout_sec, stop_after_user_callback);
+  // In case the future didn't complete (we hit the timeout, or dispatched a different user
+  // callback after being asked to only run one), we need to clean up our callback. Otherwise it
+  // could fire later when the executor isn't valid, or when we haven't been asked to wait for
+  // this future; we could also end up accumulating more of these same callbacks if this method
+  // gets invoked in a loop.
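+  // This relies on Future.remove_done_callback(), added to rclpy/rclpy/task.py in this same
+  // change.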
+ future.attr("remove_done_callback")(cb); +} + +EventsExecutor * EventsExecutor::enter() {return this;} +void EventsExecutor::exit(py::object, py::object, py::object) {shutdown();} + +void EventsExecutor::UpdateEntitiesFromNodes(bool shutdown) +{ + // Clear pending flag as early as possible, so we error on the side of retriggering a few + // harmless updates rather than potentially missing important additions. + wake_pending_.store(false); + + // Collect all entities currently associated with our nodes + py::set subscriptions; + py::set timers; + py::set clients; + py::set services; + py::set waitables; + if (!shutdown) { + for (py::handle node : nodes_) { + subscriptions.attr("update")(py::set(node.attr("subscriptions"))); + timers.attr("update")(py::set(node.attr("timers"))); + clients.attr("update")(py::set(node.attr("clients"))); + services.attr("update")(py::set(node.attr("services"))); + waitables.attr("update")(py::set(node.attr("waitables"))); + + // It doesn't seem to be possible to support guard conditions with a callback-based (as + // opposed to waitset-based) API. Fortunately we don't seem to need to. + if (!py::set(node.attr("guards")).empty()) { + throw std::runtime_error("Guard conditions not supported"); + } + } + } else { + // Remove all tracked entities and nodes. + nodes_.clear(); + } + + // Perform updates for added and removed entities + UpdateEntitySet( + subscriptions_, subscriptions, + std::bind(&EventsExecutor::HandleAddedSubscription, this, pl::_1), + std::bind(&EventsExecutor::HandleRemovedSubscription, this, pl::_1)); + UpdateEntitySet( + timers_, timers, std::bind(&EventsExecutor::HandleAddedTimer, this, pl::_1), + std::bind(&EventsExecutor::HandleRemovedTimer, this, pl::_1)); + UpdateEntitySet( + clients_, clients, std::bind(&EventsExecutor::HandleAddedClient, this, pl::_1), + std::bind(&EventsExecutor::HandleRemovedClient, this, pl::_1)); + UpdateEntitySet( + services_, services, std::bind(&EventsExecutor::HandleAddedService, this, pl::_1), + std::bind(&EventsExecutor::HandleRemovedService, this, pl::_1)); + UpdateEntitySet( + waitables_, waitables, std::bind(&EventsExecutor::HandleAddedWaitable, this, pl::_1), + std::bind(&EventsExecutor::HandleRemovedWaitable, this, pl::_1)); + + if (shutdown) { + // Stop spinning after everything is torn down. 
+    events_queue_.Stop();
+  }
+}
+
+void EventsExecutor::UpdateEntitySet(
+  py::set & entity_set, const py::set & new_entity_set,
+  std::function<void(py::handle)> added_entity_callback,
+  std::function<void(py::handle)> removed_entity_callback)
+{
+  py::set added_entities = new_entity_set - entity_set;
+  for (py::handle added_entity : added_entities) {
+    added_entity_callback(added_entity);
+  }
+
+  py::set removed_entities = entity_set - new_entity_set;
+  for (py::handle removed_entity : removed_entities) {
+    removed_entity_callback(removed_entity);
+  }
+
+  entity_set = new_entity_set;
+}
+
+void EventsExecutor::HandleAddedSubscription(py::handle subscription)
+{
+  py::handle handle = subscription.attr("handle");
+  auto with = std::make_shared<ScopedWith>(handle);
+  const rcl_subscription_t * rcl_ptr = py::cast<Subscription &>(handle).rcl_ptr();
+  const auto cb = std::bind(&EventsExecutor::HandleSubscriptionReady, this, subscription, pl::_1);
+  if (
+    RCL_RET_OK != rcl_subscription_set_on_new_message_callback(
+      rcl_ptr, RclEventCallbackTrampoline,
+      rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with)))
+  {
+    throw std::runtime_error(
+      std::string("Failed to set the on new message callback for subscription: ") +
+      rcl_get_error_string().str);
+  }
+}
+
+void EventsExecutor::HandleRemovedSubscription(py::handle subscription)
+{
+  py::handle handle = subscription.attr("handle");
+  const rcl_subscription_t * rcl_ptr = py::cast<Subscription &>(handle).rcl_ptr();
+  if (RCL_RET_OK != rcl_subscription_set_on_new_message_callback(rcl_ptr, nullptr, nullptr)) {
+    throw std::runtime_error(
+      std::string("Failed to clear the on new message callback for subscription: ") +
+      rcl_get_error_string().str);
+  }
+  rcl_callback_manager_.RemoveCallback(rcl_ptr);
+}
+
+void EventsExecutor::HandleSubscriptionReady(py::handle subscription, size_t number_of_events)
+{
+  if (stop_after_user_callback_) {
+    events_queue_.Stop();
+  }
+  py::gil_scoped_acquire gil_acquire;
+
+  // Largely based on rclpy.Executor._take_subscription() and _execute_subscription().
+  // https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/executors.py#L355-L367
+  //
+  // NOTE: Simple object attributes we can count on to be owned by the parent object, but bound
+  // method calls and function return values need to be owned by us.
+  Subscription & _rclpy_sub = py::cast<Subscription &>(subscription.attr("handle"));
+  const py::object msg_type = subscription.attr("msg_type");
+  const bool raw = py::cast<bool>(subscription.attr("raw"));
+  const int callback_type = py::cast<int>(subscription.attr("_callback_type").attr("value"));
+  const int message_only =
+    py::cast<int>(subscription.attr("CallbackType").attr("MessageOnly").attr("value"));
+  const py::handle callback = subscription.attr("callback");
+
+  // rmw_cyclonedds has a bug which causes number_of_events to be zero in the case where messages
+  // were waiting for us when we registered the callback, and the topic is using KEEP_ALL history
+  // policy. We'll work around that by checking for zero, and in that case just taking messages
+  // until we start getting None. https://github.com/ros2/rmw_cyclonedds/issues/509
+  bool got_none = false;
+  for (size_t i = 0; number_of_events ? i < number_of_events : !got_none; ++i) {
+    py::object msg_info = _rclpy_sub.take_message(msg_type, raw);
+    if (!msg_info.is_none()) {
+      try {
+        if (callback_type == message_only) {
+          callback(py::cast<py::tuple>(msg_info)[0]);
+        } else {
+          callback(msg_info);
+        }
+      } catch (const py::error_already_set & e) {
+        HandleCallbackExceptionInNodeEntity(e, subscription, "subscriptions");
+        throw;
+      }
+    } else {
+      got_none = true;
+    }
+  }
+
+  PostOutstandingTasks();
+}
+
+void EventsExecutor::HandleAddedTimer(py::handle timer) {timers_manager_.AddTimer(timer);}
+
+void EventsExecutor::HandleRemovedTimer(py::handle timer) {timers_manager_.RemoveTimer(timer);}
+
+void EventsExecutor::HandleTimerReady(py::handle timer, const rcl_timer_call_info_t & info)
+{
+  py::gil_scoped_acquire gil_acquire;
+  py::object callback = timer.attr("callback");
+  // We need to distinguish callbacks that want a TimerInfo object from those that don't.
+  // Executor._take_timer() actually checks whether an argument has type markup expecting a
+  // TimerInfo object. That seems like overkill vs. just checking whether it wants an argument at
+  // all.
+  py::object py_info;
+  if (py::len(inspect_signature_(callback).attr("parameters").attr("values")()) > 0) {
+    using py::literals::operator""_a;
+    py_info = rclpy_timer_timer_info_(
+      "expected_call_time"_a = info.expected_call_time,
+      "actual_call_time"_a = info.actual_call_time,
+      "clock_type"_a = timer.attr("clock").attr("clock_type"));
+  }
+  py::object result;
+  try {
+    if (py_info) {
+      result = callback(py_info);
+    } else {
+      result = callback();
+    }
+  } catch (const py::error_already_set & e) {
+    HandleCallbackExceptionInNodeEntity(e, timer, "timers");
+    throw;
+  }
+
+  // The type markup claims the callback can't be a coroutine, but this seems to be a lie, because
+  // the unit test does exactly that.
+  if (py::cast<bool>(inspect_iscoroutine_(result))) {
+    // Create a Task to manage iteration of this coroutine later.
+    create_task(result);
+  } else if (stop_after_user_callback_) {
+    events_queue_.Stop();
+  }
+  PostOutstandingTasks();
+}
+
+void EventsExecutor::HandleAddedClient(py::handle client)
+{
+  py::handle handle = client.attr("handle");
+  auto with = std::make_shared<ScopedWith>(handle);
+  const rcl_client_t * rcl_ptr = py::cast<Client &>(handle).rcl_ptr();
+  const auto cb = std::bind(&EventsExecutor::HandleClientReady, this, client, pl::_1);
+  if (
+    RCL_RET_OK != rcl_client_set_on_new_response_callback(
+      rcl_ptr, RclEventCallbackTrampoline,
+      rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with)))
+  {
+    throw std::runtime_error(
+      std::string("Failed to set the on new response callback for client: ") +
+      rcl_get_error_string().str);
+  }
+}
+
+void EventsExecutor::HandleRemovedClient(py::handle client)
+{
+  py::handle handle = client.attr("handle");
+  const rcl_client_t * rcl_ptr = py::cast<Client &>(handle).rcl_ptr();
+  if (RCL_RET_OK != rcl_client_set_on_new_response_callback(rcl_ptr, nullptr, nullptr)) {
+    throw std::runtime_error(
+      std::string("Failed to clear the on new response callback for client: ") +
+      rcl_get_error_string().str);
+  }
+  rcl_callback_manager_.RemoveCallback(rcl_ptr);
+}
+
+void EventsExecutor::HandleClientReady(py::handle client, size_t number_of_events)
+{
+  if (stop_after_user_callback_) {
+    events_queue_.Stop();
+  }
+  py::gil_scoped_acquire gil_acquire;
+
+  // Largely based on rclpy.Executor._take_client() and _execute_client().
+ // https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/executors.py#L369-L384 + Client & _rclpy_client = py::cast(client.attr("handle")); + const py::handle srv_type = client.attr("srv_type"); + const py::object res_type = srv_type.attr("Response"); + const py::object get_pending_request = client.attr("get_pending_request"); + + for (size_t i = 0; i < number_of_events; ++i) { + py::tuple seq_and_response = _rclpy_client.take_response(res_type); + py::handle header = seq_and_response[0]; + py::handle response = seq_and_response[1]; + if (!header.is_none()) { + py::object sequence = header.attr("request_id").attr("sequence_number"); + py::object future; + try { + future = get_pending_request(sequence); + } catch (const py::error_already_set & e) { + if (e.matches(PyExc_KeyError)) { + // The request was cancelled + continue; + } + throw; + } + future.attr("_set_executor")(py::cast(this)); + try { + future.attr("set_result")(response); + } catch (const py::error_already_set & e) { + HandleCallbackExceptionInNodeEntity(e, client, "clients"); + throw; + } + } + } + + PostOutstandingTasks(); +} + +void EventsExecutor::HandleAddedService(py::handle service) +{ + py::handle handle = service.attr("handle"); + auto with = std::make_shared(handle); + const rcl_service_t * rcl_ptr = py::cast(handle).rcl_ptr(); + const auto cb = std::bind(&EventsExecutor::HandleServiceReady, this, service, pl::_1); + if ( + RCL_RET_OK != rcl_service_set_on_new_request_callback( + rcl_ptr, RclEventCallbackTrampoline, + rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with))) + { + throw std::runtime_error( + std::string("Failed to set the on new request callback for service: ") + + rcl_get_error_string().str); + } +} + +void EventsExecutor::HandleRemovedService(py::handle service) +{ + py::handle handle = service.attr("handle"); + const rcl_service_t * rcl_ptr = py::cast(handle).rcl_ptr(); + if (RCL_RET_OK != rcl_service_set_on_new_request_callback(rcl_ptr, nullptr, nullptr)) { + throw std::runtime_error( + std::string("Failed to clear the on new request callback for service: ") + + rcl_get_error_string().str); + } + rcl_callback_manager_.RemoveCallback(rcl_ptr); +} + +void EventsExecutor::HandleServiceReady(py::handle service, size_t number_of_events) +{ + if (stop_after_user_callback_) { + events_queue_.Stop(); + } + py::gil_scoped_acquire gil_acquire; + + // Largely based on rclpy.Executor._take_service() and _execute_service(). 
+ // https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/executors.py#L386-L397 + Service & _rclpy_service = py::cast(service.attr("handle")); + const py::handle srv_type = service.attr("srv_type"); + const py::object req_type = srv_type.attr("Request"); + const py::handle res_type = srv_type.attr("Response"); + const py::handle callback = service.attr("callback"); + const py::object send_response = service.attr("send_response"); + + for (size_t i = 0; i < number_of_events; ++i) { + py::tuple request_and_header = _rclpy_service.service_take_request(req_type); + py::handle request = request_and_header[0]; + py::handle header = request_and_header[1]; + if (!request.is_none()) { + py::object response; + try { + response = callback(request, res_type()); + } catch (const py::error_already_set & e) { + HandleCallbackExceptionInNodeEntity(e, service, "services"); + throw; + } + send_response(response, header); + } + } + + PostOutstandingTasks(); +} + +void EventsExecutor::HandleAddedWaitable(py::handle waitable) +{ + // The Waitable API is too abstract for us to work with directly; it only exposes APIs for + // dealing with wait sets, and all of the rcl callback API requires knowing exactly what kinds of + // rcl objects you're working with. We'll try to figure out what kind of stuff is hiding behind + // the abstraction by having the Waitable add itself to a wait set, then take stock of what all + // ended up there. We'll also have to hope that no Waitable implementations ever change their + // component entities over their lifetimes. + auto with_waitable = std::make_shared(waitable); + const py::object num_entities = waitable.attr("get_num_entities")(); + if (py::cast(num_entities.attr("num_guard_conditions")) != 0) { + throw std::runtime_error("Guard conditions not supported"); + } + auto wait_set = std::make_shared( + py::cast(num_entities.attr("num_subscriptions")), 0U, + py::cast(num_entities.attr("num_timers")), + py::cast(num_entities.attr("num_clients")), + py::cast(num_entities.attr("num_services")), + py::cast(num_entities.attr("num_events")), + py::cast(rclpy_context_.attr("handle"))); + auto with_waitset = std::make_shared(py::cast(wait_set)); + waitable.attr("add_to_wait_set")(wait_set); + rcl_wait_set_t * const rcl_waitset = wait_set->rcl_ptr(); + // We null out each entry in the waitset as we set it up, so that the waitset itself can be + // reused when something becomes ready to signal to the Waitable what's ready and what's not. We + // also bind with_waitset into each callback we set up, to ensure that object doesn't get + // destroyed while any of these callbacks are still registered. 
+  WaitableSubEntities sub_entities;
+  for (size_t i = 0; i < rcl_waitset->size_of_subscriptions; ++i) {
+    const rcl_subscription_t * const rcl_sub = rcl_waitset->subscriptions[i];
+    rcl_waitset->subscriptions[i] = nullptr;
+    sub_entities.subscriptions.push_back(rcl_sub);
+    const auto cb = std::bind(
+      &EventsExecutor::HandleWaitableSubReady, this, waitable, rcl_sub, wait_set, i, with_waitset,
+      pl::_1);
+    if (
+      RCL_RET_OK != rcl_subscription_set_on_new_message_callback(
+        rcl_sub, RclEventCallbackTrampoline,
+        rcl_callback_manager_.MakeCallback(rcl_sub, cb, with_waitable)))
+    {
+      throw std::runtime_error(
+        std::string("Failed to set the on new message callback for Waitable subscription: ") +
+        rcl_get_error_string().str);
+    }
+  }
+  for (size_t i = 0; i < rcl_waitset->size_of_timers; ++i) {
+    // Unfortunately we do require a non-const pointer here, while the waitset structure contains a
+    // const pointer.
+    rcl_timer_t * const rcl_timer = const_cast<rcl_timer_t *>(rcl_waitset->timers[i]);
+    rcl_waitset->timers[i] = nullptr;
+    sub_entities.timers.push_back(rcl_timer);
+    // Since this callback doesn't go through RclCallbackManager, which would otherwise own an
+    // instance of `with_waitable` associated with this callback, we'll bind it directly into the
+    // callback instead.
+    const auto cb = std::bind(
+      &EventsExecutor::HandleWaitableTimerReady, this, waitable, rcl_timer, wait_set, i,
+      with_waitable, with_waitset);
+    timers_manager_.rcl_manager().AddTimer(rcl_timer, cb);
+  }
+  for (size_t i = 0; i < rcl_waitset->size_of_clients; ++i) {
+    const rcl_client_t * const rcl_client = rcl_waitset->clients[i];
+    rcl_waitset->clients[i] = nullptr;
+    sub_entities.clients.push_back(rcl_client);
+    const auto cb = std::bind(
+      &EventsExecutor::HandleWaitableClientReady, this, waitable, rcl_client, wait_set, i,
+      with_waitset, pl::_1);
+    if (
+      RCL_RET_OK != rcl_client_set_on_new_response_callback(
+        rcl_client, RclEventCallbackTrampoline,
+        rcl_callback_manager_.MakeCallback(rcl_client, cb, with_waitable)))
+    {
+      throw std::runtime_error(
+        std::string("Failed to set the on new response callback for Waitable client: ") +
+        rcl_get_error_string().str);
+    }
+  }
+  for (size_t i = 0; i < rcl_waitset->size_of_services; ++i) {
+    const rcl_service_t * const rcl_service = rcl_waitset->services[i];
+    rcl_waitset->services[i] = nullptr;
+    sub_entities.services.push_back(rcl_service);
+    const auto cb = std::bind(
+      &EventsExecutor::HandleWaitableServiceReady, this, waitable, rcl_service, wait_set, i,
+      with_waitset, pl::_1);
+    if (
+      RCL_RET_OK != rcl_service_set_on_new_request_callback(
+        rcl_service, RclEventCallbackTrampoline,
+        rcl_callback_manager_.MakeCallback(rcl_service, cb, with_waitable)))
+    {
+      throw std::runtime_error(
+        std::string("Failed to set the on new request callback for Waitable service: ") +
+        rcl_get_error_string().str);
+    }
+  }
+  for (size_t i = 0; i < rcl_waitset->size_of_events; ++i) {
+    const rcl_event_t * const rcl_event = rcl_waitset->events[i];
+    rcl_waitset->events[i] = nullptr;
+    sub_entities.events.push_back(rcl_event);
+    const auto cb = std::bind(
+      &EventsExecutor::HandleWaitableEventReady, this, waitable, rcl_event, wait_set, i,
+      with_waitset, pl::_1);
+    if (
+      RCL_RET_OK != rcl_event_set_callback(
+        rcl_event, RclEventCallbackTrampoline,
+        rcl_callback_manager_.MakeCallback(rcl_event, cb, with_waitable)))
+    {
+      throw std::runtime_error(
+        std::string("Failed to set the callback for Waitable event: ") +
+        rcl_get_error_string().str);
+    }
+  }
+
+  // Save the set of discovered sub-entities for later use during tear-down, since we can't repeat
+  // the wait set trick then, as the RCL context may already be destroyed at that point.
+  waitable_entities_[waitable] = std::move(sub_entities);
+}
+
+void EventsExecutor::HandleRemovedWaitable(py::handle waitable)
+{
+  const auto nh = waitable_entities_.extract(waitable);
+  if (!nh) {
+    throw std::runtime_error("Couldn't find sub-entities entry for removed Waitable");
+  }
+  const WaitableSubEntities & sub_entities = nh.mapped();
+  for (const rcl_subscription_t * const rcl_sub : sub_entities.subscriptions) {
+    if (RCL_RET_OK != rcl_subscription_set_on_new_message_callback(rcl_sub, nullptr, nullptr)) {
+      throw std::runtime_error(
+        std::string("Failed to clear the on new message "
+          "callback for Waitable subscription: ") +
+        rcl_get_error_string().str);
+    }
+    rcl_callback_manager_.RemoveCallback(rcl_sub);
+  }
+  for (rcl_timer_t * const rcl_timer : sub_entities.timers) {
+    timers_manager_.rcl_manager().RemoveTimer(rcl_timer);
+  }
+  for (const rcl_client_t * const rcl_client : sub_entities.clients) {
+    if (RCL_RET_OK != rcl_client_set_on_new_response_callback(rcl_client, nullptr, nullptr)) {
+      throw std::runtime_error(
+        std::string("Failed to clear the on new response "
+          "callback for Waitable client: ") +
+        rcl_get_error_string().str);
+    }
+    rcl_callback_manager_.RemoveCallback(rcl_client);
+  }
+  for (const rcl_service_t * const rcl_service : sub_entities.services) {
+    if (RCL_RET_OK != rcl_service_set_on_new_request_callback(rcl_service, nullptr, nullptr)) {
+      throw std::runtime_error(
+        std::string("Failed to clear the on new request "
+          "callback for Waitable service: ") +
+        rcl_get_error_string().str);
+    }
+    rcl_callback_manager_.RemoveCallback(rcl_service);
+  }
+  for (const rcl_event_t * const rcl_event : sub_entities.events) {
+    if (RCL_RET_OK != rcl_event_set_callback(rcl_event, nullptr, nullptr)) {
+      throw std::runtime_error(
+        std::string("Failed to clear the callback for Waitable event: ") +
+        rcl_get_error_string().str);
+    }
+    rcl_callback_manager_.RemoveCallback(rcl_event);
+  }
+}
+
+void EventsExecutor::HandleWaitableSubReady(
+  py::handle waitable, const rcl_subscription_t * rcl_sub, std::shared_ptr<WaitSet> wait_set,
+  size_t wait_set_sub_index, std::shared_ptr<ScopedWith>, size_t number_of_events)
+{
+  py::gil_scoped_acquire gil_acquire;
+
+  // We need to set up the wait set to make it look like our subscription object is ready, and then
+  // poke the Waitable to do what it needs to do from there.
+  rcl_wait_set_t * const rcl_waitset = wait_set->rcl_ptr();
+  rcl_waitset->subscriptions[wait_set_sub_index] = rcl_sub;
+  HandleWaitableReady(waitable, wait_set, number_of_events);
+  // Null out the wait set again so that other callbacks can use it on other objects.
+  rcl_waitset->subscriptions[wait_set_sub_index] = nullptr;
+}
+
+void EventsExecutor::HandleWaitableTimerReady(
+  py::handle waitable, const rcl_timer_t * rcl_timer, std::shared_ptr<WaitSet> wait_set,
+  size_t wait_set_timer_index, std::shared_ptr<ScopedWith>, std::shared_ptr<ScopedWith>)
+{
+  py::gil_scoped_acquire gil_acquire;
+
+  // We need to set up the wait set to make it look like our timer object is ready, and then poke
+  // the Waitable to do what it needs to do from there.
+  rcl_wait_set_t * const rcl_waitset = wait_set->rcl_ptr();
+  rcl_waitset->timers[wait_set_timer_index] = rcl_timer;
+  HandleWaitableReady(waitable, wait_set, 1);
+  // Null out the wait set again so that other callbacks can use it on other objects.
+  rcl_waitset->timers[wait_set_timer_index] = nullptr;
+}
+
+void EventsExecutor::HandleWaitableClientReady(
+  py::handle waitable, const rcl_client_t * rcl_client, std::shared_ptr<WaitSet> wait_set,
+  size_t wait_set_client_index, std::shared_ptr<ScopedWith>, size_t number_of_events)
+{
+  py::gil_scoped_acquire gil_acquire;
+
+  // We need to set up the wait set to make it look like our client object is ready, and then poke
+  // the Waitable to do what it needs to do from there.
+  rcl_wait_set_t * const rcl_waitset = wait_set->rcl_ptr();
+  rcl_waitset->clients[wait_set_client_index] = rcl_client;
+  HandleWaitableReady(waitable, wait_set, number_of_events);
+  // Null out the wait set again so that other callbacks can use it on other objects.
+  rcl_waitset->clients[wait_set_client_index] = nullptr;
+}
+
+void EventsExecutor::HandleWaitableServiceReady(
+  py::handle waitable, const rcl_service_t * rcl_service, std::shared_ptr<WaitSet> wait_set,
+  size_t wait_set_service_index, std::shared_ptr<ScopedWith>, size_t number_of_events)
+{
+  py::gil_scoped_acquire gil_acquire;
+
+  // We need to set up the wait set to make it look like our service object is ready, and then poke
+  // the Waitable to do what it needs to do from there.
+  rcl_wait_set_t * const rcl_waitset = wait_set->rcl_ptr();
+  rcl_waitset->services[wait_set_service_index] = rcl_service;
+  HandleWaitableReady(waitable, wait_set, number_of_events);
+  // Null out the wait set again so that other callbacks can use it on other objects.
+  rcl_waitset->services[wait_set_service_index] = nullptr;
+}
+
+void EventsExecutor::HandleWaitableEventReady(
+  py::handle waitable, const rcl_event_t * rcl_event, std::shared_ptr<WaitSet> wait_set,
+  size_t wait_set_event_index, std::shared_ptr<ScopedWith>, size_t number_of_events)
+{
+  py::gil_scoped_acquire gil_acquire;
+
+  // We need to set up the wait set to make it look like our event object is ready, and then poke
+  // the Waitable to do what it needs to do from there.
+  rcl_wait_set_t * const rcl_waitset = wait_set->rcl_ptr();
+  rcl_waitset->events[wait_set_event_index] = rcl_event;
+  HandleWaitableReady(waitable, wait_set, number_of_events);
+  // Null out the wait set again so that other callbacks can use it on other objects.
+  rcl_waitset->events[wait_set_event_index] = nullptr;
+}
+
+void EventsExecutor::HandleWaitableReady(
+  py::handle waitable, std::shared_ptr<WaitSet> wait_set, size_t number_of_events)
+{
+  if (stop_after_user_callback_) {
+    events_queue_.Stop();
+  }
+  // Largely based on rclpy.Executor._take_waitable()
+  // https://github.com/ros2/rclpy/blob/a19180c238d4d97ed2b58868d8fb7fa3e3b621f2/rclpy/rclpy/executors.py#L447-L454
+  py::object is_ready = waitable.attr("is_ready");
+  py::object take_data = waitable.attr("take_data");
+  py::object execute = waitable.attr("execute");
+  py::object futures = waitable.attr("_futures");
+  for (auto & future : futures) {
+    future.attr("_set_executor")(py::cast(this));
+  }
+  for (size_t i = 0; i < number_of_events; ++i) {
+    // This method can have side effects, so it needs to be called even though it looks like just
+    // an accessor.
+    if (!is_ready(wait_set)) {
+      throw std::runtime_error("Failed to make Waitable ready");
+    }
+    py::object data = take_data();
+    // execute() is an async method; we need a Task to run it.
+    create_task(execute(data));
+  }
+
+  PostOutstandingTasks();
+}
+
+void EventsExecutor::IterateTask(py::handle task)
+{
+  if (stop_after_user_callback_) {
+    events_queue_.Stop();
+  }
+  py::gil_scoped_acquire gil_acquire;
+  // Calling this won't throw, but it may set the exception property on the task object.
+  task();
+  if (py::cast<bool>(task.attr("done")())) {
+    py::object ex = task.attr("exception")();
+    // Drop reference with GIL held. This doesn't necessarily destroy the underlying Task, since
+    // the `create_task()` caller may have retained a reference to the returned value.
+    task.dec_ref();
+
+    if (!ex.is_none()) {
+      // It's not clear how to easily turn a Python exception into a C++ one, so let's just throw
+      // it again and let pybind translate it normally.
+      try {
+        Raise(ex);
+      } catch (py::error_already_set & cpp_ex) {
+        // There's no good way to know what node this task came from. If we only have one node, we
+        // can use the logger from that, otherwise we'll have to leave it undefined.
+        py::object logger = py::none();
+        if (nodes_.size() == 1) {
+          logger = (*nodes_.begin()).attr("get_logger")();
+        }
+        HandleCallbackExceptionWithLogger(cpp_ex, logger, "task");
+        throw;
+      }
+    }
+  } else {
+    // Task needs more iteration. Store the handle and revisit it later after the next ready
+    // entity, which may unblock it.
+    // TODO(bmartin427) This matches the behavior of SingleThreadedExecutor and avoids busy
+    // looping, but I don't love it, because if the task is waiting on something other than an rcl
+    // entity (e.g. an asyncio sleep, or a Future triggered from another thread, or even another
+    // Task), there can be arbitrarily long latency before some rcl activity causes us to go
+    // revisit that Task.
+    blocked_tasks_.push_back(task);
+  }
+}
+
+void EventsExecutor::PostOutstandingTasks()
+{
+  for (auto & task : blocked_tasks_) {
+    events_queue_.Enqueue(std::bind(&EventsExecutor::IterateTask, this, task));
+  }
+  // Clear the entire outstanding tasks list. Any tasks that need further iteration will re-add
+  // themselves during IterateTask().
+  blocked_tasks_.clear();
+}
+
+void EventsExecutor::HandleCallbackExceptionInNodeEntity(
+  const py::error_already_set & exc, py::handle entity, const std::string & node_entity_attr)
+{
+  // Try to identify the node associated with the entity that threw the exception, so we can log to
+  // it.
+  for (py::handle node : nodes_) {
+    if (py::set(node.attr(node_entity_attr.c_str())).contains(entity)) {
+      return HandleCallbackExceptionWithLogger(exc, node.attr("get_logger")(), node_entity_attr);
+    }
+  }
+
+  // Failed to find a node.
+  HandleCallbackExceptionWithLogger(exc, py::none(), node_entity_attr);
+}
+
+void EventsExecutor::HandleCallbackExceptionWithLogger(
+  const py::error_already_set & exc, py::object logger, const std::string & entity_type)
+{
+  if (logger.is_none()) {
+    py::object logging = py::module_::import("rclpy.logging");
+    logger = logging.attr("get_logger")("UNKNOWN");
+  }
+
+  // The logger API won't let you log at two different severities from what it considers the same
+  // code location. Since it has no visibility into C++, all calls made from here would be
+  // attributed to the Python code that last called into here. Instead, we call out to Python to
+  // do the logging.
+  py::dict scope;
+  scope["logger"] = logger;
+  scope["node_entity_attr"] = entity_type;
+  scope["exc_value"] = exc.value();
+  scope["exc_trace"] = exc.trace();
+  py::exec(
+    R"(
+import traceback
+logger.fatal(f"Exception in '{node_entity_attr}' callback: {exc_value}")
+logger.warn("Error occurred at:\n" + "".join(traceback.format_tb(exc_trace)))
+)",
+    scope);
+}
+
+void EventsExecutor::Raise(py::object ex)
+{
+  py::dict scope;
+  scope["ex"] = ex;
+  py::exec("raise ex", scope);
+}
+
+// pybind11 module bindings
+
+void define_events_executor(py::object module)
+{
+  py::class_<EventsExecutor>(module, "EventsExecutor")
+  .def(py::init<py::object>(), py::arg("context"))
+  .def_property_readonly("context", &EventsExecutor::get_context)
+  .def("create_task", &EventsExecutor::create_task, py::arg("callback"))
+  .def("shutdown", &EventsExecutor::shutdown, py::arg("timeout_sec") = py::none())
+  .def("add_node", &EventsExecutor::add_node, py::arg("node"))
+  .def("remove_node", &EventsExecutor::remove_node, py::arg("node"))
+  .def("wake", &EventsExecutor::wake)
+  .def("get_nodes", &EventsExecutor::get_nodes)
+  .def("spin", [](EventsExecutor & exec) {exec.spin();})
+  .def(
+    "spin_once",
+    [](EventsExecutor & exec, std::optional<double> timeout_sec) {
+      exec.spin(timeout_sec, true);
+    },
+    py::arg("timeout_sec") = py::none())
+  .def(
+    "spin_until_future_complete",
+    [](EventsExecutor & exec, py::handle future, std::optional<double> timeout_sec) {
+      exec.spin_until_future_complete(future, timeout_sec);
+    },
+    py::arg("future"), py::arg("timeout_sec") = py::none())
+  .def(
+    "spin_once_until_future_complete",
+    [](EventsExecutor & exec, py::handle future, std::optional<double> timeout_sec) {
+      exec.spin_until_future_complete(future, timeout_sec, true);
+    },
+    py::arg("future"), py::arg("timeout_sec") = py::none())
+  .def("__enter__", &EventsExecutor::enter)
+  .def("__exit__", &EventsExecutor::exit);
+}
+
+}  // namespace events_executor
+}  // namespace rclpy
diff --git a/rclpy/src/rclpy/events_executor/events_executor.hpp b/rclpy/src/rclpy/events_executor/events_executor.hpp
new file mode 100644
index 000000000..16220d36b
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/events_executor.hpp
@@ -0,0 +1,206 @@
+// Copyright 2024-2025 Brad Martin
+// Copyright 2024 Merlin Labs, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
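+//
+// Illustrative Python usage (a sketch, not part of this header; combines the public API added by
+// this change with standard rclpy calls):
+//
+//   import rclpy
+//   from rclpy.experimental import EventsExecutor
+//
+//   rclpy.init()
+//   node = rclpy.create_node('demo')
+//   executor = EventsExecutor()
+//   executor.add_node(node)
+//   executor.spin()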
+
+#ifndef RCLPY__EVENTS_EXECUTOR__EVENTS_EXECUTOR_HPP_
+#define RCLPY__EVENTS_EXECUTOR__EVENTS_EXECUTOR_HPP_
+
+#include <pybind11/pybind11.h>
+
+#include <atomic>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <rcl/client.h>
+#include <rcl/event.h>
+#include <rcl/service.h>
+#include <rcl/subscription.h>
+#include <rcl/timer.h>
+#include <rcl/wait.h>
+
+#include "events_executor/events_queue.hpp"
+#include "events_executor/python_hasher.hpp"
+#include "events_executor/rcl_support.hpp"
+#include "events_executor/scoped_with.hpp"
+#include "events_executor/timers_manager.hpp"
+#include "signal_handler.hpp"
+#include "wait_set.hpp"
+
+namespace rclpy
+{
+namespace events_executor
+{
+
+/// Events executor implementation for rclpy
+///
+/// This executor implementation attempts to replicate the function of the rclcpp EventsExecutor
+/// for the benefit of rclpy applications. It is implemented in C++ to minimize the overhead of
+/// processing the event loop.
+///
+/// We assume all public methods could be invoked from any thread. Callbacks on the executor loop
+/// will be issued on the thread that called one of the spin*() variants (ignoring any parallelism
+/// that might be allowed by the callback group configuration).
+class EventsExecutor
+{
+public:
+  /// @param context the rclpy Context object to operate on
+  explicit EventsExecutor(pybind11::object context);
+
+  ~EventsExecutor();
+
+  // rclpy Executor API methods:
+  pybind11::object get_context() const {return rclpy_context_;}
+  pybind11::object create_task(
+    pybind11::object callback, pybind11::args args = {}, const pybind11::kwargs & kwargs = {});
+  bool shutdown(std::optional<double> timeout_sec = {});
+  bool add_node(pybind11::object node);
+  void remove_node(pybind11::handle node);
+  void wake();
+  pybind11::list get_nodes() const;
+  void spin(std::optional<double> timeout_sec = {}, bool stop_after_user_callback = false);
+  void spin_until_future_complete(
+    pybind11::handle future, std::optional<double> timeout_sec = {},
+    bool stop_after_user_callback = false);
+  EventsExecutor * enter();
+  void exit(pybind11::object, pybind11::object, pybind11::object);
+
+private:
+  // Structure to hold entities discovered underlying a Waitable object.
+  struct WaitableSubEntities
+  {
+    std::vector<const rcl_subscription_t *> subscriptions;
+    std::vector<rcl_timer_t *> timers;  // Can't be const
+    std::vector<const rcl_client_t *> clients;
+    std::vector<const rcl_service_t *> services;
+    std::vector<const rcl_event_t *> events;
+  };
+
+  /// Updates the sets of known entities based on the currently tracked nodes. This is not thread
+  /// safe, so it must be posted to the EventsQueue if the executor is currently spinning. Expects
+  /// the GIL to be held before calling. If @p shutdown is true, a purge of all known nodes and
+  /// entities is forced.
+  void UpdateEntitiesFromNodes(bool shutdown);
+
+  /// Given an existing set of entities and a set with the desired new state, updates the existing
+  /// set and invokes callbacks on each added or removed entity.
+  void UpdateEntitySet(
+    pybind11::set & entity_set, const pybind11::set & new_entity_set,
+    std::function<void(pybind11::handle)> added_entity_callback,
+    std::function<void(pybind11::handle)> removed_entity_callback);
+
+  void HandleAddedSubscription(pybind11::handle);
+  void HandleRemovedSubscription(pybind11::handle);
+  void HandleSubscriptionReady(pybind11::handle, size_t number_of_events);
+
+  void HandleAddedTimer(pybind11::handle);
+  void HandleRemovedTimer(pybind11::handle);
+  void HandleTimerReady(pybind11::handle, const rcl_timer_call_info_t &);
+
+  void HandleAddedClient(pybind11::handle);
+  void HandleRemovedClient(pybind11::handle);
+  void HandleClientReady(pybind11::handle, size_t number_of_events);
+
+  void HandleAddedService(pybind11::handle);
+  void HandleRemovedService(pybind11::handle);
+  void HandleServiceReady(pybind11::handle, size_t number_of_events);
+
+  void HandleAddedWaitable(pybind11::handle);
+  void HandleRemovedWaitable(pybind11::handle);
+  void HandleWaitableSubReady(
+    pybind11::handle waitable, const rcl_subscription_t *,
+    std::shared_ptr<WaitSet> wait_set, size_t wait_set_sub_index,
+    std::shared_ptr<ScopedWith> with_waitset, size_t number_of_events);
+  void HandleWaitableTimerReady(
+    pybind11::handle waitable, const rcl_timer_t *, std::shared_ptr<WaitSet> wait_set,
+    size_t wait_set_timer_index, std::shared_ptr<ScopedWith> with_waitable,
+    std::shared_ptr<ScopedWith> with_waitset);
+  void HandleWaitableClientReady(
+    pybind11::handle waitable, const rcl_client_t *, std::shared_ptr<WaitSet> wait_set,
+    size_t wait_set_client_index, std::shared_ptr<ScopedWith> with_waitset,
+    size_t number_of_events);
+  void HandleWaitableServiceReady(
+    pybind11::handle waitable, const rcl_service_t *, std::shared_ptr<WaitSet> wait_set,
+    size_t wait_set_service_index, std::shared_ptr<ScopedWith> with_waitset,
+    size_t number_of_events);
+  void HandleWaitableEventReady(
+    pybind11::handle waitable, const rcl_event_t *, std::shared_ptr<WaitSet> wait_set,
+    size_t wait_set_event_index, std::shared_ptr<ScopedWith> with_waitset,
+    size_t number_of_events);
+  void HandleWaitableReady(
+    pybind11::handle waitable, std::shared_ptr<WaitSet> wait_set, size_t number_of_events);
+
+  /// Helper for create_task(). @p task needs to have had one reference manually added to it. See
+  /// the create_task() implementation for details.
+  void IterateTask(pybind11::handle task);
+
+  /// Posts a call to IterateTask() for every outstanding entry in blocked_tasks_; should be
+  /// invoked from other Handle*Ready() methods to check if any asynchronous Tasks have been
+  /// unblocked by the newly-handled event.
+  void PostOutstandingTasks();
+
+  void HandleCallbackExceptionInNodeEntity(
+    const pybind11::error_already_set &, pybind11::handle entity,
+    const std::string & node_entity_attr);
+  void HandleCallbackExceptionWithLogger(
+    const pybind11::error_already_set &, pybind11::object logger, const std::string & entity_type);
+
+  /// Raises the given Python object instance as a Python exception
+  void Raise(pybind11::object);
+
+  const pybind11::object rclpy_context_;
+
+  // Imported Python objects we depend on
+  const pybind11::object inspect_iscoroutine_;
+  const pybind11::object inspect_signature_;
+  const pybind11::object rclpy_task_;
+  const pybind11::object rclpy_timer_timer_info_;
+
+  EventsQueue events_queue_;
+  ScopedSignalCallback signal_callback_;
+
+  pybind11::set nodes_;  ///< The set of all nodes we're executing
+  std::atomic<bool> wake_pending_{};  ///< An unhandled call to wake() has been made
+  std::timed_mutex spinning_mutex_;  ///< Held while a thread is spinning
+
+  /// This flag is used by spin_once() to signal that the EventsQueue should be stopped after a
+  /// single user-visible callback has been dispatched.
+  bool stop_after_user_callback_{};
+
+  // Collection of awaitable entities we're servicing
+  pybind11::set subscriptions_;
+  pybind11::set timers_;
+  pybind11::set clients_;
+  pybind11::set services_;
+  pybind11::set waitables_;
+
+  /// Collection of asynchronous Tasks awaiting new events to further iterate.
+  std::vector<pybind11::handle> blocked_tasks_;
+
+  /// Cache for the rcl pointers underlying each waitables_ entry, because those are harder to
+  /// retrieve than for the other entity types.
+  std::unordered_map<pybind11::handle, WaitableSubEntities, PythonHasher> waitable_entities_;
+
+  RclCallbackManager rcl_callback_manager_;
+  TimersManager timers_manager_;
+};
+
+void define_events_executor(pybind11::object module);
+
+}  // namespace events_executor
+}  // namespace rclpy
+
+#endif  // RCLPY__EVENTS_EXECUTOR__EVENTS_EXECUTOR_HPP_
diff --git a/rclpy/src/rclpy/events_executor/events_queue.cpp b/rclpy/src/rclpy/events_executor/events_queue.cpp
new file mode 100644
index 000000000..a65c22bc5
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/events_queue.cpp
@@ -0,0 +1,66 @@
+// Copyright 2025 Brad Martin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#include "events_executor/events_queue.hpp" + +namespace rclpy +{ +namespace events_executor +{ + +EventsQueue::~EventsQueue() {} + +void EventsQueue::Enqueue(std::function event_handler) +{ + { + std::unique_lock lock(mutex_); + queue_.push(event_handler); + } + cv_.notify_one(); +} + +void EventsQueue::Run() {RunUntil(std::chrono::steady_clock::time_point::max());} + +void EventsQueue::RunUntil(std::chrono::steady_clock::time_point deadline) +{ + while (true) { + std::function handler; + { + std::unique_lock lock(mutex_); + cv_.wait_until(lock, deadline, [this]() {return stopped_ || !queue_.empty();}); + if (stopped_ || queue_.empty()) { + // We stopped for some reason other than being ready to run something (stopped or timeout) + return; + } + handler = queue_.front(); + queue_.pop(); + } + handler(); + } +} + +void EventsQueue::Stop() +{ + std::unique_lock lock(mutex_); + stopped_ = true; + cv_.notify_one(); +} + +void EventsQueue::Restart() +{ + std::unique_lock lock(mutex_); + stopped_ = false; +} + +} // namespace events_executor +} // namespace rclpy diff --git a/rclpy/src/rclpy/events_executor/events_queue.hpp b/rclpy/src/rclpy/events_executor/events_queue.hpp new file mode 100644 index 000000000..a6d7b599b --- /dev/null +++ b/rclpy/src/rclpy/events_executor/events_queue.hpp @@ -0,0 +1,66 @@ +// Copyright 2025 Brad Martin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLPY__EVENTS_EXECUTOR__EVENTS_QUEUE_HPP_ +#define RCLPY__EVENTS_EXECUTOR__EVENTS_QUEUE_HPP_ + +#include +#include +#include +#include +#include + +namespace rclpy +{ +namespace events_executor +{ + +/// This class represents a queue of event handlers to dispatch. Handlers may be enqueued from any +/// thread, and will always be dispatched during a call to a Run*() method on the thread invoking +/// that method. Multiple threads may not invoke Run*() methods simultaneously. +class EventsQueue +{ +public: + ~EventsQueue(); + + /// Add an event handler to the queue to be dispatched. Can be invoked by any thread. + void Enqueue(std::function); + + /// Run event handlers indefinitely, until stopped. + void Run(); + + /// Run all ready event handlers, and any that become ready before the given deadline. Calling + /// Stop() will make this return immediately even if ready handlers are enqueued. + void RunUntil(std::chrono::steady_clock::time_point); + + /// Causes any Run*() methods outstanding to return immediately. Can be invoked from any thread. + /// The stopped condition persists (causing any *subsequent* Run*() calls to also return) until + /// Restart() is invoked. + void Stop(); + + /// Ends a previous stopped condition, allowing Run*() methods to operate again. May be invoked + /// from any thread. 
+  void Restart();
+
+private:
+  std::queue<std::function<void()>> queue_;
+  std::mutex mutex_;
+  std::condition_variable cv_;
+  bool stopped_{};
+};
+
+}  // namespace events_executor
+}  // namespace rclpy
+
+#endif  // RCLPY__EVENTS_EXECUTOR__EVENTS_QUEUE_HPP_
diff --git a/rclpy/src/rclpy/events_executor/python_hasher.hpp b/rclpy/src/rclpy/events_executor/python_hasher.hpp
new file mode 100644
index 000000000..c765645ac
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/python_hasher.hpp
@@ -0,0 +1,38 @@
+// Copyright 2024-2025 Brad Martin
+// Copyright 2024 Merlin Labs, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef RCLPY__EVENTS_EXECUTOR__PYTHON_HASHER_HPP_
+#define RCLPY__EVENTS_EXECUTOR__PYTHON_HASHER_HPP_
+
+#include <pybind11/pybind11.h>
+
+namespace rclpy
+{
+namespace events_executor
+{
+/// This is intended to be used as the Hash template arg to STL containers using a
+/// pybind11::handle as a Key. This is the same hash that a native Python dict or set
+/// would use given the same key.
+struct PythonHasher
+{
+  inline auto operator()(const pybind11::handle & handle) const
+  {
+    return pybind11::hash(handle);
+  }
+};
+}  // namespace events_executor
+}  // namespace rclpy
+
+#endif  // RCLPY__EVENTS_EXECUTOR__PYTHON_HASHER_HPP_
diff --git a/rclpy/src/rclpy/events_executor/rcl_support.cpp b/rclpy/src/rclpy/events_executor/rcl_support.cpp
new file mode 100644
index 000000000..3a8826f70
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/rcl_support.cpp
@@ -0,0 +1,80 @@
+// Copyright 2024-2025 Brad Martin
+// Copyright 2024 Merlin Labs, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#include "events_executor/rcl_support.hpp"
+
+#include <pybind11/pybind11.h>
+
+namespace py = pybind11;
+
+namespace rclpy
+{
+namespace events_executor
+{
+
+extern "C" void RclEventCallbackTrampoline(const void * user_data, size_t number_of_events)
+{
+  const auto cb = reinterpret_cast<const std::function<void(size_t)> *>(user_data);
+  (*cb)(number_of_events);
+}
+
+RclCallbackManager::RclCallbackManager(EventsQueue * events_queue)
+: events_queue_(events_queue) {}
+
+RclCallbackManager::~RclCallbackManager()
+{
+  // Should not still have any callbacks registered when we exit, because otherwise RCL can call
+  // pointers that will no longer be valid. We can't throw an exception here, but we can explode.
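+  // (A destructor is implicitly noexcept, so throwing here would terminate the process anyway,
+  // with a less useful diagnostic than printing a message first and aborting.)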
+  if (!owned_cbs_.empty()) {
+    py::gil_scoped_acquire gil_acquire;
+    py::print("Destroying callback manager with callbacks remaining");
+    ::abort();
+  }
+}
+
+const void * RclCallbackManager::MakeCallback(
+  const void * key, std::function<void(size_t)> callback, std::shared_ptr<ScopedWith> with)
+{
+  // We don't support replacing an existing callback with a new one, because it gets tricky making
+  // sure we don't delete an old callback while the middleware still holds a pointer to it.
+  if (owned_cbs_.count(key) != 0) {
+    throw py::key_error("Attempt to replace existing callback");
+  }
+  CbEntry new_entry;
+  new_entry.cb =
+    std::make_unique<std::function<void(size_t)>>([this, callback, key](size_t number_of_events) {
+      events_queue_->Enqueue([this, callback, key, number_of_events]() {
+        if (!owned_cbs_.count(key)) {
+          // This callback has been removed; just drop it, as the objects it may want to touch may
+          // no longer exist.
+          return;
+        }
+        callback(number_of_events);
+      });
+    });
+  new_entry.with = with;
+  const void * ret = new_entry.cb.get();
+  owned_cbs_[key] = std::move(new_entry);
+  return ret;
+}
+
+void RclCallbackManager::RemoveCallback(const void * key)
+{
+  if (!owned_cbs_.erase(key)) {
+    throw py::key_error("Attempt to remove nonexistent callback");
+  }
+}
+
+}  // namespace events_executor
+}  // namespace rclpy
diff --git a/rclpy/src/rclpy/events_executor/rcl_support.hpp b/rclpy/src/rclpy/events_executor/rcl_support.hpp
new file mode 100644
index 000000000..0fcad5f5a
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/rcl_support.hpp
@@ -0,0 +1,81 @@
+// Copyright 2024-2025 Brad Martin
+// Copyright 2024 Merlin Labs, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef RCLPY__EVENTS_EXECUTOR__RCL_SUPPORT_HPP_
+#define RCLPY__EVENTS_EXECUTOR__RCL_SUPPORT_HPP_
+
+#include <cstddef>
+#include <functional>
+#include <memory>
+#include <unordered_map>
+
+#include "events_executor/events_queue.hpp"
+#include "events_executor/scoped_with.hpp"
+
+namespace rclpy
+{
+namespace events_executor
+{
+
+/// Use this for all RCL event callbacks. Use the return value from
+/// RclCallbackManager::MakeCallback() as the user data arg.
+///
+/// Note that RCL callbacks are invoked in some arbitrary thread originating from the middleware.
+/// Callbacks should process quickly to avoid blocking the middleware; i.e. all actual work should
+/// be posted to an EventsQueue running in another thread.
+extern "C" void RclEventCallbackTrampoline(const void * user_data, size_t number_of_events);
+
+/// Creates and maintains callback wrappers used with the RCL C library.
+class RclCallbackManager
+{
+public:
+  /// All user callbacks will be posted on the @p events_queue given to the constructor. This
+  /// pointer is aliased and must live for the lifetime of this object. These callbacks will be
+  /// invoked without the Python Global Interpreter Lock held, so if they need to access Python at
+  /// all make sure to acquire that explicitly.
+  explicit RclCallbackManager(EventsQueue * events_queue);
+  ~RclCallbackManager();
+
+  /// Creates a callback wrapper to be passed to RCL C functions. @p key should be a pointer to
+  /// the rcl object that will be associated with the callback. @p with protects the _rclpy object
+  /// handle owning the RCL object, for the duration the callback is established.
+  const void * MakeCallback(
+    const void * key, std::function<void(size_t)> callback, std::shared_ptr<ScopedWith> with);
+
+  /// Discard a previously constructed callback. @p key should be the same value provided to
+  /// MakeCallback(). Caution: ensure that RCL is no longer using a callback before invoking this.
+  void RemoveCallback(const void * key);
+
+private:
+  /// The C RCL interface deals in raw pointers, so someone needs to own the C++ function objects
+  /// we'll be calling into. We use unique pointers so the raw pointer to the object remains
+  /// stable while the map is manipulated.
+  struct CbEntry
+  {
+    std::unique_ptr<std::function<void(size_t)>> cb;
+    std::shared_ptr<ScopedWith> with;
+  };
+
+  EventsQueue * const events_queue_;
+
+  /// The map key is the raw pointer to the RCL entity object (subscription, etc) associated with
+  /// the callback.
+  std::unordered_map<const void *, CbEntry> owned_cbs_;
+};
+
+}  // namespace events_executor
+}  // namespace rclpy
+
+#endif  // RCLPY__EVENTS_EXECUTOR__RCL_SUPPORT_HPP_
diff --git a/rclpy/src/rclpy/events_executor/scoped_with.hpp b/rclpy/src/rclpy/events_executor/scoped_with.hpp
new file mode 100644
index 000000000..49f776adf
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/scoped_with.hpp
@@ -0,0 +1,45 @@
+// Copyright 2024-2025 Brad Martin
+// Copyright 2024 Merlin Labs, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef RCLPY__EVENTS_EXECUTOR__SCOPED_WITH_HPP_
+#define RCLPY__EVENTS_EXECUTOR__SCOPED_WITH_HPP_
+
+#include <pybind11/pybind11.h>
+
+namespace rclpy
+{
+namespace events_executor
+{
+
+/// Enters a python context manager for the scope of this object instance.
+class ScopedWith
+{
+public:
+  explicit ScopedWith(pybind11::handle object)
+  : object_(pybind11::cast<pybind11::object>(object))
+  {
+    object_.attr("__enter__")();
+  }
+
+  ~ScopedWith() {object_.attr("__exit__")(pybind11::none(), pybind11::none(), pybind11::none());}
+
+private:
+  pybind11::object object_;
+};
+
+}  // namespace events_executor
+}  // namespace rclpy
+
+#endif  // RCLPY__EVENTS_EXECUTOR__SCOPED_WITH_HPP_
diff --git a/rclpy/src/rclpy/events_executor/timers_manager.cpp b/rclpy/src/rclpy/events_executor/timers_manager.cpp
new file mode 100644
index 000000000..f21c951c3
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/timers_manager.cpp
@@ -0,0 +1,375 @@
+// Copyright 2024-2025 Brad Martin
+// Copyright 2024 Merlin Labs, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#include "events_executor/timers_manager.hpp"
+
+#include <pybind11/pybind11.h>
+
+#include <chrono>
+#include <functional>
+#include <memory>
+#include <optional>
+#include <stdexcept>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+
+#include "events_executor/delayed_event_thread.hpp"
+#include "timer.hpp"
+
+namespace pl = std::placeholders;
+namespace py = pybind11;
+
+namespace rclpy
+{
+namespace events_executor
+{
+
+namespace
+{
+
+// Implementation note: the original iRobot TimersManager associated with the rclcpp EventsExecutor
+// maintained a heap with all outstanding timers sorted by their next expiration time. Here that
+// approach will be skipped in favor of just looking at every timer every update for the following
+// reasons:
+//
+// * This approach is simpler
+// * In the applications this has been used in so far, no Node types exist that have a large number
+//   of timers outstanding at once. Assuming no more than a few timers exist in the whole process,
+//   the heap seems like overkill.
+// * Every time a timer ticks or is reset, the heap needs to be resorted anyway.
+//
+// We will however yell a bit if we ever see a large number of timers that disproves this
+// assumption, so we can reassess this decision.
+constexpr size_t WARN_TIMERS_COUNT = 8;
+
+typedef std::function<void(const rcl_time_jump_t *)> ClockJumpCallbackT;
+typedef std::function<void()> TimerResetCallbackT;
+
+extern "C" void RclClockJumpTrampoline(
+  const rcl_time_jump_t * time_jump, bool before_jump, void * user_data)
+{
+  // rcl calls this both before and after a clock change, and we never want the before callback, so
+  // let's just eliminate that case early.
+  if (before_jump) {
+    return;
+  }
+  auto cb = reinterpret_cast<ClockJumpCallbackT *>(user_data);
+  (*cb)(time_jump);
+}
+
+extern "C" void RclTimerResetTrampoline(const void * user_data, size_t)
+{
+  auto cb = reinterpret_cast<const TimerResetCallbackT *>(user_data);
+  (*cb)();
+}
+
+}  // namespace
+
+/// Manages a single clock source, and all timers operating on that source. All methods (including
+/// construction and destruction) are assumed to happen on the thread running the provided events
+/// queue.
+class RclTimersManager::ClockManager : public std::enable_shared_from_this<ClockManager>
+{
+public:
+  ClockManager(EventsQueue * events_queue, rcl_clock_t * clock)
+  : events_queue_(events_queue), clock_(clock)
+  {
+    // Need to establish a clock jump callback so we can tell when debug time is updated.
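+    // A threshold with on_clock_change set and 1 ns minimum deltas in both directions asks RCL
+    // to notify us of effectively every jump, as well as every activation or deactivation of the
+    // ROS time override.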
+    rcl_jump_threshold_t threshold;
+    threshold.on_clock_change = true;
+    threshold.min_forward.nanoseconds = 1;
+    threshold.min_backward.nanoseconds = -1;
+    // Note, this callback could happen on any thread
+    jump_cb_ = [this](const rcl_time_jump_t * time_jump) {
+        bool on_debug{};
+        switch (time_jump->clock_change) {
+          case RCL_ROS_TIME_NO_CHANGE:
+          case RCL_ROS_TIME_ACTIVATED:
+            on_debug = true;
+            break;
+          case RCL_ROS_TIME_DEACTIVATED:
+          case RCL_SYSTEM_TIME_NO_CHANGE:
+            on_debug = false;
+            break;
+        }
+        events_queue_->Enqueue(CallIfAlive(&ClockManager::HandleJump, on_debug));
+      };
+    if (
+      RCL_RET_OK !=
+      rcl_clock_add_jump_callback(clock_, threshold, RclClockJumpTrampoline, &jump_cb_))
+    {
+      throw std::runtime_error(
+              std::string("Failed to set RCL clock jump callback: ") + rcl_get_error_string().str);
+    }
+
+    // This isn't necessary yet but every timer will eventually depend on it. Again, this could
+    // happen on any thread.
+    reset_cb_ = [this]() {events_queue_->Enqueue(CallIfAlive(&ClockManager::UpdateTimers));};
+
+    // Initialize which timebase we're on
+    if (clock_->type == RCL_ROS_TIME) {
+      if (RCL_RET_OK != rcl_is_enabled_ros_time_override(clock_, &on_debug_time_)) {
+        throw std::runtime_error(
+                std::string("Failed to get RCL clock override state: ") +
+                rcl_get_error_string().str);
+      }
+    }
+  }
+
+  ~ClockManager()
+  {
+    if (RCL_RET_OK != rcl_clock_remove_jump_callback(clock_, RclClockJumpTrampoline, &jump_cb_)) {
+      py::gil_scoped_acquire gil_acquire;
+      py::print(
+        std::string("Failed to remove RCL clock jump callback: ") + rcl_get_error_string().str);
+    }
+    while (!timers_.empty()) {
+      RemoveTimer(timers_.begin()->first);
+    }
+  }
+
+  bool empty() const {return timers_.empty();}
+
+  void AddTimer(
+    rcl_timer_t * timer, std::function<void(const rcl_timer_call_info_t &)> ready_callback)
+  {
+    // All timers have the same reset callback
+    if (
+      RCL_RET_OK != rcl_timer_set_on_reset_callback(timer, RclTimerResetTrampoline, &reset_cb_))
+    {
+      throw std::runtime_error(
+              std::string("Failed to set timer reset callback: ") + rcl_get_error_string().str);
+    }
+    timers_[timer] = ready_callback;
+    if (timers_.size() >= WARN_TIMERS_COUNT) {
+      py::print("Warning, the number of timers associated with this clock is large.");
+      py::print("Management of this number of timers may be inefficient.");
+    }
+    UpdateTimers();
+  }
+
+  void RemoveTimer(rcl_timer_t * timer)
+  {
+    auto it = timers_.find(timer);
+    if (it == timers_.end()) {
+      throw py::key_error("Attempt to remove unmanaged timer");
+    }
+
+    if (RCL_RET_OK != rcl_timer_set_on_reset_callback(timer, nullptr, nullptr)) {
+      throw std::runtime_error(
+              std::string("Failed to clear timer reset callback: ") + rcl_get_error_string().str);
+    }
+    timers_.erase(it);
+    // We could re-evaluate how long we need to block now that a timer has been removed; but
+    // there's no real harm in one extra wakeup that then decides it doesn't need to do anything,
+    // and this timer might not even be the next to fire, so we won't bother.
+  }
+
+private:
+  /// Returns a function suitable for being invoked later, which would invoke the given method on
+  /// `this` with the given args, provided that `this` still exists at that time.
+  template <typename... Args>
+  std::function<void()> CallIfAlive(void (ClockManager::*method)(Args...), Args... args)
+  {
+    std::weak_ptr<ClockManager> weak_this(shared_from_this());
+    return [ = ]() {
+        auto locked = weak_this.lock();
+        if (locked) {
+          (locked.get()->*method)(args ...);
+        }
+      };
+  }
+
+  void HandleJump(bool on_debug_time)
+  {
+    on_debug_time_ = on_debug_time;
+    UpdateTimers();
+  }
+
+  void UpdateTimers()
+  {
+    // Let's not assume that rcl_clock_get_now() and std::chrono::steady_clock::now() are on the
+    // same timebase.
+    int64_t rcl_now{};
+    if (RCL_RET_OK != rcl_clock_get_now(clock_, &rcl_now)) {
+      throw std::runtime_error(
+              std::string("Failed to read RCL clock: ") + rcl_get_error_string().str);
+    }
+    const auto chrono_now = std::chrono::steady_clock::now();
+
+    // First, evaluate all of our timers and dispatch any that are ready now. While we're at it,
+    // keep track of the earliest next timer callback that is due.
+    std::optional<int64_t> next_ready_time_ns;
+    for (const auto & timer_cb_pair : timers_) {
+      auto this_next_time_ns = GetNextCallTimeNanoseconds(timer_cb_pair.first);
+      if (this_next_time_ns) {
+        if (*this_next_time_ns <= rcl_now) {
+          ready_timers_.insert(timer_cb_pair.first);
+          events_queue_->Enqueue(CallIfAlive(&ClockManager::DispatchTimer, timer_cb_pair.first));
+        } else if (!next_ready_time_ns || (*this_next_time_ns < *next_ready_time_ns)) {
+          next_ready_time_ns = this_next_time_ns;
+        }
+      }
+    }
+
+    // If we posted any timers for dispatch, then we'll re-evaluate things immediately after those
+    // complete. Otherwise, if we're on debug time, we'll re-check everything at the next jump
+    // callback. If neither of those things are true, then we need to schedule a wakeup for when
+    // we anticipate the next timer being ready.
+    if (ready_timers_.empty() && !on_debug_time_ && next_ready_time_ns) {
+      next_update_wait_.EnqueueAt(
+        chrono_now + std::chrono::nanoseconds(*next_ready_time_ns - rcl_now),
+        CallIfAlive(&ClockManager::UpdateTimers));
+    } else {
+      next_update_wait_.Cancel();
+    }
+  }
+
+  void DispatchTimer(rcl_timer_t * rcl_timer)
+  {
+    ready_timers_.erase(rcl_timer);
+    // If we've dispatched all ready timers, then trigger another update to see when the next
+    // timers will be ready.
+    if (ready_timers_.empty()) {
+      events_queue_->Enqueue(CallIfAlive(&ClockManager::UpdateTimers));
+    }
+
+    const auto map_it = timers_.find(rcl_timer);
+    if (map_it == timers_.end()) {
+      // Perhaps the timer was removed before a pending callback could be dispatched?
+      return;
+    }
+
+    // This notifies RCL that we're considering the timer triggered, for the purposes of updating
+    // the next trigger time.
+    rcl_timer_call_info_t info;
+    const auto ret = rcl_timer_call_with_info(rcl_timer, &info);
+    switch (ret) {
+      case RCL_RET_OK:
+        // Dispatch the actual user callback.
+        map_it->second(info);
+        break;
+      case RCL_RET_TIMER_CANCELED:
+        // Someone canceled the timer after we queried the call time. Never mind, then...
+        rcl_reset_error();
+        break;
+      default:
+        throw std::runtime_error(
+                std::string("Failed to call RCL timer: ") + rcl_get_error_string().str);
+    }
+  }
+
+  /// Returns the absolute time in nanoseconds when the next callback on the given timer is due.
+  /// Returns std::nullopt if the timer is canceled.
+  static std::optional<int64_t> GetNextCallTimeNanoseconds(const rcl_timer_t * rcl_timer)
+  {
+    int64_t next_call_time{};
+    const rcl_ret_t ret = rcl_timer_get_next_call_time(rcl_timer, &next_call_time);
+    switch (ret) {
+      case RCL_RET_OK:
+        return next_call_time;
+      case RCL_RET_TIMER_CANCELED:
+        return {};
+      default:
+        throw std::runtime_error(
+                std::string("Failed to fetch timer ready time: ") + rcl_get_error_string().str);
+    }
+  }
+
+  EventsQueue * const events_queue_;
+  rcl_clock_t * const clock_;
+  ClockJumpCallbackT jump_cb_;
+  TimerResetCallbackT reset_cb_;
+  bool on_debug_time_{};
+
+  std::unordered_map<rcl_timer_t *, std::function<void(const rcl_timer_call_info_t &)>> timers_;
+  std::unordered_set<rcl_timer_t *> ready_timers_;
+  DelayedEventThread next_update_wait_{events_queue_};
+};
+
+RclTimersManager::RclTimersManager(EventsQueue * events_queue)
+: events_queue_(events_queue) {}
+
+RclTimersManager::~RclTimersManager() {}
+
+namespace
+{
+rcl_clock_t * GetTimerClock(rcl_timer_t * timer)
+{
+  rcl_clock_t * clock{};
+  if (RCL_RET_OK != rcl_timer_clock(timer, &clock)) {
+    throw std::runtime_error(
+            std::string("Failed to determine clock for timer: ") + rcl_get_error_string().str);
+  }
+  return clock;
+}
+}  // namespace
+
+void RclTimersManager::AddTimer(
+  rcl_timer_t * timer, std::function<void(const rcl_timer_call_info_t &)> ready_callback)
+{
+  // Figure out the clock this timer is using, make sure a manager exists for that clock, then
+  // forward the timer to that clock's manager.
+  rcl_clock_t * clock = GetTimerClock(timer);
+  auto it = clock_managers_.find(clock);
+  if (it == clock_managers_.end()) {
+    std::tie(it, std::ignore) = clock_managers_.insert(
+      std::make_pair(clock, std::make_shared<ClockManager>(events_queue_, clock)));
+  }
+  it->second->AddTimer(timer, ready_callback);
+}
+
+void RclTimersManager::RemoveTimer(rcl_timer_t * timer)
+{
+  rcl_clock_t * clock = GetTimerClock(timer);
+  auto it = clock_managers_.find(clock);
+  if (it == clock_managers_.end()) {
+    throw py::key_error("Attempt to remove timer from unmanaged clock");
+  }
+  it->second->RemoveTimer(timer);
+  if (it->second->empty()) {
+    clock_managers_.erase(it);
+  }
+}
+
+TimersManager::TimersManager(
+  EventsQueue * events_queue,
+  std::function<void(pybind11::handle, const rcl_timer_call_info_t &)> timer_ready_callback)
+: rcl_manager_(events_queue), ready_callback_(timer_ready_callback)
+{
+}
+
+TimersManager::~TimersManager() {}
+
+void TimersManager::AddTimer(py::handle timer)
+{
+  PyRclMapping mapping;
+  py::handle handle = timer.attr("handle");
+  mapping.with = std::make_unique<ScopedWith>(handle);
+  mapping.rcl_ptr = py::cast<Timer &>(handle).rcl_ptr();
+  rcl_manager_.AddTimer(mapping.rcl_ptr, std::bind(ready_callback_, timer, pl::_1));
+  timer_mappings_[timer] = std::move(mapping);
+}
+
+void TimersManager::RemoveTimer(py::handle timer)
+{
+  const auto it = timer_mappings_.find(timer);
+  if (it == timer_mappings_.end()) {
+    throw py::key_error("Attempt to remove unmanaged timer");
+  }
+  rcl_manager_.RemoveTimer(it->second.rcl_ptr);
+  timer_mappings_.erase(it);
+}
+
+}  // namespace events_executor
+}  // namespace rclpy
diff --git a/rclpy/src/rclpy/events_executor/timers_manager.hpp b/rclpy/src/rclpy/events_executor/timers_manager.hpp
new file mode 100644
index 000000000..922598421
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/timers_manager.hpp
@@ -0,0 +1,95 @@
+// Copyright 2024-2025 Brad Martin
+// Copyright 2024 Merlin Labs, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef RCLPY__EVENTS_EXECUTOR__TIMERS_MANAGER_HPP_
+#define RCLPY__EVENTS_EXECUTOR__TIMERS_MANAGER_HPP_
+
+#include <pybind11/pybind11.h>
+
+#include <rcl/time.h>
+#include <rcl/timer.h>
+
+#include <functional>
+#include <memory>
+#include <unordered_map>
+
+#include "events_executor/events_queue.hpp"
+#include "events_executor/python_hasher.hpp"
+#include "events_executor/scoped_with.hpp"
+
+namespace rclpy
+{
+namespace events_executor
+{
+
+/// This class manages low-level rcl timers in the system on behalf of EventsExecutor.
+class RclTimersManager
+{
+public:
+  /// The given pointer is aliased and must live for the lifetime of this object.
+  explicit RclTimersManager(EventsQueue *);
+  ~RclTimersManager();
+
+  void AddTimer(rcl_timer_t *, std::function<void(const rcl_timer_call_info_t &)> ready_callback);
+  void RemoveTimer(rcl_timer_t *);
+
+private:
+  EventsQueue * const events_queue_;
+
+  class ClockManager;
+  /// Handlers for each distinct clock source in the system.
+  std::unordered_map<rcl_clock_t *, std::shared_ptr<ClockManager>> clock_managers_;
+};
+
+/// This class manages rclpy.Timer Python objects on behalf of EventsExecutor.
+class TimersManager
+{
+public:
+  /// @param events_queue is aliased and must live for the lifetime of this object.
+  /// @param timer_ready_callback will be invoked with the timer handle and info whenever a managed
+  /// timer is ready for servicing.
+  TimersManager(
+    EventsQueue * events_queue,
+    std::function<void(pybind11::handle, const rcl_timer_call_info_t &)> timer_ready_callback);
+  ~TimersManager();
+
+  /// Accessor for underlying rcl timer manager, for management of non-Python timers.
+  RclTimersManager & rcl_manager() {return rcl_manager_;}
+
+  // Both of these methods expect the GIL to be held when they are called.
+  void AddTimer(pybind11::handle timer);
+  void RemoveTimer(pybind11::handle timer);
+
+private:
+  struct PyRclMapping
+  {
+    /// Marks the corresponding Python object as in-use for as long as we're using the rcl pointer
+    /// derived from it.
+    std::unique_ptr<ScopedWith> with;
+
+    /// The underlying rcl object
+    rcl_timer_t * rcl_ptr{};
+  };
+
+  RclTimersManager rcl_manager_;
+  const std::function<void(pybind11::handle, const rcl_timer_call_info_t &)> ready_callback_;
+
+  std::unordered_map<pybind11::handle, PyRclMapping, PythonHasher> timer_mappings_;
+};
+
+}  // namespace events_executor
+}  // namespace rclpy
+
+#endif  // RCLPY__EVENTS_EXECUTOR__TIMERS_MANAGER_HPP_
diff --git a/rclpy/src/rclpy/signal_handler.cpp b/rclpy/src/rclpy/signal_handler.cpp
index 369060f50..bb7276312 100644
--- a/rclpy/src/rclpy/signal_handler.cpp
+++ b/rclpy/src/rclpy/signal_handler.cpp
@@ -23,6 +23,8 @@
 #include
 #include
+#include <functional>
+#include <list>
 #include
 #include
 #include
@@ -43,6 +45,7 @@
 namespace py = pybind11;
 
 static bool trigger_guard_conditions();
+static void invoke_callbacks();
 
 namespace
 {
@@ -142,6 +145,11 @@ setup_deferred_signal_handler()
       if (g_signal_handler_installed.load()) {
         trigger_guard_conditions();
         rclpy::shutdown_contexts();
+        // TODO(bmartin427) It feels like I want to do this *after* the contexts are shut down,
+        // to ensure the callbacks see the shut down state, and we don't end up going back to
+        // spin again. But I'm not sure why the same argument doesn't hold for the guard
+        // conditions above.
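+        // Note that these callbacks run on the deferred signal-handling thread, not in the
+        // actual signal handler, so it is safe for them to take locks.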
+        invoke_callbacks();
       }
     }
   });
@@ -610,6 +618,51 @@ uninstall_signal_handlers()
   teardown_deferred_signal_handler();
 }
 
+// Storage for signal callbacks.
+// TODO(bmartin427) The management of g_guard_conditions seems overly complex. That variable is
+// never touched directly from a signal handler as it has a separate thread on which the guard
+// conditions are triggered; however, there's an awful lot of code jumping through hoops that seem
+// unnecessary with that in mind. I won't model the same behavior for the registered callbacks.
+std::list<std::function<void()>> g_callbacks;
+std::mutex g_callback_mutex;
+class ScopedSignalCallback::Impl
+{
+public:
+  explicit Impl(std::function<void()> callback)
+  {
+    std::lock_guard<std::mutex> lock(g_callback_mutex);
+    iterator_ = g_callbacks.insert(g_callbacks.end(), callback);
+  }
+
+  ~Impl()
+  {
+    std::lock_guard<std::mutex> lock(g_callback_mutex);
+    g_callbacks.erase(iterator_);
+  }
+
+private:
+  std::list<std::function<void()>>::iterator iterator_;
+};
+
+ScopedSignalCallback::ScopedSignalCallback(std::function<void()> cb)
+: impl_(std::make_shared<Impl>(cb))
+{
+}
+
+ScopedSignalCallback::~ScopedSignalCallback() {}
+
+}  // namespace rclpy
+
+static void invoke_callbacks()
+{
+  std::lock_guard<std::mutex> lock(rclpy::g_callback_mutex);
+  for (auto & cb : rclpy::g_callbacks) {
+    cb();
+  }
+}
+
+namespace rclpy
+{
 void define_signal_handler_api(py::module m)
 {
diff --git a/rclpy/src/rclpy/signal_handler.hpp b/rclpy/src/rclpy/signal_handler.hpp
index e336c2a4c..e5250b8af 100644
--- a/rclpy/src/rclpy/signal_handler.hpp
+++ b/rclpy/src/rclpy/signal_handler.hpp
@@ -17,15 +17,33 @@
 
 #include
 
+#include <functional>
+#include <memory>
+
 namespace py = pybind11;
 
 namespace rclpy
 {
+
+/// Register a C++ callback to be invoked when a signal is caught. These callbacks will be invoked
+/// on an arbitrary thread. The callback will be automatically unregistered if the object goes out
+/// of scope.
+class ScopedSignalCallback {
+public:
+  explicit ScopedSignalCallback(std::function<void()>);
+  ~ScopedSignalCallback();
+
+private:
+  class Impl;
+  std::shared_ptr<Impl> impl_;
+};
+
 /// Define methods on a module for working with signal handlers
 /**
  * \param[in] module a pybind11 module to add the definition to
 */
 void define_signal_handler_api(py::module module);
+
 }  // namespace rclpy
 
 #endif  // RCLPY__SIGNAL_HANDLER_HPP_
diff --git a/rclpy/test/test_events_executor.py b/rclpy/test/test_events_executor.py
new file mode 100644
index 000000000..44bf15b00
--- /dev/null
+++ b/rclpy/test/test_events_executor.py
@@ -0,0 +1,690 @@
+# Copyright 2024-2025 Brad Martin
+# Copyright 2024 Merlin Labs, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
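+
+# A minimal usage sketch of the executor this file exercises (illustrative only;
+# 'node' stands in for any rclpy node):
+#
+#   rclpy.init()
+#   executor = rclpy.experimental.EventsExecutor()
+#   executor.add_node(node)
+#   executor.spin()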
+import os +import typing +import unittest + +import action_msgs.msg +import rclpy.action +import rclpy.clock_type +import rclpy.duration +import rclpy.event_handler +import rclpy.executors +import rclpy.experimental +import rclpy.node +import rclpy.qos +import rclpy.time +import rclpy.timer +import rosgraph_msgs.msg +import test_msgs.action +import test_msgs.msg +import test_msgs.srv + + +def _get_pub_sub_qos(transient_local: bool) -> rclpy.qos.QoSProfile: + if not transient_local: + return rclpy.qos.QoSProfile(history=rclpy.qos.HistoryPolicy.KEEP_ALL) + # For test purposes we deliberately want a TRANSIENT_LOCAL QoS with KEEP_ALL + # history. + return rclpy.qos.QoSProfile( + history=rclpy.qos.HistoryPolicy.KEEP_ALL, + durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, + ) + + +class SubTestNode(rclpy.node.Node): + """Node to test subscriptions and subscription-related events.""" + + def __init__(self, *, transient_local: bool = False) -> None: + super().__init__('test_sub_node') + self._new_pub_future: typing.Optional[ + rclpy.Future[rclpy.event_handler.QoSSubscriptionMatchedInfo] + ] = None + self._received_future: typing.Optional[rclpy.Future[test_msgs.msg.BasicTypes]] = None + self._sub = self.create_subscription( + test_msgs.msg.BasicTypes, + # This node seems to get stale discovery data and then complain about QoS + # changes if we reuse the same topic name. + 'test_topic' + ('_transient_local' if transient_local else ''), + self._handle_sub, + _get_pub_sub_qos(transient_local), + event_callbacks=rclpy.event_handler.SubscriptionEventCallbacks( + matched=self._handle_matched_sub + ), + ) + + def drop_subscription(self) -> None: + self.destroy_subscription(self._sub) + + def expect_pub_info( + self, + ) -> rclpy.Future[rclpy.event_handler.QoSSubscriptionMatchedInfo]: + self._new_pub_future = rclpy.Future() + return self._new_pub_future + + def expect_message(self) -> rclpy.Future[test_msgs.msg.BasicTypes]: + self._received_future = rclpy.Future() + return self._received_future + + def _handle_sub(self, msg: test_msgs.msg.BasicTypes) -> None: + if self._received_future is not None: + future = self._received_future + self._received_future = None + future.set_result(msg) + + def _handle_matched_sub(self, info: rclpy.event_handler.QoSSubscriptionMatchedInfo) -> None: + """Handle a new publisher being matched to our subscription.""" + if self._new_pub_future is not None: + self._new_pub_future.set_result(info) + self._new_pub_future = None + + +class PubTestNode(rclpy.node.Node): + """Node to test publications and publication-related events.""" + + def __init__(self, *, transient_local: bool = False) -> None: + super().__init__('test_pub_node') + self._new_sub_future: typing.Optional[ + rclpy.Future[rclpy.event_handler.QoSPublisherMatchedInfo] + ] = None + self._pub = self.create_publisher( + test_msgs.msg.BasicTypes, + 'test_topic' + ('_transient_local' if transient_local else ''), + _get_pub_sub_qos(transient_local), + event_callbacks=rclpy.event_handler.PublisherEventCallbacks( + matched=self._handle_matched_pub + ), + ) + + def expect_sub_info( + self, + ) -> rclpy.Future[rclpy.event_handler.QoSPublisherMatchedInfo]: + self._new_sub_future = rclpy.Future() + return self._new_sub_future + + def publish(self, value: float) -> None: + self._pub.publish(test_msgs.msg.BasicTypes(float32_value=value)) + + def _handle_matched_pub(self, info: rclpy.event_handler.QoSPublisherMatchedInfo) -> None: + """Handle a new subscriber being matched to our publication.""" + if self._new_sub_future is 
not None: + self._new_sub_future.set_result(info) + self._new_sub_future = None + + +class ServiceServerTestNode(rclpy.node.Node): + """Node to test service server-side operation.""" + + def __init__(self) -> None: + super().__init__('test_service_server_node') + self._got_request_future: typing.Optional[ + rclpy.Future[test_msgs.srv.BasicTypes.Request] + ] = None + self._pending_response: typing.Optional[test_msgs.srv.BasicTypes.Response] = None + self.create_service(test_msgs.srv.BasicTypes, 'test_service', self._handle_request) + + def expect_request( + self, success: bool, error_msg: str + ) -> rclpy.Future[test_msgs.srv.BasicTypes.Request]: + """ + Expect an incoming request. + + The arguments are used to compose the response. + """ + self._got_request_future = rclpy.Future() + self._pending_response = test_msgs.srv.BasicTypes.Response( + bool_value=success, string_value=error_msg + ) + return self._got_request_future + + def _handle_request( + self, + req: test_msgs.srv.BasicTypes.Request, + res: test_msgs.srv.BasicTypes.Response, + ) -> test_msgs.srv.BasicTypes.Response: + if self._got_request_future is not None: + self._got_request_future.set_result(req) + self._got_request_future = None + if self._pending_response is not None: + res = self._pending_response + self._pending_response = None + return res + + +class ServiceClientTestNode(rclpy.node.Node): + """Node to test service client-side operation.""" + + def __init__(self) -> None: + super().__init__('test_service_client_node') + self._client: rclpy.client.Client[ + test_msgs.srv.BasicTypes.Request, test_msgs.srv.BasicTypes.Response + ] = self.create_client(test_msgs.srv.BasicTypes, 'test_service') + + def issue_request(self, value: float) -> rclpy.Future[test_msgs.srv.BasicTypes.Response]: + req = test_msgs.srv.BasicTypes.Request(float32_value=value) + return self._client.call_async(req) + + +class TimerTestNode(rclpy.node.Node): + """Node to test timer operation.""" + + def __init__( + self, + index: int = 0, + parameter_overrides: typing.Optional[list[rclpy.Parameter]] = None, + ) -> None: + super().__init__(f'test_timer{index}', parameter_overrides=parameter_overrides) + self._timer_events = 0 + self._tick_future: typing.Optional[rclpy.Future[rclpy.timer.TimerInfo]] = None + self._timer = self.create_timer(0.1, self._handle_timer) + + @property + def timer_events(self) -> int: + return self._timer_events + + def expect_tick(self) -> rclpy.Future[rclpy.timer.TimerInfo]: + """Get future on TimerInfo for an anticipated timer tick.""" + self._tick_future = rclpy.Future() + return self._tick_future + + def _handle_timer(self, info: rclpy.timer.TimerInfo) -> None: + self._timer_events += 1 + if self._tick_future is not None: + self._tick_future.set_result(info) + self._tick_future = None + + +class ClockPublisherNode(rclpy.node.Node): + """Node to publish rostime clock updates.""" + + def __init__(self) -> None: + super().__init__('clock_node') + self._now = rclpy.time.Time(clock_type=rclpy.clock_type.ClockType.ROS_TIME) + self._pub = self.create_publisher( + rosgraph_msgs.msg.Clock, + '/clock', + rclpy.qos.QoSProfile(depth=1, reliability=rclpy.qos.ReliabilityPolicy.BEST_EFFORT), + ) + + def advance_time(self, millisec: int) -> None: + self._now += rclpy.duration.Duration(nanoseconds=millisec * 1000000) + self._pub.publish(rosgraph_msgs.msg.Clock(clock=self._now.to_msg())) + + @property + def now(self) -> rclpy.time.Time: + return self._now + + +class ActionServerTestNode(rclpy.node.Node): + """Node to test action server-side 
operation.""" + + def __init__(self) -> None: + super().__init__( + 'test_action_server_node', + parameter_overrides=[rclpy.Parameter('use_sim_time', value=True)], + ) + self._got_goal_future: typing.Optional[rclpy.Future[test_msgs.action.Fibonacci.Goal]] = ( + None + ) + self._srv = rclpy.action.ActionServer( + self, + test_msgs.action.Fibonacci, + 'test_action', + self._handle_execute, + handle_accepted_callback=self._handle_accepted, + result_timeout=10, + ) + self._goal_handle: typing.Optional[rclpy.action.server.ServerGoalHandle] = None + self._sequence: list[int] = [] + + def expect_goal(self) -> rclpy.Future[test_msgs.action.Fibonacci.Goal]: + self._goal_handle = None + self._got_goal_future = rclpy.Future() + return self._got_goal_future + + def _handle_accepted(self, goal_handle: rclpy.action.server.ServerGoalHandle) -> None: + self._goal_handle = goal_handle + self._sequence = [0, 1] + if self._got_goal_future is not None: + self._got_goal_future.set_result(goal_handle.request) + self._got_goal_future = None + # Wait to finish until instructed by test + + def advance_feedback(self) -> typing.Optional[list[int]]: + """ + Add an entry to the result in progress and sends a feedback message. + + Returns the current sequence in progress if incomplete, or None if the sequence + is complete and it's time to complete the operation instead. + + """ + assert self._goal_handle is not None + n = self._goal_handle.request.order + 1 + if len(self._sequence) < n: + self._sequence.append(self._sequence[-2] + self._sequence[-1]) + if len(self._sequence) >= n: + return None + + # FYI normally feedbacks would be sent from the execute handler, but we've tied + # it to its own public method for testing + fb = test_msgs.action.Fibonacci.Feedback() + fb.sequence = self._sequence + self._goal_handle.publish_feedback(fb) + return self._sequence + + def execute(self) -> rclpy.action.server.ServerGoalHandle: + """ + Completes the action in progress. + + Returns the handle to the goal executed. + + """ + handle = self._goal_handle + self._goal_handle = None + assert handle is not None + handle.execute() + return handle + + def _handle_execute( + self, goal_handle: rclpy.action.server.ServerGoalHandle + ) -> test_msgs.action.Fibonacci.Result: + goal_handle.succeed() + result = test_msgs.action.Fibonacci.Result() + result.sequence = self._sequence + return result + + +class ActionClientTestNode(rclpy.node.Node): + """Node to test action client-side operation.""" + + def __init__(self) -> None: + super().__init__('test_action_client_node') + self._client = rclpy.action.ActionClient[ + test_msgs.action.Fibonacci.Goal, + test_msgs.action.Fibonacci.Result, + test_msgs.action.Fibonacci.Feedback, + ](self, test_msgs.action.Fibonacci, 'test_action') + self._feedback_future: typing.Optional[ + rclpy.Future[test_msgs.action.Fibonacci.Feedback] + ] = None + self._result_future: typing.Optional[rclpy.Future[test_msgs.action.Fibonacci.Result]] = ( + None + ) + + def send_goal(self, order: int) -> rclpy.Future[rclpy.action.client.ClientGoalHandle]: + """ + Send a new goal. + + The future will contain the goal handle when the goal submission response has + been received. 
+ + """ + self._client.wait_for_server() + goal_ack_future = self._client.send_goal_async( + test_msgs.action.Fibonacci.Goal(order=order), + feedback_callback=self._handle_feedback, + ) + goal_ack_future.add_done_callback(self._handle_goal_ack) + return goal_ack_future + + def _handle_goal_ack(self, future: rclpy.Future[rclpy.action.client.ClientGoalHandle]) -> None: + handle = future.result() + assert handle is not None + result_future = handle.get_result_async() + result_future.add_done_callback(self._handle_result_response) + + def expect_feedback(self) -> rclpy.Future[test_msgs.action.Fibonacci.Feedback]: + self._feedback_future = rclpy.Future() + return self._feedback_future + + def _handle_feedback( + self, + # If this is a private 'Impl' detail, why is rclpy handing this out?? + fb_msg: test_msgs.action.Fibonacci.Impl.FeedbackMessage, + ) -> None: + if self._feedback_future is not None: + self._feedback_future.set_result(fb_msg.feedback) + self._feedback_future = None + + def expect_result( + self, + ) -> rclpy.Future[test_msgs.action.Fibonacci.Result]: + self._result_future = rclpy.Future() + return self._result_future + + def _handle_result_response( + self, future: rclpy.Future[test_msgs.action.Fibonacci_GetResult_Response] + ) -> None: + response: typing.Optional[test_msgs.action.Fibonacci_GetResult_Response] = future.result() + assert response is not None + assert self._result_future is not None + result: test_msgs.action.Fibonacci.Result = response.result + self._result_future.set_result(result) + self._result_future = None + + +# These two python types are both actually rmw_matched_status_t +rmw_matched_status_t = typing.Union[ + rclpy.event_handler.QoSSubscriptionMatchedInfo, rclpy.event_handler.QoSPublisherMatchedInfo +] + + +class TestEventsExecutor(unittest.TestCase): + + def setUp(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + # Prevent nodes under test from discovering other random stuff to talk to + os.environ['ROS_AUTOMATIC_DISCOVERY_RANGE'] = 'OFF' + rclpy.init() + + self.executor = rclpy.experimental.EventsExecutor() + + def tearDown(self) -> None: + rclpy.shutdown() + + def _expect_future_done(self, future: rclpy.Future[typing.Any]) -> None: + # Use a moderately long timeout with the expectation that we shouldn't often + # need the whole duration. + self.executor.spin_until_future_complete(future, 1.0) + self.assertTrue(future.done()) + + def _expect_future_not_done(self, future: rclpy.Future[typing.Any]) -> None: + # Use a short timeout to give the future some time to complete if we are going + # to fail, but not very long because we'll be waiting the full duration every + # time during successful tests. It's ok if the timeout is a bit short and the + # failure isn't 100% deterministic. + self.executor.spin_until_future_complete(future, 0.2) + self.assertFalse(future.done()) + + def _spin_for(self, sec: float) -> None: + """Spins the executor for the given number of realtime seconds.""" + # Note that this roundabout approach of waiting on a future that will never + # finish with a timeout seems to be the only way with the rclpy.Executor API to + # spin for a fixed time. + self.executor.spin_until_future_complete(rclpy.Future(), sec) + + def _check_match_event_future( + self, + future: rclpy.Future[rmw_matched_status_t], + total_count: int, + current_count: int, + ) -> None: + # NOTE: fastdds appears to be buggy and reports a change in total_count with + # total_count_change staying zero. cyclonedds works as expected. 
Rather than + # have this test be sensitive to which RMW is selected, let's just avoid testing + # the change fields altogether. + self._expect_future_done(future) + info: typing.Optional[rmw_matched_status_t] = future.result() + assert info is not None + self.assertEqual(info.total_count, total_count) + self.assertEqual(info.current_count, current_count) + + def _check_message_future( + self, future: rclpy.Future[test_msgs.msg.BasicTypes], value: float + ) -> None: + self._expect_future_done(future) + msg: typing.Optional[test_msgs.msg.BasicTypes] = future.result() + assert msg is not None + self.assertAlmostEqual(msg.float32_value, value, places=5) + + def _check_service_request_future( + self, future: rclpy.Future[test_msgs.srv.BasicTypes.Request], value: float + ) -> None: + self._expect_future_done(future) + req: typing.Optional[test_msgs.srv.BasicTypes.Request] = future.result() + assert req is not None + self.assertAlmostEqual(req.float32_value, value, places=5) + + def _check_service_response_future( + self, + future: rclpy.Future[test_msgs.srv.BasicTypes.Response], + success: bool, + error_msg: str, + ) -> None: + self._expect_future_done(future) + res: typing.Optional[test_msgs.srv.BasicTypes.Response] = future.result() + assert res is not None + self.assertEqual(res.bool_value, success) + self.assertEqual(res.string_value, error_msg) + + def test_pub_sub(self) -> None: + sub_node = SubTestNode() + new_pub_future = sub_node.expect_pub_info() + received_future = sub_node.expect_message() + self.executor.add_node(sub_node) + + # With subscriber node alone, should be no publisher or messages + self._expect_future_not_done(new_pub_future) + self.assertFalse(received_future.done()) # Already waited a bit + + pub_node = PubTestNode() + new_sub_future = pub_node.expect_sub_info() + self.executor.add_node(pub_node) + + # Publisher and subscriber should find each other but no messages should be + # exchanged yet + self._check_match_event_future(new_pub_future, 1, 1) + new_pub_future = sub_node.expect_pub_info() + self._check_match_event_future(new_sub_future, 1, 1) + new_sub_future = pub_node.expect_sub_info() + self._expect_future_not_done(received_future) + + # Send messages and make sure they're received. + for i in range(300): + pub_node.publish(0.1 * i) + self._check_message_future(received_future, 0.1 * i) + received_future = sub_node.expect_message() + + # Destroy the subscription, make sure the publisher is notified + sub_node.drop_subscription() + self._check_match_event_future(new_sub_future, 1, 0) + new_sub_future = pub_node.expect_sub_info() + + # Publish another message to ensure all subscriber callbacks got cleaned up + pub_node.publish(4.7) + self._expect_future_not_done(new_pub_future) + self.assertFalse(received_future.done()) # Already waited a bit + + # Delete the subscribing node entirely. There should be no additional match activity and + # still no subscriber callbacks. + self.executor.remove_node(sub_node) + sub_node.destroy_node() + self._expect_future_not_done(new_sub_future) + self.assertFalse(new_pub_future.done()) # Already waited a bit + self.assertFalse(received_future.done()) # Already waited a bit + + def test_pub_sub_multi_message(self) -> None: + # Creates a transient local publisher and queues multiple messages on it. Then + # creates a subscriber and makes sure all sent messages get delivered when it + # comes up. 
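+        # (TRANSIENT_LOCAL durability means the publisher retains previously published
+        # samples and delivers them to any subscriber that matches later.)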
+ pub_node = PubTestNode(transient_local=True) + self.executor.add_node(pub_node) + for i in range(5): + pub_node.publish(0.1 * i) + + sub_node = SubTestNode(transient_local=True) + received_future = sub_node.expect_message() + received_messages: list[test_msgs.msg.BasicTypes] = [] + + def handle_message(future: rclpy.Future[test_msgs.msg.BasicTypes]) -> None: + nonlocal received_future + msg = future.result() + assert msg is not None + received_messages.append(msg) + received_future = sub_node.expect_message() + received_future.add_done_callback(handle_message) + + received_future.add_done_callback(handle_message) + self._expect_future_not_done(received_future) + self.executor.add_node(sub_node) + while len(received_messages) < 5: + self._expect_future_done(received_future) + self.assertEqual(len(received_messages), 5) + for i in range(5): + self.assertAlmostEqual(received_messages[i].float32_value, 0.1 * i, places=5) + self._expect_future_not_done(received_future) + + pub_node.publish(0.5) + self._check_message_future(received_future, 0.5) + + def test_service(self) -> None: + server_node = ServiceServerTestNode() + got_request_future = server_node.expect_request(True, 'test response 0') + self.executor.add_node(server_node) + self._expect_future_not_done(got_request_future) + + client_node = ServiceClientTestNode() + self.executor.add_node(client_node) + self._expect_future_not_done(got_request_future) + for i in range(300): + got_response_future = client_node.issue_request(7.1) + self._check_service_request_future(got_request_future, 7.1) + got_request_future = server_node.expect_request(True, f'test response {i + 1}') + self._check_service_response_future(got_response_future, True, f'test response {i}') + + # Destroy server node and retry issuing a request + self.executor.remove_node(server_node) + server_node.destroy_node() + self._expect_future_not_done(got_request_future) + got_response_future = client_node.issue_request(5.0) + self._expect_future_not_done(got_request_future) + self.assertFalse(got_response_future.done()) # Already waited a bit + + def test_timers(self) -> None: + realtime_node = TimerTestNode(index=0) + rostime_node = TimerTestNode( + index=1, parameter_overrides=[rclpy.Parameter('use_sim_time', value=True)] + ) + clock_node = ClockPublisherNode() + for node in [realtime_node, rostime_node, clock_node]: + self.executor.add_node(node) + + # Wait a bit, and make sure the realtime timer ticks, and the rostime one does + # not. Since this is based on wall time, be very flexible on tolerances here. + realtime_tick_future = realtime_node.expect_tick() + self._spin_for(1.0) + realtime_ticks = realtime_node.timer_events + self.assertGreater(realtime_ticks, 1) + self.assertLess(realtime_ticks, 50) + self.assertEqual(rostime_node.timer_events, 0) + info = realtime_tick_future.result() + assert info is not None + self.assertGreaterEqual(info.actual_call_time, info.expected_call_time) + + # Manually tick the rostime timer by less than a full interval. + rostime_tick_future = rostime_node.expect_tick() + for _ in range(99): + clock_node.advance_time(1) + self._expect_future_not_done(rostime_tick_future) + clock_node.advance_time(1) + self._expect_future_done(rostime_tick_future) + info = rostime_tick_future.result() + assert info is not None + self.assertEqual(info.actual_call_time, info.expected_call_time) + self.assertEqual(info.actual_call_time, clock_node.now) + # Now tick by a bunch of full intervals. 
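+        # (The rostime timer period is 0.1 s, so each 100 ms advance below should
+        # produce exactly one tick.)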
+ for _ in range(300): + rostime_tick_future = rostime_node.expect_tick() + clock_node.advance_time(100) + self._expect_future_done(rostime_tick_future) + + # Ensure the realtime timer ticked much less than the rostime one. + self.assertLess(realtime_node.timer_events, rostime_node.timer_events) + + # Create two timers with the same interval, both set to cancel the other from the callback. + # Only one of the callbacks should be delivered, though we can't necessarily predict which + # one. + def handler(): + nonlocal count, timer1, timer2 + count += 1 + timer1.cancel() + timer2.cancel() + + count = 0 + timer1 = rostime_node.create_timer(0.01, handler) + timer2 = rostime_node.create_timer(0.01, handler) + self._spin_for(0.0) + self.assertEqual(count, 0) + clock_node.advance_time(10) + self._spin_for(0.0) + self.assertEqual(count, 1) + clock_node.advance_time(10) + self._spin_for(0.0) + self.assertEqual(count, 1) + + def test_action(self) -> None: + clock_node = ClockPublisherNode() + self.executor.add_node(clock_node) + + server_node = ActionServerTestNode() + got_goal_future = server_node.expect_goal() + self.executor.add_node(server_node) + clock_node.advance_time(0) + self._expect_future_not_done(got_goal_future) + + client_node = ActionClientTestNode() + self.executor.add_node(client_node) + self._expect_future_not_done(got_goal_future) + for i in range(300): + order = (i % 40) + 1 # Don't want sequence to get too big + goal_acknowledged_future = client_node.send_goal(order) + + self._expect_future_done(got_goal_future) + self._expect_future_done(goal_acknowledged_future) + req: typing.Optional[test_msgs.action.Fibonacci.Goal] = got_goal_future.result() + assert req is not None + self.assertEqual(req.order, order) + result_future = client_node.expect_result() + + while True: + got_feedback_future = client_node.expect_feedback() + seq = server_node.advance_feedback() + if seq is None: + break + self._expect_future_done(got_feedback_future) + feedback = got_feedback_future.result() + assert feedback is not None + self.assertEqual(len(feedback.sequence), len(seq)) + + last_handle = server_node.execute() + self._expect_future_done(result_future) + self.assertFalse(got_feedback_future.done()) + + res: typing.Optional[test_msgs.action.Fibonacci.Result] = result_future.result() + assert res is not None + self.assertEqual(len(res.sequence), order + 1) + + got_goal_future = server_node.expect_goal() + + # Test completed goal expiration by time + self.assertEqual(last_handle.status, action_msgs.msg.GoalStatus.STATUS_SUCCEEDED) + clock_node.advance_time(9999) + self._spin_for(0.2) + self.assertEqual(last_handle.status, action_msgs.msg.GoalStatus.STATUS_SUCCEEDED) + clock_node.advance_time(2) + self._spin_for(0.2) + self.assertEqual(last_handle.status, action_msgs.msg.GoalStatus.STATUS_UNKNOWN) + + # Destroy server node and retry issuing a goal + self.executor.remove_node(server_node) + server_node.destroy_node() + self._expect_future_not_done(got_goal_future) + goal_acknowledged_future = client_node.send_goal(5) + self._expect_future_not_done(got_goal_future) + self.assertFalse(goal_acknowledged_future.done()) # Already waited a bit + + +if __name__ == '__main__': + unittest.main() diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index 7ea277de1..c340fa70e 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -16,7 +16,6 @@ import os import threading import time -from typing import Any import unittest import warnings @@ -26,6 +25,7 @@ from 
rclpy.executors import MultiThreadedExecutor from rclpy.executors import ShutdownException from rclpy.executors import SingleThreadedExecutor +from rclpy.experimental import EventsExecutor from rclpy.task import Future from test_msgs.srv import Empty @@ -62,78 +62,88 @@ def timer_callback() -> None: def test_single_threaded_executor_executes(self) -> None: self.assertIsNotNone(self.node.handle) - executor = SingleThreadedExecutor(context=self.context) - try: - self.assertTrue(self.func_execution(executor)) - finally: - executor.shutdown() + for cls in [SingleThreadedExecutor, EventsExecutor]: + with self.subTest(cls=cls): + executor = cls(context=self.context) + try: + self.assertTrue(self.func_execution(executor)) + finally: + executor.shutdown() def test_executor_immediate_shutdown(self) -> None: self.assertIsNotNone(self.node.handle) - executor = SingleThreadedExecutor(context=self.context) - try: - got_callback = False + for cls in [SingleThreadedExecutor, EventsExecutor]: + with self.subTest(cls=cls): + executor = cls(context=self.context) + try: + got_callback = False - def timer_callback() -> None: - nonlocal got_callback - got_callback = True + def timer_callback() -> None: + nonlocal got_callback + got_callback = True - timer_period = 1 - tmr = self.node.create_timer(timer_period, timer_callback) + timer_period = 1 + tmr = self.node.create_timer(timer_period, timer_callback) - self.assertTrue(executor.add_node(self.node)) - t = threading.Thread(target=executor.spin, daemon=True) - start_time = time.monotonic() - t.start() - executor.shutdown() - t.join() - end_time = time.monotonic() + self.assertTrue(executor.add_node(self.node)) + t = threading.Thread(target=executor.spin, daemon=True) + start_time = time.perf_counter() + t.start() + executor.shutdown() + t.join() + end_time = time.perf_counter() - self.node.destroy_timer(tmr) - self.assertLess(end_time - start_time, timer_period / 2) - self.assertFalse(got_callback) - finally: - executor.shutdown() + self.node.destroy_timer(tmr) + self.assertLess(end_time - start_time, timer_period / 2) + self.assertFalse(got_callback) + finally: + executor.shutdown() def test_shutdown_executor_before_waiting_for_callbacks(self) -> None: self.assertIsNotNone(self.node.handle) + # EventsExecutor does not support the wait_for_ready_callbacks() API for cls in [SingleThreadedExecutor, MultiThreadedExecutor]: - executor = cls(context=self.context) - executor.shutdown() - with self.assertRaises(ShutdownException): - executor.wait_for_ready_callbacks() + with self.subTest(cls=cls): + executor = cls(context=self.context) + executor.shutdown() + with self.assertRaises(ShutdownException): + executor.wait_for_ready_callbacks() def test_shutdown_exception_from_callback_generator(self) -> None: self.assertIsNotNone(self.node.handle) + # This test touches the Executor private API and is not compatible with EventsExecutor for cls in [SingleThreadedExecutor, MultiThreadedExecutor]: - executor = cls(context=self.context) - cb_generator = executor._wait_for_ready_callbacks() - executor.shutdown() - with self.assertRaises(ShutdownException): - next(cb_generator) + with self.subTest(cls=cls): + executor = cls(context=self.context) + cb_generator = executor._wait_for_ready_callbacks() + executor.shutdown() + with self.assertRaises(ShutdownException): + next(cb_generator) def test_remove_node(self) -> None: self.assertIsNotNone(self.node.handle) - executor = SingleThreadedExecutor(context=self.context) + for cls in [SingleThreadedExecutor, EventsExecutor]: + with 
self.subTest(cls=cls):
+                executor = cls(context=self.context)

-        got_callback = False
+                got_callback = False

-        def timer_callback() -> None:
-            nonlocal got_callback
-            got_callback = True
+                def timer_callback() -> None:
+                    nonlocal got_callback
+                    got_callback = True

-        try:
-            tmr = self.node.create_timer(0.1, timer_callback)
-            try:
-                executor.add_node(self.node)
-                executor.remove_node(self.node)
-                executor.spin_once(timeout_sec=0.2)
-            finally:
-                self.node.destroy_timer(tmr)
-        finally:
-            executor.shutdown()
+                try:
+                    tmr = self.node.create_timer(0.1, timer_callback)
+                    try:
+                        executor.add_node(self.node)
+                        executor.remove_node(self.node)
+                        executor.spin_once(timeout_sec=0.2)
+                    finally:
+                        self.node.destroy_timer(tmr)
+                finally:
+                    executor.shutdown()

-        assert not got_callback
+                assert not got_callback

     def test_multi_threaded_executor_num_threads(self) -> None:
         self.assertIsNotNone(self.node.handle)
@@ -189,49 +199,58 @@ def get_threads():
     def test_add_node_to_executor(self) -> None:
         self.assertIsNotNone(self.node.handle)
-        executor = SingleThreadedExecutor(context=self.context)
-        executor.add_node(self.node)
-        self.assertIn(self.node, executor.get_nodes())
+        for cls in [SingleThreadedExecutor, EventsExecutor]:
+            with self.subTest(cls=cls):
+                executor = cls(context=self.context)
+                executor.add_node(self.node)
+                self.assertIn(self.node, executor.get_nodes())

     def test_executor_spin_non_blocking(self) -> None:
         self.assertIsNotNone(self.node.handle)
-        executor = SingleThreadedExecutor(context=self.context)
-        executor.add_node(self.node)
-        start = time.monotonic()
-        executor.spin_once(timeout_sec=0)
-        end = time.monotonic()
-        self.assertLess(start - end, 0.001)
+        for cls in [SingleThreadedExecutor, EventsExecutor]:
+            with self.subTest(cls=cls):
+                executor = cls(context=self.context)
+                executor.add_node(self.node)
+                start = time.perf_counter()
+                executor.spin_once(timeout_sec=0)
+                end = time.perf_counter()
+                self.assertLess(end - start, 0.001)

     def test_execute_coroutine_timer(self) -> None:
         self.assertIsNotNone(self.node.handle)
-        executor = SingleThreadedExecutor(context=self.context)
-        executor.add_node(self.node)
+        for cls in [SingleThreadedExecutor, EventsExecutor]:
+            with self.subTest(cls=cls):
+                executor = cls(context=self.context)
+                executor.add_node(self.node)

-        called1 = False
-        called2 = False
+                called1 = False
+                called2 = False

-        async def coroutine() -> None:
-            nonlocal called1
-            nonlocal called2
-            called1 = True
-            await asyncio.sleep(0)
-            called2 = True
-
-        tmr = self.node.create_timer(0.1, coroutine)
-        try:
-            executor.spin_once(timeout_sec=1.23)
-            self.assertTrue(called1)
-            self.assertFalse(called2)
+                async def coroutine() -> None:
+                    nonlocal called1
+                    nonlocal called2
+                    called1 = True
+                    await asyncio.sleep(0)
+                    called2 = True

-            called1 = False
-            executor.spin_once(timeout_sec=0)
-            self.assertFalse(called1)
-            self.assertTrue(called2)
-        finally:
-            self.node.destroy_timer(tmr)
+                # TODO(bmartin427) The type annotations on Node.create_timer() say you
+                # can't pass a coroutine here.
+                tmr = self.node.create_timer(0.1, coroutine)
+                try:
+                    executor.spin_once(timeout_sec=1.23)
+                    self.assertTrue(called1)
+                    self.assertFalse(called2)
+
+                    called1 = False
+                    executor.spin_once(timeout_sec=0)
+                    self.assertFalse(called1)
+                    self.assertTrue(called2)
+                finally:
+                    self.node.destroy_timer(tmr)

     def test_execute_coroutine_guard_condition(self) -> None:
         self.assertIsNotNone(self.node.handle)
+        # TODO(bmartin427) Does EventsExecutor need to support guard conditions?
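
For context on the TODO above: a guard condition is a user-triggered wakeup with no
middleware payload, and its round trip through the public rclpy API looks roughly like
the sketch below. The demo node name is illustrative, not part of this change.

    import rclpy
    from rclpy.executors import SingleThreadedExecutor

    rclpy.init()
    node = rclpy.create_node('guard_condition_demo')  # hypothetical name
    executor = SingleThreadedExecutor()
    executor.add_node(node)

    triggered = False

    def on_guard() -> None:
        global triggered
        triggered = True

    gc = node.create_guard_condition(on_guard)
    gc.trigger()  # marks the guard condition ready
    executor.spin_once(timeout_sec=1.0)  # dispatches on_guard
    assert triggered
    node.destroy_guard_condition(gc)
    node.destroy_node()
    rclpy.shutdown()
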
executor = SingleThreadedExecutor(context=self.context) executor.add_node(self.node) @@ -261,147 +280,159 @@ async def coroutine() -> None: def test_create_task_coroutine(self) -> None: self.assertIsNotNone(self.node.handle) - executor = SingleThreadedExecutor(context=self.context) - executor.add_node(self.node) + for cls in [SingleThreadedExecutor, EventsExecutor]: + with self.subTest(cls=cls): + executor = cls(context=self.context) + executor.add_node(self.node) - async def coroutine(): - return 'Sentinel Result' + async def coroutine(): + return 'Sentinel Result' - future = executor.create_task(coroutine) - self.assertFalse(future.done()) + future = executor.create_task(coroutine) + self.assertFalse(future.done()) - executor.spin_once(timeout_sec=0) - self.assertTrue(future.done()) - self.assertEqual('Sentinel Result', future.result()) + executor.spin_once(timeout_sec=0) + self.assertTrue(future.done()) + self.assertEqual('Sentinel Result', future.result()) def test_create_task_coroutine_cancel(self) -> None: self.assertIsNotNone(self.node.handle) - executor = SingleThreadedExecutor(context=self.context) - executor.add_node(self.node) + for cls in [SingleThreadedExecutor, EventsExecutor]: + with self.subTest(cls=cls): + executor = cls(context=self.context) + executor.add_node(self.node) - async def coroutine(): - return 'Sentinel Result' + async def coroutine(): + return 'Sentinel Result' - future = executor.create_task(coroutine) - self.assertFalse(future.done()) - self.assertFalse(future.cancelled()) + future = executor.create_task(coroutine) + self.assertFalse(future.done()) + self.assertFalse(future.cancelled()) - future.cancel() - self.assertTrue(future.cancelled()) + future.cancel() + self.assertTrue(future.cancelled()) - executor.spin_until_future_complete(future) - self.assertFalse(future.done()) - self.assertTrue(future.cancelled()) - self.assertEqual(None, future.result()) + executor.spin_until_future_complete(future) + self.assertFalse(future.done()) + self.assertTrue(future.cancelled()) + self.assertEqual(None, future.result()) def test_create_task_normal_function(self) -> None: self.assertIsNotNone(self.node.handle) - executor = SingleThreadedExecutor(context=self.context) - executor.add_node(self.node) + for cls in [SingleThreadedExecutor, EventsExecutor]: + with self.subTest(cls=cls): + executor = cls(context=self.context) + executor.add_node(self.node) - def func(): - return 'Sentinel Result' + def func(): + return 'Sentinel Result' - future = executor.create_task(func) - self.assertFalse(future.done()) + future = executor.create_task(func) + self.assertFalse(future.done()) - executor.spin_once(timeout_sec=0) - self.assertTrue(future.done()) - self.assertEqual('Sentinel Result', future.result()) + executor.spin_once(timeout_sec=0) + self.assertTrue(future.done()) + self.assertEqual('Sentinel Result', future.result()) def test_create_task_fifo_order(self) -> None: self.assertIsNotNone(self.node.handle) - executor = SingleThreadedExecutor(context=self.context) - executor.add_node(self.node) + for cls in [SingleThreadedExecutor, EventsExecutor]: + with self.subTest(cls=cls): + executor = cls(context=self.context) + executor.add_node(self.node) - async def coro1(): - return 'Sentinel Result 1' + async def coro1(): + return 'Sentinel Result 1' - future1 = executor.create_task(coro1) + future1 = executor.create_task(coro1) - async def coro2(): - return 'Sentinel Result 2' + async def coro2(): + return 'Sentinel Result 2' - future2 = executor.create_task(coro2) + future2 = 
executor.create_task(coro2) - # Coro1 is the 1st task, so it gets executed in this spin - executor.spin_once(timeout_sec=0) - self.assertTrue(future1.done()) - self.assertEqual('Sentinel Result 1', future1.result()) - self.assertFalse(future2.done()) + # Coro1 is the 1st task, so it gets executed in this spin + executor.spin_once(timeout_sec=0) + self.assertTrue(future1.done()) + self.assertEqual('Sentinel Result 1', future1.result()) + self.assertFalse(future2.done()) - # Coro2 is the next in the queue, so it gets executed in this spin - executor.spin_once(timeout_sec=0) - self.assertTrue(future2.done()) - self.assertEqual('Sentinel Result 2', future2.result()) + # Coro2 is the next in the queue, so it gets executed in this spin + executor.spin_once(timeout_sec=0) + self.assertTrue(future2.done()) + self.assertEqual('Sentinel Result 2', future2.result()) def test_create_task_dependent_coroutines(self) -> None: self.assertIsNotNone(self.node.handle) - executor = SingleThreadedExecutor(context=self.context) - executor.add_node(self.node) + for cls in [SingleThreadedExecutor, EventsExecutor]: + with self.subTest(cls=cls): + executor = cls(context=self.context) + executor.add_node(self.node) - async def coro1(): - nonlocal future2 - await future2 - return 'Sentinel Result 1' + async def coro1(): + nonlocal future2 + await future2 + return 'Sentinel Result 1' - future1 = executor.create_task(coro1) + future1 = executor.create_task(coro1) - async def coro2(): - return 'Sentinel Result 2' + async def coro2(): + return 'Sentinel Result 2' - future2 = executor.create_task(coro2) + future2 = executor.create_task(coro2) - # Coro1 is the 1st task, so it gets to await future2 in this spin - executor.spin_once(timeout_sec=0) - # Coro2 execs in this spin - executor.spin_once(timeout_sec=0) - self.assertFalse(future1.done()) - self.assertTrue(future2.done()) - self.assertEqual('Sentinel Result 2', future2.result()) + # Coro1 is the 1st task, so it gets to await future2 in this spin + executor.spin_once(timeout_sec=0) + # Coro2 execs in this spin + executor.spin_once(timeout_sec=0) + self.assertFalse(future1.done()) + self.assertTrue(future2.done()) + self.assertEqual('Sentinel Result 2', future2.result()) - # Coro1 passes the await step here (timeout change forces new generator) - executor.spin_once(timeout_sec=1) - self.assertTrue(future1.done()) - self.assertEqual('Sentinel Result 1', future1.result()) + # Coro1 passes the await step here (timeout change forces new generator) + executor.spin_once(timeout_sec=1) + self.assertTrue(future1.done()) + self.assertEqual('Sentinel Result 1', future1.result()) def test_create_task_during_spin(self) -> None: self.assertIsNotNone(self.node.handle) - executor = SingleThreadedExecutor(context=self.context) - executor.add_node(self.node) + for cls in [SingleThreadedExecutor, EventsExecutor]: + with self.subTest(cls=cls): + executor = cls(context=self.context) + executor.add_node(self.node) - future = None + future = None - def spin_until_task_done(executor): - nonlocal future - while future is None or not future.done(): - try: - executor.spin_once() - finally: - executor.shutdown() - break + def spin_until_task_done(executor): + nonlocal future + while future is None or not future.done(): + try: + executor.spin_once() + finally: + executor.shutdown() + break - # Start spinning in a separate thread - thr = threading.Thread(target=spin_until_task_done, args=(executor, ), daemon=True) - thr.start() + # Start spinning in a separate thread + thr = 
threading.Thread(target=spin_until_task_done, args=(executor, ), daemon=True) + thr.start() - # Sleep in this thread to give the executor a chance to reach the loop in - # '_wait_for_ready_callbacks()' - time.sleep(1) + # Sleep in this thread to give the executor a chance to reach the loop in + # '_wait_for_ready_callbacks()' + time.sleep(1) - def func(): - return 'Sentinel Result' + def func(): + return 'Sentinel Result' - # Create a task - future = executor.create_task(func) + # Create a task + future = executor.create_task(func) - thr.join(timeout=0.5) - # If the join timed out, remove the node to cause the spin thread to stop - if thr.is_alive(): - executor.remove_node(self.node) + thr.join(timeout=0.5) + # If the join timed out, remove the node to cause the spin thread to stop + if thr.is_alive(): + executor.remove_node(self.node) - self.assertTrue(future.done()) - self.assertEqual('Sentinel Result', future.result()) + self.assertTrue(future.done()) + self.assertEqual('Sentinel Result', future.result()) def test_global_executor_completes_async_task(self) -> None: self.assertIsNotNone(self.node.handle) @@ -416,147 +447,163 @@ def __await__(self): yield return - trigger = TriggerAwait() - did_callback = False - did_return = False + for cls in [SingleThreadedExecutor, EventsExecutor]: + with self.subTest(cls=cls): + trigger = TriggerAwait() + did_callback = False + did_return = False - async def timer_callback() -> None: - nonlocal trigger, did_callback, did_return - did_callback = True - await trigger - did_return = True + async def timer_callback() -> None: + nonlocal trigger, did_callback, did_return + did_callback = True + await trigger + did_return = True - timer = self.node.create_timer(0.1, timer_callback) + timer = self.node.create_timer(0.1, timer_callback) - executor = SingleThreadedExecutor(context=self.context) - rclpy.spin_once(self.node, timeout_sec=0.5, executor=executor) - self.assertTrue(did_callback) + executor = cls(context=self.context) + rclpy.spin_once(self.node, timeout_sec=0.5, executor=executor) + self.assertTrue(did_callback) - timer.cancel() - trigger.do_yield = False - rclpy.spin_once(self.node, timeout_sec=0, executor=executor) - self.assertTrue(did_return) + timer.cancel() + trigger.do_yield = False + rclpy.spin_once(self.node, timeout_sec=0, executor=executor) + self.assertTrue(did_return) def test_executor_add_node(self) -> None: self.assertIsNotNone(self.node.handle) - executor = SingleThreadedExecutor(context=self.context) - assert executor.add_node(self.node) - assert id(executor) == id(self.node.executor) - assert not executor.add_node(self.node) - assert id(executor) == id(self.node.executor) + for cls in [SingleThreadedExecutor, EventsExecutor]: + with self.subTest(cls=cls): + executor = cls(context=self.context) + assert executor.add_node(self.node) + assert id(executor) == id(self.node.executor) + assert not executor.add_node(self.node) + assert id(executor) == id(self.node.executor) def test_executor_spin_until_future_complete_timeout(self) -> None: self.assertIsNotNone(self.node.handle) - executor = SingleThreadedExecutor(context=self.context) - executor.add_node(self.node) - - def timer_callback() -> None: - pass - timer = self.node.create_timer(0.003, timer_callback) - - # Timeout - future: Future[Any] = Future() - self.assertFalse(future.done()) - start = time.monotonic() - executor.spin_until_future_complete(future=future, timeout_sec=0.1) - end = time.monotonic() - # Nothing is ever setting the future, so this should have waited - # at least 
0.1 seconds. - self.assertGreaterEqual(end - start, 0.1) - self.assertFalse(future.done()) - - timer.cancel() + for cls in [SingleThreadedExecutor, EventsExecutor]: + with self.subTest(cls=cls): + executor = cls(context=self.context) + executor.add_node(self.node) - def test_executor_spin_until_future_complete_future_done(self) -> None: - self.assertIsNotNone(self.node.handle) - executor = SingleThreadedExecutor(context=self.context) - executor.add_node(self.node) + def timer_callback() -> None: + pass + timer = self.node.create_timer(0.003, timer_callback) - def timer_callback() -> None: - pass - timer = self.node.create_timer(0.003, timer_callback) + # Timeout + future = Future[None]() + self.assertFalse(future.done()) + start = time.perf_counter() + executor.spin_until_future_complete(future=future, timeout_sec=0.1) + end = time.perf_counter() + # Nothing is ever setting the future, so this should have waited + # at least 0.1 seconds. + self.assertGreaterEqual(end - start, 0.1) + self.assertFalse(future.done()) - def set_future_result(future: Future[str]) -> None: - future.set_result('finished') + timer.cancel() - # Future complete timeout_sec > 0 - future: Future[str] = Future() - self.assertFalse(future.done()) - t = threading.Thread(target=lambda: set_future_result(future)) - t.start() - executor.spin_until_future_complete(future=future, timeout_sec=0.2) - self.assertTrue(future.done()) - self.assertEqual(future.result(), 'finished') - - # Future complete timeout_sec = None - future = Future() - self.assertFalse(future.done()) - t = threading.Thread(target=lambda: set_future_result(future)) - t.start() - executor.spin_until_future_complete(future=future, timeout_sec=None) - self.assertTrue(future.done()) - self.assertEqual(future.result(), 'finished') - - # Future complete timeout < 0 - future = Future() - self.assertFalse(future.done()) - t = threading.Thread(target=lambda: set_future_result(future)) - t.start() - executor.spin_until_future_complete(future=future, timeout_sec=-1) - self.assertTrue(future.done()) - self.assertEqual(future.result(), 'finished') + def test_executor_spin_until_future_complete_future_done(self) -> None: + self.assertIsNotNone(self.node.handle) + for cls in [SingleThreadedExecutor, EventsExecutor]: + with self.subTest(cls=cls): + executor = cls(context=self.context) + executor.add_node(self.node) - timer.cancel() + def timer_callback() -> None: + pass + timer = self.node.create_timer(0.003, timer_callback) + + def set_future_result(future): + future.set_result('finished') + + # Future complete timeout_sec > 0 + future = Future[str]() + self.assertFalse(future.done()) + t = threading.Thread(target=lambda: set_future_result(future)) + t.start() + executor.spin_until_future_complete(future=future, timeout_sec=0.2) + self.assertTrue(future.done()) + self.assertEqual(future.result(), 'finished') + + # Future complete timeout_sec = None + future = Future() + self.assertFalse(future.done()) + t = threading.Thread(target=lambda: set_future_result(future)) + t.start() + executor.spin_until_future_complete(future=future, timeout_sec=None) + self.assertTrue(future.done()) + self.assertEqual(future.result(), 'finished') + + # Future complete timeout < 0 + future = Future() + self.assertFalse(future.done()) + t = threading.Thread(target=lambda: set_future_result(future)) + t.start() + executor.spin_until_future_complete(future=future, timeout_sec=-1) + self.assertTrue(future.done()) + self.assertEqual(future.result(), 'finished') + + timer.cancel() def 
test_executor_spin_until_future_complete_do_not_wait(self) -> None: self.assertIsNotNone(self.node.handle) - executor = SingleThreadedExecutor(context=self.context) - executor.add_node(self.node) + for cls in [SingleThreadedExecutor, EventsExecutor]: + with self.subTest(cls=cls): + executor = cls(context=self.context) + executor.add_node(self.node) - def timer_callback() -> None: - pass - timer = self.node.create_timer(0.003, timer_callback) + def timer_callback() -> None: + pass + timer = self.node.create_timer(0.003, timer_callback) - # Do not wait timeout_sec = 0 - future: Future[Any] = Future() - self.assertFalse(future.done()) - executor.spin_until_future_complete(future=future, timeout_sec=0) - self.assertFalse(future.done()) + # Do not wait timeout_sec = 0 + future = Future[None]() + self.assertFalse(future.done()) + executor.spin_until_future_complete(future=future, timeout_sec=0) + self.assertFalse(future.done()) - timer.cancel() + timer.cancel() def test_executor_add_node_wakes_executor(self) -> None: self.assertIsNotNone(self.node.handle) - got_callback = False + for cls in [SingleThreadedExecutor, EventsExecutor]: + with self.subTest(cls=cls): + got_callback = False - def timer_callback() -> None: - nonlocal got_callback - got_callback = True + def timer_callback() -> None: + nonlocal got_callback + got_callback = True - timer_period = 0.1 - tmr = self.node.create_timer(timer_period, timer_callback) + timer_period = 0.1 + tmr = self.node.create_timer(timer_period, timer_callback) - executor = SingleThreadedExecutor(context=self.context) - try: - # spin in background - t = threading.Thread(target=executor.spin_once, daemon=True) - t.start() - # sleep to make sure executor is blocked in rcl_wait - time.sleep(0.5) + executor = cls(context=self.context) + try: + # spin in background + t = threading.Thread(target=executor.spin_once, daemon=True) + t.start() + # sleep to make sure executor is blocked in rcl_wait + time.sleep(0.5) - self.assertTrue(executor.add_node(self.node)) - # Make sure timer has time to trigger - time.sleep(timer_period) + self.assertTrue(executor.add_node(self.node)) + # Make sure timer has time to trigger + time.sleep(timer_period) - self.assertTrue(got_callback) - finally: - executor.shutdown() - self.node.destroy_timer(tmr) + self.assertTrue(got_callback) + finally: + executor.shutdown() + self.node.destroy_timer(tmr) - def shutdown_executor_from_callback(self) -> None: + def test_shutdown_executor_from_callback(self) -> None: """https://github.com/ros2/rclpy/issues/944: allow for executor shutdown from callback.""" self.assertIsNotNone(self.node.handle) timer_period = 0.1 + # TODO(bmartin427) This seems like an invalid test to me? executor.shutdown() is + # documented as blocking until all callbacks are complete, unless you pass a non-negative + # timeout value which this doesn't. I'm not sure how that's supposed to *not* deadlock if + # you block on all callbacks from within a callback. 
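
Regarding the TODO above: the non-blocking path it alludes to is the explicit timeout
argument of Executor.shutdown(). A minimal sketch of that variant, assuming the
standard rclpy Executor.shutdown(timeout_sec=...) behavior; the demo node name is
illustrative:

    import threading

    import rclpy
    from rclpy.executors import SingleThreadedExecutor

    rclpy.init()
    node = rclpy.create_node('shutdown_from_callback_demo')  # hypothetical name
    executor = SingleThreadedExecutor()
    executor.add_node(node)
    callback_returned = threading.Event()

    def timer_callback() -> None:
        # timeout_sec=0 asks shutdown() not to wait for in-flight callbacks
        # (including this one), so the call cannot block on itself.
        executor.shutdown(timeout_sec=0)
        callback_returned.set()

    node.create_timer(0.1, timer_callback)
    executor.spin()  # exits once shutdown() marks the executor as shut down
    assert callback_returned.is_set()
    node.destroy_node()
    rclpy.shutdown()
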
executor = SingleThreadedExecutor(context=self.context) shutdown_event = threading.Event() @@ -575,6 +622,7 @@ def timer_callback() -> None: def test_context_manager(self) -> None: self.assertIsNotNone(self.node.handle) + # This test touches the Executor private API and is not compatible with EventsExecutor executor: Executor = SingleThreadedExecutor(context=self.context) with executor as the_executor: @@ -590,31 +638,33 @@ def test_context_manager(self) -> None: def test_single_threaded_spin_once_until_future(self) -> None: self.assertIsNotNone(self.node.handle) - executor = SingleThreadedExecutor(context=self.context) + for cls in [SingleThreadedExecutor, EventsExecutor]: + with self.subTest(cls=cls): + executor = cls(context=self.context) - future: Future[bool] = Future(executor=executor) + future = Future[bool](executor=executor) - # Setup a thread to spin_once_until_future_complete, which will spin - # for a maximum of 10 seconds. - start = time.time() - thread = threading.Thread(target=executor.spin_once_until_future_complete, - args=(future, 10)) - thread.start() + # Setup a thread to spin_once_until_future_complete, which will spin + # for a maximum of 10 seconds. + start = time.time() + thread = threading.Thread(target=executor.spin_once_until_future_complete, + args=(future, 10)) + thread.start() - # Mark the future as complete immediately - future.set_result(True) + # Mark the future as complete immediately + future.set_result(True) - thread.join() - end = time.time() + thread.join() + end = time.time() - time_spent = end - start + time_spent = end - start - # Since we marked the future as complete immediately, the amount of - # time we spent should be *substantially* less than the 10 second - # timeout we set on the spin. - assert time_spent < 10 + # Since we marked the future as complete immediately, the amount of + # time we spent should be *substantially* less than the 10 second + # timeout we set on the spin. 
+ assert time_spent < 10 - executor.shutdown() + executor.shutdown() def test_multi_threaded_spin_once_until_future(self) -> None: self.assertIsNotNone(self.node.handle) @@ -646,36 +696,38 @@ def test_multi_threaded_spin_once_until_future(self) -> None: def test_not_lose_callback(self) -> None: self.assertIsNotNone(self.node.handle) - executor = SingleThreadedExecutor(context=self.context) + for cls in [SingleThreadedExecutor, EventsExecutor]: + with self.subTest(cls=cls): + executor = cls(context=self.context) - callback_group = ReentrantCallbackGroup() + callback_group = ReentrantCallbackGroup() - cli = self.node.create_client( - srv_type=Empty, srv_name='test_service', callback_group=callback_group) + cli = self.node.create_client( + srv_type=Empty, srv_name='test_service', callback_group=callback_group) - async def timer1_callback() -> None: - timer1.cancel() - await cli.call_async(Empty.Request()) + async def timer1_callback() -> None: + timer1.cancel() + await cli.call_async(Empty.Request()) - timer1 = self.node.create_timer(0.5, timer1_callback, callback_group) + timer1 = self.node.create_timer(0.5, timer1_callback, callback_group) - count = 0 + count = 0 - def timer2_callback() -> None: - nonlocal count - count += 1 - timer2 = self.node.create_timer(1.5, timer2_callback, callback_group) + def timer2_callback() -> None: + nonlocal count + count += 1 + timer2 = self.node.create_timer(1.5, timer2_callback, callback_group) - executor.add_node(self.node) - future: Future[Any] = Future(executor=executor) - executor.spin_until_future_complete(future, 4) + executor.add_node(self.node) + future = Future[None](executor=executor) + executor.spin_until_future_complete(future, 4) - assert count == 2 + assert count == 2 - executor.shutdown() - timer2.destroy() - timer1.destroy() - cli.destroy() + executor.shutdown() + self.node.destroy_timer(timer2) + self.node.destroy_timer(timer1) + self.node.destroy_client(cli) if __name__ == '__main__':
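
Taken together, the new executor is intended as a drop-in replacement for the existing
ones. A minimal usage sketch, exercising only the API the tests above rely on; the
demo node name is illustrative:

    import rclpy
    from rclpy.experimental import EventsExecutor

    rclpy.init()
    node = rclpy.create_node('events_executor_demo')  # hypothetical name
    executor = EventsExecutor()  # also accepts context=..., as the tests above pass
    executor.add_node(node)
    try:
        executor.spin()  # dispatch callbacks as events arrive, until shutdown
    finally:
        executor.shutdown()
        node.destroy_node()
        rclpy.shutdown()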