-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpcap_decrypter.py
More file actions
649 lines (526 loc) · 21.6 KB
/
pcap_decrypter.py
File metadata and controls
649 lines (526 loc) · 21.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.8"
# dependencies = [
# "cryptography",
# ]
# ///
"""
C2 PCAP 解密工具
从 config.json 读取加密参数,使用 tshark 从 PCAP 文件提取并解密 C2 流量。
使用方法:
uv run scripts/decrypt_pcap.py <pcap_file> -c <config_file>
示例:
uv run scripts/decrypt_pcap.py cleaned.pcap -c decrypted_config.json
uv run scripts/decrypt_pcap.py cleaned.pcap -c decrypted_config.json -o result.json
加密参数 (从逆向分析得出):
- 算法: AES-256-GCM
- 密钥: md5(salt).hexdigest().encode('ascii') (32 字节)
- Nonce: 12 字节 (消息前缀)
- Tag: 16 字节 (消息后缀)
- 消息格式: [4字节LE长度][nonce 12字节][ciphertext][tag 16字节]
依赖:
- tshark (Wireshark CLI) 用于 PCAP 解析
"""
import argparse
import hashlib
import json
import struct
import subprocess
import sys
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional, List, Tuple
from urllib.parse import urlparse
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
@dataclass
class Config:
    """C2 settings loaded from the decrypted JSON configuration file."""

    salt: str  # secret the AES key is derived from (see get_encryption_key)
    vkey: str  # verification key; its MD5 hex digest is exposed by get_vkey_hash
    server: str  # raw "server" value as found in the JSON file
    server_ip: str  # host part extracted from ``server``
    server_port: int  # port part extracted from ``server`` (80 when absent)
    conn_type: str  # ws, tcp, etc.

    @classmethod
    def from_json(cls, config_path: str) -> 'Config':
        """Load a configuration from a JSON file.

        Args:
            config_path: Path of the JSON file. The keys read are ``salt``,
                ``vkey``, ``server`` and ``type``; missing keys fall back to
                ``''`` (``'ws'`` for ``type``).

        Returns:
            A populated ``Config``.

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(config_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # ``or ''`` also covers an explicit ``"server": null`` entry.
        server = data.get('server') or ''
        server_ip, server_port = cls._parse_server(server)

        return cls(
            salt=data.get('salt', ''),
            vkey=data.get('vkey', ''),
            server=server,
            server_ip=server_ip,
            server_port=server_port,
            conn_type=data.get('type', 'ws'),
        )

    @staticmethod
    def _parse_server(server: str) -> Tuple[str, int]:
        """Split a server string into ``(host, port)``.

        Accepts both URL form (``ws://host:port/...``) and the bare
        ``host:port`` form. The port defaults to 80 when it is missing or
        cannot be parsed as an integer.
        """
        default_port = 80

        try:
            parsed = urlparse(server)
            hostname, port = parsed.hostname, parsed.port
        except ValueError:
            # urlparse / .port raise ValueError on malformed netloc or port;
            # fall through to the lenient host:port parsing below.
            hostname, port = None, None

        if hostname:
            return hostname, port or default_port

        # Bare "ip:port" form (urlparse finds no netloc without a scheme).
        bare = server.replace('ws://', '').replace('tcp://', '')
        bare = bare.split('/', 1)[0]  # drop any trailing path before int()
        parts = bare.split(':')
        host = parts[0]
        try:
            port_num = int(parts[1]) if len(parts) > 1 else default_port
        except ValueError:
            port_num = default_port
        return host, port_num

    def get_encryption_key(self) -> bytes:
        """Return the AES-256 key: the ASCII hex digest of ``md5(salt)``.

        The 32 hex characters themselves (not the 16 raw digest bytes) are
        the key material, which is what yields a 32-byte key.
        """
        md5_hex = hashlib.md5(self.salt.encode()).hexdigest()
        return md5_hex.encode('ascii')  # 32 bytes

    def get_vkey_hash(self) -> str:
        """Return the MD5 hex digest of ``vkey`` (used for verification)."""
        return hashlib.md5(self.vkey.encode()).hexdigest()
@dataclass
class EncryptedMessage:
    """A single encrypted record lifted from a TCP stream.

    ``data`` holds the record body without its length prefix: the first
    12 bytes are the nonce, everything after is ciphertext plus tag.
    """

    src_port: int
    dst_port: int
    data: bytes
    direction: str = 'unknown'  # 'C2S' (Client→Server) or 'S2C' (Server→Client)

    @property
    def nonce(self) -> bytes:
        """Leading 12 bytes of ``data``."""
        body = self.data
        return body[:12]

    @property
    def ciphertext_with_tag(self) -> bytes:
        """Everything in ``data`` after the first 12 bytes."""
        body = self.data
        return body[12:]

    @property
    def direction_arrow(self) -> str:
        """Arrow glyph for the traffic direction ('?' when not C2S/S2C)."""
        if self.direction == 'C2S':
            return '→'
        if self.direction == 'S2C':
            return '←'
        return '?'

    @property
    def direction_label(self) -> str:
        """Human-readable direction label ('Unknown' when not C2S/S2C)."""
        flow = self.direction
        return (
            'Client → Server' if flow == 'C2S'
            else 'Server → Client' if flow == 'S2C'
            else 'Unknown'
        )
class PcapDecryptor:
    """Extract length-prefixed AES-GCM messages from a PCAP and decrypt them.

    TCP payloads are obtained by running ``tshark``; per port-pair buffers
    are then scanned for records of the form
    ``[4-byte LE length][12-byte nonce][ciphertext][16-byte tag]``.
    """

    NONCE_SIZE = 12
    TAG_SIZE = 16
    LENGTH_SIZE = 4
    # Length prefixes above this are treated as noise while resynchronising.
    MAX_MESSAGE_SIZE = 100000
    # Leading bytes that identify an HTTP request/response (WebSocket upgrade).
    _HTTP_PREFIXES = (b'GET ', b'HTTP/')
    # Stop waiting for the end of an HTTP header once this much is buffered.
    _MAX_HTTP_HEADER = 65536

    def __init__(self, config: 'Config'):
        """Derive the key from ``config`` and prepare the AES-GCM cipher."""
        self.config = config
        self.key = config.get_encryption_key()
        self.aesgcm = AESGCM(self.key)

    def extract_websocket_payload(self, data: bytes) -> Optional[bytes]:
        """Extract the payload of one WebSocket frame.

        Args:
            data: Bytes starting at the first byte of a frame.

        Returns:
            The (unmasked) payload, or ``None`` when ``data`` is too short to
            contain the complete header and payload.
        """
        if len(data) < 2:
            return None

        masked = (data[1] & 0x80) != 0
        payload_len = data[1] & 0x7F
        offset = 2

        # 126 / 127 announce a 16-bit / 64-bit big-endian extended length.
        if payload_len == 126:
            if len(data) < 4:
                return None
            payload_len = struct.unpack('>H', data[2:4])[0]
            offset = 4
        elif payload_len == 127:
            if len(data) < 10:
                return None
            payload_len = struct.unpack('>Q', data[2:10])[0]
            offset = 10

        mask_key = b''
        if masked:
            if len(data) < offset + 4:
                return None
            mask_key = data[offset:offset + 4]
            offset += 4

        if len(data) < offset + payload_len:
            return None

        payload = data[offset:offset + payload_len]
        if masked:
            # Undo the masking: XOR each byte with the rotating 4-byte key.
            return bytes(b ^ mask_key[i % 4] for i, b in enumerate(payload))
        return payload

    def extract_tcp_payloads_tshark(self, pcap_path: str) -> List[dict]:
        """Run tshark and return the TCP payloads ordered by frame number.

        Args:
            pcap_path: Capture file handed to ``tshark -r``.

        Returns:
            A list of dicts with the keys ``frame``, ``src_port``,
            ``dst_port``, ``direction`` and ``payload``. Lines that cannot be
            parsed are skipped.

        Exits the process with status 1 when the ``tshark`` executable is not
        found.
        """
        try:
            # frame.number is requested so that capture order can be restored.
            result = subprocess.run(
                ['tshark', '-r', pcap_path, '-T', 'fields',
                 '-e', 'frame.number', '-e', 'tcp.srcport', '-e', 'tcp.dstport', '-e', 'tcp.payload',
                 '-Y', 'tcp.payload'],
                capture_output=True, text=True
            )
        except FileNotFoundError:
            print("错误: 需要安装 tshark (Wireshark CLI)", file=sys.stderr)
            sys.exit(1)

        if result.returncode != 0:
            # Only warn: tshark may still have produced usable output
            # before failing, so whatever it printed is parsed anyway.
            print(f"警告: tshark 退出码 {result.returncode}: {result.stderr.strip()}",
                  file=sys.stderr)

        packets = []
        for line in result.stdout.strip().split('\n'):
            if not line:
                continue
            packet = self._parse_tshark_line(line)
            if packet is not None:
                packets.append(packet)

        packets.sort(key=lambda x: x['frame'])
        return packets

    def _parse_tshark_line(self, line: str) -> Optional[dict]:
        """Parse one ``frame \\t src_port \\t dst_port \\t payload`` line.

        Returns ``None`` for lines with too few fields, non-numeric
        frame/port fields, or an empty / non-hex payload.
        """
        parts = line.split('\t')
        if len(parts) < 4:
            # Fall back to generic whitespace separation.
            parts = line.split(None, 3)
            if len(parts) < 4:
                return None

        try:
            frame_num = int(parts[0])
            src_port = int(parts[1])
            dst_port = int(parts[2])
        except ValueError:
            return None

        payload_hex = parts[3].replace(':', '').strip()
        if not payload_hex:
            return None
        try:
            payload = bytes.fromhex(payload_hex)
        except ValueError:
            return None

        # Direction is decided solely by which side uses the server port.
        if dst_port == self.config.server_port:
            direction = 'C2S'  # Client → Server
        elif src_port == self.config.server_port:
            direction = 'S2C'  # Server → Client
        else:
            direction = 'unknown'

        return {
            'frame': frame_num,
            'src_port': src_port,
            'dst_port': dst_port,
            'direction': direction,
            'payload': payload,
        }

    def extract_messages(self, pcap_path: str) -> List['EncryptedMessage']:
        """Extract encrypted messages from a PCAP in capture order.

        Payloads are accumulated per ``(src_port, dst_port)`` pair. A leading
        HTTP request/response (WebSocket upgrade) is dropped once per stream
        before records are extracted.
        """
        packets = self.extract_tcp_payloads_tshark(pcap_path)
        if not packets:
            return []

        # One reassembly buffer per direction of each port pair.
        stream_buffers = {}
        messages = []

        for pkt in packets:
            key = (pkt['src_port'], pkt['dst_port'])
            if key not in stream_buffers:
                stream_buffers[key] = {
                    'buffer': bytearray(),
                    'direction': pkt['direction'],
                    'http_skipped': False
                }
            stream = stream_buffers[key]
            buffer = stream['buffer']
            buffer.extend(pkt['payload'])

            if not stream['http_skipped'] and buffer.startswith(self._HTTP_PREFIXES):
                http_end = buffer.find(b'\r\n\r\n')
                if http_end != -1:
                    del buffer[:http_end + 4]
                    stream['http_skipped'] = True
                elif len(buffer) <= self._MAX_HTTP_HEADER:
                    # Header is split across segments: wait for the rest
                    # instead of letting the resync loop eat it byte by byte.
                    continue
                else:
                    # No header terminator within the cap: stop treating the
                    # stream as HTTP and let the resync logic handle it.
                    stream['http_skipped'] = True

            self._extract_complete_messages(stream, pkt['src_port'], pkt['dst_port'], messages)

        return messages

    def _extract_complete_messages(self, stream: dict, src_port: int, dst_port: int, messages: list):
        """Move every complete record from the stream buffer into ``messages``.

        The buffer is consumed in place. An implausible length prefix causes a
        one-byte shift (resynchronisation); an incomplete record is left in
        the buffer until more data arrives.
        """
        buffer = stream['buffer']
        direction = stream['direction']
        min_len = self.NONCE_SIZE + self.TAG_SIZE

        while len(buffer) >= 4:
            msg_len = struct.unpack_from('<I', buffer)[0]

            if msg_len > self.MAX_MESSAGE_SIZE or msg_len < min_len:
                # Not a believable length: slide forward by one byte.
                del buffer[0]
                continue

            total_len = 4 + msg_len
            if len(buffer) < total_len:
                break  # record not complete yet

            messages.append(EncryptedMessage(
                src_port=src_port,
                dst_port=dst_port,
                data=bytes(buffer[4:total_len]),
                direction=direction
            ))
            del buffer[:total_len]

    def decrypt_message(self, msg: 'EncryptedMessage') -> Tuple[Optional[bytes], Optional[str]]:
        """Decrypt a single message.

        Returns:
            ``(plaintext, None)`` on success, ``(None, reason)`` on failure.
            ``reason`` is never empty: exceptions that carry no message are
            reported by their class name.
        """
        if len(msg.data) < self.NONCE_SIZE + self.TAG_SIZE:
            return None, "消息太短"

        try:
            plaintext = self.aesgcm.decrypt(msg.nonce, msg.ciphertext_with_tag, None)
        except Exception as e:
            # Deliberately broad: one undecryptable record must not abort the
            # whole report.
            return None, str(e) or type(e).__name__
        return plaintext, None

    def analyze_plaintext(self, plaintext: bytes) -> dict:
        """Classify a decrypted payload.

        Returns:
            A dict that always has ``raw``, ``type`` and ``content``. ``type``
            is one of ``version``, ``hash``, ``tunnel_type``, ``json_data``,
            ``text`` or ``binary``. Text results also carry ``text``; binary
            results carry ``length`` and ``parsed``.
        """
        result = {
            'raw': plaintext,
            'type': 'unknown',
            'content': None,
        }

        try:
            text = plaintext.decode('utf-8')
        except UnicodeDecodeError:
            result['type'] = 'binary'
            result['content'] = plaintext.hex()
            result['length'] = len(plaintext)
            result['parsed'] = self._parse_length_prefixed(plaintext)
            return result

        result['text'] = text

        # Version number: three dot-separated digit groups, at most 10 chars.
        stripped = text.strip('\x00').strip()
        if (stripped.count('.') == 2 and len(stripped) <= 10
                and all(p.isdigit() for p in stripped.split('.'))):
            result['type'] = 'version'
            result['content'] = stripped
            return result

        # MD5 digest: exactly 32 lowercase hex characters.
        if len(text) == 32 and all(c in '0123456789abcdef' for c in text):
            result['type'] = 'hash'
            result['content'] = text
            if text == self.config.get_vkey_hash():
                result['is_vkey_hash'] = True
            return result

        # Tunnel type keyword.
        if text in ('conf', 'main', 'chan', 'file', 'health'):
            result['type'] = 'tunnel_type'
            result['content'] = text
            return result

        # JSON, either bare or behind a "conf#" / "main#" prefix.
        if text.startswith(('{', 'conf#', 'main#')):
            result['type'] = 'json_data'
            if text.startswith('{'):
                # Bare JSON: never split on '#', it may occur inside values.
                json_str = text
            else:
                prefix, json_str = text.split('#', 1)
                result['prefix'] = prefix
            try:
                result['content'] = json.loads(json_str)
            except json.JSONDecodeError:
                result['content'] = json_str
            return result

        result['type'] = 'text'
        result['content'] = text
        return result

    @staticmethod
    def _parse_length_prefixed(plaintext: bytes) -> dict:
        """Interpret a binary payload as ``[4-byte LE length][content]``.

        Returns ``{'len', 'content'}`` plus ``json_content`` when the content
        is valid JSON, or ``{'error': ...}`` when fewer than 4 bytes exist.
        """
        try:
            declared_len = struct.unpack('<I', plaintext[0:4])[0]
        except struct.error as e:
            return {'error': str(e)}

        content = plaintext[4:]
        parsed = {
            'len': declared_len,
            'content': content.decode(errors='ignore')
        }
        try:
            parsed['json_content'] = json.loads(content.decode())
        except (ValueError, RecursionError):
            # Not UTF-8 / not JSON / absurdly nested: JSON view is optional.
            pass
        return parsed
def print_report(config: 'Config', messages: 'List[EncryptedMessage]',
                 decryptor: 'PcapDecryptor', output_json: Optional[str] = None):
    """Decrypt every message, print a report and optionally save it as JSON.

    Args:
        config: Configuration whose values are echoed in the report header.
        messages: Encrypted messages in the order they should be reported.
        decryptor: Object providing ``decrypt_message`` and
            ``analyze_plaintext``.
        output_json: When given, path of a JSON file that receives the
            configuration, key material and per-message results.
    """
    key = config.get_encryption_key()

    print("=" * 70)
    print("C2 流量解密报告")
    print("=" * 70)

    print(f"\n📋 配置信息:")
    print(f" Salt: {config.salt}")
    print(f" VKey: {config.vkey}")
    print(f" VKey MD5: {config.get_vkey_hash()}")
    print(f" 服务器: {config.server}")
    print(f" 服务器 IP: {config.server_ip}")
    print(f" 服务器端口: {config.server_port}")

    print(f"\n🔐 加密参数:")
    print(f" 算法: AES-256-GCM")
    print(f" 密钥 (hex): {key.hex()}")
    print(f" 密钥 (ASCII): {key.decode()}")

    print(f"\n📊 消息统计:")
    print(f" 提取消息数: {len(messages)}")

    # Per-direction counts.
    c2s_count = sum(1 for msg in messages if msg.direction == 'C2S')
    s2c_count = sum(1 for msg in messages if msg.direction == 'S2C')
    print(f" 📤 Client → Server: {c2s_count}")
    print(f" 📥 Server → Client: {s2c_count}")

    # Distinct source ports.
    ports = set(msg.src_port for msg in messages)
    print(f" 来源端口: {sorted(ports)}")

    results = []

    print("\n" + "=" * 70)
    print("解密结果")
    print("=" * 70)

    for i, msg in enumerate(messages, 1):
        plaintext, error = decryptor.decrypt_message(msg)

        dir_icon = '📤' if msg.direction == 'C2S' else '📥' if msg.direction == 'S2C' else '❓'

        print(f"\n--- 消息 {i} {dir_icon} {msg.direction_label} ({msg.src_port}→{msg.dst_port}, {len(msg.data)} 字节) ---")
        print(f" Nonce: {msg.nonce.hex()}")

        result = {
            'index': i,
            'direction': msg.direction,
            'direction_label': msg.direction_label,
            'src_port': msg.src_port,
            'dst_port': msg.dst_port,
            'size': len(msg.data),
            'nonce': msg.nonce.hex(),
        }

        # ``is not None`` rather than truthiness: an empty plaintext is still
        # a successful decryption, not a failure.
        if plaintext is not None:
            analysis = decryptor.analyze_plaintext(plaintext)
            result['status'] = 'success'
            result['plaintext_size'] = len(plaintext)
            result['type'] = analysis['type']

            print(f" ✅ 解密成功 ({len(plaintext)} 字节)")
            print(f" 类型: {analysis['type']}")

            if analysis['type'] == 'version':
                print(f" 版本: {analysis['content']}")
                result['content'] = analysis['content']
            elif analysis['type'] == 'hash':
                is_vkey = analysis.get('is_vkey_hash', False)
                print(f" 哈希: {analysis['content']}")
                if is_vkey:
                    print(f" ⚠️ 匹配 vkey 的 MD5 (验证令牌)")
                result['content'] = analysis['content']
                result['is_vkey_hash'] = is_vkey
            elif analysis['type'] == 'tunnel_type':
                print(f" 隧道: {analysis['content']}")
                result['content'] = analysis['content']
            elif analysis['type'] == 'json_data':
                if 'prefix' in analysis:
                    print(f" 前缀: {analysis['prefix']}")
                    result['prefix'] = analysis['prefix']
                if isinstance(analysis['content'], dict):
                    print(f" JSON:")
                    print(f" {json.dumps(analysis['content'], indent=4, ensure_ascii=False)}")
                else:
                    print(f" 内容: {analysis['content']}")
                result['content'] = analysis['content']
            elif analysis['type'] == 'binary':
                print(f" 二进制: {analysis['content']}")
                result['content'] = analysis['content']
                result['parsed'] = analysis.get('parsed', {})
                print(f" 解析: {analysis.get('parsed', {})}")
            else:
                text = analysis.get('text', analysis.get('content', plaintext.hex()))
                print(f" 内容: {repr(text)}")
                result['content'] = text
        else:
            print(f" ❌ 解密失败: {error}")
            result['status'] = 'failed'
            result['error'] = error

        results.append(result)

    # Optional JSON export.
    if output_json:
        output_data = {
            'config': {
                'salt': config.salt,
                'vkey': config.vkey,
                'vkey_md5': config.get_vkey_hash(),
                'server': config.server,
            },
            'encryption': {
                'algorithm': 'AES-256-GCM',
                'key_hex': key.hex(),
                'key_ascii': key.decode(),
            },
            'messages': results
        }
        with open(output_json, 'w', encoding='utf-8') as f:
            json.dump(output_data, f, indent=2, ensure_ascii=False)
        print(f"\n📁 结果已保存到: {output_json}")

    # Protocol summary built from the successfully decrypted messages.
    print("\n" + "=" * 70)
    print("协议摘要")
    print("=" * 70)

    tunnel_types = {}
    versions = set()
    hashes = []
    host_info = None

    for r in results:
        if r.get('status') != 'success':
            continue

        msg_type = r.get('type')
        content = r.get('content')

        if msg_type == 'version':
            versions.add(content)
        elif msg_type == 'hash':
            hashes.append({
                'hash': content,
                'is_vkey': r.get('is_vkey_hash', False)
            })
        elif msg_type == 'tunnel_type':
            tunnel_types[content] = tunnel_types.get(content, 0) + 1
        elif msg_type == 'json_data' and isinstance(content, dict):
            # The last JSON object that looks like host info wins.
            if 'LocalIP' in content or 'HostName' in content:
                host_info = content

    if versions:
        # Sorted so the line is stable from run to run (set order is not).
        print(f"\n📌 客户端版本: {', '.join(sorted(versions))}")

    if tunnel_types:
        print(f"\n📡 隧道类型统计:")
        for t, count in tunnel_types.items():
            print(f" - {t}: {count} 次")

    if hashes:
        print(f"\n🔑 验证令牌:")
        for h in hashes:
            mark = " (vkey)" if h['is_vkey'] else ""
            print(f" - {h['hash']}{mark}")

    if host_info:
        print(f"\n🖥️ 受害主机信息:")
        for key in ['LocalIP', 'UserName', 'HostName', 'OsName', 'ProcessName']:
            if key in host_info:
                print(f" {key}: {host_info[key]}")
def main():
    """Command-line entry point: parse arguments, extract, decrypt, report.

    Exits with status 1 when the PCAP or configuration file does not exist,
    or when no encrypted message is found in the capture.
    """
    cli = argparse.ArgumentParser(
        description='从 PCAP 文件解密 C2 流量',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
%(prog)s cleaned.pcap -c decrypted_config.json
%(prog)s cleaned.pcap -c decrypted_config.json -o decrypted.json
"""
    )
    cli.add_argument('pcap', help='PCAP 文件路径')
    cli.add_argument('-c', '--config', required=True, help='配置文件路径 (JSON)')
    cli.add_argument('-o', '--output', help='输出 JSON 文件路径')
    opts = cli.parse_args()

    # Both input files must exist; the PCAP is checked first.
    problem = None
    if not Path(opts.pcap).exists():
        problem = f"错误: PCAP 文件不存在: {opts.pcap}"
    elif not Path(opts.config).exists():
        problem = f"错误: 配置文件不存在: {opts.config}"
    if problem is not None:
        print(problem, file=sys.stderr)
        sys.exit(1)

    # Load the configuration and build the decryptor from it.
    print(f"📖 加载配置: {opts.config}")
    settings = Config.from_json(opts.config)
    engine = PcapDecryptor(settings)

    # Pull the encrypted messages out of the capture.
    print(f"📦 解析 PCAP: {opts.pcap}")
    found = engine.extract_messages(opts.pcap)
    if not found:
        print("⚠️ 未找到加密消息")
        sys.exit(1)

    # Decrypt and report.
    print_report(settings, found, engine, opts.output)


if __name__ == '__main__':
    main()