@@ -22,35 +22,69 @@ pub use wait::*;
2222use rcl_bindings:: rcl_context_is_valid;
2323use std:: time:: Duration ;
2424
25+ use std:: pin:: Pin ;
26+
27+ pub use rcl_bindings:: rmw_request_id_t;
28+
2529/// Polls the node for new messages and executes the corresponding callbacks.
2630///
2731/// See [`WaitSet::wait`] for the meaning of the `timeout` parameter.
2832///
2933/// This may under some circumstances return
30- /// [`SubscriptionTakeFailed`][1] when the wait set spuriously wakes up.
34+ /// [`SubscriptionTakeFailed`][1], [`ClientTakeFailed`][2], [`ServiceTakeFailed`][3] when the wait
35+ /// set spuriously wakes up.
3136/// This can usually be ignored.
3237///
3338/// [1]: crate::SubscriberErrorCode
39+ /// [2]: crate::ClientErrorCode
40+ /// [3]: crate::ServiceErrorCode
3441pub fn spin_once ( node : & Node , timeout : Option < Duration > ) -> Result < ( ) , RclReturnCode > {
3542 let live_subscriptions = node. live_subscriptions ( ) ;
43+ let live_clients = node. live_clients ( ) ;
44+ let live_services = node. live_services ( ) ;
3645 let ctx = Context {
3746 handle : node. context . clone ( ) ,
3847 } ;
39- let mut wait_set = WaitSet :: new ( live_subscriptions. len ( ) , & ctx) ?;
48+ let mut wait_set = WaitSet :: new (
49+ live_subscriptions. len ( ) ,
50+ 0 ,
51+ 0 ,
52+ live_clients. len ( ) ,
53+ live_services. len ( ) ,
54+ 0 ,
55+ & ctx,
56+ ) ?;
4057
4158 for live_subscription in & live_subscriptions {
4259 wait_set. add_subscription ( live_subscription. clone ( ) ) ?;
4360 }
4461
62+ for live_client in & live_clients {
63+ wait_set. add_client ( live_client. clone ( ) ) ?;
64+ }
65+
66+ for live_service in & live_services {
67+ wait_set. add_service ( live_service. clone ( ) ) ?;
68+ }
69+
4570 let ready_entities = wait_set. wait ( timeout) ?;
71+
4672 for ready_subscription in ready_entities. subscriptions {
4773 ready_subscription. execute ( ) ?;
4874 }
4975
76+ for ready_client in ready_entities. clients {
77+ ready_client. execute ( ) ?;
78+ }
79+
80+ for ready_service in ready_entities. services {
81+ ready_service. execute ( ) ?;
82+ }
83+
5084 Ok ( ( ) )
5185}
5286
53- /// Convenience function for calling [`spin_once`] in a loop.
87+ /// Convenience function for calling [`rclrs:: spin_once`] in a loop.
5488///
5589/// This function additionally checks that the context is still valid.
5690pub fn spin ( node : & Node ) -> Result < ( ) , RclReturnCode > {
@@ -70,6 +104,57 @@ pub fn spin(node: &Node) -> Result<(), RclReturnCode> {
70104 } ;
71105 }
72106 }
73-
74107 Ok ( ( ) )
75108}
109+
110+ #[ derive( Clone ) ]
111+ struct RclWaker { }
112+
113+ fn rclwaker_wake ( _s : & RclWaker ) { }
114+
115+ fn rclwaker_wake_by_ref ( _s : & RclWaker ) { }
116+
117+ fn rclwaker_clone ( s : & RclWaker ) -> RawWaker {
118+ let arc = unsafe { Arc :: from_raw ( s) } ;
119+ std:: mem:: forget ( arc. clone ( ) ) ;
120+ RawWaker :: new ( Arc :: into_raw ( arc) as * const ( ) , & VTABLE )
121+ }
122+
123+ const VTABLE : RawWakerVTable = unsafe {
124+ RawWakerVTable :: new (
125+ |s| rclwaker_clone ( & * ( s as * const RclWaker ) ) ,
126+ |s| rclwaker_wake ( & * ( s as * const RclWaker ) ) ,
127+ |s| rclwaker_wake_by_ref ( & * ( s as * const RclWaker ) ) ,
128+ |s| drop ( Arc :: from_raw ( s as * const RclWaker ) ) ,
129+ )
130+ } ;
131+
132+ fn rclwaker_into_waker ( s : * const RclWaker ) -> Waker {
133+ let raw_waker = RawWaker :: new ( s as * const ( ) , & VTABLE ) ;
134+ unsafe { Waker :: from_raw ( raw_waker) }
135+ }
136+
137+ pub fn spin_until_future_complete < T : Unpin + Clone > (
138+ node : & node:: Node ,
139+ mut future : Arc < Mutex < Box < RclFuture < T > > > > ,
140+ ) -> Result < <future:: RclFuture < T > as Future >:: Output , RclReturnCode > {
141+ let rclwaker = Arc :: new ( RclWaker { } ) ;
142+ let waker = rclwaker_into_waker ( Arc :: into_raw ( rclwaker) ) ;
143+ let mut cx = std:: task:: Context :: from_waker ( & waker) ;
144+
145+ loop {
146+ let context_valid = unsafe { rcl_context_is_valid ( & mut * node. context . lock ( ) as * mut _ ) } ;
147+ if context_valid {
148+ if let Some ( error) = spin_once ( node, None ) . err ( ) {
149+ match error {
150+ RclReturnCode :: Timeout => continue ,
151+ error => return Err ( error) ,
152+ } ;
153+ } ;
154+ match Future :: poll ( Pin :: new ( & mut * future. lock ( ) ) , & mut cx) {
155+ Poll :: Ready ( val) => break Ok ( val) ,
156+ Poll :: Pending => continue ,
157+ } ;
158+ }
159+ }
160+ }
0 commit comments