Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 60 additions & 12 deletions tokio/src/task/coop/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,16 @@ cfg_coop! {
use std::pin::Pin;
use std::task::{ready, Context, Poll};

/// Value returned by the [`poll_proceed`](poll_proceed) method.
#[derive(Debug)]
#[must_use]
pub(crate) struct RestoreOnPending(Cell<Budget>);
pub struct RestoreOnPending(Cell<Budget>);

impl RestoreOnPending {
pub(crate) fn made_progress(&self) {
/// Signals that the task that obtained this `RestoreOnPending` was able to make
/// progress. This prevents the task budget from being restored to the value
/// it had prior to obtaining this instance when it is dropped.
pub fn made_progress(&self) {
self.0.set(Budget::unconstrained());
}
}
Expand All @@ -275,20 +280,63 @@ cfg_coop! {
}
}

/// Returns `Poll::Pending` if the current task has exceeded its budget and should yield.
/// Decrements the task budget and returns `Poll::Pending` if the budget is depleted.
/// This indicates that the task should yield to the scheduler. Otherwise, returns
/// `RestoreOnPending` which can be used to commit the budget consumption.
///
/// When you call this method, the current budget is decremented. However, to ensure that
/// progress is made every time a task is polled, the budget is automatically restored to its
/// former value if the returned `RestoreOnPending` is dropped. It is the caller's
/// responsibility to call `RestoreOnPending::made_progress` if it made progress, to ensure
/// that the budget empties appropriately.
/// The returned [`RestoreOnPending`](RestoreOnPending) will revert the budget to its former
/// value when dropped unless [`RestoreOnPending::made_progress`](RestoreOnPending::made_progress)
/// is called. It is the caller's responsibility to do so when it _was_ able to
/// make progress after the call to `poll_pending`.
/// Restoring the budget automatically ensures the task can try to make progress in some other
/// way.
///
/// Note that `RestoreOnPending` restores the budget **as it was before `poll_proceed`**.
/// Therefore, if the budget is _further_ adjusted between when `poll_proceed` returns and
/// `RestRestoreOnPending` is dropped, those adjustments are erased unless the caller indicates
/// `RestoreOnPending` is dropped, those adjustments are erased unless the caller indicates
/// that progress was made.
///
/// # Examples
///
/// This example shows a `Future` implementation that uses `poll_proceed` to participate in
/// cooperative scheduling. If the work to be done consists of polling another future, consider
/// using the higher-level [`cooperative`](cooperative) function instead.
///
/// ```ignore
/// use std::future::Future;
/// use std::pin::Pin;
/// use std::task::{ready, Context, Poll};
///
/// impl<T> Future for CooperativeFuture<T> {
/// type Output = T;
///
/// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
/// // `poll_proceed` checks with the runtime if this task is still allowed to proceed
/// // with performing work.
/// // If not, `Pending` is returned and `ready!` ensure this
/// // function returns.
/// // If we are allowed to proceed, coop now represents the budget consumption
/// let coop = ready!(tokio::task::coop::poll_proceed(cx));
///
/// // Here we try to do some work
/// // If for some reason that's not possible and we exit the function with `Pending`
/// // the drop of `coop` will ensure the consumed budget is restored freeing it up
/// // for use elsewhere.
/// let ret : Poll<T> = ...;
///
/// // If the value we will return is `Ready(_)` the task did perform some actual work
/// // and the task budget consumption needs to be committed. We do that by calling
/// // `made_progress`. It is essential that this call is made because otherwise the
/// // the budget will be restored and the task may keep running indefinitely.
/// if ret.is_ready() {
/// coop.made_progress();
/// }
/// ret
/// }
/// }
/// ```
#[inline]
pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
pub fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
context::budget(|cell| {
let mut budget = cell.get();

Expand Down Expand Up @@ -388,7 +436,7 @@ cfg_coop! {
/// the result of the inner one. If polling the inner future is pending, polling this future
/// type will also return a `Poll::Pending`.
#[must_use = "futures do nothing unless polled"]
pub(crate) struct Coop<F: Future> {
pub struct Coop<F: Future> {
#[pin]
pub(crate) fut: F,
}
Expand All @@ -413,7 +461,7 @@ cfg_coop! {
/// If the future exceeds its budget while being polled, control is yielded back to the
/// runtime.
#[inline]
pub(crate) fn cooperative<F: Future>(fut: F) -> Coop<F> {
pub fn cooperative<F: Future>(fut: F) -> Coop<F> {
Coop { fut }
}
}
Expand Down