Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dissect/target/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -1168,10 +1168,10 @@ def makedirs(self, path: str) -> VirtualDirectory:

return directory

def map_fs(self, vfspath: str, fs: Filesystem) -> None:
def map_fs(self, vfspath: str, fs: Filesystem, base: str = "/") -> None:
"""Mount a dissect filesystem to a directory in the VFS"""
directory = self.makedirs(vfspath)
directory.top = fs.get("/")
directory.top = fs.get(base)

mount = map_fs

Expand Down
59 changes: 29 additions & 30 deletions dissect/target/helpers/configutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from configparser import ConfigParser, MissingSectionHeaderError
from dataclasses import dataclass
from fnmatch import fnmatch
from pathlib import Path
from types import TracebackType
from typing import (
Any,
Expand All @@ -18,9 +19,7 @@
Iterator,
KeysView,
Literal,
Optional,
TextIO,
Union,
)

from defusedxml import ElementTree
Expand Down Expand Up @@ -107,7 +106,7 @@ class ConfigurationParser:

def __init__(
self,
collapse: Union[bool, Iterable[str]] = False,
collapse: bool | Iterable[str] = False,
collapse_inverse: bool = False,
separator: tuple[str] = ("=",),
comment_prefixes: tuple[str] = (";", "#"),
Expand All @@ -120,7 +119,7 @@ def __init__(
self.comment_prefixes = comment_prefixes
self.parsed_data = {}

def __getitem__(self, item: Any) -> Union[dict, str]:
def __getitem__(self, item: Any) -> dict | str:
return self.parsed_data[item]

def __contains__(self, item: str) -> bool:
Expand Down Expand Up @@ -157,7 +156,7 @@ def parse_file(self, fh: TextIO) -> None:
"""
raise NotImplementedError()

def get(self, item: str, default: Optional[Any] = None) -> Any:
def get(self, item: str, default: Any | None = None) -> Any:
return self.parsed_data.get(item, default)

def read_file(self, fh: TextIO | io.BytesIO) -> None:
Expand Down Expand Up @@ -388,7 +387,7 @@ class ListUnwrapper:
"""Provides utility functions to unwrap dictionary objects out of lists."""

@staticmethod
def unwrap(data: Union[dict, list]) -> Union[dict, list]:
def unwrap(data: dict | list) -> dict | list:
"""Transforms a list with dictionaries to a dictionary.

The order of the list is preserved. If no dictionary is found, the list remains untouched:
Expand All @@ -409,7 +408,7 @@ def unwrap(data: Union[dict, list]) -> Union[dict, list]:
return ListUnwrapper._unwrap_dict(orig)

@staticmethod
def _unwrap_dict(data: Union[dict, list]) -> Union[dict, list]:
def _unwrap_dict(data: dict | list) -> dict | list:
"""Looks for dictionaries and unwraps its values."""

if not isinstance(data, dict):
Expand All @@ -425,7 +424,7 @@ def _unwrap_dict(data: Union[dict, list]) -> Union[dict, list]:
return root

@staticmethod
def _unwrap_dict_list(data: Union[dict, list]) -> Union[dict, list]:
def _unwrap_dict_list(data: dict | list) -> dict | list:
"""Unwraps a list containing dictionaries."""
if not isinstance(data, list) or not any(isinstance(obj, dict) for obj in data):
return data
Expand Down Expand Up @@ -559,9 +558,9 @@ def __enter__(self) -> ScopeManager:

def __exit__(
self,
type: Optional[type[BaseException]],
value: Optional[BaseException],
traceback: Optional[TracebackType],
type: type[BaseException] | None,
value: BaseException | None,
traceback: TracebackType | None,
) -> None:
self.clean()

Expand Down Expand Up @@ -636,7 +635,7 @@ def _change_scope(
manager: ScopeManager,
line: str,
key: str,
next_line: Optional[str] = None,
next_line: str | None = None,
) -> bool:
"""A function to check whether to create a new scope, or go back to a previous one.

Expand Down Expand Up @@ -722,7 +721,7 @@ def _change_scope(
manager: ScopeManager,
line: str,
key: str,
next_line: Optional[str] = None,
next_line: str | None = None,
) -> bool:
scope_char = ("[", "]")
changed = False
Expand Down Expand Up @@ -785,22 +784,22 @@ def _update_continued_values(self, func: Callable, key, values: list[str]) -> tu

@dataclass(frozen=True)
class ParserOptions:
collapse: Optional[Union[bool, set]] = None
collapse_inverse: Optional[bool] = None
separator: Optional[tuple[str]] = None
comment_prefixes: Optional[tuple[str]] = None
collapse: bool | set | None = None
collapse_inverse: bool | None = None
separator: tuple[str] | None = None
comment_prefixes: tuple[str] | None = None


@dataclass(frozen=True)
class ParserConfig:
parser: type[ConfigurationParser] = Default
collapse: Optional[Union[bool, set]] = None
collapse_inverse: Optional[bool] = None
separator: Optional[tuple[str]] = None
comment_prefixes: Optional[tuple[str]] = None
fields: Optional[tuple[str]] = None
collapse: bool | set | None = None
collapse_inverse: bool | None = None
separator: tuple[str] | None = None
comment_prefixes: tuple[str] | None = None
fields: tuple[str] | None = None

def create_parser(self, options: Optional[ParserOptions] = None) -> ConfigurationParser:
def create_parser(self, options: ParserOptions | None = None) -> ConfigurationParser:
kwargs = {}

for field_name in ["collapse", "collapse_inverse", "separator", "comment_prefixes", "fields"]:
Expand Down Expand Up @@ -890,7 +889,7 @@ def create_parser(self, options: Optional[ParserOptions] = None) -> Configuratio
}


def parse(path: Union[FilesystemEntry, TargetPath], hint: Optional[str] = None, *args, **kwargs) -> ConfigurationParser:
def parse(path: FilesystemEntry | TargetPath | Path, hint: str | None = None, *args, **kwargs) -> ConfigurationParser:
"""Parses the content of an ``path`` or ``entry`` to a dictionary.

Args:
Expand All @@ -909,7 +908,7 @@ def parse(path: Union[FilesystemEntry, TargetPath], hint: Optional[str] = None,
if isinstance(path, TargetPath):
entry = path.get()

if not entry.is_file(follow_symlinks=True):
if not isinstance(entry, Path) and not entry.is_file(follow_symlinks=True):
raise FileNotFoundError(f"Could not parse {path} as a dictionary.")

options = ParserOptions(*args, **kwargs)
Expand All @@ -918,14 +917,14 @@ def parse(path: Union[FilesystemEntry, TargetPath], hint: Optional[str] = None,


def parse_config(
entry: FilesystemEntry,
hint: Optional[str] = None,
options: Optional[ParserOptions] = None,
entry: FilesystemEntry | Path,
hint: str | None = None,
options: ParserOptions | None = None,
) -> ConfigurationParser:
parser_type = _select_parser(entry, hint)

parser = parser_type.create_parser(options)
with entry.open() as fh:
with entry.open("rb") if isinstance(entry, Path) else entry.open() as fh:
if not isinstance(parser, Bin):
open_file = io.TextIOWrapper(fh, encoding="utf-8")
else:
Expand All @@ -935,7 +934,7 @@ def parse_config(
return parser


def _select_parser(entry: FilesystemEntry, hint: Optional[str] = None) -> ParserConfig:
def _select_parser(entry: FilesystemEntry, hint: str | None = None) -> ParserConfig:
if hint and (parser_type := CONFIG_MAP.get(hint)):
return parser_type

Expand Down
1 change: 1 addition & 0 deletions dissect/target/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ def open(item: str | Path, *args, **kwargs) -> Loader:
register("tanium", "TaniumLoader")
register("itunes", "ITunesLoader")
register("ab", "AndroidBackupLoader")
register("cellebrite", "CellebriteLoader")
register("target", "TargetLoader")
register("log", "LogLoader")
# Disabling ResLoader because of DIS-536
Expand Down
180 changes: 180 additions & 0 deletions dissect/target/loaders/cellebrite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
from __future__ import annotations

import logging
from dataclasses import dataclass
from pathlib import Path
from uuid import UUID

from defusedxml import ElementTree as ET

from dissect.target.filesystem import LayerFilesystem
from dissect.target.filesystems.zip import ZipFilesystem
from dissect.target.helpers import configutil
from dissect.target.loader import Loader
from dissect.target.target import Target

log = logging.getLogger(__name__)


@dataclass
class Extraction:
type: str | None
path: Path


@dataclass
class DeviceInfo:
vendor: str
model: str
fguid: UUID | None = None
guid: UUID | None = None
os: str | None = None


@dataclass
class Ufdx:
path: Path | None = None
evidence: UUID | None = None
device: DeviceInfo | None = None
extractions: list[Extraction] | None = None


@dataclass
class Dump:
type: str
path: Path


@dataclass
class Keychain:
type: str
path: Path


@dataclass
class Ufd:
path: Path
device: DeviceInfo
dumps: list[Dump] | None = None


class CellebriteLoader(Loader):
"""Load Cellebrite UFED exports (``.ufdx`` and ``.ufd``).

References:
- https://corp.digitalcorpora.org/corpora/mobile
"""

def __init__(self, path: Path, **kwargs):
super().__init__(path)

self.ufdx = None
self.ufd = []
self.ffs = None

# Parse xml to find .ufd path
if path.suffix == ".ufdx":
try:
tree = ET.fromstring(path.read_text())

self.ufdx = Ufdx(
path=path,
evidence=tree.get("EvidenceID"),
device=DeviceInfo(**{k.lower(): v for k, v in tree.find("DeviceInfo").attrib.items()}),
extractions=[],
)

base_path = path.resolve().parent
for extraction in tree.findall("Extractions/Extraction"):
self.ufdx.extractions.append(
Extraction(
type=extraction.get("TransferType"),
path=base_path.joinpath(extraction.get("Path").replace("\\", "/")),
)
)

except ET.ParseError as e:
raise ValueError(f"Invalid XML in {path}: {str(e)}")

Check warning on line 97 in dissect/target/loaders/cellebrite.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/cellebrite.py#L96-L97

Added lines #L96 - L97 were not covered by tests

elif path.suffix == ".ufd":
self.ufdx = Ufdx(extractions=[Extraction(type=None, path=path.resolve())])

Check warning on line 100 in dissect/target/loaders/cellebrite.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/cellebrite.py#L99-L100

Added lines #L99 - L100 were not covered by tests

else:
raise ValueError(f"Unknown suffix {path.suffix} for {path}")

Check warning on line 103 in dissect/target/loaders/cellebrite.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/cellebrite.py#L103

Added line #L103 was not covered by tests

# Not implemented: parse ufd ini to find ffs, could replace ``Extraction()`` with ``Ufd()``.

for extraction in self.ufdx.extractions:
if not extraction.path.is_file():
log.warning("Extraction %s does not exist", extraction.path)
continue

Check warning on line 110 in dissect/target/loaders/cellebrite.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/cellebrite.py#L109-L110

Added lines #L109 - L110 were not covered by tests

config = configutil.parse(extraction.path, hint="ini")
device = {k.lower(): v for k, v in config.get("DeviceInfo").items()}
device["model"] += f" ({device['devicemodel']})"
del device["devicemodel"]

ufd = Ufd(
path=extraction.path,
device=DeviceInfo(**device),
dumps=[],
)

for type, dump in config.get("Dumps").items():
dump_path = ufd.path.resolve().parent.joinpath(dump)
ufd.dumps.append(Dump(type=type, path=dump_path))

self.ufd.append(ufd)

@staticmethod
def detect(path: Path) -> bool:
return path.is_file() and path.suffix in [".ufdx", ".ufd"]

def map(self, target: Target) -> None:
for ufd in self.ufd:
for dump in ufd.dumps:
# Keychain dumps are inside the same FFS zip file, so we let the CellebriteFilesystem handle mounting
# that so we prevent an extra file handle being opened.
if dump.type != "FileDump":
log.warning("Ignoring Cellebrite dump %s of type %s", dump.path.name, dump.type)
continue

if (size := dump.path.lstat().st_size) > 1_000_000_000:
log.warning(

Check warning on line 143 in dissect/target/loaders/cellebrite.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/cellebrite.py#L143

Added line #L143 was not covered by tests
"Cellebrite filesystem dump %s is %s GB, this might take a while..",
dump.path.name,
size // 1024 // 1024 // 1024,
)

target.filesystems.add(CellebriteFilesystem(dump.path))


class CellebriteFilesystem(LayerFilesystem):
"""Cellebrite ``FileDump`` filesystem implementation."""

__type__ = "cellebrite"

def __init__(self, path: Path, base: str | None = None, **kwargs):
super().__init__(**kwargs)
self.source = path

if path.suffix == ".zip":
fs = ZipFilesystem(path.open("rb"), base=base)
else:
raise ValueError(f"Unsupported Cellebrite dump type {path.name}")

Check warning on line 164 in dissect/target/loaders/cellebrite.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/cellebrite.py#L164

Added line #L164 was not covered by tests

# We add the full file system from ``/filesystem1`` as a layer to the root ``/`` and
# mount found extras and extraction metadata, such as keychain dumps to folders in ``/$fs$/fs0``,
# to keep this information accessible for device specific plugins.
for fs_dir, dest in [
("/filesystem1", "/"),
("/extra", "/$fs$/fs0/extra"),
("/metadata1", "/$fs$/fs0/metadata"),
]:
if fs.path(fs_dir).exists():
# Mounts the ZipFilesystem at the provided fs_dir base. This way we can
# (ab)use a single zip file handle for multiple filesystem layers.
self.append_layer().mount(dest, fs, base=fs_dir)

def __repr__(self) -> str:
return f"<{self.__class__.__name__} {self.source}>"

Check warning on line 180 in dissect/target/loaders/cellebrite.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/cellebrite.py#L180

Added line #L180 was not covered by tests
3 changes: 3 additions & 0 deletions tests/_data/loaders/cellebrite/DeviceInfo.txt
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
3 changes: 3 additions & 0 deletions tests/_data/loaders/cellebrite/EvidenceCollection.ufdx
Git LFS file not shown
Loading
Loading