Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 74 additions & 15 deletions ansible/dualtor/nic_simulator/nic_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class OVSCommand(object):
OVS_VSCTL_LIST_PORTS_CMD = "ovs-vsctl list-ports {bridge_name}"
OVS_OFCTL_DEL_FLOWS_CMD = "ovs-ofctl del-flows {bridge_name}"
OVS_OFCTL_ADD_FLOWS_CMD = "ovs-ofctl add-flow {bridge_name} {flow}"
OVS_OFCTL_MOD_FLOWS_CMD = "ovs-ofctl mod-flows {bridge_name} {flow}"
OVS_OFCTL_MOD_FLOWS_CMD = "ovs-ofctl --strict mod-flows {bridge_name} {flow}"
OVS_OFCTL_DEL_GROUPS_CMD = "ovs-ofctl -O OpenFlow13 del-groups {bridge_name}"
OVS_OFCTL_ADD_GROUP_CMD = "ovs-ofctl -O OpenFlow13 add-group {bridge_name} {group}"
OVS_OFCTL_MOD_GROUP_CMD = "ovs-ofctl -O OpenFlow13 mod-group {bridge_name} {group}"
Expand Down Expand Up @@ -251,19 +251,21 @@ def set_drop(self, recover=False):
class OVSUpstreamFlow(OVSFlow):
"""Object to represent an OVS upstream flow to output to both ToRs."""

__slots__ = ("drop_output",)
__slots__ = ("drop_output", "enable_output_ports")

def __init__(self, in_port, packet_filter=None, output_ports=[], group=None, priority=None):
def __init__(self, in_port, packet_filter=None, output_ports=[],
group=None, priority=None, enable_output_ports=None):
super(OVSUpstreamFlow, self).__init__(
in_port, packet_filter, output_ports, group, priority)
self.drop_output = [False, False]
self.enable_output_ports = enable_output_ports if enable_output_ports else [True, True]

def to_string(self):
flow_parts = [self._str_prefix]
has_output = False
if self.output_ports:
output = ["output:%s" % port for (portid, port) in enumerate(self.output_ports)
if not self.drop_output[portid]]
if (self.get_port_enable(portid) and (not self.get_drop(portid)))]
has_output = bool(output)
if has_output:
flow_parts.append("actions=%s" % ",".join(output))
Expand All @@ -272,6 +274,9 @@ def to_string(self):
flow_parts.append("actions=drop")
return ",".join(flow_parts)

def get_port_enable(self, portid):
return self.enable_output_ports[portid]

def get_drop(self, portid):
return self.drop_output[portid]

Expand All @@ -283,7 +288,8 @@ def set_drop(self, portid=None, recover=False):
else:
self.drop_output[portid] = is_drop

self.reset()
if self.get_port_enable(portid):
self.reset()


class ForwardingState(object):
Expand Down Expand Up @@ -418,6 +424,8 @@ class OVSBridge(object):
"downstream_upper_tor_flow",
"downstream_lower_tor_flow",
"upstream_nic_flow",
"upstream_upper_tor_nic_flow",
"upstream_lower_tor_nic_flow",
"upstream_loopback2_flow",
"upstream_upper_tor_loopback3_flow",
"upstream_lower_tor_loopback3_flow",
Expand All @@ -426,7 +434,7 @@ class OVSBridge(object):
"flap_counter"
)

def __init__(self, bridge_name, loopback_ips):
def __init__(self, bridge_name, loopback_ips, duplicate_nic_upstream=False):
self.bridge_name = bridge_name
self.loopback2_ip = loopback_ips[0]
self.upper_tor_loopback3_ip = loopback_ips[1]
Expand All @@ -442,7 +450,7 @@ def __init__(self, bridge_name, loopback_ips):
self.flows = []
self.groups = []
self._init_ports()
self._init_flows()
self._init_flows(duplicate_nic_upstream)
self.states_getter = {
1: self.upstream_ecmp_flow.get_upper_tor_forwarding_state,
0: self.upstream_ecmp_flow.get_lower_tor_forwarding_state
Expand Down Expand Up @@ -489,18 +497,38 @@ def _init_ports(self):
self.lower_tor_port
)

def _init_flows(self):
def _init_flows(self, duplicate_nic_upstream=False):
"""Initialize OVS flows for the bridge."""
logging.info("Init flows for bridge %s", self.bridge_name)
self._del_flows()
self._del_groups()
# downstream flows
self.downstream_upper_tor_flow = self._add_flow(self.upper_tor_port,
output_ports=[self.ptf_port, self.server_nic], priority=10)
output_ports=[self.ptf_port, self.server_nic], priority=11)
self.downstream_lower_tor_flow = self._add_flow(self.lower_tor_port,
output_ports=[self.ptf_port, self.server_nic], priority=10)
output_ports=[self.ptf_port, self.server_nic], priority=11)

