diff --git a/auditlog/.gitignore b/auditlog/.gitignore new file mode 100644 index 0000000000..9775372824 --- /dev/null +++ b/auditlog/.gitignore @@ -0,0 +1,3 @@ +target/ + +connector-config/plugins diff --git a/auditlog/README.md b/auditlog/README.md index e83f4b7560..fe2cd909e4 100755 --- a/auditlog/README.md +++ b/auditlog/README.md @@ -5,37 +5,37 @@ It accompanies the blog post [Building Audit Logs with Change Data Capture and S There are two applications (based on [Quarkus](https://quarkus.io/)): -* _vegetables-service_: a simple REST service for inserting and updating vegetable data into a Postgres database; -as part of its processing, it will not only update its actual "business table" `vegetable`, -but also insert some auditing metadata into a dedicated metadata table `transaction_context_data`: -the user (as obtained from the passed JWT token), the client's date (as passed via the HTTP 1.1 `Date` header) -and a use case identifier (as specified in an annotation on the REST API methods). -* _log-enricher_: a Kafka Streams application, -which joins the CDC topic holding the `vegetable` change events (`dbserver1.inventory.vegetable`) with the corresponding metadata in the `dbserver1.inventory.transaction_context_data` topic sourced from the `transaction_context_data` table; -this table is keyed by transaction id, allowing for joining the vegetable `KStream` with the metadata `KTable`. -The enriched vegetable change events are written to the `dbserver1.inventory.vegetable.enriched` topic. 
+- _vegetables-service_: a simple REST service for inserting and updating vegetable data into a Postgres database; + as part of its processing, it will not only update its actual "business table" `vegetable`, + but also insert some auditing metadata into a dedicated metadata table `transaction_context_data`: + the user (as obtained from the passed JWT token), the client's date (as passed via the HTTP 1.1 `Date` header) + and a use case identifier (as specified in an annotation on the REST API methods). +- _log-enricher_: a Kafka Streams application, + which joins the CDC topic holding the `vegetable` change events (`dbserver1.inventory.vegetable`) with the corresponding metadata in the `dbserver1.inventory.transaction_context_data` topic sourced from the `transaction_context_data` table; + this table is keyed by transaction id, allowing for joining the vegetable `KStream` with the metadata `KTable`. + The enriched vegetable change events are written to the `dbserver1.inventory.vegetable.enriched` topic. 
## Building the Demo ```console -$ mvn clean package +mvn clean package ``` ```console -$ export DEBEZIUM_VERSION=1.8 -$ docker-compose up --build +export DEBEZIUM_VERSION=1.8 +docker-compose up --build ``` ## Deploy the Debezium Postgres Connector ```console -$ http PUT http://localhost:8083/connectors/inventory-connector/config < register-postgres.json +http PUT http://localhost:8083/connectors/inventory-connector/config < register-postgres.json ``` ## Modifying Some Data and Observing the Audit Log ```console -$ http POST http://localhost:8080/vegetables 'Authorization:Bearer eyJraWQiOiJqd3Qua2V5IiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJmYXJtZXJib2IiLCJ1cG4iOiJmYXJtZXJib2IiLCJhdXRoX3RpbWUiOjE1NjY0NTgxMTMsImlzcyI6ImZhcm1zaG9wIiwiZ3JvdXBzIjpbImZhcm1lcnMiLCJjdXN0b21lcnMiXSwiZXhwIjo0MTAyNDQ0Nzk5LCJpYXQiOjE1NjY0NTgxMTMsImp0aSI6IjQyIn0.CscbJN8amqKryYvnVO1184J8F67HN2iTEjVN2VOPodcnoeOd7_iQVKUjC3h-ye5apkJjvAsQKrjzlrGCHRfl-n6jC9F7IkOtjoWnJ4wQ9BBo1SAtPw_Czt1I_Ujm-Kb1p5-BWACCBCVVFgYZTWP_laz5JZS7dIvs6VqoNnw7A4VpA6iPfTVfYlNY3u86-k1FvEg_hW-N9Y9RuihMsPuTdpHK5xdjCrJiD0VJ7-0eRQ8RXpycHuHN4xfmV8MqXBYjYSYDOhbnYbdQVbf0YJoFFqfb75my5olN-97ITsi2MS62W_y-RNT0qZrbytqINA3fF3VQsSY6VcaqRAeygrKm_Q' 'Date:Thu, 22 Aug 2019 08:12:31 GMT' name=Tomatoe description=Yummy! 
+http POST http://localhost:8080/vegetables 'Authorization:Bearer eyJraWQiOiJqd3Qua2V5IiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJmYXJtZXJib2IiLCJ1cG4iOiJmYXJtZXJib2IiLCJhdXRoX3RpbWUiOjE1NjY0NTgxMTMsImlzcyI6ImZhcm1zaG9wIiwiZ3JvdXBzIjpbImZhcm1lcnMiLCJjdXN0b21lcnMiXSwiZXhwIjo0MTAyNDQ0Nzk5LCJpYXQiOjE1NjY0NTgxMTMsImp0aSI6IjQyIn0.CscbJN8amqKryYvnVO1184J8F67HN2iTEjVN2VOPodcnoeOd7_iQVKUjC3h-ye5apkJjvAsQKrjzlrGCHRfl-n6jC9F7IkOtjoWnJ4wQ9BBo1SAtPw_Czt1I_Ujm-Kb1p5-BWACCBCVVFgYZTWP_laz5JZS7dIvs6VqoNnw7A4VpA6iPfTVfYlNY3u86-k1FvEg_hW-N9Y9RuihMsPuTdpHK5xdjCrJiD0VJ7-0eRQ8RXpycHuHN4xfmV8MqXBYjYSYDOhbnYbdQVbf0YJoFFqfb75my5olN-97ITsi2MS62W_y-RNT0qZrbytqINA3fF3VQsSY6VcaqRAeygrKm_Q' 'Date:Thu, 22 Aug 2019 08:12:31 GMT' name=Tomatoe description=Yummy! ``` This uses a pre-generated JWT token (with expiration date set to 2099-12-31 and user set to "farmerbob"). @@ -44,13 +44,13 @@ To regenerate the token with different data, use the [Jwtenizr](https://github.c You can also update an existing vegetable record (this token is for "farmermargaret"): ```console -$ http PUT http://localhost:8080/vegetables/10 'Authorization:Bearer eyJraWQiOiJqd3Qua2V5IiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJmYXJtZXJtYXJnYXJldCIsInVwbiI6ImZhcm1lcm1hcmdhcmV0IiwiYXV0aF90aW1lIjoxNTY5ODM1Mzk5LCJpc3MiOiJmYXJtc2hvcCIsImdyb3VwcyI6WyJmYXJtZXJzIiwiY3VzdG9tZXJzIl0sImV4cCI6NDEwMjQ0NDc5OSwiaWF0IjoxNTY5ODM1Mzk5LCJqdGkiOiI0MiJ9.DTEUA3p-xyK5nveoJIVhjfKNFdVszYIb55Qj4Xrm70DDbAXuOU2FMkffuUAUm2s7ACkp2KEmg6brRwSjvA-zhW61kDR9ZgEb9NWeDjr6Eue08xcSODKt7SGV-M7h3yhuDIhU7uaZrxRUAQTWqm1vxd2rmN_QH0frhKMUNFFsLIOGLG0zHcLosRcwZ4tAKXSSB9VE0fth6srIQCUebDkF7ucA_WSYjPRvahCBd8JvnV4VUGQxZW8zcRhTEwcaLq20ODO-dr85xgWI2Yr_1A7PDuDL4oUjCb90YyhtzaIzs2vQMjcxJ6TWmTcqJpgCfkjE-TeVwjaafcNJu0fBmcP8jA' 'Date:Thu, 22 Aug 2019 08:12:31 GMT' name=Tomatoe description=Tasty! 
+http PUT http://localhost:8080/vegetables/10 'Authorization:Bearer eyJraWQiOiJqd3Qua2V5IiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJmYXJtZXJtYXJnYXJldCIsInVwbiI6ImZhcm1lcm1hcmdhcmV0IiwiYXV0aF90aW1lIjoxNTY5ODM1Mzk5LCJpc3MiOiJmYXJtc2hvcCIsImdyb3VwcyI6WyJmYXJtZXJzIiwiY3VzdG9tZXJzIl0sImV4cCI6NDEwMjQ0NDc5OSwiaWF0IjoxNTY5ODM1Mzk5LCJqdGkiOiI0MiJ9.DTEUA3p-xyK5nveoJIVhjfKNFdVszYIb55Qj4Xrm70DDbAXuOU2FMkffuUAUm2s7ACkp2KEmg6brRwSjvA-zhW61kDR9ZgEb9NWeDjr6Eue08xcSODKt7SGV-M7h3yhuDIhU7uaZrxRUAQTWqm1vxd2rmN_QH0frhKMUNFFsLIOGLG0zHcLosRcwZ4tAKXSSB9VE0fth6srIQCUebDkF7ucA_WSYjPRvahCBd8JvnV4VUGQxZW8zcRhTEwcaLq20ODO-dr85xgWI2Yr_1A7PDuDL4oUjCb90YyhtzaIzs2vQMjcxJ6TWmTcqJpgCfkjE-TeVwjaafcNJu0fBmcP8jA' 'Date:Thu, 22 Aug 2019 08:12:31 GMT' name=Tomatoe description=Tasty! ``` Or delete a record (again using the "farmerbob" token): ```console -$ http DELETE http://localhost:8080/vegetables/10 'Authorization:Bearer eyJraWQiOiJqd3Qua2V5IiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJmYXJtZXJib2IiLCJ1cG4iOiJmYXJtZXJib2IiLCJhdXRoX3RpbWUiOjE1NjY0NTgxMTMsImlzcyI6ImZhcm1zaG9wIiwiZ3JvdXBzIjpbImZhcm1lcnMiLCJjdXN0b21lcnMiXSwiZXhwIjo0MTAyNDQ0Nzk5LCJpYXQiOjE1NjY0NTgxMTMsImp0aSI6IjQyIn0.CscbJN8amqKryYvnVO1184J8F67HN2iTEjVN2VOPodcnoeOd7_iQVKUjC3h-ye5apkJjvAsQKrjzlrGCHRfl-n6jC9F7IkOtjoWnJ4wQ9BBo1SAtPw_Czt1I_Ujm-Kb1p5-BWACCBCVVFgYZTWP_laz5JZS7dIvs6VqoNnw7A4VpA6iPfTVfYlNY3u86-k1FvEg_hW-N9Y9RuihMsPuTdpHK5xdjCrJiD0VJ7-0eRQ8RXpycHuHN4xfmV8MqXBYjYSYDOhbnYbdQVbf0YJoFFqfb75my5olN-97ITsi2MS62W_y-RNT0qZrbytqINA3fF3VQsSY6VcaqRAeygrKm_Q' 'Date:Thu, 22 Aug 2019 08:12:31 GMT' +http DELETE http://localhost:8080/vegetables/10 'Authorization:Bearer 
eyJraWQiOiJqd3Qua2V5IiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJmYXJtZXJib2IiLCJ1cG4iOiJmYXJtZXJib2IiLCJhdXRoX3RpbWUiOjE1NjY0NTgxMTMsImlzcyI6ImZhcm1zaG9wIiwiZ3JvdXBzIjpbImZhcm1lcnMiLCJjdXN0b21lcnMiXSwiZXhwIjo0MTAyNDQ0Nzk5LCJpYXQiOjE1NjY0NTgxMTMsImp0aSI6IjQyIn0.CscbJN8amqKryYvnVO1184J8F67HN2iTEjVN2VOPodcnoeOd7_iQVKUjC3h-ye5apkJjvAsQKrjzlrGCHRfl-n6jC9F7IkOtjoWnJ4wQ9BBo1SAtPw_Czt1I_Ujm-Kb1p5-BWACCBCVVFgYZTWP_laz5JZS7dIvs6VqoNnw7A4VpA6iPfTVfYlNY3u86-k1FvEg_hW-N9Y9RuihMsPuTdpHK5xdjCrJiD0VJ7-0eRQ8RXpycHuHN4xfmV8MqXBYjYSYDOhbnYbdQVbf0YJoFFqfb75my5olN-97ITsi2MS62W_y-RNT0qZrbytqINA3fF3VQsSY6VcaqRAeygrKm_Q' 'Date:Thu, 22 Aug 2019 08:12:31 GMT' ``` Doing so, observe the contents of the `dbserver1.inventory.vegetable`, `dbserver1.inventory.transaction_context_data` and `dbserver1.inventory.vegetable.enriched` topics: @@ -144,19 +144,102 @@ http POST http://localhost:8085/vegetables/{uuid}/auditData/{tuuid} audit:='{"us This would then fix the missing event in the transaction context data topic and trigger the enricher to provide a new log entry. -## Stopping All Services +## Forwarding Events to a Downstream Postgres Database ```console -$ docker-compose down +$ docker run -it --rm \ + --network auditlog_default \ + quay.io/debezium/tooling:1.2 \ + /bin/bash -c "kafkacat -b kafka:9092 \ + -C -o beginning -q -u -t dbserver1.inventory.vegetable.enriched | jq ." +``` + +This will show you the enriched events forwarded to the `dbserver1.inventory.vegetable.enriched` topic. +We need to pass these events to the lightweight `postgres-sink` database defined in `docker-compose.yaml` using the [Debezium JDBC connector](https://debezium.io/documentation/reference/stable/connectors/jdbc.html). + +Download the connector plugin `.tar.gz` from the Debezium [plugin archive](https://repo1.maven.org/maven2/io/debezium/debezium-connector-jdbc/) and save it to the `connector-config/plugins/debezium-connector-jdbc` directory. 
+ +Restart your Kafka Connect container to pick up the new plugin + +```console +docker compose down -v connect +docker compose up -d connect +``` + +Create your Debezium sink connector by running + +```console +http POST http://localhost:8083/connectors < connector-config/config/jdbc-connector-config.json +``` + +The connector should read from the `dbserver1.inventory.vegetable.enriched` topic and populate the `dbserver1_inventory_vegetable_enriched` table. + +Confirm the changes by opening a `psql` session in your `postgres-sink` container + +```console +docker compose exec postgres-sink psql -U postgresuser -d postgres +``` + +View the rows in your `dbserver1_inventory_vegetable_enriched` table: + +```sql +SELECT * FROM dbserver1_inventory_vegetable_enriched; +``` + +An example of the rows should look like this: + +```console +| __connect_topic | __connect_partition | __connect_offset | id | description | name | __deleted | op | lsn | ts_ms | tx_id | client_date | usecase | user_name | +|------------------------------------------|---------------------|------------------|----|-------------|---------|-----------|----|----------|---------------|-------|------------------|-------------------|-----------------| +| dbserver1.inventory.vegetable.enriched | 0 | 0 | 10 | Yummy! | Tomatoe | false | c | 36689976 | 1773921791527 | 769 | 1566461551000000 | CREATE VEGETABLE | farmerbob | +| dbserver1.inventory.vegetable.enriched | 0 | 2 | 10 | Yummy! | Tomatoe | false | c | 36689976 | 1773921791527 | 769 | 1566461551000000 | CREATE VEGETABLE | farmerbob | +| dbserver1.inventory.vegetable.enriched | 0 | 4 | 10 | Tasty! | Tomatoe | false | u | 36690432 | 1773921804910 | 770 | 1566461551000000 | UPDATE VEGETABLE | farmermargaret | +| dbserver1.inventory.vegetable.enriched | 0 | 5 | 10 | | | true | d | 36690800 | 1773921809082 | 771 | 1566461551000000 | DELETE VEGETABLE | farmerbob | +``` + +With this, our audit log events have been successfully propagated to our downstream database. 
+ +The table `dbserver1_inventory_vegetable_enriched` can be defined as: + +```sql +-- This version is used with the Debezium JDBC Sink Connector +CREATE TABLE dbserver1_inventory_vegetable_enriched ( + -- Business Data + id INTEGER, + name VARCHAR(255) NULL, + description TEXT NULL, + + -- Audit Metadata + tx_id VARCHAR(255), + user_name VARCHAR(255) NULL, + usecase VARCHAR(255) NULL, + client_date BIGINT NULL, + + -- Change Metadata (for uniqueness) + op VARCHAR(1), + lsn BIGINT, + ts_ms BIGINT, + + -- Kafka Metadata to prevent squashing of events + __connect_partition INTEGER, + __connect_offset BIGINT, + __connect_topic VARCHAR(255), + + -- To ensure that all events are kept we will use kafka metadata + -- (topic, partition and offset) to + -- identify unique rows +PRIMARY KEY (__connect_offset, __connect_partition, __connect_topic) +); + ``` -## Running the Quarkus Applications Locally +## Running the Quarkus Applications Locally Set `ADVERTISED_HOST_NAME` of the `kafka` service in _docker-compose.yaml_ to the IP address of your host machine. 
Start all services except the `vegetables-service` and the `log-enricher`: ```console -$ docker-compose up --scale vegetables-service=0 --scale log-enricher=0 +docker-compose up --scale vegetables-service=0 --scale log-enricher=0 ``` Then start the three services via the Quarkus dev mode: diff --git a/auditlog/connector-config/config/jdbc-connector-config.json b/auditlog/connector-config/config/jdbc-connector-config.json new file mode 100644 index 0000000000..ff9a018312 --- /dev/null +++ b/auditlog/connector-config/config/jdbc-connector-config.json @@ -0,0 +1,21 @@ +{ + "name": "jdbc-connector", + "config": { + "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector", + "tasks.max": "1", + "connection.url": "jdbc:postgresql://postgres-sink:5432/postgres", + "connection.username": "postgresuser", + "connection.password": "postgrespw", + "primary.key.mode": "kafka", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "true", + "schema.evolution": "basic", + "topics": "dbserver1.inventory.vegetable.enriched", + "use.time.zone": "UTC", + "transforms": "unwrap", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.add.fields": "op,lsn,source.ts_ms:ts_ms,txId:tx_id,audit.client_date:client_date,audit.usecase:usecase,audit.user_name:user_name", + "transforms.unwrap.add.fields.prefix": "", + "transforms.unwrap.delete.tombstone.handling.mode": "rewrite" + } +} \ No newline at end of file diff --git a/auditlog/connector-config/scripts/init.sql b/auditlog/connector-config/scripts/init.sql new file mode 100644 index 0000000000..a17d529965 --- /dev/null +++ b/auditlog/connector-config/scripts/init.sql @@ -0,0 +1,28 @@ +-- This version is used with the Debezium JDBC Sink Connector +CREATE TABLE dbserver1_inventory_vegetable_enriched ( + -- Business Data + id INTEGER, + name VARCHAR(255) NULL, + description TEXT NULL, + + -- Audit Metadata + tx_id VARCHAR(255), + user_name 
VARCHAR(255) NULL, + usecase VARCHAR(255) NULL, + client_date BIGINT NULL, + + -- Change Metadata (for uniqueness) + op VARCHAR(1), + lsn BIGINT, + ts_ms BIGINT, + + -- Kafka Metadata to prevent squashing of events + __connect_partition INTEGER, + __connect_offset BIGINT, + __connect_topic VARCHAR(255), + + -- To ensure that all events are kept we will use kafka metadata to + -- identify unique rows +PRIMARY KEY (__connect_offset, __connect_partition, __connect_topic) +); + diff --git a/auditlog/docker-compose.yaml b/auditlog/docker-compose.yaml index 3677d96aee..19a18969af 100644 --- a/auditlog/docker-compose.yaml +++ b/auditlog/docker-compose.yaml @@ -18,27 +18,32 @@ services: vegetables-db: image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION} ports: - - 5432:5432 + - 5432:5432 environment: - - POSTGRES_USER=postgresuser - - POSTGRES_PASSWORD=postgrespw - - POSTGRES_DB=vegetablesdb + - POSTGRES_USER=postgresuser + - POSTGRES_PASSWORD=postgrespw + - POSTGRES_DB=vegetablesdb connect: image: quay.io/debezium/connect:${DEBEZIUM_VERSION} ports: - - 8083:8083 + - 8083:8083 depends_on: - - kafka - - vegetables-db + - kafka + - vegetables-db environment: - - BOOTSTRAP_SERVERS=kafka:9092 - - GROUP_ID=1 - - CONFIG_STORAGE_TOPIC=my_source_connect_configs - - OFFSET_STORAGE_TOPIC=my_source_connect_offsets - - STATUS_STORAGE_TOPIC=my_source_connect_statuses - - CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false - - CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false + - BOOTSTRAP_SERVERS=kafka:9092 + - GROUP_ID=1 + - CONFIG_STORAGE_TOPIC=my_source_connect_configs + - OFFSET_STORAGE_TOPIC=my_source_connect_offsets + - STATUS_STORAGE_TOPIC=my_source_connect_statuses + - CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false + # We set the {CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE} to true to ensure our JSON Schema is generated when we first pass messages to Kafka topics + - CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=true + volumes: + - 
./connector-config/plugins/camel-jdbc-kafka-connector/:/kafka/connect/camel-jdbc-kafka-connector/ + - ./connector-config/plugins/vegetable-smt/:/kafka/connect/vegetable-smt/ + - ./connector-config/plugins/debezium-connector-jdbc/:/kafka/connect/debezium-connector-jdbc/ vegetables-service: image: debezium-examples/auditing-vegetables-service:${DEBEZIUM_VERSION} @@ -46,14 +51,14 @@ services: context: vegetables-service dockerfile: src/main/docker/Dockerfile.jvm ports: - - 8080:8080 + - 8080:8080 depends_on: - - vegetables-db + - vegetables-db environment: - - QUARKUS_DATASOURCE_URL=jdbc:postgresql://vegetables-db:5432/vegetablesdb?currentSchema=inventory - #depends_on: - # vegetable-db: - # condition: service_healthy + - QUARKUS_DATASOURCE_URL=jdbc:postgresql://vegetables-db:5432/vegetablesdb?currentSchema=inventory + #depends_on: + # vegetable-db: + # condition: service_healthy log-enricher: image: debezium-examples/auditing-log-enricher:${DEBEZIUM_VERSION} @@ -61,11 +66,11 @@ services: context: log-enricher dockerfile: src/main/docker/Dockerfile.jvm ports: - - 8081:8080 + - 8081:8080 depends_on: - - kafka + - kafka environment: - - QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS=kafka:9092 + - QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS=kafka:9092 admin-service: image: debezium-examples/auditing-admin-service:${DEBEZIUM_VERSION} @@ -73,10 +78,24 @@ services: context: admin-service dockerfile: src/main/docker/Dockerfile.jvm ports: - - 8085:8080 + - 8085:8080 depends_on: - - kafka + - kafka environment: - - MP_MESSAGING_INCOMING_VEGETABLES_BOOTSTRAP_SERVERS=kafka:9092 - - MP_MESSAGING_INCOMING_TRANSACTIONS_BOOTSTRAP_SERVERS=kafka:9092 - - MP_MESSAGING_OUTGOING_MISSINGTRANSACTIONS_BOOTSTRAP_SERVERS=kafka:9092 + - MP_MESSAGING_INCOMING_VEGETABLES_BOOTSTRAP_SERVERS=kafka:9092 + - MP_MESSAGING_INCOMING_TRANSACTIONS_BOOTSTRAP_SERVERS=kafka:9092 + - MP_MESSAGING_OUTGOING_MISSINGTRANSACTIONS_BOOTSTRAP_SERVERS=kafka:9092 + + # Lightweight postgres acting as the destination for the audit 
log enriched events + postgres-sink: + image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION} + ports: + - 5433:5432 + environment: + - POSTGRES_USER=postgresuser + - POSTGRES_PASSWORD=postgrespw + - POSTGRES_DB=postgres + + # debezium-tooling: + # image: quay.io/debezium/tooling:1.2 + # platform: linux/amd64 diff --git a/auditlog/log-enricher/src/main/java/io/debezium/demos/auditing/enricher/ChangeEventEnricher.java b/auditlog/log-enricher/src/main/java/io/debezium/demos/auditing/enricher/ChangeEventEnricher.java index 962f185f0a..051c772449 100644 --- a/auditlog/log-enricher/src/main/java/io/debezium/demos/auditing/enricher/ChangeEventEnricher.java +++ b/auditlog/log-enricher/src/main/java/io/debezium/demos/auditing/enricher/ChangeEventEnricher.java @@ -5,7 +5,10 @@ import java.util.Optional; import javax.json.Json; +import javax.json.JsonArray; +import javax.json.JsonArrayBuilder; import javax.json.JsonObject; +import javax.json.JsonValue; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Transformer; @@ -39,7 +42,8 @@ class ChangeEventEnricher implements Transformer) context.getStateStore(TopologyProducer.STREAM_BUFFER_NAME); - txMetaDataStore = (TimestampedKeyValueStore) context.getStateStore(TopologyProducer.STORE_NAME); + txMetaDataStore = (TimestampedKeyValueStore) context + .getStateStore(TopologyProducer.STORE_NAME); context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, ts -> enrichAndEmitBufferedEvents()); } @@ -79,12 +83,13 @@ private boolean enrichAndEmitBufferedEvents() { boolean enrichedAllBuffered = true; - for(long i = sequence.getFirstValue(); i < sequence.getNextValue(); i++) { + for (long i = sequence.getFirstValue(); i < sequence.getNextValue(); i++) { JsonObject buffered = streamBuffer.get(i); LOG.info("Processing buffered change event for key {}", buffered.getJsonObject("key")); - KeyValue enriched = enrichWithTxMetaData(buffered.getJsonObject("key"), 
buffered.getJsonObject("changeEvent")); + KeyValue enriched = enrichWithTxMetaData(buffered.getJsonObject("key"), + buffered.getJsonObject("changeEvent")); if (enriched == null) { enrichedAllBuffered = false; break; @@ -102,6 +107,46 @@ private boolean enrichAndEmitBufferedEvents() { return enrichedAllBuffered; } + /** + * Adds the audit schema to the original schema. + * Because Kafka Streams are schema-agnostic + * we have to manually add the audit field to the schema + * + * @param originalSchema + * @return {@link JsonObject} the original schema with the audit field added + */ + private JsonObject createEnrichedSchema(JsonObject originalSchema) { + // 1. Get the existing fields array + JsonArray fields = originalSchema.getJsonArray("fields"); + + // 2. Create the new 'audit' field definition + JsonObject auditFieldSchema = Json.createObjectBuilder() + .add("type", "struct") + .add("field", "audit") + .add("optional", true) + .add("fields", Json.createArrayBuilder() + .add(Json.createObjectBuilder().add("type", "int64").add("optional", true).add("field", + "client_date")) + .add(Json.createObjectBuilder().add("type", "string").add("optional", true).add("field", + "usecase")) + .add(Json.createObjectBuilder().add("type", "string").add("optional", true).add("field", + "user_name")) + .build()) + .build(); + + // Rebuild the fields array including the new audit field + JsonArrayBuilder newFields = Json.createArrayBuilder(); + for (JsonValue field : fields) { + newFields.add(field); + } + newFields.add(auditFieldSchema); + + // Return the new root schema + return Json.createObjectBuilder(originalSchema) + .add("fields", newFields.build()) + .build(); + } + /** * Adds the given change event to the stream-side buffer. 
*/ @@ -117,8 +162,7 @@ private void bufferChangeEvent(JsonObject key, JsonObject changeEvent) { streamBuffer.putAll(Arrays.asList( KeyValue.pair(sequence.getNextValueAndIncrement(), wrapper), - KeyValue.pair(BUFFER_OFFSETS_KEY, sequence.toJson()) - )); + KeyValue.pair(BUFFER_OFFSETS_KEY, sequence.toJson()))); } /** @@ -130,7 +174,9 @@ private void bufferChangeEvent(JsonObject key, JsonObject changeEvent) { */ private KeyValue enrichWithTxMetaData(JsonObject key, JsonObject changeEvent) { JsonObject txId = Json.createObjectBuilder() - .add("transaction_id", changeEvent.get("source").asJsonObject().getJsonNumber("txId").longValue()) + .add("transaction_id", + changeEvent.get("payload").asJsonObject().get("source").asJsonObject().getJsonNumber("txId") + .longValue()) .build(); ValueAndTimestamp metaData = txMetaDataStore.get(txId); @@ -138,16 +184,26 @@ private KeyValue enrichWithTxMetaData(JsonObject key, Js if (metaData != null) { LOG.info("Enriched change event for key {}", key); - JsonObject txMetaData = Json.createObjectBuilder(metaData.value().get("after").asJsonObject()) + JsonObject txMetaData = Json + .createObjectBuilder(metaData.value().get("payload").asJsonObject().get("after").asJsonObject()) .remove("transaction_id") .build(); + // Rebuild the payload to include the new "audit" field + JsonObject updatedPayload = Json.createObjectBuilder(changeEvent.getJsonObject("payload")) + .add("audit", txMetaData) + .build(); + + // Rebuild the top-level event with the updated payload + JsonObject enrichedEvent = Json.createObjectBuilder() + .add("payload", updatedPayload) + .add("schema", createEnrichedSchema(changeEvent.getJsonObject("schema"))) + .build(); + + LOG.info("Enriched change event for key {}", key); return KeyValue.pair( key, - Json.createObjectBuilder(changeEvent) - .add("audit", txMetaData) - .build() - ); + enrichedEvent); } LOG.warn("No metadata found for transaction {}", txId); @@ -158,8 +214,7 @@ private Optional bufferOffsets() { JsonObject 
bufferOffsets = streamBuffer.get(BUFFER_OFFSETS_KEY); if (bufferOffsets == null) { return Optional.empty(); - } - else { + } else { return Optional.of(BufferOffsets.fromJson(bufferOffsets)); } } diff --git a/auditlog/log-enricher/src/main/java/io/debezium/demos/auditing/enricher/TopologyProducer.java b/auditlog/log-enricher/src/main/java/io/debezium/demos/auditing/enricher/TopologyProducer.java index 9154948d1f..f64d9c9a45 100644 --- a/auditlog/log-enricher/src/main/java/io/debezium/demos/auditing/enricher/TopologyProducer.java +++ b/auditlog/log-enricher/src/main/java/io/debezium/demos/auditing/enricher/TopologyProducer.java @@ -32,15 +32,13 @@ public class TopologyProducer { public Topology buildTopology() { StreamsBuilder builder = new StreamsBuilder(); - StoreBuilder> streamBufferStateStore = - Stores - .keyValueStoreBuilder( + StoreBuilder> streamBufferStateStore = Stores + .keyValueStoreBuilder( Stores.persistentKeyValueStore(STREAM_BUFFER_NAME), new Serdes.LongSerde(), - new JsonObjectSerde() - ) - .withCachingDisabled(); - builder.addStateStore(streamBufferStateStore); + new JsonObjectSerde()) + .withCachingDisabled(); + builder.addStateStore(streamBufferStateStore); builder.globalTable(txContextDataTopic, Materialized.as(STORE_NAME)); @@ -49,8 +47,9 @@ public Topology buildTopology() { // seems more reasonable than compaction .filter((id, changeEvent) -> changeEvent != null) // exclude snapshot events - .filter((id, changeEvent) -> !changeEvent.getString("op").equals("r")) - // enrich change events with transaction metadata via the statestore of the TX topic + .filter((id, changeEvent) -> !changeEvent.getJsonObject("payload").getString("op").equals("r")) + // enrich change events with transaction metadata via the statestore of the TX + // topic .transform(() -> new ChangeEventEnricher(), STREAM_BUFFER_NAME) .to(vegetablesEnrichedTopic);