diff --git a/sfputil/main.py b/sfputil/main.py index 931e86b148..9e769ee9a1 100644 --- a/sfputil/main.py +++ b/sfputil/main.py @@ -5,118 +5,385 @@ # Command-line utility for interacting with SFP transceivers within SONiC # -try: - import imp - import os - import sys +import os +import sys - import click - from sonic_py_common import device_info, logger, multi_asic - from tabulate import tabulate -except ImportError as e: - raise ImportError("%s - required module not found" % str(e)) +import click +import sonic_platform +import sonic_platform_base.sonic_sfp.sfputilhelper +from natsort import natsorted +from sonic_py_common import device_info, logger, multi_asic +from tabulate import tabulate -VERSION = '2.0' +VERSION = '3.0' SYSLOG_IDENTIFIER = "sfputil" -PLATFORM_SPECIFIC_MODULE_NAME = "sfputil" -PLATFORM_SPECIFIC_CLASS_NAME = "SfpUtil" - -# Global platform-specific sfputil class instance -platform_sfputil = None PLATFORM_JSON = 'platform.json' PORT_CONFIG_INI = 'port_config.ini' +ERROR_PERMISSIONS = 1 +ERROR_CHASSIS_LOAD = 2 +ERROR_SFPUTILHELPER_LOAD = 3 +ERROR_PORT_CONFIG_LOAD = 4 +ERROR_NOT_IMPLEMENTED = 5 +ERROR_INVALID_PORT = 6 + +# TODO: We should share these maps and the formatting functions between sfputil and sfpshow +QSFP_DATA_MAP = { + 'model': 'Vendor PN', + 'vendor_oui': 'Vendor OUI', + 'vendor_date': 'Vendor Date Code(YYYY-MM-DD Lot)', + 'manufacturer': 'Vendor Name', + 'hardware_rev': 'Vendor Rev', + 'serial': 'Vendor SN', + 'type': 'Identifier', + 'ext_identifier': 'Extended Identifier', + 'ext_rateselect_compliance': 'Extended RateSelect Compliance', + 'cable_length': 'cable_length', + 'cable_type': 'Length', + 'nominal_bit_rate': 'Nominal Bit Rate(100Mbs)', + 'specification_compliance': 'Specification compliance', + 'encoding': 'Encoding', + 'connector': 'Connector', + 'application_advertisement': 'Application Advertisement' +} + +SFP_DOM_CHANNEL_MONITOR_MAP = { + 'rx1power': 'RXPower', + 'tx1bias': 'TXBias', + 'tx1power': 'TXPower' +} + +SFP_DOM_CHANNEL_THRESHOLD_MAP = { + 'txpowerhighalarm': 'TxPowerHighAlarm', + 'txpowerlowalarm': 'TxPowerLowAlarm', + 'txpowerhighwarning': 'TxPowerHighWarning', + 'txpowerlowwarning': 'TxPowerLowWarning', + 'rxpowerhighalarm': 'RxPowerHighAlarm', + 'rxpowerlowalarm': 'RxPowerLowAlarm', + 'rxpowerhighwarning': 'RxPowerHighWarning', + 'rxpowerlowwarning': 'RxPowerLowWarning', + 'txbiashighalarm': 'TxBiasHighAlarm', + 'txbiaslowalarm': 'TxBiasLowAlarm', + 'txbiashighwarning': 'TxBiasHighWarning', + 'txbiaslowwarning': 'TxBiasLowWarning' +} + +QSFP_DOM_CHANNEL_THRESHOLD_MAP = { + 'rxpowerhighalarm': 'RxPowerHighAlarm', + 'rxpowerlowalarm': 'RxPowerLowAlarm', + 'rxpowerhighwarning': 'RxPowerHighWarning', + 'rxpowerlowwarning': 'RxPowerLowWarning', + 'txbiashighalarm': 'TxBiasHighAlarm', + 'txbiaslowalarm': 'TxBiasLowAlarm', + 'txbiashighwarning': 'TxBiasHighWarning', + 'txbiaslowwarning': 'TxBiasLowWarning' +} + +DOM_MODULE_THRESHOLD_MAP = { + 'temphighalarm': 'TempHighAlarm', + 'templowalarm': 'TempLowAlarm', + 'temphighwarning': 'TempHighWarning', + 'templowwarning': 'TempLowWarning', + 'vcchighalarm': 'VccHighAlarm', + 'vcclowalarm': 'VccLowAlarm', + 'vcchighwarning': 'VccHighWarning', + 'vcclowwarning': 'VccLowWarning' +} + +QSFP_DOM_CHANNEL_MONITOR_MAP = { + 'rx1power': 'RX1Power', + 'rx2power': 'RX2Power', + 'rx3power': 'RX3Power', + 'rx4power': 'RX4Power', + 'tx1bias': 'TX1Bias', + 'tx2bias': 'TX2Bias', + 'tx3bias': 'TX3Bias', + 'tx4bias': 'TX4Bias', + 'tx1power': 'TX1Power', + 'tx2power': 'TX2Power', + 'tx3power': 'TX3Power', + 'tx4power': 'TX4Power' +} + +QSFP_DD_DOM_CHANNEL_MONITOR_MAP = { + 'rx1power': 'RX1Power', + 'rx2power': 'RX2Power', + 'rx3power': 'RX3Power', + 'rx4power': 'RX4Power', + 'rx5power': 'RX5Power', + 'rx6power': 'RX6Power', + 'rx7power': 'RX7Power', + 'rx8power': 'RX8Power', + 'tx1bias': 'TX1Bias', + 'tx2bias': 'TX2Bias', + 'tx3bias': 'TX3Bias', + 'tx4bias': 'TX4Bias', + 'tx5bias': 'TX5Bias', + 'tx6bias': 'TX6Bias', + 'tx7bias': 'TX7Bias', + 'tx8bias': 'TX8Bias', + 'tx1power': 'TX1Power', + 'tx2power': 'TX2Power', + 'tx3power': 'TX3Power', + 'tx4power': 'TX4Power', + 'tx5power': 'TX5Power', + 'tx6power': 'TX6Power', + 'tx7power': 'TX7Power', + 'tx8power': 'TX8Power' +} + +DOM_MODULE_MONITOR_MAP = { + 'temperature': 'Temperature', + 'voltage': 'Vcc' +} + +DOM_CHANNEL_THRESHOLD_UNIT_MAP = { + 'txpowerhighalarm': 'dBm', + 'txpowerlowalarm': 'dBm', + 'txpowerhighwarning': 'dBm', + 'txpowerlowwarning': 'dBm', + 'rxpowerhighalarm': 'dBm', + 'rxpowerlowalarm': 'dBm', + 'rxpowerhighwarning': 'dBm', + 'rxpowerlowwarning': 'dBm', + 'txbiashighalarm': 'mA', + 'txbiaslowalarm': 'mA', + 'txbiashighwarning': 'mA', + 'txbiaslowwarning': 'mA' +} + +DOM_MODULE_THRESHOLD_UNIT_MAP = { + 'temphighalarm': 'C', + 'templowalarm': 'C', + 'temphighwarning': 'C', + 'templowwarning': 'C', + 'vcchighalarm': 'Volts', + 'vcclowalarm': 'Volts', + 'vcchighwarning': 'Volts', + 'vcclowwarning': 'Volts' +} + +DOM_VALUE_UNIT_MAP = { + 'rx1power': 'dBm', + 'rx2power': 'dBm', + 'rx3power': 'dBm', + 'rx4power': 'dBm', + 'tx1bias': 'mA', + 'tx2bias': 'mA', + 'tx3bias': 'mA', + 'tx4bias': 'mA', + 'tx1power': 'dBm', + 'tx2power': 'dBm', + 'tx3power': 'dBm', + 'tx4power': 'dBm', + 'temperature': 'C', + 'voltage': 'Volts' +} + +QSFP_DD_DOM_VALUE_UNIT_MAP = { + 'rx1power': 'dBm', + 'rx2power': 'dBm', + 'rx3power': 'dBm', + 'rx4power': 'dBm', + 'rx5power': 'dBm', + 'rx6power': 'dBm', + 'rx7power': 'dBm', + 'rx8power': 'dBm', + 'tx1bias': 'mA', + 'tx2bias': 'mA', + 'tx3bias': 'mA', + 'tx4bias': 'mA', + 'tx5bias': 'mA', + 'tx6bias': 'mA', + 'tx7bias': 'mA', + 'tx8bias': 'mA', + 'tx1power': 'dBm', + 'tx2power': 'dBm', + 'tx3power': 'dBm', + 'tx4power': 'dBm', + 'tx5power': 'dBm', + 'tx6power': 'dBm', + 'tx7power': 'dBm', + 'tx8power': 'dBm', + 'temperature': 'C', + 'voltage': 'Volts' +} + + +# Global platform-specific Chassis class instance +platform_chassis = None + +# Global platform-specific sfputil class instance +platform_sfputil = None # Global logger instance log = logger.Logger(SYSLOG_IDENTIFIER) -# ========================== Methods for printing ========================== - - -# Convert arraw of raw bytes into pretty-printed string -def raw_bytes_to_string_pretty(raw_bytes): - hexstr = "" - - for i in range(0, len(raw_bytes)): - if i > 0 and (i % 8) == 0: - hexstr += " " - - if i > 0 and (i % 16) == 0: - hexstr += "\n" - - hexstr += raw_bytes[i] - hexstr += " " - - return hexstr - - -# Recursively convert dictionary into pretty-printed string -def dict_to_string_pretty(in_dict, indent=0): - if len(in_dict) == 0: - return "" - - key = sorted(in_dict)[0] - val = in_dict[key] - - if isinstance(val, dict): - output = "%s%s:\n" % ('\t' * indent, key) + dict_to_string_pretty(val, indent + 1) - else: - output = "%s%s: %s\n" % ('\t' * indent, key, val) - - return output + dict_to_string_pretty({i:in_dict[i] for i in in_dict if i != key}, indent) - - -# Recursively convert dictionary into comma-separated string of 'key:value' -def dict_to_string_comma_separated(in_dict, key_blacklist, elemprefix, first=True): - if len(in_dict) == 0: - return "" - - output = "" - key = sorted(in_dict)[0] - val = in_dict[key] - - if key in key_blacklist: - return "" - - if not first: - output += "," - else: - first = False +# ========================== Methods for formatting output ========================== + +# Convert dict values to cli output string +def format_dict_value_to_string(sorted_key_table, + dom_info_dict, dom_value_map, + dom_unit_map, alignment=0): + output = '' + indent = ' ' * 8 + separator = ": " + for key in sorted_key_table: + if dom_info_dict is not None and key in dom_info_dict and dom_info_dict[key] != 'N/A': + value = dom_info_dict[key] + units = '' + if type(value) != str or (value != 'Unknown' and not value.endswith(dom_unit_map[key])): + units = dom_unit_map[key] + output += '{}{}{}{}{}\n'.format((indent * 2), + dom_value_map[key], + separator.rjust(len(separator) + alignment - len(dom_value_map[key])), + value, + units) + return output + + +def convert_sfp_info_to_output_string(sfp_info_dict): + indent = ' ' * 8 + output = '' + + sorted_qsfp_data_map_keys = sorted(QSFP_DATA_MAP, key=QSFP_DATA_MAP.get) + for key in sorted_qsfp_data_map_keys: + if key == 'cable_type': + output += '{}{}: {}\n'.format(indent, sfp_info_dict['cable_type'], sfp_info_dict['cable_length']) + elif key == 'cable_length': + pass + elif key == 'specification_compliance': + if sfp_info_dict['type'] == "QSFP-DD Double Density 8X Pluggable Transceiver": + output += '{}{}: {}\n'.format(indent, QSFP_DATA_MAP[key], sfp_info_dict[key]) + else: + output += '{}{}:\n'.format(indent, QSFP_DATA_MAP['specification_compliance']) + spefic_compliance_dict = eval(sfp_info_dict['specification_compliance']) + sorted_compliance_key_table = natsorted(spefic_compliance_dict) + for compliance_key in sorted_compliance_key_table: + output += '{}{}: {}\n'.format((indent * 2), compliance_key, spefic_compliance_dict[compliance_key]) + else: + output += '{}{}: {}\n'.format(indent, QSFP_DATA_MAP[key], sfp_info_dict[key]) + + return output + + +# Convert DOM sensor info in DB to CLI output string +def convert_dom_to_output_string(sfp_type, dom_info_dict): + indent = ' ' * 8 + output_dom = '' + channel_threshold_align = 18 + module_threshold_align = 15 + + if sfp_type.startswith('QSFP'): + # Channel Monitor + if sfp_type.startswith('QSFP-DD'): + output_dom += (indent + 'ChannelMonitorValues:\n') + sorted_key_table = natsorted(QSFP_DD_DOM_CHANNEL_MONITOR_MAP) + output_channel = format_dict_value_to_string( + sorted_key_table, dom_info_dict, + QSFP_DD_DOM_CHANNEL_MONITOR_MAP, + QSFP_DD_DOM_VALUE_UNIT_MAP) + output_dom += output_channel + else: + output_dom += (indent + 'ChannelMonitorValues:\n') + sorted_key_table = natsorted(QSFP_DOM_CHANNEL_MONITOR_MAP) + output_channel = format_dict_value_to_string( + sorted_key_table, dom_info_dict, + QSFP_DOM_CHANNEL_MONITOR_MAP, + DOM_VALUE_UNIT_MAP) + output_dom += output_channel + + # Channel Threshold + if sfp_type.startswith('QSFP-DD'): + dom_map = SFP_DOM_CHANNEL_THRESHOLD_MAP + else: + dom_map = QSFP_DOM_CHANNEL_THRESHOLD_MAP + + output_dom += (indent + 'ChannelThresholdValues:\n') + sorted_key_table = natsorted(dom_map) + output_channel_threshold = format_dict_value_to_string( + sorted_key_table, dom_info_dict, + dom_map, + DOM_CHANNEL_THRESHOLD_UNIT_MAP, + channel_threshold_align) + output_dom += output_channel_threshold + + # Module Monitor + output_dom += (indent + 'ModuleMonitorValues:\n') + sorted_key_table = natsorted(DOM_MODULE_MONITOR_MAP) + output_module = format_dict_value_to_string( + sorted_key_table, dom_info_dict, + DOM_MODULE_MONITOR_MAP, + DOM_VALUE_UNIT_MAP) + output_dom += output_module + + # Module Threshold + output_dom += (indent + 'ModuleThresholdValues:\n') + sorted_key_table = natsorted(DOM_MODULE_THRESHOLD_MAP) + output_module_threshold = format_dict_value_to_string( + sorted_key_table, dom_info_dict, + DOM_MODULE_THRESHOLD_MAP, + DOM_MODULE_THRESHOLD_UNIT_MAP, + module_threshold_align) + output_dom += output_module_threshold - if isinstance(val, dict): - output += dict_to_string_comma_separated(val, key_blacklist, key + '.', True) else: - elemname = elemprefix + key - output += elemname + ':' + str(val) - - return output + dict_to_string_comma_separated( - {i:in_dict[i] for i in in_dict if i != key}, - key_blacklist, elemprefix, first) + output_dom += (indent + 'MonitorData:\n') + sorted_key_table = natsorted(SFP_DOM_CHANNEL_MONITOR_MAP) + output_channel = format_dict_value_to_string( + sorted_key_table, dom_info_dict, + SFP_DOM_CHANNEL_MONITOR_MAP, + DOM_VALUE_UNIT_MAP) + output_dom += output_channel + + sorted_key_table = natsorted(DOM_MODULE_MONITOR_MAP) + output_module = format_dict_value_to_string( + sorted_key_table, dom_info_dict, + DOM_MODULE_MONITOR_MAP, + DOM_VALUE_UNIT_MAP) + output_dom += output_module + + output_dom += (indent + 'ThresholdData:\n') + + # Module Threshold + sorted_key_table = natsorted(DOM_MODULE_THRESHOLD_MAP) + output_module_threshold = format_dict_value_to_string( + sorted_key_table, dom_info_dict, + DOM_MODULE_THRESHOLD_MAP, + DOM_MODULE_THRESHOLD_UNIT_MAP, + module_threshold_align) + output_dom += output_module_threshold + + # Channel Threshold + sorted_key_table = natsorted(SFP_DOM_CHANNEL_THRESHOLD_MAP) + output_channel_threshold = format_dict_value_to_string( + sorted_key_table, dom_info_dict, + SFP_DOM_CHANNEL_THRESHOLD_MAP, + DOM_CHANNEL_THRESHOLD_UNIT_MAP, + channel_threshold_align) + output_dom += output_channel_threshold + + return output_dom # =============== Getting and printing SFP data =============== -def get_sfp_eeprom_status_string(port, port_sfp_eeprom_status): - if port_sfp_eeprom_status: - return "%s: SFP EEPROM detected" % port - else: - return "%s: SFP EEPROM not detected" % port - - -# Returns, -# port_num if physical -# logical_port:port_num if logical port and is a ganged port -# logical_port if logical and not ganged # def get_physical_port_name(logical_port, physical_port, ganged): + """ + Returns: + port_num if physical + logical_port:port_num if logical port and is a ganged port + logical_port if logical and not ganged + """ if logical_port == physical_port: - return logical_port + return str(logical_port) elif ganged: - return logical_port + ":%d (ganged)" % physical_port + return "{}:{} (ganged)".format(logical_port, physical_port) else: return logical_port @@ -126,165 +393,69 @@ def logical_port_name_to_physical_port_list(port_name): if platform_sfputil.is_logical_port(port_name): return platform_sfputil.get_logical_to_physical(port_name) else: - click.echo("Error: Invalid port '%s'" % port_name) + click.echo("Error: Invalid port '{}'".format(port_name)) return None else: return [int(port_name)] def print_all_valid_port_values(): - click.echo("Valid values for port: %s\n" % str(platform_sfputil.logical)) - - -# Returns multi-line string of pretty SFP port EEPROM data -def port_eeprom_data_string_pretty(logical_port_name, dump_dom): - result = "" - ganged = False - i = 1 - - physical_port_list = logical_port_name_to_physical_port_list(logical_port_name) - if physical_port_list is None: - click.echo("Error: No physical ports found for logical port '%s'" % logical_port_name) - return "" - - if len(physical_port_list) > 1: - ganged = True - - for physical_port in physical_port_list: - port_name = get_physical_port_name(logical_port_name, i, ganged) - if not platform_sfputil.get_presence(physical_port): - eeprom_dict = None - else: - eeprom_dict = platform_sfputil.get_eeprom_dict(physical_port) - - if eeprom_dict is not None: - eeprom_iface_dict = eeprom_dict.get('interface') - iface_data_dict = eeprom_iface_dict.get('data') - result += get_sfp_eeprom_status_string(port_name, True) - result += "\n" - result += dict_to_string_pretty(iface_data_dict, 1) - - if dump_dom: - eeprom_dom_dict = eeprom_dict.get('dom') - if eeprom_dom_dict is not None: - dom_data_dict = eeprom_dom_dict.get('data') - if dom_data_dict is not None: - result += dict_to_string_pretty(dom_data_dict, 1) - else: - result += get_sfp_eeprom_status_string(port_name, False) - result += "\n" - - result += "\n" - i += 1 - - return result - - -# Returns single-line string of pretty SFP port EEPROM data -# Nested dictionary items are prefixed using dot-notation -def port_eeprom_data_string_pretty_oneline(logical_port_name, - ifdata_blacklist, - domdata_blacklist, - dump_dom): - result = "" - ganged = False - i = 1 - - physical_port_list = logical_port_name_to_physical_port_list(logical_port_name) - if physical_port_list is None: - click.echo("Error: No physical ports found for logical port '%s'" % logical_port_name) - return "" - - if len(physical_port_list) > 1: - ganged = True - - for physical_port in physical_port_list: - if not platform_sfputil.get_presence(physical_port): - eeprom_dict = None - else: - eeprom_dict = platform_sfputil.get_eeprom_dict(physical_port) - - # Only print detected sfp ports for oneline - if eeprom_dict is not None: - eeprom_iface_dict = eeprom_dict.get('interface') - iface_data_dict = eeprom_iface_dict.get('data') - result += "port:%s," % get_physical_port_name(logical_port_name, i, ganged) - result += dict_to_string_comma_separated(iface_data_dict, ifdata_blacklist, "") - - if dump_dom: - eeprom_dom_dict = eeprom_dict.get('dom') - if eeprom_dom_dict is not None: - dom_data_dict = eeprom_dom_dict.get('data') - if dom_data_dict is not None: - result += dict_to_string_comma_separated( - dom_data_dict, domdata_blacklist, "") - - result += "\n" - i += 1 - - return result - + click.echo("Valid values for port: {}\n".format(str(platform_sfputil.logical))) -def port_eeprom_data_raw_string_pretty(logical_port_name): - result = "" - ganged = False - i = 1 - physical_port_list = logical_port_name_to_physical_port_list(logical_port_name) - if physical_port_list is None: - click.echo("Error: No physical ports found for logical port '%s'" % logical_port_name) - return "" +# ==================== Methods for initialization ==================== - if len(physical_port_list) > 1: - ganged = True - for physical_port in physical_port_list: - port_name = get_physical_port_name(logical_port_name, i, ganged) - if not platform_sfputil.get_presence(physical_port): - eeprom_raw = None - else: - eeprom_raw = platform_sfputil.get_eeprom_raw(physical_port) +# Instantiate platform-specific Chassis class +def load_platform_chassis(): + global platform_chassis - if eeprom_raw is None: - result += get_sfp_eeprom_status_string(port_name, False) - result += "\n" - else: - result += get_sfp_eeprom_status_string(port_name, True) - result += "\n" - result += raw_bytes_to_string_pretty(eeprom_raw) + # Load new platform api class + try: + platform_chassis = sonic_platform.platform.Platform().get_chassis() + except Exception as e: + log.log_error("Failed to instantiate Chassis due to {}".format(repr(e))) - result += "\n" - i += 1 + if not platform_chassis: + return False - return result + return True -# ==================== Methods for initialization ==================== +# Instantiate SfpUtilHelper class +def load_sfputilhelper(): + global platform_sfputil + # we have to make use of sfputil for some features + # even though when new platform api is used for all vendors. + # in this sense, we treat it as a part of new platform api. + # we have already moved sfputil to sonic_platform_base + # which is the root of new platform api. + platform_sfputil = sonic_platform_base.sonic_sfp.sfputilhelper.SfpUtilHelper() -# Loads platform specific sfputil module from source -def load_platform_sfputil(): - global platform_sfputil + if not platform_sfputil: + return False - # Load platform module from source - platform_path, _ = device_info.get_paths_to_platform_and_hwsku_dirs() + return True - try: - module_file = os.path.join(platform_path, "plugins", PLATFORM_SPECIFIC_MODULE_NAME + ".py") - module = imp.load_source(PLATFORM_SPECIFIC_MODULE_NAME, module_file) - except IOError as e: - log.log_error("Failed to load platform module '%s': %s" % (PLATFORM_SPECIFIC_MODULE_NAME, str(e)), True) - return -1 +def load_port_config(): try: - platform_sfputil_class = getattr(module, PLATFORM_SPECIFIC_CLASS_NAME) - platform_sfputil = platform_sfputil_class() - except AttributeError as e: - log.log_error("Failed to instantiate '%s' class: %s" % (PLATFORM_SPECIFIC_CLASS_NAME, str(e)), True) - return -2 + if multi_asic.is_multi_asic(): + # For multi ASIC platforms we pass DIR of port_config_file_path and the number of asics + (platform_path, hwsku_path) = device_info.get_paths_to_platform_and_hwsku_dirs() - return 0 + # Load platform module from source + platform_sfputil.read_all_porttab_mappings(hwsku_path, multi_asic.get_num_asics()) + else: + # For single ASIC platforms we pass port_config_file_path and the asic_inst as 0 + port_config_file_path = device_info.get_path_to_port_config_file() + platform_sfputil.read_porttab_mappings(port_config_file_path, 0) + except Exception as e: + log.log_error("Error reading port info ({})".format(str(e)), True) + return False + return True # ==================== CLI commands and groups ==================== @@ -296,28 +467,19 @@ def cli(): if os.geteuid() != 0: click.echo("Root privileges are required for this operation") - sys.exit(1) + sys.exit(ERROR_PERMISSIONS) - # Load platform-specific sfputil class - err = load_platform_sfputil() - if err != 0: - sys.exit(2) + # Load platform-specific Chassis class + if not load_platform_chassis(): + sys.exit(ERROR_CHASSIS_LOAD) - # Load port info - try: - if multi_asic.is_multi_asic(): - # For multi ASIC platforms we pass DIR of port_config_file_path and the number of asics - (platform_path, hwsku_path) = device_info.get_paths_to_platform_and_hwsku_dirs() + # Load SfpUtilHelper class + if not load_sfputilhelper(): + sys.exit(ERROR_SFPUTILHELPER_LOAD) - # Load platform module from source - platform_sfputil.read_all_porttab_mappings(hwsku_path, multi_asic.get_num_asics()) - else: - # For single ASIC platforms we pass port_config_file_path and the asic_inst as 0 - port_config_file_path = device_info.get_path_to_port_config_file() - platform_sfputil.read_porttab_mappings(port_config_file_path, 0) - except Exception as e: - log.log_error("Error reading port info (%s)" % str(e), True) - sys.exit(3) + # Load port info + if not load_port_config(): + sys.exit(ERROR_PORT_CONFIG_LOAD) # 'show' subgroup @@ -331,9 +493,8 @@ def show(): @show.command() @click.option('-p', '--port', metavar='', help="Display SFP EEPROM data for port only") @click.option('-d', '--dom', 'dump_dom', is_flag=True, help="Also display Digital Optical Monitoring (DOM) data") -@click.option('-o', '--oneline', is_flag=True, help="Condense output for each port to a single line") -@click.option('--raw', is_flag=True, help="Output raw, unformatted data") -def eeprom(port, dump_dom, oneline, raw): +@click.option('-n', '--namespace', default=None, help="Display interfaces for specific namespace") +def eeprom(port, dump_dom, namespace): """Display EEPROM data of SFP transceiver(s)""" logical_port_list = [] output = "" @@ -342,31 +503,65 @@ def eeprom(port, dump_dom, oneline, raw): if port is None: logical_port_list = platform_sfputil.logical else: - if platform_sfputil.is_valid_sfputil_port(port) == 0: - click.echo("Error: invalid port '%s'\n" % port) + if platform_sfputil.is_logical_port(port) == 0: + click.echo("Error: invalid port '{}'\n".format(port)) print_all_valid_port_values() - sys.exit(4) + sys.exit(ERROR_INVALID_PORT) logical_port_list = [port] - if raw: - for logical_port_name in logical_port_list: - output += port_eeprom_data_raw_string_pretty(logical_port_name) - output += "\n" - elif oneline: - ifdata_out_blacklist = ["EncodingCodes", - "ExtIdentOfTypeOfTransceiver", - "NominalSignallingRate(UnitsOf100Mbd)"] - domdata_out_blacklist = ["AwThresholds", "StatusControl"] - - for logical_port_name in logical_port_list: - output += port_eeprom_data_string_pretty_oneline(logical_port_name, - ifdata_out_blacklist, - domdata_out_blacklist, - dump_dom) - else: - for logical_port_name in logical_port_list: - output += port_eeprom_data_string_pretty(logical_port_name, dump_dom) + for logical_port_name in logical_port_list: + ganged = False + i = 1 + + physical_port_list = logical_port_name_to_physical_port_list(logical_port_name) + if physical_port_list is None: + click.echo("Error: No physical ports found for logical port '{}'".format(logical_port_name)) + return + + if len(physical_port_list) > 1: + ganged = True + + for physical_port in physical_port_list: + port_name = get_physical_port_name(logical_port_name, i, ganged) + + try: + presence = platform_chassis.get_sfp(physical_port).get_presence() + except NotImplementedError: + click.echo("Sfp.get_presence() is currently not implemented for this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) + + if not presence: + output += "{}: SFP EEPROM not detected\n".format(port_name) + else: + output += "{}: SFP EEPROM detected\n".format(port_name) + + try: + xcvr_info = platform_chassis.get_sfp(physical_port).get_transceiver_info() + except NotImplementedError: + click.echo("Sfp.get_transceiver_info() is currently not implemented for this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) + + output += convert_sfp_info_to_output_string(xcvr_info) + + if dump_dom: + try: + xcvr_dom_info = platform_chassis.get_sfp(physical_port).get_transceiver_bulk_status() + except NotImplementedError: + click.echo("Sfp.get_transceiver_bulk_status() is currently not implemented for this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) + + try: + xcvr_dom_threshold_info = platform_chassis.get_sfp(physical_port).get_transceiver_threshold_info() + if xcvr_dom_threshold_info: + xcvr_dom_info.update(xcvr_dom_threshold_info) + except NotImplementedError: + click.echo("Sfp.get_transceiver_threshold_info() is currently not implemented for this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) + + output += convert_dom_to_output_string(xcvr_info['type'], xcvr_dom_info) + + output += '\n' click.echo(output) @@ -384,10 +579,10 @@ def presence(port): if port is None: logical_port_list = platform_sfputil.logical else: - if platform_sfputil.is_valid_sfputil_port(port) == 0: - click.echo("Error: invalid port '%s'\n" % port) + if platform_sfputil.is_logical_port(port) == 0: + click.echo("Error: invalid port '{}'\n".format(port)) print_all_valid_port_values() - sys.exit(4) + sys.exit(ERROR_INVALID_PORT) logical_port_list = [port] @@ -397,7 +592,7 @@ def presence(port): physical_port_list = logical_port_name_to_physical_port_list(logical_port_name) if physical_port_list is None: - click.echo("Error: No physical ports found for logical port '%s'" % logical_port_name) + click.echo("Error: No physical ports found for logical port '{}'".format(logical_port_name)) return if len(physical_port_list) > 1: @@ -407,16 +602,13 @@ def presence(port): port_name = get_physical_port_name(logical_port_name, i, ganged) try: - presence = platform_sfputil.get_presence(physical_port) + presence = platform_chassis.get_sfp(physical_port).get_presence() except NotImplementedError: click.echo("This functionality is currently not implemented for this platform") - sys.exit(5) + sys.exit(ERROR_NOT_IMPLEMENTED) - if platform_sfputil._is_valid_port(physical_port): - status_string = "Present" if presence else "Not present" - output_table.append([port_name, status_string]) - else: - output_table.append([port_name, "N/A"]) + status_string = "Present" if presence else "Not present" + output_table.append([port_name, status_string]) i += 1 @@ -436,10 +628,10 @@ def lpmode(port): if port is None: logical_port_list = platform_sfputil.logical else: - if platform_sfputil.is_valid_sfputil_port(port) == 0: - click.echo("Error: invalid port '%s'\n" % port) + if platform_sfputil.is_logical_port(port) == 0: + click.echo("Error: invalid port '{}'\n".format(port)) print_all_valid_port_values() - sys.exit(4) + sys.exit(ERROR_INVALID_PORT) logical_port_list = [port] @@ -449,7 +641,7 @@ def lpmode(port): physical_port_list = logical_port_name_to_physical_port_list(logical_port_name) if physical_port_list is None: - click.echo("Error: No physical ports found for logical port '%s'" % logical_port_name) + click.echo("Error: No physical ports found for logical port '{}'".format(logical_port_name)) return if len(physical_port_list) > 1: @@ -459,10 +651,10 @@ def lpmode(port): port_name = get_physical_port_name(logical_port_name, i, ganged) try: - lpmode = platform_sfputil.get_low_power_mode(physical_port) + lpmode = platform_chassis.get_sfp(physical_port).get_lpmode() except NotImplementedError: click.echo("This functionality is currently not implemented for this platform") - sys.exit(5) + sys.exit(ERROR_NOT_IMPLEMENTED) if lpmode: output_table.append([port_name, "On"]) @@ -486,29 +678,29 @@ def set_lpmode(logical_port, enable): ganged = False i = 1 - if platform_sfputil.is_valid_sfputil_port(logical_port) == 0: - click.echo("Error: invalid port '%s'\n" % logical_port) + if platform_sfputil.is_logical_port(logical_port) == 0: + click.echo("Error: invalid port '{}'\n".format(logical_port)) print_all_valid_port_values() - sys.exit(4) + sys.exit(ERROR_INVALID_PORT) physical_port_list = logical_port_name_to_physical_port_list(logical_port) if physical_port_list is None: - click.echo("Error: No physical ports found for logical port '%s'" % logical_port) + click.echo("Error: No physical ports found for logical port '{}'".format(logical_port)) return if len(physical_port_list) > 1: ganged = True for physical_port in physical_port_list: - click.echo("{} low-power mode for port {}... ".format( + click.echo("{} low-power mode for port {} ... ".format( "Enabling" if enable else "Disabling", get_physical_port_name(logical_port, i, ganged)), nl=False) try: - result = platform_sfputil.set_low_power_mode(physical_port, enable) + result = platform_chassis.get_sfp(physical_port).set_lpmode(enable) except NotImplementedError: click.echo("This functionality is currently not implemented for this platform") - sys.exit(5) + sys.exit(ERROR_NOT_IMPLEMENTED) if result: click.echo("OK") @@ -542,27 +734,27 @@ def reset(port_name): ganged = False i = 1 - if platform_sfputil.is_valid_sfputil_port(port_name) == 0: - click.echo("Error: invalid port '%s'\n" % port_name) + if platform_sfputil.is_logical_port(port_name) == 0: + click.echo("Error: invalid port '{}'\n".format(port_name)) print_all_valid_port_values() - sys.exit(4) + sys.exit(ERROR_INVALID_PORT) physical_port_list = logical_port_name_to_physical_port_list(port_name) if physical_port_list is None: - click.echo("Error: No physical ports found for logical port '%s'" % port_name) + click.echo("Error: No physical ports found for logical port '{}'".format(port_name)) return if len(physical_port_list) > 1: ganged = True for physical_port in physical_port_list: - click.echo("Resetting port %s... " % get_physical_port_name(port_name, i, ganged), nl=False) + click.echo("Resetting port {} ... ".format(get_physical_port_name(port_name, i, ganged)), nl=False) try: - result = platform_sfputil.reset(physical_port) + result = platform_chassis.get_sfp(physical_port).reset() except NotImplementedError: click.echo("This functionality is currently not implemented for this platform") - sys.exit(5) + sys.exit(ERROR_NOT_IMPLEMENTED) if result: click.echo("OK") diff --git a/tests/sfputil_test.py b/tests/sfputil_test.py new file mode 100644 index 0000000000..5708bb2bb6 --- /dev/null +++ b/tests/sfputil_test.py @@ -0,0 +1,175 @@ +import sys +import os +from unittest import mock + +import pytest +from click.testing import CliRunner + +test_path = os.path.dirname(os.path.abspath(__file__)) +modules_path = os.path.dirname(test_path) +sys.path.insert(0, modules_path) + +sys.modules['sonic_platform'] = mock.MagicMock() +import sfputil.main as sfputil + + +class TestSfputil(object): + def test_format_dict_value_to_string(self): + sorted_key_table = [ + 'rx1power', + 'rx2power', + 'rx3power', + 'rx4power', + 'tx1bias', + 'tx1power', + 'tx2bias', + 'tx2power', + 'tx3bias', + 'tx3power', + 'tx4bias', + 'tx4power' + ] + + dom_info_dict = { + 'temperature': '41.7539C', + 'voltage': '3.2577Volts', + 'rx1power': '-1.6622dBm', + 'rx2power': '-1.7901dBm', + 'rx3power': '-1.6973dBm', + 'rx4power': '-2.0915dBm', + 'tx1bias': '35.8400mA', + 'tx2bias': '37.5780mA', + 'tx3bias': '35.8400mA', + 'tx4bias': '35.8400mA', + 'tx1power': 'N/A', + 'tx2power': 'N/A', + 'tx3power': 'N/A', + 'tx4power': 'N/A' + } + + expected_output = '''\ + RX1Power: -1.6622dBm + RX2Power: -1.7901dBm + RX3Power: -1.6973dBm + RX4Power: -2.0915dBm + TX1Bias: 35.8400mA + TX2Bias: 37.5780mA + TX3Bias: 35.8400mA + TX4Bias: 35.8400mA +''' + + output = sfputil.format_dict_value_to_string(sorted_key_table, + dom_info_dict, + sfputil.QSFP_DOM_CHANNEL_MONITOR_MAP, + sfputil.DOM_VALUE_UNIT_MAP) + assert output == expected_output + + # Change temperature and voltage to floats and ensure units get appended + dom_info_dict['temperature'] = 41.7539 + dom_info_dict['voltage'] = 3.2577 + + output = sfputil.format_dict_value_to_string(sorted_key_table, + dom_info_dict, + sfputil.QSFP_DOM_CHANNEL_MONITOR_MAP, + sfputil.DOM_VALUE_UNIT_MAP) + assert output == expected_output + + def test_convert_sfp_info_to_output_string(self): + sfp_info_dict = { + 'type': 'QSFP28 or later', + 'type_abbrv_name': 'QSFP28', + 'manufacturer': 'Mellanox', + 'model': 'MCP1600-C003', + 'hardware_rev': 'A2', + 'serial': 'MT1636VS10561', + 'vendor_oui': '00-02-c9', + 'vendor_date': '2016-07-18', + 'connector': 'No separable connector', + 'encoding': '64B66B', + 'ext_identifier': 'Power Class 1(1.5W max)', + 'ext_rateselect_compliance': 'QSFP+ Rate Select Version 1', + 'cable_type': 'Length Cable Assembly(m)', + 'cable_length': '3', + 'application_advertisement': 'N/A', + 'specification_compliance': "{'10/40G Ethernet Compliance Code': '40GBASE-CR4'}", + 'dom_capability': "{'Tx_power_support': 'no', 'Rx_power_support': 'no', 'Voltage_support': 'no', 'Temp_support': 'no'}", + 'nominal_bit_rate': '255' + } + + expected_output = '''\ + Application Advertisement: N/A + Connector: No separable connector + Encoding: 64B66B + Extended Identifier: Power Class 1(1.5W max) + Extended RateSelect Compliance: QSFP+ Rate Select Version 1 + Identifier: QSFP28 or later + Length Cable Assembly(m): 3 + Nominal Bit Rate(100Mbs): 255 + Specification compliance: + 10/40G Ethernet Compliance Code: 40GBASE-CR4 + Vendor Date Code(YYYY-MM-DD Lot): 2016-07-18 + Vendor Name: Mellanox + Vendor OUI: 00-02-c9 + Vendor PN: MCP1600-C003 + Vendor Rev: A2 + Vendor SN: MT1636VS10561 +''' + output = sfputil.convert_sfp_info_to_output_string(sfp_info_dict) + assert output == expected_output + + def test_convert_dom_to_output_string(self): + sfp_type = 'QSFP28 or later' + + dom_info_dict = { + 'temperature': '41.7539C', + 'voltage': '3.2577Volts', + 'rx1power': '-1.6622dBm', + 'rx2power': '-1.7901dBm', + 'rx3power': '-1.6973dBm', + 'rx4power': '-2.0915dBm', + 'tx1bias': '35.8400mA', + 'tx2bias': '37.5780mA', + 'tx3bias': '35.8400mA', + 'tx4bias': '35.8400mA', + 'tx1power': 'N/A', + 'tx2power': 'N/A', + 'tx3power': 'N/A', + 'tx4power': 'N/A' + } + + expected_output = '''\ + ChannelMonitorValues: + RX1Power: -1.6622dBm + RX2Power: -1.7901dBm + RX3Power: -1.6973dBm + RX4Power: -2.0915dBm + TX1Bias: 35.8400mA + TX2Bias: 37.5780mA + TX3Bias: 35.8400mA + TX4Bias: 35.8400mA + ChannelThresholdValues: + ModuleMonitorValues: + Temperature: 41.7539C + Vcc: 3.2577Volts + ModuleThresholdValues: +''' + + output = sfputil.convert_dom_to_output_string(sfp_type, dom_info_dict) + assert output == expected_output + + # TODO: Add tests for other SFP types + + def test_get_physical_port_name(self): + output = sfputil.get_physical_port_name(0, 0, False) + assert output == '0' + + output = sfputil.get_physical_port_name('Ethernet0', 0, False) + assert output == 'Ethernet0' + + output = sfputil.get_physical_port_name('Ethernet0', 0, True) + assert output == 'Ethernet0:0 (ganged)' + + def test_version(self): + runner = CliRunner() + result = runner.invoke(sfputil.cli.commands['version'], []) + assert result.output.rstrip() == 'sfputil version {}'.format(sfputil.VERSION)