Skip to content

Commit e52f14c

Browse files
committed
Guard against thread::park during poll
The previous implementation would lead to `Future::wait` deadlocking if `thread::park` was called during a `poll`. Instead let's keep track of notifications send to ourself, and don't actually call `thread::park` if we get a notification. Closes rust-lang#132
1 parent 1041083 commit e52f14c

File tree

1 file changed

+26
-7
lines changed

1 file changed

+26
-7
lines changed

src/task/mod.rs

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use std::prelude::v1::*;
3232
use std::cell::Cell;
3333
use std::fmt;
3434
use std::sync::Arc;
35-
use std::sync::atomic::{Ordering, AtomicUsize, ATOMIC_USIZE_INIT};
35+
use std::sync::atomic::{Ordering, AtomicBool, AtomicUsize, ATOMIC_USIZE_INIT};
3636
use std::thread;
3737

3838
use {BoxFuture, Poll, Future, Async};
@@ -224,10 +224,10 @@ impl<F: Future> Spawn<F> {
224224
/// to complete. When a future cannot make progress it will use
225225
/// `thread::park` to block the current thread.
226226
pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
227-
let unpark = Arc::new(ThreadUnpark(thread::current()));
227+
let unpark = Arc::new(ThreadUnpark::new(thread::current()));
228228
loop {
229229
match try!(self.poll_future(unpark.clone())) {
230-
Async::NotReady => thread::park(),
230+
Async::NotReady => unpark.park(),
231231
Async::Ready(e) => return Ok(e),
232232
}
233233
}
@@ -279,10 +279,10 @@ impl<S: Stream> Spawn<S> {
279279
/// Like `wait_future`, except only waits for the next element to arrive on
280280
/// the underlying stream.
281281
pub fn wait_stream(&mut self) -> Option<Result<S::Item, S::Error>> {
282-
let unpark = Arc::new(ThreadUnpark(thread::current()));
282+
let unpark = Arc::new(ThreadUnpark::new(thread::current()));
283283
loop {
284284
match self.poll_stream(unpark.clone()) {
285-
Ok(Async::NotReady) => thread::park(),
285+
Ok(Async::NotReady) => unpark.park(),
286286
Ok(Async::Ready(Some(e))) => return Some(Ok(e)),
287287
Ok(Async::Ready(None)) => return None,
288288
Err(e) => return Some(Err(e)),
@@ -331,11 +331,30 @@ pub trait Executor: Send + Sync + 'static {
331331
fn execute(&self, r: Run);
332332
}
333333

334-
struct ThreadUnpark(thread::Thread);
334+
struct ThreadUnpark {
335+
thread: thread::Thread,
336+
ready: AtomicBool,
337+
}
338+
339+
impl ThreadUnpark {
340+
fn new(thread: thread::Thread) -> ThreadUnpark {
341+
ThreadUnpark {
342+
thread: thread,
343+
ready: AtomicBool::new(false),
344+
}
345+
}
346+
347+
fn park(&self) {
348+
if !self.ready.swap(false, Ordering::SeqCst) {
349+
thread::park();
350+
}
351+
}
352+
}
335353

336354
impl Unpark for ThreadUnpark {
337355
fn unpark(&self) {
338-
self.0.unpark()
356+
self.ready.store(true, Ordering::SeqCst);
357+
self.thread.unpark()
339358
}
340359
}
341360

0 commit comments

Comments
 (0)