diff --git a/sfputil/debug.py b/sfputil/debug.py index 113517e5b4..a23f7b3572 100644 --- a/sfputil/debug.py +++ b/sfputil/debug.py @@ -93,6 +93,20 @@ def set_output(port_name, enable, direction): subport = get_subport(port_name) + if hasattr(api, 'get_cmis_rev'): + cmis_rev = api.get_cmis_rev() + if cmis_rev is None: + click.echo(f"{port_name}: CMIS revision not available for subport {subport}") + sys.exit(EXIT_FAIL) + + # OutputStatusRx and OutputStatusTx are supported from CMIS 5.0 + if float(cmis_rev) < 5.0: + click.echo( + f"{port_name}: This functionality is not supported" + f" with CMIS version {cmis_rev}, requires CMIS 5.0 and above" + ) + sys.exit(EXIT_FAIL) + try: if direction == "tx": lane_count = get_media_lane_count(port_name) diff --git a/tests/sfputil_test.py b/tests/sfputil_test.py index 812005ea4e..781ca0deaa 100644 --- a/tests/sfputil_test.py +++ b/tests/sfputil_test.py @@ -1822,23 +1822,29 @@ def test_debug_loopback(self, mock_sonic_v2_connector, mock_config_db_connector, assert result.exit_code == EXIT_FAIL @pytest.mark.parametrize( - "direction, lane_count, enable, disable_func_result, output_dict, expected_echo, expected_exit", + "direction, lane_count, enable, disable_func_result, cmis_version, output_dict, expected_echo, expected_exit", [ # TX disable success ( - "tx", 2, "disable", True, {"TxOutputStatus1": False, "TxOutputStatus2": False}, + "tx", 2, "disable", True, "5.3", {"TxOutputStatus1": False, "TxOutputStatus2": False}, "TX output disabled", None ), # RX enable success - ("rx", 1, "enable", True, {"RxOutputStatus1": True}, "RX output enabled", None), + ("rx", 1, "enable", True, "5.0", {"RxOutputStatus1": True}, "RX output enabled", None), # TX disable fails to disable - ("tx", 1, "disable", True, {"TxOutputStatus1": True}, "TX output on lane 1 is still enabled", SystemExit), + ( + "tx", 1, "disable", True, "5.0", {"TxOutputStatus1": True}, + "TX output on lane 1 is still enabled", SystemExit + ), # RX enable fails to enable - ("rx", 1, "enable", True, {"RxOutputStatus1": False}, "RX output on lane 1 is still disabled", SystemExit), - # TX disable_func returns False - ("tx", 1, "disable", False, {}, "TX disable failed", SystemExit), - # RX output_dict is None - ("rx", 1, "disable", True, None, "RX output status not available", SystemExit), + ( + "rx", 1, "enable", True, "5.0", {"RxOutputStatus1": False}, + "RX output on lane 1 is still disabled", SystemExit + ), + # CMIS version is None + ("tx", 1, "disable", False, None, {}, "CMIS revision not available", SystemExit), + # CMIS version is below 5.0 + ("rx", 1, "disable", True, "4.0", None, "This functionality is not supported", SystemExit), ] ) @patch("sfputil.debug.get_sfp_object") @@ -1857,6 +1863,7 @@ def test_set_output_cli( lane_count, enable, disable_func_result, + cmis_version, output_dict, expected_echo, expected_exit @@ -1875,6 +1882,7 @@ def test_set_output_cli( # Mock SFP and API mock_sfp = MagicMock() mock_api = MagicMock() + mock_api.get_cmis_rev.return_value = cmis_version if direction == "tx": mock_sfp.tx_disable_channel.return_value = disable_func_result mock_api.get_tx_output_status.return_value = output_dict