Skip to content

Commit 1af8945

Browse files
authored
feat: add ClientBuilder::read_timeout(dur) (#2241)
1 parent e99da85 commit 1af8945

File tree

4 files changed

+258
-32
lines changed

4 files changed

+258
-32
lines changed

src/async_impl/body.rs

Lines changed: 97 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ use std::fmt;
22
use std::future::Future;
33
use std::pin::Pin;
44
use std::task::{Context, Poll};
5+
use std::time::Duration;
56

67
use bytes::Bytes;
78
use http_body::Body as HttpBody;
89
use http_body_util::combinators::BoxBody;
910
//use sync_wrapper::SyncWrapper;
11+
use pin_project_lite::pin_project;
1012
#[cfg(feature = "stream")]
1113
use tokio::fs::File;
1214
use tokio::time::Sleep;
@@ -23,13 +25,26 @@ enum Inner {
2325
Streaming(BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>),
2426
}
2527

26-
/// A body with a total timeout.
27-
///
28-
/// The timeout does not reset upon each chunk, but rather requires the whole
29-
/// body be streamed before the deadline is reached.
30-
pub(crate) struct TotalTimeoutBody<B> {
31-
inner: B,
32-
timeout: Pin<Box<Sleep>>,
28+
pin_project! {
29+
/// A body with a total timeout.
30+
///
31+
/// The timeout does not reset upon each chunk, but rather requires the whole
32+
/// body be streamed before the deadline is reached.
33+
pub(crate) struct TotalTimeoutBody<B> {
34+
#[pin]
35+
inner: B,
36+
timeout: Pin<Box<Sleep>>,
37+
}
38+
}
39+
40+
pin_project! {
41+
pub(crate) struct ReadTimeoutBody<B> {
42+
#[pin]
43+
inner: B,
44+
#[pin]
45+
sleep: Option<Sleep>,
46+
timeout: Duration,
47+
}
3348
}
3449

3550
/// Converts any `impl Body` into a `impl Stream` of just its DATA frames.
@@ -289,23 +304,32 @@ pub(crate) fn total_timeout<B>(body: B, timeout: Pin<Box<Sleep>>) -> TotalTimeou
289304
}
290305
}
291306

307+
pub(crate) fn with_read_timeout<B>(body: B, timeout: Duration) -> ReadTimeoutBody<B> {
308+
ReadTimeoutBody {
309+
inner: body,
310+
sleep: None,
311+
timeout,
312+
}
313+
}
314+
292315
impl<B> hyper::body::Body for TotalTimeoutBody<B>
293316
where
294-
B: hyper::body::Body + Unpin,
317+
B: hyper::body::Body,
295318
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
296319
{
297320
type Data = B::Data;
298321
type Error = crate::Error;
299322

300323
fn poll_frame(
301-
mut self: Pin<&mut Self>,
324+
self: Pin<&mut Self>,
302325
cx: &mut Context,
303326
) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
304-
if let Poll::Ready(()) = self.timeout.as_mut().poll(cx) {
327+
let this = self.project();
328+
if let Poll::Ready(()) = this.timeout.as_mut().poll(cx) {
305329
return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
306330
}
307331
Poll::Ready(
308-
futures_core::ready!(Pin::new(&mut self.inner).poll_frame(cx))
332+
futures_core::ready!(this.inner.poll_frame(cx))
309333
.map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
310334
)
311335
}
@@ -321,22 +345,79 @@ where
321345
}
322346
}
323347

348+
impl<B> hyper::body::Body for ReadTimeoutBody<B>
349+
where
350+
B: hyper::body::Body,
351+
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
352+
{
353+
type Data = B::Data;
354+
type Error = crate::Error;
355+
356+
fn poll_frame(
357+
self: Pin<&mut Self>,
358+
cx: &mut Context,
359+
) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
360+
let mut this = self.project();
361+
362+
// Start the `Sleep` if not active.
363+
let sleep_pinned = if let Some(some) = this.sleep.as_mut().as_pin_mut() {
364+
some
365+
} else {
366+
this.sleep.set(Some(tokio::time::sleep(*this.timeout)));
367+
this.sleep.as_mut().as_pin_mut().unwrap()
368+
};
369+
370+
// Error if the timeout has expired.
371+
if let Poll::Ready(()) = sleep_pinned.poll(cx) {
372+
return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
373+
}
374+
375+
let item = futures_core::ready!(this.inner.poll_frame(cx))
376+
.map(|opt_chunk| opt_chunk.map_err(crate::error::body));
377+
// a ready frame means timeout is reset
378+
this.sleep.set(None);
379+
Poll::Ready(item)
380+
}
381+
382+
#[inline]
383+
fn size_hint(&self) -> http_body::SizeHint {
384+
self.inner.size_hint()
385+
}
386+
387+
#[inline]
388+
fn is_end_stream(&self) -> bool {
389+
self.inner.is_end_stream()
390+
}
391+
}
392+
324393
pub(crate) type ResponseBody =
325394
http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
326395

