diff --git a/src/transport/webrtc/substream.rs b/src/transport/webrtc/substream.rs index cf35a178..5dcf5456 100644 --- a/src/transport/webrtc/substream.rs +++ b/src/transport/webrtc/substream.rs @@ -118,6 +118,7 @@ impl Substream { shutdown_waker: Arc::clone(&shutdown_waker), write_waker: Arc::clone(&write_waker), read_closed: std::sync::atomic::AtomicBool::new(false), + reset_sent: false, }; ( @@ -157,6 +158,9 @@ pub struct SubstreamHandle { /// Whether we've already sent RecvClosed to the inbound channel. /// Prevents duplicate RecvClosed events if multiple FIN messages are received. read_closed: std::sync::atomic::AtomicBool, + + /// Whether RESET_STREAM has been sent on abrupt close. + reset_sent: bool, } impl SubstreamHandle { @@ -272,8 +276,28 @@ impl Stream for SubstreamHandle { // Check if Substream has been dropped (inbound channel closed) // When Substream is dropped, there will be no more outbound messages // Since we've already tried to recv above and got Pending, we know the queue is empty - // Therefore, it's safe to signal closure if self.inbound_tx.is_closed() { + let state = *self.state.lock(); + + // If shutdown completed gracefully (FinAcked), just close + if matches!(state, State::FinAcked) { + return Poll::Ready(None); + } + + // Abrupt close - send RESET_STREAM to notify remote peer + // This follows the libp2p WebRTC spec for non-graceful stream termination + if !self.reset_sent { + self.reset_sent = true; + tracing::debug!( + target: "litep2p::webrtc::substream", + "Substream dropped without graceful close, sending RESET_STREAM" + ); + return Poll::Ready(Some(Event::Message { + payload: vec![], + flag: Some(Flag::ResetStream), + })); + } + return Poll::Ready(None); } @@ -1380,7 +1404,7 @@ mod tests { } other => panic!("Unexpected result: {:?}", other), } - // Substream dropped here (server closes after receiving FIN) + // Substream dropped here without calling shutdown() - this is an abrupt close }); // Remote (client) sends FIN @@ -1404,11 +1428,89 @@ mod tests { // Wait for server to close substream server_task.await.unwrap(); - // Verify handle signals closure (returns None) - this is the key fix! + // Since server didn't call shutdown(), this is an abrupt close - RESET_STREAM is sent + assert_eq!( + handle.next().await, + Some(Event::Message { + payload: vec![], + flag: Some(Flag::ResetStream) + }), + "SubstreamHandle should send RESET_STREAM when server drops without shutdown" + ); + + // Then closure + assert_eq!( + handle.next().await, + None, + "SubstreamHandle should signal closure after sending RESET_STREAM" + ); + } + + #[tokio::test] + async fn abrupt_close_sends_reset_stream() { + use futures::StreamExt; + + let (substream, mut handle) = Substream::new(); + + // Drop substream without calling shutdown() - this is an abrupt close + drop(substream); + + // Verify RESET_STREAM is sent before closure + assert_eq!( + handle.next().await, + Some(Event::Message { + payload: vec![], + flag: Some(Flag::ResetStream) + }), + "SubstreamHandle should send RESET_STREAM on abrupt close" + ); + + // Then verify handle signals closure + assert_eq!( + handle.next().await, + None, + "SubstreamHandle should signal closure after sending RESET_STREAM" + ); + } + + #[tokio::test] + async fn graceful_close_does_not_send_reset_stream() { + use futures::StreamExt; + + let (mut substream, mut handle) = Substream::new(); + + // Complete graceful shutdown + let shutdown_task = tokio::spawn(async move { + substream.shutdown().await.unwrap(); + // Substream dropped after graceful shutdown + }); + + // Receive FIN + assert_eq!( + handle.next().await, + Some(Event::Message { + payload: vec![], + flag: Some(Flag::Fin) + }) + ); + + // Send FIN_ACK to complete handshake + handle + .on_message(WebRtcMessage { + payload: None, + flag: Some(Flag::FinAck), + }) + .await + .unwrap(); + + // Wait for shutdown to complete + shutdown_task.await.unwrap(); + + // Verify handle signals closure directly (no RESET_STREAM) assert_eq!( handle.next().await, None, - "SubstreamHandle should signal closure after server receives FIN and drops Substream" + "SubstreamHandle should NOT send RESET_STREAM after graceful close" ); }