diff --git a/ansible/roles/vm_set/files/mux_simulator.py b/ansible/roles/vm_set/files/mux_simulator.py index 7a8ca87bfe2..6267409976c 100644 --- a/ansible/roles/vm_set/files/mux_simulator.py +++ b/ansible/roles/vm_set/files/mux_simulator.py @@ -6,10 +6,11 @@ This script should be started keep running in background when a topology is created by 'testbed-cli.sh add-topo'. """ -from __future__ import print_function import json +import logging import os import re +import shlex import subprocess import sys @@ -17,15 +18,20 @@ from flask import Flask, request, jsonify - app = Flask(__name__) +logging.basicConfig( + filename='/tmp/mux_simulator.log', + level=logging.INFO, + format='%(asctime)s %(levelname)s %(message)s') + + def run_cmd(cmdline): """Use subprocess to run a command line with shell=True Args: - cmdline (string): The command line to be executed. + cmdline (string): The command to be executed. Raises: Exception: If return code of running command line is not zero, an exception is raised. @@ -33,16 +39,20 @@ def run_cmd(cmdline): Returns: string: The stdout of running the command line. """ + app.logger.debug(cmdline) process = subprocess.Popen( - cmdline.split(), # lgtm [py/command-line-injection] + shlex.split(cmdline), # lgtm [py/command-line-injection] stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = process.communicate() ret_code = process.returncode + msg = 'cmd={}, ret_code={}, stdout={}, stderr={}'.format(cmdline, ret_code, stdout, stderr) + app.logger.debug(msg) + if ret_code != 0: - raise Exception('ret_code={}, error message="{}". cmd={}'.format(ret_code, stderr, cmdline)) + raise Exception(msg) return stdout.decode('utf-8') @@ -170,7 +180,7 @@ def get_flows(vm_set, port_index): for in_port, actions_desc in parsed: actions = [] for field in actions_desc.split(','): - action, out_port = re.search(r'(\S+):"(\S+)"', field).groups() + action, out_port = re.search(r'([^:]+)(?:\:"(\S+)")?', field).groups() actions.append({'action': action, 'out_port': out_port}) flows[in_port] = actions @@ -186,8 +196,8 @@ def get_active_port(flows): Returns: string or None: Name of the active port or None if something is wrong. """ - for in_port, actions in flows.items(): - if len(actions) == 1: + for in_port in flows.keys(): + if not in_port.startswith('muxy-'): return in_port return None @@ -197,7 +207,7 @@ def get_mux_status(vm_set, port_index): Args: vm_set (string): The vm_set of test setup. - port_index ([type]): Index of the port. + port_index (int or string): Index of the port. Raises: Exception: If no active port is found, raise an exception. @@ -215,6 +225,7 @@ def get_mux_status(vm_set, port_index): "tor_b": "enp59s0f1.3272" }, "active_side": "tor_a", + "active_port": "enp59s0f1.3216", "flows": { "muxy-vms17-8-0": [ @@ -247,6 +258,7 @@ def get_mux_status(vm_set, port_index): for side, port in mux_status['ports'].items(): if port == active_port: mux_status['active_side'] = side + mux_status['active_port'] = port return mux_status @@ -255,7 +267,7 @@ def set_active_side(vm_set, port_index, new_active_side): Args: vm_set (string): The vm_set of test setup. - port_index ([type]): Index of the port. + port_index (int or string): Index of the port. new_active_side (string): Either "tor_a" or "tor_b". Returns: @@ -273,6 +285,7 @@ def set_active_side(vm_set, port_index, new_active_side): run_cmd('ovs-ofctl --names add-flow mbr-{}-{} in_port="{}",actions=output:"{}"' .format(vm_set, port_index, new_active_port, nic_port)) mux_status['active_side'] = new_active_side + mux_status['active_port'] = new_active_port return mux_status @@ -281,7 +294,7 @@ def _validate_param(vm_set, port_index=None): Args: vm_set (string): The vm_set of test setup. - port_index ([type]): Index of the port. + port_index (int or string): Index of the port. Returns: tuple: Return the result in a tuple. The first item is either True or False. The second item is extra message. @@ -325,6 +338,7 @@ def mux_cable(vm_set, port_index): """ valid, msg = _validate_param(vm_set, port_index) if not valid: + app.logger.error('{} {} {}'.format(request.method, request.url, msg)) return jsonify({'err_msg': msg}), 400 if request.method == 'GET': @@ -333,19 +347,24 @@ def mux_cable(vm_set, port_index): mux_status = get_mux_status(vm_set, port_index) return jsonify(mux_status) except Exception as e: - return jsonify({'err_msg': 'Get mux status failed: {}'.format(repr(e))}), 500 + err_msg = 'Get mux status failed: {}'.format(repr(e)) + app.logger.error('{} {} {}'.format(request.method, request.url, err_msg)) + return jsonify({'err_msg': err_msg}), 500 else: # Set the active side of mux data = request.get_json() valid, msg = _validate_posted_data(data) if not valid: + app.logger.error('{} {} {}'.format(request.method, request.url, msg)) return jsonify({'err_msg': msg}), 400 try: mux_status = set_active_side(vm_set, port_index, data['active_side']) return jsonify(mux_status) except Exception as e: - return jsonify({'err_msg': 'Set active side failed: {}'.format(repr(e))}), 500 + err_msg = 'Set active side failed: {}'.format(repr(e)) + app.logger.error('{} {} {}'.format(request.method, request.url, err_msg)) + return jsonify({'err_msg': err_msg}), 500 def get_mux_bridges(vm_set): @@ -384,7 +403,156 @@ def all_mux_status(vm_set): all_mux_status[bridge] = get_mux_status(vm_set, port_index) return jsonify(all_mux_status) except Exception as e: - return jsonify({'err_msg': 'Get all mux status failed, vm_set: {}, exception: {}'.format(vm_set, repr(e))}), 500 + err_msg = 'Get all mux status failed, vm_set: {}, exception: {}'.format(vm_set, repr(e)) + app.logger.error('{} {} {}'.format(request.method, request.url, err_msg)) + return jsonify({'err_msg': err_msg}), 500 + + +def _validate_out_ports(data): + """Validate the posted data for updating flow action. + + Args: + data (dict): Posted json data. Expected: + {"out_ports": [, , ...]} + where could be "nic", "tor_a" or "tor_b". + + Returns: + tuple: Return the result in a tuple. The first item is either True or False. The second item is extra message. + """ + supported_out_ports = ['nic', 'tor_a', 'tor_b'] + try: + assert 'out_ports' in data, 'Missing "out_ports" field' + for port in data['out_ports']: + assert port in supported_out_ports, 'Unsupported port: "{}", supported: {}'.format(port, supported_out_ports) + return True, '' + except Exception as e: + return False, 'Validate out_ports {} failed with exception: {}'.format(json.dumps(data), repr(e)) + + +def update_flow_action_to_nic(mux_status, action): + """Update the action for the flow to "nic". + + Args: + mux_status (dict): Current mux status. + action (string): The action to be applied to flow. Either "output" or "drop". + + Returns: + dict: The new mux status. + """ + in_port = mux_status['active_port'] + out_port = mux_status['ports']['nic'] if action == 'output' else None + action_desc = '{}:"{}"'.format(action, out_port) if out_port else action + cmdline = 'ovs-ofctl --name mod-flows {} \'in_port="{}" actions={}\''.format( + mux_status['bridge'], + in_port, + action_desc) + run_cmd(cmdline) + flow = {in_port: [{'action': action, 'out_port': out_port}]} + mux_status['flows'].update(flow) + return mux_status + + +def update_flow_action_to_tor(mux_status, action, tor_ports): + """Update the action for the flow to "tor_a" and/or "tor_b". + + Args: + mux_status (dict): Current mux status. + action (string): The action to be applied to flow. Either "output" or "drop". + tor_ports (list): A list like ["tor_a", "tor_b"]. + + Returns: + dict: The new mux status. + """ + nic_port = mux_status['ports']['nic'] # muxy-- + old_output_tor_ports = set([item['out_port'] \ + for item in mux_status['flows'][nic_port] if item['action'] == 'output']) + + update_output_tor_ports = set(mux_status['ports'][tor_port] for tor_port in tor_ports) + + if action == 'output': + output_tor_ports = old_output_tor_ports.union(update_output_tor_ports) + else: + output_tor_ports = old_output_tor_ports - update_output_tor_ports + + if output_tor_ports == old_output_tor_ports: # No need to update + return mux_status + + if len(output_tor_ports) == 0: + action_desc='drop' + else: + action_desc=','.join(['output:"{}"'.format(port) for port in output_tor_ports]) + + cmdline = 'ovs-ofctl --name mod-flows {} \'in_port="{}" actions={}\''.format( + mux_status['bridge'], + nic_port, + action_desc) + run_cmd(cmdline) + flow = {nic_port: [{'action': 'output', 'out_port': port} for port in output_tor_ports]} + mux_status['flows'].update(flow) + return mux_status + + +def update_flow_action(vm_set, port_index, action, data): + """Update action of flows. + + Args: + vm_set (string): The vm_set of test setup. Parsed by flask from request URL. + port_index (string): Index of the port. Parsed by flask from request URL. + action (string): The action to be applied to flow. Either "output" or "drop". + data (dict): Posted json data. Expected: + {"out_ports": [, , ...]} + where could be "nic", "tor_a" or "tor_b". + + Returns: + dict: The new mux status. + """ + mux_status = get_mux_status(vm_set, port_index) + tor_ports = [] + for out_port in data['out_ports']: + if out_port == 'nic': + mux_status = update_flow_action_to_nic(mux_status, action) + elif out_port == 'tor_a' or out_port == 'tor_b': + tor_ports.append(out_port) + if tor_ports: + mux_status = update_flow_action_to_tor(mux_status, action, tor_ports) + return mux_status + + +@app.route('/mux///', methods=['POST']) +def mux_cable_flow_update(vm_set, port_index, action): + """Handler for changing flow action. + + Args: + vm_set (string): The vm_set of test setup. Parsed by flask from request URL. + port_index (string): Index of the port. Parsed by flask from request URL. + action (string): The action to be applied to flow. Either "output" or "drop". + + Returns: + object: Return a flask response object. + """ + if action not in ["output", "drop"]: + err_msg = 'In "/mux///", action must be "output" or "drop".' + app.logger.error('{} {} {}'.format(request.method, request.url, err_msg)) + return jsonify({'err_msg': err_msg}), 404 + + valid, msg = _validate_param(vm_set, port_index) + if not valid: + app.logger.error('{} {} {}'.format(request.method, request.url, msg)) + return jsonify({'err_msg': msg}), 400 + + data = request.get_json() + valid, msg = _validate_out_ports(data) + if not valid: + app.logger.error('{} {} {}'.format(request.method, request.url, msg)) + return jsonify({'err_msg': msg}), 400 + + try: + mux_status = update_flow_action(vm_set, port_index, action, data) + return jsonify(mux_status) + except Exception as e: + err_msg = 'Update flow action failed: {}'.format(repr(e)) + app.logger.error('{} {} {}'.format(request.method, request.url, err_msg)) + return jsonify({'err_msg': err_msg}), 500 if __name__ == '__main__': @@ -392,8 +560,12 @@ def all_mux_status(vm_set): Start mux simulator server at specified port. $ sudo python ''' + if '-v' in sys.argv: + app.logger.setLevel(logging.DEBUG) + if len(sys.argv) < 2: - print(usage) + app.logger.error(usage) sys.exit(1) - print('Mux simulator listening at port {}'.format(sys.argv[1])) + + app.logger.info('Mux simulator listening at port {}'.format(sys.argv[1])) app.run(host='0.0.0.0', port=sys.argv[1])