diff --git a/sfputil/debug.py b/sfputil/debug.py index c1e407a453..113517e5b4 100644 --- a/sfputil/debug.py +++ b/sfputil/debug.py @@ -1,4 +1,5 @@ import sys +import time import click import utilities_common.cli as clicommon from utilities_common import platform_sfputil_helper @@ -19,6 +20,8 @@ ERROR_NOT_IMPLEMENTED = 5 ERROR_INVALID_PORT = 6 +CMIS_MAX_CHANNELS = 8 +TX_RX_OUTPUT_UPDATE_WAIT_TIME = 2 # seconds @click.group(cls=clicommon.AliasedGroup) def debug(): @@ -82,18 +85,61 @@ def set_output(port_name, enable, direction): Enable or disable TX/RX output based on direction ('tx' or 'rx'). """ sfp = get_sfp_object(port_name) + try: + api = sfp.get_xcvr_api() + except NotImplementedError: + click.echo(f"{port_name}: This functionality is not implemented") + sys.exit(ERROR_NOT_IMPLEMENTED) subport = get_subport(port_name) - media_lane_count = get_media_lane_count(port_name) - - lane_mask = get_subport_lane_mask(int(subport), int(media_lane_count)) - try: if direction == "tx": - sfp.tx_disable_channel(lane_mask, enable == "disable") + lane_count = get_media_lane_count(port_name) + disable_func = sfp.tx_disable_channel + get_status_func = api.get_tx_output_status + status_key = "TxOutputStatus" elif direction == "rx": - sfp.rx_disable_channel(lane_mask, enable == "disable") + lane_count = get_host_lane_count(port_name) + disable_func = sfp.rx_disable_channel + get_status_func = api.get_rx_output_status + status_key = "RxOutputStatus" + + lane_mask = get_subport_lane_mask(int(subport), int(lane_count)) + if not disable_func(lane_mask, enable == "disable"): + click.echo(f"{port_name}: {direction.upper()} disable failed for subport {subport}") + sys.exit(EXIT_FAIL) + + time.sleep(TX_RX_OUTPUT_UPDATE_WAIT_TIME) + + output_dict = get_status_func() + if output_dict is None: + click.echo(f"{port_name}: {direction.upper()} output status not available for subport {subport}") + sys.exit(EXIT_FAIL) + + for lane in range(1, CMIS_MAX_CHANNELS + 1): + if lane_mask & (1 << (lane - 1)): + lane_status = output_dict.get(f'{status_key}{lane}') + if lane_status is None: + click.echo( + f"{port_name}: {direction.upper()} output status not available for " + f"lane {lane} on subport {subport}" + ) + sys.exit(EXIT_FAIL) + if enable == "disable": + if lane_status: + click.echo( + f"{port_name}: {direction.upper()} output on lane {lane} is still " + f"enabled on subport {subport}. Restoring state." + ) + sys.exit(EXIT_FAIL) + else: + if not lane_status: + click.echo( + f"{port_name}: {direction.upper()} output on lane {lane} is still disabled " + f"on subport {subport}. Restoring state." + ) + sys.exit(EXIT_FAIL) click.echo( f"{port_name}: {direction.upper()} output " diff --git a/tests/sfputil_test.py b/tests/sfputil_test.py index 087c15624a..812005ea4e 100644 --- a/tests/sfputil_test.py +++ b/tests/sfputil_test.py @@ -1821,83 +1821,87 @@ def test_debug_loopback(self, mock_sonic_v2_connector, mock_config_db_connector, assert result.output == 'Error: \nEthernet0: subport is not present in CONFIG_DB\n' assert result.exit_code == EXIT_FAIL - # Test for 'tx-output' command - @patch('sfputil.debug.get_sfp_object') - @patch('utilities_common.platform_sfputil_helper.ConfigDBConnector') - @patch('utilities_common.platform_sfputil_helper.SonicV2Connector') - @patch('sonic_py_common.multi_asic.get_front_end_namespaces', MagicMock(return_value=[''])) - def test_tx_output(self, mock_sonic_v2_connector, mock_config_db_connector, mock_get_sfp_object): - """Test for tx-output command""" - mock_sfp = MagicMock() - mock_get_sfp_object.return_value = mock_sfp # Ensure get_sfp_object returns the mock - mock_sonic_v2_connector.return_value = MagicMock() + @pytest.mark.parametrize( + "direction, lane_count, enable, disable_func_result, output_dict, expected_echo, expected_exit", + [ + # TX disable success + ( + "tx", 2, "disable", True, {"TxOutputStatus1": False, "TxOutputStatus2": False}, + "TX output disabled", None + ), + # RX enable success + ("rx", 1, "enable", True, {"RxOutputStatus1": True}, "RX output enabled", None), + # TX disable fails to disable + ("tx", 1, "disable", True, {"TxOutputStatus1": True}, "TX output on lane 1 is still enabled", SystemExit), + # RX enable fails to enable + ("rx", 1, "enable", True, {"RxOutputStatus1": False}, "RX output on lane 1 is still disabled", SystemExit), + # TX disable_func returns False + ("tx", 1, "disable", False, {}, "TX disable failed", SystemExit), + # RX output_dict is None + ("rx", 1, "disable", True, None, "RX output status not available", SystemExit), + ] + ) + @patch("sfputil.debug.get_sfp_object") + @patch("sfputil.debug.get_subport") + @patch("sfputil.debug.get_media_lane_count") + @patch("sfputil.debug.get_host_lane_count") + @patch("sfputil.debug.time.sleep", return_value=None) + def test_set_output_cli( + self, + mock_sleep, + mock_get_host_lane_count, + mock_get_media_lane_count, + mock_get_subport, + mock_get_sfp_object, + direction, + lane_count, + enable, + disable_func_result, + output_dict, + expected_echo, + expected_exit + ): + from click.testing import CliRunner + import sfputil.main as sfputil - mock_sfp.get_presence.return_value = False + port_name = "Ethernet0" + subport = 1 runner = CliRunner() - # Test the case where the module is not applicable - mock_sfp.get_presence.return_value = True - mock_sfp.tx_disable_channel = MagicMock(side_effect=AttributeError) - result = runner.invoke(sfputil.cli.commands['debug'].commands['tx-output'], ["Ethernet0", "enable"]) - assert result.output == 'Ethernet0: TX disable is not applicable for this module\n' - assert result.exit_code == ERROR_NOT_IMPLEMENTED - - # Test the case where enabling/disabling TX works - mock_sfp.tx_disable_channel = MagicMock(return_value=None) - result = runner.invoke(sfputil.cli.commands['debug'].commands['tx-output'], ["Ethernet0", "enable"]) - assert result.output == 'Ethernet0: TX output enabled on subport 1\n' - assert result.exit_code != ERROR_NOT_IMPLEMENTED - - mock_sfp.tx_disable_channel = MagicMock(return_value=None) - result = runner.invoke(sfputil.cli.commands['debug'].commands['tx-output'], ["Ethernet0", "disable"]) - assert result.output == 'Ethernet0: TX output disabled on subport 1\n' - assert result.exit_code != ERROR_NOT_IMPLEMENTED + mock_get_subport.return_value = subport + mock_get_media_lane_count.return_value = lane_count + mock_get_host_lane_count.return_value = lane_count - # Test the case where there is a failure while disabling TX - mock_sfp.tx_disable_channel = MagicMock(side_effect=Exception("TX disable failed")) - result = runner.invoke(sfputil.cli.commands['debug'].commands['tx-output'], ["Ethernet0", "disable"]) - assert result.output == 'Ethernet0: TX disable failed due to TX disable failed\n' - assert result.exit_code == EXIT_FAIL - - # Test for 'rx-output' command - @patch('sfputil.debug.get_sfp_object') - @patch('utilities_common.platform_sfputil_helper.ConfigDBConnector') - @patch('utilities_common.platform_sfputil_helper.SonicV2Connector') - @patch('sonic_py_common.multi_asic.get_front_end_namespaces', MagicMock(return_value=[''])) - def test_rx_output(self, mock_sonic_v2_connector, mock_config_db_connector, mock_get_sfp_object): - """Test for rx-output command""" + # Mock SFP and API mock_sfp = MagicMock() - mock_get_sfp_object.return_value = mock_sfp # Ensure get_sfp_object returns the mock - mock_sonic_v2_connector.return_value = MagicMock() - - mock_sfp.get_presence.return_value = False - runner = CliRunner() - - # Test the case where the module is not applicable - mock_sfp.get_presence.return_value = True - mock_sfp.rx_disable_channel = MagicMock(side_effect=AttributeError) - result = runner.invoke(sfputil.cli.commands['debug'].commands['rx-output'], ["Ethernet0", "enable"]) - assert result.output == 'Ethernet0: RX disable is not applicable for this module\n' - assert result.exit_code == ERROR_NOT_IMPLEMENTED + mock_api = MagicMock() + if direction == "tx": + mock_sfp.tx_disable_channel.return_value = disable_func_result + mock_api.get_tx_output_status.return_value = output_dict + elif direction == "rx": + mock_sfp.rx_disable_channel.return_value = disable_func_result + mock_api.get_rx_output_status.return_value = output_dict + mock_sfp.get_xcvr_api.return_value = mock_api + mock_get_sfp_object.return_value = mock_sfp - # Test the case where enabling/disabling RX works - mock_sfp.rx_disable_channel = MagicMock(return_value=None) - result = runner.invoke(sfputil.cli.commands['debug'].commands['rx-output'], ["Ethernet0", "enable"]) - assert result.output == 'Ethernet0: RX output enabled on subport 1\n' - assert result.exit_code != ERROR_NOT_IMPLEMENTED + # Map direction to CLI command + direction_to_cli = {"tx": "tx-output", "rx": "rx-output"} + cli_cmd = direction_to_cli.get(direction, direction) - mock_sfp.rx_disable_channel = MagicMock(return_value=None) - result = runner.invoke(sfputil.cli.commands['debug'].commands['rx-output'], ["Ethernet0", "disable"]) - assert result.output == 'Ethernet0: RX output disabled on subport 1\n' - assert result.exit_code != ERROR_NOT_IMPLEMENTED + # Run CLI and check output/exit + result = runner.invoke(sfputil.cli.commands['debug'].commands.get(cli_cmd, lambda *a, **k: None), + [port_name, enable]) - # Test the case where there is a failure while disabling RX - mock_sfp.rx_disable_channel = MagicMock(side_effect=Exception("RX disable failed")) - result = runner.invoke(sfputil.cli.commands['debug'].commands['rx-output'], ["Ethernet0", "disable"]) - assert result.output == 'Ethernet0: RX disable failed due to RX disable failed\n' - assert result.exit_code == EXIT_FAIL + if expected_exit: + assert result.exit_code != 0 + assert expected_echo in result.output + else: + assert result.exit_code == 0 + assert expected_echo in result.output @pytest.mark.parametrize("subport, lane_count, expected_mask", [ + (0, 2, 0x3), + (0, 4, 0xf), (1, 1, 0x1), (1, 4, 0xf), (2, 1, 0x2), diff --git a/utilities_common/platform_sfputil_helper.py b/utilities_common/platform_sfputil_helper.py index 4138b5b064..edc3334d71 100644 --- a/utilities_common/platform_sfputil_helper.py +++ b/utilities_common/platform_sfputil_helper.py @@ -150,7 +150,7 @@ def get_subport_lane_mask(subport, lane_count): int: The lane mask calculated for the given subport and lane count. """ # Calculating the lane mask using bitwise operations. - return ((1 << lane_count) - 1) << ((subport - 1) * lane_count) + return ((1 << lane_count) - 1) << ((0 if subport == 0 else subport - 1) * lane_count) def get_sfp_object(port_name): @@ -191,7 +191,7 @@ def get_host_lane_count(port_name): lane_count = get_value_from_db_by_field("STATE_DB", "TRANSCEIVER_INFO", "host_lane_count", port_name) - if lane_count == 0 or lane_count is None: + if lane_count == 0 or lane_count is None or lane_count == '': click.echo(f"{port_name}: unable to retreive correct host lane count") sys.exit(EXIT_FAIL) @@ -202,7 +202,7 @@ def get_media_lane_count(port_name): lane_count = get_value_from_db_by_field("STATE_DB", "TRANSCEIVER_INFO", "media_lane_count", port_name) - if lane_count == 0 or lane_count is None: + if lane_count == 0 or lane_count is None or lane_count == '': click.echo(f"{port_name}: unable to retreive correct media lane count") sys.exit(EXIT_FAIL) @@ -237,7 +237,12 @@ def get_value_from_db_by_field(db_name, table_name, field, key): db.connect(getattr(db, db_name)) # Get the corresponding attribute (e.g., STATE_DB) from the connector # Retrieve the value from the database - return db.get(db_name, f"{table_name}|{key}", field) + value = db.get(db_name, f"{table_name}|{key}", field) + if value is None: + click.echo(f"Field '{field}' not found in table '{table_name}' for key '{key}' in {db_name}.") + return '' + else: + return value except (TypeError, KeyError, AttributeError) as e: click.echo(f"Error: {e}") return None @@ -277,8 +282,10 @@ def get_subport(port_name): if subport is None: click.echo(f"{port_name}: subport is not present in CONFIG_DB") sys.exit(EXIT_FAIL) + elif subport == '': + subport = 0 - return max(int(subport), 1) + return int(subport) def is_sfp_present(port_name):