Skip to content
Closed
66 changes: 50 additions & 16 deletions ansible/library/check_bgp_ipv6_routes_converged.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
import base64


# Constants
INTERFACE_COMMAND_TEMPLATE = "sudo config interface {action} {target}"
BGP_COMMAND_TEMPLATE = "sudo config bgp {action} {target}"


def get_bgp_ipv6_routes(module):
cmd = "docker exec bgp vtysh -c 'show ipv6 route bgp json'"
rc, out, err = module.run_command(cmd, executable='/bin/bash', use_unsafe_shell=True)
Expand All @@ -18,6 +23,40 @@ def get_bgp_ipv6_routes(module):
return json.loads(out)


def perform_action(module, action, connection_type, targets, all_neighbors):
"""
Perform actions (shutdown/startup) on BGP sessions or interfaces.
"""
# Action on BGP sessions
if connection_type == "bgp_sessions":
if all_neighbors:
cmd = BGP_COMMAND_TEMPLATE.format(action=action, target="all")
execute_command(module, cmd)
else:
for session in targets:
target_session = "neighbor " + session
cmd = BGP_COMMAND_TEMPLATE.format(action=action, target=target_session)
execute_command(module, cmd)
logging.info(f"BGP sessions {action} completed.")
# Action on Interfaces
elif connection_type == "ports":
ports_str = ",".join(targets)
cmd = INTERFACE_COMMAND_TEMPLATE.format(action=action, target=ports_str)
execute_command(module, cmd)
logging.info(f"Interfaces {action} completed.")
else:
logging.info("No valid connection type provided for %s.", action)


def execute_command(module, cmd):
"""Helper function to execute shell commands."""
logging.info("Running command: %s", cmd)
rc, out, err = module.run_command(cmd, executable="/bin/bash", use_unsafe_shell=True)
if rc != 0:
module.fail_json(msg=f"Command failed: {err}")
logging.info("Command completed successfully.")


def compare_routes(running_routes, expected_routes):
expected_set = set(expected_routes.keys())
running_set = set(running_routes.keys())
Expand Down Expand Up @@ -48,7 +87,9 @@ def main():
module = AnsibleModule(
argument_spec=dict(
expected_routes=dict(required=True, type='str'),
shutdown_ports=dict(required=True, type='list', elements='str'),
shutdown_connections=dict(required=True, type='list', elements='str'),
connection_type=dict(required=False, type='str', choices=['ports', 'bgp_sessions', 'none'], default='none'),
shutdown_all_connections=dict(required=False, type='bool', default=False),
timeout=dict(required=False, type='int', default=300),
interval=dict(required=False, type='int', default=1),
log_path=dict(required=False, type='str', default='/tmp'),
Expand All @@ -71,7 +112,9 @@ def main():
else:
expected_routes = json.loads(module.params['expected_routes'])

shutdown_ports = module.params['shutdown_ports']
shutdown_connections = module.params.get('shutdown_connections', [])
connection_type = module.params.get('connection_type', 'none')
shutdown_all_connections = module.params['shutdown_all_connections']
timeout = module.params['timeout']
interval = module.params['interval']
action = module.params.get('action', 'no_action')
Expand All @@ -80,20 +123,11 @@ def main():
start_time = time.time()
logging.info("start time: %s", datetime.datetime.fromtimestamp(start_time).strftime("%H:%M:%S"))

# interface operation based on action
if action in ["shutdown", "startup"] and shutdown_ports:
ports_str = ",".join(shutdown_ports)
if action == "shutdown":
cmd = "sudo config interface shutdown {}".format(ports_str)
else:
cmd = "sudo config interface startup {}".format(ports_str)
logging.info("The command is: %s", cmd)
rc, out, err = module.run_command(cmd, executable='/bin/bash', use_unsafe_shell=True)
if rc != 0:
module.fail_json(msg=f"Failed to {action} ports: {err}")
logging.info("%s ports: %s", action, ports_str)
if not shutdown_connections or action == 'no_action':
logging.info("No connections or action is 'no_action', skipping interface operation.")
else:
logging.info("action is no_action or no shutdown_ports, skip interface operation.")
# interface operation based on action
perform_action(module, action, connection_type, shutdown_connections, shutdown_all_connections)

# Sleep some time to wait routes to be converged
time.sleep(4)
Expand All @@ -105,7 +139,7 @@ def main():
logging.info(f"BGP routes check round: {check_count}")
# record the time before getting routes in this round
before_get_route_time = time.time()
logging.info(f"Before get route time:"
logging.info(f"Before get route time: "
f" {datetime.datetime.fromtimestamp(before_get_route_time).strftime('%H:%M:%S')}")
running_routes = get_bgp_ipv6_routes(module)
logging.info("Obtained the routes")
Expand Down
Loading
Loading