
Commit e18c093

alexggh authored and committed (with dmitry-markin and lexnv)
authority-discovery: Make changing of peer-id while active a bit more robust (paritytech#3786)
When nodes don't persist their node-key, or want to generate a new one while in the active set, things go wrong: both the old addresses and the new ones remain in the DHT, and because of the DHT's distributed nature both survive in the network until the old ones expire after 36 hours. Nodes in the network will randomly resolve the authority-id to either the old address or the new one. More details in: paritytech#3673

This PR proposes we mitigate the problem by:

1. Letting the query for a DHT key retrieve more than one result (4), also bounded by the replication factor of 20; currently we interrupt the query at the first result.

~~2. Modifying the authority-discovery service to keep all discovered addresses around for 24h after an address was last seen.~~

~~3. Plumbing this through the other subsystems where the assumption was that an authority-id resolves to exactly one PeerId. Currently, authority-discovery keeps just the last record it received from the DHT and queries the DHT every 10 minutes, but nodes could always receive only the old address, only the new address, or a flip-flop between them, depending on which node wins the race to provide the record.~~

2. Extending the `SignedAuthorityRecord` with a signed creation_time.

3. Modifying authority discovery to keep track of the nodes that sent us an old record and, once we learn of a newer record, updating those nodes with it.

4. Updating gossip-support to try to resolve authorities more often than once per session.

~~This gives the nodes in the network many more chances to discover not only a node's old address but also its new one, and should improve the time it takes for a node to become properly connected. The behaviour won't be deterministic, because there is no guarantee that all nodes will see the new record at least once; they could query only nodes that hold the old one.~~

## TODO

- [x] Add unit tests for the new paths.
- [x] Make sure the implementation is backwards compatible.
- [x] Evaluate whether there are any bad consequences of letting the query continue rather than stopping at the first record found.
- [x] Bake the new changes in versi.

---------

Signed-off-by: Alexandru Gheorghe <[email protected]>
Co-authored-by: Dmitry Markin <[email protected]>
Co-authored-by: Alexandru Vasile <[email protected]>
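As a hedged illustration of point 2 (the signed `creation_time`), a consumer of discovery records can use that timestamp to keep only the freshest record per authority. The `AuthorityRecord` type and `select_record` helper below are hypothetical stand-ins, not the actual `SignedAuthorityRecord` from this PR:

```rust
// Hypothetical sketch: prefer the authority record with the newer signed
// creation time. `AuthorityRecord` is a stand-in type, not the PR's actual
// `SignedAuthorityRecord`; `peer_id` is a String instead of a libp2p PeerId.
#[derive(Clone, Debug, PartialEq)]
struct AuthorityRecord {
    peer_id: String,
    // Signed creation timestamp (e.g. millis since the Unix epoch).
    creation_time: u128,
}

// Keep whichever record carries the newer creation time; on a tie, keep the
// record we already hold so repeated DHT responses can't cause flip-flopping.
fn select_record(
    current: Option<AuthorityRecord>,
    incoming: AuthorityRecord,
) -> AuthorityRecord {
    match current {
        Some(cur) if cur.creation_time >= incoming.creation_time => cur,
        _ => incoming,
    }
}
```

With a rule like this, a node that sees both the old and the new record in any order converges on the newer one, which is exactly the property the flip-flop described above lacks.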
1 parent b442a0b commit e18c093

22 files changed: 1511 additions & 177 deletions

polkadot/node/network/bridge/src/network.rs

Lines changed: 15 additions & 0 deletions
```diff
@@ -204,6 +204,13 @@ pub trait Network: Clone + Send + 'static {
 		multiaddresses: HashSet<Multiaddr>,
 	) -> Result<(), String>;
 
+	/// Ask the network to extend the reserved set with these nodes.
+	async fn add_peers_to_reserved_set(
+		&mut self,
+		protocol: ProtocolName,
+		multiaddresses: HashSet<Multiaddr>,
+	) -> Result<(), String>;
+
 	/// Removes the peers for the protocol's peer set (both reserved and non-reserved).
 	async fn remove_from_peers_set(
 		&mut self,
@@ -240,6 +247,14 @@ impl Network for Arc<dyn NetworkService> {
 		<dyn NetworkService>::set_reserved_peers(&**self, protocol, multiaddresses)
 	}
 
+	async fn add_peers_to_reserved_set(
+		&mut self,
+		protocol: ProtocolName,
+		multiaddresses: HashSet<Multiaddr>,
+	) -> Result<(), String> {
+		<dyn NetworkService>::add_peers_to_reserved_set(&**self, protocol, multiaddresses)
+	}
+
 	async fn remove_from_peers_set(
 		&mut self,
 		protocol: ProtocolName,
```
polkadot/node/network/bridge/src/rx/tests.rs

Lines changed: 8 additions & 0 deletions
```diff
@@ -124,6 +124,14 @@ impl Network for TestNetwork {
 		Ok(())
 	}
 
+	async fn add_peers_to_reserved_set(
+		&mut self,
+		_protocol: ProtocolName,
+		_: HashSet<Multiaddr>,
+	) -> Result<(), String> {
+		Ok(())
+	}
+
 	async fn remove_from_peers_set(
 		&mut self,
 		_protocol: ProtocolName,
```

polkadot/node/network/bridge/src/tx/mod.rs

Lines changed: 16 additions & 0 deletions
```diff
@@ -370,6 +370,22 @@ where
 					.await;
 				return (network_service, authority_discovery_service)
 			},
+
+			NetworkBridgeTxMessage::AddToResolvedValidators { validator_addrs, peer_set } => {
+				gum::trace!(
+					target: LOG_TARGET,
+					action = "AddToResolvedValidators",
+					peer_set = ?peer_set,
+					?validator_addrs,
+					"Received a resolved validator connection request",
+				);
+
+				let all_addrs = validator_addrs.into_iter().flatten().collect();
+				let network_service = validator_discovery
+					.on_add_to_resolved_request(all_addrs, peer_set, network_service)
+					.await;
+				return (network_service, authority_discovery_service)
+			},
 		}
 		(network_service, authority_discovery_service)
 	}
```
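The handler above collapses the per-validator `Vec<HashSet<Multiaddr>>` into one address set via `into_iter().flatten().collect()`. A minimal sketch of that shape, with `String`s standing in for `Multiaddr` (the helper name is illustrative, not from the PR):

```rust
use std::collections::HashSet;

// Flatten per-validator address sets into a single deduplicated set, the
// same shape the `AddToResolvedValidators` arm produces for `all_addrs`.
fn flatten_validator_addrs(validator_addrs: Vec<HashSet<String>>) -> HashSet<String> {
    validator_addrs.into_iter().flatten().collect()
}
```

Collecting into a `HashSet` deduplicates addresses shared between entries for free.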

polkadot/node/network/bridge/src/tx/tests.rs

Lines changed: 8 additions & 0 deletions
```diff
@@ -148,6 +148,14 @@ impl Network for TestNetwork {
 		Ok(())
 	}
 
+	async fn add_peers_to_reserved_set(
+		&mut self,
+		_protocol: ProtocolName,
+		_: HashSet<Multiaddr>,
+	) -> Result<(), String> {
+		Ok(())
+	}
+
 	async fn remove_from_peers_set(
 		&mut self,
 		_protocol: ProtocolName,
```

polkadot/node/network/bridge/src/validator_discovery.rs

Lines changed: 47 additions & 0 deletions
```diff
@@ -92,6 +92,44 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
 		network_service
 	}
 
+	/// Connect to already resolved addresses.
+	pub async fn on_add_to_resolved_request(
+		&mut self,
+		newly_requested: HashSet<Multiaddr>,
+		peer_set: PeerSet,
+		mut network_service: N,
+	) -> N {
+		let state = &mut self.state[peer_set];
+		let new_peer_ids: HashSet<PeerId> = extract_peer_ids(newly_requested.iter().cloned());
+		let num_peers = new_peer_ids.len();
+
+		state.previously_requested.extend(new_peer_ids);
+
+		gum::debug!(
+			target: LOG_TARGET,
+			?peer_set,
+			?num_peers,
+			"New add to resolved validators request",
+		);
+
+		// ask the network to connect to these nodes and not disconnect
+		// from them until they are removed from the set.
+		//
+		// for peer-set management, the main protocol name should be used regardless of
+		// the negotiated version.
+		if let Err(e) = network_service
+			.add_peers_to_reserved_set(
+				self.peerset_protocol_names.get_main_name(peer_set),
+				newly_requested,
+			)
+			.await
+		{
+			gum::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
+		}
+
+		network_service
+	}
+
 	/// On a new connection request, a peer set update will be issued.
 	/// It will ask the network to connect to the validators and not disconnect
 	/// from them at least until the next request is issued for the same peer set.
@@ -222,6 +260,15 @@ mod tests {
 		Ok(())
 	}
 
+	async fn add_peers_to_reserved_set(
+		&mut self,
+		_protocol: ProtocolName,
+		multiaddresses: HashSet<Multiaddr>,
+	) -> Result<(), String> {
+		self.peers_set.extend(extract_peer_ids(multiaddresses.into_iter()));
+		Ok(())
+	}
+
 	async fn remove_from_peers_set(
 		&mut self,
 		_protocol: ProtocolName,
```
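`extract_peer_ids` above pulls the `PeerId` out of each multiaddress. Conceptually, it reads the trailing `/p2p/<peer-id>` component; the string-based helper below is a simplified sketch of that idea, not the real libp2p-typed implementation:

```rust
// Simplified, string-based sketch of peer-id extraction: a libp2p
// multiaddress that targets a specific node ends in "/p2p/<PeerId>".
// The real code works on typed Multiaddr/PeerId values instead.
fn extract_peer_id(addr: &str) -> Option<&str> {
    addr.rsplit_once("/p2p/").map(|(_, id)| id)
}
```

Addresses without a `/p2p/` component carry no peer identity, so the sketch returns `None` for them.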

polkadot/node/network/gossip-support/src/lib.rs

Lines changed: 112 additions & 15 deletions
```diff
@@ -69,6 +69,16 @@ const BACKOFF_DURATION: Duration = Duration::from_secs(5);
 #[cfg(test)]
 const BACKOFF_DURATION: Duration = Duration::from_millis(500);
 
+// The authority_discovery queries run every ten minutes,
+// so it makes sense to run a bit more often than that to
+// detect changes as often as we can, but not too often since
+// it won't help.
+#[cfg(not(test))]
+const TRY_RERESOLVE_AUTHORITIES: Duration = Duration::from_secs(5 * 60);
+
+#[cfg(test)]
+const TRY_RERESOLVE_AUTHORITIES: Duration = Duration::from_secs(2);
+
 /// Duration after which we consider low connectivity a problem.
 ///
 /// Especially at startup low connectivity is expected (authority discovery cache needs to be
@@ -91,6 +101,14 @@ pub struct GossipSupport<AD> {
 	// `None` otherwise.
 	last_failure: Option<Instant>,
 
+	// Validators can restart during a session, and if they change their
+	// PeerId we would in the best case reconnect to them only after a
+	// session, so we need to try more often to re-resolve peers and
+	// reconnect to them. The authority_discovery queries run every ten
+	// minutes, so we can't detect changes in the address more often
+	// than that.
+	last_connection_request: Option<Instant>,
+
 	/// First time we did not reach our connectivity threshold.
 	///
 	/// This is the time of the first failed attempt to connect to >2/3 of all validators in a
@@ -131,6 +149,7 @@ where
 			keystore,
 			last_session_index: None,
 			last_failure: None,
+			last_connection_request: None,
 			failure_start: None,
 			resolved_authorities: HashMap::new(),
 			connected_authorities: HashMap::new(),
@@ -196,15 +215,22 @@ where
 		for leaf in leaves {
 			let current_index = util::request_session_index_for_child(leaf, sender).await.await??;
 			let since_failure = self.last_failure.map(|i| i.elapsed()).unwrap_or_default();
+			let since_last_reconnect =
+				self.last_connection_request.map(|i| i.elapsed()).unwrap_or_default();
+
 			let force_request = since_failure >= BACKOFF_DURATION;
+			let re_resolve_authorities = since_last_reconnect >= TRY_RERESOLVE_AUTHORITIES;
 			let leaf_session = Some((current_index, leaf));
 			let maybe_new_session = match self.last_session_index {
 				Some(i) if current_index <= i => None,
 				_ => leaf_session,
 			};
 
-			let maybe_issue_connection =
-				if force_request { leaf_session } else { maybe_new_session };
+			let maybe_issue_connection = if force_request || re_resolve_authorities {
+				leaf_session
+			} else {
+				maybe_new_session
+			};
 
 			if let Some((session_index, relay_parent)) = maybe_issue_connection {
 				let session_info =
@@ -248,7 +274,7 @@ where
 				// connections to a much broader set of validators.
 				{
 					let mut connections = authorities_past_present_future(sender, leaf).await?;
-
+					self.last_connection_request = Some(Instant::now());
 					// Remove all of our locally controlled validator indices so we don't connect to
 					// ourself.
 					let connections =
@@ -259,7 +285,12 @@ where
 						// to clean up all connections.
 						Vec::new()
 					};
-					self.issue_connection_request(sender, connections).await;
+
+					if force_request || is_new_session {
+						self.issue_connection_request(sender, connections).await;
+					} else if re_resolve_authorities {
+						self.issue_connection_request_to_changed(sender, connections).await;
+					}
 				}
 
 				if is_new_session {
@@ -324,17 +355,14 @@ where
 		authority_check_result
 	}
 
-	async fn issue_connection_request<Sender>(
+	async fn resolve_authorities(
 		&mut self,
-		sender: &mut Sender,
 		authorities: Vec<AuthorityDiscoveryId>,
-	) where
-		Sender: overseer::GossipSupportSenderTrait,
-	{
-		let num = authorities.len();
+	) -> (Vec<HashSet<Multiaddr>>, HashMap<AuthorityDiscoveryId, HashSet<Multiaddr>>, usize) {
 		let mut validator_addrs = Vec::with_capacity(authorities.len());
-		let mut failures = 0;
 		let mut resolved = HashMap::with_capacity(authorities.len());
+		let mut failures = 0;
+
 		for authority in authorities {
 			if let Some(addrs) =
 				self.authority_discovery.get_addresses_by_authority_id(authority.clone()).await
@@ -350,6 +378,67 @@ where
 			);
 			}
 		}
+		(validator_addrs, resolved, failures)
+	}
+
+	async fn issue_connection_request_to_changed<Sender>(
+		&mut self,
+		sender: &mut Sender,
+		authorities: Vec<AuthorityDiscoveryId>,
+	) where
+		Sender: overseer::GossipSupportSenderTrait,
+	{
+		let (_, resolved, _) = self.resolve_authorities(authorities).await;
+
+		let mut changed = Vec::new();
+
+		for (authority, new_addresses) in &resolved {
+			let new_peer_ids = new_addresses
+				.iter()
+				.flat_map(|addr| parse_addr(addr.clone()).ok().map(|(p, _)| p))
+				.collect::<HashSet<_>>();
+			match self.resolved_authorities.get(authority) {
+				Some(old_addresses) => {
+					let old_peer_ids = old_addresses
+						.iter()
+						.flat_map(|addr| parse_addr(addr.clone()).ok().map(|(p, _)| p))
+						.collect::<HashSet<_>>();
+					if !old_peer_ids.is_superset(&new_peer_ids) {
+						changed.push(new_addresses.clone());
+					}
+				},
+				None => changed.push(new_addresses.clone()),
+			}
+		}
+		gum::debug!(
+			target: LOG_TARGET,
+			num_changed = ?changed.len(),
+			?changed,
+			"Issuing a connection request to changed validators"
+		);
+		if !changed.is_empty() {
+			self.resolved_authorities = resolved;
+
+			sender
+				.send_message(NetworkBridgeTxMessage::AddToResolvedValidators {
+					validator_addrs: changed,
+					peer_set: PeerSet::Validation,
+				})
+				.await;
+		}
+	}
+
+	async fn issue_connection_request<Sender>(
+		&mut self,
+		sender: &mut Sender,
+		authorities: Vec<AuthorityDiscoveryId>,
+	) where
+		Sender: overseer::GossipSupportSenderTrait,
+	{
+		let num = authorities.len();
+
+		let (validator_addrs, resolved, failures) = self.resolve_authorities(authorities).await;
+
 		self.resolved_authorities = resolved;
 		gum::debug!(target: LOG_TARGET, %num, "Issuing a connection request");
 
@@ -399,16 +488,24 @@ where
 	{
 		let mut authority_ids: HashMap<PeerId, HashSet<AuthorityDiscoveryId>> = HashMap::new();
 		for authority in authorities {
-			let peer_id = self
+			let peer_ids = self
 				.authority_discovery
 				.get_addresses_by_authority_id(authority.clone())
 				.await
 				.into_iter()
 				.flat_map(|list| list.into_iter())
-				.find_map(|addr| parse_addr(addr).ok().map(|(p, _)| p));
+				.flat_map(|addr| parse_addr(addr).ok().map(|(p, _)| p))
+				.collect::<HashSet<_>>();
+
+			gum::trace!(
+				target: LOG_TARGET,
+				?peer_ids,
+				?authority,
+				"Resolved to peer ids"
+			);
 
-			if let Some(p) = peer_id {
-				authority_ids.entry(p).or_default().insert(authority);
+			for p in peer_ids {
+				authority_ids.entry(p).or_default().insert(authority.clone());
 			}
 		}
 
```

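The change-detection rule in `issue_connection_request_to_changed` treats an authority as changed when its previously resolved PeerId set is not a superset of the freshly resolved one. A minimal sketch of just that predicate, with `&str` PeerIds standing in for real ones:

```rust
use std::collections::HashSet;

// An authority counts as "changed" when the freshly resolved PeerIds are not
// all covered by the previously resolved set, or it was never resolved before.
fn authority_changed(old: Option<&HashSet<&str>>, new: &HashSet<&str>) -> bool {
    match old {
        Some(old_ids) => !old_ids.is_superset(new),
        None => true,
    }
}
```

Using a superset check rather than strict equality means an authority that merely drops one of several known PeerIds does not trigger a new connection request; only a genuinely new PeerId does.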