Skip to content

Commit 1041083

Browse files
authored
Merge pull request rust-lang#134 from Stebalien/unordered
[RFC] Add an unordered buffer stream adapter.
2 parents fc7483d + 6ea5139 commit 1041083

3 files changed

Lines changed: 144 additions & 0 deletions

File tree

src/stream/buffer_unordered.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
use std::prelude::v1::*;
2+
3+
use {Async, IntoFuture, Poll, Future};
4+
use stream::{Stream, Fuse};
5+
6+
/// An adaptor for a stream of futures to execute the futures concurrently, if
7+
/// possible, delivering results as they become available.
8+
///
9+
/// This adaptor will buffer up a list of pending futures, and then return their
10+
/// results in the order that they complete. This is created by the
11+
/// `Stream::buffer_unordered` method.
12+
#[must_use = "streams do nothing unless polled"]
13+
pub struct BufferUnordered<S>
14+
where S: Stream,
15+
S::Item: IntoFuture,
16+
{
17+
stream: Fuse<S>,
18+
futures: Vec<Option<<S::Item as IntoFuture>::Future>>,
19+
cur: usize,
20+
}
21+
22+
pub fn new<S>(s: S, amt: usize) -> BufferUnordered<S>
23+
where S: Stream,
24+
S::Item: IntoFuture<Error=<S as Stream>::Error>,
25+
{
26+
BufferUnordered {
27+
stream: super::fuse::new(s),
28+
futures: (0..amt).map(|_| None).collect(),
29+
cur: 0,
30+
}
31+
}
32+
33+
impl<S> Stream for BufferUnordered<S>
34+
where S: Stream,
35+
S::Item: IntoFuture<Error=<S as Stream>::Error>,
36+
{
37+
type Item = <S::Item as IntoFuture>::Item;
38+
type Error = <S as Stream>::Error;
39+
40+
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
41+
// First, try to fill in all the futures
42+
for future in &mut self.futures {
43+
if future.is_none() {
44+
match try!(self.stream.poll()) {
45+
Async::Ready(Some(s)) => {
46+
*future = Some(s.into_future());
47+
}
48+
Async::Ready(None) => break,
49+
Async::NotReady => break,
50+
}
51+
}
52+
}
53+
54+
// Next, try and step futures forward until we find a ready one.
55+
// Always start at `cur` for fairness.
56+
let mut waiting = false;
57+
for i in 0..self.futures.len() {
58+
let mut idx = self.cur + i;
59+
if idx >= self.futures.len() {
60+
idx -= self.futures.len();
61+
}
62+
let future = &mut self.futures[idx];
63+
let result = match *future {
64+
Some(ref mut s) => match s.poll() {
65+
Ok(Async::NotReady) => {
66+
waiting = true;
67+
continue
68+
},
69+
Ok(Async::Ready(e)) => Ok(Async::Ready(Some(e))),
70+
Err(e) => Err(e),
71+
},
72+
None => continue,
73+
};
74+
self.cur = i + 1;
75+
*future = None;
76+
return result;
77+
}
78+
79+
Ok(if waiting || !self.stream.is_done() {
80+
Async::NotReady
81+
} else {
82+
Async::Ready(None)
83+
})
84+
}
85+
}

src/stream/mod.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,12 @@ pub use self::peek::Peekable;
5757

5858
if_std! {
5959
mod buffered;
60+
mod buffer_unordered;
6061
mod channel;
6162
mod collect;
6263
mod wait;
6364
pub use self::buffered::Buffered;
65+
pub use self::buffer_unordered::BufferUnordered;
6466
pub use self::channel::{channel, Sender, Receiver, FutureSender};
6567
pub use self::collect::Collect;
6668
pub use self::wait::Wait;
@@ -650,6 +652,24 @@ pub trait Stream {
650652
buffered::new(self, amt)
651653
}
652654

655+
/// An adaptor for creating a buffered list of pending futures (unordered).
656+
///
657+
/// If this stream's item can be converted into a future, then this adaptor
658+
/// will buffer up to `amt` futures and then return results in the order
659+
/// in which they complete. No more than `amt` futures will be buffered at
660+
/// any point in time, and less than `amt` may also be buffered depending on
661+
/// the state of each future.
662+
///
663+
/// The returned stream will be a stream of each future's result, with
664+
/// errors passed through whenever they occur.
665+
#[cfg(feature = "use_std")]
666+
fn buffer_unordered(self, amt: usize) -> BufferUnordered<Self>
667+
where Self::Item: IntoFuture<Error = <Self as Stream>::Error>,
668+
Self: Sized
669+
{
670+
buffer_unordered::new(self, amt)
671+
}
672+
653673
/// An adapter for merging the output of two streams.
654674
///
655675
/// The merged stream produces items from one or both of the underlying

tests/stream.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,45 @@ fn buffered() {
155155
assert_eq!(rx.next(), None);
156156
}
157157

158+
#[test]
159+
fn unordered() {
160+
let (tx, rx) = channel::<_, u32>();
161+
let (a, b) = oneshot::<u32>();
162+
let (c, d) = oneshot::<u32>();
163+
164+
tx.send(Ok(b.map_err(|_| 2).boxed()))
165+
.and_then(|tx| tx.send(Ok(d.map_err(|_| 4).boxed())))
166+
.forget();
167+
168+
let mut rx = rx.buffer_unordered(2);
169+
sassert_empty(&mut rx);
170+
let mut rx = rx.wait();
171+
c.complete(3);
172+
assert_eq!(rx.next(), Some(Ok(3)));
173+
a.complete(5);
174+
assert_eq!(rx.next(), Some(Ok(5)));
175+
assert_eq!(rx.next(), None);
176+
177+
let (tx, rx) = channel::<_, u32>();
178+
let (a, b) = oneshot::<u32>();
179+
let (c, d) = oneshot::<u32>();
180+
181+
tx.send(Ok(b.map_err(|_| 2).boxed()))
182+
.and_then(|tx| tx.send(Ok(d.map_err(|_| 4).boxed())))
183+
.forget();
184+
185+
// We don't even get to see `c` until `a` completes.
186+
let mut rx = rx.buffer_unordered(1);
187+
sassert_empty(&mut rx);
188+
c.complete(3);
189+
sassert_empty(&mut rx);
190+
a.complete(5);
191+
let mut rx = rx.wait();
192+
assert_eq!(rx.next(), Some(Ok(5)));
193+
assert_eq!(rx.next(), Some(Ok(3)));
194+
assert_eq!(rx.next(), None);
195+
}
196+
158197
#[test]
159198
fn zip() {
160199
assert_done(|| list().zip(list()).collect(),

0 commit comments

Comments
 (0)