Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 32 additions & 19 deletions ansible/roles/test/files/ptftests/py3/dhcpv6_counter_test.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import os
import ast
import time
import subprocess
import scapy
# Packet Test Framework imports
import ptf
import ptf.packet
import ptf.testutils as testutils
from ptf import config
from ptf.base_tests import BaseTest
Expand All @@ -15,14 +18,15 @@
DHCP6_Rebind = scapy.layers.dhcp6.DHCP6_Rebind
DHCP6_Release = scapy.layers.dhcp6.DHCP6_Release
DHCP6_Decline = scapy.layers.dhcp6.DHCP6_Decline
DHCP6_Reconf= scapy.layers.dhcp6.DHCP6_Reconf
DHCP6_Reconf = scapy.layers.dhcp6.DHCP6_Reconf
DHCP6_InfoRequest = scapy.layers.dhcp6.DHCP6_InfoRequest
DHCP6_Advertise = scapy.layers.dhcp6.DHCP6_Advertise
DHCP6_Reply = scapy.layers.dhcp6.DHCP6_Reply
DHCP6_RelayReply = scapy.layers.dhcp6.DHCP6_RelayReply
DHCP6OptRelayMsg = scapy.layers.dhcp6.DHCP6OptRelayMsg
DHCP6OptAuth = scapy.layers.dhcp6.DHCP6OptAuth


class DataplaneBaseTest(BaseTest):
def __init__(self):
BaseTest.__init__(self)
Expand All @@ -38,14 +42,17 @@ def tearDown(self):
if config["log_dir"] is not None:
self.dataplane.stop_pcap()


"""

This test tests for DHCPv6 Counter. Packets are sent from both the client and server side, and packets are verified to be received by the counter.
This test tests for DHCPv6 Counter.
Packets are sent from both the client and server side, and packets are verified to be received by the counter.

"""


class DHCPCounterTest(DataplaneBaseTest):

BROADCAST_MAC = '33:33:00:01:00:02'
BROADCAST_IP = 'ff02::1:2'
DHCP_CLIENT_PORT = 546
Expand All @@ -54,8 +61,8 @@ class DHCPCounterTest(DataplaneBaseTest):
def __init__(self):
self.test_params = testutils.test_params_get()
self.client_port_index = int(self.test_params['client_port_index'])
self.client_link_local = self.generate_client_interace_ipv6_link_local_address(self.client_port_index)
self.client_link_local = self.generate_client_interace_ipv6_link_local_address(self.client_port_index)

DataplaneBaseTest.__init__(self)

def setUp(self):
Expand All @@ -65,11 +72,12 @@ def setUp(self):
self.server_port_indices = ast.literal_eval(self.test_params['leaf_port_indices'])
self.num_dhcp_servers = int(self.test_params['num_dhcp_servers'])
self.assertTrue(self.num_dhcp_servers > 0,
"Error: This test requires at least one DHCP server to be specified!")
"Error: This test requires at least one DHCP server to be specified!")

self.server_ip = self.test_params['server_ip']
self.relay_iface_ip = self.test_params['relay_iface_ip']
self.relay_iface_mac = self.test_params['relay_iface_mac']
self.dut_mac = self.test_params['dut_mac']
self.vlan_ip = self.test_params['vlan_ip']
self.client_mac = self.dataplane.get_mac(0, self.client_port_index)

Expand All @@ -83,7 +91,8 @@ def generate_client_interace_ipv6_link_local_address(self, client_port_index):
proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
proc.communicate()

command = "ip addr show eth{} | grep inet6 | grep 'scope link' | awk '{{print $2}}' | cut -d '/' -f1".format(client_port_index)
command = "ip addr show eth{} | grep inet6 | grep 'scope link' | awk '{{print $2}}' | cut -d '/' -f1"\
.format(client_port_index)
proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
stdout, stderr = proc.communicate()

Expand All @@ -92,41 +101,40 @@ def generate_client_interace_ipv6_link_local_address(self, client_port_index):
def tearDown(self):
DataplaneBaseTest.tearDown(self)


"""
Packet generation functions/wrappers

"""

def create_packet(self, message):
packet = Ether(src=self.client_mac, dst=self.BROADCAST_MAC)
packet = ptf.packet.Ether(src=self.client_mac, dst=self.BROADCAST_MAC)
packet /= IPv6(src=self.client_link_local, dst=self.BROADCAST_IP)
packet /= UDP(sport=self.DHCP_CLIENT_PORT, dport=self.DHCP_SERVER_PORT)
packet /= ptf.packet.UDP(sport=self.DHCP_CLIENT_PORT, dport=self.DHCP_SERVER_PORT)
packet /= message(trid=12345)

