Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions ansible/roles/test/files/ptftests/py3/copp_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def __init__(self):
self.platform = test_params.get('platform', None)
self.topo_type = test_params.get('topo_type', None)
self.ip_version = test_params.get('ip_version', None)
self.neighbor_miss_trap_supported = test_params.get('neighbor_miss_trap_supported', False)

def log(self, message, debug=False):
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
Expand Down Expand Up @@ -738,6 +739,12 @@ class VlanSubnetTest(PolicyTest):
def __init__(self):
PolicyTest.__init__(self)

# Verify with different PPS if neighbor miss trap is supported by the platform
if self.neighbor_miss_trap_supported:
self.PPS_LIMIT = 200
self.PPS_LIMIT_MIN = self.PPS_LIMIT * 0.9
self.PPS_LIMIT_MAX = self.PPS_LIMIT * 1.3

def runTest(self):
self.log("VlanSubnetTest")
self.run_suite()
Expand Down Expand Up @@ -774,6 +781,12 @@ class VlanSubnetIPinIPTest(PolicyTest):
def __init__(self):
PolicyTest.__init__(self)

# Verify with different PPS if neighbor miss trap is supported by the platform
if self.neighbor_miss_trap_supported:
self.PPS_LIMIT = 200
self.PPS_LIMIT_MIN = self.PPS_LIMIT * 0.9
self.PPS_LIMIT_MAX = self.PPS_LIMIT * 1.3

def runTest(self):
self.log("VlanSubnetIpinIPTest")
self.run_suite()
Expand Down
151 changes: 144 additions & 7 deletions tests/copp/copp_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import logging
import json
import ipaddress
import ast
import random

from tests.common.config_reload import config_reload

Expand Down Expand Up @@ -39,7 +41,7 @@
_TEMP_CONFIG_DB = "/home/admin/config_db_copp_backup.json"


