Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 87 additions & 12 deletions tokio/src/task/coop/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,16 @@ cfg_coop! {
use std::pin::Pin;
use std::task::{ready, Context, Poll};

/// Value returned by the [`poll_proceed`](poll_proceed) method.
#[derive(Debug)]
#[must_use]
pub(crate) struct RestoreOnPending(Cell<Budget>);
pub struct RestoreOnPending(Cell<Budget>);

impl RestoreOnPending {
pub(crate) fn made_progress(&self) {
/// Signals that the task that obtained this `RestoreOnPending` was able to make
/// progress. This prevents the task budget from being restored to the value
/// it had prior to obtaining this instance when it is dropped.
pub fn made_progress(&self) {
self.0.set(Budget::unconstrained());
}
}
Expand All @@ -275,20 +280,90 @@ cfg_coop! {
}
}

/// Returns `Poll::Pending` if the current task has exceeded its budget and should yield.
/// Decrements the task budget and returns `Poll::Pending` if the budget is depleted.
/// This indicates that the task should yield to the scheduler. Otherwise, returns
/// `RestoreOnPending` which can be used to commit the budget consumption.
///
/// When you call this method, the current budget is decremented. However, to ensure that
/// progress is made every time a task is polled, the budget is automatically restored to its
/// former value if the returned `RestoreOnPending` is dropped. It is the caller's
/// responsibility to call `RestoreOnPending::made_progress` if it made progress, to ensure
/// that the budget empties appropriately.
/// The returned [`RestoreOnPending`](RestoreOnPending) will revert the budget to its former
/// value when dropped unless [`RestoreOnPending::made_progress`](RestoreOnPending::made_progress)
/// is called. It is the caller's responsibility to do so when it _was_ able to
/// make progress after the call to `poll_pending`.
/// Restoring the budget automatically ensures the task can try to make progress in some other
/// way.
///
/// Note that `RestoreOnPending` restores the budget **as it was before `poll_proceed`**.
/// Therefore, if the budget is _further_ adjusted between when `poll_proceed` returns and
/// `RestRestoreOnPending` is dropped, those adjustments are erased unless the caller indicates
/// `RestoreOnPending` is dropped, those adjustments are erased unless the caller indicates
/// that progress was made.
///
/// # Examples
///
/// This example shows a simple countdown latch that uses `poll_proceed` to participate in
/// cooperative scheduling.
///
/// ```
/// use std::future::{Future};
/// use std::pin::Pin;
/// use std::task::{ready, Context, Poll, Waker};
/// use tokio::task::coop;
///
/// struct CountdownLatch<T> {
/// counter: usize,
/// value: Option<T>,
/// waker: Option<Waker>
/// }
///
/// impl<T> CountdownLatch<T> {
/// fn new(value: T, count: usize) -> Self {
/// CountdownLatch {
/// counter: count,
/// value: Some(value),
/// waker: None
/// }
/// }
/// fn count_down(&mut self) {
/// if self.counter <= 0 {
/// return;
/// }
///
/// self.counter -= 1;
/// if self.counter == 0 {
/// if let Some(w) = std::mem::replace(&mut self.waker, None) {
/// w.wake();
/// }
/// }
/// }
/// }
///
/// impl<T> Future for CountdownLatch<T> {
/// type Output = T;
///
/// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
/// // `poll_proceed` checks with the runtime if this task is still allowed to proceed
/// // with performing work.
/// // If not, `Pending` is returned and `ready!` ensures this function returns.
/// // If we are allowed to proceed, coop now represents the budget consumption
/// let coop = ready!(coop::poll_proceed(cx));
///
/// // Next we check if the latch is ready to release its value
/// if self.counter == 0 {
/// let t = std::mem::replace(&mut self.value, None);
/// // The latch made progress so call `made_progress` to ensure the budget
/// // is not reverted.
/// coop.made_progress();
/// Poll::Ready(t.unwrap())
/// } else {
/// // If the latch is not ready so return pending and simply drop `coop`.
/// // This will restore the budget making it available again to perform any
/// // other work.
/// self.waker = Some(cx.waker().clone());
/// Poll::Pending
/// }
/// }
/// }
/// ```
#[inline]
pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
pub fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
context::budget(|cell| {
let mut budget = cell.get();

Expand Down Expand Up @@ -388,7 +463,7 @@ cfg_coop! {
/// the result of the inner one. If polling the inner future is pending, polling this future
/// type will also return a `Poll::Pending`.
#[must_use = "futures do nothing unless polled"]
pub(crate) struct Coop<F: Future> {
pub struct Coop<F: Future> {
#[pin]
pub(crate) fut: F,
}
Expand All @@ -413,7 +488,7 @@ cfg_coop! {
/// If the future exceeds its budget while being polled, control is yielded back to the
/// runtime.
#[inline]
pub(crate) fn cooperative<F: Future>(fut: F) -> Coop<F> {
pub fn cooperative<F: Future>(fut: F) -> Coop<F> {
Coop { fut }
}
}
Expand Down
1 change: 1 addition & 0 deletions tokio/tests/async_send_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ assert_value!(tokio::task::JoinSet<NN>: !Send & !Sync & Unpin);
assert_value!(tokio::task::JoinSet<YN>: Send & Sync & Unpin);
assert_value!(tokio::task::JoinSet<YY>: Send & Sync & Unpin);
assert_value!(tokio::task::LocalSet: !Send & !Sync & Unpin);
assert_value!(tokio::task::coop::RestoreOnPending: Send & !Sync & Unpin);
async_assert_fn!(tokio::sync::Barrier::wait(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::sync::Mutex<NN>::lock(_): !Send & !Sync & !Unpin);
async_assert_fn!(tokio::sync::Mutex<NN>::lock_owned(_): !Send & !Sync & !Unpin);
Expand Down
Loading