Skip to content
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
548e398
Revamp component model stream/future host API (again)
dicej Aug 15, 2025
a734339
Add `Accessor::getter`, rename `with_data` to `with_getter`
alexcrichton Aug 25, 2025
c3204ca
fixup bindgen invocation
rvolosatovs Aug 25, 2025
61c5a24
add support for zero-length writes/reads to/from host
dicej Aug 25, 2025
056f064
add `{Destination,Source}::remaining` methods
dicej Aug 26, 2025
ddb3036
wasi: migrate sockets to new API
rvolosatovs Aug 27, 2025
cae75ff
tests: read the socket stream until EOF
rvolosatovs Aug 27, 2025
99397e7
p3-sockets: account for cancellation
rvolosatovs Aug 28, 2025
7a0b96f
p3-sockets: mostly ensure byte buffer cancellation-safety
rvolosatovs Aug 28, 2025
efdf6d3
p3-filesystem: switch to new API
rvolosatovs Aug 28, 2025
014adb1
fixup! p3-sockets: mostly ensure byte buffer cancellation-safety
rvolosatovs Aug 28, 2025
368a1e4
p3-cli: switch to new API
rvolosatovs Aug 28, 2025
58ef641
p3: limit maximum buffer size
rvolosatovs Aug 28, 2025
ec41ec5
p3-sockets: remove reuseaddr test loop workaround
rvolosatovs Aug 28, 2025
2b6f216
p3: drive I/O in `when_ready`
rvolosatovs Aug 28, 2025
f7ff957
fixup! p3: drive I/O in `when_ready`
rvolosatovs Aug 28, 2025
879be7d
Refine `Stream{Producer,Consumer}` APIs
dicej Aug 29, 2025
2461caf
Merge remote-tracking branch 'origin/main' into stream-future-api-revamp
dicej Sep 2, 2025
ca8a435
being integration of new API
rvolosatovs Sep 3, 2025
00047c3
update wasi/src/p3/filesystem to use new stream API
dicej Sep 3, 2025
0f9f372
update wasi/src/p3/cli to use new stream API
dicej Sep 3, 2025
7d6edb1
fix: remove `'a` bound on `&self`
rvolosatovs Sep 4, 2025
fce49cb
finish `wasi:sockets` adaptation
rvolosatovs Sep 4, 2025
cdd1ce5
finish `wasi:cli` adaptation
rvolosatovs Sep 4, 2025
2554c17
remove redundant loop in sockets
rvolosatovs Sep 4, 2025
9784254
wasi: buffer on 0-length reads
rvolosatovs Sep 4, 2025
55ad074
finish `wasi:filesystem` adaptation
rvolosatovs Sep 4, 2025
4bc782d
remove `MAX_BUFFER_CAPACITY`
rvolosatovs Sep 4, 2025
1197b64
refactor `Cursor` usage
rvolosatovs Sep 4, 2025
4e99221
impl Default for VecBuffer
rvolosatovs Sep 4, 2025
afac783
refactor: use consistent import styling
rvolosatovs Sep 4, 2025
f0dfa55
feature-gate fs Arc accessors
rvolosatovs Sep 4, 2025
7db82ff
Update test expectations
alexcrichton Sep 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 11 additions & 25 deletions crates/misc/component-async-tests/src/resource_stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::util::PipeProducer;
use anyhow::Result;
use wasmtime::component::{
Accessor, AccessorTask, GuardedStreamWriter, Resource, StreamReader, StreamWriter,
};
use futures::channel::mpsc;
use wasmtime::component::{Accessor, Resource, StreamReader};

use super::Ctx;

Expand Down Expand Up @@ -38,29 +38,15 @@ impl bindings::local::local::resource_stream::HostWithStore for Ctx {
accessor: &Accessor<T, Self>,
count: u32,
) -> wasmtime::Result<StreamReader<Resource<ResourceStreamX>>> {
struct Task {
tx: StreamWriter<Resource<ResourceStreamX>>,

count: u32,
}

impl<T> AccessorTask<T, Ctx, Result<()>> for Task {
async fn run(self, accessor: &Accessor<T, Ctx>) -> Result<()> {
let mut tx = GuardedStreamWriter::new(accessor, self.tx);
for _ in 0..self.count {
let item = accessor.with(|mut view| view.get().table.push(ResourceStreamX))?;
tx.write_all(Some(item)).await;
}
Ok(())
accessor.with(|mut access| {
let (mut tx, rx) = mpsc::channel(usize::try_from(count).unwrap());
for _ in 0..count {
tx.try_send(access.get().table.push(ResourceStreamX)?)
.unwrap()
}
}

let (tx, rx) = accessor.with(|mut view| {
let instance = view.instance();
instance.stream(&mut view)
})?;
accessor.spawn(Task { tx, count });
Ok(rx)
let instance = access.instance();
Ok(StreamReader::new(instance, access, PipeProducer::new(rx)))
})
}
}

