18 changes: 18 additions & 0 deletions tokio/src/runtime/handle.rs
@@ -488,6 +488,24 @@ impl Handle {
}
}

/// TEST PURPOSE RELATED TO PR #7773
#[cfg(feature = "full")]
impl Handle {
/// Returns the number of pending registrations (test-only, not part of public API)
#[doc(hidden)]
#[allow(unreachable_pub)]
pub fn io_pending_registration_count(&self) -> usize {
self.inner.driver().io().pending_registration_count()
}

/// Returns the total number of registrations in the main list (test-only, not part of public API)
#[doc(hidden)]
#[allow(unreachable_pub)]
pub fn io_total_registration_count(&self) -> usize {
self.inner.driver().io().total_registration_count()
}
}

impl std::panic::UnwindSafe for Handle {}

impl std::panic::RefUnwindSafe for Handle {}
23 changes: 22 additions & 1 deletion tokio/src/runtime/io/driver.rs
@@ -296,7 +296,8 @@ impl Handle {
source: &mut impl Source,
) -> io::Result<()> {
// Deregister the source with the OS poller **first**
-self.registry.deregister(source)?;
+// Cleanup ALWAYS happens
+let os_result = self.registry.deregister(source);

if self
.registrations
@@ -307,6 +308,8 @@

self.metrics.dec_fd_count();

os_result?; // Return error after cleanup

Ok(())
}

@@ -317,6 +320,24 @@
}
}

/// TEST PURPOSE RELATED TO PR #7773
#[cfg(feature = "full")]
impl Handle {
/// Returns the number of pending registrations (test-only, not part of public API)
#[doc(hidden)]
#[allow(unreachable_pub)]
pub fn pending_registration_count(&self) -> usize {
self.registrations.pending_release_count()
}
/// Returns the total number of registrations in the main list (test-only)
#[doc(hidden)]
#[allow(unreachable_pub)]
pub fn total_registration_count(&self) -> usize {
self.registrations
.total_registration_count(&mut self.synced.lock())
}
}

impl fmt::Debug for Handle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Handle")
27 changes: 27 additions & 0 deletions tokio/src/runtime/io/registration_set.rs
@@ -53,6 +53,33 @@ impl RegistrationSet {
self.num_pending_release.load(Acquire) != 0
}

/// TEST PURPOSE RELATED TO PR #7773
#[cfg(feature = "full")]
pub(super) fn pending_release_count(&self) -> usize {
self.num_pending_release.load(Acquire)
}
/// TEST PURPOSE RELATED TO PR #7773
#[cfg(feature = "full")]
pub(super) fn total_registration_count(&self, synced: &mut Synced) -> usize {
// Count by temporarily draining the list, then restoring it
// This is safe for test purposes
let mut items = Vec::new();

// Drain all items
while let Some(item) = synced.registrations.pop_back() {
items.push(item);
}

let count = items.len();

// Restore items in reverse order (since we popped from back)
for item in items.into_iter().rev() {
Review comment (Augment):

This drain/restore logic appears to reverse synced.registrations (collecting via pop_back, then iterating rev() and push_front). Since this mutates driver state even in a test-only helper, it could subtly change behavior after calling the counter.

Owner Author reply:

value:useful; category:bug; feedback:The Augment AI reviewer is correct! The reverse iteration while re-adding the items, combined with push_front(), reverses the final order of the items relative to the order before counting.

synced.registrations.push_front(item);
}
Review comment (Bugbot):

Bug: List restoration reverses registration order

The total_registration_count function corrupts the registrations list by reversing its order. Items are popped from the back (yielding [C, B, A]), then the iterator is reversed (giving [A, B, C]), and each is pushed to the front, resulting in [C, B, A] instead of the original [A, B, C]. Either push_front without .rev() or push_back with .rev() would preserve the correct order. Though marked test-only, this still corrupts runtime state during tests.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback:The Bugbot AI reviewer is correct! The reverse iteration while re-adding the items combined with push_front() reverses the final order of the items with the one before counting them.


count
}
Review comment (CodeRabbit) on lines +61 to +81:
⚠️ Potential issue | 🟡 Minor

List order is reversed after counting.

The drain-and-restore logic reverses the list order. After popping from back into a Vec and then iterating in reverse while pushing to front, the final order is the reverse of the original.

