147 changes: 147 additions & 0 deletions debezium-platform/postgresql-kafka-example/README.md
Using the Debezium Platform to manage and stream changes
===
This example walks you through using the Debezium Platform stage to manage and stream changes from a PostgreSQL database into Apache Kafka, with Debezium Server deployed in a Kubernetes cluster.


Preparing the Environment
---
As a first step, we will provision a local Kubernetes cluster using [minikube](https://minikube.sigs.k8s.io/docs/) and install an ingress controller. Since this example uses a local setup, we will rely on `/etc/hosts` to resolve the domain.
The following script uses minikube to provision a local k8s cluster named `debezium`, adds the required ingress controller, and updates `/etc/hosts` with the domain entry.

```sh
./create-environment.sh
```
> **_NOTE:_**
If you are using minikube on Mac, you need also to run the `minikube tunnel -p debezium` command. For more details see [this](https://minikube.sigs.k8s.io/docs/drivers/docker/#known-issues) and [this](https://stackoverflow.com/questions/70961901/ingress-with-minikube-working-differently-on-mac-vs-ubuntu-when-to-set-etc-host).
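Under the hood, the script derives the cluster IP for the `/etc/hosts` entry by parsing the `kubectl cluster-info` output with `sed`. A minimal sketch of that extraction, fed a sample line here instead of a live cluster:

```shell
# Extract the API server IP from "kubectl cluster-info"-style output.
extract_ip() {
  sed -n 's#.*https://\([0-9.]*\).*#\1#p' | head -n 1
}

# Sample input standing in for live "kubectl cluster-info" output
echo "Kubernetes control plane is running at https://192.168.49.2:8443" | extract_ip
# → 192.168.49.2
```

The `[0-9.]*` capture stops at the port colon, so only the bare IP is appended to `/etc/hosts`.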

Now that the required k8s environment is set up, it's time to provision the infrastructure for this example: a PostgreSQL database as the source and an Apache Kafka broker as the destination of our pipeline. The following script creates a dedicated namespace, `debezium-platform`, which will be used for all further installations in this example, and provisions both the PostgreSQL database and the Apache Kafka broker.

```shell
./setup-infra.sh
```
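The setup scripts need to wait for cluster components and pods to become ready before proceeding. A generic retry helper in that spirit (a sketch using a stub probe; in practice the probe would be a `kubectl get pods` or `minikube status` check) looks like:

```shell
# Retry a command up to a number of attempts, reporting readiness or timeout.
wait_ready() {
  attempts=$1
  delay=$2
  shift 2
  i=1
  while [ "$i" -le "$attempts" ]; do
    if "$@" > /dev/null 2>&1; then
      echo "ready"
      return 0
    fi
    sleep "$delay"
    i=$((i + 1))
  done
  echo "timeout"
  return 1
}

# Stub standing in for a real probe such as 'kubectl get pods -n debezium-platform'
always_ready() { true; }

wait_ready 5 1 always_ready   # → ready
```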

Deploying Debezium-platform Operator
---
We will install the Debezium Platform through Helm:

```shell
cd helm &&
helm dependency build &&
helm install debezium-platform . -f ./example.yaml &&
cd ..
```

After all pods are running, you can access the Debezium Platform stage (UI) at `http://platform.debezium.io/`. The Debezium Platform installation is now complete.

Using the Debezium Platform stage (UI) to set up our data pipeline
---
Now that the platform stage (UI) is running, we will use it to create a data pipeline and all of its resources, i.e. source, destination, and transform (as needed). The side navigation provides options for configuring each of them.

For this demo, you can use the connection properties for each connector type as illustrated below:

### Source
#### PostgreSQL

```json
{
  "name": "test-source",
  "description": "postgreSQL database",
  "type": "io.debezium.connector.postgresql.PostgresConnector",
  "schema": "schema123",
  "vaults": [],
  "config": {
    "topic.prefix": "inventory",
    "database.port": 5432,
    "database.user": "debezium",
    "database.dbname": "debezium",
    "database.hostname": "postgresql",
    "database.password": "debezium",
    "schema.include.list": "inventory"
  }
}
```

![PostgreSQL Connector](./resources/source.png)


### Destination
```json
{
  "name": "test-destination",
  "description": "Kafka destination",
  "type": "kafka",
  "schema": "schema123",
  "vaults": [],
  "config": {
    "producer.key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
    "producer.value.serializer": "org.apache.kafka.common.serialization.StringSerializer",
    "producer.bootstrap.servers": "dbz-kafka-kafka-bootstrap.debezium-platform:9092"
  }
}
```

![Kafka Connector](./resources/destination.png)

### Transform

**Transform class**: io.debezium.transforms.ExtractNewRecordState
**Transform name**: Debezium marker
**Description**: Extract Debezium payload
**Adds the specified fields to the header if they exist**: db,table
**Adds the specified field(s) to the message if they exist.**: op
**Predicate type**: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
**Pattern**: inventory.inventory.products

```json
{
  "config": {
    "add.fields": "op",
    "add.headers": "db,table"
  },
  "description": "Extract Debezium payload",
  "name": "Debezium marker",
  "predicate": {
    "config": {
      "pattern": "inventory.inventory.products"
    },
    "negate": false,
    "type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches"
  },
  "schema": "string",
  "type": "io.debezium.transforms.ExtractNewRecordState",
  "vaults": []
}
```

![ExtractNewRecordState](./resources/transform.png)
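To illustrate what this transform does, here is an abbreviated, illustrative change event (all values made up): a Debezium change event wraps the row state in an envelope with `before`/`after`/`source`/`op` fields, and `ExtractNewRecordState` unwraps it to just the `after` state, with the configured `op` field added to the message. Input and output are shown side by side in a single JSON object for brevity:

```json
{
  "input": {
    "before": null,
    "after": { "id": 101, "name": "scooter" },
    "source": { "db": "debezium", "table": "products" },
    "op": "c"
  },
  "output": { "id": 101, "name": "scooter", "__op": "c" }
}
```

With `add.headers: db,table`, the source's `db` and `table` values are also promoted to record headers (with the transform's default `__` field prefix, e.g. `__db`, `__table`).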

### Pipeline
The use of [Operator Lifecycle Manager](https://olm.operatorframework.io/) allows you to configure the scope of namespaces watched by the operator, from a single namespace to the entire cluster. The process below installs the operator into the `operators` namespace, which by default is intended for cluster-wide operators.

Verifying Change Events
---
You can verify that the _Debezium Server_ instance deployed in the previous section consumed all initial data from the database with the following command:

```sh
kubectl exec dbz-kafka-dual-role-0 -n debezium-platform -- /opt/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --from-beginning \
    --property print.key=true \
    --topic inventory.inventory.orders
```
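With `print.key=true`, the consumer prints each message key followed by its value. Note that the transform's predicate only matches the `inventory.inventory.products` topic, so events on the `orders` topic keep the full Debezium envelope. A line of output might look roughly like the following (values are illustrative and abbreviated; field names follow the standard Debezium example `inventory` database):

```
{"id":10001}	{"before":null,"after":{"id":10001,"order_date":16816,"purchaser":1001,"quantity":1,"product_id":102},"source":{...},"op":"r","ts_ms":...}
```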

Cleanup
---
To remove the Kubernetes environment used in this tutorial, execute the cleanup script:

```sh
./destroy-environment.sh
```
71 changes: 71 additions & 0 deletions debezium-platform/postgresql-kafka-example/create-environment.sh
#! /usr/bin/env bash

source env.sh

echo ">>> Creating minikube cluster 'debezium'..."
minikube start -p "$CLUSTER" --addons ingress

echo ">>> Waiting for minikube components to be ready..."
# Wait for all minikube components to show expected status
MAX_ATTEMPTS=30
ATTEMPT=1
while [[ $ATTEMPT -le $MAX_ATTEMPTS ]]; do
    STATUS=$(minikube status --profile "$CLUSTER")

    if echo "$STATUS" | grep -q "host: Running" &&
       echo "$STATUS" | grep -q "kubelet: Running" &&
       echo "$STATUS" | grep -q "apiserver: Running" &&
       echo "$STATUS" | grep -q "kubeconfig: Configured"; then
        echo ">>> Minikube components are ready"
        break
    else
        echo "Waiting... ($ATTEMPT/$MAX_ATTEMPTS)"
        sleep 5
        ((ATTEMPT++))
    fi
done

if [[ $ATTEMPT -gt $MAX_ATTEMPTS ]]; then
    echo "❌ Timed out waiting for minikube to be ready"
    exit 1
fi

echo ">>> Waiting for Kubernetes environment to be ready..."
kubectl wait --for=condition=Ready nodes --all --timeout=300s

echo ">>> Kubernetes environment is ready"

# Update /etc/hosts to resolve the domain (done after cluster creation so that
# 'kubectl cluster-info' can report the API server IP)
echo ">>> Checking and updating /etc/hosts entry for platform domain..."

IP=$(kubectl cluster-info | sed -n 's/.*https:\/\/\([0-9.]*\).*/\1/p' | head -n 1)
HOSTNAME=${DEBEZIUM_PLATFORM_DOMAIN:-platform.debezium.io}
EXISTING=$(grep "$HOSTNAME" /etc/hosts)

if [ -z "$EXISTING" ]; then
    echo "$IP $HOSTNAME" | sudo tee -a /etc/hosts
    echo "Added new entry: $IP $HOSTNAME"
else
    EXISTING_IP=$(echo "$EXISTING" | awk '{print $1}')
    if [ "$EXISTING_IP" != "$IP" ]; then
        echo "WARNING: $HOSTNAME is already associated with IP $EXISTING_IP"
        echo "Current kubectl IP is $IP"
        read -p "Do you want to update the entry? (y/n) " -r
        if [[ $REPLY =~ ^[Yy]$ ]]; then
            sudo sed -i "/$HOSTNAME/d" /etc/hosts
            echo "$IP $HOSTNAME" | sudo tee -a /etc/hosts
            echo "Updated hosts file with new IP"
        else
            echo "Hosts file not modified"
        fi
    else
        echo "Entry for $HOSTNAME already exists with the same IP"
    fi
fi

# Only run minikube tunnel on macOS; '&' keeps it in the background so the script can exit
if [[ "$(uname)" == "Darwin" ]]; then
    echo ">>> Starting 'minikube tunnel' in the background (macOS only)..."
    minikube tunnel -p "$CLUSTER" &
fi
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
name: dual-role
labels:
strimzi.io/cluster: dbz-kafka
spec:
replicas: 1
roles:
- controller
- broker
storage:
type: jbod
volumes:
- id: 0
type: ephemeral
kraftMetadata: shared
---

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: dbz-kafka
annotations:
strimzi.io/node-pools: enabled
strimzi.io/kraft: enabled
spec:
kafka:
version: 3.8.0
metadataVersion: 3.8-IV0
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
default.replication.factor: 1
min.insync.replicas: 1
entityOperator:
topicOperator: {}
userOperator: {}
4 changes: 4 additions & 0 deletions debezium-platform/postgresql-kafka-example/env.sh
CLUSTER=debezium
NAMESPACE=debezium-platform
DEBEZIUM_PLATFORM_DOMAIN=platform.debezium.io
TIMEOUT=300s
9 changes: 9 additions & 0 deletions debezium-platform/postgresql-kafka-example/helm/Chart.lock
dependencies:
- name: debezium-operator
repository: https://charts.debezium.io
version: 3.0.7-final
- name: database
repository: ""
version: 0.0.1
digest: sha256:0a11b83693c18158c21f6d665e134a4c8b2d6d4122257817b4e8d52ec632dd1b
generated: "2025-04-08T12:43:45.716497+05:30"
17 changes: 17 additions & 0 deletions debezium-platform/postgresql-kafka-example/helm/Chart.yaml
name: debezium-platform
description: Debezium Platform
version: 0.0.1
apiVersion: v2
keywords:
- debezium
- cdc
sources:
- https://github.com/debezium/debezium-platform
home: https://github.com/debezium/debezium-platform
dependencies:
- name: debezium-operator
version: "3.0.7-final"
repository: "https://charts.debezium.io"
- name: database
version: 0.0.1
condition: database.enabled
73 changes: 73 additions & 0 deletions debezium-platform/postgresql-kafka-example/helm/README.md
This chart installs the components required to run the Debezium Platform:

1. Conductor: the back-end component, which provides a set of APIs to orchestrate and control Debezium deployments.
2. Stage: the front-end component, which provides a user interface to interact with the Conductor.
3. Debezium operator: an operator that manages the creation of Debezium Server custom resources.
4. [Optional] A PostgreSQL database used by the conductor to store its data.
5. [Optional] Strimzi operator: an operator for creating Kafka clusters, in case you want to use a Kafka destination in your pipeline.

# Prerequisites

The chart uses an ingress to expose `debezium-stage (UI)` and `debezium-conductor (backend)`, which requires
an [ingress controller](https://kubernetes.io/docs/concepts/services-networking/ingress-controllers/) installed in your
cluster.
You also need a domain that points to the cluster IP; configure the `domain.url` property in
your `values.yaml` with that domain.

### Configurations

| Name | Description | Default |
|:---|:---|:---|
| domain.url | Domain used as the ingress host | "" |
| stage.image | Image for the stage (UI) | quay.io/debezium/platform-stage:latest |
| conductor.image | Image for the conductor | quay.io/debezium/platform-conductor:latest |
| conductor.offset.existingConfigMap | Name of the config map used to store conductor offsets. If empty, it will be created automatically. | "" |
| database.enabled | Enable the installation of PostgreSQL by the chart | false |
| database.name | Database name | postgres |
| database.host | Database host | postgres |
| database.auth.existingSecret | Name of the secret where `username` and `password` are stored. If empty, a secret will be created using the `username` and `password` properties. | "" |
| database.auth.username | Database username | user |
| database.auth.password | Database password | password |
| offset.reusePlatformDatabase | Pipelines will use a database to store offsets. By default the database used by the conductor service is used; set this property to false to use a dedicated one. | true |
| offset.database.name | Database name | postgres |
| offset.database.host | Database host | postgres |
| offset.database.port | Database port | 5432 |
| offset.database.auth.existingSecret | Name of the secret where `username` and `password` are stored. If not set, `offset.database.auth.username` and `offset.database.auth.password` will be used. | "" |
| offset.database.auth.username | Database username | user |
| offset.database.auth.password | Database password | password |
| schemaHistory.reusePlatformDatabase | Pipelines will use a database to store schema history. By default the database used by the conductor service is used; set this property to false to use a dedicated one. | true |
| schemaHistory.database.name | Database name | postgres |
| schemaHistory.database.host | Database host | postgres |
| schemaHistory.database.port | Database port | 5432 |
| schemaHistory.database.auth.existingSecret | Name of the secret where `username` and `password` are stored. If not set, `schemaHistory.database.auth.username` and `schemaHistory.database.auth.password` will be used. | "" |
| schemaHistory.database.auth.username | Database username | user |
| schemaHistory.database.auth.password | Database password | password |
| env | List of environment variables to pass to the conductor | [] |
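For example, a minimal `values.yaml` overriding a few of these settings might look like the following (all values are illustrative, matching the defaults documented above):

```yaml
domain:
  url: platform.debezium.io

database:
  enabled: true
  name: postgres
  host: postgres
  auth:
    username: user
    password: password

# Reuse the platform database for pipeline offsets (the default)
offset:
  reusePlatformDatabase: true
```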

# Install

```shell
helm dependency build
```

This will download the required [Debezium Operator](https://github.com/debezium/debezium-operator) chart.

```shell
helm install <release_name> .
```

# Uninstall

Find the release name you want to uninstall:

```shell
helm list --all
```

then uninstall it:

```shell
helm uninstall <release_name>
```
name: database
description: A PostgreSQL database enabled for CDC
version: 0.0.1
apiVersion: v2
{{/*
Get the database secret name.
*/}}
{{- define "database.secretName" -}}
{{- if .Values.auth.existingSecret -}}
{{- printf "%s" .Values.auth.existingSecret -}}
{{- else -}}
{{- printf "%s-%s" .Chart.Name "secret" -}}
{{- end -}}
{{- end -}}