diff --git a/linkerd/app/outbound/src/http/logical/policy/route/backend.rs b/linkerd/app/outbound/src/http/logical/policy/route/backend.rs index a0d335c000..2cb2e2a192 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/backend.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/backend.rs @@ -16,6 +16,7 @@ pub(crate) struct Backend { pub(crate) route_ref: RouteRef, pub(crate) concrete: Concrete, pub(crate) filters: Arc<[F]>, + pub(crate) request_timeout: Option, } pub(crate) type MatchedBackend = super::Matched>; @@ -37,6 +38,7 @@ impl Clone for Backend { route_ref: self.route_ref.clone(), filters: self.filters.clone(), concrete: self.concrete.clone(), + request_timeout: self.request_timeout, } } } @@ -107,6 +109,7 @@ where }| concrete, ) .push(filters::NewApplyFilters::::layer()) + .push(http::NewTimeout::layer()) .push(count_reqs::NewCountRequests::layer_via(ExtractMetrics { metrics: metrics.clone(), })) @@ -116,6 +119,12 @@ where } } +impl svc::Param for MatchedBackend { + fn param(&self) -> http::ResponseTimeout { + http::ResponseTimeout(self.params.request_timeout) + } +} + impl filters::Apply for Http { #[inline] fn apply(&self, req: &mut ::http::Request) -> Result<()> { diff --git a/linkerd/app/outbound/src/http/logical/policy/router.rs b/linkerd/app/outbound/src/http/logical/policy/router.rs index cbea887820..e2d39d8a6c 100644 --- a/linkerd/app/outbound/src/http/logical/policy/router.rs +++ b/linkerd/app/outbound/src/http/logical/policy/router.rs @@ -179,6 +179,7 @@ where route_ref: route_ref.clone(), filters, concrete, + request_timeout: rb.request_timeout, } }; diff --git a/linkerd/app/outbound/src/http/logical/tests.rs b/linkerd/app/outbound/src/http/logical/tests.rs index 755fc02995..429b9e8390 100644 --- a/linkerd/app/outbound/src/http/logical/tests.rs +++ b/linkerd/app/outbound/src/http/logical/tests.rs @@ -306,9 +306,9 @@ async fn route_request_timeout() { let (_route_tx, routes) = { let backend = default_backend(&dest); - let mut route = default_route(backend.clone()); - // set the request timeout on the route. - route.rules[0].policy.request_timeout = Some(REQUEST_TIMEOUT); + // Set a request timeout for the route, and no backend request timeout + // on the backend. + let route = timeout_route(backend.clone(), Some(REQUEST_TIMEOUT), None); watch::channel(Routes::Policy(policy::Params::Http(policy::HttpParams { addr: dest.into(), meta: ParentRef(client_policy::Meta::new_default("parent")), @@ -345,6 +345,88 @@ async fn route_request_timeout() { )); } +#[tokio::test(flavor = "current_thread")] +async fn backend_request_timeout() { + tokio::time::pause(); + let _trace = trace::test::trace_init(); + // must be less than the `default_config` failfast timeout, or we'll hit + // that instead. + const ROUTE_REQUEST_TIMEOUT: Duration = std::time::Duration::from_secs(2); + const BACKEND_REQUEST_TIMEOUT: Duration = std::time::Duration::from_secs(1); + + let addr = SocketAddr::new([192, 0, 2, 41].into(), PORT); + let dest: NameAddr = format!("{AUTHORITY}:{PORT}") + .parse::() + .expect("dest addr is valid"); + let (svc, mut handle) = tower_test::mock::pair(); + let connect = HttpConnect::default().service(addr, svc); + let resolve = support::resolver().endpoint_exists(dest.clone(), addr, Default::default()); + let (rt, _shutdown) = runtime(); + let stack = Outbound::new(default_config(), rt) + .with_stack(connect) + .push_http_cached(resolve) + .into_inner(); + + let (_route_tx, routes) = { + let backend = default_backend(&dest); + // Set both a route request timeout and a backend request timeout. + let route = timeout_route( + backend.clone(), + Some(ROUTE_REQUEST_TIMEOUT), + Some(BACKEND_REQUEST_TIMEOUT), + ); + watch::channel(Routes::Policy(policy::Params::Http(policy::HttpParams { + addr: dest.into(), + meta: ParentRef(client_policy::Meta::new_default("parent")), + backends: Arc::new([backend]), + routes: Arc::new([route]), + failure_accrual: client_policy::FailureAccrual::None, + }))) + }; + let target = Target { + num: 1, + version: http::Version::H2, + routes, + }; + let svc = stack.new_service(target); + + handle.allow(1); + let rsp = send_req(svc.clone(), http::Request::get("/")); + serve_req(&mut handle, mk_rsp(StatusCode::OK, "good")).await; + assert_eq!( + rsp.await.expect("request must succeed").status(), + http::StatusCode::OK + ); + + // Now, time out... + let rsp = send_req(svc.clone(), http::Request::get("/")); + // Wait until we actually get the request --- this timeout only starts once + // the service has been acquired. + handle.allow(1); + let (_, send_rsp) = handle + .next_request() + .await + .expect("service must receive request"); + tokio::time::sleep(BACKEND_REQUEST_TIMEOUT + Duration::from_millis(1)).await; + // Still send a response, so that if we didn't hit the backend timeout + // timeout, we don't hit the route timeout and succeed incorrectly. + send_rsp.send_response(mk_rsp(StatusCode::OK, "good")); + let error = rsp.await.expect_err("request must fail with a timeout"); + assert!(errors::is_caused_by::( + error.as_ref() + )); + + // The route request timeout should still apply to time spent before + // the backend is acquired. + let rsp = send_req(svc.clone(), http::Request::get("/")); + tokio::time::sleep(ROUTE_REQUEST_TIMEOUT + Duration::from_millis(1)).await; + handle.allow(1); + let error = rsp.await.expect_err("request must fail with a timeout"); + assert!(errors::is_caused_by::( + error.as_ref() + )); +} + #[derive(Clone, Debug)] struct Target { num: usize, @@ -518,3 +600,33 @@ fn default_route(backend: client_policy::Backend) -> client_policy::http::Route }], } } + +fn timeout_route( + backend: client_policy::Backend, + route_timeout: Option, + backend_timeout: Option, +) -> client_policy::http::Route { + use client_policy::{ + http::{self, Filter, Policy, Route, Rule}, + Meta, RouteBackend, RouteDistribution, + }; + use once_cell::sync::Lazy; + static NO_FILTERS: Lazy> = Lazy::new(|| Arc::new([])); + Route { + hosts: vec![], + rules: vec![Rule { + matches: vec![http::r#match::MatchRequest::default()], + policy: Policy { + meta: Meta::new_default("test_route"), + filters: NO_FILTERS.clone(), + failure_policy: Default::default(), + request_timeout: route_timeout, + distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend { + filters: NO_FILTERS.clone(), + backend, + request_timeout: backend_timeout, + }])), + }, + }], + } +}