Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
9bd4a34
Fix clippy warnings
thomaseizinger Oct 25, 2022
9af35b8
Remove unnecessary `Option`
thomaseizinger Oct 8, 2022
54296c5
Make `garbage_collect` not async
thomaseizinger Oct 8, 2022
6234e6f
Handle socket closing outside of `on_control_command`
thomaseizinger Oct 8, 2022
d5d5ef4
Handle GoAway logic outside of `on_stream_command`
thomaseizinger Oct 8, 2022
6a55933
Remove `async` from a bunch of functions
thomaseizinger Oct 8, 2022
c1e805d
Introduce `Connection::poll` function
thomaseizinger Oct 8, 2022
e02bdbc
Minimise diff
thomaseizinger Oct 8, 2022
3a6a374
Split `on_control_command`
thomaseizinger Oct 11, 2022
5e64609
Use whitespace
thomaseizinger Oct 11, 2022
3817398
Split `on_stream_command`
thomaseizinger Oct 11, 2022
0ebd9cc
Prioritise control and stream commands over reading the socket
thomaseizinger Oct 11, 2022
b37e484
Fail if we want to double close a connection
thomaseizinger Oct 11, 2022
f776a43
Introduce internal `ConnectionState` enum
thomaseizinger Oct 11, 2022
d63cde2
Track closed state in connection state enum
thomaseizinger Oct 11, 2022
a1db18e
Inline `on_close_connection`
thomaseizinger Oct 11, 2022
fd886ee
Implement shutdown procedurally
thomaseizinger Oct 12, 2022
527012d
Don't pause `control_receiver`
thomaseizinger Oct 12, 2022
f9d3f4b
Implement `ConnectionState::poll`
thomaseizinger Oct 12, 2022
2df23f0
Implement `into_stream` by directly calling `poll_next`
thomaseizinger Oct 12, 2022
6ca0d1f
Improve naming of `poll` fn on `ConnectionState`
thomaseizinger Oct 12, 2022
bb96cd9
Improve docs
thomaseizinger Oct 12, 2022
ff27080
Don't fail if connection got closed gracefully
thomaseizinger Oct 12, 2022
527df19
Rename `Failing` to `Cleanup`
thomaseizinger Oct 12, 2022
19ee6e8
Add docs to state variants
thomaseizinger Oct 12, 2022
370f5bc
Track reply sender outside of `close` function
thomaseizinger Oct 20, 2022
7dcecde
Handle `ControlCommand` outside of `ConnectionState::poll`
thomaseizinger Oct 20, 2022
fe7d000
Introduce `Frame::close_stream` ctor
thomaseizinger Oct 20, 2022
2bae656
Move `Drop` impl from `Active` to `Connection`
thomaseizinger Oct 20, 2022
47c684b
Implement connection closing as manual state machine
thomaseizinger Oct 20, 2022
8279c27
Implement connection cleanup as manual state machine
thomaseizinger Oct 25, 2022
ce57e25
Don't require `T` to be `Send + 'static`
thomaseizinger Oct 25, 2022
0ac90a0
Rewrite `Control` to be a layer on top of `Connection`
thomaseizinger Oct 24, 2022
79c1479
Make `control` as sister module of `connection`
thomaseizinger Oct 25, 2022
3941cdc
Add test for poll-based API
thomaseizinger Oct 25, 2022
a0ba23b
Create workspace
thomaseizinger Nov 3, 2022
654e38a
Remove comment and improve variable naming
thomaseizinger Nov 14, 2022
1525cf8
Fix typo
thomaseizinger Nov 14, 2022
27ed7ac
Match exhaustively
thomaseizinger Nov 14, 2022
1ca32e6
Use doc link
thomaseizinger Nov 14, 2022
63938b1
Bump version and add changelog entry
thomaseizinger Nov 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 97 additions & 172 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ use futures::{
};
use nohash_hasher::IntMap;
use std::collections::VecDeque;
use std::{fmt, io, sync::Arc, task::Poll};
use std::task::Context;
use std::{fmt, sync::Arc, task::Poll};

pub use control::Control;
pub use stream::{Packet, State, Stream};
Expand Down Expand Up @@ -365,120 +366,8 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
/// case of an error or at EOF.
async fn next(&mut self) -> Result<Stream> {
loop {
self.garbage_collect();

while let Some(frame) = self.pending_frames.pop_front() {
self.socket
.feed(frame)
.await
.or(Err(ConnectionError::Closed))?
}

// Wait for the frame sink to be ready or, if there is a pending
// write, for an incoming frame. I.e. as long as there is a pending
// write, we only read, unless a read results in needing to send a
// frame, in which case we must wait for the pending write to
// complete. When the frame sink is ready, we can proceed with
// waiting for a new stream or control command or another inbound
// frame.
let next_io_event = if self.socket.is_terminated() {
Either::Left(future::pending())
} else {
let socket = &mut self.socket;
let io = future::poll_fn(move |cx| {
if let Poll::Ready(res) = socket.poll_ready_unpin(cx) {
res.or(Err(ConnectionError::Closed))?;
return Poll::Ready(Result::Ok(IoEvent::OutboundReady));
}

// At this point we know the socket sink has a pending
// write, so we try to read the next frame instead.
let next_frame = futures::ready!(socket.poll_next_unpin(cx))
.transpose()
.map_err(ConnectionError::from);
Poll::Ready(Ok(IoEvent::Inbound(next_frame)))
});
Either::Right(io)
};

if let IoEvent::Inbound(frame) = next_io_event.await? {
if let Some(stream) = self.on_frame(frame)? {
return Ok(stream);
}
continue; // The socket sink still has a pending write.
}

// Getting this far implies that the socket is ready to accept
// a new frame, so we can now listen for new commands while waiting
// for the next inbound frame. To that end, for each channel and the
// socket we create a future that gets the next item. We will poll
// each future and if any one of them yields an item, we return the
// tuple of poll results which are then all processed.
//
// For terminated sources we create non-finishing futures.
// This guarantees that if the remaining futures are pending
// we properly wait until woken up because we actually can make
// progress.

let mut num_terminated = 0;

let next_frame = if self.socket.is_terminated() {
num_terminated += 1;
Either::Left(future::pending())
} else {
// Poll socket for next incoming frame, but also make sure any pending writes are properly flushed.
let socket = &mut self.socket;
let mut flush_done = false;
let next_frame = future::poll_fn(move |cx| {
if let Poll::Ready(res) = socket.poll_next_unpin(cx) {
return Poll::Ready(res);
}

// Prevent calling potentially heavy `flush` once it has completed.
if !flush_done {
match socket.poll_flush_unpin(cx) {
Poll::Ready(Ok(_)) => {
flush_done = true;
}
Poll::Ready(Err(err)) => {
return Poll::Ready(Some(Err(err.into())));
}
Poll::Pending => {}
}
}

Poll::Pending
});

Either::Right(next_frame)
};

let next_stream_command = if self.stream_receiver.is_terminated() {
num_terminated += 1;
Either::Left(future::pending())
} else {
Either::Right(self.stream_receiver.next())
};

let next_control_command = if self.control_receiver.is_terminated() {
num_terminated += 1;
Either::Left(future::pending())
} else {
Either::Right(self.control_receiver.next())
};

if num_terminated == 3 {
log::debug!("{}: socket and channels are terminated", self.id);
return Err(ConnectionError::Closed);
}

let combined_future = future::select(
future::select(next_stream_command, next_control_command),
next_frame,
);
match combined_future.await {
Either::Left((Either::Left((Some(cmd), _)), _)) => self.on_stream_command(cmd)?,
Either::Left((Either::Left((None, _)), _)) => {
match future::poll_fn(|cx| self.poll(cx)).await? {
Event::StreamReceiverClosed => {
// We only get to this point when `self.stream_receiver`
// was closed which only happens in response to a close control
// command. Now that we are at the end of the stream command queue,
Expand All @@ -500,8 +389,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
self.control_receiver.unpause();
self.control_receiver.stream().close()
}
Either::Left((Either::Right((Some(cmd), _)), _)) => self.on_control_command(cmd)?,
Either::Left((Either::Right((None, _)), _)) => {
Event::ControlReceiverClosed => {
// We only get here after the whole connection shutdown is complete.
// No further processing of commands of any kind or incoming frames
// will happen.
Expand All @@ -514,12 +402,66 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {

return Err(ConnectionError::Closed);
}
Either::Right((frame, _)) => {
if let Some(stream) = self.on_frame(frame.transpose().map_err(Into::into))? {
return Ok(stream);
Event::NewStream(stream) => {
return Ok(stream);
}
}
}
}

fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<Event>> {
loop {
self.garbage_collect();

if self.socket.poll_ready_unpin(cx).is_ready() {
if let Some(frame) = self.pending_frames.pop_front() {
self.socket.start_send_unpin(frame)?;
continue;
}
}

match self.socket.poll_flush_unpin(cx)? {
Poll::Ready(()) => {}
Poll::Pending => {}
}
Comment on lines +455 to +458
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering whether we should flush so early in the poll method, or whether it shouldn't be one of the last actions. Rationale being that frequent flushing hurts performance, especially in case one can increase the batch instead.

Just a thought. Needs more thought and potentially data to back it up.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find any consistent performance improvement in my benchmarks when moving this block of code up or down.

However, this got me thinking: Why do we communicate via channels between the connection and the stream for writing but use shared buffers when reading? We could just as easily have a buffer of frames in Shared and wake the Connection whenever we write any frames to that. This would allow us to drain the buffer of all streams in one go, without having to receive individual frames over a channel.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find any consistent performance improvement in my benchmarks when moving this block of code up or down.

Thanks for testing. Let's keep as is.

However, this got me thinking: Why do we communicate via channels between the connection and the stream for writing but use shared buffers when reading? We could just as easily have a buffer of frames in Shared and wake the Connection whenever we write any frames to that. This would allow us to drain the buffer of all streams in one go, without having to receive individual frames over a channel.

I am undecided whether the connection should communicate with the stream via a channel or via plain Mutex and Waker. Whatever change we want to make, I think it should not happen within this pull request.


match self.socket.poll_next_unpin(cx) {
Poll::Ready(Some(frame)) => {
if let Some(stream) = self.on_frame(frame?)? {
return Poll::Ready(Ok(Event::NewStream(stream)));
}
continue;
}
Poll::Ready(None) => {
return Poll::Ready(Err(ConnectionError::Closed));
}
_ => {}
}

match self.control_receiver.poll_next_unpin(cx) {
Poll::Ready(Some(cmd)) => {
self.on_control_command(cmd)?;
continue;
}
Poll::Ready(None) => {
return Poll::Ready(Ok(Event::ControlReceiverClosed));
}
_ => {}
}

match self.stream_receiver.poll_next_unpin(cx) {
Poll::Ready(Some(cmd)) => {
self.on_stream_command(cmd)?;
continue;
}
Poll::Ready(None) => {
return Poll::Ready(Ok(Event::StreamReceiverClosed));
}
_ => {}
}

// If we make it this far, at least one of the above must have registered a waker.
return Poll::Pending;
}
}

Expand Down Expand Up @@ -626,58 +568,43 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
/// and return a corresponding error, which terminates the connection.
/// Otherwise we process the frame and potentially return a new `Stream`
/// if one was opened by the remote.
fn on_frame(&mut self, frame: Result<Option<Frame<()>>>) -> Result<Option<Stream>> {
match frame {
Ok(Some(frame)) => {
log::trace!("{}: received: {}", self.id, frame.header());
let action = match frame.header().tag() {
Tag::Data => self.on_data(frame.into_data()),
Tag::WindowUpdate => self.on_window_update(&frame.into_window_update()),
Tag::Ping => self.on_ping(&frame.into_ping()),
Tag::GoAway => return Err(ConnectionError::Closed),
};
match action {
Action::None => {}
Action::New(stream, update) => {
log::trace!("{}: new inbound {} of {}", self.id, stream, self);
if let Some(f) = update {
log::trace!("{}/{}: sending update", self.id, f.header().stream_id());
self.pending_frames.push_back(f.into());
}
return Ok(Some(stream));
}
Action::Update(f) => {
log::trace!("{}: sending update: {:?}", self.id, f.header());
self.pending_frames.push_back(f.into());
}
Action::Ping(f) => {
log::trace!("{}/{}: pong", self.id, f.header().stream_id());
self.pending_frames.push_back(f.into());
}
Action::Reset(f) => {
log::trace!("{}/{}: sending reset", self.id, f.header().stream_id());
self.pending_frames.push_back(f.into());
}
Action::Terminate(f) => {
log::trace!("{}: sending term", self.id);
self.pending_frames.push_back(f.into());
}
fn on_frame(&mut self, frame: Frame<()>) -> Result<Option<Stream>> {
log::trace!("{}: received: {}", self.id, frame.header());
let action = match frame.header().tag() {
Tag::Data => self.on_data(frame.into_data()),
Tag::WindowUpdate => self.on_window_update(&frame.into_window_update()),
Tag::Ping => self.on_ping(&frame.into_ping()),
Tag::GoAway => return Err(ConnectionError::Closed),
};
match action {
Action::None => {}
Action::New(stream, update) => {
log::trace!("{}: new inbound {} of {}", self.id, stream, self);
if let Some(f) = update {
log::trace!("{}/{}: sending update", self.id, f.header().stream_id());
self.pending_frames.push_back(f.into());
}
Ok(None)
return Ok(Some(stream));
}
Action::Update(f) => {
log::trace!("{}: sending update: {:?}", self.id, f.header());
self.pending_frames.push_back(f.into());
}
Ok(None) => {
log::debug!("{}: socket eof", self.id);
Err(ConnectionError::Closed)
Action::Ping(f) => {
log::trace!("{}/{}: pong", self.id, f.header().stream_id());
self.pending_frames.push_back(f.into());
}
Err(e) if e.io_kind() == Some(io::ErrorKind::ConnectionReset) => {
log::debug!("{}: connection reset", self.id);
Err(ConnectionError::Closed)
Action::Reset(f) => {
log::trace!("{}/{}: sending reset", self.id, f.header().stream_id());
self.pending_frames.push_back(f.into());
}
Err(e) => {
log::error!("{}: socket error: {}", self.id, e);
Err(e)
Action::Terminate(f) => {
log::trace!("{}: sending term", self.id);
self.pending_frames.push_back(f.into());
}
}

Ok(None)
}

fn on_data(&mut self, frame: Frame<Data>) -> Action {
Expand Down Expand Up @@ -1035,12 +962,10 @@ impl<T> Drop for Connection<T> {
}
}

/// Events related to reading from or writing to the underlying socket.
enum IoEvent {
/// A new inbound frame arrived.
Inbound(Result<Option<Frame<()>>>),
/// We can continue sending frames.
OutboundReady,
enum Event {
NewStream(Stream),
ControlReceiverClosed,
StreamReceiverClosed,
}

/// Turn a Yamux [`Connection`] into a [`futures::Stream`].
Expand Down
11 changes: 0 additions & 11 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,6 @@ pub enum ConnectionError {
TooManyStreams,
}

impl ConnectionError {
/// Return the `ErrorKind` of this `ConnectionError` if it holds an I/O error.
pub(crate) fn io_kind(&self) -> Option<std::io::ErrorKind> {
match self {
ConnectionError::Io(e) => Some(e.kind()),
ConnectionError::Decode(FrameDecodeError::Io(e)) => Some(e.kind()),
_ => None,
}
}
}

impl std::fmt::Display for ConnectionError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Expand Down