Skip to content

Commit 814ff4b

Browse files
committed
swarm: Add doc example for carrying state in handler
1 parent ceb77e5 commit 814ff4b

File tree

3 files changed

+201
-23
lines changed

3 files changed

+201
-23
lines changed

swarm/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ wasm-timer = "0.2"
2020
void = "1"
2121

2222
[dev-dependencies]
23-
libp2p-mplex = { path = "../muxers/mplex" }
24-
libp2p-noise = { path = "../transports/noise" }
23+
libp2p = { path = "../", default-features = false, features = ["yamux", "plaintext"] }
2524
quickcheck = "0.9.0"
2625
rand = "0.7.2"

swarm/src/behaviour.rs

Lines changed: 186 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -282,16 +282,17 @@ pub enum NetworkBehaviourAction<
282282
///
283283
/// On success, [`NetworkBehaviour::inject_connection_established`] is invoked.
284284
/// On failure, [`NetworkBehaviour::inject_dial_failure`] is invoked.
285+
///
286+
/// Note that the provided handler is returned to the [`NetworkBehaviour`] on connection failure
287+
/// and connection closing. Thus it can be used to carry state, which otherwise would have to be
288+
/// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected peer
289+
/// can be included in the handler, and thus directly send on connection success or extracted by
290+
/// the [`NetworkBehaviour`] on connection failure. See [`NetworkBehaviourAction::DialPeer`] for
291+
/// example.
285292
DialAddress {
286293
/// The address to dial.
287294
address: Multiaddr,
288295
/// The handler to be used to handle the connection to the peer.
289-
///
290-
/// Note that the handler is returned to the [`NetworkBehaviour`] on connection failure and
291-
/// connection closing. Thus it can be used to carry state, which otherwise would have to be
292-
/// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected
293-
/// peer can be included in the handler, and thus directly send on connection success or
294-
/// extracted by the [`NetworkBehaviour`] on connection failure.
295296
handler: THandler,
296297
},
297298

