Skip to content

Commit 8ace694

Browse files
committed
feat(preview1): implement poll_oneoff
Signed-off-by: Roman Volosatovs <[email protected]>
1 parent 96f0083 commit 8ace694

2 files changed

Lines changed: 224 additions & 7 deletions

File tree

crates/test-programs/tests/wasi-preview1-host-in-preview2.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -260,13 +260,10 @@ async fn path_symlink_trailing_slashes() {
260260
run("path_symlink_trailing_slashes", false).await.unwrap()
261261
}
262262
#[test_log::test(tokio::test(flavor = "multi_thread"))]
263-
#[should_panic]
264263
async fn poll_oneoff_files() {
265264
run("poll_oneoff_files", false).await.unwrap()
266265
}
267266
#[test_log::test(tokio::test(flavor = "multi_thread"))]
268-
// This is a known bug with the preview 2 implementation:
269-
#[should_panic]
270267
async fn poll_oneoff_stdio() {
271268
run("poll_oneoff_stdio", true).await.unwrap()
272269
}

crates/wasi/src/preview2/preview1.rs

Lines changed: 224 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::preview2::bindings::cli::{
55
use crate::preview2::bindings::clocks::{monotonic_clock, wall_clock};
66
use crate::preview2::bindings::filesystem::{preopens, types as filesystem};
77
use crate::preview2::bindings::io::streams;
8-
use crate::preview2::bindings::poll;
8+
use crate::preview2::bindings::poll::poll;
99
use crate::preview2::filesystem::TableFsExt;
1010
use crate::preview2::host::filesystem::TableReaddirExt;
1111
use crate::preview2::{bindings, IsATTY, TableError, WasiView};
@@ -69,7 +69,7 @@ impl BlockingMode {
6969
}
7070
async fn write(
7171
&self,
72-
host: &mut (impl streams::Host + poll::poll::Host),
72+
host: &mut (impl streams::Host + poll::Host),
7373
output_stream: streams::OutputStream,
7474
mut bytes: &[u8],
7575
) -> Result<usize, types::Error> {
@@ -1992,15 +1992,235 @@ impl<
19921992
})
19931993
}
19941994

