Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 266 additions & 0 deletions ansible/roles/test/files/acstests/acltb_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
'''
Description: This file contains the ACL test for SONiC testbed

Implemented according to the https://github.com/Azure/SONiC/wiki/ACL-test-plan

Usage: Examples of how to use:
ptf --test-dir acstests acltb_test.AclTest --platform remote -t 'router_mac="00:02:03:04:05:00";verbose=True;route_info="/tmp/route_info.txt"'
'''

#---------------------------------------------------------------------
# Global imports
#---------------------------------------------------------------------
import random
import time
import logging
import ptf.packet as scapy
import socket
import ptf.dataplane as dataplane

from ptf.testutils import *
from ptf.mask import Mask
import ipaddress

import os
import logging
import unittest

import ptf
from ptf.base_tests import BaseTest
from ptf import config
import ptf.dataplane as dataplane
import ptf.testutils as testutils

import pprint

class AclTest(BaseTest):
'''
@summary: ACL tests on testbed topo: t1
'''

#---------------------------------------------------------------------
# Class variables
#---------------------------------------------------------------------
PORT_COUNT = 31 # temporary exclude the last port

def __init__(self):
'''
@summary: constructor
'''
BaseTest.__init__(self)
self.test_params = testutils.test_params_get()
#---------------------------------------------------------------------

def setUp(self):
'''
@summary: Setup for the test
'''

self.dataplane = ptf.dataplane_instance
self.router_mac = self.test_params['router_mac']

#---------------------------------------------------------------------

'''
For diagnostic purposes only
'''
def print_route_info(self):
pprint.pprint(self.route_info)
return
#---------------------------------------------------------------------

def verify_packet_any_port(self, pkt, ports=[], device_number=0):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@andriymoroz-mlnx andriymoroz-mlnx Apr 5, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is already there:
https://github.com/p4lang/ptf/blob/master/src/ptf/testutils.py#L2326
I borrowed this function from the fib_test.py; have no idea why the one from the ptf wasn't used...
UPD: returned result is slightly different

Copy link
Contributor

@stcheng stcheng Apr 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the difference here is that the ptf one has this assertion: verify_no_other_packets(test, device_number=device_number) on line 2352. and here it doesn't have this line. the reason is that sometimes LLDP and other random message will fail the test if this assertion exists.

however, a better way to avoid duplicating this function is to add --relax flag when starting ptf. thus this 'verify_no_other_packets' will be ignored.

I will update the FIB test later to use the p4 one and remove the duplication. could you here remove this first and use the --relax flag in the command?

Copy link

@liatgrozovik liatgrozovik Apr 12, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nice enhancement which we should add to the backlog and handle it when we have the time for it.
As FIB test is also using it and was already merged and used, I suggest we will merge this test mostly as found as working and have it run on testbed daily. Please consider this as a compromise as we would like to move on and handle urgent task while being able to run these tests daily

"""
@summary: Check that the packet is received on _any_ of the specified ports belonging to
the given device (default device_number is 0).

The function returns when either the expected packet is received or timeout (1 second).

Also verifies that the packet is or received on any other ports for this
device, and that no other packets are received on the device (unless --relax
is in effect).
@param pkt : packet to verify
@param ports : list of ports

@return: index of the port on which the packet is received and the packet.
"""
received = False
match_index = 0
(rcv_device, rcv_port, rcv_pkt, pkt_time) = dp_poll(self, device_number=device_number, exp_pkt=pkt, timeout=1)

if rcv_port in ports:
match_index = ports.index(rcv_port)
received = True

return (match_index, rcv_pkt, received)
#---------------------------------------------------------------------

def runSendReceiveTest(self, pkt2send, src_port , pkt2recv, destination_ports):
"""
@summary Send packet and verify it is received/not received on the expected ports
"""

masked2recv = Mask(pkt2recv)
masked2recv.set_do_not_care_scapy(scapy.Ether, "dst")
masked2recv.set_do_not_care_scapy(scapy.Ether, "src")

send_packet(self, src_port, pkt2send)
(index, rcv_pkt, received) = self.verify_packet_any_port(masked2recv, destination_ports)

self.tests_total += 1

return received

#---------------------------------------------------------------------
def runAclTests(self, dst_ip, dst_ip_blocked, src_port, dst_ports):
"""
@summary: Crete and send packet to verify each ACL rule
@return: Number of tests passed
"""

tests_passed = 0
self.tests_total = 0

print "\nPort to sent packets to: %d" % src_port
print "Destination IP: %s" % dst_ip_blocked
print "Ports to expect packet from: ",
pprint.pprint(dst_ports)
print "Dst IP expected to be blocked: ", dst_ip_blocked

