diff --git a/dissect/target/helpers/network_managers.py b/dissect/target/helpers/network_managers.py index 34d4120c2d..9f776a6734 100644 --- a/dissect/target/helpers/network_managers.py +++ b/dissect/target/helpers/network_managers.py @@ -5,6 +5,7 @@ from collections import defaultdict from configparser import ConfigParser, MissingSectionHeaderError from io import StringIO +from itertools import chain from re import compile, sub from typing import Any, Callable, Iterable, Match, Optional @@ -299,7 +300,8 @@ def _get_option(self, config: dict, option: str, section: Optional[str] = None) return if section: - config = config.get(section, {}) + # account for values of sections which are None + config = config.get(section, {}) or {} for key, value in config.items(): if key == option: @@ -508,7 +510,7 @@ def get_config_value(self, attr: str) -> list[set]: def parse_unix_dhcp_log_messages(target) -> list[str]: - """Parse local syslog and cloud init log files for DHCP lease IPs. + """Parse local syslog, journal and cloud init-log files for DHCP lease IPs. Args: target: Target to discover and obtain network information from. @@ -516,53 +518,68 @@ def parse_unix_dhcp_log_messages(target) -> list[str]: Returns: List of DHCP ip addresses. """ - ips = [] - - # Search through parsed syslogs for DHCP leases. - try: - messages = target.messages() - for record in messages: - line = record.message - - # Ubuntu DHCP - if ("DHCPv4" in line or "DHCPv6" in line) and " address " in line and " via " in line: - ip = line.split(" address ")[1].split(" via ")[0].strip().split("/")[0] - if ip not in ips: - ips.append(ip) - - # Ubuntu DHCP NetworkManager - elif "option ip_address" in line and ("dhcp4" in line or "dhcp6" in line) and "=> '" in line: - ip = line.split("=> '")[1].replace("'", "").strip() - if ip not in ips: - ips.append(ip) - - # Debian and CentOS dhclient - elif record.daemon == "dhclient" and "bound to" in line: - ip = line.split("bound to")[1].split(" ")[1].strip() - if ip not in ips: - ips.append(ip) - - # CentOS DHCP and general NetworkManager - elif " address " in line and ("dhcp4" in line or "dhcp6" in line): - ip = line.split(" address ")[1].strip() - if ip not in ips: - ips.append(ip) - - except PluginError: - target.log.debug("Can not search for DHCP leases in syslog files as they does not exist.") - - # A unix system might be provisioned using Ubuntu's cloud-init (https://cloud-init.io/). - if (path := target.fs.path("/var/log/cloud-init.log")).exists(): - for line in path.open("rt"): - # We are interested in the following log line: - # YYYY-MM-DD HH:MM:SS,000 - dhcp.py[DEBUG]: Received dhcp lease on IFACE for IP/MASK - if "Received dhcp lease on" in line: - interface, ip, netmask = re.search(r"Received dhcp lease on (\w{0,}) for (\S+)\/(\S+)", line).groups() - if ip not in ips: - ips.append(ip) - - if not path and not messages: - target.log.warning("Can not search for DHCP leases in syslog or cloud-init.log files as they does not exist.") + ips = set() + messages = set() + + for log_func in ["messages", "journal"]: + try: + messages = chain(messages, getattr(target, log_func)()) + except PluginError: + target.log.debug(f"Could not search for DHCP leases in {log_func} files.") + + if not messages: + target.log.warning(f"Could not search for DHCP leases using {log_func}: No log entries found.") + + for record in messages: + line = record.message + + # Ubuntu cloud-init + if "Received dhcp lease on" in line: + interface, ip, netmask = re.search(r"Received dhcp lease on (\w{0,}) for (\S+)\/(\S+)", line).groups() + ips.add(ip) + continue + + # Ubuntu DHCP + if ("DHCPv4" in line or "DHCPv6" in line) and " address " in line and " via " in line: + ip = line.split(" address ")[1].split(" via ")[0].strip().split("/")[0] + ips.add(ip) + continue + + # Ubuntu DHCP NetworkManager + if "option ip_address" in line and ("dhcp4" in line or "dhcp6" in line) and "=> '" in line: + ip = line.split("=> '")[1].replace("'", "").strip() + ips.add(ip) + continue + + # Debian and CentOS dhclient + if hasattr(record, "daemon") and record.daemon == "dhclient" and "bound to" in line: + ip = line.split("bound to")[1].split(" ")[1].strip() + ips.add(ip) + continue + + # CentOS DHCP and general NetworkManager + if " address " in line and ("dhcp4" in line or "dhcp6" in line): + ip = line.split(" address ")[1].strip() + ips.add(ip) + continue + + # Ubuntu/Debian DHCP networkd (Journal) + if ( + hasattr(record, "code_func") + and record.code_func == "dhcp_lease_acquired" + and " address " in line + and " via " in line + ): + interface, ip, netmask, gateway = re.search( + r"^(\S+): DHCPv[4|6] address (\S+)\/(\S+) via (\S+)", line + ).groups() + ips.add(ip) + continue + + # Journals and syslogs can be large and slow to iterate, + # so we stop if we have some results and have reached the journal plugin. + if len(ips) >= 2 and record._desc.name == "linux/log/journal": + break return ips diff --git a/dissect/target/plugins/os/unix/log/messages.py b/dissect/target/plugins/os/unix/log/messages.py index ab1e37b468..d739145ddc 100644 --- a/dissect/target/plugins/os/unix/log/messages.py +++ b/dissect/target/plugins/os/unix/log/messages.py @@ -1,7 +1,8 @@ import re -from itertools import chain +from pathlib import Path from typing import Iterator +from dissect.target import Target from dissect.target.exceptions import UnsupportedPluginError from dissect.target.helpers.record import TargetRecordDescriptor from dissect.target.helpers.utils import year_rollover_helper @@ -23,17 +24,28 @@ RE_DAEMON = re.compile(r"^[^:]+:\d+:\d+[^\[\]:]+\s([^\[:]+)[\[|:]{1}") RE_PID = re.compile(r"\w\[(\d+)\]") RE_MSG = re.compile(r"[^:]+:\d+:\d+[^:]+:\s(.*)$") +RE_CLOUD_INIT_LINE = re.compile(r"(?P.*) - (?P.*)\[(?P\w+)\]\: (?P.*)$") class MessagesPlugin(Plugin): + def __init__(self, target: Target): + super().__init__(target) + self.log_files = set(self._find_log_files()) + + def _find_log_files(self) -> Iterator[Path]: + log_dirs = ["/var/log/", "/var/log/installer/"] + file_globs = ["syslog*", "messages*", "cloud-init.log*"] + for log_dir in log_dirs: + for glob in file_globs: + yield from self.target.fs.path(log_dir).glob(glob) + def check_compatible(self) -> None: - var_log = self.target.fs.path("/var/log") - if not any(var_log.glob("syslog*")) and not any(var_log.glob("messages*")): - raise UnsupportedPluginError("No message files found") + if not self.log_files: + raise UnsupportedPluginError("No log files found") @export(record=MessagesRecord) def syslog(self) -> Iterator[MessagesRecord]: - """Return contents of /var/log/messages* and /var/log/syslog*. + """Return contents of /var/log/messages*, /var/log/syslog* and cloud-init logs. See ``messages`` for more information. """ @@ -41,7 +53,7 @@ def syslog(self) -> Iterator[MessagesRecord]: @export(record=MessagesRecord) def messages(self) -> Iterator[MessagesRecord]: - """Return contents of /var/log/messages* and /var/log/syslog*. + """Return contents of /var/log/messages*, /var/log/syslog* and cloud-init logs. Note: due to year rollover detection, the contents of the files are returned in reverse. @@ -52,12 +64,16 @@ def messages(self) -> Iterator[MessagesRecord]: References: - https://geek-university.com/linux/var-log-messages-file/ - https://www.geeksforgeeks.org/file-timestamps-mtime-ctime-and-atime-in-linux/ + - https://cloudinit.readthedocs.io/en/latest/development/logging.html#logging-command-output """ tzinfo = self.target.datetime.tzinfo - var_log = self.target.fs.path("/var/log") - for log_file in chain(var_log.glob("syslog*"), var_log.glob("messages*")): + for log_file in self.log_files: + if "cloud-init" in log_file.name: + yield from self._parse_cloud_init_log(log_file) + continue + for ts, line in year_rollover_helper(log_file, RE_TS, DEFAULT_TS_LOG_FORMAT, tzinfo): daemon = dict(enumerate(RE_DAEMON.findall(line))).get(0) pid = dict(enumerate(RE_PID.findall(line))).get(0) @@ -71,3 +87,32 @@ def messages(self) -> Iterator[MessagesRecord]: source=log_file, _target=self.target, ) + + def _parse_cloud_init_log(self, log_file: Path) -> Iterator[MessagesRecord]: + """Parse a cloud-init.log file. + + Lines are structured in the following format: + ``YYYY-MM-DD HH:MM:SS,000 - dhcp.py[DEBUG]: Received dhcp lease on IFACE for IP/MASK`` + + NOTE: ``cloud-init-output.log`` files are not supported as they do not contain structured logs. + + Args: + ``log_file``: path to cloud-init.log file. + + Returns: ``MessagesRecord`` + """ + for line in log_file.open("rt").readlines(): + if line := line.strip(): + if match := RE_CLOUD_INIT_LINE.match(line): + match = match.groupdict() + yield MessagesRecord( + ts=match["ts"].split(",")[0], + daemon=match["daemon"], + pid=None, + message=match["message"], + source=log_file, + _target=self.target, + ) + else: + self.target.log.warning("Could not match cloud-init log line") + self.target.log.debug("No match for line '%s'", line) diff --git a/tests/plugins/os/unix/log/test_messages.py b/tests/plugins/os/unix/log/test_messages.py index cce0dfb612..810d1fd300 100644 --- a/tests/plugins/os/unix/log/test_messages.py +++ b/tests/plugins/os/unix/log/test_messages.py @@ -8,16 +8,17 @@ from flow.record.fieldtypes import datetime as dt from dissect.target import Target +from dissect.target.filesystem import VirtualFilesystem from dissect.target.filesystems.tar import TarFilesystem from dissect.target.plugins.general import default from dissect.target.plugins.os.unix.log.messages import MessagesPlugin, MessagesRecord from tests._utils import absolute_path -def test_unix_log_messages_plugin(target_unix_users, fs_unix): +def test_unix_log_messages_plugin(target_unix_users: Target, fs_unix: VirtualFilesystem) -> None: fs_unix.map_file_fh("/etc/timezone", BytesIO(b"Europe/Amsterdam")) - data_file = absolute_path("_data/plugins/os/unix/log//messages/messages") + data_file = absolute_path("_data/plugins/os/unix/log/messages/messages") fs_unix.map_file("var/log/messages", data_file) entry = fs_unix.get("var/log/messages") @@ -46,7 +47,7 @@ def test_unix_log_messages_plugin(target_unix_users, fs_unix): assert isinstance(syslogs[0], type(MessagesRecord())) -def test_unix_log_messages_compressed_timezone_year_rollover(): +def test_unix_log_messages_compressed_timezone_year_rollover() -> None: target = Target() bio = BytesIO() @@ -86,7 +87,7 @@ def test_unix_log_messages_compressed_timezone_year_rollover(): assert results[1].ts == dt(2021, 1, 1, 13, 37, 0, tzinfo=ZoneInfo("America/Chicago")) -def test_unix_log_messages_malformed_log_year_rollover(target_unix_users, fs_unix): +def test_unix_log_messages_malformed_log_year_rollover(target_unix_users: Target, fs_unix: VirtualFilesystem) -> None: fs_unix.map_file_fh("/etc/timezone", BytesIO(b"Europe/Amsterdam")) messages = BytesIO( @@ -105,3 +106,21 @@ def test_unix_log_messages_malformed_log_year_rollover(target_unix_users, fs_uni results = list(target_unix_users.messages()) assert len(results) == 2 + + +def test_unix_messages_cloud_init(target_unix_users: Target, fs_unix: VirtualFilesystem) -> None: + messages = """ + 2005-08-09 11:55:21,000 - foo.py[DEBUG]: This is a cloud-init message! + 2005-08-09 11:55:21,001 - util.py[DEBUG]: Cloud-init v. 1.2.3-4ubuntu5 running 'init-local' at Tue, 9 Aug 2005 11:55:21 +0000. Up 13.37 seconds. + """ # noqa: E501 + fs_unix.map_file_fh("/var/log/installer/cloud-init.log", BytesIO(textwrap.dedent(messages).encode())) + + target_unix_users.add_plugin(MessagesPlugin) + + results = list(target_unix_users.messages()) + assert len(results) == 2 + assert results[0].ts == dt(2005, 8, 9, 11, 55, 21) + assert results[0].daemon == "foo.py" + assert results[0].pid is None + assert results[0].message == "This is a cloud-init message!" + assert results[0].source == "/var/log/installer/cloud-init.log" diff --git a/tests/plugins/os/unix/test_ips.py b/tests/plugins/os/unix/test_ips.py index 16bc0c0e4f..4c85ea0329 100644 --- a/tests/plugins/os/unix/test_ips.py +++ b/tests/plugins/os/unix/test_ips.py @@ -1,12 +1,16 @@ import textwrap from io import BytesIO +import pytest + +from dissect.target import Target +from dissect.target.filesystem import VirtualFilesystem from dissect.target.helpers.network_managers import NetworkManager from dissect.target.plugins.os.unix.linux._os import LinuxPlugin from tests._utils import absolute_path -def test_ips_dhcp_plugin(target_unix_users, fs_unix): +def test_ips_dhcp(target_unix_users: Target, fs_unix: VirtualFilesystem) -> None: """Test DHCP lease messages from /var/log/syslog.""" messages = """ @@ -26,10 +30,11 @@ def test_ips_dhcp_plugin(target_unix_users, fs_unix): target_unix_users.add_plugin(LinuxPlugin) results = target_unix_users.ips results.reverse() - assert results == ["10.13.37.1", "10.13.37.2", "10.13.37.3", "10.13.37.4", "2001:db8::"] + assert len(results) == 5 + assert sorted(results) == ["10.13.37.1", "10.13.37.2", "10.13.37.3", "10.13.37.4", "2001:db8::"] -def test_ips_cloud_init_plugin(target_unix_users, fs_unix): +def test_ips_cloud_init(target_unix_users: Target, fs_unix: VirtualFilesystem) -> None: """Test cloud-init dhcp.py lease messages.""" messages = """ @@ -43,10 +48,12 @@ def test_ips_cloud_init_plugin(target_unix_users, fs_unix): target_unix_users.add_plugin(LinuxPlugin) results = target_unix_users.ips + + assert len(results) == 1 assert results == ["10.13.37.5"] -def test_ips_static_plugin(target_unix_users, fs_unix): +def test_ips_static(target_unix_users: Target, fs_unix: VirtualFilesystem) -> None: """Test statically defined ipv4 and ipv6 addresses in /etc/network/interfaces.""" fs_unix.map_file("/etc/network/interfaces", absolute_path("_data/plugins/os/unix/_os/ips/interfaces")) @@ -56,7 +63,7 @@ def test_ips_static_plugin(target_unix_users, fs_unix): assert sorted(results) == sorted(["10.13.37.6", "2001:db8:ffff:ffff:ffff:ffff:ffff:ffff"]) -def test_ips_wicked_static_plugin(target_unix_users, fs_unix): +def test_ips_wicked_static(target_unix_users: Target, fs_unix: VirtualFilesystem) -> None: """Test statically defined ipv4 addresses in /etc/wicked/ifconfig/.""" fs_unix.map_file("/etc/wicked/ifconfig/eth0.xml", absolute_path("_data/plugins/os/unix/_os/ips/eth0.xml")) @@ -66,7 +73,7 @@ def test_ips_wicked_static_plugin(target_unix_users, fs_unix): assert sorted(results) == sorted(["10.13.37.2", "2001:db8:ffff:ffff:ffff:ffff:ffff:fffe"]) -def test_dns_static_plugin(target_unix_users, fs_unix): +def test_dns_static(target_unix_users: Target, fs_unix: VirtualFilesystem) -> None: """Test statically defined ipv4 and ipv6 dns-nameservers in /etc/network/interfaces.""" fs_unix.map_file("/etc/network/interfaces", absolute_path("_data/plugins/os/unix/_os/ips/interfaces")) @@ -76,25 +83,77 @@ def test_dns_static_plugin(target_unix_users, fs_unix): assert results == [{"10.13.37.1", "10.13.37.2", "2001:db8::", "2001:db9::"}] -def test_clean_ips(): +def test_ips_netplan_static(target_unix_users: Target, fs_unix: VirtualFilesystem) -> None: + """Test statically defined ipv4 and ipv6 ip addresses in /etc/netplan/*.yaml""" + + config = """ + # This file describes the network interfaces available on your system + # For more information, see netplan(5). + network: + version: 2 + renderer: networkd + ethernets: + enp0s3: + dhcp4: no + addresses: [192.168.1.123/24] + gateway4: 192.168.1.1 + nameservers: + addresses: [1.2.3.4, 5.6.7.8] + """ + + fs_unix.map_file_fh("/etc/netplan/01-netcfg.yaml", BytesIO(textwrap.dedent(config).encode())) + target_unix_users.add_plugin(LinuxPlugin) + assert target_unix_users.ips == ["192.168.1.123"] + + +@pytest.mark.parametrize( + "config, expected_output", + [ + ("", []), + ("network:", []), + ("network:\n ethernets:\n", []), + ("network:\n ethernets:\n eth0:\n", []), + ("network:\n ethernets:\n eth0:\n addresses: []\n", []), + ("network:\n ethernets:\n eth0:\n addresses: [1.2.3.4/24]\n", ["1.2.3.4"]), + ("network:\n ethernets:\n eth0:\n addresses: ['1.2.3.4']\n", ["1.2.3.4"]), + ], +) +def test_ips_netplan_static_invalid( + target_unix_users: Target, fs_unix: VirtualFilesystem, config: str, expected_output: list +) -> None: + fs_unix.map_file_fh("/etc/netplan/02-netcfg.yaml", BytesIO(textwrap.dedent(config).encode())) + target_unix_users.add_plugin(LinuxPlugin) + assert target_unix_users.ips == expected_output + + +def test_ips_netplan_static_empty_regression(target_unix_users: Target, fs_unix: VirtualFilesystem) -> None: + fs_unix.map_file_fh("/etc/netplan/01-netcfg.yaml", BytesIO(b"")) + target_unix_users.add_plugin(LinuxPlugin) + assert target_unix_users.ips == [] + + +@pytest.mark.parametrize( + "input, expected_output", + [ + # 'invalid' or input that should be filtered + ("0.0.0.0", set()), + ("127.0.0.1", set()), + ("127.0.0.1/8", set()), + ("0.0.0.0/24", set()), + ("::1", set()), + ("::", set()), + ("0:0:0:0:0:0:0:1", set()), + # valid input + ("::ffff:192.0.2.128", {"::ffff:192.0.2.128"}), + ("2001:db8::2:1", {"2001:db8::2:1"}), + ("10.13.37.1", {"10.13.37.1"}), + ("10.13.37.2/24", {"10.13.37.2"}), + (" 10.13.37.3 ", {"10.13.37.3"}), + ("2001:db8::", {"2001:db8::"}), + ("2001:db8:ffff:ffff:ffff:ffff:ffff:ffff", {"2001:db8:ffff:ffff:ffff:ffff:ffff:ffff"}), + ], +) +def test_clean_ips(input: str, expected_output: set) -> None: """Test the cleaning of dirty ip addresses.""" - ips = { - "0.0.0.0": set(), - "127.0.0.1": set(), - "127.0.0.1/8": set(), - "0.0.0.0/24": set(), - "::1": set(), - "::": set(), - "0:0:0:0:0:0:0:1": set(), - "::ffff:192.0.2.128": {"::ffff:192.0.2.128"}, - "2001:db8::2:1": {"2001:db8::2:1"}, - "10.13.37.1": {"10.13.37.1"}, - "10.13.37.2/24": {"10.13.37.2"}, - " 10.13.37.3 ": {"10.13.37.3"}, - "2001:db8::": {"2001:db8::"}, - "2001:db8:ffff:ffff:ffff:ffff:ffff:ffff": {"2001:db8:ffff:ffff:ffff:ffff:ffff:ffff"}, - } - - for input_ip, expected_ip in ips.items(): - assert NetworkManager.clean_ips({input_ip}) == expected_ip + assert NetworkManager.clean_ips({input}) == expected_output