Skip to content
286 changes: 268 additions & 18 deletions tests/drop_counters/test_drop_counters.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,21 @@
def pkt_fields(duthost):
# Gather ansible facts
mg_facts = duthost.minigraph_facts(host=duthost.hostname)['ansible_facts']
ipv4_addr = None
ipv6_addr = None

for item in mg_facts["minigraph_bgp"]:
if item["name"] == mg_facts["minigraph_bgp"][0]["name"]:
if "." in item["addr"]:
ipv4_addr = item["addr"]
else:
ipv6_addr = item["addr"]

test_pkt_data = {
"ip_dst": mg_facts["minigraph_bgp"][0]["addr"],
"ip_src": "1.1.1.1",
"ipv4_dst": ipv4_addr,
"ipv4_src": "1.1.1.1",
"ipv6_dst": ipv6_addr,
"ipv6_src": "ffff::101:101",
"tcp_sport": 1234,
"tcp_dport": 4321
}
Expand All @@ -51,6 +62,7 @@ def setup(duthost, testbed):
"""
port_channel_members = {}
vlan_members = {}
configured_vlans = []
rif_members = []
combined_drop_counter = False

Expand Down Expand Up @@ -85,13 +97,17 @@ def setup(duthost, testbed):
for dut_port, neigh in mg_facts['minigraph_neighbors'].items():
neighbor_sniff_ports.append(mg_facts['minigraph_port_indices'][dut_port])

for vlan_name, vlans_data in mg_facts["minigraph_vlans"].items():
configured_vlans.append(int(vlans_data["vlanid"]))

setup_information = {
"port_channel_members": port_channel_members,
"vlan_members": vlan_members,
"rif_members": rif_members,
"dut_to_ptf_port_map": mg_facts["minigraph_port_indices"],
"combined_drop_counter": combined_drop_counter,
"neighbor_sniff_ports": neighbor_sniff_ports
"neighbor_sniff_ports": neighbor_sniff_ports,
"vlans": configured_vlans,
}

return setup_information
Expand Down Expand Up @@ -249,17 +265,41 @@ def base_verification(discard_group, pkt, ptfadapter, duthost, combined_counter,

def test_equal_smac_dmac_drop(ptfadapter, duthost, setup, tx_dut_ports, pkt_fields):
"""
@summary: Verify that packet with equal SMAC and DMAC is dropped and L2 drop cunter incremented
@summary: Verify that packet with equal SMAC and DMAC is dropped and L2 drop counter incremented
"""
dut_iface, ptf_tx_port_id, dst_mac, src_mac = get_test_ports_info(ptfadapter, duthost, setup, tx_dut_ports)

log_pkt_params(dut_iface, dst_mac, dst_mac, pkt_fields["ip_dst"], pkt_fields["ip_src"])
log_pkt_params(dut_iface, dst_mac, dst_mac, pkt_fields["ipv4_dst"], pkt_fields["ipv4_src"])

pkt = testutils.simple_tcp_packet(
eth_dst=dst_mac, # DUT port
eth_src=dst_mac, # PTF port
ip_src=pkt_fields["ip_src"], # PTF source
ip_dst=pkt_fields["ip_dst"], # DUT source
ip_src=pkt_fields["ipv4_src"], # PTF source
ip_dst=pkt_fields["ipv4_dst"], # VM source
tcp_sport=pkt_fields["tcp_sport"],
tcp_dport=pkt_fields["tcp_dport"])

base_verification("L2", pkt, ptfadapter, duthost, setup["combined_drop_counter"], ptf_tx_port_id, dut_iface)

# Verify packets were not egresed the DUT
exp_pkt = expected_packet_mask(pkt)
testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=setup["neighbor_sniff_ports"])


def test_multicast_smac_drop(ptfadapter, duthost, setup, tx_dut_ports, pkt_fields):
"""
@summary: Verify that packet with multicast SMAC is dropped and L2 drop counter incremented
"""
multicast_smac = "01:00:5e:00:01:02"
dut_iface, ptf_tx_port_id, dst_mac, src_mac = get_test_ports_info(ptfadapter, duthost, setup, tx_dut_ports)

log_pkt_params(dut_iface, dst_mac, multicast_smac, pkt_fields["ipv4_dst"], pkt_fields["ipv4_src"])

pkt = testutils.simple_tcp_packet(
eth_dst=dst_mac, # DUT port
eth_src=multicast_smac,
ip_src=pkt_fields["ipv4_src"], # PTF source
ip_dst=pkt_fields["ipv4_dst"], # VM source
tcp_sport=pkt_fields["tcp_sport"],
tcp_dport=pkt_fields["tcp_dport"])

Expand All @@ -270,20 +310,81 @@ def test_equal_smac_dmac_drop(ptfadapter, duthost, setup, tx_dut_ports, pkt_fiel
testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=setup["neighbor_sniff_ports"])


def test_reserved_dmac_drop(ptfadapter, duthost, setup, tx_dut_ports, pkt_fields):
"""
@summary: Verify that packet with reserved DMAC is dropped and L2 drop counter incremented
@used_mac_address:
01:80:C2:00:00:05 - reserved for future standardization
01:80:C2:00:00:08 - provider Bridge group address
"""
dut_iface, ptf_tx_port_id, dst_mac, src_mac = get_test_ports_info(ptfadapter, duthost, setup, tx_dut_ports)
reserved_mac_addr = ["01:80:C2:00:00:05", "01:80:C2:00:00:08"]

for reserved_dmac in reserved_mac_addr:
log_pkt_params(dut_iface, dst_mac, reserved_dmac, pkt_fields["ipv4_dst"], pkt_fields["ipv4_src"])
pkt = testutils.simple_tcp_packet(
eth_dst=reserved_dmac, # DUT port
eth_src=src_mac,
ip_src=pkt_fields["ipv4_src"], # PTF source
ip_dst=pkt_fields["ipv4_dst"], # VM source
tcp_sport=pkt_fields["tcp_sport"],
tcp_dport=pkt_fields["tcp_dport"])

base_verification("L2", pkt, ptfadapter, duthost, setup["combined_drop_counter"], ptf_tx_port_id, dut_iface)

# Verify packets were not egresed the DUT
exp_pkt = expected_packet_mask(pkt)
testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=setup["neighbor_sniff_ports"])


def test_not_expected_vlan_tag_drop(ptfadapter, duthost, setup, tx_dut_ports, pkt_fields):
"""
@summary: Verify that VLAN tagged packet which VLAN ID does not match ingress port VLAN ID is dropped
and L2 drop counter incremented
"""
dut_iface, ptf_tx_port_id, dst_mac, src_mac = get_test_ports_info(ptfadapter, duthost, setup, tx_dut_ports)

log_pkt_params(dut_iface, dst_mac, src_mac, pkt_fields["ipv4_dst"], pkt_fields["ipv4_src"])

# Generate unexisted vlan value
vlan_id = None
while not vlan_id:
interim = random.randint(2, 1000)
if interim not in setup["vlans"]:
vlan_id = interim

pkt = testutils.simple_tcp_packet(
eth_dst=dst_mac, # DUT port
eth_src=src_mac, # PTF port
ip_src=pkt_fields["ipv4_src"], # PTF source
ip_dst=pkt_fields["ipv4_dst"], # VM source
tcp_sport=pkt_fields["tcp_sport"],
tcp_dport=pkt_fields["tcp_dport"],
dl_vlan_enable=True,
vlan_vid=vlan_id,
)

base_verification("L2", pkt, ptfadapter, duthost, setup["combined_drop_counter"], ptf_tx_port_id, dut_iface)

# Verify packets were not egresed the DUT
exp_pkt = expected_packet_mask(pkt)
testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=setup["neighbor_sniff_ports"])


def test_dst_ip_is_loopback_addr(ptfadapter, duthost, setup, tx_dut_ports, pkt_fields):
"""
@summary: Verify that packet with loopback destination IP adress is dropped and L3 drop cunter incremented
@summary: Verify that packet with loopback destination IP adress is dropped and L3 drop counter incremented
"""
dut_iface, ptf_tx_port_id, dst_mac, src_mac = get_test_ports_info(ptfadapter, duthost, setup, tx_dut_ports)
ip_dst = "127.0.0.1"

log_pkt_params(dut_iface, dst_mac, src_mac, ip_dst, pkt_fields["ip_src"])
log_pkt_params(dut_iface, dst_mac, src_mac, ip_dst, pkt_fields["ipv4_src"])

pkt = testutils.simple_tcp_packet(
eth_dst=dst_mac, # DUT port
eth_src=src_mac, # PTF port
ip_src=pkt_fields["ip_src"], # PTF source
ip_dst=ip_dst, # DUT source
ip_src=pkt_fields["ipv4_src"], # PTF source
ip_dst=ip_dst, # VM source
tcp_sport=pkt_fields["tcp_sport"],
tcp_dport=pkt_fields["tcp_dport"])

Expand All @@ -296,18 +397,18 @@ def test_dst_ip_is_loopback_addr(ptfadapter, duthost, setup, tx_dut_ports, pkt_f

def test_src_ip_is_loopback_addr(ptfadapter, duthost, setup, tx_dut_ports, pkt_fields):
"""
@summary: Verify that packet with loopback source IP adress is dropped and L3 drop cunter incremented
@summary: Verify that packet with loopback source IP adress is dropped and L3 drop counter incremented
"""
dut_iface, ptf_tx_port_id, dst_mac, src_mac = get_test_ports_info(ptfadapter, duthost, setup, tx_dut_ports)
ip_src = "127.0.0.1"

log_pkt_params(dut_iface, dst_mac, src_mac, pkt_fields["ip_dst"], ip_src)
log_pkt_params(dut_iface, dst_mac, src_mac, pkt_fields["ipv4_dst"], ip_src)

pkt = testutils.simple_tcp_packet(
eth_dst=dst_mac, # DUT port
eth_src=src_mac, # PTF port
ip_src=ip_src, # PTF source
ip_dst=pkt_fields["ip_dst"], # DUT source
ip_dst=pkt_fields["ipv4_dst"], # VM source
tcp_sport=pkt_fields["tcp_sport"],
tcp_dport=pkt_fields["tcp_dport"])

Expand All @@ -320,17 +421,17 @@ def test_src_ip_is_loopback_addr(ptfadapter, duthost, setup, tx_dut_ports, pkt_f

def test_dst_ip_absent(ptfadapter, duthost, setup, tx_dut_ports, pkt_fields):
"""
@summary: Verify that packet with absent destination IP address is dropped and L3 drop cunter incremented
@summary: Verify that packet with absent destination IP address is dropped and L3 drop counter incremented
"""
dut_iface, ptf_tx_port_id, dst_mac, src_mac = get_test_ports_info(ptfadapter, duthost, setup, tx_dut_ports)

log_pkt_params(dut_iface, dst_mac, src_mac, "", pkt_fields["ip_src"])
log_pkt_params(dut_iface, dst_mac, src_mac, "", pkt_fields["ipv4_src"])

pkt = testutils.simple_tcp_packet(
eth_dst=dst_mac, # DUT port
eth_src=src_mac, # PTF port
ip_src=pkt_fields["ip_src"], # PTF source
ip_dst="", # DUT source
ip_src=pkt_fields["ipv4_src"], # PTF source
ip_dst="", # VM source
tcp_sport=pkt_fields["tcp_sport"],
tcp_dport=pkt_fields["tcp_dport"])

Expand All @@ -339,3 +440,152 @@ def test_dst_ip_absent(ptfadapter, duthost, setup, tx_dut_ports, pkt_fields):
# Verify packets were not egresed the DUT
exp_pkt = expected_packet_mask(pkt)
testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=setup["neighbor_sniff_ports"])


@pytest.mark.parametrize("ip_addr", ["ipv4", "ipv6"])
def test_src_ip_is_multicast_addr(ptfadapter, duthost, setup, tx_dut_ports, pkt_fields, ip_addr):
"""
@summary: Verify that packet with multicast source IP adress is dropped and L3 drop counter incremented
"""
dut_iface, ptf_tx_port_id, dst_mac, src_mac = get_test_ports_info(ptfadapter, duthost, setup, tx_dut_ports)
ip_src = None

if ip_addr == "ipv4":
ip_src = "224.0.0.5"
pkt = testutils.simple_tcp_packet(
eth_dst=dst_mac, # DUT port
eth_src=src_mac, # PTF port
ip_src=ip_src,
ip_dst=pkt_fields["ipv4_dst"], # VM source
tcp_sport=pkt_fields["tcp_sport"],
tcp_dport=pkt_fields["tcp_dport"])
elif ip_addr == "ipv6":
if not pkt_fields["ipv6_dst"]:
pytest.skip("BGP neighbour with IPv6 addr was not found")
ip_src = "FF02:AAAA:FEE5::1:3"
pkt = testutils.simple_tcpv6_packet(
eth_dst=dst_mac, # DUT port
eth_src=src_mac, # PTF port
ipv6_src=ip_src,
ipv6_dst=pkt_fields["ipv6_dst"], # VM source
tcp_sport=pkt_fields["tcp_sport"],
tcp_dport=pkt_fields["tcp_dport"])
else:
pytest.fail("Incorrect value specified for 'ip_addr' test parameter. Supported parameters: 'ipv4' and 'ipv6'")

log_pkt_params(dut_iface, dst_mac, src_mac, pkt_fields["ipv4_dst"], ip_src)
base_verification("L3", pkt, ptfadapter, duthost, setup["combined_drop_counter"], ptf_tx_port_id, tx_dut_ports[dut_iface])

# Verify packets were not egresed the DUT
exp_pkt = expected_packet_mask(pkt)
testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=setup["neighbor_sniff_ports"])


def test_src_ip_is_class_e(ptfadapter, duthost, setup, tx_dut_ports, pkt_fields):
"""
@summary: Verify that packet with source IP address in class E is dropped and L3 drop counter incremented
"""
dut_iface, ptf_tx_port_id, dst_mac, src_mac = get_test_ports_info(ptfadapter, duthost, setup, tx_dut_ports)
ip_list = ["240.0.0.1", "255.255.255.254"]

for ip_class_e in ip_list:
log_pkt_params(dut_iface, dst_mac, src_mac, pkt_fields["ipv4_dst"], ip_class_e)

pkt = testutils.simple_tcp_packet(
eth_dst=dst_mac, # DUT port
eth_src=src_mac, # PTF port
ip_src=ip_class_e,
ip_dst=pkt_fields["ipv4_dst"], # VM source
tcp_sport=pkt_fields["tcp_sport"],
tcp_dport=pkt_fields["tcp_dport"])

base_verification("L3", pkt, ptfadapter, duthost, setup["combined_drop_counter"], ptf_tx_port_id, tx_dut_ports[dut_iface])

# Verify packets were not egresed the DUT
exp_pkt = expected_packet_mask(pkt)
testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=setup["neighbor_sniff_ports"])


@pytest.mark.parametrize("addr_type, addr_direction", [("ipv4", "src"), ("ipv6", "src"), ("ipv4", "dst"),
("ipv6", "dst")])
def test_ip_is_zero_addr(ptfadapter, duthost, setup, tx_dut_ports, pkt_fields, addr_type, addr_direction):
"""
@summary: Verify that packet with "0.0.0.0" source or destination IP address is dropped and L3 drop counter incremented
"""
dut_iface, ptf_tx_port_id, dst_mac, src_mac = get_test_ports_info(ptfadapter, duthost, setup, tx_dut_ports)
zero_ipv4 = "0.0.0.0"
zero_ipv6 = "::0"

pkt_params = {
"eth_dst": dst_mac, # DUT port
"eth_src": src_mac, # PTF port
"tcp_sport": pkt_fields["tcp_sport"],
"tcp_dport": pkt_fields["tcp_dport"]
}

if addr_type == "ipv4":
if addr_direction == "src":
pkt_params["ip_src"] = zero_ipv4
pkt_params["ip_dst"] = pkt_fields["ipv4_dst"] # VM source
elif addr_direction == "dst":
pkt_params["ip_src"] = pkt_fields["ipv4_src"] # VM source
pkt_params["ip_dst"] = zero_ipv4
else:
pytest.fail("Incorrect value specified for 'addr_direction'. Supported parameters: 'src' and 'dst'")
pkt = testutils.simple_tcp_packet(**pkt_params)
elif addr_type == "ipv6":
if not pkt_fields["ipv6_dst"]:
pytest.skip("BGP neighbour with IPv6 addr was not found")
if addr_direction == "src":
pkt_params["ipv6_src"] = zero_ipv6
pkt_params["ipv6_dst"] = pkt_fields["ipv6_dst"] # VM source
elif addr_direction == "dst":
pkt_params["ipv6_src"] = pkt_fields["ipv6_src"] # VM source
pkt_params["ipv6_dst"] = zero_ipv6
else:
pytest.fail("Incorrect value specified for 'addr_direction'. Supported parameters: 'src' and 'dst'")
pkt = testutils.simple_tcpv6_packet(**pkt_params)
else:
pytest.fail("Incorrect value specified for 'addr_type' test parameter. Supported parameters: 'ipv4' or 'ipv6'")

logger.info(pkt_params)
base_verification("L3", pkt, ptfadapter, duthost, setup["combined_drop_counter"], ptf_tx_port_id,
tx_dut_ports[dut_iface])

# Verify packets were not egresed the DUT
exp_pkt = expected_packet_mask(pkt)
testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=setup["neighbor_sniff_ports"])


@pytest.mark.parametrize("addr_direction", ["src", "dst"])
def test_ip_link_local(ptfadapter, duthost, setup, tx_dut_ports, pkt_fields, addr_direction):
"""
@summary: Verify that packet with link-local address "169.254.0.0/16" is dropped and L3 drop counter incremented
"""
dut_iface, ptf_tx_port_id, dst_mac, src_mac = get_test_ports_info(ptfadapter, duthost, setup, tx_dut_ports)
link_local_ip = "169.254.10.125"

pkt_params = {
"eth_dst": dst_mac, # DUT port
"eth_src": src_mac, # PTF port
"tcp_sport": pkt_fields["tcp_sport"],
"tcp_dport": pkt_fields["tcp_dport"]
}

if addr_direction == "src":
pkt_params["ip_src"] = link_local_ip
pkt_params["ip_dst"] = pkt_fields["ipv4_dst"] # VM source
elif addr_direction == "dst":
pkt_params["ip_src"] = pkt_fields["ipv4_src"] # VM source
pkt_params["ip_dst"] = link_local_ip
else:
pytest.fail("Incorrect value specified for 'addr_direction'. Supported parameters: 'src' and 'dst'")
pkt = testutils.simple_tcp_packet(**pkt_params)

logger.info(pkt_params)
base_verification("L3", pkt, ptfadapter, duthost, setup["combined_drop_counter"], ptf_tx_port_id,
tx_dut_ports[dut_iface])

# Verify packets were not egresed the DUT
exp_pkt = expected_packet_mask(pkt)
testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=setup["neighbor_sniff_ports"])