Expand Down
143 changes: 141 additions & 2 deletions crates/misc/component-async-tests/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
use futures::channel::oneshot;
use std::thread;
use anyhow::Result;
use futures::{Sink, Stream, channel::oneshot};
use std::{
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
thread,
};
use wasmtime::{
StoreContextMut,
component::{
Accessor, Destination, FutureConsumer, FutureProducer, Lift, Lower, Source, StreamConsumer,
StreamProducer, StreamResult,
},
};

pub async fn sleep(duration: std::time::Duration) {
if cfg!(miri) {
Expand All @@ -21,3 +34,129 @@ pub async fn sleep(duration: std::time::Duration) {
tokio::time::sleep(duration).await;
}
}

pub struct PipeProducer<S>(S);

impl<S> PipeProducer<S> {
pub fn new(rx: S) -> Self {
Self(rx)
}
}

impl<D, T: Send + Sync + Lower + 'static, S: Stream<Item = T> + Send + 'static> StreamProducer<D>
for PipeProducer<S>
{
type Item = T;
type Buffer = Option<T>;

fn poll_produce<'a>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
_: StoreContextMut<D>,
destination: &'a mut Destination<'a, Self::Item, Self::Buffer>,
finish: bool,
) -> Poll<Result<StreamResult>> {
// SAFETY: This is a standard pin-projection, and we never move
// out of `self`.
let stream = unsafe { self.map_unchecked_mut(|v| &mut v.0) };

match stream.poll_next(cx) {
Poll::Pending => {
if finish {
Poll::Ready(Ok(StreamResult::Cancelled))
} else {
Poll::Pending
}
}
Poll::Ready(Some(item)) => {
destination.set_buffer(Some(item));
Poll::Ready(Ok(StreamResult::Completed))
}
Poll::Ready(None) => Poll::Ready(Ok(StreamResult::Dropped)),
}
}
}

pub struct PipeConsumer<T, S>(S, PhantomData<fn() -> T>);

impl<T, S> PipeConsumer<T, S> {
pub fn new(tx: S) -> Self {
Self(tx, PhantomData)
}
}

impl<D, T: Lift + 'static, S: Sink<T, Error: std::error::Error + Send + Sync> + Send + 'static>
StreamConsumer<D> for PipeConsumer<T, S>
{
type Item = T;

fn poll_consume(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
store: StoreContextMut<D>,
source: &mut Source<Self::Item>,
finish: bool,
) -> Poll<Result<StreamResult>> {
// SAFETY: This is a standard pin-projection, and we never move
// out of `self`.
let mut sink = unsafe { self.map_unchecked_mut(|v| &mut v.0) };

let on_pending = || {
if finish {
Poll::Ready(Ok(StreamResult::Cancelled))
} else {
Poll::Pending
}
};

match sink.as_mut().poll_flush(cx) {
Poll::Pending => on_pending(),
Poll::Ready(result) => {
result?;
match sink.as_mut().poll_ready(cx) {
Poll::Pending => on_pending(),
Poll::Ready(result) => {
result?;
let item = &mut None;
source.read(store, item)?;
sink.start_send(item.take().unwrap())?;
Poll::Ready(Ok(StreamResult::Completed))
}
}
}
}
}
}

pub struct OneshotProducer<T>(oneshot::Receiver<T>);

impl<T> OneshotProducer<T> {
pub fn new(rx: oneshot::Receiver<T>) -> Self {
Self(rx)
}
}

impl<D, T: Send + 'static> FutureProducer<D> for OneshotProducer<T> {
type Item = T;

async fn produce(self, _: &Accessor<D>) -> Result<T> {
Ok(self.0.await?)
}
}

pub struct OneshotConsumer<T>(oneshot::Sender<T>);

impl<T> OneshotConsumer<T> {
pub fn new(tx: oneshot::Sender<T>) -> Self {
Self(tx)
}
}

impl<D, T: Send + 'static> FutureConsumer<D> for OneshotConsumer<T> {
type Item = T;

async fn consume(self, _: &Accessor<D>, value: T) -> Result<()> {
_ = self.0.send(value);
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ pub async fn test_round_trip(
component_async_tests::round_trip::bindings::RoundTrip::new(&mut store, &instance)?;

if call_style == 0 || !cfg!(miri) {
// Now do it again using `Instance::run_concurrent`:
// Run the test using `Instance::run_concurrent`:
instance
.run_concurrent(&mut store, {
let inputs_and_outputs = inputs_and_outputs
Expand Down
Loading
Loading