
Commit 9adb0f9

Track whether there are any async tasks in a store (#13246)
* Track whether there are any async tasks in a store

This commit adds a new API to `Accessor` to learn whether there are any async tasks that are "interesting" in a store. This is borne out of many discussions over the past few weeks amongst a number of folks, where the basic problem is that for WASIp3 HTTP guests there's currently no way to signal "keep me alive after I send my response", because that is post-return-style work. In WASIp2 this was modeled by setting the outparam that was passed in and then doing work before returning. With WASIp3, however, a response is sent by returning, which means that this is no longer possible.

The general idea is that Wasmtime will now consider all tasks which have not yet exited as "interesting". These tasks in theory mean that the store should be kept alive, as the guest still has work to complete. Note that tasks first return and then exit, and they're still considered interesting after returning but before they exit. In the future it's expected that the component model might have an ABI option to say "this task, by default, isn't interesting" with an opt-in method at runtime of saying "ok wait, but yes this is interesting". In this manner it's expected that not all tasks for all time will be considered interesting, only those that are intentionally opted in to being interesting (and today that just so happens to be all of them).

The API itself on `Accessor` is a `poll_*`-style function which enables both checking whether there are no more tasks remaining and registering a callback to get notified when there are actually no more tasks remaining. In this manner HTTP servers can await this as one of the events that concludes when a guest is finished processing.

The implementation of this API required some refactoring of `concurrent.rs`. Namely, I wanted to have a relatively "narrow waist" through which increments/decrements of this internal counter happened, to ensure that nothing was forgotten.
This is a bit non-trivial because there are a number of situations in which a task is "exited", such as:

* A synchronously lifted function returns.
* An asynchronously lifted function returns `CALLBACK_CODE_EXIT`.
* An asynchronously lifted function is stuck in the `STARTING` state, and then gets cancelled.
* A task is running, is then cancelled, acknowledges the cancellation, and then exits.
* A task with threads exits without actually returning, and then one of its threads returns for it.

For now the logic of "are interesting tasks done" is hooked into `cleanup_thread`. This will likely require adjusting in the future for threads because, in this last bullet, the `task.return` invocation is the point where all conditions are met (main thread exited, task returned), as opposed to when the thread itself exits. For now, though, this is intended to be as close an approximation as possible.

Throughout this implementation I've performed a number of refactorings to reduce duplication, shuffle where methods are implemented, reduce some verbosity, etc. There are debug assertions in place for if this predicate ever goes wrong, which is intended to assist during development.

* Include instances in the graceful shutdown of `wasmtime serve`

Now that their lifetime is extended they need to be included in this calculation.
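The counting scheme described above can be sketched in isolation. The following is a hypothetical, dependency-free model, not Wasmtime's actual internals: `TaskCounter`, `task_entered`, `task_exited`, and `poll_no_tasks` are invented names illustrating the "narrow waist" (all increments/decrements funnel through two methods) and the `poll_*`-style readiness check that wakes a registered waker once the count hits zero.

```rust
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

/// Hypothetical stand-in for the store-internal counter: every task
/// entry/exit funnels through these two methods (the "narrow waist"),
/// and a waker registered by `poll_no_tasks` fires when the count of
/// live tasks reaches zero.
#[derive(Default, Clone)]
pub struct TaskCounter(Arc<Mutex<State>>);

#[derive(Default)]
struct State {
    live: usize,
    waker: Option<Waker>,
}

impl TaskCounter {
    pub fn task_entered(&self) {
        self.0.lock().unwrap().live += 1;
    }

    pub fn task_exited(&self) {
        let mut st = self.0.lock().unwrap();
        st.live -= 1;
        if st.live == 0 {
            if let Some(waker) = st.waker.take() {
                waker.wake();
            }
        }
    }

    /// `poll_*`-style check: `Ready` only once no live tasks remain;
    /// otherwise registers the caller's waker for a later wakeup.
    pub fn poll_no_tasks(&self, cx: &mut Context<'_>) -> Poll<()> {
        let mut st = self.0.lock().unwrap();
        if st.live == 0 {
            Poll::Ready(())
        } else {
            st.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

fn main() {
    let counter = TaskCounter::default();
    let mut cx = Context::from_waker(Waker::noop());
    counter.task_entered();
    assert!(counter.poll_no_tasks(&mut cx).is_pending());
    counter.task_exited();
    assert!(counter.poll_no_tasks(&mut cx).is_ready());
}
```

A server worker would await a future built from `poll_no_tasks` as one event among several; the real API additionally has to handle the return-versus-exit distinction described above.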
1 parent 1551750 commit 9adb0f9

10 files changed

Lines changed: 566 additions & 313 deletions


Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
+use test_programs::p3::wasi::http::types::{ErrorCode, Fields, Request, Response};
+use test_programs::p3::{service, wit_future};
+
+struct T;
+
+service::export!(T);
+
+impl service::exports::wasi::http::handler::Guest for T {
+    async fn handle(_request: Request) -> Result<Response, ErrorCode> {
+        let (body_result_tx, body_result_rx) = wit_future::new(|| Ok(None));
+        let (response, _future_result) = Response::new(Fields::new(), None, body_result_rx);
+        drop(body_result_tx);
+
+        wit_bindgen::spawn(async move {
+            for _ in 0..10 {
+                wit_bindgen::yield_async().await;
+            }
+            println!("please see me");
+        });
+        Ok(response)
+    }
+}
+
+fn main() {
+    unreachable!()
+}
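The guest above sends its response by returning and then relies on spawned work to keep running, which is exactly the "returned but not yet exited" window described in the commit message. A hypothetical, std-only sketch of that lifecycle distinction (all names here are invented for illustration, not Wasmtime's real types):

```rust
/// Hypothetical task lifecycle: a task first *returns* (delivers its
/// result) and only later *exits*; it is still "interesting" (keeps
/// the store alive) in between.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskState {
    Starting,
    Running,
    Returned, // response delivered; spawned work may still be running
    Exited,   // all work done; no longer keeps the store alive
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Event {
    Start,
    Return,
    Cancel,
    Exit,
}

pub fn advance(state: TaskState, event: Event) -> TaskState {
    use {Event::*, TaskState::*};
    match (state, event) {
        (Starting, Start) => Running,
        // Cancelled while stuck in `Starting`: exits immediately.
        (Starting, Cancel) => Exited,
        (Running, Return) => Returned,
        // Cancellation acknowledged, or exit without returning.
        (Running, Cancel | Exit) => Exited,
        (Returned, Exit) => Exited,
        // Any other combination leaves the state unchanged.
        (state, _) => state,
    }
}

pub fn is_interesting(state: TaskState) -> bool {
    state != TaskState::Exited
}

fn main() {
    // The guest above: returns its response, then spawned work runs on.
    let state = advance(TaskState::Running, Event::Return);
    assert!(is_interesting(state)); // returned, but not yet exited
    assert!(!is_interesting(advance(state, Event::Exit)));
}
```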

crates/wasi-http/src/handler.rs

Lines changed: 149 additions & 84 deletions
@@ -3,7 +3,7 @@
 
 #[cfg(feature = "p3")]
 use crate::p3;
-use futures::stream::{FuturesUnordered, StreamExt};
+use futures::stream::{FuturesUnordered, Stream};
 use std::collections::VecDeque;
 use std::collections::btree_map::{BTreeMap, Entry};
 use std::future;
@@ -350,101 +350,166 @@ where
             accept_task(task, &mut futures, &mut reuse_count);
         }
 
+        // This is the main driver loop for this worker. This is modeled as
+        // a `poll_fn` which internally loops around the possible events.
+        // Events are sourced from the locals here, pinned outside of the
+        // `poll_fn` closure.
+        let mut futures = pin!(futures);
+        let mut idle_timeout_set = false;
+        let mut idle_timeout = pin!(tokio::time::sleep(Duration::MAX));
         let handler = self.handler.clone();
-        while !(futures.is_empty() && reuse_count >= max_instance_reuse_count) {
-            let new_task = {
-                let future_count = futures.len();
-                let mut next_future = pin!(async {
-                    if futures.is_empty() {
-                        future::pending().await
-                    } else {
-                        futures.next().await.unwrap()
-                    }
-                });
-                let mut next_task = pin!(tokio::time::timeout(
-                    if future_count == 0 {
-                        idle_instance_timeout
-                    } else {
-                        Duration::MAX
-                    },
-                    handler.0.task_queue.pop()
-                ));
-                // Poll any existing tasks, and if they're all `Pending`
-                // _and_ we haven't reached any reuse limits yet, poll for a
-                // new task from the queue.
-                //
-                // Note the the order of operations here is important. By
-                // polling `next_future` first, we'll discover any tasks that
-                // may have timed out, at which point we'll stop accepting
-                // new tasks altogether (see below for details). This is
-                // especially important in the case where the task was
-                // blocked on a synchronous call to a host function which
-                // has exclusive access to the `Store`; once that call
-                // finishes, the first think we need to do is time out the
-                // task. If we were to poll for a new task first, then we'd
-                // have to wait for _that_ task to finish or time out before
-                // we could kill the instance.
-                future::poll_fn(|cx| match next_future.as_mut().poll(cx) {
-                    Poll::Pending => {
-                        // Note that `Pending` here doesn't necessarily mean
-                        // all tasks are blocked on I/O. They might simply
-                        // be waiting for some deferred work to be done by
-                        // the next turn of the
-                        // `StoreContextMut::run_concurrent` event loop.
-                        // Therefore, we check `accept_concurrent` here and
-                        // only advertise we have capacity for another task
-                        // if either we have no tasks at all or all our
-                        // tasks really are blocked on I/O.
-                        self.set_available(
-                            reuse_count < max_instance_reuse_count
-                                && future_count < max_instance_concurrent_reuse_count
-                                && (future_count == 0 || accept_concurrent.load(Relaxed)),
-                        );
-
-                        if self.available {
-                            next_task.as_mut().poll(cx).map(Some)
-                        } else {
-                            Poll::Pending
-                        }
-                    }
-                    Poll::Ready(Ok(start_time)) => {
-                        // Task completed; carry on!
+        let mut incoming_tasks = pin!(futures::stream::unfold(
+            &handler.0.task_queue,
+            |queue| async move {
+                let task = queue.pop().await;
+                Some((task, queue))
+            }
+        ));
+        future::poll_fn(|cx| {
+            // See docs about the idle timeout handling at the very bottom
+            // for what this is doing.
+            let prev_idle_timeout_set = idle_timeout_set;
+            idle_timeout_set = false;
+
+            loop {
+                // First, and crucially first, poll `futures`. This
+                // way we'll discover any tasks that may have timed out, at
+                // which point we'll stop accepting new tasks altogether
+                // (see below for details). This is especially important in
+                // the case where the task was blocked on a synchronous call
+                // to a host function which has exclusive access to the
+                // `Store`; once that call finishes, the first thing we need
+                // to do is time out the task. If we were to poll for a new
+                // task first, then we'd have to wait for _that_ task to
+                // finish or time out before we could kill the instance.
+                match futures.as_mut().poll_next(cx) {
+                    // Task completed; carry on!
+                    Poll::Ready(Some(Ok(start_time))) => {
                         if let Some(start_time) = start_time {
                             task_start_times.lock().unwrap().remove(start_time);
                         }
-                        Poll::Ready(None)
                     }
-                    Poll::Ready(Err(_)) => {
-                        // Task timed out; stop accepting new tasks, but
-                        // continue polling until any other, in-progress
-                        // tasks until they have either finished or timed
-                        // out. This effectively kicks off a "graceful
-                        // shutdown" of the worker, allowing any other
-                        // concurrent tasks time to finish before we drop
-                        // the instance.
-                        //
-                        // TODO: We should also send a cancel request to the
-                        // timed-out task to give it a chance to shut down
-                        // gracefully (and delay dropping the instance for a
-                        // reasonable amount of time), but as of this
-                        // writing Wasmtime does not yet provide an API for
-                        // doing that. See issue #11833.
+
+                    // Task timed out; stop accepting new tasks, but
+                    // continue polling any other, in-progress tasks
+                    // until they have either finished or timed out. This
+                    // effectively kicks off a "graceful shutdown" of the
+                    // worker, allowing any other concurrent tasks time to
+                    // finish before we drop the instance.
+                    //
+                    // TODO: We should also send a cancel request to the
+                    // timed-out task to give it a chance to shut down
+                    // gracefully (and delay dropping the instance for a
+                    // reasonable amount of time), but as of this writing
+                    // Wasmtime does not yet provide an API for doing that.
+                    // See issue #11833.
+                    Poll::Ready(Some(Err(_))) => {
                         timed_out = true;
                         reuse_count = max_instance_reuse_count;
-                        Poll::Ready(None)
                     }
-                })
-                .await
-            };
 
-            match new_task {
-                Some(Ok(task)) => {
+                    Poll::Ready(None) | Poll::Pending => {}
+                }
+
+                // At this point `futures` is either empty or it's `Pending`
+                // meaning nothing is ready. Note that `Pending` here
+                // doesn't necessarily mean all tasks are blocked on I/O.
+                // They might simply be waiting for some deferred work to be
+                // done by the next turn of the
+                // `StoreContextMut::run_concurrent` event loop. Therefore,
+                // we check `accept_concurrent` here and only advertise we
+                // have capacity for another task if either we have no tasks
+                // at all or all our tasks really are blocked on I/O.
+                self.set_available(
+                    reuse_count < max_instance_reuse_count
+                        && futures.len() < max_instance_concurrent_reuse_count
+                        && (futures.is_empty() || accept_concurrent.load(Relaxed)),
+                );
+
+                // If we're available for accepting more requests after the
+                // deduction above, then try to accept a new task. If that's
+                // successful then push it into `futures` and turn this loop
+                // again to see where we're at next time around.
+                if self.available
+                    && let Poll::Ready(Some(task)) = incoming_tasks.as_mut().poll_next(cx)
+                {
                     accept_task(task, &mut futures, &mut reuse_count);
+                    continue;
+                }
+
+                // If, at this point, we still have some requests that are
+                // being processed then go ahead and bail out of this
+                // singular call to `poll` by saying we're not ready yet.
+                // This means we unconditionally wait for events within
+                // `futures` and we're also registered, optionally, for
+                // listening for incoming connections. That's all the events
+                // we're interested in, so this iteration of `poll` is complete.
+                if !futures.is_empty() {
+                    break Poll::Pending;
                 }
-                Some(Err(_)) => break,
-                None => {}
+
+                // At this point `futures` is empty, and we haven't gotten
+                // any incoming tasks. Check the store we're using to see if
+                // there are any "interesting" tasks around. These are tasks
+                // which act as effectively strong references to this worker
+                // to keep it running. If there are still interesting tasks,
+                // then we're done with this iteration of `poll`. We'll get
+                // woken up when anything changes, but otherwise it's time
+                // to let something else happen.
+                //
+                // This is all skipped if something has timed out though. In
+                // that situation we're basically no longer interested in
+                // this store so we're no longer cooperatively trying to let
+                // it keep going.
+                if !timed_out && !accessor.poll_no_interesting_tasks(cx).is_ready() {
+                    break Poll::Pending;
+                }
+
+                // And now at this point we (a) have no `futures`, (b) no
+                // new connections came in, and (c) the store is completely
+                // devoid of interesting work. In this situation if we're
+                // not actually capable of accepting any more work, then
+                // we're completely done and it's time to exit this worker.
+                if !self.available {
+                    break Poll::Ready(());
+                }
+
+                // And now, finally, we wait for a timeout. Here we're just
+                // like above except that we're a candidate for accepting more
+                // work in the future. If this is our first time here then
+                // reset the idle timeout to `idle_instance_timeout` from
+                // now, but otherwise just go take a look at `idle_timeout`
+                // and see if it's elapsed yet.
+                //
+                // Note that the way that this entire loop is structured is
+                // that we've already polled all the interesting sources of
+                // events we're interested in at this point, for example
+                // `futures`, `accessor`, and `incoming_tasks`. Here we add
+                // `idle_timeout` to that set and once anything is ready and
+                // fires then this entire loop will restart and we'll check
+                // everything again.
+                //
+                // Also note that the idle timeout is supposed to start when
+                // the store is itself entirely idle. The way this loop is
+                // structured is that when we enter this `poll` closure the
+                // `idle_timeout_set` variable is unconditionally set to
+                // `false`. That way if we exit out for some other reason,
+                // such as getting work, then the idle timeout will get
+                // reset next time we fall down here. Otherwise though if we
+                // fell down this far we actually want to preserve
+                // `idle_timeout_set` from when we first started, so that's
+                // restored here.
+                idle_timeout_set = prev_idle_timeout_set;
+                if !idle_timeout_set {
+                    idle_timeout
+                        .as_mut()
+                        .reset(tokio::time::Instant::now() + idle_instance_timeout);
+                    idle_timeout_set = true;
+                }
+                break idle_timeout.as_mut().poll(cx);
             }
-        }
+        })
+        .await;
 
         accessor.with(|mut access| write_profile(access.as_context_mut()));

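The new driver's shape — a single `poll_fn` whose body loops over every event source, restarting whenever any source makes progress, and returning `Pending` only once everything has been polled — can be illustrated with a tiny dependency-free analogue. Everything here (`drive`, `block_on`, the queue of numbers standing in for incoming tasks) is invented for illustration and is not the handler's real code:

```rust
use std::future::{Future, poll_fn};
use std::pin::pin;
use std::task::{Context, Poll, Waker};

/// Minimal executor: repeatedly poll with a no-op waker. Fine here
/// because the sketch future never actually waits on anything.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let mut cx = Context::from_waker(Waker::noop());
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Sketch of the driver loop's shape: inside one `poll_fn`, loop over
/// the event sources, `continue` whenever a source made progress so
/// everything is re-checked, and break out only when nothing is ready.
async fn drive(mut incoming: Vec<u32>, limit: usize) -> Vec<u32> {
    let mut accepted = Vec::new();
    poll_fn(|_cx| {
        loop {
            // Accept new work while we have capacity, restarting the
            // loop after each acceptance (mirrors `continue` above).
            if accepted.len() < limit {
                if let Some(task) = incoming.pop() {
                    accepted.push(task);
                    continue;
                }
            }
            // No source is ready. The real worker breaks with
            // `Poll::Pending` and relies on registered wakers; this
            // toy version just finishes instead.
            break Poll::Ready(());
        }
    })
    .await;
    accepted
}

fn main() {
    // Capacity 2, so only the last two queued tasks are accepted.
    assert_eq!(block_on(drive(vec![1, 2, 3], 2)), vec![3, 2]);
}
```

The key design property this preserves is that by the time the closure returns `Pending`, every source (completed tasks, incoming tasks, the quiescence check, the idle timeout) has been polled in this very call, so a waker is registered for each and no wakeup can be missed.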