Skip to content
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,8 +658,8 @@ def generate_params_supervisor_hostname(request):
# Expecting only a single supervisor node
if is_supervisor_node(inv_files, dut):
return [dut]
pytest.fail("Test selected require a supervisor node, " +
"none of the DUTs '{}' in testbed '{}' are a supervisor node".format(duts, tbname))
# If there are no supervisor cards in a multi-dut tesbed, we are dealing with all pizza box in the testbed, pick the first DUT
return [duts[0]]

def generate_param_asic_index(request, dut_indices, param_type):
_, tbinfo = get_tbinfo(request)
Expand Down
207 changes: 149 additions & 58 deletions tests/platform_tests/cli/test_show_platform.py

Large diffs are not rendered by default.

Empty file.
26 changes: 26 additions & 0 deletions tests/platform_tests/sfp/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import pytest
import logging
import os
from tests.common.plugins.loganalyzer.loganalyzer import LogAnalyzer

ans_host = None


def teardown_module():
logging.info("remove script to retrieve port mapping")
file_path = os.path.join('/usr/share/sonic/device', ans_host.facts['platform'], 'plugins/getportmap.py')
ans_host.file(path=file_path, state='absent')


@pytest.fixture(autouse=True)
def disable_analyzer_for_mellanox(duthost):
if duthost.facts["asic_type"] in ["mellanox"]:
loganalyzer = LogAnalyzer(ansible_host=duthost, marker_prefix='sfp_cfg')
loganalyzer.load_common_config()

loganalyzer.ignore_regex.append("kernel.*Eeprom query failed*")
marker = loganalyzer.init()
yield

if duthost.facts["asic_type"] in ["mellanox"]:
loganalyzer.analyze(marker)
56 changes: 56 additions & 0 deletions tests/platform_tests/sfp/test_sfpshow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""
Check SFP status using sfpshow.

This script covers test case 'Check SFP status and configure SFP' in the SONiC platform test plan:
https://github.com/Azure/SONiC/blob/master/doc/pmon/sonic_platform_test_plan.md
"""

import logging
import pytest

from util import parse_eeprom
from util import parse_output
from util import get_dev_conn

cmd_sfp_presence = "sudo sfpshow presence"
cmd_sfp_eeprom = "sudo sfpshow eeprom"


pytestmark = [
pytest.mark.disable_loganalyzer, # disable automatic loganalyzer
pytest.mark.topology('any')
]


def test_check_sfp_presence(duthosts, enum_rand_one_per_hwsku_frontend_hostname, enum_frontend_asic_index, conn_graph_facts):
"""
@summary: Check SFP presence using 'sfputil show presence'
"""
duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname]
global ans_host
ans_host = duthost
portmap, dev_conn = get_dev_conn(duthost, conn_graph_facts, enum_frontend_asic_index)

logging.info("Check output of '%s'" % cmd_sfp_presence)
Comment thread
sanmalho-git marked this conversation as resolved.
Outdated
sfp_presence = duthost.command(cmd_sfp_presence)
parsed_presence = parse_output(sfp_presence["stdout_lines"][2:])
for intf in dev_conn:
assert intf in parsed_presence, "Interface is not in output of '%s'" % cmd_sfp_presence
Comment thread
sanmalho-git marked this conversation as resolved.
Outdated
assert parsed_presence[intf] == "Present", "Interface presence is not 'Present'"


def test_check_sfpshow_eeprom(duthosts, enum_rand_one_per_hwsku_frontend_hostname, enum_frontend_asic_index, conn_graph_facts):
"""
@summary: Check SFP presence using 'sfputil show presence'
"""
duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname]
global ans_host
ans_host = duthost
portmap, dev_conn = get_dev_conn(duthost, conn_graph_facts, enum_frontend_asic_index)

logging.info("Check output of '%s'" % cmd_sfp_eeprom)
Comment thread
sanmalho-git marked this conversation as resolved.
Outdated
sfp_eeprom = duthost.command(cmd_sfp_eeprom)
parsed_eeprom = parse_eeprom(sfp_eeprom["stdout_lines"])
for intf in dev_conn:
assert intf in parsed_eeprom, "Interface is not in output of 'sfputil show eeprom'"
assert parsed_eeprom[intf] == "SFP EEPROM detected"
Original file line number Diff line number Diff line change
@@ -1,100 +1,40 @@
"""
Check SFP status and configure SFP
Check SFP status and configure SFP using sfputil.

