Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 57 additions & 9 deletions sonic_platform_base/sonic_xcvr/api/public/cmisTargetFWUpgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
upgrade of remote target from the local target itself.
"""

import struct
import sys
import traceback
from ...fields import consts
Expand Down Expand Up @@ -44,7 +43,38 @@

class CmisTargetFWUpgradeAPI(CmisApi):
def set_firmware_download_target_end(self, target):
return self.xcvr_eeprom.write(consts.TARGET_MODE, target)
"""
Sets the target mode to the specified target.
If the target mode is set to a remote target, then the page select byte is set to 0.
Also, the remote target is then checked to ensure that its accessible.
In case of any error, the target mode is restored to E0.
Returns:
True if the target mode is set successfully, False otherwise.
"""
try:
if not self.xcvr_eeprom.write(consts.TARGET_MODE, target):
return self._handle_error_and_restore_target_to_E0(
"Failed to set target mode to {}".format(target))
if target != TARGET_E0_VALUE:
if not self.xcvr_eeprom.write(consts.PAGE_SELECT_BYTE, 0):
return self._handle_error_and_restore_target_to_E0(
"Failed to set page select byte to {}".format(target))
if not self._is_remote_target_accessible():
return self._handle_error_and_restore_target_to_E0(
"Remote target {} not accessible.".format(target))
except Exception as e:
return self._handle_error_and_restore_target_to_E0(
"Exception occurred while setting target mode to {}: {}".format(target, repr(e)))

return True

def get_firmware_download_target_end(self):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mihirpat1 nit, rename to get_current_target_end()

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mihirpat1 please check if this is tested for

  1. remote target is already powered off
  2. remote target is powered off after setting the target > 0

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@prgeor I have addressed this now. I haven't tested the power off scenario though.

"""
Reads the target mode and returns the target mode.
Returns:
The target mode.
"""
return self.xcvr_eeprom.read(consts.TARGET_MODE)

"""
Reads the active, inactive and server firmware version from all targets
Expand All @@ -70,13 +100,6 @@ def get_transceiver_info_firmware_versions(self):
logging.error("Target mode change failed. Target: {}".format(target))
continue

# Any register apart from the TARGET_MODE register will have the value 0xff
# if the remote target is not accessible from the local target.
module_type = self.get_module_type()
if 'Unknown' in module_type:
logging.info("Remote target {} not accessible. Skipping.".format(target))
continue

firmware_versions = super().get_transceiver_info_firmware_versions()
if target in REMOTE_TARGET_FIRMWARE_INFO_MAP:
# Add server firmware version to the firmware_versions dictionary
Expand All @@ -97,6 +120,31 @@ def get_transceiver_info_firmware_versions(self):
self.set_firmware_download_target_end(TARGET_E0_VALUE)
return return_dict

def _is_remote_target_accessible(self):
"""
Once the target is changed to remote, any register apart from the TARGET_MODE register
will have the value 0xff if the remote target is powered down.
Assumption:
The target mode has already been set to the desired remote target.
Returns:
True if the remote target is accessible from the local target, False otherwise.
"""
module_type = self.get_module_type()
if 'Unknown' in module_type:
return False

return True

def _handle_error_and_restore_target_to_E0(self, error_message):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mihirpat1 this API be rename to _restore_target_E0(). Let the caller print the log errors before calling this. This allows reusing this API in future without throwing error logs

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@prgeor I have addressed this now.

"""
Logs the error message and restores the target mode to E0.
Returns:
False always.
"""
logging.error(error_message)
self.xcvr_eeprom.write(consts.TARGET_MODE, TARGET_E0_VALUE)
return False

def _convert_firmware_info_to_target_firmware_info(self, firmware_info, firmware_info_map):
return_dict = {}
for key, value in firmware_info_map.items():
Expand Down
1 change: 1 addition & 0 deletions sonic_platform_base/sonic_xcvr/fields/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@
CDB_WRITE_MSG = "CdbWriteMessage"

#CMISTargetFWUpgrade
PAGE_SELECT_BYTE = "PageSelectByte"
CMIS_TARGET_SERVER_INFO = "CmisTargetServerInfo"
SERVER_FW_MAGIC_BYTE = "ServerFirmwareMagicByte"
SERVER_FW_CHECKSUM = "ServerFirmwareChecksum"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def __init__(self, codes):
super().__init__(codes)

self.CMIS_TARGET_SERVER_INFO = RegGroupField(consts.CMIS_TARGET_SERVER_INFO,
NumberRegField(consts.PAGE_SELECT_BYTE, self.getaddr(0, 127), format="B", size=1, ro=False),
NumberRegField(consts.SERVER_FW_MAGIC_BYTE, self.getaddr(0x3, 128), format="B", size=1),
NumberRegField(consts.SERVER_FW_CHECKSUM, self.getaddr(0x3, 129), format="B", size=1),
ServerFWVersionRegField(consts.SERVER_FW_VERSION, self.getaddr(0x3, 130), size=16))
58 changes: 51 additions & 7 deletions tests/sonic_xcvr/test_cmisTargetFWUpgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,43 @@ class TestCmis(object):
eeprom = XcvrEeprom(reader, writer, mem_map)
api = CmisTargetFWUpgradeAPI(eeprom)

