Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1892,9 +1892,9 @@ dependencies = [

[[package]]
name = "linkerd2-proxy-api"
version = "0.9.0"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c5191a6b6a0d97519b4746c09a5e92cb9f586cb808d1828f6d7f9889e9ba24d"
checksum = "597facef5c3f12aece4d18a5e3dbba88288837b0b5d8276681d063e4c9b98a14"
dependencies = [
"h2",
"http",
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/inbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ linkerd-meshtls = { path = "../../meshtls", optional = true }
linkerd-meshtls-rustls = { path = "../../meshtls/rustls", optional = true }
linkerd-proxy-client-policy = { path = "../../proxy/client-policy" }
linkerd-tonic-watch = { path = "../../tonic-watch" }
linkerd2-proxy-api = { version = "0.9", features = ["inbound"] }
linkerd2-proxy-api = { version = "0.10", features = ["inbound"] }
once_cell = "1"
parking_lot = "0.12"
rangemap = "1"
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ ipnet = "2"
linkerd-app = { path = "..", features = ["allow-loopback"] }
linkerd-app-core = { path = "../core" }
linkerd-metrics = { path = "../../metrics", features = ["test_util"] }
linkerd2-proxy-api = { version = "0.9", features = [
linkerd2-proxy-api = { version = "0.10", features = [
"destination",
"arbitrary",
] }
Expand Down
2 changes: 2 additions & 0 deletions linkerd/app/integration/src/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ pub fn outbound_default_http_route(dst: impl ToString) -> outbound::HttpRoute {
}],
filters: Vec::new(),
backends: Some(http_first_available(std::iter::once(backend(dst)))),
request_timeout: None,
}],
}
}
Expand Down Expand Up @@ -214,6 +215,7 @@ pub fn http_first_available(
.map(|backend| http_route::RouteBackend {
backend: Some(backend),
filters: Vec::new(),
request_timeout: None,
})
.collect(),
},
Expand Down
5 changes: 5 additions & 0 deletions linkerd/app/integration/src/tests/client_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ async fn header_based_routing() {
backends: Some(policy::http_first_available(std::iter::once(
policy::backend(dst),
))),
request_timeout: None,
};

let route = outbound::HttpRoute {
Expand All @@ -236,6 +237,7 @@ async fn header_based_routing() {
backends: Some(policy::http_first_available(std::iter::once(
policy::backend(&dst_world),
))),
request_timeout: None,
},
// x-hello-city: sf | x-hello-city: san francisco
mk_header_rule(
Expand Down Expand Up @@ -398,6 +400,8 @@ async fn path_based_routing() {
backends: Some(policy::http_first_available(std::iter::once(
policy::backend(dst),
))),

request_timeout: None,
};

let route = outbound::HttpRoute {
Expand All @@ -411,6 +415,7 @@ async fn path_based_routing() {
backends: Some(policy::http_first_available(std::iter::once(
policy::backend(&dst_world),
))),
request_timeout: None,
},
// /goodbye/*
mk_path_rule(
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ahash = "0.8"
bytes = "1"
http = "0.2"
futures = { version = "0.3", default-features = false }
linkerd2-proxy-api = { version = "0.9", features = ["outbound"] }
linkerd2-proxy-api = { version = "0.10", features = ["outbound"] }
linkerd-app-core = { path = "../core" }
linkerd-app-test = { path = "../test", optional = true }
linkerd-distribute = { path = "../../distribute" }
Expand Down
4 changes: 4 additions & 0 deletions linkerd/app/outbound/src/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,12 @@ pub fn synthesize_forward_policy(
meta: meta.clone(),
filters: NO_OPAQ_FILTERS.clone(),
failure_policy: Default::default(),
request_timeout: None,
distribution: policy::RouteDistribution::FirstAvailable(Arc::new([
policy::RouteBackend {
filters: NO_OPAQ_FILTERS.clone(),
backend: backend.clone(),
request_timeout: None,
},
])),
}),
Expand All @@ -223,10 +225,12 @@ pub fn synthesize_forward_policy(
meta: meta.clone(),
filters: NO_HTTP_FILTERS.clone(),
failure_policy: Default::default(),
request_timeout: None,
distribution: policy::RouteDistribution::FirstAvailable(Arc::new([
policy::RouteBackend {
filters: NO_HTTP_FILTERS.clone(),
backend: backend.clone(),
request_timeout: None,
},
])),
},
Expand Down
9 changes: 9 additions & 0 deletions linkerd/app/outbound/src/http/logical/policy/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub(crate) struct Route<T, F, E> {
pub(super) filters: Arc<[F]>,
pub(super) distribution: BackendDistribution<T, F>,
pub(super) failure_policy: E,
pub(super) request_timeout: Option<std::time::Duration>,
}

pub(crate) type MatchedRoute<T, M, F, E> = Matched<M, Route<T, F, E>>;
Expand Down Expand Up @@ -109,6 +110,8 @@ where
// consideration, so we must eagerly fail requests to prevent
// leaking tasks onto the runtime.
.push_on_service(svc::LoadShed::layer())
// Sets an optional request timeout.
.push(http::NewTimeout::layer())
// TODO(ver) attach the `E` typed failure policy to requests.
.push(filters::NewApplyFilters::<Self, _, _>::layer())
.push(classify::NewClassify::layer())
Expand All @@ -124,6 +127,12 @@ impl<T: Clone, M, F, E> svc::Param<BackendDistribution<T, F>> for MatchedRoute<T
}
}

impl<T, M, F, E> svc::Param<http::timeout::ResponseTimeout> for MatchedRoute<T, M, F, E> {
fn param(&self) -> http::timeout::ResponseTimeout {
http::timeout::ResponseTimeout(self.params.request_timeout)
}
}

impl<T> filters::Apply for Http<T> {
#[inline]
fn apply<B>(&self, req: &mut ::http::Request<B>) -> Result<()> {
Expand Down
2 changes: 2 additions & 0 deletions linkerd/app/outbound/src/http/logical/policy/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ where
filters,
distribution,
failure_policy,
request_timeout,
}| {
let route_ref = RouteRef(meta);
let distribution = mk_distribution(&route_ref, &distribution);
Expand All @@ -214,6 +215,7 @@ where
filters,
failure_policy,
distribution,
request_timeout,
}
};

Expand Down
4 changes: 4 additions & 0 deletions linkerd/app/outbound/src/http/logical/policy/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ async fn header_based_route() {
}),
filters: Arc::new([]),
failure_policy: Default::default(),
request_timeout: None,
distribution: policy::RouteDistribution::FirstAvailable(Arc::new([policy::RouteBackend {
filters: Arc::new([]),
backend,
request_timeout: None,
}])),
};

