diff --git a/examples/rclrs_timer_demo/src/rclrs_timer_demo.rs b/examples/rclrs_timer_demo/src/rclrs_timer_demo.rs index 279858be3..76c8734b7 100644 --- a/examples/rclrs_timer_demo/src/rclrs_timer_demo.rs +++ b/examples/rclrs_timer_demo/src/rclrs_timer_demo.rs @@ -5,12 +5,14 @@ use rclrs::{create_node, Context, Node, RclrsError, Timer}; use std::{ env, - sync::{Arc, Mutex}, + sync::Arc, + time::Duration, }; /// Contains both the node and timer. struct SimpleTimerNode { node: Arc, + #[allow(unused)] timer: Arc, } @@ -20,29 +22,25 @@ impl SimpleTimerNode { /// The callback will simply print to stdout: /// "Drinking 🧉 for the xth time every p nanoseconds." /// where x is the iteration callback counter and p is the period of the timer. - fn new(context: &Context, timer_period_ns: i64) -> Result { + fn new(context: &Context, timer_period: Duration) -> Result { let node = create_node(context, "simple_timer_node")?; - let count: Arc> = Arc::new(Mutex::new(0)); - let timer = node.create_timer( - timer_period_ns, - context, - Some(Box::new(move |_| { - let x = *count.lock().unwrap(); + let mut x = 0; + let timer = node.create_timer_repeating( + timer_period, + move || { + x += 1; println!( - "Drinking 🧉 for the {}th time every {} nanoseconds.", - x, timer_period_ns + "Drinking 🧉 for the {x}th time every {:?}.", + timer_period, ); - *count.lock().unwrap() = x + 1; - })), - None, + }, )?; Ok(Self { node, timer }) } } fn main() -> Result<(), RclrsError> { - let timer_period: i64 = 1e9 as i64; // 1 seconds. let context = Context::new(env::args()).unwrap(); - let simple_timer_node = Arc::new(SimpleTimerNode::new(&context, timer_period).unwrap()); + let simple_timer_node = Arc::new(SimpleTimerNode::new(&context, Duration::from_secs(1)).unwrap()); rclrs::spin(simple_timer_node.node.clone()) } diff --git a/rclrs/src/clock.rs b/rclrs/src/clock.rs index 8182d37ae..8dd543455 100644 --- a/rclrs/src/clock.rs +++ b/rclrs/src/clock.rs @@ -66,21 +66,26 @@ impl Clock { } fn make(kind: ClockType) -> Self { - let mut rcl_clock; + let rcl_clock; unsafe { // SAFETY: Getting a default value is always safe. - rcl_clock = Self::init_generic_clock(); + let allocator = rcutils_get_default_allocator(); + rcl_clock = Arc::new(Mutex::new(rcl_clock_t { + type_: rcl_clock_type_t::RCL_CLOCK_UNINITIALIZED, + jump_callbacks: std::ptr::null_mut(), + num_jump_callbacks: 0, + get_now: None, + data: std::ptr::null_mut::(), + allocator, + })); let mut allocator = rcutils_get_default_allocator(); // Function will return Err(_) only if there isn't enough memory to allocate a clock // object. - rcl_clock_init(kind.into(), &mut rcl_clock, &mut allocator) + rcl_clock_init(kind.into(), &mut *rcl_clock.lock().unwrap(), &mut allocator) .ok() .unwrap(); } - Self { - kind, - rcl_clock: Arc::new(Mutex::new(rcl_clock)), - } + Self { kind, rcl_clock } } /// Returns the clock's `rcl_clock_t`. @@ -106,22 +111,6 @@ impl Clock { clock: Arc::downgrade(&self.rcl_clock), } } - - /// Helper function to privately initialize a default clock, with the same behavior as - /// `rcl_init_generic_clock`. By defining a private function instead of implementing - /// `Default`, we avoid exposing a public API to create an invalid clock. - // SAFETY: Getting a default value is always safe. - unsafe fn init_generic_clock() -> rcl_clock_t { - let allocator = rcutils_get_default_allocator(); - rcl_clock_t { - type_: rcl_clock_type_t::RCL_CLOCK_UNINITIALIZED, - jump_callbacks: std::ptr::null_mut::(), - num_jump_callbacks: 0, - get_now: None, - data: std::ptr::null_mut::(), - allocator, - } - } } impl Drop for ClockSource { diff --git a/rclrs/src/error.rs b/rclrs/src/error.rs index 3eba2549f..eab8fc7e6 100644 --- a/rclrs/src/error.rs +++ b/rclrs/src/error.rs @@ -32,6 +32,10 @@ pub enum RclrsError { }, /// It was attempted to add a waitable to a wait set twice. AlreadyAddedToWaitSet, + /// A negative duration was obtained from rcl which should have been positive. + /// + /// The value represents nanoseconds. + NegativeDuration(i64), } impl Display for RclrsError { @@ -48,6 +52,12 @@ impl Display for RclrsError { "Could not add entity to wait set because it was already added to a wait set" ) } + RclrsError::NegativeDuration(duration) => { + write!( + f, + "A duration was negative when it should not have been: {duration:?}" + ) + } } } } @@ -80,6 +90,7 @@ impl Error for RclrsError { RclrsError::UnknownRclError { msg, .. } => msg.as_ref().map(|e| e as &dyn Error), RclrsError::StringContainsNul { err, .. } => Some(err).map(|e| e as &dyn Error), RclrsError::AlreadyAddedToWaitSet => None, + RclrsError::NegativeDuration(_) => None, } } } diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index 5df9c057d..6aeee64cd 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -13,10 +13,11 @@ use rosidl_runtime_rs::Message; pub use self::{builder::*, graph::*}; use crate::{ - rcl_bindings::*, Client, ClientBase, Clock, Context, ContextHandle, GuardCondition, LogParams, - Logger, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Publisher, - QoSProfile, RclrsError, Service, ServiceBase, Subscription, SubscriptionBase, - SubscriptionCallback, TimeSource, Timer, TimerCallback, ToLogParams, ENTITY_LIFECYCLE_MUTEX, + rcl_bindings::*, AnyTimerCallback, Client, ClientBase, Clock, Context, ContextHandle, + GuardCondition, IntoTimerOptions, LogParams, Logger, ParameterBuilder, ParameterInterface, + ParameterVariant, Parameters, Publisher, QoSProfile, RclrsError, Service, ServiceBase, + Subscription, SubscriptionBase, SubscriptionCallback, TimeSource, Timer, TimerCallOnce, + TimerCallRepeating, ToLogParams, ENTITY_LIFECYCLE_MUTEX, }; // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread @@ -341,22 +342,23 @@ impl Node { Ok(subscription) } - /// Creates a [`Timer`][1]. + /// Creates a [`Timer`]. + /// + /// For more ergonomic usage see also: + /// * [`Self::create_timer_repeating`] + /// * [`Self::create_timer_oneshot`] + /// * [`Self::create_timer_inert`] /// - /// [1]: crate::Timer /// TODO: make timer's lifetime depend on node's lifetime. - pub fn create_timer( + pub fn create_timer<'a>( &self, - period_ns: i64, - context: &Context, - callback: Option, - clock: Option, + options: impl IntoTimerOptions<'a>, + callback: AnyTimerCallback, ) -> Result, RclrsError> { - let clock_used = match clock { - Some(value) => value, - None => self.get_clock(), - }; - let timer = Timer::new(&clock_used, &context, period_ns, callback)?; + let options = options.into_timer_options(); + let clock = options.clock.as_clock(self); + + let timer = Timer::new(&self.handle.context_handle, options.period, clock, callback)?; let timer = Arc::new(timer); self.timers_mtx .lock() @@ -365,6 +367,48 @@ impl Node { Ok(timer) } + /// Create a [`Timer`] with a repeating callback. + /// + /// See also: + /// * [`Self::create_timer_oneshot`] + /// * [`Self::create_timer_inert`] + pub fn create_timer_repeating<'a, Args>( + &self, + options: impl IntoTimerOptions<'a>, + callback: impl TimerCallRepeating, + ) -> Result, RclrsError> { + self.create_timer(options, callback.into_repeating_timer_callback()) + } + + /// Create a [`Timer`] whose callback will be triggered once after the period + /// of the timer has elapsed. After that you will need to use + /// [`Timer::set_callback`] or a related method or else nothing will happen + /// the following times that the `Timer` elapses. + /// + /// See also: + /// * [`Self::create_timer_repeating`] + /// * [`Self::create_time_inert`] + pub fn create_timer_oneshot<'a, Args>( + &self, + options: impl IntoTimerOptions<'a>, + callback: impl TimerCallOnce, + ) -> Result, RclrsError> { + self.create_timer(options, callback.into_oneshot_timer_callback()) + } + + /// Create a [`Timer`] without a callback. Nothing will happen when this + /// `Timer` elapses until you use [`Timer::set_callback`] or a related method. + /// + /// See also: + /// * [`Self::create_timer_repeating`] + /// * [`Self::create_timer_oneshot`] + pub fn create_timer_inert<'a>( + &self, + options: impl IntoTimerOptions<'a>, + ) -> Result, RclrsError> { + self.create_timer(options, AnyTimerCallback::None) + } + /// Returns the subscriptions that have not been dropped yet. pub(crate) fn live_subscriptions(&self) -> Vec> { { self.subscriptions_mtx.lock().unwrap() } @@ -532,6 +576,11 @@ mod tests { use super::*; use crate::test_helpers::*; + use std::{ + time::{Duration, Instant}, + sync::{Arc, atomic::{AtomicU64, Ordering}}, + }; + #[test] fn traits() { assert_send::(); @@ -584,20 +633,64 @@ mod tests { } #[test] - fn test_create_timer_without_clock_source() -> Result<(), RclrsError> { - let timer_period_ns: i64 = 1e6 as i64; // 1 millisecond. + fn test_create_timer() -> Result<(), RclrsError> { let context = Context::new([])?; - let dut = NodeBuilder::new(&context, "node_with_timer") + let node = NodeBuilder::new(&context, "node_with_timer") .namespace("test_create_timer") .build()?; - let _timer = - dut.create_timer(timer_period_ns, &context, Some(Box::new(move |_| {})), None)?; - assert_eq!(dut.live_timers().len(), 1); + let repeat_counter = Arc::new(AtomicU64::new(0)); + let repeat_counter_check = Arc::clone(&repeat_counter); + let _repeating_timer = node.create_timer_repeating( + Duration::from_millis(1), + move || { repeat_counter.fetch_add(1, Ordering::AcqRel); }, + )?; + assert_eq!(node.live_timers().len(), 1); + + let oneshot_counter = Arc::new(AtomicU64::new(0)); + let oneshot_counter_check = Arc::clone(&oneshot_counter); + let _oneshot_timer = node.create_timer_oneshot( + Duration::from_millis(1) + .node_time(), + move || { oneshot_counter.fetch_add(1, Ordering::AcqRel); }, + )?; + + let oneshot_resetting_counter = Arc::new(AtomicU64::new(0)); + let oneshot_resetting_counter_check = Arc::clone(&oneshot_resetting_counter); + let _oneshot_resetting_timer = node.create_timer_oneshot( + Duration::from_millis(1), + move |timer: &Timer| { + recursive_oneshot(timer, oneshot_resetting_counter); + }, + ); + + let start = Instant::now(); + while start.elapsed() < Duration::from_millis(10) { + crate::spin_once(Arc::clone(&node), Some(Duration::from_millis(10)))?; + } + + // We give a little leeway to the exact count since timers won't always + // be triggered perfectly. The important thing is that it was + // successfully called repeatedly. + assert!(repeat_counter_check.load(Ordering::Acquire) > 5); + assert!(oneshot_resetting_counter_check.load(Ordering::Acquire) > 5); + + // This should only have been triggered exactly once + assert_eq!(oneshot_counter_check.load(Ordering::Acquire), 1); Ok(()) } + fn recursive_oneshot( + timer: &Timer, + counter: Arc, + ) { + counter.fetch_add(1, Ordering::AcqRel); + timer.set_oneshot(move |timer: &Timer| { + recursive_oneshot(timer, counter); + }); + } + #[test] fn test_logger_name() -> Result<(), RclrsError> { // Use helper to create 2 nodes for us diff --git a/rclrs/src/timer.rs b/rclrs/src/timer.rs index fb5f761a7..de51677b6 100644 --- a/rclrs/src/timer.rs +++ b/rclrs/src/timer.rs @@ -1,10 +1,19 @@ -use crate::{clock::Clock, context::Context, error::RclrsError, rcl_bindings::*, to_rclrs_result}; +use crate::{ + clock::Clock, context::ContextHandle, error::RclrsError, log_error, rcl_bindings::*, + ToLogParams, ToResult, ENTITY_LIFECYCLE_MUTEX, +}; // TODO: fix me when the callback type is properly defined. // use std::fmt::Debug; -use std::sync::{atomic::AtomicBool, Arc, Mutex}; +use std::{ + sync::{atomic::AtomicBool, Arc, Mutex}, + time::Duration, +}; -/// Type alias for the `Timer` callback. -pub type TimerCallback = Box; +mod timer_callback; +pub use timer_callback::*; + +mod timer_options; +pub use timer_options::*; /// Struct for executing periodic events. /// @@ -21,143 +30,308 @@ pub type TimerCallback = Box; // TODO: callback type prevents us from making the Timer implement the Debug trait. // #[derive(Debug)] pub struct Timer { - pub(crate) rcl_timer: Arc>, + pub(crate) handle: TimerHandle, /// The callback function that runs when the timer is due. - callback: Option, + callback: Arc>>, + /// What was the last time lapse between calls to this timer + last_elapse: Mutex, + /// We hold onto the Timer's clock for the whole lifespan of the Timer to + /// make sure the underlying `rcl_clock_t` remains valid. pub(crate) in_use_by_wait_set: Arc, } -impl Timer { - /// Creates a new timer. - pub fn new( - clock: &Clock, - context: &Context, - period: i64, - callback: Option, - ) -> Result { - let mut rcl_timer; - let timer_init_result = unsafe { - // SAFETY: Getting a default value is always safe. - rcl_timer = rcl_get_zero_initialized_timer(); - let mut rcl_clock = clock.get_rcl_clock().lock().unwrap(); - let allocator = rcutils_get_default_allocator(); - let mut rcl_context = context.handle.rcl_context.lock().unwrap(); - // Callbacks will be handled in the WaitSet. - let rcl_timer_callback: rcl_timer_callback_t = None; - // Function will return Err(_) only if there isn't enough memory to allocate a clock - // object. - rcl_timer_init( - &mut rcl_timer, - &mut *rcl_clock, - &mut *rcl_context, - period, - rcl_timer_callback, - allocator, - ) - }; - to_rclrs_result(timer_init_result).map(|_| Timer { - rcl_timer: Arc::new(Mutex::new(rcl_timer)), - callback, - in_use_by_wait_set: Arc::new(AtomicBool::new(false)), - }) - } +/// Manage the lifecycle of an `rcl_timer_t`, including managing its dependency +/// on `rcl_clock_t` by ensuring that this dependency are [dropped after][1] +/// the `rcl_timer_t`. +/// +/// [1]: +pub(crate) struct TimerHandle { + pub(crate) rcl_timer: Arc>, + clock: Clock, +} - /// Gets the period of the timer in nanoseconds - pub fn get_timer_period_ns(&self) -> Result { +impl Timer { + /// Gets the period of the timer + pub fn get_timer_period(&self) -> Result { let mut timer_period_ns = 0; - let get_period_result = unsafe { - let rcl_timer = self.rcl_timer.lock().unwrap(); + unsafe { + let rcl_timer = self.handle.rcl_timer.lock().unwrap(); rcl_timer_get_period(&*rcl_timer, &mut timer_period_ns) - }; - to_rclrs_result(get_period_result).map(|_| timer_period_ns) + } + .ok()?; + + rcl_duration(timer_period_ns) } /// Cancels the timer, stopping the execution of the callback pub fn cancel(&self) -> Result<(), RclrsError> { - let mut rcl_timer = self.rcl_timer.lock().unwrap(); - let cancel_result = unsafe { rcl_timer_cancel(&mut *rcl_timer) }; - to_rclrs_result(cancel_result) + let mut rcl_timer = self.handle.rcl_timer.lock().unwrap(); + let cancel_result = unsafe { rcl_timer_cancel(&mut *rcl_timer) }.ok()?; + Ok(cancel_result) } /// Checks whether the timer is canceled or not pub fn is_canceled(&self) -> Result { let mut is_canceled = false; - let is_canceled_result = unsafe { - let rcl_timer = self.rcl_timer.lock().unwrap(); + unsafe { + let rcl_timer = self.handle.rcl_timer.lock().unwrap(); rcl_timer_is_canceled(&*rcl_timer, &mut is_canceled) - }; - to_rclrs_result(is_canceled_result).map(|_| is_canceled) + } + .ok()?; + Ok(is_canceled) + } + + /// Get the last time lapse between calls to the timer. + /// + /// This is different from [`Self::time_since_last_call`] because it remains + /// constant between calls to the Timer. + /// + /// It keeps track of the what the value of [`Self::time_since_last_call`] + /// was immediately before the most recent call to the callback. This will + /// be [`Duration::ZERO`] if the `Timer` has never been triggered. + pub fn last_elapse(&self) -> Duration { + *self.last_elapse.lock().unwrap() } /// Retrieves the time since the last call to the callback - pub fn time_since_last_call(&self) -> Result { + pub fn time_since_last_call(&self) -> Result { let mut time_value_ns: i64 = 0; - let time_since_last_call_result = unsafe { - let rcl_timer = self.rcl_timer.lock().unwrap(); + unsafe { + let rcl_timer = self.handle.rcl_timer.lock().unwrap(); rcl_timer_get_time_since_last_call(&*rcl_timer, &mut time_value_ns) - }; - to_rclrs_result(time_since_last_call_result).map(|_| time_value_ns) + } + .ok()?; + + rcl_duration(time_value_ns) } /// Retrieves the time until the next call of the callback - pub fn time_until_next_call(&self) -> Result { + pub fn time_until_next_call(&self) -> Result { let mut time_value_ns: i64 = 0; - let time_until_next_call_result = unsafe { - let rcl_timer = self.rcl_timer.lock().unwrap(); + unsafe { + let rcl_timer = self.handle.rcl_timer.lock().unwrap(); rcl_timer_get_time_until_next_call(&*rcl_timer, &mut time_value_ns) - }; - to_rclrs_result(time_until_next_call_result).map(|_| time_value_ns) + } + .ok()?; + + rcl_duration(time_value_ns) } /// Resets the timer. pub fn reset(&self) -> Result<(), RclrsError> { - let mut rcl_timer = self.rcl_timer.lock().unwrap(); - to_rclrs_result(unsafe { rcl_timer_reset(&mut *rcl_timer) }) - } - - /// Executes the callback of the timer (this is triggered by the executor or the node directly) - pub fn call(&self) -> Result<(), RclrsError> { - let mut rcl_timer = self.rcl_timer.lock().unwrap(); - to_rclrs_result(unsafe { rcl_timer_call(&mut *rcl_timer) }) + let mut rcl_timer = self.handle.rcl_timer.lock().unwrap(); + unsafe { rcl_timer_reset(&mut *rcl_timer) }.ok() } /// Checks if the timer is ready (not canceled) pub fn is_ready(&self) -> Result { - let (is_ready, is_ready_result) = unsafe { + let is_ready = unsafe { let mut is_ready: bool = false; - let rcl_timer = self.rcl_timer.lock().unwrap(); - let is_ready_result = rcl_timer_is_ready(&*rcl_timer, &mut is_ready); - (is_ready, is_ready_result) + let rcl_timer = self.handle.rcl_timer.lock().unwrap(); + rcl_timer_is_ready(&*rcl_timer, &mut is_ready).ok()?; + is_ready }; - to_rclrs_result(is_ready_result).map(|_| is_ready) + + Ok(is_ready) } + /// Get the clock that this timer runs on. + pub fn clock(&self) -> &Clock { + &self.handle.clock + } + + /// Set a new callback for the timer. This will return whatever callback + /// was already present unless you are calling the function from inside of + /// the timer's callback, in which case you will receive [`None`]. + /// + /// See also: + /// * [`Self::set_repeating`] + /// * [`Self::set_oneshot`] + /// * [`Self::set_inert`]. + pub fn set_callback(&self, callback: AnyTimerCallback) -> Option { + self.callback.lock().unwrap().replace(callback) + } + + /// Set a repeating callback for this timer. + /// + /// See also: + /// * [`Self::set_oneshot`] + /// * [`Self::set_inert`] + pub fn set_repeating( + &self, + f: impl TimerCallRepeating, + ) -> Option { + self.set_callback(f.into_repeating_timer_callback()) + } + + /// Set a one-shot callback for the timer. + /// + /// The next time the timer is triggered, the callback will be set to + /// [`AnyTimerCallback::None`] after this callback is triggered. To keep the + /// timer useful, you can reset the Timer callback at any time, including + /// inside the one-shot callback itself. + /// + /// See also: + /// * [`Self::set_repeating`] + /// * [`Self::set_inert`] + pub fn set_oneshot(&self, f: impl TimerCallOnce) -> Option { + self.set_callback(f.into_oneshot_timer_callback()) + } + + /// Remove the callback from the timer. + /// + /// This does not cancel the timer; it will continue to wake up and be + /// triggered at its regular period. However, nothing will happen when the + /// timer is triggered until you give a new callback to the timer. + /// + /// You can give the timer a new callback at any time by calling: + /// * [`Self::set_repeating`] + /// * [`Self::set_oneshot`] + pub fn set_inert(&self) -> Option { + self.set_callback(AnyTimerCallback::None) + } + + /// This is triggerd when the Timer wakes up its wait set. pub(crate) fn execute(&self) -> Result<(), RclrsError> { if self.is_ready()? { - let time_since_last_call = self.time_since_last_call()?; self.call()?; - if let Some(ref callback) = self.callback { - callback(time_since_last_call); + } + + Ok(()) + } + + /// Creates a new timer. Users should call one of [`Node::create_timer`], + /// [`Node::create_timer_repeating`], [`Node::create_timer_oneshot`], or + /// [`Node::create_timer_inert`]. + pub(crate) fn new( + context: &ContextHandle, + period: Duration, + clock: Clock, + callback: AnyTimerCallback, + ) -> Result { + let period = period.as_nanos() as i64; + + // Callbacks will be handled at the rclrs layer. + let rcl_timer_callback: rcl_timer_callback_t = None; + + let rcl_timer = Arc::new(Mutex::new( + // SAFETY: Zero-initializing a timer is always safe + unsafe { rcl_get_zero_initialized_timer() }, + )); + + unsafe { + let mut rcl_clock = clock.get_rcl_clock().lock().unwrap(); + let mut rcl_context = context.rcl_context.lock().unwrap(); + + // SAFETY: Getting a default value is always safe. + let allocator = rcutils_get_default_allocator(); + + let _lifecycle = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + // SAFETY: We lock the lifecycle mutex since rcl_timer_init is not + // thread-safe. + rcl_timer_init( + &mut *rcl_timer.lock().unwrap(), + &mut *rcl_clock, + &mut *rcl_context, + period, + rcl_timer_callback, + allocator, + ) + } + .ok()?; + + let timer = Timer { + handle: TimerHandle { rcl_timer, clock }, + callback: Arc::new(Mutex::new(Some(callback))), + last_elapse: Mutex::new(Duration::ZERO), + in_use_by_wait_set: Arc::new(AtomicBool::new(false)), + }; + Ok(timer) + } + + /// Force the timer to be called, even if it is not ready to be triggered yet. + /// We could consider making this public, but the behavior may confuse users. + fn call(&self) -> Result<(), RclrsError> { + // Keep track of the time elapsed since the last call. We need to run + // this before we trigger rcl_call. + let last_elapse = self.time_since_last_call().unwrap_or(Duration::ZERO); + *self.last_elapse.lock().unwrap() = last_elapse; + + if let Err(err) = self.rcl_call() { + log_error!("timer", "Unable to call timer: {err:?}",); + } + + let Some(callback) = self.callback.lock().unwrap().take() else { + log_error!( + "timer".once(), + "Timer is missing its callback information. This should not \ + be possible, please report it to the maintainers of rclrs.", + ); + return Ok(()); + }; + + match callback { + AnyTimerCallback::Repeating(mut callback) => { + callback(self); + self.restore_callback(AnyTimerCallback::Repeating(callback)); + } + AnyTimerCallback::OneShot(callback) => { + callback(self); + // We restore the callback as None because this was a + // one-shot which has been consumed. + self.restore_callback(AnyTimerCallback::None); + } + AnyTimerCallback::None => { + // Nothing to do here, just restore the callback. + self.restore_callback(AnyTimerCallback::None); } } + Ok(()) } + + /// Updates the state of the rcl_timer to know that it has been called. This + /// should only be called by [`Self::call`]. + /// + /// The callback held by the rcl_timer is null because we store the callback + /// in the [`Timer`] struct. This means there are no side-effects to this + /// except to keep track of when the timer has been called. + fn rcl_call(&self) -> Result<(), RclrsError> { + let mut rcl_timer = self.handle.rcl_timer.lock().unwrap(); + unsafe { rcl_timer_call(&mut *rcl_timer) }.ok() + } + + /// Used by [`Timer::execute`] to restore the state of the callback if and + /// only if the user has not already set a new callback. + fn restore_callback(&self, callback: AnyTimerCallback) { + let mut self_callback = self.callback.lock().unwrap(); + if self_callback.is_none() { + *self_callback = Some(callback); + } + } } /// 'Drop' trait implementation to be able to release the resources -impl Drop for rcl_timer_t { +impl Drop for TimerHandle { fn drop(&mut self) { - // SAFETY: No preconditions for this function - let rc = unsafe { rcl_timer_fini(&mut *self) }; - if let Err(e) = to_rclrs_result(rc) { - panic!("Unable to release Timer. {:?}", e) - } + let _lifecycle = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + // SAFETY: The lifecycle mutex is locked and the clock for the timer + // must still be valid because TimerHandle keeps it alive. + unsafe { rcl_timer_fini(&mut *self.rcl_timer.lock().unwrap()) }; } } impl PartialEq for Timer { fn eq(&self, other: &Self) -> bool { - Arc::ptr_eq(&self.rcl_timer, &other.rcl_timer) + Arc::ptr_eq(&self.handle.rcl_timer, &other.handle.rcl_timer) + } +} + +fn rcl_duration(duration_value_ns: i64) -> Result { + if duration_value_ns < 0 { + Err(RclrsError::NegativeDuration(duration_value_ns)) + } else { + Ok(Duration::from_nanos(duration_value_ns as u64)) } } @@ -167,12 +341,11 @@ unsafe impl Send for rcl_timer_t {} #[cfg(test)] mod tests { - use super::*; - use std::{thread, time}; - - fn create_dummy_callback() -> Option { - Some(Box::new(move |_| {})) - } + use crate::*; + use std::{ + sync::atomic::{AtomicBool, Ordering}, + thread, time, + }; #[test] fn traits() { @@ -184,215 +357,278 @@ mod tests { #[test] fn test_new_with_system_clock() { - let clock = Clock::system(); let context = Context::new(vec![]).unwrap(); - let period: i64 = 1e6 as i64; // 1 milliseconds. - let dut = Timer::new(&clock, &context, period, create_dummy_callback()); - assert!(dut.is_ok()); + let result = Timer::new( + &context.handle, + Duration::from_millis(1), + Clock::system(), + (|| {}).into_repeating_timer_callback(), + ); + assert!(result.is_ok()); } #[test] fn test_new_with_steady_clock() { - let clock = Clock::steady(); let context = Context::new(vec![]).unwrap(); - let period: i64 = 1e6 as i64; // 1 milliseconds. - let dut = Timer::new(&clock, &context, period, create_dummy_callback()); - assert!(dut.is_ok()); + let result = Timer::new( + &context.handle, + Duration::from_millis(1), + Clock::steady(), + (|| {}).into_repeating_timer_callback(), + ); + assert!(result.is_ok()); } - #[ignore = "SIGSEGV when creating the timer with Clock::with_source()."] #[test] fn test_new_with_source_clock() { let (clock, source) = Clock::with_source(); // No manual time set, it should default to 0 - assert!(clock.now().nsec == 0); + assert_eq!(clock.now().nsec, 0); let set_time = 1234i64; source.set_ros_time_override(set_time); - // Ros time is set, should return the value that was set + + // ROS time is set, should return the value that was set assert_eq!(clock.now().nsec, set_time); + let context = Context::new(vec![]).unwrap(); - let period: i64 = 1e6 as i64; // 1 milliseconds.. - let dut = Timer::new(&clock, &context, period, create_dummy_callback()); - assert!(dut.is_ok()); + let result = Timer::new( + &context.handle, + Duration::from_millis(1), + clock, + (|| {}).into_repeating_timer_callback(), + ); + assert!(result.is_ok()); } #[test] fn test_get_period() { - let clock = Clock::steady(); + let period = Duration::from_millis(1); let context = Context::new(vec![]).unwrap(); - let period: i64 = 1e6 as i64; // 1 milliseconds. - let dut = Timer::new(&clock, &context, period, create_dummy_callback()); - assert!(dut.is_ok()); - let dut = dut.unwrap(); - let period_result = dut.get_timer_period_ns(); - assert!(period_result.is_ok()); - let period_result = period_result.unwrap(); - assert_eq!(period_result, 1e6 as i64); + + let result = Timer::new( + &context.handle, + period, + Clock::steady(), + (|| {}).into_repeating_timer_callback(), + ); + + let timer = result.unwrap(); + let timer_period = timer.get_timer_period().unwrap(); + assert_eq!(timer_period, period); } #[test] fn test_cancel() { - let clock = Clock::steady(); let context = Context::new(vec![]).unwrap(); - let period: i64 = 1e6 as i64; // 1 milliseconds. - let dut = Timer::new(&clock, &context, period, create_dummy_callback()); - assert!(dut.is_ok()); - let dut = dut.unwrap(); - assert!(dut.is_canceled().is_ok()); - assert!(!dut.is_canceled().unwrap()); - let cancel_result = dut.cancel(); - assert!(cancel_result.is_ok()); - assert!(dut.is_canceled().is_ok()); - assert!(dut.is_canceled().unwrap()); + + let result = Timer::new( + &context.handle, + Duration::from_millis(1), + Clock::steady(), + (|| {}).into_repeating_timer_callback(), + ); + + let timer = result.unwrap(); + assert!(!timer.is_canceled().unwrap()); + timer.cancel().unwrap(); + assert!(timer.is_canceled().unwrap()); } #[test] fn test_time_since_last_call_before_first_event() { - let clock = Clock::steady(); let context = Context::new(vec![]).unwrap(); - let period_ns: i64 = 2e6 as i64; // 2 milliseconds. - let sleep_period_ms = time::Duration::from_millis(1); - let dut = Timer::new(&clock, &context, period_ns, create_dummy_callback()); - assert!(dut.is_ok()); - let dut = dut.unwrap(); - thread::sleep(sleep_period_ms); - let time_since_last_call = dut.time_since_last_call(); - assert!(time_since_last_call.is_ok()); - let time_since_last_call = time_since_last_call.unwrap(); + + let result = Timer::new( + &context.handle, + Duration::from_millis(2), + Clock::steady(), + (|| {}).into_repeating_timer_callback(), + ); + let timer = result.unwrap(); + + let sleep_period = time::Duration::from_millis(1); + thread::sleep(sleep_period); + + let time_since_last_call = timer.time_since_last_call().unwrap(); assert!( - time_since_last_call > 9e5 as i64, - "time_since_last_call: {}", - time_since_last_call + time_since_last_call >= sleep_period, + "time_since_last_call: {:?} vs sleep period: {:?}", + time_since_last_call, + sleep_period, ); } #[test] fn test_time_until_next_call_before_first_event() { - let clock = Clock::steady(); let context = Context::new(vec![]).unwrap(); - let period_ns: i64 = 2e6 as i64; // 2 milliseconds. - let dut = Timer::new(&clock, &context, period_ns, create_dummy_callback()); - assert!(dut.is_ok()); - let dut = dut.unwrap(); - let time_until_next_call = dut.time_until_next_call(); - assert!(time_until_next_call.is_ok()); - let time_until_next_call = time_until_next_call.unwrap(); + let period = Duration::from_millis(2); + + let result = Timer::new( + &context.handle, + period, + Clock::steady(), + (|| {}).into_repeating_timer_callback(), + ); + let timer = result.unwrap(); + + let time_until_next_call = timer.time_until_next_call().unwrap(); assert!( - time_until_next_call < period_ns, - "time_until_next_call: {}", - time_until_next_call + time_until_next_call <= period, + "time_until_next_call: {:?} vs period: {:?}", + time_until_next_call, + period, ); } #[test] fn test_reset() { - let tolerance = 20e4 as i64; - let clock = Clock::steady(); let context = Context::new(vec![]).unwrap(); - let period_ns: i64 = 2e6 as i64; // 2 milliseconds. - let dut = Timer::new(&clock, &context, period_ns, create_dummy_callback()).unwrap(); - let elapsed = period_ns - dut.time_until_next_call().unwrap(); - assert!(elapsed < tolerance, "elapsed before reset: {}", elapsed); - thread::sleep(time::Duration::from_millis(1)); - assert!(dut.reset().is_ok()); - let elapsed = period_ns - dut.time_until_next_call().unwrap(); - assert!(elapsed < tolerance, "elapsed after reset: {}", elapsed); + let period = Duration::from_millis(2); + let timer = Timer::new( + &context.handle, + period, + Clock::steady(), + (|| {}).into_repeating_timer_callback(), + ) + .unwrap(); + + // The unwrap will panic if the remaining time is negative + timer.time_until_next_call().unwrap(); + + // Sleep until we're past the timer period + thread::sleep(Duration::from_millis(3)); + + // Now the time until next call should give an error + assert!(matches!( + timer.time_until_next_call(), + Err(RclrsError::NegativeDuration(_)) + )); + + // Reset the timer so its interval begins again + assert!(timer.reset().is_ok()); + + // The unwrap will panic if the remaining time is negative + timer.time_until_next_call().unwrap(); } #[test] fn test_call() { - let tolerance = 20e4 as i64; - let clock = Clock::steady(); let context = Context::new(vec![]).unwrap(); - let period_ns: i64 = 1e6 as i64; // 1 millisecond. - let dut = Timer::new(&clock, &context, period_ns, create_dummy_callback()).unwrap(); - let elapsed = period_ns - dut.time_until_next_call().unwrap(); - assert!(elapsed < tolerance, "elapsed before reset: {}", elapsed); + let timer = Timer::new( + &context.handle, + Duration::from_millis(1), + Clock::steady(), + (|| {}).into_repeating_timer_callback(), + ) + .unwrap(); + + // The unwrap will panic if the remaining time is negative + timer.time_until_next_call().unwrap(); + + // Sleep until we're past the timer period thread::sleep(time::Duration::from_micros(1500)); - let elapsed = period_ns - dut.time_until_next_call().unwrap(); - assert!( - elapsed > 1500000i64, - "time_until_next_call before call: {}", - elapsed - ); - assert!(dut.call().is_ok()); - let elapsed = dut.time_until_next_call().unwrap(); - assert!( - elapsed < 500000i64, - "time_until_next_call after call: {}", - elapsed - ); + + // Now the time until the next call should give an error + assert!(matches!( + timer.time_until_next_call(), + Err(RclrsError::NegativeDuration(_)) + )); + + // The unwrap will panic if anything went wrong with the call + timer.call().unwrap(); + + // The unwrap will panic if the remaining time is negative + timer.time_until_next_call().unwrap(); } #[test] fn test_is_ready() { - let clock = Clock::steady(); let context = Context::new(vec![]).unwrap(); - let period_ns: i64 = 1e6 as i64; // 1 millisecond. - let dut = Timer::new(&clock, &context, period_ns, create_dummy_callback()).unwrap(); - let is_ready = dut.is_ready(); - assert!(is_ready.is_ok()); - assert!(!is_ready.unwrap()); - thread::sleep(time::Duration::from_micros(1100)); - let is_ready = dut.is_ready(); - assert!(is_ready.is_ok()); - assert!(is_ready.unwrap()); + let timer = Timer::new( + &context.handle, + Duration::from_millis(1), + Clock::steady(), + (|| {}).into_repeating_timer_callback(), + ) + .unwrap(); + + assert!(!timer.is_ready().unwrap()); + + // Sleep until the period has elapsed + thread::sleep(Duration::from_micros(1100)); + + assert!(timer.is_ready().unwrap()); } #[test] fn test_callback() { let clock = Clock::steady(); + let initial_time = clock.now(); let context = Context::new(vec![]).unwrap(); - let period_ns: i64 = 1e6 as i64; // 1 millisecond. - let foo = Arc::new(Mutex::new(0i64)); - let foo_callback = foo.clone(); - let dut = Timer::new( - &clock, - &context, - period_ns, - Some(Box::new(move |x| *foo_callback.lock().unwrap() = x)), + let executed = Arc::new(AtomicBool::new(false)); + + let timer = Timer::new( + &context.handle, + Duration::from_millis(1), + clock, + create_timer_callback_for_testing(initial_time, Arc::clone(&executed)), ) .unwrap(); - dut.callback.unwrap()(123); - assert_eq!(*foo.lock().unwrap(), 123); + + timer.call().unwrap(); + assert!(executed.load(Ordering::Acquire)); } #[test] fn test_execute_when_is_not_ready() { let clock = Clock::steady(); + let initial_time = clock.now(); let context = Context::new(vec![]).unwrap(); - let period_ns: i64 = 1e6 as i64; // 1 millisecond. - let foo = Arc::new(Mutex::new(0i64)); - let foo_callback = foo.clone(); - let dut = Timer::new( - &clock, - &context, - period_ns, - Some(Box::new(move |x| *foo_callback.lock().unwrap() = x)), + let executed = Arc::new(AtomicBool::new(false)); + + let timer = Timer::new( + &context.handle, + Duration::from_millis(1), + clock, + create_timer_callback_for_testing(initial_time, Arc::clone(&executed)), ) .unwrap(); - assert!(dut.execute().is_ok()); - assert_eq!(*foo.lock().unwrap(), 0i64); + + timer.execute().unwrap(); + assert!(!executed.load(Ordering::Acquire)); } #[test] fn test_execute_when_is_ready() { let clock = Clock::steady(); + let initial_time = clock.now(); let context = Context::new(vec![]).unwrap(); - let period_ns: i64 = 1e6 as i64; // 1 millisecond. - let foo = Arc::new(Mutex::new(0i64)); - let foo_callback = foo.clone(); - let dut = Timer::new( - &clock, - &context, - period_ns, - Some(Box::new(move |x| *foo_callback.lock().unwrap() = x)), + let executed = Arc::new(AtomicBool::new(false)); + + let timer = Timer::new( + &context.handle, + Duration::from_millis(1), + clock, + create_timer_callback_for_testing(initial_time, Arc::clone(&executed)), ) .unwrap(); - thread::sleep(time::Duration::from_micros(1500)); - assert!(dut.execute().is_ok()); - let x = *foo.lock().unwrap(); - assert!(x > 1500000i64); - assert!(x < 1600000i64); + + thread::sleep(time::Duration::from_millis(2)); + + timer.execute().unwrap(); + assert!(executed.load(Ordering::Acquire)); + } + + fn create_timer_callback_for_testing( + initial_time: Time, + executed: Arc, + ) -> AnyTimerCallback { + (move |t: Time| { + assert!(t + .compare_with(&initial_time, |t, initial| t >= initial) + .unwrap()); + executed.store(true, Ordering::Release); + }) + .into_oneshot_timer_callback() } } diff --git a/rclrs/src/timer/timer_callback.rs b/rclrs/src/timer/timer_callback.rs new file mode 100644 index 000000000..740bff26f --- /dev/null +++ b/rclrs/src/timer/timer_callback.rs @@ -0,0 +1,75 @@ +use crate::{Time, Timer}; + +/// A callback that can be triggered when a timer elapses. +pub enum AnyTimerCallback { + /// This callback will be triggered repeatedly, each time the period of the + /// timer elapses. + Repeating(Box), + /// This callback will be triggered exactly once, the first time the period + /// of the timer elapses. + OneShot(Box), + /// Do nothing when the timer elapses. This can be replaced later so that + /// the timer does something. + None, +} + +/// This trait is used to create timer callbacks for repeating timers. Incoming +/// callbacks can take the current [`Time`] as an argument, or [`Time`], or take +/// no argument at all. +pub trait TimerCallRepeating: Send + 'static { + /// Convert a suitable object into a repeating timer callback + fn into_repeating_timer_callback(self) -> AnyTimerCallback; +} + +impl TimerCallRepeating<()> for Func +where + Func: FnMut() + Send + 'static, +{ + fn into_repeating_timer_callback(mut self) -> AnyTimerCallback { + AnyTimerCallback::Repeating(Box::new(move |_| self())) + } +} + +impl TimerCallRepeating for Func +where + Func: FnMut(&Timer) + Send + 'static, +{ + fn into_repeating_timer_callback(mut self) -> AnyTimerCallback { + AnyTimerCallback::Repeating(Box::new(move |t| self(t))) + } +} + +/// This trait is used to create timer callbacks for one-shot timers. Incoming +/// callbacks can take the current [`Time`] as an argument, or [`Time`], or take +/// no argument at all. +pub trait TimerCallOnce: Send + 'static { + /// Convert a suitable object into a one-shot timer callback + fn into_oneshot_timer_callback(self) -> AnyTimerCallback; +} + +impl TimerCallOnce<()> for Func +where + Func: FnOnce() + Send + 'static, +{ + fn into_oneshot_timer_callback(self) -> AnyTimerCallback { + AnyTimerCallback::OneShot(Box::new(move |_| self())) + } +} + +impl TimerCallOnce for Func +where + Func: FnOnce(&Timer) + Send + 'static, +{ + fn into_oneshot_timer_callback(self) -> AnyTimerCallback { + AnyTimerCallback::OneShot(Box::new(move |t| self(t))) + } +} + +impl TimerCallOnce