Skip to content
Closed
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
372c1a9
Dirty: First proof of concept for wait_for_service implementation
sloretz Oct 10, 2017
817990a
Moved wait set code to its own class for code reuse
sloretz Oct 13, 2017
9b91a62
Added timeout_sec_to_nsec()
sloretz Oct 13, 2017
371c917
wait_for_service() implemented with timers
sloretz Oct 13, 2017
b17f497
Added unit tests for timeout_sec_to_nsec()
sloretz Oct 13, 2017
35898a6
Added test for WaitSet class
sloretz Oct 13, 2017
5b1ca62
Use negative timeouts to mean block forever
sloretz Oct 13, 2017
489bc0e
Double quotes to single quotes
sloretz Oct 13, 2017
f854466
Added wait_for_service() tests and fixed bugs it caught
sloretz Oct 13, 2017
9255e1f
Eliminate blind exception warning
sloretz Oct 13, 2017
f89a222
Reduce flakiness of test by increasing time to 0.1s
sloretz Oct 13, 2017
39ac231
Comment says negative timeouts block forever
sloretz Oct 17, 2017
3237ed8
Use :returns:
sloretz Oct 17, 2017
7acb2a6
Move add_subscriptions()
sloretz Oct 17, 2017
4720d3f
arugments -> arguments
sloretz Oct 17, 2017
395934a
Daemon as keyword arg
sloretz Oct 17, 2017
1c9a098
Remove unnecessary namespace argument
sloretz Oct 17, 2017
655425f
Use S_TO_NS in test
sloretz Oct 17, 2017
ccd93d8
More tests using S_TO_NS
sloretz Oct 17, 2017
441bbb5
Use monotonic clock for testing timer time
sloretz Oct 17, 2017
8733c14
Increased test timeout by 30 seconds
sloretz Oct 17, 2017
3f26246
CheckExact -> IsValid
sloretz Oct 17, 2017
0a79f28
Fixed wait_set not clearing ready_pointers
sloretz Oct 17, 2017
65c545f
Remove unnecessary namespace keyword arg
sloretz Oct 18, 2017
d81684c
Non-blocking wait
sloretz Oct 18, 2017
4bbc323
remove expression that always evaluates to True
sloretz Oct 18, 2017
99280ae
Raise ValueError on invalid capsule
sloretz Oct 19, 2017
1d2beeb
Simplified timeout_sec_to_nsec
sloretz Oct 19, 2017
6980711
Added 'WaitSet.destroy()' and made executor use it
sloretz Oct 24, 2017
c69858f
GraphListener periodically checks if rclpy is shutdown
sloretz Oct 24, 2017
4cd2fb1
Misc fixes after pycapsule names
sloretz Oct 25, 2017
0cf2aeb
Wait set class always clears entities before waiting
sloretz Oct 31, 2017
7843c5c
Remove underscore on import
sloretz Oct 31, 2017
ded618f
Reformat timeout line
sloretz Oct 31, 2017
e8f7fd6
Use () when raising exceptions
sloretz Oct 31, 2017
149ad43
Removed _ on imports
sloretz Oct 31, 2017
dd86a13
Executor optimizations
sloretz Nov 1, 2017
6352e86
Fixed executor yielding entities to wrong node
sloretz Nov 1, 2017
0dbf8c7
Use list() only where necessary
sloretz Nov 1, 2017
1a2ce96
Docstring in imperitive mood
sloretz Nov 1, 2017
7ee5ead
Executors reuse iterator
sloretz Nov 1, 2017
3628db4
moved some wait_set code into C
sloretz Nov 2, 2017
b71d1c5
Avoid another list comprehension
sloretz Nov 2, 2017
9d9113d
Replaced WaitSet with C code in executor
sloretz Nov 3, 2017
adfddf6
Remove test code
sloretz Nov 3, 2017
315db4e
Use lists instead of set
sloretz Nov 8, 2017
e078c47
Use locally defined function instead of member function
sloretz Nov 8, 2017
5c92085
Shorten code using macro
sloretz Nov 8, 2017
70b8652
Move everything to new wait_set code
sloretz Nov 9, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rclpy/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ if(BUILD_TESTING)
WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}"
PYTHON_EXECUTABLE "${PYTHON_EXECUTABLE}"
APPEND_ENV AMENT_PREFIX_PATH=${ament_index_build_path}
TIMEOUT 90
TIMEOUT 120
)
endif()
endif()
Expand Down
37 changes: 37 additions & 0 deletions rclpy/rclpy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import threading

