Skip to content

Commit 7779b8e

Browse files
dvc94chmxinden
andauthored
swarm: Extend NetworkBehaviour callbacks. (#2011)
Co-authored-by: Max Inden <[email protected]>
1 parent be2fb4e commit 7779b8e

File tree

9 files changed

+155
-49
lines changed

9 files changed

+155
-49
lines changed

core/src/network.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ where
187187
///
188188
/// The translation is transport-specific. See [`Transport::address_translation`].
189189
pub fn address_translation<'a>(&'a self, observed_addr: &'a Multiaddr)
190-
-> impl Iterator<Item = Multiaddr> + 'a
190+
-> Vec<Multiaddr>
191191
where
192192
TMuxer: 'a,
193193
THandler: 'a,
@@ -201,7 +201,7 @@ where
201201
addrs.sort_unstable();
202202
addrs.dedup();
203203

204-
addrs.into_iter()
204+
addrs
205205
}
206206

207207
/// Returns the peer id of the local node.

protocols/identify/src/identify.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use libp2p_core::{
2626
Multiaddr,
2727
PeerId,
2828
PublicKey,
29-
connection::ConnectionId,
29+
connection::{ConnectionId, ListenerId},
3030
upgrade::UpgradeError
3131
};
3232
use libp2p_swarm::{
@@ -233,13 +233,13 @@ impl NetworkBehaviour for Identify {
233233
self.pending_push.remove(peer_id);
234234
}
235235

236-
fn inject_new_listen_addr(&mut self, _addr: &Multiaddr) {
236+
fn inject_new_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) {
237237
if self.config.push_listen_addr_updates {
238238
self.pending_push.extend(self.connected.keys());
239239
}
240240
}
241241

242-
fn inject_expired_listen_addr(&mut self, _addr: &Multiaddr) {
242+
fn inject_expired_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) {
243243
if self.config.push_listen_addr_updates {
244244
self.pending_push.extend(self.connected.keys());
245245
}

protocols/kad/src/behaviour.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer};
3737
use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState};
3838
use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord};
3939
use fnv::{FnvHashMap, FnvHashSet};
40-
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId};
40+
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::{ConnectionId, ListenerId}};
4141
use libp2p_swarm::{
4242
DialPeerCondition,
4343
NetworkBehaviour,
@@ -1888,11 +1888,11 @@ where
18881888
};
18891889
}
18901890

1891-
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
1891+
fn inject_new_listen_addr(&mut self, _id: ListenerId, addr: &Multiaddr) {
18921892
self.local_addrs.insert(addr.clone());
18931893
}
18941894

1895-
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
1895+
fn inject_expired_listen_addr(&mut self, _id: ListenerId, addr: &Multiaddr) {
18961896
self.local_addrs.remove(addr);
18971897
}
18981898

swarm-derive/src/lib.rs

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,20 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
244244
})
245245
};
246246

247+
// Build the list of statements to put in the body of `inject_new_listener()`.
248+
let inject_new_listener_stmts = {
249+
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
250+
if is_ignored(&field) {
251+
return None;
252+
}
253+
254+
Some(match field.ident {
255+
Some(ref i) => quote!{ self.#i.inject_new_listener(id); },
256+
None => quote!{ self.#field_n.inject_new_listener(id); },
257+
})
258+
})
259+
};
260+
247261
// Build the list of statements to put in the body of `inject_new_listen_addr()`.
248262
let inject_new_listen_addr_stmts = {
249263
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
@@ -252,8 +266,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
252266
}
253267

254268
Some(match field.ident {
255-
Some(ref i) => quote!{ self.#i.inject_new_listen_addr(addr); },
256-
None => quote!{ self.#field_n.inject_new_listen_addr(addr); },
269+
Some(ref i) => quote!{ self.#i.inject_new_listen_addr(id, addr); },
270+
None => quote!{ self.#field_n.inject_new_listen_addr(id, addr); },
257271
})
258272
})
259273
};
@@ -266,8 +280,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
266280
}
267281

