From 2c385fdda964e5da6ea198914e97218f502005c5 Mon Sep 17 00:00:00 2001 From: Kebo Liu Date: Sun, 16 Jan 2022 21:13:30 +0800 Subject: [PATCH 01/17] enhance show interface transceiver eeprom logic with RJ45 port support Signed-off-by: Kebo Liu --- scripts/sfpshow | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/scripts/sfpshow b/scripts/sfpshow index 7eb9edaca2..b465cf8c2c 100755 --- a/scripts/sfpshow +++ b/scripts/sfpshow @@ -214,6 +214,8 @@ QSFP_DD_DOM_VALUE_UNIT_MAP = { 'voltage': 'Volts' } +RJ45_PORT_TYPE = 'RJ45' + def display_invalid_intf_eeprom(intf_name): output = intf_name + ': SFP EEPROM Not detected\n' @@ -238,6 +240,11 @@ class SFPShow(object): self.output = '' self.multi_asic = multi_asic_util.MultiAsic(namespace_option=namespace_option) + # Determine whether it is a RJ45 port + def is_rj45_port(self, intf_name): + intf_type = self.db.get(self.db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(intf_name), 'type') + return intf_type == RJ45_PORT_TYPE + # Convert dict values to cli output string def format_dict_value_to_string(self, sorted_key_table, dom_info_dict, dom_value_map, @@ -390,16 +397,19 @@ class SFPShow(object): def convert_interface_sfp_info_to_cli_output_string(self, state_db, interface_name, dump_dom): output = '' - sfp_info_dict = state_db.get_all(state_db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(interface_name)) - output = interface_name + ': ' + 'SFP EEPROM detected' + '\n' - sfp_info_output = self.convert_sfp_info_to_output_string(sfp_info_dict) - output += sfp_info_output - - if dump_dom: - sfp_type = sfp_info_dict['type'] - dom_info_dict = state_db.get_all(state_db.STATE_DB, 'TRANSCEIVER_DOM_SENSOR|{}'.format(interface_name)) - dom_output = self.convert_dom_to_output_string(sfp_type, dom_info_dict) - output += dom_output + if self.is_rj45_port(interface_name): + output = interface_name + ': ' + 'SFP EEPROM is not applicable for RJ45 port' + '\n' + else: + sfp_info_dict = state_db.get_all(state_db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(interface_name)) + output = interface_name + ': ' + 'SFP EEPROM detected' + '\n' + sfp_info_output = self.convert_sfp_info_to_output_string(sfp_info_dict) + output += sfp_info_output + + if dump_dom: + sfp_type = sfp_info_dict['type'] + dom_info_dict = state_db.get_all(state_db.STATE_DB, 'TRANSCEIVER_DOM_SENSOR|{}'.format(interface_name)) + dom_output = self.convert_dom_to_output_string(sfp_type, dom_info_dict) + output += dom_output return output From e57ecd09ab5ea9c76a2f16925de2c4e706671062 Mon Sep 17 00:00:00 2001 From: Kebo Liu Date: Sun, 30 Jan 2022 09:08:09 +0800 Subject: [PATCH 02/17] enhance sfputil to support RJ45 port, exclude error status --- sfputil/main.py | 167 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 114 insertions(+), 53 deletions(-) diff --git a/sfputil/main.py b/sfputil/main.py index 5eb72195d7..36c59efef4 100644 --- a/sfputil/main.py +++ b/sfputil/main.py @@ -225,6 +225,7 @@ 'voltage': 'Volts' } +RJ45_PORT_TYPE = 'RJ45' # Global platform-specific Chassis class instance platform_chassis = None @@ -247,6 +248,26 @@ def is_sfp_present(port_name): return bool(presence) + +# Determine whether it is a RJ45 port +def is_rj45_port_from_db(port_name, db): + intf_type = db.get(db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(port_name), 'type') + return intf_type == RJ45_PORT_TYPE + + +def is_rj45_port_from_api(port_name): + physical_port = logical_port_to_physical_port_index(port_name) + sfp = platform_chassis.get_sfp(physical_port) + + try: + port_type = sfp.get_transceiver_info()['type'] + except NotImplementedError: + click.echo("Not able to judge the port type due to get_transceiver_info not implemented!", err=True) + sys.exit(ERROR_NOT_IMPLEMENTED) + + return port_type == RJ45_PORT_TYPE + + # ========================== Methods for formatting output ========================== # Convert dict values to cli output string @@ -570,6 +591,10 @@ def eeprom(port, dump_dom, namespace): for physical_port in physical_port_list: port_name = get_physical_port_name(logical_port_name, i, ganged) + if is_rj45_port_from_api(port_name): + output += "{}: SFP EEPROM is not applicable for RJ45 port\n".format(port_name) + continue + try: presence = platform_chassis.get_sfp(physical_port).get_presence() except NotImplementedError: @@ -746,15 +771,18 @@ def fetch_error_status_from_state_db(port, state_db): sorted_ports = natsort.natsorted(status) output = [] for port in sorted_ports: - statestring = status[port].get('status') - description = status[port].get('error') - if statestring == '1': - description = 'OK' - elif statestring == '0': - description = 'Unplugged' - elif description == 'N/A': - log.log_error("Inconsistent state found for port {}: state is {} but error description is N/A".format(port, statestring)) - description = 'Unknown state: {}'.format(statestring) + if is_rj45_port_from_db(port, state_db): + description = "Error status is not applicable for RJ45 port." + else: + statestring = status[port].get('status') + description = status[port].get('error') + if statestring == '1': + description = 'OK' + elif statestring == '0': + description = 'Unplugged' + elif description == 'N/A': + log.log_error("Inconsistent state found for port {}: state is {} but error description is N/A".format(port, statestring)) + description = 'Unknown state: {}'.format(statestring) output.append([port, description]) @@ -819,24 +847,27 @@ def lpmode(port): click.echo("Error: No physical ports found for logical port '{}'".format(logical_port_name)) return - if len(physical_port_list) > 1: - ganged = True + if is_rj45_port_from_api(logical_port_name): + output_table.append([port_name, "N/A"]) + else: + if len(physical_port_list) > 1: + ganged = True - for physical_port in physical_port_list: - port_name = get_physical_port_name(logical_port_name, i, ganged) + for physical_port in physical_port_list: + port_name = get_physical_port_name(logical_port_name, i, ganged) - try: - lpmode = platform_chassis.get_sfp(physical_port).get_lpmode() - except NotImplementedError: - click.echo("This functionality is currently not implemented for this platform") - sys.exit(ERROR_NOT_IMPLEMENTED) + try: + lpmode = platform_chassis.get_sfp(physical_port).get_lpmode() + except NotImplementedError: + click.echo("This functionality is currently not implemented for this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) - if lpmode: - output_table.append([port_name, "On"]) - else: - output_table.append([port_name, "Off"]) + if lpmode: + output_table.append([port_name, "On"]) + else: + output_table.append([port_name, "Off"]) - i += 1 + i += 1 click.echo(tabulate(output_table, table_header, tablefmt='simple')) @@ -859,6 +890,10 @@ def fwversion(port_name): physical_port = logical_port_to_physical_port_index(port_name) sfp = platform_chassis.get_sfp(physical_port) + if is_rj45_port_from_api(port_name): + click.echo("Show firmware version is not applicable for RJ45 port {}.".format(port_name)) + sys.exit(EXIT_FAIL) + try: presence = sfp.get_presence() except NotImplementedError: @@ -894,26 +929,29 @@ def set_lpmode(logical_port, enable): click.echo("Error: No physical ports found for logical port '{}'".format(logical_port)) return - if len(physical_port_list) > 1: - ganged = True + if is_rj45_port_from_api(logical_port): + click.echo("{} low-power mode is not applicable for RJ45 port {}.".format("Enabling" if enable else "Disabling", logical_port, nl=False)) + else: + if len(physical_port_list) > 1: + ganged = True - for physical_port in physical_port_list: - click.echo("{} low-power mode for port {} ... ".format( - "Enabling" if enable else "Disabling", - get_physical_port_name(logical_port, i, ganged)), nl=False) + for physical_port in physical_port_list: + click.echo("{} low-power mode for port {} ... ".format( + "Enabling" if enable else "Disabling", + get_physical_port_name(logical_port, i, ganged)), nl=False) - try: - result = platform_chassis.get_sfp(physical_port).set_lpmode(enable) - except NotImplementedError: - click.echo("This functionality is currently not implemented for this platform") - sys.exit(ERROR_NOT_IMPLEMENTED) + try: + result = platform_chassis.get_sfp(physical_port).set_lpmode(enable) + except NotImplementedError: + click.echo("This functionality is currently not implemented for this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) - if result: - click.echo("OK") - else: - click.echo("Failed") + if result: + click.echo("OK") + else: + click.echo("Failed") - i += 1 + i += 1 # 'off' subcommand @@ -950,24 +988,27 @@ def reset(port_name): click.echo("Error: No physical ports found for logical port '{}'".format(port_name)) return - if len(physical_port_list) > 1: - ganged = True + if is_rj45_port_from_api(port_name): + click.echo("Reset is not applicable for RJ45 port {}.".format(port_name)) + else: + if len(physical_port_list) > 1: + ganged = True - for physical_port in physical_port_list: - click.echo("Resetting port {} ... ".format(get_physical_port_name(port_name, i, ganged)), nl=False) + for physical_port in physical_port_list: + click.echo("Resetting port {} ... ".format(get_physical_port_name(port_name, i, ganged)), nl=False) - try: - result = platform_chassis.get_sfp(physical_port).reset() - except NotImplementedError: - click.echo("This functionality is currently not implemented for this platform") - sys.exit(ERROR_NOT_IMPLEMENTED) + try: + result = platform_chassis.get_sfp(physical_port).reset() + except NotImplementedError: + click.echo("This functionality is currently not implemented for this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) - if result: - click.echo("OK") - else: - click.echo("Failed") + if result: + click.echo("OK") + else: + click.echo("Failed") - i += 1 + i += 1 # 'firmware' subgroup @cli.group() @@ -1115,6 +1156,10 @@ def run(port_name, mode): click.echo("{}: SFP EEPROM not detected\n".format(port_name)) sys.exit(EXIT_FAIL) + if is_rj45_port_from_api(port_name): + click.echo("This functionality is not applicable for RJ45 port {}.".format(port_name)) + sys.exit(EXIT_FAIL) + status = run_firmware(port_name, int(mode)) if status != 1: click.echo('Failed to run firmware in mode={}! CDB status: {}'.format(mode, status)) @@ -1132,6 +1177,10 @@ def commit(port_name): click.echo("{}: SFP EEPROM not detected\n".format(port_name)) sys.exit(EXIT_FAIL) + if is_rj45_port_from_api(port_name): + click.echo("This functionality is not applicable for RJ45 port {}.".format(port_name)) + sys.exit(EXIT_FAIL) + status = commit_firmware(port_name) if status != 1: click.echo('Failed to commit firmware! CDB status: {}'.format(status)) @@ -1152,6 +1201,10 @@ def upgrade(port_name, filepath): click.echo("{}: SFP EEPROM not detected\n".format(port_name)) sys.exit(EXIT_FAIL) + if is_rj45_port_from_api(port_name): + click.echo("This functionality is not applicable for RJ45 port {}.".format(port_name)) + sys.exit(EXIT_FAIL) + show_firmware_version(physical_port) status = download_firmware(port_name, filepath) @@ -1186,6 +1239,10 @@ def download(port_name, filepath): click.echo("{}: SFP EEPROM not detected\n".format(port_name)) sys.exit(EXIT_FAIL) + if is_rj45_port_from_api(port_name): + click.echo("This functionality is not applicable for RJ45 port {}.".format(port_name)) + sys.exit(EXIT_FAIL) + start = time.time() status = download_firmware(port_name, filepath) if status == 1: @@ -1206,6 +1263,10 @@ def unlock(port_name, password): physical_port = logical_port_to_physical_port_index(port_name) sfp = platform_chassis.get_sfp(physical_port) + if is_rj45_port_from_api(port_name): + click.echo("This functionality is not applicable for RJ45 port {}.".format(port_name)) + sys.exit(EXIT_FAIL) + if not is_sfp_present(port_name): click.echo("{}: SFP EEPROM not detected\n".format(port_name)) sys.exit(EXIT_FAIL) From 5e5e5bcc59875571548e908031f34cf9ab1e3727 Mon Sep 17 00:00:00 2001 From: Kebo Liu Date: Sun, 30 Jan 2022 21:19:34 +0800 Subject: [PATCH 03/17] fix sfputil issue on RJ45 port Signed-off-by: Kebo Liu --- sfputil/main.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sfputil/main.py b/sfputil/main.py index 36c59efef4..ce7de38b21 100644 --- a/sfputil/main.py +++ b/sfputil/main.py @@ -593,6 +593,7 @@ def eeprom(port, dump_dom, namespace): if is_rj45_port_from_api(port_name): output += "{}: SFP EEPROM is not applicable for RJ45 port\n".format(port_name) + output += '\n' continue try: @@ -748,7 +749,10 @@ def fetch_error_status_from_platform_api(port): physical_port_list = logical_port_name_to_physical_port_list(logical_port_name) port_name = get_physical_port_name(logical_port_name, 1, False) - output.append([port_name, output_dict.get(physical_port_list[0])]) + if is_rj45_port_from_api(logical_port_name): + output.append([port_name, "N/A"]) + else: + output.append([port_name, output_dict.get(physical_port_list[0])]) return output @@ -772,7 +776,7 @@ def fetch_error_status_from_state_db(port, state_db): output = [] for port in sorted_ports: if is_rj45_port_from_db(port, state_db): - description = "Error status is not applicable for RJ45 port." + description = "N/A" else: statestring = status[port].get('status') description = status[port].get('error') @@ -848,7 +852,7 @@ def lpmode(port): return if is_rj45_port_from_api(logical_port_name): - output_table.append([port_name, "N/A"]) + output_table.append([logical_port_name, "N/A"]) else: if len(physical_port_list) > 1: ganged = True From 52852021f17c59e874ec11e4187184cf1467f152 Mon Sep 17 00:00:00 2001 From: Kebo Liu Date: Sun, 13 Feb 2022 09:29:25 +0800 Subject: [PATCH 04/17] [RJ45] change the way to judge port type and add more UT test case Signed-off-by: Kebo Liu --- scripts/sfpshow | 9 ++------- tests/sfp_test.py | 7 +++++++ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/scripts/sfpshow b/scripts/sfpshow index b465cf8c2c..d69deca300 100755 --- a/scripts/sfpshow +++ b/scripts/sfpshow @@ -240,11 +240,6 @@ class SFPShow(object): self.output = '' self.multi_asic = multi_asic_util.MultiAsic(namespace_option=namespace_option) - # Determine whether it is a RJ45 port - def is_rj45_port(self, intf_name): - intf_type = self.db.get(self.db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(intf_name), 'type') - return intf_type == RJ45_PORT_TYPE - # Convert dict values to cli output string def format_dict_value_to_string(self, sorted_key_table, dom_info_dict, dom_value_map, @@ -397,10 +392,10 @@ class SFPShow(object): def convert_interface_sfp_info_to_cli_output_string(self, state_db, interface_name, dump_dom): output = '' - if self.is_rj45_port(interface_name): + sfp_info_dict = state_db.get_all(state_db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(interface_name)) + if sfp_info_dict['type'] == RJ45_PORT_TYPE: output = interface_name + ': ' + 'SFP EEPROM is not applicable for RJ45 port' + '\n' else: - sfp_info_dict = state_db.get_all(state_db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(interface_name)) output = interface_name + ': ' + 'SFP EEPROM detected' + '\n' sfp_info_output = self.convert_sfp_info_to_output_string(sfp_info_dict) output += sfp_info_output diff --git a/tests/sfp_test.py b/tests/sfp_test.py index 3cbd9ecda8..62b6e5b545 100644 --- a/tests/sfp_test.py +++ b/tests/sfp_test.py @@ -377,6 +377,13 @@ def test_qsfp_dd_eeprom(self): assert result.exit_code == 0 assert "result.output == test_qsfp_dd_eeprom_output" + def test_rj45_eeprom(self): + runner = CliRunner() + result = runner.invoke(show.cli.commands["interfaces"].commands["transceiver"].commands["eeprom"], ["Ethernet36"]) + result_lines = result.output.strip('\n') + expected = "Ethernet36: SFP EEPROM is not applicable for RJ45 port" + assert result_lines == expected + @classmethod def teardown_class(cls): print("TEARDOWN") From a425de463c609f110c409a0b62df3a005b68505f Mon Sep 17 00:00:00 2001 From: Kebo Liu Date: Sun, 13 Feb 2022 10:22:53 +0800 Subject: [PATCH 05/17] [sfputil] simplity the logic for RJ45 support Signed-off-by: Kebo Liu --- sfputil/main.py | 90 +++++++++++++++++++++++-------------------------- 1 file changed, 43 insertions(+), 47 deletions(-) diff --git a/sfputil/main.py b/sfputil/main.py index ce7de38b21..bc004ed033 100644 --- a/sfputil/main.py +++ b/sfputil/main.py @@ -268,6 +268,10 @@ def is_rj45_port_from_api(port_name): return port_type == RJ45_PORT_TYPE +def skip_if_port_is_rj45(port_name): + if is_rj45_port_from_api(port_name): + click.echo("This functionality is not applicable for RJ45 port {}.".format(port_name)) + sys.exit(EXIT_FAIL) # ========================== Methods for formatting output ========================== # Convert dict values to cli output string @@ -935,27 +939,28 @@ def set_lpmode(logical_port, enable): if is_rj45_port_from_api(logical_port): click.echo("{} low-power mode is not applicable for RJ45 port {}.".format("Enabling" if enable else "Disabling", logical_port, nl=False)) - else: - if len(physical_port_list) > 1: - ganged = True + sys.exit(EXIT_FAIL) - for physical_port in physical_port_list: - click.echo("{} low-power mode for port {} ... ".format( - "Enabling" if enable else "Disabling", - get_physical_port_name(logical_port, i, ganged)), nl=False) + if len(physical_port_list) > 1: + ganged = True - try: - result = platform_chassis.get_sfp(physical_port).set_lpmode(enable) - except NotImplementedError: - click.echo("This functionality is currently not implemented for this platform") - sys.exit(ERROR_NOT_IMPLEMENTED) + for physical_port in physical_port_list: + click.echo("{} low-power mode for port {} ... ".format( + "Enabling" if enable else "Disabling", + get_physical_port_name(logical_port, i, ganged)), nl=False) - if result: - click.echo("OK") - else: - click.echo("Failed") + try: + result = platform_chassis.get_sfp(physical_port).set_lpmode(enable) + except NotImplementedError: + click.echo("This functionality is currently not implemented for this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) - i += 1 + if result: + click.echo("OK") + else: + click.echo("Failed") + + i += 1 # 'off' subcommand @@ -994,25 +999,26 @@ def reset(port_name): if is_rj45_port_from_api(port_name): click.echo("Reset is not applicable for RJ45 port {}.".format(port_name)) - else: - if len(physical_port_list) > 1: - ganged = True + sys.exit(EXIT_FAIL) - for physical_port in physical_port_list: - click.echo("Resetting port {} ... ".format(get_physical_port_name(port_name, i, ganged)), nl=False) + if len(physical_port_list) > 1: + ganged = True - try: - result = platform_chassis.get_sfp(physical_port).reset() - except NotImplementedError: - click.echo("This functionality is currently not implemented for this platform") - sys.exit(ERROR_NOT_IMPLEMENTED) + for physical_port in physical_port_list: + click.echo("Resetting port {} ... ".format(get_physical_port_name(port_name, i, ganged)), nl=False) - if result: - click.echo("OK") - else: - click.echo("Failed") + try: + result = platform_chassis.get_sfp(physical_port).reset() + except NotImplementedError: + click.echo("This functionality is currently not implemented for this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) - i += 1 + if result: + click.echo("OK") + else: + click.echo("Failed") + + i += 1 # 'firmware' subgroup @cli.group() @@ -1160,9 +1166,7 @@ def run(port_name, mode): click.echo("{}: SFP EEPROM not detected\n".format(port_name)) sys.exit(EXIT_FAIL) - if is_rj45_port_from_api(port_name): - click.echo("This functionality is not applicable for RJ45 port {}.".format(port_name)) - sys.exit(EXIT_FAIL) + skip_if_port_is_rj45(port_name) status = run_firmware(port_name, int(mode)) if status != 1: @@ -1181,9 +1185,7 @@ def commit(port_name): click.echo("{}: SFP EEPROM not detected\n".format(port_name)) sys.exit(EXIT_FAIL) - if is_rj45_port_from_api(port_name): - click.echo("This functionality is not applicable for RJ45 port {}.".format(port_name)) - sys.exit(EXIT_FAIL) + skip_if_port_is_rj45(port_name) status = commit_firmware(port_name) if status != 1: @@ -1205,9 +1207,7 @@ def upgrade(port_name, filepath): click.echo("{}: SFP EEPROM not detected\n".format(port_name)) sys.exit(EXIT_FAIL) - if is_rj45_port_from_api(port_name): - click.echo("This functionality is not applicable for RJ45 port {}.".format(port_name)) - sys.exit(EXIT_FAIL) + skip_if_port_is_rj45(port_name) show_firmware_version(physical_port) @@ -1243,9 +1243,7 @@ def download(port_name, filepath): click.echo("{}: SFP EEPROM not detected\n".format(port_name)) sys.exit(EXIT_FAIL) - if is_rj45_port_from_api(port_name): - click.echo("This functionality is not applicable for RJ45 port {}.".format(port_name)) - sys.exit(EXIT_FAIL) + skip_if_port_is_rj45(port_name) start = time.time() status = download_firmware(port_name, filepath) @@ -1267,9 +1265,7 @@ def unlock(port_name, password): physical_port = logical_port_to_physical_port_index(port_name) sfp = platform_chassis.get_sfp(physical_port) - if is_rj45_port_from_api(port_name): - click.echo("This functionality is not applicable for RJ45 port {}.".format(port_name)) - sys.exit(EXIT_FAIL) + skip_if_port_is_rj45(port_name) if not is_sfp_present(port_name): click.echo("{}: SFP EEPROM not detected\n".format(port_name)) From 6863fddbec24abb002ec30ae68e63ce8cf24131e Mon Sep 17 00:00:00 2001 From: Stephen Sun Date: Fri, 11 Feb 2022 11:28:41 +0000 Subject: [PATCH 06/17] Support sfputil show present Signed-off-by: Stephen Sun --- sfputil/main.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/sfputil/main.py b/sfputil/main.py index bc004ed033..17ec5ecf38 100644 --- a/sfputil/main.py +++ b/sfputil/main.py @@ -661,6 +661,13 @@ def presence(port): logical_port_list = [port] + state_db = SonicV2Connector(host='127.0.0.1') + if state_db is not None: + state_db.connect(state_db.STATE_DB) + else: + click.echo("Failed to connect to STATE_DB") + return + for logical_port_name in logical_port_list: ganged = False i = 1 @@ -676,13 +683,19 @@ def presence(port): for physical_port in physical_port_list: port_name = get_physical_port_name(logical_port_name, i, ganged) - try: - presence = platform_chassis.get_sfp(physical_port).get_presence() - except NotImplementedError: - click.echo("This functionality is currently not implemented for this platform") - sys.exit(ERROR_NOT_IMPLEMENTED) + if is_rj45_port_from_db(port_name, state_db): + status = state_db.get_all(state_db.STATE_DB, 'TRANSCEIVER_STATUS|{}'.format(port_name)) + status_string = status.get('error') + if status_string == 'N/A': + status_string = 'Present' + else: + try: + presence = platform_chassis.get_sfp(physical_port).get_presence() + except NotImplementedError: + click.echo("This functionality is currently not implemented for this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) - status_string = "Present" if presence else "Not present" + status_string = "Present" if presence else "Not present" output_table.append([port_name, status_string]) i += 1 From 2da251dfac28589bcabe0db8fe5500699afd8732 Mon Sep 17 00:00:00 2001 From: Stephen Sun Date: Mon, 14 Feb 2022 02:50:52 +0000 Subject: [PATCH 07/17] Support rj45 in sfpshow Signed-off-by: Stephen Sun --- scripts/sfpshow | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/scripts/sfpshow b/scripts/sfpshow index d69deca300..d5eb14b358 100755 --- a/scripts/sfpshow +++ b/scripts/sfpshow @@ -432,26 +432,34 @@ class SFPShow(object): self.output += '\n' + def convert_presence_info_to_cli_output_string(self, state_db, interface_name): + sfp_info_dict = state_db.get_all(state_db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(interface_name)) + if not sfp_info_dict: + status_string = 'Not present' + elif sfp_info_dict.get('type') == RJ45_PORT_TYPE: + status = state_db.get_all(state_db.STATE_DB, 'TRANSCEIVER_STATUS|{}'.format(interface_name)) + status_string = status.get('error') + if status_string == 'N/A': + status_string = 'Present' + else: + status_string = 'Present' + + return status_string + @multi_asic_util.run_on_multi_asic def get_presence(self): port_table = [] if self.intf_name is not None: - presence = self.db.exists(self.db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(self.intf_name)) - if presence: - port_table.append((self.intf_name, 'Present')) - else: - port_table.append((self.intf_name, 'Not present')) + presence_str = self.convert_presence_info_to_cli_output_string(self.db, self.intf_name) + port_table.append((self.intf_name, presence_str)) else: port_table_keys = self.db.keys(self.db.APPL_DB, "PORT_TABLE:*") for i in port_table_keys: key = re.split(':', i, maxsplit=1)[-1].strip() if key and key.startswith(front_panel_prefix()) and not key.startswith((backplane_prefix(), inband_prefix(), recirc_prefix())): - presence = self.db.exists(self.db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(key)) - if presence: - port_table.append((key, 'Present')) - else: - port_table.append((key, 'Not present')) + presence_str = self.convert_presence_info_to_cli_output_string(self.db, key) + port_table.append((key, presence_str)) self.table += port_table From b622d5bbad865177141a878e8884540cec80dc47 Mon Sep 17 00:00:00 2001 From: Stephen Sun Date: Wed, 16 Feb 2022 09:46:30 +0000 Subject: [PATCH 08/17] Add test case for sfputil with RJ45 supported Signed-off-by: Stephen Sun --- tests/sfputil_test.py | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/tests/sfputil_test.py b/tests/sfputil_test.py index c3b6b96a9e..b198bd7ace 100644 --- a/tests/sfputil_test.py +++ b/tests/sfputil_test.py @@ -183,7 +183,10 @@ def test_error_status_from_db(self): expected_output = [['Ethernet0', 'Blocking Error|High temperature'], ['Ethernet4', 'OK'], ['Ethernet8', 'Unplugged'], - ['Ethernet12', 'Unknown state: 255']] + ['Ethernet12', 'Unknown state: 255'], + ['Ethernet16', 'N/A'], + ['Ethernet28', 'N/A'], + ['Ethernet36', 'N/A']] output = sfputil.fetch_error_status_from_state_db(None, db.db) assert output == expected_output @@ -204,6 +207,41 @@ def test_show_firmware_version(self, mock_chassis): result = runner.invoke(sfputil.cli.commands['show'].commands['fwversion'], ["Ethernet0"]) assert result.exit_code == 0 + @patch('sfputil.main.platform_chassis') + @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) + @patch('sfputil.main.logical_port_name_to_physical_port_list', MagicMock(return_value=[1])) + @patch('sfputil.main.platform_sfputil', MagicMock(is_logical_port=MagicMock(return_value=1))) + def test_show_presence(self, mock_chassis): + mock_sfp = MagicMock() + mock_api = MagicMock() + mock_sfp.get_xcvr_api = MagicMock(return_value=mock_api) + mock_sfp.get_presence.return_value = True + import pdb;pdb.set_trace() + runner = CliRunner() + result = runner.invoke(sfputil.cli.commands['show'].commands['presence'], ["-p", "Ethernet16"]) + assert result.exit_code == 0 + expected_output = """Port Presence +---------- ----------- +Ethernet16 Not present +""" + assert result.output == expected_output + + result = runner.invoke(sfputil.cli.commands['show'].commands['presence'], ["-p", "Ethernet28"]) + assert result.exit_code == 0 + expected_output = """Port Presence +---------- ---------- +Ethernet28 Present +""" + assert result.output == expected_output + + result = runner.invoke(sfputil.cli.commands['show'].commands['presence'], ["-p", "Ethernet36"]) + assert result.exit_code == 0 + expected_output = """Port Presence +---------- ---------- +Ethernet36 Unknown +""" + assert result.output == expected_output + @patch('sfputil.main.platform_chassis') @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) def test_unlock_firmware(self, mock_chassis): From 586af55fbb40df11f19c5901f73d63905c4570e8 Mon Sep 17 00:00:00 2001 From: Stephen Sun Date: Wed, 16 Feb 2022 09:47:34 +0000 Subject: [PATCH 09/17] Add mock data for RJ45 ports into STATE_DB Signed-off-by: Stephen Sun --- tests/mock_tables/state_db.json | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/mock_tables/state_db.json b/tests/mock_tables/state_db.json index 6145868a9b..3a3a14629a 100644 --- a/tests/mock_tables/state_db.json +++ b/tests/mock_tables/state_db.json @@ -149,6 +149,18 @@ "status": "255", "error": "N/A" }, + "TRANSCEIVER_STATUS|Ethernet16": { + "status": "255", + "error": "Not present" + }, + "TRANSCEIVER_STATUS|Ethernet28": { + "status": "0", + "error": "N/A" + }, + "TRANSCEIVER_STATUS|Ethernet36": { + "status": "255", + "error": "Unknown" + }, "CHASSIS_INFO|chassis 1": { "psu_num": "2" }, From da447ce25099187125aab96d5f216f5ca6484c8c Mon Sep 17 00:00:00 2001 From: Stephen Sun Date: Wed, 16 Feb 2022 09:49:18 +0000 Subject: [PATCH 10/17] Add test for sfputil show for RJ45 ports Signed-off-by: Stephen Sun --- tests/sfp_test.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tests/sfp_test.py b/tests/sfp_test.py index 62b6e5b545..fdbfa1a267 100644 --- a/tests/sfp_test.py +++ b/tests/sfp_test.py @@ -344,6 +344,30 @@ def test_sfp_presence(self): expected = """Port Presence ----------- ----------- Ethernet200 Not present +""" + assert result.exit_code == 0 + assert result.output == expected + + result = runner.invoke(show.cli.commands["interfaces"].commands["transceiver"].commands["presence"], ["Ethernet16"]) + expected = """Port Presence +---------- ----------- +Ethernet16 Not present +""" + assert result.exit_code == 0 + assert result.output == expected + + result = runner.invoke(show.cli.commands["interfaces"].commands["transceiver"].commands["presence"], ["Ethernet28"]) + expected = """Port Presence +---------- ---------- +Ethernet28 Present +""" + assert result.exit_code == 0 + assert result.output == expected + + result = runner.invoke(show.cli.commands["interfaces"].commands["transceiver"].commands["presence"], ["Ethernet36"]) + expected = """Port Presence +---------- ---------- +Ethernet36 Unknown """ assert result.exit_code == 0 assert result.output == expected From cc540eda357bc20e69bd1f4716ff0882fff31bf9 Mon Sep 17 00:00:00 2001 From: Kebo Liu Date: Sun, 6 Mar 2022 11:51:26 +0800 Subject: [PATCH 11/17] remove debug code in sfputil test case Signed-off-by: Kebo Liu --- tests/sfputil_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/sfputil_test.py b/tests/sfputil_test.py index b198bd7ace..4a7874a5e9 100644 --- a/tests/sfputil_test.py +++ b/tests/sfputil_test.py @@ -216,7 +216,6 @@ def test_show_presence(self, mock_chassis): mock_api = MagicMock() mock_sfp.get_xcvr_api = MagicMock(return_value=mock_api) mock_sfp.get_presence.return_value = True - import pdb;pdb.set_trace() runner = CliRunner() result = runner.invoke(sfputil.cli.commands['show'].commands['presence'], ["-p", "Ethernet16"]) assert result.exit_code == 0 From d93bf37be6b78f4b7218ee1cb7f130e24d8e3bab Mon Sep 17 00:00:00 2001 From: Kebo Liu Date: Fri, 1 Apr 2022 20:00:13 +0800 Subject: [PATCH 12/17] remove unnecessary argument for format() Signed-off-by: Kebo Liu --- sfputil/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sfputil/main.py b/sfputil/main.py index 17ec5ecf38..6f0a2c7163 100644 --- a/sfputil/main.py +++ b/sfputil/main.py @@ -951,7 +951,7 @@ def set_lpmode(logical_port, enable): return if is_rj45_port_from_api(logical_port): - click.echo("{} low-power mode is not applicable for RJ45 port {}.".format("Enabling" if enable else "Disabling", logical_port, nl=False)) + click.echo("{} low-power mode is not applicable for RJ45 port {}.".format("Enabling" if enable else "Disabling", logical_port)) sys.exit(EXIT_FAIL) if len(physical_port_list) > 1: From 79c32185b94fef9d887b2d9d6c02c9def658bd69 Mon Sep 17 00:00:00 2001 From: Stephen Sun <5379172+stephenxs@users.noreply.github.com> Date: Fri, 27 May 2022 11:15:34 +0800 Subject: [PATCH 13/17] Revert the logic to fetch presence status from error status for RJ45 port (#17) * Revert the logic to fetch presence status from error status Signed-off-by: Stephen Sun * Unit test Signed-off-by: Stephen Sun * Fix error Signed-off-by: Stephen Sun --- scripts/sfpshow | 28 ++++++++++------------------ sfputil/main.py | 25 ++++++------------------- tests/mock_tables/state_db.json | 4 ++-- tests/sfp_test.py | 6 +++--- tests/sfputil_test.py | 6 +++--- 5 files changed, 24 insertions(+), 45 deletions(-) diff --git a/scripts/sfpshow b/scripts/sfpshow index d5eb14b358..d69deca300 100755 --- a/scripts/sfpshow +++ b/scripts/sfpshow @@ -432,34 +432,26 @@ class SFPShow(object): self.output += '\n' - def convert_presence_info_to_cli_output_string(self, state_db, interface_name): - sfp_info_dict = state_db.get_all(state_db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(interface_name)) - if not sfp_info_dict: - status_string = 'Not present' - elif sfp_info_dict.get('type') == RJ45_PORT_TYPE: - status = state_db.get_all(state_db.STATE_DB, 'TRANSCEIVER_STATUS|{}'.format(interface_name)) - status_string = status.get('error') - if status_string == 'N/A': - status_string = 'Present' - else: - status_string = 'Present' - - return status_string - @multi_asic_util.run_on_multi_asic def get_presence(self): port_table = [] if self.intf_name is not None: - presence_str = self.convert_presence_info_to_cli_output_string(self.db, self.intf_name) - port_table.append((self.intf_name, presence_str)) + presence = self.db.exists(self.db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(self.intf_name)) + if presence: + port_table.append((self.intf_name, 'Present')) + else: + port_table.append((self.intf_name, 'Not present')) else: port_table_keys = self.db.keys(self.db.APPL_DB, "PORT_TABLE:*") for i in port_table_keys: key = re.split(':', i, maxsplit=1)[-1].strip() if key and key.startswith(front_panel_prefix()) and not key.startswith((backplane_prefix(), inband_prefix(), recirc_prefix())): - presence_str = self.convert_presence_info_to_cli_output_string(self.db, key) - port_table.append((key, presence_str)) + presence = self.db.exists(self.db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(key)) + if presence: + port_table.append((key, 'Present')) + else: + port_table.append((key, 'Not present')) self.table += port_table diff --git a/sfputil/main.py b/sfputil/main.py index 6f0a2c7163..049566a6e0 100644 --- a/sfputil/main.py +++ b/sfputil/main.py @@ -661,13 +661,6 @@ def presence(port): logical_port_list = [port] - state_db = SonicV2Connector(host='127.0.0.1') - if state_db is not None: - state_db.connect(state_db.STATE_DB) - else: - click.echo("Failed to connect to STATE_DB") - return - for logical_port_name in logical_port_list: ganged = False i = 1 @@ -683,19 +676,13 @@ def presence(port): for physical_port in physical_port_list: port_name = get_physical_port_name(logical_port_name, i, ganged) - if is_rj45_port_from_db(port_name, state_db): - status = state_db.get_all(state_db.STATE_DB, 'TRANSCEIVER_STATUS|{}'.format(port_name)) - status_string = status.get('error') - if status_string == 'N/A': - status_string = 'Present' - else: - try: - presence = platform_chassis.get_sfp(physical_port).get_presence() - except NotImplementedError: - click.echo("This functionality is currently not implemented for this platform") - sys.exit(ERROR_NOT_IMPLEMENTED) + try: + presence = platform_chassis.get_sfp(physical_port).get_presence() + except NotImplementedError: + click.echo("This functionality is currently not implemented for this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) - status_string = "Present" if presence else "Not present" + status_string = "Present" if presence else "Not present" output_table.append([port_name, status_string]) i += 1 diff --git a/tests/mock_tables/state_db.json b/tests/mock_tables/state_db.json index 3a3a14629a..4b46c73bd1 100644 --- a/tests/mock_tables/state_db.json +++ b/tests/mock_tables/state_db.json @@ -150,8 +150,8 @@ "error": "N/A" }, "TRANSCEIVER_STATUS|Ethernet16": { - "status": "255", - "error": "Not present" + "status": "0", + "error": "N/A" }, "TRANSCEIVER_STATUS|Ethernet28": { "status": "0", diff --git a/tests/sfp_test.py b/tests/sfp_test.py index fdbfa1a267..a69872ab76 100644 --- a/tests/sfp_test.py +++ b/tests/sfp_test.py @@ -350,8 +350,8 @@ def test_sfp_presence(self): result = runner.invoke(show.cli.commands["interfaces"].commands["transceiver"].commands["presence"], ["Ethernet16"]) expected = """Port Presence ----------- ----------- -Ethernet16 Not present +---------- ---------- +Ethernet16 Present """ assert result.exit_code == 0 assert result.output == expected @@ -367,7 +367,7 @@ def test_sfp_presence(self): result = runner.invoke(show.cli.commands["interfaces"].commands["transceiver"].commands["presence"], ["Ethernet36"]) expected = """Port Presence ---------- ---------- -Ethernet36 Unknown +Ethernet36 Present """ assert result.exit_code == 0 assert result.output == expected diff --git a/tests/sfputil_test.py b/tests/sfputil_test.py index 4a7874a5e9..b15698ecec 100644 --- a/tests/sfputil_test.py +++ b/tests/sfputil_test.py @@ -220,8 +220,8 @@ def test_show_presence(self, mock_chassis): result = runner.invoke(sfputil.cli.commands['show'].commands['presence'], ["-p", "Ethernet16"]) assert result.exit_code == 0 expected_output = """Port Presence ----------- ----------- -Ethernet16 Not present +---------- ---------- +Ethernet16 Present """ assert result.output == expected_output @@ -237,7 +237,7 @@ def test_show_presence(self, mock_chassis): assert result.exit_code == 0 expected_output = """Port Presence ---------- ---------- -Ethernet36 Unknown +Ethernet36 Present """ assert result.output == expected_output From 8e11f595194ed3a5264979f8efe7141baae01c39 Mon Sep 17 00:00:00 2001 From: Stephen Sun Date: Fri, 27 May 2022 10:03:06 +0000 Subject: [PATCH 14/17] Add test cases to cover lpmode and error status Signed-off-by: Stephen Sun --- tests/sfputil_test.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/tests/sfputil_test.py b/tests/sfputil_test.py index e7b06645a7..9a3b42e8e3 100644 --- a/tests/sfputil_test.py +++ b/tests/sfputil_test.py @@ -283,6 +283,11 @@ def test_error_status_from_db(self): output = sfputil.fetch_error_status_from_state_db('Ethernet0', db.db) assert output == expected_output_ethernet0 + expected_output_ethernet16 = expected_output[4:5] + output = sfputil.fetch_error_status_from_state_db('Ethernet16', db.db) + assert output == expected_output_ethernet16 + + @patch('sfputil.main.platform_chassis') @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) def test_show_firmware_version(self, mock_chassis): @@ -327,6 +332,43 @@ def test_show_presence(self, mock_chassis): expected_output = """Port Presence ---------- ---------- Ethernet36 Present +""" + assert result.output == expected_output + + @patch('sfputil.main.platform_chassis') + @patch('sfputil.main.logical_port_name_to_physical_port_list', MagicMock(return_value=[1])) + @patch('sfputil.main.platform_sfputil', MagicMock(is_logical_port=MagicMock(return_value=1))) + def test_show_lpmode(self, mock_chassis): + mock_sfp = MagicMock() + mock_api = MagicMock() + mock_sfp.get_xcvr_api = MagicMock(return_value=mock_api) + mock_sfp.get_lpmode.return_value = True + mock_chassis.get_sfp = MagicMock(return_value=mock_sfp) + runner = CliRunner() + result = runner.invoke(sfputil.cli.commands['show'].commands['lpmode'], ["-p", "Ethernet0"]) + assert result.exit_code == 0 + expected_output = """Port Low-power Mode +--------- ---------------- +Ethernet0 On +""" + assert result.output == expected_output + + mock_sfp.get_lpmode.return_value = False + result = runner.invoke(sfputil.cli.commands['show'].commands['lpmode'], ["-p", "Ethernet0"]) + assert result.exit_code == 0 + expected_output = """Port Low-power Mode +--------- ---------------- +Ethernet0 Off +""" + assert result.output == expected_output + + mock_sfp.get_lpmode.return_value = False + mock_sfp.get_transceiver_info = MagicMock(return_value={'type': sfputil.RJ45_PORT_TYPE}) + result = runner.invoke(sfputil.cli.commands['show'].commands['lpmode'], ["-p", "Ethernet0"]) + assert result.exit_code == 0 + expected_output = """Port Low-power Mode +--------- ---------------- +Ethernet0 N/A """ assert result.output == expected_output From 09a891a12ad4550854b08bbb1308ba79f01b70d3 Mon Sep 17 00:00:00 2001 From: Kebo Liu Date: Wed, 22 Jun 2022 13:38:32 +0800 Subject: [PATCH 15/17] add comments to describe the usage of functions to judge the port type Signed-off-by: Kebo Liu --- sfputil/main.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sfputil/main.py b/sfputil/main.py index c337bf1cdf..d567f39a0d 100644 --- a/sfputil/main.py +++ b/sfputil/main.py @@ -291,7 +291,11 @@ def is_sfp_present(port_name): return bool(presence) -# Determine whether it is a RJ45 port +# Below defined two flavors of functions to determin whether a port is a RJ45 port. +# They serve different types of SFP utilities. One type of SFP utility consume the +# info stored in the STATE_DB, these utilities shall call 'is_rj45_port_from_db' +# to judge the port type. Another type of utilities will call the platform API +# directly to access SFP, for them shall use 'is_rj45_port_from_api'. def is_rj45_port_from_db(port_name, db): intf_type = db.get(db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(port_name), 'type') return intf_type == RJ45_PORT_TYPE From 26bdd9c322c064a9b94f50ec109e161972a8871e Mon Sep 17 00:00:00 2001 From: Kebo Liu Date: Wed, 22 Jun 2022 20:54:12 +0800 Subject: [PATCH 16/17] add more testcase for sfputil Signed-off-by: Kebo Liu --- tests/sfputil_test.py | 123 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) diff --git a/tests/sfputil_test.py b/tests/sfputil_test.py index 9a3b42e8e3..57fbd5a878 100644 --- a/tests/sfputil_test.py +++ b/tests/sfputil_test.py @@ -16,6 +16,7 @@ sys.modules['sonic_platform'] = mock.MagicMock() import sfputil.main as sfputil +EXIT_FAIL = -1 class TestSfputil(object): def test_format_dict_value_to_string(self): @@ -287,6 +288,42 @@ def test_error_status_from_db(self): output = sfputil.fetch_error_status_from_state_db('Ethernet16', db.db) assert output == expected_output_ethernet16 + @patch('sfputil.main.is_rj45_port_from_db', MagicMock(return_value=True)) + def test_error_status_from_db_RJ45(self): + db = Db() + expected_output = [['Ethernet0', 'N/A'], + ['Ethernet4', 'N/A'], + ['Ethernet8', 'N/A'], + ['Ethernet12', 'N/A'], + ['Ethernet16', 'N/A'], + ['Ethernet28', 'N/A'], + ['Ethernet36', 'N/A']] + output = sfputil.fetch_error_status_from_state_db(None, db.db) + assert output == expected_output + + expected_output_ethernet0 = expected_output[:1] + output = sfputil.fetch_error_status_from_state_db('Ethernet0', db.db) + assert output == expected_output_ethernet0 + + expected_output_ethernet16 = expected_output[4:5] + output = sfputil.fetch_error_status_from_state_db('Ethernet16', db.db) + assert output == expected_output_ethernet16 + + @patch('sfputil.main.logical_port_name_to_physical_port_list', MagicMock(return_value=[1])) + @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=False)) + @patch('subprocess.check_output', MagicMock(return_value="['0:OK']")) + def test_fetch_error_status_from_platform_api(self): + output = sfputil.fetch_error_status_from_platform_api('Ethernet0') + assert output == [['Ethernet0', None]] + + @patch('sfputil.main.logical_port_name_to_physical_port_list', MagicMock(return_value=[1])) + @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) + @patch('subprocess.check_output', MagicMock(return_value="['0:OK']")) + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) + def test_fetch_error_status_from_platform_api_RJ45(self): + output = sfputil.fetch_error_status_from_platform_api('Ethernet0') + assert output == [['Ethernet0', 'N/A']] @patch('sfputil.main.platform_chassis') @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) @@ -372,6 +409,45 @@ def test_show_lpmode(self, mock_chassis): """ assert result.output == expected_output + @patch('sfputil.main.platform_chassis') + @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) + @patch('sfputil.main.logical_port_name_to_physical_port_list', MagicMock(return_value=[1])) + @patch('sfputil.main.platform_sfputil', MagicMock(is_logical_port=MagicMock(return_value=1))) + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) + def test_show_eeprom_RJ45(self, mock_chassis): + mock_sfp = MagicMock() + mock_api = MagicMock() + mock_sfp.get_xcvr_api = MagicMock(return_value=mock_api) + runner = CliRunner() + result = runner.invoke(sfputil.cli.commands['show'].commands['eeprom'], ["-p", "Ethernet16", "-d"]) + assert result.exit_code == 0 + expected_output = "Ethernet16: SFP EEPROM is not applicable for RJ45 port\n\n\n" + assert result.output == expected_output + + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) + @patch('sys.exit', MagicMock(return_value=EXIT_FAIL)) + def test_skip_if_port_is_rj45(self): + result = sfputil.skip_if_port_is_rj45('Ethernet0') + assert result == None + + @patch('sfputil.main.logical_port_name_to_physical_port_list', MagicMock(return_value=1)) + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) + @patch('sfputil.main.platform_sfputil', MagicMock(is_logical_port=MagicMock(return_value=1))) + def test_lpmode_set(self): + runner = CliRunner() + result = runner.invoke(sfputil.cli.commands['lpmode'].commands['on'], ["Ethernet0"]) + assert result.output == 'Enabling low-power mode is not applicable for RJ45 port Ethernet0.\n' + assert result.exit_code == EXIT_FAIL + + @patch('sfputil.main.logical_port_name_to_physical_port_list', MagicMock(return_value=1)) + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) + @patch('sfputil.main.platform_sfputil', MagicMock(is_logical_port=MagicMock(return_value=1))) + def test_reset_RJ45(self): + runner = CliRunner() + result = runner.invoke(sfputil.cli.commands['reset'], ["Ethernet0"]) + assert result.output == 'Reset is not applicable for RJ45 port Ethernet0.\n' + assert result.exit_code == EXIT_FAIL + @patch('sfputil.main.platform_chassis') @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) def test_unlock_firmware(self, mock_chassis): @@ -385,6 +461,19 @@ def test_unlock_firmware(self, mock_chassis): result = runner.invoke(sfputil.cli.commands['firmware'].commands['unlock'], ["Ethernet0"]) assert result.exit_code == 0 + @patch('sfputil.main.platform_chassis') + @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) + def test_show_fwversion_Rj45(self, mock_chassis): + mock_sfp = MagicMock() + mock_api = MagicMock() + mock_sfp.get_xcvr_api = MagicMock(return_value=mock_api) + mock_sfp.get_presence.return_value = True + mock_chassis.get_sfp = MagicMock(return_value=mock_sfp) + runner = CliRunner() + result = runner.invoke(sfputil.cli.commands['show'].commands['fwversion'], ["Ethernet0"]) + assert result.output == 'Show firmware version is not applicable for RJ45 port Ethernet0.\n' + assert result.exit_code == EXIT_FAIL @patch('sfputil.main.platform_chassis') @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) @@ -409,3 +498,37 @@ def test_commit_firmwre(self, mock_chassis): mock_api.cdb_commit_firmware.return_value = 1 status = sfputil.commit_firmware("Ethernet0") assert status == 1 + + @patch('sfputil.main.is_sfp_present', MagicMock(return_value=True)) + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) + def test_firmware_run_RJ45(self): + runner = CliRunner() + result = runner.invoke(sfputil.cli.commands['firmware'].commands['run'], ["--mode", "0", "Ethernet0"]) + assert result.output == 'This functionality is not applicable for RJ45 port Ethernet0.\n' + assert result.exit_code == EXIT_FAIL + + @patch('sfputil.main.is_sfp_present', MagicMock(return_value=True)) + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) + def test_firmware_commit_RJ45(self): + runner = CliRunner() + result = runner.invoke(sfputil.cli.commands['firmware'].commands['commit'], ["Ethernet0"]) + assert result.output == 'This functionality is not applicable for RJ45 port Ethernet0.\n' + assert result.exit_code == EXIT_FAIL + + @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) + @patch('sfputil.main.is_sfp_present', MagicMock(return_value=1)) + def test_firmware_upgrade_RJ45(self): + runner = CliRunner() + result = runner.invoke(sfputil.cli.commands['firmware'].commands['upgrade'], ["Ethernet0", "a.b"]) + assert result.output == 'This functionality is not applicable for RJ45 port Ethernet0.\n' + assert result.exit_code == EXIT_FAIL + + @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) + @patch('sfputil.main.is_sfp_present', MagicMock(return_value=1)) + def test_firmware_upgrade_RJ45(self): + runner = CliRunner() + result = runner.invoke(sfputil.cli.commands['firmware'].commands['download'], ["Ethernet0", "a.b"]) + assert result.output == 'This functionality is not applicable for RJ45 port Ethernet0.\n' + assert result.exit_code == EXIT_FAIL From 942cb38ac72ccc66906a06a216739fd735ce9b94 Mon Sep 17 00:00:00 2001 From: Kebo Liu Date: Wed, 22 Jun 2022 21:23:53 +0800 Subject: [PATCH 17/17] fix typo in testcase name Signed-off-by: Kebo Liu --- tests/sfputil_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/sfputil_test.py b/tests/sfputil_test.py index 57fbd5a878..a4d568d20e 100644 --- a/tests/sfputil_test.py +++ b/tests/sfputil_test.py @@ -527,7 +527,7 @@ def test_firmware_upgrade_RJ45(self): @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) @patch('sfputil.main.is_sfp_present', MagicMock(return_value=1)) - def test_firmware_upgrade_RJ45(self): + def test_firmware_download_RJ45(self): runner = CliRunner() result = runner.invoke(sfputil.cli.commands['firmware'].commands['download'], ["Ethernet0", "a.b"]) assert result.output == 'This functionality is not applicable for RJ45 port Ethernet0.\n'