Skip to content
Merged
31 changes: 28 additions & 3 deletions ros2topic/ros2topic/verb/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ def add_arguments(self, parser, cli_name):
arg = parser.add_argument(
'topic_name',
help="Name of the ROS topic to get info (e.g. '/chatter')")
parser.add_argument(
'--verbose',
'-v',
action='store_true',
help='Prints detailed information like the node name, node namespace, topic type, '
'GUID and QoS Profile of the publishers and subscribers to this topic')
arg.completer = TopicNameCompleter(
include_hidden_topics_key='include_hidden_topics')

Expand All @@ -39,7 +45,26 @@ def main(self, *, args):
break
else:
return "Unknown topic '%s'" % topic_name

line_end = '\n'
if args.verbose:
line_end = '\n\n'

type_str = topic_types[0] if len(topic_types) == 1 else topic_types
print('Type: %s' % type_str)
print('Publisher count: %d' % node.count_publishers(topic_name))
print('Subscriber count: %d' % node.count_subscribers(topic_name))
print('Type: %s' % type_str, end=line_end)

print('Publisher count: %d' % node.count_publishers(topic_name), end=line_end)
if args.verbose:
try:
for info in node.get_publishers_info_by_topic(topic_name):
print(info, end=line_end)
except NotImplementedError as e:
return str(e)

print('Subscription count: %d' % node.count_subscribers(topic_name))
if args.verbose:
try:
for info in node.get_subscriptions_info_by_topic(topic_name):
print(info, end=line_end)
except NotImplementedError as e:
return str(e)
40 changes: 35 additions & 5 deletions ros2topic/test/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,15 +256,45 @@ def test_list_count(self):
assert int(output_lines[0]) == 9

@launch_testing.markers.retry_on_failure(times=5)
def test_topic_info(self):
def test_topic_endpoint_info(self):
with self.launch_topic_command(arguments=['info', '/chatter']) as topic_command:
assert topic_command.wait_for_shutdown(timeout=10)
assert topic_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
'Type: std_msgs/msg/String',
'Publisher count: 1',
'Subscriber count: 0'
'Subscription count: 0'
],
text=topic_command.output,
strict=True
)

@launch_testing.markers.retry_on_failure(times=5)
def test_topic_endpoint_info_verbose(self):
with self.launch_topic_command(arguments=['info', '-v', '/chatter']) as topic_command:
assert topic_command.wait_for_shutdown(timeout=10)
assert topic_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
'Type: std_msgs/msg/String',
'',
'Publisher count: 1',
'',
re.compile(r'Node name: \w+'),
'Node namespace: /',
'Topic type: std_msgs/msg/String',
re.compile(r'Endpoint type: (INVALID|PUBLISHER|SUBSCRIPTION)'),
re.compile(r'GID: [\w\.]+'),
'QoS profile:',
re.compile(r' Reliability: RMW_QOS_POLICY_RELIABILITY_\w+'),
re.compile(r' Durability: RMW_QOS_POLICY_DURABILITY_\w+'),
re.compile(r' Lifespan: \d+ nanoseconds'),
re.compile(r' Deadline: \d+ nanoseconds'),
re.compile(r' Liveliness: RMW_QOS_POLICY_LIVELINESS_\w+'),
re.compile(r' Liveliness lease duration: \d+ nanoseconds'),
'',
'Subscription count: 0'
],
text=topic_command.output,
strict=True
Expand Down Expand Up @@ -517,7 +547,7 @@ def test_topic_pub(self):
), timeout=10)
assert self.listener_node.wait_for_output(functools.partial(
launch_testing.tools.expect_output, expected_lines=[
re.compile(r'\[INFO\] \[\d+.\d*\] \[listener\]: I heard: \[foo\]')
re.compile(r'\[INFO\] \[\d+\.\d*\] \[listener\]: I heard: \[foo\]')
] * 3, strict=False
), timeout=10)
assert topic_command.wait_for_shutdown(timeout=10)
Expand All @@ -541,7 +571,7 @@ def test_topic_pub_once(self):
assert topic_command.wait_for_shutdown(timeout=10)
assert self.listener_node.wait_for_output(functools.partial(
launch_testing.tools.expect_output, expected_lines=[
re.compile(r'\[INFO\] \[\d+.\d*\] \[listener\]: I heard: \[bar\]')
re.compile(r'\[INFO\] \[\d+\.\d*\] \[listener\]: I heard: \[bar\]')
], strict=False
), timeout=10)
assert topic_command.exit_code == launch_testing.asserts.EXIT_OK
Expand All @@ -567,7 +597,7 @@ def test_topic_pub_print_every_two(self):
), timeout=10), 'Output does not match: ' + topic_command.output
assert self.listener_node.wait_for_output(functools.partial(
launch_testing.tools.expect_output, expected_lines=[
re.compile(r'\[INFO\] \[\d+.\d*\] \[listener\]: I heard: \[fizz\]')
re.compile(r'\[INFO\] \[\d+\.\d*\] \[listener\]: I heard: \[fizz\]')
], strict=False
), timeout=10)
assert topic_command.wait_for_shutdown(timeout=10)
Expand Down