diff --git a/tests/common/fixtures/__init__.py b/tests/common/fixtures/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/common/fixtures/conn_graph_facts.py b/tests/common/fixtures/conn_graph_facts.py new file mode 100644 index 00000000000..7ab84770b2a --- /dev/null +++ b/tests/common/fixtures/conn_graph_facts.py @@ -0,0 +1,12 @@ +import pytest +import os + +@pytest.fixture(scope="module") +def conn_graph_facts(testbed_devices): + dut = testbed_devices["dut"] + localhost = testbed_devices["localhost"] + + base_path = os.path.dirname(os.path.realpath(__file__)) + conn_graph_file = os.path.join(base_path, "../../../ansible/files/lab_connection_graph.xml") + conn_graph_facts = localhost.conn_graph_facts(host=dut.hostname, filename=conn_graph_file)['ansible_facts'] + return conn_graph_facts diff --git a/tests/common/platform/__init__.py b/tests/common/platform/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/common/platform/interface_utils.py b/tests/common/platform/interface_utils.py new file mode 100644 index 00000000000..bedefeb9a13 --- /dev/null +++ b/tests/common/platform/interface_utils.py @@ -0,0 +1,85 @@ +""" +Helper script for checking status of interfaces + +This script contains re-usable functions for checking status of interfaces on SONiC. +""" +import logging +from transceiver_utils import all_transceivers_detected + + +def parse_intf_status(lines): + """ + @summary: Parse the output of command "intfutil description". + @param lines: The output lines of command "intfutil description". + @return: Return a dictionary like: + { + "Ethernet0": { + "oper": "up", + "admin": "up", + "alias": "etp1", + "desc": "ARISTA01T2:Ethernet1" + }, + ... + } + """ + result = {} + for line in lines: + fields = line.split() + if len(fields) >= 5: + intf = fields[0] + oper, admin, alias, desc = fields[1], fields[2], fields[3], ' '.join(fields[4:]) + result[intf] = {"oper": oper, "admin": admin, "alias": alias, "desc": desc} + return result + + +def check_interface_status(dut, interfaces): + """ + @summary: Check the admin and oper status of the specified interfaces on DUT. + @param dut: The AnsibleHost object of DUT. For interacting with DUT. + @param interfaces: List of interfaces that need to be checked. + """ + logging.info("Check interface status using cmd 'intfutil'") + mg_ports = dut.minigraph_facts(host=dut.hostname)["ansible_facts"]["minigraph_ports"] + output = dut.command("intfutil description") + intf_status = parse_intf_status(output["stdout_lines"][2:]) + check_intf_presence_command = 'show interface transceiver presence {}' + for intf in interfaces: + expected_oper = "up" if intf in mg_ports else "down" + expected_admin = "up" if intf in mg_ports else "down" + if intf not in intf_status: + logging.info("Missing status for interface %s" % intf) + return False + if intf_status[intf]["oper"] != expected_oper: + logging.info("Oper status of interface %s is %s, expected '%s'" % (intf, intf_status[intf]["oper"], + expected_oper)) + return False + if intf_status[intf]["admin"] != expected_admin: + logging.info("Admin status of interface %s is %s, expected '%s'" % (intf, intf_status[intf]["admin"], + expected_admin)) + return False + + # Cross check the interface SFP presence status + check_presence_output = dut.command(check_intf_presence_command.format(intf)) + presence_list = check_presence_output["stdout_lines"][2].split() + assert intf in presence_list, "Wrong interface name in the output: %s" % str(presence_list) + assert 'Present' in presence_list, "Status is not expected, presence status: %s" % str(presence_list) + + logging.info("Check interface status using the interface_facts module") + intf_facts = dut.interface_facts(up_ports=mg_ports)["ansible_facts"] + down_ports = intf_facts["ansible_interface_link_down_ports"] + if len(down_ports) != 0: + logging.info("Some interfaces are down: %s" % str(down_ports)) + return False + + return True + + +def check_interface_information(dut, interfaces): + if not all_transceivers_detected(dut, interfaces): + logging.info("Not all transceivers are detected") + return False + if not check_interface_status(dut, interfaces): + logging.info("Not all interfaces are up") + return False + + return True diff --git a/tests/common/platform/transceiver_utils.py b/tests/common/platform/transceiver_utils.py new file mode 100644 index 00000000000..3ea369a2061 --- /dev/null +++ b/tests/common/platform/transceiver_utils.py @@ -0,0 +1,119 @@ +""" +Helper script for checking status of transceivers + +This script contains re-usable functions for checking status of transceivers. +""" +import logging +import re +import json + + +def parse_transceiver_info(output_lines): + """ + @summary: Parse the list of transceiver from DB table TRANSCEIVER_INFO content + @param output_lines: DB table TRANSCEIVER_INFO content output by 'redis' command + @return: Return parsed transceivers in a list + """ + result = [] + p = re.compile(r"TRANSCEIVER_INFO\|(Ethernet\d+)") + for line in output_lines: + m = p.match(line) + assert m, "Unexpected line %s" % line + result.append(m.group(1)) + return result + + +def parse_transceiver_dom_sensor(output_lines): + """ + @summary: Parse the list of transceiver from DB table TRANSCEIVER_DOM_SENSOR content + @param output_lines: DB table TRANSCEIVER_DOM_SENSOR content output by 'redis' command + @return: Return parsed transceivers in a list + """ + result = [] + p = re.compile(r"TRANSCEIVER_DOM_SENSOR\|(Ethernet\d+)") + for line in output_lines: + m = p.match(line) + assert m, "Unexpected line %s" % line + result.append(m.group(1)) + return result + + +def all_transceivers_detected(dut, interfaces): + """ + Check if transceiver information of all the specified interfaces have been detected. + """ + db_output = dut.command("redis-cli --raw -n 6 keys TRANSCEIVER_INFO\*")["stdout_lines"] + not_detected_interfaces = [intf for intf in interfaces if "TRANSCEIVER_INFO|%s" % intf not in db_output] + if len(not_detected_interfaces) > 0: + logging.info("Interfaces not detected: %s" % str(not_detected_interfaces)) + return False + return True + + +def check_transceiver_basic(dut, interfaces): + """ + @summary: Check whether all the specified interface are in TRANSCEIVER_INFO redis DB. + @param dut: The AnsibleHost object of DUT. For interacting with DUT. + @param interfaces: List of interfaces that need to be checked. + """ + logging.info("Check whether transceiver information of all ports are in redis") + xcvr_info = dut.command("redis-cli -n 6 keys TRANSCEIVER_INFO*") + parsed_xcvr_info = parse_transceiver_info(xcvr_info["stdout_lines"]) + for intf in interfaces: + assert intf in parsed_xcvr_info, "TRANSCEIVER INFO of %s is not found in DB" % intf + + +def check_transceiver_details(dut, interfaces): + """ + @summary: Check the detailed TRANSCEIVER_INFO content of all the specified interfaces. + @param dut: The AnsibleHost object of DUT. For interacting with DUT. + @param interfaces: List of interfaces that need to be checked. + """ + logging.info("Check detailed transceiver information of each connected port") + expected_fields = ["type", "hardwarerev", "serialnum", "manufacturename", "modelname"] + for intf in interfaces: + port_xcvr_info = dut.command('redis-cli -n 6 hgetall "TRANSCEIVER_INFO|%s"' % intf) + for field in expected_fields: + assert port_xcvr_info["stdout"].find(field) >= 0, \ + "Expected field %s is not found in %s while checking %s" % (field, port_xcvr_info["stdout"], intf) + + +def check_transceiver_dom_sensor_basic(dut, interfaces): + """ + @summary: Check whether all the specified interface are in TRANSCEIVER_DOM_SENSOR redis DB. + @param dut: The AnsibleHost object of DUT. For interacting with DUT. + @param interfaces: List of interfaces that need to be checked. + """ + logging.info("Check whether TRANSCEIVER_DOM_SENSOR of all ports in redis") + xcvr_dom_sensor = dut.command("redis-cli -n 6 keys TRANSCEIVER_DOM_SENSOR*") + parsed_xcvr_dom_sensor = parse_transceiver_dom_sensor(xcvr_dom_sensor["stdout_lines"]) + for intf in interfaces: + assert intf in parsed_xcvr_dom_sensor, "TRANSCEIVER_DOM_SENSOR of %s is not found in DB" % intf + + +def check_transceiver_dom_sensor_details(dut, interfaces): + """ + @summary: Check the detailed TRANSCEIVER_DOM_SENSOR content of all the specified interfaces. + @param dut: The AnsibleHost object of DUT. For interacting with DUT. + @param interfaces: List of interfaces that need to be checked. + """ + logging.info("Check detailed TRANSCEIVER_DOM_SENSOR information of each connected ports") + expected_fields = ["temperature", "voltage", "rx1power", "rx2power", "rx3power", "rx4power", "tx1bias", + "tx2bias", "tx3bias", "tx4bias", "tx1power", "tx2power", "tx3power", "tx4power"] + for intf in interfaces: + port_xcvr_dom_sensor = dut.command('redis-cli -n 6 hgetall "TRANSCEIVER_DOM_SENSOR|%s"' % intf) + for field in expected_fields: + assert port_xcvr_dom_sensor["stdout"].find(field) >= 0, \ + "Expected field %s is not found in %s while checking %s" % (field, port_xcvr_dom_sensor["stdout"], intf) + + +def check_transceiver_status(dut, interfaces): + """ + @summary: Check transceiver information of all the specified interfaces in redis DB. + @param dut: The AnsibleHost object of DUT. For interacting with DUT. + @param interfaces: List of interfaces that need to be checked. + """ + check_transceiver_basic(dut, interfaces) + check_transceiver_details(dut, interfaces) + check_transceiver_dom_sensor_basic(dut, interfaces) + check_transceiver_dom_sensor_details(dut, interfaces)