Skip to content
34 changes: 28 additions & 6 deletions tests/common/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -1112,18 +1112,40 @@ def capture_and_check_packet_on_dut(
duthost.file(path=pcap_save_path, state="absent")


def duthost_ssh(duthost, sonic_username, sonic_passwords, sonic_ip):
for password in sonic_passwords:
def _paramiko_ssh(ip_address, username, passwords):
"""
Connect to the device via ssh using paramiko
Args:
ip_address (str): The ip address of device
username (str): The username of device
passwords (str or list): Potential passwords of device
this argument can be either a string or a list of string
Returns:
AuthResult: the ssh session of device
"""
if isinstance(passwords, str):
candidate_passwords = [passwords]
elif isinstance(passwords, list):
candidate_passwords = passwords
else:
raise Exception("The passwords argument must be either a string or a list of string.")

for password in candidate_passwords:
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(sonic_ip, username=sonic_username, password=password,
ssh.connect(ip_address, username=username, password=password,
allow_agent=False, look_for_keys=False, timeout=10)
return ssh
return ssh, password
except paramiko.AuthenticationException:
continue
except Exception as e:
logging.info("Cannot access DUT {} via ssh, error: {}".format(duthost.hostname, e))
logging.info("Cannot access device {} via ssh, error: {}".format(ip_address, e))
raise e
logging.info("Cannot access DUT {} via ssh, error: Password incorrect".format(duthost.hostname))
logging.info("Cannot access device {} via ssh, error: Password incorrect".format(ip_address))
raise paramiko.AuthenticationException


def paramiko_ssh(ip_address, username, passwords):
ssh, pwd = _paramiko_ssh(ip_address, username, passwords)
return ssh
23 changes: 9 additions & 14 deletions tests/ssh/test_ssh_limit.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import json
import logging
import paramiko
import pytest
import time
from tests.common.helpers.assertions import pytest_assert, pytest_require
from tests.tacacs.test_authorization import ssh_connect_remote
from tests.tacacs.conftest import tacacs_creds # noqa F401
from tests.tacacs.utils import setup_local_user
from tests.common.utilities import paramiko_ssh

pytestmark = [
pytest.mark.disable_loganalyzer,
Expand Down Expand Up @@ -63,16 +62,12 @@ def modify_templates(duthost, tacacs_creds, creds): # noqa F811
type = get_device_type(duthost)
user = tacacs_creds['local_user']

try:
# Duthost shell not support run command with J2 template in command text.
admin_session = ssh_connect_remote(
dut_ip, creds['sonicadmin_user'], creds['sonicadmin_password'])
except paramiko.AuthenticationException:
# try ssh with ansible_altpassword again
sonic_admin_alt_password = duthost.host.options['variable_manager']._hostvars[duthost.hostname].get(
"ansible_altpassword")
admin_session = ssh_connect_remote(
dut_ip, creds['sonicadmin_user'], sonic_admin_alt_password)
sonic_admin_alt_password = duthost.host.options['variable_manager']._hostvars[duthost.hostname].get(
"ansible_altpassword")
# Duthost shell not support run command with J2 template in command text.
admin_session = paramiko_ssh(ip_address=dut_ip, username=creds['sonicadmin_user'],
passwords=[creds['sonicadmin_password'], sonic_admin_alt_password]
+ creds["ansible_altpasswords"])

# Backup and change /usr/share/sonic/templates/pam_limits.j2
additional_content = "session required pam_limits.so"
Expand Down Expand Up @@ -157,14 +152,14 @@ def test_ssh_limits(duthosts, rand_one_dut_hostname, tacacs_creds, setup_limit):
local_user_password = tacacs_creds['local_user_passwd']

# Create multiple login session to test maxlogins limit, first session will success
ssh_session_1 = ssh_connect_remote(dut_ip, local_user, local_user_password)
ssh_session_1 = paramiko_ssh(dut_ip, local_user, local_user_password)
login_message_1 = get_login_result(ssh_session_1)

logging.debug("Login session 1 result:\n{0}\n".format(login_message_1))
pytest_assert("There were too many logins for" not in login_message_1)

# The second session will be disconnect by device
ssh_session_2 = ssh_connect_remote(dut_ip, local_user, local_user_password)
ssh_session_2 = paramiko_ssh(dut_ip, local_user, local_user_password)
login_message_2 = get_login_result(ssh_session_2)

logging.debug("Login session 2 result:\n{0}\n".format(login_message_2))
Expand Down
15 changes: 3 additions & 12 deletions tests/tacacs/test_authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
remove_all_tacacs_server, get_ld_path, change_and_wait_aaa_config_update, \
ensure_tacacs_server_running_after_ut # noqa: F401
from tests.common.helpers.assertions import pytest_assert
from tests.common.utilities import skip_release, wait_until
from tests.common.utilities import skip_release, wait_until, paramiko_ssh
from .utils import check_server_received
from tests.override_config_table.utilities import backup_config, restore_config, \
reload_minigraph_with_golden_config
Expand All @@ -26,20 +26,11 @@
TIMEOUT_LIMIT = 120


def ssh_connect_remote(remote_ip, remote_username, remote_password):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(
remote_ip, username=remote_username, password=remote_password, allow_agent=False,
look_for_keys=False, auth_timeout=TIMEOUT_LIMIT)
return ssh


def ssh_connect_remote_retry(remote_ip, remote_username, remote_password, duthost):
retry_count = 3
while retry_count > 0:
try:
return ssh_connect_remote(remote_ip, remote_username, remote_password)
return paramiko_ssh(remote_ip, remote_username, remote_password)
except paramiko.ssh_exception.AuthenticationException as e:
logger.info("Paramiko SSH connect failed with authentication: " + repr(e))

Expand All @@ -54,7 +45,7 @@ def ssh_connect_remote_retry(remote_ip, remote_username, remote_password, duthos
def check_ssh_connect_remote_failed(remote_ip, remote_username, remote_password):
login_failed = False
try:
ssh_connect_remote(remote_ip, remote_username, remote_password)
paramiko_ssh(remote_ip, remote_username, remote_password)
except paramiko.ssh_exception.AuthenticationException as e:
login_failed = True
logger.info("Paramiko SSH connect failed with authentication: " + repr(e))
Expand Down