99// at https://opensource.org/licenses/MIT.
1010
1111use super :: ControlCommand ;
12- use crate :: { error:: ConnectionError , Stream } ;
12+ use crate :: error:: { into_io_error, ConnectionError } ;
13+ use crate :: Stream ;
1314use futures:: {
1415 channel:: { mpsc, oneshot} ,
1516 prelude:: * ,
1617 ready,
1718} ;
1819use std:: {
20+ io,
1921 pin:: Pin ,
2022 task:: { Context , Poll } ,
2123} ;
2224
23- type Result < T > = std:: result:: Result < T , ConnectionError > ;
24-
2525/// The Yamux `Connection` controller.
2626///
2727/// While a Yamux connection makes progress via its `next_stream` method,
@@ -36,7 +36,7 @@ pub struct Control {
3636 /// Command channel to `Connection`.
3737 sender : mpsc:: Sender < ControlCommand > ,
3838 /// Pending state of `poll_open_stream`.
39- pending_open : Option < oneshot:: Receiver < Result < Stream > > > ,
39+ pending_open : Option < oneshot:: Receiver < Result < Stream , ConnectionError > > > ,
4040 /// Pending state of `poll_close`.
4141 pending_close : Option < oneshot:: Receiver < ( ) > > ,
4242}
@@ -61,14 +61,19 @@ impl Control {
6161 }
6262
6363 /// Open a new stream to the remote.
64- pub async fn open_stream ( & mut self ) -> Result < Stream > {
64+ pub async fn open_stream ( & mut self ) -> io :: Result < Stream > {
6565 let ( tx, rx) = oneshot:: channel ( ) ;
66- self . sender . send ( ControlCommand :: OpenStream ( tx) ) . await ?;
67- rx. await ?
66+ self . sender
67+ . send ( ControlCommand :: OpenStream ( tx) )
68+ . await
69+ . map_err ( into_io_error) ?;
70+ let stream = rx. await . map_err ( into_io_error) ?. map_err ( into_io_error) ?;
71+
72+ Ok ( stream)
6873 }
6974
7075 /// Close the connection.
71- pub async fn close ( & mut self ) -> Result < ( ) > {
76+ pub async fn close ( & mut self ) -> io :: Result < ( ) > {
7277 let ( tx, rx) = oneshot:: channel ( ) ;
7378 if self
7479 . sender
@@ -86,17 +91,22 @@ impl Control {
8691 }
8792
8893 /// [`Poll`] based alternative to [`Control::open_stream`].
89- pub fn poll_open_stream ( mut self : Pin < & mut Self > , cx : & mut Context ) -> Poll < Result < Stream > > {
94+ pub fn poll_open_stream (
95+ mut self : Pin < & mut Self > ,
96+ cx : & mut Context ,
97+ ) -> Poll < io:: Result < Stream > > {
9098 loop {
9199 match self . pending_open . take ( ) {
92100 None => {
93- ready ! ( self . sender. poll_ready( cx) ?) ;
101+ ready ! ( self . sender. poll_ready( cx) . map_err ( into_io_error ) ?) ;
94102 let ( tx, rx) = oneshot:: channel ( ) ;
95- self . sender . start_send ( ControlCommand :: OpenStream ( tx) ) ?;
103+ self . sender
104+ . start_send ( ControlCommand :: OpenStream ( tx) )
105+ . map_err ( into_io_error) ?;
96106 self . pending_open = Some ( rx)
97107 }
98- Some ( mut rx) => match rx. poll_unpin ( cx) ? {
99- Poll :: Ready ( result) => return Poll :: Ready ( result) ,
108+ Some ( mut rx) => match rx. poll_unpin ( cx) . map_err ( into_io_error ) ? {
109+ Poll :: Ready ( result) => return Poll :: Ready ( result. map_err ( into_io_error ) ) ,
100110 Poll :: Pending => {
101111 self . pending_open = Some ( rx) ;
102112 return Poll :: Pending ;
@@ -112,7 +122,7 @@ impl Control {
112122 }
113123
114124 /// [`Poll`] based alternative to [`Control::close`].
115- pub fn poll_close ( mut self : Pin < & mut Self > , cx : & mut Context ) -> Poll < Result < ( ) > > {
125+ pub fn poll_close ( mut self : Pin < & mut Self > , cx : & mut Context ) -> Poll < io :: Result < ( ) > > {
116126 loop {
117127 match self . pending_close . take ( ) {
118128 None => {
0 commit comments