diff --git a/dissect/target/filesystem.py b/dissect/target/filesystem.py index 9e1d8c166d..17ffc3d95d 100644 --- a/dissect/target/filesystem.py +++ b/dissect/target/filesystem.py @@ -1168,10 +1168,10 @@ def makedirs(self, path: str) -> VirtualDirectory: return directory - def map_fs(self, vfspath: str, fs: Filesystem) -> None: + def map_fs(self, vfspath: str, fs: Filesystem, base: str = "/") -> None: """Mount a dissect filesystem to a directory in the VFS""" directory = self.makedirs(vfspath) - directory.top = fs.get("/") + directory.top = fs.get(base) mount = map_fs diff --git a/dissect/target/helpers/configutil.py b/dissect/target/helpers/configutil.py index 0ae5788c1d..977be853f1 100644 --- a/dissect/target/helpers/configutil.py +++ b/dissect/target/helpers/configutil.py @@ -9,6 +9,7 @@ from configparser import ConfigParser, MissingSectionHeaderError from dataclasses import dataclass from fnmatch import fnmatch +from pathlib import Path from types import TracebackType from typing import ( Any, @@ -18,9 +19,7 @@ Iterator, KeysView, Literal, - Optional, TextIO, - Union, ) from defusedxml import ElementTree @@ -107,7 +106,7 @@ class ConfigurationParser: def __init__( self, - collapse: Union[bool, Iterable[str]] = False, + collapse: bool | Iterable[str] = False, collapse_inverse: bool = False, separator: tuple[str] = ("=",), comment_prefixes: tuple[str] = (";", "#"), @@ -120,7 +119,7 @@ def __init__( self.comment_prefixes = comment_prefixes self.parsed_data = {} - def __getitem__(self, item: Any) -> Union[dict, str]: + def __getitem__(self, item: Any) -> dict | str: return self.parsed_data[item] def __contains__(self, item: str) -> bool: @@ -157,7 +156,7 @@ def parse_file(self, fh: TextIO) -> None: """ raise NotImplementedError() - def get(self, item: str, default: Optional[Any] = None) -> Any: + def get(self, item: str, default: Any | None = None) -> Any: return self.parsed_data.get(item, default) def read_file(self, fh: TextIO | io.BytesIO) -> None: @@ -388,7 +387,7 @@ class ListUnwrapper: """Provides utility functions to unwrap dictionary objects out of lists.""" @staticmethod - def unwrap(data: Union[dict, list]) -> Union[dict, list]: + def unwrap(data: dict | list) -> dict | list: """Transforms a list with dictionaries to a dictionary. The order of the list is preserved. If no dictionary is found, the list remains untouched: @@ -409,7 +408,7 @@ def unwrap(data: Union[dict, list]) -> Union[dict, list]: return ListUnwrapper._unwrap_dict(orig) @staticmethod - def _unwrap_dict(data: Union[dict, list]) -> Union[dict, list]: + def _unwrap_dict(data: dict | list) -> dict | list: """Looks for dictionaries and unwraps its values.""" if not isinstance(data, dict): @@ -425,7 +424,7 @@ def _unwrap_dict(data: Union[dict, list]) -> Union[dict, list]: return root @staticmethod - def _unwrap_dict_list(data: Union[dict, list]) -> Union[dict, list]: + def _unwrap_dict_list(data: dict | list) -> dict | list: """Unwraps a list containing dictionaries.""" if not isinstance(data, list) or not any(isinstance(obj, dict) for obj in data): return data @@ -559,9 +558,9 @@ def __enter__(self) -> ScopeManager: def __exit__( self, - type: Optional[type[BaseException]], - value: Optional[BaseException], - traceback: Optional[TracebackType], + type: type[BaseException] | None, + value: BaseException | None, + traceback: TracebackType | None, ) -> None: self.clean() @@ -636,7 +635,7 @@ def _change_scope( manager: ScopeManager, line: str, key: str, - next_line: Optional[str] = None, + next_line: str | None = None, ) -> bool: """A function to check whether to create a new scope, or go back to a previous one. @@ -722,7 +721,7 @@ def _change_scope( manager: ScopeManager, line: str, key: str, - next_line: Optional[str] = None, + next_line: str | None = None, ) -> bool: scope_char = ("[", "]") changed = False @@ -785,22 +784,22 @@ def _update_continued_values(self, func: Callable, key, values: list[str]) -> tu @dataclass(frozen=True) class ParserOptions: - collapse: Optional[Union[bool, set]] = None - collapse_inverse: Optional[bool] = None - separator: Optional[tuple[str]] = None - comment_prefixes: Optional[tuple[str]] = None + collapse: bool | set | None = None + collapse_inverse: bool | None = None + separator: tuple[str] | None = None + comment_prefixes: tuple[str] | None = None @dataclass(frozen=True) class ParserConfig: parser: type[ConfigurationParser] = Default - collapse: Optional[Union[bool, set]] = None - collapse_inverse: Optional[bool] = None - separator: Optional[tuple[str]] = None - comment_prefixes: Optional[tuple[str]] = None - fields: Optional[tuple[str]] = None + collapse: bool | set | None = None + collapse_inverse: bool | None = None + separator: tuple[str] | None = None + comment_prefixes: tuple[str] | None = None + fields: tuple[str] | None = None - def create_parser(self, options: Optional[ParserOptions] = None) -> ConfigurationParser: + def create_parser(self, options: ParserOptions | None = None) -> ConfigurationParser: kwargs = {} for field_name in ["collapse", "collapse_inverse", "separator", "comment_prefixes", "fields"]: @@ -890,7 +889,7 @@ def create_parser(self, options: Optional[ParserOptions] = None) -> Configuratio } -def parse(path: Union[FilesystemEntry, TargetPath], hint: Optional[str] = None, *args, **kwargs) -> ConfigurationParser: +def parse(path: FilesystemEntry | TargetPath | Path, hint: str | None = None, *args, **kwargs) -> ConfigurationParser: """Parses the content of an ``path`` or ``entry`` to a dictionary. Args: @@ -909,7 +908,7 @@ def parse(path: Union[FilesystemEntry, TargetPath], hint: Optional[str] = None, if isinstance(path, TargetPath): entry = path.get() - if not entry.is_file(follow_symlinks=True): + if not isinstance(entry, Path) and not entry.is_file(follow_symlinks=True): raise FileNotFoundError(f"Could not parse {path} as a dictionary.") options = ParserOptions(*args, **kwargs) @@ -918,14 +917,14 @@ def parse(path: Union[FilesystemEntry, TargetPath], hint: Optional[str] = None, def parse_config( - entry: FilesystemEntry, - hint: Optional[str] = None, - options: Optional[ParserOptions] = None, + entry: FilesystemEntry | Path, + hint: str | None = None, + options: ParserOptions | None = None, ) -> ConfigurationParser: parser_type = _select_parser(entry, hint) parser = parser_type.create_parser(options) - with entry.open() as fh: + with entry.open("rb") if isinstance(entry, Path) else entry.open() as fh: if not isinstance(parser, Bin): open_file = io.TextIOWrapper(fh, encoding="utf-8") else: @@ -935,7 +934,7 @@ def parse_config( return parser -def _select_parser(entry: FilesystemEntry, hint: Optional[str] = None) -> ParserConfig: +def _select_parser(entry: FilesystemEntry, hint: str | None = None) -> ParserConfig: if hint and (parser_type := CONFIG_MAP.get(hint)): return parser_type diff --git a/dissect/target/loader.py b/dissect/target/loader.py index 2a23684e9e..5d7d1b9801 100644 --- a/dissect/target/loader.py +++ b/dissect/target/loader.py @@ -219,6 +219,7 @@ def open(item: str | Path, *args, **kwargs) -> Loader: register("tanium", "TaniumLoader") register("itunes", "ITunesLoader") register("ab", "AndroidBackupLoader") +register("cellebrite", "CellebriteLoader") register("target", "TargetLoader") register("log", "LogLoader") # Disabling ResLoader because of DIS-536 diff --git a/dissect/target/loaders/cellebrite.py b/dissect/target/loaders/cellebrite.py new file mode 100644 index 0000000000..254db51199 --- /dev/null +++ b/dissect/target/loaders/cellebrite.py @@ -0,0 +1,180 @@ +from __future__ import annotations + +import logging +from dataclasses import dataclass +from pathlib import Path +from uuid import UUID + +from defusedxml import ElementTree as ET + +from dissect.target.filesystem import LayerFilesystem +from dissect.target.filesystems.zip import ZipFilesystem +from dissect.target.helpers import configutil +from dissect.target.loader import Loader +from dissect.target.target import Target + +log = logging.getLogger(__name__) + + +@dataclass +class Extraction: + type: str | None + path: Path + + +@dataclass +class DeviceInfo: + vendor: str + model: str + fguid: UUID | None = None + guid: UUID | None = None + os: str | None = None + + +@dataclass +class Ufdx: + path: Path | None = None + evidence: UUID | None = None + device: DeviceInfo | None = None + extractions: list[Extraction] | None = None + + +@dataclass +class Dump: + type: str + path: Path + + +@dataclass +class Keychain: + type: str + path: Path + + +@dataclass +class Ufd: + path: Path + device: DeviceInfo + dumps: list[Dump] | None = None + + +class CellebriteLoader(Loader): + """Load Cellebrite UFED exports (``.ufdx`` and ``.ufd``). + + References: + - https://corp.digitalcorpora.org/corpora/mobile + """ + + def __init__(self, path: Path, **kwargs): + super().__init__(path) + + self.ufdx = None + self.ufd = [] + self.ffs = None + + # Parse xml to find .ufd path + if path.suffix == ".ufdx": + try: + tree = ET.fromstring(path.read_text()) + + self.ufdx = Ufdx( + path=path, + evidence=tree.get("EvidenceID"), + device=DeviceInfo(**{k.lower(): v for k, v in tree.find("DeviceInfo").attrib.items()}), + extractions=[], + ) + + base_path = path.resolve().parent + for extraction in tree.findall("Extractions/Extraction"): + self.ufdx.extractions.append( + Extraction( + type=extraction.get("TransferType"), + path=base_path.joinpath(extraction.get("Path").replace("\\", "/")), + ) + ) + + except ET.ParseError as e: + raise ValueError(f"Invalid XML in {path}: {str(e)}") + + elif path.suffix == ".ufd": + self.ufdx = Ufdx(extractions=[Extraction(type=None, path=path.resolve())]) + + else: + raise ValueError(f"Unknown suffix {path.suffix} for {path}") + + # Not implemented: parse ufd ini to find ffs, could replace ``Extraction()`` with ``Ufd()``. + + for extraction in self.ufdx.extractions: + if not extraction.path.is_file(): + log.warning("Extraction %s does not exist", extraction.path) + continue + + config = configutil.parse(extraction.path, hint="ini") + device = {k.lower(): v for k, v in config.get("DeviceInfo").items()} + device["model"] += f" ({device['devicemodel']})" + del device["devicemodel"] + + ufd = Ufd( + path=extraction.path, + device=DeviceInfo(**device), + dumps=[], + ) + + for type, dump in config.get("Dumps").items(): + dump_path = ufd.path.resolve().parent.joinpath(dump) + ufd.dumps.append(Dump(type=type, path=dump_path)) + + self.ufd.append(ufd) + + @staticmethod + def detect(path: Path) -> bool: + return path.is_file() and path.suffix in [".ufdx", ".ufd"] + + def map(self, target: Target) -> None: + for ufd in self.ufd: + for dump in ufd.dumps: + # Keychain dumps are inside the same FFS zip file, so we let the CellebriteFilesystem handle mounting + # that so we prevent an extra file handle being opened. + if dump.type != "FileDump": + log.warning("Ignoring Cellebrite dump %s of type %s", dump.path.name, dump.type) + continue + + if (size := dump.path.lstat().st_size) > 1_000_000_000: + log.warning( + "Cellebrite filesystem dump %s is %s GB, this might take a while..", + dump.path.name, + size // 1024 // 1024 // 1024, + ) + + target.filesystems.add(CellebriteFilesystem(dump.path)) + + +class CellebriteFilesystem(LayerFilesystem): + """Cellebrite ``FileDump`` filesystem implementation.""" + + __type__ = "cellebrite" + + def __init__(self, path: Path, base: str | None = None, **kwargs): + super().__init__(**kwargs) + self.source = path + + if path.suffix == ".zip": + fs = ZipFilesystem(path.open("rb"), base=base) + else: + raise ValueError(f"Unsupported Cellebrite dump type {path.name}") + + # We add the full file system from ``/filesystem1`` as a layer to the root ``/`` and + # mount found extras and extraction metadata, such as keychain dumps to folders in ``/$fs$/fs0``, + # to keep this information accessible for device specific plugins. + for fs_dir, dest in [ + ("/filesystem1", "/"), + ("/extra", "/$fs$/fs0/extra"), + ("/metadata1", "/$fs$/fs0/metadata"), + ]: + if fs.path(fs_dir).exists(): + # Mounts the ZipFilesystem at the provided fs_dir base. This way we can + # (ab)use a single zip file handle for multiple filesystem layers. + self.append_layer().mount(dest, fs, base=fs_dir) + + def __repr__(self) -> str: + return f"<{self.__class__.__name__} {self.source}>" diff --git a/tests/_data/loaders/cellebrite/DeviceInfo.txt b/tests/_data/loaders/cellebrite/DeviceInfo.txt new file mode 100644 index 0000000000..ddc1a5c330 --- /dev/null +++ b/tests/_data/loaders/cellebrite/DeviceInfo.txt @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:af3519a42a4c2977f885d90d4b29c0c57fdb21158a28293b2f162e2a1736bd11 +size 332 diff --git a/tests/_data/loaders/cellebrite/EXTRACTION_FFS 01/EXTRACTION_FFS.ufd b/tests/_data/loaders/cellebrite/EXTRACTION_FFS 01/EXTRACTION_FFS.ufd new file mode 100755 index 0000000000..4595592353 --- /dev/null +++ b/tests/_data/loaders/cellebrite/EXTRACTION_FFS 01/EXTRACTION_FFS.ufd @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:84ec28d111ef38849bb1b505eede28905309dcb98b30249ffcfd17a23310f87a +size 1071 diff --git a/tests/_data/loaders/cellebrite/EXTRACTION_FFS 01/EXTRACTION_FFS.zip b/tests/_data/loaders/cellebrite/EXTRACTION_FFS 01/EXTRACTION_FFS.zip new file mode 100644 index 0000000000..6b6406eead --- /dev/null +++ b/tests/_data/loaders/cellebrite/EXTRACTION_FFS 01/EXTRACTION_FFS.zip @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:af3842be9a1f00cf3a99962ea5ca34ffa4614a28ff3586f1e53632062bbb4171 +size 1576 diff --git a/tests/_data/loaders/cellebrite/EvidenceCollection.ufdx b/tests/_data/loaders/cellebrite/EvidenceCollection.ufdx new file mode 100644 index 0000000000..a3610fbf03 --- /dev/null +++ b/tests/_data/loaders/cellebrite/EvidenceCollection.ufdx @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:3809100dcf8d6bd61f94896dab08422e42995d6a4d1bd7ee19da3f51d155a06b +size 488 diff --git a/tests/loaders/test_cellebrite.py b/tests/loaders/test_cellebrite.py new file mode 100644 index 0000000000..ce141f0277 --- /dev/null +++ b/tests/loaders/test_cellebrite.py @@ -0,0 +1,59 @@ +from pathlib import Path + +from dissect.target.loaders.cellebrite import CellebriteFilesystem, CellebriteLoader +from dissect.target.target import Target +from tests._utils import absolute_path + + +def test_cellebrite_loader(target_bare: Target) -> None: + """Test if we correctly detect and load a Cellebrite UFDX FFS zip extraction from DigitalCorpora (``iOS_17``). + + The content of the ``EXTRACTION_FFS.zip`` file has been replaced with a minimal linux folder structure + for performance reasons. + + Resources: + - https://corp.digitalcorpora.org/corpora/iOS17/ + """ + path = Path(absolute_path("_data/loaders/cellebrite/EvidenceCollection.ufdx")) + loader = CellebriteLoader(path) + + assert loader.ufdx.path.name == "EvidenceCollection.ufdx" + assert loader.ufdx.evidence == "00f905b7-5131-42d9-9ccf-2227115d9536" + assert loader.ufdx.device.vendor == "Apple" + assert loader.ufdx.device.model == "iPhone 11 (N104AP)" + assert loader.ufdx.device.fguid == "98ec76e1-a885-4733-91dd-5dbb61156335" + assert loader.ufdx.device.guid == "98ec76e1-a885-4733-91dd-5dbb61156335" + assert not loader.ufdx.device.os + assert len(loader.ufdx.extractions) == 1 + assert loader.ufdx.extractions[0].type == "FileSystemDump" + assert loader.ufdx.extractions[0].path.name == "EXTRACTION_FFS.ufd" + + assert len(loader.ufd) == 1 + assert loader.ufd[0].path.name == "EXTRACTION_FFS.ufd" + assert loader.ufd[0].device.vendor == "Apple" + assert loader.ufd[0].device.model == "iPhone 11 (N104AP)" + assert not loader.ufd[0].device.fguid + assert not loader.ufd[0].device.guid + assert loader.ufd[0].device.os == "17.3 (21D50)" + assert len(loader.ufd[0].dumps) == 2 + assert loader.ufd[0].dumps[0].type == "FileDump" + assert loader.ufd[0].dumps[0].path.name == "EXTRACTION_FFS.zip" + assert loader.ufd[0].dumps[1].type == "Keychain" + + loader.map(target_bare) + target_bare.apply() + + assert len(target_bare.filesystems) == 1 + assert isinstance(target_bare.filesystems[0], CellebriteFilesystem) + + assert sorted(list(map(str, target_bare.fs.path("/").iterdir()))) == [ + "/$fs$", + "/etc", + "/home", + "/opt", + "/root", + "/var", + ] + + assert target_bare.os == "linux" + assert target_bare.hostname == "example"