Skip to content
129 changes: 129 additions & 0 deletions ansible/roles/test/files/ptftests/pfc_pause_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import ipaddress
import logging
import random
import socket
import sys
import struct
import ipaddress
import re

import ptf
import ptf.packet as scapy
import ptf.dataplane as dataplane

from ptf import config
from ptf.base_tests import BaseTest
from ptf.mask import Mask
from ptf.testutils import *

def udp_filter(pkt_str):
try:
pkt = scapy.Ether(pkt_str)
return scapy.UDP in pkt

except:
return False

def capture_matched_packets(test, exp_packet, port, device_number = 0,timeout = 1):
"""
Receive all packets on the port and return all the received packets.
As soon as the packets stop arriving, the function waits for the timeout value and returns the received packets. Therefore, this function requires a positive timeout value.
"""
if timeout <= 0:
raise Exception("%s() requires positive timeout value." % sys._getframe().f_code.co_name)

pkts = list()
while True:
result = dp_poll(test, device_number = device_number, port_number = port, timeout = timeout)
if isinstance(result, test.dataplane.PollSuccess):
if ptf.dataplane.match_exp_pkt(exp_packet, result.packet):
pkts.append(result.packet)
else:
break

return pkts

class PfcPauseTest(BaseTest):
def __init__(self):
BaseTest.__init__(self)
self.test_params = test_params_get()

def setUp(self):
add_filter(udp_filter)
self.dataplane = ptf.dataplane_instance
self.mac_src = self.test_params['mac_src']
self.mac_dst = self.test_params['mac_dst']
self.pkt_count = int(self.test_params['pkt_count'])
self.pkt_intvl = float(self.test_params['pkt_intvl'])
self.port_src = int(self.test_params['port_src'])
self.port_dst = self.test_params['port_dst']
self.ip_src = self.test_params['ip_src']
self.ip_dst = self.test_params['ip_dst']
self.dscp = self.test_params['dscp']
""" DSCP for background traffic """
self.dscp_bg = self.test_params['dscp_bg']
self.queue_paused = self.test_params['queue_paused']

def runTest(self):
pass_cnt = 0
tos = self.dscp<<2
tos_bg = self.dscp_bg<<2

for x in range(self.pkt_count):
sport = random.randint(0, 65535)
dport = random.randint(0, 65535)

pkt = simple_udp_packet(
eth_dst = self.mac_dst,
eth_src = self.mac_src,
ip_src = self.ip_src,
ip_dst = self.ip_dst,
ip_tos = tos,
udp_sport = sport,
udp_dport = dport,
ip_ttl = 64)

pkt_bg = simple_udp_packet(
eth_dst = self.mac_dst,
eth_src = self.mac_src,
ip_src = self.ip_src,
ip_dst = self.ip_dst,
ip_tos = tos_bg,
udp_sport = sport,
udp_dport = dport,
ip_ttl = 64)

exp_pkt = simple_udp_packet(
ip_src = self.ip_src,
ip_dst = self.ip_dst,
ip_tos = tos_bg,
udp_sport = sport,
udp_dport = dport,
ip_ttl = 63)

masked_exp_pkt = Mask(exp_pkt)
masked_exp_pkt.set_do_not_care_scapy(scapy.Ether, "src")
masked_exp_pkt.set_do_not_care_scapy(scapy.Ether, "dst")
masked_exp_pkt.set_do_not_care_scapy(scapy.IP, "ttl")
masked_exp_pkt.set_do_not_care_scapy(scapy.IP, "chksum")
masked_exp_pkt.set_do_not_care_scapy(scapy.IP, "tos")

send_packet(self, self.port_src, pkt, 1)
send_packet(self, self.port_src, pkt_bg, 1)

pkts = capture_matched_packets(self, masked_exp_pkt, self.port_dst)

time.sleep(self.pkt_intvl)

""" If the queue is paused, we should only receive the background packet """
if self.queue_paused:
pass_cnt += int(len(pkts) == 1 and scapy.Ether(pkts[0])[scapy.IP].tos == tos_bg)

else:
pass_cnt += int(len(pkts) == 2)

print "Passes: %d / %d" % (pass_cnt, self.pkt_count)

def tearDown(self):
reset_filters()
BaseTest.tearDown(self)
34 changes: 33 additions & 1 deletion tests/qos/qos_fixtures.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,37 @@
import pytest
import os
from ansible_host import AnsibleHost

@pytest.fixture(scope = "module")
def lossless_prio_dscp_map(testbed_devices):
dut = testbed_devices["dut"]
config_facts = dut.config_facts(host=dut.hostname, source="persistent")['ansible_facts']

if "PORT_QOS_MAP" not in config_facts.keys():
return None

port_qos_map = config_facts["PORT_QOS_MAP"]
lossless_priorities = list()
intf = port_qos_map.keys()[0]
if 'pfc_enable' not in port_qos_map[intf]:
return None

lossless_priorities = [int(x) for x in port_qos_map[intf]['pfc_enable'].split(',')]
dscp_to_tc_map = config_facts["DSCP_TO_TC_MAP"]

result = dict()
for prio in lossless_priorities:
result[prio] = list()

profile = dscp_to_tc_map.keys()[0]

for dscp in dscp_to_tc_map[profile]:
tc = dscp_to_tc_map[profile][dscp]

if int(tc) in lossless_priorities:
result[int(tc)].append(int(dscp))

return result

