Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tests/common/devices/sonic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2275,7 +2275,7 @@ def get_port_channel_status(self, port_channel_name):
def links_status_down(self, ports):
show_int_result = self.command("show interface status")
for output_line in show_int_result['stdout_lines']:
output_port = output_line.split(' ')[0]
output_port = output_line.strip().split(' ')[0]
# Only care about port that connect to current DUT
if output_port in ports:
# Either oper or admin status 'down' means link down
Expand All @@ -2291,7 +2291,7 @@ def links_status_down(self, ports):
def links_status_up(self, ports):
show_int_result = self.command("show interface status")
for output_line in show_int_result['stdout_lines']:
output_port = output_line.split(' ')[0]
output_port = output_line.strip().split(' ')[0]
# Only care about port that connect to current DUT
if output_port in ports:
# Either oper or admin status 'down' means link down
Expand Down
98 changes: 71 additions & 27 deletions tests/platform_tests/api/test_sfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from tests.common.utilities import wait_until
from tests.common.fixtures.conn_graph_facts import conn_graph_facts # noqa F401
from tests.common.fixtures.duthost_utils import shutdown_ebgp # noqa F401
from tests.common.mellanox_data import is_mellanox_device

from platform_api_test_base import PlatformApiTestBase

Expand Down Expand Up @@ -55,6 +56,7 @@ def setup(request, duthosts, enum_rand_one_per_hwsku_hostname,
intf in list(physical_port_index_map.keys())
if intf not in xcvr_skip_list[duthost.hostname]])
sfp_setup["sfp_test_port_indices"] = sorted(sfp_port_indices)
sfp_setup["sfp_physical_port_index_map"] = physical_port_index_map

# Fetch SFP names from platform.json
sfp_fact_names = []
Expand Down Expand Up @@ -280,14 +282,12 @@ def is_xcvr_resettable(self, request, xcvr_info_dict):
xcvr_type = xcvr_info_dict.get("type_abbrv_name")
return xcvr_type not in not_resettable_xcvr_type

def lp_mode_assert_delay(self, xcvr_info_dict):
xcvr_type = xcvr_info_dict["type_abbrv_name"]
def lp_mode_assert_delay(self, xcvr_type):
if "QSFP" in xcvr_type and xcvr_type != "QSFP-DD":
return 0.1
return 0

def lp_mode_deassert_delay(self, xcvr_info_dict):
xcvr_type = xcvr_info_dict["type_abbrv_name"]
def lp_mode_deassert_delay(self, xcvr_type):
if "QSFP" in xcvr_type and xcvr_type != "QSFP-DD":
return 0.3
return 0
Expand Down Expand Up @@ -734,43 +734,87 @@ def _check_lpmode_status(self, sfp, platform_api_conn, i, state):

def test_lpmode(self, duthosts, enum_rand_one_per_hwsku_hostname, localhost, platform_api_conn):
"""This function tests both the get_lpmode() and set_lpmode() APIs"""
for i in self.sfp_setup["sfp_test_port_indices"]:
info_dict = sfp.get_transceiver_info(platform_api_conn, i)
# Ensure that the transceiver type supports low-power mode
if not self.expect(info_dict is not None, "Unable to retrieve transceiver {} info".format(i)):
continue
duthost = duthosts[enum_rand_one_per_hwsku_hostname]
support_lpmode_physical_port_index_map, support_lpmode_physical_port_with_admin_up, port_indx_to_xcvr_type_map \
= self._get_support_lpmode_physical_port_index_map(duthost, platform_api_conn)
if not support_lpmode_physical_port_index_map:
pytest.skip("No interface supports lpmode")

if not self.is_xcvr_support_lpmode(info_dict):
logger.warning(
"test_lpmode: Skipping transceiver {} (not applicable for this transceiver type)"
.format(i))
continue
if is_mellanox_device(duthost) and len(support_lpmode_physical_port_with_admin_up) > 0:
# for nvidia devices, need to shutdown the port before setting the port into lp mode
logger.info("Shut down ports:{}".format(support_lpmode_physical_port_with_admin_up))
duthost.shutdown_multiple(support_lpmode_physical_port_with_admin_up)
self.expect(wait_until(60, 1, 0, duthost.links_status_down, support_lpmode_physical_port_with_admin_up),
"Failed to shutdown {}".format(support_lpmode_physical_port_with_admin_up))

lpmode_state_pretest = sfp.get_lpmode(platform_api_conn, i)
for port_index in set(support_lpmode_physical_port_index_map.values()):

