Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions tests/common/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import copy
import tempfile
import os
import paramiko
from io import BytesIO

import pytest
Expand Down Expand Up @@ -677,3 +678,42 @@ def recover_acl_rule(duthost, data_acl):

logger.info("Applying {}".format(dut_conf_file_path))
duthost.command("acl-loader update full {}".format(dut_conf_file_path))


def _paramiko_ssh(ip_address, username, passwords):
"""
Connect to the device via ssh using paramiko
Args:
ip_address (str): The ip address of device
username (str): The username of device
passwords (str or list): Potential passwords of device
this argument can be either a string or a list of string
Returns:
AuthResult: the ssh session of device
"""
if isinstance(passwords, str):
candidate_passwords = [passwords]
elif isinstance(passwords, list):
candidate_passwords = passwords
else:
raise Exception("The passwords argument must be either a string or a list of string.")

for password in candidate_passwords:
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(ip_address, username=username, password=password,
allow_agent=False, look_for_keys=False, timeout=10)
return ssh, password
except paramiko.AuthenticationException:
continue
except Exception as e:
logging.info("Cannot access device {} via ssh, error: {}".format(ip_address, e))
raise e
logging.info("Cannot access device {} via ssh, error: Password incorrect".format(ip_address))
raise paramiko.AuthenticationException


def paramiko_ssh(ip_address, username, passwords):
ssh, pwd = _paramiko_ssh(ip_address, username, passwords)
return ssh
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,8 @@ def creds_on_dut(duthost):
creds["console_user"] = {}
creds["console_password"] = {}

creds["ansible_altpasswords"] = []

for k, v in console_login_creds.items():
creds["console_user"][k] = v["user"]
creds["console_password"][k] = v["passwd"]
Expand Down
30 changes: 15 additions & 15 deletions tests/ssh/test_ssh_ciphers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@
pytest.mark.device_type('vs')
]


def connect_with_specified_ciphers(duthosts, rand_one_dut_hostname, specified_cipher, creds, typename):
duthost = duthosts[rand_one_dut_hostname]
dutuser, dutpass = creds['sonicadmin_user'], creds['sonicadmin_password']
sonic_admin_alt_password = duthost.host.options['variable_manager']._hostvars[duthost.hostname].get(
"ansible_altpassword")
sonic_admin_alt_passwords = creds["ansible_altpasswords"]
dut_passwords = [dutpass, sonic_admin_alt_password] + sonic_admin_alt_passwords
dutip = duthost.mgmt_ip

if typename == "enc":
Expand All @@ -29,27 +32,24 @@ def connect_with_specified_ciphers(duthosts, rand_one_dut_hostname, specified_ci

ssh_cmd = "ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no {} {}@{}".format(ssh_cipher_option, dutuser, dutip)

try:
connect = pexpect.spawn(ssh_cmd)
connect.expect('.*[Pp]assword:')
connect.sendline(dutpass)

i = connect.expect('{}@{}:'.format(dutuser, duthost.hostname), timeout=10)
pytest_assert(i == 0, "Failed to connect")
except:
for dutpass in dut_passwords:
try:
connect = pexpect.spawn(ssh_cmd)
connect.expect('.*[Pp]assword:')
connect.sendline(sonic_admin_alt_password)
connect.sendline(dutpass)

i = connect.expect('{}@{}:'.format(dutuser, duthost.hostname), timeout=10)
i = connect.expect(
'{}@{}:'.format(dutuser, duthost.hostname), timeout=10)
pytest_assert(i == 0, "Failed to connect")
except pexpect.exceptions.EOF:
pytest.fail("EOF reached")
except pexpect.exceptions.TIMEOUT:
pytest.fail("Timeout reached")
return
except Exception as e:
pytest.fail("Cannot connect to DUT host via SSH: {}".format(e))
output = connect.before.decode()
if "Permission denied" in output:
continue
else:
pytest.fail(e)
pytest.fail("Cannot connect to DUT host via SSH")


def test_ssh_protocol_version(duthosts, rand_one_dut_hostname):
duthost = duthosts[rand_one_dut_hostname]
Expand Down