Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 76 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
- [Context](#context)
- [Fun \& Experimental](#fun--experimental)
- [Start from Scratch](#start-from-scratch)
- [Threat Modeling Integration](#threat-modeling-integration)
- [Contributing](#contributing)

---
Expand Down Expand Up @@ -209,7 +210,81 @@
- `// I want to build a web scraper—start me off`
<sub>Data scraping or automation tools using Python/Node.</sub>


## Threat Modeling Integration

This project now includes a basic framework for performing threat modeling on application architectures.

**Core Components:**

* **`architecture.py`**: Defines classes to represent your application's architecture (`Service`, `Database`, `NetworkZone`, `Application`).
* You can load your application architecture from a YAML file using the `load_architecture_from_yaml(file_path)` function.
* **`threat_model.py`**: Defines classes for threat modeling concepts (`ThreatActor`, `AttackVector`, `Vulnerability`, `SecurityControl`, `IdentifiedAttackSurface`, `SuggestedControl`).
* `identify_attack_surfaces(application, known_vulnerabilities)`: Analyzes an `Application` object and a list of `Vulnerability` objects to find potential attack surfaces.
* `suggest_security_controls(attack_surfaces, available_controls)`: Suggests `SecurityControl`s for identified attack surfaces based on their vulnerabilities.

**Example YAML Files:**

* **`example_architecture.yaml`**: Shows an example of how to define your application's services, databases, and network zones in YAML. This file can be loaded by `load_architecture_from_yaml`.
* **`example_security_knowledge_base.yaml`**: Provides an example of how to define your threat actors, attack vectors, vulnerabilities, and security controls in YAML. This file can be loaded by `load_threat_intelligence_from_yaml` and now supports richer attributes for each entity.

**Example Usage (Conceptual):**

```python
# main_analyzer.py (Illustrative)
from typing import List # For type hinting
from architecture import load_architecture_from_yaml, Application
from threat_model import (
load_threat_intelligence_from_yaml, # New loader
identify_attack_surfaces, suggest_security_controls,
IdentifiedAttackSurface, ThreatActor, AttackVector, Vulnerability, SecurityControl # For type hinting
)

# 1. Load application architecture
app_arch: Application = load_architecture_from_yaml("example_architecture.yaml")

# 2. Load threat intelligence data (actors, vectors, vulnerabilities, controls)
threat_actors: List[ThreatActor]
attack_vectors: List[AttackVector]
known_vulnerabilities: List[Vulnerability]
available_controls: List[SecurityControl]

threat_actors, attack_vectors, known_vulnerabilities, available_controls = \
load_threat_intelligence_from_yaml("example_security_knowledge_base.yaml")

print(f"Loaded {len(threat_actors)} threat actors, {len(attack_vectors)} attack vectors, "
f"{len(known_vulnerabilities)} vulnerabilities, and {len(available_controls)} security controls.")

# 3. Identify attack surfaces
surfaces: List[IdentifiedAttackSurface] = identify_attack_surfaces(app_arch, known_vulnerabilities)
print("\\n--- Identified Attack Surfaces ---")
for surface in surfaces:
print(f"Surface: {surface.component_name} ({surface.component_type}) in {surface.network_zone}")
print(f" Reason: {surface.reason}")
if surface.potential_vulnerabilities:
print(" Potential Vulnerabilities:")
for vuln in surface.potential_vulnerabilities:
cvss_info = f", CVSS: {vuln.cvss_score}" if vuln.cvss_score is not None else ""
cve_info = f", CVE: {vuln.cve_id}" if vuln.cve_id else ""
print(f" - {vuln.name} (Severity: {vuln.severity}{cvss_info}{cve_info})")
print(f" Attack Vector: {vuln.attack_vector.name} (CWE: {vuln.attack_vector.cwe_id if vuln.attack_vector.cwe_id else 'N/A'})")
print(f" Impact: {vuln.impact_description}")
print(f" Exploitability: {vuln.exploitability}")


# 4. Suggest security controls
print("\\n--- Suggested Security Controls ---")
suggestions = suggest_security_controls(surfaces, available_controls)
for suggestion in suggestions:
print(f"Control: {suggestion.control.name} for {suggestion.applies_to_surface.component_name}")
print(f" Description: {suggestion.control.description}")
print(f" Reason: {suggestion.reason_for_suggestion}") # This is now very detailed
print(f" Status: {suggestion.control.implementation_status}, Owner: {suggestion.control.owner}")
print(f" Effectiveness: {suggestion.control.effectiveness}, Cost: {suggestion.control.cost_to_implement}")
print(f" Residual Risk: {suggestion.control.residual_risk}")
if suggestion.control.related_vulnerabilities:
print(f" Specifically addresses: {', '.join(suggestion.control.related_vulnerabilities)}")

```

## Contributing

Expand Down
140 changes: 140 additions & 0 deletions architecture.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
from typing import Union, List
import yaml

class NetworkZone:
def __init__(self, name: str):
self.name: str = name

class Component:
def __init__(self, network_zone: NetworkZone):
self.network_zone: NetworkZone = network_zone

class Service(Component):
def __init__(self, name: str, port: int, protocol: str, processes_sensitive_data: bool, network_zone: NetworkZone):
super().__init__(network_zone)
self.name: str = name
self.port: int = port
self.protocol: str = protocol
self.processes_sensitive_data: bool = processes_sensitive_data

class Database(Component):
def __init__(self, name: str, type: str, stores_sensitive_data: bool, network_zone: NetworkZone):
super().__init__(network_zone)
self.name: str = name
self.type: str = type
self.stores_sensitive_data: bool = stores_sensitive_data

class Application:
def __init__(self, services: List[Service], databases: List[Database], network_zones: List[NetworkZone]):
self.services: List[Service] = services
self.databases: List[Database] = databases
self.network_zones: List[NetworkZone] = network_zones

def load_architecture_from_yaml(file_path: str) -> Application:
try:
with open(file_path, 'r') as f:
data = yaml.safe_load(f)
except FileNotFoundError:
raise FileNotFoundError(f"Error: Architecture file not found at {file_path}")
except yaml.YAMLError as e:
raise yaml.YAMLError(f"Error: Invalid YAML format in {file_path}: {e}")

if not isinstance(data, dict):
raise ValueError(f"Error: Root of YAML must be a dictionary in {file_path}")

# Create NetworkZones
network_zones_data = data.get('network_zones', [])
if not isinstance(network_zones_data, list):
raise ValueError(f"Error: 'network_zones' must be a list in {file_path}")

network_zones_map = {}
created_network_zones = []
for nz_data in network_zones_data:
if not isinstance(nz_data, dict) or 'name' not in nz_data:
raise ValueError(f"Error: Invalid format for network zone entry in {file_path}: {nz_data}")
name = nz_data['name']
if not isinstance(name, str):
raise ValueError(f"Error: Network zone name must be a string in {file_path}: {name}")
network_zone = NetworkZone(name=name)
network_zones_map[name] = network_zone
created_network_zones.append(network_zone)

# Create Services
services_data = data.get('services', [])
if not isinstance(services_data, list):
raise ValueError(f"Error: 'services' must be a list in {file_path}")

created_services = []
for s_data in services_data:
if not isinstance(s_data, dict):
raise ValueError(f"Error: Invalid format for service entry in {file_path}: {s_data}")
try:
name = s_data['name']
port = s_data['port']
protocol = s_data['protocol']
nz_name = s_data['network_zone']
processes_sensitive_data = s_data.get('processes_sensitive_data', False) # Default to False

if not all(isinstance(val, str) for val in [name, protocol, nz_name]):
raise ValueError(f"Error: Service name, protocol, and network_zone name must be strings for service '{name}'.")
if not isinstance(port, int):
raise ValueError(f"Error: Service port must be an integer for service '{name}'.")
if not isinstance(processes_sensitive_data, bool):
raise ValueError(f"Error: Service processes_sensitive_data must be a boolean for service '{name}'.")


if nz_name not in network_zones_map:
raise ValueError(f"Error: Network zone '{nz_name}' for service '{name}' not defined in 'network_zones'.")

network_zone_obj = network_zones_map[nz_name]
service = Service(
name=name,
port=port,
protocol=protocol,
network_zone=network_zone_obj,
processes_sensitive_data=processes_sensitive_data
)
created_services.append(service)
except KeyError as e:
raise ValueError(f"Error: Missing required key {e} for a service entry in {file_path}: {s_data}")
except ValueError as e: # Catch ValueErrors from type checks
raise e


# Create Databases
databases_data = data.get('databases', [])
if not isinstance(databases_data, list):
raise ValueError(f"Error: 'databases' must be a list in {file_path}")

created_databases = []
for db_data in databases_data:
if not isinstance(db_data, dict):
raise ValueError(f"Error: Invalid format for database entry in {file_path}: {db_data}")
try:
name = db_data['name']
db_type = db_data['type']
nz_name = db_data['network_zone']
stores_sensitive_data = db_data.get('stores_sensitive_data', False) # Default to False

if not all(isinstance(val, str) for val in [name, db_type, nz_name]):
raise ValueError(f"Error: Database name, type, and network_zone name must be strings for database '{name}'.")
if not isinstance(stores_sensitive_data, bool):
raise ValueError(f"Error: Database stores_sensitive_data must be a boolean for database '{name}'.")

if nz_name not in network_zones_map:
raise ValueError(f"Error: Network zone '{nz_name}' for database '{name}' not defined in 'network_zones'.")

network_zone_obj = network_zones_map[nz_name]
database = Database(
name=name,
type=db_type,
network_zone=network_zone_obj,
stores_sensitive_data=stores_sensitive_data
)
created_databases.append(database)
except KeyError as e:
raise ValueError(f"Error: Missing required key {e} for a database entry in {file_path}: {db_data}")
except ValueError as e: # Catch ValueErrors from type checks
raise e

return Application(services=created_services, databases=created_databases, network_zones=created_network_zones)
37 changes: 37 additions & 0 deletions example_architecture.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
network_zones:
- name: public
- name: private
- name: dmz

services:
- name: frontend_web_server
port: 443
protocol: TCP
network_zone: public
processes_sensitive_data: false
- name: user_management_api
port: 8080
protocol: TCP
network_zone: private
processes_sensitive_data: true
- name: payment_gateway_proxy
port: 8443
protocol: TCP
network_zone: dmz
processes_sensitive_data: true
- name: internal_reporting_service
port: 9000
protocol: TCP
network_zone: private
processes_sensitive_data: false


databases:
- name: customer_data_db
type: SQL
network_zone: private
stores_sensitive_data: true
- name: session_cache_db
type: NoSQL
network_zone: private
stores_sensitive_data: false
Loading