lpmode_state_pretest = sfp.get_lpmode(platform_api_conn, port_index)
if lpmode_state_pretest is None:
logger.warning("test_lpmode: Skipping transceiver {} (not supported on this platform)".format(i))
logger.warning(
"test_lpmode: Skipping transceiver {} (not supported on this platform)".format(port_index))
break
# This order makes sure lpmode will get restored to pretest value after test
lpmode_states_to_be_tested = [not lpmode_state_pretest, lpmode_state_pretest]

# Enable and disable low-power mode on each transceiver
for state in lpmode_states_to_be_tested:
ret = sfp.set_lpmode(platform_api_conn, i, state)
ret = sfp.set_lpmode(platform_api_conn, port_index, state)
if ret is None:
logger.warning("test_lpmode: Skipping transceiver {} (not supported on this platform)".format(i))
logger.warning("test_lpmode: Skipping transceiver {} (not supported on this platform)".format(
port_index))
break
if state is True:
delay = self.lp_mode_assert_delay(info_dict)
delay = self.lp_mode_assert_delay(port_indx_to_xcvr_type_map[port_index])
else:
delay = self.lp_mode_deassert_delay(info_dict)
self.expect(ret is True, "Failed to {} low-power mode for transceiver {}"
.format("enable" if state is True else "disable", i))
self.expect(wait_until(5, 1, delay,
self._check_lpmode_status, sfp, platform_api_conn, i, state),
"Transceiver {} expected low-power state {} is not aligned with the real state"
.format(i, "enable" if state is True else "disable"))
delay = self.lp_mode_deassert_delay(port_indx_to_xcvr_type_map[port_index])
self.expect(ret is True, "Failed to {} low-power mode for transceiver {}".format(
"enable" if state is True else "disable", port_index))
self.expect(
wait_until(5, 1, delay, self._check_lpmode_status, sfp, platform_api_conn, port_index, state),
"Transceiver {} expected low-power state {} is not aligned with the real state".format(
port_index, "enable" if state is True else "disable"))

if is_mellanox_device(duthost) and len(support_lpmode_physical_port_with_admin_up) > 0:
logger.info(
"After setting the ports to disabled lpm mode, verify that the ports:{} are still in down state".format(
support_lpmode_physical_port_with_admin_up))
self.expect(wait_until(60, 1, 0, duthost.links_status_down, support_lpmode_physical_port_with_admin_up),
"Disable lpm, ports doesn't keep down {}".format(support_lpmode_physical_port_with_admin_up))
logger.info("Startup ports:{}".format(support_lpmode_physical_port_with_admin_up))
duthost.no_shutdown_multiple(support_lpmode_physical_port_with_admin_up)
self.expect(wait_until(120, 1, 0, duthost.links_status_up, support_lpmode_physical_port_with_admin_up),
"Failed to startup {}".format(support_lpmode_physical_port_with_admin_up))

self.assert_expectations()

def _get_support_lpmode_physical_port_index_map(self, duthost, platform_api_conn):
original_interface_status = duthost.get_interfaces_status()
support_lpmode_physical_port_index_map = {}
support_lpmode_physical_port_with_admin_up = []
port_indx_to_xcvr_type_map = {}
for test_port_index in self.sfp_setup["sfp_test_port_indices"]:
info_dict = sfp.get_transceiver_info(platform_api_conn, test_port_index)
# Ensure that the transceiver type supports low-power mode
if not self.expect(info_dict is not None, "Unable to retrieve transceiver {} info".format(
test_port_index)):
continue

if not self.is_xcvr_support_lpmode(info_dict):
logger.warning(
"test_lpmode: Skipping transceiver {} (not applicable for this transceiver type)".format(
test_port_index))
continue
for port, port_index in self.sfp_setup["sfp_physical_port_index_map"].items():
if port_index == test_port_index:
physical_port = port
support_lpmode_physical_port_index_map[physical_port] = test_port_index
port_indx_to_xcvr_type_map[test_port_index] = info_dict["type_abbrv_name"]
if physical_port in original_interface_status and \
original_interface_status[physical_port]['admin'].lower() == 'up':
support_lpmode_physical_port_with_admin_up.append(physical_port)
return (support_lpmode_physical_port_index_map,
support_lpmode_physical_port_with_admin_up, port_indx_to_xcvr_type_map)

def test_power_override(self, duthosts, enum_rand_one_per_hwsku_hostname, localhost, platform_api_conn):
"""This function tests both the get_power_override() and set_power_override() APIs"""
duthost = duthosts[enum_rand_one_per_hwsku_hostname]
Expand Down
Loading