return packet

def create_malformed_client_packet(self, message):
packet = Ether(src=self.client_mac, dst=self.BROADCAST_MAC)
packet = ptf.packet.Ether(src=self.client_mac, dst=self.BROADCAST_MAC)
packet /= IPv6(src=self.client_link_local, dst=self.BROADCAST_IP)
packet /= UDP(sport=self.DHCP_CLIENT_PORT, dport=self.DHCP_SERVER_PORT)
packet /= message(trid=12345)/DHCP6OptAuth(optcode=100) # changes optcode to be out of client scope to test malformed counters

packet /= ptf.packet.UDP(sport=self.DHCP_CLIENT_PORT, dport=self.DHCP_SERVER_PORT)
# changes optcode to be out of client scope to test malformed counters
packet /= message(trid=12345)/DHCP6OptAuth(optcode=100)
return packet

def create_server_packet(self, message):
packet = Ether(dst=self.relay_iface_mac)
packet = ptf.packet.Ether(dst=self.dut_mac)
packet /= IPv6(src=self.server_ip, dst=self.relay_iface_ip)
packet /= UDP(sport=self.DHCP_SERVER_PORT, dport=self.DHCP_SERVER_PORT)
packet /= ptf.packet.UDP(sport=self.DHCP_SERVER_PORT, dport=self.DHCP_SERVER_PORT)
packet /= DHCP6_RelayReply(msgtype=13, linkaddr=self.vlan_ip, peeraddr=self.client_link_local)
packet /= DHCP6OptRelayMsg(message=[message(trid=12345)])

return packet

def create_unknown_server_packet(self):
packet = Ether(dst=self.relay_iface_mac)
packet = ptf.packet.Ether(dst=self.dut_mac)
packet /= IPv6(src=self.server_ip, dst=self.relay_iface_ip)
packet /= UDP(sport=self.DHCP_SERVER_PORT, dport=self.DHCP_SERVER_PORT)
packet /= ptf.packet.UDP(sport=self.DHCP_SERVER_PORT, dport=self.DHCP_SERVER_PORT)
packet /= DHCP6_RelayReply(msgtype=13, linkaddr=self.vlan_ip, peeraddr=self.client_link_local)

