diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 1dad7b1d600..045b74a15f0 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -83,6 +83,7 @@ jobs: rust: beta crates: - tokio-executor + - tokio # Try cross compiling - template: ci/azure-cross-compile.yml diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 4b87917c9cd..e4b501ceb16 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -39,7 +39,7 @@ fs = ["tokio-executor/blocking"] io = ["tokio-io", "bytes", "iovec"] macros = ["tokio-macros"] net-full = ["tcp", "udp", "uds"] -net-driver = ["mio", "tokio-executor/blocking"] +net-driver = ["mio", "tokio-executor/blocking", "lazy_static"] rt-current-thread = [ "timer", "tokio-executor/current-thread", @@ -110,6 +110,10 @@ version = "0.3.8" default-features = false optional = true +[target.'cfg(loom)'.dependencies] +# play nice with loom tests in other crates. +loom = "0.2.11" + [dev-dependencies] tokio-test = { version = "=0.2.0-alpha.6", path = "../tokio-test" } tokio-util = { version = "=0.2.0-alpha.6", path = "../tokio-util" } @@ -127,5 +131,9 @@ serde_json = "1.0" tempfile = "3.1.0" time = "0.1" +# sharded slab tests +loom = "0.2.11" +proptest = "0.9.4" + [package.metadata.docs.rs] all-features = true diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 814f2a59b21..57403b57b7e 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -69,7 +69,6 @@ //! } //! } //! ``` - macro_rules! if_runtime { ($($i:item)*) => ($( #[cfg(any( @@ -97,6 +96,9 @@ pub mod io; #[cfg(feature = "net-driver")] pub mod net; +#[cfg(feature = "net-driver")] +mod loom; + pub mod prelude; #[cfg(feature = "process")] diff --git a/tokio/src/loom.rs b/tokio/src/loom.rs new file mode 100644 index 00000000000..57ce2df6c81 --- /dev/null +++ b/tokio/src/loom.rs @@ -0,0 +1,45 @@ +//! This module abstracts over `loom` and `std::sync` depending on whether we +//! are running tests or not. +pub(crate) use self::inner::*; + +#[cfg(all(test, loom))] +mod inner { + pub(crate) use loom::sync::CausalCell; + pub(crate) use loom::sync::Mutex; + pub(crate) mod atomic { + pub(crate) use loom::sync::atomic::*; + pub(crate) use std::sync::atomic::Ordering; + } +} + +#[cfg(not(all(test, loom)))] +mod inner { + use std::cell::UnsafeCell; + pub(crate) use std::sync::atomic; + pub(crate) use std::sync::Mutex; + + #[derive(Debug)] + pub(crate) struct CausalCell(UnsafeCell); + + impl CausalCell { + pub(crate) fn new(data: T) -> CausalCell { + CausalCell(UnsafeCell::new(data)) + } + + #[inline(always)] + pub(crate) fn with(&self, f: F) -> R + where + F: FnOnce(*const T) -> R, + { + f(self.0.get()) + } + + #[inline(always)] + pub(crate) fn with_mut(&self, f: F) -> R + where + F: FnOnce(*mut T) -> R, + { + f(self.0.get()) + } + } +} diff --git a/tokio/src/net/driver/mod.rs b/tokio/src/net/driver/mod.rs index 9079ccc7c3f..b7f33d02e4e 100644 --- a/tokio/src/net/driver/mod.rs +++ b/tokio/src/net/driver/mod.rs @@ -124,7 +124,15 @@ //! [`PollEvented`]: struct.PollEvented.html //! [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html //! [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html +#[cfg(loom)] +macro_rules! loom_thread_local { + ($($tts:tt)+) => { loom::thread_local!{ $($tts)+ } } +} +#[cfg(not(loom))] +macro_rules! 
loom_thread_local { + ($($tts:tt)+) => { std::thread_local!{ $($tts)+ } } +} pub(crate) mod platform; mod reactor; mod registration; diff --git a/tokio/src/net/driver/reactor/dispatch/iter.rs b/tokio/src/net/driver/reactor/dispatch/iter.rs new file mode 100644 index 00000000000..f785f5d480a --- /dev/null +++ b/tokio/src/net/driver/reactor/dispatch/iter.rs @@ -0,0 +1,53 @@ +use super::{ + page::{self, ScheduledIo}, + Shard, +}; +use std::slice; + +pub(in crate::net::driver::reactor) struct UniqueIter<'a> { + pub(super) shards: slice::IterMut<'a, Shard>, + pub(super) pages: slice::Iter<'a, page::Shared>, + pub(super) slots: Option>, +} + +impl<'a> Iterator for UniqueIter<'a> { + type Item = &'a ScheduledIo; + fn next(&mut self) -> Option { + loop { + if let Some(item) = self.slots.as_mut().and_then(|slots| slots.next()) { + return Some(item); + } + + if let Some(page) = self.pages.next() { + self.slots = page.iter(); + } + + if let Some(shard) = self.shards.next() { + self.pages = shard.iter(); + } else { + return None; + } + } + } +} + +pub(in crate::net::driver::reactor) struct ShardIter<'a> { + pub(super) pages: slice::IterMut<'a, page::Shared>, + pub(super) slots: Option>, +} + +impl<'a> Iterator for ShardIter<'a> { + type Item = &'a ScheduledIo; + fn next(&mut self) -> Option { + loop { + if let Some(item) = self.slots.as_mut().and_then(|slots| slots.next()) { + return Some(item); + } + if let Some(page) = self.pages.next() { + self.slots = page.iter(); + } else { + return None; + } + } + } +} diff --git a/tokio/src/net/driver/reactor/dispatch/mod.rs b/tokio/src/net/driver/reactor/dispatch/mod.rs new file mode 100644 index 00000000000..d7262a354ae --- /dev/null +++ b/tokio/src/net/driver/reactor/dispatch/mod.rs @@ -0,0 +1,36 @@ +//! A lock-free concurrent slab. + +#[cfg(all(test, loom))] +macro_rules! test_println { + ($($arg:tt)*) => { + println!("{:?} {}", crate::net::driver::reactor::dispatch::Tid::current(), format_args!($($arg)*)) + } +} + +mod iter; +mod pack; +mod page; +mod sharded_slab; +mod tid; + +#[cfg(all(test, loom))] +// this is used by sub-modules +use self::tests::test_util; +use pack::{Pack, WIDTH}; +use sharded_slab::Shard; +#[cfg(all(test, loom))] +pub(crate) use sharded_slab::Slab; +pub(crate) use sharded_slab::{SingleShard, MAX_SOURCES}; +use tid::Tid; + +#[cfg(target_pointer_width = "64")] +const MAX_THREADS: usize = 4096; +#[cfg(target_pointer_width = "32")] +const MAX_THREADS: usize = 2048; +const INITIAL_PAGE_SIZE: usize = 32; +const MAX_PAGES: usize = WIDTH / 4; +// Chosen arbitrarily. +const RESERVED_BITS: usize = 5; + +#[cfg(test)] +mod tests; diff --git a/tokio/src/net/driver/reactor/dispatch/pack.rs b/tokio/src/net/driver/reactor/dispatch/pack.rs new file mode 100644 index 00000000000..0be44a48abf --- /dev/null +++ b/tokio/src/net/driver/reactor/dispatch/pack.rs @@ -0,0 +1,89 @@ +pub(super) const WIDTH: usize = std::mem::size_of::() * 8; + +/// Trait encapsulating the calculations required for bit-packing slab indices. +/// +/// This allows us to avoid manually repeating some calculations when packing +/// and unpacking indices. +pub(crate) trait Pack: Sized { + // ====== provided by each implementation ================================= + + /// The number of bits occupied by this type when packed into a usize. + /// + /// This must be provided to determine the number of bits into which to pack + /// the type. + const LEN: usize; + /// The type packed on the less significant side of this type. 
+ /// + /// If this type is packed into the least significant bit of a usize, this + /// should be `()`, which occupies no bytes. + /// + /// This is used to calculate the shift amount for packing this value. + type Prev: Pack; + + // ====== calculated automatically ======================================== + + /// A number consisting of `Self::LEN` 1 bits, starting at the least + /// significant bit. + /// + /// This is the highest value this type can represent. This number is shifted + /// left by `Self::SHIFT` bits to calculate this type's `MASK`. + /// + /// This is computed automatically based on `Self::LEN`. + const BITS: usize = { + let shift = 1 << (Self::LEN - 1); + shift | (shift - 1) + }; + /// The number of bits to shift a number to pack it into a usize with other + /// values. + /// + /// This is calculated automatically based on the `LEN` and `SHIFT` constants + /// of the previous value. + const SHIFT: usize = Self::Prev::SHIFT + Self::Prev::LEN; + + /// The mask to extract only this type from a packed `usize`. + /// + /// This is calculated by shifting `Self::BITS` left by `Self::SHIFT`. + const MASK: usize = Self::BITS << Self::SHIFT; + + fn as_usize(&self) -> usize; + fn from_usize(val: usize) -> Self; + + #[inline(always)] + fn pack(&self, to: usize) -> usize { + let value = self.as_usize(); + debug_assert!(value <= Self::BITS); + + (to & !Self::MASK) | (value << Self::SHIFT) + } + + #[inline(always)] + fn from_packed(from: usize) -> Self { + let value = (from & Self::MASK) >> Self::SHIFT; + debug_assert!(value <= Self::BITS); + Self::from_usize(value) + } +} + +impl Pack for () { + const BITS: usize = 0; + const LEN: usize = 0; + const SHIFT: usize = 0; + const MASK: usize = 0; + + type Prev = (); + + fn as_usize(&self) -> usize { + unreachable!() + } + fn from_usize(_val: usize) -> Self { + unreachable!() + } + + fn pack(&self, _to: usize) -> usize { + unreachable!() + } + + fn from_packed(_from: usize) -> Self { + unreachable!() + } +} diff --git a/tokio/src/net/driver/reactor/dispatch/page/mod.rs b/tokio/src/net/driver/reactor/dispatch/page/mod.rs new file mode 100644 index 00000000000..0b8d3c4c20f --- /dev/null +++ b/tokio/src/net/driver/reactor/dispatch/page/mod.rs @@ -0,0 +1,257 @@ +use super::{Pack, INITIAL_PAGE_SIZE, WIDTH}; +use crate::loom::CausalCell; + +pub(crate) mod scheduled_io; +mod stack; +pub(crate) use self::scheduled_io::ScheduledIo; +use self::stack::TransferStack; +use std::fmt; + +/// A page address encodes the location of a slot within a shard (the page +/// number and offset within that page) as a single linear value. +#[repr(transparent)] +#[derive(Copy, Clone, Eq, PartialEq, PartialOrd, Ord)] +pub(crate) struct Addr { + addr: usize, +} + +impl Addr { + const NULL: usize = Self::BITS + 1; + const INDEX_SHIFT: usize = INITIAL_PAGE_SIZE.trailing_zeros() as usize + 1; + + pub(crate) fn index(self) -> usize { + // Since every page is twice as large as the previous page, and all page sizes + // are powers of two, we can determine the page index that contains a given + // address by shifting the address down by the smallest page size and + // looking at how many twos places are necessary to represent that number, + // telling us what power of two page size it fits inside of. We can + // determine the number of twos places by counting the number of leading + // zeros (unused twos places) in the number's binary representation, and + // subtracting that count from the total number of bits in a word.
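+ // For example, with INITIAL_PAGE_SIZE = 32, offsets 0..=31 fall on page 0, + // offsets 32..=95 on page 1, and offsets 96..=223 on page 2.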
+ WIDTH - ((self.addr + INITIAL_PAGE_SIZE) >> Self::INDEX_SHIFT).leading_zeros() as usize + } + + pub(crate) fn offset(self) -> usize { + self.addr + } +} + +pub(super) fn size(n: usize) -> usize { + INITIAL_PAGE_SIZE * 2usize.pow(n as _) +} + +impl Pack for Addr { + const LEN: usize = super::MAX_PAGES + Self::INDEX_SHIFT; + + type Prev = (); + + fn as_usize(&self) -> usize { + self.addr + } + + fn from_usize(addr: usize) -> Self { + debug_assert!(addr <= Self::BITS); + Self { addr } + } +} + +pub(in crate::net::driver) type Iter<'a> = std::slice::Iter<'a, ScheduledIo>; + +pub(crate) struct Local { + head: CausalCell<usize>, +} + +pub(crate) struct Shared { + remote: TransferStack, + size: usize, + prev_sz: usize, + slab: CausalCell<Option<Box<[ScheduledIo]>>>, +} + +impl Local { + pub(crate) fn new() -> Self { + Self { + head: CausalCell::new(0), + } + } + + #[inline(always)] + fn head(&self) -> usize { + self.head.with(|head| unsafe { *head }) + } + + #[inline(always)] + fn set_head(&self, new_head: usize) { + self.head.with_mut(|head| unsafe { + *head = new_head; + }) + } +} + +impl Shared { + const NULL: usize = Addr::NULL; + + pub(crate) fn new(size: usize, prev_sz: usize) -> Self { + Self { + prev_sz, + size, + remote: TransferStack::new(), + slab: CausalCell::new(None), + } + } + + /// Allocates storage for this page if it does not already exist. + /// + /// This requires unique access to the page (e.g. it is called from the + /// thread that owns the page, or, in the case of `SingleShard`, while the + /// lock is held). In order to indicate this, a reference to the page's + /// `Local` data is taken by this function; the `Local` argument is not + /// actually used, but requiring it ensures that this is only called when + /// local access is held. + #[cold] + fn alloc_page(&self, _: &Local) { + debug_assert!(self.slab.with(|s| unsafe { (*s).is_none() })); + + let mut slab = Vec::with_capacity(self.size); + slab.extend((1..self.size).map(ScheduledIo::new)); + slab.push(ScheduledIo::new(Self::NULL)); + self.slab.with_mut(|s| { + // this mut access is safe — it only occurs to initially + // allocate the page, which only happens on this thread; if the + // page has not yet been allocated, other threads will not try + // to access it yet. + unsafe { + *s = Some(slab.into_boxed_slice()); + } + }); + } + + #[inline] + pub(crate) fn alloc(&self, local: &Local) -> Option<usize> { + let head = local.head(); + + // are there any items on the local free list? (fast path) + let head = if head < self.size { + head + } else { + // if the local free list is empty, pop all the items on the remote + // free list onto the local free list. + self.remote.pop_all()? + }; + + // if the head is still null, both the local and remote free lists are + // empty --- we can't fit any more items on this page. + if head == Self::NULL { + return None; + } + + // do we need to allocate storage for this page?
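+ // (the backing storage for a page is allocated lazily, the first time a slot + // on that page is actually used)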
+ let page_needs_alloc = self.slab.with(|s| unsafe { (*s).is_none() }); + if page_needs_alloc { + self.alloc_page(local); + } + + let gen = self.slab.with(|slab| { + let slab = unsafe { &*(slab) } + .as_ref() + .expect("page must have been allocated to alloc!"); + let slot = &slab[head]; + local.set_head(slot.next()); + slot.alloc() + }); + + let index = head + self.prev_sz; + Some(gen.pack(index)) + } + + #[inline] + pub(in crate::net::driver) fn get(&self, addr: Addr) -> Option<&ScheduledIo> { + let page_offset = addr.offset() - self.prev_sz; + self.slab + .with(|slab| unsafe { &*slab }.as_ref()?.get(page_offset)) + } + + pub(crate) fn remove_local(&self, local: &Local, addr: Addr, idx: usize) { + let offset = addr.offset() - self.prev_sz; + self.slab.with(|slab| { + let slab = unsafe { &*slab }.as_ref(); + let slot = if let Some(slot) = slab.and_then(|slab| slab.get(offset)) { + slot + } else { + return; + }; + if slot.reset(scheduled_io::Generation::from_packed(idx)) { + slot.set_next(local.head()); + local.set_head(offset); + } + }) + } + + pub(crate) fn remove_remote(&self, addr: Addr, idx: usize) { + let offset = addr.offset() - self.prev_sz; + self.slab.with(|slab| { + let slab = unsafe { &*slab }.as_ref(); + let slot = if let Some(slot) = slab.and_then(|slab| slab.get(offset)) { + slot + } else { + return; + }; + if !slot.reset(scheduled_io::Generation::from_packed(idx)) { + return; + } + self.remote.push(offset, |next| slot.set_next(next)); + }) + } + + pub(in crate::net::driver) fn iter(&self) -> Option> { + let slab = self.slab.with(|slab| unsafe { (&*slab).as_ref() }); + slab.map(|slab| slab.iter()) + } +} + +impl fmt::Debug for Local { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.head.with(|head| { + let head = unsafe { *head }; + f.debug_struct("Local") + .field("head", &format_args!("{:#0x}", head)) + .finish() + }) + } +} + +impl fmt::Debug for Shared { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Shared") + .field("remote", &self.remote) + .field("prev_sz", &self.prev_sz) + .field("size", &self.size) + // .field("slab", &self.slab) + .finish() + } +} + +impl fmt::Debug for Addr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Addr") + .field("addr", &format_args!("{:#0x}", &self.addr)) + .field("index", &self.index()) + .field("offset", &self.offset()) + .finish() + } +} + +#[cfg(all(test, not(loom)))] +mod test { + use super::*; + use proptest::prelude::*; + + proptest! { + #[test] + fn addr_roundtrips(pidx in 0usize..Addr::BITS) { + let addr = Addr::from_usize(pidx); + let packed = addr.pack(0); + assert_eq!(addr, Addr::from_packed(packed)); + } + } +} diff --git a/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs b/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs new file mode 100644 index 00000000000..34a07ea8767 --- /dev/null +++ b/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs @@ -0,0 +1,171 @@ +use super::super::{Pack, Tid, RESERVED_BITS, WIDTH}; +use crate::loom::{ + atomic::{AtomicUsize, Ordering}, + CausalCell, +}; + +use tokio_sync::AtomicWaker; + +#[derive(Debug)] +pub(crate) struct ScheduledIo { + /// The offset of the next item on the free list. 
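+ /// + /// This value is only meaningful while the slot is on a free list; it is not + /// kept up to date once the slot has been allocated.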
+ next: CausalCell, + readiness: AtomicUsize, + pub(in crate::net::driver) reader: AtomicWaker, + pub(in crate::net::driver) writer: AtomicWaker, +} + +#[repr(transparent)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd)] +pub(crate) struct Generation { + value: usize, +} + +impl Pack for Generation { + /// Use all the remaining bits in the word for the generation counter, minus + /// any bits reserved by the user. + const LEN: usize = (WIDTH - RESERVED_BITS) - Self::SHIFT; + + type Prev = Tid; + + #[inline(always)] + fn from_usize(u: usize) -> Self { + debug_assert!(u <= Self::BITS); + Self::new(u) + } + + #[inline(always)] + fn as_usize(&self) -> usize { + self.value + } +} + +impl Generation { + const ONE: usize = 1 << Self::SHIFT; + + fn new(value: usize) -> Self { + Self { value } + } + + fn next(self) -> Self { + Self::from_usize((self.value + 1) % Self::BITS) + } +} + +impl ScheduledIo { + pub(super) fn new(next: usize) -> Self { + Self { + next: CausalCell::new(next), + readiness: AtomicUsize::new(0), + reader: AtomicWaker::new(), + writer: AtomicWaker::new(), + } + } + + #[inline] + pub(super) fn alloc(&self) -> Generation { + Generation::from_packed(self.readiness.load(Ordering::SeqCst)) + } + + #[inline(always)] + pub(super) fn next(&self) -> usize { + self.next.with(|next| unsafe { *next }) + } + + #[inline] + pub(super) fn reset(&self, gen: Generation) -> bool { + let mut current = self.readiness.load(Ordering::Acquire); + loop { + if Generation::from_packed(current) != gen { + return false; + } + let next_gen = gen.next().pack(0); + match self.readiness.compare_exchange( + current, + next_gen, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(actual) => current = actual, + } + } + drop(self.reader.take_waker()); + drop(self.writer.take_waker()); + true + } + + #[inline(always)] + pub(super) fn set_next(&self, next: usize) { + self.next.with_mut(|n| unsafe { + (*n) = next; + }) + } + + /// Returns the current readiness value of this `ScheduledIo`, if the + /// provided `token` is still a valid access. + /// + /// # Returns + /// + /// If the given token's generation no longer matches the `ScheduledIo`'s + /// generation, then the corresponding IO resource has been removed and + /// replaced with a new resource. In that case, this method returns `None`. + /// Otherwise, this returns the current readiness. + pub(in crate::net::driver) fn get_readiness(&self, token: usize) -> Option { + let gen = token & Generation::MASK; + let ready = self.readiness.load(Ordering::Acquire); + if ready & Generation::MASK != gen { + return None; + } + Some(ready & (!Generation::MASK)) + } + + /// Sets the readiness on this `ScheduledIo` by invoking the given closure on + /// the current value, returning the previous readiness value. + /// + /// # Arguments + /// - `token`: the token for this `ScheduledIo`. + /// - `f`: a closure returning a new readiness value given the previous + /// readiness. + /// + /// # Returns + /// + /// If the given token's generation no longer matches the `ScheduledIo`'s + /// generation, then the corresponding IO resource has been removed and + /// replaced with a new resource. In that case, this method returns `Err`. + /// Otherwise, this returns the previous readiness. 
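+ /// + /// Note that `f` may be called more than once if the readiness value is + /// modified concurrently and the compare-and-swap has to be retried.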
+ pub(in crate::net::driver) fn set_readiness( + &self, + token: usize, + f: impl Fn(usize) -> usize, + ) -> Result { + let gen = token & Generation::MASK; + let mut current = self.readiness.load(Ordering::Acquire); + loop { + // Check that the generation for this access is still the current + // one. + if current & Generation::MASK != gen { + return Err(()); + } + // Mask out the generation bits so that the modifying function + // doesn't see them. + let current_readiness = current & mio::Ready::all().as_usize(); + let new = f(current_readiness); + debug_assert!( + new < Generation::ONE, + "new readiness value would overwrite generation bits!" + ); + + match self.readiness.compare_exchange( + current, + new | gen, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => return Ok(current), + // we lost the race, retry! + Err(actual) => current = actual, + } + } + } +} diff --git a/tokio/src/net/driver/reactor/dispatch/page/stack.rs b/tokio/src/net/driver/reactor/dispatch/page/stack.rs new file mode 100644 index 00000000000..26597dc2dee --- /dev/null +++ b/tokio/src/net/driver/reactor/dispatch/page/stack.rs @@ -0,0 +1,149 @@ +use crate::loom::atomic::{AtomicUsize, Ordering}; +use std::fmt; + +pub(super) struct TransferStack { + head: AtomicUsize, +} + +impl TransferStack { + pub(super) fn new() -> Self { + Self { + head: AtomicUsize::new(super::Addr::NULL), + } + } + + pub(super) fn pop_all(&self) -> Option { + let val = self.head.swap(super::Addr::NULL, Ordering::Acquire); + if val == super::Addr::NULL { + None + } else { + Some(val) + } + } + + pub(super) fn push(&self, value: usize, before: impl Fn(usize)) { + let mut next = self.head.load(Ordering::Relaxed); + loop { + before(next); + + match self + .head + .compare_exchange(next, value, Ordering::AcqRel, Ordering::Acquire) + { + // lost the race! + Err(actual) => next = actual, + Ok(_) => return, + } + } + } +} + +impl fmt::Debug for TransferStack { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // Loom likes to dump all its internal state in `fmt::Debug` impls, so + // we override this to just print the current value in tests. + f.debug_struct("TransferStack") + .field( + "head", + &format_args!("{:#x}", self.head.load(Ordering::Relaxed)), + ) + .finish() + } +} + +#[cfg(all(test, loom))] +mod test { + use super::super::super::test_util; + use super::*; + use crate::loom::CausalCell; + use loom::thread; + use std::sync::Arc; + + #[test] + fn transfer_stack() { + test_util::run_model("transfer_stack", || { + let causalities = [CausalCell::new(None), CausalCell::new(None)]; + let shared = Arc::new((causalities, TransferStack::new())); + let shared1 = shared.clone(); + let shared2 = shared.clone(); + + // Spawn two threads that both try to push to the stack. + let t1 = thread::spawn(move || { + let (causalities, stack) = &*shared1; + stack.push(0, |prev| { + causalities[0].with_mut(|c| unsafe { + *c = Some(prev); + }); + test_println!("prev={:#x}", prev) + }); + }); + + let t2 = thread::spawn(move || { + let (causalities, stack) = &*shared2; + stack.push(1, |prev| { + causalities[1].with_mut(|c| unsafe { + *c = Some(prev); + }); + test_println!("prev={:#x}", prev) + }); + }); + + let (causalities, stack) = &*shared; + + // Try to pop from the stack... 
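+ // ...retrying until at least one of the concurrent pushes has become visible.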
+ let mut idx = stack.pop_all(); + while idx == None { + idx = stack.pop_all(); + thread::yield_now(); + } + let idx = idx.unwrap(); + test_println!("popped {:#x}", idx); + + let saw_both = causalities[idx].with(|val| { + let val = unsafe { *val }; + assert!( + val.is_some(), + "CausalCell write must happen-before index is pushed to the stack!", + ); + // were there two entries in the stack? if so, check that + // both saw a write. + if let Some(c) = causalities.get(val.unwrap()) { + test_println!("saw both entries!"); + c.with(|val| { + let val = unsafe { *val }; + assert!( + val.is_some(), + "CausalCell write must happen-before index is pushed to the stack!", + ); + }); + true + } else { + false + } + }); + + // We only saw one push. Ensure that the other push happens too. + if !saw_both { + // Try to pop from the stack... + let mut idx = stack.pop_all(); + while idx == None { + idx = stack.pop_all(); + thread::yield_now(); + } + let idx = idx.unwrap(); + + test_println!("popped {:#x}", idx); + causalities[idx].with(|val| { + let val = unsafe { *val }; + assert!( + val.is_some(), + "CausalCell write must happen-before index is pushed to the stack!", + ); + }); + } + + t1.join().unwrap(); + t2.join().unwrap(); + }); + } +} diff --git a/tokio/src/net/driver/reactor/dispatch/sharded_slab.rs b/tokio/src/net/driver/reactor/dispatch/sharded_slab.rs new file mode 100644 index 00000000000..09f6e1117c8 --- /dev/null +++ b/tokio/src/net/driver/reactor/dispatch/sharded_slab.rs @@ -0,0 +1,274 @@ +use super::*; +use std::fmt; + +use crate::loom::Mutex; + +/// A sharded slab. +pub(crate) struct Slab { + shards: Box<[Shard]>, +} + +/// A slab implemented with a single shard. +// TODO(eliza): once worker threads are available, this type will be +// unnecessary and can be removed. +#[derive(Debug)] +pub(crate) struct SingleShard { + shard: Shard, + local: Mutex<()>, +} + +// ┌─────────────┐ ┌────────┐ +// │ page 1 │ │ │ +// ├─────────────┤ ┌───▶│ next──┼─┐ +// │ page 2 │ │ ├────────┤ │ +// │ │ │ │XXXXXXXX│ │ +// │ local_free──┼─┘ ├────────┤ │ +// │ global_free─┼─┐ │ │◀┘ +// ├─────────────┤ └───▶│ next──┼─┐ +// │ page 3 │ ├────────┤ │ +// └─────────────┘ │XXXXXXXX│ │ +// ... ├────────┤ │ +// ┌─────────────┐ │XXXXXXXX│ │ +// │ page n │ ├────────┤ │ +// └─────────────┘ │ │◀┘ +// │ next──┼───▶ +// ├────────┤ +// │XXXXXXXX│ +// └────────┘ +// ... +pub(super) struct Shard { + #[cfg(debug_assertions)] + tid: usize, + /// The local free list for each page. + /// + /// These are only ever accessed from this shard's thread, so they are + /// stored separately from the shared state for the page that can be + /// accessed concurrently, to minimize false sharing. + local: Box<[page::Local]>, + /// The shared state for each page in this shard. + /// + /// This consists of the page's metadata (size, previous size), remote free + /// list, and a pointer to the actual array backing that page. + shared: Box<[page::Shared]>, +} + +pub(crate) const TOKEN_SHIFT: usize = Tid::SHIFT + Tid::LEN; +pub(crate) const MAX_SOURCES: usize = (1 << TOKEN_SHIFT) - 1; + +#[allow(dead_code)] // coming back soon! +impl Slab { + /// Returns a new slab with the default configuration parameters. + pub(crate) fn new() -> Self { + Self::with_max_threads(MAX_THREADS) + } + + pub(crate) fn with_max_threads(max_threads: usize) -> Self { + // Round the max number of threads to the next power of two and clamp to + // the maximum representable number. 
+ let max = max_threads.next_power_of_two().min(MAX_THREADS); + let shards = (0..max).map(Shard::new).collect(); + Self { shards } + } + + /// allocs a value into the slab, returning a key that can be used to + /// access it. + /// + /// If this function returns `None`, then the shard for the current thread + /// is full and no items can be added until some are removed, or the maximum + /// number of shards has been reached. + pub(crate) fn alloc(&self) -> Option { + let tid = Tid::current(); + self.shards[tid.as_usize()].alloc().map(|idx| tid.pack(idx)) + } + + /// Removes the value associated with the given key from the slab. + pub(crate) fn remove(&self, idx: usize) { + let tid = Tid::from_packed(idx); + let shard = &self.shards[tid.as_usize()]; + if tid.is_current() { + shard.remove_local(idx) + } else { + shard.remove_remote(idx) + } + } + + /// Return a reference to the value associated with the given key. + /// + /// If the slab does not contain a value for the given key, `None` is + /// returned instead. + pub(in crate::net::driver) fn get(&self, token: usize) -> Option<&page::ScheduledIo> { + let tid = Tid::from_packed(token); + self.shards.get(tid.as_usize())?.get(token) + } + + /// Returns an iterator over all the items in the slab. + pub(in crate::net::driver::reactor) fn unique_iter(&mut self) -> iter::UniqueIter<'_> { + let mut shards = self.shards.iter_mut(); + let shard = shards.next().expect("must be at least 1 shard"); + let mut pages = shard.iter(); + let slots = pages.next().and_then(page::Shared::iter); + iter::UniqueIter { + shards, + slots, + pages, + } + } +} + +impl SingleShard { + /// Returns a new slab with the default configuration parameters. + pub(crate) fn new() -> Self { + Self { + shard: Shard::new(0), + local: Mutex::new(()), + } + } + + /// allocs a value into the slab, returning a key that can be used to + /// access it. + /// + /// If this function returns `None`, then the shard for the current thread + /// is full and no items can be added until some are removed, or the maximum + /// number of shards has been reached. + pub(crate) fn alloc(&self) -> Option { + // we must lock the slab to alloc an item. + let _local = self.local.lock().unwrap(); + self.shard.alloc() + } + + /// Removes the value associated with the given key from the slab. + pub(crate) fn remove(&self, idx: usize) { + // try to lock the slab so that we can use `remove_local`. + let lock = self.local.try_lock(); + // if we were able to lock the slab, we are "local" and can use the fast + // path; otherwise, we will use `remove_remote`. + if lock.is_ok() { + self.shard.remove_local(idx) + } else { + self.shard.remove_remote(idx) + } + } + + /// Return a reference to the value associated with the given key. + /// + /// If the slab does not contain a value for the given key, `None` is + /// returned instead. + pub(in crate::net::driver) fn get(&self, token: usize) -> Option<&page::ScheduledIo> { + self.shard.get(token) + } + + /// Returns an iterator over all the items in the slab. 
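+ /// + /// Because this takes `&mut self`, no other reference to the slab can exist + /// while the iterator is in use, so items cannot be added or removed concurrently.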
+ pub(in crate::net::driver::reactor) fn unique_iter(&mut self) -> iter::ShardIter<'_> { + let mut pages = self.shard.iter_mut(); + let slots = pages.next().and_then(|pg| pg.iter()); + iter::ShardIter { slots, pages } + } +} + +impl Shard { + fn new(_idx: usize) -> Self { + let mut total_sz = 0; + let shared = (0..MAX_PAGES) + .map(|page_num| { + let sz = page::size(page_num); + let prev_sz = total_sz; + total_sz += sz; + page::Shared::new(sz, prev_sz) + }) + .collect(); + let local = (0..MAX_PAGES).map(|_| page::Local::new()).collect(); + Self { + #[cfg(debug_assertions)] + tid: _idx, + local, + shared, + } + } + + fn alloc(&self) -> Option { + // Can we fit the value into an existing page? + for (page_idx, page) in self.shared.iter().enumerate() { + let local = self.local(page_idx); + if let Some(page_offset) = page.alloc(local) { + return Some(page_offset); + } + } + + None + } + + #[inline(always)] + fn get(&self, idx: usize) -> Option<&page::ScheduledIo> { + #[cfg(debug_assertions)] + debug_assert_eq!(Tid::from_packed(idx).as_usize(), self.tid); + + let addr = page::Addr::from_packed(idx); + let i = addr.index(); + + if i > self.shared.len() { + return None; + } + self.shared[i].get(addr) + } + + /// Remove an item on the shard's local thread. + fn remove_local(&self, idx: usize) { + #[cfg(debug_assertions)] + debug_assert_eq!(Tid::from_packed(idx).as_usize(), self.tid); + let addr = page::Addr::from_packed(idx); + let page_idx = addr.index(); + + if let Some(page) = self.shared.get(page_idx) { + page.remove_local(self.local(page_idx), addr, idx); + } + } + + /// Remove an item, while on a different thread from the shard's local thread. + fn remove_remote(&self, idx: usize) { + #[cfg(debug_assertions)] + debug_assert_eq!(Tid::from_packed(idx).as_usize(), self.tid); + let addr = page::Addr::from_packed(idx); + let page_idx = addr.index(); + + if let Some(page) = self.shared.get(page_idx) { + page.remove_remote(addr, idx); + } + } + + #[inline(always)] + fn local(&self, i: usize) -> &page::Local { + &self.local[i] + } + + pub(super) fn iter(&self) -> std::slice::Iter<'_, page::Shared> { + self.shared.iter() + } + + fn iter_mut(&mut self) -> std::slice::IterMut<'_, page::Shared> { + self.shared.iter_mut() + } +} + +impl fmt::Debug for Slab { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Slab") + .field("shards", &self.shards) + .finish() + } +} + +unsafe impl Send for Slab {} +unsafe impl Sync for Slab {} + +unsafe impl Send for SingleShard {} +unsafe impl Sync for SingleShard {} + +impl fmt::Debug for Shard { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut d = f.debug_struct("Shard"); + + #[cfg(debug_assertions)] + d.field("tid", &self.tid); + d.field("shared", &self.shared).finish() + } +} diff --git a/tokio/src/net/driver/reactor/dispatch/tests/loom/mod.rs b/tokio/src/net/driver/reactor/dispatch/tests/loom/mod.rs new file mode 100644 index 00000000000..495cac1c589 --- /dev/null +++ b/tokio/src/net/driver/reactor/dispatch/tests/loom/mod.rs @@ -0,0 +1,204 @@ +use self::test_util::*; +use super::super::Slab; +use loom::sync::{Arc, Condvar, Mutex}; +use loom::thread; + +pub(crate) mod test_util { + use std::sync::atomic::{AtomicUsize, Ordering}; + + pub(crate) fn run_model(name: &'static str, f: impl Fn() + Sync + Send + 'static) { + run_builder(name, loom::model::Builder::new(), f) + } + + pub(crate) fn run_builder( + name: &'static str, + builder: loom::model::Builder, + f: impl Fn() + Sync + Send + 'static, + ) { + let iters = 
AtomicUsize::new(1); + builder.check(move || { + println!( + "\n------------ running test {}; iteration {} ------------\n", + name, + iters.fetch_add(1, Ordering::SeqCst) + ); + f() + }); + } +} + +fn store_val(slab: &Arc, readiness: usize) -> usize { + println!("store: {}", readiness); + let key = slab.alloc().expect("allocate slot"); + if let Some(slot) = slab.get(key) { + slot.set_readiness(key, |_| readiness) + .expect("generation should still be valid!"); + } else { + panic!("slab did not contain a value for {:#x}", key); + } + key +} + +fn get_val(slab: &Arc, key: usize) -> Option { + slab.get(key).and_then(|s| s.get_readiness(key)) +} + +mod single_shard; +mod small_slab; + +#[test] +fn local_remove() { + run_model("local_remove", || { + let slab = Arc::new(Slab::new()); + + let s = slab.clone(); + let t1 = thread::spawn(move || { + let idx = store_val(&s, 1); + assert_eq!(get_val(&s, idx), Some(1)); + s.remove(idx); + assert_eq!(get_val(&s, idx), None); + let idx = store_val(&s, 2); + assert_eq!(get_val(&s, idx), Some(2)); + s.remove(idx); + assert_eq!(get_val(&s, idx), None); + }); + + let s = slab.clone(); + let t2 = thread::spawn(move || { + let idx = store_val(&s, 3); + assert_eq!(get_val(&s, idx), Some(3)); + s.remove(idx); + assert_eq!(get_val(&s, idx), None); + let idx = store_val(&s, 4); + s.remove(idx); + assert_eq!(get_val(&s, idx), None); + }); + + let s = slab; + let idx1 = store_val(&s, 5); + assert_eq!(get_val(&s, idx1), Some(5)); + let idx2 = store_val(&s, 6); + assert_eq!(get_val(&s, idx2), Some(6)); + s.remove(idx1); + assert_eq!(get_val(&s, idx1), None); + assert_eq!(get_val(&s, idx2), Some(6)); + s.remove(idx2); + assert_eq!(get_val(&s, idx2), None); + + t1.join().expect("thread 1 should not panic"); + t2.join().expect("thread 2 should not panic"); + }); +} + +#[test] +fn remove_remote() { + run_model("remove_remote", || { + let slab = Arc::new(Slab::new()); + + let idx1 = store_val(&slab, 1); + assert_eq!(get_val(&slab, idx1), Some(1)); + + let idx2 = store_val(&slab, 2); + assert_eq!(get_val(&slab, idx2), Some(2)); + + let idx3 = store_val(&slab, 3); + assert_eq!(get_val(&slab, idx3), Some(3)); + + let s = slab.clone(); + let t1 = thread::spawn(move || { + assert_eq!(get_val(&s, idx2), Some(2)); + s.remove(idx2); + assert_eq!(get_val(&s, idx2), None); + }); + + let s = slab.clone(); + let t2 = thread::spawn(move || { + assert_eq!(get_val(&s, idx3), Some(3)); + s.remove(idx3); + assert_eq!(get_val(&s, idx3), None); + }); + + t1.join().expect("thread 1 should not panic"); + t2.join().expect("thread 2 should not panic"); + + assert_eq!(get_val(&slab, idx1), Some(1)); + assert_eq!(get_val(&slab, idx2), None); + assert_eq!(get_val(&slab, idx3), None); + }); +} + +#[test] +fn concurrent_alloc_remove() { + run_model("concurrent_alloc_remove", || { + let slab = Arc::new(Slab::new()); + let pair = Arc::new((Mutex::new(None), Condvar::new())); + + let slab2 = slab.clone(); + let pair2 = pair.clone(); + let remover = thread::spawn(move || { + let (lock, cvar) = &*pair2; + for i in 0..2 { + test_println!("--- remover i={} ---", i); + let mut next = lock.lock().unwrap(); + while next.is_none() { + next = cvar.wait(next).unwrap(); + } + let key = next.take().unwrap(); + slab2.remove(key); + assert_eq!(get_val(&slab2, key), None); + cvar.notify_one(); + } + }); + + let (lock, cvar) = &*pair; + for i in 0..2 { + test_println!("--- allocator i={} ---", i); + let key = store_val(&slab, i); + + let mut next = lock.lock().unwrap(); + *next = Some(key); + cvar.notify_one(); + + // 
Wait for the item to be removed. + while next.is_some() { + next = cvar.wait(next).unwrap(); + } + + assert_eq!(get_val(&slab, key), None); + } + + remover.join().unwrap(); + }) +} + +// #[test] +// fn unique_iter() { +// run_model("unique_iter", || { +// let mut slab = Arc::new(Slab::new()); + +// let s = slab.clone(); +// let t1 = thread::spawn(move || { +// store_val(&s, 1); +// store_val(&s, 2); +// }); + +// let s = slab.clone(); +// let t2 = thread::spawn(move || { +// store_val(&s, 3); +// store_val(&s, 4); +// }); + +// t1.join().expect("thread 1 should not panic"); +// t2.join().expect("thread 2 should not panic"); + +// let slab = Arc::get_mut(&mut slab).expect("other arcs should be dropped"); +// let items: Vec<_> = slab +// .unique_iter() +// .map(|i| i.readiness.load(Ordering::Acquire)) +// .collect(); +// assert!(items.contains(&1), "items: {:?}", items); +// assert!(items.contains(&2), "items: {:?}", items); +// assert!(items.contains(&3), "items: {:?}", items); +// assert!(items.contains(&4), "items: {:?}", items); +// }); +// } diff --git a/tokio/src/net/driver/reactor/dispatch/tests/loom/single_shard.rs b/tokio/src/net/driver/reactor/dispatch/tests/loom/single_shard.rs new file mode 100644 index 00000000000..cbc974caf21 --- /dev/null +++ b/tokio/src/net/driver/reactor/dispatch/tests/loom/single_shard.rs @@ -0,0 +1,181 @@ +use super::super::super::SingleShard; +use super::test_util; +use loom::sync::{Arc, Condvar, Mutex}; +use loom::thread; + +fn store_val(slab: &Arc, readiness: usize) -> usize { + println!("store: {}", readiness); + let key = slab.alloc().expect("allocate slot"); + if let Some(slot) = slab.get(key) { + slot.set_readiness(key, |_| readiness) + .expect("generation should still be valid!"); + } else { + panic!("slab did not contain a value for {:#x}", key); + } + key +} + +fn get_val(slab: &Arc, key: usize) -> Option { + slab.get(key).and_then(|s| { + let rdy = s.get_readiness(key); + test_println!("--> got readiness {:?} with key {:#x}", rdy, key); + rdy + }) +} + +#[test] +fn local_remove() { + test_util::run_model("single_shard::local_remove", || { + let slab = Arc::new(SingleShard::new()); + + let s = slab.clone(); + let t1 = thread::spawn(move || { + let idx = store_val(&s, 1); + assert_eq!(get_val(&s, idx), Some(1)); + s.remove(idx); + assert_eq!(get_val(&s, idx), None); + let idx = store_val(&s, 2); + assert_eq!(get_val(&s, idx), Some(2)); + s.remove(idx); + assert_eq!(get_val(&s, idx), None); + }); + + let s = slab.clone(); + let t2 = thread::spawn(move || { + let idx = store_val(&s, 3); + assert_eq!(get_val(&s, idx), Some(3)); + s.remove(idx); + assert_eq!(get_val(&s, idx), None); + let idx = store_val(&s, 4); + s.remove(idx); + assert_eq!(get_val(&s, idx), None); + }); + + let s = slab; + let idx1 = store_val(&s, 5); + assert_eq!(get_val(&s, idx1), Some(5)); + let idx2 = store_val(&s, 6); + assert_eq!(get_val(&s, idx2), Some(6)); + s.remove(idx1); + assert_eq!(get_val(&s, idx1), None); + assert_eq!(get_val(&s, idx2), Some(6)); + s.remove(idx2); + assert_eq!(get_val(&s, idx2), None); + + t1.join().expect("thread 1 should not panic"); + t2.join().expect("thread 2 should not panic"); + }); +} + +#[test] +fn remove_remote() { + test_util::run_model("single_shard::remove_remote", || { + let slab = Arc::new(SingleShard::new()); + + let idx1 = store_val(&slab, 1); + assert_eq!(get_val(&slab, idx1), Some(1)); + + let idx2 = store_val(&slab, 2); + assert_eq!(get_val(&slab, idx2), Some(2)); + + let idx3 = store_val(&slab, 3); + assert_eq!(get_val(&slab, 
idx3), Some(3)); + + let s = slab.clone(); + let t1 = thread::spawn(move || { + assert_eq!(get_val(&s, idx2), Some(2)); + s.remove(idx2); + assert_eq!(get_val(&s, idx2), None); + }); + + let s = slab.clone(); + let t2 = thread::spawn(move || { + assert_eq!(get_val(&s, idx3), Some(3)); + s.remove(idx3); + assert_eq!(get_val(&s, idx3), None); + }); + + t1.join().expect("thread 1 should not panic"); + t2.join().expect("thread 2 should not panic"); + + assert_eq!(get_val(&slab, idx1), Some(1)); + assert_eq!(get_val(&slab, idx2), None); + assert_eq!(get_val(&slab, idx3), None); + }); +} + +#[test] +fn concurrent_alloc_remove() { + test_util::run_model("single_shard::concurrent_alloc_remove", || { + let slab = Arc::new(SingleShard::new()); + let pair = Arc::new((Mutex::new(None), Condvar::new())); + + let slab2 = slab.clone(); + let pair2 = pair.clone(); + let remover = thread::spawn(move || { + let (lock, cvar) = &*pair2; + for i in 0..2 { + test_println!("--- remover i={} ---", i); + let mut next = lock.lock().unwrap(); + while next.is_none() { + next = cvar.wait(next).unwrap(); + } + let key = next.take().unwrap(); + slab2.remove(key); + assert_eq!(get_val(&slab2, key), None); + cvar.notify_one(); + } + }); + + let (lock, cvar) = &*pair; + for i in 0..2 { + test_println!("--- allocator i={} ---", i); + let key = store_val(&slab, i); + + let mut next = lock.lock().unwrap(); + *next = Some(key); + cvar.notify_one(); + + // Wait for the item to be removed. + while next.is_some() { + next = cvar.wait(next).unwrap(); + } + + assert_eq!(get_val(&slab, key), None); + } + + remover.join().unwrap(); + }) +} + +// #[test] +// fn unique_iter() { +// test_util::run_model("single_shard::unique_iter", || { +// let mut slab = Arc::new(SingleShard::new()); + +// let s = slab.clone(); +// let t1 = thread::spawn(move || { +// store_val(&s, 1); +// store_val(&s, 2); +// }); + +// let s = slab.clone(); +// let t2 = thread::spawn(move || { +// store_val(&s, 3); +// store_val(&s, 4); +// }); + +// t1.join().expect("thread 1 should not panic"); +// t2.join().expect("thread 2 should not panic"); + +// let slab = Arc::get_mut(&mut slab).expect("other arcs should be dropped"); +// let items: Vec<_> = slab +// .unique_iter() +// .map(|i| i.readiness.load(Ordering::Acquire)) +// .collect(); +// assert!(items.contains(&1), "items: {:?}", items); +// assert!(items.contains(&2), "items: {:?}", items); +// assert!(items.contains(&3), "items: {:?}", items); +// assert!(items.contains(&4), "items: {:?}", items); +// }); +// } diff --git a/tokio/src/net/driver/reactor/dispatch/tests/loom/small_slab.rs b/tokio/src/net/driver/reactor/dispatch/tests/loom/small_slab.rs new file mode 100644 index 00000000000..108f3b54b91 --- /dev/null +++ b/tokio/src/net/driver/reactor/dispatch/tests/loom/small_slab.rs @@ -0,0 +1,473 @@ +use super::test_util; +use loom::sync::{Arc, Condvar, Mutex}; +use loom::thread; + +use pack::{Pack, WIDTH}; +use sharded_slab::Shard; +use sharded_slab::Slab; +use tid::Tid; + +// Overridden for tests +const INITIAL_PAGE_SIZE: usize = 2; +const MAX_PAGES: usize = 1; + +// Constants not overridden +#[cfg(target_pointer_width = "64")] +const MAX_THREADS: usize = 4096; +#[cfg(target_pointer_width = "32")] +const MAX_THREADS: usize = 2048; +const RESERVED_BITS: usize = 5; + +#[path = "../../page/mod.rs"] +#[allow(dead_code)] +mod page; + +#[path = "../../pack.rs"] +#[allow(dead_code)] +mod pack; + +#[path = "../../iter.rs"] +#[allow(dead_code)] +mod iter; + +#[path = "../../sharded_slab.rs"] +#[allow(dead_code)] +mod 
sharded_slab; + +#[path = "../../tid.rs"] +#[allow(dead_code)] +mod tid; + +fn store_val(slab: &Arc, readiness: usize) -> usize { + println!("store: {}", readiness); + let key = slab.alloc().expect("allocate slot"); + if let Some(slot) = slab.get(key) { + slot.set_readiness(key, |_| readiness) + .expect("generation should still be valid!"); + } else { + panic!("slab did not contain a value for {:#x}", key); + } + key +} + +fn get_val(slab: &Arc, key: usize) -> Option { + slab.get(key).and_then(|s| { + let rdy = s.get_readiness(key); + test_println!("--> got readiness {:?} with key {:#x}", rdy, key); + rdy + }) +} + +fn store_when_free(slab: &Arc, readiness: usize) -> usize { + test_println!("store: {}", readiness); + let key = loop { + if let Some(key) = slab.alloc() { + break key; + } + test_println!("-> full; retry"); + thread::yield_now(); + }; + if let Some(slot) = slab.get(key) { + slot.set_readiness(key, |_| readiness) + .expect("generation should still be valid!"); + } else { + panic!("slab did not contain a value for {:#x}", key); + } + key +} + +#[test] +fn remove_remote_and_reuse() { + let mut model = loom::model::Builder::new(); + model.max_branches = 100000; + test_util::run_builder("remove_remote_and_reuse", model, || { + let slab = Arc::new(Slab::new()); + + let idx1 = store_val(&slab, 1); + let idx2 = store_val(&slab, 2); + + assert_eq!(get_val(&slab, idx1), Some(1)); + assert_eq!(get_val(&slab, idx2), Some(2)); + + let s = slab.clone(); + let t1 = thread::spawn(move || { + s.remove(idx1); + let value = get_val(&s, idx1); + + // We may or may not see the new value yet, depending on when + // this occurs, but we must either see the new value or `None`; + // the old value has been removed! + assert!(value == None || value == Some(3)); + }); + + let idx3 = store_when_free(&slab, 3); + t1.join().expect("thread 1 should not panic"); + + assert_eq!(get_val(&slab, idx3), Some(3)); + assert_eq!(get_val(&slab, idx2), Some(2)); + }); +} + +#[test] +fn concurrent_remove_remote_and_reuse() { + let mut model = loom::model::Builder::new(); + model.max_branches = 100000; + // set a preemption bound, or else this will run for a *really* long time. + model.preemption_bound = Some(2); // chosen arbitrarily. 
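+ // (bounding preemptions limits how many forced context switches loom explores, + // keeping the number of interleavings manageable at the cost of exhaustiveness)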
+ test_util::run_builder("concurrent_remove_remote_and_reuse", model, || { + let slab = Arc::new(Slab::new()); + + let idx1 = store_val(&slab, 1); + let idx2 = store_val(&slab, 2); + + assert_eq!(get_val(&slab, idx1), Some(1)); + assert_eq!(get_val(&slab, idx2), Some(2)); + + let s = slab.clone(); + let s2 = slab.clone(); + let t1 = thread::spawn(move || { + s.remove(idx1); + }); + + let t2 = thread::spawn(move || { + s2.remove(idx2); + }); + + let idx3 = store_when_free(&slab, 3); + t1.join().expect("thread 1 should not panic"); + t2.join().expect("thread 1 should not panic"); + + assert!(get_val(&slab, idx1).is_none()); + assert!(get_val(&slab, idx2).is_none()); + assert_eq!(get_val(&slab, idx3), Some(3)); + }); +} + +mod single_shard { + use super::sharded_slab::SingleShard; + use super::*; + + fn store_val(slab: &Arc, readiness: usize) -> usize { + println!("store: {}", readiness); + let key = slab.alloc().expect("allocate slot"); + if let Some(slot) = slab.get(key) { + slot.set_readiness(key, |_| readiness) + .expect("generation should still be valid!"); + } else { + panic!("slab did not contain a value for {:#x}", key); + } + key + } + + fn get_val(slab: &Arc, key: usize) -> Option { + slab.get(key).and_then(|s| { + let rdy = s.get_readiness(key); + test_println!("--> got readiness {:?} with key {:#x}", rdy, key); + rdy + }) + } + + fn store_when_free(slab: &Arc, readiness: usize) -> usize { + test_println!("store: {}", readiness); + let key = loop { + if let Some(key) = slab.alloc() { + break key; + } + test_println!("-> full; retry"); + thread::yield_now(); + }; + if let Some(slot) = slab.get(key) { + slot.set_readiness(key, |_| readiness) + .expect("generation should still be valid!"); + } else { + panic!("slab did not contain a value for {:#x}", key); + } + key + } + + #[test] + fn remove_remote_and_reuse() { + let mut model = loom::model::Builder::new(); + model.max_branches = 100000; + test_util::run_builder("single_shard::remove_remote_and_reuse", model, || { + let slab = Arc::new(SingleShard::new()); + + let idx1 = store_val(&slab, 1); + let idx2 = store_val(&slab, 2); + + assert_eq!(get_val(&slab, idx1), Some(1)); + assert_eq!(get_val(&slab, idx2), Some(2)); + + let s = slab.clone(); + let t1 = thread::spawn(move || { + s.remove(idx1); + let value = get_val(&s, idx1); + + // We may or may not see the new value yet, depending on when + // this occurs, but we must either see the new value or `None`; + // the old value has been removed! + assert!(value == None || value == Some(3)); + }); + + let idx3 = store_when_free(&slab, 3); + t1.join().expect("thread 1 should not panic"); + + assert_eq!(get_val(&slab, idx3), Some(3)); + assert_eq!(get_val(&slab, idx2), Some(2)); + }); + } + + #[test] + fn concurrent_remove_remote_and_reuse() { + let mut model = loom::model::Builder::new(); + model.max_branches = 100000; + // set a preemption bound, or else this will run for a *really* long time. + model.preemption_bound = Some(2); // chosen arbitrarily. 
+ test_util::run_builder("single_shard::remove_remote_and_reuse", model, || { + let slab = Arc::new(SingleShard::new()); + + let idx1 = store_val(&slab, 1); + let idx2 = store_val(&slab, 2); + + assert_eq!(get_val(&slab, idx1), Some(1)); + assert_eq!(get_val(&slab, idx2), Some(2)); + + let s = slab.clone(); + let s2 = slab.clone(); + let t1 = thread::spawn(move || { + s.remove(idx1); + }); + + let t2 = thread::spawn(move || { + s2.remove(idx2); + }); + + let idx3 = store_when_free(&slab, 3); + t1.join().expect("thread 1 should not panic"); + t2.join().expect("thread 1 should not panic"); + + assert!(get_val(&slab, idx1).is_none()); + assert!(get_val(&slab, idx2).is_none()); + assert_eq!(get_val(&slab, idx3), Some(3)); + }); + } + + #[test] + fn alloc_remove_get() { + test_util::run_model("single_shard::alloc_remove_get", || { + let slab = Arc::new(SingleShard::new()); + let pair = Arc::new((Mutex::new(None), Condvar::new())); + + let slab2 = slab.clone(); + let pair2 = pair.clone(); + let t1 = thread::spawn(move || { + let slab = slab2; + let (lock, cvar) = &*pair2; + // allocate one entry just so that we have to use the final one for + // all future allocations. + let _key0 = store_val(&slab, 0); + let key = store_val(&slab, 1); + + let mut next = lock.lock().unwrap(); + *next = Some(key); + cvar.notify_one(); + // remove the second entry + slab.remove(key); + // store a new readiness at the same location (since the slab + // already has an entry in slot 0) + store_val(&slab, 2); + }); + + let (lock, cvar) = &*pair; + // wait for the second entry to be stored... + let mut next = lock.lock().unwrap(); + while next.is_none() { + next = cvar.wait(next).unwrap(); + } + let key = next.unwrap(); + + // our generation will be stale when the second store occurs at that + // index, we must not see the value of that store. + let val = get_val(&slab, key); + assert_ne!(val, Some(2), "generation must have advanced!"); + + t1.join().unwrap(); + }) + } + + #[test] + fn alloc_remove_set() { + test_util::run_model("single_shard::alloc_remove_set", || { + let slab = Arc::new(SingleShard::new()); + let pair = Arc::new((Mutex::new(None), Condvar::new())); + + let slab2 = slab.clone(); + let pair2 = pair.clone(); + let t1 = thread::spawn(move || { + let slab = slab2; + let (lock, cvar) = &*pair2; + // allocate one entry just so that we have to use the final one for + // all future allocations. + let _key0 = store_val(&slab, 0); + let key = store_val(&slab, 1); + + let mut next = lock.lock().unwrap(); + *next = Some(key); + cvar.notify_one(); + + slab.remove(key); + // remove the old entry and insert a new one, with a new generation. + let key2 = slab.alloc().expect("store key 2"); + // after the remove, we must not see the value written with the + // stale index. + assert_eq!( + get_val(&slab, key), + None, + "stale set must no longer be visible" + ); + assert_eq!(get_val(&slab, key2), Some(0)); + key2 + }); + + let (lock, cvar) = &*pair; + + // wait for the second entry to be stored. the index we get from the + // other thread may become stale after a write. + let mut next = lock.lock().unwrap(); + while next.is_none() { + next = cvar.wait(next).unwrap(); + } + let key = next.unwrap(); + + // try to write to the index with our generation + slab.get(key).map(|val| val.set_readiness(key, |_| 2)); + + let key2 = t1.join().unwrap(); + // after the remove, we must not see the value written with the + // stale index either. 
+ assert_eq!( + get_val(&slab, key), + None, + "stale set must no longer be visible" + ); + assert_eq!(get_val(&slab, key2), Some(0)); + }) + } +} + +#[test] +fn alloc_remove_get() { + test_util::run_model("alloc_remove_get", || { + let slab = Arc::new(Slab::new()); + let pair = Arc::new((Mutex::new(None), Condvar::new())); + + let slab2 = slab.clone(); + let pair2 = pair.clone(); + let t1 = thread::spawn(move || { + let slab = slab2; + let (lock, cvar) = &*pair2; + // allocate one entry just so that we have to use the final one for + // all future allocations. + let _key0 = store_val(&slab, 0); + let key = store_val(&slab, 1); + + let mut next = lock.lock().unwrap(); + *next = Some(key); + cvar.notify_one(); + // remove the second entry + slab.remove(key); + // store a new readiness at the same location (since the slab + // already has an entry in slot 0) + store_val(&slab, 2); + }); + + let (lock, cvar) = &*pair; + // wait for the second entry to be stored... + let mut next = lock.lock().unwrap(); + while next.is_none() { + next = cvar.wait(next).unwrap(); + } + let key = next.unwrap(); + + // our generation will be stale when the second store occurs at that + // index, we must not see the value of that store. + let val = get_val(&slab, key); + assert_ne!(val, Some(2), "generation must have advanced!"); + + t1.join().unwrap(); + }) +} + +#[test] +fn alloc_remove_set() { + test_util::run_model("alloc_remove_set", || { + let slab = Arc::new(Slab::new()); + let pair = Arc::new((Mutex::new(None), Condvar::new())); + + let slab2 = slab.clone(); + let pair2 = pair.clone(); + let t1 = thread::spawn(move || { + let slab = slab2; + let (lock, cvar) = &*pair2; + // allocate one entry just so that we have to use the final one for + // all future allocations. + let _key0 = store_val(&slab, 0); + let key = store_val(&slab, 1); + + let mut next = lock.lock().unwrap(); + *next = Some(key); + cvar.notify_one(); + + slab.remove(key); + // remove the old entry and insert a new one, with a new generation. + let key2 = slab.alloc().expect("store key 2"); + // after the remove, we must not see the value written with the + // stale index. + assert_eq!( + get_val(&slab, key), + None, + "stale set must no longer be visible" + ); + assert_eq!(get_val(&slab, key2), Some(0)); + key2 + }); + + let (lock, cvar) = &*pair; + + // wait for the second entry to be stored. the index we get from the + // other thread may become stale after a write. + let mut next = lock.lock().unwrap(); + while next.is_none() { + next = cvar.wait(next).unwrap(); + } + let key = next.unwrap(); + + // try to write to the index with our generation + slab.get(key).map(|val| val.set_readiness(key, |_| 2)); + + let key2 = t1.join().unwrap(); + // after the remove, we must not see the value written with the + // stale index either. 
+ assert_eq!( + get_val(&slab, key), + None, + "stale set must no longer be visible" + ); + assert_eq!(get_val(&slab, key2), Some(0)); + }) +} + +// #[test] +// fn custom_page_sz() { +// let mut model = loom::model::Builder::new(); +// model.max_branches = 100000; +// model.check(|| { +// let slab = Arc::new(Slab::new()); + +// for i in 0..1024 { +// test_println!("{}", i); +// let k = store_val(&slab, i); +// assert_eq!(get_val(&slab, k), Some(i)); +// } +// }); +// } diff --git a/tokio/src/net/driver/reactor/dispatch/tests/mod.rs b/tokio/src/net/driver/reactor/dispatch/tests/mod.rs new file mode 100644 index 00000000000..65ead8ef252 --- /dev/null +++ b/tokio/src/net/driver/reactor/dispatch/tests/mod.rs @@ -0,0 +1,30 @@ +mod idx { + use super::super::{page, Pack, Tid}; + use proptest::prelude::*; + + proptest! { + #[test] + fn tid_roundtrips(tid in 0usize..Tid::BITS) { + let tid = Tid::from_usize(tid); + let packed = tid.pack(0); + assert_eq!(tid, Tid::from_packed(packed)); + } + + #[test] + fn idx_roundtrips( + tid in 0usize..Tid::BITS, + addr in 0usize..page::Addr::BITS, + ) { + let tid = Tid::from_usize(tid); + let addr = page::Addr::from_usize(addr); + let packed = tid.pack(addr.pack(0)); + assert_eq!(addr, page::Addr::from_packed(packed)); + assert_eq!(tid, Tid::from_packed(packed)); + } + } +} + +#[cfg(loom)] +mod loom; +#[cfg(loom)] +pub(super) use self::loom::test_util; diff --git a/tokio/src/net/driver/reactor/dispatch/tid.rs b/tokio/src/net/driver/reactor/dispatch/tid.rs new file mode 100644 index 00000000000..6d0e1c7a874 --- /dev/null +++ b/tokio/src/net/driver/reactor/dispatch/tid.rs @@ -0,0 +1,168 @@ +use super::{page, Pack}; +use std::{ + cell::{Cell, UnsafeCell}, + collections::VecDeque, + fmt, + marker::PhantomData, + sync::{ + atomic::{AtomicUsize, Ordering}, + Mutex, + }, +}; + +use lazy_static::lazy_static; + +/// Uniquely identifies a thread. +#[derive(PartialEq, Eq, Copy, Clone)] +pub(crate) struct Tid { + id: usize, + _not_send: PhantomData>, +} + +/// Registers that a thread is currently using a thread ID. +/// +/// This is stored in a thread local on each thread that has been assigned an +/// ID. When the thread terminates, the thread local is dropped, indicating that +/// that thread's ID number may be reused. This is to avoid exhausting the +/// available bits for thread IDs in scenarios where threads are spawned and +/// terminated very frequently. +#[derive(Debug)] +struct Registration(Cell>); + +/// Tracks any thread IDs that can be reused, and a monotonic counter for +/// generating new thread IDs. +struct Registry { + /// The next thread ID number; used when there are no free IDs. + next: AtomicUsize, + /// A queue of thread IDs whose threads have terminated. These will be + /// reused if possible. + free: Mutex>, +} + +lazy_static! { + static ref REGISTRY: Registry = Registry { + next: AtomicUsize::new(0), + free: Mutex::new(VecDeque::new()), + }; +} +loom_thread_local! 
{ + static REGISTRATION: Registration = Registration::new(); +} + +// === impl Tid === + +impl Pack for Tid { + const LEN: usize = super::MAX_THREADS.trailing_zeros() as usize + 1; + + type Prev = page::Addr; + + #[inline(always)] + fn as_usize(&self) -> usize { + self.id + } + + #[inline(always)] + fn from_usize(id: usize) -> Self { + debug_assert!(id <= Self::BITS); + Self { + id, + _not_send: PhantomData, + } + } +} + +impl Tid { + #[inline] + pub(crate) fn current() -> Self { + REGISTRATION + .try_with(Registration::current) + .unwrap_or_else(|_| Self::poisoned()) + } + + pub(crate) fn is_current(self) -> bool { + REGISTRATION + .try_with(|r| self == r.current()) + .unwrap_or(false) + } + + #[inline(always)] + pub(crate) fn new(id: usize) -> Self { + Self { + id, + _not_send: PhantomData, + } + } + + #[cold] + fn poisoned() -> Self { + Self { + id: std::usize::MAX, + _not_send: PhantomData, + } + } + + /// Returns true if the local thread ID was accessed while unwinding. + pub(crate) fn is_poisoned(self) -> bool { + self.id == std::usize::MAX + } +} + +impl fmt::Debug for Tid { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.is_poisoned() { + f.debug_tuple("Tid") + .field(&format_args!("")) + .finish() + } else { + f.debug_tuple("Tid") + .field(&format_args!("{:#x}", self.id)) + .finish() + } + } +} + +// === impl Registration === + +impl Registration { + fn new() -> Self { + Self(Cell::new(None)) + } + + #[inline(always)] + fn current(&self) -> Tid { + if let Some(tid) = self.0.get().map(Tid::new) { + tid + } else { + self.register() + } + } + + #[cold] + fn register(&self) -> Tid { + let id = REGISTRY + .free + .lock() + .ok() + .and_then(|mut free| { + if free.len() > 1 { + free.pop_front() + } else { + None + } + }) + .unwrap_or_else(|| REGISTRY.next.fetch_add(1, Ordering::AcqRel)); + debug_assert!(id <= Tid::BITS, "thread ID overflow!"); + self.0.set(Some(id)); + Tid::new(id) + } +} + +impl Drop for Registration { + fn drop(&mut self) { + if let Some(id) = self.0.get() { + if let Ok(mut free) = REGISTRY.free.lock() { + free.push_back(id); + } + } + } +} diff --git a/tokio/src/net/driver/reactor.rs b/tokio/src/net/driver/reactor/mod.rs similarity index 68% rename from tokio/src/net/driver/reactor.rs rename to tokio/src/net/driver/reactor/mod.rs index 384abe470b0..44443d3524e 100644 --- a/tokio/src/net/driver/reactor.rs +++ b/tokio/src/net/driver/reactor/mod.rs @@ -1,21 +1,21 @@ use super::platform; +use crate::loom::atomic::{AtomicUsize, Ordering::SeqCst}; -use tokio_executor::park::{Park, Unpark}; -use tokio_sync::AtomicWaker; +mod dispatch; +use dispatch::SingleShard; +pub(crate) use dispatch::MAX_SOURCES; use mio::event::Evented; -use slab::Slab; use std::cell::RefCell; use std::io; use std::marker::PhantomData; #[cfg(all(unix, not(target_os = "fuchsia")))] use std::os::unix::io::{AsRawFd, RawFd}; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::{Relaxed, SeqCst}; -use std::sync::{Arc, RwLock, Weak}; +use std::sync::{Arc, Weak}; use std::task::Waker; use std::time::Duration; use std::{fmt, usize}; +use tokio_executor::park::{Park, Unpark}; /// The core reactor, or event loop. /// @@ -56,23 +56,18 @@ pub(super) struct Inner { /// The underlying system event queue. io: mio::Poll, - /// ABA guard counter - next_aba_guard: AtomicUsize, - /// Dispatch slabs for I/O and futures events - pub(super) io_dispatch: RwLock>, + // TODO(eliza): once worker threads are available, replace this with a + // properly sharded slab. 
+ pub(super) io_dispatch: SingleShard, + + /// The number of sources in `io_dispatch`. + n_sources: AtomicUsize, /// Used to wake up the reactor from a call to `turn` wakeup: mio::SetReadiness, } -pub(super) struct ScheduledIo { - aba_guard: usize, - pub(super) readiness: AtomicUsize, - pub(super) reader: AtomicWaker, - pub(super) writer: AtomicWaker, -} - #[derive(Debug, Eq, PartialEq, Clone, Copy)] pub(super) enum Direction { Read, @@ -84,10 +79,6 @@ thread_local! { static CURRENT_REACTOR: RefCell> = RefCell::new(None) } -const TOKEN_SHIFT: usize = 22; - -// Kind of arbitrary, but this reserves some token space for later usage. -const MAX_SOURCES: usize = (1 << TOKEN_SHIFT) - 1; const TOKEN_WAKEUP: mio::Token = mio::Token(MAX_SOURCES); fn _assert_kinds() { @@ -151,8 +142,8 @@ impl Reactor { _wakeup_registration: wakeup_pair.0, inner: Arc::new(Inner { io, - next_aba_guard: AtomicUsize::new(0), - io_dispatch: RwLock::new(Slab::with_capacity(1)), + io_dispatch: SingleShard::new(), + n_sources: AtomicUsize::new(0), wakeup: wakeup_pair.1, }), }) @@ -206,7 +197,7 @@ impl Reactor { /// Idle is defined as all tasks that have been spawned have completed, /// either successfully or with an error. pub fn is_idle(&self) -> bool { - self.inner.io_dispatch.read().unwrap().is_empty() + self.inner.n_sources.load(SeqCst) == 0 } fn poll(&mut self, max_wait: Option) -> io::Result<()> { @@ -236,35 +227,28 @@ impl Reactor { } fn dispatch(&self, token: mio::Token, ready: mio::Ready) { - let aba_guard = token.0 & !MAX_SOURCES; - let token = token.0 & MAX_SOURCES; - let mut rd = None; let mut wr = None; - // Create a scope to ensure that notifying the tasks stays out of the - // lock's critical section. - { - let io_dispatch = self.inner.io_dispatch.read().unwrap(); - - let io = match io_dispatch.get(token) { - Some(io) => io, - None => return, - }; - - if aba_guard != io.aba_guard { - return; - } + let io = match self.inner.io_dispatch.get(token.0) { + Some(io) => io, + None => return, + }; - io.readiness.fetch_or(ready.as_usize(), Relaxed); + if io + .set_readiness(token.0, |curr| curr | ready.as_usize()) + .is_err() + { + // token no longer valid! + return; + } - if ready.is_writable() || platform::is_hup(ready) { - wr = io.writer.take_waker(); - } + if ready.is_writable() || platform::is_hup(ready) { + wr = io.writer.take_waker(); + } - if !(ready & (!mio::Ready::writable())).is_empty() { - rd = io.reader.take_waker(); - } + if !(ready & (!mio::Ready::writable())).is_empty() { + rd = io.reader.take_waker(); } if let Some(w) = rd { @@ -363,31 +347,13 @@ impl Inner { /// /// The registration token is returned. 
pub(super) fn add_source(&self, source: &dyn Evented) -> io::Result { - // Get an ABA guard value - let aba_guard = self.next_aba_guard.fetch_add(1 << TOKEN_SHIFT, Relaxed); - - let key = { - // Block to contain the write lock - let mut io_dispatch = self.io_dispatch.write().unwrap(); - - if io_dispatch.len() == MAX_SOURCES { - return Err(io::Error::new( - io::ErrorKind::Other, - "reactor at max \ - registered I/O resources", - )); - } - - io_dispatch.insert(ScheduledIo { - aba_guard, - readiness: AtomicUsize::new(0), - reader: AtomicWaker::new(), - writer: AtomicWaker::new(), - }) - }; - - let token = aba_guard | key; - + let token = self.io_dispatch.alloc().ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + "reactor at max registered I/O resources", + ) + })?; + self.n_sources.fetch_add(1, SeqCst); self.io.register( source, mio::Token(token), @@ -395,7 +361,7 @@ impl Inner { mio::PollOpt::edge(), )?; - Ok(key) + Ok(token) } /// Deregisters an I/O resource from the reactor. @@ -404,13 +370,19 @@ impl Inner { } pub(super) fn drop_source(&self, token: usize) { - self.io_dispatch.write().unwrap().remove(token); + self.io_dispatch.remove(token); + self.n_sources.fetch_sub(1, SeqCst); } /// Registers interest in the I/O resource associated with `token`. pub(super) fn register(&self, token: usize, dir: Direction, w: Waker) { - let io_dispatch = self.io_dispatch.read().unwrap(); - let sched = io_dispatch.get(token).unwrap(); + let sched = self + .io_dispatch + .get(token) + .unwrap_or_else(|| panic!("IO resource for token {} does not exist!", token)); + let readiness = sched + .get_readiness(token) + .unwrap_or_else(|| panic!("token {} no longer valid!", token)); let (waker, ready) = match dir { Direction::Read => (&sched.reader, !mio::Ready::writable()), @@ -418,8 +390,7 @@ impl Inner { }; waker.register(w); - - if sched.readiness.load(SeqCst) & ready.as_usize() != 0 { + if readiness & ready.as_usize() != 0 { waker.wake(); } } @@ -430,8 +401,7 @@ impl Drop for Inner { // When a reactor is dropped it needs to wake up all blocked tasks as // they'll never receive a notification, and all connected I/O objects // will start returning errors pretty quickly. - let io = self.io_dispatch.read().unwrap(); - for (_, io) in io.iter() { + for io in self.io_dispatch.unique_iter() { io.writer.wake(); io.reader.wake(); } @@ -449,3 +419,111 @@ impl Direction { } } } + +#[cfg(all(test, loom))] +mod tests { + use super::*; + use loom::thread; + + // No-op `Evented` impl just so we can have something to pass to `add_source`. 
+ struct NotEvented; + + impl Evented for NotEvented { + fn register( + &self, + _: &mio::Poll, + _: mio::Token, + _: mio::Ready, + _: mio::PollOpt, + ) -> io::Result<()> { + Ok(()) + } + + fn reregister( + &self, + _: &mio::Poll, + _: mio::Token, + _: mio::Ready, + _: mio::PollOpt, + ) -> io::Result<()> { + Ok(()) + } + + fn deregister(&self, _: &mio::Poll) -> io::Result<()> { + Ok(()) + } + } + + #[test] + fn tokens_unique_when_dropped() { + loom::model(|| { + println!("\n--- iteration ---\n"); + let reactor = Reactor::new().unwrap(); + let inner = reactor.inner; + let inner2 = inner.clone(); + + let token_1 = inner.add_source(&NotEvented).unwrap(); + println!("token 1: {:#x}", token_1); + let thread = thread::spawn(move || { + inner2.drop_source(token_1); + println!("dropped: {:#x}", token_1); + }); + + let token_2 = inner.add_source(&NotEvented).unwrap(); + println!("token 2: {:#x}", token_2); + thread.join().unwrap(); + + assert!(token_1 != token_2); + }) + } + + #[test] + fn tokens_unique_when_dropped_on_full_page() { + loom::model(|| { + println!("\n--- iteration ---\n"); + let reactor = Reactor::new().unwrap(); + let inner = reactor.inner; + let inner2 = inner.clone(); + // add sources to fill up the first page so that the dropped index + // may be reused. + for _ in 0..31 { + inner.add_source(&NotEvented).unwrap(); + } + + let token_1 = inner.add_source(&NotEvented).unwrap(); + println!("token 1: {:#x}", token_1); + let thread = thread::spawn(move || { + inner2.drop_source(token_1); + println!("dropped: {:#x}", token_1); + }); + + let token_2 = inner.add_source(&NotEvented).unwrap(); + println!("token 2: {:#x}", token_2); + thread.join().unwrap(); + + assert!(token_1 != token_2); + }) + } + + #[test] + fn tokens_unique_concurrent_add() { + loom::model(|| { + println!("\n--- iteration ---\n"); + let reactor = Reactor::new().unwrap(); + let inner = reactor.inner; + let inner2 = inner.clone(); + + let thread = thread::spawn(move || { + let token_2 = inner2.add_source(&NotEvented).unwrap(); + println!("token 2: {:#x}", token_2); + token_2 + }); + + let token_1 = inner.add_source(&NotEvented).unwrap(); + println!("token 1: {:#x}", token_1); + let token_2 = thread.join().unwrap(); + + assert!(token_1 != token_2); + }) + } +} diff --git a/tokio/src/net/driver/registration.rs b/tokio/src/net/driver/registration.rs index 91cceef6e97..8e946d5a94f 100644 --- a/tokio/src/net/driver/registration.rs +++ b/tokio/src/net/driver/registration.rs @@ -2,7 +2,6 @@ use super::platform; use super::reactor::{Direction, Handle}; use mio::{self, Evented}; -use std::sync::atomic::Ordering::SeqCst; use std::task::{Context, Poll}; use std::{io, usize}; @@ -219,8 +218,7 @@ impl Registration { let mask = direction.mask(); let mask_no_hup = (mask - platform::hup()).as_usize(); - let io_dispatch = inner.io_dispatch.read().unwrap(); - let sched = &io_dispatch[self.token]; + let sched = inner.io_dispatch.get(self.token).unwrap(); // This consumes the current readiness state **except** for HUP. HUP is // excluded because a) it is a final state and never transitions out of @@ -230,8 +228,10 @@ impl Registration { // If HUP were to be cleared when `direction` is `Read`, then when // `poll_ready` is called again with a _`direction` of `Write`, the HUP // state would not be visible. 
- let mut ready = - mask & mio::Ready::from_usize(sched.readiness.fetch_and(!mask_no_hup, SeqCst)); + let curr_ready = sched + .set_readiness(self.token, |curr| curr & (!mask_no_hup)) + .unwrap_or_else(|_| panic!("token {} no longer valid!", self.token)); + let mut ready = mask & mio::Ready::from_usize(curr_ready); if ready.is_empty() { if let Some(cx) = cx { @@ -242,8 +242,10 @@ impl Registration { } // Try again - ready = - mask & mio::Ready::from_usize(sched.readiness.fetch_and(!mask_no_hup, SeqCst)); + let curr_ready = sched + .set_readiness(self.token, |curr| curr & (!mask_no_hup)) + .unwrap_or_else(|_| panic!("token {} no longer valid!", self.token)); + ready = mask & mio::Ready::from_usize(curr_ready); } } diff --git a/tokio/src/net/driver/sharded_rwlock.rs b/tokio/src/net/driver/sharded_rwlock.rs deleted file mode 100644 index 678924812b5..00000000000 --- a/tokio/src/net/driver/sharded_rwlock.rs +++ /dev/null @@ -1,217 +0,0 @@ -//! A scalable reader-writer lock. -//! -//! This implementation makes read operations faster and more scalable due to less contention, -//! while making write operations slower. It also incurs much higher memory overhead than -//! traditional reader-writer locks. - -use crossbeam_utils::CachePadded; -use lazy_static::lazy_static; -use num_cpus; -use parking_lot; -use std::cell::UnsafeCell; -use std::collections::HashMap; -use std::marker::PhantomData; -use std::mem; -use std::ops::{Deref, DerefMut}; -use std::sync::Mutex; -use std::thread::{self, ThreadId}; - -/// A scalable read-writer lock. -/// -/// This type of lock allows a number of readers or at most one writer at any point in time. The -/// write portion of this lock typically allows modification of the underlying data (exclusive -/// access) and the read portion of this lock typically allows for read-only access (shared -/// access). -/// -/// This reader-writer lock differs from typical implementations in that it internally creates a -/// list of reader-writer locks called 'shards'. Shards are aligned and padded to the cache line -/// size. -/// -/// Read operations lock only one shard specific to the current thread, while write operations lock -/// every shard in succession. This strategy makes concurrent read operations faster due to less -/// contention, but write operations are slower due to increased amount of locking. -pub(crate) struct RwLock { - /// A list of locks protecting the internal data. - shards: Vec>>, - - /// The internal data. - value: UnsafeCell, -} - -unsafe impl Send for RwLock {} -unsafe impl Sync for RwLock {} - -impl RwLock { - /// Creates a new `RwLock` initialized with `value`. - pub(crate) fn new(value: T) -> RwLock { - // The number of shards is a power of two so that the modulo operation in `read` becomes a - // simple bitwise "and". - let num_shards = num_cpus::get().next_power_of_two(); - - RwLock { - shards: (0..num_shards) - .map(|_| CachePadded::new(parking_lot::RwLock::new(()))) - .collect(), - value: UnsafeCell::new(value), - } - } - - /// Locks this `RwLock` with shared read access, blocking the current thread until it can be - /// acquired. - /// - /// The calling thread will be blocked until there are no more writers which hold the lock. - /// There may be other readers currently inside the lock when this method returns. This method - /// does not provide any guarantees with respect to the ordering of whether contentious readers - /// or writers will acquire the lock first. 
- /// - /// Returns an RAII guard which will release this thread's shared access once it is dropped. - pub(crate) fn read(&self) -> RwLockReadGuard<'_, T> { - // Take the current thread index and map it to a shard index. Thread indices will tend to - // distribute shards among threads equally, thus reducing contention due to read-locking. - let shard_index = thread_index() & (self.shards.len() - 1); - - RwLockReadGuard { - parent: self, - _guard: self.shards[shard_index].read(), - _marker: PhantomData, - } - } - - /// Locks this rwlock with exclusive write access, blocking the current thread until it can be - /// acquired. - /// - /// This function will not return while other writers or other readers currently have access to - /// the lock. - /// - /// Returns an RAII guard which will drop the write access of this rwlock when dropped. - pub(crate) fn write(&self) -> RwLockWriteGuard<'_, T> { - // Write-lock each shard in succession. - for shard in &self.shards { - // The write guard is forgotten, but the lock will be manually unlocked in `drop`. - mem::forget(shard.write()); - } - - RwLockWriteGuard { - parent: self, - _marker: PhantomData, - } - } -} - -/// A guard used to release the shared read access of a `RwLock` when dropped. -pub(crate) struct RwLockReadGuard<'a, T> { - parent: &'a RwLock, - _guard: parking_lot::RwLockReadGuard<'a, ()>, - _marker: PhantomData>, -} - -unsafe impl<'a, T: Sync> Sync for RwLockReadGuard<'a, T> {} - -impl<'a, T> Deref for RwLockReadGuard<'a, T> { - type Target = T; - - fn deref(&self) -> &T { - unsafe { &*self.parent.value.get() } - } -} - -/// A guard used to release the exclusive write access of a `RwLock` when dropped. -pub(crate) struct RwLockWriteGuard<'a, T> { - parent: &'a RwLock, - _marker: PhantomData>, -} - -unsafe impl<'a, T: Sync> Sync for RwLockWriteGuard<'a, T> {} - -impl<'a, T> Drop for RwLockWriteGuard<'a, T> { - fn drop(&mut self) { - // Unlock the shards in reverse order of locking. - for shard in self.parent.shards.iter().rev() { - unsafe { - shard.force_unlock_write(); - } - } - } -} - -impl<'a, T> Deref for RwLockWriteGuard<'a, T> { - type Target = T; - - fn deref(&self) -> &T { - unsafe { &*self.parent.value.get() } - } -} - -impl<'a, T> DerefMut for RwLockWriteGuard<'a, T> { - fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.parent.value.get() } - } -} - -/// Returns a `usize` that identifies the current thread. -/// -/// Each thread is associated with an 'index'. Indices usually tend to be consecutive numbers -/// between 0 and the number of running threads, but there are no guarantees. During TLS teardown -/// the associated index might change. -#[inline] -pub(crate) fn thread_index() -> usize { - REGISTRATION.try_with(|reg| reg.index).unwrap_or(0) -} - -/// The global registry keeping track of registered threads and indices. -struct ThreadIndices { - /// Mapping from `ThreadId` to thread index. - mapping: HashMap, - - /// A list of free indices. - free_list: Vec, - - /// The next index to allocate if the free list is empty. - next_index: usize, -} - -lazy_static! { - static ref THREAD_INDICES: Mutex = Mutex::new(ThreadIndices { - mapping: HashMap::new(), - free_list: Vec::new(), - next_index: 0, - }); -} - -/// A registration of a thread with an index. -/// -/// When dropped, unregisters the thread and frees the reserved index. 
-struct Registration { - index: usize, - thread_id: ThreadId, -} - -impl Drop for Registration { - fn drop(&mut self) { - let mut indices = THREAD_INDICES.lock().unwrap(); - indices.mapping.remove(&self.thread_id); - indices.free_list.push(self.index); - } -} - -thread_local! { - static REGISTRATION: Registration = { - let thread_id = thread::current().id(); - let mut indices = THREAD_INDICES.lock().unwrap(); - - let index = match indices.free_list.pop() { - Some(i) => i, - None => { - let i = indices.next_index; - indices.next_index += 1; - i - } - }; - indices.mapping.insert(thread_id, index); - - Registration { - index, - thread_id, - } - }; -} diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs index 74ddacbc309..1a08c428985 100644 --- a/tokio/src/process/mod.rs +++ b/tokio/src/process/mod.rs @@ -938,7 +938,7 @@ mod sys { } } -#[cfg(test)] +#[cfg(all(test, not(loom)))] mod test { use std::future::Future; use std::io; diff --git a/tokio/src/process/unix/orphan.rs b/tokio/src/process/unix/orphan.rs index c02c903fa5d..7a52575d5aa 100644 --- a/tokio/src/process/unix/orphan.rs +++ b/tokio/src/process/unix/orphan.rs @@ -87,7 +87,7 @@ impl OrphanQueue for AtomicOrphanQueue { } } -#[cfg(test)] +#[cfg(all(test, not(loom)))] mod test { use super::Wait; use super::{AtomicOrphanQueue, OrphanQueue}; diff --git a/tokio/src/process/unix/reap.rs b/tokio/src/process/unix/reap.rs index 3a264974de6..631025d4dec 100644 --- a/tokio/src/process/unix/reap.rs +++ b/tokio/src/process/unix/reap.rs @@ -131,7 +131,7 @@ where } } -#[cfg(test)] +#[cfg(all(test, not(loom)))] mod test { use super::*; use futures_core::stream::Stream; diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs index 1b81396620e..7d2fd7a910c 100644 --- a/tokio/src/signal/registry.rs +++ b/tokio/src/signal/registry.rs @@ -175,7 +175,7 @@ where GLOBALS.as_ref() } -#[cfg(test)] +#[cfg(all(test, not(loom)))] mod tests { use super::*; use crate::runtime::current_thread::Runtime; diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index e9f63af80ec..14d48a709a6 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -410,7 +410,7 @@ impl Stream for Signal { } } -#[cfg(test)] +#[cfg(all(test, not(loom)))] mod tests { use super::*; diff --git a/tokio/src/signal/windows.rs b/tokio/src/signal/windows.rs index 79a4aaaba7e..abde334b1b1 100644 --- a/tokio/src/signal/windows.rs +++ b/tokio/src/signal/windows.rs @@ -172,7 +172,7 @@ impl Stream for CtrlBreak { } } -#[cfg(test)] +#[cfg(all(test, not(loom)))] mod tests { use super::*; use crate::runtime::current_thread::Runtime; diff --git a/tokio/src/timer/wheel/level.rs b/tokio/src/timer/wheel/level.rs index a6089664713..256ed4a73ae 100644 --- a/tokio/src/timer/wheel/level.rs +++ b/tokio/src/timer/wheel/level.rs @@ -234,7 +234,7 @@ fn slot_for(duration: u64, level: usize) -> usize { } /* -#[cfg(test)] +#[cfg(all(test, not(loom)))] mod test { use super::*; diff --git a/tokio/src/timer/wheel/mod.rs b/tokio/src/timer/wheel/mod.rs index e46345c62e0..6cac2a8edff 100644 --- a/tokio/src/timer/wheel/mod.rs +++ b/tokio/src/timer/wheel/mod.rs @@ -257,7 +257,7 @@ impl Poll { } } -#[cfg(test)] +#[cfg(all(test, not(loom)))] mod test { use super::*;
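
Note on the new token scheme: the loom models above (`alloc_remove_get`,
`alloc_remove_set`, and the `tokens_unique_*` reactor tests) all check the
property that previously relied on the ABA guard: a token handed out by the
dispatch slab encodes a generation alongside the slot index, so a token that
has been removed can never observe a value written by a later allocation that
reuses the same slot. The sketch below is a deliberately simplified,
single-threaded illustration of that idea only; the `ToySlab` and `Slot`
types, the 32/32-bit split, and the `pack`/`unpack` helpers are invented for
the example and do not match the packed layout built from `Pack`, `Tid`, and
`page::Addr` in this patch.

// Illustrative sketch of generation-guarded slots; not the dispatch slab itself.
struct Slot {
    generation: u32,
    value: Option<u32>,
}

struct ToySlab {
    slots: Vec<Slot>,
}

impl ToySlab {
    fn alloc(&mut self, value: u32) -> u64 {
        // Reuse the first free slot, bumping its generation so that tokens
        // minted before the reuse no longer match.
        for (idx, slot) in self.slots.iter_mut().enumerate() {
            if slot.value.is_none() {
                slot.generation = slot.generation.wrapping_add(1);
                slot.value = Some(value);
                return pack(idx, slot.generation);
            }
        }
        self.slots.push(Slot { generation: 0, value: Some(value) });
        pack(self.slots.len() - 1, 0)
    }

    fn get(&self, token: u64) -> Option<u32> {
        let (idx, generation) = unpack(token);
        let slot = self.slots.get(idx)?;
        // A stale token carries an older generation and is rejected here;
        // this is the behavior that `alloc_remove_get` and `alloc_remove_set`
        // assert under loom.
        if slot.generation != generation {
            return None;
        }
        slot.value
    }

    fn remove(&mut self, token: u64) {
        let (idx, generation) = unpack(token);
        if let Some(slot) = self.slots.get_mut(idx) {
            if slot.generation == generation {
                slot.value = None;
            }
        }
    }
}

// Slot index in the low 32 bits, generation above it (widths chosen
// arbitrarily for the example).
fn pack(idx: usize, generation: u32) -> u64 {
    ((generation as u64) << 32) | idx as u64
}

fn unpack(token: u64) -> (usize, u32) {
    ((token & 0xffff_ffff) as usize, (token >> 32) as u32)
}

fn main() {
    let mut slab = ToySlab { slots: Vec::new() };
    let stale = slab.alloc(1);
    slab.remove(stale);
    let fresh = slab.alloc(2);
    assert_ne!(stale, fresh, "a reused slot gets a new generation");
    assert_eq!(slab.get(stale), None, "the stale token must not see the new value");
    assert_eq!(slab.get(fresh), Some(2));
}

The blanket change from `#[cfg(test)]` to `#[cfg(all(test, not(loom)))]` at the
end of the patch keeps the ordinary unit tests in `process`, `signal`, and the
timer wheel from being compiled when the crate is built with `--cfg loom`,
since those tests spawn real threads and perform real I/O that loom's mocked
primitives cannot model. (The loom runs themselves are presumably driven by
something like `RUSTFLAGS="--cfg loom" cargo test --release`; the exact CI
invocation is not part of this patch.)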