From de719b1358e933f64938b1d61fd9f140270072e1 Mon Sep 17 00:00:00 2001 From: David Pilnik Date: Tue, 29 Mar 2022 11:54:29 +0300 Subject: [PATCH 1/5] add Password Hardening CLI --- config/plugins/sonic-passwh_yang.py | 537 ++++++++++++++++++++++++++++ show/plugins/sonic-passwh_yang.py | 155 ++++++++ 2 files changed, 692 insertions(+) create mode 100644 config/plugins/sonic-passwh_yang.py create mode 100644 show/plugins/sonic-passwh_yang.py diff --git a/config/plugins/sonic-passwh_yang.py b/config/plugins/sonic-passwh_yang.py new file mode 100644 index 0000000000..ad4ebb4fdd --- /dev/null +++ b/config/plugins/sonic-passwh_yang.py @@ -0,0 +1,537 @@ +""" +Autogenerated config CLI plugin. + + +""" + +import copy +import click +import utilities_common.cli as clicommon +import utilities_common.general as general +from config import config_mgmt + + +# Load sonic-cfggen from source since /usr/local/bin/sonic-cfggen does not have .py extension. +sonic_cfggen = general.load_module_from_source('sonic_cfggen', '/usr/local/bin/sonic-cfggen') + + +def exit_with_error(*args, **kwargs): + """ Print a message with click.secho and abort CLI. + + Args: + args: Positional arguments to pass to click.secho + kwargs: Keyword arguments to pass to click.secho + """ + + click.secho(*args, **kwargs) + raise click.Abort() + + +def validate_config_or_raise(cfg): + """ Validate config db data using ConfigMgmt. + + Args: + cfg (Dict): Config DB data to validate. + Raises: + Exception: when cfg does not satisfy YANG schema. + """ + + try: + cfg = sonic_cfggen.FormatConverter.to_serialized(copy.deepcopy(cfg)) + config_mgmt.ConfigMgmt().loadData(cfg) + except Exception as err: + raise Exception('Failed to validate configuration: {}'.format(err)) + + +def add_entry_validated(db, table, key, data): + """ Add new entry in table and validate configuration. + + Args: + db (swsscommon.ConfigDBConnector): Config DB connector obect. + table (str): Table name to add new entry to. + key (Union[str, Tuple]): Key name in the table. + data (Dict): Entry data. + Raises: + Exception: when cfg does not satisfy YANG schema. + """ + + cfg = db.get_config() + cfg.setdefault(table, {}) + if key in cfg[table]: + raise Exception(f"{key} already exists") + + cfg[table][key] = data + + validate_config_or_raise(cfg) + db.set_entry(table, key, data) + + +def update_entry_validated(db, table, key, data, create_if_not_exists=False): + """ Update entry in table and validate configuration. + If attribute value in data is None, the attribute is deleted. + + Args: + db (swsscommon.ConfigDBConnector): Config DB connector obect. + table (str): Table name to add new entry to. + key (Union[str, Tuple]): Key name in the table. + data (Dict): Entry data. + create_if_not_exists (bool): + In case entry does not exists already a new entry + is not created if this flag is set to False and + creates a new entry if flag is set to True. + Raises: + Exception: when cfg does not satisfy YANG schema. + """ + + cfg = db.get_config() + cfg.setdefault(table, {}) + + if not data: + raise Exception(f"No field/values to update {key}") + + if create_if_not_exists: + cfg[table].setdefault(key, {}) + + if key not in cfg[table]: + raise Exception(f"{key} does not exist") + + entry_changed = False + for attr, value in data.items(): + if value == cfg[table][key].get(attr): + continue + entry_changed = True + if value is None: + cfg[table][key].pop(attr, None) + else: + cfg[table][key][attr] = value + + if not entry_changed: + return + + validate_config_or_raise(cfg) + db.set_entry(table, key, cfg[table][key]) + + +def del_entry_validated(db, table, key): + """ Delete entry in table and validate configuration. + + Args: + db (swsscommon.ConfigDBConnector): Config DB connector obect. + table (str): Table name to add new entry to. + key (Union[str, Tuple]): Key name in the table. + Raises: + Exception: when cfg does not satisfy YANG schema. + """ + + cfg = db.get_config() + cfg.setdefault(table, {}) + if key not in cfg[table]: + raise Exception(f"{key} does not exist") + + cfg[table].pop(key) + + validate_config_or_raise(cfg) + db.set_entry(table, key, None) + + +def add_list_entry_validated(db, table, key, attr, data): + """ Add new entry into list in table and validate configuration. + + Args: + db (swsscommon.ConfigDBConnector): Config DB connector obect. + table (str): Table name to add data to. + key (Union[str, Tuple]): Key name in the table. + attr (str): Attribute name which represents a list the data needs to be added to. + data (List): Data list to add to config DB. + Raises: + Exception: when cfg does not satisfy YANG schema. + """ + + cfg = db.get_config() + cfg.setdefault(table, {}) + if key not in cfg[table]: + raise Exception(f"{key} does not exist") + cfg[table][key].setdefault(attr, []) + for entry in data: + if entry in cfg[table][key][attr]: + raise Exception(f"{entry} already exists") + cfg[table][key][attr].append(entry) + + validate_config_or_raise(cfg) + db.set_entry(table, key, cfg[table][key]) + + +def del_list_entry_validated(db, table, key, attr, data): + """ Delete entry from list in table and validate configuration. + + Args: + db (swsscommon.ConfigDBConnector): Config DB connector obect. + table (str): Table name to remove data from. + key (Union[str, Tuple]): Key name in the table. + attr (str): Attribute name which represents a list the data needs to be removed from. + data (Dict): Data list to remove from config DB. + Raises: + Exception: when cfg does not satisfy YANG schema. + """ + + cfg = db.get_config() + cfg.setdefault(table, {}) + if key not in cfg[table]: + raise Exception(f"{key} does not exist") + cfg[table][key].setdefault(attr, []) + for entry in data: + if entry not in cfg[table][key][attr]: + raise Exception(f"{entry} does not exist") + cfg[table][key][attr].remove(entry) + if not cfg[table][key][attr]: + cfg[table][key].pop(attr) + + validate_config_or_raise(cfg) + db.set_entry(table, key, cfg[table][key]) + + +def clear_list_entry_validated(db, table, key, attr): + """ Clear list in object and validate configuration. + + Args: + db (swsscommon.ConfigDBConnector): Config DB connector obect. + table (str): Table name to remove the list attribute from. + key (Union[str, Tuple]): Key name in the table. + attr (str): Attribute name which represents a list that needs to be removed. + Raises: + Exception: when cfg does not satisfy YANG schema. + """ + + update_entry_validated(db, table, key, {attr: None}) + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +@click.group(name="passw-hardening", + cls=clicommon.AliasedGroup) +def PASSW_HARDENING(): + """ PASSWORD HARDENING part of config_db.json """ + + pass + + + + +@PASSW_HARDENING.group(name="policies", + cls=clicommon.AliasedGroup) +@clicommon.pass_db +def PASSW_HARDENING_POLICIES(db): + """ """ + + pass + + + + +@PASSW_HARDENING_POLICIES.command(name="state") + +@click.argument( + "state", + nargs=1, + required=True, +) +@clicommon.pass_db +def PASSW_HARDENING_POLICIES_state(db, state): + """ state of the feature """ + + table = "PASSW_HARDENING" + key = "POLICIES" + data = { + "state": state, + } + try: + update_entry_validated(db.cfgdb, table, key, data, create_if_not_exists=True) + except Exception as err: + exit_with_error(f"Error: {err}", fg="red") + + + +@PASSW_HARDENING_POLICIES.command(name="expiration") + +@click.argument( + "expiration", + nargs=1, + required=True, +) +@clicommon.pass_db +def PASSW_HARDENING_POLICIES_expiration(db, expiration): + """ expiration time (days unit) """ + + table = "PASSW_HARDENING" + key = "POLICIES" + data = { + "expiration": expiration, + } + try: + update_entry_validated(db.cfgdb, table, key, data, create_if_not_exists=True) + except Exception as err: + exit_with_error(f"Error: {err}", fg="red") + + + +@PASSW_HARDENING_POLICIES.command(name="expiration-warning") + +@click.argument( + "expiration-warning", + nargs=1, + required=True, +) +@clicommon.pass_db +def PASSW_HARDENING_POLICIES_expiration_warning(db, expiration_warning): + """ expiration warning time (days unit) """ + + table = "PASSW_HARDENING" + key = "POLICIES" + data = { + "expiration_warning": expiration_warning, + } + try: + update_entry_validated(db.cfgdb, table, key, data, create_if_not_exists=True) + except Exception as err: + exit_with_error(f"Error: {err}", fg="red") + + + +@PASSW_HARDENING_POLICIES.command(name="history-cnt") + +@click.argument( + "history-cnt", + nargs=1, + required=True, +) +@clicommon.pass_db +def PASSW_HARDENING_POLICIES_history_cnt(db, history_cnt): + """ num of old password that the system will recorded """ + + table = "PASSW_HARDENING" + key = "POLICIES" + data = { + "history_cnt": history_cnt, + } + try: + update_entry_validated(db.cfgdb, table, key, data, create_if_not_exists=True) + except Exception as err: + exit_with_error(f"Error: {err}", fg="red") + + + +@PASSW_HARDENING_POLICIES.command(name="len-min") + +@click.argument( + "len-min", + nargs=1, + required=True, +) +@clicommon.pass_db +def PASSW_HARDENING_POLICIES_len_min(db, len_min): + """ password min length """ + + table = "PASSW_HARDENING" + key = "POLICIES" + data = { + "len_min": len_min, + } + try: + update_entry_validated(db.cfgdb, table, key, data, create_if_not_exists=True) + except Exception as err: + exit_with_error(f"Error: {err}", fg="red") + + + +@PASSW_HARDENING_POLICIES.command(name="reject-user-passw-match") + +@click.argument( + "reject-user-passw-match", + nargs=1, + required=True, +) +@clicommon.pass_db +def PASSW_HARDENING_POLICIES_reject_user_passw_match(db, reject_user_passw_match): + """ username password match """ + + table = "PASSW_HARDENING" + key = "POLICIES" + data = { + "reject_user_passw_match": reject_user_passw_match, + } + try: + update_entry_validated(db.cfgdb, table, key, data, create_if_not_exists=True) + except Exception as err: + exit_with_error(f"Error: {err}", fg="red") + + + +@PASSW_HARDENING_POLICIES.command(name="lower-class") + +@click.argument( + "lower-class", + nargs=1, + required=True, +) +@clicommon.pass_db +def PASSW_HARDENING_POLICIES_lower_class(db, lower_class): + """ password lower chars policy """ + + table = "PASSW_HARDENING" + key = "POLICIES" + data = { + "lower_class": lower_class, + } + try: + update_entry_validated(db.cfgdb, table, key, data, create_if_not_exists=True) + except Exception as err: + exit_with_error(f"Error: {err}", fg="red") + + + +@PASSW_HARDENING_POLICIES.command(name="upper-class") + +@click.argument( + "upper-class", + nargs=1, + required=True, +) +@clicommon.pass_db +def PASSW_HARDENING_POLICIES_upper_class(db, upper_class): + """ password upper chars policy """ + + table = "PASSW_HARDENING" + key = "POLICIES" + data = { + "upper_class": upper_class, + } + try: + update_entry_validated(db.cfgdb, table, key, data, create_if_not_exists=True) + except Exception as err: + exit_with_error(f"Error: {err}", fg="red") + + + +@PASSW_HARDENING_POLICIES.command(name="digits-class") + +@click.argument( + "digits-class", + nargs=1, + required=True, +) +@clicommon.pass_db +def PASSW_HARDENING_POLICIES_digits_class(db, digits_class): + """ password digits chars policy """ + + table = "PASSW_HARDENING" + key = "POLICIES" + data = { + "digits_class": digits_class, + } + try: + update_entry_validated(db.cfgdb, table, key, data, create_if_not_exists=True) + except Exception as err: + exit_with_error(f"Error: {err}", fg="red") + + + +@PASSW_HARDENING_POLICIES.command(name="special-class") + +@click.argument( + "special-class", + nargs=1, + required=True, +) +@clicommon.pass_db +def PASSW_HARDENING_POLICIES_special_class(db, special_class): + """ password special chars policy """ + + table = "PASSW_HARDENING" + key = "POLICIES" + data = { + "special_class": special_class, + } + try: + update_entry_validated(db.cfgdb, table, key, data, create_if_not_exists=True) + except Exception as err: + exit_with_error(f"Error: {err}", fg="red") + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +def register(cli): + """ Register new CLI nodes in root CLI. + + Args: + cli: Root CLI node. + Raises: + Exception: when root CLI already has a command + we are trying to register. + """ + cli_node = PASSW_HARDENING + if cli_node.name in cli.commands: + raise Exception(f"{cli_node.name} already exists in CLI") + cli.add_command(PASSW_HARDENING) \ No newline at end of file diff --git a/show/plugins/sonic-passwh_yang.py b/show/plugins/sonic-passwh_yang.py new file mode 100644 index 0000000000..2123847035 --- /dev/null +++ b/show/plugins/sonic-passwh_yang.py @@ -0,0 +1,155 @@ +""" +Auto-generated show CLI plugin. + + +""" + +import click +import tabulate +import natsort +import utilities_common.cli as clicommon + + + + + +def format_attr_value(entry, attr): + """ Helper that formats attribute to be presented in the table output. + + Args: + entry (Dict[str, str]): CONFIG DB entry configuration. + attr (Dict): Attribute metadata. + + Returns: + str: fomatted attribute value. + """ + + if attr["is-leaf-list"]: + return "\n".join(entry.get(attr["name"], [])) + return entry.get(attr["name"], "N/A") + + +def format_group_value(entry, attrs): + """ Helper that formats grouped attribute to be presented in the table output. + + Args: + entry (Dict[str, str]): CONFIG DB entry configuration. + attrs (List[Dict]): Attributes metadata that belongs to the same group. + + Returns: + str: fomatted group attributes. + """ + + data = [] + for attr in attrs: + if entry.get(attr["name"]): + data.append((attr["name"] + ":", format_attr_value(entry, attr))) + return tabulate.tabulate(data, tablefmt="plain") + + + + + + + + + + + + +@click.group(name="passw-hardening", + cls=clicommon.AliasedGroup) +def PASSW_HARDENING(): + """ PASSWORD HARDENING part of config_db.json """ + + pass + + + +@PASSW_HARDENING.command(name="policies") +@clicommon.pass_db +def PASSW_HARDENING_POLICIES(db): + """ """ + + header = [ + +"STATE", +"EXPIRATION", +"EXPIRATION WARNING", +"HISTORY CNT", +"LEN MIN", +"REJECT USER PASSW MATCH", +"LOWER CLASS", +"UPPER CLASS", +"DIGITS CLASS", +"SPECIAL CLASS", + +] + + body = [] + + table = db.cfgdb.get_table("PASSW_HARDENING") + entry = table.get("POLICIES", {}) + row = [ + format_attr_value( + entry, + {'name': 'state', 'description': 'state of the feature', 'is-leaf-list': False, 'is-mandatory': False, 'group': ''} + ), + format_attr_value( + entry, + {'name': 'expiration', 'description': 'expiration time (days unit)', 'is-leaf-list': False, 'is-mandatory': False, 'group': ''} + ), + format_attr_value( + entry, + {'name': 'expiration_warning', 'description': 'expiration warning time (days unit)', 'is-leaf-list': False, 'is-mandatory': False, 'group': ''} + ), + format_attr_value( + entry, + {'name': 'history_cnt', 'description': 'num of old password that the system will recorded', 'is-leaf-list': False, 'is-mandatory': False, 'group': ''} + ), + format_attr_value( + entry, + {'name': 'len_min', 'description': 'password min length', 'is-leaf-list': False, 'is-mandatory': False, 'group': ''} + ), + format_attr_value( + entry, + {'name': 'reject_user_passw_match', 'description': 'username password match', 'is-leaf-list': False, 'is-mandatory': False, 'group': ''} + ), + format_attr_value( + entry, + {'name': 'lower_class', 'description': 'password lower chars policy', 'is-leaf-list': False, 'is-mandatory': False, 'group': ''} + ), + format_attr_value( + entry, + {'name': 'upper_class', 'description': 'password upper chars policy', 'is-leaf-list': False, 'is-mandatory': False, 'group': ''} + ), + format_attr_value( + entry, + {'name': 'digits_class', 'description': 'password digits chars policy', 'is-leaf-list': False, 'is-mandatory': False, 'group': ''} + ), + format_attr_value( + entry, + {'name': 'special_class', 'description': 'password special chars policy', 'is-leaf-list': False, 'is-mandatory': False, 'group': ''} + ), +] + + body.append(row) + click.echo(tabulate.tabulate(body, header)) + + + + + +def register(cli): + """ Register new CLI nodes in root CLI. + + Args: + cli (click.core.Command): Root CLI node. + Raises: + Exception: when root CLI already has a command + we are trying to register. + """ + cli_node = PASSW_HARDENING + if cli_node.name in cli.commands: + raise Exception(f"{cli_node.name} already exists in CLI") + cli.add_command(PASSW_HARDENING) \ No newline at end of file From 6b50c6761154206d7c78af8bfb9dbde14985543f Mon Sep 17 00:00:00 2001 From: davidpil Date: Wed, 13 Apr 2022 12:10:02 +0000 Subject: [PATCH 2/5] Add Password Hardening CLI test --- .../assert_show_output.py | 40 +++++ .../default_config_db.json | 14 ++ tests/passw_hardening_test.py | 160 ++++++++++++++++++ 3 files changed, 214 insertions(+) create mode 100644 tests/passw_hardening_input/assert_show_output.py create mode 100644 tests/passw_hardening_input/default_config_db.json create mode 100644 tests/passw_hardening_test.py diff --git a/tests/passw_hardening_input/assert_show_output.py b/tests/passw_hardening_input/assert_show_output.py new file mode 100644 index 0000000000..9500c98be4 --- /dev/null +++ b/tests/passw_hardening_input/assert_show_output.py @@ -0,0 +1,40 @@ +""" +Module holding the correct values for show CLI command outputs for the passw_hardening_test.py +""" + +show_passw_hardening_policies_default="""\ +STATE EXPIRATION EXPIRATION WARNING HISTORY CNT LEN MIN REJECT USER PASSW MATCH LOWER CLASS UPPER CLASS DIGITS CLASS SPECIAL CLASS +-------- ------------ -------------------- ------------- --------- ------------------------- ------------- ------------- -------------- --------------- +disabled 180 15 10 8 true true true true true +""" + +show_passw_hardening_policies_classes_disabled="""\ +STATE EXPIRATION EXPIRATION WARNING HISTORY CNT LEN MIN REJECT USER PASSW MATCH LOWER CLASS UPPER CLASS DIGITS CLASS SPECIAL CLASS +-------- ------------ -------------------- ------------- --------- ------------------------- ------------- ------------- -------------- --------------- +disabled 180 15 10 8 false false false false false +""" + +show_passw_hardening_policies_enabled="""\ +STATE EXPIRATION EXPIRATION WARNING HISTORY CNT LEN MIN REJECT USER PASSW MATCH LOWER CLASS UPPER CLASS DIGITS CLASS SPECIAL CLASS +------- ------------ -------------------- ------------- --------- ------------------------- ------------- ------------- -------------- --------------- +enabled 180 15 10 8 true true true true true +""" + + +show_passw_hardening_policies_expiration="""\ +STATE EXPIRATION EXPIRATION WARNING HISTORY CNT LEN MIN REJECT USER PASSW MATCH LOWER CLASS UPPER CLASS DIGITS CLASS SPECIAL CLASS +------- ------------ -------------------- ------------- --------- ------------------------- ------------- ------------- -------------- --------------- +enabled 100 15 10 8 true true true true true +""" + +show_passw_hardening_policies_history_cnt="""\ +STATE EXPIRATION EXPIRATION WARNING HISTORY CNT LEN MIN REJECT USER PASSW MATCH LOWER CLASS UPPER CLASS DIGITS CLASS SPECIAL CLASS +-------- ------------ -------------------- ------------- --------- ------------------------- ------------- ------------- -------------- --------------- +disabled 180 15 40 8 true true true true true +""" + +show_passw_hardening_policies_len_min="""\ +STATE EXPIRATION EXPIRATION WARNING HISTORY CNT LEN MIN REJECT USER PASSW MATCH LOWER CLASS UPPER CLASS DIGITS CLASS SPECIAL CLASS +-------- ------------ -------------------- ------------- --------- ------------------------- ------------- ------------- -------------- --------------- +disabled 180 15 10 30 true true true true true +""" \ No newline at end of file diff --git a/tests/passw_hardening_input/default_config_db.json b/tests/passw_hardening_input/default_config_db.json new file mode 100644 index 0000000000..0eb363eb41 --- /dev/null +++ b/tests/passw_hardening_input/default_config_db.json @@ -0,0 +1,14 @@ +{ + "PASSW_HARDENING|POLICIES": { + "state": "disabled", + "expiration": "180", + "expiration_warning": "15", + "history_cnt": "10", + "len_min": "8", + "reject_user_passw_match": "true", + "digits_class": "true", + "lower_class": "true", + "special_class": "true", + "upper_class": "true" + } +} diff --git a/tests/passw_hardening_test.py b/tests/passw_hardening_test.py new file mode 100644 index 0000000000..22ede141d2 --- /dev/null +++ b/tests/passw_hardening_test.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python + +import os +import logging +import show.main as show +import config.main as config + +from .passw_hardening_input import assert_show_output +from utilities_common.db import Db +from click.testing import CliRunner +from .mock_tables import dbconnector + +logger = logging.getLogger(__name__) +test_path = os.path.dirname(os.path.abspath(__file__)) +mock_db_path = os.path.join(test_path, "passw_hardening_input") + +SUCCESS = 0 +ERROR = 1 +INVALID_VALUE = 'INVALID' +EXP_GOOD_FLOW = 1 +EXP_BAD_FLOW = 0 + +class TestPasswHardening: + @classmethod + def setup_class(cls): + logger.info("SETUP") + os.environ['UTILITIES_UNIT_TESTING'] = "2" + + + @classmethod + def teardown_class(cls): + logger.info("TEARDOWN") + os.environ['UTILITIES_UNIT_TESTING'] = "0" + os.environ["UTILITIES_UNIT_TESTING_TOPOLOGY"] = "" + dbconnector.dedicated_dbs['CONFIG_DB'] = None + + def verify_passw_policies_output(self, db, runner, output, expected=EXP_GOOD_FLOW): + result = runner.invoke(show.cli.commands["passw-hardening"].commands["policies"], [], obj=db) + logger.debug("\n" + result.output) + logger.debug(result.exit_code) + + if expected: # good flow expected (default) + assert result.exit_code == SUCCESS + assert result.output == output + else: # bad flow expected + assert result.exit_code == ERROR + + def passw_hardening_set_policy(self, runner, db, attr, value, expected=EXP_GOOD_FLOW): + result = runner.invoke( + config.config.commands["passw-hardening"].commands["policies"].commands[attr], + [value], obj=db + ) + + if expected: # good flow expected (default) + logger.debug("\n" + result.output) + logger.debug(result.exit_code) + assert result.exit_code == SUCCESS + else: # bad flow expected + assert result.exit_code == ERROR + + + ######### PASSW-HARDENING ######### + + def test_passw_hardening_default(self): + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + + self.verify_passw_policies_output(db, runner, assert_show_output.show_passw_hardening_policies_default) + + def test_passw_hardening_feature_enabled(self): + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + + self.passw_hardening_set_policy(runner, db, "state", "enabled") + + self.verify_passw_policies_output(db, runner, assert_show_output.show_passw_hardening_policies_enabled) + + def test_passw_hardening_policies_classes_disabled(self): + """Disable passw hardening classes & reject user passw match policies""" + + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + + passw_classes = { "reject-user-passw-match": "false", + "digits-class": "false", + "lower-class": "false", + "special-class": "false", + "upper-class": "false" + } + + for k, v in passw_classes.items(): + self.passw_hardening_set_policy(runner, db, k, v) + + self.verify_passw_policies_output(db, runner, assert_show_output.show_passw_hardening_policies_classes_disabled) + + def test_passw_hardening_policies_exp_time(self): + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + + self.passw_hardening_set_policy(runner, db, "state", "enabled") + self.passw_hardening_set_policy(runner, db, "expiration", "100") + self.passw_hardening_set_policy(runner, db, "expiration-warning", "15") + + self.verify_passw_policies_output(db, runner, assert_show_output.show_passw_hardening_policies_expiration) + + def test_passw_hardening_policies_history(self): + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + + self.passw_hardening_set_policy(runner, db, "history-cnt", "40") + + self.verify_passw_policies_output(db, runner, assert_show_output.show_passw_hardening_policies_history_cnt) + + def test_passw_hardening_policies_len_min(self): + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + + self.passw_hardening_set_policy(runner, db, "len-min", "30") + + self.verify_passw_policies_output(db, runner, assert_show_output.show_passw_hardening_policies_len_min) + + def test_passw_hardening_policy_expiration_invalid(self): + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + INVALID_EXP_TIME = "600" + + self.passw_hardening_set_policy(runner, db, "expiration", INVALID_EXP_TIME, EXP_BAD_FLOW) + + # expect default values, because invalid values should not succed to modify default configuration + self.verify_passw_policies_output(db, runner, assert_show_output.show_passw_hardening_policies_default) + + def test_passw_hardening_policy_len_min_invalid(self): + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + INVALID_EXP_LEN = "500" + + self.passw_hardening_set_policy(runner, db, "len-min", INVALID_EXP_LEN, EXP_BAD_FLOW) + + # expect default values, because invalid values should not succed to modify default configuration + self.verify_passw_policies_output(db, runner, assert_show_output.show_passw_hardening_policies_default) + + def test_passw_hardening_policy_class_invalid(self): + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + INVALID_VALUE = '?' + + self.passw_hardening_set_policy(runner, db, "expiration", INVALID_VALUE, EXP_BAD_FLOW) + + # expect default values, because invalid values should not succed to modify default configuration + self.verify_passw_policies_output(db, runner, assert_show_output.show_passw_hardening_policies_default) + From 4a88b709163134b88e55750291400b40f3e5ac49 Mon Sep 17 00:00:00 2001 From: davidpil2002 <91657985+davidpil2002@users.noreply.github.com> Date: Tue, 26 Apr 2022 09:15:29 +0300 Subject: [PATCH 3/5] Update sonic-passwh_yang.py removed unuse import natsort --- show/plugins/sonic-passwh_yang.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/show/plugins/sonic-passwh_yang.py b/show/plugins/sonic-passwh_yang.py index 2123847035..926eaf09d0 100644 --- a/show/plugins/sonic-passwh_yang.py +++ b/show/plugins/sonic-passwh_yang.py @@ -6,7 +6,6 @@ import click import tabulate -import natsort import utilities_common.cli as clicommon @@ -152,4 +151,4 @@ def register(cli): cli_node = PASSW_HARDENING if cli_node.name in cli.commands: raise Exception(f"{cli_node.name} already exists in CLI") - cli.add_command(PASSW_HARDENING) \ No newline at end of file + cli.add_command(PASSW_HARDENING) From 4b81398472d93990c7178117d92559f84cec8538 Mon Sep 17 00:00:00 2001 From: David Pilnik Date: Wed, 3 Aug 2022 12:38:54 +0300 Subject: [PATCH 4/5] [password hardening]Add test to coverage bad flows --- config/plugins/sonic-passwh_yang.py | 8 +-- tests/passw_hardening_test.py | 108 ++++++++++++++++++++++++++-- 2 files changed, 103 insertions(+), 13 deletions(-) diff --git a/config/plugins/sonic-passwh_yang.py b/config/plugins/sonic-passwh_yang.py index ad4ebb4fdd..8918305e85 100644 --- a/config/plugins/sonic-passwh_yang.py +++ b/config/plugins/sonic-passwh_yang.py @@ -1,9 +1,3 @@ -""" -Autogenerated config CLI plugin. - - -""" - import copy import click import utilities_common.cli as clicommon @@ -534,4 +528,4 @@ def register(cli): cli_node = PASSW_HARDENING if cli_node.name in cli.commands: raise Exception(f"{cli_node.name} already exists in CLI") - cli.add_command(PASSW_HARDENING) \ No newline at end of file + cli.add_command(PASSW_HARDENING) diff --git a/tests/passw_hardening_test.py b/tests/passw_hardening_test.py index 2feb73d09f..e57fdfd0c8 100644 --- a/tests/passw_hardening_test.py +++ b/tests/passw_hardening_test.py @@ -67,16 +67,26 @@ def test_passw_hardening_default(self): runner = CliRunner() self.verify_passw_policies_output(db, runner, assert_show_output.show_passw_hardening_policies_default) - + def test_passw_hardening_feature_enabled(self): dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') db = Db() runner = CliRunner() - + self.passw_hardening_set_policy(runner, db, "state", "enabled") - + self.verify_passw_policies_output(db, runner, assert_show_output.show_passw_hardening_policies_enabled) - + + def test_passw_hardening_feature_disabled(self): + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + + self.passw_hardening_set_policy(runner, db, "state", "enabled") + self.passw_hardening_set_policy(runner, db, "state", "disabled") + + self.verify_passw_policies_output(db, runner, assert_show_output.show_passw_hardening_policies_default) + def test_passw_hardening_policies_classes_disabled(self): """Disable passw hardening classes & reject user passw match policies""" @@ -95,7 +105,7 @@ def test_passw_hardening_policies_classes_disabled(self): self.passw_hardening_set_policy(runner, db, k, v) self.verify_passw_policies_output(db, runner, assert_show_output.show_passw_hardening_policies_classes_disabled) - + def test_passw_hardening_policies_exp_time(self): dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') db = Db() @@ -123,4 +133,90 @@ def test_passw_hardening_policies_len_min(self): self.passw_hardening_set_policy(runner, db, "len-min", "30") - self.verify_passw_policies_output(db, runner, assert_show_output.show_passw_hardening_policies_len_min) \ No newline at end of file + self.verify_passw_policies_output(db, runner, assert_show_output.show_passw_hardening_policies_len_min) + + def test_passw_hardening_bad_flow_len_min(self): + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + + self.passw_hardening_set_policy(runner, db, "state", "enabled") + self.passw_hardening_set_policy(runner, db, "len-min", "10000", EXP_BAD_FLOW) + + def test_passw_hardening_bad_flow_history_cnt(self): + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + + self.passw_hardening_set_policy(runner, db, "state", "enabled") + self.passw_hardening_set_policy(runner, db, "history-cnt", "100000", EXP_BAD_FLOW) + + def test_passw_hardening_bad_flow_state(self): + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + + self.passw_hardening_set_policy(runner, db, "state", "0", EXP_BAD_FLOW) + + def test_passw_hardening_bad_flow_expiration(self): + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + + self.passw_hardening_set_policy(runner, db, "expiration", "####", EXP_BAD_FLOW) + + def test_passw_hardening_bad_flow_expiration_warning(self): + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + + self.passw_hardening_set_policy(runner, db, "expiration-warning", "4000", EXP_BAD_FLOW) + + def test_passw_hardening_bad_flow_upper_class(self): + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + + self.passw_hardening_set_policy(runner, db, "upper-class", "1", EXP_BAD_FLOW) + + def test_passw_hardening_bad_flow_lower_class(self): + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + + self.passw_hardening_set_policy(runner, db, "lower-class", "1", EXP_BAD_FLOW) + + def test_passw_hardening_bad_flow_special_class(self): + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + + self.passw_hardening_set_policy(runner, db, "special-class", "1", EXP_BAD_FLOW) + + def test_passw_hardening_bad_flow_digits_class(self): + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + + self.passw_hardening_set_policy(runner, db, "digits-class", "1", EXP_BAD_FLOW) + + def test_passw_hardening_bad_flow_reject_user_passw_match(self): + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + + self.passw_hardening_set_policy(runner, db, "reject-user-passw-match", "1", EXP_BAD_FLOW) + + def test_passw_hardening_bad_flow_policy(self): + dbconnector.dedicated_dbs['CONFIG_DB'] = os.path.join(mock_db_path, 'default_config_db') + db = Db() + runner = CliRunner() + try: + self.passw_hardening_set_policy(runner, db, "no-exist-command", "1", EXP_BAD_FLOW) + except Exception as e: + # import pdb;pdb.set_trace() + if 'no-exist-command' in str(e): + pass + else: + raise e + From 24814bafdea5bf47fea223141c08de530facdcb1 Mon Sep 17 00:00:00 2001 From: David Pilnik Date: Tue, 9 Aug 2022 18:27:25 +0300 Subject: [PATCH 5/5] [password hardening]remove unuse functions created by the auto cli generator --- config/plugins/sonic-passwh_yang.py | 151 ---------------------------- show/plugins/sonic-passwh_yang.py | 28 ------ 2 files changed, 179 deletions(-) diff --git a/config/plugins/sonic-passwh_yang.py b/config/plugins/sonic-passwh_yang.py index 8918305e85..6cfe2acafe 100644 --- a/config/plugins/sonic-passwh_yang.py +++ b/config/plugins/sonic-passwh_yang.py @@ -37,29 +37,6 @@ def validate_config_or_raise(cfg): raise Exception('Failed to validate configuration: {}'.format(err)) -def add_entry_validated(db, table, key, data): - """ Add new entry in table and validate configuration. - - Args: - db (swsscommon.ConfigDBConnector): Config DB connector obect. - table (str): Table name to add new entry to. - key (Union[str, Tuple]): Key name in the table. - data (Dict): Entry data. - Raises: - Exception: when cfg does not satisfy YANG schema. - """ - - cfg = db.get_config() - cfg.setdefault(table, {}) - if key in cfg[table]: - raise Exception(f"{key} already exists") - - cfg[table][key] = data - - validate_config_or_raise(cfg) - db.set_entry(table, key, data) - - def update_entry_validated(db, table, key, data, create_if_not_exists=False): """ Update entry in table and validate configuration. If attribute value in data is None, the attribute is deleted. @@ -104,134 +81,6 @@ def update_entry_validated(db, table, key, data, create_if_not_exists=False): validate_config_or_raise(cfg) db.set_entry(table, key, cfg[table][key]) - - -def del_entry_validated(db, table, key): - """ Delete entry in table and validate configuration. - - Args: - db (swsscommon.ConfigDBConnector): Config DB connector obect. - table (str): Table name to add new entry to. - key (Union[str, Tuple]): Key name in the table. - Raises: - Exception: when cfg does not satisfy YANG schema. - """ - - cfg = db.get_config() - cfg.setdefault(table, {}) - if key not in cfg[table]: - raise Exception(f"{key} does not exist") - - cfg[table].pop(key) - - validate_config_or_raise(cfg) - db.set_entry(table, key, None) - - -def add_list_entry_validated(db, table, key, attr, data): - """ Add new entry into list in table and validate configuration. - - Args: - db (swsscommon.ConfigDBConnector): Config DB connector obect. - table (str): Table name to add data to. - key (Union[str, Tuple]): Key name in the table. - attr (str): Attribute name which represents a list the data needs to be added to. - data (List): Data list to add to config DB. - Raises: - Exception: when cfg does not satisfy YANG schema. - """ - - cfg = db.get_config() - cfg.setdefault(table, {}) - if key not in cfg[table]: - raise Exception(f"{key} does not exist") - cfg[table][key].setdefault(attr, []) - for entry in data: - if entry in cfg[table][key][attr]: - raise Exception(f"{entry} already exists") - cfg[table][key][attr].append(entry) - - validate_config_or_raise(cfg) - db.set_entry(table, key, cfg[table][key]) - - -def del_list_entry_validated(db, table, key, attr, data): - """ Delete entry from list in table and validate configuration. - - Args: - db (swsscommon.ConfigDBConnector): Config DB connector obect. - table (str): Table name to remove data from. - key (Union[str, Tuple]): Key name in the table. - attr (str): Attribute name which represents a list the data needs to be removed from. - data (Dict): Data list to remove from config DB. - Raises: - Exception: when cfg does not satisfy YANG schema. - """ - - cfg = db.get_config() - cfg.setdefault(table, {}) - if key not in cfg[table]: - raise Exception(f"{key} does not exist") - cfg[table][key].setdefault(attr, []) - for entry in data: - if entry not in cfg[table][key][attr]: - raise Exception(f"{entry} does not exist") - cfg[table][key][attr].remove(entry) - if not cfg[table][key][attr]: - cfg[table][key].pop(attr) - - validate_config_or_raise(cfg) - db.set_entry(table, key, cfg[table][key]) - - -def clear_list_entry_validated(db, table, key, attr): - """ Clear list in object and validate configuration. - - Args: - db (swsscommon.ConfigDBConnector): Config DB connector obect. - table (str): Table name to remove the list attribute from. - key (Union[str, Tuple]): Key name in the table. - attr (str): Attribute name which represents a list that needs to be removed. - Raises: - Exception: when cfg does not satisfy YANG schema. - """ - - update_entry_validated(db, table, key, {attr: None}) - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @click.group(name="passw-hardening", diff --git a/show/plugins/sonic-passwh_yang.py b/show/plugins/sonic-passwh_yang.py index 926eaf09d0..04f56877a0 100644 --- a/show/plugins/sonic-passwh_yang.py +++ b/show/plugins/sonic-passwh_yang.py @@ -28,34 +28,6 @@ def format_attr_value(entry, attr): return entry.get(attr["name"], "N/A") -def format_group_value(entry, attrs): - """ Helper that formats grouped attribute to be presented in the table output. - - Args: - entry (Dict[str, str]): CONFIG DB entry configuration. - attrs (List[Dict]): Attributes metadata that belongs to the same group. - - Returns: - str: fomatted group attributes. - """ - - data = [] - for attr in attrs: - if entry.get(attr["name"]): - data.append((attr["name"] + ":", format_attr_value(entry, attr))) - return tabulate.tabulate(data, tablefmt="plain") - - - - - - - - - - - - @click.group(name="passw-hardening", cls=clicommon.AliasedGroup) def PASSW_HARDENING():