Skip to content
121 changes: 71 additions & 50 deletions dissect/target/helpers/network_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from collections import defaultdict
from configparser import ConfigParser, MissingSectionHeaderError
from io import StringIO
from itertools import chain
from re import compile, sub
from typing import Any, Callable, Iterable, Match, Optional, Union

Expand Down Expand Up @@ -279,7 +280,7 @@ def translate(self, value: Any, option: str) -> str:
if option in translation_values and value:
return translation_key

def _get_option(self, config: dict, option: str, section: Optional[str] = None) -> Union[str, Callable]:
def _get_option(self, config: dict, option: str, section: Optional[str] = None) -> Union[str, Callable, None]:
"""Internal function to get arbitrary options values from a parsed (non-translated) dictionary.

Args:
Expand All @@ -290,8 +291,13 @@ def _get_option(self, config: dict, option: str, section: Optional[str] = None)
Returns:
Value(s) corrensponding to that network configuration option.
"""

if not config:
return None

if section:
config = config[section]
config = config.get(section, {}) or {}

for key, value in config.items():
if key == option:
return value
Expand Down Expand Up @@ -499,61 +505,76 @@ def get_config_value(self, attr: str) -> list[set]:


def parse_unix_dhcp_log_messages(target) -> list[str]:
"""Parse local syslog and cloud init log files for DHCP lease IPs.
"""Parse local syslog, journal and cloud init log files for DHCP lease IPs.

Args:
target: Target to discover and obtain network information from.

