Merged
Changes from 6 commits
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -12,12 +12,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Support publishing events consumed from [NATS](https://nats.io) topics. See the [documentation](https://accenture.github.io/reactive-interaction-gateway/docs/event-streams.html#nats) for how to get started. [#297](https://github.com/Accenture/reactive-interaction-gateway/issues/297)
- Added validation for reverse proxy configuration. Now it crashes RIG on start when configuration is not valid or returns `400` when using REST API to update configuration. [#277](https://github.com/Accenture/reactive-interaction-gateway/issues/277)
- Added basic distributed tracing support in [W3C Trace Context specification](https://www.w3.org/TR/trace-context/) with Jaeger and Openzipkin exporters. RIG opens a span at the API Gateway and emits trace context in Cloud Events following the [distributed tracing spec](https://github.com/cloudevents/spec/blob/v1.0/extensions/distributed-tracing.md). [#281](https://github.com/Accenture/reactive-interaction-gateway/issues/281)
- Added the ability to set the response code and response headers (structured mode only) for `response_from` messages in the reverse proxy. [#321](https://github.com/Accenture/reactive-interaction-gateway/pull/321)

### Changed

- Incorporated [cloudevents-ex](https://github.com/kevinbader/cloudevents-ex) to handle binary and structured modes for [Kafka protocol binding](https://github.com/cloudevents/spec/blob/v1.0/kafka-protocol-binding.md) in a proper way. This introduces some **breaking changes**:
- Binary mode now uses the `ce_` prefix for CloudEvents context attribute headers (previously `ce-`), as required by the [Kafka protocol binding](https://github.com/cloudevents/spec/blob/v1.0/kafka-protocol-binding.md)
- The change above also affects the `"response_from": "kafka"` proxy functionality: RIG forwards only the Kafka message body to clients, without headers. In binary mode, clients therefore receive only the data part, without CloudEvents context attributes.
- Changed the `response_from` handler to expect a plain message, **not** a CloudEvent. [#321](https://github.com/Accenture/reactive-interaction-gateway/pull/321)

### Fixed

3 changes: 3 additions & 0 deletions config/rig_inbound_gateway/config.exs
@@ -172,6 +172,9 @@ config :rig, RigInboundGatewayWeb.V1.SubscriptionController, cors: cors
config :rig, RigInboundGatewayWeb.V1.SSE, cors: cors
config :rig, RigInboundGatewayWeb.V1.LongpollingController, cors: cors

config :rig, RigInboundGateway.ApiProxy.ResponseFromParser,
schema_registry_host: {:system, "KAFKA_SCHEMA_REGISTRY_HOST", nil}

# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs"
59 changes: 46 additions & 13 deletions docs/api-gateway.md
@@ -180,9 +180,7 @@ The endpoint expects the following request format:
### Wait for response

Sometimes it makes sense to provide a simple request-response API to something that runs asynchronously on the backend. For example, let's say there's a ticket reservation process that takes 10 seconds in total and involves three different services that communicate via message passing. For an external client, it may be simpler to wait 10 seconds for the response instead of polling for a response every other second.
A behavior like this can be configured using an endpoint's `response_from` property. When set to `kafka`, the response to the request is not taken from the `target` (e.g., for `target` = `http` this means the backend's HTTP response is ignored); instead, it is read from a Kafka topic. To enable RIG to correlate the response from the topic with the original request, RIG adds a correlation ID to the request (using a query parameter in case of `target` = `http`, or baked into the produced CloudEvent otherwise). **Backend services that work with the request need to include that correlation ID in their response; otherwise, RIG won't be able to forward it to the client (and times out).**

Configuration of such an API endpoint might look like this:

@@ -207,25 +205,60 @@ Configuration of such API endpoint might look like this:

> Note the presence of the `response_from` field. This tells RIG to wait for a different event with the same correlation ID.
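The endpoint configuration itself is collapsed in this diff; a minimal sketch of what such a definition might look like follows (the API and endpoint IDs, path, and method are illustrative — check the reverse-proxy documentation for the exact schema):

```json
{
  "id": "reservation-api",
  "version_data": {
    "default": {
      "endpoints": [
        {
          "id": "reserve-ticket",
          "method": "POST",
          "path": "/reservations",
          "response_from": "kafka"
        }
      ]
    }
  }
}
```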

#### Supported combinations (`target` -> `response_from`)

- HTTP -> `kafka`/`http_async`/`kinesis`
- Kafka -> `kafka`
- Kinesis -> **not supported**
- NATS -> `nats`

With `http_async`, the correlated response has to be sent to the internal `POST :4010/v2/responses` endpoint.

With `response_from="kafka"`, RIG will also try to decode Avro-encoded messages.

#### Supported formats

All `response_from` options use the message structures described below.

##### Binary

Message headers:

```plaintext
rig-correlation: "correlation_id_sent_by_rig"
rig-response-code: "201"
```

> `rig-correlation` is required.

Message body:

```json
{
"foo": "bar"
}
```
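From the producer side, a binary-mode response could be sketched as follows. This assumes the `kafka-python` client; the broker address, topic name, and correlation ID are placeholders:

```python
# Minimal sketch of a backend producing a binary-mode response for RIG.
# The body is forwarded to the client as-is.
value = b'{"foo": "bar"}'

# Kafka header values are bytes; RIG reads the correlation ID and the
# optional response code from these headers.
headers = [
    ("rig-correlation", b"correlation_id_sent_by_rig"),  # required
    ("rig-response-code", b"201"),                       # optional, defaults to 200
]

# With a running broker (uncomment to actually produce):
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers="localhost:9092")
# producer.send("rig-response-topic", value=value, headers=headers)
```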

##### Structured

Message body:

```json
{
  "id": "1",
  "specversion": "0.2",
  "source": "my-service",
  "type": "com.example",
  "rig": {
    "correlation": "correlation_id_sent_by_rig",
    "response_code": 201
  },
  "data": {
    "headers": {
      "foo": "bar"
    },
    "body": {
      "foo": "bar"
    }
  }
}
```

> `rig.correlation` is required.
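For `http_async`, a structured payload like the one above can be posted to RIG's internal responses endpoint. A sketch using only the Python standard library; the correlation ID is a placeholder for the value RIG attached to the proxied request:

```python
import json
import urllib.request

# Structured response payload, mirroring the format shown above.
payload = {
    "id": "1",
    "specversion": "0.2",
    "source": "my-service",
    "type": "com.example",
    "rig": {
        "correlation": "correlation_id_sent_by_rig",  # required
        "response_code": 201,                         # optional, defaults to 200
    },
    "data": {
        "headers": {"foo": "bar"},
        "body": {"foo": "bar"},
    },
}

req = urllib.request.Request(
    "http://localhost:4010/v2/responses",  # RIG's internal API port
    data=json.dumps(payload).encode(),
    headers={"content-type": "application/json"},
    method="POST",
)
# With a running RIG instance (uncomment to actually send):
# urllib.request.urlopen(req)
```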

## Auth

25 changes: 11 additions & 14 deletions lib/rig_api/v1/responses.ex
@@ -5,8 +5,7 @@ defmodule RigApi.V1.Responses do
use RigApi, :controller
use PhoenixSwagger

alias Rig.Connection.Codec
alias RigCloudEvents.CloudEvent
alias RigInboundGateway.ApiProxy.ResponseFromParser

@prefix "/v1"

@@ -36,18 +35,16 @@

Note that the body has to contain the following field: `"rig": { "correlation": "_id_" }`.
"""
def create(conn, message) do
with {:ok, cloud_event} <- CloudEvent.parse(message),
{:ok, rig_metadata} <- Map.fetch(message, "rig"),
{:ok, correlation_id} <- Map.fetch(rig_metadata, "correlation"),
{:ok, deserialized_pid} <- Codec.deserialize(correlation_id) do
Logger.debug(fn ->
"HTTP response via internal HTTP to #{inspect(deserialized_pid)}: #{inspect(message)}"
end)

send(deserialized_pid, {:response_received, cloud_event.json})
send_resp(conn, :accepted, "message sent to correlated reverse proxy request")
else
def create(%{req_headers: req_headers} = conn, message) do
case ResponseFromParser.parse(req_headers, message) do
{deserialized_pid, response_code, response, response_headers} ->
Logger.debug(fn ->
"HTTP response via internal HTTP to #{inspect(deserialized_pid)}: #{inspect(message)}"
end)

send(deserialized_pid, {:response_received, response, response_code, response_headers})
send_resp(conn, :accepted, "message sent to correlated reverse proxy request")

err ->
Logger.warn(fn -> "Parse error #{inspect(err)} for #{inspect(message)}" end)

25 changes: 11 additions & 14 deletions lib/rig_api/v2/responses.ex
@@ -5,8 +5,7 @@ defmodule RigApi.V2.Responses do
use RigApi, :controller
use PhoenixSwagger

alias Rig.Connection.Codec
alias RigCloudEvents.CloudEvent
alias RigInboundGateway.ApiProxy.ResponseFromParser

@prefix "/v2"

@@ -36,18 +35,16 @@

Note that the body has to contain the following field: `"rig": { "correlation": "_id_" }`.
"""
def create(conn, message) do
with {:ok, cloud_event} <- CloudEvent.parse(message),
{:ok, rig_metadata} <- Map.fetch(message, "rig"),
{:ok, correlation_id} <- Map.fetch(rig_metadata, "correlation"),
{:ok, deserialized_pid} <- Codec.deserialize(correlation_id) do
Logger.debug(fn ->
"HTTP response via internal HTTP to #{inspect(deserialized_pid)}: #{inspect(message)}"
end)

send(deserialized_pid, {:response_received, cloud_event.json})
send_resp(conn, :accepted, "message sent to correlated reverse proxy request")
else
def create(%{req_headers: req_headers} = conn, message) do
case ResponseFromParser.parse(req_headers, message) do
{deserialized_pid, response_code, response, response_headers} ->
Logger.debug(fn ->
"HTTP response via internal HTTP to #{inspect(deserialized_pid)}: #{inspect(message)}"
end)

send(deserialized_pid, {:response_received, response, response_code, response_headers})
send_resp(conn, :accepted, "message sent to correlated reverse proxy request")

err ->
Logger.warn(fn -> "Parse error #{inspect(err)} for #{inspect(message)}" end)

7 changes: 5 additions & 2 deletions lib/rig_inbound_gateway/api_proxy/handler/http.ex
@@ -103,7 +103,7 @@ defmodule RigInboundGateway.ApiProxy.Handler.Http do
end

receive do
{:response_received, response} ->
{:response_received, response, response_code, response_headers} ->
ProxyMetrics.count_proxy_request(
conn.method,
conn.request_path,
@@ -116,7 +116,7 @@
|> with_cors()
|> Tracing.Plug.put_resp_header(Tracing.context())
|> Conn.put_resp_content_type("application/json")
|> Conn.send_resp(:ok, response)
|> Map.update!(:resp_headers, fn existing_headers ->
existing_headers ++ Map.to_list(response_headers)
end)
|> Conn.send_resp(response_code, response)
> Reviewer comment: what about the content type? Ideally, we'd forward the body as-is and also retain the content-type, then the backend can send whatever.

after
response_timeout ->
ProxyMetrics.count_proxy_request(
32 changes: 13 additions & 19 deletions lib/rig_inbound_gateway/api_proxy/handler/kafka.ex
@@ -6,15 +6,12 @@ defmodule RigInboundGateway.ApiProxy.Handler.Kafka do
use Rig.KafkaConsumerSetup, [:cors, :request_topic, :request_schema, :response_timeout]

alias Plug.Conn

alias RigMetrics.ProxyMetrics

alias RigInboundGateway.ApiProxy.Handler
@behaviour Handler

alias Rig.Connection.Codec

alias RIG.Tracing
alias RigInboundGateway.ApiProxy.Handler
@behaviour Handler
alias RigInboundGateway.ApiProxy.ResponseFromParser
alias RigMetrics.ProxyMetrics

@help_text """
Produce the request to a Kafka topic and optionally wait for the (correlated) response.
@@ -46,20 +43,14 @@
| {:error, %{:__exception__ => true, :__struct__ => atom(), optional(atom()) => any()},
any()}
def kafka_handler(message, headers) do
with {:ok, event} <- Cloudevents.from_kafka_message(message, headers),
{:ok, rig_metadata} <- Map.fetch(event.extensions, "rig"),
{:ok, correlation_id} <- Map.fetch(rig_metadata, "correlation"),
{:ok, deserialized_pid} <- Codec.deserialize(correlation_id) do
with decoded_message <- ResponseFromParser.try_decode_message(message),
{deserialized_pid, response_code, response, response_headers} <-
ResponseFromParser.parse(headers, decoded_message) do
Logger.debug(fn ->
"HTTP response via Kafka to #{inspect(deserialized_pid)}: #{inspect(message)}"
end)

data =
if headers == [],
do: Cloudevents.to_json(event),
else: Jason.encode!(event.data)

send(deserialized_pid, {:response_received, data})
send(deserialized_pid, {:response_received, response, response_code, response_headers})
else
err ->
Logger.warn(fn -> "Parse error #{inspect(err)} for #{inspect(message)}" end)
@@ -267,7 +258,7 @@ defmodule RigInboundGateway.ApiProxy.Handler.Kafka do
conf = config()

receive do
{:response_received, response} ->
{:response_received, response, response_code, response_headers} ->
ProxyMetrics.count_proxy_request(
conn.method,
conn.request_path,
@@ -279,7 +270,10 @@
conn
|> Tracing.Plug.put_resp_header(Tracing.context())
|> Conn.put_resp_content_type("application/json")
|> Conn.send_resp(:ok, response)
|> Map.update!(:resp_headers, fn existing_headers ->
existing_headers ++ Map.to_list(response_headers)
end)
|> Conn.send_resp(response_code, response)
after
conf.response_timeout ->
ProxyMetrics.count_proxy_request(
2 changes: 1 addition & 1 deletion lib/rig_inbound_gateway/api_proxy/handler/kinesis.ex
@@ -228,7 +228,7 @@ defmodule RigInboundGateway.ApiProxy.Handler.Kinesis do
Conn.send_resp(conn, :bad_request, response)
end

# ---
# --- Not used at the moment

defp wait_for_response(conn, response_from) do
conf = config()
103 changes: 103 additions & 0 deletions lib/rig_inbound_gateway/api_proxy/response_from_parser.ex
@@ -0,0 +1,103 @@
defmodule RigInboundGateway.ApiProxy.ResponseFromParser do
@moduledoc """
Handles parsing of messages coming from different sources (e.g. Kafka)

"""

require Logger

use Rig.Config, [:schema_registry_host]

alias Rig.Connection.Codec
alias RigKafka.Avro

@default_response_code 200

# ---

@spec parse([{String.t(), String.t()}, ...], map()) ::
{pid, pos_integer(), any(), map()} | String.t()
# structured
def parse(
_headers,
%{
"body" => body,
"rig" => rig_metadata
} = message
) do
with {:ok, correlation_id} <- Map.fetch(rig_metadata, "correlation"),
{:ok, deserialized_pid} <- Codec.deserialize(correlation_id),
raw_response_code <- Map.get(rig_metadata, "response_code", @default_response_code),
# convert status code to int if needed, HTTP headers can't have number as a value
{response_code, _} <- to_int(raw_response_code),
response_headers <- Map.get(message, "headers", %{}),
response_body <- try_encode(body) do
Logger.debug(fn ->
"Parsed structured HTTP response: body=#{inspect(response_body)}, headers=#{
inspect(response_headers)
}, code=#{inspect(response_code)}"
end)

{deserialized_pid, response_code, response_body, response_headers}
else
err -> err
end
end

# binary
def parse(
headers,
message
) do
with headers_map <- Enum.into(headers, %{}),
{:ok, correlation_id} <- Map.fetch(headers_map, "rig-correlation"),
{:ok, deserialized_pid} <- Codec.deserialize(correlation_id),
raw_response_code <- Map.get(headers_map, "rig-response-code", @default_response_code),
# convert status code to int if needed, HTTP headers can't have number as a value
{response_code, _} <- to_int(raw_response_code),
response_body <- try_encode(message) do
Logger.debug(fn ->
"Parsed binary HTTP response: body=#{inspect(message)}, code=#{inspect(response_code)}"
end)

{deserialized_pid, response_code, response_body, %{}}
else
err -> err
end
end

# ---

@spec try_decode_message(<<_::3>> | String.t()) :: String.t() | map()
def try_decode_message(<<0::8, _id::32, _body::binary>> = message),
do: Avro.decode(message, config().schema_registry_host)

def try_decode_message(message) when is_binary(message) do
case Jason.decode(message) do
{:ok, val_map} ->
val_map

_ ->
message
end
end

# ---

defp try_encode(message) when is_binary(message), do: message

defp try_encode(message) do
case Jason.encode(message) do
{:ok, val_map} ->
val_map

_ ->
message
end
end

# ---

defp to_int(value) when is_integer(value), do: {value, nil}
defp to_int(value), do: Integer.parse(value)
end
4 changes: 2 additions & 2 deletions lib/rig_kafka/client.ex
@@ -326,9 +326,9 @@ defmodule RigKafka.Client do
{data, additional_headers} = Map.pop(plaintext_map, "data", %{})

prefixed_headers =
headers
|> Enum.concat(additional_headers)
additional_headers
|> Serializer.add_prefix()
|> Enum.concat(headers)
|> Enum.concat([{"content-type", "avro/binary"}])

{prefixed_headers,