Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
743 changes: 743 additions & 0 deletions ansible/dualtor/nic_simulator/nic_simulator.py

Large diffs are not rendered by default.

147 changes: 147 additions & 0 deletions ansible/dualtor/nic_simulator/nic_simulator_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
#!/usr/bin/env python3
import argparse
from urllib import response
import grpc

from collections import namedtuple

import nic_simulator_grpc_service_pb2
import nic_simulator_grpc_service_pb2_grpc
import nic_simulator_grpc_mgmt_service_pb2
import nic_simulator_grpc_mgmt_service_pb2_grpc


class MetadataInterceptor(grpc.UnaryUnaryClientInterceptor):

class _ClientCallDetails(
namedtuple(
'_ClientCallDetails',
('method', 'timeout', 'metadata', 'credentials')),
grpc.ClientCallDetails):
"""Wrapper class for initializing a new ClientCallDetails instance.
"""
pass

def __init__(self, injected_meta):
self.injected_meta = injected_meta

def intercept_unary_unary(self, continuation, client_call_details, request):

if client_call_details.metadata is None:
metadata = []
else:
metadata = list(client_call_details.metadata)

metadata.append(self.injected_meta)

client_call_details = self._ClientCallDetails(
client_call_details.method,
client_call_details.timeout,
metadata,
client_call_details.credentials
)
return continuation(client_call_details, request)


def parse_args():
parser = argparse.ArgumentParser(
description="NiC simulator client"
)
parser.add_argument(
"-s",
"--server",
required=True,
help="gRPC server address"
)
parser.add_argument(
"-p",
"--server_port",
required=True,
help="gRPC server port"
)
parser.add_argument(
"-m",
"--test_mgmt",
default=False,
action="store_true",
help="Test mgmt gRPC server"
)
return parser.parse_args()


def main():
args = parse_args()
server = args.server
port = args.server_port
test_mgmt = args.test_mgmt
with grpc.insecure_channel("%s:%s" % (server, port)) as channel:
# metadata_interceptor = MetadataInterceptor(("grpc_server", "192.168.0.101"))
# with grpc.intercept_channel(insecure_channel, metadata_interceptor) as channel:
if test_mgmt:
stub = nic_simulator_grpc_mgmt_service_pb2_grpc.DualTorMgmtServiceStub(channel)
request = nic_simulator_grpc_mgmt_service_pb2.ListOfAdminRequest(
nic_addresses=["192.168.0.3", "192.168.0.5"],
admin_requests=[
nic_simulator_grpc_service_pb2.AdminRequest(
portid=[0, 1],
state=[True, True]
),
nic_simulator_grpc_service_pb2.AdminRequest(
portid=[0, 1],
state=[True, True]
),
]
)
response = stub.QueryAdminPortState(request)
print(response)

request = nic_simulator_grpc_mgmt_service_pb2.ListOfAdminRequest(
nic_addresses=["192.168.0.3", "192.168.0.5"],
admin_requests=[
nic_simulator_grpc_service_pb2.AdminRequest(
portid=[0, 1],
state=[False, True]
),
nic_simulator_grpc_service_pb2.AdminRequest(
portid=[0, 1],
state=[True, False]
),
]
)
response = stub.SetAdminPortState(request)
print(response)

request = nic_simulator_grpc_mgmt_service_pb2.ListOfAdminRequest(
nic_addresses=["192.168.0.3", "192.168.0.5"],
admin_requests=[
nic_simulator_grpc_service_pb2.AdminRequest(
portid=[0, 1],
state=[True, True]
),
nic_simulator_grpc_service_pb2.AdminRequest(
portid=[0, 1],
state=[True, True]
),
]
)
response = stub.QueryAdminPortState(request)
print(response)
else:
stub = nic_simulator_grpc_service_pb2_grpc.DualTorServiceStub(channel)
request = nic_simulator_grpc_service_pb2.AdminRequest(
portid=[0, 1],
state=[True, True]
)
response = stub.QueryAdminPortState(request)
print(response)

request = nic_simulator_grpc_service_pb2.AdminRequest(
portid=[0, 1],
state=[True, False]
)
response = stub.SetAdminPortState(request)
print(response)


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
syntax = "proto3";

import "nic_simulator_grpc_service.proto";

