Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 67 additions & 28 deletions ansible/roles/test/files/ptftests/dir_bcast_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,22 @@
class BcastTest(BaseTest):
'''
@summary: Overview of functionality
Test sends a directed broadcast packet on one of the non-VLAN RIF interface and destined to the
Test sends a directed broadcast packet on one of the non-VLAN RIF interface and destined to the
broadcast IP of the VLAN RIF. It expects the packet to be broadcasted to all the member port of
VLAN
VLAN

This class receives a text file containing the VLAN IP address/prefix and the member port list

For the device configured with VLAN interface and member ports,
- IP/UDP frame, UDP port - DHCP server port, Dst Mac = Router MAC, Dst IP = Directed Broadcast IP
For the device configured with VLAN interface and member ports,
- IP frame, Dst Mac = Router MAC, Dst IP = Directed Broadcast IP
'''

#---------------------------------------------------------------------
# Class variables
#---------------------------------------------------------------------
BROADCAST_MAC = 'ff:ff:ff:ff:ff:ff'
DHCP_SERVER_PORT = 67
TEST_SRC_IP = "1.1.1.1" # Some src IP

def __init__(self):
'''
Expand Down Expand Up @@ -72,7 +73,7 @@ def setUpVlan(self, file_path):
entry = line.split(' ', 1)
prefix = ip_network(unicode(entry[0]))
self._vlan_dict[prefix] = [int(i) for i in entry[1].split()]

#---------------------------------------------------------------------

def check_all_dir_bcast(self):
Expand All @@ -83,38 +84,34 @@ def check_all_dir_bcast(self):
bcast_ip = str(ip_network(vlan_pfx).broadcast_address)
dst_port_list = self._vlan_dict[vlan_pfx]
self.check_ip_dir_bcast(bcast_ip, dst_port_list)
self.check_bootp_dir_bcast(bcast_ip, dst_port_list)

#---------------------------------------------------------------------

def check_ip_dir_bcast(self, dst_bcast_ip, dst_port_list):
'''
@summary: Check unicast IP forwarding and receiving on all member ports.
@summary: Check directed broadcast IP forwarding and receiving on all member ports.
'''
ip_src = "10.0.0.100" # Some src_ip
ip_dst = dst_bcast_ip
ip_src = self.TEST_SRC_IP
ip_dst = dst_bcast_ip
src_mac = self.dataplane.get_mac(0, 0)
bcast_mac = self.BROADCAST_MAC
udp_port = self.DHCP_SERVER_PORT

pkt = simple_udp_packet(eth_dst=self.router_mac,
eth_src=src_mac,
ip_src=ip_src,
ip_dst=ip_dst,
udp_sport=udp_port,
udp_dport=udp_port)

exp_pkt = simple_udp_packet(eth_dst=bcast_mac,
eth_src=self.router_mac,
ip_src=ip_src,
ip_dst=ip_dst,
udp_sport=udp_port,
udp_dport=udp_port)
pkt = simple_ip_packet(eth_dst=self.router_mac,
eth_src=src_mac,
ip_src=ip_src,
ip_dst=ip_dst)

exp_pkt = simple_ip_packet(eth_dst=bcast_mac,
eth_src=self.router_mac,
ip_src=ip_src,
ip_dst=ip_dst)

masked_exp_pkt = Mask(exp_pkt)
masked_exp_pkt.set_do_not_care_scapy(scapy.IP, "chksum")
masked_exp_pkt.set_do_not_care_scapy(scapy.IP, "ttl")
src_port = random.choice([port for port in self.src_ports if port not in dst_port_list])

src_port = random.choice([port for port in self.src_ports if port not in dst_port_list])
send_packet(self, src_port, pkt)
logging.info("Sending packet from port " + str(src_port) + " to " + ip_dst)

Expand All @@ -124,15 +121,57 @@ def check_ip_dir_bcast(self, dst_bcast_ip, dst_port_list):
'''
logging.info("Received " + str(pkt_count) + " broadcast packets, expecting " + str(len(dst_port_list)))
assert (pkt_count == len(dst_port_list))


return

#---------------------------------------------------------------------

def check_bootp_dir_bcast(self, dst_bcast_ip, dst_port_list):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me it looks like most of the functionality except the packet is same for both functions. We could create a single function for common code and use these functions only to create the packet. Approving from my side!

'''
@summary: Check directed broadcast BOOTP packet forwarding and receiving on all member ports.
'''
ip_src = self.TEST_SRC_IP
ip_dst = dst_bcast_ip
src_mac = self.dataplane.get_mac(0, 0)
bcast_mac = self.BROADCAST_MAC
udp_port = self.DHCP_SERVER_PORT

pkt = simple_udp_packet(eth_dst=self.router_mac,
eth_src=src_mac,
ip_src=ip_src,
ip_dst=ip_dst,
udp_sport=udp_port,
udp_dport=udp_port)

exp_pkt = simple_udp_packet(eth_dst=bcast_mac,
eth_src=self.router_mac,
ip_src=ip_src,
ip_dst=ip_dst,
udp_sport=udp_port,
udp_dport=udp_port)

masked_exp_pkt = Mask(exp_pkt)
masked_exp_pkt.set_do_not_care_scapy(scapy.IP, "chksum")
masked_exp_pkt.set_do_not_care_scapy(scapy.IP, "ttl")

src_port = random.choice([port for port in self.src_ports if port not in dst_port_list])
send_packet(self, src_port, pkt)
logging.info("Sending BOOTP packet from port " + str(src_port) + " to " + ip_dst)

pkt_count = count_matched_packets_all_ports(self, masked_exp_pkt, dst_port_list)
'''
Check if broadcast BOOTP packet is received on all member ports of vlan
'''
logging.info("Received " + str(pkt_count) + " broadcast BOOTP packets, expecting " + str(len(dst_port_list)))
assert (pkt_count == len(dst_port_list))

return

#---------------------------------------------------------------------

def runTest(self):
"""
@summary: Send Broadcast IP packet destined to a VLAN RIF and with unicast Dst MAC
@summary: Send Broadcast IP packet destined to a VLAN RIF and with unicast Dst MAC
Expect the packet to be received on all member ports of VLAN
"""
self.check_all_dir_bcast()