|
| 1 | +import collections |
| 2 | +import json |
| 3 | +import logging |
| 4 | +import os |
| 5 | +import sys |
| 6 | +from ipaddress import ip_address, IPv4Address |
| 7 | + |
| 8 | +import natsort |
| 9 | +import pytest |
| 10 | + |
| 11 | +if sys.version_info.major > 2: |
| 12 | + from pathlib import Path |
| 13 | + sys.path.insert(0, str(Path(__file__).parent)) |
| 14 | + |
| 15 | +from .macsec_config_helper import enable_macsec_feature |
| 16 | +from .macsec_config_helper import disable_macsec_feature |
| 17 | +from .macsec_config_helper import setup_macsec_configuration |
| 18 | +from .macsec_config_helper import cleanup_macsec_configuration |
| 19 | +# flake8: noqa: F401 |
| 20 | +from tests.common.plugins.sanity_check import sanity_check |
| 21 | + |
| 22 | +logger = logging.getLogger(__name__) |
| 23 | + |
| 24 | + |
| 25 | +class MacsecPlugin(object): |
| 26 | + """ |
| 27 | + Pytest macsec plugin |
| 28 | + """ |
| 29 | + |
| 30 | + def __init__(self): |
| 31 | + with open(os.path.dirname(__file__) + '/profile.json') as f: |
| 32 | + self.macsec_profiles = json.load(f) |
| 33 | + for k, v in list(self.macsec_profiles.items()): |
| 34 | + self.macsec_profiles[k]["name"] = k |
| 35 | + # Set default value |
| 36 | + if "rekey_period" not in v: |
| 37 | + self.macsec_profiles[k]["rekey_period"] = 0 |
| 38 | + |
| 39 | + def _generate_macsec_profile(self, metafunc): |
| 40 | + value = metafunc.config.getoption("macsec_profile") |
| 41 | + if value == 'all': |
| 42 | + return natsort.natsorted(list(self.macsec_profiles.keys())) |
| 43 | + return [x for x in value.split(',') if x in self.macsec_profiles] |
| 44 | + |
| 45 | + def pytest_generate_tests(self, metafunc): |
| 46 | + if 'macsec_profile' in metafunc.fixturenames: |
| 47 | + profiles = self._generate_macsec_profile(metafunc) |
| 48 | + assert profiles, "Specify valid macsec profile!" |
| 49 | + metafunc.parametrize('macsec_profile', |
| 50 | + [self.macsec_profiles[x] for x in profiles], |
| 51 | + ids=profiles, |
| 52 | + scope="module") |
| 53 | + |
| 54 | + def get_ctrl_nbr_names(self, macsec_duthost, nbrhosts, tbinfo): |
| 55 | + return NotImplementedError() |
| 56 | + |
| 57 | + def downstream_neighbor(self,tbinfo, neighbor): |
| 58 | + return NotImplementedError() |
| 59 | + |
| 60 | + def upstream_neighbor(self,tbinfo, neighbor): |
| 61 | + return NotImplementedError() |
| 62 | + |
| 63 | + @pytest.fixture(scope="module") |
| 64 | + def start_macsec_service(self, macsec_duthost, macsec_nbrhosts): |
| 65 | + def __start_macsec_service(): |
| 66 | + enable_macsec_feature(macsec_duthost, macsec_nbrhosts) |
| 67 | + return __start_macsec_service |
| 68 | + |
| 69 | + @pytest.fixture(scope="module") |
| 70 | + def stop_macsec_service(self, macsec_duthost, macsec_nbrhosts): |
| 71 | + def __stop_macsec_service(): |
| 72 | + disable_macsec_feature(macsec_duthost, macsec_nbrhosts) |
| 73 | + return __stop_macsec_service |
| 74 | + |
| 75 | + @pytest.fixture(scope="module") |
| 76 | + def macsec_feature(self, start_macsec_service, stop_macsec_service): |
| 77 | + start_macsec_service() |
| 78 | + yield |
| 79 | + stop_macsec_service() |
| 80 | + |
| 81 | + @pytest.fixture(scope="module") |
| 82 | + def startup_macsec(self, request, macsec_duthost, ctrl_links, macsec_profile, tbinfo): |
| 83 | + topo_name = tbinfo['topo']['name'] |
| 84 | + def __startup_macsec(): |
| 85 | + profile = macsec_profile |
| 86 | + if request.config.getoption("neighbor_type") == "eos": |
| 87 | + if macsec_duthost.facts["asic_type"] == "vs" and profile['send_sci'] == "false": |
| 88 | + # On EOS, portchannel mac is not same as the member port mac (being as SCI), |
| 89 | + # then src mac is not equal to SCI in its sending packet. The receiver of vSONIC |
| 90 | + # will drop it for macsec kernel module does not correctly handle it. |
| 91 | + pytest.skip( |
| 92 | + "macsec on dut vsonic, neighbor eos, send_sci false") |
| 93 | + if 't2' not in topo_name: |
| 94 | + cleanup_macsec_configuration(macsec_duthost, ctrl_links, profile['name']) |
| 95 | + setup_macsec_configuration(macsec_duthost, ctrl_links, |
| 96 | + profile['name'], profile['priority'], profile['cipher_suite'], |
| 97 | + profile['primary_cak'], profile['primary_ckn'], profile['policy'], |
| 98 | + profile['send_sci'], profile['rekey_period']) |
| 99 | + logger.info( |
| 100 | + "Setup MACsec configuration with arguments:\n{}".format(locals())) |
| 101 | + return __startup_macsec |
| 102 | + |
| 103 | + @pytest.fixture(scope="module") |
| 104 | + def shutdown_macsec(self, macsec_duthost, ctrl_links, macsec_profile): |
| 105 | + def __shutdown_macsec(): |
| 106 | + profile = macsec_profile |
| 107 | + cleanup_macsec_configuration(macsec_duthost, ctrl_links, profile['name']) |
| 108 | + return __shutdown_macsec |
| 109 | + |
| 110 | + @pytest.fixture(scope="module", autouse=True) |
| 111 | + def macsec_setup(self, startup_macsec, shutdown_macsec, macsec_feature): |
| 112 | + ''' |
| 113 | + setup macsec links |
| 114 | + ''' |
| 115 | + startup_macsec() |
| 116 | + yield |
| 117 | + shutdown_macsec() |
| 118 | + |
| 119 | + @pytest.fixture(scope="module") |
| 120 | + def macsec_nbrhosts(self, ctrl_links): |
| 121 | + return {nbr["name"]: nbr for nbr in list(ctrl_links.values())} |
| 122 | + |
| 123 | + @pytest.fixture(scope="module") |
| 124 | + def ctrl_links(self, macsec_duthost, tbinfo, nbrhosts): |
| 125 | + |
| 126 | + if not nbrhosts: |
| 127 | + topo_name = tbinfo['topo']['name'] |
| 128 | + pytest.skip("None of neighbors on topology {}".format(topo_name)) |
| 129 | + |
| 130 | + ctrl_nbr_names = self.get_ctrl_nbr_names(macsec_duthost, nbrhosts, tbinfo) |
| 131 | + logger.info("Controlled links {}".format(ctrl_nbr_names)) |
| 132 | + nbrhosts = {name: nbrhosts[name] for name in ctrl_nbr_names} |
| 133 | + return self.find_links_from_nbr(macsec_duthost, tbinfo, nbrhosts) |
| 134 | + |
| 135 | + @pytest.fixture(scope="module") |
| 136 | + def unctrl_links(self, macsec_duthost, tbinfo, nbrhosts, ctrl_links): |
| 137 | + unctrl_nbr_names = set(nbrhosts.keys()) |
| 138 | + for _, nbr in ctrl_links.items(): |
| 139 | + if nbr["name"] in unctrl_nbr_names: |
| 140 | + unctrl_nbr_names.remove(nbr["name"]) |
| 141 | + |
| 142 | + logger.info("Uncontrolled links {}".format(unctrl_nbr_names)) |
| 143 | + nbrhosts = {name: nbrhosts[name] for name in unctrl_nbr_names} |
| 144 | + return self.find_links_from_nbr(macsec_duthost, tbinfo, nbrhosts) |
| 145 | + |
| 146 | + @pytest.fixture(scope="module") |
| 147 | + def downstream_links(self, macsec_duthost, tbinfo, nbrhosts): |
| 148 | + links = collections.defaultdict(dict) |
| 149 | + |
| 150 | + def filter(interface, neighbor, mg_facts, tbinfo): |
| 151 | + if self.downstream_neighbor(tbinfo, neighbor): |
| 152 | + port = mg_facts["minigraph_neighbors"][interface]["port"] |
| 153 | + if interface not in mg_facts["minigraph_ptf_indices"]: |
| 154 | + logger.info("Interface {} not in minigraph_ptf_indices".format(interface)) |
| 155 | + return |
| 156 | + links[interface] = { |
| 157 | + "name": neighbor["name"], |
| 158 | + "ptf_port_id": mg_facts["minigraph_ptf_indices"][interface], |
| 159 | + "port": port |
| 160 | + } |
| 161 | + self.find_links(macsec_duthost, tbinfo, filter) |
| 162 | + return links |
| 163 | + |
| 164 | + @pytest.fixture(scope="module") |
| 165 | + def upstream_links(self, macsec_duthost, tbinfo, nbrhosts): |
| 166 | + links = collections.defaultdict(dict) |
| 167 | + |
| 168 | + def filter(interface, neighbor, mg_facts, tbinfo): |
| 169 | + if self.upstream_neighbor(tbinfo, neighbor): |
| 170 | + for item in mg_facts["minigraph_bgp"]: |
| 171 | + if item["name"] == neighbor["name"]: |
| 172 | + if isinstance(ip_address(item["addr"]), IPv4Address): |
| 173 | + # The address of neighbor device |
| 174 | + local_ipv4_addr = item["addr"] |
| 175 | + # The address of DUT |
| 176 | + peer_ipv4_addr = item["peer_addr"] |
| 177 | + break |
| 178 | + if interface not in mg_facts["minigraph_ptf_indices"]: |
| 179 | + logger.info("Interface {} not in minigraph_ptf_indices".format(interface)) |
| 180 | + return |
| 181 | + port = mg_facts["minigraph_neighbors"][interface]["port"] |
| 182 | + links[interface] = { |
| 183 | + "name": neighbor["name"], |
| 184 | + "ptf_port_id": mg_facts["minigraph_ptf_indices"][interface], |
| 185 | + "local_ipv4_addr": local_ipv4_addr, |
| 186 | + "peer_ipv4_addr": peer_ipv4_addr, |
| 187 | + "port": port, |
| 188 | + "host": nbrhosts[neighbor["name"]]["host"] |
| 189 | + } |
| 190 | + self.find_links(macsec_duthost, tbinfo, filter) |
| 191 | + return links |
| 192 | + |
| 193 | + def find_links(self, duthost, tbinfo, filter): |
| 194 | + |
| 195 | + mg_facts = duthost.get_extended_minigraph_facts(tbinfo) |
| 196 | + for interface, neighbor in mg_facts["minigraph_neighbors"].items(): |
| 197 | + filter(interface, neighbor, mg_facts, tbinfo) |
| 198 | + |
| 199 | + def is_interface_portchannel_member(self, pc, interface): |
| 200 | + for pc_name, elements in list(pc.items()): |
| 201 | + if interface in elements['members']: |
| 202 | + return True |
| 203 | + return False |
| 204 | + |
| 205 | + def find_links_from_nbr(self, duthost, tbinfo, nbrhosts): |
| 206 | + links = collections.defaultdict(dict) |
| 207 | + def filter(interface, neighbor, mg_facts, tbinfo): |
| 208 | + if neighbor["name"] not in list(nbrhosts.keys()): |
| 209 | + return |
| 210 | + port = mg_facts["minigraph_neighbors"][interface]["port"] |
| 211 | + |
| 212 | + links[interface] = { |
| 213 | + "name": neighbor["name"], |
| 214 | + "host": nbrhosts[neighbor["name"]]["host"], |
| 215 | + "port": port, |
| 216 | + "dut_name": duthost.hostname |
| 217 | + } |
| 218 | + self.find_links(duthost, tbinfo, filter) |
| 219 | + return links |
| 220 | + |
| 221 | +class MacsecPluginT0(MacsecPlugin): |
| 222 | + """ |
| 223 | + Pytest macsec plugin |
| 224 | + """ |
| 225 | + |
| 226 | + |
| 227 | + def __init__(self): |
| 228 | + super(MacsecPluginT0, self).__init__() |
| 229 | + |
| 230 | + def get_ctrl_nbr_names(self, macsec_duthost, nbrhosts, tbinfo): |
| 231 | + ctrl_nbr_names = natsort.natsorted(nbrhosts.keys())[:2] |
| 232 | + return ctrl_nbr_names |
| 233 | + |
| 234 | + def downstream_neighbor(self,tbinfo, neighbor): |
| 235 | + if (tbinfo["topo"]["type"] == "t0" and "Server" in neighbor["name"]): |
| 236 | + return True |
| 237 | + return False |
| 238 | + |
| 239 | + def upstream_neighbor(self,tbinfo, neighbor): |
| 240 | + if (tbinfo["topo"]["type"] == "t0" and "T1" in neighbor["name"]): |
| 241 | + return True |
| 242 | + return False |
| 243 | + |
| 244 | +class MacsecPluginT2(MacsecPlugin): |
| 245 | + """ |
| 246 | + Pytest macsec plugin |
| 247 | + """ |
| 248 | + |
| 249 | + |
| 250 | + def __init__(self): |
| 251 | + super(MacsecPluginT2, self).__init__() |
| 252 | + |
| 253 | + def get_ctrl_nbr_names(self, macsec_duthost, nbrhosts, tbinfo): |
| 254 | + mg_facts = macsec_duthost.get_extended_minigraph_facts(tbinfo) |
| 255 | + ctrl_nbr_names = mg_facts['macsec_neighbors'] |
| 256 | + return ctrl_nbr_names |
| 257 | + |
| 258 | + def downstream_neighbor(self,tbinfo, neighbor): |
| 259 | + if ("t2" in tbinfo["topo"]["type"] and "T1" in neighbor["name"]): |
| 260 | + return True |
| 261 | + return False |
| 262 | + |
| 263 | + def upstream_neighbor(self,tbinfo, neighbor): |
| 264 | + if ("t2" in tbinfo["topo"]["type"] and "T3" in neighbor["name"]): |
| 265 | + return True |
| 266 | + return False |
0 commit comments