Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions tests/common/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import traceback
import copy
import tempfile
import paramiko
from io import BytesIO

import pytest
Expand Down Expand Up @@ -901,3 +902,42 @@ def recover_acl_rule(duthost, data_acl):

logger.info("Applying {}".format(dut_conf_file_path))
duthost.command("acl-loader update full {}".format(dut_conf_file_path))


def _paramiko_ssh(ip_address, username, passwords):
"""
Connect to the device via ssh using paramiko
Args:
ip_address (str): The ip address of device
username (str): The username of device
passwords (str or list): Potential passwords of device
this argument can be either a string or a list of string
Returns:
AuthResult: the ssh session of device
"""
if isinstance(passwords, str):
candidate_passwords = [passwords]
elif isinstance(passwords, list):
candidate_passwords = passwords
else:
raise Exception("The passwords argument must be either a string or a list of string.")

for password in candidate_passwords:
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(ip_address, username=username, password=password,
allow_agent=False, look_for_keys=False, timeout=10)
return ssh, password
except paramiko.AuthenticationException:
continue
except Exception as e:
logging.info("Cannot access device {} via ssh, error: {}".format(ip_address, e))
raise e
logging.info("Cannot access device {} via ssh, error: Password incorrect".format(ip_address))
raise paramiko.AuthenticationException


def paramiko_ssh(ip_address, username, passwords):
ssh, pwd = _paramiko_ssh(ip_address, username, passwords)
return ssh
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,8 @@ def creds_on_dut(duthost):
creds["console_user"] = {}
creds["console_password"] = {}

creds["ansible_altpasswords"] = []

for k, v in console_login_creds.items():
creds["console_user"][k] = v["user"]
creds["console_password"][k] = v["passwd"]
Expand Down
30 changes: 15 additions & 15 deletions tests/ssh/test_ssh_ciphers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@
pytest.mark.device_type('vs')
]


def connect_with_specified_ciphers(duthosts, rand_one_dut_hostname, specified_cipher, creds, typename):
duthost = duthosts[rand_one_dut_hostname]
dutuser, dutpass = creds['sonicadmin_user'], creds['sonicadmin_password']
sonic_admin_alt_password = duthost.host.options['variable_manager']._hostvars[duthost.hostname].get(
"ansible_altpassword")
sonic_admin_alt_passwords = creds["ansible_altpasswords"]
dut_passwords = [dutpass, sonic_admin_alt_password] + sonic_admin_alt_passwords
dutip = duthost.mgmt_ip

if typename == "enc":
Expand All @@ -29,27 +32,24 @@ def connect_with_specified_ciphers(duthosts, rand_one_dut_hostname, specified_ci

ssh_cmd = "ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no {} {}@{}".format(ssh_cipher_option, dutuser, dutip)

try:
connect = pexpect.spawn(ssh_cmd)
connect.expect('.*[Pp]assword:')
connect.sendline(dutpass)

i = connect.expect('{}@{}:'.format(dutuser, duthost.hostname), timeout=10)
pytest_assert(i == 0, "Failed to connect")
except:
for dutpass in dut_passwords:
try:
connect = pexpect.spawn(ssh_cmd)
connect.expect('.*[Pp]assword:')
connect.sendline(sonic_admin_alt_password)
connect.sendline(dutpass)

i = connect.expect('{}@{}:'.format(dutuser, duthost.hostname), timeout=10)
i = connect.expect(
'{}@{}:'.format(dutuser, duthost.hostname), timeout=10)
pytest_assert(i == 0, "Failed to connect")
except pexpect.exceptions.EOF:
pytest.fail("EOF reached")
except pexpect.exceptions.TIMEOUT:
pytest.fail("Timeout reached")
return
except Exception as e:
pytest.fail("Cannot connect to DUT host via SSH: {}".format(e))
output = connect.before.decode()
if "Permission denied" in output:
continue
else:
pytest.fail(e)
pytest.fail("Cannot connect to DUT host via SSH")


def test_ssh_protocol_version(duthosts, rand_one_dut_hostname):
duthost = duthosts[rand_one_dut_hostname]
Expand Down
23 changes: 11 additions & 12 deletions tests/ssh/test_ssh_limit.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import json
import json
import logging
import paramiko
import pytest
import time
from tests.common.helpers.assertions import pytest_assert, pytest_require
from tests.tacacs.test_authorization import ssh_connect_remote
from tests.tacacs.conftest import tacacs_creds
from tests.tacacs.conftest import tacacs_creds # noqa F401
from tests.tacacs.utils import setup_local_user
from tests.common.utilities import paramiko_ssh

pytestmark = [
pytest.mark.disable_loganalyzer,
Expand Down Expand Up @@ -56,13 +56,12 @@ def modify_templates(duthost, tacacs_creds, creds):
type = get_device_type(duthost)
user = tacacs_creds['local_user']

try:
# Duthost shell not support run command with J2 template in command text.
admin_session = ssh_connect_remote(dut_ip, creds['sonicadmin_user'], creds['sonicadmin_password'])
except paramiko.AuthenticationException:
# try ssh with ansible_altpassword again
sonic_admin_alt_password = duthost.host.options['variable_manager']._hostvars[duthost.hostname].get("ansible_altpassword")
admin_session = ssh_connect_remote(dut_ip, creds['sonicadmin_user'], sonic_admin_alt_password)
sonic_admin_alt_password = duthost.host.options['variable_manager']._hostvars[duthost.hostname].get(
"ansible_altpassword")
# Duthost shell not support run command with J2 template in command text.
admin_session = paramiko_ssh(ip_address=dut_ip, username=creds['sonicadmin_user'],
passwords=[creds['sonicadmin_password'], sonic_admin_alt_password]
+ creds["ansible_altpasswords"])

# Backup and change /usr/share/sonic/templates/pam_limits.j2
additional_content = "session required pam_limits.so"
Expand Down Expand Up @@ -138,14 +137,14 @@ def test_ssh_limits(duthosts, rand_one_dut_hostname, tacacs_creds, setup_limit):
local_user_password = tacacs_creds['local_user_passwd']

# Create multiple login session to test maxlogins limit, first session will success
ssh_session_1 = ssh_connect_remote(dut_ip, local_user, local_user_password)
ssh_session_1 = paramiko_ssh(dut_ip, local_user, local_user_password)
login_message_1 = get_login_result(ssh_session_1)

logging.debug("Login session 1 result:\n{0}\n".format(login_message_1))
pytest_assert("There were too many logins for" not in login_message_1)

# The second session will be disconnect by device
ssh_session_2 = ssh_connect_remote(dut_ip, local_user, local_user_password)
ssh_session_2 = paramiko_ssh(dut_ip, local_user, local_user_password)
login_message_2 = get_login_result(ssh_session_2)

logging.debug("Login session 2 result:\n{0}\n".format(login_message_2))
Expand Down