Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 20 additions & 16 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ use libp2p::{
Swarm,
NetworkBehaviour,
identity,
tokio_codec::{FramedRead, LinesCodec}
tokio_codec::{FramedRead, LinesCodec},
tokio_io::{AsyncRead, AsyncWrite},
floodsub::{self, Floodsub, FloodsubEvent},
mdns::{Mdns, MdnsEvent},
swarm::NetworkBehaviourEventProcess
};

fn main() {
Expand All @@ -70,25 +74,25 @@ fn main() {
let transport = libp2p::build_development_transport(local_key);

// Create a Floodsub topic
let floodsub_topic = libp2p::floodsub::TopicBuilder::new("chat").build();
let floodsub_topic = floodsub::TopicBuilder::new("chat").build();

// We create a custom network behaviour that combines floodsub and mDNS.
// In the future, we want to improve libp2p to make this easier to do.
#[derive(NetworkBehaviour)]
struct MyBehaviour<TSubstream: libp2p::tokio_io::AsyncRead + libp2p::tokio_io::AsyncWrite> {
floodsub: libp2p::floodsub::Floodsub<TSubstream>,
mdns: libp2p::mdns::Mdns<TSubstream>,
struct MyBehaviour<TSubstream: AsyncRead + AsyncWrite> {
floodsub: Floodsub<TSubstream>,
mdns: Mdns<TSubstream>,
}

impl<TSubstream: libp2p::tokio_io::AsyncRead + libp2p::tokio_io::AsyncWrite> libp2p::swarm::NetworkBehaviourEventProcess<libp2p::mdns::MdnsEvent> for MyBehaviour<TSubstream> {
fn inject_event(&mut self, event: libp2p::mdns::MdnsEvent) {
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<MdnsEvent> for MyBehaviour<TSubstream> {
fn inject_event(&mut self, event: MdnsEvent) {
match event {
libp2p::mdns::MdnsEvent::Discovered(list) => {
MdnsEvent::Discovered(list) => {
for (peer, _) in list {
self.floodsub.add_node_to_partial_view(peer);
}
},
libp2p::mdns::MdnsEvent::Expired(list) => {
MdnsEvent::Expired(list) => {
for (peer, _) in list {
if !self.mdns.has_node(&peer) {
self.floodsub.remove_node_from_partial_view(&peer);
Expand All @@ -99,10 +103,10 @@ fn main() {
}
}

impl<TSubstream: libp2p::tokio_io::AsyncRead + libp2p::tokio_io::AsyncWrite> libp2p::swarm::NetworkBehaviourEventProcess<libp2p::floodsub::FloodsubEvent> for MyBehaviour<TSubstream> {
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<FloodsubEvent> for MyBehaviour<TSubstream> {
// Called when `floodsub` produces an event.
fn inject_event(&mut self, message: libp2p::floodsub::FloodsubEvent) {
if let libp2p::floodsub::FloodsubEvent::Message(message) = message {
fn inject_event(&mut self, message: FloodsubEvent) {
if let FloodsubEvent::Message(message) = message {
println!("Received: '{:?}' from {:?}", String::from_utf8_lossy(&message.data), message.source);
}
}
Expand All @@ -111,12 +115,12 @@ fn main() {
// Create a Swarm to manage peers and events
let mut swarm = {
let mut behaviour = MyBehaviour {
floodsub: libp2p::floodsub::Floodsub::new(local_peer_id.clone()),
mdns: libp2p::mdns::Mdns::new().expect("Failed to create mDNS service"),
floodsub: Floodsub::new(local_peer_id.clone()),
mdns: Mdns::new().expect("Failed to create mDNS service"),
};

behaviour.floodsub.subscribe(floodsub_topic.clone());
libp2p::Swarm::new(transport, behaviour, local_peer_id)
Swarm::new(transport, behaviour, local_peer_id)
};

// Reach out to another node if specified
Expand All @@ -138,7 +142,7 @@ fn main() {
let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new());

// Listen on all interfaces and whatever port the OS assigns
libp2p::Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap();
Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap();

// Kick it off
let mut listening = false;
Expand Down