Skip to content

Commit eb35a19

Browse files
jmblixt3mergify[bot]
authored andcommitted
Fixes Action.*_async futures never complete (#1308)
Per rclpy:1123 If two seperate client server actions are running in seperate executors the future given to the ActionClient will never complete due to a race condition This fixes the calls to rcl handles potentially leading to deadlock scenarios by adding locks to there references Co-authored-by: Aditya Agarwal <[email protected]> Co-authored-by: Jonathan Blixt <[email protected]> Signed-off-by: Jonathan Blixt <[email protected]> Co-authored-by: Alejandro Hernandez Cordero <[email protected]> (cherry picked from commit 10f70c9) # Conflicts: # rclpy/rclpy/action/client.py # rclpy/rclpy/action/server.py
1 parent 8190ea0 commit eb35a19

File tree

2 files changed

+105
-10
lines changed

2 files changed

+105
-10
lines changed

rclpy/rclpy/action/client.py

Lines changed: 90 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,11 @@ def __init__(
181181
callback_group.add_entity(self)
182182
self._node.add_waitable(self)
183183

184+
<<<<<<< HEAD
185+
=======
186+
self._lock = threading.Lock()
187+
188+
>>>>>>> 10f70c9 (Fixes Action.*_async futures never complete (#1308))
184189
def _generate_random_uuid(self):
185190
return UUID(uuid=list(uuid.uuid4().bytes))
186191

@@ -240,6 +245,7 @@ def take_data(self):
240245
"""Take stuff from lower level so the wait set doesn't immediately wake again."""
241246
data = {}
242247
if self._is_goal_response_ready:
248+
<<<<<<< HEAD
243249
taken_data = self._client_handle.take_goal_response(
244250
self._action_type.Impl.SendGoalService.Response)
245251
# If take fails, then we get (None, None)
@@ -273,6 +279,46 @@ def take_data(self):
273279
# If take fails, then we get None
274280
if taken_data is not None:
275281
data['status'] = taken_data
282+
=======
283+
with self._lock:
284+
taken_goal_data = self._client_handle.take_goal_response(
285+
self._action_type.Impl.SendGoalService.Response)
286+
# If take fails, then we get (None, None)
287+
if taken_goal_data[0]:
288+
data['goal'] = taken_goal_data
289+
290+
if self._is_cancel_response_ready:
291+
with self._lock:
292+
taken_cancel_data = self._client_handle.take_cancel_response(
293+
self._action_type.Impl.CancelGoalService.Response)
294+
# If take fails, then we get (None, None)
295+
if taken_cancel_data[0]:
296+
data['cancel'] = taken_cancel_data
297+
298+
if self._is_result_response_ready:
299+
with self._lock:
300+
taken_result_data = self._client_handle.take_result_response(
301+
self._action_type.Impl.GetResultService.Response)
302+
# If take fails, then we get (None, None)
303+
if taken_result_data[0]:
304+
data['result'] = taken_result_data
305+
306+
if self._is_feedback_ready:
307+
with self._lock:
308+
taken_feedback_data = self._client_handle.take_feedback(
309+
self._action_type.Impl.FeedbackMessage)
310+
# If take fails, then we get None
311+
if taken_feedback_data is not None:
312+
data['feedback'] = taken_feedback_data
313+
314+
if self._is_status_ready:
315+
with self._lock:
316+
taken_status_data = self._client_handle.take_status(
317+
self._action_type.Impl.GoalStatusMessage)
318+
# If take fails, then we get None
319+
if taken_status_data is not None:
320+
data['status'] = taken_status_data
321+
>>>>>>> 10f70c9 (Fixes Action.*_async futures never complete (#1308))
276322

277323
return data
278324

@@ -353,12 +399,14 @@ async def execute(self, taken_data):
353399

354400
def get_num_entities(self):
355401
"""Return number of each type of entity used in the wait set."""
356-
num_entities = self._client_handle.get_num_entities()
402+
with self._lock:
403+
num_entities = self._client_handle.get_num_entities()
357404
return NumberOfEntities(*num_entities)
358405

359406
def add_to_wait_set(self, wait_set):
360407
"""Add entities to wait set."""
361-
self._client_handle.add_to_waitset(wait_set)
408+
with self._lock:
409+
self._client_handle.add_to_waitset(wait_set)
362410

363411
def __enter__(self):
364412
return self._client_handle.__enter__()
@@ -436,10 +484,17 @@ def send_goal_async(self, goal, feedback_callback=None, goal_uuid=None):
436484
request = self._action_type.Impl.SendGoalService.Request()
437485
request.goal_id = self._generate_random_uuid() if goal_uuid is None else goal_uuid
438486
request.goal = goal
439-
sequence_number = self._client_handle.send_goal_request(request)
440-
if sequence_number in self._pending_goal_requests:
441-
raise RuntimeError(
442-
'Sequence ({}) conflicts with pending goal request'.format(sequence_number))
487+
future = Future()
488+
with self._lock:
489+
sequence_number = self._client_handle.send_goal_request(request)
490+
if sequence_number in self._pending_goal_requests:
491+
raise RuntimeError(
492+
'Sequence ({}) conflicts with pending goal request'.format(sequence_number))
493+
self._pending_goal_requests[sequence_number] = future
494+
self._goal_sequence_number_to_goal_id[sequence_number] = request.goal_id
495+
future.add_done_callback(self._remove_pending_goal_request)
496+
# Add future so executor is aware
497+
self.add_future(future)
443498

444499
if feedback_callback is not None:
445500
# TODO(jacobperron): Move conversion function to a general-use package
@@ -494,6 +549,7 @@ def _cancel_goal_async(self, goal_handle):
494549

495550
cancel_request = CancelGoal.Request()
496551
cancel_request.goal_info.goal_id = goal_handle.goal_id
552+
<<<<<<< HEAD
497553
sequence_number = self._client_handle.send_cancel_request(cancel_request)
498554
if sequence_number in self._pending_cancel_requests:
499555
raise RuntimeError(
@@ -504,6 +560,19 @@ def _cancel_goal_async(self, goal_handle):
504560
future.add_done_callback(self._remove_pending_cancel_request)
505561
# Add future so executor is aware
506562
self.add_future(future)
563+
=======
564+
future: Future[CancelGoal.Response] = Future()
565+
with self._lock:
566+
sequence_number = self._client_handle.send_cancel_request(cancel_request)
567+
if sequence_number in self._pending_cancel_requests:
568+
raise RuntimeError(
569+
'Sequence ({}) conflicts with pending cancel request'.format(sequence_number))
570+
571+
self._pending_cancel_requests[sequence_number] = future
572+
future.add_done_callback(self._remove_pending_cancel_request)
573+
# Add future so executor is aware
574+
self.add_future(future)
575+
>>>>>>> 10f70c9 (Fixes Action.*_async futures never complete (#1308))
507576

508577
return future
509578

@@ -546,6 +615,7 @@ def _get_result_async(self, goal_handle):
546615

547616
result_request = self._action_type.Impl.GetResultService.Request()
548617
result_request.goal_id = goal_handle.goal_id
618+
<<<<<<< HEAD
549619
sequence_number = self._client_handle.send_result_request(result_request)
550620
if sequence_number in self._pending_result_requests:
551621
raise RuntimeError(
@@ -557,6 +627,20 @@ def _get_result_async(self, goal_handle):
557627
future.add_done_callback(self._remove_pending_result_request)
558628
# Add future so executor is aware
559629
self.add_future(future)
630+
=======
631+
future: Future[GetResultServiceResponse[ResultT]] = Future()
632+
with self._lock:
633+
sequence_number = self._client_handle.send_result_request(result_request)
634+
if sequence_number in self._pending_result_requests:
635+
raise RuntimeError(
636+
'Sequence ({}) conflicts with pending result request'.format(sequence_number))
637+
638+
self._pending_result_requests[sequence_number] = future
639+
self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id
640+
future.add_done_callback(self._remove_pending_result_request)
641+
# Add future so executor is aware
642+
self.add_future(future)
643+
>>>>>>> 10f70c9 (Fixes Action.*_async futures never complete (#1308))
560644

561645
return future
562646

rclpy/rclpy/action/server.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,8 @@ async def _execute_goal_request(self, request_header_and_message):
309309
try:
310310
# If the client goes away anytime before this, sending the goal response may fail.
311311
# Catch the exception here and go on so we don't crash.
312-
self._handle.send_goal_response(request_header, response_msg)
312+
with self._lock:
313+
self._handle.send_goal_response(request_header, response_msg)
313314
except RCLError:
314315
self._node.get_logger().warn(
315316
'Failed to send goal response (the client may have gone away)')
@@ -393,7 +394,8 @@ async def _execute_cancel_request(self, request_header_and_message):
393394
try:
394395
# If the client goes away anytime before this, sending the goal response may fail.
395396
# Catch the exception here and go on so we don't crash.
396-
self._handle.send_cancel_response(request_header, cancel_response)
397+
with self._lock:
398+
self._handle.send_cancel_response(request_header, cancel_response)
397399
except RCLError:
398400
self._node.get_logger().warn(
399401
'Failed to send cancel response (the client may have gone away)')
@@ -411,7 +413,8 @@ async def _execute_get_result_request(self, request_header_and_message):
411413
'Sending result response for unknown goal ID: {0}'.format(goal_uuid))
412414
result_response = self._action_type.Impl.GetResultService.Response()
413415
result_response.status = GoalStatus.STATUS_UNKNOWN
414-
self._handle.send_result_response(request_header, result_response)
416+
with self._lock:
417+
self._handle.send_result_response(request_header, result_response)
415418
return
416419

417420
# There is an accepted goal matching the goal ID, register a callback to send the
@@ -431,7 +434,14 @@ def _send_result_response(self, request_header, future):
431434
try:
432435
# If the client goes away anytime before this, sending the result response may fail.
433436
# Catch the exception here and go on so we don't crash.
437+
<<<<<<< HEAD
434438
self._handle.send_result_response(request_header, future.result())
439+
=======
440+
result = future.result()
441+
if result:
442+
with self._lock:
443+
self._handle.send_result_response(request_header, result)
444+
>>>>>>> 10f70c9 (Fixes Action.*_async futures never complete (#1308))
435445
except RCLError:
436446
self._node.get_logger().warn(
437447
'Failed to send result response (the client may have gone away)')
@@ -508,7 +518,8 @@ async def execute(self, taken_data):
508518

509519
def get_num_entities(self):
510520
"""Return number of each type of entity used in the wait set."""
511-
num_entities = self._handle.get_num_entities()
521+
with self._lock:
522+
num_entities = self._handle.get_num_entities()
512523
return NumberOfEntities(
513524
num_entities[0],
514525
num_entities[1],

0 commit comments

Comments
 (0)