-
-
Notifications
You must be signed in to change notification settings - Fork 3k
io: always cleanup AsyncFd registration list on deregister #7773
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+212
−2
Merged
Changes from 34 commits
Commits
Show all changes
36 commits
Select commit
Hold shift + click to select a range
931635d
io: always cleanup AsyncFd registration list on deregister
F4RAN 3d7d87d
fix: formatter issues
F4RAN b6452c7
test: in linux environment
F4RAN bf5c706
test: linux test with fix
F4RAN e659d60
chore: remove additional method
F4RAN b44e56d
chore: remove additional debug comments
F4RAN 94f46b9
fix:formatter
F4RAN 0a0f94e
fix: style: fix clippy warnings and format code
F4RAN c4141e3
Update tokio/src/runtime/io/registration_set.rs
F4RAN 125dc5d
Update tokio/src/runtime/io/driver.rs
F4RAN 4d30241
Merge branch 'master' into 7563-fix-asyncfd-leak
F4RAN 45d3da2
fix: remove additional imports
F4RAN ab770fa
restore AsyncFd::try_with_interest()
F4RAN e36a489
style: run formatter
F4RAN 0b0495f
test(internals): gate test-only APIs behind __internal_test (no publi…
F4RAN 08d5645
fix: spelling error is solved using backticks
F4RAN e3969ed
fix: rename __internal_test to integration_test
F4RAN 580f197
Merge branch 'master' into 7563-fix-asyncfd-leak
F4RAN 872097a
fix: turn from integration_test to tokio_unstable
F4RAN 0fbd65d
Merge branch 'master' into 7563-fix-asyncfd-leak
F4RAN a7895ac
test: revert to buggy code
F4RAN 87864bf
test: lsan only test
F4RAN 23d5e04
test: heap profiling
F4RAN 1c1cdd7
test: revert fix to check the test again
F4RAN 23344ce
fix: rss test is applied and works
F4RAN abd4d91
fix: resolve clippy format
F4RAN 3b8d349
test: improve RSS memory leak test with stabilization approach
F4RAN 3c18df5
Merge branch 'master' into 7563-fix-asyncfd-leak
F4RAN 3eba842
Update tokio/src/runtime/io/driver.rs
F4RAN ffe88ce
fix: additional line in Cargo file
F4RAN b8ef07c
test: add custom allocator memory leak test for issue #7563
F4RAN d84af1c
test: revert to check in linux machine
F4RAN 6fa3271
test: fix test
F4RAN fdf771e
fix: inline format args to satisfy clippy
F4RAN 9879ccc
test: address review nits for io_async_fd_memory_leak test
F4RAN 2977a5f
fix: allocation problem
F4RAN File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,188 @@ | ||
| //! Regression test for issue #7563 - Memory leak when fd closed before AsyncFd drop | ||
| //! | ||
| //! This test uses a custom global allocator to track actual memory usage, | ||
| //! avoiding false positives from RSS measurements which include freed-but-retained memory. | ||
|
|
||
| #![cfg(all(unix, target_os = "linux"))] | ||
|
|
||
| use std::alloc::{GlobalAlloc, Layout, System}; | ||
| use std::sync::atomic::{AtomicUsize, Ordering}; | ||
|
|
||
| /// A tracking allocator that counts bytes currently allocated | ||
| struct TrackingAllocator; | ||
|
|
||
| static ALLOCATED: AtomicUsize = AtomicUsize::new(0); | ||
martin-g marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| unsafe impl GlobalAlloc for TrackingAllocator { | ||
| unsafe fn alloc(&self, layout: Layout) -> *mut u8 { | ||
| let ptr = unsafe { System.alloc(layout) }; | ||
| if !ptr.is_null() { | ||
| ALLOCATED.fetch_add(layout.size(), Ordering::Relaxed); | ||
| } | ||
| ptr | ||
| } | ||
|
|
||
| unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { | ||
| unsafe { System.dealloc(ptr, layout) }; | ||
| ALLOCATED.fetch_sub(layout.size(), Ordering::Relaxed); | ||
| } | ||
|
|
||
| unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 { | ||
| let new_ptr = unsafe { System.realloc(ptr, layout, new_size) }; | ||
| if !new_ptr.is_null() { | ||
| // Subtract old size, add new size | ||
| if new_size > layout.size() { | ||
| ALLOCATED.fetch_add(new_size - layout.size(), Ordering::Relaxed); | ||
| } else { | ||
| ALLOCATED.fetch_sub(layout.size() - new_size, Ordering::Relaxed); | ||
| } | ||
| } | ||
| new_ptr | ||
| } | ||
| } | ||
|
|
||
| #[global_allocator] | ||
| static GLOBAL: TrackingAllocator = TrackingAllocator; | ||
|
|
||
| fn allocated_bytes() -> usize { | ||
| ALLOCATED.load(Ordering::Relaxed) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn memory_leak_when_fd_closed_before_drop() { | ||
| use nix::sys::socket::{self, AddressFamily, SockFlag, SockType}; | ||
| use std::os::unix::io::{AsRawFd, RawFd}; | ||
| use std::sync::Arc; | ||
| use tokio::io::unix::AsyncFd; | ||
|
|
||
| struct RawFdWrapper { | ||
| fd: RawFd, | ||
| } | ||
|
|
||
| impl AsRawFd for RawFdWrapper { | ||
| fn as_raw_fd(&self) -> RawFd { | ||
| self.fd | ||
| } | ||
| } | ||
|
|
||
| struct ArcFd(Arc<RawFdWrapper>); | ||
|
|
||
| impl AsRawFd for ArcFd { | ||
| fn as_raw_fd(&self) -> RawFd { | ||
| self.0.as_raw_fd() | ||
| } | ||
| } | ||
|
|
||
| fn set_nonblocking(fd: RawFd) { | ||
| unsafe { | ||
| let flags = libc::fcntl(fd, libc::F_GETFL); | ||
martin-g marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK); | ||
| } | ||
| } | ||
|
|
||
| // Warm up - let runtime and allocator stabilize | ||
| for _ in 0..100 { | ||
| tokio::task::yield_now().await; | ||
| } | ||
|
|
||
| const ITERATIONS: usize = 1000; | ||
|
|
||
| // Phase 1: Warm up allocations | ||
| for _ in 0..ITERATIONS { | ||
| let (fd_a, _fd_b) = socket::socketpair( | ||
| AddressFamily::Unix, | ||
| SockType::Stream, | ||
| None, | ||
| SockFlag::empty(), | ||
| ) | ||
| .unwrap(); | ||
|
|
||
| let raw_fd = fd_a.as_raw_fd(); | ||
| set_nonblocking(raw_fd); | ||
| std::mem::forget(fd_a); | ||
|
|
||
| let wrapper = Arc::new(RawFdWrapper { fd: raw_fd }); | ||
| let async_fd = AsyncFd::new(ArcFd(wrapper)).unwrap(); | ||
|
|
||
| // Close fd before dropping AsyncFd - this triggers the bug | ||
| unsafe { | ||
| libc::close(raw_fd); | ||
| } | ||
|
|
||
| drop(async_fd); | ||
| } | ||
|
|
||
| // Let things settle | ||
| tokio::task::yield_now().await; | ||
| let baseline = allocated_bytes(); | ||
|
|
||
| // Phase 2: Run more iterations and check for growth | ||
| for _ in 0..ITERATIONS { | ||
| let (fd_a, _fd_b) = socket::socketpair( | ||
| AddressFamily::Unix, | ||
| SockType::Stream, | ||
| None, | ||
| SockFlag::empty(), | ||
| ) | ||
| .unwrap(); | ||
|
|
||
| let raw_fd = fd_a.as_raw_fd(); | ||
| set_nonblocking(raw_fd); | ||
| std::mem::forget(fd_a); | ||
|
|
||
| let wrapper = Arc::new(RawFdWrapper { fd: raw_fd }); | ||
| let async_fd = AsyncFd::new(ArcFd(wrapper)).unwrap(); | ||
|
|
||
| unsafe { | ||
| libc::close(raw_fd); | ||
| } | ||
|
|
||
| drop(async_fd); | ||
| } | ||
|
|
||
| tokio::task::yield_now().await; | ||
| let after_phase2 = allocated_bytes(); | ||
|
|
||
| // Phase 3: Run even more iterations | ||
| for _ in 0..ITERATIONS { | ||
| let (fd_a, _fd_b) = socket::socketpair( | ||
| AddressFamily::Unix, | ||
| SockType::Stream, | ||
| None, | ||
| SockFlag::empty(), | ||
| ) | ||
| .unwrap(); | ||
|
|
||
| let raw_fd = fd_a.as_raw_fd(); | ||
| set_nonblocking(raw_fd); | ||
| std::mem::forget(fd_a); | ||
|
|
||
| let wrapper = Arc::new(RawFdWrapper { fd: raw_fd }); | ||
| let async_fd = AsyncFd::new(ArcFd(wrapper)).unwrap(); | ||
|
|
||
| unsafe { | ||
| libc::close(raw_fd); | ||
| } | ||
|
|
||
| drop(async_fd); | ||
| } | ||
|
|
||
| tokio::task::yield_now().await; | ||
| let after_phase3 = allocated_bytes(); | ||
|
|
||
| let growth_phase2 = after_phase2.saturating_sub(baseline); | ||
| let growth_phase3 = after_phase3.saturating_sub(after_phase2); | ||
|
|
||
| // If there's a leak, each phase adds ~250KB (1000 * ~256 bytes per ScheduledIo) | ||
| // If fixed, memory should stabilize (minimal growth between phases) | ||
| // Allow 64KB tolerance for normal allocation variance | ||
| let threshold = 64 * 1024; // 64KB | ||
|
|
||
| assert!( | ||
| growth_phase2 < threshold || growth_phase3 < threshold, | ||
| "Memory leak detected: allocations keep growing without stabilizing. \ | ||
| Phase 1->2: +{growth_phase2} bytes, Phase 2->3: +{growth_phase3} bytes. \ | ||
| (baseline: {baseline} bytes, phase2: {after_phase2} bytes, phase3: {after_phase3} bytes). \ | ||
| Expected at least one phase with <{threshold} bytes growth.", | ||
| ); | ||
| } | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.