Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions dissect/util/compression/lz4.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import io
import struct
from typing import BinaryIO, Union
from typing import BinaryIO

from dissect.util.exceptions import CorruptDataError

Expand All @@ -23,12 +25,12 @@ def _get_length(src: BinaryIO, length: int) -> int:


def decompress(
src: Union[bytes, BinaryIO],
src: bytes | BinaryIO,
uncompressed_size: int = -1,
max_length: int = -1,
return_bytearray: bool = False,
return_bytes_read: bool = False,
) -> Union[bytes, tuple[bytes, int]]:
) -> bytes | tuple[bytes, int]:
"""LZ4 decompress from a file-like object up to a certain length. Assumes no header.

Args:
Expand Down Expand Up @@ -92,5 +94,5 @@ def decompress(

if return_bytes_read:
return dst, src.tell() - start
else:
return dst

return dst
11 changes: 6 additions & 5 deletions dissect/util/compression/lznt1.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# Reference: https://github.com/google/rekall/blob/master/rekall-core/rekall/plugins/filesystems/lznt1.py
from __future__ import annotations

import array
import io
import struct
from typing import BinaryIO, Union
from typing import BinaryIO


def _get_displacement(offset: int) -> int:
Expand All @@ -20,10 +22,10 @@ def _get_displacement(offset: int) -> int:
COMPRESSED_MASK = 1 << 15
SIGNATURE_MASK = 3 << 12
SIZE_MASK = (1 << 12) - 1
TAG_MASKS = [(1 << i) for i in range(0, 8)]
TAG_MASKS = [(1 << i) for i in range(8)]


def decompress(src: Union[bytes, BinaryIO]) -> bytes:
def decompress(src: bytes | BinaryIO) -> bytes:
"""LZNT1 decompress from a file-like object or bytes.

Args:
Expand Down Expand Up @@ -87,5 +89,4 @@ def decompress(src: Union[bytes, BinaryIO]) -> bytes:
data = src.read(hsize + 1)
dst.write(data)

result = dst.getvalue()
return result
return dst.getvalue()
5 changes: 3 additions & 2 deletions dissect/util/compression/lzo.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
# - https://github.com/FFmpeg/FFmpeg/blob/master/libavutil/lzo.c
# - https://docs.kernel.org/staging/lzo.html
# - https://github.com/torvalds/linux/blob/master/lib/lzo/lzo1x_decompress_safe.c
from __future__ import annotations

import io
import struct
from typing import BinaryIO, Union
from typing import BinaryIO

MAX_READ_LENGTH = (1 << 32) - 1000

Expand All @@ -22,7 +23,7 @@ def _read_length(src: BinaryIO, val: int, mask: int) -> int:
return length + mask + val


def decompress(src: Union[bytes, BinaryIO], header: bool = True, buflen: int = -1) -> bytes:
def decompress(src: bytes | BinaryIO, header: bool = True, buflen: int = -1) -> bytes:
"""LZO decompress from a file-like object or bytes. Assumes no header.

Arguments are largely compatible with python-lzo API.
Expand Down
8 changes: 5 additions & 3 deletions dissect/util/compression/lzxpress.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# Reference: [MS-XCA]
from __future__ import annotations

import io
import struct
from typing import BinaryIO, Union
from typing import BinaryIO


def decompress(src: Union[bytes, BinaryIO]) -> bytes:
def decompress(src: bytes | BinaryIO) -> bytes:
"""LZXPRESS decompress from a file-like object or bytes.

Args:
Expand Down Expand Up @@ -62,7 +64,7 @@
match_length = struct.unpack("<I", src.read(4))[0]

if match_length < 15 + 7:
raise Exception("wrong match length")
raise ValueError("wrong match length")

Check warning on line 67 in dissect/util/compression/lzxpress.py

View check run for this annotation

Codecov / codecov/patch

dissect/util/compression/lzxpress.py#L67

Added line #L67 was not covered by tests

