Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 77 additions & 72 deletions tests/cacl/test_cacl_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
from tests.common.helpers.assertions import pytest_assert
from tests.common.helpers.snmp_helpers import get_snmp_facts
from tests.common.utilities import get_data_acl, recover_acl_rule

try:
import ntplib
Expand All @@ -19,10 +20,11 @@
SONIC_SSH_REGEX = 'OpenSSH_[\\w\\.]+ Debian'


def test_cacl_function(duthosts, enum_rand_one_per_hwsku_hostname, localhost, creds, recover_acl_rule):
def test_cacl_function(duthosts, enum_rand_one_per_hwsku_hostname, localhost, creds):
"""Test control plane ACL functionality on a SONiC device"""

duthost = duthosts[enum_rand_one_per_hwsku_hostname]
data_acl = get_data_acl(duthost)
dut_mgmt_ip = duthost.mgmt_ip

# Start an NTP client
Expand All @@ -46,76 +48,79 @@ def test_cacl_function(duthosts, enum_rand_one_per_hwsku_hostname, localhost, cr
ntp_client.request(dut_mgmt_ip)
except ntplib.NTPException:
pytest.fail("NTP did timed out when expected to succeed!")

# Copy config_service_acls.sh to the DuT (this also implicitly verifies we can successfully SSH to the DuT)
duthost.copy(src="scripts/config_service_acls.sh", dest="/tmp/config_service_acls.sh", mode="0755")

# We run the config_service_acls.sh script in the background because it
# will install ACL rules which will only allow control plane traffic
# to an unused IP range. Thus, if it works properly, it will sever our
# SSH session, but we don't want the script itself to get killed,
# because it is also responsible for resetting the control plane ACLs
# back to their previous, working state
duthost.shell("nohup /tmp/config_service_acls.sh < /dev/null > /dev/null 2>&1 &")

# Wait until we are unable to SSH into the DuT
res = localhost.wait_for(host=dut_mgmt_ip,
port=SONIC_SSH_PORT,
state='stopped',
search_regex=SONIC_SSH_REGEX,
delay=30,
timeout=40,
module_ignore_errors=True)

pytest_assert(not res.is_failed, "SSH port did not stop. {}".format(res.get('msg', '')))

# Try to SSH back into the DuT, it should time out
res = localhost.wait_for(host=dut_mgmt_ip,
port=SONIC_SSH_PORT,
state='started',
search_regex=SONIC_SSH_REGEX,
delay=0,
timeout=10,
module_ignore_errors=True)

pytest_assert(res.is_failed, "SSH did not timeout when expected. {}".format(res.get('msg', '')))

# Ensure we CANNOT gather basic SNMP facts from the device
res = get_snmp_facts(localhost, host=dut_mgmt_ip, version='v2c', community=creds['snmp_rocommunity'],
module_ignore_errors=True)

pytest_assert('ansible_facts' not in res and "No SNMP response received before timeout" in res.get('msg', ''))

# Ensure we cannot send an NTP request to the DUT
if NTPLIB_INSTALLED:
try:
ntp_client.request(dut_mgmt_ip)
pytest.fail("NTP did not time out when expected")
except ntplib.NTPException:
pass

# Wait until the original service ACLs are reinstated and the SSH port on the
# DUT is open to us once again. Note that the timeout here should be set sufficiently
# long enough to allow config_service_acls.sh to reset the ACLs to their original
# configuration.
res = localhost.wait_for(host=dut_mgmt_ip,
port=SONIC_SSH_PORT,
state='started',
search_regex=SONIC_SSH_REGEX,
delay=0,
timeout=90,
try:
# Copy config_service_acls.sh to the DuT (this also implicitly verifies we can successfully SSH to the DuT)
duthost.copy(src="scripts/config_service_acls.sh", dest="/tmp/config_service_acls.sh", mode="0755")

# We run the config_service_acls.sh script in the background because it
# will install ACL rules which will only allow control plane traffic
# to an unused IP range. Thus, if it works properly, it will sever our
# SSH session, but we don't want the script itself to get killed,
# because it is also responsible for resetting the control plane ACLs
# back to their previous, working state
duthost.shell("nohup /tmp/config_service_acls.sh < /dev/null > /dev/null 2>&1 &")

# Wait until we are unable to SSH into the DuT
res = localhost.wait_for(host=dut_mgmt_ip,
port=SONIC_SSH_PORT,
state='stopped',
search_regex=SONIC_SSH_REGEX,
delay=30,
timeout=40,
module_ignore_errors=True)

pytest_assert(not res.is_failed, "SSH port did not stop. {}".format(res.get('msg', '')))

# Try to SSH back into the DuT, it should time out
res = localhost.wait_for(host=dut_mgmt_ip,
port=SONIC_SSH_PORT,
state='started',
search_regex=SONIC_SSH_REGEX,
delay=0,
timeout=10,
module_ignore_errors=True)

pytest_assert(res.is_failed, "SSH did not timeout when expected. {}".format(res.get('msg', '')))

# Ensure we CANNOT gather basic SNMP facts from the device
res = get_snmp_facts(localhost, host=dut_mgmt_ip, version='v2c', community=creds['snmp_rocommunity'],
module_ignore_errors=True)

pytest_assert(not res.is_failed, "SSH did not start working when expected. {}".format(res.get('msg', '')))

# Delete config_service_acls.sh from the DuT
duthost.file(path="/tmp/config_service_acls.sh", state="absent")

# Ensure we can gather basic SNMP facts from the device once again. Should fail on timeout
get_snmp_facts(localhost,
host=dut_mgmt_ip,
version="v2c",
community=creds['snmp_rocommunity'],
wait=True,
timeout=120,
interval=20)
pytest_assert('ansible_facts' not in res and "No SNMP response received before timeout" in res.get('msg', ''))

# Ensure we cannot send an NTP request to the DUT
if NTPLIB_INSTALLED:
try:
ntp_client.request(dut_mgmt_ip)
pytest.fail("NTP did not time out when expected")
except ntplib.NTPException:
pass

# Wait until the original service ACLs are reinstated and the SSH port on the
# DUT is open to us once again. Note that the timeout here should be set sufficiently
# long enough to allow config_service_acls.sh to reset the ACLs to their original
# configuration.
res = localhost.wait_for(host=dut_mgmt_ip,
port=SONIC_SSH_PORT,
state='started',
search_regex=SONIC_SSH_REGEX,
delay=0,
timeout=90,
module_ignore_errors=True)

pytest_assert(not res.is_failed, "SSH did not start working when expected. {}".format(res.get('msg', '')))

# Delete config_service_acls.sh from the DuT
duthost.file(path="/tmp/config_service_acls.sh", state="absent")

# Ensure we can gather basic SNMP facts from the device once again. Should fail on timeout
get_snmp_facts(localhost,
host=dut_mgmt_ip,
version="v2c",
community=creds['snmp_rocommunity'],
wait=True,
timeout=120,
interval=20)
finally:
if data_acl:
recover_acl_rule(duthost, data_acl)
39 changes: 39 additions & 0 deletions tests/common/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import threading
import time
import traceback
import copy
import tempfile
from io import StringIO
from ast import literal_eval

Expand Down Expand Up @@ -938,6 +940,43 @@ def delete_running_config(config_entry, duthost, is_json=True):
duthost.shell("rm -f {}".format("/tmp/del_config_entry.json"))


def get_data_acl(duthost):
acl_facts = duthost.acl_facts()["ansible_facts"]["ansible_acl_facts"]
pre_acl_rules = acl_facts.get("DATAACL", {}).get("rules", None)
return pre_acl_rules


def recover_acl_rule(duthost, data_acl):
base_dir = os.path.dirname(os.path.realpath(__file__))
template_dir = os.path.join(base_dir, "templates")
acl_rules_template = "default_acl_rules.json"
dut_tmp_dir = "/tmp"
dut_conf_file_path = os.path.join(dut_tmp_dir, acl_rules_template)

for key, value in data_acl.items():
if key != "DEFAULT_RULE":
seq_id = key.split('_')[1]
acl_config = json.loads(open(os.path.join(template_dir, acl_rules_template)).read())
acl_entry_template = \
acl_config["acl"]["acl-sets"]["acl-set"]["dataacl"]["acl-entries"]["acl-entry"]["1"]
acl_entry_config = acl_config["acl"]["acl-sets"]["acl-set"]["dataacl"]["acl-entries"]["acl-entry"]

acl_entry_config[seq_id] = copy.deepcopy(acl_entry_template)
acl_entry_config[seq_id]["config"]["sequence-id"] = seq_id
acl_entry_config[seq_id]["l2"]["config"]["ethertype"] = value["ETHER_TYPE"]
acl_entry_config[seq_id]["l2"]["config"]["vlan_id"] = value["VLAN_ID"]
acl_entry_config[seq_id]["input_interface"]["interface_ref"]["config"]["interface"] = value["IN_PORTS"]

with tempfile.NamedTemporaryFile(suffix=".json", prefix="acl_config", mode="w") as fp:
json.dump(acl_config, fp)
fp.flush()
logger.info("Generating config for ACL rule, ACL table - DATAACL")
duthost.template(src=fp.name, dest=dut_conf_file_path, force=True)

logger.info("Applying {}".format(dut_conf_file_path))
duthost.command("acl-loader update full {}".format(dut_conf_file_path))


def get_ipv4_loopback_ip(duthost):
"""
Get ipv4 loopback ip address
Expand Down
41 changes: 0 additions & 41 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import getpass
import random
import re
import tempfile

import pytest
import yaml
Expand Down Expand Up @@ -2260,43 +2259,3 @@ def format_failure(port, failure):
# HACK: We are using set_do_not_care_scapy but it will be deprecated.
if not hasattr(Mask, "set_do_not_care_scapy"):
Mask.set_do_not_care_scapy = Mask.set_do_not_care_packet


@pytest.fixture(scope="module")
def recover_acl_rule(duthosts, enum_rand_one_per_hwsku_hostname):
duthost = duthosts[enum_rand_one_per_hwsku_hostname]

base_dir = os.path.dirname(os.path.realpath(__file__))
template_dir = os.path.join(base_dir, "common/templates")
acl_rules_template = "default_acl_rules.json"

dut_tmp_dir = "/tmp"
dut_conf_file_path = os.path.join(dut_tmp_dir, acl_rules_template)

pre_acl_rules = duthost.acl_facts()["ansible_facts"]["ansible_acl_facts"]["DATAACL"]["rules"]

yield

if pre_acl_rules:
for key, value in pre_acl_rules.items():
if key != "DEFAULT_RULE":
seq_id = key.split('_')[1]
acl_config = json.loads(open(os.path.join(template_dir, acl_rules_template)).read())
acl_entry_template = \
acl_config["acl"]["acl-sets"]["acl-set"]["dataacl"]["acl-entries"]["acl-entry"]["1"]
acl_entry_config = acl_config["acl"]["acl-sets"]["acl-set"]["dataacl"]["acl-entries"]["acl-entry"]

acl_entry_config[seq_id] = copy.deepcopy(acl_entry_template)
acl_entry_config[seq_id]["config"]["sequence-id"] = seq_id
acl_entry_config[seq_id]["l2"]["config"]["ethertype"] = value["ETHER_TYPE"]
acl_entry_config[seq_id]["l2"]["config"]["vlan_id"] = value["VLAN_ID"]
acl_entry_config[seq_id]["input_interface"]["interface_ref"]["config"]["interface"] = value["IN_PORTS"]

with tempfile.NamedTemporaryFile(suffix=".json", prefix="acl_config", mode="w") as fp:
json.dump(acl_config, fp)
fp.flush()
logger.info("Generating config for ACL rule, ACL table - DATAACL")
duthost.template(src=fp.name, dest=dut_conf_file_path, force=True)

logger.info("Applying {}".format(dut_conf_file_path))
duthost.command("acl-loader update full {}".format(dut_conf_file_path))
6 changes: 5 additions & 1 deletion tests/crm/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from test_crm import RESTORE_CMDS
from tests.common.helpers.crm import CRM_POLLING_INTERVAL
from tests.common.errors import RunAnsibleModuleFail
from tests.common.utilities import recover_acl_rule

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -53,7 +54,10 @@ def pytest_runtest_teardown(item, nextitem):
for cmd in RESTORE_CMDS[test_name]:
logger.info(cmd)
try:
dut.shell(cmd)
if isinstance(cmd, dict):
recover_acl_rule(dut, cmd["data_acl"])
else:
dut.shell(cmd)
except RunAnsibleModuleFail as err:
failures.append("Failure during command execution '{command}':\n{error}"
.format(command=cmd, error=str(err)))
Expand Down
56 changes: 30 additions & 26 deletions tests/crm/test_crm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
from tests.common.plugins.loganalyzer.loganalyzer import LogAnalyzer
from tests.common.helpers.assertions import pytest_assert
from tests.common.helpers.crm import get_used_percent, CRM_UPDATE_TIME, CRM_POLLING_INTERVAL, EXPECT_EXCEEDED, \
EXPECT_CLEAR, THR_VERIFY_CMDS
EXPECT_CLEAR, THR_VERIFY_CMDS
from tests.common.fixtures.duthost_utils import disable_route_checker # noqa F401
from tests.common.fixtures.duthost_utils import disable_fdb_aging # noqa F401
from tests.common.utilities import wait_until
from tests.common.utilities import wait_until, get_data_acl


pytestmark = [
Expand Down Expand Up @@ -861,36 +861,40 @@ def recreate_acl_table(duthost, ports):


def test_acl_entry(duthosts, enum_rand_one_per_hwsku_frontend_hostname, enum_frontend_asic_index,
collector, tbinfo, recover_acl_rule):
collector, tbinfo):
duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname]
data_acl = get_data_acl(duthost)
asichost = duthost.asic_instance(enum_frontend_asic_index)
asic_collector = collector[asichost.asic_index]

if duthost.facts["asic_type"] == "marvell":
# Remove DATA ACL Table and add it again with ports in same port group
mg_facts = duthost.get_extended_minigraph_facts(tbinfo)
tmp_ports = sorted(mg_facts["minigraph_ports"], key=lambda x: int(x[8:]))
for i in range(4):
if i == 0:
ports = ",".join(tmp_ports[17:19])
elif i == 1:
ports = ",".join(tmp_ports[24:26])
elif i == 2:
ports = ",".join([tmp_ports[20], tmp_ports[25]])
recreate_acl_table(duthost, ports)
try:
if duthost.facts["asic_type"] == "marvell":
# Remove DATA ACL Table and add it again with ports in same port group
mg_facts = duthost.get_extended_minigraph_facts(tbinfo)
tmp_ports = sorted(mg_facts["minigraph_ports"], key=lambda x: int(x[8:]))
for i in range(4):
if i == 0:
ports = ",".join(tmp_ports[17:19])
elif i == 1:
ports = ",".join(tmp_ports[24:26])
elif i == 2:
ports = ",".join([tmp_ports[20], tmp_ports[25]])
recreate_acl_table(duthost, ports)
verify_acl_crm_stats(duthost, asichost, enum_rand_one_per_hwsku_frontend_hostname,
enum_frontend_asic_index, asic_collector, tbinfo)
# Rebind DATA ACL at end to recover original config
recreate_acl_table(duthost, ports)
apply_acl_config(duthost, asichost, "test_acl_entry", asic_collector)
duthost.command("acl-loader delete")
else:
verify_acl_crm_stats(duthost, asichost, enum_rand_one_per_hwsku_frontend_hostname,
enum_frontend_asic_index, asic_collector, tbinfo)
# Rebind DATA ACL at end to recover original config
recreate_acl_table(duthost, ports)
apply_acl_config(duthost, asichost, "test_acl_entry", asic_collector)
duthost.command("acl-loader delete")
else:
verify_acl_crm_stats(duthost, asichost, enum_rand_one_per_hwsku_frontend_hostname,
enum_frontend_asic_index, asic_collector, tbinfo)

pytest_assert(crm_stats_checker,
"\"crm_stats_acl_entry_used\" counter was not decremented or "
"\"crm_stats_acl_entry_available\" counter was not incremented")
pytest_assert(crm_stats_checker,
"\"crm_stats_acl_entry_used\" counter was not decremented or "
"\"crm_stats_acl_entry_available\" counter was not incremented")
finally:
if data_acl:
RESTORE_CMDS["test_acl_entry"].append({"data_acl": data_acl})


def verify_acl_crm_stats(duthost, asichost, enum_rand_one_per_hwsku_frontend_hostname,
Expand Down