Skip to content

Commit 1d8b34e

Browse files
committed
Support 'ros2 action echo'
Signed-off-by: Barry Xu <[email protected]>
1 parent 3332095 commit 1d8b34e

File tree

5 files changed

+708
-0
lines changed

5 files changed

+708
-0
lines changed

ros2action/ros2action/api/__init__.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
from rclpy.expand_topic_name import expand_topic_name
16+
from rclpy.node import Node
1617
from rclpy.validate_full_topic_name import validate_full_topic_name
1718
from ros2cli.node.strategy import NodeStrategy
1819
from rosidl_runtime_py import get_action_interfaces
@@ -66,6 +67,43 @@ def get_action_names(*, node):
6667
return [n for (n, t) in action_names_and_types]
6768

6869

70+
def get_action_class(node: Node, action_name: str):
71+
"""
72+
Load action type module for the given action.
73+
74+
The action should be running for this function to find the action type.
75+
:param node: The node object of rclpy Node class.
76+
:param action_name: The fully-qualified name of the action.
77+
:return: the action class or None
78+
"""
79+
action_names_and_types = get_action_names_and_types(node=node)
80+
81+
matched_names_and_types = list(filter(lambda x: x[0] == action_name, action_names_and_types))
82+
if len(matched_names_and_types) < 1:
83+
raise RuntimeError(f"Cannot find type for '{action_name}'")
84+
if len(matched_names_and_types) > 1:
85+
raise RuntimeError(f"Unexpectedly saw more than one entry for action '{action_name}'")
86+
87+
# Now check whether there are multiple types associated with this action, which is unsupported
88+
action_name_and_types = matched_names_and_types[0]
89+
90+
types = action_name_and_types[1]
91+
if len(types) < 1:
92+
raise RuntimeError(f"No types associated with '{action_name}'")
93+
if len(types) > 1:
94+
raise RuntimeError(f"More than one type associated with action '{action_name}'")
95+
96+
action_type = types[0]
97+
98+
if action_type is None:
99+
return None
100+
101+
try:
102+
return get_action(action_type)
103+
except (AttributeError, ModuleNotFoundError, ValueError):
104+
raise RuntimeError(f"The action type '{action_type}' is invalid")
105+
106+
69107
def action_name_completer(prefix, parsed_args, **kwargs):
70108
"""Callable returning a list of action names."""
71109
with NodeStrategy(parsed_args) as node:

