From 56e6889e772872303bae90cebdcb20313e5acb69 Mon Sep 17 00:00:00 2001 From: Yutong Zhang Date: Tue, 9 Apr 2024 10:03:55 +0800 Subject: [PATCH] Fix issues due to password rotation --- tests/common/utilities.py | 40 +++++++++++++++++++++++++++++++++++ tests/conftest.py | 2 ++ tests/ssh/test_ssh_ciphers.py | 30 +++++++++++++------------- tests/ssh/test_ssh_limit.py | 23 ++++++++++---------- 4 files changed, 68 insertions(+), 27 deletions(-) diff --git a/tests/common/utilities.py b/tests/common/utilities.py index 4a4aa90bd95..e47c62ca31e 100755 --- a/tests/common/utilities.py +++ b/tests/common/utilities.py @@ -17,6 +17,7 @@ import traceback import copy import tempfile +import paramiko from io import BytesIO import pytest @@ -901,3 +902,42 @@ def recover_acl_rule(duthost, data_acl): logger.info("Applying {}".format(dut_conf_file_path)) duthost.command("acl-loader update full {}".format(dut_conf_file_path)) + + +def _paramiko_ssh(ip_address, username, passwords): + """ + Connect to the device via ssh using paramiko + Args: + ip_address (str): The ip address of device + username (str): The username of device + passwords (str or list): Potential passwords of device + this argument can be either a string or a list of string + Returns: + AuthResult: the ssh session of device + """ + if isinstance(passwords, str): + candidate_passwords = [passwords] + elif isinstance(passwords, list): + candidate_passwords = passwords + else: + raise Exception("The passwords argument must be either a string or a list of string.") + + for password in candidate_passwords: + try: + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(ip_address, username=username, password=password, + allow_agent=False, look_for_keys=False, timeout=10) + return ssh, password + except paramiko.AuthenticationException: + continue + except Exception as e: + logging.info("Cannot access device {} via ssh, error: {}".format(ip_address, e)) + raise e + logging.info("Cannot access device {} via ssh, error: Password incorrect".format(ip_address)) + raise paramiko.AuthenticationException + + +def paramiko_ssh(ip_address, username, passwords): + ssh, pwd = _paramiko_ssh(ip_address, username, passwords) + return ssh diff --git a/tests/conftest.py b/tests/conftest.py index 9e22d09e7cc..86609f618d3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -703,6 +703,8 @@ def creds_on_dut(duthost): creds["console_user"] = {} creds["console_password"] = {} + creds["ansible_altpasswords"] = [] + for k, v in console_login_creds.items(): creds["console_user"][k] = v["user"] creds["console_password"][k] = v["passwd"] diff --git a/tests/ssh/test_ssh_ciphers.py b/tests/ssh/test_ssh_ciphers.py index a6369e5806b..bb4c1d211cd 100644 --- a/tests/ssh/test_ssh_ciphers.py +++ b/tests/ssh/test_ssh_ciphers.py @@ -11,11 +11,14 @@ pytest.mark.device_type('vs') ] + def connect_with_specified_ciphers(duthosts, rand_one_dut_hostname, specified_cipher, creds, typename): duthost = duthosts[rand_one_dut_hostname] dutuser, dutpass = creds['sonicadmin_user'], creds['sonicadmin_password'] sonic_admin_alt_password = duthost.host.options['variable_manager']._hostvars[duthost.hostname].get( "ansible_altpassword") + sonic_admin_alt_passwords = creds["ansible_altpasswords"] + dut_passwords = [dutpass, sonic_admin_alt_password] + sonic_admin_alt_passwords dutip = duthost.mgmt_ip if typename == "enc": @@ -29,27 +32,24 @@ def connect_with_specified_ciphers(duthosts, rand_one_dut_hostname, specified_ci ssh_cmd = "ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no {} {}@{}".format(ssh_cipher_option, dutuser, dutip) - try: - connect = pexpect.spawn(ssh_cmd) - connect.expect('.*[Pp]assword:') - connect.sendline(dutpass) - - i = connect.expect('{}@{}:'.format(dutuser, duthost.hostname), timeout=10) - pytest_assert(i == 0, "Failed to connect") - except: + for dutpass in dut_passwords: try: connect = pexpect.spawn(ssh_cmd) connect.expect('.*[Pp]assword:') - connect.sendline(sonic_admin_alt_password) + connect.sendline(dutpass) - i = connect.expect('{}@{}:'.format(dutuser, duthost.hostname), timeout=10) + i = connect.expect( + '{}@{}:'.format(dutuser, duthost.hostname), timeout=10) pytest_assert(i == 0, "Failed to connect") - except pexpect.exceptions.EOF: - pytest.fail("EOF reached") - except pexpect.exceptions.TIMEOUT: - pytest.fail("Timeout reached") + return except Exception as e: - pytest.fail("Cannot connect to DUT host via SSH: {}".format(e)) + output = connect.before.decode() + if "Permission denied" in output: + continue + else: + pytest.fail(e) + pytest.fail("Cannot connect to DUT host via SSH") + def test_ssh_protocol_version(duthosts, rand_one_dut_hostname): duthost = duthosts[rand_one_dut_hostname] diff --git a/tests/ssh/test_ssh_limit.py b/tests/ssh/test_ssh_limit.py index 6b4dc25ed80..447b01572f3 100755 --- a/tests/ssh/test_ssh_limit.py +++ b/tests/ssh/test_ssh_limit.py @@ -1,12 +1,12 @@ -import json +import json import logging import paramiko import pytest import time from tests.common.helpers.assertions import pytest_assert, pytest_require -from tests.tacacs.test_authorization import ssh_connect_remote -from tests.tacacs.conftest import tacacs_creds +from tests.tacacs.conftest import tacacs_creds # noqa F401 from tests.tacacs.utils import setup_local_user +from tests.common.utilities import paramiko_ssh pytestmark = [ pytest.mark.disable_loganalyzer, @@ -56,13 +56,12 @@ def modify_templates(duthost, tacacs_creds, creds): type = get_device_type(duthost) user = tacacs_creds['local_user'] - try: - # Duthost shell not support run command with J2 template in command text. - admin_session = ssh_connect_remote(dut_ip, creds['sonicadmin_user'], creds['sonicadmin_password']) - except paramiko.AuthenticationException: - # try ssh with ansible_altpassword again - sonic_admin_alt_password = duthost.host.options['variable_manager']._hostvars[duthost.hostname].get("ansible_altpassword") - admin_session = ssh_connect_remote(dut_ip, creds['sonicadmin_user'], sonic_admin_alt_password) + sonic_admin_alt_password = duthost.host.options['variable_manager']._hostvars[duthost.hostname].get( + "ansible_altpassword") + # Duthost shell not support run command with J2 template in command text. + admin_session = paramiko_ssh(ip_address=dut_ip, username=creds['sonicadmin_user'], + passwords=[creds['sonicadmin_password'], sonic_admin_alt_password] + + creds["ansible_altpasswords"]) # Backup and change /usr/share/sonic/templates/pam_limits.j2 additional_content = "session required pam_limits.so" @@ -138,14 +137,14 @@ def test_ssh_limits(duthosts, rand_one_dut_hostname, tacacs_creds, setup_limit): local_user_password = tacacs_creds['local_user_passwd'] # Create multiple login session to test maxlogins limit, first session will success - ssh_session_1 = ssh_connect_remote(dut_ip, local_user, local_user_password) + ssh_session_1 = paramiko_ssh(dut_ip, local_user, local_user_password) login_message_1 = get_login_result(ssh_session_1) logging.debug("Login session 1 result:\n{0}\n".format(login_message_1)) pytest_assert("There were too many logins for" not in login_message_1) # The second session will be disconnect by device - ssh_session_2 = ssh_connect_remote(dut_ip, local_user, local_user_password) + ssh_session_2 = paramiko_ssh(dut_ip, local_user, local_user_password) login_message_2 = get_login_result(ssh_session_2) logging.debug("Login session 2 result:\n{0}\n".format(login_message_2))