diff --git a/volatility3/framework/plugins/linux/malware/process_spoofing.py b/volatility3/framework/plugins/linux/malware/process_spoofing.py new file mode 100644 index 0000000000..941f1495ea --- /dev/null +++ b/volatility3/framework/plugins/linux/malware/process_spoofing.py @@ -0,0 +1,287 @@ +# This file is Copyright 2025 Volatility Foundation and licensed under the Volatility Software License 1.0 +# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 +# + +import logging +from pathlib import PurePosixPath +from typing import Optional, Tuple, Iterator + +from volatility3.framework import exceptions, interfaces, renderers +from volatility3.framework.configuration import requirements +from volatility3.framework.interfaces import plugins +from volatility3.framework.objects import utility +from volatility3.framework.symbols import linux +from volatility3.plugins.linux import pslist + +vollog = logging.getLogger(__name__) + + +# https://github.com/SolitudePy/linux-mal +class ProcessSpoofing(plugins.PluginInterface): + """Detects process spoofing by comparing executable path to cmdline & comm fields""" + + _required_framework_version = (2, 27, 0) + _version = (1, 1, 0) + + @classmethod + def get_requirements(cls): + return [ + requirements.ModuleRequirement( + name="kernel", + description="Linux kernel", + architectures=["Intel32", "Intel64"], + ), + requirements.VersionRequirement( + name="pslist", component=pslist.PsList, version=(4, 0, 0) + ), + requirements.VersionRequirement( + name="linuxutils", component=linux.LinuxUtilities, version=(2, 0, 0) + ), + requirements.ListRequirement( + name="pid", + description="Filter on specific process IDs", + element_type=int, + optional=True, + ), + ] + + @classmethod + def get_executable_path( + cls, + context: interfaces.context.ContextInterface, + task: interfaces.objects.ObjectInterface, + ) -> Optional[str]: + """ + Extract the executable path from task_struct.mm.exe_file + + Args: + context: The context to operate on + task: task_struct object of the process + + Returns: + Returns executable path or None if not available + """ + + try: + mm = task.mm + except (exceptions.InvalidAddressException, AttributeError) as e: + vollog.debug(f"Unable to access mm for task at {task.vol.offset:#x}: {e}") + return None + + if not mm or not mm.is_readable(): + # Kernel threads don't have mm struct + return None + + try: + exe_file = mm.exe_file + except (exceptions.InvalidAddressException, AttributeError) as e: + vollog.debug( + f"Unable to access exe_file for task at {task.vol.offset:#x}: {e}" + ) + return None + + if not exe_file or not exe_file.is_readable(): + return None + + try: + exe_path = linux.LinuxUtilities.path_for_file(context, task, exe_file) + except (exceptions.InvalidAddressException, AttributeError) as e: + vollog.debug( + f"Unable to read exe_file path for task at {task.vol.offset:#x}: {e}" + ) + return None + + return exe_path + + @classmethod + def get_cmdline_basename( + cls, + context: interfaces.context.ContextInterface, + task: interfaces.objects.ObjectInterface, + ) -> Optional[str]: + """ + Extract the command line arguments and return the basename of the first argument + + Args: + context: The context to operate on + task: task_struct object of the process + + Returns: + Basename of the first command line argument or None if not available + """ + mm = task.mm + if not mm or not mm.is_readable(): + return None + + proc_layer_name = task.add_process_layer() + if proc_layer_name is None: + return None + + proc_layer = context.layers[proc_layer_name] + start = task.mm.arg_start + size_to_read = task.mm.arg_end - task.mm.arg_start + + if not (0 < size_to_read <= 4096): + return None + + try: + argv = proc_layer.read(start, size_to_read) + except exceptions.InvalidAddressException as e: + vollog.debug( + f"Unable to read cmdline for task at {task.vol.offset:#x}: {e}" + ) + return None + + # Parse the arguments - they are null byte terminated + args_str = argv.decode(encoding="utf8", errors="replace") + args_list = args_str.split("\x00") + if args_list and args_list[0]: + basename = PurePosixPath(args_list[0]).name + return basename + else: + return None + + @classmethod + def get_comm(cls, task: interfaces.objects.ObjectInterface) -> Optional[str]: + """ + Extract the comm field from task_struct + + Args: + task: task_struct object of the process + + Returns: + Process name from comm field or None if not available + """ + try: + return utility.array_to_string(task.comm) + except (exceptions.InvalidAddressException, AttributeError) as e: + vollog.debug(f"Unable to read comm for task at {task.vol.offset:#x}: {e}") + return None + + @classmethod + def extract_process_names( + cls, + context: interfaces.context.ContextInterface, + task: interfaces.objects.ObjectInterface, + ) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str], bool]: + """ + Extract all process name sources for comparison + + Returns: + Tuple of (exe_path, exe_basename, cmdline_basename, comm) + """ + exe_path = cls.get_executable_path(context, task) + exe_basename = PurePosixPath(exe_path).name if exe_path else None + if exe_basename and exe_basename.endswith(" (deleted)"): + exe_basename = exe_basename[: -len(" (deleted)")] + cmdline_basename = cls.get_cmdline_basename(context, task) + comm = cls.get_comm(task) + + return exe_path, exe_basename, cmdline_basename, comm + + def _detect_spoofing( + self, + exe_basename: Optional[str], + cmdline_basename: Optional[str], + comm: Optional[str], + ) -> Tuple[bool, bool]: + """ + Analyze the three name sources to detect potential spoofing + + Args: + exe_basename: Basename from exe_file path + cmdline_basename: Basename from command line + comm: Name from comm field + + Returns: + Tuple of (cmdline_spoofed, comm_spoofed) boolean flags + """ + # Skip kernel threads - need at least 2 sources for comparison + available_sources = sum( + 1 for name in [exe_basename, cmdline_basename, comm] if name + ) + if available_sources < 2: + return False, False + + # Check for cmdline spoofing + cmdline_spoofed = False + if exe_basename and cmdline_basename: + cmdline_spoofed = exe_basename != cmdline_basename + + # Check for comm spoofing (comm is truncated to 15 characters) + comm_spoofed = False + if exe_basename and comm: + comm_spoofed = exe_basename[:15] != comm + + return cmdline_spoofed, comm_spoofed + + def _generator(self, tasks) -> Iterator[Tuple[int, Tuple]]: + """ + Generate process spoofing detection results + + Args: + tasks: Iterator of task_struct objects + + Yields: + Tuple containing process information and spoofing analysis + """ + for task in tasks: + try: + pid = task.pid + ppid = task.get_parent_pid() + + exe_path, exe_basename, cmdline_basename, comm = ( + self.extract_process_names(self.context, task) + ) + + cmdline_spoofed, comm_spoofed = self._detect_spoofing( + exe_basename, cmdline_basename, comm + ) + + is_deleted = exe_path.endswith(" (deleted)") if exe_path else False + + # Convert None values to strings for TreeGrid compatibility + exe_path_render = exe_path if exe_path else "N/A" + cmdline_render = cmdline_basename if cmdline_basename else "N/A" + comm_render = comm if comm else "N/A" + + yield ( + 0, + ( + pid, + ppid, + exe_path_render, + cmdline_render, + comm_render, + cmdline_spoofed, + comm_spoofed, + is_deleted, + ), + ) + + except (exceptions.InvalidAddressException, AttributeError) as e: + vollog.warning( + f"Unable to process task PID {getattr(task, 'pid', 'unknown')} at {task.vol.offset:#x}: {e}" + ) + continue + + def run(self): + filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None)) + + return renderers.TreeGrid( + [ + ("PID", int), + ("PPID", int), + ("Exe_Path", str), + ("Cmdline_Basename", str), + ("Comm", str), + ("Cmdline_Spoofed", bool), + ("Comm_Spoofed", bool), + ("Exe_Deleted", bool), + ], + self._generator( + pslist.PsList.list_tasks( + self.context, self.config["kernel"], filter_func=filter_func + ) + ), + )