From ccfd175034daa53494be31e8daf380a08963830a Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Tue, 13 Jun 2023 22:33:42 +0000 Subject: [PATCH 1/5] Add intelligence to xcvrd to understand process restart and config reload Signed-off-by: Mihir Patel --- sonic-xcvrd/tests/test_xcvrd.py | 89 +++++++++++++++++++++------ sonic-xcvrd/xcvrd/xcvrd.py | 105 ++++++++++++++++++++++++++++---- 2 files changed, 162 insertions(+), 32 deletions(-) diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index 2581c7a0d..2c822232f 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -47,7 +47,8 @@ class TestXcvrdThreadException(object): def test_CmisManagerTask_task_run_with_exception(self): port_mapping = PortMapping() stop_event = threading.Event() - cmis_manager = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + port_reinit_request_tbl = MagicMock() + cmis_manager = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, port_reinit_request_tbl) cmis_manager.wait_for_port_config_done = MagicMock(side_effect = NotImplementedError) exception_received = None trace = None @@ -90,7 +91,8 @@ def test_SfpStateUpdateTask_task_run_with_exception(self): port_mapping = PortMapping() stop_event = threading.Event() sfp_error_event = threading.Event() - sfp_state_update = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event, sfp_error_event) + port_reinit_request_tbl = MagicMock() + sfp_state_update = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, port_reinit_request_tbl, stop_event, sfp_error_event) exception_received = None trace = None try: @@ -106,6 +108,7 @@ def test_SfpStateUpdateTask_task_run_with_exception(self): assert("sonic-xcvrd/xcvrd/xcvrd.py" in str(trace)) assert("subscribe_port_config_change" in str(trace)) + @patch('xcvrd.xcvrd.DaemonXcvrd.subscribe_appl_proc_info_update', MagicMock(return_value = (None, None))) @patch('xcvrd.xcvrd.SfpStateUpdateTask.is_alive', MagicMock(return_value = 
False)) @patch('xcvrd.xcvrd.DomInfoUpdateTask.is_alive', MagicMock(return_value = False)) @patch('xcvrd.xcvrd.CmisManagerTask.is_alive', MagicMock(return_value = False)) @@ -121,7 +124,7 @@ def test_SfpStateUpdateTask_task_run_with_exception(self): def test_DaemonXcvrd_run_with_exception(self, mock_task_join1, mock_task_join2, mock_init, mock_os_kill): mock_init.return_value = (PortMapping(), set()) xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) - xcvrd.stop_event.wait = MagicMock() + xcvrd.stop_event.is_set = MagicMock(return_value = True) xcvrd.run() assert len(xcvrd.threads) == 3 @@ -401,7 +404,8 @@ def test_post_port_sfp_info_and_dom_thr_to_db_once(self): stop_event = threading.Event() xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) sfp_error_event = threading.Event() - task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event, sfp_error_event) + port_reinit_request_tbl = MagicMock() + task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, port_reinit_request_tbl, stop_event, sfp_error_event) task._post_port_sfp_info_and_dom_thr_to_db_once(port_mapping, xcvr_table_helper, stop_event) @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @@ -416,7 +420,8 @@ def test_init_port_sfp_status_tbl(self): stop_event = threading.Event() xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) sfp_error_event = threading.Event() - task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event, sfp_error_event) + port_reinit_request_tbl = MagicMock() + task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, port_reinit_request_tbl, stop_event, sfp_error_event) task._init_port_sfp_status_tbl(port_mapping, xcvr_table_helper, stop_event) def test_get_media_settings_key(self): @@ -575,6 +580,7 @@ def test_DaemonXcvrd_wait_for_port_config_done(self, mock_select, mock_sub_table xcvrd.wait_for_port_config_done('') assert swsscommon.Select.select.call_count == 2 + 
@patch('xcvrd.xcvrd.DaemonXcvrd.subscribe_appl_proc_info_update', MagicMock(return_value = (None, None))) @patch('xcvrd.xcvrd.DaemonXcvrd.init') @patch('xcvrd.xcvrd.DaemonXcvrd.deinit') @patch('xcvrd.xcvrd.DomInfoUpdateTask.start') @@ -584,7 +590,7 @@ def test_DaemonXcvrd_wait_for_port_config_done(self, mock_select, mock_sub_table def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, mock_task_run2, mock_deinit, mock_init): mock_init.return_value = (PortMapping(), set()) xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) - xcvrd.stop_event.wait = MagicMock() + xcvrd.stop_event.is_set = MagicMock(return_value=True) xcvrd.run() assert mock_task_stop1.call_count == 1 assert mock_task_stop2.call_count == 1 @@ -593,11 +599,43 @@ def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, assert mock_deinit.call_count == 1 assert mock_init.call_count == 1 + @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) + @patch('os.kill') + @patch('swsscommon.swsscommon.SubscriberStateTable') + @patch('swsscommon.swsscommon.Select.select') + @patch('xcvrd.xcvrd.DaemonXcvrd.init') + @patch('xcvrd.xcvrd.DomInfoUpdateTask.start') + @patch('xcvrd.xcvrd.SfpStateUpdateTask.start') + def test_xcvrd_kill_with_proc_info_del_command(self, mock_task_run1, mock_task_run2, mock_init, mock_select, mock_sub_table, mock_os_kill): + mock_init.return_value = (PortMapping()) + mock_selectable = MagicMock() + mock_selectable.pop = MagicMock( + side_effect=[('XCVRD', swsscommon.DEL_COMMAND, None)]) + mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) + mock_sub_table.return_value = mock_selectable + xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) + xcvrd.stop_event.is_set = MagicMock(return_value=False) + + # Since DEL event handling is done in a while loop, we need to raise an exception to break it + # and prevent the test script from getting killed + mock_os_kill.side_effect = Exception('os.kill() is called') + + try: + xcvrd.run() + except 
Exception as e: + assert str(e) == "os.kill() is called" + + assert mock_task_run1.call_count == 1 + assert mock_task_run2.call_count == 1 + assert mock_init.call_count == 1 + assert mock_os_kill.call_count == 1 + @patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD')) def test_CmisManagerTask_handle_port_change_event(self): port_mapping = PortMapping() stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + port_reinit_request_tbl = MagicMock() + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, port_reinit_request_tbl) assert not task.isPortConfigDone port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET) @@ -625,7 +663,8 @@ def test_CmisManagerTask_handle_port_change_event(self): def test_CmisManagerTask_get_configured_freq(self, mock_table_helper): port_mapping = PortMapping() stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + port_reinit_request_tbl = MagicMock() + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, port_reinit_request_tbl) cfg_port_tbl = MagicMock() cfg_port_tbl.get = MagicMock(return_value=(True, (('laser_freq', 193100),))) mock_table_helper.get_cfg_port_tbl = MagicMock(return_value=cfg_port_tbl) @@ -636,7 +675,8 @@ def test_CmisManagerTask_get_configured_freq(self, mock_table_helper): def test_CmisManagerTask_get_configured_tx_power_from_db(self, mock_table_helper): port_mapping = PortMapping() stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + port_reinit_request_tbl = MagicMock() + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, port_reinit_request_tbl) cfg_port_tbl = MagicMock() cfg_port_tbl.get = MagicMock(return_value=(True, (('tx_power', -10),))) mock_table_helper.get_cfg_port_tbl = MagicMock(return_value=cfg_port_tbl) @@ -653,7 +693,8 @@ def 
test_CmisManagerTask_task_run_stop(self, mock_chassis): port_mapping = PortMapping() stop_event = threading.Event() - cmis_manager = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + port_reinit_request_tbl = MagicMock() + cmis_manager = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, port_reinit_request_tbl) cmis_manager.wait_for_port_config_done = MagicMock() cmis_manager.start() cmis_manager.join() @@ -714,7 +755,8 @@ def get_application(lane): port_mapping = PortMapping() stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + port_reinit_request_tbl = MagicMock() + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, port_reinit_request_tbl) assert task.is_cmis_application_update_required(mock_xcvr_api, app_new, host_lanes_mask) == expected @@ -759,7 +801,8 @@ def get_host_lane_assignment_option_side_effect(app): mock_xcvr_api.get_host_lane_assignment_option = MagicMock(side_effect=get_host_lane_assignment_option_side_effect) port_mapping = PortMapping() stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + port_reinit_request_tbl = MagicMock() + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, port_reinit_request_tbl) appl = task.get_cmis_application_desired(mock_xcvr_api, host_lane_count, speed) assert task.get_cmis_host_lanes_mask(mock_xcvr_api, appl, host_lane_count, subport) == expected @@ -865,7 +908,8 @@ def test_CmisManagerTask_task_worker(self, mock_chassis): port_mapping = PortMapping() stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + port_reinit_request_tbl = {'Ethernet0': True} + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, port_reinit_request_tbl) port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET) task.on_port_update_event(port_change_event) @@ -996,7 +1040,8 @@ def 
test_SfpStateUpdateTask_handle_port_change_event(self, mock_update_status_hw stop_event = threading.Event() sfp_error_event = threading.Event() port_mapping = PortMapping() - task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event, sfp_error_event) + port_reinit_request_tbl = MagicMock() + task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, port_reinit_request_tbl, stop_event, sfp_error_event) task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) task.xcvr_table_helper.get_status_tbl = mock_table_helper.get_status_tbl task.xcvr_table_helper.get_intf_tbl = mock_table_helper.get_intf_tbl @@ -1034,7 +1079,8 @@ def test_SfpStateUpdateTask_task_run_stop(self): port_mapping = PortMapping() stop_event = threading.Event() sfp_error_event = threading.Event() - task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event, sfp_error_event) + port_reinit_request_tbl = MagicMock() + task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, port_reinit_request_tbl, stop_event, sfp_error_event) task.start() assert wait_until(5, 1, task.is_alive) task.raise_exception() @@ -1050,7 +1096,8 @@ def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_post_sfp_info): port_mapping = PortMapping() stop_event = threading.Event() sfp_error_event = threading.Event() - task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event, sfp_error_event) + port_reinit_request_tbl = MagicMock() + task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, port_reinit_request_tbl, stop_event, sfp_error_event) task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) task.xcvr_table_helper.get_intf_tbl = MagicMock(return_value=mock_table) task.xcvr_table_helper.get_dom_threshold_tbl = MagicMock(return_value=mock_table) @@ -1078,7 +1125,8 @@ def test_SfpStateUpdateTask_mapping_event_from_change_event(self): port_mapping = PortMapping() stop_event = threading.Event() sfp_error_event = threading.Event() - task = SfpStateUpdateTask(DEFAULT_NAMESPACE, 
port_mapping, stop_event, sfp_error_event) + port_reinit_request_tbl = MagicMock() + task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, port_reinit_request_tbl, stop_event, sfp_error_event) port_dict = {} assert task._mapping_event_from_change_event(False, port_dict) == SYSTEM_FAIL assert port_dict[EVENT_ON_ALL_SFP] == SYSTEM_FAIL @@ -1114,7 +1162,8 @@ def test_SfpStateUpdateTask_task_worker(self, mock_del_status_hw, port_mapping = PortMapping() stop_event = threading.Event() sfp_error_event = threading.Event() - task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event, sfp_error_event) + port_reinit_request_tbl = MagicMock() + task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, port_reinit_request_tbl, stop_event, sfp_error_event) task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) mock_change_event.return_value = (True, {0: 0}, {}) mock_mapping_event.return_value = SYSTEM_NOT_READY @@ -1224,7 +1273,8 @@ class MockTable: port_mapping = PortMapping() stop_event = threading.Event() sfp_error_event = threading.Event() - task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event, sfp_error_event) + port_reinit_request_tbl = MockTable() + task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, port_reinit_request_tbl, stop_event, sfp_error_event) task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) task.xcvr_table_helper.get_status_tbl = mock_table_helper.get_status_tbl task.xcvr_table_helper.get_intf_tbl = mock_table_helper.get_intf_tbl @@ -1490,6 +1540,7 @@ def test_get_media_val_str(self): @patch('sonic_py_common.device_info.get_paths_to_platform_and_hwsku_dirs', MagicMock(return_value=('/tmp', None))) @patch('swsscommon.swsscommon.WarmStart', MagicMock()) @patch('xcvrd.xcvrd.DaemonXcvrd.wait_for_port_config_done', MagicMock()) + @patch('xcvrd.xcvrd.init_appl_proc_info_tbl', MagicMock()) def test_DaemonXcvrd_init_deinit_fastboot_enabled(self): xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) with 
patch("subprocess.check_output") as mock_run: diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index f59cf6433..1c023d853 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -24,6 +24,7 @@ from sonic_py_common import daemon_base, device_info, logger from sonic_py_common import multi_asic from swsscommon import swsscommon + from swsscommon.swsscommon import SonicV2Connector from .xcvrd_utilities import sfp_status_helper from .xcvrd_utilities import port_mapping @@ -47,6 +48,8 @@ TRANSCEIVER_STATUS_TABLE_SW_FIELDS = ["status", "error"] +APPL_PROC_INFO_TABLE = 'PROC_INFO' + # Mgminit time required as per CMIS spec MGMT_INIT_TIME_DELAY_SECS = 2 @@ -856,6 +859,43 @@ def delete_port_from_status_table_hw(logical_port_name, port_mapping, status_tbl continue status_tbl.hdel(physical_port_name, f) +# Initialize the APPL_DB PROC_INFO:XCVRD table with XCVRD_COLD_RESTART as the key +# if not already populated and update the port_reinit_request_tbl accordingly +# XCVRD_COLD_RESTART is the key which is used to determine if ports in a namespace +# need to be reinitialized and media settings are to be renotified +def init_appl_proc_info_tbl(namespaces, port_mapping_data, xcvr_table_helper): + namespace_reinit_request_tbl = {} + port_reinit_request_tbl = {} + + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + namespace_reinit_request_tbl[asic_id] = False + appl_db = SonicV2Connector(use_unix_socket_path=False, namespace=namespace) + if appl_db is None: + helper_logger.log_error("Appl {} tbl is None for asic_id {} during init".format(APPL_PROC_INFO_TABLE, asic_id)) + continue + appl_db.connect(appl_db.APPL_DB) + APPL_PROC_INFO_XCVRD_TABLE = '{}:XCVRD'.format(APPL_PROC_INFO_TABLE) + proc_info_xcvrd_dict = appl_db.get_all(appl_db.APPL_DB, '{}'.format(APPL_PROC_INFO_XCVRD_TABLE)) + if proc_info_xcvrd_dict is not None and 'XCVRD_COLD_RESTART' in proc_info_xcvrd_dict: + helper_logger.log_notice("Key 
XCVRD_COLD_RESTART already exists in {} tbl for asic {}!".format(APPL_PROC_INFO_XCVRD_TABLE, asic_id)) + else: + appl_db.set(appl_db.APPL_DB, '{}:XCVRD'.format(APPL_PROC_INFO_TABLE), 'XCVRD_COLD_RESTART', 'true') + namespace_reinit_request_tbl[asic_id] = True + helper_logger.log_notice("Added {} tbl for asic {}!".format(APPL_PROC_INFO_XCVRD_TABLE, asic_id)) + appl_db.close(appl_db.APPL_DB) + + for logical_port_name in port_mapping_data.logical_port_list: + port_reinit_request_tbl[logical_port_name] = False + # Get the asic to which this port belongs + asic_index = port_mapping_data.get_asic_id_for_logical_port(logical_port_name) + if asic_index is None: + helper_logger.log_warning("Got invalid asic index for {} during appl PROC_INFO tbl init".format(logical_port_name)) + continue + port_reinit_request_tbl[logical_port_name] = namespace_reinit_request_tbl[asic_index] + + return port_reinit_request_tbl + def is_fast_reboot_enabled(): fastboot_enabled = subprocess.check_output('sonic-db-cli STATE_DB hget "FAST_RESTART_ENABLE_TABLE|system" enable', shell=True, universal_newlines=True) return "true" in fastboot_enabled @@ -884,7 +924,7 @@ class CmisManagerTask(threading.Thread): CMIS_STATE_REMOVED = 'REMOVED' CMIS_STATE_FAILED = 'FAILED' - def __init__(self, namespaces, port_mapping, main_thread_stop_event, skip_cmis_mgr=False): + def __init__(self, namespaces, port_mapping, main_thread_stop_event, port_reinit_request_tbl, skip_cmis_mgr=False): threading.Thread.__init__(self) self.name = "CmisManagerTask" self.exc = None @@ -897,6 +937,7 @@ def __init__(self, namespaces, port_mapping, main_thread_stop_event, skip_cmis_m self.isPortConfigDone = False self.skip_cmis_mgr = skip_cmis_mgr self.namespaces = namespaces + self.port_reinit_request_tbl = port_reinit_request_tbl def log_notice(self, message): helper_logger.log_notice("CMIS: {}".format(message)) @@ -1534,6 +1575,12 @@ def task_worker(self): if 0 != freq and freq != api.get_laser_config_freq(): need_update = True + 
if self.port_reinit_request_tbl[lport] == True: + # To ensure forced CMIS init is done only once for a port after xcvrd boot-up + self.port_reinit_request_tbl[lport] = False + need_update = True + self.log_notice("{}: Forcing CMIS init as port_reinit_request_tbl is True".format(lport)) + if not need_update: # No application updates self.log_notice("{}: no CMIS application update required...READY".format(lport)) @@ -1793,7 +1840,7 @@ def on_remove_logical_port(self, port_change_event): class SfpStateUpdateTask(threading.Thread): RETRY_EEPROM_READING_INTERVAL = 60 - def __init__(self, namespaces, port_mapping, main_thread_stop_event, sfp_error_event): + def __init__(self, namespaces, port_mapping, port_reinit_request_tbl, main_thread_stop_event, sfp_error_event): threading.Thread.__init__(self) self.name = "SfpStateUpdateTask" self.exc = None @@ -1810,6 +1857,7 @@ def __init__(self, namespaces, port_mapping, main_thread_stop_event, sfp_error_e self.sfp_error_dict = {} self.sfp_insert_events = {} self.namespaces = namespaces + self.port_reinit_request_tbl = port_reinit_request_tbl def _mapping_event_from_change_event(self, status, port_dict): """ @@ -1841,11 +1889,6 @@ def _post_port_sfp_info_and_dom_thr_to_db_once(self, port_mapping, xcvr_table_he transceiver_dict = {} retry_eeprom_set = set() - warmstart = swsscommon.WarmStart() - warmstart.initialize("xcvrd", "pmon") - warmstart.checkWarmStart("xcvrd", "pmon", False) - is_warm_start = warmstart.isWarmStart() - # Post all the current interface sfp/dom threshold info to STATE_DB logical_port_list = port_mapping.logical_port_list for logical_port_name in logical_port_list: @@ -1862,7 +1905,8 @@ def _post_port_sfp_info_and_dom_thr_to_db_once(self, port_mapping, xcvr_table_he post_port_dom_threshold_info_to_db(logical_port_name, port_mapping, xcvr_table_helper.get_dom_threshold_tbl(asic_index), stop_event) # Do not notify media settings during warm reboot to avoid dataplane traffic impact - if is_warm_start == False: + 
if self.port_reinit_request_tbl[logical_port_name] == True: + helper_logger.log_notice("Notify media setting during boot-up for {}".format(logical_port_name)) notify_media_setting(logical_port_name, transceiver_dict, xcvr_table_helper.get_app_port_tbl(asic_index), port_mapping) transceiver_dict.clear() else: @@ -2330,6 +2374,7 @@ def __init__(self, log_identifier, skip_cmis_mgr=False): self.skip_cmis_mgr = skip_cmis_mgr self.namespaces = [''] self.threads = [] + self.port_reinit_request_tbl = {} # Signal handler def signal_handler(self, sig, frame): @@ -2378,6 +2423,16 @@ def load_media_settings(self): with open(media_settings_file_path, "r") as media_file: g_dict = json.load(media_file) + def subscribe_appl_proc_info_update(self): + for namespace in self.namespaces: + appl_db = daemon_base.db_connect("APPL_DB", namespace=namespace) + + sel = swsscommon.Select() + proc_info_tbl = swsscommon.SubscriberStateTable(appl_db, APPL_PROC_INFO_TABLE) + sel.addSelectable(proc_info_tbl) + + return sel, proc_info_tbl + # Initialize daemon def init(self): global platform_sfputil @@ -2429,7 +2484,12 @@ def init(self): self.wait_for_port_config_done(namespace) self.log_notice("XCVRD INIT: After port config is done") - return port_mapping.get_port_mapping(self.namespaces) + port_mapping_data = port_mapping.get_port_mapping(self.namespaces) + + self.port_reinit_request_tbl = init_appl_proc_info_tbl(self.namespaces, port_mapping_data, self.xcvr_table_helper) + self.log_info("port_reinit_request_tbl value after init is: {}".format(self.port_reinit_request_tbl)) + + return port_mapping_data # Deinitialize daemon def deinit(self): @@ -2465,7 +2525,7 @@ def run(self): port_mapping_data = self.init() # Start the CMIS manager - cmis_manager = CmisManagerTask(self.namespaces, port_mapping_data, self.stop_event, self.skip_cmis_mgr) + cmis_manager = CmisManagerTask(self.namespaces, port_mapping_data, self.stop_event, self.port_reinit_request_tbl, self.skip_cmis_mgr) if not 
self.skip_cmis_mgr: cmis_manager.start() self.threads.append(cmis_manager) @@ -2476,7 +2536,7 @@ def run(self): self.threads.append(dom_info_update) # Start the sfp state info update thread - sfp_state_update = SfpStateUpdateTask(self.namespaces, port_mapping_data, self.stop_event, self.sfp_error_event) + sfp_state_update = SfpStateUpdateTask(self.namespaces, port_mapping_data, self.port_reinit_request_tbl, self.stop_event, self.sfp_error_event) sfp_state_update.start() self.threads.append(sfp_state_update) @@ -2485,7 +2545,22 @@ def run(self): for thread in self.threads: self.log_notice("Started thread {}".format(thread.getName())) - self.stop_event.wait() + sel, proc_info_tbl = self.subscribe_appl_proc_info_update() + self.log_notice("Subscribed to appl proc info update") + + while not self.stop_event.is_set(): + (state, _) = sel.select(port_mapping.SELECT_TIMEOUT_MSECS) + if state == swsscommon.Select.TIMEOUT: + continue + if state != swsscommon.Select.OBJECT: + self.log_warning("sel.select() did not return swsscommon.Select.OBJECT") + continue + + (key, op, _) = proc_info_tbl.pop() + self.log_info("APPL_DB subscriber: Received {}:{} key".format(key, op)) + if op == swsscommon.DEL_COMMAND and key == 'XCVRD': + self.log_notice("DEL_COMMAND received for {}:{} key. 
Killing main thread now!".format(APPL_PROC_INFO_TABLE, key)) + os.kill(os.getpid(), signal.SIGKILL) self.log_info("Stop daemon main loop") @@ -2529,7 +2604,7 @@ def run(self): class XcvrTableHelper: def __init__(self, namespaces): self.int_tbl, self.dom_tbl, self.dom_threshold_tbl, self.status_tbl, self.app_port_tbl, \ - self.cfg_port_tbl, self.state_port_tbl, self.pm_tbl = {}, {}, {}, {}, {}, {}, {}, {} + self.cfg_port_tbl, self.state_port_tbl, self.pm_tbl, self.appl_proc_info_tbl = {}, {}, {}, {}, {}, {}, {}, {}, {} self.state_db = {} self.cfg_db = {} for namespace in namespaces: @@ -2543,6 +2618,7 @@ def __init__(self, namespaces): self.state_port_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], swsscommon.STATE_PORT_TABLE_NAME) appl_db = daemon_base.db_connect("APPL_DB", namespace) self.app_port_tbl[asic_id] = swsscommon.ProducerStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME) + self.appl_proc_info_tbl[asic_id] = swsscommon.Table(appl_db, APPL_PROC_INFO_TABLE) self.cfg_db[asic_id] = daemon_base.db_connect("CONFIG_DB", namespace) self.cfg_port_tbl[asic_id] = swsscommon.Table(self.cfg_db[asic_id], swsscommon.CFG_PORT_TABLE_NAME) @@ -2564,6 +2640,9 @@ def get_pm_tbl(self, asic_id): def get_app_port_tbl(self, asic_id): return self.app_port_tbl[asic_id] + def get_appl_proc_info_tbl(self, asic_id): + return self.appl_proc_info_tbl[asic_id] + def get_state_db(self, asic_id): return self.state_db[asic_id] From 0ab145ca83eee200c524087959e548fed1ee9350 Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Wed, 14 Jun 2023 01:40:08 +0000 Subject: [PATCH 2/5] Deepcopy port_reinit_request_tbl to prevent race condition when CMIS manager modifies it parallely --- sonic-xcvrd/xcvrd/xcvrd.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 1c023d853..045f577b0 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -937,7 +937,7 @@ def __init__(self, namespaces, 
port_mapping, main_thread_stop_event, port_reinit self.isPortConfigDone = False self.skip_cmis_mgr = skip_cmis_mgr self.namespaces = namespaces - self.port_reinit_request_tbl = port_reinit_request_tbl + self.port_reinit_request_tbl = copy.deepcopy(port_reinit_request_tbl) def log_notice(self, message): helper_logger.log_notice("CMIS: {}".format(message)) @@ -1857,7 +1857,7 @@ def __init__(self, namespaces, port_mapping, port_reinit_request_tbl, main_threa self.sfp_error_dict = {} self.sfp_insert_events = {} self.namespaces = namespaces - self.port_reinit_request_tbl = port_reinit_request_tbl + self.port_reinit_request_tbl = copy.deepcopy(port_reinit_request_tbl) def _mapping_event_from_change_event(self, status, port_dict): """ From 0e9c1edac386600bc597169ff73ec757738d2d6e Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Tue, 20 Jun 2023 21:54:01 +0000 Subject: [PATCH 3/5] - Replaced XCVRD_COLD_RESTART with MEDIA_NOTIFY_REQUIRED and CMIS_REINIT_REQUIRED keys in the PROC_INFO:XCVRD table in APPL_DB - Printing Module state while handling Insertion event in CMIS SM - Allowing xcvrd graceful shutdown followed by generating SIGABRT signal while handling DEL event for PROC_INFO:XCVRD table --- sonic-xcvrd/tests/test_xcvrd.py | 44 +++++++++--- sonic-xcvrd/xcvrd/xcvrd.py | 114 ++++++++++++++++++++++++-------- 2 files changed, 120 insertions(+), 38 deletions(-) diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index 2c822232f..9218cbab7 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -604,9 +604,12 @@ def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, @patch('swsscommon.swsscommon.SubscriberStateTable') @patch('swsscommon.swsscommon.Select.select') @patch('xcvrd.xcvrd.DaemonXcvrd.init') + @patch('xcvrd.xcvrd.DaemonXcvrd.deinit') @patch('xcvrd.xcvrd.DomInfoUpdateTask.start') @patch('xcvrd.xcvrd.SfpStateUpdateTask.start') - def test_xcvrd_kill_with_proc_info_del_command(self, 
mock_task_run1, mock_task_run2, mock_init, mock_select, mock_sub_table, mock_os_kill): + @patch('xcvrd.xcvrd.DomInfoUpdateTask.join') + @patch('xcvrd.xcvrd.SfpStateUpdateTask.join') + def test_xcvrd_kill_with_proc_info_del_command(self, mock_task_stop1, mock_task_stop2, mock_task_run1, mock_task_run2, mock_deinit, mock_init, mock_select, mock_sub_table, mock_os_kill): mock_init.return_value = (PortMapping()) mock_selectable = MagicMock() mock_selectable.pop = MagicMock( @@ -616,18 +619,14 @@ def test_xcvrd_kill_with_proc_info_del_command(self, mock_task_run1, mock_task_r xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) xcvrd.stop_event.is_set = MagicMock(return_value=False) - # Since DEL event handling is done in a while loop, we need to raise an exception to break it - # and prevent the test script from getting killed - mock_os_kill.side_effect = Exception('os.kill() is called') - - try: - xcvrd.run() - except Exception as e: - assert str(e) == "os.kill() is called" + xcvrd.run() + assert mock_task_stop1.call_count == 1 + assert mock_task_stop2.call_count == 1 assert mock_task_run1.call_count == 1 assert mock_task_run2.call_count == 1 assert mock_init.call_count == 1 + assert mock_deinit.call_count == 1 assert mock_os_kill.call_count == 1 @patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD')) @@ -1088,12 +1087,14 @@ def test_SfpStateUpdateTask_task_run_stop(self): assert wait_until(5, 1, lambda: task.is_alive() is False) @patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock()) + @patch('xcvrd.xcvrd.update_proc_info_xcvrd_to_db') @patch('xcvrd.xcvrd.post_port_sfp_info_to_db') - def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_post_sfp_info): + def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_post_sfp_info, mock_update_proc_info_xcvrd_to_db): mock_table = MagicMock() mock_table.get = MagicMock(return_value=(False, None)) port_mapping = PortMapping() + port_mapping.get_asic_id_for_logical_port = MagicMock(return_value=0) stop_event = 
threading.Event() sfp_error_event = threading.Event() port_reinit_request_tbl = MagicMock() @@ -1103,6 +1104,10 @@ def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_post_sfp_info): task.xcvr_table_helper.get_dom_threshold_tbl = MagicMock(return_value=mock_table) task.xcvr_table_helper.get_app_port_tbl = MagicMock(return_value=mock_table) task.xcvr_table_helper.get_status_tbl = MagicMock(return_value=mock_table) + asic_to_unprocessed_logical_ports_dict = {0: ['Ethernet0']} + task.asic_to_unprocessed_logical_ports_dict = MagicMock() + task.asic_to_unprocessed_logical_ports_dict.__getitem__.side_effect = asic_to_unprocessed_logical_ports_dict.__getitem__ + task.retry_eeprom_reading() assert mock_post_sfp_info.call_count == 0 @@ -1119,6 +1124,7 @@ def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_post_sfp_info): task.last_retry_eeprom_time = 0 mock_post_sfp_info.return_value = None task.retry_eeprom_reading() + assert mock_update_proc_info_xcvrd_to_db.call_count == 1 assert 'Ethernet0' not in task.retry_eeprom_set def test_SfpStateUpdateTask_mapping_event_from_change_event(self): @@ -1273,7 +1279,7 @@ class MockTable: port_mapping = PortMapping() stop_event = threading.Event() sfp_error_event = threading.Event() - port_reinit_request_tbl = MockTable() + port_reinit_request_tbl = MagicMock() task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, port_reinit_request_tbl, stop_event, sfp_error_event) task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) task.xcvr_table_helper.get_status_tbl = mock_table_helper.get_status_tbl @@ -1549,6 +1555,22 @@ def test_DaemonXcvrd_init_deinit_fastboot_enabled(self): xcvrd.init() xcvrd.deinit() + @patch('sonic_py_common.multi_asic.get_asic_index_from_namespace', MagicMock(return_value=(0))) + def test_init_appl_proc_info_tbl(self): + class MockTable: + def get(self, key): + return [None, None] + def set(self, key, val): + pass + + port_mapping = PortMapping() + port_change_event = 
PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) + port_mapping.handle_port_change_event(port_change_event) + xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) + xcvr_table_helper.get_appl_proc_info_tbl = MagicMock(return_value=MockTable()) + port_reinit_request_tbl = init_appl_proc_info_tbl(DEFAULT_NAMESPACE, port_mapping, xcvr_table_helper) + + assert port_reinit_request_tbl == dict({'Ethernet0' : True}) def wait_until(total_wait_time, interval, call_back, *args, **kwargs): wait_time = 0 diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 045f577b0..267e4c898 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -24,7 +24,6 @@ from sonic_py_common import daemon_base, device_info, logger from sonic_py_common import multi_asic from swsscommon import swsscommon - from swsscommon.swsscommon import SonicV2Connector from .xcvrd_utilities import sfp_status_helper from .xcvrd_utilities import port_mapping @@ -137,6 +136,17 @@ def get_physical_port_name_dict(logical_port_name, port_mapping): return port_name_dict +# Get dictionary of asic to set of corresponding logical ports +def get_asic_to_logical_ports_dict(port_mapping): + asic_to_logical_ports_dict = {} + for logical_port_name in port_mapping.logical_port_list: + asic_id = port_mapping.get_asic_id_for_logical_port(logical_port_name) + if asic_id not in asic_to_logical_ports_dict: + asic_to_logical_ports_dict[asic_id] = set() + asic_to_logical_ports_dict[asic_id].add(logical_port_name) + + return asic_to_logical_ports_dict + # Strip units and beautify @@ -308,6 +318,15 @@ def beautify_pm_info_dict(pm_info_dict, physical_port): continue pm_info_dict[k] = str(v) +# Updates APPL_DB PROC_INFO:XCVRD table with the given input_dict +def update_proc_info_xcvrd_to_db(asic_id, xcvr_table_helper, input_dict): + proc_info_tbl = xcvr_table_helper.get_appl_proc_info_tbl(asic_id) + if proc_info_tbl is None: + helper_logger.log_error("Appl {} tbl is None for asic_id {} 
while updating with dict {}".format(APPL_PROC_INFO_TABLE, asic_id, input_dict)) + return + fvs = swsscommon.FieldValuePairs([(k, v) for k, v in input_dict.items()]) + proc_info_tbl.set('XCVRD', fvs) + # Update port sfp info in db @@ -859,10 +878,12 @@ def delete_port_from_status_table_hw(logical_port_name, port_mapping, status_tbl continue status_tbl.hdel(physical_port_name, f) -# Initialize the APPL_DB PROC_INFO:XCVRD table with XCVRD_COLD_RESTART as the key -# if not already populated and update the port_reinit_request_tbl accordingly -# XCVRD_COLD_RESTART is the key which is used to determine if ports in a namespace -# need to be reinitialized and media settings are to be renotified +# Initialize the APPL_DB PROC_INFO:XCVRD table with MEDIA_NOTIFY_REQUIRED and +# CMIS_REINIT_REQUIRED as the keys if not already populated. +# If both of these keys are set to False, +# port_reinit_request_tbl will be set as False. In all other cases, +# port_reinit_request_tbl will be set as True and so will be both the keys +# Return value: port_reinit_request_tbl def init_appl_proc_info_tbl(namespaces, port_mapping_data, xcvr_table_helper): namespace_reinit_request_tbl = {} port_reinit_request_tbl = {} @@ -870,20 +891,21 @@ def init_appl_proc_info_tbl(namespaces, port_mapping_data, xcvr_table_helper): for namespace in namespaces: asic_id = multi_asic.get_asic_index_from_namespace(namespace) namespace_reinit_request_tbl[asic_id] = False - appl_db = SonicV2Connector(use_unix_socket_path=False, namespace=namespace) - if appl_db is None: + proc_info_tbl = xcvr_table_helper.get_appl_proc_info_tbl(asic_id) + if proc_info_tbl is None: helper_logger.log_error("Appl {} tbl is None for asic_id {} during init".format(APPL_PROC_INFO_TABLE, asic_id)) continue - appl_db.connect(appl_db.APPL_DB) - APPL_PROC_INFO_XCVRD_TABLE = '{}:XCVRD'.format(APPL_PROC_INFO_TABLE) - proc_info_xcvrd_dict = appl_db.get_all(appl_db.APPL_DB, '{}'.format(APPL_PROC_INFO_XCVRD_TABLE)) - if
proc_info_xcvrd_dict is not None and 'XCVRD_COLD_RESTART' in proc_info_xcvrd_dict: - helper_logger.log_notice("Key XCVRD_COLD_RESTART already exists in {} tbl for asic {}!".format(APPL_PROC_INFO_XCVRD_TABLE, asic_id)) - else: - appl_db.set(appl_db.APPL_DB, '{}:XCVRD'.format(APPL_PROC_INFO_TABLE), 'XCVRD_COLD_RESTART', 'true') - namespace_reinit_request_tbl[asic_id] = True - helper_logger.log_notice("Added {} tbl for asic {}!".format(APPL_PROC_INFO_XCVRD_TABLE, asic_id)) - appl_db.close(appl_db.APPL_DB) + _, proc_info_xcvrd_fvs = proc_info_tbl.get('XCVRD') + if proc_info_xcvrd_fvs is not None: + proc_info_xcvrd_fvs_dict = dict(proc_info_xcvrd_fvs) + media_notify_rqd = proc_info_xcvrd_fvs_dict.get('MEDIA_NOTIFY_REQUIRED', 'true') + cmis_reinit_rqd = proc_info_xcvrd_fvs_dict.get('CMIS_REINIT_REQUIRED', 'true') + if media_notify_rqd == 'false' and cmis_reinit_rqd == 'false': + helper_logger.log_notice("Appl {} tbl media notify and cmis reinit not required for ASIC {}".format(APPL_PROC_INFO_TABLE, asic_id)) + continue + update_proc_info_xcvrd_to_db(asic_id, xcvr_table_helper, {'MEDIA_NOTIFY_REQUIRED' : 'True', 'CMIS_REINIT_REQUIRED' : 'True'}) + namespace_reinit_request_tbl[asic_id] = True + helper_logger.log_notice("Added {} tbl for asic {}!".format(APPL_PROC_INFO_TABLE, asic_id)) for logical_port_name in port_mapping_data.logical_port_list: port_reinit_request_tbl[logical_port_name] = False @@ -939,6 +961,9 @@ def __init__(self, namespaces, port_mapping, main_thread_stop_event, port_reinit self.namespaces = namespaces self.port_reinit_request_tbl = copy.deepcopy(port_reinit_request_tbl) + def log_info(self, message): + helper_logger.log_info("CMIS: {}".format(message)) + def log_notice(self, message): helper_logger.log_notice("CMIS: {}".format(message)) @@ -1404,6 +1429,8 @@ def task_worker(self): for namespace in self.namespaces: self.wait_for_port_config_done(namespace) + asic_to_unprocessed_logical_ports_dict = get_asic_to_logical_ports_dict(self.port_mapping) + # 
APPL_DB for CONFIG updates, and STATE_DB for insertion/removal sel, asic_context = port_mapping.subscribe_port_update_event(self.namespaces, helper_logger) while not self.task_stopping_event.is_set(): @@ -1426,6 +1453,14 @@ def task_worker(self): self.CMIS_STATE_FAILED, self.CMIS_STATE_READY, self.CMIS_STATE_REMOVED]: + if self.port_reinit_request_tbl[lport] is True: + self.log_info("{}: Setting port_reinit_request_tbl to True".format(lport)) + self.port_reinit_request_tbl[lport] = False + asic_id = self.port_mapping.get_asic_id_for_logical_port(lport) + asic_to_unprocessed_logical_ports_dict[asic_id].remove(lport) + if len(asic_to_unprocessed_logical_ports_dict[asic_id]) == 0: + self.log_notice("Setting CMIS_REINIT_REQUIRED to false for asic: {}".format(asic_id)) + update_proc_info_xcvrd_to_db(asic_id, self.xcvr_table_helper, {'CMIS_REINIT_REQUIRED' : 'False'}) if state != self.CMIS_STATE_READY: self.port_dict[lport]['appl'] = 0 self.port_dict[lport]['host_lanes_mask'] = 0 @@ -1502,9 +1537,9 @@ def task_worker(self): self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_FAILED continue - self.log_notice("{}: {}G, lanemask=0x{:x}, state={}, appl {} host_lane_count {} " - "retries={}".format(lport, int(speed/1000), host_lanes_mask, - state, appl, host_lane_count, retries)) + self.log_notice("{}: {}G, lanemask=0x{:x}, CMIS state={}, Module state={} appl {} host_lane_count {} " + "retries={}".format(lport, int(speed/1000), host_lanes_mask, state, + api.get_module_state(), appl, host_lane_count, retries)) if retries > self.CMIS_MAX_RETRIES: self.log_error("{}: FAILED".format(lport)) self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_FAILED @@ -1576,8 +1611,6 @@ def task_worker(self): need_update = True if self.port_reinit_request_tbl[lport] == True: - # To ensure forced CMIS init is done only once for a port after xcvrd boot-up - self.port_reinit_request_tbl[lport] = False need_update = True self.log_notice("{}: Forcing CMIS init as port_reinit_request_tbl is 
True".format(lport)) @@ -1858,6 +1891,7 @@ def __init__(self, namespaces, port_mapping, port_reinit_request_tbl, main_threa self.sfp_insert_events = {} self.namespaces = namespaces self.port_reinit_request_tbl = copy.deepcopy(port_reinit_request_tbl) + self.asic_to_unprocessed_logical_ports_dict = {} def _mapping_event_from_change_event(self, status, port_dict): """ @@ -1909,6 +1943,12 @@ def _post_port_sfp_info_and_dom_thr_to_db_once(self, port_mapping, xcvr_table_he helper_logger.log_notice("Notify media setting during boot-up for {}".format(logical_port_name)) notify_media_setting(logical_port_name, transceiver_dict, xcvr_table_helper.get_app_port_tbl(asic_index), port_mapping) transceiver_dict.clear() + self.port_reinit_request_tbl[logical_port_name] = False + self.asic_to_unprocessed_logical_ports_dict[asic_index].remove(logical_port_name) + if len(self.asic_to_unprocessed_logical_ports_dict[asic_index]) == 0: + helper_logger.log_notice("Setting MEDIA_NOTIFY_REQUIRED to false for asic: {}".format(asic_index)) + update_proc_info_xcvrd_to_db(asic_index, self.xcvr_table_helper, {'MEDIA_NOTIFY_REQUIRED' : 'False'}) + else: retry_eeprom_set.add(logical_port_name) @@ -1944,6 +1984,7 @@ def _init_port_sfp_status_tbl(self, port_mapping, xcvr_table_helper, stop_event= def init(self): port_mapping_data = port_mapping.get_port_mapping(self.namespaces) + self.asic_to_unprocessed_logical_ports_dict = get_asic_to_logical_ports_dict(self.port_mapping) # Post all the current interface sfp/dom threshold info to STATE_DB self.retry_eeprom_set = self._post_port_sfp_info_and_dom_thr_to_db_once(port_mapping_data, self.xcvr_table_helper, self.main_thread_stop_event) @@ -2356,6 +2397,13 @@ def retry_eeprom_reading(self): post_port_dom_threshold_info_to_db(logical_port, self.port_mapping, self.xcvr_table_helper.get_dom_threshold_tbl(asic_index)) notify_media_setting(logical_port, transceiver_dict, self.xcvr_table_helper.get_app_port_tbl(asic_index), self.port_mapping) 
transceiver_dict.clear() + if logical_port in self.asic_to_unprocessed_logical_ports_dict[asic_index]: + self.port_reinit_request_tbl[logical_port] = False + self.asic_to_unprocessed_logical_ports_dict[asic_index].remove(logical_port) + if len(self.asic_to_unprocessed_logical_ports_dict[asic_index]) == 0: + helper_logger.log_notice("Setting MEDIA_NOTIFY_REQUIRED to false during retry EEPROM read for asic: {}".format(asic_index)) + update_proc_info_xcvrd_to_db(asic_index, self.xcvr_table_helper, {'MEDIA_NOTIFY_REQUIRED' : 'False'}) + retry_success_set.add(logical_port) # Update retry EEPROM set self.retry_eeprom_set -= retry_success_set @@ -2423,12 +2471,12 @@ def load_media_settings(self): with open(media_settings_file_path, "r") as media_file: g_dict = json.load(media_file) +# Subscribes APPL_DB PROC_INFO table events def subscribe_appl_proc_info_update(self): for namespace in self.namespaces: appl_db = daemon_base.db_connect("APPL_DB", namespace=namespace) - - sel = swsscommon.Select() proc_info_tbl = swsscommon.SubscriberStateTable(appl_db, APPL_PROC_INFO_TABLE) + sel = swsscommon.Select() sel.addSelectable(proc_info_tbl) return sel, proc_info_tbl @@ -2462,6 +2510,13 @@ def init(self): except Exception as e: self.log_error("Failed to load sfputil: {}".format(str(e)), True) sys.exit(SFPUTIL_LOAD_ERROR) + # force main thread to cause underlying initialization of SFPs for platforms waiting to dynamically do so, + # since signal handlers can only be installed from within the context of the main thread + if platform_chassis is not None: + try: + platform_chassis.get_num_sfps() + except NotImplementedError: + pass if multi_asic.is_multi_asic(): # Load the namespace details first from the database_global.json file. 
@@ -2548,6 +2603,7 @@ def run(self): sel, proc_info_tbl = self.subscribe_appl_proc_info_update() self.log_notice("Subscribed to appl proc info update") + generate_sigabrt = False while not self.stop_event.is_set(): (state, _) = sel.select(port_mapping.SELECT_TIMEOUT_MSECS) if state == swsscommon.Select.TIMEOUT: @@ -2559,10 +2615,11 @@ def run(self): (key, op, _) = proc_info_tbl.pop() self.log_info("APPL_DB subscriber: Received {}:{} key".format(key, op)) if op == swsscommon.DEL_COMMAND and key == 'XCVRD': - self.log_notice("DEL_COMMAND received for {}:{} key. Killing main thread now!".format(APPL_PROC_INFO_TABLE, key)) - os.kill(os.getpid(), signal.SIGKILL) + self.log_notice("DEL_COMMAND received for {}:{} key. Starting graceful restart to xcvrd!".format(APPL_PROC_INFO_TABLE, key)) + generate_sigabrt = True + break - self.log_info("Stop daemon main loop") + self.log_notice("Stop daemon main loop") generate_sigkill = False # check all threads are alive @@ -2599,6 +2656,9 @@ def run(self): if self.sfp_error_event.is_set(): sys.exit(SFP_SYSTEM_ERROR) + elif generate_sigabrt is True: + self.log_notice("Sending SIGABRT to xcvrd!") + os.kill(os.getpid(), signal.SIGABRT) class XcvrTableHelper: From c3d58bfd1fbcd75a8391eaf0fc5b3878838daed0 Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Tue, 20 Jun 2023 22:29:07 +0000 Subject: [PATCH 4/5] Modified PROC_INFO:XCVRD values to True/False from true/false --- sonic-xcvrd/xcvrd/xcvrd.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 267e4c898..4fe4a85c8 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -898,9 +898,9 @@ def init_appl_proc_info_tbl(namespaces, port_mapping_data, xcvr_table_helper): _, proc_info_xcvrd_fvs = proc_info_tbl.get('XCVRD') if proc_info_xcvrd_fvs is not None: proc_info_xcvrd_fvs_dict = dict(proc_info_xcvrd_fvs) - media_notify_rqd = proc_info_xcvrd_fvs_dict.get('MEDIA_NOTIFY_REQUIRED', 'true') - 
cmis_reinit_rqd = proc_info_xcvrd_fvs_dict.get('CMIS_REINIT_REQUIRED', 'true') - if media_notify_rqd == 'false' and cmis_reinit_rqd == 'false': + media_notify_rqd = proc_info_xcvrd_fvs_dict.get('MEDIA_NOTIFY_REQUIRED', 'True') + cmis_reinit_rqd = proc_info_xcvrd_fvs_dict.get('CMIS_REINIT_REQUIRED', 'True') + if media_notify_rqd == 'False' and cmis_reinit_rqd == 'False': helper_logger.log_notice("Appl {} tbl media notify and cmis reinit not required for ASIC {}".format(APPL_PROC_INFO_TABLE, asic_id)) continue update_proc_info_xcvrd_to_db(asic_id, xcvr_table_helper, {'MEDIA_NOTIFY_REQUIRED' : 'True', 'CMIS_REINIT_REQUIRED' : 'True'}) From 38b296e967ddb66ea78f157cf163df13aa5c69d7 Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Tue, 20 Jun 2023 22:43:45 +0000 Subject: [PATCH 5/5] Corrected typo --- sonic-xcvrd/xcvrd/xcvrd.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 4fe4a85c8..05406689b 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -1459,7 +1459,7 @@ def task_worker(self): asic_id = self.port_mapping.get_asic_id_for_logical_port(lport) asic_to_unprocessed_logical_ports_dict[asic_id].remove(lport) if len(asic_to_unprocessed_logical_ports_dict[asic_id]) == 0: - self.log_notice("Setting CMIS_REINIT_REQUIRED to false for asic: {}".format(asic_id)) + self.log_notice("Setting CMIS_REINIT_REQUIRED to False for asic: {}".format(asic_id)) update_proc_info_xcvrd_to_db(asic_id, self.xcvr_table_helper, {'CMIS_REINIT_REQUIRED' : 'False'}) if state != self.CMIS_STATE_READY: self.port_dict[lport]['appl'] = 0 @@ -1946,7 +1946,7 @@ def _post_port_sfp_info_and_dom_thr_to_db_once(self, port_mapping, xcvr_table_he self.port_reinit_request_tbl[logical_port_name] = False self.asic_to_unprocessed_logical_ports_dict[asic_index].remove(logical_port_name) if len(self.asic_to_unprocessed_logical_ports_dict[asic_index]) == 0: - helper_logger.log_notice("Setting 
MEDIA_NOTIFY_REQUIRED to false for asic: {}".format(asic_index)) + helper_logger.log_notice("Setting MEDIA_NOTIFY_REQUIRED to False for asic: {}".format(asic_index)) update_proc_info_xcvrd_to_db(asic_index, self.xcvr_table_helper, {'MEDIA_NOTIFY_REQUIRED' : 'False'}) else: @@ -2401,7 +2401,7 @@ def retry_eeprom_reading(self): self.port_reinit_request_tbl[logical_port] = False self.asic_to_unprocessed_logical_ports_dict[asic_index].remove(logical_port) if len(self.asic_to_unprocessed_logical_ports_dict[asic_index]) == 0: - helper_logger.log_notice("Setting MEDIA_NOTIFY_REQUIRED to false during retry EEPROM read for asic: {}".format(asic_index)) + helper_logger.log_notice("Setting MEDIA_NOTIFY_REQUIRED to False during retry EEPROM read for asic: {}".format(asic_index)) update_proc_info_xcvrd_to_db(asic_index, self.xcvr_table_helper, {'MEDIA_NOTIFY_REQUIRED' : 'False'}) retry_success_set.add(logical_port)