diff --git a/ansible/roles/test/files/ptftests/dir_bcast_test.py b/ansible/roles/test/files/ptftests/dir_bcast_test.py index daeb547af27..48585d2c665 100644 --- a/ansible/roles/test/files/ptftests/dir_bcast_test.py +++ b/ansible/roles/test/files/ptftests/dir_bcast_test.py @@ -24,14 +24,14 @@ class BcastTest(BaseTest): ''' @summary: Overview of functionality - Test sends a directed broadcast packet on one of the non-VLAN RIF interface and destined to the + Test sends a directed broadcast packet on one of the non-VLAN RIF interface and destined to the broadcast IP of the VLAN RIF. It expects the packet to be broadcasted to all the member port of - VLAN - + VLAN + This class receives a text file containing the VLAN IP address/prefix and the member port list - For the device configured with VLAN interface and member ports, - - IP/UDP frame, UDP port - DHCP server port, Dst Mac = Router MAC, Dst IP = Directed Broadcast IP + For the device configured with VLAN interface and member ports, + - IP frame, Dst Mac = Router MAC, Dst IP = Directed Broadcast IP ''' #--------------------------------------------------------------------- @@ -39,6 +39,7 @@ class BcastTest(BaseTest): #--------------------------------------------------------------------- BROADCAST_MAC = 'ff:ff:ff:ff:ff:ff' DHCP_SERVER_PORT = 67 + TEST_SRC_IP = "1.1.1.1" # Some src IP def __init__(self): ''' @@ -72,7 +73,7 @@ def setUpVlan(self, file_path): entry = line.split(' ', 1) prefix = ip_network(unicode(entry[0])) self._vlan_dict[prefix] = [int(i) for i in entry[1].split()] - + #--------------------------------------------------------------------- def check_all_dir_bcast(self): @@ -83,38 +84,34 @@ def check_all_dir_bcast(self): bcast_ip = str(ip_network(vlan_pfx).broadcast_address) dst_port_list = self._vlan_dict[vlan_pfx] self.check_ip_dir_bcast(bcast_ip, dst_port_list) + self.check_bootp_dir_bcast(bcast_ip, dst_port_list) #--------------------------------------------------------------------- def check_ip_dir_bcast(self, dst_bcast_ip, dst_port_list): ''' - @summary: Check unicast IP forwarding and receiving on all member ports. + @summary: Check directed broadcast IP forwarding and receiving on all member ports. ''' - ip_src = "10.0.0.100" # Some src_ip - ip_dst = dst_bcast_ip + ip_src = self.TEST_SRC_IP + ip_dst = dst_bcast_ip src_mac = self.dataplane.get_mac(0, 0) bcast_mac = self.BROADCAST_MAC - udp_port = self.DHCP_SERVER_PORT - - pkt = simple_udp_packet(eth_dst=self.router_mac, - eth_src=src_mac, - ip_src=ip_src, - ip_dst=ip_dst, - udp_sport=udp_port, - udp_dport=udp_port) - exp_pkt = simple_udp_packet(eth_dst=bcast_mac, - eth_src=self.router_mac, - ip_src=ip_src, - ip_dst=ip_dst, - udp_sport=udp_port, - udp_dport=udp_port) + pkt = simple_ip_packet(eth_dst=self.router_mac, + eth_src=src_mac, + ip_src=ip_src, + ip_dst=ip_dst) + + exp_pkt = simple_ip_packet(eth_dst=bcast_mac, + eth_src=self.router_mac, + ip_src=ip_src, + ip_dst=ip_dst) masked_exp_pkt = Mask(exp_pkt) masked_exp_pkt.set_do_not_care_scapy(scapy.IP, "chksum") masked_exp_pkt.set_do_not_care_scapy(scapy.IP, "ttl") - - src_port = random.choice([port for port in self.src_ports if port not in dst_port_list]) + + src_port = random.choice([port for port in self.src_ports if port not in dst_port_list]) send_packet(self, src_port, pkt) logging.info("Sending packet from port " + str(src_port) + " to " + ip_dst) @@ -124,15 +121,57 @@ def check_ip_dir_bcast(self, dst_bcast_ip, dst_port_list): ''' logging.info("Received " + str(pkt_count) + " broadcast packets, expecting " + str(len(dst_port_list))) assert (pkt_count == len(dst_port_list)) - + + return + + #--------------------------------------------------------------------- + + def check_bootp_dir_bcast(self, dst_bcast_ip, dst_port_list): + ''' + @summary: Check directed broadcast BOOTP packet forwarding and receiving on all member ports. + ''' + ip_src = self.TEST_SRC_IP + ip_dst = dst_bcast_ip + src_mac = self.dataplane.get_mac(0, 0) + bcast_mac = self.BROADCAST_MAC + udp_port = self.DHCP_SERVER_PORT + + pkt = simple_udp_packet(eth_dst=self.router_mac, + eth_src=src_mac, + ip_src=ip_src, + ip_dst=ip_dst, + udp_sport=udp_port, + udp_dport=udp_port) + + exp_pkt = simple_udp_packet(eth_dst=bcast_mac, + eth_src=self.router_mac, + ip_src=ip_src, + ip_dst=ip_dst, + udp_sport=udp_port, + udp_dport=udp_port) + + masked_exp_pkt = Mask(exp_pkt) + masked_exp_pkt.set_do_not_care_scapy(scapy.IP, "chksum") + masked_exp_pkt.set_do_not_care_scapy(scapy.IP, "ttl") + + src_port = random.choice([port for port in self.src_ports if port not in dst_port_list]) + send_packet(self, src_port, pkt) + logging.info("Sending BOOTP packet from port " + str(src_port) + " to " + ip_dst) + + pkt_count = count_matched_packets_all_ports(self, masked_exp_pkt, dst_port_list) + ''' + Check if broadcast BOOTP packet is received on all member ports of vlan + ''' + logging.info("Received " + str(pkt_count) + " broadcast BOOTP packets, expecting " + str(len(dst_port_list))) + assert (pkt_count == len(dst_port_list)) + return #--------------------------------------------------------------------- def runTest(self): """ - @summary: Send Broadcast IP packet destined to a VLAN RIF and with unicast Dst MAC + @summary: Send Broadcast IP packet destined to a VLAN RIF and with unicast Dst MAC Expect the packet to be received on all member ports of VLAN """ self.check_all_dir_bcast() -