Skip to content

Commit 4c167a6

Browse files
committed
Fix Action.*_async futures never completing (#1308)
Per rclpy:1123, if two separate client/server actions are running in separate executors, the future given to the ActionClient will never complete due to a race condition. This fixes the calls to rcl handles, which could potentially lead to deadlock scenarios, by adding locks around their references. Co-authored-by: Aditya Agarwal <[email protected]> Co-authored-by: Jonathan Blixt <[email protected]> Signed-off-by: Jonathan <[email protected]>
1 parent 8190ea0 commit 4c167a6

File tree

2 files changed

+80
-51
lines changed

2 files changed

+80
-51
lines changed

rclpy/rclpy/action/client.py

Lines changed: 68 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ def __init__(
181181
callback_group.add_entity(self)
182182
self._node.add_waitable(self)
183183

184+
self._lock = threading.Lock()
185+
184186
def _generate_random_uuid(self):
185187
return UUID(uuid=list(uuid.uuid4().bytes))
186188

@@ -240,39 +242,44 @@ def take_data(self):
240242
"""Take stuff from lower level so the wait set doesn't immediately wake again."""
241243
data = {}
242244
if self._is_goal_response_ready:
243-
taken_data = self._client_handle.take_goal_response(
244-
self._action_type.Impl.SendGoalService.Response)
245-
# If take fails, then we get (None, None)
246-
if all(taken_data):
247-
data['goal'] = taken_data
245+
with self._lock:
246+
taken_data = self._client_handle.take_goal_response(
247+
self._action_type.Impl.SendGoalService.Response)
248+
# If take fails, then we get (None, None)
249+
if all(taken_data):
250+
data['goal'] = taken_data
248251

249252
if self._is_cancel_response_ready:
250-
taken_data = self._client_handle.take_cancel_response(
251-
self._action_type.Impl.CancelGoalService.Response)
252-
# If take fails, then we get (None, None)
253-
if all(taken_data):
254-
data['cancel'] = taken_data
253+
with self._lock:
254+
taken_data = self._client_handle.take_cancel_response(
255+
self._action_type.Impl.CancelGoalService.Response)
256+
# If take fails, then we get (None, None)
257+
if all(taken_data):
258+
data['cancel'] = taken_data
255259

256260
if self._is_result_response_ready:
257-
taken_data = self._client_handle.take_result_response(
258-
self._action_type.Impl.GetResultService.Response)
259-
# If take fails, then we get (None, None)
260-
if all(taken_data):
261-
data['result'] = taken_data
261+
with self._lock:
262+
taken_data = self._client_handle.take_result_response(
263+
self._action_type.Impl.GetResultService.Response)
264+
# If take fails, then we get (None, None)
265+
if all(taken_data):
266+
data['result'] = taken_data
262267

263268
if self._is_feedback_ready:
264-
taken_data = self._client_handle.take_feedback(
265-
self._action_type.Impl.FeedbackMessage)
266-
# If take fails, then we get None
267-
if taken_data is not None:
268-
data['feedback'] = taken_data
269+
with self._lock:
270+
taken_data = self._client_handle.take_feedback(
271+
self._action_type.Impl.FeedbackMessage)
272+
# If take fails, then we get None
273+
if taken_data is not None:
274+
data['feedback'] = taken_data
269275

270276
if self._is_status_ready:
271-
taken_data = self._client_handle.take_status(
272-
self._action_type.Impl.GoalStatusMessage)
273-
# If take fails, then we get None
274-
if taken_data is not None:
275-
data['status'] = taken_data
277+
with self._lock:
278+
taken_data = self._client_handle.take_status(
279+
self._action_type.Impl.GoalStatusMessage)
280+
# If take fails, then we get None
281+
if taken_data is not None:
282+
data['status'] = taken_data
276283

277284
return data
278285

@@ -353,12 +360,14 @@ async def execute(self, taken_data):
353360

354361
def get_num_entities(self):
355362
"""Return number of each type of entity used in the wait set."""
356-
num_entities = self._client_handle.get_num_entities()
363+
with self._lock:
364+
num_entities = self._client_handle.get_num_entities()
357365
return NumberOfEntities(*num_entities)
358366

359367
def add_to_wait_set(self, wait_set):
360368
"""Add entities to wait set."""
361-
self._client_handle.add_to_waitset(wait_set)
369+
with self._lock:
370+
self._client_handle.add_to_waitset(wait_set)
362371

363372
def __enter__(self):
364373
return self._client_handle.__enter__()
@@ -436,10 +445,17 @@ def send_goal_async(self, goal, feedback_callback=None, goal_uuid=None):
436445
request = self._action_type.Impl.SendGoalService.Request()
437446
request.goal_id = self._generate_random_uuid() if goal_uuid is None else goal_uuid
438447
request.goal = goal
439-
sequence_number = self._client_handle.send_goal_request(request)
440-
if sequence_number in self._pending_goal_requests:
441-
raise RuntimeError(
442-
'Sequence ({}) conflicts with pending goal request'.format(sequence_number))
448+
future = Future()
449+
with self._lock:
450+
sequence_number = self._client_handle.send_goal_request(request)
451+
if sequence_number in self._pending_goal_requests:
452+
raise RuntimeError(
453+
'Sequence ({}) conflicts with pending goal request'.format(sequence_number))
454+
self._pending_goal_requests[sequence_number] = future
455+
self._goal_sequence_number_to_goal_id[sequence_number] = request.goal_id
456+
future.add_done_callback(self._remove_pending_goal_request)
457+
# Add future so executor is aware
458+
self.add_future(future)
443459

444460
if feedback_callback is not None:
445461
# TODO(jacobperron): Move conversion function to a general-use package
@@ -494,12 +510,17 @@ def _cancel_goal_async(self, goal_handle):
494510

495511
cancel_request = CancelGoal.Request()
496512
cancel_request.goal_info.goal_id = goal_handle.goal_id
497-
sequence_number = self._client_handle.send_cancel_request(cancel_request)
498-
if sequence_number in self._pending_cancel_requests:
499-
raise RuntimeError(
500-
'Sequence ({}) conflicts with pending cancel request'.format(sequence_number))
513+
future: Future[CancelGoal.Response] = Future()
514+
with self._lock:
515+
sequence_number = self._client_handle.send_cancel_request(cancel_request)
516+
if sequence_number in self._pending_cancel_requests:
517+
raise RuntimeError(
518+
'Sequence ({}) conflicts with pending cancel request'.format(sequence_number))
501519

502-
future = Future()
520+
self._pending_cancel_requests[sequence_number] = future
521+
future.add_done_callback(self._remove_pending_cancel_request)
522+
# Add future so executor is aware
523+
self.add_future(future)
503524
self._pending_cancel_requests[sequence_number] = future
504525
future.add_done_callback(self._remove_pending_cancel_request)
505526
# Add future so executor is aware
@@ -546,17 +567,18 @@ def _get_result_async(self, goal_handle):
546567

547568
result_request = self._action_type.Impl.GetResultService.Request()
548569
result_request.goal_id = goal_handle.goal_id
549-
sequence_number = self._client_handle.send_result_request(result_request)
550-
if sequence_number in self._pending_result_requests:
551-
raise RuntimeError(
552-
'Sequence ({}) conflicts with pending result request'.format(sequence_number))
553-
554570
future = Future()
555-
self._pending_result_requests[sequence_number] = future
556-
self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id
557-
future.add_done_callback(self._remove_pending_result_request)
558-
# Add future so executor is aware
559-
self.add_future(future)
571+
with self._lock:
572+
sequence_number = self._client_handle.send_result_request(result_request)
573+
if sequence_number in self._pending_result_requests:
574+
raise RuntimeError(
575+
'Sequence ({}) conflicts with pending result request'.format(sequence_number))
576+
577+
self._pending_result_requests[sequence_number] = future
578+
self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id
579+
future.add_done_callback(self._remove_pending_result_request)
580+
# Add future so executor is aware
581+
self.add_future(future)
560582

561583
return future
562584

rclpy/rclpy/action/server.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,8 @@ async def _execute_goal_request(self, request_header_and_message):
309309
try:
310310
# If the client goes away anytime before this, sending the goal response may fail.
311311
# Catch the exception here and go on so we don't crash.
312-
self._handle.send_goal_response(request_header, response_msg)
312+
with self._lock:
313+
self._handle.send_goal_response(request_header, response_msg)
313314
except RCLError:
314315
self._node.get_logger().warn(
315316
'Failed to send goal response (the client may have gone away)')
@@ -393,7 +394,8 @@ async def _execute_cancel_request(self, request_header_and_message):
393394
try:
394395
# If the client goes away anytime before this, sending the goal response may fail.
395396
# Catch the exception here and go on so we don't crash.
396-
self._handle.send_cancel_response(request_header, cancel_response)
397+
with self._lock:
398+
self._handle.send_cancel_response(request_header, cancel_response)
397399
except RCLError:
398400
self._node.get_logger().warn(
399401
'Failed to send cancel response (the client may have gone away)')
@@ -411,7 +413,8 @@ async def _execute_get_result_request(self, request_header_and_message):
411413
'Sending result response for unknown goal ID: {0}'.format(goal_uuid))
412414
result_response = self._action_type.Impl.GetResultService.Response()
413415
result_response.status = GoalStatus.STATUS_UNKNOWN
414-
self._handle.send_result_response(request_header, result_response)
416+
with self._lock:
417+
self._handle.send_result_response(request_header, result_response)
415418
return
416419

417420
# There is an accepted goal matching the goal ID, register a callback to send the
@@ -431,7 +434,10 @@ def _send_result_response(self, request_header, future):
431434
try:
432435
# If the client goes away anytime before this, sending the result response may fail.
433436
# Catch the exception here and go on so we don't crash.
434-
self._handle.send_result_response(request_header, future.result())
437+
result = future.result()
438+
if result:
439+
with self._lock:
440+
self._handle.send_result_response(request_header, result)
435441
except RCLError:
436442
self._node.get_logger().warn(
437443
'Failed to send result response (the client may have gone away)')
@@ -508,7 +514,8 @@ async def execute(self, taken_data):
508514

509515
def get_num_entities(self):
510516
"""Return number of each type of entity used in the wait set."""
511-
num_entities = self._handle.get_num_entities()
517+
with self._lock:
518+
num_entities = self._handle.get_num_entities()
512519
return NumberOfEntities(
513520
num_entities[0],
514521
num_entities[1],

0 commit comments

Comments
 (0)