-
Notifications
You must be signed in to change notification settings - Fork 819
[sfputil] Firmware download/upgrade CLI support for QSFP-DD #1947
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
a65f974
2fe2671
f71a7f0
83d8b94
f53f590
e666291
716d3c6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ | |
| import sys | ||
| import natsort | ||
| import ast | ||
| import time | ||
|
|
||
| import subprocess | ||
| import click | ||
|
|
@@ -26,12 +27,16 @@ | |
| PLATFORM_JSON = 'platform.json' | ||
| PORT_CONFIG_INI = 'port_config.ini' | ||
|
|
||
| EXIT_FAIL = -1 | ||
| ERROR_PERMISSIONS = 1 | ||
| ERROR_CHASSIS_LOAD = 2 | ||
| ERROR_SFPUTILHELPER_LOAD = 3 | ||
| ERROR_PORT_CONFIG_LOAD = 4 | ||
| ERROR_NOT_IMPLEMENTED = 5 | ||
| ERROR_INVALID_PORT = 6 | ||
| SMBUS_BLOCK_WRITE_SIZE = 32 | ||
| # Default host password as per CMIS spec | ||
| CDB_DEFAULT_HOST_PASSWORD = 0x00001011 | ||
|
|
||
| # TODO: We should share these maps and the formatting functions between sfputil and sfpshow | ||
| QSFP_DATA_MAP = { | ||
|
|
@@ -225,6 +230,17 @@ | |
| # Global logger instance | ||
| log = logger.Logger(SYSLOG_IDENTIFIER) | ||
|
|
||
| def is_sfp_present(port_name): | ||
| physical_port = logical_port_to_physical_port_index(port_name) | ||
| sfp = platform_chassis.get_sfp(physical_port) | ||
|
|
||
| try: | ||
| presence = sfp.get_presence() | ||
| except NotImplementedError: | ||
| click.echo("sfp get_presence() NOT implemented!") | ||
|
||
| sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
|
||
| return True if presence else False | ||
|
||
|
|
||
| # ========================== Methods for formatting output ========================== | ||
|
|
||
|
|
@@ -408,6 +424,19 @@ def logical_port_name_to_physical_port_list(port_name): | |
| else: | ||
| return [int(port_name)] | ||
|
|
||
| def logical_port_to_physical_port_index(port_name): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this function available in some other library? can we reuse?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i couldn't find one in sonic-utilities |
||
| if platform_sfputil.is_logical_port(port_name) == 0: | ||
|
||
| click.echo("Error: invalid port '{}'\n".format(port_name)) | ||
| print_all_valid_port_values() | ||
| sys.exit(ERROR_INVALID_PORT) | ||
|
|
||
| physical_port = logical_port_name_to_physical_port_list(port_name)[0] | ||
| if physical_port is None: | ||
| click.echo("Error: No physical port found for logical port '{}'".format(port_name)) | ||
| sys.exit(EXIT_FAIL) | ||
|
|
||
| return physical_port | ||
|
|
||
|
|
||
| def print_all_valid_port_values(): | ||
| click.echo("Valid values for port: {}\n".format(str(platform_sfputil.logical))) | ||
|
|
@@ -805,6 +834,36 @@ def lpmode(port): | |
|
|
||
| click.echo(tabulate(output_table, table_header, tablefmt='simple')) | ||
|
|
||
| def show_firmware_version(physical_port): | ||
| try: | ||
| sfp = platform_chassis.get_sfp(physical_port) | ||
| api = sfp.get_xcvr_api() | ||
| out = api.get_module_fw_info() | ||
| click.echo(out['info']) | ||
| except NotImplementedError: | ||
| click.echo("This functionality is currently not implemented for this platform") | ||
| sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
|
||
| # 'fwversion' subcommand | ||
| @show.command() | ||
| @click.argument('port_name', metavar='<port_name>', required=True) | ||
| def fwversion(port_name): | ||
| """Show firmware version of the transceiver""" | ||
|
|
||
| physical_port = logical_port_to_physical_port_index(port_name) | ||
| sfp = platform_chassis.get_sfp(physical_port) | ||
|
|
||
| try: | ||
| presence = sfp.get_presence() | ||
| except NotImplementedError: | ||
| click.echo("sfp get_presence() NOT implemented!") | ||
| sys.exit(EXIT_FAIL) | ||
|
|
||
| if not presence: | ||
| click.echo("{}: SFP EEPROM not detected\n".format(port_name)) | ||
| sys.exit(EXIT_FAIL) | ||
|
|
||
| show_firmware_version(physical_port) | ||
|
|
||
| # 'lpmode' subgroup | ||
| @cli.group() | ||
|
|
@@ -903,6 +962,258 @@ def reset(port_name): | |
|
|
||
| i += 1 | ||
|
|
||
| # 'firmware' subgroup | ||
| @cli.group() | ||
| def firmware(): | ||
| """Download/Upgrade firmware on the transceiver""" | ||
| pass | ||
|
|
||
| def run_firmware(port_name, mode): | ||
prgeor marked this conversation as resolved.
Show resolved
Hide resolved
prgeor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| status = 0 | ||
| physical_port = logical_port_to_physical_port_index(port_name) | ||
| sfp = platform_chassis.get_sfp(physical_port) | ||
|
|
||
| try: | ||
| api = sfp.get_xcvr_api() | ||
| except NotImplementedError: | ||
| click.echo("This functionality is currently not implemented for this platform") | ||
| sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
|
||
| if mode == 0: | ||
| click.echo("Running firmare: Non-hitless Reset to Inactive Image") | ||
prgeor marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| elif mode == 1: | ||
| click.echo("Running firmware: Hitless Reset to Inactive Image") | ||
| elif mode == 2: | ||
| click.echo("Running firmware: Attempt non-hitless Reset to Running Image") | ||
| elif mode == 3: | ||
| click.echo("Running firmware: Attempt Hitless Reset to Running Image") | ||
| else: | ||
| click.echo("Running firmwaer: Unknown mode {}".format(mode)) | ||
| sys.exit(EXIT_FAIL) | ||
|
|
||
| try: | ||
| status = api.cdb_run_firmware(mode) | ||
| except NotImplementedError: | ||
prgeor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| click.echo("This functionality is not applicable for this transceiver") | ||
|
|
||
| return status | ||
|
|
||
| def commit_firmware(port_name): | ||
| status = 0 | ||
| physical_port = logical_port_to_physical_port_index(port_name) | ||
| sfp = platform_chassis.get_sfp(physical_port) | ||
|
|
||
| try: | ||
| api = sfp.get_xcvr_api() | ||
| except NotImplementedError: | ||
| click.echo("This functionality is currently not implemented for this platform") | ||
| sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
|
||
| try: | ||
| status = api.cdb_commit_firmware() | ||
| except NotImplementedError: | ||
| click.echo("This functionality is not applicable for this transceiver") | ||
|
|
||
| return status | ||
|
|
||
| def download_firmware(port_name, filepath): | ||
| """Download firmware on the transceiver""" | ||
| try: | ||
| fd = open(filepath, 'rb') | ||
| fd.seek(0, 2) | ||
| file_size = fd.tell() | ||
| fd.seek(0, 0) | ||
| except FileNotFoundError: | ||
| click.echo("Firmware file {} NOT found".format(filepath)) | ||
| sys.exit(EXIT_FAIL) | ||
|
|
||
| physical_port = logical_port_to_physical_port_index(port_name) | ||
| sfp = platform_chassis.get_sfp(physical_port) | ||
| try: | ||
| api = sfp.get_xcvr_api() | ||
| except NotImplementedError: | ||
| click.echo("This functionality is NOT applicable to this platform") | ||
| sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
|
||
| try: | ||
| fwinfo = api.get_module_fw_mgmt_feature() | ||
| if fwinfo['status'] == True: | ||
| startLPLsize, maxblocksize, lplonly_flag, autopaging_flag, writelength = fwinfo['feature'] | ||
| else: | ||
| click.echo("Failed to fetch CDB Firmware management features") | ||
| sys.exit(EXIT_FAIL) | ||
| except NotImplementedError: | ||
| click.echo("This functionality is NOT applicable for this transceiver") | ||
| sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
|
||
| click.echo('CDB: Starting firmware download') | ||
| startdata = fd.read(startLPLsize) | ||
| status = api.cdb_start_firmware_download(startLPLsize, startdata, file_size) | ||
| if status != 1: | ||
| click.echo('CDB: Start firmware download failed - status {}'.format(status)) | ||
| sys.exit(EXIT_FAIL) | ||
|
|
||
| # Increase the optoe driver's write max to speed up firmware download | ||
prgeor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| sfp.set_optoe_write_max(SMBUS_BLOCK_WRITE_SIZE) | ||
|
|
||
| with click.progressbar(length=file_size, label="Downloading ...") as bar: | ||
| address = 0 | ||
| BLOCK_SIZE = 116 if lplonly_flag else maxblocksize | ||
|
||
| remaining = file_size - startLPLsize | ||
| while remaining > 0: | ||
| count = BLOCK_SIZE if remaining >= BLOCK_SIZE else remaining | ||
| data = fd.read(count) | ||
| if lplonly_flag: | ||
| status = api.cdb_lpl_block_write(address, data) | ||
| else: | ||
| status = api.cdb_epl_block_write(address, data, autopaging_flag, writelength) | ||
| if (status != 1): | ||
| click.echo("CDB: firmware download failed! - status {}".format(status)) | ||
| sys.exit(EXIT_FAIL) | ||
|
|
||
| bar.update(count) | ||
| time.sleep(0.1) | ||
|
||
| address += count | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we make sure count matches len of the data read
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If that does not match, you trigger assert. However this is a runtime error.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. now, exiting |
||
| remaining = remaining - count | ||
|
||
|
|
||
| # Restore the optoe driver's write max to '1' (default value) | ||
| sfp.set_optoe_write_max(1) | ||
|
|
||
| time.sleep(2) | ||
prgeor marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| status = api.cdb_firmware_download_complete() | ||
| click.echo('CDB: firmware download complete') | ||
| return status | ||
|
|
||
| # 'run' subcommand | ||
| @firmware.command() | ||
| @click.argument('port_name', required=True, default=None) | ||
| @click.option('--mode', type=click.Choice(["0", "1", "2", "3"]), help="0 = Non-hitless Reset to Inactive Image\n \ | ||
|
||
| 1 = Hitless Reset to Inactive Image\n \ | ||
| 2 = Attempt non-hitless Reset to Running Image\n \ | ||
| 3 = Attempt Hitless Reset to Running Image\n") | ||
| def run(port_name, mode): | ||
| """Run the firmware with default mode=1""" | ||
|
|
||
| if not is_sfp_present(port_name): | ||
| click.echo("{}: SFP EEPROM not detected\n".format(port_name)) | ||
| sys.exit(EXIT_FAIL) | ||
|
|
||
| if mode is None: | ||
| mode = 1 | ||
|
|
||
| status = run_firmware(port_name, int(mode)) | ||
| if status != 1: | ||
| click.echo('Failed to run firmware in mode={}! CDB status: {}'.format(mode, status)) | ||
| sys.exit(EXIT_FAIL) | ||
|
|
||
| click.echo("Firmware run in mode={} success".format(mode)) | ||
|
|
||
| # 'commit' subcommand | ||
| @firmware.command() | ||
| @click.argument('port_name', required=True, default=None) | ||
| def commit(port_name): | ||
| """Commit the running firmware""" | ||
|
|
||
| if not is_sfp_present(port_name): | ||
| click.echo("{}: SFP EEPROM not detected\n".format(port_name)) | ||
| sys.exit(EXIT_FAIL) | ||
|
|
||
| status = commit_firmware(port_name) | ||
| if status != 1: | ||
| click.echo('Failed to commit firmware! CDB status: {}'.format(status)) | ||
| sys.exit(EXIT_FAIL) | ||
|
|
||
| click.echo("Firmware commit successful") | ||
|
|
||
| # 'upgrade' subcommand | ||
| @firmware.command() | ||
| @click.argument('port_name', required=True, default=None) | ||
| @click.argument('filepath', required=True, default=None) | ||
| def upgrade(port_name, filepath): | ||
prgeor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """Upgrade firmware on the transceiver""" | ||
|
|
||
| physical_port = logical_port_to_physical_port_index(port_name) | ||
|
|
||
| if not is_sfp_present(port_name): | ||
| click.echo("{}: SFP EEPROM not detected\n".format(port_name)) | ||
| sys.exit(EXIT_FAIL) | ||
|
|
||
| show_firmware_version(physical_port) | ||
|
|
||
| status = download_firmware(port_name, filepath) | ||
| if status == 1: | ||
| click.echo("Firmware download complete success") | ||
| else: | ||
| click.echo("Firmware download complete failed! CDB status = {}".format(status)) | ||
prgeor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| status = run_firmware(port_name, 1) | ||
| if status != 1: | ||
| click.echo('Failed to run firmware in mode=1 ! CDB status: {}'.format(status)) | ||
| sys.exit(EXIT_FAIL) | ||
|
|
||
| click.echo("Firmware run in mode 1 successful") | ||
|
|
||
| status = commit_firmware(port_name) | ||
| if status != 1: | ||
| click.echo('Failed to commit firmware! CDB status: {}'.format(status)) | ||
| sys.exit(EXIT_FAIL) | ||
|
|
||
| click.echo("Firmware commit successful") | ||
|
|
||
| # 'download' subcommand | ||
| @firmware.command() | ||
| @click.argument('port_name', required=True, default=None) | ||
| @click.argument('filepath', required=True, default=None) | ||
| def download(port_name, filepath): | ||
| """Download firmware on the transceiver""" | ||
|
|
||
| if not is_sfp_present(port_name): | ||
| click.echo("{}: SFP EEPROM not detected\n".format(port_name)) | ||
| sys.exit(EXIT_FAIL) | ||
|
|
||
| start = time.time() | ||
| status = download_firmware(port_name, filepath) | ||
| if status == 1: | ||
| click.echo("Firmware download complete success") | ||
| else: | ||
| click.echo("Firmware download complete failed! status = {}".format(status)) | ||
| sys.exit(EXIT_FAIL) | ||
| end = time.time() | ||
| hours, rem = divmod(end-start, 3600) | ||
| minutes, seconds = divmod(rem, 60) | ||
|
||
| click.echo("Total download Time: {:0>2}:{:0>2}:{:05.2f}".format(int(hours), int(minutes), seconds)) | ||
|
|
||
| # 'unlock' subcommand | ||
| @firmware.command() | ||
| @click.argument('port_name', required=True, default=None) | ||
| @click.option('--password', type=click.INT, help="Password in integer\n") | ||
| def unlock(port_name, password): | ||
| """Unlock the firmware download feature via CDB host password""" | ||
| physical_port = logical_port_to_physical_port_index(port_name) | ||
| sfp = platform_chassis.get_sfp(physical_port) | ||
|
|
||
| if not is_sfp_present(port_name): | ||
| click.echo("{}: SFP EEPROM not detected\n".format(port_name)) | ||
| sys.exit(EXIT_FAIL) | ||
|
|
||
| try: | ||
| api = sfp.get_xcvr_api() | ||
| except NotImplementedError: | ||
| click.echo("This functionality is currently not implemented for this platform") | ||
| sys.exit(ERROR_NOT_IMPLEMENTED) | ||
|
|
||
| if password is None: | ||
| password = CDB_DEFAULT_HOST_PASSWORD | ||
| try: | ||
| status = api.cdb_enter_host_password(int(password)) | ||
| except NotImplementedError: | ||
| click.echo("This functionality is not applicable for this transceiver") | ||
| sys.exit(EXIT_FAIL) | ||
|
|
||
| if status == 1: | ||
| click.echo("CDB: Host password accepted") | ||
| else: | ||
| click.echo("CDB: Host password NOT accepted! status = {}".format(status)) | ||
|
|
||
| # 'version' subcommand | ||
| @cli.command() | ||
|
|
@@ -912,4 +1223,4 @@ def version(): | |
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| cli() | ||
| cli() | ||
prgeor marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this spec public available? if yes, add as code comment? #Closed