Skip to content
Merged
7 changes: 3 additions & 4 deletions tests/arp/test_tagged_arp.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import pprint

from tests.common.fixtures.ptfhost_utils import change_mac_addresses # lgtm[py/unused-import]
from tests.common.fixtures.duthost_utils import ports_list, vlan_ports_list
from tests.common.fixtures.duthost_utils import ports_list, utils_vlan_ports_list
from tests.common.helpers.assertions import pytest_require


Expand Down Expand Up @@ -80,9 +80,8 @@ def build_arp_packet(vlan_id, neighbor_mac, dst_mac, neighbor_ip):
ip_tgt=neighbor_ip)
return pkt


@pytest.mark.bsl
def test_tagged_arp_pkt(ptfadapter, vlan_ports_list, duthosts, rand_one_dut_hostname):
def test_tagged_arp_pkt(ptfadapter, utils_vlan_ports_list, duthosts, rand_one_dut_hostname):
"""
Send tagged GARP packets from each port.
Verify packets egress without tag from ports whose PVID same with ingress port.
Expand All @@ -91,7 +90,7 @@ def test_tagged_arp_pkt(ptfadapter, vlan_ports_list, duthosts, rand_one_dut_host
"""
duthost = duthosts[rand_one_dut_hostname]
router_mac = duthost.facts['router_mac']
for vlan_port in vlan_ports_list:
for vlan_port in utils_vlan_ports_list:
port_index = vlan_port["port_index"][0]
# Send GARP packets to switch to populate the arp table with dummy MACs for each port
# Totally 10 dummy MACs for each port, send 1 packet for each dummy MAC
Expand Down
115 changes: 114 additions & 1 deletion tests/common/fixtures/duthost_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import itertools
import collections
import ipaddress

from jinja2 import Template

Expand Down Expand Up @@ -159,7 +160,7 @@ def ports_list(duthosts, rand_one_dut_hostname, rand_selected_dut, tbinfo):


