Skip to content

Commit aa862fe

Browse files
xushiyanfengjian
authored and committed
[HUDI-2673] Add kafka connect bundle to validation test (apache#7131)
1 parent f6feade commit aa862fe

7 files changed

Lines changed: 157 additions & 10 deletions

File tree

hudi-kafka-connect/demo/setupKafka.sh

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#!/bin/bash
12
# Licensed to the Apache Software Foundation (ASF) under one
23
# or more contributor license agreements. See the NOTICE file
34
# distributed with this work for additional information
@@ -14,8 +15,6 @@
1415
# See the License for the specific language governing permissions and
1516
# limitations under the License.
1617

17-
#!/bin/bash
18-
1918
#########################
2019
# The command line help #
2120
#########################
@@ -79,11 +78,11 @@ while getopts ":n:b:tf:k:m:r:o:l:p:s:-:" opt; do
7978
recreateTopic="N"
8079
printf "Argument recreate-topic is N (reuse Kafka topic) \n"
8180
;;
82-
k)
81+
f)
8382
rawDataFile="$OPTARG"
8483
printf "Argument raw-file is %s\n" "$rawDataFile"
8584
;;
86-
f)
85+
k)
8786
kafkaTopicName="$OPTARG"
8887
printf "Argument kafka-topic is %s\n" "$kafkaTopicName"
8988
;;

packaging/bundle-validation/Dockerfile-base

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
#
1717
FROM adoptopenjdk/openjdk8:alpine
1818

19-
RUN apk add --no-cache --upgrade bash
19+
RUN apk add --no-cache --upgrade bash curl jq
2020

2121
RUN mkdir /opt/bundle-validation
2222
ENV WORKDIR=/opt/bundle-validation
@@ -27,6 +27,8 @@ ARG HIVE_VERSION=3.1.3
2727
ARG DERBY_VERSION=10.14.1.0
2828
ARG SPARK_VERSION=3.1.3
2929
ARG SPARK_HADOOP_VERSION=2.7
30+
ARG CONFLUENT_VERSION=5.5.12
31+
ARG KAFKA_CONNECT_HDFS_VERSION=10.1.13
3032

3133
RUN wget https://archive.apache.org/dist/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz -P "$WORKDIR" \
3234
&& tar -xf $WORKDIR/hadoop-$HADOOP_VERSION.tar.gz -C $WORKDIR/ \
@@ -47,3 +49,15 @@ RUN wget https://archive.apache.org/dist/spark/spark-$SPARK_VERSION/spark-$SPARK
4749
&& tar -xf $WORKDIR/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION.tgz -C $WORKDIR/ \
4850
&& rm $WORKDIR/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION.tgz
4951
ENV SPARK_HOME=$WORKDIR/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION
52+
53+
# Download the Confluent Community tarball (Scala 2.12 build) into $WORKDIR, unpack it there and delete the archive.
# ${CONFLUENT_VERSION%.*} strips the last dotted component, so the archive directory is keyed by major.minor.
# CONFLUENT_HOME then points at the confluent-$CONFLUENT_VERSION directory under $WORKDIR.
RUN wget https://packages.confluent.io/archive/${CONFLUENT_VERSION%.*}/confluent-community-$CONFLUENT_VERSION-2.12.tar.gz -P "$WORKDIR" \
54+
&& tar -xf $WORKDIR/confluent-community-$CONFLUENT_VERSION-2.12.tar.gz -C $WORKDIR/ \
55+
&& rm $WORKDIR/confluent-community-$CONFLUENT_VERSION-2.12.tar.gz
56+
ENV CONFLUENT_HOME=$WORKDIR/confluent-$CONFLUENT_VERSION
57+
58+
# Download the kafka-connect-hdfs plugin zip, unpack it under $WORKDIR/kafka-connectors and delete the archive,
# then append a plugin.path entry for that directory to the distributed Connect worker properties.
# KAFKA_CONNECT_PLUGIN_PATH_LIB_PATH is set to the unpacked plugin's lib/ directory.
RUN wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-hdfs/versions/$KAFKA_CONNECT_HDFS_VERSION/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION.zip -P "$WORKDIR" \
59+
&& mkdir $WORKDIR/kafka-connectors \
60+
&& unzip $WORKDIR/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION.zip -d $WORKDIR/kafka-connectors/ \
61+
&& rm $WORKDIR/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION.zip \
62+
&& printf "\nplugin.path=$WORKDIR/kafka-connectors\n" >> $CONFLUENT_HOME/etc/kafka/connect-distributed.properties
63+
ENV KAFKA_CONNECT_PLUGIN_PATH_LIB_PATH=$WORKDIR/kafka-connectors/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION/lib

