Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
59617de
transports/webrtc/: Test message framing sizes
mxinden Aug 19, 2022
7dc3ce1
transports/webrtc/: Implement message framing
mxinden Aug 22, 2022
503e32f
transports/webrtc: Update protobuf
mxinden Aug 22, 2022
22e97a0
Merge remote-tracking branch 'melekes/anton/webrtc-transport' into we…
mxinden Sep 5, 2022
55da918
transports/webrtc/: Change semantic of RESET
mxinden Sep 5, 2022
11c016f
transports/webrtc/: Import message_proto types
mxinden Sep 10, 2022
1a6e4bd
transports/webrtc/: Refactor AsyncRead match arm
mxinden Sep 10, 2022
d46a171
transports/webrtc/: Handle flags when read side closed
mxinden Sep 10, 2022
9cd4ef7
transports/webrtc: Enforce maximum message length
mxinden Sep 11, 2022
31e019a
minor refactoring
melekes Sep 19, 2022
6f57ed6
rename PollDataChannel to Substream
melekes Sep 19, 2022
421a8ec
Merge branch 'master' into webrtc-message-framing
melekes Sep 19, 2022
4a1d4d6
add comments
melekes Sep 19, 2022
c6c5a96
add debug to handle_flag
melekes Sep 20, 2022
1b0b671
Create noise-prologue from server + client FP in fixed order
thomaseizinger Sep 26, 2022
aa38c81
Make `substream` a top-level module
thomaseizinger Sep 27, 2022
d0e918b
Replace nightly feature with refactoring
thomaseizinger Sep 27, 2022
171c613
Remove use of import rename
thomaseizinger Sep 27, 2022
cb481a6
Merge branch 'anton/webrtc-transport' into webrtc-message-framing
thomaseizinger Sep 27, 2022
6402665
Remove unused public API
thomaseizinger Sep 27, 2022
d537696
Make sure we don't construct substreams outside of this crate
thomaseizinger Sep 27, 2022
c8c2446
Don't expose public APIs for temporary workarounds
thomaseizinger Sep 27, 2022
d58f219
Remove pub where not necessary
thomaseizinger Sep 27, 2022
eb09d36
Remove utilities below usage
thomaseizinger Sep 27, 2022
99af2a1
Remove stale derive
thomaseizinger Sep 27, 2022
2d85ab4
Update docs
thomaseizinger Sep 27, 2022
6e2aeb1
Fix clippy warnings
thomaseizinger Sep 27, 2022
a6b2aac
Revert "Create noise-prologue from server + client FP in fixed order"
thomaseizinger Oct 6, 2022
afac31c
Merge branch 'master' into webrtc-message-framing
thomaseizinger Oct 7, 2022
8266fc4
Merge remote-tracking branch 'melekes/anton/webrtc-transport' into we…
thomaseizinger Oct 11, 2022
d2da793
Add initial test suite for substream state machine
thomaseizinger Oct 11, 2022
1b520a9
Precompute substream ID
thomaseizinger Oct 11, 2022
e483974
Implement new state machine
thomaseizinger Oct 11, 2022
29d6f74
Reset flag clears buffer
thomaseizinger Oct 11, 2022
d829fda
Remove substream ID
thomaseizinger Oct 11, 2022
058a153
Remove use of `map_err`
thomaseizinger Oct 11, 2022
e1df3c4
Replace error with `Poll::Pending`
thomaseizinger Oct 11, 2022
8c8feaa
Remove unnecessary dependency
thomaseizinger Oct 11, 2022
e6c177c
Group imports
thomaseizinger Oct 11, 2022
b2961a0
Add spec wording to constant
thomaseizinger Oct 11, 2022
f827f62
Introduce dedicated `State` submodule
thomaseizinger Oct 11, 2022
16433db
Send reset flag for dropped substreams
thomaseizinger Oct 11, 2022
834ec64
Fix clippy warnings
thomaseizinger Oct 11, 2022
7e0e46d
Fix docs
thomaseizinger Oct 11, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions misc/prost-codec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,12 @@ pub enum Error {
std::io::Error,
),
}

impl From<Error> for std::io::Error {
fn from(e: Error) -> Self {
match e {
Error::Decode(e) => e.into(),
Error::Io(e) => e,
}
}
}
14 changes: 11 additions & 3 deletions transports/webrtc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
asynchronous-codec = "0.6"
async-trait = "0.1"
bytes = "1"
fnv = "1.0"
Expand All @@ -23,20 +24,27 @@ libp2p-noise = { version = "0.40.0", path = "../../transports/noise" }
log = "0.4"
multihash = { version = "0.16", default-features = false, features = ["sha2"] }
multibase = "0.9"
prost = "0.11"
prost-codec = { version = "0.2", path = "../../misc/prost-codec" }
rand = "0.8"
rcgen = "0.9.3"
serde = { version = "1.0", features = ["derive"] }
stun = "0.4"
thiserror = "1"
tinytemplate = "1.2"
tokio-crate = { package = "tokio", version = "1.18", features = ["net"]}
tokio-util = { version = "0.7", features = ["compat"] }
webrtc = "0.5.0"

[build-dependencies]
prost-build = "0.11"

[dev-dependencies]
anyhow = "1.0"
env_logger = "0.9"
libp2p = { path = "../..", features = ["request-response", "webrtc"], default-features = false }
rand_core = "0.5"
quickcheck = "1"
hex-literal = "0.3"
libp2p = { path = "../..", features = ["request-response", "webrtc"], default-features = false }
multihash = { version = "0.16", default-features = false, features = ["sha3"] }
quickcheck = "1"
rand_core = "0.5"
unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] }
23 changes: 23 additions & 0 deletions transports/webrtc/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2022 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