match_length -= 15 + 7
match_length += 15
Expand Down
13 changes: 8 additions & 5 deletions dissect/util/compression/lzxpress_huffman.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
# https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-frs2/8cb5bae9-edf3-4833-9f0a-9d7e24218d3d
# https://winprotocoldoc.blob.core.windows.net/productionwindowsarchives/MS-XCA/[MS-XCA].pdf
from __future__ import annotations

import io
import struct
from collections import namedtuple
from typing import BinaryIO, Optional, Union
from typing import BinaryIO, NamedTuple

Symbol = namedtuple("Symbol", ["length", "symbol"])

class Symbol(NamedTuple):
length: int
symbol: int


def _read_16_bit(fh: BinaryIO) -> int:
Expand All @@ -16,7 +19,7 @@ def _read_16_bit(fh: BinaryIO) -> int:
class Node:
__slots__ = ("symbol", "is_leaf", "children")

def __init__(self, symbol: Optional[Symbol] = None, is_leaf: bool = False):
def __init__(self, symbol: Symbol | None = None, is_leaf: bool = False):
self.symbol = symbol
self.is_leaf = is_leaf
self.children = [None, None]
Expand Down Expand Up @@ -120,7 +123,7 @@ def decode(self, root: Node) -> Symbol:
return node.symbol


def decompress(src: Union[bytes, BinaryIO]) -> bytes:
def decompress(src: bytes | BinaryIO) -> bytes:
"""LZXPRESS decompress from a file-like object or bytes.

Decompresses until EOF of the input data.
Expand Down
8 changes: 5 additions & 3 deletions dissect/util/compression/sevenbit.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import annotations

from io import BytesIO
from typing import BinaryIO, Union
from typing import BinaryIO


def compress(src: Union[bytes, BinaryIO]) -> bytes:
def compress(src: bytes | BinaryIO) -> bytes:
"""Sevenbit compress from a file-like object or bytes.

Args:
Expand Down Expand Up @@ -37,7 +39,7 @@ def compress(src: Union[bytes, BinaryIO]) -> bytes:
return bytes(dst)


def decompress(src: Union[bytes, BinaryIO], wide: bool = False) -> bytes:
def decompress(src: bytes | BinaryIO, wide: bool = False) -> bytes:
"""Sevenbit decompress from a file-like object or bytes.

Args:
Expand Down
23 changes: 13 additions & 10 deletions dissect/util/cpio.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import stat
import struct
import tarfile
Expand Down Expand Up @@ -38,7 +40,7 @@ class CpioInfo(tarfile.TarInfo):
"""

