Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 62 additions & 62 deletions ansible/roles/test/files/ptftests/dir_bcast_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,35 @@
Description: This file contains the Directed Broadcast test for SONIC

Usage: Examples of how to use log analyzer
ptf --test-dir ptftests dir_bcast_test.BcastTest --platform remote -t "testbed_type='t0';router_mac='00:01:02:03:04:05';vlan_info='/root/vlan_info.txt'" --relax --debug info --log-file /tmp/dir_bcast_test.log --disable-vxlan --disable-geneve --disable-erspan --disable-mpls --disable-nvgre
ptf --test-dir ptftests dir_bcast_test.BcastTest \
--platform remote \
-t "testbed_type='t0';router_mac='00:01:02:03:04:05';vlan_info='/root/vlan_info.txt'"
--relax \
--debug info \
--log-file /tmp/dir_bcast_test.log \
--disable-vxlan
--disable-geneve \
--disable-erspan \
--disable-mpls \
--disable-nvgre

'''

#---------------------------------------------------------------------
# ---------------------------------------------------------------------
# Global imports
#---------------------------------------------------------------------
# ---------------------------------------------------------------------
import logging
import random
import json
import ptf
import ptf.packet as scapy
import ptf.dataplane as dataplane

from ptf import config
from ptf.base_tests import BaseTest
from ptf.mask import Mask
from ptf.testutils import *
from ipaddress import ip_address, ip_network
from ptf.testutils import test_params_get, simple_ip_packet, simple_udp_packet,\
send_packet, count_matched_packets_all_ports
from ipaddress import ip_network


class BcastTest(BaseTest):
'''
Expand All @@ -34,9 +45,9 @@ class BcastTest(BaseTest):
- IP frame, Dst Mac = Router MAC, Dst IP = Directed Broadcast IP
'''

#---------------------------------------------------------------------
# ---------------------------------------------------------------------
# Class variables
#---------------------------------------------------------------------
# ---------------------------------------------------------------------
BROADCAST_MAC = 'ff:ff:ff:ff:ff:ff'
DHCP_SERVER_PORT = 67
TEST_SRC_IP = "1.1.1.1" # Some src IP
Expand All @@ -48,55 +59,34 @@ def __init__(self):
BaseTest.__init__(self)
self.test_params = test_params_get()

#---------------------------------------------------------------------
# ---------------------------------------------------------------------

def setUp(self):
self.dataplane = ptf.dataplane_instance
self.router_mac = self.test_params['router_mac']
self.setUpVlan(self.test_params['vlan_info'])
if self.test_params['testbed_type'] == 't0':
self.src_ports = range(1, 25) + range(28, 32)
if self.test_params['testbed_type'] in ['t0-52', 'm0']:
self.src_ports = range(0, 52)
if self.test_params['testbed_type'] == 't0-56':
self.src_ports = range(0, 32)
if self.test_params['testbed_type'] == 't0-64':
self.src_ports = range(0, 2) + range(4, 18) + range(20, 33) + range(36, 43) + range(48, 49) + range(52, 59)
if self.test_params['testbed_type'] == 't0-116':
self.src_ports = range(24, 32)
if self.test_params['testbed_type'] == 't0-120':
self.src_ports = [48, 49, 54, 55, 60, 61, 66, 67]

#---------------------------------------------------------------------

def setUpVlan(self, file_path):
'''
@summary: Populate the VLAN dictionary with IP/Prefix and member port list
'''
self._vlan_dict = {}
with open(file_path, 'r') as f:
for line in f.readlines():
entry = line.split(' ', 1)
prefix = ip_network(unicode(entry[0]))
if prefix.version != 4:
continue
self._vlan_dict[prefix] = [int(i) for i in entry[1].split()]
ptf_test_port_map = self.test_params['ptf_test_port_map']
with open(ptf_test_port_map) as f:
self.ptf_test_port_map = json.load(f)
self.src_ports = self.ptf_test_port_map['ptf_src_ports']
self._vlan_dict = self.ptf_test_port_map['vlan_ip_port_pair']

#---------------------------------------------------------------------
# ---------------------------------------------------------------------

def check_all_dir_bcast(self):
'''
@summary: Loop through all the VLANs and send directed broadcast packets
'''
for vlan_pfx in self._vlan_dict:
bcast_ip = str(ip_network(vlan_pfx).broadcast_address)
dst_port_list = self._vlan_dict[vlan_pfx]
self.check_ip_dir_bcast(bcast_ip, dst_port_list)
self.check_bootp_dir_bcast(bcast_ip, dst_port_list)
for vlan_pfx, dst_ports in self._vlan_dict.items():
if ip_network(vlan_pfx).version == 4:
bcast_ip = str(ip_network(vlan_pfx).broadcast_address)
logging.info("bcast_ip: {}, vlan_pfx: {}, dst_ports: {}".format(
bcast_ip, vlan_pfx, dst_ports))
self.check_ip_dir_bcast(bcast_ip, dst_ports)
self.check_bootp_dir_bcast(bcast_ip, dst_ports)

#---------------------------------------------------------------------
# ---------------------------------------------------------------------

def check_ip_dir_bcast(self, dst_bcast_ip, dst_port_list):
def check_ip_dir_bcast(self, dst_bcast_ip, dst_ports):
'''
@summary: Check directed broadcast IP forwarding and receiving on all member ports.
'''
Expand All @@ -111,30 +101,35 @@ def check_ip_dir_bcast(self, dst_bcast_ip, dst_port_list):
ip_dst=ip_dst)

exp_pkt = simple_ip_packet(eth_dst=bcast_mac,
eth_src=self.router_mac,
ip_src=ip_src,
ip_dst=ip_dst)
eth_src=self.router_mac,
ip_src=ip_src,
ip_dst=ip_dst)

masked_exp_pkt = Mask(exp_pkt)
masked_exp_pkt.set_do_not_care_scapy(scapy.IP, "chksum")
masked_exp_pkt.set_do_not_care_scapy(scapy.IP, "ttl")

src_port = random.choice([port for port in self.src_ports if port not in dst_port_list])
src_port = random.choice(
[port for port in self.src_ports if port not in dst_ports])
send_packet(self, src_port, pkt)
logging.info("Sending packet from port " + str(src_port) + " to " + ip_dst)
logging.info("Sending packet from port " +
str(src_port) + " to " + ip_dst)

pkt_count = count_matched_packets_all_ports(self, masked_exp_pkt, dst_port_list)
pkt_count = count_matched_packets_all_ports(
self, masked_exp_pkt, dst_ports)
'''
Check if broadcast packet is received on all member ports of vlan
'''
logging.info("Received " + str(pkt_count) + " broadcast packets, expecting " + str(len(dst_port_list)))
assert (pkt_count == len(dst_port_list)), "received {} expected {}".format(pkt_count, len(dst_port_list))
logging.info("Received " + str(pkt_count) +
" broadcast packets, expecting " + str(len(dst_ports)))
assert (pkt_count == len(dst_ports)), "received {} expected {}".format(
pkt_count, len(dst_ports))

return

#---------------------------------------------------------------------
# ---------------------------------------------------------------------

def check_bootp_dir_bcast(self, dst_bcast_ip, dst_port_list):
def check_bootp_dir_bcast(self, dst_bcast_ip, dst_ports):
'''
@summary: Check directed broadcast BOOTP packet forwarding and receiving on all member ports.
'''
Expand Down Expand Up @@ -162,20 +157,25 @@ def check_bootp_dir_bcast(self, dst_bcast_ip, dst_port_list):
masked_exp_pkt.set_do_not_care_scapy(scapy.IP, "chksum")
masked_exp_pkt.set_do_not_care_scapy(scapy.IP, "ttl")

src_port = random.choice([port for port in self.src_ports if port not in dst_port_list])
src_port = random.choice(
[port for port in self.src_ports if port not in dst_ports])
send_packet(self, src_port, pkt)
logging.info("Sending BOOTP packet from port " + str(src_port) + " to " + ip_dst)
logging.info("Sending BOOTP packet from port " +
str(src_port) + " to " + ip_dst)

pkt_count = count_matched_packets_all_ports(self, masked_exp_pkt, dst_port_list)
pkt_count = count_matched_packets_all_ports(
self, masked_exp_pkt, dst_ports)
'''
Check if broadcast BOOTP packet is received on all member ports of vlan
'''
logging.info("Received " + str(pkt_count) + " broadcast BOOTP packets, expecting " + str(len(dst_port_list)))
assert (pkt_count == len(dst_port_list)), "received {} expected {}".format(pkt_count, len(dst_port_list))
logging.info("Received " + str(pkt_count) +
" broadcast BOOTP packets, expecting " + str(len(dst_ports)))
assert (pkt_count == len(dst_ports)), "received {} expected {}".format(
pkt_count, len(dst_ports))

return

#---------------------------------------------------------------------
# ---------------------------------------------------------------------

def runTest(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,15 @@ ip/test_ip_packet.py::TestIPPacket::test_forward_ip_packet_with_0xffff_chksum_to
conditions:
- "asic_type in ['mellanox'] or hwsku in ['Arista-7280CR3-C40']"

#######################################
##### ipfwd #####
#######################################
ipfwd/test_dir_bcast.py:
skip:
reason: "Unsupported topology."
conditions:
- "topo_type not in ['t0', 'm0', 'mx'] or 'dualtor' in topo_name"

#######################################
##### mvrf #####
#######################################
Expand Down
35 changes: 35 additions & 0 deletions tests/common/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,41 @@ def safe_filename(filename, replacement_char='_'):
return re.sub(illegal_chars_pattern, replacement_char, filename)


def get_neighbor_port_list(duthost, neighbor_name):
"""
@summary: Get neighbor port in dut by neighbor_name
@param duthost: The DUT
@param neighbor_name: name or keyword contained in name of neighbor
@return a list of port name
Sample output: ["Ethernet45", "Ethernet46"]
"""
config_facts = duthost.get_running_config_facts()
neighbor_port_list = []
for port_name, value in list(config_facts["DEVICE_NEIGHBOR"].items()):
if neighbor_name.upper() in value["name"].upper():
neighbor_port_list.append(port_name)

return neighbor_port_list


def get_neighbor_ptf_port_list(duthost, neighbor_name, tbinfo):
"""
@summary: Get neighbor port in ptf by neighbor_name
@param duthost: The DUT
@param neighbor_name: name or keyword contained in name of neighbor
@param tbinfo: testbed information
@return a list of port index
Sample output: [45, 46]
"""
mg_facts = duthost.get_extended_minigraph_facts(tbinfo)
neighbor_port_list = get_neighbor_port_list(duthost, neighbor_name)
ptf_port_list = []
for neighbor_port in neighbor_port_list:
ptf_port_list.append(mg_facts["minigraph_ptf_indices"][neighbor_port])

return ptf_port_list


def get_upstream_neigh_type(topo_type, is_upper=True):
"""
@summary: Get neighbor type by topo type
Expand Down
82 changes: 66 additions & 16 deletions tests/ipfwd/test_dir_bcast.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,86 @@
import pytest
import json
import logging

from tests.common.fixtures.ptfhost_utils import copy_ptftests_directory # lgtm[py/unused-import]
from tests.common.fixtures.ptfhost_utils import copy_ptftests_directory # noqa F401
from tests.ptf_runner import ptf_runner
from datetime import datetime

from tests.common.dualtor.mux_simulator_control import toggle_all_simulator_ports_to_rand_selected_tor_m # noqa F401
from tests.common.utilities import get_neighbor_ptf_port_list
from tests.common.helpers.constants import UPSTREAM_NEIGHBOR_MAP
pytestmark = [
pytest.mark.topology('t0', 'm0')
pytest.mark.topology('t0', 'm0', 'mx')
]

def test_dir_bcast(duthosts, rand_one_dut_hostname, ptfhost, tbinfo):
logger = logging.getLogger(__name__)

PTF_TEST_PORT_MAP = '/root/ptf_test_port_map.json'


def get_ptf_src_ports(tbinfo, duthost):
# Source ports are upstream ports
upstream_neightbor_name = UPSTREAM_NEIGHBOR_MAP[tbinfo["topo"]["type"]]
ptf_src_ports = get_neighbor_ptf_port_list(duthost, upstream_neightbor_name, tbinfo)
return ptf_src_ports


def get_ptf_dst_ports(duthost, mg_facts, testbed_type):
if "dualtor" in testbed_type:
# In dualtor, only active port in active tor could be dst port
mux_status_out = duthost.show_and_parse("show mux status")
mux_active_ports = []
for mux_info in mux_status_out:
if mux_info['status'] == 'active':
mux_active_ports.append(mux_info['port'])

vlan_ip_port_pair = {}
for vlan_intf in mg_facts['minigraph_vlan_interfaces']:
vlan_subnet = vlan_intf["subnet"]
vlan_name = vlan_intf["attachto"]

ptf_dst_ports = []
for member in mg_facts['minigraph_vlans'][vlan_name]['members']:
if "Ethernet" in member:
if "dualtor" not in testbed_type:
ptf_dst_ports.append(mg_facts['minigraph_port_indices'][member])
elif member in mux_active_ports:
ptf_dst_ports.append(mg_facts['minigraph_port_indices'][member])

if ptf_dst_ports:
vlan_ip_port_pair[vlan_subnet] = ptf_dst_ports

return vlan_ip_port_pair


def ptf_test_port_map(duthost, ptfhost, mg_facts, testbed_type, tbinfo):
ptf_test_port_map = {}
ptf_src_ports = get_ptf_src_ports(tbinfo, duthost)
vlan_ip_port_pair = get_ptf_dst_ports(duthost, mg_facts, testbed_type)

ptf_test_port_map = {
'ptf_src_ports': ptf_src_ports,
'vlan_ip_port_pair': vlan_ip_port_pair
}
ptfhost.copy(content=json.dumps(ptf_test_port_map), dest=PTF_TEST_PORT_MAP)


def test_dir_bcast(duthosts, rand_one_dut_hostname, ptfhost, tbinfo,
toggle_all_simulator_ports_to_rand_selected_tor_m): # noqa F811
duthost = duthosts[rand_one_dut_hostname]
support_testbed_types = frozenset(['t0', 't0-16', 't0-52', 't0-56', 't0-64', 't0-64-32', 't0-116', 'm0'])
testbed_type = tbinfo['topo']['name']
if testbed_type not in support_testbed_types:
pytest.skip("Not support given test bed type %s" % testbed_type)
logger.info("tbinfo: {}".format(tbinfo))

# Copy VLAN information file to PTF-docker
mg_facts = duthost.get_extended_minigraph_facts(tbinfo)
extra_vars = {
'minigraph_vlan_interfaces': mg_facts['minigraph_vlan_interfaces'],
'minigraph_vlans': mg_facts['minigraph_vlans'],
'minigraph_port_indices': mg_facts['minigraph_ptf_indices'],
'minigraph_portchannels': mg_facts['minigraph_portchannels']
}
ptfhost.host.options['variable_manager'].extra_vars.update(extra_vars)
ptfhost.template(src="../ansible/roles/test/templates/fdb.j2", dest="/root/vlan_info.txt")
logger.info("mg_facts: {}".format(mg_facts))

ptf_test_port_map(duthost, ptfhost, mg_facts, testbed_type, tbinfo)

# Start PTF runner
params = {
'testbed_type': testbed_type,
'router_mac': duthost.facts['router_mac'],
'vlan_info': '/root/vlan_info.txt'
'ptf_test_port_map': PTF_TEST_PORT_MAP
}
log_file = "/tmp/dir_bcast.BcastTest.{}.log".format(datetime.now().strftime("%Y-%m-%d-%H:%M:%S"))
ptf_runner(
Expand Down