from rclpy.graph_listener import GraphEventSubscription
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
import rclpy.utilities

Expand Down Expand Up @@ -77,3 +78,39 @@ def wait_for_future(self):
thread1 = ResponseThread(self)
thread1.start()
thread1.join()

def service_is_ready(self):
return _rclpy.rclpy_service_server_is_available(self.node_handle, self.client_handle)

def wait_for_service(self, timeout_sec=None):
"""
Block until the service is available.

:param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0
:type timeout_sec: float or None
:rtype: bool
:returns: true if the service is available
"""
timeout_nsec = rclpy.utilities.timeout_sec_to_nsec(timeout_sec)
result = self.service_is_ready()
if result or timeout_sec == 0:
return result

event = threading.Event()

def on_graph_event():
nonlocal self
nonlocal event
nonlocal result
result = self.service_is_ready()
if result:
event.set()

def on_timeout():
nonlocal event
event.set()

with GraphEventSubscription(self.node_handle, on_graph_event, timeout_nsec, on_timeout):
event.wait()

return result
207 changes: 87 additions & 120 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from concurrent.futures import ThreadPoolExecutor as _ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor
import multiprocessing
from threading import Condition as _Condition
from threading import Lock as _Lock
from threading import Condition
from threading import Lock

from rclpy.constants import S_TO_NS
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.timer import WallTimer as _WallTimer
from rclpy.timer import WallTimer
from rclpy.utilities import ok


class _WaitSet:
"""Make sure the wait set gets destroyed when a generator exits."""

def __enter__(self):
self.wait_set = _rclpy.rclpy_get_zero_initialized_wait_set()
return self.wait_set

def __exit__(self, t, v, tb):
_rclpy.rclpy_destroy_wait_set(self.wait_set)
from rclpy.utilities import timeout_sec_to_nsec
from rclpy.wait_set import WaitSet


class _WorkTracker:
Expand All @@ -40,7 +30,7 @@ class _WorkTracker:
def __init__(self):
# Number of tasks that are being executed
self._num_work_executing = 0
self._work_condition = _Condition()
self._work_condition = Condition()

def __enter__(self):
"""Increment the amount of executing work by 1."""
Expand All @@ -57,16 +47,18 @@ def wait(self, timeout_sec=None):
"""
Wait until all work completes.

:param timeout_sec: Seconds to wait. Block forever if None. Don't wait if <= 0
:param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0
:type timeout_sec: float or None
:rtype: bool True if all work completed
:rtype: bool
:returns: True if all work completed
"""
if timeout_sec is not None and timeout_sec < 0:
timeout_sec = None
# Wait for all work to complete
if timeout_sec is None or timeout_sec >= 0:
with self._work_condition:
if not self._work_condition.wait_for(
lambda: self._num_work_executing == 0, timeout_sec):
return False
with self._work_condition:
if not self._work_condition.wait_for(
lambda: self._num_work_executing == 0, timeout_sec):
return False
return True


Expand All @@ -85,7 +77,7 @@ class Executor:
def __init__(self):
super().__init__()
self._nodes = set()
self._nodes_lock = _Lock()
self._nodes_lock = Lock()
# This is triggered when wait_for_ready_callbacks should rebuild the wait list
gc, gc_handle = _rclpy.rclpy_create_guard_condition()
self._guard_condition = gc
Expand All @@ -100,7 +92,7 @@ def shutdown(self, timeout_sec=None):

Return true if all outstanding callbacks finished executing.

:param timeout_sec: Seconds to wait. Block forever if None. Don't wait if <= 0
:param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0
:type timeout_sec: float or None
:rtype: bool
"""
Expand Down Expand Up @@ -152,11 +144,11 @@ def spin_once(self, timeout_sec=None):

A custom executor should use :func:`Executor.wait_for_ready_callbacks` to get work.

:param timeout_sec: Seconds to wait. Block forever if None. Don't wait if <= 0
:param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0
:type timeout_sec: float or None
:rtype: None
"""
raise NotImplementedError
raise NotImplementedError()

