Skip to content

Commit 6af3d54

Browse files
committed
Expand documentation for cooperative
1 parent d0263c6 commit 6af3d54

File tree

2 files changed

+49
-22
lines changed

2 files changed

+49
-22
lines changed

tokio/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ features = [
137137
tokio-test = { version = "0.4.0", path = "../tokio-test" }
138138
tokio-stream = { version = "0.1", path = "../tokio-stream" }
139139
futures = { version = "0.3.0", features = ["async-await"] }
140+
futures-channel = "0.3.0"
141+
futures-util = "0.3.0"
140142
mockall = "0.13.0"
141143
async-stream = "0.3"
142144
futures-concurrency = "7.6.3"

tokio/src/task/coop/mod.rs

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ cfg_coop! {
254254
use std::pin::Pin;
255255
use std::task::{ready, Context, Poll};
256256

257-
/// Value returned by the [`poll_proceed`](poll_proceed) method.
257+
/// Value returned by the [`poll_proceed`] method.
258258
#[derive(Debug)]
259259
#[must_use]
260260
pub struct RestoreOnPending(Cell<Budget>, PhantomData<*mut ()>);
@@ -288,25 +288,25 @@ cfg_coop! {
288288
}
289289
}
290290

291-
/// Decrements the task budget and returns `Poll::Pending` if the budget is depleted.
291+
/// Decrements the task budget and returns [`Poll::Pending`] if the budget is depleted.
292292
/// This indicates that the task should yield to the scheduler. Otherwise, returns
293-
/// `RestoreOnPending` which can be used to commit the budget consumption.
293+
/// [`RestoreOnPending`] which can be used to commit the budget consumption.
294294
///
295-
/// The returned [`RestoreOnPending`](RestoreOnPending) will revert the budget to its former
296-
/// value when dropped unless [`RestoreOnPending::made_progress`](RestoreOnPending::made_progress)
295+
/// The returned [`RestoreOnPending`] will revert the budget to its former
296+
/// value when dropped unless [`RestoreOnPending::made_progress`]
297297
/// is called. It is the caller's responsibility to do so when it _was_ able to
298-
/// make progress after the call to `poll_pending`.
298+
/// make progress after the call to [`poll_proceed`].
299299
/// Restoring the budget automatically ensures the task can try to make progress in some other
300300
/// way.
301301
///
302-
/// Note that `RestoreOnPending` restores the budget **as it was before `poll_proceed`**.
303-
/// Therefore, if the budget is _further_ adjusted between when `poll_proceed` returns and
304-
/// `RestoreOnPending` is dropped, those adjustments are erased unless the caller indicates
302+
/// Note that [`RestoreOnPending`] restores the budget **as it was before [`poll_proceed`]**.
303+
/// Therefore, if the budget is _further_ adjusted between when [`poll_proceed`] returns and
304+
/// [`RestoreOnPending`] is dropped, those adjustments are erased unless the caller indicates
305305
/// that progress was made.
306306
///
307307
/// # Examples
308308
///
309-
/// This example shows a simple countdown latch that uses `poll_proceed` to participate in
309+
/// This example shows a simple countdown latch that uses [`poll_proceed`] to participate in
310310
/// cooperative scheduling.
311311
///
312312
/// ```
@@ -336,7 +336,7 @@ cfg_coop! {
336336
///
337337
/// self.counter -= 1;
338338
/// if self.counter == 0 {
339-
/// if let Some(w) = std::mem::replace(&mut self.waker, None) {
339+
/// if let Some(w) = self.waker.take() {
340340
/// w.wake();
341341
/// }
342342
/// }
@@ -358,7 +358,7 @@ cfg_coop! {
358358
///
359359
/// // Next we check if the latch is ready to release its value
360360
/// if this.counter == 0 {
361-
/// let t = std::mem::replace(&mut this.value, None);
361+
/// let t = this.value.take();
362362
/// // The latch made progress so call `made_progress` to ensure the budget
363363
/// // is not reverted.
364364
/// coop.made_progress();
@@ -466,13 +466,7 @@ cfg_coop! {
466466
}
467467

468468
pin_project! {
469-
/// Future wrapper to ensure cooperative scheduling.
470-
///
471-
/// When being polled `poll_proceed` is called before the inner future is polled to check
472-
/// if the inner future has exceeded its budget. If the inner future resolves, this will
473-
/// automatically call `RestoreOnPending::made_progress` before resolving this future with
474-
/// the result of the inner one. If polling the inner future is pending, polling this future
475-
/// type will also return a `Poll::Pending`.
469+
/// Future wrapper to ensure cooperative scheduling created by [`cooperative`].
476470
#[must_use = "futures do nothing unless polled"]
477471
pub struct Coop<F: Future> {
478472
#[pin]
@@ -495,9 +489,40 @@ cfg_coop! {
495489
}
496490
}
497491

498-
/// Run a future with a budget constraint for cooperative scheduling.
499-
/// If the future exceeds its budget while being polled, control is yielded back to the
500-
/// runtime.
492+
/// Creates a wrapper future that makes the inner future cooperate with the Tokio scheduler.
493+
///
494+
/// When polled, the wrapper will first call [`poll_proceed`] to consume task budget, and
495+
/// immediately yield if the budget has been depleted. If budget was available, the inner future
496+
/// is polled. The budget consumption will be made final using [`RestoreOnPending::made_progress`]
497+
/// if the inner future resolves to its final value.
498+
///
499+
/// # Examples
500+
///
501+
/// When you call `recv` on the `Receiver` of a [`tokio::sync::mpsc`](crate::sync::mpsc)
502+
/// channel, task budget will automatically be consumed when the next value is returned.
503+
/// This makes tasks that use Tokio mpsc channels automatically cooperative.
504+
///
505+
/// If you're using `futures::channel::mpsc` instead, automatic task budget consumption will
506+
/// not happen. This example shows how can use `cooperative` to make `futures::channel::mpsc`
507+
/// channels cooperate with the scheduler in the same way Tokio channels do.
508+
///
509+
/// ```
510+
/// use tokio::task::coop::cooperative;
511+
/// use futures_channel::mpsc::Receiver;
512+
/// use futures_util::stream::StreamExt;
513+
///
514+
/// async fn receive_next<T>(receiver: &mut Receiver<T>) -> Option<T>
515+
/// where
516+
/// T: Unpin,
517+
/// {
518+
/// // Use `StreamExt::next` to obtain a `Future` that resolves to the next value
519+
/// let recv_future = receiver.next();
520+
/// // Wrap it an a cooperative wrapper
521+
/// let coop_future = cooperative(recv_future);
522+
/// // And await
523+
/// coop_future.await
524+
/// }
525+
501526
#[inline]
502527
pub fn cooperative<F: Future>(fut: F) -> Coop<F> {
503528
Coop { fut }

0 commit comments

Comments
 (0)