Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 48 additions & 72 deletions ansible/roles/test/files/acstests/acltb_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,24 @@ def verify_packet_any_port(self, pkt, ports=[], device_number=0):
return (match_index, rcv_pkt, received)
#---------------------------------------------------------------------

def runSendReceiveTest(self, pkt2send, src_port , pkt2recv, destination_ports):
def runSendReceiveTest(self, i, pkt2send, src_port , pkt2recv, dst_ports, expect):
"""
@summary Send packet and verify it is received/not received on the expected ports
"""

masked2recv = Mask(pkt2recv)
masked2recv.set_do_not_care_scapy(scapy.Ether, "dst")
masked2recv.set_do_not_care_scapy(scapy.Ether, "src")

send_packet(self, src_port, pkt2send)
(index, rcv_pkt, received) = self.verify_packet_any_port(masked2recv, destination_ports)
(index, rcv_pkt, received) = self.verify_packet_any_port(masked2recv, dst_ports)

self.tests_total += 1

return received
passed = received == expect
print "Test #" + str(i) + " %s" % ("PASSED" if passed else "FAILED")
self.tests_passed += 1 if passed else 0

return passed

#---------------------------------------------------------------------
def runAclTests(self, dst_ip, dst_ip_blocked, src_port, dst_ports):
Expand All @@ -119,127 +122,105 @@ def runAclTests(self, dst_ip, dst_ip_blocked, src_port, dst_ports):
@return: Number of tests passed
"""

tests_passed = 0
self.tests_passed = 0
self.tests_total = 0

print "\nPort to sent packets to: %d" % src_port
print "Destination IP: %s" % dst_ip
print "Ports to expect packet from: ",
pprint.pprint(dst_ports)
print "Dst IP expected to be blocked: ", dst_ip_blocked
print "Dst IP expected to be blocked: %s " % dst_ip_blocked

pkt0 = simple_tcp_packet(
eth_dst = self.router_mac,
eth_src = self.dataplane.get_mac(0, 0),
ip_src = "10.0.0.1",
ip_dst = dst_ip,
tcp_sport = 0x1234,
tcp_dport = 0x50,
tcp_sport = 0x4321,
tcp_dport = 0x51,
ip_ttl = 64
)
#exp_pkt = pkt.deepcopy()

exp_pkt0 = simple_tcp_packet(
eth_dst = self.dataplane.get_mac(0, 0),
eth_src = self.router_mac,
ip_src = "10.0.0.1",
ip_dst = dst_ip,
tcp_sport = 0x1234,
tcp_dport = 0x50,
tcp_sport = 0x4321,
tcp_dport = 0x51,
ip_ttl = 63
)

print ""
# Test #1 - Verify source IP match
# Test #0 - unmatched packet - dropped
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
self.runSendReceiveTest(0, pkt, src_port, exp_pkt, dst_ports, 0)

# Test #1 - source IP match - forwarded
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['IP'].src = "10.0.0.2"
exp_pkt['IP'].src = "10.0.0.2"
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (0 if res else 1)
print "Test #1 %s" % ("FAILED" if res else "PASSED")
self.runSendReceiveTest(1, pkt, src_port, exp_pkt, dst_ports, 1)

# Test #2 - Verify destination IP match
# Test #2 - destination IP match - forwarded
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['IP'].dst = dst_ip_blocked
exp_pkt['IP'].dst = dst_ip_blocked
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (0 if res else 1)
print "Test #2 %s" % ("FAILED" if res else "PASSED")
self.runSendReceiveTest(2, pkt, src_port, exp_pkt, dst_ports, 1)

# Test #3 - Verify L4 source port match
# Test #3 - L4 source port match - forwarded
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['TCP'].sport = 0x1235
exp_pkt['TCP'].sport = 0x1235
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (0 if res else 1)
print "Test #3 %s" % ("FAILED" if res else "PASSED")
self.runSendReceiveTest(3, pkt, src_port, exp_pkt, dst_ports, 1)

# Test #4 - Verify L4 destination port match
# Test #4 - L4 destination port match - forwarded
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['TCP'].dport = 0x1235
exp_pkt['TCP'].dport = 0x1235
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (0 if res else 1)
print "Test #4 %s" % ("FAILED" if res else "PASSED")
self.runSendReceiveTest(4, pkt, src_port, exp_pkt, dst_ports, 1)

# Test #5 - Verify ether type match
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['Ethernet'].type = 0x1234
exp_pkt['Ethernet'].type = 0x1234
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (0 if res else 1)
print "Test #5 %s" % ("FAILED" if res else "PASSED")

# Test #6 - Verify ip protocol match
# Test #5 - IP protocol match - forwarded
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['IP'].proto = 0x7E
exp_pkt['IP'].proto = 0x7E
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (0 if res else 1)
print "Test #6 %s" % ("FAILED" if res else "PASSED")
self.runSendReceiveTest(5, pkt, src_port, exp_pkt, dst_ports, 1)

# Test #7 - Verify TCP flags match
# Test #6 - TCP flags match - forwarded
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['TCP'].flags = 0x12
exp_pkt['TCP'].flags = 0x12
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (0 if res else 1)
print "Test #7 %s" % ("FAILED" if res else "PASSED")
self.runSendReceiveTest(6, pkt, src_port, exp_pkt, dst_ports, 1)

# Test #9 - Verify source port range match
# Test #7 - source port range match - forwarded
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['TCP'].sport = 0x123A
exp_pkt['TCP'].sport = 0x123A
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (0 if res else 1)
print "Test #9 %s" % ("FAILED" if res else "PASSED")
self.runSendReceiveTest(7, pkt, src_port, exp_pkt, dst_ports, 1)

# Test #10 - Verify destination port range match
# Test #8 - destination port range match - forwarded
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['TCP'].dport = 0x123A
exp_pkt['TCP'].dport = 0x123A
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (0 if res else 1)
print "Test #10 %s" % ("FAILED" if res else "PASSED")
self.runSendReceiveTest(8, pkt, src_port, exp_pkt, dst_ports, 1)

# Test #11 - Verify rules priority
# Test #9 - rules priority - dropped
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['IP'].src = "10.0.0.3"
exp_pkt['IP'].src = "10.0.0.3"
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (1 if res else 0)
print "Test #11 %s" % ("PASSED" if res else "FAILED")
self.runSendReceiveTest(9, pkt, src_port, exp_pkt, dst_ports, 0)

#Creates a ICMP packet
# Create a ICMP packet
pkt0 = simple_icmp_packet(
eth_dst = self.router_mac,
eth_src = self.dataplane.get_mac(0, 0),
Expand All @@ -249,7 +230,7 @@ def runAclTests(self, dst_ip, dst_ip_blocked, src_port, dst_ports):
icmp_code=0,
ip_ttl = 64
)
#exp_pkt = pkt.deepcopy()

exp_pkt0 = simple_icmp_packet(
eth_dst = self.dataplane.get_mac(0, 0),
eth_src = self.router_mac,
Expand All @@ -260,17 +241,15 @@ def runAclTests(self, dst_ip, dst_ip_blocked, src_port, dst_ports):
ip_ttl = 63
)

# Test #12 - Verify IP protocol & source IP match
# Test #10 - ICMP source IP match - forwarded
# IP_PROTOCOL = 0x1
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['IP'].src = "10.0.0.2"
exp_pkt['IP'].src = "10.0.0.2"
pkt['IP'].proto=0x1
exp_pkt['IP'].proto=0x1
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (0 if res else 1)
print "Test #12 %s" % ("FAILED" if res else "PASSED")
self.runSendReceiveTest(10, pkt, src_port, exp_pkt, dst_ports, 1)

# Create a UDP packet
pkt0 = simple_udp_packet(
eth_dst = self.router_mac,
eth_src = self.dataplane.get_mac(0, 0),
Expand All @@ -280,7 +259,7 @@ def runAclTests(self, dst_ip, dst_ip_blocked, src_port, dst_ports):
udp_dport = 80,
ip_ttl = 64
)
#exp_pkt = pkt.deepcopy()

exp_pkt0 = simple_udp_packet(
eth_dst = self.dataplane.get_mac(0, 0),
eth_src = self.router_mac,
Expand All @@ -291,18 +270,15 @@ def runAclTests(self, dst_ip, dst_ip_blocked, src_port, dst_ports):
ip_ttl = 63
)

# Test #13 - Verify source IP match - UDP packet and UDP protocol
# Test #11 - UDP source IP match - forwarded
# IP_PROTOCOL = 0x11
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['IP'].src = "10.0.0.2"
exp_pkt['IP'].src = "10.0.0.2"
pkt['IP'].proto=0x11
exp_pkt['IP'].proto=0x11
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (0 if res else 1)
print "Test #13 %s" % ("FAILED" if res else "PASSED")
self.runSendReceiveTest(11, pkt, src_port, exp_pkt, dst_ports, 1)

return tests_passed, self.tests_total
return self.tests_passed, self.tests_total

#---------------------------------------------------------------------

Expand Down
Loading