ros2action/ros2action/verb/echo.py

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
# Copyright 2025 Sony Group Corporation.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from collections import OrderedDict
16+
from enum import Enum
17+
import queue
18+
import sys
19+
import threading
20+
21+
import rclpy
22+
from rclpy.qos import qos_profile_action_status_default
23+
from rclpy.qos import qos_profile_services_default
24+
from rclpy.qos import QoSProfile
25+
from rclpy.subscription import Subscription
26+
from rclpy.type_support import EventMessage
27+
from rclpy.type_support import MsgT
28+
29+
from ros2action.api import action_name_completer
30+
from ros2action.api import action_type_completer
31+
from ros2action.api import get_action_class
32+
from ros2action.verb import VerbExtension
33+
34+
from ros2cli.helpers import unsigned_int
35+
from ros2cli.node.strategy import NodeStrategy
36+
37+
from rosidl_runtime_py import message_to_csv
38+
from rosidl_runtime_py import message_to_ordereddict
39+
from rosidl_runtime_py.utilities import get_action
40+
41+
from service_msgs.msg import ServiceEventInfo
42+
43+
import yaml
44+
45+
DEFAULT_TRUNCATE_LENGTH = 128
46+
47+
48+
class ActionInterfaces(Enum):
49+
GOAL_SERVICE = 'GOAL_SERVICE'
50+
CANCEL_SERVICE = 'CANCEL_SERVICE'
51+
RESULT_SERVICE = 'RESULT_SERVICE'
52+
FEEDBACK_TOPIC = 'FEEDBACK_TOPIC'
53+
STATUS_TOPIC = 'STATUS_TOPIC'
54+
55+
56+
class EchoVerb(VerbExtension):
57+
"""Echo a action."""
58+
59+
# Custom representer for getting clean YAML output that preserves the order in an OrderedDict.
60+
# Inspired by: http://stackoverflow.com/a/16782282/7169408
61+
def __represent_ordereddict(self, dumper, data):
62+
items = []
63+
for k, v in data.items():
64+
items.append((dumper.represent_data(k), dumper.represent_data(v)))
65+
return yaml.nodes.MappingNode(u'tag:yaml.org,2002:map', items)
66+
67+
def __init__(self):
68+
self._event_number_to_name = {}
69+
for k, v in ServiceEventInfo._Metaclass_ServiceEventInfo__constants.items():
70+
self._event_number_to_name[v] = k
71+
72+
yaml.add_representer(OrderedDict, self.__represent_ordereddict)
73+
74+
def add_arguments(self, parser, cli_name):
75+
arg = parser.add_argument(
76+
'action_name',
77+
help="Name of the ROS action to echo (e.g. '/fibonacci')")
78+
arg.completer = action_name_completer
79+
arg = parser.add_argument(
80+
'action_type', nargs='?',
81+
help="Type of the ROS action (e.g. 'example_interfaces/action/Fibonacci')")
82+
arg.completer = action_type_completer
83+
parser.add_argument(
84+
'--interfaces', '-i', type=str, default=[], metavar='interface_name', nargs='+',
85+
help='Space-delimited list of action interface to output. Action interfaces include '
86+
'"goal_service", "cancel_service", "result_service", "feedback_topic" and '
87+
'"status_topic". If this option is not set, output messages from all interfaces '
88+
'of the action.')
89+
parser.add_argument(
90+
'--queue-size', '-q', type=unsigned_int, default=100,
91+
help='The length of output message queue. The default is 100.')
92+
parser.add_argument(
93+
'--csv', action='store_true', default=False,
94+
help=(
95+
'Output all recursive fields separated by commas (e.g. for plotting).'
96+
))
97+
parser.add_argument(
98+
'--full-length', '-f', action='store_true',
99+
help='Output all elements for arrays, bytes, and string with a '
100+
"length > '--truncate-length', by default they are truncated "
101+
"after '--truncate-length' elements with '...''")
102+
parser.add_argument(
103+
'--truncate-length', '-l', type=unsigned_int, default=DEFAULT_TRUNCATE_LENGTH,
104+
help='The length to truncate arrays, bytes, and string to '
105+
'(default: %d)' % DEFAULT_TRUNCATE_LENGTH)
106+
parser.add_argument(
107+
'--no-arr', action='store_true', help="Don't print array fields of messages")
108+
parser.add_argument(
109+
'--no-str', action='store_true', help="Don't print string fields of messages")
110+
parser.add_argument(
111+
'--flow-style', action='store_true',
112+
help='Print collections in the block style (not available with csv format)')
113+
114+
def main(self, *, args):
115+
action_interfaces_list = [interface.value.lower() for interface in ActionInterfaces]
116+
if args.interfaces:
117+
for input_interface in args.interfaces:
118+
if input_interface not in action_interfaces_list:
119+
return f'"{input_interface}" is incorrect interface name.'
120+
121+
if args.action_type is None:
122+
with NodeStrategy(args) as node:
123+
try:
124+
action_module = get_action_class(
125+
node, args.action_name)
126+
except (AttributeError, ModuleNotFoundError, ValueError):
127+
raise RuntimeError(f"The action name '{args.action_name}' is invalid")
128+
else:
129+
try:
130+
action_module = get_action(args.action_type)
131+
except (AttributeError, ModuleNotFoundError, ValueError):
132+
raise RuntimeError(f"The service type '{args.action_type}' is invalid")
133+
134+
if action_module is None:
135+
raise RuntimeError('Could not load the type for the passed action')
136+
137+
self.csv = args.csv
138+
self.truncate_length = args.truncate_length if not args.full_length else None
139+
self.flow_style = args.flow_style
140+
self.no_arr = args.no_arr
141+
self.no_str = args.no_str
142+
143+
send_goal_event_topic = args.action_name + '/_action/send_goal/_service_event'
144+
send_goal_event_msg_type = action_module.Impl.SendGoalService.Event
145+
146+
cancel_goal_event_topic = args.action_name + '/_action/cancel_goal/_service_event'
147+
cancel_goal_event_msg_type = action_module.Impl.CancelGoalService.Event
148+
149+
get_result_event_topic = args.action_name + '/_action/get_result/_service_event'
150+
get_result_event_msg_type = action_module.Impl.GetResultService.Event
151+
152+
feedback_topic = args.action_name + '/_action/feedback'
153+
feedback_topic_type = action_module.Impl.FeedbackMessage
154+
155+
status_topic = args.action_name + '/_action/status'
156+
status_topic_type = action_module.Impl.GoalStatusMessage
157+
158+
# Queue for messages from above topic
159+
self.output_msg_queue = queue.Queue(args.queue_size)
160+
161+
run_thread = True
162+
# Create a thread to output message from output_queue
163+
164+
def message_handler():
165+
while run_thread:
166+
try:
167+
message = self.output_msg_queue.get(block=True, timeout=0.5)
168+
self.output_msg_queue.task_done()
169+
except queue.Empty:
170+
continue
171+
print(message)
172+
output_msg_thread = threading.Thread(target=message_handler)
173+
output_msg_thread.start()
174+
175+
with NodeStrategy(args) as node:
176+
send_goal_event_sub = None
177+
if not args.interfaces or \
178+
ActionInterfaces.GOAL_SERVICE.value.lower() in args.interfaces:
179+
send_goal_event_sub: Subscription[EventMessage] = node.create_subscription(
180+
send_goal_event_msg_type,
181+
send_goal_event_topic,
182+
self._send_goal_subscriber_callback,
183+
qos_profile_services_default)
184+
185+
cancel_goal_event_sub = None
186+
if not args.interfaces or \
187+
ActionInterfaces.CANCEL_SERVICE.value.lower() in args.interfaces:
188+
cancel_goal_event_sub: Subscription[EventMessage] = node.create_subscription(
189+
cancel_goal_event_msg_type,
190+
cancel_goal_event_topic,
191+
self._cancel_goal_subscriber_callback,
192+
qos_profile_services_default)
193+
194+
get_result_event_sub = None
195+
if not args.interfaces or \
196+
ActionInterfaces.RESULT_SERVICE.value.lower() in args.interfaces:
197+
get_result_event_sub: Subscription[EventMessage] = node.create_subscription(
198+
get_result_event_msg_type,
199+
get_result_event_topic,
200+
self._get_result_subscriber_callback,
201+
qos_profile_services_default)
202+
203+
feedback_sub = None
204+
if not args.interfaces or \
205+
ActionInterfaces.FEEDBACK_TOPIC.value.lower() in args.interfaces:
206+
feedback_sub: Subscription[MsgT] = node.create_subscription(
207+
feedback_topic_type,
208+
feedback_topic,
209+
self._feedback_subscriber_callback,
210+
QoSProfile(depth=10)) # QoS setting refers to action client implementation
211+
212+
status_sub = None
213+
if not args.interfaces or \
214+
ActionInterfaces.STATUS_TOPIC.value.lower() in args.interfaces:
215+
status_sub: Subscription[MsgT] = node.create_subscription(
216+
status_topic_type,
217+
status_topic,
218+
self._status_subscriber_callback,
219+
qos_profile_action_status_default)
220+
221+
executor: rclpy.Executor = rclpy.get_global_executor()
222+
try:
223+
executor.add_node(node)
224+
while executor.context.ok():
225+
executor.spin_once()
226+
except KeyboardInterrupt:
227+
pass
228+
finally:
229+
executor.remove_node(node)
230+
231+
if send_goal_event_sub:
232+
send_goal_event_sub.destroy()
233+
if cancel_goal_event_sub:
234+
cancel_goal_event_sub.destroy()
235+
if get_result_event_sub:
236+
get_result_event_sub.destroy()
237+
if feedback_sub:
238+
feedback_sub.destroy()
239+
if status_sub:
240+
status_sub.destroy()
241+
242+
run_thread = False
243+
if output_msg_thread.is_alive():
244+
output_msg_thread.join(1)
245+
246+
def _send_goal_subscriber_callback(self, msg):
247+
self._base_subscriber_callback(msg, ActionInterfaces.GOAL_SERVICE.value)
248+
249+
def _cancel_goal_subscriber_callback(self, msg):
250+
self._base_subscriber_callback(msg, ActionInterfaces.CANCEL_SERVICE.value)
251+
252+
def _get_result_subscriber_callback(self, msg):
253+
self._base_subscriber_callback(msg, ActionInterfaces.RESULT_SERVICE.value)
254+
255+
def _feedback_subscriber_callback(self, msg):
256+
self._base_subscriber_callback(msg, ActionInterfaces.FEEDBACK_TOPIC.value)
257+
258+
def _status_subscriber_callback(self, msg):
259+
self._base_subscriber_callback(msg, ActionInterfaces.STATUS_TOPIC.value)
260+
261+
def _base_subscriber_callback(self, msg, interface: str):
262+
to_print = 'interface: ' + interface + '\n'
263+
if self.csv:
264+
to_print += message_to_csv(msg, truncate_length=self.truncate_length,
265+
no_arr=self.no_arr, no_str=self.no_str)
266+
else:
267+
# The "easy" way to print out a representation here is to call message_to_yaml().
268+
# However, the message contains numbers for the event type, but we want to show
269+
# meaningful names to the user. So we call message_to_ordereddict() instead,
270+
# and replace the numbers with meaningful names before dumping to YAML.
271+
msgdict = message_to_ordereddict(msg, truncate_length=self.truncate_length,
272+
no_arr=self.no_arr, no_str=self.no_str)
273+
274+
if 'info' in msgdict:
275+
info = msgdict['info']
276+
if 'event_type' in info:
277+
info['event_type'] = self._event_number_to_name[info['event_type']]
278+
279+
to_print += yaml.dump(msgdict, allow_unicode=True, width=sys.maxsize,
280+
default_flow_style=self.flow_style)
281+
282+
to_print += '---'
283+
try:
284+
self.output_msg_queue.put(to_print, timeout=0.5)
285+
except queue.Full:
286+
print('Output message is full! Please increase the queue size of output message by '
287+
'"--queue_size"')

ros2action/setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
'send_goal = ros2action.verb.send_goal:SendGoalVerb',
4646
'type = ros2action.verb.type:TypeVerb',
4747
'find = ros2action.verb.find:FindVerb',
48+
'echo = ros2action.verb.echo:EchoVerb',
4849
],
4950
}
5051
)

0 commit comments

Comments
 (0)