Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions sonic_platform_base/sonic_xcvr/api/public/cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,32 @@ class VdmSubtypeIndex(Enum):
"vdm_supported": "N/A"
}

def read_only_cache_none_dict_and_list(func):
cache_name = f'_{func.__name__}_cache'
def wrapper(self):
if not hasattr(self, cache_name):
cache_value = func(self)
setattr(self, cache_name, cache_value)
else:
cache_value = getattr(self, cache_name)
if cache_value is None:
cache_value = func(self)
setattr(self, cache_name, cache_value)
return cache_value
return wrapper

# Decorator that caches return values of dict/list methods only when non-empty collections are returned
def read_only_cache_dict_and_list(func):
cache_name = f'_{func.__name__}_cache'
def wrapper(self):
if not hasattr(self, cache_name):
cache_value = func(self)
if cache_value is not None and isinstance(cache_value, (dict, list)) and len(cache_value) > 0:
setattr(self, cache_name, cache_value)
return cache_value
return getattr(self, cache_name)
return wrapper

class CmisApi(XcvrApi):
NUM_CHANNELS = 8
LowPwrRequestSW = 4
Expand All @@ -119,6 +145,21 @@ def __init__(self, xcvr_eeprom):
self.vdm = CmisVdmApi(xcvr_eeprom) if not self.is_flat_memory() else None
self.cdb = CmisCdbApi(xcvr_eeprom) if not self.is_flat_memory() else None

def clear_cache(self, method_name=None):
"""
Clear cached values for methods decorated with caching decorators.
If method_name is provided, only clear that method's cache; otherwise clear all caches.
"""
if method_name:
cache_name = f'_{method_name}_cache'
if hasattr(self, cache_name):
delattr(self, cache_name)
else:
# Remove all cache attributes
for attr in list(self.__dict__.keys()):
if attr.startswith('_') and attr.endswith('_cache'):
delattr(self, attr)

def _get_vdm_key_to_db_prefix_map(self):
return CMIS_VDM_KEY_TO_DB_PREFIX_KEY_MAP

Expand Down Expand Up @@ -184,12 +225,14 @@ def get_vdm_unfreeze_status(self):
'''
return self.xcvr_eeprom.read(consts.VDM_UNFREEZE_DONE)

@read_only_cache_none_dict_and_list
def get_manufacturer(self):
'''
This function returns the manufacturer of the module
'''
return self.xcvr_eeprom.read(consts.VENDOR_NAME_FIELD)

@read_only_cache_none_dict_and_list
def get_model(self):
'''
This function returns the part number of the module
Expand All @@ -202,42 +245,49 @@ def get_cable_length_type(self):
'''
return "Length Cable Assembly(m)"

@read_only_cache_none_dict_and_list
def get_cable_length(self):
'''
This function returns the cable length of the module
'''
return self.xcvr_eeprom.read(consts.LENGTH_ASSEMBLY_FIELD)

@read_only_cache_none_dict_and_list
def get_vendor_rev(self):
'''
This function returns the revision level for part number provided by vendor
'''
return self.xcvr_eeprom.read(consts.VENDOR_REV_FIELD)

@read_only_cache_none_dict_and_list
def get_serial(self):
'''
This function returns the serial number of the module
'''
return self.xcvr_eeprom.read(consts.VENDOR_SERIAL_NO_FIELD)

@read_only_cache_none_dict_and_list
def get_module_type(self):
'''
This function returns the SFF8024Identifier (module type / form-factor). Table 4-1 in SFF-8024 Rev4.6
'''
return self.xcvr_eeprom.read(consts.ID_FIELD)

@read_only_cache_none_dict_and_list
def get_module_type_abbreviation(self):
'''
This function returns the SFF8024Identifier (module type / form-factor). Table 4-1 in SFF-8024 Rev4.6
'''
return self.xcvr_eeprom.read(consts.ID_ABBRV_FIELD)

@read_only_cache_none_dict_and_list
def get_connector_type(self):
'''
This function returns module connector. Table 4-3 in SFF-8024 Rev4.6
'''
return self.xcvr_eeprom.read(consts.CONNECTOR_FIELD)

@read_only_cache_none_dict_and_list
def get_module_hardware_revision(self):
'''
This function returns the module hardware revision
Expand All @@ -249,6 +299,7 @@ def get_module_hardware_revision(self):
hw_rev = [str(num) for num in [hw_major_rev, hw_minor_rev]]
return '.'.join(hw_rev)

