Skip to content

Commit bfd1d9d

Browse files
wip: feat(wasm): allow streaming incoming body
Signed-off-by: Brooks Townsend <[email protected]>
1 parent b494dff commit bfd1d9d

File tree

3 files changed

+28
-28
lines changed

3 files changed

+28
-28
lines changed

examples/wasm_component/src/lib.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,12 @@ impl wasi::exports::http::incoming_handler::Guest for ReqwestComponent {
1515
.expect("should be able to get response body");
1616
ResponseOutparam::set(response_out, Ok(response));
1717

18-
let response =
18+
let mut response =
1919
futures::executor::block_on(reqwest::Client::new().get("https://hyper.rs").send())
2020
.expect("should get response bytes");
21-
let incoming_body = response.bytes_stream().expect("should get incoming body");
22-
let stream = incoming_body.stream().expect("should get bytes stream");
21+
let body_stream = response.bytes_stream().expect("should get incoming body");
2322
stream_input_to_output(
24-
stream,
23+
body_stream,
2524
response_body
2625
.write()
2726
.expect("should be able to write to response body"),

src/wasm/component/client/future.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ use std::{
55

66
use futures_core::Future;
77
use wasi::{
8-
self,
9-
http::{
10-
outgoing_handler::{FutureIncomingResponse, OutgoingRequest},
11-
types::{OutgoingBody, OutputStream},
12-
},
8+
self,
9+
http::{
10+
outgoing_handler::{FutureIncomingResponse, OutgoingRequest},
11+
types::{OutgoingBody, OutputStream},
12+
},
1313
};
1414

1515
use crate::{Body, Request, Response};
@@ -69,6 +69,10 @@ impl Future for ResponseFuture {
6969
},
7070
RequestState::Response(future) => {
7171
if !future.subscribe().ready() {
72+
// NOTE(brooksmtownsend): We shouldn't be waking here since we don't know that
73+
// the future is ready to be polled again. Sleeping for a nanosecond appears to
74+
// allow the future to be polled again without causing a busy loop.
75+
std::thread::sleep(std::time::Duration::from_nanos(1));
7276
cx.waker().wake_by_ref();
7377
return Poll::Pending;
7478
}

src/wasm/component/response.rs

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ pub struct Response {
1313
// Boxed to save space (11 words to 1 word), and it's not accessed
1414
// frequently internally.
1515
url: Box<Url>,
16+
// The incoming body must be persisted if streaming to keep the stream open
17+
incoming_body: Option<wasi::http::types::IncomingBody>,
1618
}
1719

1820
impl Response {
@@ -23,6 +25,7 @@ impl Response {
2325
Response {
2426
http: res,
2527
url: Box::new(url),
28+
incoming_body: None,
2629
}
2730
}
2831

@@ -121,27 +124,21 @@ impl Response {
121124
Ok(body.into())
122125
}
123126

124-
/// Convert the response into a `Stream` of `Bytes` from the body.
127+
/// Convert the response into a [`wasi::http::types::IncomingBody`] resource which can
128+
/// then be used to stream the body.
125129
#[cfg(feature = "stream")]
126-
pub fn bytes_stream(self) -> impl futures_core::Stream<Item = crate::Result<Bytes>> {
127-
let web_response = self.http.into_body();
128-
let abort = self._abort;
129-
let body = web_response
130+
pub fn bytes_stream(&mut self) -> crate::Result<wasi::io::streams::InputStream> {
131+
let body = self
132+
.http
130133
.body()
131-
.expect("could not create wasm byte stream");
132-
let body = wasm_streams::ReadableStream::from_raw(body.unchecked_into());
133-
Box::pin(body.into_stream().map(move |buf_js| {
134-
// Keep the abort guard alive as long as this stream is.
135-
let _abort = &abort;
136-
let buffer = Uint8Array::new(
137-
&buf_js
138-
.map_err(crate::error::wasm)
139-
.map_err(crate::error::decode)?,
140-
);
141-
let mut bytes = vec![0; buffer.length() as usize];
142-
buffer.copy_to(&mut bytes);
143-
Ok(bytes.into())
144-
}))
134+
.consume()
135+
.map_err(|_| crate::error::decode("failed to consume response body"))?;
136+
137+
let stream = body
138+
.stream()
139+
.map_err(|_| crate::error::decode("failed to stream response body"));
140+
self.incoming_body = Some(body);
141+
stream
145142
}
146143

147144
// util methods

0 commit comments

Comments
 (0)