This script covers test case 'Check SFP status and configure SFP' in the SONiC platform test plan:
https://github.com/Azure/SONiC/blob/master/doc/pmon/sonic_platform_test_plan.md
"""

import logging
import re
import os
import time
import copy

import pytest

from tests.common.fixtures.conn_graph_facts import conn_graph_facts
from tests.common.plugins.loganalyzer.loganalyzer import LogAnalyzer
from tests.common.platform.interface_utils import get_port_map

ans_host = None
from util import parse_eeprom
from util import parse_output
from util import get_dev_conn

def teardown_module():
logging.info("remove script to retrieve port mapping")
file_path = os.path.join('/usr/share/sonic/device', ans_host.facts['platform'], 'plugins/getportmap.py')
ans_host.file(path=file_path, state='absent')
cmd_sfp_presence = "sudo sfputil show presence"
cmd_sfp_eeprom = "sudo sfputil show eeprom"
cmd_sfp_reset = "sudo sfputil reset"
cmd_sfp_show_lpmode = "sudo sfputil show lpmode"
cmd_sfp_set_lpmode = "sudo sfputil lpmode"

pytestmark = [
pytest.mark.disable_loganalyzer, # disable automatic loganalyzer
pytest.mark.topology('any')
]

def parse_output(output_lines):
"""
@summary: For parsing command output. The output lines should have format 'key value'.
@param output_lines: Command output lines
@return: Returns result in a dictionary
"""
res = {}
for line in output_lines:
fields = line.split()
if len(fields) != 2:
continue
res[fields[0]] = fields[1]
return res


def parse_eeprom(output_lines):
def test_check_sfputil_presence(duthosts, enum_rand_one_per_hwsku_frontend_hostname, enum_frontend_asic_index, conn_graph_facts):
"""
@summary: Parse the SFP eeprom information from command output
@param output_lines: Command output lines
@return: Returns result in a dictionary
@summary: Check SFP presence using 'sfputil show presence'
"""
res = {}
for line in output_lines:
if re.match(r"^Ethernet\d+: .*", line):
fields = line.split(":")
res[fields[0]] = fields[1].strip()
return res

def test_check_sfp_status_and_configure_sfp(duthosts, rand_one_dut_hostname, enum_frontend_asic_index, conn_graph_facts, tbinfo):
"""
@summary: Check SFP status and configure SFP

