Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ ignore = [
"D102", # Missing docstring in public method (currently in too many places)
"FBT",
"PLR",
"PTH",
"TRY",
]
select = ["ALL"]
Expand All @@ -257,6 +256,11 @@ known-first-party = ["ansiblelint"]
"test/**/*.py" = ["S"]
"src/ansiblelint/rules/*.py" = ["S"]
"src/ansiblelint/testing/*.py" = ["S"]
# Temporary disabled until we fix them:
"src/ansiblelint/{testing,schemas,rules}/*.py" = ["PTH"]
"src/ansiblelint/{utils,file_utils,runner,loaders,constants,config,cli,_mockings,__main__}.py" = [
"PTH",
]

[tool.setuptools.dynamic]
optional-dependencies.docs = { file = [".config/requirements-docs.txt"] }
Expand Down
5 changes: 3 additions & 2 deletions src/ansiblelint/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import subprocess
import sys
from contextlib import contextmanager
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, TextIO

from ansible_compat.config import ansible_version
Expand Down Expand Up @@ -226,11 +227,11 @@ def main(argv: list[str] | None = None) -> int: # noqa: C901
for level, message in log_entries:
_logger.log(level, message)
_logger.debug("Options: %s", options)
_logger.debug(os.getcwd())
_logger.debug("CWD: %s", Path.cwd())

if not options.offline:
# pylint: disable=import-outside-toplevel
from ansiblelint.schemas import refresh_schemas
from ansiblelint.schemas.__main__ import refresh_schemas

refresh_schemas()

Expand Down
62 changes: 50 additions & 12 deletions src/ansiblelint/_mockings.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,46 @@

import contextlib
import logging
import os
import re
import sys
from typing import TYPE_CHECKING

from ansiblelint.config import options
from ansiblelint.constants import ANSIBLE_MOCKED_MODULE, RC

if TYPE_CHECKING:
from pathlib import Path

_logger = logging.getLogger(__name__)