@@ -305,18 +306,191 @@ pub enum NetworkBehaviourAction<
305306
///
306307
/// On success, [`NetworkBehaviour::inject_connection_established`] is invoked.
307308
/// On failure, [`NetworkBehaviour::inject_dial_failure`] is invoked.
309+
///
310+
/// Note that the provided handler is returned to the [`NetworkBehaviour`] on connection failure
311+
/// and connection closing. Thus it can be used to carry state, which otherwise would have to be
312+
/// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected peer
313+
/// can be included in the handler, and thus directly send on connection success or extracted by
314+
/// the [`NetworkBehaviour`] on connection failure.
315+
///
316+
/// Example showcasing usage of handler to carry state:
317+
///
318+
/// ```rust
319+
/// # use futures::executor::block_on;
320+
/// # use futures::stream::StreamExt;
321+
/// # use libp2p::core::connection::ConnectionId;
322+
/// # use libp2p::core::identity;
323+
/// # use libp2p::core::transport::{MemoryTransport, Transport};
324+
/// # use libp2p::core::upgrade::{self, DeniedUpgrade, InboundUpgrade, OutboundUpgrade};
325+
/// # use libp2p::core::PeerId;
326+
/// # use libp2p::plaintext::PlainText2Config;
327+
/// # use libp2p::swarm::{
328+
/// # DialError, DialPeerCondition, IntoProtocolsHandler, KeepAlive, NegotiatedSubstream,
329+
/// # NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler,
330+
/// # ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, Swarm, SwarmEvent,
331+
/// # };
332+
/// # use libp2p::yamux;
333+
/// # use std::collections::VecDeque;
334+
/// # use std::task::{Context, Poll};
335+
/// # use void::Void;
336+
/// #
337+
/// # let local_key = identity::Keypair::generate_ed25519();
338+
/// # let local_public_key = local_key.public();
339+
/// # let local_peer_id = PeerId::from(local_public_key.clone());
340+
/// #
341+
/// # let transport = MemoryTransport::default()
342+
/// # .upgrade(upgrade::Version::V1)
343+
/// # .authenticate(PlainText2Config { local_public_key })
344+
/// # .multiplex(yamux::YamuxConfig::default())
345+
/// # .boxed();
346+
/// #
347+
/// # let mut swarm = Swarm::new(transport, MyBehaviour::default(), local_peer_id);
348+
/// #
349+
/// // Super precious message that we should better not lose.
350+
/// let message = PreciousMessage("My precious message".to_string());
351+
///
352+
/// // Unfortunately this peer is offline, thus sending our message to it will fail.
353+
/// let offline_peer = PeerId::random();
354+
///
355+
/// // Let's send it anyways. We should get it back in case connecting to the peer fails.
356+
/// swarm.behaviour_mut().send(offline_peer, message);
357+
///
358+
/// block_on(async {
359+
/// // As expected, sending failed. But great news, we got our message back.
360+
/// matches!(
361+
/// swarm.next().await.expect("Infinite stream"),
362+
/// SwarmEvent::Behaviour(PreciousMessage(_))
363+
/// );
364+
/// });
365+
///
366+
/// # #[derive(Default)]
367+
/// # struct MyBehaviour {
368+
/// # outbox_to_swarm: VecDeque<NetworkBehaviourAction<PreciousMessage, MyHandler>>,
369+
/// # }
370+
/// #
371+
/// # impl MyBehaviour {
372+
/// # fn send(&mut self, peer_id: PeerId, msg: PreciousMessage) {
373+
/// # self.outbox_to_swarm
374+
/// # .push_back(NetworkBehaviourAction::DialPeer {
375+
/// # peer_id,
376+
/// # condition: DialPeerCondition::Always,
377+
/// # handler: MyHandler { message: Some(msg) },
378+
/// # });
379+
/// # }
380+
/// # }
381+
/// #
382+
/// impl NetworkBehaviour for MyBehaviour {
383+
/// # type ProtocolsHandler = MyHandler;
384+
/// # type OutEvent = PreciousMessage;
385+
/// #
386+
/// # fn new_handler(&mut self) -> Self::ProtocolsHandler {
387+
/// # MyHandler { message: None }
388+
/// # }
389+
/// #
390+
/// #
391+
/// # fn inject_event(
392+
/// # &mut self,
393+
/// # _: PeerId,
394+
/// # _: ConnectionId,
395+
/// # _: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
396+
/// # ) {
397+
/// # unreachable!();
398+
/// # }
399+
/// #
400+
/// fn inject_dial_failure(
401+
/// &mut self,
402+
/// _: &PeerId,
403+
/// handler: Self::ProtocolsHandler,
404+
/// _: DialError,
405+
/// ) {
406+
/// // As expected, sending the message failed. But lucky us, we got the handler back, thus
407+
/// // the precious message is not lost and we can return it back to the user.
408+
/// let msg = handler.message.unwrap();
409+
/// self.outbox_to_swarm
410+
/// .push_back(NetworkBehaviourAction::GenerateEvent(msg))
411+
/// }
412+
/// #
413+
/// # fn poll(
414+
/// # &mut self,
415+
/// # _: &mut Context<'_>,
416+
/// # _: &mut impl PollParameters,
417+
/// # ) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
418+
/// # if let Some(action) = self.outbox_to_swarm.pop_front() {
419+
/// # return Poll::Ready(action);
420+
/// # }
421+
/// # Poll::Pending
422+
/// # }
423+
/// }
424+
///
425+
/// # struct MyHandler {
426+
/// # message: Option<PreciousMessage>,
427+
/// # }
428+
/// #
429+
/// # impl ProtocolsHandler for MyHandler {
430+
/// # type InEvent = Void;
431+
/// # type OutEvent = Void;
432+
/// # type Error = Void;
433+
/// # type InboundProtocol = DeniedUpgrade;
434+
/// # type OutboundProtocol = DeniedUpgrade;
435+
/// # type InboundOpenInfo = ();
436+
/// # type OutboundOpenInfo = Void;
437+
/// #
438+
/// # fn listen_protocol(
439+
/// # &self,
440+
/// # ) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
441+
/// # SubstreamProtocol::new(DeniedUpgrade, ())
442+
/// # }
443+
/// #
444+
/// # fn inject_fully_negotiated_inbound(
445+
/// # &mut self,
446+
/// # _: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
447+
/// # _: Self::InboundOpenInfo,
448+
/// # ) {
449+
/// # }
450+
/// #
451+
/// # fn inject_fully_negotiated_outbound(
452+
/// # &mut self,
453+
/// # _: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
454+
/// # _: Self::OutboundOpenInfo,
455+
/// # ) {
456+
/// # }
457+
/// #
458+
/// # fn inject_event(&mut self, _event: Self::InEvent) {}
459+
/// #
460+
/// # fn inject_dial_upgrade_error(
461+
/// # &mut self,
462+
/// # _: Self::OutboundOpenInfo,
463+
/// # _: ProtocolsHandlerUpgrErr<Void>,
464+
/// # ) {
465+
/// # }
466+
/// #
467+
/// # fn connection_keep_alive(&self) -> KeepAlive {
468+
/// # KeepAlive::Yes
469+
/// # }
470+
/// #
471+
/// # fn poll(
472+
/// # &mut self,
473+
/// # _: &mut Context<'_>,
474+
/// # ) -> Poll<
475+
/// # ProtocolsHandlerEvent<
476+
/// # Self::OutboundProtocol,
477+
/// # Self::OutboundOpenInfo,
478+
/// # Self::OutEvent,
479+
/// # Self::Error,
480+
/// # >,
481+
/// # > {
482+
/// # todo!("If `Self::message.is_some()` send the message to the remote.")
483+
/// # }
484+
/// # }
485+
/// # #[derive(Debug, PartialEq, Eq)]
486+
/// # struct PreciousMessage(String);
487+
/// ```
308488
DialPeer {
309489
/// The peer to try reach.
310490
peer_id: PeerId,
311491
/// The condition for initiating a new dialing attempt.
312492
condition: DialPeerCondition,
313493
/// The handler to be used to handle the connection to the peer.
314-
///
315-
/// Note that the handler is returned to the [`NetworkBehaviour`] on connection failure and
316-
/// connection closing. Thus it can be used to carry state, which otherwise would have to be
317-
/// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected
318-
/// peer can be included in the handler, and thus directly send on connection success or
319-
/// extracted by the [`NetworkBehaviour`] on connection failure.
320494
handler: THandler,
321495
},
322496

