diff --git a/benches/call.rs b/benches/call.rs index c2028ad7c5f1..95d9a3379746 100644 --- a/benches/call.rs +++ b/benches/call.rs @@ -134,8 +134,8 @@ fn bench_host_to_wasm( typed_params: Params, typed_results: Results, ) where - Params: WasmParams + ToVals + Copy, - Results: WasmResults + ToVals + Copy + PartialEq + Debug, + Params: WasmParams + ToVals + Copy + Sync, + Results: WasmResults + ToVals + Copy + Sync + PartialEq + Debug + 'static, { // Benchmark the "typed" version, which should be faster than the versions // below. @@ -628,7 +628,8 @@ mod component { + PartialEq + Debug + Send - + Sync, + + Sync + + 'static, { // Benchmark the "typed" version. c.bench_function(&format!("component - host-to-wasm - typed - {name}"), |b| { diff --git a/crates/wasmtime/src/runtime.rs b/crates/wasmtime/src/runtime.rs index 230178ce11b9..bc54da96cb67 100644 --- a/crates/wasmtime/src/runtime.rs +++ b/crates/wasmtime/src/runtime.rs @@ -34,6 +34,8 @@ pub(crate) mod code_memory; #[cfg(feature = "debug-builtins")] pub(crate) mod debug; pub(crate) mod externals; +#[cfg(feature = "async")] +pub(crate) mod fiber; pub(crate) mod gc; pub(crate) mod instance; pub(crate) mod instantiate; diff --git a/crates/wasmtime/src/runtime/component/func.rs b/crates/wasmtime/src/runtime/component/func.rs index 12ce0b10d57d..2d021369216d 100644 --- a/crates/wasmtime/src/runtime/component/func.rs +++ b/crates/wasmtime/src/runtime/component/func.rs @@ -547,7 +547,7 @@ impl Func { let mut store = store.as_context_mut(); assert!( store.0.async_support(), - "cannot use `call_async` without enabling async support in the config" + "cannot use `post_return_async` without enabling async support in the config" ); // Future optimization opportunity: conditionally use a fiber here since // some func's post_return will not need the async context (i.e. end up diff --git a/crates/wasmtime/src/runtime/component/func/typed.rs b/crates/wasmtime/src/runtime/component/func/typed.rs index c378742da236..f87447af7a0e 100644 --- a/crates/wasmtime/src/runtime/component/func/typed.rs +++ b/crates/wasmtime/src/runtime/component/func/typed.rs @@ -179,7 +179,7 @@ where ) -> Result where Params: Send + Sync, - Return: Send + Sync, + Return: Send + Sync + 'static, { let mut store = store.as_context_mut(); assert!( diff --git a/crates/wasmtime/src/runtime/component/linker.rs b/crates/wasmtime/src/runtime/component/linker.rs index 8d32f35ee8d5..30c5fe6b9356 100644 --- a/crates/wasmtime/src/runtime/component/linker.rs +++ b/crates/wasmtime/src/runtime/component/linker.rs @@ -441,10 +441,8 @@ impl LinkerInstance<'_, T> { self.engine.config().async_support, "cannot use `func_wrap_async` without enabling async support in the config" ); - let ff = move |mut store: StoreContextMut<'_, T>, params: Params| -> Result { - let async_cx = store.as_context_mut().0.async_cx().expect("async cx"); - let future = f(store.as_context_mut(), params); - unsafe { async_cx.block_on(Pin::from(future)) }? + let ff = move |store: StoreContextMut<'_, T>, params: Params| -> Result { + store.block_on(|store| f(store, params).into())? }; self.func_wrap(name, ff) } @@ -603,12 +601,10 @@ impl LinkerInstance<'_, T> { self.engine.config().async_support, "cannot use `func_new_async` without enabling async support in the config" ); - let ff = move |mut store: StoreContextMut<'_, T>, params: &[Val], results: &mut [Val]| { - let async_cx = store.as_context_mut().0.async_cx().expect("async cx"); - let future = f(store.as_context_mut(), params, results); - unsafe { async_cx.block_on(Pin::from(future)) }? + let ff = move |store: StoreContextMut<'_, T>, params: &[Val], results: &mut [Val]| { + store.with_blocking(|store, cx| cx.block_on(Pin::from(f(store, params, results)))?) }; - self.func_new(name, ff) + return self.func_new(name, ff); } /// Defines a [`Module`] within this instance. @@ -676,12 +672,8 @@ impl LinkerInstance<'_, T> { let dtor = Arc::new(crate::func::HostFunc::wrap_inner( &self.engine, move |mut cx: crate::Caller<'_, T>, (param,): (u32,)| { - let async_cx = cx.as_context_mut().0.async_cx().expect("async cx"); - let future = dtor(cx.as_context_mut(), param); - match unsafe { async_cx.block_on(Pin::from(future)) } { - Ok(Ok(())) => Ok(()), - Ok(Err(trap)) | Err(trap) => Err(trap), - } + cx.as_context_mut() + .block_on(|store| dtor(store, param).into())? }, )); self.insert(name, Definition::Resource(ty, dtor))?; diff --git a/crates/wasmtime/src/runtime/component/store.rs b/crates/wasmtime/src/runtime/component/store.rs index f33530051924..7889334f11bd 100644 --- a/crates/wasmtime/src/runtime/component/store.rs +++ b/crates/wasmtime/src/runtime/component/store.rs @@ -28,6 +28,13 @@ impl ComponentStoreData { pub fn next_component_instance_id(&self) -> ComponentInstanceId { self.instances.next_key() } + + #[cfg(feature = "component-model-async")] + pub(crate) fn drop_fibers(store: &mut StoreOpaque) { + _ = store; + // This function will actually do something when runtime support for + // `component-model-async` is merged. + } } impl StoreData { diff --git a/crates/wasmtime/src/runtime/fiber.rs b/crates/wasmtime/src/runtime/fiber.rs new file mode 100644 index 000000000000..11d0c41e4173 --- /dev/null +++ b/crates/wasmtime/src/runtime/fiber.rs @@ -0,0 +1,972 @@ +#![deny(unsafe_op_in_unsafe_fn)] + +use crate::prelude::*; +use crate::store::{Executor, StoreId, StoreInner, StoreOpaque}; +use crate::vm::mpk::{self, ProtectionMask}; +use crate::vm::{AsyncWasmCallState, VMStore}; +use crate::{Engine, StoreContextMut}; +use anyhow::{Result, anyhow}; +use core::mem; +use core::ops::Range; +use core::pin::Pin; +use core::ptr::{self, NonNull}; +use core::task::{Context, Poll}; +use wasmtime_fiber::{Fiber, Suspend}; + +type WasmtimeResume = Result>>; +type WasmtimeYield = StoreFiberYield; +type WasmtimeComplete = Result<()>; +type WasmtimeSuspend = Suspend; + +/// State related to asynchronous computations stored within a `Store`. +/// +/// This structure resides inside of a `Store` and is used to manage the +/// various pieces of state associated with asynchronous computations. Chiefly +/// this manages the `WasmtimeSuspend` pointer as well as `&mut Context<'_>` +/// when polling futures. This serves as storage to use these pointers across a +/// WebAssembly function boundary, for example, where the values cannot +/// otherwise be explicitly threaded through. +pub(crate) struct AsyncState { + /// The `Suspend` for the current fiber (or null if no such fiber is + /// running). + /// + /// This pointer is provided by the `wasmtime_fiber` crate when a fiber + /// first starts, but this pointer is unable to be carried through + /// WebAssembly frames for example. This serves as an alternative storage + /// location for the pointer provided by `wasmtime_fiber` within a fiber's + /// execution. + /// + /// This pointer is null when a fiber is not executing, but it is also null + /// when a `BlockingContext` is created. Note that when a fiber is suspended + /// it's always through a `BlockingContext` so this field is null whenever a + /// fiber is suspended as well. Fiber resumption will save the prior value + /// in a store and then set it to null, where suspension will then restore + /// what was previously in the store. + current_suspend: Option>, + + /// The `Context` pointer last provided in `Future for FiberFuture`. + /// + /// Like `current_suspend` above this is an example of a piece of context + /// which needs to be carried over a WebAssembly function frame which + /// otherwise doesn't take this as a parameter. This differs from + /// `current_suspend` though in that it is provided as part of a `Future` + /// poll operation but is "gone" after that poll operation completes. That + /// means that while `current_suspend` is the same for the lifetime of a + /// future this field is always changing. + /// + /// Like `current_suspend` though this is null either when a fiber isn't + /// running or when a `BlockingContext` is created (in which case this is + /// "take"en). That means that this is null on suspension/resumption of a + /// fiber. + /// + /// The value for this pointer is threaded directly through the + /// `WasmtimeResume` type which is how a pointer flows into this field from + /// a future-related poll call. This means that the `BlockingContext` + /// creation may take one value of a pointer here but restore another. That + /// would represent suspending in one call to `Future::poll` and then + /// resuming some time later in a different call to `Future::poll`. + /// + /// # Safety + /// + /// Note that this is a pretty unsafe field for two reasons. One is that + /// it's a raw pointer to a `Context` provided ephemerally to some call to + /// `Future::poll` on the stack. Another reason is that the lifetime + /// parameter of `Context` is unsafely changed to `'static` here which is + /// not correct. The ephemeral nature of this pointer is managed through the + /// take-style operations in `BlockingContext` and the `'static` lifetime is + /// handled by ensuring the signatures that work with `BlockingContext` all + /// use constrained anonymous lifetimes that are guaranteed to be shorter + /// than the original `Context` lifetime. + current_future_cx: Option>>, + + /// The last fiber stack that was in use by the store. + /// + /// We use this to cache and reuse stacks as a performance optimization. + // TODO: With stack switching and the Component Model Async ABI, there may + // be multiple concurrent fibers in play; consider caching more than one + // stack at a time and making the number tunable via `Config`. + last_fiber_stack: Option, +} + +// SAFETY: it's known that `std::task::Context` is neither `Send` nor `Sync`, +// but despite this the storage here is purely temporary in getting these +// pointers across function frames. The actual types are not sent across threads +// as when a store isn't polling anything the pointer values are all set to +// `None`. Thus if a store is being sent across threads that's done because no +// fibers are active, and once fibers are active everything will stick within +// the same thread. +unsafe impl Send for AsyncState {} +unsafe impl Sync for AsyncState {} + +impl Default for AsyncState { + fn default() -> Self { + Self { + current_suspend: None, + current_future_cx: None, + last_fiber_stack: None, + } + } +} + +impl AsyncState { + pub(crate) fn last_fiber_stack(&mut self) -> &mut Option { + &mut self.last_fiber_stack + } +} + +trait AsStoreOpaque { + fn as_store_opaque(&mut self) -> &mut StoreOpaque; +} + +impl AsStoreOpaque for StoreOpaque { + fn as_store_opaque(&mut self) -> &mut StoreOpaque { + self + } +} + +impl AsStoreOpaque for StoreInner { + fn as_store_opaque(&mut self) -> &mut StoreOpaque { + self + } +} + +/// A helper structure used to block a fiber. +/// +/// This is acquired via either `StoreContextMut::with_blocking` or +/// `StoreOpaque::with_blocking`. This structure represents the "taken" state of +/// pointers from a store's `AsyncState`, then modeling them as safe pointers. +/// +/// Note that the lifetimes here are carefully controlled in instances of this +/// structure through the construction of the `with` function. +pub(crate) struct BlockingContext<'a, 'b> { + /// Pointer to `wasmtime_fiber::Suspend` which was supplied when a fiber + /// first started. + /// + /// When a `BlockingContext` is first created this pointer is "taken" from + /// the store (the store is null'd out) and then the raw pointer previously + /// in the store is unsafely transformed to this safe pointer. This + /// represents how a `BlockingContext` temporarily has access to this + /// suspend but when the `BlockingContext` goes away this'll make its way + /// back into the store. + suspend: &'a mut WasmtimeSuspend, + + /// Pointer to the future `Context` that this fiber is being polled with. + /// + /// Similar to `suspend` above this is taken from a store when a + /// `BlockingContext` is created and it's restored when the + /// `BlockingContext` goes away. Note though that unlike `suspend`, as + /// alluded to in the documentation on `AsyncState`, this value changes over + /// time as calls to poll are made. This field becomes `None` during a + /// suspension because that means that the context is released and no longer + /// available. Upon resumption the context here is *optionally* provided. + /// Cancellation is a case where it isn't passed back and a re-poll is a + /// case where it's passed back. + future_cx: Option<&'a mut Context<'b>>, +} + +impl<'a, 'b> BlockingContext<'a, 'b> { + /// Method to go from a `store` provided (which internally contains a + /// `StoreOpaque`) to a `BlockingContext`. + /// + /// This function will "take" context from `store`'s `AsyncState` field. It + /// will then construct a `BlockingContext` and yield it to the closure `f` + /// provided. The closure can then block on futures, suspend, etc. + /// + /// Upon return of the closure `f` the state from `BlockingContext` is + /// restored within the store. The return value of `f` is the return value + /// of this function. + /// + /// Note that the `store` must be provided to this function as an argument + /// to originally acquire state from `AsyncState`. This store is then + /// supplied back to the closure `f` provided here so the store can be used + /// to construct an asynchronous or blocking computation which the + /// `BlockingContext` tries to block on. + /// + /// # Safety + /// + /// This method is safe to call at any time, but it's worth noting that the + /// safety of this function relies on the signature of this function. + /// Notably the lifetime parameters of `BlockingContext` in the `f` closure + /// here must be anonymous. That ensures that the `BlockingContext` that + /// callers get access to cannot be persisted outside of that closure call + /// and everything is scoped to just the closure `f` provided with nothing + /// escaping. + fn with(store: &mut S, f: impl FnOnce(&mut S, &mut BlockingContext<'_, '_>) -> R) -> R + where + S: AsStoreOpaque, + { + let opaque = store.as_store_opaque(); + + let state = opaque.fiber_async_state_mut(); + + // SAFETY: this is taking pointers from `AsyncState` and then unsafely + // turning them into safe references. Lifetime-wise this should be safe + // because the inferred lifetimes for all these pointers is constrained + // by the signature of `f` provided here. That ensures that everything + // is scoped purely to the closure `f` and nothing should be persisted + // outside of this function call. This, for example, ensures that the + // `Context<'static>` doesn't leak out, it's only with an anonymous + // lifetime that's forcibly shorter. + // + // Provenance-wise this should be safe as if these fields in the store + // are non-null then the pointers are provided up-the-stack on this + // fiber and for this fiber. The "take" pattern here ensures that if + // this `BlockingContext` context acquires the pointers then there are + // no other instances of these pointers in use anywhere else. + let future_cx = unsafe { Some(state.current_future_cx.take().unwrap().as_mut()) }; + let suspend = unsafe { state.current_suspend.take().unwrap().as_mut() }; + + let mut reset = ResetBlockingContext { + store, + cx: BlockingContext { future_cx, suspend }, + }; + return f(&mut reset.store, &mut reset.cx); + + struct ResetBlockingContext<'a, 'b, S: AsStoreOpaque> { + store: &'a mut S, + cx: BlockingContext<'a, 'b>, + } + + impl Drop for ResetBlockingContext<'_, '_, S> { + fn drop(&mut self) { + let store = self.store.as_store_opaque(); + let state = store.fiber_async_state_mut(); + + debug_assert!(state.current_future_cx.is_none()); + debug_assert!(state.current_suspend.is_none()); + state.current_suspend = Some(NonNull::from(&mut *self.cx.suspend)); + + if let Some(cx) = &mut self.cx.future_cx { + // SAFETY: while this is changing the lifetime to `'static` + // it should never be used while it's `'static` given this + // `BlockingContext` abstraction. + state.current_future_cx = + Some(NonNull::from(unsafe { change_context_lifetime(cx) })); + } + } + } + } + + /// Blocks on the asynchronous computation represented by `future` and + /// produces the result here, in-line. + /// + /// This function is designed to only work when it's currently executing on + /// a native fiber. This fiber provides the ability for us to handle the + /// future's `Pending` state as "jump back to whomever called the fiber in + /// an asynchronous fashion and propagate `Pending`". This tight coupling + /// with `on_fiber` below is what powers the asynchronicity of calling wasm. + /// + /// This function takes a `future` and will (appear to) synchronously wait + /// on the result. While this function is executing it will fiber switch + /// to-and-from the original frame calling `on_fiber` which should be a + /// guarantee due to how async stores are configured. + /// + /// The return value here is either the output of the future `T`, or a trap + /// which represents that the asynchronous computation was cancelled. It is + /// not recommended to catch the trap and try to keep executing wasm, so + /// we've tried to liberally document this. + /// + /// Note that this function suspends (if needed) with + /// `StoreFiberYield::KeepStore`, indicating that the store must not be used + /// (and that no other fibers may be resumed) until this fiber resumes. + /// Therefore, it is not appropriate for use in e.g. guest calls to + /// async-lowered imports implemented as host functions, since it will + /// prevent any other tasks from being run. Use `Instance::suspend` to + /// suspend and release the store to allow other tasks to run before this + /// fiber is resumed. + /// + /// # Return Value + /// + /// A return value of `Ok(value)` means that the future completed with + /// `value`. A return value of `Err(e)` means that the fiber and its future + /// have been cancelled and the fiber needs to exit and complete ASAP. + /// + /// # Safety + /// + /// This function is safe to call at any time but relies on a trait bound + /// that is manually placed here the compiler does not otherwise require. + /// Notably the `Send` bound on the future provided here is not required + /// insofar as things compile without that. The purpose of this, however, is + /// to make the `unsafe impl Send for StoreFiber` more safe. The `future` + /// here is state that is stored on the stack during the suspension of this + /// fiber and is otherwise not visible to the compiler. By having a `Send` + /// bound here it ensures that the future doesn't have things like `Rc` or + /// similar pointing into thread locals which would not be sound if this + /// fiber crosses threads. + pub(crate) fn block_on(&mut self, future: F) -> Result + where + F: Future + Send, + { + let mut future = core::pin::pin!(future); + loop { + match future.as_mut().poll(self.future_cx.as_mut().unwrap()) { + Poll::Ready(v) => break Ok(v), + Poll::Pending => self.suspend(StoreFiberYield::KeepStore)?, + } + } + } + + /// Suspend this fiber with `yield_` as the reason. + /// + /// This function will suspend the current fiber and only return after the + /// fiber has resumed. This function return `Ok(())` if the fiber was + /// resumed to be completed, and `Err(e)` indicates that the fiber has been + /// cancelled and needs to exit/complete ASAP. + pub(crate) fn suspend(&mut self, yield_: StoreFiberYield) -> Result<()> { + // Over a suspension point we're guaranteed that the `Context` provided + // here is no longer valid, so discard it. If we're supposed to be able + // to poll afterwards this will be given back as part of the resume + // value given back. + self.future_cx.take(); + + let mut new_future_cx: NonNull> = self.suspend.suspend(yield_)?; + + // SAFETY: this function is unsafe as we're doing "funky" things to the + // `new_future_cx` we have been given. The safety here relies on the + // fact that the lifetimes of `BlockingContext` are all "smaller" than + // the original `Context` itself, and that should be guaranteed through + // the exclusive constructor of this type `BlockingContext::with`. + unsafe { + self.future_cx = Some(change_context_lifetime(new_future_cx.as_mut())); + } + Ok(()) + } +} + +impl StoreContextMut<'_, T> { + /// Blocks on the future computed by `f`. + /// + /// # Panics + /// + /// Panics if this is invoked outside the context of a fiber. + pub(crate) fn block_on( + self, + f: impl FnOnce(StoreContextMut<'_, T>) -> Pin + Send + '_>>, + ) -> Result { + BlockingContext::with(self.0, |store, cx| { + cx.block_on(f(StoreContextMut(store)).as_mut()) + }) + } + + /// Creates a `BlockingContext` suitable for blocking on futures or + /// suspending the current fiber. + /// + /// # Panics + /// + /// Panics if this is invoked outside the context of a fiber. + pub(crate) fn with_blocking( + self, + f: impl FnOnce(StoreContextMut<'_, T>, &mut BlockingContext<'_, '_>) -> R, + ) -> R { + BlockingContext::with(self.0, |store, cx| f(StoreContextMut(store), cx)) + } +} + +impl crate::store::StoreInner { + /// Blocks on the future computed by `f`. + /// + /// # Panics + /// + /// Panics if this is invoked outside the context of a fiber. + pub(crate) fn block_on( + &mut self, + f: impl FnOnce(StoreContextMut<'_, T>) -> Pin + Send + '_>>, + ) -> Result { + BlockingContext::with(self, |store, cx| { + cx.block_on(f(StoreContextMut(store)).as_mut()) + }) + } +} + +impl StoreOpaque { + /// Blocks on the future computed by `f`. + /// + /// # Panics + /// + /// Panics if this is invoked outside the context of a fiber. + pub(crate) fn block_on( + &mut self, + f: impl FnOnce(&mut Self) -> Pin + Send + '_>>, + ) -> Result { + BlockingContext::with(self, |store, cx| cx.block_on(f(store).as_mut())) + } + + /// Creates a `BlockingContext` suitable for blocking on futures or + /// suspending the current fiber. + /// + /// # Panics + /// + /// Panics if this is invoked outside the context of a fiber. + #[expect(dead_code, reason = "will be used by async things soon")] + pub(crate) fn with_blocking( + &mut self, + f: impl FnOnce(&mut Self, &mut BlockingContext<'_, '_>) -> R, + ) -> R { + BlockingContext::with(self, |store, cx| f(store, cx)) + } + + /// Returns whether `block_on` will succeed or panic. + #[cfg(feature = "call-hook")] + pub(crate) fn can_block(&mut self) -> bool { + self.fiber_async_state_mut().current_future_cx.is_some() + } +} + +/// Indicates whether or not a fiber needs to retain exclusive access to its +/// store across a suspend/resume interval. +pub(crate) enum StoreFiberYield { + /// Indicates the fiber needs to retain exclusive access, meaning the store + /// should not be used outside of the fiber until after the fiber either + /// suspends with `ReleaseStore` or resolves. + KeepStore, + /// Indicates the fiber does _not_ need exclusive access across the + /// suspend/resume interval, meaning the store may be used as needed until + /// the fiber is resumed. + #[expect(dead_code, reason = "will be used by async thing soon")] + ReleaseStore, +} + +pub(crate) struct StoreFiber<'a> { + /// The raw `wasmtime_fiber::Fiber`. + /// + /// Note that using `StoreFiberYield` as the `Yield` type parameter allows + /// the fiber to indicate whether it needs exclusive access to the store + /// across suspend points (in which case it will pass `KeepStore` when + /// suspending , meaning the store must not be used at all until the fiber + /// is resumed again) or whether it is giving up exclusive access (in which + /// case it will pass `ReleaseStore` when yielding, meaning exclusive access + /// may be given to another fiber that runs concurrently. + /// + /// Note also that every `StoreFiber` is implicitly granted exclusive access + /// to the store when it is resumed. + fiber: Option>, + /// See `FiberResumeState` + state: Option, + /// The Wasmtime `Engine` to which this fiber belongs. + engine: Engine, + /// The id of the store with which this fiber was created. + /// + /// Any attempt to resume a fiber with a different store than the one with + /// which it was created will panic. + id: StoreId, +} + +impl StoreFiber<'_> { + pub(crate) fn dispose(&mut self, store: &mut StoreOpaque) { + if let Some(fiber) = &mut self.fiber { + if !fiber.done() { + let result = resume_fiber(store, self, Err(anyhow!("future dropped"))); + debug_assert!(result.is_ok()); + } + } + } +} + +// Note that this implementation will panic if the fiber is in-progress, which +// will abort the process if there is already a panic being unwound. That +// should only happen if we failed to call `StoreFiber::dispose` on the +// in-progress fiber prior to dropping it, which indicates a bug in this crate +// which must be fixed. +impl Drop for StoreFiber<'_> { + fn drop(&mut self) { + if self.fiber.is_none() { + return; + } + + assert!( + self.fiber.as_ref().unwrap().done(), + "attempted to drop in-progress fiber without first calling `StoreFiber::dispose`" + ); + + self.state.take().unwrap().dispose(); + + unsafe { + self.engine + .allocator() + .deallocate_fiber_stack(self.fiber.take().unwrap().into_stack()); + } + } +} + +// This is surely the most dangerous `unsafe impl Send` in the entire +// crate. There are two members in `StoreFiber` which cause it to not be +// `Send`. One is `suspend` and is entirely uninteresting. This is just used to +// manage `Suspend` when resuming, and requires raw pointers to get it to happen +// easily. Nothing too weird about the `Send`-ness, values aren't actually +// crossing threads. +// +// The really interesting piece is `fiber`. Now the "fiber" here is actual +// honest-to-god Rust code which we're moving around. What we're doing is the +// equivalent of moving our thread's stack to another OS thread. Turns out we, +// in general, have no idea what's on the stack and would generally have no way +// to verify that this is actually safe to do! +// +// Thankfully, though, Wasmtime has the power. Without being glib it's actually +// worth examining what's on the stack. It's unfortunately not super-local to +// this function itself. Our closure to `Fiber::new` runs `func`, which is given +// to us from the outside. Thankfully, though, we have tight control over +// this. Usage of `on_fiber` or `Instance::resume_fiber` is typically done +// *just* before entering WebAssembly itself, so we'll have a few stack frames +// of Rust code (all in Wasmtime itself) before we enter wasm. +// +// Once we've entered wasm, well then we have a whole bunch of wasm frames on +// the stack. We've got this nifty thing called Cranelift, though, which allows +// us to also have complete control over everything on the stack! +// +// Finally, when wasm switches back to the fiber's starting pointer (this future +// we're returning) then it means wasm has reentered Rust. Suspension can only +// happen via either `block_on` or `Instance::suspend`. This, conveniently, also +// happens entirely in Wasmtime controlled code! +// +// There's an extremely important point that should be called out here. +// User-provided futures **are not on the stack** during suspension points. This +// is extremely crucial because we in general cannot reason about Send/Sync for +// stack-local variables since rustc doesn't analyze them at all. With our +// construction, though, we are guaranteed that Wasmtime owns all stack frames +// between the stack of a fiber and when the fiber suspends (and it could move +// across threads). At this time the only user-provided piece of data on the +// stack is the future itself given to us. Lo-and-behold as you might notice the +// future is required to be `Send`! +// +// What this all boils down to is that we, as the authors of Wasmtime, need to +// be extremely careful that on the async fiber stack we only store Send +// things. For example we can't start using `Rc` willy nilly by accident and +// leave a copy in TLS somewhere. (similarly we have to be ready for TLS to +// change while we're executing wasm code between suspension points). +// +// While somewhat onerous it shouldn't be too too hard (the TLS bit is the +// hardest bit so far). This does mean, though, that no user should ever have to +// worry about the `Send`-ness of Wasmtime. If rustc says it's ok, then it's ok. +// +// With all that in mind we unsafely assert here that Wasmtime is correct. We +// declare the fiber as only containing Send data on its stack, despite not +// knowing for sure at compile time that this is correct. That's what `unsafe` +// in Rust is all about, though, right? +unsafe impl Send for StoreFiber<'_> {} +// See the docs about the `Send` impl above, which also apply to this `Sync` +// impl. `Sync` is needed since we store `StoreFiber`s and switch between them +// when executing components that export async-lifted functions. +unsafe impl Sync for StoreFiber<'_> {} + +/// State of the world when a fiber last suspended. +/// +/// This structure represents global state that a fiber clobbers during its +/// execution. For example TLS variables are updated, system resources like MPK +/// masks are updated, etc. The purpose of this structure is to track all of +/// this state and appropriately save/restore it around fiber suspension points. +struct FiberResumeState { + /// Saved list of `CallThreadState` activations that are stored on a fiber + /// stack. + /// + /// This is a linked list that references stack-stored nodes on the fiber + /// stack that is currently suspended. The `AsyncWasmCallState` type + /// documents this more thoroughly but the general gist is that when we this + /// fiber is resumed this linked list needs to be pushed on to the current + /// thread's linked list of activations. + tls: crate::runtime::vm::AsyncWasmCallState, + + /// Saved MPK protection mask, if enabled. + /// + /// When MPK is enabled then executing WebAssembly will modify the + /// processor's current mask of addressable protection keys. This means that + /// our current state may get clobbered when a fiber suspends. To ensure + /// that this function preserves context it will, when MPK is enabled, save + /// the current mask when this function is called and then restore the mask + /// when the function returns (aka the fiber suspends). + mpk: Option, + + /// The current wasm stack limit, if in use. + /// + /// This field stores the old of `VMStoreContext::stack_limit` that this + /// fiber should be using during its execution. This is saved/restored when + /// a fiber is suspended/resumed to ensure that when there are multiple + /// fibers within the store they all maintain an appropriate fiber-relative + /// stack limit. + stack_limit: usize, + + /// The executor (e.g. the Pulley interpreter state) belonging to this + /// fiber. + /// + /// This is swapped with `StoreOpaque::executor` whenever this fiber is + /// resumed, suspended, or resolved. + executor: Executor, +} + +impl FiberResumeState { + unsafe fn replace( + self, + store: &mut StoreOpaque, + fiber: &mut StoreFiber<'_>, + ) -> PriorFiberResumeState { + let tls = unsafe { self.tls.push() }; + let mpk = swap_mpk_states(self.mpk); + let async_guard_range = fiber + .fiber + .as_ref() + .unwrap() + .stack() + .guard_range() + .unwrap_or(ptr::null_mut()..ptr::null_mut()); + let mut executor = self.executor; + store.swap_executor(&mut executor); + PriorFiberResumeState { + tls, + mpk, + executor, + stack_limit: store.replace_stack_limit(self.stack_limit), + async_guard_range: store.replace_async_guard_range(async_guard_range), + + // The current suspend/future_cx are always null upon resumption, so + // insert null. Save the old values through to get preserved across + // this resume/suspend. + current_suspend: store.replace_current_suspend(None), + current_future_cx: store.replace_current_future_cx(None), + } + } + + fn dispose(self) { + self.tls.assert_null(); + } +} + +impl StoreOpaque { + /// Helper function to swap the `stack_limit` field in the `VMStoreContext` + /// within this store. + fn replace_stack_limit(&mut self, stack_limit: usize) -> usize { + mem::replace( + &mut self.vm_store_context_mut().stack_limit.get_mut(), + stack_limit, + ) + } + + /// Helper function to swap the `async_guard_range` field in the `VMStoreContext` + /// within this store. + fn replace_async_guard_range(&mut self, range: Range<*mut u8>) -> Range<*mut u8> { + mem::replace(&mut self.vm_store_context_mut().async_guard_range, range) + } + + fn replace_current_suspend( + &mut self, + ptr: Option>, + ) -> Option> { + mem::replace(&mut self.fiber_async_state_mut().current_suspend, ptr) + } + + fn replace_current_future_cx( + &mut self, + ptr: Option>>, + ) -> Option>> { + mem::replace(&mut self.fiber_async_state_mut().current_future_cx, ptr) + } +} + +struct PriorFiberResumeState { + tls: crate::runtime::vm::PreviousAsyncWasmCallState, + mpk: Option, + stack_limit: usize, + async_guard_range: Range<*mut u8>, + current_suspend: Option>, + current_future_cx: Option>>, + executor: Executor, +} + +impl PriorFiberResumeState { + unsafe fn replace(self, store: &mut StoreOpaque) -> FiberResumeState { + let tls = unsafe { self.tls.restore() }; + let mpk = swap_mpk_states(self.mpk); + // No need to save `_my_guard` since we can re-infer it from the fiber + // that this state is attached to. + let _my_guard = store.replace_async_guard_range(self.async_guard_range); + + // Restore the previous values of current_{suspend,future_cx} but we + // should be guaranteed that the prior values are null, so double-check + // that here. + let prev = store.replace_current_suspend(self.current_suspend); + assert!(prev.is_none()); + let prev = store.replace_current_future_cx(self.current_future_cx); + assert!(prev.is_none()); + + let mut executor = self.executor; + store.swap_executor(&mut executor); + + FiberResumeState { + tls, + mpk, + executor, + stack_limit: store.replace_stack_limit(self.stack_limit), + } + } +} + +fn swap_mpk_states(mask: Option) -> Option { + mask.map(|mask| { + let current = mpk::current_mask(); + mpk::allow(mask); + current + }) +} + +/// Resume the specified fiber, granting it exclusive access to the store with +/// which it was created. +/// +/// This will return `Ok(result)` if the fiber resolved, where `result` is the +/// returned value; it will return `Err(yield_)` if the fiber suspended, where +/// `yield_` indicates whether it released access to the store or not. See +/// `StoreFiber::fiber` for details. +fn resume_fiber<'a>( + store: &mut StoreOpaque, + fiber: &mut StoreFiber<'a>, + result: WasmtimeResume, +) -> Result { + assert_eq!(store.id(), fiber.id); + + struct Restore<'a, 'b> { + store: &'b mut StoreOpaque, + fiber: &'b mut StoreFiber<'a>, + state: Option, + } + + impl Drop for Restore<'_, '_> { + fn drop(&mut self) { + self.fiber.state = Some(unsafe { self.state.take().unwrap().replace(self.store) }); + } + } + let result = unsafe { + let prev = fiber.state.take().unwrap().replace(store, fiber); + let restore = Restore { + store, + fiber, + state: Some(prev), + }; + restore.fiber.fiber.as_ref().unwrap().resume(result) + }; + + match &result { + // The fiber has finished, so recycle its stack by disposing of the + // underlying fiber itself. + Ok(_) => { + let stack = fiber.fiber.take().map(|f| f.into_stack()); + if let Some(stack) = stack { + store.deallocate_fiber_stack(stack); + } + } + + // The fiber has not yet finished, so it stays as-is. + Err(_) => { + // If `Err` is returned that means the fiber suspended, so we + // propagate that here. + // + // An additional safety check is performed when leaving this + // function to help bolster the guarantees of `unsafe impl Send` + // above. Notably this future may get re-polled on a different + // thread. Wasmtime's thread-local state points to the stack, + // however, meaning that it would be incorrect to leave a pointer in + // TLS when this function returns. This function performs a runtime + // assert to verify that this is the case, notably that the one TLS + // pointer Wasmtime uses is not pointing anywhere within the + // stack. If it is then that's a bug indicating that TLS management + // in Wasmtime is incorrect. + if let Some(range) = fiber.fiber.as_ref().unwrap().stack().range() { + AsyncWasmCallState::assert_current_state_not_in_range(range); + } + } + } + + result +} + +/// Create a new `StoreFiber` which runs the specified closure. +pub(crate) fn make_fiber<'a>( + store: &mut dyn VMStore, + fun: impl FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync + 'a, +) -> Result> { + let engine = store.engine().clone(); + let executor = Executor::new(&engine); + let id = store.store_opaque().id(); + let stack = store.store_opaque_mut().allocate_fiber_stack()?; + let track_pkey_context_switch = store.has_pkey(); + let store = &raw mut *store; + let fiber = Fiber::new(stack, move |result: WasmtimeResume, suspend| { + let future_cx = match result { + Ok(cx) => cx, + // Cancelled before we started? Just return. + Err(_) => return Ok(()), + }; + + // SAFETY: This fiber will only be resumed using `resume_fiber`, which + // takes a `&mut StoreOpaque` parameter and has given us exclusive + // access to the store until we exit or yield it back to the resumer. + let store_ref = unsafe { &mut *store }; + + // It should be a guarantee that the store has null pointers here upon + // starting a fiber, so now's the time to fill in the pointers now that + // the fiber is running and `future_cx` and `suspend` are both in scope. + // Note that these pointers are removed when this function returns as + // that's when they fall out of scope. + let async_state = store_ref.store_opaque_mut().fiber_async_state_mut(); + assert!(async_state.current_suspend.is_none()); + assert!(async_state.current_future_cx.is_none()); + async_state.current_suspend = Some(NonNull::from(suspend)); + async_state.current_future_cx = Some(future_cx); + + struct ResetCurrentPointersToNull<'a>(&'a mut dyn VMStore); + + impl Drop for ResetCurrentPointersToNull<'_> { + fn drop(&mut self) { + let state = self.0.fiber_async_state_mut(); + + // Double-check that the current suspension isn't null (it + // should be what's in this closure). Note though that we + // can't check `current_future_cx` because it may either be + // here or not be here depending on whether this was + // cancelled or not. + debug_assert!(state.current_suspend.is_some()); + + state.current_suspend = None; + state.current_future_cx = None; + } + } + let reset = ResetCurrentPointersToNull(store_ref); + + fun(reset.0) + })?; + Ok(StoreFiber { + state: Some(FiberResumeState { + tls: crate::runtime::vm::AsyncWasmCallState::new(), + mpk: if track_pkey_context_switch { + Some(ProtectionMask::all()) + } else { + None + }, + stack_limit: usize::MAX, + executor, + }), + engine, + id, + fiber: Some(fiber), + }) +} + +/// Run the specified function on a newly-created fiber and `.await` its +/// completion. +pub(crate) async fn on_fiber( + store: &mut StoreOpaque, + func: impl FnOnce(&mut StoreOpaque) -> R + Send + Sync, +) -> Result { + let config = store.engine().config(); + debug_assert!(store.async_support()); + debug_assert!(config.async_stack_size > 0); + + let mut result = None; + let fiber = make_fiber(store.traitobj_mut(), |store| { + result = Some(func(store)); + Ok(()) + })?; + + { + let fiber = FiberFuture { + store, + fiber: Some(fiber), + on_release: OnRelease::ReturnPending, + } + .await + .unwrap(); + + debug_assert!(fiber.is_none()); + } + + Ok(result.unwrap()) +} + +/// Run the specified fiber until it either suspends with +/// `StoreFiberYield::ReleaseStore` or resolves. +/// +/// This will return `Some` if the fiber suspends with +/// `StoreFiberYield::ReleaseStore` or else `None` if it resolves. +#[cfg(feature = "component-model-async")] +#[expect(dead_code, reason = "will be used by async thing soon")] +pub(crate) async fn resolve_or_release<'a>( + store: &mut StoreOpaque, + fiber: StoreFiber<'a>, +) -> Result>> { + FiberFuture { + store, + fiber: Some(fiber), + on_release: OnRelease::ReturnReady, + } + .await +} + +/// Tells a `FiberFuture` what to do if `poll_fiber` returns +/// `Err(StoreFiberYield::ReleaseStore)`. +enum OnRelease { + /// Return `Poll::Pending` from `FiberFuture::poll` + ReturnPending, + /// Return `Poll::Ready` from `FiberFuture::poll`, handing ownership of the + /// `StoreFiber` to the caller. + #[cfg(feature = "component-model-async")] + ReturnReady, +} + +/// A `Future` implementation for running a `StoreFiber` to completion, giving +/// it exclusive access to its store until it resolves. +struct FiberFuture<'a, 'b> { + store: &'a mut StoreOpaque, + fiber: Option>, + on_release: OnRelease, +} + +impl<'b> Future for FiberFuture<'_, 'b> { + type Output = Result>>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.get_mut(); + + // SAFETY: We need to carry over this `cx` into our fiber's runtime for + // when it tries to poll sub-futures that are created. Doing this must + // be done unsafely, however, since `cx` is only alive for this one + // singular function call. Here we do a `transmute` to extend the + // lifetime of `Context` so it can be stored in our `Store`, and then we + // replace the current polling context with this one. + // + // The safety of this extension relies on never actually using + // `Context<'static>` with `'static` actually there, which should be + // satisfied by the users of this in the `BlockingContext` structure + // where the lifetime parameters there are always more constrained than + // they are here. + let cx: &mut Context<'static> = unsafe { change_context_lifetime(cx) }; + let cx = NonNull::from(cx); + + match resume_fiber(me.store, me.fiber.as_mut().unwrap(), Ok(cx)) { + Ok(Ok(())) => Poll::Ready(Ok(None)), + Ok(Err(e)) => Poll::Ready(Err(e)), + Err(StoreFiberYield::KeepStore) => Poll::Pending, + Err(StoreFiberYield::ReleaseStore) => match &me.on_release { + OnRelease::ReturnPending => Poll::Pending, + #[cfg(feature = "component-model-async")] + OnRelease::ReturnReady => Poll::Ready(Ok(me.fiber.take())), + }, + } + } +} + +impl Drop for FiberFuture<'_, '_> { + fn drop(&mut self) { + if let Some(fiber) = &mut self.fiber { + fiber.dispose(self.store); + } + } +} + +/// Changes the lifetime `'l` in `Context<'l>` to something else. +/// +/// # Safety +/// +/// Not a safe operation. Requires external knowledge about how the pointer is +/// being used to determine whether it's actually safe or not. See docs on +/// callers of this function. The purpose of this is to scope the `transmute` to +/// as small an operation as possible. +unsafe fn change_context_lifetime<'a, 'b>(cx: &'a mut Context<'_>) -> &'a mut Context<'b> { + // SAFETY: See the function documentation, this is not safe in general. + unsafe { mem::transmute::<&mut Context<'_>, &mut Context<'b>>(cx) } +} diff --git a/crates/wasmtime/src/runtime/func.rs b/crates/wasmtime/src/runtime/func.rs index ad71706ff10f..0769ad75100b 100644 --- a/crates/wasmtime/src/runtime/func.rs +++ b/crates/wasmtime/src/runtime/func.rs @@ -13,10 +13,10 @@ use crate::{ }; use alloc::sync::Arc; use core::ffi::c_void; +#[cfg(feature = "async")] +use core::future::Future; use core::mem::{self, MaybeUninit}; use core::ptr::NonNull; -#[cfg(feature = "async")] -use core::{future::Future, pin::Pin}; use wasmtime_environ::VMSharedTypeIndex; /// A reference to the abstract `nofunc` heap value. @@ -513,19 +513,19 @@ impl Func { "cannot use `new_async` without enabling async support in the config" ); assert!(ty.comes_from_same_engine(store.as_context().engine())); - Func::new(store, ty, move |mut caller, params, results| { - let async_cx = caller - .store - .as_context_mut() - .0 - .async_cx() - .expect("Attempt to spawn new action on dying fiber"); - let future = func(caller, params, results); - match unsafe { async_cx.block_on(Pin::from(future)) } { - Ok(Ok(())) => Ok(()), - Ok(Err(trap)) | Err(trap) => Err(trap), - } - }) + return Func::new( + store, + ty, + move |Caller { store, caller }, params, results| { + store.with_blocking(|store, cx| { + cx.block_on(core::pin::Pin::from(func( + Caller { store, caller }, + params, + results, + ))) + })? + }, + ); } pub(crate) unsafe fn from_vm_func_ref( @@ -839,16 +839,8 @@ impl Func { store.as_context().async_support(), concat!("cannot use `wrap_async` without enabling async support on the config") ); - Func::wrap_inner(store, move |mut caller: Caller<'_, T>, args| { - let async_cx = caller - .store - .as_context_mut() - .0 - .async_cx() - .expect("Attempt to start async function on dying fiber"); - let future = func(caller, args); - - match unsafe { async_cx.block_on(Pin::from(future)) } { + Func::wrap_inner(store, move |Caller { store, caller }, args| { + match store.block_on(|store| func(Caller { store, caller }, args).into()) { Ok(ret) => ret.into_fallible(), Err(e) => R::fallible_from_error(e), } @@ -2035,6 +2027,16 @@ pub struct Caller<'a, T: 'static> { } impl Caller<'_, T> { + #[cfg(feature = "async")] + pub(crate) fn new(store: StoreContextMut<'_, T>, caller: Instance) -> Caller<'_, T> { + Caller { store, caller } + } + + #[cfg(feature = "async")] + pub(crate) fn caller(&self) -> Instance { + self.caller + } + unsafe fn with(caller: NonNull, f: F) -> R where // The closure must be valid for any `Caller` it is given; it doesn't diff --git a/crates/wasmtime/src/runtime/func/typed.rs b/crates/wasmtime/src/runtime/func/typed.rs index 0df0dcc400c2..be808a636703 100644 --- a/crates/wasmtime/src/runtime/func/typed.rs +++ b/crates/wasmtime/src/runtime/func/typed.rs @@ -132,7 +132,11 @@ where &self, mut store: impl AsContextMut, params: Params, - ) -> Result { + ) -> Result + where + Params: Sync, + Results: Sync, + { let mut store = store.as_context_mut(); assert!( store.0.async_support(), diff --git a/crates/wasmtime/src/runtime/linker.rs b/crates/wasmtime/src/runtime/linker.rs index cb49fa2d727c..6ca250c038a8 100644 --- a/crates/wasmtime/src/runtime/linker.rs +++ b/crates/wasmtime/src/runtime/linker.rs @@ -9,9 +9,9 @@ use crate::{ use crate::{IntoFunc, prelude::*}; use alloc::sync::Arc; use core::fmt::{self, Debug}; -use core::marker; #[cfg(feature = "async")] -use core::{future::Future, pin::Pin}; +use core::future::Future; +use core::marker; use log::warn; /// Structure used to link wasm modules/instances together. @@ -470,18 +470,12 @@ impl Linker { "cannot use `func_new_async` without enabling async support in the config" ); assert!(ty.comes_from_same_engine(self.engine())); - self.func_new(module, name, ty, move |mut caller, params, results| { - let async_cx = caller - .store - .as_context_mut() - .0 - .async_cx() - .expect("Attempt to spawn new function on dying fiber"); - let future = func(caller, params, results); - match unsafe { async_cx.block_on(Pin::from(future)) } { - Ok(Ok(())) => Ok(()), - Ok(Err(trap)) | Err(trap) => Err(trap), - } + self.func_new(module, name, ty, move |caller, params, results| { + let instance = caller.caller(); + caller.store.with_blocking(|store, cx| { + let caller = Caller::new(store, instance); + cx.block_on(core::pin::Pin::from(func(caller, params, results))) + })? }) } @@ -575,22 +569,18 @@ impl Linker { self.engine.config().async_support, "cannot use `func_wrap_async` without enabling async support on the config", ); - let func = HostFunc::wrap_inner( - &self.engine, - move |mut caller: Caller<'_, T>, args: Params| { - let async_cx = caller - .store - .as_context_mut() - .0 - .async_cx() - .expect("Attempt to start async function on dying fiber"); - let future = func(caller, args); - match unsafe { async_cx.block_on(Pin::from(future)) } { + let func = + HostFunc::wrap_inner(&self.engine, move |caller: Caller<'_, T>, args: Params| { + let instance = caller.caller(); + let result = caller.store.block_on(|store| { + let caller = Caller::new(store, instance); + func(caller, args).into() + }); + match result { Ok(ret) => ret.into_fallible(), Err(e) => Args::fallible_from_error(e), } - }, - ); + }); let key = self.import_key(module, Some(name)); self.insert(key, Definition::HostFunc(Arc::new(func)))?; Ok(self) diff --git a/crates/wasmtime/src/runtime/store.rs b/crates/wasmtime/src/runtime/store.rs index 018042159af6..8c883a7be63a 100644 --- a/crates/wasmtime/src/runtime/store.rs +++ b/crates/wasmtime/src/runtime/store.rs @@ -77,6 +77,10 @@ //! `wasmtime`, must uphold for the public interface to be safe. use crate::RootSet; +#[cfg(feature = "component-model-async")] +use crate::component::ComponentStoreData; +#[cfg(feature = "async")] +use crate::fiber; use crate::module::RegisteredModuleId; use crate::prelude::*; #[cfg(feature = "gc")] @@ -109,11 +113,13 @@ pub use self::data::*; mod func_refs; use func_refs::FuncRefs; #[cfg(feature = "async")] +mod token; +#[cfg(feature = "async")] +pub(crate) use token::StoreToken; +#[cfg(feature = "async")] mod async_; #[cfg(all(feature = "async", feature = "call-hook"))] pub use self::async_::CallHookHandler; -#[cfg(feature = "async")] -use self::async_::*; #[cfg(feature = "gc")] mod gc; @@ -353,7 +359,7 @@ pub struct StoreOpaque { table_count: usize, table_limit: usize, #[cfg(feature = "async")] - async_state: AsyncState, + async_state: fiber::AsyncState, // If fuel_yield_interval is enabled, then we store the remaining fuel (that isn't in // runtime_limits) here. The total amount of fuel is the runtime limits and reserve added @@ -401,12 +407,28 @@ pub struct StoreOpaque { /// /// Effectively stores Pulley interpreter state and handles conditional support /// for Cranelift at compile time. -enum Executor { +pub(crate) enum Executor { Interpreter(Interpreter), #[cfg(has_host_compiler_backend)] Native, } +impl Executor { + pub(crate) fn new(engine: &Engine) -> Self { + #[cfg(has_host_compiler_backend)] + if cfg!(feature = "pulley") && engine.target().is_pulley() { + Executor::Interpreter(Interpreter::new(engine)) + } else { + Executor::Native + } + #[cfg(not(has_host_compiler_backend))] + { + debug_assert!(engine.target().is_pulley()); + Executor::Interpreter(Interpreter::new(engine)) + } + } +} + /// A borrowed reference to `Executor` above. pub(crate) enum ExecutorRef<'a> { Interpreter(InterpreterRef<'a>), @@ -553,7 +575,7 @@ impl Store { table_count: 0, table_limit: crate::DEFAULT_TABLE_LIMIT, #[cfg(feature = "async")] - async_state: AsyncState::default(), + async_state: Default::default(), fuel_reserve: 0, fuel_yield_interval: None, store_data, @@ -568,17 +590,7 @@ impl Store { component_calls: Default::default(), #[cfg(feature = "component-model")] host_resource_data: Default::default(), - #[cfg(has_host_compiler_backend)] - executor: if cfg!(feature = "pulley") && engine.target().is_pulley() { - Executor::Interpreter(Interpreter::new(engine)) - } else { - Executor::Native - }, - #[cfg(not(has_host_compiler_backend))] - executor: { - debug_assert!(engine.target().is_pulley()); - Executor::Interpreter(Interpreter::new(engine)) - }, + executor: Executor::new(engine), }; let mut inner = Box::new(StoreInner { inner, @@ -637,9 +649,22 @@ impl Store { self.inner.data_mut() } + fn run_manual_drop_routines(&mut self) { + // We need to drop the fibers of each component instance before + // attempting to drop the instances themselves since the fibers may need + // to be resumed and allowed to exit cleanly before we yank the state + // out from under them. + #[cfg(feature = "component-model-async")] + ComponentStoreData::drop_fibers(&mut self.inner); + + // Ensure all fiber stacks, even cached ones, are all flushed out to the + // instance allocator. + self.inner.flush_fiber_stack(); + } + /// Consumes this [`Store`], destroying it, and returns the underlying data. pub fn into_data(mut self) -> T { - self.inner.flush_fiber_stack(); + self.run_manual_drop_routines(); // This is an unsafe operation because we want to avoid having a runtime // check or boolean for whether the data is actually contained within a @@ -1094,16 +1119,14 @@ impl StoreInner { CallHookInner::Sync(hook) => hook((&mut *self).as_context_mut(), s), #[cfg(all(feature = "async", feature = "call-hook"))] - CallHookInner::Async(handler) => unsafe { - self.inner - .async_cx() - .ok_or_else(|| anyhow!("couldn't grab async_cx for call hook"))? - .block_on( - handler - .handle_call_event((&mut *self).as_context_mut(), s) - .as_mut(), - )? - }, + CallHookInner::Async(handler) => { + if !self.can_block() { + bail!("couldn't grab async_cx for call hook") + } + return (&mut *self) + .as_context_mut() + .with_blocking(|store, cx| cx.block_on(handler.handle_call_event(store, s)))?; + } CallHookInner::ForceTypeParameterToBeUsed { uninhabited, .. } => { let _ = s; @@ -1345,6 +1368,11 @@ impl StoreOpaque { &self.vm_store_context } + #[inline] + pub fn vm_store_context_mut(&mut self) -> &mut VMStoreContext { + &mut self.vm_store_context + } + #[inline(never)] pub(crate) fn allocate_gc_heap(&mut self) -> Result<()> { log::trace!("allocating GC heap for store {:?}", self.id()); @@ -1926,9 +1954,14 @@ at https://bytecodealliance.org/security. ) } - #[cfg(not(feature = "async"))] - pub(crate) fn async_guard_range(&self) -> core::ops::Range<*mut u8> { - core::ptr::null_mut()..core::ptr::null_mut() + #[cfg(feature = "async")] + pub(crate) fn fiber_async_state_mut(&mut self) -> &mut fiber::AsyncState { + &mut self.async_state + } + + #[cfg(feature = "async")] + pub(crate) fn has_pkey(&self) -> bool { + self.pkey.is_some() } pub(crate) fn executor(&mut self) -> ExecutorRef<'_> { @@ -1939,6 +1972,11 @@ at https://bytecodealliance.org/security. } } + #[cfg(feature = "async")] + pub(crate) fn swap_executor(&mut self, executor: &mut Executor) { + mem::swap(&mut self.executor, executor); + } + pub(crate) fn unwinder(&self) -> &'static dyn Unwind { match &self.executor { Executor::Interpreter(i) => i.unwinder(), @@ -2094,16 +2132,13 @@ unsafe impl vm::VMStore for StoreInner { limiter(&mut self.data).memory_growing(current, desired, maximum) } #[cfg(feature = "async")] - Some(ResourceLimiterInner::Async(ref mut limiter)) => unsafe { - self.inner - .async_cx() - .expect("ResourceLimiterAsync requires async Store") - .block_on( - limiter(&mut self.data) - .memory_growing(current, desired, maximum) - .as_mut(), - )? - }, + Some(ResourceLimiterInner::Async(_)) => self.block_on(|store| { + let limiter = match &mut store.0.limiter { + Some(ResourceLimiterInner::Async(limiter)) => limiter, + _ => unreachable!(), + }; + limiter(&mut store.0.data).memory_growing(current, desired, maximum) + })?, None => Ok(true), } } @@ -2130,28 +2165,18 @@ unsafe impl vm::VMStore for StoreInner { desired: usize, maximum: Option, ) -> Result { - // Need to borrow async_cx before the mut borrow of the limiter. - // self.async_cx() panicks when used with a non-async store, so - // wrap this in an option. - #[cfg(feature = "async")] - let async_cx = if self.async_support() - && matches!(self.limiter, Some(ResourceLimiterInner::Async(_))) - { - Some(self.async_cx().unwrap()) - } else { - None - }; - match self.limiter { Some(ResourceLimiterInner::Sync(ref mut limiter)) => { limiter(&mut self.data).table_growing(current, desired, maximum) } #[cfg(feature = "async")] - Some(ResourceLimiterInner::Async(ref mut limiter)) => unsafe { - async_cx - .expect("ResourceLimiterAsync requires async Store") - .block_on(limiter(&mut self.data).table_growing(current, desired, maximum))? - }, + Some(ResourceLimiterInner::Async(_)) => self.block_on(|store| { + let limiter = match &mut store.0.limiter { + Some(ResourceLimiterInner::Async(limiter)) => limiter, + _ => unreachable!(), + }; + limiter(&mut store.0.data).table_growing(current, desired, maximum) + })?, None => Ok(true), } } @@ -2218,11 +2243,7 @@ unsafe impl vm::VMStore for StoreInner { // to clean up this fiber. Do so by raising a trap which will // abort all wasm and get caught on the other side to clean // things up. - unsafe { - self.async_cx() - .expect("attempted to pull async context during shutdown") - .block_on(future)? - } + self.block_on(|_| future)?; delta } }; @@ -2312,7 +2333,7 @@ impl fmt::Debug for Store { impl Drop for Store { fn drop(&mut self) { - self.inner.flush_fiber_stack(); + self.run_manual_drop_routines(); // for documentation on this `unsafe`, see `into_data`. unsafe { diff --git a/crates/wasmtime/src/runtime/store/async_.rs b/crates/wasmtime/src/runtime/store/async_.rs index 08551b7ea47c..4119d046239c 100644 --- a/crates/wasmtime/src/runtime/store/async_.rs +++ b/crates/wasmtime/src/runtime/store/async_.rs @@ -1,15 +1,9 @@ #[cfg(feature = "call-hook")] use crate::CallHook; +use crate::fiber::{self}; use crate::prelude::*; -use crate::runtime::vm::mpk::{self, ProtectionMask}; -use crate::store::{ResourceLimiterInner, StoreInner, StoreOpaque}; -use crate::{Engine, Store, StoreContextMut, UpdateDeadline}; -use core::cell::UnsafeCell; -use core::future::Future; -use core::ops::Range; -use core::pin::{Pin, pin}; -use core::ptr; -use core::task::{Context, Poll}; +use crate::store::{ResourceLimiterInner, StoreInner, StoreOpaque, StoreToken}; +use crate::{AsContextMut, Store, StoreContextMut, UpdateDeadline}; /// An object that can take callbacks when the runtime enters or exits hostcalls. #[cfg(feature = "call-hook")] @@ -20,45 +14,6 @@ pub trait CallHookHandler: Send { async fn handle_call_event(&self, t: StoreContextMut<'_, T>, ch: CallHook) -> Result<()>; } -pub struct AsyncState { - current_suspend: UnsafeCell<*mut wasmtime_fiber::Suspend, (), Result<()>>>, - current_poll_cx: UnsafeCell, - /// The last fiber stack that was in use by this store. - last_fiber_stack: Option, -} - -impl Default for AsyncState { - fn default() -> AsyncState { - AsyncState { - current_suspend: UnsafeCell::new(ptr::null_mut()), - current_poll_cx: UnsafeCell::new(PollContext::default()), - last_fiber_stack: None, - } - } -} - -// Lots of pesky unsafe cells and pointers in this structure. This means we need -// to declare explicitly that we use this in a threadsafe fashion. -unsafe impl Send for AsyncState {} -unsafe impl Sync for AsyncState {} - -#[derive(Clone, Copy)] -struct PollContext { - future_context: *mut Context<'static>, - guard_range_start: *mut u8, - guard_range_end: *mut u8, -} - -impl Default for PollContext { - fn default() -> PollContext { - PollContext { - future_context: core::ptr::null_mut(), - guard_range_start: core::ptr::null_mut(), - guard_range_end: core::ptr::null_mut(), - } - } -} - impl Store { /// Configures the [`ResourceLimiterAsync`](crate::ResourceLimiterAsync) /// used to limit resource creation within this [`Store`]. @@ -214,368 +169,11 @@ impl StoreOpaque { /// This function will convert the synchronous `func` into an asynchronous /// future. This is done by running `func` in a fiber on a separate native /// stack which can be suspended and resumed from. - /// - /// Most of the nitty-gritty here is how we juggle the various contexts - /// necessary to suspend the fiber later on and poll sub-futures. It's hoped - /// that the various comments are illuminating as to what's going on here. - pub(crate) async fn on_fiber( + pub(crate) async fn on_fiber( &mut self, - func: impl FnOnce(&mut Self) -> R + Send, + func: impl FnOnce(&mut Self) -> R + Send + Sync, ) -> Result { - let config = self.engine().config(); - debug_assert!(self.async_support()); - debug_assert!(config.async_stack_size > 0); - - let mut slot = None; - let mut future = { - let current_poll_cx = self.async_state.current_poll_cx.get(); - let current_suspend = self.async_state.current_suspend.get(); - let stack = self.allocate_fiber_stack()?; - let track_pkey_context_switch = self.pkey.is_some(); - - let engine = self.engine().clone(); - let slot = &mut slot; - let this = &mut *self; - let fiber = wasmtime_fiber::Fiber::new(stack, move |keep_going, suspend| { - // First check and see if we were interrupted/dropped, and only - // continue if we haven't been. - keep_going?; - - // Configure our store's suspension context for the rest of the - // execution of this fiber. Note that a raw pointer is stored here - // which is only valid for the duration of this closure. - // Consequently we at least replace it with the previous value when - // we're done. This reset is also required for correctness because - // otherwise our value will overwrite another active fiber's value. - // There should be a test that segfaults in `async_functions.rs` if - // this `Replace` is removed. - unsafe { - let _reset = Reset(current_suspend, *current_suspend); - *current_suspend = suspend; - - *slot = Some(func(this)); - Ok(()) - } - })?; - - // Once we have the fiber representing our synchronous computation, we - // wrap that in a custom future implementation which does the - // translation from the future protocol to our fiber API. - FiberFuture { - fiber: Some(fiber), - current_poll_cx, - engine, - fiber_resume_state: Some(FiberResumeState { - tls: crate::runtime::vm::AsyncWasmCallState::new(), - mpk: if track_pkey_context_switch { - Some(ProtectionMask::all()) - } else { - None - }, - }), - } - }; - (&mut future).await?; - let stack = future.fiber.take().map(|f| f.into_stack()); - drop(future); - if let Some(stack) = stack { - self.deallocate_fiber_stack(stack); - } - - return Ok(slot.unwrap()); - - struct FiberFuture<'a> { - fiber: Option, (), Result<()>>>, - current_poll_cx: *mut PollContext, - engine: Engine, - // See comments in `FiberResumeState` for this - fiber_resume_state: Option, - } - - // This is surely the most dangerous `unsafe impl Send` in the entire - // crate. There are two members in `FiberFuture` which cause it to not - // be `Send`. One is `current_poll_cx` and is entirely uninteresting. - // This is just used to manage `Context` pointers across `await` points - // in the future, and requires raw pointers to get it to happen easily. - // Nothing too weird about the `Send`-ness, values aren't actually - // crossing threads. - // - // The really interesting piece is `fiber`. Now the "fiber" here is - // actual honest-to-god Rust code which we're moving around. What we're - // doing is the equivalent of moving our thread's stack to another OS - // thread. Turns out we, in general, have no idea what's on the stack - // and would generally have no way to verify that this is actually safe - // to do! - // - // Thankfully, though, Wasmtime has the power. Without being glib it's - // actually worth examining what's on the stack. It's unfortunately not - // super-local to this function itself. Our closure to `Fiber::new` runs - // `func`, which is given to us from the outside. Thankfully, though, we - // have tight control over this. Usage of `on_fiber` is typically done - // *just* before entering WebAssembly itself, so we'll have a few stack - // frames of Rust code (all in Wasmtime itself) before we enter wasm. - // - // Once we've entered wasm, well then we have a whole bunch of wasm - // frames on the stack. We've got this nifty thing called Cranelift, - // though, which allows us to also have complete control over everything - // on the stack! - // - // Finally, when wasm switches back to the fiber's starting pointer - // (this future we're returning) then it means wasm has reentered Rust. - // Suspension can only happen via the `block_on` function of an - // `AsyncCx`. This, conveniently, also happens entirely in Wasmtime - // controlled code! - // - // There's an extremely important point that should be called out here. - // User-provided futures **are not on the stack** during suspension - // points. This is extremely crucial because we in general cannot reason - // about Send/Sync for stack-local variables since rustc doesn't analyze - // them at all. With our construction, though, we are guaranteed that - // Wasmtime owns all stack frames between the stack of a fiber and when - // the fiber suspends (and it could move across threads). At this time - // the only user-provided piece of data on the stack is the future - // itself given to us. Lo-and-behold as you might notice the future is - // required to be `Send`! - // - // What this all boils down to is that we, as the authors of Wasmtime, - // need to be extremely careful that on the async fiber stack we only - // store Send things. For example we can't start using `Rc` willy nilly - // by accident and leave a copy in TLS somewhere. (similarly we have to - // be ready for TLS to change while we're executing wasm code between - // suspension points). - // - // While somewhat onerous it shouldn't be too too hard (the TLS bit is - // the hardest bit so far). This does mean, though, that no user should - // ever have to worry about the `Send`-ness of Wasmtime. If rustc says - // it's ok, then it's ok. - // - // With all that in mind we unsafely assert here that wasmtime is - // correct. We declare the fiber as only containing Send data on its - // stack, despite not knowing for sure at compile time that this is - // correct. That's what `unsafe` in Rust is all about, though, right? - unsafe impl Send for FiberFuture<'_> {} - - impl FiberFuture<'_> { - fn fiber(&self) -> &wasmtime_fiber::Fiber<'_, Result<()>, (), Result<()>> { - self.fiber.as_ref().unwrap() - } - - /// This is a helper function to call `resume` on the underlying - /// fiber while correctly managing Wasmtime's state that the fiber - /// may clobber. - /// - /// ## Return Value - /// - /// * `Ok(Ok(()))` - the fiber successfully completed and yielded a - /// successful result. - /// * `Ok(Err(e))` - the fiber successfully completed and yielded - /// an error as a result of computation. - /// * `Err(())` - the fiber has not finished and it is suspended. - fn resume(&mut self, val: Result<()>) -> Result, ()> { - unsafe { - let prev = self.fiber_resume_state.take().unwrap().replace(); - let restore = Restore { - fiber: self, - prior_fiber_state: Some(prev), - }; - return restore.fiber.fiber().resume(val); - } - - struct Restore<'a, 'b> { - fiber: &'a mut FiberFuture<'b>, - prior_fiber_state: Option, - } - - impl Drop for Restore<'_, '_> { - fn drop(&mut self) { - unsafe { - self.fiber.fiber_resume_state = - Some(self.prior_fiber_state.take().unwrap().replace()); - } - } - } - } - } - - impl Future for FiberFuture<'_> { - type Output = Result<()>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - // We need to carry over this `cx` into our fiber's runtime - // for when it tries to poll sub-futures that are created. Doing - // this must be done unsafely, however, since `cx` is only alive - // for this one singular function call. Here we do a `transmute` - // to extend the lifetime of `Context` so it can be stored in - // our `Store`, and then we replace the current polling context - // with this one. - // - // Note that the replace is done for weird situations where - // futures might be switching contexts and there's multiple - // wasmtime futures in a chain of futures. - // - // On exit from this function, though, we reset the polling - // context back to what it was to signify that `Store` no longer - // has access to this pointer. - let guard = self - .fiber() - .stack() - .guard_range() - .unwrap_or(core::ptr::null_mut()..core::ptr::null_mut()); - unsafe { - let _reset = Reset(self.current_poll_cx, *self.current_poll_cx); - *self.current_poll_cx = PollContext { - future_context: core::mem::transmute::< - &mut Context<'_>, - *mut Context<'static>, - >(cx), - guard_range_start: guard.start, - guard_range_end: guard.end, - }; - - // After that's set up we resume execution of the fiber, which - // may also start the fiber for the first time. This either - // returns `Ok` saying the fiber finished (yay!) or it - // returns `Err` with the payload passed to `suspend`, which - // in our case is `()`. - match self.resume(Ok(())) { - Ok(result) => Poll::Ready(result), - - // If `Err` is returned that means the fiber polled a - // future but it said "Pending", so we propagate that - // here. - // - // An additional safety check is performed when leaving - // this function to help bolster the guarantees of - // `unsafe impl Send` above. Notably this future may get - // re-polled on a different thread. Wasmtime's - // thread-local state points to the stack, however, - // meaning that it would be incorrect to leave a pointer - // in TLS when this function returns. This function - // performs a runtime assert to verify that this is the - // case, notably that the one TLS pointer Wasmtime uses - // is not pointing anywhere within the stack. If it is - // then that's a bug indicating that TLS management in - // Wasmtime is incorrect. - Err(()) => { - if let Some(range) = self.fiber().stack().range() { - crate::runtime::vm::AsyncWasmCallState::assert_current_state_not_in_range(range); - } - Poll::Pending - } - } - } - } - } - - // Dropping futures is pretty special in that it means the future has - // been requested to be cancelled. Here we run the risk of dropping an - // in-progress fiber, and if we were to do nothing then the fiber would - // leak all its owned stack resources. - // - // To handle this we implement `Drop` here and, if the fiber isn't done, - // resume execution of the fiber saying "hey please stop you're - // interrupted". Our `Trap` created here (which has the stack trace - // of whomever dropped us) will then get propagated in whatever called - // `block_on`, and the idea is that the trap propagates all the way back - // up to the original fiber start, finishing execution. - // - // We don't actually care about the fiber's return value here (no one's - // around to look at it), we just assert the fiber finished to - // completion. - impl Drop for FiberFuture<'_> { - fn drop(&mut self) { - if self.fiber.is_none() { - return; - } - - if !self.fiber().done() { - let result = self.resume(Err(anyhow!("future dropped"))); - // This resumption with an error should always complete the - // fiber. While it's technically possible for host code to - // catch the trap and re-resume, we'd ideally like to - // signal that to callers that they shouldn't be doing - // that. - debug_assert!(result.is_ok()); - - // Note that `result` is `Ok(r)` where `r` is either - // `Ok(())` or `Err(e)`. If it's an error that's disposed of - // here. It's expected to be a propagation of the `future - // dropped` error created above. - } - - self.fiber_resume_state.take().unwrap().dispose(); - - unsafe { - self.engine - .allocator() - .deallocate_fiber_stack(self.fiber.take().unwrap().into_stack()); - } - } - } - - /// State of the world when a fiber last suspended. - /// - /// This structure represents global state that a fiber clobbers during - /// its execution. For example TLS variables are updated, system - /// resources like MPK masks are updated, etc. The purpose of this - /// structure is to track all of this state and appropriately - /// save/restore it around fiber suspension points. - struct FiberResumeState { - /// Saved list of `CallThreadState` activations that are stored on a - /// fiber stack. - /// - /// This is a linked list that references stack-stored nodes on the - /// fiber stack that is currently suspended. The - /// `AsyncWasmCallState` type documents this more thoroughly but the - /// general gist is that when we this fiber is resumed this linked - /// list needs to be pushed on to the current thread's linked list - /// of activations. - tls: crate::runtime::vm::AsyncWasmCallState, - - /// Saved MPK protection mask, if enabled. - /// - /// When MPK is enabled then executing WebAssembly will modify the - /// processor's current mask of addressable protection keys. This - /// means that our current state may get clobbered when a fiber - /// suspends. To ensure that this function preserves context it - /// will, when MPK is enabled, save the current mask when this - /// function is called and then restore the mask when the function - /// returns (aka the fiber suspends). - mpk: Option, - } - - impl FiberResumeState { - unsafe fn replace(self) -> PriorFiberResumeState { - let tls = self.tls.push(); - let mpk = swap_mpk_states(self.mpk); - PriorFiberResumeState { tls, mpk } - } - - fn dispose(self) { - self.tls.assert_null(); - } - } - - struct PriorFiberResumeState { - tls: crate::runtime::vm::PreviousAsyncWasmCallState, - mpk: Option, - } - - impl PriorFiberResumeState { - unsafe fn replace(self) -> FiberResumeState { - let tls = self.tls.restore(); - let mpk = swap_mpk_states(self.mpk); - FiberResumeState { tls, mpk } - } - } - - fn swap_mpk_states(mask: Option) -> Option { - mask.map(|mask| { - let current = mpk::current_mask(); - mpk::allow(mask); - current - }) - } + fiber::on_fiber(self, func).await } #[cfg(feature = "gc")] @@ -636,39 +234,11 @@ impl StoreOpaque { log::trace!("End trace GC roots") } - /// Yields the async context, assuming that we are executing on a fiber and - /// that fiber is not in the process of dying. This function will return - /// None in the latter case (the fiber is dying), and panic if - /// `async_support()` is false. - #[inline] - pub fn async_cx(&self) -> Option { - assert!(self.async_support()); - - let poll_cx_box_ptr = self.async_state.current_poll_cx.get(); - if poll_cx_box_ptr.is_null() { - return None; - } - - let poll_cx_inner_ptr = unsafe { *poll_cx_box_ptr }; - if poll_cx_inner_ptr.future_context.is_null() { - return None; - } - - Some(AsyncCx { - current_suspend: self.async_state.current_suspend.get(), - current_poll_cx: unsafe { &raw mut (*poll_cx_box_ptr).future_context }, - }) - } - /// Yields execution to the caller on out-of-gas or epoch interruption. /// /// This only works on async futures and stores, and assumes that we're /// executing on a fiber. This will yield execution back to the caller once. pub fn async_yield_impl(&mut self) -> Result<()> { - use crate::runtime::vm::Yield; - - let mut future = Yield::new(); - // When control returns, we have a `Result<()>` passed // in from the host fiber. If this finished successfully then // we were resumed normally via a `poll`, so keep going. If @@ -676,136 +246,44 @@ impl StoreOpaque { // to clean up this fiber. Do so by raising a trap which will // abort all wasm and get caught on the other side to clean // things up. - unsafe { - self.async_cx() - .expect("attempted to pull async context during shutdown") - .block_on(Pin::new_unchecked(&mut future)) - } + self.block_on(|_| Box::pin(crate::runtime::vm::Yield::new())) } - fn allocate_fiber_stack(&mut self) -> Result { - if let Some(stack) = self.async_state.last_fiber_stack.take() { + pub(crate) fn allocate_fiber_stack(&mut self) -> Result { + if let Some(stack) = self.async_state.last_fiber_stack().take() { return Ok(stack); } self.engine().allocator().allocate_fiber_stack() } - fn deallocate_fiber_stack(&mut self, stack: wasmtime_fiber::FiberStack) { + pub(crate) fn deallocate_fiber_stack(&mut self, stack: wasmtime_fiber::FiberStack) { self.flush_fiber_stack(); - self.async_state.last_fiber_stack = Some(stack); + *self.async_state.last_fiber_stack() = Some(stack); } /// Releases the last fiber stack to the underlying instance allocator, if /// present. pub fn flush_fiber_stack(&mut self) { - if let Some(stack) = self.async_state.last_fiber_stack.take() { + if let Some(stack) = self.async_state.last_fiber_stack().take() { unsafe { self.engine.allocator().deallocate_fiber_stack(stack); } } } - - pub(crate) fn async_guard_range(&self) -> Range<*mut u8> { - unsafe { - let ptr = self.async_state.current_poll_cx.get(); - (*ptr).guard_range_start..(*ptr).guard_range_end - } - } } impl StoreContextMut<'_, T> { /// Executes a synchronous computation `func` asynchronously on a new fiber. - pub(crate) async fn on_fiber( + pub(crate) async fn on_fiber( &mut self, - func: impl FnOnce(&mut StoreContextMut<'_, T>) -> R + Send, + func: impl FnOnce(&mut StoreContextMut<'_, T>) -> R + Send + Sync, ) -> Result where T: Send + 'static, { + let token = StoreToken::new(self.as_context_mut()); self.0 - .on_fiber(|opaque| { - let store = unsafe { opaque.traitobj().cast::>().as_mut() }; - func(&mut StoreContextMut(store)) - }) + .on_fiber(|opaque| func(&mut token.as_context_mut(opaque.traitobj_mut()))) .await } } - -pub struct AsyncCx { - current_suspend: *mut *mut wasmtime_fiber::Suspend, (), Result<()>>, - current_poll_cx: *mut *mut Context<'static>, -} - -impl AsyncCx { - /// Blocks on the asynchronous computation represented by `future` and - /// produces the result here, in-line. - /// - /// This function is designed to only work when it's currently executing on - /// a native fiber. This fiber provides the ability for us to handle the - /// future's `Pending` state as "jump back to whomever called the fiber in - /// an asynchronous fashion and propagate `Pending`". This tight coupling - /// with `on_fiber` below is what powers the asynchronicity of calling wasm. - /// Note that the asynchronous part only applies to host functions, wasm - /// itself never really does anything asynchronous at this time. - /// - /// This function takes a `future` and will (appear to) synchronously wait - /// on the result. While this function is executing it will fiber switch - /// to-and-from the original frame calling `on_fiber` which should be a - /// guarantee due to how async stores are configured. - /// - /// The return value here is either the output of the future `T`, or a trap - /// which represents that the asynchronous computation was cancelled. It is - /// not recommended to catch the trap and try to keep executing wasm, so - /// we've tried to liberally document this. - pub unsafe fn block_on(&self, future: F) -> Result - where - F: Future + Send, - { - let mut future = pin!(future); - - // Take our current `Suspend` context which was configured as soon as - // our fiber started. Note that we must load it at the front here and - // save it on our stack frame. While we're polling the future other - // fibers may be started for recursive computations, and the current - // suspend context is only preserved at the edges of the fiber, not - // during the fiber itself. - // - // For a little bit of extra safety we also replace the current value - // with null to try to catch any accidental bugs on our part early. - // This is all pretty unsafe so we're trying to be careful... - // - // Note that there should be a segfaulting test in `async_functions.rs` - // if this `Reset` is removed. - let suspend = *self.current_suspend; - let _reset = Reset(self.current_suspend, suspend); - *self.current_suspend = ptr::null_mut(); - assert!(!suspend.is_null()); - - loop { - let future_result = { - let poll_cx = *self.current_poll_cx; - let _reset = Reset(self.current_poll_cx, poll_cx); - *self.current_poll_cx = ptr::null_mut(); - assert!(!poll_cx.is_null()); - future.as_mut().poll(&mut *poll_cx) - }; - - match future_result { - Poll::Ready(t) => break Ok(t), - Poll::Pending => {} - } - - (*suspend).suspend(())?; - } - } -} - -struct Reset(*mut T, T); - -impl Drop for Reset { - fn drop(&mut self) { - unsafe { - *self.0 = self.1; - } - } -} diff --git a/crates/wasmtime/src/runtime/store/gc.rs b/crates/wasmtime/src/runtime/store/gc.rs index 3171aa420532..9a68729739a7 100644 --- a/crates/wasmtime/src/runtime/store/gc.rs +++ b/crates/wasmtime/src/runtime/store/gc.rs @@ -39,12 +39,7 @@ impl StoreOpaque { if scope.async_support() { #[cfg(feature = "async")] - { - scope - .async_cx() - .expect("attempted to access async context during shutdown") - .block_on(scope.grow_or_collect_gc_heap_async(bytes_needed))?; - } + scope.block_on(|scope| Box::pin(scope.grow_or_collect_gc_heap_async(bytes_needed)))?; } else { scope.grow_or_collect_gc_heap(bytes_needed); } diff --git a/crates/wasmtime/src/runtime/store/token.rs b/crates/wasmtime/src/runtime/store/token.rs new file mode 100644 index 000000000000..b5891da02ac2 --- /dev/null +++ b/crates/wasmtime/src/runtime/store/token.rs @@ -0,0 +1,48 @@ +use crate::runtime::vm::VMStore; +use crate::{StoreContextMut, store::StoreId}; +use core::marker::PhantomData; + +/// Represents a proof that a store with a given `StoreId` has a data type +/// parameter `T`. +/// +/// This may be used to safely convert a `&mut dyn VMStore` into a +/// `StoreContextMut` using `StoreToken::as_context_mut` having witnessed +/// that the type parameter matches what was seen earlier in `StoreToken::new`. +pub struct StoreToken { + id: StoreId, + _phantom: PhantomData T>, +} + +impl Clone for StoreToken { + fn clone(&self) -> Self { + Self { + id: self.id, + _phantom: PhantomData, + } + } +} + +impl Copy for StoreToken {} + +impl StoreToken { + /// Create a new `StoreToken`, witnessing that this store has data type + /// parameter `T`. + pub fn new(store: StoreContextMut) -> Self { + Self { + id: store.0.id(), + _phantom: PhantomData, + } + } + + /// Convert the specified `&mut dyn VMStore` into a `StoreContextMut`. + /// + /// This will panic if passed a store with a different `StoreId` than the + /// one passed to `StoreToken::new` when creating this object. + pub fn as_context_mut<'a>(&self, store: &'a mut dyn VMStore) -> StoreContextMut<'a, T> { + assert_eq!(store.store_opaque().id(), self.id); + // SAFETY: We know the store with this ID has data type parameter `T` + // because we witnessed that in `Self::new`, which is the only way + // `self` could have been safely created: + unsafe { store.unchecked_context_mut::() } + } +} diff --git a/crates/wasmtime/src/runtime/vm.rs b/crates/wasmtime/src/runtime/vm.rs index 2761114298fa..e32167a76b0f 100644 --- a/crates/wasmtime/src/runtime/vm.rs +++ b/crates/wasmtime/src/runtime/vm.rs @@ -182,7 +182,7 @@ cfg_if::cfg_if! { /// APIs using `Store` are correctly inferring send/sync on the returned /// values (e.g. futures) and that internally in the runtime we aren't doing /// anything "weird" with threads for example. -pub unsafe trait VMStore { +pub unsafe trait VMStore: 'static { /// Get a shared borrow of this store's `StoreOpaque`. fn store_opaque(&self) -> &StoreOpaque; diff --git a/crates/wasmtime/src/runtime/vm/sys/unix/machports.rs b/crates/wasmtime/src/runtime/vm/sys/unix/machports.rs index 25bd49ede33c..8bf45c8a5455 100644 --- a/crates/wasmtime/src/runtime/vm/sys/unix/machports.rs +++ b/crates/wasmtime/src/runtime/vm/sys/unix/machports.rs @@ -140,9 +140,8 @@ unsafe extern "C" fn sigbus_handler( None => return, }; let faulting_addr = (*siginfo).si_addr() as usize; - let start = info.async_guard_range.start; - let end = info.async_guard_range.end; - if start as usize <= faulting_addr && faulting_addr < end as usize { + let range = &info.vm_store_context.as_ref().async_guard_range; + if range.start.addr() <= faulting_addr && faulting_addr < range.end.addr() { super::signals::abort_stack_overflow(); } }); diff --git a/crates/wasmtime/src/runtime/vm/sys/unix/signals.rs b/crates/wasmtime/src/runtime/vm/sys/unix/signals.rs index 4bd800ba6ef8..a9f1033337a0 100644 --- a/crates/wasmtime/src/runtime/vm/sys/unix/signals.rs +++ b/crates/wasmtime/src/runtime/vm/sys/unix/signals.rs @@ -169,9 +169,8 @@ unsafe extern "C" fn trap_handler( let jmp_buf = match test { TrapTest::NotWasm => { if let Some(faulting_addr) = faulting_addr { - let start = info.async_guard_range.start; - let end = info.async_guard_range.end; - if start as usize <= faulting_addr && faulting_addr < end as usize { + let range = &info.vm_store_context.as_ref().async_guard_range; + if range.start.addr() <= faulting_addr && faulting_addr < range.end.addr() { abort_stack_overflow(); } } diff --git a/crates/wasmtime/src/runtime/vm/traphandlers.rs b/crates/wasmtime/src/runtime/vm/traphandlers.rs index 4c57fa7a05d1..a96a747efe67 100644 --- a/crates/wasmtime/src/runtime/vm/traphandlers.rs +++ b/crates/wasmtime/src/runtime/vm/traphandlers.rs @@ -23,7 +23,6 @@ use crate::{EntryStoreContext, prelude::*}; use crate::{StoreContextMut, WasmBacktrace}; use core::cell::Cell; use core::num::NonZeroU32; -use core::ops::Range; use core::ptr::{self, NonNull}; pub use self::backtrace::Backtrace; @@ -477,8 +476,6 @@ mod call_thread_state { pub(crate) unwinder: &'static dyn Unwind, pub(super) prev: Cell, - #[cfg(all(has_native_signals, unix))] - pub(crate) async_guard_range: Range<*mut u8>, // The state of the runtime for the *previous* `CallThreadState` for // this same store. Our *current* state is saved in `self.vm_store_context`, @@ -507,10 +504,6 @@ mod call_thread_state { store: &mut StoreOpaque, old_state: *const EntryStoreContext, ) -> CallThreadState { - // Don't try to plumb #[cfg] everywhere for this field, just pretend - // we're using it on miri/windows to silence compiler warnings. - let _: Range<_> = store.async_guard_range(); - CallThreadState { unwind: Cell::new(None), unwinder: store.unwinder(), @@ -521,8 +514,6 @@ mod call_thread_state { #[cfg(feature = "coredump")] capture_coredump: store.engine().config().coredump_on_trap, vm_store_context: store.vm_store_context_ptr(), - #[cfg(all(has_native_signals, unix))] - async_guard_range: store.async_guard_range(), prev: Cell::new(ptr::null()), old_state, } diff --git a/crates/wasmtime/src/runtime/vm/vmcontext.rs b/crates/wasmtime/src/runtime/vm/vmcontext.rs index 9af83b1db8cd..79e0dbf837ac 100644 --- a/crates/wasmtime/src/runtime/vm/vmcontext.rs +++ b/crates/wasmtime/src/runtime/vm/vmcontext.rs @@ -13,6 +13,7 @@ use core::ffi::c_void; use core::fmt; use core::marker; use core::mem::{self, MaybeUninit}; +use core::ops::Range; use core::ptr::{self, NonNull}; use core::sync::atomic::{AtomicUsize, Ordering}; use wasmtime_environ::{ @@ -1114,6 +1115,18 @@ pub struct VMStoreContext { /// Stack information used by stack switching instructions. See documentation /// on `VMStackChain` for details. pub stack_chain: UnsafeCell, + + /// The range, in addresses, of the guard page that is currently in use. + /// + /// This field is used when signal handlers are run to determine whether a + /// faulting address lies within the guard page of an async stack for + /// example. If this happens then the signal handler aborts with a stack + /// overflow message similar to what would happen had the stack overflow + /// happened on the main thread. This field is, by default a null..null + /// range indicating that no async guard is in use (aka no fiber). In such a + /// situation while this field is read it'll never classify a fault as an + /// guard page fault. + pub async_guard_range: Range<*mut u8>, } // The `VMStoreContext` type is a pod-type with no destructor, and we don't @@ -1140,6 +1153,7 @@ impl Default for VMStoreContext { last_wasm_exit_pc: UnsafeCell::new(0), last_wasm_entry_fp: UnsafeCell::new(0), stack_chain: UnsafeCell::new(VMStackChain::Absent), + async_guard_range: ptr::null_mut()..ptr::null_mut(), } } }