Skip to content
Merged
43 changes: 43 additions & 0 deletions sonic-xcvrd/tests/test_xcvrd.py
Original file line number Diff line number Diff line change
Expand Up @@ -1793,6 +1793,48 @@ def test_SffManagerTask_get_admin_status(self, mock_get_cfg_port_tbl):
mock_get_cfg_port_tbl.assert_called_once_with(0)
mock_get_cfg_port_tbl.return_value.hget.assert_called_once_with(lport, 'admin_status')

def test_SffManagerTask_enable_high_power_class(self):
mock_xcvr_api = MagicMock()
mock_xcvr_api.get_power_class = MagicMock(return_value=5)
mock_xcvr_api.set_high_power_class = MagicMock(return_value=True)
lport = 'Ethernet0'

sff_manager_task = SffManagerTask(DEFAULT_NAMESPACE,
threading.Event(),
MagicMock(),
helper_logger)

# Test with normal case
sff_manager_task.enable_high_power_class(mock_xcvr_api, lport)
assert mock_xcvr_api.get_power_class.call_count == 1
assert mock_xcvr_api.set_high_power_class.call_count == 1

# Test with get_power_class failed
mock_xcvr_api.get_power_class.return_value = None
sff_manager_task.enable_high_power_class(mock_xcvr_api, lport)
assert mock_xcvr_api.get_power_class.call_count == 2
assert mock_xcvr_api.set_high_power_class.call_count == 1

# Test for no need to set high power class
mock_xcvr_api.get_power_class.return_value = 4
sff_manager_task.enable_high_power_class(mock_xcvr_api, lport)
assert mock_xcvr_api.get_power_class.call_count == 3
assert mock_xcvr_api.set_high_power_class.call_count == 1

# Test for set_high_power_class failed
mock_xcvr_api.get_power_class.return_value = 5
mock_xcvr_api.set_high_power_class.return_value = False
sff_manager_task.enable_high_power_class(mock_xcvr_api, lport)
assert mock_xcvr_api.get_power_class.call_count == 4
assert mock_xcvr_api.set_high_power_class.call_count == 2

# Test for set_high_power_class not supported
mock_xcvr_api.get_power_class.return_value = 5
mock_xcvr_api.set_high_power_class = MagicMock(side_effect=AttributeError("Attribute not found"))
sff_manager_task.enable_high_power_class(mock_xcvr_api, lport)
assert mock_xcvr_api.get_power_class.call_count == 5
assert mock_xcvr_api.set_high_power_class.call_count == 1

@patch('xcvrd.xcvrd.platform_chassis')
@patch('xcvrd.sff_mgr.PortChangeObserver', MagicMock(handle_port_update_event=MagicMock()))
def test_SffManagerTask_task_worker(self, mock_chassis):
Expand All @@ -1801,6 +1843,7 @@ def test_SffManagerTask_task_worker(self, mock_chassis):
mock_xcvr_api.is_flat_memory = MagicMock(return_value=False)
mock_xcvr_api.is_copper = MagicMock(return_value=False)
mock_xcvr_api.get_tx_disable_support = MagicMock(return_value=True)
mock_xcvr_api.get_power_class = MagicMock(return_value=1)

mock_sfp = MagicMock()
mock_sfp.get_presence = MagicMock(return_value=True)
Expand Down
37 changes: 36 additions & 1 deletion sonic-xcvrd/xcvrd/sff_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,39 @@ def convert_bool_array_to_bit_mask(self, bool_array):
mask += (1 << i if flag else 0)
return mask

def enable_high_power_class(self, xcvr_api, lport):
"""
Enable high power class for the transceiver.

Args:
xcvr_api (XcvrApi): The XcvrApi instance for the transceiver.
lport (str): Logical port name.
"""
try:
power_class = xcvr_api.get_power_class()
if power_class is None:
self.log_error("{}: failed to get power class".format(lport))
return

# According to SFF-8636, section 6.2.6:
# In order to protect legacy host systems designed to support only
# the original 4 power classes, the High Power Class Enable control
# was defined at byte 93, bit 2. Modules in power classes 5, 6, 7 or
# 8 are required to limit power consumption to no more than a power
# class 4 module if the High Power Class Enable, byte 93 bit 2
# control is not set. For power class < 5, there's no such power
# consumption limiting, so no need to enable high power class.
if power_class < 5:
return

if xcvr_api.set_high_power_class(power_class, True):
self.log_notice("{}: done enabling high power class".format(lport))
else:
self.log_error("{}: failed to enable high power class".format(lport))

except (AttributeError, NotImplementedError):
pass

def task_worker(self):
'''
The main goal of sff_mgr is to make sure SFF compliant modules are
Expand Down Expand Up @@ -435,8 +468,10 @@ def task_worker(self):
except (AttributeError, NotImplementedError):
# Skip if these essential routines are not available
continue

if xcvr_inserted or (admin_status_changed and data[self.ADMIN_STATUS] == "up"):
self.enable_high_power_class(api, lport)

set_lp_success = (
sfp.set_lpmode(False)
if isinstance(api, Sff8472Api)
Expand Down
Loading