pkt0 = simple_tcp_packet(
eth_dst = self.router_mac,
eth_src = self.dataplane.get_mac(0, 0),
ip_src = "10.0.0.1",
ip_dst = dst_ip,
tcp_sport = 0x1234,
tcp_dport = 0x50,
ip_ttl = 64
)
#exp_pkt = pkt.deepcopy()
exp_pkt0 = simple_tcp_packet(
eth_dst = self.dataplane.get_mac(0, 0),
eth_src = self.router_mac,
ip_src = "10.0.0.1",
ip_dst = dst_ip,
tcp_sport = 0x1234,
tcp_dport = 0x50,
ip_ttl = 63
)

print ""
# Test #1 - Verify source IP match
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['IP'].src = "10.0.0.2"
exp_pkt['IP'].src = "10.0.0.2"
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (0 if res else 1)
print "Test #1 %s" % ("FAILED" if res else "PASSED")

# Test #2 - Verify destination IP match
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['IP'].dst = dst_ip_blocked
exp_pkt['IP'].dst = dst_ip_blocked
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (0 if res else 1)
print "Test #2 %s" % ("FAILED" if res else "PASSED")

# Test #3 - Verify L4 source port match
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['TCP'].sport = 0x1235
exp_pkt['TCP'].sport = 0x1235
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (0 if res else 1)
print "Test #3 %s" % ("FAILED" if res else "PASSED")

# Test #4 - Verify L4 destination port match
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['TCP'].dport = 0x1235
exp_pkt['TCP'].dport = 0x1235
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (0 if res else 1)
print "Test #4 %s" % ("FAILED" if res else "PASSED")

# Test #5 - Verify ether type match
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['Ethernet'].type = 0x1234
exp_pkt['Ethernet'].type = 0x1234
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (0 if res else 1)
print "Test #5 %s" % ("FAILED" if res else "PASSED")

# Test #6 - Verify ip protocol match
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['IP'].proto = 0x7E
exp_pkt['IP'].proto = 0x7E
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (0 if res else 1)
print "Test #6 %s" % ("FAILED" if res else "PASSED")

# Test #7 - Verify TCP flags match
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['TCP'].flags = 0x12
exp_pkt['TCP'].flags = 0x12
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (0 if res else 1)
print "Test #7 %s" % ("FAILED" if res else "PASSED")

# Test #9 - Verify source port range match
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['TCP'].sport = 0x123A
exp_pkt['TCP'].sport = 0x123A
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (0 if res else 1)
print "Test #9 %s" % ("FAILED" if res else "PASSED")

# Test #10 - Verify destination port range match
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['TCP'].dport = 0x123A
exp_pkt['TCP'].dport = 0x123A
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (0 if res else 1)
print "Test #10 %s" % ("FAILED" if res else "PASSED")

# Test #11 - Verify rules priority
pkt = pkt0.copy()
exp_pkt = exp_pkt0.copy()
pkt['IP'].src = "10.0.0.3"
exp_pkt['IP'].src = "10.0.0.3"
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports)
tests_passed += (1 if res else 0)
print "Test #11 %s" % ("PASSED" if res else "FAILED")

return tests_passed, self.tests_total

#---------------------------------------------------------------------

def runTest(self):
"""
@summary: Crete and send packet to verify each ACL rule
"""

test_result = False

self.switch_info = open(self.test_params["switch_info"], 'r').readlines()
self.tor_ports = map(int, self.switch_info[0].rstrip(",\n").split(","))
self.spine_ports = map(int, self.switch_info[1].rstrip(",\n").split(","))
self.dest_ip_addr_spine = self.switch_info[2].strip()
self.dest_ip_addr_spine_blocked = self.switch_info[3].strip()
self.dest_ip_addr_tor = self.switch_info[4].strip()
self.dest_ip_addr_tor_blocked = self.switch_info[5].strip()

# Verify ACLs on tor port
(tests_passed, tests_total) = self.runAclTests(self.dest_ip_addr_spine, self.dest_ip_addr_spine_blocked, self.tor_ports[0], self.spine_ports)
assert(tests_passed == tests_total)

