diff --git a/rclpy/CMakeLists.txt b/rclpy/CMakeLists.txt index b28e62020..66ba91b41 100644 --- a/rclpy/CMakeLists.txt +++ b/rclpy/CMakeLists.txt @@ -73,10 +73,24 @@ function(configure_python_c_extension_library _library_name) ) endfunction() + +# Sigint handling stuff +add_library( + rclpy_sigint + SHARED src/rclpy/sigint_gc.c +) +target_include_directories(rclpy_sigint PUBLIC ${CMAKE_SOURCE_DIR}) +configure_python_c_extension_library(rclpy_sigint) +ament_target_dependencies(rclpy_sigint + "rcl" +) + +# Main library add_library( rclpy SHARED src/rclpy/_rclpy.c ) +target_link_libraries(rclpy rclpy_sigint) configure_python_c_extension_library(rclpy) ament_target_dependencies(rclpy "rcl" @@ -93,6 +107,18 @@ ament_target_dependencies(rclpy_logging "rcutils" ) +# WaitSet wrapper library +add_library( + rclpy_wait_set + SHARED src/rclpy/_rclpy_wait_set.c +) +target_link_libraries(rclpy_wait_set rclpy_sigint) +configure_python_c_extension_library(rclpy_wait_set) +ament_target_dependencies(rclpy_wait_set + "rcl" + "rcutils" +) + if(BUILD_TESTING) find_package(ament_lint_auto REQUIRED) ament_lint_auto_find_test_dependencies() @@ -110,7 +136,7 @@ if(BUILD_TESTING) WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}" PYTHON_EXECUTABLE "${PYTHON_EXECUTABLE}" APPEND_ENV AMENT_PREFIX_PATH=${ament_index_build_path} - TIMEOUT 90 + TIMEOUT 120 ) endif() endif() diff --git a/rclpy/rclpy/client.py b/rclpy/rclpy/client.py index 15bd45151..92411e034 100644 --- a/rclpy/rclpy/client.py +++ b/rclpy/rclpy/client.py @@ -14,7 +14,9 @@ import threading +from rclpy.graph_listener import GraphEventSubscription from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy +from rclpy.impl.implementation_singleton import rclpy_wait_set_implementation as _rclpy_wait_set import rclpy.utilities @@ -23,24 +25,18 @@ class ResponseThread(threading.Thread): def __init__(self, client): threading.Thread.__init__(self) self.client = client - self.wait_set = _rclpy.rclpy_get_zero_initialized_wait_set() - _rclpy.rclpy_wait_set_init(self.wait_set, 0, 1, 0, 1, 0) - _rclpy.rclpy_wait_set_clear_entities('client', self.wait_set) - _rclpy.rclpy_wait_set_add_entity( - 'client', self.wait_set, self.client.client_handle) + self._wait_set = _rclpy_wait_set.WaitSet() def run(self): [sigint_gc, sigint_gc_handle] = _rclpy.rclpy_get_sigint_guard_condition() - _rclpy.rclpy_wait_set_add_entity('guard_condition', self.wait_set, sigint_gc) + self._wait_set.add_guard_conditions([sigint_gc]) + self._wait_set.add_clients([self.client.client_handle]) - _rclpy.rclpy_wait(self.wait_set, -1) - - guard_condition_ready_list = \ - _rclpy.rclpy_get_ready_entities('guard_condition', self.wait_set) + self._wait_set.wait(-1) # destroying here to make sure we dont call shutdown before cleaning up _rclpy.rclpy_destroy_entity(sigint_gc) - if sigint_gc_handle in guard_condition_ready_list: + if self._wait_set.is_ready(sigint_gc): rclpy.utilities.shutdown() return response = _rclpy.rclpy_take_response( @@ -77,3 +73,39 @@ def wait_for_future(self): thread1 = ResponseThread(self) thread1.start() thread1.join() + + def service_is_ready(self): + return _rclpy.rclpy_service_server_is_available(self.node_handle, self.client_handle) + + def wait_for_service(self, timeout_sec=None): + """ + Block until the service is available. + + :param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0 + :type timeout_sec: float or None + :rtype: bool + :returns: true if the service is available + """ + timeout_nsec = rclpy.utilities.timeout_sec_to_nsec(timeout_sec) + result = self.service_is_ready() + if result or timeout_sec == 0: + return result + + event = threading.Event() + + def on_graph_event(): + nonlocal self + nonlocal event + nonlocal result + result = self.service_is_ready() + if result: + event.set() + + def on_timeout(): + nonlocal event + event.set() + + with GraphEventSubscription(self.node_handle, on_graph_event, timeout_nsec, on_timeout): + event.wait() + + return result diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index 7825ac8aa..2a2bb09e3 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -12,26 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. -from concurrent.futures import ThreadPoolExecutor as _ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor import multiprocessing -from threading import Condition as _Condition -from threading import Lock as _Lock +from threading import Condition +from threading import Lock -from rclpy.constants import S_TO_NS +from rclpy.guard_condition import GuardCondition +from rclpy.guard_condition import KeyboardInterruptGuardCondition from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy -from rclpy.timer import WallTimer as _WallTimer +from rclpy.impl.implementation_singleton import rclpy_wait_set_implementation as _rclpy_wait_set +from rclpy.timer import WallTimer from rclpy.utilities import ok - - -class _WaitSet: - """Make sure the wait set gets destroyed when a generator exits.""" - - def __enter__(self): - self.wait_set = _rclpy.rclpy_get_zero_initialized_wait_set() - return self.wait_set - - def __exit__(self, t, v, tb): - _rclpy.rclpy_destroy_wait_set(self.wait_set) +from rclpy.utilities import timeout_sec_to_nsec class _WorkTracker: @@ -40,7 +32,7 @@ class _WorkTracker: def __init__(self): # Number of tasks that are being executed self._num_work_executing = 0 - self._work_condition = _Condition() + self._work_condition = Condition() def __enter__(self): """Increment the amount of executing work by 1.""" @@ -57,16 +49,18 @@ def wait(self, timeout_sec=None): """ Wait until all work completes. - :param timeout_sec: Seconds to wait. Block forever if None. Don't wait if <= 0 + :param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0 :type timeout_sec: float or None - :rtype: bool True if all work completed + :rtype: bool + :returns: True if all work completed """ + if timeout_sec is not None and timeout_sec < 0: + timeout_sec = None # Wait for all work to complete - if timeout_sec is None or timeout_sec >= 0: - with self._work_condition: - if not self._work_condition.wait_for( - lambda: self._num_work_executing == 0, timeout_sec): - return False + with self._work_condition: + if not self._work_condition.wait_for( + lambda: self._num_work_executing == 0, timeout_sec): + return False return True @@ -85,14 +79,15 @@ class Executor: def __init__(self): super().__init__() self._nodes = set() - self._nodes_lock = _Lock() + self._nodes_lock = Lock() # This is triggered when wait_for_ready_callbacks should rebuild the wait list - gc, gc_handle = _rclpy.rclpy_create_guard_condition() - self._guard_condition = gc - self._guard_condition_handle = gc_handle + self._guard_condition = GuardCondition(None, None) + # Triggered by signal handler for sigint + self._sigint_gc = KeyboardInterruptGuardCondition() # True if shutdown has been called self._is_shutdown = False self._work_tracker = _WorkTracker() + self._wait_set = _rclpy_wait_set.WaitSet() def shutdown(self, timeout_sec=None): """ @@ -100,7 +95,7 @@ def shutdown(self, timeout_sec=None): Return true if all outstanding callbacks finished executing. - :param timeout_sec: Seconds to wait. Block forever if None. Don't wait if <= 0 + :param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0 :type timeout_sec: float or None :rtype: bool """ @@ -110,14 +105,17 @@ def shutdown(self, timeout_sec=None): # Clean up stuff that won't be used anymore with self._nodes_lock: self._nodes = set() - _rclpy.rclpy_destroy_entity(self._guard_condition) + _rclpy.rclpy_destroy_entity(self._guard_condition.guard_handle) + _rclpy.rclpy_destroy_entity(self._sigint_gc.guard_handle) self._guard_condition = None + self._sigint_gc = None return True def __del__(self): if self._guard_condition is not None: - _rclpy.rclpy_destroy_entity(self._guard_condition) + _rclpy.rclpy_destroy_entity(self._guard_condition.guard_handle) + _rclpy.rclpy_destroy_entity(self._sigint_gc.guard_handle) def add_node(self, node): """ @@ -130,7 +128,7 @@ def add_node(self, node): with self._nodes_lock: self._nodes.add(node) # Rebuild the wait set so it includes this new node - _rclpy.rclpy_trigger_guard_condition(self._guard_condition) + self._guard_condition.trigger() def get_nodes(self): """ @@ -139,7 +137,7 @@ def get_nodes(self): :rtype: list """ with self._nodes_lock: - return [node for node in self._nodes] + return list(self._nodes) def spin(self): """Execute callbacks until shutdown.""" @@ -152,11 +150,11 @@ def spin_once(self, timeout_sec=None): A custom executor should use :func:`Executor.wait_for_ready_callbacks` to get work. - :param timeout_sec: Seconds to wait. Block forever if None. Don't wait if <= 0 + :param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0 :type timeout_sec: float or None :rtype: None """ - raise NotImplementedError + raise NotImplementedError() def _take_timer(self, tmr): _rclpy.rclpy_call_timer(tmr.timer_handle) @@ -228,14 +226,14 @@ def handler(): if is_shutdown or not entity.callback_group.beginning_execution(entity): # Didn't get the callback, or the executor has been ordered to stop entity._executor_event = False - _rclpy.rclpy_trigger_guard_condition(gc) + gc.trigger() return with work_tracker: arg = take_from_wait_list(entity) # Signal that this has been 'taken' and can be added back to the wait list entity._executor_event = False - _rclpy.rclpy_trigger_guard_condition(gc) + gc.trigger() try: call_callback(entity, arg) @@ -243,154 +241,123 @@ def handler(): entity.callback_group.ending_execution(entity) # Signal that work has been done so the next callback in a mutually exclusive # callback group can get executed - _rclpy.rclpy_trigger_guard_condition(gc) + gc.trigger() return handler - def _filter_eligible_entities(self, entities): + def _new_callbacks(self, nodes, wait_set): """ - Filter entities that should not be put onto the wait list. + Yield brand new work to executor implementations. - :param entity_list: Entities to be checked for eligibility - :type entity_list: list - :rtype: list + :param nodes: nodes to yield work for + :type nodes: list + :param wait_set: wait set that has already been waited on + :type wait_set: rclpy.wait_set.WaitSet + :rtype: Generator[(callable, entity, :class:`rclpy.node.Node`)] """ - return [e for e in entities if e.callback_group.can_execute(e) and not e._executor_event] + yielded_work = False + # Process ready entities one node at a time + for node in nodes: + for tmr in node.timers: + if wait_set.is_ready(tmr) and tmr.callback_group.can_execute(tmr): + # TODO(Sloretz) Which rcl cancelled timer bug does this workaround? + if not _rclpy.rclpy_is_timer_ready(tmr.timer_handle): + continue + handler = self._make_handler(tmr, self._take_timer, self._execute_timer) + yielded_work = True + yield handler, tmr, node + + for sub in node.subscriptions: + if (wait_set.is_ready(sub) and + sub.callback_group.can_execute(sub)): + handler = self._make_handler( + sub, self._take_subscription, self._execute_subscription) + yielded_work = True + yield handler, sub, node + + for gc in node.guards: + if gc._executor_triggered and gc.callback_group.can_execute(gc): + handler = self._make_handler( + gc, self._take_guard_condition, self._execute_guard_condition) + yielded_work = True + yield handler, gc, node + + for cli in node.clients: + if wait_set.is_ready(cli) and cli.callback_group.can_execute(cli): + handler = self._make_handler(cli, self._take_client, self._execute_client) + yielded_work = True + yield handler, cli, node + + for srv in node.services: + if wait_set.is_ready(srv) and srv.callback_group.can_execute(srv): + handler = self._make_handler(srv, self._take_service, self._execute_service) + yielded_work = True + yield handler, srv, node + return yielded_work def wait_for_ready_callbacks(self, timeout_sec=None, nodes=None): """ Yield callbacks that are ready to be performed. - :param timeout_sec: Seconds to wait. Block forever if None. Don't wait if <= 0 + :param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0 :type timeout_sec: float or None :param nodes: A list of nodes to wait on. Wait on all nodes if None. :type nodes: list or None :rtype: Generator[(callable, entity, :class:`rclpy.node.Node`)] """ timeout_timer = None - # Get timeout in nanoseconds. 0 = don't wait. < 0 means block forever - timeout_nsec = None - if timeout_sec is None: - timeout_nsec = -1 - elif timeout_sec <= 0: - timeout_nsec = 0 - else: - timeout_nsec = int(float(timeout_sec) * S_TO_NS) - timeout_timer = _WallTimer(None, None, timeout_nsec) + timeout_nsec = timeout_sec_to_nsec(timeout_sec) + if timeout_nsec > 0: + timeout_timer = WallTimer(None, None, timeout_nsec) if nodes is None: nodes = self.get_nodes() yielded_work = False while not yielded_work and not self._is_shutdown: + self._wait_set.clear() # Gather entities that can be waited on - subscriptions = [] + + def can_execute(entity): + return not entity._executor_event and entity.callback_group.can_execute(entity) + guards = [] - timers = [] - clients = [] - services = [] for node in nodes: - subscriptions.extend(self._filter_eligible_entities(node.subscriptions)) - timers.extend(self._filter_eligible_entities(node.timers)) - clients.extend(self._filter_eligible_entities(node.clients)) - services.extend(self._filter_eligible_entities(node.services)) - node_guards = self._filter_eligible_entities(node.guards) - # retrigger a guard condition that was triggered but not handled - for gc in node_guards: - if gc._executor_triggered: - gc.trigger() - guards.extend(node_guards) - (sigint_gc, sigint_gc_handle) = _rclpy.rclpy_get_sigint_guard_condition() + self._wait_set.add_subscriptions(filter(can_execute, node.subscriptions)) + self._wait_set.add_timers(filter(can_execute, node.timers)) + self._wait_set.add_clients(filter(can_execute, node.clients)) + self._wait_set.add_services(filter(can_execute, node.services)) + guards.extend(filter(can_execute, node.guards)) + + # retrigger a guard condition that was triggered but not handled + for gc in guards: + if gc._executor_triggered: + gc.trigger() + if timeout_timer is not None: - timers.append(timeout_timer) - - # Construct a wait set - with _WaitSet() as wait_set: - _rclpy.rclpy_wait_set_init( - wait_set, - len(subscriptions), - len(guards) + 2, - len(timers), - len(clients), - len(services)) - - entities = { - 'subscription': (subscriptions, 'subscription_handle'), - 'guard_condition': (guards, 'guard_handle'), - 'client': (clients, 'client_handle'), - 'service': (services, 'service_handle'), - 'timer': (timers, 'timer_handle'), - } - for entity, (handles, handle_name) in entities.items(): - _rclpy.rclpy_wait_set_clear_entities(entity, wait_set) - for h in handles: - _rclpy.rclpy_wait_set_add_entity( - entity, wait_set, h.__getattribute__(handle_name) - ) - _rclpy.rclpy_wait_set_add_entity('guard_condition', wait_set, sigint_gc) - _rclpy.rclpy_wait_set_add_entity( - 'guard_condition', wait_set, self._guard_condition) - - # Wait for something to become ready - _rclpy.rclpy_wait(wait_set, timeout_nsec) - - # get ready entities - subs_ready = _rclpy.rclpy_get_ready_entities('subscription', wait_set) - guards_ready = _rclpy.rclpy_get_ready_entities('guard_condition', wait_set) - timers_ready = _rclpy.rclpy_get_ready_entities('timer', wait_set) - clients_ready = _rclpy.rclpy_get_ready_entities('client', wait_set) - services_ready = _rclpy.rclpy_get_ready_entities('service', wait_set) + self._wait_set.add_timers((timeout_timer,)) + + self._wait_set.add_guard_conditions((self._sigint_gc, self._guard_condition)) + self._wait_set.add_guard_conditions(guards) + + # Wait for something to become ready + self._wait_set.wait(timeout_nsec) # Check sigint guard condition - if sigint_gc_handle in guards_ready: - raise KeyboardInterrupt - _rclpy.rclpy_destroy_entity(sigint_gc) + if self._wait_set.is_ready(self._sigint_gc): + raise KeyboardInterrupt() - # Mark all guards as triggered before yielding any handlers since they're auto-taken - for gc in [g for g in guards if g.guard_pointer in guards_ready]: - gc._executor_triggered = True + # Mark all guards as triggered before yielding since they're auto-taken + for gc in guards: + if self._wait_set.is_ready(gc): + gc._executor_triggered = True - # Process ready entities one node at a time - for node in nodes: - for tmr in [t for t in node.timers if t.timer_pointer in timers_ready]: - # Check that a timer is ready to workaround rcl issue with cancelled timers - if _rclpy.rclpy_is_timer_ready(tmr.timer_handle): - if tmr.callback_group.can_execute(tmr): - handler = self._make_handler( - tmr, self._take_timer, self._execute_timer) - yielded_work = True - yield handler, tmr, node - - for sub in [s for s in node.subscriptions if s.subscription_pointer in subs_ready]: - if sub.callback_group.can_execute(sub): - handler = self._make_handler( - sub, self._take_subscription, self._execute_subscription) - yielded_work = True - yield handler, sub, node - - for gc in [g for g in node.guards if g._executor_triggered]: - if gc.callback_group.can_execute(gc): - handler = self._make_handler( - gc, self._take_guard_condition, self._execute_guard_condition) - yielded_work = True - yield handler, gc, node - - for client in [c for c in node.clients if c.client_pointer in clients_ready]: - if client.callback_group.can_execute(client): - handler = self._make_handler( - client, self._take_client, self._execute_client) - yielded_work = True - yield handler, client, node - - for srv in [s for s in node.services if s.service_pointer in services_ready]: - if srv.callback_group.can_execute(srv): - handler = self._make_handler( - srv, self._take_service, self._execute_service) - yielded_work = True - yield handler, srv, node + yielded_work = yield from self._new_callbacks(nodes, self._wait_set) # Check timeout timer - if (timeout_nsec == 0 or - (timeout_timer is not None and timeout_timer.timer_pointer in timers_ready)): + if ( + timeout_nsec == 0 or + (timeout_timer is not None and self._wait_set.is_ready(timeout_timer)) + ): break @@ -399,13 +366,18 @@ class SingleThreadedExecutor(Executor): def __init__(self): super().__init__() + self._callback_iter = None def spin_once(self, timeout_sec=None): + # Reuse the same callback iterator to avoid unecessary calls to rcl_wait + if self._callback_iter is None: + self._callback_iter = self.wait_for_ready_callbacks(timeout_sec=timeout_sec) try: - handler, entity, node = next(self.wait_for_ready_callbacks(timeout_sec=timeout_sec)) - handler() + handler, entity, node = next(self._callback_iter) except StopIteration: - pass + self._callback_iter = None + else: + handler() class MultiThreadedExecutor(Executor): @@ -421,16 +393,21 @@ def __init__(self, num_threads=None): :type num_threads: int """ super().__init__() + self._callback_iter = None if num_threads is None: try: num_threads = multiprocessing.cpu_count() except NotImplementedError: num_threads = 1 - self._executor = _ThreadPoolExecutor(num_threads) + self._executor = ThreadPoolExecutor(num_threads) def spin_once(self, timeout_sec=None): + # Reuse the same callback iterator to avoid unecessary calls to rcl_wait + if self._callback_iter is None: + self._callback_iter = self.wait_for_ready_callbacks(timeout_sec=timeout_sec) try: - handler, entity, node = next(self.wait_for_ready_callbacks(timeout_sec=timeout_sec)) - self._executor.submit(handler) + handler, entity, node = next(self._callback_iter) except StopIteration: - pass + self._callback_iter = None + else: + self._executor.submit(handler) diff --git a/rclpy/rclpy/graph_listener.py b/rclpy/rclpy/graph_listener.py new file mode 100644 index 000000000..68f6fb69e --- /dev/null +++ b/rclpy/rclpy/graph_listener.py @@ -0,0 +1,218 @@ +# Copyright 2017 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +import traceback + +from rclpy.constants import S_TO_NS +from rclpy.guard_condition import GuardCondition +from rclpy.guard_condition import NodeGraphGuardCondition +from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy +from rclpy.impl.implementation_singleton import rclpy_wait_set_implementation as _rclpy_wait_set +import rclpy.logging +from rclpy.timer import WallTimer +from rclpy.utilities import ok + + +class GraphListenerSingleton: + """Manage a thread to listen for graph events.""" + + def __new__(cls, *args, **kwargs): + if not hasattr(cls, '__singleton'): + setattr(cls, '__singleton', super().__new__(cls, *args, **kwargs)) + return getattr(cls, '__singleton') + + def __init__(self): + # Maps guard_condition pointers to guard condition instances + self._guards = {} + # Maps guard_condition pointers to a list of subscriber callbacks + self._callbacks = {} + # Maps timer instances to timer callbacks + self._timers = {} + self._gc = GuardCondition(None, None) + self._thread = None + self._lock = threading.RLock() + self._wait_set = _rclpy_wait_set.WaitSet() + + def __del__(self): + self.destroy() + + @classmethod + def destroy(cls): + self = getattr(cls, '__singleton') + if self is not None: + with self._lock: + setattr(cls, '__singleton', None) + self._thread = None + _rclpy.rclpy_destroy_entity(self._gc.guard_handle) + + def _try_start_thread(self): + # Assumes lock is already held + if self._thread is None: + self._thread = threading.Thread(target=self._runner, daemon=True) + self._thread.start() + + def add_timer(self, timer_period_ns, callback): + """ + Call callback when timer triggers. + + :param timer_period_ns: time until timer triggers in nanoseconds + :type timer_period_ns: integer + :param callback: called when the graph updates + :type callback: callable + :rtype: rclpy.timer.WallTimer + """ + with self._lock: + tmr = WallTimer(callback, None, timer_period_ns) + self._timers[tmr] = callback + self._gc.trigger() + self._try_start_thread() + return tmr + + def remove_timer(self, timer): + """ + Remove a timer from the wait set. + + :param timer: A timer returned from add_timer() + :type timer: rclpy.timer.WallTimer instance + """ + with self._lock: + if timer in self._timers: + del self._timers[timer] + self._gc.trigger() + + def add_callback(self, node_handle, callback): + """ + Call callback when node graph gets updates. + + :param node_handle: rclpy node handle + :type node_handle: PyCapsule + :param callback: called when the graph updates + :type callback: callable + """ + with self._lock: + gc = NodeGraphGuardCondition(node_handle) + if gc.guard_pointer not in self._callbacks: + # new node, rebuild wait set + self._callbacks[gc.guard_pointer] = [] + self._guards[gc.guard_pointer] = gc + self._gc.trigger() + + # Add a callback + self._callbacks[gc.guard_pointer].append(callback) + + self._try_start_thread() + # start the thread if necessary + if self._thread is None: + self._thread = threading.Thread(target=self._runner) + self._thread.daemon = True + self._thread.start() + + def remove_callback(self, node_handle, callback): + """ + Stop listening for graph updates for the given node and callback. + + :param node_handle: rclpy node handle + :type node_handle: PyCapsule + :param callback: called when the graph updates + :type callback: callable + """ + with self._lock: + gc = NodeGraphGuardCondition(node_handle) + if gc.guard_pointer in self._callbacks: + # Remove the callback + callbacks = self._callbacks[gc.guard_pointer] + callbacks.remove(callback) + + if not callbacks: + # last subscriber for this node, remove the node and rebuild the wait set + del self._callbacks[gc.guard_pointer] + del self._guards[gc.guard_pointer] + self._gc.trigger() + + def _runner(self): + while True: + self._wait_set.clear() + with self._lock: + self._wait_set.add_guard_conditions(self._guards.values()) + self._wait_set.add_guard_conditions([self._gc]) + self._wait_set.add_timers(self._timers.keys()) + + # Wait 1 second + self._wait_set.wait(S_TO_NS) + + with self._lock: + # Shutdown if necessary + if not ok(): + self.destroy() + break + + # notify graph event subscribers + if not self._thread: + # Asked to shut down thread + return + ready_callbacks = [] + # Guard conditions + for gc_pointer, callback_list in self._callbacks.items(): + gc = self._guards[gc_pointer] + if self._wait_set.is_ready(gc): + for callback in callback_list: + ready_callbacks.append(callback) + # Timers + for tmr, callback in self._timers.items(): + if self._wait_set.is_ready(tmr): + ready_callbacks.append(callback) + _rclpy.rclpy_call_timer(tmr.timer_handle) + # Call callbacks + for callback in ready_callbacks: + try: + callback() + except Exception: + rclpy.logging.logwarn(traceback.format_exc()) + + +class GraphEventSubscription: + """Manage subscription to node graph updates.""" + + def __init__(self, node_handle, callback, timeout_ns=-1, timeout_callback=None): + self._listener = GraphListenerSingleton() + self._node_handle = node_handle + self._callback = callback + self._listener.add_callback(self._node_handle, self._callback) + self._timeout_callback = timeout_callback + self._timer = None + if timeout_ns >= 0: + self._timer = self._listener.add_timer(timeout_ns, self.on_timeout) + + def on_timeout(self): + self._timeout_callback() + self._unsubscribe() + + def _unsubscribe(self): + if self._callback: + self._listener.remove_callback(self._node_handle, self._callback) + self._callback = None + if self._timer: + self._listener.remove_timer(self._timer) + self._timeout_callback = None + self._timer = None + + def __del__(self): + self._unsubscribe() + + def __enter__(self): + return self + + def __exit__(self, t, v, tb): + self._unsubscribe() diff --git a/rclpy/rclpy/guard_condition.py b/rclpy/rclpy/guard_condition.py index fe60b3caf..5ec881f00 100644 --- a/rclpy/rclpy/guard_condition.py +++ b/rclpy/rclpy/guard_condition.py @@ -28,3 +28,15 @@ def __init__(self, callback, callback_group): def trigger(self): _rclpy.rclpy_trigger_guard_condition(self.guard_handle) + + +class KeyboardInterruptGuardCondition: + + def __init__(self): + self.guard_handle, self.guard_pointer = _rclpy.rclpy_get_sigint_guard_condition() + + +class NodeGraphGuardCondition: + + def __init__(self, node_handle): + self.guard_handle, self.guard_pointer = _rclpy.rclpy_get_graph_guard_condition(node_handle) diff --git a/rclpy/rclpy/impl/implementation_singleton.py b/rclpy/rclpy/impl/implementation_singleton.py index 62bf50cdf..95707e742 100644 --- a/rclpy/rclpy/impl/implementation_singleton.py +++ b/rclpy/rclpy/impl/implementation_singleton.py @@ -31,6 +31,7 @@ try: rclpy_implementation = importlib.import_module('._rclpy', package='rclpy') + rclpy_wait_set_implementation = importlib.import_module('._rclpy_wait_set', package='rclpy') except ImportError as e: if os.path.isfile(e.path): e.msg += \ diff --git a/rclpy/rclpy/utilities.py b/rclpy/rclpy/utilities.py index 00de43212..551a14628 100644 --- a/rclpy/rclpy/utilities.py +++ b/rclpy/rclpy/utilities.py @@ -14,6 +14,7 @@ import threading +from rclpy.constants import S_TO_NS from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy g_shutdown_lock = threading.Lock() @@ -38,3 +39,23 @@ def try_shutdown(): def get_rmw_implementation_identifier(): return _rclpy.rclpy_get_rmw_implementation_identifier() + + +def timeout_sec_to_nsec(timeout_sec): + """ + Convert timeout in seconds to rcl compatible timeout in nanoseconds. + + Python tends to use floating point numbers in seconds for timeouts. This utility converts a + python-style timeout to an integer in nanoseconds that can be used by rcl_wait. + + :param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if < 1ns + :type timeout_sec: float or None + :rtype: int + :returns: rcl_wait compatible timeout in nanoseconds + """ + if timeout_sec is None or timeout_sec < 0: + # Block forever + return -1 + else: + # wait for given time + return int(float(timeout_sec) * S_TO_NS) diff --git a/rclpy/src/rclpy/_rclpy.c b/rclpy/src/rclpy/_rclpy.c index d6a4e4a75..5d38a1380 100644 --- a/rclpy/src/rclpy/_rclpy.c +++ b/rclpy/src/rclpy/_rclpy.c @@ -28,20 +28,44 @@ #include #include -#include +#include "src/rclpy/sigint_gc.h" -static rcl_guard_condition_t * g_sigint_gc_handle; -/// Catch signals -static void catch_function(int signo) +/// Get a guard condition for node graph events +/** + * Raises ValueError if the provided argument is not a PyCapsule. + * + * A successful call will return a list with two elements: + * + * - a Capsule with the pointer of the retrieved rcl_guard_condition_t * structure + * - an integer representing the memory address of the rcl_guard_condition_t + * + * \param[in] A capsule containing rcl_node_t * + * \return a list with the capsule and memory location, or + * \return NULL on failure + */ +static PyObject * +rclpy_get_graph_guard_condition(PyObject * Py_UNUSED(self), PyObject * args) { - (void) signo; - rcl_ret_t ret = rcl_trigger_guard_condition(g_sigint_gc_handle); - if (ret != RCL_RET_OK) { - PyErr_Format(PyExc_RuntimeError, - "Failed to trigger guard_condition: %s", rcl_get_error_string_safe()); - rcl_reset_error(); + PyObject * pynode; + + if (!PyArg_ParseTuple(args, "O", &pynode)) { + return NULL; + } + + rcl_node_t * node = (rcl_node_t *)PyCapsule_GetPointer(pynode, "rcl_node_t"); + if (!node) { + return NULL; } + + rcl_guard_condition_t * guard_condition = + (rcl_guard_condition_t *)rcl_node_get_graph_guard_condition(node); + + PyObject * pylist = PyList_New(2); + PyList_SET_ITEM(pylist, 0, PyCapsule_New(guard_condition, "rcl_guard_condition_t", NULL)); + PyList_SET_ITEM(pylist, 1, PyLong_FromUnsignedLongLong((uint64_t)&guard_condition->impl)); + + return pylist; } /// Create a sigint guard condition @@ -73,7 +97,7 @@ rclpy_get_sigint_guard_condition(PyObject * Py_UNUSED(self), PyObject * Py_UNUSE rcl_reset_error(); return NULL; } - g_sigint_gc_handle = sigint_gc; + g_rclpy_sigint_gc_handle = sigint_gc; PyObject * pylist = PyList_New(2); PyList_SET_ITEM(pylist, 0, pygc); PyList_SET_ITEM(pylist, 1, PyLong_FromUnsignedLongLong((uint64_t)&sigint_gc->impl)); @@ -1525,6 +1549,49 @@ rclpy_send_response(PyObject * Py_UNUSED(self), PyObject * args) Py_RETURN_NONE; } +/// Check if a service server is available +/** + * Raises ValueError if the arguments are not capsules + * + * \param[in] pynode Capsule pointing to the node the entity belongs to + * \param[in] pyclient Capsule pointing to the client + * \return True if the service server is available + */ +static PyObject * +rclpy_service_server_is_available(PyObject * Py_UNUSED(self), PyObject * args) +{ + PyObject * pynode; + PyObject * pyclient; + + if (!PyArg_ParseTuple(args, "OO", &pynode, &pyclient)) { + return NULL; + } + + rcl_node_t * node = (rcl_node_t *)PyCapsule_GetPointer(pynode, "rcl_node_t"); + if (!node) { + return NULL; + } + rcl_client_t * client = (rcl_client_t *)PyCapsule_GetPointer(pyclient, "rcl_client_t"); + if (!client) { + return NULL; + } + + bool is_ready; + rcl_ret_t ret = rcl_service_server_is_available(node, client, &is_ready); + + if (ret != RCL_RET_OK) { + PyErr_Format(PyExc_RuntimeError, + "Failed to check service availability: %s", rcl_get_error_string_safe()); + rcl_reset_error(); + return NULL; + } + + if (is_ready) { + Py_RETURN_TRUE; + } + Py_RETURN_FALSE; +} + /// Destroy an entity attached to a node /** * Entity type must be one of ["subscription", "publisher", "client", "service"]. @@ -1663,312 +1730,6 @@ rclpy_get_rmw_implementation_identifier(PyObject * Py_UNUSED(self), PyObject * P return pyrmw_implementation_identifier; } -/// Return a Capsule pointing to a zero initialized rcl_wait_set_t structure -static PyObject * -rclpy_get_zero_initialized_wait_set(PyObject * Py_UNUSED(self), PyObject * Py_UNUSED(args)) -{ - rcl_wait_set_t * wait_set = (rcl_wait_set_t *)PyMem_Malloc(sizeof(rcl_wait_set_t)); - *wait_set = rcl_get_zero_initialized_wait_set(); - PyObject * pywait_set = PyCapsule_New(wait_set, "rcl_wait_set_t", NULL); - - return pywait_set; -} - -/// Initialize a waitset -/** - * Raises RuntimeError if the wait set could not be initialized - * - * \param[in] pywait_set Capsule pointing to the waitset structure - * \param[in] node_name string name of the node to be created - * \param[in] number_of_subscriptions int - * \param[in] number_of_guard_conditions int - * \param[in] number_of_timers int - * \param[in] number_of_clients int - * \param[in] number_of_services int - * \return None - */ -static PyObject * -rclpy_wait_set_init(PyObject * Py_UNUSED(self), PyObject * args) -{ - PyObject * pywait_set; - unsigned PY_LONG_LONG number_of_subscriptions; - unsigned PY_LONG_LONG number_of_guard_conditions; - unsigned PY_LONG_LONG number_of_timers; - unsigned PY_LONG_LONG number_of_clients; - unsigned PY_LONG_LONG number_of_services; - - if (!PyArg_ParseTuple( - args, "OKKKKK", &pywait_set, &number_of_subscriptions, - &number_of_guard_conditions, &number_of_timers, - &number_of_clients, &number_of_services)) - { - return NULL; - } - - rcl_wait_set_t * wait_set = (rcl_wait_set_t *)PyCapsule_GetPointer(pywait_set, "rcl_wait_set_t"); - if (!wait_set) { - return NULL; - } - rcl_ret_t ret = rcl_wait_set_init( - wait_set, number_of_subscriptions, number_of_guard_conditions, number_of_timers, - number_of_clients, number_of_services, rcl_get_default_allocator()); - if (ret != RCL_RET_OK) { - PyErr_Format(PyExc_RuntimeError, - "Failed to initialize wait set: %s", rcl_get_error_string_safe()); - rcl_reset_error(); - return NULL; - } - Py_RETURN_NONE; -} - -/// Clear all the pointers of a given wait_set field -/** - * Raises RuntimeError if the entity type is unknown or any rcl error occurs - * - * \param[in] entity_type string defining the entity ["subscription, client, service"] - * \param[in] pywait_set Capsule pointing to the waitset structure - * \return NULL - */ -static PyObject * -rclpy_wait_set_clear_entities(PyObject * Py_UNUSED(self), PyObject * args) -{ - const char * entity_type; - PyObject * pywait_set; - - if (!PyArg_ParseTuple(args, "zO", &entity_type, &pywait_set)) { - return NULL; - } - - rcl_wait_set_t * wait_set = (rcl_wait_set_t *)PyCapsule_GetPointer(pywait_set, "rcl_wait_set_t"); - if (!wait_set) { - return NULL; - } - rcl_ret_t ret; - if (0 == strcmp(entity_type, "subscription")) { - ret = rcl_wait_set_clear_subscriptions(wait_set); - } else if (0 == strcmp(entity_type, "client")) { - ret = rcl_wait_set_clear_clients(wait_set); - } else if (0 == strcmp(entity_type, "service")) { - ret = rcl_wait_set_clear_services(wait_set); - } else if (0 == strcmp(entity_type, "timer")) { - ret = rcl_wait_set_clear_timers(wait_set); - } else if (0 == strcmp(entity_type, "guard_condition")) { - ret = rcl_wait_set_clear_guard_conditions(wait_set); - } else { - ret = RCL_RET_ERROR; // to avoid a linter warning - PyErr_Format(PyExc_RuntimeError, - "'%s' is not a known entity", entity_type); - return NULL; - } - if (ret != RCL_RET_OK) { - PyErr_Format(PyExc_RuntimeError, - "Failed to clear '%s' from wait set: %s", entity_type, rcl_get_error_string_safe()); - rcl_reset_error(); - return NULL; - } - Py_RETURN_TRUE; -} - -/// Add an entity to the waitset structure -/** - * Raises RuntimeError if the entity type is unknown or any rcl error occurrs - * - * \param[in] entity_type string defining the entity ["subscription, client, service"] - * \param[in] pywait_set Capsule pointing to the waitset structure - * \param[in] pyentity Capsule pointing to the entity to add - * \return None - */ -static PyObject * -rclpy_wait_set_add_entity(PyObject * Py_UNUSED(self), PyObject * args) -{ - const char * entity_type; - PyObject * pywait_set; - PyObject * pyentity; - - if (!PyArg_ParseTuple(args, "zOO", &entity_type, &pywait_set, &pyentity)) { - return NULL; - } - rcl_ret_t ret; - rcl_wait_set_t * wait_set = (rcl_wait_set_t *)PyCapsule_GetPointer(pywait_set, "rcl_wait_set_t"); - if (!wait_set) { - return NULL; - } - if (0 == strcmp(entity_type, "subscription")) { - rcl_subscription_t * subscription = - (rcl_subscription_t *)PyCapsule_GetPointer(pyentity, "rcl_subscription_t"); - ret = rcl_wait_set_add_subscription(wait_set, subscription); - } else if (0 == strcmp(entity_type, "client")) { - rcl_client_t * client = - (rcl_client_t *)PyCapsule_GetPointer(pyentity, "rcl_client_t"); - ret = rcl_wait_set_add_client(wait_set, client); - } else if (0 == strcmp(entity_type, "service")) { - rcl_service_t * service = - (rcl_service_t *)PyCapsule_GetPointer(pyentity, "rcl_service_t"); - ret = rcl_wait_set_add_service(wait_set, service); - } else if (0 == strcmp(entity_type, "timer")) { - rcl_timer_t * timer = - (rcl_timer_t *)PyCapsule_GetPointer(pyentity, "rcl_timer_t"); - ret = rcl_wait_set_add_timer(wait_set, timer); - } else if (0 == strcmp(entity_type, "guard_condition")) { - rcl_guard_condition_t * guard_condition = - (rcl_guard_condition_t *)PyCapsule_GetPointer(pyentity, "rcl_guard_condition_t"); - ret = rcl_wait_set_add_guard_condition(wait_set, guard_condition); - } else { - ret = RCL_RET_ERROR; // to avoid a linter warning - PyErr_Format(PyExc_RuntimeError, - "'%s' is not a known entity", entity_type); - return NULL; - } - if (ret != RCL_RET_OK) { - PyErr_Format(PyExc_RuntimeError, - "Failed to add '%s' to wait set: %s", entity_type, rcl_get_error_string_safe()); - rcl_reset_error(); - return NULL; - } - Py_RETURN_NONE; -} - -/// Destroy the waitset structure -/** - * Raises RuntimeError if the wait set could not be destroyed - * - * \param[in] pywait_set Capsule pointing to the waitset structure - * \return None - */ -static PyObject * -rclpy_destroy_wait_set(PyObject * Py_UNUSED(self), PyObject * args) -{ - PyObject * pywait_set; - - if (!PyArg_ParseTuple(args, "O", &pywait_set)) { - return NULL; - } - rcl_wait_set_t * wait_set = (rcl_wait_set_t *)PyCapsule_GetPointer(pywait_set, "rcl_wait_set_t"); - if (!wait_set) { - return NULL; - } - - rcl_ret_t ret = rcl_wait_set_fini(wait_set); - if (ret != RCL_RET_OK) { - PyErr_Format(PyExc_RuntimeError, - "Failed to fini wait set: %s", rcl_get_error_string_safe()); - rcl_reset_error(); - return NULL; - } - - PyMem_Free(wait_set); - - if (PyCapsule_SetPointer(pywait_set, Py_None)) { - // exception set by PyCapsule_SetPointer - return NULL; - } - - Py_RETURN_NONE; -} - -#define GET_LIST_READY_ENTITIES(ENTITY_TYPE) \ - size_t idx; \ - size_t idx_max; \ - idx_max = wait_set->size_of_ ## ENTITY_TYPE ## s; \ - const rcl_ ## ENTITY_TYPE ## _t ** struct_ptr = wait_set->ENTITY_TYPE ## s; \ - for (idx = 0; idx < idx_max; idx ++) { \ - if (struct_ptr[idx]) { \ - PyList_Append( \ - entity_ready_list, \ - PyLong_FromUnsignedLongLong((uint64_t) & struct_ptr[idx]->impl)); \ - } \ - } \ - return entity_ready_list; -/// Get list of non-null entities in waitset -/** - * Raises ValueError if pywait_set is not a wait_set capsule - * Raises RuntimeError if the entity type is not known - * - * \param[in] entity_type string defining the entity ["subscription, client, service"] - * \param[in] pywait_set Capsule pointing to the waitset structure - * \return List of wait_set entities pointers ready for take - */ -static PyObject * -rclpy_get_ready_entities(PyObject * Py_UNUSED(self), PyObject * args) -{ - const char * entity_type; - PyObject * pywait_set; - if (!PyArg_ParseTuple(args, "zO", &entity_type, &pywait_set)) { - return NULL; - } - - rcl_wait_set_t * wait_set = (rcl_wait_set_t *)PyCapsule_GetPointer(pywait_set, "rcl_wait_set_t"); - if (!wait_set) { - return NULL; - } - - PyObject * entity_ready_list = PyList_New(0); - if (0 == strcmp(entity_type, "subscription")) { - GET_LIST_READY_ENTITIES(subscription) - } else if (0 == strcmp(entity_type, "client")) { - GET_LIST_READY_ENTITIES(client) - } else if (0 == strcmp(entity_type, "service")) { - GET_LIST_READY_ENTITIES(service) - } else if (0 == strcmp(entity_type, "timer")) { - GET_LIST_READY_ENTITIES(timer) - } else if (0 == strcmp(entity_type, "guard_condition")) { - GET_LIST_READY_ENTITIES(guard_condition) - } else { - PyErr_Format(PyExc_RuntimeError, - "'%s' is not a known entity", entity_type); - return NULL; - } - - return entity_ready_list; -} - -/// Wait until timeout is reached or event happened -/** - * Raises ValueError if pywait_set is not a wait_set capsule - * Raises RuntimeError if there was an error while waiting - * - * This function will wait for an event to happen or for the timeout to expire. - * A negative timeout means wait forever, a timeout of 0 means no wait - * \param[in] pywait_set Capsule pointing to the waitset structure - * \param[in] timeout optional time to wait before waking up (in nanoseconds) - * \return NULL - */ -static PyObject * -rclpy_wait(PyObject * Py_UNUSED(self), PyObject * args) -{ - PyObject * pywait_set; - PY_LONG_LONG timeout = -1; - - if (!PyArg_ParseTuple(args, "O|K", &pywait_set, &timeout)) { - return NULL; - } -#ifdef _WIN32 - _crt_signal_t -#else - sig_t -#endif // _WIN32 - previous_handler = signal(SIGINT, catch_function); - rcl_wait_set_t * wait_set = (rcl_wait_set_t *)PyCapsule_GetPointer(pywait_set, "rcl_wait_set_t"); - if (!wait_set) { - return NULL; - } - rcl_ret_t ret; - - // Could be a long wait, release the GIL - Py_BEGIN_ALLOW_THREADS; - ret = rcl_wait(wait_set, timeout); - Py_END_ALLOW_THREADS; - - signal(SIGINT, previous_handler); - if (ret != RCL_RET_OK && ret != RCL_RET_TIMEOUT) { - PyErr_Format(PyExc_RuntimeError, - "Failed to wait on wait set: %s", rcl_get_error_string_safe()); - rcl_reset_error(); - return NULL; - } - Py_RETURN_NONE; -} - /// Take a message from a given subscription /** * \param[in] pysubscription Capsule pointing to the subscription to process the message @@ -2608,6 +2369,15 @@ static PyMethodDef rclpy_methods[] = { "Create a Timer." }, + { + "rclpy_service_server_is_available", rclpy_service_server_is_available, METH_VARARGS, + "Return true if the service server is available" + }, + + { + "rclpy_get_graph_guard_condition", rclpy_get_graph_guard_condition, METH_VARARGS, + "Get a guard condition that is triggered when the node graph updates." + }, { "rclpy_get_sigint_guard_condition", rclpy_get_sigint_guard_condition, METH_NOARGS, "Create a guard_condition triggered when sigint is received." @@ -2643,36 +2413,6 @@ static PyMethodDef rclpy_methods[] = { "Send a response." }, - { - "rclpy_get_zero_initialized_wait_set", rclpy_get_zero_initialized_wait_set, METH_NOARGS, - "rclpy_get_zero_initialized_wait_set." - }, - - { - "rclpy_wait_set_init", rclpy_wait_set_init, METH_VARARGS, - "rclpy_wait_set_init." - }, - - { - "rclpy_wait_set_clear_entities", rclpy_wait_set_clear_entities, METH_VARARGS, - "rclpy_wait_set_clear_entities." - }, - - { - "rclpy_wait_set_add_entity", rclpy_wait_set_add_entity, METH_VARARGS, - "rclpy_wait_set_add_entity." - }, - - { - "rclpy_destroy_wait_set", rclpy_destroy_wait_set, METH_VARARGS, - "rclpy_destroy_wait_set." - }, - - { - "rclpy_get_ready_entities", rclpy_get_ready_entities, METH_VARARGS, - "List non null subscriptions in waitset." - }, - { "rclpy_reset_timer", rclpy_reset_timer, METH_VARARGS, "Reset a timer." @@ -2718,11 +2458,6 @@ static PyMethodDef rclpy_methods[] = { "Get the period of a timer." }, - { - "rclpy_wait", rclpy_wait, METH_VARARGS, - "rclpy_wait." - }, - { "rclpy_take", rclpy_take, METH_VARARGS, "rclpy_take." diff --git a/rclpy/src/rclpy/_rclpy_wait_set.c b/rclpy/src/rclpy/_rclpy_wait_set.c new file mode 100644 index 000000000..f15fa62a2 --- /dev/null +++ b/rclpy/src/rclpy/_rclpy_wait_set.c @@ -0,0 +1,680 @@ +// Copyright 2017 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include + +#include "src/rclpy/sigint_gc.h" + + +/// Storage for custom python type _rclpy_wait_set.WaitSet +typedef struct +{ + PyObject_HEAD + rcl_wait_set_t wait_set; + /// Lists of pycapsules in the wait set + PyObject * pysubs; + PyObject * pytmrs; + PyObject * pygcs; + PyObject * pyclis; + PyObject * pysrvs; + /// List of pycapsule that are ready + PyObject * pyready; +} rclpy_wait_set_t; + +/// Destructor +static void +rclpy_wait_set_dealloc(rclpy_wait_set_t * self) +{ + rcl_ret_t ret = rcl_wait_set_fini(&(self->wait_set)); + if (ret != RCL_RET_OK) { + PyErr_Format(PyExc_RuntimeError, + "Failed to fini wait set: %s", rcl_get_error_string_safe()); + rcl_reset_error(); + } + + // X because it could be NULL if PyList_New failed during rclpy_wait_set_new + Py_XDECREF(self->pysubs); + Py_XDECREF(self->pytmrs); + Py_XDECREF(self->pygcs); + Py_XDECREF(self->pyclis); + Py_XDECREF(self->pysrvs); + Py_XDECREF(self->pyready); + + Py_TYPE(self)->tp_free((PyObject *)self); +} + +/// Constructor +static PyObject * +rclpy_wait_set_new(PyTypeObject * type, PyObject * Py_UNUSED(args), PyObject * Py_UNUSED(kwds)) +{ + rclpy_wait_set_t * self; + + self = (rclpy_wait_set_t *)type->tp_alloc(type, 0); + if (self) { + self->wait_set = rcl_get_zero_initialized_wait_set(); + } + + return (PyObject *)self; +} + +/// Initializer +static int +rclpy_wait_set_init( + rclpy_wait_set_t * self, PyObject * Py_UNUSED(args), PyObject * Py_UNUSED(kwds)) +{ + // rclpy_wait_set_dealloc will take care of decref +#define MAKE_LIST_OR_BAIL(ELIST) \ + ELIST = PyList_New(0); \ + if (!(ELIST)) { \ + return -1; \ + } + + MAKE_LIST_OR_BAIL(self->pysubs); + MAKE_LIST_OR_BAIL(self->pytmrs); + MAKE_LIST_OR_BAIL(self->pygcs); + MAKE_LIST_OR_BAIL(self->pyclis); + MAKE_LIST_OR_BAIL(self->pysrvs); + MAKE_LIST_OR_BAIL(self->pyready); +#undef MAKE_LIST_OR_BAIL + + rcl_ret_t ret = rcl_wait_set_init(&(self->wait_set), 0, 0, 0, 0, 0, rcl_get_default_allocator()); + if (ret != RCL_RET_OK) { + PyErr_Format(PyExc_RuntimeError, + "Failed to initialize wait set: %s", rcl_get_error_string_safe()); + rcl_reset_error(); + return -1; + } + + return 0; +} + +/// Return a pycapsule handle from an rclpy type +/* + * Raises any exception raised by an object with a custom __getattr__ + * + * \param[in] pyentity + * \returns NULL on error else something that is probably a pycapsule + */ +static inline PyObject * +_rclpy_to_pycapsule(PyObject * pyentity) +{ + if (PyObject_HasAttrString(pyentity, "subscription_handle")) { + pyentity = PyObject_GetAttrString(pyentity, "subscription_handle"); + } else if (PyObject_HasAttrString(pyentity, "guard_handle")) { + pyentity = PyObject_GetAttrString(pyentity, "guard_handle"); + } else if (PyObject_HasAttrString(pyentity, "timer_handle")) { + pyentity = PyObject_GetAttrString(pyentity, "timer_handle"); + } else if (PyObject_HasAttrString(pyentity, "client_handle")) { + pyentity = PyObject_GetAttrString(pyentity, "client_handle"); + } else if (PyObject_HasAttrString(pyentity, "service_handle")) { + pyentity = PyObject_GetAttrString(pyentity, "service_handle"); + } + return pyentity; +} + + +/// Add entities of a known type to the correct list +/* + * Raises ValueError if capsule is invalid + * Handles adding entities of a given type to the wait set + * + * \param[in] pylist a list that the entities should be added to + * \param[in] pyentities an iterable of (sub, pub, client, serv, guard) + * \param[in] handle_attr an attribute of an entity where the handle is stored + * \param[in] handle_type a pycapsule name this entity uses + * \return RCL_RET_OK if everything succeded + */ +static inline rcl_ret_t +_rclpy_add_entity( + PyObject * pylist, PyObject * pyentities, const char * handle_attr, + const char * handle_type) +{ + // It's possible for arbitrary python code to be invoked + Py_INCREF(pylist); + Py_INCREF(pyentities); + + PyObject * pyiter = PyObject_GetIter(pyentities); + if (!pyiter) { + // exception set + Py_DECREF(pylist); + Py_DECREF(pyentities); + return RCL_RET_ERROR; + } + + PyObject * pyentity; + while ((pyentity = PyIter_Next(pyiter))) { + // Accept an instance of an rclpy type for convenience + if (PyObject_HasAttrString(pyentity, handle_attr)) { + pyentity = PyObject_GetAttrString(pyentity, handle_attr); + } + + // No chance of arbitrary python code below, so decref early + Py_DECREF(pyentity); + + if (!pyentity) { + // Exception set + break; + } + + if (PyCapsule_IsValid(pyentity, handle_type)) { + if (-1 == PyList_Append(pylist, pyentity)) { + break; + } + } else { + const char * entity_type = PyCapsule_GetName(pyentity); + if (entity_type) { + PyErr_Format(PyExc_ValueError, "Unknown capsule type: '%s'", entity_type); + } // else PyCapsule_GetName raised + break; + } + } + + Py_DECREF(pyiter); + Py_DECREF(pyentities); + Py_DECREF(pylist); + + if (PyErr_Occurred()) { + // Return down here after references have been cleaned dup + return RCL_RET_ERROR; + } + return RCL_RET_OK; +} + +/// Add subscriptions to be waited on to the appropriate set +/* + * Raises ValueError if a capsule is invalid + * Raises TypeError if the argument is not iterable + * + * \param[in] self instance of rclpy_wait_set_t + * \param[in] pyentities Iterable of subscriptions + */ +static PyObject * +rclpy_wait_set_add_subscriptions(rclpy_wait_set_t * self, PyObject * args) +{ + PyObject * pyentities; + if (!PyArg_ParseTuple(args, "O", &pyentities)) { + return NULL; + } + + if (RCL_RET_OK != _rclpy_add_entity(self->pysubs, pyentities, "subscription_handle", + "rcl_subscription_t")) + { + // Exception set + return NULL; + } + Py_RETURN_NONE; +} + +/// Add guard_conditions to be waited on to the appropriate set +/* + * Raises ValueError if a capsule is invalid + * Raises TypeError if the argument is not iterable + * + * \param[in] self instance of rclpy_wait_set_t + * \param[in] pyentities Iterable of guard_conditions + */ +static PyObject * +rclpy_wait_set_add_guard_conditions(rclpy_wait_set_t * self, PyObject * args) +{ + PyObject * pyentities; + if (!PyArg_ParseTuple(args, "O", &pyentities)) { + return NULL; + } + + if (RCL_RET_OK != _rclpy_add_entity(self->pygcs, pyentities, "guard_handle", + "rcl_guard_condition_t")) + { + // Exception set + return NULL; + } + Py_RETURN_NONE; +} + +/// Add timers to be waited on to the appropriate set +/* + * Raises ValueError if a capsule is invalid + * Raises TypeError if the argument is not iterable + * + * \param[in] self instance of rclpy_wait_set_t + * \param[in] pyentities Iterable of timers + */ +static PyObject * +rclpy_wait_set_add_timers(rclpy_wait_set_t * self, PyObject * args) +{ + PyObject * pyentities; + if (!PyArg_ParseTuple(args, "O", &pyentities)) { + return NULL; + } + + if (RCL_RET_OK != _rclpy_add_entity(self->pytmrs, pyentities, "timer_handle", + "rcl_timer_t")) + { + // Exception set + return NULL; + } + Py_RETURN_NONE; +} + +/// Add clients to be waited on to the appropriate set +/* + * Raises ValueError if a capsule is invalid + * Raises TypeError if the argument is not iterable + * + * \param[in] self instance of rclpy_wait_set_t + * \param[in] pyentities Iterable of clients + */ +static PyObject * +rclpy_wait_set_add_clients(rclpy_wait_set_t * self, PyObject * args) +{ + PyObject * pyentities; + if (!PyArg_ParseTuple(args, "O", &pyentities)) { + return NULL; + } + + if (RCL_RET_OK != _rclpy_add_entity(self->pyclis, pyentities, "client_handle", + "rcl_client_t")) + { + // Exception set + return NULL; + } + Py_RETURN_NONE; +} + +/// Add services to be waited on to the appropriate set +/* + * Raises ValueError if a capsule is invalid + * Raises TypeError if the argument is not iterable + * + * \param[in] self instance of rclpy_wait_set_t + * \param[in] pyentities Iterable of services + */ +static PyObject * +rclpy_wait_set_add_services(rclpy_wait_set_t * self, PyObject * args) +{ + PyObject * pyentities; + if (!PyArg_ParseTuple(args, "O", &pyentities)) { + return NULL; + } + + if (RCL_RET_OK != _rclpy_add_entity(self->pysrvs, pyentities, "service_handle", + "rcl_service_t")) + { + // Exception set + return NULL; + } + Py_RETURN_NONE; +} + +/// Build a wait set +/* + * Raises RuntimeError if an RCL error occurs + * + * This method adds all of the PyCapsule to the rcl_wait_set_t instance, resizing it in the process + * + * \return number of entities in the wait set +*/ +static inline rcl_ret_t +_rclpy_build_wait_set(rclpy_wait_set_t * self) +{ + rcl_ret_t ret; + ret = rcl_wait_set_fini(&(self->wait_set)); + if (ret != RCL_RET_OK) { + PyErr_Format(PyExc_RuntimeError, + "Failed to finalize wait set: %s", rcl_get_error_string_safe()); + rcl_reset_error(); + return ret; + } + + Py_ssize_t num_subs = PyObject_Length(self->pysubs); + if (-1 == num_subs) { + return RCL_RET_ERROR; + } + Py_ssize_t num_gcs = PyObject_Length(self->pygcs); + if (-1 == num_gcs) { + return RCL_RET_ERROR; + } + Py_ssize_t num_tmrs = PyObject_Length(self->pytmrs); + if (-1 == num_tmrs) { + return RCL_RET_ERROR; + } + Py_ssize_t num_clis = PyObject_Length(self->pyclis); + if (-1 == num_clis) { + return RCL_RET_ERROR; + } + Py_ssize_t num_srvs = PyObject_Length(self->pysrvs); + if (-1 == num_srvs) { + return RCL_RET_ERROR; + } + + ret = rcl_wait_set_init(&(self->wait_set), num_subs, num_gcs, num_tmrs, num_clis, num_srvs, + rcl_get_default_allocator()); + if (ret != RCL_RET_OK) { + PyErr_Format(PyExc_RuntimeError, + "Failed to initialize wait set: %s", rcl_get_error_string_safe()); + rcl_reset_error(); + return ret; + } + +#define RCLPY_ADD_ENTITY(ETYPE, WSFUNC, ELIST) \ + do { \ + PyObject * pyiter = PyObject_GetIter(ELIST); \ + if (!pyiter) { \ + return RCL_RET_ERROR; \ + } \ + PyObject * pyentity; \ + while ((pyentity = PyIter_Next(pyiter))) { \ + ETYPE * entity = (ETYPE *)PyCapsule_GetPointer(pyentity, #ETYPE); \ + if (!entity) { \ + Py_DECREF(pyentity); \ + Py_DECREF(pyiter); \ + return RCL_RET_ERROR; \ + } \ + rcl_ret_t ret = WSFUNC(&(self->wait_set), entity); \ + if (ret != RCL_RET_OK) { \ + PyErr_Format(PyExc_RuntimeError, \ + "Failed to add entity '" #ETYPE "' to wait set: %s", rcl_get_error_string_safe()); \ + rcl_reset_error(); \ + Py_DECREF(pyentity); \ + Py_DECREF(pyiter); \ + return ret; \ + } \ + Py_DECREF(pyentity); \ + } \ + Py_DECREF(pyiter); \ + } while (false) + + RCLPY_ADD_ENTITY(rcl_subscription_t, rcl_wait_set_add_subscription, self->pysubs); + RCLPY_ADD_ENTITY(rcl_guard_condition_t, rcl_wait_set_add_guard_condition, self->pygcs); + RCLPY_ADD_ENTITY(rcl_timer_t, rcl_wait_set_add_timer, self->pytmrs); + RCLPY_ADD_ENTITY(rcl_client_t, rcl_wait_set_add_client, self->pyclis); + RCLPY_ADD_ENTITY(rcl_service_t, rcl_wait_set_add_service, self->pysrvs); +#undef RCLPY_ADD_ENTITY + + return RCL_RET_OK; +} + +/// Fill pyready with entities that are ready +/* + * \param[in] self an instance of _rclpy_wait_set.WaitSet + * \param[in] pyset the set of entities to check + * \param[in] type the name of the PyCapsule + */ +static inline rcl_ret_t +_rclpy_build_ready_entities(rclpy_wait_set_t * self) +{ + if (-1 == PySequence_DelSlice(self->pyready, 0, PySequence_Length(self->pyready))) { + return RCL_RET_ERROR; + } + +#define GET_READY_ENTITIES(ETYPE, ELIST) \ + do { \ + PyObject * pyiter = PyObject_GetIter((ELIST)); \ + if (!pyiter) { \ + return RCL_RET_ERROR; \ + } \ + PyObject * pyentity; \ + while ((pyentity = PyIter_Next(pyiter))) { \ + rcl_ ## ETYPE ## _t * entity = \ + (rcl_ ## ETYPE ## _t *)PyCapsule_GetPointer(pyentity, "rcl_" #ETYPE "_t"); \ + if (!entity) { \ + Py_DECREF(pyentity); \ + Py_DECREF(pyiter); \ + return RCL_RET_ERROR; \ + } \ + size_t idx; \ + size_t idx_max; \ + idx_max = self->wait_set.size_of_ ## ETYPE ## s; \ + const rcl_ ## ETYPE ## _t ** struct_ptr = self->wait_set.ETYPE ## s; \ + for (idx = 0; idx < idx_max; ++idx) { \ + if (struct_ptr[idx] == entity) { \ + if (-1 == PyList_Append(self->pyready, pyentity)) { \ + Py_DECREF(pyentity); \ + Py_DECREF(pyiter); \ + return RCL_RET_ERROR; \ + } \ + } \ + } \ + Py_DECREF(pyentity); \ + } \ + Py_DECREF(pyiter); \ + } while (false) + + GET_READY_ENTITIES(subscription, self->pysubs); + GET_READY_ENTITIES(guard_condition, self->pygcs); + GET_READY_ENTITIES(timer, self->pytmrs); + GET_READY_ENTITIES(client, self->pyclis); + GET_READY_ENTITIES(service, self->pysrvs); +#undef GET_READY_ENTITIES + return RCL_RET_OK; +} + +/// Wait until timeout is reached or an event happened +/** + * Raises RuntimeError if there was an error while waiting + * + * This function will wait for an event to happen or for the timeout to expire. + * A negative timeout means wait forever, a timeout of 0 means no wait + * \param[in] timeout optional time to wait before waking up (in nanoseconds) + * \return NULL + */ +static PyObject * +rclpy_wait_set_wait(rclpy_wait_set_t * self, PyObject * args) +{ + PY_LONG_LONG timeout = -1; + + if (!PyArg_ParseTuple(args, "|K", &timeout)) { + return NULL; + } + + if (RCL_RET_OK != _rclpy_build_wait_set(self)) { + // exception set + return NULL; + } + +#ifdef _WIN32 + _crt_signal_t +#else + sig_t +#endif // _WIN32 + previous_handler = signal(SIGINT, rclpy_catch_function); + rcl_ret_t ret; + + // Reference could be invalidated by arbitrary python code running while GIL is released + Py_INCREF(self); + Py_BEGIN_ALLOW_THREADS; + ret = rcl_wait(&(self->wait_set), timeout); + Py_END_ALLOW_THREADS; + Py_DECREF(self); + + signal(SIGINT, previous_handler); + if (ret != RCL_RET_OK && ret != RCL_RET_TIMEOUT) { + PyErr_Format(PyExc_RuntimeError, + "Failed to wait on wait set: %s", rcl_get_error_string_safe()); + rcl_reset_error(); + return NULL; + } + + if (RCL_RET_OK != _rclpy_build_ready_entities(self)) { + // exception set + return NULL; + } + + Py_RETURN_NONE; +} + +/// Remove all stored entities from the wait set +/** + * Raises RuntimeError if there was an error while clearing + * + * \return None + */ +static PyObject * +rclpy_wait_set_clear(rclpy_wait_set_t * self) +{ + if (-1 == PySequence_DelSlice(self->pysubs, 0, PySequence_Length(self->pysubs)) || + -1 == PySequence_DelSlice(self->pytmrs, 0, PySequence_Length(self->pytmrs)) || + -1 == PySequence_DelSlice(self->pygcs, 0, PySequence_Length(self->pygcs)) || + -1 == PySequence_DelSlice(self->pyclis, 0, PySequence_Length(self->pyclis)) || + -1 == PySequence_DelSlice(self->pysrvs, 0, PySequence_Length(self->pysrvs))) + { + return NULL; + } + + Py_RETURN_NONE; +} + +/// Return True if an entity is ready +/** + * Raises RuntimeError if there was an error while clearing + * Raises ValueError if the capsule is invalid + * + * \return True if the entity is ready + */ +static PyObject * +rclpy_wait_set_is_ready(rclpy_wait_set_t * self, PyObject * args) +{ + PyObject * pyentity; + if (!PyArg_ParseTuple(args, "O", &pyentity)) { + return NULL; + } + + pyentity = _rclpy_to_pycapsule(pyentity); + + int contains = PySequence_Contains(self->pyready, pyentity); + if (-1 == contains) { + // Exception set + return NULL; + } + if (contains) { + Py_RETURN_TRUE; + } + Py_RETURN_FALSE; +} + +static PyMemberDef rclpy_wait_set_members[] = { + {"_pysubs", T_OBJECT_EX, offsetof(rclpy_wait_set_t, pysubs), 0, + "subscription capsules"}, + {"_pygcs", T_OBJECT_EX, offsetof(rclpy_wait_set_t, pygcs), 0, + "guard_condition capsules"}, + {"_pytmrs", T_OBJECT_EX, offsetof(rclpy_wait_set_t, pytmrs), 0, + "timer capsules"}, + {"_pyclis", T_OBJECT_EX, offsetof(rclpy_wait_set_t, pyclis), 0, + "client capsules"}, + {"_pysrvs", T_OBJECT_EX, offsetof(rclpy_wait_set_t, pysrvs), 0, + "service capsules"}, + {"_pyready", T_OBJECT_EX, offsetof(rclpy_wait_set_t, pyready), 0, + "ready capsules"}, + {NULL} /* Sentinel */ +}; + +/// Define methods of _rclpy_wait_set.WaitSet +static PyMethodDef rclpy_wait_set_methods[] = { + {"add_subscriptions", (PyCFunction)rclpy_wait_set_add_subscriptions, METH_VARARGS, + "Add a bunch of subscription instances or handles to the wait_set."}, + {"add_guard_conditions", (PyCFunction)rclpy_wait_set_add_guard_conditions, METH_VARARGS, + "Add a bunch of guard_condition instances or handles to the wait_set."}, + {"add_timers", (PyCFunction)rclpy_wait_set_add_timers, METH_VARARGS, + "Add a bunch of timer instances or handles to the wait_set."}, + {"add_clients", (PyCFunction)rclpy_wait_set_add_clients, METH_VARARGS, + "Add a bunch of client instances or handles to the wait_set."}, + {"add_services", (PyCFunction)rclpy_wait_set_add_services, METH_VARARGS, + "Add a bunch of service instances or handles to the wait_set."}, + {"wait", (PyCFunction)rclpy_wait_set_wait, METH_VARARGS, + "Wait until timeout is reached or an event happened."}, + {"clear", (PyCFunction)rclpy_wait_set_clear, METH_NOARGS, + "Remove all entities from the wait set."}, + {"is_ready", (PyCFunction)rclpy_wait_set_is_ready, METH_VARARGS, + "Return True if an entity is ready."}, + {NULL} /* Sentinel */ +}; + + +// Partially initializing is recommended in the python docs +// I don't see a way to complete the initializer without duplicating #ifdef in cpython header +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wmissing-field-initializers" +/// Python type _rclpy_wait_set.WaitSet +static PyTypeObject rclpy_wait_set_type_t = { + PyVarObject_HEAD_INIT(NULL, 0) + "_rclpy_wait_set.WaitSet", /* tp_name */ + sizeof(rclpy_wait_set_t), /* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor)rclpy_wait_set_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_reserved */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT, /* tp_flags */ + "Interface to a wait set.", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + rclpy_wait_set_methods, /* tp_methods */ + rclpy_wait_set_members, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc)rclpy_wait_set_init, /* tp_init */ + 0, /* tp_alloc */ + rclpy_wait_set_new, /* tp_new */ +}; +#pragma GCC diagnostic pop + +static PyModuleDef wait_set_module = { + PyModuleDef_HEAD_INIT, + "_rclpy_wait_set", + "Extention module for a wait set class.", + -1, + NULL, NULL, NULL, NULL, NULL +}; + +PyMODINIT_FUNC +PyInit__rclpy_wait_set(void) +{ + PyObject * m; + + rclpy_wait_set_type_t.tp_new = PyType_GenericNew; + if (PyType_Ready(&rclpy_wait_set_type_t) < 0) { + return NULL; + } + + m = PyModule_Create(&wait_set_module); + if (m == NULL) { + return NULL; + } + + Py_INCREF(&rclpy_wait_set_type_t); + PyModule_AddObject(m, "WaitSet", (PyObject *)&rclpy_wait_set_type_t); + return m; +} diff --git a/rclpy/src/rclpy/sigint_gc.c b/rclpy/src/rclpy/sigint_gc.c new file mode 100644 index 000000000..74a83960c --- /dev/null +++ b/rclpy/src/rclpy/sigint_gc.c @@ -0,0 +1,33 @@ +// Copyright 2017 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/rclpy/sigint_gc.h" + +#include + +#include + +rcl_guard_condition_t * g_rclpy_sigint_gc_handle; + +/// Catch signals +void rclpy_catch_function(int signo) +{ + (void) signo; + rcl_ret_t ret = rcl_trigger_guard_condition(g_rclpy_sigint_gc_handle); + if (ret != RCL_RET_OK) { + PyErr_Format(PyExc_RuntimeError, + "Failed to trigger guard_condition: %s", rcl_get_error_string_safe()); + rcl_reset_error(); + } +} diff --git a/rclpy/src/rclpy/sigint_gc.h b/rclpy/src/rclpy/sigint_gc.h new file mode 100644 index 000000000..ffc06cd93 --- /dev/null +++ b/rclpy/src/rclpy/sigint_gc.h @@ -0,0 +1,27 @@ +// Copyright 2017 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef RCLPY__SIGINT_GC_H_ +#define RCLPY__SIGINT_GC_H_ + +#include + +#include + +/// Global variable with guard condition for sigint handler +extern rcl_guard_condition_t * g_rclpy_sigint_gc_handle; + +/// Function that can be used as a sigint handler +extern void rclpy_catch_function(int signo); + +#endif // RCLPY__SIGINT_GC_H_ diff --git a/rclpy/test/test_client.py b/rclpy/test/test_client.py new file mode 100644 index 000000000..24e9a2f3f --- /dev/null +++ b/rclpy/test/test_client.py @@ -0,0 +1,61 @@ +# Copyright 2017 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import unittest + +from rcl_interfaces.srv import GetParameters +import rclpy + + +TIME_FUDGE = 0.1 + + +class TestClient(unittest.TestCase): + + @classmethod + def setUpClass(cls): + rclpy.init() + cls.node = rclpy.create_node('TestClient') + + @classmethod + def tearDownClass(cls): + cls.node.destroy_node() + rclpy.shutdown() + + def test_wait_for_service_5sec(self): + cli = self.node.create_client(GetParameters, 'get/parameters') + try: + start = time.monotonic() + cli.wait_for_service(timeout_sec=5.0) + end = time.monotonic() + self.assertGreater(5.0, end - start - TIME_FUDGE) + self.assertLess(5.0, end - start + TIME_FUDGE) + finally: + self.node.destroy_client(cli) + + def test_wait_for_service_nowait(self): + cli = self.node.create_client(GetParameters, 'get/parameters') + try: + start = time.monotonic() + cli.wait_for_service(timeout_sec=0) + end = time.monotonic() + self.assertGreater(0, end - start - TIME_FUDGE) + self.assertLess(0, end - start + TIME_FUDGE) + finally: + self.node.destroy_client(cli) + + +if __name__ == '__main__': + unittest.main() diff --git a/rclpy/test/test_utilities.py b/rclpy/test/test_utilities.py new file mode 100644 index 000000000..5fdfef1bf --- /dev/null +++ b/rclpy/test/test_utilities.py @@ -0,0 +1,31 @@ +# Copyright 2017 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from rclpy.constants import S_TO_NS +import rclpy.utilities + + +class TestUtilities(unittest.TestCase): + + def test_timeout_sec_to_nsec(self): + self.assertGreater(0, rclpy.utilities.timeout_sec_to_nsec(None)) + self.assertGreater(0, rclpy.utilities.timeout_sec_to_nsec(-1)) + self.assertEqual(0, rclpy.utilities.timeout_sec_to_nsec(0)) + self.assertEqual(1.5 * S_TO_NS, rclpy.utilities.timeout_sec_to_nsec(1.5)) + + +if __name__ == '__main__': + unittest.main() diff --git a/rclpy/test/test_wait_set.py b/rclpy/test/test_wait_set.py new file mode 100644 index 000000000..cf3e01e54 --- /dev/null +++ b/rclpy/test/test_wait_set.py @@ -0,0 +1,134 @@ +# Copyright 2017 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from rcl_interfaces.srv import GetParameters +import rclpy +from rclpy.constants import S_TO_NS +from rclpy.guard_condition import GuardCondition +from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy +from rclpy.impl.implementation_singleton import rclpy_wait_set_implementation as _rclpy_wait_set +from rclpy.timer import WallTimer +from std_msgs.msg import String + + +class TestWaitSet(unittest.TestCase): + + @classmethod + def setUpClass(cls): + rclpy.init() + cls.node = rclpy.create_node('TestWaitSet') + + @classmethod + def tearDownClass(cls): + cls.node.destroy_node() + rclpy.shutdown() + + def test_guard_condition_ready(self): + gc = GuardCondition(None, None) + try: + ws = _rclpy_wait_set.WaitSet() + ws.add_guard_conditions([gc]) + self.assertFalse(ws.is_ready(gc)) + + ws.wait(0) + self.assertFalse(ws.is_ready(gc)) + + gc.trigger() + ws.wait(0) + self.assertTrue(ws.is_ready(gc)) + + ws.wait(0) + self.assertFalse(ws.is_ready(gc)) + finally: + _rclpy.rclpy_destroy_entity(gc.guard_handle) + + def test_timer_ready(self): + timer = WallTimer(None, None, int(0.1 * S_TO_NS)) + try: + ws = _rclpy_wait_set.WaitSet() + ws.add_timers([timer]) + self.assertFalse(ws.is_ready(timer)) + + ws.wait(0) + self.assertFalse(ws.is_ready(timer)) + + ws.wait(int(0.1 * S_TO_NS)) + self.assertTrue(ws.is_ready(timer)) + + _rclpy.rclpy_call_timer(timer.timer_handle) + ws.wait(0) + self.assertFalse(ws.is_ready(timer)) + finally: + _rclpy.rclpy_destroy_entity(timer.timer_handle) + + def test_subscriber_ready(self): + sub = self.node.create_subscription(String, 'chatter', lambda msg: print(msg)) + pub = self.node.create_publisher(String, 'chatter') + try: + ws = _rclpy_wait_set.WaitSet() + ws.add_subscriptions([sub]) + self.assertFalse(ws.is_ready(sub)) + + ws.wait(0) + self.assertFalse(ws.is_ready(sub)) + + msg = String() + msg.data = 'Hello World' + pub.publish(msg) + + ws.wait(5 * S_TO_NS) + self.assertTrue(ws.is_ready(sub)) + + _rclpy.rclpy_take(sub.subscription_handle, sub.msg_type) + ws.wait(0) + self.assertFalse(ws.is_ready(sub)) + finally: + self.node.destroy_publisher(pub) + self.node.destroy_subscription(sub) + + def test_server_ready(self): + cli = self.node.create_client(GetParameters, 'get/parameters') + srv = self.node.create_service( + GetParameters, 'get/parameters', lambda req: GetParameters.Response()) + + try: + ws = _rclpy_wait_set.WaitSet() + ws.add_clients([cli]) + ws.add_services([srv]) + self.assertFalse(ws.is_ready(cli)) + self.assertFalse(ws.is_ready(srv)) + + ws.wait(0) + self.assertFalse(ws.is_ready(cli)) + self.assertFalse(ws.is_ready(srv)) + + cli.wait_for_service() + cli.call(GetParameters.Request()) + + ws.wait(5 * S_TO_NS) + # TODO(sloretz) test client after it's wait_for_future() API is sorted out + self.assertTrue(ws.is_ready(srv)) + + _rclpy.rclpy_take_request(srv.service_handle, srv.srv_type.Request) + ws.wait(0) + self.assertFalse(ws.is_ready(srv)) + finally: + self.node.destroy_client(cli) + self.node.destroy_service(srv) + + +if __name__ == '__main__': + unittest.main()