diff --git a/dissect/target/plugins/apps/webserver/apache.py b/dissect/target/plugins/apps/webserver/apache.py index 88b929c7f2..a1948e3221 100644 --- a/dissect/target/plugins/apps/webserver/apache.py +++ b/dissect/target/plugins/apps/webserver/apache.py @@ -1,12 +1,15 @@ +from __future__ import annotations + import itertools import re from datetime import datetime +from functools import cached_property from pathlib import Path -from typing import Iterator, NamedTuple, Optional +from typing import Iterator, NamedTuple -from dissect.target import plugin from dissect.target.exceptions import FileNotFoundError, UnsupportedPluginError from dissect.target.helpers.fsutil import open_decompress +from dissect.target.plugin import OperatingSystem, export from dissect.target.plugins.apps.webserver.webserver import ( WebserverAccessLogRecord, WebserverErrorLogRecord, @@ -20,6 +23,31 @@ class LogFormat(NamedTuple): pattern: re.Pattern +# e.g. ServerRoot "/etc/httpd" +RE_CONFIG_ROOT = re.compile( + r""" + [\s#]* # Optionally prefixed by space(s) or pound sign(s). + ServerRoot + \s + "?(?P[^"\s]+)" + $ + """, + re.VERBOSE, +) + +# e.g. Include conf.modules.d/*.conf and IncludeOptional conf.d/*.conf +RE_CONFIG_INCLUDE = re.compile( + r""" + [\s#]* # Optionally prefixed by space(s) or pound sign(s). + (Include|IncludeOptional) # Directive indicating that additional config files are loaded. + \s + ?(?P[^"\s]+) + $ + """, + re.VERBOSE, +) + + # e.g. CustomLog "/custom/log/location/access.log" common RE_CONFIG_CUSTOM_LOG_DIRECTIVE = re.compile( r""" @@ -40,7 +68,7 @@ class LogFormat(NamedTuple): [\s#]* # Optionally prefixed by space(s) or pound sign(s). ErrorLog # Directive indicating that a custom error log location / format is used. \s - "?(?P[^"\s$]+)"? # Location to log to, optionally wrapped in double quotes. + "?(?P[^"\s]+)"? # Location to log to, optionally wrapped in double quotes. $ """, re.VERBOSE, @@ -112,6 +140,8 @@ class LogFormat(NamedTuple): (?P.*) # The actual log message. """ +RE_ENV_VAR_IN_STRING = re.compile(r"\$\{(?P[^\"\s$]+)\}", re.VERBOSE) + LOG_FORMAT_ACCESS_COMMON = LogFormat( "common", re.compile( @@ -187,65 +217,149 @@ class ApachePlugin(WebserverPlugin): "/etc/httpd/conf/httpd.conf", "/etc/httpd.conf", ] + DEFAULT_ENVVAR_PATHS = ["/etc/apache2/envvars", "/etc/sysconfig/httpd", "/etc/rc.conf"] def __init__(self, target: Target): super().__init__(target) - self.access_log_paths, self.error_log_paths = self.get_log_paths() + self.server_root = None + self.access_paths = set() + self.error_paths = set() + self.find_logs() def check_compatible(self) -> None: - if not len(self.access_log_paths) and not len(self.error_log_paths): - raise UnsupportedPluginError("No Apache directories found") + if not self.access_paths and not self.error_paths: + raise UnsupportedPluginError("No Apache log files found") - def get_log_paths(self) -> tuple[list[Path], list[Path]]: - """ - Discover any present Apache log paths on the target system. + if self.target.os == OperatingSystem.CITRIX: + raise UnsupportedPluginError("Use the 'apps.webserver.citrix' apache plugin instead") + + def find_logs(self) -> None: + """Discover any present Apache log paths on the target system. + Populates ``self.access_paths`` and ``self.error_paths``. References: + - https://httpd.apache.org/docs/2.4/logs.html + - https://httpd.apache.org/docs/2.4/mod/mod_log_config.html - https://www.cyberciti.biz/faq/apache-logs/ - https://unix.stackexchange.com/a/269090 """ - access_log_paths = set() - error_log_paths = set() - # Check if any well known default Apache log locations exist for log_dir, log_name in itertools.product(self.DEFAULT_LOG_DIRS, self.ACCESS_LOG_NAMES): - access_log_paths.update(self.target.fs.path(log_dir).glob(f"{log_name}*")) + self.access_paths.update(self.target.fs.path(log_dir).glob(f"*{log_name}*")) for log_dir, log_name in itertools.product(self.DEFAULT_LOG_DIRS, self.ERROR_LOG_NAMES): - error_log_paths.update(self.target.fs.path(log_dir).glob(f"{log_name}*")) + self.error_paths.update(self.target.fs.path(log_dir).glob(f"*{log_name}*")) + + seen = set() # Check default Apache configs for CustomLog or ErrorLog directives for config in self.DEFAULT_CONFIG_PATHS: - if (path := self.target.fs.path(config)).exists(): - for line in path.open("rt"): - line = line.strip() + if (path := self.target.fs.path(config)).exists() and path not in seen: + self._process_conf_file(path, seen) + + # Check all .conf files inside the server root + if self.server_root: + for path in self.server_root.rglob("*.conf"): + if path not in seen: + self._process_conf_file(path, seen) + + def _process_conf_file(self, path: Path, seen: set[Path] | None = None) -> None: + """Process an Apache ``.conf`` file for ``ServerRoot``, ``CustomLog``, ``Include`` + and ``OptionalInclude`` directives. Populates ``self.access_paths`` and ``self.error_paths``. + """ + seen = set() if seen is None else seen - if not line or ("CustomLog" not in line and "ErrorLog" not in line): - continue + if path in seen: + self.target.log.warning("Detected recursion in Apache configuration, file already parsed: %s", path) + return + + seen.add(path) - if "ErrorLog" in line: - set_to_update = error_log_paths - pattern_to_use = RE_CONFIG_ERRORLOG_DIRECTIVE - else: - set_to_update = access_log_paths - pattern_to_use = RE_CONFIG_CUSTOM_LOG_DIRECTIVE + for line in path.open("rt"): + if not (line := line.strip()): + continue + + if "ServerRoot" in line: + if not (match := RE_CONFIG_ROOT.match(line)): + self.target.log.warning("Unable to parse Apache 'ServerRoot' configuration in %s: %r", path, line) + continue + location = match.groupdict().get("location") + self.server_root = self.target.fs.path(location) + + elif "CustomLog" in line or "ErrorLog" in line: + self._process_conf_line(path, line) + + elif "Include" in line: + if not (match := RE_CONFIG_INCLUDE.match(line)): + self.target.log.warning("Unable to parse Apache 'Include' configuration in %s: %r", path, line) + continue - match = pattern_to_use.match(line) - if not match: - self.target.log.warning("Unexpected Apache log configuration: %s (%s)", line, path) + location = match.groupdict().get("location") + + if "*" in location: + root, rest = location.split("*", 1) + if root.startswith("/"): + root = self.target.fs.path(root) + elif not root.startswith("/") and self.server_root: + root = self.server_root.joinpath(root) + elif not self.server_root: + self.target.log.warning("Unable to resolve relative Include in %s: %r", path, line) continue - directive = match.groupdict() - custom_log = self.target.fs.path(directive["location"]) - set_to_update.update(path for path in custom_log.parent.glob(f"{custom_log.name}*")) + for found_conf in root.glob(f"*{rest}"): + self._process_conf_file(found_conf, seen) + + elif self.server_root and (include_path := self.server_root.joinpath(location)).exists(): + self._process_conf_file(include_path, seen) + + elif (include_path := self.target.fs.path(location)).exists(): + self._process_conf_file(include_path, seen) + + else: + self.target.log.warning("Unable to resolve Apache Include in %s: %r", path, line) + + def _process_conf_line(self, path: Path, line: str) -> None: + """Parse and resolve the given ``CustomLog`` or ``ErrorLog`` directive found in a Apache ``.conf`` file.""" + if "ErrorLog" in line: + pattern = RE_CONFIG_ERRORLOG_DIRECTIVE + set_to_update = self.error_paths + else: + pattern = RE_CONFIG_CUSTOM_LOG_DIRECTIVE + set_to_update = self.access_paths + + if not (match := pattern.match(line)): + self.target.log.warning("Unexpected Apache 'ErrorLog' or 'CustomLog' configuration in %s: %r", path, line) + return + + location = match.groupdict().get("location") + custom_log = self.target.fs.path(location) + + if env_var_match := RE_ENV_VAR_IN_STRING.match(location): + env_var = env_var_match.groupdict()["env_var"] + if (apache_log_dir := self.env_vars.get(env_var)) is None: + self.target.log.warning("%s does not exist, cannot resolve '%s' in %s", env_var, custom_log, path) + return + + custom_log = self.target.fs.path(location.replace(f"${{{env_var}}}", apache_log_dir.replace("$SUFFIX", ""))) + + set_to_update.update(path for path in custom_log.parent.glob(f"*{custom_log.name}*")) - return sorted(access_log_paths), sorted(error_log_paths) + @cached_property + def env_vars(self) -> dict[str, str]: + variables = {} + for envvar_file in self.DEFAULT_ENVVAR_PATHS: + if (file := self.target.fs.path(envvar_file)).exists(): + for line in file.read_text().splitlines(): + key, _, value = line.strip().partition("=") + if key: + variables[key.replace("export ", "")] = value.strip("\"'") + return variables - @plugin.export(record=WebserverAccessLogRecord) + @export(record=WebserverAccessLogRecord) def access(self) -> Iterator[WebserverAccessLogRecord]: """Return contents of Apache access log files in unified ``WebserverAccessLogRecord`` format.""" - for line, path in self._iterate_log_lines(self.access_log_paths): + for line, path in self._iterate_log_lines(self.access_paths): try: logformat = self.infer_access_log_format(line) if not logformat: @@ -285,10 +399,10 @@ def access(self) -> Iterator[WebserverAccessLogRecord]: self.target.log.warning("An error occured parsing Apache log file %s: %s", path, str(e)) self.target.log.debug("", exc_info=e) - @plugin.export(record=WebserverErrorLogRecord) + @export(record=WebserverErrorLogRecord) def error(self) -> Iterator[WebserverErrorLogRecord]: """Return contents of Apache error log files in unified ``WebserverErrorLogRecord`` format.""" - for line, path in self._iterate_log_lines(self.error_log_paths): + for line, path in self._iterate_log_lines(self.error_paths): try: match = LOG_FORMAT_ERROR_COMMON.pattern.match(line) if not match: @@ -343,7 +457,7 @@ def _iterate_log_lines(self, paths: list[Path]) -> Iterator[tuple[str, Path]]: self.target.log.warning("Apache log file configured but could not be found (dead symlink?): %s", path) @staticmethod - def infer_access_log_format(line: str) -> Optional[LogFormat]: + def infer_access_log_format(line: str) -> LogFormat | None: """Attempt to infer what standard LogFormat is used. Returns None if no known format can be inferred. Three default log type examples from Apache (note that the ipv4 could also be ipv6) diff --git a/dissect/target/plugins/apps/webserver/citrix.py b/dissect/target/plugins/apps/webserver/citrix.py index 328a0f2a1e..55f27b6570 100644 --- a/dissect/target/plugins/apps/webserver/citrix.py +++ b/dissect/target/plugins/apps/webserver/citrix.py @@ -31,21 +31,21 @@ "combined_resptime_with_citrix_hdrs", re.compile( rf""" - (?P.*?) # Client IP address of the request. + (?P.*?) # Client IP address of the request. \s -> \s - (?P.*?) # Local IP of the Netscaler. + (?P.*?) # Local IP of the Netscaler. \s - (?P.*?) # Remote logname (from identd, if supplied). + (?P.*?) # Remote logname (from identd, if supplied). \s - (?P.*?) # Remote user if the request was authenticated. + (?P.*?) # Remote user if the request was authenticated. \s - {RE_ACCESS_COMMON_PATTERN} # Timestamp, pid, method, uri, protocol, status code, bytes_sent + {RE_ACCESS_COMMON_PATTERN} # Timestamp, pid, method, uri, protocol, status code, bytes_sent \s - {RE_REFERER_USER_AGENT_PATTERN} # Referer, user_agent + {RE_REFERER_USER_AGENT_PATTERN} # Referer, user_agent \s - {RE_RESPONSE_TIME_PATTERN} # Response time + {RE_RESPONSE_TIME_PATTERN} # Response time """, re.VERBOSE, ), diff --git a/tests/conftest.py b/tests/conftest.py index ce95a1a9d8..0fab347419 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -491,3 +491,25 @@ def target_linux_docker(tmp_path: pathlib.Path, fs_docker: TarFilesystem) -> Ite mock_target.fs.mount("/", fs_docker) mock_target.apply() yield mock_target + + +class TargetUnixFactory: + def __init__(self, tmp_path: pathlib.Path): + self.tmp_path = tmp_path + + def new(self, hostname: str = "hostname") -> tuple[Target, VirtualFilesystem]: + """Initialize a virtual unix target.""" + fs = VirtualFilesystem() + + fs.makedirs("var") + fs.makedirs("etc") + fs.map_file_fh("/etc/hostname", BytesIO(hostname.encode())) + + return make_os_target(self.tmp_path, UnixPlugin, root_fs=fs), fs + + +@pytest.fixture +def target_unix_factory(tmp_path: pathlib.Path) -> TargetUnixFactory: + """This fixture returns a class that can instantiate a virtual unix targets from a blueprint. This can then be used + to create a fixture for the source target and the desination target, without them 'bleeding' into each other.""" + return TargetUnixFactory(tmp_path) diff --git a/tests/plugins/apps/webserver/test_apache.py b/tests/plugins/apps/webserver/test_apache.py index 825c2c5399..4a4a2ea214 100644 --- a/tests/plugins/apps/webserver/test_apache.py +++ b/tests/plugins/apps/webserver/test_apache.py @@ -158,23 +158,30 @@ def test_logrotate(target_unix: Target, fs_unix: VirtualFilesystem) -> None: fs_unix.map_file("var/log/apache2/access.log.2", data_file) fs_unix.map_file("var/log/apache2/access.log.3", data_file) - access_log_paths, error_log_paths = ApachePlugin(target_unix).get_log_paths() + target_unix.add_plugin(ApachePlugin) - assert len(access_log_paths) == 4 - assert len(error_log_paths) == 0 + assert len(target_unix.apache.access_paths) == 4 + assert len(target_unix.apache.error_paths) == 0 def test_custom_config(target_unix: Target, fs_unix: VirtualFilesystem) -> None: - fs_unix.map_file_fh("etc/apache2/apache2.conf", BytesIO(b'CustomLog "/custom/log/location/access.log" common')) - fs_unix.map_file_fh("custom/log/location/access.log", BytesIO(b"Foo")) - fs_unix.map_file_fh("custom/log/location/access.log.1", BytesIO(b"Foo1")) - fs_unix.map_file_fh("custom/log/location/access.log.2", BytesIO(b"Foo2")) - fs_unix.map_file_fh("custom/log/location/access.log.3", BytesIO(b"Foo3")) + fs_unix.map_file_fh( + "/etc/apache2/apache2.conf", BytesIO(b'CustomLog "/very/custom/log/location/access.log" common') + ) + fs_unix.map_file_fh("/very/custom/log/location/access.log", BytesIO(b"Foo")) + fs_unix.map_file_fh("/very/custom/log/location/access.log.1", BytesIO(b"Foo1")) + fs_unix.map_file_fh("/very/custom/log/location/access.log.2", BytesIO(b"Foo2")) + fs_unix.map_file_fh("/very/custom/log/location/access.log.3", BytesIO(b"Foo3")) - access_log_paths, error_log_paths = ApachePlugin(target_unix).get_log_paths() + target_unix.add_plugin(ApachePlugin) - assert len(access_log_paths) == 4 - assert len(error_log_paths) == 0 + assert sorted(list(map(str, target_unix.apache.access_paths))) == [ + "/very/custom/log/location/access.log", + "/very/custom/log/location/access.log.1", + "/very/custom/log/location/access.log.2", + "/very/custom/log/location/access.log.3", + ] + assert len(target_unix.apache.error_paths) == 0 def test_config_commented_logs(target_unix: Target, fs_unix: VirtualFilesystem) -> None: @@ -183,22 +190,125 @@ def test_config_commented_logs(target_unix: Target, fs_unix: VirtualFilesystem) CustomLog "/custom/log/location/new.log" common # ErrorLog "/custom/log/location//old_error.log" ErrorLog "/custom/log/location//new_error.log" - """ + fs_unix.map_file_fh("etc/httpd/conf/httpd.conf", BytesIO(textwrap.dedent(config).encode())) fs_unix.map_file_fh("custom/log/location/new.log", BytesIO(b"New")) fs_unix.map_file_fh("custom/log/location/old.log", BytesIO(b"Old")) fs_unix.map_file_fh("custom/log/location/old_error.log", BytesIO(b"Old")) fs_unix.map_file_fh("custom/log/location/new_error.log", BytesIO(b"New")) - access_log_paths, error_log_paths = ApachePlugin(target_unix).get_log_paths() + target_unix.add_plugin(ApachePlugin) + + assert sorted(list(map(str, target_unix.apache.access_paths))) == [ + "/custom/log/location/new.log", + "/custom/log/location/old.log", + ] + + assert sorted(list(map(str, target_unix.apache.error_paths))) == [ + "/custom/log/location/new_error.log", + "/custom/log/location/old_error.log", + ] + + +def test_config_vhosts_httpd(target_unix: Target, fs_unix: VirtualFilesystem) -> None: + """test if we detect httpd CustomLog and ErrorLog directives using IncludeOptional configuration.""" + config = """ + ServerRoot "/etc/httpd" + IncludeOptional conf/vhosts/*/*.conf + """ + + # '/etc/httpd/conf/vhosts/*/*.conf' + vhost_config_1 = """ + CustomLog "/custom/log/location/vhost_1.log" common + ErrorLog "/custom/log/location/vhost_1.error" + """ + vhost_config_2 = """ + CustomLog "/custom/log/location/vhost_2.log" combined + ErrorLog "/custom/log/location/vhost_2.error" + """ + fs_unix.map_file_fh("etc/httpd/conf/httpd.conf", BytesIO(textwrap.dedent(config).encode())) + fs_unix.map_file_fh("etc/httpd/conf/vhosts/host1/host1.conf", BytesIO(textwrap.dedent(vhost_config_1).encode())) + fs_unix.map_file_fh("etc/httpd/conf/vhosts/host2/host2.conf", BytesIO(textwrap.dedent(vhost_config_2).encode())) + fs_unix.map_file_fh("custom/log/location/vhost_1.log", BytesIO(b"Log 1")) + fs_unix.map_file_fh("custom/log/location/vhost_2.log", BytesIO(b"Log 2")) + fs_unix.map_file_fh("custom/log/location/vhost_1.error", BytesIO(b"Err 1")) + fs_unix.map_file_fh("custom/log/location/vhost_2.error", BytesIO(b"Err 2")) + + target_unix.add_plugin(ApachePlugin) + + assert sorted(list(map(str, target_unix.apache.access_paths))) == [ + "/custom/log/location/vhost_1.log", + "/custom/log/location/vhost_2.log", + ] + assert sorted(list(map(str, target_unix.apache.error_paths))) == [ + "/custom/log/location/vhost_1.error", + "/custom/log/location/vhost_2.error", + ] + + +def test_config_vhosts_apache2(target_unix: Target, fs_unix: VirtualFilesystem) -> None: + """test if we detect apache2 CustomLog and ErrorLog directives using IncludeOptional configuration.""" + config = r""" + ServerRoot "/etc/apache2" + ErrorLog ${APACHE_LOG_DIR}/error.log + Include example.conf + IncludeOptional conf-enabled/*.conf + IncludeOptional sites-enabled/*.conf + """ + fs_unix.map_file_fh("/etc/apache2/apache2.conf", BytesIO(textwrap.dedent(config).encode())) + fs_unix.map_file_fh("/path/to/apache/logs/error.log", BytesIO(b"")) + + envvars = r""" + export APACHE_LOG_DIR="/path/to/apache/logs" + """ + fs_unix.map_file_fh("/etc/apache2/envvars", BytesIO(textwrap.dedent(envvars).encode())) - # Log paths are returned in alphabetical order - assert str(access_log_paths[0]) == "/custom/log/location/new.log" - assert str(access_log_paths[1]) == "/custom/log/location/old.log" + enabled_conf = r""" + CustomLog ${APACHE_LOG_DIR}/example_access.log custom + """ + fs_unix.map_file_fh("/etc/apache2/conf-enabled/example.conf", BytesIO(textwrap.dedent(enabled_conf).encode())) + fs_unix.map_file_fh("/path/to/apache/logs/example_access.log.1.gz", BytesIO(b"")) + + site_conf = """ + + ServerName example.com + DocumentRoot /var/www/html + ErrorLog /path/to/virtualhost/log/error.log + CustomLog /path/to/virtualhost/log/access.log custom + + """ + fs_unix.map_file_fh("/etc/apache2/sites-enabled/example.conf", BytesIO(textwrap.dedent(site_conf).encode())) + fs_unix.map_file_fh("/path/to/virtualhost/log/error.log.1", BytesIO(b"")) + fs_unix.map_file_fh("/path/to/virtualhost/log/access.log.1", BytesIO(b"")) + + disabled_conf = """ + CustomLog /path/to/disabled/access.log custom + """ + fs_unix.map_file_fh( + "/etc/apache2/sites-available/disabled-site.conf", BytesIO(textwrap.dedent(disabled_conf).encode()) + ) + fs_unix.map_file_fh("/path/to/disabled/access.log.2", BytesIO(b"")) + + fs_unix.map_file_fh("/var/log/apache2/some-other-vhost-old-log.access.log", BytesIO(b"")) + fs_unix.map_file_fh("/var/log/apache2/access.log", BytesIO(b"")) + fs_unix.map_file_fh("/var/log/apache2/error.log", BytesIO(b"")) + + target_unix.add_plugin(ApachePlugin) - assert str(error_log_paths[0]) == "/custom/log/location/new_error.log" - assert str(error_log_paths[1]) == "/custom/log/location/old_error.log" + assert sorted(list(map(str, target_unix.apache.access_paths))) == [ + "/path/to/apache/logs/example_access.log.1.gz", + "/path/to/disabled/access.log.2", + "/path/to/virtualhost/log/access.log.1", + "/var/log/apache2/access.log", + "/var/log/apache2/some-other-vhost-old-log.access.log", + ] + + assert sorted(list(map(str, target_unix.apache.error_paths))) == [ + "/path/to/apache/logs/error.log", + "/path/to/virtualhost/log/error.log.1", + "/var/log/apache2/error.log", + ] def test_error_txt(target_unix: Target, fs_unix: VirtualFilesystem) -> None: diff --git a/tests/plugins/apps/webserver/test_citrix.py b/tests/plugins/apps/webserver/test_citrix.py index 32f8567421..12bf792203 100644 --- a/tests/plugins/apps/webserver/test_citrix.py +++ b/tests/plugins/apps/webserver/test_citrix.py @@ -1,6 +1,9 @@ from datetime import datetime, timedelta, timezone from io import BytesIO +import pytest + +from dissect.target.exceptions import UnsupportedPluginError from dissect.target.filesystem import VirtualFilesystem from dissect.target.plugins.apps.webserver.apache import ApachePlugin from dissect.target.plugins.apps.webserver.citrix import ( @@ -75,16 +78,18 @@ def test_error_logs(target_citrix: Target, fs_bsd: VirtualFilesystem) -> None: fs_bsd.map_file("var/log/httperror-vpn.log", BytesIO(b"Foo")) fs_bsd.map_file("var/log/httperror.log", BytesIO(b"Bar")) - access_log_paths, error_log_paths = CitrixWebserverPlugin(target_citrix).get_log_paths() + target_citrix.add_plugin(CitrixWebserverPlugin) - assert len(error_log_paths) == 2 + assert len(target_citrix.citrix.error_paths) == 2 def test_access_logs_webserver_namespace(target_citrix: Target, fs_bsd: VirtualFilesystem) -> None: data_file = absolute_path("_data/plugins/apps/webserver/citrix/httpaccess.log") fs_bsd.map_file("var/log/httpaccess.log", data_file) - target_citrix.add_plugin(ApachePlugin, check_compatible=False) + with pytest.raises(UnsupportedPluginError, match="Use the 'apps.webserver.citrix' apache plugin instead"): + target_citrix.add_plugin(ApachePlugin) + target_citrix.add_plugin(CitrixWebserverPlugin) target_citrix.add_plugin(WebserverPlugin) diff --git a/tests/tools/test_diff.py b/tests/tools/test_diff.py index 329014271f..9169c01558 100644 --- a/tests/tools/test_diff.py +++ b/tests/tools/test_diff.py @@ -2,14 +2,11 @@ import textwrap from io import BytesIO, StringIO -from pathlib import Path from typing import Iterator import pytest -from dissect.target.filesystem import VirtualFilesystem from dissect.target.helpers.fsutil import stat_result -from dissect.target.plugins.os.unix._os import UnixPlugin from dissect.target.target import Target from dissect.target.tools import fsutils from dissect.target.tools.diff import ( @@ -21,7 +18,7 @@ ) from dissect.target.tools.diff import main as target_diff from tests._utils import absolute_path -from tests.conftest import make_os_target +from tests.conftest import TargetUnixFactory PASSWD_CONTENTS = """ root:x:0:0:root:/root:/bin/bash @@ -29,28 +26,6 @@ """ -class TargetUnixFactory: - def __init__(self, tmp_path: Path): - self.tmp_path = tmp_path - - def new(self, hostname: str) -> tuple[Target, VirtualFilesystem]: - """Initialize a virtual unix target.""" - fs = VirtualFilesystem() - - fs.makedirs("var") - fs.makedirs("etc") - fs.map_file_fh("/etc/hostname", BytesIO(hostname.encode())) - - return make_os_target(self.tmp_path, UnixPlugin, root_fs=fs), fs - - -@pytest.fixture -def target_unix_factory(tmp_path: Path) -> TargetUnixFactory: - """This fixture returns a class that can instantiate a virtual unix targets from a blueprint. This can then be used - to create a fixture for the source target and the desination target, without them 'bleeding' into each other.""" - return TargetUnixFactory(tmp_path) - - @pytest.fixture def src_target(target_unix_factory: TargetUnixFactory) -> Iterator[Target]: target, fs_unix = target_unix_factory.new("src_target")