From 83484da662fbcb6311eceb77edffd49ffd9ee21f Mon Sep 17 00:00:00 2001 From: Muhamad Awad Date: Tue, 19 May 2026 09:02:56 +0200 Subject: [PATCH] Limit ingress HTTP request body size Adds a `RequestBodyLimitLayer` to the ingress HTTP server so oversized request bodies are rejected with 413 Payload Too Large rather than being streamed in full. Introduces a new `ingress.request-size-limit` config option. It defaults to (and is clamped at) `networking.message-size-limit`, since requests larger than that cap cannot be transmitted over the cluster-internal network. Fixes #4153 --- Cargo.lock | 1 + crates/ingress-http/Cargo.toml | 2 +- crates/ingress-http/src/handler/awakeables.rs | 18 ++-- crates/ingress-http/src/handler/error.rs | 19 ++-- crates/ingress-http/src/handler/invocation.rs | 15 ++-- crates/ingress-http/src/handler/lookup.rs | 6 +- crates/ingress-http/src/handler/mod.rs | 20 +++-- .../src/handler/service_handler.rs | 18 ++-- crates/ingress-http/src/handler/tests.rs | 86 ++++++++++++++++++- crates/ingress-http/src/handler/workflow.rs | 20 +++-- crates/ingress-http/src/lib.rs | 3 +- crates/ingress-http/src/server.rs | 19 +++- crates/types/src/config/ingress.rs | 34 +++++++- crates/types/src/config/mod.rs | 8 +- crates/types/src/config_loader.rs | 4 +- lite/src/lib.rs | 4 +- .../unreleased/ingress-request-size-limit.md | 63 ++++++++++++++ workspace-hack/Cargo.toml | 4 +- 18 files changed, 279 insertions(+), 65 deletions(-) create mode 100644 release-notes/unreleased/ingress-request-size-limit.md diff --git a/Cargo.lock b/Cargo.lock index d76b308cdc..262869c1f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10565,6 +10565,7 @@ dependencies = [ "futures-util", "http 1.4.0", "http-body 1.0.1", + "http-body-util", "iri-string", "pin-project-lite", "tokio", diff --git a/crates/ingress-http/Cargo.toml b/crates/ingress-http/Cargo.toml index 69cf5880d7..fe7ae72c13 100644 --- a/crates/ingress-http/Cargo.toml +++ b/crates/ingress-http/Cargo.toml @@ -46,7 +46,7 @@ tracing-opentelemetry = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } tower = { workspace = true, features = ["util"] } -tower-http = { workspace = true, features = ["cors", "normalize-path", "trace"] } +tower-http = { workspace = true, features = ["cors", "normalize-path", "trace", "limit"] } url = { workspace = true } urlencoding = { workspace = true } diff --git a/crates/ingress-http/src/handler/awakeables.rs b/crates/ingress-http/src/handler/awakeables.rs index 2af0a4462e..0dc6253e03 100644 --- a/crates/ingress-http/src/handler/awakeables.rs +++ b/crates/ingress-http/src/handler/awakeables.rs @@ -7,22 +7,24 @@ // As of the Change Date specified in that file, in accordance with // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::str::FromStr; -use super::Handler; -use super::HandlerError; -use super::path_parsing::AwakeableRequestType; - -use crate::RequestDispatcher; use bytes::Bytes; use http::{Method, Request, Response, StatusCode}; use http_body_util::BodyExt; use http_body_util::Full; +use tracing::{debug, trace, warn}; + +use restate_types::errors::GenericError; use restate_types::errors::{InvocationError, codes}; use restate_types::identifiers::{AwakeableIdentifier, ExternalSignalIdentifier, WithInvocationId}; use restate_types::invocation::{InvocationResponse, JournalCompletionTarget, ResponseResult}; use restate_types::journal_v2::{Signal, SignalResult}; -use std::str::FromStr; -use tracing::{debug, trace, warn}; + +use super::Handler; +use super::HandlerError; +use super::path_parsing::AwakeableRequestType; +use crate::RequestDispatcher; impl Handler where @@ -34,7 +36,7 @@ where awakeable_request_type: AwakeableRequestType, ) -> Result>, HandlerError> where - ::Error: std::error::Error + Send + Sync + 'static, + ::Error: Into, { // Check HTTP Method if req.method() != Method::POST { diff --git a/crates/ingress-http/src/handler/error.rs b/crates/ingress-http/src/handler/error.rs index 36d7720069..724f4e9969 100644 --- a/crates/ingress-http/src/handler/error.rs +++ b/crates/ingress-http/src/handler/error.rs @@ -7,18 +7,20 @@ // As of the Change Date specified in that file, in accordance with // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::string; -use super::APPLICATION_JSON; - -use crate::RequestDispatcherError; use bytes::Bytes; use http::{Response, StatusCode, header}; -use restate_types::errors::{IdDecodeError, InvocationError}; +use http_body_util::LengthLimitError; +use serde::Serialize; + +use restate_types::errors::{GenericError, IdDecodeError, InvocationError}; use restate_types::identifiers::DeploymentId; use restate_types::schema::invocation_target::InputValidationError; use restate_util_string::RestrictedValueError; -use serde::Serialize; -use std::string; + +use super::APPLICATION_JSON; +use crate::RequestDispatcherError; #[derive(Debug, thiserror::Error)] pub(crate) enum HandlerError { @@ -75,7 +77,7 @@ pub(crate) enum HandlerError { #[error("the invoked service is not public")] PrivateService, #[error("cannot read body: {0:?}")] - Body(anyhow::Error), + Body(GenericError), #[error("unavailable")] Unavailable, #[error("the invocation exists but has not completed yet")] @@ -160,6 +162,9 @@ impl HandlerError { // TODO add more distinctions between different dispatcher errors (unavailable, etc) StatusCode::INTERNAL_SERVER_ERROR } + HandlerError::Body(inner) if inner.downcast_ref::().is_some() => { + StatusCode::PAYLOAD_TOO_LARGE + } HandlerError::Body(_) => StatusCode::INTERNAL_SERVER_ERROR, HandlerError::Unavailable => StatusCode::SERVICE_UNAVAILABLE, HandlerError::MethodNotAllowed => StatusCode::METHOD_NOT_ALLOWED, diff --git a/crates/ingress-http/src/handler/invocation.rs b/crates/ingress-http/src/handler/invocation.rs index b001b9a1c7..74be758765 100644 --- a/crates/ingress-http/src/handler/invocation.rs +++ b/crates/ingress-http/src/handler/invocation.rs @@ -13,6 +13,7 @@ use http::{Method, Request, Response}; use http_body_util::{BodyExt, Full}; use tracing::warn; +use restate_types::errors::GenericError; use restate_types::identifiers::IdempotencyId; use restate_types::invocation::InvocationQuery; use restate_types::invocation::client::{AttachInvocationResponse, GetInvocationOutputResponse}; @@ -34,7 +35,7 @@ where invocation_request_type: InvocationRequestType, ) -> Result>, HandlerError> where - ::Error: std::error::Error + Send + Sync + 'static, + ::Error: Into, { match invocation_request_type { InvocationRequestType::Attach(invocation_target_type) => { @@ -86,7 +87,7 @@ where invocation_query: InvocationQuery, ) -> Result>, HandlerError> where - ::Error: std::error::Error + Send + Sync + 'static, + ::Error: Into, { if req.method() != Method::GET { return Err(HandlerError::MethodNotAllowed); @@ -100,7 +101,7 @@ where invocation_query: InvocationQuery, ) -> Result>, HandlerError> where - ::Error: std::error::Error + Send + Sync + 'static, + ::Error: Into, { if req.method() != Method::GET { return Err(HandlerError::MethodNotAllowed); @@ -113,7 +114,7 @@ where req: Request, ) -> Result>, HandlerError> where - ::Error: std::error::Error + Send + Sync + 'static, + ::Error: Into, { let invocation_query = Self::parse_invocation_target_body(req).await?; self.attach_invocation_query(invocation_query).await @@ -124,7 +125,7 @@ where req: Request, ) -> Result>, HandlerError> where - ::Error: std::error::Error + Send + Sync + 'static, + ::Error: Into, { let invocation_query = Self::parse_invocation_target_body(req).await?; self.get_invocation_output_query(invocation_query).await @@ -134,7 +135,7 @@ where req: Request, ) -> Result where - ::Error: std::error::Error + Send + Sync + 'static, + ::Error: Into, { if req.method() != Method::POST { return Err(HandlerError::MethodNotAllowed); @@ -148,7 +149,7 @@ where .to_bytes(); let target_request: InvocationTargetRequest = serde_json::from_slice(&body_bytes) - .map_err(|e| HandlerError::Body(anyhow::anyhow!("invalid request body: {e}")))?; + .map_err(|e| HandlerError::Body(anyhow::anyhow!("invalid request body: {e}").into()))?; target_request.into_invocation_query() } diff --git a/crates/ingress-http/src/handler/lookup.rs b/crates/ingress-http/src/handler/lookup.rs index cd1051decc..016a4e6807 100644 --- a/crates/ingress-http/src/handler/lookup.rs +++ b/crates/ingress-http/src/handler/lookup.rs @@ -13,7 +13,7 @@ use http::{Method, Request, Response, StatusCode, header}; use http_body_util::{BodyExt, Full}; use serde::Serialize; -use restate_types::identifiers::InvocationId; +use restate_types::{errors::GenericError, identifiers::InvocationId}; use super::{APPLICATION_JSON, Handler, HandlerError, InvocationTargetRequest}; @@ -30,7 +30,7 @@ impl Handler { req: Request, ) -> Result>, HandlerError> where - ::Error: std::error::Error + Send + Sync + 'static, + ::Error: Into, { if req.method() != Method::POST { return Err(HandlerError::MethodNotAllowed); @@ -44,7 +44,7 @@ impl Handler { .to_bytes(); let target_request: InvocationTargetRequest = serde_json::from_slice(&body_bytes) - .map_err(|e| HandlerError::Body(anyhow::anyhow!("invalid lookup body: {e}")))?; + .map_err(|e| HandlerError::Body(anyhow::anyhow!("invalid lookup body: {e}").into()))?; let invocation_query = target_request.into_invocation_query()?; let invocation_id = invocation_query.to_invocation_id(); diff --git a/crates/ingress-http/src/handler/mod.rs b/crates/ingress-http/src/handler/mod.rs index 1ca4e57599..0a0e7a876d 100644 --- a/crates/ingress-http/src/handler/mod.rs +++ b/crates/ingress-http/src/handler/mod.rs @@ -21,10 +21,9 @@ mod tests; mod tracing; mod workflow; -use super::*; -use crate::handler::path_parsing::{ - AwakeableRequestType, InvocationRequestType, ServiceRequestType, WorkflowRequestType, -}; +use std::convert::Infallible; +use std::task::{Context, Poll}; + use bytestring::ByteString; use error::HandlerError; use futures::FutureExt; @@ -32,16 +31,21 @@ use futures::future::BoxFuture; use http_body_util::Full; use hyper::http::HeaderValue; use hyper::{Request, Response}; +use serde::Deserialize; + use restate_types::Scope; +use restate_types::errors::GenericError; use restate_types::identifiers::{IdempotencyId, ServiceId}; use restate_types::invocation::InvocationQuery; use restate_types::live::Live; use restate_types::schema::invocation_target::InvocationTargetResolver; use restate_types::schema::service::ServiceMetadataResolver; use restate_util_string::{ReString, RestrictedValue}; -use serde::Deserialize; -use std::convert::Infallible; -use std::task::{Context, Poll}; + +use super::*; +use crate::handler::path_parsing::{ + AwakeableRequestType, InvocationRequestType, ServiceRequestType, WorkflowRequestType, +}; const APPLICATION_JSON: HeaderValue = HeaderValue::from_static("application/json"); @@ -85,7 +89,7 @@ where Dispatcher: RequestDispatcher + Clone + Send + Sync + 'static, Body: http_body::Body + Send + 'static, ::Data: Send + 'static, - ::Error: std::error::Error + Send + Sync + 'static, + ::Error: Into, { type Response = Response>; type Error = Infallible; diff --git a/crates/ingress-http/src/handler/service_handler.rs b/crates/ingress-http/src/handler/service_handler.rs index 455917f012..998aea8715 100644 --- a/crates/ingress-http/src/handler/service_handler.rs +++ b/crates/ingress-http/src/handler/service_handler.rs @@ -21,15 +21,9 @@ use serde::{Deserialize, Serialize}; use serde_with::serde_as; use tracing::{Instrument, debug, trace, trace_span}; -use super::HandlerError; -use super::path_parsing::{InvokeType, ServiceRequestType, TargetType}; -use super::tracing::prepare_tracing_span; -use super::{APPLICATION_JSON, Handler}; -use crate::RequestDispatcher; -use crate::handler::responses::{IDEMPOTENCY_EXPIRES, X_RESTATE_ID}; -use crate::metric_definitions::{INGRESS_REQUEST_DURATION, INGRESS_REQUESTS, REQUEST_COMPLETED}; use restate_types::Scope; use restate_types::config::Configuration; +use restate_types::errors::GenericError; use restate_types::identifiers::{InvocationId, WithInvocationId}; use restate_types::invocation::{ Header, InvocationRequest, InvocationRequestHeader, InvocationTarget, InvocationTargetType, @@ -42,6 +36,14 @@ use restate_types::schema::invocation_target::{ use restate_types::time::MillisSinceEpoch; use restate_util_string::{ReString, RestateString}; +use super::HandlerError; +use super::path_parsing::{InvokeType, ServiceRequestType, TargetType}; +use super::tracing::prepare_tracing_span; +use super::{APPLICATION_JSON, Handler}; +use crate::RequestDispatcher; +use crate::handler::responses::{IDEMPOTENCY_EXPIRES, X_RESTATE_ID}; +use crate::metric_definitions::{INGRESS_REQUEST_DURATION, INGRESS_REQUESTS, REQUEST_COMPLETED}; + pub(crate) const IDEMPOTENCY_KEY: HeaderName = HeaderName::from_static("idempotency-key"); const LIMIT_KEY_HEADER: HeaderName = HeaderName::from_static("x-restate-limit-key"); const LIMIT_KEY_QUERY_PARAM: &str = "limit-key"; @@ -81,7 +83,7 @@ where service_request: ServiceRequestType, ) -> Result>, HandlerError> where - ::Error: std::error::Error + Send + Sync + 'static, + ::Error: Into, { let start_time = Instant::now(); diff --git a/crates/ingress-http/src/handler/tests.rs b/crates/ingress-http/src/handler/tests.rs index 127ce03a0f..9735f5c5ea 100644 --- a/crates/ingress-http/src/handler/tests.rs +++ b/crates/ingress-http/src/handler/tests.rs @@ -8,17 +8,20 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::convert::Infallible; use std::future::ready; use std::sync::Arc; use std::time::Duration; use bytes::Bytes; use bytestring::ByteString; -use futures::FutureExt; +use futures::{FutureExt, stream}; use http::StatusCode; use http::{HeaderValue, Method, Request, Response}; -use http_body_util::{BodyExt, Empty, Full}; -use tower::ServiceExt; +use http_body::Frame; +use http_body_util::{BodyExt, Empty, Full, StreamBody}; +use tower::{ServiceBuilder, ServiceExt}; +use tower_http::limit::{RequestBodyLimitLayer, ResponseBody as LimitResponseBody}; use tracing_test::traced_test; use restate_core::TestCoreEnv; @@ -974,6 +977,57 @@ async fn invalid_input() { assert_eq!(response.status(), StatusCode::BAD_REQUEST); } +const SIZE_LIMIT_BYTES: usize = 1024; + +#[restate_core::test] +#[traced_test] +async fn request_with_content_length_exceeding_limit_rejected_early() { + // Content-Length declares the body is larger than the configured limit. + // tower-http's RequestBodyLimit rejects with 413 before the handler runs, + // so the dispatcher must never be touched (the strict mock would panic + // on any unexpected call). + let big_body = Bytes::from(vec![b'a'; SIZE_LIMIT_BYTES * 2]); + let req = hyper::Request::builder() + .uri("http://localhost/greeter.Greeter/greet") + .method(Method::POST) + .header("content-type", "application/json") + .header(http::header::CONTENT_LENGTH, big_body.len().to_string()) + .body(Full::new(big_body)) + .unwrap(); + + let response = + handle_with_size_limit(req, SIZE_LIMIT_BYTES, MockRequestDispatcher::default()).await; + + assert_eq!(response.status(), StatusCode::PAYLOAD_TOO_LARGE); +} + +#[restate_core::test] +#[traced_test] +async fn streaming_request_exceeding_limit_returns_413() { + // Body has no Content-Length and streams in 64-byte chunks past the limit. + // The handler starts reading; once cumulative bytes exceed the limit, the + // body emits a LengthLimitError, which the handler's error mapping turns + // into 413. + let chunk = Bytes::from(vec![b'a'; 64]); + let chunks = (SIZE_LIMIT_BYTES / chunk.len()) + 2; + let frames: Vec, Infallible>> = (0..chunks) + .map(|_| Ok(Frame::data(chunk.clone()))) + .collect(); + let body = StreamBody::new(stream::iter(frames)); + + let req = hyper::Request::builder() + .uri("http://localhost/greeter.Greeter/greet") + .method(Method::POST) + .header("content-type", "application/json") + .body(body) + .unwrap(); + + let response = + handle_with_size_limit(req, SIZE_LIMIT_BYTES, MockRequestDispatcher::default()).await; + + assert_eq!(response.status(), StatusCode::PAYLOAD_TOO_LARGE); +} + #[restate_core::test] #[traced_test] async fn set_custom_content_type_on_response() { @@ -1161,6 +1215,32 @@ where handle_with_schemas_and_dispatcher(req, mock_schemas(), mock_request_dispatcher).await } +async fn handle_with_size_limit( + mut req: Request, + size_limit: usize, + dispatcher: MockRequestDispatcher, +) -> Response>> +where + B: http_body::Body + Send + 'static, + ::Data: Send + Sync + 'static, + ::Error: std::error::Error + Send + Sync + 'static, +{ + let _env = TestCoreEnv::create_with_single_node(1, 1).await; + + req.extensions_mut() + .insert(ConnectInfo::new(SocketAddress::Anonymous)); + req.extensions_mut().insert(opentelemetry::Context::new()); + + let svc = ServiceBuilder::new() + .layer(RequestBodyLimitLayer::new(size_limit)) + .service(Handler::new( + Live::from_value(mock_schemas()), + Arc::new(dispatcher), + )); + + svc.oneshot(req).await.unwrap() +} + // -- /restate attach / output / lookup ------------------------------------ #[restate_core::test] diff --git a/crates/ingress-http/src/handler/workflow.rs b/crates/ingress-http/src/handler/workflow.rs index 00f59f7ecd..1e54176127 100644 --- a/crates/ingress-http/src/handler/workflow.rs +++ b/crates/ingress-http/src/handler/workflow.rs @@ -8,19 +8,21 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::Handler; -use super::HandlerError; -use super::path_parsing::WorkflowRequestType; - -use crate::RequestDispatcher; use bytes::Bytes; use http::{Method, Request, Response}; use http_body_util::Full; +use tracing::{info, warn}; + +use restate_types::errors::GenericError; use restate_types::identifiers::ServiceId; use restate_types::invocation::InvocationQuery; use restate_types::invocation::client::{AttachInvocationResponse, GetInvocationOutputResponse}; use restate_types::schema::invocation_target::InvocationTargetResolver; -use tracing::{info, warn}; + +use super::Handler; +use super::HandlerError; +use super::path_parsing::WorkflowRequestType; +use crate::RequestDispatcher; impl Handler where @@ -33,7 +35,7 @@ where workflow_request_type: WorkflowRequestType, ) -> Result>, HandlerError> where - ::Error: std::error::Error + Send + Sync + 'static, + ::Error: Into, { match workflow_request_type { WorkflowRequestType::Attach(name, key) => { @@ -56,7 +58,7 @@ where workflow_id: ServiceId, ) -> Result>, HandlerError> where - ::Error: std::error::Error + Send + Sync + 'static, + ::Error: Into, { // Check HTTP Method if req.method() != Method::GET { @@ -100,7 +102,7 @@ where workflow_id: ServiceId, ) -> Result>, HandlerError> where - ::Error: std::error::Error + Send + Sync + 'static, + ::Error: Into, { // Check HTTP Method if req.method() != Method::GET { diff --git a/crates/ingress-http/src/lib.rs b/crates/ingress-http/src/lib.rs index 5c8bfd2ac5..8895d0dcc2 100644 --- a/crates/ingress-http/src/lib.rs +++ b/crates/ingress-http/src/lib.rs @@ -17,10 +17,11 @@ mod server; pub use rpc_request_dispatcher::InvocationClientRequestDispatcher; pub use server::{HyperServerIngress, IngressServerError}; -use bytes::Bytes; use std::future::Future; use std::sync::Arc; +use bytes::Bytes; + use restate_types::identifiers::InvocationId; use restate_types::invocation::client::{ AttachInvocationResponse, GetInvocationOutputResponse, InvocationOutput, diff --git a/crates/ingress-http/src/server.rs b/crates/ingress-http/src/server.rs index b112d2a391..d1e71d00da 100644 --- a/crates/ingress-http/src/server.rs +++ b/crates/ingress-http/src/server.rs @@ -22,12 +22,14 @@ use tokio_util::either::Either; use tower::{ServiceBuilder, ServiceExt}; use tower_http::classify::ServerErrorsFailureClass; use tower_http::cors::CorsLayer; +use tower_http::limit::RequestBodyLimitLayer; use tower_http::normalize_path::NormalizePathLayer; use tower_http::trace::TraceLayer; use tracing::{Span, debug, info, info_span, instrument}; use restate_core::{TaskCenter, TaskKind, cancellation_watcher}; use restate_types::config::IngressOptions; +use restate_types::errors::GenericError; use restate_types::health::HealthStatus; use restate_types::live::Live; use restate_types::net::address::{HttpIngressPort, ListenerPort, SocketAddress}; @@ -50,6 +52,7 @@ pub enum IngressServerError { pub struct HyperServerIngress { listeners: Listeners, concurrency_limit: usize, + request_size_limit: usize, // Parameters to build the layers schemas: Live, @@ -74,6 +77,7 @@ where HyperServerIngress::new( listeners, ingress_options.concurrent_api_requests_limit(), + ingress_options.request_size_limit().get(), schemas, dispatcher, health, @@ -89,6 +93,7 @@ where pub(crate) fn new( listeners: Listeners, concurrency_limit: usize, + request_size_limit: usize, schemas: Live, dispatcher: Dispatcher, health: HealthStatus, @@ -98,6 +103,7 @@ where Self { listeners, concurrency_limit, + request_size_limit, schemas, dispatcher, health, @@ -114,6 +120,7 @@ where let HyperServerIngress { mut listeners, concurrency_limit, + request_size_limit, schemas, dispatcher, health, @@ -169,11 +176,18 @@ where ), ) .layer(NormalizePathLayer::trim_trailing_slash()) - .layer(layers::load_shed::LoadShedLayer::new(concurrency_limit)) + .layer(RequestBodyLimitLayer::new(request_size_limit)) .layer(CorsLayer::very_permissive()) + .layer(layers::load_shed::LoadShedLayer::new(concurrency_limit)) .layer(layers::tracing_context_extractor::HttpTraceContextExtractorLayer) .service(Handler::new(schemas, dispatcher)); + // todo(azmy): `CorsLayer` should sit above `RequestBodyLimitLayer` so CORS is applied + // as early as possible. This is currently blocked because `CorsLayer` requires the + // response body to implement `Default`, which `RequestBodyLimitLayer`'s body does not. + // Tracked upstream in https://github.com/tower-rs/tower-http/pull/679 once merged, + // move `CorsLayer` above `RequestBodyLimitLayer`. + let mut shutdown = std::pin::pin!(cancellation_watcher()); if let Some(uds_path) = listeners.uds_address() { @@ -226,7 +240,7 @@ where F: Send, B: http_body::Body + Send + 'static, ::Data: Send + 'static, - ::Error: std::error::Error + Sync + Send + 'static, + ::Error: Into, T: tower::Service< Request, Response = Response, @@ -402,6 +416,7 @@ mod tests { let ingress = HyperServerIngress::new( listeners, Semaphore::MAX_PERMITS, + 10 * 1024 * 1024, // 10MB Live::from_value(mock_schemas()), Arc::new(mock_request_dispatcher), health.ingress_status(), diff --git a/crates/types/src/config/ingress.rs b/crates/types/src/config/ingress.rs index c9c6210f47..f55576a168 100644 --- a/crates/types/src/config/ingress.rs +++ b/crates/types/src/config/ingress.rs @@ -10,10 +10,11 @@ use std::num::NonZeroUsize; +use restate_memory::NonZeroByteCount; use serde::{Deserialize, Serialize}; use tokio::sync::Semaphore; -use crate::config::IngestionOptions; +use crate::config::{DEFAULT_MESSAGE_SIZE_LIMIT, IngestionOptions, NetworkingOptions}; use crate::net::address::{AdvertisedAddress, BindAddress, HttpIngressPort}; use crate::net::listener::AddressBook; @@ -48,6 +49,19 @@ pub struct IngressOptions { #[serde(default, skip_serializing_if = "Option::is_none")] advertised_ingress_endpoint: Option>, + /// # Request size limit + /// + /// Maximum size of request that can be received over ingress. If a request size is + /// larger than this limit, the request will fail. + /// + /// If unset, defaults to `networking.message-size-limit`. If set, it will be clamped at + /// the value of `networking.message-size-limit` since larger requests cannot be transmitted + /// over the cluster internal network. + /// + /// Since v1.7.0 + #[serde(skip_serializing_if = "Option::is_none")] + request_size_limit: Option, + /// # Ingestion Options /// /// Settings for the ingestion client @@ -56,6 +70,12 @@ pub struct IngressOptions { } impl IngressOptions { + pub fn request_size_limit(&self) -> NonZeroUsize { + self.request_size_limit + .map(|v| v.as_non_zero_usize()) + .unwrap_or(DEFAULT_MESSAGE_SIZE_LIMIT) + } + pub fn bind_address(&self) -> BindAddress { self.ingress_listener_options.bind_address() } @@ -96,8 +116,18 @@ impl IngressOptions { } /// set derived values if they are not configured to reduce verbose configurations - pub fn set_derived_values(&mut self, common: &CommonOptions) { + pub fn set_derived_values(&mut self, common: &CommonOptions, networking: &NetworkingOptions) { self.ingress_listener_options .merge(common.fabric_listener_options()); + + self.merge(networking); + } + + fn merge(&mut self, opts: &NetworkingOptions) { + self.request_size_limit = Some( + self.request_size_limit + .map(|limit| limit.min(opts.message_size_limit)) + .unwrap_or(opts.message_size_limit), + ); } } diff --git a/crates/types/src/config/mod.rs b/crates/types/src/config/mod.rs index c1eb5a9fdc..84ec630e10 100644 --- a/crates/types/src/config/mod.rs +++ b/crates/types/src/config/mod.rs @@ -290,7 +290,9 @@ impl Configuration { config.worker.set_derived_values(&config.networking); config.bifrost.set_derived_values(&config.networking); config.admin.set_derived_values(&config.common); - config.ingress.set_derived_values(&config.common); + config + .ingress + .set_derived_values(&config.common, &config.networking); config } @@ -304,7 +306,9 @@ impl Configuration { config.worker.set_derived_values(&config.networking); config.bifrost.set_derived_values(&config.networking); config.admin.set_derived_values(&config.common); - config.ingress.set_derived_values(&config.common); + config + .ingress + .set_derived_values(&config.common, &config.networking); config } diff --git a/crates/types/src/config_loader.rs b/crates/types/src/config_loader.rs index ac62d2d826..4098c3c7aa 100644 --- a/crates/types/src/config_loader.rs +++ b/crates/types/src/config_loader.rs @@ -80,7 +80,9 @@ impl ConfigLoader { // network base options need to be propagated downstream config.common.set_derived_values(&config.networking)?; config.admin.set_derived_values(&config.common); - config.ingress.set_derived_values(&config.common); + config + .ingress + .set_derived_values(&config.common, &config.networking); config.worker.set_derived_values(&config.networking); if self.metadata_migration_mode { diff --git a/lite/src/lib.rs b/lite/src/lib.rs index b44fba3420..887cc6d6cc 100644 --- a/lite/src/lib.rs +++ b/lite/src/lib.rs @@ -201,7 +201,9 @@ impl Restate { // apply config cascading propagation config.common.set_derived_values(&config.networking)?; - config.ingress.set_derived_values(&config.common); + config + .ingress + .set_derived_values(&config.common, &config.networking); config.admin.set_derived_values(&config.common); config.worker.set_derived_values(&config.networking); let config = config.apply_cascading_values(); diff --git a/release-notes/unreleased/ingress-request-size-limit.md b/release-notes/unreleased/ingress-request-size-limit.md new file mode 100644 index 0000000000..8996ffb522 --- /dev/null +++ b/release-notes/unreleased/ingress-request-size-limit.md @@ -0,0 +1,63 @@ +# Release Notes: Ingress request size limit + +## New Feature / Behavioral Change + +### What Changed + +The HTTP ingress now enforces a maximum request body size. Requests whose body +exceeds the limit are rejected with `413 Payload Too Large` before reaching the +handler. + +A new configuration option `ingress.request-size-limit` controls the cap. If +unset, it defaults to `networking.message-size-limit` (32 MiB by default). If +set, the value is always clamped at `networking.message-size-limit`, since +larger requests cannot be transmitted over the cluster-internal network. + +### Why This Matters + +Previously, the ingress accepted request bodies of unbounded size and only +failed later — often deep in the pipeline — once cluster-internal size limits +were hit. The errors surfaced to clients were confusing and the server had +already paid the cost of buffering the oversized payload. + +With this change, oversized requests are rejected early with a standard 413 +response, protecting the server from unbounded buffering and giving clients an +unambiguous error. + +### Configuration + +```toml +[ingress] +# Maximum request body size accepted by the HTTP ingress. +# Defaults to `networking.message-size-limit` if unset. +# Always clamped at `networking.message-size-limit`. +request-size-limit = "10MB" +``` + +### Impact on Users + +- **Default behavior changes**: requests larger than `networking.message-size-limit` + (32 MiB by default) are now rejected with 413 instead of being accepted and + failing later. Clients that relied on submitting larger payloads must either + reduce request size or raise `networking.message-size-limit`. +- **New deployments**: no action required; the default 32 MiB cap matches what + the cluster-internal network can carry. +- **Existing deployments**: review whether any clients send bodies larger than + 32 MiB. If so, either shrink the payload or increase + `networking.message-size-limit` (and optionally set an explicit + `ingress.request-size-limit`). + +### Migration Guidance + +To raise the ingress limit, raise the networking limit as well: + +```toml +[networking] +message-size-limit = "64MB" + +[ingress] +request-size-limit = "64MB" +``` + +Setting `ingress.request-size-limit` higher than `networking.message-size-limit` +has no effect — the value is clamped to the networking limit. diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 06feccd8a6..4e4447e09f 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -127,7 +127,7 @@ toml_edit = { version = "0.22", features = ["serde"] } toml_parser = { version = "1" } tonic = { version = "0.14", features = ["gzip", "tls-native-roots", "tls-ring", "zstd"] } tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "load-shed", "retry", "timeout"] } -tower-http = { version = "0.6", features = ["compression-br", "compression-gzip", "compression-zstd", "cors", "follow-redirect", "map-response-body", "normalize-path", "trace"] } +tower-http = { version = "0.6", features = ["compression-br", "compression-gzip", "compression-zstd", "cors", "follow-redirect", "limit", "map-response-body", "normalize-path", "trace"] } tracing = { version = "0.1", features = ["log", "max_level_trace", "release_max_level_debug"] } tracing-core = { version = "0.1" } tracing-log = { version = "0.2" } @@ -259,7 +259,7 @@ toml_edit = { version = "0.22", features = ["serde"] } toml_parser = { version = "1" } tonic = { version = "0.14", features = ["gzip", "tls-native-roots", "tls-ring", "zstd"] } tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "load-shed", "retry", "timeout"] } -tower-http = { version = "0.6", features = ["compression-br", "compression-gzip", "compression-zstd", "cors", "follow-redirect", "map-response-body", "normalize-path", "trace"] } +tower-http = { version = "0.6", features = ["compression-br", "compression-gzip", "compression-zstd", "cors", "follow-redirect", "limit", "map-response-body", "normalize-path", "trace"] } tracing = { version = "0.1", features = ["log", "max_level_trace", "release_max_level_debug"] } tracing-core = { version = "0.1" } tracing-log = { version = "0.2" }