return packet
Expand All @@ -137,10 +145,14 @@ def create_unknown_server_packet(self):
"""

def client_send(self):
client_messages = [DHCP6_Solicit, DHCP6_Request, DHCP6_Confirm, DHCP6_Renew, DHCP6_Rebind, DHCP6_Release, DHCP6_Decline, DHCP6_Reconf, DHCP6_InfoRequest]
client_messages = [DHCP6_Solicit, DHCP6_Request, DHCP6_Confirm, DHCP6_Renew,
DHCP6_Rebind, DHCP6_Release, DHCP6_Decline, DHCP6_InfoRequest]
for message in client_messages:
packet = self.create_packet(message)
testutils.send_packet(self, self.client_port_index, packet)
# sleep a short time to low down packet sending rate in case multicast packets
# flooding cause packets drop on dhcpv6 relay filter raw socket
time.sleep(1)

malformed_packet = self.create_malformed_client_packet(DHCP6_Solicit)
testutils.send_packet(self, self.client_port_index, malformed_packet)
Expand All @@ -151,6 +163,7 @@ def server_send(self):
packet = self.create_server_packet(message)
packet.src = self.dataplane.get_mac(0, self.server_port_indices[0])
testutils.send_packet(self, self.server_port_indices[0], packet)
time.sleep(1)

unknown_packet = self.create_unknown_server_packet()
unknown_packet.src = self.dataplane.get_mac(0, self.server_port_indices[0])
Expand Down
70 changes: 49 additions & 21 deletions tests/dhcp_relay/test_dhcpv6_relay.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import ipaddress
import pytest
import random
import time
import netaddr
import logging

from tests.common.fixtures.ptfhost_utils import copy_ptftests_directory # lgtm[py/unused-import]
from tests.common.fixtures.ptfhost_utils import change_mac_addresses # lgtm[py/unused-import]
from tests.common.fixtures.ptfhost_utils import copy_ptftests_directory # noqa F401
from tests.common.fixtures.ptfhost_utils import change_mac_addresses # noqa F401
from tests.common.utilities import skip_release
from tests.ptf_runner import ptf_runner
from tests.common import config_reload
from tests.common.platform.processes_utils import wait_critical_processes
from tests.common.utilities import wait_until
from tests.common.helpers.assertions import pytest_assert
from tests.common.dualtor.mux_simulator_control import toggle_all_simulator_ports_to_rand_selected_tor_m # noqa F401


pytestmark = [
pytest.mark.topology('t0', 'm0', 'mx'),
Expand All @@ -24,6 +25,14 @@

logger = logging.getLogger(__name__)


def wait_all_bgp_up(duthost):
config_facts = duthost.config_facts(host=duthost.hostname, source="running")['ansible_facts']
bgp_neighbors = config_facts.get('BGP_NEIGHBOR', {})
if not wait_until(60, 10, 0, duthost.check_bgp_session_state, bgp_neighbors.keys()):
pytest.fail("not all bgp sessions are up after config change")


@pytest.fixture(scope="module", params=[SINGLE_TOR_MODE, DUAL_TOR_MODE])
def testing_config(request, duthosts, rand_one_dut_hostname, tbinfo):
testing_mode = request.param
Expand All @@ -37,11 +46,11 @@ def testing_config(request, duthosts, rand_one_dut_hostname, tbinfo):
if testing_mode == DUAL_TOR_MODE:
if not subtype_exist or subtype_value != 'DualToR':
assert False, "Wrong DHCP setup on Dual ToR testbeds"

yield testing_mode, duthost, 'dual_testbed'
else:
yield testing_mode, duthost, 'single_testbed'


def get_subtype_from_configdb(duthost):
# HEXISTS returns 1 if the key exists, otherwise 0
subtype_exist = int(duthost.shell('redis-cli -n 4 HEXISTS "DEVICE_METADATA|localhost" "subtype"')["stdout"])
Expand All @@ -50,6 +59,7 @@ def get_subtype_from_configdb(duthost):
subtype_value = duthost.shell('redis-cli -n 4 HGET "DEVICE_METADATA|localhost" "subtype"')["stdout"]
return subtype_exist, subtype_value


@pytest.fixture(scope="module")
def dut_dhcp_relay_data(duthosts, rand_one_dut_hostname, ptfhost, tbinfo):
""" Fixture which returns a list of dictionaries where each dictionary contains
Expand All @@ -76,7 +86,8 @@ def dut_dhcp_relay_data(duthosts, rand_one_dut_hostname, ptfhost, tbinfo):
downlink_vlan_iface['name'] = vlan_iface_name

for vlan_interface_info_dict in mg_facts['minigraph_vlan_interfaces']:
if (vlan_interface_info_dict['attachto'] == vlan_iface_name) and (netaddr.IPAddress(str(vlan_interface_info_dict['addr'])).version == 6):
if (vlan_interface_info_dict['attachto'] == vlan_iface_name) and \
(netaddr.IPAddress(str(vlan_interface_info_dict['addr'])).version == 6):
downlink_vlan_iface['addr'] = vlan_interface_info_dict['addr']
downlink_vlan_iface['mask'] = vlan_interface_info_dict['mask']
break
Expand All @@ -95,11 +106,12 @@ def dut_dhcp_relay_data(duthosts, rand_one_dut_hostname, ptfhost, tbinfo):

