diff --git a/pyplugins/apis/kffi.py b/pyplugins/apis/kffi.py index 6d03cdd9f..57b778d6d 100644 --- a/pyplugins/apis/kffi.py +++ b/pyplugins/apis/kffi.py @@ -34,7 +34,7 @@ """ from penguin import plugins, getColoredLogger, Plugin -from wrappers.ctypes_wrap import Ptr, VtypeJsonGroup +from wrappers.ctypes_wrap import Ptr, VtypeJsonGroup, BoundTypeInstance, BoundArrayView from os.path import join, realpath, isfile from wrappers.generic import Wrapper import functools @@ -54,8 +54,6 @@ class KFFI(Plugin): ~~~~~~~ - ``call_function``: Call a kernel function with arguments. - - ``read_kernel_memory``: Read bytes from kernel memory. - - ``write_kernel_memory``: Write bytes to kernel memory. """ def __init__(self) -> None: @@ -87,8 +85,9 @@ def __init_tramp_functionality(self): # Register trampoline hit hypercall handler from hyper.consts import igloo_hypercall_constants as iconsts self.portal = plugins.portal - self._on_tramp_hit_hypercall = self.portal.wrap(self._on_tramp_hit_hypercall) - self.panda.hypercall(iconsts.IGLOO_HYP_TRAMP_HIT)(self._on_tramp_hit_hypercall) + self._on_tramp_hit_hypercall = \ + self.panda.hypercall(iconsts.IGLOO_HYP_TRAMP_HIT)( + self.portal.wrap(self._on_tramp_hit_hypercall)) # Register with portal's interrupt handler system self.portal.register_interrupt_handler( @@ -591,26 +590,74 @@ def _on_tramp_hit_hypercall(self, cpu): callback, num_args = entry self.logger.debug(f"Invoking trampoline callback for id={tramp_id}: {getattr(callback, '__name__', repr(callback))}") - try: - pt_regs_raw = yield from self.read_type(pt_regs_addr, "pt_regs") - pt_regs = get_pt_regs_wrapper(self.panda, pt_regs_raw) - original_bytes = pt_regs.to_bytes()[:] + pt_regs_raw = yield from self.read_type(pt_regs_addr, "pt_regs") + pt_regs = get_pt_regs_wrapper(self.panda, pt_regs_raw) + original_bytes = pt_regs.to_bytes()[:] + # Get args from pt_regs + if num_args > 1: # Get args from pt_regs - if num_args > 1: - # Get args from pt_regs - args = yield from pt_regs.get_args_portal(num_args - 1, convention="userland") - else: - args = [] - # Call callback with pt_regs and args - result = callback(pt_regs, *args) - if isinstance(result, Iterator): - result = yield from result - # If callback returns int, set as return value - if isinstance(result, int): - pt_regs.set_retval(result) - new = pt_regs.to_bytes() - if original_bytes != new: - yield from plugins.mem.write_bytes(pt_regs_addr, new) - - except Exception as e: - self.logger.error(f"Error in trampoline callback {callback.__name__}: {e}") + args = yield from pt_regs.get_args_portal(num_args - 1, convention="userland") + else: + args = [] + # Call callback with pt_regs and args + result = callback(pt_regs, *args) + if isinstance(result, Iterator): + result = yield from result + # If callback returns int, set as return value + if isinstance(result, int): + pt_regs.set_retval(result) + new = pt_regs.to_bytes() + if original_bytes != new: + yield from plugins.mem.write_bytes(pt_regs_addr, new) + + def write_struct(self, addr: Union[int, Ptr], instance: BoundTypeInstance): + if isinstance(addr, Ptr): + addr = addr.address + + data = instance.to_bytes() + yield from plugins.mem.write_bytes(addr, data) + + def string(self, instance: Union[BoundArrayView, BoundTypeInstance, bytes, bytearray]) -> str: + """ + Extract a string from the buffer at the instance's offset. + Always returns a best-effort string, never raises. + """ + try: + # Raw bytes or bytearray + if isinstance(instance, (bytes, bytearray)): + return instance.decode('latin-1', errors='replace').split('\x00', 1)[0] + # BoundArrayView: get bytes from array elements + if type(instance).__name__ == "BoundArrayView": + arr_bytes = bytearray() + try: + for i in range(len(instance)): + val = instance[i] + if hasattr(val, '_value'): + arr_bytes.append(int(val._value)) + elif isinstance(val, int): + arr_bytes.append(val) + else: + break + if arr_bytes[-1] == 0: + break + except Exception: + pass + return arr_bytes.decode('latin-1', errors='replace').split('\x00', 1)[0] + # BoundTypeInstance: get bytes from buffer at offset, up to size + if type(instance).__name__ == "BoundTypeInstance": + buf = getattr(instance, "_instance_buffer", None) + offset = getattr(instance, "_instance_offset", 0) + size = getattr(instance._instance_type_def, "size", None) + if buf is not None and size is not None: + raw = buf[offset:offset+size] + return raw.decode('latin-1', errors='replace').split('\x00', 1)[0] + # fallback: try to_bytes + try: + raw = instance.to_bytes() + return raw.decode('latin-1', errors='replace').split('\x00', 1)[0] + except Exception: + pass + # fallback: str() + return str(instance) + except Exception: + return "" diff --git a/pyplugins/apis/mem.py b/pyplugins/apis/mem.py index abbd554a6..4e089c511 100644 --- a/pyplugins/apis/mem.py +++ b/pyplugins/apis/mem.py @@ -40,6 +40,7 @@ from hyper.portal import PortalCmd from struct import pack, unpack from typing import Optional, List, Union, Any, Generator +from wrappers.ctypes_wrap import Ptr class Mem(Plugin): @@ -65,7 +66,7 @@ def __init__(self) -> None: self.endian_format = '<' if self.panda.endianness == 'little' else '>' self.try_panda = True if self.panda.arch != "riscv64" else False - def write_bytes(self, addr: int, data: bytes, + def write_bytes(self, addr: Union[int, Ptr], data: bytes, pid: Optional[int] = None) -> Generator[Any, Any, int]: """ Write bytes to guest memory. @@ -74,7 +75,7 @@ def write_bytes(self, addr: int, data: bytes, Parameters ---------- - addr : int + addr : int or Ptr Address to write to. data : bytes Data to write. @@ -86,6 +87,8 @@ def write_bytes(self, addr: int, data: bytes, int Number of bytes written. """ + if isinstance(addr, Ptr): + addr = addr.address self.logger.debug( f"write_bytes called: addr={addr}, data_len={len(data)}") cpu = None @@ -113,7 +116,7 @@ def write_bytes(self, addr: int, data: bytes, self.logger.debug(f"Total bytes written: {len(data)}") return len(data) - def read_bytes(self, addr: int, size: int, + def read_bytes(self, addr: Union[int, Ptr], size: int, pid: Optional[int] = None) -> Generator[Any, Any, bytes]: """ Read bytes from guest memory. @@ -122,7 +125,7 @@ def read_bytes(self, addr: int, size: int, Parameters ---------- - addr : int + addr : int or Ptr Address to read from. size : int Number of bytes to read. @@ -134,6 +137,8 @@ def read_bytes(self, addr: int, size: int, bytes Data read from memory. """ + if isinstance(addr, Ptr): + addr = addr.address self.logger.debug(f"read_bytes called: addr={addr}, size={size}") data = b"" cpu = None @@ -173,7 +178,7 @@ def read_bytes(self, addr: int, size: int, self.logger.debug(f"Total bytes read: {len(data)}") return data - def read_str(self, addr: int, + def read_str(self, addr: Union[int, Ptr], pid: Optional[int] = None) -> Generator[Any, Any, str]: """ Read a null-terminated string from guest memory. @@ -182,7 +187,7 @@ def read_str(self, addr: int, Parameters ---------- - addr : int + addr : int or Ptr Address to read from. pid : int, optional Process ID for context. @@ -192,6 +197,8 @@ def read_str(self, addr: int, str String read from memory. """ + if isinstance(addr, Ptr): + addr = addr.address if addr != 0: self.logger.debug(f"read_str called: addr={addr:#x}") if self.try_panda: @@ -206,7 +213,7 @@ def read_str(self, addr: int, return chunk.decode('latin-1', errors='replace') return "" - def read_int(self, addr: int, + def read_int(self, addr: Union[int, Ptr], pid: Optional[int] = None) -> Generator[Any, Any, Optional[int]]: """ Read a 4-byte integer from guest memory. @@ -215,7 +222,7 @@ def read_int(self, addr: int, Parameters ---------- - addr : int + addr : int or Ptr Address to read from. pid : int, optional Process ID for context. @@ -225,6 +232,8 @@ def read_int(self, addr: int, int or None Integer value read, or None on failure. """ + if isinstance(addr, Ptr): + addr = addr.address self.logger.debug(f"read_int called: addr={addr}") data = yield from self.read_bytes(addr, 4, pid) if len(data) != 4: @@ -236,7 +245,7 @@ def read_int(self, addr: int, return value def read_long( - self, addr: int, pid: Optional[int] = None) -> Generator[Any, Any, Optional[int]]: + self, addr: Union[int, Ptr], pid: Optional[int] = None) -> Generator[Any, Any, Optional[int]]: """ Read an 8-byte long integer from guest memory. @@ -244,7 +253,7 @@ def read_long( Parameters ---------- - addr : int + addr : int or Ptr Address to read from. pid : int, optional Process ID for context. @@ -254,6 +263,8 @@ def read_long( int or None Long value read, or None on failure. """ + if isinstance(addr, Ptr): + addr = addr.address self.logger.debug(f"read_long called: addr={addr}") data = yield from self.read_bytes(addr, 8, pid) if len(data) != 8: @@ -264,7 +275,7 @@ def read_long( self.logger.debug(f"Long read successfully: value={value}") return value - def read_ptr(self, addr: int, + def read_ptr(self, addr: Union[int, Ptr], pid: Optional[int] = None) -> Generator[Any, Any, Optional[int]]: """ Read a pointer-sized value from guest memory. @@ -273,7 +284,7 @@ def read_ptr(self, addr: int, Parameters ---------- - addr : int + addr : int or Ptr Address to read from. pid : int, optional Process ID for context. @@ -283,6 +294,8 @@ def read_ptr(self, addr: int, int or None Pointer value read, or None on failure. """ + if isinstance(addr, Ptr): + addr = addr.address if self.panda.bits == 32: ptr = yield from self.read_int(addr, pid) elif self.panda.bits == 64: @@ -291,7 +304,7 @@ def read_ptr(self, addr: int, raise Exception("read_ptr: Could not determine bits") return ptr - def write_int(self, addr: int, value: int, + def write_int(self, addr: Union[int, Ptr], value: int, pid: Optional[int] = None) -> Generator[Any, Any, int]: """ Write a 4-byte integer to guest memory. @@ -300,7 +313,7 @@ def write_int(self, addr: int, value: int, Parameters ---------- - addr : int + addr : int or Ptr Address to write to. value : int Integer value to write. @@ -312,6 +325,8 @@ def write_int(self, addr: int, value: int, int Number of bytes written. """ + if isinstance(addr, Ptr): + addr = addr.address self.logger.debug(f"write_int called: addr={addr}, value={value}") # Pack the integer according to system endianness data = pack(f"{self.endian_format}I", value) @@ -319,7 +334,7 @@ def write_int(self, addr: int, value: int, self.logger.debug(f"Integer written successfully: {value}") return bytes_written - def write_long(self, addr: int, value: int, + def write_long(self, addr: Union[int, Ptr], value: int, pid: Optional[int] = None) -> Generator[Any, Any, int]: """ Write an 8-byte long integer to guest memory. @@ -328,7 +343,7 @@ def write_long(self, addr: int, value: int, Parameters ---------- - addr : int + addr : int or Ptr Address to write to. value : int Long value to write. @@ -340,6 +355,8 @@ def write_long(self, addr: int, value: int, int Number of bytes written. """ + if isinstance(addr, Ptr): + addr = addr.address self.logger.debug(f"write_long called: addr={addr}, value={value}") # Pack the long according to system endianness data = pack(f"{self.endian_format}Q", value) @@ -347,7 +364,7 @@ def write_long(self, addr: int, value: int, self.logger.debug(f"Long written successfully: {value}") return bytes_written - def write_ptr(self, addr: int, value: int, + def write_ptr(self, addr: Union[int, Ptr], value: int, pid: Optional[int] = None) -> Generator[Any, Any, None]: """ Write a pointer-sized value to guest memory. @@ -356,7 +373,7 @@ def write_ptr(self, addr: int, value: int, Parameters ---------- - addr : int + addr : int or Ptr Address to write to. value : int Pointer value to write. @@ -367,6 +384,8 @@ def write_ptr(self, addr: int, value: int, ------- None """ + if isinstance(addr, Ptr): + addr = addr.address if self.panda.bits == 32: yield from self.write_int(addr, value, pid) elif self.panda.bits == 64: @@ -374,7 +393,7 @@ def write_ptr(self, addr: int, value: int, else: raise Exception("read_ptr: Could not determine bits") - def write_str(self, addr: int, string: Union[str, bytes], null_terminate: bool = True, + def write_str(self, addr: Union[int, Ptr], string: Union[str, bytes], null_terminate: bool = True, pid: Optional[int] = None) -> Generator[Any, Any, int]: """ Write a string to guest memory. @@ -383,7 +402,7 @@ def write_str(self, addr: int, string: Union[str, bytes], null_terminate: bool = Parameters ---------- - addr : int + addr : int or Ptr Address to write to. string : str or bytes String to write. @@ -397,6 +416,8 @@ def write_str(self, addr: int, string: Union[str, bytes], null_terminate: bool = int Number of bytes written. """ + if isinstance(addr, Ptr): + addr = addr.address self.logger.debug( f"write_str called: addr={addr}, string_len={len(string)}") # Convert string to bytes @@ -413,7 +434,7 @@ def write_str(self, addr: int, string: Union[str, bytes], null_terminate: bool = self.logger.debug(f"String written successfully: {len(data)} bytes") return bytes_written - def read_ptrlist(self, addr: int, length: int, + def read_ptrlist(self, addr: Union[int, Ptr], length: int, pid: Optional[int] = None) -> Generator[Any, Any, List[int]]: """ Read a list of pointer values from guest memory. @@ -422,7 +443,7 @@ def read_ptrlist(self, addr: int, length: int, Parameters ---------- - addr : int + addr : int or Ptr Address to start reading from. length : int Maximum number of pointers to read. @@ -434,6 +455,8 @@ def read_ptrlist(self, addr: int, length: int, list of int List of pointer values. """ + if isinstance(addr, Ptr): + addr = addr.address ptrs = [] ptrsize = int(self.panda.bits / 8) for start in range(length): @@ -443,7 +466,7 @@ def read_ptrlist(self, addr: int, length: int, ptrs.append(ptr) return ptrs - def read_char_ptrlist(self, addr: int, length: int, + def read_char_ptrlist(self, addr: Union[int, Ptr], length: int, pid: Optional[int] = None) -> Generator[Any, Any, List[str]]: """ Read a list of null-terminated strings from a list of pointers. @@ -452,7 +475,7 @@ def read_char_ptrlist(self, addr: int, length: int, Parameters ---------- - addr : int + addr : int or Ptr Address to start reading pointer list from. length : int Maximum number of pointers to read. @@ -464,6 +487,8 @@ def read_char_ptrlist(self, addr: int, length: int, list of str List of strings read from memory. """ + if isinstance(addr, Ptr): + addr = addr.address ptrlist = yield from self.read_ptrlist(addr, length, pid) vals = [] for start in range(len(ptrlist)): @@ -471,7 +496,7 @@ def read_char_ptrlist(self, addr: int, length: int, vals.append(strs) return vals - def read_int_array(self, addr: int, count: int, + def read_int_array(self, addr: Union[int, Ptr], count: int, pid: Optional[int] = None) -> Generator[Any, Any, List[int]]: """ Read an array of 4-byte integers from guest memory. @@ -480,7 +505,7 @@ def read_int_array(self, addr: int, count: int, Parameters ---------- - addr : int + addr : int or Ptr Address to start reading from. count : int Number of integers to read. @@ -492,6 +517,8 @@ def read_int_array(self, addr: int, count: int, list of int List of integers read from memory. """ + if isinstance(addr, Ptr): + addr = addr.address data = yield from self.read_bytes(addr, 4 * count, pid) if len(data) != 4 * count: self.logger.error( @@ -500,7 +527,7 @@ def read_int_array(self, addr: int, count: int, return list(unpack(f"{self.endian_format}{count}I", data)) def write_int_array( - self, addr: int, values: List[int], pid: Optional[int] = None) -> Generator[Any, Any, int]: + self, addr: Union[int, Ptr], values: List[int], pid: Optional[int] = None) -> Generator[Any, Any, int]: """ Write an array of 4-byte integers to guest memory. @@ -508,7 +535,7 @@ def write_int_array( Parameters ---------- - addr : int + addr : int or Ptr Address to start writing to. values : list of int List of integers to write. @@ -520,10 +547,12 @@ def write_int_array( int Number of bytes written. """ + if isinstance(addr, Ptr): + addr = addr.address data = pack(f"{self.endian_format}{len(values)}I", *values) return (yield from self.write_bytes(addr, data, pid)) - def read_long_array(self, addr: int, count: int, + def read_long_array(self, addr: Union[int, Ptr], count: int, pid: Optional[int] = None) -> Generator[Any, Any, List[int]]: """ Read an array of 8-byte long integers from guest memory. @@ -532,7 +561,7 @@ def read_long_array(self, addr: int, count: int, Parameters ---------- - addr : int + addr : int or Ptr Address to start reading from. count : int Number of longs to read. @@ -544,6 +573,8 @@ def read_long_array(self, addr: int, count: int, list of int List of long integers read from memory. """ + if isinstance(addr, Ptr): + addr = addr.address data = yield from self.read_bytes(addr, 8 * count, pid) if len(data) != 8 * count: self.logger.error( @@ -551,7 +582,7 @@ def read_long_array(self, addr: int, count: int, return [] return list(unpack(f"{self.endian_format}{count}Q", data)) - def read_uint64_array(self, addr: int, count: int, pid: Optional[int] = None) -> Generator[Any, Any, List[int]]: + def read_uint64_array(self, addr: Union[int, Ptr], count: int, pid: Optional[int] = None) -> Generator[Any, Any, List[int]]: """ Read an array of 8-byte unsigned integers from guest memory. @@ -559,7 +590,7 @@ def read_uint64_array(self, addr: int, count: int, pid: Optional[int] = None) -> Parameters ---------- - addr : int + addr : int or Ptr Address to start reading from. count : int Number of uint64s to read. @@ -571,6 +602,8 @@ def read_uint64_array(self, addr: int, count: int, pid: Optional[int] = None) -> list of int List of uint64s read from memory. """ + if isinstance(addr, Ptr): + addr = addr.address data = yield from self.read_bytes(addr, 8 * count, pid) if len(data) != 8 * count: self.logger.error( @@ -579,7 +612,7 @@ def read_uint64_array(self, addr: int, count: int, pid: Optional[int] = None) -> return list(unpack(f"{self.endian_format}{count}Q", data)) def read_utf8_str( - self, addr: int, pid: Optional[int] = None) -> Generator[Any, Any, str]: + self, addr: Union[int, Ptr], pid: Optional[int] = None) -> Generator[Any, Any, str]: """ Read a null-terminated UTF-8 string from guest memory. @@ -587,7 +620,7 @@ def read_utf8_str( Parameters ---------- - addr : int + addr : int or Ptr Address to read from. pid : int, optional Process ID for context. @@ -597,6 +630,8 @@ def read_utf8_str( str UTF-8 string read from memory. """ + if isinstance(addr, Ptr): + addr = addr.address if addr != 0: self.logger.debug(f"read_utf8_str called: addr={addr:#x}") if self.try_panda: @@ -612,7 +647,7 @@ def read_utf8_str( return "" def read_byte( - self, addr: int, pid: Optional[int] = None) -> Generator[Any, Any, Optional[int]]: + self, addr: Union[int, Ptr], pid: Optional[int] = None) -> Generator[Any, Any, Optional[int]]: """ Read a single byte from guest memory. @@ -620,7 +655,7 @@ def read_byte( Parameters ---------- - addr : int + addr : int or Ptr Address to read from. pid : int, optional Process ID for context. @@ -630,13 +665,15 @@ def read_byte( int or None Byte value read (0-255), or None on failure. """ + if isinstance(addr, Ptr): + addr = addr.address data = yield from self.read_bytes(addr, 1, pid) if len(data) != 1: self.logger.error(f"Failed to read byte at addr={addr}") return None return data[0] - def write_byte(self, addr: int, value: int, + def write_byte(self, addr: Union[int, Ptr], value: int, pid: Optional[int] = None) -> Generator[Any, Any, int]: """ Write a single byte to guest memory. @@ -645,7 +682,7 @@ def write_byte(self, addr: int, value: int, Parameters ---------- - addr : int + addr : int or Ptr Address to write to. value : int Byte value to write (0-255). @@ -657,11 +694,13 @@ def write_byte(self, addr: int, value: int, int Number of bytes written (should be 1). """ + if isinstance(addr, Ptr): + addr = addr.address data = bytes([value & 0xFF]) return (yield from self.write_bytes(addr, data, pid)) def read_word( - self, addr: int, pid: Optional[int] = None) -> Generator[Any, Any, Optional[int]]: + self, addr: Union[int, Ptr], pid: Optional[int] = None) -> Generator[Any, Any, Optional[int]]: """ Read a 2-byte word from guest memory. @@ -669,7 +708,7 @@ def read_word( Parameters ---------- - addr : int + addr : int or Ptr Address to read from. pid : int, optional Process ID for context. @@ -679,13 +718,15 @@ def read_word( int or None Word value read, or None on failure. """ + if isinstance(addr, Ptr): + addr = addr.address data = yield from self.read_bytes(addr, 2, pid) if len(data) != 2: self.logger.error(f"Failed to read word at addr={addr}") return None return unpack(f"{self.endian_format}H", data)[0] - def write_word(self, addr: int, value: int, + def write_word(self, addr: Union[int, Ptr], value: int, pid: Optional[int] = None) -> Generator[Any, Any, int]: """ Write a 2-byte word to guest memory. @@ -694,7 +735,7 @@ def write_word(self, addr: int, value: int, Parameters ---------- - addr : int + addr : int or Ptr Address to write to. value : int Word value to write. @@ -706,10 +747,12 @@ def write_word(self, addr: int, value: int, int Number of bytes written (should be 2). """ + if isinstance(addr, Ptr): + addr = addr.address data = pack(f"{self.endian_format}H", value) return (yield from self.write_bytes(addr, data, pid)) - def memset(self, addr: int, value: int, size: int, + def memset(self, addr: Union[int, Ptr], value: int, size: int, pid: Optional[int] = None) -> Generator[Any, Any, int]: """ Set a region of guest memory to a specific byte value. @@ -718,7 +761,7 @@ def memset(self, addr: int, value: int, size: int, Parameters ---------- - addr : int + addr : int or Ptr Address to start setting. value : int Byte value to set (0-255). @@ -732,10 +775,12 @@ def memset(self, addr: int, value: int, size: int, int Number of bytes written. """ + if isinstance(addr, Ptr): + addr = addr.address data = bytes([value & 0xFF]) * size return (yield from self.write_bytes(addr, data, pid)) - def memcmp(self, addr1: int, addr2: int, size: int, + def memcmp(self, addr1: Union[int, Ptr], addr2: Union[int, Ptr], size: int, pid: Optional[int] = None) -> Generator[Any, Any, bool]: """ Compare two regions of guest memory for equality. @@ -744,9 +789,9 @@ def memcmp(self, addr1: int, addr2: int, size: int, Parameters ---------- - addr1 : int + addr1 : int or Ptr First address. - addr2 : int + addr2 : int or Ptr Second address. size : int Number of bytes to compare. @@ -758,11 +803,15 @@ def memcmp(self, addr1: int, addr2: int, size: int, bool True if memory regions are equal, False otherwise. """ + if isinstance(addr1, Ptr): + addr1 = addr1.address + if isinstance(addr2, Ptr): + addr2 = addr2.address data1 = yield from self.read_bytes(addr1, size, pid) data2 = yield from self.read_bytes(addr2, size, pid) return data1 == data2 - def read_ptr_array(self, addr: int, pid: Optional[int] = None) -> Generator[Any, Any, List[str]]: + def read_ptr_array(self, addr: Union[int, Ptr], pid: Optional[int] = None) -> Generator[Any, Any, List[str]]: """ Read a NULL-terminated array of pointers to strings from guest memory using the portal's optimized handler. @@ -770,7 +819,7 @@ def read_ptr_array(self, addr: int, pid: Optional[int] = None) -> Generator[Any, Parameters ---------- - addr : int + addr : int or Ptr Address of the pointer array in guest memory. pid : int, optional Process ID for context. @@ -780,6 +829,8 @@ def read_ptr_array(self, addr: int, pid: Optional[int] = None) -> Generator[Any, list of str List of strings read from the array. """ + if isinstance(addr, Ptr): + addr = addr.address from hyper.consts import HYPER_OP as hop from hyper.portal import PortalCmd buf = yield PortalCmd(hop.HYPER_OP_READ_PTR_ARRAY, addr, 0, pid) diff --git a/pyplugins/apis/net.py b/pyplugins/apis/net.py new file mode 100644 index 000000000..36d10d0de --- /dev/null +++ b/pyplugins/apis/net.py @@ -0,0 +1,235 @@ +from penguin import Plugin, plugins +from hyper.consts import igloo_hypercall_constants as iconsts +from typing import Optional, List, Iterator, Generator, Set, Dict +from hyper.portal import PortalCmd +from hyper.consts import HYPER_OP as hop +import struct + +class Netdevs(Plugin): + def __init__(self): + self._pending_netdevs = [] + self._netdevs = {} + self._netdev_structs = {} # name -> net_device pointer + self._exist_ok = {} # name -> bool + + self._netdev_ops = self._build_netdev_ops_lookup() + plugins.portal.register_interrupt_handler( + "netdevs", self._netdevs_interrupt_handler) + + netdevs = self.get_arg("conf").get("netdevs", []) + for nd in netdevs: + self.register_netdev(nd) + + # self.panda.hypercall(iconsts.IGLOO_NET_SETUP)(plugins.portal.wrap(self._net_setup)) + self._packet_queue = [] # List of (name, buf) + + def _is_function_pointer(self, attr) -> bool: + """Check if an attribute is a function pointer.""" + return (hasattr(attr, "_subtype_info") and + attr._subtype_info.get("kind") == "function") + + def _get_ops_functions(self, struct_name: str) -> Dict[str, Optional[str]]: + """ + Inspect a top-level struct (eg. "net_device") and return a mapping: + - function_name -> None (direct function pointer on top-level struct) + - function_name -> 'ops_struct' (function pointer belonging to an ops struct) + """ + lookup: Dict[str, Optional[str]] = {} + try: + sample = plugins.kffi.new(struct_name) + except Exception as e: + self.logger.debug(f"Failed to instantiate {struct_name}: {e}") + return lookup + + # Collect top-level function pointers + top_funcs: Set[str] = set() + seen_ops: Set[str] = set() + + for mem in dir(sample): + if mem.startswith("_") or not hasattr(sample, mem): + continue + try: + attr = getattr(sample, mem) + except Exception: + continue + + # Direct function pointer on the top-level struct + if self._is_function_pointer(attr): + top_funcs.add(mem) + continue + + # Try to determine if this member points to an *_ops struct + attr_type_str = str(type(attr)) + ops_struct_name = None + + # Prefer attribute name if it ends with _ops (common pattern) + if mem.endswith("_ops"): + ops_struct_name = mem + else: + # Fallback: try to extract from the type string + ops_struct_name = self._extract_ops_struct_name(attr_type_str) + + if not ops_struct_name or ops_struct_name in seen_ops: + continue + + # Instantiate the ops struct and enumerate its function pointers + try: + ops_sample = plugins.kffi.new(ops_struct_name) + except Exception: + # Could not instantiate this ops struct; skip it + continue + + funcs: Set[str] = set() + for of in dir(ops_sample): + if of.startswith("_") or not hasattr(ops_sample, of): + continue + try: + ofattr = getattr(ops_sample, of) + except Exception: + continue + if self._is_function_pointer(ofattr): + funcs.add(of) + + if funcs: + for f in funcs: + lookup[f] = ops_struct_name + seen_ops.add(ops_struct_name) + + # Finally map top-level functions to None + for f in top_funcs: + lookup[f] = None + + return lookup + + def _build_netdev_ops_lookup(self) -> Dict[str, Optional[str]]: + """ + Build a lookup mapping function_name -> ops_struct_name (or None) by inspecting + the top-level 'net_device' structure and its *_ops sub-structures. + """ + try: + return self._get_ops_functions("net_device") + except Exception as e: + self.logger.debug(f"Failed to build netdev ops lookup: {e}") + return {} + + def _extract_ops_struct_name(self, attr_str: str) -> Optional[str]: + """Extract ops struct name from type string.""" + import re + match = re.search(r'(\w*_ops)', attr_str) + return match.group(1) if match else None + + def _net_setup(self, name, dev_ptr): + netdev_class = self._netdevs.get(name, self._netdevs.get("*", None)) + if netdev_class is None: + return + netdev = yield from plugins.kffi.read_type(dev_ptr, "net_device") + + if hasattr(netdev_class, "setup"): + fn_ret = netdev_class.setup(name, netdev) + if isinstance(fn_ret, Iterator): + fn_ret = yield from fn_ret + + def lookup_netdev(self, name: str) -> Generator[PortalCmd, Optional[int], Optional[int]]: + """ + Look up a network device by name using the portal. + Returns the pointer to net_device struct or None if not found. + """ + buf = name.encode("latin-1", errors="ignore") + b"\0" + result = yield PortalCmd(hop.HYPER_OP_LOOKUP_NETDEV, 0, len(buf), None, buf) + if result == 0 or result is None: + self.logger.debug(f"Netdev '{name}' not found (kernel returned 0)") + return None + self.logger.debug(f"Netdev '{name}' found at {result:#x}") + return result + + def register_netdev(self, name: str, backing_class: Optional[Plugin]=None, exist_ok: bool=False): + ''' + Register a network device with the given name. + ''' + if name not in self._netdevs and name not in self._pending_netdevs: + plugins.portal.queue_interrupt("netdevs") + if name != "*": + self._pending_netdevs.append(name) + self._exist_ok[name] = exist_ok + if backing_class: + self._netdevs[name] = backing_class + + def _register_netdevs(self, names: List[str]) -> Iterator[int]: + """ + Build a NUL-terminated buffer of interface names and send to kernel. + New portal implementation registers a single device per hypercall and + returns a non-zero pointer on success (or zero/False on failure). + Call the hypercall once per name and return the number of successful + registrations. + """ + # New implementation: kernel returns pointer to net_device struct on success, 0/null on failure + if not names: + return 0 + + for name in names: + buf = name.encode("latin-1", errors="ignore") + b"\0" + result = yield PortalCmd(hop.HYPER_OP_REGISTER_NETDEV, 0, len(buf), None, buf) + is_up = yield from self.set_netdev_state(name, True) + if not is_up: + self.logger.error(f"Failed to set netdev '{name}' UP state") + + if result == 0 or result is None: + if self._exist_ok.get(name, False) or self._exist_ok.get("*", False): + result = yield from self.lookup_netdev(name) + if result == 0 or result is None: + self.logger.error(f"Failed to register or look up '{name}'") + return + else: + self.logger.error(f"Failed to register netdev '{name}' (kernel returned 0)") + return + self._netdev_structs[name] = result + yield from self._net_setup(name, result) + + + def _netdevs_interrupt_handler(self) -> Iterator[bool]: + """ + Process pending network device registrations and queued packet sends. + """ + # Process pending network device registrations. Generator-style like _uprobe_interrupt_handler. + # Processes each pending (name, backing_class) and attempts kernel registration. + if not self._pending_netdevs: + return False + + pending = self._pending_netdevs[:] + + while pending: + name = pending.pop(0) + yield from self._register_netdevs([name]) + self._pending_netdevs.remove(name) + + # No more pending registrations or packets + return False + + def set_netdev_state(self, name: str, up: bool) -> Generator[PortalCmd, Optional[int], Optional[bool]]: + """ + Set the state (up/down) of a network device. + Returns True if successful, False otherwise. + """ + buf = name.encode("latin-1", errors="ignore") + b"\0" + requested_state = 1 if up else 0 + result = yield PortalCmd(hop.HYPER_OP_SET_NETDEV_STATE, 0, requested_state, None, buf) + if result == requested_state: + self.logger.debug(f"Netdev '{name}' state set to {requested_state}") + return True + else: + self.logger.error(f"Failed to set netdev '{name}' state to {requested_state}") + return False + + def get_netdev_state(self, name: str) -> Generator[PortalCmd, Optional[int], Optional[bool]]: + """ + Get the state (up/down) of a network device. + Returns True if up, False if down, or None if not found. + """ + buf = name.encode("latin-1", errors="ignore") + b"\0" + result = yield PortalCmd(hop.HYPER_OP_GET_NETDEV_STATE, 0, len(buf), None, buf) + if result is None: + self.logger.error(f"Failed to get state for netdev '{name}'") + return None + state = bool(result) + self.logger.debug(f"Netdev '{name}' state is {'up' if state else 'down'}") + return state diff --git a/pyplugins/apis/osi.py b/pyplugins/apis/osi.py index 7c2c9a056..987b3bde4 100644 --- a/pyplugins/apis/osi.py +++ b/pyplugins/apis/osi.py @@ -47,6 +47,7 @@ from wrappers.generic import Wrapper from wrappers.osi_wrap import MappingWrapper, MappingsWrapper from typing import List, Dict, Any, Optional, Generator +from wrappers.ptregs_wrap import get_pt_regs_wrapper kffi = plugins.kffi CONST_UNKNOWN_STR = "[???]" @@ -567,3 +568,18 @@ def get_mapping_by_addr(self, addr: int) -> Generator[Any, None, Optional[Mappin return mapping else: self.logger.debug(f"No mapping found for addr={addr:#x}") + + + def get_ptregs(self) -> Generator[Any, None, Optional[int]]: + """ + ### Get the pt_regs pointer for the current process + + **Returns:** + - `int` or `None`: The pt_regs pointer as an integer, or None if not available. + """ + ptregs_ptr = yield PortalCmd(hop.HYPER_OP_OSI_PROC_PTREGS) + if ptregs_ptr: + pt_regs_raw = yield from plugins.kffi.read_type( + ptregs_ptr, "pt_regs") + pt_regs = get_pt_regs_wrapper(self.panda, pt_regs_raw) + return pt_regs \ No newline at end of file diff --git a/pyplugins/apis/syscalls.py b/pyplugins/apis/syscalls.py index 428daaef7..97ab714fb 100644 --- a/pyplugins/apis/syscalls.py +++ b/pyplugins/apis/syscalls.py @@ -552,7 +552,7 @@ def _get_proto(self, cpu: int, sce: Any) -> SyscallPrototype: # Add to our table for future lookups self._syscall_info_table[cleaned_name] = proto - self.logger.error( + self.logger.debug( f"Syscall {name} not registered {cleaned_name=}, created generic prototype with {len(generic_args)} args") return proto diff --git a/pyplugins/core/core.py b/pyplugins/core/core.py index d94a0888b..dd08e44c2 100644 --- a/pyplugins/core/core.py +++ b/pyplugins/core/core.py @@ -74,6 +74,7 @@ def __init__(self) -> None: "live_image", "igloodriver", "kmods", + "net", ] for essential_plugin in essential_plugins: diff --git a/pyplugins/core/igloodriver.py b/pyplugins/core/igloodriver.py index 68344f87c..ec053e6a0 100644 --- a/pyplugins/core/igloodriver.py +++ b/pyplugins/core/igloodriver.py @@ -16,7 +16,12 @@ def __init__(self) -> None: self.ensure_init = lambda *args: None from hyper.consts import igloo_hypercall_constants as iconsts self.panda.hypercall(iconsts.IGLOO_MODULE_BASE)(self.hyp_report_igloo_module_baseaddr) + self.panda.hypercall(iconsts.IGLOO_INIT_MODULE)(plugins.portal.wrap(self.module_init)) + plugins.register(self, "module_init") + def module_init(self, cpu): + yield from plugins.portal_publish(self, "module_init") + def hyp_report_igloo_module_baseaddr(self, cpu): igloo_test_function = self.panda.arch.get_arg(cpu, 1, convention="syscall") addr = plugins.kffi.get_function_address("igloo_test_function") diff --git a/pyplugins/interventions/pseudofiles.py b/pyplugins/interventions/pseudofiles.py index ea2cb7170..d0e830633 100644 --- a/pyplugins/interventions/pseudofiles.py +++ b/pyplugins/interventions/pseudofiles.py @@ -283,15 +283,6 @@ def populate_hf_config(self): # introspection self.need_ioctl_hooks = True - if len(self.get_arg("conf").get("netdevs", [])): - # If we have netdevs in our config, we'll make the /proc/penguin_net pseudofile with the contents of it - # Here we'll use our make_rwif closure - netdev_val = " ".join(self.get_arg("conf")["netdevs"]) - hf_config["/proc/penguin_net"] = { - HYP_READ: make_rwif({"val": netdev_val}, self.read_const_buf), - "size": len(netdev_val), - } - hf_config["/proc/mtd"] = { # Note we don't use our make_rwif closure helper here because these # are static diff --git a/pyplugins/testing/netdevs_test.py b/pyplugins/testing/netdevs_test.py new file mode 100644 index 000000000..ccd57fc4a --- /dev/null +++ b/pyplugins/testing/netdevs_test.py @@ -0,0 +1,44 @@ +from penguin import Plugin, plugins + +kffi = plugins.kffi +mem = plugins.mem + +class AlteredNetworkDevice(Plugin): + def __init__(self): + self.outdir = self.get_arg("outdir") + plugins.net.register_netdev("blah0", self) + + def setup(self, device_name: str, netdev): + self.logger.info(f"Setting up altered network device {device_name}") + netdev_ops = yield from kffi.deref(netdev.netdev_ops) + netdev_ops.ndo_do_ioctl = yield from kffi.callback(self.ioctl_handler) + netdev_ops.ndo_get_stats64 = yield from kffi.callback(self.stats64_handler) + netdev_ops.ndo_start_xmit = yield from kffi.callback(self.start_xmit) + yield from kffi.write_struct(netdev.netdev_ops, netdev_ops) + + def start_xmit(self, pt_regs, sk_buff_ptr, netdev_ptr): + sk_buff = yield from kffi.read_type(sk_buff_ptr, "sk_buff") + netdevs = yield from kffi.read_type(netdev_ptr, "net_device") + + skb = yield from mem.read_bytes(sk_buff.data, sk_buff.len) + self.logger.info(f"netdev {kffi.string(netdevs.name)} received {skb} {len(skb)} bytes") + return 0 + + def stats64_handler(self, pt_regs, netdev_ptr, stats64_ptr): + netdevs = yield from kffi.read_type(netdev_ptr, "net_device") + stats64 = yield from kffi.read_type(stats64_ptr, "rtnl_link_stats64") + self.logger.info(f"Getting stats64 for device {kffi.string(netdevs.name)}") + stats64.rx_packets = 1337 + stats64.tx_packets = 1338 + stats64.rx_bytes = 1339 + stats64.tx_bytes = 1340 + # Just return zeroed stats for now + yield from kffi.write_struct(stats64_ptr, stats64) + + def ioctl_handler(self, pt_regs, netdev_ptr, ifreq_ptr, cmd): + args = yield from plugins.osi.get_args() + netdevs = yield from kffi.read_type(netdev_ptr, "net_device") + ifreq = yield from kffi.read_type(ifreq_ptr, "ifreq") + name = str(netdevs.name) + print(name, args, cmd) + return 0 \ No newline at end of file diff --git a/pyplugins/testing/verifier.py b/pyplugins/testing/verifier.py index ae56cb210..5f466008c 100644 --- a/pyplugins/testing/verifier.py +++ b/pyplugins/testing/verifier.py @@ -13,6 +13,11 @@ from sqlalchemy import create_engine from events.utils.cli_syscalls import syscall_filter +GREEN = "\x1b[32m" +RED = "\x1b[31m" +END = "\x1b[0m" +FAILED = f"{RED}failed{END}" + class Verifier(Plugin): def __init__(self): @@ -218,16 +223,12 @@ def uninit(self): test_cases, results = self.check_test_cases() for tc in test_cases: - GREEN = "\x1b[32m" - RED = "\x1b[31m" - END = "\x1b[0m" - PASSED = f"{GREEN}passed{END}" - FAILED = f"{RED}failed{END}" test_passed = results[tc.name] - self.logger.info( - f"Test {tc.name} {PASSED if test_passed else FAILED}") - self.logger.info(f"STDOUT: {tc.stdout}") - self.logger.info(f"STDERR: {tc.stderr}") + if not test_passed: + self.logger.info( + f"Test {tc.name} {FAILED}") + self.logger.info(f"STDOUT: {tc.stdout}") + self.logger.info(f"STDERR: {tc.stderr}") ts = TestSuite("verifier", test_cases) with open(join(self.outdir, "verifier.xml"), "w") as f: @@ -236,4 +237,6 @@ def uninit(self): f"Verified output written to {join(self.outdir, 'verifier.xml')}") if all(results.values()): - self.logger.info("Verifier: ALL tests passed") + self.logger.info(f"{GREEN}Verifier: ALL tests passed{END}") + else: + self.logger.info(f"{RED}Verifier: Some tests failed{END}") diff --git a/pyplugins/wrappers/osi_wrap.py b/pyplugins/wrappers/osi_wrap.py index c03070a57..c61741918 100644 --- a/pyplugins/wrappers/osi_wrap.py +++ b/pyplugins/wrappers/osi_wrap.py @@ -82,6 +82,11 @@ def exec(self) -> bool: """Check if the mapping is executable.""" return self.flags & VM_EXEC != 0 + @property + def executable(self) -> bool: + """Check if the mapping is executable.""" + return self.exec + @property def read(self) -> bool: """Check if the mapping is readable.""" @@ -163,6 +168,11 @@ def get_mappings_by_name(self, name: str) -> List[MappingWrapper]: if name in mapping.name: mappings.append(mapping) return mappings + + def get_first_mapping_by_name(self, name: str) -> Optional[MappingWrapper]: + """Find the first (lowest in memory) mapping whose name contains the given string.""" + if m := self.get_mappings_by_name(name): + return sorted(m, key=lambda x: x.start)[0] def __str__(self) -> str: """Return a string representation of all mappings.""" diff --git a/pyplugins/wrappers/ptregs_wrap.py b/pyplugins/wrappers/ptregs_wrap.py index 28011a76b..7af9970f5 100644 --- a/pyplugins/wrappers/ptregs_wrap.py +++ b/pyplugins/wrappers/ptregs_wrap.py @@ -350,6 +350,51 @@ def get_syscall_number(self) -> Optional[int]: # Default implementation - should be overridden by subclasses return None + def get_retaddr(self) -> Optional[int]: + """ + Get the return address (best guess for this architecture). + Supports both generator and non-generator implementations in subclasses. + """ + try: + ret = self._get_retaddr() + return ret + except PandaMemReadFail as e: + pass + + def get_retaddr_portal(self) -> Generator[Optional[int], Any, Optional[int]]: + """ + Coroutine/generator version of get_retaddr for portal/coroutine use. + """ + try: + ret = self._get_retaddr() + return ret + except PandaMemReadFail as e: + if e.size == 4: + val = yield from plugins.mem.read_int(e.addr) + else: + val = yield from plugins.mem.read_long(e.addr) + return val + + def get_return_address(self) -> Optional[int]: + """ + Alias for get_retaddr. + """ + return self.get_retaddr() + + def _get_retaddr(self): + """ + Subclasses should override this to implement return address logic. + Can be a generator or a regular function. + """ + return None + + def in_kernel(self) -> bool: + """ + Returns True if the pt_regs represents kernel mode, False otherwise. + Subclasses should override for architecture-specific logic. + """ + return False + class X86PtRegsWrapper(PtRegsWrapper): """Wrapper for x86 (32-bit) pt_regs""" @@ -401,6 +446,19 @@ def get_syscall_number(self) -> Optional[int]: """Get syscall number from EAX register""" return self.get_register("orig_eax") + def _get_retaddr(self): + # On x86, return address is at [esp] (top of stack) + sp = self.get_sp() + if sp is not None: + return self._read_memory(sp, 4, 'ptr') + return None + + def in_kernel(self) -> bool: + # On x86, check CPL from eflags. CPL is in bits 12-13. + # CPL=0 is kernel mode. + eflags = self.get_register("eflags") + return eflags is not None and ((eflags >> 12) & 3) == 0 + class X86_64PtRegsWrapper(PtRegsWrapper): """Wrapper for x86_64 pt_regs""" @@ -568,6 +626,19 @@ def get_syscall_number(self) -> Optional[int]: return self._get_x86_delegate().get_syscall_number() return self.get_register("orig_rax") + def _get_retaddr(self): + # On x86_64, return address is at [rsp] (top of stack) + sp = self.get_sp() + if sp is not None: + return self._read_memory(sp, 8, 'ptr') + return None + + def in_kernel(self) -> bool: + # On x86_64, check CPL from eflags. CPL is in bits 12-13. + # CPL=0 is kernel mode. + eflags = self.get_register("eflags") + return eflags is not None and ((eflags >> 12) & 3) == 0 + class ArmPtRegsWrapper(PtRegsWrapper): """Wrapper for ARM pt_regs""" @@ -622,6 +693,16 @@ def get_syscall_number(self) -> Optional[int]: """Get syscall number from r7 register""" return self.get_register("r7") + def _get_retaddr(self): + # On ARM, link register (lr/r14) holds return address + return self.get_register("lr") + + def in_kernel(self) -> bool: + # On ARM, check CPSR mode bits (lowest 5 bits) + cpsr = self.get_register("cpsr") + # User mode is 0b10000 (0x10). Any other mode is privileged. + return cpsr is not None and (cpsr & 0x1F) != 0x10 + class AArch64PtRegsWrapper(PtRegsWrapper): """Wrapper for AArch64 pt_regs""" @@ -773,6 +854,16 @@ def get_syscall_number(self) -> Optional[int]: return self._get_arm_delegate().get_syscall_number() return self.get_register("syscallno") + def _get_retaddr(self): + # On AArch64, link register (x30/lr) holds return address + return self.get_register("lr") + + def in_kernel(self) -> bool: + # On AArch64, check PSTATE Exception Level bits (bits 2-3) + pstate = self.get_register("pstate") + # EL0 (user) is 0. EL1, EL2, EL3 are kernel/hypervisor. + return pstate is not None and ((pstate >> 2) & 3) != 0 + class MipsPtRegsWrapper(PtRegsWrapper): """Wrapper for MIPS pt_regs""" @@ -854,6 +945,16 @@ def get_syscall_number(self) -> Optional[int]: """Get syscall number from v0 register""" return self.get_register("v0") + def _get_retaddr(self): + # On MIPS, ra (r31) holds return address + return self.get_register("ra") + + def in_kernel(self) -> bool: + # On MIPS, check status register KUc bit (bit 1) + status = self.get_register("cp0_status") + # KUc == 0 means kernel mode. + return status is not None and ((status >> 1) & 1) == 0 + class Mips64PtRegsWrapper(MipsPtRegsWrapper): """Wrapper for MIPS64 pt_regs - same structure but different register meanings""" @@ -892,6 +993,10 @@ def get_userland_arg(self, num: int) -> Optional[int]: return self._read_memory(addr, 8, 'ptr') + def _get_retaddr(self): + # On MIPS64, ra (r31) holds return address + return self.get_register("ra") + class PowerPCPtRegsWrapper(PtRegsWrapper): """Wrapper for PowerPC pt_regs""" @@ -1001,6 +1106,16 @@ def get_syscall_number(self) -> Optional[int]: """Get syscall number from r0 register""" return self.get_register("r0") + def _get_retaddr(self): + # On PowerPC, link register (lr) holds return address + return self.get_register("lr") + + def in_kernel(self) -> bool: + # On PowerPC, check MSR PR bit (bit 14). + msr = self.get_register("msr") + # PR (Problem state) == 0 means supervisor (kernel) mode. + return msr is not None and ((msr >> 14) & 1) == 0 + class PowerPC64PtRegsWrapper(PowerPCPtRegsWrapper): """Wrapper for PowerPC64 pt_regs - same structure as PowerPC""" @@ -1091,6 +1206,16 @@ def get_syscall_number(self) -> Optional[int]: """Get syscall number from a7 register""" return self.get_register("a7") + def _get_retaddr(self): + # On LoongArch64, ra (r1) holds return address + return self.get_register("ra") + + def in_kernel(self) -> bool: + # On LoongArch64, check CSR_PRMD's PPLV field (bits 0-1). + prmd = self.get_register("csr_prmd") + # PPLV == 0 means kernel mode (PLV0). + return prmd is not None and (prmd & 0x3) == 0 + class Riscv32PtRegsWrapper(PtRegsWrapper): """Wrapper for RISC-V 32-bit pt_regs""" @@ -1210,6 +1335,16 @@ def get_syscall_number(self) -> Optional[int]: """Get syscall number from a7 register""" return self.get_register("a7") + def _get_retaddr(self): + # On RISC-V, ra (x1) holds return address + return self.get_register("ra") + + def in_kernel(self) -> bool: + # On RISC-V, check status register SPP bit (bit 8). + status = self.get_register("status") + # If SPP is 1, previous mode was Supervisor (kernel). + return status is not None and ((status >> 8) & 1) == 1 + class Riscv64PtRegsWrapper(Riscv32PtRegsWrapper): """Wrapper for RISC-V 64-bit pt_regs - same structure as 32-bit but with 64-bit registers""" @@ -1232,6 +1367,10 @@ def get_userland_arg(self, num: int) -> Optional[int]: return self._read_memory(addr, 8, 'ptr') + def _get_retaddr(self): + # On RISC-V 64, ra (x1) holds return address + return self.get_register("ra") + def get_pt_regs_wrapper( panda: Optional[Any], diff --git a/src/penguin/manager.py b/src/penguin/manager.py index 80356b9c0..9813e89a3 100644 --- a/src/penguin/manager.py +++ b/src/penguin/manager.py @@ -56,6 +56,9 @@ def calculate_score(result_dir: str, have_console: bool = True) -> dict[str, int :raises RuntimeError: If required files are missing. :raises ValueError: If an unknown score type is encountered. """ + health_data = {} + config = {} + blocked_signals = 0 if not os.path.isfile(os.path.join(result_dir, ".ran")): raise RuntimeError( f"calculate_score: {result_dir} does not have a .ran file - check logs for error" @@ -96,8 +99,11 @@ def calculate_score(result_dir: str, have_console: bool = True) -> dict[str, int break # Shell cov: number of lines (minus one) in shell_cov.csv - with open(f"{result_dir}/shell_cov.csv") as f: - shell_cov = len(f.readlines()) - 1 + if os.path.isfile(f"{result_dir}/shell_cov.csv"): + with open(f"{result_dir}/shell_cov.csv") as f: + shell_cov = len(f.readlines()) - 1 + else: + shell_cov = 0 # Coverage: processes, modules, blocks if os.path.isfile(f"{result_dir}/coverage.csv"): @@ -125,18 +131,19 @@ def calculate_score(result_dir: str, have_console: bool = True) -> dict[str, int modules_loaded = 0 blocks_covered = 0 + if config: + blocked_signals = -len(config.get("blocked_signals", [])) + score = { - "execs": health_data["nexecs"], - "bound_sockets": health_data["nbound_sockets"], - "devices_accessed": health_data["nuniquedevs"], + "execs": health_data.get("nexecs", 0), + "bound_sockets": health_data.get("nbound_sockets", 0), + "devices_accessed": health_data.get("nuniquedevs", 0), "processes_run": processes_run, "modules_loaded": modules_loaded, "blocks_covered": blocks_covered, "script_lines_covered": shell_cov, "nopanic": 1 if not panic else 0, - "blocked_signals": -len( - config["blocked_signals"] if "blocked_signals" in config else [] - ), # Negative because we want to minimize! + "blocked_signals": blocked_signals, # Negative because we want to minimize! } for k in score.keys(): diff --git a/tests/unit_tests/test_target/patches/tests/netdevs.yaml b/tests/unit_tests/test_target/patches/tests/netdevs.yaml index 5925ef7fb..9abe93bd1 100644 --- a/tests/unit_tests/test_target/patches/tests/netdevs.yaml +++ b/tests/unit_tests/test_target/patches/tests/netdevs.yaml @@ -5,12 +5,14 @@ plugins: type: file_contains file: console.log string: "/tests/netdevs.sh PASS" + netdevs_test: {} netdevs: - nd0 - nd1 - end4 - wlan0 + - blah0 static_files: /tests/netdevs.sh: @@ -22,12 +24,45 @@ static_files: /igloo/utils/busybox ip a # Check for each interfaces - for dev in nd0 nd1 end4 wlan0; do - if ! /igloo/utils/busybox ip link show dev $dev; then - echo "Expected interface $dev not found" - exit 1 - fi + for dev in nd0 nd1 end4 wlan0 blah0; do + if ! /igloo/utils/busybox ip link show dev $dev; then + echo "Expected interface $dev not found" + exit 1 + fi + if ! /igloo/utils/busybox ifconfig $dev | /igloo/utils/busybox grep -q "^$dev"; then + echo "Expected interface $dev not up" + exit 1 + fi done + # Specific checks for blah0 + if [ "$(/igloo/utils/busybox cat /sys/class/net/blah0/statistics/rx_bytes)" -ne 1339 ]; then + echo "rx_bytes for blah0 is not 1339" + exit 1 + fi + + ifconfig_out=$(/igloo/utils/busybox ifconfig blah0) + echo "$ifconfig_out" + + if ! echo "$ifconfig_out" | /igloo/utils/busybox grep -q "RX packets:1337"; then + echo "ifconfig blah0: RX packets not 1337" + exit 1 + fi + if ! echo "$ifconfig_out" | /igloo/utils/busybox grep -q "TX packets:1338"; then + echo "ifconfig blah0: TX packets not 1338" + exit 1 + fi + if ! echo "$ifconfig_out" | /igloo/utils/busybox grep -q "RX bytes:1339"; then + echo "ifconfig blah0: RX bytes not 1339" + exit 1 + fi + if ! echo "$ifconfig_out" | /igloo/utils/busybox grep -q "TX bytes:1340"; then + echo "ifconfig blah0: TX bytes not 1340" + exit 1 + fi + + /igloo/utils/busybox ping -I blah0 -c 1 192.168.1.1 + + echo "/tests/netdevs.sh PASS" exit 0 mode: 73