1995-
#[allow(unused_variables)]
19961995
#[instrument(skip(self))]
19971996
async fn poll_oneoff<'a>(
19981997
&mut self,
19991998
subs: &GuestPtr<'a, types::Subscription>,
20001999
events: &GuestPtr<'a, types::Event>,
20012000
nsubscriptions: types::Size,
20022001
) -> Result<types::Size, types::Error> {
2003-
todo!("preview1 poll_oneoff is not implemented")
2002+
if nsubscriptions == 0 {
2003+
// Indefinite sleeping is not supported in preview1.
2004+
return Err(types::Errno::Inval.into());
2005+
}
2006+
let subs = subs.as_array(nsubscriptions);
2007+
let events = events.as_array(nsubscriptions);
2008+
2009+
let n = usize::try_from(nsubscriptions).unwrap_or(usize::MAX);
2010+
let mut pollables = Vec::with_capacity(n);
2011+
for sub in subs.iter() {
2012+
let sub = sub?.read()?;
2013+
let p = match sub.u {
2014+
types::SubscriptionU::Clock(types::SubscriptionClock {
2015+
id,
2016+
timeout,
2017+
flags,
2018+
..
2019+
}) => {
2020+
let absolute = flags.contains(types::Subclockflags::SUBSCRIPTION_CLOCK_ABSTIME);
2021+
let (timeout, absolute) = match id {
2022+
types::Clockid::Monotonic => (timeout, absolute),
2023+
types::Clockid::Realtime if !absolute => (timeout, false),
2024+
types::Clockid::Realtime => {
2025+
let now = wall_clock::Host::now(self)
2026+
.context("failed to call `wall_clock::now`")
2027+
.map_err(types::Error::trap)?;
2028+
2029+
// Convert `timeout` to `Datetime` format.
2030+
let seconds = timeout / 1_000_000_000;
2031+
let nanoseconds = timeout % 1_000_000_000;
2032+
2033+
let timeout = if now.seconds < seconds
2034+
|| now.seconds == seconds
2035+
&& u64::from(now.nanoseconds) < nanoseconds
2036+
{
2037+
// `now` is less than `timeout`, which is expressable as u64,
2038+
// substract the nanosecond counts directly
2039+
now.seconds * 1_000_000_000 + u64::from(now.nanoseconds) - timeout
2040+
} else {
2041+
0
2042+
};
2043+
(timeout, false)
2044+
}
2045+
_ => return Err(types::Errno::Inval.into()),
2046+
};
2047+
monotonic_clock::Host::subscribe(self, timeout, absolute)
2048+
.context("failed to call `monotonic_clock::subscribe`")
2049+
.map_err(types::Error::trap)?
2050+
}
2051+
types::SubscriptionU::FdRead(types::SubscriptionFdReadwrite {
2052+
file_descriptor,
2053+
}) => {
2054+
let desc = self.transact()?.get_descriptor(file_descriptor)?.clone();
2055+
let stream = match desc {
2056+
Descriptor::Stdin { input_stream, .. } => input_stream,
2057+
Descriptor::File(File { fd, position, .. }) if self.table().is_file(fd) => {
2058+
let pos = position.load(Ordering::Relaxed);
2059+
self.read_via_stream(fd, pos).map_err(|e| {
2060+
e.try_into()
2061+
.context("failed to call `read-via-stream`")
2062+
.unwrap_or_else(types::Error::trap)
2063+
})?
2064+
}
2065+
// TODO: Support sockets
2066+
_ => return Err(types::Errno::Badf.into()),
2067+
};
2068+
self.subscribe_to_input_stream(stream)
2069+
.context("failed to call `subscribe-to-input-stream`")
2070+
.map_err(types::Error::trap)?
2071+
}
2072+
types::SubscriptionU::FdWrite(types::SubscriptionFdReadwrite {
2073+
file_descriptor,
2074+
}) => {
2075+
let desc = self.transact()?.get_descriptor(file_descriptor)?.clone();
2076+
let stream = match desc {
2077+
Descriptor::Stdout { output_stream, .. }
2078+
| Descriptor::Stderr { output_stream, .. } => output_stream,
2079+
Descriptor::File(File {
2080+
fd,
2081+
position,
2082+
append,
2083+
..
2084+
}) if self.table().is_file(fd) => {
2085+
let pos = position.load(Ordering::Relaxed);
2086+
if append {
2087+
self.append_via_stream(fd).map_err(|e| {
2088+
e.try_into()
2089+
.context("failed to call `append-via-stream`")
2090+
.unwrap_or_else(types::Error::trap)
2091+
})?
2092+
} else {
2093+
self.write_via_stream(fd, pos).map_err(|e| {
2094+
e.try_into()
2095+
.context("failed to call `write-via-stream`")
2096+
.unwrap_or_else(types::Error::trap)
2097+
})?
2098+
}
2099+
}
2100+
// TODO: Support sockets
2101+
_ => return Err(types::Errno::Badf.into()),
2102+
};
2103+
self.subscribe_to_output_stream(stream)
2104+
.context("failed to call `subscribe-to-output-stream`")
2105+
.map_err(types::Error::trap)?
2106+
}
2107+
};
2108+
pollables.push(p);
2109+
}
2110+
let ready = self
2111+
.poll_oneoff(pollables)
2112+
.await
2113+
.context("failed to call `poll-oneoff`")
2114+
.map_err(types::Error::trap)?;
2115+
2116+
let mut count: types::Size = 0;
2117+
for (sub, event) in subs
2118+
.iter()
2119+
.zip(ready)
2120+
.filter_map(|(sub, ready)| ready.then_some(sub))
2121+
.zip(events.iter())
2122+
{
2123+
let sub = sub?.read()?;
2124+
let event = event?;
2125+
let e = match sub.u {
2126+
types::SubscriptionU::Clock(..) => types::Event {
2127+
userdata: sub.userdata,
2128+
error: types::Errno::Success,
2129+
type_: types::Eventtype::Clock,
2130+
fd_readwrite: types::EventFdReadwrite {
2131+
flags: types::Eventrwflags::empty(),
2132+
nbytes: 0,
2133+
},
2134+
},
2135+
types::SubscriptionU::FdRead(types::SubscriptionFdReadwrite {
2136+
file_descriptor,
2137+
}) => {
2138+
let desc = self.transact()?.get_descriptor(file_descriptor)?.clone();
2139+
match desc {
2140+
Descriptor::Stdin { .. } => types::Event {
2141+
userdata: sub.userdata,
2142+
error: types::Errno::Success,
2143+
type_: types::Eventtype::FdRead,
2144+
fd_readwrite: types::EventFdReadwrite {
2145+
flags: types::Eventrwflags::empty(),
2146+
nbytes: 1,
2147+
},
2148+
},
2149+
Descriptor::File(File { fd, position, .. }) if self.table().is_file(fd) => {
2150+
match self
2151+
.stat(fd)
2152+
.await
2153+
.map_err(|e| e.try_into().context("failed to call `stat`"))
2154+
{
2155+
Ok(filesystem::DescriptorStat { size, .. }) => {
2156+
let pos = position.load(Ordering::Relaxed);
2157+
let nbytes = size.saturating_sub(pos);
2158+
types::Event {
2159+
userdata: sub.userdata,
2160+
error: types::Errno::Success,
2161+
type_: types::Eventtype::FdRead,
2162+
fd_readwrite: types::EventFdReadwrite {
2163+
flags: if nbytes == 0 {
2164+
types::Eventrwflags::FD_READWRITE_HANGUP
2165+
} else {
2166+
types::Eventrwflags::empty()
2167+
},
2168+
nbytes: 1,
2169+
},
2170+
}
2171+
}
2172+
Err(Ok(error)) => types::Event {
2173+
userdata: sub.userdata,
2174+
error,
2175+
type_: types::Eventtype::FdRead,
2176+
fd_readwrite: types::EventFdReadwrite {
2177+
flags: types::Eventrwflags::empty(),
2178+
nbytes: 1,
2179+
},
2180+
},
2181+
Err(Err(error)) => return Err(types::Error::trap(error)),
2182+
}
2183+
}
2184+
// TODO: Support sockets
2185+
_ => return Err(types::Errno::Badf.into()),
2186+
}
2187+
}
2188+
types::SubscriptionU::FdWrite(types::SubscriptionFdReadwrite {
2189+
file_descriptor,
2190+
}) => {
2191+
let desc = self.transact()?.get_descriptor(file_descriptor)?.clone();
2192+
match desc {
2193+
Descriptor::Stdout { .. } | Descriptor::Stderr { .. } => types::Event {
2194+
userdata: sub.userdata,
2195+
error: types::Errno::Success,
2196+
type_: types::Eventtype::FdWrite,
2197+
fd_readwrite: types::EventFdReadwrite {
2198+
flags: types::Eventrwflags::empty(),
2199+
nbytes: 1,
2200+
},
2201+
},
2202+
Descriptor::File(File { fd, .. }) if self.table().is_file(fd) => {
2203+
types::Event {
2204+
userdata: sub.userdata,
2205+
error: types::Errno::Success,
2206+
type_: types::Eventtype::FdWrite,
2207+
fd_readwrite: types::EventFdReadwrite {
2208+
flags: types::Eventrwflags::empty(),
2209+
nbytes: 1,
2210+
},
2211+
}
2212+
}
2213+
// TODO: Support sockets
2214+
_ => return Err(types::Errno::Badf.into()),
2215+
}
2216+
}
2217+
};
2218+
event.write(e)?;
2219+
count = count
2220+
.checked_add(1)
2221+
.ok_or_else(|| types::Error::from(types::Errno::Overflow))?
2222+
}
2223+
Ok(count)
20042224
}
20052225

20062226
#[instrument(skip(self))]

0 commit comments

Comments
 (0)