Skip to content

Commit 3c6f13d

Browse files
jmblixt3ahcorde
andcommitted
Fixes Action.*_async futures never complete (ros2#1308)
Per rclpy:1123 If two seperate client server actions are running in seperate executors the future given to the ActionClient will never complete due to a race condition This fixes the calls to rcl handles potentially leading to deadlock scenarios by adding locks to there references Co-authored-by: Aditya Agarwal <[email protected]> Co-authored-by: Jonathan Blixt <[email protected]> Signed-off-by: Jonathan Blixt <[email protected]> Co-authored-by: Alejandro Hernandez Cordero <[email protected]>
1 parent f14a4ab commit 3c6f13d

File tree

2 files changed

+79
-56
lines changed

2 files changed

+79
-56
lines changed

rclpy/rclpy/action/client.py

Lines changed: 69 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,9 @@ def __init__(
243243
self._node.add_waitable(self)
244244
self._logger = self._node.get_logger().get_child('action_client')
245245

246-
def _generate_random_uuid(self) -> UUID:
246+
self._lock = threading.Lock()
247+
248+
def _generate_random_uuid(self):
247249
return UUID(uuid=list(uuid.uuid4().bytes))
248250

249251
def _remove_pending_request(self, future: Future[T], pending_requests: Dict[int, Future[T]]
@@ -306,39 +308,44 @@ def take_data(self) -> 'ClientGoalHandleDict[ResultT, FeedbackT]':
306308
"""Take stuff from lower level so the wait set doesn't immediately wake again."""
307309
data: 'ClientGoalHandleDict[ResultT, FeedbackT]' = {}
308310
if self._is_goal_response_ready:
309-
taken_goal_data = self._client_handle.take_goal_response(
310-
self._action_type.Impl.SendGoalService.Response)
311-
# If take fails, then we get (None, None)
312-
if taken_goal_data[0]:
313-
data['goal'] = taken_goal_data
311+
with self._lock:
312+
taken_goal_data = self._client_handle.take_goal_response(
313+
self._action_type.Impl.SendGoalService.Response)
314+
# If take fails, then we get (None, None)
315+
if taken_goal_data[0]:
316+
data['goal'] = taken_goal_data
314317

315318
if self._is_cancel_response_ready:
316-
taken_cancel_data = self._client_handle.take_cancel_response(
317-
self._action_type.Impl.CancelGoalService.Response)
318-
# If take fails, then we get (None, None)
319-
if taken_cancel_data[0]:
320-
data['cancel'] = taken_cancel_data
319+
with self._lock:
320+
taken_cancel_data = self._client_handle.take_cancel_response(
321+
self._action_type.Impl.CancelGoalService.Response)
322+
# If take fails, then we get (None, None)
323+
if taken_cancel_data[0]:
324+
data['cancel'] = taken_cancel_data
321325

322326
if self._is_result_response_ready:
323-
taken_result_data = self._client_handle.take_result_response(
324-
self._action_type.Impl.GetResultService.Response)
325-
# If take fails, then we get (None, None)
326-
if taken_result_data[0]:
327-
data['result'] = taken_result_data
327+
with self._lock:
328+
taken_result_data = self._client_handle.take_result_response(
329+
self._action_type.Impl.GetResultService.Response)
330+
# If take fails, then we get (None, None)
331+
if taken_result_data[0]:
332+
data['result'] = taken_result_data
328333

329334
if self._is_feedback_ready:
330-
taken_feedback_data = self._client_handle.take_feedback(
331-
self._action_type.Impl.FeedbackMessage)
332-
# If take fails, then we get None
333-
if taken_feedback_data is not None:
334-
data['feedback'] = taken_feedback_data
335+
with self._lock:
336+
taken_feedback_data = self._client_handle.take_feedback(
337+
self._action_type.Impl.FeedbackMessage)
338+
# If take fails, then we get None
339+
if taken_feedback_data is not None:
340+
data['feedback'] = taken_feedback_data
335341

336342
if self._is_status_ready:
337-
taken_status_data = self._client_handle.take_status(
338-
self._action_type.Impl.GoalStatusMessage)
339-
# If take fails, then we get None
340-
if taken_status_data is not None:
341-
data['status'] = taken_status_data
343+
with self._lock:
344+
taken_status_data = self._client_handle.take_status(
345+
self._action_type.Impl.GoalStatusMessage)
346+
# If take fails, then we get None
347+
if taken_status_data is not None:
348+
data['status'] = taken_status_data
342349

343350
return data
344351

@@ -419,12 +426,14 @@ async def execute(self, taken_data: 'ClientGoalHandleDict[ResultT, FeedbackT]')
419426

420427
def get_num_entities(self) -> NumberOfEntities:
421428
"""Return number of each type of entity used in the wait set."""
422-
num_entities = self._client_handle.get_num_entities()
429+
with self._lock:
430+
num_entities = self._client_handle.get_num_entities()
423431
return NumberOfEntities(*num_entities)
424432

425433
def add_to_wait_set(self, wait_set: _rclpy.WaitSet) -> None:
426434
"""Add entities to wait set."""
427-
self._client_handle.add_to_waitset(wait_set)
435+
with self._lock:
436+
self._client_handle.add_to_waitset(wait_set)
428437

429438
def __enter__(self) -> None:
430439
self._client_handle.__enter__()
@@ -515,10 +524,17 @@ def send_goal_async(
515524
request = self._action_type.Impl.SendGoalService.Request()
516525
request.goal_id = self._generate_random_uuid() if goal_uuid is None else goal_uuid
517526
request.goal = goal
518-
sequence_number = self._client_handle.send_goal_request(request)
519-
if sequence_number in self._pending_goal_requests:
520-
raise RuntimeError(
521-
'Sequence ({}) conflicts with pending goal request'.format(sequence_number))
527+
future = Future()
528+
with self._lock:
529+
sequence_number = self._client_handle.send_goal_request(request)
530+
if sequence_number in self._pending_goal_requests:
531+
raise RuntimeError(
532+
'Sequence ({}) conflicts with pending goal request'.format(sequence_number))
533+
self._pending_goal_requests[sequence_number] = future
534+
self._goal_sequence_number_to_goal_id[sequence_number] = request.goal_id
535+
future.add_done_callback(self._remove_pending_goal_request)
536+
# Add future so executor is aware
537+
self.add_future(future)
522538

523539
if feedback_callback is not None:
524540
# TODO(jacobperron): Move conversion function to a general-use package
@@ -578,16 +594,17 @@ def _cancel_goal_async(
578594

579595
cancel_request = CancelGoal.Request()
580596
cancel_request.goal_info.goal_id = goal_handle.goal_id
581-
sequence_number = self._client_handle.send_cancel_request(cancel_request)
582-
if sequence_number in self._pending_cancel_requests:
583-
raise RuntimeError(
584-
'Sequence ({}) conflicts with pending cancel request'.format(sequence_number))
585-
586597
future: Future[CancelGoal.Response] = Future()
587-
self._pending_cancel_requests[sequence_number] = future
588-
future.add_done_callback(self._remove_pending_cancel_request)
589-
# Add future so executor is aware
590-
self.add_future(future)
598+
with self._lock:
599+
sequence_number = self._client_handle.send_cancel_request(cancel_request)
600+
if sequence_number in self._pending_cancel_requests:
601+
raise RuntimeError(
602+
'Sequence ({}) conflicts with pending cancel request'.format(sequence_number))
603+
604+
self._pending_cancel_requests[sequence_number] = future
605+
future.add_done_callback(self._remove_pending_cancel_request)
606+
# Add future so executor is aware
607+
self.add_future(future)
591608

592609
return future
593610

@@ -633,17 +650,18 @@ def _get_result_async(self, goal_handle: ClientGoalHandle[GoalT, ResultT, Feedba
633650

634651
result_request = self._action_type.Impl.GetResultService.Request()
635652
result_request.goal_id = goal_handle.goal_id
636-
sequence_number = self._client_handle.send_result_request(result_request)
637-
if sequence_number in self._pending_result_requests:
638-
raise RuntimeError(
639-
'Sequence ({}) conflicts with pending result request'.format(sequence_number))
640-
641653
future: Future[GetResultServiceResponse[ResultT]] = Future()
642-
self._pending_result_requests[sequence_number] = future
643-
self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id
644-
future.add_done_callback(self._remove_pending_result_request)
645-
# Add future so executor is aware
646-
self.add_future(future)
654+
with self._lock:
655+
sequence_number = self._client_handle.send_result_request(result_request)
656+
if sequence_number in self._pending_result_requests:
657+
raise RuntimeError(
658+
'Sequence ({}) conflicts with pending result request'.format(sequence_number))
659+
660+
self._pending_result_requests[sequence_number] = future
661+
self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id
662+
future.add_done_callback(self._remove_pending_result_request)
663+
# Add future so executor is aware
664+
self.add_future(future)
647665

648666
return future
649667

rclpy/rclpy/action/server.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,8 @@ async def _execute_goal_request(
365365
try:
366366
# If the client goes away anytime before this, sending the goal response may fail.
367367
# Catch the exception here and go on so we don't crash.
368-
self._handle.send_goal_response(request_header, response_msg)
368+
with self._lock:
369+
self._handle.send_goal_response(request_header, response_msg)
369370
except RCLError:
370371
self._logger.warn('Failed to send goal response (the client may have gone away)')
371372
return
@@ -455,7 +456,8 @@ async def _execute_cancel_request(
455456
try:
456457
# If the client goes away anytime before this, sending the goal response may fail.
457458
# Catch the exception here and go on so we don't crash.
458-
self._handle.send_cancel_response(request_header, cancel_response)
459+
with self._lock:
460+
self._handle.send_cancel_response(request_header, cancel_response)
459461
except RCLError:
460462
self._logger.warn('Failed to send cancel response (the client may have gone away)')
461463

@@ -476,7 +478,8 @@ async def _execute_get_result_request(
476478
'Sending result response for unknown or expired goal ID: {0}'.format(goal_uuid))
477479
result_response = self._action_type.Impl.GetResultService.Response()
478480
result_response.status = GoalStatus.STATUS_UNKNOWN
479-
self._handle.send_result_response(request_header, result_response)
481+
with self._lock:
482+
self._handle.send_result_response(request_header, result_response)
480483
return
481484

482485
# There is an accepted goal matching the goal ID, register a callback to send the
@@ -502,7 +505,8 @@ def _send_result_response(
502505
# Catch the exception here and go on so we don't crash.
503506
result = future.result()
504507
if result:
505-
self._handle.send_result_response(request_header, result)
508+
with self._lock:
509+
self._handle.send_result_response(request_header, result)
506510
except RCLError:
507511
self._logger.warning('Failed to send result response (the client may have gone away)')
508512

@@ -578,7 +582,8 @@ async def execute(self, taken_data: 'ServerGoalHandleDict[GoalT]') -> None:
578582

579583
def get_num_entities(self) -> NumberOfEntities:
580584
"""Return number of each type of entity used in the wait set."""
581-
num_entities = self._handle.get_num_entities()
585+
with self._lock:
586+
num_entities = self._handle.get_num_entities()
582587
return NumberOfEntities(
583588
num_entities[0],
584589
num_entities[1],

0 commit comments

Comments
 (0)