Expand Down Expand Up @@ -197,6 +199,7 @@ async fn http_filter_request_headers() {
policy: policy::RoutePolicy {
meta: policy::Meta::new_default("turtles"),
failure_policy: Default::default(),
request_timeout: None,
filters: Arc::new([policy::http::Filter::RequestHeaders(
policy::http::filter::ModifyHeader {
add: vec![(PIZZA.clone(), TUBULAR.clone())],
Expand All @@ -212,6 +215,7 @@ async fn http_filter_request_headers() {
..Default::default()
},
)]),
request_timeout: None,
},
])),
},
Expand Down
62 changes: 62 additions & 0 deletions linkerd/app/outbound/src/http/logical/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,66 @@ async fn balancer_doesnt_select_tripped_breakers() {
}
}

#[tokio::test(flavor = "current_thread")]
async fn route_request_timeout() {
tokio::time::pause();
let _trace = trace::test::trace_init();
const REQUEST_TIMEOUT: Duration = std::time::Duration::from_secs(2);

let addr = SocketAddr::new([192, 0, 2, 41].into(), PORT);
let dest: NameAddr = format!("{AUTHORITY}:{PORT}")
.parse::<NameAddr>()
.expect("dest addr is valid");
let (svc, mut handle) = tower_test::mock::pair();
let connect = HttpConnect::default().service(addr, svc);
let resolve = support::resolver().endpoint_exists(dest.clone(), addr, Default::default());
let (rt, _shutdown) = runtime();
let stack = Outbound::new(default_config(), rt)
.with_stack(connect)
.push_http_cached(resolve)
.into_inner();

let (_route_tx, routes) = {
let backend = default_backend(&dest);
let mut route = default_route(backend.clone());
// set the request timeout on the route.
route.rules[0].policy.request_timeout = Some(REQUEST_TIMEOUT);
watch::channel(Routes::Policy(policy::Params::Http(policy::HttpParams {
addr: dest.into(),
meta: ParentRef(client_policy::Meta::new_default("parent")),
backends: Arc::new([backend]),
routes: Arc::new([route]),
failure_accrual: client_policy::FailureAccrual::None,
})))
};
let target = Target {
num: 1,
version: http::Version::H2,
routes,
};
let svc = stack.new_service(target);

handle.allow(1);
let rsp = send_req(svc.clone(), http::Request::get("/"));
serve_req(&mut handle, mk_rsp(StatusCode::OK, "good")).await;
assert_eq!(
rsp.await.expect("request must succeed").status(),
http::StatusCode::OK
);

// now, time out...
let rsp = send_req(svc.clone(), http::Request::get("/"));
tokio::time::sleep(REQUEST_TIMEOUT).await;
let error = rsp.await.expect_err("request must fail with a timeout");
assert!(
error.is::<LogicalError>(),
"error must originate in the logical stack"
);
assert!(errors::is_caused_by::<http::timeout::ResponseTimeoutError>(
error.as_ref()
));
}

