Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 90 additions & 6 deletions rclpy/rclpy/action/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ def __init__(
self._node.add_waitable(self)
self._logger = self._node.get_logger().get_child('action_client')

<<<<<<< HEAD
=======
self._lock = threading.Lock()

>>>>>>> 10f70c9 (Fixes Action.*_async futures never complete (#1308))
def _generate_random_uuid(self):
return UUID(uuid=list(uuid.uuid4().bytes))

Expand Down Expand Up @@ -241,6 +246,7 @@ def take_data(self):
"""Take stuff from lower level so the wait set doesn't immediately wake again."""
data = {}
if self._is_goal_response_ready:
<<<<<<< HEAD
taken_data = self._client_handle.take_goal_response(
self._action_type.Impl.SendGoalService.Response)
# If take fails, then we get (None, None)
Expand Down Expand Up @@ -274,6 +280,46 @@ def take_data(self):
# If take fails, then we get None
if taken_data is not None:
data['status'] = taken_data
=======
with self._lock:
taken_goal_data = self._client_handle.take_goal_response(
self._action_type.Impl.SendGoalService.Response)
# If take fails, then we get (None, None)
if taken_goal_data[0]:
data['goal'] = taken_goal_data

if self._is_cancel_response_ready:
with self._lock:
taken_cancel_data = self._client_handle.take_cancel_response(
self._action_type.Impl.CancelGoalService.Response)
# If take fails, then we get (None, None)
if taken_cancel_data[0]:
data['cancel'] = taken_cancel_data

if self._is_result_response_ready:
with self._lock:
taken_result_data = self._client_handle.take_result_response(
self._action_type.Impl.GetResultService.Response)
# If take fails, then we get (None, None)
if taken_result_data[0]:
data['result'] = taken_result_data

if self._is_feedback_ready:
with self._lock:
taken_feedback_data = self._client_handle.take_feedback(
self._action_type.Impl.FeedbackMessage)
# If take fails, then we get None
if taken_feedback_data is not None:
data['feedback'] = taken_feedback_data

if self._is_status_ready:
with self._lock:
taken_status_data = self._client_handle.take_status(
self._action_type.Impl.GoalStatusMessage)
# If take fails, then we get None
if taken_status_data is not None:
data['status'] = taken_status_data
>>>>>>> 10f70c9 (Fixes Action.*_async futures never complete (#1308))

return data

Expand Down Expand Up @@ -354,12 +400,14 @@ async def execute(self, taken_data):

def get_num_entities(self):
"""Return number of each type of entity used in the wait set."""
num_entities = self._client_handle.get_num_entities()
with self._lock:
num_entities = self._client_handle.get_num_entities()
return NumberOfEntities(*num_entities)

def add_to_wait_set(self, wait_set):
"""Add entities to wait set."""
self._client_handle.add_to_waitset(wait_set)
with self._lock:
self._client_handle.add_to_waitset(wait_set)

def __enter__(self):
return self._client_handle.__enter__()
Expand Down Expand Up @@ -437,10 +485,17 @@ def send_goal_async(self, goal, feedback_callback=None, goal_uuid=None):
request = self._action_type.Impl.SendGoalService.Request()
request.goal_id = self._generate_random_uuid() if goal_uuid is None else goal_uuid
request.goal = goal
sequence_number = self._client_handle.send_goal_request(request)
if sequence_number in self._pending_goal_requests:
raise RuntimeError(
'Sequence ({}) conflicts with pending goal request'.format(sequence_number))
future = Future()
with self._lock:
sequence_number = self._client_handle.send_goal_request(request)
if sequence_number in self._pending_goal_requests:
raise RuntimeError(
'Sequence ({}) conflicts with pending goal request'.format(sequence_number))
self._pending_goal_requests[sequence_number] = future
self._goal_sequence_number_to_goal_id[sequence_number] = request.goal_id
future.add_done_callback(self._remove_pending_goal_request)
# Add future so executor is aware
self.add_future(future)

if feedback_callback is not None:
# TODO(jacobperron): Move conversion function to a general-use package
Expand Down Expand Up @@ -495,6 +550,7 @@ def _cancel_goal_async(self, goal_handle):

cancel_request = CancelGoal.Request()
cancel_request.goal_info.goal_id = goal_handle.goal_id
<<<<<<< HEAD
sequence_number = self._client_handle.send_cancel_request(cancel_request)
if sequence_number in self._pending_cancel_requests:
raise RuntimeError(
Expand All @@ -505,6 +561,19 @@ def _cancel_goal_async(self, goal_handle):
future.add_done_callback(self._remove_pending_cancel_request)
# Add future so executor is aware
self.add_future(future)
=======
future: Future[CancelGoal.Response] = Future()
with self._lock:
sequence_number = self._client_handle.send_cancel_request(cancel_request)
if sequence_number in self._pending_cancel_requests:
raise RuntimeError(
'Sequence ({}) conflicts with pending cancel request'.format(sequence_number))

self._pending_cancel_requests[sequence_number] = future
future.add_done_callback(self._remove_pending_cancel_request)
# Add future so executor is aware
self.add_future(future)
>>>>>>> 10f70c9 (Fixes Action.*_async futures never complete (#1308))

return future

Expand Down Expand Up @@ -547,6 +616,7 @@ def _get_result_async(self, goal_handle):

result_request = self._action_type.Impl.GetResultService.Request()
result_request.goal_id = goal_handle.goal_id
<<<<<<< HEAD
sequence_number = self._client_handle.send_result_request(result_request)
if sequence_number in self._pending_result_requests:
raise RuntimeError(
Expand All @@ -558,6 +628,20 @@ def _get_result_async(self, goal_handle):
future.add_done_callback(self._remove_pending_result_request)
# Add future so executor is aware
self.add_future(future)
=======
future: Future[GetResultServiceResponse[ResultT]] = Future()
with self._lock:
sequence_number = self._client_handle.send_result_request(result_request)
if sequence_number in self._pending_result_requests:
raise RuntimeError(
'Sequence ({}) conflicts with pending result request'.format(sequence_number))

self._pending_result_requests[sequence_number] = future
self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id
future.add_done_callback(self._remove_pending_result_request)
# Add future so executor is aware
self.add_future(future)
>>>>>>> 10f70c9 (Fixes Action.*_async futures never complete (#1308))

return future

Expand Down
19 changes: 15 additions & 4 deletions rclpy/rclpy/action/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,8 @@ async def _execute_goal_request(self, request_header_and_message):
try:
# If the client goes away anytime before this, sending the goal response may fail.
# Catch the exception here and go on so we don't crash.
self._handle.send_goal_response(request_header, response_msg)
with self._lock:
self._handle.send_goal_response(request_header, response_msg)
except RCLError:
self._logger.warn('Failed to send goal response (the client may have gone away)')
return
Expand Down Expand Up @@ -393,7 +394,8 @@ async def _execute_cancel_request(self, request_header_and_message):
try:
# If the client goes away anytime before this, sending the goal response may fail.
# Catch the exception here and go on so we don't crash.
self._handle.send_cancel_response(request_header, cancel_response)
with self._lock:
self._handle.send_cancel_response(request_header, cancel_response)
except RCLError:
self._logger.warn('Failed to send cancel response (the client may have gone away)')

Expand All @@ -410,7 +412,8 @@ async def _execute_get_result_request(self, request_header_and_message):
'Sending result response for unknown goal ID: {0}'.format(goal_uuid))
result_response = self._action_type.Impl.GetResultService.Response()
result_response.status = GoalStatus.STATUS_UNKNOWN
self._handle.send_result_response(request_header, result_response)
with self._lock:
self._handle.send_result_response(request_header, result_response)
return

# There is an accepted goal matching the goal ID, register a callback to send the
Expand All @@ -430,7 +433,14 @@ def _send_result_response(self, request_header, future):
try:
# If the client goes away anytime before this, sending the result response may fail.
# Catch the exception here and go on so we don't crash.
<<<<<<< HEAD
self._handle.send_result_response(request_header, future.result())
=======
result = future.result()
if result:
with self._lock:
self._handle.send_result_response(request_header, result)
>>>>>>> 10f70c9 (Fixes Action.*_async futures never complete (#1308))
except RCLError:
self._logger.warn('Failed to send result response (the client may have gone away)')

Expand Down Expand Up @@ -506,7 +516,8 @@ async def execute(self, taken_data):

def get_num_entities(self):
"""Return number of each type of entity used in the wait set."""
num_entities = self._handle.get_num_entities()
with self._lock:
num_entities = self._handle.get_num_entities()
return NumberOfEntities(
num_entities[0],
num_entities[1],
Expand Down