
Commit 6954fb7

Merge branch 'master' into MINOR_delete_redundant_code
2 parents 092991f + d90fd1f

17 files changed: 509 additions & 192 deletions

SchemaRegistryProvider.java (new file)

Lines changed: 126 additions & 0 deletions

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.schema;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.util.StreamerUtil;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Obtains the latest schema from the Confluent/Kafka schema registry.
 * <p>
 * https://github.com/confluentinc/schema-registry
 */
public class SchemaRegistryProvider extends SchemaProvider {

  private final TypedProperties config;

  /**
   * Configs supported.
   */
  public static class Config {

    private static final String SRC_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url";
    private static final String TARGET_SCHEMA_REGISTRY_URL_PROP =
        "hoodie.deltastreamer.schemaprovider.registry.targetUrl";
  }

  /**
   * Takes the provided url {@code registryUrl} and fetches the schema from the schema registry at that url.
   * If the caller provides userInfo credentials in the url (e.g. "https://foo:bar@schemaregistry.org"), the
   * credentials are extracted from the url using the Matcher and set on the request as a Basic Authorization
   * header.
   *
   * @param registryUrl the schema registry url, optionally carrying userInfo credentials
   * @return the Schema in String form.
   * @throws IOException if the registry cannot be reached or its response cannot be read
   */
  public String fetchSchemaFromRegistry(String registryUrl) throws IOException {
    URL registry;
    HttpURLConnection connection;
    Matcher matcher = Pattern.compile("://(.*?)@").matcher(registryUrl);
    if (matcher.find()) {
      String creds = matcher.group(1);
      String urlWithoutCreds = registryUrl.replace(creds + "@", "");
      registry = new URL(urlWithoutCreds);
      connection = (HttpURLConnection) registry.openConnection();
      setAuthorizationHeader(creds, connection);
    } else {
      registry = new URL(registryUrl);
      connection = (HttpURLConnection) registry.openConnection();
    }
    ObjectMapper mapper = new ObjectMapper();
    JsonNode node = mapper.readTree(getStream(connection));
    return node.get("schema").asText();
  }

  protected void setAuthorizationHeader(String creds, HttpURLConnection connection) {
    String encodedAuth = Base64.getEncoder().encodeToString(creds.getBytes(StandardCharsets.UTF_8));
    connection.setRequestProperty("Authorization", "Basic " + encodedAuth);
  }

  protected InputStream getStream(HttpURLConnection connection) throws IOException {
    return connection.getInputStream();
  }

  public SchemaRegistryProvider(TypedProperties props) {
    this.config = props;
    StreamerUtil.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP));
  }

  private Schema getSchema(String registryUrl) throws IOException {
    return new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
  }

  @Override
  public Schema getSourceSchema() {
    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
    try {
      return getSchema(registryUrl);
    } catch (IOException ioe) {
      throw new HoodieIOException("Error reading source schema from registry :" + registryUrl, ioe);
    }
  }

  @Override
  public Schema getTargetSchema() {
    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
    String targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);
    try {
      return getSchema(targetRegistryUrl);
    } catch (IOException ioe) {
      throw new HoodieIOException("Error reading target schema from registry :" + targetRegistryUrl, ioe);
    }
  }
}
```
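For a quick sanity check, the fetch that `fetchSchemaFromRegistry` performs can be reproduced from the shell. A minimal sketch, not part of this commit: it assumes the Confluent registry from the demo below is running on localhost:8081 with a `hudi-test-topic` subject registered, and reuses the `foo:bar@schemaregistry.org` placeholder from the Javadoc for the authenticated case.

```bash
# Same request SchemaRegistryProvider issues: GET the latest subject version
# and read the "schema" field of the JSON response.
curl -s http://localhost:8081/subjects/hudi-test-topic/versions/latest | jq -r '.schema'

# A userInfo url ("https://foo:bar@...") is equivalent to sending the same
# credentials as a Basic Authorization header, which curl's -u flag does.
curl -s -u foo:bar https://schemaregistry.org/subjects/hudi-test-topic/versions/latest | jq -r '.schema'
```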

hudi-kafka-connect/README.md

Lines changed: 85 additions & 14 deletions
````diff
@@ -15,32 +15,36 @@
  * See the License for the specific language governing permissions and
  -->
 
-# Quick Start guide for Kafka Connect Sink for Hudi
+# Quick Start (demo) guide for Kafka Connect Sink for Hudi
 
 This repo contains a sample project that can be used to start off your own source connector for Kafka Connect.
+This work is tracked by [HUDI-2324](https://issues.apache.org/jira/browse/HUDI-2324).
 
-## Building the connector
+## Building the Hudi Sink Connector
 
 The first thing you need to do to start using this connector is building it. In order to do that, you need to install the following dependencies:
 
 - [Java 1.8+](https://openjdk.java.net/)
 - [Apache Maven](https://maven.apache.org/)
+- Install [kcat](https://github.com/edenhill/kcat)
 
-After installing these dependencies, execute the following command:
+After installing these dependencies, execute the following commands. This will install all the Hudi dependency jars,
+including the fat packaged jar that contains all the dependencies required for a functional Hudi Kafka Connect Sink.
 
 ```bash
 cd $HUDI_DIR
-mvn clean package
+mvn clean -DskipTests install
 ```
 
-## Incremental Builds
+Henceforth, incremental builds can be performed as follows.
 
 ```bash
 mvn clean -pl hudi-kafka-connect install -DskipTests
 mvn clean -pl packaging/hudi-kafka-connect-bundle install
 ```
 
-## Put hudi connector in Kafka Connect classpath
+Next, we need to make sure that the hudi sink connector bundle jar is in the Kafka Connect classpath. Note that the connect
+classpath should be the same as the one configured in the connector configuration file.
 
 ```bash
 cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar /usr/local/share/java/hudi-kafka-connect/
@@ -52,43 +56,110 @@ After building the package, we need to install the Apache Kafka
 
 ### 1 - Starting the environment
 
-Start the ZK and Kafka:
+To try out the Connect Sink locally, set up a Kafka broker locally. Download the latest apache kafka from https://kafka.apache.org/downloads.
+Once downloaded and built, run the Zookeeper server and Kafka server using the command line tools.
 
 ```bash
+export KAFKA_HOME=/path/to/kafka_install_dir
+cd $KAFKA_HOME
 ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
 ./bin/kafka-server-start.sh ./config/server.properties
 ```
 
 Wait until the kafka cluster is up and running.
 
-### 2 - Create the Hudi Control Topic for Coordination of the transactions
+### 2 - Set up the schema registry
 
-The control topic should only have `1` partition
+Hudi leverages a schema registry to obtain the latest schema when writing records. While it supports most popular schema registries,
+we use the Confluent schema registry. Download the latest confluent schema registry code from https://github.com/confluentinc/schema-registry
+and start the schema registry service.
 
 ```bash
+cd $CONFLUENT_DIR
+./bin/schema-registry-start etc/schema-registry/schema-registry.properties
+```
+
+### 3 - Create the Hudi Control Topic for Coordination of the transactions
+
+The control topic should only have `1` partition, since it's used to coordinate the Hudi write transactions across the multiple Connect tasks.
+
+```bash
+cd $KAFKA_HOME
 ./bin/kafka-topics.sh --delete --topic hudi-control-topic --bootstrap-server localhost:9092
 ./bin/kafka-topics.sh --create --topic hudi-control-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
 ```
 
-### 3 - Create the Hudi Topic for the Sink and insert data into the topic
+### 4 - Create the Hudi Topic for the Sink and insert data into the topic
 
 Open a terminal to execute the following command:
 
 ```bash
-bash runKafkaTrafficGenerator.sh <total_messages>
+cd $HUDI_DIR/demo/
+bash setupKafka.sh -n <total_kafka_messages>
 ```
 
 ### 5 - Run the Sink connector worker (multiple workers can be run)
 
-Open a terminal to execute the following command:
+Kafka Connect is a distributed platform, with the ability to run one or more workers (each running multiple tasks)
+that process the records from the Kafka partitions of the same topic in parallel. We provide a properties file with
+default properties to start a Hudi connector.
+
+Note that if multiple workers need to be run, the webserver needs to be reconfigured for subsequent workers to ensure
+they run successfully.
 
 ```bash
-./bin/connect-distributed.sh ../hudi-kafka-connect/configs/connect-distributed.properties
+cd $KAFKA_HOME
+./bin/connect-distributed.sh $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties
 ```
 
 ### 6 - To add the Hudi Sink to the Connector (delete it if you want to re-configure)
 
+Once the Connector has started, it will not run the Sink until the Hudi sink is added using the web API. The following
+curl APIs can be used to delete and add a new Hudi Sink. Again, a default configuration is provided for the Hudi Sink,
+which can be changed based on the desired properties.
+
 ```bash
 curl -X DELETE http://localhost:8083/connectors/hudi-sink
-curl -X POST -H "Content-Type:application/json" -d @$HUDI_DIR/hudi-kafka-connect/configs/config-sink.json http://localhost:8083/connectors
+curl -X POST -H "Content-Type:application/json" -d @$HUDI_DIR/hudi-kafka-connect/demo/config-sink.json http://localhost:8083/connectors
 ```
 
+Now, you should see that the connector is created and tasks are running.
+
+```bash
+curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors
+["hudi-sink"]
+curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors/hudi-sink/status | jq
+```
+
+And, you should see your Hudi table created, which you can query using Spark/Flink.
+Note: HUDI-2325 tracks Hive sync, which will unlock pretty much every other query engine.
+
+```bash
+ls -a /tmp/hoodie/hudi-test-topic
+.    .hoodie      partition-1  partition-3
+..   partition-0  partition-2  partition-4
+
+ls -lt /tmp/hoodie/hudi-test-topic/.hoodie
+total 72
+-rw-r--r--  1 user  wheel    346 Sep 14 10:32 hoodie.properties
+-rw-r--r--  1 user  wheel      0 Sep 13 23:18 20210913231805.inflight
+-rw-r--r--  1 user  wheel      0 Sep 13 23:18 20210913231805.commit.requested
+-rw-r--r--  1 user  wheel   9438 Sep 13 21:45 20210913214351.commit
+-rw-r--r--  1 user  wheel      0 Sep 13 21:43 20210913214351.inflight
+-rw-r--r--  1 user  wheel      0 Sep 13 21:43 20210913214351.commit.requested
+-rw-r--r--  1 user  wheel  18145 Sep 13 21:43 20210913214114.commit
+-rw-r--r--  1 user  wheel      0 Sep 13 21:41 20210913214114.inflight
+-rw-r--r--  1 user  wheel      0 Sep 13 21:41 20210913214114.commit.requested
+drwxr-xr-x  2 user  wheel     64 Sep 13 21:41 archived
+
+ls -l /tmp/hoodie/hudi-test-topic/partition-0
+total 5168
+-rw-r--r--  1 user  wheel  439332 Sep 13 21:43 2E0E6DB44ACC8479059574A2C71C7A7E-0_0-0-0_20210913214114.parquet
+-rw-r--r--  1 user  wheel  440179 Sep 13 21:42 3B56FAAAE2BDD04E480C1CBACD463D3E-0_0-0-0_20210913214114.parquet
+-rw-r--r--  1 user  wheel  437097 Sep 13 21:45 3B56FAAAE2BDD04E480C1CBACD463D3E-0_0-0-0_20210913214351.parquet
+-rw-r--r--  1 user  wheel  440219 Sep 13 21:42 D5AEE453699D5D9623D704C1CF399C8C-0_0-0-0_20210913214114.parquet
+-rw-r--r--  1 user  wheel  437035 Sep 13 21:45 D5AEE453699D5D9623D704C1CF399C8C-0_0-0-0_20210913214351.parquet
+-rw-r--r--  1 user  wheel  440214 Sep 13 21:43 E200FA75DCD1CED60BE86BCE6BF5D23A-0_0-0-0_20210913214114.parquet
+```
````
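Step 5 above notes that the web server must be reconfigured before additional workers can run. One way to do that, as a sketch not part of this commit: copy the demo worker properties (which set `listeners=HTTP://:8083`, as shown in the file below) and give the second worker its own REST port; port 8084 and the `/tmp` path are arbitrary choices.

```bash
# Give a second worker its own REST port; the first worker listens on 8083.
sed 's|^listeners=HTTP://:8083$|listeners=HTTP://:8084|' \
    $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties > /tmp/connect-distributed-2.properties

cd $KAFKA_HOME
./bin/connect-distributed.sh /tmp/connect-distributed-2.properties
```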

hudi-kafka-connect/configs/config-sink.json renamed to hudi-kafka-connect/demo/config-sink.json

Lines changed: 4 additions & 5 deletions
```diff
@@ -9,11 +9,10 @@
     "value.converter.schemas.enable": "false",
     "topics": "hudi-test-topic",
     "hoodie.table.name": "hudi-test-topic",
-    "hoodie.base.path": "file:///tmp/hoodie/sample-table",
+    "hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic",
     "hoodie.datasource.write.recordkey.field": "volume",
-    "hoodie.datasource.write.partitionpath.field": "year",
-    "hoodie.schemaprovider.class": "org.apache.hudi.schema.FilebasedSchemaProvider",
-    "hoodie.deltastreamer.schemaprovider.source.schema.file": "file:///tmp/hoodie/schema.avsc",
-    "hoodie.deltastreamer.schemaprovider.target.schema.file": "file:///tmp/hoodie/schema.avsc"
+    "hoodie.datasource.write.partitionpath.field": "date",
+    "hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider",
+    "hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8081/subjects/hudi-test-topic/versions/latest"
   }
 }
```

hudi-kafka-connect/configs/connect-distributed.properties renamed to hudi-kafka-connect/demo/connect-distributed.properties

Lines changed: 1 addition & 1 deletion
```diff
@@ -30,4 +30,4 @@ status.storage.replication.factor=1
 
 offset.flush.interval.ms=60000
 listeners=HTTP://:8083
-plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
+plugin.path=/usr/local/share/java
```
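This `plugin.path` ties back to the classpath step in the README: the bundle jar was copied to `/usr/local/share/java/hudi-kafka-connect/`, and Connect treats each directory under a `plugin.path` entry as a plugin. A quick sanity check (the jar name is taken from the build step above; adjust if you installed elsewhere):

```bash
# The bundle copied during the build step should be visible under plugin.path.
ls /usr/local/share/java/hudi-kafka-connect/
# hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar
```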
