diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index f86a36e64ae..b0a6a1825c9 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -46,6 +46,7 @@ jobs: nav2_loopback_sim nav2_map_server nav2_simple_commander + nav2_system_tests arguments: --config tools/pyproject.toml pre-commit: diff --git a/nav2_system_tests/src/behaviors/assisted_teleop/test_assisted_teleop_behavior_launch.py b/nav2_system_tests/src/behaviors/assisted_teleop/test_assisted_teleop_behavior_launch.py index 3a9556278bc..c33f1711bd2 100755 --- a/nav2_system_tests/src/behaviors/assisted_teleop/test_assisted_teleop_behavior_launch.py +++ b/nav2_system_tests/src/behaviors/assisted_teleop/test_assisted_teleop_behavior_launch.py @@ -27,7 +27,7 @@ from nav2_common.launch import RewrittenYaml -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: sim_dir = get_package_share_directory('nav2_minimal_tb3_sim') nav2_bringup_dir = get_package_share_directory('nav2_bringup') @@ -43,7 +43,7 @@ def generate_launch_description(): bt_navigator_xml = os.path.join( get_package_share_directory('nav2_bt_navigator'), 'behavior_trees', - os.getenv('BT_NAVIGATOR_XML'), + os.getenv('BT_NAVIGATOR_XML', ''), ) params_file = os.path.join(nav2_bringup_dir, 'params/nav2_params.yaml') @@ -109,10 +109,10 @@ def generate_launch_description(): ) -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] ld = generate_launch_description() - testExecutable = os.getenv('TEST_EXECUTABLE') + testExecutable = os.getenv('TEST_EXECUTABLE', '') test1_action = ExecuteProcess( cmd=[testExecutable], name='test_assisted_teleop_behavior_node', output='screen' diff --git a/nav2_system_tests/src/behaviors/backup/backup_tester.py b/nav2_system_tests/src/behaviors/backup/backup_tester.py index 711441bd4cb..bcd863cae0e 100755 --- a/nav2_system_tests/src/behaviors/backup/backup_tester.py +++ b/nav2_system_tests/src/behaviors/backup/backup_tester.py @@ -15,6 +15,7 @@ import sys import time +from typing import Optional from action_msgs.msg import GoalStatus from geometry_msgs.msg import Point32, PolygonStamped @@ -22,7 +23,8 @@ from nav2_msgs.msg import Costmap from nav2_msgs.srv import ManageLifecycleNodes import rclpy -from rclpy.action import ActionClient +from rclpy.action import ActionClient # type: ignore[attr-defined] +from rclpy.action.client import ClientGoalHandle from rclpy.duration import Duration from rclpy.node import Node from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy @@ -30,7 +32,7 @@ class BackupTest(Node): - def __init__(self): + def __init__(self) -> None: super().__init__(node_name='backup_tester', namespace='') self.costmap_qos = QoSProfile( durability=QoSDurabilityPolicy.TRANSIENT_LOCAL, @@ -43,10 +45,11 @@ def __init__(self): Costmap, 'local_costmap/costmap_raw', self.costmap_qos) self.footprint_pub = self.create_publisher( PolygonStamped, 'local_costmap/published_footprint', 10) - self.goal_handle = None - self.action_result = None + self.goal_handle: Optional[ClientGoalHandle[ + BackUp.Goal, BackUp.Result, BackUp.Feedback]] = None + self.action_result = BackUp.Result() - def sendCommand(self, command): + def sendCommand(self, command: BackUp.Goal) -> bool: self.info_msg('Sending goal request...') self.goal_future = self.action_client.send_goal_async(command) try: @@ -55,7 +58,7 @@ def sendCommand(self, command): except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: self.error_msg('Goal rejected') return False @@ -65,8 +68,8 @@ def sendCommand(self, command): self.info_msg("Waiting for 'Backup' action to complete") try: rclpy.spin_until_future_complete(self, self.result_future) - status = self.result_future.result().status - result = self.result_future.result().result + status = self.result_future.result().status # type: ignore[union-attr] + result = self.result_future.result().result # type: ignore[union-attr] self.action_result = result except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') @@ -80,7 +83,7 @@ def sendCommand(self, command): self.info_msg('Backup failed to meet target!') return False - def sendAndPreemptWithFasterCommand(self, command): + def sendAndPreemptWithFasterCommand(self, command: BackUp.Goal) -> bool: # Send initial goal self.info_msg('Sending goal request...') self.goal_future = self.action_client.send_goal_async(command) @@ -90,7 +93,7 @@ def sendAndPreemptWithFasterCommand(self, command): except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: self.error_msg('Goal rejected') return False @@ -108,7 +111,7 @@ def sendAndPreemptWithFasterCommand(self, command): except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: self.error_msg('Preemption rejected') return False @@ -119,8 +122,8 @@ def sendAndPreemptWithFasterCommand(self, command): self.info_msg("Waiting for 'backup' action Preemption to complete") try: rclpy.spin_until_future_complete(self, self.result_future) - status = self.result_future.result().status - result = self.result_future.result().result + status = self.result_future.result().status # type: ignore[union-attr] + result = self.result_future.result().result # type: ignore[union-attr] self.action_result = result except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') @@ -134,7 +137,7 @@ def sendAndPreemptWithFasterCommand(self, command): self.info_msg('Backup failed to meet target!') return False - def sendAndCancelCommand(self, command): + def sendAndCancelCommand(self, command: BackUp.Goal) -> bool: self.info_msg('Sending goal request...') self.goal_future = self.action_client.send_goal_async(command) try: @@ -143,7 +146,7 @@ def sendAndCancelCommand(self, command): except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: self.error_msg('Goal rejected') return False @@ -155,7 +158,7 @@ def sendAndCancelCommand(self, command): cancel_future = self.goal_handle.cancel_goal_async() rclpy.spin_until_future_complete(self, cancel_future) rclpy.spin_until_future_complete(self, self.result_future) - status = self.result_future.result().status + status = self.result_future.result().status # type: ignore[union-attr] if status != GoalStatus.STATUS_CANCELED: self.info_msg(f'Goal failed with status code: {status}') return False @@ -163,7 +166,7 @@ def sendAndCancelCommand(self, command): self.info_msg('Goal was canceled successfully') return True - def sendFreeCostmap(self): + def sendFreeCostmap(self) -> None: costmap_msg = Costmap() costmap_msg.header.frame_id = 'odom' costmap_msg.header.stamp = self.get_clock().now().to_msg() @@ -186,7 +189,7 @@ def sendFreeCostmap(self): ] self.footprint_pub.publish(footprint_msg) - def sendOccupiedCostmap(self): + def sendOccupiedCostmap(self) -> None: costmap_msg = Costmap() costmap_msg.header.frame_id = 'odom' costmap_msg.header.stamp = self.get_clock().now().to_msg() @@ -209,7 +212,7 @@ def sendOccupiedCostmap(self): ] self.footprint_pub.publish(footprint_msg) - def run(self): + def run(self) -> bool: while not self.action_client.wait_for_server(timeout_sec=1.0): self.info_msg("'Backup' action server not available, waiting...") @@ -278,7 +281,7 @@ def run(self): self.info_msg('Test D passed') return True - def shutdown(self): + def shutdown(self) -> None: self.info_msg('Shutting down') self.action_client.destroy() @@ -300,17 +303,17 @@ def shutdown(self): self.info_msg(f'{transition_service} finished') - def info_msg(self, msg: str): + def info_msg(self, msg: str) -> None: self.get_logger().info(msg) - def warn_msg(self, msg: str): + def warn_msg(self, msg: str) -> None: self.get_logger().warn(msg) - def error_msg(self, msg: str): + def error_msg(self, msg: str) -> None: self.get_logger().error(msg) -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] rclpy.init() time.sleep(10) test = BackupTest() diff --git a/nav2_system_tests/src/behaviors/backup/test_backup_behavior.launch.py b/nav2_system_tests/src/behaviors/backup/test_backup_behavior.launch.py index b7ca717286d..291dbb4b836 100755 --- a/nav2_system_tests/src/behaviors/backup/test_backup_behavior.launch.py +++ b/nav2_system_tests/src/behaviors/backup/test_backup_behavior.launch.py @@ -28,7 +28,7 @@ from nav2_common.launch import RewrittenYaml -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: bringup_dir = get_package_share_directory('nav2_bringup') sim_dir = get_package_share_directory('nav2_minimal_tb3_sim') params_file = LaunchConfiguration('params_file') @@ -133,12 +133,13 @@ def generate_launch_description(): ) -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] ld = generate_launch_description() test1_action = ExecuteProcess( cmd=[os.path.join( - os.getenv('TEST_DIR'), 'backup_tester.py'), '--ros-args', '-p', 'use_sim_time:=True'], + os.getenv('TEST_DIR', ''), + 'backup_tester.py'), '--ros-args', '-p', 'use_sim_time:=True'], name='tester_node', output='screen', ) diff --git a/nav2_system_tests/src/behaviors/drive_on_heading/drive_tester.py b/nav2_system_tests/src/behaviors/drive_on_heading/drive_tester.py index 852f3bee6a9..2f3721c1580 100755 --- a/nav2_system_tests/src/behaviors/drive_on_heading/drive_tester.py +++ b/nav2_system_tests/src/behaviors/drive_on_heading/drive_tester.py @@ -15,6 +15,7 @@ import sys import time +from typing import Optional from action_msgs.msg import GoalStatus from geometry_msgs.msg import Point32, PolygonStamped @@ -22,7 +23,8 @@ from nav2_msgs.msg import Costmap from nav2_msgs.srv import ManageLifecycleNodes import rclpy -from rclpy.action import ActionClient +from rclpy.action import ActionClient # type: ignore[attr-defined] +from rclpy.action.client import ClientGoalHandle from rclpy.duration import Duration from rclpy.node import Node from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy @@ -30,7 +32,7 @@ class DriveTest(Node): - def __init__(self): + def __init__(self) -> None: super().__init__(node_name='drive_tester', namespace='') self.costmap_qos = QoSProfile( durability=QoSDurabilityPolicy.TRANSIENT_LOCAL, @@ -43,10 +45,12 @@ def __init__(self): Costmap, 'local_costmap/costmap_raw', self.costmap_qos) self.footprint_pub = self.create_publisher( PolygonStamped, 'local_costmap/published_footprint', 10) - self.goal_handle = None - self.action_result = None + self.goal_handle: Optional[ClientGoalHandle[ + DriveOnHeading.Goal, DriveOnHeading.Result, + DriveOnHeading.Feedback]] = None + self.action_result = DriveOnHeading.Result() - def sendCommand(self, command): + def sendCommand(self, command: DriveOnHeading.Goal) -> bool: self.info_msg('Sending goal request...') self.goal_future = self.action_client.send_goal_async(command) try: @@ -55,7 +59,7 @@ def sendCommand(self, command): except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: self.error_msg('Goal rejected') return False @@ -65,8 +69,8 @@ def sendCommand(self, command): self.info_msg("Waiting for 'DriveOnHeading' action to complete") try: rclpy.spin_until_future_complete(self, self.result_future) - status = self.result_future.result().status - result = self.result_future.result().result + status = self.result_future.result().status # type: ignore[union-attr] + result = self.result_future.result().result # type: ignore[union-attr] self.action_result = result except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') @@ -80,7 +84,7 @@ def sendCommand(self, command): self.info_msg('DriveOnHeading failed to meet target!') return False - def sendAndPreemptWithFasterCommand(self, command): + def sendAndPreemptWithFasterCommand(self, command: DriveOnHeading.Goal) -> bool: # Send initial goal self.info_msg('Sending goal request...') self.goal_future = self.action_client.send_goal_async(command) @@ -90,7 +94,7 @@ def sendAndPreemptWithFasterCommand(self, command): except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: self.error_msg('Goal rejected') return False @@ -108,7 +112,7 @@ def sendAndPreemptWithFasterCommand(self, command): except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: self.error_msg('Preemption rejected') return False @@ -119,8 +123,8 @@ def sendAndPreemptWithFasterCommand(self, command): self.info_msg("Waiting for 'DriveOnHeading' action Preemption to complete") try: rclpy.spin_until_future_complete(self, self.result_future) - status = self.result_future.result().status - result = self.result_future.result().result + status = self.result_future.result().status # type: ignore[union-attr] + result = self.result_future.result().result # type: ignore[union-attr] self.action_result = result except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') @@ -134,7 +138,7 @@ def sendAndPreemptWithFasterCommand(self, command): self.info_msg('DriveOnHeading failed to meet target!') return False - def sendAndCancelCommand(self, command): + def sendAndCancelCommand(self, command: DriveOnHeading.Goal) -> bool: self.info_msg('Sending goal request...') self.goal_future = self.action_client.send_goal_async(command) try: @@ -143,7 +147,7 @@ def sendAndCancelCommand(self, command): except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: self.error_msg('Goal rejected') return False @@ -155,7 +159,7 @@ def sendAndCancelCommand(self, command): cancel_future = self.goal_handle.cancel_goal_async() rclpy.spin_until_future_complete(self, cancel_future) rclpy.spin_until_future_complete(self, self.result_future) - status = self.result_future.result().status + status = self.result_future.result().status # type: ignore[union-attr] if status != GoalStatus.STATUS_CANCELED: self.info_msg(f'Goal failed with status code: {status}') return False @@ -163,7 +167,7 @@ def sendAndCancelCommand(self, command): self.info_msg('Goal was canceled successfully') return True - def sendFreeCostmap(self): + def sendFreeCostmap(self) -> None: costmap_msg = Costmap() costmap_msg.header.frame_id = 'odom' costmap_msg.header.stamp = self.get_clock().now().to_msg() @@ -186,7 +190,7 @@ def sendFreeCostmap(self): ] self.footprint_pub.publish(footprint_msg) - def sendOccupiedCostmap(self): + def sendOccupiedCostmap(self) -> None: costmap_msg = Costmap() costmap_msg.header.frame_id = 'odom' costmap_msg.header.stamp = self.get_clock().now().to_msg() @@ -209,7 +213,7 @@ def sendOccupiedCostmap(self): ] self.footprint_pub.publish(footprint_msg) - def run(self): + def run(self) -> bool: while not self.action_client.wait_for_server(timeout_sec=1.0): self.info_msg("'DriveOnHeading' action server not available, waiting...") @@ -286,7 +290,7 @@ def run(self): self.info_msg('Test D passed') return True - def shutdown(self): + def shutdown(self) -> None: self.info_msg('Shutting down') self.action_client.destroy() @@ -308,17 +312,17 @@ def shutdown(self): self.info_msg(f'{transition_service} finished') - def info_msg(self, msg: str): + def info_msg(self, msg: str) -> None: self.get_logger().info(msg) - def warn_msg(self, msg: str): + def warn_msg(self, msg: str) -> None: self.get_logger().warn(msg) - def error_msg(self, msg: str): + def error_msg(self, msg: str) -> None: self.get_logger().error(msg) -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] rclpy.init() time.sleep(10) test = DriveTest() diff --git a/nav2_system_tests/src/behaviors/drive_on_heading/test_drive_on_heading_behavior.launch.py b/nav2_system_tests/src/behaviors/drive_on_heading/test_drive_on_heading_behavior.launch.py index 2b308537f1d..016d9ef163f 100755 --- a/nav2_system_tests/src/behaviors/drive_on_heading/test_drive_on_heading_behavior.launch.py +++ b/nav2_system_tests/src/behaviors/drive_on_heading/test_drive_on_heading_behavior.launch.py @@ -28,7 +28,7 @@ from nav2_common.launch import RewrittenYaml -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: bringup_dir = get_package_share_directory('nav2_bringup') sim_dir = get_package_share_directory('nav2_minimal_tb3_sim') params_file = LaunchConfiguration('params_file') @@ -133,12 +133,13 @@ def generate_launch_description(): ) -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] ld = generate_launch_description() test1_action = ExecuteProcess( cmd=[os.path.join( - os.getenv('TEST_DIR'), 'drive_tester.py'), '--ros-args', '-p', 'use_sim_time:=True'], + os.getenv('TEST_DIR', ''), + 'drive_tester.py'), '--ros-args', '-p', 'use_sim_time:=True'], name='tester_node', output='screen', ) diff --git a/nav2_system_tests/src/behaviors/spin/spin_tester.py b/nav2_system_tests/src/behaviors/spin/spin_tester.py index 917df80aeb9..ff5087e62ed 100755 --- a/nav2_system_tests/src/behaviors/spin/spin_tester.py +++ b/nav2_system_tests/src/behaviors/spin/spin_tester.py @@ -15,6 +15,7 @@ import sys import time +from typing import Optional from action_msgs.msg import GoalStatus from geometry_msgs.msg import Point32, PolygonStamped @@ -22,7 +23,8 @@ from nav2_msgs.msg import Costmap from nav2_msgs.srv import ManageLifecycleNodes import rclpy -from rclpy.action import ActionClient +from rclpy.action import ActionClient # type: ignore[attr-defined] +from rclpy.action.client import ClientGoalHandle from rclpy.duration import Duration from rclpy.node import Node from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy @@ -30,7 +32,7 @@ class SpinTest(Node): - def __init__(self): + def __init__(self) -> None: super().__init__(node_name='spin_tester', namespace='') self.costmap_qos = QoSProfile( durability=QoSDurabilityPolicy.TRANSIENT_LOCAL, @@ -43,10 +45,11 @@ def __init__(self): Costmap, 'local_costmap/costmap_raw', self.costmap_qos) self.footprint_pub = self.create_publisher( PolygonStamped, 'local_costmap/published_footprint', 10) - self.goal_handle = None - self.action_result = None + self.goal_handle: Optional[ClientGoalHandle[ + Spin.Goal, Spin.Result, Spin.Feedback]] = None + self.action_result = Spin.Result() - def sendCommand(self, command): + def sendCommand(self, command: Spin.Goal) -> bool: self.info_msg('Sending goal request...') self.goal_future = self.action_client.send_goal_async(command) try: @@ -55,7 +58,7 @@ def sendCommand(self, command): except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: self.error_msg('Goal rejected') return False @@ -65,8 +68,8 @@ def sendCommand(self, command): self.info_msg("Waiting for 'spin' action to complete") try: rclpy.spin_until_future_complete(self, self.result_future) - status = self.result_future.result().status - result = self.result_future.result().result + status = self.result_future.result().status # type: ignore[union-attr] + result = self.result_future.result().result # type: ignore[union-attr] self.action_result = result except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') @@ -80,7 +83,7 @@ def sendCommand(self, command): self.info_msg('Spin failed to meet target!') return False - def sendAndPreemptWithInvertedCommand(self, command): + def sendAndPreemptWithInvertedCommand(self, command: Spin.Goal) -> bool: # Send initial goal self.info_msg('Sending goal request...') self.goal_future = self.action_client.send_goal_async(command) @@ -90,7 +93,7 @@ def sendAndPreemptWithInvertedCommand(self, command): except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: self.error_msg('Goal rejected') return False @@ -108,7 +111,7 @@ def sendAndPreemptWithInvertedCommand(self, command): except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: self.error_msg('Preemption rejected') return False @@ -119,8 +122,8 @@ def sendAndPreemptWithInvertedCommand(self, command): self.info_msg("Waiting for 'spin' action Preemption to complete") try: rclpy.spin_until_future_complete(self, self.result_future) - status = self.result_future.result().status - result = self.result_future.result().result + status = self.result_future.result().status # type: ignore[union-attr] + result = self.result_future.result().result # type: ignore[union-attr] self.action_result = result except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') @@ -134,7 +137,7 @@ def sendAndPreemptWithInvertedCommand(self, command): self.info_msg('Spin failed to meet target!') return False - def sendAndCancelCommand(self, command): + def sendAndCancelCommand(self, command: Spin.Goal) -> bool: self.info_msg('Sending goal request...') self.goal_future = self.action_client.send_goal_async(command) try: @@ -143,7 +146,7 @@ def sendAndCancelCommand(self, command): except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: self.error_msg('Goal rejected') return False @@ -155,7 +158,7 @@ def sendAndCancelCommand(self, command): cancel_future = self.goal_handle.cancel_goal_async() rclpy.spin_until_future_complete(self, cancel_future) rclpy.spin_until_future_complete(self, self.result_future) - status = self.result_future.result().status + status = self.result_future.result().status # type: ignore[union-attr] if status != GoalStatus.STATUS_CANCELED: self.info_msg(f'Goal failed with status code: {status}') return False @@ -163,7 +166,7 @@ def sendAndCancelCommand(self, command): self.info_msg('Goal was canceled successfully') return True - def sendFreeCostmap(self): + def sendFreeCostmap(self) -> None: costmap_msg = Costmap() costmap_msg.header.frame_id = 'odom' costmap_msg.header.stamp = self.get_clock().now().to_msg() @@ -186,7 +189,7 @@ def sendFreeCostmap(self): ] self.footprint_pub.publish(footprint_msg) - def sendOccupiedCostmap(self): + def sendOccupiedCostmap(self) -> None: costmap_msg = Costmap() costmap_msg.header.frame_id = 'odom' costmap_msg.header.stamp = self.get_clock().now().to_msg() @@ -209,7 +212,7 @@ def sendOccupiedCostmap(self): ] self.footprint_pub.publish(footprint_msg) - def run(self): + def run(self) -> bool: while not self.action_client.wait_for_server(timeout_sec=1.0): self.info_msg("'spin' action server not available, waiting...") @@ -270,7 +273,7 @@ def run(self): self.info_msg('Test D passed') return True - def shutdown(self): + def shutdown(self) -> None: self.info_msg('Shutting down') self.action_client.destroy() @@ -292,17 +295,17 @@ def shutdown(self): self.info_msg(f'{transition_service} finished') - def info_msg(self, msg: str): + def info_msg(self, msg: str) -> None: self.get_logger().info(msg) - def warn_msg(self, msg: str): + def warn_msg(self, msg: str) -> None: self.get_logger().warn(msg) - def error_msg(self, msg: str): + def error_msg(self, msg: str) -> None: self.get_logger().error(msg) -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] rclpy.init() time.sleep(10) test = SpinTest() diff --git a/nav2_system_tests/src/behaviors/spin/test_spin_behavior.launch.py b/nav2_system_tests/src/behaviors/spin/test_spin_behavior.launch.py index 38c205c6508..cadef8c367a 100755 --- a/nav2_system_tests/src/behaviors/spin/test_spin_behavior.launch.py +++ b/nav2_system_tests/src/behaviors/spin/test_spin_behavior.launch.py @@ -28,7 +28,7 @@ from nav2_common.launch import RewrittenYaml -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: bringup_dir = get_package_share_directory('nav2_bringup') sim_dir = get_package_share_directory('nav2_minimal_tb3_sim') params_file = LaunchConfiguration('params_file') @@ -133,12 +133,13 @@ def generate_launch_description(): ) -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] ld = generate_launch_description() test1_action = ExecuteProcess( cmd=[os.path.join( - os.getenv('TEST_DIR'), 'spin_tester.py'), '--ros-args', '-p', 'use_sim_time:=True'], + os.getenv('TEST_DIR', ''), + 'spin_tester.py'), '--ros-args', '-p', 'use_sim_time:=True'], name='tester_node', output='screen', ) diff --git a/nav2_system_tests/src/behaviors/wait/test_wait_behavior_launch.py b/nav2_system_tests/src/behaviors/wait/test_wait_behavior_launch.py index da70e9b04da..49cee4ea48e 100755 --- a/nav2_system_tests/src/behaviors/wait/test_wait_behavior_launch.py +++ b/nav2_system_tests/src/behaviors/wait/test_wait_behavior_launch.py @@ -28,7 +28,7 @@ from nav2_simple_commander.utils import kill_os_processes -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: sim_dir = get_package_share_directory('nav2_minimal_tb3_sim') nav2_bringup_dir = get_package_share_directory('nav2_bringup') ros_gz_sim_dir = get_package_share_directory('ros_gz_sim') @@ -45,7 +45,7 @@ def generate_launch_description(): bt_navigator_xml = os.path.join( get_package_share_directory('nav2_bt_navigator'), 'behavior_trees', - os.getenv('BT_NAVIGATOR_XML'), + os.getenv('BT_NAVIGATOR_XML', ''), ) bringup_dir = get_package_share_directory('nav2_bringup') @@ -114,10 +114,10 @@ def generate_launch_description(): ) -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] ld = generate_launch_description() - testExecutable = os.getenv('TEST_EXECUTABLE') + testExecutable = os.getenv('TEST_EXECUTABLE', '') test1_action = ExecuteProcess( cmd=[testExecutable], name='test_wait_behavior_node', output='screen', diff --git a/nav2_system_tests/src/costmap_filters/test_keepout_launch.py b/nav2_system_tests/src/costmap_filters/test_keepout_launch.py index ff554956e8f..71d4189994a 100755 --- a/nav2_system_tests/src/costmap_filters/test_keepout_launch.py +++ b/nav2_system_tests/src/costmap_filters/test_keepout_launch.py @@ -30,7 +30,7 @@ from nav2_common.launch import RewrittenYaml -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: sim_dir = get_package_share_directory('nav2_minimal_tb3_sim') nav2_bringup_dir = get_package_share_directory('nav2_bringup') nav2_sys_test_dir = get_package_share_directory('nav2_system_tests') @@ -43,12 +43,12 @@ def generate_launch_description(): robot_description = infp.read() map_yaml_file = os.path.join(nav2_sys_test_dir, 'maps', 'map_circular.yaml') - filter_mask_file = os.getenv('TEST_MASK') + filter_mask_file = os.getenv('TEST_MASK', '') bt_navigator_xml = os.path.join( get_package_share_directory('nav2_bt_navigator'), 'behavior_trees', - os.getenv('BT_NAVIGATOR_XML'), + os.getenv('BT_NAVIGATOR_XML', ''), ) script_dir = os.path.dirname(os.path.realpath(__file__)) @@ -164,12 +164,12 @@ def generate_launch_description(): ) -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] ld = generate_launch_description() test1_action = ExecuteProcess( cmd=[ - os.path.join(os.getenv('TEST_DIR'), 'tester_node.py'), + os.path.join(os.getenv('TEST_DIR', ''), 'tester_node.py'), '-t', 'keepout', '-r', diff --git a/nav2_system_tests/src/costmap_filters/test_speed_launch.py b/nav2_system_tests/src/costmap_filters/test_speed_launch.py index 31c55ccc9db..8746c7c7266 100755 --- a/nav2_system_tests/src/costmap_filters/test_speed_launch.py +++ b/nav2_system_tests/src/costmap_filters/test_speed_launch.py @@ -30,7 +30,7 @@ from nav2_common.launch import RewrittenYaml -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: sim_dir = get_package_share_directory('nav2_minimal_tb3_sim') nav2_bringup_dir = get_package_share_directory('nav2_bringup') nav2_sys_test_dir = get_package_share_directory('nav2_system_tests') @@ -48,10 +48,10 @@ def generate_launch_description(): bt_navigator_xml = os.path.join( get_package_share_directory('nav2_bt_navigator'), 'behavior_trees', - os.getenv('BT_NAVIGATOR_XML'), + os.getenv('BT_NAVIGATOR_XML', ''), ) - params_file = os.getenv('PARAMS_FILE') + params_file = os.getenv('PARAMS_FILE', '') # Replace the `use_astar` setting on the params file param_substitutions = { @@ -155,12 +155,12 @@ def generate_launch_description(): ) -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] ld = generate_launch_description() test1_action = ExecuteProcess( cmd=[ - os.path.join(os.getenv('TEST_DIR'), 'tester_node.py'), + os.path.join(os.getenv('TEST_DIR', ''), 'tester_node.py'), '-t', 'speed', '-r', diff --git a/nav2_system_tests/src/costmap_filters/tester_node.py b/nav2_system_tests/src/costmap_filters/tester_node.py index c978e570b13..0f80d146fe6 100755 --- a/nav2_system_tests/src/costmap_filters/tester_node.py +++ b/nav2_system_tests/src/costmap_filters/tester_node.py @@ -30,7 +30,7 @@ from nav2_msgs.srv import ManageLifecycleNodes from nav_msgs.msg import OccupancyGrid, Path import rclpy -from rclpy.action import ActionClient +from rclpy.action import ActionClient # type: ignore[attr-defined] from rclpy.node import Node from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy from sensor_msgs.msg import PointCloud2 @@ -44,12 +44,12 @@ class TestType(Enum): class FilterMask: def __init__(self, filter_mask: OccupancyGrid): - self.filter_mask = filter_mask + self.filter_mask: OccupancyGrid = filter_mask # Converts world coordinates into filter mask map coordinate. # Returns filter mask map coordinates or (-1, -1) in case # if world coordinates are out of mask bounds. - def worldToMap(self, wx: float, wy: float): + def worldToMap(self, wx: float, wy: float) -> tuple[int, int]: origin_x = self.filter_mask.info.origin.position.x origin_y = self.filter_mask.info.origin.position.y size_x = self.filter_mask.info.width @@ -68,7 +68,7 @@ def worldToMap(self, wx: float, wy: float): return -1, -1 # Gets filter_mask[mx, my] value - def getValue(self, mx, my): + def getValue(self, mx: int, my: int): # type: ignore[no-untyped-def] size_x = self.filter_mask.info.width return self.filter_mask.data[mx + my * size_x] @@ -95,9 +95,9 @@ def __init__( ) volatile_qos = QoSProfile( - durability=QoSDurabilityPolicy.RMW_QOS_POLICY_DURABILITY_VOLATILE, + durability=QoSDurabilityPolicy.VOLATILE, reliability=QoSReliabilityPolicy.RELIABLE, - history=QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST, + history=QoSHistoryPolicy.KEEP_LAST, depth=1, ) @@ -153,16 +153,16 @@ def __init__( self.goal_pose = goal_pose self.action_client = ActionClient(self, NavigateToPose, 'navigate_to_pose') - def info_msg(self, msg: str): + def info_msg(self, msg: str) -> None: self.get_logger().info('\033[1;37;44m' + msg + '\033[0m') - def warn_msg(self, msg: str): + def warn_msg(self, msg: str) -> None: self.get_logger().warn('\033[1;37;43m' + msg + '\033[0m') - def error_msg(self, msg: str): + def error_msg(self, msg: str) -> None: self.get_logger().error('\033[1;37;41m' + msg + '\033[0m') - def setInitialPose(self): + def setInitialPose(self) -> None: msg = PoseWithCovarianceStamped() msg.pose.pose = self.initial_pose msg.header.frame_id = 'map' @@ -170,17 +170,17 @@ def setInitialPose(self): self.initial_pose_pub.publish(msg) self.currentPose = self.initial_pose - def getStampedPoseMsg(self, pose: Pose): + def getStampedPoseMsg(self, pose: Pose) -> PoseStamped: msg = PoseStamped() msg.header.frame_id = 'map' msg.pose = pose return msg - def publishGoalPose(self, goal_pose: Optional[Pose] = None): + def publishGoalPose(self, goal_pose: Optional[Pose] = None) -> None: self.goal_pose = goal_pose if goal_pose is not None else self.goal_pose self.goal_pub.publish(self.getStampedPoseMsg(self.goal_pose)) - def runNavigateAction(self, goal_pose: Optional[Pose] = None): + def runNavigateAction(self, goal_pose: Optional[Pose] = None) -> bool: # Sends a `NavToPose` action request and waits for completion self.info_msg("Waiting for 'NavigateToPose' action server") while not self.action_client.wait_for_server(timeout_sec=1.0): @@ -196,7 +196,7 @@ def runNavigateAction(self, goal_pose: Optional[Pose] = None): rclpy.spin_until_future_complete(self, send_goal_future) goal_handle = send_goal_future.result() - if not goal_handle.accepted: + if not goal_handle or not goal_handle.accepted: self.error_msg('Goal rejected') return False @@ -205,7 +205,7 @@ def runNavigateAction(self, goal_pose: Optional[Pose] = None): self.info_msg("Waiting for 'NavigateToPose' action to complete") rclpy.spin_until_future_complete(self, get_result_future) - status = get_result_future.result().status + status = get_result_future.result().status # type: ignore[union-attr] if status != GoalStatus.STATUS_SUCCEEDED: self.info_msg(f'Goal failed with status code: {status}') return False @@ -213,7 +213,7 @@ def runNavigateAction(self, goal_pose: Optional[Pose] = None): self.info_msg('Goal succeeded!') return True - def isInKeepout(self, x, y): + def isInKeepout(self, x: float, y: float) -> bool: mx, my = self.filter_mask.worldToMap(x, y) if mx == -1 and my == -1: # Out of mask's area return False @@ -222,7 +222,7 @@ def isInKeepout(self, x, y): return False # Checks that (x, y) position does not belong to a keepout zone. - def checkKeepout(self, x, y): + def checkKeepout(self, x: float, y: float) -> bool: if not self.mask_received: self.warn_msg('Filter mask was not received') elif self.isInKeepout(x, y): @@ -238,7 +238,7 @@ def checkKeepout(self, x, y): # Also verifies that speed limit messages received no more than N-times # (where N - is the length of 'limits' array), # otherwise sets overall 'filter_test_result' to be false. - def checkSpeed(self, it, speed_limit): + def checkSpeed(self, it: int, speed_limit: float) -> None: if it >= len(self.limits): self.error_msg('Got excess speed limit') self.filter_test_result = False @@ -253,7 +253,7 @@ def checkSpeed(self, it, speed_limit): + str(self.limits[it]) ) - def poseCallback(self, msg): + def poseCallback(self, msg: PoseWithCovarianceStamped) -> None: self.info_msg('Received amcl_pose') self.current_pose = msg.pose.pose self.initial_pose_received = True @@ -263,41 +263,41 @@ def poseCallback(self, msg): ): self.error_msg('Robot goes into keepout zone') - def planCallback(self, msg): + def planCallback(self, msg: Path) -> None: self.info_msg('Received plan') for pose in msg.poses: if not self.checkKeepout(pose.pose.position.x, pose.pose.position.y): self.error_msg('Path plan intersects with keepout zone') return - def clearingEndpointsCallback(self, msg): + def clearingEndpointsCallback(self, msg: PointCloud2) -> None: if len(msg.data) > 0: self.clearing_endpoints_received = True - def voxelMarkedCallback(self, msg): + def voxelMarkedCallback(self, msg: PointCloud2) -> None: if len(msg.data) > 0: self.voxel_marked_received = True - def voxelUnknownCallback(self, msg): + def voxelUnknownCallback(self, msg: PointCloud2) -> None: if len(msg.data) > 0: self.voxel_unknown_received = True - def dwbCostCloudCallback(self, msg): + def dwbCostCloudCallback(self, msg: PointCloud2) -> None: self.info_msg('Received cost_cloud points') if len(msg.data) > 0: self.cost_cloud_received = True - def speedLimitCallback(self, msg): + def speedLimitCallback(self, msg: SpeedLimit) -> None: self.info_msg(f'Received speed limit: {msg.speed_limit}') self.checkSpeed(self.speed_it, msg.speed_limit) self.speed_it += 1 - def maskCallback(self, msg): + def maskCallback(self, msg: OccupancyGrid) -> None: self.info_msg('Received filter mask') self.filter_mask = FilterMask(msg) self.mask_received = True - def wait_for_filter_mask(self, timeout): + def wait_for_filter_mask(self, timeout: float) -> bool: start_time = time.time() while not self.mask_received: @@ -308,7 +308,7 @@ def wait_for_filter_mask(self, timeout): return False return True - def wait_for_pointcloud_subscribers(self, timeout): + def wait_for_pointcloud_subscribers(self, timeout: float) -> bool: start_time = time.time() while ( not self.voxel_unknown_received @@ -328,7 +328,7 @@ def wait_for_pointcloud_subscribers(self, timeout): return False return True - def reachesGoal(self, timeout, distance): + def reachesGoal(self, timeout: float, distance: float) -> bool: goalReached = False start_time = time.time() @@ -343,14 +343,17 @@ def reachesGoal(self, timeout, distance): self.error_msg('Robot timed out reaching its goal!') return False - def distanceFromGoal(self): + self.info_msg('Robot reached its goal!') + return True + + def distanceFromGoal(self) -> float: d_x = self.current_pose.position.x - self.goal_pose.position.x d_y = self.current_pose.position.y - self.goal_pose.position.y distance = math.sqrt(d_x * d_x + d_y * d_y) self.info_msg(f'Distance from goal is: {distance}') return distance - def wait_for_node_active(self, node_name: str): + def wait_for_node_active(self, node_name: str) -> None: # Waits for the node within the tester namespace to become active self.info_msg(f'Waiting for {node_name} to become active') node_service = f'{node_name}/get_state' @@ -364,15 +367,15 @@ def wait_for_node_active(self, node_name: str): future = state_client.call_async(req) rclpy.spin_until_future_complete(self, future) if future.result() is not None: - state = future.result().current_state.label + state = future.result().current_state.label # type: ignore[union-attr] self.info_msg(f'Result of get_state: {state}') else: self.error_msg( - 'Exception while calling service: %r' % future.exception() + f'Exception while calling service: {future.exception()!r}' ) time.sleep(5) - def shutdown(self): + def shutdown(self) -> None: self.info_msg('Shutting down') self.action_client.destroy() @@ -407,7 +410,7 @@ def shutdown(self): except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') - def wait_for_initial_pose(self): + def wait_for_initial_pose(self) -> None: self.initial_pose_received = False while not self.initial_pose_received: self.info_msg('Setting initial pose') @@ -416,7 +419,7 @@ def wait_for_initial_pose(self): rclpy.spin_once(self, timeout_sec=1) -def test_RobotMovesToGoal(robot_tester): +def test_RobotMovesToGoal(robot_tester: NavTester) -> bool: robot_tester.info_msg('Setting goal pose') robot_tester.publishGoalPose() robot_tester.info_msg('Waiting 60 seconds for robot to reach goal') @@ -428,7 +431,7 @@ def test_RobotMovesToGoal(robot_tester): # checks that all items in 'limit_passed' permissive array are also true. # In other words, it verifies that all speed limits are received # exactly (by count and values) as expected by 'limits' array. -def test_SpeedLimitsAllCorrect(robot_tester): +def test_SpeedLimitsAllCorrect(robot_tester: NavTester) -> bool: if not robot_tester.filter_test_result: return False for passed in robot_tester.limit_passed: @@ -438,7 +441,7 @@ def test_SpeedLimitsAllCorrect(robot_tester): return True -def run_all_tests(robot_tester): +def run_all_tests(robot_tester: NavTester) -> bool: # set transforms to use_sim_time result = True if result: @@ -472,7 +475,7 @@ def run_all_tests(robot_tester): return result -def fwd_pose(x=0.0, y=0.0, z=0.01): +def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose: initial_pose = Pose() initial_pose.position.x = x initial_pose.position.y = y @@ -484,7 +487,7 @@ def fwd_pose(x=0.0, y=0.0, z=0.01): return initial_pose -def get_tester(args): +def get_tester(args: argparse.Namespace) -> NavTester: # Requested tester for one robot type_str = args.type @@ -512,7 +515,7 @@ def get_tester(args): return tester -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] # The robot(s) positions from the input arguments parser = argparse.ArgumentParser( description='System-level costmap filters tester node' diff --git a/nav2_system_tests/src/error_codes/test_error_codes.py b/nav2_system_tests/src/error_codes/test_error_codes.py index 24ce673726a..f46819ef7aa 100755 --- a/nav2_system_tests/src/error_codes/test_error_codes.py +++ b/nav2_system_tests/src/error_codes/test_error_codes.py @@ -24,7 +24,7 @@ import rclpy -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] rclpy.init() navigator = BasicNavigator() @@ -113,7 +113,7 @@ def main(argv=sys.argv[1:]): result.error_msg != '' ), 'Compute path to pose error_msg empty' - def cancel_task(): + def cancel_task() -> None: time.sleep(1) navigator.goal_handle.cancel_goal_async() @@ -174,7 +174,7 @@ def cancel_task(): a_path.poses.append(pose1) navigator._waitForNodeToActivate('smoother_server') - smoother = { + smoother_errors = { 'invalid_smoother': SmoothPath.Result().INVALID_SMOOTHER, 'unknown': SmoothPath.Result().UNKNOWN, 'timeout': SmoothPath.Result().TIMEOUT, @@ -183,7 +183,7 @@ def cancel_task(): 'invalid_path': SmoothPath.Result().INVALID_PATH, } - for smoother, error_code in smoother.items(): + for smoother, error_code in smoother_errors.items(): result = navigator._smoothPathImpl(a_path, smoother) assert result.error_code == error_code, 'Smoother error does not match' assert result.error_msg != '', 'Smoother error_msg is empty' diff --git a/nav2_system_tests/src/error_codes/test_error_codes_launch.py b/nav2_system_tests/src/error_codes/test_error_codes_launch.py index 3ca61767624..7e33c5bb240 100755 --- a/nav2_system_tests/src/error_codes/test_error_codes_launch.py +++ b/nav2_system_tests/src/error_codes/test_error_codes_launch.py @@ -22,8 +22,8 @@ from launch_testing.legacy import LaunchTestService -def generate_launch_description(): - params_file = os.path.join(os.getenv('TEST_DIR'), 'error_code_param.yaml') +def generate_launch_description() -> LaunchDescription: + params_file = os.path.join(os.getenv('TEST_DIR', ''), 'error_code_param.yaml') remappings = [('/tf', 'tf'), ('/tf_static', 'tf_static')] @@ -103,11 +103,11 @@ def generate_launch_description(): return ld -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] ld = generate_launch_description() test_error_codes_action = ExecuteProcess( - cmd=[os.path.join(os.getenv('TEST_DIR'), 'test_error_codes.py')], + cmd=[os.path.join(os.getenv('TEST_DIR', ''), 'test_error_codes.py')], name='test_error_codes', output='screen', ) diff --git a/nav2_system_tests/src/gps_navigation/dual_ekf_navsat.launch.py b/nav2_system_tests/src/gps_navigation/dual_ekf_navsat.launch.py index 8cc64ec69e1..4705cd77bf2 100644 --- a/nav2_system_tests/src/gps_navigation/dual_ekf_navsat.launch.py +++ b/nav2_system_tests/src/gps_navigation/dual_ekf_navsat.launch.py @@ -18,7 +18,7 @@ import launch_ros.actions -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: launch_dir = os.path.dirname(os.path.realpath(__file__)) params_file = os.path.join(launch_dir, 'dual_ekf_navsat_params.yaml') os.environ['FILE_PATH'] = str(launch_dir) diff --git a/nav2_system_tests/src/gps_navigation/test_case_py.launch.py b/nav2_system_tests/src/gps_navigation/test_case_py.launch.py index ef42a91e4b7..140be5ab708 100755 --- a/nav2_system_tests/src/gps_navigation/test_case_py.launch.py +++ b/nav2_system_tests/src/gps_navigation/test_case_py.launch.py @@ -27,7 +27,7 @@ from nav2_common.launch import RewrittenYaml -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: sim_dir = get_package_share_directory('nav2_minimal_tb3_sim') nav2_bringup_dir = get_package_share_directory('nav2_bringup') @@ -110,11 +110,11 @@ def generate_launch_description(): ) -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] ld = generate_launch_description() test1_action = ExecuteProcess( - cmd=[os.path.join(os.getenv('TEST_DIR'), 'tester.py')], + cmd=[os.path.join(os.getenv('TEST_DIR', ''), 'tester.py')], name='tester_node', output='screen', ) diff --git a/nav2_system_tests/src/gps_navigation/tester.py b/nav2_system_tests/src/gps_navigation/tester.py index 6829dc6a494..56c99324b29 100755 --- a/nav2_system_tests/src/gps_navigation/tester.py +++ b/nav2_system_tests/src/gps_navigation/tester.py @@ -15,6 +15,7 @@ import sys import time +from typing import Optional from action_msgs.msg import GoalStatus from geographic_msgs.msg import GeoPose @@ -22,27 +23,30 @@ from nav2_msgs.srv import ManageLifecycleNodes from rcl_interfaces.srv import SetParameters import rclpy -from rclpy.action import ActionClient +from rclpy.action import ActionClient # type: ignore[attr-defined] +from rclpy.action.client import ClientGoalHandle from rclpy.node import Node from rclpy.parameter import Parameter class GpsWaypointFollowerTest(Node): - def __init__(self): + def __init__(self) -> None: super().__init__(node_name='nav2_gps_waypoint_tester', namespace='') - self.waypoints = None + self.waypoints: list[float] = [] self.action_client = ActionClient( self, FollowGPSWaypoints, 'follow_gps_waypoints' ) - self.goal_handle = None - self.action_result = None + self.goal_handle: Optional[ClientGoalHandle[ + FollowGPSWaypoints.Goal, FollowGPSWaypoints.Result, + FollowGPSWaypoints.Feedback]] = None + self.action_result = FollowGPSWaypoints.Result() self.param_cli = self.create_client( SetParameters, '/waypoint_follower/set_parameters' ) - def setWaypoints(self, waypoints): + def setWaypoints(self, waypoints: list[list[float]]) -> None: self.waypoints = [] for wp in waypoints: msg = GeoPose() @@ -51,7 +55,7 @@ def setWaypoints(self, waypoints): msg.orientation.w = 1.0 self.waypoints.append(msg) - def run(self, block, cancel): + def run(self, block: bool, cancel: bool) -> bool: # if not self.waypoints: # rclpy.error_msg('Did not set valid waypoints before running test!') # return False @@ -72,7 +76,7 @@ def run(self, block, cancel): except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: self.error_msg('Goal rejected') return False @@ -88,8 +92,8 @@ def run(self, block, cancel): self.info_msg("Waiting for 'follow_gps_waypoints' action to complete") try: rclpy.spin_until_future_complete(self, get_result_future) - status = get_result_future.result().status - result = get_result_future.result().result + status = get_result_future.result().status # type: ignore[union-attr] + result = get_result_future.result().result # type: ignore[union-attr] self.action_result = result except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') @@ -100,14 +104,14 @@ def run(self, block, cancel): if len(result.missed_waypoints) > 0: self.info_msg( 'Goal failed to process all waypoints,' - ' missed {0} wps.'.format(len(result.missed_waypoints)) + f' missed {len(result.missed_waypoints)} wps.' ) return False self.info_msg('Goal succeeded!') return True - def setStopFailureParam(self, value): + def setStopFailureParam(self, value: bool) -> None: req = SetParameters.Request() req.parameters = [ Parameter('stop_on_failure', Parameter.Type.BOOL, value).to_parameter_msg() @@ -115,7 +119,7 @@ def setStopFailureParam(self, value): future = self.param_cli.call_async(req) rclpy.spin_until_future_complete(self, future) - def shutdown(self): + def shutdown(self) -> None: self.info_msg('Shutting down') self.action_client.destroy() @@ -137,21 +141,21 @@ def shutdown(self): self.info_msg(f'{transition_service} finished') - def cancel_goal(self): - cancel_future = self.goal_handle.cancel_goal_async() + def cancel_goal(self) -> None: + cancel_future = self.goal_handle.cancel_goal_async() # type: ignore[union-attr] rclpy.spin_until_future_complete(self, cancel_future) - def info_msg(self, msg: str): + def info_msg(self, msg: str) -> None: self.get_logger().info(msg) - def warn_msg(self, msg: str): + def warn_msg(self, msg: str) -> None: self.get_logger().warn(msg) - def error_msg(self, msg: str): + def error_msg(self, msg: str) -> None: self.get_logger().error(msg) -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] rclpy.init() # wait a few seconds to make sure entire stacks are up diff --git a/nav2_system_tests/src/localization/test_localization_launch.py b/nav2_system_tests/src/localization/test_localization_launch.py index 1520ac09bbe..c573f39dd56 100755 --- a/nav2_system_tests/src/localization/test_localization_launch.py +++ b/nav2_system_tests/src/localization/test_localization_launch.py @@ -26,8 +26,8 @@ from launch_testing.legacy import LaunchTestService -def main(argv=sys.argv[1:]): - testExecutable = os.getenv('TEST_EXECUTABLE') +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] + testExecutable = os.getenv('TEST_EXECUTABLE', '') sim_dir = get_package_share_directory('nav2_minimal_tb3_sim') nav2_bringup_dir = get_package_share_directory('nav2_bringup') diff --git a/nav2_system_tests/src/planning/test_planner_costmaps_launch.py b/nav2_system_tests/src/planning/test_planner_costmaps_launch.py index fd8cadb4f9d..6eb5888e80f 100755 --- a/nav2_system_tests/src/planning/test_planner_costmaps_launch.py +++ b/nav2_system_tests/src/planning/test_planner_costmaps_launch.py @@ -22,7 +22,7 @@ from launch_testing.legacy import LaunchTestService -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] testExecutable = os.getenv('TEST_EXECUTABLE') ld = LaunchDescription([]) diff --git a/nav2_system_tests/src/planning/test_planner_random_launch.py b/nav2_system_tests/src/planning/test_planner_random_launch.py index 105063110f8..ca014d6a59f 100755 --- a/nav2_system_tests/src/planning/test_planner_random_launch.py +++ b/nav2_system_tests/src/planning/test_planner_random_launch.py @@ -22,7 +22,7 @@ from launch_testing.legacy import LaunchTestService -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] testExecutable = os.getenv('TEST_EXECUTABLE') ld = LaunchDescription([]) diff --git a/nav2_system_tests/src/system/nav_through_poses_tester_error_msg_node.py b/nav2_system_tests/src/system/nav_through_poses_tester_error_msg_node.py index f24729cd79d..c170d91338a 100755 --- a/nav2_system_tests/src/system/nav_through_poses_tester_error_msg_node.py +++ b/nav2_system_tests/src/system/nav_through_poses_tester_error_msg_node.py @@ -75,16 +75,16 @@ def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str = ''): SetParameters, '/controller_server/set_parameters' ) - def info_msg(self, msg: str): + def info_msg(self, msg: str) -> None: self.get_logger().info('\033[1;37;44m' + msg + '\033[0m') - def warn_msg(self, msg: str): + def warn_msg(self, msg: str) -> None: self.get_logger().warning('\033[1;37;43m' + msg + '\033[0m') - def error_msg(self, msg: str): + def error_msg(self, msg: str) -> None: self.get_logger().error('\033[1;37;41m' + msg + '\033[0m') - def setInitialPose(self): + def setInitialPose(self) -> None: msg = PoseWithCovarianceStamped() msg.pose.pose = self.initial_pose msg.header.frame_id = 'map' @@ -92,17 +92,17 @@ def setInitialPose(self): self.initial_pose_pub.publish(msg) self.currentPose = self.initial_pose - def setGoalChecker(self, name): + def setGoalChecker(self, name: str) -> None: msg = String() msg.data = name self.goal_checker_selector_pub.publish(msg) - def setProgressChecker(self, name): + def setProgressChecker(self, name: str) -> None: msg = String() msg.data = name self.progress_checker_selector_pub.publish(msg) - def setControllerParam(self, name, parameter_type, value): + def setControllerParam(self, name: str, parameter_type: Parameter.Type, value: float) -> None: req = SetParameters.Request() req.parameters = [ Parameter(name, parameter_type, value).to_parameter_msg() @@ -110,7 +110,7 @@ def setControllerParam(self, name, parameter_type, value): future = self.controller_param_cli.call_async(req) rclpy.spin_until_future_complete(self, future) - def getStampedPoseMsg(self, pose: Pose): + def getStampedPoseMsg(self, pose: Pose) -> PoseStamped: msg = PoseStamped() msg.header.frame_id = 'map' msg.pose = pose @@ -120,7 +120,7 @@ def runNavigateAction(self, goal_pose: Optional[Pose] = None, behavior_tree: Optional[str] = None, expected_error_code: Optional[int] = None, - expected_error_msg: Optional[str] = None): + expected_error_msg: Optional[str] = None) -> bool: # Sends a `NavToPose` action request and waits for completion self.info_msg("Waiting for 'NavigateThroughPoses' action server") while not self.action_client.wait_for_server(timeout_sec=1.0): @@ -153,9 +153,9 @@ def runNavigateAction(self, self.info_msg("Waiting for 'NavigateToPose' action to complete") rclpy.spin_until_future_complete(self, get_result_future) - status = get_result_future.result().status + status = get_result_future.result().status # type: ignore[union-attr] if status != GoalStatus.STATUS_SUCCEEDED: - result = get_result_future.result().result + result = get_result_future.result().result # type: ignore[union-attr] if (result.error_code == expected_error_code and result.error_msg == expected_error_msg): self.info_msg(f'Goal failed as expected with status code: {status}' @@ -173,12 +173,12 @@ def runNavigateAction(self, self.info_msg('Goal succeeded!') return True - def poseCallback(self, msg): + def poseCallback(self, msg: PoseWithCovarianceStamped) -> None: self.info_msg('Received amcl_pose') self.current_pose = msg.pose.pose self.initial_pose_received = True - def wait_for_node_active(self, node_name: str): + def wait_for_node_active(self, node_name: str) -> None: # Waits for the node within the tester namespace to become active self.info_msg(f'Waiting for {node_name} to become active') node_service = f'{node_name}/get_state' @@ -192,7 +192,7 @@ def wait_for_node_active(self, node_name: str): future = state_client.call_async(req) rclpy.spin_until_future_complete(self, future) if future.result() is not None: - state = future.result().current_state.label + state = future.result().current_state.label # type: ignore[union-attr] self.info_msg(f'Result of get_state: {state}') else: self.error_msg( @@ -200,7 +200,7 @@ def wait_for_node_active(self, node_name: str): ) time.sleep(5) - def shutdown(self): + def shutdown(self) -> None: self.info_msg('Shutting down') self.action_client.destroy() @@ -235,7 +235,7 @@ def shutdown(self): except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') - def wait_for_initial_pose(self): + def wait_for_initial_pose(self) -> None: self.initial_pose_received = False while not self.initial_pose_received: self.info_msg('Setting initial pose') @@ -244,7 +244,7 @@ def wait_for_initial_pose(self): rclpy.spin_once(self, timeout_sec=1) -def run_all_tests(robot_tester): +def run_all_tests(robot_tester: NavTester) -> bool: pose_out_of_bounds = Pose() pose_out_of_bounds.position.x = 2000.0 pose_out_of_bounds.position.y = 4000.0 @@ -341,7 +341,7 @@ def run_all_tests(robot_tester): return result -def fwd_pose(x=0.0, y=0.0, z=0.01): +def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose: initial_pose = Pose() initial_pose.position.x = x initial_pose.position.y = y @@ -353,7 +353,7 @@ def fwd_pose(x=0.0, y=0.0, z=0.01): return initial_pose -def get_testers(args): +def get_testers(args: argparse.Namespace) -> list[NavTester]: testers = [] init_x, init_y, final_x, final_y = args.robot[0] @@ -376,7 +376,7 @@ def get_testers(args): return testers -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] # The robot(s) positions from the input arguments parser = argparse.ArgumentParser(description='System-level navigation tester node') group = parser.add_mutually_exclusive_group(required=True) diff --git a/nav2_system_tests/src/system/nav_through_poses_tester_node.py b/nav2_system_tests/src/system/nav_through_poses_tester_node.py index 6109223c82a..02c793d5507 100755 --- a/nav2_system_tests/src/system/nav_through_poses_tester_node.py +++ b/nav2_system_tests/src/system/nav_through_poses_tester_node.py @@ -25,7 +25,7 @@ from nav2_msgs.action import NavigateThroughPoses from nav2_msgs.srv import ManageLifecycleNodes import rclpy -from rclpy.action import ActionClient +from rclpy.action import ActionClient # type: ignore[attr-defined] from rclpy.node import Node from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy @@ -55,16 +55,16 @@ def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str = ''): self, NavigateThroughPoses, 'navigate_through_poses' ) - def info_msg(self, msg: str): + def info_msg(self, msg: str) -> None: self.get_logger().info('\033[1;37;44m' + msg + '\033[0m') - def warn_msg(self, msg: str): + def warn_msg(self, msg: str) -> None: self.get_logger().warn('\033[1;37;43m' + msg + '\033[0m') - def error_msg(self, msg: str): + def error_msg(self, msg: str) -> None: self.get_logger().error('\033[1;37;41m' + msg + '\033[0m') - def setInitialPose(self): + def setInitialPose(self) -> None: msg = PoseWithCovarianceStamped() msg.pose.pose = self.initial_pose msg.header.frame_id = 'map' @@ -72,13 +72,13 @@ def setInitialPose(self): self.initial_pose_pub.publish(msg) self.currentPose = self.initial_pose - def getStampedPoseMsg(self, pose: Pose): + def getStampedPoseMsg(self, pose: Pose) -> PoseStamped: msg = PoseStamped() msg.header.frame_id = 'map' msg.pose = pose return msg - def runNavigateAction(self, goal_pose: Optional[Pose] = None): + def runNavigateAction(self, goal_pose: Optional[Pose] = None) -> bool: # Sends a `NavToPose` action request and waits for completion self.info_msg("Waiting for 'NavigateThroughPoses' action server") while not self.action_client.wait_for_server(timeout_sec=1.0): @@ -101,7 +101,7 @@ def runNavigateAction(self, goal_pose: Optional[Pose] = None): rclpy.spin_until_future_complete(self, send_goal_future) goal_handle = send_goal_future.result() - if not goal_handle.accepted: + if not goal_handle or not goal_handle.accepted: self.error_msg('Goal rejected') return False @@ -110,9 +110,9 @@ def runNavigateAction(self, goal_pose: Optional[Pose] = None): self.info_msg("Waiting for 'NavigateToPose' action to complete") rclpy.spin_until_future_complete(self, get_result_future) - status = get_result_future.result().status + status = get_result_future.result().status # type: ignore[union-attr] if status != GoalStatus.STATUS_SUCCEEDED: - result = get_result_future.result().result + result = get_result_future.result().result # type: ignore[union-attr] self.info_msg(f'Goal failed with status code: {status}' f' error code:{result.error_code}' f' error msg:{result.error_msg}') @@ -121,7 +121,7 @@ def runNavigateAction(self, goal_pose: Optional[Pose] = None): self.info_msg('Goal succeeded!') return True - def runFakeNavigateAction(self): + def runFakeNavigateAction(self) -> bool: # Sends a `NavToPose` action request and waits for completion self.info_msg("Waiting for 'NavigateThroughPoses' action server") while not self.action_client.wait_for_server(timeout_sec=1.0): @@ -137,7 +137,7 @@ def runFakeNavigateAction(self): rclpy.spin_until_future_complete(self, send_goal_future) goal_handle = send_goal_future.result() - if not goal_handle.accepted: + if not goal_handle or not goal_handle.accepted: self.error_msg('Goal rejected') return False @@ -146,9 +146,9 @@ def runFakeNavigateAction(self): self.info_msg("Waiting for 'NavigateToPose' action to complete") rclpy.spin_until_future_complete(self, get_result_future) - status = get_result_future.result().status + status = get_result_future.result().status # type: ignore[union-attr] if status != GoalStatus.STATUS_SUCCEEDED: - result = get_result_future.result().result + result = get_result_future.result().result # type: ignore[union-attr] self.info_msg(f'Goal failed with status code: {status}' f' error code:{result.error_code}' f' error msg:{result.error_msg}') @@ -157,7 +157,7 @@ def runFakeNavigateAction(self): self.info_msg('Goal succeeded!') return True - def runNavigatePreemptionAction(self, block): + def runNavigatePreemptionAction(self, block: bool) -> bool: # Sends a `NavToPose` action request and waits for completion self.info_msg("Waiting for 'NavigateThroughPoses' action server") while not self.action_client.wait_for_server(timeout_sec=1.0): @@ -174,7 +174,7 @@ def runNavigatePreemptionAction(self, block): rclpy.spin_until_future_complete(self, send_goal_future) goal_handle = send_goal_future.result() - if not goal_handle.accepted: + if not goal_handle or not goal_handle.accepted: self.error_msg('Goal rejected') return False @@ -186,9 +186,9 @@ def runNavigatePreemptionAction(self, block): self.info_msg("Waiting for 'NavigateToPose' action to complete") rclpy.spin_until_future_complete(self, get_result_future) - status = get_result_future.result().status + status = get_result_future.result().status # type: ignore[union-attr] if status != GoalStatus.STATUS_SUCCEEDED: - result = get_result_future.result().result + result = get_result_future.result().result # type: ignore[union-attr] self.info_msg(f'Goal failed with status code: {status}' f' error code:{result.error_code}' f' error msg:{result.error_msg}') @@ -197,12 +197,12 @@ def runNavigatePreemptionAction(self, block): self.info_msg('Goal succeeded!') return True - def poseCallback(self, msg): + def poseCallback(self, msg: PoseWithCovarianceStamped) -> None: self.info_msg('Received amcl_pose') self.current_pose = msg.pose.pose self.initial_pose_received = True - def wait_for_node_active(self, node_name: str): + def wait_for_node_active(self, node_name: str) -> None: # Waits for the node within the tester namespace to become active self.info_msg(f'Waiting for {node_name} to become active') node_service = f'{node_name}/get_state' @@ -216,7 +216,7 @@ def wait_for_node_active(self, node_name: str): future = state_client.call_async(req) rclpy.spin_until_future_complete(self, future) if future.result() is not None: - state = future.result().current_state.label + state = future.result().current_state.label # type: ignore[union-attr] self.info_msg(f'Result of get_state: {state}') else: self.error_msg( @@ -224,7 +224,7 @@ def wait_for_node_active(self, node_name: str): ) time.sleep(5) - def shutdown(self): + def shutdown(self) -> None: self.info_msg('Shutting down') self.action_client.destroy() @@ -259,7 +259,7 @@ def shutdown(self): except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') - def wait_for_initial_pose(self): + def wait_for_initial_pose(self) -> None: self.initial_pose_received = False while not self.initial_pose_received: self.info_msg('Setting initial pose') @@ -268,7 +268,7 @@ def wait_for_initial_pose(self): rclpy.spin_once(self, timeout_sec=1) -def run_all_tests(robot_tester): +def run_all_tests(robot_tester: NavTester) -> bool: # set transforms to use_sim_time result = True if result: @@ -292,7 +292,7 @@ def run_all_tests(robot_tester): return result -def fwd_pose(x=0.0, y=0.0, z=0.01): +def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose: initial_pose = Pose() initial_pose.position.x = x initial_pose.position.y = y @@ -304,7 +304,7 @@ def fwd_pose(x=0.0, y=0.0, z=0.01): return initial_pose -def get_testers(args): +def get_testers(args: argparse.Namespace) -> list[NavTester]: testers = [] init_x, init_y, final_x, final_y = args.robot[0] @@ -327,7 +327,7 @@ def get_testers(args): return testers -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] # The robot(s) positions from the input arguments parser = argparse.ArgumentParser(description='System-level navigation tester node') group = parser.add_mutually_exclusive_group(required=True) diff --git a/nav2_system_tests/src/system/nav_to_pose_tester_node.py b/nav2_system_tests/src/system/nav_to_pose_tester_node.py index 24a60bb0882..222d0f8237a 100755 --- a/nav2_system_tests/src/system/nav_to_pose_tester_node.py +++ b/nav2_system_tests/src/system/nav_to_pose_tester_node.py @@ -21,7 +21,7 @@ import struct import sys import time -from typing import Optional +from typing import Any, Optional from action_msgs.msg import GoalStatus from geometry_msgs.msg import Pose, PoseStamped, PoseWithCovarianceStamped @@ -29,7 +29,7 @@ from nav2_msgs.action import NavigateToPose from nav2_msgs.srv import ManageLifecycleNodes import rclpy -from rclpy.action import ActionClient +from rclpy.action import ActionClient # type: ignore[attr-defined] from rclpy.node import Node from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy import zmq @@ -60,16 +60,16 @@ def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str = ''): self.set_initial_pose_timeout = 15 self.action_client = ActionClient(self, NavigateToPose, 'navigate_to_pose') - def info_msg(self, msg: str): + def info_msg(self, msg: str) -> None: self.get_logger().info('\033[1;37;44m' + msg + '\033[0m') - def warn_msg(self, msg: str): + def warn_msg(self, msg: str) -> None: self.get_logger().warn('\033[1;37;43m' + msg + '\033[0m') - def error_msg(self, msg: str): + def error_msg(self, msg: str) -> None: self.get_logger().error('\033[1;37;41m' + msg + '\033[0m') - def setInitialPose(self): + def setInitialPose(self) -> None: msg = PoseWithCovarianceStamped() msg.pose.pose = self.initial_pose msg.header.frame_id = 'map' @@ -77,17 +77,17 @@ def setInitialPose(self): self.initial_pose_pub.publish(msg) self.currentPose = self.initial_pose - def getStampedPoseMsg(self, pose: Pose): + def getStampedPoseMsg(self, pose: Pose) -> PoseStamped: msg = PoseStamped() msg.header.frame_id = 'map' msg.pose = pose return msg - def publishGoalPose(self, goal_pose: Optional[Pose] = None): + def publishGoalPose(self, goal_pose: Optional[Pose] = None) -> None: self.goal_pose = goal_pose if goal_pose is not None else self.goal_pose self.goal_pub.publish(self.getStampedPoseMsg(self.goal_pose)) - def runNavigateAction(self, goal_pose: Optional[Pose] = None): + def runNavigateAction(self, goal_pose: Optional[Pose] = None) -> bool: # Sends a `NavToPose` action request and waits for completion self.info_msg("Waiting for 'NavigateToPose' action server") while not self.action_client.wait_for_server(timeout_sec=1.0): @@ -109,7 +109,7 @@ def runNavigateAction(self, goal_pose: Optional[Pose] = None): rclpy.spin_until_future_complete(self, send_goal_future) goal_handle = send_goal_future.result() - if not goal_handle.accepted: + if not goal_handle or not goal_handle.accepted: self.error_msg('Goal rejected') return False @@ -131,9 +131,9 @@ def runNavigateAction(self, goal_pose: Optional[Pose] = None): self.info_msg("Waiting for 'NavigateToPose' action to complete") rclpy.spin_until_future_complete(self, get_result_future) - status = get_result_future.result().status + status = get_result_future.result().status # type: ignore[union-attr] if status != GoalStatus.STATUS_SUCCEEDED: - result = get_result_future.result().result + result = get_result_future.result().result # type: ignore[union-attr] self.info_msg(f'Goal failed with status code: {status}' f' error code:{result.error_code}' f' error msg:{result.error_msg}') @@ -145,7 +145,7 @@ def runNavigateAction(self, goal_pose: Optional[Pose] = None): self.info_msg('Goal succeeded!') return True - def grootMonitoringReloadTree(self): + def grootMonitoringReloadTree(self) -> bool: # ZeroMQ Context context = zmq.Context() @@ -191,7 +191,7 @@ def grootMonitoringReloadTree(self): return True - def grootMonitoringSetBreakpoint(self): + def grootMonitoringSetBreakpoint(self) -> bool: # ZeroMQ Context context = zmq.Context() # Define the socket using the 'Context' @@ -232,7 +232,7 @@ def grootMonitoringSetBreakpoint(self): self.info_msg('ZMQ - HOOK_INSERT request sent') return True - def grootMonitoringGetStatus(self): + def grootMonitoringGetStatus(self) -> bool: # ZeroMQ Context context = zmq.Context() @@ -276,12 +276,12 @@ def grootMonitoringGetStatus(self): self.info_msg('ZMQ - Did receive status') return True - def poseCallback(self, msg): + def poseCallback(self, msg: PoseWithCovarianceStamped) -> None: self.info_msg('Received amcl_pose') self.current_pose = msg.pose.pose self.initial_pose_received = True - def reachesGoal(self, timeout, distance): + def reachesGoal(self, timeout: float, distance: float) -> bool: goalReached = False start_time = time.time() @@ -296,14 +296,17 @@ def reachesGoal(self, timeout, distance): self.error_msg('Robot timed out reaching its goal!') return False - def distanceFromGoal(self): + self.info_msg('Robot reached its goal!') + return True + + def distanceFromGoal(self) -> float: d_x = self.current_pose.position.x - self.goal_pose.position.x d_y = self.current_pose.position.y - self.goal_pose.position.y distance = math.sqrt(d_x * d_x + d_y * d_y) self.info_msg(f'Distance from goal is: {distance}') return distance - def wait_for_node_active(self, node_name: str): + def wait_for_node_active(self, node_name: str) -> None: # Waits for the node within the tester namespace to become active self.info_msg(f'Waiting for {node_name} to become active') node_service = f'{node_name}/get_state' @@ -317,7 +320,7 @@ def wait_for_node_active(self, node_name: str): future = state_client.call_async(req) rclpy.spin_until_future_complete(self, future) if future.result() is not None: - state = future.result().current_state.label + state = future.result().current_state.label # type: ignore[union-attr] self.info_msg(f'Result of get_state: {state}') else: self.error_msg( @@ -325,7 +328,7 @@ def wait_for_node_active(self, node_name: str): ) time.sleep(5) - def shutdown(self): + def shutdown(self) -> None: self.info_msg('Shutting down') self.action_client.destroy() @@ -360,13 +363,13 @@ def shutdown(self): except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') - def wait_for_initial_pose(self): + def wait_for_initial_pose(self) -> bool: self.initial_pose_received = False # If the initial pose is not received within 100 seconds, return False # this is because when setting a wrong initial pose, amcl_pose is not received # and the test will hang indefinitely start_time = time.time() - duration = 0 + duration = 0.0 while not self.initial_pose_received: self.info_msg('Setting initial pose') self.setInitialPose() @@ -379,14 +382,14 @@ def wait_for_initial_pose(self): return True -def test_RobotMovesToGoal(robot_tester): +def test_RobotMovesToGoal(robot_tester: NavTester) -> bool: robot_tester.info_msg('Setting goal pose') robot_tester.publishGoalPose() robot_tester.info_msg('Waiting 60 seconds for robot to reach goal') return robot_tester.reachesGoal(timeout=60, distance=0.5) -def run_all_tests(robot_tester): +def run_all_tests(robot_tester: NavTester) -> bool: # set transforms to use_sim_time result = True if result: @@ -409,7 +412,7 @@ def run_all_tests(robot_tester): return result -def fwd_pose(x=0.0, y=0.0, z=0.01): +def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose: initial_pose = Pose() initial_pose.position.x = x initial_pose.position.y = y @@ -421,7 +424,7 @@ def fwd_pose(x=0.0, y=0.0, z=0.01): return initial_pose -def get_testers(args): +def get_testers(args: argparse.Namespace) -> list[NavTester]: testers = [] if args.robot: @@ -469,7 +472,7 @@ def get_testers(args): return testers -def check_args(expect_failure: str): +def check_args(expect_failure: str) -> Any: # Check if --expect_failure is True or False if expect_failure != 'True' and expect_failure != 'False': print( @@ -480,7 +483,7 @@ def check_args(expect_failure: str): return eval(expect_failure) -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] # The robot(s) positions from the input arguments parser = argparse.ArgumentParser(description='System-level navigation tester node') parser.add_argument('-e', '--expect_failure') diff --git a/nav2_system_tests/src/system/test_multi_robot_launch.py b/nav2_system_tests/src/system/test_multi_robot_launch.py index ed8bdc00e3b..e39a6993f77 100755 --- a/nav2_system_tests/src/system/test_multi_robot_launch.py +++ b/nav2_system_tests/src/system/test_multi_robot_launch.py @@ -27,16 +27,16 @@ from launch_testing.legacy import LaunchTestService -def generate_launch_description(): - map_yaml_file = os.getenv('TEST_MAP') - world = os.getenv('TEST_WORLD') - urdf = os.getenv('TEST_URDF') - sdf = os.getenv('TEST_SDF') +def generate_launch_description() -> LaunchDescription: + map_yaml_file = os.getenv('TEST_MAP', '') + world = os.getenv('TEST_WORLD', '') + urdf = os.getenv('TEST_URDF', '') + sdf = os.getenv('TEST_SDF', '') bt_xml_file = os.path.join( get_package_share_directory('nav2_bt_navigator'), 'behavior_trees', - os.getenv('BT_NAVIGATOR_XML'), + os.getenv('BT_NAVIGATOR_XML', ''), ) bringup_dir = get_package_share_directory('nav2_bringup') @@ -157,13 +157,13 @@ def generate_launch_description(): return ld -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] ld = generate_launch_description() # TODO(orduno) remove duplicated definition of robots on `generate_launch_description` test1_action = ExecuteProcess( cmd=[ - os.path.join(os.getenv('TEST_DIR'), os.getenv('TESTER')), + os.path.join(os.getenv('TEST_DIR', ''), os.getenv('TESTER', '')), '-rs', 'robot1', '0.0', diff --git a/nav2_system_tests/src/system/test_system_launch.py b/nav2_system_tests/src/system/test_system_launch.py index 7713d2815a1..e93efc9a646 100755 --- a/nav2_system_tests/src/system/test_system_launch.py +++ b/nav2_system_tests/src/system/test_system_launch.py @@ -30,7 +30,7 @@ from nav2_common.launch import RewrittenYaml -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: sim_dir = get_package_share_directory('nav2_minimal_tb3_sim') nav2_bringup_dir = get_package_share_directory('nav2_bringup') @@ -46,7 +46,7 @@ def generate_launch_description(): bt_navigator_xml = os.path.join( get_package_share_directory('nav2_bt_navigator'), 'behavior_trees', - os.getenv('BT_NAVIGATOR_XML'), + os.getenv('BT_NAVIGATOR_XML', ''), ) # Use local param file @@ -65,10 +65,10 @@ def generate_launch_description(): param_substitutions.update({'enable_groot_monitoring': 'True'}) param_substitutions.update( - {'planner_server.ros__parameters.GridBased.plugin': os.getenv('PLANNER')} + {'planner_server.ros__parameters.GridBased.plugin': os.getenv('PLANNER', '')} ) param_substitutions.update( - {'controller_server.ros__parameters.FollowPath.plugin': os.getenv('CONTROLLER')} + {'controller_server.ros__parameters.FollowPath.plugin': os.getenv('CONTROLLER', '')} ) configured_params = RewrittenYaml( @@ -137,12 +137,12 @@ def generate_launch_description(): ) -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] ld = generate_launch_description() test1_action = ExecuteProcess( cmd=[ - os.path.join(os.getenv('TEST_DIR'), os.getenv('TESTER')), + os.path.join(os.getenv('TEST_DIR', ''), os.getenv('TESTER', '')), '-r', '-2.0', '-0.5', diff --git a/nav2_system_tests/src/system/test_system_with_obstacle_launch.py b/nav2_system_tests/src/system/test_system_with_obstacle_launch.py index b6fb8104ec4..8413da01458 100755 --- a/nav2_system_tests/src/system/test_system_with_obstacle_launch.py +++ b/nav2_system_tests/src/system/test_system_with_obstacle_launch.py @@ -30,7 +30,7 @@ from nav2_common.launch import RewrittenYaml -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: sim_dir = get_package_share_directory('nav2_minimal_tb3_sim') nav2_bringup_dir = get_package_share_directory('nav2_bringup') nav2_system_tests_dir = get_package_share_directory('nav2_system_tests') @@ -47,7 +47,7 @@ def generate_launch_description(): bt_navigator_xml = os.path.join( get_package_share_directory('nav2_bt_navigator'), 'behavior_trees', - os.getenv('BT_NAVIGATOR_XML'), + os.getenv('BT_NAVIGATOR_XML', ''), ) # Use local param file @@ -63,10 +63,10 @@ def generate_launch_description(): param_substitutions.update({'use_astar': 'True'}) param_substitutions.update( - {'planner_server.ros__parameters.GridBased.plugin': os.getenv('PLANNER')} + {'planner_server.ros__parameters.GridBased.plugin': os.getenv('PLANNER', '')} ) param_substitutions.update( - {'controller_server.ros__parameters.FollowPath.plugin': os.getenv('CONTROLLER')} + {'controller_server.ros__parameters.FollowPath.plugin': os.getenv('CONTROLLER', '')} ) configured_params = RewrittenYaml( @@ -147,12 +147,12 @@ def generate_launch_description(): ) -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] ld = generate_launch_description() test1_action = ExecuteProcess( cmd=[ - os.path.join(os.getenv('TEST_DIR'), os.getenv('TESTER')), + os.path.join(os.getenv('TEST_DIR', ''), os.getenv('TESTER', '')), '-r', '-2.0', '-0.5', diff --git a/nav2_system_tests/src/system/test_wrong_init_pose_launch.py b/nav2_system_tests/src/system/test_wrong_init_pose_launch.py index 7a2af3355a8..81ae8bd483c 100755 --- a/nav2_system_tests/src/system/test_wrong_init_pose_launch.py +++ b/nav2_system_tests/src/system/test_wrong_init_pose_launch.py @@ -30,7 +30,7 @@ from nav2_common.launch import RewrittenYaml -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: sim_dir = get_package_share_directory('nav2_minimal_tb3_sim') nav2_bringup_dir = get_package_share_directory('nav2_bringup') @@ -46,7 +46,7 @@ def generate_launch_description(): bt_navigator_xml = os.path.join( get_package_share_directory('nav2_bt_navigator'), 'behavior_trees', - os.getenv('BT_NAVIGATOR_XML'), + os.getenv('BT_NAVIGATOR_XML', ''), ) # Use local param file @@ -65,10 +65,10 @@ def generate_launch_description(): param_substitutions.update({'enable_groot_monitoring': 'True'}) param_substitutions.update( - {'planner_server.ros__parameters.GridBased.plugin': os.getenv('PLANNER')} + {'planner_server.ros__parameters.GridBased.plugin': os.getenv('PLANNER', '')} ) param_substitutions.update( - {'controller_server.ros__parameters.FollowPath.plugin': os.getenv('CONTROLLER')} + {'controller_server.ros__parameters.FollowPath.plugin': os.getenv('CONTROLLER', '')} ) configured_params = RewrittenYaml( @@ -137,12 +137,12 @@ def generate_launch_description(): ) -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] ld = generate_launch_description() test1_action = ExecuteProcess( cmd=[ - os.path.join(os.getenv('TEST_DIR'), os.getenv('TESTER')), + os.path.join(os.getenv('TEST_DIR', ''), os.getenv('TESTER', '')), '-r', '-200000.0', '-200000.0', diff --git a/nav2_system_tests/src/system_failure/test_system_failure_launch.py b/nav2_system_tests/src/system_failure/test_system_failure_launch.py index 3e7f918e174..8b8e2542c6d 100755 --- a/nav2_system_tests/src/system_failure/test_system_failure_launch.py +++ b/nav2_system_tests/src/system_failure/test_system_failure_launch.py @@ -30,7 +30,7 @@ from nav2_common.launch import RewrittenYaml -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: sim_dir = get_package_share_directory('nav2_minimal_tb3_sim') nav2_bringup_dir = get_package_share_directory('nav2_bringup') @@ -46,7 +46,7 @@ def generate_launch_description(): bt_navigator_xml = os.path.join( get_package_share_directory('nav2_bt_navigator'), 'behavior_trees', - os.getenv('BT_NAVIGATOR_XML'), + os.getenv('BT_NAVIGATOR_XML', ''), ) params_file = os.path.join(nav2_bringup_dir, 'params', 'nav2_params.yaml') @@ -122,12 +122,12 @@ def generate_launch_description(): ) -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] ld = generate_launch_description() test1_action = ExecuteProcess( cmd=[ - os.path.join(os.getenv('TEST_DIR'), 'tester_node.py'), + os.path.join(os.getenv('TEST_DIR', ''), 'tester_node.py'), '-r', '-2.0', '-0.5', diff --git a/nav2_system_tests/src/system_failure/tester_node.py b/nav2_system_tests/src/system_failure/tester_node.py index 9f296ff0d46..615f24f27ec 100755 --- a/nav2_system_tests/src/system_failure/tester_node.py +++ b/nav2_system_tests/src/system_failure/tester_node.py @@ -26,7 +26,7 @@ from nav2_msgs.action import NavigateToPose from nav2_msgs.srv import ManageLifecycleNodes import rclpy -from rclpy.action import ActionClient +from rclpy.action import ActionClient # type: ignore[attr-defined] from rclpy.node import Node from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy @@ -55,16 +55,16 @@ def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str = ''): self.goal_pose = goal_pose self.action_client = ActionClient(self, NavigateToPose, 'navigate_to_pose') - def info_msg(self, msg: str): + def info_msg(self, msg: str) -> None: self.get_logger().info('\033[1;37;44m' + msg + '\033[0m') - def warn_msg(self, msg: str): + def warn_msg(self, msg: str) -> None: self.get_logger().warn('\033[1;37;43m' + msg + '\033[0m') - def error_msg(self, msg: str): + def error_msg(self, msg: str) -> None: self.get_logger().error('\033[1;37;41m' + msg + '\033[0m') - def setInitialPose(self): + def setInitialPose(self) -> None: msg = PoseWithCovarianceStamped() msg.pose.pose = self.initial_pose msg.header.frame_id = 'map' @@ -72,17 +72,17 @@ def setInitialPose(self): self.initial_pose_pub.publish(msg) self.currentPose = self.initial_pose - def getStampedPoseMsg(self, pose: Pose): + def getStampedPoseMsg(self, pose: Pose) -> PoseStamped: msg = PoseStamped() msg.header.frame_id = 'map' msg.pose = pose return msg - def publishGoalPose(self, goal_pose: Optional[Pose] = None): + def publishGoalPose(self, goal_pose: Optional[Pose] = None) -> None: self.goal_pose = goal_pose if goal_pose is not None else self.goal_pose self.goal_pub.publish(self.getStampedPoseMsg(self.goal_pose)) - def runNavigateAction(self, goal_pose: Optional[Pose] = None): + def runNavigateAction(self, goal_pose: Optional[Pose] = None) -> bool: # Sends a `NavToPose` action request and waits for completion self.info_msg("Waiting for 'NavigateToPose' action server") while not self.action_client.wait_for_server(timeout_sec=1.0): @@ -98,7 +98,7 @@ def runNavigateAction(self, goal_pose: Optional[Pose] = None): rclpy.spin_until_future_complete(self, send_goal_future) goal_handle = send_goal_future.result() - if not goal_handle.accepted: + if not goal_handle or not goal_handle.accepted: self.error_msg('Goal rejected') return False @@ -107,7 +107,7 @@ def runNavigateAction(self, goal_pose: Optional[Pose] = None): self.info_msg("Waiting for 'NavigateToPose' action to complete") rclpy.spin_until_future_complete(self, get_result_future) - status = get_result_future.result().status + status = get_result_future.result().status # type: ignore[union-attr] if status != GoalStatus.STATUS_ABORTED: self.info_msg(f'Goal failed with status code: {status}') return False @@ -115,12 +115,12 @@ def runNavigateAction(self, goal_pose: Optional[Pose] = None): self.info_msg('Goal failed, as expected!') return True - def poseCallback(self, msg): + def poseCallback(self, msg: PoseWithCovarianceStamped) -> None: self.info_msg('Received amcl_pose') self.current_pose = msg.pose.pose self.initial_pose_received = True - def reachesGoal(self, timeout, distance): + def reachesGoal(self, timeout: float, distance: float) -> bool: goalReached = False start_time = time.time() @@ -135,14 +135,17 @@ def reachesGoal(self, timeout, distance): self.error_msg('Robot timed out reaching its goal!') return False - def distanceFromGoal(self): + self.info_msg('Robot reached its goal!') + return True + + def distanceFromGoal(self) -> float: d_x = self.current_pose.position.x - self.goal_pose.position.x d_y = self.current_pose.position.y - self.goal_pose.position.y distance = math.sqrt(d_x * d_x + d_y * d_y) self.info_msg(f'Distance from goal is: {distance}') return distance - def wait_for_node_active(self, node_name: str): + def wait_for_node_active(self, node_name: str) -> None: # Waits for the node within the tester namespace to become active self.info_msg(f'Waiting for {node_name} to become active') node_service = f'{node_name}/get_state' @@ -156,7 +159,7 @@ def wait_for_node_active(self, node_name: str): future = state_client.call_async(req) rclpy.spin_until_future_complete(self, future) if future.result() is not None: - state = future.result().current_state.label + state = future.result().current_state.label # type: ignore[union-attr] self.info_msg(f'Result of get_state: {state}') else: self.error_msg( @@ -164,7 +167,7 @@ def wait_for_node_active(self, node_name: str): ) time.sleep(5) - def shutdown(self): + def shutdown(self) -> None: self.info_msg('Shutting down') self.action_client.destroy() @@ -199,7 +202,7 @@ def shutdown(self): except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') - def wait_for_initial_pose(self): + def wait_for_initial_pose(self) -> None: self.initial_pose_received = False while not self.initial_pose_received: self.info_msg('Setting initial pose') @@ -208,7 +211,7 @@ def wait_for_initial_pose(self): rclpy.spin_once(self, timeout_sec=1) -def run_all_tests(robot_tester): +def run_all_tests(robot_tester: NavTester) -> bool: # set transforms to use_sim_time result = True if result: @@ -227,7 +230,7 @@ def run_all_tests(robot_tester): return result -def fwd_pose(x=0.0, y=0.0, z=0.01): +def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose: initial_pose = Pose() initial_pose.position.x = x initial_pose.position.y = y @@ -239,7 +242,7 @@ def fwd_pose(x=0.0, y=0.0, z=0.01): return initial_pose -def get_testers(args): +def get_testers(args: argparse.Namespace) -> list[NavTester]: testers = [] if args.robot: @@ -266,7 +269,7 @@ def get_testers(args): return testers -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] # The robot(s) positions from the input arguments parser = argparse.ArgumentParser(description='System-level navigation tester node') group = parser.add_mutually_exclusive_group(required=True) diff --git a/nav2_system_tests/src/updown/test_updown_launch.py b/nav2_system_tests/src/updown/test_updown_launch.py index 8a70be4e727..e9312e3ae8b 100755 --- a/nav2_system_tests/src/updown/test_updown_launch.py +++ b/nav2_system_tests/src/updown/test_updown_launch.py @@ -20,7 +20,7 @@ from launch_ros.actions import Node -def generate_launch_description(): +def generate_launch_description() -> launch.LaunchDescription: # Configuration parameters for the launch nav2_bringup_dir = get_package_share_directory('nav2_bringup') diff --git a/nav2_system_tests/src/updown/updownresults.py b/nav2_system_tests/src/updown/updownresults.py index de38c51c8bf..1106267179f 100755 --- a/nav2_system_tests/src/updown/updownresults.py +++ b/nav2_system_tests/src/updown/updownresults.py @@ -28,7 +28,7 @@ import sys -def main(): +def main() -> None: log = sys.stdin test_count = 0 fail_count = 0 diff --git a/nav2_system_tests/src/waypoint_follower/test_case_launch.py b/nav2_system_tests/src/waypoint_follower/test_case_launch.py index eb581b373ad..0c92849d806 100755 --- a/nav2_system_tests/src/waypoint_follower/test_case_launch.py +++ b/nav2_system_tests/src/waypoint_follower/test_case_launch.py @@ -28,7 +28,7 @@ from nav2_common.launch import RewrittenYaml -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: sim_dir = get_package_share_directory('nav2_minimal_tb3_sim') nav2_bringup_dir = get_package_share_directory('nav2_bringup') @@ -44,7 +44,7 @@ def generate_launch_description(): bt_navigator_xml = os.path.join( get_package_share_directory('nav2_bt_navigator'), 'behavior_trees', - os.getenv('BT_NAVIGATOR_XML'), + os.getenv('BT_NAVIGATOR_XML', '') ) params_file = os.path.join(nav2_bringup_dir, 'params/nav2_params.yaml') @@ -112,11 +112,11 @@ def generate_launch_description(): ]) -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] ld = generate_launch_description() test1_action = ExecuteProcess( - cmd=[os.path.join(os.getenv('TEST_DIR'), 'tester.py')], + cmd=[os.path.join(os.getenv('TEST_DIR', ''), 'tester.py')], name='tester_node', output='screen') diff --git a/nav2_system_tests/src/waypoint_follower/tester.py b/nav2_system_tests/src/waypoint_follower/tester.py index 87c7b332c94..c70198c5e8f 100755 --- a/nav2_system_tests/src/waypoint_follower/tester.py +++ b/nav2_system_tests/src/waypoint_follower/tester.py @@ -15,6 +15,7 @@ import sys import time +from typing import Optional from action_msgs.msg import GoalStatus from geometry_msgs.msg import PoseStamped, PoseWithCovarianceStamped @@ -22,7 +23,8 @@ from nav2_msgs.srv import ManageLifecycleNodes from rcl_interfaces.srv import SetParameters import rclpy -from rclpy.action import ActionClient +from rclpy.action import ActionClient # type: ignore[attr-defined] +from rclpy.action.client import ClientGoalHandle from rclpy.node import Node from rclpy.parameter import Parameter from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy @@ -30,16 +32,18 @@ class WaypointFollowerTest(Node): - def __init__(self): + def __init__(self) -> None: super().__init__(node_name='nav2_waypoint_tester', namespace='') - self.waypoints = None + self.waypoints: list[float] = [] self.action_client = ActionClient(self, FollowWaypoints, 'follow_waypoints') self.initial_pose_pub = self.create_publisher( PoseWithCovarianceStamped, 'initialpose', 10 ) self.initial_pose_received = False - self.goal_handle = None - self.action_result = None + self.goal_handle: Optional[ClientGoalHandle[ + FollowWaypoints.Goal, FollowWaypoints.Result, + FollowWaypoints.Feedback]] = None + self.action_result = FollowWaypoints.Result() pose_qos = QoSProfile( durability=QoSDurabilityPolicy.TRANSIENT_LOCAL, @@ -55,7 +59,7 @@ def __init__(self): SetParameters, '/waypoint_follower/set_parameters' ) - def setInitialPose(self, pose): + def setInitialPose(self, pose: list[float]) -> None: self.init_pose = PoseWithCovarianceStamped() self.init_pose.pose.pose.position.x = pose[0] self.init_pose.pose.pose.position.y = pose[1] @@ -63,11 +67,11 @@ def setInitialPose(self, pose): self.publishInitialPose() time.sleep(5) - def poseCallback(self, msg): + def poseCallback(self, msg: PoseWithCovarianceStamped) -> None: self.info_msg('Received amcl_pose') self.initial_pose_received = True - def setWaypoints(self, waypoints): + def setWaypoints(self, waypoints: list[list[float]]) -> None: self.waypoints = [] for wp in waypoints: msg = PoseStamped() @@ -77,7 +81,7 @@ def setWaypoints(self, waypoints): msg.pose.orientation.w = 1.0 self.waypoints.append(msg) - def run(self, block, cancel): + def run(self, block: bool, cancel: bool) -> bool: # if not self.waypoints: # rclpy.error_msg('Did not set valid waypoints before running test!') # return False @@ -96,7 +100,7 @@ def run(self, block, cancel): except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: self.error_msg('Goal rejected') return False @@ -112,8 +116,8 @@ def run(self, block, cancel): self.info_msg("Waiting for 'follow_waypoints' action to complete") try: rclpy.spin_until_future_complete(self, get_result_future) - status = get_result_future.result().status - result = get_result_future.result().result + status = get_result_future.result().status # type: ignore[union-attr] + result = get_result_future.result().result # type: ignore[union-attr] self.action_result = result except Exception as e: # noqa: B902 self.error_msg(f'Service call failed {e!r}') @@ -124,17 +128,17 @@ def run(self, block, cancel): if len(self.action_result.missed_waypoints) > 0: self.info_msg( 'Goal failed to process all waypoints,' - ' missed {0} wps.'.format(len(self.action_result.missed_waypoints)) + f' missed {len(self.action_result.missed_waypoints)} wps.' ) return False self.info_msg('Goal succeeded!') return True - def publishInitialPose(self): + def publishInitialPose(self) -> None: self.initial_pose_pub.publish(self.init_pose) - def setStopFailureParam(self, value): + def setStopFailureParam(self, value: bool) -> None: req = SetParameters.Request() req.parameters = [ Parameter('stop_on_failure', Parameter.Type.BOOL, value).to_parameter_msg() @@ -142,7 +146,7 @@ def setStopFailureParam(self, value): future = self.param_cli.call_async(req) rclpy.spin_until_future_complete(self, future) - def shutdown(self): + def shutdown(self) -> None: self.info_msg('Shutting down') self.action_client.destroy() @@ -180,21 +184,21 @@ def shutdown(self): self.info_msg(f'{transition_service} finished') - def cancel_goal(self): - cancel_future = self.goal_handle.cancel_goal_async() + def cancel_goal(self) -> None: + cancel_future = self.goal_handle.cancel_goal_async() # type: ignore[union-attr] rclpy.spin_until_future_complete(self, cancel_future) - def info_msg(self, msg: str): + def info_msg(self, msg: str) -> None: self.get_logger().info(msg) - def warn_msg(self, msg: str): + def warn_msg(self, msg: str) -> None: self.get_logger().warn(msg) - def error_msg(self, msg: str): + def error_msg(self, msg: str) -> None: self.get_logger().error(msg) -def main(argv=sys.argv[1:]): +def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def] rclpy.init() # wait a few seconds to make sure entire stacks are up diff --git a/tools/pyproject.toml b/tools/pyproject.toml index 938eef79689..86d0a1e414d 100644 --- a/tools/pyproject.toml +++ b/tools/pyproject.toml @@ -40,6 +40,9 @@ module = [ "builtin_interfaces.*", "lifecycle_msgs.*", "geographic_msgs.*", + "rcl_interfaces.*", + "std_msgs.*", + "zmq.*", ] ignore_missing_imports = true