Skip to content

Commit 97defea

Browse files
jmblixt3mergify[bot]
authored andcommitted
Fixes Action.*_async futures never complete (#1308)
Per rclpy:1123 If two seperate client server actions are running in seperate executors the future given to the ActionClient will never complete due to a race condition This fixes the calls to rcl handles potentially leading to deadlock scenarios by adding locks to there references Co-authored-by: Aditya Agarwal <[email protected]> Co-authored-by: Jonathan Blixt <[email protected]> Signed-off-by: Jonathan Blixt <[email protected]> Co-authored-by: Alejandro Hernandez Cordero <[email protected]> (cherry picked from commit 10f70c9) # Conflicts: # rclpy/rclpy/action/client.py # rclpy/rclpy/action/server.py
1 parent ec40fc6 commit 97defea

File tree

2 files changed

+105
-10
lines changed

2 files changed

+105
-10
lines changed

rclpy/rclpy/action/client.py

Lines changed: 90 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,11 @@ def __init__(
182182
self._node.add_waitable(self)
183183
self._logger = self._node.get_logger().get_child('action_client')
184184

185+
<<<<<<< HEAD
186+
=======
187+
self._lock = threading.Lock()
188+
189+
>>>>>>> 10f70c9 (Fixes Action.*_async futures never complete (#1308))
185190
def _generate_random_uuid(self):
186191
return UUID(uuid=list(uuid.uuid4().bytes))
187192

@@ -241,6 +246,7 @@ def take_data(self):
241246
"""Take stuff from lower level so the wait set doesn't immediately wake again."""
242247
data = {}
243248
if self._is_goal_response_ready:
249+
<<<<<<< HEAD
244250
taken_data = self._client_handle.take_goal_response(
245251
self._action_type.Impl.SendGoalService.Response)
246252
# If take fails, then we get (None, None)
@@ -274,6 +280,46 @@ def take_data(self):
274280
# If take fails, then we get None
275281
if taken_data is not None:
276282
data['status'] = taken_data
283+
=======
284+
with self._lock:
285+
taken_goal_data = self._client_handle.take_goal_response(
286+
self._action_type.Impl.SendGoalService.Response)
287+
# If take fails, then we get (None, None)
288+
if taken_goal_data[0]:
289+
data['goal'] = taken_goal_data
290+
291+
if self._is_cancel_response_ready:
292+
with self._lock:
293+
taken_cancel_data = self._client_handle.take_cancel_response(
294+
self._action_type.Impl.CancelGoalService.Response)
295+
# If take fails, then we get (None, None)
296+
if taken_cancel_data[0]:
297+
data['cancel'] = taken_cancel_data
298+
299+
if self._is_result_response_ready:
300+
with self._lock:
301+
taken_result_data = self._client_handle.take_result_response(
302+
self._action_type.Impl.GetResultService.Response)
303+
# If take fails, then we get (None, None)
304+
if taken_result_data[0]:
305+
data['result'] = taken_result_data
306+
307+
if self._is_feedback_ready:
308+
with self._lock:
309+
taken_feedback_data = self._client_handle.take_feedback(
310+
self._action_type.Impl.FeedbackMessage)
311+
# If take fails, then we get None
312+
if taken_feedback_data is not None:
313+
data['feedback'] = taken_feedback_data
314+
315+
if self._is_status_ready:
316+
with self._lock:
317+
taken_status_data = self._client_handle.take_status(
318+
self._action_type.Impl.GoalStatusMessage)
319+
# If take fails, then we get None
320+
if taken_status_data is not None:
321+
data['status'] = taken_status_data
322+
>>>>>>> 10f70c9 (Fixes Action.*_async futures never complete (#1308))
277323

278324
return data
279325

@@ -354,12 +400,14 @@ async def execute(self, taken_data):
354400

355401
def get_num_entities(self):
356402
"""Return number of each type of entity used in the wait set."""
357-
num_entities = self._client_handle.get_num_entities()
403+
with self._lock:
404+
num_entities = self._client_handle.get_num_entities()
358405
return NumberOfEntities(*num_entities)
359406

360407
def add_to_wait_set(self, wait_set):
361408
"""Add entities to wait set."""
362-
self._client_handle.add_to_waitset(wait_set)
409+
with self._lock:
410+
self._client_handle.add_to_waitset(wait_set)
363411

364412
def __enter__(self):
365413
return self._client_handle.__enter__()
@@ -437,10 +485,17 @@ def send_goal_async(self, goal, feedback_callback=None, goal_uuid=None):
437485
request = self._action_type.Impl.SendGoalService.Request()
438486
request.goal_id = self._generate_random_uuid() if goal_uuid is None else goal_uuid
439487
request.goal = goal
440-
sequence_number = self._client_handle.send_goal_request(request)
441-
if sequence_number in self._pending_goal_requests:
442-
raise RuntimeError(
443-
'Sequence ({}) conflicts with pending goal request'.format(sequence_number))
488+
future = Future()
489+
with self._lock:
490+
sequence_number = self._client_handle.send_goal_request(request)
491+
if sequence_number in self._pending_goal_requests:
492+
raise RuntimeError(
493+
'Sequence ({}) conflicts with pending goal request'.format(sequence_number))
494+
self._pending_goal_requests[sequence_number] = future
495+
self._goal_sequence_number_to_goal_id[sequence_number] = request.goal_id
496+
future.add_done_callback(self._remove_pending_goal_request)
497+
# Add future so executor is aware
498+
self.add_future(future)
444499

445500
if feedback_callback is not None:
446501
# TODO(jacobperron): Move conversion function to a general-use package
@@ -495,6 +550,7 @@ def _cancel_goal_async(self, goal_handle):
495550

496551
cancel_request = CancelGoal.Request()
497552
cancel_request.goal_info.goal_id = goal_handle.goal_id
553+
<<<<<<< HEAD
498554
sequence_number = self._client_handle.send_cancel_request(cancel_request)
499555
if sequence_number in self._pending_cancel_requests:
500556
raise RuntimeError(
@@ -505,6 +561,19 @@ def _cancel_goal_async(self, goal_handle):
505561
future.add_done_callback(self._remove_pending_cancel_request)
506562
# Add future so executor is aware
507563
self.add_future(future)
564+
=======
565+
future: Future[CancelGoal.Response] = Future()
566+
with self._lock:
567+
sequence_number = self._client_handle.send_cancel_request(cancel_request)
568+
if sequence_number in self._pending_cancel_requests:
569+
raise RuntimeError(
570+
'Sequence ({}) conflicts with pending cancel request'.format(sequence_number))
571+
572+
self._pending_cancel_requests[sequence_number] = future
573+
future.add_done_callback(self._remove_pending_cancel_request)
574+
# Add future so executor is aware
575+
self.add_future(future)
576+
>>>>>>> 10f70c9 (Fixes Action.*_async futures never complete (#1308))
508577

509578
return future
510579

@@ -547,6 +616,7 @@ def _get_result_async(self, goal_handle):
547616

548617
result_request = self._action_type.Impl.GetResultService.Request()
549618
result_request.goal_id = goal_handle.goal_id
619+
<<<<<<< HEAD
550620
sequence_number = self._client_handle.send_result_request(result_request)
551621
if sequence_number in self._pending_result_requests:
552622
raise RuntimeError(
@@ -558,6 +628,20 @@ def _get_result_async(self, goal_handle):
558628
future.add_done_callback(self._remove_pending_result_request)
559629
# Add future so executor is aware
560630
self.add_future(future)
631+
=======
632+
future: Future[GetResultServiceResponse[ResultT]] = Future()
633+
with self._lock:
634+
sequence_number = self._client_handle.send_result_request(result_request)
635+
if sequence_number in self._pending_result_requests:
636+
raise RuntimeError(
637+
'Sequence ({}) conflicts with pending result request'.format(sequence_number))
638+
639+
self._pending_result_requests[sequence_number] = future
640+
self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id
641+
future.add_done_callback(self._remove_pending_result_request)
642+
# Add future so executor is aware
643+
self.add_future(future)
644+
>>>>>>> 10f70c9 (Fixes Action.*_async futures never complete (#1308))
561645

562646
return future
563647

rclpy/rclpy/action/server.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,8 @@ async def _execute_goal_request(self, request_header_and_message):
310310
try:
311311
# If the client goes away anytime before this, sending the goal response may fail.
312312
# Catch the exception here and go on so we don't crash.
313-
self._handle.send_goal_response(request_header, response_msg)
313+
with self._lock:
314+
self._handle.send_goal_response(request_header, response_msg)
314315
except RCLError:
315316
self._logger.warn('Failed to send goal response (the client may have gone away)')
316317
return
@@ -393,7 +394,8 @@ async def _execute_cancel_request(self, request_header_and_message):
393394
try:
394395
# If the client goes away anytime before this, sending the goal response may fail.
395396
# Catch the exception here and go on so we don't crash.
396-
self._handle.send_cancel_response(request_header, cancel_response)
397+
with self._lock:
398+
self._handle.send_cancel_response(request_header, cancel_response)
397399
except RCLError:
398400
self._logger.warn('Failed to send cancel response (the client may have gone away)')
399401

@@ -410,7 +412,8 @@ async def _execute_get_result_request(self, request_header_and_message):
410412
'Sending result response for unknown goal ID: {0}'.format(goal_uuid))
411413
result_response = self._action_type.Impl.GetResultService.Response()
412414
result_response.status = GoalStatus.STATUS_UNKNOWN
413-
self._handle.send_result_response(request_header, result_response)
415+
with self._lock:
416+
self._handle.send_result_response(request_header, result_response)
414417
return
415418

416419
# There is an accepted goal matching the goal ID, register a callback to send the
@@ -430,7 +433,14 @@ def _send_result_response(self, request_header, future):
430433
try:
431434
# If the client goes away anytime before this, sending the result response may fail.
432435
# Catch the exception here and go on so we don't crash.
436+
<<<<<<< HEAD
433437
self._handle.send_result_response(request_header, future.result())
438+
=======
439+
result = future.result()
440+
if result:
441+
with self._lock:
442+
self._handle.send_result_response(request_header, result)
443+
>>>>>>> 10f70c9 (Fixes Action.*_async futures never complete (#1308))
434444
except RCLError:
435445
self._logger.warn('Failed to send result response (the client may have gone away)')
436446

@@ -506,7 +516,8 @@ async def execute(self, taken_data):
506516

507517
def get_num_entities(self):
508518
"""Return number of each type of entity used in the wait set."""
509-
num_entities = self._handle.get_num_entities()
519+
with self._lock:
520+
num_entities = self._handle.get_num_entities()
510521
return NumberOfEntities(
511522
num_entities[0],
512523
num_entities[1],

0 commit comments

Comments
 (0)