Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 13 additions & 15 deletions examples/rclrs_timer_demo/src/rclrs_timer_demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
use rclrs::{create_node, Context, Node, RclrsError, Timer};
use std::{
env,
sync::{Arc, Mutex},
sync::Arc,
time::Duration,
};

/// Contains both the node and timer.
struct SimpleTimerNode {
node: Arc<Node>,
#[allow(unused)]
timer: Arc<Timer>,
}

Expand All @@ -20,29 +22,25 @@ impl SimpleTimerNode {
/// The callback will simply print to stdout:
/// "Drinking 🧉 for the xth time every p nanoseconds."
/// where x is the iteration callback counter and p is the period of the timer.
fn new(context: &Context, timer_period_ns: i64) -> Result<Self, RclrsError> {
fn new(context: &Context, timer_period: Duration) -> Result<Self, RclrsError> {
let node = create_node(context, "simple_timer_node")?;
let count: Arc<Mutex<i32>> = Arc::new(Mutex::new(0));
let timer = node.create_timer(
timer_period_ns,
context,
Some(Box::new(move |_| {
let x = *count.lock().unwrap();
let mut x = 0;
let timer = node.create_timer_repeating(
timer_period,
move || {
x += 1;
println!(
"Drinking 🧉 for the {}th time every {} nanoseconds.",
x, timer_period_ns
"Drinking 🧉 for the {x}th time every {:?}.",
timer_period,
);
*count.lock().unwrap() = x + 1;
})),
None,
},
)?;
Ok(Self { node, timer })
}
}

fn main() -> Result<(), RclrsError> {
let timer_period: i64 = 1e9 as i64; // 1 seconds.
let context = Context::new(env::args()).unwrap();
let simple_timer_node = Arc::new(SimpleTimerNode::new(&context, timer_period).unwrap());
let simple_timer_node = Arc::new(SimpleTimerNode::new(&context, Duration::from_secs(1)).unwrap());
rclrs::spin(simple_timer_node.node.clone())
}
35 changes: 12 additions & 23 deletions rclrs/src/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,26 @@ impl Clock {
}

fn make(kind: ClockType) -> Self {
let mut rcl_clock;
let rcl_clock;
unsafe {
// SAFETY: Getting a default value is always safe.
rcl_clock = Self::init_generic_clock();
let allocator = rcutils_get_default_allocator();
rcl_clock = Arc::new(Mutex::new(rcl_clock_t {
type_: rcl_clock_type_t::RCL_CLOCK_UNINITIALIZED,
jump_callbacks: std::ptr::null_mut(),
num_jump_callbacks: 0,
get_now: None,
data: std::ptr::null_mut::<std::os::raw::c_void>(),
allocator,
}));
let mut allocator = rcutils_get_default_allocator();
// Function will return Err(_) only if there isn't enough memory to allocate a clock
// object.
rcl_clock_init(kind.into(), &mut rcl_clock, &mut allocator)
rcl_clock_init(kind.into(), &mut *rcl_clock.lock().unwrap(), &mut allocator)
.ok()
.unwrap();
}
Self {
kind,
rcl_clock: Arc::new(Mutex::new(rcl_clock)),
}
Self { kind, rcl_clock }
}

/// Returns the clock's `rcl_clock_t`.
Expand All @@ -106,22 +111,6 @@ impl Clock {
clock: Arc::downgrade(&self.rcl_clock),
}
}

/// Helper function to privately initialize a default clock, with the same behavior as
/// `rcl_init_generic_clock`. By defining a private function instead of implementing
/// `Default`, we avoid exposing a public API to create an invalid clock.
// SAFETY: Getting a default value is always safe.
unsafe fn init_generic_clock() -> rcl_clock_t {
let allocator = rcutils_get_default_allocator();
rcl_clock_t {
type_: rcl_clock_type_t::RCL_CLOCK_UNINITIALIZED,
jump_callbacks: std::ptr::null_mut::<rcl_jump_callback_info_t>(),
num_jump_callbacks: 0,
get_now: None,
data: std::ptr::null_mut::<std::os::raw::c_void>(),
allocator,
}
}
}