@read_only_cache_none_dict_and_list
def get_cmis_rev(self):
'''
This function returns the CMIS version the module complies to
Expand Down Expand Up @@ -563,6 +614,7 @@ def is_copper(self):
media_intf = self.get_module_media_type()
return media_intf == "passive_copper_media_interface" if media_intf else None

@read_only_cache_none_dict_and_list
def is_flat_memory(self):
return self.xcvr_eeprom.read(consts.FLAT_MEM_FIELD) is not False

Expand Down Expand Up @@ -834,6 +886,7 @@ def get_tx_los(self):
tx_los_final.append(bool(tx_los[key]))
return tx_los_final

@read_only_cache_none_dict_and_list
def get_tx_disable_support(self):
return not self.is_flat_memory() and self.xcvr_eeprom.read(consts.TX_DISABLE_SUPPORT_FIELD)

Expand Down Expand Up @@ -947,6 +1000,7 @@ def set_power_override(self, power_override, power_set):
def get_transceiver_thresholds_support(self):
return not self.is_flat_memory()

@read_only_cache_none_dict_and_list
def get_lpmode_support(self):
power_class = self.xcvr_eeprom.read(consts.POWER_CLASS_FIELD)
if power_class is None:
Expand Down Expand Up @@ -988,13 +1042,15 @@ def get_module_media_interface(self):
else:
return 'Unknown media interface'

@read_only_cache_none_dict_and_list
def is_coherent_module(self):
'''
Returns True if the module follow C-CMIS spec, False otherwise
'''
mintf = self.get_module_media_interface()
return False if 'ZR' not in mintf else True

@read_only_cache_none_dict_and_list
def get_datapath_init_duration(self):
'''
This function returns the duration of datapath init
Expand All @@ -1007,6 +1063,7 @@ def get_datapath_init_duration(self):
value = float(duration)
return value * DATAPATH_INIT_DURATION_MULTIPLIER if value <= DATAPATH_INIT_DURATION_OVERRIDE_THRESHOLD else value

@read_only_cache_none_dict_and_list
def get_datapath_deinit_duration(self):
'''
This function returns the duration of datapath deinit
Expand All @@ -1016,6 +1073,7 @@ def get_datapath_deinit_duration(self):
duration = self.xcvr_eeprom.read(consts.DP_PATH_DEINIT_DURATION)
return float(duration) if duration is not None else 0

@read_only_cache_none_dict_and_list
def get_datapath_tx_turnon_duration(self):
'''
This function returns the duration of datapath tx turnon
Expand All @@ -1025,6 +1083,7 @@ def get_datapath_tx_turnon_duration(self):
duration = self.xcvr_eeprom.read(consts.DP_TX_TURNON_DURATION)
return float(duration) if duration is not None else 0

@read_only_cache_none_dict_and_list
def get_datapath_tx_turnoff_duration(self):
'''
This function returns the duration of datapath tx turnoff
Expand All @@ -1034,6 +1093,7 @@ def get_datapath_tx_turnoff_duration(self):
duration = self.xcvr_eeprom.read(consts.DP_TX_TURNOFF_DURATION)
return float(duration) if duration is not None else 0

@read_only_cache_none_dict_and_list
def get_module_pwr_up_duration(self):
'''
This function returns the duration of module power up
Expand All @@ -1043,6 +1103,7 @@ def get_module_pwr_up_duration(self):
duration = self.xcvr_eeprom.read(consts.MODULE_PWRUP_DURATION)
return float(duration) if duration is not None else 0

