Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions src/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ struct Ops {
// When dropping the driver, all in-flight operations must have completed. This
// type wraps the slab and ensures that, on drop, the slab is empty.
lifecycle: Slab<op::Lifecycle>,

/// Received but unserviced Op completions
completions: Slab<op::Completion>,
}

impl Driver {
Expand Down Expand Up @@ -133,11 +136,15 @@ impl Ops {
fn new() -> Ops {
Ops {
lifecycle: Slab::with_capacity(64),
completions: Slab::with_capacity(64),
}
}

fn get_mut(&mut self, index: usize) -> Option<&mut op::Lifecycle> {
self.lifecycle.get_mut(index)
fn get_mut(&mut self, index: usize) -> Option<(&mut op::Lifecycle, &mut Slab<op::Completion>)> {
let completions = &mut self.completions;
self.lifecycle
.get_mut(index)
.map(|lifecycle| (lifecycle, completions))
}

// Insert a new operation
Expand All @@ -151,7 +158,8 @@ impl Ops {
}

fn complete(&mut self, index: usize, cqe: op::CqeResult) {
if self.lifecycle[index].complete(cqe) {
let completions = &mut self.completions;
if self.lifecycle[index].complete(completions, cqe) {
self.lifecycle.remove(index);
}
}
Expand All @@ -160,5 +168,6 @@ impl Ops {
impl Drop for Ops {
fn drop(&mut self) {
assert!(self.lifecycle.is_empty());
assert!(self.completions.is_empty());
}
}
125 changes: 105 additions & 20 deletions src/driver/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,40 @@ use std::task::{Context, Poll, Waker};

use io_uring::{cqueue, squeue};

mod slab_list;

use slab::Slab;
use slab_list::{SlabListEntry, SlabListIndices};

use crate::driver;
use crate::runtime::CONTEXT;
use crate::util::PhantomUnsendUnsync;

/// A SlabList is used to hold unserved completions.
///
/// This is relevant to multi-completion Operations,
/// which require an unknown number of CQE events to be
/// captured before completion.
pub(crate) type Completion = SlabListEntry<CqeResult>;

/// In-flight operation
pub(crate) struct Op<T: 'static> {
pub(crate) struct Op<T: 'static, CqeType = SingleCQE> {
// Operation index in the slab
pub(super) index: usize,

// Per-operation data
data: Option<T>,

// CqeType marker
_cqe_type: PhantomData<CqeType>,

// Make !Send + !Sync
_phantom: PhantomUnsendUnsync,
}

/// A Marker for Ops which expect only a single completion event
pub(crate) struct SingleCQE;

pub(crate) trait Completable {
type Output;
fn complete(self, cqe: CqeResult) -> Self::Output;
Expand All @@ -39,12 +58,15 @@ pub(crate) enum Lifecycle {

/// The operation has completed with a single cqe result
Completed(CqeResult),

/// One or more completion results have been recieved
/// This holds the indices uniquely identifying the list within the slab
CompletionList(SlabListIndices),
}

/// A single CQE entry
pub(crate) struct CqeResult {
pub(crate) result: io::Result<u32>,
#[allow(dead_code)]
pub(crate) flags: u32,
}

Expand All @@ -61,7 +83,7 @@ impl From<cqueue::Entry> for CqeResult {
}
}

impl<T> Op<T>
impl<T, CqeType> Op<T, CqeType>
where
T: Completable,
{
Expand All @@ -70,6 +92,7 @@ where
Op {
index: inner.ops.insert(),
data: Some(data),
_cqe_type: PhantomData,
_phantom: PhantomData,
}
}
Expand Down Expand Up @@ -114,7 +137,7 @@ where
}
}

impl<T> Future for Op<T>
impl<T> Future for Op<T, SingleCQE>
where
T: Unpin + 'static + Completable,
{
Expand All @@ -127,7 +150,7 @@ where

CONTEXT.with(|runtime_context| {
runtime_context.with_driver_mut(|driver| {
let lifecycle = driver
let (lifecycle, _) = driver
.ops
.get_mut(me.index)
.expect("invalid internal state");
Expand All @@ -149,31 +172,57 @@ where
Lifecycle::Completed(cqe) => {
driver.ops.remove(me.index);
me.index = usize::MAX;

Poll::Ready(me.data.take().unwrap().complete(cqe))
}
Lifecycle::CompletionList(..) => {
unreachable!("No `more` flag set for SingleCQE")
}
}
})
})
}
}

