Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,10 @@ def create_task(self, callback: Callable[..., Any], *args: Any, **kwargs: Any
# Task inherits from Future
return task

def create_future(self) -> Future:
"""Create a Future object attached to the Executor."""
return Future(executor=self)

def shutdown(self, timeout_sec: Optional[float] = None) -> bool:
"""
Stop executing callbacks and wait for their completion.
Expand Down
8 changes: 8 additions & 0 deletions rclpy/src/rclpy/events_executor/events_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ EventsExecutor::EventsExecutor(py::object context)
inspect_iscoroutine_(py::module_::import("inspect").attr("iscoroutine")),
inspect_signature_(py::module_::import("inspect").attr("signature")),
rclpy_task_(py::module_::import("rclpy.task").attr("Task")),
rclpy_future_(py::module_::import("rclpy.task").attr("Future")),
rclpy_timer_timer_info_(py::module_::import("rclpy.timer").attr("TimerInfo")),
signal_callback_([this]() {events_queue_.Stop();}),
rcl_callback_manager_(&events_queue_),
Expand All @@ -78,6 +79,12 @@ pybind11::object EventsExecutor::create_task(
return task;
}

pybind11::object EventsExecutor::create_future()
{
using py::literals::operator""_a;
return rclpy_future_("executor"_a = py::cast(this));
}

bool EventsExecutor::shutdown(std::optional<double> timeout)
{
// NOTE: The rclpy context can invoke this with a lock on the context held. Therefore we must
Expand Down Expand Up @@ -897,6 +904,7 @@ void define_events_executor(py::object module)
.def(py::init<py::object>(), py::arg("context"))
.def_property_readonly("context", &EventsExecutor::get_context)
.def("create_task", &EventsExecutor::create_task, py::arg("callback"))
.def("create_future", &EventsExecutor::create_future)
.def("shutdown", &EventsExecutor::shutdown, py::arg("timeout_sec") = py::none())
.def("add_node", &EventsExecutor::add_node, py::arg("node"))
.def("remove_node", &EventsExecutor::remove_node, py::arg("node"))
Expand Down
2 changes: 2 additions & 0 deletions rclpy/src/rclpy/events_executor/events_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class EventsExecutor
pybind11::object get_context() const {return rclpy_context_;}
pybind11::object create_task(
pybind11::object callback, pybind11::args args = {}, const pybind11::kwargs & kwargs = {});
pybind11::object create_future();
bool shutdown(std::optional<double> timeout_sec = {});
bool add_node(pybind11::object node);
void remove_node(pybind11::handle node);
Expand Down Expand Up @@ -168,6 +169,7 @@ class EventsExecutor
const pybind11::object inspect_iscoroutine_;
const pybind11::object inspect_signature_;
const pybind11::object rclpy_task_;
const pybind11::object rclpy_future_;
const pybind11::object rclpy_timer_timer_info_;

EventsQueue events_queue_;
Expand Down
27 changes: 19 additions & 8 deletions rclpy/test/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ def timer_callback() -> None:
timer = self.node.create_timer(0.003, timer_callback)

# Timeout
future = Future[None]()
future = executor.create_future()
self.assertFalse(future.done())
start = time.perf_counter()
executor.spin_until_future_complete(future=future, timeout_sec=0.1)
Expand All @@ -517,7 +517,7 @@ def set_future_result(future):
future.set_result('finished')

# Future complete timeout_sec > 0
future = Future[str]()
future = executor.create_future()
self.assertFalse(future.done())
t = threading.Thread(target=lambda: set_future_result(future))
t.start()
Expand All @@ -526,7 +526,7 @@ def set_future_result(future):
self.assertEqual(future.result(), 'finished')

# Future complete timeout_sec = None
future = Future()
future = executor.create_future()
self.assertFalse(future.done())
t = threading.Thread(target=lambda: set_future_result(future))
t.start()
Expand All @@ -535,7 +535,7 @@ def set_future_result(future):
self.assertEqual(future.result(), 'finished')

# Future complete timeout < 0
future = Future()
future = executor.create_future()
self.assertFalse(future.done())
t = threading.Thread(target=lambda: set_future_result(future))
t.start()
Expand All @@ -557,7 +557,7 @@ def timer_callback() -> None:
timer = self.node.create_timer(0.003, timer_callback)

# Do not wait timeout_sec = 0
future = Future[None]()
future = executor.create_future()
self.assertFalse(future.done())
executor.spin_until_future_complete(future=future, timeout_sec=0)
self.assertFalse(future.done())
Expand Down Expand Up @@ -640,7 +640,7 @@ def test_single_threaded_spin_once_until_future(self) -> None:
with self.subTest(cls=cls):
executor = cls(context=self.context)

future = Future[bool](executor=executor)
future = executor.create_future()

# Setup a thread to spin_once_until_future_complete, which will spin
# for a maximum of 10 seconds.
Expand Down Expand Up @@ -668,7 +668,7 @@ def test_multi_threaded_spin_once_until_future(self) -> None:
self.assertIsNotNone(self.node.handle)
executor = MultiThreadedExecutor(context=self.context)

future: Future[bool] = Future(executor=executor)
future: Future[bool] = executor.create_future()

# Setup a thread to spin_once_until_future_complete, which will spin
# for a maximum of 10 seconds.
Expand Down Expand Up @@ -717,7 +717,7 @@ def timer2_callback() -> None:
timer2 = self.node.create_timer(1.5, timer2_callback, callback_group)

executor.add_node(self.node)
future = Future[None](executor=executor)
future = executor.create_future()
executor.spin_until_future_complete(future, 4)

assert count == 2
Expand All @@ -727,6 +727,17 @@ def timer2_callback() -> None:
self.node.destroy_timer(timer1)
self.node.destroy_client(cli)

def test_create_future_returns_future_with_executor_attached(self) -> None:
self.assertIsNotNone(self.node.handle)
for cls in [SingleThreadedExecutor, MultiThreadedExecutor, EventsExecutor]:
with self.subTest(cls=cls):
executor = cls(context=self.context)
try:
fut = executor.create_future()
self.assertEqual(executor, fut._executor())
finally:
executor.shutdown()


if __name__ == '__main__':
unittest.main()