Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 160 additions & 25 deletions tests/platform/test_reboot.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,98 @@
from check_all_interface_info import check_interface_information
pytestmark = [pytest.mark.disable_loganalyzer]

# Names of the reboot flavours exercised by this module.
REBOOT_TYPE_WARM = "warm"
REBOOT_TYPE_COLD = "cold"
REBOOT_TYPE_FAST = "fast"
REBOOT_TYPE_POWEROFF = "power off"
REBOOT_TYPE_WATCHDOG = "watchdog"

# Per-reboot-type settings, keyed by the REBOOT_TYPE_* constants above.
#   "command": command line run on the DUT to trigger the reboot. The power off
#              entry has none, because that reboot is driven through a helper
#              function instead of a DUT command.
#   "timeout": timeout used while waiting for the DUT to come back.
#   "cause":   pattern looked up in the "show reboot-cause" output afterwards.
reboot_ctrl_dict = {
    REBOOT_TYPE_POWEROFF: {
        "timeout": 300,
        "cause": "Power Loss",
    },
    REBOOT_TYPE_COLD: {
        "command": "reboot",
        "timeout": 300,
        "cause": "reboot",
    },
    REBOOT_TYPE_FAST: {
        "command": "fast-reboot",
        "timeout": 180,
        "cause": "fast-reboot",
    },
    REBOOT_TYPE_WARM: {
        "command": "warm-reboot",
        "timeout": 180,
        "cause": "warm-reboot",
    },
    REBOOT_TYPE_WATCHDOG: {
        # Arms the platform watchdog and exits; the reboot happens when the
        # watchdog fires.
        "command": "python -c \"import sonic_platform.platform as P; P.Platform().get_chassis().get_watchdog().arm(5); exit()\"",
        "timeout": 300,
        "cause": "Watchdog",
    },
}

def check_reboot_cause(dut, reboot_cause_expected):
    """
    @summary: Verify that the DUT reports the expected reboot cause.

    Runs "show reboot-cause" on the DUT and asserts that the expected cause,
    used as a regular expression pattern, is found somewhere in the stdout.
    @param dut: The AnsibleHost object of DUT.
    @param reboot_cause_expected: The pattern expected in the reported reboot cause.
    """
    logging.info("Check the reboot cause")
    reboot_cause_got = dut.shell("show reboot-cause")["stdout"]
    logging.debug("show reboot-cause returns {}".format(reboot_cause_got))

    # re.search, not re.match: the cause may appear anywhere in the output.
    cause_found = re.search(reboot_cause_expected, reboot_cause_got) is not None
    assert cause_found, "got reboot-cause %s after rebooted by %s" % (reboot_cause_got, reboot_cause_expected)


def reboot_and_check(localhost, dut, interfaces, reboot_type="cold"):
def reboot_and_check(localhost, dut, interfaces, reboot_type=REBOOT_TYPE_COLD, reboot_helper=None, reboot_kwargs=None):
"""
Perform the specified type of reboot and check platform status.
@param dut: The AnsibleHost object of DUT.
@param interfaces: DUT's interfaces defined by minigraph
@param reboot_type: The reboot type, pre-defined const that has name convention of REBOOT_TYPE_XXX.
@param reboot_helper: The helper function used only by power off reboot
@param reboot_kwargs: The argument used by reboot_helper
"""
logging.info("Run %s reboot on DUT" % reboot_type)
if reboot_type == "cold":
reboot_cmd = "reboot"
reboot_timeout = 300
elif reboot_type == "fast":
reboot_cmd = "fast-reboot"
reboot_timeout = 180
elif reboot_type == "warm":
reboot_cmd = "warm-reboot"
reboot_timeout = 180

assert reboot_type in reboot_ctrl_dict.keys(), "Unknown reboot type %s" % reboot_type

reboot_timeout = reboot_ctrl_dict[reboot_type]["timeout"]
reboot_cause = reboot_ctrl_dict[reboot_type]["cause"]
if reboot_type == REBOOT_TYPE_POWEROFF:
assert reboot_helper is not None, "A reboot function must be provided for power off reboot"

