Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 12 additions & 19 deletions capa/features/extractors/cape/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import capa.features.extractors.cape.thread
import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process
from capa.exceptions import EmptyReportError, UnsupportedFormatError
from capa.exceptions import EmptyReportError
from capa.features.common import Feature
from capa.features.address import Address, AbsoluteVirtualAddress, _NoAddress
from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress, _NoAddress
from capa.features.extractors.cape.models import Call, Static, Process, CapeReport
from capa.features.extractors.base_extractor import (
CallHandle,
Expand Down Expand Up @@ -53,9 +53,14 @@ def __init__(self, report: CapeReport):
self.global_features = list(capa.features.extractors.cape.global_.extract_features(self.report))

def get_base_address(self) -> Union[AbsoluteVirtualAddress, _NoAddress, None]:
if self.report.static is None:
return NO_ADDRESS

Comment on lines +57 to +58
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Consider logging a warning message here to indicate that the report is missing static information, which might affect analysis results.

Suggested change
return NO_ADDRESS
logger.warning("CAPE report missing static analysis")
return NO_ADDRESS

if self.report.static.pe is None:
# TODO: handle ELF
return NO_ADDRESS
Comment on lines +60 to +61
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There is a TODO here to handle ELF files. It would be beneficial to provide more details about the expected implementation or link to a specific issue tracking this work.


# value according to the PE header, the actual trace may use a different imagebase
assert self.report.static is not None
assert self.report.static.pe is not None
return AbsoluteVirtualAddress(self.report.static.pe.imagebase)

def extract_global_features(self) -> Iterator[tuple[Feature, Address]]:
Expand Down Expand Up @@ -120,8 +125,10 @@ def get_call_name(self, ph, th, ch) -> str:
parts.append(" -> ")
if call.pretty_return:
parts.append(call.pretty_return)
else:
elif call.return_:
parts.append(hex(call.return_))
else:
parts.append("?")

return "".join(parts)

Expand All @@ -132,25 +139,11 @@ def from_report(cls, report: dict) -> "CapeExtractor":
if cr.info.version not in TESTED_VERSIONS:
logger.warning("CAPE version '%s' not tested/supported yet", cr.info.version)

# TODO(mr-tz): support more file types
# https://github.com/mandiant/capa/issues/1933
if "PE" not in cr.target.file.type:
logger.error(
"capa currently only supports PE target files, this target file's type is: '%s'.\nPlease report this at: https://github.com/mandiant/capa/issues/1933",
cr.target.file.type,
)

# observed in 2.4-CAPE reports from capesandbox.com
if cr.static is None and cr.target.file.pe is not None:
cr.static = Static()
cr.static.pe = cr.target.file.pe

if cr.static is None:
raise UnsupportedFormatError("CAPE report missing static analysis")

if cr.static.pe is None:
raise UnsupportedFormatError("CAPE report missing PE analysis")

if len(cr.behavior.processes) == 0:
raise EmptyReportError("CAPE did not capture any processes")

Expand Down
32 changes: 28 additions & 4 deletions capa/features/extractors/cape/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@ def get_processes(report: CapeReport) -> Iterator[ProcessHandle]:
"""
seen_processes = {}
for process in report.behavior.processes:
addr = ProcessAddress(pid=process.process_id, ppid=process.parent_id)
if process.parent_id is None:
# on CAPE for Linux, the root process may have no parent id, so we set that to 0
ppid = 0
else:
ppid = process.parent_id

addr = ProcessAddress(pid=process.process_id, ppid=ppid)
yield ProcessHandle(address=addr, inner=process)

# check for pid and ppid reuse
Expand All @@ -52,7 +58,13 @@ def extract_import_names(report: CapeReport) -> Iterator[tuple[Feature, Address]
"""
extract imported function names
"""
assert report.static is not None and report.static.pe is not None
if report.static is None:
return

if report.static.pe is None:
# TODO: elf
return
Comment on lines +65 to +66
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There is a TODO here to handle ELF files. It would be beneficial to provide more details about the expected implementation or link to a specific issue tracking this work.


imports = report.static.pe.imports

if isinstance(imports, dict):
Expand All @@ -70,13 +82,25 @@ def extract_import_names(report: CapeReport) -> Iterator[tuple[Feature, Address]


def extract_export_names(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
assert report.static is not None and report.static.pe is not None
if report.static is None:
return

if report.static.pe is None:
# TODO: elf
return
Comment on lines +89 to +90
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There is a TODO here to handle ELF files. It would be beneficial to provide more details about the expected implementation or link to a specific issue tracking this work.


for function in report.static.pe.exports:
yield Export(function.name), AbsoluteVirtualAddress(function.address)


def extract_section_names(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
assert report.static is not None and report.static.pe is not None
if report.static is None:
return

if report.static.pe is None:
# TODO: elf
return
Comment on lines +101 to +102
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There is a TODO here to handle ELF files. It would be beneficial to provide more details about the expected implementation or link to a specific issue tracking this work.


for section in report.static.pe.sections:
yield Section(section.name), AbsoluteVirtualAddress(section.virtual_address)

Expand Down
11 changes: 4 additions & 7 deletions capa/features/extractors/cape/global_.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ def extract_arch(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
yield Arch(ARCH_AMD64), NO_ADDRESS
else:
logger.warning("unrecognized Architecture: %s", report.target.file.type)
raise ValueError(
f"unrecognized Architecture from the CAPE report; output of file command: {report.target.file.type}"
)


def extract_format(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
Comment on lines 45 to 47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The ValueError exception is removed here. How will the program handle unrecognized architectures now? Is there alternative error handling in place?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is no longer a critical error, because we want to support ELF files that the underlying sandbox supports.

Expand All @@ -54,9 +51,6 @@ def extract_format(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
yield Format(FORMAT_ELF), NO_ADDRESS
else:
logger.warning("unknown file format, file command output: %s", report.target.file.type)
raise ValueError(
f"unrecognized file format from the CAPE report; output of file command: {report.target.file.type}"
)


Comment on lines 53 to 55
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The ValueError exception is removed here. How will the program handle unknown file formats now? Is there alternative error handling in place?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is no longer a critical error, because we want to support ELF files (and even ZIP files!) that the underlying sandbox supports.

def extract_os(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
Expand All @@ -80,7 +74,10 @@ def extract_os(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
else:
# if the operating system information is missing from the cape report, it's likely a bug
logger.warning("unrecognized OS: %s", file_output)
raise ValueError(f"unrecognized OS from the CAPE report; output of file command: {file_output}")
elif report.info.machine and report.info.machine.platform == "windows":
Comment on lines 76 to +77
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The ValueError exception is removed here. How will the program handle unrecognized OS now? Is there alternative error handling in place?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is no longer a critical error, because we want to support ELF files that the underlying sandbox supports.

yield OS(OS_WINDOWS), NO_ADDRESS
elif report.info.machine and report.info.machine.platform == "linux":
yield OS(OS_LINUX), NO_ADDRESS
else:
# the sample is shellcode
logger.debug("unsupported file format, file command output: %s", file_output)
Expand Down
55 changes: 50 additions & 5 deletions capa/features/extractors/cape/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,26 @@ def validate_hex_bytes(value):
return bytes.fromhex(value) if isinstance(value, str) else value


def validate_status_code(value):
if isinstance(value, str):
if value == "?":
# TODO: check for this in the return handling
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# TODO: check for this in the return handling

return None
Comment on lines +35 to +36
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There is a TODO here to check for this in the return handling. It would be beneficial to provide more details about the expected implementation or link to a specific issue tracking this work.


# like: -1 EINVAL (Invalid argument)
# like: 0 (Timeout)
# like: 0x8002 (flags O_RDWR|O_LARGEFILE)
assert value.endswith(")")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This assert statement checks if the value ends with ')'. Consider adding a more descriptive error message or handling the case where the value does not end with ')' to prevent unexpected behavior.

num = value.partition(" ")[0]
return int(num, 16) if num.startswith("0x") else int(num, 10)
else:
return value


HexInt = Annotated[int, BeforeValidator(validate_hex_int)]
HexBytes = Annotated[bytes, BeforeValidator(validate_hex_bytes)]
# this is a status code, such as returned by CAPE for Linux, like: "0 (Timeout)" or "0x8002 (flags O_RDWR|O_LARGEFILE)"
StatusCode = Annotated[int | None, BeforeValidator(validate_status_code)]


# a model that *cannot* have extra fields
Expand Down Expand Up @@ -71,8 +89,13 @@ class FlexibleModel(BaseModel):
EmptyList: TypeAlias = list[Any]


class Machine(FlexibleModel):
    # name of the platform the analysis machine runs, e.g. "windows" or "linux";
    # optional, and defaults to None when the report does not provide it.
    platform: str | None = None


class Info(FlexibleModel):
    # CAPE version string as found in the report.
    version: str
    # description of the analysis machine; optional, defaults to None when absent.
    machine: Machine | None = None


class ImportedSymbol(FlexibleModel):
Expand Down Expand Up @@ -287,16 +310,38 @@ class Argument(FlexibleModel):
pretty_value: Optional[str] = None


def validate_argument(value):
    """
    coerce a bare string argument into a default, unnamed Argument.

    any value that is not a string is handed back unchanged.
    """
    if not isinstance(value, str):
        return value

    # a few calls on CAPE for Linux carry arguments like the one in this call:
    #
    #   timestamp: "18:12:17.199276"
    #   category: "misc"
    #   api: "uname"
    #   return: "0"
    #   ▽ arguments:
    #     [0]: "{sysname=\"Linux\", nodename=\"laptop\", ...}"
    #
    # that is, just a string wrapping a JSON-like blob,
    # which we map to a default argument with an empty name.
    return Argument(name="", value=value)


# mypy isn't happy about assigning to type
Argument = Annotated[Argument, BeforeValidator(validate_argument)] # type: ignore


class Call(FlexibleModel):
# timestamp: str
thread_id: int
thread_id: int | None = None
# category: str

api: str

arguments: list[Argument]
# status: bool
return_: HexInt = Field(alias="return")
return_: HexInt | StatusCode = Field(alias="return")
pretty_return: Optional[str] = None

# repeated: int
Expand All @@ -315,12 +360,12 @@ class Call(FlexibleModel):
class Process(FlexibleModel):
process_id: int
process_name: str
parent_id: int
parent_id: int | None
# module_path: str
# first_seen: str
calls: list[Call]
threads: list[int]
environ: dict[str, str]
threads: list[int] | None = None # this can be None for CAPE for Linux, which doesn't track threads.
environ: dict[str, str] = Field(default_factory=dict) # type: ignore


"""
Expand Down
10 changes: 10 additions & 0 deletions capa/features/extractors/cape/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ def get_threads(ph: ProcessHandle) -> Iterator[ThreadHandle]:
get the threads associated with a given process
"""
process: Process = ph.inner

if not process.threads:
# CAPE for linux doesn't record threads
# so we return a default 0 value
yield ThreadHandle(address=ThreadAddress(process=ph.address, tid=0), inner={})
return

threads: list[int] = process.threads

for thread in threads:
Expand All @@ -42,6 +49,9 @@ def extract_environ_strings(ph: ProcessHandle) -> Iterator[tuple[Feature, Addres
"""
process: Process = ph.inner

if not process.environ:
return

for value in (value for value in process.environ.values() if value):
yield String(value), ph.address

Expand Down
12 changes: 10 additions & 2 deletions capa/features/extractors/cape/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,16 @@ def get_calls(ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]:

tid = th.address.tid
for call_index, call in enumerate(process.calls):
if call.thread_id != tid:
continue

if call.thread_id is None:
# CAPE for linux doesn't record threads
# so this must be the 0 value
# and we'll enumerate all the calls in this process
assert tid == 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This assert statement checks if tid == 0. Consider adding a more descriptive error message or handling the case where tid != 0 to prevent unexpected behavior.


else:
if call.thread_id != tid:
continue

for symbol in generate_symbols("", call.api):
call.api = symbol
Expand Down
Loading