impl Drop for ClockSource {
Expand Down
11 changes: 11 additions & 0 deletions rclrs/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ pub enum RclrsError {
},
/// It was attempted to add a waitable to a wait set twice.
AlreadyAddedToWaitSet,
/// A negative duration was obtained from rcl which should have been positive.
///
/// The value represents nanoseconds.
NegativeDuration(i64),
}

impl Display for RclrsError {
Expand All @@ -48,6 +52,12 @@ impl Display for RclrsError {
"Could not add entity to wait set because it was already added to a wait set"
)
}
RclrsError::NegativeDuration(duration) => {
write!(
f,
"A duration was negative when it should not have been: {duration:?}"
)
}
}
}
}
Expand Down Expand Up @@ -80,6 +90,7 @@ impl Error for RclrsError {
RclrsError::UnknownRclError { msg, .. } => msg.as_ref().map(|e| e as &dyn Error),
RclrsError::StringContainsNul { err, .. } => Some(err).map(|e| e as &dyn Error),
RclrsError::AlreadyAddedToWaitSet => None,
RclrsError::NegativeDuration(_) => None,
}
}
}
Expand Down
137 changes: 115 additions & 22 deletions rclrs/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ use rosidl_runtime_rs::Message;

pub use self::{builder::*, graph::*};
use crate::{
rcl_bindings::*, Client, ClientBase, Clock, Context, ContextHandle, GuardCondition, LogParams,
Logger, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Publisher,
QoSProfile, RclrsError, Service, ServiceBase, Subscription, SubscriptionBase,
SubscriptionCallback, TimeSource, Timer, TimerCallback, ToLogParams, ENTITY_LIFECYCLE_MUTEX,
rcl_bindings::*, AnyTimerCallback, Client, ClientBase, Clock, Context, ContextHandle,
GuardCondition, IntoTimerOptions, LogParams, Logger, ParameterBuilder, ParameterInterface,
ParameterVariant, Parameters, Publisher, QoSProfile, RclrsError, Service, ServiceBase,
Subscription, SubscriptionBase, SubscriptionCallback, TimeSource, Timer, TimerCallOnce,
TimerCallRepeating, ToLogParams, ENTITY_LIFECYCLE_MUTEX,
};

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
Expand Down Expand Up @@ -341,22 +342,23 @@ impl Node {
Ok(subscription)
}

/// Creates a [`Timer`][1].
/// Creates a [`Timer`].
///
/// For more ergonomic usage see also:
/// * [`Self::create_timer_repeating`]
/// * [`Self::create_timer_oneshot`]
/// * [`Self::create_timer_inert`]
///
/// [1]: crate::Timer
/// TODO: make timer's lifetime depend on node's lifetime.
pub fn create_timer(
pub fn create_timer<'a>(
&self,
period_ns: i64,
context: &Context,
callback: Option<TimerCallback>,
clock: Option<Clock>,
options: impl IntoTimerOptions<'a>,
callback: AnyTimerCallback,
) -> Result<Arc<Timer>, RclrsError> {
let clock_used = match clock {
Some(value) => value,
None => self.get_clock(),
};
let timer = Timer::new(&clock_used, &context, period_ns, callback)?;
let options = options.into_timer_options();
let clock = options.clock.as_clock(self);

let timer = Timer::new(&self.handle.context_handle, options.period, clock, callback)?;
let timer = Arc::new(timer);
self.timers_mtx
.lock()
Expand All @@ -365,6 +367,48 @@ impl Node {
Ok(timer)
}

/// Create a [`Timer`] with a repeating callback.
///
/// See also:
/// * [`Self::create_timer_oneshot`]
/// * [`Self::create_timer_inert`]
pub fn create_timer_repeating<'a, Args>(
&self,
options: impl IntoTimerOptions<'a>,
callback: impl TimerCallRepeating<Args>,
) -> Result<Arc<Timer>, RclrsError> {
self.create_timer(options, callback.into_repeating_timer_callback())
}

