diff --git a/rclpy/CMakeLists.txt b/rclpy/CMakeLists.txt index c52b7c0ba..39ed0d87f 100644 --- a/rclpy/CMakeLists.txt +++ b/rclpy/CMakeLists.txt @@ -70,10 +70,32 @@ function(configure_python_c_extension_library _library_name) ) endfunction() +# Library with signal handler +add_library( + rclpy_sigint + SHARED src/rclpy/sigint_gc.c +) +target_include_directories(rclpy_sigint PUBLIC ${CMAKE_SOURCE_DIR}) +install( + TARGETS rclpy_sigint + ARCHIVE DESTINATION lib + LIBRARY DESTINATION lib + RUNTIME DESTINATION bin +) +ament_target_dependencies(rclpy_sigint + "rcl" +) +# Export so other packages can load this library when importing rclpy +ament_export_libraries(rclpy_sigint) +# Windows dllimport/dllexport +target_compile_definitions(rclpy_sigint PRIVATE "RCLPY_SIGINT_BUILDING_LIBRARY") + +# Main library add_library( rclpy SHARED src/rclpy/_rclpy.c ) +target_link_libraries(rclpy rclpy_sigint) configure_python_c_extension_library(rclpy) ament_target_dependencies(rclpy "rcl" @@ -90,6 +112,18 @@ ament_target_dependencies(rclpy_logging "rcutils" ) +# WaitSet wrapper library +add_library( + rclpy_wait_set + SHARED src/rclpy/_rclpy_wait_set.c +) +target_link_libraries(rclpy_wait_set rclpy_sigint) +configure_python_c_extension_library(rclpy_wait_set) +ament_target_dependencies(rclpy_wait_set + "rcl" + "rcutils" +) + if(BUILD_TESTING) find_package(ament_lint_auto REQUIRED) ament_lint_auto_find_test_dependencies() @@ -109,6 +143,7 @@ if(BUILD_TESTING) PYTHON_EXECUTABLE "${PYTHON_EXECUTABLE}" APPEND_ENV AMENT_PREFIX_PATH=${ament_index_build_path} PYTHONPATH=${CMAKE_CURRENT_BINARY_DIR} + APPEND_LIBRARY_DIRS "$" # Windows add rclpy_sigint.dll to path TIMEOUT 90 ) endif() diff --git a/rclpy/rclpy/callback_groups.py b/rclpy/rclpy/callback_groups.py index d986a3a9f..368bb0f2a 100644 --- a/rclpy/rclpy/callback_groups.py +++ b/rclpy/rclpy/callback_groups.py @@ -45,6 +45,9 @@ def can_execute(self, entity): """ Return true if an entity can be executed. + The executor may call this on a task that has already started execution. In this case the + callback group should return True from this method. + :param entity: a subscription, timer, client, or service instance :rtype: bool """ @@ -96,17 +99,19 @@ def __init__(self): def can_execute(self, entity): with self._lock: assert weakref.ref(entity) in self.entities - return self._active_entity is None + return self._active_entity is None or weakref.ref(entity) == self._active_entity def beginning_execution(self, entity): with self._lock: - assert weakref.ref(entity) in self.entities + weak_entity = weakref.ref(entity) + assert weak_entity in self.entities if self._active_entity is None: - self._active_entity = entity + self._active_entity = weak_entity return True return False def ending_execution(self, entity): with self._lock: - assert self._active_entity == entity + weak_entity = weakref.ref(entity) + assert self._active_entity == weak_entity self._active_entity = None diff --git a/rclpy/rclpy/client.py b/rclpy/rclpy/client.py index 15bd45151..73c4a01c3 100644 --- a/rclpy/rclpy/client.py +++ b/rclpy/rclpy/client.py @@ -14,7 +14,9 @@ import threading +from rclpy.executor_handle import ExecutorHandle from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy +from rclpy.impl.implementation_singleton import rclpy_wait_set_implementation as _rclpy_wait_set import rclpy.utilities @@ -23,24 +25,19 @@ class ResponseThread(threading.Thread): def __init__(self, client): threading.Thread.__init__(self) self.client = client - self.wait_set = _rclpy.rclpy_get_zero_initialized_wait_set() - _rclpy.rclpy_wait_set_init(self.wait_set, 0, 1, 0, 1, 0) - _rclpy.rclpy_wait_set_clear_entities('client', self.wait_set) - _rclpy.rclpy_wait_set_add_entity( - 'client', self.wait_set, self.client.client_handle) + self._wait_set = _rclpy_wait_set.WaitSet() def run(self): [sigint_gc, sigint_gc_handle] = _rclpy.rclpy_get_sigint_guard_condition() - _rclpy.rclpy_wait_set_add_entity('guard_condition', self.wait_set, sigint_gc) + self._wait_set.clear() + self._wait_set.add_guard_conditions([sigint_gc]) + self._wait_set.add_clients([self.client.client_handle]) - _rclpy.rclpy_wait(self.wait_set, -1) - - guard_condition_ready_list = \ - _rclpy.rclpy_get_ready_entities('guard_condition', self.wait_set) + self._wait_set.wait(-1) # destroying here to make sure we dont call shutdown before cleaning up _rclpy.rclpy_destroy_entity(sigint_gc) - if sigint_gc_handle in guard_condition_ready_list: + if self._wait_set.is_ready(sigint_gc): rclpy.utilities.shutdown() return response = _rclpy.rclpy_take_response( @@ -64,8 +61,20 @@ def __init__( self.sequence_number = 0 self.response = None self.callback_group = callback_group - # True when the callback is ready to fire but has not been "taken" by an executor - self._executor_event = False + # Holds info the executor uses to do work for this entity + self._executor_handle = ExecutorHandle(self._take, self._execute) + + def _take(self): + response = _rclpy.rclpy_take_response( + self.client_handle, self.srv_type.Response, self.sequence_number) + return response + + def _execute(self, response): + if response: + # clients spawn their own thread to wait for a response in the + # wait_for_future function. Users can either use this mechanism or monitor + # the content of client.response to check if a response has been received + self.response = response def call(self, req): self.response = None @@ -77,3 +86,6 @@ def wait_for_future(self): thread1 = ResponseThread(self) thread1.start() thread1.join() + + def service_is_ready(self): + return _rclpy.rclpy_service_server_is_available(self.node_handle, self.client_handle) diff --git a/rclpy/rclpy/executor_handle.py b/rclpy/rclpy/executor_handle.py new file mode 100644 index 000000000..7876590b0 --- /dev/null +++ b/rclpy/rclpy/executor_handle.py @@ -0,0 +1,49 @@ +# Copyright 2017 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import queue + +import rclpy.future +from rclpy.utilities import timeout_sec_to_nsec + + +class ExecutorHandle: + """Interface between an entity and an executor.""" + + def __init__(self, take_callback, execute_callback, cancel_ready_callback=None): + self._take_from_wait_list = take_callback + self.execute_callback = execute_callback + self._cancel_ready_callback = cancel_ready_callback + # True when the callback is ready to fire but has not been "taken" by an executor + self._ready = False + + def notify_ready(self): + """Receive notification from executor that this entity was ready in the wait list.""" + self._ready = True + + def cancel_ready(self): + """Receive notification from executor that this entity could not be taken.""" + self._ready = False + # Hook for guard conditions to retrigger themselves + if self._cancel_ready_callback: + self._cancel_ready_callback + + def ready(self): + return self._ready + + def take_from_wait_list(self): + """Get data from rcl.""" + args = self._take_from_wait_list() + self._ready = False + return args diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index 7825ac8aa..64dc27994 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -12,62 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. -from concurrent.futures import ThreadPoolExecutor as _ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor import multiprocessing -from threading import Condition as _Condition -from threading import Lock as _Lock +from threading import Condition +from threading import Lock -from rclpy.constants import S_TO_NS +from rclpy.future import Task +from rclpy.guard_condition import GuardCondition from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy -from rclpy.timer import WallTimer as _WallTimer +from rclpy.impl.implementation_singleton import rclpy_wait_set_implementation as _rclpy_wait_set +from rclpy.timer import WallTimer from rclpy.utilities import ok +from rclpy.utilities import timeout_sec_to_nsec -class _WaitSet: - """Make sure the wait set gets destroyed when a generator exits.""" - - def __enter__(self): - self.wait_set = _rclpy.rclpy_get_zero_initialized_wait_set() - return self.wait_set - - def __exit__(self, t, v, tb): - _rclpy.rclpy_destroy_wait_set(self.wait_set) - - -class _WorkTracker: - """Track the amount of work that is in progress.""" - - def __init__(self): - # Number of tasks that are being executed - self._num_work_executing = 0 - self._work_condition = _Condition() - - def __enter__(self): - """Increment the amount of executing work by 1.""" - with self._work_condition: - self._num_work_executing += 1 - - def __exit__(self, t, v, tb): - """Decrement the amount of work executing by 1.""" - with self._work_condition: - self._num_work_executing -= 1 - self._work_condition.notify_all() - - def wait(self, timeout_sec=None): - """ - Wait until all work completes. - - :param timeout_sec: Seconds to wait. Block forever if None. Don't wait if <= 0 - :type timeout_sec: float or None - :rtype: bool True if all work completed - """ - # Wait for all work to complete - if timeout_sec is None or timeout_sec >= 0: - with self._work_condition: - if not self._work_condition.wait_for( - lambda: self._num_work_executing == 0, timeout_sec): - return False - return True +class TimeoutException(Exception): + pass class Executor: @@ -85,14 +45,14 @@ class Executor: def __init__(self): super().__init__() self._nodes = set() - self._nodes_lock = _Lock() + self._nodes_lock = Lock() # This is triggered when wait_for_ready_callbacks should rebuild the wait list - gc, gc_handle = _rclpy.rclpy_create_guard_condition() - self._guard_condition = gc - self._guard_condition_handle = gc_handle + self._guard_condition = GuardCondition(None, None) + # Triggered by signal handler for sigint + self._sigint_gc, _ = _rclpy.rclpy_get_sigint_guard_condition() # True if shutdown has been called self._is_shutdown = False - self._work_tracker = _WorkTracker() + self._wait_set = _rclpy_wait_set.WaitSet() def shutdown(self, timeout_sec=None): """ @@ -100,24 +60,27 @@ def shutdown(self, timeout_sec=None): Return true if all outstanding callbacks finished executing. - :param timeout_sec: Seconds to wait. Block forever if None. Don't wait if <= 0 + :param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0 :type timeout_sec: float or None :rtype: bool """ self._is_shutdown = True - if not self._work_tracker.wait(timeout_sec): - return False - # Clean up stuff that won't be used anymore + + # TODO(sloretz) how to handle timeout_sec? + with self._nodes_lock: self._nodes = set() - _rclpy.rclpy_destroy_entity(self._guard_condition) + _rclpy.rclpy_destroy_entity(self._guard_condition.guard_handle) + _rclpy.rclpy_destroy_entity(self._sigint_gc) self._guard_condition = None + self._sigint_gc = None return True def __del__(self): if self._guard_condition is not None: - _rclpy.rclpy_destroy_entity(self._guard_condition) + _rclpy.rclpy_destroy_entity(self._guard_condition.guard_handle) + _rclpy.rclpy_destroy_entity(self._sigint_gc) def add_node(self, node): """ @@ -130,7 +93,7 @@ def add_node(self, node): with self._nodes_lock: self._nodes.add(node) # Rebuild the wait set so it includes this new node - _rclpy.rclpy_trigger_guard_condition(self._guard_condition) + self._guard_condition.trigger() def get_nodes(self): """ @@ -139,7 +102,7 @@ def get_nodes(self): :rtype: list """ with self._nodes_lock: - return [node for node in self._nodes] + return list(self._nodes) def spin(self): """Execute callbacks until shutdown.""" @@ -152,246 +115,172 @@ def spin_once(self, timeout_sec=None): A custom executor should use :func:`Executor.wait_for_ready_callbacks` to get work. - :param timeout_sec: Seconds to wait. Block forever if None. Don't wait if <= 0 + :param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0 :type timeout_sec: float or None :rtype: None """ - raise NotImplementedError - - def _take_timer(self, tmr): - _rclpy.rclpy_call_timer(tmr.timer_handle) - - def _execute_timer(self, tmr, _): - tmr.callback() - - def _take_subscription(self, sub): - msg = _rclpy.rclpy_take(sub.subscription_handle, sub.msg_type) - return msg - - def _execute_subscription(self, sub, msg): - if msg: - sub.callback(msg) - - def _take_client(self, client): - response = _rclpy.rclpy_take_response( - client.client_handle, client.srv_type.Response, client.sequence_number) - return response - - def _execute_client(self, client, response): - if response: - # clients spawn their own thread to wait for a response in the - # wait_for_future function. Users can either use this mechanism or monitor - # the content of client.response to check if a response has been received - client.response = response - - def _take_service(self, srv): - request_and_header = _rclpy.rclpy_take_request( - srv.service_handle, srv.srv_type.Request) - return request_and_header - - def _execute_service(self, srv, request_and_header): - if request_and_header is None: - return - (request, header) = request_and_header - if request: - response = srv.callback(request, srv.srv_type.Response()) - srv.send_response(response, header) - - def _take_guard_condition(self, gc): - gc._executor_triggered = False - - def _execute_guard_condition(self, gc, _): - gc.callback() - - def _make_handler(self, entity, take_from_wait_list, call_callback): + raise NotImplementedError() + + def _make_task(self, entity): """ - Make a handler that performs work on an entity. + Make a task that performs work on an entity. :param entity: An entity to wait on - :param take_from_wait_list: Makes the entity to stop appearing in the wait list - :type take_from_wait_list: callable - :param call_callback: Does the work the entity is ready for - :type call_callback: callable - :rtype: callable + :rtype: rclpy.future.Task """ gc = self._guard_condition - work_tracker = self._work_tracker is_shutdown = self._is_shutdown # Mark this so it doesn't get added back to the wait list - entity._executor_event = True + entity._executor_handle.notify_ready() - def handler(): + def execute(): nonlocal entity nonlocal gc nonlocal is_shutdown - nonlocal work_tracker if is_shutdown or not entity.callback_group.beginning_execution(entity): # Didn't get the callback, or the executor has been ordered to stop - entity._executor_event = False - _rclpy.rclpy_trigger_guard_condition(gc) + entity._executor_handle.cancel_ready() + gc.trigger() return - with work_tracker: - arg = take_from_wait_list(entity) - - # Signal that this has been 'taken' and can be added back to the wait list - entity._executor_event = False - _rclpy.rclpy_trigger_guard_condition(gc) - - try: - call_callback(entity, arg) - finally: - entity.callback_group.ending_execution(entity) - # Signal that work has been done so the next callback in a mutually exclusive - # callback group can get executed - _rclpy.rclpy_trigger_guard_condition(gc) - return handler - - def _filter_eligible_entities(self, entities): + + # Signal that this has been 'taken' and can be added back to the wait list + arg = entity._executor_handle.take_from_wait_list() + gc.trigger() + # Return result of callback (Task will do the right thing if this is a coroutine) + return entity._executor_handle.execute_callback(arg) + + def postExecute(): + nonlocal entity + nonlocal gc + # Notify callback group this entity is no longer executing + entity.callback_group.ending_execution(entity) + # Notify the wait set can be rebuilt + gc.trigger() + + task = Task(execute) + task.add_done_callback(postExecute) + return task + + def _process_new_callbacks(self, nodes, wait_set): """ - Filter entities that should not be put onto the wait list. + Create Tasks for brand new work. - :param entity_list: Entities to be checked for eligibility - :type entity_list: list - :rtype: list + :param nodes: nodes to yield work for + :type nodes: list + :param wait_set: wait set that has already been waited on + :type wait_set: _rclpy_wait_set.WaitSet + """ + # Process ready entities one node at a time + for node in nodes: + for tmr in node.timers: + if wait_set.is_ready(tmr): + # TODO(Sloretz) Which rcl cancelled timer bug does this workaround? + if not _rclpy.rclpy_is_timer_ready(tmr.timer_handle): + continue + task = self._make_task(tmr) + node._tasks.append((task, tmr)) + print('Appending task for', tmr) + + for sub in node.subscriptions: + if wait_set.is_ready(sub): + task = self._make_task(sub) + node._tasks.append((task, sub)) + print('Appending task for', sub) + + for gc in node.guards: + if wait_set.is_ready(gc): + task = self._make_task(gc) + node._tasks.append((task, gc)) + print('Appending task for', gc) + + for cli in node.clients: + if wait_set.is_ready(cli): + task = self._make_task(cli) + node._tasks.append((task, cli)) + print('Appending task for', cli) + + for srv in node.services: + if wait_set.is_ready(srv): + task = self._make_task(srv) + node._tasks.append((task, srv)) + print('Appending task for', srv) + + def can_execute(self, entity): """ - return [e for e in entities if e.callback_group.can_execute(e) and not e._executor_event] + Determine if a callback for an entity can be executed. + + :param entity: Subscription, Timer, Guard condition, etc + :returns: True if the entity callback can be executed + :rtype: bool + """ + return not entity._executor_handle.ready() and entity.callback_group.can_execute(entity) def wait_for_ready_callbacks(self, timeout_sec=None, nodes=None): """ Yield callbacks that are ready to be performed. - :param timeout_sec: Seconds to wait. Block forever if None. Don't wait if <= 0 + Executors must reuse the returned generator until StopIteration is raised. Failure to do + so could result in coroutines never being resumed. + + :param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0 :type timeout_sec: float or None :param nodes: A list of nodes to wait on. Wait on all nodes if None. :type nodes: list or None :rtype: Generator[(callable, entity, :class:`rclpy.node.Node`)] """ + print('Wait for ready callbacks') timeout_timer = None - # Get timeout in nanoseconds. 0 = don't wait. < 0 means block forever - timeout_nsec = None - if timeout_sec is None: - timeout_nsec = -1 - elif timeout_sec <= 0: - timeout_nsec = 0 - else: - timeout_nsec = int(float(timeout_sec) * S_TO_NS) - timeout_timer = _WallTimer(None, None, timeout_nsec) + timeout_nsec = timeout_sec_to_nsec(timeout_sec) + if timeout_nsec > 0: + timeout_timer = WallTimer(None, None, timeout_nsec) if nodes is None: nodes = self.get_nodes() - yielded_work = False - while not yielded_work and not self._is_shutdown: - # Gather entities that can be waited on - subscriptions = [] - guards = [] - timers = [] - clients = [] - services = [] - for node in nodes: - subscriptions.extend(self._filter_eligible_entities(node.subscriptions)) - timers.extend(self._filter_eligible_entities(node.timers)) - clients.extend(self._filter_eligible_entities(node.clients)) - services.extend(self._filter_eligible_entities(node.services)) - node_guards = self._filter_eligible_entities(node.guards) - # retrigger a guard condition that was triggered but not handled - for gc in node_guards: - if gc._executor_triggered: - gc.trigger() - guards.extend(node_guards) - (sigint_gc, sigint_gc_handle) = _rclpy.rclpy_get_sigint_guard_condition() - if timeout_timer is not None: - timers.append(timeout_timer) - - # Construct a wait set - with _WaitSet() as wait_set: - _rclpy.rclpy_wait_set_init( - wait_set, - len(subscriptions), - len(guards) + 2, - len(timers), - len(clients), - len(services)) - - entities = { - 'subscription': (subscriptions, 'subscription_handle'), - 'guard_condition': (guards, 'guard_handle'), - 'client': (clients, 'client_handle'), - 'service': (services, 'service_handle'), - 'timer': (timers, 'timer_handle'), - } - for entity, (handles, handle_name) in entities.items(): - _rclpy.rclpy_wait_set_clear_entities(entity, wait_set) - for h in handles: - _rclpy.rclpy_wait_set_add_entity( - entity, wait_set, h.__getattribute__(handle_name) - ) - _rclpy.rclpy_wait_set_add_entity('guard_condition', wait_set, sigint_gc) - _rclpy.rclpy_wait_set_add_entity( - 'guard_condition', wait_set, self._guard_condition) - - # Wait for something to become ready - _rclpy.rclpy_wait(wait_set, timeout_nsec) - - # get ready entities - subs_ready = _rclpy.rclpy_get_ready_entities('subscription', wait_set) - guards_ready = _rclpy.rclpy_get_ready_entities('guard_condition', wait_set) - timers_ready = _rclpy.rclpy_get_ready_entities('timer', wait_set) - clients_ready = _rclpy.rclpy_get_ready_entities('client', wait_set) - services_ready = _rclpy.rclpy_get_ready_entities('service', wait_set) - - # Check sigint guard condition - if sigint_gc_handle in guards_ready: - raise KeyboardInterrupt - _rclpy.rclpy_destroy_entity(sigint_gc) - - # Mark all guards as triggered before yielding any handlers since they're auto-taken - for gc in [g for g in guards if g.guard_pointer in guards_ready]: - gc._executor_triggered = True - - # Process ready entities one node at a time - for node in nodes: - for tmr in [t for t in node.timers if t.timer_pointer in timers_ready]: - # Check that a timer is ready to workaround rcl issue with cancelled timers - if _rclpy.rclpy_is_timer_ready(tmr.timer_handle): - if tmr.callback_group.can_execute(tmr): - handler = self._make_handler( - tmr, self._take_timer, self._execute_timer) - yielded_work = True - yield handler, tmr, node - - for sub in [s for s in node.subscriptions if s.subscription_pointer in subs_ready]: - if sub.callback_group.can_execute(sub): - handler = self._make_handler( - sub, self._take_subscription, self._execute_subscription) - yielded_work = True - yield handler, sub, node - - for gc in [g for g in node.guards if g._executor_triggered]: - if gc.callback_group.can_execute(gc): - handler = self._make_handler( - gc, self._take_guard_condition, self._execute_guard_condition) - yielded_work = True - yield handler, gc, node - - for client in [c for c in node.clients if c.client_pointer in clients_ready]: - if client.callback_group.can_execute(client): - handler = self._make_handler( - client, self._take_client, self._execute_client) - yielded_work = True - yield handler, client, node - - for srv in [s for s in node.services if s.service_pointer in services_ready]: - if srv.callback_group.can_execute(srv): - handler = self._make_handler( - srv, self._take_service, self._execute_service) - yielded_work = True - yield handler, srv, node - - # Check timeout timer - if (timeout_nsec == 0 or - (timeout_timer is not None and timeout_timer.timer_pointer in timers_ready)): - break + # Gather entities that can be waited on + self._wait_set.clear() + for node in nodes: + print('Gathering ready stuff') + self._wait_set.add_subscriptions(filter(self.can_execute, node.subscriptions)) + self._wait_set.add_timers(filter(self.can_execute, node.timers)) + self._wait_set.add_clients(filter(self.can_execute, node.clients)) + self._wait_set.add_services(filter(self.can_execute, node.services)) + self._wait_set.add_guard_conditions(filter(self.can_execute, node.guards)) + # Clean up old tasks + node._tasks = [(task, e) for task, e in node._tasks if not task.done()] + + self._wait_set.add_guard_conditions((self._sigint_gc, self._guard_condition)) + + if timeout_timer is not None: + self._wait_set.add_timers((timeout_timer,)) + + # Wait for something to become ready + self._wait_set.wait(timeout_nsec) + + # Check sigint guard condition + if self._wait_set.is_ready(self._sigint_gc): + raise KeyboardInterrupt() + + # Turn new work into tasks to be executed + self._process_new_callbacks(nodes, self._wait_set) + + for node in nodes: + print('tasks', node._tasks) + # Yield the tasks on the node + for task, entity in node._tasks: + # TODO Can execute should return true if a task is currently executing! + if entity.callback_group.can_execute(entity): + print('Yielding work!', entity) + yield task, entity, node + else: + print('Not allowed to yield this work!') + + # Check timeout timer + if ( + timeout_nsec == 0 or + (timeout_timer is not None and self._wait_set.is_ready(timeout_timer)) + ): + raise TimeoutException() class SingleThreadedExecutor(Executor): @@ -399,13 +288,21 @@ class SingleThreadedExecutor(Executor): def __init__(self): super().__init__() + self._callback_iter = None def spin_once(self, timeout_sec=None): - try: - handler, entity, node = next(self.wait_for_ready_callbacks(timeout_sec=timeout_sec)) - handler() - except StopIteration: - pass + yielded_work = False + while not yielded_work: + if self._callback_iter is None: + # Reuse the same callback iterator to avoid unecessary calls to rcl_wait + self._callback_iter = self.wait_for_ready_callbacks(timeout_sec=timeout_sec) + try: + handler, entity, node = next(self._callback_iter) + except StopIteration: + self._callback_iter = None + else: + yielded_work = True + handler() class MultiThreadedExecutor(Executor): @@ -421,16 +318,24 @@ def __init__(self, num_threads=None): :type num_threads: int """ super().__init__() + self._callback_iter = None if num_threads is None: try: num_threads = multiprocessing.cpu_count() except NotImplementedError: num_threads = 1 - self._executor = _ThreadPoolExecutor(num_threads) + self._executor = ThreadPoolExecutor(num_threads) def spin_once(self, timeout_sec=None): - try: - handler, entity, node = next(self.wait_for_ready_callbacks(timeout_sec=timeout_sec)) - self._executor.submit(handler) - except StopIteration: - pass + yielded_work = False + while not yielded_work: + if self._callback_iter is None: + # Reuse the same callback iterator to avoid unecessary calls to rcl_wait + self._callback_iter = self.wait_for_ready_callbacks(timeout_sec=timeout_sec) + try: + handler, entity, node = next(self._callback_iter) + except StopIteration: + self._callback_iter = None + else: + yielded_work = True + self._executor.submit(handler) diff --git a/rclpy/rclpy/future.py b/rclpy/rclpy/future.py new file mode 100644 index 000000000..f408110dd --- /dev/null +++ b/rclpy/rclpy/future.py @@ -0,0 +1,143 @@ +# Copyright 2017 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import queue +import threading + + +class Future: + """Access the result of a :class:`.Task`.""" + + def __init__(self, task): + self._task = task + + def __await__(self): + # Yield if the task is not finished + while not self._task.done(): + yield + return self._task.result() + + def result(self): + """ + Get the result of the task. + + :returns: the result of the task this future tracks, or None if the task is not done. + """ + if self._task.done(): + return self._task.result() + + def add_done_callback(self, func): + """Add a callback to be executed when a task is done.""" + self._task.add_done_callback(func) + + def cancel(self): + """Cancel the running task.""" + self._task.cancel() + + def is_cancelled(self): + """Return true if the task is cancelled.""" + return self._task.is_cancelled() + + def done(self): + """Return True if the task is done or cancelled.""" + return self._task.done() + + +class Present(Future): + """Be a Future whose result is known now.""" + + def __init__(self, result): + task = Task(lambda: result) + task() + super().__init__(task) + + +class Task: + """A task that can be executed.""" + + def __init__(self, handler): + # handler is the next thing to run. It may or may not be a coroutine + self._handler = handler + # set to a coroutine returned by a handler + self._coroutine = None + self._done = False + self._cancelled = False + self._lock = threading.Lock() + self._queue = queue.Queue() + self._has_result = False + self._result = None + + def __call__(self): + """ + Run or resume a task. + + This attempts to execute a handler. If the handler is a coroutine it will attempt to + await it. If there are done callbacks it will repeat that process until all callbacks have + been executed. + + The return value of the handler passed to the constructor is stored as the task result. + """ + with self._lock: + # Execute as much as it can (including done callbacks) until a coroutine yields + while not self._done: + result = None + if self._handler is not None: + # A non-coroutine callback executes here + result = self._handler() + if asyncio.iscoroutine(result): + self._coroutine = result + self._handler = None + + if self._coroutine is not None: + try: + # A coroutine gets executed here + result = self._coroutine.send(None) + except StopIteration: + self._coroutine.close() + self._coroutine = None + else: + # The coroutine yielded + break + + if self._coroutine is None and not self._has_result: + print('Has result', result) + self._has_result = True + self._result = result + + try: + # Get the next callback, and make executing it part of this task + self._handler = self._queue.get_nowait() + except queue.Empty: + self._done = True + + def cancel(self): + self._cancelled = True + self._done = True + + def is_cancelled(self): + return self._cancelled + + def done(self): + return self._done + + def result(self): + return self._result + + def add_done_callback(self, fn): + with self._lock: + if self._done: + fn() + else: + self._queue.put(fn) diff --git a/rclpy/rclpy/guard_condition.py b/rclpy/rclpy/guard_condition.py index fe60b3caf..bbebf2edc 100644 --- a/rclpy/rclpy/guard_condition.py +++ b/rclpy/rclpy/guard_condition.py @@ -12,19 +12,59 @@ # See the License for the specific language governing permissions and # limitations under the License. +from rclpy.callback_groups import MutuallyExclusiveCallbackGroup +from rclpy.executor_handle import ExecutorHandle from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy class GuardCondition: - def __init__(self, callback, callback_group): - self.guard_handle, self.guard_pointer = _rclpy.rclpy_create_guard_condition() + def __init__(self, callback, callback_group, guard_handle=None, guard_pointer=None): + if guard_handle is None: + guard_handle, guard_pointer = _rclpy.rclpy_create_guard_condition() + self.guard_handle = guard_handle + self.guard_pointer = guard_pointer self.callback = callback self.callback_group = callback_group - # True when the callback is ready to fire but has not been "taken" by an executor - self._executor_event = False - # True when the executor sees this has been triggered but has not yet been handled - self._executor_triggered = False + # Holds info the executor uses to do work for this entity + self._executor_handle = ExecutorHandle( + self._take, self._execute, cancel_ready_callback=self._cancel_ready) + + def _take(self): + pass + + def _execute(self, _): + return self.callback() + + def _cancel_ready(self): + # guard conditions are auto-taken when they come up in the wait-list, so retrigger + self.trigger() def trigger(self): _rclpy.rclpy_trigger_guard_condition(self.guard_handle) + + +class NodeGraphGuardCondition(GuardCondition): + + def __init__(self, node_handle): + gc_handle, gc_pointer = _rclpy.rclpy_get_graph_guard_condition() + super().__init__( + self._multiplex_callbacks, MutuallyExclusiveCallbackGroup(), guard_handle=gc_handle, + guard_pointer=gc_pointer) + self._callbacks = [] + + def _multiplex_callbacks(self): + for callback in self._callbacks: + callback() + + def add_callback(self, callback): + """Add a callback to be executed when a node graph event occurrs.""" + self._callbacks.append(callback) + + def remove_callback(self, callback): + """Stop executing a given callback when a node graph event occurrs.""" + self._callbacks.remove(callback) + + def has_callbacks(self): + """Return True if their are callbacks to be executed on graph events.""" + return bool(self._callbacks) diff --git a/rclpy/rclpy/impl/implementation_singleton.py b/rclpy/rclpy/impl/implementation_singleton.py index 1f984bd66..4f195b217 100644 --- a/rclpy/rclpy/impl/implementation_singleton.py +++ b/rclpy/rclpy/impl/implementation_singleton.py @@ -30,3 +30,4 @@ rclpy_implementation = _import('._rclpy') rclpy_logging_implementation = _import('._rclpy_logging') +rclpy_wait_set_implementation = _import('._rclpy_wait_set') diff --git a/rclpy/rclpy/node.py b/rclpy/rclpy/node.py index 095fa2100..1d0354637 100644 --- a/rclpy/rclpy/node.py +++ b/rclpy/rclpy/node.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import threading + from rclpy.callback_groups import MutuallyExclusiveCallbackGroup from rclpy.client import Client from rclpy.constants import S_TO_NS @@ -19,6 +21,7 @@ from rclpy.exceptions import NoTypeSupportImportedException from rclpy.expand_topic_name import expand_topic_name from rclpy.guard_condition import GuardCondition +from rclpy.guard_condition import NodeGraphGuardCondition from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy from rclpy.publisher import Publisher from rclpy.qos import qos_profile_default, qos_profile_services_default @@ -59,6 +62,10 @@ def __init__(self, node_name, *, namespace=None): self.timers = [] self.guards = [] self._default_callback_group = MutuallyExclusiveCallbackGroup() + self._graph_condition = None + self._graph_condition_lock = threading.Lock() + # 2-tuple (task, entity) used to store work to be executed + self._tasks = [] namespace = namespace or '' if not ok(): @@ -205,6 +212,23 @@ def create_guard_condition(self, callback, callback_group=None): callback_group.add_entity(guard) return guard + def add_graph_event_callback(self, callback): + """Add a callback to be executed every time a node graph event occurrs.""" + with self._graph_condition_lock: + if self._graph_condition is None: + self._graph_condition = NodeGraphGuardCondition(self.handle) + self.guards.append(self._graph_condition) + self._graph_condition.add_callback(callback) + + def remove_graph_event_callback(self, callback): + """Remove a callback from the list that is executed every time a graph event occurrs.""" + with self._graph_condition_lock: + self._graph_condition.remove_callback(callback) + if not self._graph_condition.has_callbacks(): + # no one is listening to graph events, remove it from the executor + self.guards.remove(self._graph_condition) + self._graph_condition = None + def destroy_publisher(self, publisher): for pub in self.publishers: if pub.publisher_handle == publisher.publisher_handle: @@ -277,6 +301,7 @@ def destroy_node(self): self.services = [] self.timers = [] self.guards = [] + self._tasks = [] _rclpy.rclpy_destroy_entity(self.handle) self._handle = None diff --git a/rclpy/rclpy/service.py b/rclpy/rclpy/service.py index 154ed09c0..0304f1d5c 100644 --- a/rclpy/rclpy/service.py +++ b/rclpy/rclpy/service.py @@ -12,6 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import inspect + +from rclpy.executor_handle import ExecutorHandle from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy @@ -26,9 +29,31 @@ def __init__( self.srv_name = srv_name self.callback = callback self.callback_group = callback_group - # True when the callback is ready to fire but has not been "taken" by an executor - self._executor_event = False + # Holds info the executor uses to do work for this entity + self._executor_handle = ExecutorHandle(self._take, self._execute) self.qos_profile = qos_profile + def _take(self): + request_and_header = _rclpy.rclpy_take_request(self.service_handle, self.srv_type.Request) + return request_and_header + + def _execute(self, request_and_header): + if request_and_header is None: + return + (request, header) = request_and_header + if request: + + async def exec_service(request, header): + """Enable service callbacks to be coroutines.""" + nonlocal self + response = self.callback(request, self.srv_type.Response()) + if inspect.isawaitable(response): + response = await response + # This special method is needed to make sure send_response gets called after the + # user's callback even if it yields + self.send_response(response, header) + + return exec_service(request, header) + def send_response(self, response, header): _rclpy.rclpy_send_response(self.service_handle, response, header) diff --git a/rclpy/rclpy/subscription.py b/rclpy/rclpy/subscription.py index 81d8562a8..ec11ec3f0 100644 --- a/rclpy/rclpy/subscription.py +++ b/rclpy/rclpy/subscription.py @@ -12,6 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +from rclpy.executor_handle import ExecutorHandle +from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy + class Subscription: @@ -25,6 +28,14 @@ def __init__( self.topic = topic self.callback = callback self.callback_group = callback_group - # True when the callback is ready to fire but has not been "taken" by an executor - self._executor_event = False + # Holds info the executor uses to do work for this entity + self._executor_handle = ExecutorHandle(self._take, self._execute) self.qos_profile = qos_profile + + def _take(self): + msg = _rclpy.rclpy_take(self.subscription_handle, self.msg_type) + return msg + + def _execute(self, msg): + if msg: + return self.callback(msg) diff --git a/rclpy/rclpy/timer.py b/rclpy/rclpy/timer.py index 7bcd264f0..800e31269 100644 --- a/rclpy/rclpy/timer.py +++ b/rclpy/rclpy/timer.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from rclpy.executor_handle import ExecutorHandle from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy @@ -23,8 +24,14 @@ def __init__(self, callback, callback_group, timer_period_ns): self.timer_period_ns = timer_period_ns self.callback = callback self.callback_group = callback_group - # True when the callback is ready to fire but has not been "taken" by an executor - self._executor_event = False + # Holds info the executor uses to do work for this entity + self._executor_handle = ExecutorHandle(self._take, self._execute) + + def _take(self): + _rclpy.rclpy_call_timer(self.timer_handle) + + def _execute(self, _): + return self.callback() @property def timer_period_ns(self): diff --git a/rclpy/rclpy/utilities.py b/rclpy/rclpy/utilities.py index 58141481e..9a8dcfb37 100644 --- a/rclpy/rclpy/utilities.py +++ b/rclpy/rclpy/utilities.py @@ -14,6 +14,9 @@ import threading +from rclpy.constants import S_TO_NS + + g_shutdown_lock = threading.Lock() @@ -44,3 +47,23 @@ def get_rmw_implementation_identifier(): # imported locally to avoid loading extensions on module import from rclpy.impl.implementation_singleton import rclpy_implementation return rclpy_implementation.rclpy_get_rmw_implementation_identifier() + + +def timeout_sec_to_nsec(timeout_sec): + """ + Convert timeout in seconds to rcl compatible timeout in nanoseconds. + + Python tends to use floating point numbers in seconds for timeouts. This utility converts a + python-style timeout to an integer in nanoseconds that can be used by rcl_wait. + + :param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if < 1ns + :type timeout_sec: float or None + :rtype: int + :returns: rcl_wait compatible timeout in nanoseconds + """ + if timeout_sec is None or timeout_sec < 0: + # Block forever + return -1 + else: + # wait for given time + return int(float(timeout_sec) * S_TO_NS) diff --git a/rclpy/src/rclpy/_rclpy.c b/rclpy/src/rclpy/_rclpy.c index 39cb03389..250a8aafe 100644 --- a/rclpy/src/rclpy/_rclpy.c +++ b/rclpy/src/rclpy/_rclpy.c @@ -28,20 +28,61 @@ #include #include -#include +#include "src/rclpy/sigint_gc.h" -static rcl_guard_condition_t * g_sigint_gc_handle; -/// Catch signals -static void catch_function(int signo) +/// Get a guard condition for node graph events +/** + * Raises ValueError if the provided argument is not a PyCapsule. + * + * A successful call will return a list with two elements: + * + * - a Capsule with the pointer of the retrieved rcl_guard_condition_t * structure + * - an integer representing the memory address of the rcl_guard_condition_t + * + * \param[in] A capsule containing rcl_node_t * + * \return 2-tuple with the capsule and memory location, or + * \return NULL on failure + */ +static PyObject * +rclpy_get_graph_guard_condition(PyObject * Py_UNUSED(self), PyObject * args) { - (void) signo; - rcl_ret_t ret = rcl_trigger_guard_condition(g_sigint_gc_handle); - if (ret != RCL_RET_OK) { - PyErr_Format(PyExc_RuntimeError, - "Failed to trigger guard_condition: %s", rcl_get_error_string_safe()); - rcl_reset_error(); + PyObject * pynode; + + if (!PyArg_ParseTuple(args, "O", &pynode)) { + return NULL; + } + + rcl_node_t * node = (rcl_node_t *)PyCapsule_GetPointer(pynode, "rcl_node_t"); + if (!node) { + return NULL; + } + + rcl_guard_condition_t * guard_condition = + (rcl_guard_condition_t *)rcl_node_get_graph_guard_condition(node); + + PyObject * pytuple = PyTuple_New(2); + if (!pytuple) { + // Exception set + return NULL; + } + PyObject * pygc = PyCapsule_New(guard_condition, "rcl_guard_condition_t", NULL); + if (!pygc) { + // Exception set + Py_DECREF(pytuple); + return NULL; + } + PyObject * pygc_int = PyLong_FromUnsignedLongLong((uint64_t)&guard_condition->impl); + if (!pygc_int) { + // Exception set + Py_DECREF(pytuple); + Py_DECREF(pygc); + return NULL; } + PyTuple_SET_ITEM(pytuple, 0, pygc); + PyTuple_SET_ITEM(pytuple, 1, pygc_int); + + return pytuple; } /// Create a sigint guard condition @@ -72,7 +113,7 @@ rclpy_get_sigint_guard_condition(PyObject * Py_UNUSED(self), PyObject * Py_UNUSE PyMem_Free(sigint_gc); return NULL; } - g_sigint_gc_handle = sigint_gc; + g_rclpy_sigint_gc_handle = sigint_gc; PyObject * pylist = PyList_New(2); PyList_SET_ITEM(pylist, 0, PyCapsule_New(sigint_gc, "rcl_guard_condition_t", NULL)); PyList_SET_ITEM(pylist, 1, PyLong_FromUnsignedLongLong((uint64_t)&sigint_gc->impl)); @@ -1519,6 +1560,50 @@ rclpy_send_response(PyObject * Py_UNUSED(self), PyObject * args) Py_RETURN_NONE; } + +/// Check if a service server is available +/** + * Raises ValueError if the arguments are not capsules + * + * \param[in] pynode Capsule pointing to the node the entity belongs to + * \param[in] pyclient Capsule pointing to the client + * \return True if the service server is available + */ +static PyObject * +rclpy_service_server_is_available(PyObject * Py_UNUSED(self), PyObject * args) +{ + PyObject * pynode; + PyObject * pyclient; + + if (!PyArg_ParseTuple(args, "OO", &pynode, &pyclient)) { + return NULL; + } + + rcl_node_t * node = (rcl_node_t *)PyCapsule_GetPointer(pynode, "rcl_node_t"); + if (!node) { + return NULL; + } + rcl_client_t * client = (rcl_client_t *)PyCapsule_GetPointer(pyclient, "rcl_client_t"); + if (!client) { + return NULL; + } + + bool is_ready; + rcl_ret_t ret = rcl_service_server_is_available(node, client, &is_ready); + + if (ret != RCL_RET_OK) { + PyErr_Format(PyExc_RuntimeError, + "Failed to check service availability: %s", rcl_get_error_string_safe()); + rcl_reset_error(); + return NULL; + } + + if (is_ready) { + Py_RETURN_TRUE; + } + Py_RETURN_FALSE; +} + /// Destroy an entity attached to a node /** * Entity type must be one of ["subscription", "publisher", "client", "service"]. @@ -1657,312 +1742,6 @@ rclpy_get_rmw_implementation_identifier(PyObject * Py_UNUSED(self), PyObject * P return pyrmw_implementation_identifier; } -/// Return a Capsule pointing to a zero initialized rcl_wait_set_t structure -static PyObject * -rclpy_get_zero_initialized_wait_set(PyObject * Py_UNUSED(self), PyObject * Py_UNUSED(args)) -{ - rcl_wait_set_t * wait_set = (rcl_wait_set_t *)PyMem_Malloc(sizeof(rcl_wait_set_t)); - *wait_set = rcl_get_zero_initialized_wait_set(); - PyObject * pywait_set = PyCapsule_New(wait_set, "rcl_wait_set_t", NULL); - - return pywait_set; -} - -/// Initialize a waitset -/** - * Raises RuntimeError if the wait set could not be initialized - * - * \param[in] pywait_set Capsule pointing to the waitset structure - * \param[in] node_name string name of the node to be created - * \param[in] number_of_subscriptions int - * \param[in] number_of_guard_conditions int - * \param[in] number_of_timers int - * \param[in] number_of_clients int - * \param[in] number_of_services int - * \return None - */ -static PyObject * -rclpy_wait_set_init(PyObject * Py_UNUSED(self), PyObject * args) -{ - PyObject * pywait_set; - unsigned PY_LONG_LONG number_of_subscriptions; - unsigned PY_LONG_LONG number_of_guard_conditions; - unsigned PY_LONG_LONG number_of_timers; - unsigned PY_LONG_LONG number_of_clients; - unsigned PY_LONG_LONG number_of_services; - - if (!PyArg_ParseTuple( - args, "OKKKKK", &pywait_set, &number_of_subscriptions, - &number_of_guard_conditions, &number_of_timers, - &number_of_clients, &number_of_services)) - { - return NULL; - } - - rcl_wait_set_t * wait_set = (rcl_wait_set_t *)PyCapsule_GetPointer(pywait_set, "rcl_wait_set_t"); - if (!wait_set) { - return NULL; - } - rcl_ret_t ret = rcl_wait_set_init( - wait_set, number_of_subscriptions, number_of_guard_conditions, number_of_timers, - number_of_clients, number_of_services, rcl_get_default_allocator()); - if (ret != RCL_RET_OK) { - PyErr_Format(PyExc_RuntimeError, - "Failed to initialize wait set: %s", rcl_get_error_string_safe()); - rcl_reset_error(); - return NULL; - } - Py_RETURN_NONE; -} - -/// Clear all the pointers of a given wait_set field -/** - * Raises RuntimeError if the entity type is unknown or any rcl error occurs - * - * \param[in] entity_type string defining the entity ["subscription, client, service"] - * \param[in] pywait_set Capsule pointing to the waitset structure - * \return NULL - */ -static PyObject * -rclpy_wait_set_clear_entities(PyObject * Py_UNUSED(self), PyObject * args) -{ - const char * entity_type; - PyObject * pywait_set; - - if (!PyArg_ParseTuple(args, "zO", &entity_type, &pywait_set)) { - return NULL; - } - - rcl_wait_set_t * wait_set = (rcl_wait_set_t *)PyCapsule_GetPointer(pywait_set, "rcl_wait_set_t"); - if (!wait_set) { - return NULL; - } - rcl_ret_t ret; - if (0 == strcmp(entity_type, "subscription")) { - ret = rcl_wait_set_clear_subscriptions(wait_set); - } else if (0 == strcmp(entity_type, "client")) { - ret = rcl_wait_set_clear_clients(wait_set); - } else if (0 == strcmp(entity_type, "service")) { - ret = rcl_wait_set_clear_services(wait_set); - } else if (0 == strcmp(entity_type, "timer")) { - ret = rcl_wait_set_clear_timers(wait_set); - } else if (0 == strcmp(entity_type, "guard_condition")) { - ret = rcl_wait_set_clear_guard_conditions(wait_set); - } else { - ret = RCL_RET_ERROR; // to avoid a linter warning - PyErr_Format(PyExc_RuntimeError, - "'%s' is not a known entity", entity_type); - return NULL; - } - if (ret != RCL_RET_OK) { - PyErr_Format(PyExc_RuntimeError, - "Failed to clear '%s' from wait set: %s", entity_type, rcl_get_error_string_safe()); - rcl_reset_error(); - return NULL; - } - Py_RETURN_TRUE; -} - -/// Add an entity to the waitset structure -/** - * Raises RuntimeError if the entity type is unknown or any rcl error occurrs - * - * \param[in] entity_type string defining the entity ["subscription, client, service"] - * \param[in] pywait_set Capsule pointing to the waitset structure - * \param[in] pyentity Capsule pointing to the entity to add - * \return None - */ -static PyObject * -rclpy_wait_set_add_entity(PyObject * Py_UNUSED(self), PyObject * args) -{ - const char * entity_type; - PyObject * pywait_set; - PyObject * pyentity; - - if (!PyArg_ParseTuple(args, "zOO", &entity_type, &pywait_set, &pyentity)) { - return NULL; - } - rcl_ret_t ret; - rcl_wait_set_t * wait_set = (rcl_wait_set_t *)PyCapsule_GetPointer(pywait_set, "rcl_wait_set_t"); - if (!wait_set) { - return NULL; - } - if (0 == strcmp(entity_type, "subscription")) { - rcl_subscription_t * subscription = - (rcl_subscription_t *)PyCapsule_GetPointer(pyentity, "rcl_subscription_t"); - ret = rcl_wait_set_add_subscription(wait_set, subscription); - } else if (0 == strcmp(entity_type, "client")) { - rcl_client_t * client = - (rcl_client_t *)PyCapsule_GetPointer(pyentity, "rcl_client_t"); - ret = rcl_wait_set_add_client(wait_set, client); - } else if (0 == strcmp(entity_type, "service")) { - rcl_service_t * service = - (rcl_service_t *)PyCapsule_GetPointer(pyentity, "rcl_service_t"); - ret = rcl_wait_set_add_service(wait_set, service); - } else if (0 == strcmp(entity_type, "timer")) { - rcl_timer_t * timer = - (rcl_timer_t *)PyCapsule_GetPointer(pyentity, "rcl_timer_t"); - ret = rcl_wait_set_add_timer(wait_set, timer); - } else if (0 == strcmp(entity_type, "guard_condition")) { - rcl_guard_condition_t * guard_condition = - (rcl_guard_condition_t *)PyCapsule_GetPointer(pyentity, "rcl_guard_condition_t"); - ret = rcl_wait_set_add_guard_condition(wait_set, guard_condition); - } else { - ret = RCL_RET_ERROR; // to avoid a linter warning - PyErr_Format(PyExc_RuntimeError, - "'%s' is not a known entity", entity_type); - return NULL; - } - if (ret != RCL_RET_OK) { - PyErr_Format(PyExc_RuntimeError, - "Failed to add '%s' to wait set: %s", entity_type, rcl_get_error_string_safe()); - rcl_reset_error(); - return NULL; - } - Py_RETURN_NONE; -} - -/// Destroy the waitset structure -/** - * Raises RuntimeError if the wait set could not be destroyed - * - * \param[in] pywait_set Capsule pointing to the waitset structure - * \return None - */ -static PyObject * -rclpy_destroy_wait_set(PyObject * Py_UNUSED(self), PyObject * args) -{ - PyObject * pywait_set; - - if (!PyArg_ParseTuple(args, "O", &pywait_set)) { - return NULL; - } - rcl_wait_set_t * wait_set = (rcl_wait_set_t *)PyCapsule_GetPointer(pywait_set, "rcl_wait_set_t"); - if (!wait_set) { - return NULL; - } - - rcl_ret_t ret = rcl_wait_set_fini(wait_set); - if (ret != RCL_RET_OK) { - PyErr_Format(PyExc_RuntimeError, - "Failed to fini wait set: %s", rcl_get_error_string_safe()); - rcl_reset_error(); - return NULL; - } - - PyMem_Free(wait_set); - - if (PyCapsule_SetPointer(pywait_set, Py_None)) { - // exception set by PyCapsule_SetPointer - return NULL; - } - - Py_RETURN_NONE; -} - -#define GET_LIST_READY_ENTITIES(ENTITY_TYPE) \ - size_t idx; \ - size_t idx_max; \ - idx_max = wait_set->size_of_ ## ENTITY_TYPE ## s; \ - const rcl_ ## ENTITY_TYPE ## _t ** struct_ptr = wait_set->ENTITY_TYPE ## s; \ - for (idx = 0; idx < idx_max; idx ++) { \ - if (struct_ptr[idx]) { \ - PyList_Append( \ - entity_ready_list, \ - PyLong_FromUnsignedLongLong((uint64_t) & struct_ptr[idx]->impl)); \ - } \ - } \ - return entity_ready_list; -/// Get list of non-null entities in waitset -/** - * Raises ValueError if pywait_set is not a wait_set capsule - * Raises RuntimeError if the entity type is not known - * - * \param[in] entity_type string defining the entity ["subscription, client, service"] - * \param[in] pywait_set Capsule pointing to the waitset structure - * \return List of wait_set entities pointers ready for take - */ -static PyObject * -rclpy_get_ready_entities(PyObject * Py_UNUSED(self), PyObject * args) -{ - const char * entity_type; - PyObject * pywait_set; - if (!PyArg_ParseTuple(args, "zO", &entity_type, &pywait_set)) { - return NULL; - } - - rcl_wait_set_t * wait_set = (rcl_wait_set_t *)PyCapsule_GetPointer(pywait_set, "rcl_wait_set_t"); - if (!wait_set) { - return NULL; - } - - PyObject * entity_ready_list = PyList_New(0); - if (0 == strcmp(entity_type, "subscription")) { - GET_LIST_READY_ENTITIES(subscription) - } else if (0 == strcmp(entity_type, "client")) { - GET_LIST_READY_ENTITIES(client) - } else if (0 == strcmp(entity_type, "service")) { - GET_LIST_READY_ENTITIES(service) - } else if (0 == strcmp(entity_type, "timer")) { - GET_LIST_READY_ENTITIES(timer) - } else if (0 == strcmp(entity_type, "guard_condition")) { - GET_LIST_READY_ENTITIES(guard_condition) - } else { - PyErr_Format(PyExc_RuntimeError, - "'%s' is not a known entity", entity_type); - return NULL; - } - - return entity_ready_list; -} - -/// Wait until timeout is reached or event happened -/** - * Raises ValueError if pywait_set is not a wait_set capsule - * Raises RuntimeError if there was an error while waiting - * - * This function will wait for an event to happen or for the timeout to expire. - * A negative timeout means wait forever, a timeout of 0 means no wait - * \param[in] pywait_set Capsule pointing to the waitset structure - * \param[in] timeout optional time to wait before waking up (in nanoseconds) - * \return NULL - */ -static PyObject * -rclpy_wait(PyObject * Py_UNUSED(self), PyObject * args) -{ - PyObject * pywait_set; - PY_LONG_LONG timeout = -1; - - if (!PyArg_ParseTuple(args, "O|K", &pywait_set, &timeout)) { - return NULL; - } -#ifdef _WIN32 - _crt_signal_t -#else - sig_t -#endif // _WIN32 - previous_handler = signal(SIGINT, catch_function); - rcl_wait_set_t * wait_set = (rcl_wait_set_t *)PyCapsule_GetPointer(pywait_set, "rcl_wait_set_t"); - if (!wait_set) { - return NULL; - } - rcl_ret_t ret; - - // Could be a long wait, release the GIL - Py_BEGIN_ALLOW_THREADS; - ret = rcl_wait(wait_set, timeout); - Py_END_ALLOW_THREADS; - - signal(SIGINT, previous_handler); - if (ret != RCL_RET_OK && ret != RCL_RET_TIMEOUT) { - PyErr_Format(PyExc_RuntimeError, - "Failed to wait on wait set: %s", rcl_get_error_string_safe()); - rcl_reset_error(); - return NULL; - } - Py_RETURN_NONE; -} - /// Take a message from a given subscription /** * \param[in] pysubscription Capsule pointing to the subscription to process the message @@ -2602,6 +2381,15 @@ static PyMethodDef rclpy_methods[] = { "Create a Timer." }, + { + "rclpy_service_server_is_available", rclpy_service_server_is_available, METH_VARARGS, + "Return true if the service server is available" + }, + + { + "rclpy_get_graph_guard_condition", rclpy_get_graph_guard_condition, METH_VARARGS, + "Get a guard condition that is triggered when the node graph updates." + }, { "rclpy_get_sigint_guard_condition", rclpy_get_sigint_guard_condition, METH_NOARGS, "Create a guard_condition triggered when sigint is received." @@ -2637,36 +2425,6 @@ static PyMethodDef rclpy_methods[] = { "Send a response." }, - { - "rclpy_get_zero_initialized_wait_set", rclpy_get_zero_initialized_wait_set, METH_NOARGS, - "rclpy_get_zero_initialized_wait_set." - }, - - { - "rclpy_wait_set_init", rclpy_wait_set_init, METH_VARARGS, - "rclpy_wait_set_init." - }, - - { - "rclpy_wait_set_clear_entities", rclpy_wait_set_clear_entities, METH_VARARGS, - "rclpy_wait_set_clear_entities." - }, - - { - "rclpy_wait_set_add_entity", rclpy_wait_set_add_entity, METH_VARARGS, - "rclpy_wait_set_add_entity." - }, - - { - "rclpy_destroy_wait_set", rclpy_destroy_wait_set, METH_VARARGS, - "rclpy_destroy_wait_set." - }, - - { - "rclpy_get_ready_entities", rclpy_get_ready_entities, METH_VARARGS, - "List non null subscriptions in waitset." - }, - { "rclpy_reset_timer", rclpy_reset_timer, METH_VARARGS, "Reset a timer." @@ -2712,11 +2470,6 @@ static PyMethodDef rclpy_methods[] = { "Get the period of a timer." }, - { - "rclpy_wait", rclpy_wait, METH_VARARGS, - "rclpy_wait." - }, - { "rclpy_take", rclpy_take, METH_VARARGS, "rclpy_take." diff --git a/rclpy/src/rclpy/_rclpy_wait_set.c b/rclpy/src/rclpy/_rclpy_wait_set.c new file mode 100644 index 000000000..88cc18854 --- /dev/null +++ b/rclpy/src/rclpy/_rclpy_wait_set.c @@ -0,0 +1,690 @@ +// Copyright 2017 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include + +#include "src/rclpy/sigint_gc.h" + + +/// Storage for custom python type _rclpy_wait_set.WaitSet +typedef struct +{ + PyObject_HEAD + rcl_wait_set_t wait_set; + /// Lists of pycapsules in the wait set + PyObject * pysubs; + PyObject * pytmrs; + PyObject * pygcs; + PyObject * pyclis; + PyObject * pysrvs; + /// List of pycapsule that are ready + PyObject * pyready; +} rclpy_wait_set_t; + +/// Destructor +static void +rclpy_wait_set_dealloc(rclpy_wait_set_t * self) +{ + rcl_ret_t ret = rcl_wait_set_fini(&(self->wait_set)); + if (ret != RCL_RET_OK) { + PyErr_Format(PyExc_RuntimeError, + "Failed to fini wait set: %s", rcl_get_error_string_safe()); + rcl_reset_error(); + } + + // X because it could be NULL if PyList_New failed during rclpy_wait_set_new + Py_XDECREF(self->pysubs); + Py_XDECREF(self->pytmrs); + Py_XDECREF(self->pygcs); + Py_XDECREF(self->pyclis); + Py_XDECREF(self->pysrvs); + Py_XDECREF(self->pyready); + + Py_TYPE(self)->tp_free((PyObject *)self); +} + +/// Constructor +static PyObject * +rclpy_wait_set_new(PyTypeObject * type, PyObject * Py_UNUSED(args), PyObject * Py_UNUSED(kwds)) +{ + rclpy_wait_set_t * self; + + self = (rclpy_wait_set_t *)type->tp_alloc(type, 0); + if (self) { + self->wait_set = rcl_get_zero_initialized_wait_set(); + } + + return (PyObject *)self; +} + +/// Initializer +static int +rclpy_wait_set_init( + rclpy_wait_set_t * self, PyObject * Py_UNUSED(args), PyObject * Py_UNUSED(kwds)) +{ + // rclpy_wait_set_dealloc will take care of decref +#define MAKE_LIST_OR_BAIL(ELIST) \ + ELIST = PyList_New(0); \ + if (!(ELIST)) { \ + return -1; \ + } + + MAKE_LIST_OR_BAIL(self->pysubs); + MAKE_LIST_OR_BAIL(self->pytmrs); + MAKE_LIST_OR_BAIL(self->pygcs); + MAKE_LIST_OR_BAIL(self->pyclis); + MAKE_LIST_OR_BAIL(self->pysrvs); + MAKE_LIST_OR_BAIL(self->pyready); +#undef MAKE_LIST_OR_BAIL + + rcl_ret_t ret = rcl_wait_set_init(&(self->wait_set), 0, 0, 0, 0, 0, rcl_get_default_allocator()); + if (ret != RCL_RET_OK) { + PyErr_Format(PyExc_RuntimeError, + "Failed to initialize wait set: %s", rcl_get_error_string_safe()); + rcl_reset_error(); + return -1; + } + + return 0; +} + +/// Return a pycapsule handle from an rclpy type +/* + * Raises any exception raised by an object with a custom __getattr__ + * + * \param[in] pyentity + * \returns NULL on error else something that is probably a pycapsule + */ +static inline PyObject * +_rclpy_to_pycapsule(PyObject * pyentity) +{ + if (PyObject_HasAttrString(pyentity, "subscription_handle")) { + pyentity = PyObject_GetAttrString(pyentity, "subscription_handle"); + } else if (PyObject_HasAttrString(pyentity, "guard_handle")) { + pyentity = PyObject_GetAttrString(pyentity, "guard_handle"); + } else if (PyObject_HasAttrString(pyentity, "timer_handle")) { + pyentity = PyObject_GetAttrString(pyentity, "timer_handle"); + } else if (PyObject_HasAttrString(pyentity, "client_handle")) { + pyentity = PyObject_GetAttrString(pyentity, "client_handle"); + } else if (PyObject_HasAttrString(pyentity, "service_handle")) { + pyentity = PyObject_GetAttrString(pyentity, "service_handle"); + } + return pyentity; +} + + +/// Add entities of a known type to the correct list +/* + * Raises ValueError if capsule is invalid + * Handles adding entities of a given type to the wait set + * + * \param[in] pylist a list that the entities should be added to + * \param[in] pyentities an iterable of (sub, pub, client, serv, guard) + * \param[in] handle_attr an attribute of an entity where the handle is stored + * \param[in] handle_type a pycapsule name this entity uses + * \return RCL_RET_OK if everything succeded + */ +static inline rcl_ret_t +_rclpy_add_entity( + PyObject * pylist, PyObject * pyentities, const char * handle_attr, + const char * handle_type) +{ + // It's possible for arbitrary python code to be invoked + Py_INCREF(pylist); + Py_INCREF(pyentities); + + PyObject * pyiter = PyObject_GetIter(pyentities); + if (!pyiter) { + // exception set + Py_DECREF(pylist); + Py_DECREF(pyentities); + return RCL_RET_ERROR; + } + + PyObject * pyentity; + while ((pyentity = PyIter_Next(pyiter))) { + // Accept an instance of an rclpy type for convenience + if (PyObject_HasAttrString(pyentity, handle_attr)) { + pyentity = PyObject_GetAttrString(pyentity, handle_attr); + } + + if (!pyentity) { + // Exception set + break; + } + + // No chance of arbitrary python code below, so decref early + Py_DECREF(pyentity); + + if (PyCapsule_IsValid(pyentity, handle_type)) { + if (-1 == PyList_Append(pylist, pyentity)) { + break; + } + } else { + const char * entity_type = PyCapsule_GetName(pyentity); + if (entity_type) { + PyErr_Format(PyExc_ValueError, "Unknown capsule type: '%s'", entity_type); + } // else PyCapsule_GetName raised + break; + } + } + + Py_DECREF(pyiter); + Py_DECREF(pyentities); + Py_DECREF(pylist); + + if (PyErr_Occurred()) { + // Return down here after references have been cleaned dup + return RCL_RET_ERROR; + } + return RCL_RET_OK; +} + +/// Add subscriptions to be waited on to the appropriate set +/* + * Raises ValueError if a capsule is invalid + * Raises TypeError if the argument is not iterable + * + * \param[in] self instance of rclpy_wait_set_t + * \param[in] pyentities Iterable of subscriptions + */ +static PyObject * +rclpy_wait_set_add_subscriptions(rclpy_wait_set_t * self, PyObject * args) +{ + PyObject * pyentities; + if (!PyArg_ParseTuple(args, "O", &pyentities)) { + return NULL; + } + + if (RCL_RET_OK != _rclpy_add_entity(self->pysubs, pyentities, "subscription_handle", + "rcl_subscription_t")) + { + // Exception set + return NULL; + } + Py_RETURN_NONE; +} + +/// Add guard_conditions to be waited on to the appropriate set +/* + * Raises ValueError if a capsule is invalid + * Raises TypeError if the argument is not iterable + * + * \param[in] self instance of rclpy_wait_set_t + * \param[in] pyentities Iterable of guard_conditions + */ +static PyObject * +rclpy_wait_set_add_guard_conditions(rclpy_wait_set_t * self, PyObject * args) +{ + PyObject * pyentities; + if (!PyArg_ParseTuple(args, "O", &pyentities)) { + return NULL; + } + + if (RCL_RET_OK != _rclpy_add_entity(self->pygcs, pyentities, "guard_handle", + "rcl_guard_condition_t")) + { + // Exception set + return NULL; + } + Py_RETURN_NONE; +} + +/// Add timers to be waited on to the appropriate set +/* + * Raises ValueError if a capsule is invalid + * Raises TypeError if the argument is not iterable + * + * \param[in] self instance of rclpy_wait_set_t + * \param[in] pyentities Iterable of timers + */ +static PyObject * +rclpy_wait_set_add_timers(rclpy_wait_set_t * self, PyObject * args) +{ + PyObject * pyentities; + if (!PyArg_ParseTuple(args, "O", &pyentities)) { + return NULL; + } + + if (RCL_RET_OK != _rclpy_add_entity(self->pytmrs, pyentities, "timer_handle", + "rcl_timer_t")) + { + // Exception set + return NULL; + } + Py_RETURN_NONE; +} + +/// Add clients to be waited on to the appropriate set +/* + * Raises ValueError if a capsule is invalid + * Raises TypeError if the argument is not iterable + * + * \param[in] self instance of rclpy_wait_set_t + * \param[in] pyentities Iterable of clients + */ +static PyObject * +rclpy_wait_set_add_clients(rclpy_wait_set_t * self, PyObject * args) +{ + PyObject * pyentities; + if (!PyArg_ParseTuple(args, "O", &pyentities)) { + return NULL; + } + + if (RCL_RET_OK != _rclpy_add_entity(self->pyclis, pyentities, "client_handle", + "rcl_client_t")) + { + // Exception set + return NULL; + } + Py_RETURN_NONE; +} + +/// Add services to be waited on to the appropriate set +/* + * Raises ValueError if a capsule is invalid + * Raises TypeError if the argument is not iterable + * + * \param[in] self instance of rclpy_wait_set_t + * \param[in] pyentities Iterable of services + */ +static PyObject * +rclpy_wait_set_add_services(rclpy_wait_set_t * self, PyObject * args) +{ + PyObject * pyentities; + if (!PyArg_ParseTuple(args, "O", &pyentities)) { + return NULL; + } + + if (RCL_RET_OK != _rclpy_add_entity(self->pysrvs, pyentities, "service_handle", + "rcl_service_t")) + { + // Exception set + return NULL; + } + Py_RETURN_NONE; +} + +/// Build a wait set +/* + * Raises RuntimeError if an RCL error occurs + * + * This method adds all of the PyCapsule to the rcl_wait_set_t instance, resizing it in the process + * + * \return number of entities in the wait set +*/ +static inline rcl_ret_t +_rclpy_build_wait_set(rclpy_wait_set_t * self) +{ + rcl_ret_t ret; + ret = rcl_wait_set_fini(&(self->wait_set)); + if (ret != RCL_RET_OK) { + PyErr_Format(PyExc_RuntimeError, + "Failed to finalize wait set: %s", rcl_get_error_string_safe()); + rcl_reset_error(); + return ret; + } + + Py_ssize_t num_subs = PyObject_Length(self->pysubs); + if (-1 == num_subs) { + return RCL_RET_ERROR; + } + Py_ssize_t num_gcs = PyObject_Length(self->pygcs); + if (-1 == num_gcs) { + return RCL_RET_ERROR; + } + Py_ssize_t num_tmrs = PyObject_Length(self->pytmrs); + if (-1 == num_tmrs) { + return RCL_RET_ERROR; + } + Py_ssize_t num_clis = PyObject_Length(self->pyclis); + if (-1 == num_clis) { + return RCL_RET_ERROR; + } + Py_ssize_t num_srvs = PyObject_Length(self->pysrvs); + if (-1 == num_srvs) { + return RCL_RET_ERROR; + } + + ret = rcl_wait_set_init(&(self->wait_set), num_subs, num_gcs, num_tmrs, num_clis, num_srvs, + rcl_get_default_allocator()); + if (ret != RCL_RET_OK) { + PyErr_Format(PyExc_RuntimeError, + "Failed to initialize wait set: %s", rcl_get_error_string_safe()); + rcl_reset_error(); + return ret; + } + +#define RCLPY_ADD_ENTITY(ETYPE, WSFUNC, ELIST) \ + do { \ + PyObject * pyiter = PyObject_GetIter(ELIST); \ + if (!pyiter) { \ + return RCL_RET_ERROR; \ + } \ + PyObject * pyentity; \ + while ((pyentity = PyIter_Next(pyiter))) { \ + ETYPE * entity = (ETYPE *)PyCapsule_GetPointer(pyentity, #ETYPE); \ + if (!entity) { \ + Py_DECREF(pyentity); \ + Py_DECREF(pyiter); \ + return RCL_RET_ERROR; \ + } \ + rcl_ret_t ret = WSFUNC(&(self->wait_set), entity); \ + if (ret != RCL_RET_OK) { \ + PyErr_Format(PyExc_RuntimeError, \ + "Failed to add entity '" #ETYPE "' to wait set: %s", rcl_get_error_string_safe()); \ + rcl_reset_error(); \ + Py_DECREF(pyentity); \ + Py_DECREF(pyiter); \ + return ret; \ + } \ + Py_DECREF(pyentity); \ + } \ + Py_DECREF(pyiter); \ + } while (false) + + RCLPY_ADD_ENTITY(rcl_subscription_t, rcl_wait_set_add_subscription, self->pysubs); + RCLPY_ADD_ENTITY(rcl_guard_condition_t, rcl_wait_set_add_guard_condition, self->pygcs); + RCLPY_ADD_ENTITY(rcl_timer_t, rcl_wait_set_add_timer, self->pytmrs); + RCLPY_ADD_ENTITY(rcl_client_t, rcl_wait_set_add_client, self->pyclis); + RCLPY_ADD_ENTITY(rcl_service_t, rcl_wait_set_add_service, self->pysrvs); +#undef RCLPY_ADD_ENTITY + + return RCL_RET_OK; +} + +/// Fill pyready with entities that are ready +/* + * \param[in] self an instance of _rclpy_wait_set.WaitSet + * \param[in] pyset the set of entities to check + * \param[in] type the name of the PyCapsule + */ +static inline rcl_ret_t +_rclpy_build_ready_entities(rclpy_wait_set_t * self) +{ + // Clear self->pyready + if (-1 == PySequence_DelSlice(self->pyready, 0, PySequence_Length(self->pyready))) { + return RCL_RET_ERROR; + } + +#define GET_READY_ENTITIES(ETYPE, ELIST) \ + do { \ + PyObject * pyiter = PyObject_GetIter((ELIST)); \ + if (!pyiter) { \ + return RCL_RET_ERROR; \ + } \ + PyObject * pyentity; \ + while ((pyentity = PyIter_Next(pyiter))) { \ + rcl_ ## ETYPE ## _t * entity = \ + (rcl_ ## ETYPE ## _t *)PyCapsule_GetPointer(pyentity, "rcl_" #ETYPE "_t"); \ + if (!entity) { \ + Py_DECREF(pyentity); \ + Py_DECREF(pyiter); \ + return RCL_RET_ERROR; \ + } \ + size_t idx; \ + size_t idx_max; \ + idx_max = self->wait_set.size_of_ ## ETYPE ## s; \ + const rcl_ ## ETYPE ## _t ** struct_ptr = self->wait_set.ETYPE ## s; \ + for (idx = 0; idx < idx_max; ++idx) { \ + if (struct_ptr[idx] == entity) { \ + if (-1 == PyList_Append(self->pyready, pyentity)) { \ + Py_DECREF(pyentity); \ + Py_DECREF(pyiter); \ + return RCL_RET_ERROR; \ + } \ + } \ + } \ + Py_DECREF(pyentity); \ + } \ + Py_DECREF(pyiter); \ + } while (false) + + GET_READY_ENTITIES(subscription, self->pysubs); + GET_READY_ENTITIES(guard_condition, self->pygcs); + GET_READY_ENTITIES(timer, self->pytmrs); + GET_READY_ENTITIES(client, self->pyclis); + GET_READY_ENTITIES(service, self->pysrvs); +#undef GET_READY_ENTITIES + return RCL_RET_OK; +} + +/// Wait until timeout is reached or an event happened +/** + * Raises RuntimeError if there was an error while waiting + * + * This function will wait for an event to happen or for the timeout to expire. + * A negative timeout means wait forever, a timeout of 0 means no wait + * \param[in] timeout optional time to wait before waking up (in nanoseconds) + * \return None + */ +static PyObject * +rclpy_wait_set_wait(rclpy_wait_set_t * self, PyObject * args) +{ + PY_LONG_LONG timeout = -1; + + if (!PyArg_ParseTuple(args, "|K", &timeout)) { + return NULL; + } + + if (RCL_RET_OK != _rclpy_build_wait_set(self)) { + // exception set + return NULL; + } + +#ifdef _WIN32 + _crt_signal_t +#else + sig_t +#endif // _WIN32 + previous_handler = signal(SIGINT, rclpy_catch_function); + rcl_ret_t ret; + + // Reference could be invalidated by arbitrary python code running while GIL is released + Py_INCREF(self); + Py_BEGIN_ALLOW_THREADS; + ret = rcl_wait(&(self->wait_set), timeout); + Py_END_ALLOW_THREADS; + Py_DECREF(self); + + signal(SIGINT, previous_handler); + if (ret != RCL_RET_OK && ret != RCL_RET_TIMEOUT) { + PyErr_Format(PyExc_RuntimeError, + "Failed to wait on wait set: %s", rcl_get_error_string_safe()); + rcl_reset_error(); + return NULL; + } + + if (RCL_RET_OK != _rclpy_build_ready_entities(self)) { + // exception set + return NULL; + } + + Py_RETURN_NONE; +} + +/// Remove all stored entities from the wait set +/** + * Raises RuntimeError if there was an error while clearing + * + * \return None + */ +static PyObject * +rclpy_wait_set_clear(rclpy_wait_set_t * self) +{ + if (-1 == PySequence_DelSlice(self->pysubs, 0, PySequence_Length(self->pysubs)) || + -1 == PySequence_DelSlice(self->pytmrs, 0, PySequence_Length(self->pytmrs)) || + -1 == PySequence_DelSlice(self->pygcs, 0, PySequence_Length(self->pygcs)) || + -1 == PySequence_DelSlice(self->pyclis, 0, PySequence_Length(self->pyclis)) || + -1 == PySequence_DelSlice(self->pysrvs, 0, PySequence_Length(self->pysrvs))) + { + return NULL; + } + + Py_RETURN_NONE; +} + +/// Return True if an entity is ready +/** + * Raises RuntimeError if there was an error while clearing + * Raises ValueError if the capsule is invalid + * + * \return True if the entity is ready + */ +static PyObject * +rclpy_wait_set_is_ready(rclpy_wait_set_t * self, PyObject * args) +{ + PyObject * pyentity; + if (!PyArg_ParseTuple(args, "O", &pyentity)) { + return NULL; + } + + pyentity = _rclpy_to_pycapsule(pyentity); + + int contains = PySequence_Contains(self->pyready, pyentity); + if (-1 == contains) { + // Exception set + return NULL; + } + if (contains) { + Py_RETURN_TRUE; + } + Py_RETURN_FALSE; +} + +static PyMemberDef rclpy_wait_set_members[] = { + {"_pysubs", T_OBJECT_EX, offsetof(rclpy_wait_set_t, pysubs), 0, + "subscription capsules"}, + {"_pygcs", T_OBJECT_EX, offsetof(rclpy_wait_set_t, pygcs), 0, + "guard_condition capsules"}, + {"_pytmrs", T_OBJECT_EX, offsetof(rclpy_wait_set_t, pytmrs), 0, + "timer capsules"}, + {"_pyclis", T_OBJECT_EX, offsetof(rclpy_wait_set_t, pyclis), 0, + "client capsules"}, + {"_pysrvs", T_OBJECT_EX, offsetof(rclpy_wait_set_t, pysrvs), 0, + "service capsules"}, + {"_pyready", T_OBJECT_EX, offsetof(rclpy_wait_set_t, pyready), 0, + "ready capsules"}, + {NULL, 0, 0, 0, NULL} /* Sentinel */ +}; + +/// Define methods of _rclpy_wait_set.WaitSet +static PyMethodDef rclpy_wait_set_methods[] = { + {"add_subscriptions", (PyCFunction)rclpy_wait_set_add_subscriptions, METH_VARARGS, + "Add a bunch of subscription instances or handles to the wait_set."}, + {"add_guard_conditions", (PyCFunction)rclpy_wait_set_add_guard_conditions, METH_VARARGS, + "Add a bunch of guard_condition instances or handles to the wait_set."}, + {"add_timers", (PyCFunction)rclpy_wait_set_add_timers, METH_VARARGS, + "Add a bunch of timer instances or handles to the wait_set."}, + {"add_clients", (PyCFunction)rclpy_wait_set_add_clients, METH_VARARGS, + "Add a bunch of client instances or handles to the wait_set."}, + {"add_services", (PyCFunction)rclpy_wait_set_add_services, METH_VARARGS, + "Add a bunch of service instances or handles to the wait_set."}, + {"wait", (PyCFunction)rclpy_wait_set_wait, METH_VARARGS, + "Wait until timeout is reached or an event happened."}, + {"clear", (PyCFunction)rclpy_wait_set_clear, METH_NOARGS, + "Remove all entities from the wait set."}, + {"is_ready", (PyCFunction)rclpy_wait_set_is_ready, METH_VARARGS, + "Return True if an entity is ready."}, + {NULL, NULL, 0, NULL} /* Sentinel */ +}; + + +// Partially initializing is recommended in the python docs +// I don't see a way to complete the initializer without duplicating #ifdef in cpython header +/// Python type _rclpy_wait_set.WaitSet +#if defined(__clang__) +# pragma clang diagnostic push +# pragma clang diagnostic ignored "-Wmissing-field-initializers" +#elif defined(__GNUC__) || defined(__GNUG__) +# pragma GCC diagnostic push +# pragma GCC diagnostic ignored "-Wmissing-field-initializers" +#endif +static PyTypeObject rclpy_wait_set_type_t = { + PyVarObject_HEAD_INIT(NULL, 0) + "_rclpy_wait_set.WaitSet", /* tp_name */ + sizeof(rclpy_wait_set_t), /* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor)rclpy_wait_set_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_reserved */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT, /* tp_flags */ + "Interface to a wait set.", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + rclpy_wait_set_methods, /* tp_methods */ + rclpy_wait_set_members, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc)rclpy_wait_set_init, /* tp_init */ + 0, /* tp_alloc */ + rclpy_wait_set_new, /* tp_new */ +}; +#if defined(__clang__) +# pragma clang diagnostic pop +#elif defined(__GNUC__) || defined(__GNUG__) +# pragma GCC diagnostic pop +#endif + +static PyModuleDef wait_set_module = { + PyModuleDef_HEAD_INIT, + "_rclpy_wait_set", + "Extention module for a wait set class.", + -1, + NULL, NULL, NULL, NULL, NULL +}; + +PyMODINIT_FUNC +PyInit__rclpy_wait_set(void) +{ + PyObject * m; + + rclpy_wait_set_type_t.tp_new = PyType_GenericNew; + if (PyType_Ready(&rclpy_wait_set_type_t) < 0) { + return NULL; + } + + m = PyModule_Create(&wait_set_module); + if (m == NULL) { + return NULL; + } + + Py_INCREF(&rclpy_wait_set_type_t); + PyModule_AddObject(m, "WaitSet", (PyObject *)&rclpy_wait_set_type_t); + return m; +} diff --git a/rclpy/src/rclpy/sigint_gc.c b/rclpy/src/rclpy/sigint_gc.c new file mode 100644 index 000000000..a71fa7f58 --- /dev/null +++ b/rclpy/src/rclpy/sigint_gc.c @@ -0,0 +1,32 @@ +// Copyright 2017 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/rclpy/sigint_gc.h" + +#include + +#include + +rcl_guard_condition_t * g_rclpy_sigint_gc_handle; + +/// Catch signals +void rclpy_catch_function(int signo) +{ + (void) signo; + rcl_ret_t ret = rcl_trigger_guard_condition(g_rclpy_sigint_gc_handle); + if (ret != RCL_RET_OK) { + fprintf(stderr, "Failed to trigger guard_condition: %s", rcl_get_error_string_safe()); + rcl_reset_error(); + } +} diff --git a/rclpy/src/rclpy/sigint_gc.h b/rclpy/src/rclpy/sigint_gc.h new file mode 100644 index 000000000..e4765de4a --- /dev/null +++ b/rclpy/src/rclpy/sigint_gc.h @@ -0,0 +1,29 @@ +// Copyright 2017 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef RCLPY__SIGINT_GC_H_ +#define RCLPY__SIGINT_GC_H_ + +#include + +#include + +#include "src/rclpy/sigint_gc_visibility.h" + +/// Global variable with guard condition for sigint handler +extern RCLPY_SIGINT_PUBLIC rcl_guard_condition_t * g_rclpy_sigint_gc_handle; + +/// Function that can be used as a sigint handler +RCLPY_SIGINT_PUBLIC void rclpy_catch_function(int signo); + +#endif // RCLPY__SIGINT_GC_H_ diff --git a/rclpy/src/rclpy/sigint_gc_visibility.h b/rclpy/src/rclpy/sigint_gc_visibility.h new file mode 100644 index 000000000..0b7d0a5b5 --- /dev/null +++ b/rclpy/src/rclpy/sigint_gc_visibility.h @@ -0,0 +1,48 @@ +// Copyright 2017 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef RCLPY__SIGINT_GC_VISIBILITY_H_ +#define RCLPY__SIGINT_GC_VISIBILITY_H_ + +// This logic was borrowed (then namespaced) from the examples on the gcc wiki: +// https://gcc.gnu.org/wiki/Visibility + +#if defined _WIN32 || defined __CYGWIN__ + #ifdef __GNUC__ + #define RCLPY_SIGINT_EXPORT __attribute__ ((dllexport)) + #define RCLPY_SIGINT_IMPORT __attribute__ ((dllimport)) + #else + #define RCLPY_SIGINT_EXPORT __declspec(dllexport) + #define RCLPY_SIGINT_IMPORT __declspec(dllimport) + #endif + #ifdef RCLPY_SIGINT_BUILDING_LIBRARY + #define RCLPY_SIGINT_PUBLIC RCLPY_SIGINT_EXPORT + #else + #define RCLPY_SIGINT_PUBLIC RCLPY_SIGINT_IMPORT + #endif + #define RCLPY_SIGINT_PUBLIC_TYPE RCLPY_SIGINT_PUBLIC + #define RCLPY_SIGINT_LOCAL +#else + #define RCLPY_SIGINT_EXPORT __attribute__ ((visibility("default"))) + #define RCLPY_SIGINT_IMPORT + #if __GNUC__ >= 4 + #define RCLPY_SIGINT_PUBLIC __attribute__ ((visibility("default"))) + #define RCLPY_SIGINT_LOCAL __attribute__ ((visibility("hidden"))) + #else + #define RCLPY_SIGINT_PUBLIC + #define RCLPY_SIGINT_LOCAL + #endif + #define RCLPY_SIGINT_PUBLIC_TYPE +#endif + +#endif // RCLPY__SIGINT_GC_VISIBILITY_H_ diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index 55295e0eb..877477db4 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -20,18 +20,28 @@ from rclpy.executors import SingleThreadedExecutor +class AwaitableClass: + + def __await__(self): + yield + + class TestExecutor(unittest.TestCase): @classmethod def setUpClass(cls): rclpy.init() - cls.node = rclpy.create_node('TestExecutor', namespace='/rclpy') @classmethod def tearDownClass(cls): - cls.node.destroy_node() rclpy.shutdown() + def setUp(self): + self.node = rclpy.create_node('TestExecutor', namespace='/rclpy') + + def tearDown(self): + self.node.destroy_node() + def func_execution(self, executor): got_callback = False @@ -47,36 +57,65 @@ def timer_callback(): self.node.destroy_timer(tmr) return got_callback - def test_single_threaded_executor_executes(self): +# def test_single_threaded_executor_executes(self): +# self.assertIsNotNone(self.node.handle) +# executor = SingleThreadedExecutor() +# try: +# self.assertTrue(self.func_execution(executor)) +# finally: +# executor.shutdown() +# +# def test_multi_threaded_executor_executes(self): +# self.assertIsNotNone(self.node.handle) +# executor = MultiThreadedExecutor() +# try: +# self.assertTrue(self.func_execution(executor)) +# finally: +# executor.shutdown() +# +# def test_add_node_to_executor(self): +# self.assertIsNotNone(self.node.handle) +# executor = SingleThreadedExecutor() +# executor.add_node(self.node) +# self.assertIn(self.node, executor.get_nodes()) +# +# def test_executor_spin_non_blocking(self): +# self.assertIsNotNone(self.node.handle) +# executor = SingleThreadedExecutor() +# executor.add_node(self.node) +# start = time.monotonic() +# executor.spin_once(timeout_sec=0) +# end = time.monotonic() +# self.assertLess(start - end, 0.001) + + def test_executor_coroutine(self): self.assertIsNotNone(self.node.handle) executor = SingleThreadedExecutor() - try: - self.assertTrue(self.func_execution(executor)) - finally: - executor.shutdown() + executor.add_node(self.node) - def test_multi_threaded_executor_executes(self): - self.assertIsNotNone(self.node.handle) - executor = MultiThreadedExecutor() - try: - self.assertTrue(self.func_execution(executor)) - finally: - executor.shutdown() + called1 = False + called2 = False - def test_add_node_to_executor(self): - self.assertIsNotNone(self.node.handle) - executor = SingleThreadedExecutor() - executor.add_node(self.node) - self.assertIn(self.node, executor.get_nodes()) + async def timer_coroutine(): + print('Timer coroutine') + nonlocal called1 + nonlocal called2 + called1 = True + await AwaitableClass() + called2 = True - def test_executor_spin_non_blocking(self): - self.assertIsNotNone(self.node.handle) - executor = SingleThreadedExecutor() - executor.add_node(self.node) - start = time.monotonic() - executor.spin_once(timeout_sec=0) - end = time.monotonic() - self.assertLess(start - end, 0.001) + tmr = self.node.create_timer(0.1, timer_coroutine) + try: + executor.spin_once(timeout_sec=1.23) + self.assertTrue(called1) + self.assertFalse(called2) + + called1 = False + executor.spin_once(timeout_sec=0) + self.assertFalse(called1) + self.assertTrue(called2) + finally: + self.node.destroy_timer(tmr) if __name__ == '__main__': diff --git a/rclpy/test/test_task.py b/rclpy/test/test_task.py new file mode 100644 index 000000000..6670ef2c0 --- /dev/null +++ b/rclpy/test/test_task.py @@ -0,0 +1,120 @@ +# Copyright 2017 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from rclpy.future import Present +from rclpy.future import Task + + +class AwaitableClass: + + def __await__(self): + yield + + +class TestTask(unittest.TestCase): + + def test_task_normal_callable(self): + called = False + + def func(): + nonlocal called + called = True + + t = Task(func) + t() + self.assertTrue(called) + self.assertTrue(t.done()) + + def test_task_lambda(self): + called = False + + def func(): + nonlocal called + called = True + + t = Task(lambda: func()) + t() + self.assertTrue(called) + self.assertTrue(t.done()) + + def test_coroutine(self): + called1 = False + called2 = False + + async def coro(): + nonlocal called1 + nonlocal called2 + called1 = True + await AwaitableClass() + called2 = True + + t = Task(coro) + t() + self.assertTrue(called1) + self.assertFalse(called2) + + called1 = False + t() + self.assertFalse(called1) + self.assertTrue(called2) + + def test_done_callback(self): + called = False + + def func(): + nonlocal called + called = True + + t = Task(lambda: None) + t.add_done_callback(func) + t() + self.assertTrue(called) + self.assertTrue(t.done()) + + def test_done_callback_is_coroutine(self): + called1 = False + called2 = False + + async def coro(): + nonlocal called1 + nonlocal called2 + called1 = True + await AwaitableClass() + called2 = True + + t = Task(lambda: None) + t.add_done_callback(coro) + t() + self.assertTrue(called1) + self.assertFalse(called2) + + called1 = False + t() + self.assertFalse(called1) + self.assertTrue(called2) + + +class TestPresent(unittest.TestCase): + + def test_immediately_has_result(self): + uut = Present(result=5) + self.assertTrue(uut.done()) + self.assertFalse(uut.is_cancelled()) + self.assertEqual(5, uut.result()) + + +if __name__ == '__main__': + unittest.main() diff --git a/rclpy/test/test_timer.py b/rclpy/test/test_timer.py index 37c63390b..2431fe3ae 100644 --- a/rclpy/test/test_timer.py +++ b/rclpy/test/test_timer.py @@ -193,3 +193,8 @@ def test_timer_cancel_reset_1000hertz(): raise SkipTest func_launch( func_cancel_reset_timer, ['0.001'], "didn't receive the expected number of callbacks") + + +if __name__ == '__main__': + import cProfile + cProfile.run('func_number_callbacks(["0.001"])', filename='timer_test.prof') diff --git a/rclpy/test/test_utilities.py b/rclpy/test/test_utilities.py new file mode 100644 index 000000000..82cb9fc57 --- /dev/null +++ b/rclpy/test/test_utilities.py @@ -0,0 +1,31 @@ +# Copyright 2017 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from rclpy.constants import S_TO_NS +import rclpy.utilities + + +class TestUtilities(unittest.TestCase): + + def test_timeout_sec_to_nsec(self): + self.assertGreater(0, rclpy.utilities.timeout_sec_to_nsec(None)) + self.assertGreater(0, rclpy.utilities.timeout_sec_to_nsec(-1)) + self.assertEqual(0, rclpy.utilities.timeout_sec_to_nsec(0)) + self.assertEqual(int(1.5 * S_TO_NS), rclpy.utilities.timeout_sec_to_nsec(1.5)) + + +if __name__ == '__main__': + unittest.main() diff --git a/rclpy/test/test_wait_set.py b/rclpy/test/test_wait_set.py new file mode 100644 index 000000000..05891a0da --- /dev/null +++ b/rclpy/test/test_wait_set.py @@ -0,0 +1,136 @@ +# Copyright 2017 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import unittest + +from rcl_interfaces.srv import GetParameters +import rclpy +from rclpy.constants import S_TO_NS +from rclpy.guard_condition import GuardCondition +from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy +from rclpy.impl.implementation_singleton import rclpy_wait_set_implementation as _rclpy_wait_set +from rclpy.timer import WallTimer +from std_msgs.msg import String + + +class TestWaitSet(unittest.TestCase): + + @classmethod + def setUpClass(cls): + rclpy.init() + cls.node = rclpy.create_node('TestWaitSet') + + @classmethod + def tearDownClass(cls): + cls.node.destroy_node() + rclpy.shutdown() + + def test_guard_condition_ready(self): + gc = GuardCondition(None, None) + try: + ws = _rclpy_wait_set.WaitSet() + ws.add_guard_conditions([gc]) + self.assertFalse(ws.is_ready(gc)) + + ws.wait(0) + self.assertFalse(ws.is_ready(gc)) + + gc.trigger() + ws.wait(0) + self.assertTrue(ws.is_ready(gc)) + + ws.wait(0) + self.assertFalse(ws.is_ready(gc)) + finally: + _rclpy.rclpy_destroy_entity(gc.guard_handle) + + def test_timer_ready(self): + timer = WallTimer(None, None, int(0.1 * S_TO_NS)) + try: + ws = _rclpy_wait_set.WaitSet() + ws.add_timers([timer]) + self.assertFalse(ws.is_ready(timer)) + + ws.wait(0) + self.assertFalse(ws.is_ready(timer)) + + ws.wait(int(0.1 * S_TO_NS)) + self.assertTrue(ws.is_ready(timer)) + + _rclpy.rclpy_call_timer(timer.timer_handle) + ws.wait(0) + self.assertFalse(ws.is_ready(timer)) + finally: + _rclpy.rclpy_destroy_entity(timer.timer_handle) + + def test_subscriber_ready(self): + sub = self.node.create_subscription(String, 'chatter', lambda msg: print(msg)) + pub = self.node.create_publisher(String, 'chatter') + try: + ws = _rclpy_wait_set.WaitSet() + ws.add_subscriptions([sub]) + self.assertFalse(ws.is_ready(sub)) + + ws.wait(0) + self.assertFalse(ws.is_ready(sub)) + + msg = String() + msg.data = 'Hello World' + pub.publish(msg) + + ws.wait(5 * S_TO_NS) + self.assertTrue(ws.is_ready(sub)) + + _rclpy.rclpy_take(sub.subscription_handle, sub.msg_type) + ws.wait(0) + self.assertFalse(ws.is_ready(sub)) + finally: + self.node.destroy_publisher(pub) + self.node.destroy_subscription(sub) + + def test_server_ready(self): + cli = self.node.create_client(GetParameters, 'get/parameters') + srv = self.node.create_service( + GetParameters, 'get/parameters', lambda req: GetParameters.Response()) + + try: + ws = _rclpy_wait_set.WaitSet() + ws.add_clients([cli]) + ws.add_services([srv]) + self.assertFalse(ws.is_ready(cli)) + self.assertFalse(ws.is_ready(srv)) + + ws.wait(0) + self.assertFalse(ws.is_ready(cli)) + self.assertFalse(ws.is_ready(srv)) + + # TODO(Sloretz) replace with wait_for_service() + time.sleep(5) + cli.call(GetParameters.Request()) + + ws.wait(5 * S_TO_NS) + # TODO(sloretz) test client after it's wait_for_future() API is sorted out + self.assertTrue(ws.is_ready(srv)) + + _rclpy.rclpy_take_request(srv.service_handle, srv.srv_type.Request) + ws.wait(0) + self.assertFalse(ws.is_ready(srv)) + finally: + self.node.destroy_client(cli) + self.node.destroy_service(srv) + + +if __name__ == '__main__': + unittest.main()