Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions sonic-xcvrd/tests/test_xcvrd.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ def test_get_port_mapping(self, mock_swsscommon_table):
assert port_mapping.get_asic_id_for_logical_port('Ethernet-IB0') == None
assert port_mapping.get_physical_to_logical(3) == None
assert port_mapping.get_logical_to_physical('Ethernet-IB0') == None

@patch('swsscommon.swsscommon.Select.addSelectable', MagicMock())
@patch('swsscommon.swsscommon.SubscriberStateTable')
@patch('swsscommon.swsscommon.Select.select')
Expand Down Expand Up @@ -547,27 +547,38 @@ def test_CmisManagerTask_task_worker(self, mock_chassis):
task.on_port_update_event(port_change_event)
assert len(task.port_dict) == 1

task.get_host_tx_status = MagicMock(return_value='True')
task.get_port_admin_status = MagicMock(return_value='up')

# Case 1: Module Inserted --> DP_DEINIT
task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True])
task.task_worker()
assert task.port_dict['Ethernet0']['cmis_state'] == 'DP_DEINIT'

task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True])
task.task_worker()
assert mock_xcvr_api.set_datapath_deinit.call_count == 1
assert mock_xcvr_api.tx_disable_channel.call_count == 1
assert mock_xcvr_api.set_lpmode.call_count == 2
assert mock_xcvr_api.set_lpmode.call_count == 1
assert task.port_dict['Ethernet0']['cmis_state'] == 'AP_CONFIGURED'

# Case 2: DP_DEINIT --> AP Configured
task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True])
task.task_worker()
assert mock_xcvr_api.set_application.call_count == 1
assert task.port_dict['Ethernet0']['cmis_state'] == 'DP_INIT'

# Case 3: AP Configured --> DP_INIT
task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True])
task.task_worker()
assert mock_xcvr_api.set_datapath_init.call_count == 1
assert task.port_dict['Ethernet0']['cmis_state'] == 'DP_TXON'

# Case 4: DP_INIT --> DP_TXON
task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True])
task.task_worker()
assert mock_xcvr_api.tx_disable_channel.call_count == 2
assert task.port_dict['Ethernet0']['cmis_state'] == 'DP_ACTIVATION'

@patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock())
def test_DomInfoUpdateTask_handle_port_change_event(self):
Expand Down
Loading