Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 74 additions & 27 deletions pyplugins/apis/kffi.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"""

from penguin import plugins, getColoredLogger, Plugin
from wrappers.ctypes_wrap import Ptr, VtypeJsonGroup
from wrappers.ctypes_wrap import Ptr, VtypeJsonGroup, BoundTypeInstance, BoundArrayView
from os.path import join, realpath, isfile
from wrappers.generic import Wrapper
import functools
Expand All @@ -54,8 +54,6 @@ class KFFI(Plugin):
~~~~~~~

- ``call_function``: Call a kernel function with arguments.
- ``read_kernel_memory``: Read bytes from kernel memory.
- ``write_kernel_memory``: Write bytes to kernel memory.
"""

def __init__(self) -> None:
Expand Down Expand Up @@ -87,8 +85,9 @@ def __init_tramp_functionality(self):
# Register trampoline hit hypercall handler
from hyper.consts import igloo_hypercall_constants as iconsts
self.portal = plugins.portal
self._on_tramp_hit_hypercall = self.portal.wrap(self._on_tramp_hit_hypercall)
self.panda.hypercall(iconsts.IGLOO_HYP_TRAMP_HIT)(self._on_tramp_hit_hypercall)
self._on_tramp_hit_hypercall = \
self.panda.hypercall(iconsts.IGLOO_HYP_TRAMP_HIT)(
self.portal.wrap(self._on_tramp_hit_hypercall))

# Register with portal's interrupt handler system
self.portal.register_interrupt_handler(
Expand Down Expand Up @@ -591,26 +590,74 @@ def _on_tramp_hit_hypercall(self, cpu):
callback, num_args = entry

self.logger.debug(f"Invoking trampoline callback for id={tramp_id}: {getattr(callback, '__name__', repr(callback))}")
try:
pt_regs_raw = yield from self.read_type(pt_regs_addr, "pt_regs")
pt_regs = get_pt_regs_wrapper(self.panda, pt_regs_raw)
original_bytes = pt_regs.to_bytes()[:]
pt_regs_raw = yield from self.read_type(pt_regs_addr, "pt_regs")
pt_regs = get_pt_regs_wrapper(self.panda, pt_regs_raw)
original_bytes = pt_regs.to_bytes()[:]
# Get args from pt_regs
if num_args > 1:
# Get args from pt_regs
if num_args > 1:
# Get args from pt_regs
args = yield from pt_regs.get_args_portal(num_args - 1, convention="userland")
else:
args = []
# Call callback with pt_regs and args
result = callback(pt_regs, *args)
if isinstance(result, Iterator):
result = yield from result
# If callback returns int, set as return value
if isinstance(result, int):
pt_regs.set_retval(result)
new = pt_regs.to_bytes()
if original_bytes != new:
yield from plugins.mem.write_bytes(pt_regs_addr, new)

except Exception as e:
self.logger.error(f"Error in trampoline callback {callback.__name__}: {e}")
args = yield from pt_regs.get_args_portal(num_args - 1, convention="userland")
else:
args = []
# Call callback with pt_regs and args
result = callback(pt_regs, *args)
if isinstance(result, Iterator):
result = yield from result
# If callback returns int, set as return value
if isinstance(result, int):
pt_regs.set_retval(result)
new = pt_regs.to_bytes()
if original_bytes != new:
yield from plugins.mem.write_bytes(pt_regs_addr, new)

def write_struct(self, addr: Union[int, Ptr], instance: BoundTypeInstance):
if isinstance(addr, Ptr):
addr = addr.address

data = instance.to_bytes()
yield from plugins.mem.write_bytes(addr, data)

def string(self, instance: Union[BoundArrayView, BoundTypeInstance, bytes, bytearray]) -> str:
"""
Extract a string from the buffer at the instance's offset.
Always returns a best-effort string, never raises.
"""
try:
# Raw bytes or bytearray
if isinstance(instance, (bytes, bytearray)):
return instance.decode('latin-1', errors='replace').split('\x00', 1)[0]
# BoundArrayView: get bytes from array elements
if type(instance).__name__ == "BoundArrayView":
arr_bytes = bytearray()
try:
for i in range(len(instance)):
val = instance[i]
if hasattr(val, '_value'):
arr_bytes.append(int(val._value))
elif isinstance(val, int):
arr_bytes.append(val)
else:
break
if arr_bytes[-1] == 0:
break
except Exception:
pass
return arr_bytes.decode('latin-1', errors='replace').split('\x00', 1)[0]
# BoundTypeInstance: get bytes from buffer at offset, up to size
if type(instance).__name__ == "BoundTypeInstance":
buf = getattr(instance, "_instance_buffer", None)
offset = getattr(instance, "_instance_offset", 0)
size = getattr(instance._instance_type_def, "size", None)
if buf is not None and size is not None:
raw = buf[offset:offset+size]
return raw.decode('latin-1', errors='replace').split('\x00', 1)[0]
# fallback: try to_bytes
try:
raw = instance.to_bytes()
return raw.decode('latin-1', errors='replace').split('\x00', 1)[0]
except Exception:
pass
# fallback: str()
return str(instance)
except Exception:
return ""
Loading