From ac5d6f641211ca618c3a4c75d737b9bb1d311776 Mon Sep 17 00:00:00 2001 From: Longxiang Lyu Date: Mon, 28 Mar 2022 10:25:00 +0000 Subject: [PATCH 1/6] [dualtor] Add `nic_simulator` Signed-off-by: Longxiang Lyu --- .../dualtor/nic_simulator/nic_simulator.py | 615 ++++++++++++++++++ .../nic_simulator/nic_simulator_client.py | 86 +++ .../nic_simulator_grpc_service.proto | 28 + .../nic_simulator_grpc_service_pb2.py | 67 ++ .../nic_simulator_grpc_service_pb2_grpc.py | 132 ++++ 5 files changed, 928 insertions(+) create mode 100644 ansible/dualtor/nic_simulator/nic_simulator.py create mode 100644 ansible/dualtor/nic_simulator/nic_simulator_client.py create mode 100644 ansible/dualtor/nic_simulator/nic_simulator_grpc_service.proto create mode 100644 ansible/dualtor/nic_simulator/nic_simulator_grpc_service_pb2.py create mode 100644 ansible/dualtor/nic_simulator/nic_simulator_grpc_service_pb2_grpc.py diff --git a/ansible/dualtor/nic_simulator/nic_simulator.py b/ansible/dualtor/nic_simulator/nic_simulator.py new file mode 100644 index 00000000000..d559c671780 --- /dev/null +++ b/ansible/dualtor/nic_simulator/nic_simulator.py @@ -0,0 +1,615 @@ +#!/usr/bin/env python3 +import abc +import argparse +import contextlib +import functools +import json +import logging +import fcntl +import functools +import grpc +import os +import re +import socket +import sys +import struct +import subprocess +import threading + +from concurrent import futures +from logging.handlers import RotatingFileHandler + +import nic_simulator_grpc_service_pb2 +import nic_simulator_grpc_service_pb2_grpc + + +# name templates +ACTIVE_ACTIVE_BRIDGE_TEMPLATE = "baa-%s-%d" +NETNS_IFACE_TEMPLATE = "eth%s" +NETNS_IFACE_PATTERN = "eth\d+" +ACTIVE_ACTIVE_INTERFACES_TEMPLATE = "iaa-%s-%d" +ACTIVE_ACTIVE_INTERFACE_PATTERN = "iaa-[\w-]+-\d+" +SERVER_NIC_INTERFACE_TEMPLATE = "nic-%s-%d" +SERVER_NIC_INTERFACE_PATTERN = "nic-[\w-]+-\d+" + + +def get_ip_address(ifname): + """Get interface IP address.""" + ifname = ifname.encode() + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + addr = socket.inet_ntoa(fcntl.ioctl( + s.fileno(), + 0x8915, # SIOCGIFADDR + struct.pack('256s', ifname[:15]) + )[20:24]) + except OSError: + addr = None + return addr + + +def run_command(cmd, check=True): + """Run a command.""" + logging.debug("COMMAND: %s", cmd) + result = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=True, + check=check + ) + result.stdout = result.stdout.decode() + result.stderr = result.stderr.decode() + logging.debug("COMMAND STDOUT: %s\n", result.stdout) + logging.debug("COMMAND_STDERR: %s\n", result.stderr) + return result + + +class OVSCommand(object): + """OVS related commands.""" + + OVS_VSCTL_LIST_BR_CMD = "ovs-vsctl list-br" + OVS_VSCTL_LIST_PORTS = "ovs-vsctl list-ports {bridge_name}" + OVS_OFCTL_DEL_FLOWS = "ovs-ofctl del-flows {bridge_name}" + OVS_OFCTL_ADD_FLOWS = "ovs-ofctl add-flow {bridge_name} {flow}" + OVS_OFCTL_DEL_GROUPS = "ovs-ofctl -O OpenFlow13 del-groups {bridge_name}" + OVS_OFCTL_ADD_GROUP = "ovs-ofctl -O OpenFlow13 add-group {bridge_name} {group}" + OVS_OFCTL_MOD_GROUP = "ovs-ofctl -O OpenFlow13 mod-group {bridge_name} {group}" + + @staticmethod + def ovs_vsctl_list_br(): + return run_command(OVSCommand.OVS_VSCTL_LIST_BR_CMD) + + @staticmethod + def ovs_vsctl_list_ports(bridge_name): + return run_command(OVSCommand.OVS_VSCTL_LIST_PORTS.format(bridge_name=bridge_name)) + + @staticmethod + def ovs_ofctl_del_flows(bridge_name): + return run_command(OVSCommand.OVS_OFCTL_DEL_FLOWS.format(bridge_name=bridge_name)) + + @staticmethod + def ovs_ofctl_add_flow(bridge_name, flow): + return run_command(OVSCommand.OVS_OFCTL_ADD_FLOWS.format(bridge_name=bridge_name, flow=flow)) + + @staticmethod + def ovs_ofctl_add_group(bridge_name, group): + return run_command(OVSCommand.OVS_OFCTL_ADD_GROUP.format(bridge_name=bridge_name, group=group)) + + @staticmethod + def ovs_ofctl_del_groups(bridge_name): + return run_command(OVSCommand.OVS_OFCTL_DEL_GROUPS.format(bridge_name=bridge_name)) + + @staticmethod + def ovs_ofctl_mod_groups(bridge_name, group): + return run_command(OVSCommand.OVS_OFCTL_MOD_GROUP.format(bridge_name=bridge_name, group=group)) + + +class StrObj(abc.ABC): + """Abstract class defines objects that could be represented as a string.""" + + __slots__ = ("_str",) + + @abc.abstractmethod + def to_string(): + pass + + def reinit(self): + """Re-initialize object string representation.""" + with contextlib.suppress(AttributeError): + del self._str + + def __str__(self): + if not hasattr(self, "_str"): + self._str = self.to_string() + return self._str + + def __repr__(self): + return self.__str__() + + +class OVSGroup(StrObj): + """Object to represent an OVS group.""" + + __slots__ = ("group_id", "group_type", "output_ports", "_str_prefix") + + def __init__(self, group_id, group_type, output_ports=[]): + self.group_id = group_id + self.group_type = group_type + self.output_ports = set(output_ports) + self._str_prefix = "group_id=%s,type=%s" % (self.group_id, self.group_type) + + def to_string(self): + group_parts = [self._str_prefix] + if self.output_ports: + group_parts.extend("bucket=output:%s" % _ for _ in self.output_ports) + else: + group_parts.append("bucket=drop") + return ",".join(group_parts) + + +class OVSFlow(StrObj): + """Object to represent an OVS flow.""" + + __slots__ = ("in_port", "output_ports", "group", "_str_prefix") + + def __init__(self, in_port, packet_filter=None, output_ports=[], group=None): + self.in_port = in_port + self.packet_filter = packet_filter + self.output_ports = set(output_ports) + self.group = group + if self.packet_filter: + self._str_prefix = "%s,in_port=%s" % (self.packet_filter, self.in_port) + else: + self._str_prefix = "in_port=%s" % (self.in_port) + + def to_string(self): + flow_parts = [self._str_prefix] + if self.output_ports: + flow_parts.append("actions=") + flow_parts.extend("output:%s" % _ for _ in self.output_ports) + elif self.group: + flow_parts.append("actions=group:%s" % self.group.group_id) + else: + flow_parts.append("actions=drop") + return ",".join(flow_parts) + + +class ToRState(object): + """ToR's admin forwarding state""" + STANDBY = False + ACTIVE = True + STATE_LABELS = { + STANDBY: "STANDBY", + ACTIVE: "ACTIVE" + } + + +class UpstreamECMPGroup(OVSGroup): + """Object to represent a OVS group that selects active tor ports to send packets.""" + + __slots__ = ( + "upper_tor_port", + "lower_tor_port", + "upper_tor_forwarding_state", + "lower_tor_forwarding_state", + "group_str_cache" + ) + + def __init__( + self, group_id, upper_tor_port, lower_tor_port, + upper_tor_forwarding_state=ToRState.ACTIVE, + lower_tor_forwarding_state=ToRState.ACTIVE + ): + output_ports = [] + if upper_tor_forwarding_state == ToRState.ACTIVE: + output_ports.append(upper_tor_port) + if lower_tor_forwarding_state == ToRState.ACTIVE: + output_ports.append(lower_tor_port) + super(UpstreamECMPGroup, self).__init__(group_id, "select", output_ports=output_ports) + self.upper_tor_port = upper_tor_port + self.lower_tor_port = lower_tor_port + self.upper_tor_forwarding_state = upper_tor_forwarding_state + self.lower_tor_forwarding_state = lower_tor_forwarding_state + self.group_str_cache = {} + + def set_upper_tor_forwarding_state(self, state): + if state == ToRState.ACTIVE: + if self.upper_tor_forwarding_state == ToRState.STANDBY: + self.output_ports.add(self.upper_tor_port) + self.upper_tor_forwarding_state = ToRState.ACTIVE + self.reinit() + elif state == ToRState.STANDBY: + if self.upper_tor_forwarding_state == ToRState.ACTIVE: + self.output_ports.remove(self.upper_tor_port) + self.upper_tor_forwarding_state = ToRState.STANDBY + self.reinit() + + def set_lower_tor_forwarding_state(self, state): + if state == ToRState.ACTIVE: + if self.lower_tor_forwarding_state == ToRState.STANDBY: + self.output_ports.add(self.lower_tor_port) + self.lower_tor_forwarding_state = ToRState.ACTIVE + self.reinit() + elif state == ToRState.STANDBY: + if self.lower_tor_forwarding_state == ToRState.ACTIVE: + self.output_ports.remove(self.lower_tor_port) + self.lower_tor_forwarding_state = ToRState.STANDBY + self.reinit() + + def __str__(self): + return self.group_str_cache.setdefault( + (self.upper_tor_forwarding_state, self.lower_tor_forwarding_state), + super(UpstreamECMPGroup, self).__str__() + ) + + +class UpstreamECMPFlow(OVSFlow): + """Object to represent an upstream ECMP flow that selects one of its output ports to send packets.""" + + __slots__ = ("upper_tor_port", "lower_tor_port", "upper_tor_forwarding_state", "lower_tor_forwarding_state") + + def __init__(self, in_port, group): + super(UpstreamECMPFlow, self).__init__(in_port, group=group) + + def set_upper_tor_forwarding_state(self, state): + self.group.set_upper_tor_forwarding_state(state) + + def set_lower_tor_forwarding_state(self, state): + self.group.set_lower_tor_forwarding_state(state) + + def get_upper_tor_forwarding_state(self): + return self.group.upper_tor_forwarding_state + + def get_lower_tor_forwarding_state(self): + return self.group.lower_tor_forwarding_state + + +class OVSBridge(object): + """ + Object to represent the OVS bridge for the active-active port testbed setup. + + +--------------+ + PTF (host_if) --+ +----- upper_if + | OVS bridge | + simulator netns (NiC) --+ +----- lower_if + +--------------+ + """ + + __slots__ = ( + "bridge_name", + "ports", + "lower_tor_port", + "upper_tor_port", + "server_nic", + "ptf_port", + "lock", + "flows", + "groups", + "upstream_ecmp_flow", + "upstream_ecmp_group" + ) + + def __init__(self, bridge_name): + self.bridge_name = bridge_name + self.lock = threading.RLock() + self.ports = None + self.lower_tor_port = None + self.upper_tor_port = None + self.server_nic = None + self.ptf_port = None + self.upstream_ecmp_flow = None + self.upstream_ecmp_group = None + self.flows = [] + self.groups = [] + self._init_ports() + self._init_flows() + + def _init_ports(self): + self.ports = self._get_ports() + if len(self.ports) != 4: + raise ValueError("Unhealthy bridge: %s, ports: %s" % (self.bridge_name, self.ports)) + tor_ports = [] + for port in self.ports: + if re.search(ACTIVE_ACTIVE_INTERFACE_PATTERN, port): + self.ptf_port = port + elif re.search(SERVER_NIC_INTERFACE_PATTERN, port): + self.server_nic = port + else: + tor_ports.append(port) + if len(tor_ports) != 2: + raise ValueError("Unhealthy bridge: %s, could not parse existing ports: %s" % (self.bridge_name, self.ports)) + tor_ports.sort() + self.upper_tor_port = tor_ports[0] + self.lower_tor_port = tor_ports[1] + logging.info( + "Init ports for bridge %s, server_nic: %s, ptf_port: %s, upper_tor_port: %s, lower_tor_port: %s", + self.bridge_name, + self.server_nic, + self.ptf_port, + self.upper_tor_port, + self.lower_tor_port + ) + + def _init_flows(self): + logging.info("Init flows for bridge %s", self.bridge_name) + self._del_flows() + self._del_groups() + # downstream flows + self._add_flow(self.upper_tor_port, output_ports=[self.ptf_port, self.server_nic]) + self._add_flow(self.lower_tor_port, output_ports=[self.ptf_port, self.server_nic]) + + # upstream flows + # upstream packet from server NiC should be directed to both ToRs + self._add_flow(self.server_nic, output_ports=[self.upper_tor_port, self.lower_tor_port]) + # upstream icmp packet from ptf port should be directed to both ToRs + self._add_flow(self.ptf_port, packet_filter="icmp", output_ports=[self.upper_tor_port, self.lower_tor_port]) + # upstream packet from ptf port should be ECMP directed to active ToRs + self.upstream_ecmp_group = self._add_upstream_ecmp_group(1, self.upper_tor_port, self.lower_tor_port) + self.upstream_ecmp_flow = self._add_upstream_ecmp_flow(self.ptf_port, self.upstream_ecmp_group) + + def _get_ports(self): + result = OVSCommand.ovs_vsctl_list_ports(self.bridge_name) + return result.stdout.split() + + def _del_flows(self): + OVSCommand.ovs_ofctl_del_flows(self.bridge_name) + self.upstream_ecmp_flow = None + self.flows.clear() + + def _del_groups(self): + OVSCommand.ovs_ofctl_del_groups(self.bridge_name) + self.upstream_ecmp_group = None + self.groups.clear() + + def _add_flow(self, in_port, packet_filter=None, output_ports=[], group=None): + flow = OVSFlow(in_port, packet_filter=packet_filter, output_ports=output_ports, group=group) + logging.info("Add flow to bridge %s: %s", self.bridge_name, flow) + OVSCommand.ovs_ofctl_add_flow(self.bridge_name, flow) + self.flows.append(flow) + return flow + + def _add_upstream_ecmp_group(self, group_id, upper_tor_port, lower_tor_port): + group = UpstreamECMPGroup(group_id, upper_tor_port, lower_tor_port) + logging.info("Add upstream ecmp group to bridge %s: %s", self.bridge_name, group) + OVSCommand.ovs_ofctl_add_group(self.bridge_name, group) + self.groups.append(group) + return group + + def _add_upstream_ecmp_flow(self, in_port, group): + flow = UpstreamECMPFlow(in_port, group) + logging.info("Add upstream ecmp flow to bridge %s: %s", self.bridge_name, flow) + OVSCommand.ovs_ofctl_add_flow(self.bridge_name, flow) + self.flows.append(flow) + return flow + + def set_forwarding_state(self, states): + with self.lock: + logging.info("Set bridge %s forwarding state: %s", self.bridge_name, tuple(ToRState.STATE_LABELS[_] for _ in states)) + self.upstream_ecmp_flow.set_upper_tor_forwarding_state(states[0]) + self.upstream_ecmp_flow.set_lower_tor_forwarding_state(states[1]) + OVSCommand.ovs_ofctl_mod_groups(self.bridge_name, self.upstream_ecmp_group) + return self.query_forwarding_state() + + def query_forwarding_state(self): + with self.lock: + states = (self.upstream_ecmp_flow.get_upper_tor_forwarding_state(), self.upstream_ecmp_flow.get_lower_tor_forwarding_state()) + logging.info("Query bridge %s forwarding state: %s", self.bridge_name, tuple(ToRState.STATE_LABELS[_] for _ in states)) + return states + + +def validate_request_target(response): + """Decorator to validate target gRPC server address is included in request metadata.""" + def _validate_request_target(rpc_func): + @functools.wraps(rpc_func) + def _decorated(nic_simulator, request, context): + logging.debug("Validate request metadata includes 'grpc_server'") + grpc_server = None + for meta in context.invocation_metadata(): + if meta.key == "grpc_server": + grpc_server = meta.value + break + if not grpc_server: + context.set_code(grpc.StatusCode.NOT_FOUND) + context.set_details("grpc_server metadata not found in the request") + return response + elif grpc_server not in nic_simulator.ovs_bridges: + context.set_code(grpc.StatusCode.NOT_FOUND) + context.set_details("grpc_server not found by nic_simulator") + else: + context.grpc_server = grpc_server + return rpc_func(nic_simulator, request, context) + return _decorated + return _validate_request_target + + +def validate_request_certificate(response): + """Decorator to validate client certificate.""" + def _validate_request_certificate(rpc_func): + @functools.wraps(rpc_func) + def _decorated(nic_simulator, request, context): + logging.debug("Validate client certificate") + # TODO: Add client authentication + return rpc_func(nic_simulator, request, context) + return _decorated + return _validate_request_certificate + + +class NiCSimulator(nic_simulator_grpc_service_pb2_grpc.DualTorServiceServicer): + """NiC simulator class, define all the gRPC calls.""" + + def __init__(self, vm_set): + self.vm_set = vm_set + self.server_nics = self._find_all_server_nics() + self.server_nic_addresses = {nic: get_ip_address(nic) for nic in self.server_nics} + self.ovs_bridges = {} + for bridge_name in self._find_all_bridges(): + index = bridge_name.split("-")[-1] + server_nic = NETNS_IFACE_TEMPLATE % index + # only manipulate active server nics + if server_nic in self.server_nic_addresses: + server_nic_addr = self.server_nic_addresses[server_nic] + if server_nic_addr is not None: + self.ovs_bridges[server_nic_addr] = OVSBridge(bridge_name) + logging.info("Starting NiC simulator that receives for requests to: %s", json.dumps(list(self.ovs_bridges.keys()), indent=4)) + + def _find_all_server_nics(self): + return [_ for _ in os.listdir('/sys/class/net') if re.search(NETNS_IFACE_PATTERN, _)] + + def _find_all_bridges(self): + result = OVSCommand.ovs_vsctl_list_br() + bridges = [_ for _ in result.stdout.split() if self.vm_set in _ and _.startswith(ACTIVE_ACTIVE_INTERFACE_PATTERN[0])] + return bridges + + def _find_target_server(self, context): + for meta in context.invocation_metadata(): + if meta.key == "grpc_server": + return meta.value + return None + + def _validate_client(self, context): + return True + + def _generate_error_response(context, status_code, details): + context.set_code(status_code) + context.set_details(details) + + def _init_admin_response(self): + return nic_simulator_grpc_service_pb2.AdminReply( + portid=[0, 1], + state=[False, False] + ) + + def _init_operation_reponse(self): + return nic_simulator_grpc_service_pb2.OperationReply( + portid=[0, 1], + state=[False, False] + ) + + @validate_request_target(nic_simulator_grpc_service_pb2.AdminReply()) + @validate_request_certificate(nic_simulator_grpc_service_pb2.AdminReply()) + def QueryAdminPortState(self, request, context): + target_server = context.grpc_server + response = nic_simulator_grpc_service_pb2.AdminReply( + portid=[0, 1], + state=self.ovs_bridges[target_server].query_forwarding_state() + ) + logging.debug("QueryAdminPortState: response to client %s:\n%s", context.peer(), response) + return response + + @validate_request_target(nic_simulator_grpc_service_pb2.AdminReply()) + @validate_request_certificate(nic_simulator_grpc_service_pb2.AdminReply()) + def SetAdminPortState(self, request, context): + target_server = context.grpc_server + response = nic_simulator_grpc_service_pb2.AdminReply( + portid=[0, 1], + state=self.ovs_bridges[target_server].set_forwarding_state(request.state) + ) + return response + + def QueryOperationPortState(self, request, context): + # TODO: Add QueryOperationPortState implementation + return nic_simulator_grpc_service_pb2.OperationReply() + + +def parse_args(): + parser = argparse.ArgumentParser( + description="NiC simulator" + ) + parser.add_argument( + "-p", + "--port", + type=int, + required=True, + help="the port to listen to" + ) + parser.add_argument( + "-v", + "--vm_set", + required=True, + help="the vm_set to identify testbed" + ) + parser.add_argument( + "-l", + "--log_level", + default="info", + choices=["critical", "error", "warning", "info", "debug"], + help="the logging level" + ) + parser.add_argument( + "-s", + "--stdout_log", + default=False, + action="store_true", + help="Redirect log to stdout" + ) + args = parser.parse_args() + return args + + +def config_logging(vm_set, log_level, log_to_stdout=False): + """ + Configure log to rotating file + + Remove the default handler from app.logger. + Add RotatingFileHandler to the app.logger. + File size: 10MB + File number: 3 + The Werkzeug handler is untouched. + """ + log_format = "%(asctime)s %(funcName)-20.20s %(levelname)-5.5s #%(lineno)-.4d| %(message)s" + root = logging.getLogger() + root.handlers.clear() + handler = RotatingFileHandler( + "/tmp/nic_simulator_{}.log".format(vm_set), + maxBytes=10*1024*1024, # 10MB + backupCount=3) + fmt = logging.Formatter(log_format) + handler.setFormatter(fmt) + handler.setLevel(log_level) + root = logging.getLogger() + root.setLevel(log_level) + root.addHandler(handler) + + if log_to_stdout: + handler = logging.StreamHandler(sys.stdout) + handler.setLevel(log_level) + formatter = logging.Formatter(log_format) + handler.setFormatter(formatter) + root.addHandler(handler) + + +def config_env(): + """Config environment variables.""" + # NOTE: https://github.com/grpc/grpc/issues/14056 + os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "0" + + +def main(): + title = \ + """ + _ _ _____ _____ _____ _____ __ __ _ _ _ _______ ____ _____ + | \ | |_ _/ ____| / ____|_ _| \/ | | | | | /\|__ __/ __ \| __ \ + | \| | | || | | (___ | | | \ / | | | | | / \ | | | | | | |__) | + | . ` | | || | \___ \ | | | |\/| | | | | | / /\ \ | | | | | | _ / + | |\ |_| || |____ ____) |_| |_| | | | |__| | |____ / ____ \| | | |__| | | \ \ + |_| \_|_____\_____| |_____/|_____|_| |_|\____/|______/_/ \_\_| \____/|_| \_\ + + """ + print(title) + args = parse_args() + logging.debug("Start nic_simulator with args: %s", args) + config_env() + config_logging(args.vm_set, args.log_level.upper(), args.stdout_log) + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + nic_simulator_grpc_service_pb2_grpc.add_DualTorServiceServicer_to_server( + NiCSimulator(args.vm_set), + server + ) + server.add_insecure_port("0.0.0.0:%s" % args.port) + server.start() + server.wait_for_termination() + + +if __name__ == "__main__": + main() diff --git a/ansible/dualtor/nic_simulator/nic_simulator_client.py b/ansible/dualtor/nic_simulator/nic_simulator_client.py new file mode 100644 index 00000000000..e678eaa1e26 --- /dev/null +++ b/ansible/dualtor/nic_simulator/nic_simulator_client.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python3 +import argparse +import grpc + +from collections import namedtuple + +import nic_simulator_grpc_service_pb2 +import nic_simulator_grpc_service_pb2_grpc + + +class MetadataInterceptor(grpc.UnaryUnaryClientInterceptor): + + class _ClientCallDetails( + namedtuple( + '_ClientCallDetails', + ('method', 'timeout', 'metadata', 'credentials')), + grpc.ClientCallDetails): + """Wrapper class for initializing a new ClientCallDetails instance. + """ + pass + + def __init__(self, injected_meta): + self.injected_meta = injected_meta + + def intercept_unary_unary(self, continuation, client_call_details, request): + + if client_call_details.metadata is None: + metadata = [] + else: + metadata = list(client_call_details.metadata) + + metadata.append(self.injected_meta) + + client_call_details = self._ClientCallDetails( + client_call_details.method, + client_call_details.timeout, + metadata, + client_call_details.credentials + ) + return continuation(client_call_details, request) + + +def parse_args(): + parser = argparse.ArgumentParser( + description="NiC simulator client" + ) + parser.add_argument( + "-s", + "--server", + required=True, + help="gRPC server address" + ) + parser.add_argument( + "-p", + "--server_port", + required=True, + help="gRPC server port" + ) + return parser.parse_args() + + +def main(): + args = parse_args() + server = args.server + port = args.server_port + with grpc.insecure_channel("%s:%s" % (server, port)) as insecure_channel: + metadata_interceptor = MetadataInterceptor(("grpc_server", "192.168.0.101")) + with grpc.intercept_channel(insecure_channel, metadata_interceptor) as channel: + stub = nic_simulator_grpc_service_pb2_grpc.DualTorServiceStub(channel) + state = nic_simulator_grpc_service_pb2.AdminRequest( + portid=[0, 1], + state=[True, True] + ) + state = stub.QueryAdminPortState(state) + print(state) + + state = nic_simulator_grpc_service_pb2.AdminRequest( + portid=[0, 1], + state=[True, False] + ) + state = stub.SetAdminPortState(state) + print(state) + + +if __name__ == "__main__": + main() diff --git a/ansible/dualtor/nic_simulator/nic_simulator_grpc_service.proto b/ansible/dualtor/nic_simulator/nic_simulator_grpc_service.proto new file mode 100644 index 00000000000..1e2cd4d94a0 --- /dev/null +++ b/ansible/dualtor/nic_simulator/nic_simulator_grpc_service.proto @@ -0,0 +1,28 @@ +syntax = "proto3"; + +service DualTorService { + rpc QueryAdminPortState(AdminRequest) returns (AdminReply) {} + + rpc SetAdminPortState(AdminRequest) returns (AdminReply) {} + + rpc QueryOperationPortState(OperationRequest) returns (OperationReply) {} +} + +message AdminRequest { + repeated int32 portid = 1; + repeated bool state = 2; +} + +message AdminReply { + repeated int32 portid = 1; + repeated bool state = 2; +} + +message OperationRequest { + repeated int32 portid = 1; +} + +message OperationReply { + repeated int32 portid = 1; + repeated bool state = 2; +} diff --git a/ansible/dualtor/nic_simulator/nic_simulator_grpc_service_pb2.py b/ansible/dualtor/nic_simulator/nic_simulator_grpc_service_pb2.py new file mode 100644 index 00000000000..2de018c4411 --- /dev/null +++ b/ansible/dualtor/nic_simulator/nic_simulator_grpc_service_pb2.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: nic_simulator_grpc_service.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n nic_simulator_grpc_service.proto\"-\n\x0c\x41\x64minRequest\x12\x0e\n\x06portid\x18\x01 \x03(\x05\x12\r\n\x05state\x18\x02 \x03(\x08\"+\n\nAdminReply\x12\x0e\n\x06portid\x18\x01 \x03(\x05\x12\r\n\x05state\x18\x02 \x03(\x08\"\"\n\x10OperationRequest\x12\x0e\n\x06portid\x18\x01 \x03(\x05\"/\n\x0eOperationReply\x12\x0e\n\x06portid\x18\x01 \x03(\x05\x12\r\n\x05state\x18\x02 \x03(\x08\x32\xb9\x01\n\x0e\x44ualTorService\x12\x33\n\x13QueryAdminPortState\x12\r.AdminRequest\x1a\x0b.AdminReply\"\x00\x12\x31\n\x11SetAdminPortState\x12\r.AdminRequest\x1a\x0b.AdminReply\"\x00\x12?\n\x17QueryOperationPortState\x12\x11.OperationRequest\x1a\x0f.OperationReply\"\x00\x62\x06proto3') + + + +_ADMINREQUEST = DESCRIPTOR.message_types_by_name['AdminRequest'] +_ADMINREPLY = DESCRIPTOR.message_types_by_name['AdminReply'] +_OPERATIONREQUEST = DESCRIPTOR.message_types_by_name['OperationRequest'] +_OPERATIONREPLY = DESCRIPTOR.message_types_by_name['OperationReply'] +AdminRequest = _reflection.GeneratedProtocolMessageType('AdminRequest', (_message.Message,), { + 'DESCRIPTOR' : _ADMINREQUEST, + '__module__' : 'nic_simulator_grpc_service_pb2' + # @@protoc_insertion_point(class_scope:AdminRequest) + }) +_sym_db.RegisterMessage(AdminRequest) + +AdminReply = _reflection.GeneratedProtocolMessageType('AdminReply', (_message.Message,), { + 'DESCRIPTOR' : _ADMINREPLY, + '__module__' : 'nic_simulator_grpc_service_pb2' + # @@protoc_insertion_point(class_scope:AdminReply) + }) +_sym_db.RegisterMessage(AdminReply) + +OperationRequest = _reflection.GeneratedProtocolMessageType('OperationRequest', (_message.Message,), { + 'DESCRIPTOR' : _OPERATIONREQUEST, + '__module__' : 'nic_simulator_grpc_service_pb2' + # @@protoc_insertion_point(class_scope:OperationRequest) + }) +_sym_db.RegisterMessage(OperationRequest) + +OperationReply = _reflection.GeneratedProtocolMessageType('OperationReply', (_message.Message,), { + 'DESCRIPTOR' : _OPERATIONREPLY, + '__module__' : 'nic_simulator_grpc_service_pb2' + # @@protoc_insertion_point(class_scope:OperationReply) + }) +_sym_db.RegisterMessage(OperationReply) + +_DUALTORSERVICE = DESCRIPTOR.services_by_name['DualTorService'] +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _ADMINREQUEST._serialized_start=36 + _ADMINREQUEST._serialized_end=81 + _ADMINREPLY._serialized_start=83 + _ADMINREPLY._serialized_end=126 + _OPERATIONREQUEST._serialized_start=128 + _OPERATIONREQUEST._serialized_end=162 + _OPERATIONREPLY._serialized_start=164 + _OPERATIONREPLY._serialized_end=211 + _DUALTORSERVICE._serialized_start=214 + _DUALTORSERVICE._serialized_end=399 +# @@protoc_insertion_point(module_scope) diff --git a/ansible/dualtor/nic_simulator/nic_simulator_grpc_service_pb2_grpc.py b/ansible/dualtor/nic_simulator/nic_simulator_grpc_service_pb2_grpc.py new file mode 100644 index 00000000000..19eb8e7443c --- /dev/null +++ b/ansible/dualtor/nic_simulator/nic_simulator_grpc_service_pb2_grpc.py @@ -0,0 +1,132 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +import nic_simulator_grpc_service_pb2 as nic__simulator__grpc__service__pb2 + + +class DualTorServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.QueryAdminPortState = channel.unary_unary( + '/DualTorService/QueryAdminPortState', + request_serializer=nic__simulator__grpc__service__pb2.AdminRequest.SerializeToString, + response_deserializer=nic__simulator__grpc__service__pb2.AdminReply.FromString, + ) + self.SetAdminPortState = channel.unary_unary( + '/DualTorService/SetAdminPortState', + request_serializer=nic__simulator__grpc__service__pb2.AdminRequest.SerializeToString, + response_deserializer=nic__simulator__grpc__service__pb2.AdminReply.FromString, + ) + self.QueryOperationPortState = channel.unary_unary( + '/DualTorService/QueryOperationPortState', + request_serializer=nic__simulator__grpc__service__pb2.OperationRequest.SerializeToString, + response_deserializer=nic__simulator__grpc__service__pb2.OperationReply.FromString, + ) + + +class DualTorServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + def QueryAdminPortState(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def SetAdminPortState(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def QueryOperationPortState(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_DualTorServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'QueryAdminPortState': grpc.unary_unary_rpc_method_handler( + servicer.QueryAdminPortState, + request_deserializer=nic__simulator__grpc__service__pb2.AdminRequest.FromString, + response_serializer=nic__simulator__grpc__service__pb2.AdminReply.SerializeToString, + ), + 'SetAdminPortState': grpc.unary_unary_rpc_method_handler( + servicer.SetAdminPortState, + request_deserializer=nic__simulator__grpc__service__pb2.AdminRequest.FromString, + response_serializer=nic__simulator__grpc__service__pb2.AdminReply.SerializeToString, + ), + 'QueryOperationPortState': grpc.unary_unary_rpc_method_handler( + servicer.QueryOperationPortState, + request_deserializer=nic__simulator__grpc__service__pb2.OperationRequest.FromString, + response_serializer=nic__simulator__grpc__service__pb2.OperationReply.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'DualTorService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. +class DualTorService(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def QueryAdminPortState(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/DualTorService/QueryAdminPortState', + nic__simulator__grpc__service__pb2.AdminRequest.SerializeToString, + nic__simulator__grpc__service__pb2.AdminReply.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def SetAdminPortState(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/DualTorService/SetAdminPortState', + nic__simulator__grpc__service__pb2.AdminRequest.SerializeToString, + nic__simulator__grpc__service__pb2.AdminReply.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def QueryOperationPortState(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/DualTorService/QueryOperationPortState', + nic__simulator__grpc__service__pb2.OperationRequest.SerializeToString, + nic__simulator__grpc__service__pb2.OperationReply.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) From 5ae1b6c3b237aef0c4484c697fefe1d4f6731166 Mon Sep 17 00:00:00 2001 From: Longxiang Lyu Date: Mon, 28 Mar 2022 12:44:40 +0000 Subject: [PATCH 2/6] Improve code Signed-off-by: Longxiang Lyu --- .../dualtor/nic_simulator/nic_simulator.py | 143 ++++++++---------- 1 file changed, 61 insertions(+), 82 deletions(-) diff --git a/ansible/dualtor/nic_simulator/nic_simulator.py b/ansible/dualtor/nic_simulator/nic_simulator.py index d559c671780..4a3859da5c4 100644 --- a/ansible/dualtor/nic_simulator/nic_simulator.py +++ b/ansible/dualtor/nic_simulator/nic_simulator.py @@ -1,13 +1,21 @@ #!/usr/bin/env python3 +""" +_ _ _____ _____ _____ _____ __ __ _ _ _ _______ ____ _____ +| \ | |_ _/ ____| / ____|_ _| \/ | | | | | /\|__ __/ __ \| __ \ +| \| | | || | | (___ | | | \ / | | | | | / \ | | | | | | |__) | +| . ` | | || | \___ \ | | | |\/| | | | | | / /\ \ | | | | | | _ / +| |\ |_| || |____ ____) |_| |_| | | | |__| | |____ / ____ \| | | |__| | | \ \ +|_| \_|_____\_____| |_____/|_____|_| |_|\____/|______/_/ \_\_| \____/|_| \_\ + +""" import abc import argparse import contextlib -import functools -import json -import logging import fcntl import functools import grpc +import json +import logging import os import re import socket @@ -60,8 +68,8 @@ def run_command(cmd, check=True): ) result.stdout = result.stdout.decode() result.stderr = result.stderr.decode() - logging.debug("COMMAND STDOUT: %s\n", result.stdout) - logging.debug("COMMAND_STDERR: %s\n", result.stderr) + logging.debug("COMMAND STDOUT:\n%s\n", result.stdout) + logging.debug("COMMAND STDERR:\n%s\n", result.stderr) return result @@ -69,12 +77,12 @@ class OVSCommand(object): """OVS related commands.""" OVS_VSCTL_LIST_BR_CMD = "ovs-vsctl list-br" - OVS_VSCTL_LIST_PORTS = "ovs-vsctl list-ports {bridge_name}" - OVS_OFCTL_DEL_FLOWS = "ovs-ofctl del-flows {bridge_name}" - OVS_OFCTL_ADD_FLOWS = "ovs-ofctl add-flow {bridge_name} {flow}" - OVS_OFCTL_DEL_GROUPS = "ovs-ofctl -O OpenFlow13 del-groups {bridge_name}" - OVS_OFCTL_ADD_GROUP = "ovs-ofctl -O OpenFlow13 add-group {bridge_name} {group}" - OVS_OFCTL_MOD_GROUP = "ovs-ofctl -O OpenFlow13 mod-group {bridge_name} {group}" + OVS_VSCTL_LIST_PORTS_CMD = "ovs-vsctl list-ports {bridge_name}" + OVS_OFCTL_DEL_FLOWS_CMD = "ovs-ofctl del-flows {bridge_name}" + OVS_OFCTL_ADD_FLOWS_CMD = "ovs-ofctl add-flow {bridge_name} {flow}" + OVS_OFCTL_DEL_GROUPS_CMD = "ovs-ofctl -O OpenFlow13 del-groups {bridge_name}" + OVS_OFCTL_ADD_GROUP_CMD = "ovs-ofctl -O OpenFlow13 add-group {bridge_name} {group}" + OVS_OFCTL_MOD_GROUP_CMD = "ovs-ofctl -O OpenFlow13 mod-group {bridge_name} {group}" @staticmethod def ovs_vsctl_list_br(): @@ -82,27 +90,27 @@ def ovs_vsctl_list_br(): @staticmethod def ovs_vsctl_list_ports(bridge_name): - return run_command(OVSCommand.OVS_VSCTL_LIST_PORTS.format(bridge_name=bridge_name)) + return run_command(OVSCommand.OVS_VSCTL_LIST_PORTS_CMD.format(bridge_name=bridge_name)) @staticmethod def ovs_ofctl_del_flows(bridge_name): - return run_command(OVSCommand.OVS_OFCTL_DEL_FLOWS.format(bridge_name=bridge_name)) + return run_command(OVSCommand.OVS_OFCTL_DEL_FLOWS_CMD.format(bridge_name=bridge_name)) @staticmethod def ovs_ofctl_add_flow(bridge_name, flow): - return run_command(OVSCommand.OVS_OFCTL_ADD_FLOWS.format(bridge_name=bridge_name, flow=flow)) + return run_command(OVSCommand.OVS_OFCTL_ADD_FLOWS_CMD.format(bridge_name=bridge_name, flow=flow)) @staticmethod def ovs_ofctl_add_group(bridge_name, group): - return run_command(OVSCommand.OVS_OFCTL_ADD_GROUP.format(bridge_name=bridge_name, group=group)) + return run_command(OVSCommand.OVS_OFCTL_ADD_GROUP_CMD.format(bridge_name=bridge_name, group=group)) @staticmethod def ovs_ofctl_del_groups(bridge_name): - return run_command(OVSCommand.OVS_OFCTL_DEL_GROUPS.format(bridge_name=bridge_name)) + return run_command(OVSCommand.OVS_OFCTL_DEL_GROUPS_CMD.format(bridge_name=bridge_name)) @staticmethod def ovs_ofctl_mod_groups(bridge_name, group): - return run_command(OVSCommand.OVS_OFCTL_MOD_GROUP.format(bridge_name=bridge_name, group=group)) + return run_command(OVSCommand.OVS_OFCTL_MOD_GROUP_CMD.format(bridge_name=bridge_name, group=group)) class StrObj(abc.ABC): @@ -111,11 +119,11 @@ class StrObj(abc.ABC): __slots__ = ("_str",) @abc.abstractmethod - def to_string(): + def to_string(self): pass - def reinit(self): - """Re-initialize object string representation.""" + def reset(self): + """Reset object string representation.""" with contextlib.suppress(AttributeError): del self._str @@ -175,8 +183,8 @@ def to_string(self): return ",".join(flow_parts) -class ToRState(object): - """ToR's admin forwarding state""" +class ForwardingState(object): + """Forwarding state""" STANDBY = False ACTIVE = True STATE_LABELS = { @@ -198,13 +206,13 @@ class UpstreamECMPGroup(OVSGroup): def __init__( self, group_id, upper_tor_port, lower_tor_port, - upper_tor_forwarding_state=ToRState.ACTIVE, - lower_tor_forwarding_state=ToRState.ACTIVE + upper_tor_forwarding_state=ForwardingState.ACTIVE, + lower_tor_forwarding_state=ForwardingState.ACTIVE ): output_ports = [] - if upper_tor_forwarding_state == ToRState.ACTIVE: + if upper_tor_forwarding_state == ForwardingState.ACTIVE: output_ports.append(upper_tor_port) - if lower_tor_forwarding_state == ToRState.ACTIVE: + if lower_tor_forwarding_state == ForwardingState.ACTIVE: output_ports.append(lower_tor_port) super(UpstreamECMPGroup, self).__init__(group_id, "select", output_ports=output_ports) self.upper_tor_port = upper_tor_port @@ -214,28 +222,28 @@ def __init__( self.group_str_cache = {} def set_upper_tor_forwarding_state(self, state): - if state == ToRState.ACTIVE: - if self.upper_tor_forwarding_state == ToRState.STANDBY: + if state == ForwardingState.ACTIVE: + if self.upper_tor_forwarding_state == ForwardingState.STANDBY: self.output_ports.add(self.upper_tor_port) - self.upper_tor_forwarding_state = ToRState.ACTIVE - self.reinit() - elif state == ToRState.STANDBY: - if self.upper_tor_forwarding_state == ToRState.ACTIVE: + self.upper_tor_forwarding_state = ForwardingState.ACTIVE + self.reset() + elif state == ForwardingState.STANDBY: + if self.upper_tor_forwarding_state == ForwardingState.ACTIVE: self.output_ports.remove(self.upper_tor_port) - self.upper_tor_forwarding_state = ToRState.STANDBY - self.reinit() + self.upper_tor_forwarding_state = ForwardingState.STANDBY + self.reset() def set_lower_tor_forwarding_state(self, state): - if state == ToRState.ACTIVE: - if self.lower_tor_forwarding_state == ToRState.STANDBY: + if state == ForwardingState.ACTIVE: + if self.lower_tor_forwarding_state == ForwardingState.STANDBY: self.output_ports.add(self.lower_tor_port) - self.lower_tor_forwarding_state = ToRState.ACTIVE - self.reinit() - elif state == ToRState.STANDBY: - if self.lower_tor_forwarding_state == ToRState.ACTIVE: + self.lower_tor_forwarding_state = ForwardingState.ACTIVE + self.reset() + elif state == ForwardingState.STANDBY: + if self.lower_tor_forwarding_state == ForwardingState.ACTIVE: self.output_ports.remove(self.lower_tor_port) - self.lower_tor_forwarding_state = ToRState.STANDBY - self.reinit() + self.lower_tor_forwarding_state = ForwardingState.STANDBY + self.reset() def __str__(self): return self.group_str_cache.setdefault( @@ -247,7 +255,7 @@ def __str__(self): class UpstreamECMPFlow(OVSFlow): """Object to represent an upstream ECMP flow that selects one of its output ports to send packets.""" - __slots__ = ("upper_tor_port", "lower_tor_port", "upper_tor_forwarding_state", "lower_tor_forwarding_state") + __slots__ = () def __init__(self, in_port, group): super(UpstreamECMPFlow, self).__init__(in_port, group=group) @@ -272,7 +280,7 @@ class OVSBridge(object): +--------------+ PTF (host_if) --+ +----- upper_if | OVS bridge | - simulator netns (NiC) --+ +----- lower_if + simulator netns (server_nic) --+ +----- lower_if +--------------+ """ @@ -306,6 +314,7 @@ def __init__(self, bridge_name): self._init_flows() def _init_ports(self): + """Initialize ports.""" self.ports = self._get_ports() if len(self.ports) != 4: raise ValueError("Unhealthy bridge: %s, ports: %s" % (self.bridge_name, self.ports)) @@ -332,6 +341,7 @@ def _init_ports(self): ) def _init_flows(self): + """Initialize OVS flows for the bridge.""" logging.info("Init flows for bridge %s", self.bridge_name) self._del_flows() self._del_groups() @@ -384,17 +394,19 @@ def _add_upstream_ecmp_flow(self, in_port, group): return flow def set_forwarding_state(self, states): + """Set forwarding state.""" with self.lock: - logging.info("Set bridge %s forwarding state: %s", self.bridge_name, tuple(ToRState.STATE_LABELS[_] for _ in states)) + logging.info("Set bridge %s forwarding state: %s", self.bridge_name, tuple(ForwardingState.STATE_LABELS[_] for _ in states)) self.upstream_ecmp_flow.set_upper_tor_forwarding_state(states[0]) self.upstream_ecmp_flow.set_lower_tor_forwarding_state(states[1]) OVSCommand.ovs_ofctl_mod_groups(self.bridge_name, self.upstream_ecmp_group) return self.query_forwarding_state() def query_forwarding_state(self): + """Query forwarding state.""" with self.lock: states = (self.upstream_ecmp_flow.get_upper_tor_forwarding_state(), self.upstream_ecmp_flow.get_lower_tor_forwarding_state()) - logging.info("Query bridge %s forwarding state: %s", self.bridge_name, tuple(ToRState.STATE_LABELS[_] for _ in states)) + logging.info("Query bridge %s forwarding state: %s", self.bridge_name, tuple(ForwardingState.STATE_LABELS[_] for _ in states)) return states @@ -461,31 +473,6 @@ def _find_all_bridges(self): bridges = [_ for _ in result.stdout.split() if self.vm_set in _ and _.startswith(ACTIVE_ACTIVE_INTERFACE_PATTERN[0])] return bridges - def _find_target_server(self, context): - for meta in context.invocation_metadata(): - if meta.key == "grpc_server": - return meta.value - return None - - def _validate_client(self, context): - return True - - def _generate_error_response(context, status_code, details): - context.set_code(status_code) - context.set_details(details) - - def _init_admin_response(self): - return nic_simulator_grpc_service_pb2.AdminReply( - portid=[0, 1], - state=[False, False] - ) - - def _init_operation_reponse(self): - return nic_simulator_grpc_service_pb2.OperationReply( - portid=[0, 1], - state=[False, False] - ) - @validate_request_target(nic_simulator_grpc_service_pb2.AdminReply()) @validate_request_certificate(nic_simulator_grpc_service_pb2.AdminReply()) def QueryAdminPortState(self, request, context): @@ -507,6 +494,8 @@ def SetAdminPortState(self, request, context): ) return response + @validate_request_target(nic_simulator_grpc_service_pb2.OperationReply()) + @validate_request_certificate(nic_simulator_grpc_service_pb2.OperationReply()) def QueryOperationPortState(self, request, context): # TODO: Add QueryOperationPortState implementation return nic_simulator_grpc_service_pb2.OperationReply() @@ -586,17 +575,7 @@ def config_env(): def main(): - title = \ - """ - _ _ _____ _____ _____ _____ __ __ _ _ _ _______ ____ _____ - | \ | |_ _/ ____| / ____|_ _| \/ | | | | | /\|__ __/ __ \| __ \ - | \| | | || | | (___ | | | \ / | | | | | / \ | | | | | | |__) | - | . ` | | || | \___ \ | | | |\/| | | | | | / /\ \ | | | | | | _ / - | |\ |_| || |____ ____) |_| |_| | | | |__| | |____ / ____ \| | | |__| | | \ \ - |_| \_|_____\_____| |_____/|_____|_| |_|\____/|______/_/ \_\_| \____/|_| \_\ - - """ - print(title) + print(sys.modules[__name__].__doc__) args = parse_args() logging.debug("Start nic_simulator with args: %s", args) config_env() From df7ea1d74bd40b470f0bc37fa06227ff84462e55 Mon Sep 17 00:00:00 2001 From: Longxiang Lyu Date: Tue, 29 Mar 2022 01:37:21 +0000 Subject: [PATCH 3/6] Fix decorator returns Signed-off-by: Longxiang Lyu --- ansible/dualtor/nic_simulator/nic_simulator.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/ansible/dualtor/nic_simulator/nic_simulator.py b/ansible/dualtor/nic_simulator/nic_simulator.py index 4a3859da5c4..08bde4ff230 100644 --- a/ansible/dualtor/nic_simulator/nic_simulator.py +++ b/ansible/dualtor/nic_simulator/nic_simulator.py @@ -277,11 +277,11 @@ class OVSBridge(object): """ Object to represent the OVS bridge for the active-active port testbed setup. - +--------------+ - PTF (host_if) --+ +----- upper_if - | OVS bridge | + +--------------+ + PTF (host_if) --+ +----- upper_if + | OVS bridge | simulator netns (server_nic) --+ +----- lower_if - +--------------+ + +--------------+ """ __slots__ = ( @@ -428,6 +428,7 @@ def _decorated(nic_simulator, request, context): elif grpc_server not in nic_simulator.ovs_bridges: context.set_code(grpc.StatusCode.NOT_FOUND) context.set_details("grpc_server not found by nic_simulator") + return response else: context.grpc_server = grpc_server return rpc_func(nic_simulator, request, context) From e17c7d101810dad4332d9f60d6f2c2d662cb8fbb Mon Sep 17 00:00:00 2001 From: Longxiang Lyu Date: Mon, 11 Apr 2022 12:05:44 +0000 Subject: [PATCH 4/6] Improve `nic_simulator` to start gRPC server per port Signed-off-by: Longxiang Lyu --- .../dualtor/nic_simulator/nic_simulator.py | 270 ++++++++++++++---- .../nic_simulator_grpc_mgmt_service.proto | 31 ++ .../nic_simulator_grpc_mgmt_service_pb2.py | 68 +++++ ...ic_simulator_grpc_mgmt_service_pb2_grpc.py | 132 +++++++++ 4 files changed, 440 insertions(+), 61 deletions(-) create mode 100644 ansible/dualtor/nic_simulator/nic_simulator_grpc_mgmt_service.proto create mode 100644 ansible/dualtor/nic_simulator/nic_simulator_grpc_mgmt_service_pb2.py create mode 100644 ansible/dualtor/nic_simulator/nic_simulator_grpc_mgmt_service_pb2_grpc.py diff --git a/ansible/dualtor/nic_simulator/nic_simulator.py b/ansible/dualtor/nic_simulator/nic_simulator.py index 08bde4ff230..6e8f19acd18 100644 --- a/ansible/dualtor/nic_simulator/nic_simulator.py +++ b/ansible/dualtor/nic_simulator/nic_simulator.py @@ -26,11 +26,16 @@ from concurrent import futures from logging.handlers import RotatingFileHandler +# from grpc_reflection.v1alpha import reflection import nic_simulator_grpc_service_pb2 import nic_simulator_grpc_service_pb2_grpc +import nic_simulator_grpc_mgmt_service_pb2 +import nic_simulator_grpc_mgmt_service_pb2_grpc +THREAD_CONCURRENCY_PER_SERVER = 2 + # name templates ACTIVE_ACTIVE_BRIDGE_TEMPLATE = "baa-%s-%d" NETNS_IFACE_TEMPLATE = "eth%s" @@ -410,32 +415,6 @@ def query_forwarding_state(self): return states -def validate_request_target(response): - """Decorator to validate target gRPC server address is included in request metadata.""" - def _validate_request_target(rpc_func): - @functools.wraps(rpc_func) - def _decorated(nic_simulator, request, context): - logging.debug("Validate request metadata includes 'grpc_server'") - grpc_server = None - for meta in context.invocation_metadata(): - if meta.key == "grpc_server": - grpc_server = meta.value - break - if not grpc_server: - context.set_code(grpc.StatusCode.NOT_FOUND) - context.set_details("grpc_server metadata not found in the request") - return response - elif grpc_server not in nic_simulator.ovs_bridges: - context.set_code(grpc.StatusCode.NOT_FOUND) - context.set_details("grpc_server not found by nic_simulator") - return response - else: - context.grpc_server = grpc_server - return rpc_func(nic_simulator, request, context) - return _decorated - return _validate_request_target - - def validate_request_certificate(response): """Decorator to validate client certificate.""" def _validate_request_certificate(rpc_func): @@ -448,14 +427,193 @@ def _decorated(nic_simulator, request, context): return _validate_request_certificate +class InterruptableThread(threading.Thread): + """Thread class that can be interrupted by Exception raised.""" + + def __init__(self, **kwargs): + super(InterruptableThread, self).__init__(**kwargs) + self._e = None + + def set_error_handler(self, error_handler): + """Add error handler callback that will be called when the thread exits with error.""" + self.error_handler = error_handler + + def run(self): + """ + @summary: Run the target function, call `start()` to start the thread + instead of directly calling this one. + """ + try: + threading.Thread.run(self) + except Exception as e: + self._e = e + if getattr(self, "error_handler", None) is not None: + self.error_handler(self._e) + + def join(self, timeout=None, suppress_exception=False): + """ + @summary: Join the thread, if `target` raises an exception, reraise it. + @timeout: Wait timeout for `target` to finish. + @suppress_exception: Default False, reraise the exception raised in + `target`. If True, return the exception instead of + raising. + """ + threading.Thread.join(self, timeout=timeout) + if self._e: + if suppress_exception: + return self._e + else: + raise(self._e) from None + + +class NiCServer(nic_simulator_grpc_service_pb2_grpc.DualTorServiceServicer, threading.Thread): + """gRPC for a NiC.""" + + def __init__(self, nic_addr, ovs_bridge): + self.nic_addr = nic_addr + self.ovs_bridge = ovs_bridge + self.server = None + self.thread = None + + @validate_request_certificate(nic_simulator_grpc_service_pb2.AdminReply()) + def QueryAdminPortState(self, request, context): + logging.debug("QueryAdminPortState: request to server %s from client %s\n", self.nic_addr, context.peer()) + response = nic_simulator_grpc_service_pb2.AdminReply( + portid=[0, 1], + state=self.ovs_bridge.query_forwarding_state() + ) + logging.debug("QueryAdminPortState: response to client %s from server %s:\n%s", context.peer(), self.nic_addr, response) + return response + + @validate_request_certificate(nic_simulator_grpc_service_pb2.AdminReply()) + def SetAdminPortState(self, request, context): + logging.debug("SetAdminPortState: request to server %s from client %s\n", self.nic_addr, context.peer()) + response = nic_simulator_grpc_service_pb2.AdminReply( + portid=[0, 1], + state=self.ovs_bridge.set_forwarding_state(request.state) + ) + logging.debug("SetAdminPortState: response to client %s from server %s:\n%s", context.peer(), self.nic_addr, response) + return response + + @validate_request_certificate(nic_simulator_grpc_service_pb2.OperationReply()) + def QueryOperationPortState(self, request, context): + # TODO: Add QueryOperationPortState implementation + return nic_simulator_grpc_service_pb2.OperationReply() + + def _run_server(self, binding_port): + """Run the gRPC server.""" + self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=THREAD_CONCURRENCY_PER_SERVER)) + nic_simulator_grpc_service_pb2_grpc.add_DualTorServiceServicer_to_server( + self, + self.server + ) + self.server.add_insecure_port("%s:%s" % (self.nic_addr, binding_port)) + self.server.start() + self.server.wait_for_termination() + + def start(self, binding_port): + """Start the gRPC server thread.""" + self.thread = InterruptableThread(target=self._run_server, args=(binding_port,)) + self.thread.start() + + def stop(self): + """Stop the gRPC server thread.""" + self.server._state.termination_event.set() + + def join(self, timeout=None, suppress_exception=False): + """Wait the gRPC server thread termination.""" + self.thread.join(timeout=timeout, suppress_exception=suppress_exception) + + +class MgmtServer(nic_simulator_grpc_mgmt_service_pb2_grpc.DualTorMgmtServiceServicer): + """Management gRPC server to interact with sonic-mgmt.""" + + def __init__(self, binding_address, binding_port): + self.binding_address = binding_address + self.binding_port = binding_port + self.client_stubs = {} + self.server = None + + def _get_client_stub(self, nic_address): + if nic_address in self.client_stubs: + client_stub = self.client_stubs[nic_address] + else: + client_stub = nic_simulator_grpc_service_pb2_grpc.DualTorServiceStub( + grpc.insecure_channel("%s:%s" % (nic_address, self.binding_port)) + ) + self.client_stubs[nic_address] = client_stub + return client_stub + + def QueryAdminPortState(self, request, context): + nic_addresses = request.nic_addresses + logging.debug("QueryAdminPortState[mgmt]: request query admin port state for %s\n", nic_addresses) + query_responses = [] + for nic_address in nic_addresses: + client_stub = self._get_client_stub(nic_address) + try: + state = client_stub.QueryAdminPortState( + nic_simulator_grpc_service_pb2.AdminRequest( + portid=[0, 1], + state=[True, True] + ) + ) + query_responses.append(state) + except Exception as e: + context.set_code(grpc.StatusCode.ABORTED) + context.set_details("Error in QueryAdminPortState to %s: %s" % (nic_address, repr(e))) + return nic_simulator_grpc_mgmt_service_pb2.ListOfAdminReply() + response = nic_simulator_grpc_mgmt_service_pb2.ListOfAdminReply( + nic_addresses=nic_addresses, + admin_replies=query_responses + ) + logging.debug("QueryAdminPortState[mgmt]: response of query: %s", response) + return response + + def SetAdminPortState(self, request, context): + nic_addresses = request.nic_addresses + admin_requests = request.admin_requests + logging.debug("SetAdminPortState[mgmt]: request set admin port state: %s\n", request) + set_responses = [] + for nic_address, admin_request in zip(nic_addresses, admin_requests): + client_stub = self._get_client_stub(nic_address) + try: + state = client_stub.SetAdminPortState( + admin_request + ) + set_responses.append(state) + except Exception as e: + context.set_code(grpc.StatusCode.ABORTED) + context.set_details("Error in QueryAdminPortState to %s: %s" % (nic_address, repr(e))) + return nic_simulator_grpc_mgmt_service_pb2.ListOfAdminRequest() + response = nic_simulator_grpc_mgmt_service_pb2.ListOfAdminReply( + nic_addresses=nic_addresses, + admin_replies=set_responses + ) + logging.debug("QueryAdminPortState[mgmt]: response of query: %s", response) + return response + + def QueryOperationPortState(self, request, context): + return nic_simulator_grpc_mgmt_service_pb2.ListOfOperationReply() + + def start(self): + self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=THREAD_CONCURRENCY_PER_SERVER)) + nic_simulator_grpc_mgmt_service_pb2_grpc.add_DualTorMgmtServiceServicer_to_server(self, self.server) + self.server.add_insecure_port("%s:%s" % (self.binding_address, self.binding_port)) + self.server.start() + self.server.wait_for_termination() + + class NiCSimulator(nic_simulator_grpc_service_pb2_grpc.DualTorServiceServicer): """NiC simulator class, define all the gRPC calls.""" - def __init__(self, vm_set): + def __init__(self, vm_set, mgmt_port, binding_port): self.vm_set = vm_set self.server_nics = self._find_all_server_nics() self.server_nic_addresses = {nic: get_ip_address(nic) for nic in self.server_nics} + self.mgmt_port = mgmt_port + self.mgmt_port_address = get_ip_address(mgmt_port) self.ovs_bridges = {} + self.binding_port = binding_port for bridge_name in self._find_all_bridges(): index = bridge_name.split("-")[-1] server_nic = NETNS_IFACE_TEMPLATE % index @@ -464,42 +622,34 @@ def __init__(self, vm_set): server_nic_addr = self.server_nic_addresses[server_nic] if server_nic_addr is not None: self.ovs_bridges[server_nic_addr] = OVSBridge(bridge_name) - logging.info("Starting NiC simulator that receives for requests to: %s", json.dumps(list(self.ovs_bridges.keys()), indent=4)) + logging.info("Starting NiC simulator to manipulate OVS bridges: %s", json.dumps(list(self.ovs_bridges.keys()), indent=4)) + + self.servers = {} + self.servers = {nic_addr: NiCServer(nic_addr, ovs_bridge) for nic_addr, ovs_bridge in self.ovs_bridges.items()} + self.mgmt_server = MgmtServer(self.mgmt_port_address, binding_port) def _find_all_server_nics(self): return [_ for _ in os.listdir('/sys/class/net') if re.search(NETNS_IFACE_PATTERN, _)] def _find_all_bridges(self): result = OVSCommand.ovs_vsctl_list_br() - bridges = [_ for _ in result.stdout.split() if self.vm_set in _ and _.startswith(ACTIVE_ACTIVE_INTERFACE_PATTERN[0])] + bridges = [_ for _ in result.stdout.split() if self.vm_set in _ and _.startswith(ACTIVE_ACTIVE_BRIDGE_TEMPLATE[0])] return bridges - @validate_request_target(nic_simulator_grpc_service_pb2.AdminReply()) - @validate_request_certificate(nic_simulator_grpc_service_pb2.AdminReply()) - def QueryAdminPortState(self, request, context): - target_server = context.grpc_server - response = nic_simulator_grpc_service_pb2.AdminReply( - portid=[0, 1], - state=self.ovs_bridges[target_server].query_forwarding_state() - ) - logging.debug("QueryAdminPortState: response to client %s:\n%s", context.peer(), response) - return response + def start_nic_servers(self): + for nic_addr, server in self.servers.items(): + logging.debug("Starting gRPC server on NiC %s", nic_addr) + server.start(self.binding_port) - @validate_request_target(nic_simulator_grpc_service_pb2.AdminReply()) - @validate_request_certificate(nic_simulator_grpc_service_pb2.AdminReply()) - def SetAdminPortState(self, request, context): - target_server = context.grpc_server - response = nic_simulator_grpc_service_pb2.AdminReply( - portid=[0, 1], - state=self.ovs_bridges[target_server].set_forwarding_state(request.state) - ) - return response + def stop_nic_servers(self): + for nic_addr, server in self.servers.items(): + logging.debug("Stopping gRPC server on NiC %s", nic_addr) + server.stop() + server.join() - @validate_request_target(nic_simulator_grpc_service_pb2.OperationReply()) - @validate_request_certificate(nic_simulator_grpc_service_pb2.OperationReply()) - def QueryOperationPortState(self, request, context): - # TODO: Add QueryOperationPortState implementation - return nic_simulator_grpc_service_pb2.OperationReply() + def start_mgmt_server(self): + logging.debug("Starting gRPC server on mgmt port %s", self.mgmt_port_address) + self.mgmt_server.start() def parse_args(): @@ -581,14 +731,12 @@ def main(): logging.debug("Start nic_simulator with args: %s", args) config_env() config_logging(args.vm_set, args.log_level.upper(), args.stdout_log) - server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - nic_simulator_grpc_service_pb2_grpc.add_DualTorServiceServicer_to_server( - NiCSimulator(args.vm_set), - server - ) - server.add_insecure_port("0.0.0.0:%s" % args.port) - server.start() - server.wait_for_termination() + nic_simulator = NiCSimulator(args.vm_set, "mgmt", args.port) + nic_simulator.start_nic_servers() + try: + nic_simulator.start_mgmt_server() + except KeyboardInterrupt: + nic_simulator.stop_nic_servers() if __name__ == "__main__": diff --git a/ansible/dualtor/nic_simulator/nic_simulator_grpc_mgmt_service.proto b/ansible/dualtor/nic_simulator/nic_simulator_grpc_mgmt_service.proto new file mode 100644 index 00000000000..899267ef217 --- /dev/null +++ b/ansible/dualtor/nic_simulator/nic_simulator_grpc_mgmt_service.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +import "nic_simulator_grpc_service.proto"; + +service DualTorMgmtService { + rpc QueryAdminPortState(ListOfAdminRequest) returns (ListOfAdminReply) {} + + rpc SetAdminPortState(ListOfAdminRequest) returns (ListOfAdminReply) {} + + rpc QueryOperationPortState(ListOfOperationRequest) returns (ListOfOperationReply) {} +} + +message ListOfAdminRequest { + repeated string nic_addresses = 1; + repeated AdminRequest admin_requests = 2; +} + +message ListOfAdminReply { + repeated string nic_addresses = 1; + repeated AdminReply admin_replies = 2; +} + +message ListOfOperationRequest { + repeated string nic_addresses = 1; + repeated OperationRequest operation_requests = 2; +} + +message ListOfOperationReply { + repeated string nic_addresses = 1; + repeated OperationReply operation_replies = 2; +} diff --git a/ansible/dualtor/nic_simulator/nic_simulator_grpc_mgmt_service_pb2.py b/ansible/dualtor/nic_simulator/nic_simulator_grpc_mgmt_service_pb2.py new file mode 100644 index 00000000000..5d17aa2f653 --- /dev/null +++ b/ansible/dualtor/nic_simulator/nic_simulator_grpc_mgmt_service_pb2.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: nic_simulator_grpc_mgmt_service.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +import nic_simulator_grpc_service_pb2 as nic__simulator__grpc__service__pb2 + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n%nic_simulator_grpc_mgmt_service.proto\x1a nic_simulator_grpc_service.proto\"R\n\x12ListOfAdminRequest\x12\x15\n\rnic_addresses\x18\x01 \x03(\t\x12%\n\x0e\x61\x64min_requests\x18\x02 \x03(\x0b\x32\r.AdminRequest\"M\n\x10ListOfAdminReply\x12\x15\n\rnic_addresses\x18\x01 \x03(\t\x12\"\n\radmin_replies\x18\x02 \x03(\x0b\x32\x0b.AdminReply\"^\n\x16ListOfOperationRequest\x12\x15\n\rnic_addresses\x18\x01 \x03(\t\x12-\n\x12operation_requests\x18\x02 \x03(\x0b\x32\x11.OperationRequest\"Y\n\x14ListOfOperationReply\x12\x15\n\rnic_addresses\x18\x01 \x03(\t\x12*\n\x11operation_replies\x18\x02 \x03(\x0b\x32\x0f.OperationReply2\xe1\x01\n\x12\x44ualTorMgmtService\x12?\n\x13QueryAdminPortState\x12\x13.ListOfAdminRequest\x1a\x11.ListOfAdminReply\"\x00\x12=\n\x11SetAdminPortState\x12\x13.ListOfAdminRequest\x1a\x11.ListOfAdminReply\"\x00\x12K\n\x17QueryOperationPortState\x12\x17.ListOfOperationRequest\x1a\x15.ListOfOperationReply\"\x00\x62\x06proto3') + + + +_LISTOFADMINREQUEST = DESCRIPTOR.message_types_by_name['ListOfAdminRequest'] +_LISTOFADMINREPLY = DESCRIPTOR.message_types_by_name['ListOfAdminReply'] +_LISTOFOPERATIONREQUEST = DESCRIPTOR.message_types_by_name['ListOfOperationRequest'] +_LISTOFOPERATIONREPLY = DESCRIPTOR.message_types_by_name['ListOfOperationReply'] +ListOfAdminRequest = _reflection.GeneratedProtocolMessageType('ListOfAdminRequest', (_message.Message,), { + 'DESCRIPTOR' : _LISTOFADMINREQUEST, + '__module__' : 'nic_simulator_grpc_mgmt_service_pb2' + # @@protoc_insertion_point(class_scope:ListOfAdminRequest) + }) +_sym_db.RegisterMessage(ListOfAdminRequest) + +ListOfAdminReply = _reflection.GeneratedProtocolMessageType('ListOfAdminReply', (_message.Message,), { + 'DESCRIPTOR' : _LISTOFADMINREPLY, + '__module__' : 'nic_simulator_grpc_mgmt_service_pb2' + # @@protoc_insertion_point(class_scope:ListOfAdminReply) + }) +_sym_db.RegisterMessage(ListOfAdminReply) + +ListOfOperationRequest = _reflection.GeneratedProtocolMessageType('ListOfOperationRequest', (_message.Message,), { + 'DESCRIPTOR' : _LISTOFOPERATIONREQUEST, + '__module__' : 'nic_simulator_grpc_mgmt_service_pb2' + # @@protoc_insertion_point(class_scope:ListOfOperationRequest) + }) +_sym_db.RegisterMessage(ListOfOperationRequest) + +ListOfOperationReply = _reflection.GeneratedProtocolMessageType('ListOfOperationReply', (_message.Message,), { + 'DESCRIPTOR' : _LISTOFOPERATIONREPLY, + '__module__' : 'nic_simulator_grpc_mgmt_service_pb2' + # @@protoc_insertion_point(class_scope:ListOfOperationReply) + }) +_sym_db.RegisterMessage(ListOfOperationReply) + +_DUALTORMGMTSERVICE = DESCRIPTOR.services_by_name['DualTorMgmtService'] +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _LISTOFADMINREQUEST._serialized_start=75 + _LISTOFADMINREQUEST._serialized_end=157 + _LISTOFADMINREPLY._serialized_start=159 + _LISTOFADMINREPLY._serialized_end=236 + _LISTOFOPERATIONREQUEST._serialized_start=238 + _LISTOFOPERATIONREQUEST._serialized_end=332 + _LISTOFOPERATIONREPLY._serialized_start=334 + _LISTOFOPERATIONREPLY._serialized_end=423 + _DUALTORMGMTSERVICE._serialized_start=426 + _DUALTORMGMTSERVICE._serialized_end=651 +# @@protoc_insertion_point(module_scope) diff --git a/ansible/dualtor/nic_simulator/nic_simulator_grpc_mgmt_service_pb2_grpc.py b/ansible/dualtor/nic_simulator/nic_simulator_grpc_mgmt_service_pb2_grpc.py new file mode 100644 index 00000000000..ce9ca7f9951 --- /dev/null +++ b/ansible/dualtor/nic_simulator/nic_simulator_grpc_mgmt_service_pb2_grpc.py @@ -0,0 +1,132 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +import nic_simulator_grpc_mgmt_service_pb2 as nic__simulator__grpc__mgmt__service__pb2 + + +class DualTorMgmtServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.QueryAdminPortState = channel.unary_unary( + '/DualTorMgmtService/QueryAdminPortState', + request_serializer=nic__simulator__grpc__mgmt__service__pb2.ListOfAdminRequest.SerializeToString, + response_deserializer=nic__simulator__grpc__mgmt__service__pb2.ListOfAdminReply.FromString, + ) + self.SetAdminPortState = channel.unary_unary( + '/DualTorMgmtService/SetAdminPortState', + request_serializer=nic__simulator__grpc__mgmt__service__pb2.ListOfAdminRequest.SerializeToString, + response_deserializer=nic__simulator__grpc__mgmt__service__pb2.ListOfAdminReply.FromString, + ) + self.QueryOperationPortState = channel.unary_unary( + '/DualTorMgmtService/QueryOperationPortState', + request_serializer=nic__simulator__grpc__mgmt__service__pb2.ListOfOperationRequest.SerializeToString, + response_deserializer=nic__simulator__grpc__mgmt__service__pb2.ListOfOperationReply.FromString, + ) + + +class DualTorMgmtServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + def QueryAdminPortState(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def SetAdminPortState(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def QueryOperationPortState(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_DualTorMgmtServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'QueryAdminPortState': grpc.unary_unary_rpc_method_handler( + servicer.QueryAdminPortState, + request_deserializer=nic__simulator__grpc__mgmt__service__pb2.ListOfAdminRequest.FromString, + response_serializer=nic__simulator__grpc__mgmt__service__pb2.ListOfAdminReply.SerializeToString, + ), + 'SetAdminPortState': grpc.unary_unary_rpc_method_handler( + servicer.SetAdminPortState, + request_deserializer=nic__simulator__grpc__mgmt__service__pb2.ListOfAdminRequest.FromString, + response_serializer=nic__simulator__grpc__mgmt__service__pb2.ListOfAdminReply.SerializeToString, + ), + 'QueryOperationPortState': grpc.unary_unary_rpc_method_handler( + servicer.QueryOperationPortState, + request_deserializer=nic__simulator__grpc__mgmt__service__pb2.ListOfOperationRequest.FromString, + response_serializer=nic__simulator__grpc__mgmt__service__pb2.ListOfOperationReply.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'DualTorMgmtService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. +class DualTorMgmtService(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def QueryAdminPortState(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/DualTorMgmtService/QueryAdminPortState', + nic__simulator__grpc__mgmt__service__pb2.ListOfAdminRequest.SerializeToString, + nic__simulator__grpc__mgmt__service__pb2.ListOfAdminReply.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def SetAdminPortState(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/DualTorMgmtService/SetAdminPortState', + nic__simulator__grpc__mgmt__service__pb2.ListOfAdminRequest.SerializeToString, + nic__simulator__grpc__mgmt__service__pb2.ListOfAdminReply.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def QueryOperationPortState(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/DualTorMgmtService/QueryOperationPortState', + nic__simulator__grpc__mgmt__service__pb2.ListOfOperationRequest.SerializeToString, + nic__simulator__grpc__mgmt__service__pb2.ListOfOperationReply.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) From bc017bbdc7b319e7f7f8ca968ac1b13b3eecc651 Mon Sep 17 00:00:00 2001 From: Longxiang Lyu Date: Mon, 11 Apr 2022 12:06:13 +0000 Subject: [PATCH 5/6] Improve `nic_simulator_client` Signed-off-by: Longxiang Lyu --- .../nic_simulator/nic_simulator_client.py | 79 ++++++++++++++++--- 1 file changed, 70 insertions(+), 9 deletions(-) diff --git a/ansible/dualtor/nic_simulator/nic_simulator_client.py b/ansible/dualtor/nic_simulator/nic_simulator_client.py index e678eaa1e26..1d240c09c0c 100644 --- a/ansible/dualtor/nic_simulator/nic_simulator_client.py +++ b/ansible/dualtor/nic_simulator/nic_simulator_client.py @@ -1,11 +1,14 @@ #!/usr/bin/env python3 import argparse +from urllib import response import grpc from collections import namedtuple import nic_simulator_grpc_service_pb2 import nic_simulator_grpc_service_pb2_grpc +import nic_simulator_grpc_mgmt_service_pb2 +import nic_simulator_grpc_mgmt_service_pb2_grpc class MetadataInterceptor(grpc.UnaryUnaryClientInterceptor): @@ -56,6 +59,13 @@ def parse_args(): required=True, help="gRPC server port" ) + parser.add_argument( + "-m", + "--test_mgmt", + default=False, + action="store_true", + help="Test mgmt gRPC server" + ) return parser.parse_args() @@ -63,23 +73,74 @@ def main(): args = parse_args() server = args.server port = args.server_port - with grpc.insecure_channel("%s:%s" % (server, port)) as insecure_channel: - metadata_interceptor = MetadataInterceptor(("grpc_server", "192.168.0.101")) - with grpc.intercept_channel(insecure_channel, metadata_interceptor) as channel: + test_mgmt = args.test_mgmt + with grpc.insecure_channel("%s:%s" % (server, port)) as channel: + # metadata_interceptor = MetadataInterceptor(("grpc_server", "192.168.0.101")) + # with grpc.intercept_channel(insecure_channel, metadata_interceptor) as channel: + if test_mgmt: + stub = nic_simulator_grpc_mgmt_service_pb2_grpc.DualTorMgmtServiceStub(channel) + request = nic_simulator_grpc_mgmt_service_pb2.ListOfAdminRequest( + nic_addresses=["192.168.0.3", "192.168.0.5"], + admin_requests=[ + nic_simulator_grpc_service_pb2.AdminRequest( + portid=[0, 1], + state=[True, True] + ), + nic_simulator_grpc_service_pb2.AdminRequest( + portid=[0, 1], + state=[True, True] + ), + ] + ) + response = stub.QueryAdminPortState(request) + print(response) + + request = nic_simulator_grpc_mgmt_service_pb2.ListOfAdminRequest( + nic_addresses=["192.168.0.3", "192.168.0.5"], + admin_requests=[ + nic_simulator_grpc_service_pb2.AdminRequest( + portid=[0, 1], + state=[False, True] + ), + nic_simulator_grpc_service_pb2.AdminRequest( + portid=[0, 1], + state=[True, False] + ), + ] + ) + response = stub.SetAdminPortState(request) + print(response) + + request = nic_simulator_grpc_mgmt_service_pb2.ListOfAdminRequest( + nic_addresses=["192.168.0.3", "192.168.0.5"], + admin_requests=[ + nic_simulator_grpc_service_pb2.AdminRequest( + portid=[0, 1], + state=[True, True] + ), + nic_simulator_grpc_service_pb2.AdminRequest( + portid=[0, 1], + state=[True, True] + ), + ] + ) + response = stub.QueryAdminPortState(request) + print(response) + else: stub = nic_simulator_grpc_service_pb2_grpc.DualTorServiceStub(channel) - state = nic_simulator_grpc_service_pb2.AdminRequest( + request = nic_simulator_grpc_service_pb2.AdminRequest( portid=[0, 1], state=[True, True] ) - state = stub.QueryAdminPortState(state) - print(state) + response = stub.QueryAdminPortState(request) + print(response) - state = nic_simulator_grpc_service_pb2.AdminRequest( + request = nic_simulator_grpc_service_pb2.AdminRequest( portid=[0, 1], state=[True, False] ) - state = stub.SetAdminPortState(state) - print(state) + response = stub.SetAdminPortState(request) + print(response) if __name__ == "__main__": From d2a264e62e217e9717828489ec0855207add4a9f Mon Sep 17 00:00:00 2001 From: Longxiang Lyu Date: Tue, 12 Apr 2022 01:50:42 +0000 Subject: [PATCH 6/6] Remove redundant parent class Signed-off-by: Longxiang Lyu --- ansible/dualtor/nic_simulator/nic_simulator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ansible/dualtor/nic_simulator/nic_simulator.py b/ansible/dualtor/nic_simulator/nic_simulator.py index 6e8f19acd18..ac3b6ab9045 100644 --- a/ansible/dualtor/nic_simulator/nic_simulator.py +++ b/ansible/dualtor/nic_simulator/nic_simulator.py @@ -466,7 +466,7 @@ def join(self, timeout=None, suppress_exception=False): raise(self._e) from None -class NiCServer(nic_simulator_grpc_service_pb2_grpc.DualTorServiceServicer, threading.Thread): +class NiCServer(nic_simulator_grpc_service_pb2_grpc.DualTorServiceServicer): """gRPC for a NiC.""" def __init__(self, nic_addr, ovs_bridge):