55import tempfile
66import json
77
8- from scapy .all import sniff
8+ from scapy .all import sniff , Raw
99from ptf import testutils
1010
1111from tests .common .dualtor .mux_simulator_control import mux_server_url # noqa: F401
1212from tests .common .dualtor .mux_simulator_control import toggle_all_simulator_ports_to_rand_selected_tor # noqa: F401
13- from tests .common .utilities import is_ipv4_address
13+ from tests .common .utilities import is_ipv4_address , is_ipv6_address
1414from tests .common .utilities import wait_until , delete_running_config
1515from tests .common .utilities import skip_release
1616from tests .common .helpers .assertions import pytest_assert
2323DUT_ICMP_DUMP_FILE = "/tmp/icmp.pcap"
2424HOST_PORT_FLOODING_CHECK_COUNT = 5
2525ICMP_PKT_SRC_IP = "1.1.1.1"
26+ ICMP_PKT_SRC_IPV6 = "2001:db8::1"
2627ICMP_PKT_COUNT = 10
2728ICMP_PKT_FINGERPRINT = "HOSTVLANFLOODINGTEST"
2829
2930
3031@contextlib .contextmanager
3132def log_icmp_updates (duthost , iface , save_path ):
3233 """Capture icmp packets to file."""
33- start_pcap = "tcpdump -i %s -w %s icmp" % (iface , save_path )
34+ start_pcap = "tcpdump -i %s -w %s icmp or icmp6 " % (iface , save_path )
3435 stop_pcap = "pkill -f '%s'" % start_pcap
3536 start_pcap = "nohup %s &" % start_pcap
3637 duthost .shell (start_pcap )
@@ -49,8 +50,21 @@ def testbed_params(duthosts, rand_one_dut_hostname, tbinfo):
4950 vlan_intf_name = list (mg_facts ["minigraph_vlans" ].keys ())[0 ]
5051 vlan_member_ports = mg_facts ["minigraph_vlans" ][vlan_intf_name ]["members" ]
5152 vlan_member_ports_to_ptf_ports = {_ : mg_facts ["minigraph_ptf_indices" ][_ ] for _ in vlan_member_ports }
52- vlan_intf = [_ for _ in mg_facts ["minigraph_vlan_interfaces" ] if _ ["attachto" ] == vlan_intf_name
53- and is_ipv4_address (_ ["addr" ])][0 ]
53+
54+ # Try to find IPv4 address first, fall back to IPv6 if not found
55+ vlan_intf_ipv4 = [_ for _ in mg_facts ["minigraph_vlan_interfaces" ] if _ ["attachto" ] == vlan_intf_name
56+ and is_ipv4_address (_ ["addr" ])]
57+ vlan_intf_ipv6 = [_ for _ in mg_facts ["minigraph_vlan_interfaces" ] if _ ["attachto" ] == vlan_intf_name
58+ and is_ipv6_address (_ ["addr" ])]
59+
60+ # Prefer IPv4, but use IPv6 if no IPv4 is available
61+ if vlan_intf_ipv4 :
62+ vlan_intf = vlan_intf_ipv4 [0 ]
63+ elif vlan_intf_ipv6 :
64+ vlan_intf = vlan_intf_ipv6 [0 ]
65+ else :
66+ pytest .fail ("No IPv4 or IPv6 address found for VLAN interface %s" % vlan_intf_name )
67+
5468 return vlan_intf , vlan_member_ports_to_ptf_ports
5569
5670
@@ -138,13 +152,28 @@ def test_host_vlan_no_floodling(
138152 test_ptf_port_mac = ptfadapter .dataplane .get_mac (0 , test_ptf_port )
139153 dut_ports_to_check = selected_test_ports [1 :]
140154
141- icmp_pkt = testutils .simple_icmp_packet (
142- eth_dst = vlan_intf_mac ,
143- eth_src = test_ptf_port_mac ,
144- ip_src = ICMP_PKT_SRC_IP ,
145- ip_dst = vlan_intf ["addr" ],
146- icmp_data = ICMP_PKT_FINGERPRINT
147- )
155+ # Determine if we're using IPv4 or IPv6 and create appropriate ICMP packet
156+ is_ipv4 = is_ipv4_address (vlan_intf ["addr" ])
157+
158+ if is_ipv4 :
159+ icmp_pkt = testutils .simple_icmp_packet (
160+ eth_dst = vlan_intf_mac ,
161+ eth_src = test_ptf_port_mac ,
162+ ip_src = ICMP_PKT_SRC_IP ,
163+ ip_dst = vlan_intf ["addr" ],
164+ icmp_data = ICMP_PKT_FINGERPRINT
165+ )
166+ else :
167+ # For IPv6, simple_icmpv6_packet doesn't support icmp_data parameter
168+ # So we build the packet and add Raw payload manually
169+ icmp_pkt = testutils .simple_icmpv6_packet (
170+ eth_dst = vlan_intf_mac ,
171+ eth_src = test_ptf_port_mac ,
172+ ipv6_src = ICMP_PKT_SRC_IPV6 ,
173+ ipv6_dst = vlan_intf ["addr" ]
174+ )
175+ # Add custom payload data to the ICMPv6 packet for fingerprinting
176+ icmp_pkt = icmp_pkt / Raw (load = ICMP_PKT_FINGERPRINT )
148177
149178 ptfadapter .before_send = lambda * kargs , ** kwargs : time .sleep (.5 )
150179 for dut_port_to_check in dut_ports_to_check :
0 commit comments