Skip to content

Commit cfcf6bb

Browse files
troygiorshevPiRK
authored andcommitted
Per-Peer Message Capture
Summary: The purpose and scope of this feature is intentionally limited. It answers a question anyone new to Bitcoin's P2P protocol has had: "Can I see what messages my node is sending and receiving?". When a new debug-only command line argument `capturemessages` is set, any message that the node receives or sends is captured. The capture occurs in the `MessageHandler` thread. When receiving a message, it is captured as soon as the `MessageHandler` thread takes the message off of the `vProcessMsg` queue. When sending, the message is captured just before the message is pushed onto the `vSendMsg` queue. The message capture is as minimal as possible to reduce the performance impact on the node. Messages are captured to a new message_capture folder in the datadir. Each node has their own subfolder named with their IP address and port. Inside, received and sent messages are captured into two binary files, msgs_recv.dat and msgs_sent.dat, like so: ``` message_capture/203.0.113.7:56072/msgs_recv.dat message_capture/203.0.113.7:56072/msgs_sent.dat ``` Because the messages are raw binary dumps, included in this PR is a Python parsing tool to convert the binary files into human-readable JSON. This script has been placed on its own and out of the way in the new contrib/message-capture folder. Its usage is simple and easily discovered by the autogenerated -h option. This is a backport of [[bitcoin/bitcoin#19509 | core#19509]] Test Plan: `ninja all check-all` `src/bitcoind -capturemessages` `./contrib/message-capture/message-capture-parser.py -o out.json /bitcoinddata/message_capture/**/*.dat` Reviewers: #bitcoin_abc, Fabien Reviewed By: #bitcoin_abc, Fabien Subscribers: Fabien Differential Revision: https://reviews.bitcoinabc.org/D12531
1 parent 472e6c7 commit cfcf6bb

8 files changed

Lines changed: 399 additions & 10 deletions

