Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions config/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -804,21 +804,26 @@ def validate_mirror_session_config(config_db, session_name, dst_port, src_port,

return True

def validate_ip_mask(ctx, ip_addr):
def is_valid_ip_interface(ctx, ip_addr):
split_ip_mask = ip_addr.split("/")
if len(split_ip_mask) < 2:
return False

# Check if the IP address is correct or if there are leading zeros.
ip_obj = ipaddress.ip_address(split_ip_mask[0])

# Check if the mask is correct
mask_range = 33 if isinstance(ip_obj, ipaddress.IPv4Address) else 129
# If mask is not specified
if len(split_ip_mask) < 2:
return 0
if isinstance(ip_obj, ipaddress.IPv4Address):
# Since the IP address is used as a part of a key in Redis DB,
# do not tolerate extra zeros in IPv4.
if str(ip_obj) != split_ip_mask[0]:
return False

if not int(split_ip_mask[1]) in range(1, mask_range):
return 0
# Check if the mask is correct
net = ipaddress.ip_network(ip_addr, strict=False)
if str(net.prefixlen) != split_ip_mask[1] or net.prefixlen == 0:
return False

return str(ip_obj) + '/' + str(int(split_ip_mask[1]))
return True

def cli_sroute_to_config(ctx, command_str, strict_nh = True):
if len(command_str) < 2 or len(command_str) > 9:
Expand Down Expand Up @@ -3543,10 +3548,9 @@ def add(ctx, interface_name, ip_addr, gw):
try:
net = ipaddress.ip_network(ip_addr, strict=False)
if '/' not in ip_addr:
ip_addr = str(net)
ip_addr += '/' + str(net.prefixlen)

ip_addr = validate_ip_mask(ctx, ip_addr)
if not ip_addr:
if not is_valid_ip_interface(ctx, ip_addr):
raise ValueError('')

if interface_name == 'eth0':
Expand Down Expand Up @@ -3608,10 +3612,9 @@ def remove(ctx, interface_name, ip_addr):
try:
net = ipaddress.ip_network(ip_addr, strict=False)
if '/' not in ip_addr:
ip_addr = str(net)

ip_addr = validate_ip_mask(ctx, ip_addr)
if not ip_addr:
ip_addr += '/' + str(net.prefixlen)

if not is_valid_ip_interface(ctx, ip_addr):
raise ValueError('')

if interface_name == 'eth0':
Expand Down
30 changes: 12 additions & 18 deletions tests/ip_config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def test_add_del_interface_valid_ipv4(self):
obj = {'config_db':db.cfgdb}

# config int ip add Ethernet64 10.10.10.1/24
result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Ethernet64", "10.10.10.1/24"], obj=obj)
result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Ethernet64", "10.10.10.1/24"], obj=obj)
print(result.exit_code, result.output)
assert result.exit_code == 0
assert ('Ethernet64', '10.10.10.1/24') in db.cfgdb.get_table('INTERFACE')
Expand Down Expand Up @@ -59,22 +59,16 @@ def test_add_interface_ipv4_invalid_mask(self):
assert result.exit_code != 0
assert ERROR_MSG in result.output

def test_add_del_interface_ipv4_with_leading_zeros(self):
def test_add_interface_ipv4_with_leading_zeros(self):
db = Db()
runner = CliRunner()
obj = {'config_db':db.cfgdb}

# config int ip add Ethernet68 10.10.10.002/24
result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Ethernet68", "10.10.10.002/24"], obj=obj)
print(result.exit_code, result.output)
assert result.exit_code == 0
assert ('Ethernet68', '10.10.10.2/24') in db.cfgdb.get_table('INTERFACE')

# config int ip remove Ethernet68 10.10.10.002/24
result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["remove"], ["Ethernet68", "10.10.10.002/24"], obj=obj)
print(result.exit_code, result.output)
assert result.exit_code != 0
assert ('Ethernet68', '10.10.10.2/24') not in db.cfgdb.get_table('INTERFACE')
assert ERROR_MSG in result.output

''' Tests for IPv6 '''

