From 5497971e6a95752cf5e4ff074a7bedc9223e8325 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 31 May 2022 14:50:30 +0400 Subject: [PATCH 1/6] stream: allow reading after stream is closed Closes #13 --- src/stream/mod.rs | 37 +++++++++-------- src/stream/stream_test.rs | 86 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 104 insertions(+), 19 deletions(-) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 169d724..6848f45 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -168,8 +168,7 @@ impl Stream { } /// read reads a packet of len(p) bytes, dropping the Payload Protocol Identifier. - /// Returns EOF when the stream is reset or an error if the stream is closed - /// otherwise. + /// Returns EOF when the stream is reset or an error if `p` is too short. pub async fn read(&self, p: &mut [u8]) -> Result { let (n, _) = self.read_sctp(p).await?; Ok(n) @@ -177,27 +176,25 @@ impl Stream { /// read_sctp reads a packet of len(p) bytes and returns the associated Payload /// Protocol Identifier. - /// Returns EOF when the stream is reset or an error if the stream is closed - /// otherwise. + /// Returns EOF when the stream is reset or an error if `p` is too short. pub async fn read_sctp(&self, p: &mut [u8]) -> Result<(usize, PayloadProtocolIdentifier)> { - while !self.closed.load(Ordering::SeqCst) { + loop { let result = { let mut reassembly_queue = self.reassembly_queue.lock().await; reassembly_queue.read(p) }; - if result.is_ok() { - return result; - } else if let Err(err) = result { - if Error::ErrShortBuffer == err { - return Err(err); + match result { + Ok(_) => return result, + Err(Error::ErrShortBuffer) => return result, + Err(_) => + { + // TODO: shouldn't we mark read_notifier as notified in other cases as well? + self.read_notifier.notified().await; } } - self.read_notifier.notified().await; } - - Err(Error::ErrStreamClosed) } pub(crate) async fn handle_data(&self, pd: ChunkPayloadData) { @@ -265,6 +262,10 @@ impl Stream { /// write_sctp writes len(p) bytes from p to the DTLS connection pub async fn write_sctp(&self, p: &Bytes, ppi: PayloadProtocolIdentifier) -> Result { + if self.closed.load(Ordering::SeqCst) { + return Err(Error::ErrStreamClosed); + } + if p.len() > self.max_message_size.load(Ordering::SeqCst) as usize { return Err(Error::ErrOutboundPacketTooLarge); } @@ -342,12 +343,14 @@ impl Stream { /// Close closes the write-direction of the stream. /// Future calls to write are not permitted after calling Close. pub async fn close(&self) -> Result<()> { - if !self.closed.load(Ordering::SeqCst) { - // Reset the outgoing stream - // https://tools.ietf.org/html/rfc6525 - self.send_reset_request(self.stream_identifier).await?; + if self.closed.load(Ordering::SeqCst) { + return Ok(()); } + self.closed.store(true, Ordering::SeqCst); + // Reset the outgoing stream + // https://tools.ietf.org/html/rfc6525 + self.send_reset_request(self.stream_identifier).await?; self.read_notifier.notify_waiters(); // broadcast regardless Ok(()) diff --git a/src/stream/stream_test.rs b/src/stream/stream_test.rs index cab45af..6925ab8 100644 --- a/src/stream/stream_test.rs +++ b/src/stream/stream_test.rs @@ -72,6 +72,75 @@ async fn test_stream_amount_on_buffered_amount_low() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_stream() -> std::result::Result<(), io::Error> { + let s = Stream::new( + "test_poll_stream".to_owned(), + 0, + 4096, + Arc::new(AtomicU32::new(4096)), + Arc::new(AtomicU8::new(AssociationState::Established as u8)), + None, + Arc::new(PendingQueue::new()), + ); + + // getters + assert_eq!(0, s.stream_identifier()); + assert_eq!(0, s.buffered_amount()); + assert_eq!(0, s.buffered_amount_low_threshold()); + assert_eq!(0, s.get_num_bytes_in_reassembly_queue().await); + + // setters + s.set_default_payload_type(PayloadProtocolIdentifier::Binary); + s.set_reliability_params(true, ReliabilityType::Reliable, 0); + + // write + let n = s.write(&Bytes::from("Hello ")).await?; + assert_eq!(6, n); + assert_eq!(6, s.buffered_amount()); + let n = s + .write_sctp(&Bytes::from("world"), PayloadProtocolIdentifier::Binary) + .await?; + assert_eq!(5, n); + assert_eq!(11, s.buffered_amount()); + + // async read + // 1. pretend that we've received a chunk + s.handle_data(ChunkPayloadData { + unordered: true, + beginning_fragment: true, + ending_fragment: true, + user_data: Bytes::from_static(&[0, 1, 2, 3, 4]), + payload_type: PayloadProtocolIdentifier::Binary, + ..Default::default() + }) + .await; + // 2. read it + let mut buf = [0; 5]; + s.read(&mut buf).await?; + assert_eq!(buf, [0, 1, 2, 3, 4]); + + // close + s.close().await?; + // write must fail + assert!(s.write(&Bytes::from("error")).await.is_err()); + // read should continue working + s.handle_data(ChunkPayloadData { + unordered: true, + beginning_fragment: true, + ending_fragment: true, + user_data: Bytes::from_static(&[5, 6, 7, 8, 9]), + payload_type: PayloadProtocolIdentifier::Binary, + ..Default::default() + }) + .await; + let mut buf = [0; 5]; + s.read(&mut buf).await?; + assert_eq!(buf, [5, 6, 7, 8, 9]); + + Ok(()) +} + #[tokio::test] async fn test_poll_stream() -> std::result::Result<(), io::Error> { let s = Arc::new(Stream::new( @@ -116,8 +185,21 @@ async fn test_poll_stream() -> std::result::Result<(), io::Error> { // shutdown poll_stream.shutdown().await?; - assert_eq!(true, sc.closed.load(Ordering::Relaxed)); - assert!(poll_stream.read(&mut buf).await.is_err()); + // write must fail + assert!(poll_stream.write(&[1, 2, 3]).await.is_err()); + // read should continue working + sc.handle_data(ChunkPayloadData { + unordered: true, + beginning_fragment: true, + ending_fragment: true, + user_data: Bytes::from_static(&[5, 6, 7, 8, 9]), + payload_type: PayloadProtocolIdentifier::Binary, + ..Default::default() + }) + .await; + let mut buf = [0; 5]; + poll_stream.read(&mut buf).await?; + assert_eq!(buf, [5, 6, 7, 8, 9]); // misc. let clone = poll_stream.clone(); From 9b98923b10d6e73733f03c690877502035676c25 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 31 May 2022 15:45:10 +0400 Subject: [PATCH 2/6] format code --- src/stream/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 6848f45..efc8726 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -187,13 +187,11 @@ impl Stream { match result { Ok(_) => return result, Err(Error::ErrShortBuffer) => return result, - Err(_) => - { + Err(_) => { // TODO: shouldn't we mark read_notifier as notified in other cases as well? self.read_notifier.notified().await; } } - } } From 2e7dbf1ed4556b6a5e117a2399e9865cbb1385fb Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 8 Jun 2022 12:59:15 +0400 Subject: [PATCH 3/6] add Stream::shutdown - deprecate Stream::close in favor of it - only shutdown writing half of stream in PollStream::poll_shutdown Refs https://github.com/webrtc-rs/sctp/issues/13#issuecomment-1148548764 --- examples/ping.rs | 2 +- examples/pong.rs | 2 +- src/association/association_internal.rs | 7 +- src/stream/mod.rs | 95 ++++++++++++++++++------- src/stream/stream_test.rs | 6 +- 5 files changed, 81 insertions(+), 31 deletions(-) diff --git a/examples/ping.rs b/examples/ping.rs index 842619a..13d8002 100644 --- a/examples/ping.rs +++ b/examples/ping.rs @@ -108,7 +108,7 @@ async fn main() -> Result<(), Error> { signal::ctrl_c().await.expect("failed to listen for event"); println!("Closing stream and association..."); - stream.close().await?; + stream.shutdown(Shutdown::Both).await?; a.close().await?; let _ = done_rx.recv().await; diff --git a/examples/pong.rs b/examples/pong.rs index a1dcad7..115e520 100644 --- a/examples/pong.rs +++ b/examples/pong.rs @@ -99,7 +99,7 @@ async fn main() -> Result<(), Error> { signal::ctrl_c().await.expect("failed to listen for event"); println!("Closing stream and association..."); - stream.close().await?; + stream.shutdown(Shutdown::Write).await?; a.close().await?; let _ = done_rx.recv().await; diff --git a/src/association/association_internal.rs b/src/association/association_internal.rs index 48d5bca..2d8e0f0 100644 --- a/src/association/association_internal.rs +++ b/src/association/association_internal.rs @@ -294,8 +294,11 @@ impl AssociationInternal { fn unregister_stream(&mut self, stream_identifier: u16) { let s = self.streams.remove(&stream_identifier); if let Some(s) = s { - s.closed.store(true, Ordering::SeqCst); - s.read_notifier.notify_waiters(); + // NOTE: shutdown is not used here because it resets the stream. + if !s.read_shutdown.swap(true, Ordering::SeqCst) { + s.read_notifier.notify_waiters(); + } + s.write_shutdown.store(true, Ordering::SeqCst); } } diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 9490023..0b4c664 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -19,6 +19,14 @@ use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::sync::{mpsc, Mutex, Notify}; +/// Possible values which can be passed to the [`Stream::shutdown`] method. +#[derive(PartialEq)] +pub enum Shutdown { + Read, + Write, + Both, +} + #[derive(Debug, Copy, Clone, PartialEq)] #[repr(C)] pub enum ReliabilityType { @@ -76,7 +84,8 @@ pub struct Stream { pub(crate) reassembly_queue: Mutex, pub(crate) sequence_number: AtomicU16, pub(crate) read_notifier: Notify, - pub(crate) closed: AtomicBool, + pub(crate) read_shutdown: AtomicBool, + pub(crate) write_shutdown: AtomicBool, pub(crate) unordered: AtomicBool, pub(crate) reliability_type: AtomicU8, //ReliabilityType, pub(crate) reliability_value: AtomicU32, @@ -97,7 +106,8 @@ impl fmt::Debug for Stream { .field("default_payload_type", &self.default_payload_type) .field("reassembly_queue", &self.reassembly_queue) .field("sequence_number", &self.sequence_number) - .field("closed", &self.closed) + .field("read_shutdown", &self.read_shutdown) + .field("write_shutdown", &self.write_shutdown) .field("unordered", &self.unordered) .field("reliability_type", &self.reliability_type) .field("reliability_value", &self.reliability_value) @@ -130,7 +140,8 @@ impl Stream { reassembly_queue: Mutex::new(ReassemblyQueue::new(stream_identifier)), sequence_number: AtomicU16::new(0), read_notifier: Notify::new(), - closed: AtomicBool::new(false), + read_shutdown: AtomicBool::new(false), + write_shutdown: AtomicBool::new(false), unordered: AtomicBool::new(false), reliability_type: AtomicU8::new(0), //ReliabilityType::Reliable, reliability_value: AtomicU32::new(0), @@ -167,28 +178,34 @@ impl Stream { self.reliability_value.store(rel_val, Ordering::SeqCst); } - /// read reads a packet of len(p) bytes, dropping the Payload Protocol Identifier. - /// Returns EOF when the stream is reset or an error if `p` is too short. + /// Reads a packet of len(p) bytes, dropping the Payload Protocol Identifier. + /// + /// Returns EOF when the stream is reset or an error if `p` is too short or the reading half of + /// the stream is shutdown. pub async fn read(&self, p: &mut [u8]) -> Result { let (n, _) = self.read_sctp(p).await?; Ok(n) } - /// read_sctp reads a packet of len(p) bytes and returns the associated Payload - /// Protocol Identifier. - /// Returns EOF when the stream is reset or an error if `p` is too short. + /// Reads a packet of len(p) bytes and returns the associated Payload Protocol Identifier. + /// + /// Returns EOF when the stream is reset or an error if `p` is too short or the reading half of + /// the stream is shutdown. pub async fn read_sctp(&self, p: &mut [u8]) -> Result<(usize, PayloadProtocolIdentifier)> { loop { + if self.read_shutdown.load(Ordering::SeqCst) { + return Err(Error::ErrStreamClosed); + } + let result = { let mut reassembly_queue = self.reassembly_queue.lock().await; reassembly_queue.read(p) }; match result { - Ok(_) => return result, - Err(Error::ErrShortBuffer) => return result, + Ok(_) | Err(Error::ErrShortBuffer) => return result, Err(_) => { - // TODO: shouldn't we mark read_notifier as notified in other cases as well? + // wait for the next chunk to become available self.read_notifier.notified().await; } } @@ -252,15 +269,19 @@ impl Stream { } } - /// write writes len(p) bytes from p with the default Payload Protocol Identifier + /// Writes `p` to the DTLS connection with the default Payload Protocol Identifier. + /// + /// Returns an error if the write half of this stream is shutdown or `p` is too large. pub async fn write(&self, p: &Bytes) -> Result { self.write_sctp(p, self.default_payload_type.load(Ordering::SeqCst).into()) .await } - /// write_sctp writes len(p) bytes from p to the DTLS connection + /// Writes `p` to the DTLS connection with the given Payload Protocol Identifier. + /// + /// Returns an error if the write half of this stream is shutdown or `p` is too large. pub async fn write_sctp(&self, p: &Bytes, ppi: PayloadProtocolIdentifier) -> Result { - if self.closed.load(Ordering::SeqCst) { + if self.write_shutdown.load(Ordering::SeqCst) { return Err(Error::ErrStreamClosed); } @@ -338,18 +359,44 @@ impl Stream { chunks } - /// Close closes the write-direction of the stream. - /// Future calls to write are not permitted after calling Close. + /// Closes both read and write halves of this stream. + /// + /// Use [`Stream::shutdown`] instead. + #[deprecated] pub async fn close(&self) -> Result<()> { - if self.closed.load(Ordering::SeqCst) { - return Ok(()); + self.shutdown(Shutdown::Both).await + } + + /// Shuts down the read, write, or both halves of this stream. + /// + /// This function will cause all pending and future I/O on the specified portions to return + /// immediately with an appropriate value (see the documentation of [`Shutdown`]). + /// + /// Resets the stream when both halves of this stream are shutdown. + pub async fn shutdown(&self, how: Shutdown) -> Result<()> { + match how { + Shutdown::Write => self.write_shutdown.store(true, Ordering::SeqCst), + Shutdown::Read => { + if !self.read_shutdown.swap(true, Ordering::SeqCst) { + self.read_notifier.notify_waiters(); + } + } + Shutdown::Both => { + if !self.read_shutdown.swap(true, Ordering::SeqCst) { + self.read_notifier.notify_waiters(); + } + self.write_shutdown.store(true, Ordering::SeqCst); + } } - self.closed.store(true, Ordering::SeqCst); - // Reset the outgoing stream - // https://tools.ietf.org/html/rfc6525 - self.send_reset_request(self.stream_identifier).await?; - self.read_notifier.notify_waiters(); // broadcast regardless + if how == Shutdown::Both + || (self.read_shutdown.load(Ordering::SeqCst) + && self.write_shutdown.load(Ordering::SeqCst)) + { + // Reset the stream + // https://tools.ietf.org/html/rfc6525 + self.send_reset_request(self.stream_identifier).await?; + } Ok(()) } @@ -726,7 +773,7 @@ impl AsyncWrite for PollStream { None => { let stream = self.stream.clone(); self.shutdown_fut - .get_or_insert(Box::pin(async move { stream.close().await })) + .get_or_insert(Box::pin(async move { stream.shutdown(Shutdown::Write).await })) } }; diff --git a/src/stream/stream_test.rs b/src/stream/stream_test.rs index 6925ab8..cb85be5 100644 --- a/src/stream/stream_test.rs +++ b/src/stream/stream_test.rs @@ -120,8 +120,8 @@ async fn test_stream() -> std::result::Result<(), io::Error> { s.read(&mut buf).await?; assert_eq!(buf, [0, 1, 2, 3, 4]); - // close - s.close().await?; + // shutdown write + s.shutdown(Shutdown::Write).await?; // write must fail assert!(s.write(&Bytes::from("error")).await.is_err()); // read should continue working @@ -183,7 +183,7 @@ async fn test_poll_stream() -> std::result::Result<(), io::Error> { poll_stream.read(&mut buf).await?; assert_eq!(buf, [0, 1, 2, 3, 4]); - // shutdown + // shutdown write poll_stream.shutdown().await?; // write must fail assert!(poll_stream.write(&[1, 2, 3]).await.is_err()); From 980b7a5c0a2e86875abc8df307818456a6688a6a Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 8 Jun 2022 13:02:37 +0400 Subject: [PATCH 4/6] format code --- src/stream/mod.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 0b4c664..d73f400 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -179,7 +179,7 @@ impl Stream { } /// Reads a packet of len(p) bytes, dropping the Payload Protocol Identifier. - /// + /// /// Returns EOF when the stream is reset or an error if `p` is too short or the reading half of /// the stream is shutdown. pub async fn read(&self, p: &mut [u8]) -> Result { @@ -772,8 +772,9 @@ impl AsyncWrite for PollStream { Some(fut) => fut, None => { let stream = self.stream.clone(); - self.shutdown_fut - .get_or_insert(Box::pin(async move { stream.shutdown(Shutdown::Write).await })) + self.shutdown_fut.get_or_insert(Box::pin(async move { + stream.shutdown(Shutdown::Write).await + })) } }; From 51ee4314f91cf31fbc6226e004e73835fa842936 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 8 Jun 2022 13:13:11 +0400 Subject: [PATCH 5/6] return early if shutdown called twice --- examples/pong.rs | 2 +- src/stream/mod.rs | 23 +++++++++++------------ src/stream/stream_test.rs | 5 +++++ 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/examples/pong.rs b/examples/pong.rs index 115e520..0ea3e8a 100644 --- a/examples/pong.rs +++ b/examples/pong.rs @@ -99,7 +99,7 @@ async fn main() -> Result<(), Error> { signal::ctrl_c().await.expect("failed to listen for event"); println!("Closing stream and association..."); - stream.shutdown(Shutdown::Write).await?; + stream.shutdown(Shutdown::Both).await?; a.close().await?; let _ = done_rx.recv().await; diff --git a/src/stream/mod.rs b/src/stream/mod.rs index d73f400..e6801c3 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -374,18 +374,17 @@ impl Stream { /// /// Resets the stream when both halves of this stream are shutdown. pub async fn shutdown(&self, how: Shutdown) -> Result<()> { - match how { - Shutdown::Write => self.write_shutdown.store(true, Ordering::SeqCst), - Shutdown::Read => { - if !self.read_shutdown.swap(true, Ordering::SeqCst) { - self.read_notifier.notify_waiters(); - } - } - Shutdown::Both => { - if !self.read_shutdown.swap(true, Ordering::SeqCst) { - self.read_notifier.notify_waiters(); - } - self.write_shutdown.store(true, Ordering::SeqCst); + if self.read_shutdown.load(Ordering::SeqCst) && self.write_shutdown.load(Ordering::SeqCst) { + return Ok(()); + } + + if how == Shutdown::Write || how == Shutdown::Both { + self.write_shutdown.store(true, Ordering::SeqCst); + } + + if how == Shutdown::Read || how == Shutdown::Both { + if !self.read_shutdown.swap(true, Ordering::SeqCst) { + self.read_notifier.notify_waiters(); } } diff --git a/src/stream/stream_test.rs b/src/stream/stream_test.rs index cb85be5..e698c61 100644 --- a/src/stream/stream_test.rs +++ b/src/stream/stream_test.rs @@ -138,6 +138,11 @@ async fn test_stream() -> std::result::Result<(), io::Error> { s.read(&mut buf).await?; assert_eq!(buf, [5, 6, 7, 8, 9]); + // shutdown read + s.shutdown(Shutdown::Read).await?; + // read must fail + assert!(s.read(&mut buf).await.is_err()); + Ok(()) } From 49a71df685e2a6671d96036863ea9dced86242c8 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 8 Jun 2022 15:29:30 +0400 Subject: [PATCH 6/6] remove Shutdown and modify read to return 0 when stream's reading half is shutdown --- examples/ping.rs | 1 + examples/pong.rs | 1 + src/stream/mod.rs | 19 ++++++------------- src/stream/stream_test.rs | 4 ++-- 4 files changed, 10 insertions(+), 15 deletions(-) diff --git a/examples/ping.rs b/examples/ping.rs index 13d8002..a7f78bd 100644 --- a/examples/ping.rs +++ b/examples/ping.rs @@ -5,6 +5,7 @@ use webrtc_sctp::Error; use bytes::Bytes; use clap::{App, AppSettings, Arg}; +use std::net::Shutdown; use std::sync::Arc; use tokio::net::UdpSocket; use tokio::signal; diff --git a/examples/pong.rs b/examples/pong.rs index 0ea3e8a..0d423fb 100644 --- a/examples/pong.rs +++ b/examples/pong.rs @@ -4,6 +4,7 @@ use webrtc_sctp::Error; use bytes::Bytes; use clap::{App, AppSettings, Arg}; +use std::net::Shutdown; use std::sync::Arc; use std::time::Duration; use tokio::net::UdpSocket; diff --git a/src/stream/mod.rs b/src/stream/mod.rs index e6801c3..9cb9641 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -12,6 +12,7 @@ use bytes::Bytes; use std::fmt; use std::future::Future; use std::io; +use std::net::Shutdown; use std::pin::Pin; use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU32, AtomicU8, AtomicUsize, Ordering}; use std::sync::Arc; @@ -19,14 +20,6 @@ use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::sync::{mpsc, Mutex, Notify}; -/// Possible values which can be passed to the [`Stream::shutdown`] method. -#[derive(PartialEq)] -pub enum Shutdown { - Read, - Write, - Both, -} - #[derive(Debug, Copy, Clone, PartialEq)] #[repr(C)] pub enum ReliabilityType { @@ -180,8 +173,8 @@ impl Stream { /// Reads a packet of len(p) bytes, dropping the Payload Protocol Identifier. /// - /// Returns EOF when the stream is reset or an error if `p` is too short or the reading half of - /// the stream is shutdown. + /// Returns EOF when the stream is reset or an error if `p` is too short. + /// Returns `0` if the reading half of this stream is shutdown. pub async fn read(&self, p: &mut [u8]) -> Result { let (n, _) = self.read_sctp(p).await?; Ok(n) @@ -189,12 +182,12 @@ impl Stream { /// Reads a packet of len(p) bytes and returns the associated Payload Protocol Identifier. /// - /// Returns EOF when the stream is reset or an error if `p` is too short or the reading half of - /// the stream is shutdown. + /// Returns EOF when the stream is reset or an error if `p` is too short. + /// Returns `(0, PayloadProtocolIdentifier::Unknown)` if the reading half of this stream is shutdown. pub async fn read_sctp(&self, p: &mut [u8]) -> Result<(usize, PayloadProtocolIdentifier)> { loop { if self.read_shutdown.load(Ordering::SeqCst) { - return Err(Error::ErrStreamClosed); + return Ok((0, PayloadProtocolIdentifier::Unknown)); } let result = { diff --git a/src/stream/stream_test.rs b/src/stream/stream_test.rs index e698c61..8976313 100644 --- a/src/stream/stream_test.rs +++ b/src/stream/stream_test.rs @@ -140,8 +140,8 @@ async fn test_stream() -> std::result::Result<(), io::Error> { // shutdown read s.shutdown(Shutdown::Read).await?; - // read must fail - assert!(s.read(&mut buf).await.is_err()); + // read must return 0 + assert_eq!(Ok(0), s.read(&mut buf).await); Ok(()) }