Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1892,9 +1892,9 @@ dependencies = [

[[package]]
name = "linkerd2-proxy-api"
version = "0.9.0"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c5191a6b6a0d97519b4746c09a5e92cb9f586cb808d1828f6d7f9889e9ba24d"
checksum = "597facef5c3f12aece4d18a5e3dbba88288837b0b5d8276681d063e4c9b98a14"
dependencies = [
"h2",
"http",
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/inbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ linkerd-meshtls = { path = "../../meshtls", optional = true }
linkerd-meshtls-rustls = { path = "../../meshtls/rustls", optional = true }
linkerd-proxy-client-policy = { path = "../../proxy/client-policy" }
linkerd-tonic-watch = { path = "../../tonic-watch" }
linkerd2-proxy-api = { version = "0.9", features = ["inbound"] }
linkerd2-proxy-api = { version = "0.10", features = ["inbound"] }
once_cell = "1"
parking_lot = "0.12"
rangemap = "1"
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ ipnet = "2"
linkerd-app = { path = "..", features = ["allow-loopback"] }
linkerd-app-core = { path = "../core" }
linkerd-metrics = { path = "../../metrics", features = ["test_util"] }
linkerd2-proxy-api = { version = "0.9", features = [
linkerd2-proxy-api = { version = "0.10", features = [
"destination",
"arbitrary",
] }
Expand Down
2 changes: 2 additions & 0 deletions linkerd/app/integration/src/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ pub fn outbound_default_http_route(dst: impl ToString) -> outbound::HttpRoute {
}],
filters: Vec::new(),
backends: Some(http_first_available(std::iter::once(backend(dst)))),
request_timeout: None,
}],
}
}
Expand Down Expand Up @@ -214,6 +215,7 @@ pub fn http_first_available(
.map(|backend| http_route::RouteBackend {
backend: Some(backend),
filters: Vec::new(),
request_timeout: None,
})
.collect(),
},
Expand Down
5 changes: 5 additions & 0 deletions linkerd/app/integration/src/tests/client_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ async fn header_based_routing() {
backends: Some(policy::http_first_available(std::iter::once(
policy::backend(dst),
))),
request_timeout: None,
};

let route = outbound::HttpRoute {
Expand All @@ -236,6 +237,7 @@ async fn header_based_routing() {
backends: Some(policy::http_first_available(std::iter::once(
policy::backend(&dst_world),
))),
request_timeout: None,
},
// x-hello-city: sf | x-hello-city: san francisco
mk_header_rule(
Expand Down Expand Up @@ -398,6 +400,8 @@ async fn path_based_routing() {
backends: Some(policy::http_first_available(std::iter::once(
policy::backend(dst),
))),

request_timeout: None,
};

let route = outbound::HttpRoute {
Expand All @@ -411,6 +415,7 @@ async fn path_based_routing() {
backends: Some(policy::http_first_available(std::iter::once(
policy::backend(&dst_world),
))),
request_timeout: None,
},
// /goodbye/*
mk_path_rule(
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ahash = "0.8"
bytes = "1"
http = "0.2"
futures = { version = "0.3", default-features = false }
linkerd2-proxy-api = { version = "0.9", features = ["outbound"] }
linkerd2-proxy-api = { version = "0.10", features = ["outbound"] }
linkerd-app-core = { path = "../core" }
linkerd-app-test = { path = "../test", optional = true }
linkerd-distribute = { path = "../../distribute" }
Expand Down
4 changes: 4 additions & 0 deletions linkerd/app/outbound/src/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,12 @@ pub fn synthesize_forward_policy(
meta: meta.clone(),
filters: NO_OPAQ_FILTERS.clone(),
failure_policy: Default::default(),
request_timeout: None,
distribution: policy::RouteDistribution::FirstAvailable(Arc::new([
policy::RouteBackend {
filters: NO_OPAQ_FILTERS.clone(),
backend: backend.clone(),
request_timeout: None,
},
])),
}),
Expand All @@ -223,10 +225,12 @@ pub fn synthesize_forward_policy(
meta: meta.clone(),
filters: NO_HTTP_FILTERS.clone(),
failure_policy: Default::default(),
request_timeout: None,
distribution: policy::RouteDistribution::FirstAvailable(Arc::new([
policy::RouteBackend {
filters: NO_HTTP_FILTERS.clone(),
backend: backend.clone(),
request_timeout: None,
},
])),
},
Expand Down
9 changes: 9 additions & 0 deletions linkerd/app/outbound/src/http/logical/policy/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub(crate) struct Route<T, F, E> {
pub(super) filters: Arc<[F]>,
pub(super) distribution: BackendDistribution<T, F>,
pub(super) failure_policy: E,
pub(super) request_timeout: Option<std::time::Duration>,
}

pub(crate) type MatchedRoute<T, M, F, E> = Matched<M, Route<T, F, E>>;
Expand Down Expand Up @@ -111,6 +112,8 @@ where
.push_on_service(svc::LoadShed::layer())
// TODO(ver) attach the `E` typed failure policy to requests.
.push(filters::NewApplyFilters::<Self, _, _>::layer())
// Sets an optional request timeout.
.push(http::NewTimeout::layer())
.push(classify::NewClassify::layer())
.push(svc::ArcNewService::layer())
.into_inner()
Expand All @@ -124,6 +127,12 @@ impl<T: Clone, M, F, E> svc::Param<BackendDistribution<T, F>> for MatchedRoute<T
}
}

impl<T, M, F, E> svc::Param<http::timeout::ResponseTimeout> for MatchedRoute<T, M, F, E> {
fn param(&self) -> http::timeout::ResponseTimeout {
http::timeout::ResponseTimeout(self.params.request_timeout)
}
}

impl<T> filters::Apply for Http<T> {
#[inline]
fn apply<B>(&self, req: &mut ::http::Request<B>) -> Result<()> {
Expand Down
2 changes: 2 additions & 0 deletions linkerd/app/outbound/src/http/logical/policy/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ where
filters,
distribution,
failure_policy,
request_timeout,
}| {
let route_ref = RouteRef(meta);
let distribution = mk_distribution(&route_ref, &distribution);
Expand All @@ -214,6 +215,7 @@ where
filters,
failure_policy,
distribution,
request_timeout,
}
};

Expand Down
4 changes: 4 additions & 0 deletions linkerd/app/outbound/src/http/logical/policy/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ async fn header_based_route() {
}),
filters: Arc::new([]),
failure_policy: Default::default(),
request_timeout: None,
distribution: policy::RouteDistribution::FirstAvailable(Arc::new([policy::RouteBackend {
filters: Arc::new([]),
backend,
request_timeout: None,
}])),
};