File tree

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Per-Peer Message Capture
2+
3+
## Purpose
4+
5+
This feature allows for message capture on a per-peer basis. It answers the simple question: "Can I see what messages my node is sending and receiving?"
6+
7+
## Usage and Functionality
8+
9+
* Run `bitcoind` with the `-capturemessages` option.
10+
* Look in the `message_capture` folder in your datadir.
11+
* Typically this will be `~/.bitcoin/message_capture`.
12+
* See that there are many folders inside, one for each peer names with its IP address and port.
13+
* Inside each peer's folder there are two `.dat` files: one is for received messages (`msgs_recv.dat`) and the other is for sent messages (`msgs_sent.dat`).
14+
* Run `contrib/message-capture/message-capture-parser.py` with the proper arguments.
15+
* See the `-h` option for help.
16+
* To see all messages, both sent and received, for all peers use:
17+
```
18+
./contrib/message-capture/message-capture-parser.py -o out.json \
19+
~/.bitcoin/message_capture/**/*.dat
20+
```
21+
* Note: The messages in the given `.dat` files will be interleaved in chronological order. So, giving both received and sent `.dat` files (as above with `*.dat`) will result in all messages being interleaved in chronological order.
22+
* If an output file is not provided (i.e. the `-o` option is not used), then the output prints to `stdout`.
23+
* View the resulting output.
24+
* The output file is `JSON` formatted.
25+
* Suggestion: use `jq` to view the output, with `jq . out.json`
Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
#!/usr/bin/env python3
2+
# Copyright (c) 2020 The Bitcoin Core developers
3+
# Distributed under the MIT software license, see the accompanying
4+
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
5+
"""Parse message capture binary files.
6+
To be used in conjunction with -capturemessages.
7+
"""
8+
9+
import argparse
10+
import os
11+
import shutil
12+
import sys
13+
from io import BytesIO
14+
import json
15+
from pathlib import Path
16+
from typing import Any, Dict, List, Optional
17+
18+
sys.path.append(
19+
os.path.join(
20+
os.path.dirname(__file__),
21+
'../../test/functional'))
22+
23+
from test_framework.messages import ser_uint256 # noqa: E402
24+
from test_framework.p2p import MESSAGEMAP # noqa: E402
25+
26+
TIME_SIZE = 8
27+
LENGTH_SIZE = 4
28+
MSGTYPE_SIZE = 12
29+
30+
# The test framework classes stores hashes as large ints in many cases.
31+
# These are variables of type uint256 in core.
32+
# There isn't a way to distinguish between a large int and a large int that is actually a blob of bytes.
33+
# As such, they are itemized here.
34+
# Any variables with these names that are of type int are actually uint256 variables.
35+
# (These can be easily found by looking for calls to deser_uint256, deser_uint256_vector, and uint256_from_str in messages.py)
36+
HASH_INTS = [
37+
"blockhash",
38+
"block_hash",
39+
"hash",
40+
"hashMerkleRoot",
41+
"hashPrevBlock",
42+
"hashstop",
43+
"limited_proofid",
44+
"prev_header",
45+
"sha256",
46+
"stop_hash",
47+
]
48+
49+
HASH_INT_VECTORS = [
50+
"hashes",
51+
"headers",
52+
"vHave",
53+
"vHash",
54+
]
55+
56+
57+
class ProgressBar:
58+
def __init__(self, total: float):
59+
self.total = total
60+
self.running = 0
61+
62+
def set_progress(self, progress: float):
63+
cols = shutil.get_terminal_size()[0]
64+
if cols <= 12:
65+
return
66+
max_blocks = cols - 9
67+
num_blocks = int(max_blocks * progress)
68+
print('\r[ {}{} ] {:3.0f}%'
69+
.format('#' * num_blocks,
70+
' ' * (max_blocks - num_blocks),
71+
progress * 100),
72+
end='')
73+
74+
def update(self, more: float):
75+
self.running += more
76+
self.set_progress(self.running / self.total)
77+
78+
79+
def to_jsonable(obj: Any) -> Any:
80+
if hasattr(obj, "__dict__"):
81+
return obj.__dict__
82+
elif hasattr(obj, "__slots__"):
83+
ret: Dict[str, Any] = {}
84+
for slot in obj.__slots__:
85+
val = getattr(obj, slot, None)
86+
if slot in HASH_INTS and isinstance(val, int):
87+
ret[slot] = ser_uint256(val).hex()
88+
elif slot in HASH_INT_VECTORS and isinstance(val[0], int):
89+
ret[slot] = [ser_uint256(a).hex() for a in val]
90+
else:
91+
ret[slot] = to_jsonable(val)
92+
return ret
93+
elif isinstance(obj, list):
94+
return [to_jsonable(a) for a in obj]
95+
elif isinstance(obj, bytes):
96+
return obj.hex()
97+
else:
98+
return obj
99+
100+
101+
def process_file(path: str, messages: List[Any], recv: bool,
102+
progress_bar: Optional[ProgressBar]) -> None:
103+
with open(path, 'rb') as f_in:
104+
if progress_bar:
105+
bytes_read = 0
106+
107+
while True:
108+
if progress_bar:
109+
# Update progress bar
110+
diff = f_in.tell() - bytes_read - 1
111+
progress_bar.update(diff)
112+
bytes_read = f_in.tell() - 1
113+
114+
# Read the Header
115+
tmp_header_raw = f_in.read(TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE)
116+
if not tmp_header_raw:
117+
break
118+
tmp_header = BytesIO(tmp_header_raw)
119+
time = int.from_bytes(tmp_header.read(TIME_SIZE), "little")
120+
msgtype: bytes = tmp_header.read(MSGTYPE_SIZE).split(b'\x00', 1)[0]
121+
length = int.from_bytes(tmp_header.read(LENGTH_SIZE), "little")
122+
123+
# Start converting the message to a dictionary
124+
msg_dict = {}
125+
msg_dict["direction"] = "recv" if recv else "sent"
126+
msg_dict["time"] = time
127+
# "size" is less readable here, but more readable in the output
128+
msg_dict["size"] = length
129+
130+
msg_ser = BytesIO(f_in.read(length))
131+
132+
# Determine message type
133+
if msgtype not in MESSAGEMAP:
134+
# Unrecognized message type
135+
try:
136+
msgtype_tmp = msgtype.decode()
137+
if not msgtype_tmp.isprintable():
138+
raise UnicodeDecodeError
139+
msg_dict["msgtype"] = msgtype_tmp
140+
except UnicodeDecodeError:
141+
msg_dict["msgtype"] = "UNREADABLE"
142+
msg_dict["body"] = msg_ser.read().hex()
143+
msg_dict["error"] = "Unrecognized message type."
144+
messages.append(msg_dict)
145+
print(
146+
f"WARNING - Unrecognized message type {msgtype} in {path}",
147+
file=sys.stderr)
148+
continue
149+
150+
# Deserialize the message
151+
msg = MESSAGEMAP[msgtype]()
152+
msg_dict["msgtype"] = msgtype.decode()
153+
154+
try:
155+
msg.deserialize(msg_ser)
156+
except KeyboardInterrupt:
157+
raise
158+
except Exception:
159+
# Unable to deserialize message body
160+
msg_ser.seek(0, os.SEEK_SET)
161+
msg_dict["body"] = msg_ser.read().hex()
162+
msg_dict["error"] = "Unable to deserialize message."
163+
messages.append(msg_dict)
164+
print(
165+
f"WARNING - Unable to deserialize message in {path}",
166+
file=sys.stderr)
167+
continue
168+
169+
# Convert body of message into a jsonable object
170+
if length:
171+
msg_dict["body"] = to_jsonable(msg)
172+
messages.append(msg_dict)
173+
174+
if progress_bar:
175+
# Update the progress bar to the end of the current file
176+
# in case we exited the loop early
177+
# Go to end of file
178+
f_in.seek(0, os.SEEK_END)
179+
diff = f_in.tell() - bytes_read - 1
180+
progress_bar.update(diff)
181+
182+
183+
def main():
184+
parser = argparse.ArgumentParser(
185+
description=__doc__,
186+
epilog="EXAMPLE \n\t{0} -o out.json <data-dir>/message_capture/**/*.dat".format(
187+
sys.argv[0]),
188+
formatter_class=argparse.RawTextHelpFormatter)
189+
parser.add_argument(
190+
"capturepaths",
191+
nargs='+',
192+
help="binary message capture files to parse.")
193+
parser.add_argument(
194+
"-o", "--output",
195+
help="output file. If unset print to stdout")
196+
parser.add_argument(
197+
"-n", "--no-progress-bar",
198+
action='store_true',
199+
help="disable the progress bar. Automatically set if the output is not a terminal")
200+
args = parser.parse_args()
201+
capturepaths = [Path.cwd() / Path(capturepath)
202+
for capturepath in args.capturepaths]
203+
output = Path.cwd() / Path(args.output) if args.output else False
204+
use_progress_bar = (not args.no_progress_bar) and sys.stdout.isatty()
205+
206+
messages: List[Any] = []
207+
if use_progress_bar:
208+
total_size = sum(capture.stat().st_size for capture in capturepaths)
209+
progress_bar = ProgressBar(total_size)
210+
else:
211+
progress_bar = None
212+
213+
for capture in capturepaths:
214+
process_file(
215+
str(capture),
216+
messages,
217+
"recv" in capture.stem,
218+
progress_bar)
219+
220+
messages.sort(key=lambda msg: msg['time'])
221+
222+
if use_progress_bar:
223+
progress_bar.set_progress(1)
224+
jsonrep = json.dumps(messages)
225+
if output:
226+
with open(str(output), 'w+', encoding="utf8") as f_out:
227+
f_out.write(jsonrep)
228+
else:
229+
print(jsonrep)
230+
231+
232+
if __name__ == "__main__":
233+
main()

