Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 51 additions & 9 deletions tests/cacl/test_cacl_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

from tests.common.config_reload import config_reload
from tests.common.utilities import wait_until

from tests.common.dualtor.mux_simulator_control import toggle_all_simulator_ports_to_upper_tor # lgtm[py/unused-import]
from tests.common.dualtor.dual_tor_utils import upper_tor_host, lower_tor_host # lgtm[py/unused-import]
from tests.common.helpers.assertions import pytest_assert

logger = logging.getLogger(__name__)
Expand All @@ -30,6 +31,34 @@ def ignore_hardcoded_cacl_rule_on_dualtor(tbinfo):
]
ignored_iptable_rules += rules_to_ignore

@pytest.fixture(scope="function", params=["active_tor", "standby_tor"])
def duthost_dualtor(request, upper_tor_host, lower_tor_host, toggle_all_simulator_ports_to_upper_tor):
which_tor = request.param

# Add expected DHCP mark iptable rules for standby tor, not for active tor.
if which_tor == 'standby_tor':
dut = lower_tor_host
logger.info("Select standby tor, generate expected DHCP iptables rules for standby tor.")
else:
logger.info("Select active tor, don't need to add expected DHCP mark rules.")
dut = upper_tor_host
return dut

@pytest.fixture
def expected_dhcp_rules_for_standby(duthost_dualtor, lower_tor_host):
if duthost_dualtor.hostname == lower_tor_host.hostname:
expected_dhcp_rules = []
mark_keys = duthost_dualtor.shell('/usr/bin/redis-cli -n 6 --raw keys "DHCP_PACKET_MARK*"', module_ignore_errors=True)['stdout']
mark_keys = mark_keys.split("\n")
for key in mark_keys:
mark = duthost_dualtor.shell('/usr/bin/redis-cli -n 6 --raw hget "{}" "mark"'.format(key), module_ignore_errors=False)['stdout']
if not mark:
continue
rule = "-A DHCP -m mark --mark {} -j DROP".format(mark)
expected_dhcp_rules.append(rule)
return expected_dhcp_rules
else:
return