Expand Down Expand Up @@ -197,6 +199,7 @@ async fn http_filter_request_headers() {
policy: policy::RoutePolicy {
meta: policy::Meta::new_default("turtles"),
failure_policy: Default::default(),
request_timeout: None,
filters: Arc::new([policy::http::Filter::RequestHeaders(
policy::http::filter::ModifyHeader {
add: vec![(PIZZA.clone(), TUBULAR.clone())],
Expand All @@ -212,6 +215,7 @@ async fn http_filter_request_headers() {
..Default::default()
},
)]),
request_timeout: None,
},
])),
},
Expand Down
62 changes: 62 additions & 0 deletions linkerd/app/outbound/src/http/logical/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,66 @@ async fn balancer_doesnt_select_tripped_breakers() {
}
}

#[tokio::test(flavor = "current_thread")]
async fn route_request_timeout() {
tokio::time::pause();
let _trace = trace::test::trace_init();
const REQUEST_TIMEOUT: Duration = std::time::Duration::from_secs(2);

let addr = SocketAddr::new([192, 0, 2, 41].into(), PORT);
let dest: NameAddr = format!("{AUTHORITY}:{PORT}")
.parse::<NameAddr>()
.expect("dest addr is valid");
let (svc, mut handle) = tower_test::mock::pair();
let connect = HttpConnect::default().service(addr, svc);
let resolve = support::resolver().endpoint_exists(dest.clone(), addr, Default::default());
let (rt, _shutdown) = runtime();
let stack = Outbound::new(default_config(), rt)
.with_stack(connect)
.push_http_cached(resolve)
.into_inner();

let (_route_tx, routes) = {
let backend = default_backend(&dest);
let mut route = default_route(backend.clone());
// set the request timeout on the route.
route.rules[0].policy.request_timeout = Some(REQUEST_TIMEOUT);
watch::channel(Routes::Policy(policy::Params::Http(policy::HttpParams {
addr: dest.into(),
meta: ParentRef(client_policy::Meta::new_default("parent")),
backends: Arc::new([backend]),
routes: Arc::new([route]),
failure_accrual: client_policy::FailureAccrual::None,
})))
};
let target = Target {
num: 1,
version: http::Version::H2,
routes,
};
let svc = stack.new_service(target);

handle.allow(1);
let rsp = send_req(svc.clone(), http::Request::get("/"));
serve_req(&mut handle, mk_rsp(StatusCode::OK, "good")).await;
assert_eq!(
rsp.await.expect("request must succeed").status(),
http::StatusCode::OK
);

// now, time out...
let rsp = send_req(svc.clone(), http::Request::get("/"));
tokio::time::sleep(REQUEST_TIMEOUT).await;
let error = rsp.await.expect_err("request must fail with a timeout");
assert!(
error.is::<LogicalError>(),
"error must originate in the logical stack"
);
assert!(errors::is_caused_by::<http::timeout::ResponseTimeoutError>(
error.as_ref()
));
}