This case is to use the sfputil tool and show command to check SFP status and configure SFP. Currently the
only configuration is to reset SFP. Commands to be tested:
* sfputil show presence
* show interface transceiver presence
* sfputil show eeprom
* show interface transceiver eeprom
* sfputil reset <interface name>
"""
duthost = duthosts[rand_one_dut_hostname]
if duthost.facts["asic_type"] in ["mellanox"]:
loganalyzer = LogAnalyzer(ansible_host=duthost, marker_prefix='sfp_cfg')
loganalyzer.load_common_config()

loganalyzer.ignore_regex.append("kernel.*Eeprom query failed*")
marker = loganalyzer.init()

dev_conn = conn_graph_facts["device_conn"][duthost.hostname]

# Get the interface pertaining to that asic
portmap = get_port_map(duthost, enum_frontend_asic_index)
logging.info("Got portmap {}".format(portmap))

if enum_frontend_asic_index is not None:
# Check if the interfaces of this AISC is present in conn_graph_facts
dev_conn = {k:v for k, v in portmap.items() if k in conn_graph_facts["device_conn"][duthost.hostname]}
logging.info("ASIC {} interface_list {}".format(enum_frontend_asic_index, dev_conn))

cmd_sfp_presence = "sudo sfputil show presence"
cmd_sfp_eeprom = "sudo sfputil show eeprom"
cmd_sfp_reset = "sudo sfputil reset"
cmd_xcvr_presence = "show interface transceiver presence"
cmd_xcvr_eeprom = "show interface transceiver eeprom"

duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname]
global ans_host
ans_host = duthost
portmap, dev_conn = get_dev_conn(duthost, conn_graph_facts, enum_frontend_asic_index)

logging.info("Check output of '%s'" % cmd_sfp_presence)
sfp_presence = duthost.command(cmd_sfp_presence)
Expand All @@ -103,12 +43,14 @@ def test_check_sfp_status_and_configure_sfp(duthosts, rand_one_dut_hostname, enu
assert intf in parsed_presence, "Interface is not in output of '%s'" % cmd_sfp_presence
assert parsed_presence[intf] == "Present", "Interface presence is not 'Present'"

logging.info("Check output of '%s'" % cmd_xcvr_presence)
xcvr_presence = duthost.command(cmd_xcvr_presence)
parsed_presence = parse_output(xcvr_presence["stdout_lines"][2:])
for intf in dev_conn:
assert intf in parsed_presence, "Interface is not in output of '%s'" % cmd_xcvr_presence
assert parsed_presence[intf] == "Present", "Interface presence is not 'Present'"
def test_check_sfputil_eeprom(duthosts, enum_rand_one_per_hwsku_frontend_hostname, enum_frontend_asic_index, conn_graph_facts):
"""
@summary: Check SFP presence using 'sfputil show presence'
"""
duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname]
global ans_host
ans_host = duthost
portmap, dev_conn = get_dev_conn(duthost, conn_graph_facts, enum_frontend_asic_index)

logging.info("Check output of '%s'" % cmd_sfp_eeprom)
sfp_eeprom = duthost.command(cmd_sfp_eeprom)
Expand All @@ -117,14 +59,15 @@ def test_check_sfp_status_and_configure_sfp(duthosts, rand_one_dut_hostname, enu
assert intf in parsed_eeprom, "Interface is not in output of 'sfputil show eeprom'"
assert parsed_eeprom[intf] == "SFP EEPROM detected"

logging.info("Check output of '%s'" % cmd_xcvr_eeprom)
xcvr_eeprom = duthost.command(cmd_xcvr_eeprom)
parsed_eeprom = parse_eeprom(xcvr_eeprom["stdout_lines"])
for intf in dev_conn:
assert intf in parsed_eeprom, "Interface is not in output of '%s'" % cmd_xcvr_eeprom
assert parsed_eeprom[intf] == "SFP EEPROM detected"

logging.info("Test '%s <interface name>'" % cmd_sfp_reset)
def test_check_sfputil_reset(duthosts, enum_rand_one_per_hwsku_frontend_hostname, enum_frontend_asic_index, conn_graph_facts, tbinfo):
"""
@summary: Check SFP presence using 'sfputil show presence'
"""
duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname]
global ans_host
ans_host = duthost
portmap, dev_conn = get_dev_conn(duthost, conn_graph_facts, enum_frontend_asic_index)
tested_physical_ports = set()
for intf in dev_conn:
phy_intf = portmap[intf][0]
Expand All @@ -147,22 +90,13 @@ def test_check_sfp_status_and_configure_sfp(duthosts, rand_one_dut_hostname, enu
assert parsed_presence[intf] == "Present", "Interface presence is not 'Present'"

logging.info("Check interface status")
namespace = duthost.get_namespace_from_asic_id(enum_frontend_asic_index)
mg_facts = duthost.get_extended_minigraph_facts(tbinfo)
# TODO Remove this logic when minigraph facts supports namespace in multi_asic
up_ports = mg_facts["minigraph_ports"]
if enum_frontend_asic_index is not None:
# Check if the interfaces of this AISC is present in conn_graph_facts
up_ports = {k:v for k, v in portmap.items() if k in mg_facts["minigraph_ports"]}
intf_facts = duthost.interface_facts(namespace=namespace, up_ports=up_ports)["ansible_facts"]
intf_facts = duthost.interface_facts(up_ports=mg_facts["minigraph_ports"])["ansible_facts"]
assert len(intf_facts["ansible_interface_link_down_ports"]) == 0, \
"Some interfaces are down: %s" % str(intf_facts["ansible_interface_link_down_ports"])

if duthost.facts["asic_type"] in ["mellanox"]:
loganalyzer.analyze(marker)


def test_check_sfp_low_power_mode(duthosts, rand_one_dut_hostname, enum_frontend_asic_index, conn_graph_facts, tbinfo):
def test_check_sfputil_low_power_mode(duthosts, enum_rand_one_per_hwsku_frontend_hostname, enum_frontend_asic_index, conn_graph_facts, tbinfo):
"""
@summary: Check SFP low power mode