@pytest.fixture(scope="module")
def docker_network(duthost):
Expand Down Expand Up @@ -299,7 +328,7 @@ def generate_and_append_block_ip2me_traffic_rules(duthost, iptables_rules, ip6ta
pytest.fail("Unrecognized IP address type on interface '{}': {}".format(iface_name, ip_ntwrk))


def generate_expected_rules(duthost, docker_network, asic_index):
def generate_expected_rules(duthost, docker_network, asic_index, expected_dhcp_rules_for_standby):
iptables_rules = []
ip6tables_rules = []

Expand Down Expand Up @@ -364,6 +393,10 @@ def generate_expected_rules(duthost, docker_network, asic_index):
iptables_rules.append("-A INPUT -p udp -m udp --dport 546:547 -j ACCEPT")
ip6tables_rules.append("-A INPUT -p udp -m udp --dport 546:547 -j ACCEPT")

# On standby tor, it has expected dhcp mark iptables rules.
if expected_dhcp_rules_for_standby:
iptables_rules.extend(expected_dhcp_rules_for_standby)

# Allow all incoming BGP traffic
iptables_rules.append("-A INPUT -p tcp -m tcp --dport 179 -j ACCEPT")
ip6tables_rules.append("-A INPUT -p tcp -m tcp --dport 179 -j ACCEPT")
Expand Down Expand Up @@ -690,8 +723,8 @@ def generate_scale_rules(duthost, ip_type):
# add ACCEPT rule for SSH to make sure testbed access
duthost.command("iptables -I INPUT 3 -p tcp -m tcp --dport 22 -j ACCEPT")

def verify_cacl(duthost, localhost, creds, docker_network, asic_index = None):
expected_iptables_rules, expected_ip6tables_rules = generate_expected_rules(duthost, docker_network, asic_index)
def verify_cacl(duthost, localhost, creds, docker_network, expected_dhcp_rules_for_standby, asic_index = None):
expected_iptables_rules, expected_ip6tables_rules = generate_expected_rules(duthost, docker_network, asic_index, expected_dhcp_rules_for_standby)

stdout = duthost.get_asic_or_sonic_host(asic_index).command("iptables -S")["stdout"]
actual_iptables_rules = stdout.strip().split("\n")
Expand Down Expand Up @@ -758,7 +791,7 @@ def verify_nat_cacl(duthost, localhost, creds, docker_network, asic_index):
unexpected_ip6tables_rules = set(actual_ip6tables_rules) - set(expected_ip6tables_rules)
pytest_assert(len(unexpected_ip6tables_rules) == 0, "Unexpected ip6tables nat rules: {}".format(repr(unexpected_ip6tables_rules)))

def test_cacl_application(duthosts, rand_one_dut_hostname, localhost, creds, docker_network):
def test_cacl_application_nondualtor(duthosts, rand_one_dut_hostname, localhost, creds, docker_network):
"""
Test case to ensure caclmgrd is applying control plane ACLs properly

Expand All @@ -767,13 +800,22 @@ def test_cacl_application(duthosts, rand_one_dut_hostname, localhost, creds, doc
actual iptables/ip6tables rules on the DuT.
"""
duthost = duthosts[rand_one_dut_hostname]
verify_cacl(duthost, localhost, creds, docker_network)
verify_cacl(duthost, localhost, creds, docker_network, None)

def test_multiasic_cacl_application(duthosts, rand_one_dut_hostname, localhost, creds,docker_network, enum_frontend_asic_index):
def test_cacl_application_dualtor(duthost_dualtor, localhost, creds, docker_network, expected_dhcp_rules_for_standby):
"""
Test case to ensure caclmgrd is applying control plane ACLs properly on dualtor.

if enum_frontend_asic_index is None:
pytest.skip("Not Multi-asic platform. Skipping !!")
This is done by generating our own set of expected iptables and ip6tables
rules based on the DuT's configuration and comparing them against the
actual iptables/ip6tables rules on the DuT.
"""
verify_cacl(duthost_dualtor, localhost, creds, docker_network, expected_dhcp_rules_for_standby)

def test_multiasic_cacl_application(duthosts, rand_one_dut_hostname, localhost, creds,docker_network, enum_frontend_asic_index):
"""
Test case to ensure caclmgrd is applying control plane ACLs properly on multi-ASIC platform.
"""
duthost = duthosts[rand_one_dut_hostname]
verify_cacl(duthost, localhost, creds, docker_network, enum_frontend_asic_index)
verify_nat_cacl(duthost, localhost, creds, docker_network, enum_frontend_asic_index)
Expand Down
1 change: 1 addition & 0 deletions tests/common/dualtor/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from tests.common.dualtor.dual_tor_utils import *
from tests.common.dualtor.mux_simulator_control import *
from tests.common.dualtor.nic_simulator_control import *
from tests.common.dualtor.dual_tor_common import *
3 changes: 3 additions & 0 deletions tests/common/dualtor/dual_tor_common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""DualToR related common utilities for other modules."""
import pytest

__all__ = [
'cable_type'
]

class CableType(object):
"""Dualtor cable type."""
Expand Down
21 changes: 21 additions & 0 deletions tests/common/plugins/conditional_mark/tests_mark_conditions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,27 @@ bgp/test_bgp_slb.py:
conditions:
- "'backend' in topo_name or 'mgmttor' in topo_name"

#######################################
##### cacl #####
#######################################
cacl/test_cacl_application.py::test_cacl_application_nondualtor:
skip:
reason: "test_cacl_application is only supported on non dualtor topology"
conditions:
- "topo_name in ['dualtor', 'dualtor-56', 'dualtor-120']"

cacl/test_cacl_application.py::test_cacl_application_dualtor:
skip:
reason: "test_cacl_application_dualtor is only supported on dualtor topology"
conditions:
- "topo_name not in ['dualtor', 'dualtor-56', 'dualtor-120']"

cacl/test_cacl_application.py::test_multiasic_cacl_application:
skip:
reason: "test_multiasic_cacl_application is only supported on multi-ASIC platform"
conditions:
- "is_multi_asic==False"

#######################################
##### configlet #####
#######################################
Expand Down