diff --git a/ros2topic/ros2topic/verb/info.py b/ros2topic/ros2topic/verb/info.py index 476cb9e95..43e87b340 100644 --- a/ros2topic/ros2topic/verb/info.py +++ b/ros2topic/ros2topic/verb/info.py @@ -25,6 +25,12 @@ def add_arguments(self, parser, cli_name): arg = parser.add_argument( 'topic_name', help="Name of the ROS topic to get info (e.g. '/chatter')") + parser.add_argument( + '--verbose', + '-v', + action='store_true', + help='Prints detailed information like the node name, node namespace, topic type, ' + 'GUID and QoS Profile of the publishers and subscribers to this topic') arg.completer = TopicNameCompleter( include_hidden_topics_key='include_hidden_topics') @@ -39,7 +45,26 @@ def main(self, *, args): break else: return "Unknown topic '%s'" % topic_name + + line_end = '\n' + if args.verbose: + line_end = '\n\n' + type_str = topic_types[0] if len(topic_types) == 1 else topic_types - print('Type: %s' % type_str) - print('Publisher count: %d' % node.count_publishers(topic_name)) - print('Subscriber count: %d' % node.count_subscribers(topic_name)) + print('Type: %s' % type_str, end=line_end) + + print('Publisher count: %d' % node.count_publishers(topic_name), end=line_end) + if args.verbose: + try: + for info in node.get_publishers_info_by_topic(topic_name): + print(info, end=line_end) + except NotImplementedError as e: + return str(e) + + print('Subscription count: %d' % node.count_subscribers(topic_name)) + if args.verbose: + try: + for info in node.get_subscriptions_info_by_topic(topic_name): + print(info, end=line_end) + except NotImplementedError as e: + return str(e) diff --git a/ros2topic/test/test_cli.py b/ros2topic/test/test_cli.py index d09821fc8..6da95d02c 100644 --- a/ros2topic/test/test_cli.py +++ b/ros2topic/test/test_cli.py @@ -256,7 +256,7 @@ def test_list_count(self): assert int(output_lines[0]) == 9 @launch_testing.markers.retry_on_failure(times=5) - def test_topic_info(self): + def test_topic_endpoint_info(self): with self.launch_topic_command(arguments=['info', '/chatter']) as topic_command: assert topic_command.wait_for_shutdown(timeout=10) assert topic_command.exit_code == launch_testing.asserts.EXIT_OK @@ -264,7 +264,37 @@ def test_topic_info(self): expected_lines=[ 'Type: std_msgs/msg/String', 'Publisher count: 1', - 'Subscriber count: 0' + 'Subscription count: 0' + ], + text=topic_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_topic_endpoint_info_verbose(self): + with self.launch_topic_command(arguments=['info', '-v', '/chatter']) as topic_command: + assert topic_command.wait_for_shutdown(timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'Type: std_msgs/msg/String', + '', + 'Publisher count: 1', + '', + re.compile(r'Node name: \w+'), + 'Node namespace: /', + 'Topic type: std_msgs/msg/String', + re.compile(r'Endpoint type: (INVALID|PUBLISHER|SUBSCRIPTION)'), + re.compile(r'GID: [\w\.]+'), + 'QoS profile:', + re.compile(r' Reliability: RMW_QOS_POLICY_RELIABILITY_\w+'), + re.compile(r' Durability: RMW_QOS_POLICY_DURABILITY_\w+'), + re.compile(r' Lifespan: \d+ nanoseconds'), + re.compile(r' Deadline: \d+ nanoseconds'), + re.compile(r' Liveliness: RMW_QOS_POLICY_LIVELINESS_\w+'), + re.compile(r' Liveliness lease duration: \d+ nanoseconds'), + '', + 'Subscription count: 0' ], text=topic_command.output, strict=True @@ -517,7 +547,7 @@ def test_topic_pub(self): ), timeout=10) assert self.listener_node.wait_for_output(functools.partial( launch_testing.tools.expect_output, expected_lines=[ - re.compile(r'\[INFO\] \[\d+.\d*\] \[listener\]: I heard: \[foo\]') + re.compile(r'\[INFO\] \[\d+\.\d*\] \[listener\]: I heard: \[foo\]') ] * 3, strict=False ), timeout=10) assert topic_command.wait_for_shutdown(timeout=10) @@ -541,7 +571,7 @@ def test_topic_pub_once(self): assert topic_command.wait_for_shutdown(timeout=10) assert self.listener_node.wait_for_output(functools.partial( launch_testing.tools.expect_output, expected_lines=[ - re.compile(r'\[INFO\] \[\d+.\d*\] \[listener\]: I heard: \[bar\]') + re.compile(r'\[INFO\] \[\d+\.\d*\] \[listener\]: I heard: \[bar\]') ], strict=False ), timeout=10) assert topic_command.exit_code == launch_testing.asserts.EXIT_OK @@ -567,7 +597,7 @@ def test_topic_pub_print_every_two(self): ), timeout=10), 'Output does not match: ' + topic_command.output assert self.listener_node.wait_for_output(functools.partial( launch_testing.tools.expect_output, expected_lines=[ - re.compile(r'\[INFO\] \[\d+.\d*\] \[listener\]: I heard: \[fizz\]') + re.compile(r'\[INFO\] \[\d+\.\d*\] \[listener\]: I heard: \[fizz\]') ], strict=False ), timeout=10) assert topic_command.wait_for_shutdown(timeout=10)