#[derive(Clone, Debug)]
struct Target {
num: usize,
Expand Down Expand Up @@ -448,9 +508,11 @@ fn default_route(backend: client_policy::Backend) -> client_policy::http::Route
meta: Meta::new_default("test_route"),
filters: NO_FILTERS.clone(),
failure_policy: Default::default(),
request_timeout: None,
distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend {
filters: NO_FILTERS.clone(),
backend,
request_timeout: None,
}])),
},
}],
Expand Down
4 changes: 4 additions & 0 deletions linkerd/app/test/src/resolver/client_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@ impl ClientPolicies {
meta: Meta::new_default("default"),
filters: Arc::new([]),
failure_policy: Default::default(),
request_timeout: None,
distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend {
filters: Arc::new([]),
backend: backend.clone(),
request_timeout: None,
}])),
},
}],
Expand All @@ -96,9 +98,11 @@ impl ClientPolicies {
meta: Meta::new_default("default"),
filters: Arc::new([]),
failure_policy: Default::default(),
request_timeout: None,
distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend {
filters: Arc::new([]),
backend: backend.clone(),
request_timeout: None,
}])),
}),
},
Expand Down
2 changes: 1 addition & 1 deletion linkerd/http-route/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ tracing = "0.1"
url = "2"

[dependencies.linkerd2-proxy-api]
version = "0.9"
version = "0.10"
features = ["http-route", "grpc-route"]
optional = true

Expand Down
2 changes: 1 addition & 1 deletion linkerd/proxy/api-resolve/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async-stream = "0.3"
futures = { version = "0.3", default-features = false }
linkerd-addr = { path = "../../addr" }
linkerd-error = { path = "../../error" }
linkerd2-proxy-api = { version = "0.9", features = ["destination"] }
linkerd2-proxy-api = { version = "0.10", features = ["destination"] }
linkerd-proxy-core = { path = "../core" }
linkerd-stack = { path = "../../stack" }
linkerd-tls = { path = "../../tls" }
Expand Down
2 changes: 1 addition & 1 deletion linkerd/proxy/client-policy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ proto = [
ahash = "0.8"
ipnet = "2"
http = "0.2"
linkerd2-proxy-api = { version = "0.9", optional = true, features = [
linkerd2-proxy-api = { version = "0.10", optional = true, features = [
"outbound",
] }
linkerd-error = { path = "../../error" }
Expand Down
17 changes: 15 additions & 2 deletions linkerd/proxy/client-policy/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub fn default(distribution: crate::RouteDistribution<Filter>) -> Route {
filters: Arc::new([]),
distribution,
failure_policy: Codes::default(),
request_timeout: None,
},
}],
}
Expand Down Expand Up @@ -101,6 +102,7 @@ pub mod proto {
r#match::host::{proto::InvalidHostMatch, MatchHost},
},
};
use std::time::Duration;

#[derive(Debug, thiserror::Error)]
pub enum InvalidGrpcRoute {
Expand All @@ -124,6 +126,9 @@ pub mod proto {

#[error("invalid failure accrual policy: {0}")]
Breaker(#[from] InvalidFailureAccrual),

#[error("invalid duration: {0}")]
Duration(#[from] prost_types::DurationError),
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -198,6 +203,7 @@ pub mod proto {
matches,
backends,
filters,
request_timeout,
} = proto;

let matches = matches
Expand All @@ -214,13 +220,16 @@ pub mod proto {
.ok_or(InvalidGrpcRoute::Missing("distribution"))?
.try_into()?;

let request_timeout = request_timeout.map(Duration::try_from).transpose()?;

Ok(Rule {
matches,
policy: Policy {
meta: meta.clone(),
filters,
distribution,
failure_policy: Codes::default(),
request_timeout,
},
})
}
Expand Down Expand Up @@ -270,10 +279,14 @@ pub mod proto {
impl TryFrom<grpc_route::RouteBackend> for RouteBackend<Filter> {
type Error = InvalidBackend;
fn try_from(
grpc_route::RouteBackend { backend, filters }: grpc_route::RouteBackend,
grpc_route::RouteBackend {
backend,
filters,
request_timeout,
}: grpc_route::RouteBackend,
) -> Result<RouteBackend<Filter>, InvalidBackend> {
let backend = backend.ok_or(InvalidBackend::Missing("backend"))?;
RouteBackend::try_from_proto(backend, filters)
RouteBackend::try_from_proto(backend, filters, request_timeout)
}
}

Expand Down
Loading