Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
04842ae
Executors and partly reenable futures crate
MajorBreakfast Jun 12, 2018
54d6b2f
Rename res to output
MajorBreakfast Jun 12, 2018
8baad68
Rename Task to TaskContainer
MajorBreakfast Jun 12, 2018
883ddff
Update to latest nightly
MajorBreakfast Jun 20, 2018
95273d6
Split futures_unordered.rs
MajorBreakfast Jun 21, 2018
52c0fe4
`Future + !Unpin` support for `FuturesUnordered`
MajorBreakfast Jun 21, 2018
cc6fa32
transmute type args
MajorBreakfast Jun 21, 2018
146e818
Use TaskObj directly in LocalPool
MajorBreakfast Jun 21, 2018
ff85f87
Fix bug in stream iter doc
MajorBreakfast Jun 21, 2018
251059d
Enable executors crate for future-util
MajorBreakfast Jun 21, 2018
9c20c18
Remove Unpin restriction from LocalPool
MajorBreakfast Jun 21, 2018
3fa5c6e
Lift Unpin restriction from Spawn
MajorBreakfast Jun 21, 2018
bec00f7
Move ThreadNotify into local_pool.rs
MajorBreakfast Jun 21, 2018
2df964b
Prefix free function with module
MajorBreakfast Jun 21, 2018
41853e7
Make some more doctests run successfully
MajorBreakfast Jun 21, 2018
ddbb39d
Make all enabled doc tests green
MajorBreakfast Jun 22, 2018
f0c8787
Additionally export `Poll` under `task` module (like in std)
MajorBreakfast Jun 22, 2018
d7cc312
Improve `Node` of `FuturesUnordered`
MajorBreakfast Jun 22, 2018
5a3c6c1
Mostly updates to comments
MajorBreakfast Jun 22, 2018
ea55900
Use `cx.local_waker().wake()`
MajorBreakfast Jun 22, 2018
a8472bd
Bring back `SinkExt::Fanout" but keep its tests disabled for now
MajorBreakfast Jun 22, 2018
d4d5234
Rename `Inner` to `ReadyToRunQueue` for clarity
MajorBreakfast Jun 22, 2018
a2e07b0
Restrict `FuturesUnorderdered::iter_mut` to `Unpin` futures. Add `ite…
MajorBreakfast Jun 22, 2018
7206d59
Add more `Unpin` bounds to `FuturesUnordered::iter_mut`
MajorBreakfast Jun 22, 2018
f235429
Implement `IterMut` in terms of `IterPinMut`
MajorBreakfast Jun 22, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
[workspace]
members = [
# "futures",
"futures",
# "futures-async-runtime",
"futures-core",
"futures-channel",
# "futures-executor",
"futures-executor",
"futures-io",
# "futures-macro-async",
# "futures-macro-await",
Expand Down
2 changes: 1 addition & 1 deletion futures-channel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ default = ["std"]
futures-core = { path = "../futures-core", version = "0.3.0-alpha", default-features = false }

[dev-dependencies]
# futures = { path = "../futures", version = "0.2.0", default-features = true }
futures = { path = "../futures", version = "0.3.0-alpha", default-features = true }
# futures-executor = { path = "../futures-executor", version = "0.2.0", default-features = true }
2 changes: 1 addition & 1 deletion futures-channel/src/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ struct Inner<T> {
/// # let t =
/// thread::spawn(|| {
/// let future = c.map(|i| {
/// println!("got: {}", i);
/// println!("got: {:?}", i);
/// });
/// // ...
/// # return future;
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion futures-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ mod poll;
pub use poll::Poll;

pub mod future;
pub use future::{Future, TryFuture};
pub use future::{Future, CoreFutureExt, TryFuture};

pub mod stream;
pub use stream::Stream;
Expand Down
5 changes: 3 additions & 2 deletions futures-core/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

use Future;

pub use core::task::{UnsafeWake, Waker};
pub use core::task::{UnsafeWake, Waker, LocalWaker};

#[cfg(feature = "std")]
pub use std::task::Wake;
pub use std::task::{Wake, local_waker, local_waker_from_nonlocal};

pub use core::task::Context;

Expand Down
12 changes: 6 additions & 6 deletions futures-executor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures-executor"
version = "0.2.0"
version = "0.3.0-alpha"
authors = ["Alex Crichton <[email protected]>"]
license = "MIT/Apache-2.0"
repository = "https://github.com/rust-lang-nursery/futures-rs"
Expand All @@ -15,12 +15,12 @@ std = ["num_cpus", "futures-core/std", "futures-util/std", "futures-channel/std"
default = ["std"]

[dependencies]
futures-core = { path = "../futures-core", version = "0.2.0", default-features = false}
futures-util = { path = "../futures-util", version = "0.2.0", default-features = false}
futures-channel = { path = "../futures-channel", version = "0.2.0", default-features = false}
futures-core = { path = "../futures-core", version = "0.3.0-alpha", default-features = false}
futures-util = { path = "../futures-util", version = "0.3.0-alpha", default-features = false}
futures-channel = { path = "../futures-channel", version = "0.3.0-alpha", default-features = false}
num_cpus = { version = "1.0", optional = true }
lazy_static = { version = "1.0", optional = true }

[dev-dependencies]
futures = { path = "../futures", version = "0.2.0" }
futures-channel = { path = "../futures-channel", version = "0.2.0" }
futures = { path = "../futures", version = "0.3.0-alpha" }
futures-channel = { path = "../futures-channel", version = "0.3.0-alpha" }
2 changes: 2 additions & 0 deletions futures-executor/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Built-in executors and related tools.

#![feature(pin, arbitrary_self_types, futures_api)]

#![no_std]
#![deny(missing_docs)]
#![doc(html_root_url = "https://docs.rs/futures-executor/0.2.0")]
Expand Down
124 changes: 56 additions & 68 deletions futures-executor/src/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ use std::prelude::v1::*;

use std::cell::{RefCell};
use std::rc::{Rc, Weak};
use std::mem::PinMut;
use std::marker::Unpin;

use futures_core::{Future, Poll, Async, Stream};
use futures_core::task::{Context, Waker, LocalMap};
use futures_core::executor::{Executor, SpawnError};
use futures_core::never::Never;
use futures_core::{Future, CoreFutureExt, Poll, Stream};
use futures_core::task::{Context, LocalWaker, TaskObj, local_waker_from_nonlocal};
use futures_core::executor::{Executor, SpawnObjError, SpawnErrorKind};
use futures_util::stream::FuturesUnordered;
use futures_util::stream::StreamExt;

use thread::ThreadNotify;
use enter;
use ThreadPool;

struct Task {
fut: Box<Future<Item = (), Error = Never>>,
map: LocalMap,
struct TaskContainer {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment to this that describes what it's for? Something like "wrapper for TaskObj that implements Future with a unit output" would help-- it's less obvious what its purpose is now that it no longer pairs the task with a LocalMap.

N.B. I think that TaskObj should implement Future-- we should add that impl in std. Opened rust-lang/rust#51667

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already made TaskObj implement Future

I‘ll add the comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm getting rid of TaskContainer entirely. We can use TaskObj directly. Can't do that in ThreadPool, but here it's simple

task: TaskObj,
}

/// A single-threaded task pool.
Expand All @@ -31,7 +31,7 @@ struct Task {
/// single-threaded, it supports a special form of task spawning for non-`Send`
/// futures, via [`spawn_local`](LocalExecutor::spawn_local).
pub struct LocalPool {
pool: FuturesUnordered<Task>,
pool: FuturesUnordered<TaskContainer>,
incoming: Rc<Incoming>,
}

Expand All @@ -42,19 +42,19 @@ pub struct LocalExecutor {
incoming: Weak<Incoming>,
}

type Incoming = RefCell<Vec<Task>>;
type Incoming = RefCell<Vec<TaskContainer>>;

// Set up and run a basic single-threaded executor loop, invocing `f` on each
// turn.
fn run_executor<T, F: FnMut(&Waker) -> Async<T>>(mut f: F) -> T {
fn run_executor<T, F: FnMut(&LocalWaker) -> Poll<T>>(mut f: F) -> T {
let _enter = enter()
.expect("cannot execute `LocalPool` executor from within \
another executor");

ThreadNotify::with_current(|thread| {
let waker = &Waker::from(thread.clone());
let local_waker = local_waker_from_nonlocal(thread.clone());
loop {
if let Async::Ready(t) = f(waker) {
if let Poll::Ready(t) = f(&local_waker) {
return t;
}
thread.park();
Expand Down Expand Up @@ -101,8 +101,8 @@ impl LocalPool {
///
/// The function will block the calling thread until *all* tasks in the pool
/// are complete, including any spawned while running existing tasks.
pub fn run(&mut self, exec: &mut Executor) {
run_executor(|waker| self.poll_pool(waker, exec))
pub fn run<Exec>(&mut self, exec: &mut Exec) where Exec: Executor + Sized {
run_executor(|local_waker| self.poll_pool(local_waker, exec))
}

/// Runs all the tasks in the pool until the given future completes.
Expand Down Expand Up @@ -132,35 +132,31 @@ impl LocalPool {
/// be inert after the call completes, but can continue with further use of
/// `run` or `run_until`. While the function is running, however, all tasks
/// in the pool will try to make progress.
pub fn run_until<F>(&mut self, mut f: F, exec: &mut Executor) -> Result<F::Item, F::Error>
where F: Future
pub fn run_until<F, Exec>(&mut self, mut f: F, exec: &mut Exec) -> F::Output
where F: Future + Unpin, Exec: Executor + Sized
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Unpin bound isn't needed. Can you add this macro to futures-core and use it instead?

{
// persistent state for the "main task"
let mut main_map = LocalMap::new();

run_executor(|waker| {
run_executor(|local_waker| {
{
let mut main_cx = Context::new(&mut main_map, waker, exec);
let mut main_cx = Context::new(local_waker, exec);

// if our main task is done, so are we
match f.poll(&mut main_cx) {
Ok(Async::Ready(v)) => return Async::Ready(Ok(v)),
Err(err) => return Async::Ready(Err(err)),
match f.poll_unpin(&mut main_cx) {
Poll::Ready(output) => return Poll::Ready(output),
_ => {}
}
}

self.poll_pool(waker, exec);
Async::Pending
self.poll_pool(local_waker, exec);
Poll::Pending
})
}

// Make maximal progress on the entire pool of spawned task, returning `Ready`
// if the pool is empty and `Pending` if no further progress can be made.
fn poll_pool(&mut self, waker: &Waker, exec: &mut Executor) -> Async<()> {
fn poll_pool<Exec>(&mut self, local_waker: &LocalWaker, exec: &mut Exec) -> Poll<()>
where Exec: Executor + Sized {
// state for the FuturesUnordered, which will never be used
let mut pool_map = LocalMap::new();
let mut pool_cx = Context::new(&mut pool_map, waker, exec);
let mut pool_cx = Context::new(local_waker, exec);

loop {
// empty the incoming queue of newly-spawned tasks
Expand All @@ -171,18 +167,17 @@ impl LocalPool {
}
}

if let Ok(ret) = self.pool.poll_next(&mut pool_cx) {
// we queued up some new tasks; add them and poll again
if !self.incoming.borrow().is_empty() {
continue;
}
let ret = self.pool.poll_next_unpin(&mut pool_cx);
// we queued up some new tasks; add them and poll again
if !self.incoming.borrow().is_empty() {
continue;
}

// no queued tasks; we may be done
match ret {
Async::Pending => return Async::Pending,
Async::Ready(None) => return Async::Ready(()),
_ => {}
}
// no queued tasks; we may be done
match ret {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(()),
_ => {}
}
}
}
Expand All @@ -202,14 +197,14 @@ lazy_static! {
///
/// Use a [`LocalPool`](LocalPool) if you need finer-grained control over
/// spawned tasks.
pub fn block_on<F: Future>(f: F) -> Result<F::Item, F::Error> {
pub fn block_on<F: Future + Unpin>(f: F) -> F::Output {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rm Unpin

let mut pool = LocalPool::new();
pool.run_until(f, &mut GLOBAL_POOL.clone())
}

/// Turn a stream into a blocking iterator.
///
/// Whne `next` is called on the resulting `BlockingStream`, the caller
/// When `next` is called on the resulting `BlockingStream`, the caller
/// will be blocked until the next element of the `Stream` becomes available.
/// The default executor for the future is a global `ThreadPool`.
pub fn block_on_stream<S: Stream>(s: S) -> BlockingStream<S> {
Expand All @@ -226,62 +221,55 @@ impl<S: Stream> BlockingStream<S> {
}
}

impl<S: Stream> Iterator for BlockingStream<S> {
type Item = Result<S::Item, S::Error>;
impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
type Item = S::Item;
fn next(&mut self) -> Option<Self::Item> {
let s = self.stream.take().expect("BlockingStream shouldn't be empty");
let (item, s) =
match LocalPool::new().run_until(s.next(), &mut GLOBAL_POOL.clone()) {
Ok((Some(item), s)) => (Some(Ok(item)), s),
Ok((None, s)) => (None, s),
Err((e, s)) => (Some(Err(e)), s),
};
LocalPool::new().run_until(s.next(), &mut GLOBAL_POOL.clone());

self.stream = Some(s);
item
}
}

impl Executor for LocalExecutor {
fn spawn(&mut self, f: Box<Future<Item = (), Error = Never> + Send>) -> Result<(), SpawnError> {
self.spawn_task(Task {
fut: f,
map: LocalMap::new(),
})
fn spawn_obj(&mut self, task: TaskObj) -> Result<(), SpawnObjError> {
if let Some(incoming) = self.incoming.upgrade() {
incoming.borrow_mut().push(TaskContainer { task });
Ok(())
} else {
Err(SpawnObjError{ task, kind: SpawnErrorKind::shutdown() })
}
}

fn status(&self) -> Result<(), SpawnError> {
fn status(&self) -> Result<(), SpawnErrorKind> {
if self.incoming.upgrade().is_some() {
Ok(())
} else {
Err(SpawnError::shutdown())
Err(SpawnErrorKind::shutdown())
}
}
}

impl LocalExecutor {
fn spawn_task(&self, task: Task) -> Result<(), SpawnError> {
let incoming = self.incoming.upgrade().ok_or(SpawnError::shutdown())?;
incoming.borrow_mut().push(task);
Ok(())
}

/*
/// Spawn a non-`Send` future onto the associated [`LocalPool`](LocalPool).
pub fn spawn_local<F>(&mut self, f: F) -> Result<(), SpawnError>
pub fn spawn_local<F>(&mut self, f: F) -> Result<(), SpawnObjError>
where F: Future<Item = (), Error = Never> + 'static
{
self.spawn_task(Task {
fut: Box::new(f),
map: LocalMap::new(),
})
}
*/
}

impl Future for Task {
type Item = ();
type Error = Never;
impl Future for TaskContainer {
type Output = ();

fn poll(&mut self, cx: &mut Context) -> Poll<(), Never> {
self.fut.poll(&mut cx.with_locals(&mut self.map))
fn poll(mut self: PinMut<Self>, cx: &mut Context) -> Poll<()> {
self.task.poll_unpin(cx)
}
}
Loading