Switch the KStreams example to the org.mongodb.kafka MongoDB sink connector.

* README.md: point the connector link at the mongodb/mongo-kafka repository
  and open the shell with mongosh instead of mongo.
* debezium-mongodb/Dockerfile: fetch the connector, Avro, MongoDB driver and
  BSON jars with docker-maven-download instead of cloning and building the
  previous connector from source.
* mongodb-sink.json: use the new connector class and the un-prefixed property
  names, and write to the "inventory" database, which is the database the
  README verification step inspects.

diff --git a/kstreams/README.md b/kstreams/README.md
index e9330eb18e..5726b2fb16 100644
--- a/kstreams/README.md
+++ b/kstreams/README.md
@@ -1,7 +1,7 @@
 # Debezium KStreams Example
 
 This demo shows how to join two CDC event streams created by Debezium into a single topic and
-sink the aggregated change events into MongoDB, using the [Kafka Connect MongoDB sink connector](https://github.com/hpgrahsl/kafka-connect-mongodb).
+sink the aggregated change events into MongoDB, using the [Kafka Connect MongoDB sink connector](https://github.com/mongodb/mongo-kafka).
 
 ## Preparations
 
@@ -32,7 +32,7 @@ docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh \
 Examine the target collection in the MongoDB sink database:
 
 ```shell
-docker-compose exec mongodb bash -c 'mongo inventory'
+docker-compose exec mongodb bash -c 'mongosh inventory'
 > db.customers_with_addresses.find().pretty()
 ```
 
diff --git a/kstreams/debezium-mongodb/Dockerfile b/kstreams/debezium-mongodb/Dockerfile
index efa5937a49..c7c8f72443 100644
--- a/kstreams/debezium-mongodb/Dockerfile
+++ b/kstreams/debezium-mongodb/Dockerfile
@@ -1,18 +1,38 @@
 FROM quay.io/debezium/connect:2.0
 ENV KAFKA_CONNECT_MONGODB_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-mongodb
 
+
 USER root
 RUN microdnf -y install git maven java-11-openjdk-devel && microdnf clean all
 
+ENV MONGODB_DRIVER_VERSION="4.7.2"
+
+# These should point to the https://central.sonatype.com/artifact/org.mongodb.kafka/mongo-kafka-connect version used
+ENV MAVEN_DEP_DESTINATION=$KAFKA_CONNECT_MONGODB_DIR \
+    MONGODB_SINK_REPO="org/mongodb/kafka" \
+    MONGODB_SINK_GROUP="mongo-kafka-connect" \
+    MONGODB_SINK_VERSION="1.10.0" \
+    MONGODB_SINK_MD5="e7655c74c65eeb457d3e65420b62478d" \
+    MONGODB_DRIVER_REPO="org/mongodb" \
+    MONGODB_DRIVER_GROUP="mongodb-driver-sync" \
+    MONGODB_DRIVER_MD5="3cc24cf6ff3290cbc3bc4764eefc17b9" \
+    MONGODB_DRIVER_CORE_REPO="org/mongodb" \
+    MONGODB_DRIVER_CORE_GROUP="mongodb-driver-core" \
+    MONGODB_DRIVER_CORE_MD5="b037bee5dfb20be843d2b425603499f5" \
+    AVRO_REPO="org/apache/avro" \
+    AVRO_GROUP="avro" \
+    AVRO_VERSION="1.9.2" \
+    AVRO_MD5="cb70195f70f52b27070f9359b77690bb" \
+    BSON_REPO="org/mongodb" \
+    BSON_GROUP="bson" \
+    BSON_VERSION="$MONGODB_DRIVER_VERSION" \
+    BSON_MD5="0e02308c0d69d7d470e1b8a83cffece7"
+
 USER kafka
 
-# Deploy MongoDB Sink Connector
-RUN mkdir -p $KAFKA_CONNECT_MONGODB_DIR && cd $KAFKA_CONNECT_MONGODB_DIR && \
-    git clone https://github.com/hpgrahsl/kafka-connect-mongodb.git && \
-    cd kafka-connect-mongodb && \
-    git fetch --tags && \
-    git checkout tags/v1.2.0 && \
-    sed -i 's/http:\/\/packages.confluent.io\/maven\//https:\/\/packages.confluent.io\/maven\//g' pom.xml && \
-    mvn clean package -DskipTests=true -DskipITs=true && \
-    mv target/kafka-connect-mongodb/kafka-connect-mongodb-1.2.0-jar-with-dependencies.jar $KAFKA_CONNECT_MONGODB_DIR && \
-    cd .. && rm -rf $KAFKA_CONNECT_MONGODB_DIR/kafka-connect-mongodb
+RUN mkdir -p $KAFKA_CONNECT_MONGODB_DIR && \
+    docker-maven-download central "$MONGODB_SINK_REPO" "$MONGODB_SINK_GROUP" "$MONGODB_SINK_VERSION" "$MONGODB_SINK_MD5" && \
+    docker-maven-download central "$AVRO_REPO" "$AVRO_GROUP" "$AVRO_VERSION" "$AVRO_MD5" && \
+    docker-maven-download central "$MONGODB_DRIVER_REPO" "$MONGODB_DRIVER_GROUP" "$MONGODB_DRIVER_VERSION" "$MONGODB_DRIVER_MD5" && \
+    docker-maven-download central "$MONGODB_DRIVER_CORE_REPO" "$MONGODB_DRIVER_CORE_GROUP" "$MONGODB_DRIVER_VERSION" "$MONGODB_DRIVER_CORE_MD5" && \
+    docker-maven-download central "$BSON_REPO" "$BSON_GROUP" "$BSON_VERSION" "$BSON_MD5"
diff --git a/kstreams/mongodb-sink.json b/kstreams/mongodb-sink.json
index d446db1b77..1ffd42a2df 100644
--- a/kstreams/mongodb-sink.json
+++ b/kstreams/mongodb-sink.json
@@ -1,12 +1,13 @@
 {
     "name": "mongodb-sink",
     "config": {
-        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
+        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
         "tasks.max": "1",
         "topics": "final_ddd_aggregates",
-        "mongodb.connection.uri": "mongodb://mongodb:27017/inventory?w=1&journal=true",
-        "mongodb.collection": "customers_with_addresses",
-        "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.FullKeyStrategy",
-        "mongodb.delete.on.null.values": "true"
+        "connection.uri": "mongodb://mongodb:27017/inventory?w=1&journal=true",
+        "collection": "customers_with_addresses",
+        "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy",
+        "delete.on.null.values": "true",
+        "database": "inventory"
     }
 }