-
Notifications
You must be signed in to change notification settings - Fork 671
Executors and partly reenable futures crate #1037
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Because this PR is so big and requires careful vetting, #1034 should be reviewed and merged first. I will then rebase on top of it after it has landed. |
|
In this recent commit I renamed
Calling both things "task" would be too confusing. Alternative to what I implemented:
I quite like the alternative. What do you think? |
|
Okay here are some updated thoughts on this: What about we rename
Edit: I no longer believe we should do this. |
The plan was originally to add a method to |
No-- the executor inside of |
|
I now think we should stick with calling the top level future "task". (And rename |
|
I rebased and updated it |
futures-executor/src/local_pool.rs
Outdated
| struct Task { | ||
| fut: Box<Future<Item = (), Error = Never>>, | ||
| map: LocalMap, | ||
| struct TaskContainer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a comment to this that describes what it's for? Something like "wrapper for TaskObj that implements Future with a unit output" would help-- it's less obvious what its purpose is now that it no longer pairs the task with a LocalMap.
N.B. I think that TaskObj should implement Future-- we should add that impl in std. Opened rust-lang/rust#51667
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already made TaskObj implement Future
I‘ll add the comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm getting rid of TaskContainer entirely. We can use TaskObj directly. Can't do that in ThreadPool, but here it's simple
futures-executor/src/local_pool.rs
Outdated
| pub fn run_until<F>(&mut self, mut f: F, exec: &mut Executor) -> Result<F::Item, F::Error> | ||
| where F: Future | ||
| pub fn run_until<F, Exec>(&mut self, mut f: F, exec: &mut Exec) -> F::Output | ||
| where F: Future + Unpin, Exec: Executor + Sized |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This Unpin bound isn't needed. Can you add this macro to futures-core and use it instead?
futures-executor/src/local_pool.rs
Outdated
| /// Use a [`LocalPool`](LocalPool) if you need finer-grained control over | ||
| /// spawned tasks. | ||
| pub fn block_on<F: Future>(f: F) -> Result<F::Item, F::Error> { | ||
| pub fn block_on<F: Future + Unpin>(f: F) -> F::Output { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rm Unpin
futures-executor/src/spawn.rs
Outdated
| type Item = (); | ||
| type Error = Never; | ||
| fn poll(&mut self, cx: &mut Context) -> Poll<(), Never> { | ||
| impl<F: Future<Output = ()> + Unpin + Send + 'static> Future for Spawn<F> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This Unpin bound isn't needed.
futures-executor/src/spawn.rs
Outdated
| tx: Option<Sender<T>>, | ||
| keep_running_flag: Arc<AtomicBool>, | ||
| } | ||
| impl<F, T> Unpin for MySender<F, T> {} // ToDo: May I do this? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would only add it where F: Unpin, and I would remove the Unpin bound on the Future impl. You will want to use the unsafe_pinned_field macro to create a safe function for accessing PinMut<F>
futures-executor/src/thread_pool.rs
Outdated
| /// Note that the function will return when the provided future completes, | ||
| /// even if some of the tasks it spawned are still running. | ||
| pub fn run<F: Future>(&mut self, f: F) -> Result<F::Item, F::Error> { | ||
| pub fn run<F: Future + Unpin>(&mut self, f: F) -> F::Output { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rm Unpin
futures-util/src/sink/mod.rs
Outdated
|
|
||
| mod close; | ||
| mod fanout; | ||
| // mod fanout; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is fanout commented out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because the tests fail. There is no more iter_ok() for instance. Fixing now...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No this is a rabbit hole... it requires converting future_util/src/stream/forwards.rs and probably various other things
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you leave fanout, but disable the tests? I'd like to make sure I know what code still needs fixing to work on 0.3 (regardless of tests) and what still needs fixing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Tests are disabled now.
| impl<T: Sync> Sync for FuturesUnordered<T> {} | ||
| unsafe impl<T: Send> Send for FuturesUnordered<T> {} | ||
| unsafe impl<T: Sync> Sync for FuturesUnordered<T> {} | ||
| impl<T: Unpin> Unpin for FuturesUnordered<T> {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FuturesUnordered should be Unpin regardless of T.
|
|
||
| impl<T> FuturesUnordered<T> | ||
| where T: Future, | ||
| where T: Future + Unpin |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here and elsewhere, FuturesUnordered shouldn't require that the future is Unpin, since it's allocating it. Instead it should track usage to ensure the future isn't moved.
| Waker::new(hide_lt(ptr)) | ||
| let ptr: Arc<Node<T>> = handle.0.clone(); | ||
| let ptr: *mut ArcNode<T> = mem::transmute(ptr); | ||
| let ptr = mem::transmute(ptr as *mut UnsafeWake); // Hide lifetime |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you supply manual type arguments to mem::transmute? It's hard to tell what it's transmuting between here.
|
I've split |
|
@cramertj To make reviewing easier, look at the " |
| // At this point, it may be worth yielding the thread & | ||
| // spinning a few times... but for now, just yield using the | ||
| // task system. | ||
| cx.waker().wake(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: you can use local_waker().wake() here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| use super::node::Node; | ||
|
|
||
| #[derive(Debug)] | ||
| /// Mutable iterator over all futures in the unordered set. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Subtle: this is unsound now because it gives mutable access to the futures allowing them to be mem::replaced, although they may have been polled. This should either yield PinMut<F> or iter_mut should be changed to include an F: Unpin bound. Perhaps offer both as iter_mut and iter_pin_mut?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! Done.
| let ptr = mem::transmute::<Arc<Node<T>>, *mut ArcNode<T>>(ptr); | ||
| let ptr = ptr as *mut UnsafeWake; | ||
| // Hide lifetime | ||
| let ptr = mem::transmute::<*mut UnsafeWake, *mut UnsafeWake>(ptr); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like a no-op?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It hides the lifetime. Before my PR it was in a function called hide_lt. Otherwise we get:
error[E0310]: the parameter type `T` may not live long enough
--> futures-util/src/stream/futures_unordered/node.rs:96:23
|
91 | impl<'a, T> From<NodeToHandle<'a, T>> for LocalWaker {
| - help: consider adding an explicit lifetime bound `T: 'static`...
...
96 | let ptr = ptr as *mut UnsafeWake;
| ^^^
|
note: ...so that the type `stream::futures_unordered::node::ArcNode<T>` will meet its required lifetime bounds
| unsafe impl<T> Send for ArcNode<T> {} | ||
| unsafe impl<T> Sync for ArcNode<T> {} | ||
|
|
||
| unsafe impl<T> UnsafeWake for ArcNode<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This impl looks like it could be written as an impl of the Wake trait for Node<T>, which would make the From impls above trivial.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can't because the UnsafeWake trait is used to hide the lifetime. I've added a bunch of comments in my next commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gotcha, thanks!
|
|
||
| #[doc(hidden)] | ||
| impl<'a, T> From<NodeToHandle<'a, T>> for LocalWaker { | ||
| fn from(handle: NodeToHandle<'a, T>) -> LocalWaker { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This and the function below are the same modulo the call to either LocalWaker::new or Waker::new-- can you break out the shared bits into a function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
|
||
| #[derive(Debug)] | ||
| /// Mutable iterator over all futures in the unordered set. | ||
| pub struct IterPinMut<'a, F: 'a> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than duplicating the logic between IterPinMut and IterMut, I'd implement IterMut in terms of IterPinMut. The Unpin bound will make it safe to go from PinMut of the yielded value to &mut of the value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| } | ||
| } | ||
|
|
||
| /// Returns an iterator that allows modifying each future in the set. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add an Unpin bound to the iter_mut method itself? It could be confusing to have iter_mut be callable but the result not be usable as an iterator.
| pub struct IterMut<'a, F: 'a> { | ||
| pub(super) node: *const Node<F>, | ||
| pub(super) len: usize, | ||
| pub(super) _marker: PhantomData<&'a mut FuturesUnordered<F>> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add an Unpin bound to the IterMut type itself, too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| self.len -= 1; | ||
| Some(future) | ||
| } | ||
| PinMut::new(&mut self.0).next().map(|f| unsafe { PinMut::get_mut(f) }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the PinMut::new here? You should be able to remove that and have self.0.next().map(|f| unsafe { PinMut::get_mut(f) })
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah ^^'
|
@MajorBreakfast Looks great! Thanks for being so patient with all the changes. |
|
🎉 |
This needs careful review. I'm not entirely sure what I did ^^' All I can say is that it builds now and some of the tests run.
Notes:
futures-coresubcrate does not yet reexport fromcoreexecutor.spawn_obj(Box::new(future).into())looks ugly ^^' I think we should introduce anIntoTaskObjtrait. Unless there's going to be a simpler API layered on top of course.#![feature(arbitrary_self_types)]. Can this be done? Didn't seem to work for me. Maybe I just did it the wrong way.Executorto beSized. Should it become the default, so that we don't need to type it everywhere?Taskstructs should probably be renamed to something likeTaskContainer { task_obj, ... }.