Skip to content

Commit 7011a68

Browse files
authored
time: add StreamExt::chunks_timeout (#4695)
1 parent c728016 commit 7011a68

3 files changed

Lines changed: 226 additions & 0 deletions

File tree

tokio-stream/src/stream_ext.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ cfg_time! {
6161
use tokio::time::Duration;
6262
mod throttle;
6363
use throttle::{throttle, Throttle};
64+
mod chunks_timeout;
65+
use chunks_timeout::ChunksTimeout;
6466
}
6567

6668
/// An extension trait for the [`Stream`] trait that provides a variety of
@@ -1005,6 +1007,62 @@ pub trait StreamExt: Stream {
10051007
{
10061008
throttle(duration, self)
10071009
}
1010+
1011+
/// Batches the items in the given stream using a maximum duration and size for each batch.
1012+
///
1013+
/// This stream returns the next batch of items in the following situations:
1014+
/// 1. The inner stream has returned at least `max_size` many items since the last batch.
1015+
/// 2. The time since the first item of a batch is greater than the given duration.
1016+
/// 3. The end of the stream is reached.
1017+
///
1018+
/// The length of the returned vector is never empty or greater than the maximum size. Empty batches
1019+
/// will not be emitted if no items are received upstream.
1020+
///
1021+
/// # Panics
1022+
///
1023+
/// This function panics if `max_size` is zero
1024+
///
1025+
/// # Example
1026+
///
1027+
/// ```rust
1028+
/// use std::time::Duration;
1029+
/// use tokio::time;
1030+
/// use tokio_stream::{self as stream, StreamExt};
1031+
/// use futures::FutureExt;
1032+
///
1033+
/// #[tokio::main]
1034+
/// # async fn _unused() {}
1035+
/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1036+
/// async fn main() {
1037+
/// let iter = vec![1, 2, 3, 4].into_iter();
1038+
/// let stream0 = stream::iter(iter);
1039+
///
1040+
/// let iter = vec![5].into_iter();
1041+
/// let stream1 = stream::iter(iter)
1042+
/// .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
1043+
///
1044+
/// let chunk_stream = stream0
1045+
/// .chain(stream1)
1046+
/// .chunks_timeout(3, Duration::from_secs(2));
1047+
/// tokio::pin!(chunk_stream);
1048+
///
1049+
/// // a full batch was received
1050+
/// assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
1051+
/// // deadline was reached before max_size was reached
1052+
/// assert_eq!(chunk_stream.next().await, Some(vec![4]));
1053+
/// // last element in the stream
1054+
/// assert_eq!(chunk_stream.next().await, Some(vec![5]));
1055+
/// }
1056+
/// ```
1057+
#[cfg(feature = "time")]
1058+
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1059+
fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self>
1060+
where
1061+
Self: Sized,
1062+
{
1063+
assert!(max_size > 0, "`max_size` must be non-zero.");
1064+
ChunksTimeout::new(self, max_size, duration)
1065+
}
10081066
}
10091067

10101068
impl<St: ?Sized> StreamExt for St where St: Stream {}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
use crate::stream_ext::Fuse;
2+
use crate::Stream;
3+
use tokio::time::{sleep, Instant, Sleep};
4+
5+
use core::future::Future;
6+
use core::pin::Pin;
7+
use core::task::{Context, Poll};
8+
use pin_project_lite::pin_project;
9+
use std::time::Duration;
10+
11+
pin_project! {
12+
/// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method.
13+
#[must_use = "streams do nothing unless polled"]
14+
#[derive(Debug)]
15+
pub struct ChunksTimeout<S: Stream> {
16+
#[pin]
17+
stream: Fuse<S>,
18+
#[pin]
19+
deadline: Sleep,
20+
duration: Duration,
21+
items: Vec<S::Item>,
22+
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
23+
}
24+
}
25+
26+
impl<S: Stream> ChunksTimeout<S> {
27+
pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self {
28+
ChunksTimeout {
29+
stream: Fuse::new(stream),
30+
deadline: sleep(duration),
31+
duration,
32+
items: Vec::with_capacity(max_size),
33+
cap: max_size,
34+
}
35+
}
36+
}
37+
38+
impl<S: Stream> Stream for ChunksTimeout<S> {
39+
type Item = Vec<S::Item>;
40+
41+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42+
let mut me = self.as_mut().project();
43+
loop {
44+
match me.stream.as_mut().poll_next(cx) {
45+
Poll::Pending => break,
46+
Poll::Ready(Some(item)) => {
47+
if me.items.is_empty() {
48+
me.deadline.as_mut().reset(Instant::now() + *me.duration);
49+
me.items.reserve_exact(*me.cap);
50+
}
51+
me.items.push(item);
52+
if me.items.len() >= *me.cap {
53+
return Poll::Ready(Some(std::mem::take(me.items)));
54+
}
55+
}
56+
Poll::Ready(None) => {
57+
// Returning Some here is only correct because we fuse the inner stream.
58+
let last = if me.items.is_empty() {
59+
None
60+
} else {
61+
Some(std::mem::take(me.items))
62+
};
63+
64+
return Poll::Ready(last);
65+
}
66+
}
67+
}
68+
69+
if !me.items.is_empty() {
70+
ready!(me.deadline.poll(cx));
71+
return Poll::Ready(Some(std::mem::take(me.items)));
72+
}
73+
74+
Poll::Pending
75+
}
76+
77+
fn size_hint(&self) -> (usize, Option<usize>) {
78+
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
79+
let (lower, upper) = self.stream.size_hint();
80+
let lower = (lower / self.cap).saturating_add(chunk_len);
81+
let upper = upper.and_then(|x| x.checked_add(chunk_len));
82+
(lower, upper)
83+
}
84+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
#![warn(rust_2018_idioms)]
2+
#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]
3+
4+
use tokio::time;
5+
use tokio_stream::{self as stream, StreamExt};
6+
use tokio_test::assert_pending;
7+
use tokio_test::task;
8+
9+
use futures::FutureExt;
10+
use std::time::Duration;
11+
12+
#[tokio::test(start_paused = true)]
13+
async fn usage() {
14+
let iter = vec![1, 2, 3].into_iter();
15+
let stream0 = stream::iter(iter);
16+
17+
let iter = vec![4].into_iter();
18+
let stream1 =
19+
stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));
20+
21+
let chunk_stream = stream0
22+
.chain(stream1)
23+
.chunks_timeout(4, Duration::from_secs(2));
24+
25+
let mut chunk_stream = task::spawn(chunk_stream);
26+
27+
assert_pending!(chunk_stream.poll_next());
28+
time::advance(Duration::from_secs(2)).await;
29+
assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
30+
31+
assert_pending!(chunk_stream.poll_next());
32+
time::advance(Duration::from_secs(2)).await;
33+
assert_eq!(chunk_stream.next().await, Some(vec![4]));
34+
}
35+
36+
#[tokio::test(start_paused = true)]
37+
async fn full_chunk_with_timeout() {
38+
let iter = vec![1, 2].into_iter();
39+
let stream0 = stream::iter(iter);
40+
41+
let iter = vec![3].into_iter();
42+
let stream1 =
43+
stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(1)).map(move |_| n));
44+
45+
let iter = vec![4].into_iter();
46+
let stream2 =
47+
stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));
48+
49+
let chunk_stream = stream0
50+
.chain(stream1)
51+
.chain(stream2)
52+
.chunks_timeout(3, Duration::from_secs(2));
53+
54+
let mut chunk_stream = task::spawn(chunk_stream);
55+
56+
assert_pending!(chunk_stream.poll_next());
57+
time::advance(Duration::from_secs(2)).await;
58+
assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
59+
60+
assert_pending!(chunk_stream.poll_next());
61+
time::advance(Duration::from_secs(2)).await;
62+
assert_eq!(chunk_stream.next().await, Some(vec![4]));
63+
}
64+
65+
#[tokio::test]
66+
#[ignore]
67+
async fn real_time() {
68+
let iter = vec![1, 2, 3, 4].into_iter();
69+
let stream0 = stream::iter(iter);
70+
71+
let iter = vec![5].into_iter();
72+
let stream1 =
73+
stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
74+
75+
let chunk_stream = stream0
76+
.chain(stream1)
77+
.chunks_timeout(3, Duration::from_secs(2));
78+
79+
let mut chunk_stream = task::spawn(chunk_stream);
80+
81+
assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
82+
assert_eq!(chunk_stream.next().await, Some(vec![4]));
83+
assert_eq!(chunk_stream.next().await, Some(vec![5]));
84+
}

0 commit comments

Comments
 (0)