Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
931635d
io: always cleanup AsyncFd registration list on deregister
F4RAN Dec 12, 2025
3d7d87d
fix: formatter issues
F4RAN Dec 12, 2025
b6452c7
test: in linux environment
F4RAN Dec 13, 2025
bf5c706
test: linux test with fix
F4RAN Dec 13, 2025
e659d60
chore: remove additional method
F4RAN Dec 13, 2025
b44e56d
chore: remove additional debug comments
F4RAN Dec 13, 2025
94f46b9
fix:formatter
F4RAN Dec 13, 2025
0a0f94e
fix: style: fix clippy warnings and format code
F4RAN Dec 13, 2025
c4141e3
Update tokio/src/runtime/io/registration_set.rs
F4RAN Dec 15, 2025
125dc5d
Update tokio/src/runtime/io/driver.rs
F4RAN Dec 15, 2025
4d30241
Merge branch 'master' into 7563-fix-asyncfd-leak
F4RAN Dec 15, 2025
45d3da2
fix: remove additional imports
F4RAN Dec 15, 2025
ab770fa
restore AsyncFd::try_with_interest()
F4RAN Dec 15, 2025
e36a489
style: run formatter
F4RAN Dec 15, 2025
0b0495f
test(internals): gate test-only APIs behind __internal_test (no publi…
F4RAN Dec 16, 2025
08d5645
fix: spelling error is solved using backticks
F4RAN Dec 16, 2025
e3969ed
fix: rename __internal_test to integration_test
F4RAN Dec 16, 2025
580f197
Merge branch 'master' into 7563-fix-asyncfd-leak
F4RAN Dec 16, 2025
872097a
fix: turn from integration_test to tokio_unstable
F4RAN Dec 29, 2025
0fbd65d
Merge branch 'master' into 7563-fix-asyncfd-leak
F4RAN Dec 29, 2025
a7895ac
test: revert to buggy code
F4RAN Jan 1, 2026
87864bf
test: lsan only test
F4RAN Jan 1, 2026
23d5e04
test: heap profiling
F4RAN Jan 1, 2026
1c1cdd7
test: revert fix to check the test again
F4RAN Jan 1, 2026
23344ce
fix: rss test is applied and works
F4RAN Jan 1, 2026
abd4d91
fix: resolve clippy format
F4RAN Jan 1, 2026
3b8d349
test: improve RSS memory leak test with stabilization approach
F4RAN Jan 1, 2026
3c18df5
Merge branch 'master' into 7563-fix-asyncfd-leak
F4RAN Jan 1, 2026
3eba842
Update tokio/src/runtime/io/driver.rs
F4RAN Jan 2, 2026
ffe88ce
fix: additional line in Cargo file
F4RAN Jan 2, 2026
b8ef07c
test: add custom allocator memory leak test for issue #7563
F4RAN Jan 5, 2026
d84af1c
test: revert to check in linux machine
F4RAN Jan 5, 2026
6fa3271
test: fix test
F4RAN Jan 5, 2026
fdf771e
fix: inline format args to satisfy clippy
F4RAN Jan 5, 2026
9879ccc
test: address review nits for io_async_fd_memory_leak test
F4RAN Jan 5, 2026
2977a5f
fix: allocation problem
F4RAN Jan 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ signal = [
sync = []
test-util = ["rt", "sync", "time"]
time = []

# Internal test-only feature. Not part of public API. Subject to change without notice.
# Used for internal testing infrastructure.
__internal_test = []

# Unstable feature. Requires `--cfg tokio_unstable` to enable.
io-uring = ["dep:io-uring", "libc", "mio/os-poll", "mio/os-ext", "dep:slab"]
# Unstable feature. Requires `--cfg tokio_unstable` to enable.
Expand Down
18 changes: 18 additions & 0 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,24 @@ impl Handle {
}
}

cfg_io_driver! {
/// Internal test methods for PR #7773
/// These methods are only available with the `__internal_test` feature flag
/// and are not part of the public API. They may change without notice.
#[cfg(feature = "__internal_test")]
impl Handle {
/// Returns the number of pending I/O registrations (internal test-only method)
pub fn io_pending_registration_count(&self) -> usize {
self.inner.driver().io().pending_registration_count()
}

/// Returns the total number of I/O registrations (internal test-only method)
pub fn io_total_registration_count(&self) -> usize {
self.inner.driver().io().total_registration_count()
}
}
}

impl std::panic::UnwindSafe for Handle {}

impl std::panic::RefUnwindSafe for Handle {}
Expand Down
30 changes: 29 additions & 1 deletion tokio/src/runtime/io/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ impl Handle {
source: &mut impl Source,
) -> io::Result<()> {
// Deregister the source with the OS poller **first**
self.registry.deregister(source)?;
// Cleanup ALWAYS happens
let os_result = self.registry.deregister(source);

if self
.registrations
Expand All @@ -307,6 +308,8 @@ impl Handle {

self.metrics.dec_fd_count();

os_result?; // Return error after cleanup

Ok(())
}

Expand All @@ -317,6 +320,31 @@ impl Handle {
}
}

/// Internal test methods for PR #7773
/// These methods are only available with the `__internal_test` feature flag
/// and are not part of the public API. They may change without notice.
#[cfg(feature = "__internal_test")]
impl Handle {
/// Returns the number of pending registrations (internal test-only method)
///
/// # Warning
/// This method is NOT part of the public API and is subject to change
/// without notice. It should only be used for internal testing.
pub(crate) fn pending_registration_count(&self) -> usize {
self.registrations.pending_release_count()
}

/// Returns the total number of registrations in the main list (internal test-only method)
///
/// # Warning
/// This method is NOT part of the public API and is subject to change
/// without notice. It should only be used for internal testing.
pub(crate) fn total_registration_count(&self) -> usize {
self.registrations
.total_registration_count(&mut self.synced.lock())
}
}

impl fmt::Debug for Handle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Handle")
Expand Down
30 changes: 30 additions & 0 deletions tokio/src/runtime/io/registration_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,36 @@ impl RegistrationSet {
self.num_pending_release.load(Acquire) != 0
}

/// Internal test method for PR #7773
/// Returns the number of pending registrations (internal test-only method)
#[cfg(feature = "__internal_test")]
pub(super) fn pending_release_count(&self) -> usize {
self.num_pending_release.load(Acquire)
}

/// Internal test method for PR #7773
/// Returns the total number of registrations in the main list (internal test-only method)
#[cfg(feature = "__internal_test")]
pub(super) fn total_registration_count(&self, synced: &mut Synced) -> usize {
// Count by temporarily draining the list, then restoring it
// This is safe for test purposes
let mut items = Vec::new();

// Drain all items
while let Some(item) = synced.registrations.pop_back() {
items.push(item);
}

let count = items.len();

// Restore items in reverse order (since we popped from back)
for item in items.into_iter() {
synced.registrations.push_front(item);
}

count
}

pub(super) fn allocate(&self, synced: &mut Synced) -> io::Result<Arc<ScheduledIo>> {
if synced.is_shutdown {
return Err(io::Error::new(
Expand Down
82 changes: 82 additions & 0 deletions tokio/tests/io_async_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -956,3 +956,85 @@ async fn try_with_interest() {

assert!(Arc::ptr_eq(&original, &returned));
}

/// Regression test for issue #7563
///
/// Reproduces the bug where closing fd before dropping AsyncFd causes
/// OS deregister to fail, preventing cleanup and leaking ScheduledIo objects.
///
/// Note: This test requires the `__internal_test` feature flag to access
/// internal registration counting methods. Run with:
/// ```sh
/// cargo test --test io_async_fd --features __internal_test memory_leak_when_fd_closed_before_drop
/// ```
///
/// The test verifies that registration counts don't grow over iterations,
/// which would indicate a memory leak. On Linux with epoll, the leak is
/// detectable; on macOS, the test still exercises the code path.
#[tokio::test]
#[cfg(feature = "__internal_test")]
async fn memory_leak_when_fd_closed_before_drop() {
use tokio::runtime::Handle;

use nix::sys::socket::{self, AddressFamily, SockFlag, SockType};

struct RawFdWrapper {
fd: RawFd,
}

impl AsRawFd for RawFdWrapper {
fn as_raw_fd(&self) -> RawFd {
self.fd
}
}

let rt_handle = Handle::current();
tokio::task::yield_now().await;
let initial_count = rt_handle.io_total_registration_count();

const ITERATIONS: usize = 30;
let mut max_count_seen = initial_count;

for _ in 0..ITERATIONS {
let (fd_a, _fd_b) = socket::socketpair(
AddressFamily::Unix,
SockType::Stream,
None,
SockFlag::empty(),
)
.expect("socketpair");
let raw_fd = fd_a.as_raw_fd();
set_nonblocking(raw_fd);
std::mem::forget(fd_a);

let afd = Arc::new(RawFdWrapper { fd: raw_fd });
let async_fd = AsyncFd::new(ArcFd(afd.clone())).unwrap();

unsafe {
libc::close(raw_fd);
}

drop(async_fd);
tokio::task::yield_now().await;

let current_count = rt_handle.io_total_registration_count();
max_count_seen = max_count_seen.max(current_count);
}

tokio::task::yield_now().await;
tokio::time::sleep(Duration::from_millis(100)).await;

let final_count = rt_handle.io_total_registration_count();
max_count_seen = max_count_seen.max(final_count);

assert!(
final_count <= initial_count + 2 && max_count_seen <= initial_count + 2,
"Memory leak detected: final count {} (initial: {}), max seen: {}. \
With bug, count would be ~{} ({} leaked objects).",
final_count,
initial_count,
max_count_seen,
initial_count + ITERATIONS,
max_count_seen.saturating_sub(initial_count)
);
}