/// Create a [`Timer`] whose callback will be triggered once after the period
/// of the timer has elapsed. After that you will need to use
/// [`Timer::set_callback`] or a related method or else nothing will happen
/// the following times that the `Timer` elapses.
///
/// See also:
/// * [`Self::create_timer_repeating`]
/// * [`Self::create_time_inert`]
pub fn create_timer_oneshot<'a, Args>(
&self,
options: impl IntoTimerOptions<'a>,
callback: impl TimerCallOnce<Args>,
) -> Result<Arc<Timer>, RclrsError> {
self.create_timer(options, callback.into_oneshot_timer_callback())
}

/// Create a [`Timer`] without a callback. Nothing will happen when this
/// `Timer` elapses until you use [`Timer::set_callback`] or a related method.
///
/// See also:
/// * [`Self::create_timer_repeating`]
/// * [`Self::create_timer_oneshot`]
pub fn create_timer_inert<'a>(
&self,
options: impl IntoTimerOptions<'a>,
) -> Result<Arc<Timer>, RclrsError> {
self.create_timer(options, AnyTimerCallback::None)
}

/// Returns the subscriptions that have not been dropped yet.
pub(crate) fn live_subscriptions(&self) -> Vec<Arc<dyn SubscriptionBase>> {
{ self.subscriptions_mtx.lock().unwrap() }
Expand Down Expand Up @@ -532,6 +576,11 @@ mod tests {
use super::*;
use crate::test_helpers::*;

use std::{
time::{Duration, Instant},
sync::{Arc, atomic::{AtomicU64, Ordering}},
};

#[test]
fn traits() {
assert_send::<Node>();
Expand Down Expand Up @@ -584,20 +633,64 @@ mod tests {
}

#[test]
fn test_create_timer_without_clock_source() -> Result<(), RclrsError> {
let timer_period_ns: i64 = 1e6 as i64; // 1 millisecond.
fn test_create_timer() -> Result<(), RclrsError> {
let context = Context::new([])?;
let dut = NodeBuilder::new(&context, "node_with_timer")
let node = NodeBuilder::new(&context, "node_with_timer")
.namespace("test_create_timer")
.build()?;

let _timer =
dut.create_timer(timer_period_ns, &context, Some(Box::new(move |_| {})), None)?;
assert_eq!(dut.live_timers().len(), 1);
let repeat_counter = Arc::new(AtomicU64::new(0));
let repeat_counter_check = Arc::clone(&repeat_counter);
let _repeating_timer = node.create_timer_repeating(
Duration::from_millis(1),
move || { repeat_counter.fetch_add(1, Ordering::AcqRel); },
)?;
assert_eq!(node.live_timers().len(), 1);

let oneshot_counter = Arc::new(AtomicU64::new(0));
let oneshot_counter_check = Arc::clone(&oneshot_counter);
let _oneshot_timer = node.create_timer_oneshot(
Duration::from_millis(1)
.node_time(),
move || { oneshot_counter.fetch_add(1, Ordering::AcqRel); },
)?;

let oneshot_resetting_counter = Arc::new(AtomicU64::new(0));
let oneshot_resetting_counter_check = Arc::clone(&oneshot_resetting_counter);
let _oneshot_resetting_timer = node.create_timer_oneshot(
Duration::from_millis(1),
move |timer: &Timer| {
recursive_oneshot(timer, oneshot_resetting_counter);
},
);

let start = Instant::now();
while start.elapsed() < Duration::from_millis(10) {
crate::spin_once(Arc::clone(&node), Some(Duration::from_millis(10)))?;
}

// We give a little leeway to the exact count since timers won't always
// be triggered perfectly. The important thing is that it was
// successfully called repeatedly.
assert!(repeat_counter_check.load(Ordering::Acquire) > 5);
assert!(oneshot_resetting_counter_check.load(Ordering::Acquire) > 5);

// This should only have been triggered exactly once
assert_eq!(oneshot_counter_check.load(Ordering::Acquire), 1);

Ok(())
}

fn recursive_oneshot(
timer: &Timer,
counter: Arc<AtomicU64>,
) {
counter.fetch_add(1, Ordering::AcqRel);
timer.set_oneshot(move |timer: &Timer| {
recursive_oneshot(timer, counter);
});
}

#[test]
fn test_logger_name() -> Result<(), RclrsError> {
// Use helper to create 2 nodes for us
Expand Down
Loading