diff --git a/tests/acl/templates/acltb_test_rules.j2 b/tests/acl/templates/acltb_test_rules.j2 index a654dbd21a9..9754d0ca7e6 100644 --- a/tests/acl/templates/acltb_test_rules.j2 +++ b/tests/acl/templates/acltb_test_rules.j2 @@ -31,7 +31,7 @@ }, "ip": { "config": { - "destination-ip-address": "192.168.8.1/32" + "destination-ip-address": "192.168.0.4/32" } } }, @@ -228,7 +228,7 @@ }, "ip": { "config": { - "destination-ip-address": "192.168.16.1/32" + "destination-ip-address": "192.168.0.8/32" } } }, diff --git a/tests/acl/templates/acltb_test_rules_part_2.j2 b/tests/acl/templates/acltb_test_rules_part_2.j2 index a654dbd21a9..9754d0ca7e6 100644 --- a/tests/acl/templates/acltb_test_rules_part_2.j2 +++ b/tests/acl/templates/acltb_test_rules_part_2.j2 @@ -31,7 +31,7 @@ }, "ip": { "config": { - "destination-ip-address": "192.168.8.1/32" + "destination-ip-address": "192.168.0.4/32" } } }, @@ -228,7 +228,7 @@ }, "ip": { "config": { - "destination-ip-address": "192.168.16.1/32" + "destination-ip-address": "192.168.0.8/32" } } }, diff --git a/tests/acl/test_acl.py b/tests/acl/test_acl.py index 40b4ebc727e..c77d7c51336 100644 --- a/tests/acl/test_acl.py +++ b/tests/acl/test_acl.py @@ -3,217 +3,283 @@ import random import logging import pprint - -from abc import ABCMeta, abstractmethod - import pytest +import json import ptf.testutils as testutils import ptf.mask as mask import ptf.packet as packet +from abc import ABCMeta, abstractmethod +from collections import defaultdict + from tests.common import reboot, port_toggle +from tests.common.helpers.assertions import pytest_require from tests.common.plugins.loganalyzer.loganalyzer import LogAnalyzer, LogAnalyzerError from tests.common.fixtures.duthost_utils import backup_and_restore_config_db_module +from tests.common.fixtures.ptfhost_utils import copy_arp_responder_py logger = logging.getLogger(__name__) pytestmark = [ pytest.mark.acl, - pytest.mark.disable_loganalyzer, # disable automatic loganalyzer - pytest.mark.topology('t1') + pytest.mark.disable_loganalyzer, # Disable automatic loganalyzer, since we use it for the test + pytest.mark.topology("any") ] BASE_DIR = os.path.dirname(os.path.realpath(__file__)) -DUT_TMP_DIR = os.path.join('tmp', os.path.basename(BASE_DIR)) -FILES_DIR = os.path.join(BASE_DIR, 'files') -TEMPLATE_DIR = os.path.join(BASE_DIR, 'templates') +DUT_TMP_DIR = "acl_test_dir" # Keep it under home dir so it persists through reboot +FILES_DIR = os.path.join(BASE_DIR, "files") +TEMPLATE_DIR = os.path.join(BASE_DIR, "templates") -ACL_TABLE_TEMPLATE = 'acltb_table.j2' -ACL_RULES_FULL_TEMPLATE = 'acltb_test_rules.j2' -ACL_RULES_PART_TEMPLATES = tuple('acltb_test_rules_part_{}.j2'.format(i) for i in xrange(1, 3)) -ACL_REMOVE_RULES_FILE = 'acl_rules_del.json' +ACL_TABLE_TEMPLATE = "acltb_table.j2" +ACL_RULES_FULL_TEMPLATE = "acltb_test_rules.j2" +ACL_RULES_PART_TEMPLATES = tuple("acltb_test_rules_part_{}.j2".format(i) for i in xrange(1, 3)) +ACL_REMOVE_RULES_FILE = "acl_rules_del.json" -DST_IP_TOR = '192.168.0.1' -DST_IP_TOR_FORWARDED = '192.168.8.1' -DST_IP_TOR_BLOCKED = '192.168.16.1' -DST_IP_SPINE = '192.168.128.1' -DST_IP_SPINE_FORWARDED = '192.168.136.1' -DST_IP_SPINE_BLOCKED = '192.168.144.1' +DEFAULT_SRC_IP = "20.0.0.1" -LOG_EXPECT_ACL_TABLE_CREATE_RE = '.*Created ACL table.*' -LOG_EXPECT_ACL_TABLE_REMOVE_RE = '.*Successfully deleted ACL table.*' -LOG_EXPECT_ACL_RULE_CREATE_RE = '.*Successfully created ACL rule.*' -LOG_EXPECT_ACL_RULE_REMOVE_RE = '.*Successfully deleted ACL rule.*' +DOWNSTREAM_DST_IP = "192.168.0.2" +DOWNSTREAM_IP_TO_ALLOW = "192.168.0.4" +DOWNSTREAM_IP_TO_BLOCK = "192.168.0.8" +UPSTREAM_DST_IP = "192.168.128.1" +UPSTREAM_IP_TO_ALLOW = "192.168.136.1" +UPSTREAM_IP_TO_BLOCK = "192.168.144.1" -@pytest.fixture(scope='module') -def setup(duthost, tbinfo, ptfadapter): - """ - setup fixture gathers all test required information from DUT facts and tbinfo - :param duthost: DUT host object - :param tbinfo: fixture provides information about testbed - :return: dictionary with all test required information - """ +VLAN_BASE_MAC_PATTERN = "72060001{:04}" - tor_ports = [] - spine_ports = [] - tor_ports_ids = [] - spine_ports_ids = [] - port_channels = [] - acl_table_ports = [] +LOG_EXPECT_ACL_TABLE_CREATE_RE = ".*Created ACL table.*" +LOG_EXPECT_ACL_TABLE_REMOVE_RE = ".*Successfully deleted ACL table.*" +LOG_EXPECT_ACL_RULE_CREATE_RE = ".*Successfully created ACL rule.*" +LOG_EXPECT_ACL_RULE_REMOVE_RE = ".*Successfully deleted ACL rule.*" - if tbinfo['topo']['name'] not in ('t1', 't1-lag', 't1-64-lag', 't1-64-lag-clet'): - pytest.skip('Unsupported topology') - # gather ansible facts - mg_facts = duthost.minigraph_facts(host=duthost.hostname)['ansible_facts'] +@pytest.fixture(scope="module") +def setup(duthost, tbinfo, ptfadapter): + """Gather all required test information from DUT and tbinfo. - # get the list of TOR/SPINE ports - for dut_port, neigh in mg_facts['minigraph_neighbors'].items(): - port_id = mg_facts['minigraph_port_indices'][dut_port] - if 'T0' in neigh['name']: - tor_ports.append(dut_port) - tor_ports_ids.append(port_id) - elif 'T2' in neigh['name']: - spine_ports.append(dut_port) - spine_ports_ids.append(port_id) + Args: + duthost: A fixture to interact with the DUT. + tbinfo: A fixture to gather information about the testbed. - # get the list of port channels - port_channels = mg_facts['minigraph_portchannels'] + Yields: + A Dictionary with required test information. + + """ + pytest_require( + tbinfo["topo"]["name"] != "dualtor", + "ACL test not supported on topology: \"{}\"".format(tbinfo["topo"]["name"]) + ) + + mg_facts = duthost.minigraph_facts(host=duthost.hostname)["ansible_facts"] + + # Get the list of upstream/downstream ports + downstream_ports = [] + upstream_ports = [] + downstream_port_ids = [] + upstream_port_ids = [] + + topo = tbinfo["topo"]["type"] + for interface, neighbor in mg_facts["minigraph_neighbors"].items(): + port_id = mg_facts["minigraph_port_indices"][interface] + if (topo == "t1" and "T0" in neighbor["name"]) or (topo == "t0" and "Server" in neighbor["name"]): + downstream_ports.append(interface) + downstream_port_ids.append(port_id) + elif (topo == "t1" and "T2" in neighbor["name"]) or (topo == "t0" and "T1" in neighbor["name"]): + upstream_ports.append(interface) + upstream_port_ids.append(port_id) + + # Get the list of LAGs + port_channels = mg_facts["minigraph_portchannels"] + + # TODO: We should make this more robust (i.e. bind all active front-panel ports) + acl_table_ports = [] - # get the list of port to be combined to ACL tables - if tbinfo['topo']['name'] in ('t1', 't1-lag'): - acl_table_ports += tor_ports + if topo == "t0" or tbinfo["topo"]["name"] in ("t1", "t1-lag"): + acl_table_ports += downstream_ports - if tbinfo['topo']['name'] in ('t1-lag', 't1-64-lag', 't1-64-lag-clet'): + if topo == "t0" or tbinfo["topo"]["name"] in ("t1-lag", "t1-64-lag", "t1-64-lag-clet"): acl_table_ports += port_channels else: - acl_table_ports += spine_ports + acl_table_ports += upstream_ports - logger.info('creating temporary folder for test {}'.format(DUT_TMP_DIR)) - duthost.command("mkdir -p {}".format(DUT_TMP_DIR)) + vlan_ports = [] + + if topo == "t0": + vlan_ports = [mg_facts["minigraph_port_indices"][ifname] + for ifname + in mg_facts["minigraph_vlans"].values()[0]["members"]] - host_facts = duthost.setup()['ansible_facts'] + host_facts = duthost.setup()["ansible_facts"] setup_information = { - 'router_mac': host_facts['ansible_Ethernet0']['macaddress'], - 'dut_tmp_dir': DUT_TMP_DIR, - 'tor_ports': tor_ports, - 'spine_ports': spine_ports, - 'tor_ports_ids': tor_ports_ids, - 'spine_ports_ids': spine_ports_ids, - 'port_channels': port_channels, - 'acl_table_ports': acl_table_ports, - 'dst_ip_tor': DST_IP_TOR, - 'dst_ip_tor_forwarded': DST_IP_TOR_FORWARDED, - 'dst_ip_tor_blocked': DST_IP_TOR_BLOCKED, - 'dst_ip_spine': DST_IP_SPINE, - 'dst_ip_spine_forwarded': DST_IP_SPINE_FORWARDED, - 'dst_ip_spine_blocked': DST_IP_SPINE_BLOCKED, + "router_mac": host_facts["ansible_Ethernet0"]["macaddress"], + "downstream_port_ids": downstream_port_ids, + "upstream_port_ids": upstream_port_ids, + "acl_table_ports": acl_table_ports, + "vlan_ports": vlan_ports, + "topo": topo } - logger.info('setup variables {}'.format(pprint.pformat(setup_information))) + logger.info("Gathered variables for ACL test:\n{}".format(pprint.pformat(setup_information))) - # FIXME: There seems to be some issue with the initial setup of the ptfadapter, causing some of the - # TestBasicAcl tests to fail because the forwarded packets are not being collected. This is an - # attempt to mitigate that issue while we continue to investigate the root cause. - # - # Ref: GitHub Issue #2032 - logger.info("setting up the ptfadapter") - ptfadapter.reinit() + logger.info("Creating temporary folder \"{}\" for ACL test".format(DUT_TMP_DIR)) + duthost.command("mkdir -p {}".format(DUT_TMP_DIR)) yield setup_information - logger.info('removing {}'.format(DUT_TMP_DIR)) - duthost.command('rm -rf {}'.format(DUT_TMP_DIR)) + logger.info("Removing temporary directory \"{}\"".format(DUT_TMP_DIR)) + duthost.command("rm -rf {}".format(DUT_TMP_DIR)) + + +@pytest.fixture(scope="module") +def populate_vlan_arp_entries(setup, ptfhost, duthost): + """Set up the ARP responder utility in the PTF container.""" + if setup["topo"] != "t0": + def noop(): + pass + + yield noop + + return # Don't fall through to t0 case + + addr_list = [DOWNSTREAM_DST_IP, DOWNSTREAM_IP_TO_ALLOW, DOWNSTREAM_IP_TO_BLOCK] + + vlan_host_map = defaultdict(dict) + for i in range(len(addr_list)): + mac = VLAN_BASE_MAC_PATTERN.format(i) + port = random.choice(setup["vlan_ports"]) + addr = addr_list[i] + vlan_host_map[port][str(addr)] = mac + + arp_responder_conf = {} + for port in vlan_host_map: + arp_responder_conf['eth{}'.format(port)] = vlan_host_map[port] + + with open("/tmp/from_t1.json", "w") as ar_config: + json.dump(arp_responder_conf, ar_config) + ptfhost.copy(src="/tmp/from_t1.json", dest="/tmp/from_t1.json") + + ptfhost.host.options["variable_manager"].extra_vars.update({"arp_responder_args": "-e"}) + ptfhost.template(src="templates/arp_responder.conf.j2", + dest="/etc/supervisor/conf.d/arp_responder.conf") + + ptfhost.shell("supervisorctl reread && supervisorctl update") + ptfhost.shell("supervisorctl restart arp_responder") + + def populate_arp_table(): + duthost.command("sonic-clear fdb all") + duthost.command("sonic-clear arp") + + for addr in addr_list: + duthost.command("ping {} -c 3".format(addr), module_ignore_errors=True) + + populate_arp_table() + + yield populate_arp_table + + logging.info("Stopping ARP responder") + ptfhost.shell("supervisorctl stop arp_responder") + + duthost.command("sonic-clear fdb all") + duthost.command("sonic-clear arp") @pytest.fixture(scope="module", params=["ingress", "egress"]) def stage(request, duthost): - """ - Parametrize tests for Ingress/Egress stage testing. + """Parametrize tests for Ingress/Egress stage testing. Args: - request: Pytest request fixture - duthost: DUT fixture + request: A fixture to interact with Pytest data. + duthost: A fixture to interact with the DUT. Returns: str: The ACL stage to be tested. """ - if request.param == "egress" and duthost.facts["asic_type"] in ["broadcom"]: - pytest.skip("Egress ACL stage not currently supported on {} ASIC" - .format(duthost.facts["asic_type"])) + pytest_require( + request.param == "ingress" or duthost.facts["asic_type"] not in ("broadcom"), + "Egress ACLs are not currently supported on \"{}\" ASICs".format(duthost.facts["asic_type"]) + ) return request.param -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def acl_table_config(duthost, setup, stage): - """ - generate ACL table configuration files and deploy them on DUT; - after test run cleanup artifacts on DUT - :param duthost: DUT host object - :param setup: setup parameters - :param stage: stage - :return: dictionary of table name and matching configuration file - """ + """Generate ACL table configuration files and deploy them to the DUT. + + Args: + duthost: A fixture to interact with the DUT. + setup: Parameters for the ACL tests. + stage: The ACL stage under test. - # Initialize data for ACL tables - tables_map = { - 'ingress': 'DATAINGRESS', - 'egress': 'DATAEGRESS', + Returns: + A dictionary containing the table name and the corresponding configuration file. + + """ + stage_to_name_map = { + "ingress": "DATA_INGRESS_TEST", + "egress": "DATA_EGRESS_TEST" } - acl_table_name = tables_map[stage] - tmp_dir = setup['dut_tmp_dir'] + acl_table_name = stage_to_name_map[stage] acl_table_vars = { - 'acl_table_name': acl_table_name, - 'acl_table_ports': setup['acl_table_ports'], - 'acl_table_stage': stage, - 'acl_table_type': 'L3', + "acl_table_name": acl_table_name, + "acl_table_ports": setup["acl_table_ports"], + "acl_table_stage": stage, + "acl_table_type": "L3" } - logger.info('extra variables for ACL table:\n{}'.format(pprint.pformat(acl_table_vars))) - duthost.host.options['variable_manager'].extra_vars.update(acl_table_vars) + logger.info("ACL table configuration:\n{}".format(pprint.pformat(acl_table_vars))) - logger.info('generate config for ACL table {}'.format(acl_table_name)) - acl_config = 'acl_table_{}.json'.format(acl_table_name) - acl_config_path = os.path.join(tmp_dir, acl_config) - duthost.template(src=os.path.join(TEMPLATE_DIR, ACL_TABLE_TEMPLATE), dest=acl_config_path) + acl_table_config_file = "acl_table_{}.json".format(acl_table_name) + acl_table_config_path = os.path.join(DUT_TMP_DIR, acl_table_config_file) - yield { - 'name': acl_table_name, - 'config_file': acl_config_path, + logger.info("Generating DUT config for ACL table \"{}\"".format(acl_table_name)) + duthost.host.options["variable_manager"].extra_vars.update(acl_table_vars) + duthost.template(src=os.path.join(TEMPLATE_DIR, ACL_TABLE_TEMPLATE), + dest=acl_table_config_path) + + return { + "table_name": acl_table_name, + "config_file": acl_table_config_path } @pytest.fixture(scope="module") def acl_table(duthost, acl_table_config, backup_and_restore_config_db_module): - """ - fixture to apply ACL table configuration and remove after tests - :param duthost: DUT object - :param acl_table_config: ACL table configuration dictionary - :return: forwards acl_table_config - """ + """Apply ACL table configuration and remove after tests. - name = acl_table_config['name'] - conf = acl_table_config['config_file'] + Args: + duthost: A fixture to interact with the DUT. + acl_table_config: A dictionary describing the ACL table configuration to apply. + backup_and_restore_config_db_module: A fixture that handles restoring Config DB + after the tests are over. + + Yields: + The ACL table configuration. + + """ + table_name = acl_table_config["table_name"] + config_file = acl_table_config["config_file"] - loganalyzer = LogAnalyzer(ansible_host=duthost, marker_prefix='acl') + loganalyzer = LogAnalyzer(ansible_host=duthost, marker_prefix="acl") loganalyzer.load_common_config() try: loganalyzer.expect_regex = [LOG_EXPECT_ACL_TABLE_CREATE_RE] with loganalyzer: - logger.info('creating ACL table: applying {}'.format(conf)) - # TODO: use sonic config CLI - duthost.command('sonic-cfggen -j {} --write-to-db'.format(conf)) + logger.info("Creating ACL table from config file: \"{}\"".format(config_file)) + + # TODO: Use `config` CLI to create ACL table + duthost.command("sonic-cfggen -j {} --write-to-db".format(config_file)) except LogAnalyzerError as err: - # cleanup config DB if create failed - duthost.command('config acl remove table {}'.format(name)) + # Cleanup Config DB if table creation failed + logger.error("ACL table creation failed, attempting to clean-up...") + duthost.command("config acl remove table {}".format(table_name)) raise err try: @@ -221,80 +287,87 @@ def acl_table(duthost, acl_table_config, backup_and_restore_config_db_module): finally: loganalyzer.expect_regex = [LOG_EXPECT_ACL_TABLE_REMOVE_RE] with loganalyzer: - logger.info('removing ACL table {}'.format(name)) - duthost.command('config acl remove table {}'.format(name)) + logger.info("Removing ACL table \"{}\"".format(table_name)) + duthost.command("config acl remove table {}".format(table_name)) class BaseAclTest(object): + """Base class for testing ACL rules. + + Subclasses must provide `setup_rules` method to prepare ACL rules for traffic testing. + + They can optionally override `teardown_rules`, which will otherwise remove the rules by + applying an empty configuration file. """ - Base class for ACL rules testing. - Derivatives have to provide @setup_rules method to prepare DUT for ACL traffic test and - optionally override @teardown_rules which base implementation is simply applying empty ACL rules - configuration file - """ + __metaclass__ = ABCMeta - ACL_COUNTERS_UPDATE_INTERVAL = 10 # seconds + ACL_COUNTERS_UPDATE_INTERVAL_SECS = 10 @abstractmethod - def setup_rules(self, dut, setup, acl_table): - """ - setup rules for test - :param dut: dut host - :param setup: setup information - :param acl_table: acl table creating fixture - :return: - """ + def setup_rules(self, dut, acl_table): + """Setup ACL rules for testing. - pass + Args: + dut: The DUT having ACLs applied. + acl_table: Configuration info for the ACL table. - def post_setup_hook(self, dut, localhost): - """ - perform actions after rules are applied - :param dut: DUT host object - :param localhost: localhost object - :return: """ - pass - def teardown_rules(self, dut, setup): - """ - teardown ACL rules after test by applying empty configuration - :param dut: DUT host object - :param setup: setup information - :return: + def post_setup_hook(self, dut, localhost, populate_vlan_arp_entries): + """Perform actions after rules have been applied. + + Args: + dut: The DUT having ACLs applied. + localhost: The host from which tests are run. + populate_vlan_arp_entries: A function to populate ARP/FDB tables for VLAN interfaces. + """ + pass + + def teardown_rules(self, dut): + """Tear down ACL rules once the tests have completed. - logger.info('removing all ACL rules') - # copy rules remove configuration - dut.copy(src=os.path.join(FILES_DIR, ACL_REMOVE_RULES_FILE), dest=setup['dut_tmp_dir']) - remove_rules_dut_path = os.path.join(setup['dut_tmp_dir'], ACL_REMOVE_RULES_FILE) - # remove rules - logger.info('applying {}'.format(remove_rules_dut_path)) - dut.command('config acl update full {}'.format(remove_rules_dut_path)) + Args: + dut: The DUT having ACLs applied. - @pytest.fixture(scope='class', autouse=True) - def acl_rules(self, duthost, localhost, setup, acl_table): """ - setup/teardown ACL rules based on test class requirements - :param duthost: DUT host object - :param localhost: localhost object - :param setup: setup information - :param acl_table: table creating fixture - :return: + logger.info("Finished with tests, removing all ACL rules...") + + # Copy empty rules configuration + dut.copy(src=os.path.join(FILES_DIR, ACL_REMOVE_RULES_FILE), dest=DUT_TMP_DIR) + remove_rules_dut_path = os.path.join(DUT_TMP_DIR, ACL_REMOVE_RULES_FILE) + + # Remove the rules + logger.info("Applying \"{}\"".format(remove_rules_dut_path)) + dut.command("config acl update full {}".format(remove_rules_dut_path)) + + @pytest.fixture(scope="class", autouse=True) + def acl_rules(self, duthost, localhost, setup, acl_table, populate_vlan_arp_entries): + """Setup/teardown ACL rules for the current set of tests. + + Args: + duthost: The DUT having ACLs applied. + localhost: The host from which tests are run. + setup: Parameters for the ACL tests. + acl_table: Configuration info for the ACL table. + populate_vlan_arp_entries: A function to populate ARP/FDB tables for VLAN interfaces. + """ - loganalyzer = LogAnalyzer(ansible_host=duthost, marker_prefix='acl_rules') + loganalyzer = LogAnalyzer(ansible_host=duthost, marker_prefix="acl_rules") loganalyzer.load_common_config() try: loganalyzer.expect_regex = [LOG_EXPECT_ACL_RULE_CREATE_RE] with loganalyzer: - self.setup_rules(duthost, setup, acl_table) - self.post_setup_hook(duthost, localhost) + self.setup_rules(duthost, acl_table) + + self.post_setup_hook(duthost, localhost, populate_vlan_arp_entries) except LogAnalyzerError as err: - # cleanup config DB in case of log analysis error - self.teardown_rules(duthost, setup) + # Cleanup Config DB if rule creation failed + logger.error("ACL table creation failed, attempting to clean-up...") + self.teardown_rules(duthost) raise err try: @@ -302,497 +375,399 @@ def acl_rules(self, duthost, localhost, setup, acl_table): finally: loganalyzer.expect_regex = [LOG_EXPECT_ACL_RULE_REMOVE_RE] with loganalyzer: - self.teardown_rules(duthost, setup) + logger.info("Removing ACL rules") + self.teardown_rules(duthost) - @pytest.yield_fixture(scope='class', autouse=True) + @pytest.yield_fixture(scope="class", autouse=True) def counters_sanity_check(self, duthost, acl_rules, acl_table): + """Validate that the counters for each rule in the rules list increased as expected. + + This fixture yields a list of rule IDs. The test case should add on to this list if + it is required to check the rule for increased counters. + + After the test cases pass, the fixture will wait for the ACL counters to update and then + check if the counters for each rule in the list were increased. + + Args: + duthost: The DUT having ACLs applied. + acl_rules: Fixture that sets up the ACL rules. + acl_table: Fixture that sets up the ACL table. + """ - counters sanity check after traffic test cases. - This fixture yields python list of rule IDs which test case should extend if - the RULE is required to check for increased counters. - After test cases passed the fixture will wait for ACL counters to update - and check if counters for each rule in the list of rules were increased. - :param duthost: DUT host object - :param acl_rules: rules creating fixture - :param acl_table: table creating fixture - :return: - """ + table_name = acl_table["table_name"] + acl_facts_before_traffic = duthost.acl_facts()["ansible_facts"]["ansible_acl_facts"][table_name]["rules"] - table_name = acl_table['name'] - acl_facts_before_traffic = duthost.acl_facts()['ansible_facts']['ansible_acl_facts'][table_name]['rules'] rule_list = [] yield rule_list if not rule_list: return - # wait for orchagent to update ACL counters - time.sleep(self.ACL_COUNTERS_UPDATE_INTERVAL) + # Wait for orchagent to update the ACL counters + time.sleep(self.ACL_COUNTERS_UPDATE_INTERVAL_SECS) - acl_facts_after_traffic = duthost.acl_facts()['ansible_facts']['ansible_acl_facts'][table_name]['rules'] + acl_facts_after_traffic = duthost.acl_facts()["ansible_facts"]["ansible_acl_facts"][table_name]["rules"] assert len(acl_facts_after_traffic) == len(acl_facts_before_traffic) for rule in rule_list: - rule = 'RULE_{}'.format(rule) - counters_after = acl_facts_after_traffic[rule] + rule = "RULE_{}".format(rule) + counters_before = acl_facts_before_traffic[rule] - logger.info('counters for {} before traffic:\n{}'.format(rule, pprint.pformat(counters_before))) - logger.info('counters for {} after traffic:\n{}'.format(rule, pprint.pformat(counters_after))) - assert counters_after['packets_count'] > counters_before['packets_count'] - assert counters_after['bytes_count'] > counters_before['bytes_count'] + logger.info("Counters for ACL rule \"{}\" before traffic:\n{}" + .format(rule, pprint.pformat(counters_before))) - @pytest.fixture(params=['tor->spine', 'spine->tor']) - def direction(self, request): - """ - used to parametrized test cases on direction - :param request: pytest request object - :return: direction - """ + counters_after = acl_facts_after_traffic[rule] + logger.info("Counters for ACL rule \"{}\" after traffic:\n{}" + .format(rule, pprint.pformat(counters_after))) + + assert counters_after["packets_count"] > counters_before["packets_count"] + assert counters_after["bytes_count"] > counters_before["bytes_count"] + @pytest.fixture(params=["downlink->uplink", "uplink->downlink"]) + def direction(self, request): + """Parametrize test based on direction of traffic.""" return request.param def get_src_port(self, setup, direction): - """ return source ports based on test case direction """ - - src_ports = setup['tor_ports_ids'] if direction == 'tor->spine' else setup['spine_ports_ids'] + """Get a source port for the current test.""" + src_ports = setup["downstream_port_ids"] if direction == "downlink->uplink" else setup["upstream_port_ids"] return random.choice(src_ports) def get_dst_ports(self, setup, direction): - """ return destination ports based on test case direction """ - - return setup['spine_ports_ids'] if direction == 'tor->spine' else setup['tor_ports_ids'] + """Get the set of possible destination ports for the current test.""" + return setup["upstream_port_ids"] if direction == "downlink->uplink" else setup["downstream_port_ids"] - def get_dst_ip(self, setup, direction): - """ return allowed destination IP based on test case direction """ - - return setup['dst_ip_spine'] if direction == 'tor->spine' else setup['dst_ip_tor'] + def get_dst_ip(self, direction): + """Get the default destination IP for the current test.""" + return UPSTREAM_DST_IP if direction == "downlink->uplink" else DOWNSTREAM_DST_IP def tcp_packet(self, setup, direction, ptfadapter): - """ create TCP packet for testing """ - + """Generate a TCP packet for testing.""" return testutils.simple_tcp_packet( - eth_dst=setup['router_mac'], + eth_dst=setup["router_mac"], eth_src=ptfadapter.dataplane.get_mac(0, 0), - ip_dst=self.get_dst_ip(setup, direction), - ip_src='20.0.0.1', + ip_dst=self.get_dst_ip(direction), + ip_src=DEFAULT_SRC_IP, tcp_sport=0x4321, tcp_dport=0x51, - ip_ttl=64, + ip_ttl=64 ) def udp_packet(self, setup, direction, ptfadapter): - """ create UDP packet for testing """ - + """Generate a UDP packet for testing.""" return testutils.simple_udp_packet( - eth_dst=setup['router_mac'], + eth_dst=setup["router_mac"], eth_src=ptfadapter.dataplane.get_mac(0, 0), - ip_dst=self.get_dst_ip(setup, direction), - ip_src='20.0.0.1', + ip_dst=self.get_dst_ip(direction), + ip_src=DEFAULT_SRC_IP, udp_sport=1234, udp_dport=80, - ip_ttl=64, + ip_ttl=64 ) def icmp_packet(self, setup, direction, ptfadapter): - """ create ICMP packet for testing """ - + """Generate an ICMP packet for testing.""" return testutils.simple_icmp_packet( - eth_dst=setup['router_mac'], + eth_dst=setup["router_mac"], eth_src=ptfadapter.dataplane.get_mac(0, 0), - ip_dst=self.get_dst_ip(setup, direction), - ip_src='20.0.0.1', + ip_dst=self.get_dst_ip(direction), + ip_src=DEFAULT_SRC_IP, icmp_type=8, icmp_code=0, ip_ttl=64, ) def expected_mask_routed_packet(self, pkt): - """ return mask for routed packet """ - + """Generate the expected mask for a routed packet.""" exp_pkt = pkt.copy() - exp_pkt['IP'].ttl -= 1 + exp_pkt["IP"].ttl -= 1 exp_pkt = mask.Mask(exp_pkt) - exp_pkt.set_do_not_care_scapy(packet.Ether, 'dst') - exp_pkt.set_do_not_care_scapy(packet.Ether, 'src') - exp_pkt.set_do_not_care_scapy(packet.IP, 'chksum') + exp_pkt.set_do_not_care_scapy(packet.Ether, "dst") + exp_pkt.set_do_not_care_scapy(packet.Ether, "src") + exp_pkt.set_do_not_care_scapy(packet.IP, "chksum") return exp_pkt def test_unmatched_blocked(self, setup, direction, ptfadapter): - """ verify that unmatched packet is dropped """ - + """Verify that unmatched packets are dropped.""" pkt = self.tcp_packet(setup, direction, ptfadapter) - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, True) def test_source_ip_match_forwarded(self, setup, direction, ptfadapter, counters_sanity_check): - """ test source IP matched packet is forwarded """ - + """Verify that we can match and forward a packet on source IP.""" pkt = self.tcp_packet(setup, direction, ptfadapter) - pkt['IP'].src = '20.0.0.2' - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_packet_any_port(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + pkt["IP"].src = "20.0.0.2" + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, False) counters_sanity_check.append(1) def test_rules_priority_forwarded(self, setup, direction, ptfadapter, counters_sanity_check): - """ test rules priorities, forward rule case """ - + """Verify that we respect rule priorites in the forwarding case.""" pkt = self.tcp_packet(setup, direction, ptfadapter) - pkt['IP'].src = '20.0.0.7' - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_packet_any_port(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + pkt["IP"].src = "20.0.0.7" + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, False) counters_sanity_check.append(20) def test_rules_priority_dropped(self, setup, direction, ptfadapter, counters_sanity_check): - """ test rules priorities, drop rule case """ - + """Verify that we respect rule priorites in the drop case.""" pkt = self.tcp_packet(setup, direction, ptfadapter) - pkt['IP'].src = '20.0.0.3' - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + pkt["IP"].src = "20.0.0.3" + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, True) counters_sanity_check.append(7) def test_dest_ip_match_forwarded(self, setup, direction, ptfadapter, counters_sanity_check): - """ test destination IP matched packet forwarded """ - + """Verify that we can match and forward a packet on destination IP.""" pkt = self.tcp_packet(setup, direction, ptfadapter) - pkt['IP'].dst = DST_IP_TOR_FORWARDED if direction == 'spine->tor' else DST_IP_SPINE_FORWARDED - exp_pkt = self.expected_mask_routed_packet(pkt) + pkt["IP"].dst = DOWNSTREAM_IP_TO_ALLOW if direction == "uplink->downlink" else UPSTREAM_IP_TO_ALLOW - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_packet_any_port(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) - - counters_sanity_check.append(2 if direction == 'spine->tor' else 3) + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, False) + counters_sanity_check.append(2 if direction == "uplink->downlink" else 3) def test_dest_ip_match_dropped(self, setup, direction, ptfadapter, counters_sanity_check): - """ test destination IP matched packet dropped """ - + """Verify that we can match and drop a packet on destination IP.""" pkt = self.tcp_packet(setup, direction, ptfadapter) - pkt['IP'].dst = DST_IP_TOR_BLOCKED if direction == 'spine->tor' else DST_IP_SPINE_BLOCKED - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + pkt["IP"].dst = DOWNSTREAM_IP_TO_BLOCK if direction == "uplink->downlink" else UPSTREAM_IP_TO_BLOCK - counters_sanity_check.append(15 if direction == 'spine->tor' else 16) + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, True) + counters_sanity_check.append(15 if direction == "uplink->downlink" else 16) def test_source_ip_match_dropped(self, setup, direction, ptfadapter, counters_sanity_check): - """ test source IP matched packet dropped """ - + """Verify that we can match and drop a packet on source IP.""" pkt = self.tcp_packet(setup, direction, ptfadapter) - pkt['IP'].src = '20.0.0.6' - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + pkt["IP"].src = "20.0.0.6" + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, True) counters_sanity_check.append(14) def test_udp_source_ip_match_forwarded(self, setup, direction, ptfadapter, counters_sanity_check): - """ test UDP source IP matched packet forwarded """ - + """Verify that we can match and forward a UDP packet on source IP.""" pkt = self.udp_packet(setup, direction, ptfadapter) - pkt['IP'].src = '20.0.0.4' - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_packet_any_port(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + pkt["IP"].src = "20.0.0.4" + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, False) counters_sanity_check.append(13) def test_udp_source_ip_match_dropped(self, setup, direction, ptfadapter, counters_sanity_check): - """ test UDP destination IP matched packet dropped """ - + """Verify that we can match and drop a UDP packet on source IP.""" pkt = self.udp_packet(setup, direction, ptfadapter) - pkt['IP'].src = '20.0.0.8' - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + pkt["IP"].src = "20.0.0.8" + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, True) counters_sanity_check.append(26) def test_icmp_source_ip_match_dropped(self, setup, direction, ptfadapter, counters_sanity_check): - """ test ICMP source IP matched packet dropped """ - + """Verify that we can match and drop an ICMP packet on source IP.""" pkt = self.icmp_packet(setup, direction, ptfadapter) - pkt['IP'].src = '20.0.0.8' - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + pkt["IP"].src = "20.0.0.8" + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, True) counters_sanity_check.append(25) def test_icmp_source_ip_match_forwarded(self, setup, direction, ptfadapter, counters_sanity_check): - """ test ICMP source IP matched packet forwarded """ - + """Verify that we can match and forward an ICMP packet on source IP.""" pkt = self.icmp_packet(setup, direction, ptfadapter) - pkt['IP'].src = '20.0.0.4' - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_packet_any_port(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + pkt["IP"].src = "20.0.0.4" + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, False) counters_sanity_check.append(12) def test_l4_dport_match_forwarded(self, setup, direction, ptfadapter, counters_sanity_check): - """ test L4 destination port matched packet forwarded """ - + """Verify that we can match and forward on L4 destination port.""" pkt = self.tcp_packet(setup, direction, ptfadapter) - pkt['TCP'].dport = 0x1217 - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_packet_any_port(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + pkt["TCP"].dport = 0x1217 + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, False) counters_sanity_check.append(5) def test_l4_sport_match_forwarded(self, setup, direction, ptfadapter, counters_sanity_check): - """ test L4 source port matched packet forwarded """ - + """Verify that we can match and forward on L4 source port.""" pkt = self.tcp_packet(setup, direction, ptfadapter) - pkt['TCP'].sport = 0x120D - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_packet_any_port(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + pkt["TCP"].sport = 0x120D + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, False) counters_sanity_check.append(4) def test_l4_dport_range_match_forwarded(self, setup, direction, ptfadapter, counters_sanity_check): - """ test L4 destination port range matched packet forwarded """ - + """Verify that we can match and forward on a range of L4 destination ports.""" pkt = self.tcp_packet(setup, direction, ptfadapter) - pkt['TCP'].dport = 0x123B - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_packet_any_port(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + pkt["TCP"].dport = 0x123B + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, False) counters_sanity_check.append(11) def test_l4_sport_range_match_forwarded(self, setup, direction, ptfadapter, counters_sanity_check): - """ test L4 source port range matched packet forwarded """ - + """Verify that we can match and forward on a range of L4 source ports.""" pkt = self.tcp_packet(setup, direction, ptfadapter) - pkt['TCP'].sport = 0x123A - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_packet_any_port(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + pkt["TCP"].sport = 0x123A + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, False) counters_sanity_check.append(10) def test_l4_dport_range_match_dropped(self, setup, direction, ptfadapter, counters_sanity_check): - """ test L4 destination port range matched packet dropped """ - + """Verify that we can match and drop on a range of L4 destination ports.""" pkt = self.tcp_packet(setup, direction, ptfadapter) - pkt['TCP'].dport = 0x127B - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + pkt["TCP"].dport = 0x127B + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, True) counters_sanity_check.append(22) def test_l4_sport_range_match_dropped(self, setup, direction, ptfadapter, counters_sanity_check): - """ test L4 source port range matched packet dropped """ - + """Verify that we can match and drop on a range of L4 source ports.""" pkt = self.tcp_packet(setup, direction, ptfadapter) - pkt['TCP'].sport = 0x1271 - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + pkt["TCP"].sport = 0x1271 + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, True) counters_sanity_check.append(17) def test_ip_proto_match_forwarded(self, setup, direction, ptfadapter, counters_sanity_check): - """ test IP protocol matched packet forwarded""" - + """Verify that we can match and forward on the IP protocol.""" pkt = self.tcp_packet(setup, direction, ptfadapter) - pkt['IP'].proto = 0x7E - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_packet_any_port(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + pkt["IP"].proto = 0x7E + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, False) counters_sanity_check.append(5) def test_tcp_flags_match_forwarded(self, setup, direction, ptfadapter, counters_sanity_check): - """ test TCP flags matched packet forwarded """ - + """Verify that we can match and forward on the TCP flags.""" pkt = self.tcp_packet(setup, direction, ptfadapter) - pkt['TCP'].flags = 0x1B - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_packet_any_port(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + pkt["TCP"].flags = 0x1B + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, False) counters_sanity_check.append(6) def test_l4_dport_match_dropped(self, setup, direction, ptfadapter, counters_sanity_check): - """ test L4 destination port matched packet dropped """ - + """Verify that we can match and drop on L4 destination port.""" pkt = self.tcp_packet(setup, direction, ptfadapter) - pkt['TCP'].dport = 0x127B - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + pkt["TCP"].dport = 0x127B + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, True) counters_sanity_check.append(22) def test_l4_sport_match_dropped(self, setup, direction, ptfadapter, counters_sanity_check): - """ test L4 source port matched packet dropped """ - + """Verify that we can match and drop on L4 source port.""" pkt = self.tcp_packet(setup, direction, ptfadapter) - pkt['TCP'].sport = 0x1271 - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + pkt["TCP"].sport = 0x1271 + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, True) counters_sanity_check.append(10) def test_ip_proto_match_dropped(self, setup, direction, ptfadapter, counters_sanity_check): - """ test IP protocol matched packet dropped """ - + """Verify that we can match and drop on the IP protocol.""" pkt = self.tcp_packet(setup, direction, ptfadapter) - pkt['IP'].proto = 0x7F - exp_pkt = self.expected_mask_routed_packet(pkt) - - ptfadapter.dataplane.flush() - testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + pkt["IP"].proto = 0x7F + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, True) counters_sanity_check.append(18) def test_tcp_flags_match_dropped(self, setup, direction, ptfadapter, counters_sanity_check): - """ test TCP flags matched packet dropped """ - + """Verify that we can match and drop on the TCP flags.""" pkt = self.tcp_packet(setup, direction, ptfadapter) - pkt['TCP'].flags = 0x24 + pkt["TCP"].flags = 0x24 + + self._verify_acl_traffic(setup, direction, ptfadapter, pkt, True) + counters_sanity_check.append(5) + + def _verify_acl_traffic(self, setup, direction, ptfadapter, pkt, dropped): exp_pkt = self.expected_mask_routed_packet(pkt) ptfadapter.dataplane.flush() testutils.send(ptfadapter, self.get_src_port(setup, direction), pkt) - testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) - counters_sanity_check.append(5) + if dropped: + testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) + else: + testutils.verify_packet_any_port(ptfadapter, exp_pkt, ports=self.get_dst_ports(setup, direction)) class TestBasicAcl(BaseAclTest): - """ - Basic ACL rules traffic tests. - Setup rules using full update, run traffic tests cases. - """ + """Test Basic functionality of ACL rules (i.e. setup with full update on a running device).""" + + def setup_rules(self, dut, acl_table): + """Setup ACL rules for testing. + + Args: + dut: The DUT having ACLs applied. + acl_table: Configuration info for the ACL table. - def setup_rules(self, dut, setup, acl_table): - """ - setup rules on DUT - :param dut: dut host - :param setup: setup information - :param acl_table: acl table creating fixture - :return: """ - name = acl_table['name'] - dut_conf_file_path = os.path.join(setup['dut_tmp_dir'], 'acl_rules_{}.json'.format(name)) + table_name = acl_table["table_name"] + dut_conf_file_path = os.path.join(DUT_TMP_DIR, "acl_rules_{}.json".format(table_name)) - logger.info('generating config for ACL rules, ACL table {}'.format(name)) + logger.info("Generating basic ACL rules config for ACL table \"{}\"".format(table_name)) dut.template(src=os.path.join(TEMPLATE_DIR, ACL_RULES_FULL_TEMPLATE), dest=dut_conf_file_path) - logger.info('applying {}'.format(dut_conf_file_path)) - dut.command('config acl update full {}'.format(dut_conf_file_path)) + logger.info("Applying ACL rules config \"{}\"".format(dut_conf_file_path)) + dut.command("config acl update full {}".format(dut_conf_file_path)) class TestIncrementalAcl(BaseAclTest): - """ - Incremental ACL rules configuration traffic tests. - Setup rules using incremental update in two parts, run traffic tests cases. + """Test ACL rule functionality with an incremental configuration. + + Verify that everything still works as expected when an ACL configuration is applied in + multiple parts. """ - def setup_rules(self, dut, setup, acl_table): - """ - setup rules on DUT for incremental test - :param dut: dut host - :param setup: setup information - :param acl_table: acl table creating fixture - :return: + def setup_rules(self, dut, acl_table): + """Setup ACL rules for testing. + + Args: + dut: The DUT having ACLs applied. + acl_table: Configuration info for the ACL table. + """ - name = acl_table['name'] - logger.info('generate incremental config for ACL rule ACL table {table_name}'.format(table_name=name)) - for i, conf in enumerate(ACL_RULES_PART_TEMPLATES): - dut_conf_file_path = os.path.join(setup['dut_tmp_dir'], 'acl_rules_{}_part_{}.json'.format(name, i)) - dut.template(src=os.path.join(TEMPLATE_DIR, conf), dest=dut_conf_file_path) - logger.info('applying {}'.format(dut_conf_file_path)) - dut.command('config acl update incremental {}'.format(dut_conf_file_path)) + table_name = acl_table["table_name"] + + logger.info("Generating incremental ACL rules config for ACL table \"{}\"" + .format(table_name)) + + for part, config_file in enumerate(ACL_RULES_PART_TEMPLATES): + dut_conf_file_path = os.path.join(DUT_TMP_DIR, "acl_rules_{}_part_{}.json".format(table_name, part)) + dut.template(src=os.path.join(TEMPLATE_DIR, config_file), dest=dut_conf_file_path) + + logger.info("Applying ACL rules config \"{}\"".format(dut_conf_file_path)) + dut.command("config acl update incremental {}".format(dut_conf_file_path)) @pytest.mark.reboot class TestAclWithReboot(TestBasicAcl): - """ - Basic ACL rules traffic tests with reboot. - Verify that the ACL configurations persist after reboot + """Test ACL rule functionality with a reboot. + + Verify that configuration persists correctly after reboot and is applied properly + upon startup. """ - def post_setup_hook(self, dut, localhost): - """ - save configuration and execute reboot after rules are applied - :param dut: dut host - :param localhost: localhost object - :return: + def post_setup_hook(self, dut, localhost, populate_vlan_arp_entries): + """Save configuration and reboot after rules are applied. + + Args: + dut: The DUT having ACLs applied. + localhost: The host from which tests are run. + populate_vlan_arp_entries: A fixture to populate ARP/FDB tables for VLAN interfaces. + """ - dut.command('config save -y') + dut.command("config save -y") reboot(dut, localhost) + populate_vlan_arp_entries() @pytest.mark.port_toggle class TestAclWithPortToggle(TestBasicAcl): - """ - Basic ACL rules traffic tests with port toggle. - Toggles ports before traffic tests. + """Test ACL rule functionality after toggling ports. + + Verify that ACLs still function as expected after links flap. """ - def post_setup_hook(self, dut, localhost): - """ - toggle ports after rules are applied - :param dut: dut host - :param localhost: localhost object - :return: + def post_setup_hook(self, dut, localhost, populate_vlan_arp_entries): + """Toggle ports after rules are applied. + + Args: + dut: The DUT having ACLs applied. + localhost: The host from which tests are run. + populate_vlan_arp_entries: A fixture to populate ARP/FDB tables for VLAN interfaces. + """ port_toggle(dut) + populate_vlan_arp_entries()