fn main() {
prost_build::compile_protos(&["src/message.proto"], &["src"]).unwrap();
}
73 changes: 52 additions & 21 deletions transports/webrtc/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,30 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

mod poll_data_channel;

use futures::stream::FuturesUnordered;
use futures::{
channel::{
mpsc,
oneshot::{self, Sender},
},
lock::Mutex as FutMutex,
{future::BoxFuture, prelude::*, ready},
StreamExt,
{future::BoxFuture, ready},
};
use futures_lite::StreamExt;
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use log::{debug, error, trace};
use webrtc::data::data_channel::DataChannel as DetachedDataChannel;
use webrtc::data_channel::RTCDataChannel;
use webrtc::peer_connection::RTCPeerConnection;

use std::task::Waker;
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};

use crate::error::Error;
pub(crate) use poll_data_channel::PollDataChannel;
use crate::{error::Error, substream, substream::Substream};

const MAX_DATA_CHANNELS_IN_FLIGHT: usize = 10;

Expand All @@ -61,6 +60,9 @@ pub struct Connection {

/// Future, which, once polled, will result in closing the entire connection.
close_fut: Option<BoxFuture<'static, Result<(), Error>>>,

drop_listeners: FuturesUnordered<substream::DropListener>,
no_drop_listeners_waker: Option<Waker>,
}

impl Unpin for Connection {}
Expand All @@ -77,6 +79,8 @@ impl Connection {
incoming_data_channels_rx: data_channel_rx,
outbound_fut: None,
close_fut: None,
drop_listeners: FuturesUnordered::default(),
no_drop_listeners_waker: None,
}
}

Expand Down Expand Up @@ -138,30 +142,52 @@ impl Connection {
}

impl StreamMuxer for Connection {
type Substream = PollDataChannel;
type Substream = Substream;
type Error = Error;

fn poll_inbound(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
match ready!(self.incoming_data_channels_rx.poll_next(cx)) {
match ready!(self.incoming_data_channels_rx.poll_next_unpin(cx)) {
Some(detached) => {
trace!("Incoming substream {}", detached.stream_identifier());

Poll::Ready(Ok(PollDataChannel::new(detached)))
let (substream, drop_listener) = Substream::new(detached);
self.drop_listeners.push(drop_listener);
if let Some(waker) = self.no_drop_listeners_waker.take() {
waker.wake()
}

Poll::Ready(Ok(substream))
}
None => {
debug_assert!(
false,
"Sender-end of channel should be owned by `RTCPeerConnection`"
);

Poll::Pending // Return `Pending` without registering a waker: If the channel is closed, we don't need to be called anymore.
}
None => Poll::Ready(Err(Error::Internal(
"incoming_data_channels_rx is closed (no messages left)".to_string(),
))),
}
}

fn poll(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
Poll::Pending
loop {
match ready!(self.drop_listeners.poll_next_unpin(cx)) {
Some(Ok(())) => {}
Some(Err(e)) => {
log::debug!("a DropListener failed: {e}")
}
None => {
self.no_drop_listeners_waker = Some(cx.waker().clone());
return Poll::Pending;
}
}
}
}

fn poll_outbound(
Expand All @@ -173,10 +199,7 @@ impl StreamMuxer for Connection {
let peer_conn = peer_conn.lock().await;

// Create a datachannel with label 'data'
let data_channel = peer_conn
.create_data_channel("data", None)
.map_err(Error::WebRTC)
.await?;
let data_channel = peer_conn.create_data_channel("data", None).await?;

trace!("Opening outbound substream {}", data_channel.id());

Expand All @@ -198,7 +221,13 @@ impl StreamMuxer for Connection {
match ready!(fut.as_mut().poll(cx)) {
Ok(detached) => {
self.outbound_fut = None;
Poll::Ready(Ok(PollDataChannel::new(detached)))
let (substream, drop_listener) = Substream::new(detached);
self.drop_listeners.push(drop_listener);
if let Some(waker) = self.no_drop_listeners_waker.take() {
waker.wake()
}

Poll::Ready(Ok(substream))
}
Err(e) => {
self.outbound_fut = None;
Expand All @@ -213,7 +242,9 @@ impl StreamMuxer for Connection {
let peer_conn = self.peer_conn.clone();
let fut = self.close_fut.get_or_insert(Box::pin(async move {
let peer_conn = peer_conn.lock().await;
peer_conn.close().await.map_err(Error::WebRTC)
peer_conn.close().await?;

Ok(())
}));

match ready!(fut.as_mut().poll(cx)) {
Expand Down
140 changes: 0 additions & 140 deletions transports/webrtc/src/connection/poll_data_channel.rs

This file was deleted.

2 changes: 1 addition & 1 deletion transports/webrtc/src/fingerprint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl Fingerprint {
}

/// Returns the algorithm used (e.g. "sha-256").
/// See https://datatracker.ietf.org/doc/html/rfc8122#section-5
/// See <https://datatracker.ietf.org/doc/html/rfc8122#section-5>
pub fn algorithm(&self) -> String {
SHA256.to_owned()
}
Expand Down
6 changes: 6 additions & 0 deletions transports/webrtc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,15 @@ mod error;
mod fingerprint;
mod req_res_chan;
mod sdp;
mod substream;
mod transport;
mod udp_mux;
mod upgrade;
mod message_proto {
#![allow(clippy::derive_partial_eq_without_eq)]

include!(concat!(env!("OUT_DIR"), "/webrtc.pb.rs"));
}

pub use connection::Connection;
pub use error::Error;
Expand Down
Loading