Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/ingress-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ tracing-opentelemetry = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
tower = { workspace = true, features = ["util"] }
tower-http = { workspace = true, features = ["cors", "normalize-path", "trace"] }
tower-http = { workspace = true, features = ["cors", "normalize-path", "trace", "limit"] }
url = { workspace = true }
urlencoding = { workspace = true }

Expand Down
18 changes: 10 additions & 8 deletions crates/ingress-http/src/handler/awakeables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,24 @@
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
use std::str::FromStr;

use super::Handler;
use super::HandlerError;
use super::path_parsing::AwakeableRequestType;

use crate::RequestDispatcher;
use bytes::Bytes;
use http::{Method, Request, Response, StatusCode};
use http_body_util::BodyExt;
use http_body_util::Full;
use tracing::{debug, trace, warn};

use restate_types::errors::{InvocationError, codes};
use restate_types::identifiers::{AwakeableIdentifier, ExternalSignalIdentifier, WithInvocationId};
use restate_types::invocation::{InvocationResponse, JournalCompletionTarget, ResponseResult};
use restate_types::journal_v2::{Signal, SignalResult};
use std::str::FromStr;
use tracing::{debug, trace, warn};

use super::Handler;
use super::HandlerError;
use super::path_parsing::AwakeableRequestType;
use crate::BoxError;
use crate::RequestDispatcher;

