Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 56 additions & 32 deletions tests/platform_tests/api/test_sfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,51 @@ def is_xcvr_support_power_override(self, xcvr_info_dict):
is_valid_xcvr_type = "QSFP" in xcvr_type and xcvr_type != "QSFP-DD"
return self.is_xcvr_optical(xcvr_info_dict) and is_valid_xcvr_type

def get_interfaces_to_flap_after_sfp_reset(self, port_index_to_info_dict, duthost):
interfaces_to_flap = []
admin_up_port_list = set(duthost.get_admin_up_ports())
for intf in self.sfp_setup['conn_interfaces']:
logger.info("Processing interface {} for flap after SFP reset".format(intf))
if intf not in admin_up_port_list:
# skip interfaces which are not in admin up state.
logger.info("Skipping interface {} as it is not in admin up state".format(intf))
continue

# skip if info_dict is not retrieved during reset, which also means reset was not performed.
sfp_port_idx = self.sfp_setup['physical_port_index_map'][intf]
if sfp_port_idx not in port_index_to_info_dict:
logger.info(
"Skipping interface {} as SFP reset was not performed on port index {}".format(intf, sfp_port_idx)
)
continue

info_dict = port_index_to_info_dict[sfp_port_idx]
if self.is_xcvr_support_lpmode(info_dict):
logger.info("Flapping interface {} - xcvr supports lpmode and needs to be flapped".format(intf))
interfaces_to_flap.append(intf)
return interfaces_to_flap

def _shutdown_and_no_shutdown_ports(self, duthost, intf_list):
intf_to_flap_joined = ",".join(intf_list)
try:
if duthost.is_multi_asic:
for intf in intf_list:
duthost.shutdown_interface(intf)
else:
duthost.shutdown_interface(intf_to_flap_joined)
except Exception as e:
logger.error("Failed to shutdown interfaces: {}".format(e))

shutdown_wait_scale_factor = max(1, len(intf_list)*0.01)
time.sleep(WAIT_TIME_AFTER_INTF_SHUTDOWN*shutdown_wait_scale_factor)
try:
if duthost.is_multi_asic:
for intf in intf_list:
duthost.no_shutdown_interface(intf)
else:
duthost.no_shutdown_interface(intf_to_flap_joined)
except Exception as e:
logger.error("Failed to startup interfaces: {}".format(e))
#
# Functions to test methods inherited from DeviceBase class
#
Expand Down Expand Up @@ -714,37 +759,16 @@ def test_reset(self, request, duthosts, enum_rand_one_per_hwsku_hostname, localh

# allow the I2C interface to recover post sfp reset
time.sleep(I2C_WAIT_TIME_AFTER_SFP_RESET)

# shutdown and bring up in batch so that we don't have to add delay for each interface.
intfs_changed = []
admin_up_port_list = duthost.get_admin_up_ports()
for intf in self.sfp_setup['conn_interfaces']:
if intf not in admin_up_port_list:
# skip interfaces which are not in admin up state.
continue

sfp_port_idx = self.sfp_setup['physical_port_index_map'][intf]
# skip if info_dict is not retrieved during reset, which also means reset was not performed.
if sfp_port_idx not in port_index_to_info_dict:
continue
info_dict = port_index_to_info_dict[sfp_port_idx]

# only flap interfaces where are CMIS optics,
# non-CMIS optics should stay up after sfp_reset(), no need to flap.
if "cmis_rev" in info_dict:
duthost.shutdown_interface(intf)
intfs_changed.append(intf)

time.sleep(WAIT_TIME_AFTER_INTF_SHUTDOWN)

for intf in intfs_changed:
duthost.no_shutdown_interface(intf)

_, port_up_wait_time = default_port_toggle_wait_time(duthost, len(intfs_changed))
if not wait_until(port_up_wait_time, 10, 0,
check_interface_status_of_up_ports, duthost):
self.expect(False, "Not all interfaces are up after reset")

intf_list = self.get_interfaces_to_flap_after_sfp_reset(port_index_to_info_dict, duthost)
if intf_list:
logger.info("Flapping interfaces: {}".format(intf_list))
self._shutdown_and_no_shutdown_ports(duthost, intf_list)
_, port_up_wait_time = default_port_toggle_wait_time(duthost, len(intf_list))
if not wait_until(port_up_wait_time, 10, 0,
check_interface_status_of_up_ports, duthost):
self.expect(False, "Not all interfaces are up after reset")
else:
logger.info("No interfaces to flap after SFP reset")
self.assert_expectations()

def test_tx_disable(self, duthosts, enum_rand_one_per_hwsku_hostname, localhost, platform_api_conn): # noqa F811
Expand Down Expand Up @@ -943,7 +967,7 @@ def test_get_error_description(self, duthosts, enum_rand_one_per_hwsku_hostname,
if self.expect(isinstance(error_description, str) or isinstance(error_description, str),
"Transceiver {} error description appears incorrect".format(i)):
self.expect(error_description == expected_state,
f"Transceiver {i} is not {expected_state}, actual state is:{error_description}.")
f"Transceiver {i} is not {expected_state}, actual state is: {error_description}.")
self.assert_expectations()

def test_thermals(self, platform_api_conn): # noqa F811
Expand Down
Loading