diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index d6df527b4b8..f86a36e64ae 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -45,6 +45,7 @@ jobs: nav2_lifecycle_manager nav2_loopback_sim nav2_map_server + nav2_simple_commander arguments: --config tools/pyproject.toml pre-commit: diff --git a/nav2_simple_commander/launch/assisted_teleop_example_launch.py b/nav2_simple_commander/launch/assisted_teleop_example_launch.py index 39172c22185..756ad7d5946 100644 --- a/nav2_simple_commander/launch/assisted_teleop_example_launch.py +++ b/nav2_simple_commander/launch/assisted_teleop_example_launch.py @@ -28,7 +28,7 @@ from launch_ros.actions import Node -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: nav2_bringup_dir = get_package_share_directory('nav2_bringup') sim_dir = get_package_share_directory('nav2_minimal_tb4_sim') desc_dir = get_package_share_directory('nav2_minimal_tb4_description') diff --git a/nav2_simple_commander/launch/follow_path_example_launch.py b/nav2_simple_commander/launch/follow_path_example_launch.py index 7e6b9f2ad4d..fc205c27016 100644 --- a/nav2_simple_commander/launch/follow_path_example_launch.py +++ b/nav2_simple_commander/launch/follow_path_example_launch.py @@ -27,7 +27,7 @@ from launch_ros.actions import Node -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: nav2_bringup_dir = get_package_share_directory('nav2_bringup') sim_dir = get_package_share_directory('nav2_minimal_tb4_sim') desc_dir = get_package_share_directory('nav2_minimal_tb4_description') diff --git a/nav2_simple_commander/launch/inspection_demo_launch.py b/nav2_simple_commander/launch/inspection_demo_launch.py index 8320de3a828..01f3443cc22 100644 --- a/nav2_simple_commander/launch/inspection_demo_launch.py +++ b/nav2_simple_commander/launch/inspection_demo_launch.py @@ -27,7 +27,7 @@ from launch_ros.actions import Node -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: nav2_bringup_dir = get_package_share_directory('nav2_bringup') sim_dir = get_package_share_directory('nav2_minimal_tb4_sim') desc_dir = get_package_share_directory('nav2_minimal_tb4_description') diff --git a/nav2_simple_commander/launch/nav_through_poses_example_launch.py b/nav2_simple_commander/launch/nav_through_poses_example_launch.py index 696bba3a6cc..c4a73c4887a 100644 --- a/nav2_simple_commander/launch/nav_through_poses_example_launch.py +++ b/nav2_simple_commander/launch/nav_through_poses_example_launch.py @@ -27,7 +27,7 @@ from launch_ros.actions import Node -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: nav2_bringup_dir = get_package_share_directory('nav2_bringup') sim_dir = get_package_share_directory('nav2_minimal_tb4_sim') desc_dir = get_package_share_directory('nav2_minimal_tb4_description') diff --git a/nav2_simple_commander/launch/nav_to_pose_example_launch.py b/nav2_simple_commander/launch/nav_to_pose_example_launch.py index 6c49217d570..832ed7cdce7 100644 --- a/nav2_simple_commander/launch/nav_to_pose_example_launch.py +++ b/nav2_simple_commander/launch/nav_to_pose_example_launch.py @@ -27,7 +27,7 @@ from launch_ros.actions import Node -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: nav2_bringup_dir = get_package_share_directory('nav2_bringup') sim_dir = get_package_share_directory('nav2_minimal_tb4_sim') desc_dir = get_package_share_directory('nav2_minimal_tb4_description') diff --git a/nav2_simple_commander/launch/picking_demo_launch.py b/nav2_simple_commander/launch/picking_demo_launch.py index 33db3c070c1..41f0180ad69 100644 --- a/nav2_simple_commander/launch/picking_demo_launch.py +++ b/nav2_simple_commander/launch/picking_demo_launch.py @@ -27,7 +27,7 @@ from launch_ros.actions import Node -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: nav2_bringup_dir = get_package_share_directory('nav2_bringup') sim_dir = get_package_share_directory('nav2_minimal_tb4_sim') desc_dir = get_package_share_directory('nav2_minimal_tb4_description') diff --git a/nav2_simple_commander/launch/recoveries_example_launch.py b/nav2_simple_commander/launch/recoveries_example_launch.py index 554a0386400..7c94600edfd 100644 --- a/nav2_simple_commander/launch/recoveries_example_launch.py +++ b/nav2_simple_commander/launch/recoveries_example_launch.py @@ -27,7 +27,7 @@ from launch_ros.actions import Node -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: nav2_bringup_dir = get_package_share_directory('nav2_bringup') sim_dir = get_package_share_directory('nav2_minimal_tb4_sim') desc_dir = get_package_share_directory('nav2_minimal_tb4_description') diff --git a/nav2_simple_commander/launch/security_demo_launch.py b/nav2_simple_commander/launch/security_demo_launch.py index 23c41cdae28..e7ae653e0f0 100644 --- a/nav2_simple_commander/launch/security_demo_launch.py +++ b/nav2_simple_commander/launch/security_demo_launch.py @@ -27,7 +27,7 @@ from launch_ros.actions import Node -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: nav2_bringup_dir = get_package_share_directory('nav2_bringup') sim_dir = get_package_share_directory('nav2_minimal_tb4_sim') desc_dir = get_package_share_directory('nav2_minimal_tb4_description') diff --git a/nav2_simple_commander/launch/waypoint_follower_example_launch.py b/nav2_simple_commander/launch/waypoint_follower_example_launch.py index 9bdcfbcb07d..22952fc6739 100644 --- a/nav2_simple_commander/launch/waypoint_follower_example_launch.py +++ b/nav2_simple_commander/launch/waypoint_follower_example_launch.py @@ -27,7 +27,7 @@ from launch_ros.actions import Node -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: nav2_bringup_dir = get_package_share_directory('nav2_bringup') sim_dir = get_package_share_directory('nav2_minimal_tb4_sim') desc_dir = get_package_share_directory('nav2_minimal_tb4_description') diff --git a/nav2_simple_commander/nav2_simple_commander/costmap_2d.py b/nav2_simple_commander/nav2_simple_commander/costmap_2d.py index ad4014522aa..630af757bfd 100644 --- a/nav2_simple_commander/nav2_simple_commander/costmap_2d.py +++ b/nav2_simple_commander/nav2_simple_commander/costmap_2d.py @@ -23,7 +23,12 @@ and handling semantics found in the costmap 2d C++ API. """ +from typing import Optional + +from builtin_interfaces.msg import Time +from nav_msgs.msg import OccupancyGrid import numpy as np +from numpy.typing import NDArray class PyCostmap2D: @@ -33,7 +38,7 @@ class PyCostmap2D: Costmap Python3 API for OccupancyGrids to populate from published messages """ - def __init__(self, occupancy_map): + def __init__(self, occupancy_map: OccupancyGrid) -> None: """ Initialize costmap2D. @@ -46,49 +51,49 @@ def __init__(self, occupancy_map): None """ - self.size_x = occupancy_map.info.width - self.size_y = occupancy_map.info.height - self.resolution = occupancy_map.info.resolution - self.origin_x = occupancy_map.info.origin.position.x - self.origin_y = occupancy_map.info.origin.position.y - self.global_frame_id = occupancy_map.header.frame_id - self.costmap_timestamp = occupancy_map.header.stamp + self.size_x: int = occupancy_map.info.width + self.size_y: int = occupancy_map.info.height + self.resolution: float = occupancy_map.info.resolution + self.origin_x: float = occupancy_map.info.origin.position.x + self.origin_y: float = occupancy_map.info.origin.position.y + self.global_frame_id: str = occupancy_map.header.frame_id + self.costmap_timestamp: Time = occupancy_map.header.stamp # Extract costmap - self.costmap = np.array(occupancy_map.data, dtype=np.uint8) + self.costmap: NDArray[np.uint8] = np.array(occupancy_map.data, dtype=np.uint8) - def getSizeInCellsX(self): + def getSizeInCellsX(self) -> int: """Get map width in cells.""" return self.size_x - def getSizeInCellsY(self): + def getSizeInCellsY(self) -> int: """Get map height in cells.""" return self.size_y - def getSizeInMetersX(self): + def getSizeInMetersX(self) -> float: """Get x axis map size in meters.""" return (self.size_x - 1 + 0.5) * self.resolution - def getSizeInMetersY(self): + def getSizeInMetersY(self) -> float: """Get y axis map size in meters.""" return (self.size_y - 1 + 0.5) * self.resolution - def getOriginX(self): + def getOriginX(self) -> float: """Get the origin x axis of the map [m].""" return self.origin_x - def getOriginY(self): + def getOriginY(self) -> float: """Get the origin y axis of the map [m].""" return self.origin_y - def getResolution(self): + def getResolution(self) -> float: """Get map resolution [m/cell].""" return self.resolution - def getGlobalFrameID(self): + def getGlobalFrameID(self) -> str: """Get global frame_id.""" return self.global_frame_id - def getCostmapTimestamp(self): + def getCostmapTimestamp(self) -> Time: """Get costmap timestamp.""" return self.costmap_timestamp @@ -106,7 +111,7 @@ def getCostXY(self, mx: int, my: int) -> np.uint8: np.uint8: cost of a cell """ - return self.costmap[self.getIndex(mx, my)] + return np.uint8(self.costmap[self.getIndex(mx, my)]) def getCostIdx(self, index: int) -> np.uint8: """ @@ -121,7 +126,7 @@ def getCostIdx(self, index: int) -> np.uint8: np.uint8: cost of a cell """ - return self.costmap[index] + return np.uint8(self.costmap[index]) def setCost(self, mx: int, my: int, cost: np.uint8) -> None: """ @@ -160,7 +165,7 @@ def mapToWorld(self, mx: int, my: int) -> tuple[float, float]: wy = self.origin_y + (my + 0.5) * self.resolution return (wx, wy) - def worldToMapValidated(self, wx: float, wy: float): + def worldToMapValidated(self, wx: float, wy: float) -> tuple[Optional[int], Optional[int]]: """ Get the map coordinate XY using world coordinate XY. diff --git a/nav2_simple_commander/nav2_simple_commander/demo_inspection.py b/nav2_simple_commander/nav2_simple_commander/demo_inspection.py index 6fee029cd8b..f2acdfe8cb1 100644 --- a/nav2_simple_commander/nav2_simple_commander/demo_inspection.py +++ b/nav2_simple_commander/nav2_simple_commander/demo_inspection.py @@ -26,7 +26,7 @@ """ -def main(): +def main() -> None: rclpy.init() navigator = BasicNavigator() diff --git a/nav2_simple_commander/nav2_simple_commander/demo_picking.py b/nav2_simple_commander/nav2_simple_commander/demo_picking.py index 8559609e84c..c8fe7301272 100644 --- a/nav2_simple_commander/nav2_simple_commander/demo_picking.py +++ b/nav2_simple_commander/nav2_simple_commander/demo_picking.py @@ -41,7 +41,7 @@ } -def main(): +def main() -> None: # Received virtual request for picking item at Shelf A and bring to # worker at the pallet jack 7 for shipping. This request would # contain the shelf ID ('shelf_A') and shipping destination ('frieght_bay_3') @@ -94,7 +94,7 @@ def main(): 'Estimated time of arrival at ' + request_item_location + ' for worker: ' - + '{0:.0f}'.format( + + '{:.0f}'.format( Duration.from_msg(feedback.estimated_time_remaining).nanoseconds / 1e9 ) diff --git a/nav2_simple_commander/nav2_simple_commander/demo_recoveries.py b/nav2_simple_commander/nav2_simple_commander/demo_recoveries.py index 7675ce199e2..dcc3904ee4f 100644 --- a/nav2_simple_commander/nav2_simple_commander/demo_recoveries.py +++ b/nav2_simple_commander/nav2_simple_commander/demo_recoveries.py @@ -25,7 +25,7 @@ """ -def main(): +def main() -> None: rclpy.init() navigator = BasicNavigator() diff --git a/nav2_simple_commander/nav2_simple_commander/demo_security.py b/nav2_simple_commander/nav2_simple_commander/demo_security.py index d0cf0af5bf7..fa4be0761e8 100644 --- a/nav2_simple_commander/nav2_simple_commander/demo_security.py +++ b/nav2_simple_commander/nav2_simple_commander/demo_security.py @@ -27,7 +27,7 @@ """ -def main(): +def main() -> None: rclpy.init() navigator = BasicNavigator() @@ -56,7 +56,7 @@ def main(): navigator.waitUntilNav2Active() # Do security route until dead - while rclpy.ok(): + while rclpy.ok(): # type: ignore[attr-defined] # Send our route route_poses = [] pose = PoseStamped() @@ -78,7 +78,7 @@ def main(): if feedback and i % 5 == 0: print( 'Estimated time to complete current route: ' - + '{0:.0f}'.format( + + '{:.0f}'.format( Duration.from_msg(feedback.estimated_time_remaining).nanoseconds / 1e9 ) diff --git a/nav2_simple_commander/nav2_simple_commander/example_assisted_teleop.py b/nav2_simple_commander/nav2_simple_commander/example_assisted_teleop.py index 0c6a0f58caf..be80705a85a 100644 --- a/nav2_simple_commander/nav2_simple_commander/example_assisted_teleop.py +++ b/nav2_simple_commander/nav2_simple_commander/example_assisted_teleop.py @@ -24,7 +24,7 @@ """ -def main(): +def main() -> None: rclpy.init() navigator = BasicNavigator() diff --git a/nav2_simple_commander/nav2_simple_commander/example_follow_path.py b/nav2_simple_commander/nav2_simple_commander/example_follow_path.py index 8f825c42693..9b4439e52f6 100644 --- a/nav2_simple_commander/nav2_simple_commander/example_follow_path.py +++ b/nav2_simple_commander/nav2_simple_commander/example_follow_path.py @@ -22,7 +22,7 @@ """ -def main(): +def main() -> None: rclpy.init() navigator = BasicNavigator() @@ -69,9 +69,9 @@ def main(): if feedback and i % 5 == 0: print( 'Estimated distance remaining to goal position: ' - + '{0:.3f}'.format(feedback.distance_to_goal) + + f'{feedback.distance_to_goal:.3f}' + '\nCurrent speed of the robot: ' - + '{0:.3f}'.format(feedback.speed) + + f'{feedback.speed:.3f}' ) # Do something depending on the return code diff --git a/nav2_simple_commander/nav2_simple_commander/example_nav_through_poses.py b/nav2_simple_commander/nav2_simple_commander/example_nav_through_poses.py index cbcb4e3926d..7585093fd74 100644 --- a/nav2_simple_commander/nav2_simple_commander/example_nav_through_poses.py +++ b/nav2_simple_commander/nav2_simple_commander/example_nav_through_poses.py @@ -23,7 +23,7 @@ """ -def main(): +def main() -> None: rclpy.init() navigator = BasicNavigator() @@ -102,7 +102,7 @@ def main(): if feedback and i % 5 == 0: print( 'Estimated time of arrival: ' - + '{0:.0f}'.format( + + '{:.0f}'.format( Duration.from_msg(feedback.estimated_time_remaining).nanoseconds / 1e9 ) diff --git a/nav2_simple_commander/nav2_simple_commander/example_nav_to_pose.py b/nav2_simple_commander/nav2_simple_commander/example_nav_to_pose.py index 5846823440c..985230915a3 100644 --- a/nav2_simple_commander/nav2_simple_commander/example_nav_to_pose.py +++ b/nav2_simple_commander/nav2_simple_commander/example_nav_to_pose.py @@ -23,7 +23,7 @@ """ -def main(): +def main() -> None: rclpy.init() navigator = BasicNavigator() @@ -82,7 +82,7 @@ def main(): if feedback and i % 5 == 0: print( 'Estimated time of arrival: ' - + '{0:.0f}'.format( + + '{:.0f}'.format( Duration.from_msg(feedback.estimated_time_remaining).nanoseconds / 1e9 ) diff --git a/nav2_simple_commander/nav2_simple_commander/example_waypoint_follower.py b/nav2_simple_commander/nav2_simple_commander/example_waypoint_follower.py index 0cf065a7817..a70929b1cd9 100644 --- a/nav2_simple_commander/nav2_simple_commander/example_waypoint_follower.py +++ b/nav2_simple_commander/nav2_simple_commander/example_waypoint_follower.py @@ -23,7 +23,7 @@ """ -def main(): +def main() -> None: rclpy.init() navigator = BasicNavigator() diff --git a/nav2_simple_commander/nav2_simple_commander/footprint_collision_checker.py b/nav2_simple_commander/nav2_simple_commander/footprint_collision_checker.py index 5cf2f81794a..f721b5f1528 100644 --- a/nav2_simple_commander/nav2_simple_commander/footprint_collision_checker.py +++ b/nav2_simple_commander/nav2_simple_commander/footprint_collision_checker.py @@ -26,6 +26,7 @@ from geometry_msgs.msg import Point32, Polygon from nav2_simple_commander.costmap_2d import PyCostmap2D from nav2_simple_commander.line_iterator import LineIterator +import numpy as np NO_INFORMATION = 255 LETHAL_OBSTACLE = 254 @@ -42,12 +43,12 @@ class FootprintCollisionChecker: and checking the collisions of a Footprint """ - def __init__(self): + def __init__(self) -> None: """Initialize the FootprintCollisionChecker Object.""" self.costmap_ = None pass - def footprintCost(self, footprint: Polygon): + def footprintCost(self, footprint: Polygon) -> float: """ Iterate over all the points in a footprint and check for collision. @@ -90,7 +91,8 @@ def footprintCost(self, footprint: Polygon): return max(float(self.lineCost(xstart, x1, ystart, y1)), footprint_cost) - def lineCost(self, x0, x1, y0, y1, step_size=0.5): + def lineCost(self, x0: float, x1: float, + y0: float, y1: float, step_size: float = 0.5) -> float: """ Iterate over all the points along a line and check for collision. @@ -113,9 +115,9 @@ def lineCost(self, x0, x1, y0, y1, step_size=0.5): line_iterator = LineIterator(x0, y0, x1, y1, step_size) while line_iterator.isValid(): - point_cost = self.pointCost( + point_cost = float(self.pointCost( int(line_iterator.getX()), int(line_iterator.getY()) - ) + )) if point_cost == LETHAL_OBSTACLE: return point_cost @@ -127,7 +129,7 @@ def lineCost(self, x0, x1, y0, y1, step_size=0.5): return line_cost - def worldToMapValidated(self, wx: float, wy: float): + def worldToMapValidated(self, wx: float, wy: float) -> tuple[int, int]: """ Get the map coordinate XY using world coordinate XY. @@ -150,7 +152,7 @@ def worldToMapValidated(self, wx: float, wy: float): ) return self.costmap_.worldToMapValidated(wx, wy) - def pointCost(self, x: int, y: int): + def pointCost(self, x: int, y: int) -> np.uint8: """ Get the cost of a point in the costmap using map coordinates XY. @@ -170,7 +172,7 @@ def pointCost(self, x: int, y: int): ) return self.costmap_.getCostXY(x, y) - def setCostmap(self, costmap: PyCostmap2D): + def setCostmap(self, costmap: PyCostmap2D) -> None: """ Specify which costmap to use. @@ -186,7 +188,8 @@ def setCostmap(self, costmap: PyCostmap2D): self.costmap_ = costmap return None - def footprintCostAtPose(self, x: float, y: float, theta: float, footprint: Polygon): + def footprintCostAtPose(self, x: float, y: float, + theta: float, footprint: Polygon) -> float: """ Get the cost of a footprint at a specific Pose in map coordinates. diff --git a/nav2_simple_commander/nav2_simple_commander/line_iterator.py b/nav2_simple_commander/nav2_simple_commander/line_iterator.py index 179c5d1da1b..93dbe77f8ca 100644 --- a/nav2_simple_commander/nav2_simple_commander/line_iterator.py +++ b/nav2_simple_commander/nav2_simple_commander/line_iterator.py @@ -31,7 +31,9 @@ class LineIterator: LineIterator Python3 API for iterating along the points of a given line """ - def __init__(self, x0, y0, x1, y1, step_size=1.0): + def __init__(self, x0: float, y0: float, + x1: float, y1: float, + step_size: float = 1.0) -> None: """ Initialize the LineIterator. @@ -91,11 +93,11 @@ def __init__(self, x0, y0, x1, y1, step_size=1.0): 'Line has zero length (All 4 points have same coordinates)' ) - def isValid(self): + def isValid(self) -> bool: """Check if line is valid.""" return self.valid_ - def advance(self): + def advance(self) -> None: """Advance to the next point in the line.""" if self.x1_ > self.x0_: if self.x_ < self.x1_: @@ -131,35 +133,35 @@ def advance(self): else: self.valid_ = False - def getX(self): + def getX(self) -> float: """Get the abscissa of the current point.""" return self.x_ - def getY(self): + def getY(self) -> float: """Get the ordinate of the current point.""" return self.y_ - def getX0(self): + def getX0(self) -> float: """Get the abscissa of the initial point.""" return self.x0_ - def getY0(self): + def getY0(self) -> float: """Get the ordinate of the initial point.""" return self.y0_ - def getX1(self): + def getX1(self) -> float: """Get the abscissa of the final point.""" return self.x1_ - def getY1(self): + def getY1(self) -> float: """Get the ordinate of the final point.""" return self.y1_ - def get_line_length(self): + def get_line_length(self) -> complex: """Get the length of the line.""" return sqrt(pow(self.x1_ - self.x0_, 2) + pow(self.y1_ - self.y0_, 2)) - def clamp(self, n, min_n, max_n): + def clamp(self, n: float, min_n: float, max_n: float) -> float: """ Clamp n to be between min_n and max_n. diff --git a/nav2_simple_commander/nav2_simple_commander/robot_navigator.py b/nav2_simple_commander/nav2_simple_commander/robot_navigator.py index ed277bb2e51..b65dd7449de 100644 --- a/nav2_simple_commander/nav2_simple_commander/robot_navigator.py +++ b/nav2_simple_commander/nav2_simple_commander/robot_navigator.py @@ -16,9 +16,11 @@ from enum import Enum import time +from typing import Any, Optional from action_msgs.msg import GoalStatus from builtin_interfaces.msg import Duration +from geographic_msgs.msg import GeoPose from geometry_msgs.msg import Point, PoseStamped, PoseWithCovarianceStamped from lifecycle_msgs.srv import GetState from nav2_msgs.action import (AssistedTeleop, BackUp, ComputePathThroughPoses, ComputePathToPose, @@ -27,11 +29,15 @@ Spin, UndockRobot) from nav2_msgs.srv import (ClearCostmapAroundRobot, ClearCostmapExceptRegion, ClearEntireCostmap, GetCostmap, LoadMap, ManageLifecycleNodes) +from nav_msgs.msg import Goals, OccupancyGrid, Path import rclpy -from rclpy.action import ActionClient +from rclpy.action import ActionClient # type: ignore[attr-defined] +from rclpy.action.client import ClientGoalHandle from rclpy.duration import Duration as rclpyDuration from rclpy.node import Node from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy +from rclpy.task import Future +from rclpy.type_support import GetResultServiceResponse class TaskResult(Enum): @@ -43,14 +49,15 @@ class TaskResult(Enum): class BasicNavigator(Node): - def __init__(self, node_name='basic_navigator', namespace=''): + def __init__(self, node_name: str = 'basic_navigator', namespace: str = '') -> None: super().__init__(node_name=node_name, namespace=namespace) self.initial_pose = PoseStamped() self.initial_pose.header.frame_id = 'map' - self.goal_handle = None - self.result_future = None - self.feedback = None - self.status = None + self.goal_handle: Optional[ClientGoalHandle[Any, Any, Any]] = None + self.result_future: \ + Optional[Future[GetResultServiceResponse[Any]]] = None + self.feedback: str = '' + self.status: Optional[int] = None self.last_action_error_code = 0 self.last_action_error_msg = '' @@ -119,10 +126,10 @@ def __init__(self, node_name='basic_navigator', namespace=''): GetCostmap, 'local_costmap/get_costmap' ) - def destroyNode(self): + def destroyNode(self) -> None: self.destroy_node() - def destroy_node(self): + def destroy_node(self) -> None: self.nav_through_poses_client.destroy() self.nav_to_pose_client.destroy() self.follow_waypoints_client.destroy() @@ -139,13 +146,13 @@ def destroy_node(self): self.undocking_client.destroy() super().destroy_node() - def setInitialPose(self, initial_pose): + def setInitialPose(self, initial_pose: PoseStamped) -> None: """Set the initial pose to the localization system.""" self.initial_pose_received = False self.initial_pose = initial_pose self._setInitialPose() - def goThroughPoses(self, poses, behavior_tree=''): + def goThroughPoses(self, poses: Goals, behavior_tree: str = '') -> bool: """Send a `NavThroughPoses` action request.""" self.clearTaskError() self.debug("Waiting for 'NavigateThroughPoses' action server") @@ -163,7 +170,7 @@ def goThroughPoses(self, poses, behavior_tree=''): rclpy.spin_until_future_complete(self, send_goal_future) self.goal_handle = send_goal_future.result() - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: msg = f'NavigateThroughPoses request with {len(poses)} was rejected!' self.setTaskError(NavigateThroughPoses.UNKNOWN, msg) self.error(msg) @@ -172,7 +179,7 @@ def goThroughPoses(self, poses, behavior_tree=''): self.result_future = self.goal_handle.get_result_async() return True - def goToPose(self, pose, behavior_tree=''): + def goToPose(self, pose: PoseStamped, behavior_tree: str = '') -> bool: """Send a `NavToPose` action request.""" self.clearTaskError() self.debug("Waiting for 'NavigateToPose' action server") @@ -196,7 +203,7 @@ def goToPose(self, pose, behavior_tree=''): rclpy.spin_until_future_complete(self, send_goal_future) self.goal_handle = send_goal_future.result() - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: msg = ( 'NavigateToPose goal to ' + str(pose.pose.position.x) @@ -211,7 +218,7 @@ def goToPose(self, pose, behavior_tree=''): self.result_future = self.goal_handle.get_result_async() return True - def followWaypoints(self, poses): + def followWaypoints(self, poses: list[PoseStamped]) -> bool: """Send a `FollowWaypoints` action request.""" self.clearTaskError() self.debug("Waiting for 'FollowWaypoints' action server") @@ -228,7 +235,7 @@ def followWaypoints(self, poses): rclpy.spin_until_future_complete(self, send_goal_future) self.goal_handle = send_goal_future.result() - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: msg = f'Following {len(poses)} waypoints request was rejected!' self.setTaskError(FollowWaypoints.UNKNOWN, msg) self.error(msg) @@ -237,7 +244,7 @@ def followWaypoints(self, poses): self.result_future = self.goal_handle.get_result_async() return True - def followGpsWaypoints(self, gps_poses): + def followGpsWaypoints(self, gps_poses: list[GeoPose]) -> bool: """Send a `FollowGPSWaypoints` action request.""" self.clearTaskError() self.debug("Waiting for 'FollowWaypoints' action server") @@ -254,7 +261,7 @@ def followGpsWaypoints(self, gps_poses): rclpy.spin_until_future_complete(self, send_goal_future) self.goal_handle = send_goal_future.result() - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: msg = f'Following {len(gps_poses)} gps waypoints request was rejected!' self.setTaskError(FollowGPSWaypoints.UNKNOWN, msg) self.error(msg) @@ -263,7 +270,9 @@ def followGpsWaypoints(self, gps_poses): self.result_future = self.goal_handle.get_result_async() return True - def spin(self, spin_dist=1.57, time_allowance=10, disable_collision_checks=False): + def spin( + self, spin_dist: float = 1.57, time_allowance: int = 10, + disable_collision_checks: bool = False) -> bool: self.clearTaskError() self.debug("Waiting for 'Spin' action server") while not self.spin_client.wait_for_server(timeout_sec=1.0): @@ -280,7 +289,7 @@ def spin(self, spin_dist=1.57, time_allowance=10, disable_collision_checks=False rclpy.spin_until_future_complete(self, send_goal_future) self.goal_handle = send_goal_future.result() - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: msg = 'Spin request was rejected!' self.setTaskError(Spin.UNKNOWN, msg) self.error(msg) @@ -289,8 +298,9 @@ def spin(self, spin_dist=1.57, time_allowance=10, disable_collision_checks=False self.result_future = self.goal_handle.get_result_async() return True - def backup(self, backup_dist=0.15, backup_speed=0.025, time_allowance=10, - disable_collision_checks=False): + def backup( + self, backup_dist: float = 0.15, backup_speed: float = 0.025, + time_allowance: int = 10, disable_collision_checks: bool = False) -> bool: self.clearTaskError() self.debug("Waiting for 'Backup' action server") while not self.backup_client.wait_for_server(timeout_sec=1.0): @@ -308,7 +318,7 @@ def backup(self, backup_dist=0.15, backup_speed=0.025, time_allowance=10, rclpy.spin_until_future_complete(self, send_goal_future) self.goal_handle = send_goal_future.result() - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: msg = 'Backup request was rejected!' self.setTaskError(BackUp.UNKNOWN, msg) self.error(msg) @@ -317,8 +327,9 @@ def backup(self, backup_dist=0.15, backup_speed=0.025, time_allowance=10, self.result_future = self.goal_handle.get_result_async() return True - def driveOnHeading(self, dist=0.15, speed=0.025, time_allowance=10, - disable_collision_checks=False): + def driveOnHeading( + self, dist: float = 0.15, speed: float = 0.025, + time_allowance: int = 10, disable_collision_checks: bool = False) -> bool: self.clearTaskError() self.debug("Waiting for 'DriveOnHeading' action server") while not self.backup_client.wait_for_server(timeout_sec=1.0): @@ -336,7 +347,7 @@ def driveOnHeading(self, dist=0.15, speed=0.025, time_allowance=10, rclpy.spin_until_future_complete(self, send_goal_future) self.goal_handle = send_goal_future.result() - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: msg = 'Drive On Heading request was rejected!' self.setTaskError(DriveOnHeading.UNKNOWN, msg) self.error(msg) @@ -345,7 +356,7 @@ def driveOnHeading(self, dist=0.15, speed=0.025, time_allowance=10, self.result_future = self.goal_handle.get_result_async() return True - def assistedTeleop(self, time_allowance=30): + def assistedTeleop(self, time_allowance: int = 30) -> bool: self.clearTaskError() self.debug("Wanting for 'assisted_teleop' action server") @@ -362,7 +373,7 @@ def assistedTeleop(self, time_allowance=30): rclpy.spin_until_future_complete(self, send_goal_future) self.goal_handle = send_goal_future.result() - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: msg = 'Assisted Teleop request was rejected!' self.setTaskError(AssistedTeleop.UNKNOWN, msg) self.error(msg) @@ -371,7 +382,8 @@ def assistedTeleop(self, time_allowance=30): self.result_future = self.goal_handle.get_result_async() return True - def followPath(self, path, controller_id='', goal_checker_id=''): + def followPath(self, path: Path, controller_id: str = '', + goal_checker_id: str = '') -> bool: self.clearTaskError() """Send a `FollowPath` action request.""" self.debug("Waiting for 'FollowPath' action server") @@ -390,7 +402,7 @@ def followPath(self, path, controller_id='', goal_checker_id=''): rclpy.spin_until_future_complete(self, send_goal_future) self.goal_handle = send_goal_future.result() - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: msg = 'FollowPath goal was rejected!' self.setTaskError(FollowPath.UNKNOWN, msg) self.error(msg) @@ -399,7 +411,8 @@ def followPath(self, path, controller_id='', goal_checker_id=''): self.result_future = self.goal_handle.get_result_async() return True - def dockRobotByPose(self, dock_pose, dock_type, nav_to_dock=True): + def dockRobotByPose(self, dock_pose: PoseStamped, + dock_type: str = '', nav_to_dock: bool = True) -> bool: self.clearTaskError() """Send a `DockRobot` action request.""" self.info("Waiting for 'DockRobot' action server") @@ -418,7 +431,7 @@ def dockRobotByPose(self, dock_pose, dock_type, nav_to_dock=True): rclpy.spin_until_future_complete(self, send_goal_future) self.goal_handle = send_goal_future.result() - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: msg = 'DockRobot request was rejected!' self.setTaskError(DockRobot.UNKNOWN, msg) self.error(msg) @@ -427,7 +440,7 @@ def dockRobotByPose(self, dock_pose, dock_type, nav_to_dock=True): self.result_future = self.goal_handle.get_result_async() return True - def dockRobotByID(self, dock_id, nav_to_dock=True): + def dockRobotByID(self, dock_id: str, nav_to_dock: bool = True) -> bool: """Send a `DockRobot` action request.""" self.clearTaskError() self.info("Waiting for 'DockRobot' action server") @@ -445,7 +458,7 @@ def dockRobotByID(self, dock_id, nav_to_dock=True): rclpy.spin_until_future_complete(self, send_goal_future) self.goal_handle = send_goal_future.result() - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: msg = 'DockRobot request was rejected!' self.setTaskError(DockRobot.UNKNOWN, msg) self.error(msg) @@ -454,7 +467,7 @@ def dockRobotByID(self, dock_id, nav_to_dock=True): self.result_future = self.goal_handle.get_result_async() return True - def undockRobot(self, dock_type=''): + def undockRobot(self, dock_type: str = '') -> bool: """Send a `UndockRobot` action request.""" self.clearTaskError() self.info("Waiting for 'UndockRobot' action server") @@ -470,7 +483,7 @@ def undockRobot(self, dock_type=''): rclpy.spin_until_future_complete(self, send_goal_future) self.goal_handle = send_goal_future.result() - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: msg = 'UndockRobot request was rejected!' self.setTaskError(UndockRobot.UNKNOWN, msg) self.error(msg) @@ -479,12 +492,15 @@ def undockRobot(self, dock_type=''): self.result_future = self.goal_handle.get_result_async() return True - def cancelTask(self): + def cancelTask(self) -> None: """Cancel pending task request of any type.""" self.info('Canceling current task.') if self.result_future: - future = self.goal_handle.cancel_goal_async() - rclpy.spin_until_future_complete(self, future) + if self.goal_handle is not None: + future = self.goal_handle.cancel_goal_async() + rclpy.spin_until_future_complete(self, future) + else: + self.error('No goal handle to cancel!') self.clearTaskError() return @@ -494,16 +510,25 @@ def isTaskComplete(self) -> bool: # task was cancelled or completed return True rclpy.spin_until_future_complete(self, self.result_future, timeout_sec=0.10) - if self.result_future.result(): - self.status = self.result_future.result().status + result_response = self.result_future.result() + + if result_response: + self.status = result_response.status if self.status != GoalStatus.STATUS_SUCCEEDED: - result = self.result_future.result().result - self.setTaskError(result.error_code, result.error_msg) - self.debug('Task with failed with' - f' status code:{self.status}' - f' error code:{result.error_code}' - f' error msg:{result.error_msg}') - return True + result = result_response.result + if result is not None: + self.setTaskError(result.error_code, result.error_msg) + self.debug( + 'Task with failed with' + f' status code:{self.status}' + f' error code:{result.error_code}' + f' error msg:{result.error_msg}' + ) + return True + else: + self.setTaskError(0, 'No result received') + self.debug('Task failed with no result received') + return True else: # Timed out, still processing, not complete yet return False @@ -511,11 +536,11 @@ def isTaskComplete(self) -> bool: self.debug('Task succeeded!') return True - def getFeedback(self): + def getFeedback(self) -> str: """Get the pending action feedback message.""" return self.feedback - def getResult(self): + def getResult(self) -> TaskResult: """Get the pending action result message.""" if self.status == GoalStatus.STATUS_SUCCEEDED: return TaskResult.SUCCEEDED @@ -526,18 +551,19 @@ def getResult(self): else: return TaskResult.UNKNOWN - def clearTaskError(self): + def clearTaskError(self) -> None: self.last_action_error_code = 0 self.last_action_error_msg = '' - def setTaskError(self, error_code, error_msg): + def setTaskError(self, error_code: int, error_msg: str) -> None: self.last_action_error_code = error_code self.last_action_error_msg = error_msg def getTaskError(self) -> tuple[int, str]: return (self.last_action_error_code, self.last_action_error_msg) - def waitUntilNav2Active(self, navigator='bt_navigator', localizer='amcl'): + def waitUntilNav2Active(self, navigator: str = 'bt_navigator', + localizer: str = 'amcl') -> None: """Block until the full navigation system is up and running.""" if localizer != 'robot_localization': # non-lifecycle node self._waitForNodeToActivate(localizer) @@ -548,8 +574,9 @@ def waitUntilNav2Active(self, navigator='bt_navigator', localizer='amcl'): return def _getPathImpl( - self, start, goal, planner_id='', use_start=False - ) -> ComputePathToPose.Result: + self, start: PoseStamped, goal: PoseStamped, + planner_id: str = '', use_start: bool = False + ) -> ComputePathToPose.Result: """ Send a `ComputePathToPose` action request. @@ -570,7 +597,7 @@ def _getPathImpl( rclpy.spin_until_future_complete(self, send_goal_future) self.goal_handle = send_goal_future.result() - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: self.error('Get path was rejected!') self.status = GoalStatus.UNKNOWN result = ComputePathToPose.Result() @@ -580,11 +607,13 @@ def _getPathImpl( self.result_future = self.goal_handle.get_result_async() rclpy.spin_until_future_complete(self, self.result_future) - self.status = self.result_future.result().status + self.status = self.result_future.result().status # type: ignore[union-attr] - return self.result_future.result().result + return self.result_future.result().result # type: ignore[union-attr] - def getPath(self, start, goal, planner_id='', use_start=False): + def getPath( + self, start: PoseStamped, goal: PoseStamped, + planner_id: str = '', use_start: bool = False) -> Path: """Send a `ComputePathToPose` action request.""" self.clearTaskError() rtn = self._getPathImpl(start, goal, planner_id, use_start) @@ -600,7 +629,8 @@ def getPath(self, start, goal, planner_id='', use_start=False): return None def _getPathThroughPosesImpl( - self, start, goals, planner_id='', use_start=False + self, start: PoseStamped, goals: Goals, + planner_id: str = '', use_start: bool = False ) -> ComputePathThroughPoses.Result: """ Send a `ComputePathThroughPoses` action request. @@ -630,7 +660,7 @@ def _getPathThroughPosesImpl( rclpy.spin_until_future_complete(self, send_goal_future) self.goal_handle = send_goal_future.result() - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: self.error('Get path was rejected!') result = ComputePathThroughPoses.Result() result.error_code = ComputePathThroughPoses.UNKNOWN @@ -639,11 +669,13 @@ def _getPathThroughPosesImpl( self.result_future = self.goal_handle.get_result_async() rclpy.spin_until_future_complete(self, self.result_future) - self.status = self.result_future.result().status + self.status = self.result_future.result().status # type: ignore[union-attr] - return self.result_future.result().result + return self.result_future.result().result # type: ignore[union-attr] - def getPathThroughPoses(self, start, goals, planner_id='', use_start=False): + def getPathThroughPoses( + self, start: PoseStamped, goals: Goals, + planner_id: str = '', use_start: bool = False) -> Path: """Send a `ComputePathThroughPoses` action request.""" self.clearTaskError() rtn = self._getPathThroughPosesImpl(start, goals, planner_id, use_start) @@ -659,7 +691,8 @@ def getPathThroughPoses(self, start, goals, planner_id='', use_start=False): return None def _smoothPathImpl( - self, path, smoother_id='', max_duration=2.0, check_for_collision=False + self, path: Path, smoother_id: str = '', + max_duration: float = 2.0, check_for_collision: bool = False ) -> SmoothPath.Result: """ Send a `SmoothPath` action request. @@ -681,7 +714,7 @@ def _smoothPathImpl( rclpy.spin_until_future_complete(self, send_goal_future) self.goal_handle = send_goal_future.result() - if not self.goal_handle.accepted: + if not self.goal_handle or not self.goal_handle.accepted: self.error('Smooth path was rejected!') result = SmoothPath.Result() result.error_code = SmoothPath.UNKNOWN @@ -690,13 +723,13 @@ def _smoothPathImpl( self.result_future = self.goal_handle.get_result_async() rclpy.spin_until_future_complete(self, self.result_future) - self.status = self.result_future.result().status + self.status = self.result_future.result().status # type: ignore[union-attr] - return self.result_future.result().result + return self.result_future.result().result # type: ignore[union-attr] def smoothPath( - self, path, smoother_id='', max_duration=2.0, check_for_collision=False - ): + self, path: Path, smoother_id: str = '', + max_duration: float = 2.0, check_for_collision: bool = False) -> Path: """Send a `SmoothPath` action request.""" self.clearTaskError() rtn = self._smoothPathImpl(path, smoother_id, max_duration, check_for_collision) @@ -711,7 +744,7 @@ def smoothPath( f' error msg:{rtn.error_msg}') return None - def changeMap(self, map_filepath) -> bool: + def changeMap(self, map_filepath: str) -> bool: """Change the current static map in the map server.""" while not self.change_maps_srv.wait_for_service(timeout_sec=1.0): self.info('change map service not available, waiting...') @@ -719,7 +752,13 @@ def changeMap(self, map_filepath) -> bool: req.map_url = map_filepath future = self.change_maps_srv.call_async(req) rclpy.spin_until_future_complete(self, future) - result = future.result().result + + future_result = future.result() + if future_result is None: + self.error('Change map request failed!') + return False + + result = future_result.result if result != LoadMap.Response().RESULT_SUCCESS: if result == LoadMap.RESULT_MAP_DOES_NOT_EXIST: reason = 'Map does not exist' @@ -738,13 +777,13 @@ def changeMap(self, map_filepath) -> bool: self.info('Change map request was successful!') return True - def clearAllCostmaps(self): + def clearAllCostmaps(self) -> None: """Clear all costmaps.""" self.clearLocalCostmap() self.clearGlobalCostmap() return - def clearLocalCostmap(self): + def clearLocalCostmap(self) -> None: """Clear local costmap.""" while not self.clear_costmap_local_srv.wait_for_service(timeout_sec=1.0): self.info('Clear local costmaps service not available, waiting...') @@ -753,7 +792,7 @@ def clearLocalCostmap(self): rclpy.spin_until_future_complete(self, future) return - def clearGlobalCostmap(self): + def clearGlobalCostmap(self) -> None: """Clear global costmap.""" while not self.clear_costmap_global_srv.wait_for_service(timeout_sec=1.0): self.info('Clear global costmaps service not available, waiting...') @@ -762,7 +801,7 @@ def clearGlobalCostmap(self): rclpy.spin_until_future_complete(self, future) return - def clearCostmapExceptRegion(self, reset_distance: float): + def clearCostmapExceptRegion(self, reset_distance: float) -> None: """Clear the costmap except for a specified region.""" while not self.clear_costmap_except_region_srv.wait_for_service(timeout_sec=1.0): self.info('ClearCostmapExceptRegion service not available, waiting...') @@ -772,7 +811,7 @@ def clearCostmapExceptRegion(self, reset_distance: float): rclpy.spin_until_future_complete(self, future) return - def clearCostmapAroundRobot(self, reset_distance: float): + def clearCostmapAroundRobot(self, reset_distance: float) -> None: """Clear the costmap around the robot.""" while not self.clear_costmap_around_robot_srv.wait_for_service(timeout_sec=1.0): self.info('ClearCostmapAroundRobot service not available, waiting...') @@ -782,25 +821,38 @@ def clearCostmapAroundRobot(self, reset_distance: float): rclpy.spin_until_future_complete(self, future) return - def getGlobalCostmap(self): + def getGlobalCostmap(self) -> OccupancyGrid: """Get the global costmap.""" while not self.get_costmap_global_srv.wait_for_service(timeout_sec=1.0): self.info('Get global costmaps service not available, waiting...') req = GetCostmap.Request() future = self.get_costmap_global_srv.call_async(req) rclpy.spin_until_future_complete(self, future) - return future.result().map - def getLocalCostmap(self): + result = future.result() + if result is None: + self.error('Get global costmap request failed!') + return None + + return result.map + + def getLocalCostmap(self) -> OccupancyGrid: """Get the local costmap.""" while not self.get_costmap_local_srv.wait_for_service(timeout_sec=1.0): self.info('Get local costmaps service not available, waiting...') req = GetCostmap.Request() future = self.get_costmap_local_srv.call_async(req) rclpy.spin_until_future_complete(self, future) - return future.result().map - def lifecycleStartup(self): + result = future.result() + + if result is None: + self.error('Get local costmap request failed!') + return None + + return result.map + + def lifecycleStartup(self) -> None: """Startup nav2 lifecycle system.""" self.info('Starting up lifecycle nodes based on lifecycle_manager.') for srv_name, srv_type in self.get_service_names_and_types(): @@ -824,7 +876,7 @@ def lifecycleStartup(self): self.info('Nav2 is ready for use!') return - def lifecycleShutdown(self): + def lifecycleShutdown(self) -> None: """Shutdown nav2 lifecycle system.""" self.info('Shutting down lifecycle nodes based on lifecycle_manager.') for srv_name, srv_type in self.get_service_names_and_types(): @@ -840,7 +892,7 @@ def lifecycleShutdown(self): future.result() return - def _waitForNodeToActivate(self, node_name): + def _waitForNodeToActivate(self, node_name: str) -> None: # Waits for the node within the tester namespace to become active self.debug(f'Waiting for {node_name} to become active..') node_service = f'{node_name}/get_state' @@ -854,13 +906,15 @@ def _waitForNodeToActivate(self, node_name): self.debug(f'Getting {node_name} state...') future = state_client.call_async(req) rclpy.spin_until_future_complete(self, future) - if future.result() is not None: - state = future.result().current_state.label + + result = future.result() + if result is not None: + state = result.current_state.label self.debug(f'Result of get_state: {state}') time.sleep(2) return - def _waitForInitialPose(self): + def _waitForInitialPose(self) -> None: while not self.initial_pose_received: self.info('Setting initial pose') self._setInitialPose() @@ -868,17 +922,17 @@ def _waitForInitialPose(self): rclpy.spin_once(self, timeout_sec=1.0) return - def _amclPoseCallback(self, msg): + def _amclPoseCallback(self, msg: PoseWithCovarianceStamped) -> None: self.debug('Received amcl pose') self.initial_pose_received = True return - def _feedbackCallback(self, msg): + def _feedbackCallback(self, msg: NavigateToPose.Feedback) -> None: self.debug('Received action feedback message') self.feedback = msg.feedback return - def _setInitialPose(self): + def _setInitialPose(self) -> None: msg = PoseWithCovarianceStamped() msg.pose.pose = self.initial_pose.pose msg.header.frame_id = self.initial_pose.header.frame_id @@ -887,18 +941,18 @@ def _setInitialPose(self): self.initial_pose_pub.publish(msg) return - def info(self, msg): + def info(self, msg: str) -> None: self.get_logger().info(msg) return - def warn(self, msg): + def warn(self, msg: str) -> None: self.get_logger().warn(msg) return - def error(self, msg): + def error(self, msg: str) -> None: self.get_logger().error(msg) return - def debug(self, msg): + def debug(self, msg: str) -> None: self.get_logger().debug(msg) return diff --git a/nav2_simple_commander/nav2_simple_commander/utils.py b/nav2_simple_commander/nav2_simple_commander/utils.py index d3639010202..8aed7434ec5 100644 --- a/nav2_simple_commander/nav2_simple_commander/utils.py +++ b/nav2_simple_commander/nav2_simple_commander/utils.py @@ -21,7 +21,7 @@ import subprocess -def find_os_processes(name): +def find_os_processes(name: str) -> list[tuple[str, str]]: """Find all the processes that are running gz sim.""" ps_output = subprocess.check_output(['ps', 'aux'], text=True) ps_lines = ps_output.split('\n') @@ -36,7 +36,7 @@ def find_os_processes(name): return gz_sim_processes -def kill_process(pid): +def kill_process(pid: str) -> None: """Kill a process with a given PID.""" try: os.kill(int(pid), signal.SIGKILL) @@ -45,7 +45,7 @@ def kill_process(pid): print(f'Failed to kill process with PID: {pid}. Error: {e}') -def kill_os_processes(name): +def kill_os_processes(name: str) -> None: """Kill all processes that are running with name.""" processes = find_os_processes(name) if processes: diff --git a/nav2_simple_commander/test/test_copyright.py b/nav2_simple_commander/test/test_copyright.py index cc8ff03f79a..1f869834559 100644 --- a/nav2_simple_commander/test/test_copyright.py +++ b/nav2_simple_commander/test/test_copyright.py @@ -18,6 +18,6 @@ @pytest.mark.copyright @pytest.mark.linter -def test_copyright(): +def test_copyright() -> None: rc = main(argv=['.', 'test']) assert rc == 0, 'Found errors' diff --git a/nav2_simple_commander/test/test_flake8.py b/nav2_simple_commander/test/test_flake8.py index 26030113cce..2c660b56e6e 100644 --- a/nav2_simple_commander/test/test_flake8.py +++ b/nav2_simple_commander/test/test_flake8.py @@ -18,8 +18,7 @@ @pytest.mark.flake8 @pytest.mark.linter -def test_flake8(): +def test_flake8() -> None: rc, errors = main_with_errors(argv=[]) - assert rc == 0, 'Found %d code style errors / warnings:\n' % len( - errors - ) + '\n'.join(errors) + assert rc == 0, f'Found {len(errors)} code style errors / warnings:\n' \ + + '\n'.join(errors) diff --git a/nav2_simple_commander/test/test_footprint_collision_checker.py b/nav2_simple_commander/test/test_footprint_collision_checker.py index 8ffc4b653b9..86eb32d1e5f 100644 --- a/nav2_simple_commander/test/test_footprint_collision_checker.py +++ b/nav2_simple_commander/test/test_footprint_collision_checker.py @@ -24,13 +24,13 @@ class TestFootprintCollisionChecker(unittest.TestCase): - def test_no_costmap(self): + def test_no_costmap(self) -> None: # Test if a type error raised when costmap is not specified yet fcc_ = FootprintCollisionChecker() self.assertRaises(ValueError, fcc_.worldToMapValidated, 0.0, 0.0) self.assertRaises(ValueError, fcc_.pointCost, 0.0, 0.0) - def test_pointCost(self): + def test_pointCost(self) -> None: # Test if point cost is calculated correctly # Create test grid 10 pixels wide by 10 pixels long, at 1 meters per pixel # AKA 10 meters x 10 meters @@ -48,7 +48,7 @@ def test_pointCost(self): self.assertEqual(fcc_.pointCost(1, 1), 0) self.assertRaises(IndexError, fcc_.pointCost, 11, 11) - def test_worldToMapValidated(self): + def test_worldToMapValidated(self) -> None: # Test if worldToMap conversion is calculated correctly # Create test grid 10 pixels wide by 10 pixels long, at 1 meters per pixel # AKA 10 meters x 10 meters @@ -70,7 +70,7 @@ def test_worldToMapValidated(self): self.assertEqual(fcc_.worldToMapValidated(14, 14), (9, 9)) self.assertEqual(fcc_.worldToMapValidated(15, 14), (None, None)) - def test_lineCost(self): + def test_lineCost(self) -> None: # Test if line cost is calculated correctly # Create test grid 10 pixels wide by 10 pixels long, at 1 meters per pixel # AKA 10 meters x 10 meters @@ -88,7 +88,7 @@ def test_lineCost(self): self.assertRaises(IndexError, fcc_.lineCost, 0, 15, 0, 9, 1) self.assertEqual(fcc_.lineCost(0, 9, 0, 9, 1), 0.0) - def test_footprintCost(self): + def test_footprintCost(self) -> None: # Test if footprint cost is calculated correctly # Create test grid 10 pixels wide by 10 pixels long, at 1 meters per pixel # AKA 10 meters x 10 meters diff --git a/nav2_simple_commander/test/test_line_iterator.py b/nav2_simple_commander/test/test_line_iterator.py index 3ec64bc5212..72eac10e18f 100644 --- a/nav2_simple_commander/test/test_line_iterator.py +++ b/nav2_simple_commander/test/test_line_iterator.py @@ -20,17 +20,17 @@ class TestLineIterator(unittest.TestCase): - def test_type_error(self): + def test_type_error(self) -> None: # Test if a type error raised when passing invalid arguments types self.assertRaises(TypeError, LineIterator, 0, 0, '10', 10, '1') - def test_value_error(self): + def test_value_error(self) -> None: # Test if a value error raised when passing negative or zero step_size self.assertRaises(ValueError, LineIterator, 0, 0, 10, 10, -2) # Test if a value error raised when passing zero length line self.assertRaises(ValueError, LineIterator, 2, 2, 2, 2, 1) - def test_get_xy(self): + def test_get_xy(self) -> None: # Test if the initial and final coordinates are returned correctly lt = LineIterator(0, 0, 5, 5, 1) self.assertEqual(lt.getX0(), 0) @@ -38,12 +38,12 @@ def test_get_xy(self): self.assertEqual(lt.getX1(), 5) self.assertEqual(lt.getY1(), 5) - def test_line_length(self): + def test_line_length(self) -> None: # Test if the line length is calculated correctly lt = LineIterator(0, 0, 5, 5, 1) self.assertEqual(lt.get_line_length(), sqrt(pow(5, 2) + pow(5, 2))) - def test_straight_line(self): + def test_straight_line(self) -> None: # Test if the calculations are correct for y = x lt = LineIterator(0, 0, 5, 5, 1) i = 0 @@ -71,7 +71,7 @@ def test_straight_line(self): lt.advance() i += 1 - def test_hor_line(self): + def test_hor_line(self) -> None: # Test if the calculations are correct for y = 0x+b (horizontal line) lt = LineIterator(0, 10, 5, 10, 1) i = 0 @@ -81,7 +81,7 @@ def test_hor_line(self): lt.advance() i += 1 - def test_ver_line(self): + def test_ver_line(self) -> None: # Test if the calculations are correct for x = n (vertical line) lt = LineIterator(5, 0, 5, 10, 1) i = 0 @@ -91,7 +91,7 @@ def test_ver_line(self): lt.advance() i += 1 - def test_clamp(self): + def test_clamp(self) -> None: # Test if the increments are clamped to avoid crossing the final points # when step_size is large with respect to line length lt = LineIterator(0, 0, 5, 5, 10) diff --git a/nav2_simple_commander/test/test_pep257.py b/nav2_simple_commander/test/test_pep257.py index b234a3840f4..b6808e1d80f 100644 --- a/nav2_simple_commander/test/test_pep257.py +++ b/nav2_simple_commander/test/test_pep257.py @@ -18,6 +18,6 @@ @pytest.mark.linter @pytest.mark.pep257 -def test_pep257(): +def test_pep257() -> None: rc = main(argv=['.', 'test']) assert rc == 0, 'Found code style errors / warnings' diff --git a/tools/pyproject.toml b/tools/pyproject.toml index 012f67b60f1..938eef79689 100644 --- a/tools/pyproject.toml +++ b/tools/pyproject.toml @@ -37,6 +37,9 @@ module = [ "ament_pep257.*", "ament_flake8.*", "ament_copyright.*", + "builtin_interfaces.*", + "lifecycle_msgs.*", + "geographic_msgs.*", ] ignore_missing_imports = true