From 04fb85b4638219101bb4255625fbd9873d8d8513 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 2 Jun 2023 15:31:17 +0900 Subject: [PATCH 1/6] fix(multistream-select): don't wait for negotiation in poll_close With `Version::V1Lazy` and negotiation of a singlel protocol only, a stream initiator optimistically sends application data right after proposing its protocol. More specifically an application can write data via `AsyncWrite::poll_write` even though the remote has not yet confirmed the stream protocol. This saves one round-trip. ``` mermaid sequenceDiagram A->>B: "/multistream/1.0.0" A->>B: "/perf/1.0.0" A->>B: B->>A: "/multistream/1.0.0" B->>A: "/perf/1.0.0" B->>A: ``` When considering stream closing, i.e. `AsyncWrite::poll_close`, and using stream closing as an operation in ones protocol, e.g. using stream closing to signal the end of a request, this becomes tricky. The behavior without this commit was as following: ``` mermaid sequenceDiagram A->>B: "/multistream/1.0.0" A->>B: "/perf/1.0.0" A->>B: Note right of A: Call `AsyncWrite::poll_close` which first waits for the
optimistic multistream-select negotiation to finish, before closing the stream,
i.e. setting the FIN bit. B->>A: "/multistream/1.0.0" B->>A: "/perf/1.0.0" Note right of B: Waiting for A to close the stream (i.e. set the `FIN` bit)
before sending the response. A->>B: FIN B->>A: ``` The above takes 2 round trips: 1. Send the optimistic multistream-select protocol proposals as well as the initiator protocol payload and waits for the confirmation of the protocols. 2. Close the stream, i.e. sends the `FIN` bit and waits for the responder protocol payload. This commit proposes that the stream initiator should not wait for the multistream-select protocol confirmation when closing the stream, but close the stream within the first round-trip. ``` mermaid sequenceDiagram A->>B: "/multistream/1.0.0" A->>B: "/perf/1.0.0" A->>B: A->>B: FIN B->>A: "/multistream/1.0.0" B->>A: "/perf/1.0.0" B->>A: ``` This takes 1 round-trip. The downside of this commit is, that the stream initiator will no longer notice a negotiation error when closing the stream. They will only notice it when reading from the stream. E.g. say that B does not support "/perf/1.0.0", A will only notice on `AsyncRead::poll_read`, not on `AsyncWrite::poll_close`. This is problematic for protocols where A only sends data, but never receives data, i.e. never calls `AsyncRead::poll_read`. Though one can argue that such protocol is flawed in the first place. With a response-less protocol, as even if negotiation succceeds, A doesn't know whether B received the protocol payload. --- misc/multistream-select/src/negotiated.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/misc/multistream-select/src/negotiated.rs b/misc/multistream-select/src/negotiated.rs index dabcec4f605..bebaa78313d 100644 --- a/misc/multistream-select/src/negotiated.rs +++ b/misc/multistream-select/src/negotiated.rs @@ -305,9 +305,7 @@ where } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // Ensure all data has been flushed and expected negotiation messages - // have been received. - ready!(self.as_mut().poll(cx).map_err(Into::::into)?); + // Ensure all data has been flushed. ready!(self .as_mut() .poll_flush(cx) From 98216024fdedcc8d845f34536c1ba0db7c213c27 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 5 Jun 2023 11:28:50 +0900 Subject: [PATCH 2/6] Add changelog entry --- misc/multistream-select/CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/misc/multistream-select/CHANGELOG.md b/misc/multistream-select/CHANGELOG.md index b893c5d19e3..670d36dea47 100644 --- a/misc/multistream-select/CHANGELOG.md +++ b/misc/multistream-select/CHANGELOG.md @@ -1,8 +1,13 @@ ## 0.13.0 - unreleased +- Don't wait for negotiation on `::poll_close`. + This can save one round-trip for protocols that use stream closing as an operation in ones protocol, e.g. using stream closing to signal the end of a request. + See [PR 4019] for details. + - Raise MSRV to 1.65. See [PR 3715]. +[PR 4019]: https://github.com/libp2p/rust-libp2p/pull/4019 [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 ## 0.12.1 From 22668dafde7e7e79416c33f373c10e51fce6683d Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 5 Jun 2023 11:37:24 +0900 Subject: [PATCH 3/6] Add debug log line --- misc/multistream-select/src/negotiated.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/misc/multistream-select/src/negotiated.rs b/misc/multistream-select/src/negotiated.rs index bebaa78313d..941b60765ca 100644 --- a/misc/multistream-select/src/negotiated.rs +++ b/misc/multistream-select/src/negotiated.rs @@ -305,7 +305,7 @@ where } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // Ensure all data has been flushed. + // Ensure all data has been flushed, including optimistic multistream-select messages. ready!(self .as_mut() .poll_flush(cx) @@ -314,7 +314,13 @@ where // Continue with the shutdown of the underlying I/O stream. match self.project().state.project() { StateProj::Completed { io, .. } => io.poll_close(cx), - StateProj::Expecting { io, .. } => io.poll_close(cx), + StateProj::Expecting { io, .. } => { + let close_poll = io.poll_close(cx); + if let Poll::Ready(Ok(())) = close_poll { + log::debug!("Stream closed. Confirmation from remote for optimstic protocol negotiation still pending.") + } + close_poll + } StateProj::Invalid => panic!("Negotiated: Invalid state"), } } From 2e16ec5f355885d58495a66a4bbe6a16e570a7eb Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 5 Jun 2023 17:12:13 +0900 Subject: [PATCH 4/6] Add test --- misc/multistream-select/Cargo.toml | 4 +-- .../multistream-select/tests/dialer_select.rs | 33 +++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/misc/multistream-select/Cargo.toml b/misc/multistream-select/Cargo.toml index 329e6b92ab2..c9c029bac43 100644 --- a/misc/multistream-select/Cargo.toml +++ b/misc/multistream-select/Cargo.toml @@ -19,7 +19,7 @@ smallvec = "1.6.1" unsigned-varint = "0.7" [dev-dependencies] -async-std = "1.6.2" +async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.10" libp2p-core = { workspace = true } libp2p-mplex = { workspace = true } @@ -30,7 +30,7 @@ quickcheck = { workspace = true } rand = "0.8" rw-stream-sink = { workspace = true } -# Passing arguments to the docsrs builder in order to properly document cfg's. +# Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling [package.metadata.docs.rs] all-features = true diff --git a/misc/multistream-select/tests/dialer_select.rs b/misc/multistream-select/tests/dialer_select.rs index f080730b939..0f6df2f17c7 100644 --- a/misc/multistream-select/tests/dialer_select.rs +++ b/misc/multistream-select/tests/dialer_select.rs @@ -20,6 +20,8 @@ //! Integration tests for protocol negotiation. +use std::time::Duration; + use async_std::net::{TcpListener, TcpStream}; use futures::prelude::*; use multistream_select::{dialer_select_proto, listener_select_proto, NegotiationError, Version}; @@ -176,3 +178,34 @@ fn negotiation_failed() { } } } + +#[async_std::test] +async fn v1_lazy_do_not_wait_for_negotiation_on_poll_close() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let listener_addr = listener.local_addr().unwrap(); + + let _server = async_std::task::spawn(async move { + let _connec = listener.accept().await.unwrap().0; + // Blocks forever as only a single connection is dialed by the client. Never interacts with + // `_connec`. + listener.accept().await.unwrap(); + }); + + let client = async_std::task::spawn(async move { + let connec = TcpStream::connect(&listener_addr).await.unwrap(); + // Single protocol to allow for lazy (or optimistic) protocol negotiation. + let protos = vec!["/proto1"]; + let (proto, mut io) = dialer_select_proto(connec, protos.into_iter(), Version::V1Lazy) + .await + .unwrap(); + assert_eq!(proto, "/proto1"); + + // client can close the connection even though protocol negotiation is not yet done, i.e. + // server never interacted with the connection. + io.close().await.unwrap(); + }); + + async_std::future::timeout(Duration::from_secs(10), client) + .await + .unwrap(); +} From b714c4f607baa55a67a4765aef8eda55739d94b6 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 6 Jun 2023 13:20:28 +0900 Subject: [PATCH 5/6] Use futures-ringbuf --- Cargo.lock | 34 +++++++++++++++---- misc/multistream-select/Cargo.toml | 1 + .../multistream-select/tests/dialer_select.rs | 20 ++++------- 3 files changed, 35 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 29b30f03cf4..d978113e84b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1748,7 +1748,19 @@ dependencies = [ "futures", "log", "log-derive", - "ringbuf", + "ringbuf 0.2.8", + "rustc_version", +] + +[[package]] +name = "futures_ringbuf" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6628abb6eb1fc74beaeb20cd0670c43d158b0150f7689b38c3eaf663f99bdec7" +dependencies = [ + "futures", + "log", + "ringbuf 0.3.3", "rustc_version", ] @@ -2500,7 +2512,7 @@ dependencies = [ "async-std", "flate2", "futures", - "futures_ringbuf", + "futures_ringbuf 0.3.1", "libp2p-core", "libp2p-tcp", "quickcheck-ext", @@ -2741,7 +2753,7 @@ version = "0.1.0" dependencies = [ "futures", "futures-timer", - "futures_ringbuf", + "futures_ringbuf 0.3.1", "libp2p-core", "log", ] @@ -2754,7 +2766,7 @@ dependencies = [ "curve25519-dalek 3.2.0", "env_logger 0.10.0", "futures", - "futures_ringbuf", + "futures_ringbuf 0.3.1", "libp2p-core", "libp2p-identity", "log", @@ -2827,7 +2839,7 @@ dependencies = [ "bytes", "env_logger 0.10.0", "futures", - "futures_ringbuf", + "futures_ringbuf 0.3.1", "libp2p-core", "libp2p-identity", "log", @@ -2950,7 +2962,7 @@ dependencies = [ "async-trait", "env_logger 0.10.0", "futures", - "futures_ringbuf", + "futures_ringbuf 0.3.1", "instant", "libp2p-core", "libp2p-identity", @@ -3428,6 +3440,7 @@ dependencies = [ "bytes", "env_logger 0.10.0", "futures", + "futures_ringbuf 0.4.0", "libp2p-core", "libp2p-identity", "libp2p-mplex", @@ -4308,6 +4321,15 @@ dependencies = [ "cache-padded", ] +[[package]] +name = "ringbuf" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79abed428d1fd2a128201cec72c5f6938e2da607c6f3745f769fabea399d950a" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "rmp" version = "0.8.11" diff --git a/misc/multistream-select/Cargo.toml b/misc/multistream-select/Cargo.toml index c9c029bac43..936488c75c6 100644 --- a/misc/multistream-select/Cargo.toml +++ b/misc/multistream-select/Cargo.toml @@ -21,6 +21,7 @@ unsigned-varint = "0.7" [dev-dependencies] async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.10" +futures_ringbuf = "0.4.0" libp2p-core = { workspace = true } libp2p-mplex = { workspace = true } libp2p-plaintext = { workspace = true } diff --git a/misc/multistream-select/tests/dialer_select.rs b/misc/multistream-select/tests/dialer_select.rs index 0f6df2f17c7..a557d6a086b 100644 --- a/misc/multistream-select/tests/dialer_select.rs +++ b/misc/multistream-select/tests/dialer_select.rs @@ -181,27 +181,19 @@ fn negotiation_failed() { #[async_std::test] async fn v1_lazy_do_not_wait_for_negotiation_on_poll_close() { - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let listener_addr = listener.local_addr().unwrap(); - - let _server = async_std::task::spawn(async move { - let _connec = listener.accept().await.unwrap().0; - // Blocks forever as only a single connection is dialed by the client. Never interacts with - // `_connec`. - listener.accept().await.unwrap(); - }); + let (client_connection, _server_connection) = futures_ringbuf::Endpoint::pair(1024 * 1024, 1); let client = async_std::task::spawn(async move { - let connec = TcpStream::connect(&listener_addr).await.unwrap(); // Single protocol to allow for lazy (or optimistic) protocol negotiation. let protos = vec!["/proto1"]; - let (proto, mut io) = dialer_select_proto(connec, protos.into_iter(), Version::V1Lazy) - .await - .unwrap(); + let (proto, mut io) = + dialer_select_proto(client_connection, protos.into_iter(), Version::V1Lazy) + .await + .unwrap(); assert_eq!(proto, "/proto1"); // client can close the connection even though protocol negotiation is not yet done, i.e. - // server never interacted with the connection. + // `_server_connection` had been untouched. io.close().await.unwrap(); }); From 13fbb8e9f6e378487d8ceb054e811e4f3b9038a8 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 6 Jun 2023 20:48:48 +0200 Subject: [PATCH 6/6] Fix formatting --- misc/multistream-select/tests/dialer_select.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misc/multistream-select/tests/dialer_select.rs b/misc/multistream-select/tests/dialer_select.rs index c4e89301146..18f8238cd25 100644 --- a/misc/multistream-select/tests/dialer_select.rs +++ b/misc/multistream-select/tests/dialer_select.rs @@ -20,9 +20,9 @@ //! Integration tests for protocol negotiation. -use std::time::Duration; use futures::prelude::*; use multistream_select::{dialer_select_proto, listener_select_proto, NegotiationError, Version}; +use std::time::Duration; #[test] fn select_proto_basic() {