Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion spellcheck.dic
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
311
312
&
+
<
Expand Down Expand Up @@ -195,6 +195,7 @@ ntasks
NUMA
ok
oneshot
opcode
ORed
os
parker
Expand Down
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ tracing = { version = "0.1.29", default-features = false, features = ["std"], op
# Currently unstable. The API exposed by these features may be broken at any time.
# Requires `--cfg tokio_unstable` to enable.
[target.'cfg(all(tokio_unstable, target_os = "linux"))'.dependencies]
io-uring = { version = "0.7.6", default-features = false, optional = true }
io-uring = { version = "0.7.11", default-features = false, optional = true }
libc = { version = "0.2.168", optional = true }
mio = { version = "1.0.1", default-features = false, features = ["os-poll", "os-ext"], optional = true }
slab = { version = "0.4.9", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/fs/open_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ impl OpenOptions {
let handle = crate::runtime::Handle::current();
let driver_handle = handle.inner.driver().io();

if driver_handle.check_and_init()? {
if driver_handle.check_and_init(io_uring::opcode::OpenAt::CODE)? {
Op::open(path.as_ref(), opts)?.await
} else {
let opts = opts.clone().into();
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/fs/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub async fn read(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {

let handle = crate::runtime::Handle::current();
let driver_handle = handle.inner.driver().io();
if driver_handle.check_and_init()? {
if driver_handle.check_and_init(io_uring::opcode::Read::CODE)? {
return read_uring(&path).await;
}
}
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/fs/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub async fn write(path: impl AsRef<Path>, contents: impl AsRef<[u8]>) -> io::Re
{
let handle = crate::runtime::Handle::current();
let driver_handle = handle.inner.driver().io();
if driver_handle.check_and_init()? {
if driver_handle.check_and_init(io_uring::opcode::Write::CODE)? {
return write_uring(path, contents).await;
}
}
Expand Down
45 changes: 35 additions & 10 deletions tokio/src/runtime/io/driver/uring.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use io_uring::{squeue::Entry, IoUring};
use io_uring::{squeue::Entry, IoUring, Probe};
use mio::unix::SourceFd;
use slab::Slab;

Expand Down Expand Up @@ -38,6 +38,7 @@ impl State {

pub(crate) struct UringContext {
pub(crate) uring: Option<io_uring::IoUring>,
pub(crate) probe: io_uring::Probe,
pub(crate) ops: slab::Slab<Lifecycle>,
}

Expand All @@ -46,6 +47,7 @@ impl UringContext {
Self {
ops: Slab::new(),
uring: None,
probe: Probe::new(),
}
}

Expand All @@ -57,6 +59,10 @@ impl UringContext {
self.uring.as_mut().expect("io_uring not initialized")
}

pub(crate) fn is_opcode_supported(&self, opcode: u8) -> bool {
self.probe.is_supported(opcode)
}

/// Perform `io_uring_setup` system call, and Returns true if this
/// actually initialized the io_uring.
///
Expand All @@ -68,7 +74,18 @@ impl UringContext {
return Ok(false);
}

self.uring.replace(IoUring::new(DEFAULT_RING_SIZE)?);
let uring = IoUring::new(DEFAULT_RING_SIZE)?;

match uring.submitter().register_probe(&mut self.probe) {
Ok(_) => {}
Err(e) if e.raw_os_error() == Some(libc::EINVAL) => {
// The kernel does not support IORING_REGISTER_PROBE.
return Err(io::Error::from_raw_os_error(libc::ENOSYS));
}
Err(e) => return Err(e),
}

self.uring.replace(uring);

Ok(true)
}
Expand Down Expand Up @@ -182,12 +199,19 @@ impl Handle {
}

/// Check if the io_uring context is initialized. If not, it will try to initialize it.
pub(crate) fn check_and_init(&self) -> io::Result<bool> {
/// Then, check if the provided opcode is supported.
///
/// If both the context initialization succeeds and the opcode is supported,
/// this returns `Ok(true)`.
/// If either io_uring is unsupported or the opcode is unsupported,
/// this returns `Ok(false)`.
/// An error is returned if an io_uring syscall returns an unexpected error value.
pub(crate) fn check_and_init(&self, opcode: u8) -> io::Result<bool> {
match State::from_usize(self.uring_state.load(Ordering::Acquire)) {
State::Uninitialized => match self.try_init() {
Ok(()) => {
State::Uninitialized => match self.try_init_and_check_opcode(opcode) {
Ok(opcode_supported) => {
self.set_uring_state(State::Initialized);
Ok(true)
Ok(opcode_supported)
}
// If the system doesn't support io_uring, we set the state to Unsupported.
Err(e) if e.raw_os_error() == Some(libc::ENOSYS) => {
Expand All @@ -205,18 +229,19 @@ impl Handle {
Err(e) => Err(e),
},
State::Unsupported => Ok(false),
State::Initialized => Ok(true),
State::Initialized => Ok(self.get_uring().lock().is_opcode_supported(opcode)),
}
}

/// Initialize the io_uring context if it hasn't been initialized yet.
fn try_init(&self) -> io::Result<()> {
/// Then, check whether the given opcode is supported.
fn try_init_and_check_opcode(&self, opcode: u8) -> io::Result<bool> {
let mut guard = self.get_uring().lock();
if guard.try_init()? {
self.add_uring_source(guard.ring().as_raw_fd())?;
}

Ok(())
Ok(guard.is_opcode_supported(opcode))
}

/// Register an operation with the io_uring.
Expand All @@ -231,7 +256,7 @@ impl Handle {
/// be valid for the entire duration of the operation, otherwise it may cause memory problems.
pub(crate) unsafe fn register_op(&self, entry: Entry, waker: Waker) -> io::Result<usize> {
// Note: Maybe this check can be removed if upstream callers consistently use `check_and_init`.
if !self.check_and_init()? {
if !self.check_and_init(entry.get_opcode() as u8)? {
return Err(io::Error::from_raw_os_error(libc::ENOSYS));
}

Expand Down
Loading