268282
Some(match field.ident {
269-
Some(ref i) => quote!{ self.#i.inject_expired_listen_addr(addr); },
270-
None => quote!{ self.#field_n.inject_expired_listen_addr(addr); },
283+
Some(ref i) => quote!{ self.#i.inject_expired_listen_addr(id, addr); },
284+
None => quote!{ self.#field_n.inject_expired_listen_addr(id, addr); },
271285
})
272286
})
273287
};
@@ -286,6 +300,20 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
286300
})
287301
};
288302

303+
// Build the list of statements to put in the body of `inject_expired_external_addr()`.
304+
let inject_expired_external_addr_stmts = {
305+
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
306+
if is_ignored(&field) {
307+
return None;
308+
}
309+
310+
Some(match field.ident {
311+
Some(ref i) => quote!{ self.#i.inject_expired_external_addr(addr); },
312+
None => quote!{ self.#field_n.inject_expired_external_addr(addr); },
313+
})
314+
})
315+
};
316+
289317
// Build the list of statements to put in the body of `inject_listener_error()`.
290318
let inject_listener_error_stmts = {
291319
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
@@ -504,18 +532,26 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
504532
#(#inject_dial_failure_stmts);*
505533
}
506534

507-
fn inject_new_listen_addr(&mut self, addr: &#multiaddr) {
535+
fn inject_new_listener(&mut self, id: #listener_id) {
536+
#(#inject_new_listener_stmts);*
537+
}
538+
539+
fn inject_new_listen_addr(&mut self, id: #listener_id, addr: &#multiaddr) {
508540
#(#inject_new_listen_addr_stmts);*
509541
}
510542

511-
fn inject_expired_listen_addr(&mut self, addr: &#multiaddr) {
543+
fn inject_expired_listen_addr(&mut self, id: #listener_id, addr: &#multiaddr) {
512544
#(#inject_expired_listen_addr_stmts);*
513545
}
514546

515547
fn inject_new_external_addr(&mut self, addr: &#multiaddr) {
516548
#(#inject_new_external_addr_stmts);*
517549
}
518550

551+
fn inject_expired_external_addr(&mut self, addr: &#multiaddr) {
552+
#(#inject_expired_external_addr_stmts);*
553+
}
554+
519555
fn inject_listener_error(&mut self, id: #listener_id, err: &(dyn std::error::Error + 'static)) {
520556
#(#inject_listener_error_stmts);*
521557
}

swarm/src/behaviour.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -147,17 +147,17 @@ pub trait NetworkBehaviour: Send + 'static {
147147
fn inject_dial_failure(&mut self, _peer_id: &PeerId) {
148148
}
149149

150-
/// Indicates to the behaviour that we have started listening on a new multiaddr.
151-
fn inject_new_listen_addr(&mut self, _addr: &Multiaddr) {
150+
/// Indicates to the behaviour that a new listener was created.
151+
fn inject_new_listener(&mut self, _id: ListenerId) {
152152
}
153153

154-
/// Indicates to the behaviour that a new multiaddr we were listening on has expired,
155-
/// which means that we are no longer listening in it.
156-
fn inject_expired_listen_addr(&mut self, _addr: &Multiaddr) {
154+
/// Indicates to the behaviour that we have started listening on a new multiaddr.
155+
fn inject_new_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) {
157156
}
158157

159-
/// Indicates to the behaviour that we have discovered a new external address for us.
160-
fn inject_new_external_addr(&mut self, _addr: &Multiaddr) {
158+
/// Indicates to the behaviour that a multiaddr we were listening on has expired,
159+
/// which means that we are no longer listening in it.
160+
fn inject_expired_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) {
161161
}
162162

163163
/// A listener experienced an error.
@@ -168,6 +168,14 @@ pub trait NetworkBehaviour: Send + 'static {
168168
fn inject_listener_closed(&mut self, _id: ListenerId, _reason: Result<(), &std::io::Error>) {
169169
}
170170

171+
/// Indicates to the behaviour that we have discovered a new external address for us.
172+
fn inject_new_external_addr(&mut self, _addr: &Multiaddr) {
173+
}
174+
175+
/// Indicates to the behaviour that an external address was removed.
176+
fn inject_expired_external_addr(&mut self, _addr: &Multiaddr) {
177+
}
178+
171179
/// Polls for things that swarm should do.
172180
///
173181
/// This API mimics the API of the `Stream` trait. The method may register the current task in

swarm/src/lib.rs

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,9 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
327327
///
328328
/// Returns an error if the address is not supported.
329329
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
330-
self.network.listen_on(addr)
330+
let id = self.network.listen_on(addr)?;
331+
self.behaviour.inject_new_listener(id);
332+
Ok(id)
331333
}
332334

333335
/// Remove some listener.
@@ -412,7 +414,18 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
412414
/// [`NetworkBehaviourAction::ReportObservedAddr`] or explicitly
413415
/// through this method.
414416
pub fn add_external_address(&mut self, a: Multiaddr, s: AddressScore) -> AddAddressResult {
415-
self.external_addrs.add(a, s)
417+
let result = self.external_addrs.add(a.clone(), s);
418+
let expired = match &result {
419+
AddAddressResult::Inserted { expired } => {
420+
self.behaviour.inject_new_external_addr(&a);
421+
expired
422+
}
423+
AddAddressResult::Updated { expired } => expired,
424+
};
425+
for a in expired {
426+
self.behaviour.inject_expired_external_addr(&a.addr);
427+
}
428+
result
416429
}
417430

418431
/// Removes an external address of the local node, regardless of
@@ -422,7 +435,12 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
422435
/// Returns `true` if the address existed and was removed, `false`
423436
/// otherwise.
424437
pub fn remove_external_address(&mut self, addr: &Multiaddr) -> bool {
425-
self.external_addrs.remove(addr)
438+
if self.external_addrs.remove(addr) {
439+
self.behaviour.inject_expired_external_addr(addr);
440+
true
441+
} else {
442+
false
443+
}
426444
}
427445

428446
/// Bans a peer by its peer ID.
@@ -565,19 +583,19 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
565583
if !this.listened_addrs.contains(&listen_addr) {
566584
this.listened_addrs.push(listen_addr.clone())
567585
}
568-
this.behaviour.inject_new_listen_addr(&listen_addr);
586+
this.behaviour.inject_new_listen_addr(listener_id, &listen_addr);
569587
return Poll::Ready(SwarmEvent::NewListenAddr(listen_addr));
570588
}
571589
Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr }) => {
572590
log::debug!("Listener {:?}; Expired address {:?}.", listener_id, listen_addr);
573591
this.listened_addrs.retain(|a| a != &listen_addr);
574-
this.behaviour.inject_expired_listen_addr(&listen_addr);
592+
this.behaviour.inject_expired_listen_addr(listener_id, &listen_addr);
575593
return Poll::Ready(SwarmEvent::ExpiredListenAddr(listen_addr));
576594
}
577595
Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason }) => {
578596
log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
579597
for addr in addresses.iter() {
580-
this.behaviour.inject_expired_listen_addr(addr);
598+
this.behaviour.inject_expired_listen_addr(listener_id, addr);
581599
}
582600
this.behaviour.inject_listener_closed(listener_id, match &reason {
583601
Ok(()) => Ok(()),
@@ -732,10 +750,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
732750
},
733751
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) => {
734752
for addr in this.network.address_translation(&address) {
735-
if this.external_addrs.iter().all(|a| a.addr != addr) {
736-
this.behaviour.inject_new_external_addr(&addr);
737-
}
738-
this.external_addrs.add(addr, score);
753+
this.add_external_address(addr, score);
739754
}
740755
},
741756
}

