-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathscan.py
More file actions
212 lines (180 loc) · 7.29 KB
/
scan.py
File metadata and controls
212 lines (180 loc) · 7.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
"""Albator Scan Module — Audit a system against YAML security rules.
Loads rules from rules/*.yaml, optionally filters by compliance profile,
runs each rule's check command, and reports compliance status.
"""
import glob
import json
import os
import subprocess
import yaml
from odv import load_odv_defaults, get_effective_check_command
from exemptions import load_exemptions, get_exempt_ids, filter_rules_with_exemptions
def load_rules(rules_dir):
"""Load all YAML rule files from the rules directory."""
rules = []
for path in sorted(glob.glob(os.path.join(rules_dir, "os_*.yaml"))):
with open(path) as f:
data = yaml.safe_load(f)
if data and "id" in data and "check" in data:
data["_source"] = path
rules.append(data)
return rules
def load_profile(profiles_dir, profile_name):
"""Load a compliance profile and return the list of rule IDs."""
path = os.path.join(profiles_dir, f"{profile_name}.yaml")
if not os.path.exists(path):
raise FileNotFoundError(f"Profile not found: {path}")
with open(path) as f:
data = yaml.safe_load(f)
return data["profile"]["rules"]
def filter_rules_by_profile(rules, profile_rule_ids):
"""Filter rules to only those listed in a profile."""
id_set = set(profile_rule_ids)
return [r for r in rules if r["id"] in id_set]
def filter_rules_by_severity(rules, min_severity):
"""Filter rules to those at or above a minimum severity level."""
severity_order = {"low": 0, "medium": 1, "high": 2, "critical": 3}
threshold = severity_order.get(min_severity, 0)
return [r for r in rules if severity_order.get(r.get("severity", "low"), 0) >= threshold]
def run_check(rule, timeout=30, odv_values=None):
"""Run a rule's check command and return (passed, detail).
Returns (True, stdout) if exit code 0, (False, stderr/stdout) otherwise.
In case of execution errors (e.g., command not found), returns (False, error_message).
When odv_values is provided, uses check_odv template if available.
"""
check_cmd = get_effective_check_command(rule, odv_values)
if not check_cmd.strip():
return False, "empty check command"
try:
result = subprocess.run(
["bash", "-c", check_cmd],
capture_output=True, text=True, timeout=timeout
)
if result.returncode == 0:
return True, (result.stdout or "").strip()
return False, (result.stderr or result.stdout or "").strip()
except subprocess.TimeoutExpired:
return False, f"check timed out after {timeout}s"
except OSError as e:
return False, str(e)
def scan(rules_dir, profiles_dir=None, profile_name=None,
min_severity=None, dry_run=False, timeout=30, odv_file=None,
exempt_file=None):
"""Run a compliance scan and return structured results.
Args:
rules_dir: Path to the rules/ directory.
profiles_dir: Path to config/profiles/ directory (needed if profile_name set).
profile_name: Optional profile to filter rules (e.g., 'cis_level1').
min_severity: Optional minimum severity filter.
dry_run: If True, list rules without executing checks.
timeout: Per-check timeout in seconds.
odv_file: Optional path to ODV overrides YAML file.
Returns:
dict with keys: rules_scanned, passed, failed, errors, results, summary.
"""
rules = load_rules(rules_dir)
# Load ODV overrides if provided
odv_values = None
if odv_file:
odv_values = load_odv_defaults(odv_file)
if profile_name:
if not profiles_dir:
raise ValueError("profiles_dir required when profile_name is set")
profile_ids = load_profile(profiles_dir, profile_name)
rules = filter_rules_by_profile(rules, profile_ids)
if min_severity:
rules = filter_rules_by_severity(rules, min_severity)
# Handle exemptions
exemptions = None
exempted_rules = []
if exempt_file:
exemptions = load_exemptions(exempt_file)
exempt_ids = get_exempt_ids(exemptions)
rules, exempted_rules = filter_rules_with_exemptions(rules, exempt_ids)
results = []
passed_count = 0
failed_count = 0
error_count = 0
for rule in rules:
entry = {
"id": rule["id"],
"title": rule.get("title", ""),
"severity": rule.get("severity", "unknown"),
}
if dry_run:
entry["status"] = "dry-run"
entry["check"] = get_effective_check_command(rule, odv_values)
else:
ok, detail = run_check(rule, timeout=timeout, odv_values=odv_values)
if ok:
entry["status"] = "pass"
passed_count += 1
else:
entry["status"] = "fail"
entry["detail"] = detail
failed_count += 1
results.append(entry)
# Add exempted rules to results with "exempt" status
for rule in exempted_rules:
ex_info = None
if exemptions:
ex_info = next((e for e in exemptions if e["rule_id"] == rule["id"]), None)
entry = {
"id": rule["id"],
"title": rule.get("title", ""),
"severity": rule.get("severity", "unknown"),
"status": "exempt",
}
if ex_info:
entry["exempt_reason"] = ex_info["reason"]
entry["exempt_approved_by"] = ex_info["approved_by"]
if ex_info["expires"]:
entry["exempt_expires"] = ex_info["expires"]
results.append(entry)
total = len(rules)
exempt_count = len(exempted_rules)
if dry_run:
passed_count = 0
failed_count = 0
return {
"rules_scanned": total,
"passed": passed_count,
"failed": failed_count,
"errors": error_count,
"exempt": exempt_count,
"dry_run": dry_run,
"profile": profile_name,
"results": results,
"summary": {
"total": total,
"passed": passed_count,
"failed": failed_count,
"exempt": exempt_count,
"compliance_pct": round(100.0 * passed_count / total, 1) if total > 0 else 0.0,
},
}
def format_scan_report(scan_result):
"""Format scan results as a human-readable report."""
lines = []
lines.append("Albator Compliance Scan Report")
lines.append("=" * 40)
if scan_result.get("profile"):
lines.append(f"Profile: {scan_result['profile']}")
if scan_result.get("dry_run"):
lines.append("Mode: DRY-RUN (checks not executed)")
lines.append(f"Rules scanned: {scan_result['rules_scanned']}")
lines.append("")
for r in scan_result["results"]:
status = r["status"].upper()
severity = r["severity"].upper()
lines.append(f"[{status}] [{severity}] {r['id']}: {r['title']}")
if r.get("detail"):
lines.append(f" {r['detail'][:120]}")
if r.get("check"):
lines.append(f" check: {r['check'][:120]}")
lines.append("")
lines.append("-" * 40)
s = scan_result["summary"]
exempt_str = f" Exempt: {s.get('exempt', 0)}" if s.get("exempt", 0) > 0 else ""
lines.append(f"Total: {s['total']} Passed: {s['passed']} Failed: {s['failed']}{exempt_str} Compliance: {s['compliance_pct']}%")
return "\n".join(lines)