@pytest.fixture(scope = "module")
def conn_graph_facts(testbed_devices):
Expand Down Expand Up @@ -33,4 +65,4 @@ def leaf_fanouts(conn_graph_facts):
if peer_device not in leaf_fanouts:
leaf_fanouts.append(peer_device)

return leaf_fanouts
return leaf_fanouts
147 changes: 146 additions & 1 deletion tests/qos/qos_helpers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,152 @@
from ansible_host import AnsibleHost
from netaddr import IPAddress, IPNetwork
import json
import re

def ansible_stdout_to_str(ansible_stdout):
"""
@Summary: The stdout of Ansible host is essentially a list of unicode characters. This function converts it to a string.
@param ansible_stdout: stdout of Ansible
@return: Return a string
"""
result = ""
for x in ansible_stdout:
result += x.encode('UTF8')
return result

def eos_to_linux_intf(eos_intf_name):
"""
@Summary: Map EOS's interface name to Linux's interface name
@param eos_intf_name: Interface name in EOS
@return: Return the interface name in Linux
"""
return eos_intf_name.replace('Ethernet', 'et').replace('/', '_')
return eos_intf_name.replace('Ethernet', 'et').replace('/', '_')

def get_active_intfs(host_ans):
"""
@Summary: Get the active interfaces of a DUT
@param host_ans: Ansible host instance of this DUT
@return: Return the list of active interfaces
"""
int_status = host_ans.show_interface(command = "status")['ansible_facts']['int_status']
active_intfs = []

for intf in int_status:
if int_status[intf]['admin_state'] == 'up' and \
int_status[intf]['oper_state'] == 'up':
active_intfs.append(intf)

return active_intfs

def get_addrs_in_subnet(subnet, n):
"""
@Summary: Get N IP addresses in a subnet
@param subnet: IPv4 subnet, e.g., '192.168.1.1/24'
@param n: # of IP addresses to get
@return: Retuen n IPv4 addresses in this subnet in a list
"""
ip_addr = subnet.split('/')[0]
ip_addrs = [str(x) for x in list(IPNetwork(subnet))]
ip_addrs.remove(ip_addr)

""" Try to avoid network and broadcast addresses """
if len(ip_addrs) >= n + 2:
del ip_addrs[0]
del ip_addrs[-1]

return ip_addrs[:n]

def get_neigh_ip(ip_addr, netmask):
"""
@Summary: Given an IP address and a netmask, get another IP in this subnet.
@param ip_addr: IPv4 address string (e.g., "192.168.1.1")
@param netmask: network mask string (e.g., "255.255.255.254")
@return: Return another IP address in this subnet
"""
prefix_len = IPAddress(netmask).netmask_bits()
ip_addrs = get_addrs_in_subnet(ip_addr + '/' + str(prefix_len), 1)

if len(ip_addrs) != 0:
return ip_addrs[0]

else:
return None

def gen_arp_responder_config(intfs, ip_addrs, mac_addrs, config_file):
"""
@Summary: Generate a configuration file for ARP responder
@param intfs: list of interfaces
@param ip_addrs: list of IP addresses
@param mac_addrs: list of MAC addresses
@param config_file: configuration file path
return: Return true if the config file is successfully generated
"""
if len(intfs) != len(ip_addrs) or len(intfs) != len(mac_addrs):
return False

config = dict()
for i in range(len(intfs)):
config[intfs[i]] = dict()
""" The config file accepts MAC addresses like 00112233445566 without any : """
config[intfs[i]][ip_addrs[i]] = mac_addrs[i].replace(':', '')

with open(config_file, 'w') as fp:
json.dump(config, fp)

return True

def check_mac_table(host_ans, mac_addrs):
"""
@Summary: Check if the DUT's MAC table (FIB) has all the MAC address information
@param host_ans: Ansible host instance of this DUT
@param mac_addrs: list of MAC addresses to check
return: Return true if the DUT's MAC table has all the MAC addresses
"""
if mac_addrs is None:
return False

stdout = ansible_stdout_to_str(host_ans.command('show mac')['stdout'])

for mac in mac_addrs:
if mac.upper() not in stdout.upper():
return False

return True

def get_mac(host_ans, ip_addr):
"""
@Summary: Get the MAC address of a given IP address in a DUT
@param host_ans: Ansible host instance of this DUT
@param ip_addr: IP address
return: Return the MAC address or None if we cannot find it
"""
cmd = 'sudo arp -a -n %s' % (ip_addr)
stdout = ansible_stdout_to_str(host_ans.command(cmd)['stdout']).strip()

if len(stdout) == 0 or 'incomplete' in stdout:
return None

pattern = re.compile(ur'(?:[0-9a-fA-F]:?){12}')
results = re.findall(pattern, stdout)

if len(results) == 0:
return None

else:
return results[0]

def config_intf_ip_mac(host_ans, intf, ip_addr, netmask, mac_addr):
"""
@Summary: Configure IP and MAC on a intferface of a PTF
@param host_ans: Ansible host instance of this PTF
@param intf: interface name
@param ip_addr: IP address, e.g., '192.168.1.1'
@param netmask: Network mask, e.g., '255.255.255.0'
@param mac_addr: MAC address, e.g., '00:11:22:33:44:55'
"""
host_ans.shell('ifconfig %s down' % intf)
host_ans.shell('ifconfig %s hw ether %s' % (intf, mac_addr))
host_ans.shell('ifconfig %s up' % intf)
host_ans.shell('ifconfig %s %s netmask %s' % (intf, ip_addr, netmask))


Loading