Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 106 additions & 4 deletions src/transport/webrtc/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ impl Substream {
shutdown_waker: Arc::clone(&shutdown_waker),
write_waker: Arc::clone(&write_waker),
read_closed: std::sync::atomic::AtomicBool::new(false),
reset_sent: false,
};

(
Expand Down Expand Up @@ -157,6 +158,9 @@ pub struct SubstreamHandle {
/// Whether we've already sent RecvClosed to the inbound channel.
/// Prevents duplicate RecvClosed events if multiple FIN messages are received.
read_closed: std::sync::atomic::AtomicBool,

/// Whether RESET_STREAM has been sent on abrupt close.
reset_sent: bool,
}

impl SubstreamHandle {
Expand Down Expand Up @@ -272,8 +276,28 @@ impl Stream for SubstreamHandle {
// Check if Substream has been dropped (inbound channel closed)
// When Substream is dropped, there will be no more outbound messages
// Since we've already tried to recv above and got Pending, we know the queue is empty
// Therefore, it's safe to signal closure
if self.inbound_tx.is_closed() {
let state = *self.state.lock();

// If shutdown completed gracefully (FinAcked), just close
if matches!(state, State::FinAcked) {
return Poll::Ready(None);
}

// Abrupt close - send RESET_STREAM to notify remote peer
// This follows the libp2p WebRTC spec for non-graceful stream termination
if !self.reset_sent {
self.reset_sent = true;
tracing::debug!(
target: "litep2p::webrtc::substream",
"Substream dropped without graceful close, sending RESET_STREAM"
);
return Poll::Ready(Some(Event::Message {
payload: vec![],
flag: Some(Flag::ResetStream),
}));
}

return Poll::Ready(None);
}

Expand Down Expand Up @@ -1380,7 +1404,7 @@ mod tests {
}
other => panic!("Unexpected result: {:?}", other),
}
// Substream dropped here (server closes after receiving FIN)
// Substream dropped here without calling shutdown() - this is an abrupt close
});

// Remote (client) sends FIN
Expand All @@ -1404,11 +1428,89 @@ mod tests {
// Wait for server to close substream
server_task.await.unwrap();

// Verify handle signals closure (returns None) - this is the key fix!
// Since server didn't call shutdown(), this is an abrupt close - RESET_STREAM is sent
assert_eq!(
handle.next().await,
Some(Event::Message {
payload: vec![],
flag: Some(Flag::ResetStream)
}),
"SubstreamHandle should send RESET_STREAM when server drops without shutdown"
);

// Then closure
assert_eq!(
handle.next().await,
None,
"SubstreamHandle should signal closure after sending RESET_STREAM"
);
}

#[tokio::test]
async fn abrupt_close_sends_reset_stream() {
use futures::StreamExt;

let (substream, mut handle) = Substream::new();

// Drop substream without calling shutdown() - this is an abrupt close
drop(substream);

// Verify RESET_STREAM is sent before closure
assert_eq!(
handle.next().await,
Some(Event::Message {
payload: vec![],
flag: Some(Flag::ResetStream)
}),
"SubstreamHandle should send RESET_STREAM on abrupt close"
);

// Then verify handle signals closure
assert_eq!(
handle.next().await,
None,
"SubstreamHandle should signal closure after sending RESET_STREAM"
);
}

#[tokio::test]
async fn graceful_close_does_not_send_reset_stream() {
use futures::StreamExt;

let (mut substream, mut handle) = Substream::new();

// Complete graceful shutdown
let shutdown_task = tokio::spawn(async move {
substream.shutdown().await.unwrap();
// Substream dropped after graceful shutdown
});

// Receive FIN
assert_eq!(
handle.next().await,
Some(Event::Message {
payload: vec![],
flag: Some(Flag::Fin)
})
);

// Send FIN_ACK to complete handshake
handle
.on_message(WebRtcMessage {
payload: None,
flag: Some(Flag::FinAck),
})
.await
.unwrap();

// Wait for shutdown to complete
shutdown_task.await.unwrap();

// Verify handle signals closure directly (no RESET_STREAM)
assert_eq!(
handle.next().await,
None,
"SubstreamHandle should signal closure after server receives FIN and drops Substream"
"SubstreamHandle should NOT send RESET_STREAM after graceful close"
);
}

Expand Down