diff --git a/tests/common/helpers/platform_api/psu.py b/tests/common/helpers/platform_api/psu.py new file mode 100644 index 00000000000..e89fb3b33bc --- /dev/null +++ b/tests/common/helpers/platform_api/psu.py @@ -0,0 +1,72 @@ +""" +This module provides an interface to remotely interact with the power +supply units of the DUT via platform API +""" + +import json +import logging + +logger = logging.getLogger(__name__) + + +def psu_api(conn, psu_id, name, args=None): + if args is None: + args = [] + conn.request('POST', '/platform/chassis/psu/{}/{}'.format(psu_id, name), json.dumps({'args': args})) + resp = conn.getresponse() + res = json.loads(resp.read())['res'] + logger.info('Executing PSU API: "{}", arguments: "{}", result: "{}"'.format(name, args, res)) + return res + + +def get_num_fans(conn, psu_id): + return psu_api(conn, psu_id, 'get_num_fans') + + +def get_all_fans(conn, psu_id): + return psu_api(conn, psu_id, 'get_all_fans') + + +def get_fan(conn, psu_id, index): + return psu_api(conn, psu_id, 'get_fan', [index]) + + +def get_voltage(conn, psu_id): + return psu_api(conn, psu_id, 'get_voltage') + + +def get_current(conn, psu_id): + return psu_api(conn, psu_id, 'get_current') + + +def get_power(conn, psu_id): + return psu_api(conn, psu_id, 'get_power') + + +def get_powergood_status(conn, psu_id): + return psu_api(conn, psu_id, 'get_powergood_status') + + +def set_status_led(conn, psu_id, color): + return psu_api(conn, psu_id, 'set_status_led', [color]) + + +def get_status_led(conn, psu_id): + return psu_api(conn, psu_id, 'get_status_led') + + +def get_temperature(conn, psu_id): + return psu_api(conn, psu_id, 'get_temperature') + + +def get_temperature_high_threshold(conn, psu_id): + return psu_api(conn, psu_id, 'get_temperature_high_threshold') + + +def get_voltage_high_threshold(conn, psu_id): + return psu_api(conn, psu_id, 'get_voltage_high_threshold') + + +def get_voltage_low_threshold(conn, psu_id): + return psu_api(conn, psu_id, 'get_voltage_low_threshold') + diff --git a/tests/platform_tests/api/test_psu.py b/tests/platform_tests/api/test_psu.py new file mode 100644 index 00000000000..9a2246f4a6b --- /dev/null +++ b/tests/platform_tests/api/test_psu.py @@ -0,0 +1,124 @@ +import logging +import re + +import pytest +import yaml + +from common.helpers.assertions import pytest_assert +from common.helpers.platform_api import chassis, psu + +from platform_api_test_base import PlatformApiTestBase + + +logger = logging.getLogger(__name__) + +pytestmark = [ + pytest.mark.topology('any'), + pytest.mark.sanity_check(skip_sanity=True), + pytest.mark.disable_loganalyzer # disable automatic loganalyzer +] + +STATUS_LED_COLOR_GREEN = "green" +STATUS_LED_COLOR_AMBER = "amber" +STATUS_LED_COLOR_RED = "red" +STATUS_LED_COLOR_OFF = "off" + +class TestPsuApi(PlatformApiTestBase): + ''' Platform API test cases for the PSU class''' + + num_psus = None + + @pytest.fixture(scope="function", autouse=True) + def setup(self, platform_api_conn): + if self.num_psus is None: + try: + self.num_psus = int(chassis.get_num_psus(platform_api_conn)) + except: + pytest.fail("num_psus is not an integer") + + + def test_fans(self, duthost, localhost, platform_api_conn): + ''' PSU fan test ''' + for psu_id in range(self.num_psus): + try: + num_fans = int(psu.get_num_fans(platform_api_conn, psu_id)) + except: + pytest.fail("num_fans is not an integer!") + + fan_list = psu.get_all_fans(platform_api_conn, psu_id) + if self.expect(fan_list is not None, "Failed to retrieve fans of PSU {}".format(psu_id)): + self.expect(isinstance(fan_list, list) and len(fan_list) == num_fans, "Fans of PSU {} appear to be incorrect".format(psu_id)) + + for i in range(num_fans): + fan = psu.get_fan(platform_api_conn, psu_id, i) + if self.expect(fan is not None, "Failed to retrieve fan {} of PSU {}".format(i, psu_id)): + self.expect(fan and fan == fan_list[i], "Fan {} of PSU {} is incorrect".format(i, psu_id)) + self.assert_expectations() + + + def test_power(self, duthost, localhost, platform_api_conn): + ''' PSU power test ''' + for psu_id in range(self.num_psus): + voltage = psu.get_voltage(platform_api_conn, psu_id) + if self.expect(voltage is not None, "Failed to retrieve voltage of PSU {}".format(psu_id)): + self.expect(isinstance(voltage, float), "PSU {} voltage appears incorrect".format(psu_id)) + current = psu.get_current(platform_api_conn, psu_id) + if self.expect(current is not None, "Failed to retrieve current of PSU {}".format(psu_id)): + self.expect(isinstance(current, float), "PSU {} current appears incorrect".format(psu_id)) + power = psu.get_power(platform_api_conn, psu_id) + if self.expect(current is not None, "Failed to retrieve current of PSU {}".format(psu_id)): + self.expect(isinstance(power, float), "PSU {} power appears incorrect".format(psu_id)) + if current is not None and voltage is not None and power is not None: + self.expect(abs(power - (voltage*current)) < power*0.1, "PSU {} reading does not make sense \ + (power:{}, voltage:{}, current:{})".format(psu_id, power, voltage, current)) + + powergood_status = psu.get_powergood_status(platform_api_conn, psu_id) + if self.expect(powergood_status is not None, "Failed to retrieve operational status of PSU {}".format(psu_id)): + self.expect(powergood_status is True, "PSU {} is not operational".format(psu_id)) + + high_threshold = psu.get_voltage_high_threshold(platform_api_conn, psu_id) + if self.expect(high_threshold is not None, "Failed to retrieve the high voltage threshold of PSU {}".format(psu_id)): + self.expect(isinstance(high_threshold, float), "PSU {} voltage high threshold appears incorrect".format(psu_id)) + low_threshold = psu.get_voltage_low_threshold(platform_api_conn, psu_id) + if self.expect(low_threshold is not None, "Failed to retrieve the low voltage threshold of PSU {}".format(psu_id)): + self.expect(isinstance(low_threshold, float), "PSU {} voltage low threshold appears incorrect".format(psu_id)) + if high_threshold is not None and low_threshold is not None: + self.expect(voltage < high_threshold and voltage > low_threshold, "Voltage {} of PSU {} is not in between {} and {}".format(voltage, psu_id, low_threshold, high_threshold)) + self.assert_expectations() + + + def test_temperature(self, duthost, localhost, platform_api_conn): + ''' PSU temperature test ''' + for psu_id in range(self.num_psus): + temperature = psu.get_temperature(platform_api_conn, psu_id) + if self.expect(temperature is not None, "Failed to retrieve temperature of PSU {}".format(psu_id)): + self.expect(isinstance(temperature, float), "PSU {} temperature appears incorrect".format(psu_id)) + + temp_threshold = psu.get_temperature_high_threshold(platform_api_conn, psu_id) + if self.expect(temp_threshold is not None, "Failed to retrieve temperature threshold of PSU {}".format(psu_id)): + if self.expect(isinstance(temp_threshold, float), "PSU {} temperature high threshold appears incorrect".format(psu_id)): + self.expect(temperature < temp_threshold, "Temperature {} of PSU {} is over the threshold {}".format(temperature, psu_id, temp_threshold)) + self.assert_expectations() + + + def test_led(self, duthost, localhost, platform_api_conn): + ''' PSU status led test ''' + LED_COLOR_LIST = [ + STATUS_LED_COLOR_GREEN, + STATUS_LED_COLOR_AMBER, + STATUS_LED_COLOR_RED, + STATUS_LED_COLOR_OFF + ] + + for psu_id in range(self.num_psus): + for color in LED_COLOR_LIST: + result = psu.set_status_led(platform_api_conn, psu_id, color) + if self.expect(result is not None, "Failed to perform set_status_led of PSU {}".format(psu_id)): + self.expect(result is True, "Failed to set status_led of PSU {} to {}".format(psu_id, color)) + + color_actual = psu.get_status_led(platform_api_conn, psu_id) + if self.expect(color_actual is not None, "Failed to retrieve status_led of PSU {}".format(psu_id)): + if self.expect(isinstance(color_actual, str), "PSU {} status LED color appears incorrect".format(psu_id)): + self.expect(color == color_actual, "Status LED color incorrect (expected: {}, actual: {}) from PSU {}".format(color, color_actual, psu_id)) + self.assert_expectations() +