src/init.cpp

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,7 +1041,9 @@ void SetupServerArgs(NodeContext &node) {
10411041
argsman.AddArg("-addrmantest", "Allows to test address relay on localhost",
10421042
ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY,
10431043
OptionsCategory::DEBUG_TEST);
1044-
1044+
argsman.AddArg("-capturemessages", "Capture all P2P messages to disk",
1045+
ArgsManager::ALLOW_BOOL | ArgsManager::DEBUG_ONLY,
1046+
OptionsCategory::DEBUG_TEST);
10451047
argsman.AddArg("-debug=<category>",
10461048
strprintf("Output debugging information (default: %u, "
10471049
"supplying <category> is optional)",
@@ -1866,19 +1868,19 @@ bool AppInitParameterInteraction(Config &config, const ArgsManager &args) {
18661868
// Trim requested connection counts, to fit into system limitations
18671869
// <int> in std::min<int>(...) to work around FreeBSD compilation issue
18681870
// described in #2695
1869-
nFD = RaiseFileDescriptorLimit(nMaxConnections + nBind +
1870-
MIN_CORE_FILEDESCRIPTORS +
1871-
MAX_ADDNODE_CONNECTIONS);
1871+
nFD = RaiseFileDescriptorLimit(
1872+
nMaxConnections + nBind + MIN_CORE_FILEDESCRIPTORS +
1873+
MAX_ADDNODE_CONNECTIONS + NUM_FDS_MESSAGE_CAPTURE);
18721874
#ifdef USE_POLL
18731875
int fd_max = nFD;
18741876
#else
18751877
int fd_max = FD_SETSIZE;
18761878
#endif
1877-
nMaxConnections =
1878-
std::max(std::min<int>(nMaxConnections, fd_max - nBind -
1879-
MIN_CORE_FILEDESCRIPTORS -
1880-
MAX_ADDNODE_CONNECTIONS),
1881-
0);
1879+
nMaxConnections = std::max(
1880+
std::min<int>(nMaxConnections,
1881+
fd_max - nBind - MIN_CORE_FILEDESCRIPTORS -
1882+
MAX_ADDNODE_CONNECTIONS - NUM_FDS_MESSAGE_CAPTURE),
1883+
0);
18821884
if (nFD < MIN_CORE_FILEDESCRIPTORS) {
18831885
return InitError(_("Not enough file descriptors available."));
18841886
}

src/net.cpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3503,6 +3503,9 @@ void CConnman::PushMessage(CNode *pnode, CSerializedNetMsg &&msg) {
35033503
size_t nMessageSize = msg.data.size();
35043504
LogPrint(BCLog::NET, "sending %s (%d bytes) peer=%d\n",
35053505
SanitizeString(msg.m_type), nMessageSize, pnode->GetId());
3506+
if (gArgs.GetBoolArg("-capturemessages", false)) {
3507+
CaptureMessage(pnode->addr, msg.m_type, msg.data, /*incoming=*/false);
3508+
}
35063509

35073510
// make sure we use the appropriate network transport format
35083511
std::vector<uint8_t> serializedHeader;
@@ -3629,3 +3632,32 @@ std::string userAgent(const Config &config) {
36293632
// Size compliance is checked at startup, it is safe to not check it again
36303633
return FormatUserAgent(client_name, client_version, uacomments);
36313634
}
3635+
3636+
void CaptureMessage(const CAddress &addr, const std::string &msg_type,
3637+
const Span<const uint8_t> &data, bool is_incoming) {
3638+
// Note: This function captures the message at the time of processing,
3639+
// not at socket receive/send time.
3640+
// This ensures that the messages are always in order from an application
3641+
// layer (processing) perspective.
3642+
auto now = GetTime<std::chrono::microseconds>();
3643+
3644+
// Windows folder names can not include a colon
3645+
std::string clean_addr = addr.ToString();
3646+
std::replace(clean_addr.begin(), clean_addr.end(), ':', '_');
3647+
3648+
fs::path base_path = gArgs.GetDataDirNet() / "message_capture" / clean_addr;
3649+
fs::create_directories(base_path);
3650+
3651+
fs::path path =
3652+
base_path / (is_incoming ? "msgs_recv.dat" : "msgs_sent.dat");
3653+
CAutoFile f(fsbridge::fopen(path, "ab"), SER_DISK, CLIENT_VERSION);
3654+
3655+
ser_writedata64(f, now.count());
3656+
f.write(msg_type.data(), msg_type.length());
3657+
for (auto i = msg_type.length(); i < CMessageHeader::COMMAND_SIZE; ++i) {
3658+
f << '\0';
3659+
}
3660+
uint32_t size = data.size();
3661+
ser_writedata32(f, size);
3662+
f.write((const char *)data.data(), data.size());
3663+
}

src/net.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <pubkey.h>
2626
#include <radix.h>
2727
#include <random.h>
28+
#include <span.h>
2829
#include <streams.h>
2930
#include <sync.h>
3031
#include <threadinterrupt.h>
@@ -94,6 +95,8 @@ static constexpr uint64_t DEFAULT_MAX_UPLOAD_TARGET = 0;
9495
static const bool DEFAULT_BLOCKSONLY = false;
9596
/** -peertimeout default */
9697
static const int64_t DEFAULT_PEER_CONNECT_TIMEOUT = 60;
98+
/** Number of file descriptors required for message capture **/
99+
static const int NUM_FDS_MESSAGE_CAPTURE = 1;
97100

98101
static const bool DEFAULT_FORCEDNSSEED = false;
99102
static const bool DEFAULT_DNSSEED = true;
@@ -1438,6 +1441,10 @@ PoissonNextSend(std::chrono::microseconds now,
14381441
std::string getSubVersionEB(uint64_t MaxBlockSize);
14391442
std::string userAgent(const Config &config);
14401443

1444+
/** Dump binary message to file, with timestamp */
1445+
void CaptureMessage(const CAddress &addr, const std::string &msg_type,
1446+
const Span<const uint8_t> &data, bool is_incoming);
1447+
14411448
struct NodeEvictionCandidate {
14421449
NodeId id;
14431450
std::chrono::seconds m_connected;

src/net_processing.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5866,6 +5866,11 @@ bool PeerManagerImpl::ProcessMessages(const Config &config, CNode *pfrom,
58665866
}
58675867
CNetMessage &msg(msgs.front());
58685868

5869+
if (gArgs.GetBoolArg("-capturemessages", false)) {
5870+
CaptureMessage(pfrom->addr, msg.m_command, MakeUCharSpan(msg.m_recv),
5871+
/*incoming=*/true);
5872+
}
5873+
58695874
msg.SetVersion(pfrom->GetCommonVersion());
58705875

58715876
// Check network magic

0 commit comments

Comments
 (0)