@classmethod
def fromtarfile(cls, tarfile: tarfile.TarFile) -> tarfile.TarInfo:
def fromtarfile(cls, tarfile: tarfile.TarFile) -> CpioInfo:
if tarfile.format not in (
FORMAT_CPIO_BIN,
FORMAT_CPIO_ODC,
Expand All @@ -64,7 +66,7 @@ def fromtarfile(cls, tarfile: tarfile.TarFile) -> tarfile.TarInfo:
return obj._proc_member(tarfile)

@classmethod
def frombuf(cls, buf: bytes, format: int, encoding: str, errors: str) -> tarfile.TarInfo:
def frombuf(cls, buf: bytes, format: int, encoding: str, errors: str) -> CpioInfo:
if format in (FORMAT_CPIO_BIN, FORMAT_CPIO_ODC, FORMAT_CPIO_HPBIN, FORMAT_CPIO_HPODC):
obj = cls._old_frombuf(buf, format)
elif format in (FORMAT_CPIO_NEWC, FORMAT_CPIO_CRC):
Expand All @@ -78,7 +80,7 @@ def frombuf(cls, buf: bytes, format: int, encoding: str, errors: str) -> tarfile
return obj

@classmethod
def _old_frombuf(cls, buf: bytes, format: int):
def _old_frombuf(cls, buf: bytes, format: int) -> CpioInfo:
if format in (FORMAT_CPIO_BIN, FORMAT_CPIO_HPBIN):
values = list(struct.unpack("<13H", buf))
if values[0] == _swap16(CPIO_MAGIC_OLD):
Expand Down Expand Up @@ -131,7 +133,7 @@ def _old_frombuf(cls, buf: bytes, format: int):
return obj

@classmethod
def _new_frombuf(cls, buf: bytes, format: int):
def _new_frombuf(cls, buf: bytes, format: int) -> CpioInfo:
values = struct.unpack("<6s8s8s8s8s8s8s8s8s8s8s8s8s8s", buf)
values = [int(values[0], 8)] + [int(v, 16) for v in values[1:]]
if values[0] not in (CPIO_MAGIC_NEW, CPIO_MAGIC_CRC):
Expand All @@ -157,7 +159,7 @@ def _new_frombuf(cls, buf: bytes, format: int):

return obj

def _proc_member(self, tarfile: tarfile.TarFile) -> tarfile.TarInfo:
def _proc_member(self, tarfile: tarfile.TarFile) -> CpioInfo | None:
self.name = tarfile.fileobj.read(self.namesize - 1).decode(tarfile.encoding, tarfile.errors)
if self.name == "TRAILER!!!":
# The last entry in a cpio file has the special name ``TRAILER!!!``, indicating the end of the archive
Expand All @@ -177,10 +179,11 @@ def _proc_member(self, tarfile: tarfile.TarFile) -> tarfile.TarInfo:
def _round_word(self, offset: int) -> int:
if self.format in (FORMAT_CPIO_BIN, FORMAT_CPIO_HPBIN):
return (offset + 1) & ~0x01
elif self.format in (FORMAT_CPIO_NEWC, FORMAT_CPIO_CRC):

if self.format in (FORMAT_CPIO_NEWC, FORMAT_CPIO_CRC):
return (offset + 3) & ~0x03
else:
return offset

return offset

def issocket(self) -> bool:
"""Return True if it is a socket."""
Expand Down Expand Up @@ -211,13 +214,13 @@ def _swap16(value: int) -> int:
return ((value & 0xFF) << 8) | (value >> 8)


def CpioFile(*args, **kwargs):
def CpioFile(*args, **kwargs) -> tarfile.TarFile:
"""Utility wrapper around ``tarfile.TarFile`` to easily open cpio archives."""
kwargs.setdefault("format", FORMAT_CPIO_UNKNOWN)
return tarfile.TarFile(*args, **kwargs, tarinfo=CpioInfo)


def open(*args, **kwargs):
def open(*args, **kwargs) -> tarfile.TarFile:
"""Utility wrapper around ``tarfile.open`` to easily open cpio archives."""
kwargs.setdefault("format", FORMAT_CPIO_UNKNOWN)
return tarfile.open(*args, **kwargs, tarinfo=CpioInfo)
2 changes: 1 addition & 1 deletion dissect/util/encoding/surrogateescape.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import codecs


def error_handler(error):
def error_handler(error: Exception) -> tuple[str, int]:

Check warning on line 4 in dissect/util/encoding/surrogateescape.py

View check run for this annotation

Codecov / codecov/patch

dissect/util/encoding/surrogateescape.py#L4

Added line #L4 was not covered by tests
if not isinstance(error, UnicodeDecodeError):
raise error

Expand Down
10 changes: 6 additions & 4 deletions dissect/util/feature.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

import functools
import os
from enum import Enum
from typing import Callable, Optional
from typing import Callable, NoReturn


# Register feature flags in a central place to avoid chaos
Expand Down Expand Up @@ -43,7 +45,7 @@ def parse_blob():
return feature in feature_flags()


def feature(flag: Feature, alternative: Optional[Callable] = None) -> Callable:
def feature(flag: Feature, alternative: Callable | None = None) -> Callable:
"""Feature flag decorator allowing you to guard a function behind a feature flag.

Usage::
Expand All @@ -57,7 +59,7 @@ def my_func( ... ) -> ...

if alternative is None:

def alternative():
def alternative() -> NoReturn:
raise FeatureException(
"\n".join(
[
Expand All @@ -68,7 +70,7 @@ def alternative():
)
)

def decorator(func):
def decorator(func: Callable) -> Callable:
return func if feature_enabled(flag) else alternative

return decorator
Loading