diff --git a/misc/multistream-select/CHANGELOG.md b/misc/multistream-select/CHANGELOG.md index b893c5d19e3..670d36dea47 100644 --- a/misc/multistream-select/CHANGELOG.md +++ b/misc/multistream-select/CHANGELOG.md @@ -1,8 +1,13 @@ ## 0.13.0 - unreleased +- Don't wait for negotiation on `::poll_close`. + This can save one round-trip for protocols that use stream closing as an operation in ones protocol, e.g. using stream closing to signal the end of a request. + See [PR 4019] for details. + - Raise MSRV to 1.65. See [PR 3715]. +[PR 4019]: https://github.com/libp2p/rust-libp2p/pull/4019 [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 ## 0.12.1 diff --git a/misc/multistream-select/Cargo.toml b/misc/multistream-select/Cargo.toml index 100d3c30b7c..b4394812032 100644 --- a/misc/multistream-select/Cargo.toml +++ b/misc/multistream-select/Cargo.toml @@ -19,7 +19,7 @@ smallvec = "1.6.1" unsigned-varint = "0.7" [dev-dependencies] -async-std = "1.6.2" +async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.10" futures_ringbuf = "0.4.0" libp2p-core = { workspace = true } diff --git a/misc/multistream-select/src/negotiated.rs b/misc/multistream-select/src/negotiated.rs index dabcec4f605..941b60765ca 100644 --- a/misc/multistream-select/src/negotiated.rs +++ b/misc/multistream-select/src/negotiated.rs @@ -305,9 +305,7 @@ where } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // Ensure all data has been flushed and expected negotiation messages - // have been received. - ready!(self.as_mut().poll(cx).map_err(Into::::into)?); + // Ensure all data has been flushed, including optimistic multistream-select messages. ready!(self .as_mut() .poll_flush(cx) @@ -316,7 +314,13 @@ where // Continue with the shutdown of the underlying I/O stream. match self.project().state.project() { StateProj::Completed { io, .. } => io.poll_close(cx), - StateProj::Expecting { io, .. } => io.poll_close(cx), + StateProj::Expecting { io, .. } => { + let close_poll = io.poll_close(cx); + if let Poll::Ready(Ok(())) = close_poll { + log::debug!("Stream closed. Confirmation from remote for optimstic protocol negotiation still pending.") + } + close_poll + } StateProj::Invalid => panic!("Negotiated: Invalid state"), } } diff --git a/misc/multistream-select/tests/dialer_select.rs b/misc/multistream-select/tests/dialer_select.rs index 412ccc1a0fd..18f8238cd25 100644 --- a/misc/multistream-select/tests/dialer_select.rs +++ b/misc/multistream-select/tests/dialer_select.rs @@ -22,6 +22,7 @@ use futures::prelude::*; use multistream_select::{dialer_select_proto, listener_select_proto, NegotiationError, Version}; +use std::time::Duration; #[test] fn select_proto_basic() { @@ -176,3 +177,26 @@ fn negotiation_failed() { } } } + +#[async_std::test] +async fn v1_lazy_do_not_wait_for_negotiation_on_poll_close() { + let (client_connection, _server_connection) = futures_ringbuf::Endpoint::pair(1024 * 1024, 1); + + let client = async_std::task::spawn(async move { + // Single protocol to allow for lazy (or optimistic) protocol negotiation. + let protos = vec!["/proto1"]; + let (proto, mut io) = + dialer_select_proto(client_connection, protos.into_iter(), Version::V1Lazy) + .await + .unwrap(); + assert_eq!(proto, "/proto1"); + + // client can close the connection even though protocol negotiation is not yet done, i.e. + // `_server_connection` had been untouched. + io.close().await.unwrap(); + }); + + async_std::future::timeout(Duration::from_secs(10), client) + .await + .unwrap(); +}