From eead83f557fa80d7e77a6f8675bec4bef324f527 Mon Sep 17 00:00:00 2001 From: akasar Date: Sun, 8 Mar 2026 00:43:21 +0530 Subject: [PATCH] debezium/dbz#585 Add example for capturing changes from multiple SQL Server databases Add a new sql-server-multi-database example that demonstrates how to use a single Debezium SQL Server connector to capture changes from multiple logical databases on the same host using the database.names configuration property introduced in DBZ-4783. Signed-off-by: akasar --- sql-server-multi-database/README.md | 65 +++++++++++++++++ .../debezium-sqlserver-init/inventory.sql | 71 +++++++++++++++++++ sql-server-multi-database/docker-compose.yaml | 37 ++++++++++ .../register-sqlserver.json | 16 +++++ 4 files changed, 189 insertions(+) create mode 100644 sql-server-multi-database/README.md create mode 100644 sql-server-multi-database/debezium-sqlserver-init/inventory.sql create mode 100644 sql-server-multi-database/docker-compose.yaml create mode 100644 sql-server-multi-database/register-sqlserver.json diff --git a/sql-server-multi-database/README.md b/sql-server-multi-database/README.md new file mode 100644 index 000000000..0480215e0 --- /dev/null +++ b/sql-server-multi-database/README.md @@ -0,0 +1,65 @@ +# Capturing Changes from Multiple SQL Server Databases + +This example demonstrates how to use a single Debezium SQL Server connector to capture changes from multiple logical databases on the same SQL Server instance using the `database.names` configuration property. + +This capability was introduced via [DBZ-4783](https://issues.redhat.com/browse/DBZ-4783). + +``` + SQL Server + +--------------+ + | | + | testDB1 | + | (products) | + | | +------------------+ +---------+ + | testDB2 +------> Kafka Connect +------> Kafka | + | (customers, | | | | | + | orders) | | +-----------+ | | Topics: | + | | | | Debezium | | | server1.| + +--------------+ | +-----------+ | | testDB1.| + +------------------+ | testDB2.| + +---------+ +``` + +A single connector captures CDC events from both `testDB1` and `testDB2` and publishes them to database-specific Kafka topics. + +## Using SQL Server Multi-Database CDC + +```shell +# Start the topology as defined in docker-compose.yaml +export DEBEZIUM_VERSION=3.2 +docker-compose up -d + +# Initialize both databases with CDC enabled +cat debezium-sqlserver-init/inventory.sql | docker-compose exec -T sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD' + +# Start the Debezium SQL Server connector (database.names = testDB1,testDB2) +curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json + +# Consume messages from testDB1 +docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh \ + --bootstrap-server kafka:9092 \ + --from-beginning \ + --property print.key=true \ + --topic server1.testDB1.dbo.products + +# Consume messages from testDB2 +docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh \ + --bootstrap-server kafka:9092 \ + --from-beginning \ + --property print.key=true \ + --topic server1.testDB2.dbo.customers + +# Modify records in both databases via SQL Server client (do not forget to add GO command to execute the statement) +docker-compose exec sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD -d testDB1' + +# Shut down the cluster +docker-compose down +``` + +The init SQL script creates: +- **testDB1** with a `products` table (6 rows) +- **testDB2** with `customers` (4 rows) and `orders` (4 rows) tables + +All tables have CDC enabled via `sp_cdc_enable_table`. + +The key connector configuration is `"database.names": "testDB1,testDB2"` which tells the connector to capture from both databases. Topics follow the naming pattern `...`. diff --git a/sql-server-multi-database/debezium-sqlserver-init/inventory.sql b/sql-server-multi-database/debezium-sqlserver-init/inventory.sql new file mode 100644 index 000000000..abb7eda6e --- /dev/null +++ b/sql-server-multi-database/debezium-sqlserver-init/inventory.sql @@ -0,0 +1,71 @@ +-- ========================================== +-- Database 1: testDB1 (Products) +-- ========================================== +CREATE DATABASE testDB1; +GO +USE testDB1; +EXEC sys.sp_cdc_enable_db; + +CREATE TABLE products ( + id INTEGER IDENTITY(101,1) NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL, + description VARCHAR(512), + weight FLOAT +); +INSERT INTO products(name,description,weight) + VALUES ('scooter','Small 2-wheel scooter',3.14); +INSERT INTO products(name,description,weight) + VALUES ('car battery','12V car battery',8.1); +INSERT INTO products(name,description,weight) + VALUES ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8); +INSERT INTO products(name,description,weight) + VALUES ('hammer','12oz carpenter''s hammer',0.75); +INSERT INTO products(name,description,weight) + VALUES ('hammer','14oz carpenter''s hammer',0.875); +INSERT INTO products(name,description,weight) + VALUES ('jacket','water resistent black wind breaker',0.1); +EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'products', @role_name = NULL, @supports_net_changes = 0; +GO + +-- ========================================== +-- Database 2: testDB2 (Customers & Orders) +-- ========================================== +CREATE DATABASE testDB2; +GO +USE testDB2; +EXEC sys.sp_cdc_enable_db; + +CREATE TABLE customers ( + id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY, + first_name VARCHAR(255) NOT NULL, + last_name VARCHAR(255) NOT NULL, + email VARCHAR(255) NOT NULL UNIQUE +); +INSERT INTO customers(first_name,last_name,email) + VALUES ('Sally','Thomas','sally.thomas@acme.com'); +INSERT INTO customers(first_name,last_name,email) + VALUES ('George','Bailey','gbailey@foobar.com'); +INSERT INTO customers(first_name,last_name,email) + VALUES ('Edward','Walker','ed@walker.com'); +INSERT INTO customers(first_name,last_name,email) + VALUES ('Anne','Kretchmar','annek@noanswer.org'); +EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0; + +CREATE TABLE orders ( + id INTEGER IDENTITY(10001,1) NOT NULL PRIMARY KEY, + order_date DATE NOT NULL, + purchaser INTEGER NOT NULL, + quantity INTEGER NOT NULL, + product_id INTEGER NOT NULL, + FOREIGN KEY (purchaser) REFERENCES customers(id) +); +INSERT INTO orders(order_date,purchaser,quantity,product_id) + VALUES ('16-JAN-2016', 1001, 1, 102); +INSERT INTO orders(order_date,purchaser,quantity,product_id) + VALUES ('17-JAN-2016', 1002, 2, 105); +INSERT INTO orders(order_date,purchaser,quantity,product_id) + VALUES ('19-FEB-2016', 1002, 2, 106); +INSERT INTO orders(order_date,purchaser,quantity,product_id) + VALUES ('21-FEB-2016', 1003, 1, 107); +EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'orders', @role_name = NULL, @supports_net_changes = 0; +GO diff --git a/sql-server-multi-database/docker-compose.yaml b/sql-server-multi-database/docker-compose.yaml new file mode 100644 index 000000000..370c17c31 --- /dev/null +++ b/sql-server-multi-database/docker-compose.yaml @@ -0,0 +1,37 @@ +services: + kafka: + image: quay.io/debezium/kafka:${DEBEZIUM_VERSION} + ports: + - 9092:9092 + - 9093:9093 + environment: + - CLUSTER_ID=oh-sxaDRTcyAr6pFRbXyzA + - NODE_ID=1 + - NODE_ROLE=combined + - KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093 + - KAFKA_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093 + - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 + + sqlserver: + image: mcr.microsoft.com/mssql/server:2019-latest + ports: + - 1433:1433 + environment: + - ACCEPT_EULA=Y + - MSSQL_PID=Standard + - SA_PASSWORD=Password! + - MSSQL_AGENT_ENABLED=true + + connect: + image: quay.io/debezium/connect:${DEBEZIUM_VERSION} + ports: + - 8083:8083 + depends_on: + - kafka + - sqlserver + environment: + - BOOTSTRAP_SERVERS=kafka:9092 + - GROUP_ID=1 + - CONFIG_STORAGE_TOPIC=my_connect_configs + - OFFSET_STORAGE_TOPIC=my_connect_offsets + - STATUS_STORAGE_TOPIC=my_connect_statuses diff --git a/sql-server-multi-database/register-sqlserver.json b/sql-server-multi-database/register-sqlserver.json new file mode 100644 index 000000000..d5681ed7c --- /dev/null +++ b/sql-server-multi-database/register-sqlserver.json @@ -0,0 +1,16 @@ +{ + "name": "inventory-connector", + "config": { + "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", + "tasks.max": "1", + "topic.prefix": "server1", + "database.hostname": "sqlserver", + "database.port": "1433", + "database.user": "sa", + "database.password": "Password!", + "database.names": "testDB1,testDB2", + "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", + "schema.history.internal.kafka.topic": "schema-changes.inventory", + "database.encrypt": "false" + } +}