Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rclpy/src/rclpy/events_executor/events_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ void EventsExecutor::spin(std::optional<double> timeout_sec, bool stop_after_use
const auto timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::duration<double>(*timeout_sec));
const auto end = std::chrono::steady_clock::now() + timeout_ns;
events_queue_.RunUntil(end);
events_queue_.Run(end);
} else {
events_queue_.Run();
}
Expand Down
11 changes: 7 additions & 4 deletions rclpy/src/rclpy/events_executor/events_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@ void EventsQueue::Enqueue(std::function<void()> event_handler)
cv_.notify_one();
}

void EventsQueue::Run() {RunUntil(std::chrono::steady_clock::time_point::max());}

void EventsQueue::RunUntil(std::chrono::steady_clock::time_point deadline)
void EventsQueue::Run(const std::optional<std::chrono::steady_clock::time_point> deadline)
{
while (true) {
std::function<void()> handler;
{
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait_until(lock, deadline, [this]() {return stopped_ || !queue_.empty();});
auto pred = [this]() {return stopped_ || !queue_.empty();};
if (deadline) {
cv_.wait_until(lock, *deadline, pred);
} else {
cv_.wait(lock, pred);
}
if (stopped_ || queue_.empty()) {
// We stopped for some reason other than being ready to run something (stopped or timeout)
return;
Expand Down
6 changes: 2 additions & 4 deletions rclpy/src/rclpy/events_executor/events_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <functional>
#include <mutex>
#include <queue>
#include <optional>

namespace rclpy
{
Expand All @@ -37,12 +38,9 @@ class EventsQueue
/// Add an event handler to the queue to be dispatched. Can be invoked by any thread.
void Enqueue(std::function<void()>);

/// Run event handlers indefinitely, until stopped.
void Run();

/// Run all ready event handlers, and any that become ready before the given deadline. Calling
/// Stop() will make this return immediately even if ready handlers are enqueued.
void RunUntil(std::chrono::steady_clock::time_point);
void Run(const std::optional<std::chrono::steady_clock::time_point> = {});

/// Causes any Run*() methods outstanding to return immediately. Can be invoked from any thread.
/// The stopped condition persists (causing any *subsequent* Run*() calls to also return) until
Expand Down