3 changes: 3 additions & 0 deletions auditlog/.gitignore
@@ -0,0 +1,3 @@
target/

connector-config/plugins
123 changes: 103 additions & 20 deletions auditlog/README.md
@@ -5,37 +5,37 @@ It accompanies the blog post [Building Audit Logs with Change Data Capture and S

There are two applications (based on [Quarkus](https://quarkus.io/)):

* _vegetables-service_: a simple REST service for inserting and updating vegetable data into a Postgres database;
as part of its processing, it will not only update its actual "business table" `vegetable`,
but also insert some auditing metadata into a dedicated metadata table `transaction_context_data`:
the user (as obtained from the passed JWT token), the client's date (as passed via the HTTP 1.1 `Date` header)
and a use case identifier (as specified in an annotation on the REST API methods).
* _log-enricher_: a Kafka Streams application,
which joins the CDC topic holding the `vegetable` change events (`dbserver1.inventory.vegetable`) with the corresponding metadata in the `dbserver1.inventory.transaction_context_data` topic sourced from the `transaction_context_data` table;
this table is keyed by transaction id, allowing for joining the vegetable `KStream` with the metadata `KTable`.
The enriched vegetable change events are written to the `dbserver1.inventory.vegetable.enriched` topic.
- _vegetables-service_: a simple REST service for inserting and updating vegetable data into a Postgres database;
as part of its processing, it will not only update its actual "business table" `vegetable`,
but also insert some auditing metadata into a dedicated metadata table `transaction_context_data`:
the user (as obtained from the passed JWT token), the client's date (as passed via the HTTP 1.1 `Date` header)
and a use case identifier (as specified in an annotation on the REST API methods).
- _log-enricher_: a Kafka Streams application,
which joins the CDC topic holding the `vegetable` change events (`dbserver1.inventory.vegetable`) with the corresponding metadata in the `dbserver1.inventory.transaction_context_data` topic sourced from the `transaction_context_data` table;
this table is keyed by transaction id, allowing for joining the vegetable `KStream` with the metadata `KTable`.
The enriched vegetable change events are written to the `dbserver1.inventory.vegetable.enriched` topic.
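
The join performed by _log-enricher_ can be pictured with a small, self-contained Python sketch. It is not the actual Kafka Streams code: the event shapes and the field names (`tx_id`, `audit`) are simplified assumptions chosen for illustration. A dict keyed by transaction id plays the role of the metadata `KTable`, and each vegetable change event (the `KStream` side) looks up its metadata by that key:

```python
from typing import Optional

# Stand-in for the KTable: latest metadata per transaction id (hypothetical values).
transaction_context = {
    769: {"user_name": "farmerbob", "usecase": "CREATE VEGETABLE"},
    770: {"user_name": "farmermargaret", "usecase": "UPDATE VEGETABLE"},
}

# Stand-in for the KStream of vegetable change events (simplified shape).
vegetable_events = [
    {"tx_id": 769, "op": "c", "name": "Tomatoe", "description": "Yummy!"},
    {"tx_id": 770, "op": "u", "name": "Tomatoe", "description": "Tasty!"},
    {"tx_id": 999, "op": "u", "name": "Potato", "description": "No metadata yet"},
]


def enrich(event: dict, table: dict) -> Optional[dict]:
    """Join one change event with the metadata of its transaction, if present."""
    metadata = table.get(event["tx_id"])
    if metadata is None:
        return None  # no metadata known for this transaction
    return {**event, "audit": metadata}


# Keep only the events for which the lookup succeeded.
enriched = [e for e in (enrich(ev, transaction_context) for ev in vegetable_events) if e]
for event in enriched:
    print(event["tx_id"], event["op"], event["audit"]["user_name"])
```

The sketch simply drops events whose metadata is missing; how the real application deals with that case is not modelled here.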

## Building the Demo

```console
$ mvn clean package
mvn clean package
```

```console
$ export DEBEZIUM_VERSION=1.8
$ docker-compose up --build
export DEBEZIUM_VERSION=1.8
docker-compose up --build
```

## Deploy the Debezium Postgres Connector

```console
$ http PUT http://localhost:8083/connectors/inventory-connector/config < register-postgres.json
http PUT http://localhost:8083/connectors/inventory-connector/config < register-postgres.json
```

## Modifying Some Data and Observing the Audit Log

```console
$ http POST http://localhost:8080/vegetables 'Authorization:Bearer eyJraWQiOiJqd3Qua2V5IiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJmYXJtZXJib2IiLCJ1cG4iOiJmYXJtZXJib2IiLCJhdXRoX3RpbWUiOjE1NjY0NTgxMTMsImlzcyI6ImZhcm1zaG9wIiwiZ3JvdXBzIjpbImZhcm1lcnMiLCJjdXN0b21lcnMiXSwiZXhwIjo0MTAyNDQ0Nzk5LCJpYXQiOjE1NjY0NTgxMTMsImp0aSI6IjQyIn0.CscbJN8amqKryYvnVO1184J8F67HN2iTEjVN2VOPodcnoeOd7_iQVKUjC3h-ye5apkJjvAsQKrjzlrGCHRfl-n6jC9F7IkOtjoWnJ4wQ9BBo1SAtPw_Czt1I_Ujm-Kb1p5-BWACCBCVVFgYZTWP_laz5JZS7dIvs6VqoNnw7A4VpA6iPfTVfYlNY3u86-k1FvEg_hW-N9Y9RuihMsPuTdpHK5xdjCrJiD0VJ7-0eRQ8RXpycHuHN4xfmV8MqXBYjYSYDOhbnYbdQVbf0YJoFFqfb75my5olN-97ITsi2MS62W_y-RNT0qZrbytqINA3fF3VQsSY6VcaqRAeygrKm_Q' 'Date:Thu, 22 Aug 2019 08:12:31 GMT' name=Tomatoe description=Yummy!
http POST http://localhost:8080/vegetables 'Authorization:Bearer eyJraWQiOiJqd3Qua2V5IiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJmYXJtZXJib2IiLCJ1cG4iOiJmYXJtZXJib2IiLCJhdXRoX3RpbWUiOjE1NjY0NTgxMTMsImlzcyI6ImZhcm1zaG9wIiwiZ3JvdXBzIjpbImZhcm1lcnMiLCJjdXN0b21lcnMiXSwiZXhwIjo0MTAyNDQ0Nzk5LCJpYXQiOjE1NjY0NTgxMTMsImp0aSI6IjQyIn0.CscbJN8amqKryYvnVO1184J8F67HN2iTEjVN2VOPodcnoeOd7_iQVKUjC3h-ye5apkJjvAsQKrjzlrGCHRfl-n6jC9F7IkOtjoWnJ4wQ9BBo1SAtPw_Czt1I_Ujm-Kb1p5-BWACCBCVVFgYZTWP_laz5JZS7dIvs6VqoNnw7A4VpA6iPfTVfYlNY3u86-k1FvEg_hW-N9Y9RuihMsPuTdpHK5xdjCrJiD0VJ7-0eRQ8RXpycHuHN4xfmV8MqXBYjYSYDOhbnYbdQVbf0YJoFFqfb75my5olN-97ITsi2MS62W_y-RNT0qZrbytqINA3fF3VQsSY6VcaqRAeygrKm_Q' 'Date:Thu, 22 Aug 2019 08:12:31 GMT' name=Tomatoe description=Yummy!
```

This uses a pre-generated JWT token (with expiration date set to 2099-12-31 and user set to "farmerbob").
@@ -44,13 +44,13 @@ To regenerate the token with different data, use the [Jwtenizr](https://github.c
You can also update an existing vegetable record (this token is for "farmermargaret"):

```console
$ http PUT http://localhost:8080/vegetables/10 'Authorization:Bearer eyJraWQiOiJqd3Qua2V5IiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJmYXJtZXJtYXJnYXJldCIsInVwbiI6ImZhcm1lcm1hcmdhcmV0IiwiYXV0aF90aW1lIjoxNTY5ODM1Mzk5LCJpc3MiOiJmYXJtc2hvcCIsImdyb3VwcyI6WyJmYXJtZXJzIiwiY3VzdG9tZXJzIl0sImV4cCI6NDEwMjQ0NDc5OSwiaWF0IjoxNTY5ODM1Mzk5LCJqdGkiOiI0MiJ9.DTEUA3p-xyK5nveoJIVhjfKNFdVszYIb55Qj4Xrm70DDbAXuOU2FMkffuUAUm2s7ACkp2KEmg6brRwSjvA-zhW61kDR9ZgEb9NWeDjr6Eue08xcSODKt7SGV-M7h3yhuDIhU7uaZrxRUAQTWqm1vxd2rmN_QH0frhKMUNFFsLIOGLG0zHcLosRcwZ4tAKXSSB9VE0fth6srIQCUebDkF7ucA_WSYjPRvahCBd8JvnV4VUGQxZW8zcRhTEwcaLq20ODO-dr85xgWI2Yr_1A7PDuDL4oUjCb90YyhtzaIzs2vQMjcxJ6TWmTcqJpgCfkjE-TeVwjaafcNJu0fBmcP8jA' 'Date:Thu, 22 Aug 2019 08:12:31 GMT' name=Tomatoe description=Tasty!
http PUT http://localhost:8080/vegetables/10 'Authorization:Bearer eyJraWQiOiJqd3Qua2V5IiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJmYXJtZXJtYXJnYXJldCIsInVwbiI6ImZhcm1lcm1hcmdhcmV0IiwiYXV0aF90aW1lIjoxNTY5ODM1Mzk5LCJpc3MiOiJmYXJtc2hvcCIsImdyb3VwcyI6WyJmYXJtZXJzIiwiY3VzdG9tZXJzIl0sImV4cCI6NDEwMjQ0NDc5OSwiaWF0IjoxNTY5ODM1Mzk5LCJqdGkiOiI0MiJ9.DTEUA3p-xyK5nveoJIVhjfKNFdVszYIb55Qj4Xrm70DDbAXuOU2FMkffuUAUm2s7ACkp2KEmg6brRwSjvA-zhW61kDR9ZgEb9NWeDjr6Eue08xcSODKt7SGV-M7h3yhuDIhU7uaZrxRUAQTWqm1vxd2rmN_QH0frhKMUNFFsLIOGLG0zHcLosRcwZ4tAKXSSB9VE0fth6srIQCUebDkF7ucA_WSYjPRvahCBd8JvnV4VUGQxZW8zcRhTEwcaLq20ODO-dr85xgWI2Yr_1A7PDuDL4oUjCb90YyhtzaIzs2vQMjcxJ6TWmTcqJpgCfkjE-TeVwjaafcNJu0fBmcP8jA' 'Date:Thu, 22 Aug 2019 08:12:31 GMT' name=Tomatoe description=Tasty!
```

Or delete a record (again using the "farmerbob" token):

```console
$ http DELETE http://localhost:8080/vegetables/10 'Authorization:Bearer eyJraWQiOiJqd3Qua2V5IiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJmYXJtZXJib2IiLCJ1cG4iOiJmYXJtZXJib2IiLCJhdXRoX3RpbWUiOjE1NjY0NTgxMTMsImlzcyI6ImZhcm1zaG9wIiwiZ3JvdXBzIjpbImZhcm1lcnMiLCJjdXN0b21lcnMiXSwiZXhwIjo0MTAyNDQ0Nzk5LCJpYXQiOjE1NjY0NTgxMTMsImp0aSI6IjQyIn0.CscbJN8amqKryYvnVO1184J8F67HN2iTEjVN2VOPodcnoeOd7_iQVKUjC3h-ye5apkJjvAsQKrjzlrGCHRfl-n6jC9F7IkOtjoWnJ4wQ9BBo1SAtPw_Czt1I_Ujm-Kb1p5-BWACCBCVVFgYZTWP_laz5JZS7dIvs6VqoNnw7A4VpA6iPfTVfYlNY3u86-k1FvEg_hW-N9Y9RuihMsPuTdpHK5xdjCrJiD0VJ7-0eRQ8RXpycHuHN4xfmV8MqXBYjYSYDOhbnYbdQVbf0YJoFFqfb75my5olN-97ITsi2MS62W_y-RNT0qZrbytqINA3fF3VQsSY6VcaqRAeygrKm_Q' 'Date:Thu, 22 Aug 2019 08:12:31 GMT'
http DELETE http://localhost:8080/vegetables/10 'Authorization:Bearer eyJraWQiOiJqd3Qua2V5IiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJmYXJtZXJib2IiLCJ1cG4iOiJmYXJtZXJib2IiLCJhdXRoX3RpbWUiOjE1NjY0NTgxMTMsImlzcyI6ImZhcm1zaG9wIiwiZ3JvdXBzIjpbImZhcm1lcnMiLCJjdXN0b21lcnMiXSwiZXhwIjo0MTAyNDQ0Nzk5LCJpYXQiOjE1NjY0NTgxMTMsImp0aSI6IjQyIn0.CscbJN8amqKryYvnVO1184J8F67HN2iTEjVN2VOPodcnoeOd7_iQVKUjC3h-ye5apkJjvAsQKrjzlrGCHRfl-n6jC9F7IkOtjoWnJ4wQ9BBo1SAtPw_Czt1I_Ujm-Kb1p5-BWACCBCVVFgYZTWP_laz5JZS7dIvs6VqoNnw7A4VpA6iPfTVfYlNY3u86-k1FvEg_hW-N9Y9RuihMsPuTdpHK5xdjCrJiD0VJ7-0eRQ8RXpycHuHN4xfmV8MqXBYjYSYDOhbnYbdQVbf0YJoFFqfb75my5olN-97ITsi2MS62W_y-RNT0qZrbytqINA3fF3VQsSY6VcaqRAeygrKm_Q' 'Date:Thu, 22 Aug 2019 08:12:31 GMT'
```

Doing so, observe the contents of the `dbserver1.inventory.vegetable`, `dbserver1.inventory.transaction_context_data` and `dbserver1.inventory.vegetable.enriched` topics:
@@ -144,19 +144,102 @@ http POST http://localhost:8085/vegetables/{uuid}/auditData/{tuuid} audit:='{"us

This would then fix the missing event in the transaction context data topic and trigger the enricher to provide a new log entry.

## Stopping All Services
## Forwarding Events to a Downstream Postgres Database

```console
$ docker-compose down
$ docker run -it --rm \
--network auditlog_default \
quay.io/debezium/tooling:1.2 \
/bin/bash -c "kafkacat -b kafka:9092 \
-C -o beginning -q -u -t dbserver1.inventory.vegetable.enriched | jq ."
```

This shows the enriched events that were forwarded to the `dbserver1.inventory.vegetable.enriched` topic.
Next, pass these events to the lightweight `postgres-sink` database defined in `docker-compose.yaml`, using the [Debezium JDBC connector](https://debezium.io/documentation/reference/stable/connectors/jdbc.html).

Download the connector plugin `.tar.gz` from the Debezium [plugin archive](https://repo1.maven.org/maven2/io/debezium/debezium-connector-jdbc/) and unpack it so that its JAR files end up in the `connector-config/plugins/debezium-connector-jdbc` directory, which `docker-compose.yaml` mounts into the `connect` container.

Restart the Kafka Connect container so that it picks up the new plugin:

```console
docker compose down -v connect
docker compose up -d connect
```

Create the Debezium sink connector by running:

```console
http POST http://localhost:8083/connectors < connector-config/config/jdbc-connector-config.json
```
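
Before looking for rows in the sink, it can help to check that the connector and its task are actually running. Kafka Connect exposes this under `GET /connectors/{name}/status`. The Python sketch below is one possible way to evaluate that response; the helper names are hypothetical, and the sample payload is hand-written rather than captured from a real run.

```python
import json
import urllib.request


def fetch_status(name: str, base_url: str = "http://localhost:8083") -> dict:
    """Fetch the status document of a connector from the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)


def is_running(status: dict) -> bool:
    """True if the connector and all of its tasks (at least one) report RUNNING."""
    tasks = status.get("tasks", [])
    return (
        status.get("connector", {}).get("state") == "RUNNING"
        and len(tasks) > 0
        and all(task.get("state") == "RUNNING" for task in tasks)
    )


# Hand-written sample payload (not captured from a real run).
sample = {
    "name": "jdbc-connector",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [{"id": 0, "state": "FAILED", "worker_id": "connect:8083"}],
    "type": "sink",
}
# The connector itself is RUNNING, but a failed task means no rows are written.
print(is_running(sample))
```

Against the running demo, `is_running(fetch_status("jdbc-connector"))` would perform the same check on the live connector.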

The connector reads from the `dbserver1.inventory.vegetable.enriched` topic and populates the `dbserver1_inventory_vegetable_enriched` table.

Confirm the changes by opening a `psql` session in the `postgres-sink` container:

```console
docker compose exec postgres-sink psql -U postgresuser -d postgres
```

View the rows in the `dbserver1_inventory_vegetable_enriched` table:

```sql
SELECT * FROM dbserver1_inventory_vegetable_enriched;
```

The rows should look similar to this:

```console
| __connect_topic | __connect_partition | __connect_offset | id | description | name | __deleted | op | lsn | ts_ms | tx_id | client_date | usecase | user_name |
|------------------------------------------|---------------------|------------------|----|-------------|---------|-----------|----|----------|---------------|-------|------------------|-------------------|-----------------|
| dbserver1.inventory.vegetable.enriched | 0 | 0 | 10 | Yummy! | Tomatoe | false | c | 36689976 | 1773921791527 | 769 | 1566461551000000 | CREATE VEGETABLE | farmerbob |
| dbserver1.inventory.vegetable.enriched | 0 | 2 | 10 | Yummy! | Tomatoe | false | c | 36689976 | 1773921791527 | 769 | 1566461551000000 | CREATE VEGETABLE | farmerbob |
| dbserver1.inventory.vegetable.enriched | 0 | 4 | 10 | Tasty! | Tomatoe | false | u | 36690432 | 1773921804910 | 770 | 1566461551000000 | UPDATE VEGETABLE | farmermargaret |
| dbserver1.inventory.vegetable.enriched | 0 | 5 | 10 | | | true | d | 36690800 | 1773921809082 | 771 | 1566461551000000 | DELETE VEGETABLE | farmerbob |
```
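
The `client_date` column appears to hold microseconds since the Unix epoch: read that way, `1566461551000000` is exactly the `Date` header sent with the requests above (Thu, 22 Aug 2019 08:12:31 GMT). A short Python sketch of the conversion, with a hypothetical helper name:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def micros_to_utc(micros: int) -> datetime:
    """Convert microseconds since the Unix epoch to an aware UTC datetime."""
    return EPOCH + timedelta(microseconds=micros)


print(micros_to_utc(1566461551000000).isoformat())  # 2019-08-22T08:12:31+00:00
```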

With this, the audit log events have been successfully propagated to the downstream database.

The table `dbserver1_inventory_vegetable_enriched` can be defined as:

```sql
-- This version is used with the Debezium JDBC Sink Connector
CREATE TABLE dbserver1_inventory_vegetable_enriched (
-- Business Data
id INTEGER,
name VARCHAR(255) NULL,
description TEXT NULL,

-- Audit Metadata
tx_id VARCHAR(255),
user_name VARCHAR(255) NULL,
usecase VARCHAR(255) NULL,
client_date BIGINT NULL,

-- Change Metadata (for uniqueness)
op VARCHAR(1),
lsn BIGINT,
ts_ms BIGINT,

-- Kafka Metadata to prevent squashing of events
__connect_partition INTEGER,
__connect_offset BIGINT,
__connect_topic VARCHAR(255),

-- To ensure that all events are kept we will use kafka metadata to
-- identify unique rows
PRIMARY KEY (__connect_offset, __connect_partition, __connect_topic)
);

```
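
To see why the Kafka coordinates are used as the primary key, consider what an upsert-style write would do with different keys. The sketch below uses Python's built-in `sqlite3` module instead of Postgres and a cut-down, hypothetical table layout, so it only illustrates the idea: keyed by `id`, later events overwrite earlier ones, while keyed by topic, partition and offset, every event keeps its own row.

```python
import sqlite3

# (topic, partition, offset, id, op) tuples modelled on the sample rows.
events = [
    ("dbserver1.inventory.vegetable.enriched", 0, 0, 10, "c"),
    ("dbserver1.inventory.vegetable.enriched", 0, 4, 10, "u"),
    ("dbserver1.inventory.vegetable.enriched", 0, 5, 10, "d"),
]

db = sqlite3.connect(":memory:")
# Variant 1: primary key on the business id.
db.execute(
    "CREATE TABLE by_id (kafka_topic TEXT, kafka_partition INTEGER,"
    " kafka_offset INTEGER, id INTEGER PRIMARY KEY, op TEXT)"
)
# Variant 2: primary key on the Kafka coordinates, as in the DDL.
db.execute(
    "CREATE TABLE by_coordinates (kafka_topic TEXT, kafka_partition INTEGER,"
    " kafka_offset INTEGER, id INTEGER, op TEXT,"
    " PRIMARY KEY (kafka_offset, kafka_partition, kafka_topic))"
)

for table in ("by_id", "by_coordinates"):
    # INSERT OR REPLACE mimics an upsert; the connector's insert mode is not modelled.
    db.executemany(f"INSERT OR REPLACE INTO {table} VALUES (?, ?, ?, ?, ?)", events)

rows_by_id = db.execute("SELECT COUNT(*) FROM by_id").fetchone()[0]
rows_by_coordinates = db.execute("SELECT COUNT(*) FROM by_coordinates").fetchone()[0]
print(rows_by_id, rows_by_coordinates)  # 1 3
```

With the key on `id`, only the last event for vegetable 10 survives, which defeats the purpose of an audit log.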

## Running the Quarkus Applications Locally
## Running the Quarkus applications locally

Set `ADVERTISED_HOST_NAME` of the `kafka` service in _docker-compose.yaml_ to the IP address of your host machine.
Start all services except the `vegetables-service` and the `log-enricher`:

```console
$ docker-compose up --scale vegetables-service=0 --scale log-enricher=0
docker-compose up --scale vegetables-service=0 --scale log-enricher=0
```

Then start the three services via the Quarkus dev mode:
21 changes: 21 additions & 0 deletions auditlog/connector-config/config/jdbc-connector-config.json
@@ -0,0 +1,21 @@
{
"name": "jdbc-connector",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://postgres-sink:5432/postgres",
"connection.username": "postgresuser",
"connection.password": "postgrespw",
"primary.key.mode": "kafka",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"schema.evolution": "basic",
"topics": "dbserver1.inventory.vegetable.enriched",
"use.time.zone": "UTC",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,lsn,source.ts_ms:ts_ms,txId:tx_id,audit.client_date:client_date,audit.usecase:usecase,audit.user_name:user_name",
"transforms.unwrap.add.fields.prefix": "",
"transforms.unwrap.delete.tombstone.handling.mode": "rewrite"
}
}
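
As a rough mental model of what the `unwrap` transform is configured to produce, the Python sketch below flattens a simplified change event: it keeps the row state and copies the fields listed in `add.fields`, renaming those written as `path:new_name`. This is an illustration only, not Debezium's implementation. The sample event is hand-written, and details such as resolving a bare name like `lsn` from the `source` block, or representing `__deleted` as a boolean, are assumptions.

```python
ADD_FIELDS = (
    "op,lsn,source.ts_ms:ts_ms,txId:tx_id,audit.client_date:client_date,"
    "audit.usecase:usecase,audit.user_name:user_name"
)


def lookup(event: dict, path: str):
    """Resolve a dotted path; bare names fall back to the `source` block (assumption)."""
    parts = path.split(".")
    if len(parts) == 1 and parts[0] not in event:
        parts = ["source", parts[0]]
    value = event
    for part in parts:
        value = value.get(part) if isinstance(value, dict) else None
    return value


def flatten(event: dict, add_fields: str = ADD_FIELDS) -> dict:
    """Approximate the flattened row: row state plus the requested metadata fields."""
    deleted = event.get("after") is None
    row = dict(event.get("before") or {}) if deleted else dict(event["after"])
    row["__deleted"] = deleted  # a plain bool here, for simplicity
    for spec in add_fields.split(","):
        path, _, new_name = spec.partition(":")
        row[new_name or path.split(".")[-1]] = lookup(event, path)
    return row


# Simplified, hand-written event; real enriched events carry more fields.
sample_event = {
    "op": "u",
    "before": None,
    "after": {"id": 10, "name": "Tomatoe", "description": "Tasty!"},
    "source": {"lsn": 36690432, "ts_ms": 1773921804910, "txId": 770},
    "audit": {"client_date": 1566461551000000, "usecase": "UPDATE VEGETABLE",
              "user_name": "farmermargaret"},
}
print(flatten(sample_event))
```

The resulting keys line up with the columns of the sink table (`op`, `lsn`, `ts_ms`, `tx_id`, `client_date`, `usecase`, `user_name`), which is the point of the empty `add.fields.prefix`.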
28 changes: 28 additions & 0 deletions auditlog/connector-config/scripts/init.sql
@@ -0,0 +1,28 @@
-- This version is used with the Debezium JDBC Sink Connector
CREATE TABLE dbserver1_inventory_vegetable_enriched (
-- Business Data
id INTEGER,
name VARCHAR(255) NULL,
description TEXT NULL,

-- Audit Metadata
tx_id VARCHAR(255),
user_name VARCHAR(255) NULL,
usecase VARCHAR(255) NULL,
client_date BIGINT NULL,

-- Change Metadata (for uniqueness)
op VARCHAR(1),
lsn BIGINT,
ts_ms BIGINT,

-- Kafka Metadata to prevent squashing of events
__connect_partition INTEGER,
__connect_offset BIGINT,
__connect_topic VARCHAR(255),

-- To ensure that all events are kept we will use kafka metadata to
-- identify unique rows
PRIMARY KEY (__connect_offset, __connect_partition, __connect_topic)
);

75 changes: 47 additions & 28 deletions auditlog/docker-compose.yaml
@@ -18,65 +18,84 @@ services:
vegetables-db:
image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION}
ports:
- 5432:5432
- 5432:5432
environment:
- POSTGRES_USER=postgresuser
- POSTGRES_PASSWORD=postgrespw
- POSTGRES_DB=vegetablesdb
- POSTGRES_USER=postgresuser
- POSTGRES_PASSWORD=postgrespw
- POSTGRES_DB=vegetablesdb

connect:
image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
ports:
- 8083:8083
- 8083:8083
depends_on:
- kafka
- vegetables-db
- kafka
- vegetables-db
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_source_connect_configs
- OFFSET_STORAGE_TOPIC=my_source_connect_offsets
- STATUS_STORAGE_TOPIC=my_source_connect_statuses
- CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
- CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_source_connect_configs
- OFFSET_STORAGE_TOPIC=my_source_connect_offsets
- STATUS_STORAGE_TOPIC=my_source_connect_statuses
- CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
# CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE is set to true so that the JSON schema is included in the messages written to the Kafka topics
- CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=true
volumes:
- ./connector-config/plugins/camel-jdbc-kafka-connector/:/kafka/connect/camel-jdbc-kafka-connector/
- ./connector-config/plugins/vegetable-smt/:/kafka/connect/vegetable-smt/
- ./connector-config/plugins/debezium-connector-jdbc/:/kafka/connect/debezium-connector-jdbc/

vegetables-service:
image: debezium-examples/auditing-vegetables-service:${DEBEZIUM_VERSION}
build:
context: vegetables-service
dockerfile: src/main/docker/Dockerfile.jvm
ports:
- 8080:8080
- 8080:8080
depends_on:
- vegetables-db
- vegetables-db
environment:
- QUARKUS_DATASOURCE_URL=jdbc:postgresql://vegetables-db:5432/vegetablesdb?currentSchema=inventory
#depends_on:
# vegetable-db:
# condition: service_healthy
- QUARKUS_DATASOURCE_URL=jdbc:postgresql://vegetables-db:5432/vegetablesdb?currentSchema=inventory
#depends_on:
# vegetable-db:
# condition: service_healthy

log-enricher:
image: debezium-examples/auditing-log-enricher:${DEBEZIUM_VERSION}
build:
context: log-enricher
dockerfile: src/main/docker/Dockerfile.jvm
ports:
- 8081:8080
- 8081:8080
depends_on:
- kafka
- kafka
environment:
- QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS=kafka:9092
- QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS=kafka:9092

admin-service:
image: debezium-examples/auditing-admin-service:${DEBEZIUM_VERSION}
build:
context: admin-service
dockerfile: src/main/docker/Dockerfile.jvm
ports:
- 8085:8080
- 8085:8080
depends_on:
- kafka
- kafka
environment:
- MP_MESSAGING_INCOMING_VEGETABLES_BOOTSTRAP_SERVERS=kafka:9092
- MP_MESSAGING_INCOMING_TRANSACTIONS_BOOTSTRAP_SERVERS=kafka:9092
- MP_MESSAGING_OUTGOING_MISSINGTRANSACTIONS_BOOTSTRAP_SERVERS=kafka:9092
- MP_MESSAGING_INCOMING_VEGETABLES_BOOTSTRAP_SERVERS=kafka:9092
- MP_MESSAGING_INCOMING_TRANSACTIONS_BOOTSTRAP_SERVERS=kafka:9092
- MP_MESSAGING_OUTGOING_MISSINGTRANSACTIONS_BOOTSTRAP_SERVERS=kafka:9092

# Lightweight Postgres instance acting as the destination for the enriched audit log events
postgres-sink:
image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION}
ports:
- 5433:5432
environment:
- POSTGRES_USER=postgresuser
- POSTGRES_PASSWORD=postgrespw
- POSTGRES_DB=postgres

# debezium-tooling:
# image: quay.io/debezium/tooling:1.2
# platform: linux/amd64