Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 100 additions & 65 deletions sonic_platform_base/sonic_xcvr/api/public/cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from .cmisCDB import CmisCdbApi
from .cmisVDM import CmisVdmApi
import time
from collections import defaultdict

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
Expand Down Expand Up @@ -246,7 +247,10 @@ def get_transceiver_threshold_info(self):
if thresh is None:
return None
tx_bias_scale_raw = self.xcvr_eeprom.read(consts.TX_BIAS_SCALE)
tx_bias_scale = 2**tx_bias_scale_raw if tx_bias_scale_raw < 3 else 1
if tx_bias_scale_raw is not None:
tx_bias_scale = 2**tx_bias_scale_raw if tx_bias_scale_raw < 3 else 1
else:
tx_bias_scale = None
threshold_info_dict = {
"temphighalarm": float("{:.3f}".format(thresh[consts.TEMP_HIGH_ALARM_FIELD])),
"templowalarm": float("{:.3f}".format(thresh[consts.TEMP_LOW_ALARM_FIELD])),
Expand All @@ -264,16 +268,23 @@ def get_transceiver_threshold_info(self):
"txpowerlowalarm": float("{:.3f}".format(self.mw_to_dbm(thresh[consts.TX_POWER_LOW_ALARM_FIELD]))),
"txpowerhighwarning": float("{:.3f}".format(self.mw_to_dbm(thresh[consts.TX_POWER_HIGH_WARNING_FIELD]))),
"txpowerlowwarning": float("{:.3f}".format(self.mw_to_dbm(thresh[consts.TX_POWER_LOW_WARNING_FIELD]))),
"txbiashighalarm": float("{:.3f}".format(thresh[consts.TX_BIAS_HIGH_ALARM_FIELD]*tx_bias_scale)),
"txbiaslowalarm": float("{:.3f}".format(thresh[consts.TX_BIAS_LOW_ALARM_FIELD]*tx_bias_scale)),
"txbiashighwarning": float("{:.3f}".format(thresh[consts.TX_BIAS_HIGH_WARNING_FIELD]*tx_bias_scale)),
"txbiashighalarm": float("{:.3f}".format(thresh[consts.TX_BIAS_HIGH_ALARM_FIELD]*tx_bias_scale))
if tx_bias_scale is not None else 'N/A',
"txbiaslowalarm": float("{:.3f}".format(thresh[consts.TX_BIAS_LOW_ALARM_FIELD]*tx_bias_scale))
if tx_bias_scale is not None else 'N/A',
"txbiashighwarning": float("{:.3f}".format(thresh[consts.TX_BIAS_HIGH_WARNING_FIELD]*tx_bias_scale))
if tx_bias_scale is not None else 'N/A',
"txbiaslowwarning": float("{:.3f}".format(thresh[consts.TX_BIAS_LOW_WARNING_FIELD]*tx_bias_scale))
if tx_bias_scale is not None else 'N/A'
}
laser_temp_dict = self.get_laser_temperature()
threshold_info_dict['lasertemphighalarm'] = laser_temp_dict['high alarm']
threshold_info_dict['lasertemplowalarm'] = laser_temp_dict['low alarm']
threshold_info_dict['lasertemphighwarning'] = laser_temp_dict['high warn']
threshold_info_dict['lasertemplowwarning'] = laser_temp_dict['low warn']
try:
threshold_info_dict['lasertemphighalarm'] = laser_temp_dict['high alarm']
threshold_info_dict['lasertemplowalarm'] = laser_temp_dict['low alarm']
threshold_info_dict['lasertemphighwarning'] = laser_temp_dict['high warn']
threshold_info_dict['lasertemplowwarning'] = laser_temp_dict['low warn']
except (KeyError, TypeError):
pass
self.vdm_dict = self.get_vdm()
try:
threshold_info_dict['prefecberhighalarm'] = self.vdm_dict['Pre-FEC BER Average Media Input'][1][1]
Expand Down Expand Up @@ -741,8 +752,10 @@ def get_active_apsel_hostlane(self):
This function returns the application select code that each host lane has
'''
if (self.is_flat_memory()):
return {'{}{}'.format(consts.ACTIVE_APSEL_HOSTLANE, i) : 'N/A' for i in range(1, self.NUM_CHANNELS+1)}
return self.xcvr_eeprom.read(consts.ACTIVE_APSEL_CODE)
return defaultdict(lambda: 'N/A')

active_apsel_code = self.xcvr_eeprom.read(consts.ACTIVE_APSEL_CODE)
return defaultdict(lambda: 'N/A') if not active_apsel_code else active_apsel_code

def get_tx_config_power(self):
'''
Expand Down Expand Up @@ -822,20 +835,19 @@ def get_laser_temperature(self):
try:
aux1_mon_type, aux2_mon_type, aux3_mon_type = self.get_aux_mon_type()
except TypeError:
return None
LASER_TEMP_SCALE = 256.0
return laser_temp_dict
if aux2_mon_type == 0:
laser_temp = self.xcvr_eeprom.read(consts.AUX2_MON)/LASER_TEMP_SCALE
laser_temp_high_alarm = self.xcvr_eeprom.read(consts.AUX2_HIGH_ALARM)/LASER_TEMP_SCALE
laser_temp_low_alarm = self.xcvr_eeprom.read(consts.AUX2_LOW_ALARM)/LASER_TEMP_SCALE
laser_temp_high_warn = self.xcvr_eeprom.read(consts.AUX2_HIGH_WARN)/LASER_TEMP_SCALE
laser_temp_low_warn = self.xcvr_eeprom.read(consts.AUX2_LOW_WARN)/LASER_TEMP_SCALE
laser_temp = self._get_laser_temp_threshold(consts.AUX2_MON)
laser_temp_high_alarm = self._get_laser_temp_threshold(consts.AUX2_HIGH_ALARM)
laser_temp_low_alarm = self._get_laser_temp_threshold(consts.AUX2_LOW_ALARM)
laser_temp_high_warn = self._get_laser_temp_threshold(consts.AUX2_HIGH_WARN)
laser_temp_low_warn = self._get_laser_temp_threshold(consts.AUX2_LOW_WARN)
elif aux2_mon_type == 1 and aux3_mon_type == 0:
laser_temp = self.xcvr_eeprom.read(consts.AUX3_MON)/LASER_TEMP_SCALE
laser_temp_high_alarm = self.xcvr_eeprom.read(consts.AUX3_HIGH_ALARM)/LASER_TEMP_SCALE
laser_temp_low_alarm = self.xcvr_eeprom.read(consts.AUX3_LOW_ALARM)/LASER_TEMP_SCALE
laser_temp_high_warn = self.xcvr_eeprom.read(consts.AUX3_HIGH_WARN)/LASER_TEMP_SCALE
laser_temp_low_warn = self.xcvr_eeprom.read(consts.AUX3_LOW_WARN)/LASER_TEMP_SCALE
laser_temp = self._get_laser_temp_threshold(consts.AUX3_MON)
laser_temp_high_alarm = self._get_laser_temp_threshold(consts.AUX3_HIGH_ALARM)
laser_temp_low_alarm = self._get_laser_temp_threshold(consts.AUX3_LOW_ALARM)
laser_temp_high_warn = self._get_laser_temp_threshold(consts.AUX3_HIGH_WARN)
laser_temp_low_warn = self._get_laser_temp_threshold(consts.AUX3_LOW_WARN)
else:
return laser_temp_dict
laser_temp_dict = {'monitor value': laser_temp,
Expand All @@ -844,6 +856,11 @@ def get_laser_temperature(self):
'high warn': laser_temp_high_warn,
'low warn': laser_temp_low_warn}
return laser_temp_dict

