Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions angrmanagement/data/analysis_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ class AnalysesConfiguration:
Configuration for a sequence of analyses.
"""

def __init__(self, analyses: Sequence[AnalysisConfiguration], instance: Instance) -> None:
self.instance = instance
def __init__(self, analyses: Sequence[AnalysisConfiguration]) -> None:
self.analyses: Sequence[AnalysisConfiguration] = analyses

def __len__(self) -> int:
Expand Down
6 changes: 1 addition & 5 deletions angrmanagement/data/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@

from angr.knowledge_plugins.cfg import CFGModel

from .jobs import CallingConventionRecoveryJob, VariableRecoveryJob

_l = logging.getLogger(__name__)


Expand All @@ -40,9 +38,7 @@ class Instance:
log: list[LogRecord] | ObjectContainer

def __init__(self) -> None:
self.variable_recovery_job: VariableRecoveryJob | None = None
self.calling_convention_recovery_job: CallingConventionRecoveryJob | None = None
self._analysis_configuration = None
self.analysis_configuration = None

self.extra_containers = {}
self._container_defaults = {}
Expand Down
65 changes: 59 additions & 6 deletions angrmanagement/data/jobs/cfg_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,65 @@ def __init__(self, instance: Instance) -> None:
]
}

def to_dict(self) -> dict:
cfg_options = super().to_dict()

# update function start locations
if "function_starts" in cfg_options:
function_starts = []
for func_start_str in cfg_options["function_starts"].split(","):
func_start_str = func_start_str.strip(" ")
if not func_start_str:
continue

try:
func_addr = int(func_start_str, 16)
except ValueError as e:
_l.error("Invalid analysis start: %s", func_start_str)
raise ValueError("Invalid function start string") from e

function_starts.append(func_addr)

if function_starts:
if "explicit_analysis_starts" in cfg_options:
cfg_options["elf_eh_frame"] = False
cfg_options["symbols"] = False
cfg_options["start_at_entry"] = False

cfg_options["function_starts"] = function_starts

# discard "explicit_analysis_starts" even if function_starts is not set
if "explicit_analysis_starts" in cfg_options:
del cfg_options["explicit_analysis_starts"]

# update options for region specification
if "regions" in cfg_options:
regions = []
for region_str in cfg_options["regions"].split(","):
region_str = region_str.strip(" ")
if not region_str:
continue
if "-" not in region_str or region_str.count("-") != 1:
_l.error("Invalid analysis region: %s", region_str)
raise ValueError("Invalid analysis region")
min_addr, max_addr = region_str.split("-")
try:
min_addr = int(min_addr, 16)
max_addr = int(max_addr, 16)
except ValueError as e:
_l.error("Invalid analysis region: %s", region_str)
raise ValueError("Invalid analysis region bound") from e
regions.append((min_addr, max_addr))
if regions:
cfg_options["regions"] = regions

scanning_mode = cfg_options.pop("scanning_mode", None)
if scanning_mode is not None:
cfg_options["force_smart_scan"] = scanning_mode == CFGForceScanMode.SmartScan
cfg_options["force_complete_scan"] = scanning_mode == CFGForceScanMode.CompleteScan

return cfg_options


class CFGGenerationJob(InstanceJob):
"""
Expand All @@ -115,12 +174,6 @@ def __init__(self, instance: Instance, on_finish=None, **kwargs) -> None:
cfg_args[key] = val

self.cfg_args = cfg_args

scanning_mode = self.cfg_args.pop("scanning_mode", None)
if scanning_mode is not None:
self.cfg_args["force_smart_scan"] = scanning_mode == CFGForceScanMode.SmartScan
self.cfg_args["force_complete_scan"] = scanning_mode == CFGForceScanMode.CompleteScan

self._cfb = None

def run(self, ctx: JobContext):
Expand Down
121 changes: 121 additions & 0 deletions angrmanagement/logic/analysis_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
from __future__ import annotations

from PySide6.QtCore import QObject, Signal

from angrmanagement.data.analysis_options import AnalysesConfiguration
from angrmanagement.data.jobs import (
APIDeobfuscationConfiguration,
APIDeobfuscationJob,
CallingConventionRecoveryConfiguration,
CallingConventionRecoveryJob,
CFGAnalysisConfiguration,
CFGGenerationJob,
CodeTaggingConfiguration,
CodeTaggingJob,
FlirtAnalysisConfiguration,
FlirtSignatureRecognitionJob,
Job,
PrototypeFindingJob,
StringDeobfuscationConfiguration,
StringDeobfuscationJob,
VariableRecoveryConfiguration,
VariableRecoveryJob,
)


class AnalysisManager(QObject):
"""
Manager of analyses.
"""

cfg_generated = Signal()
cc_recovered = Signal(int)
functions_tagged = Signal(int)
variable_recovered = Signal(int)

def __init__(self, workspace):
super().__init__()
self.workspace = workspace

