Skip to content

Commit bd2c05e

Browse files
committed
Address review comments.
1 parent c51dc6f commit bd2c05e

File tree

2 files changed

+46
-53
lines changed

2 files changed

+46
-53
lines changed

src/lib.rs

Lines changed: 31 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ mod reactor;
8585

8686
pub use driver::block_on;
8787

88+
/// Use Duration::MAX once duration_constants are stabilized.
89+
fn duration_max() -> Duration {
90+
Duration::new(u64::MAX, 1_000_000_000 - 1)
91+
}
92+
8893
/// A future that expires at a point in time.
8994
///
9095
/// Timers are futures that output the [`Instant`] at which they fired.
@@ -129,7 +134,7 @@ pub struct Timer {
129134
when: Instant,
130135

131136
/// The period.
132-
period: Option<Duration>,
137+
period: Duration,
133138
}
134139

135140
impl Timer {
@@ -164,11 +169,8 @@ impl Timer {
164169
/// # });
165170
/// ```
166171
pub fn at(instant: Instant) -> Timer {
167-
Timer {
168-
id_and_waker: None,
169-
when: instant,
170-
period: None,
171-
}
172+
// Use Duration::MAX once duration_constants are stabilized.
173+
Timer::interval_at(instant, duration_max())
172174
}
173175

174176
/// Sets the timer to expire after the new duration of time.
@@ -264,7 +266,7 @@ impl Timer {
264266
Timer {
265267
id_and_waker: None,
266268
when: start,
267-
period: Some(period),
269+
period: period,
268270
}
269271
}
270272
}
@@ -281,14 +283,33 @@ impl Drop for Timer {
281283
impl Future for Timer {
282284
type Output = Instant;
283285

284-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
286+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
287+
match self.poll_next(cx) {
288+
Poll::Ready(Some(when)) => Poll::Ready(when),
289+
Poll::Pending => Poll::Pending,
290+
Poll::Ready(None) => unreachable!(),
291+
}
292+
}
293+
}
294+
295+
impl Stream for Timer {
296+
type Item = Instant;
297+
298+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
285299
// Check if the timer has already fired.
286300
if Instant::now() >= self.when {
287301
if let Some((id, _)) = self.id_and_waker.take() {
288302
// Deregister the timer from the reactor.
289303
Reactor::get().remove_timer(self.when, id);
290304
}
291-
Poll::Ready(self.when)
305+
let when = self.when;
306+
if let Some(next) = when.checked_add(self.period) {
307+
self.when = next;
308+
// Register the timer in the reactor.
309+
let id = Reactor::get().insert_timer(self.when, cx.waker());
310+
self.id_and_waker = Some((id, cx.waker().clone()));
311+
}
312+
return Poll::Ready(Some(when));
292313
} else {
293314
match &self.id_and_waker {
294315
None => {
@@ -306,50 +327,8 @@ impl Future for Timer {
306327
}
307328
Some(_) => {}
308329
}
309-
Poll::Pending
310-
}
311-
}
312-
}
313-
314-
impl Stream for Timer {
315-
type Item = Instant;
316-
317-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
318-
if let Some(period) = self.period {
319-
// Check if the timer has already fired.
320-
if Instant::now() >= self.when {
321-
if let Some((id, _)) = self.id_and_waker.take() {
322-
// Deregister the timer from the reactor.
323-
Reactor::get().remove_timer(self.when, id);
324-
}
325-
let when = self.when;
326-
self.when += period;
327-
// Register the timer in the reactor.
328-
let id = Reactor::get().insert_timer(self.when, cx.waker());
329-
self.id_and_waker = Some((id, cx.waker().clone()));
330-
return Poll::Ready(Some(when));
331-
} else {
332-
match &self.id_and_waker {
333-
None => {
334-
// Register the timer in the reactor.
335-
let id = Reactor::get().insert_timer(self.when, cx.waker());
336-
self.id_and_waker = Some((id, cx.waker().clone()));
337-
}
338-
Some((id, w)) if !w.will_wake(cx.waker()) => {
339-
// Deregister the timer from the reactor to remove the old waker.
340-
Reactor::get().remove_timer(self.when, *id);
341-
342-
// Register the timer in the reactor with the new waker.
343-
let id = Reactor::get().insert_timer(self.when, cx.waker());
344-
self.id_and_waker = Some((id, cx.waker().clone()));
345-
}
346-
Some(_) => {}
347-
}
348-
}
349-
Poll::Pending
350-
} else {
351-
Poll::Ready(None)
352330
}
331+
Poll::Pending
353332
}
354333
}
355334

tests/timer.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::thread;
55
use std::time::{Duration, Instant};
66

77
use async_io::Timer;
8-
use futures_lite::{future, FutureExt};
8+
use futures_lite::{future, FutureExt, StreamExt};
99

1010
fn spawn<T: Send + 'static>(
1111
f: impl Future<Output = T> + Send + 'static,
@@ -30,6 +30,20 @@ fn smoke() {
3030
});
3131
}
3232

33+
#[test]
34+
fn interval() {
35+
future::block_on(async {
36+
let start = Instant::now();
37+
let mut timer = Timer::interval(Duration::from_secs(1));
38+
// first tick is immediate
39+
timer.next().await;
40+
timer.next().await;
41+
assert!(start.elapsed() >= Duration::from_secs(1));
42+
timer.next().await;
43+
assert!(start.elapsed() >= Duration::from_secs(2));
44+
});
45+
}
46+
3347
#[test]
3448
fn poll_across_tasks() {
3549
future::block_on(async {

0 commit comments

Comments
 (0)