diff --git a/nav2_docking/opennav_docking/test/test_docking_server.py b/nav2_docking/opennav_docking/test/test_docking_server.py index 43e3e2df329..ecd3c777ee0 100644 --- a/nav2_docking/opennav_docking/test/test_docking_server.py +++ b/nav2_docking/opennav_docking/test/test_docking_server.py @@ -19,12 +19,18 @@ from action_msgs.msg import GoalStatus from geometry_msgs.msg import TransformStamped, Twist, TwistStamped from launch import LaunchDescription +# from launch.actions import SetEnvironmentVariable from launch_ros.actions import Node import launch_testing +import launch_testing.actions +import launch_testing.asserts +import launch_testing.markers +import launch_testing.util from nav2_msgs.action import DockRobot, NavigateToPose, UndockRobot import pytest import rclpy -from rclpy.action import ActionClient, ActionServer +from rclpy.action.client import ActionClient +from rclpy.action.server import ActionServer from sensor_msgs.msg import BatteryState from tf2_ros import TransformBroadcaster @@ -32,10 +38,18 @@ # This test can be run standalone with: # python3 -u -m pytest test_docking_server.py -s +# If python3-flaky is installed, you can run the test multiple times to +# try to identify flaky ness. +# python3 -u -m pytest --force-flaky --min-passes 3 --max-runs 5 -s -v test_docking_server.py + @pytest.mark.rostest +# @pytest.mark.flaky +# @pytest.mark.flaky(max_runs=5, min_passes=3) def generate_test_description(): return LaunchDescription([ + # SetEnvironmentVariable('RCUTILS_LOGGING_BUFFERED_STREAM', '1'), + # SetEnvironmentVariable('RCUTILS_LOGGING_USE_STDOUT', '1'), Node( package='opennav_docking', executable='opennav_docking', @@ -74,21 +88,26 @@ class TestDockingServer(unittest.TestCase): @classmethod def setUpClass(cls): rclpy.init() - # Latest odom -> base_link - cls.x = 0.0 - cls.y = 0.0 - cls.theta = 0.0 - # Track charge state - cls.is_charging = False - # Latest command velocity - cls.command = Twist() - cls.node = rclpy.create_node('test_docking_server') @classmethod def tearDownClass(cls): - cls.node.destroy_node() rclpy.shutdown() + def setUp(self): + # Create a ROS node for tests + # Latest odom -> base_link + self.x = 0.0 + self.y = 0.0 + self.theta = 0.0 + # Track charge state + self.is_charging = False + # Latest command velocity + self.command = Twist() + self.node = rclpy.create_node('test_docking_server') + + def tearDown(self): + self.node.destroy_node() + def command_velocity_callback(self, msg): self.node.get_logger().info('Command: %f %f' % (msg.twist.linear.x, msg.twist.angular.z)) self.command = msg.twist @@ -175,12 +194,19 @@ def test_docking_server(self): 'navigate_to_pose', self.nav_execute_callback) - # Spin once so that TF is published - rclpy.spin_once(self.node, timeout_sec=0.1) + # Publish transform + self.publish() + + # Run for 1 seconds to allow tf to propogate + for _ in range(10): + rclpy.spin_once(self.node, timeout_sec=0.1) + time.sleep(0.1) # Test docking action self.action_result = [] - self.dock_action_client.wait_for_server(timeout_sec=5.0) + assert self.dock_action_client.wait_for_server(timeout_sec=5.0), \ + 'dock_robot service not available' + goal = DockRobot.Goal() goal.use_dock_id = True goal.dock_id = 'test_dock' @@ -188,11 +214,12 @@ def test_docking_server(self): goal, feedback_callback=self.action_feedback_callback) rclpy.spin_until_future_complete(self.node, future) self.goal_handle = future.result() - assert self.goal_handle.accepted + assert self.goal_handle is not None, 'goal_handle should not be None' + assert self.goal_handle.accepted, 'goal_handle not accepted' result_future_original = self.goal_handle.get_result_async() # Run for 2 seconds - for i in range(20): + for _ in range(20): rclpy.spin_once(self.node, timeout_sec=0.1) time.sleep(0.1) @@ -201,7 +228,8 @@ def test_docking_server(self): goal, feedback_callback=self.action_feedback_callback) rclpy.spin_until_future_complete(self.node, future) self.goal_handle = future.result() - assert self.goal_handle.accepted + assert self.goal_handle is not None, 'goal_handle should not be None' + assert self.goal_handle.accepted, 'goal_handle not accepted' result_future = self.goal_handle.get_result_async() rclpy.spin_until_future_complete(self.node, result_future) self.action_result.append(result_future.result()) @@ -216,7 +244,7 @@ def test_docking_server(self): self.node.get_logger().info('Goal preempted') # Run for 0.5 seconds - for i in range(5): + for _ in range(5): rclpy.spin_once(self.node, timeout_sec=0.1) time.sleep(0.1) @@ -230,7 +258,8 @@ def test_docking_server(self): goal, feedback_callback=self.action_feedback_callback) rclpy.spin_until_future_complete(self.node, future) self.goal_handle = future.result() - assert self.goal_handle.accepted + assert self.goal_handle is not None, 'goal_handle should not be None' + assert self.goal_handle.accepted, 'goal_handle not accepted' result_future = self.goal_handle.get_result_async() rclpy.spin_until_future_complete(self.node, result_future) self.action_result.append(result_future.result()) @@ -247,10 +276,19 @@ def test_docking_server(self): future = self.undock_action_client.send_goal_async(goal) rclpy.spin_until_future_complete(self.node, future) self.goal_handle = future.result() - assert self.goal_handle.accepted + assert self.goal_handle is not None, 'goal_handle should not be None' + assert self.goal_handle.accepted, 'goal_handle not accepted' result_future = self.goal_handle.get_result_async() rclpy.spin_until_future_complete(self.node, result_future) self.action_result.append(result_future.result()) self.assertEqual(self.action_result[3].status, GoalStatus.STATUS_SUCCEEDED) self.assertTrue(self.action_result[3].result.success) + + +@launch_testing.post_shutdown_test() +class TestProcessOutput(unittest.TestCase): + + def test_exit_code(self, proc_info): + # Check that all processes in the launch exit with code 0 + launch_testing.asserts.assertExitCodes(proc_info)