Merged
Changes from 8 commits
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -12,12 +12,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Support publishing events consumed from [NATS](https://nats.io) topics. See the [documentation](https://accenture.github.io/reactive-interaction-gateway/docs/event-streams.html#nats) for how to get started. [#297](https://github.com/Accenture/reactive-interaction-gateway/issues/297)
- Added validation for the reverse proxy configuration. RIG now crashes on startup when the configuration is invalid, and returns `400` when an invalid configuration is submitted via the REST API. [#277](https://github.com/Accenture/reactive-interaction-gateway/issues/277)
- Added basic distributed tracing support following the [W3C Trace Context specification](https://www.w3.org/TR/trace-context/), with Jaeger and OpenZipkin exporters. RIG opens a span at the API Gateway and emits trace context in CloudEvents following the [distributed tracing spec](https://github.com/cloudevents/spec/blob/v1.0/extensions/distributed-tracing.md). [#281](https://github.com/Accenture/reactive-interaction-gateway/issues/281)
- Added the ability to set the response code for `response_from` messages in the reverse proxy (`kafka` and `http_async`). [#321](https://github.com/Accenture/reactive-interaction-gateway/pull/321)

### Changed

- Incorporated [cloudevents-ex](https://github.com/kevinbader/cloudevents-ex) to handle binary and structured modes for [Kafka protocol binding](https://github.com/cloudevents/spec/blob/v1.0/kafka-protocol-binding.md) in a proper way. This introduces some **breaking changes**:
- Binary mode now uses the `ce_` prefix for CloudEvents context attribute headers (previously `ce-`), in accordance with the [Kafka protocol binding](https://github.com/cloudevents/spec/blob/v1.0/kafka-protocol-binding.md)
- The change above also affects the `"response_from": "kafka"` proxy functionality: RIG forwards only the Kafka message body to clients, not the headers. With binary mode this means clients receive only the data part, without any CloudEvents context attributes.
- Changed the `response_from` handler to expect a message in binary format, **NOT** a CloudEvent (`kafka` and `http_async`). [#321](https://github.com/Accenture/reactive-interaction-gateway/pull/321)

### Fixed

42 changes: 25 additions & 17 deletions docs/api-gateway.md
@@ -180,9 +180,7 @@ The endpoint expects the following request format:
### Wait for response

Sometimes it makes sense to provide a simple request-response API to something that runs asynchronously on the backend. For example, let's say there's a ticket reservation process that takes 10 seconds in total and involves three different services that communicate via message passing. For an external client, it may be simpler to wait 10 seconds for the response instead of polling for a response every other second.
A behavior like this can be configured using an endpoint's `response_from` property. When set to `kafka`, the response to the request is not taken from the `target` (e.g., for `target` = `http` this means the backend's HTTP response is ignored) but is instead read from a Kafka topic. To enable RIG to correlate the response on the topic with the original request, RIG adds a correlation ID to the request (using a query parameter in case of `target` = `http`, or baked into the produced CloudEvent otherwise). **Backend services that work with the request need to include that correlation ID in their response; otherwise, RIG won't be able to forward it to the client and the request times out.**

Configuration of such an API endpoint might look like this:

@@ -207,26 +205,36 @@

> Note the presence of the `response_from` field. It tells RIG to wait for a different event with the same correlation ID.

#### Supported combinations (`target` -> `response_from`)

- HTTP -> `kafka`/`http_async`/`kinesis`
- Kafka -> `kafka`
- Kinesis -> **not supported**
- NATS -> `nats`
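
As an illustration, the Kafka -> `kafka` combination might be declared like this. This is a sketch following RIG's proxy API format; the `id`, `name`, and `path` values are made up:

```json
{
  "id": "ticket-service",
  "version_data": {
    "default": {
      "endpoints": [
        {
          "id": "reserve-ticket",
          "method": "POST",
          "path": "/tickets",
          "target": "kafka",
          "response_from": "kafka"
        }
      ]
    }
  }
}
```

Here the request is produced to the configured Kafka request topic, and the HTTP response is taken from the correlated message on the response topic instead of any backend reply.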

`http_async` means that the correlated response has to be sent to RIG's internal `POST :4010/v2/responses` endpoint.

#### Supported formats

All `response_from` options support only binary mode (the CloudEvents _binary_ content mode, where context attributes are carried in headers and the message body contains only the data).
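
For Kafka, binary mode means the CloudEvents context attributes travel as `ce_`-prefixed message headers while the message value carries only the data. A sketch with illustrative values:

```plaintext
# Kafka message headers (CloudEvents context attributes, ce_-prefixed)
ce_specversion: 1.0
ce_id: 1
ce_source: my-service
ce_type: com.example
content-type: application/json

# Kafka message value (only this data part reaches the client)
{"foo": "bar"}
```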
> **Reviewer (Contributor):** Not an issue, just a thought: is "binary mode" a common term outside the cloudevents spec?
>
> **Author (Collaborator):** No idea tbh, but I guess it makes sense to be consistent with such terms.

Message headers:

```plaintext
rig-correlation: "correlation_id_sent_by_rig"
rig-response-code: "201"
```

> Both headers are required.

Message body:

```json
{
  "foo": "bar"
}
```
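
Putting headers and body together, a correlated `http_async` response might be delivered to RIG like this (a sketch; host, correlation ID, and body are placeholders):

```plaintext
POST /v2/responses HTTP/1.1
Host: localhost:4010
rig-correlation: correlation_id_sent_by_rig
rig-response-code: 201
content-type: application/json

{"foo": "bar"}
```

Per the responses controller in this PR, RIG answers with `202 Accepted` ("message sent to correlated reverse proxy request") once the message has been forwarded to the waiting proxy request.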


## Auth

RIG can do a simple auth check for endpoints. Currently, JWT is supported.
25 changes: 11 additions & 14 deletions lib/rig_api/v1/responses.ex
@@ -5,8 +5,7 @@ defmodule RigApi.V1.Responses do
use RigApi, :controller
use PhoenixSwagger

alias Rig.Connection.Codec
alias RigCloudEvents.CloudEvent
alias RigInboundGateway.ApiProxy.ResponseFromParser

@prefix "/v1"

@@ -36,18 +35,16 @@

Note that body has to contain following field `"rig": { "correlation": "_id_" }`.
"""
def create(conn, message) do
with {:ok, cloud_event} <- CloudEvent.parse(message),
{:ok, rig_metadata} <- Map.fetch(message, "rig"),
{:ok, correlation_id} <- Map.fetch(rig_metadata, "correlation"),
{:ok, deserialized_pid} <- Codec.deserialize(correlation_id) do
Logger.debug(fn ->
"HTTP response via internal HTTP to #{inspect(deserialized_pid)}: #{inspect(message)}"
end)

send(deserialized_pid, {:response_received, cloud_event.json})
send_resp(conn, :accepted, "message sent to correlated reverse proxy request")
else
def create(%{req_headers: req_headers} = conn, message) do
case ResponseFromParser.parse(req_headers, message) do
{deserialized_pid, response_code, response} ->
Logger.debug(fn ->
"HTTP response via internal HTTP to #{inspect(deserialized_pid)}: #{inspect(message)}"
end)

send(deserialized_pid, {:response_received, response, response_code})
send_resp(conn, :accepted, "message sent to correlated reverse proxy request")

err ->
Logger.warn(fn -> "Parse error #{inspect(err)} for #{inspect(message)}" end)

25 changes: 11 additions & 14 deletions lib/rig_api/v2/responses.ex
@@ -5,8 +5,7 @@ defmodule RigApi.V2.Responses do
use RigApi, :controller
use PhoenixSwagger

alias Rig.Connection.Codec
alias RigCloudEvents.CloudEvent
alias RigInboundGateway.ApiProxy.ResponseFromParser

@prefix "/v2"

@@ -36,18 +35,16 @@

Note that body has to contain following field `"rig": { "correlation": "_id_" }`.
"""
def create(conn, message) do
with {:ok, cloud_event} <- CloudEvent.parse(message),
{:ok, rig_metadata} <- Map.fetch(message, "rig"),
{:ok, correlation_id} <- Map.fetch(rig_metadata, "correlation"),
{:ok, deserialized_pid} <- Codec.deserialize(correlation_id) do
Logger.debug(fn ->
"HTTP response via internal HTTP to #{inspect(deserialized_pid)}: #{inspect(message)}"
end)

send(deserialized_pid, {:response_received, cloud_event.json})
send_resp(conn, :accepted, "message sent to correlated reverse proxy request")
else
def create(%{req_headers: req_headers} = conn, message) do
case ResponseFromParser.parse(req_headers, message) do
{deserialized_pid, response_code, response} ->
Logger.debug(fn ->
"HTTP response via internal HTTP to #{inspect(deserialized_pid)}: #{inspect(message)}"
end)

send(deserialized_pid, {:response_received, response, response_code})
send_resp(conn, :accepted, "message sent to correlated reverse proxy request")

err ->
Logger.warn(fn -> "Parse error #{inspect(err)} for #{inspect(message)}" end)

4 changes: 2 additions & 2 deletions lib/rig_inbound_gateway/api_proxy/handler/http.ex
@@ -103,7 +103,7 @@ defmodule RigInboundGateway.ApiProxy.Handler.Http do
end

receive do
{:response_received, response} ->
{:response_received, response, response_code} ->
ProxyMetrics.count_proxy_request(
conn.method,
conn.request_path,
@@ -116,7 +116,7 @@ defmodule RigInboundGateway.ApiProxy.Handler.Http do
|> with_cors()
|> Tracing.Plug.put_resp_header(Tracing.context())
|> Conn.put_resp_content_type("application/json")
|> Conn.send_resp(:ok, response)
|> Conn.send_resp(response_code, response)
> **Reviewer (Contributor):** what about the content type? Ideally, we'd forward the body as-is and also retain the content-type, then the backend can send whatever

after
response_timeout ->
ProxyMetrics.count_proxy_request(
38 changes: 14 additions & 24 deletions lib/rig_inbound_gateway/api_proxy/handler/kafka.ex
@@ -6,15 +6,12 @@ defmodule RigInboundGateway.ApiProxy.Handler.Kafka do
use Rig.KafkaConsumerSetup, [:cors, :request_topic, :request_schema, :response_timeout]

alias Plug.Conn

alias RigMetrics.ProxyMetrics

alias RigInboundGateway.ApiProxy.Handler
@behaviour Handler

alias Rig.Connection.Codec

alias RIG.Tracing
alias RigInboundGateway.ApiProxy.Handler
@behaviour Handler
alias RigInboundGateway.ApiProxy.ResponseFromParser
alias RigMetrics.ProxyMetrics

@help_text """
Produce the request to a Kafka topic and optionally wait for the (correlated) response.
@@ -46,21 +43,14 @@
| {:error, %{:__exception__ => true, :__struct__ => atom(), optional(atom()) => any()},
any()}
def kafka_handler(message, headers) do
with {:ok, event} <- Cloudevents.from_kafka_message(message, headers),
{:ok, rig_metadata} <- Map.fetch(event.extensions, "rig"),
{:ok, correlation_id} <- Map.fetch(rig_metadata, "correlation"),
{:ok, deserialized_pid} <- Codec.deserialize(correlation_id) do
Logger.debug(fn ->
"HTTP response via Kafka to #{inspect(deserialized_pid)}: #{inspect(message)}"
end)

data =
if headers == [],
do: Cloudevents.to_json(event),
else: Jason.encode!(event.data)

send(deserialized_pid, {:response_received, data})
else
case ResponseFromParser.parse(headers, message) do
{deserialized_pid, response_code, response} ->
Logger.debug(fn ->
"HTTP response via Kafka to #{inspect(deserialized_pid)}: #{inspect(message)}"
end)

send(deserialized_pid, {:response_received, response, response_code})

err ->
Logger.warn(fn -> "Parse error #{inspect(err)} for #{inspect(message)}" end)
:ignore
@@ -267,7 +257,7 @@ defmodule RigInboundGateway.ApiProxy.Handler.Kafka do
conf = config()

receive do
{:response_received, response} ->
{:response_received, response, response_code} ->
ProxyMetrics.count_proxy_request(
conn.method,
conn.request_path,
@@ -279,7 +269,7 @@
conn
|> Tracing.Plug.put_resp_header(Tracing.context())
|> Conn.put_resp_content_type("application/json")
|> Conn.send_resp(:ok, response)
|> Conn.send_resp(response_code, response)
after
conf.response_timeout ->
ProxyMetrics.count_proxy_request(
2 changes: 1 addition & 1 deletion lib/rig_inbound_gateway/api_proxy/handler/kinesis.ex
@@ -228,7 +228,7 @@ defmodule RigInboundGateway.ApiProxy.Handler.Kinesis do
Conn.send_resp(conn, :bad_request, response)
end

# ---
# --- Not used at the moment

defp wait_for_response(conn, response_from) do
conf = config()
55 changes: 55 additions & 0 deletions lib/rig_inbound_gateway/api_proxy/response_from_parser.ex
@@ -0,0 +1,55 @@
defmodule RigInboundGateway.ApiProxy.ResponseFromParser do
@moduledoc """
Handles parsing of messages coming from different sources (e.g. Kafka)

"""

require Logger

alias Rig.Connection.Codec

# ---

@spec parse([{String.t(), String.t()}, ...], map()) ::
{pid, pos_integer(), any()} | String.t()

def parse(
headers,
message
) do
with headers_map <- Enum.into(headers, %{}),
{:ok, correlation_id} <- Map.fetch(headers_map, "rig-correlation"),
{:ok, deserialized_pid} <- Codec.deserialize(correlation_id),
{:ok, raw_response_code} <- Map.fetch(headers_map, "rig-response-code"),
# convert status code to int if needed, HTTP headers can't have number as a value
response_code <- to_int(raw_response_code),
response_body <- try_encode(message) do
Logger.debug(fn ->
"Parsed binary HTTP response: body=#{inspect(response_body)}, code=#{inspect(response_code)}"
end)

{deserialized_pid, response_code, response_body}
else
err -> err
end
end

# ---

defp try_encode(message) when is_binary(message), do: message

defp try_encode(message) do
case Jason.encode(message) do
{:ok, val_map} ->
val_map

_ ->
message
end
end

# ---

defp to_int(value) when is_integer(value), do: value
defp to_int(value) when is_binary(value), do: String.to_integer(value)
end
4 changes: 2 additions & 2 deletions lib/rig_kafka/client.ex
@@ -326,9 +326,9 @@ defmodule RigKafka.Client do
{data, additional_headers} = Map.pop(plaintext_map, "data", %{})

prefixed_headers =
headers
|> Enum.concat(additional_headers)
additional_headers
|> Serializer.add_prefix()
|> Enum.concat(headers)
|> Enum.concat([{"content-type", "avro/binary"}])

{prefixed_headers,
10 changes: 0 additions & 10 deletions test/rig_tests/proxy/publish_to_event_stream/kafka_test.exs
@@ -75,8 +75,6 @@ defmodule RigTests.Proxy.PublishToEventStream.KafkaTest do
endpoints: [
%{
id: endpoint_id,
type: "http",
secured: false,
method: "OPTIONS",
path: endpoint_path,
target: "kafka"
Expand Down Expand Up @@ -134,8 +132,6 @@ defmodule RigTests.Proxy.PublishToEventStream.KafkaTest do
endpoints: [
%{
id: endpoint_id,
type: "http",
secured: false,
method: "POST",
path: endpoint_path,
target: "kafka",
Expand Down Expand Up @@ -252,8 +248,6 @@ defmodule RigTests.Proxy.PublishToEventStream.KafkaTest do
endpoints: [
%{
id: endpoint_id,
type: "http",
secured: false,
method: "POST",
path: endpoint_path,
target: "kafka",
Expand Down Expand Up @@ -371,8 +365,6 @@ defmodule RigTests.Proxy.PublishToEventStream.KafkaTest do
endpoints: [
%{
id: endpoint_id,
type: "http",
secured: false,
method: "POST",
path: endpoint_path,
target: "kafka"
Expand Down Expand Up @@ -488,8 +480,6 @@ defmodule RigTests.Proxy.PublishToEventStream.KafkaTest do
endpoints: [
%{
id: endpoint_id,
type: "http",
secured: false,
method: "POST",
path: endpoint_path,
target: "kafka"