Returns:
List of DHCP ip addresses.
"""
ips = []

# Search through parsed syslogs for DHCP leases.
try:
messages = target.messages()
for record in messages:
line = record.message

# Ubuntu DHCP
if ("DHCPv4" in line or "DHCPv6" in line) and " address " in line and " via " in line:
ip = line.split(" address ")[1].split(" via ")[0].strip().split("/")[0]
if ip not in ips:
ips.append(ip)

# Ubuntu DHCP NetworkManager
elif "option ip_address" in line and ("dhcp4" in line or "dhcp6" in line) and "=> '" in line:
ip = line.split("=> '")[1].replace("'", "").strip()
if ip not in ips:
ips.append(ip)

# Debian and CentOS dhclient
elif record.daemon == "dhclient" and "bound to" in line:
ip = line.split("bound to")[1].split(" ")[1].strip()
if ip not in ips:
ips.append(ip)

# CentOS DHCP and general NetworkManager
elif " address " in line and ("dhcp4" in line or "dhcp6" in line):
ip = line.split(" address ")[1].strip()
if ip not in ips:
ips.append(ip)

except PluginError:
target.log.debug("Can not search for DHCP leases in syslog files as they does not exist.")

# A unix system might be provisioned using Ubuntu's cloud-init (https://cloud-init.io/).
if (path := target.fs.path("/var/log/cloud-init.log")).exists():
for line in path.open("rt"):
# We are interested in the following log line:
# YYYY-MM-DD HH:MM:SS,000 - dhcp.py[DEBUG]: Received dhcp lease on IFACE for IP/MASK
if "Received dhcp lease on" in line:
interface, ip, netmask = re.search(r"Received dhcp lease on (\w{0,}) for (\S+)\/(\S+)", line).groups()
if ip not in ips:
ips.append(ip)

if not path and not messages:
target.log.warning("Can not search for DHCP leases in syslog or cloud-init.log files as they does not exist.")
ips = set()
messages = set(())

for log_func in ["messages", "journal"]:
try:
messages = chain(messages, getattr(target, log_func)())
except PluginError:
target.log.debug(f"Could not search for DHCP leases in {log_func} files.")

for record in messages:
line = record.message

# Ubuntu cloud-init
if "Received dhcp lease on" in line:
interface, ip, netmask = re.search(r"Received dhcp lease on (\w{0,}) for (\S+)\/(\S+)", line).groups()
ips.add(ip)
continue

# Ubuntu DHCP
if ("DHCPv4" in line or "DHCPv6" in line) and " address " in line and " via " in line:
ip = line.split(" address ")[1].split(" via ")[0].strip().split("/")[0]
ips.add(ip)
continue

# Ubuntu DHCP NetworkManager
if "option ip_address" in line and ("dhcp4" in line or "dhcp6" in line) and "=> '" in line:
ip = line.split("=> '")[1].replace("'", "").strip()
ips.add(ip)
continue

# Debian and CentOS dhclient
if hasattr(record, "daemon") and record.daemon == "dhclient" and "bound to" in line:
ip = line.split("bound to")[1].split(" ")[1].strip()
ips.add(ip)
continue

# CentOS DHCP and general NetworkManager
if " address " in line and ("dhcp4" in line or "dhcp6" in line):
ip = line.split(" address ")[1].strip()
ips.add(ip)
continue

# Ubuntu/Debian DHCP networkd (Journal)
if (
hasattr(record, "code_func")
and record.code_func == "dhcp_lease_acquired"
and " address " in line
and " via " in line
):
interface, ip, netmask, gateway = re.search(
r"^(\S+): DHCPv[4|6] address (\S+)\/(\S+) via (\S+)", line
).groups()
ips.add(ip)
continue

# Journals and syslogs can be large and slow to iterate,
# so we stop if we have some results and have reached the journal plugin.
if len(ips) >= 2 and record._desc.name == "linux/log/journal":
break

if not messages:
target.log.warning("Could not search for DHCP leases: No log files found.")

return ips

Expand Down
63 changes: 56 additions & 7 deletions dissect/target/plugins/os/unix/log/messages.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import re
from itertools import chain
from pathlib import Path
from typing import Iterator

from dissect.target import Target
from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.helpers.record import TargetRecordDescriptor
from dissect.target.helpers.utils import year_rollover_helper
Expand All @@ -23,13 +24,26 @@
RE_DAEMON = re.compile(r"^[^:]+:\d+:\d+[^\[\]:]+\s([^\[:]+)[\[|:]{1}")
RE_PID = re.compile(r"\w\[(\d+)\]")
RE_MSG = re.compile(r"[^:]+:\d+:\d+[^:]+:\s(.*)$")
RE_CLOUD_INIT_LINE = re.compile(r"(?P<ts>.*) - (?P<daemon>.*)\[(?P<log_level>\w+)\]\: (?P<message>.*)$")


class MessagesPlugin(Plugin):
def __init__(self, target: Target):
super().__init__(target)
self.log_files = set(self._find_log_files())

def _find_log_files(self) -> Iterator[Path]:
log_dirs = ["/var/log/", "/var/log/installer/"]
file_globs = ["syslog*", "messages*", "cloud-init.log*"]
for log_dir in log_dirs:
log_dir = self.target.fs.path(log_dir)
for file_glob in file_globs:
for file in log_dir.glob(file_glob):
yield file

def check_compatible(self) -> None:
var_log = self.target.fs.path("/var/log")
if not any(var_log.glob("syslog*")) and not any(var_log.glob("messages*")):
raise UnsupportedPluginError("No message files found")
if not any(self.log_files):
raise UnsupportedPluginError("No log files found")

@export(record=MessagesRecord)
def syslog(self) -> Iterator[MessagesRecord]:
Expand All @@ -41,7 +55,7 @@ def syslog(self) -> Iterator[MessagesRecord]:

@export(record=MessagesRecord)
def messages(self) -> Iterator[MessagesRecord]:
"""Return contents of /var/log/messages* and /var/log/syslog*.
"""Return contents of /var/log/messages*, /var/log/syslog* and cloud-init logs.

Note: due to year rollover detection, the contents of the files are returned in reverse.

Expand All @@ -56,8 +70,11 @@ def messages(self) -> Iterator[MessagesRecord]:

tzinfo = self.target.datetime.tzinfo

var_log = self.target.fs.path("/var/log")
for log_file in chain(var_log.glob("syslog*"), var_log.glob("messages*")):
for log_file in self.log_files:
if "cloud-init.log" in log_file.name:
yield from self._parse_cloud_init_log(log_file)
continue

for ts, line in year_rollover_helper(log_file, RE_TS, DEFAULT_TS_LOG_FORMAT, tzinfo):
daemon = dict(enumerate(RE_DAEMON.findall(line))).get(0)
pid = dict(enumerate(RE_PID.findall(line))).get(0)
Expand All @@ -71,3 +88,35 @@ def messages(self) -> Iterator[MessagesRecord]:
source=log_file,
_target=self.target,
)

def _parse_cloud_init_log(self, log_file: Path) -> Iterator[MessagesRecord]:
"""Parse a cloud-init.log file.

Lines are structured in the following format:
``YYYY-MM-DD HH:MM:SS,000 - dhcp.py[DEBUG]: Received dhcp lease on IFACE for IP/MASK``

Args:
``log_file``: path to cloud-init.log file.

Returns: ``MessagesRecord``
"""
for line in log_file.open("rt").readlines():
line = line.strip()
if not line:
continue

match = RE_CLOUD_INIT_LINE.match(line)
if not match:
self.target.log.warning("Could not match cloud-init log line")
self.target.log.debug(f"No match for line '{line}'")
continue

match = match.groupdict()
yield MessagesRecord(
ts=match["ts"].split(",")[0],
daemon=match["daemon"],
pid=None,
message=match["message"],
source=log_file,
_target=self.target,
)
Loading