diff --git a/Cargo.toml b/Cargo.toml index 6f0d5e522c..4d9aece317 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,10 @@ [workspace] members = [ -# "futures", + "futures", # "futures-async-runtime", "futures-core", "futures-channel", -# "futures-executor", + "futures-executor", "futures-io", # "futures-macro-async", # "futures-macro-await", diff --git a/futures-channel/Cargo.toml b/futures-channel/Cargo.toml index 046eceda43..c48686f42d 100644 --- a/futures-channel/Cargo.toml +++ b/futures-channel/Cargo.toml @@ -18,5 +18,5 @@ default = ["std"] futures-core = { path = "../futures-core", version = "0.3.0-alpha", default-features = false } [dev-dependencies] -# futures = { path = "../futures", version = "0.2.0", default-features = true } +futures = { path = "../futures", version = "0.3.0-alpha", default-features = true } # futures-executor = { path = "../futures-executor", version = "0.2.0", default-features = true } diff --git a/futures-channel/src/oneshot.rs b/futures-channel/src/oneshot.rs index 0bfc541e70..8d1b86e5bd 100644 --- a/futures-channel/src/oneshot.rs +++ b/futures-channel/src/oneshot.rs @@ -97,7 +97,7 @@ struct Inner { /// # let t = /// thread::spawn(|| { /// let future = c.map(|i| { -/// println!("got: {}", i); +/// println!("got: {:?}", i); /// }); /// // ... /// # return future; diff --git a/futures-channel/tests/channel.rs b/futures-channel/tests_disabled/channel.rs similarity index 100% rename from futures-channel/tests/channel.rs rename to futures-channel/tests_disabled/channel.rs diff --git a/futures-channel/tests/mpsc-close.rs b/futures-channel/tests_disabled/mpsc-close.rs similarity index 100% rename from futures-channel/tests/mpsc-close.rs rename to futures-channel/tests_disabled/mpsc-close.rs diff --git a/futures-channel/tests/mpsc.rs b/futures-channel/tests_disabled/mpsc.rs similarity index 100% rename from futures-channel/tests/mpsc.rs rename to futures-channel/tests_disabled/mpsc.rs diff --git a/futures-channel/tests/oneshot.rs b/futures-channel/tests_disabled/oneshot.rs similarity index 100% rename from futures-channel/tests/oneshot.rs rename to futures-channel/tests_disabled/oneshot.rs diff --git a/futures-core/src/lib.rs b/futures-core/src/lib.rs index 546f685772..16b371cbf5 100644 --- a/futures-core/src/lib.rs +++ b/futures-core/src/lib.rs @@ -63,15 +63,25 @@ macro_rules! unsafe_unpinned { ) } -mod poll; -pub use poll::Poll; +#[macro_export] +macro_rules! pin_mut { + ($($x:ident),*) => { $( + // Move the value to ensure that it is owned + let mut $x = $x; + // Shadow the original binding so that it can't be directly accessed + // ever again. + #[allow(unused_mut)] + let mut $x = unsafe { ::std::mem::PinMut::new_unchecked(&mut $x) }; + )* } +} pub mod future; -pub use future::{Future, TryFuture}; +pub use future::{Future, CoreFutureExt, TryFuture}; pub mod stream; pub use stream::Stream; pub mod task; +pub use task::Poll; pub mod executor; diff --git a/futures-core/src/task/atomic_waker.rs b/futures-core/src/task/atomic_waker.rs index fbb40aff62..d41e94cd5a 100755 --- a/futures-core/src/task/atomic_waker.rs +++ b/futures-core/src/task/atomic_waker.rs @@ -170,7 +170,7 @@ impl AtomicWaker { /// Here is how `register` is used when implementing a flag. 
/// /// ``` - /// # #![feature(pin, arbitrary_self_types)] + /// # #![feature(pin, arbitrary_self_types, futures_api)] /// # use futures_core::{Future, Poll}; /// # use futures_core::task::{self, AtomicWaker}; /// # use std::sync::atomic::AtomicBool; diff --git a/futures-core/src/task/mod.rs b/futures-core/src/task/mod.rs index 557c174d7c..af4f5191b6 100644 --- a/futures-core/src/task/mod.rs +++ b/futures-core/src/task/mod.rs @@ -2,12 +2,16 @@ use Future; -pub use core::task::{UnsafeWake, Waker}; +pub use core::task::{UnsafeWake, Waker, LocalWaker}; + #[cfg(feature = "std")] -pub use std::task::Wake; +pub use std::task::{Wake, local_waker, local_waker_from_nonlocal}; pub use core::task::Context; +mod poll; +pub use self::poll::Poll; + #[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))] mod atomic_waker; #[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))] diff --git a/futures-core/src/poll.rs b/futures-core/src/task/poll.rs similarity index 100% rename from futures-core/src/poll.rs rename to futures-core/src/task/poll.rs diff --git a/futures-executor/Cargo.toml b/futures-executor/Cargo.toml index 36ef8c3dce..4b4b33dc09 100644 --- a/futures-executor/Cargo.toml +++ b/futures-executor/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-executor" -version = "0.2.0" +version = "0.3.0-alpha" authors = ["Alex Crichton "] license = "MIT/Apache-2.0" repository = "https://github.com/rust-lang-nursery/futures-rs" @@ -15,12 +15,12 @@ std = ["num_cpus", "futures-core/std", "futures-util/std", "futures-channel/std" default = ["std"] [dependencies] -futures-core = { path = "../futures-core", version = "0.2.0", default-features = false} -futures-util = { path = "../futures-util", version = "0.2.0", default-features = false} -futures-channel = { path = "../futures-channel", version = "0.2.0", default-features = false} +futures-core = { path = "../futures-core", version = "0.3.0-alpha", default-features = false} +futures-util = { path = "../futures-util", version = "0.3.0-alpha", default-features = false} +futures-channel = { path = "../futures-channel", version = "0.3.0-alpha", default-features = false} num_cpus = { version = "1.0", optional = true } lazy_static = { version = "1.0", optional = true } [dev-dependencies] -futures = { path = "../futures", version = "0.2.0" } -futures-channel = { path = "../futures-channel", version = "0.2.0" } +futures = { path = "../futures", version = "0.3.0-alpha" } +futures-channel = { path = "../futures-channel", version = "0.3.0-alpha" } diff --git a/futures-executor/src/lib.rs b/futures-executor/src/lib.rs index b76aa237c8..8d47a51121 100644 --- a/futures-executor/src/lib.rs +++ b/futures-executor/src/lib.rs @@ -1,5 +1,7 @@ //! Built-in executors and related tools. +#![feature(pin, arbitrary_self_types, futures_api)] + #![no_std] #![deny(missing_docs)] #![doc(html_root_url = "https://docs.rs/futures-executor/0.2.0")] @@ -8,6 +10,9 @@ #[macro_use] extern crate std; +#[macro_use] +extern crate futures_core; + macro_rules! if_std { ($($i:item)*) => ($( #[cfg(feature = "std")] @@ -19,13 +24,10 @@ if_std! 
{ #[macro_use] extern crate lazy_static; - extern crate futures_core; extern crate futures_util; extern crate futures_channel; extern crate num_cpus; - mod thread; - mod local_pool; pub use local_pool::{block_on, block_on_stream, BlockingStream, LocalPool, LocalExecutor}; diff --git a/futures-executor/src/local_pool.rs b/futures-executor/src/local_pool.rs index e5cf80f447..62848c909d 100644 --- a/futures-executor/src/local_pool.rs +++ b/futures-executor/src/local_pool.rs @@ -1,24 +1,20 @@ use std::prelude::v1::*; use std::cell::{RefCell}; +use std::marker::Unpin; use std::rc::{Rc, Weak}; +use std::sync::Arc; +use std::thread::{self, Thread}; -use futures_core::{Future, Poll, Async, Stream}; -use futures_core::task::{Context, Waker, LocalMap}; -use futures_core::executor::{Executor, SpawnError}; -use futures_core::never::Never; +use futures_core::{Future, Poll, Stream}; +use futures_core::task::{self, Context, LocalWaker, TaskObj, Wake}; +use futures_core::executor::{Executor, SpawnObjError, SpawnErrorKind}; use futures_util::stream::FuturesUnordered; use futures_util::stream::StreamExt; -use thread::ThreadNotify; use enter; use ThreadPool; -struct Task { - fut: Box>, - map: LocalMap, -} - /// A single-threaded task pool. /// /// This executor allows you to multiplex any number of tasks onto a single @@ -31,7 +27,7 @@ struct Task { /// single-threaded, it supports a special form of task spawning for non-`Send` /// futures, via [`spawn_local`](LocalExecutor::spawn_local). pub struct LocalPool { - pool: FuturesUnordered, + pool: FuturesUnordered, incoming: Rc, } @@ -42,22 +38,39 @@ pub struct LocalExecutor { incoming: Weak, } -type Incoming = RefCell>; +type Incoming = RefCell>; + +pub(crate) struct ThreadNotify { + thread: Thread +} + +thread_local! { + static CURRENT_THREAD_NOTIFY: Arc = Arc::new(ThreadNotify { + thread: thread::current(), + }); +} + +impl Wake for ThreadNotify { + fn wake(arc_self: &Arc) { + arc_self.thread.unpark(); + } +} // Set up and run a basic single-threaded executor loop, invocing `f` on each // turn. -fn run_executor Async>(mut f: F) -> T { +fn run_executor Poll>(mut f: F) -> T { let _enter = enter() .expect("cannot execute `LocalPool` executor from within \ another executor"); - ThreadNotify::with_current(|thread| { - let waker = &Waker::from(thread.clone()); + CURRENT_THREAD_NOTIFY.with(|thread_notify| { + let local_waker = + task::local_waker_from_nonlocal(thread_notify.clone()); loop { - if let Async::Ready(t) = f(waker) { + if let Poll::Ready(t) = f(&local_waker) { return t; } - thread.park(); + thread::park(); } }) } @@ -101,8 +114,8 @@ impl LocalPool { /// /// The function will block the calling thread until *all* tasks in the pool /// are complete, including any spawned while running existing tasks. - pub fn run(&mut self, exec: &mut Executor) { - run_executor(|waker| self.poll_pool(waker, exec)) + pub fn run(&mut self, exec: &mut Exec) where Exec: Executor + Sized { + run_executor(|local_waker| self.poll_pool(local_waker, exec)) } /// Runs all the tasks in the pool until the given future completes. 
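// Editorial note, not part of the patch: the `run_until` rewrite in the next
// hunk leans on the `pin_mut!` macro added to futures-core earlier in this
// diff. Because `Future::poll` now takes `self: PinMut<Self>`, a future passed
// by value has to be pinned to the stack before it can be polled in a loop.
// A rough sketch of what the macro does, assuming the 2018-era `PinMut` API
// used throughout this patch:
//
//     let future = my_future();
//     pin_mut!(future);
//     // expands to approximately:
//     //     let mut future = future;               // move to take ownership
//     //     let mut future = unsafe {
//     //         ::std::mem::PinMut::new_unchecked(&mut future)
//     //     };                                      // shadow the binding
//     // after which `future.reborrow().poll(&mut cx)` can be called on every
//     // turn of the executor loop, as `run_until` does below.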
@@ -112,14 +125,15 @@ impl LocalPool { /// the `LocalPool` by using its executor handle: /// /// ``` + /// # #![feature(pin, arbitrary_self_types, futures_api)] /// # extern crate futures; /// # use futures::executor::LocalPool; - /// # use futures::future::{Future, ok}; + /// # use futures::future::ready; /// /// # fn main() { /// let mut pool = LocalPool::new(); /// let mut exec = pool.executor(); - /// # let my_app: Box> = Box::new(ok(())); + /// # let my_app = ready(()); /// /// // run tasks in the pool until `my_app` completes, by default spawning /// // further tasks back onto the pool @@ -132,35 +146,36 @@ impl LocalPool { /// be inert after the call completes, but can continue with further use of /// `run` or `run_until`. While the function is running, however, all tasks /// in the pool will try to make progress. - pub fn run_until(&mut self, mut f: F, exec: &mut Executor) -> Result - where F: Future + pub fn run_until(&mut self, future: F, exec: &mut Exec) + -> F::Output + where F: Future, Exec: Executor + Sized { - // persistent state for the "main task" - let mut main_map = LocalMap::new(); + pin_mut!(future); - run_executor(|waker| { + run_executor(|local_waker| { { - let mut main_cx = Context::new(&mut main_map, waker, exec); + let mut main_cx = Context::new(local_waker, exec); // if our main task is done, so are we - match f.poll(&mut main_cx) { - Ok(Async::Ready(v)) => return Async::Ready(Ok(v)), - Err(err) => return Async::Ready(Err(err)), + match future.reborrow().poll(&mut main_cx) { + Poll::Ready(output) => return Poll::Ready(output), _ => {} } } - self.poll_pool(waker, exec); - Async::Pending + self.poll_pool(local_waker, exec); + Poll::Pending }) } // Make maximal progress on the entire pool of spawned task, returning `Ready` // if the pool is empty and `Pending` if no further progress can be made. - fn poll_pool(&mut self, waker: &Waker, exec: &mut Executor) -> Async<()> { + fn poll_pool(&mut self, local_waker: &LocalWaker, exec: &mut Exec) + -> Poll<()> + where Exec: Executor + Sized + { // state for the FuturesUnordered, which will never be used - let mut pool_map = LocalMap::new(); - let mut pool_cx = Context::new(&mut pool_map, waker, exec); + let mut pool_cx = Context::new(local_waker, exec); loop { // empty the incoming queue of newly-spawned tasks @@ -171,18 +186,17 @@ impl LocalPool { } } - if let Ok(ret) = self.pool.poll_next(&mut pool_cx) { - // we queued up some new tasks; add them and poll again - if !self.incoming.borrow().is_empty() { - continue; - } + let ret = self.pool.poll_next_unpin(&mut pool_cx); + // we queued up some new tasks; add them and poll again + if !self.incoming.borrow().is_empty() { + continue; + } - // no queued tasks; we may be done - match ret { - Async::Pending => return Async::Pending, - Async::Ready(None) => return Async::Ready(()), - _ => {} - } + // no queued tasks; we may be done + match ret { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(()), + _ => {} } } } @@ -202,14 +216,14 @@ lazy_static! { /// /// Use a [`LocalPool`](LocalPool) if you need finer-grained control over /// spawned tasks. -pub fn block_on(f: F) -> Result { +pub fn block_on(f: F) -> F::Output { let mut pool = LocalPool::new(); pool.run_until(f, &mut GLOBAL_POOL.clone()) } /// Turn a stream into a blocking iterator. 
/// -/// Whne `next` is called on the resulting `BlockingStream`, the caller +/// When `next` is called on the resulting `BlockingStream`, the caller /// will be blocked until the next element of the `Stream` becomes available. /// The default executor for the future is a global `ThreadPool`. pub fn block_on_stream(s: S) -> BlockingStream { @@ -226,16 +240,12 @@ impl BlockingStream { } } -impl Iterator for BlockingStream { - type Item = Result; +impl Iterator for BlockingStream where S: Unpin { + type Item = S::Item; fn next(&mut self) -> Option { let s = self.stream.take().expect("BlockingStream shouldn't be empty"); let (item, s) = - match LocalPool::new().run_until(s.next(), &mut GLOBAL_POOL.clone()) { - Ok((Some(item), s)) => (Some(Ok(item)), s), - Ok((None, s)) => (None, s), - Err((e, s)) => (Some(Err(e)), s), - }; + LocalPool::new().run_until(s.next(), &mut GLOBAL_POOL.clone()); self.stream = Some(s); item @@ -243,31 +253,28 @@ impl Iterator for BlockingStream { } impl Executor for LocalExecutor { - fn spawn(&mut self, f: Box + Send>) -> Result<(), SpawnError> { - self.spawn_task(Task { - fut: f, - map: LocalMap::new(), - }) + fn spawn_obj(&mut self, task: TaskObj) -> Result<(), SpawnObjError> { + if let Some(incoming) = self.incoming.upgrade() { + incoming.borrow_mut().push(task); + Ok(()) + } else { + Err(SpawnObjError{ task, kind: SpawnErrorKind::shutdown() }) + } } - fn status(&self) -> Result<(), SpawnError> { + fn status(&self) -> Result<(), SpawnErrorKind> { if self.incoming.upgrade().is_some() { Ok(()) } else { - Err(SpawnError::shutdown()) + Err(SpawnErrorKind::shutdown()) } } } impl LocalExecutor { - fn spawn_task(&self, task: Task) -> Result<(), SpawnError> { - let incoming = self.incoming.upgrade().ok_or(SpawnError::shutdown())?; - incoming.borrow_mut().push(task); - Ok(()) - } - + /* /// Spawn a non-`Send` future onto the associated [`LocalPool`](LocalPool). - pub fn spawn_local(&mut self, f: F) -> Result<(), SpawnError> + pub fn spawn_local(&mut self, f: F) -> Result<(), SpawnObjError> where F: Future + 'static { self.spawn_task(Task { @@ -275,13 +282,5 @@ impl LocalExecutor { map: LocalMap::new(), }) } -} - -impl Future for Task { - type Item = (); - type Error = Never; - - fn poll(&mut self, cx: &mut Context) -> Poll<(), Never> { - self.fut.poll(&mut cx.with_locals(&mut self.map)) - } + */ } diff --git a/futures-executor/src/spawn.rs b/futures-executor/src/spawn.rs index 2d7a184026..d6f820af7c 100644 --- a/futures-executor/src/spawn.rs +++ b/futures-executor/src/spawn.rs @@ -1,6 +1,5 @@ -use futures_core::{Future, Async, Poll}; -use futures_core::never::Never; -use futures_core::task::{self, Context}; +use futures_core::{Future, Poll, CoreFutureExt}; +use futures_core::task::{Context, ContextExt}; use futures_channel::oneshot::{channel, Sender, Receiver}; use futures_util::FutureExt; @@ -9,12 +8,21 @@ use std::sync::Arc; use std::sync::atomic::Ordering; use std::panic::{self, AssertUnwindSafe}; use std::sync::atomic::AtomicBool; +use std::mem::PinMut; +use std::boxed::Box; +use std::marker::Unpin; /// A future representing the completion of task spawning. /// /// See [`spawn`](spawn()) for details. #[derive(Debug)] -pub struct Spawn(Option); +pub struct Spawn { + future: Option +} + +impl Spawn { + unsafe_unpinned!(future -> Option); +} /// Spawn a task onto the default executor. /// @@ -22,18 +30,18 @@ pub struct Spawn(Option); /// onto the default executor. It does *not* provide any way to wait on task /// completion or extract a value from the task. 
That can either be done through /// a channel, or by using [`spawn_with_handle`](::spawn_with_handle). -pub fn spawn(f: F) -> Spawn - where F: Future + 'static + Send +pub fn spawn(future: F) -> Spawn + where F: Future + 'static + Send { - Spawn(Some(f)) + Spawn { future: Some(future) } } -impl + Send + 'static> Future for Spawn { - type Item = (); - type Error = Never; - fn poll(&mut self, cx: &mut Context) -> Poll<(), Never> { - cx.spawn(self.0.take().unwrap()); - Ok(Async::Ready(())) +impl + Send + 'static> Future for Spawn { + type Output = (); + + fn poll(mut self: PinMut, cx: &mut Context) -> Poll<()> { + cx.spawn(self.future().take().unwrap()); + Poll::Ready(()) } } @@ -42,7 +50,13 @@ impl + Send + 'static> Future for Spawn { /// /// See [`spawn_with_handle`](::spawn_with_handle) for details. #[derive(Debug)] -pub struct SpawnWithHandle(Option); +pub struct SpawnWithHandle { + future: Option +} + +impl SpawnWithHandle { + unsafe_unpinned!(future -> Option); +} /// Spawn a task onto the default executor, yielding a /// [`JoinHandle`](::JoinHandle) to the spawned task. @@ -55,82 +69,62 @@ pub struct SpawnWithHandle(Option); /// # Examples /// /// ``` +/// # #![feature(pin, arbitrary_self_types, futures_api)] /// # extern crate futures; /// # /// use futures::prelude::*; /// use futures::future; /// use futures::executor::{block_on, spawn_with_handle}; /// -/// # fn main() { -/// # fn inner() -> Result<(), Never> { -/// # Ok({ -/// let future = future::ok::(1); -/// let join_handle = block_on(spawn_with_handle(future))?; -/// let result = block_on(join_handle); -/// assert_eq!(result, Ok(1)); -/// # }) -/// # } -/// # inner().unwrap(); -/// # } -/// ``` -/// -/// ``` -/// # extern crate futures; -/// # -/// use futures::prelude::*; -/// use futures::future; -/// use futures::executor::{block_on, spawn_with_handle}; -/// -/// # fn main() { -/// # fn inner() -> Result<(), Never> { -/// # Ok({ -/// let future = future::err::("boom"); -/// let join_handle = block_on(spawn_with_handle(future))?; -/// let result = block_on(join_handle); -/// assert_eq!(result, Err("boom")); -/// # }) -/// # } -/// # inner().unwrap(); -/// # } +/// let future = future::ready::(1); +/// let join_handle = block_on(spawn_with_handle(future)); +/// let output = block_on(join_handle); +/// assert_eq!(output, 1); /// ``` pub fn spawn_with_handle(f: F) -> SpawnWithHandle - where F: Future + 'static + Send, F::Item: Send, F::Error: Send + where F: Future + 'static + Send, F::Output: Send { - SpawnWithHandle(Some(f)) + SpawnWithHandle { future: Some(f) } } impl Future for SpawnWithHandle where F: Future + Send + 'static, - F::Item: Send, - F::Error: Send, + F::Output: Send, { - type Item = JoinHandle; - type Error = Never; - fn poll(&mut self, cx: &mut Context) -> Poll { + type Output = JoinHandle; + + fn poll(mut self: PinMut, cx: &mut Context) -> Poll { let (tx, rx) = channel(); let keep_running_flag = Arc::new(AtomicBool::new(false)); // AssertUnwindSafe is used here because `Send + 'static` is basically // an alias for an implementation of the `UnwindSafe` trait but we can't // express that in the standard library right now. 
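// Editorial note, not part of the patch: `MySender` (constructed below and
// defined later in this file) is the task that actually gets spawned. It pairs
// the `catch_unwind`-wrapped future with the oneshot `Sender` and the
// `keep_running_flag`. When polled on the pool it first checks `poll_cancel`
// on the sender -- if the `JoinHandle` was dropped without calling `forget`,
// the task bails out early -- and otherwise polls the inner future and pushes
// its output (or the caught panic payload) through the channel, where
// `JoinHandle::poll` either returns the value or resumes the unwind.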
let sender = MySender { - fut: AssertUnwindSafe(self.0.take().unwrap()).catch_unwind(), + future: AssertUnwindSafe(self.future().take().unwrap()).catch_unwind(), tx: Some(tx), keep_running_flag: keep_running_flag.clone(), }; cx.spawn(sender); - Ok(Async::Ready(JoinHandle { + Poll::Ready(JoinHandle { inner: rx , keep_running_flag: keep_running_flag.clone() - })) + }) } } struct MySender { - fut: F, + future: F, tx: Option>, keep_running_flag: Arc, } +impl Unpin for MySender {} + +impl MySender { + unsafe_pinned!(future -> F); + unsafe_unpinned!(tx -> Option>); + unsafe_unpinned!(keep_running_flag -> Arc); +} /// The type of future returned from the `ThreadPool::spawn` function, which /// proxies the futures running on the thread pool. @@ -139,12 +133,12 @@ struct MySender { /// will propagate panics. #[must_use] #[derive(Debug)] -pub struct JoinHandle { - inner: Receiver>>, +pub struct JoinHandle { + inner: Receiver>, keep_running_flag: Arc, } -impl JoinHandle { +impl JoinHandle { /// Drop this handle *without* canceling the underlying future. /// /// When `JoinHandle` is dropped, `ThreadPool` will try to abort the associated @@ -155,41 +149,38 @@ impl JoinHandle { } } -impl Future for JoinHandle { - type Item = T; - type Error = E; +impl Future for JoinHandle { + type Output = T; - fn poll(&mut self, cx: &mut task::Context) -> Poll { - match self.inner.poll(cx).expect("cannot poll JoinHandle twice") { - Async::Ready(Ok(Ok(e))) => Ok(e.into()), - Async::Ready(Ok(Err(e))) => Err(e), - Async::Ready(Err(e)) => panic::resume_unwind(e), - Async::Pending => Ok(Async::Pending), + fn poll(mut self: PinMut, cx: &mut Context) -> Poll { // ToDo: This was weird! Double check! + match self.inner.poll_unpin(cx) { + Poll::Ready(Ok(Ok(output))) => Poll::Ready(output), + Poll::Ready(Ok(Err(e))) => panic::resume_unwind(e), + Poll::Ready(Err(e)) => panic::resume_unwind(Box::new(e)), + Poll::Pending => Poll::Pending, } } } -impl Future for MySender> { - type Item = (); - type Error = Never; +impl Future for MySender { + type Output = (); - fn poll(&mut self, cx: &mut task::Context) -> Poll<(), Never> { - if let Ok(Async::Ready(_)) = self.tx.as_mut().unwrap().poll_cancel(cx) { - if !self.keep_running_flag.load(Ordering::SeqCst) { + fn poll(mut self: PinMut, cx: &mut Context) -> Poll<()> { + if let Poll::Ready(_) = self.tx().as_mut().unwrap().poll_cancel(cx) { + if !self.keep_running_flag().load(Ordering::SeqCst) { // Cancelled, bail out - return Ok(().into()) + return Poll::Ready(()) } } - let res = match self.fut.poll(cx) { - Ok(Async::Ready(e)) => Ok(e), - Ok(Async::Pending) => return Ok(Async::Pending), - Err(e) => Err(e), + let output = match self.future().poll(cx) { + Poll::Ready(output) => output, + Poll::Pending => return Poll::Pending, }; // if the receiving end has gone away then that's ok, we just ignore the // send error here. - drop(self.tx.take().unwrap().send(res)); - Ok(Async::Ready(())) + drop(self.tx().take().unwrap().send(output)); + Poll::Ready(()) } } diff --git a/futures-executor/src/thread.rs b/futures-executor/src/thread.rs deleted file mode 100644 index 1c0e301297..0000000000 --- a/futures-executor/src/thread.rs +++ /dev/null @@ -1,32 +0,0 @@ -use std::sync::Arc; -use std::thread::{self, Thread}; - -use futures_core::task::Wake; - -pub(crate) struct ThreadNotify { - thread: Thread, -} - -thread_local! 
{ - static CURRENT_THREAD_NOTIFY: Arc = Arc::new(ThreadNotify { - thread: thread::current(), - }); -} - -impl ThreadNotify { - pub(crate) fn with_current(f: F) -> R - where F: FnOnce(&Arc) -> R, - { - CURRENT_THREAD_NOTIFY.with(f) - } - - pub(crate) fn park(&self) { - thread::park(); - } -} - -impl Wake for ThreadNotify { - fn wake(arc_self: &Arc) { - arc_self.thread.unpark(); - } -} diff --git a/futures-executor/src/thread_pool.rs b/futures-executor/src/thread_pool.rs index 98e987983c..5d95d7ae4b 100644 --- a/futures-executor/src/thread_pool.rs +++ b/futures-executor/src/thread_pool.rs @@ -8,9 +8,8 @@ use std::thread; use std::fmt; use futures_core::*; -use futures_core::task::{self, Wake, Waker, LocalMap}; -use futures_core::executor::{Executor, SpawnError}; -use futures_core::never::Never; +use futures_core::task::{self, Wake, TaskObj}; +use futures_core::executor::{Executor, SpawnObjError}; use enter; use num_cpus; @@ -64,7 +63,7 @@ impl fmt::Debug for ThreadPoolBuilder { } enum Message { - Run(Task), + Run(TaskContainer), Close, } @@ -96,23 +95,22 @@ impl ThreadPool { /// /// Note that the function will return when the provided future completes, /// even if some of the tasks it spawned are still running. - pub fn run(&mut self, f: F) -> Result { + pub fn run(&mut self, f: F) -> F::Output { ::LocalPool::new().run_until(f, self) } } impl Executor for ThreadPool { - fn spawn(&mut self, f: Box + Send>) -> Result<(), SpawnError> { - let task = Task { - spawn: f, - map: LocalMap::new(), + fn spawn_obj(&mut self, task: TaskObj) -> Result<(), SpawnObjError> { + let task_container = TaskContainer { + task, wake_handle: Arc::new(WakeHandle { exec: self.clone(), mutex: UnparkMutex::new(), }), exec: self.clone(), }; - self.state.send(Message::Run(task)); + self.state.send(Message::Run(task_container)); Ok(()) } } @@ -131,7 +129,7 @@ impl PoolState { loop { let msg = self.rx.lock().unwrap().recv().unwrap(); match msg { - Message::Run(r) => r.run(), + Message::Run(task_container) => task_container.run(), Message::Close => break, } } @@ -265,52 +263,47 @@ impl ThreadPoolBuilder { /// Units of work submitted to an `Executor`, currently only created /// internally. -struct Task { - spawn: Box + Send>, - map: LocalMap, +struct TaskContainer { + task: TaskObj, exec: ThreadPool, wake_handle: Arc, } struct WakeHandle { - mutex: UnparkMutex, + mutex: UnparkMutex, exec: ThreadPool, } -impl Task { - /// Actually run the task (invoking `poll` on its future) on the current - /// thread. +impl TaskContainer { + /// Actually run the task (invoking `poll`) on the current thread. pub fn run(self) { - let Task { mut spawn, wake_handle, mut map, mut exec } = self; - let waker = Waker::from(wake_handle.clone()); + let TaskContainer { mut task, wake_handle, mut exec } = self; + let local_waker = task::local_waker_from_nonlocal(wake_handle.clone()); - // SAFETY: the ownership of this `Task` object is evidence that + // SAFETY: the ownership of this `TaskContainer` object is evidence that // we are in the `POLLING`/`REPOLL` state for the mutex. 
unsafe { wake_handle.mutex.start_poll(); loop { let res = { - let mut cx = task::Context::new(&mut map, &waker, &mut exec); - spawn.poll(&mut cx) + let mut cx = task::Context::new(&local_waker, &mut exec); + task.poll_unpin(&mut cx) }; match res { - Ok(Async::Pending) => {} - Ok(Async::Ready(())) => return wake_handle.mutex.complete(), - Err(never) => match never {}, + Poll::Pending => {} + Poll::Ready(()) => return wake_handle.mutex.complete(), } - let task = Task { - spawn, - map, + let task_container = TaskContainer { + task, wake_handle: wake_handle.clone(), exec: exec }; - match wake_handle.mutex.wait(task) { - Ok(()) => return, // we've waited - Err(r) => { // someone's notified us - spawn = r.spawn; - map = r.map; - exec = r.exec; + match wake_handle.mutex.wait(task_container) { + Ok(()) => return, // we've waited + Err(task_container) => { // someone's notified us + task = task_container.task; + exec = task_container.exec; } } } @@ -318,7 +311,7 @@ impl Task { } } -impl fmt::Debug for Task { +impl fmt::Debug for TaskContainer { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Task") .field("contents", &"...") diff --git a/futures-executor/tests/local_pool.rs b/futures-executor/tests/local_pool.rs index 8634cd0054..f286753d79 100755 --- a/futures-executor/tests/local_pool.rs +++ b/futures-executor/tests/local_pool.rs @@ -1,13 +1,16 @@ #![allow(unused_imports)] +#![feature(pin, arbitrary_self_types, futures_api)] + extern crate futures; extern crate futures_executor; extern crate futures_channel; use std::cell::{Cell, RefCell}; -use std::rc::Rc; +use std::sync::Arc; use std::thread; use std::time::Duration; +use std::mem::PinMut; use futures::future::lazy; use futures::prelude::*; @@ -16,23 +19,20 @@ use futures::task; use futures_executor::*; use futures_channel::oneshot; -struct Pending(Rc<()>); +struct Pending(Arc<()>); impl Future for Pending { - type Item = (); - type Error = Never; + type Output = (); - fn poll(&mut self, _: &mut task::Context) -> Poll<(), Never> { - Ok(Async::Pending) + fn poll(self: PinMut, _cx: &mut task::Context) -> Poll<()> { + Poll::Pending } } fn pending() -> Pending { - Pending(Rc::new(())) + Pending(Arc::new(())) } -const DONE: Result<(), Never> = Ok(()); - #[test] fn run_until_single_future() { let mut cnt = 0; @@ -42,9 +42,9 @@ fn run_until_single_future() { let mut exec = pool.executor(); let fut = lazy(|_| { cnt += 1; - DONE + () }); - pool.run_until(fut, &mut exec).unwrap(); + assert_eq!(pool.run_until(fut, &mut exec), ()); } assert_eq!(cnt, 1); @@ -54,8 +54,8 @@ fn run_until_single_future() { fn run_until_ignores_spawned() { let mut pool = LocalPool::new(); let mut exec = pool.executor(); - exec.spawn_local(Box::new(pending())).unwrap(); - pool.run_until(lazy(|_| DONE), &mut exec).unwrap(); + exec.spawn_obj(Box::new(pending()).into()).unwrap(); // This test used the currently not implemented spawn_local method before + assert_eq!(pool.run_until(lazy(|_| ()), &mut exec), ()); } #[test] @@ -63,13 +63,14 @@ fn run_until_executes_spawned() { let (tx, rx) = oneshot::channel(); let mut pool = LocalPool::new(); let mut exec = pool.executor(); - exec.spawn_local(Box::new(lazy(move |_| { + exec.spawn_obj(Box::new(lazy(move |_| { // This test used the currently not implemented spawn_local method before tx.send(()).unwrap(); - DONE - }))).unwrap(); + () + })).into()).unwrap(); pool.run_until(rx, &mut exec).unwrap(); } +/* // This test does not work because it relies on spawn_local which is not implemented #[test] fn 
run_executes_spawned() { let cnt = Rc::new(Cell::new(0)); @@ -82,16 +83,17 @@ fn run_executes_spawned() { exec.spawn_local(Box::new(lazy(move |_| { exec2.spawn_local(Box::new(lazy(move |_| { cnt2.set(cnt2.get() + 1); - DONE - }))).unwrap(); - DONE - }))).unwrap(); + () + })).into()).unwrap(); + () + })).into()).unwrap(); pool.run(&mut exec); assert_eq!(cnt.get(), 1); } + #[test] fn run_spawn_many() { const ITER: usize = 200; @@ -105,7 +107,7 @@ fn run_spawn_many() { let cnt = cnt.clone(); exec.spawn_local(Box::new(lazy(move |_| { cnt.set(cnt.get() + 1); - DONE + () }))).unwrap(); } @@ -139,10 +141,9 @@ fn tasks_are_scheduled_fairly() { } impl Future for Spin { - type Item = (); - type Error = Never; + type Output = (); - fn poll(&mut self, cx: &mut task::Context) -> Poll<(), Never> { + fn poll(mut self: PinMut, cx: &mut task::Context) -> Poll<()> { let mut state = self.state.borrow_mut(); if self.idx == 0 { @@ -151,18 +152,18 @@ fn tasks_are_scheduled_fairly() { assert!(diff.abs() <= 1); if state[0] >= 50 { - return Ok(().into()); + return Poll::Ready(()); } } state[self.idx] += 1; if state[self.idx] >= 100 { - return Ok(().into()); + return Poll::Ready(()); } cx.waker().wake(); - Ok(Async::Pending) + Poll::Pending } } @@ -181,3 +182,4 @@ fn tasks_are_scheduled_fairly() { pool.run(&mut exec); } +*/ diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index e118b5814f..104195b9d6 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -23,6 +23,6 @@ futures-sink = { path = "../futures-sink", version = "0.3.0-alpha", default-feat either = { version = "1.4", default-features = false } [dev-dependencies] -# futures = { path = "../futures", version = "0.2.0" } -# futures-executor = { path = "../futures-executor", version = "0.2.0" } +futures = { path = "../futures", version = "0.3.0-alpha" } +futures-executor = { path = "../futures-executor", version = "0.3.0-alpha" } # futures-channel = { path = "../futures-channel", version = "0.2.0" } diff --git a/futures-util/src/future/join.rs b/futures-util/src/future/join.rs index e32470c26a..f236ba7971 100644 --- a/futures-util/src/future/join.rs +++ b/futures-util/src/future/join.rs @@ -159,8 +159,8 @@ impl MaybeDone { MaybeDone::Gone => panic!("cannot poll Join twice"), }; match res { - Async::Ready(res) => { - *self = MaybeDone::Done(res); + Async::Ready(output) => { + *self = MaybeDone::Done(output); Ok(true) } Async::Pending => Ok(false), diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index 0524fe4cf7..f3e154c50d 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -85,7 +85,7 @@ pub trait FutureExt: Future { /// # extern crate futures_executor; /// use futures::prelude::*; /// use futures::future; - /// use futures_executor::block_on; + /// use futures::executor::block_on; /// /// # fn main() { /// let future = future::ready(1); @@ -119,8 +119,8 @@ pub trait FutureExt: Future { /// # extern crate futures; /// use futures::prelude::*; /// use futures::future; + /// use futures::executor::block_on; /// - /// # fn main() { /// let future_of_1 = future::ready(1); /// let future_of_4 = future_of_1.then(|x| future::ready(x + 3)); /// assert_eq!(block_on(future_of_4), 4); @@ -270,6 +270,8 @@ pub trait FutureExt: Future { } */ + /* ToDo: futures-core cannot implement Future for Either anymore because of + the orphan rule. Remove? Implement our own `Either`? /// Wrap this future in an `Either` future, making it the left-hand variant /// of that `Either`. 
/// @@ -329,7 +331,7 @@ pub trait FutureExt: Future { Self: Sized, { Either::Right(self) - } + }*/ /// Convert this future into a single element stream. /// @@ -346,7 +348,7 @@ pub trait FutureExt: Future { /// use futures_executor::block_on; /// /// # fn main() { - /// let future = future::ready::(17); + /// let future = future::ready(17); /// let stream = future.into_stream(); /// let collected: Vec<_> = block_on(stream.collect()); /// assert_eq!(collected, vec![17]); @@ -435,7 +437,7 @@ pub trait FutureExt: Future { /// /// # fn main() { /// let stream_items = vec![17, 18, 19]; - /// let future_of_a_stream = future::ready(stream::iter_ok(stream_items)); + /// let future_of_a_stream = future::ready(stream::iter(stream_items)); /// /// let stream = future_of_a_stream.flatten_stream(); /// let list: Vec<_> = block_on(stream.collect()); @@ -522,7 +524,7 @@ pub trait FutureExt: Future { /// # extern crate futures; /// # extern crate futures_executor; /// use futures::prelude::*; - /// use futures::future::{self, FutureResult}; + /// use futures::future::{self, ReadyFuture}; /// use futures_executor::block_on; /// /// # fn main() { @@ -614,14 +616,12 @@ pub trait FutureExt: Future { /// # extern crate futures_executor; /// use futures::prelude::*; /// use futures::future; - /// use futures_executor::{block_on, spawn, ThreadPool}; + /// use futures_executor::{block_on, spawn_with_handle, ThreadPool}; /// - /// # fn main() { /// let pool = ThreadPool::new().expect("unable to create threadpool"); - /// let future = future::ready(3); - /// let spawn_future = spawn(future).with_executor(pool); - /// assert_eq!(block_on(spawn_future), 3); - /// # } + /// let future = spawn_with_handle(future::ready(3)).with_executor(pool); + /// let output = block_on(block_on(future)); + /// assert_eq!(output, 3); /// ``` #[cfg(feature = "std")] fn with_executor(self, executor: E) -> WithExecutor diff --git a/futures-util/src/future/poll_fn.rs b/futures-util/src/future/poll_fn.rs index 5cbb0cec4a..cf9f4bc946 100644 --- a/futures-util/src/future/poll_fn.rs +++ b/futures-util/src/future/poll_fn.rs @@ -22,21 +22,19 @@ pub struct PollFn { /// # Examples /// /// ``` +/// # #![feature(futures_api)] /// # extern crate futures; /// use futures::prelude::*; /// use futures::future::poll_fn; -/// use futures::never::Never; /// use futures::task; /// -/// # fn main() { -/// fn read_line(cx: &mut task::Context) -> Poll { -/// Ok(Async::Ready("Hello, World!".into())) +/// fn read_line(cx: &mut task::Context) -> Poll { +/// Poll::Ready("Hello, World!".into()) /// } /// /// let read_future = poll_fn(read_line); -/// # } /// ``` -pub fn poll_fn(f: F) -> PollFn +pub fn poll_fn(f: F) -> PollFn where F: FnMut(&mut task::Context) -> Poll { PollFn { inner: f } diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index 3ee8b38801..54bd841fe3 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -75,11 +75,3 @@ pub use stream::StreamExt; pub mod sink; pub use sink::SinkExt; - -pub mod prelude { - //! Prelude containing the extension traits, which add functionality to - //! existing asynchronous types. 
- // pub use {FutureExt, StreamExt, SinkExt}; - // #[cfg(feature = "std")] - // pub use {AsyncReadExt, AsyncWriteExt}; -} diff --git a/futures-util/src/sink/fanout.rs b/futures-util/src/sink/fanout.rs index f6e4705b2b..4e3a25c70f 100644 --- a/futures-util/src/sink/fanout.rs +++ b/futures-util/src/sink/fanout.rs @@ -79,6 +79,7 @@ impl Sink for Fanout } } +/* ToDo Reenable test: Disabled because `StreamExt::forward` is not implemented #[cfg(test)] #[cfg(feature = "std")] mod tests { @@ -107,3 +108,4 @@ mod tests { assert_eq!(lists, Ok((&*expected, &*expected))); } } +*/ diff --git a/futures-util/src/sink/mod.rs b/futures-util/src/sink/mod.rs index e5529c46fd..2a99443f06 100644 --- a/futures-util/src/sink/mod.rs +++ b/futures-util/src/sink/mod.rs @@ -58,6 +58,7 @@ pub trait SinkExt: Sink { with::new(self, f) } + /* /// Composes a function *in front of* the sink. /// /// This adapter produces a new sink that passes each value through the @@ -95,14 +96,13 @@ pub trait SinkExt: Sink { /// assert_eq!(block_on(rx.collect()), Ok(vec![42, 42, 42, 42, 42])); /// # } /// ``` - /* fn with_flat_map(self, f: F) -> WithFlatMap where F: FnMut(U) -> St, St: Stream, Self: Sized - { - with_flat_map::new(self, f) - } + { + with_flat_map::new(self, f) + } */ /* diff --git a/futures-util/src/stream/catch_unwind.rs b/futures-util/src/stream/catch_unwind.rs index b2644df141..7137fcb5b6 100644 --- a/futures-util/src/stream/catch_unwind.rs +++ b/futures-util/src/stream/catch_unwind.rs @@ -13,16 +13,18 @@ use futures_core::task; #[must_use = "streams do nothing unless polled"] pub struct CatchUnwind where S: Stream { stream: S, + caught_unwind: bool, } pub fn new(stream: S) -> CatchUnwind where S: Stream + UnwindSafe, { - CatchUnwind { stream } + CatchUnwind { stream, caught_unwind: false } } impl CatchUnwind { unsafe_pinned!(stream -> S); + unsafe_unpinned!(caught_unwind -> bool); } impl Stream for CatchUnwind @@ -31,10 +33,17 @@ impl Stream for CatchUnwind type Item = Result>; fn poll_next(mut self: PinMut, cx: &mut task::Context) -> Poll> { - let res = catch_unwind(AssertUnwindSafe(|| self.stream().poll_next(cx))); - match res { - Ok(poll) => poll.map(|opt| opt.map(Ok)), - Err(e) => Poll::Ready(Some(Err(e))), + if *self.caught_unwind() { + return Poll::Ready(None) + } else { + let res = catch_unwind(AssertUnwindSafe(|| self.stream().poll_next(cx))); + match res { + Ok(poll) => poll.map(|opt| opt.map(Ok)), + Err(e) => { + *self.caught_unwind() = true; + Poll::Ready(Some(Err(e))) + }, + } } } } diff --git a/futures-util/src/stream/futures_unordered.rs b/futures-util/src/stream/futures_unordered.rs deleted file mode 100644 index df0941ceb5..0000000000 --- a/futures-util/src/stream/futures_unordered.rs +++ /dev/null @@ -1,656 +0,0 @@ -//! An unbounded set of futures. - -use std::cell::UnsafeCell; -use std::fmt::{self, Debug}; -use std::iter::FromIterator; -use std::marker::PhantomData; -use std::mem; -use std::ptr; -use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel}; -use std::sync::atomic::{AtomicPtr, AtomicBool}; -use std::sync::{Arc, Weak}; -use std::usize; - -use futures_core::{Stream, Future, Poll, Async, IntoFuture}; -use futures_core::task::{self, AtomicWaker, UnsafeWake, Waker}; - -/// A set of `Future`s which may complete in any order. -/// -/// This structure is optimized to manage a large number of futures. -/// Futures managed by `FuturesUnordered` will only be polled when they -/// generate notifications. 
This reduces the required amount of work needed to -/// poll large numbers of futures. -/// -/// `FuturesUnordered` can be filled by `collect`ing an iterator of `Future`s -/// into a `FuturesUnordered`, or by `push`ing `Future`s onto an existing -/// `FuturesUnordered`. When new `Future`s are added, `poll_next` must be -/// called in order to begin receiving wakeups for new `Future`s. -/// -/// Note that you can create a ready-made `FuturesUnordered` via the -/// `futures_unordered` function in the `stream` module, or you can start with an -/// empty set with the `FuturesUnordered::new` constructor. -#[must_use = "streams do nothing unless polled"] -pub struct FuturesUnordered { - inner: Arc>, - len: usize, - head_all: *const Node, -} - -impl Send for FuturesUnordered {} -impl Sync for FuturesUnordered {} - -// FuturesUnordered is implemented using two linked lists. One which links all -// futures managed by a `FuturesUnordered` and one that tracks futures that have -// been scheduled for polling. The first linked list is not thread safe and is -// only accessed by the thread that owns the `FuturesUnordered` value. The -// second linked list is an implementation of the intrusive MPSC queue algorithm -// described by 1024cores.net. -// -// When a future is submitted to the set a node is allocated and inserted in -// both linked lists. The next call to `poll` will (eventually) see this node -// and call `poll` on the future. -// -// Before a managed future is polled, the current task's `Wake` is replaced -// with one that is aware of the specific future being run. This ensures that -// task notifications generated by that specific future are visible to -// `FuturesUnordered`. When a notification is received, the node is scheduled -// for polling by being inserted into the concurrent linked list. -// -// Each node uses an `AtomicUsize` to track it's state. The node state is the -// reference count (the number of outstanding handles to the node) as well as a -// flag tracking if the node is currently inserted in the atomic queue. When the -// future is notified, it will only insert itself into the linked list if it -// isn't currently inserted. - -struct Inner { - // The task using `FuturesUnordered`. - parent: AtomicWaker, - - // Head/tail of the readiness queue - head_readiness: AtomicPtr>, - tail_readiness: UnsafeCell<*const Node>, - stub: Arc>, -} - -struct Node { - // The future - future: UnsafeCell>, - - // Next pointer for linked list tracking all active nodes - next_all: UnsafeCell<*const Node>, - - // Previous node in linked list tracking all active nodes - prev_all: UnsafeCell<*const Node>, - - // Next pointer in readiness queue - next_readiness: AtomicPtr>, - - // Queue that we'll be enqueued to when notified - queue: Weak>, - - // Whether or not this node is currently in the mpsc queue. - queued: AtomicBool, -} - -enum Dequeue { - Data(*const Node), - Empty, - Inconsistent, -} - -impl FuturesUnordered - where T: Future, -{ - /// Constructs a new, empty `FuturesUnordered` - /// - /// The returned `FuturesUnordered` does not contain any futures and, in this - /// state, `FuturesUnordered::poll_next` will return `Ok(Async::Ready(None))`. 
- pub fn new() -> FuturesUnordered { - let stub = Arc::new(Node { - future: UnsafeCell::new(None), - next_all: UnsafeCell::new(ptr::null()), - prev_all: UnsafeCell::new(ptr::null()), - next_readiness: AtomicPtr::new(ptr::null_mut()), - queued: AtomicBool::new(true), - queue: Weak::new(), - }); - let stub_ptr = &*stub as *const Node; - let inner = Arc::new(Inner { - parent: AtomicWaker::new(), - head_readiness: AtomicPtr::new(stub_ptr as *mut _), - tail_readiness: UnsafeCell::new(stub_ptr), - stub: stub, - }); - - FuturesUnordered { - len: 0, - head_all: ptr::null_mut(), - inner: inner, - } - } -} - -impl FuturesUnordered { - /// Returns the number of futures contained in the set. - /// - /// This represents the total number of in-flight futures. - pub fn len(&self) -> usize { - self.len - } - - /// Returns `true` if the set contains no futures - pub fn is_empty(&self) -> bool { - self.len == 0 - } - - /// Push a future into the set. - /// - /// This function submits the given future to the set for managing. This - /// function will not call `poll` on the submitted future. The caller must - /// ensure that `FuturesUnordered::poll_next` is called in order to receive task - /// notifications. - pub fn push(&mut self, future: T) { - let node = Arc::new(Node { - future: UnsafeCell::new(Some(future)), - next_all: UnsafeCell::new(ptr::null_mut()), - prev_all: UnsafeCell::new(ptr::null_mut()), - next_readiness: AtomicPtr::new(ptr::null_mut()), - queued: AtomicBool::new(true), - queue: Arc::downgrade(&self.inner), - }); - - // Right now our node has a strong reference count of 1. We transfer - // ownership of this reference count to our internal linked list - // and we'll reclaim ownership through the `unlink` function below. - let ptr = self.link(node); - - // We'll need to get the future "into the system" to start tracking it, - // e.g. getting its unpark notifications going to us tracking which - // futures are ready. To do that we unconditionally enqueue it for - // polling here. - self.inner.enqueue(ptr); - } - - /// Returns an iterator that allows modifying each future in the set. - pub fn iter_mut(&mut self) -> IterMut { - IterMut { - node: self.head_all, - len: self.len, - _marker: PhantomData - } - } - - fn release_node(&mut self, node: Arc>) { - // The future is done, try to reset the queued flag. This will prevent - // `notify` from doing any work in the future - let prev = node.queued.swap(true, SeqCst); - - // Drop the future, even if it hasn't finished yet. This is safe - // because we're dropping the future on the thread that owns - // `FuturesUnordered`, which correctly tracks T's lifetimes and such. - unsafe { - drop((*node.future.get()).take()); - } - - // If the queued flag was previously set then it means that this node - // is still in our internal mpsc queue. We then transfer ownership - // of our reference count to the mpsc queue, and it'll come along and - // free it later, noticing that the future is `None`. - // - // If, however, the queued flag was *not* set then we're safe to - // release our reference count on the internal node. The queued flag - // was set above so all future `enqueue` operations will not actually - // enqueue the node, so our node will never see the mpsc queue again. - // The node itself will be deallocated once all reference counts have - // been dropped by the various owning tasks elsewhere. - if prev { - mem::forget(node); - } - } - - /// Insert a new node into the internal linked list. 
- fn link(&mut self, node: Arc>) -> *const Node { - let ptr = Arc::into_raw(node); - unsafe { - *(*ptr).next_all.get() = self.head_all; - if !self.head_all.is_null() { - *(*self.head_all).prev_all.get() = ptr; - } - } - - self.head_all = ptr; - self.len += 1; - ptr - } - - /// Remove the node from the linked list tracking all nodes currently - /// managed by `FuturesUnordered`. - unsafe fn unlink(&mut self, node: *const Node) -> Arc> { - let node = Arc::from_raw(node); - let next = *node.next_all.get(); - let prev = *node.prev_all.get(); - *node.next_all.get() = ptr::null_mut(); - *node.prev_all.get() = ptr::null_mut(); - - if !next.is_null() { - *(*next).prev_all.get() = prev; - } - - if !prev.is_null() { - *(*prev).next_all.get() = next; - } else { - self.head_all = next; - } - self.len -= 1; - node - } -} - -impl Stream for FuturesUnordered - where T: Future -{ - type Item = T::Item; - type Error = T::Error; - - fn poll_next(&mut self, cx: &mut task::Context) -> Poll, T::Error> { - // Ensure `parent` is correctly set. - self.inner.parent.register(cx.waker()); - - loop { - let node = match unsafe { self.inner.dequeue() } { - Dequeue::Empty => { - if self.is_empty() { - return Ok(Async::Ready(None)); - } else { - return Ok(Async::Pending) - } - } - Dequeue::Inconsistent => { - // At this point, it may be worth yielding the thread & - // spinning a few times... but for now, just yield using the - // task system. - cx.waker().wake(); - return Ok(Async::Pending); - } - Dequeue::Data(node) => node, - }; - - debug_assert!(node != self.inner.stub()); - - unsafe { - let mut future = match (*(*node).future.get()).take() { - Some(future) => future, - - // If the future has already gone away then we're just - // cleaning out this node. See the comment in - // `release_node` for more information, but we're basically - // just taking ownership of our reference count here. - None => { - let node = Arc::from_raw(node); - assert!((*node.next_all.get()).is_null()); - assert!((*node.prev_all.get()).is_null()); - continue - } - }; - - // Unset queued flag... this must be done before - // polling. This ensures that the future gets - // rescheduled if it is notified **during** a call - // to `poll`. - let prev = (*node).queued.swap(false, SeqCst); - assert!(prev); - - // We're going to need to be very careful if the `poll` - // function below panics. We need to (a) not leak memory and - // (b) ensure that we still don't have any use-after-frees. To - // manage this we do a few things: - // - // * This "bomb" here will call `release_node` if dropped - // abnormally. That way we'll be sure the memory management - // of the `node` is managed correctly. - // * The future was extracted above (taken ownership). That way - // if it panics we're guaranteed that the future is - // dropped on this thread and doesn't accidentally get - // dropped on a different thread (bad). - // * We unlink the node from our internal queue to preemptively - // assume it'll panic, in which case we'll want to discard it - // regardless. - struct Bomb<'a, T: 'a> { - queue: &'a mut FuturesUnordered, - node: Option>>, - } - impl<'a, T> Drop for Bomb<'a, T> { - fn drop(&mut self) { - if let Some(node) = self.node.take() { - self.queue.release_node(node); - } - } - } - let mut bomb = Bomb { - node: Some(self.unlink(node)), - queue: self, - }; - - // Poll the underlying future with the appropriate `notify` - // implementation. This is where a large bit of the unsafety - // starts to stem from internally. 
The `notify` instance itself - // is basically just our `Arc>` and tracks the mpsc - // queue of ready futures. - // - // Critically though `Node` won't actually access `T`, the - // future, while it's floating around inside of `Task` - // instances. These structs will basically just use `T` to size - // the internal allocation, appropriately accessing fields and - // deallocating the node if need be. - let res = { - let notify = NodeToHandle(bomb.node.as_ref().unwrap()); - let waker = Waker::from(notify); - let mut cx = cx.with_waker(&waker); - future.poll(&mut cx) - }; - - let ret = match res { - Ok(Async::Pending) => { - let node = bomb.node.take().unwrap(); - *node.future.get() = Some(future); - bomb.queue.link(node); - continue - } - Ok(Async::Ready(e)) => Ok(Async::Ready(Some(e))), - Err(e) => Err(e), - }; - return ret - } - } - } -} - -impl Debug for FuturesUnordered { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "FuturesUnordered {{ ... }}") - } -} - -impl Drop for FuturesUnordered { - fn drop(&mut self) { - // When a `FuturesUnordered` is dropped we want to drop all futures associated - // with it. At the same time though there may be tons of `Task` handles - // flying around which contain `Node` references inside them. We'll - // let those naturally get deallocated when the `Task` itself goes out - // of scope or gets notified. - unsafe { - while !self.head_all.is_null() { - let head = self.head_all; - let node = self.unlink(head); - self.release_node(node); - } - } - - // Note that at this point we could still have a bunch of nodes in the - // mpsc queue. None of those nodes, however, have futures associated - // with them so they're safe to destroy on any thread. At this point - // the `FuturesUnordered` struct, the owner of the one strong reference - // to `Inner` will drop the strong reference. At that point - // whichever thread releases the strong refcount last (be it this - // thread or some other thread as part of an `upgrade`) will clear out - // the mpsc queue and free all remaining nodes. - // - // While that freeing operation isn't guaranteed to happen here, it's - // guaranteed to happen "promptly" as no more "blocking work" will - // happen while there's a strong refcount held. - } -} - -impl FromIterator for FuturesUnordered { - fn from_iter(iter: T) -> Self - where - T: IntoIterator, - { - let acc = FuturesUnordered::new(); - iter.into_iter().fold(acc, |mut acc, item| { acc.push(item); acc }) - } -} - -#[derive(Debug)] -/// Mutable iterator over all futures in the unordered set. -pub struct IterMut<'a, F: 'a> { - node: *const Node, - len: usize, - _marker: PhantomData<&'a mut FuturesUnordered> -} - -impl<'a, F> Iterator for IterMut<'a, F> { - type Item = &'a mut F; - - fn next(&mut self) -> Option<&'a mut F> { - if self.node.is_null() { - return None; - } - unsafe { - let future = (*(*self.node).future.get()).as_mut().unwrap(); - let next = *(*self.node).next_all.get(); - self.node = next; - self.len -= 1; - Some(future) - } - } - - fn size_hint(&self) -> (usize, Option) { - (self.len, Some(self.len)) - } -} - -impl<'a, F> ExactSizeIterator for IterMut<'a, F> {} - -impl Inner { - /// The enqueue function from the 1024cores intrusive MPSC queue algorithm. 
- fn enqueue(&self, node: *const Node) { - unsafe { - debug_assert!((*node).queued.load(Relaxed)); - - // This action does not require any coordination - (*node).next_readiness.store(ptr::null_mut(), Relaxed); - - // Note that these atomic orderings come from 1024cores - let node = node as *mut _; - let prev = self.head_readiness.swap(node, AcqRel); - (*prev).next_readiness.store(node, Release); - } - } - - /// The dequeue function from the 1024cores intrusive MPSC queue algorithm - /// - /// Note that this unsafe as it required mutual exclusion (only one thread - /// can call this) to be guaranteed elsewhere. - unsafe fn dequeue(&self) -> Dequeue { - let mut tail = *self.tail_readiness.get(); - let mut next = (*tail).next_readiness.load(Acquire); - - if tail == self.stub() { - if next.is_null() { - return Dequeue::Empty; - } - - *self.tail_readiness.get() = next; - tail = next; - next = (*next).next_readiness.load(Acquire); - } - - if !next.is_null() { - *self.tail_readiness.get() = next; - debug_assert!(tail != self.stub()); - return Dequeue::Data(tail); - } - - if self.head_readiness.load(Acquire) as *const _ != tail { - return Dequeue::Inconsistent; - } - - self.enqueue(self.stub()); - - next = (*tail).next_readiness.load(Acquire); - - if !next.is_null() { - *self.tail_readiness.get() = next; - return Dequeue::Data(tail); - } - - Dequeue::Inconsistent - } - - fn stub(&self) -> *const Node { - &*self.stub - } -} - -impl Drop for Inner { - fn drop(&mut self) { - // Once we're in the destructor for `Inner` we need to clear out the - // mpsc queue of nodes if there's anything left in there. - // - // Note that each node has a strong reference count associated with it - // which is owned by the mpsc queue. All nodes should have had their - // futures dropped already by the `FuturesUnordered` destructor above, - // so we're just pulling out nodes and dropping their refcounts. - unsafe { - loop { - match self.dequeue() { - Dequeue::Empty => break, - Dequeue::Inconsistent => abort("inconsistent in drop"), - Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)), - } - } - } - } -} - -#[allow(missing_debug_implementations)] -struct NodeToHandle<'a, T: 'a>(&'a Arc>); - -impl<'a, T> Clone for NodeToHandle<'a, T> { - fn clone(&self) -> Self { - NodeToHandle(self.0) - } -} - -#[doc(hidden)] -impl<'a, T> From> for Waker { - fn from(handle: NodeToHandle<'a, T>) -> Waker { - unsafe { - let ptr = handle.0.clone(); - let ptr = mem::transmute::>, *const ArcNode>(ptr); - Waker::new(hide_lt(ptr)) - } - } -} - -struct ArcNode(PhantomData); - -// We should never touch `T` on any thread other than the one owning -// `FuturesUnordered`, so this should be a safe operation. 
-impl Send for ArcNode {} -impl Sync for ArcNode {} - -impl UnsafeWake for ArcNode { - unsafe fn clone_raw(&self) -> Waker { - let me: *const ArcNode = self; - let me: *const *const ArcNode = &me; - let me = &*(me as *const Arc>); - NodeToHandle(me).into() - } - - unsafe fn drop_raw(&self) { - let mut me: *const ArcNode = self; - let me = &mut me as *mut *const ArcNode as *mut Arc>; - ptr::drop_in_place(me); - } - - unsafe fn wake(&self) { - let me: *const ArcNode = self; - let me: *const *const ArcNode = &me; - let me = me as *const Arc>; - Node::notify(&*me) - } -} - -unsafe fn hide_lt(p: *const ArcNode) -> *const UnsafeWake { - mem::transmute(p as *const UnsafeWake) -} - -impl Node { - fn notify(me: &Arc>) { - let inner = match me.queue.upgrade() { - Some(inner) => inner, - None => return, - }; - - // It's our job to notify the node that it's ready to get polled, - // meaning that we need to enqueue it into the readiness queue. To - // do this we flag that we're ready to be queued, and if successful - // we then do the literal queueing operation, ensuring that we're - // only queued once. - // - // Once the node is inserted we be sure to notify the parent task, - // as it'll want to come along and pick up our node now. - // - // Note that we don't change the reference count of the node here, - // we're just enqueueing the raw pointer. The `FuturesUnordered` - // implementation guarantees that if we set the `queued` flag true that - // there's a reference count held by the main `FuturesUnordered` queue - // still. - let prev = me.queued.swap(true, SeqCst); - if !prev { - inner.enqueue(&**me); - inner.parent.wake(); - } - } -} - -impl Drop for Node { - fn drop(&mut self) { - // Currently a `Node` is sent across all threads for any lifetime, - // regardless of `T`. This means that for memory safety we can't - // actually touch `T` at any time except when we have a reference to the - // `FuturesUnordered` itself. - // - // Consequently it *should* be the case that we always drop futures from - // the `FuturesUnordered` instance, but this is a bomb in place to catch - // any bugs in that logic. - unsafe { - if (*self.future.get()).is_some() { - abort("future still here when dropping"); - } - } - } -} - -fn abort(s: &str) -> ! { - struct DoublePanic; - - impl Drop for DoublePanic { - fn drop(&mut self) { - panic!("panicking twice to abort the program"); - } - } - - let _bomb = DoublePanic; - panic!("{}", s); -} - -/// Converts a list of futures into a `Stream` of results from the futures. -/// -/// This function will take an list of futures (e.g. a vector, an iterator, -/// etc), and return a stream. The stream will yield items as they become -/// available on the futures internally, in the order that they become -/// available. This function is similar to `buffer_unordered` in that it may -/// return items in a different order than in the list specified. -/// -/// Note that the returned set can also be used to dynamically push more -/// futures into the set as they become available. -pub fn futures_unordered(futures: I) -> FuturesUnordered<::Future> -where - I: IntoIterator, - I::Item: IntoFuture, -{ - futures.into_iter().map(|f| f.into_future()).collect() -} diff --git a/futures-util/src/stream/futures_unordered/abort.rs b/futures-util/src/stream/futures_unordered/abort.rs new file mode 100644 index 0000000000..1a42d24369 --- /dev/null +++ b/futures-util/src/stream/futures_unordered/abort.rs @@ -0,0 +1,12 @@ +pub(super) fn abort(s: &str) -> ! 
{ + struct DoublePanic; + + impl Drop for DoublePanic { + fn drop(&mut self) { + panic!("panicking twice to abort the program"); + } + } + + let _bomb = DoublePanic; + panic!("{}", s); +} diff --git a/futures-util/src/stream/futures_unordered/iter_mut.rs b/futures-util/src/stream/futures_unordered/iter_mut.rs new file mode 100644 index 0000000000..9e57251ccd --- /dev/null +++ b/futures-util/src/stream/futures_unordered/iter_mut.rs @@ -0,0 +1,22 @@ +use std::marker::Unpin; +use std::mem::PinMut; + +use super::iter_pin_mut::IterPinMut; + +#[derive(Debug)] +/// Mutable iterator over all futures in the unordered set. +pub struct IterMut<'a, F: 'a + Unpin> (pub(super) IterPinMut<'a, F>); + +impl<'a, F: Unpin> Iterator for IterMut<'a, F> { + type Item = &'a mut F; + + fn next(&mut self) -> Option<&'a mut F> { + self.0.next().map(|f| unsafe { PinMut::get_mut(f) }) + } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} + +impl<'a, F: Unpin> ExactSizeIterator for IterMut<'a, F> {} diff --git a/futures-util/src/stream/futures_unordered/iter_pin_mut.rs b/futures-util/src/stream/futures_unordered/iter_pin_mut.rs new file mode 100644 index 0000000000..3b1e7bf87a --- /dev/null +++ b/futures-util/src/stream/futures_unordered/iter_pin_mut.rs @@ -0,0 +1,38 @@ +use std::marker::{PhantomData, Unpin}; +use std::mem::PinMut; + +use super::FuturesUnordered; +use super::node::Node; + +#[derive(Debug)] +/// Mutable iterator over all futures in the unordered set. +pub struct IterPinMut<'a, F: 'a> { + pub(super) node: *const Node, + pub(super) len: usize, + pub(super) _marker: PhantomData<&'a mut FuturesUnordered> +} + +impl<'a, F: Unpin> Unpin for IterPinMut<'a, F> {} + +impl<'a, F> Iterator for IterPinMut<'a, F> { + type Item = PinMut<'a, F>; + + fn next(&mut self) -> Option> { + if self.node.is_null() { + return None; + } + unsafe { + let future = (*(*self.node).future.get()).as_mut().unwrap(); + let next = *(*self.node).next_all.get(); + self.node = next; + self.len -= 1; + Some(PinMut::new_unchecked(future)) + } + } + + fn size_hint(&self) -> (usize, Option) { + (self.len, Some(self.len)) + } +} + +impl<'a, F> ExactSizeIterator for IterPinMut<'a, F> {} diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs new file mode 100644 index 0000000000..9a9121e1cc --- /dev/null +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -0,0 +1,427 @@ +//! An unbounded set of futures. + +use std::cell::UnsafeCell; +use std::fmt::{self, Debug}; +use std::iter::FromIterator; +use std::marker::{PhantomData, Unpin}; +use std::mem::{self, PinMut}; +use std::ptr; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::atomic::{AtomicPtr, AtomicBool}; +use std::sync::{Arc, Weak}; +use std::usize; + +use futures_core::{Stream, Future, Poll}; +use futures_core::task::{self, AtomicWaker}; + +mod abort; +mod ready_to_run_queue; +mod iter_mut; +mod iter_pin_mut; +mod node; + +use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue}; +use self::iter_mut::IterMut; +use self::iter_pin_mut::IterPinMut; +use self::node::Node; + +/// A set of `Future`s which may complete in any order. +/// +/// This structure is optimized to manage a large number of futures. +/// Futures managed by `FuturesUnordered` will only be polled when they +/// generate notifications. This reduces the required amount of work needed to +/// poll large numbers of futures. 
+/// +/// `FuturesUnordered` can be filled by `collect`ing an iterator of `Future`s +/// into a `FuturesUnordered`, or by `push`ing `Future`s onto an existing +/// `FuturesUnordered`. When new `Future`s are added, `poll_next` must be +/// called in order to begin receiving wakeups for new `Future`s. +/// +/// Note that you can create a ready-made `FuturesUnordered` via the +/// `futures_unordered` function in the `stream` module, or you can start with +/// an empty set with the `FuturesUnordered::new` constructor. +#[must_use = "streams do nothing unless polled"] +pub struct FuturesUnordered { + ready_to_run_queue: Arc>, + len: usize, + head_all: *const Node, +} + +unsafe impl Send for FuturesUnordered {} +unsafe impl Sync for FuturesUnordered {} +impl Unpin for FuturesUnordered {} + +// FuturesUnordered is implemented using two linked lists. One which links all +// futures managed by a `FuturesUnordered` and one that tracks futures that have +// been scheduled for polling. The first linked list is not thread safe and is +// only accessed by the thread that owns the `FuturesUnordered` value. The +// second linked list is an implementation of the intrusive MPSC queue algorithm +// described by 1024cores.net. +// +// When a future is submitted to the set a node is allocated and inserted in +// both linked lists. The next call to `poll_next` will (eventually) see this +// node and call `poll` on the future. +// +// Before a managed future is polled, the current task's `Waker` is replaced +// with one that is aware of the specific future being run. This ensures that +// task notifications generated by that specific future are visible to +// `FuturesUnordered`. When a notification is received, the node is scheduled +// for polling by being inserted into the concurrent linked list. +// +// Each node is wrapped in an `Arc` and thereby atomically reference counted. +// Also, each node contains an `AtomicBool` which acts as a flag that indicates +// whether the node is currently inserted in the atomic queue. When the future +// is notified, it will only insert itself into the linked list if it isn't +// currently inserted. + +impl FuturesUnordered + where T: Future +{ + /// Constructs a new, empty `FuturesUnordered` + /// + /// The returned `FuturesUnordered` does not contain any futures. + /// In this state, `FuturesUnordered::poll_next` will return + /// `Poll::Ready(None)`. + pub fn new() -> FuturesUnordered { + let stub = Arc::new(Node { + future: UnsafeCell::new(None), + next_all: UnsafeCell::new(ptr::null()), + prev_all: UnsafeCell::new(ptr::null()), + next_ready_to_run: AtomicPtr::new(ptr::null_mut()), + queued: AtomicBool::new(true), + ready_to_run_queue: Weak::new(), + }); + let stub_ptr = &*stub as *const Node; + let ready_to_run_queue = Arc::new(ReadyToRunQueue { + parent: AtomicWaker::new(), + head: AtomicPtr::new(stub_ptr as *mut _), + tail: UnsafeCell::new(stub_ptr), + stub: stub, + }); + + FuturesUnordered { + len: 0, + head_all: ptr::null_mut(), + ready_to_run_queue: ready_to_run_queue, + } + } +} + +impl FuturesUnordered { + /// Returns the number of futures contained in the set. + /// + /// This represents the total number of in-flight futures. + pub fn len(&self) -> usize { + self.len + } + + /// Returns `true` if the set contains no futures + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + /// Push a future into the set. + /// + /// This function submits the given future to the set for managing. This + /// function will not call `poll` on the submitted future. 
The caller must + /// ensure that `FuturesUnordered::poll_next` is called in order to receive + /// task notifications. + pub fn push(&mut self, future: T) { + let node = Arc::new(Node { + future: UnsafeCell::new(Some(future)), + next_all: UnsafeCell::new(ptr::null_mut()), + prev_all: UnsafeCell::new(ptr::null_mut()), + next_ready_to_run: AtomicPtr::new(ptr::null_mut()), + queued: AtomicBool::new(true), + ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue), + }); + + // Right now our node has a strong reference count of 1. We transfer + // ownership of this reference count to our internal linked list + // and we'll reclaim ownership through the `unlink` function below. + let ptr = self.link(node); + + // We'll need to get the future "into the system" to start tracking it, + // e.g. getting its unpark notifications going to us tracking which + // futures are ready. To do that we unconditionally enqueue it for + // polling here. + self.ready_to_run_queue.enqueue(ptr); + } + + /// Returns an iterator that allows modifying each future in the set. + pub fn iter_mut(&mut self) -> IterMut where T: Unpin { + IterMut(IterPinMut { + node: self.head_all, + len: self.len, + _marker: PhantomData + }) + } + + /// Returns an iterator that allows modifying each future in the set. + pub fn iter_pin_mut<'a>(self: PinMut<'a, Self>) -> IterPinMut<'a, T> { + IterPinMut { + node: self.head_all, + len: self.len, + _marker: PhantomData + } + } + + fn release_node(&mut self, node: Arc>) { + // The future is done, try to reset the queued flag. This will prevent + // `notify` from doing any work in the future + let prev = node.queued.swap(true, SeqCst); + + // Drop the future, even if it hasn't finished yet. This is safe + // because we're dropping the future on the thread that owns + // `FuturesUnordered`, which correctly tracks T's lifetimes and such. + unsafe { + drop((*node.future.get()).take()); + } + + // If the queued flag was previously set then it means that this node + // is still in our internal mpsc queue. We then transfer ownership + // of our reference count to the mpsc queue, and it'll come along and + // free it later, noticing that the future is `None`. + // + // If, however, the queued flag was *not* set then we're safe to + // release our reference count on the internal node. The queued flag + // was set above so all future `enqueue` operations will not actually + // enqueue the node, so our node will never see the mpsc queue again. + // The node itself will be deallocated once all reference counts have + // been dropped by the various owning tasks elsewhere. + if prev { + mem::forget(node); + } + } + + /// Insert a new node into the internal linked list. + fn link(&mut self, node: Arc>) -> *const Node { + let ptr = Arc::into_raw(node); + unsafe { + *(*ptr).next_all.get() = self.head_all; + if !self.head_all.is_null() { + *(*self.head_all).prev_all.get() = ptr; + } + } + + self.head_all = ptr; + self.len += 1; + ptr + } + + /// Remove the node from the linked list tracking all nodes currently + /// managed by `FuturesUnordered`. + /// This function is unsafe because it has be guaranteed that `node` is a + /// valid pointer. 
+ unsafe fn unlink(&mut self, node: *const Node) -> Arc> { + let node = Arc::from_raw(node); + let next = *node.next_all.get(); + let prev = *node.prev_all.get(); + *node.next_all.get() = ptr::null_mut(); + *node.prev_all.get() = ptr::null_mut(); + + if !next.is_null() { + *(*next).prev_all.get() = prev; + } + + if !prev.is_null() { + *(*prev).next_all.get() = next; + } else { + self.head_all = next; + } + self.len -= 1; + node + } +} + +impl Stream for FuturesUnordered + where T: Future +{ + type Item = T::Output; + + fn poll_next(mut self: PinMut, cx: &mut task::Context) + -> Poll> + { + // Ensure `parent` is correctly set. + self.ready_to_run_queue.parent.register(cx.waker()); + + loop { + // Safety: &mut self guarantees the mutual exclusion `dequeue` + // expects + let node = match unsafe { self.ready_to_run_queue.dequeue() } { + Dequeue::Empty => { + if self.is_empty() { + return Poll::Ready(None); + } else { + return Poll::Pending; + } + } + Dequeue::Inconsistent => { + // At this point, it may be worth yielding the thread & + // spinning a few times... but for now, just yield using the + // task system. + cx.local_waker().wake(); + return Poll::Pending; + } + Dequeue::Data(node) => node, + }; + + debug_assert!(node != self.ready_to_run_queue.stub()); + + // Safety: + // - Node is a valid pointer. + // - We are the only thread that accesses the `UnsafeCell` that + // contains the future + let future = match unsafe { &mut *(*node).future.get() } { + Some(future) => future, + + // If the future has already gone away then we're just + // cleaning out this node. See the comment in + // `release_node` for more information, but we're basically + // just taking ownership of our reference count here. + None => { + // Saftey: `release_node` uses `mem::forget` + let node = unsafe { Arc::from_raw(node) }; + assert!(node.next_all.get().is_null()); + assert!(node.prev_all.get().is_null()); + continue + } + }; + + // Safety: `node` is a valid pointer + let node = unsafe { self.unlink(node) }; + + // Unset queued flag... this must be done before + // polling. This ensures that the future gets + // rescheduled if it is notified **during** a call + // to `poll`. + let prev = node.queued.swap(false, SeqCst); + assert!(prev); + + let local_waker = node.local_waker(); + + // We're going to need to be very careful if the `poll` + // function below panics. We need to (a) not leak memory and + // (b) ensure that we still don't have any use-after-frees. To + // manage this we do a few things: + // + // * A "bomb" is created which if dropped abnormally will call + // `release_node`. That way we'll be sure the memory management + // of the `node` is managed correctly. In particular + // `release_node` will drop the future. This ensures that it is + // dropped on this thread and not accidentally on a different + // thread (bad). + // * We unlink the node from our internal queue to preemptively + // assume it'll panic, in which case we'll want to discard it + // regardless. + struct Bomb<'a, T: 'a> { + queue: &'a mut FuturesUnordered, + node: Option>>, + } + + impl<'a, T> Drop for Bomb<'a, T> { + fn drop(&mut self) { + if let Some(node) = self.node.take() { + self.queue.release_node(node); + } + } + } + + let mut bomb = Bomb { + node: Some(node), + queue: &mut *self, + }; + + // Poll the underlying future with the appropriate waker + // implementation. This is where a large bit of the unsafety + // starts to stem from internally. 
The waker is basically just + // our `Arc>` and tracks the mpsc queue of ready futures. + // + // Critically though `Node` won't actually access `T`, the + // future, while it's floating around inside of `Task` + // instances. These structs will basically just use `T` to size + // the internal allocation, appropriately accessing fields and + // deallocating the node if need be. + + // Safety: We won't move the future ever again + let future = unsafe { PinMut::new_unchecked(future) }; + + let mut cx = cx.with_waker(&local_waker); + let res = future.poll(&mut cx); + + let ret = match res { + Poll::Pending => { + let node = bomb.node.take().unwrap(); + bomb.queue.link(node); + continue + } + Poll::Ready(result) => Poll::Ready(Some(result)), + }; + return ret + } + } +} + +impl Debug for FuturesUnordered { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "FuturesUnordered {{ ... }}") + } +} + +impl Drop for FuturesUnordered { + fn drop(&mut self) { + // When a `FuturesUnordered` is dropped we want to drop all futures + // associated with it. At the same time though there may be tons of + // `Task` handles flying around which contain `Node` references + // inside them. We'll let those naturally get deallocated when the + // `Task` itself goes out of scope or gets notified. + unsafe { + while !self.head_all.is_null() { + let head = self.head_all; + let node = self.unlink(head); + self.release_node(node); + } + } + + // Note that at this point we could still have a bunch of nodes in the + // mpsc queue. None of those nodes, however, have futures associated + // with them so they're safe to destroy on any thread. At this point + // the `FuturesUnordered` struct, the owner of the one strong reference + // to `MPSCQueue` will drop the strong reference. At that point + // whichever thread releases the strong refcount last (be it this + // thread or some other thread as part of an `upgrade`) will clear out + // the mpsc queue and free all remaining nodes. + // + // While that freeing operation isn't guaranteed to happen here, it's + // guaranteed to happen "promptly" as no more "blocking work" will + // happen while there's a strong refcount held. + } +} + +impl FromIterator for FuturesUnordered { + fn from_iter(iter: T) -> Self + where + T: IntoIterator, + { + let acc = FuturesUnordered::new(); + iter.into_iter().fold(acc, |mut acc, item| { acc.push(item); acc }) + } +} + +/// Converts a list of futures into a `Stream` of results from the futures. +/// +/// This function will take an list of futures (e.g. a vector, an iterator, +/// etc), and return a stream. The stream will yield items as they become +/// available on the futures internally, in the order that they become +/// available. This function is similar to `buffer_unordered` in that it may +/// return items in a different order than in the list specified. +/// +/// Note that the returned set can also be used to dynamically push more +/// futures into the set as they become available. 
+pub fn futures_unordered(futures: I) -> FuturesUnordered +where + I: IntoIterator, + I::Item: Future, +{ + futures.into_iter().collect() +} diff --git a/futures-util/src/stream/futures_unordered/node.rs b/futures-util/src/stream/futures_unordered/node.rs new file mode 100644 index 0000000000..1034c0d329 --- /dev/null +++ b/futures-util/src/stream/futures_unordered/node.rs @@ -0,0 +1,148 @@ +use std::cell::UnsafeCell; +use std::mem; +use std::marker::PhantomData; +use std::ptr::{self, NonNull}; +use std::sync::{Arc, Weak}; +use std::sync::atomic::{AtomicPtr, AtomicBool}; +use std::sync::atomic::Ordering::SeqCst; + +use futures_core::task::{UnsafeWake, Waker, LocalWaker}; + +use super::ReadyToRunQueue; +use super::abort::abort; + +pub(super) struct Node { + // The future + pub(super) future: UnsafeCell>, + + // Next pointer for linked list tracking all active nodes + pub(super) next_all: UnsafeCell<*const Node>, + + // Previous node in linked list tracking all active nodes + pub(super) prev_all: UnsafeCell<*const Node>, + + // Next pointer in readiness queue + pub(super) next_ready_to_run: AtomicPtr>, + + // Queue that we'll be enqueued to when notified + pub(super) ready_to_run_queue: Weak>, + + // Whether or not this node is currently in the mpsc queue. + pub(super) queued: AtomicBool, +} + +impl Node { + pub(super) fn wake(self: &Arc>) { + let inner = match self.ready_to_run_queue.upgrade() { + Some(inner) => inner, + None => return, + }; + + // It's our job to notify the node that it's ready to get polled, + // meaning that we need to enqueue it into the readiness queue. To + // do this we flag that we're ready to be queued, and if successful + // we then do the literal queueing operation, ensuring that we're + // only queued once. + // + // Once the node is inserted we be sure to notify the parent task, + // as it'll want to come along and pick up our node now. + // + // Note that we don't change the reference count of the node here, + // we're just enqueueing the raw pointer. The `FuturesUnordered` + // implementation guarantees that if we set the `queued` flag true that + // there's a reference count held by the main `FuturesUnordered` queue + // still. + let prev = self.queued.swap(true, SeqCst); + if !prev { + inner.enqueue(&**self); + inner.parent.wake(); + } + } + + // Saftey: The returned `NonNull` needs to be put into a `Waker` + // or `LocalWaker` + unsafe fn clone_as_unsafe_wake_without_lifetime(self: &Arc>) + -> NonNull + { + let clone = self.clone(); + + // Safety: This is save because an `Arc` is a struct which contains + // a single field that is a pointer. + let ptr = mem::transmute::>, NonNull>>(clone); + + let ptr = ptr as NonNull; + + // Hide lifetime of `T` + // Safety: This is safe because `UnsafeWake` is guaranteed not to + // touch `T` + mem::transmute::, NonNull>(ptr) + } + + pub(super) fn local_waker(self: &Arc>) -> LocalWaker { + unsafe { LocalWaker::new(self.clone_as_unsafe_wake_without_lifetime()) } + } + + pub(super) fn waker(self: &Arc>) -> Waker { + unsafe { Waker::new(self.clone_as_unsafe_wake_without_lifetime()) } + } +} + +impl Drop for Node { + fn drop(&mut self) { + // Currently a `Node` is sent across all threads for any lifetime, + // regardless of `T`. This means that for memory safety we can't + // actually touch `T` at any time except when we have a reference to the + // `FuturesUnordered` itself. 
+ // + // Consequently it *should* be the case that we always drop futures from + // the `FuturesUnordered` instance, but this is a bomb in place to catch + // any bugs in that logic. + unsafe { + if (*self.future.get()).is_some() { + abort("future still here when dropping"); + } + } + } +} + +// `ArcNode` represents conceptually the struct an `Arc>` points to. +// `*const ArcNode` is equal to `Arc>` +// It may only be used through references because its layout obviously doesn't +// match the real inner struct of an `Arc` which (currently) has the form +// `{ strong, weak, data }`. +struct ArcNode(PhantomData); + +// We should never touch the future `T` on any thread other than the one owning +// `FuturesUnordered`, so this should be a safe operation. +unsafe impl Send for ArcNode {} +unsafe impl Sync for ArcNode {} + +// We need to implement `UnsafeWake` trait directly and can't implement `Wake` +// for `Node` because `T`, the future, isn't required to have a static +// lifetime. `UnsafeWake` lets us forget about `T` and its lifetime. This is +// safe because neither `drop_raw` nor `wake` touch `T`. This is the case even +// though `drop_raw` runs the destructor for `Node` because its destructor is +// guaranteed to not touch `T`. `T` must already have been dropped by the time +// it runs. See `Drop` impl for `Node` for more details. +unsafe impl UnsafeWake for ArcNode { + #[inline] + unsafe fn clone_raw(&self) -> Waker { + let me: *const ArcNode = self; + let node = &*(&me as *const *const ArcNode as *const Arc>); + Node::waker(node) + } + + #[inline] + unsafe fn drop_raw(&self) { + let mut me: *const ArcNode = self; + let node_ptr = &mut me as *mut *const ArcNode as *mut Arc>; + ptr::drop_in_place(node_ptr); + } + + #[inline] + unsafe fn wake(&self) { + let me: *const ArcNode = self; + let node = &*(&me as *const *const ArcNode as *const Arc>); + Node::wake(node) + } +} diff --git a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs new file mode 100644 index 0000000000..52d57fa211 --- /dev/null +++ b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs @@ -0,0 +1,108 @@ +use std::cell::UnsafeCell; +use std::ptr; +use std::sync::Arc; +use std::sync::atomic::AtomicPtr; +use std::sync::atomic::Ordering::{Relaxed, Acquire, Release, AcqRel}; + +use futures_core::task::AtomicWaker; + +use super::abort::abort; +use super::node::Node; + +pub(super) enum Dequeue { + Data(*const Node), + Empty, + Inconsistent, +} + +pub(super) struct ReadyToRunQueue { + // The task using `FuturesUnordered`. + pub(super) parent: AtomicWaker, + + // Head/tail of the readiness queue + pub(super) head: AtomicPtr>, + pub(super) tail: UnsafeCell<*const Node>, + pub(super) stub: Arc>, +} + +impl ReadyToRunQueue { + /// The enqueue function from the 1024cores intrusive MPSC queue algorithm. + pub(super) fn enqueue(&self, node: *const Node) { + unsafe { + debug_assert!((*node).queued.load(Relaxed)); + + // This action does not require any coordination + (*node).next_ready_to_run.store(ptr::null_mut(), Relaxed); + + // Note that these atomic orderings come from 1024cores + let node = node as *mut _; + let prev = self.head.swap(node, AcqRel); + (*prev).next_ready_to_run.store(node, Release); + } + } + + /// The dequeue function from the 1024cores intrusive MPSC queue algorithm + /// + /// Note that this is unsafe as it required mutual exclusion (only one + /// thread can call this) to be guaranteed elsewhere. 
+ pub(super) unsafe fn dequeue(&self) -> Dequeue { + let mut tail = *self.tail.get(); + let mut next = (*tail).next_ready_to_run.load(Acquire); + + if tail == self.stub() { + if next.is_null() { + return Dequeue::Empty; + } + + *self.tail.get() = next; + tail = next; + next = (*next).next_ready_to_run.load(Acquire); + } + + if !next.is_null() { + *self.tail.get() = next; + debug_assert!(tail != self.stub()); + return Dequeue::Data(tail); + } + + if self.head.load(Acquire) as *const _ != tail { + return Dequeue::Inconsistent; + } + + self.enqueue(self.stub()); + + next = (*tail).next_ready_to_run.load(Acquire); + + if !next.is_null() { + *self.tail.get() = next; + return Dequeue::Data(tail); + } + + Dequeue::Inconsistent + } + + pub(super) fn stub(&self) -> *const Node { + &*self.stub + } +} + +impl Drop for ReadyToRunQueue { + fn drop(&mut self) { + // Once we're in the destructor for `Inner` we need to clear out the + // mpsc queue of nodes if there's anything left in there. + // + // Note that each node has a strong reference count associated with it + // which is owned by the mpsc queue. All nodes should have had their + // futures dropped already by the `FuturesUnordered` destructor above, + // so we're just pulling out nodes and dropping their refcounts. + unsafe { + loop { + match self.dequeue() { + Dequeue::Empty => break, + Dequeue::Inconsistent => abort("inconsistent in drop"), + Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)), + } + } + } + } +} diff --git a/futures-util/src/stream/iter.rs b/futures-util/src/stream/iter.rs index 5fc457edb2..8cc2472012 100644 --- a/futures-util/src/stream/iter.rs +++ b/futures-util/src/stream/iter.rs @@ -28,10 +28,8 @@ impl Unpin for Iter {} /// use futures::stream; /// use futures_executor::block_on; /// -/// # fn main() { -/// let mut stream = stream::iter_ok::<_, ()>(vec![17, 19]); -/// assert_eq!(Ok(vec![17, 19]), block_on(stream.collect())); -/// # } +/// let mut stream = stream::iter(vec![17, 19]); +/// assert_eq!(vec![17, 19], block_on(stream.collect::>())); /// ``` pub fn iter(i: I) -> Iter where I: IntoIterator, diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index d5ffad7130..830963b0b5 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -73,7 +73,7 @@ if_std! { mod collect; //mod select_all; //mod split; - //mod futures_unordered; + mod futures_unordered; //mod futures_ordered; //pub use self::buffered::Buffered; //pub use self::buffer_unordered::BufferUnordered; @@ -82,7 +82,7 @@ if_std! 
{ pub use self::collect::Collect; //pub use self::select_all::{select_all, SelectAll}; //pub use self::split::{SplitStream, SplitSink, ReuniteError}; - //pub use self::futures_unordered::{futures_unordered, FuturesUnordered}; + pub use self::futures_unordered::{futures_unordered, FuturesUnordered}; //pub use self::futures_ordered::{futures_ordered, FuturesOrdered}; } @@ -155,12 +155,11 @@ pub trait StreamExt: Stream { /// # extern crate futures; /// # extern crate futures_channel; /// use futures::prelude::*; + /// use futures::future; /// use futures_channel::mpsc; /// - /// # fn main() { /// let (_tx, rx) = mpsc::channel::(1); - /// let evens = rx.filter(|x| Ok(x % 2 == 0)); - /// # } + /// let evens = rx.filter(|x| future::ready(x % 2 == 0)); /// ``` fn filter(self, pred: P) -> Filter where P: FnMut(&Self::Item) -> R, @@ -188,12 +187,12 @@ pub trait StreamExt: Stream { /// # extern crate futures; /// # extern crate futures_channel; /// use futures::prelude::*; + /// use futures::future; /// use futures_channel::mpsc; /// - /// # fn main() { /// let (_tx, rx) = mpsc::channel::(1); /// let evens_plus_one = rx.filter_map(|x| { - /// Ok( + /// future::ready( /// if x % 0 == 2 { /// Some(x + 1) /// } else { @@ -201,7 +200,6 @@ pub trait StreamExt: Stream { /// } /// ) /// }); - /// # } /// ``` fn filter_map(self, f: F) -> FilterMap where F: FnMut(Self::Item) -> R, @@ -234,18 +232,12 @@ pub trait StreamExt: Stream { /// # extern crate futures; /// # extern crate futures_channel; /// use futures::prelude::*; - /// use futures_channel::mpsc; + /// use futures::channel::mpsc; + /// use futures::future; /// - /// # fn main() { /// let (_tx, rx) = mpsc::channel::(1); /// - /// let rx = rx.then(|result| { - /// match result { - /// Ok(e) => Ok(e + 3), - /// Err(_) => Err(4), - /// } - /// }); - /// # } + /// let rx = rx.then(|x| future::ready(x + 3)); /// ``` fn then(self, f: F) -> Then where F: FnMut(Self::Item) -> U, @@ -271,26 +263,23 @@ pub trait StreamExt: Stream { /// /// ``` /// # extern crate futures; - /// # extern crate futures_executor; /// # extern crate futures_channel; /// use std::thread; /// /// use futures::prelude::*; /// use futures_channel::mpsc; - /// use futures_executor::block_on; + /// use futures::executor::block_on; /// - /// # fn main() { /// let (mut tx, rx) = mpsc::unbounded(); /// /// thread::spawn(move || { - /// for i in (0..5).rev() { - /// tx.unbounded_send(i + 1).unwrap(); + /// for i in (1..=5) { + /// tx.unbounded_send(i).unwrap(); /// } /// }); /// - /// let result = block_on(rx.collect()); - /// assert_eq!(result, Ok(vec![5, 4, 3, 2, 1])); - /// # } + /// let output = block_on(rx.collect::>()); + /// assert_eq!(output, vec![1, 2, 3, 4, 5]); /// ``` #[cfg(feature = "std")] fn collect>(self) -> Collect @@ -310,13 +299,12 @@ pub trait StreamExt: Stream { /// /// ``` /// # extern crate futures; - /// # extern crate futures_executor; /// # extern crate futures_channel; /// use std::thread; /// /// use futures::prelude::*; /// use futures_channel::mpsc; - /// use futures_executor::block_on; + /// use futures::executor::block_on; /// /// # fn main() { /// let (mut tx, rx) = mpsc::unbounded(); @@ -328,7 +316,7 @@ pub trait StreamExt: Stream { /// } /// }); /// let result = block_on(rx.concat()); - /// assert_eq!(result, Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3])); + /// assert_eq!(result, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]); /// # } /// ``` fn concat(self) -> Concat @@ -351,17 +339,14 @@ pub trait StreamExt: Stream { /// /// ``` /// # extern crate futures; - /// # extern crate 
futures_executor; /// use futures::prelude::*; /// use futures::stream; /// use futures::future; - /// use futures_executor::block_on; + /// use futures::executor::block_on; /// - /// # fn main() { - /// let number_stream = stream::iter_ok::<_, ()>(0..6); - /// let sum = number_stream.fold(0, |acc, x| future::ok(acc + x)); - /// assert_eq!(block_on(sum), Ok(15)); - /// # } + /// let number_stream = stream::iter(0..6); + /// let sum = number_stream.fold(0, |acc, x| future::ready(acc + x)); + /// assert_eq!(block_on(sum), 15); /// ``` fn fold(self, init: T, f: F) -> Fold where F: FnMut(T, Self::Item) -> Fut, @@ -376,16 +361,14 @@ pub trait StreamExt: Stream { /// ``` /// # extern crate futures; /// # extern crate futures_channel; - /// # extern crate futures_executor; /// use std::thread; /// /// use futures::prelude::*; /// use futures_channel::mpsc; - /// use futures_executor::block_on; + /// use futures::executor::block_on; /// - /// # fn main() { - /// let (tx1, rx1) = mpsc::unbounded::(); - /// let (tx2, rx2) = mpsc::unbounded::(); + /// let (tx1, rx1) = mpsc::unbounded(); + /// let (tx2, rx2) = mpsc::unbounded(); /// let (tx3, rx3) = mpsc::unbounded(); /// /// thread::spawn(move || { @@ -401,9 +384,8 @@ pub trait StreamExt: Stream { /// tx3.unbounded_send(rx2).unwrap(); /// }); /// - /// let result = block_on(rx3.flatten().collect()); - /// assert_eq!(result, Ok(vec![1, 2, 3, 4])); - /// # } + /// let output = block_on(rx3.flatten().collect::>()); + /// assert_eq!(output, vec![1, 2, 3, 4]); /// ``` fn flatten(self) -> Flatten where Self::Item: Stream, @@ -509,22 +491,19 @@ pub trait StreamExt: Stream { /// /// ``` /// # extern crate futures; - /// # extern crate futures_executor; /// use futures::prelude::*; /// use futures::stream; /// use futures::future; - /// use futures_executor::block_on; + /// use futures::executor::block_on; /// - /// # fn main() { - /// let mut stream = stream::iter_ok::<_, ()>(1..5); + /// let mut stream = stream::iter(1..5); /// - /// let sum = block_on(stream.by_ref().take(2).fold(0, |a, b| future::ok(a + b))); - /// assert_eq!(sum, Ok(3)); + /// let sum = block_on(stream.by_ref().take(2).fold(0, |a, b| future::ready(a + b))); + /// assert_eq!(sum, 3); /// /// // You can use the stream again - /// let sum = block_on(stream.take(2).fold(0, |a, b| future::ok(a + b))); - /// assert_eq!(sum, Ok(7)); - /// # } + /// let sum = block_on(stream.take(2).fold(0, |a, b| future::ready(a + b))); + /// assert_eq!(sum, 7); /// ``` fn by_ref(&mut self) -> &mut Self where Self: Sized @@ -554,27 +533,24 @@ pub trait StreamExt: Stream { /// /// ```rust /// # extern crate futures; - /// # extern crate futures_executor; /// /// use futures::prelude::*; /// use futures::stream; - /// use futures_executor::block_on; + /// use futures::executor::block_on; /// - /// # fn main() { - /// let stream = stream::iter_ok::<_, bool>(vec![Some(10), None, Some(11)]); + /// let stream = stream::iter(vec![Some(10), None, Some(11)]); /// // panic on second element /// let stream_panicking = stream.map(|o| o.unwrap()); /// // collect all the results - /// let stream = stream_panicking.catch_unwind().then(|r| Ok::<_, ()>(r)); + /// let stream = stream_panicking.catch_unwind(); /// - /// let results: Vec<_> = block_on(stream.collect()).unwrap(); + /// let results: Vec> = block_on(stream.collect()); /// match results[0] { - /// Ok(Ok(10)) => {} + /// Ok(10) => {} /// _ => panic!("unexpected result!"), /// } /// assert!(results[1].is_err()); /// assert_eq!(results.len(), 2); - /// # } /// ``` 
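The doc examples updated in this hunk all follow the same migration: combinator closures now return a future (typically `future::ready(..)`) instead of a `Result`, and `collect` names its target collection explicitly. A condensed sketch of that pattern, assuming the 0.3.0-alpha re-exports used in these examples (`stream::iter`, `future::ready`, `executor::block_on`) and the nightly features this patch enables:

```rust
// Condensed sketch of the closure migration shown in the doc examples above.
// Assumes the 0.3.0-alpha re-exports this patch wires up and the nightly
// features it enables; not taken verbatim from the repository.
#![feature(pin, arbitrary_self_types, futures_api)]

extern crate futures;

use futures::executor::block_on;
use futures::future;
use futures::prelude::*;
use futures::stream;

fn main() {
    let doubled_evens = stream::iter(1..=6)
        // 0.2: `.filter(|x| Ok(x % 2 == 0))`; 0.3: the predicate returns a future.
        .filter(|x| future::ready(x % 2 == 0))
        // 0.2: `.then(..)` matched on a `Result`; 0.3: it receives the item itself.
        .then(|x| future::ready(x * 2));

    // `collect` now takes the target collection as an explicit type parameter.
    let output = block_on(doubled_evens.collect::<Vec<_>>());
    assert_eq!(output, vec![4, 8, 12]);
}
```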
#[cfg(feature = "std")] fn catch_unwind(self) -> CatchUnwind @@ -644,26 +620,22 @@ pub trait StreamExt: Stream { /// /// ```rust /// # extern crate futures; - /// # extern crate futures_executor; /// use futures::prelude::*; /// use futures::stream; - /// use futures_executor::block_on; + /// use futures::executor::block_on; /// - /// # fn main() { - /// let stream1 = stream::iter_result(vec![Ok(10), Err(false)]); - /// let stream2 = stream::iter_result(vec![Err(true), Ok(20)]); + /// let stream1 = stream::iter(vec![Ok(10), Err(false)]); + /// let stream2 = stream::iter(vec![Err(true), Ok(20)]); /// - /// let stream = stream1.chain(stream2) - /// .then(|result| Ok::<_, ()>(result)); + /// let stream = stream1.chain(stream2); /// - /// let result: Vec<_> = block_on(stream.collect()).unwrap(); + /// let result: Vec<_> = block_on(stream.collect()); /// assert_eq!(result, vec![ /// Ok(10), /// Err(false), /// Err(true), /// Ok(20), /// ]); - /// # } /// ``` fn chain(self, other: S) -> Chain where S: Stream, diff --git a/futures-util/src/stream/once.rs b/futures-util/src/stream/once.rs index 8d71fc23d7..8d09d8e9d4 100644 --- a/futures-util/src/stream/once.rs +++ b/futures-util/src/stream/once.rs @@ -20,18 +20,13 @@ pub struct Once { /// # extern crate futures; /// # extern crate futures_executor; /// use futures::prelude::*; +/// use futures::future; +/// use futures::executor::block_on; /// use futures::stream; -/// use futures_executor::block_on; /// -/// # fn main() { -/// let mut stream = stream::once::>(Err(17)); -/// let collected: Result, _> = block_on(stream.collect()); -/// assert_eq!(collected, Err(17)); -/// -/// let mut stream = stream::once::>(Ok(92)); -/// let collected: Result, _> = block_on(stream.collect()); -/// assert_eq!(collected, Ok(vec![92])); -/// # } +/// let mut stream = stream::once(future::ready(17)); +/// let collected = block_on(stream.collect::>()); +/// assert_eq!(collected, vec![17]); /// ``` pub fn once(item: F) -> Once { Once { fut: Some(item) } diff --git a/futures-util/src/stream/poll_fn.rs b/futures-util/src/stream/poll_fn.rs index 8706239bb6..3fd1a5f15b 100644 --- a/futures-util/src/stream/poll_fn.rs +++ b/futures-util/src/stream/poll_fn.rs @@ -24,19 +24,18 @@ impl Unpin for PollFn {} /// # Examples /// /// ``` +/// # #![feature(futures_api)] /// # extern crate futures; /// use futures::prelude::*; /// use futures::stream::poll_fn; /// -/// # fn main() { /// let mut counter = 1usize; /// -/// let read_stream = poll_fn(move |_| -> Poll, std::io::Error> { -/// if counter == 0 { return Ok(Async::Ready(None)); } +/// let read_stream = poll_fn(move |_| -> Poll> { +/// if counter == 0 { return Poll::Ready(None); } /// counter -= 1; -/// Ok(Async::Ready(Some("Hello, World!".to_owned()))) +/// Poll::Ready(Some("Hello, World!".to_owned())) /// }); -/// # } /// ``` pub fn poll_fn(f: F) -> PollFn where diff --git a/futures-util/src/stream/repeat.rs b/futures-util/src/stream/repeat.rs index 0dba1ccfa6..9d91bb14fb 100644 --- a/futures-util/src/stream/repeat.rs +++ b/futures-util/src/stream/repeat.rs @@ -26,10 +26,8 @@ pub struct Repeat { /// use futures::stream; /// use futures_executor::block_on; /// -/// # fn main() { -/// let mut stream = stream::repeat::<_, bool>(10); -/// assert_eq!(Ok(vec![10, 10, 10]), block_on(stream.take(3).collect())); -/// # } +/// let mut stream = stream::repeat(9); +/// assert_eq!(vec![9, 9, 9], block_on(stream.take(3).collect::>())); /// ``` pub fn repeat(item: T) -> Repeat where T: Clone diff --git a/futures-util/src/stream/unfold.rs 
b/futures-util/src/stream/unfold.rs index 607bab769c..e44740ed4f 100644 --- a/futures-util/src/stream/unfold.rs +++ b/futures-util/src/stream/unfold.rs @@ -40,21 +40,18 @@ use futures_core::task; /// use futures::future; /// use futures_executor::block_on; /// -/// # fn main() { /// let mut stream = stream::unfold(0, |state| { /// if state <= 2 { /// let next_state = state + 1; /// let yielded = state * 2; -/// let fut = future::ok::<_, u32>((yielded, next_state)); -/// Some(fut) +/// future::ready(Some((yielded, next_state))) /// } else { -/// None +/// future::ready(None) /// } /// }); /// -/// let result = block_on(stream.collect()); -/// assert_eq!(result, Ok(vec![0, 2, 4])); -/// # } +/// let result = block_on(stream.collect::>()); +/// assert_eq!(result, vec![0, 2, 4]); /// ``` pub fn unfold(init: T, f: F) -> Unfold where F: FnMut(T) -> Fut, diff --git a/futures-util/src/try_future/mod.rs b/futures-util/src/try_future/mod.rs index ee4f470eb5..e457943573 100644 --- a/futures-util/src/try_future/mod.rs +++ b/futures-util/src/try_future/mod.rs @@ -59,32 +59,26 @@ pub trait TryFutureExt: TryFuture { /// /// ``` /// # extern crate futures; - /// # extern crate futures_executor; /// use futures::prelude::*; /// use futures::future; - /// use futures_executor::block_on; + /// use futures::executor::block_on; /// - /// # fn main() { - /// let future = future::ok::(1); - /// let new_future = future.map(|x| x + 3); + /// let future = future::ready::>(Ok(1)); + /// let new_future = future.map_ok(|x| x + 3); /// assert_eq!(block_on(new_future), Ok(4)); - /// # } /// ``` /// - /// Calling `map` on an errored `Future` has no effect: + /// Calling `map_ok` on an errored `Future` has no effect: /// /// ``` /// # extern crate futures; - /// # extern crate futures_executor; /// use futures::prelude::*; /// use futures::future; - /// use futures_executor::block_on; + /// use futures::executor::block_on; /// - /// # fn main() { - /// let future = future::err::(1); - /// let new_future = future.map(|x| x + 3); + /// let future = future::ready::>(Err(1)); + /// let new_future = future.map_ok(|x| x + 3); /// assert_eq!(block_on(new_future), Err(1)); - /// # } /// ``` fn map_ok(self, f: F) -> MapOk where F: FnOnce(Self::Item) -> U, @@ -111,32 +105,26 @@ pub trait TryFutureExt: TryFuture { /// /// ``` /// # extern crate futures; - /// # extern crate futures_executor; - /// use futures::future::err; + /// use futures::future; /// use futures::prelude::*; - /// use futures_executor::block_on; + /// use futures::executor::block_on; /// - /// # fn main() { - /// let future = err::(1); + /// let future = future::ready::>(Err(1)); /// let new_future = future.map_err(|x| x + 3); /// assert_eq!(block_on(new_future), Err(4)); - /// # } /// ``` /// /// Calling `map_err` on a successful `Future` has no effect: /// /// ``` /// # extern crate futures; - /// # extern crate futures_executor; - /// use futures::future::ok; + /// use futures::future; /// use futures::prelude::*; - /// use futures_executor::block_on; + /// use futures::executor::block_on; /// - /// # fn main() { - /// let future = ok::(1); + /// let future = future::ready::>(Ok(1)); /// let new_future = future.map_err(|x| x + 3); /// assert_eq!(block_on(new_future), Ok(1)); - /// # } /// ``` fn map_err(self, f: F) -> MapErr where F: FnOnce(Self::Error) -> E, @@ -163,10 +151,8 @@ pub trait TryFutureExt: TryFuture { /// use futures::prelude::*; /// use futures::future; /// - /// # fn main() { - /// let future_with_err_u8 = future::err::<(), u8>(1); - /// let 
future_with_err_u32 = future_with_err_u8.err_into::(); - /// # } + /// let future_with_err_u8 = future::ready::>(Err(1)); + /// let future_with_err_i32 = future_with_err_u8.err_into::(); /// ``` fn err_into(self) -> ErrInto where Self: Sized, @@ -197,19 +183,17 @@ pub trait TryFutureExt: TryFuture { /// ``` /// # extern crate futures; /// use futures::prelude::*; - /// use futures::future::{self, TryFuture}; + /// use futures::future::{self, ReadyFuture}; /// - /// # fn main() { - /// let future_of_1 = future::ok::(1); + /// let future_of_1 = future::ready::>(Ok(1)); /// let future_of_4 = future_of_1.and_then(|x| { - /// Ok(x + 3) + /// future::ready(Ok(x + 3)) /// }); /// - /// let future_of_err_1 = future::err::(1); - /// future_of_err_1.and_then(|_| -> TryFuture { + /// let future_of_err_1 = future::ready::>(Err(1)); + /// future_of_err_1.and_then(|_| -> ReadyFuture> { /// panic!("should not be called in case of an error"); /// }); - /// # } /// ``` fn and_then(self, f: F) -> AndThen where F: FnOnce(Self::Item) -> B, @@ -241,19 +225,17 @@ pub trait TryFutureExt: TryFuture { /// ``` /// # extern crate futures; /// use futures::prelude::*; - /// use futures::future::{self, TryFuture}; + /// use futures::future::{self, ReadyFuture}; /// - /// # fn main() { - /// let future_of_err_1 = future::err::(1); - /// let future_of_4 = future_of_err_1.or_else(|x| -> Result { - /// Ok(x + 3) + /// let future_of_err_1 = future::ready::>(Err(1)); + /// let future_of_4 = future_of_err_1.or_else(|x| { + /// future::ready::>(Ok(x + 3)) /// }); /// - /// let future_of_1 = future::ok::(1); - /// future_of_1.or_else(|_| -> TryFuture { + /// let future_of_1 = future::ready::>(Ok(1)); + /// future_of_1.or_else(|_| -> ReadyFuture> { /// panic!("should not be called in case of success"); /// }); - /// # } /// ``` fn or_else(self, f: F) -> OrElse where F: FnOnce(Self::Error) -> B, @@ -300,8 +282,7 @@ pub trait TryFutureExt: TryFuture { /// Err(Either::Right((e, _))) => Box::new(future::err(e)), /// } /// })) - /// } - /// # fn main() {} + /// }} /// ``` fn select(self, other: B) -> Select where B: IntoFuture, Self: Sized @@ -326,14 +307,12 @@ pub trait TryFutureExt: TryFuture { /// /// ``` /// # extern crate futures; - /// # extern crate futures_executor; /// use futures::prelude::*; /// use futures::future; - /// use futures_executor::block_on; + /// use futures::executor::block_on; /// - /// # fn main() { - /// let a = future::ok::(1); - /// let b = future::ok::(2); + /// let a = future::ok::(1); + /// let b = future::ok::(2); /// let pair = a.join(b); /// /// assert_eq!(block_on(pair), Ok((1, 2))); @@ -345,14 +324,12 @@ pub trait TryFutureExt: TryFuture { /// /// ``` /// # extern crate futures; - /// # extern crate futures_executor; /// use futures::prelude::*; /// use futures::future; - /// use futures_executor::block_on; + /// use futures::executor::block_on; /// - /// # fn main() { - /// let a = future::ok::(1); - /// let b = future::err::(2); + /// let a = future::ok::(1); + /// let b = future::err::(2); /// let pair = a.join(b); /// /// assert_eq!(block_on(pair), Err(2)); @@ -410,16 +387,13 @@ pub trait TryFutureExt: TryFuture { /// /// ``` /// # extern crate futures; - /// # extern crate futures_executor; /// use futures::prelude::*; /// use futures::future; - /// use futures_executor::block_on; + /// use futures::executor::block_on; /// - /// # fn main() { - /// let future = future::err::<(), &str>("something went wrong"); - /// let new_future = future.recover::(|_| ()); - /// 
assert_eq!(block_on(new_future), Ok(())); - /// # } + /// let future = future::ready::>(Err("Boom!")); + /// let new_future = future.recover(|_| ()); + /// assert_eq!(block_on(new_future), ()); /// ``` fn recover(self, f: F) -> Recover where Self: Sized, diff --git a/futures/Cargo.toml b/futures/Cargo.toml index 2d6060187b..ea017d55a8 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures" -version = "0.2.0" +version = "0.3.0-alpha" authors = ["Alex Crichton "] license = "MIT/Apache-2.0" readme = "README.md" @@ -19,18 +19,20 @@ travis-ci = { repository = "rust-lang-nursery/futures-rs" } appveyor = { repository = "rust-lang-nursery/futures-rs" } [dependencies] -futures-async-runtime = { path = "../futures-async-runtime", version = "0.2.0", default-features = false } -futures-core = { path = "../futures-core", version = "0.2.0", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.2.0", default-features = false } -futures-executor = { path = "../futures-executor", version = "0.2.0", default-features = false } -futures-io = { path = "../futures-io", version = "0.2.0", default-features = false } -futures-sink = { path = "../futures-sink", version = "0.2.0", default-features = false } -futures-stable = { path = "../futures-stable", version = "0.2.0", default-features = false } -futures-util = { path = "../futures-util", version = "0.2.0", default-features = false } -futures-macro-async = { path = "../futures-macro-async", version = "0.2.0", optional = true } -futures-macro-await = { path = "../futures-macro-await", version = "0.2.0", optional = true } +# futures-async-runtime = { path = "../futures-async-runtime", version = "0.3.0-alpha", default-features = false } +futures-core = { path = "../futures-core", version = "0.3.0-alpha", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.0-alpha", default-features = false } +futures-executor = { path = "../futures-executor", version = "0.3.0-alpha", default-features = false } +futures-io = { path = "../futures-io", version = "0.3.0-alpha", default-features = false } +futures-sink = { path = "../futures-sink", version = "0.3.0-alpha", default-features = false } +# futures-stable = { path = "../futures-stable", version = "0.3.0-alpha", default-features = false } +futures-util = { path = "../futures-util", version = "0.3.0-alpha", default-features = false } +# futures-macro-async = { path = "../futures-macro-async", version = "0.3.0-alpha", optional = true } +# futures-macro-await = { path = "../futures-macro-await", version = "0.3.0-alpha", optional = true } [features] -nightly = ["futures-core/nightly", "futures-stable/nightly", "futures-async-runtime/nightly", "futures-macro-async", "futures-macro-await", "futures-macro-async/nightly"] -std = ["futures-core/std", "futures-executor/std", "futures-io/std", "futures-sink/std", "futures-stable/std", "futures-util/std", "futures-async-runtime/std"] +#nightly = ["futures-core/nightly", "futures-stable/nightly", "futures-async-runtime/nightly", "futures-macro-async", "futures-macro-await", "futures-macro-async/nightly"] +#std = ["futures-core/std", "futures-executor/std", "futures-io/std", "futures-sink/std", "futures-stable/std", "futures-util/std", "futures-async-runtime/std"] +nightly = ["futures-core/nightly"] +std = ["futures-core/std", "futures-executor/std", "futures-io/std", "futures-sink/std", "futures-util/std"] default = ["std"] diff --git a/futures/src/lib.rs 
b/futures/src/lib.rs index fcfc219fa5..b2976127bb 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -19,25 +19,26 @@ //! streams and sinks, and then spawned as independent tasks that are run to //! completion, but *do not block* the thread running them. +#![feature(futures_api)] + #![no_std] #![doc(html_root_url = "https://docs.rs/futures/0.2.0")] #![cfg_attr(feature = "nightly", feature(cfg_target_has_atomic))] #![cfg_attr(feature = "nightly", feature(use_extern_macros))] -extern crate futures_async_runtime; +// extern crate futures_async_runtime; extern crate futures_core; extern crate futures_channel; extern crate futures_executor; extern crate futures_io; extern crate futures_sink; -extern crate futures_stable; extern crate futures_util; #[cfg(feature = "nightly")] extern crate futures_macro_async; #[cfg(feature = "nightly")] extern crate futures_macro_await; -pub use futures_core::future::{Future, IntoFuture}; +pub use futures_core::future::Future; pub use futures_util::future::FutureExt; pub use futures_core::stream::Stream; pub use futures_util::stream::StreamExt; @@ -83,7 +84,7 @@ macro_rules! task_local { ) } -pub use futures_core::{Async, Poll, Never}; +pub use futures_core::Poll; #[cfg(feature = "std")] pub mod channel { @@ -122,9 +123,10 @@ pub mod executor { //! then spawn further tasks back onto the pool to complete its work: //! //! ``` + //! # #![feature(pin, arbitrary_self_types, futures_api)] //! use futures::executor::ThreadPool; //! # use futures::future::{Future, lazy}; - //! # let my_app: Box> = Box::new(lazy(|_| Ok(()))); + //! # let my_app = lazy(|_| 42); //! //! // assumping `my_app: Future` //! ThreadPool::new().expect("Failed to create threadpool").run(my_app); @@ -173,7 +175,7 @@ pub mod executor { ThreadPool, ThreadPoolBuilder, JoinHandle, block_on, block_on_stream, enter, spawn, spawn_with_handle }; - pub use futures_core::executor::{SpawnError, Executor}; + pub use futures_core::executor::{SpawnObjError, Executor}; } pub mod future { @@ -192,19 +194,20 @@ pub mod future { //! immediate defined value. pub use futures_core::future::{ - FutureOption, FutureResult, Future, IntoFuture, err, ok, result + FutureOption, Future, TryFuture, ReadyFuture, ready }; pub use futures_util::future::{ - AndThen, Empty, Flatten, FlattenStream, ErrInto, Fuse, - Inspect, IntoStream, Join, Join3, Join4, Join5, Lazy, LoopFn, - Map, MapErr, OrElse, PollFn, Select, Then, Either, Loop, FutureExt, empty, - lazy, loop_fn, poll_fn + Empty, Flatten, FlattenStream, Fuse, Inspect, IntoStream, Lazy, + Then, Either, PollFn, Map, FutureExt, empty, lazy, poll_fn, + // AndThen, ErrInto, Join, Join3, Join4, Join5, LoopFn, + // MapErr, OrElse, Select, Loop, loop_fn, }; #[cfg(feature = "std")] pub use futures_util::future::{ - CatchUnwind, JoinAll, SelectAll, SelectOk, Shared, SharedError, SharedItem, - join_all, select_all, select_ok + CatchUnwind, + // JoinAll, SelectAll, SelectOk, Shared, SharedError, SharedItem, + // join_all, select_all, select_ok }; } @@ -234,14 +237,6 @@ pub mod io { }; } -#[cfg(feature = "std")] -pub mod never { - //! This module contains the `Never` type. - //! - //! Values of this type can never be created and will never exist. - pub use futures_core::never::*; -} - pub mod prelude { //! A "prelude" for crates using the `futures` crate. //! @@ -256,13 +251,7 @@ pub mod prelude { //! The prelude may grow over time as additional items see ubiquitous use. 
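Since the prelude now re-exports `TryFutureExt` alongside `FutureExt`, `StreamExt`, and `SinkExt` (see the `pub use` list that follows), a single glob import covers both the plain and the `Result`-oriented combinators. A small sketch under that assumption:

```rust
// Sketch of a prelude-only import, assuming the 0.3.0-alpha re-exports listed
// in the `pub use` block below (`FutureExt`, `TryFutureExt`, `StreamExt`, ...).
#![feature(pin, arbitrary_self_types, futures_api)]

extern crate futures;

use futures::executor::block_on;
use futures::future;
use futures::prelude::*; // brings the extension traits into scope

fn main() {
    // `map` comes from `FutureExt`, `map_ok` from `TryFutureExt`.
    let plain = future::ready(1).map(|x| x + 1);
    let fallible = future::ready(Ok::<u32, u32>(2)).map_ok(|x| x + 1);

    assert_eq!(block_on(plain), 2);
    assert_eq!(block_on(fallible), Ok(3));
}
```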
pub use futures_core::{ - Future, - IntoFuture, - Stream, - Async, - Poll, - Never, - task, + Future, TryFuture, Stream, Poll, task }; #[cfg(feature = "std")] @@ -287,6 +276,7 @@ pub mod prelude { pub use futures_util::{ FutureExt, + TryFutureExt, StreamExt, SinkExt, }; @@ -326,8 +316,9 @@ pub mod sink { pub use futures_sink::Sink; pub use futures_util::sink::{ - Close, Fanout, Flush, Send, SendAll, SinkErrInto, SinkMapErr, With, - WithFlatMap, SinkExt, + Close, Flush, Send, SendAll, SinkErrInto, SinkMapErr, With, + SinkExt, Fanout, + // WithFlatMap, }; #[cfg(feature = "std")] @@ -353,18 +344,19 @@ pub mod stream { pub use futures_core::stream::Stream; pub use futures_util::stream::{ - AndThen, Chain, Concat, Empty, Filter, FilterMap, Flatten, Fold, - ForEach, Forward, ErrInto, Fuse, Inspect, InspectErr, IterOk, - IterResult, Map, MapErr, Once, OrElse, Peekable, PollFn, Repeat, Select, - Skip, SkipWhile, StreamFuture, Take, TakeWhile, Then, Unfold, Zip, - StreamExt, empty, iter_ok, iter_result, once, poll_fn, repeat, unfold, + Chain, Concat, Empty, Filter, FilterMap, Flatten, Fold, ForEach, Fuse, + Inspect, Map, Once, Peekable, PollFn, Repeat, Select, Skip, SkipWhile, + StreamFuture, Take, TakeWhile, Then, Unfold, Zip, StreamExt, empty, + once, poll_fn, repeat, unfold, iter + // AndThen, Forward, ErrInto, InspectErr MapErr, OrElse }; #[cfg(feature = "std")] pub use futures_util::stream::{ - futures_unordered, select_all, BufferUnordered, Buffered, CatchUnwind, Chunks, Collect, - FuturesUnordered, FuturesOrdered, ReuniteError, SelectAll, SplitSink, SplitStream, - futures_ordered, + CatchUnwind, Chunks, Collect, futures_unordered + // , select_all, BufferUnordered, Buffered, + // FuturesUnordered, FuturesOrdered, ReuniteError, SelectAll, SplitSink, + // SplitStream, futures_ordered, }; } @@ -389,14 +381,14 @@ pub mod task { //! executors or dealing with synchronization issues around task wakeup. 
pub use futures_core::task::{ - Context, LocalMap, Waker, UnsafeWake, + Context, Waker, UnsafeWake }; #[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))] pub use futures_core::task::AtomicWaker; #[cfg(feature = "std")] - pub use futures_core::task::{LocalKey, Wake}; + pub use futures_core::task::Wake; } #[cfg(feature = "nightly")] diff --git a/futures/tests/all.rs b/futures/tests_disabled/all.rs similarity index 100% rename from futures/tests/all.rs rename to futures/tests_disabled/all.rs diff --git a/futures/tests/async_await/elisions.rs b/futures/tests_disabled/async_await/elisions.rs similarity index 100% rename from futures/tests/async_await/elisions.rs rename to futures/tests_disabled/async_await/elisions.rs diff --git a/futures/tests/async_await/mod.rs b/futures/tests_disabled/async_await/mod.rs similarity index 100% rename from futures/tests/async_await/mod.rs rename to futures/tests_disabled/async_await/mod.rs diff --git a/futures/tests/async_await/pinned.rs b/futures/tests_disabled/async_await/pinned.rs similarity index 100% rename from futures/tests/async_await/pinned.rs rename to futures/tests_disabled/async_await/pinned.rs diff --git a/futures/tests/async_await/smoke.rs b/futures/tests_disabled/async_await/smoke.rs similarity index 100% rename from futures/tests/async_await/smoke.rs rename to futures/tests_disabled/async_await/smoke.rs diff --git a/futures/tests/async_await_tests.rs b/futures/tests_disabled/async_await_tests.rs similarity index 100% rename from futures/tests/async_await_tests.rs rename to futures/tests_disabled/async_await_tests.rs diff --git a/futures/tests/buffer_unordered.rs b/futures/tests_disabled/buffer_unordered.rs similarity index 100% rename from futures/tests/buffer_unordered.rs rename to futures/tests_disabled/buffer_unordered.rs diff --git a/futures/tests/eager_drop.rs b/futures/tests_disabled/eager_drop.rs similarity index 100% rename from futures/tests/eager_drop.rs rename to futures/tests_disabled/eager_drop.rs diff --git a/futures/tests/eventual.rs b/futures/tests_disabled/eventual.rs similarity index 100% rename from futures/tests/eventual.rs rename to futures/tests_disabled/eventual.rs diff --git a/futures/tests/fuse.rs b/futures/tests_disabled/fuse.rs similarity index 100% rename from futures/tests/fuse.rs rename to futures/tests_disabled/fuse.rs diff --git a/futures/tests/future_flatten_stream.rs b/futures/tests_disabled/future_flatten_stream.rs similarity index 100% rename from futures/tests/future_flatten_stream.rs rename to futures/tests_disabled/future_flatten_stream.rs diff --git a/futures/tests/futures_ordered.rs b/futures/tests_disabled/futures_ordered.rs similarity index 100% rename from futures/tests/futures_ordered.rs rename to futures/tests_disabled/futures_ordered.rs diff --git a/futures/tests/futures_unordered.rs b/futures/tests_disabled/futures_unordered.rs similarity index 100% rename from futures/tests/futures_unordered.rs rename to futures/tests_disabled/futures_unordered.rs diff --git a/futures/tests/inspect.rs b/futures/tests_disabled/inspect.rs similarity index 100% rename from futures/tests/inspect.rs rename to futures/tests_disabled/inspect.rs diff --git a/futures/tests/ready_queue.rs b/futures/tests_disabled/ready_queue.rs similarity index 100% rename from futures/tests/ready_queue.rs rename to futures/tests_disabled/ready_queue.rs diff --git a/futures/tests/recurse.rs b/futures/tests_disabled/recurse.rs similarity index 100% rename from futures/tests/recurse.rs rename to futures/tests_disabled/recurse.rs 
diff --git a/futures/tests/select_all.rs b/futures/tests_disabled/select_all.rs similarity index 100% rename from futures/tests/select_all.rs rename to futures/tests_disabled/select_all.rs diff --git a/futures/tests/select_ok.rs b/futures/tests_disabled/select_ok.rs similarity index 100% rename from futures/tests/select_ok.rs rename to futures/tests_disabled/select_ok.rs diff --git a/futures/tests/shared.rs b/futures/tests_disabled/shared.rs similarity index 100% rename from futures/tests/shared.rs rename to futures/tests_disabled/shared.rs diff --git a/futures/tests/sink.rs b/futures/tests_disabled/sink.rs similarity index 100% rename from futures/tests/sink.rs rename to futures/tests_disabled/sink.rs diff --git a/futures/tests/split.rs b/futures/tests_disabled/split.rs similarity index 100% rename from futures/tests/split.rs rename to futures/tests_disabled/split.rs diff --git a/futures/tests/stream.rs b/futures/tests_disabled/stream.rs similarity index 100% rename from futures/tests/stream.rs rename to futures/tests_disabled/stream.rs diff --git a/futures/tests/stream_catch_unwind.rs b/futures/tests_disabled/stream_catch_unwind.rs similarity index 100% rename from futures/tests/stream_catch_unwind.rs rename to futures/tests_disabled/stream_catch_unwind.rs diff --git a/futures/tests/stream_select_all.rs b/futures/tests_disabled/stream_select_all.rs similarity index 100% rename from futures/tests/stream_select_all.rs rename to futures/tests_disabled/stream_select_all.rs diff --git a/futures/tests/support/mod.rs b/futures/tests_disabled/support/mod.rs similarity index 100% rename from futures/tests/support/mod.rs rename to futures/tests_disabled/support/mod.rs diff --git a/futures/tests/unfold.rs b/futures/tests_disabled/unfold.rs similarity index 100% rename from futures/tests/unfold.rs rename to futures/tests_disabled/unfold.rs
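Taken together, the rewritten `futures_unordered` module and the re-export changes above expose the set through a plain `Stream` interface; the facade re-exports the `futures_unordered` constructor function. A minimal end-to-end sketch, assuming the 0.3.0-alpha APIs this patch enables (`stream::futures_unordered`, `future::ready`, `executor::block_on`) on the nightly toolchain it targets:

```rust
// Minimal end-to-end sketch of the rewritten `FuturesUnordered`.
// Assumes the 0.3.0-alpha re-exports from this patch and the nightly
// features it enables; not taken verbatim from the repository.
#![feature(pin, arbitrary_self_types, futures_api)]

extern crate futures;

use futures::executor::block_on;
use futures::future;
use futures::prelude::*;
use futures::stream;

fn main() {
    // Build the set from an iterator of futures; `futures_unordered` simply
    // `collect`s them into a `FuturesUnordered` (see the function added above).
    let mut set = stream::futures_unordered((0..3).map(|i| future::ready(i * 2)));

    // More futures can be pushed onto an existing set; `poll_next` must run
    // before newly pushed futures start receiving wakeups.
    set.push(future::ready(6));

    // The set is a `Stream`, so the usual combinators apply. Items are
    // yielded in completion order, not insertion order, hence the sort.
    let mut results = block_on(set.collect::<Vec<_>>());
    results.sort();
    assert_eq!(results, vec![0, 2, 4, 6]);
}
```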