def _get_laser_temp_threshold(self, field):
LASER_TEMP_SCALE = 256.0
value = self.xcvr_eeprom.read(field)
return value/LASER_TEMP_SCALE if value is not None else 'N/A'

def get_laser_TEC_current(self):
'''
Expand Down Expand Up @@ -1651,14 +1668,17 @@ def get_transceiver_status(self):
except TypeError:
pass
module_flag = self.get_module_level_flag()
trans_status['temphighalarm_flag'] = module_flag['case_temp_flags']['case_temp_high_alarm_flag']
trans_status['templowalarm_flag'] = module_flag['case_temp_flags']['case_temp_low_alarm_flag']
trans_status['temphighwarning_flag'] = module_flag['case_temp_flags']['case_temp_high_warn_flag']
trans_status['templowwarning_flag'] = module_flag['case_temp_flags']['case_temp_low_warn_flag']
trans_status['vcchighalarm_flag'] = module_flag['voltage_flags']['voltage_high_alarm_flag']
trans_status['vcclowalarm_flag'] = module_flag['voltage_flags']['voltage_low_alarm_flag']
trans_status['vcchighwarning_flag'] = module_flag['voltage_flags']['voltage_high_warn_flag']
trans_status['vcclowwarning_flag'] = module_flag['voltage_flags']['voltage_low_warn_flag']
try:
trans_status['temphighalarm_flag'] = module_flag['case_temp_flags']['case_temp_high_alarm_flag']
trans_status['templowalarm_flag'] = module_flag['case_temp_flags']['case_temp_low_alarm_flag']
trans_status['temphighwarning_flag'] = module_flag['case_temp_flags']['case_temp_high_warn_flag']
trans_status['templowwarning_flag'] = module_flag['case_temp_flags']['case_temp_low_warn_flag']
trans_status['vcchighalarm_flag'] = module_flag['voltage_flags']['voltage_high_alarm_flag']
trans_status['vcclowalarm_flag'] = module_flag['voltage_flags']['voltage_low_alarm_flag']
trans_status['vcchighwarning_flag'] = module_flag['voltage_flags']['voltage_high_warn_flag']
trans_status['vcclowwarning_flag'] = module_flag['voltage_flags']['voltage_low_warn_flag']
except TypeError:
pass
try:
aux1_mon_type, aux2_mon_type, aux3_mon_type = self.get_aux_mon_type()
if aux2_mon_type == 0:
Expand All @@ -1675,53 +1695,66 @@ def get_transceiver_status(self):
pass
if not self.is_flat_memory():
dp_state_dict = self.get_datapath_state()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['DP%dState' % lane] = dp_state_dict['DP%dState' % lane]
if dp_state_dict:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['DP%dState' % lane] = dp_state_dict.get('DP%dState' % lane)
tx_output_status_dict = self.get_tx_output_status()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txoutput_status%d' % lane] = tx_output_status_dict['TxOutputStatus%d' % lane]
if tx_output_status_dict:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txoutput_status%d' % lane] = tx_output_status_dict.get('TxOutputStatus%d' % lane)
rx_output_status_dict = self.get_rx_output_status()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['rxoutput_status_hostlane%d' % lane] = rx_output_status_dict['RxOutputStatus%d' % lane]
if rx_output_status_dict:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['rxoutput_status_hostlane%d' % lane] = rx_output_status_dict.get('RxOutputStatus%d' % lane)
tx_fault = self.get_tx_fault()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txfault%d' % lane] = tx_fault[lane - 1]
if tx_fault:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txfault%d' % lane] = tx_fault[lane - 1]
tx_los = self.get_tx_los()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txlos_hostlane%d' % lane] = tx_los[lane - 1]
if tx_los:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txlos_hostlane%d' % lane] = tx_los[lane - 1]
tx_lol = self.get_tx_cdr_lol()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txcdrlol_hostlane%d' % lane] = tx_lol[lane - 1]
if tx_lol:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txcdrlol_hostlane%d' % lane] = tx_lol[lane - 1]
rx_los = self.get_rx_los()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['rxlos%d' % lane] = rx_los[lane - 1]
if rx_los:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['rxlos%d' % lane] = rx_los[lane - 1]
rx_lol = self.get_rx_cdr_lol()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['rxcdrlol%d' % lane] = rx_lol[lane - 1]
if rx_lol:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['rxcdrlol%d' % lane] = rx_lol[lane - 1]
config_status_dict = self.get_config_datapath_hostlane_status()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['config_state_hostlane%d' % lane] = config_status_dict['ConfigStatusLane%d' % lane]
if config_status_dict:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['config_state_hostlane%d' % lane] = config_status_dict.get('ConfigStatusLane%d' % lane)
dpinit_pending_dict = self.get_dpinit_pending()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['dpinit_pending_hostlane%d' % lane] = dpinit_pending_dict['DPInitPending%d' % lane]
if dpinit_pending_dict:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['dpinit_pending_hostlane%d' % lane] = dpinit_pending_dict.get('DPInitPending%d' % lane)
tx_power_flag_dict = self.get_tx_power_flag()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txpowerhighalarm_flag%d' % lane] = tx_power_flag_dict['tx_power_high_alarm']['TxPowerHighAlarmFlag%d' % lane]
trans_status['txpowerlowalarm_flag%d' % lane] = tx_power_flag_dict['tx_power_low_alarm']['TxPowerLowAlarmFlag%d' % lane]
trans_status['txpowerhighwarning_flag%d' % lane] = tx_power_flag_dict['tx_power_high_warn']['TxPowerHighWarnFlag%d' % lane]
trans_status['txpowerlowwarning_flag%d' % lane] = tx_power_flag_dict['tx_power_low_warn']['TxPowerLowWarnFlag%d' % lane]
if tx_power_flag_dict:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txpowerhighalarm_flag%d' % lane] = tx_power_flag_dict['tx_power_high_alarm']['TxPowerHighAlarmFlag%d' % lane]
trans_status['txpowerlowalarm_flag%d' % lane] = tx_power_flag_dict['tx_power_low_alarm']['TxPowerLowAlarmFlag%d' % lane]
trans_status['txpowerhighwarning_flag%d' % lane] = tx_power_flag_dict['tx_power_high_warn']['TxPowerHighWarnFlag%d' % lane]
trans_status['txpowerlowwarning_flag%d' % lane] = tx_power_flag_dict['tx_power_low_warn']['TxPowerLowWarnFlag%d' % lane]
rx_power_flag_dict = self.get_rx_power_flag()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['rxpowerhighalarm_flag%d' % lane] = rx_power_flag_dict['rx_power_high_alarm']['RxPowerHighAlarmFlag%d' % lane]
trans_status['rxpowerlowalarm_flag%d' % lane] = rx_power_flag_dict['rx_power_low_alarm']['RxPowerLowAlarmFlag%d' % lane]
trans_status['rxpowerhighwarning_flag%d' % lane] = rx_power_flag_dict['rx_power_high_warn']['RxPowerHighWarnFlag%d' % lane]
trans_status['rxpowerlowwarning_flag%d' % lane] = rx_power_flag_dict['rx_power_low_warn']['RxPowerLowWarnFlag%d' % lane]
if rx_power_flag_dict:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['rxpowerhighalarm_flag%d' % lane] = rx_power_flag_dict['rx_power_high_alarm']['RxPowerHighAlarmFlag%d' % lane]
trans_status['rxpowerlowalarm_flag%d' % lane] = rx_power_flag_dict['rx_power_low_alarm']['RxPowerLowAlarmFlag%d' % lane]
trans_status['rxpowerhighwarning_flag%d' % lane] = rx_power_flag_dict['rx_power_high_warn']['RxPowerHighWarnFlag%d' % lane]
trans_status['rxpowerlowwarning_flag%d' % lane] = rx_power_flag_dict['rx_power_low_warn']['RxPowerLowWarnFlag%d' % lane]
tx_bias_flag_dict = self.get_tx_bias_flag()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txbiashighalarm_flag%d' % lane] = tx_bias_flag_dict['tx_bias_high_alarm']['TxBiasHighAlarmFlag%d' % lane]
trans_status['txbiaslowalarm_flag%d' % lane] = tx_bias_flag_dict['tx_bias_low_alarm']['TxBiasLowAlarmFlag%d' % lane]
trans_status['txbiashighwarning_flag%d' % lane] = tx_bias_flag_dict['tx_bias_high_warn']['TxBiasHighWarnFlag%d' % lane]
trans_status['txbiaslowwarning_flag%d' % lane] = tx_bias_flag_dict['tx_bias_low_warn']['TxBiasLowWarnFlag%d' % lane]
if tx_bias_flag_dict:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txbiashighalarm_flag%d' % lane] = tx_bias_flag_dict['tx_bias_high_alarm']['TxBiasHighAlarmFlag%d' % lane]
trans_status['txbiaslowalarm_flag%d' % lane] = tx_bias_flag_dict['tx_bias_low_alarm']['TxBiasLowAlarmFlag%d' % lane]
trans_status['txbiashighwarning_flag%d' % lane] = tx_bias_flag_dict['tx_bias_high_warn']['TxBiasHighWarnFlag%d' % lane]
trans_status['txbiaslowwarning_flag%d' % lane] = tx_bias_flag_dict['tx_bias_low_warn']['TxBiasLowWarnFlag%d' % lane]
self.vdm_dict = self.get_vdm()
try:
trans_status['prefecberhighalarm_flag'] = self.vdm_dict['Pre-FEC BER Average Media Input'][1][5]
Expand Down Expand Up @@ -1877,6 +1910,8 @@ def get_application_advertisement(self):
ret = {}
# Read the application advertisment in lower memory
dic = self.xcvr_eeprom.read(consts.APPLS_ADVT_FIELD)
if not dic:
return ret

if not self.is_flat_memory():
# Read the application advertisement in page01
Expand Down
22 changes: 22 additions & 0 deletions tests/sonic_xcvr/test_cmis.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from unittest.mock import patch
from mock import MagicMock
import pytest
import traceback
import random
from sonic_platform_base.sonic_xcvr.api.public.cmis import CmisApi
from sonic_platform_base.sonic_xcvr.mem_maps.public.cmis import CmisMemMap
from sonic_platform_base.sonic_xcvr.xcvr_eeprom import XcvrEeprom
Expand All @@ -14,6 +16,7 @@ class TestCmis(object):
reader = MagicMock(return_value=None)
writer = MagicMock()
eeprom = XcvrEeprom(reader, writer, mem_map)
old_read_func = eeprom.read
api = CmisApi(eeprom)

@pytest.mark.parametrize("mock_response, expected", [
Expand Down Expand Up @@ -2146,3 +2149,22 @@ def test_get_error_description(self):

result = self.api.get_error_description()
assert result is None

def test_random_read_fail(self):
def mock_read_raw(offset, size):
i = random.randint(0, 1)
return None if i == 0 else b'0' * size

self.api.xcvr_eeprom.read = self.old_read_func
self.api.xcvr_eeprom.reader = mock_read_raw

run_num = 5
while run_num > 0:
try:
self.api.get_transceiver_bulk_status()
self.api.get_transceiver_info()
self.api.get_transceiver_threshold_info()
self.api.get_transceiver_status()
except:
assert 0, traceback.format_exc()
run_num -= 1
21 changes: 20 additions & 1 deletion tests/sonic_xcvr/test_sff8436.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from mock import MagicMock, patch
import pytest

import traceback
import random

from sonic_platform_base.sonic_xcvr.api.public.sff8436 import Sff8436Api
from sonic_platform_base.sonic_xcvr.codes.public.sff8436 import Sff8436Codes
from sonic_platform_base.sonic_xcvr.mem_maps.public.sff8436 import Sff8436MemMap
from sonic_platform_base.sonic_xcvr.xcvr_eeprom import XcvrEeprom
from sonic_platform_base.sonic_xcvr.fields import consts


class TestSff8436(object):
codes = Sff8436Codes
mem_map = Sff8436MemMap(codes)
Expand Down Expand Up @@ -106,6 +108,23 @@ def test_simulate_copper(self):
assert not self.api.get_temperature_support()
assert not self.api.get_voltage_support()

def test_random_read_fail(self):
def mock_read_raw(offset, size):
i = random.randint(0, 1)
return None if i == 0 else b'0' * size

self.api.xcvr_eeprom.reader = mock_read_raw

run_num = 5
while run_num > 0:
try:
self.api.get_transceiver_bulk_status()
self.api.get_transceiver_info()
self.api.get_transceiver_threshold_info()
except:
assert 0, traceback.format_exc()
run_num -= 1

def test_get_lpmode(self):
self.api.get_lpmode_support = MagicMock()
self.api.get_lpmode_support.return_value = True
Expand Down
Loading