diff --git a/ansible/roles/test/files/ptftests/py3/copp_tests.py b/ansible/roles/test/files/ptftests/py3/copp_tests.py index 33510e76e88..ebd472e19ff 100644 --- a/ansible/roles/test/files/ptftests/py3/copp_tests.py +++ b/ansible/roles/test/files/ptftests/py3/copp_tests.py @@ -87,6 +87,7 @@ def __init__(self): self.platform = test_params.get('platform', None) self.topo_type = test_params.get('topo_type', None) self.ip_version = test_params.get('ip_version', None) + self.neighbor_miss_trap_supported = test_params.get('neighbor_miss_trap_supported', False) def log(self, message, debug=False): current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") @@ -738,6 +739,12 @@ class VlanSubnetTest(PolicyTest): def __init__(self): PolicyTest.__init__(self) + # Verify with different PPS if neighbor miss trap is supported by the platform + if self.neighbor_miss_trap_supported: + self.PPS_LIMIT = 200 + self.PPS_LIMIT_MIN = self.PPS_LIMIT * 0.9 + self.PPS_LIMIT_MAX = self.PPS_LIMIT * 1.3 + def runTest(self): self.log("VlanSubnetTest") self.run_suite() @@ -774,6 +781,12 @@ class VlanSubnetIPinIPTest(PolicyTest): def __init__(self): PolicyTest.__init__(self) + # Verify with different PPS if neighbor miss trap is supported by the platform + if self.neighbor_miss_trap_supported: + self.PPS_LIMIT = 200 + self.PPS_LIMIT_MIN = self.PPS_LIMIT * 0.9 + self.PPS_LIMIT_MAX = self.PPS_LIMIT * 1.3 + def runTest(self): self.log("VlanSubnetIpinIPTest") self.run_suite() diff --git a/tests/copp/copp_utils.py b/tests/copp/copp_utils.py index 89a1053937f..f6623a6c6d2 100644 --- a/tests/copp/copp_utils.py +++ b/tests/copp/copp_utils.py @@ -8,6 +8,8 @@ import logging import json import ipaddress +import ast +import random from tests.common.config_reload import config_reload @@ -39,7 +41,7 @@ _TEMP_CONFIG_DB = "/home/admin/config_db_copp_backup.json" -def limit_policer(dut, pps_limit, nn_target_namespace): +def limit_policer(dut, pps_limit, nn_target_namespace, neighbor_miss_trap_supported): """ Updates the COPP configuration in the SWSS container to respect a given rate limit. @@ -64,12 +66,13 @@ def limit_policer(dut, pps_limit, nn_target_namespace): config_format = "config_db" dut.script( - cmd="{} {} {} {} {} {}".format(_UPDATE_COPP_SCRIPT, - pps_limit, - _BASE_COPP_CONFIG, - _TEMP_COPP_CONFIG, - config_format, - dut.facts["asic_type"]) + cmd="{} {} {} {} {} {} {}".format(_UPDATE_COPP_SCRIPT, + pps_limit, + _BASE_COPP_CONFIG, + _TEMP_COPP_CONFIG, + config_format, + dut.facts["asic_type"], + neighbor_miss_trap_supported) ) if config_format == "app_db": @@ -474,3 +477,137 @@ def get_lo_ipv4(duthost): break return loopback_ip + + +def get_copp_trap_capabilities(duthost): + """ + Fetches supported trap IDs from COPP_TRAP_CAPABILITY_TABLE in STATE_DB and returns them as a list. + Args: + duthost (SonicHost): The target device. + Returns: + list: A list of supported trap IDs. + """ + + trap_ids = duthost.shell("sonic-db-cli STATE_DB HGET 'COPP_TRAP_CAPABILITY_TABLE|traps' trap_ids")['stdout'] + return trap_ids.split(",") + + +def parse_show_copp_configuration(duthost): + """ + Parses the output of the `show copp configuration` command into a structured dictionary. + Args: + duthost (SonicHost): The target device. + Returns: + dict: A dictionary mapping trap IDs to their configuration details. + """ + + copp_config_output = duthost.shell("show copp configuration")["stdout"] + copp_config_lines = copp_config_output.splitlines() + + # Parse the command output into a structured format + copp_config_data = {} + for line in copp_config_lines[1:]: # Skip the header line + fields = line.split() + if len(fields) >= 8: + trap_id = fields[0] + copp_config_data[trap_id] = { + "trap_group": fields[1], + "trap_action": fields[2], + "cbs": fields[3], + "cir": fields[4], + "meter_type": fields[5], + "mode": fields[6], + "hw_status": fields[7] + } + + return copp_config_data + + +def is_trap_installed(duthost, trap_id): + """ + Checks if a specific trap is installed by parsing the output of `show copp configuration`. + Args: + dut (SonicHost): The target device + trap_id: The trap ID to check. + Returns: + bool: True if the trap is installed, False otherwise. + """ + + output = parse_show_copp_configuration(duthost) + assert trap_id in output, f"Trap {trap_id} not found in the configuration" + assert "hw_status" in output[trap_id], f"hw_status not found for trap {trap_id}" + + return output[trap_id]["hw_status"] == "installed" + + +def get_trap_hw_status(duthost): + """ + Retrieves the hw_status for traps from the STATE_DB. + Args: + dut (SonicHost): The target device + Returns: + dict: A dictionary mapping trap IDs to their hw_status. + """ + + state_db_data = duthost.shell("sonic-db-cli STATE_DB KEYS 'COPP_TRAP_TABLE|*'")["stdout"] + state_db_data = state_db_data.splitlines() + hw_status = {} + + for key in state_db_data: + trap_id = key.split("|")[-1] + trap_data = duthost.shell(f"sonic-db-cli STATE_DB HGETALL '{key}'")["stdout"] + trap_data_dict = ast.literal_eval(trap_data) + hw_status[trap_id] = trap_data_dict.get("hw_status", "not-installed") + + return hw_status + + +def get_random_copp_trap_config(duthost): + """ + Retrieves a random CoPP trap config from /etc/sonic/copp_cfg.json on the DUT. + Returns the trap ID, its group, and related config details from COPP_TRAP and COPP_GROUP sections + Args: + duthost (SonicHost): The target device. + Returns: + tuple: A tuple containing the following elements: + - str: The first trap ID associated with the selected trap. + - str: The trap group associated with the selected trap. + - dict: The configuration details of the selected trap group from the `COPP_GROUP` section. + """ + + copp_cfg = json.loads(duthost.shell("cat /etc/sonic/copp_cfg.json")["stdout"]) + + # Get all traps from COPP_TRAP + copp_trap_cfg = copp_cfg.get("COPP_TRAP", {}) + traps = list(copp_trap_cfg.keys()) + assert traps, "No traps found in copp_cfg.json" + + # Randomly select one trap + selected_trap = random.choice(traps) + trap_data = copp_cfg["COPP_TRAP"][selected_trap] + trap_ids = trap_data.get("trap_ids", "").split(",") + trap_group = trap_data.get("trap_group", "") + return trap_ids[0], trap_group, copp_cfg["COPP_GROUP"][trap_group] + + +def get_feature_name_from_trap_id(duthost, trap_id): + """ + Get the feature name corresponding to the given trap ID. + Args: + duthost (SonicHost): The target device. + trap_id (str): The trap ID to look up. + Returns: + bool: True if the trap ID is always enabled, False otherwise. + str: The feature name associated with the trap ID. + """ + + copp_cfg = json.loads(duthost.shell("cat /etc/sonic/copp_cfg.json")["stdout"]) + copp_trap_cfg = copp_cfg.get("COPP_TRAP", {}) + + for feature_name, feature_data in copp_trap_cfg.items(): + trap_ids = feature_data.get("trap_ids", "").split(",") + if trap_id in trap_ids: + always_enabled = feature_data.get("always_enabled", "false") + return always_enabled.lower() == "true", feature_name if always_enabled.lower() == "true" else feature_name + + return False, None diff --git a/tests/copp/scripts/update_copp_config.py b/tests/copp/scripts/update_copp_config.py index 6336cfc9d52..375996093ec 100644 --- a/tests/copp/scripts/update_copp_config.py +++ b/tests/copp/scripts/update_copp_config.py @@ -92,6 +92,13 @@ def generate_limited_pps_config(pps_limit, input_config_file, output_config_file else: continue else: + # queue1_group3 is used by neighbor_miss trap. + # test_copp.py tests the neighbor_miss trap with default CBS/CIR + # values on platforms that support it. + # The default value is 200 PPS for queue1_group3 + if tg == "queue1_group3": + if neighbor_miss_trap_supported: + continue if "cir" in group_config: group_config["cir"] = pps_limit if "cbs" in group_config: @@ -112,5 +119,9 @@ def generate_limited_pps_config(pps_limit, input_config_file, output_config_file asic_type = "" else: asic_type = ARGS[4] + if len(ARGS) < 6: + neighbor_miss_trap_supported = False + else: + neighbor_miss_trap_supported = ARGS[5].lower() == "true" generate_limited_pps_config(ARGS[0], ARGS[1], ARGS[2], config_format, asic_type) diff --git a/tests/copp/test_copp.py b/tests/copp/test_copp.py index dce710d5d0a..47623ae8612 100644 --- a/tests/copp/test_copp.py +++ b/tests/copp/test_copp.py @@ -58,12 +58,30 @@ "nn_target_namespace", "send_rate_limit", "nn_target_vlanid", - "topo_type"]) + "topo_type", + "neighbor_miss_trap_supported"]) _TOR_ONLY_PROTOCOL = ["DHCP", "DHCP6"] _TEST_RATE_LIMIT_DEFAULT = 600 _TEST_RATE_LIMIT_MARVELL = 625 +# Protocol to trap ID mapping indicating which trap +# being for which protocol. Trap ID is used to verify +# the trap installation status. +PROTOCOL_TO_TRAP_ID = { + "ARP": ["arp_req", "arp_resp", "neigh_discovery"], + "IP2ME": ["ip2me"], + "SNMP": ["ip2me"], + "SSH": ["ip2me"], + "DHCP": ["dhcp"], + "DHCP6": ["dhcpv6"], + "BGP": ["bgp", "bgpv6"], + "LACP": ["lacp"], + "LLDP": ["lldp"], + "UDLD": ["udld"], + "Default": ["default"] +} + logger = logging.getLogger(__name__) @@ -94,6 +112,24 @@ def test_policer(self, protocol, duthosts, enum_rand_one_per_hwsku_frontend_host that have a set rate limit. """ duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname] + + # Skip the check if the protocol is "Default" + if protocol != "Default": + trap_ids = PROTOCOL_TO_TRAP_ID.get(protocol) + is_always_enabled, feature_name = copp_utils.get_feature_name_from_trap_id(duthost, trap_ids[0]) + if is_always_enabled: + pytest_assert(copp_utils.is_trap_installed(duthost, trap_ids[0]), + f"Trap {trap_ids[0]} for protocol {protocol} is not installed") + else: + feature_list, _ = duthost.get_feature_status() + trap_installed = copp_utils.is_trap_installed(duthost, trap_ids[0]) + if feature_name in feature_list and feature_list[feature_name] == "enabled": + pytest_assert(trap_installed, + f"Trap {trap_ids[0]} for protocol {protocol} is not installed") + else: + pytest_assert(not trap_installed, + f"Trap {trap_ids[0]} for protocol {protocol} is unexpectedly installed") + _copp_runner(duthost, ptfhost, protocol, @@ -109,6 +145,20 @@ def test_trap_neighbor_miss(self, duthosts, enum_rand_one_per_hwsku_frontend_hos """ duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname] + + # Access test_params from the class-level variable + test_params = self.test_params + + trap_status = copp_utils.is_trap_installed(duthost, "neighbor_miss") + if test_params.neighbor_miss_trap_supported: + logger.info("neighbor_miss trap is supported by DUT") + pytest_assert(trap_status, + "neighbor_miss trap is supported but not installed") + else: + logger.info("neighbor_miss trap is not supported by DUT") + pytest_assert(not trap_status, + "neighbor_miss trap is not supported but installed") + logger.info("Verify IPV{} {} packets are rate limited".format(ip_versions, packet_type)) pytest_assert( wait_until(60, 20, 0, _copp_runner, duthost, ptfhost, packet_type, copp_testbed, dut_type, @@ -129,6 +179,9 @@ def test_add_new_trap(self, duthosts, enum_rand_one_per_hwsku_frontend_hostname, logger.info("Uninstall trap {}".format(self.trap_id)) copp_utils.uninstall_trap(duthost, self.feature_name, self.trap_id) + pytest_assert(not copp_utils.is_trap_installed(duthost, self.trap_id), + "Trap {} is still installed, expected to be uninstalled".format(self.trap_id)) + # remove ip2me because bgp traffic can fall back to ip2me trap then interfere following traffic tests if self.trap_id == "bgp": logger.info("Uninstall trap ip2me") @@ -145,6 +198,10 @@ def test_add_new_trap(self, duthosts, enum_rand_one_per_hwsku_frontend_hostname, logger.info("Set always_enabled of {} to true".format(self.trap_id)) copp_utils.configure_always_enabled_for_trap(duthost, self.trap_id, "true") + logging.info("Verify trap installed through CLI") + pytest_assert(copp_utils.is_trap_installed(duthost, self.trap_id), + "Trap {} is not installed, expected to be installed".format(self.trap_id)) + logger.info("Verify {} trap status is installed by sending traffic".format(self.trap_id)) pytest_assert( wait_until(60, 20, 0, _copp_runner, duthost, ptfhost, self.trap_id.upper(), copp_testbed, dut_type), @@ -182,6 +239,10 @@ def test_remove_trap(self, duthosts, enum_rand_one_per_hwsku_frontend_hostname, logger.info("Disable {} in feature table".format(self.feature_name)) copp_utils.disable_feature_entry(duthost, self.feature_name) + logging.info("Verify {} trap is uninstalled through CLI".format(self.trap_id)) + pytest_assert(not copp_utils.is_trap_installed(duthost, self.trap_id), + "Trap {} is not uninstalled".format(self.trap_id)) + logger.info("Verify {} trap status is uninstalled by sending traffic".format(self.trap_id)) pytest_assert( wait_until(100, 20, 0, _copp_runner, duthost, ptfhost, self.trap_id.upper(), @@ -218,12 +279,49 @@ def test_trap_config_save_after_reboot(self, duthosts, localhost, enum_rand_one_ time.sleep(180) logger.info("Verify always_enable of {} == {} in config_db".format(self.trap_id, "true")) copp_utils.verify_always_enable_value(duthost, self.trap_id, "true") + + logging.info("Verify {} trap is installed through CLI".format(self.trap_id)) + pytest_assert(copp_utils.is_trap_installed(duthost, self.trap_id), + "Trap {} is not installed, expected to be installed".format(self.trap_id)) + logger.info("Verify {} trap status is installed by sending traffic".format(self.trap_id)) pytest_assert( wait_until(200, 20, 0, _copp_runner, duthost, ptfhost, self.trap_id.upper(), copp_testbed, dut_type), "Installing {} trap fail".format(self.trap_id)) +@pytest.mark.disable_loganalyzer +def test_verify_copp_configuration_cli(duthosts, enum_rand_one_per_hwsku_frontend_hostname): + """ + Verifies the `show copp configuration` output with copp_cfg.json and hw_status in STATE_DB. + """ + + duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname] + + trap, trap_group, copp_group_cfg = copp_utils.get_random_copp_trap_config(duthost) + hw_status = copp_utils.get_trap_hw_status(duthost) + show_copp_config = copp_utils.parse_show_copp_configuration(duthost) + + pytest_assert(trap in show_copp_config, + f"Trap {trap} not found in show copp configuration output") + pytest_assert(trap_group == show_copp_config[trap]["trap_group"], + f"Trap group mismatch for trap {trap} (expected: \ + {trap_group}, actual: {show_copp_config[trap]['trap_group']})") + + logging.info("Verifying trap {} configuration with CLI".format(trap)) + for field in ["trap_action", "cbs", "cir", "meter_type", "mode"]: + expected_value = copp_group_cfg.get(field, "").strip() + actual_value = show_copp_config[trap].get(field, "").strip() + pytest_assert(expected_value == actual_value, + f"Field {field} mismatch for trap {trap} (expected: {expected_value}, actual: {actual_value})") + + logging.info("Verifying trap {} installation status between CLI and STATE_DB".format(trap)) + expected_hw_status = hw_status.get(trap, "not-installed") + actual_hw_status = show_copp_config[trap]["hw_status"] + pytest_assert(expected_hw_status == actual_hw_status, + f"hw_status mismatch for trap {trap} (expected: {expected_hw_status}, actual: {actual_hw_status})") + + @pytest.fixture(scope="class") def dut_type(duthosts, enum_rand_one_per_hwsku_frontend_hostname): duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname] @@ -257,6 +355,9 @@ def copp_testbed( duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname] test_params = _gather_test_params(tbinfo, duthost, request, duts_minigraph_facts) + # Store test_params in the TestCOPP class + TestCOPP.test_params = test_params + if not is_backend_topology: # There is no upstream neighbor in T1 backend topology. Test is skipped on T0 backend. # For Non T2 topologies, setting upStreamDuthost as duthost to cover dualTOR and MLAG scenarios. @@ -315,7 +416,8 @@ def _copp_runner(dut, ptf, protocol, test_params, dut_type, has_trap=True, "asic_type": dut.facts["asic_type"], "platform": dut.facts["platform"], "topo_type": test_params.topo_type, - "ip_version": ip_version} + "ip_version": ip_version, + "neighbor_miss_trap_supported": test_params.neighbor_miss_trap_supported} dut_ip = dut.mgmt_ip device_sockets = ["0-{}@tcp://127.0.0.1:10900".format(test_params.nn_target_port), @@ -397,6 +499,8 @@ def _gather_test_params(tbinfo, duthost, request, duts_minigraph_facts): peerip6 = bgp_peer["peer_addr"] break + neighbor_miss_trap_supported = "neighbor_miss" in copp_utils.get_copp_trap_capabilities(duthost) + logging.info("nn_target_port {} nn_target_interface {} nn_target_namespace {} nn_target_vlanid {}" .format(nn_target_port, nn_target_interface, nn_target_namespace, nn_target_vlanid)) @@ -411,7 +515,8 @@ def _gather_test_params(tbinfo, duthost, request, duts_minigraph_facts): nn_target_namespace=nn_target_namespace, send_rate_limit=send_rate_limit, nn_target_vlanid=nn_target_vlanid, - topo_type=topo_type) + topo_type=topo_type, + neighbor_miss_trap_supported=neighbor_miss_trap_supported) def _setup_testbed(dut, creds, ptf, test_params, tbinfo, upStreamDuthost, is_backend_topology): @@ -426,7 +531,7 @@ def _setup_testbed(dut, creds, ptf, test_params, tbinfo, upStreamDuthost, is_bac rate_limit = _TEST_RATE_LIMIT_MARVELL logging.info("Update the rate limit for the COPP policer") - copp_utils.limit_policer(dut, rate_limit, test_params.nn_target_namespace) + copp_utils.limit_policer(dut, rate_limit, test_params.nn_target_namespace, test_params.neighbor_miss_trap_supported) # Multi-asic will not support this mode as of now. if test_params.swap_syncd: