Skip to content

Commit 2634e6b

Browse files
committed
ISS-1046: New test for bgp IPv6 link-local peering (sonic-net#148)
* adding test for bgp_link_local
1 parent 27417eb commit 2634e6b

1 file changed

Lines changed: 303 additions & 0 deletions

File tree

Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
"""
2+
Test BGP functionality using IPv6 link-local addresses for peering.
3+
"""
4+
import json
5+
import logging
6+
import pytest
7+
from tests.common.helpers.assertions import pytest_assert
8+
from tests.common.utilities import wait_until
9+
from tests.common.devices.eos import EosHost
10+
from tests.common.devices.sonic import SonicHost
11+
from tests.common.config_reload import config_reload
12+
from tests.common.gu_utils import (
13+
generate_tmpfile,
14+
delete_tmpfile,
15+
apply_patch,
16+
expect_op_success,
17+
format_json_patch_for_multiasic
18+
)
19+
20+
logger = logging.getLogger(__name__)
21+
22+
pytestmark = [
23+
pytest.mark.topology('t1'),
24+
]
25+
26+
27+
def configure_bgp_link_local(host, local_asn, peer_asn, interface, is_dut=False):
28+
"""
29+
Configure BGP link-local peering
30+
"""
31+
if is_dut and host.get_frr_mgmt_framework_config():
32+
# Use JSON patch for DUT when FRR management framework is enabled
33+
json_patch = [
34+
{
35+
"op": "add",
36+
"path": f"/BGP_NEIGHBOR/{interface}",
37+
"value": {
38+
"asn": str(peer_asn),
39+
"local_addr": {interface},
40+
"name": interface,
41+
"peer_type": "dynamic"
42+
}
43+
}
44+
]
45+
46+
json_patch = format_json_patch_for_multiasic(duthost=host, json_data=json_patch, is_asic_specific=True)
47+
tmpfile = generate_tmpfile(host)
48+
49+
try:
50+
output = apply_patch(host, json_data=json_patch, dest_file=tmpfile)
51+
expect_op_success(host, output)
52+
finally:
53+
delete_tmpfile(host, tmpfile)
54+
else:
55+
# Use existing vtysh commands for non-DUT or when FRR management is disabled
56+
commands = [
57+
"configure terminal",
58+
f"router bgp {local_asn}",
59+
f"neighbor {interface} interface v6only",
60+
f"neighbor {interface} remote-as {peer_asn}",
61+
"address-family ipv6 unicast",
62+
f"neighbor {interface} activate"
63+
]
64+
65+
commands.append("end")
66+
67+
if isinstance(host, EosHost):
68+
host.run_command_list(commands)
69+
elif isinstance(host, dict) and 'host' in host:
70+
host['host'].command("vtysh -c '" + "' -c '".join(commands) + "'")
71+
else:
72+
host.shell("vtysh -c '" + "' -c '".join(commands) + "'")
73+
74+
75+
def check_bgp_session_state(host, neighbor_addr, interface):
76+
"""
77+
Check if BGP session is established
78+
"""
79+
cmd = "show bgp ipv6 unicast summary json"
80+
logger.info("Checking BGP session state...")
81+
if isinstance(host, EosHost):
82+
# For EOS neighbors
83+
result = json.loads(host.run_command_list([cmd])[0])
84+
elif isinstance(host, dict) and 'host' in host:
85+
# For Sonic neighbors
86+
result = json.loads(host['host'].command(f"vtysh -c '{cmd}'")['stdout'])
87+
else:
88+
# For DUT
89+
result = json.loads(host.shell(f"vtysh -c '{cmd}'")['stdout'])
90+
91+
logger.info(f"Checking BGP session state for interface {interface}")
92+
logger.info(f"BGP summary result: {result}")
93+
94+
# Get peers directly from the result
95+
peers = result.get('peers', {})
96+
logger.info(f"Found peers: {peers}")
97+
98+
# Check if the interface is directly in peers
99+
if interface in peers:
100+
peer_data = peers[interface]
101+
logger.info(f"Found peer data for {interface}: {peer_data}")
102+
return peer_data['state'] == 'Established'
103+
104+
logger.info(f"No matching peer found for interface {interface}")
105+
return False
106+
107+
108+
def get_first_ipv6_ethernet_interface(duthost):
109+
"""
110+
Get the first Ethernet interface that has IPv6 enabled and is up
111+
Returns tuple of (interface_name, success)
112+
"""
113+
try:
114+
ipv6_interfaces = duthost.show_and_parse("show ipv6 interfaces")
115+
116+
for iface_info in ipv6_interfaces:
117+
interface = iface_info["interface"]
118+
if interface.startswith("Ethernet") and iface_info["admin/oper"] == "up/up":
119+
return interface, True
120+
121+
return None, False
122+
except Exception as e:
123+
logger.error(f"Failed to get IPv6 interface information: {str(e)}")
124+
return None, False
125+
126+
127+
def cleanup_bgp_config(duthost, peer_host, peer_name):
128+
"""
129+
Cleanup configuration by performing config reload on both DUT and affected peer
130+
"""
131+
logger.info("Cleaning up configuration with config reload")
132+
133+
# Cleanup DUT configuration
134+
try:
135+
config_reload(duthost, config_source='config_db', wait=60)
136+
logger.info("Successfully reloaded DUT configuration")
137+
except Exception as e:
138+
logger.error(f"Failed to reload DUT configuration: {str(e)}")
139+
140+
# Cleanup peer configuration
141+
try:
142+
if isinstance(peer_host, dict) and 'host' in peer_host:
143+
host = peer_host['host']
144+
else:
145+
host = peer_host
146+
147+
config_reload(host, config_source='config_db', wait=60, is_dut=False)
148+
logger.info(f"Successfully reloaded peer {peer_name} configuration")
149+
except Exception as e:
150+
logger.error(f"Failed to reload peer {peer_name} configuration: {str(e)}")
151+
152+
153+
def deactivate_global_bgp_neighbor(host, asn, neighbor_addr, is_dut=False):
154+
"""
155+
Deactivate a global BGP neighbor configuration
156+
"""
157+
logger.info(f"Deactivating global BGP neighbor {neighbor_addr}")
158+
159+
if is_dut and host.get_frr_mgmt_framework_config():
160+
# Use JSON patch for DUT when FRR management framework is enabled
161+
json_patch = [
162+
{
163+
"op": "replace",
164+
"path": f"/BGP_NEIGHBOR/{neighbor_addr}/admin_status",
165+
"value": "down"
166+
}
167+
]
168+
tmpfile = generate_tmpfile(host)
169+
try:
170+
output = apply_patch(host, json_data=json_patch, dest_file=tmpfile)
171+
expect_op_success(host, output)
172+
finally:
173+
delete_tmpfile(host, tmpfile)
174+
else:
175+
commands = [
176+
"configure terminal",
177+
f"router bgp {asn}",
178+
"address-family ipv6 unicast",
179+
f"no neighbor {neighbor_addr} activate",
180+
"end"
181+
]
182+
if isinstance(host, dict) and 'host' in host:
183+
host['host'].command("vtysh -c '" + "' -c '".join(commands) + "'")
184+
else:
185+
host.shell("vtysh -c '" + "' -c '".join(commands) + "'")
186+
187+
188+
def test_bgp_link_local_peer(duthosts, rand_one_dut_hostname, nbrhosts, tbinfo):
189+
"""
190+
Test BGP peering over IPv6 link-local address.
191+
"""
192+
# Skip if neighbors are not sonic hosts
193+
for nbr in nbrhosts.values():
194+
if not isinstance(nbr['host'], SonicHost):
195+
pytest.skip("Test requires sonic neighbors")
196+
197+
duthost = duthosts[rand_one_dut_hostname]
198+
199+
# Get first available IPv6 Ethernet interface
200+
dut_interface, success = get_first_ipv6_ethernet_interface(duthost)
201+
pytest_assert(success and dut_interface,
202+
"Failed to find an Ethernet interface with IPv6 enabled and up")
203+
204+
# Log testbed information for debugging
205+
logger.info(f"Testing with DUT: {duthost.hostname}")
206+
logger.info(f"Available neighbor hosts: {list(nbrhosts.keys())}")
207+
logger.info(f"Selected DUT interface: {dut_interface}")
208+
209+
config_facts = duthost.get_running_config_facts()
210+
dut_asn = config_facts['DEVICE_METADATA']['localhost']['bgp_asn']
211+
212+
# Get peer ASN and host from minigraph
213+
mg_facts = duthost.get_extended_minigraph_facts(tbinfo)
214+
bgp_neighbors = mg_facts.get('minigraph_bgp', [])
215+
216+
peer_asn = None
217+
peer_name = None
218+
for neighbor in bgp_neighbors:
219+
if neighbor['name'] in nbrhosts:
220+
peer_asn = neighbor['asn']
221+
peer_name = neighbor['name']
222+
break
223+
224+
pytest_assert(peer_asn is not None, "Could not determine peer ASN")
225+
pytest_assert(peer_name is not None, "Could not find peer name")
226+
227+
logger.info(f"Selected peer: {peer_name} (ASN: {peer_asn})")
228+
229+
# Get the corresponding peer interface
230+
peer_interfaces = mg_facts['minigraph_neighbors']
231+
232+
peer_interface = None
233+
if dut_interface in peer_interfaces:
234+
peer_data = peer_interfaces[dut_interface]
235+
if peer_data['name'] in nbrhosts:
236+
peer_interface = peer_data['port']
237+
238+
pytest_assert(peer_interface,
239+
f"Failed to find peer interface corresponding to DUT interface {dut_interface}")
240+
241+
logger.info(f"Selected peer interface: {peer_interface}")
242+
243+
try:
244+
# Find and deactivate the global BGP neighbor for our test peer
245+
for neighbor in bgp_neighbors:
246+
if neighbor['name'] == peer_name and ':' in neighbor['addr']: # IPv6 peer we're testing
247+
logger.info(f"Deactivating global BGP neighbor for test peer {peer_name}")
248+
deactivate_global_bgp_neighbor(duthost, dut_asn, neighbor['addr'], is_dut=True)
249+
deactivate_global_bgp_neighbor(nbrhosts[peer_name], neighbor['asn'],
250+
neighbor['peer_addr'], is_dut=False)
251+
break
252+
253+
# Configure BGP on DUT
254+
logger.info(f"Configuring BGP on DUT (interface: {dut_interface})")
255+
configure_bgp_link_local(duthost, dut_asn, peer_asn, dut_interface, is_dut=True)
256+
257+
# Configure BGP on peer
258+
logger.info(f"Configuring BGP on peer (interface: {peer_interface})")
259+
configure_bgp_link_local(nbrhosts[peer_name], peer_asn, dut_asn, peer_interface, is_dut=False)
260+
261+
# Wait for BGP session to establish on DUT
262+
logger.info("Waiting for BGP session to establish on DUT...")
263+
dut_established = wait_until(30, 1, 0, lambda: check_bgp_session_state(duthost, None, dut_interface))
264+
logger.info(f"DUT BGP session established: {dut_established}")
265+
266+
# Wait for BGP session to establish on peer
267+
logger.info("Waiting for BGP session to establish on peer...")
268+
peer_established = wait_until(
269+
30, 1, 0,
270+
lambda: check_bgp_session_state(
271+
nbrhosts[peer_name],
272+
None,
273+
peer_interface
274+
)
275+
)
276+
logger.info(f"Peer BGP session established: {peer_established}")
277+
278+
pytest_assert(dut_established, f"BGP session failed to establish on DUT (interface {dut_interface})")
279+
pytest_assert(peer_established, f"BGP session failed to establish on peer (interface {peer_interface})")
280+
281+
# Verify route exchange
282+
def check_received_prefixes():
283+
dut_cmd = f"show bgp ipv6 unicast neighbor {dut_interface} prefix-count json"
284+
logger.info(f"Checking DUT received prefixes with command: {dut_cmd}")
285+
286+
dut_neighbor_info = json.loads(duthost.shell(f"vtysh -c '{dut_cmd}'")['stdout'])
287+
logger.info(f"DUT neighbor info: {json.dumps(dut_neighbor_info, indent=2)}")
288+
289+
dut_received_prefixes = int(dut_neighbor_info.get('pfxCounter', 0))
290+
logger.info(f"DUT received {dut_received_prefixes} prefixes from peer")
291+
292+
return dut_received_prefixes > 0
293+
294+
logger.info("Waiting for DUT to receive prefixes from peer...")
295+
pytest_assert(
296+
wait_until(30, 5, 0, check_received_prefixes),
297+
"No prefixes received on DUT from peer after 60 seconds"
298+
)
299+
300+
finally:
301+
# Config reload will restore original configuration
302+
cleanup_bgp_config(duthost, nbrhosts[peer_name], peer_name)
303+
logger.info("Cleanup completed")

0 commit comments

Comments
 (0)