Skip to content
102 changes: 97 additions & 5 deletions tests/drop_counters/test_drop_counters.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def setup(duthost, testbed):
"""
port_channel_members = {}
vlan_members = {}
configured_vlans = []
rif_members = []
combined_drop_counter = False

Expand Down Expand Up @@ -85,13 +86,19 @@ def setup(duthost, testbed):
for dut_port, neigh in mg_facts['minigraph_neighbors'].items():
neighbor_sniff_ports.append(mg_facts['minigraph_port_indices'][dut_port])

for vlan_name in mg_facts["minigraph_vlans"].keys():
match = re.findall("\d{1,4}", vlan_name)
if match:
configured_vlans.append(int(match[0]))

setup_information = {
"port_channel_members": port_channel_members,
"vlan_members": vlan_members,
"rif_members": rif_members,
"dut_to_ptf_port_map": mg_facts["minigraph_port_indices"],
"combined_drop_counter": combined_drop_counter,
"neighbor_sniff_ports": neighbor_sniff_ports
"neighbor_sniff_ports": neighbor_sniff_ports,
"vlans": configured_vlans
}

return setup_information
Expand Down Expand Up @@ -249,7 +256,7 @@ def base_verification(discard_group, pkt, ptfadapter, duthost, combined_counter,

def test_equal_smac_dmac_drop(ptfadapter, duthost, setup, tx_dut_ports, pkt_fields):
"""
@summary: Verify that packet with equal SMAC and DMAC is dropped and L2 drop cunter incremented
@summary: Verify that packet with equal SMAC and DMAC is dropped and L2 drop counter incremented
"""
dut_iface, ptf_tx_port_id, dst_mac, src_mac = get_test_ports_info(ptfadapter, duthost, setup, tx_dut_ports)

Expand All @@ -270,9 +277,94 @@ def test_equal_smac_dmac_drop(ptfadapter, duthost, setup, tx_dut_ports, pkt_fiel
testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=setup["neighbor_sniff_ports"])


def test_multicast_smac_drop(ptfadapter, duthost, setup, tx_dut_ports, pkt_fields):
"""
@summary: Verify that packet with multicast SMAC is dropped and L2 drop counter incremented
"""
multicast_smac = "01:00:5e:00:01:02"
dut_iface, ptf_tx_port_id, dst_mac, src_mac = get_test_ports_info(ptfadapter, duthost, setup, tx_dut_ports)

log_pkt_params(dut_iface, dst_mac, multicast_smac, pkt_fields["ip_dst"], pkt_fields["ip_src"])

pkt = testutils.simple_tcp_packet(
eth_dst=dst_mac, # DUT port
eth_src=multicast_smac, # PTF port
ip_src=pkt_fields["ip_src"], # PTF source
ip_dst=pkt_fields["ip_dst"], # DUT source
tcp_sport=pkt_fields["tcp_sport"],
tcp_dport=pkt_fields["tcp_dport"])

base_verification("L2", pkt, ptfadapter, duthost, setup["combined_drop_counter"], ptf_tx_port_id, dut_iface)

# Verify packets were not egresed the DUT
exp_pkt = expected_packet_mask(pkt)
testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=setup["neighbor_sniff_ports"])


def test_reserved_dmac_drop(ptfadapter, duthost, setup, tx_dut_ports, pkt_fields):
"""
@summary: Verify that packet with reserved DMAC is dropped and L2 drop counter incremented
@used_mac_address:
01:80:C2:00:00:05 - reserved for future standardization
01:80:C2:00:00:08 - provider Bridge group address
"""
dut_iface, ptf_tx_port_id, dst_mac, src_mac = get_test_ports_info(ptfadapter, duthost, setup, tx_dut_ports)
reserved_mac_addr = ["01:80:C2:00:00:05", "01:80:C2:00:00:08"]

for reserved_dmac in reserved_mac_addr:
log_pkt_params(dut_iface, dst_mac, reserved_dmac, pkt_fields["ip_dst"], pkt_fields["ip_src"])
pkt = testutils.simple_tcp_packet(
eth_dst=dst_mac, # DUT port
eth_src=reserved_dmac, # PTF port
ip_src=pkt_fields["ip_src"], # PTF source
ip_dst=pkt_fields["ip_dst"], # DUT source
tcp_sport=pkt_fields["tcp_sport"],
tcp_dport=pkt_fields["tcp_dport"])

base_verification("L2", pkt, ptfadapter, duthost, setup["combined_drop_counter"], ptf_tx_port_id, dut_iface)

# Verify packets were not egresed the DUT
exp_pkt = expected_packet_mask(pkt)
testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=setup["neighbor_sniff_ports"])


def test_not_expected_vlan_tag_drop(ptfadapter, duthost, setup, tx_dut_ports, pkt_fields):
"""
@summary: Verify that VLAN tagged packet which VLAN ID does not match ingress port VLAN ID is dropped
and L2 drop counter incremented
"""
dut_iface, ptf_tx_port_id, dst_mac, src_mac = get_test_ports_info(ptfadapter, duthost, setup, tx_dut_ports)

log_pkt_params(dut_iface, dst_mac, src_mac, pkt_fields["ip_dst"], pkt_fields["ip_src"])

# Generate unexisted vlan value
vlan_id = None
while not vlan_id:
interim = random.randint(2, 1000)
if interim not in setup["vlans"]:
vlan_id = interim

pkt = testutils.simple_tcp_packet(
eth_dst=dst_mac, # DUT port
eth_src=src_mac, # PTF port
ip_src=pkt_fields["ip_src"], # PTF source
ip_dst=pkt_fields["ip_dst"], # DUT source
tcp_sport=pkt_fields["tcp_sport"],
tcp_dport=pkt_fields["tcp_dport"],
dl_vlan_enable=True,
vlan_vid=vlan_id,
)

base_verification("L2", pkt, ptfadapter, duthost, setup["combined_drop_counter"], ptf_tx_port_id, dut_iface)

# Verify packets were not egresed the DUT
exp_pkt = expected_packet_mask(pkt)
testutils.verify_no_packet_any(ptfadapter, exp_pkt, ports=setup["neighbor_sniff_ports"])


def test_dst_ip_is_loopback_addr(ptfadapter, duthost, setup, tx_dut_ports, pkt_fields):
"""
@summary: Verify that packet with loopback destination IP adress is dropped and L3 drop cunter incremented
@summary: Verify that packet with loopback destination IP adress is dropped and L3 drop counter incremented
"""
dut_iface, ptf_tx_port_id, dst_mac, src_mac = get_test_ports_info(ptfadapter, duthost, setup, tx_dut_ports)
ip_dst = "127.0.0.1"
Expand All @@ -296,7 +388,7 @@ def test_dst_ip_is_loopback_addr(ptfadapter, duthost, setup, tx_dut_ports, pkt_f

def test_src_ip_is_loopback_addr(ptfadapter, duthost, setup, tx_dut_ports, pkt_fields):
"""
@summary: Verify that packet with loopback source IP adress is dropped and L3 drop cunter incremented
@summary: Verify that packet with loopback source IP adress is dropped and L3 drop counter incremented
"""
dut_iface, ptf_tx_port_id, dst_mac, src_mac = get_test_ports_info(ptfadapter, duthost, setup, tx_dut_ports)
ip_src = "127.0.0.1"
Expand All @@ -320,7 +412,7 @@ def test_src_ip_is_loopback_addr(ptfadapter, duthost, setup, tx_dut_ports, pkt_f

def test_dst_ip_absent(ptfadapter, duthost, setup, tx_dut_ports, pkt_fields):
"""
@summary: Verify that packet with absent destination IP address is dropped and L3 drop cunter incremented
@summary: Verify that packet with absent destination IP address is dropped and L3 drop counter incremented
"""
dut_iface, ptf_tx_port_id, dst_mac, src_mac = get_test_ports_info(ptfadapter, duthost, setup, tx_dut_ports)

Expand Down