reboot_helper(reboot_kwargs)

localhost.wait_for(host=dut.hostname, port=22, state="stopped", delay=10, timeout=120)
else:
assert False, "Reboot type %s is not supported" % reboot_type
process, queue = dut.command(reboot_cmd, module_async=True)

logging.info("Wait for DUT to go down")
res = localhost.wait_for(host=dut.hostname, port=22, state="stopped", delay=10, timeout=120,
module_ignore_errors=True)
if "failed" in res:
if process.is_alive():
logging.error("Command '%s' is not completed" % reboot_cmd)
process.terminate()
logging.error("Reboot result %s" % str(queue.get()))
assert False, "DUT did not go down"
reboot_cmd = reboot_ctrl_dict[reboot_type]["command"]

process, queue = dut.command(reboot_cmd, module_async=True)

logging.info("Wait for DUT to go down")
res = localhost.wait_for(host=dut.hostname, port=22, state="stopped", delay=10, timeout=120,
module_ignore_errors=True)
if "failed" in res:
if process.is_alive():
logging.error("Command '%s' is not completed" % reboot_cmd)
process.terminate()
logging.error("reboot result %s" % str(queue.get()))
assert False, "DUT did not go down"

logging.info("Wait for DUT to come back")
localhost.wait_for(host=dut.hostname, port=22, state="started", delay=10, timeout=reboot_timeout)

logging.info("Wait until all critical services are fully started")
check_critical_services(dut)

logging.info("Check reboot cause")
check_reboot_cause(dut, reboot_cause)

logging.info("Wait some time for all the transceivers to be detected")
assert wait_until(300, 20, check_interface_information, dut, interfaces), \
"Not all transceivers are detected or interfaces are up in 300 seconds"
Expand Down Expand Up @@ -91,7 +148,7 @@ def test_cold_reboot(testbed_devices, conn_graph_facts):
ans_host = testbed_devices["dut"]
localhost = testbed_devices["localhost"]

reboot_and_check(localhost, ans_host, conn_graph_facts["device_conn"], reboot_type="cold")
reboot_and_check(localhost, ans_host, conn_graph_facts["device_conn"], reboot_type=REBOOT_TYPE_COLD)


def test_fast_reboot(testbed_devices, conn_graph_facts):
Expand All @@ -101,7 +158,7 @@ def test_fast_reboot(testbed_devices, conn_graph_facts):
ans_host = testbed_devices["dut"]
localhost = testbed_devices["localhost"]

reboot_and_check(localhost, ans_host, conn_graph_facts["device_conn"], reboot_type="fast")
reboot_and_check(localhost, ans_host, conn_graph_facts["device_conn"], reboot_type=REBOOT_TYPE_FAST)


def test_warm_reboot(testbed_devices, conn_graph_facts):
Expand All @@ -117,4 +174,82 @@ def test_warm_reboot(testbed_devices, conn_graph_facts):
if "disabled" in issu_capability:
pytest.skip("ISSU is not supported on this DUT, skip this test case")

reboot_and_check(localhost, ans_host, conn_graph_facts["device_conn"], reboot_type="warm")
reboot_and_check(localhost, ans_host, conn_graph_facts["device_conn"], reboot_type=REBOOT_TYPE_WARM)


@pytest.fixture(params=[15, 5])
def power_off_delay(request):
    """
    @summary: Parametrized fixture; tests that request it run once per delay value (15 and 5).
    @param request: pytest request object carrying the current parameter
    @return: the power off delay selected for the current run
    """
    selected_delay = request.param
    return selected_delay


