3030import pytest
3131import rclpy
3232from rclpy .action .client import ActionClient
33- from rclpy .action .server import ActionServer
33+ from rclpy .action .server import ActionServer , ServerGoalHandle
3434from sensor_msgs .msg import BatteryState
3535from tf2_ros import TransformBroadcaster
3636
4545@pytest .mark .rostest
4646# @pytest.mark.flaky
4747# @pytest.mark.flaky(max_runs=5, min_passes=3)
48- def generate_test_description ():
48+ def generate_test_description () -> LaunchDescription :
4949
5050 return LaunchDescription ([
5151 # SetEnvironmentVariable('RCUTILS_LOGGING_BUFFERED_STREAM', '1'),
@@ -86,14 +86,14 @@ def generate_test_description():
8686class TestDockingServer (unittest .TestCase ):
8787
8888 @classmethod
89- def setUpClass (cls ):
89+ def setUpClass (cls ) -> None :
9090 rclpy .init ()
9191
9292 @classmethod
93- def tearDownClass (cls ):
93+ def tearDownClass (cls ) -> None :
9494 rclpy .shutdown ()
9595
96- def setUp (self ):
96+ def setUp (self ) -> None :
9797 # Create a ROS node for tests
9898 # Latest odom -> base_link
9999 self .x = 0.0
@@ -105,14 +105,14 @@ def setUp(self):
105105 self .command = Twist ()
106106 self .node = rclpy .create_node ('test_docking_server' )
107107
108- def tearDown (self ):
108+ def tearDown (self ) -> None :
109109 self .node .destroy_node ()
110110
111- def command_velocity_callback (self , msg ) :
112- self .node .get_logger ().info ('Command: %f %f' % ( msg .twist .linear .x , msg .twist .angular .z ) )
111+ def command_velocity_callback (self , msg : TwistStamped ) -> None :
112+ self .node .get_logger ().info (f 'Command: { msg .twist .linear .x :f } { msg .twist .angular .z :f } ' )
113113 self .command = msg .twist
114114
115- def timer_callback (self ):
115+ def timer_callback (self ) -> None :
116116 # Propagate command
117117 period = 0.05
118118 self .x += cos (self .theta ) * self .command .linear .x * period
@@ -121,7 +121,7 @@ def timer_callback(self):
121121 # Need to publish updated TF
122122 self .publish ()
123123
124- def publish (self ):
124+ def publish (self ) -> None :
125125 # Publish base->odom transform
126126 t = TransformStamped ()
127127 t .header .stamp = self .node .get_clock ().now ().to_msg ()
@@ -140,20 +140,24 @@ def publish(self):
140140 b .current = - 1.0
141141 self .battery_state_pub .publish (b )
142142
143- def action_feedback_callback (self , msg ) :
143+ def action_feedback_callback (self , msg : DockRobot . Feedback ) -> None :
144144 # Force the docking action to run a full recovery loop and then
145145 # make contact with the dock (based on pose of robot) before
146146 # we report that the robot is charging
147147 if msg .feedback .num_retries > 0 and \
148148 msg .feedback .state == msg .feedback .WAIT_FOR_CHARGE :
149149 self .is_charging = True
150150
151- def nav_execute_callback (self , goal_handle ):
151+ def nav_execute_callback (
152+ self ,
153+ goal_handle : ServerGoalHandle [NavigateToPose .Goal ,
154+ NavigateToPose .Result , NavigateToPose .Feedback ]
155+ ) -> NavigateToPose .Result :
152156 goal = goal_handle .request
153157 self .x = goal .pose .pose .position .x - 0.05
154158 self .y = goal .pose .pose .position .y + 0.05
155159 self .theta = 2.0 * acos (goal .pose .pose .orientation .w )
156- self .node .get_logger ().info ('Navigating to %f %f %f' % ( self .x , self .y , self .theta ) )
160+ self .node .get_logger ().info (f 'Navigating to { self .x :f } { self .y :f } { self .theta :f } ' )
157161 goal_handle .succeed ()
158162 self .publish ()
159163
@@ -162,7 +166,7 @@ def nav_execute_callback(self, goal_handle):
162166 result .error_msg = ''
163167 return result
164168
165- def test_docking_server (self ):
169+ def test_docking_server (self ) -> None :
166170 # Publish TF for odometry
167171 self .tf_broadcaster = TransformBroadcaster (self .node )
168172
@@ -239,8 +243,10 @@ def test_docking_server(self):
239243 self .action_result .append (result_future_original .result ())
240244
241245 # First is aborted due to preemption
242- self .assertEqual (self .action_result [0 ].status , GoalStatus .STATUS_ABORTED )
243- self .assertFalse (self .action_result [0 ].result .success )
246+ self .assertIsNotNone (self .action_result [0 ])
247+ if self .action_result [0 ] is not None :
248+ self .assertEqual (self .action_result [0 ].status , GoalStatus .STATUS_ABORTED )
249+ self .assertFalse (self .action_result [0 ].result .success )
244250
245251 self .node .get_logger ().info ('Goal preempted' )
246252
@@ -250,8 +256,10 @@ def test_docking_server(self):
250256 time .sleep (0.1 )
251257
252258 # Second is aborted due to preemption during main loop (takes down all actions)
253- self .assertEqual (self .action_result [1 ].status , GoalStatus .STATUS_ABORTED )
254- self .assertFalse (self .action_result [1 ].result .success )
259+ self .assertIsNotNone (self .action_result [1 ])
260+ if self .action_result [1 ] is not None :
261+ self .assertEqual (self .action_result [1 ].status , GoalStatus .STATUS_ABORTED )
262+ self .assertFalse (self .action_result [1 ].result .success )
255263
256264 # Resend the goal
257265 self .node .get_logger ().info ('Sending goal again' )
@@ -265,9 +273,11 @@ def test_docking_server(self):
265273 rclpy .spin_until_future_complete (self .node , result_future )
266274 self .action_result .append (result_future .result ())
267275
268- self .assertEqual (self .action_result [2 ].status , GoalStatus .STATUS_SUCCEEDED )
269- self .assertTrue (self .action_result [2 ].result .success )
270- self .assertEqual (self .action_result [2 ].result .num_retries , 1 )
276+ self .assertIsNotNone (self .action_result [2 ])
277+ if self .action_result [2 ] is not None :
278+ self .assertEqual (self .action_result [2 ].status , GoalStatus .STATUS_SUCCEEDED )
279+ self .assertTrue (self .action_result [2 ].result .success )
280+ self .assertEqual (self .action_result [2 ].result .num_retries , 1 )
271281
272282 # Test undocking action
273283 self .is_charging = False
@@ -283,13 +293,15 @@ def test_docking_server(self):
283293 rclpy .spin_until_future_complete (self .node , result_future )
284294 self .action_result .append (result_future .result ())
285295
286- self .assertEqual (self .action_result [3 ].status , GoalStatus .STATUS_SUCCEEDED )
287- self .assertTrue (self .action_result [3 ].result .success )
296+ self .assertIsNotNone (self .action_result [3 ])
297+ if self .action_result [3 ] is not None :
298+ self .assertEqual (self .action_result [3 ].status , GoalStatus .STATUS_SUCCEEDED )
299+ self .assertTrue (self .action_result [3 ].result .success )
288300
289301
290302@launch_testing .post_shutdown_test ()
291303class TestProcessOutput (unittest .TestCase ):
292304
293- def test_exit_code (self , proc_info ) :
305+ def test_exit_code (self , proc_info : launch_testing . ProcInfoHandler ) -> None :
294306 # Check that all processes in the launch exit with code 0
295307 launch_testing .asserts .assertExitCodes (proc_info )
0 commit comments