# Obtain uplink port indicies for this DHCP relay agent
uplink_interfaces = []
uplink_port_indices =[]
uplink_port_indices = []
for iface_name, neighbor_info_dict in mg_facts['minigraph_neighbors'].items():
if neighbor_info_dict['name'] in mg_facts['minigraph_devices']:
neighbor_device_info_dict = mg_facts['minigraph_devices'][neighbor_info_dict['name']]
if 'type' in neighbor_device_info_dict and neighbor_device_info_dict['type'] in ['LeafRouter', 'MgmtLeafRouter']:
if 'type' in neighbor_device_info_dict and \
neighbor_device_info_dict['type'] in ['LeafRouter', 'MgmtLeafRouter']:
# If this uplink's physical interface is a member of a portchannel interface,
# we record the name of the portchannel interface here, as this is the actual
# interface the DHCP relay will listen on.
Expand All @@ -110,12 +122,14 @@ def dut_dhcp_relay_data(duthosts, rand_one_dut_hostname, ptfhost, tbinfo):
if portchannel_name not in uplink_interfaces:
uplink_interfaces.append(portchannel_name)
break
# If the uplink's physical interface is not a member of a portchannel, add it to our uplink interfaces list
# If the uplink's physical interface is not a member of a portchannel,
# add it to our uplink interfaces list
if not iface_is_portchannel_member:
uplink_interfaces.append(iface_name)
uplink_port_indices.append(mg_facts['minigraph_ptf_indices'][iface_name])
if uplink_interface_link_local == "":
command = "ip addr show {} | grep inet6 | grep 'scope link' | awk '{{print $2}}' | cut -d '/' -f1".format(uplink_interfaces[0])
command = "ip addr show {} | grep inet6 | grep 'scope link' | awk '{{print $2}}' | cut -d '/' -f1"\
.format(uplink_interfaces[0])
res = duthost.shell(command)
if res['stdout'] != "":
uplink_interface_link_local = res['stdout']
Expand Down Expand Up @@ -148,11 +162,13 @@ def validate_dut_routes_exist(duthosts, rand_one_dut_hostname, dut_dhcp_relay_da
rtInfo = duthost.get_ip_route_info(ipaddress.ip_address(dhcp_server))
assert len(rtInfo["nexthops"]) > 0, "Failed to find route to DHCP server '{0}'".format(dhcp_server)


def check_interface_status(duthost):
if ":547" in duthost.shell("docker exec -it dhcp_relay ss -nlp | grep dhcp6relay")["stdout"]:
return True
return False


def test_interface_binding(duthosts, rand_one_dut_hostname, dut_dhcp_relay_data):
duthost = duthosts[rand_one_dut_hostname]
skip_release(duthost, ["201911", "202106"])
Expand All @@ -163,19 +179,25 @@ def test_interface_binding(duthosts, rand_one_dut_hostname, dut_dhcp_relay_data)
output = duthost.shell("docker exec -it dhcp_relay ss -nlp | grep dhcp6relay")["stdout"]
logger.info(output)
for dhcp_relay in dut_dhcp_relay_data:
assert ("*:{}".format(dhcp_relay['downlink_vlan_iface']['name']) or "*:*" in output, "{} is not found in {}".format("*:{}".format(dhcp_relay['downlink_vlan_iface']['name']), output)) or ("*:*" in output, "dhcp6relay socket is not properly binded")
assert ("*:{}".format(dhcp_relay['downlink_vlan_iface']['name']) or "*:*" in output,
"{} is not found in {}".format("*:{}".format(dhcp_relay['downlink_vlan_iface']['name']), output)) or \
("*:*" in output, "dhcp6relay socket is not properly binded")

def test_dhcpv6_relay_counter(ptfhost, duthosts, rand_one_dut_hostname, dut_dhcp_relay_data):

def test_dhcpv6_relay_counter(ptfhost, duthosts, rand_one_dut_hostname, dut_dhcp_relay_data,
toggle_all_simulator_ports_to_rand_selected_tor_m): # noqa F811
""" Test DHCPv6 Counter """
duthost = duthosts[rand_one_dut_hostname]
skip_release(duthost, ["201911", "202106"])

messages = ["Unknown", "Solicit", "Advertise", "Request", "Confirm", "Renew", "Rebind", "Reply", "Release", "Decline", "Reconfigure", "Information-Request", "Relay-Forward", "Relay-Reply", "Malformed"]

messages = ["Unknown", "Solicit", "Advertise", "Request", "Confirm", "Renew", "Rebind", "Reply", "Release",
"Decline", "Reconfigure", "Information-Request", "Relay-Forward", "Relay-Reply", "Malformed"]

for dhcp_relay in dut_dhcp_relay_data:

for message in messages:
cmd = 'sonic-db-cli STATE_DB hmset "DHCPv6_COUNTER_TABLE|{}" {} 0'.format(dhcp_relay['downlink_vlan_iface']['name'], message)
cmd = 'sonic-db-cli STATE_DB hmset "DHCPv6_COUNTER_TABLE|{}" {} 0'\
.format(dhcp_relay['downlink_vlan_iface']['name'], message)
duthost.shell(cmd)

# Send the DHCP relay traffic on the PTF host
Expand All @@ -191,20 +213,24 @@ def test_dhcpv6_relay_counter(ptfhost, duthosts, rand_one_dut_hostname, dut_dhcp
"relay_iface_ip": str(dhcp_relay['downlink_vlan_iface']['addr']),
"relay_iface_mac": str(dhcp_relay['downlink_vlan_iface']['mac']),
"relay_link_local": str(dhcp_relay['uplink_interface_link_local']),
"dut_mac": str(dhcp_relay['uplink_mac']),
"vlan_ip": str(dhcp_relay['downlink_vlan_iface']['addr'])},
log_file="/tmp/dhcpv6_relay_test.DHCPCounterTest.log", is_python3=True)

