Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tests/common/devices/sonic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1868,6 +1868,7 @@ def _try_get_brcm_asic_name(self, output):
"th3": {"b98", "BCM5698"},
"th4": {"b99", "BCM5699"},
"th5": {"f90", "BCM7890"},
"q3d": {"8870", "8872"},
}
for asic in search_sets.keys():
for search_term in search_sets[asic]:
Expand Down
159 changes: 159 additions & 0 deletions tests/common/helpers/ptf_tests_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,169 @@

from ipaddress import ip_address, IPv4Address
from tests.common.config_reload import config_reload
from tests.common.utilities import wait_until

logger = logging.getLogger(__name__)


# ============================================================================
# Helper Functions for PTF Port Mapping and Interface Selection
# ============================================================================

def get_dut_to_ptf_port_mapping(duthost, tbinfo):
"""
Get mapping of DUT interfaces/PortChannels to PTF ports.
Only includes interfaces or PortChannels that have IP addresses configured.

Args:
duthost: DUT host object
tbinfo: Testbed info

Returns:
dict: {interface_name: ptf_index} for interfaces with IP
{portchannel_name: -1} for PortChannels with IP
"""
mg_facts = duthost.get_extended_minigraph_facts(tbinfo)
all_indices = mg_facts.get('minigraph_ptf_indices', {})
mapping = {}

# Add interfaces with IP addresses
for intf in mg_facts.get('minigraph_interfaces', []):
interface_name = intf.get('attachto')
if interface_name and intf.get('addr'):
ptf_index = all_indices.get(interface_name)
if ptf_index is not None:
mapping[interface_name] = ptf_index

# Add PortChannels with IP addresses (PTF index = -1)
for intf in mg_facts.get('minigraph_portchannel_interfaces', []):
portchannel_name = intf.get('attachto')
if portchannel_name and intf.get('addr'):
mapping[portchannel_name] = -1

return mapping


def get_interface_ip_address(interface_name, mg_facts):
"""
Get IP address for an interface from minigraph facts.

Args:
interface_name: Interface or PortChannel name
mg_facts: Minigraph facts dictionary

Returns:
str: IP address or None if not found
"""
# Check in minigraph_interfaces
for intf in mg_facts.get('minigraph_interfaces', []):
if intf.get('attachto') == interface_name and intf.get('addr'):
return str(intf['addr'])

# Check in minigraph_portchannel_interfaces
for intf in mg_facts.get('minigraph_portchannel_interfaces', []):
if intf.get('attachto') == interface_name and intf.get('addr'):
return str(intf['addr'])

return None


def select_test_interface_and_ptf_port(duthost, tbinfo):
"""
Select a random test interface/PortChannel and its corresponding PTF port.

Args:
duthost: DUT host object
tbinfo: Testbed info

Returns:
tuple: (interface_name, ptf_index) for interfaces
(portchannel_name, -1) for PortChannels
(None, None) if not found
"""
dut_to_ptf_mapping = get_dut_to_ptf_port_mapping(duthost, tbinfo)
if not dut_to_ptf_mapping:
return None, None

interface_name = random.choice(list(dut_to_ptf_mapping.keys()))
ptf_port_index = dut_to_ptf_mapping[interface_name]

logger.info("Selected: {} (PTF port: {})".format(interface_name, ptf_port_index))
return interface_name, ptf_port_index


def detect_portchannel_egress_member(duthost, tbinfo, ptf_adapter, portchannel_name, test_packet):
"""
Detect which PortChannel member interface is actually used for egress traffic.

Args:
duthost: DUT host object
tbinfo: Testbed info
ptf_adapter: PTF adapter object
portchannel_name: PortChannel name
test_packet: Test packet to send

Returns:
tuple: (member_interface, ptf_port) or (None, None)
"""
import ptf.testutils as testutils

logger.info("Detecting egress member for {}".format(portchannel_name))

# Get PortChannel members
mg_facts = duthost.get_extended_minigraph_facts(tbinfo)
portchannels = mg_facts.get('minigraph_portchannels', {})

if portchannel_name not in portchannels:
return None, None

members = portchannels[portchannel_name].get('members', [])
if not members:
return None, None

# Get PTF ports for members
ptf_indices = mg_facts.get('minigraph_ptf_indices', {})
member_ptf_ports = [(m, ptf_indices[m]) for m in members if m in ptf_indices]

if not member_ptf_ports:
return None, None

# Try each member port
num_test_packets = 100
timeout = 10.0

for member_name, member_ptf_port in member_ptf_ports:
logger.info("Trying {} (PTF port {})".format(member_name, member_ptf_port))
ptf_adapter.dataplane.flush()

# Send test packets
for i in range(num_test_packets):
testutils.send_packet(ptf_adapter, member_ptf_port, test_packet)

# Check if packets received
packets_received = 0