#[derive(Clone, Debug)]
struct Target {
num: usize,
Expand Down Expand Up @@ -448,9 +508,11 @@ fn default_route(backend: client_policy::Backend) -> client_policy::http::Route
meta: Meta::new_default("test_route"),
filters: NO_FILTERS.clone(),
failure_policy: Default::default(),
request_timeout: None,
distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend {
filters: NO_FILTERS.clone(),
backend,
request_timeout: None,
}])),
},
}],
Expand Down
4 changes: 4 additions & 0 deletions linkerd/app/test/src/resolver/client_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@ impl ClientPolicies {
meta: Meta::new_default("default"),
filters: Arc::new([]),
failure_policy: Default::default(),
request_timeout: None,
distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend {
filters: Arc::new([]),
backend: backend.clone(),
request_timeout: None,
}])),
},
}],
Expand All @@ -96,9 +98,11 @@ impl ClientPolicies {
meta: Meta::new_default("default"),
filters: Arc::new([]),
failure_policy: Default::default(),
request_timeout: None,
distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend {
filters: Arc::new([]),
backend: backend.clone(),
request_timeout: None,
}])),
}),
},
Expand Down
2 changes: 1 addition & 1 deletion linkerd/http-route/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ tracing = "0.1"
url = "2"

[dependencies.linkerd2-proxy-api]
version = "0.9"
version = "0.10"
features = ["http-route", "grpc-route"]
optional = true

Expand Down
2 changes: 1 addition & 1 deletion linkerd/proxy/api-resolve/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async-stream = "0.3"
futures = { version = "0.3", default-features = false }
linkerd-addr = { path = "../../addr" }
linkerd-error = { path = "../../error" }
linkerd2-proxy-api = { version = "0.9", features = ["destination"] }
linkerd2-proxy-api = { version = "0.10", features = ["destination"] }
linkerd-proxy-core = { path = "../core" }
linkerd-stack = { path = "../../stack" }
linkerd-tls = { path = "../../tls" }
Expand Down
2 changes: 1 addition & 1 deletion linkerd/proxy/client-policy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ proto = [
ahash = "0.8"
ipnet = "2"
http = "0.2"
linkerd2-proxy-api = { version = "0.9", optional = true, features = [
linkerd2-proxy-api = { version = "0.10", optional = true, features = [
"outbound",
] }
linkerd-error = { path = "../../error" }
Expand Down
17 changes: 15 additions & 2 deletions linkerd/proxy/client-policy/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub fn default(distribution: crate::RouteDistribution<Filter>) -> Route {
filters: Arc::new([]),
distribution,
failure_policy: Codes::default(),
request_timeout: None,
},
}],
}
Expand Down Expand Up @@ -101,6 +102,7 @@ pub mod proto {
r#match::host::{proto::InvalidHostMatch, MatchHost},
},
};
use std::time::Duration;

#[derive(Debug, thiserror::Error)]
pub enum InvalidGrpcRoute {
Expand All @@ -124,6 +126,9 @@ pub mod proto {

#[error("invalid failure accrual policy: {0}")]
Breaker(#[from] InvalidFailureAccrual),

#[error("invalid duration: {0}")]
Duration(#[from] prost_types::DurationError),
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -198,6 +203,7 @@ pub mod proto {
matches,
backends,
filters,
request_timeout,
} = proto;

let matches = matches
Expand All @@ -214,13 +220,16 @@ pub mod proto {
.ok_or(InvalidGrpcRoute::Missing("distribution"))?
.try_into()?;

let request_timeout = request_timeout.map(Duration::try_from).transpose()?;

Ok(Rule {
matches,
policy: Policy {
meta: meta.clone(),
filters,
distribution,
failure_policy: Codes::default(),
request_timeout,
},
})
}
Expand Down Expand Up @@ -270,10 +279,14 @@ pub mod proto {
impl TryFrom<grpc_route::RouteBackend> for RouteBackend<Filter> {
type Error = InvalidBackend;
fn try_from(
grpc_route::RouteBackend { backend, filters }: grpc_route::RouteBackend,
grpc_route::RouteBackend {
backend,
filters,
request_timeout,
}: grpc_route::RouteBackend,
) -> Result<RouteBackend<Filter>, InvalidBackend> {
let backend = backend.ok_or(InvalidBackend::Missing("backend"))?;
RouteBackend::try_from_proto(backend, filters)
RouteBackend::try_from_proto(backend, filters, request_timeout)
}
}

Expand Down
Loading