Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
548e398
Revamp component model stream/future host API (again)
dicej Aug 15, 2025
a734339
Add `Accessor::getter`, rename `with_data` to `with_getter`
alexcrichton Aug 25, 2025
c3204ca
fixup bindgen invocation
rvolosatovs Aug 25, 2025
61c5a24
add support for zero-length writes/reads to/from host
dicej Aug 25, 2025
056f064
add `{Destination,Source}::remaining` methods
dicej Aug 26, 2025
ddb3036
wasi: migrate sockets to new API
rvolosatovs Aug 27, 2025
cae75ff
tests: read the socket stream until EOF
rvolosatovs Aug 27, 2025
99397e7
p3-sockets: account for cancellation
rvolosatovs Aug 28, 2025
7a0b96f
p3-sockets: mostly ensure byte buffer cancellation-safety
rvolosatovs Aug 28, 2025
efdf6d3
p3-filesystem: switch to new API
rvolosatovs Aug 28, 2025
014adb1
fixup! p3-sockets: mostly ensure byte buffer cancellation-safety
rvolosatovs Aug 28, 2025
368a1e4
p3-cli: switch to new API
rvolosatovs Aug 28, 2025
58ef641
p3: limit maximum buffer size
rvolosatovs Aug 28, 2025
ec41ec5
p3-sockets: remove reuseaddr test loop workaround
rvolosatovs Aug 28, 2025
2b6f216
p3: drive I/O in `when_ready`
rvolosatovs Aug 28, 2025
f7ff957
fixup! p3: drive I/O in `when_ready`
rvolosatovs Aug 28, 2025
879be7d
Refine `Stream{Producer,Consumer}` APIs
dicej Aug 29, 2025
2461caf
Merge remote-tracking branch 'origin/main' into stream-future-api-revamp
dicej Sep 2, 2025
ca8a435
being integration of new API
rvolosatovs Sep 3, 2025
00047c3
update wasi/src/p3/filesystem to use new stream API
dicej Sep 3, 2025
0f9f372
update wasi/src/p3/cli to use new stream API
dicej Sep 3, 2025
7d6edb1
fix: remove `'a` bound on `&self`
rvolosatovs Sep 4, 2025
fce49cb
finish `wasi:sockets` adaptation
rvolosatovs Sep 4, 2025
cdd1ce5
finish `wasi:cli` adaptation
rvolosatovs Sep 4, 2025
2554c17
remove redundant loop in sockets
rvolosatovs Sep 4, 2025
9784254
wasi: buffer on 0-length reads
rvolosatovs Sep 4, 2025
55ad074
finish `wasi:filesystem` adaptation
rvolosatovs Sep 4, 2025
4bc782d
remove `MAX_BUFFER_CAPACITY`
rvolosatovs Sep 4, 2025
1197b64
refactor `Cursor` usage
rvolosatovs Sep 4, 2025
4e99221
impl Default for VecBuffer
rvolosatovs Sep 4, 2025
afac783
refactor: use consistent import styling
rvolosatovs Sep 4, 2025
f0dfa55
feature-gate fs Arc accessors
rvolosatovs Sep 4, 2025
7db82ff
Update test expectations
alexcrichton Sep 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 74 additions & 110 deletions crates/wasi/src/p3/cli/host.rs
Original file line number Diff line number Diff line change
@@ -1,155 +1,119 @@
use crate::I32Exit;
use crate::cli::{IsTerminal, WasiCli, WasiCliCtxView};
use crate::p3::DEFAULT_BUFFER_CAPACITY;
use crate::p3::bindings::cli::{
environment, exit, stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr,
terminal_stdin, terminal_stdout,
};
use crate::p3::cli::{TerminalInput, TerminalOutput};
use crate::p3::{DEFAULT_BUFFER_CAPACITY, MAX_BUFFER_CAPACITY};
use anyhow::{Context as _, anyhow};
use bytes::BytesMut;
use core::pin::Pin;
use core::task::{Context, Poll};
use std::io::Cursor;
use std::pin::Pin;
use std::task::{self, Context, Poll};
use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, ReadBuf};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use wasmtime::StoreContextMut;
use wasmtime::component::{
Accessor, Destination, Resource, Source, StreamConsumer, StreamProducer, StreamReader,
StreamResult,
};