for message in messages:
get_message = 'sonic-db-cli STATE_DB hget "DHCPv6_COUNTER_TABLE|{}" {}'.format(dhcp_relay['downlink_vlan_iface']['name'], message)
get_message = 'sonic-db-cli STATE_DB hget "DHCPv6_COUNTER_TABLE|{}" {}'\
.format(dhcp_relay['downlink_vlan_iface']['name'], message)
message_count = duthost.shell(get_message)['stdout']
assert int(message_count) > 0, "Missing {} count".format(message)

def test_dhcp_relay_default(tbinfo, ptfhost, duthosts, rand_one_dut_hostname, dut_dhcp_relay_data, validate_dut_routes_exist, testing_config):

def test_dhcp_relay_default(tbinfo, ptfhost, duthosts, rand_one_dut_hostname, dut_dhcp_relay_data,
validate_dut_routes_exist, testing_config):
"""Test DHCP relay functionality on T0 topology.
For each DHCP relay agent running on the DuT, verify DHCP packets are relayed properly
"""
testing_mode, duthost, testbed_mode = testing_config
skip_release(duthost, ["201811", "201911", "202106"]) #TO-DO: delete skip release on 201811 and 201911
skip_release(duthost, ["201811", "201911", "202106"]) # TO-DO: delete skip release on 201811 and 201911

if testing_mode == DUAL_TOR_MODE:
skip_release(duthost, ["201811", "201911"])
Expand All @@ -228,7 +254,8 @@ def test_dhcp_relay_default(tbinfo, ptfhost, duthosts, rand_one_dut_hostname, du
log_file="/tmp/dhcpv6_relay_test.DHCPTest.log", is_python3=True)


def test_dhcp_relay_after_link_flap(ptfhost, duthosts, rand_one_dut_hostname, dut_dhcp_relay_data, validate_dut_routes_exist, testing_config):
def test_dhcp_relay_after_link_flap(ptfhost, duthosts, rand_one_dut_hostname, dut_dhcp_relay_data,
validate_dut_routes_exist, testing_config):
"""Test DHCP relay functionality on T0 topology after uplinks flap
For each DHCP relay agent running on the DuT, with relay agent running, flap the uplinks,
then test whether the DHCP relay agent relays packets properly.
Expand All @@ -252,7 +279,7 @@ def test_dhcp_relay_after_link_flap(ptfhost, duthosts, rand_one_dut_hostname, du
duthost.shell('ifconfig {} up'.format(iface))

# Sleep a bit to ensure uplinks are up
time.sleep(20)
wait_all_bgp_up(duthost)

# Run the DHCP relay test on the PTF host
ptf_runner(ptfhost,
Expand All @@ -272,7 +299,8 @@ def test_dhcp_relay_after_link_flap(ptfhost, duthosts, rand_one_dut_hostname, du
log_file="/tmp/dhcpv6_relay_test.DHCPTest.log", is_python3=True)


def test_dhcp_relay_start_with_uplinks_down(ptfhost, duthosts, rand_one_dut_hostname, dut_dhcp_relay_data, validate_dut_routes_exist, testing_config):
def test_dhcp_relay_start_with_uplinks_down(ptfhost, duthosts, rand_one_dut_hostname, dut_dhcp_relay_data,
validate_dut_routes_exist, testing_config):
"""Test DHCP relay functionality on T0 topology when relay agent starts with uplinks down
For each DHCP relay agent running on the DuT, bring the uplinks down, then restart the
relay agent while the uplinks are still down. Then test whether the DHCP relay agent
Expand Down Expand Up @@ -307,7 +335,7 @@ def test_dhcp_relay_start_with_uplinks_down(ptfhost, duthosts, rand_one_dut_host
duthost.shell('ifconfig {} up'.format(iface))

# Sleep a bit to ensure uplinks are up
time.sleep(20)
wait_all_bgp_up(duthost)

# Run the DHCP relay test on the PTF host
ptf_runner(ptfhost,
Expand Down