Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 188 additions & 16 deletions ansible/roles/vm_set/files/mux_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,53 @@

This script should be started keep running in background when a topology is created by 'testbed-cli.sh add-topo'.
"""
from __future__ import print_function
import json
import logging
import os
import re
import shlex
import subprocess
import sys

from collections import defaultdict

from flask import Flask, request, jsonify


app = Flask(__name__)


logging.basicConfig(
filename='/tmp/mux_simulator.log',
level=logging.INFO,
format='%(asctime)s %(levelname)s %(message)s')


def run_cmd(cmdline):
"""Use subprocess to run a command line with shell=True

Args:
cmdline (string): The command line to be executed.
cmdline (string): The command to be executed.

Raises:
Exception: If return code of running command line is not zero, an exception is raised.

Returns:
string: The stdout of running the command line.
"""
app.logger.debug(cmdline)
process = subprocess.Popen(
cmdline.split(), # lgtm [py/command-line-injection]
shlex.split(cmdline), # lgtm [py/command-line-injection]
stdout=subprocess.PIPE,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
ret_code = process.returncode

msg = 'cmd={}, ret_code={}, stdout={}, stderr={}'.format(cmdline, ret_code, stdout, stderr)
app.logger.debug(msg)

if ret_code != 0:
raise Exception('ret_code={}, error message="{}". cmd={}'.format(ret_code, stderr, cmdline))
raise Exception(msg)

return stdout.decode('utf-8')

Expand Down Expand Up @@ -170,7 +180,7 @@ def get_flows(vm_set, port_index):
for in_port, actions_desc in parsed:
actions = []
for field in actions_desc.split(','):
action, out_port = re.search(r'(\S+):"(\S+)"', field).groups()
action, out_port = re.search(r'([^:]+)(?:\:"(\S+)")?', field).groups()
actions.append({'action': action, 'out_port': out_port})
flows[in_port] = actions

Expand All @@ -186,8 +196,8 @@ def get_active_port(flows):
Returns:
string or None: Name of the active port or None if something is wrong.
"""
for in_port, actions in flows.items():
if len(actions) == 1:
for in_port in flows.keys():
if not in_port.startswith('muxy-'):
return in_port
return None

Expand All @@ -197,7 +207,7 @@ def get_mux_status(vm_set, port_index):

Args:
vm_set (string): The vm_set of test setup.
port_index ([type]): Index of the port.
port_index (int or string): Index of the port.