impl<Schemas, Dispatcher> Handler<Schemas, Dispatcher>
where
Expand All @@ -34,7 +36,7 @@ where
awakeable_request_type: AwakeableRequestType,
) -> Result<Response<Full<Bytes>>, HandlerError>
where
<B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static,
<B as http_body::Body>::Error: Into<BoxError>,
{
// Check HTTP Method
if req.method() != Method::POST {
Expand Down
17 changes: 11 additions & 6 deletions crates/ingress-http/src/handler/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
use std::string;

use super::APPLICATION_JSON;

use crate::RequestDispatcherError;
use bytes::Bytes;
use http::{Response, StatusCode, header};
use http_body_util::LengthLimitError;
use serde::Serialize;

use restate_types::errors::{IdDecodeError, InvocationError};
use restate_types::identifiers::DeploymentId;
use restate_types::schema::invocation_target::InputValidationError;
use restate_util_string::RestrictedValueError;
use serde::Serialize;
use std::string;

use super::APPLICATION_JSON;
use crate::{BoxError, RequestDispatcherError};

#[derive(Debug, thiserror::Error)]
pub(crate) enum HandlerError {
Expand Down Expand Up @@ -75,7 +77,7 @@ pub(crate) enum HandlerError {
#[error("the invoked service is not public")]
PrivateService,
#[error("cannot read body: {0:?}")]
Body(anyhow::Error),
Body(BoxError),
#[error("unavailable")]
Unavailable,
#[error("the invocation exists but has not completed yet")]
Expand Down Expand Up @@ -160,6 +162,9 @@ impl HandlerError {
// TODO add more distinctions between different dispatcher errors (unavailable, etc)
StatusCode::INTERNAL_SERVER_ERROR
}
HandlerError::Body(inner) if inner.downcast_ref::<LengthLimitError>().is_some() => {
StatusCode::PAYLOAD_TOO_LARGE
}
HandlerError::Body(_) => StatusCode::INTERNAL_SERVER_ERROR,
HandlerError::Unavailable => StatusCode::SERVICE_UNAVAILABLE,
HandlerError::MethodNotAllowed => StatusCode::METHOD_NOT_ALLOWED,
Expand Down
16 changes: 8 additions & 8 deletions crates/ingress-http/src/handler/invocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use restate_types::schema::invocation_target::InvocationTargetResolver;
use super::HandlerError;
use super::path_parsing::{InvocationRequestType, InvocationTargetType, TargetType};
use super::{Handler, InvocationTargetRequest};
use crate::RequestDispatcher;
use crate::{BoxError, RequestDispatcher};

impl<Schemas, Dispatcher> Handler<Schemas, Dispatcher>
where
Expand All @@ -34,7 +34,7 @@ where
invocation_request_type: InvocationRequestType,
) -> Result<Response<Full<Bytes>>, HandlerError>
where
<B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static,
<B as http_body::Body>::Error: Into<BoxError>,
{
match invocation_request_type {
InvocationRequestType::Attach(invocation_target_type) => {
Expand Down Expand Up @@ -86,7 +86,7 @@ where
invocation_query: InvocationQuery,
) -> Result<Response<Full<Bytes>>, HandlerError>
where
<B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static,
<B as http_body::Body>::Error: Into<BoxError>,
{
if req.method() != Method::GET {
return Err(HandlerError::MethodNotAllowed);
Expand All @@ -100,7 +100,7 @@ where
invocation_query: InvocationQuery,
) -> Result<Response<Full<Bytes>>, HandlerError>
where
<B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static,
<B as http_body::Body>::Error: Into<BoxError>,
{
if req.method() != Method::GET {
return Err(HandlerError::MethodNotAllowed);
Expand All @@ -113,7 +113,7 @@ where
req: Request<B>,
) -> Result<Response<Full<Bytes>>, HandlerError>
where
<B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static,
<B as http_body::Body>::Error: Into<BoxError>,
{
let invocation_query = Self::parse_invocation_target_body(req).await?;
self.attach_invocation_query(invocation_query).await
Expand All @@ -124,7 +124,7 @@ where
req: Request<B>,
) -> Result<Response<Full<Bytes>>, HandlerError>
where
<B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static,
<B as http_body::Body>::Error: Into<BoxError>,
{
let invocation_query = Self::parse_invocation_target_body(req).await?;
self.get_invocation_output_query(invocation_query).await
Expand All @@ -134,7 +134,7 @@ where
req: Request<B>,
) -> Result<InvocationQuery, HandlerError>
where
<B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static,
<B as http_body::Body>::Error: Into<BoxError>,
{
if req.method() != Method::POST {
return Err(HandlerError::MethodNotAllowed);
Expand All @@ -148,7 +148,7 @@ where
.to_bytes();

let target_request: InvocationTargetRequest = serde_json::from_slice(&body_bytes)
.map_err(|e| HandlerError::Body(anyhow::anyhow!("invalid request body: {e}")))?;
.map_err(|e| HandlerError::Body(anyhow::anyhow!("invalid request body: {e}").into()))?;

target_request.into_invocation_query()
}
Expand Down
6 changes: 4 additions & 2 deletions crates/ingress-http/src/handler/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use serde::Serialize;

use restate_types::identifiers::InvocationId;

use crate::BoxError;

use super::{APPLICATION_JSON, Handler, HandlerError, InvocationTargetRequest};

#[derive(Debug, Serialize)]
Expand All @@ -30,7 +32,7 @@ impl<Schemas, Dispatcher> Handler<Schemas, Dispatcher> {
req: Request<B>,
) -> Result<Response<Full<Bytes>>, HandlerError>
where
<B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static,
<B as http_body::Body>::Error: Into<BoxError>,
{
if req.method() != Method::POST {
return Err(HandlerError::MethodNotAllowed);
Expand All @@ -44,7 +46,7 @@ impl<Schemas, Dispatcher> Handler<Schemas, Dispatcher> {
.to_bytes();

let target_request: InvocationTargetRequest = serde_json::from_slice(&body_bytes)
.map_err(|e| HandlerError::Body(anyhow::anyhow!("invalid lookup body: {e}")))?;
.map_err(|e| HandlerError::Body(anyhow::anyhow!("invalid lookup body: {e}").into()))?;

let invocation_query = target_request.into_invocation_query()?;
let invocation_id = invocation_query.to_invocation_id();
Expand Down
19 changes: 11 additions & 8 deletions crates/ingress-http/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,30 @@ mod tests;
mod tracing;
mod workflow;

use super::*;
use crate::handler::path_parsing::{
AwakeableRequestType, InvocationRequestType, ServiceRequestType, WorkflowRequestType,
};
use std::convert::Infallible;
use std::task::{Context, Poll};

use bytestring::ByteString;
use error::HandlerError;
use futures::FutureExt;
use futures::future::BoxFuture;
use http_body_util::Full;
use hyper::http::HeaderValue;
use hyper::{Request, Response};
use serde::Deserialize;

use restate_types::Scope;
use restate_types::identifiers::{IdempotencyId, ServiceId};
use restate_types::invocation::InvocationQuery;
use restate_types::live::Live;
use restate_types::schema::invocation_target::InvocationTargetResolver;
use restate_types::schema::service::ServiceMetadataResolver;
use restate_util_string::{ReString, RestrictedValue};
use serde::Deserialize;
use std::convert::Infallible;
use std::task::{Context, Poll};

use super::*;
use crate::handler::path_parsing::{
AwakeableRequestType, InvocationRequestType, ServiceRequestType, WorkflowRequestType,
};

const APPLICATION_JSON: HeaderValue = HeaderValue::from_static("application/json");

Expand Down Expand Up @@ -85,7 +88,7 @@ where
Dispatcher: RequestDispatcher + Clone + Send + Sync + 'static,
Body: http_body::Body + Send + 'static,
<Body as http_body::Body>::Data: Send + 'static,
<Body as http_body::Body>::Error: std::error::Error + Send + Sync + 'static,
<Body as http_body::Body>::Error: Into<BoxError>,
{
type Response = Response<Full<Bytes>>;
type Error = Infallible;
Expand Down
17 changes: 9 additions & 8 deletions crates/ingress-http/src/handler/service_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,6 @@ use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use tracing::{Instrument, debug, trace, trace_span};

use super::HandlerError;
use super::path_parsing::{InvokeType, ServiceRequestType, TargetType};
use super::tracing::prepare_tracing_span;
use super::{APPLICATION_JSON, Handler};
use crate::RequestDispatcher;
use crate::handler::responses::{IDEMPOTENCY_EXPIRES, X_RESTATE_ID};
use crate::metric_definitions::{INGRESS_REQUEST_DURATION, INGRESS_REQUESTS, REQUEST_COMPLETED};
use restate_types::Scope;
use restate_types::config::Configuration;
use restate_types::identifiers::{InvocationId, WithInvocationId};
Expand All @@ -42,6 +35,14 @@ use restate_types::schema::invocation_target::{
use restate_types::time::MillisSinceEpoch;
use restate_util_string::{ReString, RestateString};

use super::HandlerError;
use super::path_parsing::{InvokeType, ServiceRequestType, TargetType};
use super::tracing::prepare_tracing_span;
use super::{APPLICATION_JSON, Handler};
use crate::handler::responses::{IDEMPOTENCY_EXPIRES, X_RESTATE_ID};
use crate::metric_definitions::{INGRESS_REQUEST_DURATION, INGRESS_REQUESTS, REQUEST_COMPLETED};
use crate::{BoxError, RequestDispatcher};

pub(crate) const IDEMPOTENCY_KEY: HeaderName = HeaderName::from_static("idempotency-key");
const LIMIT_KEY_HEADER: HeaderName = HeaderName::from_static("x-restate-limit-key");
const LIMIT_KEY_QUERY_PARAM: &str = "limit-key";
Expand Down Expand Up @@ -81,7 +82,7 @@ where
service_request: ServiceRequestType,
) -> Result<Response<Full<Bytes>>, HandlerError>
where
<B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static,
<B as http_body::Body>::Error: Into<BoxError>,
{
let start_time = Instant::now();

Expand Down
Loading
Loading