Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ansible/library/show_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def collect_interface_status(self):
rc, self.out, err = self.module.run_command(command, executable='/bin/bash', use_unsafe_shell=True)
for line in self.out.split("\n"):
line = line.strip()
if regex_int.match(line):
if regex_int.match(line) and interface == regex_int.match(line).group(1):
self.int_status[interface]['name'] = regex_int.match(line).group(1)
self.int_status[interface]['speed'] = regex_int.match(line).group(2)
self.int_status[interface]['alias'] = regex_int.match(line).group(4)
Expand Down
23 changes: 23 additions & 0 deletions tests/common/platform/device_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""
Helper script for fanout switch operations
"""

def fanout_switch_port_lookup(fanout_switches, dut_port):
"""
look up the fanout switch instance and the fanout switch port
connecting to the dut_port

Args:
fanout_switches (list FanoutHost): list of fanout switch
instances.
dut_port (str): port name on the DUT

Returns:
None, None if fanout switch instance and port is not found
FanoutHost, Portname(str) if found
"""
for _, fanout in fanout_switches.items():
if dut_port in fanout.host_to_fanout_port_map:
return fanout, fanout.host_to_fanout_port_map[dut_port]

return None, None
85 changes: 85 additions & 0 deletions tests/platform/test_link_flap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import logging

import pytest

from common.platform.device_utils import fanout_switch_port_lookup
from common.utilities import wait_until

class TestLinkFlap:
def __get_dut_if_status(self, dut, ifname=None):
if not ifname:
status = dut.show_interface(command='status')['ansible_facts']['int_status']
else:
status = dut.show_interface(command='status', interfaces=[ifname])['ansible_facts']['int_status']

return status


def __check_if_status(self, dut, dut_port, exp_state):
status = self.__get_dut_if_status(dut, dut_port)[dut_port]
return status['oper_state'] == exp_state


def __toggle_one_link(self, dut, dut_port, fanout, fanout_port):
logging.info("Testing link flap on {}".format(dut_port))

status = self.__get_dut_if_status(dut, dut_port)[dut_port]
logging.debug("Dut port status {}".format(status))
assert status['oper_state'] == 'up', "Skipping dut port {}: link operational down".format(status)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could reduced using the above helper function.

assert self.__check_if_status(dut, dut_port, 'up'), '...'

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True.

Down side is that moving the debug statement into the helper function would cause it come out every second instead of once after waiting is done.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found a good balance. Changed.


logging.info("Shutting down fanout switch {} port {} connecting to {}".format(fanout.hostname, fanout_port, dut_port))
self.ports_shutdown_by_test.add((fanout, fanout_port))
fanout.shutdown(fanout_port)
wait_until(30, 1, self.__check_if_status, dut, dut_port, 'down')
status = self.__get_dut_if_status(dut, dut_port)[dut_port]
logging.debug("Interface fact : {}".format(status))
assert status['oper_state'] == 'down', "dut port {} didn't go down as expected".format(dut_port)

logging.info("Bring up fanout switch {} port {} connecting to {}".format(fanout.hostname, fanout_port, dut_port))
fanout.no_shutdown(fanout_port)
wait_until(30, 1, self.__check_if_status, dut, dut_port, 'up')
status = self.__get_dut_if_status(dut, dut_port)[dut_port]
logging.debug("Interface fact : {}".format(status))
assert status['oper_state'] == 'up', "dut port {} didn't come up as expected".format(dut_port)
self.ports_shutdown_by_test.discard((fanout, fanout_port))


def __build_test_candidates(self, dut, fanouthosts):
status = self.__get_dut_if_status(dut)
candidates = []

for dut_port in status.keys():
fanout, fanout_port = fanout_switch_port_lookup(fanouthosts, dut_port)

if not fanout or not fanout_port:
logging.info("Skipping port {} that is not found in connection graph".format(dut_port))
elif status[dut_port]['admin_state'] == 'down':
logging.info("Skipping port {} that is admin down".format(dut_port))
else:
candidates.append((dut_port, fanout, fanout_port))

return candidates


def run_link_flap_test(self, dut, fanouthosts):
self.ports_shutdown_by_test = set()

candidates = self.__build_test_candidates(dut, fanouthosts)
if not candidates:
pytest.skip("Didn't find any port that is admin up and present in the connection graph")

try:
for dut_port, fanout, fanout_port in candidates:
self.__toggle_one_link(dut, dut_port, fanout, fanout_port)
finally:
logging.info("Restoring fanout switch ports that were shut down by test")
for fanout, fanout_port in self.ports_shutdown_by_test:
logging.debug("Restoring fanout switch {} port {} shut down by test".format(fanout.hostname, fanout_port))
fanout.no_shutdown(fanout_port)


@pytest.mark.topology('any')
@pytest.mark.platform('physical')
def test_link_flap(duthost, fanouthosts):
tlf = TestLinkFlap()
tlf.run_link_flap_test(duthost, fanouthosts)
2 changes: 2 additions & 0 deletions tests/pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ markers:
disable_loganalyzer: make to disable automatic loganalyzer
broadcom: test specific to Broadcom platform
sanity_check: override the default sanity check settings
topology: specify which topology testcase can be executed on: (t0, t1, ptf, etc)
platform: specify which platform testcase can be executed on: (physical, virtual, broadcom, mellanox, etc)