22import logging
33from tests .common .helpers .assertions import pytest_assert
44from tests .common .helpers .snmp_helpers import get_snmp_facts
5+ from tests .common .utilities import get_data_acl , recover_acl_rule
56
67try :
78 import ntplib
@@ -25,6 +26,7 @@ def test_cacl_function(duthosts, enum_rand_one_per_hwsku_hostname, localhost, cr
2526 """Test control plane ACL functionality on a SONiC device"""
2627
2728 duthost = duthosts [enum_rand_one_per_hwsku_hostname ]
29+ data_acl = get_data_acl (duthost )
2830 dut_mgmt_ip = duthost .mgmt_ip
2931
3032 # Start an NTP client
@@ -48,76 +50,79 @@ def test_cacl_function(duthosts, enum_rand_one_per_hwsku_hostname, localhost, cr
4850 ntp_client .request (dut_mgmt_ip )
4951 except ntplib .NTPException :
5052 pytest .fail ("NTP did timed out when expected to succeed!" )
51-
52- # Copy config_service_acls.sh to the DuT (this also implicitly verifies we can successfully SSH to the DuT)
53- duthost .copy (src = "scripts/config_service_acls.sh" , dest = "/tmp/config_service_acls.sh" , mode = "0755" )
54-
55- # We run the config_service_acls.sh script in the background because it
56- # will install ACL rules which will only allow control plane traffic
57- # to an unused IP range. Thus, if it works properly, it will sever our
58- # SSH session, but we don't want the script itself to get killed,
59- # because it is also responsible for resetting the control plane ACLs
60- # back to their previous, working state
61- duthost .shell ("nohup /tmp/config_service_acls.sh < /dev/null > /dev/null 2>&1 &" )
62-
63- # Wait until we are unable to SSH into the DuT
64- res = localhost .wait_for (host = dut_mgmt_ip ,
65- port = SONIC_SSH_PORT ,
66- state = 'stopped' ,
67- search_regex = SONIC_SSH_REGEX ,
68- delay = 30 ,
69- timeout = 40 ,
70- module_ignore_errors = True )
71-
72- pytest_assert (not res .is_failed , "SSH port did not stop. {}" .format (res .get ('msg' , '' )))
73-
74- # Try to SSH back into the DuT, it should time out
75- res = localhost .wait_for (host = dut_mgmt_ip ,
76- port = SONIC_SSH_PORT ,
77- state = 'started' ,
78- search_regex = SONIC_SSH_REGEX ,
79- delay = 0 ,
80- timeout = 10 ,
81- module_ignore_errors = True )
82-
83- pytest_assert (res .is_failed , "SSH did not timeout when expected. {}" .format (res .get ('msg' , '' )))
84-
85- # Ensure we CANNOT gather basic SNMP facts from the device
86- res = get_snmp_facts (localhost , host = dut_mgmt_ip , version = 'v2c' , community = creds ['snmp_rocommunity' ],
87- module_ignore_errors = True )
88-
89- pytest_assert ('ansible_facts' not in res and "No SNMP response received before timeout" in res .get ('msg' , '' ))
90-
91- # Ensure we cannot send an NTP request to the DUT
92- if NTPLIB_INSTALLED :
93- try :
94- ntp_client .request (dut_mgmt_ip )
95- pytest .fail ("NTP did not time out when expected" )
96- except ntplib .NTPException :
97- pass
98-
99- # Wait until the original service ACLs are reinstated and the SSH port on the
100- # DUT is open to us once again. Note that the timeout here should be set sufficiently
101- # long enough to allow config_service_acls.sh to reset the ACLs to their original
102- # configuration.
103- res = localhost .wait_for (host = dut_mgmt_ip ,
104- port = SONIC_SSH_PORT ,
105- state = 'started' ,
106- search_regex = SONIC_SSH_REGEX ,
107- delay = 0 ,
108- timeout = 90 ,
53+ try :
54+ # Copy config_service_acls.sh to the DuT (this also implicitly verifies we can successfully SSH to the DuT)
55+ duthost .copy (src = "scripts/config_service_acls.sh" , dest = "/tmp/config_service_acls.sh" , mode = "0755" )
56+
57+ # We run the config_service_acls.sh script in the background because it
58+ # will install ACL rules which will only allow control plane traffic
59+ # to an unused IP range. Thus, if it works properly, it will sever our
60+ # SSH session, but we don't want the script itself to get killed,
61+ # because it is also responsible for resetting the control plane ACLs
62+ # back to their previous, working state
63+ duthost .shell ("nohup /tmp/config_service_acls.sh < /dev/null > /dev/null 2>&1 &" )
64+
65+ # Wait until we are unable to SSH into the DuT
66+ res = localhost .wait_for (host = dut_mgmt_ip ,
67+ port = SONIC_SSH_PORT ,
68+ state = 'stopped' ,
69+ search_regex = SONIC_SSH_REGEX ,
70+ delay = 30 ,
71+ timeout = 40 ,
72+ module_ignore_errors = True )
73+
74+ pytest_assert (not res .is_failed , "SSH port did not stop. {}" .format (res .get ('msg' , '' )))
75+
76+ # Try to SSH back into the DuT, it should time out
77+ res = localhost .wait_for (host = dut_mgmt_ip ,
78+ port = SONIC_SSH_PORT ,
79+ state = 'started' ,
80+ search_regex = SONIC_SSH_REGEX ,
81+ delay = 0 ,
82+ timeout = 10 ,
83+ module_ignore_errors = True )
84+
85+ pytest_assert (res .is_failed , "SSH did not timeout when expected. {}" .format (res .get ('msg' , '' )))
86+
87+ # Ensure we CANNOT gather basic SNMP facts from the device
88+ res = get_snmp_facts (localhost , host = dut_mgmt_ip , version = 'v2c' , community = creds ['snmp_rocommunity' ],
10989 module_ignore_errors = True )
11090
111- pytest_assert (not res .is_failed , "SSH did not start working when expected. {}" .format (res .get ('msg' , '' )))
112-
113- # Delete config_service_acls.sh from the DuT
114- duthost .file (path = "/tmp/config_service_acls.sh" , state = "absent" )
115-
116- # Ensure we can gather basic SNMP facts from the device once again. Should fail on timeout
117- get_snmp_facts (localhost ,
118- host = dut_mgmt_ip ,
119- version = "v2c" ,
120- community = creds ['snmp_rocommunity' ],
121- wait = True ,
122- timeout = 20 ,
123- interval = 20 )
91+ pytest_assert ('ansible_facts' not in res and "No SNMP response received before timeout" in res .get ('msg' , '' ))
92+
93+ # Ensure we cannot send an NTP request to the DUT
94+ if NTPLIB_INSTALLED :
95+ try :
96+ ntp_client .request (dut_mgmt_ip )
97+ pytest .fail ("NTP did not time out when expected" )
98+ except ntplib .NTPException :
99+ pass
100+
101+ # Wait until the original service ACLs are reinstated and the SSH port on the
102+ # DUT is open to us once again. Note that the timeout here should be set sufficiently
103+ # long enough to allow config_service_acls.sh to reset the ACLs to their original
104+ # configuration.
105+ res = localhost .wait_for (host = dut_mgmt_ip ,
106+ port = SONIC_SSH_PORT ,
107+ state = 'started' ,
108+ search_regex = SONIC_SSH_REGEX ,
109+ delay = 0 ,
110+ timeout = 90 ,
111+ module_ignore_errors = True )
112+
113+ pytest_assert (not res .is_failed , "SSH did not start working when expected. {}" .format (res .get ('msg' , '' )))
114+
115+ # Delete config_service_acls.sh from the DuT
116+ duthost .file (path = "/tmp/config_service_acls.sh" , state = "absent" )
117+
118+ # Ensure we can gather basic SNMP facts from the device once again. Should fail on timeout
119+ get_snmp_facts (localhost ,
120+ host = dut_mgmt_ip ,
121+ version = "v2c" ,
122+ community = creds ['snmp_rocommunity' ],
123+ wait = True ,
124+ timeout = 20 ,
125+ interval = 20 )
126+ finally :
127+ if data_acl :
128+ recover_acl_rule (duthost , data_acl )
0 commit comments