diff --git a/ansible/roles/test/files/ptftests/advanced-reboot.py b/ansible/roles/test/files/ptftests/advanced-reboot.py index e87d6ec0721..b8be0e89642 100644 --- a/ansible/roles/test/files/ptftests/advanced-reboot.py +++ b/ansible/roles/test/files/ptftests/advanced-reboot.py @@ -62,359 +62,9 @@ import pickle from operator import itemgetter import scapy.all as scapyall +import itertools - -class Arista(object): - DEBUG = False - def __init__(self, ip, queue, test_params, login='admin', password='123456'): - self.ip = ip - self.queue = queue - self.login = login - self.password = password - self.conn = None - self.hostname = None - self.v4_routes = [test_params['vlan_ip_range'], test_params['lo_prefix']] - self.v6_routes = [test_params['lo_v6_prefix']] - self.fails = set() - self.info = set() - self.min_bgp_gr_timeout = int(test_params['min_bgp_gr_timeout']) - - def __del__(self): - self.disconnect() - - def connect(self): - self.conn = paramiko.SSHClient() - self.conn.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - self.conn.connect(self.ip, username=self.login, password=self.password, allow_agent=False, look_for_keys=False) - self.shell = self.conn.invoke_shell() - - first_prompt = self.do_cmd(None, prompt = '>') - self.hostname = self.extract_hostname(first_prompt) - - self.do_cmd('enable') - self.do_cmd('terminal length 0') - - return self.shell - - def extract_hostname(self, first_prompt): - lines = first_prompt.split('\n') - prompt = lines[-1] - return prompt.strip().replace('>', '#') - - def do_cmd(self, cmd, prompt = None): - if prompt == None: - prompt = self.hostname - - if cmd is not None: - self.shell.send(cmd + '\n') - - input_buffer = '' - while prompt not in input_buffer: - input_buffer += self.shell.recv(16384) - - return input_buffer - - def disconnect(self): - if self.conn is not None: - self.conn.close() - self.conn = None - - return - - def run(self): - data = {} - debug_data = {} - run_once = False - log_first_line = None - quit_enabled = False - v4_routing_ok = False - v6_routing_ok = False - routing_works = True - self.connect() - - cur_time = time.time() - sample = {} - samples = {} - portchannel_output = self.do_cmd("show interfaces po1 | json") - portchannel_output = "\n".join(portchannel_output.split("\r\n")[1:-1]) - sample["po_changetime"] = json.loads(portchannel_output, strict=False)['interfaces']['Port-Channel1']['lastStatusChangeTimestamp'] - samples[cur_time] = sample - - while not (quit_enabled and v4_routing_ok and v6_routing_ok): - cmd = self.queue.get() - if cmd == 'quit': - quit_enabled = True - continue - cur_time = time.time() - info = {} - debug_info = {} - lacp_output = self.do_cmd('show lacp neighbor') - info['lacp'] = self.parse_lacp(lacp_output) - bgp_neig_output = self.do_cmd('show ip bgp neighbors') - info['bgp_neig'] = self.parse_bgp_neighbor(bgp_neig_output) - - bgp_route_v4_output = self.do_cmd('show ip route bgp | json') - v4_routing_ok = self.parse_bgp_route(bgp_route_v4_output, self.v4_routes) - info['bgp_route_v4'] = v4_routing_ok - - bgp_route_v6_output = self.do_cmd("show ipv6 route bgp | json") - v6_routing_ok = self.parse_bgp_route(bgp_route_v6_output, self.v6_routes) - info["bgp_route_v6"] = v6_routing_ok - - portchannel_output = self.do_cmd("show interfaces po1 | json") - portchannel_output = "\n".join(portchannel_output.split("\r\n")[1:-1]) - sample["po_changetime"] = json.loads(portchannel_output, strict=False)['interfaces']['Port-Channel1']['lastStatusChangeTimestamp'] - - if not run_once: - self.ipv4_gr_enabled, 
self.ipv6_gr_enabled, self.gr_timeout = self.parse_bgp_neighbor_once(bgp_neig_output) - if self.gr_timeout is not None: - log_first_line = "session_begins_%f" % cur_time - self.do_cmd("send log message %s" % log_first_line) - run_once = True - - data[cur_time] = info - samples[cur_time] = sample - if self.DEBUG: - debug_data[cur_time] = { - 'show lacp neighbor' : lacp_output, - 'show ip bgp neighbors' : bgp_neig_output, - 'show ip route bgp' : bgp_route_v4_output, - 'show ipv6 route bgp' : bgp_route_v6_output, - } - - attempts = 60 - for _ in range(attempts): - log_output = self.do_cmd("show log | begin %s" % log_first_line) - log_lines = log_output.split("\r\n")[1:-1] - log_data = self.parse_logs(log_lines) - if len(log_data) != 0: - break - time.sleep(1) # wait until logs are populated - - if len(log_data) == 0: - log_data['error'] = 'Incomplete output' - - self.disconnect() - - # save data for troubleshooting - with open("/tmp/%s.data.pickle" % self.ip, "w") as fp: - pickle.dump(data, fp) - - # save debug data for troubleshooting - if self.DEBUG: - with open("/tmp/%s.raw.pickle" % self.ip, "w") as fp: - pickle.dump(debug_data, fp) - with open("/tmp/%s.logging" % self.ip, "w") as fp: - fp.write("\n".join(log_lines)) - - self.check_gr_peer_status(data) - cli_data = {} - cli_data['lacp'] = self.check_series_status(data, "lacp", "LACP session") - cli_data['bgp_v4'] = self.check_series_status(data, "bgp_route_v4", "BGP v4 routes") - cli_data['bgp_v6'] = self.check_series_status(data, "bgp_route_v6", "BGP v6 routes") - cli_data['po'] = self.check_change_time(samples, "po_changetime", "PortChannel interface") - - route_timeout = log_data['route_timeout'] - cli_data['route_timeout'] = route_timeout - - # {'10.0.0.38': [(0, '4200065100)')], 'fc00::2d': [(0, '4200065100)')]} - for nei in route_timeout.keys(): - asn = route_timeout[nei][0][-1] - msg = 'BGP route GR timeout: neighbor %s (ASN %s' % (nei, asn) - self.fails.add(msg) - - return self.fails, self.info, cli_data, log_data - - def extract_from_logs(self, regexp, data): - raw_data = [] - result = defaultdict(list) - initial_time = -1 - re_compiled = re.compile(regexp) - for line in data: - m = re_compiled.match(line) - if not m: - continue - raw_data.append((datetime.datetime.strptime(m.group(1), "%b %d %X"), m.group(2), m.group(3))) - - if len(raw_data) > 0: - initial_time = raw_data[0][0] - for when, what, status in raw_data: - offset = (when - initial_time if when > initial_time else initial_time - when).seconds - result[what].append((offset, status)) - - return result, initial_time - - def parse_logs(self, data): - result = {} - bgp_r = r'^(\S+\s+\d+\s+\S+) \S+ Rib: %BGP-5-ADJCHANGE: peer (\S+) .+ (\S+)$' - result_bgp, initial_time_bgp = self.extract_from_logs(bgp_r, data) - if_r = r'^(\S+\s+\d+\s+\S+) \S+ Ebra: %LINEPROTO-5-UPDOWN: Line protocol on Interface (\S+), changed state to (\S+)$' - result_if, initial_time_if = self.extract_from_logs(if_r, data) - - route_r = r'^(\S+\s+\d+\s+\S+) \S+ Rib: %BGP-5-BGP_GRACEFUL_RESTART_TIMEOUT: Deleting stale routes from peer (\S+) .+ (\S+)$' - result_rt, initial_time_rt = self.extract_from_logs(route_r, data) - - result['route_timeout'] = result_rt - - if initial_time_bgp == -1 or initial_time_if == -1: - return result - - for events in result_bgp.values(): - if events[-1][1] != 'Established': - return result - - # first state is Idle, last state is Established - for events in result_bgp.values(): - if len(events) > 1: - assert(events[0][1] != 'Established') - - assert(events[-1][1] == 
'Established') - - # first state is down, last state is up - for events in result_if.values(): - assert(events[0][1] == 'down') - assert(events[-1][1] == 'up') - - po_name = [ifname for ifname in result_if.keys() if 'Port-Channel' in ifname][0] - neigh_ipv4 = [neig_ip for neig_ip in result_bgp.keys() if '.' in neig_ip][0] - - result['PortChannel was down (seconds)'] = result_if[po_name][-1][0] - result_if[po_name][0][0] - for if_name in sorted(result_if.keys()): - result['Interface %s was down (times)' % if_name] = map(itemgetter(1), result_if[if_name]).count("down") - - for neig_ip in result_bgp.keys(): - key = "BGP IPv6 was down (seconds)" if ':' in neig_ip else "BGP IPv4 was down (seconds)" - result[key] = result_bgp[neig_ip][-1][0] - result_bgp[neig_ip][0][0] - - for neig_ip in result_bgp.keys(): - key = "BGP IPv6 was down (times)" if ':' in neig_ip else "BGP IPv4 was down (times)" - result[key] = map(itemgetter(1), result_bgp[neig_ip]).count("Idle") - - bgp_po_offset = (initial_time_if - initial_time_bgp if initial_time_if > initial_time_bgp else initial_time_bgp - initial_time_if).seconds - result['PortChannel went down after bgp session was down (seconds)'] = bgp_po_offset + result_if[po_name][0][0] - - for neig_ip in result_bgp.keys(): - key = "BGP IPv6 was gotten up after Po was up (seconds)" if ':' in neig_ip else "BGP IPv4 was gotten up after Po was up (seconds)" - result[key] = result_bgp[neig_ip][-1][0] - bgp_po_offset - result_if[po_name][-1][0] - - return result - - def parse_lacp(self, output): - return output.find('Bundled') != -1 - - def parse_bgp_neighbor_once(self, output): - is_gr_ipv4_enabled = False - is_gr_ipv6_enabled = False - restart_time = None - for line in output.split('\n'): - if ' Restart-time is' in line: - restart_time = int(line.replace(' Restart-time is ', '')) - continue - - if 'is enabled, Forwarding State is' in line: - if 'IPv6' in line: - is_gr_ipv6_enabled = True - elif 'IPv4' in line: - is_gr_ipv4_enabled = True - - return is_gr_ipv4_enabled, is_gr_ipv6_enabled, restart_time - - def parse_bgp_neighbor(self, output): - gr_active = None - gr_timer = None - for line in output.split('\n'): - if 'Restart timer is' in line: - gr_active = 'is active' in line - gr_timer = str(line[-9:-1]) - - return gr_active, gr_timer - - def parse_bgp_route(self, output, expects): - prefixes = set() - data = "\n".join(output.split("\r\n")[1:-1]) - obj = json.loads(data) - - if "vrfs" in obj and "default" in obj["vrfs"]: - obj = obj["vrfs"]["default"] - for prefix, attrs in obj["routes"].items(): - if "routeAction" not in attrs or attrs["routeAction"] != "forward": - continue - if all("Port-Channel" in via["interface"] for via in attrs["vias"]): - prefixes.add(prefix) - - return set(expects) == prefixes - - def check_gr_peer_status(self, output): - # [0] True 'ipv4_gr_enabled', [1] doesn't matter 'ipv6_enabled', [2] should be >= 120 - if not self.ipv4_gr_enabled: - self.fails.add("bgp ipv4 graceful restart is not enabled") - if not self.ipv6_gr_enabled: - pass # ToDo: - if self.gr_timeout < 120: # bgp graceful restart timeout less then 120 seconds - self.fails.add("bgp graceful restart timeout is less then 120 seconds") - - for when, other in sorted(output.items(), key = lambda x : x[0]): - gr_active, timer = other['bgp_neig'] - # wnen it's False, it's ok, wnen it's True, check that inactivity timer not less then self.min_bgp_gr_timeout seconds - if gr_active and datetime.datetime.strptime(timer, '%H:%M:%S') < datetime.datetime(1900, 1, 1, second = 
self.min_bgp_gr_timeout): - self.fails.add("graceful restart timer is almost finished. Less then %d seconds left" % self.min_bgp_gr_timeout) - - def check_series_status(self, output, entity, what): - # find how long anything was down - # Input parameter is a dictionary when:status - # constraints: - # entity must be down just once - # entity must be up when the test starts - # entity must be up when the test stops - - sorted_keys = sorted(output.keys()) - if not output[sorted_keys[0]][entity]: - self.fails.add("%s must be up when the test starts" % what) - return 0, 0 - if not output[sorted_keys[-1]][entity]: - self.fails.add("%s must be up when the test stops" % what) - return 0, 0 - - start = sorted_keys[0] - cur_state = True - res = defaultdict(list) - for when in sorted_keys[1:]: - if cur_state != output[when][entity]: - res[cur_state].append(when - start) - start = when - cur_state = output[when][entity] - res[cur_state].append(when - start) - - is_down_count = len(res[False]) - - if is_down_count > 1: - self.info.add("%s must be down just for once" % what) - - return is_down_count, sum(res[False]) # summary_downtime - - def check_change_time(self, output, entity, what): - # find last changing time updated, if no update, the entity is never changed - # Input parameter is a dictionary when:last_changing_time - # constraints: - # the dictionary `output` cannot be empty - sorted_keys = sorted(output.keys()) - if not output: - self.fails.add("%s cannot be empty" % what) - return 0, 0 - - start = sorted_keys[0] - prev_time = output[start] - change_count = 0 - for when in sorted_keys[1:]: - if prev_time != output[when][entity]: - prev_time = output[when][entity] - change_count += 1 - - if change_count > 0: - self.info.add("%s state changed %d times" % (what, change_count)) - - # Note: the first item is a placeholder - return 0, change_count +from arista import Arista class StateMachine(): @@ -458,6 +108,10 @@ def is_flooding(self): class ReloadTest(BaseTest): TIMEOUT = 0.5 + VLAN_BASE_MAC_PATTERN = '72060001{:04}' + LAG_BASE_MAC_PATTERN = '5c010203{:04}' + SOCKET_RECV_BUFFER_SIZE = 10 * 1024 * 1024 + def __init__(self): BaseTest.__init__(self) self.fails = {} @@ -466,36 +120,36 @@ def __init__(self): self.logs_info = {} self.log_lock = threading.RLock() self.test_params = testutils.test_params_get() - self.check_param('verbose', False, required = False) - self.check_param('dut_username', '', required = True) - self.check_param('dut_hostname', '', required = True) - self.check_param('reboot_limit_in_seconds', 30, required = False) - self.check_param('reboot_type', 'fast-reboot', required = False) - self.check_param('graceful_limit', 180, required = False) - self.check_param('portchannel_ports_file', '', required = True) - self.check_param('vlan_ports_file', '', required = True) - self.check_param('ports_file', '', required = True) - self.check_param('dut_mac', '', required = True) - self.check_param('dut_vlan_ip', '', required = True) - self.check_param('default_ip_range', '', required = True) - self.check_param('vlan_ip_range', '', required = True) - self.check_param('lo_prefix', '10.1.0.32/32', required = False) - self.check_param('lo_v6_prefix', 'fc00:1::/64', required = False) - self.check_param('arista_vms', [], required = True) - self.check_param('min_bgp_gr_timeout', 15, required = False) - self.check_param('warm_up_timeout_secs', 180, required = False) - self.check_param('dut_stabilize_secs', 20, required = False) + self.check_param('verbose', False, required=False) + 
self.check_param('dut_username', '', required=True) + self.check_param('dut_hostname', '', required=True) + self.check_param('reboot_limit_in_seconds', 30, required=False) + self.check_param('reboot_type', 'fast-reboot', required=False) + self.check_param('graceful_limit', 180, required=False) + self.check_param('portchannel_ports_file', '', required=True) + self.check_param('vlan_ports_file', '', required=True) + self.check_param('ports_file', '', required=True) + self.check_param('dut_mac', '', required=True) + self.check_param('dut_vlan_ip', '', required=True) + self.check_param('default_ip_range', '', required=True) + self.check_param('vlan_ip_range', '', required=True) + self.check_param('lo_prefix', '10.1.0.32/32', required=False) + self.check_param('lo_v6_prefix', 'fc00:1::/64', required=False) + self.check_param('arista_vms', [], required=True) + self.check_param('min_bgp_gr_timeout', 15, required=False) + self.check_param('warm_up_timeout_secs', 180, required=False) + self.check_param('dut_stabilize_secs', 20, required=False) self.log_file_name = '/tmp/%s.log' % self.test_params['reboot_type'] self.log_fp = open(self.log_file_name, 'w') # Default settings - self.ping_dut_pkts = 10 - self.arp_ping_pkts = 1 - self.nr_pc_pkts = 100 - self.nr_tests = 3 - self.reboot_delay = 10 - self.task_timeout = 300 # Wait up to 5 minutes for tasks to complete + self.ping_dut_pkts = 10 + self.arp_ping_pkts = 1 + self.nr_pc_pkts = 100 + self.nr_tests = 3 + self.reboot_delay = 10 + self.task_timeout = 300 # Wait up to 5 minutes for tasks to complete self.max_nr_vl_pkts = 500 # FIXME: should be 1000. # But ptf is not fast enough + swss is slow for FDB and ARP entries insertions self.timeout_thr = None @@ -519,6 +173,10 @@ def __init__(self): # True : when one direction probe fails, don't probe another. # False: when one direction probe fails, continue probe another. 
self.light_probe = False + # We have two data plane traffic generators which are mutualy exclusive + # one is the reachability_watcher thread + # second is the fast send_in_background + self.dataplane_io_lock = threading.Lock() return @@ -529,9 +187,9 @@ def read_json(self, name): return content def read_port_indices(self): - self.port_indices = self.read_json('ports_file') + port_indices = self.read_json('ports_file') - return + return port_indices def read_portchannel_ports(self): content = self.read_json('portchannel_ports_file') @@ -598,23 +256,60 @@ def cancel_timeout(self): self.timeout_thr.cancel() self.timeout_thr = None + def generate_vlan_servers(self): + vlan_host_map = defaultdict(dict) + vlan_ip_range = self.test_params['vlan_ip_range'] + + _, mask = vlan_ip_range.split('/') + n_hosts = min(2**(32 - int(mask)) - 3, self.max_nr_vl_pkts) + + for counter, i in enumerate(xrange(2, n_hosts + 2)): + mac = self.VLAN_BASE_MAC_PATTERN.format(counter) + port = self.vlan_ports[i % len(self.vlan_ports)] + addr = self.host_ip(vlan_ip_range, i) + + vlan_host_map[port][addr] = mac + + self.nr_vl_pkts = n_hosts + + return vlan_host_map + + def generate_arp_responder_conf(self, vlan_host_map): + arp_responder_conf = {} + for port in vlan_host_map: + arp_responder_conf['eth{}'.format(port)] = vlan_host_map[port] + + return arp_responder_conf + + def dump_arp_responder_config(self, dump): + # save data for arp_replay process + with open("/tmp/from_t1.json", "w") as fp: + json.dump(dump, fp) + def setUp(self): - self.read_port_indices() + self.port_indices = self.read_port_indices() self.portchannel_ports = self.read_portchannel_ports() - vlan_ip_range = self.test_params['vlan_ip_range'] self.vlan_ports = self.read_vlan_ports() + self.vlan_ip_range = self.test_params['vlan_ip_range'] + self.default_ip_range = self.test_params['default_ip_range'] + self.limit = datetime.timedelta(seconds=self.test_params['reboot_limit_in_seconds']) self.reboot_type = self.test_params['reboot_type'] if self.reboot_type not in ['fast-reboot', 'warm-reboot']: raise ValueError('Not supported reboot_type %s' % self.reboot_type) self.dut_ssh = self.test_params['dut_username'] + '@' + self.test_params['dut_hostname'] self.dut_mac = self.test_params['dut_mac'] - # - self.generate_from_t1() - self.generate_from_vlan() - self.generate_ping_dut_lo() - self.generate_arp_ping_packet() + + self.vlan_host_map = self.generate_vlan_servers() + arp_responder_conf = self.generate_arp_responder_conf(self.vlan_host_map) + self.dump_arp_responder_config(arp_responder_conf) + + self.random_vlan = random.choice(self.vlan_ports) + self.from_server_src_port = self.random_vlan + self.from_server_src_addr = random.choice(self.vlan_host_map[self.random_vlan].keys()) + self.from_server_dst_addr = self.random_ip(self.test_params['default_ip_range']) + self.from_server_dst_ports = self.portchannel_ports self.log("Test params:") self.log("DUT ssh: %s" % self.dut_ssh) @@ -630,6 +325,11 @@ def setUp(self): self.log("Reboot type is %s" % self.reboot_type) + self.generate_from_t1() + self.generate_from_vlan() + self.generate_ping_dut_lo() + self.generate_arp_ping_packet() + if self.reboot_type == 'warm-reboot': # Pre-generate list of packets to be sent in send_in_background method. 
generate_start = datetime.datetime.now() @@ -639,7 +339,7 @@ def setUp(self): self.dataplane = ptf.dataplane_instance for p in self.dataplane.ports.values(): port = p.get_packet_source() - port.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1000000) + port.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.SOCKET_RECV_BUFFER_SIZE) self.dataplane.flush() if config["log_dir"] != None: @@ -673,38 +373,34 @@ def get_mac(self, iff): SIOCGIFHWADDR = 0x8927 # Get hardware address return ':'.join(['%02x' % ord(char) for char in self.get_if(iff, SIOCGIFHWADDR)[18:24]]) + @staticmethod + def hex_to_mac(hex_mac): + return ':'.join(hex_mac[i:i+2] for i in range(0, len(hex_mac), 2)) + def generate_from_t1(self): self.from_t1 = [] - vlan_ip_range = self.test_params['vlan_ip_range'] + # for each server host create a packet destinating server IP + for counter, host_port in enumerate(self.vlan_host_map): + src_addr = self.random_ip(self.default_ip_range) + src_port = self.random_port(self.portchannel_ports) - _, mask = vlan_ip_range.split('/') - n_hosts = min(2**(32 - int(mask)) - 3, self.max_nr_vl_pkts) + for server_ip in self.vlan_host_map[host_port]: + dst_addr = server_ip - dump = defaultdict(dict) - counter = 0 - for i in xrange(2, n_hosts + 2): - from_t1_src_addr = self.random_ip(self.test_params['default_ip_range']) - from_t1_src_port = self.random_port(self.portchannel_ports) - from_t1_dst_addr = self.host_ip(vlan_ip_range, i) - from_t1_dst_port = self.vlan_ports[i % len(self.vlan_ports)] - from_t1_if_name = "eth%d" % from_t1_dst_port - from_t1_if_addr = "%s/%s" % (from_t1_dst_addr, vlan_ip_range.split('/')[1]) - vlan_mac_hex = '72060001%04x' % counter - lag_mac_hex = '5c010203%04x' % counter - mac_addr = ':'.join(lag_mac_hex[i:i+2] for i in range(0, len(lag_mac_hex), 2)) - packet = simple_tcp_packet( - eth_src=mac_addr, - eth_dst=self.dut_mac, - ip_src=from_t1_src_addr, - ip_dst=from_t1_dst_addr, - ip_ttl=255, - tcp_dport=5000 - ) - self.from_t1.append((from_t1_src_port, str(packet))) - dump[from_t1_if_name][from_t1_dst_addr] = vlan_mac_hex - counter += 1 + # generate source MAC address for traffic based on LAG_BASE_MAC_PATTERN + mac_addr = self.hex_to_mac(self.LAG_BASE_MAC_PATTERN.format(counter)) + + packet = simple_tcp_packet(eth_src=mac_addr, + eth_dst=self.dut_mac, + ip_src=src_addr, + ip_dst=dst_addr, + ip_ttl=255, + tcp_dport=5000) + + self.from_t1.append((src_port, str(packet))) + # expect any packet with dport 5000 exp_packet = simple_tcp_packet( ip_src="0.0.0.0", ip_dst="0.0.0.0", @@ -720,20 +416,6 @@ def generate_from_t1(self): self.from_t1_exp_packet.set_do_not_care_scapy(scapy.TCP, "chksum") self.from_t1_exp_packet.set_do_not_care_scapy(scapy.IP, "ttl") - # save data for arp_replay process - with open("/tmp/from_t1.json", "w") as fp: - json.dump(dump, fp) - - random_vlan_iface = random.choice(dump.keys()) - self.from_server_src_port = int(random_vlan_iface.replace('eth','')) - self.from_server_src_addr = random.choice(dump[random_vlan_iface].keys()) - self.from_server_dst_addr = self.random_ip(self.test_params['default_ip_range']) - self.from_server_dst_ports = self.portchannel_ports - - self.nr_vl_pkts = n_hosts - - return - def generate_from_vlan(self): packet = simple_tcp_packet( eth_dst=self.dut_mac, @@ -749,13 +431,11 @@ def generate_from_vlan(self): ) self.from_vlan_exp_packet = Mask(exp_packet) - self.from_vlan_exp_packet.set_do_not_care_scapy(scapy.Ether,"src") - self.from_vlan_exp_packet.set_do_not_care_scapy(scapy.Ether,"dst") + 
self.from_vlan_exp_packet.set_do_not_care_scapy(scapy.Ether, "src") + self.from_vlan_exp_packet.set_do_not_care_scapy(scapy.Ether, "dst") self.from_vlan_packet = str(packet) - return - def generate_ping_dut_lo(self): dut_lo_ipv4 = self.test_params['lo_prefix'].split('/')[0] packet = simple_icmp_packet(eth_dst=self.dut_mac, @@ -800,50 +480,29 @@ def generate_arp_ping_packet(self): self.arp_resp.set_do_not_care_scapy(scapy.ARP, 'hwsrc') self.arp_src_port = src_port - def generate_bidirectional(self, packets_to_send = None): + def generate_bidirectional(self): """ This method is used to pre-generate packets to be sent in background thread. Packets are composed into a list, and present a bidirectional flow as next: five packet from T1, one packet from vlan. - Each packet has sequential UDP Payload - to be identified later. + Each packet has sequential TCP Payload - to be identified later. """ - if packets_to_send: - self.packets_to_send = packets_to_send - self.send_interval = self.time_to_listen / self.packets_to_send - else: - packets_to_send = self.packets_to_send - vlan_ip_range = self.test_params['vlan_ip_range'] - _, mask = vlan_ip_range.split('/') - n_hosts = min(2**(32 - int(mask)) - 3, self.max_nr_vl_pkts) - counter = 0 - self.packets_list = list() - for i in xrange(packets_to_send): + + self.send_interval = self.time_to_listen / self.packets_to_send + self.packets_list = [] + from_t1_iter = itertools.cycle(self.from_t1) + + for i in xrange(self.packets_to_send): payload = '0' * 60 + str(i) if (i % 5) == 0 : # From vlan to T1. - packet = simple_udp_packet( - eth_dst = self.dut_mac, - ip_src = self.from_server_src_addr, - ip_dst = self.from_server_dst_addr, - udp_sport = 1234, - udp_dport = 5000, - udp_payload = payload) + packet = scapyall.Ether(self.from_vlan_packet) + packet.load = payload from_port = self.from_server_src_port else: # From T1 to vlan. - from_t1_src_addr = self.random_ip(self.test_params['default_ip_range']) - from_t1_src_port = self.random_port(self.portchannel_ports) - from_t1_dst_addr = self.host_ip(vlan_ip_range, (counter%(n_hosts-2))+2) - lag_mac_hex = '5c010203%04x' % counter - mac_addr = ':'.join(lag_mac_hex[i:i+2] for i in range(0, len(lag_mac_hex), 2)) - counter += 1 - packet = simple_udp_packet( - eth_src = mac_addr, - eth_dst = self.dut_mac, - ip_src = from_t1_src_addr, - ip_dst = from_t1_dst_addr, - ip_ttl = 255, - udp_dport = 5000, - udp_payload = payload) - from_port = from_t1_src_port + src_port, packet = next(from_t1_iter) + packet = scapyall.Ether(packet) + packet.load = payload + from_port = src_port self.packets_list.append((from_port, str(packet))) def runTest(self): @@ -944,11 +603,14 @@ def runTest(self): self.watching = False if self.reboot_type == 'warm-reboot': + self.send_and_sniff() + # Stop watching DUT self.watching = False self.log("Stopping reachability state watch thread.") self.watcher_is_stopped.wait(timeout = 10) # Wait for the Watcher stopped. 
- self.send_and_sniff() + + self.save_sniffed_packets() examine_start = datetime.datetime.now() self.log("Packet flow examine started %s after the reboot" % str(examine_start - self.reboot_start)) @@ -1122,6 +784,11 @@ def wait_until_cpu_port_up(self): break time.sleep(self.TIMEOUT) + def apply_filter_all_ports(self, filter_expression): + for p in self.dataplane.ports.values(): + port = p.get_packet_source() + scapyall.attach_filter(port.socket, filter_expression) + def send_in_background(self, packets_list = None, interval = None): """ This method sends predefined list of packets with predefined interval. @@ -1131,16 +798,24 @@ def send_in_background(self, packets_list = None, interval = None): if not packets_list: packets_list = self.packets_list self.sniffer_started.wait(timeout=10) - sender_start = datetime.datetime.now() - self.log("Sender started at %s" % str(sender_start)) - for entry in packets_list: - time.sleep(interval) - testutils.send_packet(self, *entry) - self.log("Sender has been running for %s" % str(datetime.datetime.now() - sender_start)) + with self.dataplane_io_lock: + # While running fast data plane sender thread there are two reasons for filter to be applied + # 1. filter out data plane traffic which is tcp to free up the load on PTF socket (sniffer thread is using a different one) + # 2. during warm neighbor restoration DUT will send a lot of ARP requests which we are not interested in + # This is essential to get stable results + self.apply_filter_all_ports('not (arp and ether src {}) and not tcp'.format(self.test_params['dut_mac'])) + sender_start = datetime.datetime.now() + self.log("Sender started at %s" % str(sender_start)) + for entry in packets_list: + time.sleep(interval) + testutils.send_packet(self, *entry) + self.log("Sender has been running for %s" % str(datetime.datetime.now() - sender_start)) + # Remove filter + self.apply_filter_all_ports('') def sniff_in_background(self, wait = None): """ - This function listens on all ports, in both directions, for the UDP src=1234 dst=5000 packets, until timeout. + This function listens on all ports, in both directions, for the TCP src=1234 dst=5000 packets, until timeout. Once found, all packets are dumped to local pcap file, and all packets are saved to self.packets as scapy type. The native scapy.snif() is used as a background thread, to allow delayed start for the send_in_background(). @@ -1149,8 +824,7 @@ def sniff_in_background(self, wait = None): wait = self.time_to_listen + 30 sniffer_start = datetime.datetime.now() self.log("Sniffer started at %s" % str(sniffer_start)) - filename = '/tmp/capture.pcap' - sniff_filter = "udp and udp dst port 5000 and udp src port 1234 and not icmp" + sniff_filter = "tcp and tcp dst port 5000 and tcp src port 1234 and not icmp" scapy_sniffer = threading.Thread(target=self.scapy_sniff, kwargs={'wait': wait, 'sniff_filter': sniff_filter}) scapy_sniffer.start() time.sleep(2) # Let the scapy sniff initialize completely. 
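# --- Editorial sketch (not part of the patch) ---
# The two BPF filters introduced in this hunk work as a pair: while the background sender
# holds dataplane_io_lock, every PTF port socket gets
# 'not (arp and ether src <dut_mac>) and not tcp', so the generated TCP flood and the
# DUT's ARP burst during warm neighbor restoration do not load the PTF receive sockets,
# while the dedicated sniffer keeps only the test flow with
# 'tcp and tcp dst port 5000 and tcp src port 1234 and not icmp'.
# Below is a minimal standalone illustration of the sniffer side using scapy; the interface
# name and timeout are assumptions, not values taken from the test.
import scapy.all as scapyall

SNIFF_FILTER = "tcp and tcp dst port 5000 and tcp src port 1234 and not icmp"

def capture_sample(iface="eth0", seconds=5):
    # Returns only packets that belong to the generated flow (TCP sport 1234, dport 5000).
    return scapyall.sniff(iface=iface, filter=SNIFF_FILTER, timeout=seconds)
# --- end sketch ---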
@@ -1158,6 +832,9 @@ def sniff_in_background(self, wait = None): scapy_sniffer.join() self.log("Sniffer has been running for %s" % str(datetime.datetime.now() - sniffer_start)) self.sniffer_started.clear() + + def save_sniffed_packets(self): + filename = '/tmp/capture.pcap' if self.packets: scapyall.wrpcap(filename, self.packets) self.log("Pcap file dumped to %s" % filename) @@ -1183,13 +860,13 @@ def send_and_sniff(self): self.sniff_thr.join() self.sender_thr.join() - def check_udp_payload(self, packet): + def check_tcp_payload(self, packet): """ This method is used by examine_flow() method. - It returns True if a packet is not corrupted and has a valid UDP sequential UDP Payload, as created by generate_bidirectional() method'. + It returns True if a packet is not corrupted and has a valid TCP sequential TCP Payload, as created by generate_bidirectional() method'. """ try: - int(str(packet[scapyall.UDP].payload)) in range(self.packets_to_send) + int(str(packet[scapyall.TCP].payload)) in range(self.packets_to_send) return True except Exception as err: return False @@ -1198,9 +875,9 @@ def no_flood(self, packet): """ This method filters packets which are unique (i.e. no floods). """ - if (not int(str(packet[scapyall.UDP].payload)) in self.unique_id) and (packet[scapyall.Ether].src == self.dut_mac): + if (not int(str(packet[scapyall.TCP].payload)) in self.unique_id) and (packet[scapyall.Ether].src == self.dut_mac): # This is a unique (no flooded) received packet. - self.unique_id.append(int(str(packet[scapyall.UDP].payload))) + self.unique_id.append(int(str(packet[scapyall.TCP].payload))) return True elif packet[scapyall.Ether].dst == self.dut_mac: # This is a sent packet. @@ -1211,7 +888,7 @@ def no_flood(self, packet): def examine_flow(self, filename = None): """ This method examines pcap file (if given), or self.packets scapy file. - The method compares UDP payloads of the packets one by one (assuming all payloads are consecutive integers), + The method compares TCP payloads of the packets one by one (assuming all payloads are consecutive integers), and the losses if found - are treated as disruptions in Dataplane forwarding. All disruptions are saved to self.lost_packets dictionary, in format: disrupt_start_id = (missing_packets_count, disrupt_time, disrupt_start_timestamp, disrupt_stop_timestamp) @@ -1227,15 +904,15 @@ def examine_flow(self, filename = None): # Filter out packets and remove floods: self.unique_id = list() # This list will contain all unique Payload ID, to filter out received floods. filtered_packets = [ pkt for pkt in all_packets if - scapyall.UDP in pkt and + scapyall.TCP in pkt and not scapyall.ICMP in pkt and - pkt[scapyall.UDP].sport == 1234 and - pkt[scapyall.UDP].dport == 5000 and - self.check_udp_payload(pkt) and + pkt[scapyall.TCP].sport == 1234 and + pkt[scapyall.TCP].dport == 5000 and + self.check_tcp_payload(pkt) and self.no_flood(pkt) ] # Re-arrange packets, if delayed, by Payload ID and Timestamp: - packets = sorted(filtered_packets, key = lambda packet: (int(str(packet[scapyall.UDP].payload)), packet.time )) + packets = sorted(filtered_packets, key = lambda packet: (int(str(packet[scapyall.TCP].payload)), packet.time )) self.lost_packets = dict() self.max_disrupt, self.total_disruption = 0, 0 sent_packets = dict() @@ -1250,13 +927,13 @@ def examine_flow(self, filename = None): for packet in packets: if packet[scapyall.Ether].dst == self.dut_mac: # This is a sent packet - keep track of it as payload_id:timestamp. 
- sent_payload = int(str(packet[scapyall.UDP].payload)) + sent_payload = int(str(packet[scapyall.TCP].payload)) sent_packets[sent_payload] = packet.time continue if packet[scapyall.Ether].src == self.dut_mac: # This is a received packet. received_time = packet.time - received_payload = int(str(packet[scapyall.UDP].payload)) + received_payload = int(str(packet[scapyall.TCP].payload)) received_counter += 1 if not (received_payload and received_time): # This is the first valid received packet. @@ -1483,16 +1160,18 @@ def reachability_watcher(self): # changes for future analysis self.watcher_is_stopped.clear() # Watcher is running. while self.watching: - vlan_to_t1, t1_to_vlan = self.ping_data_plane(self.light_probe) - reachable = (t1_to_vlan > self.nr_vl_pkts * 0.7 and - vlan_to_t1 > self.nr_pc_pkts * 0.7) - partial = (reachable and - (t1_to_vlan < self.nr_vl_pkts or - vlan_to_t1 < self.nr_pc_pkts)) - flooding = (reachable and - (t1_to_vlan > self.nr_vl_pkts or - vlan_to_t1 > self.nr_pc_pkts)) - self.log_asic_state_change(reachable, partial, t1_to_vlan, flooding) + if self.dataplane_io_lock.acquire(False): + vlan_to_t1, t1_to_vlan = self.ping_data_plane(self.light_probe) + reachable = (t1_to_vlan > self.nr_vl_pkts * 0.7 and + vlan_to_t1 > self.nr_pc_pkts * 0.7) + partial = (reachable and + (t1_to_vlan < self.nr_vl_pkts or + vlan_to_t1 < self.nr_pc_pkts)) + flooding = (reachable and + (t1_to_vlan > self.nr_vl_pkts or + vlan_to_t1 > self.nr_pc_pkts)) + self.log_asic_state_change(reachable, partial, t1_to_vlan, flooding) + self.dataplane_io_lock.release() total_rcv_pkt_cnt = self.pingDut() reachable = total_rcv_pkt_cnt > 0 and total_rcv_pkt_cnt > self.ping_dut_pkts * 0.7 partial = total_rcv_pkt_cnt > 0 and total_rcv_pkt_cnt < self.ping_dut_pkts diff --git a/ansible/roles/test/files/ptftests/arista.py b/ansible/roles/test/files/ptftests/arista.py new file mode 100644 index 00000000000..e77b69a3874 --- /dev/null +++ b/ansible/roles/test/files/ptftests/arista.py @@ -0,0 +1,386 @@ +import ptf +from ptf.base_tests import BaseTest +from ptf import config +import ptf.testutils as testutils +from ptf.testutils import * +from ptf.dataplane import match_exp_pkt +import datetime +import time +import subprocess +from ptf.mask import Mask +import socket +import ptf.packet as scapy +import thread +import threading +from multiprocessing.pool import ThreadPool, TimeoutError +import os +import signal +import random +import struct +import socket +from pprint import pprint +from fcntl import ioctl +import sys +import json +import re +from collections import defaultdict +import json +import paramiko +import Queue +import pickle +from operator import itemgetter +import scapy.all as scapyall +import enum + +class Arista(object): + DEBUG = False + def __init__(self, ip, queue, test_params, login='admin', password='123456'): + self.ip = ip + self.queue = queue + self.login = login + self.password = password + self.conn = None + self.hostname = None + self.v4_routes = [test_params['vlan_ip_range'], test_params['lo_prefix']] + self.v6_routes = [test_params['lo_v6_prefix']] + self.fails = set() + self.info = set() + self.min_bgp_gr_timeout = int(test_params['min_bgp_gr_timeout']) + + def __del__(self): + self.disconnect() + + def connect(self): + self.conn = paramiko.SSHClient() + self.conn.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + self.conn.connect(self.ip, username=self.login, password=self.password, allow_agent=False, look_for_keys=False) + self.shell = self.conn.invoke_shell() + + first_prompt = 
self.do_cmd(None, prompt = '>') + self.hostname = self.extract_hostname(first_prompt) + + self.do_cmd('enable') + self.do_cmd('terminal length 0') + + return self.shell + + def extract_hostname(self, first_prompt): + lines = first_prompt.split('\n') + prompt = lines[-1] + return prompt.strip().replace('>', '#') + + def do_cmd(self, cmd, prompt = None): + if prompt == None: + prompt = self.hostname + + if cmd is not None: + self.shell.send(cmd + '\n') + + input_buffer = '' + while prompt not in input_buffer: + input_buffer += self.shell.recv(16384) + + return input_buffer + + def disconnect(self): + if self.conn is not None: + self.conn.close() + self.conn = None + + return + + def run(self): + data = {} + debug_data = {} + run_once = False + log_first_line = None + quit_enabled = False + v4_routing_ok = False + v6_routing_ok = False + routing_works = True + self.connect() + + cur_time = time.time() + sample = {} + samples = {} + portchannel_output = self.do_cmd("show interfaces po1 | json") + portchannel_output = "\n".join(portchannel_output.split("\r\n")[1:-1]) + sample["po_changetime"] = json.loads(portchannel_output, strict=False)['interfaces']['Port-Channel1']['lastStatusChangeTimestamp'] + samples[cur_time] = sample + + while not (quit_enabled and v4_routing_ok and v6_routing_ok): + cmd = self.queue.get() + if cmd == 'quit': + quit_enabled = True + continue + cur_time = time.time() + info = {} + debug_info = {} + lacp_output = self.do_cmd('show lacp neighbor') + info['lacp'] = self.parse_lacp(lacp_output) + bgp_neig_output = self.do_cmd('show ip bgp neighbors') + info['bgp_neig'] = self.parse_bgp_neighbor(bgp_neig_output) + + bgp_route_v4_output = self.do_cmd('show ip route bgp | json') + v4_routing_ok = self.parse_bgp_route(bgp_route_v4_output, self.v4_routes) + info['bgp_route_v4'] = v4_routing_ok + + bgp_route_v6_output = self.do_cmd("show ipv6 route bgp | json") + v6_routing_ok = self.parse_bgp_route(bgp_route_v6_output, self.v6_routes) + info["bgp_route_v6"] = v6_routing_ok + + portchannel_output = self.do_cmd("show interfaces po1 | json") + portchannel_output = "\n".join(portchannel_output.split("\r\n")[1:-1]) + sample["po_changetime"] = json.loads(portchannel_output, strict=False)['interfaces']['Port-Channel1']['lastStatusChangeTimestamp'] + + if not run_once: + self.ipv4_gr_enabled, self.ipv6_gr_enabled, self.gr_timeout = self.parse_bgp_neighbor_once(bgp_neig_output) + if self.gr_timeout is not None: + log_first_line = "session_begins_%f" % cur_time + self.do_cmd("send log message %s" % log_first_line) + run_once = True + + data[cur_time] = info + samples[cur_time] = sample + if self.DEBUG: + debug_data[cur_time] = { + 'show lacp neighbor' : lacp_output, + 'show ip bgp neighbors' : bgp_neig_output, + 'show ip route bgp' : bgp_route_v4_output, + 'show ipv6 route bgp' : bgp_route_v6_output, + } + + attempts = 60 + for _ in range(attempts): + log_output = self.do_cmd("show log | begin %s" % log_first_line) + log_lines = log_output.split("\r\n")[1:-1] + log_data = self.parse_logs(log_lines) + if len(log_data) != 0: + break + time.sleep(1) # wait until logs are populated + + if len(log_data) == 0: + log_data['error'] = 'Incomplete output' + + self.disconnect() + + # save data for troubleshooting + with open("/tmp/%s.data.pickle" % self.ip, "w") as fp: + pickle.dump(data, fp) + + # save debug data for troubleshooting + if self.DEBUG: + with open("/tmp/%s.raw.pickle" % self.ip, "w") as fp: + pickle.dump(debug_data, fp) + with open("/tmp/%s.logging" % self.ip, "w") as fp: + 
fp.write("\n".join(log_lines)) + + self.check_gr_peer_status(data) + cli_data = {} + cli_data['lacp'] = self.check_series_status(data, "lacp", "LACP session") + cli_data['bgp_v4'] = self.check_series_status(data, "bgp_route_v4", "BGP v4 routes") + cli_data['bgp_v6'] = self.check_series_status(data, "bgp_route_v6", "BGP v6 routes") + cli_data['po'] = self.check_change_time(samples, "po_changetime", "PortChannel interface") + + route_timeout = log_data['route_timeout'] + cli_data['route_timeout'] = route_timeout + + # {'10.0.0.38': [(0, '4200065100)')], 'fc00::2d': [(0, '4200065100)')]} + for nei in route_timeout.keys(): + asn = route_timeout[nei][0][-1] + msg = 'BGP route GR timeout: neighbor %s (ASN %s' % (nei, asn) + self.fails.add(msg) + + return self.fails, self.info, cli_data, log_data + + def extract_from_logs(self, regexp, data): + raw_data = [] + result = defaultdict(list) + initial_time = -1 + re_compiled = re.compile(regexp) + for line in data: + m = re_compiled.match(line) + if not m: + continue + raw_data.append((datetime.datetime.strptime(m.group(1), "%b %d %X"), m.group(2), m.group(3))) + + if len(raw_data) > 0: + initial_time = raw_data[0][0] + for when, what, status in raw_data: + offset = (when - initial_time if when > initial_time else initial_time - when).seconds + result[what].append((offset, status)) + + return result, initial_time + + def parse_logs(self, data): + result = {} + bgp_r = r'^(\S+\s+\d+\s+\S+) \S+ Rib: %BGP-5-ADJCHANGE: peer (\S+) .+ (\S+)$' + result_bgp, initial_time_bgp = self.extract_from_logs(bgp_r, data) + if_r = r'^(\S+\s+\d+\s+\S+) \S+ Ebra: %LINEPROTO-5-UPDOWN: Line protocol on Interface (\S+), changed state to (\S+)$' + result_if, initial_time_if = self.extract_from_logs(if_r, data) + + route_r = r'^(\S+\s+\d+\s+\S+) \S+ Rib: %BGP-5-BGP_GRACEFUL_RESTART_TIMEOUT: Deleting stale routes from peer (\S+) .+ (\S+)$' + result_rt, initial_time_rt = self.extract_from_logs(route_r, data) + + result['route_timeout'] = result_rt + + if initial_time_bgp == -1 or initial_time_if == -1: + return result + + for events in result_bgp.values(): + if events[-1][1] != 'Established': + return result + + # first state is Idle, last state is Established + for events in result_bgp.values(): + if len(events) > 1: + assert(events[0][1] != 'Established') + + assert(events[-1][1] == 'Established') + + # first state is down, last state is up + for events in result_if.values(): + assert(events[0][1] == 'down') + assert(events[-1][1] == 'up') + + po_name = [ifname for ifname in result_if.keys() if 'Port-Channel' in ifname][0] + neigh_ipv4 = [neig_ip for neig_ip in result_bgp.keys() if '.' 
in neig_ip][0] + + result['PortChannel was down (seconds)'] = result_if[po_name][-1][0] - result_if[po_name][0][0] + for if_name in sorted(result_if.keys()): + result['Interface %s was down (times)' % if_name] = map(itemgetter(1), result_if[if_name]).count("down") + + for neig_ip in result_bgp.keys(): + key = "BGP IPv6 was down (seconds)" if ':' in neig_ip else "BGP IPv4 was down (seconds)" + result[key] = result_bgp[neig_ip][-1][0] - result_bgp[neig_ip][0][0] + + for neig_ip in result_bgp.keys(): + key = "BGP IPv6 was down (times)" if ':' in neig_ip else "BGP IPv4 was down (times)" + result[key] = map(itemgetter(1), result_bgp[neig_ip]).count("Idle") + + bgp_po_offset = (initial_time_if - initial_time_bgp if initial_time_if > initial_time_bgp else initial_time_bgp - initial_time_if).seconds + result['PortChannel went down after bgp session was down (seconds)'] = bgp_po_offset + result_if[po_name][0][0] + + for neig_ip in result_bgp.keys(): + key = "BGP IPv6 was gotten up after Po was up (seconds)" if ':' in neig_ip else "BGP IPv4 was gotten up after Po was up (seconds)" + result[key] = result_bgp[neig_ip][-1][0] - bgp_po_offset - result_if[po_name][-1][0] + + return result + + def parse_lacp(self, output): + return output.find('Bundled') != -1 + + def parse_bgp_neighbor_once(self, output): + is_gr_ipv4_enabled = False + is_gr_ipv6_enabled = False + restart_time = None + for line in output.split('\n'): + if ' Restart-time is' in line: + restart_time = int(line.replace(' Restart-time is ', '')) + continue + + if 'is enabled, Forwarding State is' in line: + if 'IPv6' in line: + is_gr_ipv6_enabled = True + elif 'IPv4' in line: + is_gr_ipv4_enabled = True + + return is_gr_ipv4_enabled, is_gr_ipv6_enabled, restart_time + + def parse_bgp_neighbor(self, output): + gr_active = None + gr_timer = None + for line in output.split('\n'): + if 'Restart timer is' in line: + gr_active = 'is active' in line + gr_timer = str(line[-9:-1]) + + return gr_active, gr_timer + + def parse_bgp_route(self, output, expects): + prefixes = set() + data = "\n".join(output.split("\r\n")[1:-1]) + obj = json.loads(data) + + if "vrfs" in obj and "default" in obj["vrfs"]: + obj = obj["vrfs"]["default"] + for prefix, attrs in obj["routes"].items(): + if "routeAction" not in attrs or attrs["routeAction"] != "forward": + continue + if all("Port-Channel" in via["interface"] for via in attrs["vias"]): + prefixes.add(prefix) + + return set(expects) == prefixes + + def check_gr_peer_status(self, output): + # [0] True 'ipv4_gr_enabled', [1] doesn't matter 'ipv6_enabled', [2] should be >= 120 + if not self.ipv4_gr_enabled: + self.fails.add("bgp ipv4 graceful restart is not enabled") + if not self.ipv6_gr_enabled: + pass # ToDo: + if self.gr_timeout < 120: # bgp graceful restart timeout less then 120 seconds + self.fails.add("bgp graceful restart timeout is less then 120 seconds") + + for when, other in sorted(output.items(), key = lambda x : x[0]): + gr_active, timer = other['bgp_neig'] + # wnen it's False, it's ok, wnen it's True, check that inactivity timer not less then self.min_bgp_gr_timeout seconds + if gr_active and datetime.datetime.strptime(timer, '%H:%M:%S') < datetime.datetime(1900, 1, 1, second = self.min_bgp_gr_timeout): + self.fails.add("graceful restart timer is almost finished. 
Less then %d seconds left" % self.min_bgp_gr_timeout) + + def check_series_status(self, output, entity, what): + # find how long anything was down + # Input parameter is a dictionary when:status + # constraints: + # entity must be down just once + # entity must be up when the test starts + # entity must be up when the test stops + + sorted_keys = sorted(output.keys()) + if not output[sorted_keys[0]][entity]: + self.fails.add("%s must be up when the test starts" % what) + return 0, 0 + if not output[sorted_keys[-1]][entity]: + self.fails.add("%s must be up when the test stops" % what) + return 0, 0 + + start = sorted_keys[0] + cur_state = True + res = defaultdict(list) + for when in sorted_keys[1:]: + if cur_state != output[when][entity]: + res[cur_state].append(when - start) + start = when + cur_state = output[when][entity] + res[cur_state].append(when - start) + + is_down_count = len(res[False]) + + if is_down_count > 1: + self.info.add("%s must be down just for once" % what) + + return is_down_count, sum(res[False]) # summary_downtime + + def check_change_time(self, output, entity, what): + # find last changing time updated, if no update, the entity is never changed + # Input parameter is a dictionary when:last_changing_time + # constraints: + # the dictionary `output` cannot be empty + sorted_keys = sorted(output.keys()) + if not output: + self.fails.add("%s cannot be empty" % what) + return 0, 0 + + start = sorted_keys[0] + prev_time = output[start] + change_count = 0 + for when in sorted_keys[1:]: + if prev_time != output[when][entity]: + prev_time = output[when][entity] + change_count += 1 + + if change_count > 0: + self.info.add("%s state changed %d times" % (what, change_count)) + + # Note: the first item is a placeholder + return 0, change_count +
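
A minimal sketch (not part of the patch) of how the new MAC patterns and the hex_to_mac() helper combine into the per-port host map that generate_vlan_servers() builds and generate_arp_responder_conf() dumps for the ARP responder. The port numbers and IP addresses below are illustrative only, and build_vlan_host_map() is a simplified stand-in for the real method, not its exact logic.

import itertools

VLAN_BASE_MAC_PATTERN = '72060001{:04}'

def hex_to_mac(hex_mac):
    # '720600010002' -> '72:06:00:01:00:02'
    return ':'.join(hex_mac[i:i+2] for i in range(0, len(hex_mac), 2))

def build_vlan_host_map(vlan_ports, host_ips):
    # Simplified stand-in for generate_vlan_servers(): spread the hosts over the VLAN
    # ports round-robin and give each host a MAC derived from its running counter.
    vlan_host_map = {}
    for counter, (port, addr) in enumerate(zip(itertools.cycle(vlan_ports), host_ips)):
        vlan_host_map.setdefault(port, {})[addr] = VLAN_BASE_MAC_PATTERN.format(counter)
    return vlan_host_map

def build_arp_responder_conf(vlan_host_map):
    # Mirrors generate_arp_responder_conf(): key the map by PTF interface name.
    return {'eth{}'.format(port): hosts for port, hosts in vlan_host_map.items()}

# Example:
#   build_vlan_host_map([5, 6], ['192.168.0.2', '192.168.0.3', '192.168.0.4'])
#   -> {5: {'192.168.0.2': '720600010000', '192.168.0.4': '720600010002'},
#       6: {'192.168.0.3': '720600010001'}}
#   hex_to_mac('720600010002') -> '72:06:00:01:00:02'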