diff --git a/config/main.py b/config/main.py index 90531997ee..01b9cf9ee3 100644 --- a/config/main.py +++ b/config/main.py @@ -92,6 +92,11 @@ asic_type = None +DSCP_RANGE = click.IntRange(min=0, max=63) +TTL_RANGE = click.IntRange(min=0, max=255) +QUEUE_RANGE = click.IntRange(min=0, max=255) +GRE_TYPE_RANGE = click.IntRange(min=0, max=65535) + # # Helper functions # @@ -953,6 +958,19 @@ def cache_arp_entries(): open(restore_flag_file, 'w').close() return success + +def validate_ipv4_address(ctx, param, ip_addr): + """Helper function to validate ipv4 address + """ + try: + ip_n = ipaddress.ip_network(ip_addr, False) + if ip_n.version != 4: + raise click.UsageError("{} is not a valid IPv4 address".format(ip_addr)) + return ip_addr + except ValueError as e: + raise click.UsageError(str(e)) + + # This is our main entrypoint - the main 'config' command @click.group(cls=clicommon.AbbreviationGroup, context_settings=CONTEXT_SETTINGS) @click.pass_context @@ -1775,12 +1793,12 @@ def mirror_session(): @mirror_session.command('add') @click.argument('session_name', metavar='', required=True) -@click.argument('src_ip', metavar='', required=True) -@click.argument('dst_ip', metavar='', required=True) -@click.argument('dscp', metavar='', required=True) -@click.argument('ttl', metavar='', required=True) -@click.argument('gre_type', metavar='[gre_type]', required=False) -@click.argument('queue', metavar='[queue]', required=False) +@click.argument('src_ip', metavar='', callback=validate_ipv4_address, required=True) +@click.argument('dst_ip', metavar='', callback=validate_ipv4_address, required=True) +@click.argument('dscp', metavar='', type=DSCP_RANGE, required=True) +@click.argument('ttl', metavar='', type=TTL_RANGE, required=True) +@click.argument('gre_type', metavar='[gre_type]', type=GRE_TYPE_RANGE, required=False) +@click.argument('queue', metavar='[queue]', type=QUEUE_RANGE, required=False) @click.option('--policer') def add(session_name, src_ip, dst_ip, dscp, ttl, gre_type, queue, policer): """ Add ERSPAN mirror session.(Legacy support) """ @@ -1799,12 +1817,12 @@ def erspan(ctx): @erspan.command('add') @click.argument('session_name', metavar='', required=True) -@click.argument('src_ip', metavar='', required=True) -@click.argument('dst_ip', metavar='', required=True) -@click.argument('dscp', metavar='', required=True) -@click.argument('ttl', metavar='', required=True) -@click.argument('gre_type', metavar='[gre_type]', required=False) -@click.argument('queue', metavar='[queue]', required=False) +@click.argument('src_ip', metavar='', callback=validate_ipv4_address, required=True) +@click.argument('dst_ip', metavar='', callback=validate_ipv4_address,required=True) +@click.argument('dscp', metavar='', type=DSCP_RANGE, required=True) +@click.argument('ttl', metavar='', type=TTL_RANGE, required=True) +@click.argument('gre_type', metavar='[gre_type]', type=GRE_TYPE_RANGE, required=False) +@click.argument('queue', metavar='[queue]', type=QUEUE_RANGE, required=False) @click.argument('src_port', metavar='[src_port]', required=False) @click.argument('direction', metavar='[direction]', required=False) @click.option('--policer') @@ -1877,7 +1895,7 @@ def span(ctx): @click.argument('dst_port', metavar='', required=True) @click.argument('src_port', metavar='[src_port]', required=False) @click.argument('direction', metavar='[direction]', required=False) -@click.argument('queue', metavar='[queue]', required=False) +@click.argument('queue', metavar='[queue]', type=QUEUE_RANGE, required=False) @click.option('--policer') def add(session_name, dst_port, src_port, direction, queue, policer): """ Add SPAN mirror session """ diff --git a/tests/config_mirror_session_test.py b/tests/config_mirror_session_test.py new file mode 100644 index 0000000000..883faea1ac --- /dev/null +++ b/tests/config_mirror_session_test.py @@ -0,0 +1,150 @@ +import pytest +import config.main as config +from unittest import mock +from click.testing import CliRunner + +ERR_MSG_IP_FAILURE = "does not appear to be an IPv4 or IPv6 network" +ERR_MSG_IP_VERSION_FAILURE = "not a valid IPv4 address" +ERR_MSG_VALUE_FAILURE = "Invalid value for" + +def test_mirror_session_add(): + runner = CliRunner() + + # Verify invalid src_ip + result = runner.invoke( + config.config.commands["mirror_session"].commands["add"], + ["test_session", "400.1.1.1", "2.2.2.2", "8", "63", "10", "100"]) + assert result.exit_code != 0 + assert ERR_MSG_IP_FAILURE in result.stdout + + # Verify invalid dst_ip + result = runner.invoke( + config.config.commands["mirror_session"].commands["add"], + ["test_session", "1.1.1.1", "256.2.2.2", "8", "63", "10", "100"]) + assert result.exit_code != 0 + assert ERR_MSG_IP_FAILURE in result.stdout + + # Verify invalid ip version + result = runner.invoke( + config.config.commands["mirror_session"].commands["add"], + ["test_session", "1::1", "2::2", "8", "63", "10", "100"]) + assert result.exit_code != 0 + assert ERR_MSG_IP_VERSION_FAILURE in result.stdout + + # Verify invalid dscp + result = runner.invoke( + config.config.commands["mirror_session"].commands["add"], + ["test_session", "1.1.1.1", "2.2.2.2", "65536", "63", "10", "100"]) + assert result.exit_code != 0 + assert ERR_MSG_VALUE_FAILURE in result.stdout + + # Verify invalid ttl + result = runner.invoke( + config.config.commands["mirror_session"].commands["add"], + ["test_session", "1.1.1.1", "2.2.2.2", "6", "256", "10", "100"]) + assert result.exit_code != 0 + assert ERR_MSG_VALUE_FAILURE in result.stdout + + # Verify invalid gre + result = runner.invoke( + config.config.commands["mirror_session"].commands["add"], + ["test_session", "1.1.1.1", "2.2.2.2", "6", "63", "65536", "100"]) + assert result.exit_code != 0 + assert ERR_MSG_VALUE_FAILURE in result.stdout + + # Verify invalid queue + result = runner.invoke( + config.config.commands["mirror_session"].commands["add"], + ["test_session", "1.1.1.1", "2.2.2.2", "6", "63", "65", "65536"]) + assert result.exit_code != 0 + assert ERR_MSG_VALUE_FAILURE in result.stdout + + # Positive case + with mock.patch('config.main.add_erspan') as mocked: + result = runner.invoke( + config.config.commands["mirror_session"].commands["add"], + ["test_session", "100.1.1.1", "2.2.2.2", "8", "63", "10", "100"]) + + mocked.assert_called_with("test_session", "100.1.1.1", "2.2.2.2", 8, 63, 10, 100, None) + + + +def test_mirror_session_erspan_add(): + runner = CliRunner() + + # Verify invalid src_ip + result = runner.invoke( + config.config.commands["mirror_session"].commands["erspan"].commands["add"], + ["test_session", "400.1.1.1", "2.2.2.2", "8", "63", "10", "100"]) + assert result.exit_code != 0 + assert ERR_MSG_IP_FAILURE in result.stdout + + # Verify invalid dst_ip + result = runner.invoke( + config.config.commands["mirror_session"].commands["erspan"].commands["add"], + ["test_session", "1.1.1.1", "256.2.2.2", "8", "63", "10", "100"]) + assert result.exit_code != 0 + assert ERR_MSG_IP_FAILURE in result.stdout + + # Verify invalid ip version + result = runner.invoke( + config.config.commands["mirror_session"].commands["erspan"].commands["add"], + ["test_session", "1::1", "2::2", "8", "63", "10", "100"]) + assert result.exit_code != 0 + assert ERR_MSG_IP_VERSION_FAILURE in result.stdout + + # Verify invalid dscp + result = runner.invoke( + config.config.commands["mirror_session"].commands["erspan"].commands["add"], + ["test_session", "1.1.1.1", "2.2.2.2", "65536", "63", "10", "100"]) + assert result.exit_code != 0 + assert ERR_MSG_VALUE_FAILURE in result.stdout + + # Verify invalid ttl + result = runner.invoke( + config.config.commands["mirror_session"].commands["erspan"].commands["add"], + ["test_session", "1.1.1.1", "2.2.2.2", "6", "256", "10", "100"]) + assert result.exit_code != 0 + assert ERR_MSG_VALUE_FAILURE in result.stdout + + # Verify invalid gre + result = runner.invoke( + config.config.commands["mirror_session"].commands["erspan"].commands["add"], + ["test_session", "1.1.1.1", "2.2.2.2", "6", "63", "65536", "100"]) + assert result.exit_code != 0 + assert ERR_MSG_VALUE_FAILURE in result.stdout + + # Verify invalid queue + result = runner.invoke( + config.config.commands["mirror_session"].commands["erspan"].commands["add"], + ["test_session", "1.1.1.1", "2.2.2.2", "6", "63", "65", "65536"]) + assert result.exit_code != 0 + assert ERR_MSG_VALUE_FAILURE in result.stdout + + # Positive case + with mock.patch('config.main.add_erspan') as mocked: + result = runner.invoke( + config.config.commands["mirror_session"].commands["erspan"].commands["add"], + ["test_session", "100.1.1.1", "2.2.2.2", "8", "63", "10", "100"]) + + mocked.assert_called_with("test_session", "100.1.1.1", "2.2.2.2", 8, 63, 10, 100, None, None, None) + + +def test_mirror_session_span_add(): + runner = CliRunner() + + # Verify invalid queue + result = runner.invoke( + config.config.commands["mirror_session"].commands["span"].commands["add"], + ["test_session", "Ethernet0", "Ethernet4", "rx", "65536"]) + assert result.exit_code != 0 + assert ERR_MSG_VALUE_FAILURE in result.stdout + + # Positive case + with mock.patch('config.main.add_span') as mocked: + result = runner.invoke( + config.config.commands["mirror_session"].commands["span"].commands["add"], + ["test_session", "Ethernet0", "Ethernet4", "rx", "100"]) + + mocked.assert_called_with("test_session", "Ethernet0", "Ethernet4", "rx", 100, None) +