Skip to content

Commit 86411bc

Browse files
committed
Fixes Action.*_async futures never complete (#1308)
Per rclpy:1123 If two seperate client server actions are running in seperate executors the future given to the ActionClient will never complete due to a race condition This fixes the calls to rcl handles potentially leading to deadlock scenarios by adding locks to there references Co-authored-by: Aditya Agarwal <[email protected]> Co-authored-by: Jonathan Blixt <[email protected]> Signed-off-by: Jonathan <[email protected]>
1 parent ec40fc6 commit 86411bc

File tree

2 files changed

+81
-56
lines changed

2 files changed

+81
-56
lines changed

rclpy/rclpy/action/client.py

Lines changed: 69 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,8 @@ def __init__(
182182
self._node.add_waitable(self)
183183
self._logger = self._node.get_logger().get_child('action_client')
184184

185+
self._lock = threading.Lock()
186+
185187
def _generate_random_uuid(self):
186188
return UUID(uuid=list(uuid.uuid4().bytes))
187189

@@ -241,39 +243,44 @@ def take_data(self):
241243
"""Take stuff from lower level so the wait set doesn't immediately wake again."""
242244
data = {}
243245
if self._is_goal_response_ready:
244-
taken_data = self._client_handle.take_goal_response(
245-
self._action_type.Impl.SendGoalService.Response)
246-
# If take fails, then we get (None, None)
247-
if all(taken_data):
248-
data['goal'] = taken_data
246+
with self._lock:
247+
taken_data = self._client_handle.take_goal_response(
248+
self._action_type.Impl.SendGoalService.Response)
249+
# If take fails, then we get (None, None)
250+
if all(taken_data):
251+
data['goal'] = taken_data
249252

250253
if self._is_cancel_response_ready:
251-
taken_data = self._client_handle.take_cancel_response(
252-
self._action_type.Impl.CancelGoalService.Response)
253-
# If take fails, then we get (None, None)
254-
if all(taken_data):
255-
data['cancel'] = taken_data
254+
with self._lock:
255+
taken_data = self._client_handle.take_cancel_response(
256+
self._action_type.Impl.CancelGoalService.Response)
257+
# If take fails, then we get (None, None)
258+
if all(taken_data):
259+
data['cancel'] = taken_data
256260

257261
if self._is_result_response_ready:
258-
taken_data = self._client_handle.take_result_response(
259-
self._action_type.Impl.GetResultService.Response)
260-
# If take fails, then we get (None, None)
261-
if all(taken_data):
262-
data['result'] = taken_data
262+
with self._lock:
263+
taken_data = self._client_handle.take_result_response(
264+
self._action_type.Impl.GetResultService.Response)
265+
# If take fails, then we get (None, None)
266+
if all(taken_data):
267+
data['result'] = taken_data
263268

264269
if self._is_feedback_ready:
265-
taken_data = self._client_handle.take_feedback(
266-
self._action_type.Impl.FeedbackMessage)
267-
# If take fails, then we get None
268-
if taken_data is not None:
269-
data['feedback'] = taken_data
270+
with self._lock:
271+
taken_data = self._client_handle.take_feedback(
272+
self._action_type.Impl.FeedbackMessage)
273+
# If take fails, then we get None
274+
if taken_data is not None:
275+
data['feedback'] = taken_data
270276

271277
if self._is_status_ready:
272-
taken_data = self._client_handle.take_status(
273-
self._action_type.Impl.GoalStatusMessage)
274-
# If take fails, then we get None
275-
if taken_data is not None:
276-
data['status'] = taken_data
278+
with self._lock:
279+
taken_data = self._client_handle.take_status(
280+
self._action_type.Impl.GoalStatusMessage)
281+
# If take fails, then we get None
282+
if taken_data is not None:
283+
data['status'] = taken_data
277284

278285
return data
279286

@@ -354,12 +361,14 @@ async def execute(self, taken_data):
354361

355362
def get_num_entities(self):
356363
"""Return number of each type of entity used in the wait set."""
357-
num_entities = self._client_handle.get_num_entities()
364+
with self._lock:
365+
num_entities = self._client_handle.get_num_entities()
358366
return NumberOfEntities(*num_entities)
359367

360368
def add_to_wait_set(self, wait_set):
361369
"""Add entities to wait set."""
362-
self._client_handle.add_to_waitset(wait_set)
370+
with self._lock:
371+
self._client_handle.add_to_waitset(wait_set)
363372

364373
def __enter__(self):
365374
return self._client_handle.__enter__()
@@ -437,10 +446,17 @@ def send_goal_async(self, goal, feedback_callback=None, goal_uuid=None):
437446
request = self._action_type.Impl.SendGoalService.Request()
438447
request.goal_id = self._generate_random_uuid() if goal_uuid is None else goal_uuid
439448
request.goal = goal
440-
sequence_number = self._client_handle.send_goal_request(request)
441-
if sequence_number in self._pending_goal_requests:
442-
raise RuntimeError(
443-
'Sequence ({}) conflicts with pending goal request'.format(sequence_number))
449+
future = Future()
450+
with self._lock:
451+
sequence_number = self._client_handle.send_goal_request(request)
452+
if sequence_number in self._pending_goal_requests:
453+
raise RuntimeError(
454+
'Sequence ({}) conflicts with pending goal request'.format(sequence_number))
455+
self._pending_goal_requests[sequence_number] = future
456+
self._goal_sequence_number_to_goal_id[sequence_number] = request.goal_id
457+
future.add_done_callback(self._remove_pending_goal_request)
458+
# Add future so executor is aware
459+
self.add_future(future)
444460

445461
if feedback_callback is not None:
446462
# TODO(jacobperron): Move conversion function to a general-use package
@@ -495,16 +511,17 @@ def _cancel_goal_async(self, goal_handle):
495511

496512
cancel_request = CancelGoal.Request()
497513
cancel_request.goal_info.goal_id = goal_handle.goal_id
498-
sequence_number = self._client_handle.send_cancel_request(cancel_request)
499-
if sequence_number in self._pending_cancel_requests:
500-
raise RuntimeError(
501-
'Sequence ({}) conflicts with pending cancel request'.format(sequence_number))
514+
future: Future[CancelGoal.Response] = Future()
515+
with self._lock:
516+
sequence_number = self._client_handle.send_cancel_request(cancel_request)
517+
if sequence_number in self._pending_cancel_requests:
518+
raise RuntimeError(
519+
'Sequence ({}) conflicts with pending cancel request'.format(sequence_number))
502520

503-
future = Future()
504-
self._pending_cancel_requests[sequence_number] = future
505-
future.add_done_callback(self._remove_pending_cancel_request)
506-
# Add future so executor is aware
507-
self.add_future(future)
521+
self._pending_cancel_requests[sequence_number] = future
522+
future.add_done_callback(self._remove_pending_cancel_request)
523+
# Add future so executor is aware
524+
self.add_future(future)
508525

509526
return future
510527

@@ -547,17 +564,18 @@ def _get_result_async(self, goal_handle):
547564

548565
result_request = self._action_type.Impl.GetResultService.Request()
549566
result_request.goal_id = goal_handle.goal_id
550-
sequence_number = self._client_handle.send_result_request(result_request)
551-
if sequence_number in self._pending_result_requests:
552-
raise RuntimeError(
553-
'Sequence ({}) conflicts with pending result request'.format(sequence_number))
554-
555-
future = Future()
556-
self._pending_result_requests[sequence_number] = future
557-
self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id
558-
future.add_done_callback(self._remove_pending_result_request)
559-
# Add future so executor is aware
560-
self.add_future(future)
567+
future: Future[GetResultServiceResponse[ResultT]] = Future()
568+
with self._lock:
569+
sequence_number = self._client_handle.send_result_request(result_request)
570+
if sequence_number in self._pending_result_requests:
571+
raise RuntimeError(
572+
'Sequence ({}) conflicts with pending result request'.format(sequence_number))
573+
574+
self._pending_result_requests[sequence_number] = future
575+
self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id
576+
future.add_done_callback(self._remove_pending_result_request)
577+
# Add future so executor is aware
578+
self.add_future(future)
561579

562580
return future
563581

rclpy/rclpy/action/server.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,8 @@ async def _execute_goal_request(self, request_header_and_message):
310310
try:
311311
# If the client goes away anytime before this, sending the goal response may fail.
312312
# Catch the exception here and go on so we don't crash.
313-
self._handle.send_goal_response(request_header, response_msg)
313+
with self._lock:
314+
self._handle.send_goal_response(request_header, response_msg)
314315
except RCLError:
315316
self._logger.warn('Failed to send goal response (the client may have gone away)')
316317
return
@@ -393,7 +394,8 @@ async def _execute_cancel_request(self, request_header_and_message):
393394
try:
394395
# If the client goes away anytime before this, sending the goal response may fail.
395396
# Catch the exception here and go on so we don't crash.
396-
self._handle.send_cancel_response(request_header, cancel_response)
397+
with self._lock:
398+
self._handle.send_cancel_response(request_header, cancel_response)
397399
except RCLError:
398400
self._logger.warn('Failed to send cancel response (the client may have gone away)')
399401

@@ -410,7 +412,8 @@ async def _execute_get_result_request(self, request_header_and_message):
410412
'Sending result response for unknown goal ID: {0}'.format(goal_uuid))
411413
result_response = self._action_type.Impl.GetResultService.Response()
412414
result_response.status = GoalStatus.STATUS_UNKNOWN
413-
self._handle.send_result_response(request_header, result_response)
415+
with self._lock:
416+
self._handle.send_result_response(request_header, result_response)
414417
return
415418

416419
# There is an accepted goal matching the goal ID, register a callback to send the
@@ -430,7 +433,10 @@ def _send_result_response(self, request_header, future):
430433
try:
431434
# If the client goes away anytime before this, sending the result response may fail.
432435
# Catch the exception here and go on so we don't crash.
433-
self._handle.send_result_response(request_header, future.result())
436+
result = future.result()
437+
if result:
438+
with self._lock:
439+
self._handle.send_result_response(request_header, result)
434440
except RCLError:
435441
self._logger.warn('Failed to send result response (the client may have gone away)')
436442

@@ -506,7 +512,8 @@ async def execute(self, taken_data):
506512

507513
def get_num_entities(self):
508514
"""Return number of each type of entity used in the wait set."""
509-
num_entities = self._handle.get_num_entities()
515+
with self._lock:
516+
num_entities = self._handle.get_num_entities()
510517
return NumberOfEntities(
511518
num_entities[0],
512519
num_entities[1],

0 commit comments

Comments
 (0)