impl<T> Drop for Op<T> {
/// The operation may have pending cqe's not yet processed.
/// To manage this, the lifecycle associated with the Op may if required
/// be placed in LifeCycle::Ignored state to handle cqe's which arrive after
/// the Op has been dropped.
impl<T, CqeType> Drop for Op<T, CqeType> {
fn drop(&mut self) {
use std::mem;

CONTEXT.with(|runtime_context| {
runtime_context.with_driver_mut(|driver| {
let lifecycle = match driver.ops.get_mut(self.index) {
Some(lifecycle) => lifecycle,
None => return,
// Get the Op Lifecycle state from the driver
let (lifecycle, completions) = match driver.ops.get_mut(self.index) {
Some(val) => val,
None => {
// Op dropped after the driver
return;
}
};

match lifecycle {
match mem::replace(lifecycle, Lifecycle::Submitted) {
Lifecycle::Submitted | Lifecycle::Waiting(_) => {
*lifecycle = Lifecycle::Ignored(Box::new(self.data.take()));
}
Lifecycle::Completed(..) => {
driver.ops.remove(self.index);
}
Lifecycle::CompletionList(indices) => {
// Deallocate list entries, recording if more CQE's are expected
let more = {
let mut list = indices.into_list(completions);
io_uring::cqueue::more(list.peek_end().unwrap().flags)
// Dropping list deallocates the list entries
};
if more {
// If more are expected, we have to keep the op around
*lifecycle = Lifecycle::Ignored(Box::new(self.data.take()));
} else {
driver.ops.remove(self.index);
}
}
Lifecycle::Ignored(..) => unreachable!(),
}
})
Expand All @@ -182,21 +231,54 @@ impl<T> Drop for Op<T> {
}

impl Lifecycle {
pub(super) fn complete(&mut self, cqe: CqeResult) -> bool {
pub(super) fn complete(&mut self, completions: &mut Slab<Completion>, cqe: CqeResult) -> bool {
use std::mem;

match mem::replace(self, Lifecycle::Submitted) {
Lifecycle::Submitted => {
*self = Lifecycle::Completed(cqe);
x @ Lifecycle::Submitted | x @ Lifecycle::Waiting(..) => {
if io_uring::cqueue::more(cqe.flags) {
let mut list = SlabListIndices::new().into_list(completions);
list.push(cqe);
*self = Lifecycle::CompletionList(list.into_indices());
} else {
*self = Lifecycle::Completed(cqe);
}
if let Lifecycle::Waiting(waker) = x {
// waker is woken to notify cqe has arrived
// Note: Maybe defer calling until cqe with !`more` flag set?
waker.wake();
}
false
}
Lifecycle::Waiting(waker) => {
*self = Lifecycle::Completed(cqe);
waker.wake();

lifecycle @ Lifecycle::Ignored(..) => {
if io_uring::cqueue::more(cqe.flags) {
// Not yet complete. The Op has been dropped, so we can drop the CQE
// but we must keep the lifecycle alive until no more CQE's expected
*self = lifecycle;
false
} else {
// This Op has completed, we can drop
true
}
}

Lifecycle::Completed(..) => {
// Completions with more flag set go straight onto the slab,
// and are handled in Lifecycle::CompletionList.
// To construct Lifecycle::Completed, a CQE with `more` flag unset was received
// we shouldn't be receiving another.
unreachable!("invalid operation state")
}

Lifecycle::CompletionList(indices) => {
// A completion list may contain CQE's with and without `more` flag set.
// Only the final one may have `more` unset, although we don't check.
let mut list = indices.into_list(completions);
list.push(cqe);
*self = Lifecycle::CompletionList(list.into_indices());
false
}
Lifecycle::Ignored(..) => true,
Lifecycle::Completed(..) => unreachable!("invalid operation state"),
}
}
}
Expand Down Expand Up @@ -377,7 +459,10 @@ mod test {

fn release() {
CONTEXT.with(|cx| {
cx.with_driver_mut(|driver| driver.ops.lifecycle.clear());
cx.with_driver_mut(|driver| {
driver.ops.lifecycle.clear();
driver.ops.completions.clear();
});

cx.unset_driver();
});
Expand Down
Loading