diff --git a/tests/acl/test_acl.py b/tests/acl/test_acl.py index 07e4c41241d..6b02cc61b09 100644 --- a/tests/acl/test_acl.py +++ b/tests/acl/test_acl.py @@ -15,6 +15,7 @@ from tests.common import reboot, port_toggle from tests.common.helpers.assertions import pytest_require, pytest_assert +from tests.common.helpers.multi_thread_utils import SafeThreadPoolExecutor from tests.common.plugins.loganalyzer.loganalyzer import LogAnalyzer, LogAnalyzerError from tests.common.config_reload import config_reload from tests.common.fixtures.ptfhost_utils import copy_arp_responder_py, run_garp_service, change_mac_addresses # noqa F401 @@ -148,25 +149,31 @@ def remove_dataacl_table(duthosts): The change is written to configdb as we don't want DATAACL recovered after reboot """ TABLE_NAME = "DATAACL" - for duthost in duthosts: - lines = duthost.shell(cmd="show acl table {}".format(TABLE_NAME))['stdout_lines'] - data_acl_existing = False - for line in lines: - if TABLE_NAME in line: - data_acl_existing = True - break - if data_acl_existing: - # Remove DATAACL - logger.info("Removing ACL table {}".format(TABLE_NAME)) - cmds = [ - "config acl remove table {}".format(TABLE_NAME), - "config save -y" - ] - duthost.shell_cmds(cmds=cmds) + with SafeThreadPoolExecutor(max_workers=8) as executor: + for duthost in duthosts: + executor.submit(remove_dataacl_table_single_dut, TABLE_NAME, duthost) yield - # Recover DUT by reloading minigraph - for duthost in duthosts: - config_reload(duthost, config_source="minigraph") + with SafeThreadPoolExecutor(max_workers=8) as executor: + # Recover DUT by reloading minigraph + for duthost in duthosts: + executor.submit(config_reload, duthost, config_source="minigraph", safe_reload=True) + + +def remove_dataacl_table_single_dut(table_name, duthost): + lines = duthost.shell(cmd="show acl table {}".format(table_name))['stdout_lines'] + data_acl_existing = False + for line in lines: + if table_name in line: + data_acl_existing = True + break + if 
data_acl_existing: + # Remove DATAACL + logger.info("{} Removing ACL table {}".format(duthost.hostname, table_name)) + cmds = [ + "config acl remove table {}".format(table_name), + "config save -y" + ] + duthost.shell_cmds(cmds=cmds) def get_t2_info(duthosts, tbinfo): @@ -427,7 +434,6 @@ def populate_vlan_arp_entries(setup, ptfhost, duthosts, rand_one_dut_hostname, i global DOWNSTREAM_IP_PORT_MAP # For m0 topo, need to refresh this constant for two different scenario DOWNSTREAM_IP_PORT_MAP = {} - duthost = duthosts[rand_one_dut_hostname] if setup["topo"] not in ["t0", "mx", "m0_vlan"]: def noop(): pass @@ -466,7 +472,8 @@ def populate_arp_table(): dut.command("sonic-clear arp") dut.command("sonic-clear ndp") # Wait some time to ensure the async call of clear is completed - time.sleep(20) + time.sleep(20) + for dut in duthosts: for addr in addr_list: dut.command("ping {} -c 3".format(addr), module_ignore_errors=True) @@ -477,9 +484,10 @@ def populate_arp_table(): logging.info("Stopping ARP responder") ptfhost.shell("supervisorctl stop arp_responder", module_ignore_errors=True) - duthost.command("sonic-clear fdb all") - duthost.command("sonic-clear arp") - duthost.command("sonic-clear ndp") + for dut in duthosts: + dut.command("sonic-clear fdb all") + dut.command("sonic-clear arp") + dut.command("sonic-clear ndp") @pytest.fixture(scope="module", params=["ingress", "egress"]) @@ -565,34 +573,42 @@ def acl_table(duthosts, rand_one_dut_hostname, setup, stage, ip_version, tbinfo, dut_to_analyzer_map = {} - for duthost in duthosts: - if duthost.is_supervisor_node(): - continue - loganalyzer = LogAnalyzer(ansible_host=duthost, marker_prefix="acl") - loganalyzer.load_common_config() - dut_to_analyzer_map[duthost] = loganalyzer - - try: - loganalyzer.expect_regex = [LOG_EXPECT_ACL_TABLE_CREATE_RE] - # Ignore any other errors to reduce noise - loganalyzer.ignore_regex = [r".*"] - with loganalyzer: - create_or_remove_acl_table(duthost, acl_table_config, setup, "add", topo) - 
wait_until(300, 20, 0, check_msg_in_syslog, - duthost, LOG_EXPECT_ACL_TABLE_CREATE_RE) - except LogAnalyzerError as err: - # Cleanup Config DB if table creation failed - logger.error("ACL table creation failed, attempting to clean-up...") - create_or_remove_acl_table(duthost, acl_table_config, setup, "remove", topo) - raise err - + with SafeThreadPoolExecutor(max_workers=8) as executor: + for duthost in duthosts: + executor.submit(set_up_acl_table_single_dut, acl_table_config, dut_to_analyzer_map, duthost, setup, topo) try: yield acl_table_config finally: - for duthost, loganalyzer in list(dut_to_analyzer_map.items()): - loganalyzer.expect_regex = [LOG_EXPECT_ACL_TABLE_REMOVE_RE] - with loganalyzer: - create_or_remove_acl_table(duthost, acl_table_config, setup, "remove", topo) + with SafeThreadPoolExecutor(max_workers=8) as executor: + for duthost, loganalyzer in list(dut_to_analyzer_map.items()): + executor.submit(tear_down_acl_table_single_dut, acl_table_config, duthost, loganalyzer, setup, topo) + + +def tear_down_acl_table_single_dut(acl_table_config, duthost, loganalyzer, setup, topo): + loganalyzer.expect_regex = [LOG_EXPECT_ACL_TABLE_REMOVE_RE] + with loganalyzer: + create_or_remove_acl_table(duthost, acl_table_config, setup, "remove", topo) + + +def set_up_acl_table_single_dut(acl_table_config, dut_to_analyzer_map, duthost, setup, topo): + if duthost.is_supervisor_node(): + return + loganalyzer = LogAnalyzer(ansible_host=duthost, marker_prefix="acl") + loganalyzer.load_common_config() + dut_to_analyzer_map[duthost] = loganalyzer + try: + loganalyzer.expect_regex = [LOG_EXPECT_ACL_TABLE_CREATE_RE] + # Ignore any other errors to reduce noise + loganalyzer.ignore_regex = [r".*"] + with loganalyzer: + create_or_remove_acl_table(duthost, acl_table_config, setup, "add", topo) + wait_until(300, 20, 0, check_msg_in_syslog, + duthost, LOG_EXPECT_ACL_TABLE_CREATE_RE) + except LogAnalyzerError as err: + # Cleanup Config DB if table creation failed + logger.error("ACL 
table creation failed, attempting to clean-up...") + create_or_remove_acl_table(duthost, acl_table_config, setup, "remove", topo) + raise err class BaseAclTest(six.with_metaclass(ABCMeta, object)): @@ -660,43 +676,60 @@ def acl_rules(self, duthosts, localhost, setup, acl_table, populate_vlan_arp_ent """ dut_to_analyzer_map = {} - for duthost in duthosts: - if duthost.is_supervisor_node(): - continue - loganalyzer = LogAnalyzer(ansible_host=duthost, marker_prefix="acl_rules") - loganalyzer.load_common_config() - dut_to_analyzer_map[duthost] = loganalyzer - - try: - loganalyzer.expect_regex = [LOG_EXPECT_ACL_RULE_CREATE_RE] - # Ignore any other errors to reduce noise - loganalyzer.ignore_regex = [r".*"] - with loganalyzer: - self.setup_rules(duthost, acl_table, ip_version) - # Give the dut some time for the ACL rules to be applied and LOG message generated - wait_until(300, 20, 0, check_msg_in_syslog, - duthost, LOG_EXPECT_ACL_RULE_CREATE_RE) - - self.post_setup_hook(duthost, localhost, populate_vlan_arp_entries, tbinfo, conn_graph_facts) - - assert self.check_rule_counters(duthost), "Rule counters should be ready!" 
- - except LogAnalyzerError as err: - # Cleanup Config DB if rule creation failed - logger.error("ACL rule application failed, attempting to clean-up...") - self.teardown_rules(duthost) - raise err + + with SafeThreadPoolExecutor(max_workers=8) as executor: + for duthost in duthosts: + executor.submit(self.set_up_acl_rules_single_dut, acl_table, conn_graph_facts, + dut_to_analyzer_map, duthost, ip_version, localhost, + populate_vlan_arp_entries, tbinfo) + logger.info("Set up acl_rules finished") try: yield finally: - for duthost, loganalyzer in list(dut_to_analyzer_map.items()): - if duthost.is_supervisor_node(): - continue - loganalyzer.expect_regex = [LOG_EXPECT_ACL_RULE_REMOVE_RE] - with loganalyzer: - logger.info("Removing ACL rules") - self.teardown_rules(duthost) + with SafeThreadPoolExecutor(max_workers=8) as executor: + for duthost, loganalyzer in list(dut_to_analyzer_map.items()): + executor.submit(self.tear_down_acl_rule_single_dut, duthost, loganalyzer) + logger.info("Tear down acl_rules finished") + + def tear_down_acl_rule_single_dut(self, duthost, loganalyzer): + if duthost.is_supervisor_node(): + return + loganalyzer.expect_regex = [LOG_EXPECT_ACL_RULE_REMOVE_RE] + with loganalyzer: + logger.info("Removing ACL rules") + self.teardown_rules(duthost) + + def set_up_acl_rules_single_dut(self, acl_table, + conn_graph_facts, dut_to_analyzer_map, duthost, # noqa F811 + ip_version, localhost, + populate_vlan_arp_entries, tbinfo): + logger.info("{}: ACL rule application started".format(duthost.hostname)) + if duthost.is_supervisor_node(): + return + loganalyzer = LogAnalyzer(ansible_host=duthost, marker_prefix="acl_rules") + loganalyzer.load_common_config() + dut_to_analyzer_map[duthost] = loganalyzer + try: + loganalyzer.expect_regex = [LOG_EXPECT_ACL_RULE_CREATE_RE] + # Ignore any other errors to reduce noise + loganalyzer.ignore_regex = [r".*"] + with loganalyzer: + self.setup_rules(duthost, acl_table, ip_version) + # Give the dut some time for the ACL 
rules to be applied and LOG message generated + wait_until(300, 20, 0, check_msg_in_syslog, + duthost, LOG_EXPECT_ACL_RULE_CREATE_RE) + + self.post_setup_hook(duthost, localhost, populate_vlan_arp_entries, tbinfo, conn_graph_facts) + + assert self.check_rule_counters(duthost), "Rule counters should be ready!" + + except LogAnalyzerError as err: + # Cleanup Config DB if rule creation failed + logger.error("ACL rule application failed, attempting to clean-up...") + self.teardown_rules(duthost) + raise err + logger.info("{}: ACL rule application finished".format(duthost.hostname)) @pytest.yield_fixture(scope="class", autouse=True) def counters_sanity_check(self, duthosts, acl_rules, acl_table): diff --git a/tests/common/config_reload.py b/tests/common/config_reload.py index 4634ea1598a..1fc41582c2a 100644 --- a/tests/common/config_reload.py +++ b/tests/common/config_reload.py @@ -203,9 +203,10 @@ def _config_reload_cmd_wrapper(cmd, executable): wait_critical_processes(sonic_host) # PFCWD feature does not enable on some topology, for example M0 if config_source == 'minigraph' and pfcwd_feature_enabled(sonic_host): - pytest_assert(wait_until(wait + 300, 20, 0, chk_for_pfc_wd, sonic_host), - "PFC_WD is missing in CONFIG-DB") - + # Supervisor node doesn't have PFC_WD + if not sonic_host.is_supervisor_node(): + pytest_assert(wait_until(wait + 300, 20, 0, chk_for_pfc_wd, sonic_host), + "PFC_WD is missing in CONFIG-DB") if check_intf_up_ports: pytest_assert(wait_until(wait + 300, 20, 0, check_interface_status_of_up_ports, sonic_host), "Not all ports that are admin up on are operationally up") diff --git a/tests/common/helpers/multi_thread_utils.py b/tests/common/helpers/multi_thread_utils.py new file mode 100644 index 00000000000..09c7ca7cab6 --- /dev/null +++ b/tests/common/helpers/multi_thread_utils.py @@ -0,0 +1,27 @@ +from concurrent.futures import Future, as_completed +from concurrent.futures.thread import ThreadPoolExecutor +from typing import Optional, List + + +class 
SafeThreadPoolExecutor(ThreadPoolExecutor): + """An enhanced thread pool executor + + Every time we submit a task, it will store the future in self.features + On the __exit__ function, it will wait for all the tasks to finish, + and check for any exceptions that are raised during task execution + """ + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.features: Optional[List[Future]] = [] + + def submit(self, __fn, *args, **kwargs): + f = super().submit(__fn, *args, **kwargs) + self.features.append(f) + return f + + def __exit__(self, exc_type, exc_val, exc_tb): + for future in as_completed(self.features): + # if exception caught in the sub-thread, .result() will raise it in the main thread + _ = future.result() + self.shutdown(wait=True) + return False diff --git a/tests/common/port_toggle.py b/tests/common/port_toggle.py index 25557a04ba2..890288394b7 100644 --- a/tests/common/port_toggle.py +++ b/tests/common/port_toggle.py @@ -80,7 +80,7 @@ def __get_down_ports(expect_up=True): if not startup_ok: down_ports = __get_down_ports() - startup_err_msg = "Some ports did not come up as expected: {}".format(str(down_ports)) + startup_err_msg = "{}: Some ports did not come up as expected: {}".format(duthost.hostname, str(down_ports)) except Exception as e: startup_err_msg = "Startup ports failed with exception: {}".format(repr(e)) diff --git a/tests/common/reboot.py b/tests/common/reboot.py index 1bfa8b04528..cf28aebc028 100644 --- a/tests/common/reboot.py +++ b/tests/common/reboot.py @@ -285,12 +285,12 @@ def reboot(duthost, localhost, reboot_type='cold', delay=10, # minutes to the maximum wait time. If it's ready sooner, then the # function will return sooner. 
pytest_assert(wait_until(wait + 400, 20, 0, duthost.critical_services_fully_started), - "All critical services should be fully started!") + "{}: All critical services should be fully started!".format(hostname)) wait_critical_processes(duthost) if check_intf_up_ports: pytest_assert(wait_until(wait + 300, 20, 0, check_interface_status_of_up_ports, duthost), - "Not all ports that are admin up on are operationally up") + "{}: Not all ports that are admin up on are operationally up".format(hostname)) else: time.sleep(wait)