Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions build.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
fn main() {
prost_build::compile_protos(
&[
"src/schema/keys.proto",
"src/schema/noise.proto",
"src/schema/webrtc.proto",
"src/protocol/libp2p/schema/identify.proto",
"src/protocol/libp2p/schema/kademlia.proto",
"src/protocol/libp2p/schema/bitswap.proto",
],
&["src"],
)
.unwrap();
prost_build::compile_protos(
&[
"src/schema/keys.proto",
"src/schema/noise.proto",
"src/schema/webrtc.proto",
"src/protocol/libp2p/schema/identify.proto",
"src/protocol/libp2p/schema/kademlia.proto",
"src/protocol/libp2p/schema/bitswap.proto",
],
&["src"],
)
.unwrap();
}
184 changes: 97 additions & 87 deletions examples/custom_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
//! Run: `RUST_LOG=info cargo run --example custom_executor`

use litep2p::{
config::ConfigBuilder,
executor::Executor,
protocol::libp2p::ping::{Config as PingConfig, PingEvent},
transport::tcp::config::Config as TcpConfig,
Litep2p,
config::ConfigBuilder,
executor::Executor,
protocol::libp2p::ping::{Config as PingConfig, PingEvent},
transport::tcp::config::Config as TcpConfig,
Litep2p,
};

use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt};
Expand All @@ -44,102 +44,112 @@ use std::{future::Future, pin::Pin, sync::Arc};
///
/// Just a wrapper around `FuturesUnordered` which receives the futures over `mpsc::Receiver`.
struct TaskExecutor {
rx: Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>,
futures: FuturesUnordered<BoxFuture<'static, ()>>,
rx: Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>,
futures: FuturesUnordered<BoxFuture<'static, ()>>,
}

impl TaskExecutor {
/// Create new [`TaskExecutor`].
fn new() -> (Self, Sender<Pin<Box<dyn Future<Output = ()> + Send>>>) {
let (tx, rx) = channel(64);

(Self { rx, futures: FuturesUnordered::new() }, tx)
}

/// Drive the futures forward and poll the receiver for any new futures.
async fn next(&mut self) {
loop {
tokio::select! {
future = self.rx.recv() => self.futures.push(future.unwrap()),
_ = self.futures.next(), if !self.futures.is_empty() => {}
}
}
}
/// Create new [`TaskExecutor`].
fn new() -> (Self, Sender<Pin<Box<dyn Future<Output = ()> + Send>>>) {
let (tx, rx) = channel(64);

(
Self {
rx,
futures: FuturesUnordered::new(),
},
tx,
)
}

/// Drive the futures forward and poll the receiver for any new futures.
async fn next(&mut self) {
loop {
tokio::select! {
future = self.rx.recv() => self.futures.push(future.unwrap()),
_ = self.futures.next(), if !self.futures.is_empty() => {}
}
}
}
}

struct TaskExecutorHandle {
tx: Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
tx: Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
}

impl Executor for TaskExecutorHandle {
fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
let _ = self.tx.try_send(future);
}
fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
let _ = self.tx.try_send(future);
}

fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
let _ = self.tx.try_send(future);
}
fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
let _ = self.tx.try_send(future);
}
}

fn make_litep2p() -> (Litep2p, TaskExecutor, Box<dyn Stream<Item = PingEvent> + Send + Unpin>) {
let (executor, sender) = TaskExecutor::new();
let (ping_config, ping_event_stream) = PingConfig::default();

let litep2p = Litep2p::new(
ConfigBuilder::new()
.with_executor(Arc::new(TaskExecutorHandle { tx: sender.clone() }))
.with_tcp(TcpConfig {
listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
..Default::default()
})
.with_libp2p_ping(ping_config)
.build(),
)
.unwrap();

(litep2p, executor, ping_event_stream)
fn make_litep2p() -> (
Litep2p,
TaskExecutor,
Box<dyn Stream<Item = PingEvent> + Send + Unpin>,
) {
let (executor, sender) = TaskExecutor::new();
let (ping_config, ping_event_stream) = PingConfig::default();

let litep2p = Litep2p::new(
ConfigBuilder::new()
.with_executor(Arc::new(TaskExecutorHandle { tx: sender.clone() }))
.with_tcp(TcpConfig {
listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
..Default::default()
})
.with_libp2p_ping(ping_config)
.build(),
)
.unwrap();

(litep2p, executor, ping_event_stream)
}

#[tokio::main]
async fn main() {
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();

// create two identical litep2ps
let (mut litep2p1, mut executor1, mut ping_event_stream1) = make_litep2p();
let (mut litep2p2, mut executor2, mut ping_event_stream2) = make_litep2p();

// dial `litep2p1`
litep2p2
.dial_address(litep2p1.listen_addresses().next().unwrap().clone())
.await
.unwrap();

tokio::spawn(async move {
loop {
tokio::select! {
_ = executor1.next() => {}
_ = litep2p1.next_event() => {},
_ = ping_event_stream1.next() => {},
}
}
});

// poll litep2p, task executor and ping event stream all together
//
// since a custom task executor was provided, it's now the user's responsibility
// to actually make sure to poll those futures so that litep2p can make progress
loop {
tokio::select! {
_ = executor2.next() => {}
_ = litep2p2.next_event() => {},
event = ping_event_stream2.next() => match event {
Some(PingEvent::Ping { peer, ping }) => tracing::info!(
"ping time with {peer:?}: {ping:?}"
),
_ => {}
}
}
}
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();

// create two identical litep2ps
let (mut litep2p1, mut executor1, mut ping_event_stream1) = make_litep2p();
let (mut litep2p2, mut executor2, mut ping_event_stream2) = make_litep2p();

// dial `litep2p1`
litep2p2
.dial_address(litep2p1.listen_addresses().next().unwrap().clone())
.await
.unwrap();

tokio::spawn(async move {
loop {
tokio::select! {
_ = executor1.next() => {}
_ = litep2p1.next_event() => {},
_ = ping_event_stream1.next() => {},
}
}
});

// poll litep2p, task executor and ping event stream all together
//
// since a custom task executor was provided, it's now the user's responsibility
// to actually make sure to poll those futures so that litep2p can make progress
loop {
tokio::select! {
_ = executor2.next() => {}
_ = litep2p2.next_event() => {},
event = ping_event_stream2.next() => match event {
Some(PingEvent::Ping { peer, ping }) => tracing::info!(
"ping time with {peer:?}: {ping:?}"
),
_ => {}
}
}
}
}
Loading