@pytest.mark.parametrize("set_firmware_result, module_type, exception_raised", [
(False, 'QSFP+ or later with CMIS', False),
(True, 'Unknown', False),
(True, 'QSFP+ or later with CMIS', True)
@pytest.mark.parametrize("target,write_results,accessible,exception,expected_result", [
(1, [False, True, True, True], True, None, False), # Failed to set target mode
(1, [True, False, True, True], True, None, False), # Failed to set page select byte
(1, [True, True, True, True], False, None, False), # Remote target not accessible
(1, [True, True, True, True], True, Exception("Simulated exception"), False), # Exception occurred
(1, [True, True, True, True], True, None, True), # All operations successful
(0, [True, True, True, True], True, None, True), # Target is E0, all operations successful
])
@patch('sonic_platform_base.sonic_xcvr.api.public.cmisTargetFWUpgrade.CmisTargetFWUpgradeAPI._handle_error_and_restore_target_to_E0', MagicMock(return_value=False))
def test_set_firmware_download_target_end(self, target, write_results, accessible, exception, expected_result):
self.api.xcvr_eeprom.write = MagicMock()
self.api.xcvr_eeprom.write.side_effect = write_results
with patch('sonic_platform_base.sonic_xcvr.api.public.cmisTargetFWUpgrade.CmisTargetFWUpgradeAPI._is_remote_target_accessible', return_value=accessible):
with patch('sonic_platform_base.sonic_xcvr.api.public.cmisTargetFWUpgrade.CmisTargetFWUpgradeAPI._handle_error_and_restore_target_to_E0', return_value=False):
if exception is not None:
self.api.xcvr_eeprom.write.side_effect = exception

result = self.api.set_firmware_download_target_end(target)
assert result == expected_result
if result:
expected_call_count = 0
else:
expected_call_count = 1
assert self.api._handle_error_and_restore_target_to_E0.call_count == expected_call_count

@pytest.mark.parametrize("set_firmware_result, exception_raised", [
(False, False),
(True, True)
])
@patch('sonic_platform_base.sonic_xcvr.api.public.cmis.CmisApi.get_transceiver_info_firmware_versions', MagicMock(side_effect=({}, Exception('error'), {})))
@patch('sonic_platform_base.sonic_xcvr.api.public.cmisTargetFWUpgrade.CmisTargetFWUpgradeAPI._get_server_firmware_version', MagicMock())
@patch('traceback.format_exception')
def test_get_transceiver_info_firmware_versions_failure(self, mock_format_exception, set_firmware_result, module_type, exception_raised):
def test_get_transceiver_info_firmware_versions_failure(self, mock_format_exception, set_firmware_result, exception_raised):
expected_output = {'active_firmware': 'N/A', 'inactive_firmware': 'N/A', 'e1_active_firmware': 'N/A',\
'e1_inactive_firmware': 'N/A', 'e2_active_firmware': 'N/A', 'e2_inactive_firmware': 'N/A',\
'e1_server_firmware': 'N/A', 'e2_server_firmware': 'N/A'}
self.api.set_firmware_download_target_end = MagicMock(return_value=set_firmware_result)
self.api.get_module_type = MagicMock(return_value=module_type)

result = self.api.get_transceiver_info_firmware_versions()
assert result == expected_output
Expand Down Expand Up @@ -58,12 +81,33 @@ def test_get_transceiver_info_firmware_versions_success(self, fw_info_dict, serv
with patch('sonic_platform_base.sonic_xcvr.api.public.cmis.CmisApi.get_transceiver_info_firmware_versions', side_effect=fw_info_dict):
with patch('sonic_platform_base.sonic_xcvr.api.public.cmisTargetFWUpgrade.CmisTargetFWUpgradeAPI._get_server_firmware_version', side_effect=server_fw_info_dict):
self.api.set_firmware_download_target_end = MagicMock(return_value=True)
self.api.get_module_type = MagicMock(return_value='QSFP+ or later with CMIS')

result = self.api.get_transceiver_info_firmware_versions()
assert result == expected_output
assert self.api.set_firmware_download_target_end.call_count == len(TARGET_LIST) + 1

@pytest.mark.parametrize("module_type, expected_result", [
('Unknown', False),
('QSFP+ or later with CMIS', True)
])
def test_is_remote_target_accessible(self, module_type, expected_result):
# Mock the get_module_type method to return the parameterized module_type
self.api.get_module_type = MagicMock(return_value=module_type)

# Call the method and check the result
result = self.api._is_remote_target_accessible()
assert result == expected_result

@patch('logging.error')
def test_handle_error_and_restore_target_to_E0(self, mock_error):
self.api.xcvr_eeprom.write = MagicMock()
error_message = "Test error message"
assert self.api._handle_error_and_restore_target_to_E0(error_message) == False
mock_error.assert_called_once_with(error_message)
# Check that the target mode was restored to E0
self.api.xcvr_eeprom.write.assert_called_once()


@pytest.mark.parametrize("magic_byte, checksum, server_fw_version_byte_array, expected", [
(0, 0, (), {'server_firmware': 'N/A'}),
(0, 0x98, [0, 0, 0, 1, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 5, 0x8d], {'server_firmware': 'N/A'}), # Magic byte is 0 but other values are valid
Expand Down