struct InputStreamProducer<T> {
rx: T,
struct InputStreamProducer {
rx: Pin<Box<dyn AsyncRead + Send + Sync>>,
}

impl<T, D> StreamProducer<D> for InputStreamProducer<T>
where
T: AsyncRead + Send + Unpin + 'static,
{
impl<D> StreamProducer<D> for InputStreamProducer {
type Item = u8;
type Buffer = Cursor<BytesMut>;

fn poll_produce<'a>(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
store: StoreContextMut<'a, D>,
destination: &'a mut Destination<'a, Self::Item, Self::Buffer>,
dst: &'a mut Destination<'a, Self::Item, Self::Buffer>,
finish: bool,
) -> Poll<wasmtime::Result<StreamResult>> {
if finish {
return Poll::Ready(Ok(StreamResult::Cancelled));
}

let me = self.get_mut();

Poll::Ready(Ok(
if let Some(mut destination) = destination.as_direct_destination(store)
&& !destination.remaining().is_empty()
{
let mut buffer = ReadBuf::new(destination.remaining());
match task::ready!(Pin::new(&mut me.rx).poll_read(cx, &mut buffer)) {
Ok(()) => {
if buffer.filled().is_empty() {
StreamResult::Dropped
} else {
let count = buffer.filled().len();
destination.mark_written(count);
StreamResult::Completed
}
}
Err(_) => {
// TODO: Report the error to the guest
StreamResult::Dropped
}
if let Some(mut dst) = dst.as_direct_destination(store) {
let mut buf = ReadBuf::new(dst.remaining());
match self.rx.as_mut().poll_read(cx, &mut buf) {
Poll::Ready(Ok(())) if buf.capacity() == 0 => {
Poll::Ready(Ok(StreamResult::Completed))
}
} else {
let capacity = destination
.remaining(store)
.unwrap_or(DEFAULT_BUFFER_CAPACITY)
// In the case of small or zero-length reads, we read more than
// was asked for; this will save the runtime from having to
// block or call `poll_produce` on subsequent reads. See the
// documentation for `StreamProducer::poll_produce` for details.
.max(DEFAULT_BUFFER_CAPACITY)
.min(MAX_BUFFER_CAPACITY);

let mut buffer = destination.take_buffer().into_inner();
buffer.clear();
buffer.reserve(capacity);

let mut readbuf = ReadBuf::uninit(buffer.spare_capacity_mut());
let result = Pin::new(&mut me.rx).poll_read(cx, &mut readbuf);
let count = readbuf.filled().len();
// SAFETY: `ReadyBuf::filled` promised us `count` bytes have
// been initialized.
unsafe {
buffer.set_len(count);
Poll::Ready(Ok(())) if buf.filled().is_empty() => {
Poll::Ready(Ok(StreamResult::Dropped))
}

destination.set_buffer(Cursor::new(buffer));

match task::ready!(result) {
Ok(()) => {
if count == 0 {
StreamResult::Dropped
} else {
StreamResult::Completed
}
}
Err(_) => {
// TODO: Report the error to the guest
StreamResult::Dropped
}
Poll::Ready(Ok(())) => {
let n = buf.filled().len();
dst.mark_written(n);
Poll::Ready(Ok(StreamResult::Completed))
}
Poll::Ready(Err(..)) => {
// TODO: Report the error to the guest
Poll::Ready(Ok(StreamResult::Dropped))
}
Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
Poll::Pending => Poll::Pending,
}
} else {
let mut buf = dst.take_buffer();
debug_assert!(buf.get_ref().is_empty());
buf.get_mut().reserve(DEFAULT_BUFFER_CAPACITY);
let mut rbuf = ReadBuf::uninit(buf.get_mut().spare_capacity_mut());
match self.rx.as_mut().poll_read(cx, &mut rbuf) {
Poll::Ready(Ok(())) if rbuf.filled().is_empty() => {
Poll::Ready(Ok(StreamResult::Dropped))
}
Poll::Ready(Ok(())) => {
let n = rbuf.filled().len();
// SAFETY: `ReadyBuf::filled` promised us `count` bytes have
// been initialized.
unsafe { buf.get_mut().set_len(n) };
dst.set_buffer(buf);
Poll::Ready(Ok(StreamResult::Completed))
}
Poll::Ready(Err(..)) => {
// TODO: Report the error to the guest
Poll::Ready(Ok(StreamResult::Dropped))
}
},
))
Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
Poll::Pending => Poll::Pending,
}
}
}
}

struct OutputStreamConsumer<T> {
tx: T,
struct OutputStreamConsumer {
tx: Pin<Box<dyn AsyncWrite + Send + Sync>>,
}

impl<T, D> StreamConsumer<D> for OutputStreamConsumer<T>
where
T: AsyncWrite + Send + Unpin + 'static,
{
impl<D> StreamConsumer<D> for OutputStreamConsumer {
type Item = u8;

fn poll_consume(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
mut store: StoreContextMut<D>,
source: &mut Source<Self::Item>,
store: StoreContextMut<D>,
src: &mut Source<Self::Item>,
finish: bool,
) -> Poll<wasmtime::Result<StreamResult>> {
let me = self.get_mut();

let mut source = source.as_direct_source(store);

let (mut count, mut result) = if !source.remaining().is_empty() {
match task::ready!(Pin::new(&mut me.tx).poll_write(cx, source.remaining())) {
Ok(count) => (count, StreamResult::Completed),
Err(_) => {
// TODO: Report the error to the guest
(0, StreamResult::Dropped)
}
let mut src = src.as_direct_source(store);
let buf = src.remaining();
match self.tx.as_mut().poll_write(cx, buf) {
Poll::Ready(Ok(n)) if buf.is_empty() => {
debug_assert_eq!(n, 0);
Poll::Ready(Ok(StreamResult::Completed))
}
} else {
(0, StreamResult::Completed)
};

if task::ready!(Pin::new(&mut me.tx).poll_flush(cx)).is_err() {
// TODO: Report the error to the guest
count = 0;
result = StreamResult::Dropped;
}

if count > 0 {
source.mark_read(count);
Poll::Ready(Ok(n)) => {
src.mark_read(n);
Poll::Ready(Ok(StreamResult::Completed))
}
Poll::Ready(Err(..)) => {
// TODO: Report the error to the guest
Poll::Ready(Ok(StreamResult::Dropped))
}
Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
Poll::Pending => Poll::Pending,
}

Poll::Ready(Ok(result))
}
}

Expand Down
Loading