Expand All @@ -171,30 +105,11 @@ def test_check_sfp_low_power_mode(duthosts, rand_one_dut_hostname, enum_frontend
* sfputil lpmode off
* sfputil lpmode on
"""
duthost = duthosts[rand_one_dut_hostname]
duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname]
asichost = duthost.get_asic(enum_frontend_asic_index)
if duthost.facts["asic_type"] in ["mellanox"]:
loganalyzer = LogAnalyzer(ansible_host=duthost, marker_prefix='sfp_lpm')
loganalyzer.load_common_config()

loganalyzer.ignore_regex.append("Eeprom query failed")
marker = loganalyzer.init()

dev_conn = conn_graph_facts["device_conn"][duthost.hostname]

# Get the interface pertaining to that asic
portmap = get_port_map(duthost, enum_frontend_asic_index)
logging.info("Got portmap {}".format(portmap))

if enum_frontend_asic_index is not None:
# Check if the interfaces of this AISC is present in conn_graph_facts
dev_conn = {k:v for k, v in portmap.items() if k in conn_graph_facts["device_conn"][duthost.hostname]}
logging.info("ASIC {} interface_list {}".format(enum_frontend_asic_index, dev_conn))

cmd_sfp_presence = "sudo sfputil show presence"
cmd_sfp_show_lpmode = "sudo sfputil show lpmode"
cmd_sfp_set_lpmode = "sudo sfputil lpmode"

portmap, dev_conn = get_dev_conn(duthost, conn_graph_facts, enum_frontend_asic_index)
global ans_host
ans_host = duthost

Expand Down Expand Up @@ -287,6 +202,3 @@ def test_check_sfp_low_power_mode(duthosts, rand_one_dut_hostname, enum_frontend
intf_facts = duthost.interface_facts(namespace=namespace, up_ports=up_ports)["ansible_facts"]
assert len(intf_facts["ansible_interface_link_down_ports"]) == 0, \
"Some interfaces are down: %s" % str(intf_facts["ansible_interface_link_down_ports"])

if duthost.facts["asic_type"] in ["mellanox"]:
loganalyzer.analyze(marker)
55 changes: 55 additions & 0 deletions tests/platform_tests/sfp/test_show_intf_xcvr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""
Check SFP status using 'show interface transciever'.

This script covers test case 'Check SFP status and configure SFP' in the SONiC platform test plan:
https://github.com/Azure/SONiC/blob/master/doc/pmon/sonic_platform_test_plan.md
"""

import logging
import pytest

from util import parse_eeprom
from util import parse_output
from util import get_dev_conn

cmd_sfp_presence = "show interface transceiver presence"
cmd_sfp_eeprom = "show interface transceiver eeprom"

pytestmark = [
pytest.mark.disable_loganalyzer, # disable automatic loganalyzer
pytest.mark.topology('any')
]


def test_check_sfp_presence(duthosts, enum_rand_one_per_hwsku_frontend_hostname, enum_frontend_asic_index, conn_graph_facts):
"""
@summary: Check SFP presence using 'sfputil show presence'
"""
duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname]
global ans_host
ans_host = duthost
portmap, dev_conn = get_dev_conn(duthost, conn_graph_facts, enum_frontend_asic_index)

logging.info("Check output of '%s'" % cmd_sfp_presence)
Comment thread
sanmalho-git marked this conversation as resolved.
Outdated
sfp_presence = duthost.command(cmd_sfp_presence)
parsed_presence = parse_output(sfp_presence["stdout_lines"][2:])
for intf in dev_conn:
assert intf in parsed_presence, "Interface is not in output of '%s'" % cmd_sfp_presence
Comment thread
sanmalho-git marked this conversation as resolved.
Outdated
assert parsed_presence[intf] == "Present", "Interface presence is not 'Present'"


def test_check_sfpshow_eeprom(duthosts, enum_rand_one_per_hwsku_frontend_hostname, enum_frontend_asic_index, conn_graph_facts):
"""
@summary: Check SFP presence using 'sfputil show presence'
"""
duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname]
global ans_host
ans_host = duthost
portmap, dev_conn = get_dev_conn(duthost, conn_graph_facts, enum_frontend_asic_index)

logging.info("Check output of '%s'" % cmd_sfp_eeprom)
Comment thread
sanmalho-git marked this conversation as resolved.
Outdated
sfp_eeprom = duthost.command(cmd_sfp_eeprom)
parsed_eeprom = parse_eeprom(sfp_eeprom["stdout_lines"])
for intf in dev_conn:
assert intf in parsed_eeprom, "Interface is not in output of 'sfputil show eeprom'"
assert parsed_eeprom[intf] == "SFP EEPROM detected"
Loading