Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 32 additions & 12 deletions ansible/roles/test/files/ptftests/py3/bfd_responder.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import ptf.packet as scapy
from ptf.base_tests import BaseTest
from scapy.contrib.bfd import BFD
from scapy.layers.inet import IP
from scapy.layers.inet6 import IPv6
from ptf.testutils import (send_packet, test_params_get)
from ipaddress import ip_address, IPv4Address, IPv6Address
session_timeout = 1
Expand Down Expand Up @@ -161,19 +163,37 @@ def craft_bfd_packet(self,
bfd_remote_disc,
bfd_state):
ethpart = scapy.Ether(data)
bfdpart = BFD(bytes(ethpart.payload.payload.payload))

# Parse only the mandatory 24-byte BFD header; ignore any
# trailing data such as Simple Password authentication.
bfdpart = BFD(bytes(ethpart.payload.payload.payload)[:24])
bfdpart.my_discriminator = my_discriminator
bfdpart.your_discriminator = bfd_remote_disc
bfdpart.sta = bfd_state

ethpart.payload.payload.payload = bfdpart
ethpart.src = mac_dst
ethpart.dst = mac_src
ethpart.payload.src = ip_dst
ethpart.payload.dst = ip_src

# recompute UDP checksum
ethpart.payload.payload.chksum = None
ethpart.show()

return ethpart
# If the A (Auth Present) flag is set, scapy auto-creates a
# phantom OptionalAuth with default auth_key=b'password' even
# when no auth bytes exist on the wire. Clear both to prevent
# the 11-byte auth trailer from being serialized into the
# response packet.
bfdpart.flags = bfdpart.flags & ~0x04
bfdpart.optional_auth = None

udp_sport = ethpart[scapy.UDP].sport
udp_dport = ethpart[scapy.UDP].dport

# Build the response packet from scratch so no cached raw
# bytes (e.g. auth trailing data) leak from the parsed packet.
if ethpart.type == 0x86dd: # IPv6
pkt = (scapy.Ether(src=mac_dst, dst=mac_src, type=0x86dd) /
IPv6(src=ip_dst, dst=ip_src, hlim=255) /
scapy.UDP(sport=udp_sport, dport=udp_dport) /
bfdpart)
else: # IPv4
pkt = (scapy.Ether(src=mac_dst, dst=mac_src, type=0x0800) /
IP(src=ip_dst, dst=ip_src, ttl=255) /
scapy.UDP(sport=udp_sport, dport=udp_dport) /
bfdpart)

pkt.show()
return pkt
Loading