diff --git a/tests/common/utilities.py b/tests/common/utilities.py index 282148451ac..10195962c6b 100644 --- a/tests/common/utilities.py +++ b/tests/common/utilities.py @@ -1112,18 +1112,40 @@ def capture_and_check_packet_on_dut( duthost.file(path=pcap_save_path, state="absent") -def duthost_ssh(duthost, sonic_username, sonic_passwords, sonic_ip): - for password in sonic_passwords: +def _paramiko_ssh(ip_address, username, passwords): + """ + Connect to the device via ssh using paramiko + Args: + ip_address (str): The ip address of device + username (str): The username of device + passwords (str or list): Potential passwords of device + this argument can be either a string or a list of string + Returns: + AuthResult: the ssh session of device + """ + if isinstance(passwords, str): + candidate_passwords = [passwords] + elif isinstance(passwords, list): + candidate_passwords = passwords + else: + raise Exception("The passwords argument must be either a string or a list of string.") + + for password in candidate_passwords: try: ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(sonic_ip, username=sonic_username, password=password, + ssh.connect(ip_address, username=username, password=password, allow_agent=False, look_for_keys=False, timeout=10) - return ssh + return ssh, password except paramiko.AuthenticationException: continue except Exception as e: - logging.info("Cannot access DUT {} via ssh, error: {}".format(duthost.hostname, e)) + logging.info("Cannot access device {} via ssh, error: {}".format(ip_address, e)) raise e - logging.info("Cannot access DUT {} via ssh, error: Password incorrect".format(duthost.hostname)) + logging.info("Cannot access device {} via ssh, error: Password incorrect".format(ip_address)) raise paramiko.AuthenticationException + + +def paramiko_ssh(ip_address, username, passwords): + ssh, pwd = _paramiko_ssh(ip_address, username, passwords) + return ssh diff --git a/tests/ssh/test_ssh_limit.py b/tests/ssh/test_ssh_limit.py index f0d11228da5..b6a94362d2a 100755 --- a/tests/ssh/test_ssh_limit.py +++ b/tests/ssh/test_ssh_limit.py @@ -1,12 +1,11 @@ import json import logging -import paramiko import pytest import time from tests.common.helpers.assertions import pytest_assert, pytest_require -from tests.tacacs.test_authorization import ssh_connect_remote from tests.tacacs.conftest import tacacs_creds # noqa F401 from tests.tacacs.utils import setup_local_user +from tests.common.utilities import paramiko_ssh pytestmark = [ pytest.mark.disable_loganalyzer, @@ -63,16 +62,12 @@ def modify_templates(duthost, tacacs_creds, creds): # noqa F811 type = get_device_type(duthost) user = tacacs_creds['local_user'] - try: - # Duthost shell not support run command with J2 template in command text. - admin_session = ssh_connect_remote( - dut_ip, creds['sonicadmin_user'], creds['sonicadmin_password']) - except paramiko.AuthenticationException: - # try ssh with ansible_altpassword again - sonic_admin_alt_password = duthost.host.options['variable_manager']._hostvars[duthost.hostname].get( - "ansible_altpassword") - admin_session = ssh_connect_remote( - dut_ip, creds['sonicadmin_user'], sonic_admin_alt_password) + sonic_admin_alt_password = duthost.host.options['variable_manager']._hostvars[duthost.hostname].get( + "ansible_altpassword") + # Duthost shell not support run command with J2 template in command text. + admin_session = paramiko_ssh(ip_address=dut_ip, username=creds['sonicadmin_user'], + passwords=[creds['sonicadmin_password'], sonic_admin_alt_password] + + creds["ansible_altpasswords"]) # Backup and change /usr/share/sonic/templates/pam_limits.j2 additional_content = "session required pam_limits.so" @@ -157,14 +152,14 @@ def test_ssh_limits(duthosts, rand_one_dut_hostname, tacacs_creds, setup_limit): local_user_password = tacacs_creds['local_user_passwd'] # Create multiple login session to test maxlogins limit, first session will success - ssh_session_1 = ssh_connect_remote(dut_ip, local_user, local_user_password) + ssh_session_1 = paramiko_ssh(dut_ip, local_user, local_user_password) login_message_1 = get_login_result(ssh_session_1) logging.debug("Login session 1 result:\n{0}\n".format(login_message_1)) pytest_assert("There were too many logins for" not in login_message_1) # The second session will be disconnect by device - ssh_session_2 = ssh_connect_remote(dut_ip, local_user, local_user_password) + ssh_session_2 = paramiko_ssh(dut_ip, local_user, local_user_password) login_message_2 = get_login_result(ssh_session_2) logging.debug("Login session 2 result:\n{0}\n".format(login_message_2)) diff --git a/tests/tacacs/test_authorization.py b/tests/tacacs/test_authorization.py index ac9934bb5a1..b86eee88466 100644 --- a/tests/tacacs/test_authorization.py +++ b/tests/tacacs/test_authorization.py @@ -9,7 +9,7 @@ remove_all_tacacs_server, get_ld_path, change_and_wait_aaa_config_update, \ ensure_tacacs_server_running_after_ut # noqa: F401 from tests.common.helpers.assertions import pytest_assert -from tests.common.utilities import skip_release, wait_until +from tests.common.utilities import skip_release, wait_until, paramiko_ssh from .utils import check_server_received from tests.override_config_table.utilities import backup_config, restore_config, \ reload_minigraph_with_golden_config @@ -26,20 +26,11 @@ TIMEOUT_LIMIT = 120 -def ssh_connect_remote(remote_ip, remote_username, remote_password): - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect( - remote_ip, username=remote_username, password=remote_password, allow_agent=False, - look_for_keys=False, auth_timeout=TIMEOUT_LIMIT) - return ssh - - def ssh_connect_remote_retry(remote_ip, remote_username, remote_password, duthost): retry_count = 3 while retry_count > 0: try: - return ssh_connect_remote(remote_ip, remote_username, remote_password) + return paramiko_ssh(remote_ip, remote_username, remote_password) except paramiko.ssh_exception.AuthenticationException as e: logger.info("Paramiko SSH connect failed with authentication: " + repr(e)) @@ -54,7 +45,7 @@ def ssh_connect_remote_retry(remote_ip, remote_username, remote_password, duthos def check_ssh_connect_remote_failed(remote_ip, remote_username, remote_password): login_failed = False try: - ssh_connect_remote(remote_ip, remote_username, remote_password) + paramiko_ssh(remote_ip, remote_username, remote_password) except paramiko.ssh_exception.AuthenticationException as e: login_failed = True logger.info("Paramiko SSH connect failed with authentication: " + repr(e))