diff --git a/tests/tacacs/test_authorization.py b/tests/tacacs/test_authorization.py index 01c9c1b9972..f3ee23c912b 100644 --- a/tests/tacacs/test_authorization.py +++ b/tests/tacacs/test_authorization.py @@ -1,8 +1,10 @@ import logging import paramiko +import time import pytest -from .utils import stop_tacacs_server, start_tacacs_server, per_command_check_skip_versions, remove_all_tacacs_server, get_ld_path +from tests.tacacs.utils import stop_tacacs_server, start_tacacs_server +from tests.tacacs.utils import per_command_check_skip_versions, remove_all_tacacs_server, get_ld_path from tests.common.helpers.assertions import pytest_assert from tests.common.utilities import skip_release @@ -14,22 +16,28 @@ logger = logging.getLogger(__name__) -TIMEOUT_LIMIT = 120 +TIMEOUT_LIMIT = 120 + def ssh_connect_remote(remote_ip, remote_username, remote_password): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(remote_ip, username=remote_username, password=remote_password, allow_agent=False, look_for_keys=False, auth_timeout=TIMEOUT_LIMIT) + ssh.connect( + remote_ip, username=remote_username, password=remote_password, allow_agent=False, + look_for_keys=False, auth_timeout=TIMEOUT_LIMIT) return ssh + def check_ssh_connect_remote_failed(remote_ip, remote_username, remote_password): login_failed = False try: ssh_connect_remote(remote_ip, remote_username, remote_password) except paramiko.ssh_exception.AuthenticationException as e: login_failed = True + logger.info("Paramiko SSH connect failed with authentication: " + repr(e)) + + pytest_assert(login_failed) - pytest_assert(login_failed == True) def ssh_run_command(ssh_client, command): stdin, stdout, stderr = ssh_client.exec_command(command, timeout=TIMEOUT_LIMIT) @@ -38,10 +46,11 @@ def ssh_run_command(ssh_client, command): stderr_lines = stderr.readlines() return exit_code, stdout_lines, stderr_lines + def check_ssh_output(res, exp_val): content_exist = False - for l in res: - if exp_val in l: + for line in res: + if exp_val in line: content_exist = True break pytest_assert(content_exist) @@ -62,15 +71,21 @@ def check_ssh_output_any_of(res, exp_vals): def remote_user_client(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds): duthost = duthosts[enum_rand_one_per_hwsku_hostname] dutip = duthost.mgmt_ip - with ssh_connect_remote(dutip, tacacs_creds['tacacs_authorization_user'], tacacs_creds['tacacs_authorization_user_passwd']) as ssh_client: + with ssh_connect_remote( + dutip, + tacacs_creds['tacacs_authorization_user'], + tacacs_creds['tacacs_authorization_user_passwd'] + ) as ssh_client: yield ssh_client + @pytest.fixture def local_user_client(): with paramiko.SSHClient() as ssh_client: ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) yield ssh_client + @pytest.fixture(scope="module", autouse=True) def check_image_version(duthost): """Skips this test if the SONiC image installed on DUT is older than 202112 @@ -81,10 +96,25 @@ def check_image_version(duthost): """ skip_release(duthost, per_command_check_skip_versions) -def check_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, check_tacacs, remote_user_client): + +@pytest.fixture +def setup_authorization_tacacs(duthosts, enum_rand_one_per_hwsku_hostname): duthost = duthosts[enum_rand_one_per_hwsku_hostname] duthost.shell("sudo config aaa authorization tacacs+") + yield + duthost.shell("sudo config aaa authorization local") # Default authorization method is local + +@pytest.fixture +def setup_authorization_tacacs_local(duthosts, enum_rand_one_per_hwsku_hostname): + duthost = duthosts[enum_rand_one_per_hwsku_hostname] + duthost.shell("sudo config aaa authorization \"tacacs+ local\"") + yield + duthost.shell("sudo config aaa authorization local") # Default authorization method is local + + +def check_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, remote_user_client): + duthost = duthosts[enum_rand_one_per_hwsku_hostname] """ Verify TACACS+ user run command in server side whitelist: If command have local permission, user can run command. @@ -93,10 +123,6 @@ def check_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, pytest_assert(exit_code == 0) check_ssh_output(stdout, 'AAA authentication') - """ - Verify TACACS+ user run command in server side whitelist: - If command not have local permission, user can't run command. - """ exit_code, stdout, stderr = ssh_run_command(remote_user_client, "config aaa") pytest_assert(exit_code == 1) check_ssh_output(stderr, 'Root privileges are required for this operation') @@ -108,15 +134,20 @@ def check_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, # Verify Local user can't login. dutip = duthost.mgmt_ip - check_ssh_connect_remote_failed(dutip, tacacs_creds['local_user'], - tacacs_creds['local_user_passwd']) + check_ssh_connect_remote_failed( + dutip, tacacs_creds['local_user'], + tacacs_creds['local_user_passwd'] + ) -def test_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, check_tacacs, remote_user_client): - check_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, check_tacacs, remote_user_client) +def test_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, setup_authorization_tacacs, + tacacs_creds, check_tacacs, remote_user_client): + check_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, remote_user_client) -def test_authorization_tacacs_only_some_server_down(localhost, duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, ptfhost, check_tacacs, remote_user_client): +def test_authorization_tacacs_only_some_server_down( + duthosts, enum_rand_one_per_hwsku_hostname, + setup_authorization_tacacs, tacacs_creds, ptfhost, check_tacacs, remote_user_client): """ Setup multiple tacacs server for this UT. Tacacs server 127.0.0.1 not accessible. @@ -132,6 +163,11 @@ def test_authorization_tacacs_only_some_server_down(localhost, duthosts, enum_ra duthost.shell("sudo config tacacs add %s" % invalid_tacacs_server_ip) duthost.shell("sudo config tacacs add %s" % tacacs_server_ip) + # The above "config tacacs add" commands will trigger hostcfgd to regenerate tacacs config. + # If we immediately run "show aaa" command, the client may still be using the first invalid tacacs server. + # The second valid tacacs may not take effect yet. Wait some time for the valid tacacs server to take effect. + time.sleep(2) + """ Verify TACACS+ user run command in server side whitelist: If command have local permission, user can run command. @@ -139,14 +175,15 @@ def test_authorization_tacacs_only_some_server_down(localhost, duthosts, enum_ra Verify TACACS+ user can't run command not in server side whitelist. Verify Local user can't login. """ - check_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, check_tacacs, remote_user_client) + check_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, remote_user_client) # Cleanup duthost.shell("sudo config tacacs delete %s" % invalid_tacacs_server_ip) + duthost.shell("sudo config tacacs timeout 5") -def test_authorization_tacacs_only_then_server_down_after_login(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, ptfhost, check_tacacs, remote_user_client): - duthost = duthosts[enum_rand_one_per_hwsku_hostname] - duthost.shell("sudo config aaa authorization tacacs+") + +def test_authorization_tacacs_only_then_server_down_after_login( + setup_authorization_tacacs, ptfhost, check_tacacs, remote_user_client): # Verify when server are accessible, TACACS+ user can run command in server side whitelist. exit_code, stdout, stderr = ssh_run_command(remote_user_client, "show aaa") @@ -164,9 +201,11 @@ def test_authorization_tacacs_only_then_server_down_after_login(duthosts, enum_r # Cleanup UT. start_tacacs_server(ptfhost) -def test_authorization_tacacs_and_local(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, check_tacacs, remote_user_client): + +def test_authorization_tacacs_and_local( + duthosts, enum_rand_one_per_hwsku_hostname, + setup_authorization_tacacs_local, tacacs_creds, check_tacacs, remote_user_client): duthost = duthosts[enum_rand_one_per_hwsku_hostname] - duthost.shell("sudo config aaa authorization \"tacacs+ local\"") """ Verify TACACS+ user run command in server side whitelist: @@ -174,12 +213,7 @@ def test_authorization_tacacs_and_local(duthosts, enum_rand_one_per_hwsku_hostna """ exit_code, stdout, stderr = ssh_run_command(remote_user_client, "show aaa") pytest_assert(exit_code == 0) - check_ssh_output(stdout, 'AAA authentication') - """ - Verify TACACS+ user run command in server side whitelist: - If command not have local permission, user can't run command. - """ exit_code, stdout, stderr = ssh_run_command(remote_user_client, "config aaa") pytest_assert(exit_code == 1) check_ssh_output(stderr, 'Root privileges are required for this operation') @@ -191,13 +225,16 @@ def test_authorization_tacacs_and_local(duthosts, enum_rand_one_per_hwsku_hostna # Verify Local user can't login. dutip = duthost.mgmt_ip - check_ssh_connect_remote_failed(dutip, tacacs_creds['local_user'], - tacacs_creds['local_user_passwd']) + check_ssh_connect_remote_failed( + dutip, tacacs_creds['local_user'], + tacacs_creds['local_user_passwd'] + ) -def test_authorization_tacacs_and_local_then_server_down_after_login(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, ptfhost, check_tacacs, remote_user_client, local_user_client): +def test_authorization_tacacs_and_local_then_server_down_after_login( + duthosts, enum_rand_one_per_hwsku_hostname, + setup_authorization_tacacs_local, tacacs_creds, ptfhost, check_tacacs, remote_user_client, local_user_client): duthost = duthosts[enum_rand_one_per_hwsku_hostname] - duthost.shell("sudo config aaa authorization \"tacacs+ local\"") # Shutdown tacacs server stop_tacacs_server(ptfhost) @@ -215,9 +252,11 @@ def test_authorization_tacacs_and_local_then_server_down_after_login(duthosts, e # Verify Local user can login when tacacs closed, and run command with local permission. dutip = duthost.mgmt_ip - local_user_client.connect(dutip, username=tacacs_creds['local_user'], - password=tacacs_creds['local_user_passwd'], - allow_agent=False, look_for_keys=False, auth_timeout=TIMEOUT_LIMIT) + local_user_client.connect( + dutip, username=tacacs_creds['local_user'], + password=tacacs_creds['local_user_passwd'], + allow_agent=False, look_for_keys=False, auth_timeout=TIMEOUT_LIMIT + ) exit_code, stdout, stderr = ssh_run_command(local_user_client, "show aaa") pytest_assert(exit_code == 0) @@ -226,15 +265,17 @@ def test_authorization_tacacs_and_local_then_server_down_after_login(duthosts, e # Start tacacs server start_tacacs_server(ptfhost) - # Verify after Local user login, then server becomes accessible, Local user still can run command with local permission. + # Verify after Local user login, then server becomes accessible, + # Local user still can run command with local permission. exit_code, stdout, stderr = ssh_run_command(local_user_client, "show aaa") pytest_assert(exit_code == 0) check_ssh_output(stdout, 'AAA authentication') -def test_authorization_local(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, ptfhost, check_tacacs, remote_user_client, local_user_client): +def test_authorization_local( + duthosts, enum_rand_one_per_hwsku_hostname, + tacacs_creds, ptfhost, check_tacacs, remote_user_client, local_user_client): duthost = duthosts[enum_rand_one_per_hwsku_hostname] - duthost.shell("sudo config aaa authorization local") """ TACACS server up: @@ -244,10 +285,6 @@ def test_authorization_local(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_ pytest_assert(exit_code == 0) check_ssh_output(stdout, 'AAA authentication') - """ - TACACS server up: - Verify TACACS+ user can't run command if not have permission in local. - """ exit_code, stdout, stderr = ssh_run_command(remote_user_client, "config aaa") pytest_assert(exit_code == 1) check_ssh_output(stderr, 'Root privileges are required for this operation') @@ -260,9 +297,11 @@ def test_authorization_local(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_ Verify Local user can login, and run command with local permission. """ dutip = duthost.mgmt_ip - local_user_client.connect(dutip, username=tacacs_creds['local_user'], - password=tacacs_creds['local_user_passwd'], - allow_agent=False, look_for_keys=False, auth_timeout=TIMEOUT_LIMIT) + local_user_client.connect( + dutip, username=tacacs_creds['local_user'], + password=tacacs_creds['local_user_passwd'], + allow_agent=False, look_for_keys=False, auth_timeout=TIMEOUT_LIMIT + ) exit_code, stdout, stderr = ssh_run_command(local_user_client, "show aaa") pytest_assert(exit_code == 0) @@ -272,16 +311,19 @@ def test_authorization_local(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_ start_tacacs_server(ptfhost) -def test_bypass_authorization(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, check_tacacs, remote_user_client): +def test_bypass_authorization( + duthosts, enum_rand_one_per_hwsku_hostname, + setup_authorization_tacacs, check_tacacs, remote_user_client): duthost = duthosts[enum_rand_one_per_hwsku_hostname] - duthost.shell("sudo config aaa authorization tacacs+") """ Verify user can't run script with sh/python with following command. python ./testscript.py - NOTE: TACACS UT using tac_plus as server side, there is a bug that tac_plus can't handle an authorization message contains more than 10 attributes. - Because every command parameter will convert to a TACACS attribute, please don't using more than 5 command parameters in test case. + NOTE: TACACS UT using tac_plus as server side, there is a bug that tac_plus can't handle an authorization + message contains more than 10 attributes. + Because every command parameter will convert to a TACACS attribute, please don't using more than 5 + command parameters in test case. """ exit_code, stdout, stderr = ssh_run_command(remote_user_client, 'echo "" >> ./testscript.py') pytest_assert(exit_code == 0) @@ -301,22 +343,18 @@ def test_bypass_authorization(duthosts, enum_rand_one_per_hwsku_hostname, tacacs pytest_assert(exit_code == 0) check_ssh_output(stdout, '/bin/sh') - """ - Verify user can't run command with loader: - /lib/x86_64-linux-gnu/ld-linux-x86-64.so.2 sh - """ + # Verify user can't run command with loader: + # /lib/x86_64-linux-gnu/ld-linux-x86-64.so.2 sh ld_path = get_ld_path(duthost) if not ld_path: exit_code, stdout, stderr = ssh_run_command(remote_user_client, ld_path + " sh") pytest_assert(exit_code == 1) check_ssh_output(stdout, 'authorize failed by TACACS+ with given arguments, not executing') - """ - Verify user can't run command with prefix/quoting: - \sh - "sh" - echo $(sh -c ls) - """ + # Verify user can't run command with prefix/quoting: + # \sh + # "sh" + # echo $(sh -c ls) exit_code, stdout, stderr = ssh_run_command(remote_user_client, "\\sh") pytest_assert(exit_code == 1) check_ssh_output(stdout, 'authorize failed by TACACS+ with given arguments, not executing') @@ -330,9 +368,11 @@ def test_bypass_authorization(duthosts, enum_rand_one_per_hwsku_hostname, tacacs pytest_assert(exit_code == 0) check_ssh_output(stdout, 'authorize failed by TACACS+ with given arguments, not executing') -def test_backward_compatibility_disable_authorization(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, ptfhost, check_tacacs, remote_user_client, local_user_client): + +def test_backward_compatibility_disable_authorization( + duthosts, enum_rand_one_per_hwsku_hostname, + tacacs_creds, ptfhost, check_tacacs, remote_user_client, local_user_client): duthost = duthosts[enum_rand_one_per_hwsku_hostname] - duthost.shell("sudo config aaa authorization local") # Verify domain account can run command if have permission in local. exit_code, stdout, stderr = ssh_run_command(remote_user_client, "show aaa") @@ -344,14 +384,18 @@ def test_backward_compatibility_disable_authorization(duthosts, enum_rand_one_pe # Verify domain account can't login to device successfully. dutip = duthost.mgmt_ip - check_ssh_connect_remote_failed(dutip, tacacs_creds['tacacs_authorization_user'], - tacacs_creds['tacacs_authorization_user_passwd']) + check_ssh_connect_remote_failed( + dutip, tacacs_creds['tacacs_authorization_user'], + tacacs_creds['tacacs_authorization_user_passwd'] + ) # Verify local admin account can run command if have permission in local. dutip = duthost.mgmt_ip - local_user_client.connect(dutip, username=tacacs_creds['local_user'], - password=tacacs_creds['local_user_passwd'], - allow_agent=False, look_for_keys=False, auth_timeout=TIMEOUT_LIMIT) + local_user_client.connect( + dutip, username=tacacs_creds['local_user'], + password=tacacs_creds['local_user_passwd'], + allow_agent=False, look_for_keys=False, auth_timeout=TIMEOUT_LIMIT + ) exit_code, stdout, stderr = ssh_run_command(local_user_client, "show aaa") pytest_assert(exit_code == 0) diff --git a/tests/tacacs/utils.py b/tests/tacacs/utils.py index 6008f88d7c7..48b45f947e5 100644 --- a/tests/tacacs/utils.py +++ b/tests/tacacs/utils.py @@ -9,7 +9,7 @@ logger = logging.getLogger(__name__) -# per-command authorization and accounting feature not avaliable in following versions +# per-command authorization and accounting feature not available in following versions per_command_check_skip_versions = ["201811", "201911", "202012", "202106"] @@ -27,13 +27,20 @@ def check_all_services_status(ptfhost): def start_tacacs_server(ptfhost): + def tacacs_running(ptfhost): + out = ptfhost.command("service tacacs_plus status", module_ignore_errors=True)["stdout"] + return "tacacs+ running" in out + ptfhost.command("service tacacs_plus restart", module_ignore_errors=True) - return "tacacs+ running" in ptfhost.command("service tacacs_plus status", module_ignore_errors=True)["stdout_lines"] + return wait_until(5, 1, 0, tacacs_running, ptfhost) def stop_tacacs_server(ptfhost): - ptfhost.service(name="tacacs_plus", state="stopped") - check_all_services_status(ptfhost) + def tacacs_not_running(ptfhost): + out = ptfhost.command("service tacacs_plus status", module_ignore_errors=True)["stdout"] + return "tacacs+ apparently not running" in out + ptfhost.shell("service tacacs_plus stop") + return wait_until(5, 1, 0, tacacs_not_running, ptfhost) def setup_local_user(duthost, tacacs_creds):