packaging/bundle-validation/ci_run.sh

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,27 +36,35 @@ if [[ ${SPARK_PROFILE} == 'spark2.4' ]]; then
3636
DERBY_VERSION=10.10.2.0
3737
SPARK_VERSION=2.4.8
3838
SPARK_HADOOP_VERSION=2.7
39+
CONFLUENT_VERSION=5.5.12
40+
KAFKA_CONNECT_HDFS_VERSION=10.1.13
3941
IMAGE_TAG=spark248hive239
4042
elif [[ ${SPARK_PROFILE} == 'spark3.1' ]]; then
4143
HADOOP_VERSION=2.7.7
4244
HIVE_VERSION=3.1.3
4345
DERBY_VERSION=10.14.1.0
4446
SPARK_VERSION=3.1.3
4547
SPARK_HADOOP_VERSION=2.7
48+
CONFLUENT_VERSION=5.5.12
49+
KAFKA_CONNECT_HDFS_VERSION=10.1.13
4650
IMAGE_TAG=spark313hive313
4751
elif [[ ${SPARK_PROFILE} == 'spark3.2' ]]; then
4852
HADOOP_VERSION=2.7.7
4953
HIVE_VERSION=3.1.3
5054
DERBY_VERSION=10.14.1.0
5155
SPARK_VERSION=3.2.2
5256
SPARK_HADOOP_VERSION=2.7
57+
CONFLUENT_VERSION=5.5.12
58+
KAFKA_CONNECT_HDFS_VERSION=10.1.13
5359
IMAGE_TAG=spark322hive313
5460
elif [[ ${SPARK_PROFILE} == 'spark3.3' ]]; then
5561
HADOOP_VERSION=2.7.7
5662
HIVE_VERSION=3.1.3
5763
DERBY_VERSION=10.14.1.0
5864
SPARK_VERSION=3.3.0
5965
SPARK_HADOOP_VERSION=2
66+
CONFLUENT_VERSION=5.5.12
67+
KAFKA_CONNECT_HDFS_VERSION=10.1.13
6068
IMAGE_TAG=spark330hive313
6169
fi
6270

@@ -67,6 +75,7 @@ cp ${GITHUB_WORKSPACE}/packaging/hudi-hadoop-mr-bundle/target/hudi-*-$HUDI_VERSI
6775
cp ${GITHUB_WORKSPACE}/packaging/hudi-spark-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/
6876
cp ${GITHUB_WORKSPACE}/packaging/hudi-utilities-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/
6977
cp ${GITHUB_WORKSPACE}/packaging/hudi-utilities-slim-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/
78+
cp ${GITHUB_WORKSPACE}/packaging/hudi-kafka-connect-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/
7079
echo 'Validating jars below:'
7180
ls -l $TMP_JARS_DIR
7281

