diff --git a/rclpy/rclpy/action/client.py b/rclpy/rclpy/action/client.py index d2126dd67..19f44f92d 100644 --- a/rclpy/rclpy/action/client.py +++ b/rclpy/rclpy/action/client.py @@ -243,7 +243,9 @@ def __init__( self._node.add_waitable(self) self._logger = self._node.get_logger().get_child('action_client') - def _generate_random_uuid(self) -> UUID: + self._lock = threading.Lock() + + def _generate_random_uuid(self): return UUID(uuid=list(uuid.uuid4().bytes)) def _remove_pending_request(self, future: Future[T], pending_requests: Dict[int, Future[T]] @@ -306,39 +308,44 @@ def take_data(self) -> 'ClientGoalHandleDict[ResultT, FeedbackT]': """Take stuff from lower level so the wait set doesn't immediately wake again.""" data: 'ClientGoalHandleDict[ResultT, FeedbackT]' = {} if self._is_goal_response_ready: - taken_goal_data = self._client_handle.take_goal_response( - self._action_type.Impl.SendGoalService.Response) - # If take fails, then we get (None, None) - if taken_goal_data[0]: - data['goal'] = taken_goal_data + with self._lock: + taken_goal_data = self._client_handle.take_goal_response( + self._action_type.Impl.SendGoalService.Response) + # If take fails, then we get (None, None) + if taken_goal_data[0]: + data['goal'] = taken_goal_data if self._is_cancel_response_ready: - taken_cancel_data = self._client_handle.take_cancel_response( - self._action_type.Impl.CancelGoalService.Response) - # If take fails, then we get (None, None) - if taken_cancel_data[0]: - data['cancel'] = taken_cancel_data + with self._lock: + taken_cancel_data = self._client_handle.take_cancel_response( + self._action_type.Impl.CancelGoalService.Response) + # If take fails, then we get (None, None) + if taken_cancel_data[0]: + data['cancel'] = taken_cancel_data if self._is_result_response_ready: - taken_result_data = self._client_handle.take_result_response( - self._action_type.Impl.GetResultService.Response) - # If take fails, then we get (None, None) - if taken_result_data[0]: - data['result'] = taken_result_data + with self._lock: + taken_result_data = self._client_handle.take_result_response( + self._action_type.Impl.GetResultService.Response) + # If take fails, then we get (None, None) + if taken_result_data[0]: + data['result'] = taken_result_data if self._is_feedback_ready: - taken_feedback_data = self._client_handle.take_feedback( - self._action_type.Impl.FeedbackMessage) - # If take fails, then we get None - if taken_feedback_data is not None: - data['feedback'] = taken_feedback_data + with self._lock: + taken_feedback_data = self._client_handle.take_feedback( + self._action_type.Impl.FeedbackMessage) + # If take fails, then we get None + if taken_feedback_data is not None: + data['feedback'] = taken_feedback_data if self._is_status_ready: - taken_status_data = self._client_handle.take_status( - self._action_type.Impl.GoalStatusMessage) - # If take fails, then we get None - if taken_status_data is not None: - data['status'] = taken_status_data + with self._lock: + taken_status_data = self._client_handle.take_status( + self._action_type.Impl.GoalStatusMessage) + # If take fails, then we get None + if taken_status_data is not None: + data['status'] = taken_status_data return data @@ -419,12 +426,14 @@ async def execute(self, taken_data: 'ClientGoalHandleDict[ResultT, FeedbackT]') def get_num_entities(self) -> NumberOfEntities: """Return number of each type of entity used in the wait set.""" - num_entities = self._client_handle.get_num_entities() + with self._lock: + num_entities = self._client_handle.get_num_entities() return NumberOfEntities(*num_entities) def add_to_wait_set(self, wait_set: _rclpy.WaitSet) -> None: """Add entities to wait set.""" - self._client_handle.add_to_waitset(wait_set) + with self._lock: + self._client_handle.add_to_waitset(wait_set) def __enter__(self) -> None: self._client_handle.__enter__() @@ -515,10 +524,17 @@ def send_goal_async( request = self._action_type.Impl.SendGoalService.Request() request.goal_id = self._generate_random_uuid() if goal_uuid is None else goal_uuid request.goal = goal - sequence_number = self._client_handle.send_goal_request(request) - if sequence_number in self._pending_goal_requests: - raise RuntimeError( - 'Sequence ({}) conflicts with pending goal request'.format(sequence_number)) + future = Future() + with self._lock: + sequence_number = self._client_handle.send_goal_request(request) + if sequence_number in self._pending_goal_requests: + raise RuntimeError( + 'Sequence ({}) conflicts with pending goal request'.format(sequence_number)) + self._pending_goal_requests[sequence_number] = future + self._goal_sequence_number_to_goal_id[sequence_number] = request.goal_id + future.add_done_callback(self._remove_pending_goal_request) + # Add future so executor is aware + self.add_future(future) if feedback_callback is not None: # TODO(jacobperron): Move conversion function to a general-use package @@ -578,16 +594,17 @@ def _cancel_goal_async( cancel_request = CancelGoal.Request() cancel_request.goal_info.goal_id = goal_handle.goal_id - sequence_number = self._client_handle.send_cancel_request(cancel_request) - if sequence_number in self._pending_cancel_requests: - raise RuntimeError( - 'Sequence ({}) conflicts with pending cancel request'.format(sequence_number)) - future: Future[CancelGoal.Response] = Future() - self._pending_cancel_requests[sequence_number] = future - future.add_done_callback(self._remove_pending_cancel_request) - # Add future so executor is aware - self.add_future(future) + with self._lock: + sequence_number = self._client_handle.send_cancel_request(cancel_request) + if sequence_number in self._pending_cancel_requests: + raise RuntimeError( + 'Sequence ({}) conflicts with pending cancel request'.format(sequence_number)) + + self._pending_cancel_requests[sequence_number] = future + future.add_done_callback(self._remove_pending_cancel_request) + # Add future so executor is aware + self.add_future(future) return future @@ -633,17 +650,18 @@ def _get_result_async(self, goal_handle: ClientGoalHandle[GoalT, ResultT, Feedba result_request = self._action_type.Impl.GetResultService.Request() result_request.goal_id = goal_handle.goal_id - sequence_number = self._client_handle.send_result_request(result_request) - if sequence_number in self._pending_result_requests: - raise RuntimeError( - 'Sequence ({}) conflicts with pending result request'.format(sequence_number)) - future: Future[GetResultServiceResponse[ResultT]] = Future() - self._pending_result_requests[sequence_number] = future - self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id - future.add_done_callback(self._remove_pending_result_request) - # Add future so executor is aware - self.add_future(future) + with self._lock: + sequence_number = self._client_handle.send_result_request(result_request) + if sequence_number in self._pending_result_requests: + raise RuntimeError( + 'Sequence ({}) conflicts with pending result request'.format(sequence_number)) + + self._pending_result_requests[sequence_number] = future + self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id + future.add_done_callback(self._remove_pending_result_request) + # Add future so executor is aware + self.add_future(future) return future diff --git a/rclpy/rclpy/action/server.py b/rclpy/rclpy/action/server.py index 18c305bb4..12066664c 100644 --- a/rclpy/rclpy/action/server.py +++ b/rclpy/rclpy/action/server.py @@ -365,7 +365,8 @@ async def _execute_goal_request( try: # If the client goes away anytime before this, sending the goal response may fail. # Catch the exception here and go on so we don't crash. - self._handle.send_goal_response(request_header, response_msg) + with self._lock: + self._handle.send_goal_response(request_header, response_msg) except RCLError: self._logger.warning('Failed to send goal response (the client may have gone away)') return @@ -455,7 +456,8 @@ async def _execute_cancel_request( try: # If the client goes away anytime before this, sending the goal response may fail. # Catch the exception here and go on so we don't crash. - self._handle.send_cancel_response(request_header, cancel_response) + with self._lock: + self._handle.send_cancel_response(request_header, cancel_response) except RCLError: self._logger.warning('Failed to send cancel response (the client may have gone away)') @@ -476,7 +478,8 @@ async def _execute_get_result_request( 'Sending result response for unknown or expired goal ID: {0}'.format(goal_uuid)) result_response = self._action_type.Impl.GetResultService.Response() result_response.status = GoalStatus.STATUS_UNKNOWN - self._handle.send_result_response(request_header, result_response) + with self._lock: + self._handle.send_result_response(request_header, result_response) return # There is an accepted goal matching the goal ID, register a callback to send the @@ -502,7 +505,8 @@ def _send_result_response( # Catch the exception here and go on so we don't crash. result = future.result() if result: - self._handle.send_result_response(request_header, result) + with self._lock: + self._handle.send_result_response(request_header, result) except RCLError: self._logger.warning('Failed to send result response (the client may have gone away)') @@ -578,7 +582,8 @@ async def execute(self, taken_data: 'ServerGoalHandleDict[GoalT]') -> None: def get_num_entities(self) -> NumberOfEntities: """Return number of each type of entity used in the wait set.""" - num_entities = self._handle.get_num_entities() + with self._lock: + num_entities = self._handle.get_num_entities() return NumberOfEntities( num_entities[0], num_entities[1],