def _make_module_stub(module_name: str) -> None:
if not options.cache_dir:
msg = "Cache directory not set"
raise RuntimeError(msg)
# a.b.c is treated a collection
if re.match(r"^(\w+|\w+\.\w+\.[\.\w]+)$", module_name):
parts = module_name.split(".")
if len(parts) < 3:
path = f"{options.cache_dir}/modules"
path = options.cache_dir / "modules"
module_file = f"{options.cache_dir}/modules/{module_name}.py"
namespace = None
collection = None
else:
namespace = parts[0]
collection = parts[1]
path = f"{ options.cache_dir }/collections/ansible_collections/{ namespace }/{ collection }/plugins/modules/{ '/'.join(parts[2:-1]) }"
path = (
options.cache_dir
/ "collections"
/ "ansible_collections"
/ namespace
/ collection
/ "plugins"
/ "modules"
/ ("/".join(parts[2:-1]))
)
module_file = f"{path}/{parts[-1]}.py"
os.makedirs(path, exist_ok=True)
path.mkdir(exist_ok=True, parents=True)
_write_module_stub(
filename=module_file,
name=module_file,
Expand Down Expand Up @@ -58,17 +73,29 @@ def _write_module_stub(
# pylint: disable=too-many-branches
def _perform_mockings() -> None:
"""Mock modules and roles."""
path: Path
if not options.cache_dir:
msg = "Cache directory not set"
raise RuntimeError(msg)
for role_name in options.mock_roles:
if re.match(r"\w+\.\w+\.\w+$", role_name):
namespace, collection, role_dir = role_name.split(".")
path = f"{options.cache_dir}/collections/ansible_collections/{ namespace }/{ collection }/roles/{ role_dir }/"
path = (
options.cache_dir
/ "collections"
/ "ansible_collections"
/ namespace
/ collection
/ "roles"
/ role_dir
)
else:
path = f"{options.cache_dir}/roles/{role_name}"
path = options.cache_dir / "roles" / role_name
# Avoid error from makedirs if destination is a broken symlink
if os.path.islink(path) and not os.path.exists(path): # pragma: no cover
if path.is_symlink() and not path.exists(): # pragma: no cover
_logger.warning("Removed broken symlink from %s", path)
os.unlink(path)
os.makedirs(path, exist_ok=True)
path.unlink(missing_ok=True)
path.mkdir(exist_ok=True, parents=True)

if options.mock_modules:
for module_name in options.mock_modules:
Expand All @@ -77,11 +104,22 @@ def _perform_mockings() -> None:

def _perform_mockings_cleanup() -> None:
"""Clean up mocked modules and roles."""
if not options.cache_dir:
msg = "Cache directory not set"
raise RuntimeError(msg)
for role_name in options.mock_roles:
if re.match(r"\w+\.\w+\.\w+$", role_name):
namespace, collection, role_dir = role_name.split(".")
path = f"{options.cache_dir}/collections/ansible_collections/{ namespace }/{ collection }/roles/{ role_dir }/"
path = (
options.cache_dir
/ "collections"
/ "ansible_collections"
/ namespace
/ collection
/ "roles"
/ role_dir
)
else:
path = f"{options.cache_dir}/roles/{role_name}"
path = options.cache_dir / "roles" / role_name
with contextlib.suppress(OSError):
os.rmdir(path)
path.unlink()
4 changes: 2 additions & 2 deletions src/ansiblelint/_vendor/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import pkgutil
import sys
import warnings
from pathlib import Path

# This package exists to host vendored top-level Python packages for downstream packaging. Any Python packages
# installed beneath this one will be masked from the Ansible loader, and available from the front of sys.path.
Expand All @@ -18,7 +18,7 @@
def _ensure_vendored_path_entry() -> None:
"""Ensure that any downstream-bundled content beneath this package is available at the top of sys.path."""
# patch our vendored dir onto sys.path
vendored_path_entry = os.path.dirname(__file__)
vendored_path_entry = str(Path(__file__).parent)
vendored_module_names = {
m[1] for m in pkgutil.iter_modules([vendored_path_entry], "")
} # m[1] == m.name
Expand Down
8 changes: 5 additions & 3 deletions src/ansiblelint/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import os
from functools import lru_cache
from pathlib import Path
from typing import TYPE_CHECKING, Any

from ansible_compat.runtime import Runtime
Expand Down Expand Up @@ -101,7 +102,7 @@ def render_matches(self, matches: list[MatchError]) -> None: # noqa: C901
if self.options.sarif_file:
sarif = formatters.SarifFormatter(self.options.cwd, True)
json = sarif.format_result(matches)
with open(self.options.sarif_file, "w", encoding="utf-8") as sarif_file:
with self.options.sarif_file.open("w", encoding="utf-8") as sarif_file:
sarif_file.write(json)

def count_results(self, matches: list[MatchError]) -> SummarizedResults:
Expand Down Expand Up @@ -177,11 +178,12 @@ def report_outcome(self, result: LintResult, mark_as_success: bool = False) -> i
matched_rules = self._get_matched_skippable_rules(result.matches)

if matched_rules and self.options.generate_ignore:
console_stderr.print(f"Writing ignore file to {IGNORE_FILE.default}")
ignore_file_path = Path(IGNORE_FILE.default)
console_stderr.print(f"Writing ignore file to {ignore_file_path}")
lines = set()
for rule in result.matches:
lines.add(f"{rule.filename} {rule.tag}\n")
with open(IGNORE_FILE.default, "w", encoding="utf-8") as ignore_file:
with ignore_file_path.open("w", encoding="utf-8") as ignore_file:
ignore_file.write(
"# This file contains ignores rule violations for ansible-lint\n",
)
Expand Down
7 changes: 6 additions & 1 deletion src/ansiblelint/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,12 @@ def get_cli_parser() -> argparse.ArgumentParser:
],
help="stdout formatting, json being an alias for codeclimate. (default: %(default)s)",
)
parser.add_argument("--sarif-file", default=None, help="SARIF output file")
parser.add_argument(
"--sarif-file",
default=None,
type=Path,
help="SARIF output file",
)
parser.add_argument(
"-q",
dest="quiet",
Expand Down
3 changes: 2 additions & 1 deletion src/ansiblelint/constants.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Constants used by AnsibleLint."""
import os.path
from enum import Enum
from pathlib import Path
from typing import Literal

DEFAULT_RULESDIR = os.path.join(os.path.dirname(__file__), "rules")
Expand Down Expand Up @@ -162,7 +163,7 @@ def main():
# reusable actions, where the mounted volume might have different owner.
#
# https://github.com/ansible/ansible-lint-action/issues/138
GIT_CMD = ["git", "-c", f"safe.directory={os.getcwd()}"]
GIT_CMD = ["git", "-c", f"safe.directory={Path.cwd()}"]

CONFIG_FILENAMES = [".ansible-lint", ".config/ansible-lint.yml"]

Expand Down
9 changes: 4 additions & 5 deletions src/ansiblelint/rules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ class RulesCollection:

def __init__(
self,
rulesdirs: list[str] | None = None,
rulesdirs: list[str] | list[Path] | None = None,
options: Options = default_options,
profile_name: str | None = None,
conditional: bool = True,
Expand All @@ -379,9 +379,8 @@ def __init__(
self.profile = []
if profile_name:
self.profile = PROFILES[profile_name]
if rulesdirs is None:
rulesdirs = []
self.rulesdirs = expand_paths_vars(rulesdirs)
rulesdirs_str = [] if rulesdirs is None else [str(r) for r in rulesdirs]
self.rulesdirs = expand_paths_vars(rulesdirs_str)
self.rules: list[BaseRule] = []
# internal rules included in order to expose them for docs as they are
# not directly loaded by our rule loader.
Expand All @@ -393,7 +392,7 @@ def __init__(
WarningRule(),
],
)
for rule in load_plugins(rulesdirs):
for rule in load_plugins(rulesdirs_str):
self.register(rule, conditional=conditional)
self.rules = sorted(self.rules)

Expand Down
3 changes: 2 additions & 1 deletion src/ansiblelint/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

if TYPE_CHECKING:
from collections.abc import Generator
from pathlib import Path

from ansiblelint.config import Options
from ansiblelint.rules import RulesCollection
Expand All @@ -40,7 +41,7 @@ class Runner:
# pylint: disable=too-many-arguments,too-many-instance-attributes
def __init__(
self,
*lintables: Lintable | str,
*lintables: Lintable | str | Path,
rules: RulesCollection,
tags: frozenset[Any] = frozenset(),
skip_list: list[str] | None = None,
Expand Down
4 changes: 0 additions & 4 deletions src/ansiblelint/schemas/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1 @@
"""Module containing cached JSON schemas."""
from ansiblelint.schemas.__main__ import refresh_schemas
from ansiblelint.schemas.main import validate_file_schema

__all__ = ("refresh_schemas", "validate_file_schema")
4 changes: 2 additions & 2 deletions src/ansiblelint/testing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ def run_role_defaults_main(self, defaults_main_text: str) -> list[MatchError]:


def run_ansible_lint(
*argv: str,
*argv: str | Path,
cwd: Path | None = None,
executable: str | None = None,
env: dict[str, str] | None = None,
offline: bool = True,
) -> CompletedProcess:
"""Run ansible-lint on a given path and returns its output."""
args = [*argv]
args = [str(item) for item in argv]
if offline: # pragma: no cover
args.insert(0, "--offline")

Expand Down
11 changes: 6 additions & 5 deletions src/ansiblelint/yaml_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import re
from collections.abc import Iterator, Sequence
from io import StringIO
from pathlib import Path
from re import Pattern
from typing import TYPE_CHECKING, Any, Callable, Union, cast

Expand Down Expand Up @@ -85,21 +86,21 @@ def load_yamllint_config() -> YamlLintConfig:
config = YamlLintConfig(content=YAMLLINT_CONFIG)
# if we detect local yamllint config we use it but raise a warning
# as this is likely to get out of sync with our internal config.
for file in [
for path in [
".yamllint",
".yamllint.yaml",
".yamllint.yml",
os.getenv("YAMLLINT_CONFIG_FILE", ""),
os.getenv("XDG_CONFIG_HOME", os.path.expanduser("~/.config"))
+ "/yamllint/config",
os.getenv("XDG_CONFIG_HOME", "~/.config") + "/yamllint/config",
]:
if os.path.isfile(file):
file = Path(path).expanduser()
if file.is_file():
_logger.debug(
"Loading custom %s config file, this extends our "
"internal yamllint config.",
file,
)
config_override = YamlLintConfig(file=file)
config_override = YamlLintConfig(file=str(file))
config_override.extend(config)
config = config_override
break
Expand Down
7 changes: 4 additions & 3 deletions test/schemas/src/rebuild.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import keyword
import sys
from pathlib import Path
from typing import Any

play_keywords = list(
Expand Down Expand Up @@ -77,7 +78,7 @@ def is_ref_used(obj: Any, ref: str) -> bool:
invalid_var_names.remove("__peg_parser__")
print("Updating invalid var names") # noqa: T201

with open("f/vars.json", "r+", encoding="utf-8") as f:
with Path("f/vars.json").open("r+", encoding="utf-8") as f:
vars_schema = json.load(f)
vars_schema["anyOf"][0]["patternProperties"] = {
f"^(?!({'|'.join(invalid_var_names)})$)[a-zA-Z_][\\w]*$": {},
Expand All @@ -88,7 +89,7 @@ def is_ref_used(obj: Any, ref: str) -> bool:
f.truncate()

print("Compiling subschemas...") # noqa: T201
with open("f/ansible.json", encoding="utf-8") as f:
with Path("f/ansible.json").open(encoding="utf-8") as f:
combined_json = json.load(f)

for subschema in ["tasks", "playbook"]:
Expand Down Expand Up @@ -134,6 +135,6 @@ def is_ref_used(obj: Any, ref: str) -> bool:
if not spare:
break

with open(f"f/{subschema}.json", "w", encoding="utf-8") as f:
with Path(f"f/{subschema}.json").open("w", encoding="utf-8") as f:
json.dump(sub_json, f, indent=2, sort_keys=True)
f.write("\n")
8 changes: 5 additions & 3 deletions test/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ def test_generate_ignore(tmp_path: Path) -> None:
lintable = Lintable(tmp_path / "vars.yaml")
lintable.content = "foo: bar\nfoo: baz\n"
lintable.write(force=True)
assert not (tmp_path / ".ansible-lint-ignore").exists()
ignore_file = tmp_path / ".ansible-lint-ignore"
assert not ignore_file.exists()
result = run_ansible_lint(lintable.filename, "--generate-ignore", cwd=tmp_path)
assert result.returncode == 2
assert (tmp_path / ".ansible-lint-ignore").exists()
with open(tmp_path / ".ansible-lint-ignore", encoding="utf-8") as f:

assert ignore_file.exists()
with ignore_file.open(encoding="utf-8") as f:
assert "vars.yaml yaml[key-duplicates]\n" in f.readlines()
# Run again and now we expect to succeed as we have an ignore file.
result = run_ansible_lint(lintable.filename, cwd=tmp_path)
Expand Down
Loading