swarm/src/lib.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1326,8 +1326,9 @@ mod tests {
13261326
use crate::protocols_handler::DummyProtocolsHandler;
13271327
use crate::test::{CallTraceBehaviour, MockBehaviour};
13281328
use futures::{executor, future};
1329-
use libp2p_core::{identity, multiaddr, transport, upgrade};
1330-
use libp2p_noise as noise;
1329+
use libp2p::core::{identity, multiaddr, transport, upgrade};
1330+
use libp2p::plaintext;
1331+
use libp2p::yamux;
13311332

13321333
// Test execution state.
13331334
// Connection => Disconnecting => Connecting.
@@ -1343,17 +1344,16 @@ mod tests {
13431344
O: Send + 'static,
13441345
{
13451346
let id_keys = identity::Keypair::generate_ed25519();
1346-
let pubkey = id_keys.public();
1347-
let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
1348-
.into_authentic(&id_keys)
1349-
.unwrap();
1347+
let local_public_key = id_keys.public();
13501348
let transport = transport::MemoryTransport::default()
13511349
.upgrade(upgrade::Version::V1)
1352-
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
1353-
.multiplex(libp2p_mplex::MplexConfig::new())
1350+
.authenticate(plaintext::PlainText2Config {
1351+
local_public_key: local_public_key.clone(),
1352+
})
1353+
.multiplex(yamux::YamuxConfig::default())
13541354
.boxed();
13551355
let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto));
1356-
SwarmBuilder::new(transport, behaviour, pubkey.into()).build()
1356+
SwarmBuilder::new(transport, behaviour, local_public_key.into()).build()
13571357
}
13581358

13591359
fn swarms_connected<TBehaviour>(
@@ -1704,4 +1704,9 @@ mod tests {
17041704
}
17051705
}))
17061706
}
1707+
1708+
/// [`NetworkBehaviourAction::DialAddress`] and [`NetworkBehaviourAction::DialPeer`] require a
1709+
/// handler. This handler can be used to carry state. See corresponding doc comments.
1710+
#[test]
1711+
fn use_handler_to_carry_state() {}
17071712
}

0 commit comments

Comments
 (0)