From 87ef882434b5367d42b6eb67e4b14b10cbb32aaf Mon Sep 17 00:00:00 2001 From: Dante Su Date: Fri, 19 Nov 2021 01:43:29 +0000 Subject: [PATCH 01/14] Squashed commit of the following: commit a1d8e103f818774f0dbf0045fad701c16cef5ca8 Author: Dante Su Date: Thu Nov 18 12:45:01 2021 +0000 okay Signed-off-by: Dante Su commit 11b6850c3bd669df9ad614dec2bcf27d5150fb2d Author: Dante Su Date: Fri Nov 12 07:39:34 2021 +0000 Add custom media setting for CMIS Signed-off-by: Dante Su commit 6dd3608514cd6ca67693b7b500486495e72ff0e2 Author: Dante Su Date: Tue Nov 9 11:34:10 2021 +0000 add missing unittest for CMIS Signed-off-by: Dante Su commit ceea2235a0647a395aa373fa238427e38f7f5fab Author: Dante Su Date: Tue Nov 9 03:34:59 2021 +0000 fix port remove event Signed-off-by: Dante Su commit 9be51358bc05babffac3f1324eef22b534e68e40 Author: Dante Su Date: Tue Nov 9 01:46:27 2021 +0000 port_mapping: drop the hacks for CMIS Signed-off-by: Dante Su commit 501134b5f5ca2042eb372e9ee933f461968a5f7d Author: Dante Su Date: Fri Nov 5 09:08:59 2021 +0000 [sfp-refactoring] xcvrd: add initial support for CMIS application initialization Signed-off-by: Dante Su --- sonic-xcvrd/tests/test_xcvrd.py | 82 ++++++ sonic-xcvrd/xcvrd/xcvrd.py | 243 +++++++++++++++++- .../xcvrd/xcvrd_utilities/port_mapping.py | 96 ++++--- 3 files changed, 386 insertions(+), 35 deletions(-) diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index e10b930e4..42758a213 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -530,6 +530,88 @@ def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, assert mock_deinit.call_count == 1 assert mock_init.call_count == 1 + @patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock()) + def test_CmisManagerTask_handle_port_change_event(self): + port_mapping = PortMapping() + task = CmisManagerTask(port_mapping) + + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) + task.on_port_config_change(port_change_event) + assert task.task_queue.empty() + + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_REMOVE) + task.on_port_config_change(port_change_event) + assert task.task_queue.empty() + + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_DEL) + task.on_port_config_change(port_change_event) + assert task.task_queue.empty() + + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_SET) + task.on_port_config_change(port_change_event) + assert not task.task_queue.empty() + + @patch('xcvrd.xcvrd.platform_chassis') + @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None))) + @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock()) + def test_CmisManagerTask_task_run_stop(self, mock_chassis): + mock_object = MagicMock() + mock_object.get_presence = MagicMock(return_value=True) + mock_object.get_port_type = MagicMock(return_value="QSFP_DD") + mock_chassis.get_all_sfps = MagicMock(return_value=[mock_object, mock_object]) + + port_mapping = PortMapping() + task = CmisManagerTask(port_mapping) + task.task_run([False]) + task.task_stop() + for worker in task.task_workers: + assert not worker.is_alive() + + @patch('xcvrd.xcvrd.platform_chassis') + @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None))) + @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock()) + def test_CmisManagerTask_task_worker(self, mock_chassis): + mock_object = MagicMock() + mock_object.get_presence = MagicMock(return_value=True) + mock_object.get_port_type = MagicMock(return_value="QSFP_DD") + mock_object.set_cmis_application = MagicMock() + mock_chassis.get_all_sfps = MagicMock(return_value=[mock_object]) + mock_chassis.get_sfp = MagicMock(return_value=mock_object) + + port_mapping = PortMapping() + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_SET) + + # Case 1: Both port speed and lanes are unset + task = CmisManagerTask(port_mapping) + task.task_stopping_event.is_set = MagicMock(side_effect=[False, True]) + task.on_port_config_change(port_change_event) + task.task_worker(False) + assert mock_object.set_cmis_application.call_count == 0 + + # Case 2: Invalid port speed while lanes is valid + port_change_event.port_dict = {'speed': 0, 'lanes': "1,2,3,4,5,6,7,8"} + task = CmisManagerTask(port_mapping) + task.task_stopping_event.is_set = MagicMock(side_effect=[False, True]) + task.on_port_config_change(port_change_event) + task.task_worker(False) + assert mock_object.set_cmis_application.call_count == 0 + + # Case 3: Valid port speed with invalid lanes + port_change_event.port_dict = {'speed': 400000, 'lanes': None} + task = CmisManagerTask(port_mapping) + task.task_stopping_event.is_set = MagicMock(side_effect=[False, True]) + task.on_port_config_change(port_change_event) + task.task_worker(False) + assert mock_object.set_cmis_application.call_count == 0 + + # Case 4: valid port speed and lanes + port_change_event.port_dict = {'speed': 400000, 'lanes': "1,2,3,4,5,6,7,8"} + task = CmisManagerTask(port_mapping) + task.task_stopping_event.is_set = MagicMock(side_effect=[False, True]) + task.on_port_config_change(port_change_event) + task.task_worker(False) + assert mock_object.set_cmis_application.call_count == 1 + @patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock()) def test_DomInfoUpdateTask_handle_port_change_event(self): port_mapping = PortMapping() diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 0992dca18..30f4cfc2d 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -196,7 +196,15 @@ def _wrapper_get_transceiver_change_event(timeout): def _wrapper_get_sfp_type(physical_port): if platform_chassis: try: - return platform_chassis.get_sfp(physical_port).sfp_type + sfp = platform_chassis.get_sfp(physical_port) + except (NotImplementedError, AttributeError): + return None + try: + return sfp.sfp_type + except (NotImplementedError, AttributeError): + pass + try: + return sfp.get_port_type() except (NotImplementedError, AttributeError): pass return None @@ -227,7 +235,7 @@ def beautify_dom_info_dict(dom_info_dict, physical_port): dom_info_dict['tx2power'] = strip_unit_and_beautify(dom_info_dict['tx2power'], POWER_UNIT) dom_info_dict['tx3power'] = strip_unit_and_beautify(dom_info_dict['tx3power'], POWER_UNIT) dom_info_dict['tx4power'] = strip_unit_and_beautify(dom_info_dict['tx4power'], POWER_UNIT) - if _wrapper_get_sfp_type(physical_port) == 'QSFP_DD': + if 'rx5power' in dom_info_dict: dom_info_dict['rx5power'] = strip_unit_and_beautify(dom_info_dict['rx5power'], POWER_UNIT) dom_info_dict['rx6power'] = strip_unit_and_beautify(dom_info_dict['rx6power'], POWER_UNIT) dom_info_dict['rx7power'] = strip_unit_and_beautify(dom_info_dict['rx7power'], POWER_UNIT) @@ -433,7 +441,7 @@ def post_port_dom_info_to_db(logical_port_name, port_mapping, table, stop_event= dom_info_cache[physical_port] = dom_info_dict if dom_info_dict is not None: beautify_dom_info_dict(dom_info_dict, physical_port) - if _wrapper_get_sfp_type(physical_port) == 'QSFP_DD': + if 'rx5power' in dom_info_dict: fvs = swsscommon.FieldValuePairs( [('temperature', dom_info_dict['temperature']), ('voltage', dom_info_dict['voltage']), @@ -679,7 +687,6 @@ def get_media_settings_key(physical_port, transceiver_dict): return [vendor_key, media_key] - def get_media_val_str_from_dict(media_dict): LANE_STR = 'lane' LANE_SEPARATOR = ',' @@ -843,6 +850,230 @@ def is_fast_reboot_enabled(): # Helper classes =============================================================== # +# Thread wrapper class for CMIS transceiver management + +class CmisManagerTask: + + CMIS_STATE_INSERTED = 'INSERTED' + CMIS_STATE_DP_DEINIT = 'DP_DEINIT' + CMIS_STATE_AP_CONF = 'AP_CONFIGURED' + CMIS_STATE_DP_INIT = 'DP_INIT' + CMIS_STATE_DP_TXON = 'DP_TXON' + CMIS_STATE_READY = 'READY' + CMIS_STATE_REMOVED = 'REMOVED' + CMIS_STATE_FAILED = 'FAILED' + + def __init__(self, port_mapping): + self.task_stopping_event = multiprocessing.Event() + self.task_process = None + self.port_dict = {} + self.port_mapping = copy.deepcopy(port_mapping) + self.isPortInitDone = False + self.isPortConfigDone = False + + def dbg_print(self, message): + helper_logger.log_notice("CMIS: {}".format(message)) + + def on_port_config_change(self, port_change_event): + if port_change_event.event_type not in [port_change_event.PORT_SET, port_change_event.PORT_DEL]: + return + + lport = port_change_event.port_name + pport = port_change_event.port_index + + if lport in ['PortInitDone']: + self.isPortInitDone = True + return + + if lport in ['PortConfigDone']: + self.isPortConfigDone = True + return + + if not lport.startswith('Ethernet'): + return + + if port_change_event.port_dict is None: + speed = "0" + lanes = "" + else: + speed = port_change_event.port_dict.get('speed', "0") + lanes = port_change_event.port_dict.get('lanes', "") + + if pport is None: + return + + # Skip if the port/cage type is not QSFP-DD + ptype = _wrapper_get_sfp_type(pport) + if ptype not in ['QSFP-DD', 'QSFP_DD']: + return + + if lport not in self.port_dict: + self.port_dict[lport] = {} + + if port_change_event.event_type == port_change_event.PORT_SET: + if pport >= 0: + self.port_dict[lport]['index'] = pport + if port_change_event.port_dict is not None and 'speed' in port_change_event.port_dict: + self.port_dict[lport]['speed'] = port_change_event.port_dict['speed'] + if port_change_event.port_dict is not None and 'lanes' in port_change_event.port_dict: + self.port_dict[lport]['lanes'] = port_change_event.port_dict['lanes'] + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_INSERTED + else: + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_REMOVED + + def task_worker(self): + self.dbg_print("Starting...") + + # APPL_DB for CONFIG updates, and STATE_DB for insertion/removal + sel, asic_context = port_mapping.subscribe_port_config_change(['APPL_DB', 'STATE_DB']) + while not self.task_stopping_event.is_set(): + # Handle port change event from main thread + port_mapping.handle_port_config_change(sel, + asic_context, + self.task_stopping_event, + self.port_mapping, + helper_logger, + self.on_port_config_change, + True) + + if not self.isPortConfigDone: + continue + + for lport, info in self.port_dict.items(): + if self.task_stopping_event.is_set(): + break + + if lport not in self.port_dict: + continue + + state = self.port_dict[lport].get('cmis_state', self.CMIS_STATE_REMOVED) + if state in [self.CMIS_STATE_FAILED, self.CMIS_STATE_READY, self.CMIS_STATE_REMOVED]: + continue + + pport = int(info.get('index', "-1")) + speed = int(info.get('speed', "0")) + lanes = info.get('lanes', "").strip() + if pport < 0 or speed == 0 or len(lanes) < 1: + continue + + # Replace the physical lane id with logical lane index + # + # TODO: Add dynamic port breakout support + lanes_new = [] + lanes_old = lanes.split(',') + for i in range(len(lanes_old)): + lanes_new.append(i) + host_lanes = lanes_new + host_speed = speed + + sfp = platform_chassis.get_sfp(pport) + if not sfp.get_presence(): + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_REMOVED + continue + + try: + # Skip if the xcvr type is not QSFP-DD + info = sfp.get_transceiver_info() + if info.get('type_abbrv_name', None) not in ['QSFP-DD', 'QSFP_DD']: + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_FAILED + continue + + # Skip if the memory type is flat + if info.get('memory_type', 'flat') in ['flat', 'FLAT']: + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY + continue + + self.dbg_print("{}: {}G, {}-lanes, state={}".format( + lport, int(speed/1000), len(host_lanes), state)) + + if state == self.CMIS_STATE_INSERTED: + (flag, appl) = sfp.get_cmis_application_update(host_lanes, host_speed) + if not flag: + # No application updates + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY + continue + self.port_dict[lport]['cmis_apsel'] = appl + sfp.set_cmis_application_stop(host_lanes) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_DEINIT + elif state == self.CMIS_STATE_DP_DEINIT: + if sfp.get_module_state() != 'ModuleReady': + continue + appl = self.port_dict[lport]['cmis_apsel'] + sfp.set_cmis_application_apsel(host_lanes, appl) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_AP_CONF + elif state == self.CMIS_STATE_AP_CONF: + st = sfp.get_cmis_state()['config_state'] + done = True + for lane in host_lanes: + name = "ConfigStatusLane{}".format(lane + 1) + if st[name] != 'ConfigSuccess': + done = False + continue + if not done: + continue + sfp.set_cmis_application_start(host_lanes) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_INIT + elif state == self.CMIS_STATE_DP_INIT: + st = sfp.get_cmis_state()['datapath_state'] + done = True + for lane in host_lanes: + name = "DP{}State".format(lane + 1) + if st[name] != 'DataPathInitialized': + done = False + continue + if not done: + continue + sfp.set_cmis_application_txon(host_lanes) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_TXON + elif state == self.CMIS_STATE_DP_TXON: + st = sfp.get_cmis_state()['datapath_state'] + done = True + for lane in host_lanes: + name = "DP{}State".format(lane + 1) + if st[name] != 'DataPathActivated': + done = False + continue + if not done: + continue + state = self.CMIS_STATE_READY + self.dbg_print("{}: {}G, {}-lanes, state={}".format( + lport, int(speed/1000), len(host_lanes), state)) + self.port_dict[lport]['cmis_state'] = state + + except (NotImplementedError, AttributeError): + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_FAILED + + self.dbg_print("Stopped") + + def task_run(self, y_cable_presence): + if platform_chassis is None: + self.dbg_print("Platform chassis is not available, stopping...") + return + + has_cmis = False + for sfp in platform_chassis.get_all_sfps(): + try: + ptype = sfp.get_port_type() + except (NotImplementedError, ValueError): + ptype = 'Unknown' + if ptype in ['QSFP_DD', 'QSFP-DD']: + has_cmis = True + break + + if not has_cmis: + self.dbg_print("None of QSFP-DD cages are detected, stopping...") + return + + self.task_process = multiprocessing.Process(target=self.task_worker) + if self.task_process is not None: + self.task_process.start() + + def task_stop(self): + self.task_stopping_event.set() + if self.task_process is not None: + self.task_process.join() + + # Thread wrapper class to update dom info periodically @@ -1538,6 +1769,10 @@ def run(self): # Start daemon initialization sequence port_mapping_data, retry_eeprom_set = self.init() + # Start the CMIS manager + cmis_manager = CmisManagerTask(port_mapping_data) + cmis_manager.task_run(self.y_cable_presence) + # Start the dom sensor info update thread dom_info_update = DomInfoUpdateTask(port_mapping_data) dom_info_update.task_run(self.y_cable_presence) diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py index bf44fb70b..f749532b0 100644 --- a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py +++ b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py @@ -8,7 +8,10 @@ class PortChangeEvent: PORT_ADD = 0 PORT_REMOVE = 1 - def __init__(self, port_name, port_index, asic_id, event_type): + PORT_SET = 2 + PORT_DEL = 3 + + def __init__(self, port_name, port_index, asic_id, event_type, port_dict=None): # Logical port name, e.g. Ethernet0 self.port_name = port_name # Physical port index, equals to "index" field of PORT table in CONFIG_DB @@ -17,6 +20,8 @@ def __init__(self, port_name, port_index, asic_id, event_type): self.asic_id = asic_id # Port change event type self.event_type = event_type + # Port config dict + self.port_dict = port_dict def __str__(self): return '{} - name={} index={} asic_id={}'.format('Add' if self.event_type == self.PORT_ADD else 'Remove', @@ -83,21 +88,28 @@ def logical_port_name_to_physical_port_list(self, port_name): else: return None +def subscribe_port_config_change(db_list=['CONFIG_DB']): + port_tbl_map = { + 'APPL_DB': swsscommon.APP_PORT_TABLE_NAME, + 'CONFIG_DB': swsscommon.CFG_PORT_TABLE_NAME, + 'STATE_DB': 'TRANSCEIVER_INFO' + } -def subscribe_port_config_change(): sel = swsscommon.Select() asic_context = {} namespaces = multi_asic.get_front_end_namespaces() - for namespace in namespaces: - config_db = daemon_base.db_connect("CONFIG_DB", namespace=namespace) - asic_id = multi_asic.get_asic_index_from_namespace(namespace) - port_tbl = swsscommon.SubscriberStateTable(config_db, swsscommon.CFG_PORT_TABLE_NAME) - asic_context[port_tbl] = asic_id - sel.addSelectable(port_tbl) + for db_name in db_list: + if db_name not in port_tbl_map: + continue + for namespace in namespaces: + db = daemon_base.db_connect(db_name, namespace=namespace) + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + port_tbl = swsscommon.SubscriberStateTable(db, port_tbl_map[db_name]) + asic_context[port_tbl] = asic_id + sel.addSelectable(port_tbl) return sel, asic_context - -def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_change_event_handler): +def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_change_event_handler, promiscuous=False): """Select CONFIG_DB PORT table changes, once there is a port configuration add/remove, notify observers """ if not stop_event.is_set(): @@ -107,48 +119,70 @@ def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logge if state != swsscommon.Select.OBJECT: logger.log_warning('sel.select() did not return swsscommon.Select.OBJECT') return - - read_port_config_change(asic_context, port_mapping, logger, port_change_event_handler) - -def read_port_config_change(asic_context, port_mapping, logger, port_change_event_handler): + read_port_config_change(asic_context, port_mapping, logger, port_change_event_handler, promiscuous) + +def read_port_config_change(asic_context, port_mapping, logger, port_change_event_handler, promiscuous): for port_tbl in asic_context.keys(): while True: (key, op, fvp) = port_tbl.pop() if not key: break + fvp = dict(fvp) if fvp is not None else {} if op == swsscommon.SET_COMMAND: - fvp = dict(fvp) if 'index' not in fvp: - continue - + fvp['index'] = -1 new_physical_index = int(fvp['index']) - if not port_mapping.is_logical_port(key): + + if promiscuous: + port_change_event = PortChangeEvent(key, new_physical_index, asic_context[port_tbl], PortChangeEvent.PORT_SET, fvp) + port_change_event_handler(port_change_event) + + if key in ['PortConfigDone', 'PortInitDone']: + continue + elif not port_mapping.is_logical_port(key): # New logical port created - port_change_event = PortChangeEvent(key, new_physical_index, asic_context[port_tbl], PortChangeEvent.PORT_ADD) + port_change_event = PortChangeEvent(key, new_physical_index, asic_context[port_tbl], PortChangeEvent.PORT_ADD, fvp) port_change_event_handler(port_change_event) else: current_physical_index = port_mapping.get_logical_to_physical(key)[0] if current_physical_index != new_physical_index: - port_change_event = PortChangeEvent(key, - current_physical_index, - asic_context[port_tbl], - PortChangeEvent.PORT_REMOVE) + port_change_event = PortChangeEvent(key, + current_physical_index, + asic_context[port_tbl], + PortChangeEvent.PORT_REMOVE, + fvp) port_change_event_handler(port_change_event) - port_change_event = PortChangeEvent(key, new_physical_index, asic_context[port_tbl], PortChangeEvent.PORT_ADD) + port_change_event = PortChangeEvent(key, new_physical_index, asic_context[port_tbl], PortChangeEvent.PORT_ADD, fvp) port_change_event_handler(port_change_event) elif op == swsscommon.DEL_COMMAND: - if port_mapping.is_logical_port(key): - port_change_event = PortChangeEvent(key, - port_mapping.get_logical_to_physical(key)[0], - asic_context[port_tbl], - PortChangeEvent.PORT_REMOVE) + if promiscuous: + if 'index' in fvp: + physical_index = int(fvp['index']) + elif port_mapping.is_logical_port(key): + physical_index = port_mapping.get_logical_to_physical(key)[0] + else: + physical_index = -1 + port_change_event = PortChangeEvent(key, + physical_index, + asic_context[port_tbl], + PortChangeEvent.PORT_DEL, + fvp) + port_change_event_handler(port_change_event) + + if key in ['PortConfigDone', 'PortInitDone']: + continue + elif port_mapping.is_logical_port(key): + port_change_event = PortChangeEvent(key, + port_mapping.get_logical_to_physical(key)[0], + asic_context[port_tbl], + PortChangeEvent.PORT_REMOVE, + fvp) port_change_event_handler(port_change_event) else: logger.log_warning('Invalid DB operation: {}'.format(op)) - def get_port_mapping(): """Get port mapping from CONFIG_DB """ @@ -163,4 +197,4 @@ def get_port_mapping(): port_config_dict = dict(port_config) port_change_event = PortChangeEvent(key, port_config_dict['index'], asic_id, PortChangeEvent.PORT_ADD) port_mapping.handle_port_change_event(port_change_event) - return port_mapping \ No newline at end of file + return port_mapping From d778266114234a64cf692ed3a34328b3a86a809d Mon Sep 17 00:00:00 2001 From: Dante Su Date: Fri, 19 Nov 2021 03:16:46 +0000 Subject: [PATCH 02/14] unittest okay Signed-off-by: Dante Su --- sonic-xcvrd/tests/test_xcvrd.py | 114 +++++++++++++++++++++----------- sonic-xcvrd/xcvrd/xcvrd.py | 8 ++- 2 files changed, 80 insertions(+), 42 deletions(-) diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index 42758a213..e6c45aac1 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -530,26 +530,31 @@ def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, assert mock_deinit.call_count == 1 assert mock_init.call_count == 1 - @patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock()) + @patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD')) def test_CmisManagerTask_handle_port_change_event(self): port_mapping = PortMapping() task = CmisManagerTask(port_mapping) + assert not task.isPortConfigDone + port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET) + task.on_port_config_change(port_change_event) + assert task.isPortConfigDone + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) task.on_port_config_change(port_change_event) - assert task.task_queue.empty() + assert len(task.port_dict) == 0 port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_REMOVE) task.on_port_config_change(port_change_event) - assert task.task_queue.empty() + assert len(task.port_dict) == 0 port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_DEL) task.on_port_config_change(port_change_event) - assert task.task_queue.empty() + assert len(task.port_dict) == 1 port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_SET) task.on_port_config_change(port_change_event) - assert not task.task_queue.empty() + assert len(task.port_dict) == 1 @patch('xcvrd.xcvrd.platform_chassis') @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None))) @@ -564,53 +569,82 @@ def test_CmisManagerTask_task_run_stop(self, mock_chassis): task = CmisManagerTask(port_mapping) task.task_run([False]) task.task_stop() - for worker in task.task_workers: - assert not worker.is_alive() + assert task.task_process is not None + assert not task.task_process.is_alive() @patch('xcvrd.xcvrd.platform_chassis') @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None))) @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock()) + @patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD')) def test_CmisManagerTask_task_worker(self, mock_chassis): - mock_object = MagicMock() - mock_object.get_presence = MagicMock(return_value=True) - mock_object.get_port_type = MagicMock(return_value="QSFP_DD") - mock_object.set_cmis_application = MagicMock() - mock_chassis.get_all_sfps = MagicMock(return_value=[mock_object]) - mock_chassis.get_sfp = MagicMock(return_value=mock_object) - - port_mapping = PortMapping() - port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_SET) + mock_sfp = MagicMock() + mock_sfp.get_presence = MagicMock(return_value=True) + mock_sfp.get_port_type = MagicMock(return_value="QSFP_DD") + mock_sfp.get_transceiver_info = MagicMock(return_value={'type_abbrv_name':'QSFP_DD', 'memory_type':'paged'}) + mock_sfp.get_module_state = MagicMock(return_value="ModuleReady") + mock_sfp.get_cmis_state = MagicMock(return_value={ + 'config_state': { + 'ConfigStatusLane1': 'ConfigSuccess', + 'ConfigStatusLane2': 'ConfigSuccess', + 'ConfigStatusLane3': 'ConfigSuccess', + 'ConfigStatusLane4': 'ConfigSuccess', + 'ConfigStatusLane5': 'ConfigSuccess', + 'ConfigStatusLane6': 'ConfigSuccess', + 'ConfigStatusLane7': 'ConfigSuccess', + 'ConfigStatusLane8': 'ConfigSuccess' + }, + 'datapath_state': { + 'DP1State': 'DataPathInitialized', + 'DP2State': 'DataPathInitialized', + 'DP3State': 'DataPathInitialized', + 'DP4State': 'DataPathInitialized', + 'DP5State': 'DataPathInitialized', + 'DP6State': 'DataPathInitialized', + 'DP7State': 'DataPathInitialized', + 'DP8State': 'DataPathInitialized' + } + }) + mock_sfp.get_cmis_application_update = MagicMock(return_value=(True, 1)) + mock_sfp.set_cmis_application_stop = MagicMock(return_value=True) + mock_sfp.set_cmis_application_apsel = MagicMock(return_value=True) + mock_sfp.set_cmis_application_start = MagicMock(return_value=True) + mock_sfp.set_cmis_application_txon = MagicMock(return_value=True) - # Case 1: Both port speed and lanes are unset - task = CmisManagerTask(port_mapping) - task.task_stopping_event.is_set = MagicMock(side_effect=[False, True]) - task.on_port_config_change(port_change_event) - task.task_worker(False) - assert mock_object.set_cmis_application.call_count == 0 + mock_chassis.get_all_sfps = MagicMock(return_value=[mock_sfp]) + mock_chassis.get_sfp = MagicMock(return_value=mock_sfp) - # Case 2: Invalid port speed while lanes is valid - port_change_event.port_dict = {'speed': 0, 'lanes': "1,2,3,4,5,6,7,8"} + port_mapping = PortMapping() task = CmisManagerTask(port_mapping) - task.task_stopping_event.is_set = MagicMock(side_effect=[False, True]) - task.on_port_config_change(port_change_event) - task.task_worker(False) - assert mock_object.set_cmis_application.call_count == 0 - # Case 3: Valid port speed with invalid lanes - port_change_event.port_dict = {'speed': 400000, 'lanes': None} - task = CmisManagerTask(port_mapping) - task.task_stopping_event.is_set = MagicMock(side_effect=[False, True]) + port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET) task.on_port_config_change(port_change_event) - task.task_worker(False) - assert mock_object.set_cmis_application.call_count == 0 + assert task.isPortConfigDone - # Case 4: valid port speed and lanes - port_change_event.port_dict = {'speed': 400000, 'lanes': "1,2,3,4,5,6,7,8"} - task = CmisManagerTask(port_mapping) - task.task_stopping_event.is_set = MagicMock(side_effect=[False, True]) + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_SET, + {'speed':'400000', 'lanes':'1,2,3,4,5,6,7,8'}) task.on_port_config_change(port_change_event) - task.task_worker(False) - assert mock_object.set_cmis_application.call_count == 1 + assert len(task.port_dict) == 1 + + # Case 1: Module Inserted --> DP_DEINIT + task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) + task.task_worker() + assert mock_sfp.get_cmis_application_update.call_count > 0 + assert mock_sfp.set_cmis_application_stop.call_count > 0 + + # Case 2: DP_DEINIT --> AP Configured + task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) + task.task_worker() + assert mock_sfp.set_cmis_application_apsel.call_count > 0 + + # Case 3: AP Configured --> DP_INIT + task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) + task.task_worker() + assert mock_sfp.set_cmis_application_start.call_count > 0 + + # Case 4: DP_INIT --> DP_TXON + task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) + task.task_worker() + assert mock_sfp.set_cmis_application_txon.call_count > 0 @patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock()) def test_DomInfoUpdateTask_handle_port_change_event(self): diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 30f4cfc2d..ae803a1f5 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -854,6 +854,7 @@ def is_fast_reboot_enabled(): class CmisManagerTask: + CMIS_STATE_UNKNOWN = 'UNKNOWN' CMIS_STATE_INSERTED = 'INSERTED' CMIS_STATE_DP_DEINIT = 'DP_DEINIT' CMIS_STATE_AP_CONF = 'AP_CONFIGURED' @@ -946,8 +947,11 @@ def task_worker(self): if lport not in self.port_dict: continue - state = self.port_dict[lport].get('cmis_state', self.CMIS_STATE_REMOVED) - if state in [self.CMIS_STATE_FAILED, self.CMIS_STATE_READY, self.CMIS_STATE_REMOVED]: + state = self.port_dict[lport].get('cmis_state', self.CMIS_STATE_UNKNOWN) + if state in [self.CMIS_STATE_UNKNOWN, + self.CMIS_STATE_FAILED, + self.CMIS_STATE_READY, + self.CMIS_STATE_REMOVED]: continue pport = int(info.get('index', "-1")) From 95637b87039a79c1795879f2e199dd5d755a5286 Mon Sep 17 00:00:00 2001 From: Dante Su Date: Fri, 19 Nov 2021 04:02:25 +0000 Subject: [PATCH 03/14] Drop unused local variable Signed-off-by: Dante Su --- sonic-xcvrd/xcvrd/xcvrd.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index ae803a1f5..08f76bc45 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -893,13 +893,6 @@ def on_port_config_change(self, port_change_event): if not lport.startswith('Ethernet'): return - if port_change_event.port_dict is None: - speed = "0" - lanes = "" - else: - speed = port_change_event.port_dict.get('speed', "0") - lanes = port_change_event.port_dict.get('lanes', "") - if pport is None: return From a1eb712ce27fc8773534cba39da10151a7e4dd15 Mon Sep 17 00:00:00 2001 From: Dante Su Date: Fri, 19 Nov 2021 07:20:31 +0000 Subject: [PATCH 04/14] Add the READY message in case that ap update is skipped Signed-off-by: Dante Su --- sonic-xcvrd/xcvrd/xcvrd.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 08f76bc45..8104468a9 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -987,7 +987,10 @@ def task_worker(self): (flag, appl) = sfp.get_cmis_application_update(host_lanes, host_speed) if not flag: # No application updates - self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY + state = self.CMIS_STATE_READY + self.dbg_print("{}: {}G, {}-lanes, state={}".format( + lport, int(speed/1000), len(host_lanes), state)) + self.port_dict[lport]['cmis_state'] = state continue self.port_dict[lport]['cmis_apsel'] = appl sfp.set_cmis_application_stop(host_lanes) From af01207fa0302d736e665d5932aaf8f95057e287 Mon Sep 17 00:00:00 2001 From: Dante Su Date: Mon, 22 Nov 2021 14:16:53 +0000 Subject: [PATCH 05/14] Address the review comments Signed-off-by: Dante Su --- sonic-xcvrd/tests/test_xcvrd.py | 8 +- sonic-xcvrd/xcvrd/xcvrd.py | 33 +++++-- .../xcvrd/xcvrd_utilities/port_mapping.py | 99 ++++++++++++------- 3 files changed, 90 insertions(+), 50 deletions(-) diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index e6c45aac1..e612f843a 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -557,8 +557,8 @@ def test_CmisManagerTask_handle_port_change_event(self): assert len(task.port_dict) == 1 @patch('xcvrd.xcvrd.platform_chassis') - @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None))) - @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock()) + @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_update_event', MagicMock(return_value=(None, None))) + @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_update_event', MagicMock()) def test_CmisManagerTask_task_run_stop(self, mock_chassis): mock_object = MagicMock() mock_object.get_presence = MagicMock(return_value=True) @@ -573,8 +573,8 @@ def test_CmisManagerTask_task_run_stop(self, mock_chassis): assert not task.task_process.is_alive() @patch('xcvrd.xcvrd.platform_chassis') - @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None))) - @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock()) + @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_update_event', MagicMock(return_value=(None, None))) + @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_update_event', MagicMock()) @patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD')) def test_CmisManagerTask_task_worker(self, mock_chassis): mock_sfp = MagicMock() diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 8104468a9..3272d9a23 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -890,9 +890,11 @@ def on_port_config_change(self, port_change_event): self.isPortConfigDone = True return + # Skip if it's not a physical port if not lport.startswith('Ethernet'): return + # Skip if the physical index is not available if pport is None: return @@ -919,16 +921,14 @@ def task_worker(self): self.dbg_print("Starting...") # APPL_DB for CONFIG updates, and STATE_DB for insertion/removal - sel, asic_context = port_mapping.subscribe_port_config_change(['APPL_DB', 'STATE_DB']) + sel, asic_context = port_mapping.subscribe_port_update_event(['APPL_DB', 'STATE_DB']) while not self.task_stopping_event.is_set(): # Handle port change event from main thread - port_mapping.handle_port_config_change(sel, - asic_context, - self.task_stopping_event, - self.port_mapping, - helper_logger, - self.on_port_config_change, - True) + port_mapping.handle_port_update_event(sel, + asic_context, + self.task_stopping_event, + helper_logger, + self.on_port_config_change) if not self.isPortConfigDone: continue @@ -963,6 +963,7 @@ def task_worker(self): host_lanes = lanes_new host_speed = speed + # double-check the HW presence before moving forward sfp = platform_chassis.get_sfp(pport) if not sfp.get_presence(): self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_REMOVED @@ -976,7 +977,7 @@ def task_worker(self): continue # Skip if the memory type is flat - if info.get('memory_type', 'flat') in ['flat', 'FLAT']: + if info.get('memory_type', 'flat') in ['flat', 'FLAT', 'Flat']: self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY continue @@ -1046,6 +1047,20 @@ def task_worker(self): self.dbg_print("Stopped") def task_run(self, y_cable_presence): + # Stop the service if it's disabled in pmon_daemon_control.json + try: + (platform_path, _) = device_info.get_paths_to_platform_and_hwsku_dirs() + pmon_daemon_control_file_path = os.path.join(platform_path, "pmon_daemon_control.json") + if os.path.isfile(pmon_daemon_control_file_path): + ctrl = {} + with open(pmon_daemon_control_file_path, "r") as f: + ctrl = json.load(f) + if ctrl.get('skip_xcvrd_cmis_manager', False): + self.dbg_print("service stopped administratively") + return + except AttributeError: + pass + if platform_chassis is None: self.dbg_print("Platform chassis is not available, stopping...") return diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py index f749532b0..39e55656e 100644 --- a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py +++ b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py @@ -88,7 +88,19 @@ def logical_port_name_to_physical_port_list(self, port_name): else: return None -def subscribe_port_config_change(db_list=['CONFIG_DB']): +def subscribe_port_config_change(): + sel = swsscommon.Select() + asic_context = {} + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + config_db = daemon_base.db_connect("CONFIG_DB", namespace=namespace) + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + port_tbl = swsscommon.SubscriberStateTable(config_db, swsscommon.CFG_PORT_TABLE_NAME) + asic_context[port_tbl] = asic_id + sel.addSelectable(port_tbl) + return sel, asic_context + +def subscribe_port_update_event(db_list=['APPL_DB', 'STATE_DB']): port_tbl_map = { 'APPL_DB': swsscommon.APP_PORT_TABLE_NAME, 'CONFIG_DB': swsscommon.CFG_PORT_TABLE_NAME, @@ -109,7 +121,44 @@ def subscribe_port_config_change(db_list=['CONFIG_DB']): sel.addSelectable(port_tbl) return sel, asic_context -def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_change_event_handler, promiscuous=False): +def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_event_handler): + """ + Select PORT update events, notify the observers upon a port update in APPL_DB/CONFIG_DB + or a XCVR insertion/removal in STATE_DB + """ + if not stop_event.is_set(): + (state, _) = sel.select(SELECT_TIMEOUT_MSECS) + if state == swsscommon.Select.TIMEOUT: + return + if state != swsscommon.Select.OBJECT: + logger.log_warning('sel.select() did not return swsscommon.Select.OBJECT') + return + for port_tbl in asic_context.keys(): + while True: + (key, op, fvp) = port_tbl.pop() + if not key: + break + fvp = dict(fvp) if fvp is not None else {} + if 'index' not in fvp: + fvp['index'] = '-1' + port_index = int(fvp['index']) + port_change_event = None + if op == swsscommon.SET_COMMAND: + port_change_event = PortChangeEvent(key, + port_index, + asic_context[port_tbl], + PortChangeEvent.PORT_SET, + fvp) + elif op == swsscommon.DEL_COMMAND: + port_change_event = PortChangeEvent(key, + port_index, + asic_context[port_tbl], + PortChangeEvent.PORT_DEL, + fvp) + if port_change_event is not None: + port_change_event_handler(port_change_event) + +def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_change_event_handler): """Select CONFIG_DB PORT table changes, once there is a port configuration add/remove, notify observers """ if not stop_event.is_set(): @@ -120,29 +169,23 @@ def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logge logger.log_warning('sel.select() did not return swsscommon.Select.OBJECT') return - read_port_config_change(asic_context, port_mapping, logger, port_change_event_handler, promiscuous) + read_port_config_change(asic_context, port_mapping, logger, port_change_event_handler) -def read_port_config_change(asic_context, port_mapping, logger, port_change_event_handler, promiscuous): +def read_port_config_change(asic_context, port_mapping, logger, port_change_event_handler): for port_tbl in asic_context.keys(): while True: (key, op, fvp) = port_tbl.pop() if not key: break - fvp = dict(fvp) if fvp is not None else {} if op == swsscommon.SET_COMMAND: + fvp = dict(fvp) if 'index' not in fvp: - fvp['index'] = -1 - new_physical_index = int(fvp['index']) - - if promiscuous: - port_change_event = PortChangeEvent(key, new_physical_index, asic_context[port_tbl], PortChangeEvent.PORT_SET, fvp) - port_change_event_handler(port_change_event) - - if key in ['PortConfigDone', 'PortInitDone']: continue - elif not port_mapping.is_logical_port(key): + + new_physical_index = int(fvp['index']) + if not port_mapping.is_logical_port(key): # New logical port created - port_change_event = PortChangeEvent(key, new_physical_index, asic_context[port_tbl], PortChangeEvent.PORT_ADD, fvp) + port_change_event = PortChangeEvent(key, new_physical_index, asic_context[port_tbl], PortChangeEvent.PORT_ADD) port_change_event_handler(port_change_event) else: current_physical_index = port_mapping.get_logical_to_physical(key)[0] @@ -150,35 +193,17 @@ def read_port_config_change(asic_context, port_mapping, logger, port_change_even port_change_event = PortChangeEvent(key, current_physical_index, asic_context[port_tbl], - PortChangeEvent.PORT_REMOVE, - fvp) + PortChangeEvent.PORT_REMOVE) port_change_event_handler(port_change_event) - port_change_event = PortChangeEvent(key, new_physical_index, asic_context[port_tbl], PortChangeEvent.PORT_ADD, fvp) + port_change_event = PortChangeEvent(key, new_physical_index, asic_context[port_tbl], PortChangeEvent.PORT_ADD) port_change_event_handler(port_change_event) elif op == swsscommon.DEL_COMMAND: - if promiscuous: - if 'index' in fvp: - physical_index = int(fvp['index']) - elif port_mapping.is_logical_port(key): - physical_index = port_mapping.get_logical_to_physical(key)[0] - else: - physical_index = -1 - port_change_event = PortChangeEvent(key, - physical_index, - asic_context[port_tbl], - PortChangeEvent.PORT_DEL, - fvp) - port_change_event_handler(port_change_event) - - if key in ['PortConfigDone', 'PortInitDone']: - continue - elif port_mapping.is_logical_port(key): + if port_mapping.is_logical_port(key): port_change_event = PortChangeEvent(key, port_mapping.get_logical_to_physical(key)[0], asic_context[port_tbl], - PortChangeEvent.PORT_REMOVE, - fvp) + PortChangeEvent.PORT_REMOVE) port_change_event_handler(port_change_event) else: logger.log_warning('Invalid DB operation: {}'.format(op)) From 3fcbaa41d1ac74909c7d661d42149ce7429b8ac5 Mon Sep 17 00:00:00 2001 From: Dante Su Date: Tue, 23 Nov 2021 00:08:33 +0000 Subject: [PATCH 06/14] fix UT failures Signed-off-by: Dante Su --- sonic-xcvrd/tests/test_xcvrd.py | 5 +++-- sonic-xcvrd/xcvrd/xcvrd.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index e612f843a..c24c43708 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -569,8 +569,9 @@ def test_CmisManagerTask_task_run_stop(self, mock_chassis): task = CmisManagerTask(port_mapping) task.task_run([False]) task.task_stop() - assert task.task_process is not None - assert not task.task_process.is_alive() + # task.task_process could be none if the feature is disabled in pmon_daemon_control.json + if task.task_process is not None: + assert not task.task_process.is_alive() @patch('xcvrd.xcvrd.platform_chassis') @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_update_event', MagicMock(return_value=(None, None))) diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 3272d9a23..1044f6402 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -1058,7 +1058,7 @@ def task_run(self, y_cable_presence): if ctrl.get('skip_xcvrd_cmis_manager', False): self.dbg_print("service stopped administratively") return - except AttributeError: + except (AttributeError, TypeError): pass if platform_chassis is None: From dc421d1746886c218f7ea817391bd0477cefe643 Mon Sep 17 00:00:00 2001 From: Dante Su Date: Tue, 23 Nov 2021 01:34:24 +0000 Subject: [PATCH 07/14] cosmetic updates Signed-off-by: Dante Su --- sonic-xcvrd/tests/test_xcvrd.py | 22 ++++++++++------------ sonic-xcvrd/xcvrd/xcvrd.py | 8 ++++---- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index c24c43708..142eacae8 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -537,23 +537,23 @@ def test_CmisManagerTask_handle_port_change_event(self): assert not task.isPortConfigDone port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET) - task.on_port_config_change(port_change_event) + task.on_port_update_event(port_change_event) assert task.isPortConfigDone port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) - task.on_port_config_change(port_change_event) + task.on_port_update_event(port_change_event) assert len(task.port_dict) == 0 port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_REMOVE) - task.on_port_config_change(port_change_event) + task.on_port_update_event(port_change_event) assert len(task.port_dict) == 0 port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_DEL) - task.on_port_config_change(port_change_event) + task.on_port_update_event(port_change_event) assert len(task.port_dict) == 1 port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_SET) - task.on_port_config_change(port_change_event) + task.on_port_update_event(port_change_event) assert len(task.port_dict) == 1 @patch('xcvrd.xcvrd.platform_chassis') @@ -569,9 +569,7 @@ def test_CmisManagerTask_task_run_stop(self, mock_chassis): task = CmisManagerTask(port_mapping) task.task_run([False]) task.task_stop() - # task.task_process could be none if the feature is disabled in pmon_daemon_control.json - if task.task_process is not None: - assert not task.task_process.is_alive() + assert task.task_process is None @patch('xcvrd.xcvrd.platform_chassis') @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_update_event', MagicMock(return_value=(None, None))) @@ -605,7 +603,7 @@ def test_CmisManagerTask_task_worker(self, mock_chassis): 'DP8State': 'DataPathInitialized' } }) - mock_sfp.get_cmis_application_update = MagicMock(return_value=(True, 1)) + mock_sfp.has_cmis_application_update = MagicMock(return_value=(True, 1)) mock_sfp.set_cmis_application_stop = MagicMock(return_value=True) mock_sfp.set_cmis_application_apsel = MagicMock(return_value=True) mock_sfp.set_cmis_application_start = MagicMock(return_value=True) @@ -618,18 +616,18 @@ def test_CmisManagerTask_task_worker(self, mock_chassis): task = CmisManagerTask(port_mapping) port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET) - task.on_port_config_change(port_change_event) + task.on_port_update_event(port_change_event) assert task.isPortConfigDone port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_SET, {'speed':'400000', 'lanes':'1,2,3,4,5,6,7,8'}) - task.on_port_config_change(port_change_event) + task.on_port_update_event(port_change_event) assert len(task.port_dict) == 1 # Case 1: Module Inserted --> DP_DEINIT task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) task.task_worker() - assert mock_sfp.get_cmis_application_update.call_count > 0 + assert mock_sfp.has_cmis_application_update.call_count > 0 assert mock_sfp.set_cmis_application_stop.call_count > 0 # Case 2: DP_DEINIT --> AP Configured diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 1044f6402..854bd28b8 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -875,7 +875,7 @@ def __init__(self, port_mapping): def dbg_print(self, message): helper_logger.log_notice("CMIS: {}".format(message)) - def on_port_config_change(self, port_change_event): + def on_port_update_event(self, port_change_event): if port_change_event.event_type not in [port_change_event.PORT_SET, port_change_event.PORT_DEL]: return @@ -928,7 +928,7 @@ def task_worker(self): asic_context, self.task_stopping_event, helper_logger, - self.on_port_config_change) + self.on_port_update_event) if not self.isPortConfigDone: continue @@ -985,7 +985,7 @@ def task_worker(self): lport, int(speed/1000), len(host_lanes), state)) if state == self.CMIS_STATE_INSERTED: - (flag, appl) = sfp.get_cmis_application_update(host_lanes, host_speed) + (flag, appl) = sfp.has_cmis_application_update(host_speed, host_lanes) if not flag: # No application updates state = self.CMIS_STATE_READY @@ -1087,7 +1087,7 @@ def task_stop(self): self.task_stopping_event.set() if self.task_process is not None: self.task_process.join() - + self.task_process = None # Thread wrapper class to update dom info periodically From 143014ff9677575095efba550ab11b83ba8d9b3b Mon Sep 17 00:00:00 2001 From: Dante Su Date: Mon, 29 Nov 2021 03:52:05 +0000 Subject: [PATCH 08/14] drop pmon_daemon_control.json and add more comments for the cage/port type checker Signed-off-by: Dante Su --- sonic-xcvrd/xcvrd/xcvrd.py | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 854bd28b8..1ab19e44f 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -1047,24 +1047,13 @@ def task_worker(self): self.dbg_print("Stopped") def task_run(self, y_cable_presence): - # Stop the service if it's disabled in pmon_daemon_control.json - try: - (platform_path, _) = device_info.get_paths_to_platform_and_hwsku_dirs() - pmon_daemon_control_file_path = os.path.join(platform_path, "pmon_daemon_control.json") - if os.path.isfile(pmon_daemon_control_file_path): - ctrl = {} - with open(pmon_daemon_control_file_path, "r") as f: - ctrl = json.load(f) - if ctrl.get('skip_xcvrd_cmis_manager', False): - self.dbg_print("service stopped administratively") - return - except (AttributeError, TypeError): - pass - if platform_chassis is None: self.dbg_print("Platform chassis is not available, stopping...") return + # Deactivate the service if none of the QSFPDD cages/ports are available + # Please note this check has nothing to do with the module type fetched + # from byte 0 or 128 of the transceiver EEPROM. has_cmis = False for sfp in platform_chassis.get_all_sfps(): try: From 2afd61e989f4504c7bc0420a9988af42a8af9f51 Mon Sep 17 00:00:00 2001 From: Dante Su Date: Wed, 1 Dec 2021 00:14:42 +0000 Subject: [PATCH 09/14] Stop deactivating the CmisManager based on the port/cage type Signed-off-by: Dante Su --- sonic-xcvrd/xcvrd/xcvrd.py | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 1ab19e44f..6f927a85a 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -1051,23 +1051,6 @@ def task_run(self, y_cable_presence): self.dbg_print("Platform chassis is not available, stopping...") return - # Deactivate the service if none of the QSFPDD cages/ports are available - # Please note this check has nothing to do with the module type fetched - # from byte 0 or 128 of the transceiver EEPROM. - has_cmis = False - for sfp in platform_chassis.get_all_sfps(): - try: - ptype = sfp.get_port_type() - except (NotImplementedError, ValueError): - ptype = 'Unknown' - if ptype in ['QSFP_DD', 'QSFP-DD']: - has_cmis = True - break - - if not has_cmis: - self.dbg_print("None of QSFP-DD cages are detected, stopping...") - return - self.task_process = multiprocessing.Process(target=self.task_worker) if self.task_process is not None: self.task_process.start() From 3c901382ff008b45a81887bd55e3d9c72cb5edfa Mon Sep 17 00:00:00 2001 From: Dante Su Date: Thu, 2 Dec 2021 11:21:15 +0000 Subject: [PATCH 10/14] address review comments Signed-off-by: Dante Su --- sonic-xcvrd/tests/test_xcvrd.py | 28 +++++++------ sonic-xcvrd/xcvrd/xcvrd.py | 73 ++++++++++++++++++++------------- 2 files changed, 59 insertions(+), 42 deletions(-) diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index 142eacae8..1ccf4942b 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -562,12 +562,11 @@ def test_CmisManagerTask_handle_port_change_event(self): def test_CmisManagerTask_task_run_stop(self, mock_chassis): mock_object = MagicMock() mock_object.get_presence = MagicMock(return_value=True) - mock_object.get_port_type = MagicMock(return_value="QSFP_DD") mock_chassis.get_all_sfps = MagicMock(return_value=[mock_object, mock_object]) port_mapping = PortMapping() task = CmisManagerTask(port_mapping) - task.task_run([False]) + task.task_run() task.task_stop() assert task.task_process is None @@ -578,10 +577,9 @@ def test_CmisManagerTask_task_run_stop(self, mock_chassis): def test_CmisManagerTask_task_worker(self, mock_chassis): mock_sfp = MagicMock() mock_sfp.get_presence = MagicMock(return_value=True) - mock_sfp.get_port_type = MagicMock(return_value="QSFP_DD") - mock_sfp.get_transceiver_info = MagicMock(return_value={'type_abbrv_name':'QSFP_DD', 'memory_type':'paged'}) - mock_sfp.get_module_state = MagicMock(return_value="ModuleReady") + mock_sfp.get_transceiver_info = MagicMock(return_value={'type_abbrv_name':'QSFP_DD'}) mock_sfp.get_cmis_state = MagicMock(return_value={ + 'module_state': 'ModuleReady', 'config_state': { 'ConfigStatusLane1': 'ConfigSuccess', 'ConfigStatusLane2': 'ConfigSuccess', @@ -604,10 +602,12 @@ def test_CmisManagerTask_task_worker(self, mock_chassis): } }) mock_sfp.has_cmis_application_update = MagicMock(return_value=(True, 1)) - mock_sfp.set_cmis_application_stop = MagicMock(return_value=True) + mock_sfp.set_cmis_datapath_deinit = MagicMock(return_value=True) + mock_sfp.set_cmis_datapath_init = MagicMock(return_value=True) + mock_sfp.tx_disable_channel = MagicMock(return_value=True) + mock_sfp.set_lpmode = MagicMock(return_value=True) mock_sfp.set_cmis_application_apsel = MagicMock(return_value=True) - mock_sfp.set_cmis_application_start = MagicMock(return_value=True) - mock_sfp.set_cmis_application_txon = MagicMock(return_value=True) + mock_sfp.is_flat_memory = MagicMock(return_value=False) mock_chassis.get_all_sfps = MagicMock(return_value=[mock_sfp]) mock_chassis.get_sfp = MagicMock(return_value=mock_sfp) @@ -627,23 +627,25 @@ def test_CmisManagerTask_task_worker(self, mock_chassis): # Case 1: Module Inserted --> DP_DEINIT task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) task.task_worker() - assert mock_sfp.has_cmis_application_update.call_count > 0 - assert mock_sfp.set_cmis_application_stop.call_count > 0 + assert mock_sfp.has_cmis_application_update.call_count == 1 + assert mock_sfp.set_cmis_datapath_deinit.call_count == 1 + assert mock_sfp.tx_disable_channel.call_count == 1 + assert mock_sfp.set_lpmode.call_count == 2 # Case 2: DP_DEINIT --> AP Configured task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) task.task_worker() - assert mock_sfp.set_cmis_application_apsel.call_count > 0 + assert mock_sfp.set_cmis_application_apsel.call_count == 1 # Case 3: AP Configured --> DP_INIT task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) task.task_worker() - assert mock_sfp.set_cmis_application_start.call_count > 0 + assert mock_sfp.set_cmis_datapath_init.call_count == 1 # Case 4: DP_INIT --> DP_TXON task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) task.task_worker() - assert mock_sfp.set_cmis_application_txon.call_count > 0 + assert mock_sfp.tx_disable_channel.call_count == 2 @patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock()) def test_DomInfoUpdateTask_handle_port_change_event(self): diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 6f927a85a..d115fe66f 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -203,10 +203,6 @@ def _wrapper_get_sfp_type(physical_port): return sfp.sfp_type except (NotImplementedError, AttributeError): pass - try: - return sfp.get_port_type() - except (NotImplementedError, AttributeError): - pass return None @@ -854,6 +850,8 @@ def is_fast_reboot_enabled(): class CmisManagerTask: + NUM_CHANNELS = 8 + CMIS_STATE_UNKNOWN = 'UNKNOWN' CMIS_STATE_INSERTED = 'INSERTED' CMIS_STATE_DP_DEINIT = 'DP_DEINIT' @@ -873,6 +871,7 @@ def __init__(self, port_mapping): self.isPortConfigDone = False def dbg_print(self, message): + print("CMIS: {}".format(message)) helper_logger.log_notice("CMIS: {}".format(message)) def on_port_update_event(self, port_change_event): @@ -953,16 +952,17 @@ def task_worker(self): if pport < 0 or speed == 0 or len(lanes) < 1: continue - # Replace the physical lane id with logical lane index - # - # TODO: Add dynamic port breakout support - lanes_new = [] - lanes_old = lanes.split(',') - for i in range(len(lanes_old)): - lanes_new.append(i) - host_lanes = lanes_new + # Desired port speed on the host side host_speed = speed + # Convert the physical lane id into logical lanemask + # + # TODO: Add dynamic port breakout support by checking the physical lane offset + host_lanes = 0 + phys_lanes = lanes.split(',') + for i in range(len(phys_lanes)): + host_lanes |= (1 << i) + # double-check the HW presence before moving forward sfp = platform_chassis.get_sfp(pport) if not sfp.get_presence(): @@ -976,28 +976,34 @@ def task_worker(self): self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_FAILED continue - # Skip if the memory type is flat - if info.get('memory_type', 'flat') in ['flat', 'FLAT', 'Flat']: + # Skip if it's a flat memory device + if sfp.is_flat_memory(): self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY continue - self.dbg_print("{}: {}G, {}-lanes, state={}".format( - lport, int(speed/1000), len(host_lanes), state)) + self.dbg_print("{}: {}G, lanemask=0x{:x}, state={}".format( + lport, int(speed/1000), host_lanes, state)) if state == self.CMIS_STATE_INSERTED: (flag, appl) = sfp.has_cmis_application_update(host_speed, host_lanes) if not flag: # No application updates state = self.CMIS_STATE_READY - self.dbg_print("{}: {}G, {}-lanes, state={}".format( - lport, int(speed/1000), len(host_lanes), state)) + self.dbg_print("{}: state={}".format(lport, state)) self.port_dict[lport]['cmis_state'] = state continue self.port_dict[lport]['cmis_apsel'] = appl - sfp.set_cmis_application_stop(host_lanes) + + # D.2.2 Software Deinitialization + sfp.set_cmis_datapath_deinit(host_lanes) + sfp.set_lpmode(True) + # D.1.3 Software Configuration and Initialization + sfp.tx_disable_channel(host_lanes, True) + sfp.set_lpmode(False) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_DEINIT elif state == self.CMIS_STATE_DP_DEINIT: - if sfp.get_module_state() != 'ModuleReady': + if sfp.get_cmis_state().get('module_state') != 'ModuleReady': continue appl = self.port_dict[lport]['cmis_apsel'] sfp.set_cmis_application_apsel(host_lanes, appl) @@ -1005,31 +1011,41 @@ def task_worker(self): elif state == self.CMIS_STATE_AP_CONF: st = sfp.get_cmis_state()['config_state'] done = True - for lane in host_lanes: + for lane in range(self.NUM_CHANNELS): + if (1 << lane) & host_lanes == 0: + continue name = "ConfigStatusLane{}".format(lane + 1) if st[name] != 'ConfigSuccess': done = False continue if not done: continue - sfp.set_cmis_application_start(host_lanes) + + sfp.set_cmis_datapath_init(host_lanes) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_INIT elif state == self.CMIS_STATE_DP_INIT: st = sfp.get_cmis_state()['datapath_state'] done = True - for lane in host_lanes: + for lane in range(self.NUM_CHANNELS): + if (1 << lane) & host_lanes == 0: + continue name = "DP{}State".format(lane + 1) if st[name] != 'DataPathInitialized': done = False continue if not done: continue - sfp.set_cmis_application_txon(host_lanes) + + sfp.tx_disable_channel(host_lanes, False) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_TXON elif state == self.CMIS_STATE_DP_TXON: st = sfp.get_cmis_state()['datapath_state'] done = True - for lane in host_lanes: + for lane in range(self.NUM_CHANNELS): + if (1 << lane) & host_lanes == 0: + continue name = "DP{}State".format(lane + 1) if st[name] != 'DataPathActivated': done = False @@ -1037,8 +1053,7 @@ def task_worker(self): if not done: continue state = self.CMIS_STATE_READY - self.dbg_print("{}: {}G, {}-lanes, state={}".format( - lport, int(speed/1000), len(host_lanes), state)) + self.dbg_print("{}: state={}".format(lport, state)) self.port_dict[lport]['cmis_state'] = state except (NotImplementedError, AttributeError): @@ -1046,7 +1061,7 @@ def task_worker(self): self.dbg_print("Stopped") - def task_run(self, y_cable_presence): + def task_run(self): if platform_chassis is None: self.dbg_print("Platform chassis is not available, stopping...") return @@ -1758,7 +1773,7 @@ def run(self): # Start the CMIS manager cmis_manager = CmisManagerTask(port_mapping_data) - cmis_manager.task_run(self.y_cable_presence) + cmis_manager.task_run() # Start the dom sensor info update thread dom_info_update = DomInfoUpdateTask(port_mapping_data) From 5f807c49921ba376622134f73c5a79cc92580fff Mon Sep 17 00:00:00 2001 From: Dante Su Date: Thu, 2 Dec 2021 11:46:19 +0000 Subject: [PATCH 11/14] drop debug print Signed-off-by: Dante Su --- sonic-xcvrd/xcvrd/xcvrd.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index d115fe66f..fb94c7cc8 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -871,7 +871,6 @@ def __init__(self, port_mapping): self.isPortConfigDone = False def dbg_print(self, message): - print("CMIS: {}".format(message)) helper_logger.log_notice("CMIS: {}".format(message)) def on_port_update_event(self, port_change_event): From a258f2a0f85c490966b32688f569aa8c605f474a Mon Sep 17 00:00:00 2001 From: Dante Su Date: Fri, 3 Dec 2021 07:05:30 +0000 Subject: [PATCH 12/14] address review comments Signed-off-by: Dante Su --- sonic-xcvrd/tests/test_xcvrd.py | 89 ++++++++------ sonic-xcvrd/xcvrd/xcvrd.py | 205 +++++++++++++++++++++++++++----- 2 files changed, 225 insertions(+), 69 deletions(-) diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index 1ccf4942b..0a4f3cb53 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -575,39 +575,55 @@ def test_CmisManagerTask_task_run_stop(self, mock_chassis): @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_update_event', MagicMock()) @patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD')) def test_CmisManagerTask_task_worker(self, mock_chassis): - mock_sfp = MagicMock() - mock_sfp.get_presence = MagicMock(return_value=True) - mock_sfp.get_transceiver_info = MagicMock(return_value={'type_abbrv_name':'QSFP_DD'}) - mock_sfp.get_cmis_state = MagicMock(return_value={ - 'module_state': 'ModuleReady', - 'config_state': { - 'ConfigStatusLane1': 'ConfigSuccess', - 'ConfigStatusLane2': 'ConfigSuccess', - 'ConfigStatusLane3': 'ConfigSuccess', - 'ConfigStatusLane4': 'ConfigSuccess', - 'ConfigStatusLane5': 'ConfigSuccess', - 'ConfigStatusLane6': 'ConfigSuccess', - 'ConfigStatusLane7': 'ConfigSuccess', - 'ConfigStatusLane8': 'ConfigSuccess' + mock_xcvr_api = MagicMock() + mock_xcvr_api.set_datapath_deinit = MagicMock(return_value=True) + mock_xcvr_api.set_datapath_init = MagicMock(return_value=True) + mock_xcvr_api.tx_disable_channel = MagicMock(return_value=True) + mock_xcvr_api.set_lpmode = MagicMock(return_value=True) + mock_xcvr_api.set_application = MagicMock(return_value=True) + mock_xcvr_api.is_flat_memory = MagicMock(return_value=False) + mock_xcvr_api.get_module_type_abbreviation = MagicMock(return_value='QSFP-DD') + mock_xcvr_api.get_application_advertisement = MagicMock(return_value={ + 1: { + 'host_electrical_interface_id': '400GAUI-8 C2M (Annex 120E)', + 'module_media_interface_id': '400GBASE-DR4 (Cl 124)', + 'media_lane_count': 4, + 'host_lane_count': 8, + 'host_lane_assignment_options': 1 }, - 'datapath_state': { - 'DP1State': 'DataPathInitialized', - 'DP2State': 'DataPathInitialized', - 'DP3State': 'DataPathInitialized', - 'DP4State': 'DataPathInitialized', - 'DP5State': 'DataPathInitialized', - 'DP6State': 'DataPathInitialized', - 'DP7State': 'DataPathInitialized', - 'DP8State': 'DataPathInitialized' + 2: { + 'host_electrical_interface_id': '100GAUI-2 C2M (Annex 135G)', + 'module_media_interface_id': '100G-FR/100GBASE-FR1 (Cl 140)', + 'media_lane_count': 1, + 'host_lane_count': 2, + 'host_lane_assignment_options': 85 } }) - mock_sfp.has_cmis_application_update = MagicMock(return_value=(True, 1)) - mock_sfp.set_cmis_datapath_deinit = MagicMock(return_value=True) - mock_sfp.set_cmis_datapath_init = MagicMock(return_value=True) - mock_sfp.tx_disable_channel = MagicMock(return_value=True) - mock_sfp.set_lpmode = MagicMock(return_value=True) - mock_sfp.set_cmis_application_apsel = MagicMock(return_value=True) - mock_sfp.is_flat_memory = MagicMock(return_value=False) + mock_xcvr_api.get_module_state = MagicMock(return_value='ModuleReady') + mock_xcvr_api.get_config_datapath_hostlane_status = MagicMock(return_value={ + 'ConfigStatusLane1': 'ConfigSuccess', + 'ConfigStatusLane2': 'ConfigSuccess', + 'ConfigStatusLane3': 'ConfigSuccess', + 'ConfigStatusLane4': 'ConfigSuccess', + 'ConfigStatusLane5': 'ConfigSuccess', + 'ConfigStatusLane6': 'ConfigSuccess', + 'ConfigStatusLane7': 'ConfigSuccess', + 'ConfigStatusLane8': 'ConfigSuccess' + }) + mock_xcvr_api.get_datapath_state = MagicMock(return_value={ + 'DP1State': 'DataPathInitialized', + 'DP2State': 'DataPathInitialized', + 'DP3State': 'DataPathInitialized', + 'DP4State': 'DataPathInitialized', + 'DP5State': 'DataPathInitialized', + 'DP6State': 'DataPathInitialized', + 'DP7State': 'DataPathInitialized', + 'DP8State': 'DataPathInitialized' + }) + + mock_sfp = MagicMock() + mock_sfp.get_presence = MagicMock(return_value=True) + mock_sfp.get_xcvr_api = MagicMock(return_value=mock_xcvr_api) mock_chassis.get_all_sfps = MagicMock(return_value=[mock_sfp]) mock_chassis.get_sfp = MagicMock(return_value=mock_sfp) @@ -627,25 +643,24 @@ def test_CmisManagerTask_task_worker(self, mock_chassis): # Case 1: Module Inserted --> DP_DEINIT task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) task.task_worker() - assert mock_sfp.has_cmis_application_update.call_count == 1 - assert mock_sfp.set_cmis_datapath_deinit.call_count == 1 - assert mock_sfp.tx_disable_channel.call_count == 1 - assert mock_sfp.set_lpmode.call_count == 2 + assert mock_xcvr_api.set_datapath_deinit.call_count == 1 + assert mock_xcvr_api.tx_disable_channel.call_count == 1 + assert mock_xcvr_api.set_lpmode.call_count == 2 # Case 2: DP_DEINIT --> AP Configured task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) task.task_worker() - assert mock_sfp.set_cmis_application_apsel.call_count == 1 + assert mock_xcvr_api.set_application.call_count == 1 # Case 3: AP Configured --> DP_INIT task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) task.task_worker() - assert mock_sfp.set_cmis_datapath_init.call_count == 1 + assert mock_xcvr_api.set_datapath_init.call_count == 1 # Case 4: DP_INIT --> DP_TXON task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) task.task_worker() - assert mock_sfp.tx_disable_channel.call_count == 2 + assert mock_xcvr_api.tx_disable_channel.call_count == 2 @patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock()) def test_DomInfoUpdateTask_handle_port_change_event(self): diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index fb94c7cc8..6b68a3074 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -870,9 +870,12 @@ def __init__(self, port_mapping): self.isPortInitDone = False self.isPortConfigDone = False - def dbg_print(self, message): + def log_notice(self, message): helper_logger.log_notice("CMIS: {}".format(message)) + def log_error(self, message): + helper_logger.log_error("CMIS: {}".format(message)) + def on_port_update_event(self, port_change_event): if port_change_event.event_type not in [port_change_event.PORT_SET, port_change_event.PORT_DEL]: return @@ -915,8 +918,130 @@ def on_port_update_event(self, port_change_event): else: self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_REMOVED + def get_interface_speed(self, ifname): + """ + Get the port speed from the host interface name + + Args: + ifname: String, interface name + + Returns: + Integer, the port speed if success otherwise 0 + """ + # see HOST_ELECTRICAL_INTERFACE of sff8024.py + speed = 0 + if '400G' in ifname: + speed = 400000 + elif '200G' in ifname: + speed = 200000 + elif '100G' in ifname or 'CAUI-4' in ifname: + speed = 100000 + elif '50G' in ifname or 'LAUI-2' in ifname: + speed = 50000 + elif '40G' in ifname or 'XLAUI' in ifname or 'XLPPI' in ifname: + speed = 40000 + elif '25G' in ifname: + speed = 25000 + elif '10G' in ifname or 'SFI' in ifname or 'XFI' in ifname: + speed = 10000 + elif '1000BASE' in ifname: + speed = 1000 + return speed + + def get_cmis_application_desired(self, api, channel, speed): + """ + Get the CMIS application code that matches the specified host side configurations + + Args: + api: + XcvrApi object + channel: + Integer, a bitmask of the lanes on the host side + e.g. 0x5 for lane 0 and lane 2. + speed: + Integer, the port speed of the host interface + + Returns: + Integer, the transceiver-specific application code + """ + if speed == 0 or channel == 0: + return 0 + + host_lane_count = 0 + for lane in range(self.NUM_CHANNELS): + if ((1 << lane) & channel) == 0: + continue + host_lane_count += 1 + + appl_code = 0 + appl_dict = api.get_application_advertisement() + for c in appl_dict.keys(): + d = appl_dict[c] + if d.get('host_lane_count') != host_lane_count: + continue + if self.get_interface_speed(d.get('host_electrical_interface_id')) != speed: + continue + appl_code = c + break + + return (appl_code & 0xf) + + def is_cmis_application_update_required(self, api, channel, speed): + """ + Check if the CMIS application update is required + + Args: + api: + XcvrApi object + channel: + Integer, a bitmask of the lanes on the host side + e.g. 0x5 for lane 0 and lane 2. + speed: + Integer, the port speed of the host interface + + Returns: + Boolean, true if application update is required otherwise false + """ + if speed == 0 or channel == 0 or api.is_flat_memory(): + return False + + app_new = self.get_cmis_application_desired(api, channel, speed) + if app_new != 1: + self.log_notice("Non-default application is not supported") + return False + + app_old = 0 + for lane in range(api.NUM_CHANNELS): + if ((1 << lane) & channel) == 0: + continue + if app_old == 0: + app_old = api.get_application(lane) + elif app_old != api.get_application(lane): + self.log_notice("Not all the lanes are in the same application mode") + self.log_notice("Forcing application update...") + return True + + if app_old == app_new: + skip = True + dp_state = api.get_datapath_state() + conf_state = api.get_config_datapath_hostlane_status() + for lane in range(api.NUM_CHANNELS): + if ((1 << lane) & channel) == 0: + continue + name = "DP{}State".format(lane + 1) + if dp_state[name] != 'DataPathActivated': + skip = False + break + name = "ConfigStatusLane{}".format(lane + 1) + if conf_state[name] != 'ConfigSuccess': + skip = False + break + return (not skip) + + return True + def task_worker(self): - self.dbg_print("Starting...") + self.log_notice("Starting...") # APPL_DB for CONFIG updates, and STATE_DB for insertion/removal sel, asic_context = port_mapping.subscribe_port_update_event(['APPL_DB', 'STATE_DB']) @@ -954,7 +1079,7 @@ def task_worker(self): # Desired port speed on the host side host_speed = speed - # Convert the physical lane id into logical lanemask + # Convert the physical lane list into a logical lanemask # # TODO: Add dynamic port breakout support by checking the physical lane offset host_lanes = 0 @@ -969,49 +1094,63 @@ def task_worker(self): continue try: - # Skip if the xcvr type is not QSFP-DD - info = sfp.get_transceiver_info() - if info.get('type_abbrv_name', None) not in ['QSFP-DD', 'QSFP_DD']: - self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_FAILED + # Skip if XcvrApi is not supported + api = sfp.get_xcvr_api() + if api is None: + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY + continue + + # Skip if it's not a paged memory device + if api.is_flat_memory(): + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY continue - # Skip if it's a flat memory device - if sfp.is_flat_memory(): + # Skip if it's not a QSFP-DD + type = api.get_module_type_abbreviation() + if (type is None) or (type not in ['QSFP-DD', 'QSFP_DD']): self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY continue + except AttributeError: + # Skip if these essential routines are not available + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY + continue - self.dbg_print("{}: {}G, lanemask=0x{:x}, state={}".format( - lport, int(speed/1000), host_lanes, state)) + self.log_notice("{}: {}G, lanemask=0x{:x}, state={}".format( + lport, int(speed/1000), host_lanes, state)) + try: if state == self.CMIS_STATE_INSERTED: - (flag, appl) = sfp.has_cmis_application_update(host_speed, host_lanes) - if not flag: + has_update = self.is_cmis_application_update_required(api, host_lanes, host_speed) + if not has_update: # No application updates state = self.CMIS_STATE_READY - self.dbg_print("{}: state={}".format(lport, state)) + self.log_notice("{}: state={}".format(lport, state)) self.port_dict[lport]['cmis_state'] = state continue + appl = self.get_cmis_application_desired(api, host_lanes, host_speed) self.port_dict[lport]['cmis_apsel'] = appl # D.2.2 Software Deinitialization - sfp.set_cmis_datapath_deinit(host_lanes) - sfp.set_lpmode(True) + api.set_datapath_deinit(host_lanes) + api.set_lpmode(True) # D.1.3 Software Configuration and Initialization - sfp.tx_disable_channel(host_lanes, True) - sfp.set_lpmode(False) + api.tx_disable_channel(host_lanes, True) + api.set_lpmode(False) self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_DEINIT elif state == self.CMIS_STATE_DP_DEINIT: - if sfp.get_cmis_state().get('module_state') != 'ModuleReady': + if api.get_module_state() != 'ModuleReady': continue - appl = self.port_dict[lport]['cmis_apsel'] - sfp.set_cmis_application_apsel(host_lanes, appl) + + # D.1.3 Software Configuration and Initialization + api.set_application(host_lanes, self.port_dict[lport]['cmis_apsel']) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_AP_CONF elif state == self.CMIS_STATE_AP_CONF: - st = sfp.get_cmis_state()['config_state'] + st = api.get_config_datapath_hostlane_status() done = True for lane in range(self.NUM_CHANNELS): - if (1 << lane) & host_lanes == 0: + if ((1 << lane) & host_lanes) == 0: continue name = "ConfigStatusLane{}".format(lane + 1) if st[name] != 'ConfigSuccess': @@ -1020,14 +1159,15 @@ def task_worker(self): if not done: continue - sfp.set_cmis_datapath_init(host_lanes) + # D.1.3 Software Configuration and Initialization + api.set_datapath_init(host_lanes) self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_INIT elif state == self.CMIS_STATE_DP_INIT: - st = sfp.get_cmis_state()['datapath_state'] + st = api.get_datapath_state() done = True for lane in range(self.NUM_CHANNELS): - if (1 << lane) & host_lanes == 0: + if ((1 << lane) & host_lanes) == 0: continue name = "DP{}State".format(lane + 1) if st[name] != 'DataPathInitialized': @@ -1036,14 +1176,15 @@ def task_worker(self): if not done: continue - sfp.tx_disable_channel(host_lanes, False) + # D.1.3 Software Configuration and Initialization + api.tx_disable_channel(host_lanes, False) self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_TXON elif state == self.CMIS_STATE_DP_TXON: - st = sfp.get_cmis_state()['datapath_state'] + st = api.get_datapath_state() done = True for lane in range(self.NUM_CHANNELS): - if (1 << lane) & host_lanes == 0: + if ((1 << lane) & host_lanes) == 0: continue name = "DP{}State".format(lane + 1) if st[name] != 'DataPathActivated': @@ -1052,17 +1193,17 @@ def task_worker(self): if not done: continue state = self.CMIS_STATE_READY - self.dbg_print("{}: state={}".format(lport, state)) + self.log_notice("{}: state={}".format(lport, state)) self.port_dict[lport]['cmis_state'] = state except (NotImplementedError, AttributeError): self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_FAILED - self.dbg_print("Stopped") + self.log_notice("Stopped") def task_run(self): if platform_chassis is None: - self.dbg_print("Platform chassis is not available, stopping...") + self.log_notice("Platform chassis is not available, stopping...") return self.task_process = multiprocessing.Process(target=self.task_worker) From 8951bbff97e9c2ad98052183211aefb8d0e72c8e Mon Sep 17 00:00:00 2001 From: Dante Su Date: Thu, 9 Dec 2021 08:20:03 +0000 Subject: [PATCH 13/14] add CMIS expiration and retries Signed-off-by: Dante Su --- sonic-xcvrd/xcvrd/xcvrd.py | 220 ++++++++++++++++++++++++++++--------- 1 file changed, 168 insertions(+), 52 deletions(-) diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 6b68a3074..3aa56d36c 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -16,6 +16,7 @@ import sys import threading import time + import datetime import subprocess from sonic_py_common import daemon_base, device_info, logger @@ -850,7 +851,10 @@ def is_fast_reboot_enabled(): class CmisManagerTask: - NUM_CHANNELS = 8 + CMIS_MAX_RETRIES = 3 + CMIS_DEF_EXPIRED = 60 # seconds, default expiration time + CMIS_MODULE_TYPES = ['QSFP-DD', 'QSFP_DD', 'OSFP'] + CMIS_NUM_CHANNELS = 8 CMIS_STATE_UNKNOWN = 'UNKNOWN' CMIS_STATE_INSERTED = 'INSERTED' @@ -899,9 +903,9 @@ def on_port_update_event(self, port_change_event): if pport is None: return - # Skip if the port/cage type is not QSFP-DD + # Skip if the port/cage type is not a CMIS ptype = _wrapper_get_sfp_type(pport) - if ptype not in ['QSFP-DD', 'QSFP_DD']: + if ptype not in self.CMIS_MODULE_TYPES: return if lport not in self.port_dict: @@ -915,6 +919,7 @@ def on_port_update_event(self, port_change_event): if port_change_event.port_dict is not None and 'lanes' in port_change_event.port_dict: self.port_dict[lport]['lanes'] = port_change_event.port_dict['lanes'] self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_INSERTED + self.reset_cmis_init(lport, 0) else: self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_REMOVED @@ -968,7 +973,7 @@ def get_cmis_application_desired(self, api, channel, speed): return 0 host_lane_count = 0 - for lane in range(self.NUM_CHANNELS): + for lane in range(self.CMIS_NUM_CHANNELS): if ((1 << lane) & channel) == 0: continue host_lane_count += 1 @@ -1011,7 +1016,7 @@ def is_cmis_application_update_required(self, api, channel, speed): return False app_old = 0 - for lane in range(api.NUM_CHANNELS): + for lane in range(self.CMIS_NUM_CHANNELS): if ((1 << lane) & channel) == 0: continue if app_old == 0: @@ -1025,7 +1030,7 @@ def is_cmis_application_update_required(self, api, channel, speed): skip = True dp_state = api.get_datapath_state() conf_state = api.get_config_datapath_hostlane_status() - for lane in range(api.NUM_CHANNELS): + for lane in range(self.CMIS_NUM_CHANNELS): if ((1 << lane) & channel) == 0: continue name = "DP{}State".format(lane + 1) @@ -1040,6 +1045,82 @@ def is_cmis_application_update_required(self, api, channel, speed): return True + def reset_cmis_init(self, lport, retries=0): + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_INSERTED + self.port_dict[lport]['cmis_retries'] = retries + self.port_dict[lport]['cmis_expired'] = None # No expiration + + def test_module_state(self, api, states): + """ + Check if the CMIS module is in the specified state + + Args: + api: + XcvrApi object + states: + List, a string list of states + + Returns: + Boolean, true if it's in the specified state, otherwise false + """ + return api.get_module_state() in states + + def test_config_error(self, api, channel, states): + """ + Check if the CMIS configuration states are in the specified state + + Args: + api: + XcvrApi object + channel: + Integer, a bitmask of the lanes on the host side + e.g. 0x5 for lane 0 and lane 2. + states: + List, a string list of states + + Returns: + Boolean, true if all lanes are in the specified state, otherwise false + """ + done = True + cerr = api.get_config_datapath_hostlane_status() + for lane in range(self.CMIS_NUM_CHANNELS): + if ((1 << lane) & channel) == 0: + continue + key = "ConfigStatusLane{}".format(lane + 1) + if cerr[key] not in states: + done = False + break + + return done + + def test_datapath_state(self, api, channel, states): + """ + Check if the CMIS datapath states are in the specified state + + Args: + api: + XcvrApi object + channel: + Integer, a bitmask of the lanes on the host side + e.g. 0x5 for lane 0 and lane 2. + states: + List, a string list of states + + Returns: + Boolean, true if all lanes are in the specified state, otherwise false + """ + done = True + dpstate = api.get_datapath_state() + for lane in range(self.CMIS_NUM_CHANNELS): + if ((1 << lane) & channel) == 0: + continue + key = "DP{}State".format(lane + 1) + if dpstate[key] not in states: + done = False + break + + return done + def task_worker(self): self.log_notice("Starting...") @@ -1105,9 +1186,9 @@ def task_worker(self): self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY continue - # Skip if it's not a QSFP-DD + # Skip if it's not a CMIS module type = api.get_module_type_abbreviation() - if (type is None) or (type not in ['QSFP-DD', 'QSFP_DD']): + if (type is None) or (type not in self.CMIS_MODULE_TYPES): self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY continue except AttributeError: @@ -1115,88 +1196,123 @@ def task_worker(self): self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY continue - self.log_notice("{}: {}G, lanemask=0x{:x}, state={}".format( - lport, int(speed/1000), host_lanes, state)) + # CMIS expiration and retries + # + # A retry should always start over at INSETRTED state, while the + # expiration will reset the state to INSETRTED and advance the + # retry counter + now = datetime.datetime.now() + expired = self.port_dict[lport].get('cmis_expired') + retries = self.port_dict[lport].get('cmis_retries', 0) + self.log_notice("{}: {}G, lanemask=0x{:x}, state={}, retries={}".format( + lport, int(speed/1000), host_lanes, state, retries)) + if retries > self.CMIS_MAX_RETRIES: + self.log_error("{}: FAILED".format(lport)) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_FAILED + continue try: + # CMIS state transitions if state == self.CMIS_STATE_INSERTED: + + appl = self.get_cmis_application_desired(api, host_lanes, host_speed) + if appl < 1: + self.log_error("{}: no suitable app for the port".format(lport)) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_FAILED + continue + has_update = self.is_cmis_application_update_required(api, host_lanes, host_speed) if not has_update: # No application updates - state = self.CMIS_STATE_READY - self.log_notice("{}: state={}".format(lport, state)) - self.port_dict[lport]['cmis_state'] = state + self.log_notice("{}: READY".format(lport)) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY continue - appl = self.get_cmis_application_desired(api, host_lanes, host_speed) - self.port_dict[lport]['cmis_apsel'] = appl # D.2.2 Software Deinitialization api.set_datapath_deinit(host_lanes) api.set_lpmode(True) + if not self.test_module_state(api, ['ModuleReady', 'ModuleLowPwr']): + self.log_notice("{}: unable to enter low-power mode".format(lport)) + self.port_dict[lport]['cmis_retries'] = retries + 1 + continue + # D.1.3 Software Configuration and Initialization - api.tx_disable_channel(host_lanes, True) + if not api.tx_disable_channel(host_lanes, True): + self.log_notice("{}: unable to turn off tx power".format(lport)) + self.port_dict[lport]['cmis_retries'] = retries + 1 + continue api.set_lpmode(False) + # TODO: Use fine grained time when the CMIS memory map is available + self.port_dict[lport]['cmis_expired'] = now + datetime.timedelta(seconds=self.CMIS_DEF_EXPIRED) self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_DEINIT elif state == self.CMIS_STATE_DP_DEINIT: - if api.get_module_state() != 'ModuleReady': + if not self.test_module_state(api, ['ModuleReady']): + if (expired is not None) and (expired <= now): + self.log_notice("{}: timeout for 'ModuleReady'".format(lport)) + self.reset_cmis_init(lport, retries + 1) + continue + if not self.test_datapath_state(api, host_lanes, ['DataPathDeinit', 'DataPathDeactivated']): + if (expired is not None) and (expired <= now): + self.log_notice("{}: timeout for 'DataPathDeinit'".format(lport)) + self.reset_cmis_init(lport, retries + 1) continue # D.1.3 Software Configuration and Initialization - api.set_application(host_lanes, self.port_dict[lport]['cmis_apsel']) + appl = self.get_cmis_application_desired(api, host_lanes, host_speed) + if appl < 1: + self.log_error("{}: no suitable app for the port".format(lport)) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_FAILED + continue + + if not api.set_application(host_lanes, appl): + self.log_notice("{}: unable to set application".format(lport)) + self.reset_cmis_init(lport, retries + 1) + continue + # TODO: Use fine grained time when the CMIS memory map is available + self.port_dict[lport]['cmis_expired'] = now + datetime.timedelta(seconds=self.CMIS_DEF_EXPIRED) self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_AP_CONF elif state == self.CMIS_STATE_AP_CONF: - st = api.get_config_datapath_hostlane_status() - done = True - for lane in range(self.NUM_CHANNELS): - if ((1 << lane) & host_lanes) == 0: - continue - name = "ConfigStatusLane{}".format(lane + 1) - if st[name] != 'ConfigSuccess': - done = False - continue - if not done: + if not self.test_config_error(api, host_lanes, ['ConfigSuccess']): + if (expired is not None) and (expired <= now): + self.log_notice("{}: timeout for 'ConfigSuccess'".format(lport)) + self.reset_cmis_init(lport, retries + 1) continue # D.1.3 Software Configuration and Initialization api.set_datapath_init(host_lanes) + # TODO: Use fine grained time when the CMIS memory map is available + self.port_dict[lport]['cmis_expired'] = now + datetime.timedelta(seconds=self.CMIS_DEF_EXPIRED) self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_INIT elif state == self.CMIS_STATE_DP_INIT: - st = api.get_datapath_state() - done = True - for lane in range(self.NUM_CHANNELS): - if ((1 << lane) & host_lanes) == 0: - continue - name = "DP{}State".format(lane + 1) - if st[name] != 'DataPathInitialized': - done = False - continue - if not done: + if not self.test_datapath_state(api, host_lanes, ['DataPathInitialized']): + if (expired is not None) and (expired <= now): + self.log_notice("{}: timeout for 'DataPathInitialized'".format(lport)) + self.reset_cmis_init(lport, retries + 1) continue # D.1.3 Software Configuration and Initialization - api.tx_disable_channel(host_lanes, False) + if not api.tx_disable_channel(host_lanes, False): + self.log_notice("{}: unable to turn on tx power".format(lport)) + self.reset_cmis_init(lport, retries + 1) + continue + # TODO: Use fine grained timeout when the CMIS memory map is available + self.port_dict[lport]['cmis_expired'] = now + datetime.timedelta(seconds=self.CMIS_DEF_EXPIRED) self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_TXON elif state == self.CMIS_STATE_DP_TXON: - st = api.get_datapath_state() - done = True - for lane in range(self.NUM_CHANNELS): - if ((1 << lane) & host_lanes) == 0: - continue - name = "DP{}State".format(lane + 1) - if st[name] != 'DataPathActivated': - done = False - continue - if not done: + if not self.test_datapath_state(api, host_lanes, ['DataPathActivated']): + if (expired is not None) and (expired <= now): + self.log_notice("{}: timeout for 'DataPathActivated'".format(lport)) + self.reset_cmis_init(lport, retries + 1) continue - state = self.CMIS_STATE_READY - self.log_notice("{}: state={}".format(lport, state)) - self.port_dict[lport]['cmis_state'] = state + self.log_notice("{}: READY".format(lport)) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY except (NotImplementedError, AttributeError): + self.log_error("{}: internal errors".format(lport)) self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_FAILED self.log_notice("Stopped") From de650136ffe18a578279fd7dd25406e9b3fc919a Mon Sep 17 00:00:00 2001 From: Dante Su Date: Thu, 9 Dec 2021 09:11:45 +0000 Subject: [PATCH 14/14] fix the CMIS unittestfailure Signed-off-by: Dante Su --- sonic-xcvrd/tests/test_xcvrd.py | 42 +++++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index 0a4f3cb53..af2001f34 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -610,16 +610,38 @@ def test_CmisManagerTask_task_worker(self, mock_chassis): 'ConfigStatusLane7': 'ConfigSuccess', 'ConfigStatusLane8': 'ConfigSuccess' }) - mock_xcvr_api.get_datapath_state = MagicMock(return_value={ - 'DP1State': 'DataPathInitialized', - 'DP2State': 'DataPathInitialized', - 'DP3State': 'DataPathInitialized', - 'DP4State': 'DataPathInitialized', - 'DP5State': 'DataPathInitialized', - 'DP6State': 'DataPathInitialized', - 'DP7State': 'DataPathInitialized', - 'DP8State': 'DataPathInitialized' - }) + mock_xcvr_api.get_datapath_state = MagicMock(side_effect=[ + { + 'DP1State': 'DataPathDeactivated', + 'DP2State': 'DataPathDeactivated', + 'DP3State': 'DataPathDeactivated', + 'DP4State': 'DataPathDeactivated', + 'DP5State': 'DataPathDeactivated', + 'DP6State': 'DataPathDeactivated', + 'DP7State': 'DataPathDeactivated', + 'DP8State': 'DataPathDeactivated' + }, + { + 'DP1State': 'DataPathInitialized', + 'DP2State': 'DataPathInitialized', + 'DP3State': 'DataPathInitialized', + 'DP4State': 'DataPathInitialized', + 'DP5State': 'DataPathInitialized', + 'DP6State': 'DataPathInitialized', + 'DP7State': 'DataPathInitialized', + 'DP8State': 'DataPathInitialized' + }, + { + 'DP1State': 'DataPathActivated', + 'DP2State': 'DataPathActivated', + 'DP3State': 'DataPathActivated', + 'DP4State': 'DataPathActivated', + 'DP5State': 'DataPathActivated', + 'DP6State': 'DataPathActivated', + 'DP7State': 'DataPathActivated', + 'DP8State': 'DataPathActivated' + } + ]) mock_sfp = MagicMock() mock_sfp.get_presence = MagicMock(return_value=True)