# Verify ACLs on spine port
(tests_passed, tests_total) = self.runAclTests(self.dest_ip_addr_tor, self.dest_ip_addr_tor_blocked, self.spine_ports[0], self.tor_ports)
assert(tests_passed == tests_total)
Empty file.
Empty file.
1 change: 1 addition & 0 deletions ansible/roles/test/tasks/acl/acltb_match_messages.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
s, "removeNeighbor: Neighbor is still referenced"
20 changes: 20 additions & 0 deletions ansible/roles/test/tasks/acltb.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#-----------------------------------------
# Apply ACL configuration
#-----------------------------------------
- name: Acl test setup on testbed
include: acltb_configure.yml
tags: acltb_configure

#-----------------------------------------
# Run ACL test
#-----------------------------------------
- name: Acl test run on testbed
include: acltb_test.yml
tags: acltb_test

#-----------------------------------------
# Clean up ACL configuration
#-----------------------------------------
- name: Clean up ACL test configuration on the testbed
include: acltb_cleanup.yml
tags: acltb_cleanup
87 changes: 87 additions & 0 deletions ansible/roles/test/tasks/acltb_cleanup.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Set facts for ACL configuration
- set_fact:
acltb_configs:
- "{{ 'acltb_test_rules.json' }}"
- "{{ 'acltb_test_table.json' }}"

# Set facts for the loganalizer
- set_fact:
testname: acl
run_dir: /tmp
out_dir: /tmp/ansible-loganalyzer-results
test_match_file: acltb_match_messages.txt
test_ignore_file: acltb_ignore_messages.txt
test_expect_file: acltb_expect_messages.txt
match_file: loganalyzer_common_match.txt
ignore_file: loganalyzer_common_ignore.txt
tests_location: "{{ 'roles/test/tasks' }}"

# Separate set_fact is required to be able to use 'testname' fact.
- set_fact:
testname_unique: "{{ testname }}.{{ ansible_date_time.date}}.{{ ansible_date_time.hour}}-{{ ansible_date_time.minute}}-{{ ansible_date_time.second}}"

# Separate set_fact is required to be able to use 'testname_unique' fact.
- set_fact:
test_out_dir: "{{ out_dir }}/{{testname_unique}}"
match_file_list: "{{ run_dir }}/{{test_match_file}},{{ run_dir }}/{{match_file}}"
ignore_file_list: "{{ run_dir }}/{{test_ignore_file}},{{ run_dir }}/{{ignore_file}}"
result_file: result.loganalysis.{{testname_unique}}.log
summary_file: summary.loganalysis.{{testname_unique}}.log

# Gather minigraph facts
- name: Gathering minigraph facts about the device
minigraph_facts: host={{ inventory_hostname }}
become: no
connection: local

- name: Read port reverse alias mapping
set_fact:
alias_reverse_map: "{{ lookup('file', 'roles/sonicv2/files/ssw/{{ sonic_hwsku }}/alias_reverse_map.json') | from_json }}"

# Generate json files with ACL configuration for tests
- template: src=acltb_test_table.j2 dest=/tmp/acltb_test_table.json
connection: local

- template: src=acltb_test_rules.j2 dest=/tmp/acltb_test_rules.json
connection: local

# Copy ACL config to the swss container via the switch
- name: Copy ACL config file to the DUT
copy: src="/tmp/{{ item }}" dest="/tmp/{{ item }}"
with_items:
- "{{ acltb_configs }}"

- name: Copy ACL config file to the swss container
command: docker cp "/tmp/{{ item }}" swss:/etc/swss/config.d/"{{ item }}"
with_items:
- "{{ acltb_configs }}"

- name: Create ACL configuration to delete rules/tables
command: docker exec -t swss bash -c "sed -i 's/SET/DEL/g' /etc/swss/config.d/{{ item }}"
with_items:
- "{{ acltb_configs }}"

- include: roles/test/files/tools/loganalyzer/loganalyzer_init.yml

- block:
- name: Apply ACL delete configuration
command: docker exec -t swss bash -c "swssconfig /etc/swss/config.d/{{ item }}"
with_items:
- "{{ acltb_configs }}"
always:
- include: roles/test/files/tools/loganalyzer/loganalyzer_analyze.yml

# Output content of result files to ansible console
- shell: cat {{ test_out_dir }}/*
register: out
- debug: var=out.stdout_lines

- name: Get the total number of error messages.
shell: grep "TOTAL MATCHES" "{{ test_out_dir }}/{{ summary_file }}" | sed -n "s/TOTAL MATCHES:[[:space:]]*//p"
register: errors_found

- name: Check the number of error messages (positive tests only).
fail: msg="{{ errors_found.stdout }} errors found while running {{ testname }} test. Please see {{ test_out_dir }}/{{ result_file }}"
when: errors_found.stdout != "0"

- include: roles/test/files/tools/loganalyzer/loganalyzer_end.yml
Loading