# upstream flows
if not duplicate_nic_upstream:
# NOTE: add two flows to direct gRPC traffic to its correct destination
# upstream packet to the upper ToR loopback3 from server NiC should be forwarded to the upper ToR
self.upstream_upper_tor_nic_flow = self._add_flow(
self.server_nic,
packet_filter="tcp,ip_dst=%s" % self.upper_tor_loopback3_ip,
output_ports=[self.lower_tor_port, self.upper_tor_port],
priority=10,
upstream=True,
enable_output_ports=[False, True]
)
# upstream packet to the lower ToR loopback3 from server NiC should be forwarded to the lower ToR
self.upstream_lower_tor_nic_flow = self._add_flow(
self.server_nic,
packet_filter="tcp,ip_dst=%s" % self.lower_tor_loopback3_ip,
output_ports=[self.lower_tor_port, self.upper_tor_port],
priority=10,
upstream=True,
enable_output_ports=[True, False]
)
# upstream packet from server NiC should be directed to both ToRs
self.upstream_nic_flow = self._add_flow(
self.server_nic,
Expand Down Expand Up @@ -572,10 +600,11 @@ def _del_groups(self):
self.upstream_ecmp_group = None
self.groups.clear()

def _add_flow(self, in_port, packet_filter=None, output_ports=[], group=None, priority=None, upstream=False):
def _add_flow(self, in_port, packet_filter=None, output_ports=[], group=None, priority=None,
upstream=False, enable_output_ports=None):
if upstream:
flow = OVSUpstreamFlow(in_port, packet_filter=packet_filter, output_ports=output_ports,
group=group, priority=priority)
group=group, priority=priority, enable_output_ports=enable_output_ports)
else:
flow = OVSFlow(in_port, packet_filter=packet_filter, output_ports=output_ports,
group=group, priority=priority)
Expand Down Expand Up @@ -639,6 +668,18 @@ def set_drop(self, portids, directions, recover):

# recover upstream
# recover upstream traffic from server NiC
if self.upstream_upper_tor_nic_flow.get_port_enable(portid):
if self.upstream_upper_tor_nic_flow.get_drop(portid):
self.upstream_upper_tor_nic_flow.set_drop(
portid=portid, recover=recover)
OVSCommand.ovs_ofctl_mod_flow(
self.bridge_name, self.upstream_upper_tor_nic_flow)
if self.upstream_lower_tor_nic_flow.get_port_enable(portid):
if self.upstream_lower_tor_nic_flow.get_drop(portid):
self.upstream_lower_tor_nic_flow.set_drop(
portid=portid, recover=recover)
OVSCommand.ovs_ofctl_mod_flow(
self.bridge_name, self.upstream_lower_tor_nic_flow)
if self.upstream_nic_flow.get_drop(portid):
self.upstream_nic_flow.set_drop(
portid=portid, recover=recover)
Expand Down Expand Up @@ -690,6 +731,16 @@ def set_drop(self, portids, directions, recover):
elif direction == 1:
# upstream
# drop upstream traffic from server NiC
if self.upstream_upper_tor_nic_flow.get_port_enable(portid):
if not self.upstream_upper_tor_nic_flow.get_drop(portid):
self.upstream_upper_tor_nic_flow.set_drop(portid)
OVSCommand.ovs_ofctl_mod_flow(
self.bridge_name, self.upstream_upper_tor_nic_flow)
if self.upstream_lower_tor_nic_flow.get_port_enable(portid):
if not self.upstream_lower_tor_nic_flow.get_drop(portid):
self.upstream_lower_tor_nic_flow.set_drop(portid)
OVSCommand.ovs_ofctl_mod_flow(
self.bridge_name, self.upstream_lower_tor_nic_flow)
if not self.upstream_nic_flow.get_drop(portid):
self.upstream_nic_flow.set_drop(portid)
OVSCommand.ovs_ofctl_mod_flow(
Expand Down Expand Up @@ -1128,7 +1179,7 @@ def start(self):
class NiCSimulator(nic_simulator_grpc_service_pb2_grpc.DualToRActiveServicer):
"""NiC simulator class, define all the gRPC calls."""

def __init__(self, vm_set, mgmt_port, binding_port, loopback_ips):
def __init__(self, vm_set, mgmt_port, binding_port, loopback_ips, duplicate_nic_upstream=False):
self.vm_set = vm_set
self.server_nics = self._find_all_server_nics()
self.server_nic_addresses = {
Expand All @@ -1144,7 +1195,8 @@ def __init__(self, vm_set, mgmt_port, binding_port, loopback_ips):
if server_nic in self.server_nic_addresses:
server_nic_addr = self.server_nic_addresses[server_nic]
if server_nic_addr is not None:
self.ovs_bridges[server_nic_addr] = OVSBridge(bridge_name, loopback_ips)
self.ovs_bridges[server_nic_addr] = OVSBridge(bridge_name, loopback_ips, duplicate_nic_upstream)

logging.info("Starting NiC simulator to manipulate OVS bridges: %s",
json.dumps(list(self.ovs_bridges.keys()), indent=4))

Expand Down Expand Up @@ -1218,6 +1270,13 @@ def parse_args():
help="the Loopback IPs to duplicate to both ToRs: <Loopback2>,<upper ToR Loopback3>,<lower ToR Loopback3>",
dest="loopback_ips"
)
parser.add_argument(
"-n",
"--duplicate_nic_upstream",
default=False,
action="store_true",
help="Duplicate NIC upstream traffic to both ToRs (default: False)",
)
args = parser.parse_args()
return args

Expand Down Expand Up @@ -1270,7 +1329,7 @@ def main():
loopback_ips = args.loopback_ips.split(",")
if len(loopback_ips) != 3:
raise ValueError("Invalid loopback ips: {loopback_ips}".format(loopback_ips=loopback_ips))
nic_simulator = NiCSimulator(args.vm_set, "mgmt", args.port, loopback_ips)
nic_simulator = NiCSimulator(args.vm_set, "mgmt", args.port, loopback_ips, args.duplicate_nic_upstream)
nic_simulator.start_nic_servers()
try:
nic_simulator.start_mgmt_server()
Expand Down