diff --git a/show/bgp_quagga_v4.py b/show/bgp_quagga_v4.py index e384cd9d17..cd42547499 100644 --- a/show/bgp_quagga_v4.py +++ b/show/bgp_quagga_v4.py @@ -1,7 +1,8 @@ import click -from show.main import AliasedGroup, ip, run_command +from show.main import ip, run_command from utilities_common.bgp_util import get_bgp_summary_extended import utilities_common.constants as constants +import utilities_common.cli as clicommon ############################################################################### @@ -11,7 +12,7 @@ ############################################################################### -@ip.group(cls=AliasedGroup) +@ip.group(cls=clicommon.AliasedGroup) def bgp(): """Show IPv4 BGP (Border Gateway Protocol) information""" pass @@ -22,10 +23,10 @@ def bgp(): def summary(): """Show summarized information of IPv4 BGP state""" try: - device_output = run_command('sudo {} -c "show ip bgp summary"'.format(constants.RVTYSH_COMMAND), return_cmd=True) + device_output = run_command(['sudo', constants.RVTYSH_COMMAND, '-c', "show ip bgp summary"], return_cmd=True) get_bgp_summary_extended(device_output) except Exception: - run_command('sudo {} -c "show ip bgp summary"'.format(constants.RVTYSH_COMMAND)) + run_command(['sudo', constants.RVTYSH_COMMAND, '-c', "show ip bgp summary"]) # 'neighbors' subcommand ("show ip bgp neighbors") @@ -35,15 +36,13 @@ def summary(): def neighbors(ipaddress, info_type): """Show IP (IPv4) BGP neighbors""" - command = 'sudo {} -c "show ip bgp neighbor'.format(constants.RVTYSH_COMMAND) + command = ['sudo', constants.RVTYSH_COMMAND, '-c', "show ip bgp neighbor"] if ipaddress is not None: - command += ' {}'.format(ipaddress) + command[-1] += ' {}'.format(ipaddress) # info_type is only valid if ipaddress is specified if info_type is not None: - command += ' {}'.format(info_type) - - command += '"' + command[-1] += ' {}'.format(info_type) run_command(command) diff --git a/show/bgp_quagga_v6.py b/show/bgp_quagga_v6.py index 003f4c94cf..3581a84c92 100644 --- a/show/bgp_quagga_v6.py +++ b/show/bgp_quagga_v6.py @@ -1,7 +1,8 @@ import click -from show.main import AliasedGroup, ipv6, run_command +from show.main import ipv6, run_command from utilities_common.bgp_util import get_bgp_summary_extended import utilities_common.constants as constants +import utilities_common.cli as clicommon ############################################################################### @@ -11,7 +12,7 @@ ############################################################################### -@ipv6.group(cls=AliasedGroup) +@ipv6.group(cls=clicommon.AliasedGroup) def bgp(): """Show IPv6 BGP (Border Gateway Protocol) information""" pass @@ -22,10 +23,10 @@ def bgp(): def summary(): """Show summarized information of IPv6 BGP state""" try: - device_output = run_command('sudo {} -c "show ipv6 bgp summary"'.format(constants.RVTYSH_COMMAND), return_cmd=True) + device_output = run_command(['sudo', constants.RVTYSH_COMMAND, '-c', "show ipv6 bgp summary"], return_cmd=True) get_bgp_summary_extended(device_output) except Exception: - run_command('sudo {} -c "show ipv6 bgp summary"'.format(constants.RVTYSH_COMMAND)) + run_command(['sudo', constants.RVTYSH_COMMAND, '-c', "show ipv6 bgp summary"]) # 'neighbors' subcommand ("show ipv6 bgp neighbors") @@ -34,5 +35,5 @@ def summary(): @click.argument('info_type', type=click.Choice(['routes', 'advertised-routes', 'received-routes']), required=True) def neighbors(ipaddress, info_type): """Show IPv6 BGP neighbors""" - command = 'sudo {} -c "show ipv6 bgp neighbor {} {}"'.format(constants.RVTYSH_COMMAND, ipaddress, info_type) + command = ['sudo', constants.RVTYSH_COMMAND, '-c', "show ipv6 bgp neighbor {} {}".format(ipaddress, info_type)] run_command(command) diff --git a/show/main.py b/show/main.py index 2d21e1b3aa..d79777ebeb 100755 --- a/show/main.py +++ b/show/main.py @@ -20,6 +20,7 @@ import utilities_common.constants as constants from utilities_common.general import load_db_config from json.decoder import JSONDecodeError +from sonic_py_common.general import getstatusoutput_noshell_pipe # mock the redis for unit test purposes # try: @@ -104,17 +105,22 @@ def readJsonFile(fileName): raise click.Abort() return result -def run_command(command, display_cmd=False, return_cmd=False): +def run_command(command, display_cmd=False, return_cmd=False, shell=False): + if not shell: + command_str = ' '.join(command) + else: + command_str = command + if display_cmd: - click.echo(click.style("Command: ", fg='cyan') + click.style(command, fg='green')) + click.echo(click.style("Command: ", fg='cyan') + click.style(command_str, fg='green')) # No conversion needed for intfutil commands as it already displays # both SONiC interface name and alias name for all interfaces. - if clicommon.get_interface_naming_mode() == "alias" and not command.startswith("intfutil"): - clicommon.run_command_in_alias_mode(command) + if clicommon.get_interface_naming_mode() == "alias" and not command_str.startswith("intfutil"): + clicommon.run_command_in_alias_mode(command, shell=shell) raise sys.exit(0) - proc = subprocess.Popen(command, shell=True, text=True, stdout=subprocess.PIPE) + proc = subprocess.Popen(command, shell=shell, text=True, stdout=subprocess.PIPE) while True: if return_cmd: @@ -377,10 +383,10 @@ def event_counters(): @click.option('--verbose', is_flag=True, help="Enable verbose output") def arp(ipaddress, iface, verbose): """Show IP ARP table""" - cmd = "nbrshow -4" + cmd = ['nbrshow', '-4'] if ipaddress is not None: - cmd += " -ip {}".format(ipaddress) + cmd += ['-ip', str(ipaddress)] if iface is not None: if clicommon.get_interface_naming_mode() == "alias": @@ -388,7 +394,7 @@ def arp(ipaddress, iface, verbose): (iface.startswith("eth"))): iface = iface_alias_converter.alias_to_name(iface) - cmd += " -if {}".format(iface) + cmd += ['-if', str(iface)] run_command(cmd, display_cmd=verbose) @@ -402,22 +408,22 @@ def arp(ipaddress, iface, verbose): @click.option('--verbose', is_flag=True, help="Enable verbose output") def ndp(ip6address, iface, verbose): """Show IPv6 Neighbour table""" - cmd = "nbrshow -6" + cmd = ['nbrshow', '-6'] if ip6address is not None: - cmd += " -ip {}".format(ip6address) + cmd += ['-ip', str(ip6address)] if iface is not None: - cmd += " -if {}".format(iface) + cmd += ['-if', str(iface)] run_command(cmd, display_cmd=verbose) def is_mgmt_vrf_enabled(ctx): """Check if management VRF is enabled""" if ctx.invoked_subcommand is None: - cmd = 'sonic-cfggen -d --var-json "MGMT_VRF_CONFIG"' + cmd = ['sonic-cfggen', '-d', '--var-json', "MGMT_VRF_CONFIG"] - p = subprocess.Popen(cmd, shell=True, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + p = subprocess.Popen(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) try : mvrf_dict = json.loads(p.stdout.read()) except ValueError: @@ -487,13 +493,13 @@ def mgmt_vrf(ctx,routes): if routes is None: click.echo("\nManagementVRF : Enabled") click.echo("\nManagement VRF interfaces in Linux:") - cmd = "ip -d link show mgmt" + cmd = ['ip', '-d', 'link', 'show', 'mgmt'] run_command(cmd) - cmd = "ip link show vrf mgmt" + cmd = ['ip', 'link', 'show', 'vrf', 'mgmt'] run_command(cmd) else: click.echo("\nRoutes in Management VRF Routing Table:") - cmd = "ip route show table 5000" + cmd = ['ip', 'route', 'show', 'table', '5000'] run_command(cmd) # @@ -577,7 +583,7 @@ def subinterfaces(): @click.option('--verbose', is_flag=True, help="Enable verbose output") def status(subinterfacename, verbose): """Show sub port interface status information""" - cmd = "intfutil -c status" + cmd = ['intfutil', '-c', 'status'] if subinterfacename is not None: sub_intf_sep_idx = subinterfacename.find(VLAN_SUB_INTERFACE_SEPARATOR) @@ -588,9 +594,9 @@ def status(subinterfacename, verbose): if clicommon.get_interface_naming_mode() == "alias": subinterfacename = iface_alias_converter.alias_to_name(subinterfacename) - cmd += " -i {}".format(subinterfacename) + cmd += ['-i', str(subinterfacename)] else: - cmd += " -i subport" + cmd += ['-i', 'subport'] run_command(cmd, display_cmd=verbose) # @@ -609,9 +615,9 @@ def pfc(): def counters(namespace, display, verbose): """Show pfc counters""" - cmd = "pfcstat -s {}".format(display) + cmd = ['pfcstat', '-s', str(display)] if namespace is not None: - cmd += " -n {}".format(namespace) + cmd += ['-n', str(namespace)] run_command(cmd, display_cmd=verbose) @@ -619,12 +625,12 @@ def counters(namespace, display, verbose): @click.argument('interface', type=click.STRING, required=False) def priority(interface): """Show pfc priority""" - cmd = 'pfc show priority' + cmd = ['pfc', 'show', 'priority'] if interface is not None and clicommon.get_interface_naming_mode() == "alias": interface = iface_alias_converter.alias_to_name(interface) if interface is not None: - cmd += ' {0}'.format(interface) + cmd += [str(interface)] run_command(cmd) @@ -632,12 +638,12 @@ def priority(interface): @click.argument('interface', type=click.STRING, required=False) def asymmetric(interface): """Show asymmetric pfc""" - cmd = 'pfc show asymmetric' + cmd = ['pfc', 'show', 'asymmetric'] if interface is not None and clicommon.get_interface_naming_mode() == "alias": interface = iface_alias_converter.alias_to_name(interface) if interface is not None: - cmd += ' {0}'.format(interface) + cmd += [str(interface)] run_command(cmd) @@ -653,9 +659,9 @@ def pfcwd(): def config(namespace, display, verbose): """Show pfc watchdog config""" - cmd = "pfcwd show config -d {}".format(display) + cmd = ['pfcwd', 'show', 'config', '-d', str(display)] if namespace is not None: - cmd += " -n {}".format(namespace) + cmd += ['-n', str(namespace)] run_command(cmd, display_cmd=verbose) @@ -665,9 +671,9 @@ def config(namespace, display, verbose): def stats(namespace, display, verbose): """Show pfc watchdog stats""" - cmd = "pfcwd show stats -d {}".format(display) + cmd = ['pfcwd', 'show', 'stats', '-d', str(display)] if namespace is not None: - cmd += " -n {}".format(namespace) + cmd += ['-n', str(namespace)] run_command(cmd, display_cmd=verbose) @@ -688,7 +694,7 @@ def telemetry(): @telemetry.command('interval') def show_tm_interval(): """Show telemetry interval""" - command = 'watermarkcfg --show-interval' + command = ['watermarkcfg', '--show-interval'] run_command(command) @@ -711,23 +717,23 @@ def queue(): def counters(interfacename, namespace, display, verbose, json, voq): """Show queue counters""" - cmd = "queuestat" + cmd = ["queuestat"] if interfacename is not None: if clicommon.get_interface_naming_mode() == "alias": interfacename = iface_alias_converter.alias_to_name(interfacename) if interfacename is not None: - cmd += " -p {}".format(interfacename) + cmd += ['-p', str(interfacename)] if namespace is not None: - cmd += " -n {}".format(namespace) + cmd += ['-n', str(namespace)] if json: - cmd += " -j" + cmd += ["-j"] if voq: - cmd += " -V" + cmd += ["-V"] run_command(cmd, display_cmd=verbose) @@ -744,21 +750,21 @@ def watermark(): @watermark.command('unicast') def wm_q_uni(): """Show user WM for unicast queues""" - command = 'watermarkstat -t q_shared_uni' + command = ['watermarkstat', '-t', 'q_shared_uni'] run_command(command) # 'multicast' subcommand ("show queue watermarks multicast") @watermark.command('multicast') def wm_q_multi(): """Show user WM for multicast queues""" - command = 'watermarkstat -t q_shared_multi' + command = ['watermarkstat', '-t', 'q_shared_multi'] run_command(command) # 'all' subcommand ("show queue watermarks all") @watermark.command('all') def wm_q_all(): """Show user WM for all queues""" - command = 'watermarkstat -t q_shared_all' + command = ['watermarkstat', '-t', 'q_shared_all'] run_command(command) # @@ -774,21 +780,21 @@ def persistent_watermark(): @persistent_watermark.command('unicast') def pwm_q_uni(): """Show persistent WM for unicast queues""" - command = 'watermarkstat -p -t q_shared_uni' + command = ['watermarkstat', '-p', '-t', 'q_shared_uni'] run_command(command) # 'multicast' subcommand ("show queue persistent-watermarks multicast") @persistent_watermark.command('multicast') def pwm_q_multi(): """Show persistent WM for multicast queues""" - command = 'watermarkstat -p -t q_shared_multi' + command = ['watermarkstat', '-p', '-t', 'q_shared_multi'] run_command(command) # 'all' subcommand ("show queue persistent-watermarks all") @persistent_watermark.command('all') def pwm_q_all(): """Show persistent WM for all queues""" - command = 'watermarkstat -p -t q_shared_all' + command = ['watermarkstat', '-p', '-t', 'q_shared_all'] run_command(command) # @@ -807,13 +813,13 @@ def watermark(): @watermark.command('headroom') def wm_pg_headroom(): """Show user headroom WM for pg""" - command = 'watermarkstat -t pg_headroom' + command = ['watermarkstat', '-t', 'pg_headroom'] run_command(command) @watermark.command('shared') def wm_pg_shared(): """Show user shared WM for pg""" - command = 'watermarkstat -t pg_shared' + command = ['watermarkstat', '-t', 'pg_shared'] run_command(command) @priority_group.group() @@ -824,7 +830,7 @@ def drop(): @drop.command('counters') def pg_drop_counters(): """Show dropped packets for priority-group""" - command = 'pg-drop -c show' + command = ['pg-drop', '-c', 'show'] run_command(command) @priority_group.group(name='persistent-watermark') @@ -835,13 +841,13 @@ def persistent_watermark(): @persistent_watermark.command('headroom') def pwm_pg_headroom(): """Show persistent headroom WM for pg""" - command = 'watermarkstat -p -t pg_headroom' + command = ['watermarkstat', '-p', '-t', 'pg_headroom'] run_command(command) @persistent_watermark.command('shared') def pwm_pg_shared(): """Show persistent shared WM for pg""" - command = 'watermarkstat -p -t pg_shared' + command = ['watermarkstat', '-p', '-t', 'pg_shared'] run_command(command) @@ -856,13 +862,13 @@ def buffer_pool(): @buffer_pool.command('watermark') def wm_buffer_pool(): """Show user WM for buffer pools""" - command = 'watermarkstat -t buffer_pool' + command = ['watermarkstat', '-t' ,'buffer_pool'] run_command(command) @buffer_pool.command('persistent-watermark') def pwm_buffer_pool(): """Show persistent WM for buffer pools""" - command = 'watermarkstat -p -t buffer_pool' + command = ['watermarkstat', '-p', '-t', 'buffer_pool'] run_command(command) @@ -877,13 +883,13 @@ def headroom_pool(): @headroom_pool.command('watermark') def wm_headroom_pool(): """Show user WM for headroom pool""" - command = 'watermarkstat -t headroom_pool' + command = ['watermarkstat', '-t', 'headroom_pool'] run_command(command) @headroom_pool.command('persistent-watermark') def pwm_headroom_pool(): """Show persistent WM for headroom pool""" - command = 'watermarkstat -p -t headroom_pool' + command = ['watermarkstat', '-p', '-t', 'headroom_pool'] run_command(command) @@ -905,22 +911,22 @@ def mac(ctx, vlan, port, address, type, count, verbose): if ctx.invoked_subcommand is not None: return - cmd = "fdbshow" + cmd = ["fdbshow"] if vlan is not None: - cmd += " -v {}".format(vlan) + cmd += ['-v', str(vlan)] if port is not None: - cmd += " -p {}".format(port) + cmd += ['-p', str(port)] if address is not None: - cmd += " -a {}".format(address) + cmd += ['-a', str(address)] if type is not None: - cmd += " -t {}".format(type) + cmd += ['-t', str(type)] if count: - cmd += " -c" + cmd += ["-c"] run_command(cmd, display_cmd=verbose) @@ -951,10 +957,9 @@ def aging_time(ctx): @click.option('--verbose', is_flag=True, help="Enable verbose output") def route_map(route_map_name, verbose): """show route-map""" - cmd = 'sudo {} -c "show route-map'.format(constants.RVTYSH_COMMAND) + cmd = ['sudo', constants.RVTYSH_COMMAND, '-c', 'show route-map'] if route_map_name is not None: - cmd += ' {}'.format(route_map_name) - cmd += '"' + cmd[-1] += ' {}'.format(route_map_name) run_command(cmd, display_cmd=verbose) # @@ -1042,10 +1047,9 @@ def route(args, namespace, display, verbose): @click.option('--verbose', is_flag=True, help="Enable verbose output") def prefix_list(prefix_list_name, verbose): """show ip prefix-list""" - cmd = 'sudo {} -c "show ip prefix-list'.format(constants.RVTYSH_COMMAND) + cmd = ['sudo', constants.RVTYSH_COMMAND, '-c', 'show ip prefix-list'] if prefix_list_name is not None: - cmd += ' {}'.format(prefix_list_name) - cmd += '"' + cmd[-1] += ' {}'.format(prefix_list_name) run_command(cmd, display_cmd=verbose) @@ -1054,7 +1058,7 @@ def prefix_list(prefix_list_name, verbose): @click.option('--verbose', is_flag=True, help="Enable verbose output") def protocol(verbose): """Show IPv4 protocol information""" - cmd = 'sudo {} -c "show ip protocol"'.format(constants.RVTYSH_COMMAND) + cmd = ['sudo', constants.RVTYSH_COMMAND, '-c', "show ip protocol"] run_command(cmd, display_cmd=verbose) # @@ -1065,9 +1069,9 @@ def protocol(verbose): @click.option('--verbose', is_flag=True, help="Enable verbose output") def fib(ipaddress, verbose): """Show IP FIB table""" - cmd = "fibshow -4" + cmd = ['fibshow', '-4'] if ipaddress is not None: - cmd += " -ip {}".format(ipaddress) + cmd += ['-ip', str(ipaddress)] run_command(cmd, display_cmd=verbose) @@ -1090,10 +1094,9 @@ def ipv6(): @click.option('--verbose', is_flag=True, help="Enable verbose output") def prefix_list(prefix_list_name, verbose): """show ip prefix-list""" - cmd = 'sudo {} -c "show ipv6 prefix-list'.format(constants.RVTYSH_COMMAND) + cmd = ['sudo', constants.RVTYSH_COMMAND, '-c', 'show ipv6 prefix-list'] if prefix_list_name is not None: - cmd += ' {}'.format(prefix_list_name) - cmd += '"' + cmd[-1] += ' {}'.format(prefix_list_name) run_command(cmd, display_cmd=verbose) @@ -1138,7 +1141,7 @@ def route(args, namespace, display, verbose): @click.option('--verbose', is_flag=True, help="Enable verbose output") def protocol(verbose): """Show IPv6 protocol information""" - cmd = 'sudo {} -c "show ipv6 protocol"'.format(constants.RVTYSH_COMMAND) + cmd = ['sudo', constants.RVTYSH_COMMAND, '-c', "show ipv6 protocol"] run_command(cmd, display_cmd=verbose) # @@ -1207,9 +1210,9 @@ def link_local_mode(verbose): @click.option('--verbose', is_flag=True, help="Enable verbose output") def fib(ipaddress, verbose): """Show IP FIB table""" - cmd = "fibshow -6" + cmd = ['fibshow', '-6'] if ipaddress is not None: - cmd += " -ip {}".format(ipaddress) + cmd += ['-ip', str(ipaddress)] run_command(cmd, display_cmd=verbose) # @@ -1227,13 +1230,13 @@ def lldp(): @click.option('--verbose', is_flag=True, help="Enable verbose output") def neighbors(interfacename, verbose): """Show LLDP neighbors""" - cmd = "sudo lldpshow -d" + cmd = ['sudo', 'lldpshow', '-d'] if interfacename is not None: if clicommon.get_interface_naming_mode() == "alias": interfacename = iface_alias_converter.alias_to_name(interfacename) - cmd += " -p {}".format(interfacename) + cmd += ['-p', str(interfacename)] run_command(cmd, display_cmd=verbose) @@ -1242,7 +1245,7 @@ def neighbors(interfacename, verbose): @click.option('--verbose', is_flag=True, help="Enable verbose output") def table(verbose): """Show LLDP neighbors in tabular format""" - cmd = "sudo lldpshow" + cmd = ['sudo', 'lldpshow'] run_command(cmd, display_cmd=verbose) @@ -1252,7 +1255,7 @@ def table(verbose): @cli.command() @click.argument('process', required=False) -@click.option('-l', '--lines') +@click.option('-l', '--lines', type=int) @click.option('-f', '--follow', is_flag=True) @click.option('--verbose', is_flag=True, help="Enable verbose output") def logging(process, lines, follow, verbose): @@ -1262,7 +1265,7 @@ def logging(process, lines, follow, verbose): else: log_path = "/var/log" if follow: - cmd = "sudo tail -F {}/syslog".format(log_path) + cmd = ['sudo', 'tail', '-F', '{}/syslog'.format(log_path)] run_command(cmd, display_cmd=verbose) else: if os.path.isfile("{}/syslog.1".format(log_path)): @@ -1276,8 +1279,7 @@ def logging(process, lines, follow, verbose): if lines is not None: cmd += " | tail -{}".format(lines) - run_command(cmd, display_cmd=verbose) - + run_command(cmd, display_cmd=verbose, shell=True) # # 'version' command ("show version") @@ -1291,8 +1293,8 @@ def version(verbose): platform_info = device_info.get_platform_info() chassis_info = platform.get_chassis_info() - sys_uptime_cmd = "uptime" - sys_uptime = subprocess.Popen(sys_uptime_cmd, shell=True, text=True, stdout=subprocess.PIPE) + sys_uptime_cmd = ["uptime"] + sys_uptime = subprocess.Popen(sys_uptime_cmd, text=True, stdout=subprocess.PIPE) sys_date = datetime.now() @@ -1313,8 +1315,8 @@ def version(verbose): click.echo("Uptime: {}".format(sys_uptime.stdout.read().strip())) click.echo("Date: {}".format(sys_date.strftime("%a %d %b %Y %X"))) click.echo("\nDocker images:") - cmd = 'sudo docker images --format "table {{.Repository}}\\t{{.Tag}}\\t{{.ID}}\\t{{.Size}}"' - p = subprocess.Popen(cmd, shell=True, text=True, stdout=subprocess.PIPE) + cmd = ['sudo', 'docker', 'images', '--format', "table {{.Repository}}\\t{{.Tag}}\\t{{.ID}}\\t{{.Size}}"] + p = subprocess.Popen(cmd, text=True, stdout=subprocess.PIPE) click.echo(p.stdout.read()) # @@ -1325,7 +1327,7 @@ def version(verbose): @click.option('--verbose', is_flag=True, help="Enable verbose output") def environment(verbose): """Show environmentals (voltages, fans, temps)""" - cmd = "sudo sensors" + cmd = ['sudo', 'sensors'] run_command(cmd, display_cmd=verbose) @@ -1337,7 +1339,7 @@ def environment(verbose): @click.option('--verbose', is_flag=True, help="Enable verbose output") def users(verbose): """Show users""" - cmd = "who" + cmd = ["who"] run_command(cmd, display_cmd=verbose) @@ -1356,29 +1358,29 @@ def users(verbose): @click.option('--redirect-stderr', '-r', is_flag=True, help="Redirect an intermediate errors to STDERR") def techsupport(since, global_timeout, cmd_timeout, verbose, allow_process_stop, silent, debug_dump, redirect_stderr): """Gather information for troubleshooting""" - cmd = "sudo" + cmd = ["sudo"] if global_timeout: - cmd += " timeout --kill-after={}s -s SIGTERM --foreground {}m".format(COMMAND_TIMEOUT, global_timeout) + cmd += ['timeout', '--kill-after={}s'.format(COMMAND_TIMEOUT), '-s', 'SIGTERM', '--foreground', '{}m'.format(global_timeout)] if silent: - cmd += " generate_dump" + cmd += ["generate_dump"] click.echo("Techsupport is running with silent option. This command might take a long time.") else: - cmd += " generate_dump -v" + cmd += ['generate_dump', '-v'] if allow_process_stop: - cmd += " -a" + cmd += ["-a"] if since: - cmd += " -s '{}'".format(since) + cmd += ['-s', str(since)] if debug_dump: - cmd += " -d" + cmd += ["-d"] - cmd += " -t {}".format(cmd_timeout) + cmd += ['-t', str(cmd_timeout)] if redirect_stderr: - cmd += " -r" + cmd += ["-r"] run_command(cmd, display_cmd=verbose) @@ -1423,7 +1425,7 @@ def all(verbose): @click.option('--verbose', is_flag=True, help="Enable verbose output") def acl(verbose): """Show acl running configuration""" - cmd = "sonic-cfggen -d --var-json ACL_RULE" + cmd = ['sonic-cfggen', '-d', '--var-json', 'ACL_RULE'] run_command(cmd, display_cmd=verbose) @@ -1433,10 +1435,10 @@ def acl(verbose): @click.option('--verbose', is_flag=True, help="Enable verbose output") def ports(portname, verbose): """Show ports running configuration""" - cmd = "sonic-cfggen -d --var-json PORT" + cmd = ['sonic-cfggen', '-d', '--var-json', 'PORT'] if portname is not None: - cmd += " {0} {1}".format("--key", portname) + cmd += ["--key", str(portname)] run_command(cmd, display_cmd=verbose) @@ -1486,10 +1488,10 @@ def bgp(namespace, verbose): @click.option('--verbose', is_flag=True, help="Enable verbose output") def interfaces(interfacename, verbose): """Show interfaces running configuration""" - cmd = "sonic-cfggen -d --var-json INTERFACE" + cmd = ['sonic-cfggen', '-d', '--var-json', 'INTERFACE'] if interfacename is not None: - cmd += " {0} {1}".format("--key", interfacename) + cmd += ["--key", str(interfacename)] run_command(cmd, display_cmd=verbose) @@ -1715,16 +1717,20 @@ def startupconfiguration(): @click.option('--verbose', is_flag=True, help="Enable verbose output") def bgp(verbose): """Show BGP startup configuration""" - cmd = "sudo docker ps | grep bgp | awk '{print$2}' | cut -d'-' -f3 | cut -d':' -f1" - proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True, text=True) - result = proc.stdout.read().rstrip() + cmd0 = ['sudo', 'docker', 'ps'] + cmd1 = ['grep', 'bgp'] + cmd2 = ['awk', '{print$2}'] + cmd3 = ['cut', '-d-', '-f3'] + cmd4 = ['cut', '-d:', "-f1"] + _, stdout = getstatusoutput_noshell_pipe(cmd0, cmd1, cmd2, cmd3, cmd4) + result = stdout.rstrip() click.echo("Routing-Stack is: {}".format(result)) if result == "quagga": - run_command('sudo docker exec bgp cat /etc/quagga/bgpd.conf', display_cmd=verbose) + run_command(['sudo', 'docker', 'exec', 'bgp', 'cat', '/etc/quagga/bgpd.conf'], display_cmd=verbose) elif result == "frr": - run_command('sudo docker exec bgp cat /etc/frr/bgpd.conf', display_cmd=verbose) + run_command(['sudo', 'docker', 'exec', 'bgp', 'cat', '/etc/frr/bgpd.conf'], display_cmd=verbose) elif result == "gobgp": - run_command('sudo docker exec bgp cat /etc/gpbgp/bgpd.conf', display_cmd=verbose) + run_command(['sudo', 'docker', 'exec', 'bgp', 'cat', '/etc/gpbgp/bgpd.conf'], display_cmd=verbose) else: click.echo("Unidentified routing-stack") @@ -1738,18 +1744,18 @@ def bgp(verbose): def ntp(ctx, verbose): """Show NTP information""" from pkg_resources import parse_version - ntpstat_cmd = "ntpstat" - ntpcmd = "ntpq -p -n" + ntpstat_cmd = ["ntpstat"] + ntpcmd = ["ntpq", "-p", "-n"] if is_mgmt_vrf_enabled(ctx) is True: #ManagementVRF is enabled. Call ntpq using "ip vrf exec" or cgexec based on linux version os_info = os.uname() release = os_info[2].split('-') if parse_version(release[0]) > parse_version("4.9.0"): - ntpstat_cmd = "sudo ip vrf exec mgmt ntpstat" - ntpcmd = "sudo ip vrf exec mgmt ntpq -p -n" + ntpstat_cmd = ['sudo', 'ip', 'vrf', 'exec', 'mgmt', 'ntpstat'] + ntpcmd = ['sudo', 'ip', 'vrf', 'exec', 'mgmt', 'ntpq', '-p', '-n'] else: - ntpstat_cmd = "sudo cgexec -g l3mdev:mgmt ntpstat" - ntpcmd = "sudo cgexec -g l3mdev:mgmt ntpq -p -n" + ntpstat_cmd = ['sudo', 'cgexec', '-g', 'l3mdev:mgmt', 'ntpstat'] + ntpcmd = ['sudo', 'cgexec', '-g', 'l3mdev:mgmt', 'ntpq', '-p', '-n'] run_command(ntpstat_cmd, display_cmd=verbose) run_command(ntpcmd, display_cmd=verbose) @@ -1762,37 +1768,38 @@ def ntp(ctx, verbose): @click.option('--verbose', is_flag=True, help="Enable verbose output") def uptime(verbose): """Show system uptime""" - cmd = "uptime -p" + cmd = ['uptime', '-p'] run_command(cmd, display_cmd=verbose) @cli.command() @click.option('--verbose', is_flag=True, help="Enable verbose output") def clock(verbose): """Show date and time""" - cmd ="date" + cmd = ["date"] run_command(cmd, display_cmd=verbose) @cli.command('system-memory') @click.option('--verbose', is_flag=True, help="Enable verbose output") def system_memory(verbose): """Show memory information""" - cmd = "free -m" + cmd = ['free', '-m'] run_command(cmd, display_cmd=verbose) @cli.command('services') def services(): """Show all daemon services""" - cmd = "sudo docker ps --format '{{.Names}}'" - proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True, text=True) + cmd = ["sudo", "docker", "ps", "--format", '{{.Names}}'] + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True) while True: line = proc.stdout.readline() if line != '': print(line.rstrip()+'\t'+"docker") print("---------------------------") - cmd = "sudo docker exec {} ps aux | sed '$d'".format(line.rstrip()) - proc1 = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True, text=True) - print(proc1.stdout.read()) + cmd0 = ["sudo", "docker", "exec", line.rstrip(), "ps", "aux"] + cmd1 = ["sed", '$d'] + _, stdout = getstatusoutput_noshell_pipe(cmd0, cmd1) + print(stdout) else: break @@ -1918,10 +1925,10 @@ def radius(db): @click.option('--verbose', is_flag=True, help="Enable verbose output") def mirror_session(session_name, verbose): """Show existing everflow sessions""" - cmd = "acl-loader show session" + cmd = ['acl-loader', 'show', 'session'] if session_name is not None: - cmd += " {}".format(session_name) + cmd += [str(session_name)] run_command(cmd, display_cmd=verbose) @@ -1934,10 +1941,10 @@ def mirror_session(session_name, verbose): @click.option('--verbose', is_flag=True, help="Enable verbose output") def policer(policer_name, verbose): """Show existing policers""" - cmd = "acl-loader show policer" + cmd = ['acl-loader', 'show', 'policer'] if policer_name is not None: - cmd += " {}".format(policer_name) + cmd += [str(policer_name)] run_command(cmd, display_cmd=verbose) @@ -1949,7 +1956,7 @@ def policer(policer_name, verbose): @click.option('--verbose', is_flag=True, help="Enable verbose output") def ecn(verbose): """Show ECN configuration""" - cmd = "ecnconfig -l" + cmd = ['ecnconfig', '-l'] run_command(cmd, display_cmd=verbose) @@ -1959,8 +1966,8 @@ def ecn(verbose): @cli.command('boot') def boot(): """Show boot configuration""" - cmd = "sudo sonic-installer list" - proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True, text=True) + cmd = ["sudo", "sonic-installer", "list"] + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True) click.echo(proc.stdout.read()) @@ -1970,7 +1977,7 @@ def boot(): @cli.command('mmu') def mmu(): """Show mmu configuration""" - cmd = "mmuconfig -l" + cmd = ['mmuconfig', '-l'] run_command(cmd) # @@ -1987,7 +1994,7 @@ def buffer(): @buffer.command() def configuration(): """show buffer configuration""" - cmd = "mmuconfig -l" + cmd = ['mmuconfig', '-l'] run_command(cmd) # @@ -1996,7 +2003,7 @@ def configuration(): @buffer.command() def information(): """show buffer information""" - cmd = "buffershow -l" + cmd = ['buffershow', '-l'] run_command(cmd) @@ -2008,7 +2015,7 @@ def information(): @click.option('--verbose', is_flag=True, help="Enable verbose output") def line(brief, verbose): """Show all console lines and their info include available ttyUSB devices unless specified brief mode""" - cmd = "consutil show" + (" -b" if brief else "") + cmd = ['consutil', 'show'] + (["-b"] if brief else []) run_command(cmd, display_cmd=verbose) return @@ -2022,11 +2029,11 @@ def line(brief, verbose): def ztp(status, verbose): """Show Zero Touch Provisioning status""" if os.path.isfile('/usr/bin/ztp') is False: - exit("ZTP feature unavailable in this image version") + sys.exit("ZTP feature unavailable in this image version") - cmd = "ztp status" + cmd = ['ztp', 'status'] if verbose: - cmd = cmd + " --verbose" + cmd += ["--verbose"] run_command(cmd, display_cmd=verbose) diff --git a/show/platform.py b/show/platform.py index 85d729df84..c4f2c3a29c 100644 --- a/show/platform.py +++ b/show/platform.py @@ -148,9 +148,9 @@ def temperature(): @click.argument('args', nargs=-1, type=click.UNPROCESSED) def firmware(args): """Show firmware information""" - cmd = "sudo fwutil show {}".format(" ".join(args)) + cmd = ["sudo", "fwutil", "show"] + list(args) try: - subprocess.check_call(cmd, shell=True) + subprocess.check_call(cmd) except subprocess.CalledProcessError as e: sys.exit(e.returncode) diff --git a/tests/fdbshow_test.py b/tests/fdbshow_test.py index 578b278a95..0f129df299 100755 --- a/tests/fdbshow_test.py +++ b/tests/fdbshow_test.py @@ -450,7 +450,7 @@ def test_show_mac_no_address(self): def test_show_mac_no_type(self): self.set_mock_variant("6") - result = self.runner.invoke(show.cli.commands["mac"], ["-t Static"]) + result = self.runner.invoke(show.cli.commands["mac"], ["-t", "Static"]) print(result.exit_code) print(result.output) assert result.exit_code == 0 diff --git a/tests/intfstat_test.py b/tests/intfstat_test.py index 4522e08311..f76e54c7b5 100644 --- a/tests/intfstat_test.py +++ b/tests/intfstat_test.py @@ -168,7 +168,7 @@ def test_clear_single_intfs(self): result = runner.invoke(show.cli.commands["interfaces"].commands["counters"].commands["rif"], ["Ethernet20"]) print(result.output) # remove the counters snapshot - show.run_command("intfstat -D") + show.run_command(["intfstat", "-D"]) assert 'Last cached time was' in result.output.split('\n')[0] assert show_interfaces_counters_rif_clear_single_intf in result.output @@ -180,7 +180,7 @@ def test_clear_single_interface_check_all(self): result = runner.invoke(show.cli.commands["interfaces"].commands["counters"].commands["rif"], []) print(result.stdout) # remove the counters snapshot - show.run_command("intfstat -D") + show.run_command(["intfstat", "-D"]) assert 'Last cached time was' in result.output.split('\n')[0] assert show_single_interface_check_all_clear in result.output @@ -192,7 +192,7 @@ def test_clear(self): result = runner.invoke(show.cli.commands["interfaces"].commands["counters"].commands["rif"], []) print(result.stdout) # remove the counters snapshot - show.run_command("intfstat -D") + show.run_command(["intfstat", "-D"]) assert 'Last cached time was' in result.output.split('\n')[0] assert show_interfaces_counters_rif_clear in result.output diff --git a/tests/pfcstat_test.py b/tests/pfcstat_test.py index 6ac0401b24..23d184cc36 100644 --- a/tests/pfcstat_test.py +++ b/tests/pfcstat_test.py @@ -215,7 +215,7 @@ def test_pfc_counters_with_clear(self): [] ) print(result.output) - show.run_command('pfcstat -d') + show.run_command(['pfcstat', '-d']) assert result.exit_code == 0 assert "Last cached time was" in result.output assert show_pfc_counters_output_with_clear[0] in result.output and \ @@ -262,7 +262,7 @@ def test_pfc_counters_all_with_clear(self): [] ) print(result.output) - show.run_command('pfcstat -d') + show.run_command(['pfcstat', '-d']) assert result.exit_code == 0 assert "Last cached time was" in result.output assert show_pfc_counters_all_with_clear[0] in result.output and \ diff --git a/tests/queue_counter_test.py b/tests/queue_counter_test.py index 165f6e6fb3..1a78b3eeb8 100644 --- a/tests/queue_counter_test.py +++ b/tests/queue_counter_test.py @@ -1280,7 +1280,7 @@ def test_queue_counters_with_clear(self): [] ) print(result.output) - show.run_command('queuestat -d') + show.run_command(['queuestat', '-d']) assert result.exit_code == 0 assert "Ethernet0 Last cached time was" in result.output and \ "Ethernet4 Last cached time was" in result.output and \ @@ -1318,7 +1318,7 @@ def test_queue_counters_port_json(self): runner = CliRunner() result = runner.invoke( show.cli.commands["queue"].commands["counters"], - ["Ethernet8 --json"] + ["Ethernet8", "--json"] ) assert result.exit_code == 0 print(result.output) @@ -1343,7 +1343,7 @@ def test_queue_port_voq_counters(self): runner = CliRunner() result = runner.invoke( show.cli.commands["queue"].commands["counters"], - ["Ethernet0 --voq"] + ["Ethernet0", "--voq"] ) print(result.output) assert result.exit_code == 0 diff --git a/tests/show_test.py b/tests/show_test.py index 7b3e492fc9..b7f6a9baf8 100644 --- a/tests/show_test.py +++ b/tests/show_test.py @@ -2,12 +2,15 @@ import sys import click import pytest +import subprocess import show.main as show from unittest import mock from click.testing import CliRunner +from utilities_common import constants from unittest.mock import call, MagicMock, patch EXPECTED_BASE_COMMAND = 'sudo ' +EXPECTED_BASE_COMMAND_LIST = ['sudo'] test_path = os.path.dirname(os.path.abspath(__file__)) modules_path = os.path.dirname(test_path) @@ -65,73 +68,101 @@ def teardown_class(cls): @patch('show.main.run_command') @pytest.mark.parametrize( - "cli_arguments,expected", + "cli_arguments0,expected0", [ ([], 'cat /var/log/syslog'), (['xcvrd'], "cat /var/log/syslog | grep 'xcvrd'"), (['-l', '10'], 'cat /var/log/syslog | tail -10'), - (['-f'], 'tail -F /var/log/syslog'), ] ) -def test_show_logging_default(run_command, cli_arguments, expected): +@pytest.mark.parametrize( + "cli_arguments1,expected1", + [ + (['-f'], ['tail', '-F', '/var/log/syslog']), + ] +) +def test_show_logging_default(run_command, cli_arguments0, expected0, cli_arguments1, expected1): runner = CliRunner() - result = runner.invoke(show.cli.commands["logging"], cli_arguments) - run_command.assert_called_with(EXPECTED_BASE_COMMAND + expected, display_cmd=False) + runner.invoke(show.cli.commands["logging"], cli_arguments0) + run_command.assert_called_with(EXPECTED_BASE_COMMAND + expected0, display_cmd=False, shell=True) + runner.invoke(show.cli.commands["logging"], cli_arguments1) + run_command.assert_called_with(EXPECTED_BASE_COMMAND_LIST + expected1, display_cmd=False) @patch('show.main.run_command') @patch('os.path.isfile', MagicMock(return_value=True)) @pytest.mark.parametrize( - "cli_arguments,expected", + "cli_arguments0,expected0", [ ([], 'cat /var/log/syslog.1 /var/log/syslog'), (['xcvrd'], "cat /var/log/syslog.1 /var/log/syslog | grep 'xcvrd'"), (['-l', '10'], 'cat /var/log/syslog.1 /var/log/syslog | tail -10'), - (['-f'], 'tail -F /var/log/syslog'), ] ) -def test_show_logging_syslog_1(run_command, cli_arguments, expected): +@pytest.mark.parametrize( + "cli_arguments1,expected1", + [ + (['-f'], ['tail', '-F', '/var/log/syslog']), + ] +) +def test_show_logging_syslog_1(run_command, cli_arguments0, expected0, cli_arguments1, expected1): runner = CliRunner() - result = runner.invoke(show.cli.commands["logging"], cli_arguments) - run_command.assert_called_with(EXPECTED_BASE_COMMAND + expected, display_cmd=False) + runner.invoke(show.cli.commands["logging"], cli_arguments0) + run_command.assert_called_with(EXPECTED_BASE_COMMAND + expected0, display_cmd=False, shell=True) + runner.invoke(show.cli.commands["logging"], cli_arguments1) + run_command.assert_called_with(EXPECTED_BASE_COMMAND_LIST + expected1, display_cmd=False) @patch('show.main.run_command') @patch('os.path.exists', MagicMock(return_value=True)) @pytest.mark.parametrize( - "cli_arguments,expected", + "cli_arguments0,expected0", [ ([], 'cat /var/log.tmpfs/syslog'), (['xcvrd'], "cat /var/log.tmpfs/syslog | grep 'xcvrd'"), (['-l', '10'], 'cat /var/log.tmpfs/syslog | tail -10'), - (['-f'], 'tail -F /var/log.tmpfs/syslog'), ] ) -def test_show_logging_tmpfs(run_command, cli_arguments, expected): +@pytest.mark.parametrize( + "cli_arguments1,expected1", + [ + (['-f'], ['tail', '-F', '/var/log.tmpfs/syslog']), + ] +) +def test_show_logging_tmpfs(run_command, cli_arguments0, expected0, cli_arguments1, expected1): runner = CliRunner() - result = runner.invoke(show.cli.commands["logging"], cli_arguments) - run_command.assert_called_with(EXPECTED_BASE_COMMAND + expected, display_cmd=False) + runner.invoke(show.cli.commands["logging"], cli_arguments0) + run_command.assert_called_with(EXPECTED_BASE_COMMAND + expected0, display_cmd=False, shell=True) + runner.invoke(show.cli.commands["logging"], cli_arguments1) + run_command.assert_called_with(EXPECTED_BASE_COMMAND_LIST + expected1, display_cmd=False) @patch('show.main.run_command') @patch('os.path.isfile', MagicMock(return_value=True)) @patch('os.path.exists', MagicMock(return_value=True)) @pytest.mark.parametrize( - "cli_arguments,expected", + "cli_arguments0,expected0", [ ([], 'cat /var/log.tmpfs/syslog.1 /var/log.tmpfs/syslog'), (['xcvrd'], "cat /var/log.tmpfs/syslog.1 /var/log.tmpfs/syslog | grep 'xcvrd'"), (['-l', '10'], 'cat /var/log.tmpfs/syslog.1 /var/log.tmpfs/syslog | tail -10'), - (['-f'], 'tail -F /var/log.tmpfs/syslog'), ] ) -def test_show_logging_tmpfs_syslog_1(run_command, cli_arguments, expected): +@pytest.mark.parametrize( + "cli_arguments1,expected1", + [ + (['-f'], ['tail', '-F', '/var/log.tmpfs/syslog']), + ] +) +def test_show_logging_tmpfs_syslog_1(run_command, cli_arguments0, expected0, cli_arguments1, expected1): runner = CliRunner() - result = runner.invoke(show.cli.commands["logging"], cli_arguments) - run_command.assert_called_with(EXPECTED_BASE_COMMAND + expected, display_cmd=False) + runner.invoke(show.cli.commands["logging"], cli_arguments0) + run_command.assert_called_with(EXPECTED_BASE_COMMAND + expected0, display_cmd=False, shell=True) + runner.invoke(show.cli.commands["logging"], cli_arguments1) + run_command.assert_called_with(EXPECTED_BASE_COMMAND_LIST + expected1, display_cmd=False) def side_effect_subprocess_popen(*args, **kwargs): mock = MagicMock() - if args[0] == "uptime": + if ' '.join(args[0]) == "uptime": mock.stdout.read.return_value = "05:58:07 up 25 days" - elif args[0].startswith("sudo docker images"): + elif ' '.join(args[0]).startswith("sudo docker images"): mock.stdout.read.return_value = "REPOSITORY TAG" return mock @@ -158,7 +189,6 @@ def test_show_version(): result = runner.invoke(show.cli.commands["version"]) assert "SONiC OS Version: 11" in result.output - class TestShowAcl(object): def setup(self): print('SETUP') @@ -638,6 +668,314 @@ def test_temporature(self, mock_run_command): assert result.exit_code == 0 mock_run_command.assert_called_once_with(['tempershow']) + @patch('subprocess.check_call') + def test_firmware(self, mock_check_call): + runner = CliRunner() + result = runner.invoke(show.cli.commands['platform'].commands['firmware']) + assert result.exit_code == 0 + mock_check_call.assert_called_with(["sudo", "fwutil", "show"]) + + def teardown(self): + print('TEAR DOWN') + +class TestShowQuagga(object): + def setup(self): + print('SETUP') + + @patch('show.main.run_command') + @patch('show.main.get_routing_stack', MagicMock(return_value='quagga')) + def test_show_ip_bgp(self, mock_run_command): + from show.bgp_quagga_v4 import bgp + runner = CliRunner() + + result = runner.invoke(show.cli.commands["ip"].commands['bgp'].commands['summary']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['sudo', constants.RVTYSH_COMMAND, '-c', "show ip bgp summary"], return_cmd=True) + + result = runner.invoke(show.cli.commands["ip"].commands['bgp'].commands['neighbors']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['sudo', constants.RVTYSH_COMMAND, '-c', "show ip bgp neighbor"]) + + result = runner.invoke(show.cli.commands["ip"].commands['bgp'].commands['neighbors'], ['0.0.0.0', 'routes']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['sudo', constants.RVTYSH_COMMAND, '-c', "show ip bgp neighbor 0.0.0.0 routes"]) + + @patch('show.main.run_command') + @patch('show.main.get_routing_stack', MagicMock(return_value='quagga')) + def test_show_ipv6_bgp(self, mock_run_command): + from show.bgp_quagga_v6 import bgp + runner = CliRunner() + + result = runner.invoke(show.cli.commands["ipv6"].commands['bgp'].commands['summary']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['sudo', constants.RVTYSH_COMMAND, '-c', "show ipv6 bgp summary"], return_cmd=True) + + result = runner.invoke(show.cli.commands["ipv6"].commands['bgp'].commands['neighbors'], ['0.0.0.0', 'routes']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['sudo', constants.RVTYSH_COMMAND, '-c', "show ipv6 bgp neighbor 0.0.0.0 routes"]) + + def teardown(self): + print('TEAR DOWN') + + +class TestShow(object): + def setup(self): + print('SETUP') + + @patch('show.main.run_command') + def test_show_arp(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands["arp"], ['0.0.0.0', '-if', 'Ethernet0', '--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['nbrshow', '-4', '-ip', '0.0.0.0', '-if', 'Ethernet0'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_ndp(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands["ndp"], ['0.0.0.0', '-if', 'Ethernet0', '--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['nbrshow', '-6', '-ip', '0.0.0.0', '-if', 'Ethernet0'], display_cmd=True) + + @patch('show.main.run_command') + @patch('show.main.is_mgmt_vrf_enabled', MagicMock(return_value=True)) + def test_show_mgmt_vrf_routes(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands["mgmt-vrf"], ['routes']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['ip', 'route', 'show', 'table', '5000']) + + @patch('show.main.run_command') + @patch('show.main.is_mgmt_vrf_enabled', MagicMock(return_value=True)) + def test_show_mgmt_vrf(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands["mgmt-vrf"]) + assert result.exit_code == 0 + assert mock_run_command.call_args_list == [ + call(['ip', '-d', 'link', 'show', 'mgmt']), + call(['ip', 'link', 'show', 'vrf', 'mgmt']) + ] + + @patch('show.main.run_command') + def test_show_pfc_priority(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands["pfc"].commands['priority'], ['Ethernet0']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['pfc', 'show', 'priority', 'Ethernet0']) + + @patch('show.main.run_command') + def test_show_pfc_asymmetric(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands["pfc"].commands['asymmetric'], ['Ethernet0']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['pfc', 'show', 'asymmetric', 'Ethernet0']) + + @patch('show.main.run_command') + def test_show_pfcwd_config(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands["pfcwd"].commands['config'], ['--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['pfcwd', 'show', 'config', '-d', 'all'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_pfcwd_stats(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands["pfcwd"].commands['stats'], ['--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['pfcwd', 'show', 'stats', '-d', 'all'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_watermark_telemetry_interval(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands["watermark"].commands['telemetry'].commands['interval']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['watermarkcfg', '--show-interval']) + + @patch('show.main.run_command') + def test_show_route_map(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands["route-map"], ['BGP', '--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['sudo', constants.RVTYSH_COMMAND, '-c', 'show route-map BGP'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_ip_prefix_list(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['ip'].commands["prefix-list"], ['0.0.0.0', '--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['sudo', constants.RVTYSH_COMMAND, '-c', 'show ip prefix-list 0.0.0.0'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_ip_protocol(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['ip'].commands["protocol"], ['--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['sudo', constants.RVTYSH_COMMAND, '-c', 'show ip protocol'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_ip_fib(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['ip'].commands["fib"], ['0.0.0.0', '--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['fibshow', '-4', '-ip', '0.0.0.0'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_ipv6_prefix_list(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['ipv6'].commands["prefix-list"], ['0.0.0.0', '--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['sudo', constants.RVTYSH_COMMAND, '-c', 'show ipv6 prefix-list 0.0.0.0'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_ipv6_protocol(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['ipv6'].commands["protocol"], ['--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['sudo', constants.RVTYSH_COMMAND, '-c', 'show ipv6 protocol'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_ipv6_fib(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['ipv6'].commands["fib"], ['0.0.0.0', '--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['fibshow', '-6', '-ip', '0.0.0.0'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_lldp_neighbors(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['lldp'].commands["neighbors"], ['Ethernet0', '--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['sudo', 'lldpshow', '-d', '-p' ,'Ethernet0'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_lldp_table(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['lldp'].commands["table"], ['--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['sudo', 'lldpshow'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_environment(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['environment'], ['--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['sudo', 'sensors'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_users(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['users'], ['--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['who'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_runningconfiguration_acl(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['runningconfiguration'].commands['acl'], ['--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['sonic-cfggen', '-d', '--var-json', 'ACL_RULE'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_runningconfiguration_ports(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['runningconfiguration'].commands['ports'], ['Ethernet0', '--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['sonic-cfggen', '-d', '--var-json', 'PORT', '--key', 'Ethernet0'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_runningconfiguration_interfaces(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['runningconfiguration'].commands['interfaces'], ['Ethernet0', '--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['sonic-cfggen', '-d', '--var-json', 'INTERFACE', '--key', 'Ethernet0'], display_cmd=True) + + @patch('show.main.run_command') + @patch('show.main.getstatusoutput_noshell_pipe', MagicMock(return_value=(0, 'quagga'))) + def test_show_startupconfiguration_bgp_quagga(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['startupconfiguration'].commands['bgp'], ['--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['sudo', 'docker', 'exec', 'bgp', 'cat', '/etc/quagga/bgpd.conf'], display_cmd=True) + + @patch('show.main.run_command') + @patch('show.main.getstatusoutput_noshell_pipe', MagicMock(return_value=(0, 'frr'))) + def test_show_startupconfiguration_bgp_frr(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['startupconfiguration'].commands['bgp'], ['--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['sudo', 'docker', 'exec', 'bgp', 'cat', '/etc/frr/bgpd.conf'], display_cmd=True) + + @patch('show.main.run_command') + @patch('show.main.getstatusoutput_noshell_pipe', MagicMock(return_value=(0, 'gobgp'))) + def test_show_startupconfiguration_bgp_gobgp(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['startupconfiguration'].commands['bgp'], ['--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['sudo', 'docker', 'exec', 'bgp', 'cat', '/etc/gpbgp/bgpd.conf'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_uptime(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['uptime'], ['--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['uptime', '-p'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_clock(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['clock'], ['--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['date'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_system_memory(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['system-memory'], ['--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['free', '-m'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_mirror_session(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['mirror_session'], ['SPAN', '--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['acl-loader', 'show', 'session', 'SPAN'], display_cmd=True) + + @patch('show.main.run_command') + def test_show_policer(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['policer'], ['policer0', '--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['acl-loader', 'show', 'policer', 'policer0'], display_cmd=True) + + @patch('subprocess.Popen') + def test_show_boot(self, mock_subprocess_popen): + runner = CliRunner() + result = runner.invoke(show.cli.commands['boot']) + assert result.exit_code == 0 + mock_subprocess_popen.assert_called_with(["sudo", "sonic-installer", "list"], stdout=subprocess.PIPE, text=True) + + @patch('show.main.run_command') + def test_show_mmu(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['mmu']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['mmuconfig', '-l']) + + @patch('show.main.run_command') + def test_show_lines(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['line'], ['--brief', '--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['consutil', 'show', '-b'], display_cmd=True) + + @patch('show.main.run_command') + @patch('os.path.isfile', MagicMock(return_value=True)) + def test_show_ztp(self, mock_run_command): + runner = CliRunner() + result = runner.invoke(show.cli.commands['ztp'], ['status', '--verbose']) + assert result.exit_code == 0 + mock_run_command.assert_called_with(['ztp', 'status', '--verbose'], display_cmd=True) + def teardown(self): print('TEAR DOWN') diff --git a/tests/techsupport_test.py b/tests/techsupport_test.py index 41664e3589..c5f65895a2 100644 --- a/tests/techsupport_test.py +++ b/tests/techsupport_test.py @@ -3,18 +3,18 @@ from unittest.mock import patch, Mock from click.testing import CliRunner -EXPECTED_BASE_COMMAND = 'sudo ' +EXPECTED_BASE_COMMAND = ['sudo'] @patch("show.main.run_command") @pytest.mark.parametrize( "cli_arguments,expected", [ - ([], 'generate_dump -v -t 5'), - (['--since', '2 days ago'], "generate_dump -v -s '2 days ago' -t 5"), - (['-g', '50'], 'timeout --kill-after=300s -s SIGTERM --foreground 50m generate_dump -v -t 5'), - (['--allow-process-stop'], 'generate_dump -v -a -t 5'), - (['--silent'], 'generate_dump -t 5'), - (['--debug-dump', '--redirect-stderr'], 'generate_dump -v -d -t 5 -r'), + ([], ['generate_dump', '-v', '-t', '5']), + (['--since', '2 days ago'], ['generate_dump', '-v', '-s', '2 days ago', '-t', '5']), + (['-g', '50'], ['timeout', '--kill-after=300s', '-s', 'SIGTERM', '--foreground', '50m', 'generate_dump', '-v', '-t', '5']), + (['--allow-process-stop'], ['generate_dump', '-v', '-a', '-t', '5']), + (['--silent'], ['generate_dump', '-t', '5']), + (['--debug-dump', '--redirect-stderr'], ['generate_dump', '-v', '-d', '-t', '5', '-r']), ] ) def test_techsupport(run_command, cli_arguments, expected): diff --git a/tests/tunnelstat_test.py b/tests/tunnelstat_test.py index f1fe716ef3..19df51f2a6 100644 --- a/tests/tunnelstat_test.py +++ b/tests/tunnelstat_test.py @@ -83,7 +83,7 @@ def test_clear(self): expected = show_vxlan_counters_clear_output # remove the counters snapshot - show.run_command("tunnelstat -D") + show.run_command(['tunnelstat', '-D']) for line in expected: assert line in result.output @@ -97,7 +97,7 @@ def test_clear_interface(self): expected = show_vxlan_counters_clear_interface_output # remove the counters snapshot - show.run_command("tunnelstat -D") + show.run_command(['tunnelstat', '-D']) for line in expected: assert line in result.output