While this may not affect test correctness (since you're only counting), the comment "Restore items in reverse order" is misleading—it describes the iteration order, not the restoration goal.

Consider using a simple traversal if the LinkedList supports iteration, or fix the restoration logic:

-        // Restore items in reverse order (since we popped from back)
-        for item in items.into_iter().rev() {
-            synced.registrations.push_front(item);
+        // Restore items in original order
+        for item in items.into_iter() {
+            synced.registrations.push_back(item);
         }

Alternatively, if order truly doesn't matter for tests, update the comment to clarify that order is not preserved.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In tokio/src/runtime/io/registration_set.rs around lines 61–81, the current
drain-and-restore approach reverses the LinkedList order; replace the
destructive pop/restore logic with a non-mutating count (e.g., use
synced.registrations.len() or synced.registrations.iter().count()) so the list
order is preserved, and remove or update the misleading "Restore items in
reverse order" comment accordingly.

Owner Author reply:

value:useful; category:bug; feedback:The CodeRabbit AI reviewer is correct! The reverse iteration while re-adding the items, combined with push_front(), reverses the final order of the items relative to the order before counting.


pub(super) fn allocate(&self, synced: &mut Synced) -> io::Result<Arc<ScheduledIo>> {
if synced.is_shutdown {
return Err(io::Error::new(
74 changes: 69 additions & 5 deletions tokio/tests/io_async_fd.rs
@@ -947,12 +947,76 @@ async fn try_new() {
assert!(Arc::ptr_eq(&original, &returned));
}

/// Regression test for issue #7563
///
/// Reproduces the bug where closing fd before dropping AsyncFd causes
/// OS deregister to fail, preventing cleanup and leaking ScheduledIo objects.
 #[tokio::test]
-async fn try_with_interest() {
-    let original = Arc::new(InvalidSource);
+async fn memory_leak_when_fd_closed_before_drop() {
+    use std::os::unix::io::{AsRawFd, RawFd};
+    use std::sync::Arc;
+    use tokio::io::unix::AsyncFd;
+    use tokio::runtime::Handle;
+
-    let error = AsyncFd::try_with_interest(original.clone(), Interest::READABLE).unwrap_err();
-    let (returned, _cause) = error.into_parts();
+    use nix::sys::socket::{self, AddressFamily, SockFlag, SockType};
+
-    assert!(Arc::ptr_eq(&original, &returned));
struct RawFdWrapper {
fd: RawFd,
}

impl AsRawFd for RawFdWrapper {
fn as_raw_fd(&self) -> RawFd {
self.fd
}
}

let rt_handle = Handle::current();
tokio::task::yield_now().await;
let initial_count = rt_handle.io_total_registration_count();

const ITERATIONS: usize = 30;
let mut max_count_seen = initial_count;

for _ in 0..ITERATIONS {
let (fd_a, _fd_b) = socket::socketpair(
AddressFamily::Unix,
SockType::Stream,
None,
SockFlag::empty(),
)
.expect("socketpair");
let raw_fd = fd_a.as_raw_fd();
set_nonblocking(raw_fd);
std::mem::forget(fd_a);

let afd = Arc::new(RawFdWrapper { fd: raw_fd });
let async_fd = AsyncFd::new(ArcFd(afd.clone())).unwrap();

unsafe {
libc::close(raw_fd);
Review comment (Augment):

The return value of libc::close is ignored here; if close fails (or is interrupted), the fd may remain open and the test may not reliably exercise the intended deregister-failure path.

Owner Author reply:

value:good-to-have; category:bug; feedback:The Augment AI reviewer is correct! The system call result should be checked before continuing with the rest of the test, because it currently assumes the close always succeeds.

}

drop(async_fd);
tokio::task::yield_now().await;

let current_count = rt_handle.io_total_registration_count();
max_count_seen = max_count_seen.max(current_count);
}

tokio::task::yield_now().await;
tokio::time::sleep(Duration::from_millis(100)).await;

let final_count = rt_handle.io_total_registration_count();
max_count_seen = max_count_seen.max(final_count);

assert!(
final_count <= initial_count + 2 && max_count_seen <= initial_count + 2,
"Memory leak detected: final count {} (initial: {}), max seen: {}. \
With bug, count would be ~{} ({} leaked objects).",
final_count,
initial_count,
max_count_seen,
initial_count + ITERATIONS,
max_count_seen.saturating_sub(initial_count)
);
}
Review comment (Bugbot):

Bug: Test for try_with_interest accidentally deleted

The try_with_interest test was completely replaced by the new memory_leak_when_fd_closed_before_drop test rather than adding the new test alongside it. The deleted test verified important API behavior: that when try_with_interest fails, the error correctly returns the original Arc source via into_parts(), allowing callers to recover their resource. This appears to be accidental since the new test serves a completely different purpose (testing memory leak fixes in tokio-rs#7563).

Owner Author reply:

value:useful; category:bug; feedback:The Bugbot AI reviewer is correct! The old test's purpose was unrelated to the functionality covered by the new test, so it should be restored. That prevents introducing regressions related to AsyncFd::try_with_interest().