def limit_policer(dut, pps_limit, nn_target_namespace):
def limit_policer(dut, pps_limit, nn_target_namespace, neighbor_miss_trap_supported):
"""
Updates the COPP configuration in the SWSS container to respect a given rate limit.

Expand All @@ -64,12 +66,13 @@ def limit_policer(dut, pps_limit, nn_target_namespace):
config_format = "config_db"

dut.script(
cmd="{} {} {} {} {} {}".format(_UPDATE_COPP_SCRIPT,
pps_limit,
_BASE_COPP_CONFIG,
_TEMP_COPP_CONFIG,
config_format,
dut.facts["asic_type"])
cmd="{} {} {} {} {} {} {}".format(_UPDATE_COPP_SCRIPT,
pps_limit,
_BASE_COPP_CONFIG,
_TEMP_COPP_CONFIG,
config_format,
dut.facts["asic_type"],
neighbor_miss_trap_supported)
)

if config_format == "app_db":
Expand Down Expand Up @@ -474,3 +477,137 @@ def get_lo_ipv4(duthost):
break

return loopback_ip


def get_copp_trap_capabilities(duthost):
"""
Fetches supported trap IDs from COPP_TRAP_CAPABILITY_TABLE in STATE_DB and returns them as a list.
Args:
duthost (SonicHost): The target device.
Returns:
list: A list of supported trap IDs.
"""

trap_ids = duthost.shell("sonic-db-cli STATE_DB HGET 'COPP_TRAP_CAPABILITY_TABLE|traps' trap_ids")['stdout']
return trap_ids.split(",")


def parse_show_copp_configuration(duthost):
"""
Parses the output of the `show copp configuration` command into a structured dictionary.
Args:
duthost (SonicHost): The target device.
Returns:
dict: A dictionary mapping trap IDs to their configuration details.
"""

copp_config_output = duthost.shell("show copp configuration")["stdout"]
copp_config_lines = copp_config_output.splitlines()

# Parse the command output into a structured format
copp_config_data = {}
for line in copp_config_lines[1:]: # Skip the header line
fields = line.split()
if len(fields) >= 8:
trap_id = fields[0]
copp_config_data[trap_id] = {
"trap_group": fields[1],
"trap_action": fields[2],
"cbs": fields[3],
"cir": fields[4],
"meter_type": fields[5],
"mode": fields[6],
"hw_status": fields[7]
}

return copp_config_data


def is_trap_installed(duthost, trap_id):
"""
Checks if a specific trap is installed by parsing the output of `show copp configuration`.
Args:
dut (SonicHost): The target device
trap_id: The trap ID to check.
Returns:
bool: True if the trap is installed, False otherwise.
"""

output = parse_show_copp_configuration(duthost)
assert trap_id in output, f"Trap {trap_id} not found in the configuration"
assert "hw_status" in output[trap_id], f"hw_status not found for trap {trap_id}"

return output[trap_id]["hw_status"] == "installed"


def get_trap_hw_status(duthost):
"""
Retrieves the hw_status for traps from the STATE_DB.
Args:
dut (SonicHost): The target device
Returns:
dict: A dictionary mapping trap IDs to their hw_status.
"""

state_db_data = duthost.shell("sonic-db-cli STATE_DB KEYS 'COPP_TRAP_TABLE|*'")["stdout"]
state_db_data = state_db_data.splitlines()
hw_status = {}

for key in state_db_data:
trap_id = key.split("|")[-1]
trap_data = duthost.shell(f"sonic-db-cli STATE_DB HGETALL '{key}'")["stdout"]
trap_data_dict = ast.literal_eval(trap_data)
hw_status[trap_id] = trap_data_dict.get("hw_status", "not-installed")

return hw_status


def get_random_copp_trap_config(duthost):
"""
Retrieves a random CoPP trap config from /etc/sonic/copp_cfg.json on the DUT.
Returns the trap ID, its group, and related config details from COPP_TRAP and COPP_GROUP sections
Args:
duthost (SonicHost): The target device.
Returns:
tuple: A tuple containing the following elements:
- str: The first trap ID associated with the selected trap.
- str: The trap group associated with the selected trap.
- dict: The configuration details of the selected trap group from the `COPP_GROUP` section.
"""

copp_cfg = json.loads(duthost.shell("cat /etc/sonic/copp_cfg.json")["stdout"])

# Get all traps from COPP_TRAP
copp_trap_cfg = copp_cfg.get("COPP_TRAP", {})
traps = list(copp_trap_cfg.keys())
assert traps, "No traps found in copp_cfg.json"

# Randomly select one trap
selected_trap = random.choice(traps)
trap_data = copp_cfg["COPP_TRAP"][selected_trap]
trap_ids = trap_data.get("trap_ids", "").split(",")
trap_group = trap_data.get("trap_group", "")
return trap_ids[0], trap_group, copp_cfg["COPP_GROUP"][trap_group]


def get_feature_name_from_trap_id(duthost, trap_id):
"""
Get the feature name corresponding to the given trap ID.
Args:
duthost (SonicHost): The target device.
trap_id (str): The trap ID to look up.
Returns:
bool: True if the trap ID is always enabled, False otherwise.
str: The feature name associated with the trap ID.
"""

copp_cfg = json.loads(duthost.shell("cat /etc/sonic/copp_cfg.json")["stdout"])
copp_trap_cfg = copp_cfg.get("COPP_TRAP", {})

for feature_name, feature_data in copp_trap_cfg.items():
trap_ids = feature_data.get("trap_ids", "").split(",")
if trap_id in trap_ids:
always_enabled = feature_data.get("always_enabled", "false")
return always_enabled.lower() == "true", feature_name if always_enabled.lower() == "true" else feature_name

return False, None
11 changes: 11 additions & 0 deletions tests/copp/scripts/update_copp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ def generate_limited_pps_config(pps_limit, input_config_file, output_config_file
else:
continue
else:
# queue1_group3 is used by neighbor_miss trap.
# test_copp.py tests the neighbor_miss trap with default CBS/CIR
# values on platforms that support it.
# The default value is 200 PPS for queue1_group3
if tg == "queue1_group3":
if neighbor_miss_trap_supported:
continue
if "cir" in group_config:
group_config["cir"] = pps_limit
if "cbs" in group_config:
Expand All @@ -112,5 +119,9 @@ def generate_limited_pps_config(pps_limit, input_config_file, output_config_file
asic_type = ""
else:
asic_type = ARGS[4]
if len(ARGS) < 6:
neighbor_miss_trap_supported = False
else:
neighbor_miss_trap_supported = ARGS[5].lower() == "true"

generate_limited_pps_config(ARGS[0], ARGS[1], ARGS[2], config_format, asic_type)
Loading
Loading