service DualTorMgmtService {
rpc QueryAdminPortState(ListOfAdminRequest) returns (ListOfAdminReply) {}

rpc SetAdminPortState(ListOfAdminRequest) returns (ListOfAdminReply) {}

rpc QueryOperationPortState(ListOfOperationRequest) returns (ListOfOperationReply) {}
}

message ListOfAdminRequest {
repeated string nic_addresses = 1;
repeated AdminRequest admin_requests = 2;
}

message ListOfAdminReply {
repeated string nic_addresses = 1;
repeated AdminReply admin_replies = 2;
}

message ListOfOperationRequest {
repeated string nic_addresses = 1;
repeated OperationRequest operation_requests = 2;
}

message ListOfOperationReply {
repeated string nic_addresses = 1;
repeated OperationReply operation_replies = 2;
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

import nic_simulator_grpc_mgmt_service_pb2 as nic__simulator__grpc__mgmt__service__pb2


class DualTorMgmtServiceStub(object):
"""Missing associated documentation comment in .proto file."""

def __init__(self, channel):
"""Constructor.

Args:
channel: A grpc.Channel.
"""
self.QueryAdminPortState = channel.unary_unary(
'/DualTorMgmtService/QueryAdminPortState',
request_serializer=nic__simulator__grpc__mgmt__service__pb2.ListOfAdminRequest.SerializeToString,
response_deserializer=nic__simulator__grpc__mgmt__service__pb2.ListOfAdminReply.FromString,
)
self.SetAdminPortState = channel.unary_unary(
'/DualTorMgmtService/SetAdminPortState',
request_serializer=nic__simulator__grpc__mgmt__service__pb2.ListOfAdminRequest.SerializeToString,
response_deserializer=nic__simulator__grpc__mgmt__service__pb2.ListOfAdminReply.FromString,
)
self.QueryOperationPortState = channel.unary_unary(
'/DualTorMgmtService/QueryOperationPortState',
request_serializer=nic__simulator__grpc__mgmt__service__pb2.ListOfOperationRequest.SerializeToString,
response_deserializer=nic__simulator__grpc__mgmt__service__pb2.ListOfOperationReply.FromString,
)


class DualTorMgmtServiceServicer(object):
"""Missing associated documentation comment in .proto file."""

def QueryAdminPortState(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def SetAdminPortState(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def QueryOperationPortState(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_DualTorMgmtServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
'QueryAdminPortState': grpc.unary_unary_rpc_method_handler(
servicer.QueryAdminPortState,
request_deserializer=nic__simulator__grpc__mgmt__service__pb2.ListOfAdminRequest.FromString,
response_serializer=nic__simulator__grpc__mgmt__service__pb2.ListOfAdminReply.SerializeToString,
),
'SetAdminPortState': grpc.unary_unary_rpc_method_handler(
servicer.SetAdminPortState,
request_deserializer=nic__simulator__grpc__mgmt__service__pb2.ListOfAdminRequest.FromString,
response_serializer=nic__simulator__grpc__mgmt__service__pb2.ListOfAdminReply.SerializeToString,
),
'QueryOperationPortState': grpc.unary_unary_rpc_method_handler(
servicer.QueryOperationPortState,
request_deserializer=nic__simulator__grpc__mgmt__service__pb2.ListOfOperationRequest.FromString,
response_serializer=nic__simulator__grpc__mgmt__service__pb2.ListOfOperationReply.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'DualTorMgmtService', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))


# This class is part of an EXPERIMENTAL API.
class DualTorMgmtService(object):
"""Missing associated documentation comment in .proto file."""

@staticmethod
def QueryAdminPortState(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/DualTorMgmtService/QueryAdminPortState',
nic__simulator__grpc__mgmt__service__pb2.ListOfAdminRequest.SerializeToString,
nic__simulator__grpc__mgmt__service__pb2.ListOfAdminReply.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

@staticmethod
def SetAdminPortState(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/DualTorMgmtService/SetAdminPortState',
nic__simulator__grpc__mgmt__service__pb2.ListOfAdminRequest.SerializeToString,
nic__simulator__grpc__mgmt__service__pb2.ListOfAdminReply.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

@staticmethod
def QueryOperationPortState(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/DualTorMgmtService/QueryOperationPortState',
nic__simulator__grpc__mgmt__service__pb2.ListOfOperationRequest.SerializeToString,
nic__simulator__grpc__mgmt__service__pb2.ListOfOperationReply.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
Loading