Expand All @@ -84,13 +78,13 @@ def test_add_del_interface_valid_ipv6(self):
obj = {'config_db':db.cfgdb}

# config int ip add Ethernet72 2001:1db8:11a3:19d7:1f34:8a2e:17a0:765d/34
result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Ethernet72", "2001:1db8:11a3:19d7:1f34:8a2e:17a0:765d/34"], obj=obj)
result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Ethernet72", "2001:1db8:11a3:19d7:1f34:8a2e:17a0:765d/34"], obj=obj)
print(result.exit_code, result.output)
assert result.exit_code == 0
assert ('Ethernet72', '2001:1db8:11a3:19d7:1f34:8a2e:17a0:765d/34') in db.cfgdb.get_table('INTERFACE')

# config int ip remove Ethernet72 2001:1db8:11a3:19d7:1f34:8a2e:17a0:765d/34
result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["remove"], ["Ethernet72", "2001:1db8:11a3:19d7:1f34:8a2e:17a0:765d/34"], obj=obj)
result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["remove"], ["Ethernet72", "2001:1db8:11a3:19d7:1f34:8a2e:17a0:765d/34"], obj=obj)
print(result.exit_code, result.output)
assert result.exit_code != 0
assert ('Ethernet72', '2001:1db8:11a3:19d7:1f34:8a2e:17a0:765d/34') not in db.cfgdb.get_table('INTERFACE')
Expand Down Expand Up @@ -122,34 +116,34 @@ def test_add_del_interface_ipv6_with_leading_zeros(self):
runner = CliRunner()
obj = {'config_db':db.cfgdb}

# config int ip del Ethernet68 2001:0db8:11a3:09d7:1f34:8a2e:07a0:765d/34
# config int ip add Ethernet68 2001:0db8:11a3:09d7:1f34:8a2e:07a0:765d/34
result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Ethernet68", "2001:0db8:11a3:09d7:1f34:8a2e:07a0:765d/34"], obj=obj)
print(result.exit_code, result.output)
assert result.exit_code == 0
assert ('Ethernet68', '2001:db8:11a3:9d7:1f34:8a2e:7a0:765d/34') in db.cfgdb.get_table('INTERFACE')
assert ('Ethernet68', '2001:0db8:11a3:09d7:1f34:8a2e:07a0:765d/34') in db.cfgdb.get_table('INTERFACE')

# config int ip remove Ethernet68 2001:0db8:11a3:09d7:1f34:8a2e:07a0:765d/34
result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["remove"], ["Ethernet68", "2001:0db8:11a3:09d7:1f34:8a2e:07a0:765d/34"], obj=obj)
print(result.exit_code, result.output)
assert result.exit_code != 0
assert ('Ethernet68', '2001:db8:11a3:9d7:1f34:8a2e:7a0:765d/34') not in db.cfgdb.get_table('INTERFACE')
assert ('Ethernet68', '2001:0db8:11a3:09d7:1f34:8a2e:07a0:765d/34') not in db.cfgdb.get_table('INTERFACE')

def test_add_del_interface_shortened_ipv6_with_leading_zeros(self):
db = Db()
runner = CliRunner()
obj = {'config_db':db.cfgdb}

# config int ip del Ethernet68 3000::001/64
# config int ip add Ethernet68 3000::001/64
result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Ethernet68", "3000::001/64"], obj=obj)
print(result.exit_code, result.output)
assert result.exit_code == 0
assert ('Ethernet68', '3000::1/64') in db.cfgdb.get_table('INTERFACE')
assert ('Ethernet68', '3000::001/64') in db.cfgdb.get_table('INTERFACE')

# config int ip remove Ethernet68 3000::001/64
result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["remove"], ["Ethernet68", "3000::001/64"], obj=obj)
result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["remove"], ["Ethernet68", "3000::001/64"], obj=obj)
print(result.exit_code, result.output)
assert result.exit_code != 0
assert ('Ethernet68', '3000::1/64') not in db.cfgdb.get_table('INTERFACE')
assert ('Ethernet68', '3000::001/64') not in db.cfgdb.get_table('INTERFACE')

@classmethod
def teardown_class(cls):
Expand Down