def get_default_analyses_configuration(self) -> AnalysesConfiguration:
return AnalysesConfiguration(
[
a(self.workspace.main_instance)
for a in [
CFGAnalysisConfiguration,
APIDeobfuscationConfiguration,
StringDeobfuscationConfiguration,
FlirtAnalysisConfiguration,
CodeTaggingConfiguration,
CallingConventionRecoveryConfiguration,
VariableRecoveryConfiguration,
]
]
)

def _schedule_job(self, job: Job):
self.workspace.job_manager.add_job(job)

def run_analysis(self) -> None:
instance = self.workspace.main_instance
conf = instance.analysis_configuration

if conf["cfg"].enabled:
job = CFGGenerationJob(instance, on_finish=self._on_cfg_generated, **conf["cfg"].to_dict())
self._schedule_job(job)

if conf["flirt"].enabled:
self._schedule_job(FlirtSignatureRecognitionJob(instance))
self._schedule_job(PrototypeFindingJob(instance))

if conf["api_deobfuscation"].enabled:
self._schedule_job(APIDeobfuscationJob(instance))

if conf["string_deobfuscation"].enabled:
self._schedule_job(StringDeobfuscationJob(instance))

if conf["code_tagging"].enabled:
self._schedule_job(CodeTaggingJob(instance, on_finish=self._on_functions_tagged))

if conf["cca"].enabled:
job = CallingConventionRecoveryJob(instance, **conf["cca"].to_dict(), on_cc_recovered=self._on_cc_recovered)

# prioritize the current function in display
disassembly_view = self.workspace.view_manager.first_view_in_category("disassembly")
if disassembly_view is not None and not disassembly_view.function.am_none:
job.prioritize_function(disassembly_view.function.addr)

self._schedule_job(job)

if conf["varec"].enabled:
job = VariableRecoveryJob(
instance, **conf["varec"].to_dict(), on_variable_recovered=self._on_variable_recovered
)

# prioritize the current function in display
disassembly_view = self.workspace.view_manager.first_view_in_category("disassembly")
if disassembly_view is not None and not disassembly_view.function.am_none:
job.prioritize_function(disassembly_view.function.addr)

self._schedule_job(job)

def generate_cfg(self, cfg_args=None) -> None:
job = CFGGenerationJob(self.workspace.main_instance, on_finish=self._on_cfg_generated, **(cfg_args or {}))
self._schedule_job(job)

def _on_cfg_generated(self, cfg_result) -> None:
cfg, cfb = cfg_result
self.workspace.main_instance.cfb = cfb
self.workspace.main_instance.cfg = cfg
self.workspace.main_instance.cfb.am_event()
self.workspace.main_instance.cfg.am_event()
self.cfg_generated.emit()

def _on_cc_recovered(self, func_addr: int) -> None:
self.cc_recovered.emit(func_addr)

def _on_variable_recovered(self, func_addr: int) -> None:
self.variable_recovered.emit(func_addr)

def _on_functions_tagged(self) -> None:
self.functions_tagged.emit()
4 changes: 2 additions & 2 deletions angrmanagement/plugins/precise_diffing/diff_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ def decomp(*_) -> None:
self._function.prototype is None or self._function.is_prototype_guessed
):
# run calling convention analysis for this function
if self.instance._analysis_configuration:
options = self.instance._analysis_configuration["varec"].to_dict()
if self.instance.analysis_configuration:
options = self.instance.analysis_configuration["varec"].to_dict()
else:
options = {}
options["workers"] = 0
Expand Down
4 changes: 3 additions & 1 deletion angrmanagement/ui/main_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,9 @@ def _on_load_database_finished(self, job: LoadAngrDBJob, *args, **kwargs) -> Non

# trigger callbacks
self.workspace.reload()
self.workspace.on_cfg_generated((cfg, cfb))
self.workspace.main_instance.cfb.am_event()
self.workspace.main_instance.cfg.am_event()
self.workspace.on_cfg_generated()
self.workspace.plugins.angrdb_load_entries(job.extra_info)

def _save_database(self, file_path) -> bool:
Expand Down
4 changes: 2 additions & 2 deletions angrmanagement/ui/views/code_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ def decomp(*_) -> None:
self._function.prototype is None or self._function.is_prototype_guessed is True
):
# run calling convention analysis for this function
if self.instance._analysis_configuration:
options = self.instance._analysis_configuration["varec"].to_dict()
if self.instance.analysis_configuration:
options = self.instance.analysis_configuration["varec"].to_dict()
else:
options = {}
options["workers"] = 0
Expand Down
7 changes: 0 additions & 7 deletions angrmanagement/ui/views/disassembly_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,13 +661,6 @@ def display_linear_viewer(self, prefer: bool = True) -> None:
def display_function(self, function) -> None:
if function is None:
return
if (
function.addr not in self.instance.kb.variables.function_managers
and self.instance.variable_recovery_job is not None
):
# variable information is not available
# prioritize the analysis of this function
self.instance.variable_recovery_job.prioritize_function(function.addr)
self.jump_history.jump_to(function.addr)
self._display_function(function)

Expand Down
Loading
Loading