def _power_off_reboot_helper(kwargs):
    """
    @summary: Power cycle the DUT through its PSU controller: turn off every PSU,
              wait, then turn on the PSUs of the requested power-on sequence.
    @param kwargs: dict with the following keys
                   "psu_ctrl": controller object providing turn_off_psu/turn_on_psu
                   "all_psu": PSU entries (each having a "psu_id") to turn off
                   "power_on_seq": PSU entries to turn back on, in this order
                   "delay_time": value passed to time.sleep between off and on
    """
    controller = kwargs["psu_ctrl"]
    psus_to_cut = kwargs["all_psu"]
    psus_to_restore = kwargs["power_on_seq"]
    off_duration = kwargs["delay_time"]

    for entry in psus_to_cut:
        logging.debug("turning off {}".format(entry))
        controller.turn_off_psu(entry["psu_id"])

    # Keep the DUT without power for the requested time before restoring it.
    time.sleep(off_duration)

    logging.info("Power on {}".format(psus_to_restore))
    for entry in psus_to_restore:
        logging.debug("turning on {}".format(entry))
        controller.turn_on_psu(entry["psu_id"])


def test_power_off_reboot(testbed_devices, conn_graph_facts, psu_controller, power_off_delay):
    """
    @summary: This test case is to perform reboot via powercycle and check platform status.

    Every PSU is tried on its own as the power-on sequence, and finally all PSUs
    together; the DUT is power cycled and checked once per sequence.
    @param testbed_devices: Fixture providing the "dut" and "localhost" host objects
    @param conn_graph_facts: Fixture whose "device_conn" entry is handed to reboot_and_check
    @param psu_controller: The python object of psu controller
    @param power_off_delay: Pytest fixture. The delay between turning off and on the PSU
    """
    ans_host = testbed_devices["dut"]
    localhost = testbed_devices["localhost"]

    psu_ctrl = psu_controller(ans_host.hostname, ans_host.facts["asic_type"])
    if psu_ctrl is None:
        pytest.skip("No PSU controller for %s, skip rest of the testing in this case" % ans_host.hostname)

    all_psu = psu_ctrl.get_psu_status()
    if not all_psu:
        # Without any PSU entry there is no power-on sequence to exercise, so
        # skip explicitly rather than continue with nothing to iterate over.
        pytest.skip("No PSU status for %s, skip rest of the testing in this case" % ans_host.hostname)

    # One sequence per individual PSU, plus one powering all of them back on.
    power_on_seq_list = [[item] for item in all_psu]
    power_on_seq_list.append(all_psu)

    logging.info("Got all power on sequences {}".format(power_on_seq_list))

    for power_on_seq in power_on_seq_list:
        poweroff_reboot_kwargs = {
            "dut": ans_host,
            "psu_ctrl": psu_ctrl,
            "all_psu": all_psu,
            "power_on_seq": power_on_seq,
            "delay_time": power_off_delay,
        }
        reboot_and_check(
            localhost,
            ans_host,
            conn_graph_facts["device_conn"],
            REBOOT_TYPE_POWEROFF,
            _power_off_reboot_helper,
            poweroff_reboot_kwargs,
        )


def test_watchdog_reboot(testbed_devices, conn_graph_facts):
    """
    @summary: This test case is to perform reboot via watchdog and check platform status.

    The test is skipped when probing the platform API for a watchdog object
    writes anything to stderr on the DUT.
    @param testbed_devices: Fixture providing the "dut" and "localhost" host objects
    @param conn_graph_facts: Fixture whose "device_conn" entry is handed to reboot_and_check
    """
    ans_host = testbed_devices["dut"]
    localhost = testbed_devices["localhost"]

    test_watchdog_supported = "python -c \"import sonic_platform.platform as P; P.Platform().get_chassis().get_watchdog(); exit()\""

    # NOTE(review): module_ignore_errors=True is passed so that a failing probe
    # hands back its result (and stderr) for inspection instead of aborting the
    # test before the skip below - presumably the same semantics as the
    # wait_for call in reboot_and_check; confirm against the host wrapper.
    probe_result = ans_host.command(test_watchdog_supported, module_ignore_errors=True)
    watchdog_supported = probe_result["stderr"]
    if watchdog_supported != "":
        pytest.skip("Watchdog is not supported on this DUT, skip this test case")

    reboot_and_check(localhost, ans_host, conn_graph_facts["device_conn"], REBOOT_TYPE_WATCHDOG)