def _take_timer(self, tmr):
_rclpy.rclpy_call_timer(tmr.timer_handle)
Expand Down Expand Up @@ -260,22 +252,16 @@ def wait_for_ready_callbacks(self, timeout_sec=None, nodes=None):
"""
Yield callbacks that are ready to be performed.

:param timeout_sec: Seconds to wait. Block forever if None. Don't wait if <= 0
:param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0
:type timeout_sec: float or None
:param nodes: A list of nodes to wait on. Wait on all nodes if None.
:type nodes: list or None
:rtype: Generator[(callable, entity, :class:`rclpy.node.Node`)]
"""
timeout_timer = None
# Get timeout in nanoseconds. 0 = don't wait. < 0 means block forever
timeout_nsec = None
if timeout_sec is None:
timeout_nsec = -1
elif timeout_sec <= 0:
timeout_nsec = 0
else:
timeout_nsec = int(float(timeout_sec) * S_TO_NS)
timeout_timer = _WallTimer(None, None, timeout_nsec)
timeout_nsec = timeout_sec_to_nsec(timeout_sec)
if timeout_nsec > 0:
timeout_timer = WallTimer(None, None, timeout_nsec)

if nodes is None:
nodes = self.get_nodes()
Expand Down Expand Up @@ -304,94 +290,75 @@ def wait_for_ready_callbacks(self, timeout_sec=None, nodes=None):
timers.append(timeout_timer)

# Construct a wait set
with _WaitSet() as wait_set:
_rclpy.rclpy_wait_set_init(
wait_set,
len(subscriptions),
len(guards) + 2,
len(timers),
len(clients),
len(services))

entities = {
'subscription': (subscriptions, 'subscription_handle'),
'guard_condition': (guards, 'guard_handle'),
'client': (clients, 'client_handle'),
'service': (services, 'service_handle'),
'timer': (timers, 'timer_handle'),
}
for entity, (handles, handle_name) in entities.items():
_rclpy.rclpy_wait_set_clear_entities(entity, wait_set)
for h in handles:
_rclpy.rclpy_wait_set_add_entity(
entity, wait_set, h.__getattribute__(handle_name)
)
_rclpy.rclpy_wait_set_add_entity('guard_condition', wait_set, sigint_gc)
_rclpy.rclpy_wait_set_add_entity(
'guard_condition', wait_set, self._guard_condition)
with WaitSet() as wait_set:
wait_set.add_subscriptions(subscriptions)
wait_set.add_clients(clients)
wait_set.add_services(services)
wait_set.add_timers(timers)
wait_set.add_guard_conditions(guards)
wait_set.add_guard_condition(sigint_gc, sigint_gc_handle)
wait_set.add_guard_condition(self._guard_condition, self._guard_condition_handle)

# Wait for something to become ready
_rclpy.rclpy_wait(wait_set, timeout_nsec)

# get ready entities
subs_ready = _rclpy.rclpy_get_ready_entities('subscription', wait_set)
guards_ready = _rclpy.rclpy_get_ready_entities('guard_condition', wait_set)
timers_ready = _rclpy.rclpy_get_ready_entities('timer', wait_set)
clients_ready = _rclpy.rclpy_get_ready_entities('client', wait_set)
services_ready = _rclpy.rclpy_get_ready_entities('service', wait_set)
wait_set.wait(timeout_nsec)

# Check sigint guard condition
if wait_set.is_ready(sigint_gc_handle):
raise KeyboardInterrupt()
_rclpy.rclpy_destroy_entity(sigint_gc)

# Mark all guards as triggered before yielding since they're auto-taken
for gc in [g for g in guards if wait_set.is_ready(g.guard_pointer)]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't need to be changed in this PR since we do it "everywhere" in the code base like this.

I was just wondering if you should migrate to use something like this instead in the future:

for gc in filter(lambda g: wait_set.is_ready(g.guard_pointer), guards):

Just curious what others think...?

gc._executor_triggered = True

# Process ready entities one node at a time
for node in nodes:
for tmr in [t for t in timers if wait_set.is_ready(t.timer_pointer)]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this list comprehension only consider the timers of the current node?

