@@ -160,6 +160,9 @@ def __init__(self):
160160 self .send_interval = 0.0035
161161 self .packets_to_send = min (int (self .time_to_listen / (self .send_interval + 0.0015 )), 45000 ) # How many packets to be sent in send_in_background method
162162
163+ # Thread pool for background watching operations
164+ self .pool = ThreadPool (processes = 3 )
165+
163166 # State watcher attributes
164167 self .watching = False
165168 self .cpu_state = StateMachine ('init' )
@@ -239,22 +242,15 @@ def log(self, message, verbose=False):
239242 print "%s : %s" % (current_time , message )
240243 self .log_fp .write ("%s : %s\n " % (current_time , message ))
241244
242- def timeout (self , seconds , message ):
243- def timeout_exception (self , message ):
244- self .log ('Timeout is reached: %s' % message )
245- self .tearDown ()
246- os .kill (os .getpid (), signal .SIGINT )
247-
248- if self .timeout_thr is None :
249- self .timeout_thr = threading .Timer (seconds , timeout_exception , args = (self , message ))
250- self .timeout_thr .start ()
251- else :
252- raise Exception ("Timeout already set" )
253-
254- def cancel_timeout (self ):
255- if self .timeout_thr is not None :
256- self .timeout_thr .cancel ()
257- self .timeout_thr = None
245+ def timeout (self , func , seconds , message ):
246+ async_res = self .pool .apply_async (func )
247+ try :
248+ res = async_res .get (timeout = seconds )
249+ except Exception as err :
250+ # TimeoutError and Exception's from func
251+ # captured here
252+ raise type (err )(message )
253+ return res
258254
259255 def generate_vlan_servers (self ):
260256 vlan_host_map = defaultdict (dict )
@@ -535,30 +531,27 @@ def runTest(self):
535531 try :
536532 self .fails ['dut' ] = set ()
537533
538- pool = ThreadPool (processes = 3 )
539534 self .log ("Starting reachability state watch thread..." )
540535 self .watching = True
541536 self .light_probe = False
542- watcher = pool .apply_async (self .reachability_watcher )
543537 self .watcher_is_stopped = threading .Event () # Waiter Event for the Watcher state is stopped.
544538 self .watcher_is_running = threading .Event () # Waiter Event for the Watcher state is running.
545539 self .watcher_is_stopped .set () # By default the Watcher is not running.
546540 self .watcher_is_running .clear () # By default its required to wait for the Watcher started.
547541 # Give watch thread some time to wind up
542+ watcher = self .pool .apply_async (self .reachability_watcher )
548543 time .sleep (5 )
549544
550545 self .log ("Check that device is alive and pinging" )
551- self .fails ['dut' ].add (' DUT is not ready for test' )
552- self .assertTrue ( self . wait_dut_to_warm_up (), 'DUT is not stable' )
546+ self .fails ['dut' ].add (" DUT is not ready for test" )
547+ self .wait_dut_to_warm_up ()
553548 self .fails ['dut' ].clear ()
554549
555550 self .log ("Schedule to reboot the remote switch in %s sec" % self .reboot_delay )
556551 thr .start ()
557552
558553 self .log ("Wait until Control plane is down" )
559- self .timeout (self .task_timeout , "DUT hasn't shutdown in %d seconds" % self .task_timeout )
560- self .wait_until_cpu_port_down ()
561- self .cancel_timeout ()
554+ self .timeout (self .wait_until_cpu_port_down , self .task_timeout , "DUT hasn't shutdown in {} seconds" .format (self .task_timeout ))
562555
563556 if self .reboot_type == 'fast-reboot' :
564557 self .light_probe = True
@@ -568,15 +561,15 @@ def runTest(self):
568561
569562 if self .reboot_type == 'fast-reboot' :
570563 self .log ("Check that device is still forwarding data plane traffic" )
571- self .fails ['dut' ].add (' Data plane has a forwarding problem' )
572- self .assertTrue ( self . check_alive (), 'DUT is not stable' )
564+ self .fails ['dut' ].add (" Data plane has a forwarding problem after CPU went down" )
565+ self .check_alive ()
573566 self .fails ['dut' ].clear ()
574567
575568 self .log ("Wait until control plane up" )
576- async_cpu_up = pool .apply_async (self .wait_until_cpu_port_up )
569+ async_cpu_up = self . pool .apply_async (self .wait_until_cpu_port_up )
577570
578571 self .log ("Wait until data plane stops" )
579- async_forward_stop = pool .apply_async (self .check_forwarding_stop )
572+ async_forward_stop = self . pool .apply_async (self .check_forwarding_stop )
580573
581574 try :
582575 async_cpu_up .get (timeout = self .task_timeout )
@@ -593,9 +586,9 @@ def runTest(self):
593586 no_routing_start = datetime .datetime .min
594587
595588 if no_routing_start is not None :
596- self .timeout (self .task_timeout , "DUT hasn't started to work for %d seconds" % self . task_timeout )
597- no_routing_stop , _ = self .check_forwarding_resume ()
598- self .cancel_timeout ( )
589+ no_routing_stop , _ = self .timeout (self .check_forwarding_resume ,
590+ self .task_timeout ,
591+ "DUT hasn't started to work for %d seconds" % self .task_timeout )
599592 else :
600593 no_routing_stop = datetime .datetime .min
601594
@@ -631,15 +624,16 @@ def runTest(self):
631624 for _ , q in self .ssh_jobs :
632625 q .put ('quit' )
633626
634- self . timeout ( self . task_timeout , "SSH threads haven't finished for %d seconds" % self . task_timeout )
635- while any (thr .is_alive () for thr , _ in self .ssh_jobs ):
636- for _ , q in self .ssh_jobs :
637- q .put ('go' )
638- time .sleep (self .TIMEOUT )
627+ def wait_for_ssh_threads ():
628+ while any (thr .is_alive () for thr , _ in self .ssh_jobs ):
629+ for _ , q in self .ssh_jobs :
630+ q .put ('go' )
631+ time .sleep (self .TIMEOUT )
639632
640- for thr , _ in self .ssh_jobs :
641- thr .join ()
642- self .cancel_timeout ()
633+ for thr , _ in self .ssh_jobs :
634+ thr .join ()
635+
636+ self .timeout (wait_for_ssh_threads , self .task_timeout , "SSH threads haven't finished for %d seconds" % self .task_timeout )
643637
644638 self .log ("Data plane works again. Start time: %s" % str (no_routing_stop ))
645639 self .log ("" )
@@ -654,7 +648,8 @@ def runTest(self):
654648 self .fails ['dut' ].add ("%s cycle must be less than graceful limit %s seconds" % (self .reboot_type , self .test_params ['graceful_limit' ]))
655649 if self .reboot_type == 'fast-reboot' and no_cp_replies < 0.95 * self .nr_vl_pkts :
656650 self .fails ['dut' ].add ("Dataplane didn't route to all servers, when control-plane was down: %d vs %d" % (no_cp_replies , self .nr_vl_pkts ))
657-
651+ except Exception as e :
652+ self .fails ['dut' ].add (e )
658653 finally :
659654 # Stop watching DUT
660655 self .watching = False
@@ -1010,6 +1005,8 @@ def wait_dut_to_warm_up(self):
10101005 # up towards PTF docker. In practice, I've seen this warm up taking
10111006 # up to ~70 seconds.
10121007
1008+ fail = None
1009+
10131010 dut_stabilize_secs = int (self .test_params ['dut_stabilize_secs' ])
10141011 warm_up_timeout_secs = int (self .test_params ['warm_up_timeout_secs' ])
10151012
@@ -1023,8 +1020,7 @@ def wait_dut_to_warm_up(self):
10231020 if dataplane == 'up' and ctrlplane == 'up' and elapsed > dut_stabilize_secs :
10241021 break ;
10251022 if elapsed > warm_up_timeout_secs :
1026- # Control plane didn't come up within warm up timeout
1027- return False
1023+ raise Exception ("Control plane didn't come up within warm up timeout" )
10281024 time .sleep (1 )
10291025
10301026 # check until flooding is over. Flooding happens when FDB entry of
@@ -1036,26 +1032,28 @@ def wait_dut_to_warm_up(self):
10361032 if not self .asic_state .is_flooding () and elapsed > dut_stabilize_secs :
10371033 break
10381034 if elapsed > warm_up_timeout_secs :
1039- # Control plane didn't stop flooding within warm up timeout
1040- return False
1035+ raise Exception ("Data plane didn't stop flooding within warm up timeout" )
10411036 time .sleep (1 )
10421037
10431038 dataplane = self .asic_state .get ()
10441039 ctrlplane = self .cpu_state .get ()
1045- if not dataplane == 'up' or not ctrlplane == 'up' :
1046- # Either control or data plane went down while we were waiting
1047- # for the flooding to stop.
1048- return False
1040+ if not dataplane == 'up' :
1041+ fail = "Data plane"
1042+ elif not ctrlplane == 'up' :
1043+ fail = "Control plane"
10491044
1050- if (self .asic_state .get_state_time ('up' ) > uptime or
1051- self .cpu_state .get_state_time ('up' ) > uptime ):
1052- # Either control plane or data plane flapped while we were
1053- # waiting for the warm up.
1054- return False
1045+ if fail is not None :
1046+ raise Exception ("{} went down while waiting for flooding to stop" .format (fail ))
10551047
1056- # Everything is good
1057- return True
1048+ if self .asic_state .get_state_time ('up' ) > uptime :
1049+ fail = "Data plane"
1050+ elif self .cpu_state .get_state_time ('up' ) > uptime :
1051+ fail = "Control plane"
1052+
1053+ if fail is not None :
1054+ raise Exception ("{} flapped while waiting for the warm up" .format (fail ))
10581055
1056+ # Everything is good
10591057
10601058 def check_alive (self ):
10611059 # This function checks that DUT routes the packets in the both directions.
@@ -1077,16 +1075,17 @@ def check_alive(self):
10771075 uptime = self .asic_state .get_state_time (state )
10781076 else :
10791077 if uptime :
1080- return False # Stopped working after it working for sometime?
1078+ raise Exception ( "Data plane stopped working" )
10811079 time .sleep (2 )
10821080
10831081 # wait, until FDB entries are populated
10841082 for _ in range (self .nr_tests * 10 ): # wait for some time
1085- if not self .asic_state .is_flooding ():
1086- return True
1087- time .sleep (2 )
1083+ if self .asic_state .is_flooding ():
1084+ time .sleep (2 )
1085+ else :
1086+ break
10881087
1089- return False # we still see extra replies
1088+ raise Exception ( "DUT is flooding" )
10901089
10911090
10921091 def get_asic_vlan_reachability (self ):
0 commit comments