@pytest.fixture(scope="module")
def vlan_ports_list(duthosts, rand_one_dut_hostname, rand_selected_dut, tbinfo, ports_list):
def utils_vlan_ports_list(duthosts, rand_one_dut_hostname, rand_selected_dut, tbinfo, ports_list):
"""
Get configured VLAN ports
"""
Expand Down Expand Up @@ -225,3 +226,115 @@ def vlan_ports_list(duthosts, rand_one_dut_hostname, rand_selected_dut, tbinfo,
vlan_ports_list.append(vlan_port)

return vlan_ports_list

def compare_network(src_ipprefix, dst_ipprefix):
src_network = ipaddress.IPv4Interface(src_ipprefix).network
dst_network = ipaddress.IPv4Interface(dst_ipprefix).network
return src_network.overlaps(dst_network)

@pytest.fixture(scope="module")
def utils_vlan_intfs_dict_orig(duthosts, rand_one_dut_hostname, tbinfo):
Copy link
Contributor

@qiluo-msft qiluo-msft Nov 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

utils_vlan_intfs_dict_orig

It's not easy to understand the function just from the name. They are implemented to be reused. Could you add some function level comments, explaining the purpose, parameter, and return values. #Closed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also applicable to other new functions in this file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

'''A module level fixture to record duthost's original vlan info

Args:
duthosts: All DUTs belong to the testbed.
rand_one_dut_hostname: hostname of a random chosen dut to run test.
tbinfo: A fixture to gather information about the testbed.

Returns:
VLAN info dict with original VLAN info
Example:
{1000: {'ip':'192.168.0.1/21', 'orig': True}}
'''
duthost = duthosts[rand_one_dut_hostname]
cfg_facts = duthost.config_facts(host=duthost.hostname, source="persistent")['ansible_facts']
vlan_intfs_dict = {}
for k, v in cfg_facts['VLAN'].items():
vlanid = v['vlanid']
for addr in cfg_facts['VLAN_INTERFACE']['Vlan'+vlanid]:
if addr.find(':') == -1:
ip = addr
break
else:
continue
logger.info("Original VLAN {}, ip {}".format(vlanid, ip))
vlan_intfs_dict[int(vlanid)] = {'ip': ip, 'orig': True}
return vlan_intfs_dict

def utils_vlan_intfs_dict_add(vlan_intfs_dict, add_cnt):
'''Utilities function to add add_cnt of new VLAN

Args:
vlan_intfs_dict: Original VLAN info dict
add_cnt: number of new vlan to add

Returns:
VLAN info dict combined with original and new added VLAN info
Example:
{
1000: {'ip':'192.168.0.1/21', 'orig': True},
108: {'ip':'192.168.8.1/24', 'orig': False},
109: {'ip':'192.168.9.1/24', 'orig': False}
}
'''
vlan_cnt = 0
for i in xrange(0, 255):
vid = 100 + i
if vid in vlan_intfs_dict:
continue
ip = u'192.168.{}.1/24'.format(i)
for v in vlan_intfs_dict.values():
if compare_network(ip, v['ip']):
break
else:
logger.info("Add VLAN {}, ip {}".format(vid, ip))
vlan_intfs_dict[vid] = {'ip': ip, 'orig': False}
vlan_cnt += 1
if vlan_cnt >= add_cnt:
break
assert vlan_cnt == add_cnt
return vlan_intfs_dict

def utils_create_test_vlans(duthost, cfg_facts, vlan_ports_list, vlan_intfs_dict, delete_untagged_vlan):
'''Utilities function to create vlans for test

Args:
duthost: Device Under Test (DUT)
cfg_facts: config facts fot the duthost
vlan_ports_list: vlan ports info
vlan_intfs_dict: VLAN info dict with VLAN info
delete_untagged_vlan: check to delete unttaged vlan
'''
cmds = []
logger.info("Add vlans, assign IPs")
for k, v in vlan_intfs_dict.items():
if v['orig'] == True:
continue
cmds.append('config vlan add {}'.format(k))
cmds.append("config interface ip add Vlan{} {}".format(k, v['ip'].upper()))

# Delete untagged vlans from interfaces to avoid error message
# when adding untagged vlan to interface that already have one
if delete_untagged_vlan and '201911' not in duthost.os_version:
logger.info("Delete untagged vlans from interfaces")
for vlan_port in vlan_ports_list:
vlan_members = cfg_facts.get('VLAN_MEMBER', {})
vlan_name, vid = vlan_members.keys()[0], vlan_members.keys()[0].replace("Vlan", '')
try:
if vlan_members[vlan_name][vlan_port['dev']]['tagging_mode'] == 'untagged':
cmds.append("config vlan member del {} {}".format(vid, vlan_port['dev']))
except KeyError:
continue

logger.info("Add members to Vlans")
for vlan_port in vlan_ports_list:
for permit_vlanid in vlan_port['permit_vlanid']:
if vlan_intfs_dict[int(permit_vlanid)]['orig'] == True:
continue
cmds.append('config vlan member add {tagged} {id} {port}'.format(
tagged=('--untagged' if vlan_port['pvid'] == permit_vlanid else ''),
id=permit_vlanid,
port=vlan_port['dev']
))
logger.info("Commands: {}".format(cmds))
duthost.shell_cmds(cmds=cmds)
Empty file.
99 changes: 99 additions & 0 deletions tests/generic_config_updater/gu_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import json
import logging
from tests.common.helpers.assertions import pytest_assert
from tests.common.utilities import wait_until


logger = logging.getLogger(__name__)

def generate_tmpfile(duthost):
return duthost.shell('mktemp')['stdout']

def delete_tmpfile(duthost, tmpfile):
duthost.file(path=tmpfile, state='absent')

def apply_patch(duthost, json_data, dest_file):
duthost.copy(content=json.dumps(json_data, indent=4), dest=dest_file)

cmds = 'config apply-patch {}'.format(dest_file)

logger.info("Commands: {}".format(cmds))
output = duthost.shell(cmds, module_ignore_errors=True)

return output

def expect_op_success(duthost, output):
pytest_assert(not output['rc'], "Command is not running successfully")
pytest_assert(
"Patch applied successfully" in output['stdout'],
"Please check if json file is validate"
)

def expect_op_success_and_reset_check(duthost, output, container_name, threshold, interval, delay):
'''Add contianer reset check after op success
'''
expect_op_success(duthost, output)
if start_limit_hit(duthost, container_name):
reset_start_limit_hit(duthost, container_name, threshold, interval, delay)

def expect_res_success(duthost, output, expected_content_list, unexpected_content_list):
for expected_content in expected_content_list:
pytest_assert(
expected_content in output['stdout'],
"{} is expected content".format(expected_content)
)

for unexpected_content in unexpected_content_list:
pytest_assert(
unexpected_content not in output['stdout'],
"{} is unexpected content".format(unexpected_content)
)

def expect_op_failure(output):
logger.info("return code {}".format(output['rc']))
pytest_assert(
output['rc'],
"The command should fail with non zero return code"
)

def start_limit_hit(duthost, container_name):
"""If start-limit-hit is hit, the service will not start anyway.
"""
service_status = duthost.shell("sudo systemctl status {}.service | grep 'Active'".format(container_name))
pytest_assert(
not service_status['rc'],
"{} service status cannot be found".format(container_name)
)

for line in service_status["stdout_lines"]:
if "start-limit-hit" in line:
return True

return False

def reset_start_limit_hit(duthost, container_name, threshold, interval, delay):
"""Reset container if hit start-limit-hit
"""
logger.info("Reset container '{}' due to start-limit-hit".format(container_name))

service_reset_failed = duthost.shell("sudo systemctl reset-failed {}.service".format(container_name))
pytest_assert(
not service_reset_failed['rc'],
"{} systemctl reset-failed service fails"
)

service_start = duthost.shell("sudo systemctl start {}.service".format(container_name))
pytest_assert(
not service_start['rc'],
"{} systemctl start service fails"
)

reset_container = wait_until(threshold,
interval,
delay,
duthost.is_service_fully_started,
container_name)
pytest_assert(
reset_container,
"Failed to reset container '{}' due to start-limit-hit".format(container_name)
)
Loading