Same for subscriptions, clients and services below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed by 6352e86

# Check that a timer is ready to workaround rcl issue with cancelled timers
if _rclpy.rclpy_is_timer_ready(tmr.timer_handle):
if tmr == timeout_timer:
continue
elif tmr.callback_group.can_execute(tmr):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: this doesn't need to be an elif since the if block calls continue. if would be sufficient.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is alright with you I would like to leave this for now since it gets fixed in the next PR #135

https://github.com/ros2/rclpy/pull/135/files#diff-521ddf9ec814122daa49f03bb2974340R245

handler = self._make_handler(
tmr, self._take_timer, self._execute_timer)
yielded_work = True
yield handler, tmr, node

for sub in [s for s in subscriptions if wait_set.is_ready(
s.subscription_pointer)]:
if sub.callback_group.can_execute(sub):
handler = self._make_handler(
sub, self._take_subscription, self._execute_subscription)
yielded_work = True
yield handler, sub, node

# Check sigint guard condition
if sigint_gc_handle in guards_ready:
raise KeyboardInterrupt
_rclpy.rclpy_destroy_entity(sigint_gc)
for gc in [g for g in node.guards if g._executor_triggered]:
if gc.callback_group.can_execute(gc):
handler = self._make_handler(
gc, self._take_guard_condition, self._execute_guard_condition)
yielded_work = True
yield handler, gc, node

# Mark all guards as triggered before yielding any handlers since they're auto-taken
for gc in [g for g in guards if g.guard_pointer in guards_ready]:
gc._executor_triggered = True
for client in [c for c in clients if wait_set.is_ready(c.client_pointer)]:
if client.callback_group.can_execute(client):
handler = self._make_handler(
client, self._take_client, self._execute_client)
yielded_work = True
yield handler, client, node

# Process ready entities one node at a time
for node in nodes:
for tmr in [t for t in node.timers if t.timer_pointer in timers_ready]:
# Check that a timer is ready to workaround rcl issue with cancelled timers
if _rclpy.rclpy_is_timer_ready(tmr.timer_handle):
if tmr.callback_group.can_execute(tmr):
for srv in [s for s in services if wait_set.is_ready(s.service_pointer)]:
if srv.callback_group.can_execute(srv):
handler = self._make_handler(
tmr, self._take_timer, self._execute_timer)
srv, self._take_service, self._execute_service)
yielded_work = True
yield handler, tmr, node

for sub in [s for s in node.subscriptions if s.subscription_pointer in subs_ready]:
if sub.callback_group.can_execute(sub):
handler = self._make_handler(
sub, self._take_subscription, self._execute_subscription)
yielded_work = True
yield handler, sub, node

for gc in [g for g in node.guards if g._executor_triggered]:
if gc.callback_group.can_execute(gc):
handler = self._make_handler(
gc, self._take_guard_condition, self._execute_guard_condition)
yielded_work = True
yield handler, gc, node

for client in [c for c in node.clients if c.client_pointer in clients_ready]:
if client.callback_group.can_execute(client):
handler = self._make_handler(
client, self._take_client, self._execute_client)
yielded_work = True
yield handler, client, node

for srv in [s for s in node.services if s.service_pointer in services_ready]:
if srv.callback_group.can_execute(srv):
handler = self._make_handler(
srv, self._take_service, self._execute_service)
yielded_work = True
yield handler, srv, node

# Check timeout timer
if (timeout_nsec == 0 or
(timeout_timer is not None and timeout_timer.timer_pointer in timers_ready)):
break
yield handler, srv, node

# Check timeout timer
if (
timeout_nsec == 0 or
(timeout_timer is not None and wait_set.is_ready(timeout_timer.timer_pointer))
):
break


class SingleThreadedExecutor(Executor):
Expand Down Expand Up @@ -426,7 +393,7 @@ def __init__(self, num_threads=None):
num_threads = multiprocessing.cpu_count()
except NotImplementedError:
num_threads = 1
self._executor = _ThreadPoolExecutor(num_threads)
self._executor = ThreadPoolExecutor(num_threads)

def spin_once(self, timeout_sec=None):
try:
Expand Down
Loading