327396
pub(crate) fn response(
328397
body: hyper::body::Incoming,
329-
timeout: Option<Pin<Box<Sleep>>>,
398+
deadline: Option<Pin<Box<Sleep>>>,
399+
read_timeout: Option<Duration>,
330400
) -> ResponseBody {
331401
use http_body_util::BodyExt;
332402

333-
if let Some(timeout) = timeout {
334-
total_timeout(body, timeout).map_err(Into::into).boxed()
335-
} else {
336-
body.map_err(Into::into).boxed()
403+
match (deadline, read_timeout) {
404+
(Some(total), Some(read)) => {
405+
let body = with_read_timeout(body, read).map_err(box_err);
406+
total_timeout(body, total).map_err(box_err).boxed()
407+
}
408+
(Some(total), None) => total_timeout(body, total).map_err(box_err).boxed(),
409+
(None, Some(read)) => with_read_timeout(body, read).map_err(box_err).boxed(),
410+
(None, None) => body.map_err(box_err).boxed(),
337411
}
338412
}
339413

414+
fn box_err<E>(err: E) -> Box<dyn std::error::Error + Send + Sync>
415+
where
416+
E: Into<Box<dyn std::error::Error + Send + Sync>>,
417+
{
418+
err.into()
419+
}
420+
340421
// ===== impl DataStream =====
341422

342423
impl<B> futures_core::Stream for DataStream<B>

src/async_impl/client.rs

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ struct Config {
108108
auto_sys_proxy: bool,
109109
redirect_policy: redirect::Policy,
110110
referer: bool,
111+
read_timeout: Option<Duration>,
111112
timeout: Option<Duration>,
112113
#[cfg(feature = "__tls")]
113114
root_certs: Vec<Certificate>,
@@ -204,6 +205,7 @@ impl ClientBuilder {
204205
auto_sys_proxy: true,
205206
redirect_policy: redirect::Policy::default(),
206207
referer: true,
208+
read_timeout: None,
207209
timeout: None,
208210
#[cfg(feature = "__tls")]
209211
root_certs: Vec::new(),
@@ -739,6 +741,7 @@ impl ClientBuilder {
739741
headers: config.headers,
740742
redirect_policy: config.redirect_policy,
741743
referer: config.referer,
744+
read_timeout: config.read_timeout,
742745
request_timeout: config.timeout,
743746
proxies,
744747
proxies_maybe_http_auth,
@@ -1028,17 +1031,29 @@ impl ClientBuilder {
10281031

10291032
// Timeout options
10301033

1031-
/// Enables a request timeout.
1034+
/// Enables a total request timeout.
10321035
///
10331036
/// The timeout is applied from when the request starts connecting until the
1034-
/// response body has finished.
1037+
/// response body has finished. Also considered a total deadline.
10351038
///
10361039
/// Default is no timeout.
10371040
pub fn timeout(mut self, timeout: Duration) -> ClientBuilder {
10381041
self.config.timeout = Some(timeout);
10391042
self
10401043
}
10411044

1045+
/// Enables a read timeout.
1046+
///
1047+
/// The timeout applies to each read operation, and resets after a
1048+
/// successful read. This is more appropriate for detecting stalled
1049+
/// connections when the size isn't known beforehand.
1050+
///
1051+
/// Default is no timeout.
1052+
pub fn read_timeout(mut self, timeout: Duration) -> ClientBuilder {
1053+
self.config.read_timeout = Some(timeout);
1054+
self
1055+
}
1056+
10421057
/// Set a timeout for only the connect phase of a `Client`.
10431058
///
10441059
/// Default is `None`.
@@ -1985,11 +2000,17 @@ impl Client {
19852000
}
19862001
};
19872002

1988-
let timeout = timeout
2003+
let total_timeout = timeout
19892004
.or(self.inner.request_timeout)
19902005
.map(tokio::time::sleep)
19912006
.map(Box::pin);
19922007

2008+
let read_timeout_fut = self
2009+
.inner
2010+
.read_timeout
2011+
.map(tokio::time::sleep)
2012+
.map(Box::pin);
2013+
19932014
Pending {
19942015
inner: PendingInner::Request(PendingRequest {
19952016
method,
@@ -2004,7 +2025,9 @@ impl Client {
20042025
client: self.inner.clone(),
20052026

20062027
in_flight,
2007-
timeout,
2028+
total_timeout,
2029+
read_timeout_fut,
2030+
read_timeout: self.inner.read_timeout,
20082031
}),
20092032
}
20102033
}
@@ -2210,6 +2233,7 @@ struct ClientRef {
22102233
redirect_policy: redirect::Policy,
22112234
referer: bool,
22122235
request_timeout: Option<Duration>,
2236+
read_timeout: Option<Duration>,
22132237
proxies: Arc<Vec<Proxy>>,
22142238
proxies_maybe_http_auth: bool,
22152239
https_only: bool,
@@ -2246,6 +2270,10 @@ impl ClientRef {
22462270
if let Some(ref d) = self.request_timeout {
22472271
f.field("timeout", d);
22482272
}
2273+
2274+
if let Some(ref d) = self.read_timeout {
2275+
f.field("read_timeout", d);
2276+
}
22492277
}
22502278
}
22512279

@@ -2277,7 +2305,10 @@ pin_project! {
22772305
#[pin]
22782306
in_flight: ResponseFuture,
22792307
#[pin]
2280-
timeout: Option<Pin<Box<Sleep>>>,
2308+
total_timeout: Option<Pin<Box<Sleep>>>,
2309+
#[pin]
2310+
read_timeout_fut: Option<Pin<Box<Sleep>>>,
2311+
read_timeout: Option<Duration>,
22812312
}
22822313
}
22832314

@@ -2292,8 +2323,12 @@ impl PendingRequest {
22922323
self.project().in_flight
22932324
}
22942325

2295-
fn timeout(self: Pin<&mut Self>) -> Pin<&mut Option<Pin<Box<Sleep>>>> {
2296-
self.project().timeout
2326+
fn total_timeout(self: Pin<&mut Self>) -> Pin<&mut Option<Pin<Box<Sleep>>>> {
2327+
self.project().total_timeout
2328+
}
2329+
2330+
fn read_timeout(self: Pin<&mut Self>) -> Pin<&mut Option<Pin<Box<Sleep>>>> {
2331+
self.project().read_timeout_fut
22972332
}
22982333

22992334
fn urls(self: Pin<&mut Self>) -> &mut Vec<Url> {
@@ -2430,7 +2465,15 @@ impl Future for PendingRequest {
24302465
type Output = Result<Response, crate::Error>;
24312466

24322467
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2433-
if let Some(delay) = self.as_mut().timeout().as_mut().as_pin_mut() {
2468+
if let Some(delay) = self.as_mut().total_timeout().as_mut().as_pin_mut() {
2469+
if let Poll::Ready(()) = delay.poll(cx) {
2470+
return Poll::Ready(Err(
2471+
crate::error::request(crate::error::TimedOut).with_url(self.url.clone())
2472+
));
2473+
}
2474+
}
2475+
2476+
if let Some(delay) = self.as_mut().read_timeout().as_mut().as_pin_mut() {
24342477
if let Poll::Ready(()) = delay.poll(cx) {
24352478
return Poll::Ready(Err(
24362479
crate::error::request(crate::error::TimedOut).with_url(self.url.clone())
@@ -2622,7 +2665,8 @@ impl Future for PendingRequest {
26222665
res,
26232666
self.url.clone(),
26242667
self.client.accepts,
2625-
self.timeout.take(),
2668+
self.total_timeout.take(),
2669+
self.read_timeout,
26262670
);
26272671
return Poll::Ready(Ok(res));
26282672
}

src/async_impl/response.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::fmt;
22
use std::net::SocketAddr;
33
use std::pin::Pin;
4+
use std::time::Duration;
45

56
use bytes::Bytes;
67
use http_body_util::BodyExt;
@@ -37,12 +38,13 @@ impl Response {
3738
res: hyper::Response<hyper::body::Incoming>,
3839
url: Url,
3940
accepts: Accepts,
40-
timeout: Option<Pin<Box<Sleep>>>,
41+
total_timeout: Option<Pin<Box<Sleep>>>,
42+
read_timeout: Option<Duration>,
4143
) -> Response {
4244
let (mut parts, body) = res.into_parts();
4345
let decoder = Decoder::detect(
4446
&mut parts.headers,
45-
super::body::response(body, timeout),
47+
super::body::response(body, total_timeout, read_timeout),
4648
accepts,
4749
);
4850
let res = hyper::Response::from_parts(parts, decoder);

0 commit comments

Comments
 (0)