diff --git a/tests/drop_counters/test_drop_counters.py b/tests/drop_counters/test_drop_counters.py index 2054dc57d41..b47f22e6fbd 100755 --- a/tests/drop_counters/test_drop_counters.py +++ b/tests/drop_counters/test_drop_counters.py @@ -854,7 +854,7 @@ def test_ip_pkt_with_expired_ttl(ptfadapter, duthost, setup, tx_dut_ports, pkt_f do_test("L3", pkt, ptfadapter, duthost, ports_info, setup["neighbor_sniff_ports"], tx_dut_ports) -@pytest.mark.parametrize("igmp_version,msg_type", [("v1", "membership_query"), ("v3", "membership_query"), ("v1", "membership_report"), +@pytest.mark.parametrize("igmp_version,msg_type", [("v1", "general_query"), ("v3", "general_query"), ("v1", "membership_report"), ("v2", "membership_report"), ("v3", "membership_report"), ("v2", "leave_group")]) def test_non_routable_igmp_pkts(ptfadapter, duthost, setup, tx_dut_ports, pkt_fields, igmp_version, msg_type, ports_info): """ @@ -875,7 +875,7 @@ def test_non_routable_igmp_pkts(ptfadapter, duthost, setup, tx_dut_ports, pkt_fi # Leave Message ALL-ROUTERS (224.0.0.2) # TODO: fix this workaround as of now current PTF and Scapy versions do not support creation of IGMP packets - # Temporaly created hex of IGMP packet layer + # Temporaly created hex of IGMP packet layer by using scapy version 2.4.3. # Example how to get HEX of specific IGMP packets: # v3_membership_query = IGMPv3(type=0x11, mrcode=0, chksum=None)/scapy.contrib.igmpv3.IGMPv3mq(gaddr="224.0.0.1", # srcaddrs=["172.16.11.1", "10.0.0.59"], qrv=1, qqic=125, numsrc=2) @@ -885,33 +885,45 @@ def test_non_routable_igmp_pkts(ptfadapter, duthost, setup, tx_dut_ports, pkt_fi # records=[gr_obj]).build() # The rest packets are build like "simple_igmp_packet" function from PTF testutils.py - ethernet_dst = {"membership_query": "01:00:5e:00:00:01", - "membership_report": "01:00:5e:02:02:04", - "leave_group": "01:00:5e:00:00:02"} - ip_dst = {"membership_query": "224.0.0.1", - "membership_report": "224.2.2.4", - "leave_group": "224.0.0.2"} - igmp_types = {"v1": {"membership_query": "\x11\x00\x0e\xfe\xe0\x00\x00\x01", - "membership_report": "\x12\x00\x0b\xf9\xe0\x02\x02\x04"}, - "v2": {"membership_report": "\x16\x00\x07\xf9\xe0\x02\x02\x04", - "leave_group": "\x17\x00\x08\xfd\xe0\x00\x00\x02"}, - "v3": {"membership_query": "\x11\x00L2\xe0\x00\x00\x01\x01}\x00\x02\xac\x10\x0b\x01\n\x00\x00;", - "membership_report": "\"\x009\xa9\x00\x00\x00\x01\x01\x00\x00\x02\xe0\x02\x02\x04\xac\x10\x0b\x01\n\x00\x00;"} - } + from scapy.contrib.igmp import IGMP + Ether = testutils.scapy.Ether + IP = testutils.scapy.IP - log_pkt_params(ports_info["dut_iface"], ethernet_dst[msg_type], ports_info["src_mac"], ip_dst[msg_type], pkt_fields["ipv4_src"]) + if "vlan" in tx_dut_ports[ports_info["dut_iface"]].lower() and msg_type == "membership_report": + pytest.skip("Test case is not supported on VLAN interface") - pkt = testutils.simple_ip_packet( - eth_dst=ethernet_dst[msg_type], # DUT port - eth_src=ports_info["src_mac"], # PTF port - ip_src=pkt_fields["ipv4_src"], # PTF source - ip_dst=ip_dst[msg_type], - ip_ttl=1, - ) + igmp_proto = 0x02 + multicast_group_addr = "224.1.1.1" + ethernet_dst = "01:00:5e:01:01:01" + ip_dst = {"general_query": "224.0.0.1", + "membership_report": multicast_group_addr} + igmp_types = {"v1": {"general_query": IGMP(type=0x11, gaddr="224.0.0.1"), + "membership_report": IGMP(type=0x12, gaddr=multicast_group_addr)}, + "v2": {"membership_report": IGMP(type=0x16, gaddr=multicast_group_addr), + "leave_group": IGMP(type=0x17, gaddr=multicast_group_addr)}, + "v3": {"general_query": "\x11\x00L2\xe0\x00\x00\x01\x01}\x00\x02\xac\x10\x0b\x01\n\x00\x00;", + "membership_report": "\"\x009\xa9\x00\x00\x00\x01\x01\x00\x00\x02\xe0\x02\x02\x04\xac\x10\x0b\x01\n\x00\x00;"} + } - del pkt[testutils.scapy.scapy.all.Raw] - pkt = pkt / igmp_types[igmp_version][msg_type] + if igmp_version == "v3": + pkt = testutils.simple_ip_packet( + eth_dst=ethernet_dst, + eth_src=ports_info["src_mac"], + ip_src=pkt_fields["ipv4_src"], + ip_dst=ip_dst[msg_type], + ip_ttl=1, + ip_proto=igmp_proto + ) + del pkt["Raw"] + pkt = pkt / igmp_types[igmp_version][msg_type] + else: + eth_layer = Ether(src=ports_info["src_mac"], dst=ethernet_dst) + ip_layer = IP(src=pkt_fields["ipv4_src"], ) + igmp_layer = igmp_types[igmp_version][msg_type] + assert igmp_layer.igmpize(ip=ip_layer, ether=eth_layer), "Can't create IGMP packet" + pkt = eth_layer/ip_layer/igmp_layer + log_pkt_params(ports_info["dut_iface"], ethernet_dst, ports_info["src_mac"], pkt.getlayer("IP").dst, pkt_fields["ipv4_src"]) do_test("L3", pkt, ptfadapter, duthost, ports_info, setup["dut_to_ptf_port_map"].values(), tx_dut_ports)