swarm/src/registry.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,8 @@ impl Default for Addresses {
168168
/// The result of adding an address to an ordered list of
169169
/// addresses with associated scores.
170170
pub enum AddAddressResult {
171-
Inserted,
172-
Updated,
171+
Inserted { expired: SmallVec<[AddressRecord; 8]> },
172+
Updated { expired: SmallVec<[AddressRecord; 8]> },
173173
}
174174

175175
impl Addresses {
@@ -206,8 +206,11 @@ impl Addresses {
206206
}
207207

208208
// Remove addresses that have a score of 0.
209+
let mut expired = SmallVec::new();
209210
while self.registry.last().map(|e| e.score.is_zero()).unwrap_or(false) {
210-
self.registry.pop();
211+
if let Some(addr) = self.registry.pop() {
212+
expired.push(addr);
213+
}
211214
}
212215

213216
// If the address score is finite, remember this report.
@@ -220,13 +223,13 @@ impl Addresses {
220223
if r.addr == addr {
221224
r.score = r.score + score;
222225
isort(&mut self.registry);
223-
return AddAddressResult::Updated
226+
return AddAddressResult::Updated { expired }
224227
}
225228
}
226229

227230
// It is a new record.
228231
self.registry.push(AddressRecord::new(addr, score));
229-
AddAddressResult::Inserted
232+
AddAddressResult::Inserted { expired }
230233
}
231234

232235
/// Explicitly remove an address from the collection.

swarm/src/test.rs

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,11 @@ where
115115
pub inject_event: Vec<(PeerId, ConnectionId, <<TInner::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent)>,
116116
pub inject_addr_reach_failure: Vec<(Option<PeerId>, Multiaddr)>,
117117
pub inject_dial_failure: Vec<PeerId>,
118-
pub inject_new_listen_addr: Vec<Multiaddr>,
118+
pub inject_new_listener: Vec<ListenerId>,
119+
pub inject_new_listen_addr: Vec<(ListenerId, Multiaddr)>,
119120
pub inject_new_external_addr: Vec<Multiaddr>,
120-
pub inject_expired_listen_addr: Vec<Multiaddr>,
121+
pub inject_expired_listen_addr: Vec<(ListenerId, Multiaddr)>,
122+
pub inject_expired_external_addr: Vec<Multiaddr>,
121123
pub inject_listener_error: Vec<ListenerId>,
122124
pub inject_listener_closed: Vec<(ListenerId, bool)>,
123125
pub poll: usize,
@@ -138,9 +140,11 @@ where
138140
inject_event: Vec::new(),
139141
inject_addr_reach_failure: Vec::new(),
140142
inject_dial_failure: Vec::new(),
143+
inject_new_listener: Vec::new(),
141144
inject_new_listen_addr: Vec::new(),
142145
inject_new_external_addr: Vec::new(),
143146
inject_expired_listen_addr: Vec::new(),
147+
inject_expired_external_addr: Vec::new(),
144148
inject_listener_error: Vec::new(),
145149
inject_listener_closed: Vec::new(),
146150
poll: 0,
@@ -217,21 +221,31 @@ where
217221
self.inner.inject_dial_failure(p);
218222
}
219223

