Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions tests/common/connections/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from tests.common.connections.base_console_conn import CONSOLE_SSH, CONSOLE_SSH_MENU_PORTS, CONSOLE_TELNET
from telnet_console_conn import TelnetConsoleConn
from ssh_console_conn import SSHConsoleConn

__all__ = ["TelnetConsoleConn", "SSHConsoleConn"]

ConsoleTypeMapper = {
CONSOLE_TELNET: TelnetConsoleConn,
CONSOLE_SSH: SSHConsoleConn,
CONSOLE_SSH_MENU_PORTS: SSHConsoleConn
}

def ConsoleHost(console_type,
console_host,
console_port,
sonic_username,
sonic_password,
console_username=None,
console_password=None,
timeout_s=100):
if not ConsoleTypeMapper.has_key(console_type):
raise ValueError("console type {} is not supported yet".format(console_type))
params = {
"console_host": console_host,
"console_port": console_port,
"console_type": console_type,
"sonic_username": sonic_username,
"sonic_password": sonic_password,
"console_username": console_username,
"console_password": console_password,
"timeout": timeout_s
}
return ConsoleTypeMapper[console_type](**params)

83 changes: 83 additions & 0 deletions tests/common/connections/base_console_conn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""
Base class for console connection of SONiC devices
"""

import logging
from netmiko.cisco_base_connection import CiscoBaseConnection
from netmiko.ssh_exception import NetMikoAuthenticationException

# All supported console types
# Console login via telnet (mad console)
CONSOLE_TELNET = "console_telnet"
# Console login via SSH (digi)
CONSOLE_SSH = "console_ssh"
# Console login via SSH, then login to devices by 'menu ports'
CONSOLE_SSH_MENU_PORTS = "console_ssh_menu_ports"

class BaseConsoleConn(CiscoBaseConnection):

def __init__(self, **kwargs):
self.logger = logging.getLogger(__name__)
# Clear additional args before passing to BaseConsoleConn
all_passwords = kwargs['console_password']
key_to_rm = ['console_username', 'console_password',
'console_host', 'console_port',
'sonic_username', 'sonic_password',
'console_type']
for key in key_to_rm:
if kwargs.has_key(key):
del kwargs[key]

for i in range(0, len(all_passwords)):
kwargs['password'] = all_passwords[i]
try:
super(BaseConsoleConn, self).__init__(**kwargs)
except NetMikoAuthenticationException as e:
if i == len(all_passwords) - 1:
raise e
else:
break

def set_base_prompt(self, pri_prompt_terminator='#',
alt_prompt_terminator='$', delay_factor=1):
return super(BaseConsoleConn, self).set_base_prompt(
pri_prompt_terminator=pri_prompt_terminator,
alt_prompt_terminator=alt_prompt_terminator,
delay_factor=delay_factor)

def write_and_poll(self, command, pattern):
"""
Write a command to terminal and poll until expected pattern is found or timeout
"""
self.write_channel(command + self.RETURN)
self.read_until_pattern(pattern=pattern)

def disable_paging(self, command="", delay_factor=1):
# not supported
pass

def find_prompt(self, delay_factor=1):
return super(BaseConsoleConn, self).find_prompt(delay_factor)

def clear_buffer(self):
# todo
super(BaseConsoleConn, self).clear_buffer()

def enable(self):
# not support config mode for now
pass

def config_mode(self):
# not support config mode for now
pass

def exit_config_mode(self, exit_config, pattern):
# not support config mode for now
pass

def cleanup(self):
super(BaseConsoleConn, self).cleanup()

def disconnect(self):
super(BaseConsoleConn, self).disconnect()

154 changes: 154 additions & 0 deletions tests/common/connections/ssh_console_conn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
from tests.common.connections.base_console_conn import CONSOLE_SSH
import time
import re
from base_console_conn import BaseConsoleConn
from netmiko.ssh_exception import NetMikoAuthenticationException

class SSHConsoleConn(BaseConsoleConn):
def __init__(self, **kwargs):
if not kwargs.has_key("console_username") \
or not kwargs.has_key("console_password"):
raise ValueError("Either console_username or console_password is not set")

# Console via SSH connection need two groups of user/passwd
self.sonic_username = kwargs['sonic_username']
self.sonic_password = kwargs['sonic_password']

if kwargs['console_type'] == CONSOLE_SSH:
kwargs['username'] = kwargs['console_username'] + r':' + str(kwargs['console_port'])
self.menu_port = None
else:
kwargs['username'] = kwargs['console_username']
self.menu_port = kwargs['console_port']
kwargs['password'] = kwargs['console_password']
kwargs['host'] = kwargs['console_host']
kwargs['device_type'] = "_ssh"
super(SSHConsoleConn, self).__init__(**kwargs)

def session_preparation(self):
self._test_channel_read()
if (self.menu_port):
# For devices logining via menu port, 2 additional login are needed
# Since we have attempted all passwords in __init__ of base class until successful login
# So self.username and self.password must be the correct ones
self.login_stage_2(username=self.username,
password=self.password,
menu_port=self.menu_port,
pri_prompt_terminator=r".*login")
# Attempt all sonic password
for i in range(0, len(self.sonic_password)):
password = self.sonic_password[i]
try:
self.login_stage_2(username=self.sonic_username,
password=password)
except NetMikoAuthenticationException as e:
if i == len(self.sonic_password) - 1:
raise e
else:
break

self.set_base_prompt()
# Clear the read buffer
time.sleep(0.3 * self.global_delay_factor)
self.clear_buffer()

def login_stage_2(self,
username,
password,
menu_port=None,
pri_prompt_terminator=r".*# ",
alt_prompt_terminator=r".*\$ ",
username_pattern=r"(?:user:|username|login|user name)",
pwd_pattern=r"assword",
delay_factor=1,
max_loops=20
):
"""
Perform a stage_2 login
"""
delay_factor = self.select_delay_factor(delay_factor)
time.sleep(1 * delay_factor)

output = ""
return_msg = ""
i = 1
menu_port_sent = False
user_sent = False
password_sent = False
# The following prompt is only for SONiC
# Need to add more login failure prompt for other system
login_failure_prompt = r".*incorrect"
while i <= max_loops:
try:
if menu_port and not menu_port_sent:
self.write_and_poll("menu ports", "Selection:")
self.write_channel(str(self.menu_port) + self.RETURN)
menu_port_sent = True

output = self.read_channel()
return_msg += output

# Search for username pattern / send username
if not user_sent and re.search(username_pattern, output, flags=re.I):
self.write_channel(username + self.RETURN)
time.sleep(1 * delay_factor)
output = self.read_channel()
return_msg += output
user_sent = True

# Search for password pattern / send password
if user_sent and not password_sent and re.search(pwd_pattern, output, flags=re.I):
self.write_channel(password + self.RETURN)
time.sleep(0.5 * delay_factor)
output = self.read_channel()
return_msg += output
password_sent = True
if re.search(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition is repeating in the line 74. Is there a reason to check twice?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. The check in ln68-71 is not necessary. But the check in ln74-77 must be kept to handle the case when password is not required to login.

pri_prompt_terminator, output, flags=re.M
) or re.search(alt_prompt_terminator, output, flags=re.M):
return return_msg

# Check if proper data received
if re.search(pri_prompt_terminator, output, flags=re.M) or re.search(
alt_prompt_terminator, output, flags=re.M
):
return return_msg

# Check if login failed
if re.search(login_failure_prompt, output, flags=re.M):
# Wait a short time or the next login will be refused
time.sleep(1 * delay_factor)
msg = "Login failed: {}".format(self.host)
raise NetMikoAuthenticationException(msg)

self.write_channel(self.RETURN)
time.sleep(0.5 * delay_factor)
i += 1
except EOFError:
self.remote_conn.close()
msg = "Login failed: {}".format(self.host)
raise NetMikoAuthenticationException(msg)

# Last try to see if we already logged in
self.write_channel(self.RETURN)
time.sleep(0.5 * delay_factor)
output = self.read_channel()
return_msg += output
if re.search(pri_prompt_terminator, output, flags=re.M) or re.search(
alt_prompt_terminator, output, flags=re.M
):
return return_msg

self.remote_conn.close()
msg = "Login failed: {}".format(self.host)
raise NetMikoAuthenticationException(msg)

def cleanup(self):
# Send an exit to logout from SONiC
self.send_command(command_string="exit",
expect_string="login:")
# remote_conn must be closed, or the SSH session will be kept on Digi,
# and any other login is prevented
self.remote_conn.close()
del self.remote_conn

124 changes: 124 additions & 0 deletions tests/common/connections/telnet_console_conn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import time
import re
from base_console_conn import BaseConsoleConn
from netmiko.ssh_exception import NetMikoAuthenticationException

class TelnetConsoleConn(BaseConsoleConn):
def __init__(self, **kwargs):
# For telnet console, neither console username or password is needed
# so we assign sonic_username/sonic_password to username/password
kwargs['host'] = kwargs['console_host']
kwargs['port'] = kwargs['console_port']
# Don't set the value of password here because we will loop
# among all passwords in __init__
kwargs['username'] = kwargs['sonic_username']
kwargs['console_username'] = kwargs['sonic_username']
kwargs['console_password'] = kwargs['sonic_password']
kwargs['device_type'] = "_telnet"
super(TelnetConsoleConn, self).__init__(**kwargs)

def session_preparation(self):
super(TelnetConsoleConn, self).session_preparation()

def telnet_login(
self,
pri_prompt_terminator=r".*# ",
alt_prompt_terminator=r".*\$ ",
username_pattern=r"(?:user:|username|login|user name)",
pwd_pattern=r"assword",
delay_factor=1,
max_loops=20,
):
"""Telnet login. Can be username/password or just password."""
delay_factor = self.select_delay_factor(delay_factor)
time.sleep(1 * delay_factor)

output = ""
return_msg = ""
login_failure_prompt = r".*incorrect"
username_sent = False
password_sent = False
i = 1
while i <= max_loops:
try:
output = self.read_channel()
return_msg += output

# Search for username pattern / send username
if not username_sent and re.search(username_pattern, output, flags=re.I):
self.write_channel(self.username + self.TELNET_RETURN)
username_sent = True
time.sleep(1 * delay_factor)
output = self.read_channel()
return_msg = output

# Search for password pattern / send password
if username_sent and not password_sent and re.search(pwd_pattern, output, flags=re.I):
self.write_channel(self.password + self.TELNET_RETURN)
time.sleep(0.5 * delay_factor)
password_sent = True
output = self.read_channel()
return_msg += output
if re.search(
pri_prompt_terminator, output, flags=re.M
) or re.search(alt_prompt_terminator, output, flags=re.M):
return return_msg

# Support direct telnet through terminal server
if re.search(r"initial configuration dialog\? \[yes/no\]: ", output):
self.write_channel("no" + self.TELNET_RETURN)
time.sleep(0.5 * delay_factor)
count = 0
while count < 15:
output = self.read_channel()
return_msg += output
if re.search(r"ress RETURN to get started", output):
output = ""
break
time.sleep(2 * delay_factor)
count += 1

# Check for device with no password configured
if re.search(r"assword required, but none set", output):
self.remote_conn.close()
msg = "Login failed - Password required, but none set: {}".format(
self.host
)
raise NetMikoAuthenticationException(msg)

# Check if proper data received
if re.search(pri_prompt_terminator, output, flags=re.M) or re.search(
alt_prompt_terminator, return_msg, flags=re.M
):
return return_msg

# Check if login failed
if re.search(login_failure_prompt, output, flags=re.M):
self.remote_conn.close()
# Wait a short time or the next login will be refused
time.sleep(1 * delay_factor)
msg = "Login failed: {} password: {}".format(self.host, self.password)
raise NetMikoAuthenticationException(msg)

self.write_channel(self.TELNET_RETURN)
time.sleep(0.5 * delay_factor)
i += 1
except EOFError:
self.remote_conn.close()
msg = "Login failed: {}".format(self.host)
raise NetMikoAuthenticationException(msg)

# Last try to see if we already logged in
self.write_channel(self.TELNET_RETURN)
time.sleep(0.5 * delay_factor)
output = self.read_channel()
return_msg += output
if re.search(pri_prompt_terminator, output, flags=re.M) or re.search(
alt_prompt_terminator, output, flags=re.M
):
return return_msg

self.remote_conn.close()
msg = "Login failed: {}".format(self.host)
raise NetMikoAuthenticationException(msg)

Loading