Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions swarm/src/behaviour/toggle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,4 +369,12 @@ where
}
}
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
if let Some(inner) = self.inner.as_mut() {
inner.poll_close(cx)
} else {
Poll::Ready(None)
}
}
}
14 changes: 14 additions & 0 deletions swarm/src/handler/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,20 @@ where
Poll::Ready(event)
}

fn poll_close(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Self::ToBehaviour>> {
let event = match self {
Either::Left(handler) => futures::ready!(handler.poll_close(cx))
.map(Either::Left),
Either::Right(handler) => futures::ready!(handler.poll_close(cx))
.map(Either::Right),
};

Poll::Ready(event)
}

fn on_connection_event(
&mut self,
event: ConnectionEvent<
Expand Down
7 changes: 7 additions & 0 deletions swarm/src/handler/map_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ where
self.inner.poll(cx)
}

fn poll_close(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Self::ToBehaviour>> {
self.inner.poll_close(cx)
}

fn on_connection_event(
&mut self,
event: ConnectionEvent<
Expand Down
6 changes: 6 additions & 0 deletions swarm/src/handler/map_out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ where
})
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
self.inner
.poll_close(cx)
.map(|ev| ev.map(|ev| (self.map)(ev)))
}

fn on_connection_event(
&mut self,
event: ConnectionEvent<
Expand Down
19 changes: 19 additions & 0 deletions swarm/src/handler/multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,25 @@ where

Poll::Pending
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
'outer: loop {
for (k, h) in self.handlers.iter_mut() {
match h.poll_close(cx) {
Poll::Ready(Some(e)) => {
return Poll::Ready(Some((k.clone(), e)));
}
Poll::Ready(None) => {
self.handlers.remove(k).expect("to be present");
continue 'outer;
}
Poll::Pending => {}
}
}

return Poll::Pending;
}
}
}

/// Split [`MultiHandler`] into parts.
Expand Down
34 changes: 32 additions & 2 deletions swarm/src/handler/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::handler::{
};
use crate::upgrade::SendWrapper;
use either::Either;
use futures::future;
use futures::{future, ready};
use libp2p_core::upgrade::SelectUpgrade;
use std::{cmp, task::Context, task::Poll};

Expand All @@ -36,12 +36,24 @@ pub struct ConnectionHandlerSelect<TProto1, TProto2> {
proto1: TProto1,
/// The second protocol.
proto2: TProto2,
/// The state when closing via [`ConnectionHandler::poll_close`].
closing_state: ClosingState,
}

#[derive(Debug, Clone)]
enum ClosingState {
Open,
Closed1,
}

impl<TProto1, TProto2> ConnectionHandlerSelect<TProto1, TProto2> {
/// Builds a [`ConnectionHandlerSelect`].
pub(crate) fn new(proto1: TProto1, proto2: TProto2) -> Self {
ConnectionHandlerSelect { proto1, proto2 }
ConnectionHandlerSelect {
proto1,
proto2,
closing_state: ClosingState::Open,
}
}

pub fn into_inner(self) -> (TProto1, TProto2) {
Expand Down Expand Up @@ -271,6 +283,24 @@ where
Poll::Pending
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
loop {
match self.closing_state {
ClosingState::Open => match ready!(self.proto1.poll_close(cx)) {
Some(event) => return Poll::Ready(Some(Either::Left(event))),
None => {
self.closing_state = ClosingState::Closed1;
continue;
}
},
ClosingState::Closed1 => match ready!(self.proto2.poll_close(cx)) {
Some(event) => return Poll::Ready(Some(Either::Right(event))),
None => return Poll::Ready(None),
},
}
}
}

fn on_connection_event(
&mut self,
event: ConnectionEvent<
Expand Down