Raises:
Exception: If no active port is found, raise an exception.
Expand All @@ -215,6 +225,7 @@ def get_mux_status(vm_set, port_index):
"tor_b": "enp59s0f1.3272"
},
"active_side": "tor_a",
"active_port": "enp59s0f1.3216",
"flows": {
"muxy-vms17-8-0":
[
Expand Down Expand Up @@ -247,6 +258,7 @@ def get_mux_status(vm_set, port_index):
for side, port in mux_status['ports'].items():
if port == active_port:
mux_status['active_side'] = side
mux_status['active_port'] = port
return mux_status


Expand All @@ -255,7 +267,7 @@ def set_active_side(vm_set, port_index, new_active_side):

Args:
vm_set (string): The vm_set of test setup.
port_index ([type]): Index of the port.
port_index (int or string): Index of the port.
new_active_side (string): Either "tor_a" or "tor_b".

Returns:
Expand All @@ -273,6 +285,7 @@ def set_active_side(vm_set, port_index, new_active_side):
run_cmd('ovs-ofctl --names add-flow mbr-{}-{} in_port="{}",actions=output:"{}"'
.format(vm_set, port_index, new_active_port, nic_port))
mux_status['active_side'] = new_active_side
mux_status['active_port'] = new_active_port
return mux_status


Expand All @@ -281,7 +294,7 @@ def _validate_param(vm_set, port_index=None):

Args:
vm_set (string): The vm_set of test setup.
port_index ([type]): Index of the port.
port_index (int or string): Index of the port.

Returns:
tuple: Return the result in a tuple. The first item is either True or False. The second item is extra message.
Expand Down Expand Up @@ -325,6 +338,7 @@ def mux_cable(vm_set, port_index):
"""
valid, msg = _validate_param(vm_set, port_index)
if not valid:
app.logger.error('{} {} {}'.format(request.method, request.url, msg))
return jsonify({'err_msg': msg}), 400

if request.method == 'GET':
Expand All @@ -333,19 +347,24 @@ def mux_cable(vm_set, port_index):
mux_status = get_mux_status(vm_set, port_index)
return jsonify(mux_status)
except Exception as e:
return jsonify({'err_msg': 'Get mux status failed: {}'.format(repr(e))}), 500
err_msg = 'Get mux status failed: {}'.format(repr(e))
app.logger.error('{} {} {}'.format(request.method, request.url, err_msg))
return jsonify({'err_msg': err_msg}), 500
else:
# Set the active side of mux
data = request.get_json()
valid, msg = _validate_posted_data(data)
if not valid:
app.logger.error('{} {} {}'.format(request.method, request.url, msg))
return jsonify({'err_msg': msg}), 400

try:
mux_status = set_active_side(vm_set, port_index, data['active_side'])
return jsonify(mux_status)
except Exception as e:
return jsonify({'err_msg': 'Set active side failed: {}'.format(repr(e))}), 500
err_msg = 'Set active side failed: {}'.format(repr(e))
app.logger.error('{} {} {}'.format(request.method, request.url, err_msg))
return jsonify({'err_msg': err_msg}), 500


def get_mux_bridges(vm_set):
Expand Down Expand Up @@ -384,16 +403,169 @@ def all_mux_status(vm_set):
all_mux_status[bridge] = get_mux_status(vm_set, port_index)
return jsonify(all_mux_status)
except Exception as e:
return jsonify({'err_msg': 'Get all mux status failed, vm_set: {}, exception: {}'.format(vm_set, repr(e))}), 500
err_msg = 'Get all mux status failed, vm_set: {}, exception: {}'.format(vm_set, repr(e))
app.logger.error('{} {} {}'.format(request.method, request.url, err_msg))
return jsonify({'err_msg': err_msg}), 500


def _validate_out_ports(data):
"""Validate the posted data for updating flow action.

Args:
data (dict): Posted json data. Expected:
{"out_ports": [<port>, <port>, ...]}
where <port> could be "nic", "tor_a" or "tor_b".

Returns:
tuple: Return the result in a tuple. The first item is either True or False. The second item is extra message.
"""
supported_out_ports = ['nic', 'tor_a', 'tor_b']
try:
assert 'out_ports' in data, 'Missing "out_ports" field'
for port in data['out_ports']:
assert port in supported_out_ports, 'Unsupported port: "{}", supported: {}'.format(port, supported_out_ports)
return True, ''
except Exception as e:
return False, 'Validate out_ports {} failed with exception: {}'.format(json.dumps(data), repr(e))


def update_flow_action_to_nic(mux_status, action):
"""Update the action for the flow to "nic".

Args:
mux_status (dict): Current mux status.
action (string): The action to be applied to flow. Either "output" or "drop".

Returns:
dict: The new mux status.
"""
in_port = mux_status['active_port']
out_port = mux_status['ports']['nic'] if action == 'output' else None
action_desc = '{}:"{}"'.format(action, out_port) if out_port else action
cmdline = 'ovs-ofctl --name mod-flows {} \'in_port="{}" actions={}\''.format(
mux_status['bridge'],
in_port,
action_desc)
run_cmd(cmdline)
flow = {in_port: [{'action': action, 'out_port': out_port}]}
mux_status['flows'].update(flow)
return mux_status


def update_flow_action_to_tor(mux_status, action, tor_ports):
"""Update the action for the flow to "tor_a" and/or "tor_b".

Args:
mux_status (dict): Current mux status.
action (string): The action to be applied to flow. Either "output" or "drop".
tor_ports (list): A list like ["tor_a", "tor_b"].

Returns:
dict: The new mux status.
"""
nic_port = mux_status['ports']['nic'] # muxy-<vm_set>-<port_index>
old_output_tor_ports = set([item['out_port'] \
for item in mux_status['flows'][nic_port] if item['action'] == 'output'])

update_output_tor_ports = set(mux_status['ports'][tor_port] for tor_port in tor_ports)

if action == 'output':
output_tor_ports = old_output_tor_ports.union(update_output_tor_ports)
else:
output_tor_ports = old_output_tor_ports - update_output_tor_ports

if output_tor_ports == old_output_tor_ports: # No need to update
return mux_status

if len(output_tor_ports) == 0:
action_desc='drop'
else:
action_desc=','.join(['output:"{}"'.format(port) for port in output_tor_ports])

cmdline = 'ovs-ofctl --name mod-flows {} \'in_port="{}" actions={}\''.format(
mux_status['bridge'],
nic_port,
action_desc)
run_cmd(cmdline)
flow = {nic_port: [{'action': 'output', 'out_port': port} for port in output_tor_ports]}
mux_status['flows'].update(flow)
return mux_status


def update_flow_action(vm_set, port_index, action, data):
"""Update action of flows.

Args:
vm_set (string): The vm_set of test setup. Parsed by flask from request URL.
port_index (string): Index of the port. Parsed by flask from request URL.
action (string): The action to be applied to flow. Either "output" or "drop".
data (dict): Posted json data. Expected:
{"out_ports": [<port>, <port>, ...]}
where <port> could be "nic", "tor_a" or "tor_b".

Returns:
dict: The new mux status.
"""
mux_status = get_mux_status(vm_set, port_index)
tor_ports = []
for out_port in data['out_ports']:
if out_port == 'nic':
mux_status = update_flow_action_to_nic(mux_status, action)
elif out_port == 'tor_a' or out_port == 'tor_b':
tor_ports.append(out_port)
if tor_ports:
mux_status = update_flow_action_to_tor(mux_status, action, tor_ports)
return mux_status


@app.route('/mux/<vm_set>/<port_index>/<action>', methods=['POST'])
def mux_cable_flow_update(vm_set, port_index, action):
"""Handler for changing flow action.

Args:
vm_set (string): The vm_set of test setup. Parsed by flask from request URL.
port_index (string): Index of the port. Parsed by flask from request URL.
action (string): The action to be applied to flow. Either "output" or "drop".

Returns:
object: Return a flask response object.
"""
if action not in ["output", "drop"]:
err_msg = 'In "/mux/<vm_set>/<port_index>/<action>", action must be "output" or "drop".'
app.logger.error('{} {} {}'.format(request.method, request.url, err_msg))
return jsonify({'err_msg': err_msg}), 404

valid, msg = _validate_param(vm_set, port_index)
if not valid:
app.logger.error('{} {} {}'.format(request.method, request.url, msg))
return jsonify({'err_msg': msg}), 400

data = request.get_json()
valid, msg = _validate_out_ports(data)
if not valid:
app.logger.error('{} {} {}'.format(request.method, request.url, msg))
return jsonify({'err_msg': msg}), 400

try:
mux_status = update_flow_action(vm_set, port_index, action, data)
return jsonify(mux_status)
except Exception as e:
err_msg = 'Update flow action failed: {}'.format(repr(e))
app.logger.error('{} {} {}'.format(request.method, request.url, err_msg))
return jsonify({'err_msg': err_msg}), 500


if __name__ == '__main__':
usage = '''
Start mux simulator server at specified port.
$ sudo python <prog> <port>
'''
if '-v' in sys.argv:
app.logger.setLevel(logging.DEBUG)

if len(sys.argv) < 2:
print(usage)
app.logger.error(usage)
sys.exit(1)
print('Mux simulator listening at port {}'.format(sys.argv[1]))

app.logger.info('Mux simulator listening at port {}'.format(sys.argv[1]))
app.run(host='0.0.0.0', port=sys.argv[1])