Skip to content
Merged
5 changes: 5 additions & 0 deletions misc/multistream-select/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
## 0.13.0 - unreleased

- Don't wait for negotiation on `<Negotiated as AsyncWrite>::poll_close`.
This can save one round-trip for protocols that use stream closing as an operation in ones protocol, e.g. using stream closing to signal the end of a request.
See [PR 4019] for details.

- Raise MSRV to 1.65.
See [PR 3715].

[PR 4019]: https://github.com/libp2p/rust-libp2p/pull/4019
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715

## 0.12.1
Expand Down
2 changes: 1 addition & 1 deletion misc/multistream-select/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ smallvec = "1.6.1"
unsigned-varint = "0.7"

[dev-dependencies]
async-std = "1.6.2"
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.10"
futures_ringbuf = "0.4.0"
libp2p-core = { workspace = true }
Expand Down
12 changes: 8 additions & 4 deletions misc/multistream-select/src/negotiated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,7 @@ where
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
// Ensure all data has been flushed and expected negotiation messages
// have been received.
ready!(self.as_mut().poll(cx).map_err(Into::<io::Error>::into)?);
// Ensure all data has been flushed, including optimistic multistream-select messages.
ready!(self
.as_mut()
.poll_flush(cx)
Expand All @@ -316,7 +314,13 @@ where
// Continue with the shutdown of the underlying I/O stream.
match self.project().state.project() {
StateProj::Completed { io, .. } => io.poll_close(cx),
StateProj::Expecting { io, .. } => io.poll_close(cx),
StateProj::Expecting { io, .. } => {
let close_poll = io.poll_close(cx);
if let Poll::Ready(Ok(())) = close_poll {
log::debug!("Stream closed. Confirmation from remote for optimstic protocol negotiation still pending.")
}
close_poll
}
StateProj::Invalid => panic!("Negotiated: Invalid state"),
}
}
Expand Down
24 changes: 24 additions & 0 deletions misc/multistream-select/tests/dialer_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

use futures::prelude::*;
use multistream_select::{dialer_select_proto, listener_select_proto, NegotiationError, Version};
use std::time::Duration;

#[test]
fn select_proto_basic() {
Expand Down Expand Up @@ -176,3 +177,26 @@ fn negotiation_failed() {
}
}
}

#[async_std::test]
async fn v1_lazy_do_not_wait_for_negotiation_on_poll_close() {
let (client_connection, _server_connection) = futures_ringbuf::Endpoint::pair(1024 * 1024, 1);

let client = async_std::task::spawn(async move {
// Single protocol to allow for lazy (or optimistic) protocol negotiation.
let protos = vec!["/proto1"];
let (proto, mut io) =
dialer_select_proto(client_connection, protos.into_iter(), Version::V1Lazy)
.await
.unwrap();
assert_eq!(proto, "/proto1");

// client can close the connection even though protocol negotiation is not yet done, i.e.
// `_server_connection` had been untouched.
io.close().await.unwrap();
});

async_std::future::timeout(Duration::from_secs(10), client)
.await
.unwrap();
}