@@ -84,6 +93,8 @@ docker build \
8493
--build-arg DERBY_VERSION=$DERBY_VERSION \
8594
--build-arg SPARK_VERSION=$SPARK_VERSION \
8695
--build-arg SPARK_HADOOP_VERSION=$SPARK_HADOOP_VERSION \
96+
--build-arg CONFLUENT_VERSION=$CONFLUENT_VERSION \
97+
--build-arg KAFKA_CONNECT_HDFS_VERSION=$KAFKA_CONNECT_HDFS_VERSION \
8798
--build-arg IMAGE_TAG=$IMAGE_TAG \
8899
-t hudi-ci-bundle-validation:$IMAGE_TAG \
89100
.
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{
2+
"name": "hudi-sink",
3+
"config": {
4+
"bootstrap.servers": "localhost:9092",
5+
"connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
6+
"tasks.max": "2",
7+
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
8+
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
9+
"value.converter.schemas.enable": "false",
10+
"topics": "hudi-test-topic",
11+
"hoodie.table.name": "hudi-test-topic",
12+
"hoodie.table.type": "COPY_ON_WRITE",
13+
"hoodie.base.path": "file:///tmp/hudi-kafka-test",
14+
"hoodie.datasource.write.recordkey.field": "volume",
15+
"hoodie.datasource.write.partitionpath.field": "date",
16+
"hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider",
17+
"hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8081/subjects/hudi-test-topic/versions/latest",
18+
"hoodie.kafka.commit.interval.secs": 10
19+
}
20+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#!/bin/bash
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
#
20+
21+
# Runs a Kafka Connect worker, registers the hudi-sink connector, removes it
# again, and then checks that at least one *.commit file was written under
# /tmp/hudi-kafka-test/.hoodie/.
#
# env vars:
#   CONFLUENT_HOME: path to the confluent community directory
#
# Exit status: 0 if at least one commit file exists, 1 otherwise.

# Start the Connect worker in distributed mode in the background.
"$CONFLUENT_HOME/bin/connect-distributed" "$CONFLUENT_HOME/etc/kafka/connect-distributed.properties" &
# Fixed wait before talking to the worker's REST endpoint (port 8083);
# there is no readiness probe.
sleep 30
# Register the sink connector described by config-sink.json.
curl -X POST -H "Content-Type:application/json" -d @/opt/bundle-validation/kafka/config-sink.json http://localhost:8083/connectors &
# Fixed wait while the connector runs; presumably long enough for it to
# consume the topic and commit.
sleep 30
# Remove the connector again.
curl -X DELETE http://localhost:8083/connectors/hudi-sink &
sleep 10

# validate
# Count commit files with a glob instead of parsing `ls` output. nullglob
# makes the array empty (rather than holding the literal pattern) when
# nothing matches.
shopt -s nullglob
commit_files=(/tmp/hudi-kafka-test/.hoodie/*.commit)
if (( ${#commit_files[@]} > 0 )); then
  exit 0
else
  printf 'No .commit file found under /tmp/hudi-kafka-test/.hoodie/\n' >&2
  exit 1
fi
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#!/bin/bash
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
#
20+
21+
kafkaTopicName=hudi-test-topic

# Register the schema with the schema registry.
# The first sed puts every "/*" at the start of a line and every "*/" at the
# end of one, the second sed deletes those comment line ranges, and jq
# re-encodes the remaining JSON document as a single JSON string.
schema_json=$(
  sed 's|/\*|\n&|g;s|*/|&\n|g' /opt/bundle-validation/data/stocks/schema.avsc \
    | sed '/\/\*/,/*\//d' \
    | jq tostring
)
curl -X POST \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data "{\"schema\": $schema_json}" \
  "http://localhost:8081/subjects/${kafkaTopicName}/versions"

# Feed both stock batches, concatenated, to the console producer for the topic.
cat /opt/bundle-validation/data/stocks/data/batch_1.json /opt/bundle-validation/data/stocks/data/batch_2.json \
  | "$CONFLUENT_HOME/bin/kafka-console-producer" \
    --bootstrap-server http://localhost:9092 \
    --topic "${kafkaTopicName}"

packaging/bundle-validation/validate.sh

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ ln -sf $JARS_DIR/hudi-hadoop-mr*.jar $JARS_DIR/hadoop-mr.jar
3232
ln -sf $JARS_DIR/hudi-spark*.jar $JARS_DIR/spark.jar
3333
ln -sf $JARS_DIR/hudi-utilities-bundle*.jar $JARS_DIR/utilities.jar
3434
ln -sf $JARS_DIR/hudi-utilities-slim*.jar $JARS_DIR/utilities-slim.jar
35+
ln -sf $JARS_DIR/hudi-kafka-connect-bundle*.jar $JARS_DIR/kafka-connect.jar
3536

3637

3738
##
@@ -131,26 +132,64 @@ test_utilities_bundle () {
131132
}
132133

133134

135+
##
# Function to test the kafka-connect bundle.
# It runs zookeeper, kafka broker, schema registry, and connector worker.
# After producing and consuming data, it checks successful commit under `.hoodie/`
#
# 1st arg: path to the hudi-kafka-connect-bundle.jar (for writing data)
#
# env vars (defined in container):
#   CONFLUENT_HOME: path to the confluent community directory
#   KAFKA_CONNECT_PLUGIN_PATH_LIB_PATH: path to install hudi-kafka-connect-bundle.jar
#   WORKDIR: directory that contains kafka/produce.sh and kafka/consume.sh
#
# Returns: 1 if the bundle jar cannot be installed; otherwise the exit status
#          of kafka/consume.sh (the last command run).
##
test_kafka_connect_bundle() {
  KAFKA_CONNECT_JAR=$1
  # Fail fast: without the jar on the plugin path there is nothing to validate,
  # so do not start any service or wait through the sleeps below.
  cp "$KAFKA_CONNECT_JAR" "$KAFKA_CONNECT_PLUGIN_PATH_LIB_PATH" || return 1
  # The services run in the background; the fixed sleeps stand in for
  # readiness checks.
  "$CONFLUENT_HOME/bin/zookeeper-server-start" "$CONFLUENT_HOME/etc/kafka/zookeeper.properties" &
  "$CONFLUENT_HOME/bin/kafka-server-start" "$CONFLUENT_HOME/etc/kafka/server.properties" &
  sleep 10
  "$CONFLUENT_HOME/bin/schema-registry-start" "$CONFLUENT_HOME/etc/schema-registry/schema-registry.properties" &
  sleep 10
  "$CONFLUENT_HOME/bin/kafka-topics" --create --topic hudi-control-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
  "$WORKDIR/kafka/produce.sh"
  "$WORKDIR/kafka/consume.sh"
}
158+
159+
160+
############################
161+
# Execute tests
162+
############################
163+
164+
echo "::warning::validate.sh validating spark & hadoop-mr bundle"
134165
test_spark_hadoop_mr_bundles
135166
if [ "$?" -ne 0 ]; then
136167
exit 1
137168
fi
169+
echo "::warning::validate.sh done validating spark & hadoop-mr bundle"
138170

139171
if [[ $SPARK_HOME == *"spark-2.4"* ]] || [[ $SPARK_HOME == *"spark-3.1"* ]]
140172
then
141-
echo "::warning::validate.sh testing utilities bundle"
173+
echo "::warning::validate.sh validating utilities bundle"
142174
test_utilities_bundle $JARS_DIR/utilities.jar
143175
if [ "$?" -ne 0 ]; then
144176
exit 1
145177
fi
146-
echo "::warning::validate.sh done testing utilities bundle"
178+
echo "::warning::validate.sh done validating utilities bundle"
147179
else
148-
echo "::warning::validate.sh skip testing utilities bundle for non-spark2.4 & non-spark3.1 build"
180+
echo "::warning::validate.sh skip validating utilities bundle for non-spark2.4 & non-spark3.1 build"
149181
fi
150182

151-
echo "::warning::validate.sh testing utilities slim bundle"
183+
echo "::warning::validate.sh validating utilities slim bundle"
152184
test_utilities_bundle $JARS_DIR/utilities-slim.jar $JARS_DIR/spark.jar
153185
if [ "$?" -ne 0 ]; then
154186
exit 1
155187
fi
156-
echo "::warning::validate.sh done testing utilities slim bundle"
188+
echo "::warning::validate.sh done validating utilities slim bundle"
189+
190+
echo "::warning::validate.sh validating kafka connect bundle"
191+
test_kafka_connect_bundle $JARS_DIR/kafka-connect.jar
192+
if [ "$?" -ne 0 ]; then
193+
exit 1
194+
fi
195+
echo "::warning::validate.sh done validating kafka connect bundle"

0 commit comments

Comments
 (0)