@read_only_cache_none_dict_and_list
def get_module_pwr_down_duration(self):
'''
This function returns the duration of module power down
Expand Down Expand Up @@ -2744,6 +2805,7 @@ def get_datapath_deinit(self):
return None
return [bool(datapath_deinit & (1 << lane)) for lane in range(self.NUM_CHANNELS)]

@read_only_cache_dict_and_list
def get_application_advertisement(self):
"""
Get the application advertisement of the CMIS transceiver
Expand Down
4 changes: 4 additions & 0 deletions tests/sonic_xcvr/test_cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ class TestCmis(object):
old_read_func = eeprom.read
api = CmisApi(eeprom)

def setup_method(self, method):
"""Clear cached values before each test case."""
self.api.clear_cache()

@pytest.mark.parametrize("mock_response, expected", [
("1234567890", "1234567890"),
("ABCD", "ABCD")
Expand Down
147 changes: 147 additions & 0 deletions tests/sonic_xcvr/test_cmis_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import pytest
from unittest.mock import MagicMock
from sonic_platform_base.sonic_xcvr.api.public.cmis import CmisApi
from sonic_platform_base.sonic_xcvr.codes.public.sff8024 import Sff8024
from sonic_platform_base.sonic_xcvr.fields import consts

class TestReadOnlyCacheDecorator:
def setup_method(self):
# Initialize CmisApi with a mock EEPROM and clear initial reads
eeprom = MagicMock()
self.api = CmisApi(eeprom)
self.api.xcvr_eeprom.read.reset_mock()

def test_get_model_caching(self):
# Ensure get_model value is cached and read() called only once
self.api.xcvr_eeprom.read.return_value = 'model_val'
first = self.api.get_model()
second = self.api.get_model()
assert first == 'model_val'
assert second == 'model_val'
assert self.api.xcvr_eeprom.read.call_count == 1

def test_get_cmis_rev_caching(self):
# get_cmis_rev reads major and minor once, then caches result
# side_effect: first call returns major, second minor
self.api.xcvr_eeprom.read.side_effect = [5, 3, 7, 9]
v1 = self.api.get_cmis_rev()
v2 = self.api.get_cmis_rev()
assert v1 == '5.3'
assert v2 == '5.3'
# Only the first two reads (major and minor) should occur
assert self.api.xcvr_eeprom.read.call_count == 2

def test_clear_cache_for_get_model(self):
# Ensure clear_cache('get_model') clears the cache so read() is re-called
self.api.xcvr_eeprom.read.return_value = 'val1'
_ = self.api.get_model()
_ = self.api.get_model()
assert self.api.xcvr_eeprom.read.call_count == 1
# Clear only get_model cache
self.api.clear_cache('get_model')
self.api.xcvr_eeprom.read.return_value = 'val2'
_ = self.api.get_model()
assert self.api.xcvr_eeprom.read.call_count == 2
assert self.api.get_model() == 'val2'

def test_clear_all_caches(self):
# Ensure clear_cache() clears all cached methods
# Setup: get_model and get_cmis_rev use side effects
self.api.xcvr_eeprom.read.side_effect = ['m1', 1, 2]
_ = self.api.get_model()
_ = self.api.get_cmis_rev()
assert self.api.xcvr_eeprom.read.call_count == 3
# Clear all caches
self.api.clear_cache()
# Reset read mock and provide new side effects
self.api.xcvr_eeprom.read.reset_mock()
self.api.xcvr_eeprom.read.side_effect = ['m2', 3, 4]
m2 = self.api.get_model()
rev = self.api.get_cmis_rev()
assert m2 == 'm2'
assert rev == '3.4'
# Both methods should re-read their values
assert self.api.xcvr_eeprom.read.call_count == 3

class TestReadOnlyCacheDictAndListDecorator:
def setup_method(self):
# Initialize CmisApi with a mock EEPROM and clear initial reads
eeprom = MagicMock()
self.api = CmisApi(eeprom)
self.api.xcvr_eeprom.read.reset_mock()

def test_get_application_advertisement_no_cache_if_empty(self):
# Empty dict should not be cached and read() should be called each time
self.api.xcvr_eeprom.read.return_value = {}
first = self.api.get_application_advertisement()
second = self.api.get_application_advertisement()
assert first == {}
assert second == {}
assert self.api.xcvr_eeprom.read.call_count == 2

def test_get_application_advertisement_caching_if_non_empty(self):
# Non-empty dict should be cached and read() should be called only once
media_type = Sff8024.MODULE_MEDIA_TYPE[1] # e.g. "nm_850_media_interface"
prefix = consts.MODULE_MEDIA_INTERFACE_850NM
raw = {
f"{consts.HOST_ELECTRICAL_INTERFACE}_1": "iface1",
f"{prefix}_1": "mod_iface1",
f"{consts.MEDIA_LANE_COUNT}_1": 2,
f"{consts.HOST_LANE_COUNT}_1": 1,
f"{consts.HOST_LANE_ASSIGNMENT_OPTION}_1": 3,
f"{consts.MEDIA_LANE_ASSIGNMENT_OPTION}_1": 4,
}
# Make read() return raw for APPLS_ADVT_FIELD and valid media type string for MEDIA_TYPE_FIELD
def read_side_effect(field_name):
if field_name == consts.APPLS_ADVT_FIELD:
return raw
if field_name == consts.MEDIA_TYPE_FIELD:
# Return the module media type string that matches our prefix
return Sff8024.MODULE_MEDIA_TYPE[1]
return None
self.api.xcvr_eeprom.read.side_effect = read_side_effect
first = self.api.get_application_advertisement()
second = self.api.get_application_advertisement()
# The returned dict should be processed into the correct keys
expected = {
1: {
'host_electrical_interface_id': 'iface1',
'module_media_interface_id': 'mod_iface1',
'media_lane_count': 2,
'host_lane_count': 1,
'host_lane_assignment_options': 3,
'media_lane_assignment_options': 4
}
}
assert first == expected
assert second == expected
# Should have read APPLS_ADVT_FIELD and MEDIA_TYPE_FIELD once each
assert self.api.xcvr_eeprom.read.call_count == 2

def test_clear_cache_for_get_application_advertisement(self):
# Non-empty dict is cached, clear_cache should force re-read
media_type = Sff8024.MODULE_MEDIA_TYPE[1]
prefix = consts.MODULE_MEDIA_INTERFACE_850NM
raw = {
f"{consts.HOST_ELECTRICAL_INTERFACE}_1": 'iface1',
f"{prefix}_1": 'mod_iface1',
f"{consts.MEDIA_LANE_COUNT}_1": 2,
f"{consts.HOST_LANE_COUNT}_1": 1,
f"{consts.HOST_LANE_ASSIGNMENT_OPTION}_1": 3,
f"{consts.MEDIA_LANE_ASSIGNMENT_OPTION}_1": 4,
}
def read_side_effect(field_name):
if field_name == consts.APPLS_ADVT_FIELD:
return raw
if field_name == consts.MEDIA_TYPE_FIELD:
return media_type
return None
self.api.xcvr_eeprom.read.side_effect = read_side_effect
first = self.api.get_application_advertisement()
second = self.api.get_application_advertisement()
assert self.api.xcvr_eeprom.read.call_count == 2
# Clear the specific cache and read again
self.api.clear_cache('get_application_advertisement')
third = self.api.get_application_advertisement()
assert third == first
assert self.api.xcvr_eeprom.read.call_count == 4
Loading