Skip to content

Commit 4c05d92

Browse files
bmartin427Brad Martinjmachowinski
authored
Introduce EventsExecutor implementation (#1391)
* Introduce EventsExecutor implementation (#1389) Signed-off-by: Brad Martin <[email protected]> * Fix explosion for reference count updates without GIL The previous version worked on 22.04+Iron, but fails on 24.04+Jazzy or Rolling. It seems like the behavior of std::bind() may have changed when asked to bind a py::object to a callback taking a py::handle. Signed-off-by: Brad Martin <[email protected]> * Fix ament linter complaints Signed-off-by: Brad Martin <[email protected]> * Switch to non-boost asio library Signed-off-by: Brad Martin <[email protected]> * Use internal _rclpy C++ types instead of jumping through Python hoops Signed-off-by: Brad Martin <[email protected]> * Reformat with clang-format, then uncrustify again clang-format fixes things that uncrustify tends to leave alone, but then uncrustify seems slightly unhappy with that result. Also reflow comments at 100 columns. This uses the .clang-format file from the ament-clang-format package, with the exception of SortIncludes being set to false, because I didn't like what it tried to do with includes without that override. Signed-off-by: Brad Martin <[email protected]> * Respect init signal handling options Signed-off-by: Brad Martin <[email protected]> * Reconcile signal handling differences with SingleThreadedExecutor Signed-off-by: Brad Martin <[email protected]> * test_executor.py: Add coverage for EventsExecutor Tests that currently work are enabled, and those that don't are annotated what needs to be done before they can be enabled Signed-off-by: Brad Martin <[email protected]> * Make spin_once() only return after a user-visible callback Signed-off-by: Brad Martin <[email protected]> * Add get_nodes() method Signed-off-by: Brad Martin <[email protected]> * Add context management support Signed-off-by: Brad Martin <[email protected]> * Remove mutex protection on nodes_ member access It was being used only inconsistently, and on reflection it wasn't adding any protection beyond what the GIL already provides. Signed-off-by: Brad Martin <[email protected]> * Fix deadlock during shutdown() Needed to release the GIL while blocking on another lock. Signed-off-by: Brad Martin <[email protected]> * A little further on using C++ _rclpy types instead of Python interface Signed-off-by: Brad Martin <[email protected]> * Fix issue with iterating through incomplete Tasks Never bool-test a py::object::attr() return value without an explicit py::cast<bool>. Signed-off-by: Brad Martin <[email protected]> * Add support for coroutines to timer handling Signed-off-by: Brad Martin <[email protected]> * Fix test_not_lose_callback() test to destroy entities properly EventsExecutor was exploding because the test was leaving destroyed entities in the node by using an incorrect API to destroy them. Signed-off-by: Brad Martin <[email protected]> * Correct test that wasn't running at all, and remove EventsExecutor from it * Test methods need to be prefixed with 'test_' in order to function. This had been entirely dead code, and when I enabled it EventsExecutor deadlocked. * On reflection, it seems like a deadlock is exactly what should be expected from what this test is doing. Remove EventsExecutor from the test and document the problem. Signed-off-by: Brad Martin <[email protected]> * Warn on every timer added over the threshold, not just the first Co-authored-by: Janosch Machowinski <[email protected]> Signed-off-by: Brad Martin <[email protected]> * Keep rcl_timer_call() tightly coupled with user callback dispatch This both prevents an issue where user callbacks could potentially queue up if they didn't dispatch fast enough, and ensures that a timer that has passed its expiration without being dispatched yet can still be canceled without the user callback being delivered later. This commit also makes use of the new rcl API for querying the absolute timer expiration time, instead of the relative number of nanoseconds remaining until it expires. This should both make things more accurate, and potentially reduce overhead as we don't have to re-query the current time for each outstanding timer. Signed-off-by: Brad Martin <[email protected]> * Protect against deferred method calls happening against a deleted ClockManager Signed-off-by: Brad Martin <[email protected]> * Add support for new TimerInfo data passed to timer handlers Signed-off-by: Brad Martin <[email protected]> * Simplify spin_once() implementation This both reduces duplicate code now, and simplifies the asio interface used which would need replacing. Signed-off-by: Brad Martin <[email protected]> * Fix stale Future done callbacks with spin_until_future_complete() This method can't be allowed to leave its Future done callback outstanding in case the method is returning for a reason other than the Future being done. Signed-off-by: Brad Martin <[email protected]> * Use existing rclpy signal handling instead of asio Signed-off-by: Brad Martin <[email protected]> * Replace asio timers with a dedicated timer wait thread This is dumb on its own, but it helps me move towards eliminating asio. Signed-off-by: Brad Martin <[email protected]> * Correct busy-looping in async callback handling This isn't ideal because there are some ways async callbacks could become unblocked which wouldn't get noticed right away (if at all); however this seems to match the behavior of SingleThreadedExecutor. Signed-off-by: Brad Martin <[email protected]> * Replace asio::io_context with a new EventsQueue object Signed-off-by: Brad Martin <[email protected]> * Add EventsExecutor to new test_executor test from merge Signed-off-by: Brad Martin <[email protected]> * Swap 'pragma once' for ifndef include guards Signed-off-by: Brad Martin <[email protected]> * Add test coverage for Node.destroy_subscription() Signed-off-by: Brad Martin <[email protected]> * Replace '|' type markup with typing.Optional and typing.Union Python 3.9, still used by RHEL 9, doesn't seem to understand '|' syntax, and Optional/Union seems to be what gets used throughout the rest of the codebase. Additionally, fix a couple other mypy nits: * mypy is mad that I haven't explicitly annotated every __init__ as returning None * mypy wants generic arguments to service and action clients now Signed-off-by: Brad Martin <[email protected]> * Use 'auto' in place of explicit return type on hash pybind11::hash() is documented as returning ssize_t, but this seems to be a lie because MSVC doesn't understand that type; so, let's just return whatever we do get. Signed-off-by: Brad Martin <[email protected]> * Change initialization of struct members MSVC didn't like the more concise method. Signed-off-by: Brad Martin <[email protected]> * Use subTest in test_executor to distinguish which executor type failed Signed-off-by: Brad Martin <[email protected]> * Use time.perf_counter() instead of time.monotonic() in executor test time.monotonic() has a resolution of 16ms, which is way too coarse for the intervals this test is trying to measure. Signed-off-by: Brad Martin <[email protected]> --------- Signed-off-by: Brad Martin <[email protected]> Signed-off-by: Brad Martin <[email protected]> Co-authored-by: Brad Martin <[email protected]> Co-authored-by: Janosch Machowinski <[email protected]>
1 parent 59671a0 commit 4c05d92

21 files changed

+3335
-325
lines changed

rclpy/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,11 @@ pybind11_add_module(_rclpy_pybind11
9090
src/rclpy/destroyable.cpp
9191
src/rclpy/duration.cpp
9292
src/rclpy/clock_event.cpp
93+
src/rclpy/events_executor/delayed_event_thread.cpp
94+
src/rclpy/events_executor/events_executor.cpp
95+
src/rclpy/events_executor/events_queue.cpp
96+
src/rclpy/events_executor/rcl_support.cpp
97+
src/rclpy/events_executor/timers_manager.cpp
9398
src/rclpy/exceptions.cpp
9499
src/rclpy/graph.cpp
95100
src/rclpy/guard_condition.cpp
@@ -182,6 +187,7 @@ if(BUILD_TESTING)
182187
test/test_create_node.py
183188
test/test_create_while_spinning.py
184189
test/test_destruction.py
190+
test/test_events_executor.py
185191
test/test_executor.py
186192
test/test_expand_topic_name.py
187193
test/test_guard_condition.py
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Copyright 2024-2025 Brad Martin
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from .events_executor import EventsExecutor # noqa: F401
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# Copyright 2024-2025 Brad Martin
2+
# Copyright 2024 Merlin Labs, Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
import faulthandler
16+
import typing
17+
18+
import rclpy.executors
19+
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
20+
21+
22+
# Try to look like we inherit from the rclpy Executor for type checking purposes without
23+
# getting any of the code from the base class.
24+
def EventsExecutor(*, context: typing.Optional[rclpy.Context] = None) -> rclpy.executors.Executor:
25+
if context is None:
26+
context = rclpy.get_default_context()
27+
28+
# For debugging purposes, if anything goes wrong in C++ make sure we also get a
29+
# Python backtrace dumped with the crash.
30+
faulthandler.enable()
31+
32+
ex = typing.cast(rclpy.executors.Executor, _rclpy.EventsExecutor(context))
33+
34+
# rclpy.Executor does this too. Note, the context itself is smart enough to check
35+
# for bound methods, and check whether the instances they're bound to still exist at
36+
# callback time, so we don't have to worry about tearing down this stale callback at
37+
# destruction time.
38+
# TODO(bmartin427) This should really be done inside of the EventsExecutor
39+
# implementation itself, but I'm unable to figure out a pybind11 incantation that
40+
# allows me to pass this bound method call from C++.
41+
context.on_shutdown(ex.wake)
42+
43+
return ex

rclpy/rclpy/task.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,20 @@ def add_done_callback(self, callback: Callable[['Future[T]'], None]) -> None:
207207
if invoke:
208208
callback(self)
209209

210+
def remove_done_callback(self, callback: Callable[['Future[T]'], None]) -> bool:
211+
"""
212+
Remove a previously-added done callback.
213+
214+
Returns true if the given callback was found and removed. Always fails if the Future was
215+
already complete.
216+
"""
217+
with self._lock:
218+
try:
219+
self._callbacks.remove(callback)
220+
except ValueError:
221+
return False
222+
return True
223+
210224

211225
class Task(Future[T]):
212226
"""

rclpy/src/rclpy/_rclpy_pybind11.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "duration.hpp"
3333
#include "clock_event.hpp"
3434
#include "event_handle.hpp"
35+
#include "events_executor/events_executor.hpp"
3536
#include "exceptions.hpp"
3637
#include "graph.hpp"
3738
#include "guard_condition.hpp"
@@ -247,4 +248,6 @@ PYBIND11_MODULE(_rclpy_pybind11, m) {
247248
rclpy::define_signal_handler_api(m);
248249
rclpy::define_clock_event(m);
249250
rclpy::define_lifecycle_api(m);
251+
252+
rclpy::events_executor::define_events_executor(m);
250253
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright 2025 Brad Martin
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
#include "events_executor/delayed_event_thread.hpp"
15+
16+
#include <utility>
17+
18+
namespace rclpy
19+
{
20+
namespace events_executor
21+
{
22+
23+
DelayedEventThread::DelayedEventThread(EventsQueue * events_queue)
24+
: events_queue_(events_queue), thread_([this]() {RunThread();})
25+
{
26+
}
27+
28+
DelayedEventThread::~DelayedEventThread()
29+
{
30+
{
31+
std::unique_lock<std::mutex> lock(mutex_);
32+
done_ = true;
33+
}
34+
cv_.notify_one();
35+
thread_.join();
36+
}
37+
38+
void DelayedEventThread::EnqueueAt(
39+
std::chrono::steady_clock::time_point when, std::function<void()> handler)
40+
{
41+
{
42+
std::unique_lock<std::mutex> lock(mutex_);
43+
when_ = when;
44+
handler_ = handler;
45+
}
46+
cv_.notify_one();
47+
}
48+
49+
void DelayedEventThread::RunThread()
50+
{
51+
std::unique_lock<std::mutex> lock(mutex_);
52+
while (!done_) {
53+
if (handler_) {
54+
// Make sure we don't dispatch a stale wait if it changes while we're waiting.
55+
const auto latched_when = when_;
56+
if (
57+
(std::cv_status::timeout == cv_.wait_until(lock, latched_when)) && handler_ &&
58+
(when_ <= latched_when))
59+
{
60+
auto handler = std::move(handler_);
61+
handler_ = {};
62+
events_queue_->Enqueue(std::move(handler));
63+
}
64+
} else {
65+
// Wait indefinitely until we get signaled that there's something worth looking at.
66+
cv_.wait(lock);
67+
}
68+
}
69+
}
70+
71+
} // namespace events_executor
72+
} // namespace rclpy
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright 2025 Brad Martin
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef RCLPY__EVENTS_EXECUTOR__DELAYED_EVENT_THREAD_HPP_
16+
#define RCLPY__EVENTS_EXECUTOR__DELAYED_EVENT_THREAD_HPP_
17+
18+
#include <chrono>
19+
#include <condition_variable>
20+
#include <functional>
21+
#include <mutex>
22+
#include <thread>
23+
24+
#include "events_executor/events_queue.hpp"
25+
26+
namespace rclpy
27+
{
28+
namespace events_executor
29+
{
30+
31+
/// This object manages posting an event handler to an EventsQueue after a specified delay. The
32+
/// current delay may be changed or canceled at any time. This is done by way of a self-contained
33+
/// child thread to perform the wait.
34+
class DelayedEventThread
35+
{
36+
public:
37+
/// The pointer is aliased and must live for the lifetime of this object.
38+
explicit DelayedEventThread(EventsQueue *);
39+
~DelayedEventThread();
40+
41+
/// Schedules an event handler to be enqueued at the specified time point. Replaces any previous
42+
/// wait and handler, which will never be dispatched if it has not been already.
43+
void EnqueueAt(std::chrono::steady_clock::time_point when, std::function<void()> handler);
44+
45+
/// Cancels any previously-scheduled handler.
46+
void Cancel() {EnqueueAt({}, {});}
47+
48+
private:
49+
void RunThread();
50+
51+
EventsQueue * const events_queue_;
52+
std::mutex mutex_;
53+
bool done_{};
54+
std::condition_variable cv_;
55+
std::chrono::steady_clock::time_point when_;
56+
std::function<void()> handler_;
57+
std::thread thread_;
58+
};
59+
60+
} // namespace events_executor
61+
} // namespace rclpy
62+
63+
#endif // RCLPY__EVENTS_EXECUTOR__DELAYED_EVENT_THREAD_HPP_

0 commit comments

Comments
 (0)