diff --git a/dissect/target/plugins/os/windows/catroot.py b/dissect/target/plugins/os/windows/catroot.py index 47dd35c0ec..e775e59525 100644 --- a/dissect/target/plugins/os/windows/catroot.py +++ b/dissect/target/plugins/os/windows/catroot.py @@ -1,4 +1,8 @@ -from asn1crypto import algos, core +from typing import Iterator, Optional + +from asn1crypto.cms import ContentInfo +from asn1crypto.core import Sequence +from dissect.esedb import EseDB from flow.record.fieldtypes import digest from dissect.target.exceptions import UnsupportedPluginError @@ -6,21 +10,27 @@ from dissect.target.plugin import Plugin, export HINT_NEEDLE = b"\x1e\x08\x00H\x00i\x00n\x00t" -MD5_NEEDLE = b"\x06\x08\x2a\x86\x48\x86\xf7\x0d\x02\x05" -SHA1_NEEDLE = b"\x06\x05\x2b\x0e\x03\x02\x1a" -SHA_GENERIC_NEEDLE = b"\x06\x09\x08\x86\x48\x01\x65\x03\x04\x02" +PACKAGE_NAME_NEEDLE = b"\x06\n+\x06\x01\x04\x01\x827\x0c\x02\x01" +DIGEST_NEEDLES = { + "md5": b"\x06\x08\x2a\x86\x48\x86\xf7\x0d\x02\x05", + "sha1": b"\x06\x05\x2b\x0e\x03\x02\x1a", + "sha_generic": b"\x06\x09\x08\x86\x48\x01\x65\x03\x04\x02", + "sha256": b"\x06\t`\x86H\x01e\x03\x04\x02\x01\x05\x00", +} + CatrootRecord = TargetRecordDescriptor( "windows/catroot", [ ("digest", "digest"), - ("path", "hint"), + ("string[]", "hints"), + ("string", "catroot_name"), ("path", "source"), ], ) -def findall(buf, needle): +def findall(buf: bytes, needle: bytes) -> Iterator[int]: offset = 0 while True: offset = buf.find(needle, offset) @@ -31,27 +41,56 @@ def findall(buf, needle): offset += 1 +def _get_package_name(sequence: Sequence) -> str: + """Parse sequences within a sequence and return the 'PackageName' value if it exists.""" + for value in sequence.native.values(): + # Value is an ordered dict that contains a sequence on index 1 + inner_sequence = Sequence.load(value.get("1")) + # Key value is stored at index 0, value at index 2 + if "PackageName" in inner_sequence[0].native: + return inner_sequence[2].native.decode("utf-16-le").strip("\x00") + + +def find_package_name(hint_buf: bytes) -> Optional[str]: + """Find a sequence that contains the 'PackageName' key and return the value if present.""" + for hint_offset in findall(hint_buf, PACKAGE_NAME_NEEDLE): + # 7, 6 or 5 bytes before the package_name needle, a sequence starts (starts with b"0\x82" or b"0\x81"). + for sequence_needle in [b"0\x82", b"0\x81"]: + if (sequence_offset := hint_buf.find(sequence_needle, hint_offset - 8, hint_offset)) == -1: + continue + + hint_sequence = Sequence.load(hint_buf[sequence_offset:]) + return _get_package_name(hint_sequence) + + class CatrootPlugin(Plugin): """Catroot plugin. Parses catroot files for hashes and file hints. """ + __namespace__ = "catroot" + def __init__(self, target): super().__init__(target) - self.catrootdir = self.target.fs.path("sysvol/windows/system32/catroot") + self.catroot_dir = self.target.fs.path("sysvol/windows/system32/catroot") + self.catroot2_dir = self.target.fs.path("sysvol/windows/system32/catroot2") def check_compatible(self) -> None: - if len(list(self.catrootdir.iterdir())) == 0: - raise UnsupportedPluginError("No catroot dirs found") + if next(self.catroot2_dir.rglob("catdb"), None) is None and next(self.catroot_dir.rglob("*.cat"), None) is None: + raise UnsupportedPluginError("No catroot files or catroot ESE databases found") @export(record=CatrootRecord) - def catroot(self): + def files(self) -> Iterator[CatrootRecord]: """Return the content of the catalog files in the CatRoot folder. A catalog file contains a collection of cryptographic hashes, or thumbprints. These files are generally used to verify the integrity of Windows operating system files, instead of per-file authenticode signatures. + At the moment, parsing catalog files is done on best effort. ``asn1crypto`` is not able to fully parse the + ``encap_content_info``, highly likely because Microsoft uses its own format. Future research should result in + a more resilient and complete implementation of the ``catroot.files`` plugin. + References: - https://www.thewindowsclub.com/catroot-catroot2-folder-reset-windows - https://docs.microsoft.com/en-us/windows-hardware/drivers/install/catalog-files @@ -60,67 +99,134 @@ def catroot(self): hostname (string): The target hostname. domain (string): The target domain. digest (digest): The parsed digest. - hint (path): File hint, if present. - source (path): Source catroot file. + hints (string[]): File hints, if present. + catroot_name (string): Catroot name. + source (path): Source of the catroot record. """ - # So asn1crypt dies when parsing these files, so we kinda bruteforce it - # Look for the object identifiers of various hash types, and parse from there - # We don't do any further checking, just traverse according to a known structure - # If an exception occurs, we're not looking at the right structure. - for d in self.catrootdir.iterdir(): - if not d.is_dir(): + # As far as known, Microsoft uses its own implementation to store the digest in the + # encap_content_info along with an optional file hint. Here we parse the digest values + # ourselves by looking for the corresponding digest needles in the raw encap_content_info + # data. Furthermore, we try to find the file hint if it is present in that same raw data. + for file in self.catroot_dir.rglob("*.cat"): + if not file.is_file(): continue - for f in d.iterdir(): - buf = f.open().read() + try: + buf = file.read_bytes() + + # TODO: Parse other data in the content info + content_info = ContentInfo.load(buf)["content"] + + digest_type = content_info["digest_algorithms"].native[0].get("algorithm") + encap_contents = content_info["encap_content_info"].contents + needle = DIGEST_NEEDLES[digest_type] + + digests = [] + offset = None + for offset in findall(encap_contents, needle): + # 4 bytes before the digest type, a sequence starts + objseq = Sequence.load(encap_contents[offset - 4 :]) + # The second entry in the sequence is the digest string + raw_digest = objseq[1].native + hexdigest = raw_digest.hex() + + file_digest = digest() + if len(hexdigest) == 32: + file_digest.md5 = hexdigest + elif len(hexdigest) == 40: + file_digest.sha1 = hexdigest + elif len(hexdigest) == 64: + file_digest.sha256 = hexdigest + + digests.append(file_digest) + + # Finding the hint in encap_content_info is on best effort. In most of the cases, + # there is a key "PackageName" available. We first try to parse the corresponding + # value if it is present. If this does not succeed, we might be dealing with catroot + # files containing the "hint" needle. + # If both methods do not result in a file hint, there is either no hint available or + # the format is not yet known and therefore not supported. + hints = [] + try: + if offset: + # As far as known, the PackageName data is only present in the encap_content_info + # after the last digest. + hint_buf = encap_contents[offset + len(needle) + len(raw_digest) + 2 :] + + # First try to find to find the "PackageName" value, if it's present. + hint = find_package_name(hint_buf) + if hint: + hints.append(hint) + + # If the package_name needle is not found or it's not present in the first 7 bytes of the hint_buf + # We are probably dealing with a catroot file that contains "hint" needles. + if not hints: + for hint_offset in findall(encap_contents, HINT_NEEDLE): + # Either 3 or 4 bytes before the needle, a sequence starts + bytes_before_needle = 3 if encap_contents[hint_offset - 3] == 48 else 4 + name_sequence = Sequence.load(encap_contents[hint_offset - bytes_before_needle :]) + + hint = name_sequence[2].native.decode("utf-16-le").strip("\x00") + hints.append(hint) + + except Exception as e: + self.target.log.debug("", exc_info=e) + + # Currently, it is not known how the file hints are related to the digests. Therefore, each digest + # is yielded as a record with all of the file hints found. + # TODO: find the correlation between the file hints and the digests in catroot files. + for file_digest in digests: + yield CatrootRecord( + digest=file_digest, + hints=hints, + catroot_name=file.name, + source=file, + _target=self.target, + ) + + except Exception as error: + self.target.log.error("An error occurred while parsing the catroot file %s: %s", file, error) - for needle in [MD5_NEEDLE, SHA1_NEEDLE, SHA_GENERIC_NEEDLE]: - # There's an identifier early on in the file that specifies the hash type for this file - offset = buf.find(needle, 0, 100) - if offset == -1: - continue + @export(record=CatrootRecord) + def catdb(self) -> Iterator[CatrootRecord]: + """Return the hash values present in the catdb files in the catroot2 folder. + + The catdb file is an ESE database file that contains the digests of the catalog files present on the system. + This database is used to speed up the process of validating a Portable Executable (PE) file. + + Note: catalog files can include file hints, however these seem not to be present in the catdb files. + + References: + - https://www.thewindowsclub.com/catroot-catroot2-folder-reset-windows + - https://docs.microsoft.com/en-us/windows-hardware/drivers/install/catalog-files + + Yields CatrootRecords with the following fields: + hostname (string): The target hostname. + domain (string): The target domain. + digest (digest): The parsed digest. + hints (string[]): File hints, if present. + catroot_name (string): Catroot name. + source (path): Source of the catroot record. + """ + for ese_file in self.catroot2_dir.rglob("catdb"): + with ese_file.open("rb") as fh: + ese_db = EseDB(fh) - try: - # Sanity check - algos.DigestAlgorithmId.load(buf[offset:]) - except TypeError: + tables = [table.name for table in ese_db.tables()] + for hash_type, table_name in [("sha256", "HashCatNameTableSHA256"), ("sha1", "HashCatNameTableSHA1")]: + if table_name not in tables: continue - for offset in findall(buf, needle): - try: - digestid = algos.DigestAlgorithmId.load(buf[offset:]) - # 4 bytes before the digest type, a sequence starts - objseq = core.Sequence.load(buf[offset - 4 :]) - # The second entry in the sequence is the digest string - hexdigest = objseq[1].native.hex() - - # Later versions of windows also have a file hint - # Try to find it - digestlen = len(digestid.contents) - hintoffset = buf.find(HINT_NEEDLE, offset + digestlen, offset + digestlen + 64) - - filehint = None - if hintoffset != -1: - try: - file_buf = buf[hintoffset + len(HINT_NEEDLE) + 6 :] - # There's an INTEGER after the Hint BMPString of size 6 - filehint = core.OctetString.load(file_buf).native.decode("utf-16-le") - except Exception: - pass - - fdigest = digest() - if len(hexdigest) == 32: - fdigest.md5 = hexdigest - elif len(hexdigest) == 40: - fdigest.sha1 = hexdigest - elif len(hexdigest) == 64: - fdigest.sha256 = hexdigest + for record in ese_db.table(table_name).records(): + file_digest = digest() + setattr(file_digest, hash_type, record.get("HashCatNameTable_HashCol").hex()) + catroot_names = record.get("HashCatNameTable_CatNameCol").decode().rstrip("|").split("|") + for catroot_name in catroot_names: yield CatrootRecord( - digest=fdigest, - hint=self.target.fs.path(filehint) if filehint else None, - source=f, + digest=file_digest, + hints=None, + catroot_name=catroot_name, + source=ese_file, _target=self.target, ) - except Exception: - continue diff --git a/tests/_data/plugins/os/windows/catroot/catdb b/tests/_data/plugins/os/windows/catroot/catdb new file mode 100644 index 0000000000..cc3a9b4e00 --- /dev/null +++ b/tests/_data/plugins/os/windows/catroot/catdb @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:071ac158c06f1cba8bc235997935038ceec2c1ed21691831b2ba1d5207710fc8 +size 196608 diff --git a/tests/_data/plugins/os/windows/catroot/catroot_file_hint.cat b/tests/_data/plugins/os/windows/catroot/catroot_file_hint.cat new file mode 100644 index 0000000000..205dcaee61 --- /dev/null +++ b/tests/_data/plugins/os/windows/catroot/catroot_file_hint.cat @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:64a5855d7bab31ab5d7dcbb438b14b9ecbb68c6b20278d6aa714ba43613ec6a1 +size 19506 diff --git a/tests/_data/plugins/os/windows/catroot/catroot_package_name.cat b/tests/_data/plugins/os/windows/catroot/catroot_package_name.cat new file mode 100644 index 0000000000..2f3aea615b --- /dev/null +++ b/tests/_data/plugins/os/windows/catroot/catroot_package_name.cat @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:54579846197e2d847d8896b6a487897e0cc9d4f700b4b937ea2c4bba59f8e23a +size 9276 diff --git a/tests/_data/plugins/os/windows/catroot/catroot_package_name_2.cat b/tests/_data/plugins/os/windows/catroot/catroot_package_name_2.cat new file mode 100644 index 0000000000..167dbff791 --- /dev/null +++ b/tests/_data/plugins/os/windows/catroot/catroot_package_name_2.cat @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:dc7eb91e2a34d60191788adfb34e8daecaa76fd5a16ba1c1af5b71eb146c200a +size 8906 diff --git a/tests/plugins/os/windows/test_catroot.py b/tests/plugins/os/windows/test_catroot.py new file mode 100644 index 0000000000..76dd940ef7 --- /dev/null +++ b/tests/plugins/os/windows/test_catroot.py @@ -0,0 +1,118 @@ +import pytest +from flow.record.fieldtypes import digest + +from dissect.target import Target +from dissect.target.filesystem import VirtualFilesystem +from dissect.target.plugins.os.windows.catroot import CatrootPlugin +from tests._utils import absolute_path + + +@pytest.mark.parametrize( + "filename, hashes, file_hints, len_results", + [ + pytest.param( + "catroot_package_name.cat", + [ + "18d4711ffaf619d81c76b9a2375888316cbce6cfde6298b7e2d7165028281cde", + "9333e6100ea011d73bd6d927237d93c284424042d8e339e7bec58e958cf1b18a", + "d6bffeb5833fc17ddc441459f4a90f866a270d10b2261f1775f303ea1e600ede", + ], + ["Microsoft-Windows-PhotoBasic-WOW64-merged-Package"], + 3, + id="PackageName", + ), + pytest.param( + "catroot_package_name_2.cat", + [ + "9504d1e72c0276088ba53d493b869d9dd1da253852823b4f44ea05d2c59e488e", + ], + ["Microsoft-Windows-Printing-PrintToPDFServices-Package"], + 1, + id="PackageName2", + ), + pytest.param( + "catroot_file_hint.cat", + [ + "0469496b538e68ae97b6dc856a16e272830d8b0f8c978254e19226f5a2cdb71e", + "0a3b6d06699313dc2b4fb2ae1e677c416408ade6a1ca022395b3cd92764ddd5d", + "1a4ced5ac0b485f859584743042ff6eef68329b95ae8979b192e9ed46dc4a5cc", + "32e8250fba4d4d24692cca4fa91dfd6659ab811ff37fd071f7875c6178888ba7", + "35b07ee2cf53351917b966283fedebb58ccae21df358223301ba77577c186136", + ], + [ + "msil_multipoint-wms.coll..lecontrol.resources_31bf3856ad364e35_10.0.19041.1_en-us_be33ff08e678d0dc\\Wms.CollapsibleControl.Resources.dll", # noqa + "msil_multipoint-wmsmanager.resources_31bf3856ad364e35_10.0.19041.1_en-us_b4f3be8d3d296eb4\\WmsManager.Resources.dll", # noqa + "msil_multipoint-wms.alertsview.resources_31bf3856ad364e35_10.0.19041.1_en-us_7a57dedfa22c1a6d\\Wms.AlertsView.Resources.dll", # noqa + "msil_multipoint-wmsadminuilibrary.resources_31bf3856ad364e35_10.0.19041.1_en-us_47cfbfac3d8bbe69\\WmsAdminUILibrary.Resources.dll", # noqa + "msil_multipoint-wms.dash..addintabs.resources_31bf3856ad364e35_10.0.19041.1_en-us_f6c7cebefb0b4d85\\Wms.Dashboard.AddinTabs.Resources.dll", # noqa + "msil_multipoint-wmswssgcommon.resources_31bf3856ad364e35_10.0.19041.1_en-us_b7261118c0f1fbfe\\WmsWssgCommon.Resources.dll", # noqa + "msil_multipoint-wms.skuresources.resources_31bf3856ad364e35_10.0.19041.1_en-us_fc4901fade485b61\\Wms.SkuResources.Resources.dll", # noqa + "msil_multipoint-wmsusertab.resources_31bf3856ad364e35_10.0.19041.1_en-us_a64149851e198a87\\WmsUserTab.Resources.dll", # noqa + "msil_multipoint-wmssystemtab.resources_31bf3856ad364e35_10.0.19041.1_en-us_38e614cdb422ddf1\\WmsSystemTab.Resources.dll", # noqa + "msil_multipoint-wms.mmstools.resources_31bf3856ad364e35_10.0.19041.1_en-us_94ce42426fe482ef\\Wms.MMSTools.Resources.dll", # noqa + "msil_multipoint-wmsstatustab.resources_31bf3856ad364e35_10.0.19041.1_en-us_8c4e51c6a0a9be12\\WmsStatusTab.Resources.dll", # noqa + "msil_multipoint-wmsdashboard.resources_31bf3856ad364e35_10.0.19041.1_en-us_f7e7f4de797fc24f\\WmsDashboard.Resources.dll", # noqa + "msil_multipoint-wms.admincommon.resources_31bf3856ad364e35_10.0.19041.1_en-us_7c8a2d4f818abac3\\Wms.AdminCommon.Resources.dll", # noqa + "msil_multipoint-wms.dashboard.forms.resources_31bf3856ad364e35_10.0.19041.1_en-us_3f56c777fba2ec12\\Wms.Dashboard.Forms.Resources.dll", # noqa + "msil_multipoint-wms.dashboardcommon.resources_31bf3856ad364e35_10.0.19041.1_en-us_3c11dbfdda22a912\\Wms.DashboardCommon.Resources.dll", # noqa + ], + 34, + id="FileHint", + ), + ], +) +def test_catroot_files( + target_win: Target, + fs_win: VirtualFilesystem, + filename: str, + hashes: list[str], + file_hints: list[str], + len_results: int, +) -> None: + catroot_file = absolute_path(f"_data/plugins/os/windows/catroot/{filename}") + file_location = f"\\windows\\system32\\catroot\\test\\{filename}" + fs_win.map_file( + file_location, + catroot_file, + ) + + target_win.add_plugin(CatrootPlugin) + + records = list(target_win.catroot.files()) + + assert len(records) == len_results + + sorted_file_hints = sorted(file_hints) + # Make sure the order is constant by sorting on digest + for cat_hash, record in zip(sorted(hashes), sorted(records, key=lambda r: r.digest.sha256)): + assert str(record.source) == "sysvol" + file_location + assert record.catroot_name == filename + assert sorted(record.hints) == sorted_file_hints + assert record.digest.sha256 == cat_hash + + +def test_catroot_catdb(target_win: Target, fs_win: VirtualFilesystem) -> None: + catroot_file = absolute_path("_data/plugins/os/windows/catroot/catdb") + fs_win.map_file("windows/system32/catroot2/{ID}/catdb", catroot_file) + + target_win.add_plugin(CatrootPlugin) + + records = list(target_win.catroot.catdb()) + + hashes = [ + digest({"sha256": "083b9b717253f48ac314e6aa92b88fa775d194420a55f1b3e291ceffbff2377a"}), + digest({"sha1": "fe71c1d4efa330b807ce00dc0b5055f8ab95eb02"}), + ] + + assert len(records) == 2 + + # Make sure the order is constant by sorting on digest + for expected_digest, record in zip( + sorted(hashes, key=lambda d: d.sha1 or d.sha256), + sorted(records, key=lambda r: r.digest.sha1 or r.digest.sha256), + ): + assert record.catroot_name == "Containers-ApplicationGuard-Package~31bf3856ad364e35~amd64~~10.0.19041.1288.cat" + assert record.source == "sysvol\\windows\\system32\\catroot2\\{ID}\\catdb" + assert record.hints == [] + # No direct comparison available, but representation comparison suffices. + assert str(expected_digest) == str(record.digest)