22import logging
33from tests .common .helpers .assertions import pytest_assert
44from tests .common .helpers .snmp_helpers import get_snmp_facts
5+ from tests .common .utilities import get_data_acl , recover_acl_rule
56
67try :
78 import ntplib
1920SONIC_SSH_REGEX = 'OpenSSH_[\\ w\\ .]+ Debian'
2021
2122
22- def test_cacl_function (duthosts , enum_rand_one_per_hwsku_hostname , localhost , creds , recover_acl_rule ):
23+ def test_cacl_function (duthosts , enum_rand_one_per_hwsku_hostname , localhost , creds ):
2324 """Test control plane ACL functionality on a SONiC device"""
2425
2526 duthost = duthosts [enum_rand_one_per_hwsku_hostname ]
27+ data_acl = get_data_acl (duthost )
2628 dut_mgmt_ip = duthost .mgmt_ip
2729
2830 # Start an NTP client
@@ -46,76 +48,79 @@ def test_cacl_function(duthosts, enum_rand_one_per_hwsku_hostname, localhost, cr
4648 ntp_client .request (dut_mgmt_ip )
4749 except ntplib .NTPException :
4850 pytest .fail ("NTP did timed out when expected to succeed!" )
49-
50- # Copy config_service_acls.sh to the DuT (this also implicitly verifies we can successfully SSH to the DuT)
51- duthost .copy (src = "scripts/config_service_acls.sh" , dest = "/tmp/config_service_acls.sh" , mode = "0755" )
52-
53- # We run the config_service_acls.sh script in the background because it
54- # will install ACL rules which will only allow control plane traffic
55- # to an unused IP range. Thus, if it works properly, it will sever our
56- # SSH session, but we don't want the script itself to get killed,
57- # because it is also responsible for resetting the control plane ACLs
58- # back to their previous, working state
59- duthost .shell ("nohup /tmp/config_service_acls.sh < /dev/null > /dev/null 2>&1 &" )
60-
61- # Wait until we are unable to SSH into the DuT
62- res = localhost .wait_for (host = dut_mgmt_ip ,
63- port = SONIC_SSH_PORT ,
64- state = 'stopped' ,
65- search_regex = SONIC_SSH_REGEX ,
66- delay = 30 ,
67- timeout = 40 ,
68- module_ignore_errors = True )
69-
70- pytest_assert (not res .is_failed , "SSH port did not stop. {}" .format (res .get ('msg' , '' )))
71-
72- # Try to SSH back into the DuT, it should time out
73- res = localhost .wait_for (host = dut_mgmt_ip ,
74- port = SONIC_SSH_PORT ,
75- state = 'started' ,
76- search_regex = SONIC_SSH_REGEX ,
77- delay = 0 ,
78- timeout = 10 ,
51+ try :
52+ # Copy config_service_acls.sh to the DuT (this also implicitly verifies we can successfully SSH to the DuT)
53+ duthost .copy (src = "scripts/config_service_acls.sh" , dest = "/tmp/config_service_acls.sh" , mode = "0755" )
54+
55+ # We run the config_service_acls.sh script in the background because it
56+ # will install ACL rules which will only allow control plane traffic
57+ # to an unused IP range. Thus, if it works properly, it will sever our
58+ # SSH session, but we don't want the script itself to get killed,
59+ # because it is also responsible for resetting the control plane ACLs
60+ # back to their previous, working state
61+ duthost .shell ("nohup /tmp/config_service_acls.sh < /dev/null > /dev/null 2>&1 &" )
62+
63+ # Wait until we are unable to SSH into the DuT
64+ res = localhost .wait_for (host = dut_mgmt_ip ,
65+ port = SONIC_SSH_PORT ,
66+ state = 'stopped' ,
67+ search_regex = SONIC_SSH_REGEX ,
68+ delay = 30 ,
69+ timeout = 40 ,
70+ module_ignore_errors = True )
71+
72+ pytest_assert (not res .is_failed , "SSH port did not stop. {}" .format (res .get ('msg' , '' )))
73+
74+ # Try to SSH back into the DuT, it should time out
75+ res = localhost .wait_for (host = dut_mgmt_ip ,
76+ port = SONIC_SSH_PORT ,
77+ state = 'started' ,
78+ search_regex = SONIC_SSH_REGEX ,
79+ delay = 0 ,
80+ timeout = 10 ,
81+ module_ignore_errors = True )
82+
83+ pytest_assert (res .is_failed , "SSH did not timeout when expected. {}" .format (res .get ('msg' , '' )))
84+
85+ # Ensure we CANNOT gather basic SNMP facts from the device
86+ res = get_snmp_facts (localhost , host = dut_mgmt_ip , version = 'v2c' , community = creds ['snmp_rocommunity' ],
7987 module_ignore_errors = True )
8088
81- pytest_assert (res .is_failed , "SSH did not timeout when expected. {}" .format (res .get ('msg' , '' )))
82-
83- # Ensure we CANNOT gather basic SNMP facts from the device
84- res = get_snmp_facts (localhost , host = dut_mgmt_ip , version = 'v2c' , community = creds ['snmp_rocommunity' ],
85- module_ignore_errors = True )
86-
87- pytest_assert ('ansible_facts' not in res and "No SNMP response received before timeout" in res .get ('msg' , '' ))
88-
89- # Ensure we cannot send an NTP request to the DUT
90- if NTPLIB_INSTALLED :
91- try :
92- ntp_client .request (dut_mgmt_ip )
93- pytest .fail ("NTP did not time out when expected" )
94- except ntplib .NTPException :
95- pass
96-
97- # Wait until the original service ACLs are reinstated and the SSH port on the
98- # DUT is open to us once again. Note that the timeout here should be set sufficiently
99- # long enough to allow config_service_acls.sh to reset the ACLs to their original
100- # configuration.
101- res = localhost .wait_for (host = dut_mgmt_ip ,
102- port = SONIC_SSH_PORT ,
103- state = 'started' ,
104- search_regex = SONIC_SSH_REGEX ,
105- delay = 0 ,
106- timeout = 90 ,
107- module_ignore_errors = True )
108-
109- pytest_assert (not res .is_failed , "SSH did not start working when expected. {}" .format (res .get ('msg' , '' )))
110-
111- # Delete config_service_acls.sh from the DuT
112- duthost .file (path = "/tmp/config_service_acls.sh" , state = "absent" )
113-
114- # Ensure we can gather basic SNMP facts from the device once again. Should fail on timeout
115- get_snmp_facts (localhost ,
116- host = dut_mgmt_ip ,
117- version = "v2c" ,
118- community = creds ['snmp_rocommunity' ],
119- wait = True ,
120- timeout = 120 ,
121- interval = 20 )
89+ pytest_assert ('ansible_facts' not in res and "No SNMP response received before timeout" in res .get ('msg' , '' ))
90+
91+ # Ensure we cannot send an NTP request to the DUT
92+ if NTPLIB_INSTALLED :
93+ try :
94+ ntp_client .request (dut_mgmt_ip )
95+ pytest .fail ("NTP did not time out when expected" )
96+ except ntplib .NTPException :
97+ pass
98+
99+ # Wait until the original service ACLs are reinstated and the SSH port on the
100+ # DUT is open to us once again. Note that the timeout here should be set sufficiently
101+ # long enough to allow config_service_acls.sh to reset the ACLs to their original
102+ # configuration.
103+ res = localhost .wait_for (host = dut_mgmt_ip ,
104+ port = SONIC_SSH_PORT ,
105+ state = 'started' ,
106+ search_regex = SONIC_SSH_REGEX ,
107+ delay = 0 ,
108+ timeout = 90 ,
109+ module_ignore_errors = True )
110+
111+ pytest_assert (not res .is_failed , "SSH did not start working when expected. {}" .format (res .get ('msg' , '' )))
112+
113+ # Delete config_service_acls.sh from the DuT
114+ duthost .file (path = "/tmp/config_service_acls.sh" , state = "absent" )
115+
116+ # Ensure we can gather basic SNMP facts from the device once again. Should fail on timeout
117+ get_snmp_facts (localhost ,
118+ host = dut_mgmt_ip ,
119+ version = "v2c" ,
120+ community = creds ['snmp_rocommunity' ],
121+ wait = True ,
122+ timeout = 120 ,
123+ interval = 20 )
124+ finally :
125+ if data_acl :
126+ recover_acl_rule (duthost , data_acl )
0 commit comments