Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 58 additions & 20 deletions nav2_docking/opennav_docking/test/test_docking_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,37 @@
from action_msgs.msg import GoalStatus
from geometry_msgs.msg import TransformStamped, Twist, TwistStamped
from launch import LaunchDescription
# from launch.actions import SetEnvironmentVariable
from launch_ros.actions import Node
import launch_testing
import launch_testing.actions
import launch_testing.asserts
import launch_testing.markers
import launch_testing.util
from nav2_msgs.action import DockRobot, NavigateToPose, UndockRobot
import pytest
import rclpy
from rclpy.action import ActionClient, ActionServer
from rclpy.action.client import ActionClient
from rclpy.action.server import ActionServer
from sensor_msgs.msg import BatteryState
from tf2_ros import TransformBroadcaster


# This test can be run standalone with:
# python3 -u -m pytest test_docking_server.py -s

# If python3-flaky is installed, you can run the test multiple times to
# try to identify flaky ness.
# python3 -u -m pytest --force-flaky --min-passes 3 --max-runs 5 -s -v test_docking_server.py

@pytest.mark.rostest
# @pytest.mark.flaky
# @pytest.mark.flaky(max_runs=5, min_passes=3)
def generate_test_description():

return LaunchDescription([
# SetEnvironmentVariable('RCUTILS_LOGGING_BUFFERED_STREAM', '1'),
# SetEnvironmentVariable('RCUTILS_LOGGING_USE_STDOUT', '1'),
Node(
package='opennav_docking',
executable='opennav_docking',
Expand Down Expand Up @@ -74,21 +88,26 @@ class TestDockingServer(unittest.TestCase):
@classmethod
def setUpClass(cls):
rclpy.init()
# Latest odom -> base_link
cls.x = 0.0
cls.y = 0.0
cls.theta = 0.0
# Track charge state
cls.is_charging = False
# Latest command velocity
cls.command = Twist()
cls.node = rclpy.create_node('test_docking_server')

@classmethod
def tearDownClass(cls):
cls.node.destroy_node()
rclpy.shutdown()

def setUp(self):
# Create a ROS node for tests
# Latest odom -> base_link
self.x = 0.0
self.y = 0.0
self.theta = 0.0
# Track charge state
self.is_charging = False
# Latest command velocity
self.command = Twist()
self.node = rclpy.create_node('test_docking_server')

def tearDown(self):
self.node.destroy_node()

def command_velocity_callback(self, msg):
self.node.get_logger().info('Command: %f %f' % (msg.twist.linear.x, msg.twist.angular.z))
self.command = msg.twist
Expand Down Expand Up @@ -175,24 +194,32 @@ def test_docking_server(self):
'navigate_to_pose',
self.nav_execute_callback)

# Spin once so that TF is published
rclpy.spin_once(self.node, timeout_sec=0.1)
# Publish transform
self.publish()

# Run for 1 seconds to allow tf to propogate
for _ in range(10):
rclpy.spin_once(self.node, timeout_sec=0.1)
time.sleep(0.1)

# Test docking action
self.action_result = []
self.dock_action_client.wait_for_server(timeout_sec=5.0)
assert self.dock_action_client.wait_for_server(timeout_sec=5.0), \
'dock_robot service not available'

goal = DockRobot.Goal()
goal.use_dock_id = True
goal.dock_id = 'test_dock'
future = self.dock_action_client.send_goal_async(
goal, feedback_callback=self.action_feedback_callback)
rclpy.spin_until_future_complete(self.node, future)
self.goal_handle = future.result()
assert self.goal_handle.accepted
assert self.goal_handle is not None, 'goal_handle should not be None'
assert self.goal_handle.accepted, 'goal_handle not accepted'
result_future_original = self.goal_handle.get_result_async()

# Run for 2 seconds
for i in range(20):
for _ in range(20):
rclpy.spin_once(self.node, timeout_sec=0.1)
time.sleep(0.1)

Expand All @@ -201,7 +228,8 @@ def test_docking_server(self):
goal, feedback_callback=self.action_feedback_callback)
rclpy.spin_until_future_complete(self.node, future)
self.goal_handle = future.result()
assert self.goal_handle.accepted
assert self.goal_handle is not None, 'goal_handle should not be None'
assert self.goal_handle.accepted, 'goal_handle not accepted'
result_future = self.goal_handle.get_result_async()
rclpy.spin_until_future_complete(self.node, result_future)
self.action_result.append(result_future.result())
Expand All @@ -216,7 +244,7 @@ def test_docking_server(self):
self.node.get_logger().info('Goal preempted')

# Run for 0.5 seconds
for i in range(5):
for _ in range(5):
rclpy.spin_once(self.node, timeout_sec=0.1)
time.sleep(0.1)

Expand All @@ -230,7 +258,8 @@ def test_docking_server(self):
goal, feedback_callback=self.action_feedback_callback)
rclpy.spin_until_future_complete(self.node, future)
self.goal_handle = future.result()
assert self.goal_handle.accepted
assert self.goal_handle is not None, 'goal_handle should not be None'
assert self.goal_handle.accepted, 'goal_handle not accepted'
result_future = self.goal_handle.get_result_async()
rclpy.spin_until_future_complete(self.node, result_future)
self.action_result.append(result_future.result())
Expand All @@ -247,10 +276,19 @@ def test_docking_server(self):
future = self.undock_action_client.send_goal_async(goal)
rclpy.spin_until_future_complete(self.node, future)
self.goal_handle = future.result()
assert self.goal_handle.accepted
assert self.goal_handle is not None, 'goal_handle should not be None'
assert self.goal_handle.accepted, 'goal_handle not accepted'
result_future = self.goal_handle.get_result_async()
rclpy.spin_until_future_complete(self.node, result_future)
self.action_result.append(result_future.result())

self.assertEqual(self.action_result[3].status, GoalStatus.STATUS_SUCCEEDED)
self.assertTrue(self.action_result[3].result.success)


@launch_testing.post_shutdown_test()
class TestProcessOutput(unittest.TestCase):

def test_exit_code(self, proc_info):
# Check that all processes in the launch exit with code 0
launch_testing.asserts.assertExitCodes(proc_info)
Loading