Skip to content

Commit ce23cbe

Browse files
mxindenAgeManning
andauthored
protocols/gossipsub: Fix inconsistency in mesh peer tracking (#2189)
Co-authored-by: Age Manning <[email protected]>
1 parent f701b24 commit ce23cbe

File tree

3 files changed

+79
-40
lines changed

3 files changed

+79
-40
lines changed

protocols/gossipsub/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# 0.33.0 [unreleased]
22

3+
- Improve internal peer tracking.
4+
[PR 2175](https://github.com/libp2p/rust-libp2p/pull/2175)
5+
36
- Update dependencies.
47

58
- Allow `message_id_fn`s to accept closures that capture variables.

protocols/gossipsub/src/behaviour.rs

Lines changed: 44 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1241,6 +1241,15 @@ where
12411241

12421242
let mut do_px = self.config.do_px();
12431243

1244+
// For each topic, if a peer has grafted us, then we necessarily must be in their mesh
1245+
// and they must be subscribed to the topic. Ensure we have recorded the mapping.
1246+
for topic in &topics {
1247+
self.peer_topics
1248+
.entry(*peer_id)
1249+
.or_default()
1250+
.insert(topic.clone());
1251+
}
1252+
12441253
// we don't GRAFT to/from explicit peers; complain loudly if this happens
12451254
if self.explicit_peers.contains(peer_id) {
12461255
warn!("GRAFT: ignoring request from direct peer {}", peer_id);
@@ -1283,7 +1292,7 @@ where
12831292
peer_score.add_penalty(peer_id, 1);
12841293
}
12851294
}
1286-
//no PX
1295+
// no PX
12871296
do_px = false;
12881297

12891298
to_prune_topics.insert(topic_hash.clone());
@@ -2808,34 +2817,33 @@ where
28082817
// Ignore connections from blacklisted peers.
28092818
if self.blacklisted_peers.contains(peer_id) {
28102819
debug!("Ignoring connection from blacklisted peer: {}", peer_id);
2811-
return;
2812-
}
2813-
2814-
debug!("New peer connected: {}", peer_id);
2815-
// We need to send our subscriptions to the newly-connected node.
2816-
let mut subscriptions = vec![];
2817-
for topic_hash in self.mesh.keys() {
2818-
subscriptions.push(GossipsubSubscription {
2819-
topic_hash: topic_hash.clone(),
2820-
action: GossipsubSubscriptionAction::Subscribe,
2821-
});
2822-
}
2820+
} else {
2821+
debug!("New peer connected: {}", peer_id);
2822+
// We need to send our subscriptions to the newly-connected node.
2823+
let mut subscriptions = vec![];
2824+
for topic_hash in self.mesh.keys() {
2825+
subscriptions.push(GossipsubSubscription {
2826+
topic_hash: topic_hash.clone(),
2827+
action: GossipsubSubscriptionAction::Subscribe,
2828+
});
2829+
}
28232830

2824-
if !subscriptions.is_empty() {
2825-
// send our subscriptions to the peer
2826-
if self
2827-
.send_message(
2828-
*peer_id,
2829-
GossipsubRpc {
2830-
messages: Vec::new(),
2831-
subscriptions,
2832-
control_msgs: Vec::new(),
2833-
}
2834-
.into_protobuf(),
2835-
)
2836-
.is_err()
2837-
{
2838-
error!("Failed to send subscriptions, message too large");
2831+
if !subscriptions.is_empty() {
2832+
// send our subscriptions to the peer
2833+
if self
2834+
.send_message(
2835+
*peer_id,
2836+
GossipsubRpc {
2837+
messages: Vec::new(),
2838+
subscriptions,
2839+
control_msgs: Vec::new(),
2840+
}
2841+
.into_protobuf(),
2842+
)
2843+
.is_err()
2844+
{
2845+
error!("Failed to send subscriptions, message too large");
2846+
}
28392847
}
28402848
}
28412849

@@ -2854,9 +2862,10 @@ where
28542862
let topics = match self.peer_topics.get(peer_id) {
28552863
Some(topics) => (topics),
28562864
None => {
2857-
if !self.blacklisted_peers.contains(peer_id) {
2858-
debug!("Disconnected node, not in connected nodes");
2859-
}
2865+
debug_assert!(
2866+
self.blacklisted_peers.contains(peer_id),
2867+
"Disconnected node not in connected list"
2868+
);
28602869
return;
28612870
}
28622871
};
@@ -2890,12 +2899,12 @@ where
28902899
.get_mut(&topic)
28912900
.map(|peers| peers.remove(peer_id));
28922901
}
2893-
2894-
//forget px and outbound status for this peer
2895-
self.px_peers.remove(peer_id);
2896-
self.outbound_peers.remove(peer_id);
28972902
}
28982903

2904+
// Forget px and outbound status for this peer
2905+
self.px_peers.remove(peer_id);
2906+
self.outbound_peers.remove(peer_id);
2907+
28992908
// Remove peer from peer_topics and connected_peers
29002909
// NOTE: It is possible the peer has already been removed from all mappings if it does not
29012910
// support the protocol.
@@ -2913,11 +2922,6 @@ where
29132922
connection_id: &ConnectionId,
29142923
endpoint: &ConnectedPoint,
29152924
) {
2916-
// Ignore connections from blacklisted peers.
2917-
if self.blacklisted_peers.contains(peer_id) {
2918-
return;
2919-
}
2920-
29212925
// Check if the peer is an outbound peer
29222926
if let ConnectedPoint::Dialer { .. } = endpoint {
29232927
// Diverging from the go implementation we only want to consider a peer as outbound peer

protocols/gossipsub/src/behaviour/tests.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5228,4 +5228,36 @@ mod tests {
52285228
//nobody got penalized
52295229
assert!(gs1.peer_score.as_ref().unwrap().0.score(&p2) >= original_score);
52305230
}
5231+
5232+
#[test]
5233+
/// Test nodes that send grafts without subscriptions.
5234+
fn test_graft_without_subscribe() {
5235+
// The node should:
5236+
// - Create an empty vector in mesh[topic]
5237+
// - Send subscription request to all peers
5238+
// - run JOIN(topic)
5239+
5240+
let topic = String::from("test_subscribe");
5241+
let subscribe_topic = vec![topic.clone()];
5242+
let subscribe_topic_hash = vec![Topic::new(topic.clone()).hash()];
5243+
let (mut gs, peers, topic_hashes) = inject_nodes1()
5244+
.peer_no(1)
5245+
.topics(subscribe_topic)
5246+
.to_subscribe(false)
5247+
.create_network();
5248+
5249+
assert!(
5250+
gs.mesh.get(&topic_hashes[0]).is_some(),
5251+
"Subscribe should add a new entry to the mesh[topic] hashmap"
5252+
);
5253+
5254+
// The node sends a graft for the subscribe topic.
5255+
gs.handle_graft(&peers[0], subscribe_topic_hash);
5256+
5257+
// The node disconnects
5258+
gs.inject_disconnected(&peers[0]);
5259+
5260+
// We unsubscribe from the topic.
5261+
let _ = gs.unsubscribe(&Topic::new(topic));
5262+
}
52315263
}

0 commit comments

Comments
 (0)