Skip to content

Commit 1b7117f

Browse files
[sync] Let new subscribers know about already connected peers (backward-compatible) (#7344)
Revert #7011 and replace it with a backward-compatible solution suitable for backporting to a release branch. ### Review notes It's easier to review this PR per commit: the first commit is just a revert, so it's enough to review only the second one, which is almost a one-liner. --------- Co-authored-by: cmd[bot] <41898282+github-actions[bot]@users.noreply.github.com> (cherry picked from commit ee30ec7)
1 parent 64fe443 commit 1b7117f

File tree

7 files changed

+39
-54
lines changed

7 files changed

+39
-54
lines changed

prdoc/pr_7344.prdoc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
title: '[sync] Let new subscribers know about already connected peers (backward-compatible)'
2+
doc:
3+
- audience: Node Dev
4+
description: Revert https://github.com/paritytech/polkadot-sdk/pull/7011 and replace
5+
it with a backward-compatible solution suitable for backporting to a release branch.
6+
crates:
7+
- name: sc-network-gossip
8+
bump: patch
9+
- name: sc-network-statement
10+
bump: patch
11+
- name: sc-network-sync
12+
bump: patch
13+
- name: sc-network-transactions
14+
bump: patch

substrate/client/network-gossip/src/bridge.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -256,12 +256,10 @@ impl<B: BlockT> Future for GossipEngine<B> {
256256

257257
match sync_event_stream {
258258
Poll::Ready(Some(event)) => match event {
259-
SyncEvent::InitialPeers(peer_ids) =>
260-
this.network.add_set_reserved(peer_ids, this.protocol.clone()),
261-
SyncEvent::PeerConnected(peer_id) =>
262-
this.network.add_set_reserved(vec![peer_id], this.protocol.clone()),
263-
SyncEvent::PeerDisconnected(peer_id) =>
264-
this.network.remove_set_reserved(peer_id, this.protocol.clone()),
259+
SyncEvent::PeerConnected(remote) =>
260+
this.network.add_set_reserved(remote, this.protocol.clone()),
261+
SyncEvent::PeerDisconnected(remote) =>
262+
this.network.remove_set_reserved(remote, this.protocol.clone()),
265263
},
266264
// The sync event stream closed. Do the same for [`GossipValidator`].
267265
Poll::Ready(None) => {

substrate/client/network-gossip/src/lib.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,18 +82,15 @@ mod validator;
8282

8383
/// Abstraction over a network.
8484
pub trait Network<B: BlockT>: NetworkPeers + NetworkEventStream {
85-
fn add_set_reserved(&self, peer_ids: Vec<PeerId>, protocol: ProtocolName) {
86-
let addrs = peer_ids
87-
.into_iter()
88-
.map(|peer_id| Multiaddr::empty().with(Protocol::P2p(peer_id.into())))
89-
.collect();
90-
let result = self.add_peers_to_reserved_set(protocol, addrs);
85+
fn add_set_reserved(&self, who: PeerId, protocol: ProtocolName) {
86+
let addr = Multiaddr::empty().with(Protocol::P2p(*who.as_ref()));
87+
let result = self.add_peers_to_reserved_set(protocol, iter::once(addr).collect());
9188
if let Err(err) = result {
9289
log::error!(target: "gossip", "add_set_reserved failed: {}", err);
9390
}
9491
}
95-
fn remove_set_reserved(&self, peer_id: PeerId, protocol: ProtocolName) {
96-
let result = self.remove_peers_from_reserved_set(protocol, iter::once(peer_id).collect());
92+
fn remove_set_reserved(&self, who: PeerId, protocol: ProtocolName) {
93+
let result = self.remove_peers_from_reserved_set(protocol, iter::once(who).collect());
9794
if let Err(err) = result {
9895
log::error!(target: "gossip", "remove_set_reserved failed: {}", err);
9996
}

substrate/client/network/statement/src/lib.rs

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@ use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered, FutureExt}
3333
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
3434
use sc_network::{
3535
config::{NonReservedPeerMode, SetConfig},
36-
error,
37-
multiaddr::{Multiaddr, Protocol},
36+
error, multiaddr,
3837
peer_store::PeerStoreProvider,
3938
service::{
4039
traits::{NotificationEvent, NotificationService, ValidationResult},
@@ -297,19 +296,9 @@ where
297296

298297
fn handle_sync_event(&mut self, event: SyncEvent) {
299298
match event {
300-
SyncEvent::InitialPeers(peer_ids) => {
301-
let addrs = peer_ids
302-
.into_iter()
303-
.map(|peer_id| Multiaddr::empty().with(Protocol::P2p(peer_id.into())))
304-
.collect();
305-
let result =
306-
self.network.add_peers_to_reserved_set(self.protocol_name.clone(), addrs);
307-
if let Err(err) = result {
308-
log::error!(target: LOG_TARGET, "Add reserved peers failed: {}", err);
309-
}
310-
},
311-
SyncEvent::PeerConnected(peer_id) => {
312-
let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into()));
299+
SyncEvent::PeerConnected(remote) => {
300+
let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
301+
.collect::<multiaddr::Multiaddr>();
313302
let result = self.network.add_peers_to_reserved_set(
314303
self.protocol_name.clone(),
315304
iter::once(addr).collect(),
@@ -318,10 +307,10 @@ where
318307
log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
319308
}
320309
},
321-
SyncEvent::PeerDisconnected(peer_id) => {
310+
SyncEvent::PeerDisconnected(remote) => {
322311
let result = self.network.remove_peers_from_reserved_set(
323312
self.protocol_name.clone(),
324-
iter::once(peer_id).collect(),
313+
iter::once(remote).collect(),
325314
);
326315
if let Err(err) = result {
327316
log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}");

substrate/client/network/sync/src/engine.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -648,8 +648,10 @@ where
648648
self.strategy.set_sync_fork_request(peers, &hash, number);
649649
},
650650
ToServiceCommand::EventStream(tx) => {
651-
let _ = tx
652-
.unbounded_send(SyncEvent::InitialPeers(self.peers.keys().cloned().collect()));
651+
// Let a new subscriber know about already connected peers.
652+
for peer_id in self.peers.keys() {
653+
let _ = tx.unbounded_send(SyncEvent::PeerConnected(*peer_id));
654+
}
653655
self.event_streams.push(tx);
654656
},
655657
ToServiceCommand::RequestJustification(hash, number) =>

substrate/client/network/sync/src/types.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,6 @@ where
127127

128128
/// Syncing-related events that other protocols can subscribe to.
129129
pub enum SyncEvent {
130-
/// All connected peers that the syncing implementation is tracking.
131-
/// Always sent as the first message to the stream.
132-
InitialPeers(Vec<PeerId>),
133-
134130
/// Peer that the syncing implementation is tracking connected.
135131
PeerConnected(PeerId),
136132

substrate/client/network/transactions/src/lib.rs

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ use log::{debug, trace, warn};
3535
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
3636
use sc_network::{
3737
config::{NonReservedPeerMode, ProtocolId, SetConfig},
38-
error,
39-
multiaddr::{Multiaddr, Protocol},
38+
error, multiaddr,
4039
peer_store::PeerStoreProvider,
4140
service::{
4241
traits::{NotificationEvent, NotificationService, ValidationResult},
@@ -378,19 +377,9 @@ where
378377

379378
fn handle_sync_event(&mut self, event: SyncEvent) {
380379
match event {
381-
SyncEvent::InitialPeers(peer_ids) => {
382-
let addrs = peer_ids
383-
.into_iter()
384-
.map(|peer_id| Multiaddr::empty().with(Protocol::P2p(peer_id.into())))
385-
.collect();
386-
let result =
387-
self.network.add_peers_to_reserved_set(self.protocol_name.clone(), addrs);
388-
if let Err(err) = result {
389-
log::error!(target: LOG_TARGET, "Add reserved peers failed: {}", err);
390-
}
391-
},
392-
SyncEvent::PeerConnected(peer_id) => {
393-
let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into()));
380+
SyncEvent::PeerConnected(remote) => {
381+
let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
382+
.collect::<multiaddr::Multiaddr>();
394383
let result = self.network.add_peers_to_reserved_set(
395384
self.protocol_name.clone(),
396385
iter::once(addr).collect(),
@@ -399,10 +388,10 @@ where
399388
log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
400389
}
401390
},
402-
SyncEvent::PeerDisconnected(peer_id) => {
391+
SyncEvent::PeerDisconnected(remote) => {
403392
let result = self.network.remove_peers_from_reserved_set(
404393
self.protocol_name.clone(),
405-
iter::once(peer_id).collect(),
394+
iter::once(remote).collect(),
406395
);
407396
if let Err(err) = result {
408397
log::error!(target: LOG_TARGET, "Remove reserved peer failed: {}", err);

0 commit comments

Comments
 (0)