def check_packets_received():
nonlocal packets_received
while True:
result = testutils.dp_poll(ptf_adapter, device_number=0, timeout=0.1)
if isinstance(result, ptf_adapter.dataplane.PollSuccess):
if result.port == member_ptf_port:
packets_received += 1
else:
break
return packets_received >= num_test_packets

if wait_until(timeout=timeout, interval=1, delay=1, condition=check_packets_received):
logger.info("Found egress member: {} (PTF port {})".format(member_name, member_ptf_port))
ptf_adapter.dataplane.flush()
return member_name, member_ptf_port

# No working member found
ptf_adapter.dataplane.flush()
return None, None


@pytest.fixture(scope="module")
def downstream_links(rand_selected_dut, tbinfo, nbrhosts):
"""
Expand Down
16 changes: 16 additions & 0 deletions tests/qos/files/strict_priority_params.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"strict_priority_test": {
"queue": 0,
"cir_bytes_per_sec": 500000,
"pir_bytes_per_sec": 750000,
"traffic_duration": 120,
"low_traffic_bytes_per_sec": 500000,
"high_traffic_bytes_per_sec": 1500000,
"drop_threshold_low": 1000,
"drop_threshold_high": 10000,
"bandwidth_tolerance_min": 90,
"scheduler_policy": "strict_priority_rate_limit_policy",
"packet_size": 1000,
"dnx_credit_worth": 10000
}
}
92 changes: 92 additions & 0 deletions tests/qos/qos_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,98 @@ def ansible_stdout_to_str(ansible_stdout):
return result


def get_dscp_to_queue_mapping(duthost):
"""
Get DSCP to queue mapping from DUT using config facts only

Args:
duthost: DUT host object

Returns:
dict or None: DSCP to queue mapping from DUT configuration, or None if not found
"""
try:
logger.info("Getting DSCP to queue mapping from DUT config facts...")

# Get config facts from DUT
config_facts = duthost.asic_instance().config_facts(source="running")["ansible_facts"]

# Get DSCP_TO_TC_MAP (usually 'AZURE' profile)
dscp_to_tc_map_data = config_facts.get('DSCP_TO_TC_MAP', {})
if not dscp_to_tc_map_data:
logger.error("DSCP_TO_TC_MAP not found in config facts")
return None

# Use AZURE profile (most common)
dscp_to_tc_map = dscp_to_tc_map_data.get('AZURE', {})
if not dscp_to_tc_map:
# If AZURE not found, try the first available profile
if dscp_to_tc_map_data:
profile_name = list(dscp_to_tc_map_data.keys())[0]
dscp_to_tc_map = dscp_to_tc_map_data[profile_name]
logger.info(f"Using DSCP_TO_TC_MAP profile: {profile_name}")
else:
logger.error("No DSCP_TO_TC_MAP profiles found")
return None
else:
logger.info("Using DSCP_TO_TC_MAP profile: AZURE")

logger.info(f"DSCP to TC mapping: {dscp_to_tc_map}")

# Build DSCP to queue mapping
# In SONiC, TC (Traffic Class) typically maps directly to queue
# So we can use TC as queue number
dscp_to_queue_map = {}
for dscp_str, tc_str in dscp_to_tc_map.items():
try:
dscp = int(dscp_str)
queue = int(tc_str) # TC maps to queue
dscp_to_queue_map[dscp] = queue
except (ValueError, TypeError) as e:
logger.warning(f"Skipping invalid DSCP {dscp_str} -> TC {tc_str}: {e}")
continue

logger.info(f"Built DSCP to queue mapping: {dscp_to_queue_map}")

if not dscp_to_queue_map:
logger.error("No valid DSCP to queue mappings found")
return None

return dscp_to_queue_map

except Exception as e:
logger.error(f"Failed to get DSCP to queue mapping from DUT: {e}")
return None


def find_dscp_for_queue(duthost, target_queue):
"""
Find a DSCP value that maps to the target queue

Args:
duthost: DUT host object
target_queue: Target queue number

Returns:
int or None: DSCP value that maps to target queue, or None if not found
"""
# Get DSCP to queue mapping from DUT
dscp_to_queue_map = get_dscp_to_queue_mapping(duthost)
if dscp_to_queue_map is None:
logger.error("Could not get DSCP to queue mapping from DUT")
return None

for dscp, queue in dscp_to_queue_map.items():
if queue == target_queue:
logger.info(f"Found DSCP {dscp} maps to target queue {target_queue}")
return dscp

# If no exact match found, log available mappings and return None
available_mappings = {f"DSCP {dscp}": f"Queue {queue}" for dscp, queue in dscp_to_queue_map.items()}
logger.error(f"No DSCP found that maps to queue {target_queue}. Available mappings: {available_mappings}")
return None


def get_phy_intfs(host_ans):
"""
@Summary: Get the physical interfaces (e.g., EthernetX) of a DUT
Expand Down
Loading
Loading