220-
fn inject_new_listen_addr(&mut self, a: &Multiaddr) {
221-
self.inject_new_listen_addr.push(a.clone());
222-
self.inner.inject_new_listen_addr(a);
224+
fn inject_new_listener(&mut self, id: ListenerId) {
225+
self.inject_new_listener.push(id);
226+
self.inner.inject_new_listener(id);
223227
}
224228

225-
fn inject_expired_listen_addr(&mut self, a: &Multiaddr) {
226-
self.inject_expired_listen_addr.push(a.clone());
227-
self.inner.inject_expired_listen_addr(a);
229+
fn inject_new_listen_addr(&mut self, id: ListenerId, a: &Multiaddr) {
230+
self.inject_new_listen_addr.push((id, a.clone()));
231+
self.inner.inject_new_listen_addr(id, a);
232+
}
233+
234+
fn inject_expired_listen_addr(&mut self, id: ListenerId, a: &Multiaddr) {
235+
self.inject_expired_listen_addr.push((id, a.clone()));
236+
self.inner.inject_expired_listen_addr(id, a);
228237
}
229238

230239
fn inject_new_external_addr(&mut self, a: &Multiaddr) {
231240
self.inject_new_external_addr.push(a.clone());
232241
self.inner.inject_new_external_addr(a);
233242
}
234243

244+
fn inject_expired_external_addr(&mut self, a: &Multiaddr) {
245+
self.inject_expired_external_addr.push(a.clone());
246+
self.inner.inject_expired_external_addr(a);
247+
}
248+
235249
fn inject_listener_error(&mut self, l: ListenerId, e: &(dyn std::error::Error + 'static)) {
236250
self.inject_listener_error.push(l.clone());
237251
self.inner.inject_listener_error(l, e);

0 commit comments

Comments
 (0)